@@ -268,41 +268,19 @@ func (p *KafkaPublisher) createBlockDataMessage(block *common.BlockData, isDelet
268268 return nil , fmt .Errorf ("failed to marshal block data: %v" , err )
269269 }
270270
271- log .Debug ().
272- Uint64 ("chain_id" , data .ChainId ).
273- Uint64 ("block_number" , block .Block .Number .Uint64 ()).
274- Int ("tx_count" , len (block .Transactions )).
275- Int ("log_count" , len (block .Logs )).
276- Int ("trace_count" , len (block .Traces )).
277- Bool ("is_deleted" , isDeleted ).
278- Bool ("is_reorg" , isReorg ).
279- Msg ("KafkaPublisher Message: Block metadata" )
280-
281- return p .createRecord (data .GetType (), data .ChainId , block .Block .Number .Uint64 (), timestamp , isDeleted , isReorg , msgJson )
282- }
283-
284- func (p * KafkaPublisher ) createBlockRevertMessage (chainId uint64 , blockNumber uint64 ) (* kgo.Record , error ) {
285- timestamp := time .Now ()
286-
287- data := PublishableMessageRevert {
288- ChainId : chainId ,
289- BlockNumber : blockNumber ,
290- IsDeleted : 0 ,
291- InsertTimestamp : timestamp ,
292- }
293-
294- msg := PublishableMessagePayload {
295- Data : data ,
296- Type : data .GetType (),
297- Timestamp : timestamp ,
298- }
299-
300- msgJson , err := json .Marshal (msg )
301- if err != nil {
302- return nil , fmt .Errorf ("failed to marshal block data: %v" , err )
271+ if isReorg {
272+ log .Debug ().
273+ Uint64 ("chain_id" , data .ChainId ).
274+ Uint64 ("block_number" , block .Block .Number .Uint64 ()).
275+ Int ("tx_count" , len (block .Transactions )).
276+ Int ("log_count" , len (block .Logs )).
277+ Int ("trace_count" , len (block .Traces )).
278+ Bool ("is_deleted" , isDeleted ).
279+ Bool ("is_reorg" , isReorg ).
280+ Msg ("KafkaPublisher Message Reorg: Block metadata" )
303281 }
304282
305- return p .createRecord (data .GetType (), chainId , blockNumber , timestamp , false , false , msgJson )
283+ return p .createRecord (data .GetType (), data . ChainId , block . Block . Number . Uint64 () , timestamp , isDeleted , isReorg , msgJson )
306284}
307285
308286func (p * KafkaPublisher ) createRecord (msgType MessageType , chainId uint64 , blockNumber uint64 , timestamp time.Time , isDeleted bool , isReorg bool , msgJson []byte ) (* kgo.Record , error ) {
0 commit comments