Skip to content

Commit 617423e

Browse files
authored
Merge pull request dapr#8628 from JoshVanL/graceful-shutdown-subscriptions
Graceful shutdown: don't cancel in-flight pubsub events
2 parents b16d7bf + 7bd7987 commit 617423e

File tree

23 files changed

+1262
-123
lines changed

23 files changed

+1262
-123
lines changed

pkg/components/pubsub/pluggable.go

Lines changed: 19 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -156,14 +156,23 @@ func (p *grpcPubSub) adaptHandler(ctx context.Context, streamingPull proto.PubSu
156156
}
157157

158158
// pullMessages pulls messages of the given subscription and executes the handler for those messages.
159-
func (p *grpcPubSub) pullMessages(ctx context.Context, topic *proto.Topic, handler pubsub.Handler) error {
159+
func (p *grpcPubSub) pullMessages(parentCtx context.Context, topic *proto.Topic, handler pubsub.Handler) error {
160+
ctx, cancel := context.WithCancel(context.Background())
161+
162+
var wg sync.WaitGroup
163+
go func() {
164+
<-parentCtx.Done()
165+
wg.Wait()
166+
cancel()
167+
}()
168+
160169
// first pull should be sync and subsequent connections can be made in background if necessary
161170
pull, err := p.Client.PullMessages(ctx)
162171
if err != nil {
163172
return fmt.Errorf("unable to subscribe: %w", err)
164173
}
165174

166-
streamCtx, cancel := context.WithCancel(pull.Context())
175+
streamCtx, streamCancel := context.WithCancel(pull.Context())
167176

168177
err = pull.Send(&proto.PullMessagesRequest{
169178
Topic: topic,
@@ -173,7 +182,7 @@ func (p *grpcPubSub) pullMessages(ctx context.Context, topic *proto.Topic, handl
173182
if closeErr := pull.CloseSend(); closeErr != nil {
174183
p.logger.Warnf("could not close pull stream of topic %s: %v", topic.GetName(), closeErr)
175184
}
176-
cancel()
185+
streamCancel()
177186
}
178187

179188
if err != nil {
@@ -192,13 +201,17 @@ func (p *grpcPubSub) pullMessages(ctx context.Context, topic *proto.Topic, handl
192201

193202
// TODO reconnect on error
194203
if err != nil {
195-
p.logger.Errorf("failed to receive message: %v", err)
204+
p.logger.Errorf("Failed to receive pubsub message: %v", err)
196205
return
197206
}
198207

199-
p.logger.Debugf("received message from stream on topic %s", msg.GetTopicName())
208+
p.logger.Debugf("Received message from stream on topic %s", msg.GetTopicName())
200209

201-
go handle(msg)
210+
wg.Add(1)
211+
go func() {
212+
handle(msg)
213+
wg.Done()
214+
}()
202215
}
203216
}()
204217

pkg/runtime/processor/subscriber/subscriber.go

Lines changed: 82 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,7 @@ type Subscriber struct {
6565
appSubActive bool
6666
hasInitProg bool
6767
lock sync.RWMutex
68-
running atomic.Bool
69-
closed bool
68+
closed atomic.Bool
7069
}
7170

7271
type namedSubscription struct {
@@ -94,22 +93,49 @@ func New(opts Options) *Subscriber {
9493
}
9594

9695
func (s *Subscriber) Run(ctx context.Context) error {
97-
if !s.running.CompareAndSwap(false, true) {
98-
return errors.New("subscriber is already running")
96+
<-ctx.Done()
97+
s.closed.Store(true)
98+
return nil
99+
}
100+
101+
func (s *Subscriber) StopAllSubscriptionsForever() {
102+
s.lock.Lock()
103+
104+
s.closed.Store(true)
105+
106+
var wg sync.WaitGroup
107+
for _, psubs := range s.appSubs {
108+
wg.Add(len(psubs))
109+
for _, sub := range psubs {
110+
go func(sub *namedSubscription) {
111+
sub.Stop()
112+
wg.Done()
113+
}(sub)
114+
}
99115
}
100116

101-
<-ctx.Done()
117+
for _, psubs := range s.streamSubs {
118+
wg.Add(len(psubs))
119+
for _, sub := range psubs {
120+
go func() {
121+
sub.Stop()
122+
wg.Done()
123+
}()
124+
}
125+
}
102126

103-
s.StopAllSubscriptionsForever()
127+
s.appSubs = make(map[string][]*namedSubscription)
128+
s.streamSubs = make(map[string]map[rtpubsub.ConnectionID]*namedSubscription)
129+
s.lock.Unlock()
104130

105-
return nil
131+
wg.Wait()
106132
}
107133

108134
func (s *Subscriber) ReloadPubSub(name string) error {
109135
s.lock.Lock()
110136
defer s.lock.Unlock()
111137

112-
if s.closed {
138+
if s.closed.Load() {
113139
return nil
114140
}
115141

@@ -133,8 +159,8 @@ func (s *Subscriber) StartStreamerSubscription(subscription *subapi.Subscription
133159
s.lock.Unlock()
134160
}()
135161

136-
if s.closed {
137-
return fmt.Errorf("streaming subscriber %s with ID %d is closed", subscription.Name, connectionID)
162+
if s.closed.Load() {
163+
return apierrors.PubSub("").WithMetadata(nil).DeserializeError(errors.New("subscriber is closed"))
138164
}
139165

140166
sub, found := s.compStore.GetStreamSubscription(subscription)
@@ -172,10 +198,6 @@ func (s *Subscriber) StopStreamerSubscription(subscription *subapi.Subscription,
172198
s.lock.Unlock()
173199
}()
174200

175-
if s.closed {
176-
return
177-
}
178-
179201
subscriptions, ok := s.streamSubs[subscription.Spec.Pubsubname]
180202
if !ok {
181203
return
@@ -197,7 +219,7 @@ func (s *Subscriber) ReloadDeclaredAppSubscription(name, pubsubName string) erro
197219
s.lock.Lock()
198220
defer s.lock.Unlock()
199221

200-
if !s.appSubActive || s.closed {
222+
if !s.appSubActive || s.closed.Load() {
201223
return nil
202224
}
203225

@@ -240,12 +262,21 @@ func (s *Subscriber) StopPubSub(name string) {
240262
s.lock.Lock()
241263
defer s.lock.Unlock()
242264

265+
var wg sync.WaitGroup
266+
wg.Add(len(s.appSubs[name]) + len(s.streamSubs[name]))
243267
for _, sub := range s.appSubs[name] {
244-
sub.Stop()
268+
go func(sub *namedSubscription) {
269+
sub.Stop()
270+
wg.Done()
271+
}(sub)
245272
}
246273
for _, sub := range s.streamSubs[name] {
247-
sub.Stop()
274+
go func(sub *namedSubscription) {
275+
sub.Stop()
276+
wg.Done()
277+
}(sub)
248278
}
279+
wg.Wait()
249280

250281
s.appSubs[name] = nil
251282
s.streamSubs[name] = nil
@@ -255,7 +286,7 @@ func (s *Subscriber) StartAppSubscriptions() error {
255286
s.lock.Lock()
256287
defer s.lock.Unlock()
257288

258-
if s.appSubActive || s.closed {
289+
if s.appSubActive || s.closed.Load() {
259290
return nil
260291
}
261292

@@ -265,11 +296,19 @@ func (s *Subscriber) StartAppSubscriptions() error {
265296

266297
s.appSubActive = true
267298

299+
var wg sync.WaitGroup
268300
for _, subs := range s.appSubs {
301+
wg.Add(len(subs))
269302
for _, sub := range subs {
270-
sub.Stop()
303+
go func(sub *namedSubscription) {
304+
sub.Stop()
305+
wg.Done()
306+
}(sub)
271307
}
272308
}
309+
310+
wg.Wait()
311+
273312
s.appSubs = make(map[string][]*namedSubscription)
274313

275314
var errs []error
@@ -301,36 +340,19 @@ func (s *Subscriber) StopAppSubscriptions() {
301340

302341
s.appSubActive = false
303342

343+
var wg sync.WaitGroup
304344
for _, psub := range s.appSubs {
345+
wg.Add(len(psub))
305346
for _, sub := range psub {
306-
sub.Stop()
307-
}
308-
}
309-
310-
s.appSubs = make(map[string][]*namedSubscription)
311-
}
312-
313-
func (s *Subscriber) StopAllSubscriptionsForever() {
314-
s.lock.Lock()
315-
defer func() {
316-
s.lock.Unlock()
317-
}()
318-
319-
s.closed = true
320-
321-
for _, psubs := range s.appSubs {
322-
for _, sub := range psubs {
323-
sub.Stop()
324-
}
325-
}
326-
for _, psubs := range s.streamSubs {
327-
for _, sub := range psubs {
328-
sub.Stop()
347+
go func(sub *namedSubscription) {
348+
sub.Stop()
349+
wg.Done()
350+
}(sub)
329351
}
330352
}
353+
wg.Wait()
331354

332355
s.appSubs = make(map[string][]*namedSubscription)
333-
s.streamSubs = make(map[string]map[rtpubsub.ConnectionID]*namedSubscription)
334356
}
335357

336358
func (s *Subscriber) InitProgramaticSubscriptions(ctx context.Context) error {
@@ -340,12 +362,19 @@ func (s *Subscriber) InitProgramaticSubscriptions(ctx context.Context) error {
340362
}
341363

342364
func (s *Subscriber) reloadPubSubStream(name string, pubsub *rtpubsub.PubsubItem) error {
365+
var wg sync.WaitGroup
366+
wg.Add(len(s.streamSubs[name]))
343367
for _, sub := range s.streamSubs[name] {
344-
sub.Stop()
368+
go func(sub *namedSubscription) {
369+
sub.Stop()
370+
wg.Done()
371+
}(sub)
345372
}
373+
wg.Wait()
374+
346375
s.streamSubs[name] = nil
347376

348-
if s.closed || pubsub == nil {
377+
if s.closed.Load() || pubsub == nil {
349378
return nil
350379
}
351380

@@ -370,13 +399,19 @@ func (s *Subscriber) reloadPubSubStream(name string, pubsub *rtpubsub.PubsubItem
370399
}
371400

372401
func (s *Subscriber) reloadPubSubApp(name string, pubsub *rtpubsub.PubsubItem) error {
402+
var wg sync.WaitGroup
403+
wg.Add(len(s.appSubs[name]))
373404
for _, sub := range s.appSubs[name] {
374-
sub.Stop()
405+
go func(sub *namedSubscription) {
406+
sub.Stop()
407+
wg.Done()
408+
}(sub)
375409
}
410+
wg.Wait()
376411

377412
s.appSubs[name] = nil
378413

379-
if !s.appSubActive || s.closed || pubsub == nil {
414+
if !s.appSubActive || s.closed.Load() || pubsub == nil {
380415
return nil
381416
}
382417

0 commit comments

Comments
 (0)