nsfwapp/backend/recorder.go
2026-03-16 12:46:38 +01:00

776 lines
17 KiB
Go

// backend/recorder.go
package main
import (
"context"
"errors"
"fmt"
"math"
"net/http"
"net/url"
"os"
"path/filepath"
"strconv"
"strings"
"time"
"github.com/google/uuid"
)
// ---------------- Progress mapping ----------------
// setJobProgress updates job.Progress (0..100) and job.Phase under jobsMu
// in a monotonic, phase-aware way.
//
// While the job is still recording, pct is taken as the absolute overall
// percentage. Once the job has entered post-processing (it has an EndedAt
// timestamp or a non-"recording" phase), pct is interpreted as a local
// 0..100 value inside the named phase and mapped into that phase's slice
// of the overall progress bar. Progress never moves backwards.
func setJobProgress(job *RecordJob, phase string, pct int) {
	trimmed := strings.TrimSpace(phase)
	lower := strings.ToLower(trimmed)

	// Clamp the incoming percentage to a sane range.
	switch {
	case pct < 0:
		pct = 0
	case pct > 100:
		pct = 100
	}

	// Overall-progress window occupied by each post-work phase.
	type span struct{ lo, hi int }
	phaseSpan := func(name string) span {
		switch name {
		case "postwork":
			return span{0, 8}
		case "remuxing":
			return span{8, 38}
		case "moving":
			return span{38, 54}
		case "probe":
			return span{54, 70}
		case "assets":
			return span{70, 88}
		case "analyze":
			return span{88, 99}
		}
		return span{0, 100}
	}

	jobsMu.Lock()
	defer jobsMu.Unlock()

	current := strings.ToLower(strings.TrimSpace(job.Phase))
	post := job.EndedAt != nil || (current != "" && current != "recording")

	// Once post-processing has started, ignore stale "recording" updates.
	if post && (lower == "" || lower == "recording") {
		return
	}
	if trimmed != "" {
		job.Phase = trimmed
	}

	if !post {
		// Recording phase: pct is the direct overall percentage.
		if pct > job.Progress {
			job.Progress = pct
		}
		return
	}

	// Post-work phases: pct is ALWAYS local 0..100 within the phase.
	s := phaseSpan(lower)
	mapped := s.lo
	if w := float64(s.hi - s.lo); w > 0 {
		mapped = s.lo + int(math.Round((float64(pct)/100.0)*w))
	}
	if mapped < s.lo {
		mapped = s.lo
	}
	if mapped > s.hi {
		mapped = s.hi
	}
	// Monotonic: never let the bar jump backwards.
	if mapped > job.Progress {
		job.Progress = mapped
	}
}
// ---------------- Preview scrubber ----------------

// defaultScrubberCount is the number of evenly spaced preview frames
// offered by the scrubber endpoint.
const defaultScrubberCount = 18

// recordPreviewScrubberFrame handles
//
//	/api/preview-scrubber/{index}?id=... (or ?file=...)
//
// It maps the requested frame index to a timestamp inside the video and
// redirects to /api/preview with that timestamp.
func recordPreviewScrubberFrame(w http.ResponseWriter, r *http.Request) {
	const prefix = "/api/preview-scrubber/"
	if !strings.HasPrefix(r.URL.Path, prefix) {
		http.NotFound(w, r)
		return
	}
	idxPart := strings.Trim(strings.TrimPrefix(r.URL.Path, prefix), "/")
	if idxPart == "" {
		http.Error(w, "missing scrubber frame index", http.StatusBadRequest)
		return
	}
	idx, err := strconv.Atoi(idxPart)
	if err != nil || idx < 0 {
		http.Error(w, "invalid scrubber frame index", http.StatusBadRequest)
		return
	}
	q := r.URL.Query()
	id := strings.TrimSpace(q.Get("id"))
	file := strings.TrimSpace(q.Get("file"))
	if id == "" && file == "" {
		http.Error(w, "missing id or file", http.StatusBadRequest)
		return
	}
	// Best-effort duration lookup; fall back to 60s so the scrubber still
	// produces usable timestamps.
	durSec, err := lookupDurationForScrubber(r)
	if err != nil || durSec <= 0 {
		durSec = 60
	}
	// FIX: sanitize count BEFORE clamping idx. The original clamped idx
	// first, so a count < 1 would have produced a negative index.
	count := defaultScrubberCount
	if count < 1 {
		count = 1
	}
	if idx >= count {
		idx = count - 1
	}
	t := scrubberIndexToTime(idx, count, durSec)
	targetQ := url.Values{}
	if id != "" {
		targetQ.Set("id", id)
	}
	if file != "" {
		targetQ.Set("file", file)
	}
	targetQ.Set("t", fmt.Sprintf("%.3f", t))
	w.Header().Set("Cache-Control", "private, max-age=300")
	http.Redirect(w, r, "/api/preview?"+targetQ.Encode(), http.StatusFound)
}
// scrubberIndexToTime samples the video duration evenly and returns the
// timestamp (in seconds) at the middle of segment `index` out of `count`
// segments. The result is always kept inside [0.1, durationSec-0.1]; an
// unusable count or duration yields the 0.1s floor.
func scrubberIndexToTime(index, count int, durationSec float64) float64 {
	const edge = 0.1
	if count <= 1 || durationSec <= 0 {
		return edge
	}
	upper := math.Max(edge, durationSec-edge)
	// Middle of the index-th segment, scaled onto the usable range.
	t := (float64(index) + 0.5) / float64(count) * upper
	switch {
	case t < edge:
		return edge
	case t > upper:
		return upper
	}
	return t
}
// lookupDurationForScrubber resolves the playable file addressed by the
// request's query (?id= or ?file=) and returns its duration in seconds
// via the cached probe, bounded by a 5-second timeout.
func lookupDurationForScrubber(r *http.Request) (float64, error) {
	videoPath, ok, _, _ := resolvePlayablePathFromQuery(r)
	if !ok || strings.TrimSpace(videoPath) == "" {
		return 0, fmt.Errorf("unable to resolve file")
	}
	// Best-effort: make sure the sidecar meta exists before probing.
	ensureMetaJSONForPlayback(r.Context(), videoPath)

	ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
	defer cancel()
	sec, err := durationSecondsCached(ctx, videoPath)
	if err != nil {
		return 0, err
	}
	return sec, nil
}
// ---------------- Preview sprite file handler ----------------
// recordPreviewSprite serves the generated preview-sprite.jpg for a
// recording id via /api/record/preview-sprite/{id} (alias:
// /api/preview-sprite/{id}). GET/HEAD only; missing or empty sprite
// files answer 404.
func recordPreviewSprite(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodGet && r.Method != http.MethodHead {
		http.Error(w, "Nur GET/HEAD", http.StatusMethodNotAllowed)
		return
	}

	// Accept both route prefixes; TrimPrefix returns the input unchanged
	// when the prefix does not match, which the equality check detects.
	id := strings.TrimPrefix(r.URL.Path, "/api/record/preview-sprite/")
	if id == r.URL.Path {
		id = strings.TrimPrefix(r.URL.Path, "/api/preview-sprite/")
	}
	id = strings.Trim(strings.TrimSpace(id), "/")
	if id == "" {
		http.Error(w, "id fehlt", http.StatusBadRequest)
		return
	}

	cleanID, err := sanitizeID(id)
	if err != nil {
		http.Error(w, "ungültige id", http.StatusBadRequest)
		return
	}
	dir, err := generatedDirForID(cleanID)
	if err != nil {
		http.Error(w, "ungültige id", http.StatusBadRequest)
		return
	}

	spritePath := filepath.Join(dir, "preview-sprite.jpg")
	fi, err := os.Stat(spritePath)
	if err != nil || fi.IsDir() || fi.Size() <= 0 {
		http.NotFound(w, r)
		return
	}
	f, err := os.Open(spritePath)
	if err != nil {
		http.NotFound(w, r)
		return
	}
	defer f.Close()

	w.Header().Set("Content-Type", "image/jpeg")
	w.Header().Set("Cache-Control", "private, max-age=31536000, immutable")
	w.Header().Set("X-Content-Type-Options", "nosniff")
	http.ServeContent(w, r, "preview-sprite.jpg", fi.ModTime(), f)
}
// ---------------- Start + run job ----------------
// startRecordingInternal creates (or reuses) a recording job for the
// given request. If a running, unfinished job for the same source URL
// already exists, that job is returned instead of starting a new one;
// a hidden existing job is un-hidden when the new request is visible.
// The actual recording work runs asynchronously in runJob.
//
// Returns the job (new or existing) or an error when the URL is empty.
func startRecordingInternal(req RecordRequest) (*RecordJob, error) {
	// FIX: renamed local from `url` so it no longer shadows the imported
	// net/url package.
	streamURL := strings.TrimSpace(req.URL)
	if streamURL == "" {
		return nil, errors.New("url fehlt")
	}
	jobsMu.Lock()
	for _, j := range jobs {
		if j != nil && j.Status == JobRunning && j.EndedAt == nil && strings.TrimSpace(j.SourceURL) == streamURL {
			if j.Hidden && !req.Hidden {
				// Promote the existing hidden job to a visible one.
				j.Hidden = false
				jobsMu.Unlock()
				publishJobUpsert(j)
				return j, nil
			}
			jobsMu.Unlock()
			return j, nil
		}
	}
	startedAt := time.Now()
	provider := detectProvider(streamURL)
	username := ""
	switch provider {
	case "chaturbate":
		username = extractUsername(streamURL)
	case "mfc":
		username = extractMFCUsername(streamURL)
	}
	if strings.TrimSpace(username) == "" {
		username = "unknown"
	}
	filename := fmt.Sprintf("%s_%s.ts", username, startedAt.Format("01_02_2006__15-04-05"))
	s := getSettings()
	recordDirAbs, _ := resolvePathRelativeToApp(s.RecordDir)
	recordDir := strings.TrimSpace(recordDirAbs)
	if recordDir == "" {
		// Fall back to the raw setting when path resolution yields nothing.
		recordDir = strings.TrimSpace(s.RecordDir)
	}
	outPath := filepath.Join(recordDir, filename)
	jobID := uuid.NewString()
	ctx, cancel := context.WithCancel(context.Background())
	job := &RecordJob{
		ID:          jobID,
		SourceURL:   streamURL,
		Status:      JobRunning,
		StartedAt:   startedAt,
		StartedAtMs: startedAt.UnixMilli(),
		Output:      outPath,
		Hidden:      req.Hidden,
		cancel:      cancel,
	}
	jobs[jobID] = job
	jobsMu.Unlock()
	if !job.Hidden {
		publishJobUpsert(job)
	}
	go runJob(ctx, job, req)
	return job, nil
}
// runJob drives a recording job to completion: it records the stream for
// the detected provider, then hands the output file to the shared
// post-work queue (remux -> move-to-done -> probe -> assets -> AI
// analyze) and finally publishes the job's terminal state. It is started
// as a goroutine by startRecordingInternal.
func runJob(ctx context.Context, job *RecordJob, req RecordRequest) {
	hc := NewHTTPClient(req.UserAgent)
	provider := detectProvider(req.URL)
	var err error

	// Establish a stable start timestamp used in output file naming.
	now := job.StartedAt
	if now.IsZero() {
		now = time.Now()
	}
	// Backfill StartedAt/StartedAtMs for jobs created without them.
	if job.StartedAtMs == 0 {
		base := job.StartedAt
		if base.IsZero() {
			base = time.Now()
			jobsMu.Lock()
			job.StartedAt = base
			jobsMu.Unlock()
		}
		jobsMu.Lock()
		job.StartedAtMs = base.UnixMilli()
		jobsMu.Unlock()
	}
	setJobProgress(job, "recording", 0)
	publishJobUpsert(job)

	switch provider {
	case "chaturbate":
		if !hasChaturbateCookies(req.Cookie) {
			err = errors.New("cf_clearance und session_id (oder sessionid) Cookies sind für Chaturbate erforderlich")
			break
		}
		s := getSettings()
		recordDirAbs, rerr := resolvePathRelativeToApp(s.RecordDir)
		if rerr != nil || strings.TrimSpace(recordDirAbs) == "" {
			err = fmt.Errorf("recordDir auflösung fehlgeschlagen: %v", rerr)
			break
		}
		// Best-effort: RecordStream will surface an error if the
		// directory is actually unusable.
		_ = os.MkdirAll(recordDirAbs, 0o755)
		username := extractUsername(req.URL)
		filename := fmt.Sprintf("%s_%s.ts", username, now.Format("01_02_2006__15-04-05"))
		jobsMu.Lock()
		existingOut := strings.TrimSpace(job.Output)
		jobsMu.Unlock()
		// Keep a pre-assigned absolute output path; derive one otherwise.
		outPath := existingOut
		if outPath == "" || !filepath.IsAbs(outPath) {
			outPath = filepath.Join(recordDirAbs, filename)
		}
		if strings.TrimSpace(existingOut) != strings.TrimSpace(outPath) {
			jobsMu.Lock()
			job.Output = outPath
			jobsMu.Unlock()
			publishJobUpsert(job)
		}
		err = RecordStream(ctx, hc, "https://chaturbate.com/", username, outPath, req.Cookie, job)
	case "mfc":
		s := getSettings()
		recordDirAbs, rerr := resolvePathRelativeToApp(s.RecordDir)
		if rerr != nil || strings.TrimSpace(recordDirAbs) == "" {
			err = fmt.Errorf("recordDir auflösung fehlgeschlagen: %v", rerr)
			break
		}
		_ = os.MkdirAll(recordDirAbs, 0o755)
		username := extractMFCUsername(req.URL)
		filename := fmt.Sprintf("%s_%s.ts", username, now.Format("01_02_2006__15-04-05"))
		outPath := filepath.Join(recordDirAbs, filename)
		jobsMu.Lock()
		job.Output = outPath
		jobsMu.Unlock()
		publishJobUpsert(job)
		err = RecordStreamMFC(ctx, hc, username, outPath, job)
	default:
		err = errors.New("unsupported provider")
	}

	if err != nil && shouldLogRecordError(err, provider, req) {
		fmt.Println("❌ [record]", provider, job.SourceURL, "->", err)
	}

	// Decide the terminal status: cancellation counts as a clean stop.
	end := time.Now()
	target := JobFinished
	var errText string
	if err != nil {
		if errors.Is(err, context.Canceled) {
			target = JobStopped
		} else {
			target = JobFailed
			errText = err.Error()
		}
	}
	stopPreview(job)

	jobsMu.Lock()
	job.EndedAt = &end
	job.EndedAtMs = end.UnixMilli()
	if errText != "" {
		job.Error = errText
	}
	job.Phase = "postwork"
	out := strings.TrimSpace(job.Output)
	jobsMu.Unlock()
	publishJobUpsert(job)

	// No output path at all: finalize immediately, nothing to post-process.
	if out == "" {
		jobsMu.Lock()
		job.Status = target
		job.Phase = ""
		job.Progress = 100
		job.PostWorkKey = ""
		job.PostWork = nil
		jobsMu.Unlock()
		publishJobRemove(job)
		notifyDoneChanged()
		return
	}

	// Hard gate: never send empty / invalid output files into post-work.
	{
		fi, serr := os.Stat(out)
		if serr != nil || fi == nil || fi.IsDir() || fi.Size() <= 0 {
			_ = removeWithRetry(out)
			purgeDurationCacheForPath(out)
			jobsMu.Lock()
			delete(jobs, job.ID)
			jobsMu.Unlock()
			publishJobRemove(job)
			notifyDoneChanged()
			if shouldLogRecordInfo(req) {
				if serr != nil {
					fmt.Println("🧹 removed invalid output before postwork:", filepath.Base(out), "(stat error:", serr, ")")
				} else if fi == nil || fi.IsDir() {
					fmt.Println("🧹 removed invalid output before postwork:", filepath.Base(out), "(not a regular file)")
				} else {
					fmt.Println("🧹 removed empty output before postwork:", filepath.Base(out), "(0 bytes)")
				}
			}
			return
		}
	}

	// Pre-queue gate: only enter post-processing if the file is kept.
	{
		fi, serr := os.Stat(out)
		if serr == nil && fi != nil && !fi.IsDir() {
			jobsMu.Lock()
			job.SizeBytes = fi.Size()
			jobsMu.Unlock()
			publishJobUpsert(job)
			s := getSettings()
			minMB := s.AutoDeleteSmallDownloadsBelowMB
			// If auto-delete is enabled and the file is below the
			// threshold: do NOT enqueue for post-work — delete directly
			// and return.
			if s.AutoDeleteSmallDownloads && minMB > 0 {
				threshold := int64(minMB) * 1024 * 1024
				if fi.Size() > 0 && fi.Size() < threshold {
					base := filepath.Base(out)
					id := stripHotPrefix(strings.TrimSuffix(base, filepath.Ext(base)))
					derr := removeWithRetry(out)
					if derr == nil || os.IsNotExist(derr) {
						removeGeneratedForID(id)
						purgeDurationCacheForPath(out)
						jobsMu.Lock()
						delete(jobs, job.ID)
						jobsMu.Unlock()
						publishJobRemove(job)
						notifyDoneChanged()
						if shouldLogRecordInfo(req) {
							fmt.Println("🧹 auto-deleted before enqueue:", base, "(size: "+formatBytesSI(fi.Size())+", threshold: "+formatBytesSI(threshold)+")")
						}
						return
					}
					fmt.Println("⚠️ auto-delete before enqueue failed:", derr)
				}
			}
		}
	}

	// Post-work queue: everything inside Run below executes later on the
	// queue worker, with postOut/postTarget captured by value here.
	postOut := out
	postTarget := target
	postKey := "postwork:" + job.ID
	jobsMu.Lock()
	job.Phase = "postwork"
	job.PostWorkKey = postKey
	{
		s := postWorkQ.StatusForKey(postKey)
		job.PostWork = &s
	}
	jobsMu.Unlock()
	publishJobUpsert(job)
	okQueued := postWorkQ.Enqueue(PostWorkTask{
		Key:   postKey,
		Added: time.Now(),
		Run: func(ctx context.Context) error {
			// Mirror the queue status into the job as we start running.
			{
				st := postWorkQ.StatusForKey(postKey)
				jobsMu.Lock()
				job.PostWork = &st
				jobsMu.Unlock()
				setJobProgress(job, "postwork", 0)
				publishJobUpsert(job)
			}
			out := strings.TrimSpace(postOut)
			if out == "" {
				jobsMu.Lock()
				job.Phase = ""
				job.Progress = 100
				job.Status = postTarget
				job.PostWorkKey = ""
				job.PostWork = nil
				jobsMu.Unlock()
				publishJobUpsert(job)
				notifyDoneChanged()
				return nil
			}
			// setPhase reports phase-local progress and refreshes the
			// queue status snapshot on the job.
			setPhase := func(phase string, pct int) {
				setJobProgress(job, phase, pct)
				st := postWorkQ.StatusForKey(postKey)
				jobsMu.Lock()
				job.PostWork = &st
				jobsMu.Unlock()
				publishJobUpsert(job)
			}
			// 1) Remux (.ts only, best effort — failure keeps the .ts).
			if strings.EqualFold(filepath.Ext(out), ".ts") {
				setPhase("remuxing", 10)
				if newOut, err2 := maybeRemuxTSForJob(job, out); err2 == nil && strings.TrimSpace(newOut) != "" {
					out = strings.TrimSpace(newOut)
					jobsMu.Lock()
					job.Output = out
					jobsMu.Unlock()
					publishJobUpsert(job)
				}
			}
			// 2) Move to done
			setPhase("moving", 10)
			// Hard re-check after the remux as well: never move 0-byte
			// files into done.
			{
				fi, serr := os.Stat(out)
				if serr != nil || fi == nil || fi.IsDir() || fi.Size() <= 0 {
					_ = removeWithRetry(out)
					purgeDurationCacheForPath(out)
					jobsMu.Lock()
					delete(jobs, job.ID)
					jobsMu.Unlock()
					publishJobRemove(job)
					notifyDoneChanged()
					if shouldLogRecordInfo(req) {
						if serr != nil {
							fmt.Println("🧹 removed invalid post-remux output:", filepath.Base(out), "(stat error:", serr, ")")
						} else if fi == nil || fi.IsDir() {
							fmt.Println("🧹 removed invalid post-remux output:", filepath.Base(out), "(not a regular file)")
						} else {
							fmt.Println("🧹 removed empty post-remux output:", filepath.Base(out), "(0 bytes)")
						}
					}
					return nil
				}
			}
			if moved, err2 := moveToDoneDir(out); err2 == nil && strings.TrimSpace(moved) != "" {
				out = strings.TrimSpace(moved)
				jobsMu.Lock()
				job.Output = out
				jobsMu.Unlock()
				publishJobUpsert(job)
				notifyDoneChanged()
			}
			// 3) Duration (cached probe, 6s budget; failure is non-fatal)
			setPhase("probe", 35)
			{
				dctx, cancel := context.WithTimeout(ctx, 6*time.Second)
				if sec, derr := durationSecondsCached(dctx, out); derr == nil && sec > 0 {
					jobsMu.Lock()
					job.DurationSeconds = sec
					jobsMu.Unlock()
					publishJobUpsert(job)
				}
				cancel()
			}
			// 4) Video props (resolution/FPS; failure is non-fatal)
			setPhase("probe", 75)
			{
				pctx, cancel := context.WithTimeout(ctx, 6*time.Second)
				w, h, fps, perr := probeVideoProps(pctx, out)
				cancel()
				if perr == nil {
					jobsMu.Lock()
					job.VideoWidth = w
					job.VideoHeight = h
					job.FPS = fps
					jobsMu.Unlock()
					publishJobUpsert(job)
				}
			}
			// 5) Assets with progress
			setPhase("assets", 0)
			lastPct := -1
			lastTick := time.Time{}
			// update throttles progress publishing: skip repeats and
			// anything within 150ms of the last published value.
			update := func(r float64) {
				if r < 0 {
					r = 0
				}
				if r > 1 {
					r = 1
				}
				pct := int(math.Round(r * 100))
				if pct < 0 {
					pct = 0
				}
				if pct > 100 {
					pct = 100
				}
				if pct == lastPct {
					return
				}
				if !lastTick.IsZero() && time.Since(lastTick) < 150*time.Millisecond {
					return
				}
				lastPct = pct
				lastTick = time.Now()
				setPhase("assets", pct)
			}
			if _, err := ensureAssetsForVideoWithProgressCtx(ctx, out, job.SourceURL, update); err != nil {
				fmt.Println("⚠️ ensureAssetsForVideo:", err)
			}
			setPhase("assets", 100)
			// 6) AI Analyze -> meta.json.ai (requires the preview sprite
			// from step 5; every failure here is logged, never fatal)
			setPhase("analyze", 5)
			{
				actx, cancel := context.WithTimeout(ctx, 45*time.Second)
				defer cancel()
				id := assetIDFromVideoPath(out)
				if strings.TrimSpace(id) == "" {
					fmt.Println("⚠️ postwork analyze: keine asset id ableitbar")
				} else {
					ps := previewSpriteTruthForID(id)
					if !ps.Exists {
						fmt.Println("⚠️ postwork analyze: preview-sprite.jpg nicht gefunden")
					} else {
						durationSec, _ := durationSecondsForAnalyze(actx, out)
						hits, aerr := analyzeVideoFromSprite(actx, out, "nsfw")
						if aerr != nil {
							fmt.Println("⚠️ postwork analyze:", aerr)
						} else {
							setPhase("analyze", 65)
							segments := buildSegmentsFromAnalyzeHits(hits, durationSec)
							ai := &aiAnalysisMeta{
								Goal:           "nsfw",
								Mode:           "sprite",
								Hits:           hits,
								Segments:       segments,
								AnalyzedAtUnix: time.Now().Unix(),
							}
							if werr := writeVideoAIForFile(actx, out, job.SourceURL, ai); werr != nil {
								fmt.Println("⚠️ writeVideoAIForFile:", werr)
							}
						}
					}
				}
			}
			setPhase("analyze", 100)
			// Finalize
			jobsMu.Lock()
			job.Status = postTarget
			job.Phase = ""
			job.Progress = 100
			job.PostWorkKey = ""
			job.PostWork = nil
			jobsMu.Unlock()
			publishJobRemove(job)
			notifyDoneChanged()
			return nil
		},
	})
	if okQueued {
		st := postWorkQ.StatusForKey(postKey)
		jobsMu.Lock()
		job.PostWork = &st
		jobsMu.Unlock()
		publishJobUpsert(job)
	} else {
		// Enqueue refused the task (semantics of Enqueue's false return
		// live in the queue implementation — not visible here): finalize
		// the job immediately without post-work.
		jobsMu.Lock()
		job.Status = postTarget
		job.Phase = ""
		job.Progress = 100
		job.PostWorkKey = ""
		job.PostWork = nil
		jobsMu.Unlock()
		publishJobRemove(job)
		notifyDoneChanged()
	}
}