95 changes: 95 additions & 0 deletions examples/memory_pipeline.py
@@ -0,0 +1,95 @@

import os
from haystack.components.agents import Agent
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.components.builders.chat_prompt_builder import ChatPromptBuilder
from haystack.core.pipeline import Pipeline
from haystack.tools import tool
from haystack.dataclasses import ChatMessage
from haystack_experimental.memory import Mem0MemoryRetriever, MemoryWriter, Mem0MemoryStore


@tool
def save_user_preference(preference_type: str, preference_value: str) -> str:
    """Save user preferences that should be remembered"""
    return f"✅ Saved preference: {preference_type} = {preference_value}"


@tool
def get_recommendation(category: str) -> str:
    """Get personalized recommendations based on user preferences"""
    recommendations = {
        "food": "Based on your preferences, try the Mediterranean cuisine!",
        "music": "I recommend some jazz playlists for you!",
        "books": "You might enjoy science fiction novels!",
    }
    return recommendations.get(category, "I'll learn your preferences to give better recommendations!")


# Create memory store
memory_store = Mem0MemoryStore(api_key=os.getenv("MEM0_API_KEY"))

# Create memory-aware agent
memory_agent = Agent(
    chat_generator=OpenAIChatGenerator(model="gpt-4o-mini"),
    tools=[save_user_preference, get_recommendation],
    system_prompt="""
    You are a personal assistant with memory capabilities.
    Use the provided memories to personalize your responses and remember user context.
    When users share preferences, use the save_user_preference tool.
    When asked for recommendations, use the get_recommendation tool.
    Be conversational and reference previous interactions when relevant.
    """,
    exit_conditions=["text"],
    max_agent_steps=10,
    raise_on_tool_invocation_failure=False,
)

# Create the pipeline
agent_memory_pipeline = Pipeline()

# Add components
agent_memory_pipeline.add_component("memory_retriever", Mem0MemoryRetriever(
memory_store=memory_store,
top_k=5
))

agent_memory_pipeline.add_component("prompt_builder", ChatPromptBuilder(
template=[
ChatMessage.from_system(
"Previous conversation context:\n"
"{% for memory in memories %}"
"{{ memory.content }}\n"
"{% endfor %}"
"{% if not memories %}No previous context available.{% endif %}"
),
ChatMessage.from_user("{{ user_query }}")
],
required_variables=["user_query"]
))

agent_memory_pipeline.add_component("agent", memory_agent)
agent_memory_pipeline.add_component("memory_writer", MemoryWriter(memory_store=memory_store))

# Connect components
agent_memory_pipeline.connect("memory_retriever.memories", "prompt_builder.memories")
agent_memory_pipeline.connect("prompt_builder.prompt", "agent.messages")
agent_memory_pipeline.connect("agent.messages", "memory_writer.messages")

# Run the pipeline
user_id = "alice_123"
user_query = "Can you remember this and give me a food recommendation?"

# Get memories and run agent
agent_output = agent_memory_pipeline.run({
    "memory_retriever": {
        "query": user_query,
        "user_id": user_id
    },
    "prompt_builder": {
        "user_query": user_query
    },
    "memory_writer": {
        "user_id": user_id
    }
})
48 changes: 48 additions & 0 deletions haystack_experimental/memory/README.md
@@ -0,0 +1,48 @@
# Memory Components

This directory contains Haystack components for implementing agent memory capabilities.

## Components

### MemoryStore Protocol (`protocol.py`)
Defines the interface for pluggable memory storage backends.
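
The actual protocol lives in `protocol.py` (not reproduced here); judging from the methods `Mem0MemoryStore` implements, the interface looks roughly like this sketch:

```python
from typing import Any, Optional, Protocol

from haystack.dataclasses import ChatMessage


class MemoryStore(Protocol):
    """Illustrative sketch of the pluggable memory storage interface."""

    def add_memories(self, user_id: str, messages: list[ChatMessage]) -> list[str]: ...

    def search_memories(
        self, query: str, user_id: str, filters: Optional[dict[str, Any]] = None, top_k: int = 10
    ) -> list[ChatMessage]: ...

    def update_memories(self, messages: list[ChatMessage]) -> int: ...

    def delete_all_memories(self, user_id: str) -> None: ...
```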

### Mem0MemoryStore (`mem0_store.py`)
Implementation using Mem0 as the backend storage service.

### Mem0MemoryRetriever (`memory_retriever.py`)
Component for retrieving relevant memories based on queries.

### MemoryWriter (`memory_writer.py`)
Component for storing chat messages as memories.

## Usage

### Examples

1. **Basic Memory Pipeline** (`examples/memory_pipeline_example.py`)
- Simple memory storage and retrieval
- Different memory types demonstration

2. **Agent Memory Integration** (`examples/agent_memory_pipeline_example.py`)
- Complete agent with memory capabilities
- Memory-aware conversations
- Preference learning and recall
- Session persistence

3. **Simple Agent Memory** (`examples/simple_agent_memory_example.py`)
- Minimal agent memory integration
- Direct pipeline structure
- Easy to understand and modify
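
Outside a pipeline, the components can also be used directly. A minimal sketch, assuming `MEM0_API_KEY` is set; the run-time parameters (`messages`, `query`, `user_id`) and the `memories` output key follow the pipeline example above:

```python
import os

from haystack.dataclasses import ChatMessage
from haystack_experimental.memory import Mem0MemoryRetriever, Mem0MemoryStore, MemoryWriter

store = Mem0MemoryStore(api_key=os.getenv("MEM0_API_KEY"))
writer = MemoryWriter(memory_store=store)
retriever = Mem0MemoryRetriever(memory_store=store, top_k=5)

# Store a preference as a memory for this user
writer.run(messages=[ChatMessage.from_user("I prefer vegetarian food.")], user_id="alice_123")

# Later, retrieve memories relevant to a new query
result = retriever.run(query="What should I cook for dinner?", user_id="alice_123")
for memory in result["memories"]:
    print(memory.text)
```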

## Memory Types

Memories are stored as ChatMessage objects with metadata:
- `memory_type`: "semantic" (facts/preferences) or "episodic" (experiences)
- `user_id`: User identifier for scoping
- `memory_id`: Unique identifier (set by storage backend)
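
For example, a semantic memory can be represented like this (a minimal sketch; any extra metadata keys are up to the caller):

```python
from haystack.dataclasses import ChatMessage

memory = ChatMessage.from_user(
    "I prefer vegetarian food.",
    meta={"memory_type": "semantic", "user_id": "alice_123"},
)
```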

## Requirements

- `pip install mem0ai` for Mem0MemoryStore
- `MEM0_API_KEY` environment variable
8 changes: 8 additions & 0 deletions haystack_experimental/memory/__init__.py
@@ -0,0 +1,8 @@
# SPDX-FileCopyrightText: 2022-present deepset GmbH <[email protected]>
# SPDX-License-Identifier: Apache-2.0

from .mem0_store import Mem0MemoryStore
from .memory_retriever import Mem0MemoryRetriever
from .memory_writer import MemoryWriter

__all__ = ["Mem0MemoryRetriever", "MemoryWriter", "Mem0MemoryStore"]
143 changes: 143 additions & 0 deletions haystack_experimental/memory/mem0_store.py
@@ -0,0 +1,143 @@
# SPDX-FileCopyrightText: 2022-present deepset GmbH <[email protected]>
# SPDX-License-Identifier: Apache-2.0

import os
from datetime import datetime
from typing import Any, Optional

from haystack import default_from_dict, default_to_dict
from haystack.dataclasses.chat_message import ChatMessage
from haystack.lazy_imports import LazyImport

with LazyImport(message="Run 'pip install mem0ai'") as mem0_import:
from mem0 import MemoryClient


class Mem0MemoryStore:
    """
    A memory store implementation using Mem0 as the backend.

    This store provides semantic and episodic memory capabilities for agents
    by integrating with the Mem0 memory service. Memories are stored as ChatMessage
    objects with memory-specific metadata.

    :param api_key: Mem0 API key (if not provided, uses the MEM0_API_KEY environment variable)
    :param config: Optional configuration dictionary used to build the Mem0 client via `MemoryClient.from_config`
    :param kwargs: Additional configuration parameters for the Mem0 client
    """

    def __init__(self, api_key: Optional[str] = None, config: Optional[dict[Any, Any]] = None, **kwargs):
        mem0_import.check()
        self.api_key = api_key or os.getenv("MEM0_API_KEY")
        if not self.api_key:
            raise ValueError("Mem0 API key must be provided either as a parameter or via the MEM0_API_KEY environment variable")

        self.config = config
        self.client = (
            MemoryClient.from_config(self.config) if self.config else MemoryClient(api_key=self.api_key, **kwargs)
        )

    def to_dict(self) -> dict[str, Any]:
        """Serialize the store configuration to a dictionary."""
        return default_to_dict(self, api_key=self.api_key, config=self.config)

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "Mem0MemoryStore":
        """Deserialize the store from a dictionary."""
        return default_from_dict(cls, data)

    def add_memories(self, user_id: str, messages: list[ChatMessage]) -> list[str]:
        """
        Add ChatMessage memories to Mem0.

        :param user_id: User identifier associated with the memories
        :param messages: List of ChatMessage objects with memory metadata
        :returns: List of memory IDs for the added messages
        """
        added_ids = []

        for message in messages:
            if not message.text:
                continue
            mem0_message = [{"role": message.role, "content": message.text}]

            try:
                # Mem0 uses user_id as the main identifier; the message's meta is stored
                # as Mem0 metadata so it can be used for filtering later.
                result = self.client.add(messages=mem0_message, user_id=user_id, metadata=message.meta, infer=False)
                # Mem0 returns different response formats, handle both
                memory_id = result.get("id") or result.get("memory_id") or str(result)
                added_ids.append(memory_id)
            except Exception as e:
                raise RuntimeError(f"Failed to add memory message: {e}") from e

        return added_ids

    def search_memories(
        self,
        query: str,
        user_id: str,
        filters: Optional[dict[str, Any]] = None,
        top_k: int = 10,
    ) -> list[ChatMessage]:
        """
        Search for memories in Mem0.

        :param query: Text query to search for
        :param user_id: User identifier for scoping the search
        :param filters: Additional filters to apply on search. For more details on Mem0 filters, see https://mem0.ai/docs/search/
        :param top_k: Maximum number of results to return
        :returns: List of ChatMessage memories matching the criteria
        """
        # Always scope the search by user_id; combine with extra filters only when they are provided
        mem0_filters = {"AND": [{"user_id": user_id}, filters]} if filters else {"user_id": user_id}

        try:
            results = self.client.search(query=query, limit=top_k, filters=mem0_filters, user_id=user_id)
            memories = [
                ChatMessage.from_assistant(text=result["content"], meta=result["metadata"]) for result in results
            ]

            return memories

        except Exception as e:
            raise RuntimeError(f"Failed to search memories: {e}") from e

    def update_memories(self, messages: list[ChatMessage]) -> int:
        """
        Update ChatMessage memories in Mem0.

        :param messages: List of ChatMessage memories to update (must have memory_id in meta)
        :returns: Number of records actually updated
        """
        updated_count = 0
        for message in messages:
            memory_id = message.meta.get("memory_id")
            if not memory_id:
                raise ValueError("ChatMessage must have a memory_id in meta to be updated")

            metadata = {
                "role": message.role.value,
                "updated_at": datetime.now().isoformat(),
                **{k: v for k, v in message.meta.items() if k not in ["memory_id", "user_id"]},
            }

            try:
                self.client.update(memory_id=memory_id, data=message.text or str(message), metadata=metadata)
                updated_count += 1
            except Exception as e:
                raise RuntimeError(f"Failed to update memory {memory_id}: {e}") from e

        return updated_count

    # Mem0 doesn't allow passing a filter to the delete endpoint,
    # but we can delete all memories for a user by passing the user_id
    def delete_all_memories(self, user_id: str):
        """
        Delete memory records from Mem0.

        :param user_id: User identifier for scoping the deletion
        """
        try:
            self.client.delete_all(user_id=user_id)
        except Exception as e:
            raise RuntimeError(f"Failed to delete memories for user {user_id}: {e}") from e
76 changes: 76 additions & 0 deletions haystack_experimental/memory/mem_super_component.py
@@ -0,0 +1,76 @@
from haystack import Pipeline, super_component
from haystack.components.agents import Agent
from haystack.components.builders.chat_prompt_builder import ChatPromptBuilder
from haystack.components.generators.chat import ChatGenerator
from haystack.dataclasses import ChatMessage
from haystack.tools import Tool

from .mem0_store import Mem0MemoryStore
from .memory_retriever import Mem0MemoryRetriever
from .memory_writer import MemoryWriter


@super_component
class AgentMemory:
    """A SuperComponent that wraps an Agent with Mem0-backed memory retrieval and writing."""

    def __init__(
        self,
        system_prompt: str,
        tools: list[Tool],
        chat_generator: ChatGenerator,
        exit_conditions: list[str],
        max_agent_steps: int,
        raise_on_tool_invocation_failure: bool,
    ):
        memory_store = Mem0MemoryStore()
        memory_retriever = Mem0MemoryRetriever(memory_store=memory_store)
        memory_writer = MemoryWriter(memory_store=memory_store)
        agent = Agent(
            chat_generator=chat_generator,
            tools=tools,
            system_prompt=system_prompt,
            exit_conditions=exit_conditions,
            max_agent_steps=max_agent_steps,
            raise_on_tool_invocation_failure=raise_on_tool_invocation_failure,
        )
        pipeline = Pipeline()
        pipeline.add_component("memory_retriever", memory_retriever)

        pipeline.add_component(
            "prompt_builder",
            ChatPromptBuilder(
                template=[
                    ChatMessage.from_system(
                        "Previous conversation context:\n"
                        "{% for memory in memories %}"
                        "{{ memory.text }}\n"
                        "{% endfor %}"
                        "{% if not memories %}No previous context available.{% endif %}"
                    ),
                    ChatMessage.from_user("{{ user_query }}"),
                ],
                required_variables=["user_query"],
            ),
        )

        pipeline.add_component("agent", agent)
        pipeline.add_component("memory_writer", memory_writer)

        # Connect components
        pipeline.connect("memory_retriever.memories", "prompt_builder.memories")
        pipeline.connect("prompt_builder.prompt", "agent.messages")
        pipeline.connect("agent.messages", "memory_writer.messages")

        self.pipeline = pipeline
        # Map the wrapped pipeline's sockets to this component's inputs and outputs.
        # Socket names must match the component names registered above.
        self.output_mapping = {
            "memory_writer.messages": "messages",
        }
        self.input_mapping = {
            "query": ["memory_retriever.query", "prompt_builder.user_query"],
            "user_id": ["memory_retriever.user_id", "memory_writer.user_id"],
        }

    def run(self, *, query: str, user_id: str) -> dict[str, list[ChatMessage]]:  # noqa: D102
        ...

    def warmup(self) -> None:  # noqa: D102
        ...