diff --git a/backend/go.mod b/backend/go.mod index d5338bc..1310b23 100644 --- a/backend/go.mod +++ b/backend/go.mod @@ -6,6 +6,10 @@ require ( github.com/PuerkitoBio/goquery v1.11.0 github.com/google/uuid v1.6.0 github.com/grafov/m3u8 v0.12.1 + github.com/jackc/pgx/v5 v5.8.0 + github.com/pquerna/otp v1.5.0 + github.com/r3labs/sse/v2 v2.10.0 + golang.org/x/crypto v0.47.0 ) require ( @@ -13,35 +17,24 @@ require ( github.com/go-ole/go-ole v1.2.6 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect - github.com/jackc/pgx/v5 v5.8.0 // indirect github.com/jackc/puddle/v2 v2.2.2 // indirect github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect - github.com/pquerna/otp v1.5.0 // indirect github.com/shoenig/go-m1cpu v0.1.6 // indirect github.com/tklauser/go-sysconf v0.3.12 // indirect github.com/tklauser/numcpus v0.6.1 // indirect github.com/yusufpapurcu/wmi v1.2.4 // indirect - golang.org/x/crypto v0.47.0 // indirect golang.org/x/sync v0.19.0 // indirect golang.org/x/text v0.33.0 // indirect + gopkg.in/cenkalti/backoff.v1 v1.1.0 // indirect ) require ( github.com/TheTitanrain/w32 v0.0.0-20180517000239-4f5cfb03fabf // indirect github.com/andybalholm/cascadia v1.3.3 // indirect - github.com/dustin/go-humanize v1.0.1 // indirect - github.com/mattn/go-isatty v0.0.20 // indirect - github.com/ncruces/go-strftime v0.1.9 // indirect - github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect github.com/shirou/gopsutil/v3 v3.24.5 - github.com/sqweek/dialog v0.0.0-20240226140203-065105509627 // indirect - golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b // indirect + github.com/sqweek/dialog v0.0.0-20240226140203-065105509627 golang.org/x/image v0.35.0 golang.org/x/net v0.48.0 // indirect golang.org/x/sys v0.40.0 // indirect - modernc.org/libc v1.66.10 // indirect - modernc.org/mathutil v1.7.1 // indirect - modernc.org/memory v1.11.0 // indirect - modernc.org/sqlite v1.41.0 // indirect ) diff --git a/backend/go.sum b/backend/go.sum index 2d35994..2b594e8 100644 --- a/backend/go.sum +++ b/backend/go.sum @@ -7,11 +7,12 @@ github.com/andybalholm/cascadia v1.3.3/go.mod h1:xNd9bqTn98Ln4DwST8/nG+H0yuB8Hmg github.com/boombuler/barcode v1.0.1-0.20190219062509-6c824513bacc h1:biVzkmvwrH8WK8raXaxBx6fRVTlJILwEwQGL1I/ByEI= github.com/boombuler/barcode v1.0.1-0.20190219062509-6c824513bacc/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= -github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY= github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= @@ -27,26 +28,27 @@ github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4= github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I= -github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= -github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= -github.com/ncruces/go-strftime v0.1.9 h1:bY0MQC28UADQmHmaF5dgpLmImcShSi2kHU9XLdhx/f4= -github.com/ncruces/go-strftime v0.1.9/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c h1:ncq/mPwQF4JjgDlrVEn3C11VoGHZN7m8qihwgMEtzYw= github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE= github.com/pquerna/otp v1.5.0 h1:NMMR+WrmaqXU4EzdGJEE1aUUI0AMRzsp96fFFWNPwxs= github.com/pquerna/otp v1.5.0/go.mod h1:dkJfzwRKNiegxyNb54X/3fLwhCynbMspSyWKnvi1AEg= -github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= -github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= +github.com/r3labs/sse/v2 v2.10.0 h1:hFEkLLFY4LDifoHdiCN/LlGBAdVJYsANaLqNYa1l/v0= +github.com/r3labs/sse/v2 v2.10.0/go.mod h1:Igau6Whc+F17QUgML1fYe1VPZzTV6EMCnYktEmkNJ7I= github.com/shirou/gopsutil/v3 v3.24.5 h1:i0t8kL+kQTvpAYToeuiVk3TgDeKOFioZO3Ztz/iZ9pI= github.com/shirou/gopsutil/v3 v3.24.5/go.mod h1:bsoOS1aStSs9ErQ1WWfxllSeS1K5D+U30r2NfcubMVk= github.com/shoenig/go-m1cpu v0.1.6 h1:nxdKQNcEB6vzgA2E2bvzKIYRuNj7XNJ4S/aRSwKzFtM= github.com/shoenig/go-m1cpu v0.1.6/go.mod h1:1JJMcUBvfNwpq05QDQVAnx3gUHr9IYF7GNg9SUEw2VQ= +github.com/shoenig/test v0.6.4 h1:kVTaSd7WLz5WZ2IaoM0RSzRsUD+m8wRR+5qvntpn4LU= +github.com/shoenig/test v0.6.4/go.mod h1:byHiCGXqrVaflBLAMq/srcZIHynQPQgeyvkvXnjqq0k= github.com/sqweek/dialog v0.0.0-20240226140203-065105509627 h1:2JL2wmHXWIAxDofCK+AdkFi1KEg3dgkefCsm7isADzQ= github.com/sqweek/dialog v0.0.0-20240226140203-065105509627/go.mod h1:/qNPSY91qTz/8TgHEMioAUc6q7+3SOybeKczHMXFcXw= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/tklauser/go-sysconf v0.3.12 h1:0QaGUFOdQaIVdPgfITYzaTegZvdCjmYO52cSFAEVmqU= github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0hSfmBq8nJbHYI= github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+Fk= @@ -62,8 +64,6 @@ golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk= golang.org/x/crypto v0.47.0 h1:V6e3FRj+n4dbpw86FJ8Fv7XVOql7TEwpHapKoMJ/GO8= golang.org/x/crypto v0.47.0/go.mod h1:ff3Y9VzzKbwSSEzWqJsJVBnWmRwRSHt/6Op5n9bQc4A= -golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b h1:M2rDM6z3Fhozi9O7NWsxAkg/yqS/lQJ6PmkyIV3YP+o= -golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b/go.mod h1:3//PLf8L/X+8b4vuAfHzxeRUl04Adcb341+IGKfnqS8= golang.org/x/image v0.35.0 h1:LKjiHdgMtO8z7Fh18nGY6KDcoEtVfsgLDPeLyguqb7I= golang.org/x/image v0.35.0/go.mod h1:MwPLTVgvxSASsxdLzKrl8BRFuyqMyGhLwmC+TO1Sybk= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= @@ -72,6 +72,7 @@ golang.org/x/mod v0.12.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/mod v0.15.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/mod v0.17.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20191116160921-f9c825593386/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= @@ -80,8 +81,6 @@ golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk= golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM= golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4= -golang.org/x/net v0.47.0 h1:Mx+4dIFzqraBXUugkia1OOvlD6LemFo1ALMHjrXDOhY= -golang.org/x/net v0.47.0/go.mod h1:/jNxtkgq5yWUGYkaZGqo27cfGZ1c5Nen03aYrrKpVRU= golang.org/x/net v0.48.0 h1:zyQRTTrjc33Lhh0fBgT/H3oZq9WuvRR5gPC70xpDiQU= golang.org/x/net v0.48.0/go.mod h1:+ndRgGjkh8FGtu1w1FGbEC31if4VrNVMuKTgcAAnQRY= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -101,15 +100,12 @@ golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/sys v0.38.0 h1:3yZWxaJjBmCWXqhN1qh02AkOnCQ1poK6oF+a7xWL6Gc= -golang.org/x/sys v0.38.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= golang.org/x/sys v0.40.0 h1:DBZZqJ2Rkml6QMQsZywtnjnnGvHza6BTfYFWY9kjEWQ= golang.org/x/sys v0.40.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= golang.org/x/telemetry v0.0.0-20240228155512-f48c80bd79b2/go.mod h1:TeRTkGYfJXctD9OcfyVLyj2J3IxLnKwHJR8f4D8a3YE= @@ -140,13 +136,9 @@ golang.org/x/tools v0.13.0/go.mod h1:HvlwmtVNQAhOuCjW7xxvovg8wbNq7LwfXh/k7wXUl58 golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/cenkalti/backoff.v1 v1.1.0 h1:Arh75ttbsvlpVA7WtVpH4u9h6Zl46xuptxqLxPiSo4Y= +gopkg.in/cenkalti/backoff.v1 v1.1.0/go.mod h1:J6Vskwqd+OMVJl8C33mmtxTBs2gyzfv7UDAkHu8BrjI= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -modernc.org/libc v1.66.10 h1:yZkb3YeLx4oynyR+iUsXsybsX4Ubx7MQlSYEw4yj59A= -modernc.org/libc v1.66.10/go.mod h1:8vGSEwvoUoltr4dlywvHqjtAqHBaw0j1jI7iFBTAr2I= -modernc.org/mathutil v1.7.1 h1:GCZVGXdaN8gTqB1Mf/usp1Y/hSqgI2vAGGP4jZMCxOU= -modernc.org/mathutil v1.7.1/go.mod h1:4p5IwJITfppl0G4sUEDtCr4DthTaT47/N3aT6MhfgJg= -modernc.org/memory v1.11.0 h1:o4QC8aMQzmcwCK3t3Ux/ZHmwFPzE6hf2Y5LbkRs+hbI= -modernc.org/memory v1.11.0/go.mod h1:/JP4VbVC+K5sU2wZi9bHoq2MAkCnrt2r98UGeSK7Mjw= -modernc.org/sqlite v1.41.0 h1:bJXddp4ZpsqMsNN1vS0jWo4IJTZzb8nWpcgvyCFG9Ck= -modernc.org/sqlite v1.41.0/go.mod h1:9fjQZ0mB1LLP0GYrp39oOJXx/I2sxEnZtzCmEQIKvGE= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/backend/record.go b/backend/record.go index 35f4eb4..75c527c 100644 --- a/backend/record.go +++ b/backend/record.go @@ -507,49 +507,6 @@ func recordList(w http.ResponseWriter, r *http.Request) { // SSE (done stream) -func handleDoneStream(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", "text/event-stream; charset=utf-8") - w.Header().Set("Cache-Control", "no-cache, no-store, must-revalidate") - w.Header().Set("Connection", "keep-alive") - w.Header().Set("X-Accel-Buffering", "no") - - flusher, ok := w.(http.Flusher) - if !ok { - http.Error(w, "streaming unsupported", http.StatusInternalServerError) - return - } - - ch := make(chan []byte, 32) - doneHub.add(ch) - defer doneHub.remove(ch) - - fmt.Fprintf(w, ": hello seq=%d ts=%d\n\n", atomic.LoadUint64(&doneSeq), time.Now().UnixMilli()) - flusher.Flush() - - ctx := r.Context() - ping := time.NewTicker(15 * time.Second) - defer ping.Stop() - - for { - select { - case <-ctx.Done(): - return - - case <-ping.C: - fmt.Fprintf(w, ": ping ts=%d\n\n", time.Now().UnixMilli()) - flusher.Flush() - - case b, ok := <-ch: - if !ok { - return - } - fmt.Fprintf(w, "event: doneChanged\n") - fmt.Fprintf(w, "data: %s\n\n", b) - flusher.Flush() - } - } -} - func startRecordingFromRequest(w http.ResponseWriter, r *http.Request) { if !mustMethod(w, r, http.MethodPost) { return diff --git a/backend/routes.go b/backend/routes.go index 6abd3cd..cd3775a 100644 --- a/backend/routes.go +++ b/backend/routes.go @@ -30,7 +30,9 @@ func registerRoutes(mux *http.ServeMux, auth *AuthManager) *ModelStore { api.HandleFunc("/api/cookies", cookiesHandler) - api.HandleFunc("/api/record/done/stream", handleDoneStream) + api.HandleFunc("/api/events/stream", eventsStream) + + api.HandleFunc("/api/record/done/stream", doneStream) api.HandleFunc("/api/perf/stream", perfStreamHandler) api.HandleFunc("/api/status/disk", diskStatusHandler) diff --git a/backend/sse.go b/backend/sse.go index 81df276..d43defc 100644 --- a/backend/sse.go +++ b/backend/sse.go @@ -1,68 +1,120 @@ -// backend/sse.go +// backend\sse.go package main import ( + "bytes" "encoding/json" - "fmt" - "io" "net/http" "sort" - "sync" "sync/atomic" "time" + + "github.com/r3labs/sse/v2" ) -// -------------------- SSE primitives -------------------- +// -------------------- SSE server -------------------- -type sseHub struct { - mu sync.Mutex - clients map[chan []byte]struct{} +type appSSE struct { + server *sse.Server } -func newSSEHub() *sseHub { - return &sseHub{clients: map[chan []byte]struct{}{}} -} +var sseApp *appSSE -func (h *sseHub) add(ch chan []byte) { - h.mu.Lock() - h.clients[ch] = struct{}{} - h.mu.Unlock() -} +func initSSE() { + srv := sse.New() + srv.SplitData = true -func (h *sseHub) remove(ch chan []byte) { - h.mu.Lock() - delete(h.clients, ch) - h.mu.Unlock() - close(ch) -} + // ✅ Nur noch EIN Stream + stream := srv.CreateStream("events") + stream.AutoReplay = false -func (h *sseHub) broadcast(b []byte) { - h.mu.Lock() - defer h.mu.Unlock() - for ch := range h.clients { - // Non-blocking: langsame Clients droppen Updates (holen sich beim nächsten Update wieder ein) - select { - case ch <- b: - default: - } + sseApp = &appSSE{ + server: srv, } + + // Debounced broadcaster (jobs) + go func() { + for range recordJobsNotify { + time.Sleep(40 * time.Millisecond) + for { + select { + case <-recordJobsNotify: + default: + goto SEND + } + } + SEND: + b := jobsSnapshotJSON() + if len(b) > 0 { + publishSSE("jobs", b) + } + } + }() + + // Debounced broadcaster (done changed) + go func() { + for range doneNotify { + time.Sleep(40 * time.Millisecond) + for { + select { + case <-doneNotify: + default: + goto SEND + } + } + SEND: + seq := atomic.AddUint64(&doneSeq, 1) + b, _ := json.Marshal(map[string]any{ + "type": "doneChanged", + "seq": seq, + "ts": time.Now().UnixMilli(), + }) + publishSSE("doneChanged", b) + } + }() + + // Debounced broadcaster (assets) + go func() { + for range assetsNotify { + time.Sleep(80 * time.Millisecond) + for { + select { + case <-assetsNotify: + default: + goto SEND + } + } + SEND: + b := assetsSnapshotJSON() + if len(b) > 0 { + publishSSE("state", b) + } + } + }() } -// -------------------- SSE channels + notify -------------------- +func publishSSE(eventName string, data []byte) { + if sseApp == nil || sseApp.server == nil { + return + } + if len(data) == 0 { + return + } + + sseApp.server.Publish("events", &sse.Event{ + Event: []byte(eventName), + Data: data, + }) +} + +// -------------------- notify channels -------------------- var ( - // done changed stream (Client soll nur "refetch done" machen) - doneHub = newSSEHub() doneNotify = make(chan struct{}, 1) doneSeq uint64 - // record jobs stream - recordJobsHub = newSSEHub() recordJobsNotify = make(chan struct{}, 1) - - // assets task stream - assetsHub = newSSEHub() - assetsNotify = make(chan struct{}, 1) + assetsNotify = make(chan struct{}, 1) ) func notifyDoneChanged() { @@ -86,82 +138,21 @@ func notifyAssetsChanged() { } } -// initSSE startet die Debounce-Broadcaster. -// Wichtig: wird aus main.go init() aufgerufen. -func initSSE() { - // Debounced broadcaster (jobs) - go func() { - for range recordJobsNotify { - time.Sleep(40 * time.Millisecond) - for { - select { - case <-recordJobsNotify: - default: - goto SEND - } - } - SEND: - recordJobsHub.broadcast(jobsSnapshotJSON()) - } - }() +// -------------------- snapshots -------------------- - // Debounced broadcaster (done changed) - go func() { - for range doneNotify { - time.Sleep(40 * time.Millisecond) - for { - select { - case <-doneNotify: - default: - goto SEND - } - } - SEND: - seq := atomic.AddUint64(&doneSeq, 1) - b := []byte(fmt.Sprintf(`{"type":"doneChanged","seq":%d,"ts":%d}`, seq, time.Now().UnixMilli())) - doneHub.broadcast(b) - } - }() - - // ✅ Debounced broadcaster (assets task) - go func() { - for range assetsNotify { - time.Sleep(80 * time.Millisecond) - for { - select { - case <-assetsNotify: - default: - goto SEND - } - } - SEND: - b := assetsSnapshotJSON() - if len(b) > 0 { - assetsHub.broadcast(b) - } - } - }() -} - -// -------------------- SSE: /api/record/stream -------------------- - -// jobsSnapshotJSON liefert die aktuelle (gefilterte) Job-Liste als JSON. -// Greift auf jobs/jobsMu aus main.go zu (gleiches Package). func jobsSnapshotJSON() []byte { jobsMu.Lock() list := make([]*RecordJob, 0, len(jobs)) for _, j := range jobs { - // Hidden-Jobs niemals an die UI senden if j == nil || j.Hidden { continue } c := *j - c.cancel = nil // nicht serialisieren + c.cancel = nil list = append(list, &c) } jobsMu.Unlock() - // neueste zuerst sort.Slice(list, func(i, j int) bool { return list[i].StartedAt.After(list[j].StartedAt) }) @@ -170,126 +161,6 @@ func jobsSnapshotJSON() []byte { return b } -func recordStream(w http.ResponseWriter, r *http.Request) { - if r.Method != http.MethodGet { - http.Error(w, "Nur GET erlaubt", http.StatusMethodNotAllowed) - return - } - - flusher, ok := w.(http.Flusher) - if !ok { - http.Error(w, "Streaming nicht unterstützt", http.StatusInternalServerError) - return - } - - // SSE-Header - h := w.Header() - h.Set("Content-Type", "text/event-stream; charset=utf-8") - h.Set("Cache-Control", "no-cache, no-transform") - h.Set("Connection", "keep-alive") - h.Set("X-Accel-Buffering", "no") // hilfreich bei Reverse-Proxies - - // sofort starten - w.WriteHeader(http.StatusOK) - - writeEvent := func(event string, data []byte) bool { - // returns false => client weg / write error - if event != "" { - if _, err := fmt.Fprintf(w, "event: %s\n", event); err != nil { - return false - } - } - if len(data) > 0 { - if _, err := fmt.Fprintf(w, "data: %s\n\n", data); err != nil { - return false - } - } else { - // empty payload ok (nur terminator) - if _, err := io.WriteString(w, "\n"); err != nil { - return false - } - } - flusher.Flush() - return true - } - - writeComment := func(msg string) bool { - if _, err := fmt.Fprintf(w, ": %s\n\n", msg); err != nil { - return false - } - flusher.Flush() - return true - } - - // Reconnect-Hinweis - if _, err := fmt.Fprintf(w, "retry: 3000\n\n"); err != nil { - return - } - flusher.Flush() - - // Channel + Hub - ch := make(chan []byte, 32) - recordJobsHub.add(ch) - defer recordJobsHub.remove(ch) - - // Initialer Snapshot sofort - if b := jobsSnapshotJSON(); len(b) > 0 { - if !writeEvent("jobs", b) { - return - } - } - - ctx := r.Context() - - // Ping/Keepalive - ping := time.NewTicker(15 * time.Second) - defer ping.Stop() - - for { - select { - case <-ctx.Done(): - return - - case b, ok := <-ch: - if !ok { - return - } - if len(b) == 0 { - continue - } - - // Burst-Coalescing: wenn viele Updates schnell kommen, nur das neueste senden - last := b - drain: - for i := 0; i < 64; i++ { - select { - case nb, ok := <-ch: - if !ok { - return - } - if len(nb) > 0 { - last = nb - } - default: - break drain - } - } - - if !writeEvent("jobs", last) { - return - } - - case <-ping.C: - // Keepalive als Kommentar (stört nicht, hält Verbindungen offen) - if !writeComment(fmt.Sprintf("ping %d", time.Now().Unix())) { - return - } - } - } -} - -// -------------------- SSE: /api/tasks/assets/stream -------------------- - func assetsSnapshotJSON() []byte { assetsTaskMu.Lock() st := assetsTaskState @@ -299,103 +170,49 @@ func assetsSnapshotJSON() []byte { return b } -func assetsStream(w http.ResponseWriter, r *http.Request) { +// -------------------- unified stream handler -------------------- + +func eventsStream(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodGet { http.Error(w, "Nur GET erlaubt", http.StatusMethodNotAllowed) return } - flusher, ok := w.(http.Flusher) - if !ok { - http.Error(w, "Streaming nicht unterstützt", http.StatusInternalServerError) + if sseApp == nil || sseApp.server == nil { + http.Error(w, "SSE nicht initialisiert", http.StatusInternalServerError) return } - h := w.Header() - h.Set("Content-Type", "text/event-stream; charset=utf-8") - h.Set("Cache-Control", "no-cache, no-transform") - h.Set("Connection", "keep-alive") - h.Set("X-Accel-Buffering", "no") + q := r.URL.Query() + q.Set("stream", "events") - w.WriteHeader(http.StatusOK) + r2 := r.Clone(r.Context()) + r2.URL.RawQuery = q.Encode() - // Reconnect-Hinweis - fmt.Fprintf(w, "retry: 3000\n\n") - flusher.Flush() - - writeEvent := func(event string, data []byte) bool { - if event != "" { - if _, err := fmt.Fprintf(w, "event: %s\n", event); err != nil { - return false - } - } - if _, err := fmt.Fprintf(w, "data: %s\n\n", data); err != nil { - return false - } - flusher.Flush() - return true - } - - writeComment := func(msg string) bool { - if _, err := fmt.Fprintf(w, ": %s\n\n", msg); err != nil { - return false - } - flusher.Flush() - return true - } - - ch := make(chan []byte, 32) - assetsHub.add(ch) - defer assetsHub.remove(ch) - - // Initial Snapshot - if b := assetsSnapshotJSON(); len(b) > 0 { - if !writeEvent("state", b) { - return - } - } - - ctx := r.Context() - ping := time.NewTicker(15 * time.Second) - defer ping.Stop() - - for { - select { - case <-ctx.Done(): - return - - case b, ok := <-ch: - if !ok { - return - } - if len(b) == 0 { - continue - } - // coalesce - last := b - drain: - for i := 0; i < 64; i++ { - select { - case nb, ok := <-ch: - if !ok { - return - } - if len(nb) > 0 { - last = nb - } - default: - break drain - } - } - - if !writeEvent("state", last) { - return - } - - case <-ping.C: - if !writeComment(fmt.Sprintf("ping %d", time.Now().Unix())) { - return - } - } - } + sseApp.server.ServeHTTP(w, r2) +} + +// -------------------- optional compatibility handlers -------------------- +// Falls du alte Routen noch kurz behalten willst, zeigen sie einfach +// auf denselben Unified-Stream. + +func recordStream(w http.ResponseWriter, r *http.Request) { + eventsStream(w, r) +} + +func doneStream(w http.ResponseWriter, r *http.Request) { + eventsStream(w, r) +} + +func assetsStream(w http.ResponseWriter, r *http.Request) { + eventsStream(w, r) +} + +// -------------------- optional helper -------------------- + +func publishRawSSE(eventName string, buf *bytes.Buffer) { + if buf == nil { + return + } + publishSSE(eventName, buf.Bytes()) } diff --git a/frontend/src/App.tsx b/frontend/src/App.tsx index 4f839fb..c768951 100644 --- a/frontend/src/App.tsx +++ b/frontend/src/App.tsx @@ -107,6 +107,12 @@ type ChaturbateOnlineRoom = { image_url?: string } +type AutostartState = { + paused?: boolean + pausedByUser?: boolean + pausedByDisk?: boolean +} + function normalizeHttpUrl(raw: string): string | null { let v = (raw ?? '').trim() if (!v) return null @@ -768,6 +774,12 @@ export default function App() { const [pendingWatchedRooms, setPendingWatchedRooms] = useState([]) const [pendingAutoStartByKey, setPendingAutoStartByKey] = useState>({}) + const [autostartState, setAutostartState] = useState({ + paused: false, + pausedByUser: false, + pausedByDisk: false, + }) + // "latest" Refs (damit Clipboard-Loop nicht wegen jobs-Polling neu startet) const busyRef = useRef(false) const cookiesRef = useRef>({}) @@ -1296,129 +1308,158 @@ export default function App() { if (donePage > maxPage) setDonePage(maxPage) }, [doneCount, donePage]) - // jobs SSE / polling (mit "Job gestartet" Toast für Backend-Autostarts) useEffect(() => { if (!authed) return let es: EventSource | null = null - let timer: number | null = null + let fallbackTimer: number | null = null - const stopPoll = () => { - if (timer != null) { - window.clearInterval(timer) - timer = null + const stopFallbackPoll = () => { + if (fallbackTimer != null) { + window.clearInterval(fallbackTimer) + fallbackTimer = null } } - const startPoll = () => { - if (timer != null) return - timer = window.setInterval(() => { + const startFallbackPoll = () => { + if (fallbackTimer != null) return + + fallbackTimer = window.setInterval(() => { if (document.hidden) return + + void loadDoneCount() + void loadJobs() + if (selectedTabRef.current === 'finished') { - void loadDoneCount() - requestFinishedReload('done-stream poll tick') - } else { - void loadDoneCount() + requestFinishedReload('sse fallback poll') } - }, document.hidden ? 60000 : 15000) + }, document.hidden ? 60000 : 5000) } - const lastFireRef = { t: 0 } - let coalesceTimer: number | null = null + const lastDoneFireRef = { t: 0 } + let doneCoalesceTimer: number | null = null - const requestRefresh = () => { + const requestDoneRefresh = () => { const now = Date.now() - const since = now - lastFireRef.t + const since = now - lastDoneFireRef.t - // coalesce bursts if (since < 800) { - if (coalesceTimer != null) return - coalesceTimer = window.setTimeout(() => { - coalesceTimer = null - lastFireRef.t = Date.now() + if (doneCoalesceTimer != null) return + doneCoalesceTimer = window.setTimeout(() => { + doneCoalesceTimer = null + lastDoneFireRef.t = Date.now() + + void loadDoneCount() if (selectedTabRef.current === 'finished') { - void loadDoneCount() - requestFinishedReload('done-stream coalesced requestRefresh') - } else { - void loadDoneCount() + requestFinishedReload('doneChanged coalesced') } }, 900) return } - lastFireRef.t = now + lastDoneFireRef.t = now + void loadDoneCount() if (selectedTabRef.current === 'finished') { - void loadDoneCount() - requestFinishedReload('done-stream requestRefresh') - } else { - void loadDoneCount() + requestFinishedReload('doneChanged') } } - // initial + const applyJobsSnapshot = (data: unknown) => { + if (!Array.isArray(data)) return + const items = data as RecordJob[] + setJobs(items) + jobsRef.current = items + setLastHeaderUpdateAtMs(Date.now()) + } + + const applyAutostartState = (data: unknown) => { + const d = (data ?? {}) as any + setAutostartState({ + paused: Boolean(d?.paused), + pausedByUser: Boolean(d?.pausedByUser), + pausedByDisk: Boolean(d?.pausedByDisk), + }) + } + + // Initialdaten + void loadJobs() void loadDoneCount() - es = new EventSource('/api/record/done/stream') + // Optional initialer Fallback für autostart, bis erstes SSE kommt + void apiJSON('/api/autostart/state', { cache: 'no-store' as any }) + .then((s) => { + setAutostartState({ + paused: Boolean(s?.paused), + pausedByUser: Boolean(s?.pausedByUser), + pausedByDisk: Boolean(s?.pausedByDisk), + }) + }) + .catch(() => {}) + + es = new EventSource('/api/events/stream') es.onopen = () => { - // ✅ sobald SSE stabil da ist: Poll aus - stopPoll() + stopFallbackPoll() } es.onerror = () => { - // ✅ SSE kaputt -> Poll an - startPoll() + startFallbackPoll() } - const onDone = () => requestRefresh() - es.addEventListener('doneChanged', onDone as any) + const onJobs = (ev: MessageEvent) => { + try { + applyJobsSnapshot(JSON.parse(String(ev.data ?? 'null'))) + } catch { + // ignore + } + } + + const onDoneChanged = () => { + requestDoneRefresh() + } + + const onAutostart = (ev: MessageEvent) => { + try { + applyAutostartState(JSON.parse(String(ev.data ?? 'null'))) + } catch { + // ignore + } + } + + es.addEventListener('jobs', onJobs as any) + es.addEventListener('doneChanged', onDoneChanged as any) + es.addEventListener('autostart', onAutostart as any) const onVis = () => { - if (!document.hidden) requestRefresh() + if (document.hidden) return + void loadJobs() + void loadDoneCount() + if (selectedTabRef.current === 'finished') { + requestFinishedReload('visibilitychange') + } } + document.addEventListener('visibilitychange', onVis) return () => { document.removeEventListener('visibilitychange', onVis) - if (coalesceTimer != null) window.clearTimeout(coalesceTimer) - stopPoll() - es?.removeEventListener('doneChanged', onDone as any) - es?.close() + + if (doneCoalesceTimer != null) { + window.clearTimeout(doneCoalesceTimer) + } + + stopFallbackPoll() + + if (es) { + es.removeEventListener('jobs', onJobs as any) + es.removeEventListener('doneChanged', onDoneChanged as any) + es.removeEventListener('autostart', onAutostart as any) + es.close() + } + es = null } - }, [authed, loadDoneCount, requestFinishedReload]) - - useEffect(() => { - if (!authed) return - - // initial - void loadJobs() - - // polling: schneller wenn running-tab offen oder jobs laufen - const t = window.setInterval(() => { - if (document.hidden) return - - const hasRunning = jobsRef.current.some((j) => { - const s = String((j as any)?.status ?? '').toLowerCase() - return s === 'running' || s === 'postwork' - }) - - // wenn Tab "running" offen ODER irgendwas läuft -> häufiger pollen - if (selectedTabRef.current === 'running' || hasRunning) { - void loadJobs() - } - }, document.hidden ? 60000 : 3000) // 3s fühlt sich "live" an - - const onVis = () => { - if (!document.hidden) void loadJobs() - } - document.addEventListener('visibilitychange', onVis) - - return () => { - window.clearInterval(t) - document.removeEventListener('visibilitychange', onVis) - } - }, [authed, loadJobs]) + }, [authed, loadJobs, loadDoneCount, requestFinishedReload]) function isChaturbate(raw: string): boolean { const norm = normalizeHttpUrl(raw) @@ -2576,6 +2617,19 @@ export default function App() { jobs={runningJobs} modelsByKey={modelsByKey} pending={pendingWatchedRooms} + autostartState={autostartState} + onRefreshAutostartState={async () => { + try { + const s = await apiJSON('/api/autostart/state', { cache: 'no-store' as any }) + setAutostartState({ + paused: Boolean(s?.paused), + pausedByUser: Boolean(s?.pausedByUser), + pausedByDisk: Boolean(s?.pausedByDisk), + }) + } catch { + // ignore + } + }} onOpenPlayer={openPlayer} onStopJob={stopJob} onToggleFavorite={handleToggleFavorite} diff --git a/frontend/src/components/ui/Downloads.tsx b/frontend/src/components/ui/Downloads.tsx index 3d844a0..6b61dd5 100644 --- a/frontend/src/components/ui/Downloads.tsx +++ b/frontend/src/components/ui/Downloads.tsx @@ -10,8 +10,6 @@ import type { RecordJob } from '../../types' import ProgressBar from './ProgressBar' import RecordJobActions from './RecordJobActions' import { PauseIcon, PlayIcon } from '@heroicons/react/24/solid' -import { subscribeSSE } from '../../lib/sseSingleton' -import { useRecordJobsSSE } from '../../lib/useRecordJobsSSE' import { useMediaQuery } from '../../lib/useMediaQuery' type PendingWatchedRoom = WaitingModelRow & { @@ -35,6 +33,8 @@ type AutostartState = { type Props = { jobs: RecordJob[] pending?: PendingWatchedRoom[] + autostartState?: AutostartState + onRefreshAutostartState?: () => Promise | void modelsByKey?: Record onOpenPlayer: (job: RecordJob) => void onStopJob: (id: string) => void @@ -780,6 +780,8 @@ function DiskEmergencyBadge() { export default function Downloads({ jobs, pending = [], + autostartState, + onRefreshAutostartState, onOpenPlayer, onStopJob, onToggleFavorite, @@ -789,8 +791,6 @@ export default function Downloads({ modelsByKey = {}, blurPreviews }: Props) { - - const jobsLive = useRecordJobsSSE(jobs) const isDesktop = useMediaQuery('(min-width: 640px)', true) @@ -808,58 +808,32 @@ export default function Downloads({ const refreshWatchedState = useCallback(async () => { try { - const s = await apiJSON('/api/autostart/state', { cache: 'no-store' as any }) - - const nextPaused = Boolean(s?.paused) - const nextPausedByUser = Boolean(s?.pausedByUser) - const nextPausedByDisk = Boolean(s?.pausedByDisk) - - watchedPausedRef.current = nextPaused - watchedPausedByUserRef.current = nextPausedByUser - watchedPausedByDiskRef.current = nextPausedByDisk - - setWatchedPaused(nextPaused) - setWatchedPausedByUser(nextPausedByUser) - setWatchedPausedByDisk(nextPausedByDisk) + await onRefreshAutostartState?.() } catch { - // wenn Endpoint (noch) nicht da ist: nichts kaputt machen + // ignore } - }, []) + }, [onRefreshAutostartState]) useEffect(() => { - // initial: einmal fetchen (schneller first paint) - void refreshWatchedState() + const nextPaused = Boolean(autostartState?.paused) + const nextPausedByUser = Boolean(autostartState?.pausedByUser) + const nextPausedByDisk = Boolean(autostartState?.pausedByDisk) - // danach: Stream (Singleton) - const unsub = subscribeSSE( - '/api/autostart/state/stream', - 'autostart', - (data) => { - const nextPaused = Boolean((data as any)?.paused) - const nextPausedByUser = Boolean((data as any)?.pausedByUser) - const nextPausedByDisk = Boolean((data as any)?.pausedByDisk) + const unchanged = + watchedPausedRef.current === nextPaused && + watchedPausedByUserRef.current === nextPausedByUser && + watchedPausedByDiskRef.current === nextPausedByDisk - const unchanged = - watchedPausedRef.current === nextPaused && - watchedPausedByUserRef.current === nextPausedByUser && - watchedPausedByDiskRef.current === nextPausedByDisk + if (unchanged) return - if (unchanged) return + watchedPausedRef.current = nextPaused + watchedPausedByUserRef.current = nextPausedByUser + watchedPausedByDiskRef.current = nextPausedByDisk - watchedPausedRef.current = nextPaused - watchedPausedByUserRef.current = nextPausedByUser - watchedPausedByDiskRef.current = nextPausedByDisk - - setWatchedPaused(nextPaused) - setWatchedPausedByUser(nextPausedByUser) - setWatchedPausedByDisk(nextPausedByDisk) - } - ) - - return () => { - unsub() - } - }, [refreshWatchedState]) + setWatchedPaused(nextPaused) + setWatchedPausedByUser(nextPausedByUser) + setWatchedPausedByDisk(nextPausedByDisk) + }, [autostartState]) const pauseWatched = useCallback(async () => { if (watchedBusy || watchedPaused) return @@ -932,7 +906,7 @@ export default function Downloads({ const next: Record = {} for (const id of keys) { - const j = jobsLive.find((x) => x.id === id) + const j = jobs.find((x) => x.id === id) if (!j) continue const phaseLower = String((j as any).phase ?? '').trim().toLowerCase() const isBusyPhase = phaseLower !== '' && phaseLower !== 'recording' @@ -942,15 +916,15 @@ export default function Downloads({ } return next }) - }, [jobsLive]) + }, [jobs]) const [nowMs, setNowMs] = useState(() => Date.now()) const hasActive = useMemo(() => { // tickt solange mind. ein Job noch nicht beendet ist - return jobsLive.some((j) => !j.endedAt && j.status === 'running') - }, [jobsLive]) + return jobs.some((j) => !j.endedAt && j.status === 'running') + }, [jobs]) const postworkQueueInfoById = useMemo(() => { const infoById = new Map() @@ -974,7 +948,7 @@ export default function Downloads({ const running: RecordJob[] = [] const queued: RecordJob[] = [] - for (const j of jobsLive) { + for (const j of jobs) { const pw = (j as any)?.postWork if (!pw) continue @@ -1005,7 +979,7 @@ export default function Downloads({ // } return infoById - }, [jobsLive]) + }, [jobs]) const postworkInfoOf = useCallback( (job: RecordJob) => { @@ -1022,7 +996,7 @@ export default function Downloads({ }, [hasActive]) const stoppableIds = useMemo(() => { - return jobsLive + return jobs .filter((j) => { if (isPostworkJob(j)) return false if ((j as any).endedAt) return false @@ -1035,7 +1009,7 @@ export default function Downloads({ return !isStopping }) .map((j) => j.id) - }, [jobsLive, stopRequestedIds]) + }, [jobs, stopRequestedIds]) const columns = useMemo[]>(() => { return [ @@ -1305,7 +1279,7 @@ export default function Downloads({ }, [blurPreviews, markStopRequested, modelsByKey, nowMs, onStopJob, onToggleFavorite, onToggleLike, onToggleWatch, stopRequestedIds, stopInitiatedIds, postworkInfoOf]) const downloadJobRows = useMemo(() => { - const list = jobsLive + const list = jobs .filter((j) => { if (isPostworkJob(j)) return false @@ -1321,10 +1295,10 @@ export default function Downloads({ list.sort((a, b) => addedAtMsOf(b) - addedAtMsOf(a)) return list - }, [jobsLive]) + }, [jobs]) const postworkRows = useMemo(() => { - const list = jobsLive + const list = jobs .filter((j) => { if (!isPostworkJob(j)) return false if (isTerminalStatus((j as any)?.status)) return false @@ -1366,7 +1340,7 @@ export default function Downloads({ }) return list - }, [jobsLive, postworkQueueInfoById]) + }, [jobs, postworkQueueInfoById]) const pendingRows = useMemo(() => { const list = pending.map((p) => ({ kind: 'pending', pending: p }) as const) diff --git a/frontend/src/components/ui/RecorderSettings.tsx b/frontend/src/components/ui/RecorderSettings.tsx index 488aa26..2a81dfd 100644 --- a/frontend/src/components/ui/RecorderSettings.tsx +++ b/frontend/src/components/ui/RecorderSettings.tsx @@ -546,7 +546,7 @@ export default function RecorderSettings({ onAssetsGenerated }: Props) { startingLabel="Starte…" startUrl="/api/tasks/generate-assets" stopUrl="/api/tasks/generate-assets" - sseUrl="/api/tasks/assets/stream" + sseUrl="/api/events/stream" onFinished={onAssetsGenerated} onStart={(ac) => { assetsAbortRef.current = ac diff --git a/frontend/src/lib/useRecordJobsSSE.ts b/frontend/src/lib/useRecordJobsSSE.ts deleted file mode 100644 index 8f6ea68..0000000 --- a/frontend/src/lib/useRecordJobsSSE.ts +++ /dev/null @@ -1,32 +0,0 @@ -'use client' - -import { useEffect, useRef, useState } from 'react' -import type { RecordJob } from '../types' -import { subscribeSSE } from './sseSingleton' - -export function useRecordJobsSSE(initialJobs: RecordJob[]) { - const [jobs, setJobs] = useState(initialJobs) - - // optional: super simple dedupe (hilft, falls Server identische snapshots pusht) - const lastLenRef = useRef(initialJobs.length) - - useEffect(() => { - const unsub = subscribeSSE( - '/api/record/stream', - 'jobs', - (data) => { - if (!Array.isArray(data)) return - // kleine Heuristik gegen “same snapshot” (billig) - if (data.length === lastLenRef.current) { - // trotzdem setzen ist ok; wenn du härter dedupen willst, siehe Stufe 2/3 - } - lastLenRef.current = data.length - setJobs(data) - } - ) - - return () => unsub() - }, []) - - return jobs -}