Skip to content

Commit 1f09c0e

Browse files
authored
fix: shorten token metadata pg notifications (#1143)
1 parent cb1ec6b commit 1f09c0e

File tree

4 files changed

+49
-11
lines changed

4 files changed

+49
-11
lines changed

src/datastore/common.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -389,7 +389,7 @@ export type DataStoreEventEmitter = StrictEventEmitter<
389389
addressUpdate: (address: string, blockHeight: number) => void;
390390
nameUpdate: (info: string) => void;
391391
tokensUpdate: (contractID: string) => void;
392-
tokenMetadataUpdateQueued: (entry: TokenMetadataUpdateInfo) => void;
392+
tokenMetadataUpdateQueued: (queueId: number) => void;
393393
}
394394
>;
395395

@@ -964,6 +964,12 @@ export interface DataStore extends DataStoreEventEmitter {
964964
offset: number;
965965
}): Promise<{ results: DbNonFungibleTokenMetadata[]; total: number }>;
966966

967+
/**
968+
* Returns a single entry from the `token_metadata_queue` table.
969+
* @param queueId - queue entry id
970+
*/
971+
getTokenMetadataQueueEntry(queueId: number): Promise<FoundOrNot<DbTokenMetadataQueueEntry>>;
972+
967973
getTokenMetadataQueue(
968974
limit: number,
969975
excludingEntries: number[]

src/datastore/postgres-notifier.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ export type PgAddressNotificationPayload = {
2121
};
2222

2323
export type PgTokenMetadataNotificationPayload = {
24-
entry: DbTokenMetadataQueueEntry;
24+
queueId: number;
2525
};
2626

2727
export type PgNameNotificationPayload = {

src/datastore/postgres-store.ts

Lines changed: 32 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -797,7 +797,7 @@ export class PgDataStore
797797
break;
798798
case 'tokenMetadataUpdateQueued':
799799
const metadata = notification.payload as PgTokenMetadataNotificationPayload;
800-
this.emit('tokenMetadataUpdateQueued', metadata.entry);
800+
this.emit('tokenMetadataUpdateQueued', metadata.queueId);
801801
break;
802802
}
803803
});
@@ -1457,7 +1457,7 @@ export class PgDataStore
14571457
}
14581458
await this.emitAddressTxUpdates(data.txs);
14591459
for (const tokenMetadataQueueEntry of tokenMetadataQueueEntries) {
1460-
await this.notifier.sendTokenMetadata({ entry: tokenMetadataQueueEntry });
1460+
await this.notifier.sendTokenMetadata({ queueId: tokenMetadataQueueEntry.queueId });
14611461
}
14621462
}
14631463
}
@@ -3670,7 +3670,7 @@ export class PgDataStore
36703670
`
36713671
SELECT tx_id
36723672
FROM txs
3673-
WHERE canonical = true AND microblock_canonical = true
3673+
WHERE canonical = true AND microblock_canonical = true
36743674
AND tx_id = ANY($1)
36753675
AND block_height = $2
36763676
`,
@@ -4856,6 +4856,31 @@ export class PgDataStore
48564856
);
48574857
}
48584858

4859+
async getTokenMetadataQueueEntry(
4860+
queueId: number
4861+
): Promise<FoundOrNot<DbTokenMetadataQueueEntry>> {
4862+
const result = await this.query(async client => {
4863+
const queryResult = await client.query<DbTokenMetadataQueueEntryQuery>(
4864+
`SELECT * FROM token_metadata_queue WHERE queue_id = $1`,
4865+
[queueId]
4866+
);
4867+
return queryResult;
4868+
});
4869+
if (result.rowCount === 0) {
4870+
return { found: false };
4871+
}
4872+
const row = result.rows[0];
4873+
const entry: DbTokenMetadataQueueEntry = {
4874+
queueId: row.queue_id,
4875+
txId: bufferToHexPrefixString(row.tx_id),
4876+
contractId: row.contract_id,
4877+
contractAbi: JSON.parse(row.contract_abi),
4878+
blockHeight: row.block_height,
4879+
processed: row.processed,
4880+
};
4881+
return { found: true, result: entry };
4882+
}
4883+
48594884
async getTokenMetadataQueue(
48604885
limit: number,
48614886
excludingEntries: number[]
@@ -6484,7 +6509,7 @@ export class PgDataStore
64846509
const validZonefileHash = this.validateZonefileHash(zonefile_hash);
64856510
await client.query(
64866511
`
6487-
INSERT INTO zonefiles (zonefile, zonefile_hash)
6512+
INSERT INTO zonefiles (zonefile, zonefile_hash)
64886513
VALUES ($1, $2)
64896514
`,
64906515
[zonefile, validZonefileHash]
@@ -6880,10 +6905,10 @@ export class PgDataStore
68806905
WHERE owner = $1
68816906
AND block_height <= $2
68826907
AND canonical = true AND microblock_canonical = true
6883-
)),
6908+
)),
68846909
6885-
latest_names AS(
6886-
(
6910+
latest_names AS(
6911+
(
68876912
SELECT DISTINCT ON (names.name) names.name, address, registered_at as block_height, tx_index
68886913
FROM names, address_names
68896914
WHERE address_names.name = names.name

src/event-stream/tokens-contract-handler.ts

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -748,13 +748,13 @@ export class TokensProcessorQueue {
748748
/** The entries currently queued for processing in memory, keyed by the queue entry db id. */
749749
readonly queuedEntries: Map<number, TokenMetadataUpdateInfo> = new Map();
750750

751-
readonly onTokenMetadataUpdateQueued: (entry: TokenMetadataUpdateInfo) => void;
751+
readonly onTokenMetadataUpdateQueued: (queueId: number) => void;
752752

753753
constructor(db: DataStore, chainId: ChainID) {
754754
this.db = db;
755755
this.chainId = chainId;
756756
this.queue = new PQueue({ concurrency: TOKEN_METADATA_PARSING_CONCURRENCY_LIMIT });
757-
this.onTokenMetadataUpdateQueued = entry => this.queueHandler(entry);
757+
this.onTokenMetadataUpdateQueued = entry => this.queueNotificationHandler(entry);
758758
this.db.on('tokenMetadataUpdateQueued', this.onTokenMetadataUpdateQueued);
759759
}
760760

@@ -800,6 +800,13 @@ export class TokensProcessorQueue {
800800
}
801801
}
802802

803+
async queueNotificationHandler(queueId: number) {
804+
const queueEntry = await this.db.getTokenMetadataQueueEntry(queueId);
805+
if (queueEntry.found) {
806+
await this.queueHandler(queueEntry.result);
807+
}
808+
}
809+
803810
async queueHandler(queueEntry: TokenMetadataUpdateInfo) {
804811
if (
805812
this.queuedEntries.has(queueEntry.queueId) ||

0 commit comments

Comments
 (0)