Skip to content

Commit bb95cc5

Browse files
committed
Add Update subscription type
1 parent 436ca61 commit bb95cc5

File tree

2 files changed

+522
-0
lines changed

2 files changed

+522
-0
lines changed

update.go

Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
// Copyright (c) 2022, Janoš Guljaš <janos@resenje.org>
2+
// All rights reserved.
3+
// Use of this source code is governed by a BSD-style
4+
// license that can be found in the LICENSE file.
5+
6+
package feed
7+
8+
import (
9+
"sync"
10+
)
11+
12+
// Update defines a set of subscriptions per topic T which receive messages sent
13+
// to the Update.
14+
type Update[T comparable, M any] struct {
15+
subscriptions map[T][]*updateSubscription[T, M]
16+
mu sync.RWMutex
17+
18+
wg sync.WaitGroup
19+
quit chan struct{}
20+
quitOnce sync.Once
21+
}
22+
23+
// NewFeed constructs new Feed with topic type T and message type M.
24+
func NewUpdate[T comparable, M any]() *Update[T, M] {
25+
return &Update[T, M]{
26+
subscriptions: make(map[T][]*updateSubscription[T, M]),
27+
quit: make(chan struct{}),
28+
}
29+
}
30+
31+
// Subscribe returns a channel from which messages M, that are sent to the Feed
32+
// on the same topic, can be read from. Message delivery preserves ordering and
33+
// is guaranteed, so the channel should be read to avoid keeping unread messages
34+
// in memory. After cancel function call, all resources ang goroutines are
35+
// released even if not all messages are read from channel.
36+
func (u *Update[T, M]) Subscribe(topic T) (c <-chan M, cancel func()) {
37+
channel := make(chan M)
38+
39+
select {
40+
case <-u.quit:
41+
close(channel)
42+
return channel, func() {}
43+
default:
44+
}
45+
46+
u.mu.Lock()
47+
defer u.mu.Unlock()
48+
49+
s := newUpdateSubscription(u, channel)
50+
51+
u.subscriptions[topic] = append(u.subscriptions[topic], s)
52+
53+
return channel, func() { u.unsubscribe(topic, s) }
54+
}
55+
56+
func (u *Update[T, M]) unsubscribe(topic T, s *updateSubscription[T, M]) {
57+
u.mu.Lock()
58+
defer u.mu.Unlock()
59+
60+
for i, sub := range u.subscriptions[topic] {
61+
if sub == s {
62+
u.subscriptions[topic] = append(u.subscriptions[topic][:i], u.subscriptions[topic][i+1:]...)
63+
s.close()
64+
}
65+
}
66+
}
67+
68+
// Close terminates all subscriptions and releases acquired resources.
69+
func (u *Update[T, M]) Close() error {
70+
u.quitOnce.Do(func() {
71+
close(u.quit)
72+
})
73+
u.wg.Wait()
74+
75+
u.mu.Lock()
76+
defer u.mu.Unlock()
77+
78+
for topic, subscriptions := range u.subscriptions {
79+
for _, s := range subscriptions {
80+
s.close()
81+
}
82+
u.subscriptions[topic] = nil
83+
}
84+
85+
return nil
86+
}
87+
88+
// Send sends a message to all sunscribed channels to topic. Messages will be
89+
// delivered to subscribers when each of them is ready to receive it, without
90+
// blocking this method call. The returned integer is the number of subscribers
91+
// that should receive the message.
92+
func (u *Update[T, M]) Send(topic T, message M) (n int) {
93+
u.mu.RLock()
94+
defer u.mu.RUnlock()
95+
96+
for _, s := range u.subscriptions[topic] {
97+
s.send(message)
98+
99+
n++
100+
}
101+
102+
return n
103+
}
104+
105+
type updateSubscription[T comparable, M any] struct {
106+
feed *Update[T, M]
107+
108+
channel chan M
109+
update chan M
110+
updated chan struct{}
111+
112+
quit chan struct{}
113+
wg sync.WaitGroup
114+
}
115+
116+
func newUpdateSubscription[T comparable, M any](u *Update[T, M], channel chan M) *updateSubscription[T, M] {
117+
return &updateSubscription[T, M]{
118+
feed: u,
119+
channel: channel,
120+
update: make(chan M),
121+
updated: make(chan struct{}),
122+
quit: make(chan struct{}),
123+
}
124+
}
125+
126+
func (s *updateSubscription[T, M]) send(message M) {
127+
select {
128+
case s.channel <- message:
129+
130+
case s.update <- message:
131+
select {
132+
case <-s.updated:
133+
case <-s.quit:
134+
case <-s.feed.quit:
135+
}
136+
return
137+
138+
case <-s.quit:
139+
return
140+
141+
case <-s.feed.quit:
142+
return
143+
144+
default:
145+
146+
ready := make(chan struct{})
147+
done := make(chan struct{})
148+
149+
channel := s.channel
150+
151+
s.wg.Add(1)
152+
go func() {
153+
defer s.wg.Done()
154+
defer close(done)
155+
156+
for {
157+
select {
158+
case channel <- message:
159+
return
160+
161+
case message = <-s.update:
162+
channel = nil
163+
164+
case ready <- struct{}{}:
165+
166+
case s.updated <- struct{}{}:
167+
channel = s.channel
168+
169+
case <-s.quit:
170+
return
171+
172+
case <-s.feed.quit:
173+
return
174+
175+
}
176+
}
177+
}()
178+
179+
select {
180+
case <-ready:
181+
case <-done:
182+
case <-s.quit:
183+
case <-s.feed.quit:
184+
}
185+
}
186+
}
187+
188+
func (s *updateSubscription[T, M]) close() {
189+
close(s.quit)
190+
s.wg.Wait()
191+
close(s.channel)
192+
}

0 commit comments

Comments
 (0)