Skip to content

Commit 5b416ef

Browse files
committed
refactor rpc to a wrapper for easier reuse
1 parent 99f24d4 commit 5b416ef

File tree

16 files changed

+391
-334
lines changed

16 files changed

+391
-334
lines changed

cmd/orchestrator.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@ import (
66
"github.com/prometheus/client_golang/prometheus/promhttp"
77
"github.com/rs/zerolog/log"
88
"github.com/spf13/cobra"
9-
"github.com/thirdweb-dev/indexer/internal/common"
109
"github.com/thirdweb-dev/indexer/internal/orchestrator"
10+
"github.com/thirdweb-dev/indexer/internal/rpc"
1111
)
1212

1313
var (
@@ -23,7 +23,7 @@ var (
2323

2424
func RunOrchestrator(cmd *cobra.Command, args []string) {
2525
log.Info().Msg("Starting indexer")
26-
rpc, err := common.InitializeRPC()
26+
rpc, err := rpc.Initialize()
2727
if err != nil {
2828
log.Fatal().Err(err).Msg("Failed to initialize RPC")
2929
}

internal/common/block.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,3 +35,5 @@ type BlockData struct {
3535
Logs []Log
3636
Traces []Trace
3737
}
38+
39+
type RawBlock = map[string]interface{}

internal/common/log.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,3 +16,5 @@ type Log struct {
1616
Data string `json:"data"`
1717
Topics []string `json:"topics"`
1818
}
19+
20+
type RawLogs = []map[string]interface{}

internal/common/rpc.go

Lines changed: 0 additions & 125 deletions
This file was deleted.

internal/common/trace.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,3 +27,5 @@ type Trace struct {
2727
RewardType string `json:"reward_type"`
2828
RefundAddress string `json:"refund_address"`
2929
}
30+
31+
type RawTraces = []map[string]interface{}

internal/common/utils.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package common
2+
3+
import "math/big"
4+
5+
func BigIntSliceToChunks(values []*big.Int, chunkSize int) [][]*big.Int {
6+
if chunkSize >= len(values) || chunkSize <= 0 {
7+
return [][]*big.Int{values}
8+
}
9+
var chunks [][]*big.Int
10+
for i := 0; i < len(values); i += chunkSize {
11+
end := i + chunkSize
12+
if end > len(values) {
13+
end = len(values)
14+
}
15+
chunks = append(chunks, values[i:end])
16+
}
17+
return chunks
18+
}

internal/orchestrator/chain_tracker.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,19 +5,18 @@ import (
55
"time"
66

77
"github.com/rs/zerolog/log"
8-
"github.com/thirdweb-dev/indexer/internal/common"
98
"github.com/thirdweb-dev/indexer/internal/metrics"
9+
"github.com/thirdweb-dev/indexer/internal/rpc"
1010
)
1111

1212
const DEFAULT_CHAIN_TRACKER_POLL_INTERVAL = 300000 // 5 minutes
1313

1414
type ChainTracker struct {
15-
rpc common.RPC
15+
rpc rpc.Client
1616
triggerIntervalMs int
1717
}
1818

19-
func NewChainTracker(rpc common.RPC) *ChainTracker {
20-
19+
func NewChainTracker(rpc rpc.Client) *ChainTracker {
2120
return &ChainTracker{
2221
rpc: rpc,
2322
triggerIntervalMs: DEFAULT_CHAIN_TRACKER_POLL_INTERVAL,

internal/orchestrator/committer.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
config "github.com/thirdweb-dev/indexer/configs"
1212
"github.com/thirdweb-dev/indexer/internal/common"
1313
"github.com/thirdweb-dev/indexer/internal/metrics"
14+
"github.com/thirdweb-dev/indexer/internal/rpc"
1415
"github.com/thirdweb-dev/indexer/internal/storage"
1516
)
1617

@@ -22,10 +23,10 @@ type Committer struct {
2223
blocksPerCommit int
2324
storage storage.IStorage
2425
pollFromBlock *big.Int
25-
rpc common.RPC
26+
rpc rpc.Client
2627
}
2728

28-
func NewCommitter(rpc common.RPC, storage storage.IStorage) *Committer {
29+
func NewCommitter(rpc rpc.Client, storage storage.IStorage) *Committer {
2930
triggerInterval := config.Cfg.Committer.Interval
3031
if triggerInterval == 0 {
3132
triggerInterval = DEFAULT_COMMITTER_TRIGGER_INTERVAL

internal/orchestrator/failure_recoverer.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
config "github.com/thirdweb-dev/indexer/configs"
1010
"github.com/thirdweb-dev/indexer/internal/common"
1111
"github.com/thirdweb-dev/indexer/internal/metrics"
12+
"github.com/thirdweb-dev/indexer/internal/rpc"
1213
"github.com/thirdweb-dev/indexer/internal/storage"
1314
"github.com/thirdweb-dev/indexer/internal/worker"
1415
)
@@ -20,10 +21,10 @@ type FailureRecoverer struct {
2021
failuresPerPoll int
2122
triggerIntervalMs int
2223
storage storage.IStorage
23-
rpc common.RPC
24+
rpc rpc.Client
2425
}
2526

26-
func NewFailureRecoverer(rpc common.RPC, storage storage.IStorage) *FailureRecoverer {
27+
func NewFailureRecoverer(rpc rpc.Client, storage storage.IStorage) *FailureRecoverer {
2728
failuresPerPoll := config.Cfg.FailureRecoverer.BlocksPerRun
2829
if failuresPerPoll == 0 {
2930
failuresPerPoll = DEFAULT_FAILURES_PER_POLL
@@ -80,7 +81,7 @@ func (fr *FailureRecoverer) Start() {
8081
select {}
8182
}
8283

83-
func (fr *FailureRecoverer) handleWorkerResults(blockFailures []common.BlockFailure, results []worker.WorkerResult) {
84+
func (fr *FailureRecoverer) handleWorkerResults(blockFailures []common.BlockFailure, results []rpc.GetFullBlockResult) {
8485
log.Debug().Msgf("Failure Recoverer recovered %d blocks", len(results))
8586
blockFailureMap := make(map[*big.Int]common.BlockFailure)
8687
for _, failure := range blockFailures {
@@ -105,10 +106,10 @@ func (fr *FailureRecoverer) handleWorkerResults(blockFailures []common.BlockFail
105106
})
106107
} else {
107108
successfulResults = append(successfulResults, common.BlockData{
108-
Block: result.Block,
109-
Logs: result.Logs,
110-
Transactions: result.Transactions,
111-
Traces: result.Traces,
109+
Block: result.Data.Block,
110+
Logs: result.Data.Logs,
111+
Transactions: result.Data.Transactions,
112+
Traces: result.Data.Traces,
112113
})
113114
failuresToDelete = append(failuresToDelete, blockFailureForBlock)
114115
}

internal/orchestrator/orchestrator.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,19 +4,19 @@ import (
44
"sync"
55

66
config "github.com/thirdweb-dev/indexer/configs"
7-
"github.com/thirdweb-dev/indexer/internal/common"
7+
"github.com/thirdweb-dev/indexer/internal/rpc"
88
"github.com/thirdweb-dev/indexer/internal/storage"
99
)
1010

1111
type Orchestrator struct {
12-
rpc common.RPC
12+
rpc rpc.Client
1313
storage storage.IStorage
1414
pollerEnabled bool
1515
failureRecovererEnabled bool
1616
committerEnabled bool
1717
}
1818

19-
func NewOrchestrator(rpc common.RPC) (*Orchestrator, error) {
19+
func NewOrchestrator(rpc rpc.Client) (*Orchestrator, error) {
2020
storage, err := storage.NewStorageConnector(&config.Cfg.Storage)
2121
if err != nil {
2222
return nil, err

0 commit comments

Comments
 (0)