Skip to content

Commit 7b38eb2

Browse files
committed
pkg/loop/internal/core/services/capability: use NewClientConn in place of Dial for reconnect
1 parent 83f7a79 commit 7b38eb2

File tree

3 files changed

+64
-53
lines changed

3 files changed

+64
-53
lines changed

pkg/loop/internal/core/services/capability/capabilities.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ type TriggerCapabilityClient struct {
2222
*baseCapabilityClient
2323
}
2424

25-
func NewTriggerCapabilityClient(brokerExt *net.BrokerExt, conn *grpc.ClientConn) capabilities.TriggerCapability {
25+
func NewTriggerCapabilityClient(brokerExt *net.BrokerExt, conn grpc.ClientConnInterface) capabilities.TriggerCapability {
2626
return &TriggerCapabilityClient{
2727
triggerExecutableClient: newTriggerExecutableClient(brokerExt, conn),
2828
baseCapabilityClient: newBaseCapabilityClient(brokerExt, conn),
@@ -39,7 +39,7 @@ type ExecutableCapability interface {
3939
capabilities.BaseCapability
4040
}
4141

42-
func NewExecutableCapabilityClient(brokerExt *net.BrokerExt, conn *grpc.ClientConn) ExecutableCapability {
42+
func NewExecutableCapabilityClient(brokerExt *net.BrokerExt, conn grpc.ClientConnInterface) ExecutableCapability {
4343
return &ExecutableCapabilityClient{
4444
executableClient: newExecutableClient(brokerExt, conn),
4545
baseCapabilityClient: newBaseCapabilityClient(brokerExt, conn),
@@ -52,7 +52,7 @@ type CombinedCapabilityClient struct {
5252
*triggerExecutableClient
5353
}
5454

55-
func NewCombinedCapabilityClient(brokerExt *net.BrokerExt, conn *grpc.ClientConn) ExecutableCapability {
55+
func NewCombinedCapabilityClient(brokerExt *net.BrokerExt, conn grpc.ClientConnInterface) ExecutableCapability {
5656
return &CombinedCapabilityClient{
5757
executableClient: newExecutableClient(brokerExt, conn),
5858
baseCapabilityClient: newBaseCapabilityClient(brokerExt, conn),
@@ -141,7 +141,7 @@ type baseCapabilityClient struct {
141141

142142
var _ capabilities.BaseCapability = (*baseCapabilityClient)(nil)
143143

144-
func newBaseCapabilityClient(brokerExt *net.BrokerExt, conn *grpc.ClientConn) *baseCapabilityClient {
144+
func newBaseCapabilityClient(brokerExt *net.BrokerExt, conn grpc.ClientConnInterface) *baseCapabilityClient {
145145
return &baseCapabilityClient{grpc: capabilitiespb.NewBaseCapabilityClient(conn), BrokerExt: brokerExt}
146146
}
147147

@@ -348,7 +348,7 @@ func (t *triggerExecutableClient) UnregisterTrigger(ctx context.Context, req cap
348348
return nil
349349
}
350350

351-
func newTriggerExecutableClient(brokerExt *net.BrokerExt, conn *grpc.ClientConn) *triggerExecutableClient {
351+
func newTriggerExecutableClient(brokerExt *net.BrokerExt, conn grpc.ClientConnInterface) *triggerExecutableClient {
352352
return &triggerExecutableClient{
353353
grpc: capabilitiespb.NewTriggerExecutableClient(conn),
354354
BrokerExt: brokerExt,
@@ -439,7 +439,7 @@ type executableClient struct {
439439
*net.BrokerExt
440440
}
441441

442-
func newExecutableClient(brokerExt *net.BrokerExt, conn *grpc.ClientConn) *executableClient {
442+
func newExecutableClient(brokerExt *net.BrokerExt, conn grpc.ClientConnInterface) *executableClient {
443443
return &executableClient{
444444
grpc: capabilitiespb.NewExecutableClient(conn),
445445
BrokerExt: brokerExt,

pkg/loop/internal/core/services/capability/capabilities_registry.go

Lines changed: 34 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package capability
33
import (
44
"context"
55
"fmt"
6+
"time"
67

78
"google.golang.org/grpc"
89
"google.golang.org/protobuf/types/known/durationpb"
@@ -208,52 +209,56 @@ func (cr *capabilitiesRegistryClient) Get(ctx context.Context, ID string) (capab
208209
Id: ID,
209210
}
210211

211-
res, err := cr.grpc.Get(ctx, req)
212-
if err != nil {
213-
return nil, err
214-
}
215-
216-
conn, err := cr.Dial(res.CapabilityID)
217-
if err != nil {
218-
return nil, net.ErrConnDial{Name: "Capability", ID: res.CapabilityID, Err: err}
219-
}
212+
conn := cr.NewClientConn("Capability", func(ctx context.Context) (id uint32, deps net.Resources, err error) {
213+
res, err := cr.grpc.Get(ctx, req)
214+
if err != nil {
215+
return 0, nil, err
216+
}
217+
return res.CapabilityID, nil, nil
218+
})
220219
client := newBaseCapabilityClient(cr.BrokerExt, conn)
221-
return client, nil
220+
ctx, cancel := context.WithTimeout(ctx, time.Second)
221+
defer cancel()
222+
_, err := client.Info(ctx) // ensure exists by triggering lazy connection with reduced timeout
223+
return client, err
222224
}
223225

224226
func (cr *capabilitiesRegistryClient) GetTrigger(ctx context.Context, ID string) (capabilities.TriggerCapability, error) {
225227
req := &pb.GetTriggerRequest{
226228
Id: ID,
227229
}
228230

229-
res, err := cr.grpc.GetTrigger(ctx, req)
230-
if err != nil {
231-
return nil, err
232-
}
233-
234-
conn, err := cr.Dial(res.CapabilityID)
235-
if err != nil {
236-
return nil, net.ErrConnDial{Name: "GetTrigger", ID: res.CapabilityID, Err: err}
237-
}
231+
conn := cr.NewClientConn("Trigger", func(ctx context.Context) (id uint32, deps net.Resources, err error) {
232+
res, err := cr.grpc.GetTrigger(ctx, req)
233+
if err != nil {
234+
return 0, nil, err
235+
}
236+
return res.CapabilityID, nil, nil
237+
})
238238
client := NewTriggerCapabilityClient(cr.BrokerExt, conn)
239-
return client, nil
239+
ctx, cancel := context.WithTimeout(ctx, time.Second)
240+
defer cancel()
241+
_, err := client.Info(ctx) // ensure exists by triggering lazy connection with reduced timeout
242+
return client, err
240243
}
241244

242245
func (cr *capabilitiesRegistryClient) GetExecutable(ctx context.Context, ID string) (capabilities.ExecutableCapability, error) {
243246
req := &pb.GetExecutableRequest{
244247
Id: ID,
245248
}
246249

247-
res, err := cr.grpc.GetExecutable(ctx, req)
248-
if err != nil {
249-
return nil, err
250-
}
251-
conn, err := cr.Dial(res.CapabilityID)
252-
if err != nil {
253-
return nil, net.ErrConnDial{Name: "GetExecutable", ID: res.CapabilityID, Err: err}
254-
}
250+
conn := cr.NewClientConn("Executable", func(ctx context.Context) (id uint32, deps net.Resources, err error) {
251+
res, err := cr.grpc.GetExecutable(ctx, req)
252+
if err != nil {
253+
return 0, nil, err
254+
}
255+
return res.CapabilityID, nil, nil
256+
})
255257
client := NewExecutableCapabilityClient(cr.BrokerExt, conn)
256-
return client, nil
258+
ctx, cancel := context.WithTimeout(ctx, time.Second)
259+
defer cancel()
260+
_, err := client.Info(ctx) // ensure exists by triggering lazy connection with reduced timeout
261+
return client, err
257262
}
258263

259264
func (cr *capabilitiesRegistryClient) List(ctx context.Context) ([]capabilities.BaseCapability, error) {

pkg/loop/internal/net/client.go

Lines changed: 24 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package net
22

33
import (
44
"context"
5+
"fmt"
56
"sync"
67
"sync/atomic"
78
"time"
@@ -53,8 +54,9 @@ func (c *clientConn) Invoke(ctx context.Context, method string, args any, reply
5354
cc := c.cc
5455
c.mu.RUnlock()
5556

57+
var refErr error
5658
if cc == nil {
57-
cc = c.refresh(ctx, nil)
59+
cc, refErr = c.refresh(ctx, nil)
5860
}
5961
for cc != nil {
6062
err := cc.Invoke(ctx, method, args, reply, opts...)
@@ -65,41 +67,42 @@ func (c *clientConn) Invoke(ctx context.Context, method string, args any, reply
6567
return err
6668
}
6769
c.Logger.Errorw("clientConn: Invoke: terminal error, refreshing connection", "method", method, "err", err)
68-
cc = c.refresh(ctx, cc)
70+
cc, refErr = c.refresh(ctx, cc)
6971
continue
7072
}
7173
return err
7274
}
73-
return context.Cause(ctx)
75+
return refErr
7476
}
7577

7678
func (c *clientConn) NewStream(ctx context.Context, desc *grpc.StreamDesc, method string, opts ...grpc.CallOption) (grpc.ClientStream, error) {
7779
c.mu.RLock()
7880
cc := c.cc
7981
c.mu.RUnlock()
8082

83+
var refErr error
8184
if cc == nil {
82-
cc = c.refresh(ctx, nil)
85+
cc, refErr = c.refresh(ctx, nil)
8386
}
8487
for cc != nil {
8588
s, err := cc.NewStream(ctx, desc, method, opts...)
8689
if isErrTerminal(err) {
8790
c.Logger.Errorw("clientConn: NewStream: terminal error, refreshing connection", "err", err)
88-
cc = c.refresh(ctx, cc)
91+
cc, refErr = c.refresh(ctx, cc)
8992
continue
9093
}
9194
return s, err
9295
}
93-
return nil, context.Cause(ctx)
96+
return nil, refErr
9497
}
9598

9699
// refresh replaces c.cc with a new (different from orig) *grpc.ClientConn, and returns it as well.
97100
// It will block until a new connection is successfully dialed. If the context is done first, it returns nil and an error wrapping the context's cause together with the last attempt's error.
98-
func (c *clientConn) refresh(ctx context.Context, orig *grpc.ClientConn) *grpc.ClientConn {
101+
func (c *clientConn) refresh(ctx context.Context, orig *grpc.ClientConn) (*grpc.ClientConn, error) {
99102
c.mu.Lock()
100103
defer c.mu.Unlock()
101104
if c.cc != orig {
102-
return c.cc
105+
return c.cc, nil
103106
}
104107
if c.cc != nil {
105108
if err := c.cc.Close(); err != nil {
@@ -108,13 +111,15 @@ func (c *clientConn) refresh(ctx context.Context, orig *grpc.ClientConn) *grpc.C
108111
c.CloseAll(c.deps...)
109112
}
110113

111-
try := func() bool {
112-
c.Logger.Debug("Client refresh")
114+
try := func() error {
115+
if d, ok := ctx.Deadline(); ok {
116+
c.Logger.Debugw("Client refresh", "deadline", d, "until", time.Until(d))
117+
}
113118
id, deps, err := c.newClient(ctx)
114119
if err != nil {
115120
c.Logger.Errorw("Client refresh attempt failed", "err", err)
116121
c.CloseAll(deps...)
117-
return false
122+
return err
118123
}
119124
c.deps = deps
120125

@@ -126,29 +131,30 @@ func (c *clientConn) refresh(ctx context.Context, orig *grpc.ClientConn) *grpc.C
126131
lggr.Errorw("Client dial failed", "err", ErrConnDial{Name: c.name, ID: id, Err: err})
127132
}
128133
c.CloseAll(c.deps...)
129-
return false
134+
return err
130135
}
131-
return true
136+
return nil
132137
}
133138

134139
b := backoff.Backoff{
135140
Min: 100 * time.Millisecond,
136141
Max: 5 * time.Second,
137142
Factor: 2,
138143
}
139-
for !try() {
144+
for err := try(); err != nil; err = try() {
140145
if ctx.Err() != nil {
141-
c.Logger.Errorw("Client refresh failed: aborting refresh due to context error", "err", ctx.Err())
142-
return nil
146+
err = fmt.Errorf("%w: last error: %w", context.Cause(ctx), err)
147+
c.Logger.Errorw("Client refresh failed: aborting refresh", "err", err)
148+
return nil, err
143149
}
144150
wait := b.Duration()
145151
c.Logger.Infow("Waiting to refresh", "wait", wait)
146152
select {
147153
case <-ctx.Done():
148-
return nil
154+
return nil, fmt.Errorf("%w: last error: %w", context.Cause(ctx), err)
149155
case <-time.After(wait):
150156
}
151157
}
152158

153-
return c.cc
159+
return c.cc, nil
154160
}

0 commit comments

Comments
 (0)