sse update

This commit is contained in:
Linrador 2026-03-09 14:21:36 +01:00
parent 6f12d3c2b1
commit ceb310a428
27 changed files with 2207 additions and 984 deletions

View File

@ -271,6 +271,74 @@ func indexLiteByUser(rooms []ChaturbateRoom) map[string]ChaturbateOnlineRoomLite
return m
}
func syncChaturbateRoomStateIntoModelStore(store *ModelStore, rooms []ChaturbateRoom, fetchedAt time.Time) {
if store == nil {
return
}
seenOnline := make(map[string]bool, len(rooms))
for _, rm := range rooms {
modelKey := strings.ToLower(strings.TrimSpace(rm.Username))
if modelKey == "" {
continue
}
seenOnline[modelKey] = true
img := strings.TrimSpace(rm.ImageURL360)
if img == "" {
img = strings.TrimSpace(rm.ImageURL)
}
_ = store.SetChaturbateRoomState(
"chaturbate.com",
modelKey,
rm.CurrentShow,
true,
rm.ChatRoomURL,
img,
fetchedAt,
)
if sm, ok := store.GetByHostAndModelKey("chaturbate.com", modelKey); ok {
publishRoomStateForModel(sm)
}
}
// bekannte Chaturbate-Models, die NICHT im Online-Snapshot sind => offline setzen
models := store.List()
for _, m := range models {
if strings.ToLower(strings.TrimSpace(m.Host)) != "chaturbate.com" {
continue
}
modelKey := strings.ToLower(strings.TrimSpace(m.ModelKey))
if modelKey == "" {
continue
}
if seenOnline[modelKey] {
continue
}
_ = store.SetChaturbateRoomState(
"chaturbate.com",
modelKey,
"offline",
false,
"",
"",
fetchedAt,
)
if sm, ok := store.GetByHostAndModelKey("chaturbate.com", modelKey); ok {
publishRoomStateForModel(sm)
}
}
}
// --- Profilbild Download + Persist (online -> offline) ---
func selectBestRoomImageURL(rm ChaturbateRoom) string {
@ -456,11 +524,15 @@ func startChaturbateOnlinePoller(store *ModelStore) {
// ✅ Alle bekannten Chaturbate-Models in der DB mit aktuellem Online-Snapshot synchronisieren
if cbModelStore != nil {
go func(roomsCopy []ChaturbateRoom, fetchedAt time.Time) {
if err := cbModelStore.SyncChaturbateOnlineForKnownModels(roomsCopy, fetchedAt); err != nil && verboseLogs() {
fmt.Println("⚠️ [chaturbate] sync known models failed:", err)
go syncChaturbateRoomStateIntoModelStore(
cbModelStore,
append([]ChaturbateRoom(nil), rooms...),
fetchedAtNow,
)
if len(rooms) > 0 {
cbModelStore.FillMissingTagsFromChaturbateOnline(rooms)
}
}(append([]ChaturbateRoom(nil), rooms...), fetchedAtNow)
}
// Tags übernehmen ist teuer -> nur selten + im Hintergrund
@ -611,7 +683,8 @@ func refreshRunningJobsHLS(userLower string, newHls string, cookie string, ua st
return
}
changedAny := false
toStop := make([]*RecordJob, 0, 4)
changedJobs := make([]*RecordJob, 0, 4)
jobsMu.Lock()
for _, j := range jobs {
@ -628,21 +701,25 @@ func refreshRunningJobsHLS(userLower string, newHls string, cookie string, ua st
j.PreviewCookie = cookie
j.PreviewUA = ua
// Wenn ffmpeg schon läuft und sich Quelle geändert hat -> hart stoppen
if old != "" && old != newHls {
stopPreview(j)
// PreviewState zurücksetzen (damit "private/offline" nicht hängen bleibt)
j.PreviewState = ""
j.PreviewStateAt = ""
j.PreviewStateMsg = ""
toStop = append(toStop, j)
}
changedAny = true
if !j.Hidden {
changedJobs = append(changedJobs, j)
}
}
jobsMu.Unlock()
if changedAny {
notifyJobsChanged()
for _, j := range toStop {
stopPreview(j)
}
for _, j := range changedJobs {
publishJobUpsert(j)
}
}

View File

@ -86,6 +86,7 @@ func stopJobsInternal(list []*RecordJob) {
}
pl := make([]payload, 0, len(list))
visibleJobs := make([]*RecordJob, 0, len(list))
jobsMu.Lock()
for _, job := range list {
@ -96,10 +97,16 @@ func stopJobsInternal(list []*RecordJob) {
job.Progress = 10
pl = append(pl, payload{cmd: job.previewCmd, cancel: job.cancel})
job.previewCmd = nil
if !job.Hidden {
visibleJobs = append(visibleJobs, job)
}
}
jobsMu.Unlock()
notifyJobsChanged() // 1) UI sofort updaten (Phase/Progress)
for _, job := range visibleJobs {
publishJobUpsert(job)
}
for _, p := range pl {
if p.cmd != nil && p.cmd.Process != nil {
@ -110,7 +117,9 @@ func stopJobsInternal(list []*RecordJob) {
}
}
notifyJobsChanged() // 2) optional: nach Cancel/Kill nochmal pushen
for _, job := range visibleJobs {
publishJobUpsert(job)
}
}
func stopAllStoppableJobs() int {

View File

@ -358,8 +358,12 @@ func startPreviewHLS(ctx context.Context, job *RecordJob, m3u8URL, previewDir, h
job.PreviewState = ""
job.PreviewStateAt = ""
job.PreviewStateMsg = ""
hidden := job.Hidden
jobsMu.Unlock()
notifyJobsChanged()
if !hidden {
publishJobUpsert(job)
}
commonIn := []string{"-y"}
if strings.TrimSpace(userAgent) != "" {
@ -427,8 +431,12 @@ func startPreviewHLS(ctx context.Context, job *RecordJob, m3u8URL, previewDir, h
job.PreviewStateMsg = st
}
}
hidden := job.Hidden
jobsMu.Unlock()
notifyJobsChanged()
if !hidden {
publishJobUpsert(job)
}
fmt.Printf("⚠️ preview hq ffmpeg failed: %v (%s)\n", err, st)
}

View File

@ -271,7 +271,7 @@ func publishJob(jobID string) bool {
j.Hidden = false
jobsMu.Unlock()
notifyJobsChanged()
publishJobUpsert(j)
return true
}
@ -784,6 +784,9 @@ func shouldAutoDeleteSmallDownload(filePath string) (bool, int64, int64) {
}
func setJobPhase(job *RecordJob, phase string, progress int) {
if job == nil {
return
}
if progress < 0 {
progress = 0
}
@ -794,10 +797,12 @@ func setJobPhase(job *RecordJob, phase string, progress int) {
jobsMu.Lock()
job.Phase = phase
job.Progress = progress
hidden := job.Hidden
jobsMu.Unlock()
notifyJobsChanged()
if !hidden {
publishJobUpsert(job)
}
}
func durationSecondsCached(ctx context.Context, path string) (float64, error) {
@ -1659,7 +1664,8 @@ func removeJobsByOutputBasename(file string) {
return
}
removed := false
removedJobs := make([]*RecordJob, 0, 4)
jobsMu.Lock()
for id, j := range jobs {
if j == nil {
@ -1670,14 +1676,16 @@ func removeJobsByOutputBasename(file string) {
continue
}
if filepath.Base(out) == file {
if !j.Hidden {
removedJobs = append(removedJobs, j)
}
delete(jobs, id)
removed = true
}
}
jobsMu.Unlock()
if removed {
notifyJobsChanged()
for _, j := range removedJobs {
publishJobRemove(j)
}
}
@ -1688,7 +1696,8 @@ func renameJobsOutputBasename(oldFile, newFile string) {
return
}
changed := false
changedJobs := make([]*RecordJob, 0, 4)
jobsMu.Lock()
for _, j := range jobs {
if j == nil {
@ -1700,13 +1709,15 @@ func renameJobsOutputBasename(oldFile, newFile string) {
}
if filepath.Base(out) == oldFile {
j.Output = filepath.Join(filepath.Dir(out), newFile)
changed = true
if !j.Hidden {
changedJobs = append(changedJobs, j)
}
}
}
jobsMu.Unlock()
if changed {
notifyJobsChanged()
for _, j := range changedJobs {
publishJobUpsert(j)
}
}

View File

@ -11,9 +11,28 @@ import (
"net/url"
"strconv"
"strings"
"sync"
"time"
)
var (
modelStoreMu sync.RWMutex
modelStoreRef *ModelStore
)
func getModelStore() *ModelStore {
modelStoreMu.RLock()
defer modelStoreMu.RUnlock()
return modelStoreRef
}
func setModelStore(store *ModelStore) {
modelStoreMu.Lock()
defer modelStoreMu.Unlock()
modelStoreRef = store
setCoverModelStore(store)
}
// ✅ umbenannt, damit es nicht mit models.go kollidiert
func modelsWriteJSON(w http.ResponseWriter, status int, v any) {
w.Header().Set("Content-Type", "application/json; charset=utf-8")
@ -220,6 +239,7 @@ func importModelsCSV(store *ModelStore, r io.Reader, kind string) (importResult,
}
func RegisterModelAPI(mux *http.ServeMux, store *ModelStore) {
setModelStore(store)
// ✅ NEU: Parse-Endpoint (nur URL erlaubt)
mux.HandleFunc("/api/models/parse", func(w http.ResponseWriter, r *http.Request) {
@ -245,7 +265,15 @@ func RegisterModelAPI(mux *http.ServeMux, store *ModelStore) {
modelsWriteJSON(w, http.StatusMethodNotAllowed, map[string]string{"error": "method not allowed"})
return
}
modelsWriteJSON(w, http.StatusOK, store.Meta())
store := getModelStore()
if store == nil {
modelsWriteJSON(w, http.StatusServiceUnavailable, map[string]string{"error": "model store nicht verfügbar"})
return
}
meta := store.Meta()
modelsWriteJSON(w, http.StatusOK, meta)
})
mux.HandleFunc("/api/models/watched", func(w http.ResponseWriter, r *http.Request) {
@ -254,6 +282,13 @@ func RegisterModelAPI(mux *http.ServeMux, store *ModelStore) {
return
}
host := strings.TrimSpace(r.URL.Query().Get("host"))
store := getModelStore()
if store == nil {
modelsWriteJSON(w, http.StatusServiceUnavailable, map[string]string{"error": "model store nicht verfügbar"})
return
}
modelsWriteJSON(w, http.StatusOK, store.ListWatchedLite(host))
})
@ -265,8 +300,13 @@ func RegisterModelAPI(mux *http.ServeMux, store *ModelStore) {
// ✅ Wenn du List() als ([]T, error) hast -> Fehler sichtbar machen:
// Falls List() aktuell nur []T zurückgibt, siehe Schritt 2 unten.
list := store.List()
store := getModelStore()
if store == nil {
modelsWriteJSON(w, http.StatusServiceUnavailable, map[string]string{"error": "model store nicht verfügbar"})
return
}
list := store.List()
modelsWriteJSON(w, http.StatusOK, list)
})
@ -283,6 +323,12 @@ func RegisterModelAPI(mux *http.ServeMux, store *ModelStore) {
return
}
store := getModelStore()
if store == nil {
modelsWriteJSON(w, http.StatusServiceUnavailable, map[string]string{"error": "model store nicht verfügbar"})
return
}
mime, data, ok, err := store.GetProfileImageByID(id)
if err != nil {
modelsWriteJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
@ -310,6 +356,12 @@ func RegisterModelAPI(mux *http.ServeMux, store *ModelStore) {
return
}
store := getModelStore()
if store == nil {
modelsWriteJSON(w, http.StatusServiceUnavailable, map[string]string{"error": "model store nicht verfügbar"})
return
}
host := strings.TrimSpace(r.FormValue("host"))
modelKey := strings.TrimSpace(r.FormValue("modelKey"))
sourceURL := strings.TrimSpace(r.FormValue("sourceUrl"))
@ -368,6 +420,12 @@ func RegisterModelAPI(mux *http.ServeMux, store *ModelStore) {
return
}
store := getModelStore()
if store == nil {
modelsWriteJSON(w, http.StatusServiceUnavailable, map[string]string{"error": "model store nicht verfügbar"})
return
}
m, err := store.UpsertFromParsed(req)
if err != nil {
modelsWriteJSON(w, http.StatusBadRequest, map[string]string{"error": err.Error()})
@ -402,6 +460,12 @@ func RegisterModelAPI(mux *http.ServeMux, store *ModelStore) {
return
}
store := getModelStore()
if store == nil {
modelsWriteJSON(w, http.StatusServiceUnavailable, map[string]string{"error": "model store nicht verfügbar"})
return
}
m, err := store.EnsureByHostModelKey(host, key)
if err != nil {
modelsWriteJSON(w, http.StatusBadRequest, map[string]string{"error": err.Error()})
@ -435,6 +499,12 @@ func RegisterModelAPI(mux *http.ServeMux, store *ModelStore) {
}
defer f.Close()
store := getModelStore()
if store == nil {
modelsWriteJSON(w, http.StatusServiceUnavailable, map[string]string{"error": "model store nicht verfügbar"})
return
}
res, err := importModelsCSV(store, f, kind)
if err != nil {
modelsWriteJSON(w, http.StatusBadRequest, map[string]string{"error": err.Error()})
@ -471,6 +541,13 @@ func RegisterModelAPI(mux *http.ServeMux, store *ModelStore) {
}
req.ID = ensured.ID
}
store := getModelStore()
if store == nil {
modelsWriteJSON(w, http.StatusServiceUnavailable, map[string]string{"error": "model store nicht verfügbar"})
return
}
m, err := store.PatchFlags(req)
if err != nil {
modelsWriteJSON(w, http.StatusBadRequest, map[string]string{"error": err.Error()})
@ -496,6 +573,13 @@ func RegisterModelAPI(mux *http.ServeMux, store *ModelStore) {
var req struct {
ID string `json:"id"`
}
store := getModelStore()
if store == nil {
modelsWriteJSON(w, http.StatusServiceUnavailable, map[string]string{"error": "model store nicht verfügbar"})
return
}
if err := modelsReadJSON(r, &req); err != nil {
modelsWriteJSON(w, http.StatusBadRequest, map[string]string{"error": err.Error()})
return

View File

@ -1,4 +1,5 @@
// backend/models_store.go
package main
import (
@ -36,6 +37,14 @@ type StoredModel struct {
ProfileImageCached string `json:"profileImageCached,omitempty"` // z.B. /api/models/image?id=...
ProfileImageUpdatedAt string `json:"profileImageUpdatedAt,omitempty"` // RFC3339Nano
RoomStatus string `json:"roomStatus,omitempty"`
IsOnline bool `json:"isOnline,omitempty"`
ChatRoomURL string `json:"chatRoomUrl,omitempty"`
ImageURL string `json:"imageUrl,omitempty"`
LastOnlineAt string `json:"lastOnlineAt,omitempty"`
LastOfflineAt string `json:"lastOfflineAt,omitempty"`
LastRoomSyncAt string `json:"lastRoomSyncAt,omitempty"`
Watching bool `json:"watching"`
Favorite bool `json:"favorite"`
Hot bool `json:"hot"`
@ -184,6 +193,105 @@ func (s *ModelStore) init() error {
return nil
}
func (s *ModelStore) SetChaturbateRoomState(
host string,
modelKey string,
roomStatus string,
isOnline bool,
chatRoomURL string,
imageURL string,
seenAt time.Time,
) error {
if err := s.ensureInit(); err != nil {
return err
}
host = canonicalHost(host)
modelKey = strings.TrimSpace(modelKey)
roomStatus = strings.ToLower(strings.TrimSpace(roomStatus))
chatRoomURL = strings.TrimSpace(chatRoomURL)
imageURL = strings.TrimSpace(imageURL)
if host == "" {
host = "chaturbate.com"
}
if modelKey == "" {
return errors.New("modelKey fehlt")
}
seenAt = seenAt.UTC()
now := time.Now().UTC()
s.mu.Lock()
defer s.mu.Unlock()
_, err := s.db.Exec(`
UPDATE models
SET
room_status = $1,
is_online = $2,
chat_room_url = $3,
image_url = CASE
WHEN COALESCE(trim($4), '') <> '' THEN $4
ELSE image_url
END,
last_room_sync_at = $5,
last_online_at = CASE
WHEN $2 = true THEN $5
ELSE last_online_at
END,
last_offline_at = CASE
WHEN $2 = false THEN $5
ELSE last_offline_at
END,
updated_at = $6
WHERE lower(trim(host)) = lower(trim($7))
AND lower(trim(model_key)) = lower(trim($8));
`,
roomStatus,
isOnline,
nullableStringArg(chatRoomURL),
nullableStringArg(imageURL),
seenAt,
now,
host,
modelKey,
)
return err
}
func (s *ModelStore) GetByHostAndModelKey(host string, modelKey string) (*StoredModel, bool) {
if err := s.ensureInit(); err != nil {
return nil, false
}
host = canonicalHost(host)
modelKey = strings.TrimSpace(modelKey)
if host == "" || modelKey == "" {
return nil, false
}
var id string
err := s.db.QueryRow(`
SELECT id
FROM models
WHERE lower(trim(host)) = lower(trim($1))
AND lower(trim(model_key)) = lower(trim($2))
LIMIT 1;
`, host, modelKey).Scan(&id)
if err != nil {
return nil, false
}
m, err := s.getByID(id)
if err != nil {
return nil, false
}
return &m, true
}
func canonicalHost(host string) string {
h := strings.ToLower(strings.TrimSpace(host))
h = strings.TrimPrefix(h, "www.")
@ -841,10 +949,17 @@ SELECT
last_seen_online_at,
COALESCE(cb_online_json,''),
cb_online_fetched_at,
COALESCE(cb_online_last_error,''), -- optional
COALESCE(cb_online_last_error,''),
COALESCE(profile_image_url,''),
profile_image_updated_at,
CASE WHEN profile_image_blob IS NOT NULL AND octet_length(profile_image_blob) > 0 THEN 1 ELSE 0 END as has_profile_image,
COALESCE(room_status,'') as room_status,
COALESCE(is_online,false) as is_online,
COALESCE(chat_room_url,'') as chat_room_url,
COALESCE(image_url,'') as image_url,
last_online_at,
last_offline_at,
last_room_sync_at,
watching,favorite,hot,keep,liked,
created_at, updated_at
FROM models
@ -865,10 +980,17 @@ SELECT
last_seen_online_at,
COALESCE(cb_online_json,''),
cb_online_fetched_at,
''::text as cb_online_last_error, -- fallback dummy
''::text as cb_online_last_error,
COALESCE(profile_image_url,''),
profile_image_updated_at,
CASE WHEN profile_image_blob IS NOT NULL AND octet_length(profile_image_blob) > 0 THEN 1 ELSE 0 END as has_profile_image,
COALESCE(room_status,'') as room_status,
COALESCE(is_online,false) as is_online,
COALESCE(chat_room_url,'') as chat_room_url,
COALESCE(image_url,'') as image_url,
last_online_at,
last_offline_at,
last_room_sync_at,
watching,favorite,hot,keep,liked,
created_at, updated_at
FROM models
@ -907,6 +1029,14 @@ ORDER BY updated_at DESC;
profileImageUpdatedAt sql.NullTime
hasProfileImage int64
roomStatus string
isOnline bool
chatRoomURL string
imageURL string
lastOnlineAt sql.NullTime
lastOfflineAt sql.NullTime
lastRoomSyncAt sql.NullTime
watching, favorite, hot, keep bool
liked sql.NullBool
@ -919,6 +1049,8 @@ ORDER BY updated_at DESC;
&lastSeenOnline, &lastSeenOnlineAt,
&cbOnlineJSON, &cbOnlineFetchedAt, &cbOnlineLastError,
&profileImageURL, &profileImageUpdatedAt, &hasProfileImage,
&roomStatus, &isOnline, &chatRoomURL, &imageURL,
&lastOnlineAt, &lastOfflineAt, &lastRoomSyncAt,
&watching, &favorite, &hot, &keep, &liked,
&createdAt, &updatedAt,
); err != nil {
@ -941,6 +1073,17 @@ ORDER BY updated_at DESC;
CbOnlineFetchedAt: fmtNullTime(cbOnlineFetchedAt),
CbOnlineLastError: cbOnlineLastError,
ProfileImageURL: profileImageURL,
ProfileImageUpdatedAt: fmtNullTime(profileImageUpdatedAt),
RoomStatus: roomStatus,
IsOnline: isOnline,
ChatRoomURL: chatRoomURL,
ImageURL: imageURL,
LastOnlineAt: fmtNullTime(lastOnlineAt),
LastOfflineAt: fmtNullTime(lastOfflineAt),
LastRoomSyncAt: fmtNullTime(lastRoomSyncAt),
Watching: watching,
Favorite: favorite,
Hot: hot,
@ -949,9 +1092,6 @@ ORDER BY updated_at DESC;
CreatedAt: fmtTime(createdAt),
UpdatedAt: fmtTime(updatedAt),
ProfileImageURL: profileImageURL,
ProfileImageUpdatedAt: fmtNullTime(profileImageUpdatedAt),
}
if hasProfileImage != 0 {
@ -1597,6 +1737,14 @@ func (s *ModelStore) getByID(id string) (StoredModel, error) {
profileImageUpdatedAt sql.NullTime
hasProfileImage int64
roomStatus string
isOnline bool
chatRoomURL string
imageURL string
lastOnlineAt sql.NullTime
lastOfflineAt sql.NullTime
lastRoomSyncAt sql.NullTime
watching, favorite, hot, keep bool
liked sql.NullBool
@ -1621,6 +1769,13 @@ SELECT
COALESCE(profile_image_url,''),
profile_image_updated_at,
CASE WHEN profile_image_blob IS NOT NULL AND octet_length(profile_image_blob) > 0 THEN 1 ELSE 0 END as has_profile_image,
COALESCE(room_status,'') as room_status,
COALESCE(is_online,false) as is_online,
COALESCE(chat_room_url,'') as chat_room_url,
COALESCE(image_url,'') as image_url,
last_online_at,
last_offline_at,
last_room_sync_at,
watching,favorite,hot,keep,liked,
created_at, updated_at
FROM models
@ -1646,6 +1801,13 @@ SELECT
COALESCE(profile_image_url,''),
profile_image_updated_at,
CASE WHEN profile_image_blob IS NOT NULL AND octet_length(profile_image_blob) > 0 THEN 1 ELSE 0 END as has_profile_image,
COALESCE(room_status,'') as room_status,
COALESCE(is_online,false) as is_online,
COALESCE(chat_room_url,'') as chat_room_url,
COALESCE(image_url,'') as image_url,
last_online_at,
last_offline_at,
last_room_sync_at,
watching,favorite,hot,keep,liked,
created_at, updated_at
FROM models
@ -1659,6 +1821,8 @@ WHERE id=$1;
&lastSeenOnline, &lastSeenOnlineAt,
&cbOnlineJSON, &cbOnlineFetchedAt, &cbOnlineLastError,
&profileImageURL, &profileImageUpdatedAt, &hasProfileImage,
&roomStatus, &isOnline, &chatRoomURL, &imageURL,
&lastOnlineAt, &lastOfflineAt, &lastRoomSyncAt,
&watching, &favorite, &hot, &keep, &liked,
&createdAt, &updatedAt,
)
@ -1699,6 +1863,14 @@ WHERE id=$1;
CbOnlineFetchedAt: fmtNullTime(cbOnlineFetchedAt),
CbOnlineLastError: cbOnlineLastError,
RoomStatus: roomStatus,
IsOnline: isOnline,
ChatRoomURL: chatRoomURL,
ImageURL: imageURL,
LastOnlineAt: fmtNullTime(lastOnlineAt),
LastOfflineAt: fmtNullTime(lastOfflineAt),
LastRoomSyncAt: fmtNullTime(lastRoomSyncAt),
Watching: watching,
Favorite: favorite,
Hot: hot,

View File

@ -179,9 +179,8 @@ func mfcAbortIfNoOutput(jobID string, maxWait time.Duration) {
delete(jobs, jobID)
jobsMu.Unlock()
// ✅ wenn der Job nie sichtbar war, nicht unnötig UI refreshen
if wasVisible {
notifyJobsChanged()
publishJobRemove(job)
}
}

View File

@ -211,7 +211,7 @@ func startPostWorkStatusRefresher() {
defer t.Stop()
for range t.C {
changed := false
changedJobs := make([]*RecordJob, 0, 16)
jobsMu.Lock()
for _, job := range jobs {
@ -222,17 +222,16 @@ func startPostWorkStatusRefresher() {
st := postWorkQ.StatusForKey(key)
// ✅ Kein Typname nötig: job.PostWork ist *<StatusType>, st ist <StatusType>
if job.PostWork == nil || !reflect.DeepEqual(*job.PostWork, st) {
tmp := st
job.PostWork = &tmp
changed = true
changedJobs = append(changedJobs, job)
}
}
jobsMu.Unlock()
if changed {
notifyJobsChanged()
for _, job := range changedJobs {
publishJobUpsert(job)
}
}
}()

View File

@ -1594,7 +1594,6 @@ func recordDeleteVideo(w http.ResponseWriter, r *http.Request) {
removeJobsByOutputBasename(file)
notifyDoneChanged()
notifyJobsChanged()
respondJSON(w, map[string]any{
"ok": true,
@ -1976,7 +1975,6 @@ func recordToggleHot(w http.ResponseWriter, r *http.Request) {
renameJobsOutputBasename(file, newFile)
notifyDoneChanged()
notifyJobsChanged()
respondJSON(w, map[string]any{
"ok": true,

View File

@ -114,7 +114,7 @@ func RecordStream(
jobsMu.Lock()
job.SizeBytes = written
jobsMu.Unlock()
notifyJobsChanged()
publishJobUpsert(job)
lastPush = now
lastBytes = written

View File

@ -396,7 +396,7 @@ func handleM3U8Mode(ctx context.Context, m3u8URL, outFile string, job *RecordJob
jobsMu.Lock()
job.SizeBytes = sz
jobsMu.Unlock()
notifyJobsChanged()
publishJobUpsert(job)
last = sz
}
}

View File

@ -31,18 +31,19 @@ func setJobProgress(job *RecordJob, phase string, pct int) {
}
type rng struct{ start, end int }
rangeFor := func(ph string) rng {
switch ph {
case "postwork":
return rng{0, 5}
return rng{0, 8}
case "remuxing":
return rng{5, 65}
return rng{8, 42}
case "moving":
return rng{65, 75}
return rng{42, 58}
case "probe":
return rng{75, 80}
return rng{58, 72}
case "assets":
return rng{80, 99}
return rng{72, 99}
default:
return rng{0, 100}
}
@ -62,34 +63,35 @@ func setJobProgress(job *RecordJob, phase string, pct int) {
job.Phase = phase
}
if phaseLower == "postwork" && pct == 0 {
job.Progress = 0
// recording = direkter Prozentwert
if !inPostwork {
if pct < job.Progress {
pct = job.Progress
}
job.Progress = pct
return
}
mapped := pct
if inPostwork {
// postwork-Phasen: pct ist IMMER lokal 0..100 innerhalb der Phase
r := rangeFor(phaseLower)
if r.end >= r.start {
if pct >= r.start && pct <= r.end {
mapped = pct
} else {
width := float64(r.end - r.start)
mapped := r.start
if width > 0 {
mapped = r.start + int(math.Round((float64(pct)/100.0)*width))
}
if mapped < r.start {
mapped = r.start
}
if mapped > r.end {
mapped = r.end
}
}
}
if mapped < job.Progress {
mapped = job.Progress
}
job.Progress = mapped
}
@ -263,7 +265,7 @@ func startRecordingInternal(req RecordRequest) (*RecordJob, error) {
j.Hidden = false
jobsMu.Unlock()
notifyJobsChanged()
publishJobUpsert(j)
return j, nil
}
@ -314,7 +316,7 @@ func startRecordingInternal(req RecordRequest) (*RecordJob, error) {
jobsMu.Unlock()
if !job.Hidden {
notifyJobsChanged()
publishJobUpsert(job)
}
go runJob(ctx, job, req)
@ -346,7 +348,7 @@ func runJob(ctx context.Context, job *RecordJob, req RecordRequest) {
}
setJobProgress(job, "recording", 0)
notifyJobsChanged()
publishJobUpsert(job)
switch provider {
case "chaturbate":
@ -379,7 +381,7 @@ func runJob(ctx context.Context, job *RecordJob, req RecordRequest) {
jobsMu.Lock()
job.Output = outPath
jobsMu.Unlock()
notifyJobsChanged()
publishJobUpsert(job)
}
err = RecordStream(ctx, hc, "https://chaturbate.com/", username, outPath, req.Cookie, job)
@ -400,7 +402,7 @@ func runJob(ctx context.Context, job *RecordJob, req RecordRequest) {
jobsMu.Lock()
job.Output = outPath
jobsMu.Unlock()
notifyJobsChanged()
publishJobUpsert(job)
err = RecordStreamMFC(ctx, hc, username, outPath, job)
@ -436,7 +438,7 @@ func runJob(ctx context.Context, job *RecordJob, req RecordRequest) {
job.Phase = "postwork"
out := strings.TrimSpace(job.Output)
jobsMu.Unlock()
notifyJobsChanged()
publishJobUpsert(job)
if out == "" {
jobsMu.Lock()
@ -446,7 +448,7 @@ func runJob(ctx context.Context, job *RecordJob, req RecordRequest) {
job.PostWorkKey = ""
job.PostWork = nil
jobsMu.Unlock()
notifyJobsChanged()
publishJobUpsert(job)
notifyDoneChanged()
return
}
@ -462,7 +464,7 @@ func runJob(ctx context.Context, job *RecordJob, req RecordRequest) {
delete(jobs, job.ID)
jobsMu.Unlock()
notifyJobsChanged()
publishJobRemove(job)
notifyDoneChanged()
if shouldLogRecordInfo(req) {
@ -485,7 +487,7 @@ func runJob(ctx context.Context, job *RecordJob, req RecordRequest) {
jobsMu.Lock()
job.SizeBytes = fi.Size()
jobsMu.Unlock()
notifyJobsChanged()
publishJobUpsert(job)
s := getSettings()
minMB := s.AutoDeleteSmallDownloadsBelowMB
@ -508,7 +510,7 @@ func runJob(ctx context.Context, job *RecordJob, req RecordRequest) {
delete(jobs, job.ID)
jobsMu.Unlock()
notifyJobsChanged()
publishJobRemove(job)
notifyDoneChanged()
if shouldLogRecordInfo(req) {
@ -536,7 +538,7 @@ func runJob(ctx context.Context, job *RecordJob, req RecordRequest) {
job.PostWork = &s
}
jobsMu.Unlock()
notifyJobsChanged()
publishJobUpsert(job)
okQueued := postWorkQ.Enqueue(PostWorkTask{
Key: postKey,
@ -549,7 +551,7 @@ func runJob(ctx context.Context, job *RecordJob, req RecordRequest) {
jobsMu.Unlock()
setJobProgress(job, "postwork", 0)
notifyJobsChanged()
publishJobUpsert(job)
}
out := strings.TrimSpace(postOut)
@ -561,7 +563,7 @@ func runJob(ctx context.Context, job *RecordJob, req RecordRequest) {
job.PostWorkKey = ""
job.PostWork = nil
jobsMu.Unlock()
notifyJobsChanged()
publishJobUpsert(job)
notifyDoneChanged()
return nil
}
@ -572,23 +574,23 @@ func runJob(ctx context.Context, job *RecordJob, req RecordRequest) {
jobsMu.Lock()
job.PostWork = &st
jobsMu.Unlock()
notifyJobsChanged()
publishJobUpsert(job)
}
// 1) Remux
if strings.EqualFold(filepath.Ext(out), ".ts") {
setPhase("remuxing", 72)
setPhase("remuxing", 10)
if newOut, err2 := maybeRemuxTSForJob(job, out); err2 == nil && strings.TrimSpace(newOut) != "" {
out = strings.TrimSpace(newOut)
jobsMu.Lock()
job.Output = out
jobsMu.Unlock()
notifyJobsChanged()
publishJobUpsert(job)
}
}
// 2) Move to done
setPhase("moving", 78)
setPhase("moving", 10)
// ✅ auch nach Remux nochmal hart prüfen: keine 0-Byte-Dateien nach done verschieben
{
@ -601,7 +603,7 @@ func runJob(ctx context.Context, job *RecordJob, req RecordRequest) {
delete(jobs, job.ID)
jobsMu.Unlock()
notifyJobsChanged()
publishJobRemove(job)
notifyDoneChanged()
if shouldLogRecordInfo(req) {
@ -622,25 +624,25 @@ func runJob(ctx context.Context, job *RecordJob, req RecordRequest) {
jobsMu.Lock()
job.Output = out
jobsMu.Unlock()
notifyJobsChanged()
publishJobUpsert(job)
notifyDoneChanged()
}
// 3) Duration
setPhase("probe", 84)
setPhase("probe", 35)
{
dctx, cancel := context.WithTimeout(ctx, 6*time.Second)
if sec, derr := durationSecondsCached(dctx, out); derr == nil && sec > 0 {
jobsMu.Lock()
job.DurationSeconds = sec
jobsMu.Unlock()
notifyJobsChanged()
publishJobUpsert(job)
}
cancel()
}
// 4) Video props
setPhase("probe", 86)
setPhase("probe", 75)
{
pctx, cancel := context.WithTimeout(ctx, 6*time.Second)
w, h, fps, perr := probeVideoProps(pctx, out)
@ -651,17 +653,12 @@ func runJob(ctx context.Context, job *RecordJob, req RecordRequest) {
job.VideoHeight = h
job.FPS = fps
jobsMu.Unlock()
notifyJobsChanged()
publishJobUpsert(job)
}
}
// 5) Assets with progress
const (
assetsStart = 86
assetsEnd = 99
)
setPhase("assets", assetsStart)
setPhase("assets", 0)
lastPct := -1
lastTick := time.Time{}
@ -673,19 +670,22 @@ func runJob(ctx context.Context, job *RecordJob, req RecordRequest) {
if r > 1 {
r = 1
}
pct := assetsStart + int(math.Round(r*float64(assetsEnd-assetsStart)))
if pct < assetsStart {
pct = assetsStart
pct := int(math.Round(r * 100))
if pct < 0 {
pct = 0
}
if pct > assetsEnd {
pct = assetsEnd
if pct > 100 {
pct = 100
}
if pct == lastPct {
return
}
if !lastTick.IsZero() && time.Since(lastTick) < 150*time.Millisecond {
return
}
lastPct = pct
lastTick = time.Now()
setPhase("assets", pct)
@ -694,7 +694,7 @@ func runJob(ctx context.Context, job *RecordJob, req RecordRequest) {
if _, err := ensureAssetsForVideoWithProgressCtx(ctx, out, job.SourceURL, update); err != nil {
fmt.Println("⚠️ ensureAssetsForVideo:", err)
}
setPhase("assets", assetsEnd)
setPhase("assets", 100)
// Finalize
jobsMu.Lock()
@ -704,7 +704,7 @@ func runJob(ctx context.Context, job *RecordJob, req RecordRequest) {
job.PostWorkKey = ""
job.PostWork = nil
jobsMu.Unlock()
notifyJobsChanged()
publishJobUpsert(job)
notifyDoneChanged()
return nil
},
@ -715,7 +715,7 @@ func runJob(ctx context.Context, job *RecordJob, req RecordRequest) {
jobsMu.Lock()
job.PostWork = &st
jobsMu.Unlock()
notifyJobsChanged()
publishJobUpsert(job)
} else {
jobsMu.Lock()
job.Status = postTarget
@ -724,7 +724,7 @@ func runJob(ctx context.Context, job *RecordJob, req RecordRequest) {
job.PostWorkKey = ""
job.PostWork = nil
jobsMu.Unlock()
notifyJobsChanged()
publishJobUpsert(job)
notifyDoneChanged()
}
}

View File

@ -31,8 +31,6 @@ func registerRoutes(mux *http.ServeMux, auth *AuthManager) *ModelStore {
api.HandleFunc("/api/cookies", cookiesHandler)
api.HandleFunc("/api/events/stream", eventsStream)
api.HandleFunc("/api/record/done/stream", doneStream)
api.HandleFunc("/api/perf/stream", perfStreamHandler)
api.HandleFunc("/api/status/disk", diskStatusHandler)
@ -53,7 +51,6 @@ func registerRoutes(mux *http.ServeMux, auth *AuthManager) *ModelStore {
api.HandleFunc("/api/preview-scrubber/", recordPreviewScrubberFrame)
api.HandleFunc("/api/preview-sprite/", recordPreviewSprite)
api.HandleFunc("/api/record/list", recordList)
api.HandleFunc("/api/record/stream", recordStream)
api.HandleFunc("/api/record/done/meta", recordDoneMeta)
api.HandleFunc("/api/record/video", recordVideo)
api.HandleFunc("/api/record/done", recordDoneList)
@ -73,8 +70,6 @@ func registerRoutes(mux *http.ServeMux, auth *AuthManager) *ModelStore {
// Tasks
api.HandleFunc("/api/tasks/generate-assets", tasksGenerateAssets)
api.HandleFunc("/api/tasks/assets/stream", assetsStream)
// --------------------------
// 3) ModelStore (Postgres)
// DSN kommt aus Settings: databaseUrl + gespeichertes Passwort

View File

@ -332,12 +332,13 @@ func recordSettingsHandler(w http.ResponseWriter, r *http.Request) {
}
// 3) Wenn Frontend ein Passwort sendet, hat das Priorität.
current := getSettings()
plainPW := strings.TrimSpace(in.DBPassword)
if plainPW == "" {
plainPW = pwFromURL
}
// 4) Wenn wir ein neues Passwort haben: encrypten & speichern (nur encrypted!)
if plainPW != "" {
enc, err := encryptSettingString(plainPW)
if err != nil {
@ -345,8 +346,14 @@ func recordSettingsHandler(w http.ResponseWriter, r *http.Request) {
return
}
in.EncryptedDBPassword = enc
} else {
in.EncryptedDBPassword = current.EncryptedDBPassword
}
dbChanged :=
strings.TrimSpace(in.DatabaseURL) != strings.TrimSpace(current.DatabaseURL) ||
strings.TrimSpace(in.EncryptedDBPassword) != strings.TrimSpace(current.EncryptedDBPassword)
// ✅ Settings im RAM aktualisieren
settingsMu.Lock()
settings = in.RecorderSettings
@ -355,6 +362,24 @@ func recordSettingsHandler(w http.ResponseWriter, r *http.Request) {
// ✅ Settings auf Disk persistieren
saveSettingsToDisk()
// ✅ Wenn DB geändert wurde: ModelStore sofort auf neue DB umstellen
if dbChanged {
dsn, err := buildPostgresDSNFromSettings()
if err != nil {
http.Error(w, "ungültige Datenbank-Konfiguration: "+err.Error(), http.StatusBadRequest)
return
}
newStore := NewModelStore(dsn)
if err := newStore.Load(); err != nil {
http.Error(w, "Datenbank-Verbindung fehlgeschlagen: "+err.Error(), http.StatusBadRequest)
return
}
setModelStore(newStore)
setChaturbateOnlineModelStore(newStore)
}
// ✅ ffmpeg/ffprobe nach Änderungen neu bestimmen
// Tipp: wenn der User FFmpegPath explizit setzt, nutze den direkt.
if strings.TrimSpace(in.FFmpegPath) != "" {

View File

@ -6,6 +6,7 @@ import (
"encoding/json"
"net/http"
"sort"
"strings"
"sync/atomic"
"time"
@ -20,11 +21,286 @@ type appSSE struct {
var sseApp *appSSE
type jobEvent struct {
Type string `json:"type"`
Model string `json:"model"`
JobID string `json:"jobId"`
Status JobStatus `json:"status"`
Phase string `json:"phase,omitempty"`
Progress int `json:"progress,omitempty"`
SourceURL string `json:"sourceUrl,omitempty"`
Output string `json:"output,omitempty"`
StartedAt string `json:"startedAt,omitempty"`
StartedAtMs int64 `json:"startedAtMs,omitempty"`
EndedAt string `json:"endedAt,omitempty"`
EndedAtMs int64 `json:"endedAtMs,omitempty"`
SizeBytes int64 `json:"sizeBytes,omitempty"`
DurationSeconds float64 `json:"durationSeconds,omitempty"`
PreviewState string `json:"previewState,omitempty"`
RoomStatus string `json:"roomStatus,omitempty"`
IsOnline bool `json:"isOnline,omitempty"`
ModelImageURL string `json:"modelImageUrl,omitempty"`
ModelChatRoomURL string `json:"modelChatRoomUrl,omitempty"`
TS int64 `json:"ts"`
}
type ssePublishItem struct {
EventName string
Data []byte
}
func visibleJobEventsJSON() []ssePublishItem {
nowTs := time.Now().UnixMilli()
out := make([]ssePublishItem, 0, 64)
jobsMu.Lock()
defer jobsMu.Unlock()
for _, j := range jobs {
if j == nil || j.Hidden {
continue
}
eventName := sseModelEventNameForJob(j)
if eventName == "" {
continue
}
payload := jobEvent{
Type: "job_upsert",
Model: eventName,
JobID: j.ID,
Status: j.Status,
Phase: j.Phase,
Progress: j.Progress,
SourceURL: j.SourceURL,
Output: j.Output,
StartedAt: j.StartedAt.Format(time.RFC3339Nano),
StartedAtMs: j.StartedAtMs,
SizeBytes: j.SizeBytes,
DurationSeconds: j.DurationSeconds,
PreviewState: j.PreviewState,
TS: nowTs,
}
if sm := sseStoredModelForJob(j); sm != nil {
payload.RoomStatus = strings.ToLower(strings.TrimSpace(sm.RoomStatus))
payload.IsOnline = sm.IsOnline
payload.ModelImageURL = strings.TrimSpace(sm.ImageURL)
payload.ModelChatRoomURL = strings.TrimSpace(sm.ChatRoomURL)
}
if j.EndedAt != nil {
payload.EndedAt = j.EndedAt.Format(time.RFC3339Nano)
payload.EndedAtMs = j.EndedAtMs
}
b, err := json.Marshal(payload)
if err != nil {
continue
}
out = append(out, ssePublishItem{
EventName: eventName,
Data: b,
})
}
return out
}
func visibleRoomStateEventsJSON() []ssePublishItem {
nowTs := time.Now().UnixMilli()
out := make([]ssePublishItem, 0, 128)
if cbModelStore == nil {
return out
}
models := cbModelStore.List()
for _, sm := range models {
if strings.ToLower(strings.TrimSpace(sm.Host)) != "chaturbate.com" {
continue
}
modelKey := strings.ToLower(strings.TrimSpace(sm.ModelKey))
if modelKey == "" {
continue
}
payload := jobEvent{
Type: "room_state",
Model: modelKey,
RoomStatus: strings.ToLower(strings.TrimSpace(sm.RoomStatus)),
IsOnline: sm.IsOnline,
ModelImageURL: strings.TrimSpace(sm.ImageURL),
ModelChatRoomURL: strings.TrimSpace(sm.ChatRoomURL),
TS: nowTs,
}
b, err := json.Marshal(payload)
if err != nil {
continue
}
out = append(out, ssePublishItem{
EventName: modelKey,
Data: b,
})
}
return out
}
func publishJobUpsert(j *RecordJob) {
if j == nil || j.Hidden {
return
}
eventName := sseModelEventNameForJob(j)
if eventName == "" {
return
}
payload := jobEvent{
Type: "job_upsert",
Model: eventName,
JobID: j.ID,
Status: j.Status,
Phase: j.Phase,
Progress: j.Progress,
SourceURL: j.SourceURL,
Output: j.Output,
StartedAt: j.StartedAt.Format(time.RFC3339Nano),
StartedAtMs: j.StartedAtMs,
EndedAtMs: j.EndedAtMs,
SizeBytes: j.SizeBytes,
DurationSeconds: j.DurationSeconds,
PreviewState: j.PreviewState,
TS: time.Now().UnixMilli(),
}
if sm := sseStoredModelForJob(j); sm != nil {
payload.RoomStatus = strings.ToLower(strings.TrimSpace(sm.RoomStatus))
payload.IsOnline = sm.IsOnline
payload.ModelImageURL = strings.TrimSpace(sm.ImageURL)
payload.ModelChatRoomURL = strings.TrimSpace(sm.ChatRoomURL)
}
if j.EndedAt != nil {
payload.EndedAt = j.EndedAt.Format(time.RFC3339Nano)
}
b, _ := json.Marshal(payload)
publishSSE(eventName, b)
}
func publishJobRemove(j *RecordJob) {
if j == nil || j.Hidden {
return
}
eventName := sseModelEventNameForJob(j)
if eventName == "" {
return
}
b, _ := json.Marshal(jobEvent{
Type: "job_remove",
Model: eventName,
JobID: j.ID,
TS: time.Now().UnixMilli(),
})
publishSSE(eventName, b)
}
func publishRoomStateForModel(sm *StoredModel) {
if sm == nil {
return
}
if strings.ToLower(strings.TrimSpace(sm.Host)) != "chaturbate.com" {
return
}
modelKey := strings.ToLower(strings.TrimSpace(sm.ModelKey))
if modelKey == "" {
return
}
payload := jobEvent{
Type: "room_state",
Model: modelKey,
RoomStatus: strings.ToLower(strings.TrimSpace(sm.RoomStatus)),
IsOnline: sm.IsOnline,
ModelImageURL: strings.TrimSpace(sm.ImageURL),
ModelChatRoomURL: strings.TrimSpace(sm.ChatRoomURL),
TS: time.Now().UnixMilli(),
}
b, _ := json.Marshal(payload)
publishSSE(modelKey, b)
}
func sseModelEventNameForJob(j *RecordJob) string {
if j == nil {
return ""
}
src := strings.TrimSpace(j.SourceURL)
switch detectProvider(src) {
case "chaturbate":
if u := strings.TrimSpace(extractUsername(src)); u != "" {
return strings.ToLower(u)
}
case "mfc":
if u := strings.TrimSpace(extractMFCUsername(src)); u != "" {
return strings.ToLower(u)
}
}
return ""
}
func sseStoredModelForJob(j *RecordJob) *StoredModel {
if j == nil || cbModelStore == nil {
return nil
}
src := strings.TrimSpace(j.SourceURL)
if src == "" {
return nil
}
host := ""
modelKey := ""
switch detectProvider(src) {
case "chaturbate":
host = "chaturbate.com"
modelKey = strings.ToLower(strings.TrimSpace(extractUsername(src)))
default:
return nil
}
if host == "" || modelKey == "" {
return nil
}
m, ok := cbModelStore.GetByHostAndModelKey(host, modelKey)
if !ok {
return nil
}
return m
}
func initSSE() {
srv := sse.New()
srv.SplitData = true
// ✅ Nur noch EIN Stream
stream := srv.CreateStream("events")
stream.AutoReplay = false
@ -32,25 +308,6 @@ func initSSE() {
server: srv,
}
// Debounced broadcaster (jobs)
go func() {
for range recordJobsNotify {
time.Sleep(40 * time.Millisecond)
for {
select {
case <-recordJobsNotify:
default:
goto SEND
}
}
SEND:
b := jobsSnapshotJSON()
if len(b) > 0 {
publishSSE("jobs", b)
}
}
}()
// Debounced broadcaster (done changed)
go func() {
for range doneNotify {
@ -91,6 +348,53 @@ func initSSE() {
}
}
}()
// Per running job: 1 SSE event per second
go func() {
t := time.NewTicker(1 * time.Second)
defer t.Stop()
for range t.C {
events := visibleJobEventsJSON()
for _, ev := range events {
if len(ev.Data) == 0 || ev.EventName == "" {
continue
}
publishSSE(ev.EventName, ev.Data)
}
}
}()
// Room state snapshot: 1 SSE event per second for known chaturbate models
go func() {
t := time.NewTicker(1 * time.Second)
defer t.Stop()
lastByModel := map[string]string{}
for range t.C {
events := visibleRoomStateEventsJSON()
nextByModel := make(map[string]string, len(events))
for _, ev := range events {
if len(ev.Data) == 0 || ev.EventName == "" {
continue
}
key := ev.EventName
payloadKey := string(ev.Data)
nextByModel[key] = payloadKey
if prev, ok := lastByModel[key]; ok && prev == payloadKey {
continue
}
publishSSE(ev.EventName, ev.Data)
}
lastByModel = nextByModel
}
}()
}
func publishSSE(eventName string, data []byte) {
@ -113,7 +417,6 @@ var (
doneNotify = make(chan struct{}, 1)
doneSeq uint64
recordJobsNotify = make(chan struct{}, 1)
assetsNotify = make(chan struct{}, 1)
)
@ -124,13 +427,6 @@ func notifyDoneChanged() {
}
}
func notifyJobsChanged() {
select {
case recordJobsNotify <- struct{}{}:
default:
}
}
func notifyAssetsChanged() {
select {
case assetsNotify <- struct{}{}:
@ -192,22 +488,6 @@ func eventsStream(w http.ResponseWriter, r *http.Request) {
sseApp.server.ServeHTTP(w, r2)
}
// -------------------- optional compatibility handlers --------------------
// Falls du alte Routen noch kurz behalten willst, zeigen sie einfach
// auf denselben Unified-Stream.
func recordStream(w http.ResponseWriter, r *http.Request) {
eventsStream(w, r)
}
func doneStream(w http.ResponseWriter, r *http.Request) {
eventsStream(w, r)
}
func assetsStream(w http.ResponseWriter, r *http.Request) {
eventsStream(w, r)
}
// -------------------- optional helper --------------------
func publishRawSSE(eventName string, buf *bytes.Buffer) {

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -5,8 +5,8 @@
<link rel="icon" type="image/svg+xml" href="/vite.svg" />
<meta name="viewport" content="width=device-width, initial-scale=1, viewport-fit=cover" />
<title>App</title>
<script type="module" crossorigin src="/assets/index-jlVIND2Y.js"></script>
<link rel="stylesheet" crossorigin href="/assets/index-D0pbgV48.css">
<script type="module" crossorigin src="/assets/index-z2cKWgjr.js"></script>
<link rel="stylesheet" crossorigin href="/assets/index-C6R3TW-y.css">
</head>
<body>
<div id="root"></div>

File diff suppressed because it is too large Load Diff

View File

@ -36,6 +36,7 @@ type Props = {
autostartState?: AutostartState
onRefreshAutostartState?: () => Promise<void> | void
modelsByKey?: Record<string, { favorite?: boolean; liked?: boolean | null; watching?: boolean }>
roomStatusByModelKey?: Record<string, string>
onOpenPlayer: (job: RecordJob) => void
onStopJob: (id: string) => void
blurPreviews?: boolean
@ -91,14 +92,35 @@ const normalizeRoomStatus = (v: unknown): string => {
case 'offline':
return 'Offline'
default:
return 'Offline'
return 'Unknown'
}
}
const roomStatusOfJob = (job: RecordJob): string => {
const modelKeyFromJob = (job: RecordJob): string => {
const j = job as any
const raw =
const rawUrl = String(j.sourceUrl ?? '').trim()
if (rawUrl) {
const m = rawUrl.match(/chaturbate\.com\/([^/?#]+)/i)
if (m?.[1]) {
try {
return decodeURIComponent(m[1]).trim().toLowerCase()
} catch {
return String(m[1]).trim().toLowerCase()
}
}
}
return String(modelNameFromOutput(j.output || '')).trim().toLowerCase()
}
const roomStatusOfJob = (
job: RecordJob,
roomStatusByModelKey?: Record<string, string>
): string => {
const j = job as any
const rawFromJob =
j.currentShow ??
j.roomStatus ??
j.modelStatus ??
@ -108,10 +130,19 @@ const roomStatusOfJob = (job: RecordJob): string => {
j.model?.roomStatus ??
j.model?.status ??
j.room?.currentShow ??
j.room?.status ??
''
j.room?.status
return normalizeRoomStatus(raw)
if (rawFromJob != null && String(rawFromJob).trim() !== '') {
return normalizeRoomStatus(rawFromJob)
}
const modelKey = modelKeyFromJob(job)
if (modelKey && roomStatusByModelKey?.[modelKey]) {
return normalizeRoomStatus(roomStatusByModelKey[modelKey])
}
return 'Unknown'
}
const roomStatusTone = (status: string): string => {
@ -125,6 +156,8 @@ const roomStatusTone = (status: string): string => {
case 'Away':
return 'bg-amber-500/15 text-amber-900 ring-amber-500/30 dark:bg-amber-400/10 dark:text-amber-200 dark:ring-amber-400/25'
case 'Offline':
return 'bg-gray-900/5 text-gray-800 ring-gray-900/10 dark:bg-white/10 dark:text-gray-200 dark:ring-white/10'
case 'Unknown':
default:
return 'bg-gray-900/5 text-gray-800 ring-gray-900/10 dark:bg-white/10 dark:text-gray-200 dark:ring-white/10'
}
@ -223,16 +256,6 @@ const phaseLabel = (p?: string) => {
}
}
async function apiJSON<T>(url: string, init?: RequestInit): Promise<T> {
const res = await fetch(url, init)
if (!res.ok) {
const text = await res.text().catch(() => '')
throw new Error(text || `HTTP ${res.status}`)
}
return res.json() as Promise<T>
}
function postWorkLabel(
job: RecordJob,
override?: { pos?: number; total?: number }
@ -344,6 +367,7 @@ function DownloadsCardRow({
nowMs,
blurPreviews,
modelsByKey,
roomStatusByModelKey,
stopRequestedIds,
postworkInfoOf,
markStopRequested,
@ -357,6 +381,7 @@ function DownloadsCardRow({
nowMs: number
blurPreviews?: boolean
modelsByKey: Record<string, { favorite?: boolean; liked?: boolean | null; watching?: boolean }>
roomStatusByModelKey: Record<string, string>
stopRequestedIds: Record<string, true>
postworkInfoOf: (job: RecordJob) => { pos?: number; total?: number } | undefined
markStopRequested: (ids: string | string[]) => void
@ -470,7 +495,7 @@ function DownloadsCardRow({
const isBusyPhase = phaseLower !== '' && phaseLower !== 'recording'
const isStopping = isBusyPhase || rawStatus !== 'running' || isStopRequested
const roomStatus = roomStatusOfJob(j)
const roomStatus = roomStatusOfJob(j, roomStatusByModelKey)
let phaseText = phase ? (phaseLabel(phase) || phase) : ''
@ -537,6 +562,7 @@ function DownloadsCardRow({
<ModelPreview
jobId={j.id}
blur={blurPreviews}
roomStatus={roomStatus}
alignStartAt={j.startedAt}
alignEndAt={j.endedAt ?? null}
alignEveryMs={10_000}
@ -789,6 +815,7 @@ export default function Downloads({
onToggleWatch,
onAddToDownloads,
modelsByKey = {},
roomStatusByModelKey = {},
blurPreviews
}: Props) {
@ -1025,6 +1052,7 @@ export default function Downloads({
<ModelPreview
jobId={j.id}
blur={blurPreviews}
roomStatus={roomStatusOfJob(j, roomStatusByModelKey)}
alignStartAt={j.startedAt}
alignEndAt={j.endedAt ?? null}
alignEveryMs={10_000}
@ -1080,7 +1108,7 @@ export default function Downloads({
const f = baseName(j.output || '')
const name = modelNameFromOutput(j.output)
const roomStatus = roomStatusOfJob(j)
const roomStatus = roomStatusOfJob(j, roomStatusByModelKey)
return (
<>
@ -1276,7 +1304,7 @@ export default function Downloads({
},
},
]
}, [blurPreviews, markStopRequested, modelsByKey, nowMs, onStopJob, onToggleFavorite, onToggleLike, onToggleWatch, stopRequestedIds, stopInitiatedIds, postworkInfoOf])
}, [blurPreviews, markStopRequested, modelsByKey, roomStatusByModelKey, nowMs, onStopJob, onToggleFavorite, onToggleLike, onToggleWatch, stopRequestedIds, stopInitiatedIds, postworkInfoOf])
const downloadJobRows = useMemo<DownloadRow[]>(() => {
const list = jobs
@ -1455,6 +1483,7 @@ export default function Downloads({
nowMs={nowMs}
blurPreviews={blurPreviews}
modelsByKey={modelsByKey}
roomStatusByModelKey={roomStatusByModelKey}
postworkInfoOf={postworkInfoOf}
stopRequestedIds={stopRequestedIds}
markStopRequested={markStopRequested}
@ -1475,11 +1504,12 @@ export default function Downloads({
</div>
{postworkRows.map((r) => (
<DownloadsCardRow
key={`pw:${r.kind === 'job' ? r.job.id : pendingRowKey(r.pending)}`}
key={`dl:${r.kind === 'job' ? r.job.id : pendingRowKey(r.pending)}`}
r={r}
nowMs={nowMs}
blurPreviews={blurPreviews}
modelsByKey={modelsByKey}
roomStatusByModelKey={roomStatusByModelKey}
postworkInfoOf={postworkInfoOf}
stopRequestedIds={stopRequestedIds}
markStopRequested={markStopRequested}
@ -1500,11 +1530,12 @@ export default function Downloads({
</div>
{pendingRows.map((r) => (
<DownloadsCardRow
key={`wa:${r.kind === 'job' ? r.job.id : pendingRowKey(r.pending)}`}
key={`dl:${r.kind === 'job' ? r.job.id : pendingRowKey(r.pending)}`}
r={r}
nowMs={nowMs}
blurPreviews={blurPreviews}
modelsByKey={modelsByKey}
roomStatusByModelKey={roomStatusByModelKey}
postworkInfoOf={postworkInfoOf}
stopRequestedIds={stopRequestedIds}
markStopRequested={markStopRequested}

View File

@ -9,14 +9,40 @@ export default function LiveVideo({
src,
muted = DEFAULT_INLINE_MUTED,
className,
roomStatus,
}: {
src: string
muted?: boolean
className?: string
roomStatus?: string
}) {
const ref = useRef<HTMLVideoElement>(null)
const [broken, setBroken] = useState(false)
const [brokenReason, setBrokenReason] = useState<'private' | 'offline' | null>(null)
const [brokenReason, setBrokenReason] = useState<'private' | 'hidden' | 'away' | 'offline' | null>(null)
const normalizeBrokenReason = (status?: string): 'private' | 'hidden' | 'away' | 'offline' => {
const s = String(status ?? '').trim().toLowerCase()
if (s === 'private') return 'private'
if (s === 'hidden') return 'hidden'
if (s === 'away') return 'away'
return 'offline'
}
const brokenMessage = (reason: 'private' | 'hidden' | 'away' | 'offline' | null): string => {
switch (reason) {
case 'hidden':
return 'Cam is hidden'
case 'away':
return 'Model is away'
case 'private':
return 'Private show in progress.'
case 'offline':
return 'Model is offline'
default:
return 'Live video unavailable'
}
}
useEffect(() => {
let cancelled = false
@ -51,24 +77,33 @@ export default function LiveVideo({
}
video.addEventListener('timeupdate', onTime)
let stalledRetries = 0
watchdogTimer = window.setInterval(() => {
if (cancelled) return
// wenn nicht paused, aber 12s keine timeupdate => neu verbinden
// wenn nicht paused, aber 12s kein Fortschritt -> reconnect versuchen
if (!video.paused && Date.now() - lastT > 12_000) {
if (stalledRetries < 1) {
stalledRetries += 1
hardReset()
video.src = src
video.load()
video.play().catch(() => {})
lastT = Date.now()
return
}
setBroken(true)
setBrokenReason(normalizeBrokenReason(roomStatus))
}
}, 4_000)
// 3) HTTP-Fehler (403/404) erkennen ist bei <video> nicht sauber möglich.
// Wir machen hier bewusst KEIN aggressives Retry. Broken UI nur, wenn Video "error" signalisiert.
const onError = () => {
// best effort: 403/404 kannst du im Backend zusätzlich über Query/JSON o.ä. signalisieren,
// aber hier: generic broken
setBroken(true)
setBrokenReason(normalizeBrokenReason(roomStatus))
}
video.addEventListener('error', onError)
@ -80,12 +115,14 @@ export default function LiveVideo({
video.removeEventListener('error', onError)
hardReset()
}
}, [src, muted])
}, [src, muted, roomStatus])
if (broken) {
return (
<div className="text-xs text-gray-400 italic">
{brokenReason === 'private' ? 'Private' : brokenReason === 'offline' ? 'Offline' : ''}
<div className="grid h-full w-full place-items-center bg-black/80 px-4 text-center">
<div className="text-sm font-medium text-white">
{brokenMessage(brokenReason)}
</div>
</div>
)
}

View File

@ -13,6 +13,7 @@ type Props = {
blur?: boolean
className?: string
fit?: 'cover' | 'contain'
roomStatus?: string
alignStartAt?: string | number | Date
alignEndAt?: string | number | Date | null
@ -32,6 +33,7 @@ export default function ModelPreview({
autoTickMs = 10_000,
blur = false,
className,
roomStatus,
alignStartAt,
alignEndAt = null,
alignEveryMs,
@ -269,7 +271,12 @@ export default function ModelPreview({
}}
>
<div className="absolute inset-0">
<LiveVideo src={hq} muted={true} className="w-full h-full object-contain object-bottom relative z-0" />
<LiveVideo
src={hq}
muted={true}
roomStatus={roomStatus}
className="w-full h-full object-contain object-bottom relative z-0"
/>
<div className="absolute left-2 top-2 inline-flex items-center gap-1.5 rounded-full bg-red-600/90 px-2 py-1 text-[11px] font-semibold text-white shadow-sm">
<span className="inline-block size-1.5 rounded-full bg-white animate-pulse" />

View File

@ -501,6 +501,17 @@ export default function ModelsTab() {
return () => window.removeEventListener('models-changed', onChanged as any)
}, [refresh])
React.useEffect(() => {
const onDbChanged = () => {
setPage(1)
setModels([])
void refresh()
}
window.addEventListener('models-db-changed', onDbChanged as any)
return () => window.removeEventListener('models-db-changed', onDbChanged as any)
}, [refresh])
React.useEffect(() => {
const raw = input.trim()
if (!raw) {

View File

@ -81,6 +81,7 @@ export default function RecorderSettings({ onAssetsGenerated }: Props) {
const [diskStatus, setDiskStatus] = useState<DiskStatus | null>(null)
const assetsAbortRef = useRef<AbortController | null>(null)
const [dbModalOpen, setDbModalOpen] = useState(false)
const [loadedDatabaseUrl, setLoadedDatabaseUrl] = useState('')
const [pendingDbPassword, setPendingDbPassword] = useState('') // wird nur beim Speichern gesendet
const now = Date.now()
const saveSucceeded = saveSuccessUntilMs > now
@ -184,6 +185,7 @@ export default function RecorderSettings({ onAssetsGenerated }: Props) {
lowDiskPauseBelowGB: (data as any).lowDiskPauseBelowGB ?? DEFAULTS.lowDiskPauseBelowGB,
enableNotifications: (data as any).enableNotifications ?? DEFAULTS.enableNotifications,
})
setLoadedDatabaseUrl(String((data as any).databaseUrl ?? '').trim())
})
.catch(() => {
// backend evtl. noch alt -> defaults lassen
@ -252,6 +254,7 @@ export default function RecorderSettings({ onAssetsGenerated }: Props) {
const doneDir = value.doneDir.trim()
const ffmpegPath = (value.ffmpegPath ?? '').trim()
const databaseUrl = String((value as any).databaseUrl ?? '').trim()
const hadPendingDbPassword = Boolean((pendingDbPassword || '').trim())
if (!recordDir || !doneDir) {
setErr('Bitte Aufnahme-Ordner und Ziel-Ordner angeben.')
@ -319,6 +322,19 @@ export default function RecorderSettings({ onAssetsGenerated }: Props) {
saveSuccessTimerRef.current = null
}, 2500)
window.dispatchEvent(new CustomEvent('recorder-settings-updated'))
const databaseUrlChanged =
databaseUrl !== loadedDatabaseUrl || hadPendingDbPassword
if (databaseUrlChanged) {
window.dispatchEvent(
new CustomEvent('models-db-changed', {
detail: { databaseUrl },
})
)
setLoadedDatabaseUrl(databaseUrl)
setPendingDbPassword('')
}
} catch (e: any) {
setSaveSuccessUntilMs(0)
if (saveSuccessTimerRef.current != null) {

View File

@ -1,191 +0,0 @@
// frontend/src/lib/chaturbateOnlinePoller.ts
export type ChaturbateOnlineRoom = {
username?: string
current_show?: string
chat_room_url?: string
image_url?: string
}
export type ChaturbateOnlineResponse = {
enabled: boolean
rooms: ChaturbateOnlineRoom[]
total?: number
}
type OnlineState = ChaturbateOnlineResponse
function chunk<T>(arr: T[], size: number): T[][] {
const out: T[][] = []
for (let i = 0; i < arr.length; i += size) out.push(arr.slice(i, i + size))
return out
}
function dedupeRooms(rooms: ChaturbateOnlineRoom[]): ChaturbateOnlineRoom[] {
const seen = new Set<string>()
const out: ChaturbateOnlineRoom[] = []
for (const r of rooms) {
const u = String(r?.username ?? '').trim().toLowerCase()
if (!u || seen.has(u)) continue
seen.add(u)
out.push(r)
}
return out
}
export function startChaturbateOnlinePolling(opts: {
getModels: () => string[]
getShow: () => string[]
onData: (data: OnlineState) => void
intervalMs?: number
// ✅ NEU: wenn getModels() leer ist, trotzdem einmal call machen (für "ALL online")
fetchAllWhenNoModels?: boolean
/** Optional: wird bei Fehlern aufgerufen (für Debug) */
onError?: (err: unknown) => void
}) {
const baseIntervalMs = opts.intervalMs ?? 5000
let timer: number | null = null
let inFlight: AbortController | null = null
let lastKey = ''
let lastResult: OnlineState | null = null
let stopped = false
const clearTimer = () => {
if (timer != null) {
window.clearTimeout(timer)
timer = null
}
}
const closeInFlight = () => {
if (inFlight) {
try {
inFlight.abort()
} catch {}
inFlight = null
}
}
const schedule = (ms: number) => {
if (stopped) return
clearTimer()
timer = window.setTimeout(() => void tick(), ms)
}
const tick = async () => {
if (stopped) return
try {
const models = (opts.getModels?.() ?? [])
.map((x) => String(x || '').trim())
.filter(Boolean)
const showRaw = (opts.getShow?.() ?? [])
.map((x) => String(x || '').trim())
.filter(Boolean)
// stabilisieren
const show = showRaw.slice().sort()
const modelsSorted = models.slice().sort()
// ✅ ALL-mode, wenn keine Models und Option aktiv
const isAllMode = modelsSorted.length === 0 && Boolean(opts.fetchAllWhenNoModels)
// keine Models -> normalerweise rooms leeren (enabled nicht neu erfinden)
if (modelsSorted.length === 0 && !isAllMode) {
closeInFlight()
const empty: OnlineState = { enabled: lastResult?.enabled ?? false, rooms: [] }
lastResult = empty
opts.onData(empty)
const nextMs = document.hidden ? Math.max(15000, baseIntervalMs) : baseIntervalMs
schedule(nextMs)
return
}
// ✅ In ALL-mode senden wir q:[] (1 Request). Sonst normale Liste.
const modelsForRequest = isAllMode ? [] : modelsSorted
const key = `${show.join(',')}|${isAllMode ? '__ALL__' : modelsForRequest.join(',')}`
const requestKey = key
lastKey = key
// dedupe / cancel previous
closeInFlight()
const controller = new AbortController()
inFlight = controller
const CHUNK_SIZE = 350 // wenn du extrem viele Keys hast: 200300 nehmen
// ✅ ALL-mode: genau ein Part mit [] schicken
const parts = isAllMode ? [[]] : chunk(modelsForRequest, CHUNK_SIZE)
let mergedRooms: ChaturbateOnlineRoom[] = []
let mergedEnabled = false
let mergedTotal = 0
let hadAnyOk = false
for (const part of parts) {
if (controller.signal.aborted) return
if (requestKey !== lastKey) return
if (stopped) return
const res = await fetch('/api/chaturbate/online', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ q: part, show, refresh: false }),
signal: controller.signal,
cache: 'no-store',
})
if (!res.ok) continue
hadAnyOk = true
const data = (await res.json()) as OnlineState
mergedEnabled = mergedEnabled || Boolean(data?.enabled)
mergedRooms.push(...(Array.isArray(data?.rooms) ? data.rooms : []))
// ✅ NEU: total mergen (Backend liefert Gesamtzahl)
const t = Number((data as any)?.total ?? 0)
if (Number.isFinite(t) && t > mergedTotal) mergedTotal = t
}
if (!hadAnyOk) {
const nextMs = document.hidden ? Math.max(15000, baseIntervalMs) : baseIntervalMs
schedule(nextMs)
return
}
const merged: OnlineState = { enabled: mergedEnabled, rooms: dedupeRooms(mergedRooms), total: mergedTotal }
if (controller.signal.aborted) return
if (requestKey !== lastKey) return
if (stopped) return
lastResult = merged
opts.onData(merged)
} catch (e: any) {
if (e?.name === 'AbortError') return
opts.onError?.(e)
} finally {
// ✅ adaptive backoff: hidden tab = viel seltener pollen
const nextMs = document.hidden ? Math.max(15000, baseIntervalMs) : baseIntervalMs
schedule(nextMs)
}
}
// sofort einmal
void tick()
// stop function
return () => {
stopped = true
clearTimer()
closeInFlight()
}
}