Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 36 additions & 2 deletions internal/storage/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -788,6 +788,19 @@ func (c *ClickHouseConnector) GetMaxBlockNumber(chainId *big.Int) (maxBlockNumbe
return maxBlockNumber, nil
}

func (c *ClickHouseConnector) getMaxBlockNumberConsistent(chainId *big.Int) (maxBlockNumber *big.Int, err error) {
tableName := c.getTableName(chainId, "blocks")
query := fmt.Sprintf("SELECT block_number FROM %s.%s WHERE chain_id = ? ORDER BY block_number DESC LIMIT 1 SETTINGS select_sequential_consistency = 1", c.cfg.Database, tableName)
err = c.conn.QueryRow(context.Background(), query, chainId).Scan(&maxBlockNumber)
if err != nil {
if err == sql.ErrNoRows {
return big.NewInt(0), nil
}
return nil, err
}
return maxBlockNumber, nil
}

func (c *ClickHouseConnector) GetLastStagedBlockNumber(chainId *big.Int, rangeStart *big.Int, rangeEnd *big.Int) (maxBlockNumber *big.Int, err error) {
query := fmt.Sprintf("SELECT block_number FROM %s.block_data WHERE is_deleted = 0", c.cfg.Database)
if chainId.Sign() > 0 {
Expand Down Expand Up @@ -1136,7 +1149,8 @@ func (c *ClickHouseConnector) InsertBlockData(data []common.BlockData) error {
return nil
}

tableName := c.getTableName(data[0].Block.ChainId, "inserts_null_table")
chainId := data[0].Block.ChainId
tableName := c.getTableName(chainId, "inserts_null_table")
columns := []string{
"chain_id", "block", "transactions", "logs", "traces", "sign", "insert_timestamp",
}
Expand Down Expand Up @@ -1286,7 +1300,27 @@ func (c *ClickHouseConnector) InsertBlockData(data []common.BlockData) error {
}

if err := batch.Send(); err != nil {
return err
// if insert errors, it can actually still succeed in the background
// so we need to check if the consistent highest block matches the batch before we return an error
var highestBlockInBatch *big.Int
for _, blockData := range data[i:end] {
if highestBlockInBatch == nil || blockData.Block.Number.Cmp(highestBlockInBatch) > 0 {
highestBlockInBatch = blockData.Block.Number
}
}

time.Sleep(500 * time.Millisecond)

// Check if this matches the max consistent block
maxConsistentBlock, maxBlockErr := c.getMaxBlockNumberConsistent(chainId)
if maxBlockErr != nil || maxConsistentBlock.Cmp(highestBlockInBatch) != 0 {
if maxBlockErr != nil {
zLog.Error().Err(maxBlockErr).Msgf("Error getting consistent max block number for chain %s", chainId.String())
}
return err
} else {
zLog.Info().Err(err).Msgf("Failure while inserting block data, but insert still succeeded")
}
}
}

Expand Down