diff --git a/internal/storage/postgres.go b/internal/storage/postgres.go index 5432a88..9498e8d 100644 --- a/internal/storage/postgres.go +++ b/internal/storage/postgres.go @@ -150,41 +150,33 @@ func (p *PostgresConnector) StoreBlockFailures(failures []common.BlockFailure) e return nil } - tx, err := p.db.Begin() - if err != nil { - return err - } - defer tx.Rollback() - - query := `INSERT INTO block_failures (chain_id, block_number, last_error_timestamp, failure_count, reason) - VALUES ($1, $2, $3, $4, $5) - ON CONFLICT (chain_id, block_number) - DO UPDATE SET - last_error_timestamp = EXCLUDED.last_error_timestamp, - failure_count = EXCLUDED.failure_count, - reason = EXCLUDED.reason, - updated_at = NOW()` - - stmt, err := tx.Prepare(query) - if err != nil { - return err - } - defer stmt.Close() - - for _, failure := range failures { - _, err := stmt.Exec( + // Build multi-row INSERT without transaction for better performance + valueStrings := make([]string, 0, len(failures)) + valueArgs := make([]interface{}, 0, len(failures)*5) + + for i, failure := range failures { + valueStrings = append(valueStrings, fmt.Sprintf("($%d, $%d, $%d, $%d, $%d)", + i*5+1, i*5+2, i*5+3, i*5+4, i*5+5)) + valueArgs = append(valueArgs, failure.ChainId.String(), failure.BlockNumber.String(), failure.FailureTime.Unix(), failure.FailureCount, failure.FailureReason, ) - if err != nil { - return err - } } - return tx.Commit() + query := fmt.Sprintf(`INSERT INTO block_failures (chain_id, block_number, last_error_timestamp, failure_count, reason) + VALUES %s + ON CONFLICT (chain_id, block_number) + DO UPDATE SET + last_error_timestamp = EXCLUDED.last_error_timestamp, + failure_count = EXCLUDED.failure_count, + reason = EXCLUDED.reason, + updated_at = NOW()`, strings.Join(valueStrings, ",")) + + _, err := p.db.Exec(query, valueArgs...) + return err } func (p *PostgresConnector) DeleteBlockFailures(failures []common.BlockFailure) error { @@ -192,30 +184,20 @@ func (p *PostgresConnector) DeleteBlockFailures(failures []common.BlockFailure) return nil } - tx, err := p.db.Begin() - if err != nil { - return err - } - defer tx.Rollback() - - // Hard delete for block failures - query := `DELETE FROM block_failures - WHERE chain_id = $1 AND block_number = $2` + // Build single DELETE query with all tuples + tuples := make([]string, 0, len(failures)) + args := make([]interface{}, 0, len(failures)*2) - stmt, err := tx.Prepare(query) - if err != nil { - return err + for i, failure := range failures { + tuples = append(tuples, fmt.Sprintf("($%d, $%d)", i*2+1, i*2+2)) + args = append(args, failure.ChainId.String(), failure.BlockNumber.String()) } - defer stmt.Close() - for _, failure := range failures { - _, err := stmt.Exec(failure.ChainId.String(), failure.BlockNumber.String()) - if err != nil { - return err - } - } + query := fmt.Sprintf(`DELETE FROM block_failures + WHERE (chain_id, block_number) IN (%s)`, strings.Join(tuples, ",")) - return tx.Commit() + _, err := p.db.Exec(query, args...) + return err } func (p *PostgresConnector) GetLastReorgCheckedBlockNumber(chainId *big.Int) (*big.Int, error) { @@ -253,40 +235,32 @@ func (p *PostgresConnector) InsertStagingData(data []common.BlockData) error { return nil } - tx, err := p.db.Begin() - if err != nil { - return err - } - defer tx.Rollback() - - query := `INSERT INTO block_data (chain_id, block_number, data) - VALUES ($1, $2, $3) - ON CONFLICT (chain_id, block_number) - DO UPDATE SET data = EXCLUDED.data, updated_at = NOW()` - - stmt, err := tx.Prepare(query) - if err != nil { - return err - } - defer stmt.Close() + // Build multi-row INSERT without transaction for better performance + valueStrings := make([]string, 0, len(data)) + valueArgs := make([]interface{}, 0, len(data)*3) - for _, blockData := range data { + for i, blockData := range data { blockDataJSON, err := json.Marshal(blockData) if err != nil { return err } - _, err = stmt.Exec( + valueStrings = append(valueStrings, fmt.Sprintf("($%d, $%d, $%d)", + i*3+1, i*3+2, i*3+3)) + valueArgs = append(valueArgs, blockData.Block.ChainId.String(), blockData.Block.Number.String(), string(blockDataJSON), ) - if err != nil { - return err - } } - return tx.Commit() + query := fmt.Sprintf(`INSERT INTO block_data (chain_id, block_number, data) + VALUES %s + ON CONFLICT (chain_id, block_number) + DO UPDATE SET data = EXCLUDED.data, updated_at = NOW()`, strings.Join(valueStrings, ",")) + + _, err := p.db.Exec(query, valueArgs...) + return err } func (p *PostgresConnector) GetStagingData(qf QueryFilter) ([]common.BlockData, error) { @@ -354,33 +328,20 @@ func (p *PostgresConnector) DeleteStagingData(data []common.BlockData) error { return nil } - tx, err := p.db.Begin() - if err != nil { - return err - } - defer tx.Rollback() + // Build single DELETE query with all tuples + tuples := make([]string, 0, len(data)) + args := make([]interface{}, 0, len(data)*2) - // Hard delete for staging data to save disk space - query := `DELETE FROM block_data - WHERE chain_id = $1 AND block_number = $2` - - stmt, err := tx.Prepare(query) - if err != nil { - return err + for i, blockData := range data { + tuples = append(tuples, fmt.Sprintf("($%d, $%d)", i*2+1, i*2+2)) + args = append(args, blockData.Block.ChainId.String(), blockData.Block.Number.String()) } - defer stmt.Close() - for _, blockData := range data { - _, err := stmt.Exec( - blockData.Block.ChainId.String(), - blockData.Block.Number.String(), - ) - if err != nil { - return err - } - } + query := fmt.Sprintf(`DELETE FROM block_data + WHERE (chain_id, block_number) IN (%s)`, strings.Join(tuples, ",")) - return tx.Commit() + _, err := p.db.Exec(query, args...) + return err } func (p *PostgresConnector) GetLastStagedBlockNumber(chainId *big.Int, rangeStart *big.Int, rangeEnd *big.Int) (*big.Int, error) {