// backend\sse.go package main import ( "encoding/json" "fmt" "net/http" "sync" "time" ) // -------------------- SSE primitives -------------------- type sseEvent struct { name string data []byte } type sseHub struct { mu sync.Mutex clients map[chan sseEvent]struct{} } func newSSEHub() *sseHub { return &sseHub{clients: map[chan sseEvent]struct{}{}} } func (h *sseHub) add(ch chan sseEvent) { h.mu.Lock() h.clients[ch] = struct{}{} h.mu.Unlock() } func (h *sseHub) remove(ch chan sseEvent) { h.mu.Lock() delete(h.clients, ch) h.mu.Unlock() close(ch) } func (h *sseHub) broadcastEvent(name string, b []byte) { h.mu.Lock() defer h.mu.Unlock() ev := sseEvent{name: name, data: b} for ch := range h.clients { select { case ch <- ev: default: } } } // -------------------- only perf SSE -------------------- var ( perfHub = newSSEHub() ) // initSSE startet nur noch den perf broadcaster. func initSSE() { go func() { t := time.NewTicker(3 * time.Second) defer t.Stop() for range t.C { b := perfSnapshotJSON() if len(b) > 0 { perfHub.broadcastEvent("perf", b) } } }() } // perfSnapshotJSON liefert einen Snapshot für das Frontend (PerformanceMonitor). func perfSnapshotJSON() []byte { payload := buildPerfSnapshot() b, _ := json.Marshal(payload) return b } // -------------------- SSE: /api/stream -------------------- // // events: // - perf -> PerfSnapshot func appStream(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodGet { http.Error(w, "Nur GET erlaubt", http.StatusMethodNotAllowed) return } flusher, ok := w.(http.Flusher) if !ok { http.Error(w, "Streaming nicht unterstützt", http.StatusInternalServerError) return } h := w.Header() h.Set("Content-Type", "text/event-stream; charset=utf-8") h.Set("Cache-Control", "no-cache, no-transform") h.Set("Connection", "keep-alive") h.Set("X-Accel-Buffering", "no") w.WriteHeader(http.StatusOK) writeEvent := func(event string, data []byte) bool { if event != "" { if _, err := fmt.Fprintf(w, "event: %s\n", event); err != nil { return false } } if len(data) > 0 { if _, err := fmt.Fprintf(w, "data: %s\n\n", data); err != nil { return false } } else { if _, err := fmt.Fprint(w, "\n"); err != nil { return false } } flusher.Flush() return true } writeComment := func(msg string) bool { if _, err := fmt.Fprintf(w, ": %s\n\n", msg); err != nil { return false } flusher.Flush() return true } if _, err := fmt.Fprintf(w, "retry: 3000\n\n"); err != nil { return } flusher.Flush() perfCh := make(chan sseEvent, 32) perfHub.add(perfCh) defer perfHub.remove(perfCh) // optional: direkt initialen perf snapshot schicken if b := perfSnapshotJSON(); len(b) > 0 { if !writeEvent("perf", b) { return } } ctx := r.Context() ping := time.NewTicker(15 * time.Second) defer ping.Stop() for { select { case <-ctx.Done(): return case ev, ok := <-perfCh: if !ok { return } if len(ev.data) == 0 { continue } eventName := ev.name if eventName == "" { eventName = "perf" } if !writeEvent(eventName, ev.data) { return } case <-ping.C: if !writeComment(fmt.Sprintf("ping %d", time.Now().Unix())) { return } } } }