Skip to content

Commit 340a304

Browse files
authored
fix: parallelize re-org update queries (#1835)
1 parent 3535c11 commit 340a304

File tree

2 files changed

+263
-229
lines changed

2 files changed

+263
-229
lines changed

src/datastore/helpers.ts

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { parseEnum, unwrapOptionalProp } from '../helpers';
1+
import { getUintEnvOrDefault, parseEnum, unwrapOptionalProp } from '../helpers';
22
import {
33
BlockQueryResult,
44
ContractTxQueryResult,
@@ -66,6 +66,7 @@ import { PgStoreEventEmitter } from './pg-store-event-emitter';
6666
import { SyntheticPoxEventName } from '../pox-helpers';
6767
import { logger } from '../logger';
6868
import { PgSqlClient } from '@hirosystems/api-toolkit';
69+
import PQueue from 'p-queue';
6970

7071
export const TX_COLUMNS = [
7172
'tx_id',
@@ -1335,3 +1336,21 @@ export function newReOrgUpdatedEntities(): ReOrgUpdatedEntities {
13351336
restoredMempoolTxs: 0,
13361337
};
13371338
}
1339+
1340+
/**
1341+
* Priority queue for parallel Postgres write query execution. This helps performance because it
1342+
* parallelizes the work postgres.js has to do when serializing JS types to PG types.
1343+
*/
1344+
export class PgWriteQueue {
1345+
readonly queue: PQueue;
1346+
constructor() {
1347+
const concurrency = Math.max(1, getUintEnvOrDefault('STACKS_BLOCK_DATA_INSERT_CONCURRENCY', 4));
1348+
this.queue = new PQueue({ concurrency, autoStart: true });
1349+
}
1350+
enqueue(task: Parameters<PQueue['add']>[0]): void {
1351+
void this.queue.add(task);
1352+
}
1353+
done(): Promise<void> {
1354+
return this.queue.onIdle();
1355+
}
1356+
}

0 commit comments

Comments
 (0)