Skip to content

Commit acfc7e8

Browse files
committed
Add support for multiple Streaming Subscriptions subscribers
Signed-off-by: Anton Troshin <[email protected]>
1 parent e7a8637 commit acfc7e8

File tree

13 files changed

+404
-201
lines changed

13 files changed

+404
-201
lines changed

pkg/api/grpc/subscribe.go

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ func (a *api) SubscribeTopicEventsAlpha1(stream runtimev1pb.Dapr_SubscribeTopicE
6666
}
6767

6868
key := a.pubsubAdapterStreamer.StreamerKey(req.GetPubsubName(), req.GetTopic())
69-
err = a.Universal.CompStore().AddStreamSubscription(&subapi.Subscription{
69+
sub := &subapi.Subscription{
7070
ObjectMeta: metav1.ObjectMeta{Name: key},
7171
Spec: subapi.SubscriptionSpec{
7272
Pubsubname: req.GetPubsubName(),
@@ -75,19 +75,21 @@ func (a *api) SubscribeTopicEventsAlpha1(stream runtimev1pb.Dapr_SubscribeTopicE
7575
DeadLetterTopic: req.GetDeadLetterTopic(),
7676
Routes: subapi.Routes{Default: "/"},
7777
},
78-
})
78+
}
79+
connectionID := a.pubsubAdapterStreamer.NextIndex()
80+
err = a.Universal.CompStore().AddStreamSubscription(sub, connectionID)
7981
if err != nil {
8082
return err
8183
}
8284

83-
if err = a.processor.Subscriber().StartStreamerSubscription(key); err != nil {
84-
a.Universal.CompStore().DeleteStreamSubscription(key)
85+
if err = a.processor.Subscriber().StartStreamerSubscription(sub, connectionID); err != nil {
86+
a.Universal.CompStore().DeleteStreamSubscription(sub)
8587
return err
8688
}
8789

8890
defer func() {
89-
a.processor.Subscriber().StopStreamerSubscription(req.GetPubsubName(), key)
90-
a.Universal.CompStore().DeleteStreamSubscription(key)
91+
a.processor.Subscriber().StopStreamerSubscription(sub, connectionID)
92+
a.Universal.CompStore().DeleteStreamSubscription(sub)
9193
}()
9294

9395
if err = stream.Send(&runtimev1pb.SubscribeTopicEventsResponseAlpha1{
@@ -98,5 +100,5 @@ func (a *api) SubscribeTopicEventsAlpha1(stream runtimev1pb.Dapr_SubscribeTopicE
98100
return err
99101
}
100102

101-
return a.pubsubAdapterStreamer.Subscribe(stream, req)
103+
return a.pubsubAdapterStreamer.Subscribe(stream, req, connectionID)
102104
}

pkg/runtime/compstore/compstore.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ func New() *ComponentStore {
8282
cryptoProviders: make(map[string]crypto.SubtleCrypto),
8383
subscriptions: &subscriptions{
8484
declaratives: make(map[string]*DeclarativeSubscription),
85-
streams: make(map[string]*DeclarativeSubscription),
85+
streams: make(map[string][]*DeclarativeSubscription),
8686
},
8787
conversations: make(map[string]conversation.Conversation),
8888
}

pkg/runtime/compstore/subscriptions.go

Lines changed: 49 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,15 @@ limitations under the License.
1414
package compstore
1515

1616
import (
17-
"fmt"
18-
1917
subapi "github.com/dapr/dapr/pkg/apis/subscriptions/v2alpha1"
2018
runtimev1pb "github.com/dapr/dapr/pkg/proto/runtime/v1"
2119
rtpubsub "github.com/dapr/dapr/pkg/runtime/pubsub"
20+
"github.com/dapr/kit/logger"
2221
"github.com/dapr/kit/ptr"
2322
)
2423

24+
var log = logger.NewLogger("dapr.runtime.compstore.subscriptions")
25+
2526
type DeclarativeSubscription struct {
2627
Comp *subapi.Subscription
2728
*NamedSubscription
@@ -30,7 +31,8 @@ type DeclarativeSubscription struct {
3031
type NamedSubscription struct {
3132
// Name is the optional name of the subscription. If not set, the name of the
3233
// component is used.
33-
Name *string
34+
Name *string
35+
ConnectionID rtpubsub.ConnectionID
3436
rtpubsub.Subscription
3537
}
3638

@@ -40,7 +42,7 @@ type subscriptions struct {
4042
// declarativesList used to track order of declarative subscriptions for
4143
// processing priority.
4244
declarativesList []string
43-
streams map[string]*DeclarativeSubscription
45+
streams map[string][]*DeclarativeSubscription
4446
}
4547

4648
// MetadataSubscription is a temporary wrapper for rtpubsub.Subscription to add a Type attribute to be used in GetMetadata
@@ -75,17 +77,19 @@ func (c *ComponentStore) AddDeclarativeSubscription(comp *subapi.Subscription, s
7577
c.subscriptions.declarativesList = append(c.subscriptions.declarativesList, comp.Name)
7678
}
7779

78-
func (c *ComponentStore) AddStreamSubscription(comp *subapi.Subscription) error {
80+
func (c *ComponentStore) AddStreamSubscription(comp *subapi.Subscription, connectionID rtpubsub.ConnectionID) error {
81+
log.Warn("Lock AddStreamSubscription")
7982
c.lock.Lock()
80-
defer c.lock.Unlock()
81-
if _, ok := c.subscriptions.streams[comp.Name]; ok {
82-
return fmt.Errorf("streamer already subscribed to pubsub %q topic %q", comp.Spec.Pubsubname, comp.Spec.Topic)
83-
}
83+
defer func() {
84+
c.lock.Unlock()
85+
log.Warn("Unlock AddStreamSubscription defer")
86+
}()
8487

85-
c.subscriptions.streams[comp.Name] = &DeclarativeSubscription{
88+
sub := &DeclarativeSubscription{
8689
Comp: comp,
8790
NamedSubscription: &NamedSubscription{
88-
Name: ptr.Of(comp.Name),
91+
Name: ptr.Of(comp.Name),
92+
ConnectionID: connectionID,
8993
Subscription: rtpubsub.Subscription{
9094
PubsubName: comp.Spec.Pubsubname,
9195
Topic: comp.Spec.Topic,
@@ -95,15 +99,28 @@ func (c *ComponentStore) AddStreamSubscription(comp *subapi.Subscription) error
9599
},
96100
},
97101
}
102+
c.subscriptions.streams[comp.Name] = append(c.subscriptions.streams[comp.Name], sub)
98103

99104
return nil
100105
}
101106

102-
func (c *ComponentStore) DeleteStreamSubscription(names ...string) {
107+
func (c *ComponentStore) DeleteStreamSubscription(comp *subapi.Subscription) {
108+
log.Warn("Lock DeleteStreamSubscription")
103109
c.lock.Lock()
104-
defer c.lock.Unlock()
105-
for _, name := range names {
106-
delete(c.subscriptions.streams, name)
110+
defer func() {
111+
c.lock.Unlock()
112+
log.Warn("Unlock DeleteStreamSubscription defer")
113+
}()
114+
streamingSubscriptions, ok := c.subscriptions.streams[comp.Name]
115+
if ok && len(streamingSubscriptions) > 0 {
116+
for i, sub := range streamingSubscriptions {
117+
if sub.Comp == comp {
118+
c.subscriptions.streams[comp.Name] = append(c.subscriptions.streams[comp.Name][:i], c.subscriptions.streams[comp.Name][i+1:]...)
119+
}
120+
}
121+
}
122+
if len(c.subscriptions.streams[comp.Name]) == 0 {
123+
delete(c.subscriptions.streams, comp.Name)
107124
}
108125
}
109126

@@ -155,17 +172,12 @@ func (c *ComponentStore) ListTypedSubscriptions() []TypedSubscription {
155172
subs = append(subs, typedSub)
156173
}
157174
}
158-
for i := range c.subscriptions.streams {
159-
sub := c.subscriptions.streams[i].Subscription
160-
typedSub := TypedSubscription{
161-
Subscription: sub,
162-
Type: runtimev1pb.PubsubSubscriptionType_STREAMING,
163-
}
164-
key := sub.PubsubName + "||" + sub.Topic
165-
if j, ok := taken[key]; ok {
166-
subs[j] = typedSub
167-
} else {
168-
taken[key] = len(subs)
175+
for _, streamingSubs := range c.subscriptions.streams {
176+
for _, sub := range streamingSubs {
177+
typedSub := TypedSubscription{
178+
Subscription: sub.Subscription,
179+
Type: runtimev1pb.PubsubSubscriptionType_STREAMING,
180+
}
169181
subs = append(subs, typedSub)
170182
}
171183
}
@@ -215,9 +227,11 @@ func (c *ComponentStore) ListSubscriptionsStreamByPubSub(name string) []*NamedSu
215227
defer c.lock.RUnlock()
216228

217229
var subs []*NamedSubscription
218-
for _, sub := range c.subscriptions.streams {
219-
if sub.Subscription.PubsubName == name {
220-
subs = append(subs, sub.NamedSubscription)
230+
for _, subscriptions := range c.subscriptions.streams {
231+
for _, sub := range subscriptions {
232+
if sub.Subscription.PubsubName == name {
233+
subs = append(subs, sub.NamedSubscription)
234+
}
221235
}
222236
}
223237

@@ -235,12 +249,14 @@ func (c *ComponentStore) GetDeclarativeSubscription(name string) (*DeclarativeSu
235249
return nil, false
236250
}
237251

238-
func (c *ComponentStore) GetStreamSubscription(key string) (*NamedSubscription, bool) {
252+
func (c *ComponentStore) GetStreamSubscription(subscription *subapi.Subscription) (*NamedSubscription, bool) {
239253
c.lock.RLock()
240254
defer c.lock.RUnlock()
241-
for i, sub := range c.subscriptions.streams {
242-
if sub.Comp.Name == key {
243-
return c.subscriptions.streams[i].NamedSubscription, true
255+
for _, subscriptions := range c.subscriptions.streams {
256+
for _, sub := range subscriptions {
257+
if sub.Comp == subscription {
258+
return sub.NamedSubscription, true
259+
}
244260
}
245261
}
246262
return nil, false

pkg/runtime/processor/manager.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ package processor
1616
import (
1717
"context"
1818
"fmt"
19+
subapi "github.com/dapr/dapr/pkg/apis/subscriptions/v2alpha1"
20+
rtpubsub "github.com/dapr/dapr/pkg/runtime/pubsub"
1921

2022
"github.com/dapr/durabletask-go/backend"
2123

@@ -46,8 +48,8 @@ type SubscribeManager interface {
4648
StopAppSubscriptions()
4749
StopAllSubscriptionsForever()
4850
ReloadDeclaredAppSubscription(name, pubsubName string) error
49-
StartStreamerSubscription(key string) error
50-
StopStreamerSubscription(pubsubName, key string)
51+
StartStreamerSubscription(sub *subapi.Subscription, connectionID rtpubsub.ConnectionID) error
52+
StopStreamerSubscription(sub *subapi.Subscription, connectionID rtpubsub.ConnectionID)
5153
ReloadPubSub(string) error
5254
StopPubSub(string)
5355
}

pkg/runtime/processor/subscriber/subscriber.go

Lines changed: 50 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"context"
1818
"errors"
1919
"fmt"
20+
subapi "github.com/dapr/dapr/pkg/apis/subscriptions/v2alpha1"
2021
"sync"
2122
"sync/atomic"
2223

@@ -60,7 +61,7 @@ type Subscriber struct {
6061
adapterStreamer rtpubsub.AdapterStreamer
6162

6263
appSubs map[string][]*namedSubscription
63-
streamSubs map[string][]*namedSubscription
64+
streamSubs map[string]map[rtpubsub.ConnectionID]*namedSubscription
6465
appSubActive bool
6566
hasInitProg bool
6667
lock sync.RWMutex
@@ -88,7 +89,7 @@ func New(opts Options) *Subscriber {
8889
adapter: opts.Adapter,
8990
adapterStreamer: opts.AdapterStreamer,
9091
appSubs: make(map[string][]*namedSubscription),
91-
streamSubs: make(map[string][]*namedSubscription),
92+
streamSubs: make(map[string]map[rtpubsub.ConnectionID]*namedSubscription),
9293
}
9394
}
9495

@@ -126,19 +127,19 @@ func (s *Subscriber) ReloadPubSub(name string) error {
126127
return errors.Join(errs...)
127128
}
128129

129-
func (s *Subscriber) StartStreamerSubscription(key string) error {
130+
func (s *Subscriber) StartStreamerSubscription(subscription *subapi.Subscription, connectionID rtpubsub.ConnectionID) error {
131+
log.Warn("Lock StartStreamerSubscription")
130132
s.lock.Lock()
131-
defer s.lock.Unlock()
133+
defer func() {
134+
s.lock.Unlock()
135+
log.Warn("Unlock StartStreamerSubscription defer")
136+
}()
132137

133138
if s.closed {
134139
return apierrors.PubSub("").WithMetadata(nil).DeserializeError(errors.New("subscriber is closed"))
135140
}
136141

137-
sub, ok := s.compStore.GetStreamSubscription(key)
138-
if !ok {
139-
err := fmt.Errorf("starting stream subscription without connection: %s", key)
140-
return apierrors.PubSub("").WithMetadata(nil).DeserializeError(err)
141-
}
142+
sub, ok := s.compStore.GetStreamSubscription(subscription)
142143

143144
pubsub, ok := s.compStore.GetPubSub(sub.PubsubName)
144145
if !ok {
@@ -150,28 +151,46 @@ func (s *Subscriber) StartStreamerSubscription(key string) error {
150151
return fmt.Errorf("failed to create subscription for %s: %s", sub.PubsubName, err)
151152
}
152153

153-
s.streamSubs[sub.PubsubName] = append(s.streamSubs[sub.PubsubName], &namedSubscription{
154+
key := s.adapterStreamer.StreamerKey(sub.PubsubName, sub.Topic)
155+
156+
subscriptions, ok := s.streamSubs[sub.PubsubName]
157+
if subscriptions == nil {
158+
s.streamSubs[sub.PubsubName] = make(map[rtpubsub.ConnectionID]*namedSubscription)
159+
}
160+
161+
s.streamSubs[sub.PubsubName][connectionID] = &namedSubscription{
154162
name: &key,
155163
Subscription: ss,
156-
})
157-
164+
}
158165
return nil
159166
}
160167

161-
func (s *Subscriber) StopStreamerSubscription(pubsubName, key string) {
168+
func (s *Subscriber) StopStreamerSubscription(subscription *subapi.Subscription, connectionID rtpubsub.ConnectionID) {
169+
log.Warn("Lock StopStreamerSubscription")
162170
s.lock.Lock()
163-
defer s.lock.Unlock()
171+
defer func() {
172+
s.lock.Unlock()
173+
log.Warn("Unlock StopStreamerSubscription defer")
174+
}()
164175

165176
if s.closed {
166177
return
167178
}
168179

169-
for i, sub := range s.streamSubs[pubsubName] {
170-
if sub.name != nil && *sub.name == key {
171-
sub.Stop()
172-
s.streamSubs[pubsubName] = append(s.streamSubs[pubsubName][:i], s.streamSubs[pubsubName][i+1:]...)
173-
return
174-
}
180+
subscriptions, ok := s.streamSubs[subscription.Spec.Pubsubname]
181+
if !ok {
182+
return
183+
}
184+
185+
sub, ok := subscriptions[connectionID]
186+
if !ok {
187+
return
188+
}
189+
190+
sub.Stop()
191+
delete(subscriptions, connectionID)
192+
if len(subscriptions) == 0 {
193+
delete(s.streamSubs, subscription.Spec.Pubsubname)
175194
}
176195
}
177196

@@ -293,8 +312,12 @@ func (s *Subscriber) StopAppSubscriptions() {
293312
}
294313

295314
func (s *Subscriber) StopAllSubscriptionsForever() {
315+
log.Warn("Lock StopAllSubscriptionsForever")
296316
s.lock.Lock()
297-
defer s.lock.Unlock()
317+
defer func() {
318+
s.lock.Unlock()
319+
log.Warn("Unlock StopAllSubscriptionsForever defer")
320+
}()
298321

299322
s.closed = true
300323

@@ -310,7 +333,7 @@ func (s *Subscriber) StopAllSubscriptionsForever() {
310333
}
311334

312335
s.appSubs = make(map[string][]*namedSubscription)
313-
s.streamSubs = make(map[string][]*namedSubscription)
336+
s.streamSubs = make(map[string]map[rtpubsub.ConnectionID]*namedSubscription)
314337
}
315338

316339
func (s *Subscriber) InitProgramaticSubscriptions(ctx context.Context) error {
@@ -329,7 +352,8 @@ func (s *Subscriber) reloadPubSubStream(name string, pubsub *rtpubsub.PubsubItem
329352
return nil
330353
}
331354

332-
subs := make([]*namedSubscription, 0, len(s.compStore.ListSubscriptionsStreamByPubSub(name)))
355+
//subs := make([]*namedSubscription, 0, len(s.compStore.ListSubscriptionsStreamByPubSub(name)))
356+
subs := make(map[rtpubsub.ConnectionID]*namedSubscription, len(s.compStore.ListSubscriptionsStreamByPubSub(name)))
333357
var errs []error
334358
for _, sub := range s.compStore.ListSubscriptionsStreamByPubSub(name) {
335359
ss, err := s.startSubscription(pubsub, sub, true)
@@ -338,10 +362,10 @@ func (s *Subscriber) reloadPubSubStream(name string, pubsub *rtpubsub.PubsubItem
338362
continue
339363
}
340364

341-
subs = append(subs, &namedSubscription{
365+
subs[sub.ConnectionID] = &namedSubscription{
342366
name: sub.Name,
343367
Subscription: ss,
344-
})
368+
}
345369
}
346370

347371
s.streamSubs[name] = subs
@@ -455,5 +479,6 @@ func (s *Subscriber) startSubscription(pubsub *rtpubsub.PubsubItem, comp *compst
455479
GRPC: s.grpc,
456480
Adapter: s.adapter,
457481
AdapterStreamer: streamer,
482+
ConnectionID: comp.ConnectionID,
458483
})
459484
}

0 commit comments

Comments
 (0)