Skip to content

Commit d7df115

Browse files
committed
Enhance deep research functionality with background mode and improved error handling
- Introduce background mode for handling long prompts in deep research activities - Update timeout settings to 2 hours for complex research and 5 minutes for heartbeat - Add retry policy for deep research activity execution - Include new dependency 'httpx' in pyproject.toml - Update .gitignore to exclude 'refactor.md'
1 parent 0fecc22 commit d7df115

File tree

4 files changed

+303
-6
lines changed

4 files changed

+303
-6
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,3 +22,4 @@ debug_log.md
2222
deep_research_prompts.md
2323
openai_deep_research.md
2424
to_do.md
25+
refactor.md

examples/tutorials/10_agentic/10_temporal/030_oai_deep_research/project/activities/deep_research_activities.py

Lines changed: 293 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
"""Custom activities for deep research workflow."""
22
import re
3+
import traceback
34
from temporalio import activity
45
from agentex.lib.types.tracing import BaseModelWithTraceParams
56
from agentex.lib.utils.logging import make_logger
@@ -20,6 +21,238 @@ class DeepResearchResult(BaseModelWithTraceParams):
2021
research_report: str
2122
citations: list[dict[str, str]]
2223

24+
async def use_background_mode_for_long_prompt(params: DeepResearchParams) -> DeepResearchResult:
25+
"""Handle long-running research tasks with better timeout management."""
26+
logger.info("DeepResearchActivity: Handling long prompt with timeout management")
27+
28+
try:
29+
from agents import Agent, Runner, WebSearchTool
30+
import asyncio
31+
32+
# Send initial message to user
33+
await adk.messages.create(
34+
task_id=params.task_id,
35+
content=TextContent(
36+
author="agent",
37+
content="🔄 Processing complex research request... This typically takes 3-5 minutes for detailed financial analysis. I'll keep working on it."
38+
)
39+
)
40+
41+
# Create agent
42+
research_agent = Agent(
43+
name="Deep Research Agent",
44+
model=params.research_model,
45+
instructions=params.research_instructions,
46+
tools=[WebSearchTool()]
47+
)
48+
49+
final_output = ""
50+
citations = []
51+
message_sent = False
52+
53+
try:
54+
logger.info("DeepResearchActivity: Starting long-running research")
55+
56+
# Keep heartbeat alive during execution
57+
async def heartbeat_task():
58+
count = 0
59+
while True:
60+
activity.heartbeat()
61+
count += 1
62+
if count % 4 == 0: # Every 2 minutes (30s * 4)
63+
await adk.messages.create(
64+
task_id=params.task_id,
65+
content=TextContent(
66+
author="agent",
67+
content=f"⏳ Still researching... ({count // 2} minutes elapsed)"
68+
)
69+
)
70+
await asyncio.sleep(30) # Heartbeat every 30 seconds
71+
72+
# Run heartbeat in background
73+
heartbeat = asyncio.create_task(heartbeat_task())
74+
75+
try:
76+
# For very long prompts, we'll use streaming to capture partial results
77+
result = Runner.run_streamed(
78+
starting_agent=research_agent,
79+
input=[
80+
{"role": "user", "content": params.enriched_instructions}
81+
]
82+
)
83+
84+
# Process streaming results
85+
current_message = ""
86+
event_count = 0
87+
88+
async for event in result.stream_events():
89+
event_count += 1
90+
91+
# Keep activity alive
92+
if event_count % 10 == 0:
93+
activity.heartbeat()
94+
95+
# Handle different event types
96+
if event.type == "run_item_stream_event":
97+
if hasattr(event, 'item'):
98+
item = event.item
99+
item_type = getattr(item, 'type', None)
100+
101+
# Handle message items
102+
if item_type == "message":
103+
if hasattr(item, 'content') and isinstance(item.content, list):
104+
for content_item in item.content:
105+
if hasattr(content_item, 'type') and content_item.type == "output_text":
106+
text = getattr(content_item, 'text', '')
107+
if text:
108+
final_output = text
109+
message_sent = True
110+
logger.info(f"DeepResearchActivity: Found message output ({len(text)} chars)")
111+
112+
# Send to UI
113+
await adk.messages.create(
114+
task_id=params.task_id,
115+
content=TextContent(
116+
author="agent",
117+
content=text
118+
)
119+
)
120+
121+
# Extract citations from annotations
122+
annotations = getattr(content_item, 'annotations', [])
123+
for annotation in annotations:
124+
if hasattr(annotation, 'url') and hasattr(annotation, 'title'):
125+
citations.append({
126+
"title": annotation.title,
127+
"url": annotation.url
128+
})
129+
130+
# Handle text deltas for streaming
131+
elif hasattr(event, 'delta') and hasattr(event.delta, 'content'):
132+
content = event.delta.content
133+
current_message += content
134+
135+
# Stream large chunks to user
136+
if len(current_message) > 1000 and not message_sent:
137+
await adk.messages.create(
138+
task_id=params.task_id,
139+
content=TextContent(
140+
author="agent",
141+
content=current_message
142+
)
143+
)
144+
current_message = ""
145+
146+
elif event.type == "agent_updated_stream_event":
147+
logger.debug("DeepResearchActivity: Agent updated event")
148+
continue
149+
150+
# Send any remaining content
151+
if current_message and not message_sent:
152+
final_output = current_message
153+
await adk.messages.create(
154+
task_id=params.task_id,
155+
content=TextContent(
156+
author="agent",
157+
content=current_message
158+
)
159+
)
160+
161+
# Try to get final output from result
162+
if not final_output and hasattr(result, 'final_output') and result.final_output:
163+
final_output = result.final_output
164+
if not message_sent:
165+
await adk.messages.create(
166+
task_id=params.task_id,
167+
content=TextContent(
168+
author="agent",
169+
content=final_output
170+
)
171+
)
172+
173+
logger.info(f"DeepResearchActivity: Research completed, processed {event_count} events")
174+
175+
finally:
176+
# Cancel heartbeat task
177+
heartbeat.cancel()
178+
try:
179+
await heartbeat
180+
except asyncio.CancelledError:
181+
pass
182+
183+
except asyncio.CancelledError:
184+
logger.error("DeepResearchActivity: Research was cancelled")
185+
error_msg = "The research request was cancelled. For complex financial analysis, please try breaking your request into smaller, specific queries."
186+
187+
await adk.messages.create(
188+
task_id=params.task_id,
189+
content=TextContent(
190+
author="agent",
191+
content=error_msg
192+
)
193+
)
194+
195+
return DeepResearchResult(
196+
research_report=error_msg,
197+
citations=[]
198+
)
199+
200+
except Exception as e:
201+
logger.error(f"DeepResearchActivity: Error during long-running research: {e}")
202+
logger.error(f"DeepResearchActivity: Error type: {type(e).__name__}")
203+
204+
error_msg = f"Error during research: {str(e)}. Please try a more focused question."
205+
206+
await adk.messages.create(
207+
task_id=params.task_id,
208+
content=TextContent(
209+
author="agent",
210+
content=error_msg
211+
)
212+
)
213+
214+
return DeepResearchResult(
215+
research_report=error_msg,
216+
citations=[]
217+
)
218+
219+
# Send citations if found
220+
if citations:
221+
citations_text = "\\n\\nSources cited:\\n" + "\\n".join(
222+
[f"- [{c['title']}]({c['url']})" for c in citations[:10]]
223+
)
224+
await adk.messages.create(
225+
task_id=params.task_id,
226+
content=TextContent(
227+
author="agent",
228+
content=citations_text
229+
)
230+
)
231+
232+
return DeepResearchResult(
233+
research_report=final_output or "Research completed.",
234+
citations=citations
235+
)
236+
237+
except Exception as e:
238+
logger.error(f"DeepResearchActivity: Long prompt handler failed: {e}")
239+
logger.error(f"Full error: {traceback.format_exc()}")
240+
241+
# Send error message to user
242+
error_msg = f"Failed to process research request: {str(e)}"
243+
await adk.messages.create(
244+
task_id=params.task_id,
245+
content=TextContent(
246+
author="agent",
247+
content=error_msg
248+
)
249+
)
250+
251+
return DeepResearchResult(
252+
research_report=error_msg,
253+
citations=[]
254+
)
255+
23256
@activity.defn(name="run_deep_research")
24257
async def run_deep_research(params: DeepResearchParams) -> DeepResearchResult:
25258
"""Run deep research using OpenAI agents library directly."""
@@ -28,6 +261,11 @@ async def run_deep_research(params: DeepResearchParams) -> DeepResearchResult:
28261
logger.info(f"DeepResearchActivity: Instructions length: {len(params.enriched_instructions)}")
29262
logger.info(f"DeepResearchActivity: Model: {params.research_model}")
30263

264+
# Check if this is a long prompt that needs background mode
265+
if len(params.enriched_instructions) > 5000:
266+
logger.info(f"DeepResearchActivity: Long prompt detected ({len(params.enriched_instructions)} chars), considering background mode")
267+
# For now, we'll continue with the agents library approach but with better error handling
268+
31269
try:
32270
from agents import Agent, Runner, WebSearchTool
33271
logger.info("DeepResearchActivity: Successfully imported agents library")
@@ -44,7 +282,12 @@ async def run_deep_research(params: DeepResearchParams) -> DeepResearchResult:
44282
tools=[WebSearchTool()] # Use WebSearchTool directly
45283
)
46284

47-
# Run agent with streaming
285+
# For long prompts, use background mode
286+
if len(params.enriched_instructions) > 5000:
287+
logger.info(f"DeepResearchActivity: Using background mode for long prompt ({len(params.enriched_instructions)} chars)")
288+
return await use_background_mode_for_long_prompt(params)
289+
290+
# Run agent with streaming for shorter prompts
48291
logger.info("DeepResearchActivity: Starting agent run with streaming")
49292
try:
50293
result = Runner.run_streamed(
@@ -177,7 +420,7 @@ async def run_deep_research(params: DeepResearchParams) -> DeepResearchResult:
177420
logger.info(f"DeepResearchActivity: Using accumulated message as final output ({len(current_message)} chars)")
178421

179422
# Check if we can get final output from result object
180-
if not final_output and hasattr(result, 'final_output'):
423+
if not final_output and hasattr(result, 'final_output') and result.final_output:
181424
final_output = result.final_output
182425
logger.info(f"DeepResearchActivity: Using result.final_output ({len(result.final_output)} chars)")
183426
logger.debug(f"DeepResearchActivity: First 500 chars of result.final_output: {result.final_output[:500]}")
@@ -200,10 +443,56 @@ async def run_deep_research(params: DeepResearchParams) -> DeepResearchResult:
200443
logger.error(f"DeepResearchActivity: Traceback: {traceback.format_exc()}")
201444
raise
202445

446+
# Wait for result to complete if streaming didn't capture everything
447+
if not final_output and hasattr(result, 'wait'):
448+
logger.info("DeepResearchActivity: No output captured during streaming, waiting for result...")
449+
try:
450+
# Some streaming results need to be awaited
451+
if callable(result.wait):
452+
await result.wait()
453+
454+
# Check again for final_output
455+
if hasattr(result, 'final_output') and result.final_output:
456+
final_output = result.final_output
457+
logger.info(f"DeepResearchActivity: Got final_output after wait ({len(final_output)} chars)")
458+
except Exception as e:
459+
logger.error(f"DeepResearchActivity: Error waiting for result: {e}")
460+
461+
# Try to get output from other result attributes
462+
if not final_output:
463+
# Check for other possible output attributes
464+
for attr in ['output', 'text', 'content', 'response']:
465+
if hasattr(result, attr):
466+
value = getattr(result, attr)
467+
if value and isinstance(value, str):
468+
final_output = value
469+
logger.info(f"DeepResearchActivity: Found output in result.{attr} ({len(value)} chars)")
470+
break
471+
472+
# Log all available attributes for debugging
473+
if not final_output:
474+
logger.warning("DeepResearchActivity: No final output found, checking available attributes...")
475+
attrs = [attr for attr in dir(result) if not attr.startswith('_')]
476+
logger.info(f"DeepResearchActivity: Available result attributes: {attrs}")
477+
478+
# Try to get any string representation
479+
try:
480+
final_output = str(result)
481+
if final_output and len(final_output) > 100: # Meaningful content
482+
logger.info(f"DeepResearchActivity: Using string representation of result ({len(final_output)} chars)")
483+
else:
484+
final_output = ""
485+
except:
486+
final_output = ""
487+
203488
# Ensure we have some output
204489
if not final_output:
205-
logger.warning("DeepResearchActivity: No final output captured, using placeholder")
206-
final_output = "Research completed but no output was captured. Please check the logs."
490+
logger.warning("DeepResearchActivity: No final output captured after all attempts")
491+
if current_message and len(current_message) > 100:
492+
final_output = current_message
493+
logger.info(f"DeepResearchActivity: Using accumulated message as fallback ({len(current_message)} chars)")
494+
else:
495+
final_output = "I apologize, but I was unable to complete the research. The request may be too complex or there may have been a technical issue. Please try breaking down your request into smaller, more specific queries."
207496

208497
# Send the final output to the UI if we haven't sent it already
209498
if final_output and not message_found:

examples/tutorials/10_agentic/10_temporal/030_oai_deep_research/project/workflows/deep_research/research.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
from datetime import timedelta
33
from typing import override
44
from temporalio import workflow
5+
from temporalio.common import RetryPolicy
56
from agentex.lib import adk
67
from agentex.lib.sdk.state_machine.state_workflow import StateWorkflow
78
from agentex.lib.utils.logging import make_logger
@@ -77,8 +78,13 @@ async def execute(self, state_machine, state_machine_data=None):
7778
result = await workflow.execute_activity(
7879
"run_deep_research",
7980
research_params,
80-
start_to_close_timeout=timedelta(minutes=20), # 20 minutes timeout
81-
heartbeat_timeout=timedelta(minutes=1) # 1 minute heartbeat
81+
start_to_close_timeout=timedelta(hours=2), # 2 hours timeout for complex research
82+
heartbeat_timeout=timedelta(minutes=5), # 5 minute heartbeat
83+
retry_policy=RetryPolicy(
84+
maximum_attempts=3,
85+
initial_interval=timedelta(seconds=30),
86+
maximum_interval=timedelta(minutes=5)
87+
)
8288
)
8389

8490
logger.info("ResearchWorkflow: Deep research activity completed")

examples/tutorials/10_agentic/10_temporal/030_oai_deep_research/pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ dependencies = [
1717
"jinja2",
1818
"python-dotenv",
1919
"scale-gp",
20+
"httpx>=0.24.0",
2021
]
2122

2223
[project.optional-dependencies]

0 commit comments

Comments
 (0)