240 lines
5.6 KiB
Go
240 lines
5.6 KiB
Go
// backend/postwork.go
|
|
package main
|
|
|
|
import (
	"context"
	"log"
	"reflect"
	"strings"
	"sync"
	"time"
)
|
|
|
|
// PostWorkTask is one unit of post-processing work. What the work consists of
// is entirely up to Run (for example ffmpeg, ffprobe, thumbnail generation or
// a rename).
type PostWorkTask struct {
	// Key identifies the task for deduplication, e.g. a file name or a job ID.
	Key string

	// Run performs the work. It receives a context and reports failure
	// through its error result.
	Run func(ctx context.Context) error

	// Added is a timestamp for the task.
	// NOTE(review): presumably the time the task was created or queued and
	// set by the caller — confirm against the call sites.
	Added time.Time
}
|
|
|
|
type PostWorkQueue struct {
|
|
q chan PostWorkTask
|
|
ffmpegSem chan struct{} // "heavy" gate: maxParallelFFmpeg gleichzeitig
|
|
|
|
mu sync.Mutex
|
|
inflight map[string]struct{} // dedupe: queued ODER running
|
|
|
|
queued int // Anzahl inflight (queued + running)
|
|
|
|
// ✅ für UI: Warteschlange + Running-Keys tracken
|
|
waitingKeys []string // FIFO der Keys, die noch NICHT gestartet sind
|
|
runningKeys map[string]struct{} // Keys, die gerade wirklich laufen (Semaphor gehalten)
|
|
}
|
|
|
|
func NewPostWorkQueue(queueSize int, maxParallelFFmpeg int) *PostWorkQueue {
|
|
if queueSize <= 0 {
|
|
queueSize = 256
|
|
}
|
|
if maxParallelFFmpeg <= 0 {
|
|
maxParallelFFmpeg = 1 // Default: "nacheinander"
|
|
}
|
|
|
|
return &PostWorkQueue{
|
|
q: make(chan PostWorkTask, queueSize),
|
|
ffmpegSem: make(chan struct{}, maxParallelFFmpeg),
|
|
|
|
inflight: make(map[string]struct{}),
|
|
|
|
waitingKeys: make([]string, 0, queueSize),
|
|
runningKeys: make(map[string]struct{}),
|
|
}
|
|
}
|
|
|
|
// Enqueue dedupliziert nach Key (damit du nicht durch Events doppelt queue-st)
|
|
func (pq *PostWorkQueue) Enqueue(task PostWorkTask) bool {
|
|
if task.Key == "" || task.Run == nil {
|
|
return false
|
|
}
|
|
|
|
pq.mu.Lock()
|
|
if _, ok := pq.inflight[task.Key]; ok {
|
|
pq.mu.Unlock()
|
|
return false // schon queued oder läuft
|
|
}
|
|
pq.inflight[task.Key] = struct{}{}
|
|
pq.waitingKeys = append(pq.waitingKeys, task.Key) // ✅ queued für UI
|
|
pq.queued++
|
|
pq.mu.Unlock()
|
|
|
|
select {
|
|
case pq.q <- task:
|
|
return true
|
|
default:
|
|
// Queue voll -> inflight zurückrollen
|
|
pq.mu.Lock()
|
|
delete(pq.inflight, task.Key)
|
|
if pq.queued > 0 {
|
|
pq.queued--
|
|
}
|
|
pq.removeWaitingKeyLocked(task.Key)
|
|
pq.mu.Unlock()
|
|
return false
|
|
}
|
|
}
|
|
|
|
func (pq *PostWorkQueue) removeWaitingKeyLocked(key string) {
|
|
for i, k := range pq.waitingKeys {
|
|
if k == key {
|
|
pq.waitingKeys = append(pq.waitingKeys[:i], pq.waitingKeys[i+1:]...)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (pq *PostWorkQueue) workerLoop(id int) {
|
|
for task := range pq.q {
|
|
// 1) Heavy-Gate: erst wenn ein Slot frei ist, gilt der Task als "running"
|
|
pq.ffmpegSem <- struct{}{} // kann blocken
|
|
|
|
// 2) Ab hier startet er wirklich → waiting -> running
|
|
pq.mu.Lock()
|
|
pq.removeWaitingKeyLocked(task.Key)
|
|
pq.runningKeys[task.Key] = struct{}{}
|
|
pq.mu.Unlock()
|
|
|
|
func() {
|
|
defer func() {
|
|
// Panic-Schutz: Worker darf nicht sterben
|
|
if r := recover(); r != nil {
|
|
// optional: hier loggen
|
|
_ = r
|
|
}
|
|
|
|
// States immer konsistent aufräumen (auch wenn er nie sauber run() beendet)
|
|
pq.mu.Lock()
|
|
pq.removeWaitingKeyLocked(task.Key) // falls er noch als waiting drin hing
|
|
delete(pq.runningKeys, task.Key)
|
|
delete(pq.inflight, task.Key)
|
|
if pq.queued > 0 {
|
|
pq.queued--
|
|
}
|
|
pq.mu.Unlock()
|
|
|
|
// Slot freigeben
|
|
<-pq.ffmpegSem
|
|
}()
|
|
|
|
// 3) Optional: Task timeout (gegen hängende ffmpeg)
|
|
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Minute)
|
|
defer cancel()
|
|
|
|
// 4) Run
|
|
if task.Run != nil {
|
|
_ = task.Run(ctx) // optional: loggen
|
|
}
|
|
}()
|
|
}
|
|
}
|
|
|
|
func (pq *PostWorkQueue) StartWorkers(n int) {
|
|
if n <= 0 {
|
|
n = 1
|
|
}
|
|
for i := 0; i < n; i++ {
|
|
go pq.workerLoop(i + 1)
|
|
}
|
|
}
|
|
|
|
// Stats bleibt kompatibel zu deinem bisherigen Code
|
|
func (pq *PostWorkQueue) Stats() (queued int, inflight int, maxParallel int) {
|
|
pq.mu.Lock()
|
|
defer pq.mu.Unlock()
|
|
return pq.queued, len(pq.inflight), cap(pq.ffmpegSem)
|
|
}
|
|
|
|
// PostWorkKeyStatus describes where a single key currently stands in the
// queue. It is meant for UI and debugging, e.g. for a
// "waiting… position X/Y" display.
type PostWorkKeyStatus struct {
	// State is "queued", "running" or "missing".
	State string `json:"state"`

	// Position is the 1-based place in the waiting list for a queued key and
	// 0 in every other state.
	Position int `json:"position"`

	// Waiting is the number of keys that are waiting.
	Waiting int `json:"waiting"`

	// Running is the number of keys that are running.
	Running int `json:"running"`

	// MaxParallel is cap(ffmpegSem), the concurrency limit.
	MaxParallel int `json:"maxParallel"`
}
|
|
|
|
func (pq *PostWorkQueue) StatusForKey(key string) PostWorkKeyStatus {
|
|
pq.mu.Lock()
|
|
defer pq.mu.Unlock()
|
|
|
|
waiting := len(pq.waitingKeys)
|
|
running := len(pq.runningKeys)
|
|
maxPar := cap(pq.ffmpegSem)
|
|
|
|
if _, ok := pq.runningKeys[key]; ok {
|
|
return PostWorkKeyStatus{
|
|
State: "running",
|
|
Position: 0,
|
|
Waiting: waiting,
|
|
Running: running,
|
|
MaxParallel: maxPar,
|
|
}
|
|
}
|
|
|
|
for i, k := range pq.waitingKeys {
|
|
if k == key {
|
|
return PostWorkKeyStatus{
|
|
State: "queued",
|
|
Position: i + 1, // 1-basiert
|
|
Waiting: waiting,
|
|
Running: running,
|
|
MaxParallel: maxPar,
|
|
}
|
|
}
|
|
}
|
|
|
|
// Key ist weder queued noch running (wahrscheinlich schon fertig oder nie queued)
|
|
return PostWorkKeyStatus{
|
|
State: "missing",
|
|
Position: 0,
|
|
Waiting: waiting,
|
|
Running: running,
|
|
MaxParallel: maxPar,
|
|
}
|
|
}
|
|
|
|
// global (oder in deinem app struct halten)
|
|
var postWorkQ = NewPostWorkQueue(512, 4) // maxParallelFFmpeg = 4
|
|
|
|
// --- Status Refresher (ehemals postwork_refresh.go) ---
|
|
|
|
func startPostWorkStatusRefresher() {
|
|
t := time.NewTicker(1 * time.Second)
|
|
go func() {
|
|
defer t.Stop()
|
|
|
|
for range t.C {
|
|
changed := false
|
|
|
|
jobsMu.Lock()
|
|
for _, job := range jobs {
|
|
key := strings.TrimSpace(job.PostWorkKey)
|
|
if key == "" {
|
|
continue
|
|
}
|
|
|
|
st := postWorkQ.StatusForKey(key)
|
|
|
|
// ✅ Kein Typname nötig: job.PostWork ist *<StatusType>, st ist <StatusType>
|
|
if job.PostWork == nil || !reflect.DeepEqual(*job.PostWork, st) {
|
|
tmp := st
|
|
job.PostWork = &tmp
|
|
changed = true
|
|
}
|
|
}
|
|
jobsMu.Unlock()
|
|
|
|
if changed {
|
|
notifyJobsChanged()
|
|
}
|
|
}
|
|
}()
|
|
}
|