Skip to content

Commit 95c48cb

Browse files
committed
pkg/capabilities: support replacing registered capabilities after shutdown
1 parent d611017 commit 95c48cb

File tree

5 files changed

+245
-29
lines changed

5 files changed

+245
-29
lines changed

pkg/capabilities/capabilities.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"time"
1010

1111
p2ptypes "github.com/smartcontractkit/libocr/ragep2p/types"
12+
"google.golang.org/grpc/connectivity"
1213
"google.golang.org/protobuf/proto"
1314
"google.golang.org/protobuf/types/known/anypb"
1415

@@ -209,6 +210,7 @@ type Validatable interface {
209210
// or extension in the future.
210211
type BaseCapability interface {
211212
Info(ctx context.Context) (CapabilityInfo, error)
213+
GetState() connectivity.State
212214
}
213215

214216
type TriggerRegistrationRequest struct {
@@ -397,6 +399,11 @@ type CapabilityInfo struct {
397399
SpendTypes []CapabilitySpendType
398400
}
399401

402+
// GetState is included to implement BaseCapability.
403+
func (c CapabilityInfo) GetState() connectivity.State {
404+
return connectivity.Idle
405+
}
406+
400407
// Parse out the version from the ID.
401408
func (c CapabilityInfo) Version() string {
402409
return c.ID[strings.Index(c.ID, "@")+1:]

pkg/capabilities/registry/base.go

Lines changed: 220 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,10 @@ import (
66
"fmt"
77
"strings"
88
"sync"
9+
"sync/atomic"
910

1011
"github.com/Masterminds/semver/v3"
12+
"google.golang.org/grpc/connectivity"
1113

1214
"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
1315
"github.com/smartcontractkit/chainlink-common/pkg/logger"
@@ -18,8 +20,13 @@ var (
1820
ErrCapabilityAlreadyExists = errors.New("capability already exists")
1921
)
2022

23+
type atomicBaseCapability interface {
24+
capabilities.BaseCapability
25+
Update(capabilities.BaseCapability) error
26+
}
27+
2128
type baseRegistry struct {
22-
m map[string]capabilities.BaseCapability
29+
m map[string]atomicBaseCapability
2330
lggr logger.Logger
2431
mu sync.RWMutex
2532
}
@@ -28,7 +35,7 @@ var _ core.CapabilitiesRegistryBase = (*baseRegistry)(nil)
2835

2936
func NewBaseRegistry(lggr logger.Logger) core.CapabilitiesRegistryBase {
3037
return &baseRegistry{
31-
m: map[string]capabilities.BaseCapability{},
38+
m: map[string]atomicBaseCapability{},
3239
lggr: logger.Named(lggr, "registries.basic"),
3340
}
3441
}
@@ -142,46 +149,231 @@ func (r *baseRegistry) Add(ctx context.Context, c capabilities.BaseCapability) e
142149
return err
143150
}
144151

145-
switch info.CapabilityType {
146-
case capabilities.CapabilityTypeTrigger:
147-
_, ok := c.(capabilities.TriggerCapability)
148-
if !ok {
149-
return errors.New("trigger capability does not satisfy TriggerCapability interface")
152+
id := info.ID
153+
bc, ok := r.m[id]
154+
if ok {
155+
if bc.GetState() != connectivity.Shutdown {
156+
return fmt.Errorf("%w: id %s found in registry", ErrCapabilityAlreadyExists, id)
150157
}
151-
case capabilities.CapabilityTypeAction, capabilities.CapabilityTypeConsensus, capabilities.CapabilityTypeTarget:
152-
_, ok := c.(capabilities.ExecutableCapability)
153-
if !ok {
154-
return errors.New("action does not satisfy ExecutableCapability interface")
158+
if err := bc.Update(c); err != nil {
159+
return fmt.Errorf("failed to update capability %s: %w", id, err)
155160
}
156-
case capabilities.CapabilityTypeCombined:
157-
_, ok := c.(capabilities.ExecutableAndTriggerCapability)
158-
if !ok {
159-
return errors.New("target capability does not satisfy ExecutableAndTriggerCapability interface")
161+
} else {
162+
var ac atomicBaseCapability
163+
switch info.CapabilityType {
164+
case capabilities.CapabilityTypeTrigger:
165+
ac = &atomicTriggerCapability{}
166+
case capabilities.CapabilityTypeAction, capabilities.CapabilityTypeConsensus, capabilities.CapabilityTypeTarget:
167+
ac = &atomicExecuteCapability{}
168+
case capabilities.CapabilityTypeCombined:
169+
ac = &atomicExecuteAndTriggerCapability{}
170+
default:
171+
return fmt.Errorf("unknown capability type: %s", info.CapabilityType)
160172
}
161-
default:
162-
return fmt.Errorf("unknown capability type: %s", info.CapabilityType)
163-
}
164-
165-
id := info.ID
166-
_, ok := r.m[id]
167-
if ok {
168-
return fmt.Errorf("%w: id %s found in registry", ErrCapabilityAlreadyExists, id)
173+
if err := ac.Update(c); err != nil {
174+
return err
175+
}
176+
r.m[id] = ac
169177
}
170-
171-
r.m[id] = c
172178
r.lggr.Infow("capability added", "id", id, "type", info.CapabilityType, "description", info.Description, "version", info.Version())
173179
return nil
174180
}
175181

176182
func (r *baseRegistry) Remove(_ context.Context, id string) error {
177183
r.mu.Lock()
178184
defer r.mu.Unlock()
179-
_, ok := r.m[id]
185+
ac, ok := r.m[id]
180186
if !ok {
181187
return fmt.Errorf("unable to remove, capability not found: %s", id)
182188
}
183-
184-
delete(r.m, id)
189+
if err := ac.Update(nil); err != nil {
190+
return fmt.Errorf("failed to remove capability %s: %w", id, err)
191+
}
185192
r.lggr.Infow("capability removed", "id", id)
186193
return nil
187194
}
195+
196+
var _ capabilities.TriggerCapability = &atomicTriggerCapability{}
197+
198+
type atomicTriggerCapability struct {
199+
atomic.Pointer[capabilities.TriggerCapability]
200+
}
201+
202+
func (a *atomicTriggerCapability) Update(c capabilities.BaseCapability) error {
203+
if c == nil {
204+
a.Store(nil)
205+
return nil
206+
}
207+
tc, ok := c.(capabilities.TriggerCapability)
208+
if !ok {
209+
return errors.New("trigger capability does not satisfy TriggerCapability interface")
210+
}
211+
a.Store(&tc)
212+
return nil
213+
}
214+
215+
func (a *atomicTriggerCapability) Info(ctx context.Context) (capabilities.CapabilityInfo, error) {
216+
c := a.Load()
217+
if c == nil {
218+
return capabilities.CapabilityInfo{}, errors.New("capability unavailable")
219+
}
220+
return (*c).Info(ctx)
221+
}
222+
223+
func (a *atomicTriggerCapability) GetState() connectivity.State {
224+
c := a.Load()
225+
if c == nil {
226+
return connectivity.Shutdown
227+
}
228+
return (*c).GetState()
229+
}
230+
231+
func (a *atomicTriggerCapability) RegisterTrigger(ctx context.Context, request capabilities.TriggerRegistrationRequest) (<-chan capabilities.TriggerResponse, error) {
232+
c := a.Load()
233+
if c == nil {
234+
return nil, errors.New("capability unavailable")
235+
}
236+
return (*c).RegisterTrigger(ctx, request)
237+
}
238+
239+
func (a *atomicTriggerCapability) UnregisterTrigger(ctx context.Context, request capabilities.TriggerRegistrationRequest) error {
240+
c := a.Load()
241+
if c == nil {
242+
return errors.New("capability unavailable")
243+
}
244+
return (*c).UnregisterTrigger(ctx, request)
245+
}
246+
247+
var _ capabilities.ExecutableCapability = &atomicExecuteCapability{}
248+
249+
type atomicExecuteCapability struct {
250+
atomic.Pointer[capabilities.ExecutableCapability]
251+
}
252+
253+
func (a *atomicExecuteCapability) Update(c capabilities.BaseCapability) error {
254+
if c == nil {
255+
a.Store(nil)
256+
return nil
257+
}
258+
tc, ok := c.(capabilities.ExecutableCapability)
259+
if !ok {
260+
return errors.New("action does not satisfy ExecutableCapability interface")
261+
}
262+
a.Store(&tc)
263+
return nil
264+
}
265+
266+
func (a *atomicExecuteCapability) Info(ctx context.Context) (capabilities.CapabilityInfo, error) {
267+
c := a.Load()
268+
if c == nil {
269+
return capabilities.CapabilityInfo{}, errors.New("capability unavailable")
270+
}
271+
return (*c).Info(ctx)
272+
}
273+
274+
func (a *atomicExecuteCapability) GetState() connectivity.State {
275+
c := a.Load()
276+
if c == nil {
277+
return connectivity.Shutdown
278+
}
279+
return (*c).GetState()
280+
}
281+
282+
func (a *atomicExecuteCapability) RegisterToWorkflow(ctx context.Context, request capabilities.RegisterToWorkflowRequest) error {
283+
c := a.Load()
284+
if c == nil {
285+
return errors.New("capability unavailable")
286+
}
287+
return (*c).RegisterToWorkflow(ctx, request)
288+
}
289+
290+
func (a *atomicExecuteCapability) UnregisterFromWorkflow(ctx context.Context, request capabilities.UnregisterFromWorkflowRequest) error {
291+
c := a.Load()
292+
if c == nil {
293+
return errors.New("capability unavailable")
294+
}
295+
return (*c).UnregisterFromWorkflow(ctx, request)
296+
}
297+
298+
func (a *atomicExecuteCapability) Execute(ctx context.Context, request capabilities.CapabilityRequest) (capabilities.CapabilityResponse, error) {
299+
c := a.Load()
300+
if c == nil {
301+
return capabilities.CapabilityResponse{}, errors.New("capability unavailable")
302+
}
303+
return (*c).Execute(ctx, request)
304+
}
305+
306+
var _ capabilities.ExecutableAndTriggerCapability = &atomicExecuteAndTriggerCapability{}
307+
308+
type atomicExecuteAndTriggerCapability struct {
309+
atomic.Pointer[capabilities.ExecutableAndTriggerCapability]
310+
}
311+
312+
func (a *atomicExecuteAndTriggerCapability) Update(c capabilities.BaseCapability) error {
313+
if c == nil {
314+
a.Store(nil)
315+
return nil
316+
}
317+
tc, ok := c.(capabilities.ExecutableAndTriggerCapability)
318+
if !ok {
319+
return errors.New("target capability does not satisfy ExecutableAndTriggerCapability interface")
320+
}
321+
a.Store(&tc)
322+
return nil
323+
}
324+
325+
func (a *atomicExecuteAndTriggerCapability) Info(ctx context.Context) (capabilities.CapabilityInfo, error) {
326+
c := a.Load()
327+
if c == nil {
328+
return capabilities.CapabilityInfo{}, errors.New("capability unavailable")
329+
}
330+
return (*c).Info(ctx)
331+
}
332+
333+
func (a *atomicExecuteAndTriggerCapability) GetState() connectivity.State {
334+
c := a.Load()
335+
if c == nil {
336+
return connectivity.Shutdown
337+
}
338+
return (*c).GetState()
339+
}
340+
341+
func (a *atomicExecuteAndTriggerCapability) RegisterTrigger(ctx context.Context, request capabilities.TriggerRegistrationRequest) (<-chan capabilities.TriggerResponse, error) {
342+
c := a.Load()
343+
if c == nil {
344+
return nil, errors.New("capability unavailable")
345+
}
346+
return (*c).RegisterTrigger(ctx, request)
347+
}
348+
349+
func (a *atomicExecuteAndTriggerCapability) UnregisterTrigger(ctx context.Context, request capabilities.TriggerRegistrationRequest) error {
350+
c := a.Load()
351+
if c == nil {
352+
return errors.New("capability unavailable")
353+
}
354+
return (*c).UnregisterTrigger(ctx, request)
355+
}
356+
357+
func (a *atomicExecuteAndTriggerCapability) RegisterToWorkflow(ctx context.Context, request capabilities.RegisterToWorkflowRequest) error {
358+
c := a.Load()
359+
if c == nil {
360+
return errors.New("capability unavailable")
361+
}
362+
return (*c).RegisterToWorkflow(ctx, request)
363+
}
364+
365+
func (a *atomicExecuteAndTriggerCapability) UnregisterFromWorkflow(ctx context.Context, request capabilities.UnregisterFromWorkflowRequest) error {
366+
c := a.Load()
367+
if c == nil {
368+
return errors.New("capability unavailable")
369+
}
370+
return (*c).UnregisterFromWorkflow(ctx, request)
371+
}
372+
373+
func (a *atomicExecuteAndTriggerCapability) Execute(ctx context.Context, request capabilities.CapabilityRequest) (capabilities.CapabilityResponse, error) {
374+
c := a.Load()
375+
if c == nil {
376+
return capabilities.CapabilityResponse{}, errors.New("capability unavailable")
377+
}
378+
return (*c).Execute(ctx, request)
379+
}

pkg/loop/internal/core/services/capability/capabilities.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"sync"
99

1010
"google.golang.org/grpc"
11+
"google.golang.org/grpc/connectivity"
1112
"google.golang.org/protobuf/types/known/emptypb"
1213

1314
"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
@@ -135,14 +136,19 @@ func InfoToReply(info capabilities.CapabilityInfo) *capabilitiespb.CapabilityInf
135136
}
136137

137138
type baseCapabilityClient struct {
139+
c *grpc.ClientConn
138140
grpc capabilitiespb.BaseCapabilityClient
139141
*net.BrokerExt
140142
}
141143

142144
var _ capabilities.BaseCapability = (*baseCapabilityClient)(nil)
143145

144146
func newBaseCapabilityClient(brokerExt *net.BrokerExt, conn *grpc.ClientConn) *baseCapabilityClient {
145-
return &baseCapabilityClient{grpc: capabilitiespb.NewBaseCapabilityClient(conn), BrokerExt: brokerExt}
147+
return &baseCapabilityClient{c: conn, grpc: capabilitiespb.NewBaseCapabilityClient(conn), BrokerExt: brokerExt}
148+
}
149+
150+
func (c *baseCapabilityClient) GetState() connectivity.State {
151+
return c.c.GetState()
146152
}
147153

148154
func (c *baseCapabilityClient) Info(ctx context.Context) (capabilities.CapabilityInfo, error) {

pkg/loop/internal/core/services/capability/capabilities_registry_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"github.com/stretchr/testify/mock"
1212
"github.com/stretchr/testify/require"
1313
"google.golang.org/grpc"
14+
"google.golang.org/grpc/connectivity"
1415

1516
p2ptypes "github.com/smartcontractkit/libocr/ragep2p/types"
1617

@@ -29,6 +30,10 @@ type mockBaseCapability struct {
2930
info capabilities.CapabilityInfo
3031
}
3132

33+
func (f *mockBaseCapability) GetState() connectivity.State {
34+
return connectivity.Idle
35+
}
36+
3237
func (f *mockBaseCapability) Info(ctx context.Context) (capabilities.CapabilityInfo, error) {
3338
return f.info, nil
3439
}

pkg/loop/internal/test/test.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package test
33
import (
44
"context"
55

6+
"google.golang.org/grpc/connectivity"
7+
68
"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
79
"github.com/smartcontractkit/chainlink-common/pkg/services"
810

@@ -33,6 +35,10 @@ var _ capabilities.BaseCapability = (*baseCapability)(nil)
3335
type baseCapability struct {
3436
}
3537

38+
func (e baseCapability) GetState() connectivity.State {
39+
return connectivity.Idle
40+
}
41+
3642
func (e baseCapability) Info(ctx context.Context) (capabilities.CapabilityInfo, error) {
3743
return CapabilityInfo, nil
3844
}

0 commit comments

Comments
 (0)