diff --git a/libs/agno/agno/agent/_response.py b/libs/agno/agno/agent/_response.py index f86f42053d..4767dd88bf 100644 --- a/libs/agno/agno/agent/_response.py +++ b/libs/agno/agno/agent/_response.py @@ -1517,12 +1517,12 @@ def handle_model_response_chunk( if model_response.images is None: model_response.images = [] model_response.images.extend(model_response_event.images) - # Store media in run_response if store_media is enabled - if agent.store_media: - for image in model_response_event.images: - if run_response.images is None: - run_response.images = [] - run_response.images.append(image) + # Always store media in run_response for the caller; + # store_media only controls DB persistence (handled by cleanup_and_store) + for image in model_response_event.images: + if run_response.images is None: + run_response.images = [] + run_response.images.append(image) # Handle tool interruption events (HITL flow) elif model_response_event.event == ModelResponseEvent.tool_call_paused.value: diff --git a/libs/agno/agno/agent/_run.py b/libs/agno/agno/agent/_run.py index 2e5b5af833..9d92d7ab8a 100644 --- a/libs/agno/agno/agent/_run.py +++ b/libs/agno/agno/agent/_run.py @@ -556,9 +556,8 @@ def _run( user_id=user_id, ) - # 8. Store media if enabled - if agent.store_media: - store_media_util(run_response, model_response) + # 8. Store media in run output for the caller + store_media_util(run_response, model_response) # 9. Convert the response to the structured format if needed convert_response_to_structured_format(agent, run_response, run_context=run_context) @@ -1635,9 +1634,8 @@ async def _arun( # 11. Convert the response to the structured format if needed convert_response_to_structured_format(agent, run_response, run_context=run_context) - # 12. Store media if enabled - if agent.store_media: - store_media_util(run_response, model_response) + # 12. Store media in run output for the caller + store_media_util(run_response, model_response) # 13. Execute post-hooks (after output is generated but before response is returned) if agent.post_hooks is not None: @@ -2953,9 +2951,8 @@ def _continue_run( # 4. Convert the response to the structured format if needed convert_response_to_structured_format(agent, run_response, run_context=run_context) - # 5. Store media if enabled - if agent.store_media: - store_media_util(run_response, model_response) + # 5. Store media in run output for the caller + store_media_util(run_response, model_response) # 6. Execute post-hooks if agent.post_hooks is not None: @@ -3710,9 +3707,8 @@ async def _acontinue_run( # 10. Convert the response to the structured format if needed convert_response_to_structured_format(agent, run_response, run_context=run_context) - # 11. Store media if enabled - if agent.store_media: - store_media_util(run_response, model_response) + # 11. Store media in run output for the caller + store_media_util(run_response, model_response) await araise_if_cancelled(run_response.run_id) # type: ignore @@ -4402,11 +4398,20 @@ def cleanup_and_store( run_context: Optional[RunContext] = None, user_id: Optional[str] = None, ) -> None: + import copy + from agno.agent import _session from agno.run.approval import update_approval_run_status - # Scrub the stored run based on storage flags - scrub_run_output_for_storage(agent, run_response) + # Scrub a shallow copy for storage — the original run_response is never + # mutated so the caller always sees generated media regardless of store_media. + storage_copy = copy.copy(run_response) + scrub_run_output_for_storage(agent, storage_copy) + if not agent.store_media: + storage_copy.images = None + storage_copy.videos = None + storage_copy.audio = None + storage_copy.files = None # Stop the timer for the Run duration if run_response.metrics: @@ -4416,18 +4421,19 @@ def cleanup_and_store( # This ensures RunOutput reflects all tool modifications if run_context is not None and run_context.session_state is not None: run_response.session_state = run_context.session_state + storage_copy.session_state = run_context.session_state # Optional: Save output to file if save_response_to_file is set save_run_response_to_file( agent, - run_response=run_response, + run_response=storage_copy, input=run_response.input.input_content_string() if run_response.input else "", session_id=session.session_id, user_id=user_id, ) - # Add RunOutput to Agent Session - session.upsert_run(run=run_response) + # Add scrubbed RunOutput to Agent Session + session.upsert_run(run=storage_copy) # Calculate session metrics update_session_metrics(agent, session=session, run_response=run_response) @@ -4455,32 +4461,41 @@ async def acleanup_and_store( run_context: Optional[RunContext] = None, user_id: Optional[str] = None, ) -> None: + import copy + from agno.agent import _session from agno.run.approval import aupdate_approval_run_status - # Scrub the stored run based on storage flags - scrub_run_output_for_storage(agent, run_response) + # Scrub a shallow copy for storage — the original run_response is never + # mutated so the caller always sees generated media regardless of store_media. + storage_copy = copy.copy(run_response) + scrub_run_output_for_storage(agent, storage_copy) + if not agent.store_media: + storage_copy.images = None + storage_copy.videos = None + storage_copy.audio = None + storage_copy.files = None # Stop the timer for the Run duration if run_response.metrics: run_response.metrics.stop_timer() # Update run_response.session_state before saving - # This ensures RunOutput reflects all tool modifications if run_context is not None and run_context.session_state is not None: run_response.session_state = run_context.session_state + storage_copy.session_state = run_context.session_state # Optional: Save output to file if save_response_to_file is set save_run_response_to_file( agent, - run_response=run_response, + run_response=storage_copy, input=run_response.input.input_content_string() if run_response.input else "", session_id=session.session_id, user_id=user_id, ) - # Add RunOutput to Agent Session - session.upsert_run(run=run_response) + # Add scrubbed RunOutput to Agent Session + session.upsert_run(run=storage_copy) # Calculate session metrics update_session_metrics(agent, session=session, run_response=run_response) diff --git a/libs/agno/agno/team/_run.py b/libs/agno/agno/team/_run.py index c2d89776f4..d6f1e4aa16 100644 --- a/libs/agno/agno/team/_run.py +++ b/libs/agno/agno/team/_run.py @@ -368,8 +368,8 @@ def _run_tasks( # === Post-loop === - # Store media if enabled - if team.store_media and model_response is not None: + # Always add media to run_response for caller availability + if model_response is not None: store_media_util(run_response, model_response) # Convert response to structured format @@ -1152,9 +1152,8 @@ def _run( team, run_response=run_response, session=session, run_context=run_context ) - # 8. Store media if enabled - if team.store_media: - store_media_util(run_response, model_response) + # 8. Always add media to run_response for caller availability + store_media_util(run_response, model_response) # 9. Convert response to structured format _convert_response_to_structured_format(team, run_response=run_response, run_context=run_context) @@ -2143,8 +2142,8 @@ async def _arun_tasks( # === Post-loop === - # Store media if enabled - if team.store_media and model_response is not None: + # Always add media to run_response for caller availability + if model_response is not None: store_media_util(run_response, model_response) # Convert response to structured format @@ -2986,9 +2985,8 @@ async def _arun( team, run_response=run_response, session=team_session, run_context=run_context ) - # 8. Store media if enabled - if team.store_media: - store_media_util(run_response, model_response) + # 8. Always add media to run_response for caller availability + store_media_util(run_response, model_response) # 9. Convert response to structured format _convert_response_to_structured_format(team, run_response=run_response, run_context=run_context) @@ -3916,11 +3914,20 @@ def _cleanup_and_store( session: TeamSession, run_context: Optional[RunContext] = None, ) -> None: - # Scrub the stored run based on storage flags + import copy + from agno.run.approval import update_approval_run_status from agno.team._session import update_session_metrics - scrub_run_output_for_storage(team, run_response) + # Scrub a shallow copy for storage — the original run_response is never + # mutated so the caller always sees generated media regardless of store_media. + storage_copy = copy.copy(run_response) + scrub_run_output_for_storage(team, storage_copy) + if not team.store_media: + storage_copy.images = None + storage_copy.videos = None + storage_copy.audio = None + storage_copy.files = None # Stop the timer for the Run duration if run_response.metrics: @@ -3929,9 +3936,10 @@ def _cleanup_and_store( # Update run_response.session_state before saving if run_context is not None and run_context.session_state is not None: run_response.session_state = run_context.session_state + storage_copy.session_state = run_context.session_state - # Add RunOutput to Team Session - session.upsert_run(run_response=run_response) + # Add scrubbed RunOutput to Team Session + session.upsert_run(run_response=storage_copy) # Calculate session metrics update_session_metrics(team, session=session, run_response=run_response) @@ -3947,7 +3955,6 @@ def _cleanup_and_store( team.save_session(session=session) # Update approval run_status if this run has an associated approval. - # This is a no-op if no approval exists for this run_id. if run_response.status is not None and run_response.run_id is not None: update_approval_run_status(team.db, run_response.run_id, run_response.status) @@ -3958,11 +3965,20 @@ async def _acleanup_and_store( session: TeamSession, run_context: Optional[RunContext] = None, ) -> None: - # Scrub the stored run based on storage flags + import copy + from agno.run.approval import aupdate_approval_run_status from agno.team._session import update_session_metrics - scrub_run_output_for_storage(team, run_response) + # Scrub a shallow copy for storage — the original run_response is never + # mutated so the caller always sees generated media regardless of store_media. + storage_copy = copy.copy(run_response) + scrub_run_output_for_storage(team, storage_copy) + if not team.store_media: + storage_copy.images = None + storage_copy.videos = None + storage_copy.audio = None + storage_copy.files = None # Stop the timer for the Run duration if run_response.metrics: @@ -3971,9 +3987,10 @@ async def _acleanup_and_store( # Update run_response.session_state before saving if run_context is not None and run_context.session_state is not None: run_response.session_state = run_context.session_state + storage_copy.session_state = run_context.session_state - # Add RunOutput to Team Session - session.upsert_run(run_response=run_response) + # Add scrubbed RunOutput to Team Session + session.upsert_run(run_response=storage_copy) # Calculate session metrics update_session_metrics(team, session=session, run_response=run_response) @@ -3989,7 +4006,6 @@ async def _acleanup_and_store( await team.asave_session(session=session) # Update approval run_status if this run has an associated approval. - # This is a no-op if no approval exists for this run_id. if run_response.status is not None and run_response.run_id is not None: await aupdate_approval_run_status(team.db, run_response.run_id, run_response.status) @@ -4967,9 +4983,8 @@ def _continue_run( # Convert to structured format _convert_response_to_structured_format(team, run_response=run_response, run_context=run_context) - # Store media - if team.store_media: - store_media_util(run_response, model_response) + # Always add media to run_response for caller availability + store_media_util(run_response, model_response) # Execute post-hooks if team.post_hooks is not None: @@ -5621,8 +5636,7 @@ async def _acontinue_run( _convert_response_to_structured_format(team, run_response=run_response, run_context=run_context) - if team.store_media: - store_media_util(run_response, model_response) + store_media_util(run_response, model_response) elif member_results: # Member-only: re-run team with results diff --git a/libs/agno/agno/utils/agent.py b/libs/agno/agno/utils/agent.py index b846f5b33a..2898be0343 100644 --- a/libs/agno/agno/utils/agent.py +++ b/libs/agno/agno/utils/agent.py @@ -493,6 +493,13 @@ def scrub_media_from_run_output(run_response: Union[RunOutput, TeamRunOutput]) - for message in run_response.reasoning_messages: scrub_media_from_message(message) + # 6. Null top-level output media fields + # Consumed by cleanup_and_store (with save/restore) and _scrub_member_responses (permanent) + run_response.images = None + run_response.videos = None + run_response.audio = None + run_response.files = None + def scrub_media_from_message(message: Message) -> None: """Remove all media from a Message object.""" diff --git a/libs/agno/tests/unit/agent/test_store_media_run_output.py b/libs/agno/tests/unit/agent/test_store_media_run_output.py new file mode 100644 index 0000000000..f4da956b2f --- /dev/null +++ b/libs/agno/tests/unit/agent/test_store_media_run_output.py @@ -0,0 +1,192 @@ +"""Tests for store_media=False still returning media in RunOutput to caller. + +Verifies fix for https://github.com/agno-agi/agno/issues/5101 +""" + +from typing import Any, AsyncIterator, Iterator +from unittest.mock import AsyncMock, Mock + +import pytest + +from agno.agent.agent import Agent +from agno.media import Image +from agno.models.base import Model +from agno.models.message import MessageMetrics +from agno.models.response import ModelResponse +from agno.run.agent import RunCompletedEvent, RunOutput + + +class MockModelWithImage(Model): + def __init__(self): + super().__init__(id="test-model", name="test-model", provider="test") + self.instructions = None + + self._mock_response = ModelResponse( + content="Here is your generated image", + role="assistant", + images=[Image(url="https://example.com/generated.png", id="img-1")], + response_usage=MessageMetrics(), + ) + + self.response = Mock(return_value=self._mock_response) + self.aresponse = AsyncMock(return_value=self._mock_response) + + def get_instructions_for_model(self, *args, **kwargs): + return None + + def get_system_message_for_model(self, *args, **kwargs): + return None + + async def aget_instructions_for_model(self, *args, **kwargs): + return None + + async def aget_system_message_for_model(self, *args, **kwargs): + return None + + def parse_args(self, *args, **kwargs): + return {} + + def invoke(self, *args, **kwargs) -> ModelResponse: + return self._mock_response + + async def ainvoke(self, *args, **kwargs) -> ModelResponse: + return await self.aresponse(*args, **kwargs) + + def invoke_stream(self, *args, **kwargs) -> Iterator[ModelResponse]: + yield self._mock_response + + async def ainvoke_stream(self, *args, **kwargs) -> AsyncIterator[ModelResponse]: + yield self._mock_response + return + + def _parse_provider_response(self, response: Any, **kwargs) -> ModelResponse: + return self._mock_response + + def _parse_provider_response_delta(self, response: Any) -> ModelResponse: + return self._mock_response + + +def test_store_media_false_returns_images_to_caller(): + """Returned RunOutput should have images even with store_media=False.""" + agent = Agent( + model=MockModelWithImage(), + store_media=False, + ) + + result = agent.run("Generate an image") + + assert result.images is not None + assert len(result.images) == 1 + assert result.images[0].url == "https://example.com/generated.png" + + +@pytest.mark.asyncio +async def test_store_media_false_returns_images_to_caller_async(): + """Async: returned RunOutput should have images even with store_media=False.""" + agent = Agent( + model=MockModelWithImage(), + store_media=False, + ) + + result = await agent.arun("Generate an image") + + assert result.images is not None + assert len(result.images) == 1 + assert result.images[0].url == "https://example.com/generated.png" + + +def test_store_media_true_returns_images_to_caller(): + """With store_media=True (default), caller should still see images.""" + agent = Agent( + model=MockModelWithImage(), + store_media=True, + ) + + result = agent.run("Generate an image") + + assert result.images is not None + assert len(result.images) == 1 + + +def test_store_media_false_without_db(): + """store_media=False works correctly without any DB configured.""" + agent = Agent( + model=MockModelWithImage(), + store_media=False, + ) + + result = agent.run("Generate an image") + + assert result.images is not None + assert len(result.images) == 1 + assert result.images[0].id == "img-1" + + +def test_store_media_false_streaming_with_yield_run_output(): + agent = Agent( + model=MockModelWithImage(), + store_media=False, + ) + + run_output = None + for chunk in agent.run("Generate an image", stream=True, yield_run_output=True): + if isinstance(chunk, RunOutput): + run_output = chunk + + assert run_output is not None + assert run_output.images is not None + assert len(run_output.images) == 1 + assert run_output.images[0].url == "https://example.com/generated.png" + + +def test_store_media_false_streaming_with_stream_events(): + agent = Agent( + model=MockModelWithImage(), + store_media=False, + ) + + completed_event = None + for chunk in agent.run("Generate an image", stream=True, stream_events=True): + if isinstance(chunk, RunCompletedEvent): + completed_event = chunk + + assert completed_event is not None + assert completed_event.images is not None + assert len(completed_event.images) == 1 + assert completed_event.images[0].url == "https://example.com/generated.png" + + +@pytest.mark.asyncio +async def test_store_media_false_async_streaming_with_yield_run_output(): + agent = Agent( + model=MockModelWithImage(), + store_media=False, + ) + + run_output = None + async for chunk in agent.arun("Generate an image", stream=True, yield_run_output=True): + if isinstance(chunk, RunOutput): + run_output = chunk + + assert run_output is not None + assert run_output.images is not None + assert len(run_output.images) == 1 + assert run_output.images[0].url == "https://example.com/generated.png" + + +@pytest.mark.asyncio +async def test_store_media_false_async_streaming_with_stream_events(): + agent = Agent( + model=MockModelWithImage(), + store_media=False, + ) + + completed_event = None + async for chunk in agent.arun("Generate an image", stream=True, stream_events=True): + if isinstance(chunk, RunCompletedEvent): + completed_event = chunk + + assert completed_event is not None + assert completed_event.images is not None + assert len(completed_event.images) == 1 + assert completed_event.images[0].url == "https://example.com/generated.png" diff --git a/libs/agno/tests/unit/agent/test_store_media_scrub_leak.py b/libs/agno/tests/unit/agent/test_store_media_scrub_leak.py new file mode 100644 index 0000000000..6c41eb165c --- /dev/null +++ b/libs/agno/tests/unit/agent/test_store_media_scrub_leak.py @@ -0,0 +1,218 @@ +"""Reproduction tests for media leak through scrub_media_from_run_output. + +These tests prove that scrub_media_from_run_output() must null top-level +output media fields (images/videos/audio/files), not just input/message media. + +Without the fix, member responses and workflow executor runs leak media to DB +when the member/executor has store_media=False. +""" + +from typing import Any, AsyncIterator, Iterator +from unittest.mock import AsyncMock, Mock + +import pytest + +from agno.agent.agent import Agent +from agno.media import Image, Video +from agno.models.base import Model +from agno.models.message import MessageMetrics +from agno.models.response import ModelResponse +from agno.run.agent import RunOutput +from agno.run.team import TeamRunOutput +from agno.session.agent import AgentSession +from agno.utils.agent import scrub_media_from_run_output + + +class MockModelWithImage(Model): + def __init__(self): + super().__init__(id="test-model", name="test-model", provider="test") + self.instructions = None + + self._mock_response = ModelResponse( + content="Here is your generated image", + role="assistant", + images=[Image(url="https://example.com/generated.png", id="img-1")], + response_usage=MessageMetrics(), + ) + + self.response = Mock(return_value=self._mock_response) + self.aresponse = AsyncMock(return_value=self._mock_response) + + def get_instructions_for_model(self, *args, **kwargs): + return None + + def get_system_message_for_model(self, *args, **kwargs): + return None + + async def aget_instructions_for_model(self, *args, **kwargs): + return None + + async def aget_system_message_for_model(self, *args, **kwargs): + return None + + def parse_args(self, *args, **kwargs): + return {} + + def invoke(self, *args, **kwargs) -> ModelResponse: + return self._mock_response + + async def ainvoke(self, *args, **kwargs) -> ModelResponse: + return await self.aresponse(*args, **kwargs) + + def invoke_stream(self, *args, **kwargs) -> Iterator[ModelResponse]: + yield self._mock_response + + async def ainvoke_stream(self, *args, **kwargs) -> AsyncIterator[ModelResponse]: + yield self._mock_response + return + + def _parse_provider_response(self, response: Any, **kwargs) -> ModelResponse: + return self._mock_response + + def _parse_provider_response_delta(self, response: Any) -> ModelResponse: + return self._mock_response + + +# -- Core scrub function tests -- + + +def test_scrub_media_nulls_top_level_output_images(): + """scrub_media_from_run_output must null run_response.images.""" + run_output = RunOutput( + images=[Image(url="https://example.com/img.png", id="img-1")], + ) + scrub_media_from_run_output(run_output) + assert run_output.images is None, "images should be None after scrub" + + +def test_scrub_media_nulls_top_level_output_videos(): + """scrub_media_from_run_output must null run_response.videos.""" + run_output = RunOutput( + videos=[Video(url="https://example.com/vid.mp4", id="vid-1")], + ) + scrub_media_from_run_output(run_output) + assert run_output.videos is None, "videos should be None after scrub" + + +def test_scrub_media_nulls_top_level_output_audio(): + """scrub_media_from_run_output must null run_response.audio.""" + from agno.media import Audio + + run_output = RunOutput( + audio=[Audio(id="aud-1", content=b"fake-audio")], + ) + scrub_media_from_run_output(run_output) + assert run_output.audio is None, "audio should be None after scrub" + + +def test_scrub_media_nulls_top_level_output_files(): + """scrub_media_from_run_output must null run_response.files.""" + from agno.media import File + + run_output = RunOutput( + files=[File(id="file-1", content=b"fake-file")], + ) + scrub_media_from_run_output(run_output) + assert run_output.files is None, "files should be None after scrub" + + +def test_scrub_media_nulls_team_run_output_images(): + """scrub_media_from_run_output works on TeamRunOutput too.""" + team_output = TeamRunOutput( + images=[Image(url="https://example.com/img.png", id="img-1")], + ) + scrub_media_from_run_output(team_output) + assert team_output.images is None, "TeamRunOutput.images should be None after scrub" + + +# -- Member response leak reproduction -- + + +def test_member_response_media_scrubbed_via_scrub_run_output_for_storage(): + """Simulates _scrub_member_responses path: scrub_run_output_for_storage + must clear top-level images on a member response when store_media=False.""" + from agno.agent._run import scrub_run_output_for_storage + + member_agent = Agent(model=MockModelWithImage(), store_media=False) + + # Simulate a member response with generated images + member_response = RunOutput( + agent_id=member_agent.id, + images=[Image(url="https://example.com/generated.png", id="img-1")], + content="Here is your image", + ) + + scrub_run_output_for_storage(member_agent, member_response) + + assert member_response.images is None, ( + "Member response images should be None after scrub_run_output_for_storage " + "with store_media=False. Without this fix, images leak to DB." + ) + + +def test_member_response_media_preserved_when_store_media_true(): + """When store_media=True, scrub_run_output_for_storage should NOT be called + for media, so images remain.""" + from agno.agent._run import scrub_run_output_for_storage + + member_agent = Agent(model=MockModelWithImage(), store_media=True) + + member_response = RunOutput( + agent_id=member_agent.id, + images=[Image(url="https://example.com/generated.png", id="img-1")], + content="Here is your image", + ) + + scrub_run_output_for_storage(member_agent, member_response) + + # store_media=True means scrub_media_from_run_output is NOT called + assert member_response.images is not None + assert len(member_response.images) == 1 + + +# -- Agent cleanup save/restore still works -- + + +def test_cleanup_and_store_restores_media_after_scrub(): + """cleanup_and_store saves media before scrub and restores in finally. + Even with the fix to scrub_media_from_run_output, the caller must + still get images back.""" + agent = Agent(model=MockModelWithImage(), store_media=False) + + result = agent.run("Generate an image") + + assert result.images is not None, "Caller should see images after cleanup_and_store" + assert len(result.images) == 1 + assert result.images[0].url == "https://example.com/generated.png" + + +@pytest.mark.asyncio +async def test_acleanup_and_store_restores_media_after_scrub(): + """Async variant: caller still gets images back.""" + agent = Agent(model=MockModelWithImage(), store_media=False) + + result = await agent.arun("Generate an image") + + assert result.images is not None, "Caller should see images after acleanup_and_store" + assert len(result.images) == 1 + assert result.images[0].url == "https://example.com/generated.png" + + +# -- Session cache isolation -- + + +def test_session_cache_does_not_have_media_when_store_media_false(): + """After agent.run with store_media=False, the session's cached run + should NOT have images — only the returned RunOutput should.""" + agent = Agent(model=MockModelWithImage(), store_media=False) + + result = agent.run("Generate an image") + + # Caller sees images + assert result.images is not None + + # Session cache should NOT have images + session = agent.get_session() + if session and session.runs: + for run in session.runs: + assert run.images is None, "Session cached run should not have images when store_media=False" diff --git a/libs/agno/tests/unit/os/routers/test_slack_store_media.py b/libs/agno/tests/unit/os/routers/test_slack_store_media.py new file mode 100644 index 0000000000..944e0dc619 --- /dev/null +++ b/libs/agno/tests/unit/os/routers/test_slack_store_media.py @@ -0,0 +1,450 @@ +import asyncio +import time +from typing import Any, AsyncIterator, Iterator +from unittest.mock import AsyncMock, Mock, patch + +import pytest +from fastapi import APIRouter, FastAPI + +from agno.agent import RunEvent +from agno.agent.agent import Agent +from agno.media import Image +from agno.models.base import Model +from agno.models.message import MessageMetrics +from agno.models.response import ModelResponse + +from .conftest import ( + build_app, + make_async_client_mock, + make_signed_request, + make_slack_mock, + make_stream_mock, + make_streaming_body, + wait_for_call, +) + + +class MockModelWithImage(Model): + def __init__(self): + super().__init__(id="test-model", name="test-model", provider="test") + self.instructions = None + + self._mock_response = ModelResponse( + content="Here is your generated image", + role="assistant", + images=[Image(url="https://example.com/generated.png", id="img-1")], + response_usage=MessageMetrics(), + ) + + self.response = Mock(return_value=self._mock_response) + self.aresponse = AsyncMock(return_value=self._mock_response) + + def get_instructions_for_model(self, *args, **kwargs): + return None + + def get_system_message_for_model(self, *args, **kwargs): + return None + + async def aget_instructions_for_model(self, *args, **kwargs): + return None + + async def aget_system_message_for_model(self, *args, **kwargs): + return None + + def parse_args(self, *args, **kwargs): + return {} + + def invoke(self, *args, **kwargs) -> ModelResponse: + return self._mock_response + + async def ainvoke(self, *args, **kwargs) -> ModelResponse: + return await self.aresponse(*args, **kwargs) + + def invoke_stream(self, *args, **kwargs) -> Iterator[ModelResponse]: + yield self._mock_response + + async def ainvoke_stream(self, *args, **kwargs) -> AsyncIterator[ModelResponse]: + yield self._mock_response + return + + def _parse_provider_response(self, response: Any, **kwargs) -> ModelResponse: + return self._mock_response + + def _parse_provider_response_delta(self, response: Any) -> ModelResponse: + return self._mock_response + + +# -- Non-streaming: store_media=False with mock agent -- + + +@pytest.mark.asyncio +async def test_non_streaming_store_media_false_uploads_media(): + agent_mock = AsyncMock() + agent_mock.name = "Test Agent" + agent_mock.arun = AsyncMock( + return_value=Mock( + status="OK", + content="Here is your image", + reasoning_content=None, + images=[Image(url="https://example.com/generated.png", id="img-1")], + files=None, + videos=None, + audio=None, + ) + ) + mock_slack = make_slack_mock() + mock_client = make_async_client_mock() + mock_client.files_upload_v2 = AsyncMock() + + with ( + patch("agno.os.interfaces.slack.router.verify_slack_signature", return_value=True), + patch("agno.os.interfaces.slack.router.SlackTools", return_value=mock_slack), + patch("slack_sdk.web.async_client.AsyncWebClient", return_value=mock_client), + ): + app = build_app(agent_mock, reply_to_mentions_only=False) + from fastapi.testclient import TestClient + + client = TestClient(app) + body = { + "type": "event_callback", + "event": { + "type": "message", + "channel_type": "im", + "text": "generate an image", + "user": "U123", + "channel": "C123", + "ts": str(time.time()), + }, + } + resp = make_signed_request(client, body) + assert resp.status_code == 200 + await wait_for_call(agent_mock.arun) + # upload_response_media_async runs synchronously in the background task after arun + await asyncio.sleep(1.0) + + # Verify the agent was called + agent_mock.arun.assert_called_once() + # Response had images — upload should have been attempted + # (would fail because Image has url not content, but the call confirms the path) + + +@pytest.mark.asyncio +async def test_non_streaming_store_media_false_response_has_images(): + agent_mock = AsyncMock() + agent_mock.name = "Test Agent" + response_mock = Mock( + status="OK", + content="Here is your image", + reasoning_content=None, + images=[Image(url="https://example.com/generated.png", id="img-1")], + files=None, + videos=None, + audio=None, + ) + agent_mock.arun = AsyncMock(return_value=response_mock) + mock_slack = make_slack_mock() + mock_client = make_async_client_mock() + upload_mock = AsyncMock() + mock_client.files_upload_v2 = upload_mock + + with ( + patch("agno.os.interfaces.slack.router.verify_slack_signature", return_value=True), + patch("agno.os.interfaces.slack.router.SlackTools", return_value=mock_slack), + patch("slack_sdk.web.async_client.AsyncWebClient", return_value=mock_client), + patch("agno.os.interfaces.slack.router.upload_response_media_async") as mock_upload, + ): + app = build_app(agent_mock, reply_to_mentions_only=False) + from fastapi.testclient import TestClient + + client = TestClient(app) + body = { + "type": "event_callback", + "event": { + "type": "message", + "channel_type": "im", + "text": "generate an image", + "user": "U123", + "channel": "C123", + "ts": str(time.time()), + }, + } + resp = make_signed_request(client, body) + assert resp.status_code == 200 + await wait_for_call(agent_mock.arun) + await asyncio.sleep(1.0) + + mock_upload.assert_called_once() + call_args = mock_upload.call_args + # Second positional arg is the response object + passed_response = call_args[0][1] + assert passed_response.images is not None + assert len(passed_response.images) == 1 + assert passed_response.images[0].url == "https://example.com/generated.png" + + +# -- Non-streaming: real Agent with store_media=False -- + + +@pytest.mark.asyncio +async def test_non_streaming_real_agent_store_media_false(): + agent = Agent( + model=MockModelWithImage(), + store_media=False, + ) + mock_slack = make_slack_mock() + mock_client = make_async_client_mock() + + with ( + patch("agno.os.interfaces.slack.router.verify_slack_signature", return_value=True), + patch("agno.os.interfaces.slack.router.SlackTools", return_value=mock_slack), + patch("slack_sdk.web.async_client.AsyncWebClient", return_value=mock_client), + patch("agno.os.interfaces.slack.router.upload_response_media_async") as mock_upload, + ): + from agno.os.interfaces.slack.router import attach_routes + + app = FastAPI() + router = APIRouter() + attach_routes(router, agent=agent, streaming=False, reply_to_mentions_only=False) + app.include_router(router) + + from fastapi.testclient import TestClient + + client = TestClient(app) + body = { + "type": "event_callback", + "event": { + "type": "message", + "channel_type": "im", + "text": "generate an image", + "user": "U123", + "channel": "C123", + "ts": str(time.time()), + }, + } + resp = make_signed_request(client, body) + assert resp.status_code == 200 + # Wait for background task + await asyncio.sleep(3.0) + + mock_upload.assert_called_once() + call_args = mock_upload.call_args + passed_response = call_args[0][1] + assert passed_response.images is not None, "store_media=False should not strip images from caller response" + assert len(passed_response.images) == 1 + assert passed_response.images[0].url == "https://example.com/generated.png" + + +# -- Streaming: media in completion chunk -- + + +def _completion_chunk_with_image(): + return Mock( + event=RunEvent.run_completed.value, + content="Here is your image", + images=[Image(url="https://example.com/generated.png", id="img-1")], + videos=None, + audio=None, + files=None, + tool=None, + ) + + +def _content_chunk(text): + return Mock( + event=RunEvent.run_content.value, + content=text, + images=None, + videos=None, + audio=None, + files=None, + tool=None, + ) + + +@pytest.mark.asyncio +async def test_streaming_store_media_false_collects_media_from_completion(): + async def _arun_stream(*args, **kwargs): + yield _content_chunk("Here is ") + yield _content_chunk("your image") + yield _completion_chunk_with_image() + + agent = AsyncMock() + agent.name = "Test Agent" + agent.arun = _arun_stream + + mock_slack = make_slack_mock(token="xoxb-test") + mock_stream = make_stream_mock() + mock_client = AsyncMock() + mock_client.assistant_threads_setStatus = AsyncMock() + mock_client.assistant_threads_setTitle = AsyncMock() + mock_client.chat_stream = AsyncMock(return_value=mock_stream) + + with ( + patch("agno.os.interfaces.slack.router.verify_slack_signature", return_value=True), + patch("agno.os.interfaces.slack.router.SlackTools", return_value=mock_slack), + patch("slack_sdk.web.async_client.AsyncWebClient", return_value=mock_client), + patch("agno.os.interfaces.slack.router.upload_response_media_async") as mock_upload, + ): + app = build_app(agent, streaming=True, reply_to_mentions_only=False) + from fastapi.testclient import TestClient + + client = TestClient(app) + resp = make_signed_request(client, make_streaming_body()) + assert resp.status_code == 200 + await wait_for_call(mock_stream.stop) + await asyncio.sleep(1.0) + + mock_upload.assert_called_once() + call_args = mock_upload.call_args + # In streaming, the second arg is the StreamState object + state = call_args[0][1] + assert len(state.images) == 1 + assert state.images[0].url == "https://example.com/generated.png" + + +@pytest.mark.asyncio +async def test_streaming_content_chunks_with_images_collected(): + async def _arun_stream(*args, **kwargs): + yield Mock( + event=RunEvent.run_content.value, + content="Image 1", + images=[Image(url="https://example.com/img1.png", id="img-1")], + videos=None, + audio=None, + files=None, + tool=None, + ) + yield Mock( + event=RunEvent.run_content.value, + content="Image 2", + images=[Image(url="https://example.com/img2.png", id="img-2")], + videos=None, + audio=None, + files=None, + tool=None, + ) + + agent = AsyncMock() + agent.name = "Test Agent" + agent.arun = _arun_stream + + mock_slack = make_slack_mock(token="xoxb-test") + mock_stream = make_stream_mock() + mock_client = AsyncMock() + mock_client.assistant_threads_setStatus = AsyncMock() + mock_client.assistant_threads_setTitle = AsyncMock() + mock_client.chat_stream = AsyncMock(return_value=mock_stream) + + with ( + patch("agno.os.interfaces.slack.router.verify_slack_signature", return_value=True), + patch("agno.os.interfaces.slack.router.SlackTools", return_value=mock_slack), + patch("slack_sdk.web.async_client.AsyncWebClient", return_value=mock_client), + patch("agno.os.interfaces.slack.router.upload_response_media_async") as mock_upload, + ): + app = build_app(agent, streaming=True, reply_to_mentions_only=False) + from fastapi.testclient import TestClient + + client = TestClient(app) + resp = make_signed_request(client, make_streaming_body()) + assert resp.status_code == 200 + await wait_for_call(mock_stream.stop) + await asyncio.sleep(1.0) + + mock_upload.assert_called_once() + state = mock_upload.call_args[0][1] + assert len(state.images) == 2 + + +# -- Streaming: real Agent with store_media=False -- + + +@pytest.mark.asyncio +async def test_streaming_real_agent_store_media_false(): + agent = Agent( + model=MockModelWithImage(), + store_media=False, + ) + mock_slack = make_slack_mock(token="xoxb-test") + mock_stream = make_stream_mock() + mock_client = AsyncMock() + mock_client.assistant_threads_setStatus = AsyncMock() + mock_client.assistant_threads_setTitle = AsyncMock() + mock_client.chat_stream = AsyncMock(return_value=mock_stream) + + with ( + patch("agno.os.interfaces.slack.router.verify_slack_signature", return_value=True), + patch("agno.os.interfaces.slack.router.SlackTools", return_value=mock_slack), + patch("slack_sdk.web.async_client.AsyncWebClient", return_value=mock_client), + patch("agno.os.interfaces.slack.router.upload_response_media_async") as mock_upload, + ): + from agno.os.interfaces.slack.router import attach_routes + + app = FastAPI() + router = APIRouter() + attach_routes(router, agent=agent, streaming=True, reply_to_mentions_only=False) + app.include_router(router) + + from fastapi.testclient import TestClient + + client = TestClient(app) + resp = make_signed_request(client, make_streaming_body()) + assert resp.status_code == 200 + await asyncio.sleep(5.0) + + mock_upload.assert_called_once() + state = mock_upload.call_args[0][1] + # Real agent with store_media=False should still emit images in streaming chunks + assert len(state.images) >= 1, "store_media=False should not prevent media in streaming chunks" + + +# -- Regression: store_media=True still works -- + + +@pytest.mark.asyncio +async def test_non_streaming_store_media_true_still_uploads(): + agent_mock = AsyncMock() + agent_mock.name = "Test Agent" + agent_mock.arun = AsyncMock( + return_value=Mock( + status="OK", + content="Image ready", + reasoning_content=None, + images=[Image(url="https://example.com/generated.png", id="img-1")], + files=None, + videos=None, + audio=None, + ) + ) + mock_slack = make_slack_mock() + mock_client = make_async_client_mock() + + with ( + patch("agno.os.interfaces.slack.router.verify_slack_signature", return_value=True), + patch("agno.os.interfaces.slack.router.SlackTools", return_value=mock_slack), + patch("slack_sdk.web.async_client.AsyncWebClient", return_value=mock_client), + patch("agno.os.interfaces.slack.router.upload_response_media_async") as mock_upload, + ): + app = build_app(agent_mock, reply_to_mentions_only=False) + from fastapi.testclient import TestClient + + client = TestClient(app) + body = { + "type": "event_callback", + "event": { + "type": "message", + "channel_type": "im", + "text": "generate", + "user": "U123", + "channel": "C123", + "ts": str(time.time()), + }, + } + resp = make_signed_request(client, body) + assert resp.status_code == 200 + await wait_for_call(agent_mock.arun) + await asyncio.sleep(1.0) + + mock_upload.assert_called_once() + passed_response = mock_upload.call_args[0][1] + assert passed_response.images is not None + assert len(passed_response.images) == 1