Skip to content

Commit 057c541

Browse files
authored
fix: refresh materialized views concurrently (#1270)
* feat: refresh materialized views concurrently * fix: do not refresh concurrently on tests
1 parent 7190974 commit 057c541

File tree

2 files changed

+106
-6
lines changed

2 files changed

+106
-6
lines changed

src/datastore/postgres-store.ts

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1131,7 +1131,8 @@ export class PgDataStore
11311131
}
11321132

11331133
async getChainTip(
1134-
client: ClientBase
1134+
client: ClientBase,
1135+
useMaterializedView = true
11351136
): Promise<{ blockHeight: number; blockHash: string; indexBlockHash: string }> {
11361137
const currentTipBlock = await client.query<{
11371138
block_height: number;
@@ -1141,7 +1142,7 @@ export class PgDataStore
11411142
// The `chain_tip` materialized view is not available during event replay.
11421143
// Since `getChainTip()` is used heavily during event ingestion, we'll fall back to
11431144
// a classic query.
1144-
this.eventReplay
1145+
this.eventReplay || !useMaterializedView
11451146
? `
11461147
SELECT block_height, block_hash, index_block_hash
11471148
FROM blocks
@@ -1181,7 +1182,7 @@ export class PgDataStore
11811182
// Sanity check: ensure incoming microblocks have a `parent_index_block_hash` that matches the API's
11821183
// current known canonical chain tip. We assume this holds true so incoming microblock data is always
11831184
// treated as being built off the current canonical anchor block.
1184-
const chainTip = await this.getChainTip(client);
1185+
const chainTip = await this.getChainTip(client, false);
11851186
const nonCanonicalMicroblock = data.microblocks.find(
11861187
mb => mb.parent_index_block_hash !== chainTip.indexBlockHash
11871188
);
@@ -1312,7 +1313,7 @@ export class PgDataStore
13121313
async update(data: DataStoreBlockUpdateData): Promise<void> {
13131314
const tokenMetadataQueueEntries: DbTokenMetadataQueueEntry[] = [];
13141315
await this.queryTx(async client => {
1315-
const chainTip = await this.getChainTip(client);
1316+
const chainTip = await this.getChainTip(client, false);
13161317
await this.handleReorg(client, data.block, chainTip.blockHeight);
13171318
// If the incoming block is not of greater height than current chain tip, then store data as non-canonical.
13181319
const isCanonical = data.block.block_height > chainTip.blockHeight;
@@ -3564,7 +3565,7 @@ export class PgDataStore
35643565
async updateMempoolTxs({ mempoolTxs: txs }: { mempoolTxs: DbMempoolTx[] }): Promise<void> {
35653566
const updatedTxs: DbMempoolTx[] = [];
35663567
await this.queryTx(async client => {
3567-
const chainTip = await this.getChainTip(client);
3568+
const chainTip = await this.getChainTip(client, false);
35683569
for (const tx of txs) {
35693570
const result = await client.query(
35703571
`
@@ -5434,7 +5435,8 @@ export class PgDataStore
54345435
if (this.eventReplay && skipDuringEventReplay) {
54355436
return;
54365437
}
5437-
await client.query(`REFRESH MATERIALIZED VIEW ${viewName}`);
5438+
const concurrently = isProdEnv ? 'CONCURRENTLY' : '';
5439+
await client.query(`REFRESH MATERIALIZED VIEW ${concurrently} ${viewName}`);
54385440
}
54395441

54405442
async getSmartContractByTrait(args: {
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/* eslint-disable @typescript-eslint/camelcase */
2+
import { MigrationBuilder, ColumnDefinitions } from 'node-pg-migrate';
3+
4+
export const shorthands: ColumnDefinitions | undefined = undefined;
5+
6+
export async function up(pgm: MigrationBuilder): Promise<void> {
7+
// Add LIMIT 1 to chain_tip view so we can add the uniqueness index for `block_height`.
8+
pgm.dropMaterializedView('chain_tip');
9+
pgm.createMaterializedView('chain_tip', {}, `
10+
WITH block_tip AS (
11+
SELECT block_height, block_hash, index_block_hash
12+
FROM blocks
13+
WHERE block_height = (SELECT MAX(block_height) FROM blocks WHERE canonical = TRUE)
14+
),
15+
microblock_tip AS (
16+
SELECT microblock_hash, microblock_sequence
17+
FROM microblocks, block_tip
18+
WHERE microblocks.parent_index_block_hash = block_tip.index_block_hash
19+
AND microblock_canonical = true AND canonical = true
20+
ORDER BY microblock_sequence DESC
21+
LIMIT 1
22+
),
23+
microblock_count AS (
24+
SELECT COUNT(*)::INTEGER AS microblock_count
25+
FROM microblocks
26+
WHERE canonical = TRUE AND microblock_canonical = TRUE
27+
),
28+
tx_count AS (
29+
SELECT COUNT(*)::INTEGER AS tx_count
30+
FROM txs
31+
WHERE canonical = TRUE AND microblock_canonical = TRUE
32+
AND block_height <= (SELECT MAX(block_height) FROM blocks WHERE canonical = TRUE)
33+
),
34+
tx_count_unanchored AS (
35+
SELECT COUNT(*)::INTEGER AS tx_count_unanchored
36+
FROM txs
37+
WHERE canonical = TRUE AND microblock_canonical = TRUE
38+
)
39+
SELECT *, block_tip.block_height AS block_count
40+
FROM block_tip
41+
LEFT JOIN microblock_tip ON TRUE
42+
LEFT JOIN microblock_count ON TRUE
43+
LEFT JOIN tx_count ON TRUE
44+
LEFT JOIN tx_count_unanchored ON TRUE
45+
LIMIT 1
46+
`);
47+
48+
pgm.addIndex('chain_tip', 'block_height', { unique: true });
49+
pgm.addIndex('mempool_digest', 'digest', { unique: true });
50+
pgm.addIndex('nft_custody', ['asset_identifier', 'value'], { unique: true });
51+
pgm.addIndex('nft_custody_unanchored', ['asset_identifier', 'value'], { unique: true });
52+
}
53+
54+
export async function down(pgm: MigrationBuilder): Promise<void> {
55+
pgm.dropIndex('chain_tip', 'block_height', { unique: true, ifExists: true });
56+
pgm.dropIndex('mempool_digest', 'digest', { unique: true, ifExists: true });
57+
pgm.dropIndex('nft_custody', ['asset_identifier', 'value'], { unique: true, ifExists: true });
58+
pgm.dropIndex('nft_custody_unanchored', ['asset_identifier', 'value'], { unique: true, ifExists: true });
59+
60+
pgm.dropMaterializedView('chain_tip');
61+
pgm.createMaterializedView('chain_tip', {}, `
62+
WITH block_tip AS (
63+
SELECT block_height, block_hash, index_block_hash
64+
FROM blocks
65+
WHERE block_height = (SELECT MAX(block_height) FROM blocks WHERE canonical = TRUE)
66+
),
67+
microblock_tip AS (
68+
SELECT microblock_hash, microblock_sequence
69+
FROM microblocks, block_tip
70+
WHERE microblocks.parent_index_block_hash = block_tip.index_block_hash
71+
AND microblock_canonical = true AND canonical = true
72+
ORDER BY microblock_sequence DESC
73+
LIMIT 1
74+
),
75+
microblock_count AS (
76+
SELECT COUNT(*)::INTEGER AS microblock_count
77+
FROM microblocks
78+
WHERE canonical = TRUE AND microblock_canonical = TRUE
79+
),
80+
tx_count AS (
81+
SELECT COUNT(*)::INTEGER AS tx_count
82+
FROM txs
83+
WHERE canonical = TRUE AND microblock_canonical = TRUE
84+
AND block_height <= (SELECT MAX(block_height) FROM blocks WHERE canonical = TRUE)
85+
),
86+
tx_count_unanchored AS (
87+
SELECT COUNT(*)::INTEGER AS tx_count_unanchored
88+
FROM txs
89+
WHERE canonical = TRUE AND microblock_canonical = TRUE
90+
)
91+
SELECT *, block_tip.block_height AS block_count
92+
FROM block_tip
93+
LEFT JOIN microblock_tip ON TRUE
94+
LEFT JOIN microblock_count ON TRUE
95+
LEFT JOIN tx_count ON TRUE
96+
LEFT JOIN tx_count_unanchored ON TRUE
97+
`);
98+
}

0 commit comments

Comments
 (0)