Skip to content

Commit b83ab46

Browse files
committed
add redis progress tracking to prevent duplicate adapter scans
1 parent dc9bff8 commit b83ab46

File tree

2 files changed

+89
-21
lines changed

2 files changed

+89
-21
lines changed

src/utils/adapter.ts

Lines changed: 56 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,38 @@ import { getProvider } from "./provider";
1818
import { sendDiscordText } from "./discord";
1919
import { getConnection } from "../helpers/solana";
2020
import { chainMappings } from "../helpers/tokenMappings";
21+
import { getCache, setCache } from "./cache";
2122
const axios = require("axios");
2223
const retry = require("async-retry");
2324

2425
const SECONDS_IN_DAY = 86400;
2526

27+
interface AdapterProgress {
28+
lastSuccessfulBlock: number;
29+
lastUpdated: number;
30+
}
31+
32+
const getProgressKey = (bridgeDbName: string, chain: string) =>
33+
`adapter_progress:${bridgeDbName.toLowerCase()}:${chain.toLowerCase()}`;
34+
35+
const getAdapterProgress = async (bridgeDbName: string, chain: string): Promise<AdapterProgress | null> => {
36+
return await getCache(getProgressKey(bridgeDbName, chain));
37+
};
38+
39+
const PROGRESS_TTL = 60 * 60 * 24 * 7; // 7 days
40+
41+
const setAdapterProgress = async (bridgeDbName: string, chain: string, lastBlock: number): Promise<void> => {
42+
const key = getProgressKey(bridgeDbName, chain);
43+
const current = await getAdapterProgress(bridgeDbName, chain);
44+
if (current?.lastSuccessfulBlock && current.lastSuccessfulBlock >= lastBlock) {
45+
return;
46+
}
47+
await setCache(key, {
48+
lastSuccessfulBlock: lastBlock,
49+
lastUpdated: Math.floor(Date.now() / 1000)
50+
}, PROGRESS_TTL);
51+
};
52+
2653
// FIX timeout problems throughout functions here
2754

2855
const getBlocksForRunningAdapter = async (
@@ -73,15 +100,14 @@ const getBlocksForRunningAdapter = async (
73100
lastRecordedEndBlock + 1
74101
}.`
75102
);
76-
} else {
77-
// try {
78-
// const lastTs = await getTimestamp(lastRecordedEndBlock, chain);
79-
// const sixHoursBlock = await getBlock(chain, Number((currentTimestamp - SECONDS_IN_DAY / 4).toFixed()));
80-
// lastRecordedEndBlock = currentTimestamp - lastTs > SECONDS_IN_DAY ? sixHoursBlock : lastRecordedEndBlock;
81-
// } catch (e: any) {
82-
// console.error("Get start block error");
83-
// }
84103
}
104+
105+
const cachedProgress = await getAdapterProgress(bridgeDbName, chain);
106+
if (cachedProgress?.lastSuccessfulBlock && cachedProgress.lastSuccessfulBlock > lastRecordedEndBlock) {
107+
console.log(`[PROGRESS] Using Redis progress for ${bridgeDbName}:${chain}, advancing from block ${lastRecordedEndBlock} to ${cachedProgress.lastSuccessfulBlock}`);
108+
lastRecordedEndBlock = cachedProgress.lastSuccessfulBlock;
109+
}
110+
85111
startBlock = lastRecordedEndBlock + 1;
86112
useRecordedBlocks = true;
87113
} else {
@@ -328,7 +354,7 @@ export const runAllAdaptersTimestampRange = async (
328354
startBlock = (await lookupBlock(startTimestamp, { chain: chainContractsAreOn as Chain })).block;
329355
endBlock = (await lookupBlock(endTimestamp, { chain: chainContractsAreOn as Chain })).block;
330356
}
331-
await runAdapterHistorical(startBlock, endBlock, id, chain as Chain, allowNullTxValues, true, onConflict);
357+
await runAdapterHistorical(startBlock, endBlock, id, chain as Chain, allowNullTxValues, true, onConflict, false);
332358
} catch (e: any) {
333359
const errString = `Adapter txs for ${bridgeDbName} on chain ${chain} failed, skipped. ${JSON.stringify(e)}`;
334360
await insertErrorRow({
@@ -356,7 +382,8 @@ export const runAdapterHistorical = async (
356382
chain: string,
357383
allowNullTxValues: boolean = false,
358384
throwOnFailedInsert: boolean = true,
359-
onConflict: "ignore" | "error" | "upsert" = "error"
385+
onConflict: "ignore" | "error" | "upsert" = "error",
386+
updateProgress: boolean = true
360387
) => {
361388
const currentTimestamp = await getCurrentUnixTimestamp();
362389
const bridgeNetwork = bridgeNetworks.filter((bridgeNetwork) => bridgeNetwork.id === bridgeNetworkId)[0];
@@ -366,6 +393,19 @@ export const runAdapterHistorical = async (
366393
return;
367394
}
368395

396+
const cachedProgress = await getAdapterProgress(bridgeDbName, chain);
397+
if (cachedProgress?.lastSuccessfulBlock) {
398+
if (cachedProgress.lastSuccessfulBlock >= endBlock) {
399+
console.log(`[SKIP] ${bridgeDbName}:${chain} blocks ${startBlock}-${endBlock} already processed (last: ${cachedProgress.lastSuccessfulBlock})`);
400+
return;
401+
}
402+
if (cachedProgress.lastSuccessfulBlock >= startBlock) {
403+
const newStart = cachedProgress.lastSuccessfulBlock + 1;
404+
console.log(`[PROGRESS] ${bridgeDbName}:${chain} adjusting start from ${startBlock} to ${newStart} (last: ${cachedProgress.lastSuccessfulBlock})`);
405+
startBlock = newStart;
406+
}
407+
}
408+
369409
let adapter = adapters[bridgeDbName];
370410
adapter = isAsyncAdapter(adapter) ? await adapter.build() : adapter;
371411

@@ -449,6 +489,9 @@ export const runAdapterHistorical = async (
449489
);
450490

451491
if (!eventLogs || eventLogs.length === 0) {
492+
if (updateProgress) {
493+
await setAdapterProgress(bridgeDbName, chain, endBlockForQuery);
494+
}
452495
break;
453496
}
454497

@@ -631,6 +674,9 @@ export const runAdapterHistorical = async (
631674
{ retries: 3, factor: 2 }
632675
);
633676

677+
if (updateProgress) {
678+
await setAdapterProgress(bridgeDbName, chain, endBlockForQuery);
679+
}
634680
break;
635681
} catch (e: any) {
636682
retryCount++;

src/utils/cache.ts

Lines changed: 33 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -70,23 +70,45 @@ export const registerCacheHandler = (cacheKey: string, handler: Function) => {
7070
export const getCacheKey = (...parts: (string | undefined)[]) => parts.filter(Boolean).join(":");
7171

7272
export const getCache = async (key: string): Promise<any> => {
73-
const value = await redis.get(key);
74-
if (value) {
75-
console.log("Cache HIT", key);
76-
} else {
77-
console.log("Cache MISS", key);
73+
if (!redis) {
74+
return null;
75+
}
76+
try {
77+
const value = await redis.get(key);
78+
if (value) {
79+
console.log("Cache HIT", key);
80+
} else {
81+
console.log("Cache MISS", key);
82+
}
83+
return value ? JSON.parse(value) : null;
84+
} catch (e) {
85+
console.error(`[CACHE] getCache error for ${key}:`, e);
86+
return null;
7887
}
79-
return value ? JSON.parse(value) : null;
8088
};
8189

8290
export const setCache = async (key: string, value: any, ttl: number | null = DEFAULT_TTL): Promise<void> => {
83-
if (ttl === null) {
84-
await redis.set(key, JSON.stringify(value));
85-
} else {
86-
await redis.set(key, JSON.stringify(value), "EX", ttl);
91+
if (!redis) {
92+
return;
93+
}
94+
try {
95+
if (ttl === null) {
96+
await redis.set(key, JSON.stringify(value));
97+
} else {
98+
await redis.set(key, JSON.stringify(value), "EX", ttl);
99+
}
100+
} catch (e) {
101+
console.error(`[CACHE] setCache error for ${key}:`, e);
87102
}
88103
};
89104

90105
export const deleteCache = async (key: string): Promise<void> => {
91-
await redis.del(key);
106+
if (!redis) {
107+
return;
108+
}
109+
try {
110+
await redis.del(key);
111+
} catch (e) {
112+
console.error(`[CACHE] deleteCache error for ${key}:`, e);
113+
}
92114
};

0 commit comments

Comments
 (0)