@@ -788,6 +788,19 @@ func (c *ClickHouseConnector) GetMaxBlockNumber(chainId *big.Int) (maxBlockNumbe
788788 return maxBlockNumber , nil
789789}
790790
791+ func (c * ClickHouseConnector ) getMaxBlockNumberConsistent (chainId * big.Int ) (maxBlockNumber * big.Int , err error ) {
792+ tableName := c .getTableName (chainId , "blocks" )
793+ query := fmt .Sprintf ("SELECT block_number FROM %s.%s WHERE chain_id = ? ORDER BY block_number DESC LIMIT 1 SETTINGS select_sequential_consistency = 1" , c .cfg .Database , tableName )
794+ err = c .conn .QueryRow (context .Background (), query , chainId ).Scan (& maxBlockNumber )
795+ if err != nil {
796+ if err == sql .ErrNoRows {
797+ return big .NewInt (0 ), nil
798+ }
799+ return nil , err
800+ }
801+ return maxBlockNumber , nil
802+ }
803+
791804func (c * ClickHouseConnector ) GetLastStagedBlockNumber (chainId * big.Int , rangeStart * big.Int , rangeEnd * big.Int ) (maxBlockNumber * big.Int , err error ) {
792805 query := fmt .Sprintf ("SELECT block_number FROM %s.block_data WHERE is_deleted = 0" , c .cfg .Database )
793806 if chainId .Sign () > 0 {
@@ -1136,7 +1149,8 @@ func (c *ClickHouseConnector) InsertBlockData(data []common.BlockData) error {
11361149 return nil
11371150 }
11381151
1139- tableName := c .getTableName (data [0 ].Block .ChainId , "inserts_null_table" )
1152+ chainId := data [0 ].Block .ChainId
1153+ tableName := c .getTableName (chainId , "inserts_null_table" )
11401154 columns := []string {
11411155 "chain_id" , "block" , "transactions" , "logs" , "traces" , "sign" , "insert_timestamp" ,
11421156 }
@@ -1286,7 +1300,27 @@ func (c *ClickHouseConnector) InsertBlockData(data []common.BlockData) error {
12861300 }
12871301
12881302 if err := batch .Send (); err != nil {
1289- return err
1303+ // if insert errors, it can actually still succeed in the background
1304+ // so we need to check if the consistent highest block matches the batch before we return an error
1305+ var highestBlockInBatch * big.Int
1306+ for _ , blockData := range data [i :end ] {
1307+ if highestBlockInBatch == nil || blockData .Block .Number .Cmp (highestBlockInBatch ) > 0 {
1308+ highestBlockInBatch = blockData .Block .Number
1309+ }
1310+ }
1311+
1312+ time .Sleep (500 * time .Millisecond )
1313+
1314+ // Check if this matches the max consistent block
1315+ maxConsistentBlock , maxBlockErr := c .getMaxBlockNumberConsistent (chainId )
1316+ if maxBlockErr != nil || maxConsistentBlock .Cmp (highestBlockInBatch ) != 0 {
1317+ if maxBlockErr != nil {
1318+ zLog .Error ().Err (maxBlockErr ).Msgf ("Error getting consistent max block number for chain %s" , chainId .String ())
1319+ }
1320+ return err
1321+ } else {
1322+ zLog .Info ().Err (err ).Msgf ("Failure while inserting block data, but insert still succeeded" )
1323+ }
12901324 }
12911325 }
12921326
0 commit comments