Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 22 additions & 22 deletions internal/orchestrator/committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (c *Committer) Start(ctx context.Context) {
log.Error().Err(err).Msg("Error getting block data to commit")
continue
}
if blockDataToCommit == nil || len(*blockDataToCommit) == 0 {
if len(blockDataToCommit) == 0 {
log.Debug().Msg("No block data to commit")
continue
}
Expand Down Expand Up @@ -104,7 +104,7 @@ func (c *Committer) getBlockNumbersToCommit() ([]*big.Int, error) {
return blockNumbers, nil
}

func (c *Committer) getSequentialBlockDataToCommit() (*[]common.BlockData, error) {
func (c *Committer) getSequentialBlockDataToCommit() ([]common.BlockData, error) {
blocksToCommit, err := c.getBlockNumbersToCommit()
if err != nil {
return nil, fmt.Errorf("error determining blocks to commit: %v", err)
Expand All @@ -117,49 +117,49 @@ func (c *Committer) getSequentialBlockDataToCommit() (*[]common.BlockData, error
if err != nil {
return nil, fmt.Errorf("error fetching blocks to commit: %v", err)
}
if blocksData == nil || len(*blocksData) == 0 {
if len(blocksData) == 0 {
log.Warn().Msgf("Committer didn't find the following range in staging: %v - %v", blocksToCommit[0].Int64(), blocksToCommit[len(blocksToCommit)-1].Int64())
c.handleMissingStagingData(blocksToCommit)
return nil, nil
}

// Sort blocks by block number
sort.Slice(*blocksData, func(i, j int) bool {
return (*blocksData)[i].Block.Number.Cmp((*blocksData)[j].Block.Number) < 0
sort.Slice(blocksData, func(i, j int) bool {
return blocksData[i].Block.Number.Cmp(blocksData[j].Block.Number) < 0
})

if (*blocksData)[0].Block.Number.Cmp(blocksToCommit[0]) != 0 {
return nil, c.handleGap(blocksToCommit[0], (*blocksData)[0].Block)
if blocksData[0].Block.Number.Cmp(blocksToCommit[0]) != 0 {
return nil, c.handleGap(blocksToCommit[0], blocksData[0].Block)
}

var sequentialBlockData []common.BlockData
sequentialBlockData = append(sequentialBlockData, (*blocksData)[0])
expectedBlockNumber := new(big.Int).Add((*blocksData)[0].Block.Number, big.NewInt(1))
sequentialBlockData = append(sequentialBlockData, blocksData[0])
expectedBlockNumber := new(big.Int).Add(blocksData[0].Block.Number, big.NewInt(1))

for i := 1; i < len(*blocksData); i++ {
if (*blocksData)[i].Block.Number.Cmp((*blocksData)[i-1].Block.Number) == 0 {
for i := 1; i < len(blocksData); i++ {
if blocksData[i].Block.Number.Cmp(blocksData[i-1].Block.Number) == 0 {
// Duplicate block, skip -- might happen if block has been polled multiple times
continue
}
if (*blocksData)[i].Block.Number.Cmp(expectedBlockNumber) != 0 {
if blocksData[i].Block.Number.Cmp(expectedBlockNumber) != 0 {
// Note: Gap detected, stop here
log.Warn().Msgf("Gap detected at block %s, committing until %s", expectedBlockNumber.String(), (*blocksData)[i-1].Block.Number.String())
log.Warn().Msgf("Gap detected at block %s, committing until %s", expectedBlockNumber.String(), blocksData[i-1].Block.Number.String())
// increment the gap counter in prometheus
metrics.GapCounter.Inc()
// record the first missed block number in prometheus
metrics.MissedBlockNumbers.Set(float64((*blocksData)[0].Block.Number.Int64()))
metrics.MissedBlockNumbers.Set(float64(blocksData[0].Block.Number.Int64()))
break
}
sequentialBlockData = append(sequentialBlockData, (*blocksData)[i])
sequentialBlockData = append(sequentialBlockData, blocksData[i])
expectedBlockNumber.Add(expectedBlockNumber, big.NewInt(1))
}

return &sequentialBlockData, nil
return sequentialBlockData, nil
}

func (c *Committer) commit(blockData *[]common.BlockData) error {
blockNumbers := make([]*big.Int, len(*blockData))
for i, block := range *blockData {
func (c *Committer) commit(blockData []common.BlockData) error {
blockNumbers := make([]*big.Int, len(blockData))
for i, block := range blockData {
blockNumbers[i] = block.Block.Number
}
log.Debug().Msgf("Committing %d blocks", len(blockNumbers))
Expand All @@ -175,16 +175,16 @@ func (c *Committer) commit(blockData *[]common.BlockData) error {
}

// Find highest block number from committed blocks
highestBlockNumber := (*blockData)[0].Block.Number
for _, block := range *blockData {
highestBlockNumber := blockData[0].Block.Number
for _, block := range blockData {
if block.Block.Number.Cmp(highestBlockNumber) > 0 {
highestBlockNumber = block.Block.Number
}
}
c.lastCommittedBlock = new(big.Int).Set(highestBlockNumber)

// Update metrics for successful commits
metrics.SuccessfulCommits.Add(float64(len(*blockData)))
metrics.SuccessfulCommits.Add(float64(len(blockData)))
metrics.LastCommittedBlock.Set(float64(highestBlockNumber.Int64()))
return nil
}
Expand Down
36 changes: 18 additions & 18 deletions internal/orchestrator/committer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,13 +252,13 @@ func TestGetSequentialBlockDataToCommit(t *testing.T) {
mockStagingStorage.EXPECT().GetStagingData(storage.QueryFilter{
ChainId: chainID,
BlockNumbers: []*big.Int{big.NewInt(101), big.NewInt(102), big.NewInt(103)},
}).Return(&blockData, nil)
}).Return(blockData, nil)

result, err := committer.getSequentialBlockDataToCommit()

assert.NoError(t, err)
assert.NotNil(t, result)
assert.Equal(t, 3, len(*result))
assert.Equal(t, 3, len(result))
}

func TestGetSequentialBlockDataToCommitWithDuplicateBlocks(t *testing.T) {
Expand Down Expand Up @@ -288,16 +288,16 @@ func TestGetSequentialBlockDataToCommitWithDuplicateBlocks(t *testing.T) {
mockStagingStorage.EXPECT().GetStagingData(storage.QueryFilter{
ChainId: chainID,
BlockNumbers: []*big.Int{big.NewInt(101), big.NewInt(102), big.NewInt(103)},
}).Return(&blockData, nil)
}).Return(blockData, nil)

result, err := committer.getSequentialBlockDataToCommit()

assert.NoError(t, err)
assert.NotNil(t, result)
assert.Equal(t, 3, len(*result))
assert.Equal(t, big.NewInt(101), (*result)[0].Block.Number)
assert.Equal(t, big.NewInt(102), (*result)[1].Block.Number)
assert.Equal(t, big.NewInt(103), (*result)[2].Block.Number)
assert.Equal(t, 3, len(result))
assert.Equal(t, big.NewInt(101), result[0].Block.Number)
assert.Equal(t, big.NewInt(102), result[1].Block.Number)
assert.Equal(t, big.NewInt(103), result[2].Block.Number)
}

func TestCommit(t *testing.T) {
Expand All @@ -317,10 +317,10 @@ func TestCommit(t *testing.T) {
{Block: common.Block{Number: big.NewInt(102)}},
}

mockMainStorage.EXPECT().InsertBlockData(&blockData).Return(nil)
mockStagingStorage.EXPECT().DeleteStagingData(&blockData).Return(nil)
mockMainStorage.EXPECT().InsertBlockData(blockData).Return(nil)
mockStagingStorage.EXPECT().DeleteStagingData(blockData).Return(nil)

err := committer.commit(&blockData)
err := committer.commit(blockData)

assert.NoError(t, err)
}
Expand Down Expand Up @@ -381,9 +381,9 @@ func TestStartCommitter(t *testing.T) {
{Block: common.Block{Number: big.NewInt(101)}},
{Block: common.Block{Number: big.NewInt(102)}},
}
mockStagingStorage.On("GetStagingData", mock.Anything).Return(&blockData, nil)
mockMainStorage.On("InsertBlockData", &blockData).Return(nil)
mockStagingStorage.On("DeleteStagingData", &blockData).Return(nil)
mockStagingStorage.On("GetStagingData", mock.Anything).Return(blockData, nil)
mockMainStorage.On("InsertBlockData", blockData).Return(nil)
mockStagingStorage.On("DeleteStagingData", blockData).Return(nil)

// Start the committer in a goroutine
go committer.Start(context.Background())
Expand Down Expand Up @@ -414,9 +414,9 @@ func TestCommitterRespectsSIGTERM(t *testing.T) {
{Block: common.Block{Number: big.NewInt(101)}},
{Block: common.Block{Number: big.NewInt(102)}},
}
mockStagingStorage.On("GetStagingData", mock.Anything).Return(&blockData, nil)
mockMainStorage.On("InsertBlockData", &blockData).Return(nil)
mockStagingStorage.On("DeleteStagingData", &blockData).Return(nil)
mockStagingStorage.On("GetStagingData", mock.Anything).Return(blockData, nil)
mockMainStorage.On("InsertBlockData", blockData).Return(nil)
mockStagingStorage.On("DeleteStagingData", blockData).Return(nil)

// Create a context that we can cancel
ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -480,7 +480,7 @@ func TestHandleMissingStagingData(t *testing.T) {
mockStagingStorage.EXPECT().GetStagingData(storage.QueryFilter{
ChainId: chainID,
BlockNumbers: []*big.Int{big.NewInt(0), big.NewInt(1), big.NewInt(2), big.NewInt(3), big.NewInt(4)},
}).Return(&blockData, nil)
}).Return(blockData, nil)

result, err := committer.getSequentialBlockDataToCommit()

Expand Down Expand Up @@ -524,7 +524,7 @@ func TestHandleMissingStagingDataIsPolledWithCorrectBatchSize(t *testing.T) {
mockStagingStorage.EXPECT().GetStagingData(storage.QueryFilter{
ChainId: chainID,
BlockNumbers: []*big.Int{big.NewInt(0), big.NewInt(1), big.NewInt(2), big.NewInt(3), big.NewInt(4)},
}).Return(&blockData, nil)
}).Return(blockData, nil)

result, err := committer.getSequentialBlockDataToCommit()

Expand Down
8 changes: 4 additions & 4 deletions internal/orchestrator/reorg_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func (rh *ReorgHandler) findReorgedBlockNumbers(blockHeadersDescending []common.
continueCheckingForReorgs := false
for i := 0; i < len(blockHeadersDescending); i++ {
blockHeader := blockHeadersDescending[i]
fetchedBlock, ok := (*newBlocksByNumber)[blockHeader.Number.String()]
fetchedBlock, ok := newBlocksByNumber[blockHeader.Number.String()]
if !ok {
return fmt.Errorf("block not found: %s", blockHeader.Number.String())
}
Expand All @@ -220,7 +220,7 @@ func (rh *ReorgHandler) findReorgedBlockNumbers(blockHeadersDescending []common.
return nil
}

func (rh *ReorgHandler) getNewBlocksByNumber(blockHeaders []common.BlockHeader) (*map[string]common.Block, error) {
func (rh *ReorgHandler) getNewBlocksByNumber(blockHeaders []common.BlockHeader) (map[string]common.Block, error) {
blockNumbers := make([]*big.Int, 0, len(blockHeaders))
for _, header := range blockHeaders {
blockNumbers = append(blockNumbers, header.Number)
Expand Down Expand Up @@ -257,7 +257,7 @@ func (rh *ReorgHandler) getNewBlocksByNumber(blockHeaders []common.BlockHeader)
fetchedBlocksByNumber[blockResult.BlockNumber.String()] = blockResult.Data
}
}
return &fetchedBlocksByNumber, nil
return fetchedBlocksByNumber, nil
}

func (rh *ReorgHandler) handleReorg(reorgedBlockNumbers []*big.Int) error {
Expand All @@ -281,7 +281,7 @@ func (rh *ReorgHandler) handleReorg(reorgedBlockNumbers []*big.Int) error {
if err := rh.storage.MainStorage.DeleteBlockData(rh.rpc.GetChainID(), blocksToDelete); err != nil {
return fmt.Errorf("error deleting data for blocks %v: %w", blocksToDelete, err)
}
if err := rh.storage.MainStorage.InsertBlockData(&data); err != nil {
if err := rh.storage.MainStorage.InsertBlockData(data); err != nil {
return fmt.Errorf("error saving data to main storage: %w", err)
}
return nil
Expand Down
12 changes: 6 additions & 6 deletions internal/orchestrator/reorg_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -614,8 +614,8 @@ func TestHandleReorgWithSingleBlockReorg(t *testing.T) {
mockMainStorage.EXPECT().DeleteBlockData(big.NewInt(1), mock.MatchedBy(func(blocks []*big.Int) bool {
return len(blocks) == 1
})).Return(nil)
mockMainStorage.EXPECT().InsertBlockData(mock.MatchedBy(func(data *[]common.BlockData) bool {
return data != nil && len(*data) == 1
mockMainStorage.EXPECT().InsertBlockData(mock.MatchedBy(func(data []common.BlockData) bool {
return len(data) == 1
})).Return(nil)

handler := NewReorgHandler(mockRPC, mockStorage)
Expand Down Expand Up @@ -682,8 +682,8 @@ func TestHandleReorgWithLatestBlockReorged(t *testing.T) {
mockMainStorage.EXPECT().DeleteBlockData(big.NewInt(1), mock.MatchedBy(func(blocks []*big.Int) bool {
return len(blocks) == 8
})).Return(nil)
mockMainStorage.EXPECT().InsertBlockData(mock.MatchedBy(func(data *[]common.BlockData) bool {
return data != nil && len(*data) == 8
mockMainStorage.EXPECT().InsertBlockData(mock.MatchedBy(func(data []common.BlockData) bool {
return len(data) == 8
})).Return(nil)

handler := NewReorgHandler(mockRPC, mockStorage)
Expand Down Expand Up @@ -746,8 +746,8 @@ func TestHandleReorgWithManyBlocks(t *testing.T) {
mockMainStorage.EXPECT().DeleteBlockData(big.NewInt(1), mock.MatchedBy(func(blocks []*big.Int) bool {
return len(blocks) == 5
})).Return(nil)
mockMainStorage.EXPECT().InsertBlockData(mock.MatchedBy(func(data *[]common.BlockData) bool {
return data != nil && len(*data) == 5
mockMainStorage.EXPECT().InsertBlockData(mock.MatchedBy(func(data []common.BlockData) bool {
return len(data) == 5
})).Return(nil)

handler := NewReorgHandler(mockRPC, mockStorage)
Expand Down
16 changes: 8 additions & 8 deletions internal/rpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,29 +212,29 @@ func (rpc *Client) setChainID() error {

func (rpc *Client) GetFullBlocks(blockNumbers []*big.Int) []GetFullBlockResult {
var wg sync.WaitGroup
var blocks *[]RPCFetchBatchResult[common.RawBlock]
var logs *[]RPCFetchBatchResult[common.RawLogs]
var traces *[]RPCFetchBatchResult[common.RawTraces]
var receipts *[]RPCFetchBatchResult[common.RawReceipts]
var blocks []RPCFetchBatchResult[common.RawBlock]
var logs []RPCFetchBatchResult[common.RawLogs]
var traces []RPCFetchBatchResult[common.RawTraces]
var receipts []RPCFetchBatchResult[common.RawReceipts]
wg.Add(2)

go func() {
defer wg.Done()
result := RPCFetchBatch[common.RawBlock](rpc, blockNumbers, "eth_getBlockByNumber", GetBlockWithTransactionsParams)
blocks = &result
blocks = result
}()

if rpc.supportsBlockReceipts {
go func() {
defer wg.Done()
result := RPCFetchInBatches[common.RawReceipts](rpc, blockNumbers, rpc.blocksPerRequest.Receipts, config.Cfg.RPC.BlockReceipts.BatchDelay, "eth_getBlockReceipts", GetBlockReceiptsParams)
receipts = &result
receipts = result
}()
} else {
go func() {
defer wg.Done()
result := RPCFetchInBatches[common.RawLogs](rpc, blockNumbers, rpc.blocksPerRequest.Logs, config.Cfg.RPC.Logs.BatchDelay, "eth_getLogs", GetLogsParams)
logs = &result
logs = result
}()
}

Expand All @@ -243,7 +243,7 @@ func (rpc *Client) GetFullBlocks(blockNumbers []*big.Int) []GetFullBlockResult {
go func() {
defer wg.Done()
result := RPCFetchInBatches[common.RawTraces](rpc, blockNumbers, rpc.blocksPerRequest.Traces, config.Cfg.RPC.Traces.BatchDelay, "trace_block", TraceBlockParams)
traces = &result
traces = result
}()
}

Expand Down
18 changes: 9 additions & 9 deletions internal/rpc/serializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,17 @@ import (
"github.com/thirdweb-dev/indexer/internal/common"
)

func SerializeFullBlocks(chainId *big.Int, blocks *[]RPCFetchBatchResult[common.RawBlock], logs *[]RPCFetchBatchResult[common.RawLogs], traces *[]RPCFetchBatchResult[common.RawTraces], receipts *[]RPCFetchBatchResult[common.RawReceipts]) []GetFullBlockResult {
func SerializeFullBlocks(chainId *big.Int, blocks []RPCFetchBatchResult[common.RawBlock], logs []RPCFetchBatchResult[common.RawLogs], traces []RPCFetchBatchResult[common.RawTraces], receipts []RPCFetchBatchResult[common.RawReceipts]) []GetFullBlockResult {
if blocks == nil {
return []GetFullBlockResult{}
}
results := make([]GetFullBlockResult, 0, len(*blocks))
results := make([]GetFullBlockResult, 0, len(blocks))

rawLogsMap := mapBatchResultsByBlockNumber[common.RawLogs](logs)
rawReceiptsMap := mapBatchResultsByBlockNumber[common.RawReceipts](receipts)
rawTracesMap := mapBatchResultsByBlockNumber[common.RawTraces](traces)

for _, rawBlock := range *blocks {
for _, rawBlock := range blocks {
result := GetFullBlockResult{
BlockNumber: rawBlock.BlockNumber,
}
Expand All @@ -45,7 +45,7 @@ func SerializeFullBlocks(chainId *big.Int, blocks *[]RPCFetchBatchResult[common.
if rawReceipts.Error != nil {
result.Error = rawReceipts.Error
} else {
result.Data.Logs = serializeLogsFromReceipts(chainId, &rawReceipts.Result, result.Data.Block)
result.Data.Logs = serializeLogsFromReceipts(chainId, rawReceipts.Result, result.Data.Block)
result.Data.Transactions = serializeTransactions(chainId, rawBlock.Result["transactions"].([]interface{}), blockTimestamp, &rawReceipts.Result)
}
} else {
Expand Down Expand Up @@ -75,12 +75,12 @@ func SerializeFullBlocks(chainId *big.Int, blocks *[]RPCFetchBatchResult[common.
return results
}

func mapBatchResultsByBlockNumber[T any](results *[]RPCFetchBatchResult[T]) map[string]*RPCFetchBatchResult[T] {
func mapBatchResultsByBlockNumber[T any](results []RPCFetchBatchResult[T]) map[string]*RPCFetchBatchResult[T] {
if results == nil {
return make(map[string]*RPCFetchBatchResult[T], 0)
}
resultsMap := make(map[string]*RPCFetchBatchResult[T], len(*results))
for _, result := range *results {
resultsMap := make(map[string]*RPCFetchBatchResult[T], len(results))
for _, result := range results {
resultsMap[result.BlockNumber.String()] = &result
}
return resultsMap
Expand Down Expand Up @@ -278,13 +278,13 @@ func ExtractFunctionSelector(s string) string {
return s[0:10]
}

func serializeLogsFromReceipts(chainId *big.Int, rawReceipts *[]map[string]interface{}, block common.Block) []common.Log {
func serializeLogsFromReceipts(chainId *big.Int, rawReceipts []map[string]interface{}, block common.Block) []common.Log {
logs := make([]common.Log, 0)
if rawReceipts == nil {
return logs
}

for _, receipt := range *rawReceipts {
for _, receipt := range rawReceipts {
rawLogs, ok := receipt["logs"].([]interface{})
if !ok {
log.Debug().Msgf("Failed to serialize logs: %v", receipt["logs"])
Expand Down
Loading