Commit a8ac57c

Merge branch 'dev' into feat/evaluation_doc_qa

2 parents 5c496ee + ab6de77

File tree

5 files changed: +176 -63 lines changed

src/memos/graph_dbs/polardb.py
src/memos/mem_scheduler/task_schedule_modules/redis_queue.py
src/memos/memories/textual/tree_text_memory/retrieve/bochasearch.py
src/memos/memories/textual/tree_text_memory/retrieve/searcher.py
src/memos/memories/textual/tree_text_memory/retrieve/xinyusearch.py

src/memos/graph_dbs/polardb.py

Lines changed: 1 addition & 1 deletion
@@ -4056,7 +4056,7 @@ def _build_filter_conditions_cypher(
         if filter:

             def escape_cypher_string(value: str) -> str:
-                return value.replace("'", "''")
+                return value.replace("'", "\\'")

             def build_cypher_filter_condition(condition_dict: dict) -> str:
                 """Build a Cypher WHERE condition for a single filter item."""

src/memos/mem_scheduler/task_schedule_modules/redis_queue.py

Lines changed: 36 additions & 8 deletions
@@ -113,24 +113,35 @@ def __init__(
         self._stream_keys_lock = threading.Lock()
         self._stream_keys_refresh_thread: ContextThread | None = None
         self._stream_keys_refresh_stop_event = threading.Event()
+        self._initial_scan_max_keys = int(
+            os.getenv("MEMSCHEDULER_REDIS_INITIAL_SCAN_MAX_KEYS", "1000") or 1000
+        )
+        self._initial_scan_time_limit_sec = float(
+            os.getenv("MEMSCHEDULER_REDIS_INITIAL_SCAN_TIME_LIMIT_SEC", "1.0") or 1.0
+        )

         # Start background stream keys refresher if connected
         if self._is_connected:
-            # Refresh once synchronously to seed cache at init
             try:
-                self._refresh_stream_keys()
+                self._refresh_stream_keys(
+                    max_keys=self._initial_scan_max_keys,
+                    time_limit_sec=self._initial_scan_time_limit_sec,
+                )
             except Exception as e:
                 logger.debug(f"Initial stream keys refresh failed: {e}")
-
-            # Then start background refresher
             self._start_stream_keys_refresh_thread()

     def get_stream_key(self, user_id: str, mem_cube_id: str, task_label: str) -> str:
         stream_key = f"{self.stream_key_prefix}:{user_id}:{mem_cube_id}:{task_label}"
         return stream_key

     # --- Stream keys refresh background thread ---
-    def _refresh_stream_keys(self, stream_key_prefix: str | None = None) -> list[str]:
+    def _refresh_stream_keys(
+        self,
+        stream_key_prefix: str | None = None,
+        max_keys: int | None = None,
+        time_limit_sec: float | None = None,
+    ) -> list[str]:
         """Scan Redis and refresh cached stream keys for the queue prefix."""
         if not self._redis_conn:
             return []
@@ -140,12 +151,29 @@ def _refresh_stream_keys(self, stream_key_prefix: str | None = None) -> list[str]:

         try:
             redis_pattern = f"{stream_key_prefix}:*"
-            raw_keys_iter = self._redis_conn.scan_iter(match=redis_pattern)
-            raw_keys = list(raw_keys_iter)
+            collected: list[str] = []
+            cursor: int | str = 0
+            start_ts = time.time() if time_limit_sec else None
+            count_hint = 200
+            while True:
+                if (
+                    start_ts is not None
+                    and time_limit_sec is not None
+                    and time.time() - start_ts > time_limit_sec
+                ):
+                    break
+                cursor, keys = self._redis_conn.scan(
+                    cursor=cursor, match=redis_pattern, count=count_hint
+                )
+                collected.extend(keys)
+                if max_keys is not None and len(collected) >= max_keys:
+                    break
+                if cursor == 0 or cursor == "0":
+                    break

             escaped_prefix = re.escape(stream_key_prefix)
             regex_pattern = f"^{escaped_prefix}:"
-            stream_keys = [key for key in raw_keys if re.match(regex_pattern, key)]
+            stream_keys = [key for key in collected if re.match(regex_pattern, key)]

             if stream_key_prefix == self.stream_key_prefix:
                 with self._stream_keys_lock:
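
Note: the rewritten refresh replaces an unbounded `scan_iter` pass with an explicit cursor loop that stops on a key-count or wall-clock budget, tunable via `MEMSCHEDULER_REDIS_INITIAL_SCAN_MAX_KEYS` and `MEMSCHEDULER_REDIS_INITIAL_SCAN_TIME_LIMIT_SEC`. A standalone distillation of the same pattern, assuming redis-py:

```python
# Bounded SCAN sketch (assumes redis-py); mirrors the loop in the diff above.
import time

import redis


def bounded_scan(
    r: redis.Redis, pattern: str, max_keys: int = 1000, time_limit_sec: float = 1.0
) -> list:
    """Collect keys matching `pattern`, stopping early on a key or time budget."""
    collected: list = []
    cursor = 0
    start = time.time()
    while True:
        if time.time() - start > time_limit_sec:
            break  # time budget exhausted: return a partial snapshot
        cursor, keys = r.scan(cursor=cursor, match=pattern, count=200)
        collected.extend(keys)
        if len(collected) >= max_keys:
            break  # key budget exhausted
        if cursor == 0:
            break  # full keyspace pass complete
    return collected
```

A partial snapshot is acceptable here because `_start_stream_keys_refresh_thread()` keeps refreshing the cache in the background; the budgets only keep `__init__` from blocking on a large keyspace.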

src/memos/memories/textual/tree_text_memory/retrieve/bochasearch.py

Lines changed: 74 additions & 34 deletions
@@ -12,7 +12,11 @@
 from memos.embedders.factory import OllamaEmbedder
 from memos.log import get_logger
 from memos.mem_reader.base import BaseMemReader
-from memos.memories.textual.item import SourceMessage, TextualMemoryItem
+from memos.memories.textual.item import (
+    SearchedTreeNodeTextualMemoryMetadata,
+    SourceMessage,
+    TextualMemoryItem,
+)


 logger = get_logger(__name__)
@@ -138,7 +142,7 @@ def __init__(
         self.reader = reader

     def retrieve_from_internet(
-        self, query: str, top_k: int = 10, parsed_goal=None, info=None
+        self, query: str, top_k: int = 10, parsed_goal=None, info=None, mode="fast"
     ) -> list[TextualMemoryItem]:
         """
         Default internet retrieval (Web Search).
@@ -155,24 +159,24 @@ def retrieve_from_internet(
         """
         search_results = self.bocha_api.search_ai(query)  # ✅ default to
         # web-search
-        return self._convert_to_mem_items(search_results, query, parsed_goal, info)
+        return self._convert_to_mem_items(search_results, query, parsed_goal, info, mode=mode)

     def retrieve_from_web(
-        self, query: str, top_k: int = 10, parsed_goal=None, info=None
+        self, query: str, top_k: int = 10, parsed_goal=None, info=None, mode="fast"
     ) -> list[TextualMemoryItem]:
         """Explicitly retrieve using Bocha Web Search."""
         search_results = self.bocha_api.search_web(query)
-        return self._convert_to_mem_items(search_results, query, parsed_goal, info)
+        return self._convert_to_mem_items(search_results, query, parsed_goal, info, mode=mode)

     def retrieve_from_ai(
-        self, query: str, top_k: int = 10, parsed_goal=None, info=None
+        self, query: str, top_k: int = 10, parsed_goal=None, info=None, mode="fast"
     ) -> list[TextualMemoryItem]:
         """Explicitly retrieve using Bocha AI Search."""
         search_results = self.bocha_api.search_ai(query)
-        return self._convert_to_mem_items(search_results, query, parsed_goal, info)
+        return self._convert_to_mem_items(search_results, query, parsed_goal, info, mode=mode)

     def _convert_to_mem_items(
-        self, search_results: list[dict], query: str, parsed_goal=None, info=None
+        self, search_results: list[dict], query: str, parsed_goal=None, info=None, mode="fast"
     ):
         """Convert API search results into TextualMemoryItem objects."""
         memory_items = []
@@ -181,7 +185,7 @@ def _convert_to_mem_items(

         with ContextThreadPoolExecutor(max_workers=8) as executor:
             futures = [
-                executor.submit(self._process_result, r, query, parsed_goal, info)
+                executor.submit(self._process_result, r, query, parsed_goal, info, mode=mode)
                 for r in search_results
             ]
             for future in as_completed(futures):
@@ -195,7 +199,7 @@ def _convert_to_mem_items(
         return list(unique_memory_items.values())

     def _process_result(
-        self, result: dict, query: str, parsed_goal: str, info: dict[str, Any]
+        self, result: dict, query: str, parsed_goal: str, info: dict[str, Any], mode="fast"
     ) -> list[TextualMemoryItem]:
         """Process one Bocha search result into TextualMemoryItem."""
         title = result.get("name", "")
@@ -216,27 +220,63 @@ def _process_result(
         else:
             publish_time = datetime.now().strftime("%Y-%m-%d")

-        # Use reader to split and process the content into chunks
-        read_items = self.reader.get_memory([content], type="doc", info=info)
-
-        memory_items = []
-        for read_item_i in read_items[0]:
-            read_item_i.memory = (
-                f"[Outer internet view] Title: {title}\nNewsTime:"
-                f" {publish_time}\nSummary:"
-                f" {summary}\n"
-                f"Content: {read_item_i.memory}"
-            )
-            read_item_i.metadata.source = "web"
-            read_item_i.metadata.memory_type = "OuterMemory"
-            read_item_i.metadata.sources = [SourceMessage(type="web", url=url)] if url else []
-            read_item_i.metadata.visibility = "public"
-            read_item_i.metadata.internet_info = {
-                "title": title,
-                "url": url,
-                "site_name": site_name,
-                "site_icon": site_icon,
-                "summary": summary,
-            }
-            memory_items.append(read_item_i)
-        return memory_items
+        if mode == "fast":
+            info_ = info.copy()
+            user_id = info_.pop("user_id", "")
+            session_id = info_.pop("session_id", "")
+            return [
+                TextualMemoryItem(
+                    memory=(
+                        f"[Outer internet view] Title: {title}\nNewsTime:"
+                        f" {publish_time}\nSummary:"
+                        f" {summary}\n"
+                    ),
+                    metadata=SearchedTreeNodeTextualMemoryMetadata(
+                        user_id=user_id,
+                        session_id=session_id,
+                        memory_type="OuterMemory",
+                        status="activated",
+                        type="fact",
+                        source="web",
+                        sources=[SourceMessage(type="web", url=url)] if url else [],
+                        visibility="public",
+                        info=info_,
+                        background="",
+                        confidence=0.99,
+                        usage=[],
+                        embedding=self.embedder.embed([content])[0],
+                        internet_info={
+                            "title": title,
+                            "url": url,
+                            "site_name": site_name,
+                            "site_icon": site_icon,
+                            "summary": summary,
+                        },
+                    ),
+                )
+            ]
+        else:
+            # Use reader to split and process the content into chunks
+            read_items = self.reader.get_memory([content], type="doc", info=info)
+
+            memory_items = []
+            for read_item_i in read_items[0]:
+                read_item_i.memory = (
+                    f"[Outer internet view] Title: {title}\nNewsTime:"
+                    f" {publish_time}\nSummary:"
+                    f" {summary}\n"
+                    f"Content: {read_item_i.memory}"
+                )
+                read_item_i.metadata.source = "web"
+                read_item_i.metadata.memory_type = "OuterMemory"
+                read_item_i.metadata.sources = [SourceMessage(type="web", url=url)] if url else []
+                read_item_i.metadata.visibility = "public"
+                read_item_i.metadata.internet_info = {
+                    "title": title,
+                    "url": url,
+                    "site_name": site_name,
+                    "site_icon": site_icon,
+                    "summary": summary,
+                }
                memory_items.append(read_item_i)
+            return memory_items
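
Note: the retriever now defaults to a `fast` mode that skips the reader pipeline: each search hit becomes a single `TextualMemoryItem` whose text holds only the title, time, and summary, while the embedding is computed directly from the full page content. Any other mode value falls through to the original chunk-and-read path. A hypothetical call site (`retriever` is a configured instance of the class in this file; the query string and the `"fine"` mode name are placeholders):

```python
# "fast": one item per search hit, header text only, no reader round-trip.
items_fast = retriever.retrieve_from_internet(query="memory systems", mode="fast")

# Any non-"fast" value keeps the old behavior: reader.get_memory(...) chunks
# the page and each chunk becomes its own item with "Content: ..." appended.
items_full = retriever.retrieve_from_internet(query="memory systems", mode="fine")
```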

src/memos/memories/textual/tree_text_memory/retrieve/searcher.py

Lines changed: 1 addition & 1 deletion
@@ -536,7 +536,7 @@ def _retrieve_from_internet(
             return []
         logger.info(f"[PATH-C] '{query}' Retrieving from internet...")
         items = self.internet_retriever.retrieve_from_internet(
-            query=query, top_k=top_k, parsed_goal=parsed_goal, info=info
+            query=query, top_k=top_k, parsed_goal=parsed_goal, info=info, mode=mode
         )
         logger.info(f"[PATH-C] '{query}' Retrieved from internet {len(items)} items: {items}")
         return self.reranker.rerank(

src/memos/memories/textual/tree_text_memory/retrieve/xinyusearch.py

Lines changed: 64 additions & 19 deletions
@@ -12,7 +12,11 @@
 from memos.embedders.factory import OllamaEmbedder
 from memos.log import get_logger
 from memos.mem_reader.base import BaseMemReader
-from memos.memories.textual.item import SourceMessage, TextualMemoryItem
+from memos.memories.textual.item import (
+    SearchedTreeNodeTextualMemoryMetadata,
+    SourceMessage,
+    TextualMemoryItem,
+)


 logger = get_logger(__name__)
@@ -132,7 +136,7 @@ def __init__(
         self.reader = reader

     def retrieve_from_internet(
-        self, query: str, top_k: int = 10, parsed_goal=None, info=None
+        self, query: str, top_k: int = 10, parsed_goal=None, info=None, mode="fast"
     ) -> list[TextualMemoryItem]:
         """
         Retrieve information from Xinyu search and convert to TextualMemoryItem format
@@ -153,7 +157,7 @@ def retrieve_from_internet(

         with ContextThreadPoolExecutor(max_workers=8) as executor:
             futures = [
-                executor.submit(self._process_result, result, query, parsed_goal, info)
+                executor.submit(self._process_result, result, query, parsed_goal, info, mode=mode)
                 for result in search_results
             ]
             for future in as_completed(futures):
@@ -303,7 +307,7 @@ def _extract_tags(self, title: str, content: str, summary: str, parsed_goal=None
         return list(set(tags))[:15]  # Limit to 15 tags

     def _process_result(
-        self, result: dict, query: str, parsed_goal: str, info: None
+        self, result: dict, query: str, parsed_goal: str, info: None, mode="fast"
     ) -> list[TextualMemoryItem]:
         if not info:
             info = {"user_id": "", "session_id": ""}
@@ -323,18 +327,59 @@ def _process_result(
         else:
             publish_time = datetime.now().strftime("%Y-%m-%d")

-        read_items = self.reader.get_memory([content], type="doc", info=info)
-
-        memory_items = []
-        for read_item_i in read_items[0]:
-            read_item_i.memory = (
-                f"Title: {title}\nNewsTime: {publish_time}\nSummary: {summary}\n"
-                f"Content: {read_item_i.memory}"
-            )
-            read_item_i.metadata.source = "web"
-            read_item_i.metadata.memory_type = "OuterMemory"
-            read_item_i.metadata.sources = [SourceMessage(type="web", url=url)] if url else []
-            read_item_i.metadata.visibility = "public"
-
-            memory_items.append(read_item_i)
-        return memory_items
+        if mode == "fast":
+            info_ = info.copy()
+            user_id = info_.pop("user_id", "")
+            session_id = info_.pop("session_id", "")
+            return [
+                TextualMemoryItem(
+                    memory=(
+                        f"[Outer internet view] Title: {title}\nNewsTime:"
+                        f" {publish_time}\nSummary:"
+                        f" {summary}\n"
+                    ),
+                    metadata=SearchedTreeNodeTextualMemoryMetadata(
+                        user_id=user_id,
+                        session_id=session_id,
+                        memory_type="OuterMemory",
+                        status="activated",
+                        type="fact",
+                        source="web",
+                        sources=[SourceMessage(type="web", url=url)] if url else [],
+                        visibility="public",
+                        info=info_,
+                        background="",
+                        confidence=0.99,
+                        usage=[],
+                        embedding=self.embedder.embed([content])[0],
+                        internet_info={
+                            "title": title,
+                            "url": url,
+                            "summary": summary,
+                            "content": content,
+                        },
+                    ),
+                )
+            ]
+        else:
+            read_items = self.reader.get_memory([content], type="doc", info=info)
+
+            memory_items = []
+            for read_item_i in read_items[0]:
+                read_item_i.memory = (
+                    f"Title: {title}\nNewsTime: {publish_time}\nSummary: {summary}\n"
+                    f"Content: {read_item_i.memory}"
+                )
+                read_item_i.metadata.source = "web"
+                read_item_i.metadata.memory_type = "OuterMemory"
+                read_item_i.metadata.sources = [SourceMessage(type="web", url=url)] if url else []
+                read_item_i.metadata.visibility = "public"
+                read_item_i.metadata.internet_info = {
+                    "title": title,
+                    "url": url,
+                    "summary": summary,
+                    "content": content,
+                }

                memory_items.append(read_item_i)
+            return memory_items
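
Note: one subtlety shared by both fast paths is that the stored memory text and the embedded text differ. The item's `memory` field keeps only the header, while the vector is computed from the full article body, so similarity search can still match body text that never appears in the stored item. A small sketch with placeholder values:

```python
# Placeholder values; mirrors the fast branch above.
title, publish_time, summary = "Example headline", "2025-01-01", "One-line summary"
content = "Full article body, potentially thousands of characters long..."

memory_text = (
    f"[Outer internet view] Title: {title}\nNewsTime:"
    f" {publish_time}\nSummary:"
    f" {summary}\n"
)
print(memory_text)  # stored text: header only
# The embedding, by contrast, is taken from the body:
# embedding = self.embedder.embed([content])[0]
```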
