77 "sort"
88 "time"
99
10+ "github.com/google/uuid"
1011 "github.com/rs/zerolog/log"
1112 config "github.com/thirdweb-dev/indexer/configs"
1213 "github.com/thirdweb-dev/indexer/internal/common"
@@ -56,25 +57,27 @@ func (c *Committer) Start(ctx context.Context) {
5657 return
5758 default :
5859 time .Sleep (interval )
59- blockDataToCommit , err := c .getSequentialBlockDataToCommit ()
60+ commitID := uuid .New ().String ()
61+ log .Debug ().Msgf ("Starting commit. CommitID: %s. Timestamp %d" , commitID , time .Now ().UnixMilli ())
62+ blockDataToCommit , err := c .getSequentialBlockDataToCommit (commitID )
6063 if err != nil {
61- log .Error ().Err (err ).Msg ("Error getting block data to commit" )
64+ log .Error ().Err (err ).Msgf ("Error getting block data to commit. CommitID: %s. Timestamp %d" , commitID , time . Now (). UnixMilli () )
6265 continue
6366 }
6467 if blockDataToCommit == nil || len (* blockDataToCommit ) == 0 {
65- log .Debug ().Msg ("No block data to commit" )
68+ log .Debug ().Msgf ("No block data to commit. CommitID: %s. Timestamp %d" , commitID , time . Now (). UnixMilli () )
6669 continue
6770 }
68- if err := c .commit (blockDataToCommit ); err != nil {
69- log .Error ().Err (err ).Msg ("Error committing blocks" )
71+ if err := c .commit (commitID , blockDataToCommit ); err != nil {
72+ log .Error ().Err (err ).Msgf ("Error committing blocks. CommitID: %s. Timestamp %d" , commitID , time . Now (). UnixMilli () )
7073 }
7174 }
7275 }
7376}
7477
75- func (c * Committer ) getBlockNumbersToCommit () ([]* big.Int , error ) {
78+ func (c * Committer ) getBlockNumbersToCommit (commitID string ) ([]* big.Int , error ) {
7679 latestCommittedBlockNumber , err := c .storage .MainStorage .GetMaxBlockNumber (c .rpc .GetChainID ())
77- log .Info ().Msgf ("Committer found this max block number in main storage: %s" , latestCommittedBlockNumber .String ())
80+ log .Info ().Msgf ("Committer found this max block number in main storage: %s. CommitID: %s. Timestamp %d " , latestCommittedBlockNumber .String (), commitID , time . Now (). UnixMilli ())
7881 if err != nil {
7982 return nil , err
8083 }
@@ -96,8 +99,8 @@ func (c *Committer) getBlockNumbersToCommit() ([]*big.Int, error) {
9699 return blockNumbers , nil
97100}
98101
99- func (c * Committer ) getSequentialBlockDataToCommit () (* []common.BlockData , error ) {
100- blocksToCommit , err := c .getBlockNumbersToCommit ()
102+ func (c * Committer ) getSequentialBlockDataToCommit (commitID string ) (* []common.BlockData , error ) {
103+ blocksToCommit , err := c .getBlockNumbersToCommit (commitID )
101104 if err != nil {
102105 return nil , fmt .Errorf ("error determining blocks to commit: %v" , err )
103106 }
@@ -110,8 +113,8 @@ func (c *Committer) getSequentialBlockDataToCommit() (*[]common.BlockData, error
110113 return nil , fmt .Errorf ("error fetching blocks to commit: %v" , err )
111114 }
112115 if blocksData == nil || len (* blocksData ) == 0 {
113- log .Warn ().Msgf ("Committer didn't find the following range in staging: %v - %v" , blocksToCommit [0 ].Int64 (), blocksToCommit [len (blocksToCommit )- 1 ].Int64 ())
114- c .handleMissingStagingData (blocksToCommit )
116+ log .Warn ().Msgf ("Committer didn't find the following range in staging: %v - %v. CommitID: %s. Timestamp %d " , blocksToCommit [0 ].Int64 (), blocksToCommit [len (blocksToCommit )- 1 ].Int64 (), commitID , time . Now (). UnixMilli ())
117+ c .handleMissingStagingData (commitID , blocksToCommit )
115118 return nil , nil
116119 }
117120
@@ -121,7 +124,7 @@ func (c *Committer) getSequentialBlockDataToCommit() (*[]common.BlockData, error
121124 })
122125
123126 if (* blocksData )[0 ].Block .Number .Cmp (blocksToCommit [0 ]) != 0 {
124- return nil , c .handleGap (blocksToCommit [0 ], (* blocksData )[0 ].Block )
127+ return nil , c .handleGap (commitID , blocksToCommit [0 ], (* blocksData )[0 ].Block )
125128 }
126129
127130 var sequentialBlockData []common.BlockData
@@ -135,7 +138,7 @@ func (c *Committer) getSequentialBlockDataToCommit() (*[]common.BlockData, error
135138 }
136139 if (* blocksData )[i ].Block .Number .Cmp (expectedBlockNumber ) != 0 {
137140 // Note: Gap detected, stop here
138- log .Warn ().Msgf ("Gap detected at block %s, committing until %s" , expectedBlockNumber .String (), (* blocksData )[i - 1 ].Block .Number .String ())
141+ log .Warn ().Msgf ("Gap detected at block %s, committing until %s. CommitID: %s. Timestamp %d " , expectedBlockNumber .String (), (* blocksData )[i - 1 ].Block .Number .String (), commitID , time . Now (). UnixMilli ())
139142 // increment the a gap counter in prometheus
140143 metrics .GapCounter .Inc ()
141144 // record the first missed block number in prometheus
@@ -149,22 +152,24 @@ func (c *Committer) getSequentialBlockDataToCommit() (*[]common.BlockData, error
149152 return & sequentialBlockData , nil
150153}
151154
152- func (c * Committer ) commit (blockData * []common.BlockData ) error {
155+ func (c * Committer ) commit (commitID string , blockData * []common.BlockData ) error {
153156 blockNumbers := make ([]* big.Int , len (* blockData ))
154157 for i , block := range * blockData {
155158 blockNumbers [i ] = block .Block .Number
156159 }
157- log .Debug ().Msgf ("Committing %d blocks" , len (blockNumbers ))
160+ log .Debug ().Msgf ("Committing %d blocks: %v. CommitID: %s. Timestamp %d " , len (blockNumbers ), blockNumbers , commitID , time . Now (). UnixMilli ( ))
158161
159162 // TODO if next parts (saving or deleting) fail, we'll have to do a rollback
160163 if err := c .storage .MainStorage .InsertBlockData (blockData ); err != nil {
161- log .Error ().Err (err ).Msgf ("Failed to commit blocks: %v" , blockNumbers )
164+ log .Error ().Err (err ).Msgf ("Failed to commit blocks: %v. CommitID: %s. Timestamp %d " , blockNumbers , commitID , time . Now (). UnixMilli () )
162165 return fmt .Errorf ("error saving data to main storage: %v" , err )
163166 }
167+ log .Debug ().Msgf ("Committer inserted blocks: %v. CommitID: %s. Timestamp %d" , blockNumbers , commitID , time .Now ().UnixMilli ())
164168
165169 if err := c .storage .StagingStorage .DeleteStagingData (blockData ); err != nil {
166170 return fmt .Errorf ("error deleting data from staging storage: %v" , err )
167171 }
172+ log .Debug ().Msgf ("Committer deleted staging data for blocks: %v. CommitID: %s. Timestamp %d" , blockNumbers , commitID , time .Now ().UnixMilli ())
168173
169174 // Update metrics for successful commits
170175 metrics .SuccessfulCommits .Add (float64 (len (* blockData )))
@@ -173,7 +178,7 @@ func (c *Committer) commit(blockData *[]common.BlockData) error {
173178 return nil
174179}
175180
176- func (c * Committer ) handleGap (expectedStartBlockNumber * big.Int , actualFirstBlock common.Block ) error {
181+ func (c * Committer ) handleGap (commitID string , expectedStartBlockNumber * big.Int , actualFirstBlock common.Block ) error {
177182 // increment the a gap counter in prometheus
178183 metrics .GapCounter .Inc ()
179184 // record the first missed block number in prometheus
@@ -182,9 +187,9 @@ func (c *Committer) handleGap(expectedStartBlockNumber *big.Int, actualFirstBloc
182187 poller := NewBoundlessPoller (c .rpc , c .storage )
183188
184189 missingBlockCount := new (big.Int ).Sub (actualFirstBlock .Number , expectedStartBlockNumber ).Int64 ()
185- log .Debug ().Msgf ("Detected %d missing blocks between blocks %s and %s" , missingBlockCount , expectedStartBlockNumber .String (), actualFirstBlock .Number .String ())
190+ log .Debug ().Msgf ("Detected %d missing blocks between blocks %s and %s. CommitID: %s. Timestamp %d " , missingBlockCount , expectedStartBlockNumber .String (), actualFirstBlock .Number .String (), commitID , time . Now (). UnixMilli ())
186191 if missingBlockCount > poller .blocksPerPoll {
187- log .Debug ().Msgf ("Limiting polling missing blocks to %d blocks due to config" , poller .blocksPerPoll )
192+ log .Debug ().Msgf ("Limiting polling missing blocks to %d blocks due to config. CommitID: %s. Timestamp %d " , poller .blocksPerPoll , commitID , time . Now (). UnixMilli () )
188193 missingBlockCount = poller .blocksPerPoll
189194 }
190195 missingBlockNumbers := make ([]* big.Int , missingBlockCount )
@@ -193,29 +198,29 @@ func (c *Committer) handleGap(expectedStartBlockNumber *big.Int, actualFirstBloc
193198 missingBlockNumbers [i ] = missingBlockNumber
194199 }
195200
196- log .Debug ().Msgf ("Polling %d blocks while handling gap: %v" , len (missingBlockNumbers ), missingBlockNumbers )
201+ log .Debug ().Msgf ("Polling %d blocks while handling gap: %v. CommitID: %s. Timestamp %d " , len (missingBlockNumbers ), missingBlockNumbers , commitID , time . Now (). UnixMilli () )
197202 poller .Poll (missingBlockNumbers )
198203 return fmt .Errorf ("first block number (%s) in commit batch does not match expected (%s)" , actualFirstBlock .Number .String (), expectedStartBlockNumber .String ())
199204}
200205
201- func (c * Committer ) handleMissingStagingData (blocksToCommit []* big.Int ) {
206+ func (c * Committer ) handleMissingStagingData (commitID string , blocksToCommit []* big.Int ) {
202207 // Checks if there are any blocks in staging after the current range end
203208 lastStagedBlockNumber , err := c .storage .StagingStorage .GetLastStagedBlockNumber (c .rpc .GetChainID (), blocksToCommit [len (blocksToCommit )- 1 ], big .NewInt (0 ))
204209 if err != nil {
205210 log .Error ().Err (err ).Msg ("Error checking staged data for missing range" )
206211 return
207212 }
208213 if lastStagedBlockNumber == nil || lastStagedBlockNumber .Sign () <= 0 {
209- log .Debug ().Msgf ("Committer is caught up with staging. No need to poll for missing blocks." )
214+ log .Debug ().Msgf ("Committer is caught up with staging. No need to poll for missing blocks. CommitID: %s. Timestamp %d" , commitID , time . Now (). UnixMilli () )
210215 return
211216 }
212- log .Debug ().Msgf ("Detected missing blocks in staging data starting from %s." , blocksToCommit [0 ].String ())
217+ log .Debug ().Msgf ("Detected missing blocks in staging data starting from %s. CommitID: %s. Timestamp %d " , blocksToCommit [0 ].String (), commitID , time . Now (). UnixMilli ())
213218
214219 poller := NewBoundlessPoller (c .rpc , c .storage )
215220 blocksToPoll := blocksToCommit
216221 if len (blocksToCommit ) > int (poller .blocksPerPoll ) {
217222 blocksToPoll = blocksToCommit [:int (poller .blocksPerPoll )]
218223 }
219224 poller .Poll (blocksToPoll )
220- log .Debug ().Msgf ("Polled %d blocks due to committer detecting them as missing. Range: %s - %s" , len (blocksToPoll ), blocksToPoll [0 ].String (), blocksToPoll [len (blocksToPoll )- 1 ].String ())
225+ log .Debug ().Msgf ("Polled %d blocks due to committer detecting them as missing. Range: %s - %s. CommitID: %s. Timestamp %d " , len (blocksToPoll ), blocksToPoll [0 ].String (), blocksToPoll [len (blocksToPoll )- 1 ].String (), commitID , time . Now (). UnixMilli ())
221226}
0 commit comments