Skip to content

Commit 41414f3

Browse files
authored
Refactor RPC client to use interface (#94)
### TL;DR Refactored RPC client interface and updated related components for improved modularity and testability. ### What changed? - Introduced `IRPCClient` interface in `rpc.go` to abstract RPC client functionality. - Updated `Client` struct to implement `IRPCClient` interface. - Modified `Initialize()` function to return `IRPCClient` instead of `*Client`. - Refactored `Orchestrator`, `ChainTracker`, `Committer`, `FailureRecoverer`, `Poller`, `ReorgHandler`, and `Worker` to use `IRPCClient` interface. - Moved `GetLatestBlockNumber()` method from `Poller` to `Client`. - Updated method calls and field accesses throughout the codebase to use the new interface. ### How to test? 1. Ensure indexing works after the refactoring. ### Why make this change? This refactoring improves the codebase by: 1. Enhancing modularity and separation of concerns. 2. Facilitating easier testing by allowing mock implementations of `IRPCClient`. 3. Improving code maintainability and readability. 4. Providing a clearer contract for RPC client functionality. 5. Enabling easier future extensions or modifications to the RPC client implementation.
2 parents f541e01 + 35bdac6 commit 41414f3

File tree

9 files changed

+92
-70
lines changed

9 files changed

+92
-70
lines changed

cmd/orchestrator.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ func RunOrchestrator(cmd *cobra.Command, args []string) {
2828
log.Fatal().Err(err).Msg("Failed to initialize RPC")
2929
}
3030

31-
orchestrator, err := orchestrator.NewOrchestrator(*rpc)
31+
orchestrator, err := orchestrator.NewOrchestrator(rpc)
3232
if err != nil {
3333
log.Fatal().Err(err).Msg("Failed to create orchestrator")
3434
}
Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package orchestrator
22

33
import (
4-
"context"
54
"time"
65

76
"github.com/rs/zerolog/log"
@@ -12,11 +11,11 @@ import (
1211
const DEFAULT_CHAIN_TRACKER_POLL_INTERVAL = 300000 // 5 minutes
1312

1413
type ChainTracker struct {
15-
rpc rpc.Client
14+
rpc rpc.IRPCClient
1615
triggerIntervalMs int
1716
}
1817

19-
func NewChainTracker(rpc rpc.Client) *ChainTracker {
18+
func NewChainTracker(rpc rpc.IRPCClient) *ChainTracker {
2019
return &ChainTracker{
2120
rpc: rpc,
2221
triggerIntervalMs: DEFAULT_CHAIN_TRACKER_POLL_INTERVAL,
@@ -30,23 +29,16 @@ func (ct *ChainTracker) Start() {
3029
log.Debug().Msgf("Chain tracker running")
3130
go func() {
3231
for range ticker.C {
33-
latestBlockNumber, err := ct.getLatestBlockNumber()
32+
latestBlockNumber, err := ct.rpc.GetLatestBlockNumber()
3433
if err != nil {
3534
log.Error().Err(err).Msg("Error getting latest block number")
3635
continue
3736
}
38-
metrics.ChainHead.Set(float64(latestBlockNumber) / 100)
37+
latestBlockNumberFloat, _ := latestBlockNumber.Float64()
38+
metrics.ChainHead.Set(latestBlockNumberFloat)
3939
}
4040
}()
4141

4242
// Keep the program running (otherwise it will exit)
4343
select {}
4444
}
45-
46-
func (ct *ChainTracker) getLatestBlockNumber() (uint64, error) {
47-
blockNumber, err := ct.rpc.EthClient.BlockNumber(context.Background())
48-
if err != nil {
49-
return 0, err
50-
}
51-
return blockNumber, nil
52-
}

internal/orchestrator/committer.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,10 @@ type Committer struct {
2222
blocksPerCommit int
2323
storage storage.IStorage
2424
pollFromBlock *big.Int
25-
rpc rpc.Client
25+
rpc rpc.IRPCClient
2626
}
2727

28-
func NewCommitter(rpc rpc.Client, storage storage.IStorage) *Committer {
28+
func NewCommitter(rpc rpc.IRPCClient, storage storage.IStorage) *Committer {
2929
triggerInterval := config.Cfg.Committer.Interval
3030
if triggerInterval == 0 {
3131
triggerInterval = DEFAULT_COMMITTER_TRIGGER_INTERVAL
@@ -71,7 +71,7 @@ func (c *Committer) Start() {
7171
}
7272

7373
func (c *Committer) getBlockNumbersToCommit() ([]*big.Int, error) {
74-
latestCommittedBlockNumber, err := c.storage.MainStorage.GetMaxBlockNumber(c.rpc.ChainID)
74+
latestCommittedBlockNumber, err := c.storage.MainStorage.GetMaxBlockNumber(c.rpc.GetChainID())
7575
log.Info().Msgf("Committer found this max block number in main storage: %s", latestCommittedBlockNumber.String())
7676
if err != nil {
7777
return nil, err
@@ -103,7 +103,7 @@ func (c *Committer) getSequentialBlockDataToCommit() (*[]common.BlockData, error
103103
return nil, nil
104104
}
105105

106-
blocksData, err := c.storage.StagingStorage.GetStagingData(storage.QueryFilter{BlockNumbers: blocksToCommit, ChainId: c.rpc.ChainID})
106+
blocksData, err := c.storage.StagingStorage.GetStagingData(storage.QueryFilter{BlockNumbers: blocksToCommit, ChainId: c.rpc.GetChainID()})
107107
if err != nil {
108108
return nil, fmt.Errorf("error fetching blocks to commit: %v", err)
109109
}
@@ -180,7 +180,7 @@ func (c *Committer) handleGap(expectedStartBlockNumber *big.Int, actualFirstBloc
180180
}
181181
log.Debug().Msgf("Detected %d missing blocks between blocks %s and %s", missingBlockCount, expectedStartBlockNumber.String(), actualFirstBlock.Number.String())
182182

183-
existingBlockFailures, err := c.storage.OrchestratorStorage.GetBlockFailures(storage.QueryFilter{BlockNumbers: missingBlockNumbers, ChainId: c.rpc.ChainID})
183+
existingBlockFailures, err := c.storage.OrchestratorStorage.GetBlockFailures(storage.QueryFilter{BlockNumbers: missingBlockNumbers, ChainId: c.rpc.GetChainID()})
184184
if err != nil {
185185
return fmt.Errorf("error getting block failures while handling gap: %v", err)
186186
}
@@ -197,7 +197,7 @@ func (c *Committer) handleGap(expectedStartBlockNumber *big.Int, actualFirstBloc
197197
if _, ok := existingBlockFailuresMap[blockNumberStr]; !ok {
198198
blockFailures = append(blockFailures, common.BlockFailure{
199199
BlockNumber: blockNumber,
200-
ChainId: c.rpc.ChainID,
200+
ChainId: c.rpc.GetChainID(),
201201
FailureTime: time.Now(),
202202
FailureCount: 1,
203203
FailureReason: "Gap detected for this block",

internal/orchestrator/failure_recoverer.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,10 @@ type FailureRecoverer struct {
2121
failuresPerPoll int
2222
triggerIntervalMs int
2323
storage storage.IStorage
24-
rpc rpc.Client
24+
rpc rpc.IRPCClient
2525
}
2626

27-
func NewFailureRecoverer(rpc rpc.Client, storage storage.IStorage) *FailureRecoverer {
27+
func NewFailureRecoverer(rpc rpc.IRPCClient, storage storage.IStorage) *FailureRecoverer {
2828
failuresPerPoll := config.Cfg.FailureRecoverer.BlocksPerRun
2929
if failuresPerPoll == 0 {
3030
failuresPerPoll = DEFAULT_FAILURES_PER_POLL
@@ -49,7 +49,7 @@ func (fr *FailureRecoverer) Start() {
4949
go func() {
5050
for range ticker.C {
5151
blockFailures, err := fr.storage.OrchestratorStorage.GetBlockFailures(storage.QueryFilter{
52-
ChainId: fr.rpc.ChainID,
52+
ChainId: fr.rpc.GetChainID(),
5353
Limit: fr.failuresPerPoll,
5454
})
5555
if err != nil {
@@ -101,7 +101,7 @@ func (fr *FailureRecoverer) handleWorkerResults(blockFailures []common.BlockFail
101101
BlockNumber: result.BlockNumber,
102102
FailureReason: result.Error.Error(),
103103
FailureTime: time.Now(),
104-
ChainId: fr.rpc.ChainID,
104+
ChainId: fr.rpc.GetChainID(),
105105
FailureCount: failureCount,
106106
})
107107
} else {

internal/orchestrator/orchestrator.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,15 @@ import (
99
)
1010

1111
type Orchestrator struct {
12-
rpc rpc.Client
12+
rpc rpc.IRPCClient
1313
storage storage.IStorage
1414
pollerEnabled bool
1515
failureRecovererEnabled bool
1616
committerEnabled bool
1717
reorgHandlerEnabled bool
1818
}
1919

20-
func NewOrchestrator(rpc rpc.Client) (*Orchestrator, error) {
20+
func NewOrchestrator(rpc rpc.IRPCClient) (*Orchestrator, error) {
2121
storage, err := storage.NewStorageConnector(&config.Cfg.Storage)
2222
if err != nil {
2323
return nil, err

internal/orchestrator/poller.go

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package orchestrator
22

33
import (
4-
"context"
54
"fmt"
65
"math/big"
76
"sync"
@@ -20,7 +19,7 @@ const DEFAULT_BLOCKS_PER_POLL = 10
2019
const DEFAULT_TRIGGER_INTERVAL = 1000
2120

2221
type Poller struct {
23-
rpc rpc.Client
22+
rpc rpc.IRPCClient
2423
blocksPerPoll int64
2524
triggerIntervalMs int64
2625
storage storage.IStorage
@@ -33,7 +32,7 @@ type BlockNumberWithError struct {
3332
Error error
3433
}
3534

36-
func NewPoller(rpc rpc.Client, storage storage.IStorage) *Poller {
35+
func NewPoller(rpc rpc.IRPCClient, storage storage.IStorage) *Poller {
3736
blocksPerPoll := config.Cfg.Poller.BlocksPerPoll
3837
if blocksPerPoll == 0 {
3938
blocksPerPoll = DEFAULT_BLOCKS_PER_POLL
@@ -44,7 +43,7 @@ func NewPoller(rpc rpc.Client, storage storage.IStorage) *Poller {
4443
}
4544
untilBlock := big.NewInt(int64(config.Cfg.Poller.UntilBlock))
4645
pollFromBlock := big.NewInt(int64(config.Cfg.Poller.FromBlock))
47-
lastPolledBlock, err := storage.StagingStorage.GetLastStagedBlockNumber(rpc.ChainID, untilBlock)
46+
lastPolledBlock, err := storage.StagingStorage.GetLastStagedBlockNumber(rpc.GetChainID(), untilBlock)
4847
if err != nil || lastPolledBlock == nil || lastPolledBlock.Sign() <= 0 {
4948
lastPolledBlock = new(big.Int).Sub(pollFromBlock, big.NewInt(1)) // needs to include the first block
5049
log.Warn().Err(err).Msgf("No last polled block found, setting to %s", lastPolledBlock.String())
@@ -124,7 +123,7 @@ func (p *Poller) reachedPollLimit(blockNumber *big.Int) bool {
124123
}
125124

126125
func (p *Poller) getBlockRange() ([]*big.Int, error) {
127-
latestBlock, err := p.getLatestBlockNumber()
126+
latestBlock, err := p.rpc.GetLatestBlockNumber()
128127
if err != nil {
129128
return nil, err
130129
}
@@ -150,14 +149,6 @@ func (p *Poller) getBlockRange() ([]*big.Int, error) {
150149
return blockNumbers, nil
151150
}
152151

153-
func (p *Poller) getLatestBlockNumber() (*big.Int, error) {
154-
latestBlockUint64, err := p.rpc.EthClient.BlockNumber(context.Background())
155-
if err != nil {
156-
return nil, fmt.Errorf("failed to get latest block number: %v", err)
157-
}
158-
return new(big.Int).SetUint64(latestBlockUint64), nil
159-
}
160-
161152
func (p *Poller) getEndBlockForRange(startBlock *big.Int, latestBlock *big.Int) *big.Int {
162153
endBlock := new(big.Int).Add(startBlock, big.NewInt(p.blocksPerPoll-1))
163154
if endBlock.Cmp(latestBlock) > 0 {
@@ -217,7 +208,7 @@ func (p *Poller) handleBlockFailures(results []rpc.GetFullBlockResult) {
217208
BlockNumber: result.BlockNumber,
218209
FailureReason: result.Error.Error(),
219210
FailureTime: time.Now(),
220-
ChainId: p.rpc.ChainID,
211+
ChainId: p.rpc.GetChainID(),
221212
FailureCount: 1,
222213
})
223214
}

internal/orchestrator/reorg_handler.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import (
1515
)
1616

1717
type ReorgHandler struct {
18-
rpc rpc.Client
18+
rpc rpc.IRPCClient
1919
storage storage.IStorage
2020
triggerInterval int
2121
blocksPerScan int
@@ -26,7 +26,7 @@ type ReorgHandler struct {
2626
const DEFAULT_REORG_HANDLER_INTERVAL = 1000
2727
const DEFAULT_REORG_HANDLER_BLOCKS_PER_SCAN = 100
2828

29-
func NewReorgHandler(rpc rpc.Client, storage storage.IStorage) *ReorgHandler {
29+
func NewReorgHandler(rpc rpc.IRPCClient, storage storage.IStorage) *ReorgHandler {
3030
triggerInterval := config.Cfg.ReorgHandler.Interval
3131
if triggerInterval == 0 {
3232
triggerInterval = DEFAULT_REORG_HANDLER_INTERVAL
@@ -41,7 +41,7 @@ func NewReorgHandler(rpc rpc.Client, storage storage.IStorage) *ReorgHandler {
4141
worker: worker.NewWorker(rpc),
4242
triggerInterval: triggerInterval,
4343
blocksPerScan: blocksPerScan,
44-
lastCheckedBlock: getInitialCheckedBlockNumber(storage, rpc.ChainID),
44+
lastCheckedBlock: getInitialCheckedBlockNumber(storage, rpc.GetChainID()),
4545
}
4646
}
4747

@@ -72,7 +72,7 @@ func (rh *ReorgHandler) Start() {
7272
go func() {
7373
for range ticker.C {
7474
lookbackFrom := new(big.Int).Add(rh.lastCheckedBlock, big.NewInt(int64(rh.blocksPerScan)))
75-
blockHeaders, err := rh.storage.MainStorage.LookbackBlockHeaders(rh.rpc.ChainID, rh.blocksPerScan, lookbackFrom)
75+
blockHeaders, err := rh.storage.MainStorage.LookbackBlockHeaders(rh.rpc.GetChainID(), rh.blocksPerScan, lookbackFrom)
7676
if err != nil {
7777
log.Error().Err(err).Msg("Error getting recent block headers")
7878
continue
@@ -85,7 +85,7 @@ func (rh *ReorgHandler) Start() {
8585
reorgEndIndex := findReorgEndIndex(blockHeaders)
8686
if reorgEndIndex == -1 {
8787
rh.lastCheckedBlock = mostRecentBlockHeader.Number
88-
rh.storage.OrchestratorStorage.SetLastReorgCheckedBlockNumber(rh.rpc.ChainID, mostRecentBlockHeader.Number)
88+
rh.storage.OrchestratorStorage.SetLastReorgCheckedBlockNumber(rh.rpc.GetChainID(), mostRecentBlockHeader.Number)
8989
metrics.ReorgHandlerLastCheckedBlock.Set(float64(mostRecentBlockHeader.Number.Int64()))
9090
continue
9191
}
@@ -101,7 +101,7 @@ func (rh *ReorgHandler) Start() {
101101
continue
102102
}
103103
rh.lastCheckedBlock = mostRecentBlockHeader.Number
104-
rh.storage.OrchestratorStorage.SetLastReorgCheckedBlockNumber(rh.rpc.ChainID, mostRecentBlockHeader.Number)
104+
rh.storage.OrchestratorStorage.SetLastReorgCheckedBlockNumber(rh.rpc.GetChainID(), mostRecentBlockHeader.Number)
105105
metrics.ReorgHandlerLastCheckedBlock.Set(float64(mostRecentBlockHeader.Number.Int64()))
106106
}
107107
}()
@@ -147,7 +147,7 @@ func (rh *ReorgHandler) findForkPoint(reversedBlockHeaders []common.BlockHeader)
147147
}
148148
}
149149
lookbackFrom := reversedBlockHeaders[len(reversedBlockHeaders)-1].Number
150-
nextHeadersBatch, err := rh.storage.MainStorage.LookbackBlockHeaders(rh.rpc.ChainID, rh.blocksPerScan, lookbackFrom)
150+
nextHeadersBatch, err := rh.storage.MainStorage.LookbackBlockHeaders(rh.rpc.GetChainID(), rh.blocksPerScan, lookbackFrom)
151151
if err != nil {
152152
return nil, fmt.Errorf("error getting next headers batch: %w", err)
153153
}
@@ -190,7 +190,7 @@ func (rh *ReorgHandler) handleReorg(reorgStart *big.Int, reorgEnd *big.Int) erro
190190
})
191191
}
192192
// TODO make delete and insert atomic
193-
if err := rh.storage.MainStorage.DeleteBlockData(rh.rpc.ChainID, blockRange); err != nil {
193+
if err := rh.storage.MainStorage.DeleteBlockData(rh.rpc.GetChainID(), blockRange); err != nil {
194194
return fmt.Errorf("error deleting data for blocks %v: %w", blockRange, err)
195195
}
196196
if err := rh.storage.MainStorage.InsertBlockData(&data); err != nil {

0 commit comments

Comments
 (0)