Skip to content

Commit 75a7cd2

Browse files
feat: implement retry encryption key request mechanism for livekit calls
1 parent 810b5de commit 75a7cd2

File tree

3 files changed

+297
-49
lines changed

3 files changed

+297
-49
lines changed
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
import 'dart:async';
2+
3+
/// Retries the `retryFunction` after a set `timeInterval` until `dispose` is called
4+
class RetryEventModel {
5+
final Duration timeInterval;
6+
final void Function(Timer? timer) retryFunction;
7+
8+
final Timer? _timer;
9+
10+
RetryEventModel({required this.timeInterval, required this.retryFunction})
11+
: _timer = Timer.periodic(timeInterval, retryFunction);
12+
13+
void dispose() => _timer?.cancel();
14+
}

lib/src/voip/backend/livekit_backend.dart

Lines changed: 90 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import 'dart:convert';
33
import 'dart:typed_data';
44

55
import 'package:matrix/matrix.dart';
6+
import 'package:matrix/src/models/retry_event_model.dart';
67
import 'package:matrix/src/utils/crypto/crypto.dart';
78

89
class LiveKitBackend extends CallBackend {
@@ -24,6 +25,18 @@ class LiveKitBackend extends CallBackend {
2425
/// participant:keyIndex:keyBin
2526
final Map<CallParticipant, Map<int, Uint8List>> _encryptionKeysMap = {};
2627

28+
/// Tracks pending encryption key request retries per participant
29+
final Map<CallParticipant, RetryEventModel> _requestEncryptionKeyPending = {};
30+
31+
/// Tracks retry count per participant to prevent infinite retries
32+
final Map<CallParticipant, int> _keyRequestRetryCount = {};
33+
34+
/// Maximum number of retry attempts for key requests
35+
static const int _maxKeyRequestRetries = 5;
36+
37+
/// Retry interval for key requests
38+
static const Duration _keyRequestRetryInterval = Duration(seconds: 2);
39+
2740
final List<Future> _setNewKeyTimeouts = [];
2841

2942
int _indexCounter = 0;
@@ -79,10 +92,7 @@ class LiveKitBackend extends CallBackend {
7992
'_makeNewSenderKey using previous key because last created at ${_lastNewKeyTime.toString()}',
8093
);
8194
// still a fairly new key, just send that
82-
await _sendEncryptionKeysEvent(
83-
groupCall,
84-
_latestLocalKeyIndex,
85-
);
95+
await _sendEncryptionKeysEvent(groupCall, _latestLocalKeyIndex);
8696
return;
8797
}
8898

@@ -222,20 +232,22 @@ class LiveKitBackend extends CallBackend {
222232
);
223233
// now wait for the key to propogate and then set it, hopefully users can
224234
// stil decrypt everything
225-
final useKeyTimeout =
226-
Future.delayed(groupCall.voip.timeouts!.useKeyDelay, () async {
227-
Logs().i(
228-
'[VOIP E2EE] delayed setting key changed event for ${participant.id} idx $encryptionKeyIndex key $encryptionKeyBin',
229-
);
230-
await groupCall.voip.delegate.keyProvider?.onSetEncryptionKey(
231-
participant,
232-
encryptionKeyBin,
233-
encryptionKeyIndex,
234-
);
235-
if (participant.isLocal) {
236-
_currentLocalKeyIndex = encryptionKeyIndex;
237-
}
238-
});
235+
final useKeyTimeout = Future.delayed(
236+
groupCall.voip.timeouts!.useKeyDelay,
237+
() async {
238+
Logs().i(
239+
'[VOIP E2EE] delayed setting key changed event for ${participant.id} idx $encryptionKeyIndex key $encryptionKeyBin',
240+
);
241+
await groupCall.voip.delegate.keyProvider?.onSetEncryptionKey(
242+
participant,
243+
encryptionKeyBin,
244+
encryptionKeyIndex,
245+
);
246+
if (participant.isLocal) {
247+
_currentLocalKeyIndex = encryptionKeyIndex;
248+
}
249+
},
250+
);
239251
_setNewKeyTimeouts.add(useKeyTimeout);
240252
} else {
241253
Logs().i(
@@ -271,19 +283,14 @@ class LiveKitBackend extends CallBackend {
271283
'[VOIP E2EE] _sendEncryptionKeysEvent Tried to send encryption keys event but no keys found!',
272284
);
273285
await _makeNewSenderKey(groupCall, false);
274-
await _sendEncryptionKeysEvent(
275-
groupCall,
276-
keyIndex,
277-
sendTo: sendTo,
278-
);
286+
await _sendEncryptionKeysEvent(groupCall, keyIndex, sendTo: sendTo);
279287
return;
280288
}
281289

282290
try {
283-
final keyContent = EncryptionKeysEventContent(
284-
[EncryptionKeyEntry(keyIndex, base64Encode(myLatestKey))],
285-
groupCall.groupCallId,
286-
);
291+
final keyContent = EncryptionKeysEventContent([
292+
EncryptionKeyEntry(keyIndex, base64Encode(myLatestKey)),
293+
], groupCall.groupCallId);
287294
final Map<String, Object> data = {
288295
...keyContent.toJson(),
289296
// used to find group call in groupCalls when ToDeviceEvent happens,
@@ -300,11 +307,7 @@ class LiveKitBackend extends CallBackend {
300307
);
301308
} catch (e, s) {
302309
Logs().e('[VOIP E2EE] Failed to send e2ee keys, retrying', e, s);
303-
await _sendEncryptionKeysEvent(
304-
groupCall,
305-
keyIndex,
306-
sendTo: sendTo,
307-
);
310+
await _sendEncryptionKeysEvent(groupCall, keyIndex, sendTo: sendTo);
308311
}
309312
}
310313

@@ -375,6 +378,10 @@ class LiveKitBackend extends CallBackend {
375378
GroupCallSession groupCall,
376379
List<CallParticipant> remoteParticipants,
377380
) async {
381+
Logs().v(
382+
'[VOIP E2EE] requesting stream encryption keys from ${remoteParticipants.map((e) => e.id)}',
383+
);
384+
378385
final Map<String, Object> data = {
379386
'conf_id': groupCall.groupCallId,
380387
'device_id': groupCall.client.deviceID!,
@@ -387,6 +394,32 @@ class LiveKitBackend extends CallBackend {
387394
data,
388395
EventTypes.GroupCallMemberEncryptionKeysRequest,
389396
);
397+
398+
// Set up retry timers for each participant
399+
for (final rp in remoteParticipants) {
400+
// Skip if a retry is already pending for this participant
401+
if (_requestEncryptionKeyPending.containsKey(rp)) continue;
402+
403+
// Check retry count to prevent infinite retries
404+
final currentCount = _keyRequestRetryCount[rp] ?? 0;
405+
if (currentCount >= _maxKeyRequestRetries) {
406+
Logs().w(
407+
'[VOIP E2EE] Max retries ($_maxKeyRequestRetries) reached for ${rp.id}, giving up key request',
408+
);
409+
_keyRequestRetryCount.remove(rp);
410+
continue;
411+
}
412+
_keyRequestRetryCount[rp] = currentCount + 1;
413+
414+
_requestEncryptionKeyPending[rp] = RetryEventModel(
415+
timeInterval: _keyRequestRetryInterval,
416+
retryFunction: (_) {
417+
// Remove the pending entry before retrying to allow new retry to be scheduled
418+
_requestEncryptionKeyPending.remove(rp)?.dispose();
419+
unawaited(requestEncrytionKey(groupCall, [rp]));
420+
},
421+
);
422+
}
390423
}
391424

392425
@override
@@ -403,8 +436,15 @@ class LiveKitBackend extends CallBackend {
403436
final keyContent = EncryptionKeysEventContent.fromJson(content);
404437

405438
final callId = keyContent.callId;
406-
final p =
407-
CallParticipant(groupCall.voip, userId: userId, deviceId: deviceId);
439+
final p = CallParticipant(
440+
groupCall.voip,
441+
userId: userId,
442+
deviceId: deviceId,
443+
);
444+
445+
// Cancel any pending retry for this participant since we received keys
446+
_requestEncryptionKeyPending.remove(p)?.dispose();
447+
_keyRequestRetryCount.remove(p);
408448

409449
if (keyContent.keys.isEmpty) {
410450
Logs().w(
@@ -471,11 +511,7 @@ class LiveKitBackend extends CallBackend {
471511
groupCall,
472512
_latestLocalKeyIndex,
473513
sendTo: [
474-
CallParticipant(
475-
groupCall.voip,
476-
userId: userId,
477-
deviceId: deviceId,
478-
),
514+
CallParticipant(groupCall.voip, userId: userId, deviceId: deviceId),
479515
],
480516
);
481517
return true;
@@ -525,16 +561,14 @@ class LiveKitBackend extends CallBackend {
525561
if (_memberLeaveEncKeyRotateDebounceTimer != null) {
526562
_memberLeaveEncKeyRotateDebounceTimer!.cancel();
527563
}
528-
_memberLeaveEncKeyRotateDebounceTimer =
529-
Timer(groupCall.voip.timeouts!.makeKeyOnLeaveDelay, () async {
530-
// we skipJoinDebounce here because we want to make sure a new key is generated
531-
// and that the join debounce does not block us from making a new key
532-
await _makeNewSenderKey(
533-
groupCall,
534-
true,
535-
skipJoinDebounce: true,
536-
);
537-
});
564+
_memberLeaveEncKeyRotateDebounceTimer = Timer(
565+
groupCall.voip.timeouts!.makeKeyOnLeaveDelay,
566+
() async {
567+
// we skipJoinDebounce here because we want to make sure a new key is generated
568+
// and that the join debounce does not block us from making a new key
569+
await _makeNewSenderKey(groupCall, true, skipJoinDebounce: true);
570+
},
571+
);
538572
}
539573

540574
@override
@@ -545,6 +579,13 @@ class LiveKitBackend extends CallBackend {
545579
_currentLocalKeyIndex = 0;
546580
_latestLocalKeyIndex = 0;
547581
_memberLeaveEncKeyRotateDebounceTimer?.cancel();
582+
583+
// Clean up all pending encryption key request retries
584+
for (final retry in _requestEncryptionKeyPending.values) {
585+
retry.dispose();
586+
}
587+
_requestEncryptionKeyPending.clear();
588+
_keyRequestRetryCount.clear();
548589
}
549590

550591
@override

0 commit comments

Comments
 (0)