Skip to content

Commit 44c083b

Browse files
committed
Add refresh callback to broker and client and refresh synced accessor contracts
1 parent 4669923 commit 44c083b

File tree

3 files changed

+99
-3
lines changed

3 files changed

+99
-3
lines changed

pkg/loop/internal/net/broker.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,12 +65,19 @@ type BrokerConfig struct {
6565
type BrokerExt struct {
6666
Broker Broker
6767
BrokerConfig
68+
69+
onRefreshComplete func(ctx context.Context) error
70+
hooksMu sync.RWMutex
6871
}
6972

7073
// WithName returns a new [*BrokerExt] with Name added to the logger.
7174
func (b *BrokerExt) WithName(name string) *BrokerExt {
7275
bn := *b
7376
bn.Logger = logger.Named(b.Logger, name)
77+
78+
// Don't share hooks mutex or onRefreshComplete between copies
79+
bn.onRefreshComplete = nil
80+
bn.hooksMu = sync.RWMutex{}
7481
return &bn
7582
}
7683

@@ -147,6 +154,25 @@ func (b *BrokerExt) CloseAll(deps ...Resource) {
147154
}
148155
}
149156

157+
// SetOnRefreshComplete sets a hook to be called after successful connection refresh.
158+
func (b *BrokerExt) SetOnRefreshComplete(hook func(ctx context.Context) error) {
159+
b.hooksMu.Lock()
160+
defer b.hooksMu.Unlock()
161+
b.onRefreshComplete = hook
162+
}
163+
164+
// executeOnRefreshComplete executes the refresh completion hook if it exists.
165+
func (b *BrokerExt) executeOnRefreshComplete(ctx context.Context) error {
166+
b.hooksMu.RLock()
167+
hook := b.onRefreshComplete
168+
b.hooksMu.RUnlock()
169+
170+
if hook != nil {
171+
return hook(ctx)
172+
}
173+
return nil
174+
}
175+
150176
type Resource struct {
151177
io.Closer
152178
Name string

pkg/loop/internal/net/client.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,13 @@ func (c *clientConn) refresh(ctx context.Context, orig *grpc.ClientConn) *grpc.C
128128
c.CloseAll(c.deps...)
129129
return false
130130
}
131+
132+
// Execute refresh completion hook after successful connection but before returning to caller.
133+
if err := c.BrokerExt.executeOnRefreshComplete(ctx); err != nil {
134+
// Don't fail the refresh, but log the error
135+
c.Logger.Errorw("Refresh completion hook failed", "err", err, "conn", c.name)
136+
}
137+
131138
return true
132139
}
133140

pkg/loop/internal/relayer/pluginprovider/ext/ccipocr3/chainaccessor.go

Lines changed: 66 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package ccipocr3
22

33
import (
44
"context"
5+
"fmt"
6+
"sync"
57
"time"
68

79
"google.golang.org/grpc"
@@ -19,13 +21,22 @@ var _ ccipocr3.ChainAccessor = (*chainAccessorClient)(nil)
1921
type chainAccessorClient struct {
2022
*net.BrokerExt
2123
grpc ccipocr3pb.ChainAccessorClient
24+
25+
// Local persistence for refresh functionality
26+
mu sync.RWMutex
27+
syncedContracts map[string]ccipocr3.UnknownAddress // contractName -> contractAddress
2228
}
2329

2430
func NewChainAccessorClient(broker *net.BrokerExt, cc grpc.ClientConnInterface) ccipocr3.ChainAccessor {
25-
return &chainAccessorClient{
26-
BrokerExt: broker,
27-
grpc: ccipocr3pb.NewChainAccessorClient(cc),
31+
client := &chainAccessorClient{
32+
BrokerExt: broker,
33+
grpc: ccipocr3pb.NewChainAccessorClient(cc),
34+
syncedContracts: make(map[string]ccipocr3.UnknownAddress),
2835
}
36+
37+
broker.SetOnRefreshComplete(client.restoreStateOnRefresh)
38+
39+
return client
2940
}
3041

3142
// AllAccessors methods
@@ -82,9 +93,61 @@ func (c *chainAccessorClient) Sync(ctx context.Context, contractName string, con
8293
ContractName: contractName,
8394
ContractAddress: contractAddress,
8495
})
96+
97+
if err == nil {
98+
// Persist the synced contract locally for client refresh
99+
c.mu.Lock()
100+
c.syncedContracts[contractName] = contractAddress
101+
c.mu.Unlock()
102+
c.Logger.Debugw("Persisted synced contract", "contractName", contractName, "contractAddress", contractAddress)
103+
}
104+
85105
return err
86106
}
87107

108+
// restoreStateOnRefresh is called after successful relayer refresh to restore synced contracts.
109+
//
110+
// TODO: right now this only supports re-syncing previously synced contracts. In the future this should support
111+
// re-establishing any arbitrary serializable state.
112+
func (c *chainAccessorClient) restoreStateOnRefresh(ctx context.Context) error {
113+
c.mu.RLock()
114+
contractsToRestore := make(map[string]ccipocr3.UnknownAddress)
115+
for name, addr := range c.syncedContracts {
116+
contractsToRestore[name] = addr
117+
}
118+
c.mu.RUnlock()
119+
120+
if len(contractsToRestore) == 0 {
121+
c.Logger.Debug("No synced contracts to restore")
122+
return nil
123+
}
124+
125+
c.Logger.Infow("Restoring synced contracts after refresh", "count", len(contractsToRestore))
126+
127+
// Re-sync all previously synced contracts
128+
var restoreErrors []error
129+
for contractName, contractAddress := range contractsToRestore {
130+
if err := c.Sync(ctx, contractName, contractAddress); err != nil {
131+
c.Logger.Errorw("Failed to restore synced contract",
132+
"contractName", contractName,
133+
"contractAddress", contractAddress,
134+
"err", err)
135+
restoreErrors = append(restoreErrors, fmt.Errorf("failed to restore contract %s: %w", contractName, err))
136+
} else {
137+
c.Logger.Debugw("Successfully restored synced contract",
138+
"contractName", contractName,
139+
"contractAddress", contractAddress)
140+
}
141+
}
142+
143+
if len(restoreErrors) > 0 {
144+
return fmt.Errorf("failed to restore %d/%d contracts: %v", len(restoreErrors), len(contractsToRestore), restoreErrors)
145+
}
146+
147+
c.Logger.Infow("Successfully restored all synced contracts", "count", len(contractsToRestore))
148+
return nil
149+
}
150+
88151
// DestinationAccessor methods
89152
func (c *chainAccessorClient) CommitReportsGTETimestamp(
90153
ctx context.Context,

0 commit comments

Comments
 (0)