diff --git a/python-recipes/agents/03_memory_agent.ipynb b/python-recipes/agents/03_memory_agent.ipynb index 9aff82d5..8569cf99 100644 --- a/python-recipes/agents/03_memory_agent.ipynb +++ b/python-recipes/agents/03_memory_agent.ipynb @@ -1,1607 +1,1897 @@ { - "cells": [ - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "![Redis](https://redis.io/wp-content/uploads/2024/04/Logotype.svg?auto=webp&quality=85,75&width=120)\n", - "\n", - "# Agent Memory Using Redis and LangGraph\n", - "This notebook demonstrates how to manage short-term and long-term agent memory using Redis and LangGraph. We'll explore:\n", - "\n", - "1. Short-term memory management using LangGraph's checkpointer\n", - "2. Long-term memory storage and retrieval using RedisVL\n", - "3. Managing long-term memory manually vs. exposing tool access (AKA function-calling)\n", - "4. Managing conversation history size with summarization\n", - "5. Memory consolidation\n", - "\n", - "\n", - "## What We'll Build\n", - "\n", - "We're going to build two versions of a travel agent, one that manages long-term\n", - "memory manually and one that does so using tools the LLM calls.\n", - "\n", - "Here are two diagrams showing the components used in both agents:\n", - "\n", - "![diagram](../../assets/memory-agents.png)\n", - "\n", - "## Let's Begin!\n", - "\"Open" - ] + "cells": [ + { + "cell_type": "markdown", + "metadata": { + "id": "sxdnLVT31nfd" + }, + "source": [ + "![Redis](https://redis.io/wp-content/uploads/2024/04/Logotype.svg?auto=webp&quality=85,75&width=120)\n", + "\n", + "# Agent Memory with Redis\n", + "\n", + "## Introduction\n", + "\n", + "Without memory, AI agents are like goldfish - they forget everything after each conversation and can't learn from past interactions or maintain context across sessions. Agentic systems require both **short-term** and **long-term** memory in order to complete tasks in a personalized and resilient manner. Memory is all about state management and [**Redis**](https://redis.io/try-free/) is the well-known in-memory database for exaclty this kind of use case today in production systems.\n", + "\n", + "## What We'll Build\n", + "\n", + "This tutorial demonstrates how to build a **memory-enabled travel agent** with **Redis** and **LangGraph** that remembers user preferences and provides personalized recommendations. This is a **horizontal concept** that you can take and apply to your own agent use cases.\n", + "\n", + "We'll explore:\n", + "\n", + "1. Short-term memory management using LangGraph's checkpointer\n", + "2. Long-term memory storage and retrieval using RedisVL\n", + "3. Managing long-term memory as a tool for a ReAct agent\n", + "4. Managing conversation history size with summarization" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "Ee3ltHdVvKOD" + }, + "source": [ + "# 🧠 Memory architecture overview\n", + "\n", + "Our agent uses a dual-memory system:\n", + "- **Short-term**: Manages conversation context\n", + "- **Long-term**: Stores persistent knowledge\n", + "\n", + "## Short-term Memory\n", + "The agent tracks chat history using Redis through LangGraph's [checkpointer](https://github.com/redis-developer/langgraph-redis). 
Each node in the graph (Retrieve Memories, Respond, Summarize) saves its state to Redis, including conversation history and thread metadata.\n", + "\n", + "\n", + "\n", + "To prevent context window pollution, the agent summarizes conversations when they exceed a configurable length.\n", + "\n", + "## Long-term Memory\n", + "\n", + "Long-term memories are stored & indexed in Redis using the RedisVL client, with two types:\n", + "- **Episodic**: User preferences and experiences\n", + "- **Semantic**: General travel knowledge\n", + "\n", + "\n", + "\n", + ">**NOTE**: These memory types align with the [CoALA](https://arxiv.org/abs/2309.02427) paper's concepts. Our agent's procedural memory is encoded in its Python workflow." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Let's Begin\n", + "\"Open" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "0KciGua91nfe" + }, + "source": [ + "---\n", + "\n", + "# Set up our environment\n", + "\n", + "Before diving into the code, let's set up our development environment with the right Python libraries.\n", + "\n", + ">**NOTE**: You may need to restart your kernal after installing libraries." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "0zTUm35H1nfe" + }, + "outputs": [], + "source": [ + "%pip install langchain-openai langgraph-checkpoint langgraph langgraph-checkpoint-redis pydantic" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "8R1hEM7s1nff" + }, + "source": [ + "## Required API keys\n", + "\n", + "You must add an [OpenAI API](https://platform.openai.com/signup) key with billing information for this tutorial." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "365fzPsj1nff" + }, + "outputs": [], + "source": [ + "import getpass\n", + "import os\n", + "\n", + "def _set_env(key: str):\n", + " if key not in os.environ:\n", + " os.environ[key] = getpass.getpass(f\"{key}:\")\n", + "\n", + "\n", + "_set_env(\"OPENAI_API_KEY\")" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "NLkF4GB_1nff" + }, + "source": [ + "## Setup Redis\n", + "\n", + "You have two options for running Redis:\n", + "\n", + "1. **Redis Cloud**: For a fully-managed, seamless experience, use [a free instance of Redis Cloud](https://redis.io/try-free).\n", + "2. **Local Redis**: For a simple, local (non-persistent) Redis instance, run the cell below." + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "zgKbb4ol1nff" + }, + "source": [ + "Run the cell below to get a localized Redis instance on your Google colab server." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "xs7bi1kr1nff" + }, + "outputs": [], + "source": [ + "# NBVAL_SKIP\n", + "\n", + "%%sh\n", + "curl -fsSL https://packages.redis.io/gpg | sudo gpg --dearmor -o /usr/share/keyrings/redis-archive-keyring.gpg\n", + "echo \"deb [signed-by=/usr/share/keyrings/redis-archive-keyring.gpg] https://packages.redis.io/deb $(lsb_release -cs) main\" | sudo tee /etc/apt/sources.list.d/redis.list\n", + "sudo apt-get update > /dev/null 2>&1\n", + "sudo apt-get install redis-stack-server > /dev/null 2>&1\n", + "redis-stack-server --daemonize yes" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "-B8XRKHR1nff" + }, + "source": [ + "Let's test out Redis connection and create a client to communicate with the server." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "dauPT3PT1nff" + }, + "outputs": [], + "source": [ + "import os\n", + "\n", + "from redis import Redis\n", + "\n", + "# Use the environment variable if set, otherwise default to localhost\n", + "REDIS_URL = os.getenv(\"REDIS_URL\", \"redis://localhost:6379\")\n", + "\n", + "redis_client = Redis.from_url(REDIS_URL)\n", + "redis_client.ping()" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "aRxYTTOf1nfg" + }, + "source": [ + "## Prepare memory data models\n", + "\n", + "In this section, we'll create a robust data modeling system for our agent's memory using `Pydantic`. These models will ensure type safety and provide clear data structures for storing and retrieving memories from Redis.\n", + "\n", + "We'll implement four key components:\n", + "\n", + "1. `MemoryType` - An enumeration that categorizes memories into two types:\n", + " - Episodic: Personal experiences and user preferences\n", + " - Semantic: General knowledge and domain facts\n", + "\n", + "2. `Memory` - The core model representing a single memory entry with its content and metadata\n", + "\n", + "3. `Memories` - A container model that holds collections of memory objects\n", + "\n", + "4. `StoredMemory` - A specialized model for memories that have been persisted to Redis\n", + "\n", + "These models work together to create a complete memory lifecycle, from creation to storage and retrieval." + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": { + "id": "Ix6Pe6qG1nfg" + }, + "outputs": [], + "source": [ + "import ulid\n", + "\n", + "from datetime import datetime\n", + "from enum import Enum\n", + "from typing import List, Optional\n", + "from pydantic import BaseModel, Field\n", + "\n", + "\n", + "class MemoryType(str, Enum):\n", + " \"\"\"\n", + " Defines the type of long-term memory for categorization and retrieval.\n", + "\n", + " EPISODIC: Personal experiences and user-specific preferences\n", + " (e.g., \"User prefers Delta airlines\", \"User visited Paris last year\")\n", + "\n", + " SEMANTIC: General domain knowledge and facts\n", + " (e.g., \"Singapore requires passport\", \"Tokyo has excellent public transit\")\n", + "\n", + " The type of a long-term memory.\n", + "\n", + " EPISODIC: User specific experiences and preferences\n", + "\n", + " SEMANTIC: General knowledge on top of the user's preferences and LLM's\n", + " training data.\n", + " \"\"\"\n", + "\n", + " EPISODIC = \"episodic\"\n", + " SEMANTIC = \"semantic\"\n", + "\n", + "\n", + "class Memory(BaseModel):\n", + " \"\"\"Represents a single long-term memory.\"\"\"\n", + "\n", + " content: str\n", + " memory_type: MemoryType\n", + " metadata: str\n", + "\n", + "\n", + "class Memories(BaseModel):\n", + " \"\"\"\n", + " A list of memories extracted from a conversation by an LLM.\n", + "\n", + " NOTE: OpenAI's structured output requires us to wrap the list in an object.\n", + " \"\"\"\n", + "\n", + " memories: List[Memory]\n", + "\n", + "\n", + "class StoredMemory(Memory):\n", + " \"\"\"A stored long-term memory\"\"\"\n", + "\n", + " id: str # The redis key\n", + " memory_id: ulid.ULID = Field(default_factory=lambda: ulid.ULID())\n", + " created_at: datetime = Field(default_factory=datetime.now)\n", + " user_id: Optional[str] = None\n", + " thread_id: Optional[str] = None\n", + " memory_type: Optional[MemoryType] = None" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "P6a03f4b1nfg" + }, + "source": [ + "Now we have type-safe 
data models that handle the complete memory lifecycle from LLM extraction to Redis storage, with proper metadata tracking for production use. Next, we'll set up the Redis infrastructure to store and search these memories using vector embeddings." + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "T0FBUdRY1nfg" + }, + "source": [ + "# Memory Storage\n", + "\n", + "- **Short-term memory** is handled automatically by `RedisSaver` from `langgraph-checkpoint-redis`.\n", + "- For **long-term memory**, we'll use RedisVL with vector embeddings to enable semantic search of past experiences and knowledge.\n", + "\n", + "Below, we will create a search index schema in Redis to hold our long term memories. The schema has a few different fields including content, memory type, metadata, timestamps, user id, memory id, and the embedding of the memory." + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": { + "id": "D-bfk_Ro1nfg" + }, + "outputs": [], + "source": [ + "from redisvl.index import SearchIndex\n", + "from redisvl.schema.schema import IndexSchema\n", + "\n", + "\n", + "# Define the schema for our vector search index\n", + "# This creates the structure for storing and querying memories\n", + "memory_schema = IndexSchema.from_dict({\n", + " \"index\": {\n", + " \"name\": \"agent_memories\", # Index name for identification\n", + " \"prefix\": \"memory\", # Redis key prefix (memory:1, memory:2, etc.)\n", + " \"key_separator\": \":\",\n", + " \"storage_type\": \"json\",\n", + " },\n", + " \"fields\": [\n", + " {\"name\": \"content\", \"type\": \"text\"},\n", + " {\"name\": \"memory_type\", \"type\": \"tag\"},\n", + " {\"name\": \"metadata\", \"type\": \"text\"},\n", + " {\"name\": \"created_at\", \"type\": \"text\"},\n", + " {\"name\": \"user_id\", \"type\": \"tag\"},\n", + " {\"name\": \"memory_id\", \"type\": \"tag\"},\n", + " {\n", + " \"name\": \"embedding\",\n", + " \"type\": \"vector\",\n", + " \"attrs\": {\n", + " \"algorithm\": \"flat\",\n", + " \"dims\": 1536, # OpenAI embedding dimension\n", + " \"distance_metric\": \"cosine\",\n", + " \"datatype\": \"float32\",\n", + " },\n", + " },\n", + " ],\n", + " }\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "IHUC6A6tvKOF" + }, + "source": [ + "Below we create the `SearchIndex` from the `IndexSchema` and our Redis client connection object. We will overwrite the index spec if its already created!" 
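If you want to know in advance whether `create(overwrite=True)` will replace an existing index or build a fresh one, you can check first. A minimal sketch (it assumes the `exists()` method available on `SearchIndex` in recent RedisVL releases, and reuses `memory_schema` and `redis_client` from the cells above):

```python
# Optional: check whether the "agent_memories" index is already present in Redis
# before overwriting it. Assumes memory_schema and redis_client are defined above.
from redisvl.index import SearchIndex

candidate_index = SearchIndex(schema=memory_schema, redis_client=redis_client)

if candidate_index.exists():
    print("Index 'agent_memories' already exists and will be overwritten.")
else:
    print("Index 'agent_memories' does not exist yet; it will be created.")
```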
+ ] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/" + }, + "id": "iMHgajwyvKOF", + "outputId": "bc3892c0-6139-4458-e79d-de2249d1da0d" + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Long-term memory index ready\n" + ] + } + ], + "source": [ + "try:\n", + " long_term_memory_index = SearchIndex(\n", + " schema=memory_schema,\n", + " redis_client=redis_client,\n", + " validate_on_load=True\n", + " )\n", + " long_term_memory_index.create(overwrite=True)\n", + " print(\"Long-term memory index ready\")\n", + "except Exception as e:\n", + " print(f\"Error creating index: {e}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "q9J3oIwN24M-" + }, + "source": [ + "Now that the index is created, we can inspect the long term memory index in Redis using the `rvl` cli:" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/" + }, + "id": "smnQbc5-2y_C", + "outputId": "221e0ccd-3857-4983-d500-5095a075e601" + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "\n", + "\n", + "Index Information:\n", + "╭────────────────┬────────────────┬────────────────┬────────────────┬────────────────┬\b╮\n", + "│ Index Name │ Storage Type │ Prefixes │ Index Options │ Indexing │\n", + "├────────────────┼────────────────┼────────────────┼────────────────┼────────────────┼\b┤\n", + "| agent_memories | JSON | ['memory'] | [] | 0 |\n", + "╰────────────────┴────────────────┴────────────────┴────────────────┴────────────────┴\b╯\n", + "Index Fields:\n", + "╭─────────────────┬─────────────────┬─────────────────┬─────────────────┬─────────────────┬─────────────────┬─────────────────┬─────────────────┬─────────────────┬─────────────────┬─────────────────┬\b╮\n", + "│ Name │ Attribute │ Type │ Field Option │ Option Value │ Field Option │ Option Value │ Field Option │ Option Value │ Field Option │ Option Value │\n", + "├─────────────────┼─────────────────┼─────────────────┼─────────────────┼─────────────────┼─────────────────┼─────────────────┼─────────────────┼─────────────────┼─────────────────┼─────────────────┼\b┤\n", + "│ $.content │ content │ TEXT │ WEIGHT │ 1 │ │ │ │ │ │ │\n", + "│ $.memory_type │ memory_type │ TAG │ SEPARATOR │ , │ │ │ │ │ │ │\n", + "│ $.metadata │ metadata │ TEXT │ WEIGHT │ 1 │ │ │ │ │ │ │\n", + "│ $.created_at │ created_at │ TEXT │ WEIGHT │ 1 │ │ │ │ │ │ │\n", + "│ $.user_id │ user_id │ TAG │ SEPARATOR │ , │ │ │ │ │ │ │\n", + "│ $.memory_id │ memory_id │ TAG │ SEPARATOR │ , │ │ │ │ │ │ │\n", + "│ $.embedding │ embedding │ VECTOR │ algorithm │ FLAT │ data_type │ FLOAT32 │ dim │ 1536 │ distance_metric │ COSINE │\n", + "╰─────────────────┴─────────────────┴─────────────────┴─────────────────┴─────────────────┴─────────────────┴─────────────────┴─────────────────┴─────────────────┴─────────────────┴─────────────────┴\b╯\n" + ] + } + ], + "source": [ + "!rvl index info -i agent_memories" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "r5ybTN2l1nfg" + }, + "source": [ + "## Functions to access memories\n", + "\n", + "Next, we provide three core functions to access, store and retrieve memories. We will eventually use these in tools for the LLM to call. 
We will start by loading a vectorizer class to create OpenAI embeddings.\n" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "metadata": { + "id": "ruYpDU_lvKOF" + }, + "outputs": [], + "source": [ + "from redisvl.utils.vectorize.text.openai import OpenAITextVectorizer\n", + "\n", + "openai_embed = OpenAITextVectorizer(model=\"text-embedding-ada-002\")" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "HXLu70owvKOF" + }, + "source": [ + "Next we will set up a simple logger so our functions will record log activity of whats happening." + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "metadata": { + "id": "-XIpiadMvKOF" + }, + "outputs": [], + "source": [ + "import logging\n", + "\n", + "# Set up a logger\n", + "logger = logging.getLogger(__name__)" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "eMBbx2MkvKOF" + }, + "source": [ + "### 1. Check for similar memories\n", + "First, we'll write a utility function to check if a memory similar to a given\n", + "memory already exists in the index.\n", + "\n", + "This function checks for duplicate memories in Redis by:\n", + "1. Converting the input content into a vector embedding\n", + "2. Creating filters for user_id and memory_type\n", + "3. Using vector similarity search with a vector range query to find any existing + similar memories\n", + "4. Returning True if a similar memory exists, False otherwise\n", + "\n", + "This helps prevent storing redundant information in the agent's memory." + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "metadata": { + "id": "GN9zPAWO1nfg" + }, + "outputs": [], + "source": [ + "from redisvl.query import VectorRangeQuery\n", + "from redisvl.query.filter import Tag\n", + "\n", + "\n", + "# If we have any memories that aren't associated with a user, we'll use this ID.\n", + "SYSTEM_USER_ID = \"system\"\n", + "\n", + "\n", + "def similar_memory_exists(\n", + " content: str,\n", + " memory_type: MemoryType,\n", + " user_id: str = SYSTEM_USER_ID,\n", + " thread_id: Optional[str] = None,\n", + " distance_threshold: float = 0.1,\n", + ") -> bool:\n", + " \"\"\"Check if a similar long-term memory already exists in Redis.\"\"\"\n", + " content_embedding = openai_embed.embed(content)\n", + "\n", + " filters = (Tag(\"user_id\") == user_id) & (Tag(\"memory_type\") == memory_type)\n", + "\n", + " if thread_id:\n", + " filters = filters & (Tag(\"thread_id\") == thread_id)\n", + "\n", + " # Search for similar memories\n", + " vector_query = VectorRangeQuery(\n", + " vector=content_embedding,\n", + " num_results=1,\n", + " vector_field_name=\"embedding\",\n", + " filter_expression=filters,\n", + " distance_threshold=distance_threshold,\n", + " return_fields=[\"id\"],\n", + " )\n", + " results = long_term_memory_index.query(vector_query)\n", + " logger.debug(f\"Similar memory search results: {results}\")\n", + "\n", + " if results:\n", + " logger.debug(\n", + " f\"{len(results)} similar {'memory' if results.count == 1 else 'memories'} found. First: \"\n", + " f\"{results[0]['id']}. Skipping storage.\"\n", + " )\n", + " return True\n", + "\n", + " return False" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "_zqJwlXx1nfg" + }, + "source": [ + "### 2. Store long-term memories" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "KIu2CrUq1nfg" + }, + "source": [ + "Below is a function that handles storing long-term memories in Redis with built-in deduplication.\n", + "\n", + "It's a key part of our memory system that:\n", + "1. 
Prevents duplicate memories by checking for similar content\n", + "2. Creates vector embeddings for semantic search capabilities\n", + "3. Stores the memory with relevant metadata for future retrieval\n", + "\n", + "We'll use the `similar_memory_exists()` function when we store memories in order to perform in-line memory deduplication." + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "metadata": { + "id": "oKA39Qp21nfh" + }, + "outputs": [], + "source": [ + "from datetime import datetime\n", + "from typing import List, Optional, Union\n", + "\n", + "import ulid\n", + "\n", + "\n", + "def store_memory(\n", + " content: str,\n", + " memory_type: MemoryType,\n", + " user_id: str = SYSTEM_USER_ID,\n", + " thread_id: Optional[str] = None,\n", + " metadata: Optional[str] = None,\n", + "):\n", + " \"\"\"Store a long-term memory in Redis with deduplication.\n", + "\n", + " This function:\n", + " 1. Checks for similar existing memories to avoid duplicates\n", + " 2. Generates vector embeddings for semantic search\n", + " 3. Stores the memory with metadata for retrieval\n", + " \"\"\"\n", + " if metadata is None:\n", + " metadata = \"{}\"\n", + "\n", + " logger.info(f\"Preparing to store memory: {content}\")\n", + "\n", + " if similar_memory_exists(content, memory_type, user_id, thread_id):\n", + " logger.info(\"Similar memory found, skipping storage\")\n", + " return\n", + "\n", + " embedding = openai_embed.embed(content)\n", + "\n", + " memory_data = {\n", + " \"user_id\": user_id or SYSTEM_USER_ID,\n", + " \"content\": content,\n", + " \"memory_type\": memory_type.value,\n", + " \"metadata\": metadata,\n", + " \"created_at\": datetime.now().isoformat(),\n", + " \"embedding\": embedding,\n", + " \"memory_id\": str(ulid.ULID()),\n", + " \"thread_id\": thread_id,\n", + " }\n", + "\n", + " try:\n", + " long_term_memory_index.load([memory_data])\n", + " except Exception as e:\n", + " logger.error(f\"Error storing memory: {e}\")\n", + " return\n", + "\n", + " logger.info(f\"Stored {memory_type} memory: {content}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "0cpk-m7Z1nfh" + }, + "source": [ + "### 3. Retrieve relevant long-term memories\n", + "And now that we're storing memories, we can retrieve them using vector similarity search with metadata filters using RedisVL.\n", + "\n", + "This function:\n", + "1. Takes a query string, optional filters (memory type, user ID, thread ID), and a distance threshold (semantic)\n", + "2. Creates a vector range query using the query's embedding\n", + "3. Builds a filter object based on passed options\n", + "4. Filters to narrow down the search results\n", + "4. 
Executes the search and returns parsed memory objects" + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "metadata": { + "id": "xuEAMNjq1nfh" + }, + "outputs": [], + "source": [ + "def retrieve_memories(\n", + " query: str,\n", + " memory_type: Union[Optional[MemoryType], List[MemoryType]] = None,\n", + " user_id: str = SYSTEM_USER_ID,\n", + " thread_id: Optional[str] = None,\n", + " distance_threshold: float = 0.1,\n", + " limit: int = 5,\n", + ") -> List[StoredMemory]:\n", + " \"\"\"Retrieve relevant memories from Redis using vector similarity search.\n", + "\n", + " \"\"\"\n", + " # Create vector query using query embedding\n", + " logger.debug(f\"Retrieving memories for query: {query}\")\n", + " vector_query = VectorRangeQuery(\n", + " vector=openai_embed.embed(query),\n", + " return_fields=[\n", + " \"content\",\n", + " \"memory_type\", \n", + " \"metadata\",\n", + " \"created_at\",\n", + " \"memory_id\",\n", + " \"thread_id\",\n", + " \"user_id\",\n", + " ],\n", + " num_results=limit,\n", + " vector_field_name=\"embedding\",\n", + " dialect=2,\n", + " distance_threshold=distance_threshold,\n", + " )\n", + "\n", + " # Build filter conditions\n", + " base_filters = [f\"@user_id:{{{user_id or SYSTEM_USER_ID}}}\"]\n", + "\n", + " if memory_type:\n", + " if isinstance(memory_type, list):\n", + " base_filters.append(f\"@memory_type:{{{'|'.join(memory_type)}}}\")\n", + " else:\n", + " base_filters.append(f\"@memory_type:{{{memory_type.value}}}\")\n", + "\n", + " if thread_id:\n", + " base_filters.append(f\"@thread_id:{{{thread_id}}}\")\n", + "\n", + " vector_query.set_filter(\" \".join(base_filters))\n", + "\n", + " # Execute vector similarity search\n", + " results = long_term_memory_index.query(vector_query)\n", + "\n", + " # Parse results into StoredMemory objects\n", + " memories = []\n", + " for doc in results:\n", + " try:\n", + " memory = StoredMemory(\n", + " id=doc[\"id\"],\n", + " memory_id=doc[\"memory_id\"],\n", + " user_id=doc[\"user_id\"],\n", + " thread_id=doc.get(\"thread_id\", None),\n", + " memory_type=MemoryType(doc[\"memory_type\"]),\n", + " content=doc[\"content\"],\n", + " created_at=doc[\"created_at\"],\n", + " metadata=doc[\"metadata\"],\n", + " )\n", + " memories.append(memory)\n", + " except Exception as e:\n", + " logger.error(f\"Error parsing memory: {e}\")\n", + " continue\n", + " return memories" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "YinPoLcc1nfh" + }, + "source": [ + "## 🛠️ Managing Long-Term Memory with Tools\n", + "\n", + "Memory operations are exposed as **tools** that the LLM can call to store or retrieve memories.\n", + "\n", + "**Tool-based memory management:**\n", + "- LLM decides when to store/retrieve memories\n", + "- Fewer Redis calls but may miss some context\n", + "- Adds some latency due to LLM decision-making\n", + "\n", + "Alternatively, you can always manually manage memories in your workflows.\n", + "\n", + "**Manual memory management:**\n", + "- More Redis calls but faster response times\n", + "- Extracts more memories, providing richer context\n", + "- Higher token usage due to more context\n", + "\n", + "> NOTE: **This tutorial uses tool-based memory** for optimal balance of control and efficiency.\n", + "\n", + "\"Memory" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "BmwB-sUJ1nfh" + }, + "source": [ + "### Define Agent Tools\n", + "\n", + "Now that we have our storage functions defined, we can create the tools that will enable our agent to interact with the memory system. 
These tools will be used by the LLM to manage memories during conversations.\n", + "\n", + "Let's start with the Store Memory Tool:\n", + "\n", + "#### Store Memory Tool\n", + "\n", + "This tool enables the agent to save important information as long-term memories in Redis. It's particularly useful for capturing:\n", + "- User preferences and habits\n", + "- Personal experiences and anecdotes\n", + "- Important facts and knowledge shared during conversations\n", + "\n", + "The tool accepts the following parameters:\n", + "- `content`: The actual memory content to store (e.g., \"User prefers window seats on flights\")\n", + "- `memory_type`: The type of memory (e.g., `MemoryType.EPISODIC` for personal experiences, `MemoryType.SEMANTIC` for general knowledge)\n", + "- `metadata`: Optional dictionary for additional context (e.g., timestamps, source, confidence)\n", + "- `config`: Optional configuration for user/thread context (automatically handled by the agent)\n", + "\n", + "When called, the tool:\n", + "1. Validates the input parameters\n", + "2. Stores the memory in Redis with proper indexing\n", + "3. Returns a success message with the stored content\n", + "4. Handles errors gracefully with informative messages\n", + "\n", + "This tool is designed to be used by the LLM to build a persistent memory of the user's preferences and experiences, enabling more personalized and context-aware interactions over time." + ] + }, + { + "cell_type": "code", + "execution_count": 14, + "metadata": { + "id": "T-S0eN4B1nfh" + }, + "outputs": [], + "source": [ + "from typing import Dict, Optional\n", + "\n", + "from langchain_core.tools import tool\n", + "from langchain_core.runnables.config import RunnableConfig\n", + "\n", + "\n", + "@tool\n", + "def store_memory_tool(\n", + " content: str,\n", + " memory_type: MemoryType,\n", + " metadata: Optional[Dict[str, str]] = None,\n", + " config: Optional[RunnableConfig] = None,\n", + ") -> str:\n", + " \"\"\"\n", + " Store a long-term memory in the system.\n", + "\n", + " Use this tool to save important information about user preferences,\n", + " experiences, or general knowledge that might be useful in future\n", + " interactions.\n", + " \"\"\"\n", + " config = config or RunnableConfig()\n", + " user_id = config.get(\"user_id\", SYSTEM_USER_ID)\n", + " thread_id = config.get(\"thread_id\")\n", + "\n", + " try:\n", + " # Store in long-term memory\n", + " store_memory(\n", + " content=content,\n", + " memory_type=memory_type,\n", + " user_id=user_id,\n", + " thread_id=thread_id,\n", + " metadata=str(metadata) if metadata else None,\n", + " )\n", + "\n", + " return f\"Successfully stored {memory_type} memory: {content}\"\n", + " except Exception as e:\n", + " return f\"Error storing memory: {str(e)}\"\n" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "9Am1Z_hItKpc" + }, + "source": [ + "Test the tool:" + ] + }, + { + "cell_type": "code", + "execution_count": 16, + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/", + "height": 35 + }, + "id": "H1-HPwag-im_", + "outputId": "4b883edc-29e2-4666-84ae-4e156b03661c" + }, + "outputs": [ + { + "data": { + "application/vnd.google.colaboratory.intrinsic+json": { + "type": "string" + }, + "text/plain": [ + "'Successfully stored MemoryType.EPISODIC memory: I like flying on Delta when possible'" + ] + }, + "execution_count": 16, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "store_memory_tool.invoke({\"content\": \"I like flying on Delta when possible\", 
\"memory_type\": \"episodic\"})" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "MkjIWht9vKOG" + }, + "source": [ + "Now that we've seen how to store memories, let's look at how to retrieve them.\n", + "\n", + "#### Retrieve Memoreis Tool\n", + "This tool allows us to search through our stored memories using semantic similarity and filtering.\n", + "\n", + "This tool is particularly useful when you want to:\n", + "- Find relevant past experiences or preferences\n", + "- Filter memories by type (episodic or semantic)\n", + "- Get user-specific information\n", + "- Limit the number of results to keep responses focused\n", + "\n", + "The tool works by:\n", + "1. Taking a query string and searching for semantically similar memories\n", + "2. Filtering results based on memory type\n", + "3. Applying a similarity threshold to ensure relevance\n", + "4. Formatting the results in a clear, readable way" + ] + }, + { + "cell_type": "code", + "execution_count": 15, + "metadata": { + "id": "NEqm-q1ovKOG" + }, + "outputs": [], + "source": [ + "@tool\n", + "def retrieve_memories_tool(\n", + " query: str,\n", + " memory_type: List[MemoryType],\n", + " limit: int = 5,\n", + " config: Optional[RunnableConfig] = None,\n", + ") -> str:\n", + " \"\"\"\n", + " Retrieve long-term memories relevant to the query.\n", + "\n", + " Use this tool to access previously stored information about user\n", + " preferences, experiences, or general knowledge.\n", + " \"\"\"\n", + " config = config or RunnableConfig()\n", + " user_id = config.get(\"user_id\", SYSTEM_USER_ID)\n", + "\n", + " try:\n", + " # Get long-term memories\n", + " stored_memories = retrieve_memories(\n", + " query=query,\n", + " memory_type=memory_type,\n", + " user_id=user_id,\n", + " limit=limit,\n", + " distance_threshold=0.3,\n", + " )\n", + "\n", + " # Format the response\n", + " response = []\n", + "\n", + " if stored_memories:\n", + " response.append(\"Long-term memories:\")\n", + " for memory in stored_memories:\n", + " response.append(f\"- [{memory.memory_type}] {memory.content}\")\n", + "\n", + " return \"\\n\".join(response) if response else \"No relevant memories found.\"\n", + "\n", + " except Exception as e:\n", + " return f\"Error retrieving memories: {str(e)}\"" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "4irYew3pvKON" + }, + "source": [ + "Test the tool:" + ] + }, + { + "cell_type": "code", + "execution_count": 17, + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/", + "height": 35 + }, + "id": "CMlAHmTe9vhN", + "outputId": "95304a90-39c3-42d3-bcdc-d7d6ea6e2191" + }, + "outputs": [ + { + "data": { + "application/vnd.google.colaboratory.intrinsic+json": { + "type": "string" + }, + "text/plain": [ + "'Long-term memories:\\n- [MemoryType.EPISODIC] I like flying on Delta when possible'" + ] + }, + "execution_count": 17, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "retrieve_memories_tool.invoke({\"query\": \"Airline preferences\", \"memory_type\": [\"episodic\"]})" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "PftV2tTG1nfh" + }, + "source": [ + "# 🌎 Build the Travel Agent\n", + "\n", + "## Setting Up the ReAct Agent\n", + "\n", + "We'll use LangGraph's prebuilt components to create a ReAct agent with memory capabilities:\n", + "\n", + "1. **Short-term Memory**: A checkpoint saver tracks conversation history per thread\n", + "2. 
**Long-term Memory**: We'll extract and store key information from conversations\n", + " - Episodic memories: User preferences and experiences\n", + " - Semantic memories: General travel knowledge\n", + "\n", + "The system will automatically summarize conversations to manage context while preserving important details in long-term storage.\n", + "\n", + "Below we start with setting up the Redis checkpointer (`RedisSaver`) that will handle short term memory for the agent." + ] + }, + { + "cell_type": "code", + "execution_count": 18, + "metadata": { + "id": "QSouau_jvKON" + }, + "outputs": [], + "source": [ + "from langchain_core.messages import AIMessage, SystemMessage\n", + "from langchain_openai import ChatOpenAI\n", + "from langgraph.prebuilt.chat_agent_executor import create_react_agent\n", + "from langgraph.checkpoint.redis import RedisSaver\n", + "\n", + "# Set up the Redis checkpointer for short term memory\n", + "redis_saver = RedisSaver(redis_client=redis_client)\n", + "redis_saver.setup()" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "a8LEro_PvKON" + }, + "source": [ + "Next we define the set of tools for the agent." + ] + }, + { + "cell_type": "code", + "execution_count": 19, + "metadata": { + "id": "EtZo92KuvKON" + }, + "outputs": [], + "source": [ + "# Define the set of tools\n", + "tools = [store_memory_tool, retrieve_memories_tool]" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "e-2IMJaLvKON" + }, + "source": [ + "Configure the LLM from OpenAI." + ] + }, + { + "cell_type": "code", + "execution_count": 20, + "metadata": { + "id": "kWz7rC5_vKON" + }, + "outputs": [], + "source": [ + "# Configure an LLM for the agent with a more creative temperature.\n", + "llm = ChatOpenAI(model=\"gpt-4o\", temperature=0.7).bind_tools(tools)" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "JLKB-V9HvKON" + }, + "source": [ + "Assemble the ReAct agent combining the LLM, tools, checkpointer, and system prompt!" + ] + }, + { + "cell_type": "code", + "execution_count": 21, + "metadata": { + "id": "e-TpYxYb1nfh" + }, + "outputs": [], + "source": [ + "# Defint the travel agent\n", + "travel_agent = create_react_agent(\n", + " model=llm,\n", + " tools=tools, # Long-term memory: provided as a set of custom tools\n", + " checkpointer=redis_saver, # Short-term memory: the conversation history\n", + " prompt=SystemMessage(\n", + " content=\"\"\"\n", + " You are a travel assistant helping users plan their trips. You remember user preferences\n", + " and provide personalized recommendations based on past interactions.\n", + "\n", + " You have access to the following types of memory:\n", + " 1. Short-term memory: The current conversation thread\n", + " 2. Long-term memory:\n", + " - Episodic: User preferences and past trip experiences (e.g., \"User prefers window seats\")\n", + " - Semantic: General knowledge about travel destinations and requirements\n", + "\n", + " Your procedural knowledge (how to search, book flights, etc.) is built into your tools and prompts.\n", + "\n", + " Always be helpful, personal, and context-aware in your responses.\n", + " \"\"\"\n", + " ),\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "htuJmhkY1nfi" + }, + "source": [ + "✅ Now that we have the basic agent in place, we will build a LangGraph workflow that invokes this agent as a node. The graph will consist of three nodes in total. We will move through each one separately." 
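Before wiring the agent into the larger graph, you can optionally sanity-check it with a single turn. This is a minimal sketch, not part of the original notebook; the thread and user IDs are placeholder values, and the checkpointer requires a `thread_id` in the config:

```python
# Quick smoke test of the prebuilt ReAct agent on its own.
from langchain_core.messages import HumanMessage
from langchain_core.runnables.config import RunnableConfig

# Placeholder identifiers; RedisSaver needs a thread_id to checkpoint the conversation.
smoke_config = RunnableConfig(configurable={"thread_id": "smoke_test", "user_id": "demo_user"})

result = travel_agent.invoke(
    {"messages": [HumanMessage(content="I prefer aisle seats on long-haul flights.")]},
    config=smoke_config,
)
print(result["messages"][-1].content)
```

If the agent decides to call `store_memory_tool` here, you should also see the corresponding memory-storage log lines from the functions defined earlier.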
+ ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "R2mZwvHj1nfi" + }, + "source": [ + "## Node 1: Respond to the user\n", + "In LangGraph, a **node** represents a discrete unit of processing in a workflow. Each node is a function that takes a state object and configuration as input, processes the data, and returns an updated state. Nodes can be connected to form a directed graph that defines the flow of execution.\n", + "\n", + "The `respond_to_user` node (below) is our first node in the travel agent workflow. It serves as the entry point for user interactions and handles the core conversation flow. Here's how it works:\n", + "\n", + "1. It receives the current conversation state and configuration\n", + "2. Extracts any human messages from the state\n", + "3. Invokes our travel agent to generate a response\n", + "4. Handles any errors gracefully\n", + "5. Updates the conversation state with the agent's response\n", + "\n", + "The node uses a custom `RuntimeState` class that inherits from `MessagesState` to maintain the conversation history. This state object is passed between nodes in the graph, allowing each node to access and modify the conversation context as needed." + ] + }, + { + "cell_type": "code", + "execution_count": 22, + "metadata": { + "id": "PFdGi8fd1nfi" + }, + "outputs": [], + "source": [ + "from langchain_core.messages import HumanMessage\n", + "from langgraph.graph.message import MessagesState\n", + "\n", + "\n", + "class RuntimeState(MessagesState):\n", + " \"\"\"Runtime state for the travel agent.\"\"\"\n", + " pass\n", + "\n", + "\n", + "def respond_to_user(state: RuntimeState, config: RunnableConfig) -> RuntimeState:\n", + " \"\"\"Invoke the travel agent to generate a response.\"\"\"\n", + " human_messages = [m for m in state[\"messages\"] if isinstance(m, HumanMessage)]\n", + " if not human_messages:\n", + " logger.warning(\"No HumanMessage found in state\")\n", + " return state\n", + "\n", + " try:\n", + " # Single agent invocation, not streamed (simplified for reliability)\n", + " result = travel_agent.invoke({\"messages\": state[\"messages\"]}, config=config)\n", + " agent_message = result[\"messages\"][-1]\n", + " state[\"messages\"].append(agent_message)\n", + " except Exception as e:\n", + " logger.error(f\"Error invoking travel agent: {e}\")\n", + " agent_message = AIMessage(\n", + " content=\"I'm sorry, I encountered an error processing your request.\"\n", + " )\n", + " state[\"messages\"].append(agent_message)\n", + "\n", + " return state" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "kZyQE3MeyoQw" + }, + "source": [ + "## Node 2: Execute Tools\n", + "\n", + "The `execute_tools` node is a critical component in our travel agent's workflow that bridges the gap between the LLM's decisions and actual tool execution. Positioned after the `respond_to_user` node, it handles the practical side of the agent's tool-using capabilities.\n", + "\n", + "When the LLM determines it needs to use a tool, it includes tool calls in its response. This node then:\n", + "\n", + "1. Scans the conversation history to find the most recent AI message containing tool calls\n", + "2. For each tool call found:\n", + " - Extracts the tool name, arguments, and call ID from the message\n", + " - Matches the tool name against our available tools\n", + " - Executes the tool with the provided arguments\n", + " - Creates a ToolMessage containing the result\n", + "3. Handles any errors that occur during tool execution\n", + "4. 
Adds all tool results back to the conversation history\n", + "\n", + "This node is essential because it enables our agent to interact with external systems and services while maintaining a coherent conversation flow. Without it, the agent would be limited to just generating text responses without the ability to perform actual actions or retrieve real-time information.\n" + ] + }, + { + "cell_type": "code", + "execution_count": 23, + "metadata": { + "id": "bkFA2_vgyrdZ" + }, + "outputs": [], + "source": [ + "from langchain_core.messages import ToolMessage\n", + "\n", + "\n", + "def execute_tools(state: RuntimeState, config: RunnableConfig) -> RuntimeState:\n", + " \"\"\"Execute tools specified in the latest AIMessage and append ToolMessages.\"\"\"\n", + " messages = state[\"messages\"]\n", + " latest_ai_message = next(\n", + " (m for m in reversed(messages) if isinstance(m, AIMessage) and m.tool_calls),\n", + " None\n", + " )\n", + "\n", + " if not latest_ai_message:\n", + " return state # No tool calls to process\n", + "\n", + " tool_messages = []\n", + " for tool_call in latest_ai_message.tool_calls:\n", + " tool_name = tool_call[\"name\"]\n", + " tool_args = tool_call[\"args\"]\n", + " tool_id = tool_call[\"id\"]\n", + "\n", + " # Find the corresponding tool\n", + " tool = next((t for t in tools if t.name == tool_name), None)\n", + " if not tool:\n", + " continue # Skip if tool not found\n", + "\n", + " try:\n", + " # Execute the tool with the provided arguments\n", + " result = tool.invoke(tool_args, config=config)\n", + " # Create a ToolMessage with the result\n", + " tool_message = ToolMessage(\n", + " content=str(result),\n", + " tool_call_id=tool_id,\n", + " name=tool_name\n", + " )\n", + " tool_messages.append(tool_message)\n", + " except Exception as e:\n", + " # Handle tool execution errors\n", + " error_message = ToolMessage(\n", + " content=f\"Error executing tool '{tool_name}': {str(e)}\",\n", + " tool_call_id=tool_id,\n", + " name=tool_name\n", + " )\n", + " tool_messages.append(error_message)\n", + "\n", + " # Append the ToolMessages to the message history\n", + " messages.extend(tool_messages)\n", + " state[\"messages\"] = messages\n", + " return state" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "LM3oPg101nfi" + }, + "source": [ + "## Node 3: Conversation Summarization\n", + "\n", + "While our Redis-based long-term memory system helps store important information, we still need to manage the immediate conversation context. As the chat progresses, the message history grows, potentially overwhelming the LLM's context window. This is where our third node comes in.\n", + "\n", + "The conversation summarization node acts as a context manager, periodically condensing the chat history into a concise summary. Here's how it works:\n", + "\n", + "1. **Trigger**: The node monitors the message count and triggers summarization after every 6 messages (configurable via `MESSAGE_SUMMARIZATION_THRESHOLD`)\n", + "\n", + "2. **Summarization Process**:\n", + " - Uses GPT-4o with a low temperature (0.3) to ensure consistent, focused summaries\n", + " - Preserves critical information like travel preferences, trip details, and pending questions\n", + " - Replaces older messages with the summary while keeping recent context\n", + "\n", + "3. 
**Benefits**:\n", + " - Prevents context window overflow\n", + " - Maintains conversation coherence\n", + " - Optimizes token usage while preserving essential context\n", + "\n", + "The resulting summary becomes part of the conversation history, allowing the agent to reference past interactions without carrying the full message load." + ] + }, + { + "cell_type": "code", + "execution_count": 24, + "metadata": { + "id": "KUYw18Xb1nfi" + }, + "outputs": [], + "source": [ + "from langchain_core.messages import RemoveMessage\n", + "\n", + "# An LLM configured for summarization.\n", + "summarizer = ChatOpenAI(model=\"gpt-4o\", temperature=0.3)\n", + "\n", + "# The number of messages after which we'll summarize the conversation.\n", + "MESSAGE_SUMMARIZATION_THRESHOLD = 6\n", + "\n", + "\n", + "def summarize_conversation(\n", + " state: RuntimeState, config: RunnableConfig\n", + ") -> RuntimeState:\n", + " \"\"\"\n", + " Summarize a list of messages into a concise summary to reduce context length\n", + " while preserving important information.\n", + " \"\"\"\n", + " messages = state[\"messages\"]\n", + " current_message_count = len(messages)\n", + " if current_message_count < MESSAGE_SUMMARIZATION_THRESHOLD:\n", + " logger.debug(f\"Not summarizing conversation: {current_message_count}\")\n", + " return state\n", + "\n", + " system_prompt = \"\"\"\n", + " You are a conversation summarizer. Create a concise summary of the previous\n", + " conversation between a user and a travel assistant.\n", + "\n", + " The summary should:\n", + " 1. Highlight key topics, preferences, and decisions\n", + " 2. Include any specific trip details (destinations, dates, preferences)\n", + " 3. Note any outstanding questions or topics that need follow-up\n", + " 4. Be concise but informative\n", + "\n", + " Format your summary as a brief narrative paragraph.\n", + " \"\"\"\n", + "\n", + " message_content = \"\\n\".join(\n", + " [\n", + " f\"{'User' if isinstance(msg, HumanMessage) else 'Assistant'}: {msg.content}\"\n", + " for msg in messages\n", + " ]\n", + " )\n", + "\n", + " # Invoke the summarizer\n", + " summary_messages = [\n", + " SystemMessage(content=system_prompt),\n", + " HumanMessage(\n", + " content=f\"Please summarize this conversation:\\n\\n{message_content}\"\n", + " ),\n", + " ]\n", + "\n", + " summary_response = summarizer.invoke(summary_messages)\n", + "\n", + " logger.info(f\"Summarized {len(messages)} messages into a conversation summary\")\n", + "\n", + " summary_message = SystemMessage(\n", + " content=f\"\"\"\n", + " Summary of the conversation so far:\n", + "\n", + " {summary_response.content}\n", + "\n", + " Please continue the conversation based on this summary and the recent messages.\n", + " \"\"\"\n", + " )\n", + " remove_messages = [\n", + " RemoveMessage(id=msg.id) for msg in messages if msg.id is not None\n", + " ]\n", + "\n", + " state[\"messages\"] = [ # type: ignore\n", + " *remove_messages,\n", + " summary_message,\n", + " state[\"messages\"][-1],\n", + " ]\n", + "\n", + " return state.copy()" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "dpzjQxXi1nfi" + }, + "source": [ + "## Assemble the full graph\n", + "\n", + "🚧 It's time to assemble our graph for end-to-end agent execution. We will attach all three **nodes** we defined above." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "h6TvQaob1nfi" + }, + "outputs": [], + "source": [ + "from langgraph.graph import StateGraph, END\n", + "\n", + "workflow = StateGraph(RuntimeState)\n", + "\n", + "# Add nodes to the graph\n", + "workflow.add_node(\"agent\", respond_to_user)\n", + "workflow.add_node(\"execute_tools\", execute_tools)\n", + "workflow.add_node(\"summarize_conversation\", summarize_conversation)" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "cYGE-DLuvKOO" + }, + "source": [ + "Next, we will tie the nodes together using **edges** which control process flow. There is a conditional edge between the agent node and what comes next. What comes next is based on whether we need to handle + execute a tool call or proceed..." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "61Un_szhvKOO" + }, + "outputs": [], + "source": [ + "def decide_next_step(state):\n", + " latest_ai_message = next((m for m in reversed(state[\"messages\"]) if isinstance(m, AIMessage)), None)\n", + " if latest_ai_message and latest_ai_message.tool_calls:\n", + " return \"execute_tools\"\n", + " return \"summarize_conversation\"\n", + "\n", + "\n", + "workflow.set_entry_point(\"agent\")\n", + "workflow.add_conditional_edges(\n", + " \"agent\",\n", + " decide_next_step,\n", + " {\"execute_tools\": \"execute_tools\", \"summarize_conversation\": \"summarize_conversation\"},\n", + ")\n", + "workflow.add_edge(\"execute_tools\", \"agent\")\n", + "workflow.add_edge(\"summarize_conversation\", END)" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "3L_OEc80vKOO" + }, + "source": [ + "Compile the graph!" + ] + }, + { + "cell_type": "code", + "execution_count": 27, + "metadata": { + "id": "kuwdsVhYvKOO" + }, + "outputs": [], + "source": [ + "graph = workflow.compile(checkpointer=redis_saver)" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "sQSfnQDK1nfo" + }, + "source": [ + "## Testing the Main Agent Loop\n", + "\n", + "Now that we have our workflow graph set up, let's test the main interaction loop. This loop will:\n", + "1. Initialize the conversation state and configuration\n", + "2. Process user input through our workflow\n", + "3. Display the agent's responses\n", + "4. Handle any errors gracefully\n", + "\n", + "The main loop implements the following workflow:\n", + "\n", + "1. Initialization\n", + " - Creates a unique thread ID for conversation tracking\n", + " - Initializes an empty message state for the conversation\n", + "\n", + "2. Input Processing\n", + " - Prompts for user input in a continuous loop\n", + " - Handles empty inputs by skipping to next iteration\n", + " - Provides exit commands (\"exit\" or \"quit\") to end the session\n", + "\n", + "3. Message Flow\n", + " - Converts user input into a HumanMessage\n", + " - Streams the message through our workflow graph\n", + " - Updates conversation state with each processing step\n", + " - Maintains conversation history for context\n", + "\n", + "4. Response Generation\n", + " - Processes the state through our agent workflow\n", + " - Extracts the most recent AI response\n", + " - Displays the response to the user\n", + " - Handles cases where no response is generated\n", + "\n", + "5. 
Error Handling\n", + " - Catches and logs any processing errors\n", + " - Provides user-friendly error messages\n", + " - Preserves conversation state even when errors occur\n", + " - Ensures graceful recovery from failures" + ] + }, + { + "cell_type": "code", + "execution_count": 28, + "metadata": { + "id": "xD1BTjXY1nfp" + }, + "outputs": [], + "source": [ + "def main(thread_id: str = \"book_flight\", user_id: str = \"demo_user\"):\n", + " \"\"\"Main interaction loop for the travel agent\"\"\"\n", + "\n", + " print(\"Welcome to the Travel Assistant! (Type 'exit' to quit)\")\n", + "\n", + " config = RunnableConfig(configurable={\"thread_id\": thread_id, \"user_id\": user_id})\n", + " state = RuntimeState(messages=[])\n", + "\n", + " while True:\n", + " user_input = input(\"\\nYou (type 'quit' to quit): \")\n", + "\n", + " if not user_input:\n", + " continue\n", + "\n", + " if user_input.lower() in [\"exit\", \"quit\"]:\n", + " print(\"Thank you for using the Travel Assistant. Goodbye!\")\n", + " break\n", + "\n", + " state[\"messages\"].append(HumanMessage(content=user_input))\n", + "\n", + " try:\n", + " # Process user input through the graph\n", + " for result in graph.stream(state, config=config, stream_mode=\"values\"):\n", + " state = RuntimeState(**result)\n", + "\n", + " logger.debug(f\"# of messages after run: {len(state['messages'])}\")\n", + "\n", + " # Find the most recent AI message, so we can print the response\n", + " ai_messages = [m for m in state[\"messages\"] if isinstance(m, AIMessage)]\n", + " if ai_messages:\n", + " message = ai_messages[-1].content\n", + " else:\n", + " logger.error(\"No AI messages after run\")\n", + " message = \"I'm sorry, I couldn't process your request properly.\"\n", + " # Add the error message to the state\n", + " state[\"messages\"].append(AIMessage(content=message))\n", + "\n", + " print(f\"\\nAssistant: {message}\")\n", + "\n", + " except Exception as e:\n", + " logger.exception(f\"Error processing request: {e}\")\n", + " error_message = \"I'm sorry, I encountered an error processing your request.\"\n", + " print(f\"\\nAssistant: {error_message}\")\n", + " # Add the error message to the state\n", + " state[\"messages\"].append(AIMessage(content=error_message))" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "P51RdhnzfZa1" + }, + "source": [ + "Before you try your own, take a look at the current conversation between Tyler and the travel agent. Notice the memory storage actions, the calls to the LLM, and also the conversation summarization that take place during the workflow!" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/" + }, + "id": "C5fg4PH97YGY", + "outputId": "1a6fd03c-e0f5-46a8-9462-76f46260e901" + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Enter a user ID: tyler\n", + "Enter a thread ID: 123\n", + "Welcome to the Travel Assistant! (Type 'exit' to quit)\n", + "13:51:57 __main__ INFO Starting memory consolidation for user tyler\n", + "\n", + "You (type 'quit' to quit): Hi I plan to go to singapore with my wife this summer. We love outdoors activities and trying new kinds of foods. 
Any good recommendations?\n", + "13:52:30 httpx INFO HTTP Request: POST https://api.openai.com/v1/chat/completions \"HTTP/1.1 200 OK\"\n", + "13:52:30 __main__ INFO Preparing to store memory: User plans to visit Singapore this summer with his wife and they love outdoor activities and trying new kinds of foods.\n", + "13:52:31 httpx INFO HTTP Request: POST https://api.openai.com/v1/embeddings \"HTTP/1.1 200 OK\"\n", + "13:52:31 httpx INFO HTTP Request: POST https://api.openai.com/v1/embeddings \"HTTP/1.1 200 OK\"\n", + "13:52:31 httpx INFO HTTP Request: POST https://api.openai.com/v1/embeddings \"HTTP/1.1 200 OK\"\n", + "13:52:31 __main__ INFO Stored MemoryType.EPISODIC memory: User plans to visit Singapore this summer with his wife and they love outdoor activities and trying new kinds of foods.\n", + "13:52:37 httpx INFO HTTP Request: POST https://api.openai.com/v1/chat/completions \"HTTP/1.1 200 OK\"\n", + "\n", + "Assistant: Singapore is a fantastic destination for outdoor activities and culinary adventures! Here are some recommendations that you and your wife might enjoy:\n", + "\n", + "### Outdoor Activities:\n", + "1. **Gardens by the Bay**: Explore the stunning gardens and the iconic Supertree Grove. You can also walk along the OCBC Skyway for a breathtaking view of the Marina Bay area.\n", + "\n", + "2. **Sentosa Island**: Enjoy a day at the beach, try zip-lining, or explore the numerous attractions like Universal Studios Singapore.\n", + "\n", + "3. **MacRitchie Reservoir**: Go for a hike along the MacRitchie Trails and experience the TreeTop Walk, a suspension bridge spanning the forest canopy.\n", + "\n", + "4. **Pulau Ubin**: Rent a bicycle and explore this rustic island. It's a great place to enjoy nature and see what Singapore was like in the past.\n", + "\n", + "### Food Experiences:\n", + "1. **Hawker Centers**: Visit places like Maxwell Food Centre or Lau Pa Sat to try local dishes such as Hainanese chicken rice, laksa, and chili crab.\n", + "\n", + "2. **Peranakan Cuisine**: Try something different with Peranakan or Nyonya food, which is a blend of Chinese and Malay culinary traditions. \n", + "\n", + "3. **Jumbo Seafood**: Known for their chili crab, this is a must-try for seafood lovers. There are several locations around the city.\n", + "\n", + "4. **Food Tours**: Consider joining a food tour to explore the diverse culinary scene in Singapore and learn about the history and culture behind each dish.\n", + "\n", + "Feel free to ask if you need more details or have specific interests!\n", + "\n", + "You (type 'quit' to quit): Excellent thank you. I would love help booking flights. What are the best routes typically flown from Atlanta to Singapore?\n", + "13:53:24 httpx INFO HTTP Request: POST https://api.openai.com/v1/chat/completions \"HTTP/1.1 200 OK\"\n", + "\n", + "Assistant: Flying from Atlanta to Singapore usually involves at least one stopover, as there are no direct flights. Here are some of the best routes typically flown:\n", + "\n", + "1. **Atlanta (ATL) to Singapore (SIN) via Tokyo (NRT/HND)**:\n", + " - Airlines: Delta Air Lines, Japan Airlines\n", + " - This route often involves a stop in Tokyo, which can be a great opportunity to explore Japan if you have a long layover.\n", + "\n", + "2. **Atlanta (ATL) to Singapore (SIN) via Seoul (ICN)**:\n", + " - Airlines: Korean Air, Delta Air Lines\n", + " - A stopover in Seoul offers another chance for a brief visit in South Korea.\n", + "\n", + "3. 
**Atlanta (ATL) to Singapore (SIN) via Doha (DOH)**:\n", + " - Airline: Qatar Airways\n", + " - Qatar Airways offers a stop in Doha, which is known for its luxurious airport and facilities.\n", + "\n", + "4. **Atlanta (ATL) to Singapore (SIN) via Dubai (DXB)**:\n", + " - Airline: Emirates\n", + " - Stopping in Dubai can be a fantastic experience with plenty of attractions to explore.\n", + "\n", + "5. **Atlanta (ATL) to Singapore (SIN) via Frankfurt (FRA) or Munich (MUC)**:\n", + " - Airlines: Lufthansa, Singapore Airlines\n", + " - A European stopover provides a different cultural experience.\n", + "\n", + "When booking flights, it's a good idea to consider the total travel time, layover duration, and the amenities offered by the airlines. Let me know if you would like assistance with booking or have any preferences, such as specific airlines or layover cities!\n", + "\n", + "You (type 'quit' to quit): Ok great. As you know we like Delta. But we also prefer to sit first class if possible. Do these kinds of routes offer upgrades normally?\n", + "13:53:32 httpx INFO HTTP Request: POST https://api.openai.com/v1/chat/completions \"HTTP/1.1 200 OK\"\n", + "13:53:32 httpx INFO HTTP Request: POST https://api.openai.com/v1/embeddings \"HTTP/1.1 200 OK\"\n", + "13:53:37 httpx INFO HTTP Request: POST https://api.openai.com/v1/chat/completions \"HTTP/1.1 200 OK\"\n", + "13:53:41 httpx INFO HTTP Request: POST https://api.openai.com/v1/chat/completions \"HTTP/1.1 200 OK\"\n", + "13:53:41 __main__ INFO Summarized 6 messages into a conversation summary\n", + "\n", + "Assistant: Since you prefer Delta Air Lines, you might consider the following route:\n", + "\n", + "### Atlanta (ATL) to Singapore (SIN) via Tokyo (NRT/HND):\n", + "- **Airlines**: Delta Air Lines, Japan Airlines\n", + "- **Class Options**: Delta offers Delta One, their premium cabin, which includes lie-flat seats, premium dining, and other luxury amenities.\n", + "\n", + "Delta typically allows for upgrades to first-class or their premium Delta One cabin, especially if you're a SkyMiles member or have elite status. Upgrades can often be requested at the time of booking or closer to the departure date, depending on availability.\n", + "\n", + "For the best chance of securing an upgrade, consider:\n", + "- Booking early and using any available miles or upgrade certificates.\n", + "- Checking the Delta app or website regularly for upgrade availability.\n", + "- Contacting Delta customer service if you have specific requests or need assistance with your booking.\n", + "\n", + "Let me know if you need help with booking or have any other questions!\n", + "\n", + "You (type 'quit' to quit): Let's hold on booking for now. Back to activities. Based on what you know about me, what do you think we should do? 
Design the perfect Sunday for me and my wife in Singapore.\n", + "13:54:05 httpx INFO HTTP Request: POST https://api.openai.com/v1/chat/completions \"HTTP/1.1 200 OK\"\n", + "13:54:06 httpx INFO HTTP Request: POST https://api.openai.com/v1/embeddings \"HTTP/1.1 200 OK\"\n", + "13:54:13 httpx INFO HTTP Request: POST https://api.openai.com/v1/chat/completions \"HTTP/1.1 200 OK\"\n", + "\n", + "Assistant: Here's a perfect Sunday itinerary in Singapore, tailored to your love for outdoor activities and culinary experiences:\n", + "\n", + "### Morning:\n", + "- **Breakfast at Tiong Bahru Bakery**: Start your day with a delicious breakfast at this popular bakery known for its croissants and artisanal coffee.\n", + "- **Visit Gardens by the Bay**: Spend the morning exploring this iconic attraction. Don't miss the Supertree Grove and Cloud Forest Dome for a mix of nature and futuristic architecture.\n", + "\n", + "### Midday:\n", + "- **Lunch at Lau Pa Sat Hawker Centre**: Head to this historic food market for a taste of Singapore's diverse street food. Try local favorites like Hainanese chicken rice, satay, and laksa.\n", + "- **Stroll Along Marina Bay**: Enjoy a leisurely walk along Marina Bay and take in the stunning skyline views. You can also visit the Merlion Park for some iconic photo opportunities.\n", + "\n", + "### Afternoon:\n", + "- **Biking at East Coast Park**: Rent a bike and enjoy a ride along the scenic coastline. The park offers a beautiful setting for outdoor activities and relaxation.\n", + "- **Explore Katong and Joo Chiat**: Discover the colorful shophouses and Peranakan culture in these charming neighborhoods. You can also stop by for some traditional Peranakan snacks.\n", + "\n", + "### Evening:\n", + "- **Dinner at a Rooftop Restaurant**: End your day with a romantic dinner at a rooftop restaurant like Level33 or Ce La Vi, offering panoramic views of the city skyline.\n", + "- **Night Safari at Singapore Zoo**: If you're up for some adventure, consider the Night Safari for a unique experience of seeing nocturnal animals in their natural habitat.\n", + "\n", + "This itinerary combines your love for the outdoors with Singapore's rich culinary scene, ensuring a memorable day for you and your wife. Let me know if you'd like more details or adjustments!\n", + "\n", + "You (type 'quit' to quit): Great as long as there are no shellfish items featured as my wife is very allergic.\n", + "13:54:52 httpx INFO HTTP Request: POST https://api.openai.com/v1/chat/completions \"HTTP/1.1 200 OK\"\n", + "13:54:52 __main__ INFO Preparing to store memory: User's wife is allergic to shellfish.\n", + "13:54:58 httpx INFO HTTP Request: POST https://api.openai.com/v1/embeddings \"HTTP/1.1 200 OK\"\n", + "13:54:58 httpx INFO HTTP Request: POST https://api.openai.com/v1/embeddings \"HTTP/1.1 200 OK\"\n", + "13:54:58 __main__ INFO Stored MemoryType.EPISODIC memory: User's wife is allergic to shellfish.\n", + "13:55:00 httpx INFO HTTP Request: POST https://api.openai.com/v1/chat/completions \"HTTP/1.1 200 OK\"\n", + "13:55:03 httpx INFO HTTP Request: POST https://api.openai.com/v1/chat/completions \"HTTP/1.1 200 OK\"\n", + "13:55:03 __main__ INFO Summarized 6 messages into a conversation summary\n", + "\n", + "Assistant: I've noted that your wife is allergic to shellfish. When dining at places like hawker centers or restaurants, it's a good idea to inform the staff about her allergy to ensure a safe dining experience. 
\n", + "\n", + "If you need more personalized recommendations or adjustments to your itinerary, feel free to ask!\n", + "\n", + "You (type 'quit' to quit): Excellent. Back to booking flights. For our preferred airline, will you recommend best route to get there from the US?\n", + "13:55:32 httpx INFO HTTP Request: POST https://api.openai.com/v1/chat/completions \"HTTP/1.1 200 OK\"\n", + "13:55:32 httpx INFO HTTP Request: POST https://api.openai.com/v1/embeddings \"HTTP/1.1 200 OK\"\n", + "13:55:34 httpx INFO HTTP Request: POST https://api.openai.com/v1/chat/completions \"HTTP/1.1 200 OK\"\n", + "\n", + "Assistant: Given your preference for Delta Air Lines, I recommend flying from Atlanta (ATL) to Singapore (SIN) with a layover in Tokyo (NRT). Delta offers convenient connections through Tokyo, and this route is popular for travelers heading to Singapore.\n", + "\n", + "Would you like me to assist with finding specific flight options or any other details regarding the booking?\n", + "\n", + "You (type 'quit' to quit): Sounds great. We will go with this one. I havent been to Singapore since I was 15 years old when I went with my family during my dads business trip. We had such a great time. I think you are going to help us have a fantastic trip!!\n", + "13:55:52 httpx INFO HTTP Request: POST https://api.openai.com/v1/chat/completions \"HTTP/1.1 200 OK\"\n", + "13:55:52 __main__ INFO Preparing to store memory: User visited Singapore at the age of 15 with their family during their dad's business trip and had a great time.\n", + "13:55:52 httpx INFO HTTP Request: POST https://api.openai.com/v1/embeddings \"HTTP/1.1 200 OK\"\n", + "13:55:52 httpx INFO HTTP Request: POST https://api.openai.com/v1/embeddings \"HTTP/1.1 200 OK\"\n", + "13:55:52 __main__ INFO Stored MemoryType.EPISODIC memory: User visited Singapore at the age of 15 with their family during their dad's business trip and had a great time.\n", + "13:55:54 httpx INFO HTTP Request: POST https://api.openai.com/v1/chat/completions \"HTTP/1.1 200 OK\"\n", + "13:55:57 httpx INFO HTTP Request: POST https://api.openai.com/v1/chat/completions \"HTTP/1.1 200 OK\"\n", + "13:55:57 __main__ INFO Summarized 6 messages into a conversation summary\n", + "\n", + "Assistant: I'm thrilled to be part of planning your trip back to Singapore! It sounds like you have fond memories from your last visit, and I'm here to help make this trip just as memorable. If you need any more assistance with flights or have questions about your itinerary, just let me know!\n", + "\n", + "You (type 'quit' to quit): quit\n", + "Thank you for using the Travel Assistant. Goodbye!\n" + ] + } + ], + "source": [ + "# NBVAL_SKIP\n", + "\n", + "try:\n", + " user_id = input(\"Enter a user ID: \") or \"demo_user\"\n", + " thread_id = input(\"Enter a thread ID: \") or \"demo_thread\"\n", + "except Exception:\n", + " # If we're running in CI, we don't have a terminal to input from, so just exit\n", + " exit()\n", + "else:\n", + " main(thread_id, user_id)\n" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "xmFwUzVo2qxB" + }, + "source": [ + "Let's review what the agent learned about me during the process!" 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/" + }, + "id": "-txziZxw2jik", + "outputId": "1ac4bf0b-ec10-4fd2-ae28-8568e9be5829" + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "13:56:11 httpx INFO HTTP Request: POST https://api.openai.com/v1/embeddings \"HTTP/1.1 200 OK\"\n" + ] + }, + { + "data": { + "text/plain": [ + "['Long-term memories:',\n", + " '- [MemoryType.EPISODIC] User plans to visit Singapore this summer with his wife and they love outdoor activities and trying new kinds of foods.',\n", + " \"- [MemoryType.EPISODIC] User visited Singapore at the age of 15 with their family during their dad's business trip and had a great time.\",\n", + " '- [MemoryType.EPISODIC] I like flying on Delta when possible',\n", + " \"- [MemoryType.EPISODIC] User's wife is allergic to shellfish.\"]" + ] + }, + "execution_count": 22, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "# NBVAL_SKIP\n", + "res = retrieve_memories_tool.invoke({\"query\": \"Travel, activity, and dietary preferences\", \"memory_type\": [\"episodic\", \"semantic\"]})\n", + "res.split(\"\\n\")" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "M2UrJxte5HRT" + }, + "source": [ + "Don't forget, we have the RedisVL index we can use to manually query or work with as needed:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/" + }, + "id": "qYm3HQj54WPX", + "outputId": "5db66cb4-e07b-40dd-e4ae-35fed24e283d" + }, + "outputs": [], + "source": [ + "from redisvl.query import CountQuery\n", + "\n", + "# count total long-term memories in Redis\n", + "long_term_memory_index.query(CountQuery())" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "recap-summary" + }, + "source": [ + "___\n", + "\n", + "# 🎓 Recap\n", + "\n", + "You've now learned the fundamentals from scratch and built a **production-ready memory-enabled AI agent** from the ground up. Let's recap the key accomplishments:\n", + "\n", + "## 🏗️ What we built\n", + "\n", + "1. ✅ **Dual-Memory Architecture**: Short-term conversation state + long-term persistent knowledge with LangGraph and Redis\n", + "2. ✅ **Vector-Powered Memory**: Semantic search using RedisVL\n", + "3. ✅ **Smart Deduplication**: Prevents storing similar memories multiple times \n", + "4. ✅ **Tool-Based Memory Management**: LLM controls when to store/retrieve memories \n", + "5. 
✅ **Conversation Summarization**: Automatic context window management \n", + "\n", + "**Why Redis?**\n", + "\n", + "- **Performance**: Sub-millisecond memory retrieval at scale \n", + "- **Versatility**: Handles both structured state (checkpoints) and unstructured data (vectors) \n", + "- **Production-Ready**: Built-in persistence, clustering, and high availability \n", + "- **Developer Experience**: Rich ecosystem with tools like RedisVL and AI framework integrations \n", + "\n", + "## 🔧 Alternative memory dev frameworks\n", + "\n", + "While this tutorial shows hands-on implementation, consider these frameworks for faster development:\n", + "\n", + "- **[LangMem](https://github.com/langchain-ai/langmem)**: LangChain's official memory framework\n", + "- **[Mem0](https://github.com/mem0-ai/mem0)**: Dedicated memory layer for AI applications\n", + "\n", + "**When to Use Each Approach:**\n", + "- **Custom Implementation** (this tutorial): Maximum control, specific requirements, learning\n", + "- **LangMem**: LangChain ecosystem integration, rapid prototyping\n", + "- **Mem0**: Multi-application memory sharing, enterprise features\n", + "\n", + "## 🔄 Next Steps\n", + "\n", + "You now have the foundation to build sophisticated, memory-enabled AI agents that feel truly intelligent and personalized.\n", + "\n", + "**Want to learn more?**\n", + "1. [Read more](https://redis.io/blog/build-smarter-ai-agents-manage-short-term-and-long-term-memory-with-redis/) about agent memory patterns with Redis\n", + "2. [Meet with our experts](https://redis.io/meeting/) to get a consultation on your architecture and where Redis can help." + ] + } + ], + "metadata": { + "colab": { + "provenance": [] + }, + "kernelspec": { + "display_name": ".venv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.11" + } }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Setup\n", - "\n", - "### Packages" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "%pip install -q langchain-openai langgraph-checkpoint langgraph-checkpoint-redis \"langchain-community>=0.2.11\" tavily-python langchain-redis pydantic ulid" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Required API Keys\n", - "\n", - "You must add an OpenAI API key with billing information for this lesson. You will also need\n", - "a Tavily API key. Tavily API keys come with free credits at the time of this writing." - ] - }, - { - "cell_type": "code", - "execution_count": 19, - "metadata": {}, - "outputs": [], - "source": [ - "# NBVAL_SKIP\n", - "import getpass\n", - "import os\n", - "\n", - "\n", - "def _set_env(key: str):\n", - " if key not in os.environ:\n", - " os.environ[key] = getpass.getpass(f\"{key}:\")\n", - "\n", - "\n", - "_set_env(\"OPENAI_API_KEY\")\n", - "\n", - "# Uncomment this if you have a Tavily API key and want to\n", - "# use the web search tool.\n", - "# _set_env(\"TAVILY_API_KEY\")" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Run redis\n", - "\n", - "### For colab\n", - "\n", - "Convert the following cell to Python to run it in Colab." 
- ] - }, - { - "cell_type": "code", - "execution_count": 10, - "metadata": {}, - "outputs": [], - "source": [ - "%%sh\n", - "# Exit if this is not running in Colab\n", - "if [ -z \"$COLAB_RELEASE_TAG\" ]; then\n", - " exit 0\n", - "fi\n", - "\n", - "curl -fsSL https://packages.redis.io/gpg | sudo gpg --dearmor -o /usr/share/keyrings/redis-archive-keyring.gpg\n", - "echo \"deb [signed-by=/usr/share/keyrings/redis-archive-keyring.gpg] https://packages.redis.io/deb $(lsb_release -cs) main\" | sudo tee /etc/apt/sources.list.d/redis.list\n", - "sudo apt-get update > /dev/null 2>&1\n", - "sudo apt-get install redis-stack-server > /dev/null 2>&1\n", - "redis-stack-server --daemonize yes" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "#### For Alternative Environments\n", - "There are many ways to get the necessary redis-stack instance running:\n", - "1. On cloud, deploy a [FREE instance of Redis in the cloud](https://redis.com/try-free/). Or, if you have your\n", - "own version of Redis Enterprise running, that works too!\n", - "2. Per OS, [see the docs](https://redis.io/docs/latest/operate/oss_and_stack/install/install-stack/)\n", - "3. With docker: `docker run -d --name redis-stack-server -p 6379:6379 redis/redis-stack-server:latest`\n", - "\n", - "## Test connection" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "import os\n", - "from redis import Redis\n", - "\n", - "# Use the environment variable if set, otherwise default to localhost\n", - "REDIS_URL = os.getenv(\"REDIS_URL\", \"redis://localhost:6379\")\n", - "\n", - "redis_client = Redis.from_url(REDIS_URL)\n", - "redis_client.ping()" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Short-Term vs. Long-Term Memory\n", - "\n", - "The agent uses **short-term memory** and **long-term memory**. The implementations\n", - "of short-term and long-term memory differ, as does how the agent uses them. Let's\n", - "dig into the details. We'll return to code soon!\n", - "\n", - "### Short-Term Memory\n", - "\n", - "For short-term memory, the agent keeps track of conversation history with Redis.\n", - "Because this is a LangGraph agent, we use the `RedisSaver` class to achieve\n", - "this. `RedisSaver` is what LangGraph refers to as a _checkpointer_. You can read\n", - "more about checkpointers in the [LangGraph\n", - "documentation](https://langchain-ai.github.io/langgraph/concepts/persistence/).\n", - "In short, they store state for each node in the graph, which for this agent\n", - "includes conversation history.\n", - "\n", - "Here's a diagram showing how the agent uses Redis for short-term memory. Each node\n", - "in the graph (Retrieve Memories, Respond, Summarize Conversation) persists its \"state\"\n", - "to Redis. The state object contains the agent's message conversation history for\n", - "the current thread.\n", - "\n", - "\n", - "\n", - "If Redis persistence is on, then Redis will persist short-term memory to\n", - "disk. This means if you quit the agent and return with the same thread ID and\n", - "user ID, you'll resume the same conversation.\n", - "\n", - "Conversation histories can grow long and pollute an LLM's context window. To manage\n", - "this, after every \"turn\" of a conversation, the agent summarizes messages when the\n", - "conversation grows past a configurable threshold. 
Checkpointers do not do this by\n", - "default, so we've created a node in the graph for summarization.\n", - "\n", - "**NOTE**: We'll see example code for the summarization node later in this notebook.\n", - "\n", - "### Long-Term Memory\n", - "\n", - "Aside from conversation history, the agent stores long-term memories in a search\n", - "index in Redis, using [RedisVL](https://docs.redisvl.com/en/latest/). Here's a\n", - "diagram showing the components involved:\n", - "\n", - "\n", - "\n", - "The agent tracks two types of long-term memories:\n", - "\n", - "- **Episodic**: User-specific experiences and preferences\n", - "- **Semantic**: General knowledge about travel destinations and requirements\n", - "\n", - "**NOTE** If you're familiar with the [CoALA\n", - "paper](https://arxiv.org/abs/2309.02427), the terms \"episodic\" and \"semantic\"\n", - "here map to the same concepts in the paper. CoALA discusses a third type of\n", - "memory, _procedural_. In our example, we consider logic encoded in Python in the\n", - "agent codebase to be its procedural memory.\n", - "\n", - "### Representing Long-Term Memory in Python\n", - "We use a couple of Pydantic models to represent long-term memories, both before\n", - "and after they're stored in Redis:" - ] - }, - { - "cell_type": "code", - "execution_count": 12, - "metadata": {}, - "outputs": [], - "source": [ - "from datetime import datetime\n", - "from enum import Enum\n", - "from typing import List, Optional\n", - "\n", - "from pydantic import BaseModel, Field\n", - "import ulid\n", - "\n", - "\n", - "class MemoryType(str, Enum):\n", - " \"\"\"\n", - " The type of a long-term memory.\n", - "\n", - " EPISODIC: User specific experiences and preferences\n", - "\n", - " SEMANTIC: General knowledge on top of the user's preferences and LLM's\n", - " training data.\n", - " \"\"\"\n", - "\n", - " EPISODIC = \"episodic\"\n", - " SEMANTIC = \"semantic\"\n", - "\n", - "\n", - "class Memory(BaseModel):\n", - " \"\"\"Represents a single long-term memory.\"\"\"\n", - "\n", - " content: str\n", - " memory_type: MemoryType\n", - " metadata: str\n", - " \n", - " \n", - "class Memories(BaseModel):\n", - " \"\"\"\n", - " A list of memories extracted from a conversation by an LLM.\n", - "\n", - " NOTE: OpenAI's structured output requires us to wrap the list in an object.\n", - " \"\"\"\n", - "\n", - " memories: List[Memory]\n", - "\n", - "\n", - "class StoredMemory(Memory):\n", - " \"\"\"A stored long-term memory\"\"\"\n", - "\n", - " id: str # The redis key\n", - " memory_id: ulid.ULID = Field(default_factory=lambda: ulid.ULID())\n", - " created_at: datetime = Field(default_factory=datetime.now)\n", - " user_id: Optional[str] = None\n", - " thread_id: Optional[str] = None\n", - " memory_type: Optional[MemoryType] = None\n", - " \n", - " \n", - "class MemoryStrategy(str, Enum):\n", - " \"\"\"\n", - " Supported strategies for managing long-term memory.\n", - " \n", - " This notebook supports two strategies for working with long-term memory:\n", - "\n", - " TOOLS: The LLM decides when to store and retrieve long-term memories, using\n", - " tools (AKA, function-calling) to do so.\n", - "\n", - " MANUAL: The agent manually retrieves long-term memories relevant to the\n", - " current conversation before sending every message and analyzes every\n", - " response to extract memories to store.\n", - "\n", - " NOTE: In both cases, the agent runs a background thread to consolidate\n", - " memories, and a workflow step to summarize conversations after the history\n", - " 
grows past a threshold.\n", - " \"\"\"\n", - "\n", - " TOOLS = \"tools\"\n", - " MANUAL = \"manual\"\n", - " \n", - " \n", - "# By default, we'll use the manual strategy\n", - "memory_strategy = MemoryStrategy.MANUAL" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "We'll return to these models soon to see them in action!" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Short-Term Memory Storage and Retrieval\n", - "\n", - "The `RedisSaver` class handles the basics of short-term memory storage for us,\n", - "so we don't need to do anything here." - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Long-Term Memory Storage and Retrieval\n", - "\n", - "We use RedisVL to store and retrieve long-term memories with vector embeddings.\n", - "This allows for semantic search of past experiences and knowledge.\n", - "\n", - "Let's set up a new search index to store and query memories:" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "from redisvl.index import SearchIndex\n", - "from redisvl.schema.schema import IndexSchema\n", - "\n", - "# Define schema for long-term memory index\n", - "memory_schema = IndexSchema.from_dict({\n", - " \"index\": {\n", - " \"name\": \"agent_memories\",\n", - " \"prefix\": \"memory:\",\n", - " \"key_separator\": \":\",\n", - " \"storage_type\": \"json\",\n", - " },\n", - " \"fields\": [\n", - " {\"name\": \"content\", \"type\": \"text\"},\n", - " {\"name\": \"memory_type\", \"type\": \"tag\"},\n", - " {\"name\": \"metadata\", \"type\": \"text\"},\n", - " {\"name\": \"created_at\", \"type\": \"text\"},\n", - " {\"name\": \"user_id\", \"type\": \"tag\"},\n", - " {\"name\": \"memory_id\", \"type\": \"tag\"},\n", - " {\n", - " \"name\": \"embedding\",\n", - " \"type\": \"vector\",\n", - " \"attrs\": {\n", - " \"algorithm\": \"flat\",\n", - " \"dims\": 1536, # OpenAI embedding dimension\n", - " \"distance_metric\": \"cosine\",\n", - " \"datatype\": \"float32\",\n", - " },\n", - " },\n", - " ],\n", - " }\n", - ")\n", - "\n", - "# Create search index\n", - "try:\n", - " long_term_memory_index = SearchIndex(\n", - " schema=memory_schema, redis_client=redis_client, overwrite=True\n", - " )\n", - " long_term_memory_index.create()\n", - " print(\"Long-term memory index ready\")\n", - "except Exception as e:\n", - " print(f\"Error creating index: {e}\")" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Storage and Retrieval Functions\n", - "\n", - "Now that we have a search index in Redis, we can write functions to store and\n", - "retrieve memories. We can use RedisVL to write these.\n", - "\n", - "First, we'll write a utility function to check if a memory similar to a given\n", - "memory already exists in the index. 
Later, we can use this to avoid storing\n", - "duplicate memories.\n", - "\n", - "#### Checking for Similar Memories" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "import logging\n", - "\n", - "from redisvl.query import VectorRangeQuery\n", - "from redisvl.query.filter import Tag\n", - "from redisvl.utils.vectorize.text.openai import OpenAITextVectorizer\n", - "\n", - "\n", - "logger = logging.getLogger(__name__)\n", - "\n", - "# If we have any memories that aren't associated with a user, we'll use this ID.\n", - "SYSTEM_USER_ID = \"system\"\n", - "\n", - "openai_embed = OpenAITextVectorizer(model=\"text-embedding-ada-002\")\n", - "\n", - "# Change this to MemoryStrategy.TOOLS to use function-calling to store and\n", - "# retrieve memories.\n", - "memory_strategy = MemoryStrategy.MANUAL\n", - "\n", - "\n", - "def similar_memory_exists(\n", - " content: str,\n", - " memory_type: MemoryType,\n", - " user_id: str = SYSTEM_USER_ID,\n", - " thread_id: Optional[str] = None,\n", - " distance_threshold: float = 0.1,\n", - ") -> bool:\n", - " \"\"\"Check if a similar long-term memory already exists in Redis.\"\"\"\n", - " query_embedding = openai_embed.embed(content)\n", - " filters = (Tag(\"user_id\") == user_id) & (Tag(\"memory_type\") == memory_type)\n", - " if thread_id:\n", - " filters = filters & (Tag(\"thread_id\") == thread_id)\n", - "\n", - " # Search for similar memories\n", - " vector_query = VectorRangeQuery(\n", - " vector=query_embedding,\n", - " num_results=1,\n", - " vector_field_name=\"embedding\",\n", - " filter_expression=filters,\n", - " distance_threshold=distance_threshold,\n", - " return_fields=[\"id\"],\n", - " )\n", - " results = long_term_memory_index.query(vector_query)\n", - " logger.debug(f\"Similar memory search results: {results}\")\n", - "\n", - " if results:\n", - " logger.debug(\n", - " f\"{len(results)} similar {'memory' if results.count == 1 else 'memories'} found. First: \"\n", - " f\"{results[0]['id']}. 
Skipping storage.\"\n", - " )\n", - " return True\n", - "\n", - " return False\n" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "#### Storing and Retrieving Long-Term Memories" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "We'll use the `similar_memory_exists()` function when we store memories:" - ] - }, - { - "cell_type": "code", - "execution_count": 89, - "metadata": {}, - "outputs": [], - "source": [ - "\n", - "from datetime import datetime\n", - "from typing import List, Optional, Union\n", - "\n", - "import ulid\n", - "\n", - "\n", - "def store_memory(\n", - " content: str,\n", - " memory_type: MemoryType,\n", - " user_id: str = SYSTEM_USER_ID,\n", - " thread_id: Optional[str] = None,\n", - " metadata: Optional[str] = None,\n", - "):\n", - " \"\"\"Store a long-term memory in Redis, avoiding duplicates.\"\"\"\n", - " if metadata is None:\n", - " metadata = \"{}\"\n", - "\n", - " logger.info(f\"Preparing to store memory: {content}\")\n", - "\n", - " if similar_memory_exists(content, memory_type, user_id, thread_id):\n", - " logger.info(\"Similar memory found, skipping storage\")\n", - " return\n", - "\n", - " embedding = openai_embed.embed(content)\n", - "\n", - " memory_data = {\n", - " \"user_id\": user_id or SYSTEM_USER_ID,\n", - " \"content\": content,\n", - " \"memory_type\": memory_type.value,\n", - " \"metadata\": metadata,\n", - " \"created_at\": datetime.now().isoformat(),\n", - " \"embedding\": embedding,\n", - " \"memory_id\": str(ulid.ULID()),\n", - " \"thread_id\": thread_id,\n", - " }\n", - "\n", - " try:\n", - " long_term_memory_index.load([memory_data])\n", - " except Exception as e:\n", - " logger.error(f\"Error storing memory: {e}\")\n", - " return\n", - "\n", - " logger.info(f\"Stored {memory_type} memory: {content}\")\n", - " \n" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "And now that we're storing memories, we can retrieve them:" - ] - }, - { - "cell_type": "code", - "execution_count": 90, - "metadata": {}, - "outputs": [], - "source": [ - "def retrieve_memories(\n", - " query: str,\n", - " memory_type: Union[Optional[MemoryType], List[MemoryType]] = None,\n", - " user_id: str = SYSTEM_USER_ID,\n", - " thread_id: Optional[str] = None,\n", - " distance_threshold: float = 0.1,\n", - " limit: int = 5,\n", - ") -> List[StoredMemory]:\n", - " \"\"\"Retrieve relevant memories from Redis\"\"\"\n", - " # Create vector query\n", - " logger.debug(f\"Retrieving memories for query: {query}\")\n", - " vector_query = VectorRangeQuery(\n", - " vector=openai_embed.embed(query),\n", - " return_fields=[\n", - " \"content\",\n", - " \"memory_type\",\n", - " \"metadata\",\n", - " \"created_at\",\n", - " \"memory_id\",\n", - " \"thread_id\",\n", - " \"user_id\",\n", - " ],\n", - " num_results=limit,\n", - " vector_field_name=\"embedding\",\n", - " dialect=2,\n", - " distance_threshold=distance_threshold,\n", - " )\n", - "\n", - " base_filters = [f\"@user_id:{{{user_id or SYSTEM_USER_ID}}}\"]\n", - "\n", - " if memory_type:\n", - " if isinstance(memory_type, list):\n", - " base_filters.append(f\"@memory_type:{{{'|'.join(memory_type)}}}\")\n", - " else:\n", - " base_filters.append(f\"@memory_type:{{{memory_type.value}}}\")\n", - "\n", - " if thread_id:\n", - " base_filters.append(f\"@thread_id:{{{thread_id}}}\")\n", - "\n", - " vector_query.set_filter(\" \".join(base_filters))\n", - "\n", - " # Execute search\n", - " results = long_term_memory_index.query(vector_query)\n", - "\n", - " # Parse 
results\n", - " memories = []\n", - " for doc in results:\n", - " try:\n", - " memory = StoredMemory(\n", - " id=doc[\"id\"],\n", - " memory_id=doc[\"memory_id\"],\n", - " user_id=doc[\"user_id\"],\n", - " thread_id=doc.get(\"thread_id\", None),\n", - " memory_type=MemoryType(doc[\"memory_type\"]),\n", - " content=doc[\"content\"],\n", - " created_at=doc[\"created_at\"],\n", - " metadata=doc[\"metadata\"],\n", - " )\n", - " memories.append(memory)\n", - " except Exception as e:\n", - " logger.error(f\"Error parsing memory: {e}\")\n", - " continue\n", - " return memories" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Managing Long-Term Memory Manually vs. Calling Tools\n", - "\n", - "While making LLM queries, agents can store and retrieve relevant long-term\n", - "memories in one of two ways (and more, but these are the two we'll discuss):\n", - "\n", - "1. Expose memory retrieval and storage as \"tools\" that the LLM can decide to call contextually.\n", - "2. Manually augment prompts with relevant memories, and manually extract and store relevant memories.\n", - "\n", - "These approaches both have tradeoffs.\n", - "\n", - "**Tool-calling** leaves the decision to store a memory or find relevant memories\n", - "up to the LLM. This can add latency to requests. It will generally result in\n", - "fewer calls to Redis but will also sometimes miss out on retrieving potentially\n", - "relevant context and/or extracting relevant memories from a conversation.\n", - "\n", - "**Manual memory management** will result in more calls to Redis but will produce\n", - "fewer round-trip LLM requests, reducing latency. Manually extracting memories\n", - "will generally extract more memories than tool calls, which will store more data\n", - "in Redis and should result in more context added to LLM requests. More context\n", - "means more contextual awareness but also higher token spend.\n", - "\n", - "You can test both approaches with this agent by changing the `memory_strategy`\n", - "variable.\n", - "\n", - "## Managing Memory Manually\n", - "With the manual memory management strategy, we're going to extract memories after\n", - "every interaction between the user and the agent. 
We're then going to retrieve\n", - "those memories during future interactions before we send the query.\n", - "\n", - "### Extracting Memories\n", - "We'll call this `extract_memories` function manually after each interaction:" - ] - }, - { - "cell_type": "code", - "execution_count": 91, - "metadata": {}, - "outputs": [], - "source": [ - "from langchain_core.messages import HumanMessage\n", - "from langchain_core.runnables.config import RunnableConfig\n", - "from langchain_openai import ChatOpenAI\n", - "from langgraph.graph.message import MessagesState\n", - "\n", - "\n", - "class RuntimeState(MessagesState):\n", - " \"\"\"Agent state (just messages for now)\"\"\"\n", - "\n", - " pass\n", - "\n", - "\n", - "memory_llm = ChatOpenAI(model=\"gpt-4o\", temperature=0.3).with_structured_output(\n", - " Memories\n", - ")\n", - "\n", - "\n", - "def extract_memories(\n", - " last_processed_message_id: Optional[str],\n", - " state: RuntimeState,\n", - " config: RunnableConfig,\n", - ") -> Optional[str]:\n", - " \"\"\"Extract and store memories in long-term memory\"\"\"\n", - " logger.debug(f\"Last message ID is: {last_processed_message_id}\")\n", - "\n", - " if len(state[\"messages\"]) < 3: # Need at least a user message and agent response\n", - " logger.debug(\"Not enough messages to extract memories\")\n", - " return last_processed_message_id\n", - "\n", - " user_id = config.get(\"configurable\", {}).get(\"user_id\", None)\n", - " if not user_id:\n", - " logger.warning(\"No user ID found in config when extracting memories\")\n", - " return last_processed_message_id\n", - "\n", - " # Get the messages\n", - " messages = state[\"messages\"]\n", - "\n", - " # Find the newest message ID (or None if no IDs)\n", - " newest_message_id = None\n", - " for msg in reversed(messages):\n", - " if hasattr(msg, \"id\") and msg.id:\n", - " newest_message_id = msg.id\n", - " break\n", - "\n", - " logger.debug(f\"Newest message ID is: {newest_message_id}\")\n", - "\n", - " # If we've already processed up to this message ID, skip\n", - " if (\n", - " last_processed_message_id\n", - " and newest_message_id\n", - " and last_processed_message_id == newest_message_id\n", - " ):\n", - " logger.debug(f\"Already processed messages up to ID {newest_message_id}\")\n", - " return last_processed_message_id\n", - "\n", - " # Find the index of the message with last_processed_message_id\n", - " start_index = 0\n", - " if last_processed_message_id:\n", - " for i, msg in enumerate(messages):\n", - " if hasattr(msg, \"id\") and msg.id == last_processed_message_id:\n", - " start_index = i + 1 # Start processing from the next message\n", - " break\n", - "\n", - " # Check if there are messages to process\n", - " if start_index >= len(messages):\n", - " logger.debug(\"No new messages to process since last processed message\")\n", - " return newest_message_id\n", - "\n", - " # Get only the messages after the last processed message\n", - " messages_to_process = messages[start_index:]\n", - "\n", - " # If there are not enough messages to process, include some context\n", - " if len(messages_to_process) < 3 and start_index > 0:\n", - " # Include up to 3 messages before the start_index for context\n", - " context_start = max(0, start_index - 3)\n", - " messages_to_process = messages[context_start:]\n", - "\n", - " # Format messages for the memory agent\n", - " message_history = \"\\n\".join(\n", - " [\n", - " f\"{'User' if isinstance(msg, HumanMessage) else 'Assistant'}: {msg.content}\"\n", - " for msg in messages_to_process\n", - " ]\n", - 
" )\n", - "\n", - " prompt = f\"\"\"\n", - " You are a long-memory manager. Your job is to analyze this message history\n", - " and extract information that might be useful in future conversations.\n", - " \n", - " Extract two types of memories:\n", - " 1. EPISODIC: Personal experiences and preferences specific to this user\n", - " Example: \"User prefers window seats\" or \"User had a bad experience in Paris\"\n", - " \n", - " 2. SEMANTIC: General facts and knowledge about travel that could be useful\n", - " Example: \"The best time to visit Japan is during cherry blossom season in April\"\n", - " \n", - " For each memory, provide:\n", - " - Type: The memory type (EPISODIC/SEMANTIC)\n", - " - Content: The actual information to store\n", - " - Metadata: Relevant tags and context (as JSON)\n", - " \n", - " IMPORTANT RULES:\n", - " 1. Only extract information that would be genuinely useful for future interactions.\n", - " 2. Do not extract procedural knowledge - that is handled by the system's built-in tools and prompts.\n", - " 3. You are a large language model, not a human - do not extract facts that you already know.\n", - " \n", - " Message history:\n", - " {message_history}\n", - " \n", - " Extracted memories:\n", - " \"\"\"\n", - "\n", - " memories_to_store: Memories = memory_llm.invoke([HumanMessage(content=prompt)]) # type: ignore\n", - "\n", - " # Store each extracted memory\n", - " for memory_data in memories_to_store.memories:\n", - " store_memory(\n", - " content=memory_data.content,\n", - " memory_type=memory_data.memory_type,\n", - " user_id=user_id,\n", - " metadata=memory_data.metadata,\n", - " )\n", - "\n", - " # Return data with the newest processed message ID\n", - " return newest_message_id" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "We'll use this function in a background thread. We'll start the thread in manual\n", - "memory mode but not in tool mode, and we'll run it as a worker that pulls\n", - "message histories from a `Queue` to process:" - ] - }, - { - "cell_type": "code", - "execution_count": 92, - "metadata": {}, - "outputs": [], - "source": [ - "import time\n", - "from queue import Queue\n", - "\n", - "\n", - "DEFAULT_MEMORY_WORKER_INTERVAL = 5 * 60 # 5 minutes\n", - "DEFAULT_MEMORY_WORKER_BACKOFF_INTERVAL = 10 * 60 # 10 minutes\n", - "\n", - "\n", - "def memory_worker(\n", - " memory_queue: Queue,\n", - " user_id: str,\n", - " interval: int = DEFAULT_MEMORY_WORKER_INTERVAL,\n", - " backoff_interval: int = DEFAULT_MEMORY_WORKER_BACKOFF_INTERVAL,\n", - "):\n", - " \"\"\"Worker function that processes long-term memory extraction requests\"\"\"\n", - " key = f\"memory_worker:{user_id}:last_processed_message_id\"\n", - "\n", - " last_processed_message_id = redis_client.get(key)\n", - " logger.debug(f\"Last processed message ID: {last_processed_message_id}\")\n", - " last_processed_message_id = (\n", - " str(last_processed_message_id) if last_processed_message_id else None\n", - " )\n", - "\n", - " while True:\n", - " try:\n", - " # Get the next state and config from the queue (blocks until an item is available)\n", - " state, config = memory_queue.get()\n", - "\n", - " # Extract long-term memories from the conversation history\n", - " last_processed_message_id = extract_memories(\n", - " last_processed_message_id, state, config\n", - " )\n", - " logger.debug(\n", - " f\"Memory worker extracted memories. 
Last processed message ID: {last_processed_message_id}\"\n", - " )\n", - "\n", - " if last_processed_message_id:\n", - " logger.debug(\n", - " f\"Setting last processed message ID: {last_processed_message_id}\"\n", - " )\n", - " redis_client.set(key, last_processed_message_id)\n", - "\n", - " # Mark the task as done\n", - " memory_queue.task_done()\n", - " logger.debug(\"Memory extraction completed for queue item\")\n", - " # Wait before processing next item\n", - " time.sleep(interval)\n", - " except Exception as e:\n", - " # Wait before processing next item after an error\n", - " logger.exception(f\"Error in memory worker thread: {e}\")\n", - " time.sleep(backoff_interval)\n", - "\n", - "\n", - "# NOTE: We'll actually start the worker thread later, in the main loop." - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Augmenting Queries with Relevant Memories\n", - "\n", - "For every user interaction with the agent, we'll query for relevant memories and\n", - "add them to the LLM prompt with `retrieve_relevant_memories()`.\n", - "\n", - "**NOTE:** We only run this node in the \"manual\" memory management strategy. If\n", - "using \"tools,\" the LLM will decide when to retrieve memories." - ] - }, - { - "cell_type": "code", - "execution_count": 93, - "metadata": {}, - "outputs": [], - "source": [ - "def retrieve_relevant_memories(\n", - " state: RuntimeState, config: RunnableConfig\n", - ") -> RuntimeState:\n", - " \"\"\"Retrieve relevant memories based on the current conversation.\"\"\"\n", - " if not state[\"messages\"]:\n", - " logger.debug(\"No messages in state\")\n", - " return state\n", - "\n", - " latest_message = state[\"messages\"][-1]\n", - " if not isinstance(latest_message, HumanMessage):\n", - " logger.debug(\"Latest message is not a HumanMessage: \", latest_message)\n", - " return state\n", - "\n", - " user_id = config.get(\"configurable\", {}).get(\"user_id\", SYSTEM_USER_ID)\n", - "\n", - " query = str(latest_message.content)\n", - " relevant_memories = retrieve_memories(\n", - " query=query,\n", - " memory_type=[MemoryType.EPISODIC, MemoryType.SEMANTIC],\n", - " limit=5,\n", - " user_id=user_id,\n", - " distance_threshold=0.3,\n", - " )\n", - "\n", - " logger.debug(f\"All relevant memories: {relevant_memories}\")\n", - "\n", - " # We'll augment the latest human message with the relevant memories.\n", - " if relevant_memories:\n", - " memory_context = \"\\n\\n### Relevant memories from previous conversations:\\n\"\n", - "\n", - " # Group by memory type\n", - " memory_types = {\n", - " MemoryType.EPISODIC: \"User Preferences & History\",\n", - " MemoryType.SEMANTIC: \"Travel Knowledge\",\n", - " }\n", - "\n", - " for mem_type, type_label in memory_types.items():\n", - " memories_of_type = [\n", - " m for m in relevant_memories if m.memory_type == mem_type\n", - " ]\n", - " if memories_of_type:\n", - " memory_context += f\"\\n**{type_label}**:\\n\"\n", - " for mem in memories_of_type:\n", - " memory_context += f\"- {mem.content}\\n\"\n", - "\n", - " augmented_message = HumanMessage(content=f\"{query}\\n{memory_context}\")\n", - " state[\"messages\"][-1] = augmented_message\n", - "\n", - " logger.debug(f\"Augmented message: {augmented_message.content}\")\n", - "\n", - " return state.copy()\n" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "This is the first function we've seen that represents a **node** in the LangGraph\n", - "graph we'll build. 
As a node representation, this function receives a `state`\n", - "object containing the runtime state of the graph, which is where conversation\n", - "history resides. Its `config` parameter contains data like the user and thread\n", - "IDs.\n", - "\n", - "This will be the starting node in the graph we'll assemble later. When a user\n", - "invokes the graph with a message, the first thing we'll do (when using the\n", - "\"manual\" memory strategy) is augment that message with potentially related\n", - "memories." - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Defining Tools\n", - "\n", - "Now that we have our storage functions defined, we can create **tools**. We'll\n", - "need these to set up our agent in a moment. These tools will only be used when\n", - "the agent is operating in \"tools\" memory management mode." - ] - }, - { - "cell_type": "code", - "execution_count": 94, - "metadata": {}, - "outputs": [], - "source": [ - "from langchain_core.tools import tool\n", - "from typing import Dict, Optional\n", - "\n", - "\n", - "@tool\n", - "def store_memory_tool(\n", - " content: str,\n", - " memory_type: MemoryType,\n", - " metadata: Optional[Dict[str, str]] = None,\n", - " config: Optional[RunnableConfig] = None,\n", - ") -> str:\n", - " \"\"\"\n", - " Store a long-term memory in the system.\n", - "\n", - " Use this tool to save important information about user preferences,\n", - " experiences, or general knowledge that might be useful in future\n", - " interactions.\n", - " \"\"\"\n", - " config = config or RunnableConfig()\n", - " user_id = config.get(\"user_id\", SYSTEM_USER_ID)\n", - " thread_id = config.get(\"thread_id\")\n", - "\n", - " try:\n", - " # Store in long-term memory\n", - " store_memory(\n", - " content=content,\n", - " memory_type=memory_type,\n", - " user_id=user_id,\n", - " thread_id=thread_id,\n", - " metadata=str(metadata) if metadata else None,\n", - " )\n", - "\n", - " return f\"Successfully stored {memory_type} memory: {content}\"\n", - " except Exception as e:\n", - " return f\"Error storing memory: {str(e)}\"\n", - "\n", - "\n", - "@tool\n", - "def retrieve_memories_tool(\n", - " query: str,\n", - " memory_type: List[MemoryType],\n", - " limit: int = 5,\n", - " config: Optional[RunnableConfig] = None,\n", - ") -> str:\n", - " \"\"\"\n", - " Retrieve long-term memories relevant to the query.\n", - "\n", - " Use this tool to access previously stored information about user\n", - " preferences, experiences, or general knowledge.\n", - " \"\"\"\n", - " config = config or RunnableConfig()\n", - " user_id = config.get(\"user_id\", SYSTEM_USER_ID)\n", - "\n", - " try:\n", - " # Get long-term memories\n", - " stored_memories = retrieve_memories(\n", - " query=query,\n", - " memory_type=memory_type,\n", - " user_id=user_id,\n", - " limit=limit,\n", - " distance_threshold=0.3,\n", - " )\n", - "\n", - " # Format the response\n", - " response = []\n", - "\n", - " if stored_memories:\n", - " response.append(\"Long-term memories:\")\n", - " for memory in stored_memories:\n", - " response.append(f\"- [{memory.memory_type}] {memory.content}\")\n", - "\n", - " return \"\\n\".join(response) if response else \"No relevant memories found.\"\n", - "\n", - " except Exception as e:\n", - " return f\"Error retrieving memories: {str(e)}\"" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Creating the Agent\n", - "\n", - "Because we're using different LLM objects configured for different purposes and\n", - "a prebuilt ReAct 
agent, we need a node that invokes the agent and returns the\n", - "response. But before we can invoke the agent, we need to set it up. This will\n", - "involve defining the tools the agent will need." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "import json\n", - "from typing import Dict, List, Optional, Tuple, Union\n", - "\n", - "from langchain_community.tools.tavily_search import TavilySearchResults\n", - "from langchain_core.callbacks.manager import CallbackManagerForToolRun\n", - "from langchain_core.messages import AIMessage, AIMessageChunk, SystemMessage\n", - "from langgraph.prebuilt.chat_agent_executor import create_react_agent\n", - "from langgraph.checkpoint.redis import RedisSaver\n", - "\n", - "\n", - "class CachingTavilySearchResults(TavilySearchResults):\n", - " \"\"\"\n", - " An interface to Tavily search that caches results in Redis.\n", - " \n", - " Caching the results of the web search allows us to avoid rate limiting,\n", - " improve latency, and reduce costs.\n", - " \"\"\"\n", - "\n", - " def _run(\n", - " self,\n", - " query: str,\n", - " run_manager: Optional[CallbackManagerForToolRun] = None,\n", - " ) -> Tuple[Union[List[Dict[str, str]], str], Dict]:\n", - " \"\"\"Use the tool.\"\"\"\n", - " cache_key = f\"tavily_search:{query}\"\n", - " cached_result: Optional[str] = redis_client.get(cache_key) # type: ignore\n", - " if cached_result:\n", - " return json.loads(cached_result), {}\n", - " else:\n", - " result, raw_results = super()._run(query, run_manager)\n", - " redis_client.set(cache_key, json.dumps(result), ex=60 * 60)\n", - " return result, raw_results\n", - "\n", - "\n", - "# Create a checkpoint saver for short-term memory. This keeps track of the\n", - "# conversation history for each thread. Later, we'll continually summarize the\n", - "# conversation history to keep the context window manageable, while we also\n", - "# extract long-term memories from the conversation history to store in the\n", - "# long-term memory index.\n", - "redis_saver = RedisSaver(redis_client=redis_client)\n", - "redis_saver.setup()\n", - "\n", - "# Configure an LLM for the agent with a more creative temperature.\n", - "llm = ChatOpenAI(model=\"gpt-4o\", temperature=0.7)\n", - "\n", - "\n", - "# Uncomment these lines if you have a Tavily API key and want to use the web\n", - "# search tool. The agent is much more useful with this tool.\n", - "# web_search_tool = CachingTavilySearchResults(max_results=2)\n", - "# base_tools = [web_search_tool]\n", - "base_tools = []\n", - "\n", - "if memory_strategy == MemoryStrategy.TOOLS:\n", - " tools = base_tools + [store_memory_tool, retrieve_memories_tool]\n", - "elif memory_strategy == MemoryStrategy.MANUAL:\n", - " tools = base_tools\n", - "\n", - "\n", - "travel_agent = create_react_agent(\n", - " model=llm,\n", - " tools=tools,\n", - " checkpointer=redis_saver, # Short-term memory: the conversation history\n", - " prompt=SystemMessage(\n", - " content=\"\"\"\n", - " You are a travel assistant helping users plan their trips. You remember user preferences\n", - " and provide personalized recommendations based on past interactions.\n", - " \n", - " You have access to the following types of memory:\n", - " 1. Short-term memory: The current conversation thread\n", - " 2. 
Long-term memory: \n", - " - Episodic: User preferences and past trip experiences (e.g., \"User prefers window seats\")\n", - " - Semantic: General knowledge about travel destinations and requirements\n", - " \n", - " Your procedural knowledge (how to search, book flights, etc.) is built into your tools and prompts.\n", - " \n", - " Always be helpful, personal, and context-aware in your responses.\n", - " \"\"\"\n", - " ),\n", - ")" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Responding to the User\n", - "\n", - "Now we can write our node that invokes the agent and responds to the user:" - ] - }, - { - "cell_type": "code", - "execution_count": 96, - "metadata": {}, - "outputs": [], - "source": [ - "def respond_to_user(state: RuntimeState, config: RunnableConfig) -> RuntimeState:\n", - " \"\"\"Invoke the travel agent to generate a response.\"\"\"\n", - " human_messages = [m for m in state[\"messages\"] if isinstance(m, HumanMessage)]\n", - " if not human_messages:\n", - " logger.warning(\"No HumanMessage found in state\")\n", - " return state\n", - "\n", - " try:\n", - " for result in travel_agent.stream(\n", - " {\"messages\": state[\"messages\"]}, config=config, stream_mode=\"messages\"\n", - " ):\n", - " result_messages = result.get(\"messages\", [])\n", - "\n", - " ai_messages = [\n", - " m\n", - " for m in result_messages\n", - " if isinstance(m, AIMessage) or isinstance(m, AIMessageChunk)\n", - " ]\n", - " if ai_messages:\n", - " agent_response = ai_messages[-1]\n", - " # Append only the agent's response to the original state\n", - " state[\"messages\"].append(agent_response)\n", - "\n", - " except Exception as e:\n", - " logger.error(f\"Error invoking travel agent: {e}\")\n", - " agent_response = AIMessage(\n", - " content=\"I'm sorry, I encountered an error processing your request.\"\n", - " )\n", - " return state" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Summarizing Conversation History\n", - "\n", - "We've been focusing on long-term memory, but let's bounce back to short-term\n", - "memory for a moment. With `RedisSaver`, LangGraph will manage our message\n", - "history automatically. Still, the message history will continue to grow\n", - "indefinitely, until it overwhelms the LLM's token context window.\n", - "\n", - "To solve this problem, we'll add a node to the graph that summarizes the\n", - "conversation if it's grown past a threshold." - ] - }, - { - "cell_type": "code", - "execution_count": 97, - "metadata": {}, - "outputs": [], - "source": [ - "from langchain_core.messages import RemoveMessage\n", - "\n", - "# An LLM configured for summarization.\n", - "summarizer = ChatOpenAI(model=\"gpt-4o\", temperature=0.3)\n", - "\n", - "# The number of messages after which we'll summarize the conversation.\n", - "MESSAGE_SUMMARIZATION_THRESHOLD = 10\n", - "\n", - "\n", - "def summarize_conversation(\n", - " state: RuntimeState, config: RunnableConfig\n", - ") -> Optional[RuntimeState]:\n", - " \"\"\"\n", - " Summarize a list of messages into a concise summary to reduce context length\n", - " while preserving important information.\n", - " \"\"\"\n", - " messages = state[\"messages\"]\n", - " current_message_count = len(messages)\n", - " if current_message_count < MESSAGE_SUMMARIZATION_THRESHOLD:\n", - " logger.debug(f\"Not summarizing conversation: {current_message_count}\")\n", - " return state\n", - "\n", - " system_prompt = \"\"\"\n", - " You are a conversation summarizer. 
Create a concise summary of the previous\n", - " conversation between a user and a travel assistant.\n", - " \n", - " The summary should:\n", - " 1. Highlight key topics, preferences, and decisions\n", - " 2. Include any specific trip details (destinations, dates, preferences)\n", - " 3. Note any outstanding questions or topics that need follow-up\n", - " 4. Be concise but informative\n", - " \n", - " Format your summary as a brief narrative paragraph.\n", - " \"\"\"\n", - "\n", - " message_content = \"\\n\".join(\n", - " [\n", - " f\"{'User' if isinstance(msg, HumanMessage) else 'Assistant'}: {msg.content}\"\n", - " for msg in messages\n", - " ]\n", - " )\n", - "\n", - " # Invoke the summarizer\n", - " summary_messages = [\n", - " SystemMessage(content=system_prompt),\n", - " HumanMessage(\n", - " content=f\"Please summarize this conversation:\\n\\n{message_content}\"\n", - " ),\n", - " ]\n", - "\n", - " summary_response = summarizer.invoke(summary_messages)\n", - "\n", - " logger.info(f\"Summarized {len(messages)} messages into a conversation summary\")\n", - "\n", - " summary_message = SystemMessage(\n", - " content=f\"\"\"\n", - " Summary of the conversation so far:\n", - " \n", - " {summary_response.content}\n", - " \n", - " Please continue the conversation based on this summary and the recent messages.\n", - " \"\"\"\n", - " )\n", - " remove_messages = [\n", - " RemoveMessage(id=msg.id) for msg in messages if msg.id is not None\n", - " ]\n", - "\n", - " state[\"messages\"] = [ # type: ignore\n", - " *remove_messages,\n", - " summary_message,\n", - " state[\"messages\"][-1],\n", - " ]\n", - "\n", - " return state.copy()" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Assembling the Graph\n", - "\n", - "It's time to assemble our graph!" - ] - }, - { - "cell_type": "code", - "execution_count": 98, - "metadata": {}, - "outputs": [], - "source": [ - "from langgraph.graph import StateGraph, END, START\n", - "\n", - "\n", - "workflow = StateGraph(RuntimeState)\n", - "\n", - "workflow.add_node(\"respond\", respond_to_user)\n", - "workflow.add_node(\"summarize_conversation\", summarize_conversation)\n", - "\n", - "if memory_strategy == MemoryStrategy.MANUAL:\n", - " # In manual memory mode, we'll retrieve relevant memories before\n", - " # responding to the user, and then augment the user's message with the\n", - " # relevant memories.\n", - " workflow.add_node(\"retrieve_memories\", retrieve_relevant_memories)\n", - " workflow.add_edge(START, \"retrieve_memories\")\n", - " workflow.add_edge(\"retrieve_memories\", \"respond\")\n", - "else:\n", - " # In tool-calling mode, we'll respond to the user and let the LLM\n", - " # decide when to retrieve and store memories, using tool calls.\n", - " workflow.add_edge(START, \"respond\")\n", - "\n", - "# Regardless of memory strategy, we'll summarize the conversation after\n", - "# responding to the user, to keep the context window manageable.\n", - "workflow.add_edge(\"respond\", \"summarize_conversation\")\n", - "workflow.add_edge(\"summarize_conversation\", END)\n", - "\n", - "# Finally, compile the graph.\n", - "graph = workflow.compile(checkpointer=redis_saver)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Consolidating Memories in a Background Thread\n", - "\n", - "We're almost ready to create the main loop that runs our graph. First, though,\n", - "let's create a worker that consolidates similar memories on a regular schedule,\n", - "using semantic search. 
We'll run the worker in a background thread later, in the\n", - "main loop." - ] - }, - { - "cell_type": "code", - "execution_count": 99, - "metadata": {}, - "outputs": [], - "source": [ - "from redisvl.query import FilterQuery\n", - "\n", - "\n", - "def consolidate_memories(user_id: str, batch_size: int = 10):\n", - " \"\"\"\n", - " Periodically merge similar long-term memories for a user.\n", - " \"\"\"\n", - " logger.info(f\"Starting memory consolidation for user {user_id}\")\n", - " \n", - " # For each memory type, consolidate separately\n", - "\n", - " for memory_type in MemoryType:\n", - " all_memories = []\n", - "\n", - " # Get all memories of this type for the user\n", - " of_type_for_user = (Tag(\"user_id\") == user_id) & (\n", - " Tag(\"memory_type\") == memory_type\n", - " )\n", - " filter_query = FilterQuery(filter_expression=of_type_for_user)\n", - " \n", - " for batch in long_term_memory_index.paginate(filter_query, page_size=batch_size):\n", - " all_memories.extend(batch)\n", - " \n", - " all_memories = long_term_memory_index.query(filter_query)\n", - " if not all_memories:\n", - " continue\n", - "\n", - " # Group similar memories\n", - " processed_ids = set()\n", - " for memory in all_memories:\n", - " if memory[\"id\"] in processed_ids:\n", - " continue\n", - "\n", - " memory_embedding = memory[\"embedding\"]\n", - " vector_query = VectorRangeQuery(\n", - " vector=memory_embedding,\n", - " num_results=10,\n", - " vector_field_name=\"embedding\",\n", - " filter_expression=of_type_for_user\n", - " & (Tag(\"memory_id\") != memory[\"memory_id\"]),\n", - " distance_threshold=0.1,\n", - " return_fields=[\n", - " \"content\",\n", - " \"metadata\",\n", - " ],\n", - " )\n", - " similar_memories = long_term_memory_index.query(vector_query)\n", - "\n", - " # If we found similar memories, consolidate them\n", - " if similar_memories:\n", - " combined_content = memory[\"content\"]\n", - " combined_metadata = memory[\"metadata\"]\n", - "\n", - " if combined_metadata:\n", - " try:\n", - " combined_metadata = json.loads(combined_metadata)\n", - " except Exception as e:\n", - " logger.error(f\"Error parsing metadata: {e}\")\n", - " combined_metadata = {}\n", - "\n", - " for similar in similar_memories:\n", - " # Merge the content of similar memories\n", - " combined_content += f\" {similar['content']}\"\n", - "\n", - " if similar[\"metadata\"]:\n", - " try:\n", - " similar_metadata = json.loads(similar[\"metadata\"])\n", - " except Exception as e:\n", - " logger.error(f\"Error parsing metadata: {e}\")\n", - " similar_metadata = {}\n", - "\n", - " combined_metadata = {**combined_metadata, **similar_metadata}\n", - "\n", - " # Create a consolidated memory\n", - " new_metadata = {\n", - " \"consolidated\": True,\n", - " \"source_count\": len(similar_memories) + 1,\n", - " **combined_metadata,\n", - " }\n", - " consolidated_memory = {\n", - " \"content\": summarize_memories(combined_content, memory_type),\n", - " \"memory_type\": memory_type.value,\n", - " \"metadata\": json.dumps(new_metadata),\n", - " \"user_id\": user_id,\n", - " }\n", - "\n", - " # Delete the old memories\n", - " delete_memory(memory[\"id\"])\n", - " for similar in similar_memories:\n", - " delete_memory(similar[\"id\"])\n", - "\n", - " # Store the new consolidated memory\n", - " store_memory(\n", - " content=consolidated_memory[\"content\"],\n", - " memory_type=memory_type,\n", - " user_id=user_id,\n", - " metadata=consolidated_memory[\"metadata\"],\n", - " )\n", - "\n", - " logger.info(\n", - " f\"Consolidated 
- " )\n",
- "\n",
- "\n",
- "def delete_memory(memory_id: str):\n",
- " \"\"\"Delete a memory from Redis\"\"\"\n",
- " try:\n",
- " result = long_term_memory_index.drop_keys([memory_id])\n",
- " except Exception as e:\n",
- " logger.error(f\"Deleting memory {memory_id} failed: {e}\")\n",
- " return\n",
- " if result == 0:\n",
- " logger.debug(f\"Deleting memory {memory_id} failed: memory not found\")\n",
- " else:\n",
- " logger.info(f\"Deleted memory {memory_id}\")\n",
- "\n",
- "\n",
- "def summarize_memories(combined_content: str, memory_type: MemoryType) -> str:\n",
- " \"\"\"Use the LLM to create a concise summary of similar memories\"\"\"\n",
- " try:\n",
- " system_prompt = f\"\"\"\n",
- " You are a memory consolidation assistant. Your task is to create a single, \n",
- " concise memory from these similar memory fragments. The new memory should\n",
- " be a {memory_type.value} memory.\n",
- " \n",
- " Combine the information without repetition while preserving all important details.\n",
- " \"\"\"\n",
- "\n",
- " messages = [\n",
- " SystemMessage(content=system_prompt),\n",
- " HumanMessage(\n",
- " content=f\"Consolidate these similar memories into one:\\n\\n{combined_content}\"\n",
- " ),\n",
- " ]\n",
- "\n",
- " response = summarizer.invoke(messages)\n",
- " return str(response.content)\n",
- " except Exception as e:\n",
- " logger.error(f\"Error summarizing memories: {e}\")\n",
- " # Fall back to just using the combined content\n",
- " return combined_content\n",
- "\n",
- "\n",
- "def memory_consolidation_worker(user_id: str):\n",
- " \"\"\"\n",
- " Worker that periodically consolidates memories for the active user.\n",
- "\n",
- " NOTE: In production, this would probably use a background task framework, such\n",
- " as rq or Celery, and run on a schedule.\n",
- " \"\"\"\n",
- " while True:\n",
- " try:\n",
- " consolidate_memories(user_id)\n",
- " # Run every 10 minutes\n",
- " time.sleep(10 * 60)\n",
- " except Exception as e:\n",
- " logger.exception(f\"Error in memory consolidation worker: {e}\")\n",
- " # If there's an error, wait an hour and try again\n",
- " time.sleep(60 * 60)"
- ]
- },
- {
- "cell_type": "markdown",
- "metadata": {},
- "source": [
- "## The Main Loop\n",
- "\n",
- "Now we can put everything together and run the main loop.\n",
- "\n",
- "Running this cell should ask for your OpenAI and Tavily keys, then a username\n",
- "and thread ID. You'll enter a loop in which you can enter queries and see\n",
- "responses from the agent printed below the following cell."
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "metadata": {},
- "outputs": [],
- "source": [
- "import threading\n",
- "\n",
- "\n",
- "def main(thread_id: str = \"book_flight\", user_id: str = \"demo_user\"):\n",
- " \"\"\"Main interaction loop for the travel agent\"\"\"\n",
- " print(\"Welcome to the Travel Assistant! (Type 'exit' to quit)\")\n",
- "\n",
- " config = RunnableConfig(configurable={\"thread_id\": thread_id, \"user_id\": user_id})\n",
- " state = RuntimeState(messages=[])\n",
- "\n",
- " # If we're using the manual memory strategy, we need to create a queue for\n",
- " # memory processing and start a worker thread. 
After every 'round' of a\n", - " # conversation, the main loop will add the current state and config to the\n", - " # queue for memory processing.\n", - " if memory_strategy == MemoryStrategy.MANUAL:\n", - " # Create a queue for memory processing\n", - " memory_queue = Queue()\n", - "\n", - " # Start a worker thread that will process memory extraction tasks\n", - " memory_thread = threading.Thread(\n", - " target=memory_worker, args=(memory_queue, user_id), daemon=True\n", - " )\n", - " memory_thread.start()\n", - "\n", - " # We always run consolidation in the background, regardless of memory strategy.\n", - " consolidation_thread = threading.Thread(\n", - " target=memory_consolidation_worker, args=(user_id,), daemon=True\n", - " )\n", - " consolidation_thread.start()\n", - "\n", - " while True:\n", - " user_input = input(\"\\nYou (type 'quit' to quit): \")\n", - "\n", - " if not user_input:\n", - " continue\n", - "\n", - " if user_input.lower() in [\"exit\", \"quit\"]:\n", - " print(\"Thank you for using the Travel Assistant. Goodbye!\")\n", - " break\n", - "\n", - " state[\"messages\"].append(HumanMessage(content=user_input))\n", - "\n", - " try:\n", - " # Process user input through the graph\n", - " for result in graph.stream(state, config=config, stream_mode=\"values\"):\n", - " state = RuntimeState(**result)\n", - "\n", - " logger.debug(f\"# of messages after run: {len(state['messages'])}\")\n", - "\n", - " # Find the most recent AI message, so we can print the response\n", - " ai_messages = [m for m in state[\"messages\"] if isinstance(m, AIMessage)]\n", - " if ai_messages:\n", - " message = ai_messages[-1].content\n", - " else:\n", - " logger.error(\"No AI messages after run\")\n", - " message = \"I'm sorry, I couldn't process your request properly.\"\n", - " # Add the error message to the state\n", - " state[\"messages\"].append(AIMessage(content=message))\n", - "\n", - " print(f\"\\nAssistant: {message}\")\n", - "\n", - " # Add the current state to the memory processing queue\n", - " if memory_strategy == MemoryStrategy.MANUAL:\n", - " memory_queue.put((state.copy(), config))\n", - "\n", - " except Exception as e:\n", - " logger.exception(f\"Error processing request: {e}\")\n", - " error_message = \"I'm sorry, I encountered an error processing your request.\"\n", - " print(f\"\\nAssistant: {error_message}\")\n", - " # Add the error message to the state\n", - " state[\"messages\"].append(AIMessage(content=error_message))\n", - "\n", - "\n", - "try:\n", - " user_id = input(\"Enter a user ID: \") or \"demo_user\"\n", - " thread_id = input(\"Enter a thread ID: \") or \"demo_thread\"\n", - "except Exception:\n", - " # If we're running in CI, we don't have a terminal to input from, so just exit\n", - " exit()\n", - "else:\n", - " main(thread_id, user_id)\n" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## That's a Wrap!\n", - "\n", - "Want to make your own agent? Try the [LangGraph Quickstart](https://langchain-ai.github.io/langgraph/tutorials/introduction/). Then add our [Redis checkpointer](https://github.com/redis-developer/langgraph-redis) to give your agent fast, persistent memory!" 
- ] - } - ], - "metadata": { - "kernelspec": { - "display_name": "env", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.11.11" - } - }, - "nbformat": 4, - "nbformat_minor": 2 + "nbformat": 4, + "nbformat_minor": 0 } diff --git a/python-recipes/agents/resources/long-term-memory.png b/python-recipes/agents/resources/long-term-memory.png new file mode 100644 index 00000000..855288a1 Binary files /dev/null and b/python-recipes/agents/resources/long-term-memory.png differ diff --git a/python-recipes/agents/resources/memory-agents.png b/python-recipes/agents/resources/memory-agents.png new file mode 100644 index 00000000..975d1cb7 Binary files /dev/null and b/python-recipes/agents/resources/memory-agents.png differ diff --git a/python-recipes/agents/resources/short-term-memory.png b/python-recipes/agents/resources/short-term-memory.png new file mode 100644 index 00000000..1fc555cf Binary files /dev/null and b/python-recipes/agents/resources/short-term-memory.png differ