219 lines
3.8 KiB
Go
219 lines
3.8 KiB
Go
// backend\sse.go
|
|
package main
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/json"
|
|
"net/http"
|
|
"sort"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/r3labs/sse/v2"
|
|
)
|
|
|
|
// -------------------- SSE server --------------------
|
|
|
|
type appSSE struct {
|
|
server *sse.Server
|
|
}
|
|
|
|
var sseApp *appSSE
|
|
|
|
func initSSE() {
|
|
srv := sse.New()
|
|
srv.SplitData = true
|
|
|
|
// ✅ Nur noch EIN Stream
|
|
stream := srv.CreateStream("events")
|
|
stream.AutoReplay = false
|
|
|
|
sseApp = &appSSE{
|
|
server: srv,
|
|
}
|
|
|
|
// Debounced broadcaster (jobs)
|
|
go func() {
|
|
for range recordJobsNotify {
|
|
time.Sleep(40 * time.Millisecond)
|
|
for {
|
|
select {
|
|
case <-recordJobsNotify:
|
|
default:
|
|
goto SEND
|
|
}
|
|
}
|
|
SEND:
|
|
b := jobsSnapshotJSON()
|
|
if len(b) > 0 {
|
|
publishSSE("jobs", b)
|
|
}
|
|
}
|
|
}()
|
|
|
|
// Debounced broadcaster (done changed)
|
|
go func() {
|
|
for range doneNotify {
|
|
time.Sleep(40 * time.Millisecond)
|
|
for {
|
|
select {
|
|
case <-doneNotify:
|
|
default:
|
|
goto SEND
|
|
}
|
|
}
|
|
SEND:
|
|
seq := atomic.AddUint64(&doneSeq, 1)
|
|
b, _ := json.Marshal(map[string]any{
|
|
"type": "doneChanged",
|
|
"seq": seq,
|
|
"ts": time.Now().UnixMilli(),
|
|
})
|
|
publishSSE("doneChanged", b)
|
|
}
|
|
}()
|
|
|
|
// Debounced broadcaster (assets)
|
|
go func() {
|
|
for range assetsNotify {
|
|
time.Sleep(80 * time.Millisecond)
|
|
for {
|
|
select {
|
|
case <-assetsNotify:
|
|
default:
|
|
goto SEND
|
|
}
|
|
}
|
|
SEND:
|
|
b := assetsSnapshotJSON()
|
|
if len(b) > 0 {
|
|
publishSSE("state", b)
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
func publishSSE(eventName string, data []byte) {
|
|
if sseApp == nil || sseApp.server == nil {
|
|
return
|
|
}
|
|
if len(data) == 0 {
|
|
return
|
|
}
|
|
|
|
sseApp.server.Publish("events", &sse.Event{
|
|
Event: []byte(eventName),
|
|
Data: data,
|
|
})
|
|
}
|
|
|
|
// -------------------- notify channels --------------------
|
|
|
|
var (
|
|
doneNotify = make(chan struct{}, 1)
|
|
doneSeq uint64
|
|
|
|
recordJobsNotify = make(chan struct{}, 1)
|
|
assetsNotify = make(chan struct{}, 1)
|
|
)
|
|
|
|
func notifyDoneChanged() {
|
|
select {
|
|
case doneNotify <- struct{}{}:
|
|
default:
|
|
}
|
|
}
|
|
|
|
func notifyJobsChanged() {
|
|
select {
|
|
case recordJobsNotify <- struct{}{}:
|
|
default:
|
|
}
|
|
}
|
|
|
|
func notifyAssetsChanged() {
|
|
select {
|
|
case assetsNotify <- struct{}{}:
|
|
default:
|
|
}
|
|
}
|
|
|
|
// -------------------- snapshots --------------------
|
|
|
|
func jobsSnapshotJSON() []byte {
|
|
jobsMu.Lock()
|
|
list := make([]*RecordJob, 0, len(jobs))
|
|
for _, j := range jobs {
|
|
if j == nil || j.Hidden {
|
|
continue
|
|
}
|
|
c := *j
|
|
c.cancel = nil
|
|
list = append(list, &c)
|
|
}
|
|
jobsMu.Unlock()
|
|
|
|
sort.Slice(list, func(i, j int) bool {
|
|
return list[i].StartedAt.After(list[j].StartedAt)
|
|
})
|
|
|
|
b, _ := json.Marshal(list)
|
|
return b
|
|
}
|
|
|
|
func assetsSnapshotJSON() []byte {
|
|
assetsTaskMu.Lock()
|
|
st := assetsTaskState
|
|
assetsTaskMu.Unlock()
|
|
|
|
b, _ := json.Marshal(st)
|
|
return b
|
|
}
|
|
|
|
// -------------------- unified stream handler --------------------
|
|
|
|
func eventsStream(w http.ResponseWriter, r *http.Request) {
|
|
if r.Method != http.MethodGet {
|
|
http.Error(w, "Nur GET erlaubt", http.StatusMethodNotAllowed)
|
|
return
|
|
}
|
|
|
|
if sseApp == nil || sseApp.server == nil {
|
|
http.Error(w, "SSE nicht initialisiert", http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
q := r.URL.Query()
|
|
q.Set("stream", "events")
|
|
|
|
r2 := r.Clone(r.Context())
|
|
r2.URL.RawQuery = q.Encode()
|
|
|
|
sseApp.server.ServeHTTP(w, r2)
|
|
}
|
|
|
|
// -------------------- optional compatibility handlers --------------------
|
|
// Falls du alte Routen noch kurz behalten willst, zeigen sie einfach
|
|
// auf denselben Unified-Stream.
|
|
|
|
func recordStream(w http.ResponseWriter, r *http.Request) {
|
|
eventsStream(w, r)
|
|
}
|
|
|
|
func doneStream(w http.ResponseWriter, r *http.Request) {
|
|
eventsStream(w, r)
|
|
}
|
|
|
|
func assetsStream(w http.ResponseWriter, r *http.Request) {
|
|
eventsStream(w, r)
|
|
}
|
|
|
|
// -------------------- optional helper --------------------
|
|
|
|
func publishRawSSE(eventName string, buf *bytes.Buffer) {
|
|
if buf == nil {
|
|
return
|
|
}
|
|
publishSSE(eventName, buf.Bytes())
|
|
}
|