20
20
from pydantic import BaseModel
21
21
22
22
from .. import _identifier
23
- from ..event_loop .event_loop import event_loop_cycle , run_tool
23
+ from ..event_loop .event_loop import event_loop_cycle
24
24
from ..handlers .callback_handler import PrintingCallbackHandler , null_callback_handler
25
25
from ..hooks import (
26
26
AfterInvocationEvent ,
35
35
from ..session .session_manager import SessionManager
36
36
from ..telemetry .metrics import EventLoopMetrics
37
37
from ..telemetry .tracer import get_tracer , serialize
38
+ from ..tools .executors import ConcurrentToolExecutor
39
+ from ..tools .executors ._executor import ToolExecutor
38
40
from ..tools .registry import ToolRegistry
39
41
from ..tools .watcher import ToolWatcher
40
42
from ..types .content import ContentBlock , Message , Messages
@@ -136,13 +138,14 @@ def caller(
136
138
"name" : normalized_name ,
137
139
"input" : kwargs .copy (),
138
140
}
141
+ tool_results : list [ToolResult ] = []
142
+ invocation_state = kwargs
139
143
140
144
async def acall () -> ToolResult :
141
- # Pass kwargs as invocation_state
142
- async for event in run_tool (self ._agent , tool_use , kwargs ):
145
+ async for event in ToolExecutor ._stream (self ._agent , tool_use , tool_results , invocation_state ):
143
146
_ = event
144
147
145
- return cast ( ToolResult , event )
148
+ return tool_results [ 0 ]
146
149
147
150
def tcall () -> ToolResult :
148
151
return asyncio .run (acall ())
@@ -208,6 +211,7 @@ def __init__(
208
211
state : Optional [Union [AgentState , dict ]] = None ,
209
212
hooks : Optional [list [HookProvider ]] = None ,
210
213
session_manager : Optional [SessionManager ] = None ,
214
+ tool_executor : Optional [ToolExecutor ] = None ,
211
215
):
212
216
"""Initialize the Agent with the specified configuration.
213
217
@@ -250,6 +254,7 @@ def __init__(
250
254
Defaults to None.
251
255
session_manager: Manager for handling agent sessions including conversation history and state.
252
256
If provided, enables session-based persistence and state management.
257
+ tool_executor: Definition of tool execution stragety (e.g., sequential, concurrent, etc.).
253
258
254
259
Raises:
255
260
ValueError: If agent id contains path separators.
@@ -324,6 +329,8 @@ def __init__(
324
329
if self ._session_manager :
325
330
self .hooks .add_hook (self ._session_manager )
326
331
332
+ self .tool_executor = tool_executor or ConcurrentToolExecutor ()
333
+
327
334
if hooks :
328
335
for hook in hooks :
329
336
self .hooks .add_hook (hook )
@@ -354,14 +361,21 @@ def tool_names(self) -> list[str]:
354
361
all_tools = self .tool_registry .get_all_tools_config ()
355
362
return list (all_tools .keys ())
356
363
357
- def __call__ (self , prompt : Union [ str , list [ContentBlock ]] , ** kwargs : Any ) -> AgentResult :
364
+ def __call__ (self , prompt : str | list [ContentBlock ] | Messages | None = None , ** kwargs : Any ) -> AgentResult :
358
365
"""Process a natural language prompt through the agent's event loop.
359
366
360
- This method implements the conversational interface (e.g., `agent("hello!")`). It adds the user's prompt to
361
- the conversation history, processes it through the model, executes any tool calls, and returns the final result.
367
+ This method implements the conversational interface with multiple input patterns:
368
+ - String input: `agent("hello!")`
369
+ - ContentBlock list: `agent([{"text": "hello"}, {"image": {...}}])`
370
+ - Message list: `agent([{"role": "user", "content": [{"text": "hello"}]}])`
371
+ - No input: `agent()` - uses existing conversation history
362
372
363
373
Args:
364
- prompt: User input as text or list of ContentBlock objects for multi-modal content.
374
+ prompt: User input in various formats:
375
+ - str: Simple text input
376
+ - list[ContentBlock]: Multi-modal content blocks
377
+ - list[Message]: Complete messages with roles
378
+ - None: Use existing conversation history
365
379
**kwargs: Additional parameters to pass through the event loop.
366
380
367
381
Returns:
@@ -380,14 +394,23 @@ def execute() -> AgentResult:
380
394
future = executor .submit (execute )
381
395
return future .result ()
382
396
383
- async def invoke_async (self , prompt : Union [str , list [ContentBlock ]], ** kwargs : Any ) -> AgentResult :
397
+ async def invoke_async (
398
+ self , prompt : str | list [ContentBlock ] | Messages | None = None , ** kwargs : Any
399
+ ) -> AgentResult :
384
400
"""Process a natural language prompt through the agent's event loop.
385
401
386
- This method implements the conversational interface (e.g., `agent("hello!")`). It adds the user's prompt to
387
- the conversation history, processes it through the model, executes any tool calls, and returns the final result.
402
+ This method implements the conversational interface with multiple input patterns:
403
+ - String input: Simple text input
404
+ - ContentBlock list: Multi-modal content blocks
405
+ - Message list: Complete messages with roles
406
+ - No input: Use existing conversation history
388
407
389
408
Args:
390
- prompt: User input as text or list of ContentBlock objects for multi-modal content.
409
+ prompt: User input in various formats:
410
+ - str: Simple text input
411
+ - list[ContentBlock]: Multi-modal content blocks
412
+ - list[Message]: Complete messages with roles
413
+ - None: Use existing conversation history
391
414
**kwargs: Additional parameters to pass through the event loop.
392
415
393
416
Returns:
@@ -404,7 +427,7 @@ async def invoke_async(self, prompt: Union[str, list[ContentBlock]], **kwargs: A
404
427
405
428
return cast (AgentResult , event ["result" ])
406
429
407
- def structured_output (self , output_model : Type [T ], prompt : Optional [ Union [ str , list [ContentBlock ]]] = None ) -> T :
430
+ def structured_output (self , output_model : Type [T ], prompt : str | list [ContentBlock ] | Messages | None = None ) -> T :
408
431
"""This method allows you to get structured output from the agent.
409
432
410
433
If you pass in a prompt, it will be used temporarily without adding it to the conversation history.
@@ -416,7 +439,11 @@ def structured_output(self, output_model: Type[T], prompt: Optional[Union[str, l
416
439
Args:
417
440
output_model: The output model (a JSON schema written as a Pydantic BaseModel)
418
441
that the agent will use when responding.
419
- prompt: The prompt to use for the agent (will not be added to conversation history).
442
+ prompt: The prompt to use for the agent in various formats:
443
+ - str: Simple text input
444
+ - list[ContentBlock]: Multi-modal content blocks
445
+ - list[Message]: Complete messages with roles
446
+ - None: Use existing conversation history
420
447
421
448
Raises:
422
449
ValueError: If no conversation history or prompt is provided.
@@ -430,7 +457,7 @@ def execute() -> T:
430
457
return future .result ()
431
458
432
459
async def structured_output_async (
433
- self , output_model : Type [T ], prompt : Optional [ Union [ str , list [ContentBlock ]]] = None
460
+ self , output_model : Type [T ], prompt : str | list [ContentBlock ] | Messages | None = None
434
461
) -> T :
435
462
"""This method allows you to get structured output from the agent.
436
463
@@ -455,12 +482,8 @@ async def structured_output_async(
455
482
try :
456
483
if not self .messages and not prompt :
457
484
raise ValueError ("No conversation history or prompt provided" )
458
- # Create temporary messages array if prompt is provided
459
- if prompt :
460
- content : list [ContentBlock ] = [{"text" : prompt }] if isinstance (prompt , str ) else prompt
461
- temp_messages = self .messages + [{"role" : "user" , "content" : content }]
462
- else :
463
- temp_messages = self .messages
485
+
486
+ temp_messages : Messages = self .messages + self ._convert_prompt_to_messages (prompt )
464
487
465
488
structured_output_span .set_attributes (
466
489
{
@@ -470,16 +493,16 @@ async def structured_output_async(
470
493
"gen_ai.operation.name" : "execute_structured_output" ,
471
494
}
472
495
)
473
- for message in temp_messages :
474
- structured_output_span .add_event (
475
- f"gen_ai.{ message ['role' ]} .message" ,
476
- attributes = {"role" : message ["role" ], "content" : serialize (message ["content" ])},
477
- )
478
496
if self .system_prompt :
479
497
structured_output_span .add_event (
480
498
"gen_ai.system.message" ,
481
499
attributes = {"role" : "system" , "content" : serialize ([{"text" : self .system_prompt }])},
482
500
)
501
+ for message in temp_messages :
502
+ structured_output_span .add_event (
503
+ f"gen_ai.{ message ['role' ]} .message" ,
504
+ attributes = {"role" : message ["role" ], "content" : serialize (message ["content" ])},
505
+ )
483
506
events = self .model .structured_output (output_model , temp_messages , system_prompt = self .system_prompt )
484
507
async for event in events :
485
508
if "callback" in event :
@@ -492,16 +515,25 @@ async def structured_output_async(
492
515
finally :
493
516
self .hooks .invoke_callbacks (AfterInvocationEvent (agent = self ))
494
517
495
- async def stream_async (self , prompt : Union [str , list [ContentBlock ]], ** kwargs : Any ) -> AsyncIterator [Any ]:
518
+ async def stream_async (
519
+ self ,
520
+ prompt : str | list [ContentBlock ] | Messages | None = None ,
521
+ ** kwargs : Any ,
522
+ ) -> AsyncIterator [Any ]:
496
523
"""Process a natural language prompt and yield events as an async iterator.
497
524
498
- This method provides an asynchronous interface for streaming agent events, allowing
499
- consumers to process stream events programmatically through an async iterator pattern
500
- rather than callback functions. This is particularly useful for web servers and other
501
- async environments.
525
+ This method provides an asynchronous interface for streaming agent events with multiple input patterns:
526
+ - String input: Simple text input
527
+ - ContentBlock list: Multi-modal content blocks
528
+ - Message list: Complete messages with roles
529
+ - No input: Use existing conversation history
502
530
503
531
Args:
504
- prompt: User input as text or list of ContentBlock objects for multi-modal content.
532
+ prompt: User input in various formats:
533
+ - str: Simple text input
534
+ - list[ContentBlock]: Multi-modal content blocks
535
+ - list[Message]: Complete messages with roles
536
+ - None: Use existing conversation history
505
537
**kwargs: Additional parameters to pass to the event loop.
506
538
507
539
Yields:
@@ -525,13 +557,15 @@ async def stream_async(self, prompt: Union[str, list[ContentBlock]], **kwargs: A
525
557
"""
526
558
callback_handler = kwargs .get ("callback_handler" , self .callback_handler )
527
559
528
- content : list [ContentBlock ] = [{"text" : prompt }] if isinstance (prompt , str ) else prompt
529
- message : Message = {"role" : "user" , "content" : content }
560
+ # Process input and get message to add (if any)
561
+ messages = self ._convert_prompt_to_messages (prompt )
562
+
563
+ self .trace_span = self ._start_agent_trace_span (messages )
530
564
531
- self .trace_span = self ._start_agent_trace_span (message )
532
565
with trace_api .use_span (self .trace_span ):
533
566
try :
534
- events = self ._run_loop (message , invocation_state = kwargs )
567
+ events = self ._run_loop (messages , invocation_state = kwargs )
568
+
535
569
async for event in events :
536
570
if "callback" in event :
537
571
callback_handler (** event ["callback" ])
@@ -548,12 +582,12 @@ async def stream_async(self, prompt: Union[str, list[ContentBlock]], **kwargs: A
548
582
raise
549
583
550
584
async def _run_loop (
551
- self , message : Message , invocation_state : dict [str , Any ]
585
+ self , messages : Messages , invocation_state : dict [str , Any ]
552
586
) -> AsyncGenerator [dict [str , Any ], None ]:
553
587
"""Execute the agent's event loop with the given message and parameters.
554
588
555
589
Args:
556
- message : The user message to add to the conversation.
590
+ messages : The input messages to add to the conversation.
557
591
invocation_state: Additional parameters to pass to the event loop.
558
592
559
593
Yields:
@@ -564,7 +598,8 @@ async def _run_loop(
564
598
try :
565
599
yield {"callback" : {"init_event_loop" : True , ** invocation_state }}
566
600
567
- self ._append_message (message )
601
+ for message in messages :
602
+ self ._append_message (message )
568
603
569
604
# Execute the event loop cycle with retry logic for context limits
570
605
events = self ._execute_event_loop_cycle (invocation_state )
@@ -622,6 +657,34 @@ async def _execute_event_loop_cycle(self, invocation_state: dict[str, Any]) -> A
622
657
async for event in events :
623
658
yield event
624
659
660
+ def _convert_prompt_to_messages (self , prompt : str | list [ContentBlock ] | Messages | None ) -> Messages :
661
+ messages : Messages | None = None
662
+ if prompt is not None :
663
+ if isinstance (prompt , str ):
664
+ # String input - convert to user message
665
+ messages = [{"role" : "user" , "content" : [{"text" : prompt }]}]
666
+ elif isinstance (prompt , list ):
667
+ if len (prompt ) == 0 :
668
+ # Empty list
669
+ messages = []
670
+ # Check if all item in input list are dictionaries
671
+ elif all (isinstance (item , dict ) for item in prompt ):
672
+ # Check if all items are messages
673
+ if all (all (key in item for key in Message .__annotations__ .keys ()) for item in prompt ):
674
+ # Messages input - add all messages to conversation
675
+ messages = cast (Messages , prompt )
676
+
677
+ # Check if all items are content blocks
678
+ elif all (any (key in ContentBlock .__annotations__ .keys () for key in item ) for item in prompt ):
679
+ # Treat as List[ContentBlock] input - convert to user message
680
+ # This allows invalid structures to be passed through to the model
681
+ messages = [{"role" : "user" , "content" : cast (list [ContentBlock ], prompt )}]
682
+ else :
683
+ messages = []
684
+ if messages is None :
685
+ raise ValueError ("Input prompt must be of type: `str | list[Contentblock] | Messages | None`." )
686
+ return messages
687
+
625
688
def _record_tool_execution (
626
689
self ,
627
690
tool : ToolUse ,
@@ -687,15 +750,15 @@ def _record_tool_execution(
687
750
self ._append_message (tool_result_msg )
688
751
self ._append_message (assistant_msg )
689
752
690
- def _start_agent_trace_span (self , message : Message ) -> trace_api .Span :
753
+ def _start_agent_trace_span (self , messages : Messages ) -> trace_api .Span :
691
754
"""Starts a trace span for the agent.
692
755
693
756
Args:
694
- message : The user message .
757
+ messages : The input messages .
695
758
"""
696
759
model_id = self .model .config .get ("model_id" ) if hasattr (self .model , "config" ) else None
697
760
return self .tracer .start_agent_span (
698
- message = message ,
761
+ messages = messages ,
699
762
agent_name = self .name ,
700
763
model_id = model_id ,
701
764
tools = self .tool_names ,
0 commit comments