10
10
# pyright: reportUnknownVariableType=false
11
11
# pyright: reportUnknownArgumentType=false
12
12
13
+ import asyncio
13
14
import json
14
15
import multiprocessing
15
16
import socket
@@ -702,9 +703,6 @@ async def test_async_tools(server_transport: str, server_url: str) -> None:
702
703
assert async_result .operation is not None
703
704
token = async_result .operation .token
704
705
705
- # Poll for completion
706
- import asyncio
707
-
708
706
while True :
709
707
status = await session .get_operation_status (token )
710
708
if status .status == "completed" :
@@ -717,14 +715,51 @@ async def test_async_tools(server_transport: str, server_url: str) -> None:
717
715
break
718
716
elif status .status == "failed" :
719
717
pytest .fail (f"Async operation failed: { status .error } " )
720
- await asyncio .sleep (0.01 )
721
718
722
719
# Test hybrid tool (should work as sync by default)
723
720
hybrid_result = await session .call_tool ("hybrid_tool" , {"message" : "hello" })
724
721
assert len (hybrid_result .content ) == 1
725
722
assert isinstance (hybrid_result .content [0 ], TextContent )
726
723
assert "Hybrid result: HELLO" in hybrid_result .content [0 ].text
727
724
725
+ # Test long-running task with custom keep_alive
726
+ long_task_result = await session .call_tool ("long_running_task" , {"task_name" : "test_task" })
727
+ assert long_task_result .operation is not None
728
+ long_token = long_task_result .operation .token
729
+
730
+ while True :
731
+ status = await session .get_operation_status (long_token )
732
+ if status .status == "completed" :
733
+ final_result = await session .get_operation_result (long_token )
734
+ assert not final_result .result .isError
735
+ assert len (final_result .result .content ) == 1
736
+ content = final_result .result .content [0 ]
737
+ assert isinstance (content , TextContent )
738
+ assert "Long-running task 'test_task' finished with 30-minute keep_alive" in content .text
739
+ break
740
+ elif status .status == "failed" :
741
+ pytest .fail (f"Long-running task failed: { status .error } " )
742
+
743
+ # Test quick expiry task (should complete then expire)
744
+ quick_result = await session .call_tool ("quick_expiry_task" , {"message" : "test_expiry" })
745
+ assert quick_result .operation is not None
746
+ quick_token = quick_result .operation .token
747
+
748
+ # Wait for completion
749
+ while True :
750
+ status = await session .get_operation_status (quick_token )
751
+ if status .status == "completed" :
752
+ break
753
+ elif status .status == "failed" :
754
+ pytest .fail (f"Quick task failed: { status .error } " )
755
+
756
+ # Wait for expiry (2 seconds + buffer)
757
+ await asyncio .sleep (3 )
758
+
759
+ # Should now be expired
760
+ with pytest .raises (Exception ): # Should raise error when trying to access expired operation
761
+ await session .get_operation_result (quick_token )
762
+
728
763
729
764
# Test async tools example with legacy protocol
730
765
@pytest .mark .anyio
0 commit comments