From 62dc57c68b1ef93b6d704291400e5a6c0c0f2325 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Abadesso?= Date: Mon, 23 Feb 2026 09:17:31 -0300 Subject: [PATCH 01/11] feat(daemon): add monitoring layer with idle detection, stuck-state restart, and reconnection storm alerts --- .../__tests__/actors/MonitoringActor.test.ts | 214 ++++++++++++++++++ packages/daemon/src/actions/index.ts | 66 ++++++ packages/daemon/src/actors/MonitoringActor.ts | 142 ++++++++++++ packages/daemon/src/actors/index.ts | 1 + packages/daemon/src/config.ts | 14 ++ packages/daemon/src/delays/index.ts | 6 + packages/daemon/src/machines/SyncMachine.ts | 112 +++++++-- packages/daemon/src/types/event.ts | 10 +- packages/daemon/src/types/machine.ts | 1 + 9 files changed, 551 insertions(+), 15 deletions(-) create mode 100644 packages/daemon/__tests__/actors/MonitoringActor.test.ts create mode 100644 packages/daemon/src/actors/MonitoringActor.ts diff --git a/packages/daemon/__tests__/actors/MonitoringActor.test.ts b/packages/daemon/__tests__/actors/MonitoringActor.test.ts new file mode 100644 index 00000000..9e410392 --- /dev/null +++ b/packages/daemon/__tests__/actors/MonitoringActor.test.ts @@ -0,0 +1,214 @@ +/** + * Copyright (c) Hathor Labs and its affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +import MonitoringActor from '../../src/actors/MonitoringActor'; +import logger from '../../src/logger'; +import { EventTypes } from '../../src/types/event'; +import getConfig from '../../src/config'; +import { addAlert } from '@wallet-service/common'; + +jest.useFakeTimers(); +jest.spyOn(global, 'setInterval'); +jest.spyOn(global, 'clearInterval'); + +jest.mock('@wallet-service/common', () => ({ + ...jest.requireActual('@wallet-service/common'), + addAlert: jest.fn().mockResolvedValue(undefined), +})); + +const mockAddAlert = addAlert as jest.Mock; + +describe('MonitoringActor', () => { + let mockCallback: jest.Mock; + let mockReceive: jest.Mock; + let receiveCallback: (event: any) => void; + let config: ReturnType; + + const sendEvent = (monitoringEventType: string) => { + receiveCallback({ + type: EventTypes.MONITORING_EVENT, + event: { type: monitoringEventType }, + }); + }; + + beforeEach(() => { + jest.clearAllMocks(); + jest.clearAllTimers(); + config = getConfig(); + config['IDLE_EVENT_TIMEOUT_MS'] = 5 * 60 * 1000; // 5 min + config['RECONNECTION_STORM_THRESHOLD'] = 3; // low threshold for tests + config['RECONNECTION_STORM_WINDOW_MS'] = 5 * 60 * 1000; // 5 min + + mockCallback = jest.fn(); + mockReceive = jest.fn().mockImplementation((cb: any) => { + receiveCallback = cb; + }); + }); + + afterAll(() => { + jest.clearAllMocks(); + jest.useRealTimers(); + }); + + it('should not start the idle timer on initialization', () => { + MonitoringActor(mockCallback, mockReceive, config); + expect(setInterval).not.toHaveBeenCalled(); + }); + + it('should start the idle timer when receiving a CONNECTED event', () => { + MonitoringActor(mockCallback, mockReceive, config); + sendEvent('CONNECTED'); + expect(setInterval).toHaveBeenCalledTimes(1); + }); + + it('should stop the idle timer when receiving a DISCONNECTED event', () => { + MonitoringActor(mockCallback, mockReceive, config); + sendEvent('CONNECTED'); + expect(setInterval).toHaveBeenCalledTimes(1); + + sendEvent('DISCONNECTED'); + expect(clearInterval).toHaveBeenCalledTimes(1); + }); + + it('should stop the idle timer when the actor is stopped', () => { + const stopActor = MonitoringActor(mockCallback, mockReceive, config); + sendEvent('CONNECTED'); + expect(setInterval).toHaveBeenCalledTimes(1); + + stopActor(); + expect(clearInterval).toHaveBeenCalledTimes(1); + }); + + it('should fire an idle alert after IDLE_EVENT_TIMEOUT_MS with no events', async () => { + MonitoringActor(mockCallback, mockReceive, config); + sendEvent('CONNECTED'); + + // Advance time past the idle timeout + jest.advanceTimersByTime(config['IDLE_EVENT_TIMEOUT_MS'] + 1); + + // Allow the async addAlert promise to resolve + await Promise.resolve(); + + expect(mockAddAlert).toHaveBeenCalledTimes(1); + expect(mockAddAlert.mock.calls[0][0]).toBe('Daemon Idle — No Events Received'); + }); + + it('should NOT fire an idle alert when events are being received', async () => { + MonitoringActor(mockCallback, mockReceive, config); + sendEvent('CONNECTED'); + + // Advance to just before the timeout + jest.advanceTimersByTime(config['IDLE_EVENT_TIMEOUT_MS'] - 1000); + sendEvent('EVENT_RECEIVED'); + + // Advance past the original threshold (but lastEventReceivedAt was reset) + jest.advanceTimersByTime(config['IDLE_EVENT_TIMEOUT_MS'] - 1000); + + await Promise.resolve(); + expect(mockAddAlert).not.toHaveBeenCalled(); + }); + + it('should fire only one idle alert even if the timer fires multiple times', async () => { + MonitoringActor(mockCallback, mockReceive, config); + sendEvent('CONNECTED'); + + // Fire timer three times without receiving any events + jest.advanceTimersByTime(config['IDLE_EVENT_TIMEOUT_MS'] * 3); + + await Promise.resolve(); + + expect(mockAddAlert).toHaveBeenCalledTimes(1); + }); + + it('should reset the idle alert flag when an event is received after an alert fired', async () => { + MonitoringActor(mockCallback, mockReceive, config); + sendEvent('CONNECTED'); + + // Trigger first alert (interval fires at T = IDLE_EVENT_TIMEOUT_MS) + jest.advanceTimersByTime(config['IDLE_EVENT_TIMEOUT_MS'] + 1); + await Promise.resolve(); + expect(mockAddAlert).toHaveBeenCalledTimes(1); + + // Receive an event — resets idleAlertFired and lastEventReceivedAt to current time T1 + sendEvent('EVENT_RECEIVED'); + + // The next interval tick where idleMs >= threshold is at 3*T (the interval at 2*T + // fires only T-1 ms after EVENT_RECEIVED, which is below the threshold). + // Advancing by 2*T from T1 guarantees we cross that boundary. + jest.advanceTimersByTime(2 * config['IDLE_EVENT_TIMEOUT_MS']); + await Promise.resolve(); + + // A second alert should now be fired + expect(mockAddAlert).toHaveBeenCalledTimes(2); + }); + + it('should fire a reconnection storm alert when threshold is reached', async () => { + MonitoringActor(mockCallback, mockReceive, config); + + // Send enough reconnections to trigger the storm threshold (3 in our test config) + sendEvent('RECONNECTING'); + sendEvent('RECONNECTING'); + sendEvent('RECONNECTING'); + + await Promise.resolve(); + + expect(mockAddAlert).toHaveBeenCalledTimes(1); + expect(mockAddAlert.mock.calls[0][0]).toBe('Daemon Reconnection Storm'); + }); + + it('should NOT fire a reconnection storm alert below the threshold', async () => { + MonitoringActor(mockCallback, mockReceive, config); + + // Send fewer reconnections than the threshold + sendEvent('RECONNECTING'); + sendEvent('RECONNECTING'); + + await Promise.resolve(); + expect(mockAddAlert).not.toHaveBeenCalled(); + }); + + it('should evict old reconnections outside the storm window', async () => { + MonitoringActor(mockCallback, mockReceive, config); + + // Two reconnections at time 0 + sendEvent('RECONNECTING'); + sendEvent('RECONNECTING'); + + // Advance past the storm window so those timestamps are evicted + jest.advanceTimersByTime(config['RECONNECTION_STORM_WINDOW_MS'] + 1000); + + // One new reconnection — count should restart from 1, no alert + sendEvent('RECONNECTING'); + + await Promise.resolve(); + expect(mockAddAlert).not.toHaveBeenCalled(); + }); + + it('should restart idle timer when CONNECTED is sent while already connected', () => { + MonitoringActor(mockCallback, mockReceive, config); + + sendEvent('CONNECTED'); + expect(setInterval).toHaveBeenCalledTimes(1); + + // A second CONNECTED clears the old timer and creates a new one + sendEvent('CONNECTED'); + expect(clearInterval).toHaveBeenCalledTimes(1); + expect(setInterval).toHaveBeenCalledTimes(2); + }); + + it('should ignore events of other types', () => { + const warnSpy = jest.spyOn(logger, 'warn'); + MonitoringActor(mockCallback, mockReceive, config); + + receiveCallback({ type: 'SOME_OTHER_EVENT', event: { type: 'WHATEVER' } }); + + expect(warnSpy).toHaveBeenCalledWith( + '[monitoring] Unexpected event type received by MonitoringActor', + ); + expect(setInterval).not.toHaveBeenCalled(); + }); +}); diff --git a/packages/daemon/src/actions/index.ts b/packages/daemon/src/actions/index.ts index 70fe5461..c05546ab 100644 --- a/packages/daemon/src/actions/index.ts +++ b/packages/daemon/src/actions/index.ts @@ -12,6 +12,7 @@ import logger from '../logger'; import { hashTxData } from '../utils'; import { createStartStreamMessage, createSendAckMessage } from '../actors'; import { bigIntUtils } from '@hathor/wallet-lib'; +import { addAlert, Severity } from '@wallet-service/common'; /* * This action is used to store the initial event id on the context @@ -197,3 +198,68 @@ export const stopHealthcheckPing = sendTo( * Logs the event as an error log */ export const logEventError = (_context: Context, event: Event) => logger.error(bigIntUtils.JSONBigInt.stringify(event)); + +/* + * This is a helper to get the monitoring ref from the context and throw if it's not found. + */ +export const getMonitoringRefFromContext = (context: Context) => { + if (!context.monitoring) { + throw new Error('No monitoring actor in context'); + } + + return context.monitoring; +}; + +/* + * Notifies the monitoring actor that the WebSocket became connected. + */ +export const sendMonitoringConnected = sendTo( + getMonitoringRefFromContext, + { type: EventTypes.MONITORING_EVENT, event: { type: 'CONNECTED' } }, +); + +/* + * Notifies the monitoring actor that the WebSocket disconnected. + */ +export const sendMonitoringDisconnected = sendTo( + getMonitoringRefFromContext, + { type: EventTypes.MONITORING_EVENT, event: { type: 'DISCONNECTED' } }, +); + +/* + * Notifies the monitoring actor that a fullnode event was received (resets the idle timer). + */ +export const sendMonitoringEventReceived = sendTo( + getMonitoringRefFromContext, + { type: EventTypes.MONITORING_EVENT, event: { type: 'EVENT_RECEIVED' } }, +); + +/* + * Notifies the monitoring actor that the machine is entering the RECONNECTING state. + */ +export const sendMonitoringReconnecting = sendTo( + getMonitoringRefFromContext, + { type: EventTypes.MONITORING_EVENT, event: { type: 'RECONNECTING' } }, +); + +/* + * Fires a CRITICAL alert and logs when the machine has been stuck in a processing + * state for longer than STUCK_PROCESSING_TIMEOUT_MS. The machine will transition to + * RECONNECTING immediately after this action runs. + */ +export const alertStuckProcessing = (context: Context) => { + const eventId = context.event?.event?.id; + logger.error( + `[monitoring] State machine stuck processing event ${eventId ?? 'unknown'} for too long — forcing reconnection`, + ); + addAlert( + 'Daemon Stuck In Processing State', + `The state machine has been processing event ${eventId ?? 'unknown'} ` + + 'for longer than the configured timeout. Forcing a reconnection.', + Severity.CRITICAL, + { eventId: eventId !== undefined ? String(eventId) : 'unknown' }, + logger, + ).catch((err: Error) => + logger.error(`[monitoring] Failed to send stuck-processing alert: ${err}`), + ); +}; diff --git a/packages/daemon/src/actors/MonitoringActor.ts b/packages/daemon/src/actors/MonitoringActor.ts new file mode 100644 index 00000000..673a313f --- /dev/null +++ b/packages/daemon/src/actors/MonitoringActor.ts @@ -0,0 +1,142 @@ +/** + * Copyright (c) Hathor Labs and its affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +import logger from '../logger'; +import getConfig from '../config'; +import { addAlert, Severity } from '@wallet-service/common'; +import { Event, EventTypes } from '../types'; + +/** + * MonitoringActor + * + * Watches the state machine for anomalies and raises alerts via the alert manager. + * + * Monitors: + * 1. No events received for >IDLE_EVENT_TIMEOUT_MS while WebSocket connected — fires a MAJOR alert + * so operators know the fullnode stream may have stalled without a disconnect. + * 2. Reconnection storm — fires a CRITICAL alert if the daemon reconnects more than + * RECONNECTION_STORM_THRESHOLD times within RECONNECTION_STORM_WINDOW_MS. This catches + * pathological thrash-reconnect cycles that would otherwise be silent. + * + * The actor receives MONITORING_EVENTs from the SyncMachine: + * - CONNECTED: WebSocket became connected; starts the idle-event timer. + * - DISCONNECTED: WebSocket disconnected; stops the idle-event timer. + * - EVENT_RECEIVED: A fullnode event arrived (resets the idle timer). + * - RECONNECTING: Machine entered RECONNECTING state (used for storm detection). + */ +export default (callback: any, receive: any, config = getConfig()) => { + logger.info('Starting monitoring actor'); + + let isConnected = false; + let lastEventReceivedAt: number | null = null; + // Timer that fires when we have been idle (no EVENT_RECEIVED) for too long + let idleCheckTimer: ReturnType | null = null; + // Whether we already fired the current idle alert (avoids alert flood) + let idleAlertFired = false; + // Rolling list of reconnection timestamps within the storm window + let reconnectionTimestamps: number[] = []; + + const startIdleCheck = () => { + stopIdleCheck(); + lastEventReceivedAt = Date.now(); + idleAlertFired = false; + + idleCheckTimer = setInterval(async () => { + if (!isConnected || lastEventReceivedAt === null) return; + + const idleMs = Date.now() - lastEventReceivedAt; + if (idleMs >= config.IDLE_EVENT_TIMEOUT_MS && !idleAlertFired) { + idleAlertFired = true; + const idleMinutes = Math.round(idleMs / 60000); + logger.warn( + `[monitoring] No fullnode events received for ${idleMinutes} minutes while WebSocket is connected`, + ); + addAlert( + 'Daemon Idle — No Events Received', + `No fullnode events received for ${idleMinutes} minute(s) while the WebSocket is connected. ` + + 'The fullnode stream may be stalled.', + Severity.MAJOR, + { idleMs: String(idleMs) }, + logger, + ).catch((err: Error) => + logger.error(`[monitoring] Failed to send idle alert: ${err}`), + ); + } + }, config.IDLE_EVENT_TIMEOUT_MS); + }; + + const stopIdleCheck = () => { + if (idleCheckTimer) { + clearInterval(idleCheckTimer); + idleCheckTimer = null; + } + }; + + const trackReconnection = () => { + const now = Date.now(); + reconnectionTimestamps.push(now); + + // Evict timestamps outside the rolling window + const windowStart = now - config.RECONNECTION_STORM_WINDOW_MS; + reconnectionTimestamps = reconnectionTimestamps.filter(t => t >= windowStart); + + if (reconnectionTimestamps.length >= config.RECONNECTION_STORM_THRESHOLD) { + const windowMinutes = Math.round(config.RECONNECTION_STORM_WINDOW_MS / 60000); + logger.error( + `[monitoring] Reconnection storm: ${reconnectionTimestamps.length} reconnections in the last ${windowMinutes} minutes`, + ); + addAlert( + 'Daemon Reconnection Storm', + `${reconnectionTimestamps.length} reconnections occurred in the last ${windowMinutes} minute(s). ` + + 'The daemon may be stuck in a reconnection loop.', + Severity.CRITICAL, + { + reconnectionCount: String(reconnectionTimestamps.length), + windowMinutes: String(windowMinutes), + }, + logger, + ).catch((err: Error) => + logger.error(`[monitoring] Failed to send reconnection storm alert: ${err}`), + ); + } + }; + + receive((event: Event) => { + if (event.type !== EventTypes.MONITORING_EVENT) { + logger.warn('[monitoring] Unexpected event type received by MonitoringActor'); + return; + } + + switch (event.event.type) { + case 'CONNECTED': + logger.info('[monitoring] WebSocket connected — starting idle-event timer'); + isConnected = true; + startIdleCheck(); + break; + + case 'DISCONNECTED': + logger.info('[monitoring] WebSocket disconnected — stopping idle-event timer'); + isConnected = false; + stopIdleCheck(); + break; + + case 'EVENT_RECEIVED': + lastEventReceivedAt = Date.now(); + idleAlertFired = false; + break; + + case 'RECONNECTING': + trackReconnection(); + break; + } + }); + + return () => { + logger.info('Stopping monitoring actor'); + stopIdleCheck(); + }; +}; diff --git a/packages/daemon/src/actors/index.ts b/packages/daemon/src/actors/index.ts index e2907d54..79407af0 100644 --- a/packages/daemon/src/actors/index.ts +++ b/packages/daemon/src/actors/index.ts @@ -7,4 +7,5 @@ export { default as WebSocketActor } from './WebSocketActor'; export { default as HealthCheckActor } from './HealthCheckActor'; +export { default as MonitoringActor } from './MonitoringActor'; export * from './helpers'; diff --git a/packages/daemon/src/config.ts b/packages/daemon/src/config.ts index e5538dfc..a8230c9a 100644 --- a/packages/daemon/src/config.ts +++ b/packages/daemon/src/config.ts @@ -87,6 +87,16 @@ export const HEALTHCHECK_PING_INTERVAL = parseInt(process.env.HEALTHCHECK_PING_I // ACK timeout configuration (in milliseconds) export const ACK_TIMEOUT_MS = parseInt(process.env.ACK_TIMEOUT_MS ?? '20000', 10); // 20 seconds +// Monitoring configuration +// Timeout (ms) before alerting when no fullnode events received while WebSocket connected +export const IDLE_EVENT_TIMEOUT_MS = parseInt(process.env.IDLE_EVENT_TIMEOUT_MS ?? String(5 * 60 * 1000), 10); // 5 minutes +// Timeout (ms) before auto-restarting when stuck in a single processing state +export const STUCK_PROCESSING_TIMEOUT_MS = parseInt(process.env.STUCK_PROCESSING_TIMEOUT_MS ?? String(5 * 60 * 1000), 10); // 5 minutes +// Number of reconnections within RECONNECTION_STORM_WINDOW_MS to trigger a storm alert +export const RECONNECTION_STORM_THRESHOLD = parseInt(process.env.RECONNECTION_STORM_THRESHOLD ?? '10', 10); +// Time window (ms) for reconnection storm detection +export const RECONNECTION_STORM_WINDOW_MS = parseInt(process.env.RECONNECTION_STORM_WINDOW_MS ?? String(5 * 60 * 1000), 10); // 5 minutes + // Other export const USE_SSL = process.env.USE_SSL === 'true'; @@ -127,6 +137,10 @@ export default () => ({ HEALTHCHECK_SERVER_API_KEY, HEALTHCHECK_PING_INTERVAL, ACK_TIMEOUT_MS, + IDLE_EVENT_TIMEOUT_MS, + STUCK_PROCESSING_TIMEOUT_MS, + RECONNECTION_STORM_THRESHOLD, + RECONNECTION_STORM_WINDOW_MS, REORG_SIZE_INFO, REORG_SIZE_MINOR, REORG_SIZE_MAJOR, diff --git a/packages/daemon/src/delays/index.ts b/packages/daemon/src/delays/index.ts index 45ab1ff0..faf70d9b 100644 --- a/packages/daemon/src/delays/index.ts +++ b/packages/daemon/src/delays/index.ts @@ -24,3 +24,9 @@ export const ACK_TIMEOUT = () => { const { ACK_TIMEOUT_MS } = getConfig(); return ACK_TIMEOUT_MS; }; + +// Timeout before auto-restarting when stuck in a single processing state +export const STUCK_PROCESSING_TIMEOUT = () => { + const { STUCK_PROCESSING_TIMEOUT_MS } = getConfig(); + return STUCK_PROCESSING_TIMEOUT_MS; +}; diff --git a/packages/daemon/src/machines/SyncMachine.ts b/packages/daemon/src/machines/SyncMachine.ts index 5caa3632..a3f6486d 100644 --- a/packages/daemon/src/machines/SyncMachine.ts +++ b/packages/daemon/src/machines/SyncMachine.ts @@ -11,7 +11,7 @@ import { spawn, } from 'xstate'; import { LRU } from '../utils'; -import { WebSocketActor, HealthCheckActor } from '../actors'; +import { WebSocketActor, HealthCheckActor, MonitoringActor } from '../actors'; import { Context, Event, @@ -59,8 +59,13 @@ import { updateCache, startHealthcheckPing, stopHealthcheckPing, + sendMonitoringConnected, + sendMonitoringDisconnected, + sendMonitoringEventReceived, + sendMonitoringReconnecting, + alertStuckProcessing, } from '../actions'; -import { BACKOFF_DELAYED_RECONNECT, ACK_TIMEOUT } from '../delays'; +import { BACKOFF_DELAYED_RECONNECT, ACK_TIMEOUT, STUCK_PROCESSING_TIMEOUT } from '../delays'; import getConfig from '../config'; export const SYNC_MACHINE_STATES = { @@ -94,6 +99,7 @@ export const SyncMachine = Machine({ context: { socket: null, healthcheck: null, + monitoring: null, retryAttempt: 0, event: null, initialEventId: null, @@ -104,6 +110,7 @@ export const SyncMachine = Machine({ entry: assign({ txCache: () => new LRU(TX_CACHE_SIZE), healthcheck: () => spawn(HealthCheckActor), + monitoring: () => spawn(MonitoringActor), }), invoke: { src: 'fetchInitialState', @@ -130,7 +137,13 @@ export const SyncMachine = Machine({ }, }, [SYNC_MACHINE_STATES.RECONNECTING]: { - onEntry: ['clearSocket', 'increaseRetry', 'stopHealthcheckPing'], + onEntry: [ + 'clearSocket', + 'increaseRetry', + 'stopHealthcheckPing', + 'sendMonitoringReconnecting', + 'sendMonitoringDisconnected', + ], after: { BACKOFF_DELAYED_RECONNECT: SYNC_MACHINE_STATES.CONNECTING, }, @@ -138,7 +151,7 @@ export const SyncMachine = Machine({ [SYNC_MACHINE_STATES.CONNECTED]: { id: SYNC_MACHINE_STATES.CONNECTED, initial: CONNECTED_STATES.idle, - entry: ['startStream', 'startHealthcheckPing'], + entry: ['startStream', 'startHealthcheckPing', 'sendMonitoringConnected'], states: { [CONNECTED_STATES.idle]: { id: CONNECTED_STATES.idle, @@ -158,44 +171,50 @@ export const SyncMachine = Machine({ cond: 'invalidNetwork', target: `#${SYNC_MACHINE_STATES.ERROR}`, }, { - actions: ['storeEvent', 'sendAck'], + actions: ['storeEvent', 'sendAck', 'sendMonitoringEventReceived'], cond: 'unchanged', target: CONNECTED_STATES.idle, }, { - actions: ['storeEvent'], + actions: ['storeEvent', 'sendMonitoringEventReceived'], cond: 'metadataChanged', target: CONNECTED_STATES.handlingMetadataChanged, }, { - actions: ['storeEvent', 'sendAck'], + actions: ['storeEvent', 'sendAck', 'sendMonitoringEventReceived'], /* If the transaction is already voided and is not * VERTEX_METADATA_CHANGED, we should ignore it. */ cond: 'voided', target: CONNECTED_STATES.idle, }, { - actions: ['storeEvent'], + actions: ['storeEvent', 'sendMonitoringEventReceived'], cond: 'vertexRemoved', target: CONNECTED_STATES.handlingVertexRemoved, }, { - actions: ['storeEvent'], + actions: ['storeEvent', 'sendMonitoringEventReceived'], cond: 'vertexAccepted', target: CONNECTED_STATES.handlingVertexAccepted, }, { - actions: ['storeEvent'], + actions: ['storeEvent', 'sendMonitoringEventReceived'], cond: 'reorgStarted', target: CONNECTED_STATES.handlingReorgStarted, }, { - actions: ['storeEvent'], + actions: ['storeEvent', 'sendMonitoringEventReceived'], cond: 'tokenCreated', target: CONNECTED_STATES.handlingTokenCreated, }, { - actions: ['storeEvent'], + actions: ['storeEvent', 'sendMonitoringEventReceived'], target: CONNECTED_STATES.handlingUnhandledEvent, }], }, }, [CONNECTED_STATES.handlingUnhandledEvent]: { id: CONNECTED_STATES.handlingUnhandledEvent, + after: { + STUCK_PROCESSING_TIMEOUT: { + target: `#SyncMachine.${SYNC_MACHINE_STATES.RECONNECTING}`, + actions: ['alertStuckProcessing'], + }, + }, invoke: { src: 'updateLastSyncedEvent', onDone: { @@ -208,6 +227,12 @@ export const SyncMachine = Machine({ [CONNECTED_STATES.handlingMetadataChanged]: { id: 'handlingMetadataChanged', initial: 'detectingDiff', + after: { + STUCK_PROCESSING_TIMEOUT: { + target: `#SyncMachine.${SYNC_MACHINE_STATES.RECONNECTING}`, + actions: ['alertStuckProcessing'], + }, + }, states: { detectingDiff: { invoke: { @@ -236,6 +261,12 @@ export const SyncMachine = Machine({ // We have the unchanged guard, so it's guaranteed that this is a new tx [CONNECTED_STATES.handlingVertexAccepted]: { id: CONNECTED_STATES.handlingVertexAccepted, + after: { + STUCK_PROCESSING_TIMEOUT: { + target: `#SyncMachine.${SYNC_MACHINE_STATES.RECONNECTING}`, + actions: ['alertStuckProcessing'], + }, + }, invoke: { src: 'handleVertexAccepted', data: (_context: Context, event: Event) => event, @@ -248,6 +279,12 @@ export const SyncMachine = Machine({ }, [CONNECTED_STATES.handlingVertexRemoved]: { id: CONNECTED_STATES.handlingVertexRemoved, + after: { + STUCK_PROCESSING_TIMEOUT: { + target: `#SyncMachine.${SYNC_MACHINE_STATES.RECONNECTING}`, + actions: ['alertStuckProcessing'], + }, + }, invoke: { src: 'handleVertexRemoved', data: (_context: Context, event: Event) => event, @@ -260,6 +297,12 @@ export const SyncMachine = Machine({ }, [CONNECTED_STATES.handlingVoidedTx]: { id: CONNECTED_STATES.handlingVoidedTx, + after: { + STUCK_PROCESSING_TIMEOUT: { + target: `#SyncMachine.${SYNC_MACHINE_STATES.RECONNECTING}`, + actions: ['alertStuckProcessing'], + }, + }, invoke: { src: 'handleVoidedTx', data: (_context: Context, event: Event) => event, @@ -272,6 +315,12 @@ export const SyncMachine = Machine({ }, [CONNECTED_STATES.handlingUnvoidedTx]: { id: CONNECTED_STATES.handlingUnvoidedTx, + after: { + STUCK_PROCESSING_TIMEOUT: { + target: `#SyncMachine.${SYNC_MACHINE_STATES.RECONNECTING}`, + actions: ['alertStuckProcessing'], + }, + }, invoke: { src: 'handleUnvoidedTx', data: (_context: Context, event: Event) => event, @@ -287,6 +336,12 @@ export const SyncMachine = Machine({ }, [CONNECTED_STATES.handlingFirstBlock]: { id: CONNECTED_STATES.handlingFirstBlock, + after: { + STUCK_PROCESSING_TIMEOUT: { + target: `#SyncMachine.${SYNC_MACHINE_STATES.RECONNECTING}`, + actions: ['alertStuckProcessing'], + }, + }, invoke: { src: 'handleTxFirstBlock', data: (_context: Context, event: Event) => event, @@ -299,6 +354,12 @@ export const SyncMachine = Machine({ }, [CONNECTED_STATES.handlingNcExecVoided]: { id: CONNECTED_STATES.handlingNcExecVoided, + after: { + STUCK_PROCESSING_TIMEOUT: { + target: `#SyncMachine.${SYNC_MACHINE_STATES.RECONNECTING}`, + actions: ['alertStuckProcessing'], + }, + }, invoke: { src: 'handleNcExecVoided', data: (_context: Context, event: Event) => event, @@ -311,6 +372,12 @@ export const SyncMachine = Machine({ }, [CONNECTED_STATES.handlingReorgStarted]: { id: CONNECTED_STATES.handlingReorgStarted, + after: { + STUCK_PROCESSING_TIMEOUT: { + target: `#SyncMachine.${SYNC_MACHINE_STATES.RECONNECTING}`, + actions: ['alertStuckProcessing'], + }, + }, invoke: { src: 'handleReorgStarted', data: (_context: Context, event: Event) => event, @@ -323,6 +390,12 @@ export const SyncMachine = Machine({ }, [CONNECTED_STATES.handlingTokenCreated]: { id: CONNECTED_STATES.handlingTokenCreated, + after: { + STUCK_PROCESSING_TIMEOUT: { + target: `#SyncMachine.${SYNC_MACHINE_STATES.RECONNECTING}`, + actions: ['alertStuckProcessing'], + }, + }, invoke: { src: 'handleTokenCreated', data: (_context: Context, event: Event) => event, @@ -335,6 +408,12 @@ export const SyncMachine = Machine({ }, [CONNECTED_STATES.checkingForMissedEvents]: { id: CONNECTED_STATES.checkingForMissedEvents, + after: { + STUCK_PROCESSING_TIMEOUT: { + target: `#SyncMachine.${SYNC_MACHINE_STATES.RECONNECTING}`, + actions: ['alertStuckProcessing'], + }, + }, invoke: { src: 'checkForMissedEvents', onDone: [{ @@ -360,7 +439,7 @@ export const SyncMachine = Machine({ [SYNC_MACHINE_STATES.ERROR]: { id: SYNC_MACHINE_STATES.ERROR, type: 'final', - onEntry: ['logEventError', 'stopHealthcheckPing'], + onEntry: ['logEventError', 'stopHealthcheckPing', 'sendMonitoringDisconnected'], }, }, }, { @@ -393,7 +472,7 @@ export const SyncMachine = Machine({ tokenCreated, hasNewEvents, }, - delays: { BACKOFF_DELAYED_RECONNECT, ACK_TIMEOUT }, + delays: { BACKOFF_DELAYED_RECONNECT, ACK_TIMEOUT, STUCK_PROCESSING_TIMEOUT }, actions: { storeInitialState, storeMetadataChanges, @@ -407,6 +486,11 @@ export const SyncMachine = Machine({ updateCache, startHealthcheckPing, stopHealthcheckPing, + sendMonitoringConnected, + sendMonitoringDisconnected, + sendMonitoringEventReceived, + sendMonitoringReconnecting, + alertStuckProcessing, }, }); diff --git a/packages/daemon/src/types/event.ts b/packages/daemon/src/types/event.ts index eebf0f96..a431ed98 100644 --- a/packages/daemon/src/types/event.ts +++ b/packages/daemon/src/types/event.ts @@ -28,11 +28,18 @@ export type HealthCheckEvent = | { type: 'START' } | { type: 'STOP' }; +export type MonitoringEvent = + | { type: 'CONNECTED' } + | { type: 'DISCONNECTED' } + | { type: 'EVENT_RECEIVED' } + | { type: 'RECONNECTING' }; + export enum EventTypes { WEBSOCKET_EVENT = 'WEBSOCKET_EVENT', FULLNODE_EVENT = 'FULLNODE_EVENT', WEBSOCKET_SEND_EVENT = 'WEBSOCKET_SEND_EVENT', HEALTHCHECK_EVENT = 'HEALTHCHECK_EVENT', + MONITORING_EVENT = 'MONITORING_EVENT', } export enum FullNodeEventTypes { @@ -72,7 +79,8 @@ export type Event = | { type: EventTypes.WEBSOCKET_EVENT, event: WebSocketEvent } | { type: EventTypes.FULLNODE_EVENT, event: FullNodeEvent } | { type: EventTypes.WEBSOCKET_SEND_EVENT, event: WebSocketSendEvent } - | { type: EventTypes.HEALTHCHECK_EVENT, event: HealthCheckEvent }; + | { type: EventTypes.HEALTHCHECK_EVENT, event: HealthCheckEvent } + | { type: EventTypes.MONITORING_EVENT, event: MonitoringEvent }; export interface VertexRemovedEventData { diff --git a/packages/daemon/src/types/machine.ts b/packages/daemon/src/types/machine.ts index 04871e9a..1c9ebff0 100644 --- a/packages/daemon/src/types/machine.ts +++ b/packages/daemon/src/types/machine.ts @@ -12,6 +12,7 @@ import { FullNodeEvent } from './event'; export interface Context { socket: ActorRef | null; healthcheck: ActorRef | null; + monitoring: ActorRef | null; retryAttempt: number; event?: FullNodeEvent | null; initialEventId: null | number; From 1cd9e561dceba72869dbbea31fdf498d14ba80ee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Abadesso?= Date: Tue, 24 Feb 2026 12:04:13 -0300 Subject: [PATCH 02/11] refactor(daemon): move stuck-processing timeout logic into MonitoringActor --- .../__tests__/actors/MonitoringActor.test.ts | 142 ++++++++++++------ packages/daemon/src/actions/index.ts | 37 +++-- packages/daemon/src/actors/MonitoringActor.ts | 83 +++++++--- packages/daemon/src/delays/index.ts | 5 - packages/daemon/src/machines/SyncMachine.ts | 103 ++++--------- packages/daemon/src/types/event.ts | 10 +- 6 files changed, 222 insertions(+), 158 deletions(-) diff --git a/packages/daemon/__tests__/actors/MonitoringActor.test.ts b/packages/daemon/__tests__/actors/MonitoringActor.test.ts index 9e410392..adcee8b4 100644 --- a/packages/daemon/__tests__/actors/MonitoringActor.test.ts +++ b/packages/daemon/__tests__/actors/MonitoringActor.test.ts @@ -14,6 +14,8 @@ import { addAlert } from '@wallet-service/common'; jest.useFakeTimers(); jest.spyOn(global, 'setInterval'); jest.spyOn(global, 'clearInterval'); +jest.spyOn(global, 'setTimeout'); +jest.spyOn(global, 'clearTimeout'); jest.mock('@wallet-service/common', () => ({ ...jest.requireActual('@wallet-service/common'), @@ -39,8 +41,9 @@ describe('MonitoringActor', () => { jest.clearAllMocks(); jest.clearAllTimers(); config = getConfig(); - config['IDLE_EVENT_TIMEOUT_MS'] = 5 * 60 * 1000; // 5 min - config['RECONNECTION_STORM_THRESHOLD'] = 3; // low threshold for tests + config['IDLE_EVENT_TIMEOUT_MS'] = 5 * 60 * 1000; // 5 min + config['STUCK_PROCESSING_TIMEOUT_MS'] = 5 * 60 * 1000; // 5 min + config['RECONNECTION_STORM_THRESHOLD'] = 3; // low threshold for tests config['RECONNECTION_STORM_WINDOW_MS'] = 5 * 60 * 1000; // 5 min mockCallback = jest.fn(); @@ -54,6 +57,8 @@ describe('MonitoringActor', () => { jest.useRealTimers(); }); + // ── Idle detection ─────────────────────────────────────────────────────────── + it('should not start the idle timer on initialization', () => { MonitoringActor(mockCallback, mockReceive, config); expect(setInterval).not.toHaveBeenCalled(); @@ -68,8 +73,6 @@ describe('MonitoringActor', () => { it('should stop the idle timer when receiving a DISCONNECTED event', () => { MonitoringActor(mockCallback, mockReceive, config); sendEvent('CONNECTED'); - expect(setInterval).toHaveBeenCalledTimes(1); - sendEvent('DISCONNECTED'); expect(clearInterval).toHaveBeenCalledTimes(1); }); @@ -77,8 +80,6 @@ describe('MonitoringActor', () => { it('should stop the idle timer when the actor is stopped', () => { const stopActor = MonitoringActor(mockCallback, mockReceive, config); sendEvent('CONNECTED'); - expect(setInterval).toHaveBeenCalledTimes(1); - stopActor(); expect(clearInterval).toHaveBeenCalledTimes(1); }); @@ -87,75 +88,143 @@ describe('MonitoringActor', () => { MonitoringActor(mockCallback, mockReceive, config); sendEvent('CONNECTED'); - // Advance time past the idle timeout jest.advanceTimersByTime(config['IDLE_EVENT_TIMEOUT_MS'] + 1); - - // Allow the async addAlert promise to resolve await Promise.resolve(); expect(mockAddAlert).toHaveBeenCalledTimes(1); expect(mockAddAlert.mock.calls[0][0]).toBe('Daemon Idle — No Events Received'); }); - it('should NOT fire an idle alert when events are being received', async () => { + it('should NOT fire an idle alert when events keep arriving', async () => { MonitoringActor(mockCallback, mockReceive, config); sendEvent('CONNECTED'); - // Advance to just before the timeout + // Stay below the threshold each time jest.advanceTimersByTime(config['IDLE_EVENT_TIMEOUT_MS'] - 1000); sendEvent('EVENT_RECEIVED'); - - // Advance past the original threshold (but lastEventReceivedAt was reset) jest.advanceTimersByTime(config['IDLE_EVENT_TIMEOUT_MS'] - 1000); await Promise.resolve(); expect(mockAddAlert).not.toHaveBeenCalled(); }); - it('should fire only one idle alert even if the timer fires multiple times', async () => { + it('should fire only one idle alert per idle period', async () => { MonitoringActor(mockCallback, mockReceive, config); sendEvent('CONNECTED'); - // Fire timer three times without receiving any events jest.advanceTimersByTime(config['IDLE_EVENT_TIMEOUT_MS'] * 3); - await Promise.resolve(); expect(mockAddAlert).toHaveBeenCalledTimes(1); }); - it('should reset the idle alert flag when an event is received after an alert fired', async () => { + it('should reset the idle alert flag when an event is received, allowing a second alert', async () => { MonitoringActor(mockCallback, mockReceive, config); sendEvent('CONNECTED'); - // Trigger first alert (interval fires at T = IDLE_EVENT_TIMEOUT_MS) + // Trigger first alert jest.advanceTimersByTime(config['IDLE_EVENT_TIMEOUT_MS'] + 1); await Promise.resolve(); expect(mockAddAlert).toHaveBeenCalledTimes(1); - // Receive an event — resets idleAlertFired and lastEventReceivedAt to current time T1 + // Receive an event — resets idleAlertFired and lastEventReceivedAt sendEvent('EVENT_RECEIVED'); - // The next interval tick where idleMs >= threshold is at 3*T (the interval at 2*T - // fires only T-1 ms after EVENT_RECEIVED, which is below the threshold). - // Advancing by 2*T from T1 guarantees we cross that boundary. + // Advance far enough for the interval to fire when idleMs >= threshold again. + // The interval fires at 2T, 3T, … from start. After EVENT_RECEIVED at ~T, + // the next fire where idleMs >= T is at 3T (fire at 2T gives idleMs = T-1). jest.advanceTimersByTime(2 * config['IDLE_EVENT_TIMEOUT_MS']); await Promise.resolve(); - // A second alert should now be fired expect(mockAddAlert).toHaveBeenCalledTimes(2); }); - it('should fire a reconnection storm alert when threshold is reached', async () => { + it('should restart the idle timer when CONNECTED is sent while already running', () => { + MonitoringActor(mockCallback, mockReceive, config); + sendEvent('CONNECTED'); + sendEvent('CONNECTED'); // second connect clears old and starts new + expect(clearInterval).toHaveBeenCalledTimes(1); + expect(setInterval).toHaveBeenCalledTimes(2); + }); + + // ── Stuck-processing detection ─────────────────────────────────────────────── + + it('should start a stuck timer on PROCESSING_STARTED', () => { + MonitoringActor(mockCallback, mockReceive, config); + sendEvent('PROCESSING_STARTED'); + expect(setTimeout).toHaveBeenCalledTimes(1); + }); + + it('should cancel the stuck timer on PROCESSING_COMPLETED', () => { + MonitoringActor(mockCallback, mockReceive, config); + sendEvent('PROCESSING_STARTED'); + sendEvent('PROCESSING_COMPLETED'); + expect(clearTimeout).toHaveBeenCalledTimes(1); + }); + + it('should fire a CRITICAL alert and call back MONITORING_STUCK_PROCESSING when stuck', async () => { + MonitoringActor(mockCallback, mockReceive, config); + sendEvent('PROCESSING_STARTED'); + + jest.advanceTimersByTime(config['STUCK_PROCESSING_TIMEOUT_MS'] + 1); + // Let the async addAlert inside the timeout resolve + await Promise.resolve(); + await Promise.resolve(); + + expect(mockAddAlert).toHaveBeenCalledTimes(1); + expect(mockAddAlert.mock.calls[0][0]).toBe('Daemon Stuck In Processing State'); + expect(mockCallback).toHaveBeenCalledWith({ type: EventTypes.MONITORING_STUCK_PROCESSING }); + }); + + it('should NOT fire the stuck alert when PROCESSING_COMPLETED arrives in time', async () => { + MonitoringActor(mockCallback, mockReceive, config); + sendEvent('PROCESSING_STARTED'); + + jest.advanceTimersByTime(config['STUCK_PROCESSING_TIMEOUT_MS'] - 1000); + sendEvent('PROCESSING_COMPLETED'); + + jest.advanceTimersByTime(2000); // advance past original timeout + await Promise.resolve(); + + expect(mockAddAlert).not.toHaveBeenCalled(); + expect(mockCallback).not.toHaveBeenCalled(); + }); + + it('should reset the stuck timer on consecutive PROCESSING_STARTED events', () => { + MonitoringActor(mockCallback, mockReceive, config); + sendEvent('PROCESSING_STARTED'); + sendEvent('PROCESSING_STARTED'); // second one clears the first + expect(clearTimeout).toHaveBeenCalledTimes(1); + expect(setTimeout).toHaveBeenCalledTimes(2); + }); + + it('should stop the stuck timer when the actor is stopped', () => { + const stopActor = MonitoringActor(mockCallback, mockReceive, config); + sendEvent('PROCESSING_STARTED'); + stopActor(); + expect(clearTimeout).toHaveBeenCalledTimes(1); + }); + + it('should also clear the stuck timer on DISCONNECTED', () => { + MonitoringActor(mockCallback, mockReceive, config); + sendEvent('CONNECTED'); + sendEvent('PROCESSING_STARTED'); + sendEvent('DISCONNECTED'); + // clearTimeout for stuck timer + clearInterval for idle timer + expect(clearTimeout).toHaveBeenCalledTimes(1); + expect(clearInterval).toHaveBeenCalledTimes(1); + }); + + // ── Reconnection storm detection ───────────────────────────────────────────── + + it('should fire a reconnection storm alert when the threshold is reached', async () => { MonitoringActor(mockCallback, mockReceive, config); - // Send enough reconnections to trigger the storm threshold (3 in our test config) - sendEvent('RECONNECTING'); sendEvent('RECONNECTING'); sendEvent('RECONNECTING'); + sendEvent('RECONNECTING'); // threshold is 3 in test config await Promise.resolve(); - expect(mockAddAlert).toHaveBeenCalledTimes(1); expect(mockAddAlert.mock.calls[0][0]).toBe('Daemon Reconnection Storm'); }); @@ -163,7 +232,6 @@ describe('MonitoringActor', () => { it('should NOT fire a reconnection storm alert below the threshold', async () => { MonitoringActor(mockCallback, mockReceive, config); - // Send fewer reconnections than the threshold sendEvent('RECONNECTING'); sendEvent('RECONNECTING'); @@ -174,33 +242,21 @@ describe('MonitoringActor', () => { it('should evict old reconnections outside the storm window', async () => { MonitoringActor(mockCallback, mockReceive, config); - // Two reconnections at time 0 sendEvent('RECONNECTING'); sendEvent('RECONNECTING'); - // Advance past the storm window so those timestamps are evicted jest.advanceTimersByTime(config['RECONNECTION_STORM_WINDOW_MS'] + 1000); - // One new reconnection — count should restart from 1, no alert + // Only 1 new reconnection — below threshold after eviction sendEvent('RECONNECTING'); await Promise.resolve(); expect(mockAddAlert).not.toHaveBeenCalled(); }); - it('should restart idle timer when CONNECTED is sent while already connected', () => { - MonitoringActor(mockCallback, mockReceive, config); - - sendEvent('CONNECTED'); - expect(setInterval).toHaveBeenCalledTimes(1); - - // A second CONNECTED clears the old timer and creates a new one - sendEvent('CONNECTED'); - expect(clearInterval).toHaveBeenCalledTimes(1); - expect(setInterval).toHaveBeenCalledTimes(2); - }); + // ── Misc ───────────────────────────────────────────────────────────────────── - it('should ignore events of other types', () => { + it('should ignore events of other types and log a warning', () => { const warnSpy = jest.spyOn(logger, 'warn'); MonitoringActor(mockCallback, mockReceive, config); diff --git a/packages/daemon/src/actions/index.ts b/packages/daemon/src/actions/index.ts index c05546ab..3f09999f 100644 --- a/packages/daemon/src/actions/index.ts +++ b/packages/daemon/src/actions/index.ts @@ -12,7 +12,6 @@ import logger from '../logger'; import { hashTxData } from '../utils'; import { createStartStreamMessage, createSendAckMessage } from '../actors'; import { bigIntUtils } from '@hathor/wallet-lib'; -import { addAlert, Severity } from '@wallet-service/common'; /* * This action is used to store the initial event id on the context @@ -243,23 +242,21 @@ export const sendMonitoringReconnecting = sendTo( ); /* - * Fires a CRITICAL alert and logs when the machine has been stuck in a processing - * state for longer than STUCK_PROCESSING_TIMEOUT_MS. The machine will transition to - * RECONNECTING immediately after this action runs. + * Notifies the monitoring actor that a processing state was entered. + * The actor starts a stuck-detection timer; if PROCESSING_COMPLETED doesn't + * arrive within STUCK_PROCESSING_TIMEOUT_MS it fires a CRITICAL alert and sends + * MONITORING_STUCK_PROCESSING back to the machine. */ -export const alertStuckProcessing = (context: Context) => { - const eventId = context.event?.event?.id; - logger.error( - `[monitoring] State machine stuck processing event ${eventId ?? 'unknown'} for too long — forcing reconnection`, - ); - addAlert( - 'Daemon Stuck In Processing State', - `The state machine has been processing event ${eventId ?? 'unknown'} ` + - 'for longer than the configured timeout. Forcing a reconnection.', - Severity.CRITICAL, - { eventId: eventId !== undefined ? String(eventId) : 'unknown' }, - logger, - ).catch((err: Error) => - logger.error(`[monitoring] Failed to send stuck-processing alert: ${err}`), - ); -}; +export const sendMonitoringProcessingStarted = sendTo( + getMonitoringRefFromContext, + { type: EventTypes.MONITORING_EVENT, event: { type: 'PROCESSING_STARTED' } }, +); + +/* + * Notifies the monitoring actor that a processing state was exited normally, + * cancelling the stuck-detection timer. + */ +export const sendMonitoringProcessingCompleted = sendTo( + getMonitoringRefFromContext, + { type: EventTypes.MONITORING_EVENT, event: { type: 'PROCESSING_COMPLETED' } }, +); diff --git a/packages/daemon/src/actors/MonitoringActor.ts b/packages/daemon/src/actors/MonitoringActor.ts index 673a313f..5da0371a 100644 --- a/packages/daemon/src/actors/MonitoringActor.ts +++ b/packages/daemon/src/actors/MonitoringActor.ts @@ -13,32 +13,35 @@ import { Event, EventTypes } from '../types'; /** * MonitoringActor * - * Watches the state machine for anomalies and raises alerts via the alert manager. + * Centralises all runtime health monitoring for the sync state machine. + * The machine sends MONITORING_EVENTs to this actor; when anomalies are detected + * the actor fires alerts and, when necessary, sends MONITORING_STUCK_PROCESSING + * back to the machine to trigger a soft reconnect. * - * Monitors: - * 1. No events received for >IDLE_EVENT_TIMEOUT_MS while WebSocket connected — fires a MAJOR alert - * so operators know the fullnode stream may have stalled without a disconnect. - * 2. Reconnection storm — fires a CRITICAL alert if the daemon reconnects more than - * RECONNECTION_STORM_THRESHOLD times within RECONNECTION_STORM_WINDOW_MS. This catches - * pathological thrash-reconnect cycles that would otherwise be silent. + * Responsibilities: * - * The actor receives MONITORING_EVENTs from the SyncMachine: - * - CONNECTED: WebSocket became connected; starts the idle-event timer. - * - DISCONNECTED: WebSocket disconnected; stops the idle-event timer. - * - EVENT_RECEIVED: A fullnode event arrived (resets the idle timer). - * - RECONNECTING: Machine entered RECONNECTING state (used for storm detection). + * 1. Idle-stream detection — no EVENT_RECEIVED for >IDLE_EVENT_TIMEOUT_MS while + * connected fires a MAJOR alert so operators know the fullnode stream may have + * stalled without triggering a WebSocket disconnect. + * + * 2. Stuck-processing detection — PROCESSING_STARTED begins a one-shot timer; + * PROCESSING_COMPLETED cancels it. If the timer fires the actor fires a + * CRITICAL alert and sends MONITORING_STUCK_PROCESSING to the machine, which + * transitions to RECONNECTING. This replaces the per-state XState `after` + * blocks that previously scattered this logic across every processing state. + * + * 3. Reconnection storm detection — fires a CRITICAL alert when the daemon + * reconnects more than RECONNECTION_STORM_THRESHOLD times within + * RECONNECTION_STORM_WINDOW_MS. */ export default (callback: any, receive: any, config = getConfig()) => { logger.info('Starting monitoring actor'); + // ── Idle detection ────────────────────────────────────────────────────────── let isConnected = false; let lastEventReceivedAt: number | null = null; - // Timer that fires when we have been idle (no EVENT_RECEIVED) for too long let idleCheckTimer: ReturnType | null = null; - // Whether we already fired the current idle alert (avoids alert flood) let idleAlertFired = false; - // Rolling list of reconnection timestamps within the storm window - let reconnectionTimestamps: number[] = []; const startIdleCheck = () => { stopIdleCheck(); @@ -76,11 +79,44 @@ export default (callback: any, receive: any, config = getConfig()) => { } }; + // ── Stuck-processing detection ─────────────────────────────────────────────── + let stuckTimer: ReturnType | null = null; + + const startStuckTimer = () => { + clearStuckTimer(); + stuckTimer = setTimeout(async () => { + logger.error('[monitoring] State machine stuck in processing state — forcing reconnection'); + try { + await addAlert( + 'Daemon Stuck In Processing State', + `The state machine has been processing a single event for more than ` + + `${Math.round(config.STUCK_PROCESSING_TIMEOUT_MS / 60000)} minute(s). ` + + 'Forcing a reconnection.', + Severity.CRITICAL, + { timeoutMs: String(config.STUCK_PROCESSING_TIMEOUT_MS) }, + logger, + ); + } catch (err) { + logger.error(`[monitoring] Failed to send stuck-processing alert: ${err}`); + } + callback({ type: EventTypes.MONITORING_STUCK_PROCESSING }); + }, config.STUCK_PROCESSING_TIMEOUT_MS); + }; + + const clearStuckTimer = () => { + if (stuckTimer) { + clearTimeout(stuckTimer); + stuckTimer = null; + } + }; + + // ── Reconnection storm detection ───────────────────────────────────────────── + let reconnectionTimestamps: number[] = []; + const trackReconnection = () => { const now = Date.now(); reconnectionTimestamps.push(now); - // Evict timestamps outside the rolling window const windowStart = now - config.RECONNECTION_STORM_WINDOW_MS; reconnectionTimestamps = reconnectionTimestamps.filter(t => t >= windowStart); @@ -105,6 +141,7 @@ export default (callback: any, receive: any, config = getConfig()) => { } }; + // ── Event handling ──────────────────────────────────────────────────────────── receive((event: Event) => { if (event.type !== EventTypes.MONITORING_EVENT) { logger.warn('[monitoring] Unexpected event type received by MonitoringActor'); @@ -119,9 +156,10 @@ export default (callback: any, receive: any, config = getConfig()) => { break; case 'DISCONNECTED': - logger.info('[monitoring] WebSocket disconnected — stopping idle-event timer'); + logger.info('[monitoring] WebSocket disconnected — stopping timers'); isConnected = false; stopIdleCheck(); + clearStuckTimer(); break; case 'EVENT_RECEIVED': @@ -129,6 +167,14 @@ export default (callback: any, receive: any, config = getConfig()) => { idleAlertFired = false; break; + case 'PROCESSING_STARTED': + startStuckTimer(); + break; + + case 'PROCESSING_COMPLETED': + clearStuckTimer(); + break; + case 'RECONNECTING': trackReconnection(); break; @@ -138,5 +184,6 @@ export default (callback: any, receive: any, config = getConfig()) => { return () => { logger.info('Stopping monitoring actor'); stopIdleCheck(); + clearStuckTimer(); }; }; diff --git a/packages/daemon/src/delays/index.ts b/packages/daemon/src/delays/index.ts index faf70d9b..0ad82103 100644 --- a/packages/daemon/src/delays/index.ts +++ b/packages/daemon/src/delays/index.ts @@ -25,8 +25,3 @@ export const ACK_TIMEOUT = () => { return ACK_TIMEOUT_MS; }; -// Timeout before auto-restarting when stuck in a single processing state -export const STUCK_PROCESSING_TIMEOUT = () => { - const { STUCK_PROCESSING_TIMEOUT_MS } = getConfig(); - return STUCK_PROCESSING_TIMEOUT_MS; -}; diff --git a/packages/daemon/src/machines/SyncMachine.ts b/packages/daemon/src/machines/SyncMachine.ts index a3f6486d..f21c1cb5 100644 --- a/packages/daemon/src/machines/SyncMachine.ts +++ b/packages/daemon/src/machines/SyncMachine.ts @@ -15,6 +15,7 @@ import { WebSocketActor, HealthCheckActor, MonitoringActor } from '../actors'; import { Context, Event, + EventTypes, } from '../types'; import { handleVertexAccepted, @@ -63,9 +64,10 @@ import { sendMonitoringDisconnected, sendMonitoringEventReceived, sendMonitoringReconnecting, - alertStuckProcessing, + sendMonitoringProcessingStarted, + sendMonitoringProcessingCompleted, } from '../actions'; -import { BACKOFF_DELAYED_RECONNECT, ACK_TIMEOUT, STUCK_PROCESSING_TIMEOUT } from '../delays'; +import { BACKOFF_DELAYED_RECONNECT, ACK_TIMEOUT } from '../delays'; import getConfig from '../config'; export const SYNC_MACHINE_STATES = { @@ -209,12 +211,8 @@ export const SyncMachine = Machine({ }, [CONNECTED_STATES.handlingUnhandledEvent]: { id: CONNECTED_STATES.handlingUnhandledEvent, - after: { - STUCK_PROCESSING_TIMEOUT: { - target: `#SyncMachine.${SYNC_MACHINE_STATES.RECONNECTING}`, - actions: ['alertStuckProcessing'], - }, - }, + entry: ['sendMonitoringProcessingStarted'], + exit: ['sendMonitoringProcessingCompleted'], invoke: { src: 'updateLastSyncedEvent', onDone: { @@ -227,12 +225,8 @@ export const SyncMachine = Machine({ [CONNECTED_STATES.handlingMetadataChanged]: { id: 'handlingMetadataChanged', initial: 'detectingDiff', - after: { - STUCK_PROCESSING_TIMEOUT: { - target: `#SyncMachine.${SYNC_MACHINE_STATES.RECONNECTING}`, - actions: ['alertStuckProcessing'], - }, - }, + entry: ['sendMonitoringProcessingStarted'], + exit: ['sendMonitoringProcessingCompleted'], states: { detectingDiff: { invoke: { @@ -261,12 +255,8 @@ export const SyncMachine = Machine({ // We have the unchanged guard, so it's guaranteed that this is a new tx [CONNECTED_STATES.handlingVertexAccepted]: { id: CONNECTED_STATES.handlingVertexAccepted, - after: { - STUCK_PROCESSING_TIMEOUT: { - target: `#SyncMachine.${SYNC_MACHINE_STATES.RECONNECTING}`, - actions: ['alertStuckProcessing'], - }, - }, + entry: ['sendMonitoringProcessingStarted'], + exit: ['sendMonitoringProcessingCompleted'], invoke: { src: 'handleVertexAccepted', data: (_context: Context, event: Event) => event, @@ -279,12 +269,8 @@ export const SyncMachine = Machine({ }, [CONNECTED_STATES.handlingVertexRemoved]: { id: CONNECTED_STATES.handlingVertexRemoved, - after: { - STUCK_PROCESSING_TIMEOUT: { - target: `#SyncMachine.${SYNC_MACHINE_STATES.RECONNECTING}`, - actions: ['alertStuckProcessing'], - }, - }, + entry: ['sendMonitoringProcessingStarted'], + exit: ['sendMonitoringProcessingCompleted'], invoke: { src: 'handleVertexRemoved', data: (_context: Context, event: Event) => event, @@ -297,12 +283,8 @@ export const SyncMachine = Machine({ }, [CONNECTED_STATES.handlingVoidedTx]: { id: CONNECTED_STATES.handlingVoidedTx, - after: { - STUCK_PROCESSING_TIMEOUT: { - target: `#SyncMachine.${SYNC_MACHINE_STATES.RECONNECTING}`, - actions: ['alertStuckProcessing'], - }, - }, + entry: ['sendMonitoringProcessingStarted'], + exit: ['sendMonitoringProcessingCompleted'], invoke: { src: 'handleVoidedTx', data: (_context: Context, event: Event) => event, @@ -315,12 +297,8 @@ export const SyncMachine = Machine({ }, [CONNECTED_STATES.handlingUnvoidedTx]: { id: CONNECTED_STATES.handlingUnvoidedTx, - after: { - STUCK_PROCESSING_TIMEOUT: { - target: `#SyncMachine.${SYNC_MACHINE_STATES.RECONNECTING}`, - actions: ['alertStuckProcessing'], - }, - }, + entry: ['sendMonitoringProcessingStarted'], + exit: ['sendMonitoringProcessingCompleted'], invoke: { src: 'handleUnvoidedTx', data: (_context: Context, event: Event) => event, @@ -336,12 +314,8 @@ export const SyncMachine = Machine({ }, [CONNECTED_STATES.handlingFirstBlock]: { id: CONNECTED_STATES.handlingFirstBlock, - after: { - STUCK_PROCESSING_TIMEOUT: { - target: `#SyncMachine.${SYNC_MACHINE_STATES.RECONNECTING}`, - actions: ['alertStuckProcessing'], - }, - }, + entry: ['sendMonitoringProcessingStarted'], + exit: ['sendMonitoringProcessingCompleted'], invoke: { src: 'handleTxFirstBlock', data: (_context: Context, event: Event) => event, @@ -354,12 +328,8 @@ export const SyncMachine = Machine({ }, [CONNECTED_STATES.handlingNcExecVoided]: { id: CONNECTED_STATES.handlingNcExecVoided, - after: { - STUCK_PROCESSING_TIMEOUT: { - target: `#SyncMachine.${SYNC_MACHINE_STATES.RECONNECTING}`, - actions: ['alertStuckProcessing'], - }, - }, + entry: ['sendMonitoringProcessingStarted'], + exit: ['sendMonitoringProcessingCompleted'], invoke: { src: 'handleNcExecVoided', data: (_context: Context, event: Event) => event, @@ -372,12 +342,8 @@ export const SyncMachine = Machine({ }, [CONNECTED_STATES.handlingReorgStarted]: { id: CONNECTED_STATES.handlingReorgStarted, - after: { - STUCK_PROCESSING_TIMEOUT: { - target: `#SyncMachine.${SYNC_MACHINE_STATES.RECONNECTING}`, - actions: ['alertStuckProcessing'], - }, - }, + entry: ['sendMonitoringProcessingStarted'], + exit: ['sendMonitoringProcessingCompleted'], invoke: { src: 'handleReorgStarted', data: (_context: Context, event: Event) => event, @@ -390,12 +356,8 @@ export const SyncMachine = Machine({ }, [CONNECTED_STATES.handlingTokenCreated]: { id: CONNECTED_STATES.handlingTokenCreated, - after: { - STUCK_PROCESSING_TIMEOUT: { - target: `#SyncMachine.${SYNC_MACHINE_STATES.RECONNECTING}`, - actions: ['alertStuckProcessing'], - }, - }, + entry: ['sendMonitoringProcessingStarted'], + exit: ['sendMonitoringProcessingCompleted'], invoke: { src: 'handleTokenCreated', data: (_context: Context, event: Event) => event, @@ -408,12 +370,8 @@ export const SyncMachine = Machine({ }, [CONNECTED_STATES.checkingForMissedEvents]: { id: CONNECTED_STATES.checkingForMissedEvents, - after: { - STUCK_PROCESSING_TIMEOUT: { - target: `#SyncMachine.${SYNC_MACHINE_STATES.RECONNECTING}`, - actions: ['alertStuckProcessing'], - }, - }, + entry: ['sendMonitoringProcessingStarted'], + exit: ['sendMonitoringProcessingCompleted'], invoke: { src: 'checkForMissedEvents', onDone: [{ @@ -434,6 +392,10 @@ export const SyncMachine = Machine({ cond: 'websocketDisconnected', target: SYNC_MACHINE_STATES.RECONNECTING, }], + // Sent by MonitoringActor when a processing state has been active for too long + [EventTypes.MONITORING_STUCK_PROCESSING]: { + target: `#SyncMachine.${SYNC_MACHINE_STATES.RECONNECTING}`, + }, }, }, [SYNC_MACHINE_STATES.ERROR]: { @@ -472,7 +434,7 @@ export const SyncMachine = Machine({ tokenCreated, hasNewEvents, }, - delays: { BACKOFF_DELAYED_RECONNECT, ACK_TIMEOUT, STUCK_PROCESSING_TIMEOUT }, + delays: { BACKOFF_DELAYED_RECONNECT, ACK_TIMEOUT }, actions: { storeInitialState, storeMetadataChanges, @@ -490,7 +452,8 @@ export const SyncMachine = Machine({ sendMonitoringDisconnected, sendMonitoringEventReceived, sendMonitoringReconnecting, - alertStuckProcessing, + sendMonitoringProcessingStarted, + sendMonitoringProcessingCompleted, }, }); diff --git a/packages/daemon/src/types/event.ts b/packages/daemon/src/types/event.ts index a431ed98..546a8255 100644 --- a/packages/daemon/src/types/event.ts +++ b/packages/daemon/src/types/event.ts @@ -32,7 +32,9 @@ export type MonitoringEvent = | { type: 'CONNECTED' } | { type: 'DISCONNECTED' } | { type: 'EVENT_RECEIVED' } - | { type: 'RECONNECTING' }; + | { type: 'RECONNECTING' } + | { type: 'PROCESSING_STARTED' } + | { type: 'PROCESSING_COMPLETED' }; export enum EventTypes { WEBSOCKET_EVENT = 'WEBSOCKET_EVENT', @@ -40,6 +42,7 @@ export enum EventTypes { WEBSOCKET_SEND_EVENT = 'WEBSOCKET_SEND_EVENT', HEALTHCHECK_EVENT = 'HEALTHCHECK_EVENT', MONITORING_EVENT = 'MONITORING_EVENT', + MONITORING_STUCK_PROCESSING = 'MONITORING_STUCK_PROCESSING', } export enum FullNodeEventTypes { @@ -80,7 +83,10 @@ export type Event = | { type: EventTypes.FULLNODE_EVENT, event: FullNodeEvent } | { type: EventTypes.WEBSOCKET_SEND_EVENT, event: WebSocketSendEvent } | { type: EventTypes.HEALTHCHECK_EVENT, event: HealthCheckEvent } - | { type: EventTypes.MONITORING_EVENT, event: MonitoringEvent }; + | { type: EventTypes.MONITORING_EVENT, event: MonitoringEvent } + // Machine-internal signal sent by MonitoringActor when a processing state is stuck. + // `event: never` keeps the union consistent (all members have an `event` field). + | { type: EventTypes.MONITORING_STUCK_PROCESSING, event: never }; export interface VertexRemovedEventData { From ee88ca9c7521b57cb029da60a9722213a9df4296 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Abadesso?= Date: Tue, 24 Feb 2026 12:25:59 -0300 Subject: [PATCH 03/11] fix(daemon): mock addAlert in integration tests to prevent real SQS calls --- packages/daemon/jestIntegrationSetup.ts | 16 ++++++++++++++++ packages/daemon/jest_integration.config.js | 1 + 2 files changed, 17 insertions(+) create mode 100644 packages/daemon/jestIntegrationSetup.ts diff --git a/packages/daemon/jestIntegrationSetup.ts b/packages/daemon/jestIntegrationSetup.ts new file mode 100644 index 00000000..187f7a26 --- /dev/null +++ b/packages/daemon/jestIntegrationSetup.ts @@ -0,0 +1,16 @@ +/** + * Copyright (c) Hathor Labs and its affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +/** + * Integration test environment setup. + * Mocks addAlert so MonitoringActor does not attempt real SQS/SNS connections + * in environments where AWS credentials / region are not configured. + */ +jest.mock('@wallet-service/common', () => ({ + ...jest.requireActual('@wallet-service/common'), + addAlert: jest.fn().mockResolvedValue(undefined), +})); diff --git a/packages/daemon/jest_integration.config.js b/packages/daemon/jest_integration.config.js index 86722d5e..2dfc9ff2 100644 --- a/packages/daemon/jest_integration.config.js +++ b/packages/daemon/jest_integration.config.js @@ -7,6 +7,7 @@ const mainTestMatch = process.env.SPECIFIC_INTEGRATION_TEST_FILE module.exports = { roots: ["/__tests__"], setupFiles: ['./jestSetup.ts'], + setupFilesAfterEnv: ['./jestIntegrationSetup.ts'], transform: { "^.+\\.ts$": ["ts-jest", { tsconfig: "./tsconfig.json", From 6a96883e9dd6897291f3c09ad895a9685fb59bcf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Abadesso?= Date: Tue, 24 Feb 2026 12:28:04 -0300 Subject: [PATCH 04/11] fix(daemon): add missing monitoring config keys to integration test mock --- packages/daemon/__tests__/integration/balances.test.ts | 4 ++++ packages/daemon/__tests__/integration/token_creation.test.ts | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/packages/daemon/__tests__/integration/balances.test.ts b/packages/daemon/__tests__/integration/balances.test.ts index 5d0565ac..488ffb8c 100644 --- a/packages/daemon/__tests__/integration/balances.test.ts +++ b/packages/daemon/__tests__/integration/balances.test.ts @@ -96,6 +96,10 @@ getConfig.mockReturnValue({ DB_PASS, DB_PORT, ACK_TIMEOUT_MS: 20000, + IDLE_EVENT_TIMEOUT_MS: 5 * 60 * 1000, + STUCK_PROCESSING_TIMEOUT_MS: 5 * 60 * 1000, + RECONNECTION_STORM_THRESHOLD: 10, + RECONNECTION_STORM_WINDOW_MS: 5 * 60 * 1000, }); let mysql: Connection; diff --git a/packages/daemon/__tests__/integration/token_creation.test.ts b/packages/daemon/__tests__/integration/token_creation.test.ts index afcf0f3a..30f374eb 100644 --- a/packages/daemon/__tests__/integration/token_creation.test.ts +++ b/packages/daemon/__tests__/integration/token_creation.test.ts @@ -57,6 +57,10 @@ getConfig.mockReturnValue({ DB_PASS, DB_PORT, ACK_TIMEOUT_MS: 20000, + IDLE_EVENT_TIMEOUT_MS: 5 * 60 * 1000, + STUCK_PROCESSING_TIMEOUT_MS: 5 * 60 * 1000, + RECONNECTION_STORM_THRESHOLD: 10, + RECONNECTION_STORM_WINDOW_MS: 5 * 60 * 1000, }); let mysql: Connection; From 0cb4b6184b9d4d9d4a85a508f721b3385c4b48f6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Abadesso?= Date: Tue, 24 Feb 2026 12:35:57 -0300 Subject: [PATCH 05/11] =?UTF-8?q?fix(daemon):=20adjust=20alert=20severitie?= =?UTF-8?q?s=20=E2=80=94=20idle=3DMINOR,=20stuck/storm=3DMAJOR?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- packages/daemon/src/actors/MonitoringActor.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/daemon/src/actors/MonitoringActor.ts b/packages/daemon/src/actors/MonitoringActor.ts index 5da0371a..d1e5e995 100644 --- a/packages/daemon/src/actors/MonitoringActor.ts +++ b/packages/daemon/src/actors/MonitoringActor.ts @@ -62,7 +62,7 @@ export default (callback: any, receive: any, config = getConfig()) => { 'Daemon Idle — No Events Received', `No fullnode events received for ${idleMinutes} minute(s) while the WebSocket is connected. ` + 'The fullnode stream may be stalled.', - Severity.MAJOR, + Severity.MINOR, { idleMs: String(idleMs) }, logger, ).catch((err: Error) => @@ -92,7 +92,7 @@ export default (callback: any, receive: any, config = getConfig()) => { `The state machine has been processing a single event for more than ` + `${Math.round(config.STUCK_PROCESSING_TIMEOUT_MS / 60000)} minute(s). ` + 'Forcing a reconnection.', - Severity.CRITICAL, + Severity.MAJOR, { timeoutMs: String(config.STUCK_PROCESSING_TIMEOUT_MS) }, logger, ); @@ -129,7 +129,7 @@ export default (callback: any, receive: any, config = getConfig()) => { 'Daemon Reconnection Storm', `${reconnectionTimestamps.length} reconnections occurred in the last ${windowMinutes} minute(s). ` + 'The daemon may be stuck in a reconnection loop.', - Severity.CRITICAL, + Severity.MAJOR, { reconnectionCount: String(reconnectionTimestamps.length), windowMinutes: String(windowMinutes), From fd50522acf2a173e1393d9c64b0a8a39ca8017dc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Abadesso?= Date: Tue, 24 Feb 2026 12:39:35 -0300 Subject: [PATCH 06/11] feat(daemon): terminate process on idle timeout to allow kubernetes restart --- .../__tests__/actors/MonitoringActor.test.ts | 19 +++++++++++++++---- packages/daemon/src/actors/MonitoringActor.ts | 12 +++++------- 2 files changed, 20 insertions(+), 11 deletions(-) diff --git a/packages/daemon/__tests__/actors/MonitoringActor.test.ts b/packages/daemon/__tests__/actors/MonitoringActor.test.ts index adcee8b4..15b2161a 100644 --- a/packages/daemon/__tests__/actors/MonitoringActor.test.ts +++ b/packages/daemon/__tests__/actors/MonitoringActor.test.ts @@ -29,6 +29,7 @@ describe('MonitoringActor', () => { let mockReceive: jest.Mock; let receiveCallback: (event: any) => void; let config: ReturnType; + let processExitSpy: jest.SpyInstance; const sendEvent = (monitoringEventType: string) => { receiveCallback({ @@ -40,6 +41,7 @@ describe('MonitoringActor', () => { beforeEach(() => { jest.clearAllMocks(); jest.clearAllTimers(); + processExitSpy = jest.spyOn(process, 'exit').mockImplementation(() => undefined as never); config = getConfig(); config['IDLE_EVENT_TIMEOUT_MS'] = 5 * 60 * 1000; // 5 min config['STUCK_PROCESSING_TIMEOUT_MS'] = 5 * 60 * 1000; // 5 min @@ -84,15 +86,17 @@ describe('MonitoringActor', () => { expect(clearInterval).toHaveBeenCalledTimes(1); }); - it('should fire an idle alert after IDLE_EVENT_TIMEOUT_MS with no events', async () => { + it('should fire an idle alert and exit after IDLE_EVENT_TIMEOUT_MS with no events', async () => { MonitoringActor(mockCallback, mockReceive, config); sendEvent('CONNECTED'); jest.advanceTimersByTime(config['IDLE_EVENT_TIMEOUT_MS'] + 1); await Promise.resolve(); + await Promise.resolve(); // flush the .finally() microtask expect(mockAddAlert).toHaveBeenCalledTimes(1); expect(mockAddAlert.mock.calls[0][0]).toBe('Daemon Idle — No Events Received'); + expect(processExitSpy).toHaveBeenCalledWith(1); }); it('should NOT fire an idle alert when events keep arriving', async () => { @@ -108,24 +112,29 @@ describe('MonitoringActor', () => { expect(mockAddAlert).not.toHaveBeenCalled(); }); - it('should fire only one idle alert per idle period', async () => { + it('should fire only one idle alert and exit once per idle period', async () => { MonitoringActor(mockCallback, mockReceive, config); sendEvent('CONNECTED'); jest.advanceTimersByTime(config['IDLE_EVENT_TIMEOUT_MS'] * 3); await Promise.resolve(); + await Promise.resolve(); expect(mockAddAlert).toHaveBeenCalledTimes(1); + expect(processExitSpy).toHaveBeenCalledTimes(1); + expect(processExitSpy).toHaveBeenCalledWith(1); }); - it('should reset the idle alert flag when an event is received, allowing a second alert', async () => { + it('should reset the idle alert flag when an event is received, allowing a second exit', async () => { MonitoringActor(mockCallback, mockReceive, config); sendEvent('CONNECTED'); - // Trigger first alert + // Trigger first alert + exit jest.advanceTimersByTime(config['IDLE_EVENT_TIMEOUT_MS'] + 1); await Promise.resolve(); + await Promise.resolve(); expect(mockAddAlert).toHaveBeenCalledTimes(1); + expect(processExitSpy).toHaveBeenCalledTimes(1); // Receive an event — resets idleAlertFired and lastEventReceivedAt sendEvent('EVENT_RECEIVED'); @@ -135,8 +144,10 @@ describe('MonitoringActor', () => { // the next fire where idleMs >= T is at 3T (fire at 2T gives idleMs = T-1). jest.advanceTimersByTime(2 * config['IDLE_EVENT_TIMEOUT_MS']); await Promise.resolve(); + await Promise.resolve(); expect(mockAddAlert).toHaveBeenCalledTimes(2); + expect(processExitSpy).toHaveBeenCalledTimes(2); }); it('should restart the idle timer when CONNECTED is sent while already running', () => { diff --git a/packages/daemon/src/actors/MonitoringActor.ts b/packages/daemon/src/actors/MonitoringActor.ts index d1e5e995..56b1bc8a 100644 --- a/packages/daemon/src/actors/MonitoringActor.ts +++ b/packages/daemon/src/actors/MonitoringActor.ts @@ -55,19 +55,17 @@ export default (callback: any, receive: any, config = getConfig()) => { if (idleMs >= config.IDLE_EVENT_TIMEOUT_MS && !idleAlertFired) { idleAlertFired = true; const idleMinutes = Math.round(idleMs / 60000); - logger.warn( - `[monitoring] No fullnode events received for ${idleMinutes} minutes while WebSocket is connected`, + logger.error( + `[monitoring] No fullnode events received for ${idleMinutes} minutes while WebSocket is connected — terminating`, ); addAlert( 'Daemon Idle — No Events Received', `No fullnode events received for ${idleMinutes} minute(s) while the WebSocket is connected. ` + - 'The fullnode stream may be stalled.', - Severity.MINOR, + 'Terminating the process so Kubernetes can restart it.', + Severity.MAJOR, { idleMs: String(idleMs) }, logger, - ).catch((err: Error) => - logger.error(`[monitoring] Failed to send idle alert: ${err}`), - ); + ).finally(() => process.exit(1)); } }, config.IDLE_EVENT_TIMEOUT_MS); }; From 78ba9c1cdb3a15ed9c327d7592247dcb52bc1b2a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Abadesso?= Date: Tue, 24 Feb 2026 12:47:56 -0300 Subject: [PATCH 07/11] fix(daemon): apply defaults for monitoring config values to guard against undefined --- packages/daemon/src/actors/MonitoringActor.ts | 26 +++++++++++++------ 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/packages/daemon/src/actors/MonitoringActor.ts b/packages/daemon/src/actors/MonitoringActor.ts index 56b1bc8a..cf1425e0 100644 --- a/packages/daemon/src/actors/MonitoringActor.ts +++ b/packages/daemon/src/actors/MonitoringActor.ts @@ -34,9 +34,19 @@ import { Event, EventTypes } from '../types'; * reconnects more than RECONNECTION_STORM_THRESHOLD times within * RECONNECTION_STORM_WINDOW_MS. */ +const DEFAULT_IDLE_EVENT_TIMEOUT_MS = 5 * 60 * 1000; +const DEFAULT_STUCK_PROCESSING_TIMEOUT_MS = 5 * 60 * 1000; +const DEFAULT_RECONNECTION_STORM_THRESHOLD = 10; +const DEFAULT_RECONNECTION_STORM_WINDOW_MS = 5 * 60 * 1000; + export default (callback: any, receive: any, config = getConfig()) => { logger.info('Starting monitoring actor'); + const idleTimeoutMs = config.IDLE_EVENT_TIMEOUT_MS ?? DEFAULT_IDLE_EVENT_TIMEOUT_MS; + const stuckTimeoutMs = config.STUCK_PROCESSING_TIMEOUT_MS ?? DEFAULT_STUCK_PROCESSING_TIMEOUT_MS; + const stormThreshold = config.RECONNECTION_STORM_THRESHOLD ?? DEFAULT_RECONNECTION_STORM_THRESHOLD; + const stormWindowMs = config.RECONNECTION_STORM_WINDOW_MS ?? DEFAULT_RECONNECTION_STORM_WINDOW_MS; + // ── Idle detection ────────────────────────────────────────────────────────── let isConnected = false; let lastEventReceivedAt: number | null = null; @@ -52,7 +62,7 @@ export default (callback: any, receive: any, config = getConfig()) => { if (!isConnected || lastEventReceivedAt === null) return; const idleMs = Date.now() - lastEventReceivedAt; - if (idleMs >= config.IDLE_EVENT_TIMEOUT_MS && !idleAlertFired) { + if (idleMs >= idleTimeoutMs && !idleAlertFired) { idleAlertFired = true; const idleMinutes = Math.round(idleMs / 60000); logger.error( @@ -67,7 +77,7 @@ export default (callback: any, receive: any, config = getConfig()) => { logger, ).finally(() => process.exit(1)); } - }, config.IDLE_EVENT_TIMEOUT_MS); + }, idleTimeoutMs); }; const stopIdleCheck = () => { @@ -88,17 +98,17 @@ export default (callback: any, receive: any, config = getConfig()) => { await addAlert( 'Daemon Stuck In Processing State', `The state machine has been processing a single event for more than ` + - `${Math.round(config.STUCK_PROCESSING_TIMEOUT_MS / 60000)} minute(s). ` + + `${Math.round(stuckTimeoutMs / 60000)} minute(s). ` + 'Forcing a reconnection.', Severity.MAJOR, - { timeoutMs: String(config.STUCK_PROCESSING_TIMEOUT_MS) }, + { timeoutMs: String(stuckTimeoutMs) }, logger, ); } catch (err) { logger.error(`[monitoring] Failed to send stuck-processing alert: ${err}`); } callback({ type: EventTypes.MONITORING_STUCK_PROCESSING }); - }, config.STUCK_PROCESSING_TIMEOUT_MS); + }, stuckTimeoutMs); }; const clearStuckTimer = () => { @@ -115,11 +125,11 @@ export default (callback: any, receive: any, config = getConfig()) => { const now = Date.now(); reconnectionTimestamps.push(now); - const windowStart = now - config.RECONNECTION_STORM_WINDOW_MS; + const windowStart = now - stormWindowMs; reconnectionTimestamps = reconnectionTimestamps.filter(t => t >= windowStart); - if (reconnectionTimestamps.length >= config.RECONNECTION_STORM_THRESHOLD) { - const windowMinutes = Math.round(config.RECONNECTION_STORM_WINDOW_MS / 60000); + if (reconnectionTimestamps.length >= stormThreshold) { + const windowMinutes = Math.round(stormWindowMs / 60000); logger.error( `[monitoring] Reconnection storm: ${reconnectionTimestamps.length} reconnections in the last ${windowMinutes} minutes`, ); From 289eed91bf48bfb8237db6e223fe33973e825786 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Abadesso?= Date: Tue, 24 Feb 2026 13:05:54 -0300 Subject: [PATCH 08/11] refactor(daemon): stuck-processing only alerts; default 1 hour --- .../__tests__/actors/MonitoringActor.test.ts | 5 ++-- packages/daemon/src/actors/MonitoringActor.ts | 28 ++++++++----------- packages/daemon/src/config.ts | 4 +-- packages/daemon/src/machines/SyncMachine.ts | 5 ---- packages/daemon/src/types/event.ts | 6 +--- 5 files changed, 17 insertions(+), 31 deletions(-) diff --git a/packages/daemon/__tests__/actors/MonitoringActor.test.ts b/packages/daemon/__tests__/actors/MonitoringActor.test.ts index 15b2161a..35b776ad 100644 --- a/packages/daemon/__tests__/actors/MonitoringActor.test.ts +++ b/packages/daemon/__tests__/actors/MonitoringActor.test.ts @@ -173,18 +173,17 @@ describe('MonitoringActor', () => { expect(clearTimeout).toHaveBeenCalledTimes(1); }); - it('should fire a CRITICAL alert and call back MONITORING_STUCK_PROCESSING when stuck', async () => { + it('should fire a MAJOR alert when stuck', async () => { MonitoringActor(mockCallback, mockReceive, config); sendEvent('PROCESSING_STARTED'); jest.advanceTimersByTime(config['STUCK_PROCESSING_TIMEOUT_MS'] + 1); - // Let the async addAlert inside the timeout resolve await Promise.resolve(); await Promise.resolve(); expect(mockAddAlert).toHaveBeenCalledTimes(1); expect(mockAddAlert.mock.calls[0][0]).toBe('Daemon Stuck In Processing State'); - expect(mockCallback).toHaveBeenCalledWith({ type: EventTypes.MONITORING_STUCK_PROCESSING }); + expect(mockCallback).not.toHaveBeenCalled(); }); it('should NOT fire the stuck alert when PROCESSING_COMPLETED arrives in time', async () => { diff --git a/packages/daemon/src/actors/MonitoringActor.ts b/packages/daemon/src/actors/MonitoringActor.ts index cf1425e0..5769ae8b 100644 --- a/packages/daemon/src/actors/MonitoringActor.ts +++ b/packages/daemon/src/actors/MonitoringActor.ts @@ -35,7 +35,7 @@ import { Event, EventTypes } from '../types'; * RECONNECTION_STORM_WINDOW_MS. */ const DEFAULT_IDLE_EVENT_TIMEOUT_MS = 5 * 60 * 1000; -const DEFAULT_STUCK_PROCESSING_TIMEOUT_MS = 5 * 60 * 1000; +const DEFAULT_STUCK_PROCESSING_TIMEOUT_MS = 60 * 60 * 1000; // 1 hour const DEFAULT_RECONNECTION_STORM_THRESHOLD = 10; const DEFAULT_RECONNECTION_STORM_WINDOW_MS = 5 * 60 * 1000; @@ -93,21 +93,17 @@ export default (callback: any, receive: any, config = getConfig()) => { const startStuckTimer = () => { clearStuckTimer(); stuckTimer = setTimeout(async () => { - logger.error('[monitoring] State machine stuck in processing state — forcing reconnection'); - try { - await addAlert( - 'Daemon Stuck In Processing State', - `The state machine has been processing a single event for more than ` + - `${Math.round(stuckTimeoutMs / 60000)} minute(s). ` + - 'Forcing a reconnection.', - Severity.MAJOR, - { timeoutMs: String(stuckTimeoutMs) }, - logger, - ); - } catch (err) { - logger.error(`[monitoring] Failed to send stuck-processing alert: ${err}`); - } - callback({ type: EventTypes.MONITORING_STUCK_PROCESSING }); + logger.error('[monitoring] State machine stuck in processing state'); + addAlert( + 'Daemon Stuck In Processing State', + `The state machine has been processing a single event for more than ` + + `${Math.round(stuckTimeoutMs / 60000)} minute(s).`, + Severity.MAJOR, + { timeoutMs: String(stuckTimeoutMs) }, + logger, + ).catch((err: Error) => + logger.error(`[monitoring] Failed to send stuck-processing alert: ${err}`), + ); }, stuckTimeoutMs); }; diff --git a/packages/daemon/src/config.ts b/packages/daemon/src/config.ts index a8230c9a..8a5d683a 100644 --- a/packages/daemon/src/config.ts +++ b/packages/daemon/src/config.ts @@ -90,8 +90,8 @@ export const ACK_TIMEOUT_MS = parseInt(process.env.ACK_TIMEOUT_MS ?? '20000', 10 // Monitoring configuration // Timeout (ms) before alerting when no fullnode events received while WebSocket connected export const IDLE_EVENT_TIMEOUT_MS = parseInt(process.env.IDLE_EVENT_TIMEOUT_MS ?? String(5 * 60 * 1000), 10); // 5 minutes -// Timeout (ms) before auto-restarting when stuck in a single processing state -export const STUCK_PROCESSING_TIMEOUT_MS = parseInt(process.env.STUCK_PROCESSING_TIMEOUT_MS ?? String(5 * 60 * 1000), 10); // 5 minutes +// Timeout (ms) before alerting when stuck in a single processing state +export const STUCK_PROCESSING_TIMEOUT_MS = parseInt(process.env.STUCK_PROCESSING_TIMEOUT_MS ?? String(60 * 60 * 1000), 10); // 1 hour // Number of reconnections within RECONNECTION_STORM_WINDOW_MS to trigger a storm alert export const RECONNECTION_STORM_THRESHOLD = parseInt(process.env.RECONNECTION_STORM_THRESHOLD ?? '10', 10); // Time window (ms) for reconnection storm detection diff --git a/packages/daemon/src/machines/SyncMachine.ts b/packages/daemon/src/machines/SyncMachine.ts index f21c1cb5..61b87892 100644 --- a/packages/daemon/src/machines/SyncMachine.ts +++ b/packages/daemon/src/machines/SyncMachine.ts @@ -15,7 +15,6 @@ import { WebSocketActor, HealthCheckActor, MonitoringActor } from '../actors'; import { Context, Event, - EventTypes, } from '../types'; import { handleVertexAccepted, @@ -392,10 +391,6 @@ export const SyncMachine = Machine({ cond: 'websocketDisconnected', target: SYNC_MACHINE_STATES.RECONNECTING, }], - // Sent by MonitoringActor when a processing state has been active for too long - [EventTypes.MONITORING_STUCK_PROCESSING]: { - target: `#SyncMachine.${SYNC_MACHINE_STATES.RECONNECTING}`, - }, }, }, [SYNC_MACHINE_STATES.ERROR]: { diff --git a/packages/daemon/src/types/event.ts b/packages/daemon/src/types/event.ts index 546a8255..9981e5dd 100644 --- a/packages/daemon/src/types/event.ts +++ b/packages/daemon/src/types/event.ts @@ -42,7 +42,6 @@ export enum EventTypes { WEBSOCKET_SEND_EVENT = 'WEBSOCKET_SEND_EVENT', HEALTHCHECK_EVENT = 'HEALTHCHECK_EVENT', MONITORING_EVENT = 'MONITORING_EVENT', - MONITORING_STUCK_PROCESSING = 'MONITORING_STUCK_PROCESSING', } export enum FullNodeEventTypes { @@ -83,10 +82,7 @@ export type Event = | { type: EventTypes.FULLNODE_EVENT, event: FullNodeEvent } | { type: EventTypes.WEBSOCKET_SEND_EVENT, event: WebSocketSendEvent } | { type: EventTypes.HEALTHCHECK_EVENT, event: HealthCheckEvent } - | { type: EventTypes.MONITORING_EVENT, event: MonitoringEvent } - // Machine-internal signal sent by MonitoringActor when a processing state is stuck. - // `event: never` keeps the union consistent (all members have an `event` field). - | { type: EventTypes.MONITORING_STUCK_PROCESSING, event: never }; + | { type: EventTypes.MONITORING_EVENT, event: MonitoringEvent }; export interface VertexRemovedEventData { From 53aba8e735625aca597bca54576b78652ece8179 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Abadesso?= Date: Thu, 26 Feb 2026 17:23:33 -0300 Subject: [PATCH 09/11] fix(daemon): address monitoring layer review feedback - Replace process.exit with MONITORING_IDLE_TIMEOUT callback event - Handle MONITORING_IDLE_TIMEOUT in SyncMachine (-> ERROR state) - Add machine.onDone shutdown path in index.ts - Wrap monitoring send-actions in choose guard (no-op if actor absent) - Add reconnection storm alert cooldown (1 min) to prevent spam - Halve idle check interval to reduce worst-case detection lag to 1.5x - Fix stuck-processing doc comment (MAJOR alert, machine keeps running) - Fix err.message interpolation in alert catch handlers --- .../__tests__/actors/MonitoringActor.test.ts | 74 +++++++++++++++---- packages/daemon/src/actions/index.ts | 55 +++++++------- packages/daemon/src/actors/MonitoringActor.ts | 32 +++++--- packages/daemon/src/delays/index.ts | 1 - packages/daemon/src/index.ts | 5 ++ packages/daemon/src/machines/SyncMachine.ts | 3 + packages/daemon/src/types/event.ts | 4 +- 7 files changed, 120 insertions(+), 54 deletions(-) diff --git a/packages/daemon/__tests__/actors/MonitoringActor.test.ts b/packages/daemon/__tests__/actors/MonitoringActor.test.ts index 35b776ad..1850c224 100644 --- a/packages/daemon/__tests__/actors/MonitoringActor.test.ts +++ b/packages/daemon/__tests__/actors/MonitoringActor.test.ts @@ -11,6 +11,8 @@ import { EventTypes } from '../../src/types/event'; import getConfig from '../../src/config'; import { addAlert } from '@wallet-service/common'; +const MONITORING_IDLE_TIMEOUT_EVENT = { type: EventTypes.MONITORING_IDLE_TIMEOUT }; + jest.useFakeTimers(); jest.spyOn(global, 'setInterval'); jest.spyOn(global, 'clearInterval'); @@ -86,7 +88,7 @@ describe('MonitoringActor', () => { expect(clearInterval).toHaveBeenCalledTimes(1); }); - it('should fire an idle alert and exit after IDLE_EVENT_TIMEOUT_MS with no events', async () => { + it('should fire an idle alert and send MONITORING_IDLE_TIMEOUT after IDLE_EVENT_TIMEOUT_MS with no events', async () => { MonitoringActor(mockCallback, mockReceive, config); sendEvent('CONNECTED'); @@ -96,7 +98,8 @@ describe('MonitoringActor', () => { expect(mockAddAlert).toHaveBeenCalledTimes(1); expect(mockAddAlert.mock.calls[0][0]).toBe('Daemon Idle — No Events Received'); - expect(processExitSpy).toHaveBeenCalledWith(1); + expect(mockCallback).toHaveBeenCalledWith(MONITORING_IDLE_TIMEOUT_EVENT); + expect(processExitSpy).not.toHaveBeenCalled(); }); it('should NOT fire an idle alert when events keep arriving', async () => { @@ -112,7 +115,7 @@ describe('MonitoringActor', () => { expect(mockAddAlert).not.toHaveBeenCalled(); }); - it('should fire only one idle alert and exit once per idle period', async () => { + it('should fire only one idle alert and send MONITORING_IDLE_TIMEOUT once per idle period', async () => { MonitoringActor(mockCallback, mockReceive, config); sendEvent('CONNECTED'); @@ -121,33 +124,35 @@ describe('MonitoringActor', () => { await Promise.resolve(); expect(mockAddAlert).toHaveBeenCalledTimes(1); - expect(processExitSpy).toHaveBeenCalledTimes(1); - expect(processExitSpy).toHaveBeenCalledWith(1); + expect(mockCallback).toHaveBeenCalledTimes(1); + expect(mockCallback).toHaveBeenCalledWith(MONITORING_IDLE_TIMEOUT_EVENT); + expect(processExitSpy).not.toHaveBeenCalled(); }); - it('should reset the idle alert flag when an event is received, allowing a second exit', async () => { + it('should reset the idle alert flag when an event is received, allowing a second MONITORING_IDLE_TIMEOUT', async () => { MonitoringActor(mockCallback, mockReceive, config); sendEvent('CONNECTED'); - // Trigger first alert + exit + // Trigger first alert (interval = T/2, fires at T/2 then T — alert at T) jest.advanceTimersByTime(config['IDLE_EVENT_TIMEOUT_MS'] + 1); await Promise.resolve(); await Promise.resolve(); expect(mockAddAlert).toHaveBeenCalledTimes(1); - expect(processExitSpy).toHaveBeenCalledTimes(1); + expect(mockCallback).toHaveBeenCalledTimes(1); - // Receive an event — resets idleAlertFired and lastEventReceivedAt + // Receive an event — resets idleAlertFired and lastEventReceivedAt (~T from start) sendEvent('EVENT_RECEIVED'); - // Advance far enough for the interval to fire when idleMs >= threshold again. - // The interval fires at 2T, 3T, … from start. After EVENT_RECEIVED at ~T, - // the next fire where idleMs >= T is at 3T (fire at 2T gives idleMs = T-1). + // With interval=T/2, interval fires at 3T/2, 2T, 5T/2, … from start. + // The first fire where idleMs >= T after EVENT_RECEIVED is at 5T/2 (idleMs = 3T/2 - ε). + // Advancing 2T from here (total ~3T from start) covers 5T/2, so the second alert fires. jest.advanceTimersByTime(2 * config['IDLE_EVENT_TIMEOUT_MS']); await Promise.resolve(); await Promise.resolve(); expect(mockAddAlert).toHaveBeenCalledTimes(2); - expect(processExitSpy).toHaveBeenCalledTimes(2); + expect(mockCallback).toHaveBeenCalledTimes(2); + expect(processExitSpy).not.toHaveBeenCalled(); }); it('should restart the idle timer when CONNECTED is sent while already running', () => { @@ -173,7 +178,7 @@ describe('MonitoringActor', () => { expect(clearTimeout).toHaveBeenCalledTimes(1); }); - it('should fire a MAJOR alert when stuck', async () => { + it('should fire a MAJOR alert when stuck and NOT send MONITORING_IDLE_TIMEOUT', async () => { MonitoringActor(mockCallback, mockReceive, config); sendEvent('PROCESSING_STARTED'); @@ -183,6 +188,7 @@ describe('MonitoringActor', () => { expect(mockAddAlert).toHaveBeenCalledTimes(1); expect(mockAddAlert.mock.calls[0][0]).toBe('Daemon Stuck In Processing State'); + // Stuck detection intentionally does not notify the machine — machine keeps running expect(mockCallback).not.toHaveBeenCalled(); }); @@ -249,6 +255,46 @@ describe('MonitoringActor', () => { expect(mockAddAlert).not.toHaveBeenCalled(); }); + it('should not fire more than one storm alert within the 1-minute cooldown window', async () => { + MonitoringActor(mockCallback, mockReceive, config); + + // Trigger threshold + sendEvent('RECONNECTING'); + sendEvent('RECONNECTING'); + sendEvent('RECONNECTING'); // threshold = 3 → first alert + + await Promise.resolve(); + expect(mockAddAlert).toHaveBeenCalledTimes(1); + + // Additional reconnections within cooldown (no time advanced) + sendEvent('RECONNECTING'); + sendEvent('RECONNECTING'); + + await Promise.resolve(); + // Cooldown prevents a second alert + expect(mockAddAlert).toHaveBeenCalledTimes(1); + }); + + it('should fire another storm alert after the 1-minute cooldown expires', async () => { + MonitoringActor(mockCallback, mockReceive, config); + + sendEvent('RECONNECTING'); + sendEvent('RECONNECTING'); + sendEvent('RECONNECTING'); // first alert + + await Promise.resolve(); + expect(mockAddAlert).toHaveBeenCalledTimes(1); + + // Advance past the 1-minute cooldown + jest.advanceTimersByTime(61 * 1000); + + sendEvent('RECONNECTING'); // still >= threshold in window, cooldown expired + + await Promise.resolve(); + expect(mockAddAlert).toHaveBeenCalledTimes(2); + expect(mockAddAlert.mock.calls[1][0]).toBe('Daemon Reconnection Storm'); + }); + it('should evict old reconnections outside the storm window', async () => { MonitoringActor(mockCallback, mockReceive, config); diff --git a/packages/daemon/src/actions/index.ts b/packages/daemon/src/actions/index.ts index 3f09999f..ede51cd1 100644 --- a/packages/daemon/src/actions/index.ts +++ b/packages/daemon/src/actions/index.ts @@ -5,7 +5,7 @@ * LICENSE file in the root directory of this source tree. */ -import { assign, AssignAction, sendTo } from 'xstate'; +import { assign, AssignAction, sendTo, choose } from 'xstate'; import { Context, Event, EventTypes, StandardFullNodeEvent } from '../types'; import { get } from 'lodash'; import logger from '../logger'; @@ -209,54 +209,55 @@ export const getMonitoringRefFromContext = (context: Context) => { return context.monitoring; }; +const monitoringIsPresent = (context: Context) => context.monitoring !== null; + /* * Notifies the monitoring actor that the WebSocket became connected. */ -export const sendMonitoringConnected = sendTo( - getMonitoringRefFromContext, - { type: EventTypes.MONITORING_EVENT, event: { type: 'CONNECTED' } }, -); +export const sendMonitoringConnected = choose([{ + cond: monitoringIsPresent, + actions: sendTo(getMonitoringRefFromContext, { type: EventTypes.MONITORING_EVENT, event: { type: 'CONNECTED' } }), +}]); /* * Notifies the monitoring actor that the WebSocket disconnected. */ -export const sendMonitoringDisconnected = sendTo( - getMonitoringRefFromContext, - { type: EventTypes.MONITORING_EVENT, event: { type: 'DISCONNECTED' } }, -); +export const sendMonitoringDisconnected = choose([{ + cond: monitoringIsPresent, + actions: sendTo(getMonitoringRefFromContext, { type: EventTypes.MONITORING_EVENT, event: { type: 'DISCONNECTED' } }), +}]); /* * Notifies the monitoring actor that a fullnode event was received (resets the idle timer). */ -export const sendMonitoringEventReceived = sendTo( - getMonitoringRefFromContext, - { type: EventTypes.MONITORING_EVENT, event: { type: 'EVENT_RECEIVED' } }, -); +export const sendMonitoringEventReceived = choose([{ + cond: monitoringIsPresent, + actions: sendTo(getMonitoringRefFromContext, { type: EventTypes.MONITORING_EVENT, event: { type: 'EVENT_RECEIVED' } }), +}]); /* * Notifies the monitoring actor that the machine is entering the RECONNECTING state. */ -export const sendMonitoringReconnecting = sendTo( - getMonitoringRefFromContext, - { type: EventTypes.MONITORING_EVENT, event: { type: 'RECONNECTING' } }, -); +export const sendMonitoringReconnecting = choose([{ + cond: monitoringIsPresent, + actions: sendTo(getMonitoringRefFromContext, { type: EventTypes.MONITORING_EVENT, event: { type: 'RECONNECTING' } }), +}]); /* * Notifies the monitoring actor that a processing state was entered. * The actor starts a stuck-detection timer; if PROCESSING_COMPLETED doesn't - * arrive within STUCK_PROCESSING_TIMEOUT_MS it fires a CRITICAL alert and sends - * MONITORING_STUCK_PROCESSING back to the machine. + * arrive within STUCK_PROCESSING_TIMEOUT_MS it fires a MAJOR alert. */ -export const sendMonitoringProcessingStarted = sendTo( - getMonitoringRefFromContext, - { type: EventTypes.MONITORING_EVENT, event: { type: 'PROCESSING_STARTED' } }, -); +export const sendMonitoringProcessingStarted = choose([{ + cond: monitoringIsPresent, + actions: sendTo(getMonitoringRefFromContext, { type: EventTypes.MONITORING_EVENT, event: { type: 'PROCESSING_STARTED' } }), +}]); /* * Notifies the monitoring actor that a processing state was exited normally, * cancelling the stuck-detection timer. */ -export const sendMonitoringProcessingCompleted = sendTo( - getMonitoringRefFromContext, - { type: EventTypes.MONITORING_EVENT, event: { type: 'PROCESSING_COMPLETED' } }, -); +export const sendMonitoringProcessingCompleted = choose([{ + cond: monitoringIsPresent, + actions: sendTo(getMonitoringRefFromContext, { type: EventTypes.MONITORING_EVENT, event: { type: 'PROCESSING_COMPLETED' } }), +}]); diff --git a/packages/daemon/src/actors/MonitoringActor.ts b/packages/daemon/src/actors/MonitoringActor.ts index 5769ae8b..ec84809d 100644 --- a/packages/daemon/src/actors/MonitoringActor.ts +++ b/packages/daemon/src/actors/MonitoringActor.ts @@ -15,8 +15,8 @@ import { Event, EventTypes } from '../types'; * * Centralises all runtime health monitoring for the sync state machine. * The machine sends MONITORING_EVENTs to this actor; when anomalies are detected - * the actor fires alerts and, when necessary, sends MONITORING_STUCK_PROCESSING - * back to the machine to trigger a soft reconnect. + * the actor fires alerts and, when necessary, sends MONITORING_IDLE_TIMEOUT back + * to the machine to trigger a graceful shutdown. * * Responsibilities: * @@ -26,13 +26,13 @@ import { Event, EventTypes } from '../types'; * * 2. Stuck-processing detection — PROCESSING_STARTED begins a one-shot timer; * PROCESSING_COMPLETED cancels it. If the timer fires the actor fires a - * CRITICAL alert and sends MONITORING_STUCK_PROCESSING to the machine, which - * transitions to RECONNECTING. This replaces the per-state XState `after` - * blocks that previously scattered this logic across every processing state. + * MAJOR alert; the machine keeps running so that a long-running handler + * (e.g. a large reorg) is allowed to finish. * - * 3. Reconnection storm detection — fires a CRITICAL alert when the daemon + * 3. Reconnection storm detection — fires a MAJOR alert when the daemon * reconnects more than RECONNECTION_STORM_THRESHOLD times within - * RECONNECTION_STORM_WINDOW_MS. + * RECONNECTION_STORM_WINDOW_MS. Duplicate alerts are suppressed for + * STORM_ALERT_COOLDOWN_MS (1 min) to avoid spamming the alerting system. */ const DEFAULT_IDLE_EVENT_TIMEOUT_MS = 5 * 60 * 1000; const DEFAULT_STUCK_PROCESSING_TIMEOUT_MS = 60 * 60 * 1000; // 1 hour @@ -59,6 +59,7 @@ export default (callback: any, receive: any, config = getConfig()) => { idleAlertFired = false; idleCheckTimer = setInterval(async () => { + // Interval is idleTimeoutMs/2 so the worst-case detection lag is 1.5×timeout if (!isConnected || lastEventReceivedAt === null) return; const idleMs = Date.now() - lastEventReceivedAt; @@ -75,9 +76,11 @@ export default (callback: any, receive: any, config = getConfig()) => { Severity.MAJOR, { idleMs: String(idleMs) }, logger, - ).finally(() => process.exit(1)); + ).finally(() => { + callback({ type: EventTypes.MONITORING_IDLE_TIMEOUT }); + }); } - }, idleTimeoutMs); + }, Math.floor(idleTimeoutMs / 2)); }; const stopIdleCheck = () => { @@ -102,7 +105,7 @@ export default (callback: any, receive: any, config = getConfig()) => { { timeoutMs: String(stuckTimeoutMs) }, logger, ).catch((err: Error) => - logger.error(`[monitoring] Failed to send stuck-processing alert: ${err}`), + logger.error(`[monitoring] Failed to send stuck-processing alert: ${err.message}`), ); }, stuckTimeoutMs); }; @@ -115,7 +118,9 @@ export default (callback: any, receive: any, config = getConfig()) => { }; // ── Reconnection storm detection ───────────────────────────────────────────── + const STORM_ALERT_COOLDOWN_MS = 60 * 1000; // suppress duplicate storm alerts for 1 minute let reconnectionTimestamps: number[] = []; + let stormAlertLastFiredAt: number | null = null; const trackReconnection = () => { const now = Date.now(); @@ -125,6 +130,11 @@ export default (callback: any, receive: any, config = getConfig()) => { reconnectionTimestamps = reconnectionTimestamps.filter(t => t >= windowStart); if (reconnectionTimestamps.length >= stormThreshold) { + if (stormAlertLastFiredAt !== null && now - stormAlertLastFiredAt < STORM_ALERT_COOLDOWN_MS) { + return; // still within cooldown — do not spam the alerting system + } + stormAlertLastFiredAt = now; + const windowMinutes = Math.round(stormWindowMs / 60000); logger.error( `[monitoring] Reconnection storm: ${reconnectionTimestamps.length} reconnections in the last ${windowMinutes} minutes`, @@ -140,7 +150,7 @@ export default (callback: any, receive: any, config = getConfig()) => { }, logger, ).catch((err: Error) => - logger.error(`[monitoring] Failed to send reconnection storm alert: ${err}`), + logger.error(`[monitoring] Failed to send reconnection storm alert: ${err.message}`), ); } }; diff --git a/packages/daemon/src/delays/index.ts b/packages/daemon/src/delays/index.ts index 0ad82103..45ab1ff0 100644 --- a/packages/daemon/src/delays/index.ts +++ b/packages/daemon/src/delays/index.ts @@ -24,4 +24,3 @@ export const ACK_TIMEOUT = () => { const { ACK_TIMEOUT_MS } = getConfig(); return ACK_TIMEOUT_MS; }; - diff --git a/packages/daemon/src/index.ts b/packages/daemon/src/index.ts index 571c6dac..fc9ffc4f 100644 --- a/packages/daemon/src/index.ts +++ b/packages/daemon/src/index.ts @@ -20,6 +20,11 @@ const main = async () => { logger.info(`Transitioned to ${bigIntUtils.JSONBigInt.stringify(state.value)}`); }); + machine.onDone(() => { + logger.error('Sync machine reached a final state — terminating process for Kubernetes restart'); + process.exit(1); + }); + machine.onEvent((event) => { logger.info(`Processing event: ${bigIntUtils.JSONBigInt.stringify(event.type)}`); }); diff --git a/packages/daemon/src/machines/SyncMachine.ts b/packages/daemon/src/machines/SyncMachine.ts index 61b87892..a636b3b1 100644 --- a/packages/daemon/src/machines/SyncMachine.ts +++ b/packages/daemon/src/machines/SyncMachine.ts @@ -391,6 +391,9 @@ export const SyncMachine = Machine({ cond: 'websocketDisconnected', target: SYNC_MACHINE_STATES.RECONNECTING, }], + MONITORING_IDLE_TIMEOUT: { + target: `#${SYNC_MACHINE_STATES.ERROR}`, + }, }, }, [SYNC_MACHINE_STATES.ERROR]: { diff --git a/packages/daemon/src/types/event.ts b/packages/daemon/src/types/event.ts index 9981e5dd..5fa0b3d2 100644 --- a/packages/daemon/src/types/event.ts +++ b/packages/daemon/src/types/event.ts @@ -42,6 +42,7 @@ export enum EventTypes { WEBSOCKET_SEND_EVENT = 'WEBSOCKET_SEND_EVENT', HEALTHCHECK_EVENT = 'HEALTHCHECK_EVENT', MONITORING_EVENT = 'MONITORING_EVENT', + MONITORING_IDLE_TIMEOUT = 'MONITORING_IDLE_TIMEOUT', } export enum FullNodeEventTypes { @@ -82,7 +83,8 @@ export type Event = | { type: EventTypes.FULLNODE_EVENT, event: FullNodeEvent } | { type: EventTypes.WEBSOCKET_SEND_EVENT, event: WebSocketSendEvent } | { type: EventTypes.HEALTHCHECK_EVENT, event: HealthCheckEvent } - | { type: EventTypes.MONITORING_EVENT, event: MonitoringEvent }; + | { type: EventTypes.MONITORING_EVENT, event: MonitoringEvent } + | { type: EventTypes.MONITORING_IDLE_TIMEOUT }; export interface VertexRemovedEventData { From 7044b7a5f707d8181061864155f368241712316f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Abadesso?= Date: Fri, 27 Feb 2026 09:40:49 -0300 Subject: [PATCH 10/11] fix: missing type --- packages/daemon/src/services/index.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/daemon/src/services/index.ts b/packages/daemon/src/services/index.ts index 82ebf91d..b8151239 100644 --- a/packages/daemon/src/services/index.ts +++ b/packages/daemon/src/services/index.ts @@ -17,6 +17,7 @@ import { DbTransaction, LastSyncedEvent, Event, + EventTypes, Context, EventTxInput, EventTxOutput, @@ -103,7 +104,7 @@ export const METADATA_DIFF_EVENT_TYPES = { const DUPLICATE_TX_ALERT_GRACE_PERIOD = 10; // seconds export const metadataDiff = async (_context: Context, event: Event) => { - const fullNodeEvent = event.event as StandardFullNodeEvent; + const fullNodeEvent = (event as Extract).event as StandardFullNodeEvent; const { hash, metadata: { voided_by, first_block, nc_execution }, From b020cae098857109b59659ee33ecaadc72199d4a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Abadesso?= Date: Fri, 27 Feb 2026 10:36:13 -0300 Subject: [PATCH 11/11] refactor(daemon): address monitoring layer nitpicks - restore processExitSpy after each test to prevent spy leakage - add Severity.MAJOR assertions to alert tests - replace repeated choose+sendTo blocks with makeMonitoringSender factory --- .../__tests__/actors/MonitoringActor.test.ts | 9 ++- packages/daemon/src/actions/index.ts | 57 +++---------------- 2 files changed, 17 insertions(+), 49 deletions(-) diff --git a/packages/daemon/__tests__/actors/MonitoringActor.test.ts b/packages/daemon/__tests__/actors/MonitoringActor.test.ts index 1850c224..07ce898d 100644 --- a/packages/daemon/__tests__/actors/MonitoringActor.test.ts +++ b/packages/daemon/__tests__/actors/MonitoringActor.test.ts @@ -9,7 +9,7 @@ import MonitoringActor from '../../src/actors/MonitoringActor'; import logger from '../../src/logger'; import { EventTypes } from '../../src/types/event'; import getConfig from '../../src/config'; -import { addAlert } from '@wallet-service/common'; +import { addAlert, Severity } from '@wallet-service/common'; const MONITORING_IDLE_TIMEOUT_EVENT = { type: EventTypes.MONITORING_IDLE_TIMEOUT }; @@ -56,6 +56,10 @@ describe('MonitoringActor', () => { }); }); + afterEach(() => { + processExitSpy.mockRestore(); + }); + afterAll(() => { jest.clearAllMocks(); jest.useRealTimers(); @@ -98,6 +102,7 @@ describe('MonitoringActor', () => { expect(mockAddAlert).toHaveBeenCalledTimes(1); expect(mockAddAlert.mock.calls[0][0]).toBe('Daemon Idle — No Events Received'); + expect(mockAddAlert.mock.calls[0][2]).toBe(Severity.MAJOR); expect(mockCallback).toHaveBeenCalledWith(MONITORING_IDLE_TIMEOUT_EVENT); expect(processExitSpy).not.toHaveBeenCalled(); }); @@ -188,6 +193,7 @@ describe('MonitoringActor', () => { expect(mockAddAlert).toHaveBeenCalledTimes(1); expect(mockAddAlert.mock.calls[0][0]).toBe('Daemon Stuck In Processing State'); + expect(mockAddAlert.mock.calls[0][2]).toBe(Severity.MAJOR); // Stuck detection intentionally does not notify the machine — machine keeps running expect(mockCallback).not.toHaveBeenCalled(); }); @@ -243,6 +249,7 @@ describe('MonitoringActor', () => { await Promise.resolve(); expect(mockAddAlert).toHaveBeenCalledTimes(1); expect(mockAddAlert.mock.calls[0][0]).toBe('Daemon Reconnection Storm'); + expect(mockAddAlert.mock.calls[0][2]).toBe(Severity.MAJOR); }); it('should NOT fire a reconnection storm alert below the threshold', async () => { diff --git a/packages/daemon/src/actions/index.ts b/packages/daemon/src/actions/index.ts index ede51cd1..87e65c53 100644 --- a/packages/daemon/src/actions/index.ts +++ b/packages/daemon/src/actions/index.ts @@ -6,7 +6,7 @@ */ import { assign, AssignAction, sendTo, choose } from 'xstate'; -import { Context, Event, EventTypes, StandardFullNodeEvent } from '../types'; +import { Context, Event, EventTypes, MonitoringEvent, StandardFullNodeEvent } from '../types'; import { get } from 'lodash'; import logger from '../logger'; import { hashTxData } from '../utils'; @@ -211,53 +211,14 @@ export const getMonitoringRefFromContext = (context: Context) => { const monitoringIsPresent = (context: Context) => context.monitoring !== null; -/* - * Notifies the monitoring actor that the WebSocket became connected. - */ -export const sendMonitoringConnected = choose([{ - cond: monitoringIsPresent, - actions: sendTo(getMonitoringRefFromContext, { type: EventTypes.MONITORING_EVENT, event: { type: 'CONNECTED' } }), -}]); - -/* - * Notifies the monitoring actor that the WebSocket disconnected. - */ -export const sendMonitoringDisconnected = choose([{ - cond: monitoringIsPresent, - actions: sendTo(getMonitoringRefFromContext, { type: EventTypes.MONITORING_EVENT, event: { type: 'DISCONNECTED' } }), -}]); - -/* - * Notifies the monitoring actor that a fullnode event was received (resets the idle timer). - */ -export const sendMonitoringEventReceived = choose([{ - cond: monitoringIsPresent, - actions: sendTo(getMonitoringRefFromContext, { type: EventTypes.MONITORING_EVENT, event: { type: 'EVENT_RECEIVED' } }), -}]); - -/* - * Notifies the monitoring actor that the machine is entering the RECONNECTING state. - */ -export const sendMonitoringReconnecting = choose([{ +const makeMonitoringSender = (eventType: MonitoringEvent['type']) => choose([{ cond: monitoringIsPresent, - actions: sendTo(getMonitoringRefFromContext, { type: EventTypes.MONITORING_EVENT, event: { type: 'RECONNECTING' } }), + actions: sendTo(getMonitoringRefFromContext, { type: EventTypes.MONITORING_EVENT, event: { type: eventType } }), }]); -/* - * Notifies the monitoring actor that a processing state was entered. - * The actor starts a stuck-detection timer; if PROCESSING_COMPLETED doesn't - * arrive within STUCK_PROCESSING_TIMEOUT_MS it fires a MAJOR alert. - */ -export const sendMonitoringProcessingStarted = choose([{ - cond: monitoringIsPresent, - actions: sendTo(getMonitoringRefFromContext, { type: EventTypes.MONITORING_EVENT, event: { type: 'PROCESSING_STARTED' } }), -}]); - -/* - * Notifies the monitoring actor that a processing state was exited normally, - * cancelling the stuck-detection timer. - */ -export const sendMonitoringProcessingCompleted = choose([{ - cond: monitoringIsPresent, - actions: sendTo(getMonitoringRefFromContext, { type: EventTypes.MONITORING_EVENT, event: { type: 'PROCESSING_COMPLETED' } }), -}]); +export const sendMonitoringConnected = makeMonitoringSender('CONNECTED'); +export const sendMonitoringDisconnected = makeMonitoringSender('DISCONNECTED'); +export const sendMonitoringEventReceived = makeMonitoringSender('EVENT_RECEIVED'); +export const sendMonitoringReconnecting = makeMonitoringSender('RECONNECTING'); +export const sendMonitoringProcessingStarted = makeMonitoringSender('PROCESSING_STARTED'); +export const sendMonitoringProcessingCompleted = makeMonitoringSender('PROCESSING_COMPLETED');