Skip to content

Commit dc4e281

Browse files
committed
added tests for chain sync
1 parent f052dda commit dc4e281

File tree

2 files changed

+407
-7
lines changed

2 files changed

+407
-7
lines changed

src/chain-cache/ChainSync.ts

Lines changed: 61 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@ export class ChainSync {
1717
private _numOfPairsToBatch: number;
1818
private _msToWaitBetweenSyncs: number;
1919
private _chunkSize: number;
20+
// Track all active timers for cleanup
21+
private _activeTimers: Set<number> = new Set();
22+
private _isStopped: boolean = false;
2023

2124
constructor(
2225
fetcher: Fetcher,
@@ -32,6 +35,35 @@ export class ChainSync {
3235
this._chunkSize = chunkSize;
3336
}
3437

38+
/**
39+
* Stops all running timers and cleans up resources
40+
*/
41+
public stop(): void {
42+
logger.debug('Stopping all ChainSync timers');
43+
this._isStopped = true;
44+
this._activeTimers.forEach((timerId) => clearTimeout(timerId));
45+
this._activeTimers.clear();
46+
}
47+
48+
// Helper method to track timers
49+
private _setTimeout(callback: () => void, ms: number): number {
50+
if (this._isStopped) {
51+
logger.debug('Ignoring timer creation after stop() was called');
52+
return 0;
53+
}
54+
55+
const timerId = Number(
56+
setTimeout(() => {
57+
// Remove the timer from active timers before executing callback
58+
this._activeTimers.delete(timerId);
59+
callback();
60+
}, ms)
61+
);
62+
63+
this._activeTimers.add(timerId);
64+
return timerId;
65+
}
66+
3567
public async startDataSync(): Promise<void> {
3668
logger.debug('startDataSync called');
3769
if (this._syncCalled) {
@@ -121,7 +153,7 @@ export class ChainSync {
121153
// if we have no pairs we need to fetch - unless we're in slow poll mode and less than a minute has passed since last fetch
122154
if (this._slowPollPairs && Date.now() - this._lastFetch < 60000) {
123155
// go back to sleep
124-
setTimeout(processPairs, 1000);
156+
this._setTimeout(processPairs, 1000);
125157
return;
126158
}
127159
await this._updateUncachedPairsFromChain();
@@ -142,14 +174,14 @@ export class ChainSync {
142174
'_populatePairsData handled all pairs and goes to slow poll mode'
143175
);
144176
this._slowPollPairs = true;
145-
setTimeout(processPairs, 1000);
177+
this._setTimeout(processPairs, 1000);
146178
return;
147179
} catch (e) {
148180
logger.error('Error while syncing pairs data', e);
149-
setTimeout(processPairs, 60000);
181+
this._setTimeout(processPairs, 60000);
150182
}
151183
};
152-
setTimeout(processPairs, 1);
184+
this._setTimeout(processPairs, 1);
153185
}
154186

155187
private async _syncPairDataBatch(): Promise<void> {
@@ -162,12 +194,21 @@ export class ChainSync {
162194
) {
163195
batches.push(this._uncachedPairs.slice(i, i + this._numOfPairsToBatch));
164196
}
197+
logger.debug('_syncPairDataBatch batches', batches);
165198

166199
try {
167200
const strategiesBatches = await Promise.all(
168201
batches.map((batch) => this._fetcher.strategiesByPairs(batch))
169202
);
203+
logger.debug('_syncPairDataBatch strategiesBatches', strategiesBatches);
170204
strategiesBatches.flat().forEach((pairStrategies) => {
205+
logger.debug(
206+
'_syncPairDataBatch adding pair',
207+
pairStrategies.pair[0],
208+
pairStrategies.pair[1],
209+
'with strategies',
210+
pairStrategies.strategies
211+
);
171212
this._chainCache.addPair(
172213
pairStrategies.pair[0],
173214
pairStrategies.pair[1],
@@ -196,6 +237,7 @@ export class ChainSync {
196237
private async _syncEvents(): Promise<void> {
197238
logger.debug('_syncEvents called');
198239
const processEvents = async () => {
240+
logger.debug('_syncEvents processEvents - new cycle started');
199241
try {
200242
const currentBlock = await this._fetcher.getBlockNumber();
201243
// if the current block number isn't a number, throw an error and hope that the next iteration of processEvents will get a valid number
@@ -211,13 +253,20 @@ export class ChainSync {
211253

212254
const latestBlock = this._chainCache.getLatestBlockNumber();
213255

256+
logger.debug(
257+
'_syncEvents processEvents - latestBlock (start point for new cycle)',
258+
latestBlock,
259+
'currentBlock',
260+
currentBlock
261+
);
262+
214263
if (currentBlock > latestBlock) {
215264
if (await this._detectReorg(currentBlock)) {
216265
logger.debug('_syncEvents detected reorg - resetting');
217266
this._chainCache.clear();
218267
this._chainCache.applyEvents([], currentBlock);
219268
this._resetPairsFetching();
220-
setTimeout(processEvents, 1);
269+
this._setTimeout(processEvents, 1);
221270
return;
222271
}
223272

@@ -247,6 +296,11 @@ export class ChainSync {
247296
strategy.token1
248297
)
249298
) {
299+
logger.debug(
300+
'_syncEvents noticed new pair created',
301+
strategy.token0,
302+
strategy.token1
303+
);
250304
newlyCreatedPairs.push([strategy.token0, strategy.token1]);
251305
}
252306
}
@@ -275,9 +329,9 @@ export class ChainSync {
275329
logger.error('Error syncing events:', err);
276330
}
277331

278-
setTimeout(processEvents, this._msToWaitBetweenSyncs);
332+
this._setTimeout(processEvents, this._msToWaitBetweenSyncs);
279333
};
280-
setTimeout(processEvents, 1);
334+
this._setTimeout(processEvents, 1);
281335
}
282336

283337
private _resetPairsFetching() {

0 commit comments

Comments
 (0)