Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 78 additions & 46 deletions lib/src/voip/backend/livekit_backend.dart
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ class LiveKitBackend extends CallBackend {
/// participant:keyIndex:keyBin
final Map<CallParticipant, Map<int, Uint8List>> _encryptionKeysMap = {};

/// Tracks pending encryption key request retries per participant
final Map<CallParticipant, Timer> _requestEncryptionKeyPending = {};

/// Retry interval for key requests
static const Duration _keyRequestRetryInterval = Duration(seconds: 2);

final List<Future> _setNewKeyTimeouts = [];

int _indexCounter = 0;
Expand Down Expand Up @@ -79,10 +85,7 @@ class LiveKitBackend extends CallBackend {
'_makeNewSenderKey using previous key because last created at ${_lastNewKeyTime.toString()}',
);
// still a fairly new key, just send that
await _sendEncryptionKeysEvent(
groupCall,
_latestLocalKeyIndex,
);
await _sendEncryptionKeysEvent(groupCall, _latestLocalKeyIndex);
return;
}

Expand Down Expand Up @@ -222,20 +225,22 @@ class LiveKitBackend extends CallBackend {
);
// now wait for the key to propogate and then set it, hopefully users can
// stil decrypt everything
final useKeyTimeout =
Future.delayed(groupCall.voip.timeouts!.useKeyDelay, () async {
Logs().i(
'[VOIP E2EE] delayed setting key changed event for ${participant.id} idx $encryptionKeyIndex key $encryptionKeyBin',
);
await groupCall.voip.delegate.keyProvider?.onSetEncryptionKey(
participant,
encryptionKeyBin,
encryptionKeyIndex,
);
if (participant.isLocal) {
_currentLocalKeyIndex = encryptionKeyIndex;
}
});
final useKeyTimeout = Future.delayed(
groupCall.voip.timeouts!.useKeyDelay,
() async {
Logs().i(
'[VOIP E2EE] delayed setting key changed event for ${participant.id} idx $encryptionKeyIndex key $encryptionKeyBin',
);
await groupCall.voip.delegate.keyProvider?.onSetEncryptionKey(
participant,
encryptionKeyBin,
encryptionKeyIndex,
);
if (participant.isLocal) {
_currentLocalKeyIndex = encryptionKeyIndex;
}
},
);
_setNewKeyTimeouts.add(useKeyTimeout);
} else {
Logs().i(
Expand Down Expand Up @@ -271,17 +276,15 @@ class LiveKitBackend extends CallBackend {
'[VOIP E2EE] _sendEncryptionKeysEvent Tried to send encryption keys event but no keys found!',
);
await _makeNewSenderKey(groupCall, false);
await _sendEncryptionKeysEvent(
groupCall,
keyIndex,
sendTo: sendTo,
);
await _sendEncryptionKeysEvent(groupCall, keyIndex, sendTo: sendTo);
return;
}

try {
final keyContent = EncryptionKeysEventContent(
[EncryptionKeyEntry(keyIndex, base64Encode(myLatestKey))],
[
EncryptionKeyEntry(keyIndex, base64Encode(myLatestKey)),
],
groupCall.groupCallId,
);
final Map<String, Object> data = {
Expand All @@ -300,11 +303,7 @@ class LiveKitBackend extends CallBackend {
);
} catch (e, s) {
Logs().e('[VOIP E2EE] Failed to send e2ee keys, retrying', e, s);
await _sendEncryptionKeysEvent(
groupCall,
keyIndex,
sendTo: sendTo,
);
await _sendEncryptionKeysEvent(groupCall, keyIndex, sendTo: sendTo);
}
}

Expand Down Expand Up @@ -375,6 +374,10 @@ class LiveKitBackend extends CallBackend {
GroupCallSession groupCall,
List<CallParticipant> remoteParticipants,
) async {
Logs().v(
'[VOIP E2EE] requesting stream encryption keys from ${remoteParticipants.map((e) => e.id)}',
);

final Map<String, Object> data = {
'conf_id': groupCall.groupCallId,
'device_id': groupCall.client.deviceID!,
Expand All @@ -387,6 +390,29 @@ class LiveKitBackend extends CallBackend {
data,
EventTypes.GroupCallMemberEncryptionKeysRequest,
);

// Set up retry timers for each participant
for (final rp in remoteParticipants) {
// Skip if a retry is already pending for this participant
if (_requestEncryptionKeyPending.containsKey(rp)) continue;

var retryCount = 0;
_requestEncryptionKeyPending[rp] = Timer.periodic(
_keyRequestRetryInterval,
(timer) {
retryCount++;
if (retryCount >= 5) {
Logs().w(
'[VOIP E2EE] Max retries (5) reached for ${rp.id}, giving up key request',
);
timer.cancel();
_requestEncryptionKeyPending.remove(rp);
return;
}
unawaited(requestEncrytionKey(groupCall, [rp]));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any reason this is unawaited?

},
);
}
}

@override
Expand All @@ -403,8 +429,14 @@ class LiveKitBackend extends CallBackend {
final keyContent = EncryptionKeysEventContent.fromJson(content);

final callId = keyContent.callId;
final p =
CallParticipant(groupCall.voip, userId: userId, deviceId: deviceId);
final p = CallParticipant(
groupCall.voip,
userId: userId,
deviceId: deviceId,
);

// Cancel any pending retry for this participant since we received keys
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does this fix the issue mentioned in https://github.com/famedly/product-management/issues/2078? They might send us keys for index N+1 but never start using it, we should only cancel requests when we get a answer for the requested index

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My bad, I misread the issue! thank you

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will work on it now

_requestEncryptionKeyPending.remove(p)?.cancel();

if (keyContent.keys.isEmpty) {
Logs().w(
Expand Down Expand Up @@ -471,11 +503,7 @@ class LiveKitBackend extends CallBackend {
groupCall,
_latestLocalKeyIndex,
sendTo: [
CallParticipant(
groupCall.voip,
userId: userId,
deviceId: deviceId,
),
CallParticipant(groupCall.voip, userId: userId, deviceId: deviceId),
],
);
return true;
Expand Down Expand Up @@ -525,16 +553,14 @@ class LiveKitBackend extends CallBackend {
if (_memberLeaveEncKeyRotateDebounceTimer != null) {
_memberLeaveEncKeyRotateDebounceTimer!.cancel();
}
_memberLeaveEncKeyRotateDebounceTimer =
Timer(groupCall.voip.timeouts!.makeKeyOnLeaveDelay, () async {
// we skipJoinDebounce here because we want to make sure a new key is generated
// and that the join debounce does not block us from making a new key
await _makeNewSenderKey(
groupCall,
true,
skipJoinDebounce: true,
);
});
_memberLeaveEncKeyRotateDebounceTimer = Timer(
groupCall.voip.timeouts!.makeKeyOnLeaveDelay,
() async {
// we skipJoinDebounce here because we want to make sure a new key is generated
// and that the join debounce does not block us from making a new key
await _makeNewSenderKey(groupCall, true, skipJoinDebounce: true);
},
);
}

@override
Expand All @@ -545,6 +571,12 @@ class LiveKitBackend extends CallBackend {
_currentLocalKeyIndex = 0;
_latestLocalKeyIndex = 0;
_memberLeaveEncKeyRotateDebounceTimer?.cancel();

// Clean up all pending encryption key request retries
for (final timer in _requestEncryptionKeyPending.values) {
timer.cancel();
}
_requestEncryptionKeyPending.clear();
}

@override
Expand Down
177 changes: 177 additions & 0 deletions test/livekit_backend_test.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
import 'dart:convert';

import 'package:test/test.dart';

import 'package:matrix/matrix.dart';
import 'fake_client.dart';
import 'webrtc_stub.dart';

void main() {
late Client matrix;
late Room room;
late VoIP voip;
late LiveKitBackend backend;

group('LiveKitBackend encryption key retry', () {
Logs().level = Level.info;

setUp(() async {
matrix = await getClient();
await matrix.abortSync();

voip = VoIP(matrix, MockWebRTCDelegate());
VoIP.customTxid = '1234';
room = matrix.getRoomById('!calls:example.com')!;

backend = LiveKitBackend(
livekitServiceUrl: 'https://livekit.example.com',
livekitAlias: 'test_alias',
);

Logs().outputEvents.clear();
});

Future<GroupCallSession> createGroupCall(String callId) async {
final membership = CallMembership(
userId: matrix.userID!,
callId: callId,
backend: backend,
deviceId: matrix.deviceID!,
expiresTs:
DateTime.now().add(Duration(hours: 1)).millisecondsSinceEpoch,
roomId: room.id,
membershipId: voip.currentSessionId,
voip: voip,
eventId: 'membership_event_$callId',
);

room.setState(
Event(
content: {
'memberships': [membership.toJson()],
},
type: EventTypes.GroupCallMember,
eventId: 'membership_event_$callId',
senderId: matrix.userID!,
originServerTs: DateTime.now(),
room: room,
stateKey: matrix.userID!,
),
);

await voip.createGroupCallFromRoomStateEvent(membership);
final groupCall = voip.getGroupCallById(room.id, callId)!;
await groupCall.enter();
return groupCall;
}

int countKeyRequests(String userId) => Logs()
.outputEvents
.where(
(e) =>
e.title.contains('requesting stream encryption keys') &&
e.title.contains(userId),
)
.length;

test(
'retries keys periodically until received and receiving keys cancels retry for that participant only',
() async {
final groupCall = await createGroupCall('test1');
final p1 = CallParticipant(voip, userId: '@alice:x.com', deviceId: 'D1');
final p2 = CallParticipant(voip, userId: '@bob:x.com', deviceId: 'D2');

Logs().outputEvents.clear();
Logs().level = Level.verbose;

await backend.requestEncrytionKey(groupCall, [p1]);
await backend.requestEncrytionKey(groupCall, [p2]);

// Receive keys for p1 only
await backend.onCallEncryption(groupCall, '@alice:x.com', 'D1', {
'keys': [
{
'key': base64Encode([1, 2, 3, 4]),
'index': 0,
}
],
'call_id': 'test1',
});

final countP1 = countKeyRequests('@alice:x.com');
final countP2 = countKeyRequests('@bob:x.com');

await Future.delayed(Duration(milliseconds: 2100));

// p1 stopped, p2 continues
expect(countKeyRequests('@alice:x.com'), countP1);
expect(countKeyRequests('@bob:x.com'), greaterThan(countP2));

await backend.dispose(groupCall);
});

test('can start fresh retry cycle after receiving keys', () async {
final groupCall = await createGroupCall('test2');
final p = CallParticipant(voip, userId: '@bob:x.com', deviceId: 'D1');

Logs().outputEvents.clear();
Logs().level = Level.verbose;

// Request -> receive keys -> timer cancelled
await backend.requestEncrytionKey(groupCall, [p]);
await backend.onCallEncryption(groupCall, '@bob:x.com', 'D1', {
'keys': [
{
'key': base64Encode([1, 2, 3, 4]),
'index': 0,
}
],
'call_id': 'test2',
});

final countAfterReceive = countKeyRequests('@bob:x.com');

// New request starts fresh cycle
await backend.requestEncrytionKey(groupCall, [p]);
expect(countKeyRequests('@bob:x.com'), countAfterReceive + 1);

// New timer works
await Future.delayed(Duration(milliseconds: 2100));
expect(
countKeyRequests('@bob:x.com'),
greaterThan(countAfterReceive + 1),
);

await backend.dispose(groupCall);
});

test(
'stops after 5 retries',
() async {
final groupCall = await createGroupCall('test3');
final p = CallParticipant(voip, userId: '@bob:x.com', deviceId: 'D1');

Logs().outputEvents.clear();
Logs().level = Level.verbose;

await backend.requestEncrytionKey(groupCall, [p]);

// Wait for 5 retries (5 * 2s = 10s)
await Future.delayed(Duration(milliseconds: 10500));

final hasMaxRetryLog = Logs()
.outputEvents
.any((e) => e.title.contains('Max retries (5) reached'));
expect(hasMaxRetryLog, true);

// No more retries after max
final countAtMax = countKeyRequests('@bob:x.com');
await Future.delayed(Duration(milliseconds: 2100));
expect(countKeyRequests('@bob:x.com'), countAtMax);

await backend.dispose(groupCall);
},
timeout: Timeout(Duration(seconds: 20)),
);
});
}