Skip to content

Commit 40e4fc3

Browse files
committed
Add aiostream
1 parent 9beacde commit 40e4fc3

File tree

1 file changed

+17
-21
lines changed

1 file changed

+17
-21
lines changed

text_2_sql/autogen/src/autogen_text_2_sql/custom_agents/parallel_query_solving_agent.py

Lines changed: 17 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,6 @@ async def on_messages(
4545
async def on_messages_stream(
4646
self, messages: Sequence[ChatMessage], cancellation_token: CancellationToken
4747
) -> AsyncGenerator[AgentMessage | Response, None]:
48-
inner_messages: List[AgentMessage | ChatMessage] = []
49-
5048
last_response = messages[-1].content
5149
parameter_input = messages[0].content
5250
try:
@@ -58,9 +56,7 @@ async def on_messages_stream(
5856
# Load the json of the last message to populate the final output object
5957
query_rewrites = json.loads(last_response)
6058

61-
logging.info(f"Query Rewrite: {query_rewrites}")
62-
63-
inner_solving_generators = []
59+
logging.info(f"Query Rewrites: {query_rewrites}")
6460

6561
async def consume_inner_messages_from_agentic_flow(
6662
agentic_flow, identifier, complete_inner_messages
@@ -81,49 +77,49 @@ async def consume_inner_messages_from_agentic_flow(
8177

8278
yield {"source": identifier, "message": inner_message}
8379

80+
inner_solving_generators = []
8481
complete_inner_messages = {}
8582

8683
# Start processing sub-queries
8784
for query_rewrite in query_rewrites["sub_queries"]:
85+
logging.info(f"Processing sub-query: {query_rewrite}")
8886
# Create an instance of the InnerAutoGenText2Sql class
8987
inner_autogen_text_2_sql = InnerAutoGenText2Sql(
9088
self.engine_specific_rules, **self.kwargs
9189
)
9290

9391
# Launch tasks for each sub-query
9492
inner_solving_generators.append(
95-
inner_autogen_text_2_sql.process_question(
96-
question=query_rewrite, parameters=user_parameters
93+
consume_inner_messages_from_agentic_flow(
94+
inner_autogen_text_2_sql.process_question(
95+
question=query_rewrite, parameters=user_parameters
96+
),
97+
query_rewrite,
98+
complete_inner_messages,
9799
)
98100
)
99101

102+
logging.info("Starting Inner Solving Generators")
100103
combined_message_streams = stream.merge(*inner_solving_generators)
101104

102105
async with combined_message_streams.stream() as streamer:
103106
async for inner_message in streamer:
104-
print(inner_message)
107+
logging.info(f"Inner Solving Message: {inner_message}")
105108
yield inner_message
106109

107-
# # Process the results as they are yielded
108-
# for completed in asyncio.as_completed(inner_solving_generators):
109-
# async for inner_message in completed:
110-
# # Yield the result as soon as it's available
111-
# yield inner_message
112-
113-
# # Wait for all tasks to complete
114-
# await asyncio.gather(*inner_solving_generators, return_exceptions=True)
115-
116-
# # Log final results for debugging or auditing
117-
# logging.info(f"Formatted Results: {complete_inner_messages}")
110+
# Log final results for debugging or auditing
111+
logging.info(f"Formatted Results: {complete_inner_messages}")
118112

119113
# TODO: Trim out unnecessary information from the final response
120-
121114
# Final response
122115
yield Response(
123116
chat_message=TextMessage(
124117
content=json.dumps(complete_inner_messages), source=self.name
125118
),
126-
inner_messages=complete_inner_messages,
119+
inner_messages=[
120+
complete_inner_message["message"]
121+
for complete_inner_message in complete_inner_messages
122+
],
127123
)
128124

129125
async def on_reset(self, cancellation_token: CancellationToken) -> None:

0 commit comments

Comments
 (0)