483 lines
9.7 KiB
Go
483 lines
9.7 KiB
Go
// backend\sse.go
|
|
package main
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/json"
|
|
"net/http"
|
|
"sort"
|
|
"strings"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/r3labs/sse/v2"
|
|
)
|
|
|
|
// -------------------- SSE server --------------------
|
|
|
|
type appSSE struct {
|
|
server *sse.Server
|
|
}
|
|
|
|
var sseApp *appSSE
|
|
|
|
type jobEvent struct {
|
|
Type string `json:"type"`
|
|
Model string `json:"model"`
|
|
JobID string `json:"jobId"`
|
|
Status JobStatus `json:"status"`
|
|
Phase string `json:"phase,omitempty"`
|
|
Progress int `json:"progress,omitempty"`
|
|
SourceURL string `json:"sourceUrl,omitempty"`
|
|
Output string `json:"output,omitempty"`
|
|
StartedAt string `json:"startedAt,omitempty"`
|
|
StartedAtMs int64 `json:"startedAtMs,omitempty"`
|
|
EndedAt string `json:"endedAt,omitempty"`
|
|
EndedAtMs int64 `json:"endedAtMs,omitempty"`
|
|
SizeBytes int64 `json:"sizeBytes,omitempty"`
|
|
DurationSeconds float64 `json:"durationSeconds,omitempty"`
|
|
PreviewState string `json:"previewState,omitempty"`
|
|
RoomStatus string `json:"roomStatus,omitempty"`
|
|
IsOnline bool `json:"isOnline,omitempty"`
|
|
ModelImageURL string `json:"modelImageUrl,omitempty"`
|
|
ModelChatRoomURL string `json:"modelChatRoomUrl,omitempty"`
|
|
|
|
PostWorkKey string `json:"postWorkKey,omitempty"`
|
|
PostWork any `json:"postWork,omitempty"`
|
|
|
|
TS int64 `json:"ts"`
|
|
}
|
|
|
|
// ssePublishItem pairs an SSE event name with an already encoded payload,
// ready to be handed to publishSSE.
type ssePublishItem struct {
	EventName string // SSE event name to publish under
	Data      []byte // encoded JSON payload
}
|
|
|
|
func isTerminalJobStatusForSSE(status JobStatus) bool {
|
|
s := strings.ToLower(strings.TrimSpace(string(status)))
|
|
return s == "stopped" ||
|
|
s == "finished" ||
|
|
s == "failed" ||
|
|
s == "done" ||
|
|
s == "completed" ||
|
|
s == "canceled" ||
|
|
s == "cancelled"
|
|
}
|
|
|
|
func isPostworkJobForSSE(j *RecordJob) bool {
|
|
if j == nil {
|
|
return false
|
|
}
|
|
|
|
phase := strings.ToLower(strings.TrimSpace(j.Phase))
|
|
pwKey := strings.TrimSpace(j.PostWorkKey)
|
|
|
|
if pwKey != "" {
|
|
return true
|
|
}
|
|
|
|
if j.PostWork != nil {
|
|
// falls PostWork als struct/map vorliegt, reicht für SSE der generelle Hinweis:
|
|
return true
|
|
}
|
|
|
|
if j.EndedAt != nil && phase != "" {
|
|
return true
|
|
}
|
|
|
|
if phase == "postwork" {
|
|
return true
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
func isVisibleDownloadJobForSSE(j *RecordJob) bool {
|
|
if j == nil {
|
|
return false
|
|
}
|
|
if isPostworkJobForSSE(j) {
|
|
return false
|
|
}
|
|
if isTerminalJobStatusForSSE(j.Status) {
|
|
return false
|
|
}
|
|
if j.EndedAt != nil {
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
|
|
func isVisiblePostworkJobForSSE(j *RecordJob) bool {
|
|
if j == nil {
|
|
return false
|
|
}
|
|
if !isPostworkJobForSSE(j) {
|
|
return false
|
|
}
|
|
if isTerminalJobStatusForSSE(j.Status) {
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
|
|
func shouldPublishModelEventForJob(j *RecordJob) bool {
|
|
return isVisibleDownloadJobForSSE(j) || isVisiblePostworkJobForSSE(j)
|
|
}
|
|
|
|
func visibleJobEventsJSON() []ssePublishItem {
|
|
nowTs := time.Now().UnixMilli()
|
|
out := make([]ssePublishItem, 0, 64)
|
|
|
|
jobsMu.Lock()
|
|
defer jobsMu.Unlock()
|
|
|
|
for _, j := range jobs {
|
|
if j == nil || j.Hidden {
|
|
continue
|
|
}
|
|
if !shouldPublishModelEventForJob(j) {
|
|
continue
|
|
}
|
|
|
|
eventName := sseModelEventNameForJob(j)
|
|
if eventName == "" {
|
|
continue
|
|
}
|
|
|
|
payload := jobEvent{
|
|
Type: "job_upsert",
|
|
Model: eventName,
|
|
JobID: j.ID,
|
|
Status: j.Status,
|
|
Phase: j.Phase,
|
|
Progress: j.Progress,
|
|
SourceURL: j.SourceURL,
|
|
Output: j.Output,
|
|
StartedAt: j.StartedAt.Format(time.RFC3339Nano),
|
|
StartedAtMs: j.StartedAtMs,
|
|
SizeBytes: j.SizeBytes,
|
|
DurationSeconds: j.DurationSeconds,
|
|
PreviewState: j.PreviewState,
|
|
PostWorkKey: strings.TrimSpace(j.PostWorkKey),
|
|
PostWork: j.PostWork,
|
|
TS: nowTs,
|
|
}
|
|
|
|
if sm := sseStoredModelForJob(j); sm != nil {
|
|
payload.RoomStatus = strings.ToLower(strings.TrimSpace(sm.RoomStatus))
|
|
payload.IsOnline = sm.IsOnline
|
|
payload.ModelImageURL = strings.TrimSpace(sm.ImageURL)
|
|
payload.ModelChatRoomURL = strings.TrimSpace(sm.ChatRoomURL)
|
|
}
|
|
|
|
if j.EndedAt != nil {
|
|
payload.EndedAt = j.EndedAt.Format(time.RFC3339Nano)
|
|
payload.EndedAtMs = j.EndedAtMs
|
|
}
|
|
|
|
b, err := json.Marshal(payload)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
|
|
out = append(out, ssePublishItem{
|
|
EventName: eventName,
|
|
Data: b,
|
|
})
|
|
}
|
|
|
|
return out
|
|
}
|
|
|
|
func publishJobUpsert(j *RecordJob) {
|
|
if j == nil || j.Hidden {
|
|
return
|
|
}
|
|
if !shouldPublishModelEventForJob(j) {
|
|
return
|
|
}
|
|
|
|
eventName := sseModelEventNameForJob(j)
|
|
if eventName == "" {
|
|
return
|
|
}
|
|
|
|
payload := jobEvent{
|
|
Type: "job_upsert",
|
|
Model: eventName,
|
|
JobID: j.ID,
|
|
Status: j.Status,
|
|
Phase: j.Phase,
|
|
Progress: j.Progress,
|
|
SourceURL: j.SourceURL,
|
|
Output: j.Output,
|
|
StartedAt: j.StartedAt.Format(time.RFC3339Nano),
|
|
StartedAtMs: j.StartedAtMs,
|
|
EndedAtMs: j.EndedAtMs,
|
|
SizeBytes: j.SizeBytes,
|
|
DurationSeconds: j.DurationSeconds,
|
|
PreviewState: j.PreviewState,
|
|
PostWorkKey: strings.TrimSpace(j.PostWorkKey),
|
|
PostWork: j.PostWork,
|
|
TS: time.Now().UnixMilli(),
|
|
}
|
|
|
|
if sm := sseStoredModelForJob(j); sm != nil {
|
|
payload.RoomStatus = strings.ToLower(strings.TrimSpace(sm.RoomStatus))
|
|
payload.IsOnline = sm.IsOnline
|
|
payload.ModelImageURL = strings.TrimSpace(sm.ImageURL)
|
|
payload.ModelChatRoomURL = strings.TrimSpace(sm.ChatRoomURL)
|
|
}
|
|
|
|
if j.EndedAt != nil {
|
|
payload.EndedAt = j.EndedAt.Format(time.RFC3339Nano)
|
|
}
|
|
|
|
b, _ := json.Marshal(payload)
|
|
publishSSE(eventName, b)
|
|
}
|
|
|
|
func publishJobRemove(j *RecordJob) {
|
|
if j == nil || j.Hidden {
|
|
return
|
|
}
|
|
|
|
eventName := sseModelEventNameForJob(j)
|
|
if eventName == "" {
|
|
return
|
|
}
|
|
|
|
b, _ := json.Marshal(jobEvent{
|
|
Type: "job_remove",
|
|
Model: eventName,
|
|
JobID: j.ID,
|
|
TS: time.Now().UnixMilli(),
|
|
})
|
|
|
|
publishSSE(eventName, b)
|
|
}
|
|
|
|
func sseModelEventNameForJob(j *RecordJob) string {
|
|
if j == nil {
|
|
return ""
|
|
}
|
|
|
|
src := strings.TrimSpace(j.SourceURL)
|
|
|
|
switch detectProvider(src) {
|
|
case "chaturbate":
|
|
if u := strings.TrimSpace(extractUsername(src)); u != "" {
|
|
return strings.ToLower(u)
|
|
}
|
|
case "mfc":
|
|
if u := strings.TrimSpace(extractMFCUsername(src)); u != "" {
|
|
return strings.ToLower(u)
|
|
}
|
|
}
|
|
|
|
return ""
|
|
}
|
|
|
|
func sseStoredModelForJob(j *RecordJob) *StoredModel {
|
|
if j == nil || cbModelStore == nil {
|
|
return nil
|
|
}
|
|
|
|
src := strings.TrimSpace(j.SourceURL)
|
|
if src == "" {
|
|
return nil
|
|
}
|
|
|
|
host := ""
|
|
modelKey := ""
|
|
|
|
switch detectProvider(src) {
|
|
case "chaturbate":
|
|
host = "chaturbate.com"
|
|
modelKey = strings.ToLower(strings.TrimSpace(extractUsername(src)))
|
|
default:
|
|
return nil
|
|
}
|
|
|
|
if host == "" || modelKey == "" {
|
|
return nil
|
|
}
|
|
|
|
m, ok := cbModelStore.GetByHostAndModelKey(host, modelKey)
|
|
if !ok {
|
|
return nil
|
|
}
|
|
|
|
return m
|
|
}
|
|
|
|
func initSSE() {
|
|
srv := sse.New()
|
|
srv.SplitData = true
|
|
|
|
stream := srv.CreateStream("events")
|
|
stream.AutoReplay = false
|
|
|
|
sseApp = &appSSE{
|
|
server: srv,
|
|
}
|
|
|
|
// Debounced broadcaster (done changed)
|
|
go func() {
|
|
for range doneNotify {
|
|
time.Sleep(40 * time.Millisecond)
|
|
for {
|
|
select {
|
|
case <-doneNotify:
|
|
default:
|
|
goto SEND
|
|
}
|
|
}
|
|
SEND:
|
|
seq := atomic.AddUint64(&doneSeq, 1)
|
|
b, _ := json.Marshal(map[string]any{
|
|
"type": "doneChanged",
|
|
"seq": seq,
|
|
"ts": time.Now().UnixMilli(),
|
|
})
|
|
publishSSE("doneChanged", b)
|
|
}
|
|
}()
|
|
|
|
// Debounced broadcaster (assets)
|
|
go func() {
|
|
for range assetsNotify {
|
|
time.Sleep(80 * time.Millisecond)
|
|
for {
|
|
select {
|
|
case <-assetsNotify:
|
|
default:
|
|
goto SEND
|
|
}
|
|
}
|
|
SEND:
|
|
b := assetsSnapshotJSON()
|
|
if len(b) > 0 {
|
|
publishSSE("state", b)
|
|
}
|
|
}
|
|
}()
|
|
|
|
// Per running job: 1 SSE event per second
|
|
go func() {
|
|
t := time.NewTicker(1 * time.Second)
|
|
defer t.Stop()
|
|
|
|
for range t.C {
|
|
events := visibleJobEventsJSON()
|
|
for _, ev := range events {
|
|
if len(ev.Data) == 0 || ev.EventName == "" {
|
|
continue
|
|
}
|
|
publishSSE(ev.EventName, ev.Data)
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
func publishSSE(eventName string, data []byte) {
|
|
if sseApp == nil || sseApp.server == nil {
|
|
return
|
|
}
|
|
if len(data) == 0 {
|
|
return
|
|
}
|
|
|
|
sseApp.server.Publish("events", &sse.Event{
|
|
Event: []byte(eventName),
|
|
Data: data,
|
|
})
|
|
}
|
|
|
|
// -------------------- notify channels --------------------
|
|
|
|
// doneNotify signals that the set of finished items changed. Its buffer of
// one lets notifyDoneChanged coalesce bursts into a single pending signal.
var doneNotify = make(chan struct{}, 1)

// doneSeq is the sequence number sent with "doneChanged" events; it is
// incremented atomically by the broadcaster started in initSSE.
var doneSeq uint64

// assetsNotify signals that the assets task state changed. Like doneNotify
// it holds at most one pending signal.
var assetsNotify = make(chan struct{}, 1)
|
|
|
|
func notifyDoneChanged() {
|
|
select {
|
|
case doneNotify <- struct{}{}:
|
|
default:
|
|
}
|
|
}
|
|
|
|
func notifyAssetsChanged() {
|
|
select {
|
|
case assetsNotify <- struct{}{}:
|
|
default:
|
|
}
|
|
}
|
|
|
|
// -------------------- snapshots --------------------
|
|
|
|
func jobsSnapshotJSON() []byte {
|
|
jobsMu.Lock()
|
|
list := make([]*RecordJob, 0, len(jobs))
|
|
for _, j := range jobs {
|
|
if j == nil || j.Hidden {
|
|
continue
|
|
}
|
|
c := *j
|
|
c.cancel = nil
|
|
list = append(list, &c)
|
|
}
|
|
jobsMu.Unlock()
|
|
|
|
sort.Slice(list, func(i, j int) bool {
|
|
return list[i].StartedAt.After(list[j].StartedAt)
|
|
})
|
|
|
|
b, _ := json.Marshal(list)
|
|
return b
|
|
}
|
|
|
|
func assetsSnapshotJSON() []byte {
|
|
assetsTaskMu.Lock()
|
|
st := assetsTaskState
|
|
assetsTaskMu.Unlock()
|
|
|
|
b, _ := json.Marshal(st)
|
|
return b
|
|
}
|
|
|
|
// -------------------- unified stream handler --------------------
|
|
|
|
func eventsStream(w http.ResponseWriter, r *http.Request) {
|
|
if r.Method != http.MethodGet {
|
|
http.Error(w, "Nur GET erlaubt", http.StatusMethodNotAllowed)
|
|
return
|
|
}
|
|
|
|
if sseApp == nil || sseApp.server == nil {
|
|
http.Error(w, "SSE nicht initialisiert", http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
q := r.URL.Query()
|
|
q.Set("stream", "events")
|
|
|
|
r2 := r.Clone(r.Context())
|
|
r2.URL.RawQuery = q.Encode()
|
|
|
|
sseApp.server.ServeHTTP(w, r2)
|
|
}
|
|
|
|
// -------------------- optional helper --------------------
|
|
|
|
func publishRawSSE(eventName string, buf *bytes.Buffer) {
|
|
if buf == nil {
|
|
return
|
|
}
|
|
publishSSE(eventName, buf.Bytes())
|
|
}
|