Skip to content

Commit 2e6448d

Browse files
authored
fix: handle pg exceptions on web socket transmitter (#1353)
1 parent f9b7ae4 commit 2e6448d

File tree

2 files changed

+175
-87
lines changed

2 files changed

+175
-87
lines changed

src/api/routes/ws/web-socket-transmitter.ts

Lines changed: 116 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import { WebSocketChannel } from './web-socket-channel';
1212
import { SocketIOChannel } from './channels/socket-io-channel';
1313
import { WsRpcChannel } from './channels/ws-rpc-channel';
1414
import { parseNftEvent } from '../../../datastore/helpers';
15+
import { logger } from '../../../helpers';
1516

1617
/**
1718
* This object matches real time update `WebSocketTopics` subscriptions with internal
@@ -67,128 +68,156 @@ export class WebSocketTransmitter {
6768

6869
private async blockUpdate(blockHash: string) {
6970
if (this.channels.find(c => c.hasListeners('block'))) {
70-
const blockQuery = await getBlockFromDataStore({
71-
blockIdentifer: { hash: blockHash },
72-
db: this.db,
73-
});
74-
if (blockQuery.found) {
75-
this.channels.forEach(c => c.send('block', blockQuery.result));
71+
try {
72+
const blockQuery = await getBlockFromDataStore({
73+
blockIdentifer: { hash: blockHash },
74+
db: this.db,
75+
});
76+
if (blockQuery.found) {
77+
this.channels.forEach(c => c.send('block', blockQuery.result));
78+
}
79+
} catch (error) {
80+
logger.error(error);
7681
}
7782
}
7883
}
7984

8085
private async microblockUpdate(microblockHash: string) {
8186
if (this.channels.find(c => c.hasListeners('microblock'))) {
82-
const microblockQuery = await getMicroblockFromDataStore({
83-
db: this.db,
84-
microblockHash: microblockHash,
85-
});
86-
if (microblockQuery.found) {
87-
this.channels.forEach(c => c.send('microblock', microblockQuery.result));
87+
try {
88+
const microblockQuery = await getMicroblockFromDataStore({
89+
db: this.db,
90+
microblockHash: microblockHash,
91+
});
92+
if (microblockQuery.found) {
93+
this.channels.forEach(c => c.send('microblock', microblockQuery.result));
94+
}
95+
} catch (error) {
96+
logger.error(error);
8897
}
8998
}
9099
}
91100

92101
private async txUpdate(txId: string) {
93102
if (this.channels.find(c => c.hasListeners('mempool'))) {
94-
const mempoolTxs = await getMempoolTxsFromDataStore(this.db, {
95-
txIds: [txId],
96-
includeUnanchored: true,
97-
});
98-
if (mempoolTxs.length > 0) {
99-
this.channels.forEach(c => c.send('mempoolTransaction', mempoolTxs[0]));
103+
try {
104+
const mempoolTxs = await getMempoolTxsFromDataStore(this.db, {
105+
txIds: [txId],
106+
includeUnanchored: true,
107+
});
108+
if (mempoolTxs.length > 0) {
109+
this.channels.forEach(c => c.send('mempoolTransaction', mempoolTxs[0]));
110+
}
111+
} catch (error) {
112+
logger.error(error);
100113
}
101114
}
102115

103116
if (this.channels.find(c => c.hasListeners('transaction', txId))) {
104-
// Look at the `txs` table first so we always prefer the confirmed transaction.
105-
const txQuery = await getTxFromDataStore(this.db, {
106-
txId: txId,
107-
includeUnanchored: true,
108-
});
109-
if (txQuery.found) {
110-
this.channels.forEach(c => c.send('transaction', txQuery.result));
111-
} else {
112-
// Tx is not yet confirmed, look at `mempool_txs`.
113-
const mempoolTxs = await getMempoolTxsFromDataStore(this.db, {
114-
txIds: [txId],
117+
try {
118+
// Look at the `txs` table first so we always prefer the confirmed transaction.
119+
const txQuery = await getTxFromDataStore(this.db, {
120+
txId: txId,
115121
includeUnanchored: true,
116122
});
117-
if (mempoolTxs.length > 0) {
118-
this.channels.forEach(c => c.send('transaction', mempoolTxs[0]));
123+
if (txQuery.found) {
124+
this.channels.forEach(c => c.send('transaction', txQuery.result));
125+
} else {
126+
// Tx is not yet confirmed, look at `mempool_txs`.
127+
const mempoolTxs = await getMempoolTxsFromDataStore(this.db, {
128+
txIds: [txId],
129+
includeUnanchored: true,
130+
});
131+
if (mempoolTxs.length > 0) {
132+
this.channels.forEach(c => c.send('transaction', mempoolTxs[0]));
133+
}
119134
}
135+
} catch (error) {
136+
logger.error(error);
120137
}
121138
}
122139
}
123140

124141
private async nftEventUpdate(txId: string, eventIndex: number) {
125-
const nftEvent = await this.db.getNftEvent({ txId, eventIndex });
126-
if (!nftEvent.found) {
127-
return;
128-
}
129-
const assetIdentifier = nftEvent.result.asset_identifier;
130-
const value = nftEvent.result.value;
131-
const event = parseNftEvent(nftEvent.result);
142+
try {
143+
const nftEvent = await this.db.getNftEvent({ txId, eventIndex });
144+
if (!nftEvent.found) {
145+
return;
146+
}
147+
const assetIdentifier = nftEvent.result.asset_identifier;
148+
const value = nftEvent.result.value;
149+
const event = parseNftEvent(nftEvent.result);
132150

133-
if (this.channels.find(c => c.hasListeners('nftEvent'))) {
134-
this.channels.forEach(c => c.send('nftEvent', event));
135-
}
136-
if (this.channels.find(c => c.hasListeners('nftAssetEvent', assetIdentifier, value))) {
137-
this.channels.forEach(c => c.send('nftAssetEvent', assetIdentifier, value, event));
138-
}
139-
if (this.channels.find(c => c.hasListeners('nftCollectionEvent', assetIdentifier))) {
140-
this.channels.forEach(c => c.send('nftCollectionEvent', assetIdentifier, event));
151+
if (this.channels.find(c => c.hasListeners('nftEvent'))) {
152+
this.channels.forEach(c => c.send('nftEvent', event));
153+
}
154+
if (this.channels.find(c => c.hasListeners('nftAssetEvent', assetIdentifier, value))) {
155+
this.channels.forEach(c => c.send('nftAssetEvent', assetIdentifier, value, event));
156+
}
157+
if (this.channels.find(c => c.hasListeners('nftCollectionEvent', assetIdentifier))) {
158+
this.channels.forEach(c => c.send('nftCollectionEvent', assetIdentifier, event));
159+
}
160+
} catch (error) {
161+
logger.error(error);
141162
}
142163
}
143164

144165
private async addressUpdate(address: string, blockHeight: number) {
145166
if (this.channels.find(c => c.hasListeners('principalTransactions', address))) {
146-
const dbTxsQuery = await this.db.getAddressTxsWithAssetTransfers({
147-
stxAddress: address,
148-
blockHeight: blockHeight,
149-
atSingleBlock: true,
150-
});
151-
if (dbTxsQuery.total == 0) {
152-
return;
153-
}
154-
const addressTxs = dbTxsQuery.results;
155-
for (const addressTx of addressTxs) {
156-
const parsedTx = parseDbTx(addressTx.tx);
157-
const result: AddressTransactionWithTransfers = {
158-
tx: parsedTx,
159-
stx_sent: addressTx.stx_sent.toString(),
160-
stx_received: addressTx.stx_received.toString(),
161-
stx_transfers: addressTx.stx_transfers.map(value => {
162-
return {
163-
amount: value.amount.toString(),
164-
sender: value.sender,
165-
recipient: value.recipient,
166-
};
167-
}),
168-
};
169-
this.channels.forEach(c => c.send('principalTransaction', address, result));
167+
try {
168+
const dbTxsQuery = await this.db.getAddressTxsWithAssetTransfers({
169+
stxAddress: address,
170+
blockHeight: blockHeight,
171+
atSingleBlock: true,
172+
});
173+
if (dbTxsQuery.total == 0) {
174+
return;
175+
}
176+
const addressTxs = dbTxsQuery.results;
177+
for (const addressTx of addressTxs) {
178+
const parsedTx = parseDbTx(addressTx.tx);
179+
const result: AddressTransactionWithTransfers = {
180+
tx: parsedTx,
181+
stx_sent: addressTx.stx_sent.toString(),
182+
stx_received: addressTx.stx_received.toString(),
183+
stx_transfers: addressTx.stx_transfers.map(value => {
184+
return {
185+
amount: value.amount.toString(),
186+
sender: value.sender,
187+
recipient: value.recipient,
188+
};
189+
}),
190+
};
191+
this.channels.forEach(c => c.send('principalTransaction', address, result));
192+
}
193+
} catch (error) {
194+
logger.error(error);
170195
}
171196
}
172197

173198
if (this.channels.find(c => c.hasListeners('principalStxBalance', address))) {
174-
const stxBalanceResult = await this.db.getStxBalanceAtBlock(address, blockHeight);
175-
const tokenOfferingLocked = await this.db.getTokenOfferingLocked(address, blockHeight);
176-
const balance: AddressStxBalanceResponse = {
177-
balance: stxBalanceResult.balance.toString(),
178-
total_sent: stxBalanceResult.totalSent.toString(),
179-
total_received: stxBalanceResult.totalReceived.toString(),
180-
total_fees_sent: stxBalanceResult.totalFeesSent.toString(),
181-
total_miner_rewards_received: stxBalanceResult.totalMinerRewardsReceived.toString(),
182-
lock_tx_id: stxBalanceResult.lockTxId,
183-
locked: stxBalanceResult.locked.toString(),
184-
lock_height: stxBalanceResult.lockHeight,
185-
burnchain_lock_height: stxBalanceResult.burnchainLockHeight,
186-
burnchain_unlock_height: stxBalanceResult.burnchainUnlockHeight,
187-
};
188-
if (tokenOfferingLocked.found) {
189-
balance.token_offering_locked = tokenOfferingLocked.result;
199+
try {
200+
const stxBalanceResult = await this.db.getStxBalanceAtBlock(address, blockHeight);
201+
const tokenOfferingLocked = await this.db.getTokenOfferingLocked(address, blockHeight);
202+
const balance: AddressStxBalanceResponse = {
203+
balance: stxBalanceResult.balance.toString(),
204+
total_sent: stxBalanceResult.totalSent.toString(),
205+
total_received: stxBalanceResult.totalReceived.toString(),
206+
total_fees_sent: stxBalanceResult.totalFeesSent.toString(),
207+
total_miner_rewards_received: stxBalanceResult.totalMinerRewardsReceived.toString(),
208+
lock_tx_id: stxBalanceResult.lockTxId,
209+
locked: stxBalanceResult.locked.toString(),
210+
lock_height: stxBalanceResult.lockHeight,
211+
burnchain_lock_height: stxBalanceResult.burnchainLockHeight,
212+
burnchain_unlock_height: stxBalanceResult.burnchainUnlockHeight,
213+
};
214+
if (tokenOfferingLocked.found) {
215+
balance.token_offering_locked = tokenOfferingLocked.result;
216+
}
217+
this.channels.forEach(c => c.send('principalStxBalance', address, balance));
218+
} catch (error) {
219+
logger.error(error);
190220
}
191-
this.channels.forEach(c => c.send('principalStxBalance', address, balance));
192221
}
193222
}
194223
}

src/tests/ws-transmitter-tests.ts

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
import { PgWriteStore } from '../datastore/pg-write-store';
2+
import { cycleMigrations, runMigrations } from '../datastore/migrations';
3+
import { WebSocketTransmitter } from '../api/routes/ws/web-socket-transmitter';
4+
import { Server } from 'http';
5+
import {
6+
ListenerType,
7+
WebSocketChannel,
8+
WebSocketPayload,
9+
WebSocketTopics,
10+
} from '../api/routes/ws/web-socket-channel';
11+
12+
class TestChannel extends WebSocketChannel {
13+
connect(): void {
14+
//
15+
}
16+
close(callback?: ((err?: Error | undefined) => void) | undefined): void {
17+
//
18+
}
19+
send<P extends keyof WebSocketPayload>(
20+
payload: P,
21+
...args: ListenerType<WebSocketPayload[P]>
22+
): void {
23+
//
24+
}
25+
hasListeners<P extends keyof WebSocketTopics>(
26+
topic: P,
27+
...args: ListenerType<WebSocketTopics[P]>
28+
): boolean {
29+
return true;
30+
}
31+
}
32+
33+
describe('ws transmitter', () => {
34+
let db: PgWriteStore;
35+
let transmitter: WebSocketTransmitter;
36+
37+
beforeEach(async () => {
38+
process.env.PG_DATABASE = 'postgres';
39+
await cycleMigrations();
40+
db = await PgWriteStore.connect({ usageName: 'tests', skipMigrations: true });
41+
});
42+
43+
test('handles pg exceptions gracefully', async () => {
44+
const fakeServer = new Server();
45+
transmitter = new WebSocketTransmitter(db, fakeServer);
46+
transmitter['channels'].push(new TestChannel(fakeServer));
47+
await db.close();
48+
await expect(transmitter['blockUpdate']('0xff')).resolves.not.toThrow();
49+
await expect(transmitter['microblockUpdate']('0xff')).resolves.not.toThrow();
50+
await expect(transmitter['txUpdate']('0xff')).resolves.not.toThrow();
51+
await expect(transmitter['nftEventUpdate']('0xff', 0)).resolves.not.toThrow();
52+
await expect(transmitter['addressUpdate']('0xff', 1)).resolves.not.toThrow();
53+
});
54+
55+
afterEach(async () => {
56+
await db?.close();
57+
await runMigrations(undefined, 'down');
58+
});
59+
});

0 commit comments

Comments
 (0)