@@ -3,10 +3,12 @@
 import json
 import uuid
 
+from concurrent.futures import ThreadPoolExecutor, as_completed
 from datetime import datetime
 
 import requests
 
+from memos.chunkers.base import BaseChunker
 from memos.embedders.factory import OllamaEmbedder
 from memos.log import get_logger
 from memos.memories.textual.item import TextualMemoryItem, TreeNodeTextualMemoryMetadata
@@ -93,8 +95,8 @@ def search(self, query: str, max_results: int | None = None) -> list[dict]:
9395 "online_search" : {
9496 "max_entries" : max_results ,
9597 "cache_switch" : False ,
96- "baidu_field" : {"switch" : True , "mode" : "relevance" , "type" : "page" },
97- "bing_field" : {"switch" : False , "mode" : "relevance" , "type" : "page_web " },
98+ "baidu_field" : {"switch" : False , "mode" : "relevance" , "type" : "page" },
99+ "bing_field" : {"switch" : True , "mode" : "relevance" , "type" : "page " },
98100 "sogou_field" : {"switch" : False , "mode" : "relevance" , "type" : "page" },
99101 },
100102 "request_id" : "memos" + str (uuid .uuid4 ()),
@@ -112,6 +114,7 @@ def __init__(
         access_key: str,
         search_engine_id: str,
         embedder: OllamaEmbedder,
+        chunker: BaseChunker,
         max_results: int = 20,
     ):
         """
@@ -124,6 +127,7 @@ def __init__(
         """
         self.xinyu_api = XinyuSearchAPI(access_key, search_engine_id, max_results=max_results)
         self.embedder = embedder
+        self.chunker = chunker
 
     def retrieve_from_internet(
         self, query: str, top_k: int = 10, parsed_goal=None
@@ -143,63 +147,25 @@ def retrieve_from_internet(
         search_results = self.xinyu_api.search(query, max_results=top_k)
 
         # Convert to TextualMemoryItem format
-        memory_items = []
-
-        for _, result in enumerate(search_results):
-            # Extract basic information from Xinyu response format
-            title = result.get("title", "")
-            content = result.get("content", "")
-            summary = result.get("summary", "")
-            url = result.get("url", "")
-            publish_time = result.get("publish_time", "")
-            if publish_time:
+        memory_items: list[TextualMemoryItem] = []
+
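+        # Convert results in parallel; each worker builds the memory items for one search hit.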
+        with ThreadPoolExecutor(max_workers=8) as executor:
+            futures = [
+                executor.submit(self._process_result, result, query, parsed_goal)
+                for result in search_results
+            ]
+            for future in as_completed(futures):
                 try:
-                    publish_time = datetime.strptime(publish_time, "%Y-%m-%d %H:%M:%S").strftime(
-                        "%Y-%m-%d"
-                    )
+                    memory_items.extend(future.result())
                 except Exception as e:
-                    logger.error(f"xinyu search error: {e}")
-                    publish_time = datetime.now().strftime("%Y-%m-%d")
-            else:
-                publish_time = datetime.now().strftime("%Y-%m-%d")
-            source = result.get("source", "")
-            site = result.get("site", "")
-            if site:
-                site = site.split("|")[0]
+                    logger.error(f"Error processing search result: {e}")
 
-            # Combine memory content
-            memory_content = (
-                f"Title: {title}\nSummary: {summary}\nContent: {content[:200]}...\nSource: {url}"
-            )
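+        # Deduplicate by memory text, keeping the first item seen for each unique string.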
+        unique_memory_items = {}
+        for item in memory_items:
+            if item.memory not in unique_memory_items:
+                unique_memory_items[item.memory] = item
 
-            # Create metadata
-            metadata = TreeNodeTextualMemoryMetadata(
-                user_id=None,
-                session_id=None,
-                status="activated",
-                type="fact",  # Search results are usually factual information
-                memory_time=publish_time,
-                source="web",
-                confidence=85.0,  # Confidence level for search information
-                entities=self._extract_entities(title, content, summary),
-                tags=self._extract_tags(title, content, summary, parsed_goal),
-                visibility="public",
-                memory_type="LongTermMemory",  # Search results as working memory
-                key=title,
-                sources=[url] if url else [],
-                embedding=self.embedder.embed([memory_content])[0],
-                created_at=datetime.now().isoformat(),
-                usage=[],
-                background=f"Xinyu search result from {site or source}",
-            )
-            # Create TextualMemoryItem
-            memory_item = TextualMemoryItem(
-                id=str(uuid.uuid4()), memory=memory_content, metadata=metadata
-            )
-
-            memory_items.append(memory_item)
-
-        return memory_items
+        return list(unique_memory_items.values())
 
     def _extract_entities(self, title: str, content: str, summary: str) -> list[str]:
         """
@@ -333,3 +299,74 @@ def _extract_tags(self, title: str, content: str, summary: str, parsed_goal=None
             tags.extend(parsed_goal.tags)
 
         return list(set(tags))[:15]  # Limit to 15 tags
+
+    def _process_result(
+        self, result: dict, query: str, parsed_goal: str
+    ) -> list[TextualMemoryItem]:
+        title = result.get("title", "")
+        content = result.get("content", "")
+        summary = result.get("summary", "")
+        url = result.get("url", "")
+        publish_time = result.get("publish_time", "")
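+        # Normalize publish_time to YYYY-MM-DD; fall back to today's date if parsing fails.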
+        if publish_time:
+            try:
+                publish_time = datetime.strptime(publish_time, "%Y-%m-%d %H:%M:%S").strftime(
+                    "%Y-%m-%d"
+                )
+            except Exception as e:
+                logger.error(f"xinyu search error: {e}")
+                publish_time = datetime.now().strftime("%Y-%m-%d")
+        else:
+            publish_time = datetime.now().strftime("%Y-%m-%d")
+        source = result.get("source", "")
+        site = result.get("site", "")
+        if site:
+            site = site.split("|")[0]
+
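+        # Chunk the article body and embed each chunk; one memory item is built per chunk.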
+        qualified_chunks = self._chunk(content)
+
+        memory_items = []
+        for chunk_text, chunk_emb, score in qualified_chunks:
+            memory_content = (
+                f"Title: {title}\nNewsTime: {publish_time}\nSummary: {summary}\n"
+                f"Content: {chunk_text}\nSource: {url}"
+            )
+            metadata = TreeNodeTextualMemoryMetadata(
+                user_id=None,
+                session_id=None,
+                status="activated",
+                type="fact",
+                source="web",
+                confidence=score,
+                entities=self._extract_entities(title, content, summary),
+                tags=self._extract_tags(title, content, summary, parsed_goal),
+                visibility="public",
+                memory_type="OuterMemory",
+                key=f"[{source}]" + title,
+                sources=[url] if url else [],
+                embedding=chunk_emb,
+                created_at=datetime.now().isoformat(),
+                usage=[],
+                background=f"Xinyu search result from {site or source}",
+            )
+            memory_items.append(
+                TextualMemoryItem(id=str(uuid.uuid4()), memory=memory_content, metadata=metadata)
+            )
+
+        return memory_items
+
+    def _chunk(self, content: str) -> list[tuple[str, list[float], float]]:
+        """
+        Use SentenceChunker to split content into chunks and embed each.
+
+        Returns:
+            List of (chunk_text, chunk_embedding, dummy_score)
+        """
+        chunks = self.chunker.chunk(content)
+        if not chunks:
+            return []
+
+        chunk_texts = [c.text for c in chunks]
+        chunk_embeddings = self.embedder.embed(chunk_texts)
+
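+        # The third tuple element is the "dummy_score" from the docstring, fixed at 1.0.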
+        return [(text, emb, 1.0) for text, emb in zip(chunk_texts, chunk_embeddings, strict=False)]