// nsfwapp/backend/sse.go
// 2026-03-04 18:44:22 +01:00
// 407 lines · 7.9 KiB · Go

// backend/sse.go
package main
import (
"encoding/json"
"fmt"
"io"
"net/http"
"sort"
"sync"
"sync/atomic"
"time"
)
// -------------------- SSE primitives --------------------
// sseHub fans broadcast payloads out to a dynamic set of subscriber
// channels. The zero value is unusable; construct with newSSEHub.
type sseHub struct {
	mu      sync.Mutex
	clients map[chan []byte]struct{}
}

// newSSEHub returns an initialized hub with no subscribers.
func newSSEHub() *sseHub {
	return &sseHub{clients: make(map[chan []byte]struct{})}
}

// add registers ch so it receives future broadcasts.
func (h *sseHub) add(ch chan []byte) {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.clients[ch] = struct{}{}
}

// remove unregisters ch and closes it. The close happens after the
// unlock (defers run LIFO): once the map entry is gone, broadcast can
// no longer reach the channel, so a send-on-closed-channel panic is
// impossible.
func (h *sseHub) remove(ch chan []byte) {
	h.mu.Lock()
	defer close(ch)     // runs second
	defer h.mu.Unlock() // runs first
	delete(h.clients, ch)
}

// broadcast hands b to every subscriber without ever blocking.
// Subscribers whose buffers are full simply miss this update and
// catch up with the next one.
func (h *sseHub) broadcast(b []byte) {
	h.mu.Lock()
	defer h.mu.Unlock()
	for sub := range h.clients {
		select {
		case sub <- b:
		default: // slow client: drop rather than stall the hub
		}
	}
}
// -------------------- SSE channels + notify --------------------
var (
	// done-changed stream (client is expected to just "refetch done")
	doneHub    = newSSEHub()
	doneNotify = make(chan struct{}, 1)
	doneSeq    uint64
	// record-jobs stream
	recordJobsHub    = newSSEHub()
	recordJobsNotify = make(chan struct{}, 1)
	// assets-task stream
	assetsHub    = newSSEHub()
	assetsNotify = make(chan struct{}, 1)
	// perf stream (periodic snapshot; ticker-driven, so no notify channel)
	perfHub = newSSEHub()
)
// trySignal performs a non-blocking send on a capacity-1 signal
// channel. If a signal is already pending, the new one is dropped —
// the pending signal already covers it.
func trySignal(ch chan<- struct{}) {
	select {
	case ch <- struct{}{}:
	default:
	}
}

// notifyDoneChanged wakes the "done changed" debounce broadcaster.
func notifyDoneChanged() { trySignal(doneNotify) }

// notifyJobsChanged wakes the record-jobs debounce broadcaster.
func notifyJobsChanged() { trySignal(recordJobsNotify) }

// notifyAssetsChanged wakes the assets-task debounce broadcaster.
func notifyAssetsChanged() { trySignal(assetsNotify) }
// initSSE starts the debounce/ticker broadcaster goroutines.
// Important: called from main.go init().
//
// The three debounced streams previously triplicated the same
// wait/sleep/drain/send pattern; it is factored into sseDebounce so
// the streams cannot drift apart.
func initSSE() {
	// Debounced broadcaster (jobs)
	go sseDebounce(recordJobsNotify, 40*time.Millisecond, func() {
		recordJobsHub.broadcast(jobsSnapshotJSON())
	})
	// Debounced broadcaster (done changed)
	go sseDebounce(doneNotify, 40*time.Millisecond, func() {
		seq := atomic.AddUint64(&doneSeq, 1)
		b := []byte(fmt.Sprintf(`{"type":"doneChanged","seq":%d,"ts":%d}`, seq, time.Now().UnixMilli()))
		doneHub.broadcast(b)
	})
	// Debounced broadcaster (assets task)
	go sseDebounce(assetsNotify, 80*time.Millisecond, func() {
		if b := assetsSnapshotJSON(); len(b) > 0 {
			assetsHub.broadcast(b)
		}
	})
	// Periodic broadcaster (perf): fixed 3s ticker, no debounce.
	go func() {
		t := time.NewTicker(3 * time.Second)
		defer t.Stop()
		for range t.C {
			if b := perfSnapshotJSON(); len(b) > 0 {
				perfHub.broadcast(b)
			}
		}
	}()
}

// sseDebounce loops forever: after each notify signal it sleeps for
// settle so a burst can accumulate, drains any signals that piled up
// meanwhile, and then invokes send exactly once for the whole burst.
func sseDebounce(notify <-chan struct{}, settle time.Duration, send func()) {
	for range notify {
		time.Sleep(settle)
	drain:
		for {
			select {
			case <-notify:
			default:
				break drain
			}
		}
		send()
	}
}
// -------------------- Snapshots --------------------
// jobsSnapshotJSON renders the current (filtered) job list as JSON,
// newest job first. Reads jobs/jobsMu from main.go (same package).
func jobsSnapshotJSON() []byte {
	jobsMu.Lock()
	visible := make([]*RecordJob, 0, len(jobs))
	for _, job := range jobs {
		// Never expose hidden jobs to the UI.
		if job == nil || job.Hidden {
			continue
		}
		clone := *job
		clone.cancel = nil // not serialized
		visible = append(visible, &clone)
	}
	jobsMu.Unlock()
	// newest first
	sort.Slice(visible, func(a, b int) bool {
		return visible[a].StartedAt.After(visible[b].StartedAt)
	})
	out, _ := json.Marshal(visible)
	return out
}
// assetsSnapshotJSON marshals the current assets-task state. The
// state is copied under the lock so marshalling runs lock-free.
func assetsSnapshotJSON() []byte {
	assetsTaskMu.Lock()
	snapshot := assetsTaskState
	assetsTaskMu.Unlock()
	out, _ := json.Marshal(snapshot)
	return out
}
// perfSnapshotJSON returns a snapshot for the frontend (PerformanceMonitor).
//
// ✅ IMPORTANT: the existing logic from perfStreamHandler (CPU%,
// disk free/total/used%, serverMs) still needs to be pulled into this
// shared function. This stub version compiles but only delivers
// serverMs (everything else is null).
func perfSnapshotJSON() []byte {
	now := time.Now().UnixMilli() // frontend: ping = Date.now() - serverMs
	snapshot := map[string]any{
		"cpuPercent":      nil,
		"diskFreeBytes":   nil,
		"diskTotalBytes":  nil,
		"diskUsedPercent": nil,
		"serverMs":        now,
	}
	out, _ := json.Marshal(snapshot)
	return out
}
// -------------------- SSE: /api/stream (UNIFIED) --------------------
//
// One stream carrying:
//   - event: jobs        -> []RecordJob
//   - event: doneChanged -> {"type":"doneChanged","seq":...,"ts":...}
//   - event: state       -> assetsTaskState
//   - event: perf        -> PerfSnapshot
//
// The frontend should only open /api/stream (sseSingleton dedupes per URL).

// appStream serves the unified Server-Sent-Events endpoint: it
// registers one buffered channel per hub, pushes an initial snapshot
// of every stream, then forwards hub broadcasts (coalescing bursts)
// until the request context is cancelled. A comment ping every 15s
// keeps intermediaries from closing the idle connection.
func appStream(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodGet {
		http.Error(w, "Nur GET erlaubt", http.StatusMethodNotAllowed)
		return
	}
	flusher, ok := w.(http.Flusher)
	if !ok {
		http.Error(w, "Streaming nicht unterstützt", http.StatusInternalServerError)
		return
	}
	// SSE headers
	h := w.Header()
	h.Set("Content-Type", "text/event-stream; charset=utf-8")
	h.Set("Cache-Control", "no-cache, no-transform")
	h.Set("Connection", "keep-alive")
	h.Set("X-Accel-Buffering", "no")
	// start streaming immediately
	w.WriteHeader(http.StatusOK)
	// writeEvent emits one SSE event frame and flushes; it returns
	// false once a write fails (client gone) so the caller can bail.
	writeEvent := func(event string, data []byte) bool {
		if event != "" {
			if _, err := fmt.Fprintf(w, "event: %s\n", event); err != nil {
				return false
			}
		}
		if len(data) > 0 {
			if _, err := fmt.Fprintf(w, "data: %s\n\n", data); err != nil {
				return false
			}
		} else {
			if _, err := io.WriteString(w, "\n"); err != nil {
				return false
			}
		}
		flusher.Flush()
		return true
	}
	// writeComment emits an SSE comment line (": ..."); used for pings.
	writeComment := func(msg string) bool {
		if _, err := fmt.Fprintf(w, ": %s\n\n", msg); err != nil {
			return false
		}
		flusher.Flush()
		return true
	}
	// reconnect hint for the browser's EventSource
	if _, err := fmt.Fprintf(w, "retry: 3000\n\n"); err != nil {
		return
	}
	flusher.Flush()
	// per client: one channel per hub
	jobsCh := make(chan []byte, 32)
	doneCh := make(chan []byte, 32)
	assetsCh := make(chan []byte, 32)
	perfCh := make(chan []byte, 32)
	recordJobsHub.add(jobsCh)
	defer recordJobsHub.remove(jobsCh)
	doneHub.add(doneCh)
	defer doneHub.remove(doneCh)
	assetsHub.add(assetsCh)
	defer assetsHub.remove(assetsCh)
	perfHub.add(perfCh)
	defer perfHub.remove(perfCh)
	// initial snapshots
	if b := jobsSnapshotJSON(); len(b) > 0 {
		if !writeEvent("jobs", b) {
			return
		}
	}
	// done: initial "kick" (helps the UI sync right away)
	seq := atomic.LoadUint64(&doneSeq)
	initDone := []byte(fmt.Sprintf(`{"type":"doneChanged","seq":%d,"ts":%d}`, seq, time.Now().UnixMilli()))
	if !writeEvent("doneChanged", initDone) {
		return
	}
	if b := assetsSnapshotJSON(); len(b) > 0 {
		if !writeEvent("state", b) {
			return
		}
	}
	if b := perfSnapshotJSON(); len(b) > 0 {
		if !writeEvent("perf", b) {
			return
		}
	}
	// coalesce helper: during a burst, send only the latest payload.
	// The drain is bounded at 64 iterations so a firehose producer
	// cannot starve the main select loop below.
	drainLatest := func(first []byte, ch <-chan []byte) []byte {
		last := first
		for i := 0; i < 64; i++ {
			select {
			case nb, ok := <-ch:
				if !ok {
					return last
				}
				if len(nb) > 0 {
					last = nb
				}
			default:
				return last
			}
		}
		return last
	}
	ctx := r.Context()
	ping := time.NewTicker(15 * time.Second)
	defer ping.Stop()
	for {
		select {
		case <-ctx.Done():
			// client disconnected; defers unregister and close channels
			return
		case b, ok := <-jobsCh:
			if !ok {
				return
			}
			if len(b) == 0 {
				continue
			}
			last := drainLatest(b, jobsCh)
			if !writeEvent("jobs", last) {
				return
			}
		case b, ok := <-doneCh:
			if !ok {
				return
			}
			if len(b) == 0 {
				continue
			}
			last := drainLatest(b, doneCh)
			if !writeEvent("doneChanged", last) {
				return
			}
		case b, ok := <-assetsCh:
			if !ok {
				return
			}
			if len(b) == 0 {
				continue
			}
			last := drainLatest(b, assetsCh)
			if !writeEvent("state", last) {
				return
			}
		case b, ok := <-perfCh:
			if !ok {
				return
			}
			if len(b) == 0 {
				continue
			}
			last := drainLatest(b, perfCh)
			if !writeEvent("perf", last) {
				return
			}
		case <-ping.C:
			// keep-alive comment; also detects a dead connection
			if !writeComment(fmt.Sprintf("ping %d", time.Now().Unix())) {
				return
			}
		}
	}
}