Skip to content

Commit 8d90eac

Browse files
authored
Merge pull request #181 from GetStream/participant-list
Participant list
2 parents 570619b + b3a5cd4 commit 8d90eac

File tree

4 files changed

+583
-7
lines changed

4 files changed

+583
-7
lines changed

getstream/video/rtc/connection_manager.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -306,6 +306,12 @@ async def _connect_internal(
306306
self._ws_client.on_event("ice_trickle", self._on_ice_trickle)
307307

308308
# Connect track subscription events to subscription manager
309+
self._ws_client.on_event(
310+
"participant_joined", self.participants_state._on_participant_joined
311+
)
312+
self._ws_client.on_event(
313+
"participant_left", self.participants_state._on_participant_left
314+
)
309315
self._ws_client.on_event(
310316
"track_published", self._subscription_manager.handle_track_published
311317
)
@@ -321,7 +327,7 @@ async def _connect_internal(
321327
# Populate participants state with existing participants
322328
if hasattr(sfu_event.join_response, "call_state"):
323329
for participant in sfu_event.join_response.call_state.participants:
324-
self._participants_state.add_participant(participant)
330+
self._participants_state._add_participant(participant)
325331
# Update reconnection config
326332
if hasattr(sfu_event.join_response, "fast_reconnect_deadline_seconds"):
327333
self._reconnector._fast_reconnect_deadline_seconds = (

getstream/video/rtc/participants.py

Lines changed: 102 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
from typing import Optional
1+
from typing import Optional, Callable, List
2+
import inspect
3+
import weakref
24

35
from pyee.asyncio import AsyncIOEventEmitter
46

@@ -17,6 +19,7 @@ def __init__(self):
1719
super().__init__()
1820
self._participant_by_prefix = {}
1921
self._track_stream_mapping = {}
22+
self._map_handlers = [] # List of weak references to handler functions
2023

2124
def get_user_from_track_id(self, track_id: str) -> Optional[models_pb2.Participant]:
2225
# Track IDs have format: participant_id:track_type:...
@@ -43,17 +46,111 @@ def set_track_stream_mapping(self, mapping: dict):
4346
logger.debug(f"Setting track stream mapping: {mapping}")
4447
self._track_stream_mapping = mapping
4548

46-
def add_participant(self, participant: models_pb2.Participant):
49+
def _add_participant(self, participant: models_pb2.Participant):
4750
self._participant_by_prefix[participant.track_lookup_prefix] = participant
51+
self._notify_map_handlers()
4852

49-
def remove_participant(self, participant: models_pb2.Participant):
53+
def _remove_participant(self, participant: models_pb2.Participant):
5054
if participant.track_lookup_prefix in self._participant_by_prefix:
5155
del self._participant_by_prefix[participant.track_lookup_prefix]
56+
self._notify_map_handlers()
57+
58+
def get_participants(self) -> List[models_pb2.Participant]:
59+
"""Get the current list of participants."""
60+
return list(self._participant_by_prefix.values())
61+
62+
def map(self, handler: Callable[[List[models_pb2.Participant]], None]):
63+
"""
64+
Subscribe to participant list changes. The handler is called immediately
65+
with the current list and whenever participants are added or removed.
66+
67+
The handler is stored as a weak reference, so it will be automatically
68+
cleaned up when no other references to it exist.
69+
70+
Args:
71+
handler: A function that takes a list of participants
72+
73+
Returns:
74+
A subscription object that can be used to unsubscribe (optional)
75+
76+
Example:
77+
>>> state = ParticipantsState()
78+
>>> def on_participants(participants):
79+
... print(f"Participants: {len(participants)}")
80+
>>> subscription = state.map(on_participants)
81+
Participants: 0
82+
"""
83+
84+
# Create a weak reference to the handler
85+
# Use a callback to remove dead references when they're collected
86+
def cleanup_callback(ref):
87+
try:
88+
# Remove this weak reference from the handlers list
89+
self._map_handlers.remove(ref)
90+
except (ValueError, AttributeError):
91+
pass
92+
93+
# Create a weak reference to the handler
94+
# The Subscription object will hold a strong reference to the handler
95+
# to prevent it from being garbage collected (important for inline lambdas)
96+
# Use WeakMethod for bound methods, ref for other callables
97+
if inspect.ismethod(handler) or (
98+
hasattr(handler, "__self__") and hasattr(handler, "__func__")
99+
):
100+
handler_ref = weakref.WeakMethod(handler, cleanup_callback)
101+
else:
102+
handler_ref = weakref.ref(handler, cleanup_callback)
103+
self._map_handlers.append(handler_ref)
104+
105+
# Call handler immediately with current list
106+
try:
107+
handler_fn = handler_ref()
108+
if handler_fn:
109+
handler_fn(self.get_participants())
110+
except Exception as e:
111+
logger.error(f"Error calling map handler: {e}")
112+
113+
# Return a simple subscription object
114+
class Subscription:
115+
def __init__(self, handlers_list, handler_ref, handler_to_keep_alive):
116+
self._handlers_list = handlers_list
117+
self._handler_ref = handler_ref
118+
# Keep handler alive with a strong reference
119+
# This is the key: the Subscription holds the only strong reference
120+
# to the handler, so when the Subscription is garbage collected,
121+
# the handler is also garbage collected and removed from _map_handlers
122+
self._handler = handler_to_keep_alive
123+
124+
def unsubscribe(self):
125+
try:
126+
self._handlers_list.remove(self._handler_ref)
127+
except (ValueError, AttributeError):
128+
pass
129+
130+
return Subscription(self._map_handlers, handler_ref, handler)
131+
132+
def _notify_map_handlers(self):
133+
"""Notify all map handlers about participant list changes."""
134+
participants = self.get_participants()
135+
136+
# Clean up dead references and call active handlers
137+
active_handlers = []
138+
for handler_ref in self._map_handlers:
139+
handler = handler_ref()
140+
if handler is not None:
141+
active_handlers.append(handler_ref)
142+
try:
143+
handler(participants)
144+
except Exception as e:
145+
logger.error(f"Error calling map handler: {e}")
146+
147+
# Update list to only include active handlers
148+
self._map_handlers[:] = active_handlers
52149

53150
async def _on_participant_joined(self, event: events_pb2.ParticipantJoined):
54-
self.add_participant(event.participant)
151+
self._add_participant(event.participant)
55152
self.emit("participant_joined", event.participant)
56153

57154
async def _on_participant_left(self, event: events_pb2.ParticipantLeft):
58-
self.remove_participant(event.participant)
155+
self._remove_participant(event.participant)
59156
self.emit("participant_left", event.participant)

getstream/video/rtc/tracks.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ async def handle_track_published(self, event):
204204
try:
205205
# Keep participants state up-to-date
206206
if hasattr(event, "participant"):
207-
self.connection_manager.participants_state.add_participant(
207+
self.connection_manager.participants_state._add_participant(
208208
event.participant
209209
)
210210

0 commit comments

Comments
 (0)