Skip to content

Commit dbd5a49

Browse files
authored
fix: send address tx updates correctly on microblocks (#1089)
* fix: increase block_height correctly on microblock tx updates * fix: take mb tx block height from correct var * fix: socket-io addr tx test * fix: attempt to fix ws test * fix: attempt to fix socket io test * fix: socket io async issues * chore: clean up log entries
1 parent 3945a37 commit dbd5a49

File tree

5 files changed

+94
-101
lines changed

5 files changed

+94
-101
lines changed

src/api/routes/ws/socket-io.ts

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -29,56 +29,57 @@ export function createSocketIORouter(db: DataStore, server: http.Server) {
2929
prometheus = new WebSocketPrometheus('socket_io');
3030
}
3131

32-
io.on('connection', socket => {
33-
logger.info('[socket.io] new connection');
32+
io.on('connection', async socket => {
33+
logger.info(`[socket.io] new connection: ${socket.id}`);
3434
if (socket.handshake.headers['x-forwarded-for']) {
3535
prometheus?.connect(socket.handshake.headers['x-forwarded-for'] as string);
3636
} else {
3737
prometheus?.connect(socket.handshake.address);
3838
}
39-
socket.on('disconnect', reason => {
40-
logger.info(`[socket.io] disconnected: ${reason}`);
41-
prometheus?.disconnect(socket);
42-
});
4339
const subscriptions = socket.handshake.query['subscriptions'];
4440
if (subscriptions) {
4541
// TODO: check if init topics are valid, reject connection with error if not
4642
const topics = [...[subscriptions]].flat().flatMap(r => r.split(','));
47-
topics.forEach(topic => {
43+
for (const topic of topics) {
4844
prometheus?.subscribe(socket, topic);
49-
void socket.join(topic);
50-
});
45+
await socket.join(topic);
46+
}
5147
}
52-
socket.on('subscribe', (topic, callback) => {
48+
49+
socket.on('disconnect', reason => {
50+
logger.info(`[socket.io] disconnected ${socket.id}: ${reason}`);
51+
prometheus?.disconnect(socket);
52+
});
53+
socket.on('subscribe', async (topic, callback) => {
5354
prometheus?.subscribe(socket, topic);
54-
void socket.join(topic);
55+
await socket.join(topic);
5556
// TODO: check if topic is valid, and return error message if not
5657
callback?.(null);
5758
});
58-
socket.on('unsubscribe', (...topics) => {
59-
topics.forEach(topic => {
59+
socket.on('unsubscribe', async (...topics) => {
60+
for (const topic of topics) {
6061
prometheus?.unsubscribe(socket, topic);
61-
void socket.leave(topic);
62-
});
62+
await socket.leave(topic);
63+
}
6364
});
6465
});
6566

6667
const adapter = io.of('/').adapter;
6768

6869
adapter.on('create-room', room => {
69-
logger.info(`[socket.io] room created: "${room}"`);
70+
logger.info(`[socket.io] room created: ${room}`);
7071
});
7172

7273
adapter.on('delete-room', room => {
73-
logger.info(`[socket.io] room deleted: "${room}"`);
74+
logger.info(`[socket.io] room deleted: ${room}`);
7475
});
7576

7677
adapter.on('join-room', (room, id) => {
77-
logger.info(`[socket.io] socket ${id} joined room "${room}"`);
78+
logger.info(`[socket.io] socket ${id} joined room: ${room}`);
7879
});
7980

8081
adapter.on('leave-room', (room, id) => {
81-
logger.info(`[socket.io] socket ${id} left room "${room}"`);
82+
logger.info(`[socket.io] socket ${id} left room: ${room}`);
8283
});
8384

8485
db.on('blockUpdate', async blockHash => {
@@ -192,7 +193,7 @@ export function createSocketIORouter(db: DataStore, server: http.Server) {
192193
// Get latest balance (in case multiple txs come in from different blocks)
193194
const blockHeights = addressTxs.map(tx => tx.tx.block_height);
194195
const latestBlock = Math.max(...blockHeights);
195-
void getAddressStxBalance(address, latestBlock)
196+
getAddressStxBalance(address, latestBlock)
196197
.then(balance => {
197198
prometheus?.sendEvent('address-stx-balance');
198199
io.to(addrStxBalanceTopic).emit('address-stx-balance', address, balance);

src/api/routes/ws/ws-rpc.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -431,10 +431,10 @@ export function createWsRpcRouter(db: DataStore, server: http.Server): WebSocket
431431
// Queue to process balance update notifications
432432
const addrBalanceProcessorQueue = new PQueue({ concurrency: 1 });
433433

434-
function processAddressBalanceUpdate(address: string) {
434+
async function processAddressBalanceUpdate(address: string) {
435435
const subscribers = addressBalanceUpdateSubscriptions.subscriptions.get(address);
436436
if (subscribers) {
437-
void addrBalanceProcessorQueue.add(async () => {
437+
await addrBalanceProcessorQueue.add(async () => {
438438
try {
439439
const balance = await db.getStxBalance({
440440
stxAddress: address,
@@ -521,7 +521,7 @@ export function createWsRpcRouter(db: DataStore, server: http.Server): WebSocket
521521

522522
db.addListener('addressUpdate', async (address, blockHeight) => {
523523
await processAddressUpdate(address, blockHeight);
524-
processAddressBalanceUpdate(address);
524+
await processAddressBalanceUpdate(address);
525525
});
526526

527527
db.addListener('blockUpdate', async blockHash => {
@@ -539,7 +539,7 @@ export function createWsRpcRouter(db: DataStore, server: http.Server): WebSocket
539539
prometheus?.connect(req.socket.remoteAddress);
540540
}
541541
clientSocket.on('message', data => {
542-
void handleClientMessage(clientSocket, data);
542+
handleClientMessage(clientSocket, data);
543543
});
544544
clientSocket.on('close', (_: WebSocket) => {
545545
prometheus?.disconnect(clientSocket);

src/datastore/postgres-store.ts

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1227,13 +1227,13 @@ export class PgDataStore
12271227
await this.refreshMaterializedView(client, 'chain_tip');
12281228

12291229
if (this.notifier) {
1230-
dbMicroblocks.forEach(async microblock => {
1231-
await this.notifier?.sendMicroblock({ microblockHash: microblock.microblock_hash });
1232-
});
1233-
txs.forEach(async txData => {
1234-
await this.notifier?.sendTx({ txId: txData.tx.tx_id });
1235-
});
1236-
this.emitAddressTxUpdates(data.txs);
1230+
for (const microblock of dbMicroblocks) {
1231+
await this.notifier.sendMicroblock({ microblockHash: microblock.microblock_hash });
1232+
}
1233+
for (const tx of txs) {
1234+
await this.notifier.sendTx({ txId: tx.tx.tx_id });
1235+
}
1236+
await this.emitAddressTxUpdates(txs);
12371237
}
12381238
});
12391239
}
@@ -1446,13 +1446,13 @@ export class PgDataStore
14461446
// Skip sending `PgNotifier` updates altogether if we're in the genesis block since this block is the
14471447
// event replay of the v1 blockchain.
14481448
if ((data.block.block_height > 1 || !isProdEnv) && this.notifier) {
1449-
await this.notifier?.sendBlock({ blockHash: data.block.block_hash });
1450-
data.txs.forEach(async entry => {
1451-
await this.notifier?.sendTx({ txId: entry.tx.tx_id });
1452-
});
1453-
this.emitAddressTxUpdates(data.txs);
1449+
await this.notifier.sendBlock({ blockHash: data.block.block_hash });
1450+
for (const tx of data.txs) {
1451+
await this.notifier.sendTx({ txId: tx.tx.tx_id });
1452+
}
1453+
await this.emitAddressTxUpdates(data.txs);
14541454
for (const tokenMetadataQueueEntry of tokenMetadataQueueEntries) {
1455-
await this.notifier?.sendTokenMetadata({ entry: tokenMetadataQueueEntry });
1455+
await this.notifier.sendTokenMetadata({ entry: tokenMetadataQueueEntry });
14561456
}
14571457
}
14581458
}
@@ -2055,10 +2055,10 @@ export class PgDataStore
20552055
});
20562056
}
20572057

2058-
emitAddressTxUpdates(txs: DataStoreTxEventData[]) {
2058+
async emitAddressTxUpdates(txs: DataStoreTxEventData[]) {
20592059
// Record all addresses that had an associated tx.
20602060
const addressTxUpdates = new Map<string, number>();
2061-
txs.forEach(entry => {
2061+
for (const entry of txs) {
20622062
const tx = entry.tx;
20632063
const addAddressTx = (addr: string | undefined) => {
20642064
if (addr) {
@@ -2095,13 +2095,13 @@ export class PgDataStore
20952095
addAddressTx(tx.token_transfer_recipient_address);
20962096
break;
20972097
}
2098-
});
2099-
addressTxUpdates.forEach(async (blockHeight, address) => {
2098+
}
2099+
for (const [address, blockHeight] of addressTxUpdates) {
21002100
await this.notifier?.sendAddress({
21012101
address: address,
21022102
blockHeight: blockHeight,
21032103
});
2104-
});
2104+
}
21052105
}
21062106

21072107
/**

src/tests/socket-io-tests.ts

Lines changed: 22 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,7 @@ describe('socket-io', () => {
6464
});
6565

6666
test('socket-io > microblock updates', async () => {
67-
const address = apiServer.address;
68-
const socket = io(`http://${address}`, {
67+
const socket = io(`http://${apiServer.address}`, {
6968
reconnection: false,
7069
query: { subscriptions: 'microblock' },
7170
});
@@ -81,7 +80,6 @@ describe('socket-io', () => {
8180
const microblocks = new TestMicroblockStreamBuilder()
8281
.addMicroblock({
8382
microblock_hash: '0xff01',
84-
microblock_parent_hash: '0x1212',
8583
parent_index_block_hash: '0x4343',
8684
})
8785
.addTx({ tx_id: '0xf6f6' })
@@ -91,7 +89,7 @@ describe('socket-io', () => {
9189
const result = await updateWaiter;
9290
try {
9391
expect(result.microblock_hash).toEqual('0xff01');
94-
expect(result.microblock_parent_hash).toEqual('0x1212');
92+
expect(result.parent_block_hash).toEqual('0x1212');
9593
expect(result.txs[0]).toEqual('0xf6f6');
9694
} finally {
9795
socket.emit('unsubscribe', 'microblock');
@@ -146,44 +144,47 @@ describe('socket-io', () => {
146144

147145
test('socket-io > address tx updates', async () => {
148146
const addr1 = 'ST28D4Q6RCQSJ6F7TEYWQDS4N1RXYEP9YBWMYSB97';
149-
const address = apiServer.address;
150-
const socket = io(`http://${address}`, {
147+
const socket = io(`http://${apiServer.address}`, {
151148
reconnection: false,
152149
query: { subscriptions: `address-transaction:${addr1}` },
153150
});
154-
const updateWaiters: Waiter<AddressTransactionWithTransfers>[] = [waiter(), waiter()];
155-
156-
let waiterIndex = 0;
151+
let updateIndex = 0;
152+
const addrTxUpdates: Waiter<AddressTransactionWithTransfers>[] = [waiter(), waiter()];
157153
socket.on(`address-transaction:${addr1}`, (_, tx) => {
158-
updateWaiters[waiterIndex++].finish(tx);
154+
addrTxUpdates[updateIndex++]?.finish(tx);
159155
});
160-
const block = new TestBlockBuilder()
156+
157+
const block = new TestBlockBuilder({
158+
block_height: 1,
159+
block_hash: '0x01',
160+
index_block_hash: '0x01',
161+
})
161162
.addTx({ tx_id: '0x8912', sender_address: addr1, token_transfer_amount: 100n, fee_rate: 50n })
162163
.addTxStxEvent({ sender: addr1, amount: 100n })
163164
.build();
164165
await db.update(block);
166+
const blockResult = await addrTxUpdates[0];
165167

166168
const microblock = new TestMicroblockStreamBuilder()
167-
.addMicroblock()
169+
.addMicroblock({
170+
microblock_hash: '0x11',
171+
parent_index_block_hash: '0x01',
172+
})
168173
.addTx({
169174
tx_id: '0x8913',
170175
sender_address: addr1,
171176
token_transfer_amount: 150n,
172177
fee_rate: 50n,
173-
block_height: 2,
174178
})
175-
.addTxStxEvent({ sender: addr1, amount: 150n, block_height: 2 })
179+
.addTxStxEvent({ sender: addr1, amount: 150n })
176180
.build();
177181
await db.updateMicroblocks(microblock);
182+
const microblockResult = await addrTxUpdates[1];
178183

179-
const result0 = await updateWaiters[0];
180-
const result1 = await updateWaiters[1];
181-
const result = result0.tx.tx_id === '0x8912' ? result0 : result1;
182-
const microblockResult = result0.tx.tx_id === '0x8912' ? result1 : result0;
183184
try {
184-
expect(result.tx.tx_id).toEqual('0x8912');
185-
expect(result.stx_sent).toEqual('150'); // Incl. fees
186-
expect(result.stx_transfers[0].amount).toEqual('100');
185+
expect(blockResult.tx.tx_id).toEqual('0x8912');
186+
expect(blockResult.stx_sent).toEqual('150'); // Incl. fees
187+
expect(blockResult.stx_transfers[0].amount).toEqual('100');
187188
expect(microblockResult.tx.tx_id).toEqual('0x8913');
188189
expect(microblockResult.stx_sent).toEqual('200'); // Incl. fees
189190
expect(microblockResult.stx_transfers[0].amount).toEqual('150');

src/tests/websocket-tests.ts

Lines changed: 29 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -228,35 +228,38 @@ describe('websocket notifications', () => {
228228
});
229229

230230
test('websocket rpc - address tx subscription updates', async () => {
231-
const addr = apiServer.address;
232-
const wsAddress = `ws://${addr}/extended/v1/ws`;
231+
const wsAddress = `ws://${apiServer.address}/extended/v1/ws`;
233232
const socket = new WebSocket(wsAddress);
233+
const client = new RpcWebSocketClient();
234+
const addr = 'STB44HYPYAT2BB2QE513NSP81HTMYWBJP02HPGK6';
235+
const subParams: RpcAddressTxSubscriptionParams = {
236+
event: 'address_tx_update',
237+
address: addr,
238+
};
234239

235240
try {
236-
const addr = 'STB44HYPYAT2BB2QE513NSP81HTMYWBJP02HPGK6';
237241
await once(socket, 'open');
238-
const client = new RpcWebSocketClient();
239-
240242
client.changeSocket(socket);
241243
client.listenMessages();
242-
const subParams1: RpcAddressTxSubscriptionParams = {
243-
event: 'address_tx_update',
244-
address: addr,
245-
};
246-
const result = await client.call('subscribe', subParams1);
244+
const result = await client.call('subscribe', subParams);
247245
expect(result).toEqual({ address: addr });
248246

249-
// watch for update to this tx
250247
let updateIndex = 0;
251248
const addrTxUpdates: Waiter<RpcAddressTxNotificationParams>[] = [waiter(), waiter()];
252249
client.onNotification.push(msg => {
253250
if (msg.method === 'address_tx_update') {
254251
const txUpdate: RpcAddressTxNotificationParams = msg.params;
255252
addrTxUpdates[updateIndex++]?.finish(txUpdate);
253+
} else {
254+
fail(msg.method);
256255
}
257256
});
258257

259-
const block = new TestBlockBuilder()
258+
const block = new TestBlockBuilder({
259+
block_height: 1,
260+
block_hash: '0x01',
261+
index_block_hash: '0x01',
262+
})
260263
.addTx({
261264
tx_id: '0x8912000000000000000000000000000000000000000000000000000000000000',
262265
sender_address: addr,
@@ -266,50 +269,38 @@ describe('websocket notifications', () => {
266269
.addTxStxEvent({ sender: addr })
267270
.build();
268271
await db.update(block);
272+
const txUpdate1 = await addrTxUpdates[0];
273+
expect(txUpdate1).toEqual({
274+
address: 'STB44HYPYAT2BB2QE513NSP81HTMYWBJP02HPGK6',
275+
tx_id: '0x8912000000000000000000000000000000000000000000000000000000000000',
276+
tx_status: 'success',
277+
tx_type: 'token_transfer',
278+
});
269279

270280
const microblock = new TestMicroblockStreamBuilder()
271-
.addMicroblock()
281+
.addMicroblock({
282+
microblock_hash: '0x11',
283+
parent_index_block_hash: '0x01',
284+
})
272285
.addTx({
273286
tx_id: '0x8913',
274287
sender_address: addr,
275288
token_transfer_amount: 150n,
276289
fee_rate: 50n,
277-
block_height: 2,
278290
type_id: DbTxTypeId.TokenTransfer,
279291
})
280-
.addTxStxEvent({ sender: addr, amount: 150n, block_height: 2 })
292+
.addTxStxEvent({ sender: addr, amount: 150n })
281293
.build();
282294
await db.updateMicroblocks(microblock);
283-
284-
const update0 = await addrTxUpdates[0];
285-
const update1 = await addrTxUpdates[1];
286-
const txUpdate1 =
287-
update0.tx_id === '0x8912000000000000000000000000000000000000000000000000000000000000'
288-
? update0
289-
: update1;
290-
const txUpdate2 =
291-
update0.tx_id === '0x8912000000000000000000000000000000000000000000000000000000000000'
292-
? update1
293-
: update0;
294-
295-
// check for tx update notification
296-
expect(txUpdate1).toEqual({
297-
address: 'STB44HYPYAT2BB2QE513NSP81HTMYWBJP02HPGK6',
298-
tx_id: '0x8912000000000000000000000000000000000000000000000000000000000000',
299-
tx_status: 'success',
300-
tx_type: 'token_transfer',
301-
});
302-
295+
const txUpdate2 = await addrTxUpdates[1];
303296
expect(txUpdate2).toEqual({
304297
address: 'STB44HYPYAT2BB2QE513NSP81HTMYWBJP02HPGK6',
305298
tx_id: '0x8913',
306299
tx_status: 'success',
307300
tx_type: 'token_transfer',
308301
});
309-
310-
const unsubscribeResult = await client.call('unsubscribe', subParams1);
311-
expect(unsubscribeResult).toEqual({ address: 'STB44HYPYAT2BB2QE513NSP81HTMYWBJP02HPGK6' });
312302
} finally {
303+
await client.call('unsubscribe', subParams);
313304
socket.terminate();
314305
}
315306
});

0 commit comments

Comments
 (0)