diff --git a/packages/client/src/StreamVideoClient.ts b/packages/client/src/StreamVideoClient.ts index 01285741c0..5e649c6fe1 100644 --- a/packages/client/src/StreamVideoClient.ts +++ b/packages/client/src/StreamVideoClient.ts @@ -30,6 +30,7 @@ import { import { getLogger, logToConsole, setLogger } from './logger'; import { getSdkInfo } from './client-details'; import { SdkType } from './gen/video/sfu/models/models'; +import { withoutConcurrency } from './helpers/concurrency'; /** * A `StreamVideoClient` instance lets you communicate with our API, and authenticate users. @@ -51,8 +52,9 @@ export class StreamVideoClient { streamClient: StreamClient; protected eventHandlersToUnregister: Array<() => void> = []; - protected connectionPromise: Promise | undefined; - protected disconnectionPromise: Promise | undefined; + private readonly connectionConcurrencyTag = Symbol( + 'connectionConcurrencyTag', + ); private static _instanceMap: Map = new Map(); @@ -209,12 +211,11 @@ export class StreamVideoClient { return this.streamClient.connectGuestUser(user); }; } - this.connectionPromise = this.disconnectionPromise - ? this.disconnectionPromise.then(() => connectUser()) - : connectUser(); - this.connectionPromise?.finally(() => (this.connectionPromise = undefined)); - const connectUserResponse = await this.connectionPromise; + const connectUserResponse = await withoutConcurrency( + this.connectionConcurrencyTag, + () => connectUser(), + ); // connectUserResponse will be void if connectUser called twice for the same user if (connectUserResponse?.me) { this.writeableStateStore.setConnectedUser(connectUserResponse.me); @@ -316,19 +317,15 @@ export class StreamVideoClient { * https://developer.mozilla.org/en-US/docs/Web/API/CloseEvent */ disconnectUser = async (timeout?: number) => { - if (!this.streamClient.user && !this.connectionPromise) { + if (!this.streamClient.user) { return; } const userId = this.streamClient.user?.id; const apiKey = this.streamClient.key; const disconnectUser = () => this.streamClient.disconnectUser(timeout); - this.disconnectionPromise = this.connectionPromise - ? this.connectionPromise.then(() => disconnectUser()) - : disconnectUser(); - this.disconnectionPromise.finally( - () => (this.disconnectionPromise = undefined), + await withoutConcurrency(this.connectionConcurrencyTag, () => + disconnectUser(), ); - await this.disconnectionPromise; if (userId) { StreamVideoClient._instanceMap.delete(apiKey + userId); } @@ -556,10 +553,8 @@ export class StreamVideoClient { ) => { const connectAnonymousUser = () => this.streamClient.connectAnonymousUser(user, tokenOrProvider); - this.connectionPromise = this.disconnectionPromise - ? this.disconnectionPromise.then(() => connectAnonymousUser()) - : connectAnonymousUser(); - this.connectionPromise.finally(() => (this.connectionPromise = undefined)); - return this.connectionPromise; + return await withoutConcurrency(this.connectionConcurrencyTag, () => + connectAnonymousUser(), + ); }; } diff --git a/packages/client/src/__tests__/Call.test.ts b/packages/client/src/__tests__/Call.test.ts index f6f77c4c58..fcd06fb10a 100644 --- a/packages/client/src/__tests__/Call.test.ts +++ b/packages/client/src/__tests__/Call.test.ts @@ -271,8 +271,8 @@ describe('muting logic', () => { .mockImplementation(() => Promise.resolve({ duration: '0ms' })); }); - it('should mute self', () => { - call.muteSelf('audio'); + it('should mute self', async () => { + await call.muteSelf('audio'); expect(spy).toHaveBeenCalledWith(userId, 'audio'); }); diff --git a/packages/client/src/coordinator/connection/client.ts b/packages/client/src/coordinator/connection/client.ts index 80e909395d..106c876560 100644 --- a/packages/client/src/coordinator/connection/client.ts +++ b/packages/client/src/coordinator/connection/client.ts @@ -38,7 +38,12 @@ import { } from './types'; import { InsightMetrics, postInsights } from './insights'; import { getLocationHint } from './location'; -import { CreateGuestRequest, CreateGuestResponse } from '../../gen/coordinator'; +import { + ConnectedEvent, + CreateGuestRequest, + CreateGuestResponse, +} from '../../gen/coordinator'; +import { makeSafePromise, type SafePromise } from '../../helpers/promise'; export class StreamClient { _user?: UserWithId; @@ -67,14 +72,14 @@ export class StreamClient { wsBaseURL?: string; wsConnection: StableWSConnection | null; wsFallback?: WSConnectionFallback; - wsPromise: ConnectAPIResponse | null; + private wsPromiseSafe: SafePromise | null; consecutiveFailures: number; insightMetrics: InsightMetrics; defaultWSTimeoutWithFallback: number; defaultWSTimeout: number; resolveConnectionId?: Function; rejectConnectionId?: Function; - connectionIdPromise?: Promise; + private connectionIdPromiseSafe?: SafePromise; guestUserCreatePromise?: Promise; /** @@ -155,7 +160,7 @@ export class StreamClient { // WS connection is initialized when setUser is called this.wsConnection = null; - this.wsPromise = null; + this.wsPromiseSafe = null; this.setUserPromise = null; // mapping between channel groups and configs @@ -340,12 +345,13 @@ export class StreamClient { ); } - if (this.wsConnection?.isConnecting && this.wsPromise) { + const wsPromise = this.wsPromiseSafe?.(); + if (this.wsConnection?.isConnecting && wsPromise) { this.logger( 'info', 'client:openConnection() - connection already in progress', ); - return this.wsPromise; + return await wsPromise; } if ( @@ -357,14 +363,15 @@ export class StreamClient { 'client:openConnection() - openConnection called twice, healthy connection already exists', ); - return Promise.resolve(); + return; } this._setupConnectionIdPromise(); this.clientID = `${this.userID}--${randomId()}`; - this.wsPromise = this.connect(); - return this.wsPromise; + const newWsPromise = this.connect(); + this.wsPromiseSafe = makeSafePromise(newWsPromise); + return await newWsPromise; }; /** @@ -388,7 +395,7 @@ export class StreamClient { this.tokenManager.reset(); - this.connectionIdPromise = undefined; + this.connectionIdPromiseSafe = undefined; this.rejectConnectionId = undefined; this.resolveConnectionId = undefined; }; @@ -481,16 +488,28 @@ export class StreamClient { /** * sets up the this.connectionIdPromise */ - _setupConnectionIdPromise = async () => { + _setupConnectionIdPromise = () => { /** a promise that is resolved once connection id is set */ - this.connectionIdPromise = new Promise( - (resolve, reject) => { + this.connectionIdPromiseSafe = makeSafePromise( + new Promise((resolve, reject) => { this.resolveConnectionId = resolve; this.rejectConnectionId = reject; - }, + }), ); }; + get connectionIdPromise() { + return this.connectionIdPromiseSafe?.(); + } + + get isConnectionIsPromisePending() { + return this.connectionIdPromiseSafe?.checkPending() ?? false; + } + + get wsPromise() { + return this.wsPromiseSafe?.(); + } + _logApiRequest = ( type: string, url: string, diff --git a/packages/client/src/coordinator/connection/connection.ts b/packages/client/src/coordinator/connection/connection.ts index 9a2f04bdbb..12297965d0 100644 --- a/packages/client/src/coordinator/connection/connection.ts +++ b/packages/client/src/coordinator/connection/connection.ts @@ -8,20 +8,15 @@ import { import { addConnectionEventListeners, convertErrorToJson, - isPromisePending, KnownCodes, randomId, removeConnectionEventListeners, retryInterval, sleep, } from './utils'; -import type { - ConnectAPIResponse, - LogLevel, - StreamVideoEvent, - UR, -} from './types'; +import type { LogLevel, StreamVideoEvent, UR } from './types'; import type { ConnectedEvent, WSAuthMessage } from '../../gen/coordinator'; +import { makeSafePromise, type SafePromise } from '../../helpers/promise'; // Type guards to check WebSocket error type const isCloseEvent = ( @@ -54,7 +49,7 @@ const isErrorEvent = ( export class StableWSConnection { // local vars connectionID?: string; - connectionOpen?: ConnectAPIResponse; + private connectionOpenSafe?: SafePromise; authenticationSent: boolean; consecutiveFailures: number; pingInterval: number; @@ -338,13 +333,7 @@ export class StableWSConnection { await this.client.tokenManager.loadToken(); } - let mustSetupConnectionIdPromise = true; - if (this.client.connectionIdPromise) { - if (await isPromisePending(this.client.connectionIdPromise)) { - mustSetupConnectionIdPromise = false; - } - } - if (mustSetupConnectionIdPromise) { + if (!this.client.isConnectionIsPromisePending) { this.client._setupConnectionIdPromise(); } this._setupConnectionPromise(); @@ -747,12 +736,18 @@ export class StableWSConnection { _setupConnectionPromise = () => { this.isResolved = false; /** a promise that is resolved once ws.open is called */ - this.connectionOpen = new Promise((resolve, reject) => { - this.resolvePromise = resolve; - this.rejectPromise = reject; - }); + this.connectionOpenSafe = makeSafePromise( + new Promise((resolve, reject) => { + this.resolvePromise = resolve; + this.rejectPromise = reject; + }), + ); }; + get connectionOpen() { + return this.connectionOpenSafe?.(); + } + /** * Schedules a next health check ping for websocket. */ diff --git a/packages/client/src/helpers/promise.ts b/packages/client/src/helpers/promise.ts new file mode 100644 index 0000000000..482bc7c07a --- /dev/null +++ b/packages/client/src/helpers/promise.ts @@ -0,0 +1,47 @@ +export interface SafePromise { + (): Promise; + checkPending(): boolean; +} + +type Fulfillment = + | { + status: 'resolved'; + result: T; + } + | { + status: 'rejected'; + error: unknown; + }; + +/** + * Saving a long-lived reference to a promise that can reject can be unsafe, + * since rejecting the promise causes an unhandled rejection error (even if the + * rejection is handled everywhere promise result is expected). + * + * To avoid that, we add both resolution and rejection handlers to the promise. + * That way, the saved promise never rejects. A callback is provided as return + * value to build a *new* promise, that resolves and rejects along with + * the original promise. + * @param promise Promise to wrap, which possibly rejects + * @returns Callback to build a new promise, which resolves and rejects along + * with the original promise + */ +export function makeSafePromise(promise: Promise): SafePromise { + let isPending = true; + + const safePromise: Promise> = promise + .then( + (result) => ({ status: 'resolved' as const, result }), + (error) => ({ status: 'rejected' as const, error }), + ) + .finally(() => (isPending = false)); + + const unwrapPromise = () => + safePromise.then((fulfillment) => { + if (fulfillment.status === 'rejected') throw fulfillment.error; + return fulfillment.result; + }); + + unwrapPromise.checkPending = () => isPending; + return unwrapPromise; +}