Skip to content

Commit 27ced99

Browse files
committed
fetch latest block after s3 is completed
1 parent 700befa commit 27ced99

File tree

3 files changed

+214
-2
lines changed

3 files changed

+214
-2
lines changed

cmd/committer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,6 @@ func RunCommitter(cmd *cobra.Command, args []string) {
2424
}
2525
chainId := rpc.GetChainID()
2626

27-
committer.Init(chainId)
27+
committer.Init(chainId, rpc)
2828
committer.Commit(chainId)
2929
}

internal/committer/committer.go

Lines changed: 192 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"github.com/rs/zerolog/log"
2525
config "github.com/thirdweb-dev/indexer/configs"
2626
"github.com/thirdweb-dev/indexer/internal/common"
27+
"github.com/thirdweb-dev/indexer/internal/rpc"
2728
"github.com/thirdweb-dev/indexer/internal/storage"
2829
)
2930

@@ -50,6 +51,7 @@ type ParquetBlockData struct {
5051
Traces []byte `parquet:"traces_json"`
5152
}
5253

54+
var rpcClient rpc.IRPCClient
5355
var clickhouseConn clickhouse.Conn
5456
var s3Client *s3.Client
5557
var kafkaPublisher *storage.KafkaPublisher
@@ -58,7 +60,8 @@ var parquetFilenameRegex = regexp.MustCompile(`blocks_(\d+)_(\d+)\.parquet`)
5860
var mu sync.RWMutex
5961
var downloadComplete chan *BlockRange
6062

61-
func Init(chainId *big.Int) {
63+
func Init(chainId *big.Int, rpc rpc.IRPCClient) {
64+
rpcClient = rpc
6265
tempDir = filepath.Join(os.TempDir(), "committer", fmt.Sprintf("chain_%d", chainId.Uint64()))
6366
downloadComplete = make(chan *BlockRange, config.Cfg.StagingS3MaxParallelFileDownload)
6467

@@ -192,6 +195,9 @@ func Commit(chainId *big.Int) error {
192195
<-processorDone
193196
log.Info().Msg("All processing completed successfully")
194197

198+
log.Info().Msg("Fetching latest blocks")
199+
fetchLatest(nextCommitBlockNumber)
200+
195201
return nil
196202
}
197203

@@ -710,3 +716,188 @@ func Close() error {
710716
// Clean up temp directory
711717
return os.RemoveAll(tempDir)
712718
}
719+
720+
func fetchLatest(nextCommitBlockNumber *big.Int) error {
721+
for {
722+
latestBlock, err := rpcClient.GetLatestBlockNumber(context.Background())
723+
if err != nil {
724+
log.Warn().Err(err).Msg("Failed to get latest block number, retrying...")
725+
time.Sleep(250 * time.Millisecond)
726+
continue
727+
}
728+
if nextCommitBlockNumber.Cmp(latestBlock) >= 0 {
729+
time.Sleep(250 * time.Millisecond)
730+
continue
731+
}
732+
733+
// Configuration variables
734+
rpcBatchSize := int64(50) // Number of blocks per batch
735+
rpcNumParallelCalls := int64(10) // Maximum number of parallel RPC calls
736+
maxBlocksPerFetch := rpcBatchSize * rpcNumParallelCalls // Total blocks per fetch cycle
737+
738+
// Calculate the range of blocks to fetch
739+
blocksToFetch := new(big.Int).Sub(latestBlock, nextCommitBlockNumber)
740+
if blocksToFetch.Cmp(big.NewInt(maxBlocksPerFetch)) > 0 {
741+
blocksToFetch = big.NewInt(maxBlocksPerFetch) // Limit to maxBlocksPerFetch blocks per batch
742+
}
743+
744+
log.Info().
745+
Str("next_commit_block", nextCommitBlockNumber.String()).
746+
Str("latest_block", latestBlock.String()).
747+
Str("blocks_to_fetch", blocksToFetch.String()).
748+
Int64("batch_size", rpcBatchSize).
749+
Int64("max_parallel_calls", rpcNumParallelCalls).
750+
Msg("Starting to fetch latest blocks")
751+
752+
// Precreate array of block data
753+
blockDataArray := make([]common.BlockData, blocksToFetch.Int64())
754+
755+
// Create batches and calculate number of parallel calls needed
756+
numBatches := min((blocksToFetch.Int64()+rpcBatchSize-1)/rpcBatchSize, rpcNumParallelCalls)
757+
758+
var wg sync.WaitGroup
759+
var mu sync.Mutex
760+
var fetchErrors []error
761+
762+
for batchIndex := int64(0); batchIndex < numBatches; batchIndex++ {
763+
wg.Add(1)
764+
go func(batchIdx int64) {
765+
defer wg.Done()
766+
767+
startBlock := new(big.Int).Add(nextCommitBlockNumber, big.NewInt(batchIdx*rpcBatchSize))
768+
endBlock := new(big.Int).Add(startBlock, big.NewInt(rpcBatchSize-1))
769+
770+
// Don't exceed the latest block
771+
if endBlock.Cmp(latestBlock) > 0 {
772+
endBlock = latestBlock
773+
}
774+
775+
log.Debug().
776+
Int64("batch", batchIdx).
777+
Str("start_block", startBlock.String()).
778+
Str("end_block", endBlock.String()).
779+
Msg("Starting batch fetch")
780+
781+
// Create block numbers array for this batch
782+
var blockNumbers []*big.Int
783+
for i := new(big.Int).Set(startBlock); i.Cmp(endBlock) <= 0; i.Add(i, big.NewInt(1)) {
784+
blockNumbers = append(blockNumbers, new(big.Int).Set(i))
785+
}
786+
787+
// Make RPC call with retry mechanism (3 retries)
788+
var batchResults []rpc.GetFullBlockResult
789+
var fetchErr error
790+
791+
for retry := 0; retry < 3; retry++ {
792+
batchResults = rpcClient.GetFullBlocks(context.Background(), blockNumbers)
793+
794+
// Check if all blocks were fetched successfully
795+
allSuccess := true
796+
for _, result := range batchResults {
797+
if result.Error != nil {
798+
allSuccess = false
799+
break
800+
}
801+
}
802+
803+
if allSuccess {
804+
break
805+
}
806+
807+
if retry < 2 {
808+
log.Warn().
809+
Int64("batch", batchIdx).
810+
Int("retry", retry+1).
811+
Msg("Batch fetch failed, retrying...")
812+
time.Sleep(time.Duration(retry+1) * 100 * time.Millisecond)
813+
} else {
814+
fetchErr = fmt.Errorf("batch %d failed after 3 retries", batchIdx)
815+
}
816+
}
817+
818+
if fetchErr != nil {
819+
mu.Lock()
820+
fetchErrors = append(fetchErrors, fetchErr)
821+
mu.Unlock()
822+
return
823+
}
824+
825+
// Set values to the array
826+
mu.Lock()
827+
for i, result := range batchResults {
828+
arrayIndex := batchIdx*rpcBatchSize + int64(i)
829+
if arrayIndex < int64(len(blockDataArray)) {
830+
blockDataArray[arrayIndex] = result.Data
831+
}
832+
}
833+
mu.Unlock()
834+
835+
log.Debug().
836+
Int64("batch", batchIdx).
837+
Int("blocks_fetched", len(batchResults)).
838+
Msg("Completed batch fetch")
839+
}(batchIndex)
840+
}
841+
842+
// Wait for all goroutines to complete
843+
wg.Wait()
844+
845+
// Check for fetch errors
846+
if len(fetchErrors) > 0 {
847+
log.Error().
848+
Int("error_count", len(fetchErrors)).
849+
Msg("Some batches failed to fetch")
850+
for _, err := range fetchErrors {
851+
log.Error().Err(err).Msg("Batch fetch error")
852+
}
853+
log.Panic().Msg("Failed to fetch all required blocks")
854+
}
855+
856+
// Validate that all blocks are sequential and nothing is missing
857+
expectedBlockNumber := new(big.Int).Set(nextCommitBlockNumber)
858+
for i, blockData := range blockDataArray {
859+
if blockData.Block.Number == nil {
860+
log.Panic().
861+
Int("index", i).
862+
Str("expected_block", expectedBlockNumber.String()).
863+
Msg("Found nil block number in array")
864+
}
865+
866+
if blockData.Block.Number.Cmp(expectedBlockNumber) != 0 {
867+
log.Panic().
868+
Int("index", i).
869+
Str("expected_block", expectedBlockNumber.String()).
870+
Str("actual_block", blockData.Block.Number.String()).
871+
Msg("Block sequence mismatch - missing or out of order block")
872+
}
873+
874+
expectedBlockNumber.Add(expectedBlockNumber, big.NewInt(1))
875+
}
876+
877+
log.Info().
878+
Int("total_blocks", len(blockDataArray)).
879+
Str("start_block", nextCommitBlockNumber.String()).
880+
Str("end_block", new(big.Int).Sub(expectedBlockNumber, big.NewInt(1)).String()).
881+
Msg("All blocks validated successfully")
882+
883+
// Publish to Kafka
884+
log.Info().
885+
Int("blocks_to_publish", len(blockDataArray)).
886+
Msg("Publishing blocks to Kafka")
887+
888+
if err := kafkaPublisher.PublishBlockData(blockDataArray); err != nil {
889+
log.Panic().
890+
Err(err).
891+
Int("blocks_count", len(blockDataArray)).
892+
Msg("Failed to publish blocks to Kafka")
893+
}
894+
895+
log.Info().
896+
Int("blocks_published", len(blockDataArray)).
897+
Str("next_commit_block", expectedBlockNumber.String()).
898+
Msg("Successfully published blocks to Kafka")
899+
900+
// Update nextCommitBlockNumber for next iteration
901+
nextCommitBlockNumber.Set(expectedBlockNumber)
902+
}
903+
}

internal/committer/fetchLatest.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package committer
2+
3+
import (
4+
"context"
5+
"math/big"
6+
7+
"github.com/thirdweb-dev/indexer/internal/rpc"
8+
)
9+
10+
func FetchLatest(chainId *big.Int, rpc rpc.IRPCClient) error {
11+
for {
12+
latestBlock, err := rpc.GetLatestBlockNumber(context.Background())
13+
if err != nil {
14+
return err
15+
}
16+
if latestBlock.Cmp(chainId) > 0 {
17+
return nil
18+
}
19+
}
20+
return nil
21+
}

0 commit comments

Comments
 (0)