nsfwapp/backend/postwork_queue.go
2026-02-09 12:29:19 +01:00

203 lines
5.0 KiB
Go

// backend/postwork_queue.go
package main
import (
"context"
"sync"
"time"
)
// Eine Nacharbeit (kann ffmpeg, ffprobe, thumbnails, rename, etc. enthalten)
type PostWorkTask struct {
Key string // z.B. Dateiname oder Job-ID, zum Deduplizieren
Run func(ctx context.Context) error
Added time.Time
}
type PostWorkQueue struct {
q chan PostWorkTask
ffmpegSem chan struct{} // "heavy" gate: maxParallelFFmpeg gleichzeitig
mu sync.Mutex
inflight map[string]struct{} // dedupe: queued ODER running
queued int // Anzahl inflight (queued + running)
// ✅ für UI: Warteschlange + Running-Keys tracken
waitingKeys []string // FIFO der Keys, die noch NICHT gestartet sind
runningKeys map[string]struct{} // Keys, die gerade wirklich laufen (Semaphor gehalten)
}
func NewPostWorkQueue(queueSize int, maxParallelFFmpeg int) *PostWorkQueue {
if queueSize <= 0 {
queueSize = 256
}
if maxParallelFFmpeg <= 0 {
maxParallelFFmpeg = 1 // Default: "nacheinander"
}
return &PostWorkQueue{
q: make(chan PostWorkTask, queueSize),
ffmpegSem: make(chan struct{}, maxParallelFFmpeg),
inflight: make(map[string]struct{}),
waitingKeys: make([]string, 0, queueSize),
runningKeys: make(map[string]struct{}),
}
}
// Enqueue dedupliziert nach Key (damit du nicht durch Events doppelt queue-st)
func (pq *PostWorkQueue) Enqueue(task PostWorkTask) bool {
if task.Key == "" || task.Run == nil {
return false
}
pq.mu.Lock()
if _, ok := pq.inflight[task.Key]; ok {
pq.mu.Unlock()
return false // schon queued oder läuft
}
pq.inflight[task.Key] = struct{}{}
pq.waitingKeys = append(pq.waitingKeys, task.Key) // ✅ queued für UI
pq.queued++
pq.mu.Unlock()
select {
case pq.q <- task:
return true
default:
// Queue voll -> inflight zurückrollen
pq.mu.Lock()
delete(pq.inflight, task.Key)
if pq.queued > 0 {
pq.queued--
}
pq.removeWaitingKeyLocked(task.Key)
pq.mu.Unlock()
return false
}
}
func (pq *PostWorkQueue) removeWaitingKeyLocked(key string) {
for i, k := range pq.waitingKeys {
if k == key {
pq.waitingKeys = append(pq.waitingKeys[:i], pq.waitingKeys[i+1:]...)
return
}
}
}
func (pq *PostWorkQueue) workerLoop(id int) {
for task := range pq.q {
// 1) Heavy-Gate: erst wenn ein Slot frei ist, gilt der Task als "running"
pq.ffmpegSem <- struct{}{} // kann blocken
// 2) Ab hier startet er wirklich → waiting -> running
pq.mu.Lock()
pq.removeWaitingKeyLocked(task.Key)
pq.runningKeys[task.Key] = struct{}{}
pq.mu.Unlock()
func() {
defer func() {
// Panic-Schutz: Worker darf nicht sterben
if r := recover(); r != nil {
// optional: hier loggen
_ = r
}
// States immer konsistent aufräumen (auch wenn er nie sauber run() beendet)
pq.mu.Lock()
pq.removeWaitingKeyLocked(task.Key) // falls er noch als waiting drin hing
delete(pq.runningKeys, task.Key)
delete(pq.inflight, task.Key)
if pq.queued > 0 {
pq.queued--
}
pq.mu.Unlock()
// Slot freigeben
<-pq.ffmpegSem
}()
// 3) Optional: Task timeout (gegen hängende ffmpeg)
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Minute)
defer cancel()
// 4) Run
if task.Run != nil {
_ = task.Run(ctx) // optional: loggen
}
}()
}
}
func (pq *PostWorkQueue) StartWorkers(n int) {
if n <= 0 {
n = 1
}
for i := 0; i < n; i++ {
go pq.workerLoop(i + 1)
}
}
// Stats bleibt kompatibel zu deinem bisherigen Code
func (pq *PostWorkQueue) Stats() (queued int, inflight int, maxParallel int) {
pq.mu.Lock()
defer pq.mu.Unlock()
return pq.queued, len(pq.inflight), cap(pq.ffmpegSem)
}
// Optional für UI/Debug (praktisch für "Warte… Position X/Y")
type PostWorkKeyStatus struct {
State string `json:"state"` // "queued" | "running" | "missing"
Position int `json:"position"` // 1..n (nur queued), 0 sonst
Waiting int `json:"waiting"` // Anzahl wartend
Running int `json:"running"` // Anzahl running
MaxParallel int `json:"maxParallel"` // cap(ffmpegSem)
}
func (pq *PostWorkQueue) StatusForKey(key string) PostWorkKeyStatus {
pq.mu.Lock()
defer pq.mu.Unlock()
waiting := len(pq.waitingKeys)
running := len(pq.runningKeys)
maxPar := cap(pq.ffmpegSem)
if _, ok := pq.runningKeys[key]; ok {
return PostWorkKeyStatus{
State: "running",
Position: 0,
Waiting: waiting,
Running: running,
MaxParallel: maxPar,
}
}
for i, k := range pq.waitingKeys {
if k == key {
return PostWorkKeyStatus{
State: "queued",
Position: i + 1, // 1-basiert
Waiting: waiting,
Running: running,
MaxParallel: maxPar,
}
}
}
// Key ist weder queued noch running (wahrscheinlich schon fertig oder nie queued)
return PostWorkKeyStatus{
State: "missing",
Position: 0,
Waiting: waiting,
Running: running,
MaxParallel: maxPar,
}
}
// global (oder in deinem app struct halten)
var postWorkQ = NewPostWorkQueue(512, 4) // maxParallelFFmpeg = 2