[AI-245] - Low latency for OpenAI realtime #145
Walkthrough

Large-scale refactor consolidating audio handling via PCM data structures, introducing agent lifecycle management with async warmup hooks across multiple base classes, adding a new CLI framework with an agent launcher, creating queued audio/video track abstractions, implementing non-blocking I/O patterns in plugins, and removing deprecated non-blocking test assertions.

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant User
    participant CLI as cli()
    participant Launcher as AgentLauncher
    participant Components as Agent Components<br/>(LLM, TTS, STT, VD)
    participant Agent as Agent
    User->>CLI: invoke cli(launcher)
    CLI->>CLI: parse options (debug, log-level, etc.)
    CLI->>Launcher: warmup(**kwargs)
    activate Launcher
    Launcher->>Launcher: create agent via create_agent
    Launcher->>Components: warmup() [concurrently]
    activate Components
    Components-->>Launcher: warmup complete
    deactivate Components
    Launcher->>Launcher: mark _warmed_up
    deactivate Launcher
    Launcher-->>CLI: warmup done
    CLI->>Launcher: launch(**kwargs)
    activate Launcher
    Launcher->>Launcher: await warmup (noop if already done)
    Launcher->>Launcher: return agent
    deactivate Launcher
    CLI->>Agent: await join_call()
    activate Agent
    Agent-->>CLI: call joined
    deactivate Agent
    CLI->>CLI: run agent event loop
    CLI-->>User: agent ready
```
Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60–90 minutes

Areas requiring extra attention:
Possibly related PRs
Suggested labels
Suggested reviewers
Poem
Pre-merge checks and finishing touches

❌ Failed checks (1 inconclusive)
✅ Passed checks (3 passed)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.
Actionable comments posted: 9
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (6)
plugins/getstream/vision_agents/plugins/getstream/stream_edge_transport.py (1)
309-314: Rename parameter `framerate` to `sample_rate` for consistency and clarity.

The function parameter is named `framerate` but is passed as `sample_rate` to `AudioStreamTrack`. Audio terminology uses "sample rate" (samples per second), not "framerate" (which typically refers to video frames per second). This inconsistency may confuse developers.

Additionally, consider replacing `stereo and 2 or 1` with the more readable ternary expression `2 if stereo else 1`.

Apply this diff to improve naming and readability:

```diff
 def create_audio_track(
-    self, framerate: int = 48000, stereo: bool = True
+    self, sample_rate: int = 48000, stereo: bool = True
 ) -> OutputAudioTrack:
     return audio_track.AudioStreamTrack(
-        sample_rate=framerate, channels=stereo and 2 or 1
+        sample_rate=sample_rate, channels=2 if stereo else 1
     )  # default to webrtc framerate
```

plugins/gemini/vision_agents/plugins/gemini/gemini_llm.py (1)
147-220: Streaming regression: chunk emission now waits for full response.

By collecting the entire iterator inside `_iterate_stream_blocking` (Lines 147-205) and only processing chunks after `asyncio.to_thread` returns, we no longer emit `GeminiResponseEvent`/`LLMResponseChunkEvent` incrementally. The caller now sees nothing until the full response completes (and the follow-up loop repeats the pattern), which defeats the low-latency streaming requirement this PR targets. Please restore true incremental delivery; e.g., push each chunk from the worker thread through an `asyncio.Queue` (or similar) so the async loop can emit events as soon as each chunk arrives.

plugins/aws/vision_agents/plugins/aws/aws_llm.py (2)
227-273: Don't block the event loop in follow-up Bedrock calls.

Line 227 and Line 272 still invoke `client.converse` directly, undoing the earlier offloading and freezing the event loop on every follow-up/final turn. Please keep these calls on a worker thread.

Apply this diff:
```diff
-        follow_up_response = client.converse(**follow_up_kwargs)
+        follow_up_response = await asyncio.to_thread(
+            client.converse, **follow_up_kwargs
+        )
 ...
-        final_response = client.converse(**final_kwargs)
+        final_response = await asyncio.to_thread(
+            client.converse, **final_kwargs
+        )
```
478-488: Stream follow-ups must also leave the event loop.

Line 478 currently calls `client.converse_stream` (and iterates its stream) on the event loop thread, reintroducing the same blocking behavior you just removed. Offload the entire stream consumption to a worker thread as well.

Apply this diff:
```diff
-        follow_up_response = client.converse_stream(**follow_up_kwargs)
-
-        accumulated_calls = []
-        follow_up_text_parts: List[str] = []
-        follow_up_stream = follow_up_response.get("stream")
-        for event in follow_up_stream:
+        def _consume_follow_up_stream():
+            response = client.converse_stream(**follow_up_kwargs)
+            stream = response.get("stream")
+            events = []
+            if stream:
+                for event in stream:
+                    events.append(event)
+            return response, events
+
+        follow_up_response, follow_up_events = await asyncio.to_thread(
+            _consume_follow_up_stream
+        )
+
+        accumulated_calls = []
+        follow_up_text_parts: List[str] = []
+        for event in follow_up_events:
```

agents-core/vision_agents/core/tts/events.py (1)
58-58: Defaulting `is_recoverable=True` may be optimistic.

Most TTS errors are likely non-recoverable (network failures, invalid API keys, quota exceeded). Consider defaulting to `False` for safer error handling, requiring explicit opt-in for recoverable cases.

Apply this diff to use a safer default:
```diff
-    is_recoverable: bool = True
+    is_recoverable: bool = False
```

plugins/openai/vision_agents/plugins/openai/rtc_manager.py (1)
229-254: Missing null check for `shared_forwarder`.

Line 247 uses `shared_forwarder` without checking if it's None. The parameter is required but not type-hinted as such, and there's no validation.

Add validation or update type hints:
```diff
 async def start_video_sender(
-    self, stream_video_track: MediaStreamTrack, fps: int = 1, shared_forwarder=None
+    self, stream_video_track: MediaStreamTrack, fps: int = 1, shared_forwarder: "VideoForwarder"
 ) -> None:
     """Replace dummy video track with the actual Stream Video forwarding track.

     Args:
         stream_video_track: Video track to forward to OpenAI.
         fps: Target frames per second.
-        shared_forwarder: Optional shared VideoForwarder to use instead of creating a new one.
+        shared_forwarder: Shared VideoForwarder instance for frame forwarding.
     """
     if not self.send_video:
         logger.error("❌ Video sending not enabled for this session")
         raise RuntimeError("Video sending not enabled for this session")

+    if shared_forwarder is None:
+        raise ValueError("shared_forwarder is required for video sending")
+
     # Create forwarding track and start its forwarder
     await shared_forwarder.start_event_consumer(
```

Or if None should be allowed, add proper handling for that case.
🧹 Nitpick comments (18)
.gitignore (1)
6-6: Move `.cursor/` pattern to the Editors/IDEs section for better organization.

The `.cursor/*` pattern is being added to the "Byte-compiled / optimized / DLL files" section, but `.cursor/` is an IDE cache directory (Cursor editor/VS Code) and should be grouped with other editor/IDE ignores like `.vscode/` and `.idea/` (around lines 67-68).

Additionally, the idiomatic `.gitignore` pattern for directories uses a trailing slash: `.cursor/` instead of `.cursor/*`. This explicitly signals to Git that the intent is to ignore a directory.

Apply this diff to relocate and refactor the pattern:

```diff
 # Byte-compiled / optimized / DLL files
 __pycache__/
 *.py[cod]
 *$py.class
 *.so
-.cursor/*

 # Distribution / packaging
```

Then add it to the Editors/IDEs section:

```diff
 # Editors / IDEs
 .vscode/
 .idea/
+.cursor/
```

agents-core/pyproject.toml (1)
90-92: Clarify the aiortc comment reference.

Line 93 references `aiortc` in the comment, but this configuration section is for `getstream`. This appears to be a leftover or copy-paste artifact from earlier development iterations.

Consider updating line 93 to accurately reflect the getstream development configuration, e.g.:

```diff
 # getstream = { git = "https://github.com/GetStream/stream-py.git", branch = "audio-more" } # for local development
 # getstream = { path = "../../stream-py/", editable = true }
-# aiortc = { path = "../stream-py/", editable = true }
+# getstream = { path = "../stream-py/", editable = true }
```

Alternatively, if aiortc is intentionally separate, add clarifying context.
plugins/ultralytics/tests/test_ultralytics.py (1)
46-47: Consider moving the asyncio import to module level.

The import works correctly here, but Python convention is to place imports at the top of the file alongside the other module-level imports (lines 5-14).

Apply this diff to move the import:

```diff
 from pathlib import Path
 from typing import Iterator

+import asyncio
 import numpy as np
 import pytest
 from PIL import Image
```

And remove from the function:

```diff
 async def test_annotated_image_output(self, golf_image: Image.Image, pose_processor: YOLOPoseProcessor):
-    import asyncio
-
     image_with_pose, pose = await pose_processor.add_pose_to_image(image=golf_image)
```

plugins/gemini/vision_agents/plugins/gemini/gemini_llm.py (1)
88-101: Docstring needs Google-style sections.

Project guidelines require Google-style docstrings. Please add the `Args`/`Returns` sections for `_iterate_stream_blocking` so it complies.

PRODUCTION.md (1)
1-20: Document is marked WIP and lacks actionable content.

The production deployment guide is currently a skeleton outline with no specific implementation details, security considerations, monitoring strategies, or actual deployment guides. While the structure is reasonable, the content needs substantial expansion before it can guide production deployments.
Consider adding:
- Specific security requirements (authentication, rate limiting, secrets management)
- Monitoring and observability setup
- Error handling and retry strategies
- Scaling considerations and resource limits
- Actual deployment guides with code examples for each platform
Since this is marked WIP, these additions can be deferred to future work.
plugins/fish/tests/test_fish_stt.py (1)
26-26: Remove trailing whitespace.

Line 26 has trailing whitespace after `STTSession(stt)`. Trailing spaces can cause unnecessary diffs and may be flagged by linters.

```diff
-    session = STTSession(stt) 
+    session = STTSession(stt)
```

docs/ai/instructions/ai-utils.md (1)
24-29: Add language identifier to fenced code block.

The fenced code block starting at line 27 is missing a language identifier, which impacts syntax highlighting and markdown rendering.

````diff
-```
+```python
 PcmData.from_response
````

This addresses the markdownlint warning (MD040, fenced-code-language).

plugins/wizper/vision_agents/plugins/wizper/stt.py (1)

129-131: Consider using aiofiles for consistency.

File creation and writing use `aiofiles`, but cleanup uses `asyncio.to_thread(os.unlink, ...)`. While both are non-blocking, using `aiofiles.os.remove()` would provide consistency with the file creation approach.

```diff
-    await asyncio.to_thread(os.unlink, temp_file_path)
+    await aiofiles.os.remove(temp_file_path)
```

This maintains consistency with the async file I/O pattern used for file creation.
plugins/aws/tests/test_aws_realtime.py (1)
60-80: Consider adding a None guard when appending audio data.

Line 65 appends `event.data` without checking if it's None. While the test may work as-is, adding a guard would make the test more robust.

Apply this diff to add defensive None checking:

```diff
 @realtime.events.subscribe
 async def on_audio(event: RealtimeAudioOutputEvent):
     events.append(event)
-    pcm.append(event.data)
+    if event.data:
+        pcm.append(event.data)
```

tests/test_audio_forwarder.py (1)
53-76: Test name suggests "restart" but doesn't test it.

The test `test_audio_forwarder_stop_and_restart` only verifies stop behavior. Consider renaming to `test_audio_forwarder_stops_cleanly` or add actual restart verification if that's a desired behavior.

plugins/ultralytics/vision_agents/plugins/ultralytics/yolo_pose_processor.py (1)
40-50: Consider using QueuedVideoTrack directly.

Since `YOLOPoseVideoTrack` now has no custom behavior, you could use `QueuedVideoTrack` directly throughout the code instead of maintaining an empty subclass. This would reduce indirection unless you anticipate adding custom behavior in the future.

If no customization is planned, apply this refactor at line 97:

```diff
-        self._video_track: YOLOPoseVideoTrack = YOLOPoseVideoTrack()
+        self._video_track: QueuedVideoTrack = QueuedVideoTrack()
```

And remove the empty class definition:

```diff
-class YOLOPoseVideoTrack(QueuedVideoTrack):
-    """
-    The track has a async recv() method which is called repeatedly.
-    The recv method should wait for FPS interval before providing the next frame...
-
-    Queuing behaviour is where it gets a little tricky.
-
-    Ideally we'd do frame.to_ndarray -> process -> from.from_ndarray and skip image conversion
-    """
-
-    pass
```

plugins/smart_turn/vision_agents/plugins/smart_turn/smart_turn_detection.py (1)
400-412: Consider async file read when model_bytes is None.

The new `model_bytes` parameter improves multi-worker scenarios, but when `model_bytes` is `None`, the file read at lines 411-412 still blocks. Since `SileroVAD.__init__` is called via `asyncio.to_thread` at line 148, this is mitigated. However, for API consistency, consider documenting that callers should use `asyncio.to_thread` for initialization, or make the constructor require pre-loaded bytes.

Consider adding a docstring note:

```diff
 def __init__(self, model_path: str, model_bytes: Optional[bytes] = None, reset_interval_seconds: float = 5.0):
     """
     Initialize Silero VAD.

     Args:
         model_path: Path to the ONNX model file (used only if model_bytes is None)
         model_bytes: Optional pre-loaded model file contents to avoid blocking I/O
         reset_interval_seconds: Reset internal state every N seconds to prevent drift
+
+    Note:
+        When model_bytes is None, this constructor performs blocking file I/O.
+        Call from asyncio.to_thread to avoid blocking the event loop.
     """
```

agents-core/vision_agents/core/tts/manual_test.py (1)
123-124: Refactor list comprehension used for side effects.

Using a list comprehension for its side effects (append operations) is an anti-pattern. The created list is discarded, making the intent unclear.

Apply this diff:

```diff
 pcm = result.speeches[0]
-[pcm.append(p) for p in result.speeches[1:]]
+for p in result.speeches[1:]:
+    pcm.append(p)
```

agents-core/vision_agents/core/tts/events.py (1)
60-62: Consider enriching `error_message` with error code.

The property could provide more context by including `error_code` when available.

```diff
 @property
 def error_message(self) -> str:
-    return str(self.error) if self.error else "Unknown error"
+    parts = []
+    if self.error_code:
+        parts.append(f"[{self.error_code}]")
+    if self.error:
+        parts.append(str(self.error))
+    return " ".join(parts) if parts else "Unknown error"
```

plugins/openai/vision_agents/plugins/openai/rtc_manager.py (1)
150-156: Consider task tracking for event handlers.

The `asyncio.create_task` on line 154 creates tasks without tracking them. If many messages arrive and handlers error out, this could lead to unobserved task exceptions. Consider adding tasks to a set for lifecycle management.

```python
# In __init__:
self._event_tasks: set[asyncio.Task] = set()

# In on_message:
@self.data_channel.on("message")
def on_message(message):
    try:
        data = json.loads(message)
        task = asyncio.create_task(self._handle_event(data))
        self._event_tasks.add(task)
        task.add_done_callback(self._event_tasks.discard)
    except json.JSONDecodeError as e:
        logger.error(f"Failed to decode message: {e}")

# In close():
for task in self._event_tasks:
    task.cancel()
await asyncio.gather(*self._event_tasks, return_exceptions=True)
```

agents-core/vision_agents/core/agents/agent_launcher.py (2)
12-16: Placeholder class needs more detail.

The `AgentProcess` class is empty with a vague docstring. Either implement it now or convert it to a more detailed placeholder that explains what information it will contain (PID, warmup status, etc.).

```diff
 class AgentProcess:
     """
-    Add info here about the thread/process. Enabling warm up to work well in a multiprocess env
+    Placeholder for future multi-process warmup support.
+
+    Will track process/thread metadata including:
+    - Process ID
+    - Warmup completion status
+    - Resource utilization
+    - Inter-process communication channels
+
+    TODO: Implement when adding multi-process agent support.
     """

     pass
```
44-101: Consider error handling in parallel warmup.

Line 98 uses `asyncio.gather` without `return_exceptions=True`. If any component's warmup fails, the entire warmup process will fail. Depending on requirements, you might want to:

- Continue even if some warmups fail (log errors but don't raise)
- Fail fast if any critical component fails
- Distinguish between critical and optional component warmup

For more resilient warmup:

```diff
 # Run all warmups in parallel
 if warmup_tasks:
-    await asyncio.gather(*warmup_tasks)
+    results = await asyncio.gather(*warmup_tasks, return_exceptions=True)
+    # Log any warmup failures
+    for i, result in enumerate(results):
+        if isinstance(result, Exception):
+            logger.warning(f"Component warmup failed: {result}")
```

Or for strict error handling with better context:

```diff
 # Run all warmups in parallel
 if warmup_tasks:
-    await asyncio.gather(*warmup_tasks)
+    try:
+        await asyncio.gather(*warmup_tasks)
+    except Exception as e:
+        logger.error(f"Agent warmup failed: {e}")
+        self._agent = None
+        self._warmed_up = False
+        raise
```

tests/test_queued_video_track.py (1)
177-201: Consider marking real video test as integration.

The `test_queued_video_track_with_real_video` test uses an actual video file (bunny_3s.mp4) which makes it more of an integration test. Consider marking it with `@pytest.mark.integration` as per the coding guidelines.

As per coding guidelines:

```diff
+    @pytest.mark.integration
     async def test_queued_video_track_with_real_video(self, bunny_video_track):
         """Test QueuedVideoTrack with frames from actual video."""
```
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
⛔ Files ignored due to path filters (1)
- `uv.lock` is excluded by `!**/*.lock`
📒 Files selected for processing (65)
- `.gitignore` (1 hunks)
- `DEVELOPMENT.md` (1 hunks)
- `PRODUCTION.md` (1 hunks)
- `agents-core/pyproject.toml` (2 hunks)
- `agents-core/vision_agents/core/__init__.py` (1 hunks)
- `agents-core/vision_agents/core/agents/__init__.py` (1 hunks)
- `agents-core/vision_agents/core/agents/agent_launcher.py` (1 hunks)
- `agents-core/vision_agents/core/agents/agents.py` (5 hunks)
- `agents-core/vision_agents/core/cli/cli_runner.py` (1 hunks)
- `agents-core/vision_agents/core/edge/types.py` (2 hunks)
- `agents-core/vision_agents/core/events/manager.py` (1 hunks)
- `agents-core/vision_agents/core/llm/events.py` (2 hunks)
- `agents-core/vision_agents/core/llm/llm.py` (8 hunks)
- `agents-core/vision_agents/core/llm/realtime.py` (2 hunks)
- `agents-core/vision_agents/core/stt/stt.py` (1 hunks)
- `agents-core/vision_agents/core/tts/events.py` (2 hunks)
- `agents-core/vision_agents/core/tts/manual_test.py` (2 hunks)
- `agents-core/vision_agents/core/tts/testing.py` (4 hunks)
- `agents-core/vision_agents/core/tts/tts.py` (3 hunks)
- `agents-core/vision_agents/core/turn_detection/turn_detection.py` (1 hunks)
- `agents-core/vision_agents/core/utils/audio_forwarder.py` (1 hunks)
- `agents-core/vision_agents/core/utils/audio_track.py` (1 hunks)
- `agents-core/vision_agents/core/utils/utils.py` (4 hunks)
- `agents-core/vision_agents/core/utils/video_forwarder.py` (4 hunks)
- `agents-core/vision_agents/core/utils/video_track.py` (1 hunks)
- `conftest.py` (3 hunks)
- `docs/ai/instructions/ai-tests.md` (0 hunks)
- `docs/ai/instructions/ai-tts.md` (0 hunks)
- `docs/ai/instructions/ai-utils.md` (1 hunks)
- `docs/vad-refactor-plan-extended.md` (0 hunks)
- `docs/vad-refactor-plan.md` (0 hunks)
- `examples/other_examples/gemini_live_realtime/gemini_live_example.py` (1 hunks)
- `examples/other_examples/openai_realtime_webrtc/openai_realtime_example.py` (3 hunks)
- `plugins/anthropic/tests/test_anthropic_llm.py` (1 hunks)
- `plugins/aws/tests/test_aws.py` (0 hunks)
- `plugins/aws/tests/test_aws_realtime.py` (4 hunks)
- `plugins/aws/tests/test_tts.py` (1 hunks)
- `plugins/aws/vision_agents/plugins/aws/aws_llm.py` (12 hunks)
- `plugins/aws/vision_agents/plugins/aws/aws_realtime.py` (4 hunks)
- `plugins/aws/vision_agents/plugins/aws/tts.py` (3 hunks)
- `plugins/cartesia/tests/test_tts.py` (1 hunks)
- `plugins/elevenlabs/tests/test_tts.py` (1 hunks)
- `plugins/fish/tests/test_fish_stt.py` (2 hunks)
- `plugins/fish/tests/test_fish_tts.py` (1 hunks)
- `plugins/fish/vision_agents/plugins/fish/stt.py` (3 hunks)
- `plugins/gemini/tests/test_gemini_realtime.py` (4 hunks)
- `plugins/gemini/vision_agents/plugins/gemini/gemini_llm.py` (12 hunks)
- `plugins/gemini/vision_agents/plugins/gemini/gemini_realtime.py` (4 hunks)
- `plugins/getstream/vision_agents/plugins/getstream/stream_edge_transport.py` (2 hunks)
- `plugins/kokoro/tests/test_tts.py` (1 hunks)
- `plugins/openai/tests/test_tts_openai.py` (1 hunks)
- `plugins/openai/vision_agents/plugins/openai/openai_realtime.py` (4 hunks)
- `plugins/openai/vision_agents/plugins/openai/rtc_manager.py` (9 hunks)
- `plugins/smart_turn/tests/test_smart_turn.py` (6 hunks)
- `plugins/smart_turn/vision_agents/plugins/smart_turn/smart_turn_detection.py` (2 hunks)
- `plugins/ultralytics/tests/test_ultralytics.py` (2 hunks)
- `plugins/ultralytics/vision_agents/plugins/ultralytics/yolo_pose_processor.py` (3 hunks)
- `plugins/wizper/tests/test_wizper_stt.py` (1 hunks)
- `plugins/wizper/vision_agents/plugins/wizper/stt.py` (3 hunks)
- `pyproject.toml` (3 hunks)
- `pytest.ini` (1 hunks)
- `tests/test_audio_forwarder.py` (1 hunks)
- `tests/test_queued_video_track.py` (1 hunks)
- `tests/test_tts_base.py` (2 hunks)
- `tests/test_utils.py` (3 hunks)
💤 Files with no reviewable changes (5)
- docs/vad-refactor-plan.md
- docs/vad-refactor-plan-extended.md
- docs/ai/instructions/ai-tests.md
- plugins/aws/tests/test_aws.py
- docs/ai/instructions/ai-tts.md
🧰 Additional context used
📓 Path-based instructions (2)
**/*.py
📄 CodeRabbit inference engine (.cursor/rules/python.mdc)
**/*.py: Do not modify sys.path in Python code
Docstrings must follow the Google style guide
Files:
- plugins/fish/tests/test_fish_stt.py
- agents-core/vision_agents/core/turn_detection/turn_detection.py
- plugins/wizper/tests/test_wizper_stt.py
- plugins/elevenlabs/tests/test_tts.py
- agents-core/vision_agents/core/utils/audio_forwarder.py
- agents-core/vision_agents/core/tts/testing.py
- agents-core/vision_agents/core/tts/manual_test.py
- agents-core/vision_agents/core/__init__.py
- agents-core/vision_agents/core/agents/agent_launcher.py
- plugins/aws/tests/test_tts.py
- agents-core/vision_agents/core/events/manager.py
- plugins/ultralytics/tests/test_ultralytics.py
- agents-core/vision_agents/core/edge/types.py
- agents-core/vision_agents/core/utils/utils.py
- agents-core/vision_agents/core/agents/__init__.py
- agents-core/vision_agents/core/utils/audio_track.py
- plugins/wizper/vision_agents/plugins/wizper/stt.py
- agents-core/vision_agents/core/stt/stt.py
- plugins/cartesia/tests/test_tts.py
- plugins/gemini/vision_agents/plugins/gemini/gemini_realtime.py
- plugins/ultralytics/vision_agents/plugins/ultralytics/yolo_pose_processor.py
- plugins/gemini/tests/test_gemini_realtime.py
- plugins/aws/tests/test_aws_realtime.py
- agents-core/vision_agents/core/agents/agents.py
- plugins/anthropic/tests/test_anthropic_llm.py
- agents-core/vision_agents/core/cli/cli_runner.py
- examples/other_examples/gemini_live_realtime/gemini_live_example.py
- plugins/smart_turn/vision_agents/plugins/smart_turn/smart_turn_detection.py
- plugins/fish/vision_agents/plugins/fish/stt.py
- agents-core/vision_agents/core/utils/video_track.py
- agents-core/vision_agents/core/llm/events.py
- plugins/getstream/vision_agents/plugins/getstream/stream_edge_transport.py
- plugins/fish/tests/test_fish_tts.py
- agents-core/vision_agents/core/tts/tts.py
- plugins/aws/vision_agents/plugins/aws/aws_realtime.py
- tests/test_utils.py
- conftest.py
- agents-core/vision_agents/core/llm/realtime.py
- tests/test_queued_video_track.py
- agents-core/vision_agents/core/tts/events.py
- examples/other_examples/openai_realtime_webrtc/openai_realtime_example.py
- plugins/openai/tests/test_tts_openai.py
- plugins/openai/vision_agents/plugins/openai/rtc_manager.py
- plugins/aws/vision_agents/plugins/aws/aws_llm.py
- plugins/smart_turn/tests/test_smart_turn.py
- agents-core/vision_agents/core/llm/llm.py
- plugins/aws/vision_agents/plugins/aws/tts.py
- plugins/kokoro/tests/test_tts.py
- tests/test_tts_base.py
- plugins/openai/vision_agents/plugins/openai/openai_realtime.py
- plugins/gemini/vision_agents/plugins/gemini/gemini_llm.py
- agents-core/vision_agents/core/utils/video_forwarder.py
- tests/test_audio_forwarder.py
tests/**/*.py
📄 CodeRabbit inference engine (.cursor/rules/python.mdc)
tests/**/*.py: Never use mocking utilities (e.g., unittest.mock, pytest-mock) in test files
Write tests using pytest (avoid unittest.TestCase or other frameworks)
Mark integration tests with @pytest.mark.integration
Do not use @pytest.mark.asyncio; async support is automatic
Files:
- tests/test_utils.py
- tests/test_queued_video_track.py
- tests/test_tts_base.py
- tests/test_audio_forwarder.py
🧠 Learnings (3)
📚 Learning: 2025-10-20T19:23:41.259Z
Learnt from: CR
Repo: GetStream/Vision-Agents PR: 0
File: .cursor/rules/python.mdc:0-0
Timestamp: 2025-10-20T19:23:41.259Z
Learning: Applies to tests/**/*.py : Do not use pytest.mark.asyncio; async support is automatic
Applied to files:
- plugins/elevenlabs/tests/test_tts.py
- plugins/openai/tests/test_tts_openai.py
- plugins/kokoro/tests/test_tts.py
📚 Learning: 2025-10-20T19:23:41.259Z
Learnt from: CR
Repo: GetStream/Vision-Agents PR: 0
File: .cursor/rules/python.mdc:0-0
Timestamp: 2025-10-20T19:23:41.259Z
Learning: Applies to tests/**/*.py : Mark integration tests with pytest.mark.integration
Applied to files:
- pytest.ini
- plugins/openai/tests/test_tts_openai.py
📚 Learning: 2025-10-20T19:23:41.259Z
Learnt from: CR
Repo: GetStream/Vision-Agents PR: 0
File: .cursor/rules/python.mdc:0-0
Timestamp: 2025-10-20T19:23:41.259Z
Learning: Applies to **/*.py : Do not modify sys.path in Python code
Applied to files:
.gitignore
🧬 Code graph analysis (46)
plugins/fish/tests/test_fish_stt.py (1)
conftest.py (1)
STTSession(58-116)
agents-core/vision_agents/core/turn_detection/turn_detection.py (3)
agents-core/vision_agents/core/llm/llm.py (1)
warmup(63-71)agents-core/vision_agents/core/tts/tts.py (1)
warmup(73-81)agents-core/vision_agents/core/stt/stt.py (1)
warmup(39-47)
plugins/wizper/tests/test_wizper_stt.py (1)
conftest.py (2)
STTSession(58-116)skip_blockbuster(24-41)
plugins/elevenlabs/tests/test_tts.py (1)
agents-core/vision_agents/core/tts/testing.py (1)
TTSSession(25-83)
agents-core/vision_agents/core/utils/audio_forwarder.py (3)
tests/test_audio_forwarder.py (4)
callback (16-17), callback (34-35), callback (57-58), callback (82-83)
conftest.py (2)
recv (315-338), recv (361-366)
agents-core/vision_agents/core/utils/audio_track.py (1)
recv(10-11)
agents-core/vision_agents/core/tts/manual_test.py (1)
agents-core/vision_agents/core/tts/testing.py (2)
TTSSession(25-83)speeches(65-66)
agents-core/vision_agents/core/__init__.py (1)
agents-core/vision_agents/core/cli/cli_runner.py (1)
cli(115-206)
agents-core/vision_agents/core/agents/agent_launcher.py (2)
agents-core/vision_agents/core/agents/agents.py (1)
Agent(112-1336)examples/other_examples/openai_realtime_webrtc/openai_realtime_example.py (2)
create_agent(23-37)join_call(41-65)
plugins/aws/tests/test_tts.py (1)
agents-core/vision_agents/core/tts/testing.py (1)
TTSSession(25-83)
agents-core/vision_agents/core/utils/utils.py (1)
agents-core/vision_agents/core/agents/agents.py (1)
join(450-533)
agents-core/vision_agents/core/agents/__init__.py (1)
agents-core/vision_agents/core/agents/agent_launcher.py (1)
AgentLauncher(18-117)
agents-core/vision_agents/core/utils/audio_track.py (2)
agents-core/vision_agents/core/agents/agents.py (1)
recv(946-947)agents-core/vision_agents/core/utils/video_track.py (1)
recv(45-71)
plugins/wizper/vision_agents/plugins/wizper/stt.py (3)
agents-core/vision_agents/core/edge/types.py (1)
Participant(22-24)agents-core/vision_agents/core/stt/events.py (1)
TranscriptResponse(7-13)agents-core/vision_agents/core/stt/stt.py (1)
_emit_transcript_event(49-69)
agents-core/vision_agents/core/stt/stt.py (3)
agents-core/vision_agents/core/llm/llm.py (1)
warmup(63-71)agents-core/vision_agents/core/tts/tts.py (1)
warmup(73-81)agents-core/vision_agents/core/turn_detection/turn_detection.py (1)
warmup(36-44)
plugins/cartesia/tests/test_tts.py (1)
agents-core/vision_agents/core/tts/testing.py (1)
TTSSession(25-83)
plugins/gemini/vision_agents/plugins/gemini/gemini_realtime.py (2)
agents-core/vision_agents/core/llm/realtime.py (1)
_emit_audio_output_event(116-127)agents-core/vision_agents/core/edge/types.py (1)
write(45-45)
plugins/ultralytics/vision_agents/plugins/ultralytics/yolo_pose_processor.py (1)
agents-core/vision_agents/core/utils/video_track.py (1)
QueuedVideoTrack(12-74)
plugins/gemini/tests/test_gemini_realtime.py (5)
agents-core/vision_agents/core/tts/manual_test.py (1)
play_pcm_with_ffplay(17-81)agents-core/vision_agents/core/llm/events.py (1)
RealtimeAudioOutputEvent(37-42)agents-core/vision_agents/core/utils/utils.py (1)
frame_to_png_bytes(130-153)agents-core/vision_agents/core/events/base.py (1)
AudioFormat(23-30)conftest.py (1)
mia_audio_16khz(131-170)
plugins/aws/tests/test_aws_realtime.py (4)
agents-core/vision_agents/core/tts/manual_test.py (1)
play_pcm_with_ffplay(17-81)agents-core/vision_agents/core/llm/events.py (1)
RealtimeAudioOutputEvent(37-42)agents-core/vision_agents/core/events/base.py (1)
AudioFormat(23-30)conftest.py (1)
mia_audio_16khz(131-170)
agents-core/vision_agents/core/agents/agents.py (5)
agents-core/vision_agents/core/edge/types.py (1)
write(45-45)agents-core/vision_agents/core/edge/sfu_events.py (16)
participant (1496-1501), participant (1504-1507), participant (1545-1550), participant (1553-1556), participant (1625-1630), participant (1633-1636), participant (2100-2105), participant (2108-2111), participant (2156-2161), participant (2164-2167), user_id (489-493), user_id (856-860), user_id (901-905), user_id (1186-1190), user_id (2093-2097), user_id (2142-2146)
agents-core/vision_agents/core/events/base.py (1)
user_id(45-48)agents-core/vision_agents/core/stt/events.py (2)
confidence(30-31)confidence(61-62)agents-core/vision_agents/core/turn_detection/events.py (1)
TurnEndedEvent(29-45)
plugins/anthropic/tests/test_anthropic_llm.py (1)
plugins/anthropic/vision_agents/plugins/anthropic/anthropic_llm.py (1)
ClaudeLLM(25-477)
agents-core/vision_agents/core/cli/cli_runner.py (2)
agents-core/vision_agents/core/agents/agent_launcher.py (2)
AgentLauncher(18-117)launch(103-117)examples/other_examples/openai_realtime_webrtc/openai_realtime_example.py (1)
join_call(41-65)
examples/other_examples/gemini_live_realtime/gemini_live_example.py (2)
agents-core/vision_agents/core/agents/agents.py (1)
join(450-533)plugins/getstream/vision_agents/plugins/getstream/stream_edge_transport.py (2)
join(256-307)open_demo(350-428)
plugins/fish/vision_agents/plugins/fish/stt.py (4)
plugins/aws/vision_agents/plugins/aws/tts.py (1)
client(55-60)agents-core/vision_agents/core/edge/sfu_events.py (17)
participant (1496-1501), participant (1504-1507), participant (1545-1550), participant (1553-1556), participant (1625-1630), participant (1633-1636), participant (2100-2105), participant (2108-2111), participant (2156-2161), participant (2164-2167), Participant (229-270), user_id (489-493), user_id (856-860), user_id (901-905), user_id (1186-1190), user_id (2093-2097), user_id (2142-2146)
agents-core/vision_agents/core/edge/types.py (1)
Participant(22-24)agents-core/vision_agents/core/stt/stt.py (1)
_emit_transcript_event(49-69)
agents-core/vision_agents/core/utils/video_track.py (1)
agents-core/vision_agents/core/utils/queue.py (2)
LatestNQueue(6-28)put_latest_nowait(22-28)
agents-core/vision_agents/core/llm/events.py (1)
agents-core/vision_agents/core/events/base.py (1)
PluginBaseEvent(52-54)
plugins/fish/tests/test_fish_tts.py (1)
agents-core/vision_agents/core/tts/testing.py (1)
TTSSession(25-83)
agents-core/vision_agents/core/tts/tts.py (4)
agents-core/vision_agents/core/llm/llm.py (1)
warmup(63-71)agents-core/vision_agents/core/stt/stt.py (1)
warmup(39-47)agents-core/vision_agents/core/turn_detection/turn_detection.py (1)
warmup(36-44)agents-core/vision_agents/core/tts/events.py (1)
TTSAudioEvent(11-19)
plugins/aws/vision_agents/plugins/aws/aws_realtime.py (2)
agents-core/vision_agents/core/llm/realtime.py (2)
_emit_audio_input_event(105-113)_emit_audio_output_event(116-127)agents-core/vision_agents/core/edge/types.py (1)
write(45-45)
tests/test_utils.py (2)
agents-core/vision_agents/core/utils/utils.py (1)
parse_instructions(89-127)agents-core/vision_agents/core/events/base.py (1)
AudioFormat(23-30)
conftest.py (2)
agents-core/vision_agents/core/agents/agents.py (2)
recv(946-947)close(567-639)tests/base_test.py (1)
recv(78-109)
agents-core/vision_agents/core/llm/realtime.py (1)
agents-core/vision_agents/core/llm/events.py (2)
RealtimeAudioInputEvent(29-33)RealtimeAudioOutputEvent(37-42)
tests/test_queued_video_track.py (2)
agents-core/vision_agents/core/utils/video_track.py (1)
QueuedVideoTrack(12-74)tests/base_test.py (1)
BaseTest(11-111)
agents-core/vision_agents/core/tts/events.py (1)
agents-core/vision_agents/core/events/base.py (2)
PluginBaseEvent(52-54)ConnectionState(13-20)
examples/other_examples/openai_realtime_webrtc/openai_realtime_example.py (5)
agents-core/vision_agents/core/edge/types.py (1)
User(15-18)agents-core/vision_agents/core/agents/agents.py (2)
Agent(112-1336)create_user(658-670)agents-core/vision_agents/core/cli/cli_runner.py (1)
cli(115-206)agents-core/vision_agents/core/agents/agent_launcher.py (1)
AgentLauncher(18-117)plugins/getstream/vision_agents/plugins/getstream/stream_edge_transport.py (1)
create_user(252-254)
plugins/openai/tests/test_tts_openai.py (1)
agents-core/vision_agents/core/tts/testing.py (1)
TTSSession(25-83)
plugins/openai/vision_agents/plugins/openai/rtc_manager.py (6)
agents-core/vision_agents/core/utils/audio_forwarder.py (3)
AudioForwarder(12-68)start(30-36)stop(38-47)agents-core/vision_agents/core/utils/audio_track.py (1)
QueuedAudioTrack(9-11)agents-core/vision_agents/core/utils/video_track.py (3)
QueuedVideoTrack(12-74)add_frame(31-43)stop(73-74)agents-core/vision_agents/core/utils/video_forwarder.py (3)
start(32-40)start_event_consumer(109-195)stop(42-59)agents-core/vision_agents/core/edge/types.py (2)
write(45-45)stop(47-47)plugins/gemini/vision_agents/plugins/gemini/gemini_realtime.py (1)
_send_video_frame(430-442)
plugins/aws/vision_agents/plugins/aws/aws_llm.py (2)
plugins/aws/vision_agents/plugins/aws/tts.py (1)
client(55-60)agents-core/vision_agents/core/llm/llm_types.py (1)
NormalizedToolCallItem(107-111)
plugins/smart_turn/tests/test_smart_turn.py (3)
conftest.py (1)
skip_blockbuster(24-41)agents-core/vision_agents/core/utils/utils.py (1)
ensure_model(166-208)plugins/smart_turn/vision_agents/plugins/smart_turn/smart_turn_detection.py (1)
SileroVAD(397-466)
agents-core/vision_agents/core/llm/llm.py (10)
agents-core/vision_agents/core/tts/tts.py (1)
warmup(73-81)agents-core/vision_agents/core/stt/stt.py (1)
warmup(39-47)agents-core/vision_agents/core/turn_detection/turn_detection.py (1)
warmup(36-44)plugins/aws/vision_agents/plugins/aws/aws_llm.py (2)
_convert_tools_to_provider_format(666-715)_extract_tool_calls_from_response(635-664)plugins/aws/vision_agents/plugins/aws/aws_realtime.py (1)
_convert_tools_to_provider_format(437-489)plugins/gemini/vision_agents/plugins/gemini/gemini_llm.py (3)
_convert_tools_to_provider_format(297-318)_extract_tool_calls_from_response(320-365)_extract_tool_calls_from_stream_chunk(367-384)plugins/gemini/vision_agents/plugins/gemini/gemini_realtime.py (2)
_convert_tools_to_provider_format(507-530)_extract_tool_calls_from_response(532-567)agents-core/vision_agents/core/llm/llm_types.py (2)
ToolSchema(64-67)NormalizedToolCallItem(107-111)agents-core/vision_agents/core/llm/function_registry.py (1)
get_tool_schemas(108-114)agents-core/vision_agents/core/llm/events.py (2)
ToolStartEvent(116-122)ToolEndEvent(126-135)
plugins/aws/vision_agents/plugins/aws/tts.py (1)
plugins/aws/vision_agents/plugins/aws/aws_llm.py (1)
client(84-91)
plugins/kokoro/tests/test_tts.py (6)
plugins/aws/tests/test_tts.py (1)
tts(29-33)plugins/cartesia/tests/test_tts.py (1)
tts(17-21)plugins/elevenlabs/tests/test_tts.py (1)
tts(12-18)plugins/fish/tests/test_fish_tts.py (1)
tts(15-16)plugins/openai/tests/test_tts_openai.py (1)
tts(12-16)agents-core/vision_agents/core/tts/manual_test.py (1)
manual_tts_to_wav(84-135)
tests/test_tts_base.py (1)
agents-core/vision_agents/core/tts/testing.py (1)
speeches(65-66)
plugins/openai/vision_agents/plugins/openai/openai_realtime.py (2)
agents-core/vision_agents/core/llm/realtime.py (1)
_emit_audio_output_event(116-127)agents-core/vision_agents/core/edge/types.py (1)
write(45-45)
plugins/gemini/vision_agents/plugins/gemini/gemini_llm.py (4)
agents-core/vision_agents/core/llm/events.py (2)
LLMResponseCompletedEvent(106-112)LLMResponseChunkEvent(87-102)plugins/aws/vision_agents/plugins/aws/aws_llm.py (4)
client(84-91)simple_response(93-113)_convert_tools_to_provider_format(666-715)_extract_tool_calls_from_response(635-664)agents-core/vision_agents/core/llm/llm.py (7)
simple_response(73-79)LLMResponseEvent(35-39)get_available_functions(214-216)_convert_tools_to_provider_format(120-134)_dedup_and_execute(358-392)_extract_tool_calls_from_stream_chunk(152-166)_extract_tool_calls_from_response(136-150)plugins/gemini/vision_agents/plugins/gemini/gemini_realtime.py (3)
simple_response(113-128)_convert_tools_to_provider_format(507-530)_extract_tool_calls_from_response(532-567)
tests/test_audio_forwarder.py (3)
agents-core/vision_agents/core/events/base.py (1)
AudioFormat(23-30)agents-core/vision_agents/core/utils/audio_forwarder.py (3)
AudioForwarder(12-68)start(30-36)stop(38-47)conftest.py (1)
audio_track_48khz(348-372)
🪛 LanguageTool
DEVELOPMENT.md
[grammar] ~315-~315: Use a hyphen to join words.
Context: ...t what pace? ### Tasks & Async - Short running tasks should check if the connec...
(QB_NEW_EN_HYPHEN)
[style] ~321-~321: Consider using the more formal “until”.
Context: ...rn a frame. Never return None. and wait till the next frame is available - When usin...
(TILL)
[style] ~322-~322: In American English, abbreviations like “etc.” require a period.
Context: ...t rgb24 when connecting/sending to Yolo etc - QueuedVideoTrack is a writable/queued...
(ETC_PERIOD)
🪛 markdownlint-cli2 (0.18.1)
docs/ai/instructions/ai-utils.md
27-27: Fenced code blocks should have a language specified
(MD040, fenced-code-language)
| # Enable BlockBuster in debug mode | ||
| if debug: | ||
| loop = asyncio.get_running_loop() | ||
| loop.slow_callback_duration = 0.005 # warn if blocking >5ms | ||
| from blockbuster import BlockBuster | ||
| blockbuster = BlockBuster() | ||
| blockbuster.activate() | ||
| logging.info("BlockBuster activated") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Critical: get_running_loop() called before asyncio.run().
Line 42 calls asyncio.get_running_loop() but no event loop is running yet at this point in the code. This will raise a RuntimeError: no running event loop. The slow_callback_duration should be set on the loop created by asyncio.run(), or use asyncio.new_event_loop().
# Enable BlockBuster in debug mode
if debug:
- loop = asyncio.get_running_loop()
- loop.slow_callback_duration = 0.005 # warn if blocking >5ms
from blockbuster import BlockBuster
blockbuster = BlockBuster()
blockbuster.activate()
logging.info("BlockBuster activated")
+
+ # Note: slow_callback_duration should be set on the event loop
+ # created by asyncio.run, but we can't access it here.
+ # Users should set PYTHONASYNCIODEBUG=1 environment variable instead.

Alternatively, create and manage the loop explicitly:
# Enable BlockBuster in debug mode
if debug:
- loop = asyncio.get_running_loop()
- loop.slow_callback_duration = 0.005 # warn if blocking >5ms
from blockbuster import BlockBuster
blockbuster = BlockBuster()
blockbuster.activate()
logging.info("BlockBuster activated")
# Run warmup if agent launcher provided
async def _run_with_launcher():
if agent_launcher:
logging.info("Warming up agent via launcher...")
await agent_launcher.launch()
await async_main()
- # Run the async main function
- asyncio.run(_run_with_launcher(), debug=debug)
+ # Run the async main function with explicit loop for debug config
+ if debug:
+ loop = asyncio.new_event_loop()
+ asyncio.set_event_loop(loop)
+ loop.set_debug(True)
+ loop.slow_callback_duration = 0.005
+ try:
+ loop.run_until_complete(_run_with_launcher())
+ finally:
+ loop.close()
+ else:
+ asyncio.run(_run_with_launcher())

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| # Enable BlockBuster in debug mode | |
| if debug: | |
| loop = asyncio.get_running_loop() | |
| loop.slow_callback_duration = 0.005 # warn if blocking >5ms | |
| from blockbuster import BlockBuster | |
| blockbuster = BlockBuster() | |
| blockbuster.activate() | |
| logging.info("BlockBuster activated") | |
| # Enable BlockBuster in debug mode | |
| if debug: | |
| from blockbuster import BlockBuster | |
| blockbuster = BlockBuster() | |
| blockbuster.activate() | |
| logging.info("BlockBuster activated") | |
| # Note: slow_callback_duration should be set on the event loop | |
| # created by asyncio.run, but we can't access it here. | |
| # Users should set PYTHONASYNCIODEBUG=1 environment variable instead. |
🤖 Prompt for AI Agents
In agents-core/vision_agents/core/cli/cli_runner.py around lines 40-48, calling
asyncio.get_running_loop() runs before asyncio.run() and will raise
RuntimeError; replace this by creating and configuring an explicit event loop
(loop = asyncio.new_event_loop()), set loop.slow_callback_duration = 0.005,
install it with asyncio.set_event_loop(loop) and then run your main coroutine
with loop.run_until_complete(...) (or keep the new loop and use it
consistently), or alternatively defer setting slow_callback_duration until
inside the running coroutine where asyncio.get_running_loop() is valid; ensure
BlockBuster is activated after the loop is created and running.
| try: | ||
| # Temporarily suppress asyncio error logging during cleanup | ||
| asyncio_logger = logging.getLogger("asyncio") | ||
| original_level = asyncio_logger.level | ||
| asyncio.run(_run(), debug=debug) | ||
| except KeyboardInterrupt: | ||
| # Suppress KeyboardInterrupt and asyncio errors during cleanup | ||
| asyncio_logger.setLevel(logging.CRITICAL) | ||
| logger = logging.getLogger(__name__) | ||
| logger.info("👋 Agent shutdown complete") | ||
| finally: | ||
| # Restore original logging level | ||
| if 'asyncio_logger' in locals() and 'original_level' in locals(): | ||
| asyncio_logger.setLevel(original_level) |
Potential NameError in finally block.
Line 202 references original_level which is only set on line 192 inside the try block. If an exception occurs before line 192, the finally block will raise a NameError. The locals() check helps but is fragile.
try:
+ # Save original log level before any processing
+ asyncio_logger = logging.getLogger("asyncio")
+ original_level = asyncio_logger.level
+
# Temporarily suppress asyncio error logging during cleanup
- asyncio_logger = logging.getLogger("asyncio")
- original_level = asyncio_logger.level
asyncio.run(_run(), debug=debug)
except KeyboardInterrupt:
# Suppress KeyboardInterrupt and asyncio errors during cleanup
asyncio_logger.setLevel(logging.CRITICAL)
logger = logging.getLogger(__name__)
logger.info("👋 Agent shutdown complete")
finally:
# Restore original logging level
- if 'asyncio_logger' in locals() and 'original_level' in locals():
+ asyncio_logger.setLevel(original_level)

📝 Committable suggestion
+ asyncio_logger.setLevel(original_level)📝 Committable suggestion
| try: | |
| # Temporarily suppress asyncio error logging during cleanup | |
| asyncio_logger = logging.getLogger("asyncio") | |
| original_level = asyncio_logger.level | |
| asyncio.run(_run(), debug=debug) | |
| except KeyboardInterrupt: | |
| # Suppress KeyboardInterrupt and asyncio errors during cleanup | |
| asyncio_logger.setLevel(logging.CRITICAL) | |
| logger = logging.getLogger(__name__) | |
| logger.info("👋 Agent shutdown complete") | |
| finally: | |
| # Restore original logging level | |
| if 'asyncio_logger' in locals() and 'original_level' in locals(): | |
| asyncio_logger.setLevel(original_level) | |
| try: | |
| # Save original log level before any processing | |
| asyncio_logger = logging.getLogger("asyncio") | |
| original_level = asyncio_logger.level | |
| asyncio.run(_run(), debug=debug) | |
| except KeyboardInterrupt: | |
| # Suppress KeyboardInterrupt and asyncio errors during cleanup | |
| asyncio_logger.setLevel(logging.CRITICAL) | |
| logger = logging.getLogger(__name__) | |
| logger.info("👋 Agent shutdown complete") | |
| finally: | |
| # Restore original logging level | |
| asyncio_logger.setLevel(original_level) |
🤖 Prompt for AI Agents
In agents-core/vision_agents/core/cli/cli_runner.py around lines 189 to 203, the
finally block references original_level which may be undefined if an exception
occurs before it is set; initialize asyncio_logger and original_level to None
(or set a sentinel) before the try, then inside finally check for is not None
(or the sentinel) before restoring the level so no NameError can occur.
| while (asyncio.get_event_loop().time() - start_time) < timeout: | ||
| if not self._queue and not self._handler_tasks: | ||
| break | ||
| await asyncio.sleep(0.01) |
🧩 Analysis chain
Verify the behavioral change in wait completion semantics.
The updated logic exits when the queue is empty and no handler tasks are pending, removing the previous await for handler task completion. This changes when wait() returns relative to handler completion.
Potential concern: If new handler tasks are created after the check but handlers haven't fully completed, wait() may return prematurely.
Please confirm whether tests expect all handlers to complete before wait() returns, or if time-based progression is the intended behavior.
🏁 Script executed:
#!/bin/bash
# Check how wait() is used in tests to understand if early return is acceptable
rg -n -A5 -B5 'await.*\.wait\(' --type=py -g '!**/node_modules/**' -g 'test_*.py' -g '*_test.py'

Length of output: 21822
I need to examine how handler tasks are tracked in the manager.py implementation to verify whether the concern about premature returns is valid:
🏁 Script executed:
#!/bin/bash
# Find and examine the wait() implementation and _handler_tasks management
rg -n "_handler_tasks" agents-core/vision_agents/core/events/manager.py -A 2 -B 2

Length of output: 1040
🏁 Script executed:
#!/bin/bash
# Look for the wait() method implementation and surrounding context
rg -n "async def wait" agents-core/vision_agents/core/events/manager.py -A 15

Length of output: 639
🏁 Script executed:
#!/bin/bash
# Search for where handler tasks are created/added
rg -n "asyncio.create_task\|_handler_tasks" agents-core/vision_agents/core/events/manager.py -B 1 -A 1

Length of output: 0
Confirm the concern: wait() behavioral change weakens completion guarantees.
The test patterns consistently show send event → await wait() → assert handler effects, which indicates all tests depend on handlers completing before wait() returns. The updated implementation replaced explicit task completion (likely asyncio.gather()) with a polling loop that checks if queue and handler tasks dict are empty. This polling approach is less reliable: if the check passes when both are empty, wait() immediately returns, but new handler tasks could be created milliseconds later during event processing. While asyncio's single-threaded nature mitigates immediate race conditions, the change removes the deterministic guarantee that handlers have fully completed.
The tests expect wait() to act as a synchronization point ensuring handler side effects are visible after return. The polling-based approach doesn't provide this guarantee as strongly as explicitly awaiting task completion.
🤖 Prompt for AI Agents
In agents-core/vision_agents/core/events/manager.py around lines 480 to 483, the
new polling-based wait() can return while handler tasks may still be created or
finishing, weakening the deterministic synchronization tests rely on; restore
deterministic completion by capturing the current set of handler tasks and
explicitly awaiting their completion (e.g., use asyncio.wait or asyncio.gather
on a snapshot of tasks) while still respecting the overall timeout loop; repeat
capturing and awaiting newly created tasks until no tasks remain or the timeout
elapses so wait() only returns after all handler tasks have finished.
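A minimal sketch of the snapshot-and-await pattern, using a hypothetical Manager class rather than the project's event manager:

```python
import asyncio

class Manager:
    def __init__(self) -> None:
        self._handler_tasks: set = set()

    def emit(self, coro) -> None:
        task = asyncio.create_task(coro)
        self._handler_tasks.add(task)
        task.add_done_callback(self._handler_tasks.discard)

    async def wait(self, timeout: float = 5.0) -> None:
        loop = asyncio.get_running_loop()
        deadline = loop.time() + timeout
        while self._handler_tasks:
            remaining = deadline - loop.time()
            if remaining <= 0:
                break
            # Snapshot the current tasks and await them explicitly;
            # tasks scheduled meanwhile are caught on the next iteration.
            snapshot = list(self._handler_tasks)
            # asyncio.wait does not cancel tasks on timeout
            done, pending = await asyncio.wait(snapshot, timeout=remaining)
            self._handler_tasks.difference_update(done)
            if pending:
                break  # timed out with handlers still running
```

Unlike polling with `asyncio.sleep(0.01)`, this returns only once every captured handler task has actually finished (or the timeout elapses).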
| class QueuedAudioTrack(AudioStreamTrack): | ||
| async def recv(self) -> Frame: | ||
| return await super().recv() |
QueuedAudioTrack doesn't implement queueing.
The class name implies queuing behavior, but recv() simply delegates to the parent without any queue management. Compare this to QueuedVideoTrack (agents-core/vision_agents/core/utils/video_track.py), which maintains a frame_queue and implements actual queuing logic.
If queuing isn't needed yet, consider:
- Renaming to something like BasicAudioTrack or PassthroughAudioTrack
- Or implementing the queue similar to QueuedVideoTrack
Do you want me to generate a queued implementation consistent with QueuedVideoTrack?
🤖 Prompt for AI Agents
In agents-core/vision_agents/core/utils/audio_track.py around lines 9-11,
QueuedAudioTrack currently just delegates recv() to the parent but the class
name implies it should manage a queue like QueuedVideoTrack; fix by implementing
a real queued audio track: add an __init__ that creates an asyncio.Queue for
frames and a closed flag, provide an async put(frame) to enqueue frames (and a
put(None) sentinel to mark end), change recv() to await queue.get(), raise
appropriate exceptions or return sentinel behavior when closed, and implement
stop/close cleanup mirroring the logic and imports used in QueuedVideoTrack so
callers can push audio frames into the queue and recv() yields them
asynchronously.
| try: | ||
| while not self._stopped.is_set(): | ||
| frame : Frame = await self.input_track.recv() | ||
| frame.dts = int(datetime.datetime.now().timestamp()) |
Overwriting frame.dts breaks video timing semantics.
Setting dts to wall-clock time discards the frame's original decoding timestamp, which downstream consumers may rely on for A/V synchronization or frame ordering. If you need to track when frames arrive, consider adding a separate metadata field rather than overwriting dts.
Apply this diff to preserve the original dts:
- frame.dts = int(datetime.datetime.now().timestamp())
-

Or if wall-clock tracking is required, store it separately:
- frame.dts = int(datetime.datetime.now().timestamp())
+ # Store arrival time as metadata without breaking dts
+ frame.time = datetime.datetime.now().timestamp()

📝 Committable suggestion
| frame.dts = int(datetime.datetime.now().timestamp()) |
🤖 Prompt for AI Agents
In agents-core/vision_agents/core/utils/video_forwarder.py around line 76, do
not overwrite frame.dts with wall-clock time; preserve the original decoding
timestamp for A/V sync and ordering. Remove the assignment that sets frame.dts =
int(datetime.datetime.now().timestamp()) and instead attach the arrival
timestamp to a separate field (for example frame.arrival_ts or
frame.metadata['arrival_ts']) so arrival time is tracked without mutating the
frame's timing semantics; ensure any downstream code reads the new field for
arrival time and continues to use frame.dts for decoding/sync.
| # Resample to 24kHz if needed, as required by AWS Nova | ||
| if pcm.sample_rate != 24000: | ||
| pcm = pcm.resample(24000) | ||
| pcm = pcm.resample(24000) |
Clarify the comment to match the unconditional resampling.
The comment states "if needed" but the code always resamples. While resample() may internally optimize for matching rates, the comment should reflect the actual behavior.
Apply this diff:
- # Resample to 24kHz if needed, as required by AWS Nova
- pcm = pcm.resample(24000)
+ # Resample to 24kHz as required by AWS Nova
+ pcm = pcm.resample(24000)

🤖 Prompt for AI Agents
In plugins/aws/vision_agents/plugins/aws/aws_realtime.py around lines 243 to
244, the comment "Resample to 24kHz if needed, as required by AWS Nova" is
misleading because the code unconditionally calls pcm = pcm.resample(24000);
update the comment to reflect that the code always resamples (e.g., "Resample to
24kHz (unconditionally) as required by AWS Nova") or adjust wording to note that
resample() may be a no-op when rates already match, so the comment matches the
actual behavior.
| self._audio_to_openai_track: QueuedAudioTrack = QueuedAudioTrack( | ||
| sample_rate=48000 | ||
| ) |
🧩 Analysis chain
Verify sample rate alignment with OpenAI requirements.
The QueuedAudioTrack is configured for 48kHz, while the AudioForwarder (line 86-87 in audio_forwarder.py) resamples received audio to 16kHz mono. Ensure this matches OpenAI's expected input format.
Check the OpenAI Realtime API documentation to confirm the expected audio format:
🌐 Web query:
What audio format does OpenAI Realtime API expect for input audio?
💡 Result:
OpenAI Realtime API accepts input_audio_format values: pcm16, g711_ulaw, or g711_alaw. For pcm16 the audio must be 16‑bit PCM, 24 kHz sample rate, mono (single channel), little‑endian. [1][2]
Audio sample rate must be corrected to 24 kHz for OpenAI Realtime API compatibility.
The OpenAI Realtime API expects audio in PCM16 format at 24 kHz sample rate, mono, little-endian. However, the current implementation initializes QueuedAudioTrack at 48 kHz (lines 50-52) and then AudioForwarder resamples to 16 kHz (audio_forwarder.py, lines 86-87). Neither matches the required 24 kHz specification, which will cause audio streaming failures.
Update QueuedAudioTrack to use 24000 as the sample rate, and adjust AudioForwarder's resampling to match.
🤖 Prompt for AI Agents
In plugins/openai/vision_agents/plugins/openai/rtc_manager.py around lines 50 to
52, the QueuedAudioTrack is initialized with a 48000 Hz sample rate but OpenAI
Realtime requires PCM16 mono little-endian at 24000 Hz; change the
QueuedAudioTrack initialization to use sample_rate=24000. Also update the
corresponding resampling target in AudioForwarder (audio_forwarder.py) from
16000 to 24000 so both producer and forwarder use 24 kHz, and ensure the audio
pipeline outputs mono PCM16 little-endian to match the API spec.
| async def _send_video_frame(self, frame: av.VideoFrame) -> None: | ||
| """ | ||
| Send a video frame to Gemini using send_realtime_input | ||
| """ | ||
| logger.debug(f"Sending video frame: {frame}") | ||
| if self._video_to_openai_track: | ||
| await self._video_to_openai_track.add_frame(frame) |
Fix copy-paste error in docstring.
Line 223 says "Send a video frame to Gemini" but this is the OpenAI RTC manager.
async def _send_video_frame(self, frame: av.VideoFrame) -> None:
"""
- Send a video frame to Gemini using send_realtime_input
+ Send a video frame to OpenAI via the queued video track.
"""

📝 Committable suggestion
| async def _send_video_frame(self, frame: av.VideoFrame) -> None: | |
| """ | |
| Send a video frame to Gemini using send_realtime_input | |
| """ | |
| logger.debug(f"Sending video frame: {frame}") | |
| if self._video_to_openai_track: | |
| await self._video_to_openai_track.add_frame(frame) | |
| async def _send_video_frame(self, frame: av.VideoFrame) -> None: | |
| """ | |
| Send a video frame to OpenAI via the queued video track. | |
| """ | |
| logger.debug(f"Sending video frame: {frame}") | |
| if self._video_to_openai_track: | |
| await self._video_to_openai_track.add_frame(frame) |
🤖 Prompt for AI Agents
In plugins/openai/vision_agents/plugins/openai/rtc_manager.py around lines 221
to 227, the method docstring incorrectly references "Gemini"; update the
docstring to accurately describe that the method sends a video frame via the
OpenAI RTC manager (e.g., "Send a video frame to OpenAI RTC using
send_realtime_input" or similar). Keep the rest of the method unchanged.
| # Load model file asynchronously to avoid blocking I/O | ||
| model_bytes = await asyncio.to_thread(open(path, "rb").read) | ||
| vad = SileroVAD(path, model_bytes) |
Fix async file read syntax.
Line 37 passes the bound read method of the file object to asyncio.to_thread, so the read does execute in the worker thread; however, open() itself still runs synchronously on the event loop, and the file handle is never explicitly closed.
Apply this diff to fix the file reading:
- # Load model file asynchronously to avoid blocking I/O
- model_bytes = await asyncio.to_thread(open(path, "rb").read)
- vad = SileroVAD(path, model_bytes)
+ # Load model file asynchronously to avoid blocking I/O
+ def read_model():
+ with open(path, "rb") as f:
+ return f.read()
+ model_bytes = await asyncio.to_thread(read_model)
+ vad = SileroVAD(path, model_bytes)

🤖 Prompt for AI Agents
In plugins/smart_turn/tests/test_smart_turn.py around lines 36 to 38, the code
passes the unbound method open(path, "rb").read to asyncio.to_thread which
neither calls read() nor closes the file; fix by moving the file read into a
synchronous helper executed inside asyncio.to_thread that uses a with block to
open and read the file (e.g., define an inline function or lambda that does
"with open(path, 'rb') as f: return f.read()" and await asyncio.to_thread(...)
to get model_bytes), then pass model_bytes to SileroVAD.
TODOs
Summary by CodeRabbit
New Features
Documentation
Tests
Chores