Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 53 additions & 92 deletions internal/storage/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,72 +150,54 @@ func (p *PostgresConnector) StoreBlockFailures(failures []common.BlockFailure) e
return nil
}

tx, err := p.db.Begin()
if err != nil {
return err
}
defer tx.Rollback()

query := `INSERT INTO block_failures (chain_id, block_number, last_error_timestamp, failure_count, reason)
VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (chain_id, block_number)
DO UPDATE SET
last_error_timestamp = EXCLUDED.last_error_timestamp,
failure_count = EXCLUDED.failure_count,
reason = EXCLUDED.reason,
updated_at = NOW()`

stmt, err := tx.Prepare(query)
if err != nil {
return err
}
defer stmt.Close()

for _, failure := range failures {
_, err := stmt.Exec(
// Build multi-row INSERT without transaction for better performance
valueStrings := make([]string, 0, len(failures))
valueArgs := make([]interface{}, 0, len(failures)*5)

for i, failure := range failures {
valueStrings = append(valueStrings, fmt.Sprintf("($%d, $%d, $%d, $%d, $%d)",
i*5+1, i*5+2, i*5+3, i*5+4, i*5+5))
valueArgs = append(valueArgs,
failure.ChainId.String(),
failure.BlockNumber.String(),
failure.FailureTime.Unix(),
failure.FailureCount,
failure.FailureReason,
)
if err != nil {
return err
}
}

return tx.Commit()
query := fmt.Sprintf(`INSERT INTO block_failures (chain_id, block_number, last_error_timestamp, failure_count, reason)
VALUES %s
ON CONFLICT (chain_id, block_number)
DO UPDATE SET
last_error_timestamp = EXCLUDED.last_error_timestamp,
failure_count = EXCLUDED.failure_count,
reason = EXCLUDED.reason,
updated_at = NOW()`, strings.Join(valueStrings, ","))

_, err := p.db.Exec(query, valueArgs...)
return err
}

func (p *PostgresConnector) DeleteBlockFailures(failures []common.BlockFailure) error {
if len(failures) == 0 {
return nil
}

tx, err := p.db.Begin()
if err != nil {
return err
}
defer tx.Rollback()

// Hard delete for block failures
query := `DELETE FROM block_failures
WHERE chain_id = $1 AND block_number = $2`
// Build single DELETE query with all tuples
tuples := make([]string, 0, len(failures))
args := make([]interface{}, 0, len(failures)*2)

stmt, err := tx.Prepare(query)
if err != nil {
return err
for i, failure := range failures {
tuples = append(tuples, fmt.Sprintf("($%d, $%d)", i*2+1, i*2+2))
args = append(args, failure.ChainId.String(), failure.BlockNumber.String())
}
defer stmt.Close()

for _, failure := range failures {
_, err := stmt.Exec(failure.ChainId.String(), failure.BlockNumber.String())
if err != nil {
return err
}
}
query := fmt.Sprintf(`DELETE FROM block_failures
WHERE (chain_id, block_number) IN (%s)`, strings.Join(tuples, ","))

return tx.Commit()
_, err := p.db.Exec(query, args...)
return err
}

func (p *PostgresConnector) GetLastReorgCheckedBlockNumber(chainId *big.Int) (*big.Int, error) {
Expand Down Expand Up @@ -253,40 +235,32 @@ func (p *PostgresConnector) InsertStagingData(data []common.BlockData) error {
return nil
}

tx, err := p.db.Begin()
if err != nil {
return err
}
defer tx.Rollback()

query := `INSERT INTO block_data (chain_id, block_number, data)
VALUES ($1, $2, $3)
ON CONFLICT (chain_id, block_number)
DO UPDATE SET data = EXCLUDED.data, updated_at = NOW()`

stmt, err := tx.Prepare(query)
if err != nil {
return err
}
defer stmt.Close()
// Build multi-row INSERT without transaction for better performance
valueStrings := make([]string, 0, len(data))
valueArgs := make([]interface{}, 0, len(data)*3)

for _, blockData := range data {
for i, blockData := range data {
blockDataJSON, err := json.Marshal(blockData)
if err != nil {
return err
}

_, err = stmt.Exec(
valueStrings = append(valueStrings, fmt.Sprintf("($%d, $%d, $%d)",
i*3+1, i*3+2, i*3+3))
valueArgs = append(valueArgs,
blockData.Block.ChainId.String(),
blockData.Block.Number.String(),
string(blockDataJSON),
)
if err != nil {
return err
}
}

return tx.Commit()
query := fmt.Sprintf(`INSERT INTO block_data (chain_id, block_number, data)
VALUES %s
ON CONFLICT (chain_id, block_number)
DO UPDATE SET data = EXCLUDED.data, updated_at = NOW()`, strings.Join(valueStrings, ","))

_, err := p.db.Exec(query, valueArgs...)
return err
}

func (p *PostgresConnector) GetStagingData(qf QueryFilter) ([]common.BlockData, error) {
Expand Down Expand Up @@ -354,33 +328,20 @@ func (p *PostgresConnector) DeleteStagingData(data []common.BlockData) error {
return nil
}

tx, err := p.db.Begin()
if err != nil {
return err
}
defer tx.Rollback()
// Build single DELETE query with all tuples
tuples := make([]string, 0, len(data))
args := make([]interface{}, 0, len(data)*2)

// Hard delete for staging data to save disk space
query := `DELETE FROM block_data
WHERE chain_id = $1 AND block_number = $2`

stmt, err := tx.Prepare(query)
if err != nil {
return err
for i, blockData := range data {
tuples = append(tuples, fmt.Sprintf("($%d, $%d)", i*2+1, i*2+2))
args = append(args, blockData.Block.ChainId.String(), blockData.Block.Number.String())
}
defer stmt.Close()

for _, blockData := range data {
_, err := stmt.Exec(
blockData.Block.ChainId.String(),
blockData.Block.Number.String(),
)
if err != nil {
return err
}
}
query := fmt.Sprintf(`DELETE FROM block_data
WHERE (chain_id, block_number) IN (%s)`, strings.Join(tuples, ","))

return tx.Commit()
_, err := p.db.Exec(query, args...)
return err
}

func (p *PostgresConnector) GetLastStagedBlockNumber(chainId *big.Int, rangeStart *big.Int, rangeEnd *big.Int) (*big.Int, error) {
Expand Down