// backend\sse.go package main import ( "bytes" "encoding/json" "net/http" "sort" "strings" "sync/atomic" "time" "github.com/r3labs/sse/v2" ) // -------------------- SSE server -------------------- type appSSE struct { server *sse.Server } var sseApp *appSSE type jobEvent struct { Type string `json:"type"` Model string `json:"model"` JobID string `json:"jobId"` Status JobStatus `json:"status"` Phase string `json:"phase,omitempty"` Progress int `json:"progress,omitempty"` SourceURL string `json:"sourceUrl,omitempty"` Output string `json:"output,omitempty"` StartedAt string `json:"startedAt,omitempty"` StartedAtMs int64 `json:"startedAtMs,omitempty"` EndedAt string `json:"endedAt,omitempty"` EndedAtMs int64 `json:"endedAtMs,omitempty"` SizeBytes int64 `json:"sizeBytes,omitempty"` DurationSeconds float64 `json:"durationSeconds,omitempty"` PreviewState string `json:"previewState,omitempty"` RoomStatus string `json:"roomStatus,omitempty"` IsOnline bool `json:"isOnline,omitempty"` ModelImageURL string `json:"modelImageUrl,omitempty"` ModelChatRoomURL string `json:"modelChatRoomUrl,omitempty"` TS int64 `json:"ts"` } type ssePublishItem struct { EventName string Data []byte } func visibleJobEventsJSON() []ssePublishItem { nowTs := time.Now().UnixMilli() out := make([]ssePublishItem, 0, 64) jobsMu.Lock() defer jobsMu.Unlock() for _, j := range jobs { if j == nil || j.Hidden { continue } eventName := sseModelEventNameForJob(j) if eventName == "" { continue } payload := jobEvent{ Type: "job_upsert", Model: eventName, JobID: j.ID, Status: j.Status, Phase: j.Phase, Progress: j.Progress, SourceURL: j.SourceURL, Output: j.Output, StartedAt: j.StartedAt.Format(time.RFC3339Nano), StartedAtMs: j.StartedAtMs, SizeBytes: j.SizeBytes, DurationSeconds: j.DurationSeconds, PreviewState: j.PreviewState, TS: nowTs, } if sm := sseStoredModelForJob(j); sm != nil { payload.RoomStatus = strings.ToLower(strings.TrimSpace(sm.RoomStatus)) payload.IsOnline = sm.IsOnline payload.ModelImageURL = strings.TrimSpace(sm.ImageURL) payload.ModelChatRoomURL = strings.TrimSpace(sm.ChatRoomURL) } if j.EndedAt != nil { payload.EndedAt = j.EndedAt.Format(time.RFC3339Nano) payload.EndedAtMs = j.EndedAtMs } b, err := json.Marshal(payload) if err != nil { continue } out = append(out, ssePublishItem{ EventName: eventName, Data: b, }) } return out } func visibleRoomStateEventsJSON() []ssePublishItem { nowTs := time.Now().UnixMilli() out := make([]ssePublishItem, 0, 128) if cbModelStore == nil { return out } models := cbModelStore.List() for _, sm := range models { if strings.ToLower(strings.TrimSpace(sm.Host)) != "chaturbate.com" { continue } modelKey := strings.ToLower(strings.TrimSpace(sm.ModelKey)) if modelKey == "" { continue } payload := jobEvent{ Type: "room_state", Model: modelKey, RoomStatus: strings.ToLower(strings.TrimSpace(sm.RoomStatus)), IsOnline: sm.IsOnline, ModelImageURL: strings.TrimSpace(sm.ImageURL), ModelChatRoomURL: strings.TrimSpace(sm.ChatRoomURL), TS: nowTs, } b, err := json.Marshal(payload) if err != nil { continue } out = append(out, ssePublishItem{ EventName: modelKey, Data: b, }) } return out } func publishJobUpsert(j *RecordJob) { if j == nil || j.Hidden { return } eventName := sseModelEventNameForJob(j) if eventName == "" { return } payload := jobEvent{ Type: "job_upsert", Model: eventName, JobID: j.ID, Status: j.Status, Phase: j.Phase, Progress: j.Progress, SourceURL: j.SourceURL, Output: j.Output, StartedAt: j.StartedAt.Format(time.RFC3339Nano), StartedAtMs: j.StartedAtMs, EndedAtMs: j.EndedAtMs, SizeBytes: j.SizeBytes, DurationSeconds: j.DurationSeconds, PreviewState: j.PreviewState, TS: time.Now().UnixMilli(), } if sm := sseStoredModelForJob(j); sm != nil { payload.RoomStatus = strings.ToLower(strings.TrimSpace(sm.RoomStatus)) payload.IsOnline = sm.IsOnline payload.ModelImageURL = strings.TrimSpace(sm.ImageURL) payload.ModelChatRoomURL = strings.TrimSpace(sm.ChatRoomURL) } if j.EndedAt != nil { payload.EndedAt = j.EndedAt.Format(time.RFC3339Nano) } b, _ := json.Marshal(payload) publishSSE(eventName, b) } func publishJobRemove(j *RecordJob) { if j == nil || j.Hidden { return } eventName := sseModelEventNameForJob(j) if eventName == "" { return } b, _ := json.Marshal(jobEvent{ Type: "job_remove", Model: eventName, JobID: j.ID, TS: time.Now().UnixMilli(), }) publishSSE(eventName, b) } func publishRoomStateForModel(sm *StoredModel) { if sm == nil { return } if strings.ToLower(strings.TrimSpace(sm.Host)) != "chaturbate.com" { return } modelKey := strings.ToLower(strings.TrimSpace(sm.ModelKey)) if modelKey == "" { return } payload := jobEvent{ Type: "room_state", Model: modelKey, RoomStatus: strings.ToLower(strings.TrimSpace(sm.RoomStatus)), IsOnline: sm.IsOnline, ModelImageURL: strings.TrimSpace(sm.ImageURL), ModelChatRoomURL: strings.TrimSpace(sm.ChatRoomURL), TS: time.Now().UnixMilli(), } b, _ := json.Marshal(payload) publishSSE(modelKey, b) } func sseModelEventNameForJob(j *RecordJob) string { if j == nil { return "" } src := strings.TrimSpace(j.SourceURL) switch detectProvider(src) { case "chaturbate": if u := strings.TrimSpace(extractUsername(src)); u != "" { return strings.ToLower(u) } case "mfc": if u := strings.TrimSpace(extractMFCUsername(src)); u != "" { return strings.ToLower(u) } } return "" } func sseStoredModelForJob(j *RecordJob) *StoredModel { if j == nil || cbModelStore == nil { return nil } src := strings.TrimSpace(j.SourceURL) if src == "" { return nil } host := "" modelKey := "" switch detectProvider(src) { case "chaturbate": host = "chaturbate.com" modelKey = strings.ToLower(strings.TrimSpace(extractUsername(src))) default: return nil } if host == "" || modelKey == "" { return nil } m, ok := cbModelStore.GetByHostAndModelKey(host, modelKey) if !ok { return nil } return m } func initSSE() { srv := sse.New() srv.SplitData = true stream := srv.CreateStream("events") stream.AutoReplay = false sseApp = &appSSE{ server: srv, } // Debounced broadcaster (done changed) go func() { for range doneNotify { time.Sleep(40 * time.Millisecond) for { select { case <-doneNotify: default: goto SEND } } SEND: seq := atomic.AddUint64(&doneSeq, 1) b, _ := json.Marshal(map[string]any{ "type": "doneChanged", "seq": seq, "ts": time.Now().UnixMilli(), }) publishSSE("doneChanged", b) } }() // Debounced broadcaster (assets) go func() { for range assetsNotify { time.Sleep(80 * time.Millisecond) for { select { case <-assetsNotify: default: goto SEND } } SEND: b := assetsSnapshotJSON() if len(b) > 0 { publishSSE("state", b) } } }() // Per running job: 1 SSE event per second go func() { t := time.NewTicker(1 * time.Second) defer t.Stop() for range t.C { events := visibleJobEventsJSON() for _, ev := range events { if len(ev.Data) == 0 || ev.EventName == "" { continue } publishSSE(ev.EventName, ev.Data) } } }() // Room state snapshot: 1 SSE event per second for known chaturbate models go func() { t := time.NewTicker(1 * time.Second) defer t.Stop() lastByModel := map[string]string{} for range t.C { events := visibleRoomStateEventsJSON() nextByModel := make(map[string]string, len(events)) for _, ev := range events { if len(ev.Data) == 0 || ev.EventName == "" { continue } key := ev.EventName payloadKey := string(ev.Data) nextByModel[key] = payloadKey if prev, ok := lastByModel[key]; ok && prev == payloadKey { continue } publishSSE(ev.EventName, ev.Data) } lastByModel = nextByModel } }() } func publishSSE(eventName string, data []byte) { if sseApp == nil || sseApp.server == nil { return } if len(data) == 0 { return } sseApp.server.Publish("events", &sse.Event{ Event: []byte(eventName), Data: data, }) } // -------------------- notify channels -------------------- var ( doneNotify = make(chan struct{}, 1) doneSeq uint64 assetsNotify = make(chan struct{}, 1) ) func notifyDoneChanged() { select { case doneNotify <- struct{}{}: default: } } func notifyAssetsChanged() { select { case assetsNotify <- struct{}{}: default: } } // -------------------- snapshots -------------------- func jobsSnapshotJSON() []byte { jobsMu.Lock() list := make([]*RecordJob, 0, len(jobs)) for _, j := range jobs { if j == nil || j.Hidden { continue } c := *j c.cancel = nil list = append(list, &c) } jobsMu.Unlock() sort.Slice(list, func(i, j int) bool { return list[i].StartedAt.After(list[j].StartedAt) }) b, _ := json.Marshal(list) return b } func assetsSnapshotJSON() []byte { assetsTaskMu.Lock() st := assetsTaskState assetsTaskMu.Unlock() b, _ := json.Marshal(st) return b } // -------------------- unified stream handler -------------------- func eventsStream(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodGet { http.Error(w, "Nur GET erlaubt", http.StatusMethodNotAllowed) return } if sseApp == nil || sseApp.server == nil { http.Error(w, "SSE nicht initialisiert", http.StatusInternalServerError) return } q := r.URL.Query() q.Set("stream", "events") r2 := r.Clone(r.Context()) r2.URL.RawQuery = q.Encode() sseApp.server.ServeHTTP(w, r2) } // -------------------- optional helper -------------------- func publishRawSSE(eventName string, buf *bytes.Buffer) { if buf == nil { return } publishSSE(eventName, buf.Bytes()) }