Skip to content

Commit 37c63ae

Browse files
committed
feat: add optimizations and fix caching
1 parent f0c25ec commit 37c63ae

File tree

15 files changed

+252
-229
lines changed

15 files changed

+252
-229
lines changed

packages/data-flow/src/orchestrator.ts

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ import { CoreDependencies, DataLoader, delay, IQueue, iStrategyAbi, Queue } from
3939

4040
type TokenWithTimestamps = {
4141
token: { priceSourceCode: Token["priceSourceCode"] };
42-
timestamps: number[];
42+
timestamps: TimestampMs[];
4343
};
4444

4545
/**
@@ -126,13 +126,23 @@ export class Orchestrator {
126126
}
127127

128128
async run(signal: AbortSignal): Promise<void> {
129+
let totalEvents = 0;
130+
let processedEvents = 0;
131+
129132
while (!signal.aborted) {
130133
let event: ProcessorEvent<ContractName, AnyEvent> | undefined;
131134
try {
132135
if (this.eventsQueue.isEmpty()) {
133136
const events = await this.getNextEventsBatch();
134-
await this.bulkFetchMetadataAndPricesForBatch(events);
137+
if (
138+
events[0] &&
139+
Math.abs(new Date().getTime() - events[0].blockTimestamp!) >
140+
1000 * 60 * 60 * 0.5 // 30 minutes
141+
) {
142+
await this.bulkFetchMetadataAndPricesForBatch(events);
143+
}
135144
await this.enqueueEvents(events);
145+
totalEvents += events.length;
136146
}
137147

138148
event = this.eventsQueue.pop();
@@ -152,13 +162,19 @@ export class Orchestrator {
152162
...event,
153163
rawEvent: event,
154164
});
155-
165+
console.time(`Processing time for event ${event.eventName}`);
156166
await this.retryHandler.execute(
157167
async () => {
158168
await this.handleEvent(event!);
159169
},
160170
{ abortSignal: signal },
161171
);
172+
console.timeEnd(`Processing time for event ${event.eventName}`);
173+
processedEvents++;
174+
this.logger.info(`Processed events: ${processedEvents}/${totalEvents}`, {
175+
className: Orchestrator.name,
176+
chainId: this.chainId,
177+
});
162178
} catch (error: unknown) {
163179
// TODO: notify
164180
if (
@@ -249,6 +265,7 @@ export class Orchestrator {
249265
*/
250266
private async getNextEventsBatch(): Promise<AnyIndexerFetchedEvent[]> {
251267
const lastProcessedEvent = await this.eventsRegistry.getLastProcessedEvent(this.chainId);
268+
252269
const blockNumber = lastProcessedEvent?.blockNumber ?? 0;
253270
const logIndex = lastProcessedEvent?.logIndex ?? 0;
254271

packages/data-flow/test/unit/eventsFetcher.spec.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ describe("EventsFetcher", () => {
1818
indexerClientMock = {
1919
getEventsAfterBlockNumberAndLogIndex: vi.fn(),
2020
getEvents: vi.fn(),
21-
getBlockRangeByChainId: vi.fn(),
21+
getBlockRangeTimestampByChainId: vi.fn(),
2222
};
2323

2424
eventsFetcher = new EventsFetcher(indexerClientMock);

packages/indexer-client/src/interfaces/indexerClient.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,9 @@ export interface IIndexerClient {
3434
getEvents(params: GetEventsFilters): Promise<AnyIndexerFetchedEvent[]>;
3535

3636
/**
37-
* Get the block range by chain id from the indexer service
37+
* Get the block range timestamp by chain id from the indexer service
3838
* @param chainId Id of the chain
3939
* @returns Block range from the indexer service
4040
*/
41-
getBlockRangeByChainId(chainId: ChainId): Promise<{ from: number; to: number }>;
41+
getBlockRangeTimestampByChainId(chainId: ChainId): Promise<{ from: number; to: number }>;
4242
}

packages/indexer-client/src/providers/envioIndexerClient.ts

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -267,34 +267,34 @@ export class EnvioIndexerClient implements IIndexerClient {
267267
methodName,
268268
});
269269
}
270-
async getBlockRangeByChainId(chainId: ChainId): Promise<{ from: number; to: number }> {
270+
async getBlockRangeTimestampByChainId(chainId: ChainId): Promise<{ from: number; to: number }> {
271271
const response = (await this.client.request(
272272
gql`
273273
query getBlockRangeByChainId($chainId: Int!) {
274274
from: raw_events(
275275
where: { chain_id: { _eq: $chainId } }
276-
order_by: { block_number: asc }
276+
order_by: { block_timestamp: asc }
277277
limit: 1
278278
) {
279-
block_number
279+
block_timestamp
280280
}
281281
to: raw_events(
282282
where: { chain_id: { _eq: $chainId } }
283-
order_by: { block_number: desc }
283+
order_by: { block_timestamp: desc }
284284
limit: 1
285285
) {
286-
block_number
286+
block_timestamp
287287
}
288288
}
289289
`,
290290
{ chainId },
291-
)) as { from: { block_number: number }[]; to: { block_number: number }[] };
291+
)) as { from: { block_timestamp: number }[]; to: { block_timestamp: number }[] };
292292
if (!response.from[0] || !response.to[0]) {
293293
throw new Error("No block range found");
294294
}
295295
return {
296-
from: response.from[0].block_number,
297-
to: response.to[0].block_number,
296+
from: response.from[0].block_timestamp,
297+
to: response.to[0].block_timestamp,
298298
};
299299
}
300300
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
// This is the minimum granularity we can get data with on Enterprise plans of Coingecko
22
// Refer to https://support.coingecko.com/hc/en-us/articles/4538747001881-What-granularity-do-you-support-for-historical-data
3-
export const MIN_GRANULARITY_MS = 300_000; // 5 minutes
3+
export const MIN_GRANULARITY_MS = 3600_000; // 1 hour
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
export class NoClosePriceFound extends Error {
2+
constructor() {
3+
super(`No close price found`);
4+
}
5+
}

0 commit comments

Comments
 (0)