Skip to content

Commit a12333e

Browse files
committed
Type fixes and message stream fixes
Change-Id: I6b16e04e3ba0e9c01ba2268ab144e943017a8f2c
1 parent f63189f commit a12333e

File tree

6 files changed

+56
-32
lines changed

6 files changed

+56
-32
lines changed

examples/langgraph/agent_executor.py

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,16 @@
1-
from typing import Any
2-
31
from agent import CurrencyAgent
42
from typing_extensions import override
53

64
from a2a.server.agent_execution import AgentExecutor, RequestContext
7-
from a2a.server.events.event_queue import EventQueue, Event
8-
from a2a.utils import new_agent_text_message, new_text_artifact, new_task
5+
from a2a.server.events.event_queue import EventQueue
96
from a2a.types import (
107
Task,
11-
TextPart,
12-
TaskStatusUpdateEvent,
138
TaskArtifactUpdateEvent,
14-
TaskStatus,
159
TaskState,
10+
TaskStatus,
11+
TaskStatusUpdateEvent,
1612
)
13+
from a2a.utils import new_agent_text_message, new_task, new_text_artifact
1714

1815

1916
class CurrencyAgentExecutor(AgentExecutor):
@@ -31,6 +28,9 @@ async def execute(
3128
query = context.get_user_input()
3229
task = context.current_task
3330

31+
if not context.message:
32+
raise Exception('No message provided')
33+
3434
if not task:
3535
task = new_task(context.message)
3636
event_queue.enqueue_event(task)
@@ -92,6 +92,7 @@ async def execute(
9292
)
9393

9494
@override
95-
async def cancel(self, request: RequestContext, event_queue: EventQueue) -> Task | None:
96-
raise Exception("cancel not supported")
97-
95+
async def cancel(
96+
self, request: RequestContext, event_queue: EventQueue
97+
) -> Task | None:
98+
raise Exception('cancel not supported')

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ requires-python = ">=3.13"
77
dependencies = [
88
"httpx>=0.28.1",
99
"httpx-sse>=0.4.0",
10+
"mypy>=1.15.0",
1011
"pydantic>=2.11.3",
1112
"sse-starlette>=2.3.3",
1213
"starlette>=0.46.2",

src/a2a/server/events/event_consumer.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,11 @@ async def consume_one(self) -> Event:
2929
logger.debug('Attempting to consume one event.')
3030
try:
3131
event = await self.queue.dequeue_event(no_wait=True)
32-
except asyncio.QueueEmpty:
32+
except asyncio.QueueEmpty as e:
3333
logger.warning('Event queue was empty in consume_one.')
3434
raise ServerError(
3535
InternalError(message='Agent did not return any response')
36-
) from None
36+
) from e
3737

3838
logger.debug(f'Dequeued event of type: {type(event)} in consume_one.')
3939

src/a2a/server/tasks/result_aggregator.py

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -28,22 +28,19 @@ class ResultAggregator:
2828

2929
def __init__(self, task_manager: TaskManager):
3030
self.task_manager = task_manager
31-
self._message = None
31+
self._message: Message | None = None
3232

3333
@property
34-
def current_result(self) -> Task | Message | None:
34+
async def current_result(self) -> Task | Message | None:
3535
if self._message:
3636
return self._message
37-
return self.task_manager.get_task()
37+
return await self.task_manager.get_task()
3838

3939
async def consume_and_emit(
4040
self, consumer: EventConsumer
4141
) -> AsyncGenerator[Event, None]:
4242
"""Processes the event stream and emits the same event stream out."""
4343
async for event in consumer.consume_all():
44-
if isinstance(event, Message):
45-
self._current_task_or_message = event
46-
break
4744
await self.task_manager.process(event)
4845
yield event
4946

@@ -53,17 +50,18 @@ async def consume_all(
5350
"""Processes the entire event stream and returns the final result."""
5451
async for event in consumer.consume_all():
5552
if isinstance(event, Message):
56-
self._current_task_or_message = event
53+
self._message = event
5754
return event
5855
await self.task_manager.process(event)
5956
return await self.task_manager.get_task()
6057

61-
async def consume_and_emit_task(
62-
self, consumer: EventConsumer
63-
) -> AsyncGenerator[Event, None]:
64-
"""Processes the event stream and emits the current state of the task."""
65-
async for event in consumer.consume_all():
66-
if isinstance(event, Message):
67-
self._current_task_or_message = event
68-
break
69-
yield await self.task_manager.process(event)
58+
# async def consume_and_emit_task(
59+
# self, consumer: EventConsumer
60+
# ) -> AsyncGenerator[Event, None]:
61+
# """Processes the event stream and emits the current state of the task."""
62+
# async for event in consumer.consume_all():
63+
# if isinstance(event, Message):
64+
# self._current_task_or_message = event
65+
# break
66+
# await self.task_manager.process(event)
67+
# return await self.task_manager.get_task()

src/a2a/server/tasks/task_manager.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -127,14 +127,17 @@ async def ensure_task(
127127

128128
return task
129129

130-
async def process(self, event: Event) -> Task | Message:
130+
async def process(self, event: Event) -> Event:
131131
"""Processes an event, store the task state and return the task or message.
132132
133133
The returned Task or Message represent the current status of the result.
134134
"""
135-
if isinstance(event, Message):
136-
return Message
137-
return await self.save_task_event(event)
135+
if isinstance(
136+
event, Task | TaskStatusUpdateEvent | TaskArtifactUpdateEvent
137+
):
138+
await self.save_task_event(event)
139+
140+
return event
138141

139142
def _init_task_obj(self, task_id: str, context_id: str) -> Task:
140143
"""Initializes a new task object."""

uv.lock

Lines changed: 21 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)