mem0/memory/main.py (44 changes: 22 additions & 22 deletions)
@@ -336,7 +336,7 @@ def _add_to_vector_store(self, messages, metadata, filters, infer):
             response = remove_code_blocks(response)
             new_retrieved_facts = json.loads(response)["facts"]
         except Exception as e:
-            logging.error(f"Error in new_retrieved_facts: {e}")
+            logger.error(f"Error in new_retrieved_facts: {e}")
             new_retrieved_facts = []
 
         if not new_retrieved_facts:
@@ -360,7 +360,7 @@ def _add_to_vector_store(self, messages, metadata, filters, infer):
         for item in retrieved_old_memory:
             unique_data[item["id"]] = item
         retrieved_old_memory = list(unique_data.values())
-        logging.info(f"Total existing memories: {len(retrieved_old_memory)}")
+        logger.info(f"Total existing memories: {len(retrieved_old_memory)}")
 
         # mapping UUIDs with integers for handling UUID hallucinations
         temp_uuid_mapping = {}
@@ -379,26 +379,26 @@ def _add_to_vector_store(self, messages, metadata, filters, infer):
                     response_format={"type": "json_object"},
                 )
             except Exception as e:
-                logging.error(f"Error in new memory actions response: {e}")
+                logger.error(f"Error in new memory actions response: {e}")
                 response = ""
 
             try:
                 response = remove_code_blocks(response)
                 new_memories_with_actions = json.loads(response)
             except Exception as e:
-                logging.error(f"Invalid JSON response: {e}")
+                logger.error(f"Invalid JSON response: {e}")
                 new_memories_with_actions = {}
         else:
             new_memories_with_actions = {}
 
         returned_memories = []
         try:
             for resp in new_memories_with_actions.get("memory", []):
-                logging.info(resp)
+                logger.info(resp)
                 try:
                     action_text = resp.get("text")
                     if not action_text:
-                        logging.info("Skipping memory entry because of empty `text` field.")
+                        logger.info("Skipping memory entry because of empty `text` field.")
                         continue
 
                     event_type = resp.get("event")
@@ -434,11 +434,11 @@ def _add_to_vector_store(self, messages, metadata, filters, infer):
                             }
                         )
                     elif event_type == "NONE":
-                        logging.info("NOOP for Memory.")
+                        logger.info("NOOP for Memory.")
                 except Exception as e:
-                    logging.error(f"Error processing memory action: {resp}, Error: {e}")
+                    logger.error(f"Error processing memory action: {resp}, Error: {e}")
         except Exception as e:
-            logging.error(f"Error iterating new_memories_with_actions: {e}")
+            logger.error(f"Error iterating new_memories_with_actions: {e}")
 
         keys, encoded_ids = process_telemetry_filters(filters)
         capture_event(
@@ -801,7 +801,7 @@ def history(self, memory_id):
         return self.db.get_history(memory_id)
 
     def _create_memory(self, data, existing_embeddings, metadata=None):
-        logging.debug(f"Creating memory with {data=}")
+        logger.debug(f"Creating memory with {data=}")
         if data in existing_embeddings:
             embeddings = existing_embeddings[data]
         else:
@@ -922,7 +922,7 @@ def _update_memory(self, memory_id, data, existing_embeddings, metadata=None):
         return memory_id
 
     def _delete_memory(self, memory_id):
-        logging.info(f"Deleting memory with {memory_id=}")
+        logger.info(f"Deleting memory with {memory_id=}")
         existing_memory = self.vector_store.get(vector_id=memory_id)
         prev_value = existing_memory.payload["data"]
         self.vector_store.delete(vector_id=memory_id)
@@ -1164,7 +1164,7 @@ async def _add_to_vector_store(
             response = remove_code_blocks(response)
             new_retrieved_facts = json.loads(response)["facts"]
         except Exception as e:
-            logging.error(f"Error in new_retrieved_facts: {e}")
+            logger.error(f"Error in new_retrieved_facts: {e}")
             new_retrieved_facts = []
 
         if not new_retrieved_facts:
@@ -1195,7 +1195,7 @@ async def process_fact_for_search(new_mem):
         for item in retrieved_old_memory:
             unique_data[item["id"]] = item
         retrieved_old_memory = list(unique_data.values())
-        logging.info(f"Total existing memories: {len(retrieved_old_memory)}")
+        logger.info(f"Total existing memories: {len(retrieved_old_memory)}")
 
         # mapping UUIDs with integers for handling UUID hallucinations
         temp_uuid_mapping = {}
@@ -1214,23 +1214,23 @@ async def process_fact_for_search(new_mem):
                     response_format={"type": "json_object"},
                 )
             except Exception as e:
-                logging.error(f"Error in new memory actions response: {e}")
+                logger.error(f"Error in new memory actions response: {e}")
                 response = ""
             try:
                 response = remove_code_blocks(response)
                 new_memories_with_actions = json.loads(response)
             except Exception as e:
-                logging.error(f"Invalid JSON response: {e}")
+                logger.error(f"Invalid JSON response: {e}")
                 new_memories_with_actions = {}
 
         returned_memories = []
         try:
             for resp in new_memories_with_actions.get("memory", []):
-                logging.info(resp)
+                logger.info(resp)
                 try:
                     action_text = resp.get("text")
                     if not action_text:
-                        logging.info("Skipping memory entry because of empty `text` field.")
+                        logger.info("Skipping memory entry because of empty `text` field.")
                         continue
 
                     event_type = resp.get("event")
@@ -1271,11 +1271,11 @@ async def process_fact_for_search(new_mem):
                             {"id": temp_uuid_mapping[id], "memory": action_text, "event": event_type}
                         )
                     elif event_type == "NONE":
-                        logging.info("NOOP for Memory (async).")
+                        logger.info("NOOP for Memory (async).")
                 except Exception as e:
-                    logging.error(f"Error processing memory action (async): {resp}, Error: {e}")
+                    logger.error(f"Error processing memory action (async): {resp}, Error: {e}")
         except Exception as e:
-            logging.error(f"Error iterating new_memories_with_actions: {e}")
+            logger.error(f"Error iterating new_memories_with_actions: {e}")
 
         keys, encoded_ids = process_telemetry_filters(filters)
         capture_event(
@@ -1641,7 +1641,7 @@ async def history(self, memory_id):
         return await asyncio.to_thread(self.db.get_history, memory_id)
 
     async def _create_memory(self, data, existing_embeddings, metadata=None):
-        logging.debug(f"Creating memory with {data=}")
+        logger.debug(f"Creating memory with {data=}")
         if data in existing_embeddings:
             embeddings = existing_embeddings[data]
         else:
@@ -1783,7 +1783,7 @@ async def _update_memory(self, memory_id, data, existing_embeddings, metadata=None):
         return memory_id
 
     async def _delete_memory(self, memory_id):
-        logging.info(f"Deleting memory with {memory_id=}")
+        logger.info(f"Deleting memory with {memory_id=}")
         existing_memory = await asyncio.to_thread(self.vector_store.get, vector_id=memory_id)
         prev_value = existing_memory.payload["data"]
 
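Note on the rename above: switching every call from the root `logging` module to a named `logger` presumes a module-level logger exists in `mem0/memory/main.py`. A minimal sketch of that setup (the exact declaration in the module may differ):

```python
import logging

# A logger named after the module (e.g. "mem0.memory.main") lets applications
# tune mem0's verbosity via logging.getLogger("mem0").setLevel(...) without
# reconfiguring the root logger that bare logging.error(...) calls write to.
logger = logging.getLogger(__name__)
```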
mem0/vector_stores/pgvector.py (6 changes: 4 additions & 2 deletions)
@@ -49,6 +49,7 @@ def __init__(
             hnsw (bool, optional): Use HNSW for faster search
         """
         self.collection_name = collection_name
+        self.user = user
         self.use_diskann = diskann
         self.use_hnsw = hnsw
         self.embedding_model_dims = embedding_model_dims
@@ -73,6 +74,7 @@ def create_col(self, embedding_model_dims):
             f"""
             CREATE TABLE IF NOT EXISTS {self.collection_name} (
                 id UUID PRIMARY KEY,
+                tenant_id TEXT NOT NULL,
                 vector vector({embedding_model_dims}),
                 payload JSONB
             );
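Since `CREATE TABLE IF NOT EXISTS` is a no-op for tables that already exist, collections created before this change won't pick up the new column. A hypothetical one-off migration, not part of this PR, assuming old rows can be assigned a placeholder tenant:

```python
def add_tenant_column(conn, collection_name: str) -> None:
    """Backfill sketch for pre-existing collections (hypothetical helper).

    `conn` is an open psycopg2 connection; the 'default' placeholder tenant
    is an assumption here, not something this PR defines.
    """
    with conn.cursor() as cur:
        cur.execute(
            f"ALTER TABLE {collection_name} "
            "ADD COLUMN IF NOT EXISTS tenant_id TEXT NOT NULL DEFAULT 'default'"
        )
    conn.commit()
```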
@@ -114,10 +116,10 @@ def insert(self, vectors, payloads=None, ids=None):
         logger.info(f"Inserting {len(vectors)} vectors into collection {self.collection_name}")
         json_payloads = [json.dumps(payload) for payload in payloads]
 
-        data = [(id, vector, payload) for id, vector, payload in zip(ids, vectors, json_payloads)]
+        data = [(id, self.user, vector, payload) for id, vector, payload in zip(ids, vectors, json_payloads)]
         execute_values(
             self.cur,
-            f"INSERT INTO {self.collection_name} (id, vector, payload) VALUES %s",
+            f"INSERT INTO {self.collection_name} (id, tenant_id, vector, payload) VALUES %s",
             data,
         )
         self.conn.commit()
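For context, how the new field flows end to end as I read it: `user` appears to be the existing Postgres connection username taken by `__init__` (its signature is unchanged in the visible hunk), and after this change it is also stamped onto every inserted row as `tenant_id`, so one `PGVector` instance maps to exactly one tenant. A usage sketch with placeholder values; treat the constructor arguments as illustrative rather than authoritative:

```python
from mem0.vector_stores.pgvector import PGVector

# Placeholder values throughout; parameter names follow the current
# PGVector constructor as I read it and may differ in practice.
store = PGVector(
    dbname="postgres",
    collection_name="memories",
    embedding_model_dims=1536,
    user="tenant_a",  # connection user, now also written as tenant_id
    password="secret",
    host="localhost",
    port=5432,
    diskann=False,
    hnsw=True,
)

store.insert(
    vectors=[[0.0] * 1536],
    payloads=[{"data": "hello world"}],
    ids=["11111111-1111-1111-1111-111111111111"],
)
# Each row in "memories" now carries tenant_id = "tenant_a".
```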