nsfwapp/backend/sse.go
2026-03-07 13:21:31 +01:00

219 lines
3.8 KiB
Go

// backend\sse.go
package main
import (
"bytes"
"encoding/json"
"net/http"
"sort"
"sync/atomic"
"time"
"github.com/r3labs/sse/v2"
)
// -------------------- SSE server --------------------
// appSSE bundles the state of the application's server-sent-events endpoint.
type appSSE struct {
	// server is the r3labs/sse server that owns the stream and serves subscribers.
	server *sse.Server
}

// sseApp is the package-wide SSE instance. It stays nil until initSSE assigns
// it; publishSSE and eventsStream both check for nil before using it.
var sseApp *appSSE
// initSSE creates the SSE server with its single "events" stream, stores it in
// sseApp and starts one debounced broadcaster goroutine per notify channel:
//
//   - recordJobsNotify: event "jobs" carrying jobsSnapshotJSON(), 40ms quiet period
//   - doneNotify: event "doneChanged" carrying type/seq/ts, 40ms quiet period
//   - assetsNotify: event "state" carrying assetsSnapshotJSON(), 80ms quiet period
//
// The broadcasters have no stop mechanism. Calling initSSE again would replace
// sseApp and start a second set of broadcasters on the same channels.
func initSSE() {
	srv := sse.New()
	srv.SplitData = true

	// All event types share one stream; AutoReplay is switched off for it.
	stream := srv.CreateStream("events")
	stream.AutoReplay = false

	sseApp = &appSSE{server: srv}

	// debounce blocks until a signal arrives on notify, sleeps for the quiet
	// period so that a burst of changes collapses into one event, discards
	// every signal queued in the meantime and then calls send exactly once.
	debounce := func(notify <-chan struct{}, quiet time.Duration, send func()) {
		for range notify {
			time.Sleep(quiet)

		drain:
			for {
				select {
				case <-notify:
				default:
					break drain
				}
			}

			send()
		}
	}

	// Jobs: publish the current job list.
	go debounce(recordJobsNotify, 40*time.Millisecond, func() {
		if b := jobsSnapshotJSON(); len(b) > 0 {
			publishSSE("jobs", b)
		}
	})

	// Done list: publish only a change marker. The sequence number is
	// incremented for every broadcast attempt.
	go debounce(doneNotify, 40*time.Millisecond, func() {
		seq := atomic.AddUint64(&doneSeq, 1)
		b, err := json.Marshal(map[string]any{
			"type": "doneChanged",
			"seq":  seq,
			"ts":   time.Now().UnixMilli(),
		})
		if err != nil {
			// Nothing sensible to publish without a payload.
			return
		}
		publishSSE("doneChanged", b)
	})

	// Assets: publish the current assets task state.
	go debounce(assetsNotify, 80*time.Millisecond, func() {
		if b := assetsSnapshotJSON(); len(b) > 0 {
			publishSSE("state", b)
		}
	})
}
// publishSSE publishes data as an event named eventName on the "events" stream.
// It does nothing when data is empty or the SSE server has not been set up.
func publishSSE(eventName string, data []byte) {
	if len(data) == 0 || sseApp == nil || sseApp.server == nil {
		return
	}

	evt := &sse.Event{
		Event: []byte(eventName),
		Data:  data,
	}
	sseApp.server.Publish("events", evt)
}
// -------------------- notify channels --------------------
var (
	// Wake-up channels for the debounced broadcasters started by initSSE.
	// Each has a buffer of one: the notify* helpers send without blocking, so
	// a signal that is already pending absorbs any further ones.
	recordJobsNotify = make(chan struct{}, 1)
	assetsNotify     = make(chan struct{}, 1)
	doneNotify       = make(chan struct{}, 1)

	// doneSeq is incremented atomically for every "doneChanged" broadcast and
	// sent to clients as the event's "seq" value.
	doneSeq uint64
)
// notifyDoneChanged requests a "doneChanged" broadcast. It never blocks: if
// doneNotify cannot take the signal right now, the call is a no-op.
func notifyDoneChanged() {
	select {
	default:
		// A signal is already pending; it covers this change as well.
	case doneNotify <- struct{}{}:
	}
}
// notifyJobsChanged requests a "jobs" broadcast. It never blocks: if
// recordJobsNotify cannot take the signal right now, the call is a no-op.
func notifyJobsChanged() {
	select {
	default:
		// A signal is already pending; it covers this change as well.
	case recordJobsNotify <- struct{}{}:
	}
}
// notifyAssetsChanged requests a "state" broadcast of the assets task. It never
// blocks: if assetsNotify cannot take the signal right now, the call is a no-op.
func notifyAssetsChanged() {
	select {
	default:
		// A signal is already pending; it covers this change as well.
	case assetsNotify <- struct{}{}:
	}
}
// -------------------- snapshots --------------------
func jobsSnapshotJSON() []byte {
jobsMu.Lock()
list := make([]*RecordJob, 0, len(jobs))
for _, j := range jobs {
if j == nil || j.Hidden {
continue
}
c := *j
c.cancel = nil
list = append(list, &c)
}
jobsMu.Unlock()
sort.Slice(list, func(i, j int) bool {
return list[i].StartedAt.After(list[j].StartedAt)
})
b, _ := json.Marshal(list)
return b
}
// assetsSnapshotJSON returns the current assets task state encoded as JSON.
// The state is copied by value while assetsTaskMu is held and encoded after the
// lock is released. The result is nil if encoding fails.
func assetsSnapshotJSON() []byte {
	assetsTaskMu.Lock()
	snapshot := assetsTaskState
	assetsTaskMu.Unlock()

	payload, err := json.Marshal(snapshot)
	if err != nil {
		return nil
	}
	return payload
}
// -------------------- unified stream handler --------------------
// eventsStream is the HTTP handler for the unified SSE endpoint.
//
// Only GET is accepted; any other method gets 405 together with an Allow
// header. If the SSE server has not been initialised the response is 500.
// Otherwise the request is handed to the SSE server with the "stream" query
// parameter forced to "events"; all other query parameters are passed through.
func eventsStream(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodGet {
		// RFC 9110 requires a 405 response to name the supported methods.
		w.Header().Set("Allow", http.MethodGet)
		http.Error(w, "Nur GET erlaubt", http.StatusMethodNotAllowed)
		return
	}
	if sseApp == nil || sseApp.server == nil {
		http.Error(w, "SSE nicht initialisiert", http.StatusInternalServerError)
		return
	}

	// The SSE server selects the stream from the "stream" query parameter, so
	// override whatever the client sent. The change is made on a clone, which
	// leaves the caller's request untouched.
	q := r.URL.Query()
	q.Set("stream", "events")

	r2 := r.Clone(r.Context())
	r2.URL.RawQuery = q.Encode()

	sseApp.server.ServeHTTP(w, r2)
}
// -------------------- optional compatibility handlers --------------------
// If you want to keep the old routes around for a while, they simply point
// to the same unified stream.
// recordStream is a compatibility handler for the old record route; it serves
// the unified stream by delegating to eventsStream.
func recordStream(w http.ResponseWriter, r *http.Request) {
	eventsStream(w, r)
}
// doneStream is a compatibility handler for the old done route; it serves the
// unified stream by delegating to eventsStream.
func doneStream(w http.ResponseWriter, r *http.Request) {
	eventsStream(w, r)
}
// assetsStream is a compatibility handler for the old assets route; it serves
// the unified stream by delegating to eventsStream.
func assetsStream(w http.ResponseWriter, r *http.Request) {
	eventsStream(w, r)
}
// -------------------- optional helper --------------------
// publishRawSSE publishes the current contents of buf as an event named
// eventName. A nil buf is ignored.
//
// The bytes are copied before they are handed to publishSSE: the slice
// returned by buf.Bytes() is only valid until the buffer is next modified,
// and the published event may still be referenced after this call returns.
// With the copy, callers are free to reset or reuse buf immediately.
func publishRawSSE(eventName string, buf *bytes.Buffer) {
	if buf == nil {
		return
	}
	payload := append([]byte(nil), buf.Bytes()...)
	publishSSE(eventName, payload)
}