77 "sort"
88 "time"
99
10+ "github.com/google/uuid"
1011 "github.com/rs/zerolog/log"
1112 config "github.com/thirdweb-dev/indexer/configs"
1213 "github.com/thirdweb-dev/indexer/internal/common"
@@ -57,7 +58,8 @@ func (c *Committer) Start(ctx context.Context) {
5758 log .Info ().Msg ("Committer shutting down" )
5859 return
5960 case <- ticker .C :
60- blockDataToCommit , err := c .getSequentialBlockDataToCommit ()
61+ commitID := uuid .New ().String ()
62+ blockDataToCommit , err := c .getSequentialBlockDataToCommit (commitID )
6163 if err != nil {
6264 log .Error ().Err (err ).Msg ("Error getting block data to commit" )
6365 continue
@@ -66,16 +68,16 @@ func (c *Committer) Start(ctx context.Context) {
6668 log .Debug ().Msg ("No block data to commit" )
6769 continue
6870 }
69- if err := c .commit (blockDataToCommit ); err != nil {
71+ if err := c .commit (commitID , blockDataToCommit ); err != nil {
7072 log .Error ().Err (err ).Msg ("Error committing blocks" )
7173 }
7274 }
7375 }
7476}
7577
76- func (c * Committer ) getBlockNumbersToCommit () ([]* big.Int , error ) {
78+ func (c * Committer ) getBlockNumbersToCommit (commitID string ) ([]* big.Int , error ) {
7779 latestCommittedBlockNumber , err := c .storage .MainStorage .GetMaxBlockNumber (c .rpc .GetChainID ())
78- log .Info ().Msgf ("Committer found this max block number in main storage: %s" , latestCommittedBlockNumber .String ())
80+ log .Info ().Str ( "commitID" , commitID ). Msgf ("Committer found this max block number in main storage: %s" , latestCommittedBlockNumber .String ())
7981 if err != nil {
8082 return nil , err
8183 }
@@ -97,8 +99,8 @@ func (c *Committer) getBlockNumbersToCommit() ([]*big.Int, error) {
9799 return blockNumbers , nil
98100}
99101
100- func (c * Committer ) getSequentialBlockDataToCommit () (* []common.BlockData , error ) {
101- blocksToCommit , err := c .getBlockNumbersToCommit ()
102+ func (c * Committer ) getSequentialBlockDataToCommit (commitID string ) (* []common.BlockData , error ) {
103+ blocksToCommit , err := c .getBlockNumbersToCommit (commitID )
102104 if err != nil {
103105 return nil , fmt .Errorf ("error determining blocks to commit: %v" , err )
104106 }
@@ -111,8 +113,8 @@ func (c *Committer) getSequentialBlockDataToCommit() (*[]common.BlockData, error
111113 return nil , fmt .Errorf ("error fetching blocks to commit: %v" , err )
112114 }
113115 if blocksData == nil || len (* blocksData ) == 0 {
114- log .Warn ().Msgf ("Committer didn't find the following range in staging: %v - %v" , blocksToCommit [0 ].Int64 (), blocksToCommit [len (blocksToCommit )- 1 ].Int64 ())
115- c .handleMissingStagingData (blocksToCommit )
116+ log .Warn ().Str ( "commitID" , commitID ). Msgf ("Committer didn't find the following range in staging: %v - %v" , blocksToCommit [0 ].Int64 (), blocksToCommit [len (blocksToCommit )- 1 ].Int64 ())
117+ c .handleMissingStagingData (commitID , blocksToCommit )
116118 return nil , nil
117119 }
118120
@@ -136,7 +138,7 @@ func (c *Committer) getSequentialBlockDataToCommit() (*[]common.BlockData, error
136138 }
137139 if (* blocksData )[i ].Block .Number .Cmp (expectedBlockNumber ) != 0 {
138140 // Note: Gap detected, stop here
139- log .Warn ().Msgf ("Gap detected at block %s, committing until %s" , expectedBlockNumber .String (), (* blocksData )[i - 1 ].Block .Number .String ())
141+ log .Warn ().Str ( "commitID" , commitID ). Msgf ("Gap detected at block %s, committing until %s" , expectedBlockNumber .String (), (* blocksData )[i - 1 ].Block .Number .String ())
140142 // increment the a gap counter in prometheus
141143 metrics .GapCounter .Inc ()
142144 // record the first missed block number in prometheus
@@ -150,22 +152,24 @@ func (c *Committer) getSequentialBlockDataToCommit() (*[]common.BlockData, error
150152 return & sequentialBlockData , nil
151153}
152154
153- func (c * Committer ) commit (blockData * []common.BlockData ) error {
155+ func (c * Committer ) commit (commitID string , blockData * []common.BlockData ) error {
154156 blockNumbers := make ([]* big.Int , len (* blockData ))
155157 for i , block := range * blockData {
156158 blockNumbers [i ] = block .Block .Number
157159 }
158- log .Debug ().Msgf ("Committing %d blocks" , len (blockNumbers ))
160+ log .Debug ().Str ( "commitID" , commitID ). Msgf ("Committing %d blocks: %v " , len (blockNumbers ), blockNumbers )
159161
160162 // TODO if next parts (saving or deleting) fail, we'll have to do a rollback
161163 if err := c .storage .MainStorage .InsertBlockData (blockData ); err != nil {
162164 log .Error ().Err (err ).Msgf ("Failed to commit blocks: %v" , blockNumbers )
163165 return fmt .Errorf ("error saving data to main storage: %v" , err )
164166 }
167+ log .Debug ().Str ("commitID" , commitID ).Msgf ("Committer inserted blocks: %v" , blockNumbers )
165168
166169 if err := c .storage .StagingStorage .DeleteStagingData (blockData ); err != nil {
167170 return fmt .Errorf ("error deleting data from staging storage: %v" , err )
168171 }
172+ log .Debug ().Str ("commitID" , commitID ).Msgf ("Committer deleted staging data for blocks: %v" , blockNumbers )
169173
170174 // Update metrics for successful commits
171175 metrics .SuccessfulCommits .Add (float64 (len (* blockData )))
@@ -199,24 +203,24 @@ func (c *Committer) handleGap(expectedStartBlockNumber *big.Int, actualFirstBloc
199203 return fmt .Errorf ("first block number (%s) in commit batch does not match expected (%s)" , actualFirstBlock .Number .String (), expectedStartBlockNumber .String ())
200204}
201205
202- func (c * Committer ) handleMissingStagingData (blocksToCommit []* big.Int ) {
206+ func (c * Committer ) handleMissingStagingData (commitID string , blocksToCommit []* big.Int ) {
203207 // Checks if there are any blocks in staging after the current range end
204208 lastStagedBlockNumber , err := c .storage .StagingStorage .GetLastStagedBlockNumber (c .rpc .GetChainID (), blocksToCommit [len (blocksToCommit )- 1 ], big .NewInt (0 ))
205209 if err != nil {
206210 log .Error ().Err (err ).Msg ("Error checking staged data for missing range" )
207211 return
208212 }
209213 if lastStagedBlockNumber == nil || lastStagedBlockNumber .Sign () <= 0 {
210- log .Debug ().Msgf ("Committer is caught up with staging. No need to poll for missing blocks." )
214+ log .Debug ().Str ( "commitID" , commitID ). Msgf ("Committer is caught up with staging. No need to poll for missing blocks." )
211215 return
212216 }
213- log .Debug ().Msgf ("Detected missing blocks in staging data starting from %s." , blocksToCommit [0 ].String ())
217+ log .Debug ().Str ( "commitID" , commitID ). Msgf ("Detected missing blocks in staging data starting from %s." , blocksToCommit [0 ].String ())
214218
215219 poller := NewBoundlessPoller (c .rpc , c .storage )
216220 blocksToPoll := blocksToCommit
217221 if len (blocksToCommit ) > int (poller .blocksPerPoll ) {
218222 blocksToPoll = blocksToCommit [:int (poller .blocksPerPoll )]
219223 }
220224 poller .Poll (blocksToPoll )
221- log .Debug ().Msgf ("Polled %d blocks due to committer detecting them as missing. Range: %s - %s" , len (blocksToPoll ), blocksToPoll [0 ].String (), blocksToPoll [len (blocksToPoll )- 1 ].String ())
225+ log .Debug ().Str ( "commitID" , commitID ). Msgf ("Polled %d blocks due to committer detecting them as missing. Range: %s - %s" , len (blocksToPoll ), blocksToPoll [0 ].String (), blocksToPoll [len (blocksToPoll )- 1 ].String ())
222226}
0 commit comments