diff --git a/packages/daemon/__tests__/actors/MonitoringActor.test.ts b/packages/daemon/__tests__/actors/MonitoringActor.test.ts new file mode 100644 index 00000000..07ce898d --- /dev/null +++ b/packages/daemon/__tests__/actors/MonitoringActor.test.ts @@ -0,0 +1,333 @@ +/** + * Copyright (c) Hathor Labs and its affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +import MonitoringActor from '../../src/actors/MonitoringActor'; +import logger from '../../src/logger'; +import { EventTypes } from '../../src/types/event'; +import getConfig from '../../src/config'; +import { addAlert, Severity } from '@wallet-service/common'; + +const MONITORING_IDLE_TIMEOUT_EVENT = { type: EventTypes.MONITORING_IDLE_TIMEOUT }; + +jest.useFakeTimers(); +jest.spyOn(global, 'setInterval'); +jest.spyOn(global, 'clearInterval'); +jest.spyOn(global, 'setTimeout'); +jest.spyOn(global, 'clearTimeout'); + +jest.mock('@wallet-service/common', () => ({ + ...jest.requireActual('@wallet-service/common'), + addAlert: jest.fn().mockResolvedValue(undefined), +})); + +const mockAddAlert = addAlert as jest.Mock; + +describe('MonitoringActor', () => { + let mockCallback: jest.Mock; + let mockReceive: jest.Mock; + let receiveCallback: (event: any) => void; + let config: ReturnType; + let processExitSpy: jest.SpyInstance; + + const sendEvent = (monitoringEventType: string) => { + receiveCallback({ + type: EventTypes.MONITORING_EVENT, + event: { type: monitoringEventType }, + }); + }; + + beforeEach(() => { + jest.clearAllMocks(); + jest.clearAllTimers(); + processExitSpy = jest.spyOn(process, 'exit').mockImplementation(() => undefined as never); + config = getConfig(); + config['IDLE_EVENT_TIMEOUT_MS'] = 5 * 60 * 1000; // 5 min + config['STUCK_PROCESSING_TIMEOUT_MS'] = 5 * 60 * 1000; // 5 min + config['RECONNECTION_STORM_THRESHOLD'] = 3; // low threshold for tests + config['RECONNECTION_STORM_WINDOW_MS'] = 5 * 60 * 1000; // 5 min + + mockCallback = jest.fn(); + mockReceive = jest.fn().mockImplementation((cb: any) => { + receiveCallback = cb; + }); + }); + + afterEach(() => { + processExitSpy.mockRestore(); + }); + + afterAll(() => { + jest.clearAllMocks(); + jest.useRealTimers(); + }); + + // ── Idle detection ─────────────────────────────────────────────────────────── + + it('should not start the idle timer on initialization', () => { + MonitoringActor(mockCallback, mockReceive, config); + expect(setInterval).not.toHaveBeenCalled(); + }); + + it('should start the idle timer when receiving a CONNECTED event', () => { + MonitoringActor(mockCallback, mockReceive, config); + sendEvent('CONNECTED'); + expect(setInterval).toHaveBeenCalledTimes(1); + }); + + it('should stop the idle timer when receiving a DISCONNECTED event', () => { + MonitoringActor(mockCallback, mockReceive, config); + sendEvent('CONNECTED'); + sendEvent('DISCONNECTED'); + expect(clearInterval).toHaveBeenCalledTimes(1); + }); + + it('should stop the idle timer when the actor is stopped', () => { + const stopActor = MonitoringActor(mockCallback, mockReceive, config); + sendEvent('CONNECTED'); + stopActor(); + expect(clearInterval).toHaveBeenCalledTimes(1); + }); + + it('should fire an idle alert and send MONITORING_IDLE_TIMEOUT after IDLE_EVENT_TIMEOUT_MS with no events', async () => { + MonitoringActor(mockCallback, mockReceive, config); + sendEvent('CONNECTED'); + + jest.advanceTimersByTime(config['IDLE_EVENT_TIMEOUT_MS'] + 1); + await Promise.resolve(); + await Promise.resolve(); // flush the .finally() microtask + + expect(mockAddAlert).toHaveBeenCalledTimes(1); + expect(mockAddAlert.mock.calls[0][0]).toBe('Daemon Idle — No Events Received'); + expect(mockAddAlert.mock.calls[0][2]).toBe(Severity.MAJOR); + expect(mockCallback).toHaveBeenCalledWith(MONITORING_IDLE_TIMEOUT_EVENT); + expect(processExitSpy).not.toHaveBeenCalled(); + }); + + it('should NOT fire an idle alert when events keep arriving', async () => { + MonitoringActor(mockCallback, mockReceive, config); + sendEvent('CONNECTED'); + + // Stay below the threshold each time + jest.advanceTimersByTime(config['IDLE_EVENT_TIMEOUT_MS'] - 1000); + sendEvent('EVENT_RECEIVED'); + jest.advanceTimersByTime(config['IDLE_EVENT_TIMEOUT_MS'] - 1000); + + await Promise.resolve(); + expect(mockAddAlert).not.toHaveBeenCalled(); + }); + + it('should fire only one idle alert and send MONITORING_IDLE_TIMEOUT once per idle period', async () => { + MonitoringActor(mockCallback, mockReceive, config); + sendEvent('CONNECTED'); + + jest.advanceTimersByTime(config['IDLE_EVENT_TIMEOUT_MS'] * 3); + await Promise.resolve(); + await Promise.resolve(); + + expect(mockAddAlert).toHaveBeenCalledTimes(1); + expect(mockCallback).toHaveBeenCalledTimes(1); + expect(mockCallback).toHaveBeenCalledWith(MONITORING_IDLE_TIMEOUT_EVENT); + expect(processExitSpy).not.toHaveBeenCalled(); + }); + + it('should reset the idle alert flag when an event is received, allowing a second MONITORING_IDLE_TIMEOUT', async () => { + MonitoringActor(mockCallback, mockReceive, config); + sendEvent('CONNECTED'); + + // Trigger first alert (interval = T/2, fires at T/2 then T — alert at T) + jest.advanceTimersByTime(config['IDLE_EVENT_TIMEOUT_MS'] + 1); + await Promise.resolve(); + await Promise.resolve(); + expect(mockAddAlert).toHaveBeenCalledTimes(1); + expect(mockCallback).toHaveBeenCalledTimes(1); + + // Receive an event — resets idleAlertFired and lastEventReceivedAt (~T from start) + sendEvent('EVENT_RECEIVED'); + + // With interval=T/2, interval fires at 3T/2, 2T, 5T/2, … from start. + // The first fire where idleMs >= T after EVENT_RECEIVED is at 5T/2 (idleMs = 3T/2 - ε). + // Advancing 2T from here (total ~3T from start) covers 5T/2, so the second alert fires. + jest.advanceTimersByTime(2 * config['IDLE_EVENT_TIMEOUT_MS']); + await Promise.resolve(); + await Promise.resolve(); + + expect(mockAddAlert).toHaveBeenCalledTimes(2); + expect(mockCallback).toHaveBeenCalledTimes(2); + expect(processExitSpy).not.toHaveBeenCalled(); + }); + + it('should restart the idle timer when CONNECTED is sent while already running', () => { + MonitoringActor(mockCallback, mockReceive, config); + sendEvent('CONNECTED'); + sendEvent('CONNECTED'); // second connect clears old and starts new + expect(clearInterval).toHaveBeenCalledTimes(1); + expect(setInterval).toHaveBeenCalledTimes(2); + }); + + // ── Stuck-processing detection ─────────────────────────────────────────────── + + it('should start a stuck timer on PROCESSING_STARTED', () => { + MonitoringActor(mockCallback, mockReceive, config); + sendEvent('PROCESSING_STARTED'); + expect(setTimeout).toHaveBeenCalledTimes(1); + }); + + it('should cancel the stuck timer on PROCESSING_COMPLETED', () => { + MonitoringActor(mockCallback, mockReceive, config); + sendEvent('PROCESSING_STARTED'); + sendEvent('PROCESSING_COMPLETED'); + expect(clearTimeout).toHaveBeenCalledTimes(1); + }); + + it('should fire a MAJOR alert when stuck and NOT send MONITORING_IDLE_TIMEOUT', async () => { + MonitoringActor(mockCallback, mockReceive, config); + sendEvent('PROCESSING_STARTED'); + + jest.advanceTimersByTime(config['STUCK_PROCESSING_TIMEOUT_MS'] + 1); + await Promise.resolve(); + await Promise.resolve(); + + expect(mockAddAlert).toHaveBeenCalledTimes(1); + expect(mockAddAlert.mock.calls[0][0]).toBe('Daemon Stuck In Processing State'); + expect(mockAddAlert.mock.calls[0][2]).toBe(Severity.MAJOR); + // Stuck detection intentionally does not notify the machine — machine keeps running + expect(mockCallback).not.toHaveBeenCalled(); + }); + + it('should NOT fire the stuck alert when PROCESSING_COMPLETED arrives in time', async () => { + MonitoringActor(mockCallback, mockReceive, config); + sendEvent('PROCESSING_STARTED'); + + jest.advanceTimersByTime(config['STUCK_PROCESSING_TIMEOUT_MS'] - 1000); + sendEvent('PROCESSING_COMPLETED'); + + jest.advanceTimersByTime(2000); // advance past original timeout + await Promise.resolve(); + + expect(mockAddAlert).not.toHaveBeenCalled(); + expect(mockCallback).not.toHaveBeenCalled(); + }); + + it('should reset the stuck timer on consecutive PROCESSING_STARTED events', () => { + MonitoringActor(mockCallback, mockReceive, config); + sendEvent('PROCESSING_STARTED'); + sendEvent('PROCESSING_STARTED'); // second one clears the first + expect(clearTimeout).toHaveBeenCalledTimes(1); + expect(setTimeout).toHaveBeenCalledTimes(2); + }); + + it('should stop the stuck timer when the actor is stopped', () => { + const stopActor = MonitoringActor(mockCallback, mockReceive, config); + sendEvent('PROCESSING_STARTED'); + stopActor(); + expect(clearTimeout).toHaveBeenCalledTimes(1); + }); + + it('should also clear the stuck timer on DISCONNECTED', () => { + MonitoringActor(mockCallback, mockReceive, config); + sendEvent('CONNECTED'); + sendEvent('PROCESSING_STARTED'); + sendEvent('DISCONNECTED'); + // clearTimeout for stuck timer + clearInterval for idle timer + expect(clearTimeout).toHaveBeenCalledTimes(1); + expect(clearInterval).toHaveBeenCalledTimes(1); + }); + + // ── Reconnection storm detection ───────────────────────────────────────────── + + it('should fire a reconnection storm alert when the threshold is reached', async () => { + MonitoringActor(mockCallback, mockReceive, config); + + sendEvent('RECONNECTING'); + sendEvent('RECONNECTING'); + sendEvent('RECONNECTING'); // threshold is 3 in test config + + await Promise.resolve(); + expect(mockAddAlert).toHaveBeenCalledTimes(1); + expect(mockAddAlert.mock.calls[0][0]).toBe('Daemon Reconnection Storm'); + expect(mockAddAlert.mock.calls[0][2]).toBe(Severity.MAJOR); + }); + + it('should NOT fire a reconnection storm alert below the threshold', async () => { + MonitoringActor(mockCallback, mockReceive, config); + + sendEvent('RECONNECTING'); + sendEvent('RECONNECTING'); + + await Promise.resolve(); + expect(mockAddAlert).not.toHaveBeenCalled(); + }); + + it('should not fire more than one storm alert within the 1-minute cooldown window', async () => { + MonitoringActor(mockCallback, mockReceive, config); + + // Trigger threshold + sendEvent('RECONNECTING'); + sendEvent('RECONNECTING'); + sendEvent('RECONNECTING'); // threshold = 3 → first alert + + await Promise.resolve(); + expect(mockAddAlert).toHaveBeenCalledTimes(1); + + // Additional reconnections within cooldown (no time advanced) + sendEvent('RECONNECTING'); + sendEvent('RECONNECTING'); + + await Promise.resolve(); + // Cooldown prevents a second alert + expect(mockAddAlert).toHaveBeenCalledTimes(1); + }); + + it('should fire another storm alert after the 1-minute cooldown expires', async () => { + MonitoringActor(mockCallback, mockReceive, config); + + sendEvent('RECONNECTING'); + sendEvent('RECONNECTING'); + sendEvent('RECONNECTING'); // first alert + + await Promise.resolve(); + expect(mockAddAlert).toHaveBeenCalledTimes(1); + + // Advance past the 1-minute cooldown + jest.advanceTimersByTime(61 * 1000); + + sendEvent('RECONNECTING'); // still >= threshold in window, cooldown expired + + await Promise.resolve(); + expect(mockAddAlert).toHaveBeenCalledTimes(2); + expect(mockAddAlert.mock.calls[1][0]).toBe('Daemon Reconnection Storm'); + }); + + it('should evict old reconnections outside the storm window', async () => { + MonitoringActor(mockCallback, mockReceive, config); + + sendEvent('RECONNECTING'); + sendEvent('RECONNECTING'); + + jest.advanceTimersByTime(config['RECONNECTION_STORM_WINDOW_MS'] + 1000); + + // Only 1 new reconnection — below threshold after eviction + sendEvent('RECONNECTING'); + + await Promise.resolve(); + expect(mockAddAlert).not.toHaveBeenCalled(); + }); + + // ── Misc ───────────────────────────────────────────────────────────────────── + + it('should ignore events of other types and log a warning', () => { + const warnSpy = jest.spyOn(logger, 'warn'); + MonitoringActor(mockCallback, mockReceive, config); + + receiveCallback({ type: 'SOME_OTHER_EVENT', event: { type: 'WHATEVER' } }); + + expect(warnSpy).toHaveBeenCalledWith( + '[monitoring] Unexpected event type received by MonitoringActor', + ); + expect(setInterval).not.toHaveBeenCalled(); + }); +}); diff --git a/packages/daemon/__tests__/integration/balances.test.ts b/packages/daemon/__tests__/integration/balances.test.ts index 5d0565ac..488ffb8c 100644 --- a/packages/daemon/__tests__/integration/balances.test.ts +++ b/packages/daemon/__tests__/integration/balances.test.ts @@ -96,6 +96,10 @@ getConfig.mockReturnValue({ DB_PASS, DB_PORT, ACK_TIMEOUT_MS: 20000, + IDLE_EVENT_TIMEOUT_MS: 5 * 60 * 1000, + STUCK_PROCESSING_TIMEOUT_MS: 5 * 60 * 1000, + RECONNECTION_STORM_THRESHOLD: 10, + RECONNECTION_STORM_WINDOW_MS: 5 * 60 * 1000, }); let mysql: Connection; diff --git a/packages/daemon/__tests__/integration/token_creation.test.ts b/packages/daemon/__tests__/integration/token_creation.test.ts index afcf0f3a..30f374eb 100644 --- a/packages/daemon/__tests__/integration/token_creation.test.ts +++ b/packages/daemon/__tests__/integration/token_creation.test.ts @@ -57,6 +57,10 @@ getConfig.mockReturnValue({ DB_PASS, DB_PORT, ACK_TIMEOUT_MS: 20000, + IDLE_EVENT_TIMEOUT_MS: 5 * 60 * 1000, + STUCK_PROCESSING_TIMEOUT_MS: 5 * 60 * 1000, + RECONNECTION_STORM_THRESHOLD: 10, + RECONNECTION_STORM_WINDOW_MS: 5 * 60 * 1000, }); let mysql: Connection; diff --git a/packages/daemon/jestIntegrationSetup.ts b/packages/daemon/jestIntegrationSetup.ts new file mode 100644 index 00000000..187f7a26 --- /dev/null +++ b/packages/daemon/jestIntegrationSetup.ts @@ -0,0 +1,16 @@ +/** + * Copyright (c) Hathor Labs and its affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +/** + * Integration test environment setup. + * Mocks addAlert so MonitoringActor does not attempt real SQS/SNS connections + * in environments where AWS credentials / region are not configured. + */ +jest.mock('@wallet-service/common', () => ({ + ...jest.requireActual('@wallet-service/common'), + addAlert: jest.fn().mockResolvedValue(undefined), +})); diff --git a/packages/daemon/jest_integration.config.js b/packages/daemon/jest_integration.config.js index 86722d5e..2dfc9ff2 100644 --- a/packages/daemon/jest_integration.config.js +++ b/packages/daemon/jest_integration.config.js @@ -7,6 +7,7 @@ const mainTestMatch = process.env.SPECIFIC_INTEGRATION_TEST_FILE module.exports = { roots: ["/__tests__"], setupFiles: ['./jestSetup.ts'], + setupFilesAfterEnv: ['./jestIntegrationSetup.ts'], transform: { "^.+\\.ts$": ["ts-jest", { tsconfig: "./tsconfig.json", diff --git a/packages/daemon/src/actions/index.ts b/packages/daemon/src/actions/index.ts index 70fe5461..87e65c53 100644 --- a/packages/daemon/src/actions/index.ts +++ b/packages/daemon/src/actions/index.ts @@ -5,8 +5,8 @@ * LICENSE file in the root directory of this source tree. */ -import { assign, AssignAction, sendTo } from 'xstate'; -import { Context, Event, EventTypes, StandardFullNodeEvent } from '../types'; +import { assign, AssignAction, sendTo, choose } from 'xstate'; +import { Context, Event, EventTypes, MonitoringEvent, StandardFullNodeEvent } from '../types'; import { get } from 'lodash'; import logger from '../logger'; import { hashTxData } from '../utils'; @@ -197,3 +197,28 @@ export const stopHealthcheckPing = sendTo( * Logs the event as an error log */ export const logEventError = (_context: Context, event: Event) => logger.error(bigIntUtils.JSONBigInt.stringify(event)); + +/* + * This is a helper to get the monitoring ref from the context and throw if it's not found. + */ +export const getMonitoringRefFromContext = (context: Context) => { + if (!context.monitoring) { + throw new Error('No monitoring actor in context'); + } + + return context.monitoring; +}; + +const monitoringIsPresent = (context: Context) => context.monitoring !== null; + +const makeMonitoringSender = (eventType: MonitoringEvent['type']) => choose([{ + cond: monitoringIsPresent, + actions: sendTo(getMonitoringRefFromContext, { type: EventTypes.MONITORING_EVENT, event: { type: eventType } }), +}]); + +export const sendMonitoringConnected = makeMonitoringSender('CONNECTED'); +export const sendMonitoringDisconnected = makeMonitoringSender('DISCONNECTED'); +export const sendMonitoringEventReceived = makeMonitoringSender('EVENT_RECEIVED'); +export const sendMonitoringReconnecting = makeMonitoringSender('RECONNECTING'); +export const sendMonitoringProcessingStarted = makeMonitoringSender('PROCESSING_STARTED'); +export const sendMonitoringProcessingCompleted = makeMonitoringSender('PROCESSING_COMPLETED'); diff --git a/packages/daemon/src/actors/MonitoringActor.ts b/packages/daemon/src/actors/MonitoringActor.ts new file mode 100644 index 00000000..ec84809d --- /dev/null +++ b/packages/daemon/src/actors/MonitoringActor.ts @@ -0,0 +1,203 @@ +/** + * Copyright (c) Hathor Labs and its affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +import logger from '../logger'; +import getConfig from '../config'; +import { addAlert, Severity } from '@wallet-service/common'; +import { Event, EventTypes } from '../types'; + +/** + * MonitoringActor + * + * Centralises all runtime health monitoring for the sync state machine. + * The machine sends MONITORING_EVENTs to this actor; when anomalies are detected + * the actor fires alerts and, when necessary, sends MONITORING_IDLE_TIMEOUT back + * to the machine to trigger a graceful shutdown. + * + * Responsibilities: + * + * 1. Idle-stream detection — no EVENT_RECEIVED for >IDLE_EVENT_TIMEOUT_MS while + * connected fires a MAJOR alert so operators know the fullnode stream may have + * stalled without triggering a WebSocket disconnect. + * + * 2. Stuck-processing detection — PROCESSING_STARTED begins a one-shot timer; + * PROCESSING_COMPLETED cancels it. If the timer fires the actor fires a + * MAJOR alert; the machine keeps running so that a long-running handler + * (e.g. a large reorg) is allowed to finish. + * + * 3. Reconnection storm detection — fires a MAJOR alert when the daemon + * reconnects more than RECONNECTION_STORM_THRESHOLD times within + * RECONNECTION_STORM_WINDOW_MS. Duplicate alerts are suppressed for + * STORM_ALERT_COOLDOWN_MS (1 min) to avoid spamming the alerting system. + */ +const DEFAULT_IDLE_EVENT_TIMEOUT_MS = 5 * 60 * 1000; +const DEFAULT_STUCK_PROCESSING_TIMEOUT_MS = 60 * 60 * 1000; // 1 hour +const DEFAULT_RECONNECTION_STORM_THRESHOLD = 10; +const DEFAULT_RECONNECTION_STORM_WINDOW_MS = 5 * 60 * 1000; + +export default (callback: any, receive: any, config = getConfig()) => { + logger.info('Starting monitoring actor'); + + const idleTimeoutMs = config.IDLE_EVENT_TIMEOUT_MS ?? DEFAULT_IDLE_EVENT_TIMEOUT_MS; + const stuckTimeoutMs = config.STUCK_PROCESSING_TIMEOUT_MS ?? DEFAULT_STUCK_PROCESSING_TIMEOUT_MS; + const stormThreshold = config.RECONNECTION_STORM_THRESHOLD ?? DEFAULT_RECONNECTION_STORM_THRESHOLD; + const stormWindowMs = config.RECONNECTION_STORM_WINDOW_MS ?? DEFAULT_RECONNECTION_STORM_WINDOW_MS; + + // ── Idle detection ────────────────────────────────────────────────────────── + let isConnected = false; + let lastEventReceivedAt: number | null = null; + let idleCheckTimer: ReturnType | null = null; + let idleAlertFired = false; + + const startIdleCheck = () => { + stopIdleCheck(); + lastEventReceivedAt = Date.now(); + idleAlertFired = false; + + idleCheckTimer = setInterval(async () => { + // Interval is idleTimeoutMs/2 so the worst-case detection lag is 1.5×timeout + if (!isConnected || lastEventReceivedAt === null) return; + + const idleMs = Date.now() - lastEventReceivedAt; + if (idleMs >= idleTimeoutMs && !idleAlertFired) { + idleAlertFired = true; + const idleMinutes = Math.round(idleMs / 60000); + logger.error( + `[monitoring] No fullnode events received for ${idleMinutes} minutes while WebSocket is connected — terminating`, + ); + addAlert( + 'Daemon Idle — No Events Received', + `No fullnode events received for ${idleMinutes} minute(s) while the WebSocket is connected. ` + + 'Terminating the process so Kubernetes can restart it.', + Severity.MAJOR, + { idleMs: String(idleMs) }, + logger, + ).finally(() => { + callback({ type: EventTypes.MONITORING_IDLE_TIMEOUT }); + }); + } + }, Math.floor(idleTimeoutMs / 2)); + }; + + const stopIdleCheck = () => { + if (idleCheckTimer) { + clearInterval(idleCheckTimer); + idleCheckTimer = null; + } + }; + + // ── Stuck-processing detection ─────────────────────────────────────────────── + let stuckTimer: ReturnType | null = null; + + const startStuckTimer = () => { + clearStuckTimer(); + stuckTimer = setTimeout(async () => { + logger.error('[monitoring] State machine stuck in processing state'); + addAlert( + 'Daemon Stuck In Processing State', + `The state machine has been processing a single event for more than ` + + `${Math.round(stuckTimeoutMs / 60000)} minute(s).`, + Severity.MAJOR, + { timeoutMs: String(stuckTimeoutMs) }, + logger, + ).catch((err: Error) => + logger.error(`[monitoring] Failed to send stuck-processing alert: ${err.message}`), + ); + }, stuckTimeoutMs); + }; + + const clearStuckTimer = () => { + if (stuckTimer) { + clearTimeout(stuckTimer); + stuckTimer = null; + } + }; + + // ── Reconnection storm detection ───────────────────────────────────────────── + const STORM_ALERT_COOLDOWN_MS = 60 * 1000; // suppress duplicate storm alerts for 1 minute + let reconnectionTimestamps: number[] = []; + let stormAlertLastFiredAt: number | null = null; + + const trackReconnection = () => { + const now = Date.now(); + reconnectionTimestamps.push(now); + + const windowStart = now - stormWindowMs; + reconnectionTimestamps = reconnectionTimestamps.filter(t => t >= windowStart); + + if (reconnectionTimestamps.length >= stormThreshold) { + if (stormAlertLastFiredAt !== null && now - stormAlertLastFiredAt < STORM_ALERT_COOLDOWN_MS) { + return; // still within cooldown — do not spam the alerting system + } + stormAlertLastFiredAt = now; + + const windowMinutes = Math.round(stormWindowMs / 60000); + logger.error( + `[monitoring] Reconnection storm: ${reconnectionTimestamps.length} reconnections in the last ${windowMinutes} minutes`, + ); + addAlert( + 'Daemon Reconnection Storm', + `${reconnectionTimestamps.length} reconnections occurred in the last ${windowMinutes} minute(s). ` + + 'The daemon may be stuck in a reconnection loop.', + Severity.MAJOR, + { + reconnectionCount: String(reconnectionTimestamps.length), + windowMinutes: String(windowMinutes), + }, + logger, + ).catch((err: Error) => + logger.error(`[monitoring] Failed to send reconnection storm alert: ${err.message}`), + ); + } + }; + + // ── Event handling ──────────────────────────────────────────────────────────── + receive((event: Event) => { + if (event.type !== EventTypes.MONITORING_EVENT) { + logger.warn('[monitoring] Unexpected event type received by MonitoringActor'); + return; + } + + switch (event.event.type) { + case 'CONNECTED': + logger.info('[monitoring] WebSocket connected — starting idle-event timer'); + isConnected = true; + startIdleCheck(); + break; + + case 'DISCONNECTED': + logger.info('[monitoring] WebSocket disconnected — stopping timers'); + isConnected = false; + stopIdleCheck(); + clearStuckTimer(); + break; + + case 'EVENT_RECEIVED': + lastEventReceivedAt = Date.now(); + idleAlertFired = false; + break; + + case 'PROCESSING_STARTED': + startStuckTimer(); + break; + + case 'PROCESSING_COMPLETED': + clearStuckTimer(); + break; + + case 'RECONNECTING': + trackReconnection(); + break; + } + }); + + return () => { + logger.info('Stopping monitoring actor'); + stopIdleCheck(); + clearStuckTimer(); + }; +}; diff --git a/packages/daemon/src/actors/index.ts b/packages/daemon/src/actors/index.ts index e2907d54..79407af0 100644 --- a/packages/daemon/src/actors/index.ts +++ b/packages/daemon/src/actors/index.ts @@ -7,4 +7,5 @@ export { default as WebSocketActor } from './WebSocketActor'; export { default as HealthCheckActor } from './HealthCheckActor'; +export { default as MonitoringActor } from './MonitoringActor'; export * from './helpers'; diff --git a/packages/daemon/src/config.ts b/packages/daemon/src/config.ts index e5538dfc..8a5d683a 100644 --- a/packages/daemon/src/config.ts +++ b/packages/daemon/src/config.ts @@ -87,6 +87,16 @@ export const HEALTHCHECK_PING_INTERVAL = parseInt(process.env.HEALTHCHECK_PING_I // ACK timeout configuration (in milliseconds) export const ACK_TIMEOUT_MS = parseInt(process.env.ACK_TIMEOUT_MS ?? '20000', 10); // 20 seconds +// Monitoring configuration +// Timeout (ms) before alerting when no fullnode events received while WebSocket connected +export const IDLE_EVENT_TIMEOUT_MS = parseInt(process.env.IDLE_EVENT_TIMEOUT_MS ?? String(5 * 60 * 1000), 10); // 5 minutes +// Timeout (ms) before alerting when stuck in a single processing state +export const STUCK_PROCESSING_TIMEOUT_MS = parseInt(process.env.STUCK_PROCESSING_TIMEOUT_MS ?? String(60 * 60 * 1000), 10); // 1 hour +// Number of reconnections within RECONNECTION_STORM_WINDOW_MS to trigger a storm alert +export const RECONNECTION_STORM_THRESHOLD = parseInt(process.env.RECONNECTION_STORM_THRESHOLD ?? '10', 10); +// Time window (ms) for reconnection storm detection +export const RECONNECTION_STORM_WINDOW_MS = parseInt(process.env.RECONNECTION_STORM_WINDOW_MS ?? String(5 * 60 * 1000), 10); // 5 minutes + // Other export const USE_SSL = process.env.USE_SSL === 'true'; @@ -127,6 +137,10 @@ export default () => ({ HEALTHCHECK_SERVER_API_KEY, HEALTHCHECK_PING_INTERVAL, ACK_TIMEOUT_MS, + IDLE_EVENT_TIMEOUT_MS, + STUCK_PROCESSING_TIMEOUT_MS, + RECONNECTION_STORM_THRESHOLD, + RECONNECTION_STORM_WINDOW_MS, REORG_SIZE_INFO, REORG_SIZE_MINOR, REORG_SIZE_MAJOR, diff --git a/packages/daemon/src/index.ts b/packages/daemon/src/index.ts index 571c6dac..fc9ffc4f 100644 --- a/packages/daemon/src/index.ts +++ b/packages/daemon/src/index.ts @@ -20,6 +20,11 @@ const main = async () => { logger.info(`Transitioned to ${bigIntUtils.JSONBigInt.stringify(state.value)}`); }); + machine.onDone(() => { + logger.error('Sync machine reached a final state — terminating process for Kubernetes restart'); + process.exit(1); + }); + machine.onEvent((event) => { logger.info(`Processing event: ${bigIntUtils.JSONBigInt.stringify(event.type)}`); }); diff --git a/packages/daemon/src/machines/SyncMachine.ts b/packages/daemon/src/machines/SyncMachine.ts index 5caa3632..a636b3b1 100644 --- a/packages/daemon/src/machines/SyncMachine.ts +++ b/packages/daemon/src/machines/SyncMachine.ts @@ -11,7 +11,7 @@ import { spawn, } from 'xstate'; import { LRU } from '../utils'; -import { WebSocketActor, HealthCheckActor } from '../actors'; +import { WebSocketActor, HealthCheckActor, MonitoringActor } from '../actors'; import { Context, Event, @@ -59,6 +59,12 @@ import { updateCache, startHealthcheckPing, stopHealthcheckPing, + sendMonitoringConnected, + sendMonitoringDisconnected, + sendMonitoringEventReceived, + sendMonitoringReconnecting, + sendMonitoringProcessingStarted, + sendMonitoringProcessingCompleted, } from '../actions'; import { BACKOFF_DELAYED_RECONNECT, ACK_TIMEOUT } from '../delays'; import getConfig from '../config'; @@ -94,6 +100,7 @@ export const SyncMachine = Machine({ context: { socket: null, healthcheck: null, + monitoring: null, retryAttempt: 0, event: null, initialEventId: null, @@ -104,6 +111,7 @@ export const SyncMachine = Machine({ entry: assign({ txCache: () => new LRU(TX_CACHE_SIZE), healthcheck: () => spawn(HealthCheckActor), + monitoring: () => spawn(MonitoringActor), }), invoke: { src: 'fetchInitialState', @@ -130,7 +138,13 @@ export const SyncMachine = Machine({ }, }, [SYNC_MACHINE_STATES.RECONNECTING]: { - onEntry: ['clearSocket', 'increaseRetry', 'stopHealthcheckPing'], + onEntry: [ + 'clearSocket', + 'increaseRetry', + 'stopHealthcheckPing', + 'sendMonitoringReconnecting', + 'sendMonitoringDisconnected', + ], after: { BACKOFF_DELAYED_RECONNECT: SYNC_MACHINE_STATES.CONNECTING, }, @@ -138,7 +152,7 @@ export const SyncMachine = Machine({ [SYNC_MACHINE_STATES.CONNECTED]: { id: SYNC_MACHINE_STATES.CONNECTED, initial: CONNECTED_STATES.idle, - entry: ['startStream', 'startHealthcheckPing'], + entry: ['startStream', 'startHealthcheckPing', 'sendMonitoringConnected'], states: { [CONNECTED_STATES.idle]: { id: CONNECTED_STATES.idle, @@ -158,44 +172,46 @@ export const SyncMachine = Machine({ cond: 'invalidNetwork', target: `#${SYNC_MACHINE_STATES.ERROR}`, }, { - actions: ['storeEvent', 'sendAck'], + actions: ['storeEvent', 'sendAck', 'sendMonitoringEventReceived'], cond: 'unchanged', target: CONNECTED_STATES.idle, }, { - actions: ['storeEvent'], + actions: ['storeEvent', 'sendMonitoringEventReceived'], cond: 'metadataChanged', target: CONNECTED_STATES.handlingMetadataChanged, }, { - actions: ['storeEvent', 'sendAck'], + actions: ['storeEvent', 'sendAck', 'sendMonitoringEventReceived'], /* If the transaction is already voided and is not * VERTEX_METADATA_CHANGED, we should ignore it. */ cond: 'voided', target: CONNECTED_STATES.idle, }, { - actions: ['storeEvent'], + actions: ['storeEvent', 'sendMonitoringEventReceived'], cond: 'vertexRemoved', target: CONNECTED_STATES.handlingVertexRemoved, }, { - actions: ['storeEvent'], + actions: ['storeEvent', 'sendMonitoringEventReceived'], cond: 'vertexAccepted', target: CONNECTED_STATES.handlingVertexAccepted, }, { - actions: ['storeEvent'], + actions: ['storeEvent', 'sendMonitoringEventReceived'], cond: 'reorgStarted', target: CONNECTED_STATES.handlingReorgStarted, }, { - actions: ['storeEvent'], + actions: ['storeEvent', 'sendMonitoringEventReceived'], cond: 'tokenCreated', target: CONNECTED_STATES.handlingTokenCreated, }, { - actions: ['storeEvent'], + actions: ['storeEvent', 'sendMonitoringEventReceived'], target: CONNECTED_STATES.handlingUnhandledEvent, }], }, }, [CONNECTED_STATES.handlingUnhandledEvent]: { id: CONNECTED_STATES.handlingUnhandledEvent, + entry: ['sendMonitoringProcessingStarted'], + exit: ['sendMonitoringProcessingCompleted'], invoke: { src: 'updateLastSyncedEvent', onDone: { @@ -208,6 +224,8 @@ export const SyncMachine = Machine({ [CONNECTED_STATES.handlingMetadataChanged]: { id: 'handlingMetadataChanged', initial: 'detectingDiff', + entry: ['sendMonitoringProcessingStarted'], + exit: ['sendMonitoringProcessingCompleted'], states: { detectingDiff: { invoke: { @@ -236,6 +254,8 @@ export const SyncMachine = Machine({ // We have the unchanged guard, so it's guaranteed that this is a new tx [CONNECTED_STATES.handlingVertexAccepted]: { id: CONNECTED_STATES.handlingVertexAccepted, + entry: ['sendMonitoringProcessingStarted'], + exit: ['sendMonitoringProcessingCompleted'], invoke: { src: 'handleVertexAccepted', data: (_context: Context, event: Event) => event, @@ -248,6 +268,8 @@ export const SyncMachine = Machine({ }, [CONNECTED_STATES.handlingVertexRemoved]: { id: CONNECTED_STATES.handlingVertexRemoved, + entry: ['sendMonitoringProcessingStarted'], + exit: ['sendMonitoringProcessingCompleted'], invoke: { src: 'handleVertexRemoved', data: (_context: Context, event: Event) => event, @@ -260,6 +282,8 @@ export const SyncMachine = Machine({ }, [CONNECTED_STATES.handlingVoidedTx]: { id: CONNECTED_STATES.handlingVoidedTx, + entry: ['sendMonitoringProcessingStarted'], + exit: ['sendMonitoringProcessingCompleted'], invoke: { src: 'handleVoidedTx', data: (_context: Context, event: Event) => event, @@ -272,6 +296,8 @@ export const SyncMachine = Machine({ }, [CONNECTED_STATES.handlingUnvoidedTx]: { id: CONNECTED_STATES.handlingUnvoidedTx, + entry: ['sendMonitoringProcessingStarted'], + exit: ['sendMonitoringProcessingCompleted'], invoke: { src: 'handleUnvoidedTx', data: (_context: Context, event: Event) => event, @@ -287,6 +313,8 @@ export const SyncMachine = Machine({ }, [CONNECTED_STATES.handlingFirstBlock]: { id: CONNECTED_STATES.handlingFirstBlock, + entry: ['sendMonitoringProcessingStarted'], + exit: ['sendMonitoringProcessingCompleted'], invoke: { src: 'handleTxFirstBlock', data: (_context: Context, event: Event) => event, @@ -299,6 +327,8 @@ export const SyncMachine = Machine({ }, [CONNECTED_STATES.handlingNcExecVoided]: { id: CONNECTED_STATES.handlingNcExecVoided, + entry: ['sendMonitoringProcessingStarted'], + exit: ['sendMonitoringProcessingCompleted'], invoke: { src: 'handleNcExecVoided', data: (_context: Context, event: Event) => event, @@ -311,6 +341,8 @@ export const SyncMachine = Machine({ }, [CONNECTED_STATES.handlingReorgStarted]: { id: CONNECTED_STATES.handlingReorgStarted, + entry: ['sendMonitoringProcessingStarted'], + exit: ['sendMonitoringProcessingCompleted'], invoke: { src: 'handleReorgStarted', data: (_context: Context, event: Event) => event, @@ -323,6 +355,8 @@ export const SyncMachine = Machine({ }, [CONNECTED_STATES.handlingTokenCreated]: { id: CONNECTED_STATES.handlingTokenCreated, + entry: ['sendMonitoringProcessingStarted'], + exit: ['sendMonitoringProcessingCompleted'], invoke: { src: 'handleTokenCreated', data: (_context: Context, event: Event) => event, @@ -335,6 +369,8 @@ export const SyncMachine = Machine({ }, [CONNECTED_STATES.checkingForMissedEvents]: { id: CONNECTED_STATES.checkingForMissedEvents, + entry: ['sendMonitoringProcessingStarted'], + exit: ['sendMonitoringProcessingCompleted'], invoke: { src: 'checkForMissedEvents', onDone: [{ @@ -355,12 +391,15 @@ export const SyncMachine = Machine({ cond: 'websocketDisconnected', target: SYNC_MACHINE_STATES.RECONNECTING, }], + MONITORING_IDLE_TIMEOUT: { + target: `#${SYNC_MACHINE_STATES.ERROR}`, + }, }, }, [SYNC_MACHINE_STATES.ERROR]: { id: SYNC_MACHINE_STATES.ERROR, type: 'final', - onEntry: ['logEventError', 'stopHealthcheckPing'], + onEntry: ['logEventError', 'stopHealthcheckPing', 'sendMonitoringDisconnected'], }, }, }, { @@ -407,6 +446,12 @@ export const SyncMachine = Machine({ updateCache, startHealthcheckPing, stopHealthcheckPing, + sendMonitoringConnected, + sendMonitoringDisconnected, + sendMonitoringEventReceived, + sendMonitoringReconnecting, + sendMonitoringProcessingStarted, + sendMonitoringProcessingCompleted, }, }); diff --git a/packages/daemon/src/services/index.ts b/packages/daemon/src/services/index.ts index 82ebf91d..b8151239 100644 --- a/packages/daemon/src/services/index.ts +++ b/packages/daemon/src/services/index.ts @@ -17,6 +17,7 @@ import { DbTransaction, LastSyncedEvent, Event, + EventTypes, Context, EventTxInput, EventTxOutput, @@ -103,7 +104,7 @@ export const METADATA_DIFF_EVENT_TYPES = { const DUPLICATE_TX_ALERT_GRACE_PERIOD = 10; // seconds export const metadataDiff = async (_context: Context, event: Event) => { - const fullNodeEvent = event.event as StandardFullNodeEvent; + const fullNodeEvent = (event as Extract).event as StandardFullNodeEvent; const { hash, metadata: { voided_by, first_block, nc_execution }, diff --git a/packages/daemon/src/types/event.ts b/packages/daemon/src/types/event.ts index eebf0f96..5fa0b3d2 100644 --- a/packages/daemon/src/types/event.ts +++ b/packages/daemon/src/types/event.ts @@ -28,11 +28,21 @@ export type HealthCheckEvent = | { type: 'START' } | { type: 'STOP' }; +export type MonitoringEvent = + | { type: 'CONNECTED' } + | { type: 'DISCONNECTED' } + | { type: 'EVENT_RECEIVED' } + | { type: 'RECONNECTING' } + | { type: 'PROCESSING_STARTED' } + | { type: 'PROCESSING_COMPLETED' }; + export enum EventTypes { WEBSOCKET_EVENT = 'WEBSOCKET_EVENT', FULLNODE_EVENT = 'FULLNODE_EVENT', WEBSOCKET_SEND_EVENT = 'WEBSOCKET_SEND_EVENT', HEALTHCHECK_EVENT = 'HEALTHCHECK_EVENT', + MONITORING_EVENT = 'MONITORING_EVENT', + MONITORING_IDLE_TIMEOUT = 'MONITORING_IDLE_TIMEOUT', } export enum FullNodeEventTypes { @@ -72,7 +82,9 @@ export type Event = | { type: EventTypes.WEBSOCKET_EVENT, event: WebSocketEvent } | { type: EventTypes.FULLNODE_EVENT, event: FullNodeEvent } | { type: EventTypes.WEBSOCKET_SEND_EVENT, event: WebSocketSendEvent } - | { type: EventTypes.HEALTHCHECK_EVENT, event: HealthCheckEvent }; + | { type: EventTypes.HEALTHCHECK_EVENT, event: HealthCheckEvent } + | { type: EventTypes.MONITORING_EVENT, event: MonitoringEvent } + | { type: EventTypes.MONITORING_IDLE_TIMEOUT }; export interface VertexRemovedEventData { diff --git a/packages/daemon/src/types/machine.ts b/packages/daemon/src/types/machine.ts index 04871e9a..1c9ebff0 100644 --- a/packages/daemon/src/types/machine.ts +++ b/packages/daemon/src/types/machine.ts @@ -12,6 +12,7 @@ import { FullNodeEvent } from './event'; export interface Context { socket: ActorRef | null; healthcheck: ActorRef | null; + monitoring: ActorRef | null; retryAttempt: number; event?: FullNodeEvent | null; initialEventId: null | number;