Skip to content

Commit 50afdd6

Browse files
kabirmaeste
andauthored
fix: Sending should happen in a background thread in the resubscribe test (#83)
Once we get the first evemt (the Task) the event queue will be open during the configurable sleepi that happens in the AgentExecutor. This is our window for when it is possible to resubscribe # Description Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly: - [x] Follow the [`CONTRIBUTING` Guide](../CONTRIBUTING.md). - [x] Make your Pull Request title in the <https://www.conventionalcommits.org/> specification. - Important Prefixes for [release-please](https://github.com/googleapis/release-please): - `fix:` which represents bug fixes, and correlates to a [SemVer](https://semver.org/) patch. - `feat:` represents a new feature, and correlates to a SemVer minor. - `feat!:`, or `fix!:`, `refactor!:`, etc., which represent a breaking change (indicated by the `!`) and will result in a SemVer major. - [x] Ensure the tests pass - [x] Appropriate READMEs were updated (if necessary) Fixes #<issue_number_goes_here> 🦕 --------- Co-authored-by: Stefano Maestri <stefano.maestri@javalinux.it>
1 parent b9a0343 commit 50afdd6

File tree

1 file changed

+122
-79
lines changed

1 file changed

+122
-79
lines changed

tests/optional/capabilities/test_streaming_methods.py

Lines changed: 122 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,16 @@
11
import asyncio
2-
import json
32
import logging
43
import os
5-
from typing import Any, AsyncGenerator, Dict, List, Optional
64
import uuid
75

86
import pytest
9-
import pytest_asyncio
107

118
from tck import agent_card_utils, config, message_utils
129
from tests.markers import optional_capability
1310
from tests.capability_validator import CapabilityValidator, skip_if_capability_not_declared
1411
from tests.utils.transport_helpers import (
1512
transport_send_streaming_message,
1613
transport_resubscribe_task,
17-
transport_send_message,
18-
extract_task_id_from_response,
1914
generate_test_message_id,
2015
)
2116

@@ -37,7 +32,6 @@
3732
"async_wait_for": BASE_TIMEOUT * 1.0, # 2.0s default (for asyncio.wait_for)
3833
}
3934

40-
4135
# SimpleSSEClient class removed - now using transport-agnostic streaming
4236

4337

@@ -230,47 +224,129 @@ async def test_tasks_resubscribe(sut_client, agent_card_data):
230224
if not validator.is_capability_declared("streaming"):
231225
pytest.skip("Streaming capability not declared - test not applicable")
232226

233-
# First, create a task via message/stream to get a task ID
234-
message_params = {
235-
"message": {
236-
"kind": "message",
237-
"messageId": generate_test_message_id("resubscribe"),
238-
"role": "user",
239-
"parts": [{"kind": "text", "text": "Test message for resubscribe"}],
240-
}
241-
}
242-
task_id = None # Initialize task_id
227+
# Synchronization events
228+
task_id_received = asyncio.Event()
243229

230+
task_id = None
231+
resubscribe_events = []
232+
stream_error = None
233+
resubscribe_error = None
234+
244235
try:
236+
# First, create a task via message/stream to get a task ID
237+
message_params = {
238+
"message": {
239+
"kind": "message",
240+
"messageId": "test-resubscribe-message-id-" + str(uuid.uuid4()),
241+
"role": "user",
242+
"parts": [{"kind": "text", "text": "Test message for resubscribe"}],
243+
}
244+
}
245+
245246
# Use transport-agnostic streaming message sending
246247
stream = transport_send_streaming_message(sut_client, message_params)
247-
248-
# Collect events to get task ID
249-
try:
250-
event_count = 0
251-
async for event in stream:
252-
event_count += 1
253-
logger.info(f"Processing streaming event #{event_count}: {event}")
254248

255-
# Look for task ID in the event
256-
if isinstance(event, dict):
257-
if "id" in event and "status" in event:
258-
# This looks like a Task object
259-
task_id = event["id"]
260-
logger.info(f"Captured task ID from stream: {task_id}")
249+
# Background task to process the initial stream
250+
async def process_initial_stream():
251+
nonlocal task_id, stream_error
252+
try:
253+
event_count = 0
254+
async for event in stream:
255+
event_count += 1
256+
logger.info(f"Processing streaming event #{event_count}: {event}")
257+
258+
# Look for task ID in the event
259+
if isinstance(event, dict):
260+
if "id" in event and "status" in event:
261+
# This looks like a Task object
262+
task_id = event["id"]
263+
logger.info(f"Captured task ID from stream: {task_id}")
264+
# Signal that task ID is available. This means that the event queue is open
265+
task_id_received.set()
266+
267+
# Safety break to prevent infinite loops
268+
if event_count >= 10:
269+
logger.warning("Hit event count limit while getting task ID")
261270
break
262271

263-
# Safety break to prevent infinite loops
264-
if event_count >= 10:
265-
logger.warning("Hit event count limit while getting task ID")
266-
break
272+
# Continue processing events even after getting task ID
273+
# to keep the stream alive for resubscribe testing
267274

268-
# Break after getting first event with task ID
269-
if task_id:
270-
break
271-
275+
except Exception as e:
276+
stream_error = e
277+
logger.error(f"Error in initial stream processing: {e}")
278+
task_id_received.set() # Signal completion even on error
279+
280+
# Background task to handle resubscription once task ID is available
281+
async def process_resubscribe():
282+
nonlocal resubscribe_events, resubscribe_error
283+
try:
284+
# Wait for task ID to be available
285+
await task_id_received.wait()
286+
287+
if task_id is None:
288+
logger.warning("No task ID available for resubscribe")
289+
return
290+
291+
logger.info(f"Starting resubscribe for task ID: {task_id}")
292+
293+
# Use transport-agnostic task resubscription
294+
resubscribe_stream = transport_resubscribe_task(sut_client, task_id)
295+
296+
event_count = 0
297+
async for event in resubscribe_stream:
298+
event_count += 1
299+
logger.info(f"Processing resubscribe event #{event_count}: {event}")
300+
resubscribe_events.append(event)
301+
302+
# Safety break to prevent infinite loops
303+
if event_count >= 10:
304+
logger.warning("Hit event count limit in resubscribe test, breaking")
305+
break
306+
307+
# Validate this is a proper A2A object
308+
assert isinstance(event, dict), "Resubscribe events must be objects"
309+
310+
# Check if this is a terminal event
311+
if "status" in event and isinstance(event["status"], dict):
312+
state = event["status"].get("state")
313+
if state in ["completed", "failed", "canceled"]:
314+
logger.info("Detected terminal event in resubscribe, ending stream processing.")
315+
break
316+
317+
# Collect a few events then break
318+
if len(resubscribe_events) >= 3:
319+
break
320+
321+
except Exception as e:
322+
resubscribe_error = e
323+
logger.error(f"Error in resubscribe processing: {e}")
324+
325+
# Start both background tasks
326+
initial_stream_task = asyncio.create_task(process_initial_stream())
327+
resubscribe_task = asyncio.create_task(process_resubscribe())
328+
329+
# Wait for both tasks to complete with timeout
330+
try:
331+
await asyncio.wait_for(
332+
asyncio.gather(initial_stream_task, resubscribe_task, return_exceptions=True),
333+
timeout=TIMEOUTS["async_wait_for"] * 2)
272334
except asyncio.TimeoutError:
273-
logger.warning("Timeout while getting task ID from initial stream")
335+
logger.warning("Timeout while waiting for stream processing and resubscribe")
336+
# Cancel tasks if they're still running
337+
initial_stream_task.cancel()
338+
resubscribe_task.cancel()
339+
340+
# Check for errors from the background tasks
341+
if stream_error:
342+
error_msg = str(stream_error).lower()
343+
if "501" in error_msg or "not implemented" in error_msg:
344+
pytest.fail(
345+
"Streaming capability declared in Agent Card but SUT returned error indicating not implemented. "
346+
"This is a specification violation - declared capabilities MUST be implemented."
347+
)
348+
else:
349+
raise stream_error
274350

275351
except Exception as e:
276352
error_msg = str(e).lower()
@@ -282,49 +358,13 @@ async def test_tasks_resubscribe(sut_client, agent_card_data):
282358
else:
283359
raise
284360

285-
# Now try to resubscribe to this task
361+
# Now validate the resubscribe results
286362
if task_id is None:
287363
pytest.skip("Could not capture task ID from initial stream - cannot test resubscribe")
288364

289-
try:
290-
# Use transport-agnostic task resubscription
291-
resubscribe_stream = transport_resubscribe_task(sut_client, task_id)
292-
events = []
293-
294-
try:
295-
event_count = 0
296-
async for event in resubscribe_stream:
297-
event_count += 1
298-
logger.info(f"Processing resubscribe event #{event_count}: {event}")
299-
events.append(event)
300-
301-
# Safety break to prevent infinite loops
302-
if event_count >= 10:
303-
logger.warning("Hit event count limit in resubscribe test, breaking")
304-
break
305-
306-
# Validate this is a proper A2A object
307-
assert isinstance(event, dict), "Resubscribe events must be objects"
308-
309-
# Check if this is a terminal event
310-
if "status" in event and isinstance(event["status"], dict):
311-
state = event["status"].get("state")
312-
if state in ["completed", "failed", "canceled"]:
313-
logger.info("Detected terminal event in resubscribe, ending stream processing.")
314-
break
315-
316-
# Collect a few events then break
317-
if len(events) >= 3:
318-
break
319-
320-
except asyncio.TimeoutError:
321-
logger.warning("Timeout while processing resubscribe stream")
322-
323-
# Validate that we got at least some events
324-
assert len(events) > 0, "Streaming capability declared but no events received from resubscribe stream"
325-
326-
except Exception as e:
327-
error_msg = str(e).lower()
365+
# Check for resubscribe errors
366+
if resubscribe_error:
367+
error_msg = str(resubscribe_error).lower()
328368
if "501" in error_msg or "not implemented" in error_msg:
329369
pytest.fail(
330370
"Streaming capability declared but tasks/resubscribe returned error indicating not implemented. "
@@ -333,7 +373,10 @@ async def test_tasks_resubscribe(sut_client, agent_card_data):
333373
elif "not found" in error_msg or "404" in error_msg:
334374
pytest.skip("Task expired before resubscribe test - this is implementation-dependent behavior")
335375
else:
336-
raise
376+
raise resubscribe_error
377+
378+
# Validate that we got at least some events from resubscribe
379+
assert len(resubscribe_events) > 0, "Streaming capability declared but no events received from resubscribe stream"
337380

338381

339382
@optional_capability

0 commit comments

Comments
 (0)