Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 8 additions & 21 deletions src/graphql/operations/options.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { capture } from '@snapshot-labs/snapshot-sentry';
import snapshot from '@snapshot-labs/snapshot.js';
import log from '../../helpers/log';
import { createLoop } from '../../helpers/loop';
import db from '../../helpers/mysql';

const RUN_INTERVAL = 120e3;
Expand All @@ -11,23 +10,11 @@ export default async function () {
return options;
}

async function loadOptions() {
options = await db.queryAsync('SELECT s.* FROM options s');
}

async function run() {
while (true) {
try {
log.info('[options] Start options refresh');
await loadOptions();
log.info(`[options] ${options.length} options reloaded`);
log.info('[options] End options refresh');
} catch (e: any) {
capture(e);
log.error(`[options] failed to refresh options, ${JSON.stringify(e)}`);
}
await snapshot.utils.sleep(RUN_INTERVAL);
createLoop({
name: 'options',
interval: RUN_INTERVAL,
task: async () => {
options = await db.queryAsync('SELECT s.* FROM options s');
log.info(`[options] ${options.length} options reloaded`);
}
}

run();
});
37 changes: 37 additions & 0 deletions src/helpers/loop-metrics.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import { client } from '@snapshot-labs/snapshot-metrics';

/**
* Loop-specific metrics are separated into this file to avoid circular dependencies.
*
* The circular dependency chain was:
* loop.ts → metrics.ts → spaces.ts/strategies.ts → loop.ts
*
* By isolating loop metrics here, loop.ts can import metrics without creating
* a circular dependency, since this file doesn't import any business data.
*/

// Loop Performance & Health Metrics
export const loopIterationsTotal = new client.Counter({
name: 'loop_iterations_total',
help: 'Total number of loop iterations',
labelNames: ['loop_name']
});

export const loopDurationSeconds = new client.Histogram({
name: 'loop_duration_seconds',
help: 'Duration in seconds of loop execution',
labelNames: ['loop_name'],
buckets: [0.1, 0.5, 1, 2, 5, 10, 30, 60]
});

export const loopErrorsTotal = new client.Counter({
name: 'loop_errors_total',
help: 'Total number of errors encountered in the loop',
labelNames: ['loop_name', 'error_type']
});

export const loopActiveTasksCount = new client.Gauge({
name: 'loop_active_tasks_count',
help: 'Number of currently active tasks in the loop',
labelNames: ['loop_name']
});
76 changes: 76 additions & 0 deletions src/helpers/loop.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import { capture } from '@snapshot-labs/snapshot-sentry';
import snapshot from '@snapshot-labs/snapshot.js';
import log from './log';
import {
loopActiveTasksCount,
loopDurationSeconds,
loopErrorsTotal,
loopIterationsTotal
} from './loop-metrics';
import {
getShutdownStatus,
registerStopFunction,
ShutdownFunction
} from './shutdown';

interface LoopOptions {
name: string;
interval: number;
task: () => Promise<void>;
maxConsecutiveFailsBeforeCapture?: number;
}

export const createLoop = (options: LoopOptions): ShutdownFunction => {
const {
name,
interval,
task,
maxConsecutiveFailsBeforeCapture = 1
} = options;
let isRunning = false;
let consecutiveFailsCount = 0;

const run = async () => {
isRunning = true;

while (!getShutdownStatus() && isRunning) {
loopIterationsTotal.inc({ loop_name: name });
const endTimer = loopDurationSeconds.startTimer({
loop_name: name
});
loopActiveTasksCount.inc({ loop_name: name });

try {
log.info(`[${name}] Start ${name} refresh`);
await task();
consecutiveFailsCount = 0;
log.info(`[${name}] End ${name} refresh`);
} catch (e: any) {
consecutiveFailsCount++;
loopErrorsTotal.inc({
loop_name: name,
error_type: e.name || 'UnknownError'
});

if (consecutiveFailsCount >= maxConsecutiveFailsBeforeCapture) {
capture(e);
}
log.error(`[${name}] failed to refresh, ${JSON.stringify(e)}`);
} finally {
endTimer();
loopActiveTasksCount.dec({ loop_name: name });
}

await snapshot.utils.sleep(interval);
}
};

const stop = async () => {
isRunning = false;
};

registerStopFunction(stop);
run();

return stop;
};
22 changes: 16 additions & 6 deletions src/helpers/mysql.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import mysql from 'mysql';
import Connection from 'mysql/lib/Connection';
import Pool from 'mysql/lib/Pool';
import log from './log';
import { registerStopFunction } from './shutdown';

const connectionLimit = parseInt(process.env.CONNECTION_LIMIT || '25');
log.info(`[mysql] connection limit ${connectionLimit}`);
Expand Down Expand Up @@ -37,15 +38,24 @@ const sequencerDB = mysql.createPool(sequencerConfig);

bluebird.promisifyAll([Pool, Connection]);

export const closeDatabase = (): Promise<void> => {
const createCloseFunction = (pool: Pool, name: string) => (): Promise<void> => {
return new Promise(resolve => {
hubDB.end(() => {
sequencerDB.end(() => {
log.info('[mysql] Database connection pools closed');
resolve();
});
pool.end((err: Error | null) => {
if (err) {
log.error(`[mysql] Error closing ${name}:`, err);
} else {
log.info(`[mysql] ${name} connection pool closed`);
}
resolve();
});
});
};

const closeHubDB = createCloseFunction(hubDB, 'hubDB');
const closeSequencerDB = createCloseFunction(sequencerDB, 'sequencerDB');

// Register each database shutdown individually with the shutdown helper
registerStopFunction(closeHubDB);
registerStopFunction(closeSequencerDB);

export { hubDB as default, sequencerDB };
26 changes: 26 additions & 0 deletions src/helpers/shutdown.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import log from './log';

export type ShutdownFunction = () => Promise<void>;

let isShuttingDown = false;
const stopFunctions: ShutdownFunction[] = [];

export const getShutdownStatus = () => isShuttingDown;

export const registerStopFunction = (fn: ShutdownFunction) => {
stopFunctions.push(fn);
};

export const initiateShutdown = async () => {
isShuttingDown = true;

// Execute all stop functions with individual error handling
const results = await Promise.allSettled(stopFunctions.map(fn => fn()));

// Log any failures but don't throw
results.forEach((result, index) => {
if (result.status === 'rejected') {
log.error(`Stop function ${index} failed:`, result.reason);
}
});
};
27 changes: 9 additions & 18 deletions src/helpers/spaces.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import { capture } from '@snapshot-labs/snapshot-sentry';
import snapshot from '@snapshot-labs/snapshot.js';
import networks from '@snapshot-labs/snapshot.js/src/networks.json';
import { uniq } from 'lodash';
import log from './log';
import { createLoop } from './loop';
import db from './mysql';

const RUN_INTERVAL = 120e3;
Expand Down Expand Up @@ -329,20 +328,12 @@ export async function getSpace(id: string) {
};
}

export default async function run() {
while (true) {
try {
log.info('[spaces] Start spaces refresh');

await loadSpaces();
await loadSpacesMetrics();

sortSpaces();
log.info('[spaces] End spaces refresh');
} catch (e: any) {
capture(e);
log.error(`[spaces] failed to load spaces, ${JSON.stringify(e)}`);
}
await snapshot.utils.sleep(RUN_INTERVAL);
export default createLoop({
name: 'spaces',
interval: RUN_INTERVAL,
task: async () => {
await loadSpaces();
await loadSpacesMetrics();
sortSpaces();
}
}
});
32 changes: 9 additions & 23 deletions src/helpers/strategies.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { URL } from 'url';
import { capture } from '@snapshot-labs/snapshot-sentry';
import snapshot from '@snapshot-labs/snapshot.js';
import log from './log';
import { createLoop } from './loop';
import { spacesMetadata } from './spaces';

export let strategies: any[] = [];
Expand All @@ -14,8 +14,6 @@ const scoreApiURL: URL = new URL(
scoreApiURL.pathname = '/api/strategies';
const uri = scoreApiURL.toString();

let consecutiveFailsCount = 0;

async function loadStrategies() {
const res = await snapshot.utils.getJSON(uri);

Expand Down Expand Up @@ -57,23 +55,11 @@ async function loadStrategies() {
);
}

async function run() {
while (true) {
try {
log.info('[strategies] Start strategies refresh');
await loadStrategies();
consecutiveFailsCount = 0;
log.info('[strategies] End strategies refresh');
} catch (e: any) {
consecutiveFailsCount++;

if (consecutiveFailsCount >= 3) {
capture(e);
}
log.error(`[strategies] failed to load ${JSON.stringify(e)}`);
}
await snapshot.utils.sleep(RUN_INTERVAL);
}
}

run();
createLoop({
name: 'strategies',
interval: RUN_INTERVAL,
task: async () => {
await loadStrategies();
},
maxConsecutiveFailsBeforeCapture: 3
});
26 changes: 15 additions & 11 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import graphql from './graphql';
import { checkKeycard } from './helpers/keycard';
import log from './helpers/log';
import initMetrics from './helpers/metrics';
import { closeDatabase } from './helpers/mysql';
import rateLimit from './helpers/rateLimit';
import { initiateShutdown } from './helpers/shutdown';
import refreshSpacesCache from './helpers/spaces';
import './helpers/strategies';

Expand Down Expand Up @@ -40,18 +40,22 @@ const server = app.listen(PORT, () =>
const gracefulShutdown = async (signal: string) => {
log.info(`Received ${signal}. Starting graceful shutdown...`);

server.close(async () => {
log.info('Express server closed.');
// Stop all background processes (loops + database)
await initiateShutdown();
log.info('Background processes stopped.');

try {
await closeDatabase();
log.info('Graceful shutdown completed.');
process.exit(0);
} catch (error) {
log.error('Error during shutdown:', error);
process.exit(1);
}
// Close Express server
server.close(() => {
log.info('Express server closed.');
log.info('Graceful shutdown completed.');
process.exit(0);
});

// Fallback timeout for the entire shutdown process
setTimeout(() => {
log.error('Graceful shutdown timeout exceeded, forcing exit');
process.exit(1);
}, 15000); // 15 seconds total shutdown timeout
};

process.on('SIGINT', () => gracefulShutdown('SIGINT'));
Expand Down