2 changes: 2 additions & 0 deletions packages/indexer/package.json
@@ -45,6 +45,8 @@
"ioredis": "^5.4.1",
"lodash": "^4.17.21",
"luxon": "^3.5.0",
"p-lazy": "3.1.0",
"p-retry": "^4.6.2",
"redis": "^4.7.0",
"superstruct": "^2.0.3-1",
"viem": "^2.40.3",
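The two new dependencies work as a pair in the listener change below: p-retry retries an async operation with exponential backoff, while p-lazy produces promises that defer their executor until first awaited. A minimal sketch of that laziness (illustrative only, not code from this PR):

import PLazy from "p-lazy";

async function demo() {
  // A plain Promise runs its executor immediately on construction...
  const eager = new Promise<void>((resolve) => {
    console.log("plain Promise: runs right away");
    resolve();
  });
  await eager;

  // ...while PLazy defers its executor until the first await/.then().
  const lazy = new PLazy<void>((resolve) => {
    console.log("PLazy: runs only once awaited");
    resolve();
  });
  console.log("lazy constructed, nothing logged by PLazy yet");
  await lazy; // the executor fires here
}

demo();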
4 changes: 2 additions & 2 deletions packages/indexer/src/data-indexing/service/filtering.ts
@@ -72,7 +72,7 @@ export const createCctpBurnFilter = async (
logger: Logger,
): Promise<boolean> => {
// Check if receipt is present in payload
const receipt = payload.transactionReceipt;
const receipt = await payload.transactionReceipt;
if (!receipt) {
logger.debug({
at: "createSwapApiFilter",
@@ -142,7 +142,7 @@ export const createCctpMintFilter = async (
logger: Logger,
): Promise<boolean> => {
// Check if receipt is present in payload
const receipt = payload.transactionReceipt;
const receipt = await payload.transactionReceipt;
if (!receipt) {
logger.debug({
at: "createCctpMintFilter",
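For context on the added await: transactionReceipt becomes a Promise<TransactionReceipt> in the listener change below, and since await undefined simply yields undefined, the existing falsy guard keeps working unchanged. A sketch of the resulting typing (payload shape abbreviated and hypothetical):

import type { TransactionReceipt } from "viem";

type PayloadSlice = { transactionReceipt?: Promise<TransactionReceipt> };

async function readReceipt(payload: PayloadSlice) {
  // `await undefined` resolves to undefined, so the guard below is unchanged.
  const receipt = await payload.transactionReceipt; // TransactionReceipt | undefined
  if (!receipt) {
    return; // nothing fetched for this event; bail out as before
  }
  // ...use receipt.status, receipt.logs, etc.
}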
@@ -1,3 +1,4 @@
import pRetry from "p-retry";
import Bottleneck from "bottleneck";
import {
parseAbi,
@@ -7,8 +8,10 @@ import {
type Chain,
type Transaction,
TransactionReceipt,
GetBlockReturnType,
} from "viem";
import { Logger } from "winston";
import PLazy from "p-lazy";
import { DataDogMetricsService } from "../../services/MetricsService";

/**
@@ -22,6 +25,10 @@ import { DataDogMetricsService } from "../../services/MetricsService";
* - Pushing the clean payload into the next stage of the pipeline via a callback.
*/

const MAX_RETRY_TIMEOUT = 60000;
const RETRY_ATTEMPTS = 10;
const RETRY_BACKOFF_EXPONENT = 2;

/**
* Defines the configuration for a single event subscription.
* This allows the listener service to be generic and data-driven.
@@ -56,7 +63,7 @@ export interface IndexerEventPayload {
/** The transaction that generated the event. */
transaction?: Transaction;
/** The receipt of the transaction that generated the event. */
transactionReceipt?: TransactionReceipt;
transactionReceipt?: Promise<TransactionReceipt>;
}

/**
@@ -93,7 +100,6 @@ export interface ProcessLogBatchArgs<TPayload> {
config: EventConfig;
chainId: number;
client: PublicClient<Transport, Chain>;

onEvent: (payload: TPayload) => void;
logger: Logger;
metrics?: DataDogMetricsService;
@@ -183,11 +189,8 @@ async function processLogBatch<TPayload>(
];

// Create caches for this batch to avoid duplicate requests/parallel fetching
const blockCache = new Map<
bigint,
{ timestamp: bigint; transactions: Transaction[] }
>();
const receiptCache = new Map<string, TransactionReceipt>();
const blockCache = new Map<bigint, GetBlockReturnType<Chain, true>>();
const receiptCache = new Map<string, Promise<TransactionReceipt>>();

// Process all logs in parallel
await Promise.all(
@@ -200,17 +203,34 @@
}

// --- Fetch Block & Transactions (Deduplicated) ---
let blockInformation = blockCache.get(logItem.blockNumber);
if (!blockInformation) {
blockInformation = await client.getBlock({
blockNumber: logItem.blockNumber,
includeTransactions: true,
});
metrics?.addCountMetric("rpcCallGetBlock", tags);

blockCache.set(logItem.blockNumber, blockInformation);
let block = blockCache.get(logItem.blockNumber);
if (!block) {
block = await pRetry(
() => {
metrics?.addCountMetric("rpcCallGetBlock", tags);
return client.getBlock({
blockNumber: logItem.blockNumber!,
includeTransactions: true,
});
},
{
retries: RETRY_ATTEMPTS,
factor: RETRY_BACKOFF_EXPONENT,
maxTimeout: MAX_RETRY_TIMEOUT,
onFailedAttempt: (error) => {
logger.warn({
at: "genericEventListener#processLogBatch",
message: `Failed to fetch block ${logItem.blockNumber}, retrying...`,
attempt: error.attemptNumber,
retriesLeft: error.retriesLeft,
error,
});
},
},
);
blockCache.set(logItem.blockNumber, block);
}
const { timestamp: blockTimestamp, transactions } = blockInformation;
const { timestamp: blockTimestamp, transactions } = await block;

// --- Find Transaction ---
let transaction: Transaction | undefined;
@@ -221,15 +241,40 @@
}

// --- Fetch Transaction Receipt (Deduplicated) ---
let transactionReceipt: TransactionReceipt | undefined;
let transactionReceiptPromise: Promise<TransactionReceipt> | undefined;
if (logItem.transactionHash) {
transactionReceipt = receiptCache.get(logItem.transactionHash);
if (!transactionReceipt) {
transactionReceipt = await client.getTransactionReceipt({
hash: logItem.transactionHash,
transactionReceiptPromise = receiptCache.get(logItem.transactionHash);
if (!transactionReceiptPromise) {
transactionReceiptPromise = new PLazy((resolve, reject) => {
pRetry(
() => {
metrics?.addCountMetric("rpcCallGetTransactionReceipt", tags);
return client.getTransactionReceipt({
hash: logItem.transactionHash!,
});
},
{
retries: RETRY_ATTEMPTS,
factor: RETRY_BACKOFF_EXPONENT,
maxTimeout: MAX_RETRY_TIMEOUT,
onFailedAttempt: (error) => {
logger.warn({
at: "genericEventListener#processLogBatch",
message: `Failed to fetch receipt for ${logItem.transactionHash}, retrying...`,
attempt: error.attemptNumber,
retriesLeft: error.retriesLeft,
error,
});
},
},
)
.then(resolve)
.catch(reject);
});
metrics?.addCountMetric("rpcCallGetTransactionReceipt", tags);
receiptCache.set(logItem.transactionHash, transactionReceipt);
receiptCache.set(
logItem.transactionHash,
transactionReceiptPromise,
);
}
}

@@ -241,7 +286,7 @@ async function processLogBatch<TPayload>(
// The events are emitted at head so we can simply take the block number of the log as the currentBlockHeight
currentBlockHeight: logItem.blockNumber,
transaction,
transactionReceipt,
transactionReceipt: transactionReceiptPromise,
} as TPayload;

// Trigger the side effect (Forward it to an event processor or message queue)
@@ -274,6 +319,7 @@ async function processLogBatch<TPayload>(
error,
logIndex: logItem.logIndex,
txHash: logItem.transactionHash,
chainId,
});
metrics?.addCountMetric("processLogError", tags);
}
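Two things happen in the change above: the per-batch Maps deduplicate RPC calls across logs processed in parallel, and wrapping the receipt fetch in PLazy means getTransactionReceipt only fires if a downstream consumer actually awaits it. Note also that the metrics counters now increment per attempt rather than per log, since they moved inside the retried closure. Assuming p-retry's default initial delay of 1 s, factor 2 and the 60 s cap give waits of roughly 1 s, 2 s, 4 s and so on, capped at 60 s, across the 10 retries. A condensed sketch of the shared caching shape (the helper and its names are illustrative, not the PR's code):

import PLazy from "p-lazy";
import pRetry from "p-retry";

// One lazy, retried promise per cache key: concurrent log handlers for the
// same key share a single RPC call, and that call only fires when a
// downstream consumer first awaits the promise.
function getOrCreate<T>(
  cache: Map<string, Promise<T>>,
  key: string,
  fetcher: () => Promise<T>,
): Promise<T> {
  let entry = cache.get(key);
  if (!entry) {
    // PLazy defers pRetry (and thus the RPC call) until the first await.
    entry = new PLazy<T>((resolve, reject) => {
      pRetry(fetcher, { retries: 10, factor: 2, maxTimeout: 60_000 })
        .then(resolve)
        .catch(reject);
    });
    cache.set(key, entry);
  }
  return entry;
}

In the PR itself the block fetch is awaited eagerly with pRetry alone, while receipts additionally go through PLazy so they stay unfetched unless a filter asks for them.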
6 changes: 5 additions & 1 deletion packages/indexer/src/main.ts
@@ -190,7 +190,11 @@ export async function Main(config: parseEnv.Config, logger: winston.Logger) {
const abortController = new AbortController();

if (process.env.ENABLE_WEBSOCKET_INDEXER === "true") {
const allProviders = parseEnv.parseProvidersUrls();
// Merge providers, allowing WS providers to override RPC providers if defined for a chain
const allProviders = new Map([
...parseEnv.parseProvidersUrls("RPC_PROVIDER_URLS_"),
...parseEnv.parseProvidersUrls("WS_RPC_PROVIDER_URLS_"),
]);

// Determine which chains to index via WebSocket
let wsChainIds: number[] = []; // Default to Arbitrum
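The merge works because a Map built from an iterable keeps the last entry for a repeated key, so chains that define both providers end up with their WS URLs. An illustrative sketch (URLs are placeholders):

// Later spread entries win on key collision:
const rpc = new Map([
  [1, ["https://rpc.example"]],
  [10, ["https://op-rpc.example"]],
]);
const ws = new Map([[1, ["wss://ws.example"]]]);

const merged = new Map([...rpc, ...ws]);
merged.get(1); // ["wss://ws.example"] (WS overrides RPC)
merged.get(10); // ["https://op-rpc.example"] (no WS entry, RPC kept)

Note the override replaces a chain's URL list wholesale rather than concatenating the two lists.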
6 changes: 4 additions & 2 deletions packages/indexer/src/parseEnv.ts
@@ -142,10 +142,12 @@ function parseProviderConfigs(env: Env): ProviderConfig[] {
return results;
}

export function parseProvidersUrls() {
export function parseProvidersUrls(prefix: string = "RPC_PROVIDER_URLS_") {
const results: Map<number, string[]> = new Map();
const regex = new RegExp(`^${prefix}(\\d+)$`);

for (const [key, value] of Object.entries(process.env)) {
const match = key.match(/^RPC_PROVIDER_URLS_(\d+)$/);
const match = key.match(regex);
if (match) {
const chainId = match[1] ? parseNumber(match[1]) : undefined;
if (chainId && value) {
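Parameterizing the prefix lets one parser serve both env-var families, and the anchored regex keeps plain RPC_PROVIDER_URLS_* keys from ever matching the WS prefix. A worked example (env values are hypothetical):

// Hypothetical environment:
//   RPC_PROVIDER_URLS_1=https://eth.example
//   WS_RPC_PROVIDER_URLS_42161=wss://arb.example
//
// parseProvidersUrls("WS_RPC_PROVIDER_URLS_") would then plausibly return
//   Map { 42161 => ["wss://arb.example"] }
// since only keys matching the prefixed pattern are picked up and the
// captured digits become the chain id.
const regex = new RegExp(`^WS_RPC_PROVIDER_URLS_(\\d+)$`);
"WS_RPC_PROVIDER_URLS_42161".match(regex)?.[1]; // "42161"
"RPC_PROVIDER_URLS_1".match(regex); // null (wrong prefix)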
26 changes: 26 additions & 0 deletions pnpm-lock.yaml

(Generated lockfile; diff not rendered.)