Skip to content

Commit ba5d58b

Browse files
authored
Re-emit track_published events for already published tracks after connecting to SFU (#203)
1 parent eaee60d commit ba5d58b

File tree

1 file changed

+43
-0
lines changed

1 file changed

+43
-0
lines changed

getstream/video/rtc/connection_manager.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -322,6 +322,9 @@ async def _connect_internal(
322322
# Connect subscriber offer event to handle SDP negotiation
323323
self._ws_client.on_event("subscriber_offer", self._on_subscriber_offer)
324324

325+
# Re-emit the events so they can be subscribed to on the ConnectionManager
326+
self._ws_client.on_wildcard("*", self.emit)
327+
325328
if hasattr(sfu_event, "join_response"):
326329
logger.debug(f"sfu join response: {sfu_event.join_response}")
327330
# Populate participants state with existing participants
@@ -530,3 +533,43 @@ async def _restore_published_tracks(self):
530533
await self._peer_manager.restore_published_tracks()
531534
except Exception as e:
532535
logger.error("Failed to restore published tracks", exc_info=e)
536+
537+
async def republish_tracks(self) -> None:
538+
"""
539+
Use the participants info from the SFU to re-emit the "track_published"
540+
events for the already published tracks.
541+
542+
It's needed because SFU does not send the events for the already present tracks when the
543+
agent joins after the user.
544+
"""
545+
546+
if not self._ws_client:
547+
return None
548+
549+
participants = self.participants_state.get_participants()
550+
551+
for participant in participants:
552+
# Skip the tracks belonging to this connection
553+
if participant.session_id == self.session_id:
554+
continue
555+
556+
for track_type_int in participant.published_tracks:
557+
event = events_pb2.TrackPublished(
558+
user_id=participant.user_id,
559+
session_id=participant.session_id,
560+
participant=participant,
561+
type=track_type_int,
562+
)
563+
try:
564+
# Update track subscriptions first
565+
await self._subscription_manager.handle_track_published(event)
566+
# Emit the event downstream
567+
self.emit("track_published", event)
568+
except Exception:
569+
logger.exception(
570+
f"Failed to emit track_published event "
571+
f"for the already published "
572+
f"track {participant.user_id}:{participant.session_id}:{track_type_int}"
573+
)
574+
575+
return None

0 commit comments

Comments
 (0)