From 19f36fab36f1ed11ddc9b382d6f78316fd40e2ba Mon Sep 17 00:00:00 2001 From: Jake Loo <2171134+jakeloo@users.noreply.github.com> Date: Thu, 7 Aug 2025 15:02:28 +0000 Subject: [PATCH 1/2] Batch multi-rows queries for postgres --- internal/storage/postgres.go | 161 +++++++++++++---------------------- 1 file changed, 61 insertions(+), 100 deletions(-) diff --git a/internal/storage/postgres.go b/internal/storage/postgres.go index 5432a88..0598e19 100644 --- a/internal/storage/postgres.go +++ b/internal/storage/postgres.go @@ -150,41 +150,33 @@ func (p *PostgresConnector) StoreBlockFailures(failures []common.BlockFailure) e return nil } - tx, err := p.db.Begin() - if err != nil { - return err - } - defer tx.Rollback() - - query := `INSERT INTO block_failures (chain_id, block_number, last_error_timestamp, failure_count, reason) - VALUES ($1, $2, $3, $4, $5) - ON CONFLICT (chain_id, block_number) - DO UPDATE SET - last_error_timestamp = EXCLUDED.last_error_timestamp, - failure_count = EXCLUDED.failure_count, - reason = EXCLUDED.reason, - updated_at = NOW()` - - stmt, err := tx.Prepare(query) - if err != nil { - return err - } - defer stmt.Close() - - for _, failure := range failures { - _, err := stmt.Exec( + // Build multi-row INSERT without transaction for better performance + valueStrings := make([]string, 0, len(failures)) + valueArgs := make([]interface{}, 0, len(failures)*5) + + for i, failure := range failures { + valueStrings = append(valueStrings, fmt.Sprintf("($%d, $%d, $%d, $%d, $%d)", + i*5+1, i*5+2, i*5+3, i*5+4, i*5+5)) + valueArgs = append(valueArgs, failure.ChainId.String(), failure.BlockNumber.String(), failure.FailureTime.Unix(), failure.FailureCount, failure.FailureReason, ) - if err != nil { - return err - } } - return tx.Commit() + query := fmt.Sprintf(`INSERT INTO block_failures (chain_id, block_number, last_error_timestamp, failure_count, reason) + VALUES %s + ON CONFLICT (chain_id, block_number) + DO UPDATE SET + last_error_timestamp = EXCLUDED.last_error_timestamp, + failure_count = EXCLUDED.failure_count, + reason = EXCLUDED.reason, + updated_at = NOW()`, strings.Join(valueStrings, ",")) + + _, err := p.db.Exec(query, valueArgs...) + return err } func (p *PostgresConnector) DeleteBlockFailures(failures []common.BlockFailure) error { @@ -192,30 +184,20 @@ func (p *PostgresConnector) DeleteBlockFailures(failures []common.BlockFailure) return nil } - tx, err := p.db.Begin() - if err != nil { - return err - } - defer tx.Rollback() - - // Hard delete for block failures - query := `DELETE FROM block_failures - WHERE chain_id = $1 AND block_number = $2` - - stmt, err := tx.Prepare(query) - if err != nil { - return err - } - defer stmt.Close() - - for _, failure := range failures { - _, err := stmt.Exec(failure.ChainId.String(), failure.BlockNumber.String()) - if err != nil { - return err - } + // Build single DELETE query with all tuples + tuples := make([]string, 0, len(failures)) + args := make([]interface{}, 0, len(failures)*2) + + for i, failure := range failures { + tuples = append(tuples, fmt.Sprintf("($%d, $%d)", i*2+1, i*2+2)) + args = append(args, failure.ChainId.String(), failure.BlockNumber.String()) } - - return tx.Commit() + + query := fmt.Sprintf(`DELETE FROM block_failures + WHERE (chain_id, block_number) IN (%s)`, strings.Join(tuples, ",")) + + _, err := p.db.Exec(query, args...) + return err } func (p *PostgresConnector) GetLastReorgCheckedBlockNumber(chainId *big.Int) (*big.Int, error) { @@ -253,40 +235,32 @@ func (p *PostgresConnector) InsertStagingData(data []common.BlockData) error { return nil } - tx, err := p.db.Begin() - if err != nil { - return err - } - defer tx.Rollback() - - query := `INSERT INTO block_data (chain_id, block_number, data) - VALUES ($1, $2, $3) - ON CONFLICT (chain_id, block_number) - DO UPDATE SET data = EXCLUDED.data, updated_at = NOW()` - - stmt, err := tx.Prepare(query) - if err != nil { - return err - } - defer stmt.Close() - - for _, blockData := range data { + // Build multi-row INSERT without transaction for better performance + valueStrings := make([]string, 0, len(data)) + valueArgs := make([]interface{}, 0, len(data)*3) + + for i, blockData := range data { blockDataJSON, err := json.Marshal(blockData) if err != nil { return err } - - _, err = stmt.Exec( + + valueStrings = append(valueStrings, fmt.Sprintf("($%d, $%d, $%d)", + i*3+1, i*3+2, i*3+3)) + valueArgs = append(valueArgs, blockData.Block.ChainId.String(), blockData.Block.Number.String(), string(blockDataJSON), ) - if err != nil { - return err - } } - return tx.Commit() + query := fmt.Sprintf(`INSERT INTO block_data (chain_id, block_number, data) + VALUES %s + ON CONFLICT (chain_id, block_number) + DO UPDATE SET data = EXCLUDED.data, updated_at = NOW()`, strings.Join(valueStrings, ",")) + + _, err := p.db.Exec(query, valueArgs...) + return err } func (p *PostgresConnector) GetStagingData(qf QueryFilter) ([]common.BlockData, error) { @@ -354,33 +328,20 @@ func (p *PostgresConnector) DeleteStagingData(data []common.BlockData) error { return nil } - tx, err := p.db.Begin() - if err != nil { - return err - } - defer tx.Rollback() - - // Hard delete for staging data to save disk space - query := `DELETE FROM block_data - WHERE chain_id = $1 AND block_number = $2` - - stmt, err := tx.Prepare(query) - if err != nil { - return err - } - defer stmt.Close() - - for _, blockData := range data { - _, err := stmt.Exec( - blockData.Block.ChainId.String(), - blockData.Block.Number.String(), - ) - if err != nil { - return err - } + // Build single DELETE query with all tuples + tuples := make([]string, 0, len(data)) + args := make([]interface{}, 0, len(data)*2) + + for i, blockData := range data { + tuples = append(tuples, fmt.Sprintf("($%d, $%d)", i*2+1, i*2+2)) + args = append(args, blockData.Block.ChainId.String(), blockData.Block.Number.String()) } - - return tx.Commit() + + query := fmt.Sprintf(`DELETE FROM block_data + WHERE (chain_id, block_number) IN (%s)`, strings.Join(tuples, ",")) + + _, err := p.db.Exec(query, args...) + return err } func (p *PostgresConnector) GetLastStagedBlockNumber(chainId *big.Int, rangeStart *big.Int, rangeEnd *big.Int) (*big.Int, error) { From 1a67ed3dec8855b0cc41aabf1f353a8494efbb1d Mon Sep 17 00:00:00 2001 From: Jake Loo <2171134+jakeloo@users.noreply.github.com> Date: Thu, 7 Aug 2025 15:02:50 +0000 Subject: [PATCH 2/2] gofmt --- internal/storage/postgres.go | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/internal/storage/postgres.go b/internal/storage/postgres.go index 0598e19..9498e8d 100644 --- a/internal/storage/postgres.go +++ b/internal/storage/postgres.go @@ -153,11 +153,11 @@ func (p *PostgresConnector) StoreBlockFailures(failures []common.BlockFailure) e // Build multi-row INSERT without transaction for better performance valueStrings := make([]string, 0, len(failures)) valueArgs := make([]interface{}, 0, len(failures)*5) - + for i, failure := range failures { - valueStrings = append(valueStrings, fmt.Sprintf("($%d, $%d, $%d, $%d, $%d)", + valueStrings = append(valueStrings, fmt.Sprintf("($%d, $%d, $%d, $%d, $%d)", i*5+1, i*5+2, i*5+3, i*5+4, i*5+5)) - valueArgs = append(valueArgs, + valueArgs = append(valueArgs, failure.ChainId.String(), failure.BlockNumber.String(), failure.FailureTime.Unix(), @@ -187,15 +187,15 @@ func (p *PostgresConnector) DeleteBlockFailures(failures []common.BlockFailure) // Build single DELETE query with all tuples tuples := make([]string, 0, len(failures)) args := make([]interface{}, 0, len(failures)*2) - + for i, failure := range failures { tuples = append(tuples, fmt.Sprintf("($%d, $%d)", i*2+1, i*2+2)) args = append(args, failure.ChainId.String(), failure.BlockNumber.String()) } - + query := fmt.Sprintf(`DELETE FROM block_failures WHERE (chain_id, block_number) IN (%s)`, strings.Join(tuples, ",")) - + _, err := p.db.Exec(query, args...) return err } @@ -238,14 +238,14 @@ func (p *PostgresConnector) InsertStagingData(data []common.BlockData) error { // Build multi-row INSERT without transaction for better performance valueStrings := make([]string, 0, len(data)) valueArgs := make([]interface{}, 0, len(data)*3) - + for i, blockData := range data { blockDataJSON, err := json.Marshal(blockData) if err != nil { return err } - - valueStrings = append(valueStrings, fmt.Sprintf("($%d, $%d, $%d)", + + valueStrings = append(valueStrings, fmt.Sprintf("($%d, $%d, $%d)", i*3+1, i*3+2, i*3+3)) valueArgs = append(valueArgs, blockData.Block.ChainId.String(), @@ -331,15 +331,15 @@ func (p *PostgresConnector) DeleteStagingData(data []common.BlockData) error { // Build single DELETE query with all tuples tuples := make([]string, 0, len(data)) args := make([]interface{}, 0, len(data)*2) - + for i, blockData := range data { tuples = append(tuples, fmt.Sprintf("($%d, $%d)", i*2+1, i*2+2)) args = append(args, blockData.Block.ChainId.String(), blockData.Block.Number.String()) } - + query := fmt.Sprintf(`DELETE FROM block_data WHERE (chain_id, block_number) IN (%s)`, strings.Join(tuples, ",")) - + _, err := p.db.Exec(query, args...) return err }