diff --git a/agents-core/vision_agents/core/agents/agent_launcher.py b/agents-core/vision_agents/core/agents/agent_launcher.py
index d2c7e424..0ada7705 100644
--- a/agents-core/vision_agents/core/agents/agent_launcher.py
+++ b/agents-core/vision_agents/core/agents/agent_launcher.py
@@ -92,6 +92,14 @@ async def warmup(self, **kwargs) -> None:
         if agent.turn_detection and hasattr(agent.turn_detection, 'warmup'):
             logger.debug("Warming up turn detection: %s", agent.turn_detection.__class__.__name__)
             warmup_tasks.append(agent.turn_detection.warmup())
+
+        # Warmup processors (agent.processors is a list, so check each entry for a warmup method)
+        if agent.processors:
+            logger.debug("Warming up processors")
+            for processor in agent.processors:
+                if hasattr(processor, 'warmup'):
+                    logger.debug("Warming up processor: %s", processor.__class__.__name__)
+                    warmup_tasks.append(processor.warmup())
 
         # Run all warmups in parallel
         if warmup_tasks:
diff --git a/agents-core/vision_agents/core/agents/agents.py b/agents-core/vision_agents/core/agents/agents.py
index 73726cc1..545840bd 100644
--- a/agents-core/vision_agents/core/agents/agents.py
+++ b/agents-core/vision_agents/core/agents/agents.py
@@ -5,12 +5,11 @@
 import time
 import uuid
 from dataclasses import asdict
-from typing import TYPE_CHECKING, Any, Dict, List, Optional, TypeGuard, Coroutine
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, TypeGuard
 from uuid import uuid4
 
 import getstream.models
 from aiortc import VideoStreamTrack
-from getstream.video.async_call import Call
 from getstream.video.rtc import Call
 from getstream.video.rtc.pb.stream.video.sfu.models.models_pb2 import TrackType
 
@@ -697,7 +696,7 @@ async def create_user(self) -> None:
     async def create_call(self, call_type: str, call_id: str) -> Call:
         """Shortcut for creating a call/room etc."""
         call = self.edge.client.video.call(call_type, call_id)
-        response = await call.get_or_create(data={"created_by_id": self.agent_user.id})
+        await call.get_or_create(data={"created_by_id": self.agent_user.id})
         return call
 
 
diff --git a/plugins/moondream/README.md b/plugins/moondream/README.md
index 5415bd10..a4c11f7f 100644
--- a/plugins/moondream/README.md
+++ b/plugins/moondream/README.md
@@ -1,27 +1,48 @@
 # Moondream Plugin
 
-This plugin provides Moondream 3 detection capabilities for vision-agents, enabling real-time zero-shot object detection on video streams. Choose between cloud-hosted or local processing depending on your needs.
+This plugin provides Moondream 3 vision capabilities for vision-agents, including:
+- **Object Detection**: Real-time zero-shot object detection on video streams
+- **Visual Question Answering (VQA)**: Answer questions about video frames
+- **Image Captioning**: Generate descriptions of video frames
+
+Choose between cloud-hosted and local processing depending on your needs. When running locally, we recommend doing so on CUDA-enabled devices.
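+
+If you are unsure whether your machine is a good fit for the local components, a quick check like the sketch below (assuming `torch` is installed, which the local components require anyway) can help you decide; otherwise prefer the cloud-hosted options.
+
+```python
+# Minimal sketch: check for CUDA before choosing local vs. cloud components.
+import torch
+
+if torch.cuda.is_available():
+    print("CUDA available - LocalDetectionProcessor / LocalVLM are a good fit")
+else:
+    print("No CUDA - consider CloudDetectionProcessor / CloudVLM instead")
+```
+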
## Installation ```bash -uv add vision-agents-plugins-moondream +uv add vision-agents[moondream] ``` -## Choosing the Right Processor +## Choosing the Right Component + +### Detection Processors -### CloudDetectionProcessor (Recommended for Most Users) +#### CloudDetectionProcessor (Recommended for Most Users) - **Use when:** You want a simple setup with no infrastructure management - **Pros:** No model download, no GPU required, automatic updates - **Cons:** Requires API key, 2 RPS rate limit by default (can be increased) - **Best for:** Development, testing, low-to-medium volume applications -### LocalDetectionProcessor (For Advanced Users) +#### LocalDetectionProcessor (For Advanced Users) - **Use when:** You need higher throughput, have your own GPU infrastructure, or want to avoid rate limits - **Pros:** No rate limits, no API costs, full control over hardware - **Cons:** Requires GPU for best performance, model download on first use, infrastructure management - **Best for:** Production deployments, high-volume applications, Digital Ocean Gradient AI GPUs, or custom infrastructure +### Vision Language Models (VLM) + +#### CloudVLM (Recommended for Most Users) +- **Use when:** You want visual question answering or captioning without managing infrastructure +- **Pros:** No model download, no GPU required, automatic updates +- **Cons:** Requires API key, rate limits apply +- **Best for:** Development, testing, applications requiring VQA or captioning + +#### LocalVLM (For Advanced Users) +- **Use when:** You need VQA or captioning with higher throughput or want to avoid rate limits +- **Pros:** No rate limits, no API costs, full control over hardware +- **Cons:** Requires GPU for best performance, model download on first use, infrastructure management +- **Best for:** Production deployments, high-volume applications, or custom infrastructure + ## Quick Start ### Using CloudDetectionProcessor (Hosted) @@ -64,7 +85,7 @@ from vision_agents.core import Agent processor = moondream.LocalDetectionProcessor( detect_objects=["person", "car", "dog"], conf_threshold=0.3, - device="cuda", # Auto-detects CUDA, MPS, or CPU + force_cpu=False, # Auto-detects CUDA, MPS, or CPU fps=30 ) @@ -87,6 +108,107 @@ processor = moondream.CloudDetectionProcessor( ) ``` +## Vision Language Model (VLM) Quick Start + +### Using CloudVLM (Hosted) + +The `CloudVLM` uses Moondream's hosted API for visual question answering and captioning. It automatically processes video frames and responds to questions asked via STT (Speech-to-Text). 
+ +```python +import asyncio +import os +from dotenv import load_dotenv +from vision_agents.core import User, Agent, cli +from vision_agents.core.agents import AgentLauncher +from vision_agents.plugins import deepgram, getstream, elevenlabs, moondream +from vision_agents.core.events import CallSessionParticipantJoinedEvent + +load_dotenv() + +async def create_agent(**kwargs) -> Agent: + # Create a cloud VLM for visual question answering + llm = moondream.CloudVLM( + api_key=os.getenv("MOONDREAM_API_KEY"), # or set MOONDREAM_API_KEY env var + mode="vqa", # or "caption" for image captioning + ) + + agent = Agent( + edge=getstream.Edge(), + agent_user=User(name="My happy AI friend", id="agent"), + llm=llm, + tts=elevenlabs.TTS(), + stt=deepgram.STT(), + ) + return agent + +async def join_call(agent: Agent, call_type: str, call_id: str, **kwargs) -> None: + await agent.create_user() + call = await agent.create_call(call_type, call_id) + + @agent.events.subscribe + async def on_participant_joined(event: CallSessionParticipantJoinedEvent): + if event.participant.user.id != "agent": + await asyncio.sleep(2) + # Ask the agent to describe what it sees + await agent.simple_response("Describe what you currently see") + + with await agent.join(call): + await agent.edge.open_demo(call) + await agent.finish() + +if __name__ == "__main__": + cli(AgentLauncher(create_agent=create_agent, join_call=join_call)) +``` + +### Using LocalVLM (On-Device) + +The `LocalVLM` downloads the model from HuggingFace and runs on device. It supports both VQA and captioning modes. + +**Note:** The moondream3-preview model is gated and requires HuggingFace authentication: +- Request access at https://huggingface.co/moondream/moondream3-preview +- Set `HF_TOKEN` environment variable: `export HF_TOKEN=your_token_here` +- Or run: `huggingface-cli login` + +```python +from vision_agents.plugins import moondream +from vision_agents.core import Agent + +# Create a local VLM (no API key needed) +llm = moondream.LocalVLM( + mode="vqa", # or "caption" for image captioning + force_cpu=False, # Auto-detects CUDA, MPS, or CPU +) + +# Use in an agent +agent = Agent( + llm=llm, + tts=your_tts, + stt=your_stt, + # ... other components +) +``` + +### VLM Modes + +The VLM supports two modes: + +- **`"vqa"`** (Visual Question Answering): Answers questions about video frames. Questions come from STT transcripts. +- **`"caption"`** (Image Captioning): Generates descriptions of video frames automatically. + +```python +# VQA mode - answers questions about frames +llm = moondream.CloudVLM( + api_key="your-api-key", + mode="vqa" +) + +# Caption mode - generates automatic descriptions +llm = moondream.CloudVLM( + api_key="your-api-key", + mode="caption" +) +``` + ## Configuration ### CloudDetectionProcessor Parameters @@ -107,12 +229,30 @@ processor = moondream.CloudDetectionProcessor( - `fps`: int - Frame processing rate (default: 30) - `interval`: int - Processing interval in seconds (default: 0) - `max_workers`: int - Thread pool size for CPU-intensive operations (default: 10) -- `device`: str - Device to run inference on ('cuda', 'mps', or 'cpu'). Auto-detects CUDA, then MPS (Apple Silicon), then defaults to CPU. Default: `None` (auto-detect) +- `force_cpu`: bool - If True, force CPU usage even if CUDA/MPS is available. Auto-detects CUDA, then MPS (Apple Silicon), then defaults to CPU. We recommend running on CUDA for best performance. 
(default: False) - `model_name`: str - Hugging Face model identifier (default: "moondream/moondream3-preview") - `options`: AgentOptions - Model directory configuration. If not provided, uses default which defaults to tempfile.gettempdir() **Performance:** Performance will vary depending on your hardware configuration. CUDA is recommended for best performance on NVIDIA GPUs. The model will be downloaded from HuggingFace on first use. +### CloudVLM Parameters + +- `api_key`: str - API key for Moondream Cloud API. If not provided, will attempt to read from `MOONDREAM_API_KEY` environment variable. +- `mode`: Literal["vqa", "caption"] - "vqa" for visual question answering or "caption" for image captioning (default: "vqa") +- `max_workers`: int - Thread pool size for CPU-intensive operations (default: 10) + +**Rate Limits:** By default, the Moondream Cloud API has rate limits. Contact the Moondream team to request higher limits. + +### LocalVLM Parameters + +- `mode`: Literal["vqa", "caption"] - "vqa" for visual question answering or "caption" for image captioning (default: "vqa") +- `max_workers`: int - Thread pool size for async operations (default: 10) +- `force_cpu`: bool - If True, force CPU usage even if CUDA/MPS is available. Auto-detects CUDA, then MPS (Apple Silicon), then defaults to CPU. Note: MPS is automatically converted to CPU due to model compatibility. We recommend running on CUDA for best performance. (default: False) +- `model_name`: str - Hugging Face model identifier (default: "moondream/moondream3-preview") +- `options`: AgentOptions - Model directory configuration. If not provided, uses default_agent_options() + +**Performance:** Performance will vary depending on your hardware configuration. CUDA is recommended for best performance on NVIDIA GPUs. The model will be downloaded from HuggingFace on first use. + ## Video Publishing The processor publishes annotated video frames with bounding boxes drawn on detected objects: @@ -146,16 +286,18 @@ pytest plugins/moondream/tests/ -k "annotation" -v ### Required - `vision-agents` - Core framework -- `moondream` - Moondream SDK for cloud API (CloudDetectionProcessor only) +- `moondream` - Moondream SDK for cloud API (CloudDetectionProcessor and CloudVLM) - `numpy>=2.0.0` - Array operations - `pillow>=10.0.0` - Image processing - `opencv-python>=4.8.0` - Video annotation - `aiortc` - WebRTC support -### LocalDetectionProcessor Additional Dependencies +### Local Components Additional Dependencies - `torch` - PyTorch for model inference - `transformers` - HuggingFace transformers library for model loading +**Note:** LocalDetectionProcessor and LocalVLM both require these dependencies. We recommend only running the model locally on CUDA devices. + ## Links - [Moondream Documentation](https://docs.moondream.ai/) diff --git a/plugins/moondream/example/README.md b/plugins/moondream/example/README.md new file mode 100644 index 00000000..ecefc4ca --- /dev/null +++ b/plugins/moondream/example/README.md @@ -0,0 +1,2 @@ +## Moondream example +Please see root readme for details. 
\ No newline at end of file diff --git a/plugins/moondream/example/__init__.py b/plugins/moondream/example/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/plugins/moondream/example/moondream_vlm_example.py b/plugins/moondream/example/moondream_vlm_example.py new file mode 100644 index 00000000..a67935d5 --- /dev/null +++ b/plugins/moondream/example/moondream_vlm_example.py @@ -0,0 +1,53 @@ +import asyncio +import logging +from dotenv import load_dotenv + +from vision_agents.core import User, Agent, cli +from vision_agents.core.agents import AgentLauncher +from vision_agents.plugins import deepgram, getstream, elevenlabs, moondream +from vision_agents.core.events import CallSessionParticipantJoinedEvent +import os + +logger = logging.getLogger(__name__) + +load_dotenv() + +async def create_agent(**kwargs) -> Agent: + llm = moondream.CloudVLM( + api_key=os.getenv("MOONDREAM_API_KEY"), + ) + # create an agent to run with Stream's edge, openAI llm + agent = Agent( + edge=getstream.Edge(), # low latency edge. clients for React, iOS, Android, RN, Flutter etc. + agent_user=User( + name="My happy AI friend", id="agent" + ), + llm=llm, + tts=elevenlabs.TTS(), + stt=deepgram.STT(), + ) + return agent + + +async def join_call(agent: Agent, call_type: str, call_id: str, **kwargs) -> None: + # ensure the agent user is created + await agent.create_user() + # Create a call + call = await agent.create_call(call_type, call_id) + + @agent.events.subscribe + async def on_participant_joined(event: CallSessionParticipantJoinedEvent): + if event.participant.user.id != "agent": + await asyncio.sleep(2) + await agent.simple_response("Describe what you currently see") + + # Have the agent join the call/room + with await agent.join(call): + # Open the demo UI + await agent.edge.open_demo(call) + # run till the call ends + await agent.finish() + + +if __name__ == "__main__": + cli(AgentLauncher(create_agent=create_agent, join_call=join_call)) diff --git a/plugins/moondream/example/pyproject.toml b/plugins/moondream/example/pyproject.toml new file mode 100644 index 00000000..9771bb5a --- /dev/null +++ b/plugins/moondream/example/pyproject.toml @@ -0,0 +1,22 @@ +[project] +name = "moondream-example" +version = "0.1.0" +description = "Example using Moondream Detect and VLM with Vision Agents" +requires-python = ">=3.10" +dependencies = [ + "vision-agents", + "vision-agents-plugins-moondream", + "vision-agents-plugins-getstream", + "vision-agents-plugins-deepgram", + "vision-agents-plugins-elevenlabs", + "vision-agents-plugins-vogent", + "python-dotenv", +] + +[tool.uv.sources] +vision-agents = { workspace = true } +vision-agents-plugins-moondream = { workspace = true } +vision-agents-plugins-getstream = { workspace = true } +vision-agents-plugins-deepgram = { workspace = true } +vision-agents-plugins-elevenlabs = { workspace = true } +vision-agents-plugins-vogent = { workspace = true } \ No newline at end of file diff --git a/plugins/moondream/tests/test_moondream_local.py b/plugins/moondream/tests/test_moondream_local.py index cbe42198..b0cc1360 100644 --- a/plugins/moondream/tests/test_moondream_local.py +++ b/plugins/moondream/tests/test_moondream_local.py @@ -41,7 +41,7 @@ def golf_image(self, assets_dir) -> Iterator[Image.Image]: @pytest.fixture def moondream_processor(self) -> Iterator[LocalDetectionProcessor]: """Create and manage MoondreamLocalProcessor lifecycle.""" - processor = LocalDetectionProcessor(device="cpu") + processor = LocalDetectionProcessor(force_cpu=True) try: yield 
processor finally: @@ -261,7 +261,7 @@ def is_available(): processor.close() # Also test explicit MPS parameter - processor2 = LocalDetectionProcessor(device="mps") + processor2 = LocalDetectionProcessor(force_cpu=True) try: # Verify explicit MPS is also converted to CPU assert processor2.device == "cpu" @@ -270,7 +270,7 @@ def is_available(): def test_device_explicit_cpu(self): """Test explicit CPU device selection.""" - processor = LocalDetectionProcessor(device="cpu") + processor = LocalDetectionProcessor(force_cpu=True) try: assert processor.device == "cpu" finally: @@ -282,7 +282,7 @@ def test_device_explicit_cpu(self): ) def test_device_explicit_cuda(self): """Test explicit CUDA device selection (only if CUDA available).""" - processor = LocalDetectionProcessor(device="cuda") + processor = LocalDetectionProcessor() try: assert processor.device == "cuda" finally: diff --git a/plugins/moondream/tests/test_moondream_local_vlm.py b/plugins/moondream/tests/test_moondream_local_vlm.py new file mode 100644 index 00000000..b5b89629 --- /dev/null +++ b/plugins/moondream/tests/test_moondream_local_vlm.py @@ -0,0 +1,102 @@ +""" +Tests for the Moondream LocalVLM plugin. + +Integration tests require HF_TOKEN environment variable (for gated model access): + + export HF_TOKEN="your-token-here" + uv run pytest plugins/moondream/tests/test_moondream_local_vlm.py -m integration -v +""" +import os +from pathlib import Path +from typing import Iterator + +import pytest +import av +from PIL import Image + +from vision_agents.plugins.moondream import LocalVLM + + +@pytest.fixture(scope="session") +def golf_image(assets_dir) -> Iterator[Image.Image]: + """Load the local golf swing test image from tests/test_assets.""" + asset_path = Path(assets_dir) / "golf_swing.png" + with Image.open(asset_path) as img: + yield img.convert("RGB") + + +@pytest.fixture +def golf_frame(golf_image: Image.Image) -> av.VideoFrame: + """Create an av.VideoFrame from the golf image.""" + return av.VideoFrame.from_image(golf_image) + + +@pytest.fixture +async def local_vlm_vqa() -> LocalVLM: + """Create LocalVLM in VQA mode.""" + hf_token = os.getenv("HF_TOKEN") + if not hf_token: + pytest.skip("HF_TOKEN not set") + + vlm = LocalVLM(mode="vqa") + try: + await vlm.warmup() + yield vlm + finally: + vlm.close() + + +@pytest.fixture +async def local_vlm_caption() -> LocalVLM: + """Create LocalVLM in caption mode.""" + hf_token = os.getenv("HF_TOKEN") + if not hf_token: + pytest.skip("HF_TOKEN not set") + + vlm = LocalVLM(mode="caption") + try: + await vlm.warmup() + yield vlm + finally: + vlm.close() + + +@pytest.mark.integration +@pytest.mark.skipif(not os.getenv("HF_TOKEN"), reason="HF_TOKEN not set") +async def test_local_vqa_mode(golf_frame: av.VideoFrame, local_vlm_vqa: LocalVLM): + """Test LocalVLM VQA mode with a question about the image.""" + + await local_vlm_vqa.warmup() + assert local_vlm_vqa.model is not None, "Model must be loaded before test" + + local_vlm_vqa._latest_frame = golf_frame + + question = "What sport is being played in this image?" 
+ response = await local_vlm_vqa.simple_response(question) + + assert response is not None + assert response.text is not None + assert len(response.text) > 0 + assert response.exception is None + + assert "golf" in response.text.lower() + + +@pytest.mark.integration +@pytest.mark.skipif(not os.getenv("HF_TOKEN"), reason="HF_TOKEN not set") +async def test_local_caption_mode(golf_frame: av.VideoFrame, local_vlm_caption: LocalVLM): + """Test LocalVLM caption mode to generate a description of the image.""" + + await local_vlm_caption.warmup() + assert local_vlm_caption.model is not None, "Model must be loaded before test" + + local_vlm_caption._latest_frame = golf_frame + + response = await local_vlm_caption.simple_response("") + + assert response is not None + assert response.text is not None + assert len(response.text) > 0 + assert response.exception is None + + assert len(response.text.strip()) > 0 \ No newline at end of file diff --git a/plugins/moondream/tests/test_moondream_vlm.py b/plugins/moondream/tests/test_moondream_vlm.py new file mode 100644 index 00000000..bde7a040 --- /dev/null +++ b/plugins/moondream/tests/test_moondream_vlm.py @@ -0,0 +1,105 @@ +""" +Tests for the Moondream CloudVLM plugin. + +Integration tests require MOONDREAM_API_KEY environment variable: + + export MOONDREAM_API_KEY="your-key-here" + uv run pytest plugins/moondream/tests/test_moondream_vlm.py -m integration -v + +To run only unit tests (no API key needed): + + uv run pytest plugins/moondream/tests/test_moondream_vlm.py -m "not integration" -v +""" +import os +from pathlib import Path +from typing import Iterator + +import pytest +import av +from PIL import Image + +from vision_agents.plugins.moondream import CloudVLM + + +@pytest.fixture(scope="session") +def golf_image(assets_dir) -> Iterator[Image.Image]: + """Load the local golf swing test image from tests/test_assets.""" + asset_path = Path(assets_dir) / "golf_swing.png" + with Image.open(asset_path) as img: + yield img.convert("RGB") + + +@pytest.fixture +def golf_frame(golf_image: Image.Image) -> av.VideoFrame: + """Create an av.VideoFrame from the golf image.""" + return av.VideoFrame.from_image(golf_image) + + +@pytest.fixture +async def vlm_vqa() -> CloudVLM: + """Create CloudVLM in VQA mode.""" + api_key = os.getenv("MOONDREAM_API_KEY") + if not api_key: + pytest.skip("MOONDREAM_API_KEY not set") + + vlm = CloudVLM(api_key=api_key, mode="vqa") + try: + yield vlm + finally: + vlm.close() + + +@pytest.fixture +async def vlm_caption() -> CloudVLM: + """Create CloudVLM in caption mode.""" + api_key = os.getenv("MOONDREAM_API_KEY") + if not api_key: + pytest.skip("MOONDREAM_API_KEY not set") + + vlm = CloudVLM(api_key=api_key, mode="caption") + try: + yield vlm + finally: + vlm.close() + + +@pytest.mark.integration +@pytest.mark.skipif(not os.getenv("MOONDREAM_API_KEY"), reason="MOONDREAM_API_KEY not set") +async def test_vqa_mode(golf_frame: av.VideoFrame, vlm_vqa: CloudVLM): + """Test VQA mode with a question about the image.""" + # Set the latest frame so _process_frame can access it + vlm_vqa._latest_frame = golf_frame + + # Ask a question about the image + question = "What sport is being played in this image?" 
+ response = await vlm_vqa.simple_response(question) + + # Verify we got a response + assert response is not None + assert response.text is not None + assert len(response.text) > 0 + assert response.exception is None + + # Verify the response mentions golf (should be in the image) + assert "golf" in response.text.lower() + + +@pytest.mark.integration +@pytest.mark.skipif(not os.getenv("MOONDREAM_API_KEY"), reason="MOONDREAM_API_KEY not set") +async def test_caption_mode(golf_frame: av.VideoFrame, vlm_caption: CloudVLM): + """Test caption mode to generate a description of the image.""" + # Set the latest frame so _process_frame can access it + vlm_caption._latest_frame = golf_frame + + # Generate caption (text is not needed for caption mode) + response = await vlm_caption.simple_response("") + + # Verify we got a response + assert response is not None + assert response.text is not None + assert len(response.text) > 0 + assert response.exception is None + + # Verify the caption is descriptive (not empty) + assert len(response.text.strip()) > 0 + diff --git a/plugins/moondream/vision_agents/plugins/moondream/__init__.py b/plugins/moondream/vision_agents/plugins/moondream/__init__.py index 8e77720f..bd685820 100644 --- a/plugins/moondream/vision_agents/plugins/moondream/__init__.py +++ b/plugins/moondream/vision_agents/plugins/moondream/__init__.py @@ -2,24 +2,22 @@ Moondream plugin for vision-agents. This plugin provides Moondream 3 vision capabilities including object detection, -visual question answering, counting, and captioning. +visual question answering, and captioning. """ -from .moondream_cloud_processor import ( - CloudDetectionProcessor, -) -from .moondream_local_processor import ( - LocalDetectionProcessor, -) -from .moondream_video_track import ( - MoondreamVideoTrack, -) +from vision_agents.plugins.moondream.detection.moondream_cloud_processor import CloudDetectionProcessor +from vision_agents.plugins.moondream.detection.moondream_local_processor import LocalDetectionProcessor +from vision_agents.plugins.moondream.detection.moondream_video_track import MoondreamVideoTrack +from vision_agents.plugins.moondream.vlm.moondream_cloud_vlm import CloudVLM +from vision_agents.plugins.moondream.vlm.moondream_local_vlm import LocalVLM + __path__ = __import__("pkgutil").extend_path(__path__, __name__) __all__ = [ "CloudDetectionProcessor", + "CloudVLM", + "LocalVLM", "LocalDetectionProcessor", "MoondreamVideoTrack", ] - diff --git a/plugins/moondream/vision_agents/plugins/moondream/moondream_cloud_processor.py b/plugins/moondream/vision_agents/plugins/moondream/detection/moondream_cloud_processor.py similarity index 93% rename from plugins/moondream/vision_agents/plugins/moondream/moondream_cloud_processor.py rename to plugins/moondream/vision_agents/plugins/moondream/detection/moondream_cloud_processor.py index ebfcb2d5..b9769168 100644 --- a/plugins/moondream/vision_agents/plugins/moondream/moondream_cloud_processor.py +++ b/plugins/moondream/vision_agents/plugins/moondream/detection/moondream_cloud_processor.py @@ -16,7 +16,7 @@ AudioVideoProcessor, ) from vision_agents.plugins.moondream.moondream_utils import annotate_detections, parse_detection_bbox -from vision_agents.plugins.moondream.moondream_video_track import MoondreamVideoTrack +from vision_agents.plugins.moondream.detection.moondream_video_track import MoondreamVideoTrack from vision_agents.core.utils.video_forwarder import VideoForwarder import moondream as md @@ -28,18 +28,22 @@ class 
CloudDetectionProcessor(AudioVideoProcessor, VideoProcessorMixin, VideoPublisherMixin): - """Performs real-time object detection on video streams using Moondream Cloud API. By default the Moondream Cloud API has a 2rps second limit however this can be changed by contacting the Moondream team. If you are deploying to your own infrastructure, consider using the LocalProcessor instead. + """Performs real-time object detection on video streams using Moondream Cloud API. + + By default the Moondream Cloud API has a 2 RPS (requests per second) rate limit, + which can be increased by contacting the Moondream team. If you are deploying + to your own infrastructure, consider using LocalDetectionProcessor instead. Args: api_key: API key for Moondream Cloud API. If not provided, will attempt to read from MOONDREAM_API_KEY environment variable. - conf_threshold: Confidence threshold for detections + conf_threshold: Confidence threshold for detections (default: 0.3) detect_objects: Object(s) to detect. Moondream uses zero-shot detection, so any object string works. Examples: "person", "car", "basketball", ["person", "car", "dog"]. Default: "person" - fps: Frame processing rate - interval: Processing interval in seconds - max_workers: Number of worker threads + fps: Frame processing rate (default: 30) + interval: Processing interval in seconds (default: 0) + max_workers: Number of worker threads for CPU-intensive operations (default: 10) """ def __init__( diff --git a/plugins/moondream/vision_agents/plugins/moondream/moondream_local_processor.py b/plugins/moondream/vision_agents/plugins/moondream/detection/moondream_local_processor.py similarity index 73% rename from plugins/moondream/vision_agents/plugins/moondream/moondream_local_processor.py rename to plugins/moondream/vision_agents/plugins/moondream/detection/moondream_local_processor.py index 1b32db8f..46752d1a 100644 --- a/plugins/moondream/vision_agents/plugins/moondream/moondream_local_processor.py +++ b/plugins/moondream/vision_agents/plugins/moondream/detection/moondream_local_processor.py @@ -19,8 +19,8 @@ AudioVideoProcessor, ) from vision_agents.core.utils.video_forwarder import VideoForwarder -from vision_agents.plugins.moondream.moondream_utils import parse_detection_bbox, annotate_detections -from vision_agents.plugins.moondream.moondream_video_track import MoondreamVideoTrack +from vision_agents.plugins.moondream.moondream_utils import parse_detection_bbox, annotate_detections, handle_device +from vision_agents.plugins.moondream.detection.moondream_video_track import MoondreamVideoTrack logger = logging.getLogger(__name__) @@ -38,33 +38,33 @@ class LocalDetectionProcessor(AudioVideoProcessor, VideoProcessorMixin, VideoPub - Run: huggingface-cli login Args: - conf_threshold: Confidence threshold for detections + conf_threshold: Confidence threshold for detections (default: 0.3) detect_objects: Object(s) to detect. Moondream uses zero-shot detection, so any object string works. Examples: "person", "car", "basketball", ["person", "car", "dog"]. Default: "person" - fps: Frame processing rate - interval: Processing interval in seconds - max_workers: Number of worker threads - device: Device to run inference on ('cuda', 'mps', or 'cpu'). - Auto-detects CUDA, then MPS (Apple Silicon), then defaults to CPU. 
+ fps: Frame processing rate (default: 30) + interval: Processing interval in seconds (default: 0) + max_workers: Number of worker threads for CPU-intensive operations (default: 10) + force_cpu: If True, force CPU usage even if CUDA/MPS is available (default: False). + Auto-detects CUDA, then MPS (Apple Silicon), then defaults to CPU. We recommend running on CUDA for best performance. model_name: Hugging Face model identifier (default: "moondream/moondream3-preview") options: AgentOptions for model directory configuration. If not provided, uses default_agent_options() which defaults to tempfile.gettempdir() """ - + def __init__( - self, - conf_threshold: float = 0.3, - detect_objects: Union[str, List[str]] = "person", - fps: int = 30, - interval: int = 0, - max_workers: int = 10, - device: Optional[str] = None, - model_name: str = "moondream/moondream3-preview", - options: Optional[AgentOptions] = None, + self, + conf_threshold: float = 0.3, + detect_objects: Union[str, List[str]] = "person", + fps: int = 30, + interval: int = 0, + max_workers: int = 10, + force_cpu: bool = False, + model_name: str = "moondream/moondream3-preview", + options: Optional[AgentOptions] = None, ): super().__init__(interval=interval, receive_audio=False, receive_video=True) - + if options is None: self.options = default_agent_options() else: @@ -74,77 +74,63 @@ def __init__( self.fps = fps self.max_workers = max_workers self._shutdown = False - - # Auto-detect device if not specified - if device is None: - if torch.cuda.is_available(): - self.device = "cuda" - elif hasattr(torch.backends, "mps") and torch.backends.mps.is_available(): - # Moondream model has CUDA dependencies that don't work on MPS - # Use CPU instead to avoid runtime errors - self.device = "cpu" - logger.info("⚠️ MPS detected but using CPU (moondream model has CUDA dependencies incompatible with MPS)") - else: - self.device = "cpu" + + if force_cpu: + self._device, self._dtype = torch.device("cpu"), torch.float32 else: - # Override MPS to CPU if explicitly set (moondream doesn't work with MPS) - if device == "mps": - self.device = "cpu" - logger.warning("⚠️ MPS device requested but using CPU instead (moondream model has CUDA dependencies incompatible with MPS)") - else: - self.device = device - - # Initialize state tracking attributes + self._device, self._dtype = handle_device() + self._last_results: Dict[str, Any] = {} self._last_frame_time: Optional[float] = None self._last_frame_pil: Optional[Image.Image] = None - + # Font configuration constants for drawing efficiency self._font = cv2.FONT_HERSHEY_SIMPLEX self._font_scale = 0.5 self._font_thickness = 2 self._bbox_color = (0, 255, 0) self._text_color = (0, 0, 0) - + # Normalize detect_objects to list self.detect_objects = [detect_objects] if isinstance(detect_objects, str) else list(detect_objects) - + # Thread pool for CPU-intensive inference self.executor = ThreadPoolExecutor( max_workers=max_workers, thread_name_prefix="moondream_local_processor" ) - + # Video track for publishing (if used as video publisher) self._video_track: MoondreamVideoTrack = MoondreamVideoTrack() self._video_forwarder: Optional[VideoForwarder] = None - + # Model will be loaded in start() method self.model = None - + logger.info("🌙 Moondream Local Processor initialized") logger.info(f"🎯 Detection configured for objects: {self.detect_objects}") logger.info(f"🔧 Device: {self.device}") - + + @property + def device(self) -> str: + """Return the device type as a string (e.g., 'cuda', 'cpu').""" + return 
str(self._device) + async def warmup(self): - """Initialize and load the model.""" - # Ensure model directory exists - os.makedirs(self.options.model_dir, exist_ok=True) - # Prepare model asynchronously await self._prepare_moondream() - + async def _prepare_moondream(self): """Load the Moondream model from Hugging Face.""" logger.info(f"Loading Moondream model: {self.model_name}") - logger.info(f"Device: {self.device}") - + logger.info(f"Device: {self._device}") + # Load model in thread pool to avoid blocking event loop # Transformers handles downloading and caching automatically via Hugging Face Hub self.model = await asyncio.to_thread( # type: ignore[func-returns-value] lambda: self._load_model_sync() ) logger.info("✅ Moondream model loaded") - + def _load_model_sync(self): """Synchronous model loading function run in thread pool.""" try: @@ -156,48 +142,34 @@ def _load_model_sync(self): "This model requires authentication. " "Set HF_TOKEN or run 'huggingface-cli login'" ) - - load_kwargs = { - "trust_remote_code": True, - "dtype": torch.bfloat16 if self.device == "cuda" else torch.float32, - "cache_dir": self.options.model_dir, # Use agent's model directory for caching - } - + + load_kwargs: Dict[str, Any] = {} # Add token if available (transformers will use env var automatically, but explicit is clearer) if hf_token: load_kwargs["token"] = hf_token else: # Use True to let transformers try to read from environment or cached login - load_kwargs["token"] = True - - # Handle device placement based on device type - if self.device == "cuda": - # CUDA: Use device_map for efficient multi-GPU support - load_kwargs["device_map"] = {"": "cuda"} - else: - # CPU: load directly on CPU (MPS is automatically converted to CPU in __init__) - load_kwargs["device_map"] = "cpu" - + load_kwargs["token"] = True # type: ignore[assignment] + model = AutoModelForCausalLM.from_pretrained( self.model_name, + device_map={"": self._device}, + dtype=self._dtype, + trust_remote_code=True, + cache_dir=self.options.model_dir, **load_kwargs, - ) - - # Ensure model is in eval mode for inference + ).to(self._device) # type: ignore[arg-type] + model.eval() - - if self.device == "cuda": - logger.info("✅ Model loaded on CUDA device") - else: - logger.info("✅ Model loaded on CPU device") - - # Compile model for fast inference (as per HF documentation) + logger.info(f"✅ Model loaded on {self._device} device") + + # Compile model for fast inference try: model.compile() except Exception as compile_error: # If compilation fails, log and continue without compilation logger.warning(f"⚠️ Model compilation failed, continuing without compilation: {compile_error}") - + return model except Exception as e: error_msg = str(e) @@ -214,12 +186,12 @@ def _load_model_sync(self): else: logger.exception(f"❌ Failed to load Moondream model: {e}") raise - + async def process_video( - self, - incoming_track: aiortc.mediastreams.MediaStreamTrack, - participant: Any, - shared_forwarder=None, + self, + incoming_track: aiortc.mediastreams.MediaStreamTrack, + participant: Any, + shared_forwarder=None, ): """ Process incoming video track. @@ -230,13 +202,12 @@ async def process_video( 3. 
Frames are processed, annotated, and published via the video track """ logger.info("✅ Moondream process_video starting") - + # Ensure model is loaded if self.model is None: await self._prepare_moondream() - + if shared_forwarder is not None: - # Use the shared forwarder self._video_forwarder = shared_forwarder logger.info( f"🎥 Moondream subscribing to shared VideoForwarder at {self.fps} FPS" @@ -247,57 +218,55 @@ async def process_video( consumer_name="moondream_local" ) else: - # Create our own VideoForwarder self._video_forwarder = VideoForwarder( incoming_track, # type: ignore[arg-type] max_buffer=30, # 1 second at 30fps fps=self.fps, name="moondream_local_forwarder", ) - - # Start the forwarder + await self._video_forwarder.start() await self._video_forwarder.start_event_consumer( self._process_and_add_frame ) - + logger.info("✅ Moondream video processing pipeline started") - + def publish_video_track(self): logger.info("📹 publish_video_track called") return self._video_track - + async def _run_inference(self, frame_array: np.ndarray) -> Dict[str, Any]: try: # Convert frame to PIL Image image = Image.fromarray(frame_array) - + # Call model for each object type # The model's detect() is synchronous, so wrap in executor loop = asyncio.get_event_loop() all_detections = await loop.run_in_executor( self.executor, self._run_detection_sync, image ) - + return {"detections": all_detections} except Exception as e: logger.exception(f"❌ Local inference failed: {e}") return {} - + def _run_detection_sync(self, image: Image.Image) -> List[Dict]: if self._shutdown or self.model is None: return [] - + all_detections = [] - + # Call model for each object type for object_type in self.detect_objects: try: logger.debug(f"🔍 Detecting '{object_type}' via Moondream model") - + # Call model's detect method result = self.model.detect(image, object_type) - + # Parse model response format # Model returns: {"objects": [{"x_min": ..., "y_min": ..., "x_max": ..., "y_max": ...}, ...]} if "objects" in result: @@ -305,28 +274,23 @@ def _run_detection_sync(self, image: Image.Image) -> List[Dict]: detection = parse_detection_bbox(obj, object_type, self.conf_threshold) if detection: all_detections.append(detection) - + except Exception as e: logger.warning(f"⚠️ Failed to detect '{object_type}': {e}") continue - + logger.debug(f"🔍 Model returned {len(all_detections)} objects across {len(self.detect_objects)} types") return all_detections async def _process_and_add_frame(self, frame: av.VideoFrame): try: - # Convert to numpy array frame_array = frame.to_ndarray(format="rgb24") - - # Run inference results = await self._run_inference(frame_array) - # Store results for state() method and LLM access self._last_results = results self._last_frame_time = asyncio.get_event_loop().time() self._last_frame_pil = Image.fromarray(frame_array) - # Annotate frame with detections if results.get("detections"): frame_array = annotate_detections( frame_array, @@ -338,13 +302,11 @@ async def _process_and_add_frame(self, frame: av.VideoFrame): text_color=self._text_color, ) - # Convert back to av.VideoFrame and publish processed_frame = av.VideoFrame.from_ndarray(frame_array, format="rgb24") await self._video_track.add_frame(processed_frame) except Exception as e: logger.exception(f"❌ Frame processing failed: {e}") - # Pass through original frame on error await self._video_track.add_frame(frame) def close(self): @@ -356,5 +318,3 @@ def close(self): del self.model self.model = None logger.info("🛑 Moondream Local Processor closed") - - diff 
--git a/plugins/moondream/vision_agents/plugins/moondream/moondream_video_track.py b/plugins/moondream/vision_agents/plugins/moondream/detection/moondream_video_track.py similarity index 92% rename from plugins/moondream/vision_agents/plugins/moondream/moondream_video_track.py rename to plugins/moondream/vision_agents/plugins/moondream/detection/moondream_video_track.py index c764614c..80c117a5 100644 --- a/plugins/moondream/vision_agents/plugins/moondream/moondream_video_track.py +++ b/plugins/moondream/vision_agents/plugins/moondream/detection/moondream_video_track.py @@ -14,11 +14,14 @@ class MoondreamVideoTrack(VideoStreamTrack): - """ - Video track for publishing Moondream-processed frames. - + """Video track for publishing Moondream-processed frames. + Uses a LatestNQueue to buffer processed frames and publishes them at the configured frame rate. + + Args: + width: Frame width in pixels (default: 640) + height: Frame height in pixels (default: 480) """ def __init__(self, width: int = DEFAULT_WIDTH, height: int = DEFAULT_HEIGHT): diff --git a/plugins/moondream/vision_agents/plugins/moondream/moondream_utils.py b/plugins/moondream/vision_agents/plugins/moondream/moondream_utils.py index 1ffad182..0ac96d92 100644 --- a/plugins/moondream/vision_agents/plugins/moondream/moondream_utils.py +++ b/plugins/moondream/vision_agents/plugins/moondream/moondream_utils.py @@ -1,8 +1,15 @@ from typing import List, Optional, Dict, Any import cv2 import numpy as np +import torch +def handle_device(): + if torch.cuda.is_available(): + return torch.device("cuda"), torch.float16 + else: + return torch.device("cpu"), torch.float32 + def parse_detection_bbox(obj: Dict, object_type: str, conf_threshold: float) -> Optional[Dict]: confidence = obj.get("confidence", 1.0) diff --git a/plugins/moondream/vision_agents/plugins/moondream/vlm/moondream_cloud_vlm.py b/plugins/moondream/vision_agents/plugins/moondream/vlm/moondream_cloud_vlm.py new file mode 100644 index 00000000..918f9ae6 --- /dev/null +++ b/plugins/moondream/vision_agents/plugins/moondream/vlm/moondream_cloud_vlm.py @@ -0,0 +1,249 @@ +import asyncio +import logging +import os +from typing import Optional, List, Literal +from concurrent.futures import ThreadPoolExecutor + +import aiortc +import av +from PIL import Image + +from vision_agents.core import llm +from vision_agents.core.stt.events import STTTranscriptEvent +from vision_agents.core.llm.events import ( + LLMResponseChunkEvent, + LLMResponseCompletedEvent, +) +from vision_agents.core.llm.llm import LLMResponseEvent +from vision_agents.core.processors import Processor +from vision_agents.core.utils.video_forwarder import VideoForwarder +from vision_agents.core.utils.queue import LatestNQueue +from getstream.video.rtc.pb.stream.video.sfu.models.models_pb2 import Participant +import moondream as md + +logger = logging.getLogger(__name__) + + +class CloudVLM(llm.VideoLLM): + """Cloud-hosted VLM using Moondream model for captioning or visual queries. + + This VLM sends frames to the hosted Moondream model to perform either captioning + or visual question answering. The instructions are taken from the STT service and + sent to the model along with the frame. Once the model has an output, the results + are then vocalized with the supplied TTS service. + + Args: + api_key: API key for Moondream Cloud API. If not provided, will attempt to read + from MOONDREAM_API_KEY environment variable. 
+ mode: "vqa" for visual question answering or "caption" for image captioning (default: "vqa") + max_workers: Number of worker threads for async operations (default: 10) + """ + + def __init__( + self, + api_key: Optional[str] = None, + mode: Literal["vqa", "caption"] = "vqa", # Default to VQA + max_workers: int = 10, + ): + super().__init__() + + self.api_key = api_key or os.getenv("MOONDREAM_API_KEY") + self.max_workers = max_workers + self.mode = mode + + # Frame buffer using LatestNQueue (maintains last 10 frames) + self._frame_buffer: LatestNQueue[av.VideoFrame] = LatestNQueue(maxlen=10) + # Keep latest frame reference for fast synchronous access + self._latest_frame: Optional[av.VideoFrame] = None + self._video_forwarder: Optional[VideoForwarder] = None + self._stt_subscription_setup = False + self._processing_lock = asyncio.Lock() + + self.executor = ThreadPoolExecutor(max_workers=max_workers) + + self._load_model() + + async def watch_video_track( + self, + track: aiortc.mediastreams.MediaStreamTrack, + shared_forwarder: Optional[VideoForwarder] = None + ) -> None: + """Setup video forwarding and STT subscription.""" + if self._video_forwarder is not None and shared_forwarder is None: + logger.warning("Video forwarder already running, stopping previous one") + await self._stop_watching_video_track() + + if shared_forwarder is not None: + # Use shared forwarder + self._video_forwarder = shared_forwarder + logger.info("🎥 Moondream subscribing to shared VideoForwarder") + await self._video_forwarder.start_event_consumer( + self._on_frame_received, + fps=1.0, # Low FPS for VLM + consumer_name="moondream_vlm" + ) + else: + # Create our own VideoForwarder + self._video_forwarder = VideoForwarder( + track, # type: ignore[arg-type] + max_buffer=10, + fps=1.0, # Low FPS for VLM + name="moondream_vlm_forwarder", + ) + await self._video_forwarder.start() + await self._video_forwarder.start_event_consumer( + self._on_frame_received + ) + + # Setup STT subscription (only once) + if not self._stt_subscription_setup and self.agent: + self._setup_stt_subscription() + self._stt_subscription_setup = True + + async def _on_frame_received(self, frame: av.VideoFrame): + """Callback to receive frames and add to buffer.""" + try: + self._frame_buffer.put_latest_nowait(frame) + self._latest_frame = frame + except Exception as e: + logger.error(f"Error adding frame to buffer: {e}") + + def _setup_stt_subscription(self): + if not self.agent: + logger.warning("Cannot setup STT subscription: agent not set") + return + + @self.agent.events.subscribe + async def on_stt_transcript(event: STTTranscriptEvent): + await self._on_stt_transcript(event) + + def _consume_stream(self, generator): + chunks = [] + for chunk in generator: + logger.debug(f"Moondream stream chunk: {type(chunk)} - {chunk}") + if isinstance(chunk, str): + chunks.append(chunk) + else: + # Log unexpected types but continue processing + logger.warning(f"Unexpected chunk type: {type(chunk)}, value: {chunk}") + if chunk: + chunks.append(str(chunk)) + result = "".join(chunks) + logger.debug(f"Moondream stream result: {result}") + return result + + async def _process_frame(self, text: Optional[str] = None) -> Optional[LLMResponseEvent]: + if self._latest_frame is None: + logger.warning("No frames available, skipping Moondream processing") + return None + + if self._processing_lock.locked(): + logger.debug("Moondream processing already in progress, skipping") + return None + + latest_frame = self._latest_frame + + async with self._processing_lock: + 
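+            # The most recently buffered frame is converted to a PIL image below and sent to the
+            # Moondream Cloud API, together with the question (VQA mode) or on its own (caption mode).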
try: + # Convert frame to PIL Image + frame_array = latest_frame.to_ndarray(format="rgb24") + image = Image.fromarray(frame_array) + + # Process based on mode + if self.mode == "vqa": + if not text: + logger.warning("VQA mode requires text/question") + return None + # Moondream SDK returns {"answer": }, extract the generator + result = self.model.query(image, text, stream=True) + stream = result["answer"] + answer = await asyncio.to_thread(self._consume_stream, stream) + + if not answer: + logger.warning("Moondream query returned empty answer") + return None + + self.events.send(LLMResponseChunkEvent(delta=answer)) + self.events.send(LLMResponseCompletedEvent(text=answer)) + logger.info(f"Moondream VQA response: {answer}") + return LLMResponseEvent(original=answer, text=answer) + + elif self.mode == "caption": + # Moondream SDK returns {"caption": }, extract the generator + result = self.model.caption(image, length="normal", stream=True) + stream = result["caption"] + caption = await asyncio.to_thread(self._consume_stream, stream) + + if not caption: + logger.warning("Moondream caption returned empty result") + return None + + self.events.send(LLMResponseChunkEvent(delta=caption)) + self.events.send(LLMResponseCompletedEvent(text=caption)) + logger.info(f"Moondream caption: {caption}") + return LLMResponseEvent(original=caption, text=caption) + else: + logger.error(f"Unknown mode: {self.mode}") + return None + + except Exception as e: + logger.exception(f"Error processing frame: {e}") + return LLMResponseEvent(original=None, text="", exception=e) + + async def _on_stt_transcript(self, event: STTTranscriptEvent): + """Handle STT transcript event - process with Moondream.""" + if not event.text: + return + + await self._process_frame(text=event.text) + + async def simple_response( + self, + text: str, + processors: Optional[List[Processor]] = None, + participant: Optional[Participant] = None, + ) -> LLMResponseEvent: + """ + simple_response is a standardized way to create a response. 
+ + Args: + text: The text/question to respond to + processors: list of processors (which contain state) about the video/voice AI + participant: optionally the participant object + + Examples: + await llm.simple_response("What do you see in this image?") + """ + result = await self._process_frame(text=text if self.mode == "vqa" else None) + if result is None: + return LLMResponseEvent(original=None, text="", + exception=ValueError("No frame available or processing failed")) + return result + + async def _stop_watching_video_track(self) -> None: + """Stop video forwarding.""" + if self._video_forwarder is not None: + await self._video_forwarder.stop() + self._video_forwarder = None + logger.info("Stopped video forwarding") + + def _load_model(self): + try: + # Validate API key + if not self.api_key: + raise ValueError("api_key is required for Moondream Cloud API") + + # Initialize cloud model + self.model = md.vl(api_key=self.api_key) + logger.info("✅ Moondream SDK initialized") + + except Exception as e: + logger.exception(f"❌ Failed to load Moondream model: {e}") + raise + + def close(self): + """Clean up resources.""" + self._shutdown = True + if hasattr(self, "executor"): + self.executor.shutdown(wait=False) + logger.info("🛑 Moondream Processor closed") diff --git a/plugins/moondream/vision_agents/plugins/moondream/vlm/moondream_local_vlm.py b/plugins/moondream/vision_agents/plugins/moondream/vlm/moondream_local_vlm.py new file mode 100644 index 00000000..c7d3a151 --- /dev/null +++ b/plugins/moondream/vision_agents/plugins/moondream/vlm/moondream_local_vlm.py @@ -0,0 +1,352 @@ +import asyncio +import logging +import os +from typing import Optional, List, Literal +from concurrent.futures import ThreadPoolExecutor + +import aiortc +import av +import torch +from PIL import Image +from transformers import AutoModelForCausalLM + +from vision_agents.core import llm +from vision_agents.core.agents.agents import AgentOptions, default_agent_options +from vision_agents.core.stt.events import STTTranscriptEvent +from vision_agents.core.llm.events import ( + LLMResponseChunkEvent, + LLMResponseCompletedEvent, +) +from vision_agents.core.llm.llm import LLMResponseEvent +from vision_agents.core.processors import Processor +from vision_agents.core.utils.video_forwarder import VideoForwarder +from vision_agents.core.utils.queue import LatestNQueue +from getstream.video.rtc.pb.stream.video.sfu.models.models_pb2 import Participant + +from vision_agents.plugins.moondream.moondream_utils import handle_device + +logger = logging.getLogger(__name__) + + +class LocalVLM(llm.VideoLLM): + """Local VLM using Moondream model for captioning or visual queries. + + This VLM downloads and runs the moondream3-preview model locally from Hugging Face, + providing captioning and visual question answering capabilities without requiring an API key. + + Note: The moondream3-preview model is gated and requires authentication: + - Request access at https://huggingface.co/moondream/moondream3-preview + - Once approved, authenticate using one of: + - Set HF_TOKEN environment variable: export HF_TOKEN=your_token_here + - Run: huggingface-cli login + + Args: + mode: "vqa" for visual question answering or "caption" for image captioning (default: "vqa") + max_workers: Number of worker threads for async operations (default: 10) + force_cpu: If True, force CPU usage even if CUDA/MPS is available (default: False). + Auto-detects CUDA, then MPS (Apple Silicon), then defaults to CPU. 
+ Note: MPS is automatically converted to CPU due to model compatibility. We recommend running on CUDA for best performance. + model_name: Hugging Face model identifier (default: "moondream/moondream3-preview") + options: AgentOptions for model directory configuration. + If not provided, uses default_agent_options() + """ + + def __init__( + self, + mode: Literal["vqa", "caption"] = "vqa", + max_workers: int = 10, + force_cpu: bool = False, + model_name: str = "moondream/moondream3-preview", + options: Optional[AgentOptions] = None, + ): + super().__init__() + + self.max_workers = max_workers + self.mode = mode + self.model_name = model_name + self.force_cpu = force_cpu + + if options is None: + self.options = default_agent_options() + else: + self.options = options + + if torch.backends.mps.is_available(): + self.force_cpu = True + logger.warning("⚠️ MPS detected but using CPU (moondream model has CUDA dependencies incompatible with MPS)") + + if self.force_cpu: + self.device, self._dtype = torch.device("cpu"), torch.float32 + else: + self.device, self._dtype = handle_device() + + self._frame_buffer: LatestNQueue[av.VideoFrame] = LatestNQueue(maxlen=10) + self._latest_frame: Optional[av.VideoFrame] = None + self._video_forwarder: Optional[VideoForwarder] = None + self._stt_subscription_setup = False + self._processing_lock = asyncio.Lock() + + self.executor = ThreadPoolExecutor(max_workers=max_workers) + self.model = None + + logger.info("🌙 Moondream Local VLM initialized") + logger.info(f"🔧 Device: {self.device}") + logger.info(f"📝 Mode: {self.mode}") + + async def warmup(self) -> None: + """Initialize and load the model.""" + if self.model is None: + await self._prepare_moondream() + + async def _prepare_moondream(self): + """Load the Moondream model from Hugging Face.""" + logger.info(f"Loading Moondream model: {self.model_name}") + logger.info(f"Device: {self.device}") + + self.model = await asyncio.to_thread( # type: ignore[func-returns-value] + lambda: self._load_model_sync() + ) + logger.info("✅ Moondream model loaded") + + def _load_model_sync(self): + """Synchronous model loading function run in thread pool.""" + try: + # Check for Hugging Face token (required for gated models) + hf_token = os.getenv("HF_TOKEN") + if not hf_token: + logger.warning( + "⚠️ HF_TOKEN environment variable not set. " + "This model requires authentication. " + "Set HF_TOKEN or run 'huggingface-cli login'" + ) + + load_kwargs = { + "trust_remote_code": True, + "cache_dir": self.options.model_dir, + } + + if hf_token: + load_kwargs["token"] = hf_token + else: + load_kwargs["token"] = True + + model = AutoModelForCausalLM.from_pretrained( + self.model_name, + device_map={"": self.device}, + dtype=self._dtype, + **load_kwargs, + ) + + if self.force_cpu: + model.to("cpu") # type: ignore[arg-type] + model.eval() + logger.info(f"✅ Model loaded on {self.device} device") + + try: + model.compile() + except Exception as compile_error: + # If compilation fails, log and continue without compilation + logger.warning(f"⚠️ Model compilation failed, continuing without compilation: {compile_error}") + + return model + except Exception as e: + error_msg = str(e) + if "gated repo" in error_msg.lower() or "403" in error_msg or "authorized" in error_msg.lower(): + logger.exception( + "❌ Failed to load Moondream model: Model requires authentication.\n" + "This model is gated and requires access approval:\n" + f"1. Visit https://huggingface.co/{self.model_name} to request access\n" + "2. 
Once approved, authenticate using one of:\n" + " - Set HF_TOKEN environment variable: export HF_TOKEN=your_token_here\n" + " - Run: huggingface-cli login\n" + f"Original error: {e}" + ) + else: + logger.exception(f"❌ Failed to load Moondream model: {e}") + raise + + async def watch_video_track( + self, + track: aiortc.mediastreams.MediaStreamTrack, + shared_forwarder: Optional[VideoForwarder] = None + ) -> None: + """Setup video forwarding and STT subscription.""" + if self._video_forwarder is not None and shared_forwarder is None: + logger.warning("Video forwarder already running, stopping previous one") + await self._stop_watching_video_track() + + if self.model is None: + await self._prepare_moondream() + + if shared_forwarder is not None: + self._video_forwarder = shared_forwarder + logger.info("🎥 Moondream Local VLM subscribing to shared VideoForwarder") + await self._video_forwarder.start_event_consumer( + self._on_frame_received, + fps=1.0, + consumer_name="moondream_local_vlm" + ) + else: + self._video_forwarder = VideoForwarder( + track, # type: ignore[arg-type] + max_buffer=10, + fps=1.0, + name="moondream_local_vlm_forwarder", + ) + await self._video_forwarder.start() + await self._video_forwarder.start_event_consumer( + self._on_frame_received + ) + + if not self._stt_subscription_setup and self.agent: + self._setup_stt_subscription() + self._stt_subscription_setup = True + + async def _on_frame_received(self, frame: av.VideoFrame): + """Callback to receive frames and add to buffer.""" + try: + self._frame_buffer.put_latest_nowait(frame) + self._latest_frame = frame + except Exception as e: + logger.error(f"Error adding frame to buffer: {e}") + + def _setup_stt_subscription(self): + if not self.agent: + logger.warning("Cannot setup STT subscription: agent not set") + return + + @self.agent.events.subscribe + async def on_stt_transcript(event: STTTranscriptEvent): + await self._on_stt_transcript(event) + + def _consume_stream(self, generator): + """Consume the generator stream from model query/caption methods.""" + chunks = [] + for chunk in generator: + if isinstance(chunk, str): + chunks.append(chunk) + else: + logger.warning(f"Unexpected chunk type: {type(chunk)}, value: {chunk}") + if chunk: + chunks.append(str(chunk)) + result = "".join(chunks) + return result + + async def _process_frame(self, text: Optional[str] = None) -> Optional[LLMResponseEvent]: + if self._latest_frame is None: + logger.warning("No frames available, skipping Moondream processing") + return None + + if self.model is None: + logger.warning("Model not loaded, skipping Moondream processing") + return None + + if self._processing_lock.locked(): + logger.debug("Moondream processing already in progress, skipping") + return None + + try: + await self._processing_lock.acquire() + except Exception as e: + logger.warning(f"Failed to acquire lock: {e}") + return None + + latest_frame = self._latest_frame + + try: + frame_array = latest_frame.to_ndarray(format="rgb24") + image = Image.fromarray(frame_array) + + if self.mode == "vqa": + if not text: + logger.warning("VQA mode requires text/question") + return None + + result = await asyncio.to_thread(self.model.query, image, text, stream=True) + + if isinstance(result, dict) and "answer" in result: + stream = result["answer"] + else: + stream = result + + answer = await asyncio.to_thread(self._consume_stream, stream) + + if not answer: + logger.warning("Moondream query returned empty answer") + return None + + self.events.send(LLMResponseChunkEvent(delta=answer)) 
+ self.events.send(LLMResponseCompletedEvent(text=answer)) + logger.info(f"Moondream VQA response: {answer}") + return LLMResponseEvent(original=answer, text=answer) + + else: + result = await asyncio.to_thread(self.model.caption, image, length="normal", stream=True) + + if isinstance(result, dict) and "caption" in result: + stream = result["caption"] + else: + stream = result + + caption = await asyncio.to_thread(self._consume_stream, stream) + + if not caption: + logger.warning("Moondream caption returned empty result") + return None + + self.events.send(LLMResponseChunkEvent(delta=caption)) + self.events.send(LLMResponseCompletedEvent(text=caption)) + logger.info(f"Moondream caption: {caption}") + return LLMResponseEvent(original=caption, text=caption) + + except Exception as e: + logger.exception(f"Error processing frame: {e}") + return LLMResponseEvent(original=None, text="", exception=e) + finally: + if self._processing_lock.locked(): + self._processing_lock.release() + + async def _on_stt_transcript(self, event: STTTranscriptEvent): + """Handle STT transcript event - process with Moondream.""" + if not event.text: + return + + await self._process_frame(text=event.text) + + async def simple_response( + self, + text: str, + processors: Optional[List[Processor]] = None, + participant: Optional[Participant] = None, + ) -> LLMResponseEvent: + """ + simple_response is a standardized way to create a response. + + Args: + text: The text/question to respond to + processors: list of processors (which contain state) about the video/voice AI + participant: optionally the participant object + + Examples: + await llm.simple_response("What do you see in this image?") + """ + result = await self._process_frame(text=text if self.mode == "vqa" else None) + if result is None: + return LLMResponseEvent(original=None, text="", + exception=ValueError("No frame available or processing failed")) + return result + + async def _stop_watching_video_track(self) -> None: + """Stop video forwarding.""" + if self._video_forwarder is not None: + await self._video_forwarder.stop() + self._video_forwarder = None + logger.info("Stopped video forwarding") + + def close(self): + """Clean up resources.""" + self.executor.shutdown(wait=False) + if self.model is not None: + del self.model + self.model = None + logger.info("🛑 Moondream Local VLM closed") \ No newline at end of file