nsfwapp/backend/live.go
2026-03-16 15:11:45 +01:00

786 lines
20 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// backend/live.go
package main
import (
"bufio"
"bytes"
"context"
"fmt"
"io"
"net/http"
"net/url"
"os"
"os/exec"
"path"
"path/filepath"
"regexp"
"strings"
"time"
)
// ============================================================
// HLS Live Preview serving (+ m3u8 rewrite)
// ============================================================
//
// This file contains everything related to the HLS live preview stream:
// - serving index*.m3u8 + segment files from a job's PreviewDir
// - rewriting m3u8 segment URLs to a configurable base path
// - starting/stopping the ffmpeg HLS preview process (per job)
// - hover/play activation checks + preview "touch" + ensure-start logic
//
// It intentionally reuses existing globals/types from your backend (package main):
// - jobs, jobsMu, RecordJob, JobRunning
// - ffmpegPath, previewSem
// - notifyJobsChanged()
// - assetIDForJob(job *RecordJob) string
// - startLiveThumbJPGLoop(ctx, job)
// ============================================================
// Allowed files that may be served out of PreviewDir.
var previewFileRe = regexp.MustCompile(`^(index(_hq)?\.m3u8|seg_(low|hq)_\d+\.ts|seg_\d+\.ts|init\.m4s|\w+\.m4s)$`)
func serveLiveNotReady(w http.ResponseWriter, r *http.Request) {
// ✅ Für HLS-Clients (hls.js) ist 204 beim Manifest "ein Fehler" -> aggressive Retries.
// Deshalb: IMMER 200 + gültige (aber leere) m3u8 zurückgeben.
w.Header().Set("Content-Type", "application/vnd.apple.mpegurl; charset=utf-8")
w.Header().Set("Cache-Control", "no-store")
w.Header().Set("X-Content-Type-Options", "nosniff")
w.Header().Set("Retry-After", "1")
if r.Method == http.MethodHead {
w.WriteHeader(http.StatusOK)
return
}
body := "#EXTM3U\n" +
"#EXT-X-VERSION:3\n" +
"#EXT-X-TARGETDURATION:2\n" +
"#EXT-X-MEDIA-SEQUENCE:0\n"
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(body))
}
func maybeBlockHLSOnPreview(w http.ResponseWriter, r *http.Request, basePath, file string) bool {
// Nur /api/preview (nicht /api/preview/live)
if strings.TrimSpace(basePath) != "/api/preview" {
return false
}
low := strings.ToLower(strings.TrimSpace(file))
if low == "" {
return false
}
// Nur echte HLS-Dateien blocken (Manifest/Segmente)
if strings.HasSuffix(low, ".m3u8") || strings.HasSuffix(low, ".ts") || strings.HasSuffix(low, ".m4s") {
w.Header().Set("Cache-Control", "no-store")
w.Header().Set("X-Preview-HLS-Disabled", "1")
http.Error(w, "HLS disabled on /api/preview; use /api/preview/live", http.StatusGone) // 410
return true
}
return false
}
// stopPreview stops the running ffmpeg HLS preview process for a job and resets state.
func stopPreview(job *RecordJob) {
jobsMu.Lock()
cmd := job.previewCmd
cancel := job.previewCancel
job.previewCmd = nil
job.previewCancel = nil
job.LiveThumbStarted = false
job.PreviewDir = ""
jobsMu.Unlock()
if cancel != nil {
cancel()
}
if cmd != nil && cmd.Process != nil {
_ = cmd.Process.Kill()
}
}
func recordPreviewLive(w http.ResponseWriter, r *http.Request) {
// ✅ Route bleibt /api/preview/live
// Wenn kein "file" Parameter da ist, liefern wir den neuen Single-Request Stream (fMP4).
file := strings.TrimSpace(r.URL.Query().Get("file"))
if file == "" {
recordPreviewLiveFMP4(w, r)
return
}
// Legacy: HLS file serving + m3u8 rewrite (falls du es noch irgendwo brauchst)
id := strings.TrimSpace(r.URL.Query().Get("id"))
if id == "" {
http.Error(w, "id fehlt", http.StatusBadRequest)
return
}
servePreviewHLSFileWithBase(w, r, id, file, "/api/preview/live")
}
// recordPreviewFile serves ONLY the HLS file requests for /api/preview?file=...
// preview.jpg bleibt in preview.go (servePreviewJPGAlias).
func recordPreviewFile(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet && r.Method != http.MethodHead {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
}
id := strings.TrimSpace(r.URL.Query().Get("id"))
if id == "" {
id = strings.TrimSpace(r.URL.Query().Get("name"))
}
if id == "" {
http.Error(w, "id fehlt", http.StatusBadRequest)
return
}
file := strings.TrimSpace(r.URL.Query().Get("file"))
if file == "" {
http.Error(w, "file fehlt", http.StatusBadRequest)
return
}
// ✅ HLS auf /api/preview blocken (Manifest/Segmente), um Polling-Storm zu verhindern.
// Wenn du es NICHT blocken willst, diese if-Zeile entfernen.
if maybeBlockHLSOnPreview(w, r, "/api/preview", file) {
return
}
// Alles andere (falls doch erlaubt) über die gemeinsame Serving-Funktion
servePreviewHLSFileWithBase(w, r, id, file, "/api/preview")
}
// servePreviewHLSFileWithBase serves a single HLS file (index/segment) for a job.
// If it's an m3u8, it is rewritten so that segment URIs point at basePath.
func servePreviewHLSFileWithBase(w http.ResponseWriter, r *http.Request, id, file, basePath string) {
file = strings.TrimSpace(file)
if file == "" || filepath.Base(file) != file || !previewFileRe.MatchString(file) {
http.Error(w, "ungültige file", http.StatusBadRequest)
return
}
isIndex := file == "index.m3u8" || file == "index_hq.m3u8"
jobsMu.Lock()
job, ok := jobs[id]
state := ""
if ok && job != nil {
state = strings.TrimSpace(job.PreviewState)
}
jobsMu.Unlock()
// HEAD: quick existence check
if r.Method == http.MethodHead {
if !ok || job == nil {
w.WriteHeader(http.StatusNotFound)
return
}
if state == "private" {
w.WriteHeader(http.StatusForbidden)
return
}
if state == "offline" {
w.WriteHeader(http.StatusNotFound)
return
}
previewDir := strings.TrimSpace(job.PreviewDir)
if previewDir == "" {
w.WriteHeader(http.StatusNotFound)
return
}
p := filepath.Join(previewDir, file)
if st, err := os.Stat(p); err == nil && !st.IsDir() {
w.Header().Set("Cache-Control", "no-store")
w.WriteHeader(http.StatusOK)
return
}
w.WriteHeader(http.StatusNotFound)
return
}
// activation: hover or play=1
active := isHover(r) || strings.TrimSpace(r.URL.Query().Get("play")) == "1"
if !active {
if isIndex {
serveLiveNotReady(w, r)
return
}
http.Error(w, "preview not active", http.StatusNotFound)
return
}
if !ok || job == nil {
if isIndex {
serveLiveNotReady(w, r)
return
}
http.Error(w, "job nicht gefunden", http.StatusNotFound)
return
}
ensurePreviewStarted(r, job)
touchPreview(job)
jobsMu.Lock()
state = strings.TrimSpace(job.PreviewState)
jobsMu.Unlock()
if state == "private" {
http.Error(w, "model private", http.StatusForbidden)
return
}
if state == "offline" {
http.Error(w, "model offline", http.StatusNotFound)
return
}
if state == "error" {
http.Error(w, "preview error", http.StatusServiceUnavailable)
return
}
previewDir := strings.TrimSpace(job.PreviewDir)
if previewDir == "" {
if isIndex {
serveLiveNotReady(w, r)
return
}
http.Error(w, "preview nicht verfügbar", http.StatusNotFound)
return
}
p := filepath.Join(previewDir, file)
st, err := os.Stat(p)
if err != nil || st.IsDir() {
if isIndex {
serveLiveNotReady(w, r)
return
}
http.Error(w, "datei nicht gefunden", http.StatusNotFound)
return
}
ext := strings.ToLower(filepath.Ext(p))
w.Header().Set("Cache-Control", "no-store")
w.Header().Set("X-Accel-Buffering", "no")
// m3u8 -> rewrite
if ext == ".m3u8" {
raw, err := os.ReadFile(p)
if err != nil {
http.Error(w, "m3u8 read failed", http.StatusInternalServerError)
return
}
rewritten := rewriteM3U8WithBase(raw, id, basePath)
w.Header().Set("Content-Type", "application/vnd.apple.mpegurl; charset=utf-8")
w.WriteHeader(http.StatusOK)
_, _ = w.Write(rewritten)
return
}
switch ext {
case ".ts":
w.Header().Set("Content-Type", "video/mp2t")
case ".m4s":
w.Header().Set("Content-Type", "video/iso.segment")
default:
w.Header().Set("Content-Type", "application/octet-stream")
}
// segments may still be written -> wait until size stabilizes
if ext == ".ts" || ext == ".m4s" {
if !waitForStableFile(p, 2, 120*time.Millisecond) {
http.Error(w, "segment not ready", http.StatusNotFound)
return
}
}
f, err := os.Open(p)
if err != nil {
http.Error(w, "open failed", http.StatusNotFound)
return
}
defer f.Close()
http.ServeContent(w, r, file, st.ModTime(), f)
}
func waitForStableFile(path string, checks int, interval time.Duration) bool {
var last int64 = -1
for i := 0; i < checks; i++ {
st, err := os.Stat(path)
if err != nil || st.IsDir() {
return false
}
sz := st.Size()
if last >= 0 && sz == last {
return true
}
last = sz
time.Sleep(interval)
}
return false
}
func classifyPreviewFFmpegStderr(stderr string) (state string, httpStatus int) {
s := strings.ToLower(stderr)
if strings.Contains(s, "403 forbidden") || strings.Contains(s, "http error 403") || strings.Contains(s, "server returned 403") {
return "private", http.StatusForbidden
}
if strings.Contains(s, "404 not found") || strings.Contains(s, "http error 404") || strings.Contains(s, "server returned 404") {
return "offline", http.StatusNotFound
}
return "", 0
}
// startPreviewHLS starts ffmpeg to generate HLS segments in previewDir.
// It also starts your existing live-thumb loop: startLiveThumbJPGLoop(ctx, job).
func startPreviewHLS(ctx context.Context, job *RecordJob, m3u8URL, previewDir, httpCookie, userAgent string) error {
if strings.TrimSpace(ffmpegPath) == "" {
return fmt.Errorf("kein ffmpeg gefunden setze FFMPEG_PATH oder lege ffmpeg(.exe) neben das Backend")
}
if err := os.MkdirAll(previewDir, 0o755); err != nil {
return err
}
jobsMu.Lock()
job.PreviewState = ""
job.PreviewStateAt = ""
job.PreviewStateMsg = ""
hidden := job.Hidden
jobsMu.Unlock()
if !hidden {
publishJobUpsert(job)
}
commonIn := []string{"-y"}
if strings.TrimSpace(userAgent) != "" {
commonIn = append(commonIn, "-user_agent", userAgent)
}
if strings.TrimSpace(httpCookie) != "" {
commonIn = append(commonIn, "-headers", fmt.Sprintf("Cookie: %s\r\n", httpCookie))
}
commonIn = append(commonIn, "-i", m3u8URL)
hqArgs := append(commonIn,
"-vf", "scale=480:-2",
"-c:v", "libx264", "-preset", "veryfast", "-tune", "zerolatency",
"-pix_fmt", "yuv420p",
"-profile:v", "main",
"-level", "3.1",
"-threads", "4",
"-g", "48", "-keyint_min", "48", "-sc_threshold", "0",
"-map", "0:v:0",
"-map", "0:a:0?",
"-c:a", "aac", "-b:a", "128k", "-ac", "2",
"-f", "hls",
"-hls_time", "2",
"-hls_list_size", "6",
"-hls_allow_cache", "0",
"-hls_flags", "delete_segments+append_list+independent_segments+temp_file",
"-hls_segment_filename", filepath.Join(previewDir, "seg_hq_%05d.ts"),
filepath.Join(previewDir, "index_hq.m3u8"),
)
cmd := exec.CommandContext(ctx, ffmpegPath, hqArgs...)
var stderr bytes.Buffer
cmd.Stderr = &stderr
jobsMu.Lock()
job.previewCmd = cmd
jobsMu.Unlock()
go func() {
if err := previewSem.Acquire(ctx); err != nil {
jobsMu.Lock()
if job.previewCmd == cmd {
job.previewCmd = nil
}
jobsMu.Unlock()
return
}
defer previewSem.Release()
if err := cmd.Run(); err != nil && ctx.Err() == nil {
st := strings.TrimSpace(stderr.String())
state, code := classifyPreviewFFmpegStderr(st)
jobsMu.Lock()
if state != "" {
job.PreviewState = state
job.PreviewStateAt = time.Now().UTC().Format(time.RFC3339Nano)
job.PreviewStateMsg = fmt.Sprintf("ffmpeg input returned HTTP %d", code)
} else {
job.PreviewState = "error"
job.PreviewStateAt = time.Now().UTC().Format(time.RFC3339Nano)
if len(st) > 280 {
job.PreviewStateMsg = st[:280] + "…"
} else {
job.PreviewStateMsg = st
}
}
hidden := job.Hidden
jobsMu.Unlock()
if !hidden {
publishJobUpsert(job)
}
//fmt.Printf("⚠️ preview hq ffmpeg failed: %v (%s)\n", err, st)
}
jobsMu.Lock()
if job.previewCmd == cmd {
job.previewCmd = nil
}
jobsMu.Unlock()
}()
startLiveThumbJPGLoop(ctx, job)
return nil
}
// rewriteM3U8WithBase rewrites all segment URIs inside an m3u8 to point at basePath.
//
// Example output line:
//
// /api/preview/live?id=<id>&file=seg_hq_00001.ts&play=1
func rewriteM3U8WithBase(raw []byte, id string, basePath string) []byte {
basePath = strings.TrimSpace(basePath)
if basePath == "" {
basePath = "/api/preview"
}
if !strings.HasPrefix(basePath, "/") {
basePath = "/" + basePath
}
base := basePath + "?id=" + url.QueryEscape(id) + "&file="
var out bytes.Buffer
sc := bufio.NewScanner(bytes.NewReader(raw))
for sc.Scan() {
line := sc.Text()
trim := strings.TrimSpace(line)
if trim == "" {
out.WriteByte('\n')
continue
}
// tags: may contain URI="..."
if strings.HasPrefix(trim, "#") {
line = rewriteAttrURIWithBase(line, base, basePath)
out.WriteString(line)
out.WriteByte('\n')
continue
}
u := trim
// absolute URLs: keep
if strings.HasPrefix(u, "http://") || strings.HasPrefix(u, "https://") {
out.WriteString(line)
out.WriteByte('\n')
continue
}
// already points to our endpoint: keep
if strings.Contains(u, basePath) || strings.Contains(u, "/api/preview") {
out.WriteString(line)
out.WriteByte('\n')
continue
}
name := path.Base(u)
out.WriteString(base + url.QueryEscape(name) + "&play=1")
out.WriteByte('\n')
}
if err := sc.Err(); err != nil {
return raw
}
return out.Bytes()
}
func rewriteAttrURIWithBase(line, base string, basePath string) string {
const key = `URI="`
i := strings.Index(line, key)
if i < 0 {
return line
}
j := strings.Index(line[i+len(key):], `"`)
if j < 0 {
return line
}
start := i + len(key)
end := start + j
val := line[start:end]
valTrim := strings.TrimSpace(val)
// keep absolute or already-rewritten URIs
if strings.HasPrefix(valTrim, "http://") || strings.HasPrefix(valTrim, "https://") {
return line
}
if strings.Contains(valTrim, basePath) || strings.Contains(valTrim, "/api/preview") {
return line
}
name := path.Base(valTrim)
repl := base + url.QueryEscape(name) + "&play=1"
return line[:start] + repl + line[end:]
}
// isHover decides whether this request should count as "active".
func isHover(r *http.Request) bool {
v := strings.ToLower(strings.TrimSpace(r.URL.Query().Get("hover")))
return v == "1" || v == "true" || v == "yes"
}
// touchPreview updates the last-hit timestamp so your cleanup/stop logic can use it.
func touchPreview(job *RecordJob) {
if job == nil {
return
}
jobsMu.Lock()
job.previewLastHit = time.Now()
jobsMu.Unlock()
}
// ensurePreviewStarted starts the ffmpeg HLS preview if not running yet.
func ensurePreviewStarted(r *http.Request, job *RecordJob) {
if job == nil {
return
}
job.previewStartMu.Lock()
defer job.previewStartMu.Unlock()
jobsMu.Lock()
if job.previewCmd != nil && job.PreviewDir != "" {
job.previewLastHit = time.Now()
jobsMu.Unlock()
return
}
m3u8 := strings.TrimSpace(job.PreviewM3U8)
cookie := strings.TrimSpace(job.PreviewCookie)
ua := strings.TrimSpace(job.PreviewUA)
jobsMu.Unlock()
if m3u8 == "" {
return
}
pctx, cancel := context.WithCancel(context.Background())
assetID := assetIDForJob(job)
pdir := filepath.Join(os.TempDir(), "rec_preview", assetID)
jobsMu.Lock()
job.PreviewDir = pdir
job.previewCancel = cancel
job.previewLastHit = time.Now()
jobsMu.Unlock()
_ = startPreviewHLS(pctx, job, m3u8, pdir, cookie, ua)
}
// ============================================================
// Live fMP4 (single request, chunked) via ffmpeg -> stdout
// Route: /api/preview/live-fmp4?id=<jobId>&hover=1
// ============================================================
func recordPreviewLiveFMP4(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
http.Error(w, "Nur GET erlaubt", http.StatusMethodNotAllowed)
return
}
id := strings.TrimSpace(r.URL.Query().Get("id"))
if id == "" {
http.Error(w, "id fehlt", http.StatusBadRequest)
return
}
// activation: hover or play=1 (wie bei HLS)
active := isHover(r) || strings.TrimSpace(r.URL.Query().Get("play")) == "1"
if !active {
http.Error(w, "preview not active", http.StatusNotFound)
return
}
jobsMu.Lock()
job, ok := jobs[id]
state := ""
if ok && job != nil {
state = strings.TrimSpace(job.PreviewState)
}
jobsMu.Unlock()
if !ok || job == nil {
http.Error(w, "job nicht gefunden", http.StatusNotFound)
return
}
username := extractUsername(job.SourceURL)
if strings.TrimSpace(username) != "" {
cookie := strings.TrimSpace(job.PreviewCookie)
ua := strings.TrimSpace(job.PreviewUA)
if ua == "" {
ua = "Mozilla/5.0"
}
ctxRefresh, cancelRefresh := context.WithTimeout(r.Context(), 8*time.Second)
newHls, err := fetchCurrentBestHLS(ctxRefresh, username, cookie, ua)
cancelRefresh()
if err == nil && strings.TrimSpace(newHls) != "" {
jobsMu.Lock()
oldHls := strings.TrimSpace(job.PreviewM3U8)
job.PreviewM3U8 = strings.TrimSpace(newHls)
job.PreviewCookie = cookie
job.PreviewUA = ua
job.PreviewState = ""
job.PreviewStateAt = ""
job.PreviewStateMsg = ""
jobsMu.Unlock()
if oldHls != "" && oldHls != strings.TrimSpace(newHls) {
stopPreview(job)
}
}
}
// ensure ffmpeg preview input data exists
// (PreviewM3U8 + Cookie/UA werden beim Job gesetzt)
m3u8 := strings.TrimSpace(job.PreviewM3U8)
if m3u8 == "" {
http.Error(w, "preview m3u8 fehlt", http.StatusNotFound)
return
}
// states
if state == "private" {
http.Error(w, "model private", http.StatusForbidden)
return
}
if state == "offline" {
http.Error(w, "model offline", http.StatusNotFound)
return
}
if state == "error" {
http.Error(w, "preview error", http.StatusServiceUnavailable)
return
}
// Headers: fMP4 stream
w.Header().Set("Content-Type", `video/mp4`)
w.Header().Set("Cache-Control", "no-store")
w.Header().Set("X-Accel-Buffering", "no")
// Sehr wichtig: Flushbar?
flusher, okf := w.(http.Flusher)
if !okf {
http.Error(w, "Streaming nicht unterstützt", http.StatusInternalServerError)
return
}
// Client disconnect => ffmpeg stoppen
ctx := r.Context()
// Cookie/UA aus Job
cookie := strings.TrimSpace(job.PreviewCookie)
ua := strings.TrimSpace(job.PreviewUA)
if ua == "" {
ua = "Mozilla/5.0"
}
// ffmpeg args: input = m3u8, output = fragmented mp4 to stdout
// ✅ Video + Audio für Browser-Playback
args := []string{"-hide_banner", "-loglevel", "error"}
if ua != "" {
args = append(args, "-user_agent", ua)
}
if cookie != "" {
args = append(args, "-headers", fmt.Sprintf("Cookie: %s\r\n", cookie))
}
// Input
args = append(args, "-i", m3u8)
// Video + Audio encode (low-latency-ish)
args = append(args,
"-map", "0:v:0",
"-map", "0:a:0?",
"-vf", "scale=480:-2",
"-c:v", "libx264",
"-preset", "veryfast",
"-tune", "zerolatency",
"-pix_fmt", "yuv420p",
"-profile:v", "main",
"-level", "3.1",
"-g", "48",
"-keyint_min", "48",
"-sc_threshold", "0",
"-c:a", "aac",
"-b:a", "128k",
"-ac", "2",
"-ar", "48000",
)
// Output: fMP4 fragmented to stdout (single HTTP response)
args = append(args,
"-f", "mp4",
"-movflags", "frag_keyframe+empty_moov+default_base_moof",
"-frag_duration", "2000000", // 2s (µs)
"-min_frag_duration", "2000000",
"pipe:1",
)
cmd := exec.CommandContext(ctx, ffmpegPath, args...)
// stdout -> response
stdout, err := cmd.StdoutPipe()
if err != nil {
http.Error(w, "ffmpeg stdout pipe failed", http.StatusInternalServerError)
return
}
// stderr nur für Debug (optional)
var stderr bytes.Buffer
cmd.Stderr = &stderr
// Start
if err := cmd.Start(); err != nil {
http.Error(w, "ffmpeg start failed: "+err.Error(), http.StatusInternalServerError)
return
}
// Wenn Client weg => Prozess killt CommandContext sowieso (ctx cancels),
// aber wir kopieren streaming-mäßig.
buf := make([]byte, 32*1024)
for {
select {
case <-ctx.Done():
_ = cmd.Process.Kill()
return
default:
}
n, rerr := stdout.Read(buf)
if n > 0 {
_, _ = w.Write(buf[:n])
flusher.Flush()
}
if rerr != nil {
if rerr == io.EOF {
break
}
break
}
}
// Wait (verhindert Zombies)
_ = cmd.Wait()
}