Skip to content

Commit aaaffab

Browse files
gregdelPouuleT
authored andcommitted
Add SSE events on the http server and polochonfs
1 parent 85a4dfb commit aaaffab

File tree

15 files changed

+483
-2
lines changed

15 files changed

+483
-2
lines changed

app/app/app.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,9 @@ func (a *App) init() error {
116116
}
117117

118118
// Add the http server
119-
a.subApps = append(a.subApps, server.New(config, library, authManager))
119+
srv := server.New(config, library, authManager)
120+
config.Notifiers = append(config.Notifiers, srv.Hub())
121+
a.subApps = append(a.subApps, srv)
120122
}
121123

122124
log.Debug("app configuration loaded")

app/server/http.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ type Server struct {
2929
library *library.Library
3030
authManager *auth.Manager
3131
gracefulServer *http.Server
32+
hub *sseHub
3233
log *logrus.Entry
3334
render *render.Render
3435
}
@@ -40,6 +41,7 @@ func New(config *configuration.Config, vs *library.Library, auth *auth.Manager)
4041
config: config,
4142
library: vs,
4243
authManager: auth,
44+
hub: newSSEHub(),
4345
render: render.New(),
4446
}
4547
}
@@ -66,6 +68,11 @@ func (s *Server) Stop(log *logrus.Entry) {
6668
}
6769
}
6870

71+
// Hub returns the SSE hub as a Notifier
72+
func (s *Server) Hub() polochon.Notifier {
73+
return s.hub
74+
}
75+
6976
func (s *Server) wishlist(w http.ResponseWriter, r *http.Request) {
7077
log := s.logEntry(r)
7178
log.Infof("getting wishlist")

app/server/library.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,9 @@ func (s *Server) libraryRefresh(w http.ResponseWriter, req *http.Request) {
1111
if err := s.library.RebuildIndex(log); err != nil {
1212
log.WithField("function", "rebuild_index").Error(err)
1313
s.renderError(w, req, err)
14+
return
1415
}
1516

17+
s.hub.broadcast()
1618
s.renderOK(w, nil)
1719
}

app/server/movies.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,5 +102,6 @@ func (s *Server) deleteMovie(w http.ResponseWriter, req *http.Request) {
102102
return
103103
}
104104

105+
s.hub.broadcast()
105106
s.renderOK(w, nil)
106107
}

app/server/routes.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,12 @@ func (s *Server) httpServer(log *logrus.Entry) *http.Server {
192192
handler: s.serveEpisodeSubtitle,
193193
excluded: !s.config.HTTPServer.ServeFiles,
194194
},
195+
{
196+
name: "Events",
197+
path: "/events",
198+
methods: "GET",
199+
handler: s.events,
200+
},
195201
{
196202
name: "Wishlist",
197203
path: "/wishlist",

app/server/seasons.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,5 +98,6 @@ func (s *Server) deleteSeason(w http.ResponseWriter, req *http.Request) {
9898
return
9999
}
100100

101+
s.hub.broadcast()
101102
s.renderOK(w, nil)
102103
}

app/server/shows.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ func (s *Server) deleteShow(w http.ResponseWriter, req *http.Request) {
105105
return
106106
}
107107

108+
s.hub.broadcast()
108109
s.renderOK(w, nil)
109110
}
110111

@@ -152,6 +153,7 @@ func (s *Server) deleteEpisode(w http.ResponseWriter, req *http.Request) {
152153
return
153154
}
154155

156+
s.hub.broadcast()
155157
s.renderOK(w, nil)
156158
}
157159

app/server/sse.go

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
package server
2+
3+
import (
4+
"io"
5+
"net/http"
6+
"sync"
7+
8+
polochon "github.com/odwrtw/polochon/lib"
9+
"github.com/sirupsen/logrus"
10+
)
11+
12+
const sseModuleName = "sse"
13+
14+
// Compile-time assertion.
15+
var _ polochon.Notifier = (*sseHub)(nil)
16+
17+
type sseHub struct {
18+
mu sync.Mutex
19+
clients map[chan struct{}]struct{}
20+
}
21+
22+
func newSSEHub() *sseHub {
23+
return &sseHub{
24+
clients: make(map[chan struct{}]struct{}),
25+
}
26+
}
27+
28+
// Module interface.
29+
func (h *sseHub) Init(_ []byte) error { return nil }
30+
func (h *sseHub) Name() string { return sseModuleName }
31+
func (h *sseHub) Status() (polochon.ModuleStatus, error) { return polochon.StatusOK, nil }
32+
33+
// Notifier interface.
34+
func (h *sseHub) Notify(_ any, _ *logrus.Entry) error {
35+
h.broadcast()
36+
return nil
37+
}
38+
39+
func (h *sseHub) subscribe() chan struct{} {
40+
ch := make(chan struct{}, 1)
41+
h.mu.Lock()
42+
h.clients[ch] = struct{}{}
43+
h.mu.Unlock()
44+
return ch
45+
}
46+
47+
func (h *sseHub) unsubscribe(ch chan struct{}) {
48+
h.mu.Lock()
49+
delete(h.clients, ch)
50+
h.mu.Unlock()
51+
close(ch)
52+
}
53+
54+
func (h *sseHub) broadcast() {
55+
h.mu.Lock()
56+
defer h.mu.Unlock()
57+
for ch := range h.clients {
58+
// Non-blocking send to coalesce duplicate events.
59+
select {
60+
case ch <- struct{}{}:
61+
default:
62+
}
63+
}
64+
}
65+
66+
func (s *Server) events(w http.ResponseWriter, r *http.Request) {
67+
flusher, ok := w.(http.Flusher)
68+
if !ok {
69+
http.Error(w, "streaming not supported", http.StatusInternalServerError)
70+
return
71+
}
72+
73+
ch := s.hub.subscribe()
74+
defer s.hub.unsubscribe(ch)
75+
76+
w.Header().Set("Content-Type", "text/event-stream")
77+
w.Header().Set("Cache-Control", "no-cache")
78+
w.Header().Set("Connection", "keep-alive")
79+
flusher.Flush()
80+
81+
for {
82+
select {
83+
case <-ch:
84+
_, _ = io.WriteString(w, "data:\n\n")
85+
flusher.Flush()
86+
case <-r.Context().Done():
87+
return
88+
}
89+
}
90+
}

app/server/sse_test.go

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
package server
2+
3+
import (
4+
"bufio"
5+
"net/http"
6+
"net/http/httptest"
7+
"testing"
8+
"time"
9+
)
10+
11+
func TestSSEHubSubscribeBroadcast(t *testing.T) {
12+
hub := newSSEHub()
13+
14+
ch1 := hub.subscribe()
15+
ch2 := hub.subscribe()
16+
17+
hub.broadcast()
18+
19+
select {
20+
case <-ch1:
21+
case <-time.After(time.Second):
22+
t.Fatal("ch1 did not receive broadcast")
23+
}
24+
select {
25+
case <-ch2:
26+
case <-time.After(time.Second):
27+
t.Fatal("ch2 did not receive broadcast")
28+
}
29+
30+
hub.unsubscribe(ch1)
31+
hub.broadcast()
32+
33+
// ch1 should be closed
34+
select {
35+
case _, ok := <-ch1:
36+
if ok {
37+
t.Fatal("ch1 should be closed")
38+
}
39+
default:
40+
t.Fatal("ch1 should be closed and readable")
41+
}
42+
43+
// ch2 should still receive
44+
select {
45+
case <-ch2:
46+
case <-time.After(time.Second):
47+
t.Fatal("ch2 did not receive broadcast after ch1 unsubscribed")
48+
}
49+
50+
hub.unsubscribe(ch2)
51+
}
52+
53+
func TestSSEHubCoalescing(t *testing.T) {
54+
hub := newSSEHub()
55+
ch := hub.subscribe()
56+
defer hub.unsubscribe(ch)
57+
58+
// Multiple broadcasts before consuming should coalesce into one.
59+
hub.broadcast()
60+
hub.broadcast()
61+
hub.broadcast()
62+
63+
select {
64+
case <-ch:
65+
case <-time.After(time.Second):
66+
t.Fatal("did not receive coalesced broadcast")
67+
}
68+
69+
// Channel should be empty now.
70+
select {
71+
case <-ch:
72+
t.Fatal("should not receive a second event (coalescing)")
73+
default:
74+
}
75+
}
76+
77+
func TestSSEHandler(t *testing.T) {
78+
hub := newSSEHub()
79+
s := &Server{hub: hub}
80+
81+
ts := httptest.NewServer(http.HandlerFunc(s.events))
82+
defer ts.Close()
83+
84+
resp, err := http.Get(ts.URL)
85+
if err != nil {
86+
t.Fatalf("failed to connect: %v", err)
87+
}
88+
defer func() { _ = resp.Body.Close() }()
89+
90+
if ct := resp.Header.Get("Content-Type"); ct != "text/event-stream" {
91+
t.Fatalf("expected Content-Type text/event-stream, got %q", ct)
92+
}
93+
94+
hub.broadcast()
95+
96+
scanner := bufio.NewScanner(resp.Body)
97+
for scanner.Scan() {
98+
if scanner.Text() == "data:" {
99+
return
100+
}
101+
}
102+
t.Fatal("did not receive SSE data line")
103+
}

app/server/subtitles.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ func (s *Server) updateSubtitle(v polochon.Video, w http.ResponseWriter, r *http
9898
return
9999
}
100100

101+
s.hub.broadcast()
101102
s.renderOK(w, sub)
102103
}
103104

@@ -131,6 +132,7 @@ func (s *Server) uploadSubtitle(v polochon.Video, w http.ResponseWriter, r *http
131132
return
132133
}
133134

135+
s.hub.broadcast()
134136
s.renderOK(w, sub)
135137
}
136138

0 commit comments

Comments
 (0)