// backend/postwork_queue.go package main import ( "context" "sync" "time" ) // Eine Nacharbeit (kann ffmpeg, ffprobe, thumbnails, rename, etc. enthalten) type PostWorkTask struct { Key string // z.B. Dateiname oder Job-ID, zum Deduplizieren Run func(ctx context.Context) error Added time.Time } type PostWorkQueue struct { q chan PostWorkTask ffmpegSem chan struct{} // "heavy" gate: maxParallelFFmpeg gleichzeitig mu sync.Mutex inflight map[string]struct{} // dedupe: queued ODER running queued int // Anzahl inflight (queued + running) // ✅ für UI: Warteschlange + Running-Keys tracken waitingKeys []string // FIFO der Keys, die noch NICHT gestartet sind runningKeys map[string]struct{} // Keys, die gerade wirklich laufen (Semaphor gehalten) } func NewPostWorkQueue(queueSize int, maxParallelFFmpeg int) *PostWorkQueue { if queueSize <= 0 { queueSize = 256 } if maxParallelFFmpeg <= 0 { maxParallelFFmpeg = 1 // Default: "nacheinander" } return &PostWorkQueue{ q: make(chan PostWorkTask, queueSize), ffmpegSem: make(chan struct{}, maxParallelFFmpeg), inflight: make(map[string]struct{}), waitingKeys: make([]string, 0, queueSize), runningKeys: make(map[string]struct{}), } } // Enqueue dedupliziert nach Key (damit du nicht durch Events doppelt queue-st) func (pq *PostWorkQueue) Enqueue(task PostWorkTask) bool { if task.Key == "" || task.Run == nil { return false } pq.mu.Lock() if _, ok := pq.inflight[task.Key]; ok { pq.mu.Unlock() return false // schon queued oder läuft } pq.inflight[task.Key] = struct{}{} pq.waitingKeys = append(pq.waitingKeys, task.Key) // ✅ queued für UI pq.queued++ pq.mu.Unlock() select { case pq.q <- task: return true default: // Queue voll -> inflight zurückrollen pq.mu.Lock() delete(pq.inflight, task.Key) if pq.queued > 0 { pq.queued-- } pq.removeWaitingKeyLocked(task.Key) pq.mu.Unlock() return false } } func (pq *PostWorkQueue) removeWaitingKeyLocked(key string) { for i, k := range pq.waitingKeys { if k == key { pq.waitingKeys = append(pq.waitingKeys[:i], pq.waitingKeys[i+1:]...) return } } } func (pq *PostWorkQueue) workerLoop(id int) { for task := range pq.q { // 1) Heavy-Gate: erst wenn ein Slot frei ist, gilt der Task als "running" pq.ffmpegSem <- struct{}{} // kann blocken // 2) Ab hier startet er wirklich → waiting -> running pq.mu.Lock() pq.removeWaitingKeyLocked(task.Key) pq.runningKeys[task.Key] = struct{}{} pq.mu.Unlock() func() { defer func() { // Panic-Schutz: Worker darf nicht sterben if r := recover(); r != nil { // optional: hier loggen _ = r } // States immer konsistent aufräumen (auch wenn er nie sauber run() beendet) pq.mu.Lock() pq.removeWaitingKeyLocked(task.Key) // falls er noch als waiting drin hing delete(pq.runningKeys, task.Key) delete(pq.inflight, task.Key) if pq.queued > 0 { pq.queued-- } pq.mu.Unlock() // Slot freigeben <-pq.ffmpegSem }() // 3) Optional: Task timeout (gegen hängende ffmpeg) ctx, cancel := context.WithTimeout(context.Background(), 60*time.Minute) defer cancel() // 4) Run if task.Run != nil { _ = task.Run(ctx) // optional: loggen } }() } } func (pq *PostWorkQueue) StartWorkers(n int) { if n <= 0 { n = 1 } for i := 0; i < n; i++ { go pq.workerLoop(i + 1) } } // Stats bleibt kompatibel zu deinem bisherigen Code func (pq *PostWorkQueue) Stats() (queued int, inflight int, maxParallel int) { pq.mu.Lock() defer pq.mu.Unlock() return pq.queued, len(pq.inflight), cap(pq.ffmpegSem) } // Optional für UI/Debug (praktisch für "Warte… Position X/Y") type PostWorkKeyStatus struct { State string `json:"state"` // "queued" | "running" | "missing" Position int `json:"position"` // 1..n (nur queued), 0 sonst Waiting int `json:"waiting"` // Anzahl wartend Running int `json:"running"` // Anzahl running MaxParallel int `json:"maxParallel"` // cap(ffmpegSem) } func (pq *PostWorkQueue) StatusForKey(key string) PostWorkKeyStatus { pq.mu.Lock() defer pq.mu.Unlock() waiting := len(pq.waitingKeys) running := len(pq.runningKeys) maxPar := cap(pq.ffmpegSem) if _, ok := pq.runningKeys[key]; ok { return PostWorkKeyStatus{ State: "running", Position: 0, Waiting: waiting, Running: running, MaxParallel: maxPar, } } for i, k := range pq.waitingKeys { if k == key { return PostWorkKeyStatus{ State: "queued", Position: i + 1, // 1-basiert Waiting: waiting, Running: running, MaxParallel: maxPar, } } } // Key ist weder queued noch running (wahrscheinlich schon fertig oder nie queued) return PostWorkKeyStatus{ State: "missing", Position: 0, Waiting: waiting, Running: running, MaxParallel: maxPar, } } // global (oder in deinem app struct halten) var postWorkQ = NewPostWorkQueue(512, 4) // maxParallelFFmpeg = 2