Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
264 changes: 263 additions & 1 deletion veadk/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,28 @@


async def pre_run_process(self, process_func, new_message, user_id, session_id):
"""Pre-run hook invoked before agent execution.

Iterates over all ``parts`` of ``new_message`` and, when a ``part`` contains
``inline_data`` and uploading is enabled, calls ``process_func`` to process
the data (for example, upload to TOS and rewrite with an accessible URL).
Typically used together with the ``intercept_new_message`` decorator.

Args:
self: Runner instance.
process_func: An async processing function with a signature like
``(part, app_name, user_id, session_id)`` used to handle
``inline_data`` in the message (e.g., upload to TOS).
new_message (google.genai.types.Content): Incoming user message.
user_id (str): User identifier.
session_id (str): Session identifier.

Returns:
None

Raises:
Exception: Propagated if ``process_func`` raises and does not handle it.
"""
if new_message.parts:
for part in new_message.parts:
if part.inline_data and self.upload_inline_data_to_tos:
Expand All @@ -60,10 +82,42 @@ async def pre_run_process(self, process_func, new_message, user_id, session_id):


def post_run_process(self):
"""Post-run hook executed after agent run.

This is currently a no-op placeholder and can be extended to perform
cleanup or finalize logic after a run.

Args:
self: Runner instance.

Returns:
None

Raises:
None
"""
return


def intercept_new_message(process_func):
"""Create a decorator to insert pre/post hooks around ``run_async`` calls.

Internally it invokes :func:`pre_run_process` to preprocess the incoming
message (e.g., upload image/video inline data to TOS), then iterates the
underlying event stream and finally calls :func:`post_run_process`.

Args:
process_func: Async function used to process ``inline_data`` (typically
``_upload_image_to_tos``).

Returns:
Callable: A decorator that can wrap ``run_async``.

Raises:
Exception: May propagate exceptions raised by the wrapped function or
the pre-processing step.
"""

def decorator(func):
@functools.wraps(func)
async def wrapper(
Expand Down Expand Up @@ -97,7 +151,34 @@ def _convert_messages(
user_id: str,
session_id: str,
) -> list:
"""Convert VeADK formatted messages to Google ADK formatted messages."""
"""Convert a VeADK ``RunnerMessage`` into a list of Google ADK messages.

Supported inputs:
- ``str``: Single-turn text prompt.
- :class:`veadk.types.MediaMessage`: Single-turn multimodal prompt (text + image/video).
- ``list``: A list of the above types (multi-turn with mixed text and multimodal).

For multimodal inputs, this reads the local media file bytes and detects
the MIME type via ``filetype``; only ``image/*`` and ``video/*`` are supported.

Args:
messages (RunnerMessage): Input message or list of messages to convert.
app_name (str): App name (not directly used; kept for consistency with upload path).
user_id (str): User ID (not directly used; kept for consistency with upload path).
session_id (str): Session ID (not directly used; kept for consistency with upload path).

Returns:
list[google.genai.types.Content]: Converted ADK messages.

Raises:
ValueError: If the message type is unknown or media type is unrecognized.
AssertionError: If the media MIME type is not supported (only image/* and video/*).

Note:
This function only performs structural conversion. To upload inline media
to an object store and rewrite URLs, use it together with
``intercept_new_message`` and ``_upload_image_to_tos``.
"""
if isinstance(messages, str):
_messages = [types.Content(role="user", parts=[types.Part(text=messages)])]
elif isinstance(messages, MediaMessage):
Expand Down Expand Up @@ -146,6 +227,25 @@ def _convert_messages(
async def _upload_image_to_tos(
part: genai.types.Part, app_name: str, user_id: str, session_id: str
) -> None:
"""Upload inline media data in a message part to TOS and rewrite its URL.

When ``part.inline_data`` has both ``display_name`` (original filename) and
``data`` (bytes), it generates an object storage path based on
``app_name``, ``user_id`` and ``session_id``. After upload, it replaces
``display_name`` with a signed URL.

Args:
part (google.genai.types.Part): Message part containing ``inline_data``.
app_name (str): App name.
user_id (str): User ID.
session_id (str): Session ID.

Returns:
None

Raises:
None: All exceptions are caught and logged; nothing is propagated.
"""
try:
if part.inline_data and part.inline_data.display_name and part.inline_data.data:
from veadk.integrations.ve_tos.ve_tos import VeTOS
Expand All @@ -164,6 +264,51 @@ async def _upload_image_to_tos(


class Runner(ADKRunner):
"""VeADK Runner that augments ADK with session, memory, tracing, and media upload.

This class builds on Google ADK's ``Runner`` and adds:
- Integration with short-term memory (ShortTermMemory) for auto session management.
- Optional long-term memory integration and session persistence.
- New message interception and media upload to TOS.
- Tracing dump and Trace ID logging.
- A simplified ``run`` entry that supports multi-turn text/multimodal inputs.

Attributes:
user_id (str): Default user ID.
long_term_memory: Long-term memory service instance, or ``None`` if not set.
short_term_memory (veadk.memory.short_term_memory.ShortTermMemory | None):
Short-term memory instance used to auto-create/manage sessions.
upload_inline_data_to_tos (bool): Whether to upload inline media to TOS while running.
session_service: Session service instance (may come from short-term memory).
memory_service: Memory service instance (may come from agent's long-term memory).
app_name (str): Application name used in session management and object pathing.

Examples:
Create a runner and perform a text-only interaction:

```python
from veadk.runner import Runner
from veadk.agent import Agent # assume it's properly constructed
runner = Runner(agent=my_agent, app_name="demo_app", user_id="u1")
output = await runner.run("Hello")
print(output)
```

Send multimodal (text + image):

```python
from veadk.types import MediaMessage
msg = MediaMessage(text="Describe the image", media="/path/to/image.png")
output = await runner.run(msg, upload_inline_data_to_tos=True)
print(output)
```

Note:
This class wraps the parent ``run_async`` at initialization to insert media
upload and post-run handling. If you override the underlying ``run_async``,
ensure it remains compatible with this interception logic.
"""

def __init__(
self,
agent: BaseAgent | Agent,
Expand All @@ -174,6 +319,33 @@ def __init__(
*args,
**kwargs,
) -> None:
"""Initialize a Runner instance.

Selects the session service based on provided short-term memory or an
external ``session_service``. If long-term memory or an external
``memory_service`` is provided, the passed service is preferred. After
construction, it injects a message interception layer into the parent's
``run_async`` to support inline media upload and post-run handling.

Args:
agent (google.adk.agents.base_agent.BaseAgent | veadk.agent.Agent):
The agent instance used to run interactions.
short_term_memory (ShortTermMemory | None): Optional short-term memory; if
not provided and no external ``session_service`` is supplied, an in-memory
session service will be created.
app_name (str): Application name. Defaults to ``"veadk_default_app"``.
user_id (str): Default user ID. Defaults to ``"veadk_default_user"``.
upload_inline_data_to_tos (bool): Whether to enable inline media upload. Defaults to ``False``.
*args: Positional args passed through to ``ADKRunner``.
**kwargs: Keyword args passed through to ``ADKRunner``; may include
``session_service`` and ``memory_service`` to override defaults.

Returns:
None

Raises:
None
"""
self.user_id = user_id
self.long_term_memory = None
self.short_term_memory = short_term_memory
Expand Down Expand Up @@ -238,6 +410,30 @@ async def run(
save_tracing_data: bool = False,
upload_inline_data_to_tos: bool = False,
):
"""Run a conversation with multi-turn text and multimodal inputs.

When short-term memory is configured, a session is auto-created as needed.
Inputs are converted into ADK message format. If ``upload_inline_data_to_tos``
is ``True``, media upload is enabled temporarily for this run (does not change
the Runner's global setting).

Args:
messages (RunnerMessage): Input messages (``str``, ``MediaMessage`` or a list of them).
user_id (str): Override default user ID; if empty, uses the constructed ``user_id``.
session_id (str): Session ID. Defaults to a timestamp-based temporary ID.
run_config (google.adk.agents.RunConfig | None): Run config; if ``None``, a default
config is created using the environment var ``MODEL_AGENT_MAX_LLM_CALLS``.
save_tracing_data (bool): Whether to dump tracing data to disk after the run. Defaults to ``False``.
upload_inline_data_to_tos (bool): Whether to enable media upload only for this run. Defaults to ``False``.

Returns:
str: The textual output from the last event, if present; otherwise an empty string.

Raises:
ValueError: If an input contains an unsupported or unrecognized media type.
AssertionError: If a media MIME type is not among ``image/*`` or ``video/*``.
Exception: Exceptions from the underlying ADK/Agent execution may propagate.
"""
if upload_inline_data_to_tos:
_upload_inline_data_to_tos = self.upload_inline_data_to_tos
self.upload_inline_data_to_tos = upload_inline_data_to_tos
Expand Down Expand Up @@ -302,6 +498,17 @@ async def run(
return final_output

def get_trace_id(self) -> str:
"""Get the Trace ID from the current agent's tracer.

If the agent is not a :class:`veadk.agent.Agent` or no tracer is configured,
returns ``"<unknown_trace_id>"``.

Returns:
str: The Trace ID or ``"<unknown_trace_id>"``.

Raises:
None
"""
if not isinstance(self.agent, Agent):
logger.warning(
("The agent is not an instance of VeADK Agent, no trace id provided.")
Expand All @@ -322,6 +529,17 @@ def get_trace_id(self) -> str:
return "<unknown_trace_id>"

def _print_trace_id(self) -> None:
"""Log the current tracer's Trace ID.

If the agent is not a :class:`veadk.agent.Agent` or no tracer is configured,
nothing is printed.

Returns:
None

Raises:
None
"""
if not isinstance(self.agent, Agent):
logger.warning(
("The agent is not an instance of VeADK Agent, no trace id provided.")
Expand All @@ -342,6 +560,21 @@ def _print_trace_id(self) -> None:
return

def save_tracing_file(self, session_id: str) -> str:
"""Dump tracing data to disk and return the last written path.

Only effective when the agent is one of
Agent/SequentialAgent/ParallelAgent/LoopAgent and a tracer is configured;
otherwise returns an empty string.

Args:
session_id (str): Session ID used to associate the tracing with a session.

Returns:
str: The tracing file path; returns an empty string on failure or when no tracer.

Raises:
None: All errors are logged and an empty string is returned.
"""
if not isinstance(
self.agent, (Agent, SequentialAgent, ParallelAgent, LoopAgent)
):
Expand All @@ -367,6 +600,18 @@ def save_tracing_file(self, session_id: str) -> str:
return ""

async def save_eval_set(self, session_id: str, eval_set_id: str = "default") -> str:
"""Save the current session as part of an evaluation set and return its path.

Args:
session_id (str): Session ID.
eval_set_id (str): Evaluation set identifier. Defaults to ``"default"``.

Returns:
str: The exported evaluation set file path.

Raises:
Exception: Propagated if the underlying export logic raises.
"""
eval_set_recorder = EvalSetRecorder(self.session_service, eval_set_id)
eval_set_path = await eval_set_recorder.dump(
self.app_name, self.user_id, session_id
Expand All @@ -376,6 +621,23 @@ async def save_eval_set(self, session_id: str, eval_set_id: str = "default") ->
async def save_session_to_long_term_memory(
self, session_id: str, user_id: str = "", app_name: str = ""
) -> None:
"""Save the specified session to long-term memory.

If ``long_term_memory`` is not configured, the function logs a warning and returns.
It fetches the session from the session service and then calls the long-term memory's
``add_session_to_memory`` for persistence.

Args:
session_id (str): Session ID.
user_id (str): Optional; override default user ID. If empty, uses ``self.user_id``.
app_name (str): Optional; override default app name. If empty, uses ``self.app_name``.

Returns:
None

Raises:
Exception: May propagate if the underlying memory service raises during write.
"""
if not self.long_term_memory:
logger.warning("Long-term memory is not enabled. Failed to save session.")
return
Expand Down