Skip to content

feat: add chainhooks index notifier #2321

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 7 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .env
Original file line number Diff line number Diff line change
Expand Up @@ -216,3 +216,9 @@ STACKS_EVENTS_DIR=./events
# SNP_REDIS_URL=redis://127.0.0.1:6379
# Only specify `SNP_REDIS_STREAM_KEY_PREFIX` if `REDIS_STREAM_KEY_PREFIX` is configured on the SNP server.
# SNP_REDIS_STREAM_KEY_PREFIX=

# If enabled this service will notify Chainhooks via Redis whenever the Stacks index advances e.g.
# whenever a new block is confirmed. This is used by Chainhooks to send block data to its subscribers.
# CHAINHOOKS_NOTIFIER_ENABLED=true
# CHAINHOOKS_REDIS_URL=redis://127.0.0.1:6379
# CHAINHOOKS_REDIS_QUEUE=index-progress
79 changes: 79 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@
"fastify": "4.29.1",
"fastify-metrics": "11.0.0",
"getopts": "2.3.0",
"ioredis": "5.6.1",
"jsonc-parser": "3.0.0",
"jsonrpc-lite": "2.2.0",
"lru-cache": "6.0.0",
Expand Down
61 changes: 61 additions & 0 deletions src/datastore/chainhooks-notifier.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import Redis from 'ioredis';
import { ReOrgUpdatedEntities } from './common';
import { ChainID } from '@stacks/transactions';
import { getApiConfiguredChainID } from '../helpers';
import { logger } from '@hirosystems/api-toolkit';

/**
* Notifies Chainhooks of the progress of the Stacks index via a message sent to a Redis queue. This
* message will contain a block header for each new canonical block as well as headers for those
* that need to be rolled back from a re-org.
*/
export class ChainhooksNotifier {
private readonly redis: Redis;
private readonly chainId: ChainID;
private readonly queue: string;

constructor() {
const url = process.env.CHAINHOOKS_REDIS_URL;
if (!url) throw new Error(`ChainhooksNotifier is enabled but CHAINHOOKS_REDIS_URL is not set`);
this.queue = process.env.CHAINHOOKS_REDIS_QUEUE ?? 'chainhooks:stacks:index-progress';
this.redis = new Redis(url);
this.chainId = getApiConfiguredChainID();
logger.info(`ChainhooksNotifier initialized for queue ${this.queue} on ${url}`);
}

/**
* Broadcast index progress message to Chainhooks Redis queue.
* @param reOrg - The re-org updated entities, if any
* @param indexBlockHash - Block hash of the newest canonical block
* @param blockHeight - Block height of the newest canonical block
*/
async notify(reOrg: ReOrgUpdatedEntities, indexBlockHash: string, blockHeight: number) {
const message = {
id: `stacks-${blockHeight}-${indexBlockHash}-${Date.now()}`,
payload: {
chain: 'stacks',
network: this.chainId === ChainID.Mainnet ? 'mainnet' : 'testnet',
apply_blocks: [
...reOrg.markedCanonical.blockHeaders.map(block => ({
hash: block.index_block_hash,
index: block.block_height,
})),
{
hash: indexBlockHash,
index: blockHeight,
},
],
rollback_blocks: reOrg.markedNonCanonical.blockHeaders.map(block => ({
hash: block.index_block_hash,
index: block.block_height,
})),
},
};
logger.debug(message, 'ChainhooksNotifier broadcasting index progress message');
await this.redis.rpush(this.queue, JSON.stringify(message));
}

async close() {
await this.redis.quit();
}
}
1 change: 1 addition & 0 deletions src/datastore/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1111,6 +1111,7 @@ export interface DbPoxCycleSignerStacker {
}

interface ReOrgEntities {
blockHeaders: { index_block_hash: string; block_height: number }[];
blocks: number;
microblockHashes: string[];
microblocks: number;
Expand Down
2 changes: 2 additions & 0 deletions src/datastore/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1315,6 +1315,7 @@ export function markBlockUpdateDataAsNonCanonical(data: DataStoreBlockUpdateData
export function newReOrgUpdatedEntities(): ReOrgUpdatedEntities {
return {
markedCanonical: {
blockHeaders: [],
blocks: 0,
microblockHashes: [],
microblocks: 0,
Expand All @@ -1336,6 +1337,7 @@ export function newReOrgUpdatedEntities(): ReOrgUpdatedEntities {
poxCycles: 0,
},
markedNonCanonical: {
blockHeaders: [],
blocks: 0,
microblockHashes: [],
microblocks: 0,
Expand Down
42 changes: 38 additions & 4 deletions src/datastore/pg-write-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ import {
} from '@hirosystems/api-toolkit';
import { PgServer, getConnectionArgs, getConnectionConfig } from './connection';
import { BigNumber } from 'bignumber.js';
import { ChainhooksNotifier } from './chainhooks-notifier';

const MIGRATIONS_TABLE = 'pgmigrations';
const INSERT_BATCH_SIZE = 500;
Expand Down Expand Up @@ -130,6 +131,7 @@ type TransactionHeader = {
*/
export class PgWriteStore extends PgStore {
readonly isEventReplay: boolean;
protected readonly chainhooksNotifier: ChainhooksNotifier | undefined = undefined;
protected isIbdBlockHeightReached = false;
private metrics:
| {
Expand All @@ -141,10 +143,12 @@ export class PgWriteStore extends PgStore {
constructor(
sql: PgSqlClient,
notifier: PgNotifier | undefined = undefined,
isEventReplay: boolean = false
isEventReplay: boolean = false,
chainhooksNotifier: ChainhooksNotifier | undefined = undefined
) {
super(sql, notifier);
this.isEventReplay = isEventReplay;
this.chainhooksNotifier = chainhooksNotifier;
if (isProdEnv) {
this.metrics = {
blockHeight: new prom.Gauge({
Expand All @@ -163,11 +167,13 @@ export class PgWriteStore extends PgStore {
usageName,
skipMigrations = false,
withNotifier = true,
withChainhooksNotifier = false,
isEventReplay = false,
}: {
usageName: string;
skipMigrations?: boolean;
withNotifier?: boolean;
withChainhooksNotifier?: boolean;
isEventReplay?: boolean;
}): Promise<PgWriteStore> {
const sql = await connectPostgres({
Expand All @@ -190,7 +196,8 @@ export class PgWriteStore extends PgStore {
});
}
const notifier = withNotifier ? await PgNotifier.create(usageName) : undefined;
const store = new PgWriteStore(sql, notifier, isEventReplay);
const chainhooksNotifier = withChainhooksNotifier ? new ChainhooksNotifier() : undefined;
const store = new PgWriteStore(sql, notifier, isEventReplay, chainhooksNotifier);
await store.connectPgNotifier();
return store;
}
Expand Down Expand Up @@ -229,11 +236,13 @@ export class PgWriteStore extends PgStore {
async update(data: DataStoreBlockUpdateData): Promise<void> {
let garbageCollectedMempoolTxs: string[] = [];
let newTxData: DataStoreTxEventData[] = [];
let reorg: ReOrgUpdatedEntities = newReOrgUpdatedEntities();
let isCanonical = true;

await this.sqlWriteTransaction(async sql => {
const chainTip = await this.getChainTip(sql);
const reorg = await this.handleReorg(sql, data.block, chainTip.block_height);
const isCanonical = data.block.block_height > chainTip.block_height;
reorg = await this.handleReorg(sql, data.block, chainTip.block_height);
isCanonical = data.block.block_height > chainTip.block_height;
if (!isCanonical) {
markBlockUpdateDataAsNonCanonical(data);
} else {
Expand Down Expand Up @@ -402,6 +411,13 @@ export class PgWriteStore extends PgStore {
// Send block updates but don't block current execution unless we're testing.
if (isTestEnv) await this.sendBlockNotifications({ data, garbageCollectedMempoolTxs });
else void this.sendBlockNotifications({ data, garbageCollectedMempoolTxs });
if (isCanonical) {
await this.chainhooksNotifier?.notify(
reorg,
data.block.index_block_hash,
data.block.block_height
);
}
}

/**
Expand Down Expand Up @@ -3548,6 +3564,13 @@ export class PgWriteStore extends PgStore {
return result;
}

/**
* Recursively restore previously orphaned blocks to canonical.
* @param sql - The SQL client
* @param indexBlockHash - The index block hash that we will restore first
* @param updatedEntities - The updated entities
* @returns The updated entities
*/
async restoreOrphanedChain(
sql: PgSqlClient,
indexBlockHash: string,
Expand All @@ -3568,6 +3591,10 @@ export class PgWriteStore extends PgStore {
throw new Error(`Found multiple non-canonical parents for index_hash ${indexBlockHash}`);
}
updatedEntities.markedCanonical.blocks++;
updatedEntities.markedCanonical.blockHeaders.unshift({
index_block_hash: restoredBlockResult[0].index_block_hash,
block_height: restoredBlockResult[0].block_height,
});

// Orphan the now conflicting block at the same height
const orphanedBlockResult = await sql<BlockQueryResult[]>`
Expand Down Expand Up @@ -3606,6 +3633,10 @@ export class PgWriteStore extends PgStore {
}

updatedEntities.markedNonCanonical.blocks++;
updatedEntities.markedNonCanonical.blockHeaders.unshift({
index_block_hash: orphanedBlockResult[0].index_block_hash,
block_height: orphanedBlockResult[0].block_height,
});
const markNonCanonicalResult = await this.markEntitiesCanonical(
sql,
orphanedBlockResult[0].index_block_hash,
Expand Down Expand Up @@ -3662,6 +3693,8 @@ export class PgWriteStore extends PgStore {
markCanonicalResult.txsMarkedCanonical
);
updatedEntities.prunedMempoolTxs += prunedMempoolTxs.removedTxs.length;

// Do we have a parent that is non-canonical? If so, restore it recursively.
const parentResult = await sql<{ index_block_hash: string }[]>`
SELECT index_block_hash
FROM blocks
Expand Down Expand Up @@ -4019,6 +4052,7 @@ export class PgWriteStore extends PgStore {
if (this._debounceMempoolStat.debounce) {
clearTimeout(this._debounceMempoolStat.debounce);
}
await this.chainhooksNotifier?.close();
await super.close(args);
}
}
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ async function init(): Promise<void> {
dbWriteStore = await PgWriteStore.connect({
usageName: `write-datastore-${apiMode}`,
skipMigrations: apiMode === StacksApiMode.readOnly,
withChainhooksNotifier: parseBoolean(process.env['CHAINHOOKS_NOTIFIER_ENABLED']) ?? false,
});
registerMempoolPromStats(dbWriteStore.eventEmitter);
}
Expand Down
Loading