Skip to content

Commit ade2dbc

Browse files
committed
Improve Feed with preserved ordering
1 parent b03041a commit ade2dbc

File tree

2 files changed

+148
-44
lines changed

2 files changed

+148
-44
lines changed

feed.go

Lines changed: 74 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import (
1212
// Feed defines a set of subscriptions per topic T which receive messages sent
1313
// to the Feed.
1414
type Feed[T comparable, M any] struct {
15-
subscriptions map[T][]*subscription[M]
15+
subscriptions map[T][]*subscription[T, M]
1616
mu sync.RWMutex
1717

1818
wg sync.WaitGroup
@@ -23,16 +23,16 @@ type Feed[T comparable, M any] struct {
2323
// NewFeed constructs new Feed with topic type T and message type M.
2424
func NewFeed[T comparable, M any]() *Feed[T, M] {
2525
return &Feed[T, M]{
26-
subscriptions: make(map[T][]*subscription[M]),
26+
subscriptions: make(map[T][]*subscription[T, M]),
2727
quit: make(chan struct{}),
2828
}
2929
}
3030

3131
// Subscribe returns a channel from which messages M, that are sent to the Feed
32-
// on the same topic, can be read from. Message delivery is guaranteed, so the
33-
// channel should be read to avoid possible high number of goroutines. After
34-
// cancel function call, all resources ang goroutines are released even if not
35-
// all messages are read from channel.
32+
// on the same topic, can be read from. Message delivery preserves ordering and
33+
// is guaranteed, so the channel should be read to avoid keeping unread messages
34+
// in memory. After cancel function call, all resources ang goroutines are
35+
// released even if not all messages are read from channel.
3636
func (f *Feed[T, M]) Subscribe(topic T) (c <-chan M, cancel func()) {
3737
channel := make(chan M, 1) // buffer 1 not to block on Send method
3838

@@ -46,14 +46,14 @@ func (f *Feed[T, M]) Subscribe(topic T) (c <-chan M, cancel func()) {
4646
f.mu.Lock()
4747
defer f.mu.Unlock()
4848

49-
s := newSubscription(channel)
49+
s := newSubscription(f, channel)
5050

5151
f.subscriptions[topic] = append(f.subscriptions[topic], s)
5252

5353
return channel, func() { f.unsubscribe(topic, s) }
5454
}
5555

56-
func (f *Feed[T, M]) unsubscribe(topic T, s *subscription[M]) {
56+
func (f *Feed[T, M]) unsubscribe(topic T, s *subscription[T, M]) {
5757
f.mu.Lock()
5858
defer f.mu.Unlock()
5959

@@ -94,55 +94,85 @@ func (f *Feed[T, M]) Send(topic T, message M) (n int) {
9494
defer f.mu.RUnlock()
9595

9696
for _, s := range f.subscriptions[topic] {
97-
// try to send message to the channel
98-
select {
99-
case s.channel <- message:
100-
case <-s.quit:
101-
return
102-
case <-f.quit:
103-
return
104-
default:
105-
// if channel is blocked,
106-
// wait in goroutine to send the message
107-
s := s
108-
109-
f.wg.Add(1)
110-
go func() {
111-
defer f.wg.Done()
112-
113-
select {
114-
case s.channel <- message:
115-
case <-s.quit:
116-
return
117-
case <-f.quit:
118-
return
119-
}
120-
}()
121-
}
97+
s.send(message)
12298

12399
n++
124100
}
125101

126102
return n
127103
}
128104

129-
type subscription[M any] struct {
130-
channel chan M
131-
quitOnce sync.Once
132-
quit chan struct{}
133-
wg sync.WaitGroup
105+
type subscription[T comparable, M any] struct {
106+
feed *Feed[T, M]
107+
108+
channel chan M
109+
110+
buf []M
111+
mu sync.Mutex
112+
113+
quit chan struct{}
114+
wg sync.WaitGroup
134115
}
135116

136-
func newSubscription[M any](channel chan M) *subscription[M] {
137-
return &subscription[M]{
117+
func newSubscription[T comparable, M any](feed *Feed[T, M], channel chan M) *subscription[T, M] {
118+
return &subscription[T, M]{
119+
feed: feed,
138120
channel: channel,
121+
buf: make([]M, 0),
139122
quit: make(chan struct{}),
140123
}
141124
}
142125

143-
func (s *subscription[M]) close() {
144-
s.quitOnce.Do(func() {
145-
close(s.quit)
146-
})
126+
func (s *subscription[T, M]) send(message M) {
127+
s.mu.Lock()
128+
defer s.mu.Unlock()
129+
130+
if len(s.buf) > 0 {
131+
s.buf = append(s.buf, message)
132+
return
133+
}
134+
135+
select {
136+
case s.channel <- message:
137+
case <-s.quit:
138+
return
139+
case <-s.feed.quit:
140+
return
141+
default:
142+
s.buf = append(s.buf, message)
143+
144+
s.wg.Add(1)
145+
go func() {
146+
defer s.wg.Done()
147+
148+
for {
149+
s.mu.Lock()
150+
message := s.buf[0]
151+
s.mu.Unlock()
152+
153+
select {
154+
case s.channel <- message:
155+
case <-s.quit:
156+
return
157+
case <-s.feed.quit:
158+
return
159+
}
160+
161+
s.mu.Lock()
162+
s.buf = s.buf[1:]
163+
if len(s.buf) == 0 {
164+
s.mu.Unlock()
165+
return
166+
}
167+
s.mu.Unlock()
168+
}
169+
170+
}()
171+
}
172+
}
173+
174+
func (s *subscription[T, M]) close() {
175+
close(s.quit)
147176
s.wg.Wait()
177+
close(s.channel)
148178
}

feed_test.go

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,11 @@
66
package feed_test
77

88
import (
9+
"math/rand"
910
"reflect"
1011
"sync"
1112
"testing"
13+
"time"
1214

1315
"resenje.org/feed"
1416
)
@@ -263,6 +265,78 @@ func TestFeed_cancelWithUnreadMessages(t *testing.T) {
263265
cancel()
264266
}
265267

268+
func TestFeed_ordering(t *testing.T) {
269+
f := feed.NewFeed[string, int]()
270+
defer f.Close()
271+
272+
got := make([]int, 0)
273+
274+
s, cancel := f.Subscribe("topic1")
275+
defer cancel()
276+
277+
messages := make([]int, 0)
278+
279+
count := 1000
280+
281+
for i := 0; i < count; i++ {
282+
messages = append(messages, i)
283+
n := f.Send("topic1", i)
284+
assert(t, "", n, 1)
285+
}
286+
287+
for i := 0; i < count; i++ {
288+
got = append(got, <-s)
289+
}
290+
291+
assert(t, "", got, messages)
292+
}
293+
294+
func TestFeed_stressTest(t *testing.T) {
295+
f := feed.NewFeed[string, int]()
296+
defer f.Close()
297+
298+
count := 1000
299+
lastReceived := -1
300+
301+
s, cancel := f.Subscribe("topic1")
302+
defer cancel()
303+
304+
random1 := rand.New(rand.NewSource(time.Now().UnixNano()))
305+
random2 := rand.New(rand.NewSource(time.Now().UnixNano()))
306+
307+
receiveDone := make(chan struct{})
308+
go func() {
309+
defer close(receiveDone)
310+
for {
311+
m, ok := <-s
312+
if !ok {
313+
return
314+
}
315+
if m != lastReceived+1 {
316+
assert(t, "", m, lastReceived+1)
317+
}
318+
lastReceived = m
319+
time.Sleep(time.Duration(random1.Int63n(1000)) * time.Microsecond)
320+
}
321+
}()
322+
323+
sendDone := make(chan struct{})
324+
go func() {
325+
defer close(sendDone)
326+
for i := 0; i < count; i++ {
327+
n := f.Send("topic1", i)
328+
assert(t, "", n, 1)
329+
time.Sleep(time.Duration(random2.Int63n(1000)) * time.Microsecond)
330+
}
331+
}()
332+
333+
<-sendDone
334+
335+
cancel()
336+
337+
<-receiveDone
338+
}
339+
266340
func assert[T any](t testing.TB, message string, got, want T) {
267341
t.Helper()
268342

0 commit comments

Comments
 (0)