Skip to content

Commit 920c4ea

Browse files
authored
fix: unhandled promise rejections during reconnect (#1585)
For an explanation of where unhandled rejections are coming from, see https://github.com/GetStream/stream-video-js/pull/1585/files#diff-420f6ddab47c1be72fd9ce8c99e1fa2b9f5f0495b7c367546ee0ff634beaed81
1 parent 5bd2188 commit 920c4ea

File tree

5 files changed

+110
-54
lines changed

5 files changed

+110
-54
lines changed

packages/client/src/StreamVideoClient.ts

Lines changed: 14 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import {
3030
import { getLogger, logToConsole, setLogger } from './logger';
3131
import { getSdkInfo } from './client-details';
3232
import { SdkType } from './gen/video/sfu/models/models';
33+
import { withoutConcurrency } from './helpers/concurrency';
3334

3435
/**
3536
* A `StreamVideoClient` instance lets you communicate with our API, and authenticate users.
@@ -51,8 +52,9 @@ export class StreamVideoClient {
5152
streamClient: StreamClient;
5253

5354
protected eventHandlersToUnregister: Array<() => void> = [];
54-
protected connectionPromise: Promise<void | ConnectedEvent> | undefined;
55-
protected disconnectionPromise: Promise<void> | undefined;
55+
private readonly connectionConcurrencyTag = Symbol(
56+
'connectionConcurrencyTag',
57+
);
5658

5759
private static _instanceMap: Map<string, StreamVideoClient> = new Map();
5860

@@ -209,12 +211,11 @@ export class StreamVideoClient {
209211
return this.streamClient.connectGuestUser(user);
210212
};
211213
}
212-
this.connectionPromise = this.disconnectionPromise
213-
? this.disconnectionPromise.then(() => connectUser())
214-
: connectUser();
215214

216-
this.connectionPromise?.finally(() => (this.connectionPromise = undefined));
217-
const connectUserResponse = await this.connectionPromise;
215+
const connectUserResponse = await withoutConcurrency(
216+
this.connectionConcurrencyTag,
217+
() => connectUser(),
218+
);
218219
// connectUserResponse will be void if connectUser called twice for the same user
219220
if (connectUserResponse?.me) {
220221
this.writeableStateStore.setConnectedUser(connectUserResponse.me);
@@ -316,19 +317,15 @@ export class StreamVideoClient {
316317
* https://developer.mozilla.org/en-US/docs/Web/API/CloseEvent
317318
*/
318319
disconnectUser = async (timeout?: number) => {
319-
if (!this.streamClient.user && !this.connectionPromise) {
320+
if (!this.streamClient.user) {
320321
return;
321322
}
322323
const userId = this.streamClient.user?.id;
323324
const apiKey = this.streamClient.key;
324325
const disconnectUser = () => this.streamClient.disconnectUser(timeout);
325-
this.disconnectionPromise = this.connectionPromise
326-
? this.connectionPromise.then(() => disconnectUser())
327-
: disconnectUser();
328-
this.disconnectionPromise.finally(
329-
() => (this.disconnectionPromise = undefined),
326+
await withoutConcurrency(this.connectionConcurrencyTag, () =>
327+
disconnectUser(),
330328
);
331-
await this.disconnectionPromise;
332329
if (userId) {
333330
StreamVideoClient._instanceMap.delete(apiKey + userId);
334331
}
@@ -556,10 +553,8 @@ export class StreamVideoClient {
556553
) => {
557554
const connectAnonymousUser = () =>
558555
this.streamClient.connectAnonymousUser(user, tokenOrProvider);
559-
this.connectionPromise = this.disconnectionPromise
560-
? this.disconnectionPromise.then(() => connectAnonymousUser())
561-
: connectAnonymousUser();
562-
this.connectionPromise.finally(() => (this.connectionPromise = undefined));
563-
return this.connectionPromise;
556+
return await withoutConcurrency(this.connectionConcurrencyTag, () =>
557+
connectAnonymousUser(),
558+
);
564559
};
565560
}

packages/client/src/__tests__/Call.test.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -271,8 +271,8 @@ describe('muting logic', () => {
271271
.mockImplementation(() => Promise.resolve({ duration: '0ms' }));
272272
});
273273

274-
it('should mute self', () => {
275-
call.muteSelf('audio');
274+
it('should mute self', async () => {
275+
await call.muteSelf('audio');
276276

277277
expect(spy).toHaveBeenCalledWith(userId, 'audio');
278278
});

packages/client/src/coordinator/connection/client.ts

Lines changed: 33 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,12 @@ import {
3838
} from './types';
3939
import { InsightMetrics, postInsights } from './insights';
4040
import { getLocationHint } from './location';
41-
import { CreateGuestRequest, CreateGuestResponse } from '../../gen/coordinator';
41+
import {
42+
ConnectedEvent,
43+
CreateGuestRequest,
44+
CreateGuestResponse,
45+
} from '../../gen/coordinator';
46+
import { makeSafePromise, type SafePromise } from '../../helpers/promise';
4247

4348
export class StreamClient {
4449
_user?: UserWithId;
@@ -67,14 +72,14 @@ export class StreamClient {
6772
wsBaseURL?: string;
6873
wsConnection: StableWSConnection | null;
6974
wsFallback?: WSConnectionFallback;
70-
wsPromise: ConnectAPIResponse | null;
75+
private wsPromiseSafe: SafePromise<ConnectedEvent | undefined> | null;
7176
consecutiveFailures: number;
7277
insightMetrics: InsightMetrics;
7378
defaultWSTimeoutWithFallback: number;
7479
defaultWSTimeout: number;
7580
resolveConnectionId?: Function;
7681
rejectConnectionId?: Function;
77-
connectionIdPromise?: Promise<string | undefined>;
82+
private connectionIdPromiseSafe?: SafePromise<string | undefined>;
7883
guestUserCreatePromise?: Promise<CreateGuestResponse>;
7984

8085
/**
@@ -155,7 +160,7 @@ export class StreamClient {
155160

156161
// WS connection is initialized when setUser is called
157162
this.wsConnection = null;
158-
this.wsPromise = null;
163+
this.wsPromiseSafe = null;
159164
this.setUserPromise = null;
160165

161166
// mapping between channel groups and configs
@@ -340,12 +345,13 @@ export class StreamClient {
340345
);
341346
}
342347

343-
if (this.wsConnection?.isConnecting && this.wsPromise) {
348+
const wsPromise = this.wsPromiseSafe?.();
349+
if (this.wsConnection?.isConnecting && wsPromise) {
344350
this.logger(
345351
'info',
346352
'client:openConnection() - connection already in progress',
347353
);
348-
return this.wsPromise;
354+
return await wsPromise;
349355
}
350356

351357
if (
@@ -357,14 +363,15 @@ export class StreamClient {
357363
'client:openConnection() - openConnection called twice, healthy connection already exists',
358364
);
359365

360-
return Promise.resolve();
366+
return;
361367
}
362368

363369
this._setupConnectionIdPromise();
364370

365371
this.clientID = `${this.userID}--${randomId()}`;
366-
this.wsPromise = this.connect();
367-
return this.wsPromise;
372+
const newWsPromise = this.connect();
373+
this.wsPromiseSafe = makeSafePromise(newWsPromise);
374+
return await newWsPromise;
368375
};
369376

370377
/**
@@ -388,7 +395,7 @@ export class StreamClient {
388395

389396
this.tokenManager.reset();
390397

391-
this.connectionIdPromise = undefined;
398+
this.connectionIdPromiseSafe = undefined;
392399
this.rejectConnectionId = undefined;
393400
this.resolveConnectionId = undefined;
394401
};
@@ -481,16 +488,28 @@ export class StreamClient {
481488
/**
482489
* sets up the this.connectionIdPromise
483490
*/
484-
_setupConnectionIdPromise = async () => {
491+
_setupConnectionIdPromise = () => {
485492
/** a promise that is resolved once connection id is set */
486-
this.connectionIdPromise = new Promise<string | undefined>(
487-
(resolve, reject) => {
493+
this.connectionIdPromiseSafe = makeSafePromise(
494+
new Promise<string | undefined>((resolve, reject) => {
488495
this.resolveConnectionId = resolve;
489496
this.rejectConnectionId = reject;
490-
},
497+
}),
491498
);
492499
};
493500

501+
get connectionIdPromise() {
502+
return this.connectionIdPromiseSafe?.();
503+
}
504+
505+
get isConnectionIsPromisePending() {
506+
return this.connectionIdPromiseSafe?.checkPending() ?? false;
507+
}
508+
509+
get wsPromise() {
510+
return this.wsPromiseSafe?.();
511+
}
512+
494513
_logApiRequest = (
495514
type: string,
496515
url: string,

packages/client/src/coordinator/connection/connection.ts

Lines changed: 14 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -8,20 +8,15 @@ import {
88
import {
99
addConnectionEventListeners,
1010
convertErrorToJson,
11-
isPromisePending,
1211
KnownCodes,
1312
randomId,
1413
removeConnectionEventListeners,
1514
retryInterval,
1615
sleep,
1716
} from './utils';
18-
import type {
19-
ConnectAPIResponse,
20-
LogLevel,
21-
StreamVideoEvent,
22-
UR,
23-
} from './types';
17+
import type { LogLevel, StreamVideoEvent, UR } from './types';
2418
import type { ConnectedEvent, WSAuthMessage } from '../../gen/coordinator';
19+
import { makeSafePromise, type SafePromise } from '../../helpers/promise';
2520

2621
// Type guards to check WebSocket error type
2722
const isCloseEvent = (
@@ -54,7 +49,7 @@ const isErrorEvent = (
5449
export class StableWSConnection {
5550
// local vars
5651
connectionID?: string;
57-
connectionOpen?: ConnectAPIResponse;
52+
private connectionOpenSafe?: SafePromise<ConnectedEvent>;
5853
authenticationSent: boolean;
5954
consecutiveFailures: number;
6055
pingInterval: number;
@@ -338,13 +333,7 @@ export class StableWSConnection {
338333
await this.client.tokenManager.loadToken();
339334
}
340335

341-
let mustSetupConnectionIdPromise = true;
342-
if (this.client.connectionIdPromise) {
343-
if (await isPromisePending(this.client.connectionIdPromise)) {
344-
mustSetupConnectionIdPromise = false;
345-
}
346-
}
347-
if (mustSetupConnectionIdPromise) {
336+
if (!this.client.isConnectionIsPromisePending) {
348337
this.client._setupConnectionIdPromise();
349338
}
350339
this._setupConnectionPromise();
@@ -747,12 +736,18 @@ export class StableWSConnection {
747736
_setupConnectionPromise = () => {
748737
this.isResolved = false;
749738
/** a promise that is resolved once ws.open is called */
750-
this.connectionOpen = new Promise<ConnectedEvent>((resolve, reject) => {
751-
this.resolvePromise = resolve;
752-
this.rejectPromise = reject;
753-
});
739+
this.connectionOpenSafe = makeSafePromise(
740+
new Promise<ConnectedEvent>((resolve, reject) => {
741+
this.resolvePromise = resolve;
742+
this.rejectPromise = reject;
743+
}),
744+
);
754745
};
755746

747+
get connectionOpen() {
748+
return this.connectionOpenSafe?.();
749+
}
750+
756751
/**
757752
* Schedules a next health check ping for websocket.
758753
*/
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
export interface SafePromise<T> {
2+
(): Promise<T>;
3+
checkPending(): boolean;
4+
}
5+
6+
type Fulfillment<T> =
7+
| {
8+
status: 'resolved';
9+
result: T;
10+
}
11+
| {
12+
status: 'rejected';
13+
error: unknown;
14+
};
15+
16+
/**
17+
* Saving a long-lived reference to a promise that can reject can be unsafe,
18+
* since rejecting the promise causes an unhandled rejection error (even if the
19+
* rejection is handled everywhere promise result is expected).
20+
*
21+
* To avoid that, we add both resolution and rejection handlers to the promise.
22+
* That way, the saved promise never rejects. A callback is provided as return
23+
* value to build a *new* promise, that resolves and rejects along with
24+
* the original promise.
25+
* @param promise Promise to wrap, which possibly rejects
26+
* @returns Callback to build a new promise, which resolves and rejects along
27+
* with the original promise
28+
*/
29+
export function makeSafePromise<T>(promise: Promise<T>): SafePromise<T> {
30+
let isPending = true;
31+
32+
const safePromise: Promise<Fulfillment<T>> = promise
33+
.then(
34+
(result) => ({ status: 'resolved' as const, result }),
35+
(error) => ({ status: 'rejected' as const, error }),
36+
)
37+
.finally(() => (isPending = false));
38+
39+
const unwrapPromise = () =>
40+
safePromise.then((fulfillment) => {
41+
if (fulfillment.status === 'rejected') throw fulfillment.error;
42+
return fulfillment.result;
43+
});
44+
45+
unwrapPromise.checkPending = () => isPending;
46+
return unwrapPromise;
47+
}

0 commit comments

Comments
 (0)