Skip to content

Commit 4c3c5f5

Browse files
committed
pkg/loop/internal/core/services/capability: use NewClientConn in place of Dial for reconnect
1 parent ad88ac3 commit 4c3c5f5

File tree

2 files changed

+28
-32
lines changed

2 files changed

+28
-32
lines changed

pkg/loop/internal/core/services/capability/capabilities.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ type TriggerCapabilityClient struct {
2222
*baseCapabilityClient
2323
}
2424

25-
func NewTriggerCapabilityClient(brokerExt *net.BrokerExt, conn *grpc.ClientConn) capabilities.TriggerCapability {
25+
func NewTriggerCapabilityClient(brokerExt *net.BrokerExt, conn grpc.ClientConnInterface) capabilities.TriggerCapability {
2626
return &TriggerCapabilityClient{
2727
triggerExecutableClient: newTriggerExecutableClient(brokerExt, conn),
2828
baseCapabilityClient: newBaseCapabilityClient(brokerExt, conn),
@@ -39,7 +39,7 @@ type ExecutableCapability interface {
3939
capabilities.BaseCapability
4040
}
4141

42-
func NewExecutableCapabilityClient(brokerExt *net.BrokerExt, conn *grpc.ClientConn) ExecutableCapability {
42+
func NewExecutableCapabilityClient(brokerExt *net.BrokerExt, conn grpc.ClientConnInterface) ExecutableCapability {
4343
return &ExecutableCapabilityClient{
4444
executableClient: newExecutableClient(brokerExt, conn),
4545
baseCapabilityClient: newBaseCapabilityClient(brokerExt, conn),
@@ -52,7 +52,7 @@ type CombinedCapabilityClient struct {
5252
*triggerExecutableClient
5353
}
5454

55-
func NewCombinedCapabilityClient(brokerExt *net.BrokerExt, conn *grpc.ClientConn) ExecutableCapability {
55+
func NewCombinedCapabilityClient(brokerExt *net.BrokerExt, conn grpc.ClientConnInterface) ExecutableCapability {
5656
return &CombinedCapabilityClient{
5757
executableClient: newExecutableClient(brokerExt, conn),
5858
baseCapabilityClient: newBaseCapabilityClient(brokerExt, conn),
@@ -141,7 +141,7 @@ type baseCapabilityClient struct {
141141

142142
var _ capabilities.BaseCapability = (*baseCapabilityClient)(nil)
143143

144-
func newBaseCapabilityClient(brokerExt *net.BrokerExt, conn *grpc.ClientConn) *baseCapabilityClient {
144+
func newBaseCapabilityClient(brokerExt *net.BrokerExt, conn grpc.ClientConnInterface) *baseCapabilityClient {
145145
return &baseCapabilityClient{grpc: capabilitiespb.NewBaseCapabilityClient(conn), BrokerExt: brokerExt}
146146
}
147147

@@ -327,6 +327,7 @@ func (t *triggerExecutableClient) registerTrigger(ctx context.Context, req capab
327327
if err != nil {
328328
return nil, cancel, fmt.Errorf("failed to start forwarding messages from stream: %w", err)
329329
}
330+
//TODO remember these to re-run them on init
330331

331332
return ch, cancel, nil
332333
}
@@ -348,7 +349,7 @@ func (t *triggerExecutableClient) UnregisterTrigger(ctx context.Context, req cap
348349
return nil
349350
}
350351

351-
func newTriggerExecutableClient(brokerExt *net.BrokerExt, conn *grpc.ClientConn) *triggerExecutableClient {
352+
func newTriggerExecutableClient(brokerExt *net.BrokerExt, conn grpc.ClientConnInterface) *triggerExecutableClient {
352353
return &triggerExecutableClient{
353354
grpc: capabilitiespb.NewTriggerExecutableClient(conn),
354355
BrokerExt: brokerExt,
@@ -439,7 +440,7 @@ type executableClient struct {
439440
*net.BrokerExt
440441
}
441442

442-
func newExecutableClient(brokerExt *net.BrokerExt, conn *grpc.ClientConn) *executableClient {
443+
func newExecutableClient(brokerExt *net.BrokerExt, conn grpc.ClientConnInterface) *executableClient {
443444
return &executableClient{
444445
grpc: capabilitiespb.NewExecutableClient(conn),
445446
BrokerExt: brokerExt,

pkg/loop/internal/core/services/capability/capabilities_registry.go

Lines changed: 21 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -208,15 +208,13 @@ func (cr *capabilitiesRegistryClient) Get(ctx context.Context, ID string) (capab
208208
Id: ID,
209209
}
210210

211-
res, err := cr.grpc.Get(ctx, req)
212-
if err != nil {
213-
return nil, err
214-
}
215-
216-
conn, err := cr.Dial(res.CapabilityID)
217-
if err != nil {
218-
return nil, net.ErrConnDial{Name: "Capability", ID: res.CapabilityID, Err: err}
219-
}
211+
conn := cr.NewClientConn("Capability", func(ctx context.Context) (id uint32, deps net.Resources, err error) {
212+
res, err := cr.grpc.Get(ctx, req)
213+
if err != nil {
214+
return 0, nil, err
215+
}
216+
return res.CapabilityID, nil, nil
217+
})
220218
client := newBaseCapabilityClient(cr.BrokerExt, conn)
221219
return client, nil
222220
}
@@ -226,15 +224,13 @@ func (cr *capabilitiesRegistryClient) GetTrigger(ctx context.Context, ID string)
226224
Id: ID,
227225
}
228226

229-
res, err := cr.grpc.GetTrigger(ctx, req)
230-
if err != nil {
231-
return nil, err
232-
}
233-
234-
conn, err := cr.Dial(res.CapabilityID)
235-
if err != nil {
236-
return nil, net.ErrConnDial{Name: "GetTrigger", ID: res.CapabilityID, Err: err}
237-
}
227+
conn := cr.NewClientConn("Trigger", func(ctx context.Context) (id uint32, deps net.Resources, err error) {
228+
res, err := cr.grpc.GetTrigger(ctx, req)
229+
if err != nil {
230+
return 0, nil, err
231+
}
232+
return res.CapabilityID, nil, nil
233+
})
238234
client := NewTriggerCapabilityClient(cr.BrokerExt, conn)
239235
return client, nil
240236
}
@@ -244,14 +240,13 @@ func (cr *capabilitiesRegistryClient) GetExecutable(ctx context.Context, ID stri
244240
Id: ID,
245241
}
246242

247-
res, err := cr.grpc.GetExecutable(ctx, req)
248-
if err != nil {
249-
return nil, err
250-
}
251-
conn, err := cr.Dial(res.CapabilityID)
252-
if err != nil {
253-
return nil, net.ErrConnDial{Name: "GetExecutable", ID: res.CapabilityID, Err: err}
254-
}
243+
conn := cr.NewClientConn("Executable", func(ctx context.Context) (id uint32, deps net.Resources, err error) {
244+
res, err := cr.grpc.GetExecutable(ctx, req)
245+
if err != nil {
246+
return 0, nil, err
247+
}
248+
return res.CapabilityID, nil, nil
249+
})
255250
client := NewExecutableCapabilityClient(cr.BrokerExt, conn)
256251
return client, nil
257252
}

0 commit comments

Comments
 (0)