@@ -15,6 +15,7 @@ import (
1515 "github.com/ClickHouse/clickhouse-go/v2"
1616 "github.com/ClickHouse/clickhouse-go/v2/lib/driver"
1717 ethereum "github.com/ethereum/go-ethereum/common"
18+ "github.com/rs/zerolog/log"
1819 zLog "github.com/rs/zerolog/log"
1920 config "github.com/thirdweb-dev/indexer/configs"
2021 "github.com/thirdweb-dev/indexer/internal/common"
@@ -788,6 +789,19 @@ func (c *ClickHouseConnector) GetMaxBlockNumber(chainId *big.Int) (maxBlockNumbe
788789 return maxBlockNumber , nil
789790}
790791
792+ func (c * ClickHouseConnector ) getMaxBlockNumberConsistent (chainId * big.Int ) (maxBlockNumber * big.Int , err error ) {
793+ tableName := c .getTableName (chainId , "blocks" )
794+ query := fmt .Sprintf ("SELECT block_number FROM %s.%s WHERE chain_id = ? ORDER BY block_number DESC LIMIT 1 SETTINGS select_sequential_consistency = 1" , c .cfg .Database , tableName )
795+ err = c .conn .QueryRow (context .Background (), query , chainId ).Scan (& maxBlockNumber )
796+ if err != nil {
797+ if err == sql .ErrNoRows {
798+ return big .NewInt (0 ), nil
799+ }
800+ return nil , err
801+ }
802+ return maxBlockNumber , nil
803+ }
804+
791805func (c * ClickHouseConnector ) GetLastStagedBlockNumber (chainId * big.Int , rangeStart * big.Int , rangeEnd * big.Int ) (maxBlockNumber * big.Int , err error ) {
792806 query := fmt .Sprintf ("SELECT block_number FROM %s.block_data WHERE is_deleted = 0" , c .cfg .Database )
793807 if chainId .Sign () > 0 {
@@ -1136,7 +1150,8 @@ func (c *ClickHouseConnector) InsertBlockData(data []common.BlockData) error {
11361150 return nil
11371151 }
11381152
1139- tableName := c .getTableName (data [0 ].Block .ChainId , "inserts_null_table" )
1153+ chainId := data [0 ].Block .ChainId
1154+ tableName := c .getTableName (chainId , "inserts_null_table" )
11401155 columns := []string {
11411156 "chain_id" , "block" , "transactions" , "logs" , "traces" , "sign" , "insert_timestamp" ,
11421157 }
@@ -1286,7 +1301,27 @@ func (c *ClickHouseConnector) InsertBlockData(data []common.BlockData) error {
12861301 }
12871302
12881303 if err := batch .Send (); err != nil {
1289- return err
1304+ // if insert errors, it can actually still succeed in the background
1305+ // so we need to check if the consistent highest block matches the batch before we return an error
1306+ var highestBlockInBatch * big.Int
1307+ for _ , blockData := range data [i :end ] {
1308+ if highestBlockInBatch == nil || blockData .Block .Number .Cmp (highestBlockInBatch ) > 0 {
1309+ highestBlockInBatch = blockData .Block .Number
1310+ }
1311+ }
1312+
1313+ time .Sleep (500 * time .Millisecond )
1314+
1315+ // Check if this matches the max consistent block
1316+ maxConsistentBlock , maxBlockErr := c .getMaxBlockNumberConsistent (chainId )
1317+ if maxBlockErr != nil || maxConsistentBlock .Cmp (highestBlockInBatch ) != 0 {
1318+ if maxBlockErr != nil {
1319+ log .Error ().Err (maxBlockErr ).Msgf ("Error getting consistent max block number for chain %s" , chainId .String ())
1320+ }
1321+ return err
1322+ } else {
1323+ log .Info ().Err (err ).Msgf ("Failure while inserting block data, but insert still succeeded" )
1324+ }
12901325 }
12911326 }
12921327
0 commit comments