402 lines
7.4 KiB
Go
402 lines
7.4 KiB
Go
// backend/sse.go
|
|
package main
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"sort"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
)
|
|
|
|
// -------------------- SSE primitives --------------------
|
|
|
|
type sseHub struct {
|
|
mu sync.Mutex
|
|
clients map[chan []byte]struct{}
|
|
}
|
|
|
|
func newSSEHub() *sseHub {
|
|
return &sseHub{clients: map[chan []byte]struct{}{}}
|
|
}
|
|
|
|
func (h *sseHub) add(ch chan []byte) {
|
|
h.mu.Lock()
|
|
h.clients[ch] = struct{}{}
|
|
h.mu.Unlock()
|
|
}
|
|
|
|
func (h *sseHub) remove(ch chan []byte) {
|
|
h.mu.Lock()
|
|
delete(h.clients, ch)
|
|
h.mu.Unlock()
|
|
close(ch)
|
|
}
|
|
|
|
func (h *sseHub) broadcast(b []byte) {
|
|
h.mu.Lock()
|
|
defer h.mu.Unlock()
|
|
for ch := range h.clients {
|
|
// Non-blocking: langsame Clients droppen Updates (holen sich beim nächsten Update wieder ein)
|
|
select {
|
|
case ch <- b:
|
|
default:
|
|
}
|
|
}
|
|
}
|
|
|
|
// -------------------- SSE channels + notify --------------------
|
|
|
|
var (
|
|
// done changed stream (Client soll nur "refetch done" machen)
|
|
doneHub = newSSEHub()
|
|
doneNotify = make(chan struct{}, 1)
|
|
doneSeq uint64
|
|
|
|
// record jobs stream
|
|
recordJobsHub = newSSEHub()
|
|
recordJobsNotify = make(chan struct{}, 1)
|
|
|
|
// assets task stream
|
|
assetsHub = newSSEHub()
|
|
assetsNotify = make(chan struct{}, 1)
|
|
)
|
|
|
|
func notifyDoneChanged() {
|
|
select {
|
|
case doneNotify <- struct{}{}:
|
|
default:
|
|
}
|
|
}
|
|
|
|
func notifyJobsChanged() {
|
|
select {
|
|
case recordJobsNotify <- struct{}{}:
|
|
default:
|
|
}
|
|
}
|
|
|
|
func notifyAssetsChanged() {
|
|
select {
|
|
case assetsNotify <- struct{}{}:
|
|
default:
|
|
}
|
|
}
|
|
|
|
// initSSE startet die Debounce-Broadcaster.
|
|
// Wichtig: wird aus main.go init() aufgerufen.
|
|
func initSSE() {
|
|
// Debounced broadcaster (jobs)
|
|
go func() {
|
|
for range recordJobsNotify {
|
|
time.Sleep(40 * time.Millisecond)
|
|
for {
|
|
select {
|
|
case <-recordJobsNotify:
|
|
default:
|
|
goto SEND
|
|
}
|
|
}
|
|
SEND:
|
|
recordJobsHub.broadcast(jobsSnapshotJSON())
|
|
}
|
|
}()
|
|
|
|
// Debounced broadcaster (done changed)
|
|
go func() {
|
|
for range doneNotify {
|
|
time.Sleep(40 * time.Millisecond)
|
|
for {
|
|
select {
|
|
case <-doneNotify:
|
|
default:
|
|
goto SEND
|
|
}
|
|
}
|
|
SEND:
|
|
seq := atomic.AddUint64(&doneSeq, 1)
|
|
b := []byte(fmt.Sprintf(`{"type":"doneChanged","seq":%d,"ts":%d}`, seq, time.Now().UnixMilli()))
|
|
doneHub.broadcast(b)
|
|
}
|
|
}()
|
|
|
|
// ✅ Debounced broadcaster (assets task)
|
|
go func() {
|
|
for range assetsNotify {
|
|
time.Sleep(80 * time.Millisecond)
|
|
for {
|
|
select {
|
|
case <-assetsNotify:
|
|
default:
|
|
goto SEND
|
|
}
|
|
}
|
|
SEND:
|
|
b := assetsSnapshotJSON()
|
|
if len(b) > 0 {
|
|
assetsHub.broadcast(b)
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
// -------------------- SSE: /api/record/stream --------------------
|
|
|
|
// jobsSnapshotJSON liefert die aktuelle (gefilterte) Job-Liste als JSON.
|
|
// Greift auf jobs/jobsMu aus main.go zu (gleiches Package).
|
|
func jobsSnapshotJSON() []byte {
|
|
jobsMu.Lock()
|
|
list := make([]*RecordJob, 0, len(jobs))
|
|
for _, j := range jobs {
|
|
// Hidden-Jobs niemals an die UI senden
|
|
if j == nil || j.Hidden {
|
|
continue
|
|
}
|
|
c := *j
|
|
c.cancel = nil // nicht serialisieren
|
|
list = append(list, &c)
|
|
}
|
|
jobsMu.Unlock()
|
|
|
|
// neueste zuerst
|
|
sort.Slice(list, func(i, j int) bool {
|
|
return list[i].StartedAt.After(list[j].StartedAt)
|
|
})
|
|
|
|
b, _ := json.Marshal(list)
|
|
return b
|
|
}
|
|
|
|
func recordStream(w http.ResponseWriter, r *http.Request) {
|
|
if r.Method != http.MethodGet {
|
|
http.Error(w, "Nur GET erlaubt", http.StatusMethodNotAllowed)
|
|
return
|
|
}
|
|
|
|
flusher, ok := w.(http.Flusher)
|
|
if !ok {
|
|
http.Error(w, "Streaming nicht unterstützt", http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
// SSE-Header
|
|
h := w.Header()
|
|
h.Set("Content-Type", "text/event-stream; charset=utf-8")
|
|
h.Set("Cache-Control", "no-cache, no-transform")
|
|
h.Set("Connection", "keep-alive")
|
|
h.Set("X-Accel-Buffering", "no") // hilfreich bei Reverse-Proxies
|
|
|
|
// sofort starten
|
|
w.WriteHeader(http.StatusOK)
|
|
|
|
writeEvent := func(event string, data []byte) bool {
|
|
// returns false => client weg / write error
|
|
if event != "" {
|
|
if _, err := fmt.Fprintf(w, "event: %s\n", event); err != nil {
|
|
return false
|
|
}
|
|
}
|
|
if len(data) > 0 {
|
|
if _, err := fmt.Fprintf(w, "data: %s\n\n", data); err != nil {
|
|
return false
|
|
}
|
|
} else {
|
|
// empty payload ok (nur terminator)
|
|
if _, err := io.WriteString(w, "\n"); err != nil {
|
|
return false
|
|
}
|
|
}
|
|
flusher.Flush()
|
|
return true
|
|
}
|
|
|
|
writeComment := func(msg string) bool {
|
|
if _, err := fmt.Fprintf(w, ": %s\n\n", msg); err != nil {
|
|
return false
|
|
}
|
|
flusher.Flush()
|
|
return true
|
|
}
|
|
|
|
// Reconnect-Hinweis
|
|
if _, err := fmt.Fprintf(w, "retry: 3000\n\n"); err != nil {
|
|
return
|
|
}
|
|
flusher.Flush()
|
|
|
|
// Channel + Hub
|
|
ch := make(chan []byte, 32)
|
|
recordJobsHub.add(ch)
|
|
defer recordJobsHub.remove(ch)
|
|
|
|
// Initialer Snapshot sofort
|
|
if b := jobsSnapshotJSON(); len(b) > 0 {
|
|
if !writeEvent("jobs", b) {
|
|
return
|
|
}
|
|
}
|
|
|
|
ctx := r.Context()
|
|
|
|
// Ping/Keepalive
|
|
ping := time.NewTicker(15 * time.Second)
|
|
defer ping.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
|
|
case b, ok := <-ch:
|
|
if !ok {
|
|
return
|
|
}
|
|
if len(b) == 0 {
|
|
continue
|
|
}
|
|
|
|
// Burst-Coalescing: wenn viele Updates schnell kommen, nur das neueste senden
|
|
last := b
|
|
drain:
|
|
for i := 0; i < 64; i++ {
|
|
select {
|
|
case nb, ok := <-ch:
|
|
if !ok {
|
|
return
|
|
}
|
|
if len(nb) > 0 {
|
|
last = nb
|
|
}
|
|
default:
|
|
break drain
|
|
}
|
|
}
|
|
|
|
if !writeEvent("jobs", last) {
|
|
return
|
|
}
|
|
|
|
case <-ping.C:
|
|
// Keepalive als Kommentar (stört nicht, hält Verbindungen offen)
|
|
if !writeComment(fmt.Sprintf("ping %d", time.Now().Unix())) {
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// -------------------- SSE: /api/tasks/assets/stream --------------------
|
|
|
|
func assetsSnapshotJSON() []byte {
|
|
assetsTaskMu.Lock()
|
|
st := assetsTaskState
|
|
assetsTaskMu.Unlock()
|
|
|
|
b, _ := json.Marshal(st)
|
|
return b
|
|
}
|
|
|
|
func assetsStream(w http.ResponseWriter, r *http.Request) {
|
|
if r.Method != http.MethodGet {
|
|
http.Error(w, "Nur GET erlaubt", http.StatusMethodNotAllowed)
|
|
return
|
|
}
|
|
|
|
flusher, ok := w.(http.Flusher)
|
|
if !ok {
|
|
http.Error(w, "Streaming nicht unterstützt", http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
h := w.Header()
|
|
h.Set("Content-Type", "text/event-stream; charset=utf-8")
|
|
h.Set("Cache-Control", "no-cache, no-transform")
|
|
h.Set("Connection", "keep-alive")
|
|
h.Set("X-Accel-Buffering", "no")
|
|
|
|
w.WriteHeader(http.StatusOK)
|
|
|
|
// Reconnect-Hinweis
|
|
fmt.Fprintf(w, "retry: 3000\n\n")
|
|
flusher.Flush()
|
|
|
|
writeEvent := func(event string, data []byte) bool {
|
|
if event != "" {
|
|
if _, err := fmt.Fprintf(w, "event: %s\n", event); err != nil {
|
|
return false
|
|
}
|
|
}
|
|
if _, err := fmt.Fprintf(w, "data: %s\n\n", data); err != nil {
|
|
return false
|
|
}
|
|
flusher.Flush()
|
|
return true
|
|
}
|
|
|
|
writeComment := func(msg string) bool {
|
|
if _, err := fmt.Fprintf(w, ": %s\n\n", msg); err != nil {
|
|
return false
|
|
}
|
|
flusher.Flush()
|
|
return true
|
|
}
|
|
|
|
ch := make(chan []byte, 32)
|
|
assetsHub.add(ch)
|
|
defer assetsHub.remove(ch)
|
|
|
|
// Initial Snapshot
|
|
if b := assetsSnapshotJSON(); len(b) > 0 {
|
|
if !writeEvent("state", b) {
|
|
return
|
|
}
|
|
}
|
|
|
|
ctx := r.Context()
|
|
ping := time.NewTicker(15 * time.Second)
|
|
defer ping.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
|
|
case b, ok := <-ch:
|
|
if !ok {
|
|
return
|
|
}
|
|
if len(b) == 0 {
|
|
continue
|
|
}
|
|
// coalesce
|
|
last := b
|
|
drain:
|
|
for i := 0; i < 64; i++ {
|
|
select {
|
|
case nb, ok := <-ch:
|
|
if !ok {
|
|
return
|
|
}
|
|
if len(nb) > 0 {
|
|
last = nb
|
|
}
|
|
default:
|
|
break drain
|
|
}
|
|
}
|
|
|
|
if !writeEvent("state", last) {
|
|
return
|
|
}
|
|
|
|
case <-ping.C:
|
|
if !writeComment(fmt.Sprintf("ping %d", time.Now().Unix())) {
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|