2222
2323
2424class FilteredParallelMessagesCollection (BaseModel ):
25+ """A collection of filtered parallel messages."""
26+
2527 database_results : dict [str , list ] = Field (default_factory = dict )
2628 disambiguation_requests : dict [str , list ] = Field (default_factory = dict )
2729
28- def add_identifier (self , identifier ):
30+ def add_identifier (self , identifier : str ):
31+ """Add an identifier to the collection.
32+
33+ Args:
34+ ----
35+ identifier (str): The identifier to add."""
2936 if identifier not in self .database_results :
3037 self .database_results [identifier ] = []
3138 if identifier not in self .disambiguation_requests :
3239 self .disambiguation_requests [identifier ] = []
3340
3441
3542class ParallelQuerySolvingAgent (BaseChatAgent ):
43+ """An agent that solves each query in parallel."""
44+
3645 def __init__ (self , ** kwargs : dict ):
3746 super ().__init__ (
3847 "parallel_query_solving_agent" ,
@@ -98,7 +107,7 @@ async def on_messages_stream(
98107 # Load the json of the last message to populate the final output object
99108 sequential_steps = json .loads (last_response )
100109
101- logging .info (f"Query Rewrites: { sequential_steps } " )
110+ logging .info ("Sequential Steps: %s" , sequential_steps )
102111
103112 async def consume_inner_messages_from_agentic_flow (
104113 agentic_flow , identifier , filtered_parallel_messages
@@ -115,7 +124,7 @@ async def consume_inner_messages_from_agentic_flow(
115124 # Add message to results dictionary, tagged by the function name
116125 filtered_parallel_messages .add_identifier (identifier )
117126
118- logging .info (f "Checking Inner Message: { inner_message } " )
127+ logging .info ("Checking Inner Message: %s" , inner_message )
119128
120129 try :
121130 if isinstance (inner_message , ToolCallExecutionEvent ):
@@ -124,7 +133,7 @@ async def consume_inner_messages_from_agentic_flow(
124133 parsed_message = self .parse_inner_message (
125134 call_result .content
126135 )
127- logging .info (f "Inner Loaded: { parsed_message } " )
136+ logging .info ("Inner Loaded: %s" , parsed_message )
128137
129138 if isinstance (parsed_message , dict ):
130139 if (
@@ -145,7 +154,7 @@ async def consume_inner_messages_from_agentic_flow(
145154 elif isinstance (inner_message , TextMessage ):
146155 parsed_message = self .parse_inner_message (inner_message .content )
147156
148- logging .info (f "Inner Loaded: { parsed_message } " )
157+ logging .info ("Inner Loaded: %s" , parsed_message )
149158
150159 # Search for specific message types and add them to the final output object
151160 if isinstance (parsed_message , dict ):
@@ -186,7 +195,7 @@ async def consume_inner_messages_from_agentic_flow(
186195 ].append (disambiguation_request )
187196
188197 except Exception as e :
189- logging .warning (f "Error processing message: { e } " )
198+ logging .warning ("Error processing message: %s" , e )
190199
191200 yield inner_message
192201
@@ -209,10 +218,10 @@ async def consume_inner_messages_from_agentic_flow(
209218
210219 # Start processing sub-queries
211220 for sequential_round in sequential_steps ["steps" ]:
212- logging .info (f "Processing round: { sequential_round } " )
221+ logging .info ("Processing round: %s" , sequential_round )
213222
214223 for parallel_message in sequential_round :
215- logging .info (f "Parallel Message: { parallel_message } " )
224+ logging .info ("Parallel Message: %s" , parallel_message )
216225
217226 # Create an instance of the InnerAutoGenText2Sql class
218227 inner_autogen_text_2_sql = InnerAutoGenText2Sql (** self .kwargs )
@@ -250,7 +259,7 @@ async def consume_inner_messages_from_agentic_flow(
250259 async with combined_message_streams .stream () as streamer :
251260 async for inner_message in streamer :
252261 if isinstance (inner_message , TextMessage ):
253- logging .debug (f "Inner Solving Message: { inner_message } " )
262+ logging .debug ("Inner Solving Message: %s" , inner_message )
254263 yield inner_message
255264
256265 # Log final results for debugging or auditing
0 commit comments