// nsfwapp/backend/postwork.go
// Last modified: 2026-03-14 14:28:33 +01:00
// backend/postwork.go
package main
import (
"context"
"strings"
"sync"
"time"
)
// PostWorkTask is one unit of post-processing work (may invoke ffmpeg,
// ffprobe, thumbnail generation, renames, etc.).
type PostWorkTask struct {
	Key   string                          // e.g. filename or job ID; used for deduplication
	Run   func(ctx context.Context) error // the actual work; receives a cancellable context
	Added time.Time                       // when the task was created/enqueued
}
// PostWorkQueue runs post-processing tasks through worker goroutines with
// key-based deduplication, and tracks waiting/running state for the UI.
type PostWorkQueue struct {
	q         chan PostWorkTask
	ffmpegSem chan struct{} // "heavy" gate: at most maxParallelFFmpeg concurrently
	mu        sync.Mutex    // guards all fields below
	inflight  map[string]struct{} // dedupe set: key is queued OR running
	queued    int                 // number of inflight tasks (queued + running)
	// ✅ UI support: track the waiting queue and the running keys.
	waitingKeys []string            // FIFO of keys that have NOT started yet
	runningKeys map[string]struct{} // keys whose task is currently executing
}
// NewPostWorkQueue builds a queue with the given channel capacity and
// ffmpeg concurrency limit. Non-positive arguments fall back to defaults
// (256 queue slots, 1 parallel worker slot = strictly sequential).
func NewPostWorkQueue(queueSize int, maxParallelFFmpeg int) *PostWorkQueue {
	size := queueSize
	if size <= 0 {
		size = 256
	}
	parallel := maxParallelFFmpeg
	if parallel <= 0 {
		parallel = 1 // default: one at a time
	}
	pq := &PostWorkQueue{
		q:           make(chan PostWorkTask, size),
		ffmpegSem:   make(chan struct{}, parallel),
		inflight:    make(map[string]struct{}),
		waitingKeys: make([]string, 0, size),
		runningKeys: make(map[string]struct{}),
	}
	return pq
}
// Enqueue adds a task, deduplicated by Key (so duplicate events do not
// queue the same work twice). It returns false when the key is empty,
// Run is nil, the key is already queued/running, or the channel is full.
//
// The non-blocking channel send happens while the mutex is held, so the
// whole operation is atomic: bookkeeping (inflight, waitingKeys, queued)
// is only committed when the task actually entered the channel. The old
// commit-then-rollback sequence released the lock in between, letting a
// concurrent RemoveQueued observe — and double-undo — a task that was
// about to be rolled back.
func (pq *PostWorkQueue) Enqueue(task PostWorkTask) bool {
	if task.Key == "" || task.Run == nil {
		return false
	}

	pq.mu.Lock()
	defer pq.mu.Unlock()

	if _, ok := pq.inflight[task.Key]; ok {
		return false // already queued or running
	}

	// Non-blocking send; never blocks, so holding the lock here is safe.
	select {
	case pq.q <- task:
		pq.inflight[task.Key] = struct{}{}
		pq.waitingKeys = append(pq.waitingKeys, task.Key) // ✅ visible to UI as queued
		pq.queued++
		return true
	default:
		return false // queue full; nothing was committed, nothing to roll back
	}
}
// removeWaitingKeyLocked deletes the first occurrence of key from the
// waiting FIFO. Caller must hold pq.mu. No-op if the key is not present.
func (pq *PostWorkQueue) removeWaitingKeyLocked(key string) {
	idx := -1
	for i, waiting := range pq.waitingKeys {
		if waiting == key {
			idx = i
			break
		}
	}
	if idx < 0 {
		return
	}
	pq.waitingKeys = append(pq.waitingKeys[:idx], pq.waitingKeys[idx+1:]...)
}
// RemoveQueued cancels a task that is still waiting (not yet started).
// It returns true when the key was found in the waiting FIFO and removed;
// running tasks are never touched here.
func (pq *PostWorkQueue) RemoveQueued(key string) bool {
	if strings.TrimSpace(key) == "" {
		return false
	}

	pq.mu.Lock()
	defer pq.mu.Unlock()

	// A task that already started must NOT be removed here.
	if _, running := pq.runningKeys[key]; running {
		return false
	}

	idx := -1
	for i, waiting := range pq.waitingKeys {
		if waiting == key {
			idx = i
			break
		}
	}
	if idx < 0 {
		return false
	}

	pq.waitingKeys = append(pq.waitingKeys[:idx], pq.waitingKeys[idx+1:]...)
	delete(pq.inflight, key)
	if pq.queued > 0 {
		pq.queued--
	}
	// The PostWorkTask element may still sit in the channel buffer;
	// the worker must detect and skip such orphaned entries on receipt.
	return true
}
// workerLoop is the body of one worker goroutine: it receives tasks from
// the channel, discards entries that were cancelled via RemoveQueued while
// still buffered, and runs the rest under the ffmpeg semaphore.
//
// The discard check and the waiting -> running transition happen inside a
// single critical section; doing them in two separate lock acquisitions
// (as before) raced with RemoveQueued between check and transition.
func (pq *PostWorkQueue) workerLoop(id int) {
	for task := range pq.q {
		pq.mu.Lock()
		if _, ok := pq.inflight[task.Key]; !ok {
			// Queued job was removed in the meantime -> drop the stale
			// channel entry.
			pq.mu.Unlock()
			continue
		}
		if _, running := pq.runningKeys[task.Key]; running {
			// Defensive: with key dedupe this should be impossible.
			pq.mu.Unlock()
			continue
		}
		// Task really starts now: waiting -> running.
		pq.removeWaitingKeyLocked(task.Key)
		pq.runningKeys[task.Key] = struct{}{}
		pq.mu.Unlock()

		pq.runTask(task)
	}
}

// runTask executes one task with the heavy-work semaphore held, with panic
// protection, a timeout, and guaranteed state cleanup.
func (pq *PostWorkQueue) runTask(task PostWorkTask) {
	// Heavy gate: at most cap(ffmpegSem) tasks do real work concurrently.
	// (Bug fix: ffmpegSem was allocated by the constructor but never
	// acquired, so maxParallelFFmpeg previously had no effect at all.)
	pq.ffmpegSem <- struct{}{}
	defer func() { <-pq.ffmpegSem }()

	defer func() {
		// Panic guard: a worker goroutine must never die.
		if r := recover(); r != nil {
			// optional: log the panic here
			_ = r
		}
		// Always restore consistent state, even if Run never returned cleanly.
		pq.mu.Lock()
		pq.removeWaitingKeyLocked(task.Key) // defensive: normally already removed
		delete(pq.runningKeys, task.Key)
		delete(pq.inflight, task.Key)
		if pq.queued > 0 {
			pq.queued--
		}
		pq.mu.Unlock()
	}()

	// Task timeout guards against hanging ffmpeg processes.
	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Minute)
	defer cancel()

	if task.Run != nil {
		_ = task.Run(ctx) // optional: log the returned error
	}
}
// StartWorkers launches n worker goroutines (at least one) that drain the
// queue for the lifetime of the process.
func (pq *PostWorkQueue) StartWorkers(n int) {
	if n <= 0 {
		n = 1
	}
	for id := 1; id <= n; id++ {
		go pq.workerLoop(id)
	}
}
// Stats returns the current counters; the signature stays compatible with
// existing callers.
func (pq *PostWorkQueue) Stats() (queued int, inflight int, maxParallel int) {
	pq.mu.Lock()
	defer pq.mu.Unlock()
	queued = pq.queued
	inflight = len(pq.inflight)
	maxParallel = cap(pq.ffmpegSem)
	return
}
// PostWorkKeyStatus is an optional UI/debug snapshot for a single key
// (handy for "waiting… position X/Y" displays).
type PostWorkKeyStatus struct {
	State       string `json:"state"`       // "queued" | "running" | "missing"
	Position    int    `json:"position"`    // 1..n (queued only), otherwise 0
	Waiting     int    `json:"waiting"`     // number of waiting tasks
	Running     int    `json:"running"`     // number of running tasks
	MaxParallel int    `json:"maxParallel"` // cap(ffmpegSem)
}
// StatusForKey reports whether key is currently running, queued (with its
// 1-based position), or missing (already done or never enqueued), together
// with overall queue counters.
func (pq *PostWorkQueue) StatusForKey(key string) PostWorkKeyStatus {
	pq.mu.Lock()
	defer pq.mu.Unlock()

	// Start from the "missing" shape and specialize below.
	st := PostWorkKeyStatus{
		State:       "missing",
		Position:    0,
		Waiting:     len(pq.waitingKeys),
		Running:     len(pq.runningKeys),
		MaxParallel: cap(pq.ffmpegSem),
	}

	if _, ok := pq.runningKeys[key]; ok {
		st.State = "running"
		return st
	}

	for pos, waiting := range pq.waitingKeys {
		if waiting == key {
			st.State = "queued"
			st.Position = pos + 1 // 1-based
			return st
		}
	}

	// Neither queued nor running.
	return st
}
// Global post-work queue (could alternatively live on the app struct).
// 512 buffered tasks, maxParallelFFmpeg = 6.
var postWorkQ = NewPostWorkQueue(512, 6)
// --- Status refresher (formerly postwork_refresh.go) ---

// startPostWorkStatusRefresher launches a background ticker (1s) that
// mirrors the post-work queue state (StatusForKey) into each job, so the
// UI can show "waiting… position X/Y" and a unified status/phase.
//
// NOTE(review): the goroutine has no stop/cancel signal and runs for the
// whole process lifetime — confirm that is intended.
func startPostWorkStatusRefresher() {
	t := time.NewTicker(1 * time.Second)
	go func() {
		defer t.Stop()
		for range t.C {
			changedJobs := make([]*RecordJob, 0, 16)
			jobsMu.Lock()
			for _, job := range jobs {
				key := strings.TrimSpace(job.PostWorkKey)
				if key == "" {
					continue
				}
				st := postWorkQ.StatusForKey(key)
				changed := false
				// Refresh the job's PostWork snapshot if any field differs.
				if job.PostWork == nil ||
					job.PostWork.State != st.State ||
					job.PostWork.Position != st.Position ||
					job.PostWork.Waiting != st.Waiting ||
					job.PostWork.Running != st.Running ||
					job.PostWork.MaxParallel != st.MaxParallel {
					tmp := st // copy so each job gets its own pointer target
					job.PostWork = &tmp
					changed = true
				}
				// Normalize status / phase for the UI.
				switch st.State {
				case "queued":
					if job.Status != JobPostwork {
						job.Status = JobPostwork
						changed = true
					}
					phaseLower := strings.TrimSpace(strings.ToLower(job.Phase))
					if phaseLower == "" || phaseLower == "recording" {
						job.Phase = "postwork"
						changed = true
					}
					// Clamp out-of-range progress to 0 while queued.
					if job.Progress < 0 || job.Progress > 100 {
						job.Progress = 0
						changed = true
					}
				case "running":
					if job.Status != JobPostwork {
						job.Status = JobPostwork
						changed = true
					}
					phaseLower := strings.TrimSpace(strings.ToLower(job.Phase))
					// Do NOT overwrite concrete sub-phases.
					switch phaseLower {
					case "probe", "remuxing", "moving", "assets":
						// keep as-is
					default:
						if phaseLower == "" || phaseLower == "recording" || phaseLower == "postwork" {
							if phaseLower != "postwork" {
								job.Phase = "postwork"
								changed = true
							}
						}
					}
				}
				if changed {
					changedJobs = append(changedJobs, job)
				}
			}
			jobsMu.Unlock()
			// Publish outside jobsMu so listeners never run under the lock.
			for _, job := range changedJobs {
				publishJobUpsert(job)
			}
		}
	}()
}