diff --git a/mem0/memory/main.py b/mem0/memory/main.py
index 8b4007fce9..baef5e0e5a 100644
--- a/mem0/memory/main.py
+++ b/mem0/memory/main.py
@@ -336,7 +336,7 @@ def _add_to_vector_store(self, messages, metadata, filters, infer):
             response = remove_code_blocks(response)
             new_retrieved_facts = json.loads(response)["facts"]
         except Exception as e:
-            logging.error(f"Error in new_retrieved_facts: {e}")
+            logger.error(f"Error in new_retrieved_facts: {e}")
             new_retrieved_facts = []
 
         if not new_retrieved_facts:
@@ -360,7 +360,7 @@ def _add_to_vector_store(self, messages, metadata, filters, infer):
         for item in retrieved_old_memory:
             unique_data[item["id"]] = item
         retrieved_old_memory = list(unique_data.values())
-        logging.info(f"Total existing memories: {len(retrieved_old_memory)}")
+        logger.info(f"Total existing memories: {len(retrieved_old_memory)}")
 
         # mapping UUIDs with integers for handling UUID hallucinations
         temp_uuid_mapping = {}
@@ -379,14 +379,14 @@ def _add_to_vector_store(self, messages, metadata, filters, infer):
                     response_format={"type": "json_object"},
                 )
             except Exception as e:
-                logging.error(f"Error in new memory actions response: {e}")
+                logger.error(f"Error in new memory actions response: {e}")
                 response = ""
 
             try:
                 response = remove_code_blocks(response)
                 new_memories_with_actions = json.loads(response)
             except Exception as e:
-                logging.error(f"Invalid JSON response: {e}")
+                logger.error(f"Invalid JSON response: {e}")
                 new_memories_with_actions = {}
         else:
             new_memories_with_actions = {}
@@ -394,11 +394,11 @@ def _add_to_vector_store(self, messages, metadata, filters, infer):
         returned_memories = []
         try:
             for resp in new_memories_with_actions.get("memory", []):
-                logging.info(resp)
+                logger.info(resp)
                 try:
                     action_text = resp.get("text")
                     if not action_text:
-                        logging.info("Skipping memory entry because of empty `text` field.")
+                        logger.info("Skipping memory entry because of empty `text` field.")
                         continue
 
                     event_type = resp.get("event")
@@ -434,11 +434,11 @@ def _add_to_vector_store(self, messages, metadata, filters, infer):
                             }
                         )
                     elif event_type == "NONE":
-                        logging.info("NOOP for Memory.")
+                        logger.info("NOOP for Memory.")
                 except Exception as e:
-                    logging.error(f"Error processing memory action: {resp}, Error: {e}")
+                    logger.error(f"Error processing memory action: {resp}, Error: {e}")
         except Exception as e:
-            logging.error(f"Error iterating new_memories_with_actions: {e}")
+            logger.error(f"Error iterating new_memories_with_actions: {e}")
 
         keys, encoded_ids = process_telemetry_filters(filters)
         capture_event(
@@ -801,7 +801,7 @@ def history(self, memory_id):
         return self.db.get_history(memory_id)
 
     def _create_memory(self, data, existing_embeddings, metadata=None):
-        logging.debug(f"Creating memory with {data=}")
+        logger.debug(f"Creating memory with {data=}")
         if data in existing_embeddings:
             embeddings = existing_embeddings[data]
         else:
@@ -922,7 +922,7 @@ def _update_memory(self, memory_id, data, existing_embeddings, metadata=None):
         return memory_id
 
     def _delete_memory(self, memory_id):
-        logging.info(f"Deleting memory with {memory_id=}")
+        logger.info(f"Deleting memory with {memory_id=}")
         existing_memory = self.vector_store.get(vector_id=memory_id)
         prev_value = existing_memory.payload["data"]
         self.vector_store.delete(vector_id=memory_id)
@@ -1164,7 +1164,7 @@ async def _add_to_vector_store(
             response = remove_code_blocks(response)
             new_retrieved_facts = json.loads(response)["facts"]
         except Exception as e:
-            logging.error(f"Error in new_retrieved_facts: {e}")
+            logger.error(f"Error in new_retrieved_facts: {e}")
             new_retrieved_facts = []
 
         if not new_retrieved_facts:
@@ -1195,7 +1195,7 @@ async def process_fact_for_search(new_mem):
         for item in retrieved_old_memory:
             unique_data[item["id"]] = item
         retrieved_old_memory = list(unique_data.values())
-        logging.info(f"Total existing memories: {len(retrieved_old_memory)}")
+        logger.info(f"Total existing memories: {len(retrieved_old_memory)}")
 
         # mapping UUIDs with integers for handling UUID hallucinations
         temp_uuid_mapping = {}
@@ -1214,23 +1214,23 @@ async def process_fact_for_search(new_mem):
                 response_format={"type": "json_object"},
             )
         except Exception as e:
-            logging.error(f"Error in new memory actions response: {e}")
+            logger.error(f"Error in new memory actions response: {e}")
             response = ""
 
         try:
             response = remove_code_blocks(response)
             new_memories_with_actions = json.loads(response)
         except Exception as e:
-            logging.error(f"Invalid JSON response: {e}")
+            logger.error(f"Invalid JSON response: {e}")
             new_memories_with_actions = {}
         returned_memories = []
         try:
             for resp in new_memories_with_actions.get("memory", []):
-                logging.info(resp)
+                logger.info(resp)
                 try:
                     action_text = resp.get("text")
                     if not action_text:
-                        logging.info("Skipping memory entry because of empty `text` field.")
+                        logger.info("Skipping memory entry because of empty `text` field.")
                         continue
 
                     event_type = resp.get("event")
@@ -1271,11 +1271,11 @@ async def process_fact_for_search(new_mem):
                             {"id": temp_uuid_mapping[id], "memory": action_text, "event": event_type}
                         )
                     elif event_type == "NONE":
-                        logging.info("NOOP for Memory (async).")
+                        logger.info("NOOP for Memory (async).")
                 except Exception as e:
-                    logging.error(f"Error processing memory action (async): {resp}, Error: {e}")
+                    logger.error(f"Error processing memory action (async): {resp}, Error: {e}")
         except Exception as e:
-            logging.error(f"Error iterating new_memories_with_actions: {e}")
+            logger.error(f"Error iterating new_memories_with_actions: {e}")
 
         keys, encoded_ids = process_telemetry_filters(filters)
         capture_event(
@@ -1641,7 +1641,7 @@ async def history(self, memory_id):
         return await asyncio.to_thread(self.db.get_history, memory_id)
 
     async def _create_memory(self, data, existing_embeddings, metadata=None):
-        logging.debug(f"Creating memory with {data=}")
+        logger.debug(f"Creating memory with {data=}")
         if data in existing_embeddings:
             embeddings = existing_embeddings[data]
         else:
@@ -1783,7 +1783,7 @@ async def _update_memory(self, memory_id, data, existing_embeddings, metadata=No
         return memory_id
 
     async def _delete_memory(self, memory_id):
-        logging.info(f"Deleting memory with {memory_id=}")
+        logger.info(f"Deleting memory with {memory_id=}")
         existing_memory = await asyncio.to_thread(self.vector_store.get, vector_id=memory_id)
         prev_value = existing_memory.payload["data"]
         await asyncio.to_thread(self.vector_store.delete, vector_id=memory_id)
diff --git a/mem0/vector_stores/pgvector.py b/mem0/vector_stores/pgvector.py
index 3b5d4157c8..d163d5410f 100644
--- a/mem0/vector_stores/pgvector.py
+++ b/mem0/vector_stores/pgvector.py
@@ -49,6 +49,7 @@ def __init__(
             hnsw (bool, optional): Use HNSW for faster search
         """
         self.collection_name = collection_name
+        self.user = user
         self.use_diskann = diskann
         self.use_hnsw = hnsw
         self.embedding_model_dims = embedding_model_dims
@@ -73,6 +74,7 @@ def create_col(self, embedding_model_dims):
             f"""
             CREATE TABLE IF NOT EXISTS {self.collection_name} (
                 id UUID PRIMARY KEY,
+                tenant_id TEXT NOT NULL,
                 vector vector({embedding_model_dims}),
                 payload JSONB
             );
@@ -114,10 +116,10 @@ def insert(self, vectors, payloads=None, ids=None):
         logger.info(f"Inserting {len(vectors)} vectors into collection {self.collection_name}")
         json_payloads = [json.dumps(payload) for payload in payloads]
 
-        data = [(id, vector, payload) for id, vector, payload in zip(ids, vectors, json_payloads)]
+        data = [(id, self.user, vector, payload) for id, vector, payload in zip(ids, vectors, json_payloads)]
         execute_values(
             self.cur,
-            f"INSERT INTO {self.collection_name} (id, vector, payload) VALUES %s",
+            f"INSERT INTO {self.collection_name} (id, tenant_id, vector, payload) VALUES %s",
             data,
         )
         self.conn.commit()