331 lines
7.5 KiB
Go
331 lines
7.5 KiB
Go
// backend\tasks_assets.go
|
|
package main
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"net/http"
|
|
"os"
|
|
"path/filepath"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// ---------------------------
|
|
// Tasks: Missing Assets erzeugen
|
|
// ---------------------------
|
|
|
|
type AssetsTaskState struct {
|
|
Running bool `json:"running"`
|
|
Total int `json:"total"`
|
|
Done int `json:"done"`
|
|
GeneratedThumbs int `json:"generatedThumbs"`
|
|
GeneratedPreviews int `json:"generatedPreviews"`
|
|
Skipped int `json:"skipped"`
|
|
StartedAt time.Time `json:"startedAt"`
|
|
FinishedAt *time.Time `json:"finishedAt,omitempty"`
|
|
Error string `json:"error,omitempty"`
|
|
CurrentFile string `json:"currentFile,omitempty"`
|
|
}
|
|
|
|
var assetsTaskMu sync.Mutex
|
|
var assetsTaskState AssetsTaskState
|
|
var assetsTaskCancel context.CancelFunc
|
|
|
|
// updateAssetsState mutiert den State atomar und triggert danach SSE notify.
|
|
// notifyAssetsChanged() muss außerhalb des Locks passieren.
|
|
func updateAssetsState(fn func(st *AssetsTaskState)) AssetsTaskState {
|
|
assetsTaskMu.Lock()
|
|
fn(&assetsTaskState)
|
|
st := assetsTaskState
|
|
assetsTaskMu.Unlock()
|
|
|
|
notifyAssetsChanged()
|
|
return st
|
|
}
|
|
|
|
func snapshotAssetsState() AssetsTaskState {
|
|
assetsTaskMu.Lock()
|
|
st := assetsTaskState
|
|
assetsTaskMu.Unlock()
|
|
return st
|
|
}
|
|
|
|
func getGenerateAssetsTaskStatus() AssetsTaskState {
|
|
return snapshotAssetsState()
|
|
}
|
|
|
|
func tasksGenerateAssets(w http.ResponseWriter, r *http.Request) {
|
|
switch r.Method {
|
|
case http.MethodGet:
|
|
// GET bleibt als Fallback/Debug möglich (UI nutzt SSE)
|
|
st := snapshotAssetsState()
|
|
writeJSON(w, http.StatusOK, st)
|
|
return
|
|
|
|
case http.MethodPost:
|
|
assetsTaskMu.Lock()
|
|
if assetsTaskState.Running {
|
|
st := assetsTaskState
|
|
assetsTaskMu.Unlock()
|
|
writeJSON(w, http.StatusOK, st)
|
|
return
|
|
}
|
|
|
|
// cancelbarer Context (pro Run)
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
assetsTaskCancel = cancel
|
|
|
|
now := time.Now()
|
|
assetsTaskState = AssetsTaskState{
|
|
Running: true,
|
|
Total: 0,
|
|
Done: 0,
|
|
GeneratedThumbs: 0,
|
|
GeneratedPreviews: 0,
|
|
Skipped: 0,
|
|
StartedAt: now,
|
|
FinishedAt: nil,
|
|
Error: "",
|
|
CurrentFile: "",
|
|
}
|
|
st := assetsTaskState
|
|
assetsTaskMu.Unlock()
|
|
|
|
// ✅ SSE: Start pushen
|
|
notifyAssetsChanged()
|
|
|
|
go runGenerateMissingAssets(ctx)
|
|
|
|
writeJSON(w, http.StatusOK, st)
|
|
return
|
|
|
|
case http.MethodDelete:
|
|
assetsTaskMu.Lock()
|
|
cancel := assetsTaskCancel
|
|
running := assetsTaskState.Running
|
|
assetsTaskMu.Unlock()
|
|
|
|
if !running || cancel == nil {
|
|
w.WriteHeader(http.StatusNoContent)
|
|
return
|
|
}
|
|
|
|
// canceln: Worker merkt das beim nächsten ctx.Err() und beendet sauber
|
|
cancel()
|
|
|
|
// UI sofort informieren (ohne Running künstlich auf false zu setzen —
|
|
// das macht der Worker zuverlässig im finishWithErr(context.Canceled))
|
|
st := updateAssetsState(func(st *AssetsTaskState) {
|
|
if st.Running {
|
|
st.Error = "abgebrochen"
|
|
}
|
|
})
|
|
|
|
writeJSON(w, http.StatusOK, st)
|
|
return
|
|
|
|
default:
|
|
http.Error(w, "Nur GET/POST/DELETE", http.StatusMethodNotAllowed)
|
|
return
|
|
}
|
|
}
|
|
|
|
func runGenerateMissingAssets(ctx context.Context) {
|
|
// Worker-Ende: CancelFunc zurücksetzen (pro Run)
|
|
defer func() {
|
|
assetsTaskMu.Lock()
|
|
assetsTaskCancel = nil
|
|
assetsTaskMu.Unlock()
|
|
}()
|
|
|
|
finishWithErr := func(err error) {
|
|
now := time.Now()
|
|
|
|
updateAssetsState(func(st *AssetsTaskState) {
|
|
st.Running = false
|
|
st.FinishedAt = &now
|
|
st.CurrentFile = ""
|
|
|
|
if err == nil {
|
|
// Erfolg: Error leeren
|
|
st.Error = ""
|
|
return
|
|
}
|
|
|
|
// stabiler Text für UI
|
|
if errors.Is(err, context.Canceled) {
|
|
st.Error = "abgebrochen"
|
|
} else {
|
|
st.Error = err.Error()
|
|
}
|
|
})
|
|
}
|
|
|
|
s := getSettings()
|
|
doneAbs, err := resolvePathRelativeToApp(s.DoneDir)
|
|
if err != nil || strings.TrimSpace(doneAbs) == "" {
|
|
finishWithErr(fmt.Errorf("doneDir auflösung fehlgeschlagen: %v", err))
|
|
return
|
|
}
|
|
|
|
type item struct {
|
|
name string
|
|
path string
|
|
}
|
|
|
|
// .trash niemals verarbeiten
|
|
isTrashPath := func(full string) bool {
|
|
p := strings.ToLower(strings.ReplaceAll(full, "\\", "/"))
|
|
return strings.Contains(p, "/.trash/") || strings.HasSuffix(p, "/.trash")
|
|
}
|
|
|
|
seen := map[string]struct{}{}
|
|
items := make([]item, 0, 512)
|
|
|
|
addIfVideo := func(full string) {
|
|
if isTrashPath(full) {
|
|
return
|
|
}
|
|
|
|
name := filepath.Base(full)
|
|
low := strings.ToLower(name)
|
|
if strings.Contains(low, ".part") || strings.Contains(low, ".tmp") {
|
|
return
|
|
}
|
|
ext := strings.ToLower(filepath.Ext(name))
|
|
if ext != ".mp4" && ext != ".ts" {
|
|
return
|
|
}
|
|
|
|
if _, ok := seen[full]; ok {
|
|
return
|
|
}
|
|
seen[full] = struct{}{}
|
|
items = append(items, item{name: name, path: full})
|
|
}
|
|
|
|
scanOneLevel := func(dir string) {
|
|
ents, err := os.ReadDir(dir)
|
|
if err != nil {
|
|
return
|
|
}
|
|
for _, e := range ents {
|
|
if e.IsDir() && strings.EqualFold(e.Name(), ".trash") {
|
|
continue
|
|
}
|
|
|
|
full := filepath.Join(dir, e.Name())
|
|
if e.IsDir() {
|
|
sub, err := os.ReadDir(full)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
for _, se := range sub {
|
|
if se.IsDir() {
|
|
continue
|
|
}
|
|
addIfVideo(filepath.Join(full, se.Name()))
|
|
}
|
|
continue
|
|
}
|
|
addIfVideo(full)
|
|
}
|
|
}
|
|
|
|
// done + done/<model>/ + done/keep + done/keep/<model>/
|
|
scanOneLevel(doneAbs)
|
|
scanOneLevel(filepath.Join(doneAbs, "keep"))
|
|
|
|
// ✅ Initialisierung: Total etc. + SSE Push
|
|
updateAssetsState(func(st *AssetsTaskState) {
|
|
st.Total = len(items)
|
|
st.Done = 0
|
|
st.GeneratedThumbs = 0
|
|
st.GeneratedPreviews = 0
|
|
st.Skipped = 0
|
|
// Start hat Error schon geleert — hier nur sicherheitshalber:
|
|
st.Error = ""
|
|
})
|
|
|
|
for i, it := range items {
|
|
if err := ctx.Err(); err != nil {
|
|
finishWithErr(err)
|
|
return
|
|
}
|
|
|
|
// ✅ aktuellen Dateinamen für UI setzen
|
|
updateAssetsState(func(st *AssetsTaskState) {
|
|
st.CurrentFile = it.name
|
|
})
|
|
|
|
// ID aus Dateiname
|
|
base := strings.TrimSuffix(it.name, filepath.Ext(it.name))
|
|
id := stripHotPrefix(base)
|
|
if strings.TrimSpace(id) == "" {
|
|
updateAssetsState(func(st *AssetsTaskState) {
|
|
st.Done = i + 1
|
|
})
|
|
continue
|
|
}
|
|
|
|
// Datei-Info (validieren)
|
|
vfi, verr := os.Stat(it.path)
|
|
if verr != nil || vfi.IsDir() || vfi.Size() <= 0 {
|
|
updateAssetsState(func(st *AssetsTaskState) {
|
|
st.Done = i + 1
|
|
})
|
|
continue
|
|
}
|
|
|
|
// Pfade einmalig über zentralen Helper
|
|
_, _, _, _, metaPath, perr := assetPathsForID(id)
|
|
if perr != nil {
|
|
updateAssetsState(func(st *AssetsTaskState) {
|
|
// UI bekommt stabilen Hinweis, aber Task läuft weiter
|
|
st.Error = "mindestens ein Eintrag konnte nicht verarbeitet werden (siehe Logs)"
|
|
st.Done = i + 1
|
|
})
|
|
fmt.Println("⚠️ assetPathsForID:", perr)
|
|
continue
|
|
}
|
|
|
|
// SourceURL best-effort: aus bestehender meta.json
|
|
sourceURL := ""
|
|
if u, ok := readVideoMetaSourceURL(metaPath, vfi); ok {
|
|
sourceURL = u
|
|
}
|
|
|
|
// Generate/Ensure (einheitliche Core-Funktion)
|
|
res, e := ensureAssetsForVideoWithProgressCtx(ctx, it.path, sourceURL, nil)
|
|
if e != nil {
|
|
finishWithErr(e)
|
|
return
|
|
}
|
|
|
|
if _, aerr := prepareVideoForSplit(ctx, it.path, sourceURL, "nsfw"); aerr != nil {
|
|
updateAssetsState(func(st *AssetsTaskState) {
|
|
st.Error = "mindestens ein Eintrag konnte nicht vollständig analysiert werden (siehe Logs)"
|
|
})
|
|
//fmt.Println("⚠️ tasks generate assets analyze:", aerr)
|
|
}
|
|
|
|
updateAssetsState(func(st *AssetsTaskState) {
|
|
if res.Skipped {
|
|
st.Skipped++
|
|
}
|
|
if res.ThumbGenerated {
|
|
st.GeneratedThumbs++
|
|
}
|
|
if res.PreviewGenerated {
|
|
st.GeneratedPreviews++
|
|
}
|
|
st.Done = i + 1
|
|
})
|
|
}
|
|
|
|
finishWithErr(nil)
|
|
}
|