Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .changes/fix-subscribed-event
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
patch type="fixed" "Fix race condition causing track subscriptions to fail when metadata arrives before connection completes"
87 changes: 61 additions & 26 deletions lib/src/core/room.dart
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
}
//
await publication.updateSubscriptionAllowed(event.allowed);
emitWhenConnected(TrackSubscriptionPermissionChangedEvent(
emitIfConnected(TrackSubscriptionPermissionChangedEvent(
participant: participant,
publication: publication,
state: publication.subscriptionState,
Expand All @@ -380,10 +380,10 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
..on<SignalRoomUpdateEvent>((event) async {
_metadata = event.room.metadata;
_roomInfo = event.room;
emitWhenConnected(RoomMetadataChangedEvent(metadata: event.room.metadata));
emitIfConnected(RoomMetadataChangedEvent(metadata: event.room.metadata));
if (_isRecording != event.room.activeRecording) {
_isRecording = event.room.activeRecording;
emitWhenConnected(RoomRecordingStatusChanged(activeRecording: _isRecording));
emitIfConnected(RoomRecordingStatusChanged(activeRecording: _isRecording));
}
})
..on<SignalRemoteMuteTrackEvent>((event) async {
Expand Down Expand Up @@ -416,7 +416,7 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {

if (_isRecording != event.response.room.activeRecording) {
_isRecording = event.response.room.activeRecording;
emitWhenConnected(RoomRecordingStatusChanged(activeRecording: _isRecording));
emitIfConnected(RoomRecordingStatusChanged(activeRecording: _isRecording));
}

logger.fine('[Engine] Received JoinResponse, '
Expand Down Expand Up @@ -588,7 +588,7 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
trackSid = streamId;
}

final participant = _getRemoteParticipantBySid(participantSid);
var participant = _getRemoteParticipantBySid(participantSid);
try {
if (trackSid == null || trackSid.isEmpty) {
throw TrackSubscriptionExceptionEvent(
Expand All @@ -597,11 +597,26 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
);
}
if (participant == null) {
throw TrackSubscriptionExceptionEvent(
participant: participant,
sid: trackSid,
reason: TrackSubscribeFailReason.noParticipantFound,
);
logger.fine('EngineTrackAddedEvent participant is null, waiting for participant metadata...');
// Wait for participant metadata to arrive
try {
final availableEvent = await events.waitFor<InternalParticipantAvailableEvent>(
filter: (event) => event.participantSid == participantSid,
duration: connectOptions.timeouts.publish,
);
participant = availableEvent.participant;
logger.fine('EngineTrackAddedEvent participant metadata received');
} catch (e) {
logger.severe('EngineTrackAddedEvent timeout waiting for participant metadata: $e');
}

if (participant == null) {
throw TrackSubscriptionExceptionEvent(
participant: participant,
sid: trackSid,
reason: TrackSubscribeFailReason.noParticipantFound,
);
}
}
await participant.addSubscribedMediaTrack(
event.track,
Expand All @@ -611,11 +626,11 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
audioOutputOptions: roomOptions.defaultAudioOutputOptions,
);
} on TrackSubscriptionExceptionEvent catch (event) {
logger.severe('addSubscribedMediaTrack() throwed ${event}');
logger.severe('Track subscription failed: ${event}');
events.emit(event);
} catch (exception) {
// We don't want to pass up any exception so catch everything here.
logger.warning('Unknown exception on addSubscribedMediaTrack() ${exception}');
logger.warning('Unknown exception during track subscription: ${exception}');
}
});

Expand Down Expand Up @@ -678,6 +693,12 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {

_remoteParticipants[result.participant.identity] = result.participant;
_sidToIdentity[result.participant.sid] = result.participant.identity;

// Emit internal event for tracks waiting for participant metadata
events.emit(InternalParticipantAvailableEvent(
participant: result.participant,
));

return result;
}

Expand Down Expand Up @@ -710,22 +731,36 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
if (isNew) {
hasChanged = true;
// Emit connected event
emitWhenConnected(ParticipantConnectedEvent(participant: result.participant));
emitIfConnected(ParticipantConnectedEvent(participant: result.participant));
// Emit TrackPublishedEvent for each new track
if (connectionState == ConnectionState.connected) {
for (final pub in result.newPublications) {
final event = TrackPublishedEvent(
participant: result.participant,
publication: pub,
);
[result.participant.events, events].emit(event);
}
for (final pub in result.newPublications) {
// Always emit internal event (for addSubscribedMediaTrack)
result.participant.events.emit(InternalTrackPublishedEvent(
participant: result.participant,
publication: pub,
));

// Only emit public event when connected (for apps)
emitIfConnected(TrackPublishedEvent(
participant: result.participant,
publication: pub,
));
}
_sidToIdentity[info.sid] = info.identity;

// Emit internal event for tracks waiting for participant metadata
events.emit(InternalParticipantAvailableEvent(
participant: result.participant,
));
} else {
final wasUpdated = await result.participant.updateFromInfo(info);
if (wasUpdated) {
_sidToIdentity[info.sid] = info.identity;

// Emit internal event for tracks waiting for participant metadata
events.emit(InternalParticipantAvailableEvent(
participant: result.participant,
));
}
}
}
Expand Down Expand Up @@ -757,7 +792,7 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
final activeSpeakers = lastSpeakers.values.toList();
activeSpeakers.sort((a, b) => b.audioLevel.compareTo(a.audioLevel));
_activeSpeakers = activeSpeakers;
emitWhenConnected(ActiveSpeakersChangedEvent(speakers: activeSpeakers));
emitIfConnected(ActiveSpeakersChangedEvent(speakers: activeSpeakers));
}

// from data channel
Expand Down Expand Up @@ -790,7 +825,7 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
}

_activeSpeakers = activeSpeakers;
emitWhenConnected(ActiveSpeakersChangedEvent(speakers: activeSpeakers));
emitIfConnected(ActiveSpeakersChangedEvent(speakers: activeSpeakers));
}

void _onSignalConnectionQualityUpdateEvent(List<lk_rtc.ConnectionQualityInfo> updates) {
Expand Down Expand Up @@ -824,7 +859,7 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
if (trackPublication == null) continue;
// update the stream state
await trackPublication.updateStreamState(update.state.toLKType());
emitWhenConnected(TrackStreamStateUpdatedEvent(
emitIfConnected(TrackStreamStateUpdatedEvent(
participant: participant,
publication: trackPublication,
streamState: update.state.toLKType(),
Expand Down Expand Up @@ -894,7 +929,7 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {

await participant.removeAllPublishedTracks(notify: true);

emitWhenConnected(ParticipantDisconnectedEvent(participant: participant));
emitIfConnected(ParticipantDisconnectedEvent(participant: participant));
return true;
}

Expand Down Expand Up @@ -967,7 +1002,7 @@ extension RoomPrivateMethods on Room {
}

@internal
void emitWhenConnected(RoomEvent event) {
void emitIfConnected(RoomEvent event) {
if (connectionState == ConnectionState.connected) {
events.emit(event);
}
Expand Down
38 changes: 38 additions & 0 deletions lib/src/internal/events.dart
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ import 'package:meta/meta.dart';

import '../e2ee/options.dart';
import '../events.dart';
import '../participant/remote.dart' show RemoteParticipant;
import '../proto/livekit_models.pb.dart' as lk_models;
import '../proto/livekit_rtc.pb.dart' as lk_rtc;
import '../publication/remote.dart' show RemoteTrackPublication;
import '../track/local/local.dart';
import '../track/options.dart';
import '../track/track.dart';
Expand Down Expand Up @@ -399,6 +401,42 @@ class SignalLocalTrackPublishedEvent with SignalEvent, InternalEvent {
String toString() => '${runtimeType}(cid: ${cid}, track: ${track})';
}

/// Internal event for track publication metadata arrival.
/// Used by addSubscribedMediaTrack to wait for publication metadata.
/// This event always fires regardless of connection state.
/// Apps should listen to TrackPublishedEvent instead (only fires when connected).
@internal
class InternalTrackPublishedEvent with ParticipantEvent, InternalEvent {
final RemoteParticipant participant;
final RemoteTrackPublication publication;

const InternalTrackPublishedEvent({
required this.participant,
required this.publication,
});

@override
String toString() => '${runtimeType}'
'(participant: ${participant}, publication: ${publication})';
}

/// Internal event fired when a participant becomes available (added to _sidToIdentity map).
/// Used by EngineTrackAddedEvent handler to wait for participant metadata when tracks arrive
/// before participant info is processed from JoinResponse or ParticipantUpdate.
@internal
class InternalParticipantAvailableEvent with RoomEvent, InternalEvent {
final RemoteParticipant participant;

const InternalParticipantAvailableEvent({
required this.participant,
});

String get participantSid => participant.sid;

@override
String toString() => '${runtimeType}(participant: ${participant.sid})';
}

@internal
class SignalTrackUnpublishedEvent with SignalEvent, InternalEvent {
final String trackSid;
Expand Down
15 changes: 11 additions & 4 deletions lib/src/participant/remote.dart
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,8 @@ class RemoteParticipant extends Participant<RemoteTrackPublication> {
if (pub == null) {
logger.fine('addSubscribedMediaTrack() pub is null, will wait...');
logger.fine('addSubscribedMediaTrack() tracks: $trackPublications');
// Wait for the metadata to arrive
final event = await events.waitFor<TrackPublishedEvent>(
// Wait for the metadata to arrive (using internal event)
final event = await events.waitFor<InternalTrackPublishedEvent>(
filter: (event) => event.participant == this && event.publication.sid == trackSid,
duration: room.connectOptions.timeouts.publish,
onTimeout: () => throw TrackSubscriptionExceptionEvent(
Expand Down Expand Up @@ -273,11 +273,18 @@ class RemoteParticipant extends Participant<RemoteTrackPublication> {

// Emit events for new publications
for (final pub in newPubs) {
final event = TrackPublishedEvent(
// Always emit internal event (for addSubscribedMediaTrack)
events.emit(InternalTrackPublishedEvent(
participant: this,
publication: pub,
);
));

// Only emit public event when connected (for apps)
if (room.connectionState == ConnectionState.connected) {
final event = TrackPublishedEvent(
participant: this,
publication: pub,
);
[events, room.events].emit(event);
}
}
Expand Down
Loading