2 changes: 2 additions & 0 deletions .vscode/launch.json
@@ -74,6 +74,8 @@
      ],
      "outputCapture": "std",
      "console": "internalConsole",
      "runtimeVersion": "22",
      "nodeVersionHint": 22,
    },
    {
      "name": "Chunk parser prototyping",
13 changes: 12 additions & 1 deletion chunk-parser/src/recover-slot-pubkey.ts
@@ -1,7 +1,18 @@
import crypto from 'node:crypto';
import * as secp from '@noble/secp256k1';
// import * as secp from '@noble/secp256k1';
import { ModifiedSlot, NewNakamotoBlockMessage, toU32BeBytes } from './common';

let secp: typeof import('@noble/secp256k1');

async function getSecp() {
  if (!secp) secp = await import('@noble/secp256k1');
  return secp;
}
getSecp().catch(error => {
  console.error(`Failed to load secp256k1: ${error}`, error);
  throw error;
});

/** Get the digest to sign that authenticates this chunk data and metadata */
function authDigest(slot: ModifiedSlot): Buffer {
  const hasher = crypto.createHash('sha512-256');
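With the static import commented out, nothing in recover-slot-pubkey.ts should touch secp256k1 before `getSecp()` resolves. A minimal consumer sketch living alongside `getSecp()` and `authDigest()` in this file, assuming the @noble/secp256k1 v2 recovery API (`Signature.fromCompact`, `addRecoveryBit`, `recoverPublicKey`); the `recoverSlotPubkey` helper and its signature/recovery-id parameters are hypothetical, not part of this diff:

```typescript
// Hypothetical helper in the same file: always await the lazy loader first.
async function recoverSlotPubkey(slot: ModifiedSlot, sigHex: string, recoveryId: number) {
  const secp = await getSecp(); // resolves immediately after the first load
  const digest = authDigest(slot); // sha512-256 digest built by this module
  const sig = secp.Signature.fromCompact(sigHex).addRecoveryBit(recoveryId);
  return sig.recoverPublicKey(digest).toHex(); // compressed public key hex
}
```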
8 changes: 4 additions & 4 deletions package-lock.json


2 changes: 1 addition & 1 deletion package.json
@@ -57,7 +57,7 @@
"@fastify/swagger": "^8.15.0",
"@fastify/type-provider-typebox": "^4.1.0",
"@hirosystems/api-toolkit": "^1.9.0",
"@hirosystems/salt-n-pepper-client": "^1.0.4-beta.1",
"@hirosystems/salt-n-pepper-client": "^1.1.1",
"@noble/secp256k1": "^2.2.3",
"@sinclair/typebox": "^0.28.17",
"@stacks/transactions": "^7.0.6",
33 changes: 11 additions & 22 deletions src/event-stream/event-stream.ts
@@ -6,48 +6,36 @@ import {
  CoreNodeNakamotoBlockMessage,
  StackerDbChunk,
} from './core-node-message';
import { logger as defaultLogger, stopwatch } from '@hirosystems/api-toolkit';
import { logger as defaultLogger, stopwatch, WorkerThreadManager } from '@hirosystems/api-toolkit';
import { ENV } from '../env';
import {
  ParsedNakamotoBlock,
  ParsedStackerDbChunk,
  parseNakamotoBlockMsg,
  parseStackerDbChunk,
} from './msg-parsing';
import { ParsedNakamotoBlock, ParsedStackerDbChunk } from './msg-parsing';
import { SignerMessagesEventPayload } from '../pg/types';
import { ThreadedParser } from './threaded-parser';
import { SERVER_VERSION } from '@hirosystems/api-toolkit';
import { EventEmitter } from 'node:events';

// TODO: move this into the @hirosystems/salt-n-pepper-client lib
function sanitizeRedisClientName(value: string): string {
  const nameSanitizer = /[^!-~]+/g;
  return value.trim().replace(nameSanitizer, '-');
}
import * as msgParserWorkerBlocks from './msg-parser-worker-blocks';
import * as msgParserWorkerStackerDb from './msg-parser-worker-stackerdb';

export class EventStreamHandler {
  db: PgStore;
  logger = defaultLogger.child({ name: 'EventStreamHandler' });
  eventStream: StacksEventStream;
  threadedParser: ThreadedParser;
  threadedParserBlocks = new WorkerThreadManager(msgParserWorkerBlocks);
  threadedParserStackerDb = new WorkerThreadManager(msgParserWorkerStackerDb);

  readonly events = new EventEmitter<{
    processedMessage: [{ msgId: string }];
  }>();

  constructor(opts: { db: PgStore; lastMessageId: string }) {
    this.db = opts.db;
    const appName = sanitizeRedisClientName(
      `signer-metrics-api ${SERVER_VERSION.tag} (${SERVER_VERSION.branch}:${SERVER_VERSION.commit})`
    );
    const appName = `signer-metrics-api ${SERVER_VERSION.tag} (${SERVER_VERSION.branch}:${SERVER_VERSION.commit})`;
    this.eventStream = new StacksEventStream({
      redisUrl: ENV.REDIS_URL,
      redisStreamPrefix: ENV.REDIS_STREAM_KEY_PREFIX,
      eventStreamType: StacksEventStreamType.all,
      lastMessageId: opts.lastMessageId,
      appName,
    });
    this.threadedParser = new ThreadedParser();
  }

  async start() {
@@ -71,7 +59,7 @@ export class EventStreamHandler {
          );
        }
        if ('signer_signature_hash' in blockMsg) {
          const parsed = await this.threadedParser.parseNakamotoBlock(nakamotoBlockMsg);
          const parsed = await this.threadedParserBlocks.exec(nakamotoBlockMsg);
          await this.handleNakamotoBlockMsg(messageId, parseInt(timestamp), parsed);
        } else {
          // ignore pre-Nakamoto blocks
@@ -81,7 +69,7 @@

      case '/stackerdb_chunks': {
        const msg = body as StackerDbChunk;
        const parsed = await this.threadedParser.parseStackerDbChunk(msg);
        const parsed = await this.threadedParserStackerDb.exec(msg);
        await this.handleStackerDbMsg(messageId, parseInt(timestamp), parsed);
        break;
      }
@@ -109,7 +97,8 @@

  async stop(): Promise<void> {
    await this.eventStream.stop();
    await this.threadedParser.close();
    await this.threadedParserBlocks.close();
    await this.threadedParserStackerDb.close();
  }

  async handleStackerDbMsg(
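For context on the deleted `sanitizeRedisClientName` helper: Redis client names may not contain spaces or other non-printable characters, which is exactly what the `[^!-~]+` pattern collapsed. Dropping the local copy relies on `@hirosystems/salt-n-pepper-client` ≥1.1.1 sanitizing the name itself, an assumption based on the TODO note in the removed code and the version bump in package.json. A sketch of the removed behavior, with an illustrative input:

```typescript
// Same shape as the removed helper: '!'..'~' is the printable, non-space ASCII
// range, so any run of characters outside it (spaces, control chars) becomes '-'.
function sanitizeRedisClientName(value: string): string {
  return value.trim().replace(/[^!-~]+/g, '-');
}

sanitizeRedisClientName('signer-metrics-api v1.2.3 (main:abc1234)');
// => 'signer-metrics-api-v1.2.3-(main:abc1234)'
```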
8 changes: 8 additions & 0 deletions src/event-stream/msg-parser-worker-blocks.ts
@@ -0,0 +1,8 @@
import type { CoreNodeNakamotoBlockMessage } from './core-node-message';
import { ParsedNakamotoBlock, parseNakamotoBlockMsg } from './msg-parsing';

export function processTask(msg: CoreNodeNakamotoBlockMessage): ParsedNakamotoBlock {
  return parseNakamotoBlockMsg(msg);
}

export const workerModule = module;
7 changes: 7 additions & 0 deletions src/event-stream/msg-parser-worker-stackerdb.ts
@@ -0,0 +1,7 @@
import type { StackerDbChunk } from './core-node-message';
import { parseStackerDbChunk } from './msg-parsing';

export function processTask(msg: StackerDbChunk) {
  return parseStackerDbChunk(msg);
}
export const workerModule = module;
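Taken together, the worker side and the main-thread side form a small contract: a worker module exports `processTask` plus `workerModule = module`, and the manager is constructed with that module, fed tasks through `exec()`, and shut down with `close()`. A sketch of the round trip; the `WorkerThreadManager` API surface here is assumed only from the calls visible in event-stream.ts above, not from the toolkit's docs:

```typescript
import { WorkerThreadManager } from '@hirosystems/api-toolkit';
import * as stackerDbWorker from './msg-parser-worker-stackerdb';
import type { StackerDbChunk } from './core-node-message';

// One pool per worker module; processTask() runs off the main event loop.
const stackerDbParser = new WorkerThreadManager(stackerDbWorker);

async function parseChunkOffThread(msg: StackerDbChunk) {
  // exec() posts the message to a worker thread and resolves with processTask's result.
  return stackerDbParser.exec(msg);
}

// On shutdown, mirror EventStreamHandler.stop() and join the worker threads.
async function shutdown() {
  await stackerDbParser.close();
}
```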
92 changes: 0 additions & 92 deletions src/event-stream/threaded-parser-worker.ts

This file was deleted.

82 changes: 0 additions & 82 deletions src/event-stream/threaded-parser.ts

This file was deleted.

27 changes: 12 additions & 15 deletions src/pg/pg-store.ts
@@ -10,7 +10,6 @@ import {
import * as path from 'path';
import { PgWriteStore } from './ingestion/pg-write-store';
import { BlockIdParam, normalizeHexString, sleep } from '../helpers';
import { Fragment } from 'postgres';
import { DbBlockProposalQueryResponse } from './types';
import { NotificationPgStore } from './notifications/pg-notifications';

@@ -491,20 +490,18 @@ export class PgStore extends BasePgStore {
  }

  async getSignerDataForBlock({ sql, blockId }: { sql: PgSqlClient; blockId: BlockIdParam }) {
    let blockFilter: Fragment;
Review comment (PR author): Workaround for a type error caused by the api-toolkit depending on an older version of the postgres library; a short sketch of why the inferred type avoids importing `Fragment` follows this file's diff.
TODO: update the postgres library in the toolkit repo.

    switch (blockId.type) {
      case 'height':
        blockFilter = sql`block_height = ${blockId.height}`;
        break;
      case 'hash':
        blockFilter = sql`block_hash = ${normalizeHexString(blockId.hash)}`;
        break;
      case 'latest':
        blockFilter = sql`block_height = (SELECT block_height FROM chain_tip)`;
        break;
      default:
        throw new Error(`Invalid blockId type: ${blockId}`);
    }
    const blockFilter = (() => {
      switch (blockId.type) {
        case 'height':
          return sql`block_height = ${blockId.height}`;
        case 'hash':
          return sql`block_hash = ${normalizeHexString(blockId.hash)}`;
        case 'latest':
          return sql`block_height = (SELECT block_height FROM chain_tip)`;
        default:
          throw new Error(`Invalid blockId type: ${blockId}`);
      }
    })();

    const result = await sql<
      {
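The sketch referenced in the review comment above: the IIFE works around the type error because the filter's type is now inferred from the `sql` tagged template itself, so the code never names a type that presumably differs between the app's postgres version and the older one pinned by api-toolkit. This is standalone illustration code with a hypothetical connection and table names, not the repo's implementation:

```typescript
import postgres from 'postgres';

const sql = postgres(); // hypothetical connection, configured via environment variables

// Each branch returns a sql`` fragment; TypeScript infers the fragment type from
// this sql instance, so `Fragment` never has to be imported or written out.
function blockFilter(blockId: { height?: number; hash?: string }) {
  return (() => {
    if (blockId.height !== undefined) return sql`block_height = ${blockId.height}`;
    if (blockId.hash !== undefined) return sql`block_hash = ${blockId.hash}`;
    return sql`block_height = (SELECT block_height FROM chain_tip)`;
  })();
}

// The inferred fragment interpolates into a larger query exactly as before.
async function getBlockRows(height: number) {
  return sql`SELECT * FROM blocks WHERE ${blockFilter({ height })}`;
}
```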