Commit da992d7

rafaelcrzone117x and Matthew Little authored
feat: add pruned event import mode that ignores some historical events (#1125)
* chore: move event replay functions to separate file
* feat: add fs-reverse
* feat: start reading block height
* feat: start ignoring prunable events
* refactor: name updates
* feat: integrate pruned mode, create replay helpers file
* fix: unused export
* feat: also try pruning microblock events
* feat: remove fs-reverse and implement again with modern streams
* docs: add mode to readme
* fix: avoid emitting LF bytes in reverse file stream, support CRLF endings, avoid multiple conversions between buffers and strings
* chore: explicit buffer utf8 conversion
* chore: test reading whole file with backpressure
* chore: remove unused waiter import

Co-authored-by: Matthew Little <[email protected]>
1 parent 303eaaa commit da992d7

File tree

7 files changed (+441, −107 lines)


readme.md

Lines changed: 5 additions & 1 deletion

@@ -103,7 +103,7 @@
 and stacks-node working directory, and re-sync from scratch.

 Alternatively, an event-replay feature is available where the API records the HTTP POST requests from the stacks-node event emitter, then streams
-these events back to itself. Essentially simulating a wipe & full re-sync, but much quicker -- typically around 10 minutes.
+these events back to itself. Essentially simulating a wipe & full re-sync, but much quicker.

 The feature can be used via program args. For example, if there are breaking changes in the API's sql schema, like adding a new column which requires
 events to be replayed, the following steps could be run:
@@ -125,6 +125,10 @@
 node ./lib/index.js import-events --file /tmp/stacks-node-events.tsv --wipe-db --force
 ```

+This command has two modes of operation, specified by the `--mode` option:
+* `archival` (default): The process will import and ingest *all* blockchain events that have happened since the first block.
+* `pruned`: The import process will ignore some prunable events (mempool, microblocks) until the import block height has reached `chain tip - 256` blocks. This saves a considerable amount of time during import, but sacrifices some historical data. Use this mode if you're mostly interested in running an API that prioritizes real-time information.
+
 Alternatively, instead of performing the `export-events` command in step 1, an environment variable can be set which enables events to be streamed to a file
 as they are received, while the application is running normally. To enable this feature, set the `STACKS_EXPORT_EVENTS_FILE` env var to the file path where
 events should be appended. Example:

src/event-replay/event-replay.ts

Lines changed: 166 additions & 0 deletions

```ts
import * as path from 'path';
import * as fs from 'fs';
import { cycleMigrations, dangerousDropAllTables, PgDataStore } from '../datastore/postgres-store';
import { startEventServer } from '../event-stream/event-server';
import { getApiConfiguredChainID, httpPostRequest, logger } from '../helpers';
import { findTsvBlockHeight, getDbBlockHeight } from './helpers';

enum EventImportMode {
  /**
   * The Event Server will ingest and process every single Stacks node event contained in the TSV file
   * from block 0 to the latest block. This is the default mode.
   */
  archival = 'archival',
  /**
   * The Event Server will ignore certain "prunable" events (see `PRUNABLE_EVENT_PATHS`) from
   * the imported TSV file if they are received outside of a block window, usually set to
   * TSV's `block_height` - 256.
   * This allows the import to be faster at the expense of historical blockchain information.
   */
  pruned = 'pruned',
}

/**
 * Event paths that will be ignored during `EventImportMode.pruned` if received outside of the
 * pruned block window.
 */
const PRUNABLE_EVENT_PATHS = ['/new_mempool_tx', '/drop_mempool_tx', '/new_microblocks'];

/**
 * Exports all Stacks node events stored in the `event_observer_requests` table to a TSV file.
 * @param filePath - Path to TSV file to write
 * @param overwriteFile - If we should overwrite the file
 */
export async function exportEventsAsTsv(
  filePath?: string,
  overwriteFile: boolean = false
): Promise<void> {
  if (!filePath) {
    throw new Error(`A file path should be specified with the --file option`);
  }
  const resolvedFilePath = path.resolve(filePath);
  if (fs.existsSync(resolvedFilePath) && overwriteFile !== true) {
    throw new Error(
      `A file already exists at ${resolvedFilePath}. Add --overwrite-file to truncate an existing file`
    );
  }
  console.log(`Export event data to file: ${resolvedFilePath}`);
  const writeStream = fs.createWriteStream(resolvedFilePath);
  console.log(`Export started...`);
  await PgDataStore.exportRawEventRequests(writeStream);
  console.log('Export successful.');
}

/**
 * Imports Stacks node events from a TSV file and ingests them through the Event Server.
 * @param filePath - Path to TSV file to read
 * @param importMode - Event import mode
 * @param wipeDb - If we should wipe the DB before importing
 * @param force - If we should force drop all tables
 */
export async function importEventsFromTsv(
  filePath?: string,
  importMode?: string,
  wipeDb: boolean = false,
  force: boolean = false
): Promise<void> {
  if (!filePath) {
    throw new Error(`A file path should be specified with the --file option`);
  }
  const resolvedFilePath = path.resolve(filePath);
  if (!fs.existsSync(resolvedFilePath)) {
    throw new Error(`File does not exist: ${resolvedFilePath}`);
  }
  let eventImportMode: EventImportMode;
  switch (importMode) {
    case 'pruned':
      eventImportMode = EventImportMode.pruned;
      break;
    case 'archival':
    case undefined:
      eventImportMode = EventImportMode.archival;
      break;
    default:
      throw new Error(`Invalid event import mode: ${importMode}`);
  }
  const hasData = await PgDataStore.containsAnyRawEventRequests();
  if (!wipeDb && hasData) {
    throw new Error(`Database contains existing data. Add --wipe-db to drop the existing tables.`);
  }
  if (force) {
    await dangerousDropAllTables({ acknowledgePotentialCatastrophicConsequences: 'yes' });
  }

  // This performs a "migration down" which drops the tables, then re-creates them.
  // If there's a breaking change in the migration files, this will throw, and the pg database
  // needs to be wiped manually, or the `--force` option can be used.
  await cycleMigrations({ dangerousAllowDataLoss: true });

  // Look for the TSV's block height and determine the prunable block window.
  const tsvBlockHeight = await findTsvBlockHeight(resolvedFilePath);
  const blockWindowSize = parseInt(
    process.env['STACKS_MEMPOOL_TX_GARBAGE_COLLECTION_THRESHOLD'] ?? '256'
  );
  const prunedBlockHeight = Math.max(tsvBlockHeight - blockWindowSize, 0);
  console.log(`Event file's block height: ${tsvBlockHeight}`);
  console.log(`Starting event import and playback in ${eventImportMode} mode`);
  if (eventImportMode === EventImportMode.pruned) {
    console.log(`Ignoring all prunable events before block height: ${prunedBlockHeight}`);
  }

  const db = await PgDataStore.connect({
    usageName: 'import-events',
    skipMigrations: true,
    withNotifier: false,
    eventReplay: true,
  });
  const eventServer = await startEventServer({
    datastore: db,
    chainId: getApiConfiguredChainID(),
    serverHost: '127.0.0.1',
    serverPort: 0,
    httpLogLevel: 'debug',
  });

  const readStream = fs.createReadStream(resolvedFilePath);
  const rawEventsIterator = PgDataStore.getRawEventRequests(readStream, status => {
    console.log(status);
  });
  // Set logger to only output for warnings/errors, otherwise the event replay will result
  // in the equivalent of months/years of API log output.
  logger.level = 'warn';
  // Disable this feature so a redundant export file isn't created while importing from an existing one.
  delete process.env['STACKS_EXPORT_EVENTS_FILE'];
  // The current import block height. Will be updated with every `/new_block` event.
  let blockHeight = 0;
  let isPruneFinished = false;
  for await (const rawEvents of rawEventsIterator) {
    for (const rawEvent of rawEvents) {
      if (eventImportMode === EventImportMode.pruned) {
        if (PRUNABLE_EVENT_PATHS.includes(rawEvent.event_path) && blockHeight < prunedBlockHeight) {
          // Prunable events are ignored here.
          continue;
        }
        if (blockHeight === prunedBlockHeight && !isPruneFinished) {
          isPruneFinished = true;
          console.log(`Resuming prunable event import...`);
        }
      }
      await httpPostRequest({
        host: '127.0.0.1',
        port: eventServer.serverAddress.port,
        path: rawEvent.event_path,
        headers: { 'Content-Type': 'application/json' },
        body: Buffer.from(rawEvent.payload, 'utf8'),
        throwOnNotOK: true,
      });
      if (rawEvent.event_path === '/new_block') {
        blockHeight = await getDbBlockHeight(db);
      }
    }
  }
  await db.finishEventReplay();
  console.log(`Event import and playback successful.`);
  await eventServer.closeAsync();
  await db.close();
}
```

src/event-replay/helpers.ts

Lines changed: 47 additions & 0 deletions

```ts
import { PgDataStore } from '../datastore/postgres-store';
import { ReverseFileStream } from './reverse-file-stream';

/**
 * Traverse a TSV file in reverse to find the last received `/new_block` node message and return
 * the `block_height` reported by that event. Even though the block produced by that event might
 * end up being re-org'd, it gives us a reasonable idea as to what the Stacks node thought
 * the block height was the moment it was sent.
 * @param filePath - TSV path
 * @returns `number` found block height, 0 if not found
 */
export async function findTsvBlockHeight(filePath: string): Promise<number> {
  let blockHeight = 0;
  const reverseStream = new ReverseFileStream(filePath);
  for await (const data of reverseStream) {
    const columns = data.toString().split('\t');
    const eventName = columns[2];
    if (eventName === '/new_block') {
      const payload = columns[3];
      blockHeight = JSON.parse(payload).block_height;
      break;
    }
  }

  reverseStream.destroy();
  return blockHeight;
}

/**
 * Get the current block height from the DB. We won't use the `getChainTip` method since that
 * adds some conversions from block hashes into strings that we're not interested in. We also can't
 * use the `chain_tip` materialized view since it is unavailable during replay, so we'll use the
 * `block_height DESC` index.
 * @param db - Data store
 * @returns Block height
 */
export async function getDbBlockHeight(db: PgDataStore): Promise<number> {
  const result = await db.query(async client => {
    return await client.query<{ block_height: number }>(
      `SELECT MAX(block_height) as block_height FROM blocks WHERE canonical = TRUE`
    );
  });
  if (result.rowCount === 0) {
    return 0;
  }
  return result.rows[0].block_height;
}
```
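The per-line column parsing inside `findTsvBlockHeight` can be shown on its own. The `blockHeightFromTsvLine` name is illustrative; the assumed TSV layout (event path in column 2, JSON payload in column 3, tab-separated) comes from the code above.

```typescript
// Extract the block height from a single TSV line, or undefined when the
// line is not a `/new_block` event. Columns: [id, timestamp, event_path, payload].
function blockHeightFromTsvLine(line: string): number | undefined {
  const columns = line.split('\t');
  if (columns[2] !== '/new_block') {
    return undefined;
  }
  return JSON.parse(columns[3]).block_height;
}
```

Because the file is walked in reverse, the first matching line is the newest `/new_block` event, so the search can stop immediately.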
src/event-replay/reverse-file-stream.ts

Lines changed: 54 additions & 0 deletions

```ts
import * as stream from 'stream';
import * as fs from 'fs';

/**
 * Streams lines from a text file in reverse, starting from the end of the file.
 * Modernized version of https://www.npmjs.com/package/fs-reverse
 */
export class ReverseFileStream extends stream.Readable {
  private fileDescriptor: number;
  private position: number;

  private lineBuffer: string[] = [];
  private remainder: string = '';

  public readonly fileLength: number;
  public bytesRead: number = 0;

  constructor(filePath: string, opts?: stream.ReadableOptions) {
    // `objectMode` avoids the `Buffer->utf8->Buffer->utf8` conversions when pushing strings
    super({ ...{ objectMode: true, autoDestroy: true }, ...opts });
    this.fileLength = fs.statSync(filePath).size;
    this.position = this.fileLength;
    this.fileDescriptor = fs.openSync(filePath, 'r', 0o666);
  }

  _read(size: number): void {
    while (this.lineBuffer.length === 0 && this.position > 0) {
      // Read `size` bytes from the end of the file.
      const length = Math.min(size, this.position);
      const buffer = Buffer.alloc(length);
      this.position = this.position - length;
      this.bytesRead += fs.readSync(this.fileDescriptor, buffer, 0, length, this.position);

      // Split into lines to fill the `lineBuffer`
      this.remainder = buffer.toString('utf8') + this.remainder;
      this.lineBuffer = this.remainder.split(/\r?\n/);
      // Ignore empty/trailing lines, `readable.push('')` is not recommended
      this.lineBuffer = this.lineBuffer.filter(line => line.length > 0);
      // The earliest line in the chunk may be partial; keep it as the remainder.
      this.remainder = this.lineBuffer.shift() ?? '';
    }
    if (this.lineBuffer.length) {
      this.push(this.lineBuffer.pop());
    } else if (this.remainder.length) {
      this.push(this.remainder);
      this.remainder = '';
    } else {
      this.push(null);
    }
  }

  _destroy(error: Error | null, callback: (error?: Error | null) => void): void {
    fs.closeSync(this.fileDescriptor);
    // `_destroy` implementations must invoke the callback to complete teardown.
    callback(error);
  }
}
```
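For intuition, the streaming class above is equivalent, for files that fit in memory, to the synchronous sketch below; the real class exists to avoid loading a multi-gigabyte TSV at once. The function name is illustrative and not part of the API.

```typescript
import * as fs from 'fs';

// Simplified, whole-file equivalent of ReverseFileStream: returns non-empty
// lines last-to-first, handling both LF and CRLF line endings.
function readLinesReversedSync(filePath: string): string[] {
  return fs
    .readFileSync(filePath, 'utf8')
    .split(/\r?\n/)
    .filter(line => line.length > 0)
    .reverse();
}
```

The streaming version instead reads fixed-size chunks backwards from the end of the file, keeping only a partial-line remainder between chunks, so memory usage stays bounded regardless of file size.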

src/helpers.ts

Lines changed: 32 additions & 0 deletions

@@ -17,6 +17,7 @@
   SyslogConfigSetLevels,
 } from 'winston/lib/winston/config';
 import { DbStxEvent, DbTx } from './datastore/common';
+import { StacksCoreRpcClient } from './core-rpc/client';

 export const isDevEnv = process.env.NODE_ENV === 'development';
 export const isTestEnv = process.env.NODE_ENV === 'test';
@@ -960,3 +961,34 @@ export function isSmartContractTx
   }
   return false;
 }
+
+/**
+ * Gets the chain id as reported by the Stacks node.
+ * @returns `ChainID` Chain id
+ */
+export async function getStacksNodeChainID(): Promise<ChainID> {
+  const client = new StacksCoreRpcClient();
+  await client.waitForConnection(Infinity);
+  const coreInfo = await client.getInfo();
+  if (coreInfo.network_id === ChainID.Mainnet) {
+    return ChainID.Mainnet;
+  } else if (coreInfo.network_id === ChainID.Testnet) {
+    return ChainID.Testnet;
+  } else {
+    throw new Error(`Unexpected network_id "${coreInfo.network_id}"`);
+  }
+}
+
+/**
+ * Gets the chain id as configured by the `STACKS_CHAIN_ID` API env variable.
+ * @returns `ChainID` Chain id
+ */
+export function getApiConfiguredChainID() {
+  if (!('STACKS_CHAIN_ID' in process.env)) {
+    const error = new Error(`Env var STACKS_CHAIN_ID is not set`);
+    logError(error.message, error);
+    throw error;
+  }
+  const configuredChainID: ChainID = parseInt(process.env['STACKS_CHAIN_ID'] as string);
+  return configuredChainID;
+}
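`getApiConfiguredChainID` leans on a `parseInt` detail: with no radix argument, a `0x`-prefixed string is parsed as hex while plain digits parse as decimal, so `STACKS_CHAIN_ID` can be set either way (Stacks conventionally uses hex values such as `0x00000001` for mainnet). A standalone sketch of that parsing, with an illustrative function name:

```typescript
// Parse a chain id from an env-style string. parseInt with no radix
// detects a `0x` prefix and parses hex; bare digits parse as decimal.
function parseChainID(value: string | undefined): number {
  if (value === undefined) {
    throw new Error('Env var STACKS_CHAIN_ID is not set');
  }
  return parseInt(value);
}
```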
