@@ -188,6 +188,8 @@ export class PgWriteStore extends PgStore {
188
188
async update ( data : DataStoreBlockUpdateData ) : Promise < void > {
189
189
const tokenMetadataQueueEntries : DbTokenMetadataQueueEntry [ ] = [ ] ;
190
190
let garbageCollectedMempoolTxs : string [ ] = [ ] ;
191
+ let batchedTxData : DataStoreTxEventData [ ] = [ ] ;
192
+
191
193
await this . sql . begin ( async sql => {
192
194
const chainTip = await this . getChainTip ( sql , false ) ;
193
195
await this . handleReorg ( sql , data . block , chainTip . blockHeight ) ;
@@ -257,7 +259,7 @@ export class PgWriteStore extends PgStore {
257
259
data . block . execution_cost_write_count = totalCost . execution_cost_write_count ;
258
260
data . block . execution_cost_write_length = totalCost . execution_cost_write_length ;
259
261
260
- let batchedTxData : DataStoreTxEventData [ ] = data . txs ;
262
+ batchedTxData = data . txs ;
261
263
262
264
// Find microblocks that weren't already inserted via the unconfirmed microblock event.
263
265
// This happens when a stacks-node is syncing and receives confirmed microblocks with their anchor block at the same time.
@@ -358,8 +360,6 @@ export class PgWriteStore extends PgStore {
358
360
await this . updateNames ( sql , entry . tx , bnsName ) ;
359
361
}
360
362
}
361
- await this . refreshNftCustody ( sql , batchedTxData ) ;
362
- await this . refreshMaterializedView ( sql , 'chain_tip' ) ;
363
363
const mempoolGarbageResults = await this . deleteGarbageCollectedMempoolTxs ( sql ) ;
364
364
if ( mempoolGarbageResults . deletedTxs . length > 0 ) {
365
365
logger . verbose (
@@ -399,6 +399,10 @@ export class PgWriteStore extends PgStore {
399
399
}
400
400
} ) ;
401
401
402
+ await this . refreshNftCustody ( batchedTxData ) ;
403
+ await this . refreshMaterializedView ( 'chain_tip' ) ;
404
+ await this . refreshMaterializedView ( 'mempool_digest' ) ;
405
+
402
406
// Skip sending `PgNotifier` updates altogether if we're in the genesis block since this block is the
403
407
// event replay of the v1 blockchain.
404
408
if ( ( data . block . block_height > 1 || ! isProdEnv ) && this . notifier ) {
@@ -529,6 +533,9 @@ export class PgWriteStore extends PgStore {
529
533
}
530
534
531
535
async updateMicroblocksInternal ( data : DataStoreMicroblockUpdateData ) : Promise < void > {
536
+ const txData : DataStoreTxEventData [ ] = [ ] ;
537
+ let dbMicroblocks : DbMicroblock [ ] = [ ] ;
538
+
532
539
await this . sql . begin ( async sql => {
533
540
// Sanity check: ensure incoming microblocks have a `parent_index_block_hash` that matches the API's
534
541
// current known canonical chain tip. We assume this holds true so incoming microblock data is always
@@ -550,7 +557,7 @@ export class PgWriteStore extends PgStore {
550
557
551
558
// The block height is just one after the current chain tip height
552
559
const blockHeight = chainTip . blockHeight + 1 ;
553
- const dbMicroblocks = data . microblocks . map ( mb => {
560
+ dbMicroblocks = data . microblocks . map ( mb => {
554
561
const dbMicroBlock : DbMicroblock = {
555
562
canonical : true ,
556
563
microblock_canonical : true ,
@@ -570,8 +577,6 @@ export class PgWriteStore extends PgStore {
570
577
return dbMicroBlock ;
571
578
} ) ;
572
579
573
- const txs : DataStoreTxEventData [ ] = [ ] ;
574
-
575
580
for ( const entry of data . txs ) {
576
581
// Note: the properties block_hash and burn_block_time are empty here because the anchor block with that data doesn't yet exist.
577
582
const dbTx : DbTx = {
@@ -582,7 +587,7 @@ export class PgWriteStore extends PgStore {
582
587
583
588
// Set all the `block_height` properties for the related tx objects, since it wasn't known
584
589
// when creating the objects using only the stacks-node message payload.
585
- txs . push ( {
590
+ txData . push ( {
586
591
tx : dbTx ,
587
592
stxEvents : entry . stxEvents . map ( e => ( { ...e , block_height : blockHeight } ) ) ,
588
593
contractLogEvents : entry . contractLogEvents . map ( e => ( {
@@ -598,7 +603,7 @@ export class PgWriteStore extends PgStore {
598
603
} ) ;
599
604
}
600
605
601
- await this . insertMicroblockData ( sql , dbMicroblocks , txs ) ;
606
+ await this . insertMicroblockData ( sql , dbMicroblocks , txData ) ;
602
607
603
608
// Find any microblocks that have been orphaned by this latest microblock chain tip.
604
609
// This function also checks that each microblock parent hash points to an existing microblock in the db.
@@ -646,24 +651,25 @@ export class PgWriteStore extends PgStore {
646
651
) ;
647
652
}
648
653
649
- await this . refreshNftCustody ( sql , txs , true ) ;
650
- await this . refreshMaterializedView ( sql , 'chain_tip' ) ;
651
-
652
654
if ( ! this . isEventReplay ) {
653
655
const mempoolStats = await this . getMempoolStatsInternal ( { sql } ) ;
654
656
this . eventEmitter . emit ( 'mempoolStatsUpdate' , mempoolStats ) ;
655
657
}
658
+ } ) ;
656
659
657
- if ( this . notifier ) {
658
- for ( const microblock of dbMicroblocks ) {
659
- await this . notifier . sendMicroblock ( { microblockHash : microblock . microblock_hash } ) ;
660
- }
661
- for ( const tx of txs ) {
662
- await this . notifier . sendTx ( { txId : tx . tx . tx_id } ) ;
663
- }
664
- await this . emitAddressTxUpdates ( txs ) ;
660
+ await this . refreshNftCustody ( txData , true ) ;
661
+ await this . refreshMaterializedView ( 'chain_tip' ) ;
662
+ await this . refreshMaterializedView ( 'mempool_digest' ) ;
663
+
664
+ if ( this . notifier ) {
665
+ for ( const microblock of dbMicroblocks ) {
666
+ await this . notifier . sendMicroblock ( { microblockHash : microblock . microblock_hash } ) ;
665
667
}
666
- } ) ;
668
+ for ( const tx of txData ) {
669
+ await this . notifier . sendTx ( { txId : tx . tx . tx_id } ) ;
670
+ }
671
+ await this . emitAddressTxUpdates ( txData ) ;
672
+ }
667
673
}
668
674
669
675
async updateStxLockEvent ( sql : PgSqlClient , tx : DbTx , event : DbStxLockEvent ) {
@@ -1313,13 +1319,12 @@ export class PgWriteStore extends PgStore {
1313
1319
updatedTxs . push ( tx ) ;
1314
1320
}
1315
1321
}
1316
- await this . refreshMaterializedView ( sql , 'mempool_digest' ) ;
1317
-
1318
1322
if ( ! this . isEventReplay ) {
1319
1323
const mempoolStats = await this . getMempoolStatsInternal ( { sql } ) ;
1320
1324
this . eventEmitter . emit ( 'mempoolStatsUpdate' , mempoolStats ) ;
1321
1325
}
1322
1326
} ) ;
1327
+ await this . refreshMaterializedView ( 'mempool_digest' ) ;
1323
1328
for ( const tx of updatedTxs ) {
1324
1329
await this . notifier ?. sendTx ( { txId : tx . tx_id } ) ;
1325
1330
}
@@ -1335,8 +1340,8 @@ export class PgWriteStore extends PgStore {
1335
1340
RETURNING ${ sql ( MEMPOOL_TX_COLUMNS ) }
1336
1341
` ;
1337
1342
updatedTxs = updateResults . map ( r => parseMempoolTxQueryResult ( r ) ) ;
1338
- await this . refreshMaterializedView ( sql , 'mempool_digest' ) ;
1339
1343
} ) ;
1344
+ await this . refreshMaterializedView ( 'mempool_digest' ) ;
1340
1345
for ( const tx of updatedTxs ) {
1341
1346
await this . notifier ?. sendTx ( { txId : tx . tx_id } ) ;
1342
1347
}
@@ -1964,7 +1969,6 @@ export class PgWriteStore extends PgStore {
1964
1969
WHERE tx_id IN ${ sql ( txIds ) }
1965
1970
RETURNING tx_id
1966
1971
` ;
1967
- await this . refreshMaterializedView ( sql , 'mempool_digest' ) ;
1968
1972
const restoredTxs = updateResults . map ( r => r . tx_id ) ;
1969
1973
return { restoredTxs : restoredTxs } ;
1970
1974
}
@@ -1988,7 +1992,6 @@ export class PgWriteStore extends PgStore {
1988
1992
WHERE tx_id IN ${ sql ( txIds ) }
1989
1993
RETURNING tx_id
1990
1994
` ;
1991
- await this . refreshMaterializedView ( sql , 'mempool_digest' ) ;
1992
1995
const removedTxs = updateResults . map ( r => r . tx_id ) ;
1993
1996
return { removedTxs : removedTxs } ;
1994
1997
}
@@ -2002,7 +2005,9 @@ export class PgWriteStore extends PgStore {
2002
2005
// Get threshold block.
2003
2006
const blockThreshold = process . env [ 'STACKS_MEMPOOL_TX_GARBAGE_COLLECTION_THRESHOLD' ] ?? 256 ;
2004
2007
const cutoffResults = await sql < { block_height : number } [ ] > `
2005
- SELECT (block_height - ${ blockThreshold } ) AS block_height FROM chain_tip
2008
+ SELECT (MAX(block_height) - ${ blockThreshold } ) AS block_height
2009
+ FROM blocks
2010
+ WHERE canonical = TRUE
2006
2011
` ;
2007
2012
if ( cutoffResults . length != 1 ) {
2008
2013
return { deletedTxs : [ ] } ;
@@ -2016,7 +2021,6 @@ export class PgWriteStore extends PgStore {
2016
2021
WHERE pruned = FALSE AND receipt_block_height < ${ cutoffBlockHeight }
2017
2022
RETURNING tx_id
2018
2023
` ;
2019
- await this . refreshMaterializedView ( sql , 'mempool_digest' ) ;
2020
2024
const deletedTxs = deletedTxResults . map ( r => r . tx_id ) ;
2021
2025
for ( const txId of deletedTxs ) {
2022
2026
await this . notifier ?. sendTx ( { txId : txId } ) ;
@@ -2437,11 +2441,12 @@ export class PgWriteStore extends PgStore {
2437
2441
2438
2442
/**
2439
2443
* Refreshes a Postgres materialized view.
2440
- * @param sql - Pg Client
2441
2444
* @param viewName - Materialized view name
2445
+ * @param sql - Pg scoped client. Will use the default client if none specified
2442
2446
* @param skipDuringEventReplay - If we should skip refreshing during event replay
2443
2447
*/
2444
- async refreshMaterializedView ( sql : PgSqlClient , viewName : string , skipDuringEventReplay = true ) {
2448
+ async refreshMaterializedView ( viewName : string , sql ?: PgSqlClient , skipDuringEventReplay = true ) {
2449
+ sql = sql ?? this . sql ;
2445
2450
if ( this . isEventReplay && skipDuringEventReplay ) {
2446
2451
return ;
2447
2452
}
@@ -2454,34 +2459,32 @@ export class PgWriteStore extends PgStore {
2454
2459
* @param txs - Transaction event data
2455
2460
* @param unanchored - If this refresh is requested from a block or microblock
2456
2461
*/
2457
- async refreshNftCustody (
2458
- sql : PgSqlClient ,
2459
- txs : DataStoreTxEventData [ ] ,
2460
- unanchored : boolean = false
2461
- ) {
2462
- const newNftEventCount = txs
2463
- . map ( tx => tx . nftEvents . length )
2464
- . reduce ( ( prev , cur ) => prev + cur , 0 ) ;
2465
- if ( newNftEventCount > 0 ) {
2466
- // Always refresh unanchored view since even if we're in a new anchored block we should update the
2467
- // unanchored state to the current one.
2468
- await this . refreshMaterializedView ( sql , 'nft_custody_unanchored' ) ;
2469
- if ( ! unanchored ) {
2470
- await this . refreshMaterializedView ( sql , 'nft_custody' ) ;
2471
- }
2472
- } else if ( ! unanchored ) {
2473
- // Even if we didn't receive new NFT events in a new anchor block, we should check if we need to
2474
- // update the anchored view to reflect any changes made by previous microblocks.
2475
- const result = await sql < { outdated : boolean } [ ] > `
2476
- WITH anchored_height AS (SELECT MAX(block_height) AS anchored FROM nft_custody),
2477
- unanchored_height AS (SELECT MAX(block_height) AS unanchored FROM nft_custody_unanchored)
2478
- SELECT unanchored > anchored AS outdated
2479
- FROM anchored_height CROSS JOIN unanchored_height
2480
- ` ;
2481
- if ( result . length > 0 && result [ 0 ] . outdated ) {
2482
- await this . refreshMaterializedView ( sql , 'nft_custody' ) ;
2462
+ async refreshNftCustody ( txs : DataStoreTxEventData [ ] , unanchored : boolean = false ) {
2463
+ await this . sql . begin ( async sql => {
2464
+ const newNftEventCount = txs
2465
+ . map ( tx => tx . nftEvents . length )
2466
+ . reduce ( ( prev , cur ) => prev + cur , 0 ) ;
2467
+ if ( newNftEventCount > 0 ) {
2468
+ // Always refresh unanchored view since even if we're in a new anchored block we should update the
2469
+ // unanchored state to the current one.
2470
+ await this . refreshMaterializedView ( 'nft_custody_unanchored' , sql ) ;
2471
+ if ( ! unanchored ) {
2472
+ await this . refreshMaterializedView ( 'nft_custody' , sql ) ;
2473
+ }
2474
+ } else if ( ! unanchored ) {
2475
+ // Even if we didn't receive new NFT events in a new anchor block, we should check if we need to
2476
+ // update the anchored view to reflect any changes made by previous microblocks.
2477
+ const result = await sql < { outdated : boolean } [ ] > `
2478
+ WITH anchored_height AS (SELECT MAX(block_height) AS anchored FROM nft_custody),
2479
+ unanchored_height AS (SELECT MAX(block_height) AS unanchored FROM nft_custody_unanchored)
2480
+ SELECT unanchored > anchored AS outdated
2481
+ FROM anchored_height CROSS JOIN unanchored_height
2482
+ ` ;
2483
+ if ( result . length > 0 && result [ 0 ] . outdated ) {
2484
+ await this . refreshMaterializedView ( 'nft_custody' , sql ) ;
2485
+ }
2483
2486
}
2484
- }
2487
+ } ) ;
2485
2488
}
2486
2489
2487
2490
/**
@@ -2492,10 +2495,10 @@ export class PgWriteStore extends PgStore {
2492
2495
return ;
2493
2496
}
2494
2497
await this . sql . begin ( async sql => {
2495
- await this . refreshMaterializedView ( sql , 'nft_custody' , false ) ;
2496
- await this . refreshMaterializedView ( sql , 'nft_custody_unanchored' , false ) ;
2497
- await this . refreshMaterializedView ( sql , 'chain_tip' , false ) ;
2498
- await this . refreshMaterializedView ( sql , 'mempool_digest' , false ) ;
2498
+ await this . refreshMaterializedView ( 'nft_custody' , sql , false ) ;
2499
+ await this . refreshMaterializedView ( 'nft_custody_unanchored' , sql , false ) ;
2500
+ await this . refreshMaterializedView ( 'chain_tip' , sql , false ) ;
2501
+ await this . refreshMaterializedView ( 'mempool_digest' , sql , false ) ;
2499
2502
} ) ;
2500
2503
}
2501
2504
}
0 commit comments