Skip to content

Commit e9d8852

Browse files
authored
Merge pull request dapr#8639 from JoshVanL/fix-scheduler-client-close
Fix Scheduler Client connection pruning
2 parents 7c86046 + c588d4f commit e9d8852

File tree

8 files changed

+282
-113
lines changed

8 files changed

+282
-113
lines changed

docs/release_notes/v1.15.4.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,6 @@ The sidecar injector was not properly respecting the global actors enabled confi
7272

7373
The sidecar injector now properly respects the `global.actors.enabled` helm configuration and `ACTORS_ENABLED` environment variable. When set to `false`, it will not attempt to connect to the placement service, allowing the sidecar to start successfully without actor functionality.
7474

75-
7675
## Prevent panic of reminder operations on slow Actor Startup
7776

7877
### Problem

pkg/runtime/scheduler/internal/clients/clients.go

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"context"
1818
"errors"
1919
"fmt"
20+
"sync"
2021
"sync/atomic"
2122

2223
schedulerv1pb "github.com/dapr/dapr/pkg/proto/scheduler/v1"
@@ -37,30 +38,34 @@ type Options struct {
3738
// fashion.
3839
type Clients struct {
3940
// clients holds one scheduler client per address that New connected to.
clients []schedulerv1pb.SchedulerClient
41+
// closeFns holds the close function returned alongside each client by
// client.New, appended in the same order as clients. Close invokes all of
// them.
closeFns []context.CancelFunc
4042
// lastUsedIdx is presumably the cursor behind Next's round-robin
// selection; Next's body is not visible here — confirm.
lastUsedIdx atomic.Uint64
4143
}
4244

4345
func New(ctx context.Context, opts Options) (*Clients, error) {
44-
clients := make([]schedulerv1pb.SchedulerClient, len(opts.Addresses))
46+
c := &Clients{
47+
clients: make([]schedulerv1pb.SchedulerClient, 0, len(opts.Addresses)),
48+
closeFns: make([]context.CancelFunc, 0, len(opts.Addresses)),
49+
}
4550

46-
for i, address := range opts.Addresses {
51+
for _, address := range opts.Addresses {
4752
log.Debugf("Attempting to connect to Scheduler at address: %s", address)
48-
client, err := client.New(ctx, address, opts.Security)
53+
client, closeFn, err := client.New(ctx, address, opts.Security)
4954
if err != nil {
55+
c.Close()
5056
return nil, fmt.Errorf("scheduler client not initialized for address %s: %s", address, err)
5157
}
5258

5359
log.Infof("Scheduler client initialized for address: %s", address)
54-
clients[i] = client
60+
c.clients = append(c.clients, client)
61+
c.closeFns = append(c.closeFns, closeFn)
5562
}
5663

57-
if len(clients) > 0 {
64+
if len(c.clients) > 0 {
5865
log.Info("Scheduler clients initialized")
5966
}
6067

61-
return &Clients{
62-
clients: clients,
63-
}, nil
68+
return c, nil
6469
}
6570

6671
// Next returns the next client in a round-robin manner.
@@ -77,3 +82,15 @@ func (c *Clients) Next() (schedulerv1pb.SchedulerClient, error) {
7782
func (c *Clients) All() []schedulerv1pb.SchedulerClient {
7883
return c.clients
7984
}
85+
86+
func (c *Clients) Close() {
87+
var wg sync.WaitGroup
88+
wg.Add(len(c.closeFns))
89+
for _, closeFn := range c.closeFns {
90+
go func() {
91+
closeFn()
92+
wg.Done()
93+
}()
94+
}
95+
wg.Wait()
96+
}

pkg/runtime/scheduler/internal/cluster/cluster.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ func (c *Cluster) RunClients(ctx context.Context, clients *clients.Clients) erro
133133
}
134134

135135
if err != nil {
136-
log.Warnf("Error watching scheduler jobs: %v", err)
136+
return fmt.Errorf("error watching scheduler jobs: %w", err)
137137
}
138138
}
139139
}

pkg/runtime/scheduler/internal/cluster/streamer.go

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"fmt"
2020
"io"
2121
"sync"
22+
"sync/atomic"
2223

2324
"google.golang.org/grpc/codes"
2425
"google.golang.org/grpc/status"
@@ -41,7 +42,8 @@ type streamer struct {
4142
channels *channels.Channels
4243
wfengine wfengine.Interface
4344

44-
wg sync.WaitGroup
45+
wg sync.WaitGroup
46+
inflight atomic.Int64
4547
}
4648

4749
// run starts the streamer and blocks until the stream is closed or an error occurs.
@@ -53,7 +55,10 @@ func (s *streamer) run(ctx context.Context) error {
5355
// scheduler job messages. It then invokes the appropriate app or actor
5456
// reminder based on the job metadata.
5557
func (s *streamer) receive(ctx context.Context) error {
56-
defer s.wg.Wait()
58+
defer func() {
59+
s.wg.Wait()
60+
s.stream.CloseSend()
61+
}()
5762

5863
for {
5964
resp, err := s.stream.Recv()
@@ -65,12 +70,15 @@ func (s *streamer) receive(ctx context.Context) error {
6570
}
6671

6772
s.wg.Add(1)
73+
s.inflight.Add(1)
6874
go func() {
69-
defer s.wg.Done()
75+
defer func() {
76+
s.wg.Done()
77+
s.inflight.Add(-1)
78+
}()
79+
7080
result := s.handleJob(ctx, resp)
7181
select {
72-
case <-ctx.Done():
73-
case <-s.stream.Context().Done():
7482
case s.resultCh <- &schedulerv1pb.WatchJobsRequest{
7583
WatchJobRequestType: &schedulerv1pb.WatchJobsRequest_Result{
7684
Result: &schedulerv1pb.WatchJobsRequestResult{
@@ -79,6 +87,8 @@ func (s *streamer) receive(ctx context.Context) error {
7987
},
8088
},
8189
}:
90+
case <-s.stream.Context().Done():
91+
case <-ctx.Done():
8292
}
8393
}()
8494
}
@@ -93,14 +103,14 @@ func (s *streamer) outgoing(ctx context.Context) error {
93103

94104
for {
95105
select {
96-
case <-ctx.Done():
97-
return ctx.Err()
98-
case <-s.stream.Context().Done():
99-
return s.stream.Context().Err()
100106
case result := <-s.resultCh:
101107
if err := s.stream.Send(result); err != nil {
102108
return err
103109
}
110+
case <-ctx.Done():
111+
if s.inflight.Load() == 0 && len(s.resultCh) == 0 {
112+
return ctx.Err()
113+
}
104114
}
105115
}
106116
}
Lines changed: 197 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,197 @@
1+
/*
2+
Copyright 2024 The Dapr Authors
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package watchhosts
15+
16+
import (
17+
"context"
18+
"fmt"
19+
"math/rand"
20+
"slices"
21+
"sync"
22+
"sync/atomic"
23+
24+
"google.golang.org/grpc/codes"
25+
"google.golang.org/grpc/status"
26+
27+
schedulerv1pb "github.com/dapr/dapr/pkg/proto/scheduler/v1"
28+
"github.com/dapr/dapr/pkg/scheduler/client"
29+
"github.com/dapr/dapr/pkg/security"
30+
"github.com/dapr/kit/events/broadcaster"
31+
"github.com/dapr/kit/logger"
32+
)
33+
34+
// log is the logger for this package, created under the name
// "dapr.runtime.scheduler.watchhosts".
var log = logger.NewLogger("dapr.runtime.scheduler.watchhosts")
35+
36+
// Options holds the inputs New uses to build a WatchHosts.
type Options struct {
37+
// Addresses is the configured list of scheduler addresses. One of them is
// picked at random to open the WatchHosts stream, and the whole list is
// used as the static host set when the server does not implement
// WatchHosts.
Addresses []string
38+
// Security is passed to client.New when dialing the scheduler.
Security security.Handler
39+
}
40+
41+
// WatchHosts tracks the current set of scheduler host addresses and hands
// each update to subscribers obtained through Addresses.
type WatchHosts struct {
42+
// allAddrs is the address list supplied through Options.
allAddrs []string
43+
// security is forwarded to client.New when connecting.
security security.Handler
44+
45+
// gotAddrs is the most recently published address set; Addresses loads it
// to seed a new subscriber.
gotAddrs atomic.Pointer[ContextAddress]
46+
// subs broadcasts every newly published address set to subscribers and is
// closed when Run returns.
subs *broadcaster.Broadcaster[*ContextAddress]
47+
48+
// cancel cancels the context attached to the previously published address
// set; handleStream calls it before publishing a replacement.
cancel context.CancelFunc
49+
// readyCh is closed once the first address set has been published.
// Addresses blocks on it (or on its ctx) before subscribing.
readyCh chan struct{}
50+
// lock is held while publishing (store + broadcast) and while a new
// subscriber is registered and seeded in Addresses.
lock sync.Mutex
51+
}
52+
53+
// ContextAddress pairs a set of scheduler addresses with a context. For sets
// received from the WatchHosts stream, handleStream cancels that context when
// the next set arrives.
type ContextAddress struct {
54+
// Context is embedded, so a ContextAddress can be used directly as a
// context.Context.
context.Context
55+
// Addresses is the scheduler host list. It is nil in the value returned by
// WatchHosts.Addresses when the caller's ctx is done before any set was
// published.
Addresses []string
56+
}
57+
58+
func New(opts Options) *WatchHosts {
59+
return &WatchHosts{
60+
allAddrs: opts.Addresses,
61+
security: opts.Security,
62+
subs: broadcaster.New[*ContextAddress](),
63+
readyCh: make(chan struct{}),
64+
}
65+
}
66+
67+
func (w *WatchHosts) Run(ctx context.Context) error {
68+
defer w.subs.Close()
69+
70+
stream, closeCon, err := w.connSchedulerHosts(ctx)
71+
if err != nil {
72+
return fmt.Errorf("failed to connect to scheduler host: %s", err)
73+
}
74+
75+
if stream != nil {
76+
defer func() {
77+
stream.CloseSend()
78+
closeCon()
79+
}()
80+
}
81+
82+
err = w.handleStream(ctx, stream)
83+
84+
if ctx.Err() != nil {
85+
return ctx.Err()
86+
}
87+
88+
if err != nil {
89+
return fmt.Errorf("failed to handle Scheduler WatchHosts stream: %s", err)
90+
}
91+
92+
return nil
93+
}
94+
95+
func (w *WatchHosts) Addresses(ctx context.Context) <-chan *ContextAddress {
96+
ch := make(chan *ContextAddress, 1)
97+
select {
98+
case <-ctx.Done():
99+
ch <- &ContextAddress{ctx, nil}
100+
return ch
101+
case <-w.readyCh:
102+
}
103+
104+
w.lock.Lock()
105+
defer w.lock.Unlock()
106+
w.subs.Subscribe(ctx, ch)
107+
ch <- w.gotAddrs.Load()
108+
109+
return ch
110+
}
111+
112+
func (w *WatchHosts) handleStream(ctx context.Context, stream schedulerv1pb.Scheduler_WatchHostsClient) error {
113+
// If no stream was made the server doesn't support watching hosts
114+
// (pre-1.15), so we use static. Remove in 1.16.
115+
if stream == nil {
116+
w.lock.Lock()
117+
got := &ContextAddress{ctx, w.allAddrs}
118+
w.gotAddrs.Store(got)
119+
w.subs.Broadcast(got)
120+
w.lock.Unlock()
121+
close(w.readyCh)
122+
<-ctx.Done()
123+
return nil
124+
}
125+
126+
for {
127+
gotAddrs, err := w.watchNextAddresses(ctx, stream)
128+
if err != nil {
129+
return err
130+
}
131+
132+
w.lock.Lock()
133+
if w.cancel != nil {
134+
w.cancel()
135+
}
136+
137+
actx, cancel := context.WithCancel(ctx)
138+
w.cancel = cancel
139+
got := &ContextAddress{actx, gotAddrs}
140+
w.gotAddrs.Store(got)
141+
w.subs.Broadcast(got)
142+
w.lock.Unlock()
143+
144+
select {
145+
case <-w.readyCh:
146+
default:
147+
close(w.readyCh)
148+
}
149+
}
150+
}
151+
152+
func (w *WatchHosts) watchNextAddresses(ctx context.Context, stream schedulerv1pb.Scheduler_WatchHostsClient) ([]string, error) {
153+
resp, err := stream.Recv()
154+
if err != nil {
155+
if status.Code(err) == codes.Unimplemented {
156+
// Ignore unimplemented error code as we are talking to an old server.
157+
// TODO: @joshvanl: remove special case in v1.16.
158+
return slices.Clone(w.allAddrs), nil
159+
}
160+
return nil, err
161+
}
162+
163+
gotAddrs := make([]string, 0, len(resp.GetHosts()))
164+
for _, host := range resp.GetHosts() {
165+
gotAddrs = append(gotAddrs, host.GetAddress())
166+
}
167+
168+
log.Infof("Received updated scheduler hosts addresses: %v", gotAddrs)
169+
170+
return gotAddrs, nil
171+
}
172+
173+
func (w *WatchHosts) connSchedulerHosts(ctx context.Context) (schedulerv1pb.Scheduler_WatchHostsClient, context.CancelFunc, error) {
174+
//nolint:gosec
175+
i := rand.Intn(len(w.allAddrs))
176+
log.Debugf("Attempting to connect to scheduler to WatchHosts: %s", w.allAddrs[i])
177+
178+
// This is connecting to a DNS A rec which will return healthy scheduler
179+
// hosts.
180+
cl, closeCon, err := client.New(ctx, w.allAddrs[i], w.security)
181+
if err != nil {
182+
return nil, nil, fmt.Errorf("scheduler client not initialized for address %s: %s", w.allAddrs[i], err)
183+
}
184+
185+
stream, err := cl.WatchHosts(ctx, new(schedulerv1pb.WatchHostsRequest))
186+
if err != nil {
187+
if status.Code(err) == codes.Unimplemented {
188+
// Ignore unimplemented error code as we are talking to an old server.
189+
// TODO: @joshvanl: remove special case in v1.16.
190+
return nil, nil, nil
191+
}
192+
193+
return nil, nil, fmt.Errorf("failed to watch scheduler hosts: %s", err)
194+
}
195+
196+
return stream, closeCon, nil
197+
}

0 commit comments

Comments
 (0)