Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/state/CallViewModel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -400,10 +400,10 @@ export class CallViewModel extends ViewModel {
c?.state === "ready"
? // TODO mapping to ConnectionState for compatibility, but we should use the full state?
c.value.transportState$.pipe(
switchMap((s) => {
map((s) => {
if (s.state === "ConnectedToLkRoom")
return s.connectionState$;
return of(ConnectionState.Disconnected);
return s.livekitState;
return ConnectionState.Disconnected;
}),
)
: of(ConnectionState.Disconnected),
Expand Down
53 changes: 53 additions & 0 deletions src/state/Connection.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,59 @@ describe("Start connection states", () => {
expect(connectedState?.state).toEqual("ConnectedToLkRoom");
});

it("should relay livekit events once connected", async () => {
setupTest();

const connection = setupRemoteConnection();

await connection.start();

let capturedStates: TransportState[] = [];
const s = connection.transportState$.subscribe((value) => {
capturedStates.push(value);
});
onTestFinished(() => s.unsubscribe());

const states = [
ConnectionState.Disconnected,
ConnectionState.Connecting,
ConnectionState.Connected,
ConnectionState.SignalReconnecting,
ConnectionState.Connecting,
ConnectionState.Connected,
ConnectionState.Reconnecting,
];
for (const state of states) {
fakeRoomEventEmiter.emit(RoomEvent.ConnectionStateChanged, state);
}

for (const state of states) {
const s = capturedStates.shift();
expect(s?.state).toEqual("ConnectedToLkRoom");
const transportState = s as TransportState & {
state: "ConnectedToLkRoom";
};
expect(transportState.livekitState).toEqual(state);

// should always have the focus info
expect(transportState.transport.livekit_alias).toEqual(
livekitFocus.livekit_alias,
);
expect(transportState.transport.livekit_service_url).toEqual(
livekitFocus.livekit_service_url,
);
}

// If the state is not ConnectedToLkRoom, no events should be relayed anymore
await connection.stop();
capturedStates = [];
for (const state of states) {
fakeRoomEventEmiter.emit(RoomEvent.ConnectionStateChanged, state);
}

expect(capturedStates.length).toEqual(0);
});

it("shutting down the scope should stop the connection", async () => {
setupTest();
vi.useFakeTimers();
Expand Down
20 changes: 17 additions & 3 deletions src/state/Connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import {
type CallMembership,
type LivekitTransport,
} from "matrix-js-sdk/lib/matrixrtc";
import { BehaviorSubject, combineLatest, type Observable } from "rxjs";
import { BehaviorSubject, combineLatest } from "rxjs";

import {
getSFUConfigWithOpenID,
Expand Down Expand Up @@ -60,7 +60,7 @@ export type TransportState =
| { state: "FailedToStart"; error: Error; transport: LivekitTransport }
| {
state: "ConnectedToLkRoom";
connectionState$: Observable<ConnectionState>;
livekitState: ConnectionState;
transport: LivekitTransport;
}
| { state: "Stopped"; transport: LivekitTransport };
Expand Down Expand Up @@ -159,7 +159,7 @@ export class Connection {
this._transportState$.next({
state: "ConnectedToLkRoom",
transport: this.transport,
connectionState$: connectionStateObserver(this.livekitRoom),
livekitState: this.livekitRoom.state,
});
} catch (error) {
this._transportState$.next({
Expand Down Expand Up @@ -250,6 +250,20 @@ export class Connection {
[],
);

scope
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we could have a switch map from a new startLifecycle observable if you prefer

.behavior<ConnectionState>(connectionStateObserver(livekitRoom))
.subscribe((livekitState) => {
Comment on lines +254 to +255
Copy link
Member

@robintown robintown Oct 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This subscribe call would need a .pipe(this.scope.bind()) to avoid risk of leaking, for the record

const current = this._transportState$.value;
// Only update the state if we are already connected to the LiveKit room.
if (current.state === "ConnectedToLkRoom") {
this._transportState$.next({
state: "ConnectedToLkRoom",
livekitState,
transport: current.transport,
});
}
});

scope.onEnd(() => void this.stop());
}
}
Expand Down
Loading