Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
192 changes: 103 additions & 89 deletions memoryos-pypi/memoryos.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,103 +125,117 @@ def __init__(self, user_id: str,

def _trigger_profile_and_knowledge_update_if_needed(self):
    """
    Analyse every mid-term session whose heat has reached the threshold.

    Each hot session that still has unanalyzed pages is sent through two LLM
    tasks run in parallel (user-profile analysis and knowledge extraction).
    The results are written to the user / assistant long-term memories, all
    pages of the session are marked as analyzed and the session's heat
    contributors are reset.

    The hot sessions are collected once, up front, from a snapshot of the
    heap, and each session id is visited at most once per call. This keeps
    the pass bounded even when a session's recomputed heat is still at or
    above the threshold after its counters were reset, or when a stale heap
    entry is still present after a rebuild.

    Mid-term memory is saved once at the end if any session was modified.
    An LLM failure stops the pass; sessions handled before the failure are
    still saved.
    """
    if not self.mid_term_memory.heap:
        return

    threshold = self.mid_term_heat_threshold

    # MidTermMemory heap stores (-H_segment, sid); sorting the snapshot
    # therefore yields the hottest session first.
    hot_entries = sorted(
        entry for entry in self.mid_term_memory.heap if -entry[0] >= threshold
    )

    placeholder_lines = ("none", "- none", "- none.")

    def store_knowledge_lines(text, add_line):
        # Store every non-empty line of an extraction result, skipping the
        # "none" placeholders the LLM emits when it found nothing.
        if not text or text.lower() == "none":
            return
        for raw_line in text.split('\n'):
            line = raw_line.strip()
            if line and line.lower() not in placeholder_lines:
                add_line(line)

    processed_any = False
    visited_sids = set()

    for neg_heat, sid in hot_entries:
        if sid in visited_sids:
            continue  # duplicate heap entry for a session already handled
        visited_sids.add(sid)
        current_heat = -neg_heat

        session = self.mid_term_memory.sessions.get(sid)
        if not session:
            # Stale heap entry: the session is gone, so clean the heap up
            # and move on to the next hot session.
            self.mid_term_memory.rebuild_heap()
            continue

        unanalyzed_pages = [p for p in session.get("details", [])
                            if not p.get("analyzed", False)]

        if not unanalyzed_pages:
            # Session is hot but everything was already analysed.
            # Reset its visit counter so it does not keep occupying
            # the top of the heap on every subsequent call.
            print(f"Memoryos: Hot session {sid} has no unanalyzed pages. "
                  f"Resetting heat to allow other sessions to surface.")
            session["N_visit"] = 0
            session["H_segment"] = compute_segment_heat(session)
            self.mid_term_memory.rebuild_heap()
            processed_any = True
            continue

        print(f"Memoryos: Mid-term session {sid} heat ({current_heat:.2f}) "
              f"exceeded threshold. Analyzing {len(unanalyzed_pages)} pages "
              f"for profile/knowledge update.")

        # Run the two LLM tasks in parallel: user-profile analysis (which
        # already returns the updated profile) and knowledge extraction.
        def task_user_profile_analysis():
            print("Memoryos: Starting parallel user profile analysis and update...")
            existing_profile = self.user_long_term_memory.get_raw_user_profile(self.user_id)
            if not existing_profile or existing_profile.lower() == "none":
                existing_profile = "No existing profile data."
            return gpt_user_profile_analysis(unanalyzed_pages, self.client,
                                             model=self.llm_model,
                                             existing_user_profile=existing_profile)

        def task_knowledge_extraction():
            print("Memoryos: Starting parallel knowledge extraction...")
            return gpt_knowledge_extraction(unanalyzed_pages, self.client,
                                            model=self.llm_model)

        with ThreadPoolExecutor(max_workers=2) as executor:
            future_profile = executor.submit(task_user_profile_analysis)
            future_knowledge = executor.submit(task_knowledge_extraction)

            try:
                updated_user_profile = future_profile.result()
                knowledge_result = future_knowledge.result()
            except Exception as e:
                print(f"Error in parallel LLM processing: {e}")
                break  # do not process further sessions on error

        # Tolerate an empty extraction result instead of failing on .get().
        knowledge_result = knowledge_result or {}
        new_user_private_knowledge = knowledge_result.get("private")
        new_assistant_knowledge = knowledge_result.get("assistant_knowledge")

        # The analysis returns the complete updated profile, so it replaces
        # the stored one (merge=False) rather than being merged into it.
        if (updated_user_profile
                and updated_user_profile.lower() != "none"
                and len(updated_user_profile.strip()) >= 30):
            print("Memoryos: Updating user profile with integrated analysis...")
            self.user_long_term_memory.update_user_profile(
                self.user_id, updated_user_profile, merge=False)
        else:
            print("Memoryos: Skipping user profile update due to "
                  "insufficient content.")

        # Add User Private Knowledge to user's LTM
        store_knowledge_lines(new_user_private_knowledge,
                              self.user_long_term_memory.add_user_knowledge)

        # Add Assistant Knowledge to assistant's LTM
        store_knowledge_lines(new_assistant_knowledge,
                              self.assistant_long_term_memory.add_assistant_knowledge)

        # Mark pages as analyzed and reset session heat contributors
        for p in session["details"]:
            p["analyzed"] = True

        session["N_visit"] = 0
        session["L_interaction"] = 0
        session["H_segment"] = compute_segment_heat(session)
        session["last_visit_time"] = get_timestamp()

        # H_segment changed, so the heap ordering has to be rebuilt.
        self.mid_term_memory.rebuild_heap()
        processed_any = True
        print(f"Memoryos: Profile/Knowledge update for session {sid} "
              f"complete. Heat reset.")

    if processed_any:
        self.mid_term_memory.save()

def add_memory(self, user_input: str, agent_response: str, timestamp: str = None, meta_data: dict = None):
"""
Expand Down
29 changes: 7 additions & 22 deletions memoryos-pypi/mid_term.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,8 +278,8 @@ def insert_pages_into_session(self, summary_for_new_pages, keywords_for_new_page
print(f"MidTermMemory: No suitable session to merge (best score {best_overall_score:.2f} < threshold {similarity_threshold}). Creating new session.")
return self.add_session(summary_for_new_pages, pages_to_insert, keywords_for_new_pages)

def search_sessions(self, query_text, segment_similarity_threshold=0.1, page_similarity_threshold=0.1,
top_k_sessions=5, keyword_alpha=1.0, recency_tau_search=3600):
def search_sessions(self, query_text, segment_similarity_threshold=0.1, page_similarity_threshold=0.1,
top_k_sessions=5):
if not self.sessions:
return []

Expand All @@ -289,9 +289,7 @@ def search_sessions(self, query_text, segment_similarity_threshold=0.1, page_sim
**self.embedding_model_kwargs
)
query_vec = normalize_vector(query_vec)
query_keywords = set() # Keywords extraction removed, relying on semantic similarity

candidate_sessions = []
session_ids = list(self.sessions.keys())
if not session_ids: return []

Expand All @@ -301,7 +299,7 @@ def search_sessions(self, query_text, segment_similarity_threshold=0.1, page_sim
dim = summary_embeddings_np.shape[1]
index = faiss.IndexFlatIP(dim) # Inner product for similarity
index.add(summary_embeddings_np)

query_arr_np = np.array([query_vec], dtype=np.float32)
distances, indices = index.search(query_arr_np, min(top_k_sessions, len(session_ids)))

Expand All @@ -310,33 +308,20 @@ def search_sessions(self, query_text, segment_similarity_threshold=0.1, page_sim

for i, idx in enumerate(indices[0]):
if idx == -1: continue

session_id = session_ids[idx]
session = self.sessions[session_id]
semantic_sim_score = float(distances[0][i]) # This is the dot product

# Keyword similarity for session summary
session_keywords = set(session.get("summary_keywords", []))
s_topic_keywords = 0
if query_keywords and session_keywords:
intersection = len(query_keywords.intersection(session_keywords))
union = len(query_keywords.union(session_keywords))
if union > 0: s_topic_keywords = intersection / union

# Time decay for session recency in search scoring
# time_decay_factor = compute_time_decay(session["timestamp"], current_time_str, tau_hours=recency_tau_search)

# Combined score for session relevance
session_relevance_score = (semantic_sim_score + keyword_alpha * s_topic_keywords)
# Session relevance is based purely on semantic similarity (dot product of
# normalized summary embeddings = cosine similarity).
session_relevance_score = semantic_sim_score

if session_relevance_score >= segment_similarity_threshold:
matched_pages_in_session = []
for page in session.get("details", []):
page_embedding = np.array(page["page_embedding"], dtype=np.float32)
# page_keywords = set(page.get("page_keywords", []))

page_sim_score = float(np.dot(page_embedding, query_vec))
# Can also add keyword sim for pages if needed, but keeping it simpler for now

if page_sim_score >= page_similarity_threshold:
matched_pages_in_session.append({"page_data": page, "score": page_sim_score})
Expand Down
2 changes: 1 addition & 1 deletion memoryos-pypi/prompts.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
# System prompt template for response generation.
# Placeholders filled via str.format: {relationship}, {assistant_knowledge_text}
# and {meta_data_text}. The {meta_data_text} slot is labelled as conversation
# metadata; the earlier "User's profile:" label misdescribed it and is dropped.
GENERATE_SYSTEM_RESPONSE_SYSTEM_PROMPT = (
    "As a communication expert with outstanding communication habits, you embody the role of {relationship} throughout the following dialogues.\n"
    "Here are some of your distinctive personal traits and knowledge:\n{assistant_knowledge_text}\n"
    "Current conversation metadata:\n"
    "{meta_data_text}\n"
    "Your task is to generate responses that align with these traits and maintain the tone.\n"
)
Expand Down
4 changes: 3 additions & 1 deletion memoryos-pypi/updater.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,9 @@ def process_short_term_to_mid_term(self):
evicted_qas.append(qa)

if not evicted_qas:
print("Updater: No QAs evicted from short-term memory.")
print("Updater: No valid QAs evicted from short-term memory. "
"Resetting cross-batch continuity tracker to prevent stale linking.")
self.last_evicted_page_for_continuity = None
return

print(f"Updater: Processing {len(evicted_qas)} QAs from short-term to mid-term.")
Expand Down