2026-03-10 18:27:17 +01:00

483 lines
9.7 KiB
Go

// backend\sse.go
package main
import (
"bytes"
"encoding/json"
"net/http"
"sort"
"strings"
"sync/atomic"
"time"
"github.com/r3labs/sse/v2"
)
// -------------------- SSE server --------------------
type appSSE struct {
server *sse.Server
}
var sseApp *appSSE
type jobEvent struct {
Type string `json:"type"`
Model string `json:"model"`
JobID string `json:"jobId"`
Status JobStatus `json:"status"`
Phase string `json:"phase,omitempty"`
Progress int `json:"progress,omitempty"`
SourceURL string `json:"sourceUrl,omitempty"`
Output string `json:"output,omitempty"`
StartedAt string `json:"startedAt,omitempty"`
StartedAtMs int64 `json:"startedAtMs,omitempty"`
EndedAt string `json:"endedAt,omitempty"`
EndedAtMs int64 `json:"endedAtMs,omitempty"`
SizeBytes int64 `json:"sizeBytes,omitempty"`
DurationSeconds float64 `json:"durationSeconds,omitempty"`
PreviewState string `json:"previewState,omitempty"`
RoomStatus string `json:"roomStatus,omitempty"`
IsOnline bool `json:"isOnline,omitempty"`
ModelImageURL string `json:"modelImageUrl,omitempty"`
ModelChatRoomURL string `json:"modelChatRoomUrl,omitempty"`
PostWorkKey string `json:"postWorkKey,omitempty"`
PostWork any `json:"postWork,omitempty"`
TS int64 `json:"ts"`
}
type ssePublishItem struct {
EventName string
Data []byte
}
func isTerminalJobStatusForSSE(status JobStatus) bool {
s := strings.ToLower(strings.TrimSpace(string(status)))
return s == "stopped" ||
s == "finished" ||
s == "failed" ||
s == "done" ||
s == "completed" ||
s == "canceled" ||
s == "cancelled"
}
func isPostworkJobForSSE(j *RecordJob) bool {
if j == nil {
return false
}
phase := strings.ToLower(strings.TrimSpace(j.Phase))
pwKey := strings.TrimSpace(j.PostWorkKey)
if pwKey != "" {
return true
}
if j.PostWork != nil {
// falls PostWork als struct/map vorliegt, reicht für SSE der generelle Hinweis:
return true
}
if j.EndedAt != nil && phase != "" {
return true
}
if phase == "postwork" {
return true
}
return false
}
func isVisibleDownloadJobForSSE(j *RecordJob) bool {
if j == nil {
return false
}
if isPostworkJobForSSE(j) {
return false
}
if isTerminalJobStatusForSSE(j.Status) {
return false
}
if j.EndedAt != nil {
return false
}
return true
}
func isVisiblePostworkJobForSSE(j *RecordJob) bool {
if j == nil {
return false
}
if !isPostworkJobForSSE(j) {
return false
}
if isTerminalJobStatusForSSE(j.Status) {
return false
}
return true
}
func shouldPublishModelEventForJob(j *RecordJob) bool {
return isVisibleDownloadJobForSSE(j) || isVisiblePostworkJobForSSE(j)
}
func visibleJobEventsJSON() []ssePublishItem {
nowTs := time.Now().UnixMilli()
out := make([]ssePublishItem, 0, 64)
jobsMu.Lock()
defer jobsMu.Unlock()
for _, j := range jobs {
if j == nil || j.Hidden {
continue
}
if !shouldPublishModelEventForJob(j) {
continue
}
eventName := sseModelEventNameForJob(j)
if eventName == "" {
continue
}
payload := jobEvent{
Type: "job_upsert",
Model: eventName,
JobID: j.ID,
Status: j.Status,
Phase: j.Phase,
Progress: j.Progress,
SourceURL: j.SourceURL,
Output: j.Output,
StartedAt: j.StartedAt.Format(time.RFC3339Nano),
StartedAtMs: j.StartedAtMs,
SizeBytes: j.SizeBytes,
DurationSeconds: j.DurationSeconds,
PreviewState: j.PreviewState,
PostWorkKey: strings.TrimSpace(j.PostWorkKey),
PostWork: j.PostWork,
TS: nowTs,
}
if sm := sseStoredModelForJob(j); sm != nil {
payload.RoomStatus = strings.ToLower(strings.TrimSpace(sm.RoomStatus))
payload.IsOnline = sm.IsOnline
payload.ModelImageURL = strings.TrimSpace(sm.ImageURL)
payload.ModelChatRoomURL = strings.TrimSpace(sm.ChatRoomURL)
}
if j.EndedAt != nil {
payload.EndedAt = j.EndedAt.Format(time.RFC3339Nano)
payload.EndedAtMs = j.EndedAtMs
}
b, err := json.Marshal(payload)
if err != nil {
continue
}
out = append(out, ssePublishItem{
EventName: eventName,
Data: b,
})
}
return out
}
func publishJobUpsert(j *RecordJob) {
if j == nil || j.Hidden {
return
}
if !shouldPublishModelEventForJob(j) {
return
}
eventName := sseModelEventNameForJob(j)
if eventName == "" {
return
}
payload := jobEvent{
Type: "job_upsert",
Model: eventName,
JobID: j.ID,
Status: j.Status,
Phase: j.Phase,
Progress: j.Progress,
SourceURL: j.SourceURL,
Output: j.Output,
StartedAt: j.StartedAt.Format(time.RFC3339Nano),
StartedAtMs: j.StartedAtMs,
EndedAtMs: j.EndedAtMs,
SizeBytes: j.SizeBytes,
DurationSeconds: j.DurationSeconds,
PreviewState: j.PreviewState,
PostWorkKey: strings.TrimSpace(j.PostWorkKey),
PostWork: j.PostWork,
TS: time.Now().UnixMilli(),
}
if sm := sseStoredModelForJob(j); sm != nil {
payload.RoomStatus = strings.ToLower(strings.TrimSpace(sm.RoomStatus))
payload.IsOnline = sm.IsOnline
payload.ModelImageURL = strings.TrimSpace(sm.ImageURL)
payload.ModelChatRoomURL = strings.TrimSpace(sm.ChatRoomURL)
}
if j.EndedAt != nil {
payload.EndedAt = j.EndedAt.Format(time.RFC3339Nano)
}
b, _ := json.Marshal(payload)
publishSSE(eventName, b)
}
func publishJobRemove(j *RecordJob) {
if j == nil || j.Hidden {
return
}
eventName := sseModelEventNameForJob(j)
if eventName == "" {
return
}
b, _ := json.Marshal(jobEvent{
Type: "job_remove",
Model: eventName,
JobID: j.ID,
TS: time.Now().UnixMilli(),
})
publishSSE(eventName, b)
}
func sseModelEventNameForJob(j *RecordJob) string {
if j == nil {
return ""
}
src := strings.TrimSpace(j.SourceURL)
switch detectProvider(src) {
case "chaturbate":
if u := strings.TrimSpace(extractUsername(src)); u != "" {
return strings.ToLower(u)
}
case "mfc":
if u := strings.TrimSpace(extractMFCUsername(src)); u != "" {
return strings.ToLower(u)
}
}
return ""
}
func sseStoredModelForJob(j *RecordJob) *StoredModel {
if j == nil || cbModelStore == nil {
return nil
}
src := strings.TrimSpace(j.SourceURL)
if src == "" {
return nil
}
host := ""
modelKey := ""
switch detectProvider(src) {
case "chaturbate":
host = "chaturbate.com"
modelKey = strings.ToLower(strings.TrimSpace(extractUsername(src)))
default:
return nil
}
if host == "" || modelKey == "" {
return nil
}
m, ok := cbModelStore.GetByHostAndModelKey(host, modelKey)
if !ok {
return nil
}
return m
}
func initSSE() {
srv := sse.New()
srv.SplitData = true
stream := srv.CreateStream("events")
stream.AutoReplay = false
sseApp = &appSSE{
server: srv,
}
// Debounced broadcaster (done changed)
go func() {
for range doneNotify {
time.Sleep(40 * time.Millisecond)
for {
select {
case <-doneNotify:
default:
goto SEND
}
}
SEND:
seq := atomic.AddUint64(&doneSeq, 1)
b, _ := json.Marshal(map[string]any{
"type": "doneChanged",
"seq": seq,
"ts": time.Now().UnixMilli(),
})
publishSSE("doneChanged", b)
}
}()
// Debounced broadcaster (assets)
go func() {
for range assetsNotify {
time.Sleep(80 * time.Millisecond)
for {
select {
case <-assetsNotify:
default:
goto SEND
}
}
SEND:
b := assetsSnapshotJSON()
if len(b) > 0 {
publishSSE("state", b)
}
}
}()
// Per running job: 1 SSE event per second
go func() {
t := time.NewTicker(1 * time.Second)
defer t.Stop()
for range t.C {
events := visibleJobEventsJSON()
for _, ev := range events {
if len(ev.Data) == 0 || ev.EventName == "" {
continue
}
publishSSE(ev.EventName, ev.Data)
}
}
}()
}
func publishSSE(eventName string, data []byte) {
if sseApp == nil || sseApp.server == nil {
return
}
if len(data) == 0 {
return
}
sseApp.server.Publish("events", &sse.Event{
Event: []byte(eventName),
Data: data,
})
}
// -------------------- notify channels --------------------
var (
doneNotify = make(chan struct{}, 1)
doneSeq uint64
assetsNotify = make(chan struct{}, 1)
)
func notifyDoneChanged() {
select {
case doneNotify <- struct{}{}:
default:
}
}
func notifyAssetsChanged() {
select {
case assetsNotify <- struct{}{}:
default:
}
}
// -------------------- snapshots --------------------
func jobsSnapshotJSON() []byte {
jobsMu.Lock()
list := make([]*RecordJob, 0, len(jobs))
for _, j := range jobs {
if j == nil || j.Hidden {
continue
}
c := *j
c.cancel = nil
list = append(list, &c)
}
jobsMu.Unlock()
sort.Slice(list, func(i, j int) bool {
return list[i].StartedAt.After(list[j].StartedAt)
})
b, _ := json.Marshal(list)
return b
}
func assetsSnapshotJSON() []byte {
assetsTaskMu.Lock()
st := assetsTaskState
assetsTaskMu.Unlock()
b, _ := json.Marshal(st)
return b
}
// -------------------- unified stream handler --------------------
func eventsStream(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
http.Error(w, "Nur GET erlaubt", http.StatusMethodNotAllowed)
return
}
if sseApp == nil || sseApp.server == nil {
http.Error(w, "SSE nicht initialisiert", http.StatusInternalServerError)
return
}
q := r.URL.Query()
q.Set("stream", "events")
r2 := r.Clone(r.Context())
r2.URL.RawQuery = q.Encode()
sseApp.server.ServeHTTP(w, r2)
}
// -------------------- optional helper --------------------
func publishRawSSE(eventName string, buf *bytes.Buffer) {
if buf == nil {
return
}
publishSSE(eventName, buf.Bytes())
}