1+ import asyncio
2+ import json
3+ import re
4+ from typing import Tuple , List , Dict , Optional , Any
5+ from optillm .plugins .web_search_plugin import run as web_search_run
6+ from optillm .plugins .readurls_plugin import run as readurls_run
7+ from optillm .plugins .memory_plugin import run as memory_run
8+
# Identifier under which this plugin is exposed — presumably matched by
# optillm's plugin loader against the requested approach name; confirm
# against the plugin registry.
SLUG = "deep_research"
10+
class DeepResearcher:
    """
    Implementation of the Test-Time Diffusion Deep Researcher (TTD-DR) algorithm.

    Research is treated as a diffusion process: an initial synthesis is
    iteratively "denoised" by retrieving new web evidence, re-synthesizing
    with the memory plugin, and evaluating completeness, until the answer
    is judged complete or the iteration budget is exhausted.
    """

    def __init__(self, client, model: str, max_iterations: int = 5, max_sources: int = 10):
        """
        Args:
            client: OpenAI-compatible client used for all LLM calls.
            model: Model name passed to every completion request.
            max_iterations: Hard cap on research/refinement cycles.
            max_sources: Number of web results requested per search.
        """
        self.client = client
        self.model = model
        self.max_iterations = max_iterations
        self.max_sources = max_sources
        # Mutable state carried across iterations of the research loop.
        self.research_state = {
            "queries": [],
            "sources": [],
            "content": [],
            "synthesis": "",
            "iteration": 0,
        }
        # Running total of completion tokens consumed by this researcher.
        self.total_tokens = 0

    def decompose_query(self, system_prompt: str, initial_query: str) -> List[str]:
        """
        Decompose a complex research query into focused sub-queries.

        This implements the query-planning phase of TTD-DR. Returns at most
        five sub-queries; falls back to the original query on any error.
        """
        decomposition_prompt = f"""
You are a research assistant. Given a complex query, break it down into 3-5 focused sub-queries that would help gather comprehensive information.

Original query: {initial_query}

Provide sub-queries in this format:
1. [specific focused question]
2. [specific focused question]
3. [specific focused question]
...

Make each sub-query specific and searchable. Focus on different aspects of the main topic.
"""

        try:
            response = self.client.chat.completions.create(
                model=self.model,
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": decomposition_prompt},
                ],
                temperature=0.7,
                max_tokens=1000,
            )

            content = response.choices[0].message.content.strip()
            self.total_tokens += response.usage.completion_tokens

            # Extract lines of the form "1. <query>" emitted by the model.
            # NOTE: the delimiter must be '\n' exactly — splitting on '\n '
            # would leave the whole response as one line and parse nothing.
            queries = []
            for line in content.split('\n'):
                line = line.strip()
                if re.match(r'^\d+\.', line):
                    query = re.sub(r'^\d+\.\s*', '', line).strip()
                    if query:
                        queries.append(query)

            return queries[:5]  # Limit to 5 sub-queries

        except Exception:
            # Fallback: research the original query as-is.
            return [initial_query]

    def perform_web_search(self, queries: List[str]) -> str:
        """
        Perform a web search for multiple queries using the web_search plugin.

        Returns the search-enhanced query text, or an error string on failure
        (best-effort: a failed search never raises).
        """
        combined_query = "Search for the following topics:\n" + "\n".join(f"- {q}" for q in queries)

        try:
            enhanced_query, _ = web_search_run("", combined_query, None, None, {
                "num_results": self.max_sources,
                "delay_seconds": 3,  # Increased delay to avoid rate limiting
                "headless": True,
            })
            return enhanced_query
        except Exception as e:
            return f"Web search failed: {str(e)}"

    def extract_and_fetch_urls(self, search_results: str) -> str:
        """
        Extract URLs from search results and fetch their content using the
        readurls plugin. Best-effort: returns an error string on failure.
        """
        try:
            content_with_urls, _ = readurls_run("", search_results, None, None)
            return content_with_urls
        except Exception as e:
            return f"URL fetching failed: {str(e)}"

    def synthesize_with_memory(self, system_prompt: str, query: str, content: str) -> Tuple[str, int]:
        """
        Synthesize collected content via the memory plugin.

        Returns (synthesis_text, completion_tokens); (error_string, 0) on failure.
        """
        # The memory plugin expects the question as a trailing "Query: <query>" line.
        memory_input = f"{content}\n\nQuery: {query}"

        try:
            synthesis, tokens = memory_run(system_prompt, memory_input, self.client, self.model)
            return synthesis, tokens
        except Exception as e:
            return f"Memory synthesis failed: {str(e)}", 0

    def evaluate_completeness(self, system_prompt: str, query: str, current_synthesis: str) -> Tuple[bool, List[str]]:
        """
        Ask the model whether the current synthesis fully answers the query.

        Returns (is_complete, missing_aspects). On any error, reports the
        research as incomplete so another iteration is attempted.
        """
        evaluation_prompt = f"""
You are evaluating the completeness of a research synthesis.

Original query: {query}
Current synthesis: {current_synthesis}

Evaluate if this synthesis adequately addresses the original query. Consider:
1. Are all major aspects of the query covered?
2. Is there sufficient depth and detail?
3. Are there any obvious gaps or missing information?

Respond in this format:
COMPLETE: [YES/NO]
MISSING: [list any missing aspects, one per line, or "None" if complete]
"""

        try:
            response = self.client.chat.completions.create(
                model=self.model,
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": evaluation_prompt},
                ],
                temperature=0.3,
                max_tokens=500,
            )

            content = response.choices[0].message.content.strip()
            self.total_tokens += response.usage.completion_tokens

            # Parse the structured verdict.
            is_complete = "COMPLETE: YES" in content.upper()

            missing_aspects = []
            if "MISSING:" in content.upper():
                # Take everything after the last "MISSING:" marker; one aspect per line.
                missing_section = content.split("MISSING:")[-1].strip()
                if missing_section.upper() != "NONE":
                    missing_aspects = [line.strip() for line in missing_section.split('\n') if line.strip()]

            return is_complete, missing_aspects

        except Exception:
            # Default to not complete on error so the loop can retry.
            return False, ["Error in evaluation"]

    def generate_focused_queries(self, missing_aspects: List[str], original_query: str) -> List[str]:
        """
        Build focused search queries targeting the missing aspects.

        Each query is the original topic plus one missing aspect; at most
        three are returned per iteration.
        """
        focused_queries = [f"{original_query} {aspect}" for aspect in missing_aspects]
        return focused_queries[:3]  # Limit to 3 additional queries per iteration

    def research(self, system_prompt: str, initial_query: str) -> Tuple[str, int]:
        """
        Main research loop implementing the TTD-DR algorithm.

        Returns (final_response, total_completion_tokens).
        """
        self.research_state["queries"] = [initial_query]
        current_synthesis = ""

        for iteration in range(self.max_iterations):
            self.research_state["iteration"] = iteration + 1

            # Step 1: decompose on the first pass; afterwards reuse the
            # focused queries produced by the previous gap analysis.
            if iteration == 0:
                queries = self.decompose_query(system_prompt, initial_query)
            else:
                queries = self.research_state["queries"]

            # Step 2: perform web search.
            search_results = self.perform_web_search(queries)

            # Step 3: extract and fetch content from URLs.
            content_with_urls = self.extract_and_fetch_urls(search_results)

            # Step 4: synthesize information using the memory plugin.
            current_synthesis, tokens = self.synthesize_with_memory(
                system_prompt, initial_query, content_with_urls
            )
            self.total_tokens += tokens

            # Step 5: evaluate completeness.
            is_complete, missing_aspects = self.evaluate_completeness(
                system_prompt, initial_query, current_synthesis
            )

            # Record this iteration's results.
            self.research_state["content"].append(content_with_urls)
            self.research_state["synthesis"] = current_synthesis

            # Stop when complete or on the final permitted iteration.
            if is_complete or iteration == self.max_iterations - 1:
                break

            # Step 6: queue focused queries for the next iteration.
            if missing_aspects:
                self.research_state["queries"] = self.generate_focused_queries(
                    missing_aspects, initial_query
                )
            else:
                # Incomplete but no identified gaps — nothing more to search for.
                break

        final_response = self.generate_final_response(system_prompt, initial_query, current_synthesis)

        return final_response, self.total_tokens

    def generate_final_response(self, system_prompt: str, original_query: str, synthesis: str) -> str:
        """
        Produce the final comprehensive research response plus a metadata
        footer. Returns an error string on failure.
        """
        final_prompt = f"""
Based on comprehensive research, provide a detailed and well-structured response to the following query.

Original query: {original_query}
Research synthesis: {synthesis}

Please provide a comprehensive, well-organized response that:
1. Directly addresses the original query
2. Includes key findings and insights
3. Provides proper context and background
4. Is well-structured with clear sections
5. Acknowledges any limitations or areas where more research might be needed

Format your response professionally and cite specific information where relevant.
"""

        try:
            response = self.client.chat.completions.create(
                model=self.model,
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": final_prompt},
                ],
                temperature=0.7,
                max_tokens=2000,
            )

            final_content = response.choices[0].message.content.strip()
            self.total_tokens += response.usage.completion_tokens

            # Append a research-metadata footer.
            metadata = "\n\n---\n**Research Summary:**\n"
            metadata += f"- Iterations completed: {self.research_state['iteration']}\n"
            metadata += f"- Total tokens used: {self.total_tokens}\n"
            metadata += "- Sources consulted: Multiple web sources and documents\n"

            return final_content + metadata

        except Exception as e:
            return f"Final response generation failed: {str(e)}"
282+
def run(system_prompt: str, initial_query: str, client, model: str, request_config: Optional[Dict] = None) -> Tuple[str, int]:
    """
    Deep Research plugin implementing TTD-DR (Test-Time Diffusion Deep Researcher).

    Orchestrates web search, URL fetching, and memory synthesis to produce
    comprehensive research responses via iterative refinement.

    Args:
        system_prompt: System prompt for the conversation.
        initial_query: User's research query.
        client: OpenAI-compatible client for LLM calls.
        model: Model name to use for synthesis.
        request_config: Optional configuration dict with keys:
            - max_iterations: Maximum research iterations (default: 5)
            - max_sources: Maximum web sources per search (default: 10)

    Returns:
        Tuple of (comprehensive_research_response, total_completion_tokens).
        Errors are reported in the response string with 0 tokens rather
        than raised.
    """
    # Parse configuration.
    config = request_config or {}
    max_iterations = config.get("max_iterations", 5)
    max_sources = config.get("max_sources", 10)

    # Validate inputs before doing any expensive work.
    if not initial_query.strip():
        return "Error: No research query provided", 0

    if not client:
        return "Error: No LLM client provided for research synthesis", 0

    researcher = DeepResearcher(
        client=client,
        model=model,
        max_iterations=max_iterations,
        max_sources=max_sources,
    )

    try:
        # Perform deep research.
        return researcher.research(system_prompt, initial_query)

    except Exception as e:
        error_response = f"Deep research failed: {str(e)}\n\nFalling back to basic response..."

        # Fallback: answer with a single plain completion using just the model.
        try:
            fallback_response = client.chat.completions.create(
                model=model,
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": initial_query},
                ],
            )

            result = fallback_response.choices[0].message.content.strip()
            tokens = fallback_response.usage.completion_tokens

            return f"{error_response}\n\n{result}", tokens

        except Exception as fallback_error:
            return f"Deep research and fallback both failed: {str(e)} | {str(fallback_error)}", 0