diff --git a/src/graphql/operations/options.ts b/src/graphql/operations/options.ts index 15f23516..e3421cdd 100644 --- a/src/graphql/operations/options.ts +++ b/src/graphql/operations/options.ts @@ -1,6 +1,5 @@ -import { capture } from '@snapshot-labs/snapshot-sentry'; -import snapshot from '@snapshot-labs/snapshot.js'; import log from '../../helpers/log'; +import { createLoop } from '../../helpers/loop'; import db from '../../helpers/mysql'; const RUN_INTERVAL = 120e3; @@ -11,23 +10,11 @@ export default async function () { return options; } -async function loadOptions() { - options = await db.queryAsync('SELECT s.* FROM options s'); -} - -async function run() { - while (true) { - try { - log.info('[options] Start options refresh'); - await loadOptions(); - log.info(`[options] ${options.length} options reloaded`); - log.info('[options] End options refresh'); - } catch (e: any) { - capture(e); - log.error(`[options] failed to refresh options, ${JSON.stringify(e)}`); - } - await snapshot.utils.sleep(RUN_INTERVAL); +createLoop({ + name: 'options', + interval: RUN_INTERVAL, + task: async () => { + options = await db.queryAsync('SELECT s.* FROM options s'); + log.info(`[options] ${options.length} options reloaded`); } -} - -run(); +}); diff --git a/src/helpers/loop-metrics.ts b/src/helpers/loop-metrics.ts new file mode 100644 index 00000000..946e66b3 --- /dev/null +++ b/src/helpers/loop-metrics.ts @@ -0,0 +1,37 @@ +import { client } from '@snapshot-labs/snapshot-metrics'; + +/** + * Loop-specific metrics are separated into this file to avoid circular dependencies. + * + * The circular dependency chain was: + * loop.ts → metrics.ts → spaces.ts/strategies.ts → loop.ts + * + * By isolating loop metrics here, loop.ts can import metrics without creating + * a circular dependency, since this file doesn't import any business data. + */ + +// Loop Performance & Health Metrics +export const loopIterationsTotal = new client.Counter({ + name: 'loop_iterations_total', + help: 'Total number of loop iterations', + labelNames: ['loop_name'] +}); + +export const loopDurationSeconds = new client.Histogram({ + name: 'loop_duration_seconds', + help: 'Duration in seconds of loop execution', + labelNames: ['loop_name'], + buckets: [0.1, 0.5, 1, 2, 5, 10, 30, 60] +}); + +export const loopErrorsTotal = new client.Counter({ + name: 'loop_errors_total', + help: 'Total number of errors encountered in the loop', + labelNames: ['loop_name', 'error_type'] +}); + +export const loopActiveTasksCount = new client.Gauge({ + name: 'loop_active_tasks_count', + help: 'Number of currently active tasks in the loop', + labelNames: ['loop_name'] +}); diff --git a/src/helpers/loop.ts b/src/helpers/loop.ts new file mode 100644 index 00000000..bb4f9077 --- /dev/null +++ b/src/helpers/loop.ts @@ -0,0 +1,76 @@ +import { capture } from '@snapshot-labs/snapshot-sentry'; +import snapshot from '@snapshot-labs/snapshot.js'; +import log from './log'; +import { + loopActiveTasksCount, + loopDurationSeconds, + loopErrorsTotal, + loopIterationsTotal +} from './loop-metrics'; +import { + getShutdownStatus, + registerStopFunction, + ShutdownFunction +} from './shutdown'; + +interface LoopOptions { + name: string; + interval: number; + task: () => Promise; + maxConsecutiveFailsBeforeCapture?: number; +} + +export const createLoop = (options: LoopOptions): ShutdownFunction => { + const { + name, + interval, + task, + maxConsecutiveFailsBeforeCapture = 1 + } = options; + let isRunning = false; + let consecutiveFailsCount = 0; + + const run = async () => { + isRunning = true; + + while (!getShutdownStatus() && isRunning) { + loopIterationsTotal.inc({ loop_name: name }); + const endTimer = loopDurationSeconds.startTimer({ + loop_name: name + }); + loopActiveTasksCount.inc({ loop_name: name }); + + try { + log.info(`[${name}] Start ${name} refresh`); + await task(); + consecutiveFailsCount = 0; + log.info(`[${name}] End ${name} refresh`); + } catch (e: any) { + consecutiveFailsCount++; + loopErrorsTotal.inc({ + loop_name: name, + error_type: e.name || 'UnknownError' + }); + + if (consecutiveFailsCount >= maxConsecutiveFailsBeforeCapture) { + capture(e); + } + log.error(`[${name}] failed to refresh, ${JSON.stringify(e)}`); + } finally { + endTimer(); + loopActiveTasksCount.dec({ loop_name: name }); + } + + await snapshot.utils.sleep(interval); + } + }; + + const stop = async () => { + isRunning = false; + }; + + registerStopFunction(stop); + run(); + + return stop; +}; diff --git a/src/helpers/mysql.ts b/src/helpers/mysql.ts index 2dfbb642..ef76a0dc 100644 --- a/src/helpers/mysql.ts +++ b/src/helpers/mysql.ts @@ -4,6 +4,7 @@ import mysql from 'mysql'; import Connection from 'mysql/lib/Connection'; import Pool from 'mysql/lib/Pool'; import log from './log'; +import { registerStopFunction } from './shutdown'; const connectionLimit = parseInt(process.env.CONNECTION_LIMIT || '25'); log.info(`[mysql] connection limit ${connectionLimit}`); @@ -37,15 +38,24 @@ const sequencerDB = mysql.createPool(sequencerConfig); bluebird.promisifyAll([Pool, Connection]); -export const closeDatabase = (): Promise => { +const createCloseFunction = (pool: Pool, name: string) => (): Promise => { return new Promise(resolve => { - hubDB.end(() => { - sequencerDB.end(() => { - log.info('[mysql] Database connection pools closed'); - resolve(); - }); + pool.end((err: Error | null) => { + if (err) { + log.error(`[mysql] Error closing ${name}:`, err); + } else { + log.info(`[mysql] ${name} connection pool closed`); + } + resolve(); }); }); }; +const closeHubDB = createCloseFunction(hubDB, 'hubDB'); +const closeSequencerDB = createCloseFunction(sequencerDB, 'sequencerDB'); + +// Register each database shutdown individually with the shutdown helper +registerStopFunction(closeHubDB); +registerStopFunction(closeSequencerDB); + export { hubDB as default, sequencerDB }; diff --git a/src/helpers/shutdown.ts b/src/helpers/shutdown.ts new file mode 100644 index 00000000..c29d41bc --- /dev/null +++ b/src/helpers/shutdown.ts @@ -0,0 +1,26 @@ +import log from './log'; + +export type ShutdownFunction = () => Promise; + +let isShuttingDown = false; +const stopFunctions: ShutdownFunction[] = []; + +export const getShutdownStatus = () => isShuttingDown; + +export const registerStopFunction = (fn: ShutdownFunction) => { + stopFunctions.push(fn); +}; + +export const initiateShutdown = async () => { + isShuttingDown = true; + + // Execute all stop functions with individual error handling + const results = await Promise.allSettled(stopFunctions.map(fn => fn())); + + // Log any failures but don't throw + results.forEach((result, index) => { + if (result.status === 'rejected') { + log.error(`Stop function ${index} failed:`, result.reason); + } + }); +}; diff --git a/src/helpers/spaces.ts b/src/helpers/spaces.ts index 7fb5e59c..300a54c5 100644 --- a/src/helpers/spaces.ts +++ b/src/helpers/spaces.ts @@ -1,8 +1,7 @@ -import { capture } from '@snapshot-labs/snapshot-sentry'; -import snapshot from '@snapshot-labs/snapshot.js'; import networks from '@snapshot-labs/snapshot.js/src/networks.json'; import { uniq } from 'lodash'; import log from './log'; +import { createLoop } from './loop'; import db from './mysql'; const RUN_INTERVAL = 120e3; @@ -329,20 +328,12 @@ export async function getSpace(id: string) { }; } -export default async function run() { - while (true) { - try { - log.info('[spaces] Start spaces refresh'); - - await loadSpaces(); - await loadSpacesMetrics(); - - sortSpaces(); - log.info('[spaces] End spaces refresh'); - } catch (e: any) { - capture(e); - log.error(`[spaces] failed to load spaces, ${JSON.stringify(e)}`); - } - await snapshot.utils.sleep(RUN_INTERVAL); +export default createLoop({ + name: 'spaces', + interval: RUN_INTERVAL, + task: async () => { + await loadSpaces(); + await loadSpacesMetrics(); + sortSpaces(); } -} +}); diff --git a/src/helpers/strategies.ts b/src/helpers/strategies.ts index 3058fc09..dcf16eb1 100644 --- a/src/helpers/strategies.ts +++ b/src/helpers/strategies.ts @@ -1,7 +1,7 @@ import { URL } from 'url'; import { capture } from '@snapshot-labs/snapshot-sentry'; import snapshot from '@snapshot-labs/snapshot.js'; -import log from './log'; +import { createLoop } from './loop'; import { spacesMetadata } from './spaces'; export let strategies: any[] = []; @@ -14,8 +14,6 @@ const scoreApiURL: URL = new URL( scoreApiURL.pathname = '/api/strategies'; const uri = scoreApiURL.toString(); -let consecutiveFailsCount = 0; - async function loadStrategies() { const res = await snapshot.utils.getJSON(uri); @@ -57,23 +55,11 @@ async function loadStrategies() { ); } -async function run() { - while (true) { - try { - log.info('[strategies] Start strategies refresh'); - await loadStrategies(); - consecutiveFailsCount = 0; - log.info('[strategies] End strategies refresh'); - } catch (e: any) { - consecutiveFailsCount++; - - if (consecutiveFailsCount >= 3) { - capture(e); - } - log.error(`[strategies] failed to load ${JSON.stringify(e)}`); - } - await snapshot.utils.sleep(RUN_INTERVAL); - } -} - -run(); +createLoop({ + name: 'strategies', + interval: RUN_INTERVAL, + task: async () => { + await loadStrategies(); + }, + maxConsecutiveFailsBeforeCapture: 3 +}); diff --git a/src/index.ts b/src/index.ts index 72aa15da..2a0eebce 100644 --- a/src/index.ts +++ b/src/index.ts @@ -8,8 +8,8 @@ import graphql from './graphql'; import { checkKeycard } from './helpers/keycard'; import log from './helpers/log'; import initMetrics from './helpers/metrics'; -import { closeDatabase } from './helpers/mysql'; import rateLimit from './helpers/rateLimit'; +import { initiateShutdown } from './helpers/shutdown'; import refreshSpacesCache from './helpers/spaces'; import './helpers/strategies'; @@ -40,18 +40,22 @@ const server = app.listen(PORT, () => const gracefulShutdown = async (signal: string) => { log.info(`Received ${signal}. Starting graceful shutdown...`); - server.close(async () => { - log.info('Express server closed.'); + // Stop all background processes (loops + database) + await initiateShutdown(); + log.info('Background processes stopped.'); - try { - await closeDatabase(); - log.info('Graceful shutdown completed.'); - process.exit(0); - } catch (error) { - log.error('Error during shutdown:', error); - process.exit(1); - } + // Close Express server + server.close(() => { + log.info('Express server closed.'); + log.info('Graceful shutdown completed.'); + process.exit(0); }); + + // Fallback timeout for the entire shutdown process + setTimeout(() => { + log.error('Graceful shutdown timeout exceeded, forcing exit'); + process.exit(1); + }, 15000); // 15 seconds total shutdown timeout }; process.on('SIGINT', () => gracefulShutdown('SIGINT'));