diff --git a/src/state/CallViewModel.ts b/src/state/CallViewModel.ts index 6b046b28b..009d042c1 100644 --- a/src/state/CallViewModel.ts +++ b/src/state/CallViewModel.ts @@ -400,10 +400,10 @@ export class CallViewModel extends ViewModel { c?.state === "ready" ? // TODO mapping to ConnectionState for compatibility, but we should use the full state? c.value.transportState$.pipe( - switchMap((s) => { + map((s) => { if (s.state === "ConnectedToLkRoom") - return s.connectionState$; - return of(ConnectionState.Disconnected); + return s.livekitState; + return ConnectionState.Disconnected; }), ) : of(ConnectionState.Disconnected), diff --git a/src/state/Connection.test.ts b/src/state/Connection.test.ts index b5389db41..73a7d7714 100644 --- a/src/state/Connection.test.ts +++ b/src/state/Connection.test.ts @@ -364,6 +364,59 @@ describe("Start connection states", () => { expect(connectedState?.state).toEqual("ConnectedToLkRoom"); }); + it("should relay livekit events once connected", async () => { + setupTest(); + + const connection = setupRemoteConnection(); + + await connection.start(); + + let capturedStates: TransportState[] = []; + const s = connection.transportState$.subscribe((value) => { + capturedStates.push(value); + }); + onTestFinished(() => s.unsubscribe()); + + const states = [ + ConnectionState.Disconnected, + ConnectionState.Connecting, + ConnectionState.Connected, + ConnectionState.SignalReconnecting, + ConnectionState.Connecting, + ConnectionState.Connected, + ConnectionState.Reconnecting, + ]; + for (const state of states) { + fakeRoomEventEmiter.emit(RoomEvent.ConnectionStateChanged, state); + } + + for (const state of states) { + const s = capturedStates.shift(); + expect(s?.state).toEqual("ConnectedToLkRoom"); + const transportState = s as TransportState & { + state: "ConnectedToLkRoom"; + }; + expect(transportState.livekitState).toEqual(state); + + // should always have the focus info + expect(transportState.transport.livekit_alias).toEqual( + livekitFocus.livekit_alias, + ); + expect(transportState.transport.livekit_service_url).toEqual( + livekitFocus.livekit_service_url, + ); + } + + // If the state is not ConnectedToLkRoom, no events should be relayed anymore + await connection.stop(); + capturedStates = []; + for (const state of states) { + fakeRoomEventEmiter.emit(RoomEvent.ConnectionStateChanged, state); + } + + expect(capturedStates.length).toEqual(0); + }); + it("shutting down the scope should stop the connection", async () => { setupTest(); vi.useFakeTimers(); diff --git a/src/state/Connection.ts b/src/state/Connection.ts index 7b044e1d5..0ca109af3 100644 --- a/src/state/Connection.ts +++ b/src/state/Connection.ts @@ -21,7 +21,7 @@ import { type CallMembership, type LivekitTransport, } from "matrix-js-sdk/lib/matrixrtc"; -import { BehaviorSubject, combineLatest, type Observable } from "rxjs"; +import { BehaviorSubject, combineLatest } from "rxjs"; import { getSFUConfigWithOpenID, @@ -60,7 +60,7 @@ export type TransportState = | { state: "FailedToStart"; error: Error; transport: LivekitTransport } | { state: "ConnectedToLkRoom"; - connectionState$: Observable; + livekitState: ConnectionState; transport: LivekitTransport; } | { state: "Stopped"; transport: LivekitTransport }; @@ -159,7 +159,7 @@ export class Connection { this._transportState$.next({ state: "ConnectedToLkRoom", transport: this.transport, - connectionState$: connectionStateObserver(this.livekitRoom), + livekitState: this.livekitRoom.state, }); } catch (error) { this._transportState$.next({ @@ -250,6 +250,20 @@ export class Connection { [], ); + scope + .behavior(connectionStateObserver(livekitRoom)) + .subscribe((livekitState) => { + const current = this._transportState$.value; + // Only update the state if we are already connected to the LiveKit room. + if (current.state === "ConnectedToLkRoom") { + this._transportState$.next({ + state: "ConnectedToLkRoom", + livekitState, + transport: current.transport, + }); + } + }); + scope.onEnd(() => void this.stop()); } }