
Commit 7add12c

Authored by BLuEScioN, zone117x, semantic-release-bot, and rafaelcr
Nickb/duplicate event observer requests processing (#1412)
* fix(api): updates to event_observer_requests occur after specific event handlers
* fix(api): clean up pr
* fix(events): added tests
* fix(event_observer_requests): modified env to run nodes
* fix: log PoisonMicroblock tx instead rather than throwing (#1379)
* chore(release): 6.1.1 [skip ci]

  ## [6.1.1](v6.1.0...v6.1.1) (2022-10-24)

  ### Bug Fixes

  * log PoisonMicroblock tx instead rather than throwing ([#1379](#1379)) ([cee6352](cee6352))
* fix(duplicate-event-requests): fixed test
* fix(duplicate-event-requests): removed unneeded changes
* fix(duplicate-event-requests): reverted env
* fix(duplicate-events): addressing comments
* feat(duplicate-requests): address pr comments
* fix(duplicate-events): fixing tests
* fix(duplicate-events): fixing tests
* ci: patch
* fix(duplicate-events): fixing pr
* fix(duplicate-events): fixing pr
* fix(duplicate-events): addressed pr feedback

Co-authored-by: Nick Barnett <[email protected]>
Co-authored-by: Matthew Little <[email protected]>
Co-authored-by: semantic-release-bot <[email protected]>
Co-authored-by: Rafael Cárdenas <[email protected]>
1 parent 5f4feb2 commit 7add12c
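The shape of the change is easiest to see as plain Express middleware chaining: each core-node endpoint handles its message first and calls next() only on success, and a shared handleRawEventRequest handler registered after it archives the raw payload. A minimal, self-contained sketch of that pattern (hypothetical route, port, and helper names; not the project's code):

import express, { NextFunction, Request, Response } from 'express';

// Hypothetical stand-in for the event_observer_requests archive.
const recordedRawRequests: { path: string; body: unknown }[] = [];

// Hypothetical event processing step that may throw.
async function handleExampleEvent(payload: unknown): Promise<void> {
  if (payload == null || typeof payload !== 'object') {
    throw new Error('malformed payload');
  }
}

const app = express();
app.use(express.json());

app.post(
  '/example_event',
  async (req: Request, res: Response, next: NextFunction) => {
    try {
      await handleExampleEvent(req.body);
      res.status(200).json({ result: 'ok' });
      // Only reached on success: hand off to the raw-request recorder.
      next();
    } catch (error) {
      // On failure, next() is never called, so nothing is archived.
      res.status(500).json({ error: String(error) });
    }
  },
  // Runs only when the previous handler called next().
  (req: Request, _res: Response) => {
    recordedRawRequests.push({ path: req.path, body: req.body });
  }
);

app.listen(3700, () => console.log('example event observer listening'));

In this sketch a well-formed body returns 200 and records the request, while a malformed body returns 500 and records nothing, which is the ordering the new test below asserts against event_observer_requests.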

File tree: 2 files changed, +123 −25 lines changed

src/event-stream/event-server.ts

Lines changed: 35 additions & 25 deletions
@@ -606,6 +606,7 @@ interface EventMessageHandler {
 function createMessageProcessorQueue(): EventMessageHandler {
   // Create a promise queue so that only one message is handled at a time.
   const processorQueue = new PQueue({ concurrency: 1 });
+
   const handler: EventMessageHandler = {
     handleRawEventRequest: (eventPath: string, payload: any, db: PgWriteStore) => {
       return processorQueue
@@ -728,105 +729,114 @@ export async function startEventServer(opts: {
       .json({ status: 'ready', msg: 'API event server listening for core-node POST messages' });
   });

-  app.post(
-    '*',
-    asyncHandler(async (req, res, next) => {
+  const handleRawEventRequest = asyncHandler(async req => {
+    await messageHandler.handleRawEventRequest(req.path, req.body, db);
+
+    if (logger.isDebugEnabled()) {
       const eventPath = req.path;
       let payload = JSON.stringify(req.body);
-      await messageHandler.handleRawEventRequest(eventPath, req.body, db);
-      if (logger.isDebugEnabled()) {
-        // Skip logging massive event payloads, this _should_ only exclude the genesis block payload which is ~80 MB.
-        if (payload.length > 10_000_000) {
-          payload = 'payload body too large for logging';
-        }
-        logger.debug(`[stacks-node event] ${eventPath} ${payload}`);
+      // Skip logging massive event payloads, this _should_ only exclude the genesis block payload which is ~80 MB.
+      if (payload.length > 10_000_000) {
+        payload = 'payload body too large for logging';
       }
-      next();
-    })
-  );
+      logger.debug(`[stacks-node event] ${eventPath} ${payload}`);
+    }
+  });

   app.post(
     '/new_block',
-    asyncHandler(async (req, res) => {
+    asyncHandler(async (req, res, next) => {
       try {
         const msg: CoreNodeBlockMessage = req.body;
         await messageHandler.handleBlockMessage(opts.chainId, msg, db);
         res.status(200).json({ result: 'ok' });
+        next();
       } catch (error) {
         logError(`error processing core-node /new_block: ${error}`, error);
         res.status(500).json({ error: error });
       }
-    })
+    }),
+    handleRawEventRequest
   );

   app.post(
     '/new_burn_block',
-    asyncHandler(async (req, res) => {
+    asyncHandler(async (req, res, next) => {
       try {
         const msg: CoreNodeBurnBlockMessage = req.body;
         await messageHandler.handleBurnBlock(msg, db);
         res.status(200).json({ result: 'ok' });
+        next();
       } catch (error) {
         logError(`error processing core-node /new_burn_block: ${error}`, error);
         res.status(500).json({ error: error });
       }
-    })
+    }),
+    handleRawEventRequest
   );

   app.post(
     '/new_mempool_tx',
-    asyncHandler(async (req, res) => {
+    asyncHandler(async (req, res, next) => {
       try {
         const rawTxs: string[] = req.body;
         await messageHandler.handleMempoolTxs(rawTxs, db);
         res.status(200).json({ result: 'ok' });
+        next();
       } catch (error) {
         logError(`error processing core-node /new_mempool_tx: ${error}`, error);
         res.status(500).json({ error: error });
       }
-    })
+    }),
+    handleRawEventRequest
   );

   app.post(
     '/drop_mempool_tx',
-    asyncHandler(async (req, res) => {
+    asyncHandler(async (req, res, next) => {
       try {
         const msg: CoreNodeDropMempoolTxMessage = req.body;
         await messageHandler.handleDroppedMempoolTxs(msg, db);
         res.status(200).json({ result: 'ok' });
+        next();
       } catch (error) {
         logError(`error processing core-node /drop_mempool_tx: ${error}`, error);
         res.status(500).json({ error: error });
       }
-    })
+    }),
+    handleRawEventRequest
   );

   app.post(
     '/attachments/new',
-    asyncHandler(async (req, res) => {
+    asyncHandler(async (req, res, next) => {
       try {
         const msg: CoreNodeAttachmentMessage[] = req.body;
         await messageHandler.handleNewAttachment(msg, db);
         res.status(200).json({ result: 'ok' });
+        next();
       } catch (error) {
         logError(`error processing core-node /attachments/new: ${error}`, error);
         res.status(500).json({ error: error });
       }
-    })
+    }),
+    handleRawEventRequest
   );

   app.post(
     '/new_microblocks',
-    asyncHandler(async (req, res) => {
+    asyncHandler(async (req, res, next) => {
       try {
         const msg: CoreNodeMicroblockMessage = req.body;
         await messageHandler.handleMicroblockMessage(opts.chainId, msg, db);
         res.status(200).json({ result: 'ok' });
+        next();
       } catch (error) {
         logError(`error processing core-node /new_microblocks: ${error}`, error);
         res.status(500).json({ error: error });
       }
-    })
+    }),
+    handleRawEventRequest
   );

   app.post('*', (req, res, next) => {
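For context, the first hunk above touches createMessageProcessorQueue, where every incoming event is pushed through a PQueue with concurrency: 1 so handlers run strictly one at a time even though the HTTP endpoints accept requests concurrently. A rough, standalone sketch of that serialization pattern (illustrative task names only, not the project's handler code):

import PQueue from 'p-queue';

// Only one task runs at a time; additional tasks wait in FIFO order.
const processorQueue = new PQueue({ concurrency: 1 });

async function enqueue(label: string, work: () => Promise<void>): Promise<void> {
  await processorQueue.add(async () => {
    console.log(`start ${label}`);
    await work();
    console.log(`done ${label}`);
  });
}

async function main() {
  // Even when enqueued back to back, 'block' finishes before 'mempool' starts.
  await Promise.all([
    enqueue('block', () => new Promise<void>(resolve => setTimeout(resolve, 50))),
    enqueue('mempool', () => new Promise<void>(resolve => setTimeout(resolve, 10))),
  ]);
}

void main();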
Lines changed: 88 additions & 0 deletions
@@ -0,0 +1,88 @@
+import * as fs from 'fs';
+import { PgWriteStore } from '../datastore/pg-write-store';
+import { runMigrations } from '../datastore/migrations';
+import { ChainID } from '@stacks/transactions';
+import { httpPostRequest } from '../helpers';
+import { EventStreamServer, startEventServer } from '../event-stream/event-server';
+import { cycleMigrations } from '../datastore/migrations';
+import { PgSqlClient } from '../datastore/connection';
+import { getRawEventRequests } from '../datastore/event-requests';
+import { useWithCleanup } from '../tests/test-helpers';
+
+describe('Events table', () => {
+  let db: PgWriteStore;
+  let client: PgSqlClient;
+  let eventServer: EventStreamServer;
+
+  beforeEach(async () => {
+    process.env.PG_DATABASE = 'postgres';
+    await cycleMigrations();
+    db = await PgWriteStore.connect({ usageName: 'tests', withNotifier: false });
+    client = db.sql;
+    eventServer = await startEventServer({
+      datastore: db,
+      chainId: ChainID.Mainnet,
+      serverHost: '127.0.0.1',
+      serverPort: 0,
+      httpLogLevel: 'debug',
+    });
+  });
+
+  afterEach(async () => {
+    await eventServer.closeAsync();
+    await db?.close();
+    await runMigrations(undefined, 'down');
+  });
+
+  test('If there is an event request error, then the event will not be recorded in the events_observer_request table', async () => {
+    const getRawEventCount = async () =>
+      await client<Promise<number>[]>`SELECT count(*) from event_observer_requests`;
+
+    await useWithCleanup(
+      () => {
+        const readStream = fs.createReadStream('src/tests-event-replay/tsv/mainnet.tsv');
+        const rawEventsIterator = getRawEventRequests(readStream);
+        return [rawEventsIterator, () => readStream.close()] as const;
+      },
+      async () => {
+        const eventServer = await startEventServer({
+          datastore: db,
+          chainId: ChainID.Mainnet,
+          serverHost: '127.0.0.1',
+          serverPort: 0,
+          httpLogLevel: 'debug',
+        });
+        return [eventServer, eventServer.closeAsync] as const;
+      },
+      async (rawEventsIterator, eventServer) => {
+        for await (const rawEvents of rawEventsIterator) {
+          for (const rawEvent of rawEvents) {
+            try {
+              if (rawEvent.event_path === '/new_block') {
+                const payloadJson = JSON.parse(rawEvent.payload);
+                payloadJson.transactions = undefined;
+                rawEvent.payload = JSON.stringify(payloadJson);
+              }
+            } catch (error) {}
+            const rawEventRequestCountResultBefore = await getRawEventCount();
+            const rawEventRequestCountBefore = rawEventRequestCountResultBefore[0];
+            const response = await httpPostRequest({
+              host: '127.0.0.1',
+              port: eventServer.serverAddress.port,
+              path: rawEvent.event_path,
+              headers: { 'Content-Type': 'application/json' },
+              body: Buffer.from(rawEvent.payload, 'utf8'),
+              throwOnNotOK: false,
+            });
+            if (rawEvent.event_path === '/new_block') {
+              expect(response.statusCode).toBe(500);
+              const rawEventRequestCountResultAfter = await getRawEventCount();
+              const rawEventRequestCountAfter = rawEventRequestCountResultAfter[0];
+              expect(rawEventRequestCountBefore).toEqual(rawEventRequestCountAfter);
+            }
+          }
+        }
+      }
+    );
+  });
+});
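The test wires up its fixtures with useWithCleanup from ../tests/test-helpers, which pairs each resource factory with a teardown callback and only runs the body once every resource is ready. A simplified two-resource sketch of that pattern (hypothetical implementation, shown only to clarify how the test reads; the project's real helper is variadic):

// Each factory returns [resource, cleanup]; cleanups run in reverse order.
type Init<T> = () =>
  | readonly [T, () => void | Promise<void>]
  | Promise<readonly [T, () => void | Promise<void>]>;

async function useWithCleanupSketch<A, B>(
  initA: Init<A>,
  initB: Init<B>,
  body: (a: A, b: B) => Promise<void>
): Promise<void> {
  const [a, cleanupA] = await initA();
  try {
    const [b, cleanupB] = await initB();
    try {
      await body(a, b);
    } finally {
      // Tear down the second resource before the first.
      await cleanupB();
    }
  } finally {
    await cleanupA();
  }
}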
