Skip to content

Commit 53aba8e

Browse files
committed
fix(daemon): address monitoring layer review feedback
- Replace process.exit with MONITORING_IDLE_TIMEOUT callback event
- Handle MONITORING_IDLE_TIMEOUT in SyncMachine (-> ERROR state)
- Add machine.onDone shutdown path in index.ts
- Wrap monitoring send-actions in choose guard (no-op if actor absent)
- Add reconnection storm alert cooldown (1 min) to prevent spam
- Halve idle check interval to reduce worst-case detection lag to 1.5x
- Fix stuck-processing doc comment (MAJOR alert, machine keeps running)
- Fix err.message interpolation in alert catch handlers
1 parent 2c2cc5c commit 53aba8e

File tree

7 files changed

+120
-54
lines changed

7 files changed

+120
-54
lines changed

packages/daemon/__tests__/actors/MonitoringActor.test.ts

Lines changed: 60 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ import { EventTypes } from '../../src/types/event';
1111
import getConfig from '../../src/config';
1212
import { addAlert } from '@wallet-service/common';
1313

14+
const MONITORING_IDLE_TIMEOUT_EVENT = { type: EventTypes.MONITORING_IDLE_TIMEOUT };
15+
1416
jest.useFakeTimers();
1517
jest.spyOn(global, 'setInterval');
1618
jest.spyOn(global, 'clearInterval');
@@ -86,7 +88,7 @@ describe('MonitoringActor', () => {
8688
expect(clearInterval).toHaveBeenCalledTimes(1);
8789
});
8890

89-
it('should fire an idle alert and exit after IDLE_EVENT_TIMEOUT_MS with no events', async () => {
91+
it('should fire an idle alert and send MONITORING_IDLE_TIMEOUT after IDLE_EVENT_TIMEOUT_MS with no events', async () => {
9092
MonitoringActor(mockCallback, mockReceive, config);
9193
sendEvent('CONNECTED');
9294

@@ -96,7 +98,8 @@ describe('MonitoringActor', () => {
9698

9799
expect(mockAddAlert).toHaveBeenCalledTimes(1);
98100
expect(mockAddAlert.mock.calls[0][0]).toBe('Daemon Idle — No Events Received');
99-
expect(processExitSpy).toHaveBeenCalledWith(1);
101+
expect(mockCallback).toHaveBeenCalledWith(MONITORING_IDLE_TIMEOUT_EVENT);
102+
expect(processExitSpy).not.toHaveBeenCalled();
100103
});
101104

102105
it('should NOT fire an idle alert when events keep arriving', async () => {
@@ -112,7 +115,7 @@ describe('MonitoringActor', () => {
112115
expect(mockAddAlert).not.toHaveBeenCalled();
113116
});
114117

115-
it('should fire only one idle alert and exit once per idle period', async () => {
118+
it('should fire only one idle alert and send MONITORING_IDLE_TIMEOUT once per idle period', async () => {
116119
MonitoringActor(mockCallback, mockReceive, config);
117120
sendEvent('CONNECTED');
118121

@@ -121,33 +124,35 @@ describe('MonitoringActor', () => {
121124
await Promise.resolve();
122125

123126
expect(mockAddAlert).toHaveBeenCalledTimes(1);
124-
expect(processExitSpy).toHaveBeenCalledTimes(1);
125-
expect(processExitSpy).toHaveBeenCalledWith(1);
127+
expect(mockCallback).toHaveBeenCalledTimes(1);
128+
expect(mockCallback).toHaveBeenCalledWith(MONITORING_IDLE_TIMEOUT_EVENT);
129+
expect(processExitSpy).not.toHaveBeenCalled();
126130
});
127131

128-
it('should reset the idle alert flag when an event is received, allowing a second exit', async () => {
132+
it('should reset the idle alert flag when an event is received, allowing a second MONITORING_IDLE_TIMEOUT', async () => {
129133
MonitoringActor(mockCallback, mockReceive, config);
130134
sendEvent('CONNECTED');
131135

132-
// Trigger first alert + exit
136+
// Trigger first alert (interval = T/2, fires at T/2 then T — alert at T)
133137
jest.advanceTimersByTime(config['IDLE_EVENT_TIMEOUT_MS'] + 1);
134138
await Promise.resolve();
135139
await Promise.resolve();
136140
expect(mockAddAlert).toHaveBeenCalledTimes(1);
137-
expect(processExitSpy).toHaveBeenCalledTimes(1);
141+
expect(mockCallback).toHaveBeenCalledTimes(1);
138142

139-
// Receive an event — resets idleAlertFired and lastEventReceivedAt
143+
// Receive an event — resets idleAlertFired and lastEventReceivedAt (~T from start)
140144
sendEvent('EVENT_RECEIVED');
141145

142-
// Advance far enough for the interval to fire when idleMs >= threshold again.
143-
// The interval fires at 2T, 3T, … from start. After EVENT_RECEIVED at ~T,
144-
// the next fire where idleMs >= T is at 3T (fire at 2T gives idleMs = T-1).
146+
// With interval=T/2, interval fires at 3T/2, 2T, 5T/2, … from start.
147+
// The first fire where idleMs >= T after EVENT_RECEIVED is at 5T/2 (idleMs = 3T/2 - ε).
148+
// Advancing 2T from here (total ~3T from start) covers 5T/2, so the second alert fires.
145149
jest.advanceTimersByTime(2 * config['IDLE_EVENT_TIMEOUT_MS']);
146150
await Promise.resolve();
147151
await Promise.resolve();
148152

149153
expect(mockAddAlert).toHaveBeenCalledTimes(2);
150-
expect(processExitSpy).toHaveBeenCalledTimes(2);
154+
expect(mockCallback).toHaveBeenCalledTimes(2);
155+
expect(processExitSpy).not.toHaveBeenCalled();
151156
});
152157

153158
it('should restart the idle timer when CONNECTED is sent while already running', () => {
@@ -173,7 +178,7 @@ describe('MonitoringActor', () => {
173178
expect(clearTimeout).toHaveBeenCalledTimes(1);
174179
});
175180

176-
it('should fire a MAJOR alert when stuck', async () => {
181+
it('should fire a MAJOR alert when stuck and NOT send MONITORING_IDLE_TIMEOUT', async () => {
177182
MonitoringActor(mockCallback, mockReceive, config);
178183
sendEvent('PROCESSING_STARTED');
179184

@@ -183,6 +188,7 @@ describe('MonitoringActor', () => {
183188

184189
expect(mockAddAlert).toHaveBeenCalledTimes(1);
185190
expect(mockAddAlert.mock.calls[0][0]).toBe('Daemon Stuck In Processing State');
191+
// Stuck detection intentionally does not notify the machine — machine keeps running
186192
expect(mockCallback).not.toHaveBeenCalled();
187193
});
188194

@@ -249,6 +255,46 @@ describe('MonitoringActor', () => {
249255
expect(mockAddAlert).not.toHaveBeenCalled();
250256
});
251257

258+
it('should not fire more than one storm alert within the 1-minute cooldown window', async () => {
259+
MonitoringActor(mockCallback, mockReceive, config);
260+
261+
// Trigger threshold
262+
sendEvent('RECONNECTING');
263+
sendEvent('RECONNECTING');
264+
sendEvent('RECONNECTING'); // threshold = 3 → first alert
265+
266+
await Promise.resolve();
267+
expect(mockAddAlert).toHaveBeenCalledTimes(1);
268+
269+
// Additional reconnections within cooldown (no time advanced)
270+
sendEvent('RECONNECTING');
271+
sendEvent('RECONNECTING');
272+
273+
await Promise.resolve();
274+
// Cooldown prevents a second alert
275+
expect(mockAddAlert).toHaveBeenCalledTimes(1);
276+
});
277+
278+
it('should fire another storm alert after the 1-minute cooldown expires', async () => {
279+
MonitoringActor(mockCallback, mockReceive, config);
280+
281+
sendEvent('RECONNECTING');
282+
sendEvent('RECONNECTING');
283+
sendEvent('RECONNECTING'); // first alert
284+
285+
await Promise.resolve();
286+
expect(mockAddAlert).toHaveBeenCalledTimes(1);
287+
288+
// Advance past the 1-minute cooldown
289+
jest.advanceTimersByTime(61 * 1000);
290+
291+
sendEvent('RECONNECTING'); // still >= threshold in window, cooldown expired
292+
293+
await Promise.resolve();
294+
expect(mockAddAlert).toHaveBeenCalledTimes(2);
295+
expect(mockAddAlert.mock.calls[1][0]).toBe('Daemon Reconnection Storm');
296+
});
297+
252298
it('should evict old reconnections outside the storm window', async () => {
253299
MonitoringActor(mockCallback, mockReceive, config);
254300

packages/daemon/src/actions/index.ts

Lines changed: 28 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
* LICENSE file in the root directory of this source tree.
66
*/
77

8-
import { assign, AssignAction, sendTo } from 'xstate';
8+
import { assign, AssignAction, sendTo, choose } from 'xstate';
99
import { Context, Event, EventTypes, StandardFullNodeEvent } from '../types';
1010
import { get } from 'lodash';
1111
import logger from '../logger';
@@ -209,54 +209,55 @@ export const getMonitoringRefFromContext = (context: Context) => {
209209
return context.monitoring;
210210
};
211211

212+
const monitoringIsPresent = (context: Context) => context.monitoring !== null;
213+
212214
/*
213215
* Notifies the monitoring actor that the WebSocket became connected.
214216
*/
215-
export const sendMonitoringConnected = sendTo(
216-
getMonitoringRefFromContext,
217-
{ type: EventTypes.MONITORING_EVENT, event: { type: 'CONNECTED' } },
218-
);
217+
export const sendMonitoringConnected = choose([{
218+
cond: monitoringIsPresent,
219+
actions: sendTo(getMonitoringRefFromContext, { type: EventTypes.MONITORING_EVENT, event: { type: 'CONNECTED' } }),
220+
}]);
219221

220222
/*
221223
* Notifies the monitoring actor that the WebSocket disconnected.
222224
*/
223-
export const sendMonitoringDisconnected = sendTo(
224-
getMonitoringRefFromContext,
225-
{ type: EventTypes.MONITORING_EVENT, event: { type: 'DISCONNECTED' } },
226-
);
225+
export const sendMonitoringDisconnected = choose([{
226+
cond: monitoringIsPresent,
227+
actions: sendTo(getMonitoringRefFromContext, { type: EventTypes.MONITORING_EVENT, event: { type: 'DISCONNECTED' } }),
228+
}]);
227229

228230
/*
229231
* Notifies the monitoring actor that a fullnode event was received (resets the idle timer).
230232
*/
231-
export const sendMonitoringEventReceived = sendTo(
232-
getMonitoringRefFromContext,
233-
{ type: EventTypes.MONITORING_EVENT, event: { type: 'EVENT_RECEIVED' } },
234-
);
233+
export const sendMonitoringEventReceived = choose([{
234+
cond: monitoringIsPresent,
235+
actions: sendTo(getMonitoringRefFromContext, { type: EventTypes.MONITORING_EVENT, event: { type: 'EVENT_RECEIVED' } }),
236+
}]);
235237

236238
/*
237239
* Notifies the monitoring actor that the machine is entering the RECONNECTING state.
238240
*/
239-
export const sendMonitoringReconnecting = sendTo(
240-
getMonitoringRefFromContext,
241-
{ type: EventTypes.MONITORING_EVENT, event: { type: 'RECONNECTING' } },
242-
);
241+
export const sendMonitoringReconnecting = choose([{
242+
cond: monitoringIsPresent,
243+
actions: sendTo(getMonitoringRefFromContext, { type: EventTypes.MONITORING_EVENT, event: { type: 'RECONNECTING' } }),
244+
}]);
243245

244246
/*
245247
* Notifies the monitoring actor that a processing state was entered.
246248
* The actor starts a stuck-detection timer; if PROCESSING_COMPLETED doesn't
247-
* arrive within STUCK_PROCESSING_TIMEOUT_MS it fires a CRITICAL alert and sends
248-
* MONITORING_STUCK_PROCESSING back to the machine.
249+
* arrive within STUCK_PROCESSING_TIMEOUT_MS it fires a MAJOR alert.
249250
*/
250-
export const sendMonitoringProcessingStarted = sendTo(
251-
getMonitoringRefFromContext,
252-
{ type: EventTypes.MONITORING_EVENT, event: { type: 'PROCESSING_STARTED' } },
253-
);
251+
export const sendMonitoringProcessingStarted = choose([{
252+
cond: monitoringIsPresent,
253+
actions: sendTo(getMonitoringRefFromContext, { type: EventTypes.MONITORING_EVENT, event: { type: 'PROCESSING_STARTED' } }),
254+
}]);
254255

255256
/*
256257
* Notifies the monitoring actor that a processing state was exited normally,
257258
* cancelling the stuck-detection timer.
258259
*/
259-
export const sendMonitoringProcessingCompleted = sendTo(
260-
getMonitoringRefFromContext,
261-
{ type: EventTypes.MONITORING_EVENT, event: { type: 'PROCESSING_COMPLETED' } },
262-
);
260+
export const sendMonitoringProcessingCompleted = choose([{
261+
cond: monitoringIsPresent,
262+
actions: sendTo(getMonitoringRefFromContext, { type: EventTypes.MONITORING_EVENT, event: { type: 'PROCESSING_COMPLETED' } }),
263+
}]);

packages/daemon/src/actors/MonitoringActor.ts

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@ import { Event, EventTypes } from '../types';
1515
*
1616
* Centralises all runtime health monitoring for the sync state machine.
1717
* The machine sends MONITORING_EVENTs to this actor; when anomalies are detected
18-
* the actor fires alerts and, when necessary, sends MONITORING_STUCK_PROCESSING
19-
* back to the machine to trigger a soft reconnect.
18+
* the actor fires alerts and, when necessary, sends MONITORING_IDLE_TIMEOUT back
19+
* to the machine to trigger a graceful shutdown.
2020
*
2121
* Responsibilities:
2222
*
@@ -26,13 +26,13 @@ import { Event, EventTypes } from '../types';
2626
*
2727
* 2. Stuck-processing detection — PROCESSING_STARTED begins a one-shot timer;
2828
* PROCESSING_COMPLETED cancels it. If the timer fires the actor fires a
29-
* CRITICAL alert and sends MONITORING_STUCK_PROCESSING to the machine, which
30-
* transitions to RECONNECTING. This replaces the per-state XState `after`
31-
* blocks that previously scattered this logic across every processing state.
29+
* MAJOR alert; the machine keeps running so that a long-running handler
30+
* (e.g. a large reorg) is allowed to finish.
3231
*
33-
* 3. Reconnection storm detection — fires a CRITICAL alert when the daemon
32+
* 3. Reconnection storm detection — fires a MAJOR alert when the daemon
3433
* reconnects more than RECONNECTION_STORM_THRESHOLD times within
35-
* RECONNECTION_STORM_WINDOW_MS.
34+
* RECONNECTION_STORM_WINDOW_MS. Duplicate alerts are suppressed for
35+
* STORM_ALERT_COOLDOWN_MS (1 min) to avoid spamming the alerting system.
3636
*/
3737
const DEFAULT_IDLE_EVENT_TIMEOUT_MS = 5 * 60 * 1000;
3838
const DEFAULT_STUCK_PROCESSING_TIMEOUT_MS = 60 * 60 * 1000; // 1 hour
@@ -59,6 +59,7 @@ export default (callback: any, receive: any, config = getConfig()) => {
5959
idleAlertFired = false;
6060

6161
idleCheckTimer = setInterval(async () => {
62+
// Interval is idleTimeoutMs/2 so the worst-case detection lag is 1.5×timeout
6263
if (!isConnected || lastEventReceivedAt === null) return;
6364

6465
const idleMs = Date.now() - lastEventReceivedAt;
@@ -75,9 +76,11 @@ export default (callback: any, receive: any, config = getConfig()) => {
7576
Severity.MAJOR,
7677
{ idleMs: String(idleMs) },
7778
logger,
78-
).finally(() => process.exit(1));
79+
).finally(() => {
80+
callback({ type: EventTypes.MONITORING_IDLE_TIMEOUT });
81+
});
7982
}
80-
}, idleTimeoutMs);
83+
}, Math.floor(idleTimeoutMs / 2));
8184
};
8285

8386
const stopIdleCheck = () => {
@@ -102,7 +105,7 @@ export default (callback: any, receive: any, config = getConfig()) => {
102105
{ timeoutMs: String(stuckTimeoutMs) },
103106
logger,
104107
).catch((err: Error) =>
105-
logger.error(`[monitoring] Failed to send stuck-processing alert: ${err}`),
108+
logger.error(`[monitoring] Failed to send stuck-processing alert: ${err.message}`),
106109
);
107110
}, stuckTimeoutMs);
108111
};
@@ -115,7 +118,9 @@ export default (callback: any, receive: any, config = getConfig()) => {
115118
};
116119

117120
// ── Reconnection storm detection ─────────────────────────────────────────────
121+
const STORM_ALERT_COOLDOWN_MS = 60 * 1000; // suppress duplicate storm alerts for 1 minute
118122
let reconnectionTimestamps: number[] = [];
123+
let stormAlertLastFiredAt: number | null = null;
119124

120125
const trackReconnection = () => {
121126
const now = Date.now();
@@ -125,6 +130,11 @@ export default (callback: any, receive: any, config = getConfig()) => {
125130
reconnectionTimestamps = reconnectionTimestamps.filter(t => t >= windowStart);
126131

127132
if (reconnectionTimestamps.length >= stormThreshold) {
133+
if (stormAlertLastFiredAt !== null && now - stormAlertLastFiredAt < STORM_ALERT_COOLDOWN_MS) {
134+
return; // still within cooldown — do not spam the alerting system
135+
}
136+
stormAlertLastFiredAt = now;
137+
128138
const windowMinutes = Math.round(stormWindowMs / 60000);
129139
logger.error(
130140
`[monitoring] Reconnection storm: ${reconnectionTimestamps.length} reconnections in the last ${windowMinutes} minutes`,
@@ -140,7 +150,7 @@ export default (callback: any, receive: any, config = getConfig()) => {
140150
},
141151
logger,
142152
).catch((err: Error) =>
143-
logger.error(`[monitoring] Failed to send reconnection storm alert: ${err}`),
153+
logger.error(`[monitoring] Failed to send reconnection storm alert: ${err.message}`),
144154
);
145155
}
146156
};

packages/daemon/src/delays/index.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,4 +24,3 @@ export const ACK_TIMEOUT = () => {
2424
const { ACK_TIMEOUT_MS } = getConfig();
2525
return ACK_TIMEOUT_MS;
2626
};
27-

packages/daemon/src/index.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,11 @@ const main = async () => {
2020
logger.info(`Transitioned to ${bigIntUtils.JSONBigInt.stringify(state.value)}`);
2121
});
2222

23+
machine.onDone(() => {
24+
logger.error('Sync machine reached a final state — terminating process for Kubernetes restart');
25+
process.exit(1);
26+
});
27+
2328
machine.onEvent((event) => {
2429
logger.info(`Processing event: ${bigIntUtils.JSONBigInt.stringify(event.type)}`);
2530
});

packages/daemon/src/machines/SyncMachine.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -391,6 +391,9 @@ export const SyncMachine = Machine<Context, any, Event>({
391391
cond: 'websocketDisconnected',
392392
target: SYNC_MACHINE_STATES.RECONNECTING,
393393
}],
394+
MONITORING_IDLE_TIMEOUT: {
395+
target: `#${SYNC_MACHINE_STATES.ERROR}`,
396+
},
394397
},
395398
},
396399
[SYNC_MACHINE_STATES.ERROR]: {

packages/daemon/src/types/event.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ export enum EventTypes {
4242
WEBSOCKET_SEND_EVENT = 'WEBSOCKET_SEND_EVENT',
4343
HEALTHCHECK_EVENT = 'HEALTHCHECK_EVENT',
4444
MONITORING_EVENT = 'MONITORING_EVENT',
45+
MONITORING_IDLE_TIMEOUT = 'MONITORING_IDLE_TIMEOUT',
4546
}
4647

4748
export enum FullNodeEventTypes {
@@ -82,7 +83,8 @@ export type Event =
8283
| { type: EventTypes.FULLNODE_EVENT, event: FullNodeEvent }
8384
| { type: EventTypes.WEBSOCKET_SEND_EVENT, event: WebSocketSendEvent }
8485
| { type: EventTypes.HEALTHCHECK_EVENT, event: HealthCheckEvent }
85-
| { type: EventTypes.MONITORING_EVENT, event: MonitoringEvent };
86+
| { type: EventTypes.MONITORING_EVENT, event: MonitoringEvent }
87+
| { type: EventTypes.MONITORING_IDLE_TIMEOUT };
8688

8789

8890
export interface VertexRemovedEventData {

0 commit comments

Comments (0)