// backend/postwork.go package main import ( "context" "strings" "sync" "time" ) // Eine Nacharbeit (kann ffmpeg, ffprobe, thumbnails, rename, etc. enthalten) type PostWorkTask struct { Key string // z.B. Dateiname oder Job-ID, zum Deduplizieren Run func(ctx context.Context) error Added time.Time } type PostWorkQueue struct { q chan PostWorkTask ffmpegSem chan struct{} // "heavy" gate: maxParallelFFmpeg gleichzeitig mu sync.Mutex inflight map[string]struct{} // dedupe: queued ODER running queued int // Anzahl inflight (queued + running) // ✅ für UI: Warteschlange + Running-Keys tracken waitingKeys []string // FIFO der Keys, die noch NICHT gestartet sind runningKeys map[string]struct{} // Keys, die gerade wirklich laufen (Semaphor gehalten) } func NewPostWorkQueue(queueSize int, maxParallelFFmpeg int) *PostWorkQueue { if queueSize <= 0 { queueSize = 256 } if maxParallelFFmpeg <= 0 { maxParallelFFmpeg = 1 // Default: "nacheinander" } return &PostWorkQueue{ q: make(chan PostWorkTask, queueSize), ffmpegSem: make(chan struct{}, maxParallelFFmpeg), inflight: make(map[string]struct{}), waitingKeys: make([]string, 0, queueSize), runningKeys: make(map[string]struct{}), } } // Enqueue dedupliziert nach Key (damit du nicht durch Events doppelt queue-st) func (pq *PostWorkQueue) Enqueue(task PostWorkTask) bool { if task.Key == "" || task.Run == nil { return false } pq.mu.Lock() if _, ok := pq.inflight[task.Key]; ok { pq.mu.Unlock() return false // schon queued oder läuft } pq.inflight[task.Key] = struct{}{} pq.waitingKeys = append(pq.waitingKeys, task.Key) // ✅ queued für UI pq.queued++ pq.mu.Unlock() select { case pq.q <- task: return true default: // Queue voll -> inflight zurückrollen pq.mu.Lock() delete(pq.inflight, task.Key) if pq.queued > 0 { pq.queued-- } pq.removeWaitingKeyLocked(task.Key) pq.mu.Unlock() return false } } func (pq *PostWorkQueue) removeWaitingKeyLocked(key string) { for i, k := range pq.waitingKeys { if k == key { pq.waitingKeys = append(pq.waitingKeys[:i], pq.waitingKeys[i+1:]...) return } } } func (pq *PostWorkQueue) RemoveQueued(key string) bool { if strings.TrimSpace(key) == "" { return false } pq.mu.Lock() defer pq.mu.Unlock() // running darf hier NICHT entfernt werden if _, running := pq.runningKeys[key]; running { return false } found := false for i, k := range pq.waitingKeys { if k == key { pq.waitingKeys = append(pq.waitingKeys[:i], pq.waitingKeys[i+1:]...) found = true break } } if !found { return false } delete(pq.inflight, key) if pq.queued > 0 { pq.queued-- } // Das Task-Element bleibt evtl. noch im Channel liegen. // Beim Worker-Start muss es dann erkannt und übersprungen werden. return true } func (pq *PostWorkQueue) workerLoop(id int) { for task := range pq.q { pq.mu.Lock() _, stillInflight := pq.inflight[task.Key] _, alreadyRunning := pq.runningKeys[task.Key] waitingFound := false for _, k := range pq.waitingKeys { if k == task.Key { waitingFound = true break } } pq.mu.Unlock() // queued Job wurde zwischenzeitlich entfernt -> Task aus dem Channel verwerfen if !stillInflight || (!waitingFound && !alreadyRunning) { continue } // Task startet jetzt wirklich → waiting -> running pq.mu.Lock() pq.removeWaitingKeyLocked(task.Key) pq.runningKeys[task.Key] = struct{}{} pq.mu.Unlock() func() { defer func() { // Panic-Schutz: Worker darf nicht sterben if r := recover(); r != nil { // optional: hier loggen _ = r } // States immer konsistent aufräumen (auch wenn er nie sauber run() beendet) pq.mu.Lock() pq.removeWaitingKeyLocked(task.Key) // falls er noch als waiting drin hing delete(pq.runningKeys, task.Key) delete(pq.inflight, task.Key) if pq.queued > 0 { pq.queued-- } pq.mu.Unlock() }() // 3) Optional: Task timeout (gegen hängende ffmpeg) ctx, cancel := context.WithTimeout(context.Background(), 60*time.Minute) defer cancel() // 4) Run if task.Run != nil { _ = task.Run(ctx) // optional: loggen } }() } } func (pq *PostWorkQueue) StartWorkers(n int) { if n <= 0 { n = 1 } for i := 0; i < n; i++ { go pq.workerLoop(i + 1) } } // Stats bleibt kompatibel zu deinem bisherigen Code func (pq *PostWorkQueue) Stats() (queued int, inflight int, maxParallel int) { pq.mu.Lock() defer pq.mu.Unlock() return pq.queued, len(pq.inflight), cap(pq.ffmpegSem) } // Optional für UI/Debug (praktisch für "Warte… Position X/Y") type PostWorkKeyStatus struct { State string `json:"state"` // "queued" | "running" | "missing" Position int `json:"position"` // 1..n (nur queued), 0 sonst Waiting int `json:"waiting"` // Anzahl wartend Running int `json:"running"` // Anzahl running MaxParallel int `json:"maxParallel"` // cap(ffmpegSem) } func (pq *PostWorkQueue) StatusForKey(key string) PostWorkKeyStatus { pq.mu.Lock() defer pq.mu.Unlock() waiting := len(pq.waitingKeys) running := len(pq.runningKeys) maxPar := cap(pq.ffmpegSem) if _, ok := pq.runningKeys[key]; ok { return PostWorkKeyStatus{ State: "running", Position: 0, Waiting: waiting, Running: running, MaxParallel: maxPar, } } for i, k := range pq.waitingKeys { if k == key { return PostWorkKeyStatus{ State: "queued", Position: i + 1, // 1-basiert Waiting: waiting, Running: running, MaxParallel: maxPar, } } } // Key ist weder queued noch running (wahrscheinlich schon fertig oder nie queued) return PostWorkKeyStatus{ State: "missing", Position: 0, Waiting: waiting, Running: running, MaxParallel: maxPar, } } // global (oder in deinem app struct halten) var postWorkQ = NewPostWorkQueue(512, 6) // maxParallelFFmpeg = 6 // --- Status Refresher (ehemals postwork_refresh.go) --- func startPostWorkStatusRefresher() { t := time.NewTicker(1 * time.Second) go func() { defer t.Stop() for range t.C { changedJobs := make([]*RecordJob, 0, 16) jobsMu.Lock() for _, job := range jobs { key := strings.TrimSpace(job.PostWorkKey) if key == "" { continue } st := postWorkQ.StatusForKey(key) changed := false // PostWork-Daten aktualisieren if job.PostWork == nil || job.PostWork.State != st.State || job.PostWork.Position != st.Position || job.PostWork.Waiting != st.Waiting || job.PostWork.Running != st.Running || job.PostWork.MaxParallel != st.MaxParallel { tmp := st job.PostWork = &tmp changed = true } // Status / Phase für UI vereinheitlichen switch st.State { case "queued": if job.Status != JobPostwork { job.Status = JobPostwork changed = true } phaseLower := strings.TrimSpace(strings.ToLower(job.Phase)) if phaseLower == "" || phaseLower == "recording" { job.Phase = "postwork" changed = true } if job.Progress < 0 || job.Progress > 100 { job.Progress = 0 changed = true } case "running": if job.Status != JobPostwork { job.Status = JobPostwork changed = true } phaseLower := strings.TrimSpace(strings.ToLower(job.Phase)) // Konkrete Unterphasen NICHT überschreiben switch phaseLower { case "probe", "remuxing", "moving", "assets": // ok, so lassen default: if phaseLower == "" || phaseLower == "recording" || phaseLower == "postwork" { if phaseLower != "postwork" { job.Phase = "postwork" changed = true } } } } if changed { changedJobs = append(changedJobs, job) } } jobsMu.Unlock() for _, job := range changedJobs { publishJobUpsert(job) } } }() }