Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions src/a2a/server/tasks/task_updater.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,30 @@ async def start_work(self, message: Message | None = None) -> None:
message=message,
)

async def cancel(self, message: Message | None = None) -> None:
"""Marks the task as cancelled and publishes a finalstatus update."""
await self.update_status(
TaskState.canceled, message=message, final=True
)

async def requires_input(
self, message: Message | None = None, final: bool = False
) -> None:
"""Marks the task as input required and publishes a status update."""
await self.update_status(
TaskState.input_required,
message=message,
final=final,
)

async def requires_auth(
self, message: Message | None = None, final: bool = False
) -> None:
"""Marks the task as auth required and publishes a status update."""
await self.update_status(
TaskState.auth_required, message=message, final=final
)

def new_agent_message(
self,
parts: list[Part],
Expand Down
148 changes: 148 additions & 0 deletions tests/server/tasks/test_task_updater.py
Original file line number Diff line number Diff line change
Expand Up @@ -324,3 +324,151 @@ async def test_reject_with_message(task_updater, event_queue, sample_message):
assert event.status.state == TaskState.rejected
assert event.final is True
assert event.status.message == sample_message


@pytest.mark.asyncio
async def test_requires_input_without_message(task_updater, event_queue):
"""Test marking a task as input required without a message."""
await task_updater.requires_input()

event_queue.enqueue_event.assert_called_once()
event = event_queue.enqueue_event.call_args[0][0]

assert isinstance(event, TaskStatusUpdateEvent)
assert event.status.state == TaskState.input_required
assert event.final is False
assert event.status.message is None


@pytest.mark.asyncio
async def test_requires_input_with_message(
task_updater, event_queue, sample_message
):
"""Test marking a task as input required with a message."""
await task_updater.requires_input(message=sample_message)

event_queue.enqueue_event.assert_called_once()
event = event_queue.enqueue_event.call_args[0][0]

assert isinstance(event, TaskStatusUpdateEvent)
assert event.status.state == TaskState.input_required
assert event.final is False
assert event.status.message == sample_message


@pytest.mark.asyncio
async def test_requires_input_final_true(task_updater, event_queue):
"""Test marking a task as input required with final=True."""
await task_updater.requires_input(final=True)

event_queue.enqueue_event.assert_called_once()
event = event_queue.enqueue_event.call_args[0][0]

assert isinstance(event, TaskStatusUpdateEvent)
assert event.status.state == TaskState.input_required
assert event.final is True
assert event.status.message is None


@pytest.mark.asyncio
async def test_requires_input_with_message_and_final(
task_updater, event_queue, sample_message
):
"""Test marking a task as input required with message and final=True."""
await task_updater.requires_input(message=sample_message, final=True)

event_queue.enqueue_event.assert_called_once()
event = event_queue.enqueue_event.call_args[0][0]

assert isinstance(event, TaskStatusUpdateEvent)
assert event.status.state == TaskState.input_required
assert event.final is True
assert event.status.message == sample_message


@pytest.mark.asyncio
async def test_requires_auth_without_message(task_updater, event_queue):
"""Test marking a task as auth required without a message."""
await task_updater.requires_auth()

event_queue.enqueue_event.assert_called_once()
event = event_queue.enqueue_event.call_args[0][0]

assert isinstance(event, TaskStatusUpdateEvent)
assert event.status.state == TaskState.auth_required
assert event.final is False
assert event.status.message is None


@pytest.mark.asyncio
async def test_requires_auth_with_message(
task_updater, event_queue, sample_message
):
"""Test marking a task as auth required with a message."""
await task_updater.requires_auth(message=sample_message)

event_queue.enqueue_event.assert_called_once()
event = event_queue.enqueue_event.call_args[0][0]

assert isinstance(event, TaskStatusUpdateEvent)
assert event.status.state == TaskState.auth_required
assert event.final is False
assert event.status.message == sample_message


@pytest.mark.asyncio
async def test_requires_auth_final_true(task_updater, event_queue):
"""Test marking a task as auth required with final=True."""
await task_updater.requires_auth(final=True)

event_queue.enqueue_event.assert_called_once()
event = event_queue.enqueue_event.call_args[0][0]

assert isinstance(event, TaskStatusUpdateEvent)
assert event.status.state == TaskState.auth_required
assert event.final is True
assert event.status.message is None


@pytest.mark.asyncio
async def test_requires_auth_with_message_and_final(
task_updater, event_queue, sample_message
):
"""Test marking a task as auth required with message and final=True."""
await task_updater.requires_auth(message=sample_message, final=True)

event_queue.enqueue_event.assert_called_once()
event = event_queue.enqueue_event.call_args[0][0]

assert isinstance(event, TaskStatusUpdateEvent)
assert event.status.state == TaskState.auth_required
assert event.final is True
assert event.status.message == sample_message


@pytest.mark.asyncio
async def test_cancel_without_message(task_updater, event_queue):
"""Test marking a task as cancelled without a message."""
await task_updater.cancel()

event_queue.enqueue_event.assert_called_once()
event = event_queue.enqueue_event.call_args[0][0]

assert isinstance(event, TaskStatusUpdateEvent)
assert event.status.state == TaskState.canceled
assert event.final is True
assert event.status.message is None


@pytest.mark.asyncio
async def test_cancel_with_message(task_updater, event_queue, sample_message):
"""Test marking a task as cancelled with a message."""
await task_updater.cancel(message=sample_message)

event_queue.enqueue_event.assert_called_once()
event = event_queue.enqueue_event.call_args[0][0]

assert isinstance(event, TaskStatusUpdateEvent)
assert event.status.state == TaskState.canceled
assert event.final is True
assert event.status.message == sample_message
Loading