Skip to content

Commit 93a1a6a

Browse files
authored
Backport: Fix: never stop ring feedback on the sender side (#3503)
* make ring$ a behavior and add code comments to justify/explain the change. Signed-off-by: Timo K <[email protected]> * Add test: reproduce "ring does not stop" race. Signed-off-by: Timo K <[email protected]> --------- Signed-off-by: Timo K <[email protected]>
1 parent d7824ce commit 93a1a6a

File tree

2 files changed

+96
-51
lines changed

2 files changed

+96
-51
lines changed

src/state/CallViewModel.test.ts

Lines changed: 39 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -266,7 +266,7 @@ const mockLegacyRingEvent = {} as { event_id: string } & ICallNotifyContent;
266266
interface CallViewModelInputs {
267267
remoteParticipants$: Behavior<RemoteParticipant[]>;
268268
rtcMembers$: Behavior<Partial<CallMembership>[]>;
269-
connectionState$: Observable<ECConnectionState>;
269+
livekitConnectionState$: Observable<ECConnectionState>;
270270
speaking: Map<Participant, Observable<boolean>>;
271271
mediaDevices: MediaDevices;
272272
initialSyncState: SyncState;
@@ -276,7 +276,7 @@ function withCallViewModel(
276276
{
277277
remoteParticipants$ = constant([]),
278278
rtcMembers$ = constant([localRtcMember]),
279-
connectionState$ = of(ConnectionState.Connected),
279+
livekitConnectionState$: connectionState$ = of(ConnectionState.Connected),
280280
speaking = new Map(),
281281
mediaDevices = mockMediaDevices({}),
282282
initialSyncState = SyncState.Syncing,
@@ -384,7 +384,7 @@ test("participants are retained during a focus switch", () => {
384384
b: [],
385385
}),
386386
rtcMembers$: constant([localRtcMember, aliceRtcMember, bobRtcMember]),
387-
connectionState$: behavior(connectionInputMarbles, {
387+
livekitConnectionState$: behavior(connectionInputMarbles, {
388388
c: ConnectionState.Connected,
389389
s: ECAddonConnectionState.ECSwitchingFocus,
390390
}),
@@ -1251,6 +1251,41 @@ describe("waitForCallPickup$", () => {
12511251
});
12521252
});
12531253

1254+
test("regression test: does stop ringing in case livekitConnectionState$ emits after didSendCallNotification$ has already emitted", () => {
1255+
withTestScheduler(({ schedule, expectObservable, behavior }) => {
1256+
withCallViewModel(
1257+
{
1258+
livekitConnectionState$: behavior("d 9ms c", {
1259+
d: ConnectionState.Disconnected,
1260+
c: ConnectionState.Connected,
1261+
}),
1262+
},
1263+
(vm, rtcSession) => {
1264+
// Fire a call notification IMMEDIATELY (its important for this test, that this happens before the livekitConnectionState$ emits)
1265+
schedule("n", {
1266+
n: () => {
1267+
rtcSession.emit(
1268+
MatrixRTCSessionEvent.DidSendCallNotification,
1269+
mockRingEvent("$notif1", 30),
1270+
mockLegacyRingEvent,
1271+
);
1272+
},
1273+
});
1274+
1275+
expectObservable(vm.callPickupState$).toBe("a 9ms b 29ms c", {
1276+
a: "unknown",
1277+
b: "ringing",
1278+
c: "timeout",
1279+
});
1280+
},
1281+
{
1282+
waitForCallPickup: true,
1283+
encryptionSystem: { kind: E2eeType.PER_PARTICIPANT },
1284+
},
1285+
);
1286+
});
1287+
});
1288+
12541289
test("ringing -> success if someone joins before timeout", () => {
12551290
withTestScheduler(({ behavior, schedule, expectObservable }) => {
12561291
// Someone joins at 20ms (both LiveKit participant and MatrixRTC member)
@@ -1305,7 +1340,7 @@ describe("waitForCallPickup$", () => {
13051340
a: [localRtcMember],
13061341
b: [localRtcMember, aliceRtcMember],
13071342
}),
1308-
connectionState$,
1343+
livekitConnectionState$: connectionState$,
13091344
},
13101345
(vm, rtcSession) => {
13111346
// Notify at 5ms so we enter ringing, then get disconnected 5ms later

src/state/CallViewModel.ts

Lines changed: 57 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -880,60 +880,68 @@ export class CallViewModel extends ViewModel {
880880
? this.allOthersLeft$
881881
: NEVER;
882882

883+
private readonly didSendCallNotification$ = fromEvent(
884+
this.matrixRTCSession,
885+
MatrixRTCSessionEvent.DidSendCallNotification,
886+
) as Observable<
887+
Parameters<
888+
MatrixRTCSessionEventHandlerMap[MatrixRTCSessionEvent.DidSendCallNotification]
889+
>
890+
>;
883891
/**
884892
* Whenever the RTC session tells us that it intends to ring the remote
885893
* participant's devices, this emits an Observable tracking the current state of
886894
* that ringing process.
887895
*/
888-
private readonly ring$: Observable<
889-
Observable<"ringing" | "timeout" | "decline">
890-
> = (
891-
fromEvent(
892-
this.matrixRTCSession,
893-
MatrixRTCSessionEvent.DidSendCallNotification,
894-
) as Observable<
895-
Parameters<
896-
MatrixRTCSessionEventHandlerMap[MatrixRTCSessionEvent.DidSendCallNotification]
897-
>
898-
>
899-
).pipe(
900-
filter(
901-
([notificationEvent]) => notificationEvent.notification_type === "ring",
902-
),
903-
map(([notificationEvent]) => {
904-
const lifetimeMs = notificationEvent?.lifetime ?? 0;
905-
return concat(
906-
lifetimeMs === 0
907-
? // If no lifetime, skip the ring state
908-
EMPTY
909-
: // Ring until lifetime ms have passed
910-
timer(lifetimeMs).pipe(
911-
ignoreElements(),
912-
startWith("ringing" as const),
913-
),
914-
// The notification lifetime has timed out, meaning ringing has likely
915-
// stopped on all receiving clients.
916-
of("timeout" as const),
917-
NEVER,
918-
).pipe(
919-
takeUntil(
920-
(
921-
fromEvent(this.matrixRoom, RoomEvent.Timeline) as Observable<
922-
Parameters<EventTimelineSetHandlerMap[RoomEvent.Timeline]>
923-
>
924-
).pipe(
925-
filter(
926-
([event]) =>
927-
event.getType() === EventType.RTCDecline &&
928-
event.getRelation()?.rel_type === "m.reference" &&
929-
event.getRelation()?.event_id === notificationEvent.event_id &&
930-
event.getSender() !== this.userId,
896+
// This is a behavior since we need to store the latest state for when we subscribe to this after `didSendCallNotification$`
897+
// has already emitted but we still need the latest observable with a timeout timer that only gets created on after receiving `notificationEvent`.
898+
// A behavior will emit the latest observable with the running timer to new subscribers.
899+
// see also: callPickupState$ and in particular the line: `return this.ring$.pipe(mergeAll());` here we otherwise might get an EMPTY observable if
900+
// `ring$` would not be a behavior.
901+
private readonly ring$: Behavior<
902+
Observable<"ringing" | "timeout" | "decline"> | Observable<never>
903+
> = this.scope.behavior(
904+
this.didSendCallNotification$.pipe(
905+
filter(
906+
([notificationEvent]) => notificationEvent.notification_type === "ring",
907+
),
908+
map(([notificationEvent]) => {
909+
const lifetimeMs = notificationEvent?.lifetime ?? 0;
910+
return concat(
911+
lifetimeMs === 0
912+
? // If no lifetime, skip the ring state
913+
EMPTY
914+
: // Ring until lifetime ms have passed
915+
timer(lifetimeMs).pipe(
916+
ignoreElements(),
917+
startWith("ringing" as const),
918+
),
919+
// The notification lifetime has timed out, meaning ringing has likely
920+
// stopped on all receiving clients.
921+
of("timeout" as const),
922+
NEVER,
923+
).pipe(
924+
takeUntil(
925+
(
926+
fromEvent(this.matrixRoom, RoomEvent.Timeline) as Observable<
927+
Parameters<EventTimelineSetHandlerMap[RoomEvent.Timeline]>
928+
>
929+
).pipe(
930+
filter(
931+
([event]) =>
932+
event.getType() === EventType.RTCDecline &&
933+
event.getRelation()?.rel_type === "m.reference" &&
934+
event.getRelation()?.event_id ===
935+
notificationEvent.event_id &&
936+
event.getSender() !== this.userId,
937+
),
931938
),
932939
),
933-
),
934-
endWith("decline" as const),
935-
);
936-
}),
940+
endWith("decline" as const),
941+
);
942+
}),
943+
),
944+
EMPTY,
937945
);
938946

939947
/**
@@ -972,6 +980,8 @@ export class CallViewModel extends ViewModel {
972980
return of("success" as const);
973981
}
974982
// Show the ringing state of the most recent ringing attempt.
983+
// ring$ is a behavior so it will emit the latest observable which very well might already have a running timer.
984+
// this is important in case livekitConnectionState$ after didSendCallNotification$ has already emitted.
975985
return this.ring$.pipe(switchAll());
976986
}),
977987
// The state starts as 'unknown' because we don't know if the RTC

0 commit comments

Comments
 (0)