Skip to content

Commit 7fe7262

Browse files
authored
Graceful shutdown: don't cancel in-flight input binding events (dapr#8637)
* Graceful shutdown: don't cancel in-flight input binding events Do not cancel in-flight input binding requests during graceful shutdown procedure, instead wait for them to complete or until the grace period expires. Also ensures in-flight requests are resolved during hot-reload events. New requests while the shutdown is in progress are errors to the input binding Component. Fixes: dapr#8332 Signed-off-by: joshvanl <[email protected]> * Adds subscription len check on block Signed-off-by: joshvanl <[email protected]> --------- Signed-off-by: joshvanl <[email protected]>
1 parent 9ad71c3 commit 7fe7262

File tree

21 files changed

+1316
-70
lines changed

21 files changed

+1316
-70
lines changed

pkg/components/bindings/input_pluggable.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ import (
2929
// grpcInputBinding is a implementation of a inputbinding over a gRPC Protocol.
3030
type grpcInputBinding struct {
3131
*pluggable.GRPCConnector[proto.InputBindingClient]
32-
bindings.InputBinding
3332
logger logger.Logger
3433

3534
closed atomic.Bool
@@ -130,7 +129,7 @@ func (b *grpcInputBinding) Read(ctx context.Context, handler bindings.Handler) e
130129

131130
// TODO reconnect on error
132131
if err != nil {
133-
b.logger.Errorf("failed to receive message: %v", err)
132+
b.logger.Errorf("failed to receive binding message: %v", err)
134133
return
135134
}
136135
b.wg.Add(1)
@@ -149,7 +148,7 @@ func (b *grpcInputBinding) Close() error {
149148
if b.closed.CompareAndSwap(false, true) {
150149
close(b.closeCh)
151150
}
152-
return b.InputBinding.Close()
151+
return nil
153152
}
154153

155154
// inputFromConnector creates a new GRPC inputbinding using the given underlying connector.

pkg/components/pubsub/pluggable.go

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -159,10 +159,8 @@ func (p *grpcPubSub) adaptHandler(ctx context.Context, streamingPull proto.PubSu
159159
func (p *grpcPubSub) pullMessages(parentCtx context.Context, topic *proto.Topic, handler pubsub.Handler) error {
160160
ctx, cancel := context.WithCancel(context.Background())
161161

162-
var wg sync.WaitGroup
163162
go func() {
164163
<-parentCtx.Done()
165-
wg.Wait()
166164
cancel()
167165
}()
168166

@@ -207,11 +205,7 @@ func (p *grpcPubSub) pullMessages(parentCtx context.Context, topic *proto.Topic,
207205

208206
p.logger.Debugf("Received message from stream on topic %s", msg.GetTopicName())
209207

210-
wg.Add(1)
211-
go func() {
212-
handle(msg)
213-
wg.Done()
214-
}()
208+
handle(msg)
215209
}
216210
}()
217211

pkg/runtime/processor/binding/binding.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
"github.com/dapr/dapr/pkg/runtime/compstore"
3333
rterrors "github.com/dapr/dapr/pkg/runtime/errors"
3434
"github.com/dapr/dapr/pkg/runtime/meta"
35+
"github.com/dapr/dapr/pkg/runtime/processor/binding/input"
3536
"github.com/dapr/kit/logger"
3637
)
3738

@@ -75,7 +76,7 @@ type binding struct {
7576
stopForever bool
7677

7778
subscribeBindingList []string
78-
inputCancels map[string]context.CancelFunc
79+
activeInputs map[string]*input.Input
7980
wg sync.WaitGroup
8081
}
8182

@@ -89,7 +90,7 @@ func New(opts Options) *binding {
8990
tracingSpec: opts.TracingSpec,
9091
grpc: opts.GRPC,
9192
channels: opts.Channels,
92-
inputCancels: make(map[string]context.CancelFunc),
93+
activeInputs: make(map[string]*input.Input),
9394
}
9495
}
9596

@@ -130,10 +131,10 @@ func (b *binding) Close(comp compapi.Component) error {
130131
inbinding, ok := b.compStore.GetInputBinding(comp.Name)
131132
if ok {
132133
defer b.compStore.DeleteInputBinding(comp.Name)
133-
if cancel := b.inputCancels[comp.Name]; cancel != nil {
134-
cancel()
134+
if input := b.activeInputs[comp.Name]; input != nil {
135+
input.Stop()
135136
}
136-
delete(b.inputCancels, comp.Name)
137+
delete(b.activeInputs, comp.Name)
137138
if err := inbinding.Close(); err != nil {
138139
errs = append(errs, err)
139140
}
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/*
2+
Copyright 2025 The Dapr Authors
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package input
15+
16+
import (
17+
"context"
18+
"errors"
19+
"sync"
20+
"sync/atomic"
21+
"time"
22+
23+
"github.com/dapr/components-contrib/bindings"
24+
diag "github.com/dapr/dapr/pkg/diagnostics"
25+
"github.com/dapr/kit/logger"
26+
)
27+
28+
var log = logger.NewLogger("dapr.runtime.processor.binding.input")
29+
30+
type Options struct {
31+
Name string
32+
Binding bindings.InputBinding
33+
Handler func(context.Context, string, []byte, map[string]string) ([]byte, error)
34+
}
35+
36+
type Input struct {
37+
name string
38+
binding bindings.InputBinding
39+
handler func(context.Context, string, []byte, map[string]string) ([]byte, error)
40+
41+
cancel func()
42+
closed atomic.Bool
43+
wg sync.WaitGroup
44+
inflight atomic.Int64
45+
}
46+
47+
func Run(opts Options) (*Input, error) {
48+
ctx, cancel := context.WithCancel(context.Background())
49+
50+
i := &Input{
51+
name: opts.Name,
52+
binding: opts.Binding,
53+
handler: opts.Handler,
54+
cancel: cancel,
55+
}
56+
57+
return i, i.read(ctx)
58+
}
59+
60+
func (i *Input) Stop() {
61+
i.closed.Store(true)
62+
inflight := i.inflight.Load() > 0
63+
64+
i.wg.Wait()
65+
66+
// If there were in-flight requests then wait some time for the result to be
67+
// sent to the binding. This is because the message result context is
68+
// disparate.
69+
if inflight {
70+
time.Sleep(time.Millisecond * 400)
71+
}
72+
i.cancel()
73+
}
74+
75+
func (i *Input) read(ctx context.Context) error {
76+
return i.binding.Read(ctx, func(ctx context.Context, resp *bindings.ReadResponse) ([]byte, error) {
77+
i.wg.Add(1)
78+
i.inflight.Add(1)
79+
defer func() {
80+
i.wg.Done()
81+
i.inflight.Add(-1)
82+
}()
83+
84+
if i.closed.Load() {
85+
return nil, errors.New("input binding is closed")
86+
}
87+
88+
if resp == nil {
89+
return nil, nil
90+
}
91+
92+
start := time.Now()
93+
b, err := i.handler(ctx, i.name, resp.Data, resp.Metadata)
94+
elapsed := diag.ElapsedSince(start)
95+
96+
diag.DefaultComponentMonitoring.InputBindingEvent(context.Background(), i.name, err == nil, elapsed)
97+
98+
if err != nil {
99+
log.Debugf("error from app consumer for binding [%s]: %s", i.name, err)
100+
return nil, err
101+
}
102+
103+
return b, nil
104+
})
105+
}

pkg/runtime/processor/binding/send.go

Lines changed: 31 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"fmt"
2121
"net/http"
2222
"strings"
23+
"sync"
2324
"time"
2425

2526
"go.opentelemetry.io/otel/trace"
@@ -37,6 +38,7 @@ import (
3738
invokev1 "github.com/dapr/dapr/pkg/messaging/v1"
3839
runtimev1pb "github.com/dapr/dapr/pkg/proto/runtime/v1"
3940
"github.com/dapr/dapr/pkg/resiliency"
41+
"github.com/dapr/dapr/pkg/runtime/processor/binding/input"
4042
)
4143

4244
func (b *binding) StartReadingFromBindings(ctx context.Context) error {
@@ -54,10 +56,16 @@ func (b *binding) StartReadingFromBindings(ctx context.Context) error {
5456
}
5557

5658
// Clean any previous state
57-
for _, cancel := range b.inputCancels {
58-
cancel()
59+
var wg sync.WaitGroup
60+
wg.Add(len(b.activeInputs))
61+
for _, inp := range b.activeInputs {
62+
go func(input *input.Input) {
63+
input.Stop()
64+
wg.Done()
65+
}(inp)
5966
}
60-
b.inputCancels = make(map[string]context.CancelFunc)
67+
wg.Wait()
68+
clear(b.activeInputs)
6169

6270
comps := b.compStore.ListComponents()
6371
bindings := make(map[string]componentsV1alpha1.Component)
@@ -86,31 +94,34 @@ func (b *binding) startInputBinding(comp componentsV1alpha1.Component, binding b
8694

8795
m := meta.Properties
8896

89-
ctx, cancel := context.WithCancel(context.Background())
9097
if isBindingOfExplicitDirection(ComponentTypeInput, m) {
9198
isSubscribed = true
9299
} else {
93-
var err error
100+
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
101+
defer cancel()
94102
isSubscribed, err = b.isAppSubscribedToBinding(ctx, comp.Name)
95103
if err != nil {
96-
cancel()
97104
return err
98105
}
99106
}
100107

101108
if !isSubscribed {
102109
log.Infof("app has not subscribed to binding %s.", comp.Name)
103-
cancel()
104110
return nil
105111
}
106112

107-
if err := b.readFromBinding(ctx, comp.Name, binding); err != nil {
113+
input, err := input.Run(input.Options{
114+
Name: comp.Name,
115+
Binding: binding,
116+
Handler: b.sendBindingEventToApp,
117+
})
118+
if err != nil {
108119
log.Errorf("error reading from input binding %s: %s", comp.Name, err)
109-
cancel()
110-
return nil
120+
return err
111121
}
112122

113-
b.inputCancels[comp.Name] = cancel
123+
b.activeInputs[comp.Name] = input
124+
114125
return nil
115126
}
116127

@@ -125,10 +136,16 @@ func (b *binding) StopReadingFromBindings(forever bool) {
125136

126137
b.readingBindings = false
127138

128-
for _, cancel := range b.inputCancels {
129-
cancel()
139+
var wg sync.WaitGroup
140+
wg.Add(len(b.activeInputs))
141+
for _, inp := range b.activeInputs {
142+
go func(input *input.Input) {
143+
input.Stop()
144+
wg.Done()
145+
}(inp)
130146
}
131-
b.inputCancels = make(map[string]context.CancelFunc)
147+
wg.Wait()
148+
clear(b.activeInputs)
132149
}
133150

134151
func (b *binding) sendBatchOutputBindingsParallel(ctx context.Context, to []string, data []byte) {
@@ -406,26 +423,6 @@ func (b *binding) sendBindingEventToApp(ctx context.Context, bindingName string,
406423
return appResponseBody, nil
407424
}
408425

409-
func (b *binding) readFromBinding(readCtx context.Context, name string, binding bindings.InputBinding) error {
410-
return binding.Read(readCtx, func(ctx context.Context, resp *bindings.ReadResponse) ([]byte, error) {
411-
if resp == nil {
412-
return nil, nil
413-
}
414-
415-
start := time.Now()
416-
b, err := b.sendBindingEventToApp(ctx, name, resp.Data, resp.Metadata)
417-
elapsed := diag.ElapsedSince(start)
418-
419-
diag.DefaultComponentMonitoring.InputBindingEvent(context.Background(), name, err == nil, elapsed)
420-
421-
if err != nil {
422-
log.Debugf("error from app consumer for binding [%s]: %s", name, err)
423-
return nil, err
424-
}
425-
return b, nil
426-
})
427-
}
428-
429426
func (b *binding) getSubscribedBindingsGRPC(ctx context.Context) ([]string, error) {
430427
conn, err := b.grpc.GetAppClient()
431428
if err != nil {

0 commit comments

Comments
 (0)