|
1 |
| -import { getOrAdd, I32_MAX, getIbdBlockHeight } from '../helpers'; |
| 1 | +import { getOrAdd, I32_MAX, getIbdBlockHeight, getUintEnvOrDefault } from '../helpers'; |
2 | 2 | import {
|
3 | 3 | DbBlock,
|
4 | 4 | DbTx,
|
@@ -97,6 +97,14 @@ import { PgServer, getConnectionArgs, getConnectionConfig } from './connection';
|
97 | 97 |
|
98 | 98 | const MIGRATIONS_TABLE = 'pgmigrations';
|
99 | 99 | const INSERT_BATCH_SIZE = 500;
|
| 100 | +const MEMPOOL_STATS_DEBOUNCE_INTERVAL = getUintEnvOrDefault( |
| 101 | + 'MEMPOOL_STATS_DEBOUNCE_INTERVAL', |
| 102 | + 1000 |
| 103 | +); |
| 104 | +const MEMPOOL_STATS_DEBOUNCE_MAX_INTERVAL = getUintEnvOrDefault( |
| 105 | + 'MEMPOOL_STATS_DEBOUNCE_MAX_INTERVAL', |
| 106 | + 10000 |
| 107 | +); |
100 | 108 |
|
101 | 109 | class MicroblockGapError extends Error {
|
102 | 110 | constructor(message: string) {
|
@@ -271,10 +279,7 @@ export class PgWriteStore extends PgStore {
|
271 | 279 | }
|
272 | 280 |
|
273 | 281 | if (!this.isEventReplay) {
|
274 |
| - await this.reconcileMempoolStatus(sql); |
275 |
| - |
276 |
| - const mempoolStats = await this.getMempoolStatsInternal({ sql }); |
277 |
| - this.eventEmitter.emit('mempoolStatsUpdate', mempoolStats); |
| 282 | + this.debounceMempoolStat(); |
278 | 283 | }
|
279 | 284 | if (isCanonical)
|
280 | 285 | await sql`
|
@@ -664,10 +669,7 @@ export class PgWriteStore extends PgStore {
|
664 | 669 | }
|
665 | 670 |
|
666 | 671 | if (!this.isEventReplay) {
|
667 |
| - await this.reconcileMempoolStatus(sql); |
668 |
| - |
669 |
| - const mempoolStats = await this.getMempoolStatsInternal({ sql }); |
670 |
| - this.eventEmitter.emit('mempoolStatsUpdate', mempoolStats); |
| 672 | + this.debounceMempoolStat(); |
671 | 673 | }
|
672 | 674 | if (currentMicroblockTip.microblock_canonical)
|
673 | 675 | await sql`
|
@@ -705,42 +707,6 @@ export class PgWriteStore extends PgStore {
|
705 | 707 | }
|
706 | 708 | }
|
707 | 709 |
|
708 |
| - // Find any transactions that are erroneously still marked as both `pending` in the mempool table |
709 |
| - // and also confirmed in the mined txs table. Mark these as pruned in the mempool and log warning. |
710 |
| - // This must be called _after_ any writes to txs/mempool tables during block and microblock ingestion, |
711 |
| - // but _before_ any reads or view refreshes that depend on the mempool table. |
712 |
| - // NOTE: this is essentially a work-around for whatever bug is causing the underlying problem. |
713 |
| - async reconcileMempoolStatus(sql: PgSqlClient): Promise<void> { |
714 |
| - const txsResult = await sql<{ tx_id: string }[]>` |
715 |
| - WITH pruned AS ( |
716 |
| - UPDATE mempool_txs |
717 |
| - SET pruned = true |
718 |
| - FROM txs |
719 |
| - WHERE |
720 |
| - mempool_txs.tx_id = txs.tx_id AND |
721 |
| - mempool_txs.pruned = false AND |
722 |
| - txs.canonical = true AND |
723 |
| - txs.microblock_canonical = true AND |
724 |
| - txs.status IN ${sql([ |
725 |
| - DbTxStatus.Success, |
726 |
| - DbTxStatus.AbortByResponse, |
727 |
| - DbTxStatus.AbortByPostCondition, |
728 |
| - ])} |
729 |
| - RETURNING mempool_txs.tx_id |
730 |
| - ), |
731 |
| - count_update AS ( |
732 |
| - UPDATE chain_tip SET |
733 |
| - mempool_tx_count = mempool_tx_count - (SELECT COUNT(*) FROM pruned), |
734 |
| - mempool_updated_at = NOW() |
735 |
| - ) |
736 |
| - SELECT tx_id FROM pruned |
737 |
| - `; |
738 |
| - if (txsResult.length > 0) { |
739 |
| - const txs = txsResult.map(tx => tx.tx_id); |
740 |
| - logger.warn(`Reconciled mempool txs as pruned for ${txsResult.length} txs`, { txs }); |
741 |
| - } |
742 |
| - } |
743 |
| - |
744 | 710 | async fixBlockZeroData(sql: PgSqlClient, blockOne: DbBlock): Promise<void> {
|
745 | 711 | const tablesUpdates: Record<string, number> = {};
|
746 | 712 | const txsResult = await sql<TxQueryResult[]>`
|
@@ -1696,21 +1662,84 @@ export class PgWriteStore extends PgStore {
|
1696 | 1662 | SELECT tx_id FROM inserted
|
1697 | 1663 | `;
|
1698 | 1664 | txIds.push(...result.map(r => r.tx_id));
|
| 1665 | + // The incoming mempool transactions might have already been settled |
| 1666 | + // We need to mark them as pruned to avoid inconsistent tx state |
| 1667 | + const pruned_tx = await sql<{ tx_id: string }[]>` |
| 1668 | + SELECT tx_id |
| 1669 | + FROM txs |
| 1670 | + WHERE |
| 1671 | + tx_id IN ${sql(batch.map(b => b.tx_id))} AND |
| 1672 | + canonical = true AND |
| 1673 | + microblock_canonical = true`; |
| 1674 | + if (pruned_tx.length > 0) { |
| 1675 | + await sql<{ tx_id: string }[]>` |
| 1676 | + WITH pruned AS ( |
| 1677 | + UPDATE mempool_txs |
| 1678 | + SET pruned = true |
| 1679 | + WHERE |
| 1680 | + tx_id IN ${sql(pruned_tx.map(t => t.tx_id))} AND |
| 1681 | + pruned = false |
| 1682 | + RETURNING tx_id |
| 1683 | + ), |
| 1684 | + count_update AS ( |
| 1685 | + UPDATE chain_tip SET |
| 1686 | + mempool_tx_count = mempool_tx_count - (SELECT COUNT(*) FROM pruned), |
| 1687 | + mempool_updated_at = NOW() |
| 1688 | + ) |
| 1689 | + SELECT tx_id FROM pruned`; |
| 1690 | + } |
1699 | 1691 | }
|
1700 | 1692 | return txIds;
|
1701 | 1693 | }
|
1702 | 1694 |
|
| 1695 | + private _debounceMempoolStat: { |
| 1696 | + triggeredAt?: number | null; |
| 1697 | + debounce?: NodeJS.Timeout | null; |
| 1698 | + running: boolean; |
| 1699 | + } = { running: false }; |
| 1700 | + /** |
| 1701 | + * Debounce the mempool stat process in case new transactions pour in. |
| 1702 | + */ |
| 1703 | + private debounceMempoolStat() { |
| 1704 | + if (this._debounceMempoolStat.triggeredAt == null) { |
| 1705 | + this._debounceMempoolStat.triggeredAt = Date.now(); |
| 1706 | + } |
| 1707 | + if (this._debounceMempoolStat.running) return; |
| 1708 | + const waited = Date.now() - this._debounceMempoolStat.triggeredAt; |
| 1709 | + const delay = Math.max( |
| 1710 | + 0, |
| 1711 | + Math.min(MEMPOOL_STATS_DEBOUNCE_MAX_INTERVAL - waited, MEMPOOL_STATS_DEBOUNCE_INTERVAL) |
| 1712 | + ); |
| 1713 | + if (this._debounceMempoolStat.debounce != null) { |
| 1714 | + clearTimeout(this._debounceMempoolStat.debounce); |
| 1715 | + } |
| 1716 | + this._debounceMempoolStat.debounce = setTimeout(async () => { |
| 1717 | + this._debounceMempoolStat.running = true; |
| 1718 | + this._debounceMempoolStat.triggeredAt = null; |
| 1719 | + try { |
| 1720 | + const mempoolStats = await this.getMempoolStatsInternal({ sql: this.sql }); |
| 1721 | + this.eventEmitter.emit('mempoolStatsUpdate', mempoolStats); |
| 1722 | + } catch (e) { |
| 1723 | + logger.error(e, `failed to run mempool stats update`); |
| 1724 | + } finally { |
| 1725 | + this._debounceMempoolStat.running = false; |
| 1726 | + this._debounceMempoolStat.debounce = null; |
| 1727 | + if (this._debounceMempoolStat.triggeredAt != null) { |
| 1728 | + this.debounceMempoolStat(); |
| 1729 | + } |
| 1730 | + } |
| 1731 | + }, delay); |
| 1732 | + } |
| 1733 | + |
1703 | 1734 | async updateMempoolTxs({ mempoolTxs: txs }: { mempoolTxs: DbMempoolTxRaw[] }): Promise<void> {
|
1704 | 1735 | const updatedTxIds: string[] = [];
|
1705 | 1736 | await this.sqlWriteTransaction(async sql => {
|
1706 | 1737 | const chainTip = await this.getChainTip();
|
1707 | 1738 | updatedTxIds.push(...(await this.insertDbMempoolTxs(txs, chainTip, sql)));
|
1708 |
| - if (!this.isEventReplay) { |
1709 |
| - await this.reconcileMempoolStatus(sql); |
1710 |
| - const mempoolStats = await this.getMempoolStatsInternal({ sql }); |
1711 |
| - this.eventEmitter.emit('mempoolStatsUpdate', mempoolStats); |
1712 |
| - } |
1713 | 1739 | });
|
| 1740 | + if (!this.isEventReplay) { |
| 1741 | + this.debounceMempoolStat(); |
| 1742 | + } |
1714 | 1743 | for (const txId of updatedTxIds) {
|
1715 | 1744 | await this.notifier?.sendTx({ txId });
|
1716 | 1745 | }
|
|
0 commit comments