Skip to content

Commit 19f36fa

Browse files
committed
Batch multi-rows queries for postgres
1 parent 86675ea commit 19f36fa

File tree

1 file changed

+61
-100
lines changed

1 file changed

+61
-100
lines changed

internal/storage/postgres.go

Lines changed: 61 additions & 100 deletions
Original file line numberDiff line numberDiff line change
@@ -150,72 +150,54 @@ func (p *PostgresConnector) StoreBlockFailures(failures []common.BlockFailure) e
150150
return nil
151151
}
152152

153-
tx, err := p.db.Begin()
154-
if err != nil {
155-
return err
156-
}
157-
defer tx.Rollback()
158-
159-
query := `INSERT INTO block_failures (chain_id, block_number, last_error_timestamp, failure_count, reason)
160-
VALUES ($1, $2, $3, $4, $5)
161-
ON CONFLICT (chain_id, block_number)
162-
DO UPDATE SET
163-
last_error_timestamp = EXCLUDED.last_error_timestamp,
164-
failure_count = EXCLUDED.failure_count,
165-
reason = EXCLUDED.reason,
166-
updated_at = NOW()`
167-
168-
stmt, err := tx.Prepare(query)
169-
if err != nil {
170-
return err
171-
}
172-
defer stmt.Close()
173-
174-
for _, failure := range failures {
175-
_, err := stmt.Exec(
153+
// Build multi-row INSERT without transaction for better performance
154+
valueStrings := make([]string, 0, len(failures))
155+
valueArgs := make([]interface{}, 0, len(failures)*5)
156+
157+
for i, failure := range failures {
158+
valueStrings = append(valueStrings, fmt.Sprintf("($%d, $%d, $%d, $%d, $%d)",
159+
i*5+1, i*5+2, i*5+3, i*5+4, i*5+5))
160+
valueArgs = append(valueArgs,
176161
failure.ChainId.String(),
177162
failure.BlockNumber.String(),
178163
failure.FailureTime.Unix(),
179164
failure.FailureCount,
180165
failure.FailureReason,
181166
)
182-
if err != nil {
183-
return err
184-
}
185167
}
186168

187-
return tx.Commit()
169+
query := fmt.Sprintf(`INSERT INTO block_failures (chain_id, block_number, last_error_timestamp, failure_count, reason)
170+
VALUES %s
171+
ON CONFLICT (chain_id, block_number)
172+
DO UPDATE SET
173+
last_error_timestamp = EXCLUDED.last_error_timestamp,
174+
failure_count = EXCLUDED.failure_count,
175+
reason = EXCLUDED.reason,
176+
updated_at = NOW()`, strings.Join(valueStrings, ","))
177+
178+
_, err := p.db.Exec(query, valueArgs...)
179+
return err
188180
}
189181

190182
func (p *PostgresConnector) DeleteBlockFailures(failures []common.BlockFailure) error {
191183
if len(failures) == 0 {
192184
return nil
193185
}
194186

195-
tx, err := p.db.Begin()
196-
if err != nil {
197-
return err
198-
}
199-
defer tx.Rollback()
200-
201-
// Hard delete for block failures
202-
query := `DELETE FROM block_failures
203-
WHERE chain_id = $1 AND block_number = $2`
204-
205-
stmt, err := tx.Prepare(query)
206-
if err != nil {
207-
return err
208-
}
209-
defer stmt.Close()
210-
211-
for _, failure := range failures {
212-
_, err := stmt.Exec(failure.ChainId.String(), failure.BlockNumber.String())
213-
if err != nil {
214-
return err
215-
}
187+
// Build single DELETE query with all tuples
188+
tuples := make([]string, 0, len(failures))
189+
args := make([]interface{}, 0, len(failures)*2)
190+
191+
for i, failure := range failures {
192+
tuples = append(tuples, fmt.Sprintf("($%d, $%d)", i*2+1, i*2+2))
193+
args = append(args, failure.ChainId.String(), failure.BlockNumber.String())
216194
}
217-
218-
return tx.Commit()
195+
196+
query := fmt.Sprintf(`DELETE FROM block_failures
197+
WHERE (chain_id, block_number) IN (%s)`, strings.Join(tuples, ","))
198+
199+
_, err := p.db.Exec(query, args...)
200+
return err
219201
}
220202

221203
func (p *PostgresConnector) GetLastReorgCheckedBlockNumber(chainId *big.Int) (*big.Int, error) {
@@ -253,40 +235,32 @@ func (p *PostgresConnector) InsertStagingData(data []common.BlockData) error {
253235
return nil
254236
}
255237

256-
tx, err := p.db.Begin()
257-
if err != nil {
258-
return err
259-
}
260-
defer tx.Rollback()
261-
262-
query := `INSERT INTO block_data (chain_id, block_number, data)
263-
VALUES ($1, $2, $3)
264-
ON CONFLICT (chain_id, block_number)
265-
DO UPDATE SET data = EXCLUDED.data, updated_at = NOW()`
266-
267-
stmt, err := tx.Prepare(query)
268-
if err != nil {
269-
return err
270-
}
271-
defer stmt.Close()
272-
273-
for _, blockData := range data {
238+
// Build multi-row INSERT without transaction for better performance
239+
valueStrings := make([]string, 0, len(data))
240+
valueArgs := make([]interface{}, 0, len(data)*3)
241+
242+
for i, blockData := range data {
274243
blockDataJSON, err := json.Marshal(blockData)
275244
if err != nil {
276245
return err
277246
}
278-
279-
_, err = stmt.Exec(
247+
248+
valueStrings = append(valueStrings, fmt.Sprintf("($%d, $%d, $%d)",
249+
i*3+1, i*3+2, i*3+3))
250+
valueArgs = append(valueArgs,
280251
blockData.Block.ChainId.String(),
281252
blockData.Block.Number.String(),
282253
string(blockDataJSON),
283254
)
284-
if err != nil {
285-
return err
286-
}
287255
}
288256

289-
return tx.Commit()
257+
query := fmt.Sprintf(`INSERT INTO block_data (chain_id, block_number, data)
258+
VALUES %s
259+
ON CONFLICT (chain_id, block_number)
260+
DO UPDATE SET data = EXCLUDED.data, updated_at = NOW()`, strings.Join(valueStrings, ","))
261+
262+
_, err := p.db.Exec(query, valueArgs...)
263+
return err
290264
}
291265

292266
func (p *PostgresConnector) GetStagingData(qf QueryFilter) ([]common.BlockData, error) {
@@ -354,33 +328,20 @@ func (p *PostgresConnector) DeleteStagingData(data []common.BlockData) error {
354328
return nil
355329
}
356330

357-
tx, err := p.db.Begin()
358-
if err != nil {
359-
return err
360-
}
361-
defer tx.Rollback()
362-
363-
// Hard delete for staging data to save disk space
364-
query := `DELETE FROM block_data
365-
WHERE chain_id = $1 AND block_number = $2`
366-
367-
stmt, err := tx.Prepare(query)
368-
if err != nil {
369-
return err
370-
}
371-
defer stmt.Close()
372-
373-
for _, blockData := range data {
374-
_, err := stmt.Exec(
375-
blockData.Block.ChainId.String(),
376-
blockData.Block.Number.String(),
377-
)
378-
if err != nil {
379-
return err
380-
}
331+
// Build single DELETE query with all tuples
332+
tuples := make([]string, 0, len(data))
333+
args := make([]interface{}, 0, len(data)*2)
334+
335+
for i, blockData := range data {
336+
tuples = append(tuples, fmt.Sprintf("($%d, $%d)", i*2+1, i*2+2))
337+
args = append(args, blockData.Block.ChainId.String(), blockData.Block.Number.String())
381338
}
382-
383-
return tx.Commit()
339+
340+
query := fmt.Sprintf(`DELETE FROM block_data
341+
WHERE (chain_id, block_number) IN (%s)`, strings.Join(tuples, ","))
342+
343+
_, err := p.db.Exec(query, args...)
344+
return err
384345
}
385346

386347
func (p *PostgresConnector) GetLastStagedBlockNumber(chainId *big.Int, rangeStart *big.Int, rangeEnd *big.Int) (*big.Int, error) {

0 commit comments

Comments
 (0)