Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 103 additions & 5 deletions packages/indexer/src/data-indexing/service/CctpFinalizerService.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import winston, { Logger } from "winston";
import axios from "axios";

import { RepeatableTask } from "../../generics";
import { DataSource, entities } from "@repo/indexer-database";
Expand All @@ -14,9 +13,11 @@ import {
} from "../adapter/cctp-v2/service";

export const CCTP_FINALIZER_DELAY_SECONDS = 10;
export const CCTP_UNFINALIZED_MONITOR_DELAY_SECONDS = 5 * 60; // 5 minutes

export class CctpFinalizerServiceManager {
private service: RepeatableTask;
private finalizerService: RepeatableTask;
private monitorService: RepeatableTask;
private pubSubService: PubSubService;

constructor(
Expand All @@ -36,12 +37,20 @@ export class CctpFinalizerServiceManager {
}

this.pubSubService = new PubSubService(this.config);
this.service = new CctpFinalizerService(
this.finalizerService = new CctpFinalizerService(
this.logger,
this.postgres,
this.pubSubService,
);
await this.service.start(CCTP_FINALIZER_DELAY_SECONDS);
this.monitorService = new CctpUnfinalizedBurnMonitorService(
this.logger,
this.postgres,
);

await Promise.all([
this.finalizerService.start(CCTP_FINALIZER_DELAY_SECONDS),
this.monitorService.start(CCTP_UNFINALIZED_MONITOR_DELAY_SECONDS),
]);
} catch (error) {
this.logger.error({
at: "Indexer#CctpFinalizerServiceManager#start",
Expand All @@ -54,7 +63,8 @@ export class CctpFinalizerServiceManager {
}

public async stopGracefully() {
this.service?.stop();
this.finalizerService?.stop();
this.monitorService?.stop();
}
}

Expand Down Expand Up @@ -270,6 +280,94 @@ class CctpFinalizerService extends RepeatableTask {
}
}

/**
* @description Monitors for CCTP burn events that have been published to the finalizer
* but haven't been finalized on the destination chain within the expected timeframe.
* Runs every 5 minutes to detect and alert on stuck burns.
*/
class CctpUnfinalizedBurnMonitorService extends RepeatableTask {
constructor(
logger: winston.Logger,
private readonly postgres: DataSource,
) {
super(logger, "cctp-unfinalized-burn-monitor");
}

protected async taskLogic(): Promise<void> {
try {
const unfinalizedBurns = await this.postgres
.createQueryBuilder(entities.CctpFinalizerJob, "job")
.innerJoinAndSelect("job.burnEvent", "burnEvent")
.innerJoin(
entities.MessageSent,
"messageSent",
"messageSent.transactionHash = burnEvent.transactionHash AND messageSent.chainId = burnEvent.chainId",
)
.leftJoin(
entities.MessageReceived,
"messageReceived",
"messageReceived.sourceDomain = messageSent.sourceDomain AND messageReceived.nonce = messageSent.nonce",
)
.where("now() - job.createdAt > interval '30 minutes'")
.andWhere("now() - job.createdAt < interval '24 hours'")
.andWhere("messageReceived.id IS NULL")
.addSelect("messageSent.sourceDomain", "sourceDomain")
.addSelect("messageSent.nonce", "nonce")
.addSelect("messageSent.destinationDomain", "destinationDomain")
.getRawAndEntities();

for (let i = 0; i < unfinalizedBurns.entities.length; i++) {
const job = unfinalizedBurns.entities[i]!;
const raw = unfinalizedBurns.raw[i];

if (!raw?.sourceDomain || !raw?.destinationDomain) {
continue;
}

const { nonce, sourceDomain, destinationDomain } = raw;
const isProduction = isProductionNetwork(Number(job.burnEvent.chainId));
const destinationChainId = getCctpDestinationChainFromDomain(
destinationDomain,
isProduction,
);

const elapsedMinutes = Math.round(
(Date.now() - job.createdAt.getTime()) / 1000 / 60,
);

this.logger.error({
at: "CctpUnfinalizedBurnMonitorService#taskLogic",
message: `CCTP burn event has not been finalized after ${elapsedMinutes} minutes`,
notificationPath: "across-indexer-error",
sourceChainId: job.burnEvent.chainId,
destinationChainId,
burnTransactionHash: job.burnEvent.transactionHash,
burnBlockNumber: job.burnEvent.blockNumber,
amount: job.burnEvent.amount,
depositor: job.burnEvent.depositor,
mintRecipient: job.burnEvent.mintRecipient,
sourceDomain,
nonce,
jobCreatedAt: job.createdAt,
elapsedMinutes,
});
}
} catch (error) {
this.logger.error({
at: "CctpUnfinalizedBurnMonitorService#taskLogic",
message: "Error checking for unfinalized burn events",
notificationPath: "across-indexer-error",
errorJson: JSON.stringify(error),
error,
});
}
}

protected initialize(): Promise<void> {
return Promise.resolve();
}
}

const ATTESTATION_TIMES = {
[CHAIN_IDs.MAINNET]: { standard: 13 * 60, fast: 20 },
[CHAIN_IDs.ARBITRUM]: { standard: 13 * 60, fast: 8 },
Expand Down