Skip to content

Commit 774b2c2

Browse files
authored
Merge pull request #49 from AmadeusITGroup/refactor/websocket-buffer-cleanup
Refactor/websocket buffer cleanup
2 parents 37fbf0c + 0cdfedd commit 774b2c2

File tree

7 files changed

+53
-21
lines changed

7 files changed

+53
-21
lines changed
Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,18 @@
11
import { HttpClient } from '@angular/common/http';
22
import { inject, Injectable } from '@angular/core';
33

4-
import { Observable } from 'rxjs';
5-
import { map } from 'rxjs/operators';
4+
import { Observable, of } from 'rxjs';
5+
import { catchError, map } from 'rxjs/operators';
66
import { Book } from './book.model';
77

8+
const FALLBACK_BOOKS: Book[] = [
9+
{ id: '1', volumeInfo: { title: 'Awakenings', authors: ['Oliver Sacks'] } },
10+
{ id: '2', volumeInfo: { title: 'The Man Who Mistook His Wife for a Hat', authors: ['Oliver Sacks'] } },
11+
{ id: '3', volumeInfo: { title: 'An Anthropologist on Mars', authors: ['Oliver Sacks'] } },
12+
{ id: '4', volumeInfo: { title: 'Musicophilia', authors: ['Oliver Sacks'] } },
13+
{ id: '5', volumeInfo: { title: 'The Island of the Colourblind', authors: ['Oliver Sacks'] } },
14+
];
15+
816
@Injectable({ providedIn: 'root' })
917
export class GoogleBooksService {
1018
private readonly http = inject(HttpClient);
@@ -14,6 +22,9 @@ export class GoogleBooksService {
1422
.get<{ items: Book[] }>(
1523
'https://www.googleapis.com/books/v1/volumes?maxResults=5&orderBy=relevance&q=oliver%20sacks'
1624
)
17-
.pipe(map((books) => books.items || []));
25+
.pipe(
26+
map((books) => books.items || []),
27+
catchError(() => of(FALLBACK_BOOKS))
28+
);
1829
}
1930
}

projects/ngrx-devtool/src/lib/core/actions-interceptor.service.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import { Actions } from '@ngrx/effects';
33
import { Action } from '@ngrx/store';
44
import { filter, Subject, takeUntil, tap } from 'rxjs';
55

6-
import { DevToolMessage, EffectEvent, TrackedAction } from './core.models';
6+
import { DEFAULT_WS_URL, DevToolMessage, EffectEvent, TrackedAction } from './core.models';
77
import { EffectTrackerService } from './effect-tracker.service';
88
import { WebSocketService, WebSocketMessage } from './websocket.service';
99

@@ -16,7 +16,7 @@ export class ActionsInterceptorService implements OnDestroy {
1616
private readonly destroy$ = new Subject<void>();
1717
private initialized = false;
1818

19-
initialize(wsUrl = 'ws://localhost:4000'): void {
19+
initialize(wsUrl = DEFAULT_WS_URL): void {
2020
if (this.initialized) {
2121
return;
2222
}

projects/ngrx-devtool/src/lib/core/core.models.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
import { Action } from '@ngrx/store';
22

3+
export const DEFAULT_WS_URL = 'ws://localhost:4000';
4+
35
export type EffectLifecycle = 'triggered' | 'emitted' | 'executed' | 'error';
46

57
export interface EffectEvent {

projects/ngrx-devtool/src/lib/core/websocket.service.ts

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
import { inject, Injectable, OnDestroy, PLATFORM_ID } from '@angular/core';
22
import { isPlatformBrowser } from '@angular/common';
33
import { BehaviorSubject, Observable } from 'rxjs';
4+
import { DEFAULT_WS_URL } from './core.models';
45

56
export interface WebSocketMessage {
67
readonly type: string;
78
}
89

10+
const MAX_BUFFER_SIZE = 200;
11+
912
@Injectable({ providedIn: 'root' })
1013
export class WebSocketService implements OnDestroy {
1114
private readonly platformId = inject(PLATFORM_ID);
@@ -31,7 +34,7 @@ export class WebSocketService implements OnDestroy {
3134
return this.incomingMessages$.asObservable();
3235
}
3336

34-
initialize(wsUrl = 'ws://localhost:4000'): void {
37+
initialize(wsUrl = DEFAULT_WS_URL): void {
3538
if (this.initialized) {
3639
// If already initialized with same URL, skip
3740
if (this.wsUrl === wsUrl) {
@@ -47,19 +50,17 @@ export class WebSocketService implements OnDestroy {
4750
}
4851

4952
send<T extends WebSocketMessage>(message: T): void {
50-
const payload = JSON.stringify(message);
51-
52-
if (this.isConnected && this.socket?.readyState === WebSocket.OPEN) {
53-
this.socket.send(payload);
54-
} else if (this.isBrowser) {
55-
this.messageBuffer.push(payload);
56-
}
53+
this.bufferOrSend(JSON.stringify(message));
5754
}
5855

5956
sendRaw(payload: string): void {
57+
this.bufferOrSend(payload);
58+
}
59+
60+
private bufferOrSend(payload: string): void {
6061
if (this.isConnected && this.socket?.readyState === WebSocket.OPEN) {
6162
this.socket.send(payload);
62-
} else if (this.isBrowser) {
63+
} else if (this.isBrowser && this.messageBuffer.length < MAX_BUFFER_SIZE) {
6364
this.messageBuffer.push(payload);
6465
}
6566
}
@@ -73,6 +74,7 @@ export class WebSocketService implements OnDestroy {
7374
private close(): void {
7475
this.socket?.close();
7576
this.socket = null;
77+
this.messageBuffer = [];
7678
this.connectionState$.next(false);
7779
this.initialized = false;
7880
this.wsUrl = null;
@@ -109,9 +111,11 @@ export class WebSocketService implements OnDestroy {
109111
}
110112

111113
private flushBuffer(): void {
112-
while (this.messageBuffer.length > 0 && this.isConnected) {
113-
const message = this.messageBuffer.shift();
114-
if (message && this.socket?.readyState === WebSocket.OPEN) {
114+
const buffered = this.messageBuffer;
115+
this.messageBuffer = [];
116+
117+
for (const message of buffered) {
118+
if (this.isConnected && this.socket?.readyState === WebSocket.OPEN) {
115119
this.socket.send(message);
116120
}
117121
}

projects/ngrx-devtool/src/lib/store/meta-reducer.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { ActionReducer, Action } from '@ngrx/store';
22
import { inject } from '@angular/core';
33
import { PerformanceTrackerService } from '../performance/performance-tracker.service';
44
import { WebSocketService } from '../core/websocket.service';
5+
import { DEFAULT_WS_URL } from '../core/core.models';
56

67
export interface RenderPerformanceData {
78
readonly renderTime: number;
@@ -22,13 +23,13 @@ export interface DevToolMetaReducerConfig {
2223
}
2324

2425
export function createDevToolMetaReducer(
25-
wsUrlOrConfig: string | DevToolMetaReducerConfig = 'ws://localhost:4000'
26+
wsUrlOrConfig: string | DevToolMetaReducerConfig = DEFAULT_WS_URL
2627
) {
2728
const config: DevToolMetaReducerConfig = typeof wsUrlOrConfig === 'string'
2829
? { wsUrl: wsUrlOrConfig }
2930
: wsUrlOrConfig;
3031

31-
const wsUrl = config.wsUrl ?? 'ws://localhost:4000';
32+
const wsUrl = config.wsUrl ?? DEFAULT_WS_URL;
3233
const enablePerf = config.enablePerformanceTracking ?? true;
3334

3435
return function devToolMetaReducer<State>(
@@ -85,5 +86,5 @@ export function createDevToolMetaReducer(
8586
export function loggerMetaReducer<State>(
8687
reducer: ActionReducer<State>
8788
): ActionReducer<State> {
88-
return createDevToolMetaReducer('ws://localhost:4000')(reducer);
89+
return createDevToolMetaReducer(DEFAULT_WS_URL)(reducer);
8990
}

projects/ngrx-devtool/src/lib/store/provide-devtool.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { APP_INITIALIZER, Provider } from '@angular/core';
22
import { EffectSources } from '@ngrx/effects';
33
import { ActionsInterceptorService } from '../core/actions-interceptor.service';
44
import { DevToolsEffectSources } from '../core/devtools-effect-sources';
5+
import { DEFAULT_WS_URL } from '../core/core.models';
56

67
export interface DevToolConfig {
78
readonly wsUrl?: string;
@@ -13,7 +14,7 @@ export function provideNgrxDevTool(config: DevToolConfig = {}): Provider[] {
1314
{
1415
provide: APP_INITIALIZER,
1516
useFactory: (interceptor: ActionsInterceptorService) => () => {
16-
interceptor.initialize(config.wsUrl ?? 'ws://localhost:4000');
17+
interceptor.initialize(config.wsUrl ?? DEFAULT_WS_URL);
1718
},
1819
deps: [ActionsInterceptorService],
1920
multi: true,

projects/ngrx-devtool/src/lib/testing/websocket.service.spec.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,19 @@ describe('WebSocketService', () => {
151151
expect(messages[0].type).toBe('MSG_1');
152152
expect(messages[1].type).toBe('MSG_2');
153153
});
154+
155+
it('should drop messages once the buffer reaches max size (200)', () => {
156+
service.initialize();
157+
// Don't open the socket — all messages go to buffer
158+
159+
for (let i = 0; i < 250; i++) {
160+
service.send({ type: `MSG_${i}` } as WebSocketMessage);
161+
}
162+
163+
// Open and flush — should only have the first 200
164+
getLastWs().simulateOpen();
165+
expect(getLastWs().sentMessages).toHaveLength(200);
166+
});
154167
});
155168

156169
describe('sendRaw()', () => {

0 commit comments

Comments
 (0)