Skip to content

Commit 597fac4

Browse files
committed
add pubsub
1 parent abcc609 commit 597fac4

File tree

7 files changed

+123
-36
lines changed

7 files changed

+123
-36
lines changed

routers/web/websocket/websocket.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import (
99
"code.gitea.io/gitea/services/websocket"
1010
)
1111

12-
func Init(r *web.Route) {
12+
func Init(r *web.Router) {
1313
m := websocket.Init()
1414

1515
r.Any("/-/ws", func(ctx *context.Context) {

services/pubsub/memory.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package pubsub
55

66
import (
77
"context"
8+
"errors"
89
"sync"
910
)
1011

@@ -21,19 +22,21 @@ func NewMemory() Broker {
2122
}
2223
}
2324

24-
func (p *Memory) Publish(_ context.Context, message Message) {
25+
func (p *Memory) Publish(_ context.Context, _topic string, data []byte) error {
2526
p.Lock()
2627

27-
topic, ok := p.topics[message.Topic]
28+
topic, ok := p.topics[_topic]
2829
if !ok {
2930
p.Unlock()
30-
return
31+
return errors.New("topic not found")
3132
}
3233

3334
for s := range topic {
34-
go (*s)(message)
35+
go (*s)(data)
3536
}
3637
p.Unlock()
38+
39+
return nil
3740
}
3841

3942
func (p *Memory) Subscribe(c context.Context, topic string, subscriber Subscriber) {

services/pubsub/memory_test.go

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,9 @@ import (
1414

1515
func TestPubsub(t *testing.T) {
1616
var (
17-
wg sync.WaitGroup
18-
19-
testMessage = Message{
20-
Data: []byte("test"),
21-
Topic: "hello-world",
22-
}
17+
wg sync.WaitGroup
18+
testTopic = "hello-world"
19+
testMessage = []byte("test")
2320
)
2421

2522
ctx, cancel := context.WithCancelCause(
@@ -28,18 +25,18 @@ func TestPubsub(t *testing.T) {
2825

2926
broker := NewMemory()
3027
go func() {
31-
broker.Subscribe(ctx, "hello-world", func(message Message) { assert.Equal(t, testMessage, message); wg.Done() })
28+
broker.Subscribe(ctx, testTopic, func(message []byte) { assert.Equal(t, testMessage, message); wg.Done() })
3229
}()
3330
go func() {
34-
broker.Subscribe(ctx, "hello-world", func(_ Message) { wg.Done() })
31+
broker.Subscribe(ctx, testTopic, func(_ []byte) { wg.Done() })
3532
}()
3633

3734
// Wait a bit for the subscriptions to be registered
3835
<-time.After(100 * time.Millisecond)
3936

4037
wg.Add(2)
4138
go func() {
42-
broker.Publish(ctx, testMessage)
39+
broker.Publish(ctx, testTopic, testMessage)
4340
}()
4441

4542
wg.Wait()

services/pubsub/notifier.go

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
// Copyright 2024 The Gitea Authors. All rights reserved.
2+
// SPDX-License-Identifier: MIT
3+
4+
package pubsub
5+
6+
import (
7+
"context"
8+
9+
issues_model "code.gitea.io/gitea/models/issues"
10+
user_model "code.gitea.io/gitea/models/user"
11+
"code.gitea.io/gitea/modules/json"
12+
"code.gitea.io/gitea/modules/log"
13+
notify_service "code.gitea.io/gitea/services/notify"
14+
)
15+
16+
func Init() Broker {
17+
broker := NewMemory() // TODO: allow for other pubsub implementations
18+
notify_service.RegisterNotifier(newNotifier(broker))
19+
return broker
20+
}
21+
22+
type pubsubNotifier struct {
23+
notify_service.NullNotifier
24+
broker Broker
25+
}
26+
27+
// NewNotifier create a new pubsubNotifier notifier
28+
func newNotifier(broker Broker) notify_service.Notifier {
29+
return &pubsubNotifier{
30+
broker: broker,
31+
}
32+
}
33+
34+
func (p *pubsubNotifier) DeleteComment(ctx context.Context, doer *user_model.User, c *issues_model.Comment) {
35+
data := struct {
36+
Function string
37+
Comment *issues_model.Comment
38+
Doer *user_model.User
39+
}{
40+
Function: "DeleteComment",
41+
Comment: c,
42+
Doer: doer,
43+
}
44+
45+
msg, err := json.Marshal(data)
46+
if err != nil {
47+
log.Error("Failed to marshal message: %v", err)
48+
return
49+
}
50+
51+
err = p.broker.Publish(ctx, "notify", msg)
52+
if err != nil {
53+
log.Error("Failed to publish message: %v", err)
54+
return
55+
}
56+
}

services/pubsub/types.go

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,19 +5,10 @@ package pubsub
55

66
import "context"
77

8-
// Message defines a published message.
9-
type Message struct {
10-
// Data is the actual data in the entry.
11-
Data []byte `json:"data"`
12-
13-
// Topic is the topic of the message.
14-
Topic string `json:"topic"`
15-
}
16-
178
// Subscriber receives published messages.
18-
type Subscriber func(Message)
9+
type Subscriber func(data []byte)
1910

2011
type Broker interface {
21-
Publish(c context.Context, message Message)
12+
Publish(c context.Context, topic string, data []byte) error
2213
Subscribe(c context.Context, topic string, subscriber Subscriber)
2314
}

services/websocket/notifier.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,21 +7,22 @@ import (
77
"code.gitea.io/gitea/modules/log"
88
"code.gitea.io/gitea/modules/templates"
99
notify_service "code.gitea.io/gitea/services/notify"
10-
"code.gitea.io/gitea/services/pubsub"
1110
"github.com/olahol/melody"
1211
)
1312

13+
var _ notify_service.Notifier = &websocketNotifier{}
14+
1415
type websocketNotifier struct {
1516
notify_service.NullNotifier
16-
m *melody.Melody
17-
rnd *templates.HTMLRender
17+
melody *melody.Melody
18+
htmlRenderer *templates.HTMLRender
1819
}
1920

2021
// NewNotifier create a new webhooksNotifier notifier
21-
func newNotifier(m *melody.Melody, pubsub pubsub.Broker) notify_service.Notifier {
22+
func newNotifier(m *melody.Melody) notify_service.Notifier {
2223
return &websocketNotifier{
23-
m: m,
24-
rnd: templates.HTMLRenderer(),
24+
melody: m,
25+
htmlRenderer: templates.HTMLRenderer(),
2526
}
2627
}
2728

@@ -31,7 +32,7 @@ func newNotifier(m *melody.Melody, pubsub pubsub.Broker) notify_service.Notifier
3132
var htmxRemoveElement = "<div hx-swap-oob=\"delete:%s\"></div>"
3233

3334
func (n *websocketNotifier) filterSessions(fn func(*melody.Session, *sessionData) bool) []*melody.Session {
34-
sessions, err := n.m.Sessions()
35+
sessions, err := n.melody.Sessions()
3536
if err != nil {
3637
log.Error("Failed to get sessions: %v", err)
3738
return nil

services/websocket/websocket.go

Lines changed: 43 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,14 @@
44
package websocket
55

66
import (
7+
"context"
8+
9+
issues_model "code.gitea.io/gitea/models/issues"
10+
user_model "code.gitea.io/gitea/models/user"
11+
"code.gitea.io/gitea/modules/graceful"
712
"code.gitea.io/gitea/modules/json"
8-
"code.gitea.io/gitea/services/context"
9-
notify_service "code.gitea.io/gitea/services/notify"
13+
"code.gitea.io/gitea/modules/log"
14+
gitea_context "code.gitea.io/gitea/services/context"
1015
"code.gitea.io/gitea/services/pubsub"
1116

1217
"github.com/mitchellh/mapstructure"
@@ -31,12 +36,46 @@ func Init() *melody.Melody {
3136
m.HandleDisconnect(handleDisconnect)
3237

3338
broker := pubsub.NewMemory() // TODO: allow for other pubsub implementations
34-
notify_service.RegisterNotifier(newNotifier(m, broker))
39+
notifier := newNotifier(m)
40+
41+
ctx, unsubscribe := context.WithCancel(context.Background())
42+
graceful.GetManager().RunAtShutdown(ctx, func() {
43+
unsubscribe()
44+
})
45+
46+
broker.Subscribe(ctx, "notify", func(msg []byte) {
47+
data := struct {
48+
Function string
49+
}{}
50+
51+
err := json.Unmarshal(msg, &data)
52+
if err != nil {
53+
log.Error("Failed to unmarshal message: %v", err)
54+
return
55+
}
56+
57+
switch data.Function {
58+
case "DeleteComment":
59+
var data struct {
60+
Comment *issues_model.Comment
61+
Doer *user_model.User
62+
}
63+
64+
err := json.Unmarshal(msg, &data)
65+
if err != nil {
66+
log.Error("Failed to unmarshal message: %v", err)
67+
return
68+
}
69+
70+
notifier.DeleteComment(context.Background(), data.Doer, data.Comment)
71+
}
72+
})
73+
3574
return m
3675
}
3776

3877
func handleConnect(s *melody.Session) {
39-
ctx := context.GetWebContext(s.Request)
78+
ctx := gitea_context.GetWebContext(s.Request)
4079

4180
data := &sessionData{}
4281
if ctx.IsSigned {

0 commit comments

Comments
 (0)