Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 14 additions & 19 deletions packages/client/src/StreamVideoClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import {
import { getLogger, logToConsole, setLogger } from './logger';
import { getSdkInfo } from './client-details';
import { SdkType } from './gen/video/sfu/models/models';
import { withoutConcurrency } from './helpers/concurrency';

/**
* A `StreamVideoClient` instance lets you communicate with our API, and authenticate users.
Expand All @@ -51,8 +52,9 @@ export class StreamVideoClient {
streamClient: StreamClient;

protected eventHandlersToUnregister: Array<() => void> = [];
protected connectionPromise: Promise<void | ConnectedEvent> | undefined;
protected disconnectionPromise: Promise<void> | undefined;
private readonly connectionConcurrencyTag = Symbol(
'connectionConcurrencyTag',
);

private static _instanceMap: Map<string, StreamVideoClient> = new Map();

Expand Down Expand Up @@ -209,12 +211,11 @@ export class StreamVideoClient {
return this.streamClient.connectGuestUser(user);
};
}
this.connectionPromise = this.disconnectionPromise
? this.disconnectionPromise.then(() => connectUser())
: connectUser();

this.connectionPromise?.finally(() => (this.connectionPromise = undefined));
const connectUserResponse = await this.connectionPromise;
const connectUserResponse = await withoutConcurrency(
this.connectionConcurrencyTag,
() => connectUser(),
);
// connectUserResponse will be void if connectUser called twice for the same user
if (connectUserResponse?.me) {
this.writeableStateStore.setConnectedUser(connectUserResponse.me);
Expand Down Expand Up @@ -316,19 +317,15 @@ export class StreamVideoClient {
* https://developer.mozilla.org/en-US/docs/Web/API/CloseEvent
*/
disconnectUser = async (timeout?: number) => {
if (!this.streamClient.user && !this.connectionPromise) {
if (!this.streamClient.user) {
return;
}
const userId = this.streamClient.user?.id;
const apiKey = this.streamClient.key;
const disconnectUser = () => this.streamClient.disconnectUser(timeout);
this.disconnectionPromise = this.connectionPromise
? this.connectionPromise.then(() => disconnectUser())
: disconnectUser();
this.disconnectionPromise.finally(
() => (this.disconnectionPromise = undefined),
await withoutConcurrency(this.connectionConcurrencyTag, () =>
disconnectUser(),
Comment on lines -325 to +327
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We had a lot of this disconnectionPromise/connectionPromise chaining to make sure the two don't run at the same time. Replacing with withoutConcurrency for simplicity. Also allows us to get rid of two stored promises.

);
await this.disconnectionPromise;
if (userId) {
StreamVideoClient._instanceMap.delete(apiKey + userId);
}
Expand Down Expand Up @@ -556,10 +553,8 @@ export class StreamVideoClient {
) => {
const connectAnonymousUser = () =>
this.streamClient.connectAnonymousUser(user, tokenOrProvider);
this.connectionPromise = this.disconnectionPromise
? this.disconnectionPromise.then(() => connectAnonymousUser())
: connectAnonymousUser();
this.connectionPromise.finally(() => (this.connectionPromise = undefined));
return this.connectionPromise;
return await withoutConcurrency(this.connectionConcurrencyTag, () =>
connectAnonymousUser(),
);
};
}
4 changes: 2 additions & 2 deletions packages/client/src/__tests__/Call.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -271,8 +271,8 @@ describe('muting logic', () => {
.mockImplementation(() => Promise.resolve({ duration: '0ms' }));
});

it('should mute self', () => {
call.muteSelf('audio');
it('should mute self', async () => {
await call.muteSelf('audio');

expect(spy).toHaveBeenCalledWith(userId, 'audio');
});
Expand Down
47 changes: 33 additions & 14 deletions packages/client/src/coordinator/connection/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,12 @@ import {
} from './types';
import { InsightMetrics, postInsights } from './insights';
import { getLocationHint } from './location';
import { CreateGuestRequest, CreateGuestResponse } from '../../gen/coordinator';
import {
ConnectedEvent,
CreateGuestRequest,
CreateGuestResponse,
} from '../../gen/coordinator';
import { makeSafePromise, type SafePromise } from '../../helpers/promise';

export class StreamClient {
_user?: UserWithId;
Expand Down Expand Up @@ -67,14 +72,14 @@ export class StreamClient {
wsBaseURL?: string;
wsConnection: StableWSConnection | null;
wsFallback?: WSConnectionFallback;
wsPromise: ConnectAPIResponse | null;
private wsPromiseSafe: SafePromise<ConnectedEvent | undefined> | null;
consecutiveFailures: number;
insightMetrics: InsightMetrics;
defaultWSTimeoutWithFallback: number;
defaultWSTimeout: number;
resolveConnectionId?: Function;
rejectConnectionId?: Function;
connectionIdPromise?: Promise<string | undefined>;
private connectionIdPromiseSafe?: SafePromise<string | undefined>;
guestUserCreatePromise?: Promise<CreateGuestResponse>;

/**
Expand Down Expand Up @@ -155,7 +160,7 @@ export class StreamClient {

// WS connection is initialized when setUser is called
this.wsConnection = null;
this.wsPromise = null;
this.wsPromiseSafe = null;
this.setUserPromise = null;

// mapping between channel groups and configs
Expand Down Expand Up @@ -340,12 +345,13 @@ export class StreamClient {
);
}

if (this.wsConnection?.isConnecting && this.wsPromise) {
const wsPromise = this.wsPromiseSafe?.();
if (this.wsConnection?.isConnecting && wsPromise) {
this.logger(
'info',
'client:openConnection() - connection already in progress',
);
return this.wsPromise;
return await wsPromise;
}

if (
Expand All @@ -357,14 +363,15 @@ export class StreamClient {
'client:openConnection() - openConnection called twice, healthy connection already exists',
);

return Promise.resolve();
return;
}

this._setupConnectionIdPromise();

this.clientID = `${this.userID}--${randomId()}`;
this.wsPromise = this.connect();
return this.wsPromise;
const newWsPromise = this.connect();
this.wsPromiseSafe = makeSafePromise(newWsPromise);
return await newWsPromise;
};

/**
Expand All @@ -388,7 +395,7 @@ export class StreamClient {

this.tokenManager.reset();

this.connectionIdPromise = undefined;
this.connectionIdPromiseSafe = undefined;
this.rejectConnectionId = undefined;
this.resolveConnectionId = undefined;
};
Expand Down Expand Up @@ -481,16 +488,28 @@ export class StreamClient {
/**
* sets up the this.connectionIdPromise
*/
_setupConnectionIdPromise = async () => {
_setupConnectionIdPromise = () => {
/** a promise that is resolved once connection id is set */
this.connectionIdPromise = new Promise<string | undefined>(
(resolve, reject) => {
this.connectionIdPromiseSafe = makeSafePromise(
new Promise<string | undefined>((resolve, reject) => {
this.resolveConnectionId = resolve;
this.rejectConnectionId = reject;
},
}),
);
};

get connectionIdPromise() {
return this.connectionIdPromiseSafe?.();
}

get isConnectionIsPromisePending() {
return this.connectionIdPromiseSafe?.checkPending() ?? false;
}

get wsPromise() {
return this.wsPromiseSafe?.();
}

_logApiRequest = (
type: string,
url: string,
Expand Down
33 changes: 14 additions & 19 deletions packages/client/src/coordinator/connection/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,15 @@ import {
import {
addConnectionEventListeners,
convertErrorToJson,
isPromisePending,
KnownCodes,
randomId,
removeConnectionEventListeners,
retryInterval,
sleep,
} from './utils';
import type {
ConnectAPIResponse,
LogLevel,
StreamVideoEvent,
UR,
} from './types';
import type { LogLevel, StreamVideoEvent, UR } from './types';
import type { ConnectedEvent, WSAuthMessage } from '../../gen/coordinator';
import { makeSafePromise, type SafePromise } from '../../helpers/promise';

// Type guards to check WebSocket error type
const isCloseEvent = (
Expand Down Expand Up @@ -54,7 +49,7 @@ const isErrorEvent = (
export class StableWSConnection {
// local vars
connectionID?: string;
connectionOpen?: ConnectAPIResponse;
private connectionOpenSafe?: SafePromise<ConnectedEvent>;
authenticationSent: boolean;
consecutiveFailures: number;
pingInterval: number;
Expand Down Expand Up @@ -338,13 +333,7 @@ export class StableWSConnection {
await this.client.tokenManager.loadToken();
}

let mustSetupConnectionIdPromise = true;
if (this.client.connectionIdPromise) {
if (await isPromisePending(this.client.connectionIdPromise)) {
mustSetupConnectionIdPromise = false;
}
}
if (mustSetupConnectionIdPromise) {
if (!this.client.isConnectionIsPromisePending) {
this.client._setupConnectionIdPromise();
}
this._setupConnectionPromise();
Expand Down Expand Up @@ -747,12 +736,18 @@ export class StableWSConnection {
_setupConnectionPromise = () => {
this.isResolved = false;
/** a promise that is resolved once ws.open is called */
this.connectionOpen = new Promise<ConnectedEvent>((resolve, reject) => {
this.resolvePromise = resolve;
this.rejectPromise = reject;
});
this.connectionOpenSafe = makeSafePromise(
new Promise<ConnectedEvent>((resolve, reject) => {
this.resolvePromise = resolve;
this.rejectPromise = reject;
}),
);
};

get connectionOpen() {
return this.connectionOpenSafe?.();
}

/**
* Schedules a next health check ping for websocket.
*/
Expand Down
47 changes: 47 additions & 0 deletions packages/client/src/helpers/promise.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
export interface SafePromise<T> {
(): Promise<T>;
checkPending(): boolean;
}

type Fulfillment<T> =
| {
status: 'resolved';
result: T;
}
| {
status: 'rejected';
error: unknown;
};

/**
* Saving a long-lived reference to a promise that can reject can be unsafe,
* since rejecting the promise causes an unhandled rejection error (even if the
* rejection is handled everywhere promise result is expected).
*
* To avoid that, we add both resolution and rejection handlers to the promise.
* That way, the saved promise never rejects. A callback is provided as return
* value to build a *new* promise, that resolves and rejects along with
* the original promise.
* @param promise Promise to wrap, which possibly rejects
* @returns Callback to build a new promise, which resolves and rejects along
* with the original promise
*/
export function makeSafePromise<T>(promise: Promise<T>): SafePromise<T> {
let isPending = true;

const safePromise: Promise<Fulfillment<T>> = promise
.then(
(result) => ({ status: 'resolved' as const, result }),
(error) => ({ status: 'rejected' as const, error }),
)
.finally(() => (isPending = false));

const unwrapPromise = () =>
safePromise.then((fulfillment) => {
if (fulfillment.status === 'rejected') throw fulfillment.error;
return fulfillment.result;
});

unwrapPromise.checkPending = () => isPending;
return unwrapPromise;
}
Loading