Skip to content

Commit 64d4448

Browse files
committed
refactor(daemon): move stuck-processing timeout logic into MonitoringActor
1 parent 49152f8 commit 64d4448

File tree

6 files changed

+222
-158
lines changed

6 files changed

+222
-158
lines changed

packages/daemon/__tests__/actors/MonitoringActor.test.ts

Lines changed: 99 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ import { addAlert } from '@wallet-service/common';
1414
jest.useFakeTimers();
1515
jest.spyOn(global, 'setInterval');
1616
jest.spyOn(global, 'clearInterval');
17+
jest.spyOn(global, 'setTimeout');
18+
jest.spyOn(global, 'clearTimeout');
1719

1820
jest.mock('@wallet-service/common', () => ({
1921
...jest.requireActual('@wallet-service/common'),
@@ -39,8 +41,9 @@ describe('MonitoringActor', () => {
3941
jest.clearAllMocks();
4042
jest.clearAllTimers();
4143
config = getConfig();
42-
config['IDLE_EVENT_TIMEOUT_MS'] = 5 * 60 * 1000; // 5 min
43-
config['RECONNECTION_STORM_THRESHOLD'] = 3; // low threshold for tests
44+
config['IDLE_EVENT_TIMEOUT_MS'] = 5 * 60 * 1000; // 5 min
45+
config['STUCK_PROCESSING_TIMEOUT_MS'] = 5 * 60 * 1000; // 5 min
46+
config['RECONNECTION_STORM_THRESHOLD'] = 3; // low threshold for tests
4447
config['RECONNECTION_STORM_WINDOW_MS'] = 5 * 60 * 1000; // 5 min
4548

4649
mockCallback = jest.fn();
@@ -54,6 +57,8 @@ describe('MonitoringActor', () => {
5457
jest.useRealTimers();
5558
});
5659

60+
// ── Idle detection ───────────────────────────────────────────────────────────
61+
5762
it('should not start the idle timer on initialization', () => {
5863
MonitoringActor(mockCallback, mockReceive, config);
5964
expect(setInterval).not.toHaveBeenCalled();
@@ -68,17 +73,13 @@ describe('MonitoringActor', () => {
6873
it('should stop the idle timer when receiving a DISCONNECTED event', () => {
6974
MonitoringActor(mockCallback, mockReceive, config);
7075
sendEvent('CONNECTED');
71-
expect(setInterval).toHaveBeenCalledTimes(1);
72-
7376
sendEvent('DISCONNECTED');
7477
expect(clearInterval).toHaveBeenCalledTimes(1);
7578
});
7679

7780
it('should stop the idle timer when the actor is stopped', () => {
7881
const stopActor = MonitoringActor(mockCallback, mockReceive, config);
7982
sendEvent('CONNECTED');
80-
expect(setInterval).toHaveBeenCalledTimes(1);
81-
8283
stopActor();
8384
expect(clearInterval).toHaveBeenCalledTimes(1);
8485
});
@@ -87,83 +88,150 @@ describe('MonitoringActor', () => {
8788
MonitoringActor(mockCallback, mockReceive, config);
8889
sendEvent('CONNECTED');
8990

90-
// Advance time past the idle timeout
9191
jest.advanceTimersByTime(config['IDLE_EVENT_TIMEOUT_MS'] + 1);
92-
93-
// Allow the async addAlert promise to resolve
9492
await Promise.resolve();
9593

9694
expect(mockAddAlert).toHaveBeenCalledTimes(1);
9795
expect(mockAddAlert.mock.calls[0][0]).toBe('Daemon Idle — No Events Received');
9896
});
9997

100-
it('should NOT fire an idle alert when events are being received', async () => {
98+
it('should NOT fire an idle alert when events keep arriving', async () => {
10199
MonitoringActor(mockCallback, mockReceive, config);
102100
sendEvent('CONNECTED');
103101

104-
// Advance to just before the timeout
102+
// Stay below the threshold each time
105103
jest.advanceTimersByTime(config['IDLE_EVENT_TIMEOUT_MS'] - 1000);
106104
sendEvent('EVENT_RECEIVED');
107-
108-
// Advance past the original threshold (but lastEventReceivedAt was reset)
109105
jest.advanceTimersByTime(config['IDLE_EVENT_TIMEOUT_MS'] - 1000);
110106

111107
await Promise.resolve();
112108
expect(mockAddAlert).not.toHaveBeenCalled();
113109
});
114110

115-
it('should fire only one idle alert even if the timer fires multiple times', async () => {
111+
it('should fire only one idle alert per idle period', async () => {
116112
MonitoringActor(mockCallback, mockReceive, config);
117113
sendEvent('CONNECTED');
118114

119-
// Fire timer three times without receiving any events
120115
jest.advanceTimersByTime(config['IDLE_EVENT_TIMEOUT_MS'] * 3);
121-
122116
await Promise.resolve();
123117

124118
expect(mockAddAlert).toHaveBeenCalledTimes(1);
125119
});
126120

127-
it('should reset the idle alert flag when an event is received after an alert fired', async () => {
121+
it('should reset the idle alert flag when an event is received, allowing a second alert', async () => {
128122
MonitoringActor(mockCallback, mockReceive, config);
129123
sendEvent('CONNECTED');
130124

131-
// Trigger first alert (interval fires at T = IDLE_EVENT_TIMEOUT_MS)
125+
// Trigger first alert
132126
jest.advanceTimersByTime(config['IDLE_EVENT_TIMEOUT_MS'] + 1);
133127
await Promise.resolve();
134128
expect(mockAddAlert).toHaveBeenCalledTimes(1);
135129

136-
// Receive an event — resets idleAlertFired and lastEventReceivedAt to current time T1
130+
// Receive an event — resets idleAlertFired and lastEventReceivedAt
137131
sendEvent('EVENT_RECEIVED');
138132

139-
// The next interval tick where idleMs >= threshold is at 3*T (the interval at 2*T
140-
// fires only T-1 ms after EVENT_RECEIVED, which is below the threshold).
141-
// Advancing by 2*T from T1 guarantees we cross that boundary.
133+
// Advance far enough for the interval to fire when idleMs >= threshold again.
134+
// The interval fires at 2T, 3T, … from start. After EVENT_RECEIVED at ~T,
135+
// the next fire where idleMs >= T is at 3T (fire at 2T gives idleMs = T-1).
142136
jest.advanceTimersByTime(2 * config['IDLE_EVENT_TIMEOUT_MS']);
143137
await Promise.resolve();
144138

145-
// A second alert should now be fired
146139
expect(mockAddAlert).toHaveBeenCalledTimes(2);
147140
});
148141

149-
it('should fire a reconnection storm alert when threshold is reached', async () => {
142+
it('should restart the idle timer when CONNECTED is sent while already running', () => {
143+
MonitoringActor(mockCallback, mockReceive, config);
144+
sendEvent('CONNECTED');
145+
sendEvent('CONNECTED'); // second connect clears old and starts new
146+
expect(clearInterval).toHaveBeenCalledTimes(1);
147+
expect(setInterval).toHaveBeenCalledTimes(2);
148+
});
149+
150+
// ── Stuck-processing detection ───────────────────────────────────────────────
151+
152+
it('should start a stuck timer on PROCESSING_STARTED', () => {
153+
MonitoringActor(mockCallback, mockReceive, config);
154+
sendEvent('PROCESSING_STARTED');
155+
expect(setTimeout).toHaveBeenCalledTimes(1);
156+
});
157+
158+
it('should cancel the stuck timer on PROCESSING_COMPLETED', () => {
159+
MonitoringActor(mockCallback, mockReceive, config);
160+
sendEvent('PROCESSING_STARTED');
161+
sendEvent('PROCESSING_COMPLETED');
162+
expect(clearTimeout).toHaveBeenCalledTimes(1);
163+
});
164+
165+
it('should fire a CRITICAL alert and call back MONITORING_STUCK_PROCESSING when stuck', async () => {
166+
MonitoringActor(mockCallback, mockReceive, config);
167+
sendEvent('PROCESSING_STARTED');
168+
169+
jest.advanceTimersByTime(config['STUCK_PROCESSING_TIMEOUT_MS'] + 1);
170+
// Let the async addAlert inside the timeout resolve
171+
await Promise.resolve();
172+
await Promise.resolve();
173+
174+
expect(mockAddAlert).toHaveBeenCalledTimes(1);
175+
expect(mockAddAlert.mock.calls[0][0]).toBe('Daemon Stuck In Processing State');
176+
expect(mockCallback).toHaveBeenCalledWith({ type: EventTypes.MONITORING_STUCK_PROCESSING });
177+
});
178+
179+
it('should NOT fire the stuck alert when PROCESSING_COMPLETED arrives in time', async () => {
180+
MonitoringActor(mockCallback, mockReceive, config);
181+
sendEvent('PROCESSING_STARTED');
182+
183+
jest.advanceTimersByTime(config['STUCK_PROCESSING_TIMEOUT_MS'] - 1000);
184+
sendEvent('PROCESSING_COMPLETED');
185+
186+
jest.advanceTimersByTime(2000); // advance past original timeout
187+
await Promise.resolve();
188+
189+
expect(mockAddAlert).not.toHaveBeenCalled();
190+
expect(mockCallback).not.toHaveBeenCalled();
191+
});
192+
193+
it('should reset the stuck timer on consecutive PROCESSING_STARTED events', () => {
194+
MonitoringActor(mockCallback, mockReceive, config);
195+
sendEvent('PROCESSING_STARTED');
196+
sendEvent('PROCESSING_STARTED'); // second one clears the first
197+
expect(clearTimeout).toHaveBeenCalledTimes(1);
198+
expect(setTimeout).toHaveBeenCalledTimes(2);
199+
});
200+
201+
it('should stop the stuck timer when the actor is stopped', () => {
202+
const stopActor = MonitoringActor(mockCallback, mockReceive, config);
203+
sendEvent('PROCESSING_STARTED');
204+
stopActor();
205+
expect(clearTimeout).toHaveBeenCalledTimes(1);
206+
});
207+
208+
it('should also clear the stuck timer on DISCONNECTED', () => {
209+
MonitoringActor(mockCallback, mockReceive, config);
210+
sendEvent('CONNECTED');
211+
sendEvent('PROCESSING_STARTED');
212+
sendEvent('DISCONNECTED');
213+
// clearTimeout for stuck timer + clearInterval for idle timer
214+
expect(clearTimeout).toHaveBeenCalledTimes(1);
215+
expect(clearInterval).toHaveBeenCalledTimes(1);
216+
});
217+
218+
// ── Reconnection storm detection ─────────────────────────────────────────────
219+
220+
it('should fire a reconnection storm alert when the threshold is reached', async () => {
150221
MonitoringActor(mockCallback, mockReceive, config);
151222

152-
// Send enough reconnections to trigger the storm threshold (3 in our test config)
153-
sendEvent('RECONNECTING');
154223
sendEvent('RECONNECTING');
155224
sendEvent('RECONNECTING');
225+
sendEvent('RECONNECTING'); // threshold is 3 in test config
156226

157227
await Promise.resolve();
158-
159228
expect(mockAddAlert).toHaveBeenCalledTimes(1);
160229
expect(mockAddAlert.mock.calls[0][0]).toBe('Daemon Reconnection Storm');
161230
});
162231

163232
it('should NOT fire a reconnection storm alert below the threshold', async () => {
164233
MonitoringActor(mockCallback, mockReceive, config);
165234

166-
// Send fewer reconnections than the threshold
167235
sendEvent('RECONNECTING');
168236
sendEvent('RECONNECTING');
169237

@@ -174,33 +242,21 @@ describe('MonitoringActor', () => {
174242
it('should evict old reconnections outside the storm window', async () => {
175243
MonitoringActor(mockCallback, mockReceive, config);
176244

177-
// Two reconnections at time 0
178245
sendEvent('RECONNECTING');
179246
sendEvent('RECONNECTING');
180247

181-
// Advance past the storm window so those timestamps are evicted
182248
jest.advanceTimersByTime(config['RECONNECTION_STORM_WINDOW_MS'] + 1000);
183249

184-
// One new reconnection — count should restart from 1, no alert
250+
// Only 1 new reconnection — below threshold after eviction
185251
sendEvent('RECONNECTING');
186252

187253
await Promise.resolve();
188254
expect(mockAddAlert).not.toHaveBeenCalled();
189255
});
190256

191-
it('should restart idle timer when CONNECTED is sent while already connected', () => {
192-
MonitoringActor(mockCallback, mockReceive, config);
193-
194-
sendEvent('CONNECTED');
195-
expect(setInterval).toHaveBeenCalledTimes(1);
196-
197-
// A second CONNECTED clears the old timer and creates a new one
198-
sendEvent('CONNECTED');
199-
expect(clearInterval).toHaveBeenCalledTimes(1);
200-
expect(setInterval).toHaveBeenCalledTimes(2);
201-
});
257+
// ── Misc ─────────────────────────────────────────────────────────────────────
202258

203-
it('should ignore events of other types', () => {
259+
it('should ignore events of other types and log a warning', () => {
204260
const warnSpy = jest.spyOn(logger, 'warn');
205261
MonitoringActor(mockCallback, mockReceive, config);
206262

packages/daemon/src/actions/index.ts

Lines changed: 17 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ import logger from '../logger';
1212
import { hashTxData } from '../utils';
1313
import { createStartStreamMessage, createSendAckMessage } from '../actors';
1414
import { bigIntUtils } from '@hathor/wallet-lib';
15-
import { addAlert, Severity } from '@wallet-service/common';
1615

1716
/*
1817
* This action is used to store the initial event id on the context
@@ -243,23 +242,21 @@ export const sendMonitoringReconnecting = sendTo(
243242
);
244243

245244
/*
246-
* Fires a CRITICAL alert and logs when the machine has been stuck in a processing
247-
* state for longer than STUCK_PROCESSING_TIMEOUT_MS. The machine will transition to
248-
* RECONNECTING immediately after this action runs.
245+
* Notifies the monitoring actor that a processing state was entered.
246+
* The actor starts a stuck-detection timer; if PROCESSING_COMPLETED doesn't
247+
* arrive within STUCK_PROCESSING_TIMEOUT_MS it fires a CRITICAL alert and sends
248+
* MONITORING_STUCK_PROCESSING back to the machine.
249249
*/
250-
export const alertStuckProcessing = (context: Context) => {
251-
const eventId = context.event?.event?.id;
252-
logger.error(
253-
`[monitoring] State machine stuck processing event ${eventId ?? 'unknown'} for too long — forcing reconnection`,
254-
);
255-
addAlert(
256-
'Daemon Stuck In Processing State',
257-
`The state machine has been processing event ${eventId ?? 'unknown'} ` +
258-
'for longer than the configured timeout. Forcing a reconnection.',
259-
Severity.CRITICAL,
260-
{ eventId: eventId !== undefined ? String(eventId) : 'unknown' },
261-
logger,
262-
).catch((err: Error) =>
263-
logger.error(`[monitoring] Failed to send stuck-processing alert: ${err}`),
264-
);
265-
};
250+
export const sendMonitoringProcessingStarted = sendTo(
251+
getMonitoringRefFromContext,
252+
{ type: EventTypes.MONITORING_EVENT, event: { type: 'PROCESSING_STARTED' } },
253+
);
254+
255+
/*
256+
* Notifies the monitoring actor that a processing state was exited normally,
257+
* cancelling the stuck-detection timer.
258+
*/
259+
export const sendMonitoringProcessingCompleted = sendTo(
260+
getMonitoringRefFromContext,
261+
{ type: EventTypes.MONITORING_EVENT, event: { type: 'PROCESSING_COMPLETED' } },
262+
);

0 commit comments

Comments
 (0)