// backend/sse.go package main import ( "encoding/json" "fmt" "io" "net/http" "sort" "sync" "sync/atomic" "time" ) // -------------------- SSE primitives -------------------- type sseHub struct { mu sync.Mutex clients map[chan []byte]struct{} } func newSSEHub() *sseHub { return &sseHub{clients: map[chan []byte]struct{}{}} } func (h *sseHub) add(ch chan []byte) { h.mu.Lock() h.clients[ch] = struct{}{} h.mu.Unlock() } func (h *sseHub) remove(ch chan []byte) { h.mu.Lock() delete(h.clients, ch) h.mu.Unlock() close(ch) } func (h *sseHub) broadcast(b []byte) { h.mu.Lock() defer h.mu.Unlock() for ch := range h.clients { // Non-blocking: langsame Clients droppen Updates (holen sich beim nächsten Update wieder ein) select { case ch <- b: default: } } } // -------------------- SSE channels + notify -------------------- var ( // done changed stream (Client soll nur "refetch done" machen) doneHub = newSSEHub() doneNotify = make(chan struct{}, 1) doneSeq uint64 // record jobs stream recordJobsHub = newSSEHub() recordJobsNotify = make(chan struct{}, 1) // assets task stream assetsHub = newSSEHub() assetsNotify = make(chan struct{}, 1) ) func notifyDoneChanged() { select { case doneNotify <- struct{}{}: default: } } func notifyJobsChanged() { select { case recordJobsNotify <- struct{}{}: default: } } func notifyAssetsChanged() { select { case assetsNotify <- struct{}{}: default: } } // initSSE startet die Debounce-Broadcaster. // Wichtig: wird aus main.go init() aufgerufen. func initSSE() { // Debounced broadcaster (jobs) go func() { for range recordJobsNotify { time.Sleep(40 * time.Millisecond) for { select { case <-recordJobsNotify: default: goto SEND } } SEND: recordJobsHub.broadcast(jobsSnapshotJSON()) } }() // Debounced broadcaster (done changed) go func() { for range doneNotify { time.Sleep(40 * time.Millisecond) for { select { case <-doneNotify: default: goto SEND } } SEND: seq := atomic.AddUint64(&doneSeq, 1) b := []byte(fmt.Sprintf(`{"type":"doneChanged","seq":%d,"ts":%d}`, seq, time.Now().UnixMilli())) doneHub.broadcast(b) } }() // ✅ Debounced broadcaster (assets task) go func() { for range assetsNotify { time.Sleep(80 * time.Millisecond) for { select { case <-assetsNotify: default: goto SEND } } SEND: b := assetsSnapshotJSON() if len(b) > 0 { assetsHub.broadcast(b) } } }() } // -------------------- SSE: /api/record/stream -------------------- // jobsSnapshotJSON liefert die aktuelle (gefilterte) Job-Liste als JSON. // Greift auf jobs/jobsMu aus main.go zu (gleiches Package). func jobsSnapshotJSON() []byte { jobsMu.Lock() list := make([]*RecordJob, 0, len(jobs)) for _, j := range jobs { // Hidden-Jobs niemals an die UI senden if j == nil || j.Hidden { continue } c := *j c.cancel = nil // nicht serialisieren list = append(list, &c) } jobsMu.Unlock() // neueste zuerst sort.Slice(list, func(i, j int) bool { return list[i].StartedAt.After(list[j].StartedAt) }) b, _ := json.Marshal(list) return b } func recordStream(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodGet { http.Error(w, "Nur GET erlaubt", http.StatusMethodNotAllowed) return } flusher, ok := w.(http.Flusher) if !ok { http.Error(w, "Streaming nicht unterstützt", http.StatusInternalServerError) return } // SSE-Header h := w.Header() h.Set("Content-Type", "text/event-stream; charset=utf-8") h.Set("Cache-Control", "no-cache, no-transform") h.Set("Connection", "keep-alive") h.Set("X-Accel-Buffering", "no") // hilfreich bei Reverse-Proxies // sofort starten w.WriteHeader(http.StatusOK) writeEvent := func(event string, data []byte) bool { // returns false => client weg / write error if event != "" { if _, err := fmt.Fprintf(w, "event: %s\n", event); err != nil { return false } } if len(data) > 0 { if _, err := fmt.Fprintf(w, "data: %s\n\n", data); err != nil { return false } } else { // empty payload ok (nur terminator) if _, err := io.WriteString(w, "\n"); err != nil { return false } } flusher.Flush() return true } writeComment := func(msg string) bool { if _, err := fmt.Fprintf(w, ": %s\n\n", msg); err != nil { return false } flusher.Flush() return true } // Reconnect-Hinweis if _, err := fmt.Fprintf(w, "retry: 3000\n\n"); err != nil { return } flusher.Flush() // Channel + Hub ch := make(chan []byte, 32) recordJobsHub.add(ch) defer recordJobsHub.remove(ch) // Initialer Snapshot sofort if b := jobsSnapshotJSON(); len(b) > 0 { if !writeEvent("jobs", b) { return } } ctx := r.Context() // Ping/Keepalive ping := time.NewTicker(15 * time.Second) defer ping.Stop() for { select { case <-ctx.Done(): return case b, ok := <-ch: if !ok { return } if len(b) == 0 { continue } // Burst-Coalescing: wenn viele Updates schnell kommen, nur das neueste senden last := b drain: for i := 0; i < 64; i++ { select { case nb, ok := <-ch: if !ok { return } if len(nb) > 0 { last = nb } default: break drain } } if !writeEvent("jobs", last) { return } case <-ping.C: // Keepalive als Kommentar (stört nicht, hält Verbindungen offen) if !writeComment(fmt.Sprintf("ping %d", time.Now().Unix())) { return } } } } // -------------------- SSE: /api/tasks/assets/stream -------------------- func assetsSnapshotJSON() []byte { assetsTaskMu.Lock() st := assetsTaskState assetsTaskMu.Unlock() b, _ := json.Marshal(st) return b } func assetsStream(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodGet { http.Error(w, "Nur GET erlaubt", http.StatusMethodNotAllowed) return } flusher, ok := w.(http.Flusher) if !ok { http.Error(w, "Streaming nicht unterstützt", http.StatusInternalServerError) return } h := w.Header() h.Set("Content-Type", "text/event-stream; charset=utf-8") h.Set("Cache-Control", "no-cache, no-transform") h.Set("Connection", "keep-alive") h.Set("X-Accel-Buffering", "no") w.WriteHeader(http.StatusOK) // Reconnect-Hinweis fmt.Fprintf(w, "retry: 3000\n\n") flusher.Flush() writeEvent := func(event string, data []byte) bool { if event != "" { if _, err := fmt.Fprintf(w, "event: %s\n", event); err != nil { return false } } if _, err := fmt.Fprintf(w, "data: %s\n\n", data); err != nil { return false } flusher.Flush() return true } writeComment := func(msg string) bool { if _, err := fmt.Fprintf(w, ": %s\n\n", msg); err != nil { return false } flusher.Flush() return true } ch := make(chan []byte, 32) assetsHub.add(ch) defer assetsHub.remove(ch) // Initial Snapshot if b := assetsSnapshotJSON(); len(b) > 0 { if !writeEvent("state", b) { return } } ctx := r.Context() ping := time.NewTicker(15 * time.Second) defer ping.Stop() for { select { case <-ctx.Done(): return case b, ok := <-ch: if !ok { return } if len(b) == 0 { continue } // coalesce last := b drain: for i := 0; i < 64; i++ { select { case nb, ok := <-ch: if !ok { return } if len(nb) > 0 { last = nb } default: break drain } } if !writeEvent("state", last) { return } case <-ping.C: if !writeComment(fmt.Sprintf("ping %d", time.Now().Unix())) { return } } } }