Skip to content

Commit 8a528b9

Browse files
feat: implement retry encryption key request mechanism for livekit calls
1 parent 9c6cbbd commit 8a528b9

File tree

2 files changed

+271
-46
lines changed

2 files changed

+271
-46
lines changed

lib/src/voip/backend/livekit_backend.dart

Lines changed: 78 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,12 @@ class LiveKitBackend extends CallBackend {
2424
/// participant:keyIndex:keyBin
2525
final Map<CallParticipant, Map<int, Uint8List>> _encryptionKeysMap = {};
2626

27+
/// Tracks pending encryption key request retries per participant
28+
final Map<CallParticipant, Timer> _requestEncryptionKeyPending = {};
29+
30+
/// Retry interval for key requests
31+
static const Duration _keyRequestRetryInterval = Duration(seconds: 2);
32+
2733
final List<Future> _setNewKeyTimeouts = [];
2834

2935
int _indexCounter = 0;
@@ -79,10 +85,7 @@ class LiveKitBackend extends CallBackend {
7985
'_makeNewSenderKey using previous key because last created at ${_lastNewKeyTime.toString()}',
8086
);
8187
// still a fairly new key, just send that
82-
await _sendEncryptionKeysEvent(
83-
groupCall,
84-
_latestLocalKeyIndex,
85-
);
88+
await _sendEncryptionKeysEvent(groupCall, _latestLocalKeyIndex);
8689
return;
8790
}
8891

@@ -222,20 +225,22 @@ class LiveKitBackend extends CallBackend {
222225
);
223226
// now wait for the key to propogate and then set it, hopefully users can
224227
// stil decrypt everything
225-
final useKeyTimeout =
226-
Future.delayed(groupCall.voip.timeouts!.useKeyDelay, () async {
227-
Logs().i(
228-
'[VOIP E2EE] delayed setting key changed event for ${participant.id} idx $encryptionKeyIndex key $encryptionKeyBin',
229-
);
230-
await groupCall.voip.delegate.keyProvider?.onSetEncryptionKey(
231-
participant,
232-
encryptionKeyBin,
233-
encryptionKeyIndex,
234-
);
235-
if (participant.isLocal) {
236-
_currentLocalKeyIndex = encryptionKeyIndex;
237-
}
238-
});
228+
final useKeyTimeout = Future.delayed(
229+
groupCall.voip.timeouts!.useKeyDelay,
230+
() async {
231+
Logs().i(
232+
'[VOIP E2EE] delayed setting key changed event for ${participant.id} idx $encryptionKeyIndex key $encryptionKeyBin',
233+
);
234+
await groupCall.voip.delegate.keyProvider?.onSetEncryptionKey(
235+
participant,
236+
encryptionKeyBin,
237+
encryptionKeyIndex,
238+
);
239+
if (participant.isLocal) {
240+
_currentLocalKeyIndex = encryptionKeyIndex;
241+
}
242+
},
243+
);
239244
_setNewKeyTimeouts.add(useKeyTimeout);
240245
} else {
241246
Logs().i(
@@ -271,17 +276,15 @@ class LiveKitBackend extends CallBackend {
271276
'[VOIP E2EE] _sendEncryptionKeysEvent Tried to send encryption keys event but no keys found!',
272277
);
273278
await _makeNewSenderKey(groupCall, false);
274-
await _sendEncryptionKeysEvent(
275-
groupCall,
276-
keyIndex,
277-
sendTo: sendTo,
278-
);
279+
await _sendEncryptionKeysEvent(groupCall, keyIndex, sendTo: sendTo);
279280
return;
280281
}
281282

282283
try {
283284
final keyContent = EncryptionKeysEventContent(
284-
[EncryptionKeyEntry(keyIndex, base64Encode(myLatestKey))],
285+
[
286+
EncryptionKeyEntry(keyIndex, base64Encode(myLatestKey)),
287+
],
285288
groupCall.groupCallId,
286289
);
287290
final Map<String, Object> data = {
@@ -300,11 +303,7 @@ class LiveKitBackend extends CallBackend {
300303
);
301304
} catch (e, s) {
302305
Logs().e('[VOIP E2EE] Failed to send e2ee keys, retrying', e, s);
303-
await _sendEncryptionKeysEvent(
304-
groupCall,
305-
keyIndex,
306-
sendTo: sendTo,
307-
);
306+
await _sendEncryptionKeysEvent(groupCall, keyIndex, sendTo: sendTo);
308307
}
309308
}
310309

@@ -375,6 +374,10 @@ class LiveKitBackend extends CallBackend {
375374
GroupCallSession groupCall,
376375
List<CallParticipant> remoteParticipants,
377376
) async {
377+
Logs().v(
378+
'[VOIP E2EE] requesting stream encryption keys from ${remoteParticipants.map((e) => e.id)}',
379+
);
380+
378381
final Map<String, Object> data = {
379382
'conf_id': groupCall.groupCallId,
380383
'device_id': groupCall.client.deviceID!,
@@ -387,6 +390,29 @@ class LiveKitBackend extends CallBackend {
387390
data,
388391
EventTypes.GroupCallMemberEncryptionKeysRequest,
389392
);
393+
394+
// Set up retry timers for each participant
395+
for (final rp in remoteParticipants) {
396+
// Skip if a retry is already pending for this participant
397+
if (_requestEncryptionKeyPending.containsKey(rp)) continue;
398+
399+
var retryCount = 0;
400+
_requestEncryptionKeyPending[rp] = Timer.periodic(
401+
_keyRequestRetryInterval,
402+
(timer) {
403+
retryCount++;
404+
if (retryCount >= 5) {
405+
Logs().w(
406+
'[VOIP E2EE] Max retries (5) reached for ${rp.id}, giving up key request',
407+
);
408+
timer.cancel();
409+
_requestEncryptionKeyPending.remove(rp);
410+
return;
411+
}
412+
unawaited(requestEncrytionKey(groupCall, [rp]));
413+
},
414+
);
415+
}
390416
}
391417

392418
@override
@@ -403,8 +429,14 @@ class LiveKitBackend extends CallBackend {
403429
final keyContent = EncryptionKeysEventContent.fromJson(content);
404430

405431
final callId = keyContent.callId;
406-
final p =
407-
CallParticipant(groupCall.voip, userId: userId, deviceId: deviceId);
432+
final p = CallParticipant(
433+
groupCall.voip,
434+
userId: userId,
435+
deviceId: deviceId,
436+
);
437+
438+
// Cancel any pending retry for this participant since we received keys
439+
_requestEncryptionKeyPending.remove(p)?.cancel();
408440

409441
if (keyContent.keys.isEmpty) {
410442
Logs().w(
@@ -471,11 +503,7 @@ class LiveKitBackend extends CallBackend {
471503
groupCall,
472504
_latestLocalKeyIndex,
473505
sendTo: [
474-
CallParticipant(
475-
groupCall.voip,
476-
userId: userId,
477-
deviceId: deviceId,
478-
),
506+
CallParticipant(groupCall.voip, userId: userId, deviceId: deviceId),
479507
],
480508
);
481509
return true;
@@ -525,16 +553,14 @@ class LiveKitBackend extends CallBackend {
525553
if (_memberLeaveEncKeyRotateDebounceTimer != null) {
526554
_memberLeaveEncKeyRotateDebounceTimer!.cancel();
527555
}
528-
_memberLeaveEncKeyRotateDebounceTimer =
529-
Timer(groupCall.voip.timeouts!.makeKeyOnLeaveDelay, () async {
530-
// we skipJoinDebounce here because we want to make sure a new key is generated
531-
// and that the join debounce does not block us from making a new key
532-
await _makeNewSenderKey(
533-
groupCall,
534-
true,
535-
skipJoinDebounce: true,
536-
);
537-
});
556+
_memberLeaveEncKeyRotateDebounceTimer = Timer(
557+
groupCall.voip.timeouts!.makeKeyOnLeaveDelay,
558+
() async {
559+
// we skipJoinDebounce here because we want to make sure a new key is generated
560+
// and that the join debounce does not block us from making a new key
561+
await _makeNewSenderKey(groupCall, true, skipJoinDebounce: true);
562+
},
563+
);
538564
}
539565

540566
@override
@@ -545,6 +571,12 @@ class LiveKitBackend extends CallBackend {
545571
_currentLocalKeyIndex = 0;
546572
_latestLocalKeyIndex = 0;
547573
_memberLeaveEncKeyRotateDebounceTimer?.cancel();
574+
575+
// Clean up all pending encryption key request retries
576+
for (final timer in _requestEncryptionKeyPending.values) {
577+
timer.cancel();
578+
}
579+
_requestEncryptionKeyPending.clear();
548580
}
549581

550582
@override

test/livekit_backend_test.dart

Lines changed: 193 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,193 @@
1+
import 'dart:convert';
2+
3+
import 'package:test/test.dart';
4+
5+
import 'package:matrix/matrix.dart';
6+
import 'fake_client.dart';
7+
import 'webrtc_stub.dart';
8+
9+
void main() {
10+
late Client matrix;
11+
late Room room;
12+
late VoIP voip;
13+
late LiveKitBackend backend;
14+
15+
group('LiveKitBackend encryption key request retry tests', () {
16+
Logs().level = Level.info;
17+
18+
setUp(() async {
19+
matrix = await getClient();
20+
await matrix.abortSync();
21+
22+
voip = VoIP(matrix, MockWebRTCDelegate());
23+
VoIP.customTxid = '1234';
24+
final id = '!calls:example.com';
25+
room = matrix.getRoomById(id)!;
26+
27+
backend = LiveKitBackend(
28+
livekitServiceUrl: 'https://livekit.example.com',
29+
livekitAlias: 'test_alias',
30+
);
31+
32+
// Clear logs before each test to avoid interference
33+
Logs().outputEvents.clear();
34+
});
35+
36+
/// Helper to create a group call session for testing
37+
Future<GroupCallSession> createGroupCall(String callId) async {
38+
final membership = CallMembership(
39+
userId: matrix.userID!,
40+
callId: callId,
41+
backend: backend,
42+
deviceId: matrix.deviceID!,
43+
expiresTs:
44+
DateTime.now().add(Duration(hours: 1)).millisecondsSinceEpoch,
45+
roomId: room.id,
46+
membershipId: voip.currentSessionId,
47+
voip: voip,
48+
eventId: 'membership_event_$callId',
49+
);
50+
51+
room.setState(
52+
Event(
53+
content: {
54+
'memberships': [membership.toJson()],
55+
},
56+
type: EventTypes.GroupCallMember,
57+
eventId: 'membership_event_$callId',
58+
senderId: matrix.userID!,
59+
originServerTs: DateTime.now(),
60+
room: room,
61+
stateKey: matrix.userID!,
62+
),
63+
);
64+
65+
await voip.createGroupCallFromRoomStateEvent(membership);
66+
final groupCall = voip.getGroupCallById(room.id, callId);
67+
await groupCall!.enter();
68+
return groupCall;
69+
}
70+
71+
/// Helper to count key request log messages for a specific participant
72+
int countKeyRequestsFor(String participantId) {
73+
return Logs()
74+
.outputEvents
75+
.where(
76+
(event) =>
77+
event.title
78+
.contains('requesting stream encryption keys from') &&
79+
event.title.contains(participantId),
80+
)
81+
.length;
82+
}
83+
84+
test(
85+
'retry mechanism automatically re-requests keys when initial request fails',
86+
() async {
87+
// This test verifies: without retry, a failed key request leaves the call
88+
// in an unrecoverable state. With retry, requests are automatically retried.
89+
90+
final groupCall = await createGroupCall('test_retry_mechanism');
91+
92+
const remoteUserId = '@retry_test_user:example.com';
93+
const remoteDeviceId = 'RETRY_TEST_DEVICE';
94+
final remoteParticipant = CallParticipant(
95+
voip,
96+
userId: remoteUserId,
97+
deviceId: remoteDeviceId,
98+
);
99+
100+
Logs().outputEvents.clear();
101+
Logs().level = Level.verbose;
102+
103+
// Step 1: Initial key request (simulates framecryptor detecting missingKey)
104+
await backend.requestEncrytionKey(groupCall, [remoteParticipant]);
105+
expect(countKeyRequestsFor(remoteUserId), 1);
106+
107+
// Step 2: Wait for retry timer (2 second interval)
108+
// WITHOUT retry: count stays at 1 (STUCK!)
109+
// WITH retry: count increases (RECOVERY!)
110+
await Future.delayed(Duration(milliseconds: 2100));
111+
112+
expect(
113+
countKeyRequestsFor(remoteUserId),
114+
greaterThan(1),
115+
reason: 'Retry mechanism should automatically re-request keys. '
116+
'Without retry, the call would be stuck in an unrecoverable state.',
117+
);
118+
119+
await backend.dispose(groupCall);
120+
},
121+
);
122+
123+
test(
124+
'each participant has independent retry - receiving keys for one does not affect another',
125+
() async {
126+
final groupCall = await createGroupCall('test_independent_retries');
127+
128+
const user1 = '@independent_user1:example.com';
129+
const device1 = 'DEVICE_1';
130+
const user2 = '@independent_user2:example.com';
131+
const device2 = 'DEVICE_2';
132+
133+
final participant1 =
134+
CallParticipant(voip, userId: user1, deviceId: device1);
135+
final participant2 =
136+
CallParticipant(voip, userId: user2, deviceId: device2);
137+
138+
Logs().outputEvents.clear();
139+
Logs().level = Level.verbose;
140+
141+
// Request keys from both participants
142+
await backend.requestEncrytionKey(groupCall, [participant1]);
143+
await backend.requestEncrytionKey(groupCall, [participant2]);
144+
145+
expect(countKeyRequestsFor(user1), 1);
146+
expect(countKeyRequestsFor(user2), 1);
147+
148+
// Wait for retry
149+
await Future.delayed(Duration(milliseconds: 2100));
150+
expect(countKeyRequestsFor(user1), greaterThan(1));
151+
expect(countKeyRequestsFor(user2), greaterThan(1));
152+
153+
// Receive keys ONLY from participant 1
154+
await backend.onCallEncryption(
155+
groupCall,
156+
user1,
157+
device1,
158+
{
159+
'keys': [
160+
{
161+
'key': base64Encode([1, 2, 3, 4, 5, 6, 7, 8]),
162+
'index': 0,
163+
},
164+
],
165+
'call_id': 'test_independent_retries',
166+
},
167+
);
168+
169+
final countUser1AfterKeys = countKeyRequestsFor(user1);
170+
final countUser2AfterKeys = countKeyRequestsFor(user2);
171+
172+
// Wait another retry interval
173+
await Future.delayed(Duration(milliseconds: 2100));
174+
175+
// User 1's retry should have stopped (received keys)
176+
expect(
177+
countKeyRequestsFor(user1),
178+
countUser1AfterKeys,
179+
reason: 'User 1 retry should stop after receiving keys.',
180+
);
181+
182+
// User 2's retry should continue (no keys received)
183+
expect(
184+
countKeyRequestsFor(user2),
185+
greaterThan(countUser2AfterKeys),
186+
reason: 'User 2 retry should continue since no keys were received.',
187+
);
188+
189+
await backend.dispose(groupCall);
190+
},
191+
);
192+
});
193+
}

0 commit comments

Comments
 (0)