Skip to content

Commit 08351c4

Browse files
authored
Fix events emit order (#902)
Fixes events emit order after async related fixes
1 parent 9de81dc commit 08351c4

File tree

4 files changed

+151
-48
lines changed

4 files changed

+151
-48
lines changed

lib/livekit_client.dart

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ export 'src/managers/event.dart';
3030
export 'src/options.dart';
3131
export 'src/participant/local.dart';
3232
export 'src/participant/participant.dart';
33-
export 'src/participant/remote.dart';
33+
export 'src/participant/remote.dart' hide ParticipantCreationResult;
3434
export 'src/publication/local.dart';
3535
export 'src/publication/remote.dart';
3636
export 'src/publication/track_publication.dart';

lib/src/core/room.dart

Lines changed: 31 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -636,15 +636,18 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
636636
return null;
637637
}
638638

639-
Future<RemoteParticipant> _getOrCreateRemoteParticipant(String identity, lk_models.ParticipantInfo? info) async {
639+
Future<ParticipantCreationResult> _getOrCreateRemoteParticipant(
640+
String identity, lk_models.ParticipantInfo? info) async {
640641
RemoteParticipant? participant = _remoteParticipants[identity];
641642
if (participant != null) {
642-
if (info != null) {
643-
await participant.updateFromInfo(info);
644-
}
645-
return participant;
643+
// Return existing participant with no new publications; caller handles updates.
644+
return ParticipantCreationResult(
645+
participant: participant,
646+
newPublications: const [],
647+
);
646648
}
647649

650+
ParticipantCreationResult result;
648651
if (info == null) {
649652
logger.warning('RemoteParticipant.info is null identity: $identity');
650653
participant = RemoteParticipant(
@@ -653,16 +656,20 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
653656
identity: identity,
654657
name: '',
655658
);
659+
result = ParticipantCreationResult(
660+
participant: participant,
661+
newPublications: const [],
662+
);
656663
} else {
657-
participant = await RemoteParticipant.createFromInfo(
664+
result = await RemoteParticipant.createFromInfo(
658665
room: this,
659666
info: info,
660667
);
661668
}
662669

663-
_remoteParticipants[identity] = participant;
664-
_sidToIdentity[participant.sid] = identity;
665-
return participant;
670+
_remoteParticipants[result.participant.identity] = result.participant;
671+
_sidToIdentity[result.participant.sid] = result.participant.identity;
672+
return result;
666673
}
667674

668675
Future<void> _onParticipantUpdateEvent(List<lk_models.ParticipantInfo> updates) async {
@@ -689,14 +696,25 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
689696
continue;
690697
}
691698

692-
final participant = await _getOrCreateRemoteParticipant(info.identity, info);
699+
final result = await _getOrCreateRemoteParticipant(info.identity, info);
693700

694701
if (isNew) {
695702
hasChanged = true;
696-
// fire connected event
697-
emitWhenConnected(ParticipantConnectedEvent(participant: participant));
703+
// Emit connected event
704+
emitWhenConnected(ParticipantConnectedEvent(participant: result.participant));
705+
// Emit TrackPublishedEvent for each new track
706+
if (connectionState == ConnectionState.connected) {
707+
for (final pub in result.newPublications) {
708+
final event = TrackPublishedEvent(
709+
participant: result.participant,
710+
publication: pub,
711+
);
712+
[result.participant.events, events].emit(event);
713+
}
714+
}
715+
_sidToIdentity[info.sid] = info.identity;
698716
} else {
699-
final wasUpdated = await participant.updateFromInfo(info);
717+
final wasUpdated = await result.participant.updateFromInfo(info);
700718
if (wasUpdated) {
701719
_sidToIdentity[info.sid] = info.identity;
702720
}

lib/src/participant/remote.dart

Lines changed: 88 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,28 @@ import '../track/remote/video.dart';
3131
import '../types/other.dart';
3232
import 'participant.dart';
3333

34+
/// Result of creating a RemoteParticipant with all its initial data populated.
35+
///
36+
/// This struct is returned by [RemoteParticipant.createFromInfo] and contains
37+
/// the fully initialized participant along with the list of track publications
38+
/// that were added during creation. The caller is responsible for emitting
39+
/// events in the correct order.
40+
@internal
41+
class ParticipantCreationResult {
42+
/// The fully initialized remote participant with all basic info and tracks populated.
43+
final RemoteParticipant participant;
44+
45+
/// List of new track publications that were added during participant creation.
46+
/// The caller should emit [TrackPublishedEvent] for each of these after
47+
/// emitting [ParticipantConnectedEvent].
48+
final List<RemoteTrackPublication> newPublications;
49+
50+
const ParticipantCreationResult({
51+
required this.participant,
52+
required this.newPublications,
53+
});
54+
}
55+
3456
/// Represents other participant in the [Room].
3557
class RemoteParticipant extends Participant<RemoteTrackPublication> {
3658
@internal
@@ -46,18 +68,70 @@ class RemoteParticipant extends Participant<RemoteTrackPublication> {
4668
name: name,
4769
);
4870

49-
static Future<RemoteParticipant> createFromInfo({
71+
/// Creates a fully initialized RemoteParticipant without emitting events.
72+
///
73+
/// Populates the participant with all data from [info] including metadata, permissions,
74+
/// and track publications. No events are emitted, allowing the caller to control event
75+
/// timing and order.
76+
///
77+
/// Returns [ParticipantCreationResult] with the participant and new track publications.
78+
/// The caller should emit [ParticipantConnectedEvent] first, then [TrackPublishedEvent]
79+
/// for each track, ensuring the participant is fully populated when connected event fires.
80+
///
81+
/// @internal - Should only be called by [Room].
82+
@internal
83+
static Future<ParticipantCreationResult> createFromInfo({
5084
required Room room,
5185
required lk_models.ParticipantInfo info,
5286
}) async {
5387
final participant = RemoteParticipant(
5488
room: room,
55-
sid: info.identity,
89+
sid: info.sid,
5690
identity: info.identity,
5791
name: info.name,
5892
);
59-
await participant.updateFromInfo(info);
60-
return participant;
93+
// Update basic participant info (state, metadata, etc.)
94+
await participant._updateBasicInfo(info);
95+
// Add tracks to participant without emitting events
96+
final newPubs = await participant._addTracks(info.tracks);
97+
// Return result for caller to emit events in correct order
98+
return ParticipantCreationResult(
99+
participant: participant,
100+
newPublications: newPubs,
101+
);
102+
}
103+
104+
Future<void> _updateBasicInfo(lk_models.ParticipantInfo info) async {
105+
// Only call superclass updateFromInfo to update basic participant state
106+
await super.updateFromInfo(info);
107+
}
108+
109+
Future<List<RemoteTrackPublication>> _addTracks(List<lk_models.TrackInfo> tracks) async {
110+
final newPubs = <RemoteTrackPublication>[];
111+
for (final trackInfo in tracks) {
112+
final RemoteTrackPublication? pub = getTrackPublicationBySid(trackInfo.sid);
113+
if (pub == null) {
114+
final RemoteTrackPublication pub;
115+
if (trackInfo.type == lk_models.TrackType.VIDEO) {
116+
pub = RemoteTrackPublication<RemoteVideoTrack>(
117+
participant: this,
118+
info: trackInfo,
119+
);
120+
} else if (trackInfo.type == lk_models.TrackType.AUDIO) {
121+
pub = RemoteTrackPublication<RemoteAudioTrack>(
122+
participant: this,
123+
info: trackInfo,
124+
);
125+
} else {
126+
throw UnexpectedStateException('Unknown track type');
127+
}
128+
newPubs.add(pub);
129+
addTrackPublication(pub);
130+
} else {
131+
pub.updateFromInfo(trackInfo);
132+
}
133+
}
134+
return newPubs;
61135
}
62136

63137
/// A convenience property to get all video tracks.
@@ -189,34 +263,16 @@ class RemoteParticipant extends Participant<RemoteTrackPublication> {
189263
//return false;
190264
}
191265

192-
// figuring out deltas between tracks
193-
final newPubs = <RemoteTrackPublication>{};
266+
await _updateTracks(info.tracks);
194267

195-
for (final trackInfo in info.tracks) {
196-
final RemoteTrackPublication? pub = getTrackPublicationBySid(trackInfo.sid);
197-
if (pub == null) {
198-
final RemoteTrackPublication pub;
199-
if (trackInfo.type == lk_models.TrackType.VIDEO) {
200-
pub = RemoteTrackPublication<RemoteVideoTrack>(
201-
participant: this,
202-
info: trackInfo,
203-
);
204-
} else if (trackInfo.type == lk_models.TrackType.AUDIO) {
205-
pub = RemoteTrackPublication<RemoteAudioTrack>(
206-
participant: this,
207-
info: trackInfo,
208-
);
209-
} else {
210-
throw UnexpectedStateException('Unknown track type');
211-
}
212-
newPubs.add(pub);
213-
addTrackPublication(pub);
214-
} else {
215-
pub.updateFromInfo(trackInfo);
216-
}
217-
}
268+
return true;
269+
}
218270

219-
// always emit events for new publications, Room will not forward them unless it's ready
271+
Future<void> _updateTracks(List<lk_models.TrackInfo> tracks) async {
272+
// Add new tracks
273+
final newPubs = await _addTracks(tracks);
274+
275+
// Emit events for new publications
220276
for (final pub in newPubs) {
221277
final event = TrackPublishedEvent(
222278
participant: this,
@@ -227,14 +283,12 @@ class RemoteParticipant extends Participant<RemoteTrackPublication> {
227283
}
228284
}
229285

230-
// remove any published track that is not in the info
231-
final validSids = info.tracks.map((e) => e.sid);
286+
// Remove any published track that is not in the info
287+
final validSids = tracks.map((e) => e.sid);
232288
final removeSids = trackPublications.keys.where((e) => !validSids.contains(e)).toSet();
233289
for (final sid in removeSids) {
234290
await removePublishedTrack(sid);
235291
}
236-
237-
return true;
238292
}
239293

240294
Future<void> removePublishedTrack(String trackSid, {bool notify = true}) async {

test/core/room_e2e_test.dart

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,37 @@ void main() {
6868
expect(room.remoteParticipants.length, 1);
6969
});
7070

71+
test('participant join with tracks populated before connected event', () async {
72+
// Track whether participant has tracks when connected event fires
73+
bool participantHadTracksOnConnect = false;
74+
int trackCountOnConnect = 0;
75+
76+
// Listen for ParticipantConnectedEvent
77+
final cancel = room.events.on<ParticipantConnectedEvent>((event) {
78+
// Verify participant is fully populated with tracks
79+
trackCountOnConnect = event.participant.trackPublications.length;
80+
participantHadTracksOnConnect = trackCountOnConnect > 0;
81+
});
82+
83+
// Send participant join with tracks
84+
ws.onData(participantJoinResponse.writeToBuffer());
85+
86+
// Wait for connected event
87+
await room.events.waitFor<ParticipantConnectedEvent>(duration: const Duration(seconds: 1));
88+
89+
// Clean up listener
90+
cancel();
91+
92+
// Verify participant had tracks when connected event was emitted
93+
expect(participantHadTracksOnConnect, isTrue,
94+
reason: 'Participant should have tracks when ParticipantConnectedEvent is emitted');
95+
expect(trackCountOnConnect, greaterThan(0),
96+
reason: 'Participant should have at least one track when connected event fires');
97+
98+
// Verify the participant is in the room
99+
expect(room.remoteParticipants.length, 1);
100+
});
101+
71102
test('participant disconnect', () async {
72103
ws.onData(participantJoinResponse.writeToBuffer());
73104
await room.events.waitFor<ParticipantConnectedEvent>(duration: const Duration(seconds: 1));

0 commit comments

Comments
 (0)