diff --git a/internal/storage/clickhouse.go b/internal/storage/clickhouse.go index d8b2313..beb4a9c 100644 --- a/internal/storage/clickhouse.go +++ b/internal/storage/clickhouse.go @@ -788,6 +788,19 @@ func (c *ClickHouseConnector) GetMaxBlockNumber(chainId *big.Int) (maxBlockNumbe return maxBlockNumber, nil } +func (c *ClickHouseConnector) getMaxBlockNumberConsistent(chainId *big.Int) (maxBlockNumber *big.Int, err error) { + tableName := c.getTableName(chainId, "blocks") + query := fmt.Sprintf("SELECT block_number FROM %s.%s WHERE chain_id = ? ORDER BY block_number DESC LIMIT 1 SETTINGS select_sequential_consistency = 1", c.cfg.Database, tableName) + err = c.conn.QueryRow(context.Background(), query, chainId).Scan(&maxBlockNumber) + if err != nil { + if err == sql.ErrNoRows { + return big.NewInt(0), nil + } + return nil, err + } + return maxBlockNumber, nil +} + func (c *ClickHouseConnector) GetLastStagedBlockNumber(chainId *big.Int, rangeStart *big.Int, rangeEnd *big.Int) (maxBlockNumber *big.Int, err error) { query := fmt.Sprintf("SELECT block_number FROM %s.block_data WHERE is_deleted = 0", c.cfg.Database) if chainId.Sign() > 0 { @@ -1136,7 +1149,8 @@ func (c *ClickHouseConnector) InsertBlockData(data []common.BlockData) error { return nil } - tableName := c.getTableName(data[0].Block.ChainId, "inserts_null_table") + chainId := data[0].Block.ChainId + tableName := c.getTableName(chainId, "inserts_null_table") columns := []string{ "chain_id", "block", "transactions", "logs", "traces", "sign", "insert_timestamp", } @@ -1286,7 +1300,27 @@ func (c *ClickHouseConnector) InsertBlockData(data []common.BlockData) error { } if err := batch.Send(); err != nil { - return err + // if insert errors, it can actually still succeed in the background + // so we need to check if the consistent highest block matches the batch before we return an error + var highestBlockInBatch *big.Int + for _, blockData := range data[i:end] { + if highestBlockInBatch == nil || blockData.Block.Number.Cmp(highestBlockInBatch) > 0 { + highestBlockInBatch = blockData.Block.Number + } + } + + time.Sleep(500 * time.Millisecond) + + // Check if this matches the max consistent block + maxConsistentBlock, maxBlockErr := c.getMaxBlockNumberConsistent(chainId) + if maxBlockErr != nil || maxConsistentBlock.Cmp(highestBlockInBatch) != 0 { + if maxBlockErr != nil { + zLog.Error().Err(maxBlockErr).Msgf("Error getting consistent max block number for chain %s", chainId.String()) + } + return err + } else { + zLog.Info().Err(err).Msgf("Failure while inserting block data, but insert still succeeded") + } } }