Skip to content

Commit 8b3c61a

Browse files
committed
clean up resources when track cancelled
1 parent d8e08cb commit 8b3c61a

File tree

1 file changed

+148
-121
lines changed
  • agents-core/vision_agents/core/agents

1 file changed

+148
-121
lines changed

agents-core/vision_agents/core/agents/agents.py

Lines changed: 148 additions & 121 deletions
Original file line numberDiff line numberDiff line change
@@ -768,139 +768,166 @@ async def _switch_to_next_available_track(self) -> None:
768768
self.logger.warning("🎥 No suitable video tracks found")
769769

770770
async def _process_track(self, track_id: str, track_type: int, participant):
    """Subscribe to a video track and feed its frames to the agent's consumers.

    Only camera-video and screenshare tracks are handled; any other track
    type returns immediately. For a handled track this method:

    1. subscribes via ``self.edge.add_track_subscriber``;
    2. wraps screenshare tracks so every frame passes through
       ``ensure_even_dimensions`` (the original comment gives H.264 encoding
       as the reason);
    3. starts one shared "raw" ``VideoForwarder`` so consumers do not compete
       on ``track.recv()``;
    4. in realtime mode, hands either the processed publisher track
       (``self._video_track``) or the raw track to the Realtime LLM;
    5. passes the raw forwarder to every video processor;
    6. if image processors exist, loops forever pulling frames from the raw
       forwarder and passing each image to every image processor.

    When there are no image processors the method returns after step 5 and
    deliberately leaves the forwarders running, because the video processors
    and the Realtime LLM were handed those forwarders as ``shared_forwarder``.
    They stay registered in ``self._video_forwarders``.

    Args:
        track_id: Identifier of the track to subscribe to.
        track_type: A ``TrackType`` enum value.
        participant: Object exposing ``user_id``; passed on to processors.

    Raises:
        asyncio.CancelledError: Re-raised after the forwarders created for
            this track have been stopped, so the enclosing task is correctly
            reported as cancelled.
    """
    raw_forwarder = None
    processed_forwarder = None

    async def _release(forwarder, label: str) -> None:
        # Stop a forwarder created by this call and unregister it. Only
        # forwarders still registered are touched, so one that was already
        # removed from the list elsewhere is not stopped a second time.
        registered = getattr(self, "_video_forwarders", None)
        if forwarder is None or registered is None or forwarder not in registered:
            return
        try:
            await forwarder.stop()
        except Exception as e:
            self.logger.error(f"Error stopping {label} forwarder: {e}")
        finally:
            # Unregister even when stop() failed, so a dead forwarder does
            # not linger in the cleanup list.
            if forwarder in registered:
                registered.remove(forwarder)

    try:
        # we only process video tracks (camera video or screenshare)
        if track_type not in (
            TrackType.TRACK_TYPE_VIDEO,
            TrackType.TRACK_TYPE_SCREEN_SHARE,
        ):
            return

        # subscribe to the video track
        track = self.edge.add_track_subscriber(track_id)
        if not track:
            self.logger.error(f"Failed to subscribe to {track_id}")
            return

        # Wrap screenshare tracks to ensure even dimensions for H.264 encoding
        if track_type == TrackType.TRACK_TYPE_SCREEN_SHARE:

            class _EvenDimensionsTrack(VideoStreamTrack):
                def __init__(self, src):
                    super().__init__()
                    self.src = src

                async def recv(self):
                    return ensure_even_dimensions(await self.src.recv())

            track = _EvenDimensionsTrack(track)  # type: ignore[arg-type]

        # Create a SHARED VideoForwarder for the RAW incoming track
        # This prevents multiple recv() calls competing on the same track
        raw_forwarder = VideoForwarder(
            track,  # type: ignore[arg-type]
            max_buffer=30,
            fps=30,  # Max FPS for the producer (individual consumers can throttle down)
            name=f"raw_video_forwarder_{track_id}",
        )
        await raw_forwarder.start()
        self.logger.info("🎥 Created raw VideoForwarder for track %s", track_id)

        # Track forwarders for cleanup
        if not hasattr(self, "_video_forwarders"):
            self._video_forwarders = []
        self._video_forwarders.append(raw_forwarder)

        # Store track metadata
        self._active_video_tracks[track_id] = (track_type, participant, raw_forwarder)

        # If Realtime provider supports video, switch to this new track
        track_type_name = TrackType.Name(track_type)

        if self.realtime_mode:
            if self._video_track:
                # We have a video publisher (e.g., YOLO processor)
                # Create a separate forwarder for the PROCESSED video track
                self.logger.info(
                    "🎥 Forwarding PROCESSED video frames to Realtime provider"
                )
                processed_forwarder = VideoForwarder(
                    self._video_track,  # type: ignore[arg-type]
                    max_buffer=30,
                    fps=30,
                    name=f"processed_video_forwarder_{track_id}",
                )
                await processed_forwarder.start()
                self._video_forwarders.append(processed_forwarder)

                if isinstance(self.llm, Realtime):
                    # Send PROCESSED frames with the processed forwarder
                    await self.llm._watch_video_track(
                        self._video_track, shared_forwarder=processed_forwarder
                    )
                    self._current_video_track_id = track_id
            else:
                # No video publisher, send raw frames - switch to this new track
                self.logger.info(f"🎥 Switching to {track_type_name} track: {track_id}")
                if isinstance(self.llm, Realtime):
                    await self.llm._watch_video_track(
                        track, shared_forwarder=raw_forwarder
                    )
                    self._current_video_track_id = track_id

        # video processors - pass the raw forwarder (they process incoming frames)
        for processor in self.video_processors:
            try:
                await processor.process_video(
                    track, participant.user_id, shared_forwarder=raw_forwarder
                )
            except Exception as e:
                # One failing processor must not prevent the others from starting.
                self.logger.error(
                    f"Error in video processor {type(processor).__name__}: {e}"
                )

        # Use raw forwarder for image processors - only if there are image processors
        if not self.image_processors:
            # No image processors, just keep the connection alive
            self.logger.info(
                "No image processors, video processing handled by video processors only"
            )
            return

        # Number of consecutive next_frame() timeouts; drives the backoff below.
        timeout_errors = 0

        while True:
            try:
                # Use the raw forwarder instead of competing for track.recv()
                video_frame = await raw_forwarder.next_frame(timeout=2.0)
            except asyncio.TimeoutError:
                # Exponential backoff for timeout errors: 2s, 4s, 8s, 16s,
                # then capped at 30s.
                timeout_errors += 1
                backoff_delay = min(2.0 ** min(timeout_errors, 5), 30.0)
                self.logger.debug(
                    f"🎥VDP: Applying backoff delay: {backoff_delay:.1f}s"
                )
                await asyncio.sleep(backoff_delay)
                continue

            if not video_frame:
                self.logger.warning("🎥VDP: Received empty frame")
                continue

            # A real frame arrived: reset the backoff.
            timeout_errors = 0
            img = video_frame.to_image()

            for processor in self.image_processors:
                try:
                    await processor.process_image(img, participant.user_id)
                except Exception as e:
                    self.logger.error(
                        f"Error in image processor {type(processor).__name__}: {e}"
                    )
    except asyncio.CancelledError:
        # Task was cancelled (e.g., track removed)
        # Clean up forwarders that were created for this track
        self.logger.debug(f"🎥 Cleaning up forwarders for cancelled track {track_id}")
        try:
            await _release(raw_forwarder, "raw")
        finally:
            await _release(processed_forwarder, "processed")
        # Propagate the cancellation: swallowing it would make the task look
        # like it completed normally to whoever cancelled it.
        raise
904931

905932
async def _on_turn_event(self, event: TurnStartedEvent | TurnEndedEvent) -> None:
906933
"""Handle turn detection events."""

0 commit comments

Comments
 (0)