diff --git a/.changes/fix-subscribed-event b/.changes/fix-subscribed-event new file mode 100644 index 00000000..26c6c84f --- /dev/null +++ b/.changes/fix-subscribed-event @@ -0,0 +1 @@ +patch type="fixed" "Fix race condition causing track subscriptions to fail when metadata arrives before connection completes" diff --git a/lib/src/core/room.dart b/lib/src/core/room.dart index 7375381f..c2f4454b 100644 --- a/lib/src/core/room.dart +++ b/lib/src/core/room.dart @@ -371,7 +371,7 @@ class Room extends DisposableChangeNotifier with EventsEmittable { } // await publication.updateSubscriptionAllowed(event.allowed); - emitWhenConnected(TrackSubscriptionPermissionChangedEvent( + emitIfConnected(TrackSubscriptionPermissionChangedEvent( participant: participant, publication: publication, state: publication.subscriptionState, @@ -380,10 +380,10 @@ class Room extends DisposableChangeNotifier with EventsEmittable { ..on((event) async { _metadata = event.room.metadata; _roomInfo = event.room; - emitWhenConnected(RoomMetadataChangedEvent(metadata: event.room.metadata)); + emitIfConnected(RoomMetadataChangedEvent(metadata: event.room.metadata)); if (_isRecording != event.room.activeRecording) { _isRecording = event.room.activeRecording; - emitWhenConnected(RoomRecordingStatusChanged(activeRecording: _isRecording)); + emitIfConnected(RoomRecordingStatusChanged(activeRecording: _isRecording)); } }) ..on((event) async { @@ -416,7 +416,7 @@ class Room extends DisposableChangeNotifier with EventsEmittable { if (_isRecording != event.response.room.activeRecording) { _isRecording = event.response.room.activeRecording; - emitWhenConnected(RoomRecordingStatusChanged(activeRecording: _isRecording)); + emitIfConnected(RoomRecordingStatusChanged(activeRecording: _isRecording)); } logger.fine('[Engine] Received JoinResponse, ' @@ -588,7 +588,7 @@ class Room extends DisposableChangeNotifier with EventsEmittable { trackSid = streamId; } - final participant = _getRemoteParticipantBySid(participantSid); + var participant = _getRemoteParticipantBySid(participantSid); try { if (trackSid == null || trackSid.isEmpty) { throw TrackSubscriptionExceptionEvent( @@ -597,11 +597,26 @@ class Room extends DisposableChangeNotifier with EventsEmittable { ); } if (participant == null) { - throw TrackSubscriptionExceptionEvent( - participant: participant, - sid: trackSid, - reason: TrackSubscribeFailReason.noParticipantFound, - ); + logger.fine('EngineTrackAddedEvent participant is null, waiting for participant metadata...'); + // Wait for participant metadata to arrive + try { + final availableEvent = await events.waitFor( + filter: (event) => event.participantSid == participantSid, + duration: connectOptions.timeouts.publish, + ); + participant = availableEvent.participant; + logger.fine('EngineTrackAddedEvent participant metadata received'); + } catch (e) { + logger.severe('EngineTrackAddedEvent timeout waiting for participant metadata: $e'); + } + + if (participant == null) { + throw TrackSubscriptionExceptionEvent( + participant: participant, + sid: trackSid, + reason: TrackSubscribeFailReason.noParticipantFound, + ); + } } await participant.addSubscribedMediaTrack( event.track, @@ -611,11 +626,11 @@ class Room extends DisposableChangeNotifier with EventsEmittable { audioOutputOptions: roomOptions.defaultAudioOutputOptions, ); } on TrackSubscriptionExceptionEvent catch (event) { - logger.severe('addSubscribedMediaTrack() throwed ${event}'); + logger.severe('Track subscription failed: ${event}'); events.emit(event); } catch (exception) { // We don't want to pass up any exception so catch everything here. - logger.warning('Unknown exception on addSubscribedMediaTrack() ${exception}'); + logger.warning('Unknown exception during track subscription: ${exception}'); } }); @@ -678,6 +693,12 @@ class Room extends DisposableChangeNotifier with EventsEmittable { _remoteParticipants[result.participant.identity] = result.participant; _sidToIdentity[result.participant.sid] = result.participant.identity; + + // Emit internal event for tracks waiting for participant metadata + events.emit(InternalParticipantAvailableEvent( + participant: result.participant, + )); + return result; } @@ -710,22 +731,36 @@ class Room extends DisposableChangeNotifier with EventsEmittable { if (isNew) { hasChanged = true; // Emit connected event - emitWhenConnected(ParticipantConnectedEvent(participant: result.participant)); + emitIfConnected(ParticipantConnectedEvent(participant: result.participant)); // Emit TrackPublishedEvent for each new track - if (connectionState == ConnectionState.connected) { - for (final pub in result.newPublications) { - final event = TrackPublishedEvent( - participant: result.participant, - publication: pub, - ); - [result.participant.events, events].emit(event); - } + for (final pub in result.newPublications) { + // Always emit internal event (for addSubscribedMediaTrack) + result.participant.events.emit(InternalTrackPublishedEvent( + participant: result.participant, + publication: pub, + )); + + // Only emit public event when connected (for apps) + emitIfConnected(TrackPublishedEvent( + participant: result.participant, + publication: pub, + )); } _sidToIdentity[info.sid] = info.identity; + + // Emit internal event for tracks waiting for participant metadata + events.emit(InternalParticipantAvailableEvent( + participant: result.participant, + )); } else { final wasUpdated = await result.participant.updateFromInfo(info); if (wasUpdated) { _sidToIdentity[info.sid] = info.identity; + + // Emit internal event for tracks waiting for participant metadata + events.emit(InternalParticipantAvailableEvent( + participant: result.participant, + )); } } } @@ -757,7 +792,7 @@ class Room extends DisposableChangeNotifier with EventsEmittable { final activeSpeakers = lastSpeakers.values.toList(); activeSpeakers.sort((a, b) => b.audioLevel.compareTo(a.audioLevel)); _activeSpeakers = activeSpeakers; - emitWhenConnected(ActiveSpeakersChangedEvent(speakers: activeSpeakers)); + emitIfConnected(ActiveSpeakersChangedEvent(speakers: activeSpeakers)); } // from data channel @@ -790,7 +825,7 @@ class Room extends DisposableChangeNotifier with EventsEmittable { } _activeSpeakers = activeSpeakers; - emitWhenConnected(ActiveSpeakersChangedEvent(speakers: activeSpeakers)); + emitIfConnected(ActiveSpeakersChangedEvent(speakers: activeSpeakers)); } void _onSignalConnectionQualityUpdateEvent(List updates) { @@ -824,7 +859,7 @@ class Room extends DisposableChangeNotifier with EventsEmittable { if (trackPublication == null) continue; // update the stream state await trackPublication.updateStreamState(update.state.toLKType()); - emitWhenConnected(TrackStreamStateUpdatedEvent( + emitIfConnected(TrackStreamStateUpdatedEvent( participant: participant, publication: trackPublication, streamState: update.state.toLKType(), @@ -894,7 +929,7 @@ class Room extends DisposableChangeNotifier with EventsEmittable { await participant.removeAllPublishedTracks(notify: true); - emitWhenConnected(ParticipantDisconnectedEvent(participant: participant)); + emitIfConnected(ParticipantDisconnectedEvent(participant: participant)); return true; } @@ -967,7 +1002,7 @@ extension RoomPrivateMethods on Room { } @internal - void emitWhenConnected(RoomEvent event) { + void emitIfConnected(RoomEvent event) { if (connectionState == ConnectionState.connected) { events.emit(event); } diff --git a/lib/src/internal/events.dart b/lib/src/internal/events.dart index 1e9be51d..5b2f6c7b 100644 --- a/lib/src/internal/events.dart +++ b/lib/src/internal/events.dart @@ -18,8 +18,10 @@ import 'package:meta/meta.dart'; import '../e2ee/options.dart'; import '../events.dart'; +import '../participant/remote.dart' show RemoteParticipant; import '../proto/livekit_models.pb.dart' as lk_models; import '../proto/livekit_rtc.pb.dart' as lk_rtc; +import '../publication/remote.dart' show RemoteTrackPublication; import '../track/local/local.dart'; import '../track/options.dart'; import '../track/track.dart'; @@ -399,6 +401,42 @@ class SignalLocalTrackPublishedEvent with SignalEvent, InternalEvent { String toString() => '${runtimeType}(cid: ${cid}, track: ${track})'; } +/// Internal event for track publication metadata arrival. +/// Used by addSubscribedMediaTrack to wait for publication metadata. +/// This event always fires regardless of connection state. +/// Apps should listen to TrackPublishedEvent instead (only fires when connected). +@internal +class InternalTrackPublishedEvent with ParticipantEvent, InternalEvent { + final RemoteParticipant participant; + final RemoteTrackPublication publication; + + const InternalTrackPublishedEvent({ + required this.participant, + required this.publication, + }); + + @override + String toString() => '${runtimeType}' + '(participant: ${participant}, publication: ${publication})'; +} + +/// Internal event fired when a participant becomes available (added to _sidToIdentity map). +/// Used by EngineTrackAddedEvent handler to wait for participant metadata when tracks arrive +/// before participant info is processed from JoinResponse or ParticipantUpdate. +@internal +class InternalParticipantAvailableEvent with RoomEvent, InternalEvent { + final RemoteParticipant participant; + + const InternalParticipantAvailableEvent({ + required this.participant, + }); + + String get participantSid => participant.sid; + + @override + String toString() => '${runtimeType}(participant: ${participant.sid})'; +} + @internal class SignalTrackUnpublishedEvent with SignalEvent, InternalEvent { final String trackSid; diff --git a/lib/src/participant/remote.dart b/lib/src/participant/remote.dart index 95ebd82b..5f740849 100644 --- a/lib/src/participant/remote.dart +++ b/lib/src/participant/remote.dart @@ -188,8 +188,8 @@ class RemoteParticipant extends Participant { if (pub == null) { logger.fine('addSubscribedMediaTrack() pub is null, will wait...'); logger.fine('addSubscribedMediaTrack() tracks: $trackPublications'); - // Wait for the metadata to arrive - final event = await events.waitFor( + // Wait for the metadata to arrive (using internal event) + final event = await events.waitFor( filter: (event) => event.participant == this && event.publication.sid == trackSid, duration: room.connectOptions.timeouts.publish, onTimeout: () => throw TrackSubscriptionExceptionEvent( @@ -273,11 +273,18 @@ class RemoteParticipant extends Participant { // Emit events for new publications for (final pub in newPubs) { - final event = TrackPublishedEvent( + // Always emit internal event (for addSubscribedMediaTrack) + events.emit(InternalTrackPublishedEvent( participant: this, publication: pub, - ); + )); + + // Only emit public event when connected (for apps) if (room.connectionState == ConnectionState.connected) { + final event = TrackPublishedEvent( + participant: this, + publication: pub, + ); [events, room.events].emit(event); } }