Skip to content

Commit f558f36

Browse files
feat: ws specific rpc urls (#574)
1 parent 6fed014 commit f558f36

File tree

2 files changed

+73
-27
lines changed

2 files changed

+73
-27
lines changed

packages/indexer/src/data-indexing/service/filtering.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ export const createCctpBurnFilter = async (
7575
logger: Logger,
7676
): Promise<boolean> => {
7777
// Check if receipt is present in payload
78-
const receipt = payload.transactionReceipt;
78+
const receipt = await payload.transactionReceipt;
7979
if (!receipt) {
8080
logger.debug({
8181
at: "createSwapApiFilter",
@@ -145,7 +145,7 @@ export const createCctpMintFilter = async (
145145
logger: Logger,
146146
): Promise<boolean> => {
147147
// Check if receipt is present in payload
148-
const receipt = payload.transactionReceipt;
148+
const receipt = await payload.transactionReceipt;
149149
if (!receipt) {
150150
logger.debug({
151151
at: "createCctpMintFilter",

packages/indexer/src/data-indexing/service/genericEventListening.ts

Lines changed: 71 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import pRetry from "p-retry";
12
import Bottleneck from "bottleneck";
23
import {
34
parseAbi,
@@ -7,8 +8,10 @@ import {
78
type Chain,
89
type Transaction,
910
TransactionReceipt,
11+
GetBlockReturnType,
1012
} from "viem";
1113
import { Logger } from "winston";
14+
import PLazy from "p-lazy";
1215
import { DataDogMetricsService } from "../../services/MetricsService";
1316

1417
/**
@@ -22,6 +25,10 @@ import { DataDogMetricsService } from "../../services/MetricsService";
2225
* - Pushing the clean payload into the next stage of the pipeline via a callback.
2326
*/
2427

28+
const MAX_RETRY_TIMEOUT = 60000;
29+
const RETRY_ATTEMPTS = 10;
30+
const RETRY_BACKOFF_EXPONENT = 2;
31+
2532
/**
2633
* Defines the configuration for a single event subscription.
2734
* This allows the listener service to be generic and data-driven.
@@ -56,7 +63,7 @@ export interface IndexerEventPayload {
5663
/** The transaction that generated the event. */
5764
transaction?: Transaction;
5865
/** The receipt of the transaction that generated the event. */
59-
transactionReceipt?: TransactionReceipt;
66+
transactionReceipt?: Promise<TransactionReceipt>;
6067
}
6168

6269
/**
@@ -93,7 +100,6 @@ export interface ProcessLogBatchArgs<TPayload> {
93100
config: EventConfig;
94101
chainId: number;
95102
client: PublicClient<Transport, Chain>;
96-
97103
onEvent: (payload: TPayload) => void;
98104
logger: Logger;
99105
metrics?: DataDogMetricsService;
@@ -183,11 +189,8 @@ async function processLogBatch<TPayload>(
183189
];
184190

185191
// Create caches for this batch to avoid duplicate requests/parallel fetching
186-
const blockCache = new Map<
187-
bigint,
188-
{ timestamp: bigint; transactions: Transaction[] }
189-
>();
190-
const receiptCache = new Map<string, TransactionReceipt>();
192+
const blockCache = new Map<bigint, GetBlockReturnType<Chain, true>>();
193+
const receiptCache = new Map<string, Promise<TransactionReceipt>>();
191194

192195
// Process all logs in parallel
193196
await Promise.all(
@@ -200,17 +203,34 @@ async function processLogBatch<TPayload>(
200203
}
201204

202205
// --- Fetch Block & Transactions (Deduplicated) ---
203-
let blockInformation = blockCache.get(logItem.blockNumber);
204-
if (!blockInformation) {
205-
blockInformation = await client.getBlock({
206-
blockNumber: logItem.blockNumber,
207-
includeTransactions: true,
208-
});
209-
metrics?.addCountMetric("rpcCallGetBlock", tags);
210-
211-
blockCache.set(logItem.blockNumber, blockInformation);
206+
let block = blockCache.get(logItem.blockNumber);
207+
if (!block) {
208+
block = await pRetry(
209+
() => {
210+
metrics?.addCountMetric("rpcCallGetBlock", tags);
211+
return client.getBlock({
212+
blockNumber: logItem.blockNumber!,
213+
includeTransactions: true,
214+
});
215+
},
216+
{
217+
retries: RETRY_ATTEMPTS,
218+
factor: RETRY_BACKOFF_EXPONENT,
219+
maxTimeout: MAX_RETRY_TIMEOUT,
220+
onFailedAttempt: (error) => {
221+
logger.warn({
222+
at: "genericEventListener#processLogBatch",
223+
message: `Failed to fetch block ${logItem.blockNumber}, retrying...`,
224+
attempt: error.attemptNumber,
225+
retriesLeft: error.retriesLeft,
226+
error,
227+
});
228+
},
229+
},
230+
);
231+
blockCache.set(logItem.blockNumber, block);
212232
}
213-
const { timestamp: blockTimestamp, transactions } = blockInformation;
233+
const { timestamp: blockTimestamp, transactions } = await block;
214234

215235
// --- Find Transaction ---
216236
let transaction: Transaction | undefined;
@@ -221,15 +241,40 @@ async function processLogBatch<TPayload>(
221241
}
222242

223243
// --- Fetch Transaction Receipt (Deduplicated) ---
224-
let transactionReceipt: TransactionReceipt | undefined;
244+
let transactionReceiptPromise: Promise<TransactionReceipt> | undefined;
225245
if (logItem.transactionHash) {
226-
transactionReceipt = receiptCache.get(logItem.transactionHash);
227-
if (!transactionReceipt) {
228-
transactionReceipt = await client.getTransactionReceipt({
229-
hash: logItem.transactionHash,
246+
transactionReceiptPromise = receiptCache.get(logItem.transactionHash);
247+
if (!transactionReceiptPromise) {
248+
transactionReceiptPromise = new PLazy((resolve, reject) => {
249+
pRetry(
250+
() => {
251+
metrics?.addCountMetric("rpcCallGetTransactionReceipt", tags);
252+
return client.getTransactionReceipt({
253+
hash: logItem.transactionHash!,
254+
});
255+
},
256+
{
257+
retries: RETRY_ATTEMPTS,
258+
factor: RETRY_BACKOFF_EXPONENT,
259+
maxTimeout: MAX_RETRY_TIMEOUT,
260+
onFailedAttempt: (error) => {
261+
logger.warn({
262+
at: "genericEventListener#processLogBatch",
263+
message: `Failed to fetch receipt for ${logItem.transactionHash}, retrying...`,
264+
attempt: error.attemptNumber,
265+
retriesLeft: error.retriesLeft,
266+
error,
267+
});
268+
},
269+
},
270+
)
271+
.then(resolve)
272+
.catch(reject);
230273
});
231-
metrics?.addCountMetric("rpcCallGetTransactionReceipt", tags);
232-
receiptCache.set(logItem.transactionHash, transactionReceipt);
274+
receiptCache.set(
275+
logItem.transactionHash,
276+
transactionReceiptPromise,
277+
);
233278
}
234279
}
235280

@@ -241,7 +286,7 @@ async function processLogBatch<TPayload>(
241286
// The events are emitted at head so we can simply take the block number of the log as the currentBlockHeight
242287
currentBlockHeight: logItem.blockNumber,
243288
transaction,
244-
transactionReceipt,
289+
transactionReceipt: transactionReceiptPromise,
245290
} as TPayload;
246291

247292
// Trigger the side effect (Forward it to an event processor or message queue)
@@ -274,6 +319,7 @@ async function processLogBatch<TPayload>(
274319
error,
275320
logIndex: logItem.logIndex,
276321
txHash: logItem.transactionHash,
322+
chainId,
277323
});
278324
metrics?.addCountMetric("processLogError", tags);
279325
}

0 commit comments

Comments
 (0)