Skip to content

Commit 77e0b95

Browse files
committed
fix(api-graphql): make ConnectionHealthMonitor production-ready
- Replace manual observer pattern with RxJS BehaviorSubject - Add lifecycle management to prevent infinite timer loops - Implement proper error handling with try-catch blocks - Add input validation for constructor parameters - Fix memory leak from improper Observable usage - Cap consecutive missed keep-alives at reasonable limit - Ensure timers stop when monitor closes - Add JSDoc comments for public methods - Complete observables properly on close BREAKING CHANGE: getHealthStateObservable() now returns Observable<ConnectionHealthState> directly instead of Optional
1 parent b3fa5bd commit 77e0b95

File tree

3 files changed

+159
-87
lines changed

3 files changed

+159
-87
lines changed

packages/api-graphql/__tests__/utils/ConnectionHealthMonitor.test.ts

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -155,11 +155,12 @@ describe('ConnectionHealthMonitor', () => {
155155
const states: boolean[] = [];
156156
const observable = monitor.getHealthStateObservable();
157157

158-
const subscription = observable!.subscribe(state => {
158+
const subscription = observable.subscribe(state => {
159159
states.push(state.isHealthy);
160160

161-
if (states.length === 2) {
162-
expect(states).toEqual([true, true]);
161+
// BehaviorSubject emits initial state, then updates
162+
if (states.length === 3) {
163+
expect(states).toEqual([false, true, true]); // initial, connected, keep-alive
163164
subscription.unsubscribe();
164165
done();
165166
}
@@ -168,6 +169,18 @@ describe('ConnectionHealthMonitor', () => {
168169
monitor.recordConnectionEstablished();
169170
monitor.recordKeepAlive();
170171
});
172+
173+
it('should handle constructor validation', () => {
174+
expect(() => new ConnectionHealthMonitor('test', 0, 1000)).toThrow(
175+
'healthCheckThresholdMs must be positive',
176+
);
177+
expect(() => new ConnectionHealthMonitor('test', 1000, 0)).toThrow(
178+
'healthCheckIntervalMs must be positive',
179+
);
180+
expect(() => new ConnectionHealthMonitor('test', 1000, 2000)).toThrow(
181+
'healthCheckIntervalMs must be less than healthCheckThresholdMs',
182+
);
183+
});
171184
});
172185

173186
describe('health check timer', () => {

packages/api-graphql/src/Providers/AWSWebSocketProvider/index.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,10 @@ import {
4040
ReconnectEvent,
4141
ReconnectionMonitor,
4242
} from '../../utils/ReconnectionMonitor';
43-
import { ConnectionHealthMonitor } from '../../utils/ConnectionHealthMonitor';
43+
import {
44+
ConnectionHealthMonitor,
45+
type ConnectionHealthState,
46+
} from '../../utils/ConnectionHealthMonitor';
4447
import type { AWSAppSyncRealTimeProviderOptions } from '../AWSAppSyncRealTimeProvider';
4548

4649
import {
@@ -110,8 +113,9 @@ export abstract class AWSWebSocketProvider {
110113
*/
111114
/**
112115
* Get the connection health state observable
116+
* @returns Observable of connection health state
113117
*/
114-
getConnectionHealthStateObservable() {
118+
getConnectionHealthStateObservable(): Observable<ConnectionHealthState> {
115119
return this.connectionHealthMonitor.getHealthStateObservable();
116120
}
117121

packages/api-graphql/src/utils/ConnectionHealthMonitor.ts

Lines changed: 137 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@
22
// SPDX-License-Identifier: Apache-2.0
33

44
import { ConsoleLogger } from '@aws-amplify/core';
5-
import { Observable, Observer } from 'rxjs';
5+
import { BehaviorSubject, Observable, Subject } from 'rxjs';
6+
import { takeUntil } from 'rxjs/operators';
67

78
export interface ConnectionHealthState {
89
isHealthy: boolean;
@@ -25,163 +26,210 @@ export enum HEALTH_EVENT {
2526
*/
2627
export class ConnectionHealthMonitor {
2728
private readonly logger: ConsoleLogger;
28-
private _healthState: ConnectionHealthState;
29-
private _healthStateObservable?: Observable<ConnectionHealthState>;
30-
private _healthStateObserver?: Observer<ConnectionHealthState>;
29+
private readonly destroy$ = new Subject<void>();
30+
private readonly healthStateSubject: BehaviorSubject<ConnectionHealthState>;
31+
public readonly healthState$: Observable<ConnectionHealthState>;
3132
private healthCheckTimer?: ReturnType<typeof setTimeout>;
3233
private readonly healthCheckThresholdMs: number;
3334
private readonly healthCheckIntervalMs: number;
35+
private isActive = true;
3436

3537
constructor(
3638
loggerName = 'ConnectionHealthMonitor',
3739
healthCheckThresholdMs = 30000,
3840
healthCheckIntervalMs = 5000,
3941
) {
42+
// Validate inputs
43+
if (healthCheckThresholdMs <= 0) {
44+
throw new Error('healthCheckThresholdMs must be positive');
45+
}
46+
if (healthCheckIntervalMs <= 0) {
47+
throw new Error('healthCheckIntervalMs must be positive');
48+
}
49+
if (healthCheckIntervalMs >= healthCheckThresholdMs) {
50+
throw new Error(
51+
'healthCheckIntervalMs must be less than healthCheckThresholdMs',
52+
);
53+
}
54+
4055
this.logger = new ConsoleLogger(loggerName);
4156
this.healthCheckThresholdMs = healthCheckThresholdMs;
4257
this.healthCheckIntervalMs = healthCheckIntervalMs;
4358

44-
this._healthState = {
59+
const initialState: ConnectionHealthState = {
4560
isHealthy: false,
4661
consecutiveMissedKeepAlives: 0,
4762
totalKeepAlivesReceived: 0,
4863
};
4964

50-
this._healthStateObservable = new Observable(observer => {
51-
this._healthStateObserver = observer;
52-
53-
return () => {
54-
this.stopHealthCheck();
55-
this._healthStateObserver = undefined;
56-
};
57-
});
65+
this.healthStateSubject = new BehaviorSubject(initialState);
66+
this.healthState$ = this.healthStateSubject
67+
.asObservable()
68+
.pipe(takeUntil(this.destroy$));
5869
}
5970

6071
/**
6172
* Records a keep-alive message receipt
6273
*/
6374
recordKeepAlive(): void {
64-
const currentTime = Date.now();
75+
if (!this.isActive) {
76+
return;
77+
}
6578

66-
this.logger.debug(HEALTH_EVENT.KEEP_ALIVE_RECEIVED);
79+
try {
80+
const currentTime = Date.now();
81+
this.logger.debug(HEALTH_EVENT.KEEP_ALIVE_RECEIVED);
6782

68-
const previouslyUnhealthy = !this._healthState.isHealthy;
83+
const currentState = this.healthStateSubject.getValue();
84+
const previouslyUnhealthy = !currentState.isHealthy;
6985

70-
this._healthState = {
71-
...this._healthState,
72-
lastKeepAliveTime: currentTime,
73-
isHealthy: true,
74-
consecutiveMissedKeepAlives: 0,
75-
totalKeepAlivesReceived: this._healthState.totalKeepAlivesReceived + 1,
76-
};
86+
const newState: ConnectionHealthState = {
87+
...currentState,
88+
lastKeepAliveTime: currentTime,
89+
isHealthy: true,
90+
consecutiveMissedKeepAlives: 0,
91+
totalKeepAlivesReceived: currentState.totalKeepAlivesReceived + 1,
92+
};
7793

78-
this.notifyObservers();
94+
this.healthStateSubject.next(newState);
7995

80-
if (previouslyUnhealthy) {
81-
this.logger.info('WebSocket connection recovered');
82-
}
96+
if (previouslyUnhealthy) {
97+
this.logger.info('WebSocket connection recovered');
98+
}
8399

84-
// Reset health check timer
85-
this.scheduleNextHealthCheck();
100+
// Reset health check timer
101+
this.scheduleNextHealthCheck();
102+
} catch (error) {
103+
this.logger.error('Error in recordKeepAlive:', error);
104+
}
86105
}
87106

88107
/**
89108
* Records connection establishment
90109
*/
91110
recordConnectionEstablished(): void {
92-
const currentTime = Date.now();
111+
if (!this.isActive) {
112+
return;
113+
}
93114

94-
this.logger.debug(HEALTH_EVENT.CONNECTION_ESTABLISHED);
115+
try {
116+
const currentTime = Date.now();
117+
this.logger.debug(HEALTH_EVENT.CONNECTION_ESTABLISHED);
95118

96-
this._healthState = {
97-
isHealthy: true,
98-
connectionStartTime: currentTime,
99-
lastKeepAliveTime: currentTime,
100-
consecutiveMissedKeepAlives: 0,
101-
totalKeepAlivesReceived: 0,
102-
};
119+
const newState: ConnectionHealthState = {
120+
isHealthy: true,
121+
connectionStartTime: currentTime,
122+
lastKeepAliveTime: currentTime,
123+
consecutiveMissedKeepAlives: 0,
124+
totalKeepAlivesReceived: 0,
125+
};
103126

104-
this.notifyObservers();
105-
this.scheduleNextHealthCheck();
127+
this.healthStateSubject.next(newState);
128+
this.scheduleNextHealthCheck();
129+
} catch (error) {
130+
this.logger.error('Error in recordConnectionEstablished:', error);
131+
}
106132
}
107133

108134
/**
109135
* Records a missed keep-alive
110136
*/
111137
private recordKeepAliveMissed(): void {
112-
const wasHealthy = this._healthState.isHealthy;
138+
if (!this.isActive) {
139+
return;
140+
}
113141

114-
this._healthState = {
115-
...this._healthState,
116-
isHealthy: false,
117-
consecutiveMissedKeepAlives:
118-
this._healthState.consecutiveMissedKeepAlives + 1,
119-
};
142+
try {
143+
const currentState = this.healthStateSubject.getValue();
144+
const wasHealthy = currentState.isHealthy;
120145

121-
if (wasHealthy) {
122-
this.logger.warn(
123-
`${HEALTH_EVENT.KEEP_ALIVE_MISSED} - WebSocket may be unhealthy`,
146+
// Cap consecutive misses at a reasonable number
147+
const newConsecutiveMisses = Math.min(
148+
currentState.consecutiveMissedKeepAlives + 1,
149+
100,
124150
);
125-
}
126151

127-
this.notifyObservers();
152+
const newState: ConnectionHealthState = {
153+
...currentState,
154+
isHealthy: false,
155+
consecutiveMissedKeepAlives: newConsecutiveMisses,
156+
};
157+
158+
this.healthStateSubject.next(newState);
159+
160+
if (wasHealthy) {
161+
this.logger.warn(
162+
`${HEALTH_EVENT.KEEP_ALIVE_MISSED} - WebSocket may be unhealthy`,
163+
);
164+
}
165+
} catch (error) {
166+
this.logger.error('Error in recordKeepAliveMissed:', error);
167+
}
128168
}
129169

130170
/**
131171
* Gets the current health state
172+
* @returns A copy of the current health state
132173
*/
133174
getHealthState(): ConnectionHealthState {
134-
return { ...this._healthState };
175+
return { ...this.healthStateSubject.getValue() };
135176
}
136177

137178
/**
138179
* Checks if the connection is currently healthy
180+
* @returns true if the connection is healthy, false otherwise
139181
*/
140182
isHealthy(): boolean {
141-
if (!this._healthState.lastKeepAliveTime) {
183+
const state = this.healthStateSubject.getValue();
184+
if (!state.lastKeepAliveTime) {
142185
return false;
143186
}
144187

145-
const timeSinceLastKeepAlive =
146-
Date.now() - this._healthState.lastKeepAliveTime;
188+
const timeSinceLastKeepAlive = Date.now() - state.lastKeepAliveTime;
147189

148190
return timeSinceLastKeepAlive < this.healthCheckThresholdMs;
149191
}
150192

151193
/**
152194
* Returns an observable for monitoring health state changes
195+
* @returns Observable that emits health state changes
153196
*/
154-
getHealthStateObservable(): Observable<ConnectionHealthState> | undefined {
155-
return this._healthStateObservable;
156-
}
157-
158-
/**
159-
* Notifies observers of state changes
160-
*/
161-
private notifyObservers(): void {
162-
if (this._healthStateObserver) {
163-
this._healthStateObserver.next({ ...this._healthState });
164-
}
197+
getHealthStateObservable(): Observable<ConnectionHealthState> {
198+
return this.healthState$;
165199
}
166200

167201
/**
168202
* Schedules the next health check
169203
*/
170204
private scheduleNextHealthCheck(): void {
205+
if (!this.isActive) {
206+
return;
207+
}
208+
171209
this.stopHealthCheck();
172210

173211
this.healthCheckTimer = setTimeout(() => {
174-
if (this._healthState.lastKeepAliveTime) {
175-
const timeSinceLastKeepAlive =
176-
Date.now() - this._healthState.lastKeepAliveTime;
212+
if (!this.isActive) {
213+
return;
214+
}
215+
216+
try {
217+
const state = this.healthStateSubject.getValue();
218+
if (state.lastKeepAliveTime) {
219+
const timeSinceLastKeepAlive = Date.now() - state.lastKeepAliveTime;
177220

178-
if (timeSinceLastKeepAlive >= this.healthCheckThresholdMs) {
179-
this.recordKeepAliveMissed();
221+
if (timeSinceLastKeepAlive >= this.healthCheckThresholdMs) {
222+
this.recordKeepAliveMissed();
223+
}
180224
}
181-
}
182225

183-
// Schedule next check
184-
this.scheduleNextHealthCheck();
226+
// Schedule next check only if still active
227+
if (this.isActive) {
228+
this.scheduleNextHealthCheck();
229+
}
230+
} catch (error) {
231+
this.logger.error('Error in health check:', error);
232+
}
185233
}, this.healthCheckIntervalMs);
186234
}
187235

@@ -196,22 +244,29 @@ export class ConnectionHealthMonitor {
196244
}
197245

198246
/**
199-
* Resets the monitor state and closes observers
247+
* Closes the monitor and cleans up resources
200248
*/
201249
close(): void {
250+
if (!this.isActive) {
251+
return; // Already closed
252+
}
253+
202254
this.logger.debug('Closing ConnectionHealthMonitor');
203255

256+
this.isActive = false;
204257
this.stopHealthCheck();
205258

206-
if (this._healthStateObserver) {
207-
this._healthStateObserver.complete();
208-
this._healthStateObserver = undefined;
209-
}
210-
211-
this._healthState = {
259+
// Emit final state
260+
const finalState: ConnectionHealthState = {
212261
isHealthy: false,
213262
consecutiveMissedKeepAlives: 0,
214263
totalKeepAlivesReceived: 0,
215264
};
265+
this.healthStateSubject.next(finalState);
266+
267+
// Complete subjects
268+
this.destroy$.next();
269+
this.destroy$.complete();
270+
this.healthStateSubject.complete();
216271
}
217272
}

0 commit comments

Comments
 (0)