Skip to content

Commit 750b9e9

Browse files
committed
Merge branch 'maintenance/0.9.4' into feature/PLEX-1569-config-ocr3-chain-capability
2 parents d84b8ca + 8817bb8 commit 750b9e9

File tree

5 files changed

+269
-5
lines changed

5 files changed

+269
-5
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ module github.com/smartcontractkit/chainlink-common
33
go 1.24.5
44

55
require (
6+
github.com/Masterminds/semver/v3 v3.4.0
67
github.com/XSAM/otelsql v0.37.0
78
github.com/andybalholm/brotli v1.1.1
89
github.com/atombender/go-jsonschema v0.16.1-0.20240916205339-a74cd4e2851c
@@ -76,7 +77,6 @@ require (
7677
)
7778

7879
require (
79-
github.com/Masterminds/semver/v3 v3.4.0 // indirect
8080
github.com/apache/arrow-go/v18 v18.3.1 // indirect
8181
github.com/aybabtme/rgbterm v0.0.0-20170906152045-cc83f3b3ce59 // indirect
8282
github.com/bahlo/generic-list-go v0.2.0 // indirect

pkg/capabilities/registry/base.go

Lines changed: 53 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,11 @@ import (
44
"context"
55
"errors"
66
"fmt"
7+
"strings"
78
"sync"
89

10+
"github.com/Masterminds/semver/v3"
11+
912
"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
1013
"github.com/smartcontractkit/chainlink-common/pkg/logger"
1114
"github.com/smartcontractkit/chainlink-common/pkg/types/core"
@@ -36,11 +39,58 @@ func (r *baseRegistry) Get(_ context.Context, id string) (capabilities.BaseCapab
3639
r.mu.RLock()
3740
defer r.mu.RUnlock()
3841
c, ok := r.m[id]
39-
if !ok {
40-
return nil, fmt.Errorf("capability not found with id %s", id)
42+
if ok {
43+
return c, nil
44+
}
45+
46+
// Find compatible version (>= requested version with same major)
47+
parts := strings.Split(id, "@")
48+
if len(parts) != 2 {
49+
return nil, fmt.Errorf("invalid capability id format: %s", id)
4150
}
51+
name, verStr := parts[0], parts[1]
4252

43-
return c, nil
53+
reqVer, err := semver.NewVersion(verStr)
54+
if err != nil {
55+
return nil, fmt.Errorf("invalid version in capability id %q: %w", id, err)
56+
}
57+
reqIsPrerelease := reqVer.Prerelease() != ""
58+
59+
var bestCap capabilities.BaseCapability
60+
var bestVer *semver.Version
61+
for key, cap := range r.m {
62+
p := strings.Split(key, "@")
63+
if len(p) != 2 {
64+
continue
65+
}
66+
if p[0] != name {
67+
continue
68+
}
69+
v, err := semver.NewVersion(p[1])
70+
if err != nil {
71+
continue
72+
}
73+
if v.Major() != reqVer.Major() {
74+
continue
75+
}
76+
// If the request is stable, skip pre-release candidates
77+
if !reqIsPrerelease && v.Prerelease() != "" {
78+
continue
79+
}
80+
81+
if v.GreaterThan(reqVer) {
82+
if bestVer == nil || v.LessThan(bestVer) {
83+
bestCap = cap
84+
bestVer = v
85+
}
86+
}
87+
}
88+
89+
if bestCap != nil {
90+
r.lggr.Debugw("found compatible capability", "id", name+"@"+bestVer.String())
91+
return bestCap, nil
92+
}
93+
return nil, fmt.Errorf("no compatible capability found for id %s", id)
4494
}
4595

4696
// GetTrigger gets a capability from the registry and tries to coerce it to the TriggerCapability interface.

pkg/capabilities/registry/base_test.go

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,95 @@ func TestRegistry(t *testing.T) {
5959
assert.Equal(t, c, cs[0])
6060
}
6161

62+
func TestRegistryCompatibleVersions(t *testing.T) {
63+
ctx := t.Context()
64+
65+
t.Run("Compatible minor version", func(t *testing.T) {
66+
r := registry.NewBaseRegistry(logger.Test(t))
67+
68+
ci, err := capabilities.NewCapabilityInfo(
69+
id,
70+
capabilities.CapabilityTypeAction,
71+
"capability-1-description",
72+
)
73+
require.NoError(t, err)
74+
75+
c := &mockCapability{CapabilityInfo: ci}
76+
err = r.Add(ctx, c)
77+
require.NoError(t, err)
78+
_, err = r.Get(ctx, "[email protected]")
79+
require.NoError(t, err)
80+
})
81+
82+
t.Run("Incompatible minor version", func(t *testing.T) {
83+
r := registry.NewBaseRegistry(logger.Test(t))
84+
85+
ci, err := capabilities.NewCapabilityInfo(
86+
id,
87+
capabilities.CapabilityTypeAction,
88+
"capability-1-description",
89+
)
90+
require.NoError(t, err)
91+
92+
c := &mockCapability{CapabilityInfo: ci}
93+
err = r.Add(ctx, c)
94+
require.NoError(t, err)
95+
_, err = r.Get(ctx, "[email protected]")
96+
require.Error(t, err)
97+
})
98+
99+
t.Run("Incompatible major version", func(t *testing.T) {
100+
r := registry.NewBaseRegistry(logger.Test(t))
101+
102+
ci, err := capabilities.NewCapabilityInfo(
103+
id,
104+
capabilities.CapabilityTypeAction,
105+
"capability-1-description",
106+
)
107+
require.NoError(t, err)
108+
109+
c := &mockCapability{CapabilityInfo: ci}
110+
err = r.Add(ctx, c)
111+
require.NoError(t, err)
112+
_, err = r.Get(ctx, "[email protected]")
113+
require.Error(t, err)
114+
})
115+
116+
t.Run("Don't match pre-release tags if requested version if not pre-release", func(t *testing.T) {
117+
r := registry.NewBaseRegistry(logger.Test(t))
118+
119+
ci, err := capabilities.NewCapabilityInfo(
120+
id,
121+
capabilities.CapabilityTypeAction,
122+
"capability-1-description",
123+
)
124+
require.NoError(t, err)
125+
126+
c := &mockCapability{CapabilityInfo: ci}
127+
err = r.Add(ctx, c)
128+
require.NoError(t, err)
129+
_, err = r.Get(ctx, "[email protected]")
130+
require.Error(t, err)
131+
})
132+
133+
t.Run("Match pre-release tags if requested version is pre-release", func(t *testing.T) {
134+
r := registry.NewBaseRegistry(logger.Test(t))
135+
136+
ci, err := capabilities.NewCapabilityInfo(
137+
id,
138+
capabilities.CapabilityTypeAction,
139+
"capability-1-description",
140+
)
141+
require.NoError(t, err)
142+
143+
c := &mockCapability{CapabilityInfo: ci}
144+
err = r.Add(ctx, c)
145+
require.NoError(t, err)
146+
_, err = r.Get(ctx, "[email protected]")
147+
require.NoError(t, err)
148+
})
149+
}
150+
62151
func TestRegistry_NoDuplicateIDs(t *testing.T) {
63152
r := registry.NewBaseRegistry(logger.Test(t))
64153
ctx := t.Context()

pkg/loop/ccip_provider_test.go

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
package loop_test
2+
3+
import (
4+
"os/exec"
5+
"testing"
6+
"time"
7+
8+
"github.com/google/uuid"
9+
"github.com/stretchr/testify/assert"
10+
"github.com/stretchr/testify/require"
11+
"go.uber.org/zap/zapcore"
12+
"go.uber.org/zap/zaptest/observer"
13+
14+
"github.com/smartcontractkit/chainlink-common/pkg/logger"
15+
"github.com/smartcontractkit/chainlink-common/pkg/loop"
16+
keystoretest "github.com/smartcontractkit/chainlink-common/pkg/loop/internal/core/services/keystore/test"
17+
"github.com/smartcontractkit/chainlink-common/pkg/loop/internal/goplugin"
18+
ccipocr3client "github.com/smartcontractkit/chainlink-common/pkg/loop/internal/relayer/pluginprovider/ext/ccipocr3"
19+
"github.com/smartcontractkit/chainlink-common/pkg/loop/internal/test"
20+
"github.com/smartcontractkit/chainlink-common/pkg/services/servicetest"
21+
"github.com/smartcontractkit/chainlink-common/pkg/types"
22+
"github.com/smartcontractkit/chainlink-common/pkg/types/ccipocr3"
23+
)
24+
25+
// TestCCIPSyncPersistence tests the persistence of sync requests across relayer restarts. This test is testing
26+
// logic from chainlink-common/pkg/loop/internal/relayer/pluginprovider/ext/ccipocr3/chainaccessor.go but we need
27+
// the full CCIPProvider and Relayer to properly test the persistence across restarts.
28+
func TestCCIPChainAccessorSyncPersistence(t *testing.T) {
29+
t.Parallel()
30+
31+
ctx := t.Context()
32+
33+
// Observed logger for confirming PIDs
34+
lggr, logs := logger.TestObserved(t, zapcore.DebugLevel)
35+
36+
// Relayer service (client side)
37+
relayerService := loop.NewRelayerService(
38+
lggr,
39+
loop.GRPCOpts{},
40+
func() *exec.Cmd {
41+
return NewHelperProcessCommand(loop.PluginRelayerName, false, 0)
42+
},
43+
test.ConfigTOML,
44+
keystoretest.Keystore,
45+
keystoretest.Keystore,
46+
nil,
47+
)
48+
49+
// Kill hook is defined on the relayer client (service) because the client spawns the server child process
50+
hook := relayerService.XXXTestHook()
51+
servicetest.Run(t, relayerService)
52+
53+
// Create CCIPProvider client and issue first Sync() call. This client should persist and reattach
54+
// to the new server after the kill hook is run.
55+
ccipProvider, err := relayerService.NewCCIPProvider(ctx, types.CCIPProviderArgs{
56+
ExternalJobID: uuid.New(),
57+
ContractReaderConfig: []byte("asdf"),
58+
ChainWriterConfig: []byte("asdf"),
59+
OffRampAddress: "0x1234123412341234123412341234123412341234",
60+
PluginType: 0,
61+
})
62+
require.NoError(t, err)
63+
require.NotNil(t, ccipProvider)
64+
65+
firstContractNameToSync := "OnRamp"
66+
firstContractAddressToSync := ccipocr3.UnknownAddress("0x123412341234")
67+
68+
// Perform first Sync() call
69+
err = ccipProvider.ChainAccessor().Sync(ctx, firstContractNameToSync, firstContractAddressToSync)
70+
require.NoError(t, err)
71+
72+
// Confirm first sync call was stored in the c.syncs map
73+
ccipProviderClient, ok := ccipProvider.(*ccipocr3client.CCIPProviderClient)
74+
require.True(t, ok)
75+
firstSyncs := ccipProviderClient.GetSyncRequests()
76+
require.Len(t, firstSyncs, 1, "Should have one sync request in ChainAccessorClient c.syncs")
77+
78+
// Capture initial server side process ID before kill
79+
initialPID := extractLatestPluginPID(logs)
80+
require.NotZero(t, initialPID)
81+
82+
// Kill the server process (RelayerService should auto-restart it)
83+
hook.Kill()
84+
85+
// Give some time for the keep alive to kick in
86+
time.Sleep(2 * goplugin.KeepAliveTickDuration)
87+
88+
// Capture process ID again after restart and verify it's different
89+
restartedPID := extractLatestPluginPID(logs)
90+
require.NotZero(t, restartedPID)
91+
assert.NotEqual(t, initialPID, restartedPID, "Server should have restarted with different process ID")
92+
93+
// Verify new Sync() call still works and now the client map should have two
94+
secondContractNameToSync := "OffRamp"
95+
newContractAddress := ccipocr3.UnknownAddress("0x567856785678")
96+
err = ccipProvider.ChainAccessor().Sync(ctx, secondContractNameToSync, newContractAddress)
97+
require.NoError(t, err)
98+
finalSyncs := ccipProviderClient.GetSyncRequests()
99+
require.Len(t, finalSyncs, 2, "Should have both first and second sync requests in client memory")
100+
101+
// Verify first sync entry persisted through restart
102+
assert.Contains(t, finalSyncs, firstContractNameToSync)
103+
assert.Equal(t, []byte(firstContractAddressToSync), finalSyncs[firstContractNameToSync])
104+
105+
// Verify second sync entry was added
106+
assert.Contains(t, finalSyncs, secondContractNameToSync)
107+
assert.Equal(t, []byte(newContractAddress), finalSyncs[secondContractNameToSync])
108+
}
109+
110+
// extractLatestPluginPID extracts the most recent plugin process ID from the logs using the `plugin started` log
111+
func extractLatestPluginPID(logs *observer.ObservedLogs) int {
112+
var latestPID int
113+
for _, entry := range logs.All() {
114+
if entry.Message == "plugin started" {
115+
for _, field := range entry.Context {
116+
if field.Key == "pid" {
117+
latestPID = int(field.Integer)
118+
}
119+
}
120+
}
121+
}
122+
123+
return latestPID
124+
}

pkg/loop/internal/relayer/pluginprovider/ext/ccipocr3/chainaccessor.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ func NewChainAccessorClient(broker *net.BrokerExt, cc grpc.ClientConnInterface)
3232
return &ChainAccessorClient{
3333
BrokerExt: broker,
3434
grpc: ccipocr3pb.NewChainAccessorClient(cc),
35+
syncs: make(map[string]ccipocr3.UnknownAddress),
3536
}
3637
}
3738

@@ -95,7 +96,7 @@ func (c *ChainAccessorClient) Sync(ctx context.Context, contractName string, con
9596
_, err := c.grpc.Sync(ctx, req)
9697

9798
// If grpc call succeeded, store the most recent address for this given contract address.
98-
if err != nil {
99+
if err == nil {
99100
c.mu.Lock()
100101
c.syncs[contractName] = contractAddress
101102
c.mu.Unlock()

0 commit comments

Comments
 (0)