Skip to content

Commit 6e29ec0

Browse files
committed
refactor(ve_identity): introduce BaseRunProcessor to decouple Agent Identity dependency
## Problem Agent class had a hard dependency on Agent Identity's AuthRequestProcessor, causing all agents to depend on Agent Identity even when authentication is not needed. This violates the Dependency Inversion Principle and could break existing agents. ## Solution Introduce an abstract processor layer following the Dependency Inversion Principle: 1. Created `veadk/processors/` package with: - `BaseRunProcessor`: Abstract base class for runtime processors - `NoOpRunProcessor`: Default no-op implementation 2. Modified `veadk/agent.py`: - Added `run_processor: Optional[BaseRunProcessor]` field - Changed dependency from AuthRequestProcessor to BaseRunProcessor - Initialize with NoOpRunProcessor by default - Updated `run()` method parameter from `auth_request_processor` to `run_processor` 3. Updated `veadk/integrations/ve_identity/auth_processor.py`: - Made AuthRequestProcessor inherit from BaseRunProcessor - Renamed main method from `with_auth_loop` to `process_run`
1 parent 1ba91e8 commit 6e29ec0

File tree

4 files changed

+211
-26
lines changed

4 files changed

+211
-26
lines changed

veadk/agent.py

Lines changed: 47 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
DEFAULT_MODEL_EXTRA_CONFIG,
3434
)
3535
from veadk.evaluation import EvalSetRecorder
36-
from veadk.integrations.ve_identity import AuthRequestProcessor
36+
from veadk.processors import BaseRunProcessor, NoOpRunProcessor
3737
from veadk.knowledgebase import KnowledgeBase
3838
from veadk.memory.long_term_memory import LongTermMemory
3939
from veadk.memory.short_term_memory import ShortTermMemory
@@ -98,9 +98,33 @@ class Agent(LlmAgent):
9898
tracers: list[BaseTracer] = []
9999
"""The tracers provided to agent."""
100100

101+
run_processor: Optional[BaseRunProcessor] = Field(default=None, exclude=True)
102+
"""Optional run processor for intercepting and processing agent execution flows.
103+
104+
The run processor can be used to implement cross-cutting concerns such as:
105+
- Authentication flows (e.g., OAuth2 via VeIdentity)
106+
- Request/response logging
107+
- Error handling and retry logic
108+
- Performance monitoring
109+
110+
If not provided, a NoOpRunProcessor will be used by default.
111+
112+
Example:
113+
from veadk.integrations.ve_identity import AuthRequestProcessor
114+
115+
agent = Agent(
116+
name="my-agent",
117+
run_processor=AuthRequestProcessor()
118+
)
119+
"""
120+
101121
def model_post_init(self, __context: Any) -> None:
102122
super().model_post_init(None) # for sub_agents init
103123

124+
# Initialize run_processor if not provided
125+
if self.run_processor is None:
126+
self.run_processor = NoOpRunProcessor()
127+
104128
# combine user model config with VeADK defaults
105129
headers = DEFAULT_MODEL_EXTRA_CONFIG["extra_headers"].copy()
106130
body = DEFAULT_MODEL_EXTRA_CONFIG["extra_body"].copy()
@@ -168,11 +192,27 @@ async def _run(
168192
session_id: str,
169193
message: types.Content,
170194
stream: bool,
171-
auth_request_processor: AuthRequestProcessor,
195+
run_processor: Optional[BaseRunProcessor] = None,
172196
):
197+
"""Internal run method with run processor support.
198+
199+
Args:
200+
runner: The Runner instance.
201+
user_id: User ID for the session.
202+
session_id: Session ID.
203+
message: The message to send.
204+
stream: Whether to stream the output.
205+
run_processor: Optional run processor to use. If not provided, uses self.run_processor.
206+
207+
Returns:
208+
The final output string.
209+
"""
173210
stream_mode = StreamingMode.SSE if stream else StreamingMode.NONE
174211

175-
@auth_request_processor.with_auth_loop(runner=runner, message=message)
212+
# Use provided run_processor or fall back to instance's run_processor
213+
processor = run_processor or self.run_processor
214+
215+
@processor.process_run(runner=runner, message=message)
176216
async def event_generator():
177217
async for event in runner.run_async(
178218
user_id=user_id,
@@ -248,7 +288,7 @@ async def run(
248288
collect_runtime_data: bool = False,
249289
eval_set_id: str = "",
250290
save_session_to_memory: bool = False,
251-
auth_request_processor: AuthRequestProcessor = AuthRequestProcessor(),
291+
run_processor: Optional[BaseRunProcessor] = None,
252292
):
253293
"""Running the agent. The runner and session service will be created automatically.
254294
@@ -265,6 +305,8 @@ async def run(
265305
collect_runtime_data (bool, optional): Whether to collect runtime data. Defaults to False.
266306
eval_set_id (str, optional): The id of the eval set. Defaults to "".
267307
save_session_to_memory (bool, optional): Whether to save this turn session to memory. Defaults to False.
308+
run_processor (Optional[BaseRunProcessor], optional): Optional run processor to use for this run.
309+
If not provided, uses the agent's default run_processor. Defaults to None.
268310
"""
269311

270312
logger.warning(
@@ -299,7 +341,7 @@ async def run(
299341
for _prompt in prompt:
300342
message = types.Content(role="user", parts=[types.Part(text=_prompt)])
301343
final_output = await self._run(
302-
runner, user_id, session_id, message, stream, auth_request_processor
344+
runner, user_id, session_id, message, stream, run_processor
303345
)
304346

305347
# VeADK features

veadk/integrations/ve_identity/auth_processor.py

Lines changed: 17 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,13 @@
1919
import asyncio
2020
import json
2121
import time
22-
from typing import Awaitable, Callable, Optional, TYPE_CHECKING
22+
from typing import Any, Awaitable, Callable, Optional, TYPE_CHECKING
2323

2424
from google.adk.auth import AuthConfig
2525
from google.genai import types
2626
from google.adk.auth.auth_credential import OAuth2Auth
2727

28+
from veadk.processors.base_run_processor import BaseRunProcessor
2829
from veadk.integrations.ve_identity.identity_client import IdentityClient
2930
from veadk.integrations.ve_identity.models import AuthRequestConfig, OAuth2AuthPoller
3031
from veadk.integrations.ve_identity.utils import (
@@ -155,19 +156,7 @@ async def poll_for_auth(self) -> OAuth2Auth:
155156
)
156157

157158

158-
class _NoOpAuthProcessor:
159-
"""No-op auth processor that doesn't modify the event generator."""
160-
161-
def with_auth_loop(self, runner, message):
162-
"""Return a decorator that does nothing."""
163-
164-
def decorator(func):
165-
return func
166-
167-
return decorator
168-
169-
170-
class AuthRequestProcessor:
159+
class AuthRequestProcessor(BaseRunProcessor):
171160
"""Processor for handling authentication requests in agent conversations.
172161
173162
This class manages the OAuth2 authentication flow when tools require user authorization.
@@ -277,19 +266,22 @@ async def process_auth_request(
277266
logger.info(f"Auth request {auth_request_event_id} processed successfully")
278267
return auth_content
279268

280-
def with_auth_loop(
269+
def process_run(
281270
self,
282271
runner: Runner,
283272
message: types.Content,
284-
task_updater: Optional[TaskUpdater] = None,
273+
**kwargs: Any,
285274
):
286-
"""Decorator to add authentication loop handling to event generators.
275+
"""Process the agent run by wrapping the event generator with authentication loop.
276+
277+
This method implements the BaseRunProcessor interface and adds authentication
278+
loop handling to event generators.
287279
288280
This decorator intercepts runner.run_async calls and automatically handles
289281
authentication loops. The event_generator code can remain completely unchanged!
290282
291283
Usage example:
292-
@auth_processor.with_auth_loop(
284+
@auth_processor.process_run(
293285
runner=runner,
294286
message=message,
295287
)
@@ -319,10 +311,14 @@ async def event_generator():
319311
Args:
320312
runner: Runner instance (will be wrapped).
321313
message: Initial message to send.
314+
**kwargs: Additional keyword arguments. Supports:
315+
- task_updater: Optional TaskUpdater for status updates.
322316
323317
Returns:
324318
Decorated generator function.
325319
"""
320+
# Extract task_updater from kwargs
321+
task_updater = kwargs.get("task_updater")
326322

327323
def decorator(event_generator_func):
328324
async def wrapper():
@@ -338,13 +334,13 @@ async def wrapper():
338334
# Create a wrapped runner to intercept run_async calls
339335
original_run_async = runner.run_async
340336

341-
async def wrapped_run_async(**kwargs):
337+
async def wrapped_run_async(**run_kwargs):
342338
nonlocal auth_request_event_id, auth_config
343339

344340
# Override the message with the current message
345-
kwargs["new_message"] = current_message
341+
run_kwargs["new_message"] = current_message
346342

347-
async for event in original_run_async(**kwargs):
343+
async for event in original_run_async(**run_kwargs):
348344
# Detect authentication events
349345
if is_pending_auth_event(event):
350346
auth_request_event_id = get_function_call_id(event)

veadk/processors/__init__.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Copyright (c) 2025 Beijing Volcano Engine Technology Co., Ltd. and/or its affiliates.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Runtime processors for VeADK agents.
16+
17+
This module provides the abstract base classes for runtime processors that can
18+
intercept and process agent execution flows.
19+
"""
20+
21+
from veadk.processors.base_run_processor import BaseRunProcessor, NoOpRunProcessor
22+
23+
__all__ = [
24+
"BaseRunProcessor",
25+
"NoOpRunProcessor",
26+
]
27+
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
# Copyright (c) 2025 Beijing Volcano Engine Technology Co., Ltd. and/or its affiliates.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Base run processor for intercepting and processing agent execution."""
16+
17+
from __future__ import annotations
18+
19+
from abc import ABC, abstractmethod
20+
from typing import TYPE_CHECKING, Any, AsyncGenerator, Callable
21+
22+
if TYPE_CHECKING:
23+
from google.genai import types
24+
from veadk.runner import Runner
25+
26+
27+
class BaseRunProcessor(ABC):
28+
"""Abstract base class for runtime processors.
29+
30+
A run processor can intercept and modify the agent execution flow by wrapping
31+
the event generator function. This is useful for implementing cross-cutting
32+
concerns such as:
33+
- Authentication flows (e.g., OAuth2)
34+
- Request/response logging
35+
- Error handling and retry logic
36+
- Performance monitoring
37+
- Custom event filtering or transformation
38+
39+
The processor uses a decorator pattern to wrap the event generator, allowing
40+
it to:
41+
1. Intercept events from runner.run_async
42+
2. Process or modify events
43+
3. Inject additional events (e.g., authentication requests)
44+
4. Control the execution flow (e.g., retry loops)
45+
46+
Example:
47+
class MyProcessor(BaseRunProcessor):
48+
def process_run(self, runner, message, **kwargs):
49+
def decorator(event_generator_func):
50+
async def wrapper():
51+
# Pre-processing
52+
async for event in event_generator_func():
53+
# Process each event
54+
yield event
55+
# Post-processing
56+
return wrapper
57+
return decorator
58+
"""
59+
60+
@abstractmethod
61+
def process_run(
62+
self,
63+
runner: Runner,
64+
message: types.Content,
65+
**kwargs: Any,
66+
) -> Callable[[Callable[[], AsyncGenerator]], Callable[[], AsyncGenerator]]:
67+
"""Process the agent run by wrapping the event generator.
68+
69+
This method returns a decorator that wraps the event generator function.
70+
The decorator can intercept events, modify them, or inject new events.
71+
72+
Args:
73+
runner: The Runner instance executing the agent.
74+
message: The initial message to send to the agent.
75+
**kwargs: Additional keyword arguments that may be needed by specific
76+
implementations (e.g., task_updater for status updates).
77+
78+
Returns:
79+
A decorator function that takes an event generator function and returns
80+
a wrapped event generator function.
81+
82+
Example:
83+
@processor.process_run(runner=runner, message=message)
84+
async def event_generator():
85+
async for event in runner.run_async(...):
86+
yield event
87+
"""
88+
pass
89+
90+
91+
class NoOpRunProcessor(BaseRunProcessor):
92+
"""No-op run processor that doesn't modify the event generator.
93+
94+
This is the default processor used when no specific processing is needed.
95+
It simply passes through all events without any modification.
96+
"""
97+
98+
def process_run(
99+
self,
100+
runner: Runner,
101+
message: types.Content,
102+
**kwargs: Any,
103+
) -> Callable[[Callable[[], AsyncGenerator]], Callable[[], AsyncGenerator]]:
104+
"""Return a decorator that does nothing.
105+
106+
Args:
107+
runner: The Runner instance (unused).
108+
message: The initial message (unused).
109+
**kwargs: Additional keyword arguments (unused).
110+
111+
Returns:
112+
A decorator that returns the original function unchanged.
113+
"""
114+
115+
def decorator(
116+
event_generator_func: Callable[[], AsyncGenerator]
117+
) -> Callable[[], AsyncGenerator]:
118+
return event_generator_func
119+
120+
return decorator

0 commit comments

Comments
 (0)