5050from ..converters .request_converter import convert_a2a_request_to_agent_run_request
5151from ..converters .utils import _get_adk_metadata_key
5252from ..experimental import a2a_experimental
53+ from .config import ExecuteInterceptor
54+ from .executor_context import ExecutorContext
5355from .task_result_aggregator import TaskResultAggregator
56+ from .utils import execute_after_agent_interceptors
57+ from .utils import execute_after_event_interceptors
58+ from .utils import execute_before_agent_interceptors
5459
5560logger = logging .getLogger ('google_adk.' + __name__ )
5661
@@ -70,6 +75,8 @@ class A2aAgentExecutorConfig(BaseModel):
7075 )
7176 event_converter : AdkEventToA2AEventsConverter = convert_event_to_a2a_events
7277
78+ execute_interceptors : Optional [list [ExecuteInterceptor ]] = None
79+
7380
7481@a2a_experimental
7582class A2aAgentExecutor (AgentExecutor ):
@@ -135,6 +142,10 @@ async def execute(
135142 if not context .message :
136143 raise ValueError ('A2A request must have a message' )
137144
145+ context = await execute_before_agent_interceptors (
146+ context , self ._config .execute_interceptors
147+ )
148+
138149 # for new task, create a task submitted event
139150 if not context .current_task :
140151 await event_queue .enqueue_event (
@@ -202,6 +213,13 @@ async def _handle_request(
202213 run_config = run_request .run_config ,
203214 )
204215
216+ self ._executor_context = ExecutorContext (
217+ app_name = runner .app_name ,
218+ user_id = run_request .user_id ,
219+ session_id = run_request .session_id ,
220+ runner = runner ,
221+ )
222+
205223 # publish the task working event
206224 await event_queue .enqueue_event (
207225 TaskStatusUpdateEvent (
@@ -230,6 +248,15 @@ async def _handle_request(
230248 context .context_id ,
231249 self ._config .gen_ai_part_converter ,
232250 ):
251+ a2a_event = await execute_after_event_interceptors (
252+ a2a_event ,
253+ self ._executor_context ,
254+ adk_event ,
255+ self ._config .execute_interceptors ,
256+ )
257+ if a2a_event is None :
258+ continue
259+
233260 task_result_aggregator .process_event (a2a_event )
234261 await event_queue .enqueue_event (a2a_event )
235262
@@ -253,31 +280,34 @@ async def _handle_request(
253280 )
254281 )
255282 # public the final status update event
256- await event_queue .enqueue_event (
257- TaskStatusUpdateEvent (
258- task_id = context .task_id ,
259- status = TaskStatus (
260- state = TaskState .completed ,
261- timestamp = datetime .now (timezone .utc ).isoformat (),
262- ),
263- context_id = context .context_id ,
264- final = True ,
265- )
283+ final_event = TaskStatusUpdateEvent (
284+ task_id = context .task_id ,
285+ status = TaskStatus (
286+ state = TaskState .completed ,
287+ timestamp = datetime .now (timezone .utc ).isoformat (),
288+ ),
289+ context_id = context .context_id ,
290+ final = True ,
266291 )
267292 else :
268- await event_queue .enqueue_event (
269- TaskStatusUpdateEvent (
270- task_id = context .task_id ,
271- status = TaskStatus (
272- state = task_result_aggregator .task_state ,
273- timestamp = datetime .now (timezone .utc ).isoformat (),
274- message = task_result_aggregator .task_status_message ,
275- ),
276- context_id = context .context_id ,
277- final = True ,
278- )
293+ final_event = TaskStatusUpdateEvent (
294+ task_id = context .task_id ,
295+ status = TaskStatus (
296+ state = task_result_aggregator .task_state ,
297+ timestamp = datetime .now (timezone .utc ).isoformat (),
298+ message = task_result_aggregator .task_status_message ,
299+ ),
300+ context_id = context .context_id ,
301+ final = True ,
279302 )
280303
304+ final_event = await execute_after_agent_interceptors (
305+ self ._executor_context ,
306+ final_event ,
307+ self ._config .execute_interceptors ,
308+ )
309+ await event_queue .enqueue_event (final_event )
310+
281311 async def _prepare_session (
282312 self ,
283313 context : RequestContext ,
0 commit comments