Skip to content

Commit 7efef85

Browse files
committed
fix memory pubsub
1 parent 5ceae12 commit 7efef85

File tree

2 files changed

+15
-22
lines changed

2 files changed

+15
-22
lines changed

services/pubsub/memory.go

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -49,14 +49,17 @@ func (p *Memory) Subscribe(c context.Context, topic string, subscriber Subscribe
4949
p.topics[topic][&subscriber] = struct{}{}
5050
p.Unlock()
5151

52-
// Wait for context to be done
53-
<-c.Done()
52+
// Unsubscribe when context is done
53+
go func() {
54+
// Wait for context to be done
55+
<-c.Done()
5456

55-
// Unsubscribe
56-
p.Lock()
57-
delete(p.topics[topic], &subscriber)
58-
if len(p.topics[topic]) == 0 {
59-
delete(p.topics, topic)
60-
}
61-
p.Unlock()
57+
// Unsubscribe
58+
p.Lock()
59+
delete(p.topics[topic], &subscriber)
60+
if len(p.topics[topic]) == 0 {
61+
delete(p.topics, topic)
62+
}
63+
p.Unlock()
64+
}()
6265
}

services/pubsub/memory_test.go

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77
"context"
88
"sync"
99
"testing"
10-
"time"
1110

1211
"github.com/stretchr/testify/assert"
1312
)
@@ -24,20 +23,11 @@ func TestPubsub(t *testing.T) {
2423
)
2524

2625
broker := NewMemory()
27-
go func() {
28-
broker.Subscribe(ctx, testTopic, func(message []byte) { assert.Equal(t, testMessage, message); wg.Done() })
29-
}()
30-
go func() {
31-
broker.Subscribe(ctx, testTopic, func(_ []byte) { wg.Done() })
32-
}()
33-
34-
// Wait a bit for the subscriptions to be registered
35-
<-time.After(100 * time.Millisecond)
26+
broker.Subscribe(ctx, testTopic, func(message []byte) { assert.Equal(t, testMessage, message); wg.Done() })
27+
broker.Subscribe(ctx, testTopic, func(_ []byte) { wg.Done() })
3628

3729
wg.Add(2)
38-
go func() {
39-
broker.Publish(ctx, testTopic, testMessage)
40-
}()
30+
broker.Publish(ctx, testTopic, testMessage)
4131

4232
wg.Wait()
4333
cancel(nil)

0 commit comments

Comments
 (0)