Skip to content
This repository was archived by the owner on Aug 12, 2023. It is now read-only.

Commit dfe9b72

Browse files
authored
Restructure event extractor for extensibility (#369)
* Restructure event extractor for extensibility * Tweak a few small things
1 parent e0c397c commit dfe9b72

File tree

10 files changed

+105
-64
lines changed

10 files changed

+105
-64
lines changed
Lines changed: 60 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
const extractorV1 = require('@0x-event-extractor/extractor-v1');
2-
const extractorV2 = require('@0x-event-extractor/extractor-v2');
3-
const extractorV3 = require('@0x-event-extractor/extractor-v3');
1+
const fillExtractorV1 = require('@0x-event-extractor/extractor-v1');
2+
const fillExtractorV2 = require('@0x-event-extractor/extractor-v2');
3+
const fillExtractorV3 = require('@0x-event-extractor/extractor-v3');
44

55
const { getLogger } = require('../util/logging');
66
const BlockRange = require('../model/block-range');
@@ -9,19 +9,19 @@ const getCurrentBlock = require('../ethereum/get-current-block');
99
const getNextBlockRange = require('../events/get-next-block-range');
1010
const withTransaction = require('../util/with-transaction');
1111

12-
const extractEventsForProtocol = async (protocolVersion, extractorConfig) => {
13-
// Scope all logging for the job to the specified protocol version
14-
const logger = getLogger(`extract events v${protocolVersion}`);
15-
16-
// Determine which blocks we should fetch log entries from
17-
const { currentBlock, fetchLogEntries, getEventData } = extractorConfig;
12+
const performExtraction = async (currentBlock, extractorConfig) => {
13+
const {
14+
eventType, // TODO: Query by this once we have data in blockranges collection
15+
fetchLogEntries,
16+
getEventData,
17+
protocolVersion,
18+
} = extractorConfig;
1819

19-
logger.info(`current block is ${currentBlock}`);
20+
// Scope all logging for the job to the specified protocol version and event type
21+
const logger = getLogger(`extract v${protocolVersion} ${eventType} events`);
2022

21-
const nextBlockRange = await getNextBlockRange({
22-
currentBlock,
23-
protocolVersion,
24-
});
23+
const rangeConfig = { currentBlock, protocolVersion }; // TODO: Also include event type
24+
const nextBlockRange = await getNextBlockRange(rangeConfig);
2525

2626
if (nextBlockRange === null) {
2727
logger.info('no more blocks to process');
@@ -30,53 +30,60 @@ const extractEventsForProtocol = async (protocolVersion, extractorConfig) => {
3030

3131
const { fromBlock, toBlock } = nextBlockRange;
3232

33-
logger.info(`fetching events from block range ${fromBlock} to ${toBlock}`);
33+
logger.info(`fetching events from blocks ${fromBlock}-${toBlock}`);
3434

3535
const logEntries = await fetchLogEntries(fromBlock, toBlock);
36+
const entryCount = logEntries.length;
3637

37-
if (logEntries.length === 0) {
38-
logger.info(`no events found in block range ${fromBlock} to ${toBlock}`);
38+
if (entryCount === 0) {
39+
logger.info(`no events found in blocks ${fromBlock}-${toBlock}`);
3940
} else {
40-
logger.info(
41-
`${logEntries.length} events found in block range ${fromBlock} to ${toBlock}`,
42-
);
41+
logger.info(`${entryCount} events found in blocks ${fromBlock}-${toBlock}`);
4342
}
4443

45-
// Persistence operations are wrapped in a transaction to ensure consistency
44+
/**
45+
* Persistence operations are wrapped in a transaction to ensure consistency
46+
* between the BlockRange and Event collections. If a document exists within
47+
* the BlockRange collection, then we can assume that all the associated
48+
* events will exist in the Event collection.
49+
*/
4650
await withTransaction(async session => {
4751
if (logEntries.length > 0) {
48-
// Map the log entries to a common model before persisting to MongoDB
4952
const events = logEntries.map(logEntry => ({
5053
blockNumber: logEntry.blockNumber,
5154
data: getEventData(logEntry),
55+
dateIngested: new Date(),
5256
logIndex: logEntry.logIndex,
5357
protocolVersion,
5458
transactionHash: logEntry.transactionHash,
55-
type: logEntry.event,
59+
type: eventType,
5660
}));
5761

5862
await Event.insertMany(events, { session });
5963
}
6064

61-
// Log details of the queried block range so that we know where
62-
// to start from in the next iteration.
63-
await BlockRange.findOneAndUpdate(
64-
{ fromBlock, protocolVersion, toBlock },
65-
{
66-
$set: {
67-
date: new Date(),
65+
/**
66+
* Log details of the queried block range in MongoDB. This provides
67+
* two functions:
68+
*
69+
* 1. History of scraping activity for debugging
70+
* 2. An indicator for where to scrape from on the next iteration
71+
*
72+
* If collection size became an issue then the BlockRange collection
73+
* could be safely capped at say 100,000 documents.
74+
*/
75+
await BlockRange.create(
76+
[
77+
{
78+
dateProcessed: new Date(),
6879
events: logEntries.length,
80+
eventType,
6981
fromBlock,
7082
protocolVersion,
7183
toBlock,
7284
},
73-
},
74-
{
75-
upsert: true,
76-
new: true,
77-
runValidators: true,
78-
session,
79-
},
85+
],
86+
{ session },
8087
);
8188
});
8289

@@ -86,13 +93,24 @@ const extractEventsForProtocol = async (protocolVersion, extractorConfig) => {
8693
};
8794

8895
const extractEvents = async () => {
96+
const logger = getLogger('event extractor');
97+
98+
logger.info('beginning event extraction');
99+
logger.info('fetching current block');
100+
89101
const currentBlock = await getCurrentBlock();
90102

91-
// Fetching of events is delegated to version specific extractors which
92-
// interact with the correct 0x SDK version and contract.
93-
await extractEventsForProtocol(1, { currentBlock, ...extractorV1 });
94-
await extractEventsForProtocol(2, { currentBlock, ...extractorV2 });
95-
await extractEventsForProtocol(3, { currentBlock, ...extractorV3 });
103+
logger.info(`current block is ${currentBlock}`);
104+
105+
/**
106+
* Extractors are run sequentially to help avoid issues with rate
107+
* limiting in the Ethereum RPC provider.
108+
*/
109+
await performExtraction(currentBlock, fillExtractorV1);
110+
await performExtraction(currentBlock, fillExtractorV2);
111+
await performExtraction(currentBlock, fillExtractorV3);
112+
113+
logger.info('finished event extraction');
96114
};
97115

98116
module.exports = extractEvents;

packages/core/src/model/block-range.js

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,21 +3,23 @@ const mongoose = require('mongoose');
33
const { Schema } = mongoose;
44

55
const schema = Schema({
6-
dateProcessed: Date,
7-
events: Number,
8-
fromBlock: Number,
9-
protocolVersion: { default: 1, type: Number },
10-
toBlock: Number,
6+
dateProcessed: { required: true, type: Date },
7+
events: { required: true, type: Number },
8+
eventType: { required: true, type: String },
9+
fromBlock: { required: true, type: Number },
10+
protocolVersion: { default: 1, required: true, type: Number },
11+
toBlock: { required: true, type: Number },
1112
});
1213

1314
// Used for determining last processed block
1415
schema.index({ protocolVersion: 1, toBlock: -1 });
1516

17+
// Used to enforce consistency in the data
1618
schema.index(
1719
{ fromBlock: 1, toBlock: 1, protocolVersion: 1 },
1820
{ unique: true },
1921
);
2022

21-
const Model = mongoose.model('BlockRange', schema);
23+
const BlockRange = mongoose.model('BlockRange', schema);
2224

23-
module.exports = Model;
25+
module.exports = BlockRange;

packages/core/src/model/event.js

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,18 @@ const mongoose = require('mongoose');
33
const { Schema } = mongoose;
44

55
const schema = Schema({
6-
blockNumber: Number,
7-
data: { type: Schema.Types.Mixed },
8-
fillCreated: { type: Boolean, default: false },
9-
logIndex: Number,
10-
protocolVersion: { default: 1, type: Number },
11-
transactionHash: String,
12-
type: String,
6+
blockNumber: { required: true, type: Number },
7+
data: { required: true, type: Schema.Types.Mixed },
8+
dateIngested: { required: true, type: Date },
9+
logIndex: { required: true, type: Number },
10+
protocolVersion: { default: 1, required: true, type: Number },
11+
transactionHash: { required: true, type: String },
12+
type: { required: true, type: String },
1313
});
1414

15+
// Used to enforce consistency in the data
1516
schema.index({ logIndex: 1, transactionHash: 1 }, { unique: true });
16-
schema.index({ fillCreated: 1 });
1717

18-
const Model = mongoose.model('Event', schema);
18+
const Event = mongoose.model('Event', schema);
1919

20-
module.exports = Model;
20+
module.exports = Event;

packages/core/src/model/index.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@ const BlockRange = require('./block-range');
22
const Event = require('./event');
33

44
const init = async () => {
5-
BlockRange.createCollection();
6-
Event.createCollection();
5+
await BlockRange.createCollection();
6+
await Event.createCollection();
77
};
88

99
module.exports = { BlockRange, Event, init };

packages/extractor-v1/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
{
22
"name": "@0x-event-extractor/extractor-v1",
3+
"description": "Extractor for v1 LogFill events",
34
"version": "1.0.0",
45
"main": "src/index.js",
56
"dependencies": {

packages/extractor-v1/src/index.js

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,10 @@ const configure = options => {
66
zeroEx.configure(options);
77
};
88

9-
module.exports = { configure, fetchLogEntries, getEventData };
9+
module.exports = {
10+
configure,
11+
eventType: 'LogFill',
12+
fetchLogEntries,
13+
getEventData,
14+
protocolVersion: 1,
15+
};

packages/extractor-v2/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
{
22
"name": "@0x-event-extractor/extractor-v2",
3+
"description": "Extractor for v2 Fill events",
34
"version": "1.0.0",
45
"main": "src/index.js",
56
"dependencies": {

packages/extractor-v2/src/index.js

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,10 @@ const configure = options => {
66
zeroEx.configure(options);
77
};
88

9-
module.exports = { configure, fetchLogEntries, getEventData };
9+
module.exports = {
10+
configure,
11+
eventType: 'Fill',
12+
fetchLogEntries,
13+
getEventData,
14+
protocolVersion: 2,
15+
};

packages/extractor-v3/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
{
22
"name": "@0x-event-extractor/extractor-v3",
3+
"description": "Extractor for v3 Fill events",
34
"version": "1.0.0",
45
"main": "src/index.js",
56
"dependencies": {

packages/extractor-v3/src/index.js

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,10 @@ const configure = options => {
66
zeroEx.configure(options);
77
};
88

9-
module.exports = { configure, fetchLogEntries, getEventData };
9+
module.exports = {
10+
configure,
11+
eventType: 'Fill',
12+
fetchLogEntries,
13+
getEventData,
14+
protocolVersion: 3,
15+
};

0 commit comments

Comments
 (0)