Skip to content

Commit 91e775a

Browse files
committed
Improve Feed cancelations and add docstrings
1 parent 02f4c11 commit 91e775a

File tree

4 files changed

+128
-86
lines changed

4 files changed

+128
-86
lines changed

.github/workflows/go.yml

Lines changed: 0 additions & 52 deletions
This file was deleted.

feed.go

Lines changed: 61 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -6,28 +6,35 @@
66
package feed
77

88
import (
9-
"context"
109
"sync"
1110
)
1211

12+
// Feed defines a set of subscriptions per topic T which receive messages sent
13+
// to the Feed.
1314
type Feed[T comparable, M any] struct {
14-
channels map[T][]chan M
15-
mu sync.RWMutex
15+
subscriptions map[T][]*subscription[M]
16+
mu sync.RWMutex
1617

1718
wg sync.WaitGroup
1819
quit chan struct{}
1920
quitOnce sync.Once
2021
}
2122

23+
// NewFeed constructs new Feed with topic type T and message type M.
2224
func NewFeed[T comparable, M any]() *Feed[T, M] {
2325
return &Feed[T, M]{
24-
channels: make(map[T][]chan M, 1),
25-
quit: make(chan struct{}),
26+
subscriptions: make(map[T][]*subscription[M]),
27+
quit: make(chan struct{}),
2628
}
2729
}
2830

31+
// Subscribe returns a channel from which messages M, that are sent to the Feed
32+
// on the same topic, can be read from. Message delivery is guaranteed, so the
33+
// channel should be read to avoid possible high number of goroutines. After
34+
// cancel function call, all resources ang goroutines are released even if not
35+
// all messages are read from channel.
2936
func (f *Feed[T, M]) Subscribe(topic T) (c <-chan M, cancel func()) {
30-
channel := make(chan M)
37+
channel := make(chan M, 1) // buffer 1 not to block on Send method
3138

3239
select {
3340
case <-f.quit:
@@ -39,24 +46,27 @@ func (f *Feed[T, M]) Subscribe(topic T) (c <-chan M, cancel func()) {
3946
f.mu.Lock()
4047
defer f.mu.Unlock()
4148

42-
f.channels[topic] = append(f.channels[topic], channel)
49+
s := newSubscription(channel)
4350

44-
return channel, func() { f.unsubscribe(topic, channel) }
51+
f.subscriptions[topic] = append(f.subscriptions[topic], s)
52+
53+
return channel, func() { f.unsubscribe(topic, s) }
4554
}
4655

47-
func (f *Feed[T, M]) unsubscribe(topic T, c <-chan M) {
56+
func (f *Feed[T, M]) unsubscribe(topic T, s *subscription[M]) {
4857
f.mu.Lock()
4958
defer f.mu.Unlock()
5059

51-
for i, ch := range f.channels[topic] {
52-
if ch == c {
53-
f.channels[topic] = append(f.channels[topic][:i], f.channels[topic][i+1:]...)
54-
close(ch)
60+
for i, sub := range f.subscriptions[topic] {
61+
if sub == s {
62+
f.subscriptions[topic] = append(f.subscriptions[topic][:i], f.subscriptions[topic][i+1:]...)
63+
s.close()
5564
}
5665
}
5766
}
5867

59-
func (f *Feed[T, M]) Shutdown(ctx context.Context) error {
68+
// Close terminates all subscriptions and releases acquired resources.
69+
func (f *Feed[T, M]) Close() error {
6070
f.quitOnce.Do(func() {
6171
close(f.quit)
6272
})
@@ -66,46 +76,48 @@ func (f *Feed[T, M]) Shutdown(ctx context.Context) error {
6676
close(done)
6777
}()
6878

69-
select {
70-
case <-ctx.Done():
71-
return ctx.Err()
72-
case <-done:
73-
}
74-
7579
f.mu.Lock()
7680
defer f.mu.Unlock()
7781

78-
for topic, channels := range f.channels {
79-
for _, c := range channels {
80-
close(c)
82+
for topic, subscriptions := range f.subscriptions {
83+
for _, s := range subscriptions {
84+
s.close()
8185
}
82-
f.channels[topic] = nil
86+
f.subscriptions[topic] = nil
8387
}
8488

8589
return nil
8690
}
8791

92+
// Send sends a message to all sunscribed channels to topic. Messages will be
93+
// delivered to subscribers when each of them is ready to receive it, without
94+
// blocking this method call. The returned integer is the number of subscribers
95+
// that should receive the message.
8896
func (f *Feed[T, M]) Send(topic T, message M) (n int) {
8997
f.mu.RLock()
9098
defer f.mu.RUnlock()
9199

92-
for _, c := range f.channels[topic] {
100+
for _, s := range f.subscriptions[topic] {
93101
// try to send message to the channel
94102
select {
95-
case c <- message:
103+
case s.channel <- message:
104+
case <-s.quit:
105+
return
96106
case <-f.quit:
97107
return
98108
default:
99109
// if channel is blocked,
100110
// wait in goroutine to send the message
101-
c := c
111+
s := s
102112

103113
f.wg.Add(1)
104114
go func() {
105115
defer f.wg.Done()
106116

107117
select {
108-
case c <- message:
118+
case s.channel <- message:
119+
case <-s.quit:
120+
return
109121
case <-f.quit:
110122
return
111123
}
@@ -117,3 +129,24 @@ func (f *Feed[T, M]) Send(topic T, message M) (n int) {
117129

118130
return n
119131
}
132+
133+
type subscription[M any] struct {
134+
channel chan M
135+
quitOnce sync.Once
136+
quit chan struct{}
137+
wg sync.WaitGroup
138+
}
139+
140+
func newSubscription[M any](channel chan M) *subscription[M] {
141+
return &subscription[M]{
142+
channel: channel,
143+
quit: make(chan struct{}),
144+
}
145+
}
146+
147+
func (s *subscription[M]) close() {
148+
s.quitOnce.Do(func() {
149+
close(s.quit)
150+
})
151+
s.wg.Wait()
152+
}

feed_test.go

Lines changed: 56 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
package feed_test
77

88
import (
9-
"context"
109
"reflect"
1110
"sync"
1211
"testing"
@@ -16,7 +15,7 @@ import (
1615

1716
func TestFeed_singleMessage(t *testing.T) {
1817
f := feed.NewFeed[string, int]()
19-
defer f.Shutdown(context.Background())
18+
defer f.Close()
2019

2120
got := make([]int, 0)
2221

@@ -43,7 +42,7 @@ func TestFeed_singleMessage(t *testing.T) {
4342

4443
func TestFeed_twoMessages(t *testing.T) {
4544
f := feed.NewFeed[string, int]()
46-
defer f.Shutdown(context.Background())
45+
defer f.Close()
4746

4847
got := make([]int, 0)
4948

@@ -77,7 +76,7 @@ func TestFeed_twoMessages(t *testing.T) {
7776

7877
func TestFeed_multipleSubscriptions(t *testing.T) {
7978
f := feed.NewFeed[string, int]()
80-
defer f.Shutdown(context.Background())
79+
defer f.Close()
8180

8281
got1 := make([]int, 0)
8382
c1 := newCond()
@@ -147,7 +146,7 @@ func TestFeed_multipleSubscriptions(t *testing.T) {
147146

148147
func TestFeed_multipleTopics(t *testing.T) {
149148
f := feed.NewFeed[string, int]()
150-
defer f.Shutdown(context.Background())
149+
defer f.Close()
151150

152151
got1 := make([]int, 0)
153152
c1 := newCond()
@@ -196,7 +195,7 @@ func TestFeed_multipleTopics(t *testing.T) {
196195

197196
func TestFeed_shutdown(t *testing.T) {
198197
f := feed.NewFeed[string, int]()
199-
err := f.Shutdown(context.Background())
198+
err := f.Close()
200199
assert(t, "", err, nil)
201200

202201
c, cancel := f.Subscribe("topic")
@@ -213,6 +212,57 @@ func TestFeed_shutdown(t *testing.T) {
213212
assert(t, "", n, 0)
214213
}
215214

215+
func TestFeed_shutdownWithUnreadMessages(t *testing.T) {
216+
f := feed.NewFeed[string, int]()
217+
defer f.Close()
218+
219+
c, _ := f.Subscribe("topic")
220+
221+
n := f.Send("topic", 25)
222+
assert(t, "", n, 1)
223+
224+
n = f.Send("topic", 42)
225+
assert(t, "", n, 1)
226+
227+
n = f.Send("topic", 100)
228+
assert(t, "", n, 1)
229+
230+
n = f.Send("topic", 200)
231+
assert(t, "", n, 1)
232+
233+
m, ok := <-c
234+
assert(t, "", m, 25)
235+
assert(t, "", ok, true)
236+
237+
err := f.Close()
238+
assert(t, "", err, nil)
239+
}
240+
241+
func TestFeed_cancelWithUnreadMessages(t *testing.T) {
242+
f := feed.NewFeed[string, int]()
243+
defer f.Close()
244+
245+
c, cancel := f.Subscribe("topic")
246+
247+
n := f.Send("topic", 25)
248+
assert(t, "", n, 1)
249+
250+
n = f.Send("topic", 42)
251+
assert(t, "", n, 1)
252+
253+
n = f.Send("topic", 100)
254+
assert(t, "", n, 1)
255+
256+
n = f.Send("topic", 200)
257+
assert(t, "", n, 1)
258+
259+
m, ok := <-c
260+
assert(t, "", m, 25)
261+
assert(t, "", ok, true)
262+
263+
cancel()
264+
}
265+
216266
func assert[T any](t testing.TB, message string, got, want T) {
217267
t.Helper()
218268

trigger.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,17 +9,24 @@ import (
99
"sync"
1010
)
1111

12+
// Trigger notifies over topic based subscription channels if one or more
13+
// trigger events occurred. Trigger is useful in situations where only
14+
// information that something happened is useful, not information about all
15+
// occurred events.
1216
type Trigger[T comparable] struct {
1317
channels map[T][]chan struct{}
1418
mu sync.RWMutex
1519
}
1620

21+
// NewTrigger constructs a new Trigger instance.
1722
func NewTrigger[T comparable]() *Trigger[T] {
1823
return &Trigger[T]{
1924
channels: make(map[T][]chan struct{}),
2025
}
2126
}
2227

28+
// Subscribe returns a channel of empty structs which will return if at least
29+
// one Trigger call has been done on the same topic.
2330
func (t *Trigger[T]) Subscribe(topic T) (c <-chan struct{}, cancel func()) {
2431
channel := make(chan struct{}, 1)
2532

@@ -43,6 +50,10 @@ func (t *Trigger[T]) unsubscribe(topic T, c <-chan struct{}) {
4350
}
4451
}
4552

53+
// Trigger notifies all subscritions on the provided topic. Notifications will
54+
// be delivered to subscribers when each of them is ready to receive it, without
55+
// blocking this method call. Returned integer is a number of subscriptions that
56+
// will be notified.
4657
func (t *Trigger[T]) Trigger(topic T) (n int) {
4758
t.mu.RLock()
4859
defer t.mu.RUnlock()

0 commit comments

Comments
 (0)