Skip to content

Commit 372faaa

Browse files
committed
add melody pubsub
1 parent bb4443d commit 372faaa

File tree

6 files changed

+134
-4
lines changed

6 files changed

+134
-4
lines changed

.air.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,9 @@ tmp_dir = ".air"
55
cmd = "make --no-print-directory backend"
66
bin = "gitea"
77
delay = 1000
8-
include_ext = ["go", "tmpl"]
8+
include_ext = ["go", "tmpl", "css", "js"]
99
include_file = ["main.go"]
10-
include_dir = ["cmd", "models", "modules", "options", "routers", "services"]
10+
include_dir = ["cmd", "models", "modules", "options", "public", "routers", "services", "templates"]
1111
exclude_dir = ["modules/git/tests", "services/gitdiff/testdata", "modules/avatar/testdata", "models/fixtures", "models/migrations/fixtures", "modules/migration/file_format_testdata", "modules/avatar/identicon/testdata"]
1212
exclude_regex = ["_test.go$", "_gen.go$"]
1313
stop_on_error = true

services/pubsub/memory.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
// Copyright 2024 The Gitea Authors. All rights reserved.
2+
// SPDX-License-Identifier: MIT
3+
4+
package pubsub
5+
6+
import (
7+
"context"
8+
"sync"
9+
)
10+
11+
type Memory struct {
12+
sync.Mutex
13+
14+
topics map[string]map[*Subscriber]struct{}
15+
}
16+
17+
// New creates an in-memory publisher.
18+
func NewMemory() Broker {
19+
return &Memory{
20+
topics: make(map[string]map[*Subscriber]struct{}),
21+
}
22+
}
23+
24+
func (p *Memory) Publish(_ context.Context, message Message) {
25+
p.Lock()
26+
27+
topic, ok := p.topics[message.Topic]
28+
if !ok {
29+
p.Unlock()
30+
return
31+
}
32+
33+
for s := range topic {
34+
go (*s)(message)
35+
}
36+
p.Unlock()
37+
}
38+
39+
func (p *Memory) Subscribe(c context.Context, topic string, subscriber Subscriber) {
40+
// Subscribe
41+
p.Lock()
42+
_, ok := p.topics[topic]
43+
if !ok {
44+
p.topics[topic] = make(map[*Subscriber]struct{})
45+
}
46+
p.topics[topic][&subscriber] = struct{}{}
47+
p.Unlock()
48+
49+
// Wait for context to be done
50+
<-c.Done()
51+
52+
// Unsubscribe
53+
p.Lock()
54+
delete(p.topics[topic], &subscriber)
55+
if len(p.topics[topic]) == 0 {
56+
delete(p.topics, topic)
57+
}
58+
p.Unlock()
59+
}

services/pubsub/memory_test.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
// Copyright 2024 The Gitea Authors. All rights reserved.
2+
// SPDX-License-Identifier: MIT
3+
4+
package pubsub
5+
6+
import (
7+
"context"
8+
"sync"
9+
"testing"
10+
"time"
11+
12+
"github.com/stretchr/testify/assert"
13+
)
14+
15+
func TestPubsub(t *testing.T) {
16+
var (
17+
wg sync.WaitGroup
18+
19+
testMessage = Message{
20+
Data: []byte("test"),
21+
Topic: "hello-world",
22+
}
23+
)
24+
25+
ctx, cancel := context.WithCancelCause(
26+
context.Background(),
27+
)
28+
29+
broker := NewMemory()
30+
go func() {
31+
broker.Subscribe(ctx, "hello-world", func(message Message) { assert.Equal(t, testMessage, message); wg.Done() })
32+
}()
33+
go func() {
34+
broker.Subscribe(ctx, "hello-world", func(_ Message) { wg.Done() })
35+
}()
36+
37+
// Wait a bit for the subscriptions to be registered
38+
<-time.After(100 * time.Millisecond)
39+
40+
wg.Add(2)
41+
go func() {
42+
broker.Publish(ctx, testMessage)
43+
}()
44+
45+
wg.Wait()
46+
cancel(nil)
47+
}

services/pubsub/types.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package pubsub
2+
3+
import "context"
4+
5+
// Message defines a published message.
6+
type Message struct {
7+
// Data is the actual data in the entry.
8+
Data []byte `json:"data"`
9+
10+
// Topic is the topic of the message.
11+
Topic string `json:"topic"`
12+
}
13+
14+
// Subscriber receives published messages.
15+
type Subscriber func(Message)
16+
17+
type Broker interface {
18+
Publish(c context.Context, message Message)
19+
Subscribe(c context.Context, topic string, subscriber Subscriber)
20+
}

services/websocket/notifier.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"code.gitea.io/gitea/modules/log"
88
"code.gitea.io/gitea/modules/templates"
99
notify_service "code.gitea.io/gitea/services/notify"
10+
"code.gitea.io/gitea/services/pubsub"
1011
"github.com/olahol/melody"
1112
)
1213

@@ -17,7 +18,7 @@ type websocketNotifier struct {
1718
}
1819

1920
// NewNotifier create a new webhooksNotifier notifier
20-
func newNotifier(m *melody.Melody) notify_service.Notifier {
21+
func newNotifier(m *melody.Melody, pubsub pubsub.Broker) notify_service.Notifier {
2122
return &websocketNotifier{
2223
m: m,
2324
rnd: templates.HTMLRenderer(),

services/websocket/websocket.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"code.gitea.io/gitea/modules/json"
88
"code.gitea.io/gitea/services/context"
99
notify_service "code.gitea.io/gitea/services/notify"
10+
"code.gitea.io/gitea/services/pubsub"
1011

1112
"github.com/mitchellh/mapstructure"
1213
"github.com/olahol/melody"
@@ -28,7 +29,9 @@ func Init() *melody.Melody {
2829
m.HandleConnect(handleConnect)
2930
m.HandleMessage(handleMessage)
3031
m.HandleDisconnect(handleDisconnect)
31-
notify_service.RegisterNotifier(newNotifier(m))
32+
33+
broker := pubsub.NewMemory() // TODO: allow for other pubsub implementations
34+
notify_service.RegisterNotifier(newNotifier(m, broker))
3235
return m
3336
}
3437

0 commit comments

Comments
 (0)