diff --git a/.gitignore b/.gitignore index 97f3ac2b..b55a82a2 100644 --- a/.gitignore +++ b/.gitignore @@ -149,4 +149,8 @@ lychee-report.md # Notebook validation .notebook_validation_state.json .notebook_validation_checkpoint.json -validation_report_*.md \ No newline at end of file +validation_report_*.md +# Memory tool demo artifacts +tool_use/demo_memory/ +tool_use/memory_storage/ +tool_use/.env diff --git a/tool_use/.env.example b/tool_use/.env.example new file mode 100644 index 00000000..333d0a50 --- /dev/null +++ b/tool_use/.env.example @@ -0,0 +1,14 @@ +# Anthropic API Configuration +# Copy this file to .env and fill in your actual values + +# Your Anthropic API key from https://console.anthropic.com/ +ANTHROPIC_API_KEY=your_api_key_here + +# Model name - Use a model that supports memory_20250818 tool +# Supported models (as of launch): +# - claude-sonnet-4-20250514 +# - claude-opus-4-20250514 +# - claude-opus-4-1-20250805 +# - claude-sonnet-4-5-20250929 + +ANTHROPIC_MODEL=claude-sonnet-4-5-20250929 \ No newline at end of file diff --git a/tool_use/memory_cookbook.ipynb b/tool_use/memory_cookbook.ipynb index 4a687715..30dfdba5 100644 --- a/tool_use/memory_cookbook.ipynb +++ b/tool_use/memory_cookbook.ipynb @@ -4,1077 +4,1206 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "## Self-Managed LLM Memory" + "# Memory & Context Management with Claude Sonnet 4.5\n", + "\n", + "Learn how to build AI agents that learn and improve across conversations using Claude's memory tool and context editing capabilities." 
] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "### Table of Contents\n", - "\n", - "- [Introduction](#introduction)\n", - "- [Getting Started](#getting-started)\n", - "- [Memory Implementations](#memory-implementations)\n", - " - [Basic Memory](#implementation-1-simple-memory-tool)\n", - " - [Compactify Memory](#implementation-2-compactify-memory)\n", - " - [File-Based Memory](#implementation-3-file-based-memory)\n", - "- [Basic Evaluations](#basic-evaluations)\n", - "- [Future Work](#future-work)" + "## Table of Contents\n", + "\n", + "1. [Introduction: Why Memory Matters](#introduction)\n", + "2. [Use Cases](#use-cases)\n", + "3. [Quick Start Examples](#quick-start)\n", + "4. [How It Works](#how-it-works)\n", + "5. [Code Review Assistant Demo](#demo)\n", + "6. [Real-World Applications](#real-world)\n", + "7. [Best Practices](#best-practices)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "### Introduction\n", + "## Setup\n", + "\n", + "### For VSCode Users\n", "\n", - "Managing memory effectively is a critical part of building agents and agentic workflows that handle long-horizon tasks. In this cookbook we demonstrate a few different strategies for \"self-managed\" (LLM-managed) memory. Use this notebook as a starting point for your own memory implementations. We do not expect that memory tools are one-size-fits-all, and further believe that different domains/tasks necessarily lend themselves to more or less rigid memory scaffolding. The Claude 4 model family has proven to be particularly strong at utilizing [memory tooling](https://www.anthropic.com/news/claude-4#:~:text=more%20on%20methodology.-,Model%20improvements,-In%20addition%20to), and we're excited to see how teams extend the ideas below.\n", + "```bash\n", + "# 1. Create virtual environment\n", + "python -m venv .venv\n", "\n", + "# 2. 
Activate it\n", + "source .venv/bin/activate # macOS/Linux\n", + "# or: .venv\\Scripts\\activate # Windows\n", "\n", - "#### Why do we need to manage memory?\n", + "# 3. Install dependencies\n", + "pip install -r requirements.txt\n", "\n", - "LLMs have finite context windows (200k tokens for Claude 4 Sonnet & Opus). This means that for any request, if the sum of prompt tokens and output tokens exceeds the model’s context window, the system will return a validation error. As many teams building with LLMs quickly learn, there is additional complexity in identifying and working within the *effective* [context window](https://docs.claude.com/en/docs/build-with-claude/context-windows) of an LLM. See our tips for [long context prompting](https://docs.claude.com/en/docs/build-with-claude/prompt-engineering/long-context-tips) to learn more about effective context windows and best practices.\n", + "# 4. In VSCode: Select .venv as kernel (top right)\n", + "```\n", "\n", - "In addition to the above, memory is important for the following reasons:\n", - "- **Long context windows are computationally expensive:** Attention mechanisms scale quadratically—doubling context length quadruples compute cost. Most tasks only need a small fraction of available context, making it wasteful to process millions of irrelevant tokens. This is why humans don't memorize entire textbooks; we take notes and build mental models instead.\n", - "- **More efficient processing:** When LLMs write and maintain their own notes—saving successful strategies, key insights, and relevant context—they're effectively updating their capabilities in real-time without retraining. 
Models that excel at these operations can maintain coherent behavior over extremely long time horizons while using only a fraction of the computational resources required for full context windows.\n", + "### API Key\n", + "\n", + "```bash\n", + "cp .env.example .env\n", + "# Edit .env and add your ANTHROPIC_API_KEY\n", + "```\n", "\n", - "Successfully building LLM-based systems is an exercise in discarding the unnecessary tokens and efficiently storing + retrieving the relevant tokens for the task at-hand." + "Get your API key from: https://console.anthropic.com/\n" ] }, { "cell_type": "markdown", "metadata": {}, - "source": [ - "### Getting Started" - ] + "source": "## 1. Introduction: Why Memory Matters {#introduction}\n\nThis cookbook demonstrates practical implementations of the context engineering patterns described in [Effective context engineering for AI agents](https://www.anthropic.com/engineering/effective-context-engineering-for-ai-agents). That post covers why context is a finite resource, how attention budgets work, and strategies for building effective agents—the techniques you'll see in action here.\n\n### The Problem\n\nLarge language models have finite context windows (200k tokens for Claude 4). While this seems large, several challenges emerge:\n\n- **Context limits**: Long conversations or complex tasks can exceed available context\n- **Computational cost**: Processing large contexts is expensive - attention mechanisms scale quadratically\n- **Repeated patterns**: Similar tasks across conversations require re-explaining context every time\n- **Information loss**: When context fills up, earlier important information gets lost\n\n### The Solution\n\nClaude Sonnet 4.5 introduces two powerful capabilities:\n\n1. 
**Memory Tool** (`memory_20250818`): Enables cross-conversation learning\n - Claude can write down what it learns for future reference\n - File-based system under `/memories` directory\n - Client-side implementation gives you full control\n\n2. **Context Editing** (`clear_tool_uses_20250919`): Automatically manages context\n - Clears old tool results when context grows large\n - Keeps recent context while preserving memory\n - Configurable triggers and retention policies\n\n### The Benefit\n\nBuild AI agents that **get better at your specific tasks over time**:\n\n- **Session 1**: Claude solves a problem, writes down the pattern\n- **Session 2**: Claude applies the learned pattern immediately (faster!)\n- **Long sessions**: Context editing keeps conversations manageable\n\nThink of it as giving Claude a notebook to take notes and refer back to - just like humans do." }, { - "cell_type": "code", - "execution_count": null, + "cell_type": "markdown", "metadata": {}, - "outputs": [], "source": [ - "# install deps\n", - "%pip install -q -U anthropic python-dotenv nest_asyncio PyPDF2" + "## 2. 
Use Cases {#use-cases}\n", + "\n", + "Memory and context management enable powerful new workflows:\n", + "\n", + "### 🔍 Code Review Assistant\n", + "- Learns debugging patterns from past reviews\n", + "- Recognizes similar bugs instantly in future sessions\n", + "- Builds team-specific code quality knowledge\n", + "- **Production ready**: Integrate with [claude-code-action](https://github.com/anthropics/claude-code-action) for GitHub PR reviews\n", + "\n", + "### 📚 Research Assistant\n", + "- Accumulates knowledge on topics over multiple sessions\n", + "- Connects insights across different research threads\n", + "- Maintains bibliography and source tracking\n", + "\n", + "### 💬 Customer Support Bot\n", + "- Learns user preferences and communication style\n", + "- Remembers common issues and solutions\n", + "- Builds product knowledge base from interactions\n", + "\n", + "### 📊 Data Analysis Helper\n", + "- Remembers dataset patterns and anomalies\n", + "- Stores analysis techniques that work well\n", + "- Builds domain-specific insights over time\n", + "\n", + "**Supported Models**: Claude Opus 4 (`claude-opus-4-20250514`), Claude Opus 4.1 (`claude-opus-4-1-20250805`), Claude Sonnet 4 (`claude-sonnet-4-20250514`), and Claude Sonnet 4.5 (`claude-sonnet-4-5-20250929`)\n", + "\n", + "**This cookbook focuses on the Code Review Assistant** as it clearly demonstrates both memory (learning patterns) and context editing (handling long reviews)." ] }, { - "cell_type": "code", - "execution_count": 36, + "cell_type": "markdown", "metadata": {}, - "outputs": [], "source": [ - "# env setup\n", - "from anthropic import Anthropic\n", - "from dotenv import load_dotenv\n", - "import os\n", - "\n", - "# api key must be in .env file in project\n", - "load_dotenv()\n", - "if os.getenv(\"ANTHROPIC_API_KEY\") is None:\n", - " raise ValueError(\"ANTHROPIC_API_KEY not found in .env file\")\n", + "## 3. 
Quick Start Examples {#quick-start}\n", "\n", - "client = Anthropic()" + "Let's see memory and context management in action with simple examples." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "Clone the agents quickstart implementation\n", + "### Setup\n", "\n", - "We are going to use some of the core work from the agents quickstart implementation which can be found [here](https://github.com/anthropics/anthropic-quickstarts/tree/main/agents)." + "First, install dependencies and configure your environment:" ] }, { "cell_type": "code", - "execution_count": 37, + "execution_count": 1, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ - "Repository already exists at /tmp/anthropic-quickstarts\n" + "\n", + "\u001b[1m[\u001b[0m\u001b[34;49mnotice\u001b[0m\u001b[1;39;49m]\u001b[0m\u001b[39;49m A new release of pip is available: \u001b[0m\u001b[31;49m24.0\u001b[0m\u001b[39;49m -> \u001b[0m\u001b[32;49m25.2\u001b[0m\n", + "\u001b[1m[\u001b[0m\u001b[34;49mnotice\u001b[0m\u001b[1;39;49m]\u001b[0m\u001b[39;49m To update, run: \u001b[0m\u001b[32;49mpip install --upgrade pip\u001b[0m\n", + "Note: you may need to restart the kernel to use updated packages.\n" ] } ], "source": [ - "import sys \n", - "import os\n", + "# Install required packages\n", + "# Option 1: From requirements.txt\n", + "# %pip install -q -r requirements.txt\n", "\n", - "# Check if the repo already exists\n", - "if not os.path.exists('/tmp/anthropic-quickstarts'):\n", - " # Clone the agents quickstart implementation\n", - " !git clone https://github.com/anthropics/anthropic-quickstarts.git /tmp/anthropic-quickstarts\n", - "else:\n", - " print(\"Repository already exists at /tmp/anthropic-quickstarts\")\n", - "\n", - "# IMPORTANT: Insert at the beginning of sys.path to override any existing 'agents' modules\n", - "if '/tmp/anthropic-quickstarts' not in sys.path:\n", - " sys.path.insert(0, '/tmp/anthropic-quickstarts')\n", - "\n", - "# Clear any cached imports 
of 'agents' module\n", - "if 'agents' in sys.modules:\n", - " del sys.modules['agents']\n", - "if 'agents.agent' in sys.modules:\n", - " del sys.modules['agents.agent']" + "# Option 2: Direct install\n", + "%pip install -q anthropic python-dotenv ipykernel\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "Confirm the agents repo import works as expected." + "**⚠️ Important**: Create a `.env` file in this directory:\n", + "\n", + "```bash\n", + "# Copy .env.example to .env and add your API key\n", + "cp .env.example .env\n", + "```\n", + "\n", + "Then edit `.env` to add your Anthropic API key from https://console.anthropic.com/" ] }, { "cell_type": "code", - "execution_count": 38, + "execution_count": 3, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ - "*eye roll* Another laptop crisis. What's it doing? Singing off-key? Refusing to work unless you feed it cookies? Details, please.\n" + "✓ API key loaded\n", + "✓ Using model: claude-sonnet-4-5-20250929\n" ] } ], "source": [ - "import nest_asyncio\n", - "nest_asyncio.apply()\n", + "import os\n", + "from typing import Any, cast\n", "\n", - "from agents.agent import Agent\n", + "from anthropic import Anthropic\n", + "from dotenv import load_dotenv\n", "\n", - "agent = Agent(\n", - " name=\"MyAgent\",\n", - " system=\"You are an extremely cynical, snarky, and quick-witted customer support agent. Provide short responses to user queries.\",\n", - ")\n", + "# Load environment variables\n", + "load_dotenv()\n", + "\n", + "API_KEY = os.getenv(\"ANTHROPIC_API_KEY\")\n", + "MODEL = os.getenv(\"ANTHROPIC_MODEL\")\n", + "\n", + "if not API_KEY:\n", + " raise ValueError(\n", + " \"ANTHROPIC_API_KEY not found. \"\n", + " \"Copy .env.example to .env and add your API key.\"\n", + " )\n", + "\n", + "if not MODEL:\n", + " raise ValueError(\n", + " \"ANTHROPIC_MODEL not found. 
\"\n", + " \"Copy .env.example to .env and set the model.\"\n", + " )\n", + "\n", + "MODEL = cast(str, MODEL)\n", "\n", - "response = agent.run(\"I'm having issues with my laptop. Can you help me?\")\n", - "print(response.content[0].text)\n" + "client = Anthropic(api_key=API_KEY)\n", + "\n", + "print(\"✓ API key loaded\")\n", + "print(f\"✓ Using model: {MODEL}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "### Implementation 1: Simple Memory Tool\n", + "### Example 1: Basic Memory Usage\n", "\n", - "*This implementation is a reflection of our agents quickstarts repo [here](https://github.com/anthropics/anthropic-quickstarts/tree/main/agents/tools). For more information on tool use, see the Claude API tools [docs](https://docs.claude.com/en/docs/build-with-claude/tool-use/overview).*\n", + "Let's see Claude use memory to store information for future reference." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "**Helper Functions**\n", "\n", - "The `SimpleMemory()` tool gives the model a scratchpad to manage memory. This is maintained as a single string that can be read or updated.\n", + "These examples use helper functions from `demo_helpers.py`:\n", "\n", - "Here we've defined the `read`, `write`, and `edit` actions. Explicitly defining `read` means the model won't have access to the full contents of memory at every turn. 
We recommend that if you follow this pattern you introduce a separate, shortened summary or metadata object describing the contents of memory and include that in every request (ideally preventing excessive reads).\n", + "- **`run_conversation_loop()`**: Handles the API conversation loop\n", + " - Calls Claude's API with memory tool enabled\n", + " - Executes tool uses (memory operations)\n", + " - Continues until Claude stops using tools\n", + " - Returns the final response\n", "\n", + "- **`run_conversation_turn()`**: Single turn (used in Example 3)\n", + " - Same as above but returns after one API call\n", + " - Useful when you need fine-grained control\n", "\n", - "When would you use this?\n", + "- **`print_context_management_info()`**: Displays context clearing stats\n", + " - Shows tokens saved, tool uses cleared\n", + " - Helps visualize when context editing triggers" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "**⚠️ Note on Memory Clearing**\n", "\n", - "You want to quickly spin up a memory experiment or augment an existing long-context task. Start here if you don't have high conviction around the types of items that need to be stored or if the agent must support many interaction types.\n", + "The following cell clears all memory files to provide a clean slate for this demonstration. This is useful for running the notebook multiple times to see consistent results.\n", "\n", - "General Notes on Tool Use: \n", - "- Your tool descriptions should be clear and sufficiently detailed. The best way to guide model behavior around tools is by providing direction as to when / under what conditions tools should be used. \n", - "- If you find that a task requires the agent or workflow manage many (~20+) tools, you may find better performance by introducing a higher level delegation step to route the task to a specialized LLM-step designed around a smaller, logically coupled subset of tools." 
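Rather than wiping everything under the memory root, cleanup can be scoped to a single project's subtree. A minimal sketch using plain `pathlib` (the `clear_project_memory` helper, the directory layout, and `./toy_memories` are illustrative assumptions, not part of this notebook's `MemoryToolHandler` API):

```python
import shutil
from pathlib import Path

def clear_project_memory(base_path: str, project: str) -> bool:
    """Delete one project's memory subtree (e.g. memories/code_review) only.

    Hypothetical helper for illustration -- not part of MemoryToolHandler.
    """
    root = Path(base_path).resolve()
    target = (root / project).resolve()
    # Path-traversal guard: never delete anything outside the memory root
    if root not in target.parents:
        raise ValueError(f"{project!r} escapes the memory root")
    if target.is_dir():
        shutil.rmtree(target)
        return True
    return False

# Two independent "projects" under one memory root
base = "./toy_memories"
Path(base, "code_review").mkdir(parents=True, exist_ok=True)
Path(base, "research").mkdir(parents=True, exist_ok=True)

clear_project_memory(base, "code_review")            # wipe one project...
print(sorted(p.name for p in Path(base).iterdir()))  # ...others survive
```

This keeps learned patterns for other projects intact while still giving a given demo or task a clean slate.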
+ "**In production applications**, you should carefully consider whether to clear all memory, as it permanently removes learned patterns. Consider using selective deletion or organizing memory into project-specific directories instead." ] }, { "cell_type": "code", - "execution_count": 39, + "execution_count": 3, "metadata": {}, - "outputs": [], + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "🧹 Clearing previous memories...\n", + "✓ Memory cleared\n", + "\n", + "============================================================\n", + "📝 SESSION 1: Learning from a bug\n", + "============================================================\n", + "\n", + "🔄 Turn 1:\n", + "💬 Claude: I'll review this code to identify the race condition issues. Let me start by checking my memory and then analyzing the problem.\n", + "\n", + " 🔧 Memory tool: view /memories\n", + " ✓ Result: Directory: /memories\n", + "(empty)\n", + "\n", + "🔄 Turn 2:\n", + " 🔧 Memory tool: create /memories/review_progress.md\n", + " ✓ Result: File created successfully at /memories/review_progress.md\n", + "\n", + "🔄 Turn 3:\n", + "💬 Claude: \n", + "\n", + "## Code Review: Race Condition in Multi-threaded Web Scraper\n", + "\n", + "### 🐛 **Critical Issues Found**\n", + "\n", + "#### **1. 
Race Condition on Shared Lists (Lines 44-48)**\n", + "\n", + "**Problem:**\n", + "```python\n", + "self.results = [] # Shared across threads\n", + "self.failed_urls = [] # Shared across threads\n", + "\n", + "# In scrape_urls():\n", + "self.failed_urls.append(result[\"url\"]) # NOT THREAD-SAFE\n", + "self.results.append(result) # NOT THREAD-SAFE\n", + "```\n", + "\n", + "**Why This Fails:**\n", + "- Python's `list.append()` is **not atomic** for the overall operation\n", + "- Multiple threads can simultaneously read and modify the list's internal state\n", + "- This causes **lost updates** where some appends are overwritten\n", + "- Results: Inconsistent counts, missing data\n", + "\n", + "**Race Condition Scenario:**\n", + "1. Thread A reads `self.results` size: 5\n", + "2. Thread B reads `self.results` size: 5 \n", + "3. Thread A appends → size becomes 6\n", + "4. Thread B appends → overwrites position, still size 6 (one result lost!)\n", + "\n", + "---\n", + "\n", + "### ✅ **Solutions**\n", + "\n", + "#### **Option 1: Use Thread-Safe Queue (Recommended)**\n", + "```python\n", + "import queue\n", + "from concurrent.futures import ThreadPoolExecutor, as_completed\n", + "\n", + "class WebScraper:\n", + " def __init__(self, max_workers: int = 10):\n", + " self.max_workers = max_workers\n", + " # Use thread-safe queues instead of lists\n", + " self.results = queue.Queue()\n", + " self.failed_urls = queue.Queue()\n", + "\n", + " def scrape_urls(self, urls: List[str]) -> List[Dict[str, any]]:\n", + " with ThreadPoolExecutor(max_workers=self.max_workers) as executor:\n", + " futures = [executor.submit(self.fetch_url, url) for url in urls]\n", + "\n", + " for future in as_completed(futures):\n", + " result = future.result()\n", + " if \"error\" in result:\n", + " self.failed_urls.put(result[\"url\"]) # Thread-safe\n", + " else:\n", + " self.results.put(result) # Thread-safe\n", + "\n", + " # Convert queue to list for return\n", + " return list(self.results.queue)\n", + 
"\n", + " def get_stats(self) -> Dict[str, int]:\n", + " total = self.results.qsize()\n", + " failed = self.failed_urls.qsize()\n", + " return {\n", + " \"total_results\": total,\n", + " \"failed_urls\": failed,\n", + " \"success_rate\": total / (total + failed) if (total + failed) > 0 else 0,\n", + " }\n", + "```\n", + "\n", + "#### **Option 2: Use Threading Lock**\n", + "```python\n", + "import threading\n", + "\n", + "class WebScraper:\n", + " def __init__(self, max_workers: int = 10):\n", + " self.max_workers = max_workers\n", + " self.results = []\n", + " self.failed_urls = []\n", + " self.lock = threading.Lock() # Add lock for synchronization\n", + "\n", + " def scrape_urls(self, urls: List[str]) -> List[Dict[str, any]]:\n", + " with ThreadPoolExecutor(max_workers=self.max_workers) as executor:\n", + " futures = [executor.submit(self.fetch_url, url) for url in urls]\n", + "\n", + " for future in as_completed(futures):\n", + " result = future.result()\n", + " \n", + " # Protect shared state with lock\n", + " with self.lock:\n", + " if \"error\" in result:\n", + " self.failed_urls.append(result[\"url\"])\n", + " else:\n", + " self.results.append(result)\n", + "\n", + " return self.results\n", + "```\n", + "\n", + "#### **Option 3: Collect Results from Futures (Best for this use case)**\n", + "```python\n", + "class WebScraper:\n", + " def __init__(self, max_workers: int = 10):\n", + " self.max_workers = max_workers\n", + " # No shared state needed!\n", + "\n", + " def scrape_urls(self, urls: List[str]) -> List[Dict[str, any]]:\n", + " results = []\n", + " failed_urls = []\n", + " \n", + " with ThreadPoolExecutor(max_workers=self.max_workers) as executor:\n", + " futures = [executor.submit(self.fetch_url, url) for url in urls]\n", + "\n", + " # Collect results from futures - no race condition!\n", + " for future in as_completed(futures):\n", + " result = future.result()\n", + " if \"error\" in result:\n", + " failed_urls.append(result[\"url\"])\n", + " else:\n", 
+ " results.append(result)\n", + "\n", + " self.results = results # Store after collection complete\n", + " self.failed_urls = failed_urls\n", + " return results\n", + "```\n", + "\n", + "---\n", + "\n", + "### 📊 **Why Option 3 is Best Here**\n", + "\n", + "1. **No synchronization overhead** - Single thread collects results\n", + "2. **Simpler code** - No locks or queues needed\n", + "3. **Natural fit** - The main thread iterates `as_completed()` sequentially anyway\n", + "4. **No performance penalty** - Collection happens in calling thread, not worker threads\n", + "\n", + "---\n", + "\n", + "### 🔍 **Additional Issues**\n", + "\n", + "**Type hint error (Line 20):**\n", + "```python\n", + "def fetch_url(self, url: str) -> Dict[str, any]: # Should be 'Any' not 'any'\n", + "```\n", + "Should be:\n", + "```python\n", + "from typing import Any\n", + "def fetch_url(self, url: str) -> Dict[str, Any]:\n", + "```\n", + "\n", + "---\n", + "\n", + "### 📝 **Summary**\n", + "\n", + "**Root Cause:** Unsynchronized concurrent access to `self.results` and `self.failed_urls` lists\n", + "\n", + "**Impact:** Lost updates, inconsistent result counts\n", + "\n", + "**Fix:** Use Option 3 (local variables during collection) - simplest and most efficient\n", + "\n", + " 🔧 Memory tool: str_replace /memories/review_progress.md\n", + " ✓ Result: File /memories/review_progress.md has been edited successfully\n", + "\n", + "🔄 Turn 4:\n", + "\n", + "============================================================\n", + "✅ Session 1 complete!\n", + "============================================================\n" + ] + } + ], "source": [ - "# SIMPLE MEMORY TOOL\n", - "from agents.tools.base import Tool\n", - "\n", - "class SimpleMemory(Tool):\n", - " \"\"\"String-based memory tool for storing and modifying persistent text.\n", - "\n", - " This tool maintains a single in-memory string that can be read,\n", - " replaced, or selectively edited using string replacement. 
It provides safety\n", - " warnings when overwriting content or when edit operations would affect\n", - " multiple occurrences.\n", - " \"\"\"\n", - "\n", - " name = \"simple_memory\"\n", - "\n", - " #TODO: Provide additional domain context to guide Claude on the types of items that should be stored\n", - " description = \"\"\"Tool for managing persistent text memory with read, write and edit operations.\n", - " Read: Retrieves full memory contents as a string\n", - " Write: Replaces entire memory (warns when overwriting existing data)\n", - " Edit: Performs targeted string replacement (warns on multiple matches)\"\"\"\n", - "\n", - " # single tool that exposes 3 distinct abilities\n", - " input_schema = {\n", - " \"type\": \"object\",\n", - " \"properties\": {\n", - " \"action\": {\n", - " \"type\": \"string\",\n", - " \"enum\": [\"read\", \"write\", \"edit\"],\n", - " \"description\": \"The memory operation to perform: read retrieves current content, write replaces everything, edit performs string replacement\",\n", - " },\n", - " \"content\": {\n", - " \"type\": \"string\",\n", - " \"description\": \"Full text content to store when using write action (ignored for read/edit)\",\n", - " },\n", - " \"old_string\": {\n", - " \"type\": \"string\",\n", - " \"description\": \"Exact text to find and replace when using edit action (must be unique in memory)\",\n", - " },\n", - " \"new_string\": {\n", - " \"type\": \"string\",\n", - " \"description\": \"Replacement text to insert when using edit action\",\n", - " },\n", - " },\n", - " \"required\": [\"action\"],\n", + "# Import helper functions\n", + "from memory_demo.demo_helpers import run_conversation_loop, run_conversation_turn, print_context_management_info\n", + "from memory_tool import MemoryToolHandler\n", + "\n", + "# Initialize\n", + "client = Anthropic(api_key=API_KEY)\n", + "memory = MemoryToolHandler(base_path=\"./demo_memory\")\n", + "\n", + "# Clear any existing memories to start fresh\n", + "print(\"🧹 
Clearing previous memories...\")\n", + "memory.clear_all_memory()\n", + "print(\"✓ Memory cleared\\n\")\n", + "\n", + "# Load example code with a race condition bug\n", + "with open(\"memory_demo/sample_code/web_scraper_v1.py\", \"r\") as f:\n", + " code_to_review = f.read()\n", + "\n", + "messages = [\n", + " {\n", + " \"role\": \"user\",\n", + " \"content\": f\"I'm reviewing a multi-threaded web scraper that sometimes returns fewer results than expected. The count is inconsistent across runs. Can you find the issue?\\n\\n```python\\n{code_to_review}\\n```\"\n", " }\n", + "]\n", "\n", - " def __init__(self):\n", - " self.full_memory = \"\"\n", - " self.compressed_memory = \"\" # not doing anything with this for now\n", - " \n", - " async def execute(self, **kwargs) -> str:\n", - " \"\"\"Execute the memory tool with provided parameters.\"\"\"\n", - " action = kwargs.get(\"action\")\n", - " content = kwargs.get(\"content\", \"\")\n", - " old_string = kwargs.get(\"old_string\", \"\")\n", - " new_string = kwargs.get(\"new_string\", \"\")\n", - "\n", - " if action == \"read\":\n", - " return self._read_memory()\n", - " elif action == \"write\":\n", - " print(\"Writing to memory...\")\n", - " return self._write_memory(content)\n", - " elif action == \"edit\":\n", - " return self._edit_memory(old_string, new_string)\n", - " else:\n", - " return f\"Error: Unknown action '{action}'. Valid actions are read, write, edit.\"\n", - "\n", - " def _read_memory(self) -> str:\n", - " \"\"\"Read the current memory contents.\"\"\"\n", - " return self.full_memory\n", - "\n", - " def _write_memory(self, content: str) -> str:\n", - " \"\"\"Replace the entire memory with new content.\"\"\"\n", - " if self.full_memory:\n", - " previous = self.full_memory\n", - " self.full_memory = content\n", - " return f\"Warning: Overwriting existing content. 
Previous content was:\\n{previous}\\n\\nMemory has been updated successfully.\"\n", - " self.full_memory = content\n", - " return \"Memory updated successfully.\"\n", - "\n", - " def _edit_memory(self, old_string: str, new_string: str) -> str:\n", - " \"\"\"Replace occurrences of old string with new string.\"\"\"\n", - " if old_string not in self.full_memory:\n", - " return f\"Error: '{old_string}' not found in memory.\"\n", - "\n", - " old_memory = self.full_memory\n", - " count = old_memory.count(old_string)\n", - "\n", - " if count > 1:\n", - " return f\"Warning: Found {count} occurrences of '{old_string}'. Please confirm which occurrence to replace or use more specific context.\"\n", - "\n", - " self.full_memory = self.full_memory.replace(old_string, new_string)\n", - " return f\"Edited memory: 1 occurrence replaced.\"\n", - "\n", - " def __str__(self) -> str:\n", - " return self.full_memory" + "print(\"=\" * 60)\n", + "print(\"📝 SESSION 1: Learning from a bug\")\n", + "print(\"=\" * 60)\n", + "\n", + "# Run conversation loop\n", + "response = run_conversation_loop(\n", + " client=client,\n", + " model=MODEL,\n", + " messages=messages,\n", + " memory_handler=memory,\n", + " system=\"You are a code reviewer.\",\n", + " max_tokens=2048,\n", + " max_turns=5,\n", + " verbose=True\n", + ")\n", + "\n", + "print(\"\\n\" + \"=\" * 60)\n", + "print(\"✅ Session 1 complete!\")\n", + "print(\"=\" * 60)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "### Implementation 2: Compactify Memory \n", - "\n", - "Maintaining a rolling summary over long interactions is a pattern you might have already built into your application. Generally the implementation looks something like:\n", - "\n", + "**What happened?**\n", "\n", - "1) Set a `token_threshold`. 
This threshold could be the context window for the model, but generally you would set it lower.\n", - "2) Track the current token usage: `system_prompt` + `rolling_summary` (up to step_n) + `message_history[]` (since step_n)\n", - "3) When token usage exceeds threshold, summarize using current `rolling_summary` + `message_history[]`. Clear `message_history[]` and reset `rolling_summary`. \n", + "1. Claude checked its memory (empty on first run)\n", + "2. Identified the bug: **race condition** - multiple threads modifying shared state (`self.results` and `self.failed_urls`) without synchronization\n", + "3. Stored the concurrency pattern in memory for future reference\n", "\n", - "We believe the pattern outlined above works well. The modification we're introducing with this tool is allowing the model to invoke the summarization operation at its own discretion. You might decide to combine these ideas, allowing the model to determine when to summarize but preserve the `token_threshold` + force summarization as a fail safe in tetheh case that Claude doesn't decide to compactify memory in time. \n", - "\n", - "When would you use this?\n", + "Now let's see the magic - Claude applying this learned pattern in a **new conversation**:" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Example 2: Cross-Conversation Learning\n", "\n", - "Similar to the first implementation, test this tool when you don't have a clear idea of what should be saved. Behaviorally speaking, decision making around when to condense a long running conversation can be more reliably tuned compared to the open-endedness of the first memory tool." + "Start a completely new conversation - memory persists!" 
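What makes this work: the second session's `messages` list starts empty, but the memory handler points at the same directory on disk, so anything written in the first session is still readable. A stripped-down illustration with plain files and no API calls (`FileMemory` and the `./toy_session_memory` path are hypothetical stand-ins for the notebook's `MemoryToolHandler`):

```python
import json
from pathlib import Path

class FileMemory:
    """Toy stand-in for a /memories-style store: persistence lives on disk,
    not in the conversation history. Illustrative only."""

    def __init__(self, base: str):
        self.base = Path(base)
        self.base.mkdir(parents=True, exist_ok=True)

    def write(self, name: str, data: dict) -> None:
        (self.base / name).write_text(json.dumps(data))

    def read(self, name: str) -> dict:
        return json.loads((self.base / name).read_text())

# "Session 1": fresh conversation; the insight is written to disk
session1_messages = []  # conversation state -- discarded when the session ends
FileMemory("./toy_session_memory").write(
    "patterns.json", {"race_condition": "collect results from futures"}
)
del session1_messages   # the conversation is gone...

# "Session 2": brand-new message list, same directory on disk
session2_messages = []  # no history carried over
recalled = FileMemory("./toy_session_memory").read("patterns.json")
print(recalled["race_condition"])  # ...but the note persists
```

The real handler applies the same principle, with Claude issuing `view`, `create`, and `str_replace` commands against that directory instead of reading the files directly.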
] }, { "cell_type": "code", - "execution_count": 40, + "execution_count": 4, "metadata": {}, - "outputs": [], + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "============================================================\n", + "🚀 SESSION 2: Applying learned pattern\n", + "============================================================\n", + "\n", + "🔄 Turn 1:\n", + " 🔧 Memory tool: view /memories\n", + " ✓ Result: Directory: /memories\n", + "- review_progress.md\n", + "\n", + "🔄 Turn 2:\n", + " 🔧 Memory tool: view /memories/review_progress.md\n", + " ✓ Result: 1: # Code Review: Multi-threaded Web Scraper\n", + " 2: \n", + " 3: ## Issue Type\n", + " 4:...\n", + "\n", + "🔄 Turn 3:\n", + " 🔧 Memory tool: str_replace /memories/review_progress.md\n", + " ✓ Result: File /memories/review_progress.md has been edited successfully\n", + "\n", + "🔄 Turn 4:\n", + "💬 Claude: \n", + "\n", + "Now let me review this async API client code:\n", + "\n", + "## Code Review: Async API Client\n", + "\n", + "### ✅ **Correct Assessment of Issues**\n", + "\n", + "The comments in the code correctly identify the problems! Let me elaborate:\n", + "\n", + "---\n", + "\n", + "### 🔴 **Critical Issues**\n", + "\n", + "#### **1. Race Condition on Shared State (Lines 13-14, 44-48)**\n", + "\n", + "**Problem:**\n", + "```python\n", + "self.responses = [] # Shared across coroutines\n", + "self.error_count = 0 # Race condition on increment\n", + "```\n", + "\n", + "While Python's GIL prevents *some* threading issues, **asyncio doesn't have the same protection**. 
When you `await`, other coroutines can run, leading to interleaving:\n", + "\n", + "```python\n", + "# What could happen:\n", + "# Coroutine A: reads self.error_count (0)\n", + "# Coroutine B: reads self.error_count (0)\n", + "# Coroutine A: increments and writes (1)\n", + "# Coroutine B: increments and writes (1) ← Should be 2!\n", + "```\n", + "\n", + "Similarly, `self.responses.append()` can be interrupted mid-operation.\n", + "\n", + "#### **2. Incorrect Pattern (Lines 41-48)**\n", + "\n", + "The `as_completed` loop is inefficient and still buggy:\n", + "```python\n", + "for coro in asyncio.as_completed(tasks):\n", + " result = await coro\n", + " # Modifying shared state...\n", + "```\n", + "\n", + "---\n", + "\n", + "### 🟡 **Minor Issues**\n", + "\n", + "#### **3. Type Hint Error (Lines 25, 34)**\n", + "```python\n", + "Dict[str, any] # ❌ Wrong: 'any' is not defined\n", + "```\n", + "Should be:\n", + "```python\n", + "Dict[str, Any] # ✅ Correct (import from typing)\n", + "```\n", + "\n", + "#### **4. Missing Error Handling**\n", + "Errors are stored in results but never counted properly due to the race condition.\n", + "\n", + "#### **5. 
Reusability Issue**\n", + "Calling `fetch_all()` multiple times will accumulate results incorrectly.\n", + "\n", + "---\n", + "\n", + "### ✅ **Recommended Fixes**\n", + "\n", + "#### **Option 1: Use Local Variables (Simplest)**\n", + "\n", + "```python\n", + "async def fetch_all(self, endpoints: List[str]) -> List[Dict[str, Any]]:\n", + " \"\"\"Fetch multiple endpoints concurrently.\"\"\"\n", + " async with aiohttp.ClientSession() as session:\n", + " tasks = [self.fetch_endpoint(session, endpoint) for endpoint in endpoints]\n", + " results = await asyncio.gather(*tasks) # Collect all results\n", + " \n", + " # Now safely update instance variables\n", + " self.responses = [r for r in results if \"error\" not in r]\n", + " self.error_count = sum(1 for r in results if \"error\" in r)\n", + " \n", + " return results # Return ALL results (success + errors)\n", + "```\n", + "\n", + "**Advantages:**\n", + "- Simple and clean\n", + "- No race conditions\n", + "- Uses `asyncio.gather()` which is more efficient\n", + "\n", + "#### **Option 2: Use asyncio.Lock (If Shared State is Required)**\n", + "\n", + "```python\n", + "class AsyncAPIClient:\n", + " def __init__(self, base_url: str):\n", + " self.base_url = base_url\n", + " self.responses = []\n", + " self.error_count = 0\n", + " self._lock = asyncio.Lock() # Add lock\n", + " \n", + " async def fetch_all(self, endpoints: List[str]) -> List[Dict[str, Any]]:\n", + " async with aiohttp.ClientSession() as session:\n", + " tasks = [self.fetch_endpoint(session, endpoint) for endpoint in endpoints]\n", + " \n", + " for coro in asyncio.as_completed(tasks):\n", + " result = await coro\n", + " \n", + " async with self._lock: # Protect shared state\n", + " if \"error\" in result:\n", + " self.error_count += 1\n", + " else:\n", + " self.responses.append(result)\n", + " \n", + " return self.responses\n", + "```\n", + "\n", + "**Advantages:**\n", + "- Explicitly safe\n", + "- Good if multiple methods need to update shared state\n", + 
"\n", + "---\n", + "\n", + "### 📊 **Comparison: Async vs Threading Races**\n", + "\n", + "| Aspect | Threading | Asyncio |\n", + "|--------|-----------|---------|\n", + "| **Protection** | GIL provides *some* protection | No GIL - explicit coordination needed |\n", + "| **Interleaving** | Can happen anytime | Only happens at `await` points |\n", + "| **Solution** | `threading.Lock()` | `asyncio.Lock()` or avoid shared state |\n", + "\n", + "---\n", + "\n", + "### 🎯 **Final Recommendations**\n", + "\n", + "1. **Use Option 1** (local variables + `asyncio.gather()`) - it's cleaner and faster\n", + "2. Fix the type hints: `any` → `Any`\n", + "3. Consider returning ALL results (including errors) for better observability\n", + "4. Reset `self.responses` and `self.error_count` at the start of `fetch_all()` if you want to reuse the client\n", + "\n", + " 🔧 Memory tool: str_replace /memories/review_progress.md\n", + " ✓ Result: File /memories/review_progress.md has been edited successfully\n", + "\n", + "🔄 Turn 5:\n", + "💬 Claude: \n", + "\n", + "---\n", + "\n", + "### 📝 **Summary**\n", + "\n", + "This code demonstrates a **common misconception**: that asyncio is automatically \"safe\" because it's not multi-threaded. While the GIL prevents *some* threading issues, asyncio coroutines can still interleave at `await` points, causing race conditions on shared mutable state.\n", + "\n", + "**Bottom line:** Use `asyncio.gather()` and local variables instead of modifying instance variables from multiple coroutines. It's simpler, safer, and faster! 
🚀\n", + "\n", + "\n", + "============================================================\n", + "✅ Session 2 complete!\n", + "============================================================\n" + ] + } + ], "source": [ - "# COMPACTIFY MEMORY TOOL\n", - "from agents.utils.history_util import MessageHistory\n", - "\n", - "class CompactifyMemory(Tool):\n", - " \"\"\"Memory summarization tool.\n", - " \n", - " Summarizes and replaces the existing message history.\n", - " Expects to have access to a message_history object that is shared with the request handler.\n", - " Descriptions should be modified to introduce use-case specific guidance.\n", - " \"\"\"\n", - " \n", - " name = \"compactify_memory\"\n", - " description = \"\"\"The memory compactifier tool will compress the current conversation history (replaces message history entirely). \n", - " Should be used when there is sufficient information that requires summarization.\n", - " The summary should keep relevant information from any previous summaries.\n", - " \"\"\"\n", - "\n", - " input_schema = {\n", - " \"type\": \"object\",\n", - " \"properties\": {},\n", - " \"required\": []\n", + "# NEW conversation (empty messages)\n", + "# Load API client code with similar concurrency issue\n", + "with open(\"memory_demo/sample_code/api_client_v1.py\", \"r\") as f:\n", + " code_to_review = f.read()\n", + "\n", + "messages = [\n", + " {\n", + " \"role\": \"user\",\n", + " \"content\": f\"Review this API client code:\\n\\n```python\\n{code_to_review}\\n```\"\n", " }\n", - " \n", - " def __init__(self, client: Anthropic):\n", - " self.client = client\n", - " self.full_memory = ''\n", - " self.compressed_memory = '' # not doing anything with this for now\n", - "\n", - " def run_compactify (self, message_history: MessageHistory):\n", - " summary = self.client.messages.create(\n", - " model=\"claude-sonnet-4-20250514\",\n", - " max_tokens = 10000, # modify as needed\n", - " messages=[*message_history.messages, {\n", - " \"role\": 
\"user\",\n", - " \"content\": \"\"\"Your task is to summarize the conversation using the previous summary as well as the messages since the last summary. Note that this will replace the previous summary entirely, so be sure to include the most relevant information that should be persisted.\"\"\"\n", - " }]\n", - " )\n", - "\n", - " # modify the message history object in place\n", - " message_history.messages = [\n", - " {\n", - " \"role\": \"assistant\",\n", - " \"content\": \"Conversation Summary: \" + summary.content[0].text\n", - " }\n", - " ]\n", - " \n", - " async def execute(self, **kwargs) -> str:\n", - " # ATTN: note that we're breaking tool encapsulation here and will be executing the function outside the agent loop (see agents.agent.py)\n", - " # we do this because we don't have an elegant way to share message state between the agent and tool just yet (...stay tuned)\n", - " return \"pending_compactify\"\n", - " \n", - " def __str__(self):\n", - " return self.full_memory\n", - " " + "]\n", + "\n", + "print(\"=\" * 60)\n", + "print(\"🚀 SESSION 2: Applying learned pattern\")\n", + "print(\"=\" * 60)\n", + "\n", + "# Run conversation loop\n", + "response = run_conversation_loop(\n", + " client=client,\n", + " model=MODEL,\n", + " messages=messages,\n", + " memory_handler=memory,\n", + " system=\"You are a code reviewer.\",\n", + " max_tokens=2048,\n", + " max_turns=5,\n", + " verbose=True\n", + ")\n", + "\n", + "print(\"\\n\" + \"=\" * 60)\n", + "print(\"✅ Session 2 complete!\")\n", + "print(\"=\" * 60)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "### Implementation 3: \"File-Based\" Memory\n", - "\n", - "This implementation gives Claude the ability to interact with a 'memory' system represented to the model as a hierarchical file structure. 
The example below implements a basic directory, where the 'files' are just strings that we've labeled as plaintext files (the '.txt' label has no impact functionally, but can be useful for behavioral consistency).\n", + "**Notice the difference:**\n", "\n", - "Hierarchical directory structures are easily readable and well-understood by humans and LLMs alike, so it's fitting to use them as a mechanism to represent persistent state more generally to an LLM. While you can connect to and define access patterns for any external storage system, a quick way to get started is with Anthropic's new [Files API](https://docs.claude.com/en/docs/build-with-claude/files). The Files API enables storage and retrieval of objects for use in future requests.\n", + "- Claude **immediately checked memory** and found the thread-safety/concurrency pattern\n", + "- Recognized the similar issue in async code **instantly** without re-learning\n", + "- Response was **faster** because it applied stored knowledge about shared mutable state\n", "\n", - "Ideally you (the developer & domain expert) would construct an initial state for the directory structure that adequately represents your domain context. Having some pre-defined structure provides useful behavioral queues for the model, but you should also introduce more explicit guidance to guard against excessive reads / writes / new file creation / etc." + "This is **cross-conversation learning** in action!" 
] }, { - "cell_type": "code", - "execution_count": 41, + "cell_type": "markdown", "metadata": {}, - "outputs": [], "source": [ - "import json\n", - "import re\n", - "\n", - "# HELPER FUNCTION: Parse markdown string for JSON\n", - "def parse_markdown_json(markdown_string):\n", - " \"\"\"\n", - " Parses a JSON string from a Markdown string.\n", - "\n", - " Args:\n", - " markdown_string (str): The Markdown string containing JSON.\n", - "\n", - " Returns:\n", - " dict or list or None: A Python object representing the parsed JSON, or None if parsing fails.\n", - " \"\"\"\n", - " match = re.search(r\"```(?:json)?\\n(.*?)\\n```\", markdown_string, re.DOTALL)\n", - " if match:\n", - " json_string = match.group(1).strip()\n", - " else:\n", - " json_string = markdown_string.strip()\n", - " try:\n", - " parsed_json = json.loads(json_string)\n", - " return parsed_json\n", - " except json.JSONDecodeError:\n", - " return None\n", - "\n", - "# HELPER CLASS: Memory Node\n", - "class MemoryNode:\n", - " def __init__(self, name, is_directory=False, parent=None, content=None):\n", - " self.name = name\n", - " self.is_directory = is_directory\n", - " self.parent = parent\n", - " self.content = content if not is_directory else None\n", - " self.children = {} if is_directory else None\n", - " \n", - " def add_child(self, name, is_directory=False, content=None):\n", - " \"\"\"Add a child node to the current node.\"\"\"\n", - " if not self.is_directory:\n", - " raise ValueError(f\"Cannot add child to file '{self.name}'\")\n", - " \n", - " if name in self.children:\n", - " raise ValueError(f\"Child '{name}' already exists\")\n", - " \n", - " child = MemoryNode(name, is_directory, parent=self, content=content)\n", - " self.children[name] = child\n", - " return child\n", - " \n", - " def remove_child(self, name):\n", - " \"\"\"Remove a child node from the current node.\"\"\"\n", - " if not self.is_directory:\n", - " raise ValueError(f\"Cannot remove child from file '{self.name}'\")\n", - " 
\n", - " if name not in self.children:\n", - " raise ValueError(f\"Child '{name}' not found\")\n", - " \n", - " del self.children[name]\n", - " \n", - " def find(self, path):\n", - " \"\"\"Find a node by path (ex: 'folder1/folder2/file.txt').\"\"\"\n", - " if not path:\n", - " return self\n", - " \n", - " parts = path.strip('/').split('/', 1)\n", - " child_name = parts[0]\n", - " \n", - " if not self.is_directory or child_name not in self.children:\n", - " return None\n", - " \n", - " child = self.children[child_name]\n", - " \n", - " if len(parts) == 1:\n", - " return child\n", - " else:\n", - " return child.find(parts[1])\n", - " \n", - " def __repr__(self):\n", - " return f\"MemoryNode(name='{self.name}', is_directory={self.is_directory})\"\n", + "### Example 3: Context Clearing While Preserving Memory\n", "\n", - "# HELPER CLASS: Memory Tree\n", - "class MemoryTree:\n", - " def __init__(self):\n", - " self.root = MemoryNode(\"memory\", is_directory=True)\n", - "\n", - " def add(self, path, content):\n", - " \"\"\"Add content to a node at the given path (ex: 'folder1/folder2/file.txt').\"\"\"\n", - " node = self.root.find(path)\n", - " if node:\n", - " node.content = content\n", - " else:\n", - " raise ValueError(f\"Path '{path}' not found\")\n", - "\n", - " def get(self, path):\n", - " \"\"\"Get content from a node at the given path.\"\"\"\n", - " node = self.root.find(path)\n", - " if node:\n", - " return node.content\n", - " else:\n", - " raise ValueError(f\"Path '{path}' not found\")\n", - "\n", - " def edit(self, path, content):\n", - " node = self.root.find(path)\n", - " if node:\n", - " node.content = content\n", - " else:\n", - " raise ValueError(f\"Path '{path}' not found\")\n", - "\n", - " def _build_from_json_recursive(self, json_obj, parent_node):\n", - " \"\"\"Recursively build the tree from a JSON object.\"\"\"\n", - "\n", - " # handle root memory (already initialized)\n", - " if len(json_obj) == 1 and 'memory' in json_obj:\n", - " json_obj = 
json_obj['memory']\n", - "\n", - " for name, value in json_obj.items():\n", - " if isinstance(value, dict):\n", - " # Create a directory node\n", - " child_node = parent_node.add_child(name, is_directory=True)\n", - " self._build_from_json_recursive(value, child_node)\n", - " else:\n", - " # Create a file node with content\n", - " parent_node.add_child(name, content=value)\n", + "What happens during a **long review session** with many code files?\n", "\n", - " def build_from_json_string(self, str_json_obj):\n", - " json_obj = parse_markdown_json(str_json_obj)\n", - " self._build_from_json_recursive(json_obj, self.root)\n", - "\n", - " def print_tree(self, node=None, prefix=''):\n", - " \"\"\"Print a directory tree structure.\"\"\"\n", - " if node is None:\n", - " node = self.root\n", - " \n", - " # Build list of children for proper indexing\n", - " children = list(node.children.items()) if node.is_directory else []\n", - " \n", - " for index, (name, child) in enumerate(children):\n", - " is_last = index == len(children) - 1\n", - " \n", - " # Create the appropriate connector\n", - " if prefix == '' and node == self.root:\n", - " # For root level items (direct children of root)\n", - " connector = '└── ' if is_last else '├── '\n", - " self.lines.append(f\"{connector}{name}\")\n", - " \n", - " # Recurse if this is a directory\n", - " if child.is_directory:\n", - " extension = ' ' if is_last else '│ '\n", - " self.print_tree(child, extension)\n", - " else:\n", - " # For non-root level items\n", - " connector = '└── ' if is_last else '├── '\n", - " self.lines.append(f\"{prefix}{connector}{name}\")\n", - " \n", - " # Recurse if this is a directory\n", - " if child.is_directory:\n", - " extension = ' ' if is_last else '│ '\n", - " self.print_tree(child, prefix + extension)\n", - "\n", - " def get_tree(self):\n", - " \"\"\"Return the tree as a string.\"\"\"\n", - " self.lines = []\n", - " \n", - " # Start with the root directory name\n", - " 
self.lines.append(self.root.name)\n", + "- Context fills up with tool results from previous reviews\n", + "- But memory (learned patterns) must persist!\n", "\n", - " # Print the rest of the tree\n", - " self.print_tree()\n", - " return '\\n'.join(self.lines)\n", - "\n", - " def __str__(self):\n", - " return self.get_tree()\n", - "\n", - " def __repr__(self):\n", - " return str(self)" + "Let's trigger **context editing** to see how Claude manages this automatically." ] }, { "cell_type": "code", - "execution_count": 43, + "execution_count": null, "metadata": {}, "outputs": [ { - "data": { - "text/plain": [ - "[{'type': 'file',\n", - " 'id': 'file_011CPaGpXxdBojQLTszA5LGp',\n", - " 'size_bytes': 544347,\n", - " 'created_at': '2025-05-28T16:51:06.716000Z',\n", - " 'filename': 'sample.pdf',\n", - " 'mime_type': 'application/pdf',\n", - " 'downloadable': False},\n", - " {'type': 'file',\n", - " 'id': 'file_011CPYNG2Sf1cWjuCFhKJFV7',\n", - " 'size_bytes': 3,\n", - " 'created_at': '2025-05-27T16:41:15.335000Z',\n", - " 'filename': 'number.txt',\n", - " 'mime_type': 'text/plain',\n", - " 'downloadable': True}]" - ] - }, - "execution_count": 43, - "metadata": {}, - "output_type": "execute_result" + "name": "stdout", + "output_type": "stream", + "text": [ + "============================================================\n", + "📚 SESSION 3: Long review session with context clearing\n", + "============================================================\n", + "\n", + "📝 Review 1: Data processor\n", + " 🔧 Memory tool: str_replace /memories/review_progress.md\n", + " ✓ Result: File /memories/review_progress.md has been edited successfully\n", + " 📊 Input tokens: 5,977\n", + " ℹ️ Context below threshold - no clearing triggered\n", + "\n", + "📝 Review 2: SQL query builder\n", + " 🔧 Memory tool: str_replace /memories/review_progress.md\n", + " ✓ Result: File /memories/review_progress.md has been edited successfully\n", + " 📊 Input tokens: 7,359\n", + " ℹ️ Context below threshold - no 
clearing triggered\n", + "\n", + "============================================================\n", + "✅ Session 3 complete!\n", + "============================================================\n" + ] } ], "source": [ - "import requests\n", - "import mimetypes\n", - "\n", - "# HELPER CLASS FOR FILE STORAGE using the new files API!\n", - "class StorageManager:\n", - " def __init__(self, api_key):\n", - " if api_key is None:\n", - " raise ValueError(\"ANTHROPIC_API_KEY not available.\")\n", - " self.api_key = api_key\n", - " self.base_url = \"https://api.anthropic.com/v1/files\"\n", - " self.headers = {\n", - " \"x-api-key\": self.api_key,\n", - " \"anthropic-version\": \"2023-06-01\",\n", - " \"anthropic-beta\": \"files-api-2025-04-14\"\n", + "# Configure context management to clear aggressively for demo\n", + "CONTEXT_MANAGEMENT = {\n", + " \"edits\": [\n", + " {\n", + " \"type\": \"clear_tool_uses_20250919\",\n", + " \"trigger\": {\"type\": \"input_tokens\", \"value\": 5000}, # Lower threshold to trigger clearing sooner\n", + " \"keep\": {\"type\": \"tool_uses\", \"value\": 2}, # Keep only the last 2 tool uses\n", + " \"clear_at_least\": {\"type\": \"input_tokens\", \"value\": 3000}\n", " }\n", + " ]\n", + "}\n", "\n", - " def _execute_request(self, method, endpoint, data=None, files=None):\n", - " \"\"\"Execute a request to the API.\"\"\"\n", - " url = f\"{self.base_url}/{endpoint}\"\n", - "\n", - " res = requests.request(method, url, headers=self.headers, data=data, files=files)\n", - " if res.status_code == 200:\n", - " return res.json()\n", - " else:\n", - " raise ValueError(f\"Request failed: {res.status_code} - {res.text}\")\n", - "\n", - " def list_files(self):\n", - " \"\"\"List all files. 
Direct curl request to the API.\"\"\"\n", - " res = requests.get(\n", - " self.base_url,\n", - " headers=self.headers\n", - " )\n", - " if res.status_code != 200:\n", - " raise ValueError(f\"Failed to retrieve files: {res.status_code} - {res.text}\")\n", - " res = res.json()\n", - " return res['data']\n", - " \n", - " \n", - " def get_file_metadata(self, file_id):\n", - " \"\"\"Get a file by ID. Direct curl request to the API.\"\"\"\n", - " res = requests.get(\n", - " f\"{self.base_url}/{file_id}\",\n", - " headers=self.headers\n", - " )\n", - " if res.status_code != 200:\n", - " raise ValueError(f\"Failed to retrieve file: {res.status_code} - {res.text}\")\n", - " res = res.json()\n", - " return res \n", - " \n", - " def upload_file(self, file_path):\n", - " \"\"\"Upload a file to the API.\"\"\" \n", - " # Determine the file's MIME type\n", - " mime_type, _ = mimetypes.guess_type(file_path)\n", - " if mime_type is None:\n", - " mime_type = \"application/octet-stream\" # Fallback to binary if type unknown\n", - " \n", - " with open(file_path, \"rb\") as file_obj:\n", - " files = {\n", - " \"file\": (os.path.basename(file_path), file_obj, mime_type)\n", - " }\n", - " \n", - " res = requests.post(\n", - " self.base_url,\n", - " headers=self.headers,\n", - " files=files\n", - " )\n", - " \n", - " if res.status_code == 200:\n", - " return res.json()\n", - " else:\n", - " raise ValueError(f\"Failed to upload file: {res.status_code} - {res.text}\")\n", - " \n", - "# example usage\n", - "#file_path = \"/Users/user/Downloads/SB1029-ProjectUpdate-FINAL_020317-A11Y.pdf\" # REPLACE\n", - "storage_manager = StorageManager(os.getenv(\"ANTHROPIC_API_KEY\"))\n", - "#uploaded = storage_manager.upload_file(file_path)\n", - "#storage_manager.get_file_metadata(uploaded['id'])\n", - "storage_manager.list_files()[:2]" + "# Continue from previous session - memory persists!\n", + "# Add multiple code reviews to build up context\n", + "\n", + "print(\"=\" * 60)\n", + "print(\"📚 SESSION 3: 
Long review session with context clearing\")\n", + "print(\"=\" * 60)\n", + "print()\n", + "\n", + "# Review 1: Data processor (larger file)\n", + "with open(\"memory_demo/sample_code/data_processor_v1.py\", \"r\") as f:\n", + " data_processor_code = f.read()\n", + "\n", + "messages.extend([\n", + " {\n", + " \"role\": \"user\",\n", + " \"content\": f\"Review this data processor:\\n\\n```python\\n{data_processor_code}\\n```\"\n", + " }\n", + "])\n", + "\n", + "print(\"📝 Review 1: Data processor\")\n", + "response = run_conversation_turn(\n", + " client=client,\n", + " model=MODEL,\n", + " messages=messages,\n", + " memory_handler=memory,\n", + " system=\"You are a code reviewer.\",\n", + " context_management=CONTEXT_MANAGEMENT,\n", + " max_tokens=2048,\n", + " verbose=True\n", + ")\n", + "\n", + "# Add response to messages\n", + "messages.append({\"role\": \"assistant\", \"content\": response[1]})\n", + "if response[2]:\n", + " messages.append({\"role\": \"user\", \"content\": response[2]})\n", + "\n", + "print(f\" 📊 Input tokens: {response[0].usage.input_tokens:,}\")\n", + "context_cleared, saved = print_context_management_info(response[0])\n", + "print()\n", + "\n", + "# Review 2: Add SQL code\n", + "with open(\"memory_demo/sample_code/sql_query_builder.py\", \"r\") as f:\n", + " sql_code = f.read()\n", + "\n", + "messages.extend([\n", + " {\n", + " \"role\": \"user\",\n", + " \"content\": f\"Review this SQL query builder:\\n\\n```python\\n{sql_code}\\n```\"\n", + " }\n", + "])\n", + "\n", + "print(\"📝 Review 2: SQL query builder\")\n", + "response = run_conversation_turn(\n", + " client=client,\n", + " model=MODEL,\n", + " messages=messages,\n", + " memory_handler=memory,\n", + " system=\"You are a code reviewer.\",\n", + " context_management=CONTEXT_MANAGEMENT,\n", + " max_tokens=2048,\n", + " verbose=True\n", + ")\n", + "\n", + "messages.append({\"role\": \"assistant\", \"content\": response[1]})\n", + "if response[2]:\n", + " messages.append({\"role\": 
\"user\", \"content\": response[2]})\n", + "\n", + "print(f\" 📊 Input tokens: {response[0].usage.input_tokens:,}\")\n", + "context_cleared, saved = print_context_management_info(response[0])\n", + "print()\n", + "\n", + "print(\"=\" * 60)\n", + "print(\"✅ Session 3 complete!\")\n", + "print(\"=\" * 60)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "#### What does this look like in practice?\n", + "**What just happened?**\n", "\n", - "Imagine you want to build a company wide chatbot that needs access to information about ongoing projects, teams, customers, etc. You could build a retrieval pipeline that chunks, loads, and refreshes your company documents within a vector database, but tuning this pipeline is a non-trivial task. The neat part about building file-based memory scaffolding for this problem is you can treat files managed by your organization in the exact same manner as files managed by the LLM (just with different read/write permissions).\n", + "As context grew during multiple reviews:\n", + "1. **Context clearing triggered automatically** when input tokens exceeded the threshold\n", + "2. **Old tool results were removed** (data processor review details)\n", + "3. **Memory files remained intact** - Claude can still query learned patterns\n", + "4. 
**Token usage decreased** - saved thousands of tokens while preserving knowledge\n", "\n", - "Imagine the agent has access the following directory at every turn and can read and update these objects at its discretion.\n", + "This demonstrates the key benefit:\n", + "- **Short-term memory** (conversation context) → Cleared to save space\n", + "- **Long-term memory** (stored patterns) → Persists across sessions\n", "\n", - "```\n", - "claude_memories/\n", - "├── user_session_notes/\n", - "│ ├── cli_debuggin_session_2025_05_02.txt\n", - "│ ├── quarterly_planning_2025_05_01.txt\n", - "│ └── data_analysis_2025_05_01.txt\n", - "├── general_preferences/\n", - "│ ├── code_style.txt\n", - "│ └── all_preferences.txt\n", - "files/\n", - "├── projects/\n", - "│ ├── building_agi.txt\n", - "│ └── prompt_optimization.txt\n", - "├── documents/\n", - "│ ├── updated_risk_report.txt\n", - "│ ├── company_strategy.txt\n", - "│ └── 2024_annual_report.txt\n", - "├── teams/\n", - "│ ├── engineering.txt\n", - "│ └── marketing.txt\n", - "├── customers/\n", - "│ ├── acme.txt\n", - "│ └── widgets.txt\n", - "```\n", + "Let's verify memory survived the clearing:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Verify memory persists after context clearing\n", + "import os\n", "\n", - "A fully featured implementation might enforce the following:\n", - "- `claude_memories/` directory (llm-managed) allows read & write operations\n", - "- `user_session_notes` is stored + loaded per user\n", - "- `files/` directory (org-managed) is read-only and connects to an external storage system\n", - "- as directories grow past a certain size you may want to limit traversal up to depth n, and then allow the model to invoke deeper traversal only as neeeded\n", + "print(\"📂 Memory files in demo_memory/:\")\n", + "print()\n", "\n", - "*In theory, the Simple Memory tool presented in #1 could be represented as a file system with a single available 
path.*\n" + "for root, dirs, files in os.walk(\"./demo_memory\"):\n", + " # Calculate relative path for display\n", + " level = root.replace(\"./demo_memory\", \"\").count(os.sep)\n", + " indent = \" \" * level\n", + " folder_name = os.path.basename(root) or \"demo_memory\"\n", + " print(f\"{indent}{folder_name}/\")\n", + " \n", + " sub_indent = \" \" * (level + 1)\n", + " for file in files:\n", + " file_path = os.path.join(root, file)\n", + " size = os.path.getsize(file_path)\n", + " print(f\"{sub_indent}├── {file} ({size} bytes)\")\n", + "\n", + "print()\n", + "print(\"✅ All learned patterns preserved despite context clearing!\")" ] }, { - "cell_type": "code", - "execution_count": 44, + "cell_type": "markdown", "metadata": {}, - "outputs": [ - { - "data": { - "text/plain": [ - "memory\n", - "├── self_managed\n", - "│ ├── user_session_notes\n", - "│ │ ├── ongoing_projects.txt\n", - "│ │ └── preferences.txt\n", - "│ └── projects\n", - "│ └── building_agi.txt\n", - "└── files\n", - " └── projects" - ] - }, - "execution_count": 44, - "metadata": {}, - "output_type": "execute_result" - } - ], "source": [ - "# example usage\n", - "company_agent_memory = MemoryTree()\n", + "## 4. How It Works {#how-it-works}\n", "\n", - "# example of the type of object you might get from an LLM (if you wanted to allow the LLM to construct it's own memory structure)\n", - "example_str = \"\"\"\n", - "```json\n", - "{\"self_managed\": {\"user_session_notes\":{\"ongoing_projects.txt\":\"I should remember that the user is working on prompt optimization\",\"preferences.txt\":\"I should remember that the user prefers to be called Jimbo\"},\"projects\":{\"building_agi.txt\":\"I should remember that the user is working on building AGI\"}}, \"files\": {\"projects\":\"building_agi.txt\"}}\n", - "```\n", - "\"\"\"\n", + "### Memory Tool Architecture\n", + "\n", + "The memory tool is **client-side** - you control the storage. 
Claude makes tool calls, your application executes them.\n", "\n", - "company_agent_memory.build_from_json_string(example_str)\n", - "company_agent_memory\n", + "#### Memory Tool Commands\n", "\n", - "# test out the file utilities below\n", + "| Command | Description | Example |\n", + "|---------|-------------|---------|\n", + "| `view` | Show directory or file contents | `{\"command\": \"view\", \"path\": \"/memories\"}` |\n", + "| `create` | Create or overwrite a file | `{\"command\": \"create\", \"path\": \"/memories/notes.md\", \"file_text\": \"...\"}` |\n", + "| `str_replace` | Replace text in a file | `{\"command\": \"str_replace\", \"path\": \"...\", \"old_str\": \"...\", \"new_str\": \"...\"}` |\n", + "| `insert` | Insert text at line number | `{\"command\": \"insert\", \"path\": \"...\", \"insert_line\": 2, \"insert_text\": \"...\"}` |\n", + "| `delete` | Delete a file or directory | `{\"command\": \"delete\", \"path\": \"/memories/old.txt\"}` |\n", + "| `rename` | Rename or move a file | `{\"command\": \"rename\", \"old_path\": \"...\", \"new_path\": \"...\"}` |\n", "\n", - "# print(company_agent_memory)\n", - "# print(\"GET:\", company_agent_memory.get('self_managed/user_session_notes/ongoing_projects.txt'))\n", - "# company_agent_memory.edit('self_managed/user_session_notes/ongoing_projects.txt', 'The user gave up on prompt optimization')\n", - "# print(\"UPDATED:\", company_agent_memory.get('self_managed/user_session_notes/ongoing_projects.txt'))" + "See `memory_tool.py` for the complete implementation with path validation and security measures." ] }, { - "cell_type": "code", - "execution_count": 45, + "cell_type": "markdown", "metadata": {}, - "outputs": [], "source": [ - "# FILE BASED MEMORY TOOL\n", - "\n", - "class FileBasedMemoryTool(Tool):\n", - " \"\"\"\n", - " Manage memory as a nested file system. 
This is specifically designed around the new files API.\n", - "\n", - " This tool provides a simple interace for interacting with this memory system.\n", - " We have only defined three actions: GET, EDIT, and BUILD. In practice, you likely would opt for a more opinionated file structure \n", - " and more fine-grained control over access to the memory. We will rely on the default message truncation mechanism of the request handler.\n", - " \"\"\"\n", - "\n", - " name = 'hierarchical_memory'\n", - " description = 'Interact with file system for storing memories, retrieving memories, and rebuilding the memory state.'\n", - " input_schema = {\n", - " 'type': 'object',\n", - " 'properties': {\n", - " 'action': {\n", - " 'type': 'string',\n", - " 'enum': ['get', 'edit', 'build']\n", - " },\n", - " 'paths': {\n", - " 'type': 'array',\n", - " 'items': {\n", - " 'type': 'string',\n", - " 'description': 'Path to the memory item'\n", - " },\n", - " 'description': 'List of paths for the associated action. Available with GET and EDIT actions. (GET can have multiple paths, EDIT should have one path)'\n", - " },\n", - " 'content': {\n", - " 'type': 'string',\n", - " 'description': 'Content that will be written to the specified path. Only available with the EDIT action.'\n", - " },\n", - " 'new_memory_object': {\n", - " 'type': 'object',\n", - " 'description': 'Full memory output object to rebuild the memory scaffold. Only available with the BUILD action. This should be a JSON object representing the desired tree structure for memories. 
The values should be None (as a placeholder for future content).'\n", - " }\n", - " },\n", - " 'required': ['action']\n", - " }\n", + "### Understanding the Demo Code\n", + "\n", + "Key implementation details from `code_review_demo.py`:\n", + "\n", + "```python\n", + "class CodeReviewAssistant:\n", + " def __init__(self, memory_storage_path=\"./memory_storage\"):\n", + " self.client = Anthropic(api_key=API_KEY)\n", + " self.memory_handler = MemoryToolHandler(base_path=memory_storage_path)\n", + " self.messages = []\n", " \n", - " def __init__(self, storage_manager: StorageManager):\n", - " self.full_memory = MemoryTree()\n", - " self.compressed_memory = self.full_memory # including the compressed memory for standardizing the interface\n", - " self.storage_manager = storage_manager\n", - "\n", - " async def execute(self, **kwargs) -> str:\n", - " action = kwargs.get('action')\n", - " paths = kwargs.get('paths')\n", - " content = kwargs.get('content')\n", - " new_memory_object = kwargs.get('new_memory_object')\n", - "\n", - " if action == 'get':\n", - " # we need to build the file messages from the file metadata (https://docs.claude.com/en/docs/docs/build-with-claude/files)\n", - " message_refs = [{\"type\": \"document\", \"source\": { \"type\": \"file\", \"file_id\": self.full_memory.get(path)}} for path in paths]\n", - " return message_refs\n", - "\n", - " elif action == 'edit':\n", - " path = paths[0]\n", - "\n", - " #create txt file in tmp dir with content\n", - " with open(f'/tmp/{path}.txt', 'w') as f:\n", - " f.write(content)\n", - "\n", - " # upload the file to the API\n", - " uploaded = self.storage_manager.upload_file(f'/tmp/{path}.txt')\n", - "\n", - " # add the file to the memory tree (using the id)\n", - " self.full_memory.edit(path, uploaded['id'])\n", - " return 'Updated'\n", - " \n", - " elif action == 'build':\n", - " self.full_memory.build_from_json_string(new_memory_object)\n", - " return 'Updated'\n", - " \n", - " else:\n", - " raise 
ValueError(f\"Invalid action: {action}\")\n", + " def review_code(self, code, filename, description=\"\"):\n", + " # 1. Add user message\n", + " self.messages.append({...})\n", " \n", - " def __str__(self):\n", - " return str(self.memory)" + " # 2. Conversation loop with tool execution\n", + " while True:\n", + " response = self.client.beta.messages.create(\n", + " model=MODEL,\n", + " system=self._create_system_prompt(),\n", + " messages=self.messages,\n", + " tools=[{\"type\": \"memory_20250818\", \"name\": \"memory\"}],\n", + " betas=[\"context-management-2025-06-27\"],\n", + " context_management=CONTEXT_MANAGEMENT\n", + " )\n", + " \n", + " # 3. Execute tool uses\n", + " tool_results = []\n", + " for content in response.content:\n", + " if content.type == \"tool_use\":\n", + " result = self._execute_tool_use(content)\n", + " tool_results.append({...})\n", + " \n", + " # 4. Continue if there are tool uses, otherwise done\n", + " if tool_results:\n", + " self.messages.append({\"role\": \"user\", \"content\": tool_results})\n", + " else:\n", + " break\n", + "```\n", + "\n", + "**The key pattern**: Keep calling the API while there are tool uses, executing them and feeding results back." 
] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "#### General Memory Management Advice:\n", - "- Maintain a summary or compressed representation of memory preloaded in the context, even if your tools require actions from the model to load the full information.\n", + "### What Claude Actually Learns\n", + "\n", + "This is what makes memory powerful - **semantic pattern recognition**, not just syntax:\n", + "\n", + "**Session 1: Thread-Based Web Scraper**\n", + "\n", + "```python\n", + "# Bug: Race condition\n", + "class WebScraper:\n", + " def __init__(self):\n", + " self.results = [] # Shared state!\n", + " \n", + " def scrape_urls(self, urls):\n", + " with ThreadPoolExecutor() as executor:\n", + " for future in as_completed(futures):\n", + " self.results.append(future.result()) # RACE!\n", + "```\n", + "\n", + "**What Claude Stores in Memory** (example file: `/memories/concurrency_patterns/thread_safety.md`):\n", + "\n", + "When Claude encounters this pattern, it stores the following insights to its memory files:\n", + "- **Symptom**: Inconsistent results in concurrent operations\n", + "- **Cause**: Shared mutable state (lists/dicts) modified from multiple threads\n", + "- **Solution**: Use locks, thread-safe data structures, or return results instead\n", + "- **Red flags**: Instance variables in thread callbacks, unused locks, counter increments\n", + "\n", + "---\n", + "\n", + "**Session 2: Async API Client** (New conversation!)\n", + "\n", + "Claude checks memory FIRST, finds the thread-safety pattern, then:\n", + "1. **Recognizes** similar pattern in async code (coroutines can interleave too)\n", + "2. **Applies** the solution immediately (no re-learning needed)\n", + "3. 
**Explains** with reference to stored knowledge\n", + "\n", + "```python\n", + "# Claude spots this immediately:\n", + "async def fetch_all(self, endpoints):\n", + " for coro in asyncio.as_completed(tasks):\n", + " self.responses.append(await coro) # Same pattern!\n", + "```\n", + "\n", + "---\n", "\n", - "- Encourage the model to reason about what to remember and how to update its memory content given the domain or task at hand.\n", + "**Why This Matters:**\n", "\n", - "- Encourage the model to keep the content of its memory up-to-date and coherent. Discourage excessive file creation." + "- ❌ **Syntax checkers** miss race conditions entirely\n", + "- ✅ **Claude learns** architectural patterns and applies them across contexts\n", + "- ✅ **Cross-language**: Pattern applies to Go, Java, Rust concurrency too\n", + "- ✅ **Gets better**: Each review adds to the knowledge base" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "### An Interactive Demo" + "### Sample Code Files\n", + "\n", + "The demo uses these sample files (all have concurrency/thread-safety bugs):\n", + "\n", + "- `memory_demo/sample_code/web_scraper_v1.py` - Race condition: threads modifying shared state\n", + "- `memory_demo/sample_code/api_client_v1.py` - Similar concurrency bug in async context\n", + "- `memory_demo/sample_code/data_processor_v1.py` - Multiple concurrency issues for long session demo\n", + "\n", + "Let's look at one:" ] }, { - "cell_type": "code", - "execution_count": null, + "cell_type": "markdown", "metadata": {}, - "outputs": [], "source": [ - "import ipywidgets as widgets\n", - "from IPython.display import display, clear_output, HTML\n", - "import datetime\n", - "import textwrap\n", - "from typing import List\n", - "from anthropic import Anthropic\n", + "**`memory_demo/sample_code/web_scraper_v1.py`**\n", "\n", - "memory_tools = [\n", - " SimpleMemory(),\n", - " CompactifyMemory(client),\n", - " FileBasedMemoryTool()\n", - "]\n", + "```python\n", + "\"\"\"\n", + 
"Concurrent web scraper with a race condition bug.\n", + "Multiple threads modify shared state without synchronization.\n", + "\"\"\"\n", "\n", - "def process_memory_function(agent, tool):\n", - " \"\"\"Because some memory tools work with the agents message history object\"\"\"\n", - " mem_tool_names = [tool.name for tool in memory_tools]\n", - " for tool in agent.tools:\n", - " if tool.name in mem_tool_names:\n", - " # ATTN: bit of a hack, but we need to inject some additional functionality\n", - " if tool.name == 'compactify_memory':\n", - " tool.run_compactify(self.agent.message_history)\n", - " \n", + "import time\n", + "from concurrent.futures import ThreadPoolExecutor, as_completed\n", + "from typing import List, Dict\n", "\n", - "class ChatInterface:\n", - " def __init__(self, agent: Agent, max_line_length=80):\n", - " self.max_line_length = max_line_length\n", - " self.agent = agent\n", - " self.messages = [] # managing the window's messages separately from the Agent's messages\n", - " self.memory = ''\n", - "\n", - " # Chat history container\n", - " self.chat_output = widgets.Output(layout=widgets.Layout(\n", - " height='400px', \n", - " overflow='auto',\n", - " border='1px solid #ccc',\n", - " padding='10px',\n", - " display='flex',\n", - " flex_flow='wrap-reverse'\n", - " ))\n", - " \n", - " # Text input for new messages\n", - " self.text_input = widgets.Text(\n", - " placeholder='Type your message here...',\n", - " layout=widgets.Layout(width='100%')\n", - " )\n", - " \n", - " # Send button\n", - " self.send_button = widgets.Button(\n", - " description='Send',\n", - " button_style='primary'\n", - " )\n", - " \n", - " # Memory settings display\n", - " self.memory_display = widgets.Output(layout=widgets.Layout(\n", - " width='100%', \n", - " height='400px',\n", - " border='1px solid #ccc',\n", - " padding ='10px',\n", - "\n", - " )) \n", - " \n", - " # Input container (text input + send button)\n", - " input_box = widgets.HBox([\n", - " 
self.text_input,\n", - " self.send_button\n", - " ], layout=widgets.Layout(width='100%'))\n", - " \n", - " # Left panel (chat)\n", - " left_panel = widgets.VBox([\n", - " widgets.Label('Chat'),\n", - " self.chat_output,\n", - " input_box\n", - " ], layout=widgets.Layout(\n", - " width='50%',\n", - " padding='10px',\n", - " ))\n", - " \n", - " # Right panel (memory settings)\n", - " right_panel = widgets.VBox([\n", - " widgets.Label('Memory'),\n", - " self.memory_display\n", - " ], layout=widgets.Layout(\n", - " width='50%',\n", - " padding='10px'\n", - " ))\n", - " \n", - " # Main layout\n", - " self.interface = widgets.HBox([\n", - " left_panel,\n", - " right_panel\n", - " ], layout=widgets.Layout(\n", - " width='100%',\n", - " display='flex'\n", - " ))\n", - " \n", - " # Event handlers\n", - " self.send_button.on_click(self.on_send)\n", - " self.text_input.on_submit(self.on_send)\n", - " \n", - " # Message history\n", - " self.messages = []\n", - " \n", - " def on_send(self, _):\n", - " \"\"\"Handle sending a message\"\"\"\n", - " message = self.text_input.value.strip()\n", - " if message:\n", - " self.add_message(\"user\", message)\n", - " self.text_input.value = \"\"\n", + "import requests\n", "\n", - " # call the agent with the message\n", - " response = self.agent.run(message)\n", - " self.add_message(\"assistant\", response.content[0].text)\n", "\n", - " ## PROCESS\n", + "class WebScraper:\n", + " \"\"\"Web scraper that fetches multiple URLs concurrently.\"\"\"\n", "\n", - " self.update_memory_display()\n", - " \n", - " def wrap_text(self, text):\n", - " \"\"\"Wrap text to fit within max_line_length\"\"\"\n", - " # Use textwrap to wrap long lines\n", - " wrapped_lines = []\n", - " for line in text.split('\\n'):\n", - " if len(line) > self.max_line_length:\n", - " # Wrap this line\n", - " wrapped = textwrap.fill(line, width=self.max_line_length)\n", - " wrapped_lines.append(wrapped)\n", - " else:\n", - " wrapped_lines.append(line)\n", - " return 
'\\n'.join(wrapped_lines)\n", - " \n", - " def add_message(self, role, message):\n", - " \"\"\"Add a message to the chat history with text wrapping\"\"\"\n", - " timestamp = datetime.datetime.now().strftime(\"%H:%M:%S\")\n", - " # Wrap the message text\n", - " wrapped_message = self.wrap_text(message)\n", - " \n", - " self.messages.append({\n", - " \"role\": role,\n", - " \"content\": message, # Store original message\n", - " \"wrapped_message\": wrapped_message, # Store wrapped version\n", - " \"timestamp\": timestamp\n", - " })\n", - " \n", - " with self.chat_output:\n", - " clear_output()\n", - " # Display all messages with HTML formatting\n", - " for msg in self.messages:\n", - " if msg['role'] == 'user':\n", - " color = '#0066cc'\n", + " def __init__(self, max_workers: int = 10):\n", + " self.max_workers = max_workers\n", + " self.results = [] # BUG: Shared mutable state accessed by multiple threads!\n", + " self.failed_urls = [] # BUG: Another race condition!\n", + "\n", + " def fetch_url(self, url: str) -> Dict[str, any]:\n", + " \"\"\"Fetch a single URL and return the result.\"\"\"\n", + " try:\n", + " response = requests.get(url, timeout=5)\n", + " response.raise_for_status()\n", + " return {\n", + " \"url\": url,\n", + " \"status\": response.status_code,\n", + " \"content_length\": len(response.content),\n", + " }\n", + " except requests.exceptions.RequestException as e:\n", + " return {\"url\": url, \"error\": str(e)}\n", + "\n", + " def scrape_urls(self, urls: List[str]) -> List[Dict[str, any]]:\n", + " \"\"\"\n", + " Scrape multiple URLs concurrently.\n", + "\n", + " BUG: self.results is accessed from multiple threads without locking!\n", + " This causes race conditions where results can be lost or corrupted.\n", + " \"\"\"\n", + " with ThreadPoolExecutor(max_workers=self.max_workers) as executor:\n", + " futures = [executor.submit(self.fetch_url, url) for url in urls]\n", + "\n", + " for future in as_completed(futures):\n", + " result = 
future.result()\n", + "\n", + " # RACE CONDITION: Multiple threads append to self.results simultaneously\n", + " if \"error\" in result:\n", + " self.failed_urls.append(result[\"url\"]) # RACE CONDITION\n", " else:\n", - " color = '#000000'\n", - " \n", - " display(HTML(\n", - " f\"
{self.memory}\"))\n",
- " \n",
- " def display(self):\n",
- " \"\"\"Display the interface\"\"\"\n",
- " return self.interface"
+ " self.results.append(result) # RACE CONDITION\n",
+ "\n",
+ " return self.results\n",
+ "```"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
- "#### Run The Demo"
+ "**Bug**: Multiple threads modify `self.results` and `self.failed_urls` without locking!\n",
+ "\n",
+ "Claude will:\n",
+ "1. Identify the race conditions\n",
+ "2. Store the pattern in `/memories/concurrency_patterns/thread_safety.md`\n",
+ "3. Apply this concurrency pattern to async code in Session 2"
]
},
{
- "cell_type": "code",
- "execution_count": null,
+ "cell_type": "markdown",
"metadata": {},
- "outputs": [],
"source": [
- "memory_tool = FileBasedMemoryTool() # or SimpleMemory() or CompactifyMemory(client) or FileBasedMemoryTool(storage_manager)\n",
- "model_config = {\n",
- " \"model\": \"claude-sonnet-4-20250514\",\n",
- "}\n",
- "agent = Agent(\n",
- " name=\"Assistant\",\n",
- " system=\"You are a helpful assistant designed to work with a user.\", # additional memory instructions can be added here\n",
- " tools=[memory_tool],\n",
- " config=model_config,\n",
- ")\n",
+ "### Demo Overview\n",
"\n",
- "chat = ChatInterface(\n",
- " agent=agent,\n",
- ")\n",
+ "We've built a complete Code Review Assistant. The implementation is in `memory_demo/code_review_demo.py`.\n",
+ "\n",
+ "**To run the interactive demo:**\n",
+ "```bash\n",
+ "python memory_demo/code_review_demo.py\n",
+ "```\n",
"\n",
- "chat.display()"
+ "The demo demonstrates:\n",
+ "1. **Session 1**: Review Python code with a bug → Claude learns the pattern\n",
+ "2. **Session 2**: Review similar code (new conversation) → Claude applies the pattern\n",
+ "3. **Session 3**: Long review session → Context editing keeps it manageable"
]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## 7. Best Practices & Security {#best-practices}\n",
+ "\n",
+ "### Memory Management\n",
+ "\n",
+ "**Do:**\n",
+ "- ✅ Store task-relevant patterns, not conversation history\n",
+ "- ✅ Organize with clear directory structure\n",
+ "- ✅ Use descriptive file names\n",
+ "- ✅ Periodically review and clean up memory\n",
+ "\n",
+ "**Don't:**\n",
+ "- ❌ Store sensitive information (passwords, API keys, PII)\n",
+ "- ❌ Let memory grow unbounded\n",
+ "- ❌ Store everything indiscriminately\n",
+ "\n",
+ "### Security: Path Traversal Protection\n",
+ "\n",
+ "**Critical**: Always validate paths to prevent directory traversal attacks. See `memory_tool.py` for implementation.\n",
+ "\n",
+ "### Security: Memory Poisoning\n",
+ "\n",
+ "**⚠️ Critical Risk**: Memory files are read back into Claude's context, making them a potential vector for prompt injection.\n",
+ "\n",
+ "**Mitigation strategies:**\n",
+ "1. **Content Sanitization**: Filter dangerous patterns before storing\n",
+ "2. **Memory Scope Isolation**: Per-user/per-project isolation \n",
+ "3. **Memory Auditing**: Log and scan all memory operations\n",
+ "4. **Prompt Engineering**: Instruct Claude to ignore instructions in memory\n",
+ "\n",
+ "See `memory_tool.py` for complete security implementation and tests in `tests/`."
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": "## Next Steps\n\n### Resources\n\n- **API docs**: [Claude API reference](https://docs.claude.com/en/api/messages)\n- **Usage docs**: [Memory tool](https://docs.claude.com/en/docs/agents-and-tools/tool-use/memory-tool)\n- **GitHub Action**: [claude-code-action](https://github.com/anthropics/claude-code-action)\n- **Support**: [support.claude.com](https://support.claude.com)\n\n### Feedback\n\nMemory and context management are in **beta**. Share your feedback to help us improve!"
}
],
"metadata": {
"kernelspec": {
- "display_name": "py311",
+ "display_name": ".venv",
"language": "python",
"name": "python3"
},
@@ -1088,9 +1217,9 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
- "version": "3.11.12"
+ "version": "3.11.13"
}
},
"nbformat": 4,
- "nbformat_minor": 2
-}
+ "nbformat_minor": 4
+}
\ No newline at end of file
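The notebook's memory-poisoning section lists content sanitization as a mitigation but defers to `memory_tool.py` for the implementation. A minimal sketch of what such a filter could look like — the pattern list and the `sanitize_memory_content` name are illustrative assumptions, not the cookbook's actual code:

```python
import re

# Hypothetical injection phrases to redact before a `create`/`str_replace`
# writes model-supplied text into a memory file.
SUSPICIOUS_PATTERNS = [
    r"ignore (all )?(previous|prior) instructions",
    r"disregard your system prompt",
]


def sanitize_memory_content(file_text: str) -> str:
    """Redact likely prompt-injection phrases before persisting to memory."""
    cleaned = file_text
    for pattern in SUSPICIOUS_PATTERNS:
        cleaned = re.sub(pattern, "[REDACTED]", cleaned, flags=re.IGNORECASE)
    return cleaned
```

A real deployment would pair this with the per-user scope isolation and audit logging listed above, since regex filtering alone is easy to evade.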
diff --git a/tool_use/memory_demo/.gitignore b/tool_use/memory_demo/.gitignore
new file mode 100644
index 00000000..664ee29d
--- /dev/null
+++ b/tool_use/memory_demo/.gitignore
@@ -0,0 +1,5 @@
+# Ignore demo-generated directories and files
+demo_memory/
+memory_storage/
+__pycache__/
+*.pyc
\ No newline at end of file
diff --git a/tool_use/memory_demo/code_review_demo.py b/tool_use/memory_demo/code_review_demo.py
new file mode 100644
index 00000000..3e4bcd9e
--- /dev/null
+++ b/tool_use/memory_demo/code_review_demo.py
@@ -0,0 +1,339 @@
+"""
+Code Review Assistant Demo - Three-session demonstration.
+
+This demo showcases:
+1. Session 1: Claude learns debugging patterns
+2. Session 2: Claude applies learned patterns (faster!)
+3. Session 3: Long session with context editing
+
+Requires:
+- .env file with ANTHROPIC_API_KEY and ANTHROPIC_MODEL
+- memory_tool.py in the same directory
+"""
+
+import os
+from typing import Any, Dict, List, Optional
+
+from anthropic import Anthropic
+from dotenv import load_dotenv
+
+import sys
+from pathlib import Path
+
+# Add parent directory to path to import memory_tool
+sys.path.insert(0, str(Path(__file__).parent.parent))
+
+from memory_tool import MemoryToolHandler
+
+
+# Load environment variables
+load_dotenv()
+
+API_KEY = os.getenv("ANTHROPIC_API_KEY")
+MODEL = os.getenv("ANTHROPIC_MODEL")
+
+if not API_KEY:
+ raise ValueError(
+ "ANTHROPIC_API_KEY not found. Copy .env.example to .env and add your API key."
+ )
+
+if not MODEL:
+ raise ValueError(
+ "ANTHROPIC_MODEL not found. Copy .env.example to .env and set the model."
+ )
+
+
+# Context management configuration
+CONTEXT_MANAGEMENT = {
+ "edits": [
+ {
+ "type": "clear_tool_uses_20250919",
+ "trigger": {"type": "input_tokens", "value": 30000},
+ "keep": {"type": "tool_uses", "value": 3},
+ "clear_at_least": {"type": "input_tokens", "value": 5000},
+ }
+ ]
+}
+
+
+class CodeReviewAssistant:
+ """
+ Code review assistant with memory and context editing capabilities.
+
+ This assistant:
+ - Checks memory for debugging patterns before reviewing code
+ - Stores learned patterns for future sessions
+ - Automatically clears old tool results when context grows large
+ """
+
+ def __init__(self, memory_storage_path: str = "./memory_storage"):
+ """
+ Initialize the code review assistant.
+
+ Args:
+ memory_storage_path: Path for memory storage
+ """
+ self.client = Anthropic(api_key=API_KEY)
+ self.memory_handler = MemoryToolHandler(base_path=memory_storage_path)
+ self.messages: List[Dict[str, Any]] = []
+
+ def _create_system_prompt(self) -> str:
+ """Create system prompt with memory instructions."""
+ return """You are an expert code reviewer focused on finding bugs and suggesting improvements.
+
+MEMORY PROTOCOL:
+1. Check your /memories directory for relevant debugging patterns or insights
+2. When you find a bug or pattern, update your memory with what you learned
+3. Keep your memory organized - use descriptive file names and clear content
+
+When reviewing code:
+- Identify bugs, security issues, and code quality problems
+- Explain the issue clearly
+- Provide a corrected version
+- Store important patterns in memory for future reference
+
+Remember: Your memory persists across conversations. Use it wisely."""
+
+ def _execute_tool_use(self, tool_use: Any) -> str:
+ """Execute a tool use and return the result."""
+ if tool_use.name == "memory":
+ result = self.memory_handler.execute(**tool_use.input)
+ return result.get("success") or result.get("error", "Unknown error")
+ return f"Unknown tool: {tool_use.name}"
+
+ def review_code(
+ self, code: str, filename: str, description: str = ""
+ ) -> Dict[str, Any]:
+ """
+ Review code with memory-enhanced analysis.
+
+ Args:
+ code: The code to review
+ filename: Name of the file being reviewed
+ description: Optional description of what to look for
+
+ Returns:
+ Dict with review results and metadata
+ """
+ # Construct user message
+        user_message = f"Please review this code from {filename}"
+ if description:
+ user_message += f"\n\nContext: {description}"
+ user_message += f"\n\n```python\n{code}\n```"
+
+ self.messages.append({"role": "user", "content": user_message})
+
+ # Track token usage and context management
+ total_input_tokens = 0
+ context_edits_applied = []
+
+ # Conversation loop
+ turn = 1
+ while True:
+ print(f" 🔄 Turn {turn}: Calling Claude API...", end="", flush=True)
+ response = self.client.beta.messages.create(
+ model=MODEL,
+ max_tokens=4096,
+ system=self._create_system_prompt(),
+ messages=self.messages,
+ tools=[{"type": "memory_20250818", "name": "memory"}],
+ betas=["context-management-2025-06-27"],
+ extra_body={"context_management": CONTEXT_MANAGEMENT},
+ )
+
+ print(" ✓")
+
+            # Record input tokens from the latest request (after any context edits)
+            total_input_tokens = response.usage.input_tokens
+
+ # Check for context management
+ if hasattr(response, "context_management") and response.context_management:
+ applied = response.context_management.get("applied_edits", [])
+ if applied:
+ context_edits_applied.extend(applied)
+
+ # Process response content
+ assistant_content = []
+ tool_results = []
+ final_text = []
+
+ for content in response.content:
+ if content.type == "text":
+ assistant_content.append({"type": "text", "text": content.text})
+ final_text.append(content.text)
+ elif content.type == "tool_use":
+ cmd = content.input.get('command', 'unknown')
+ path = content.input.get('path', '')
+ print(f" 🔧 Memory: {cmd} {path}")
+
+ # Execute tool
+ result = self._execute_tool_use(content)
+
+ assistant_content.append(
+ {
+ "type": "tool_use",
+ "id": content.id,
+ "name": content.name,
+ "input": content.input,
+ }
+ )
+
+ tool_results.append(
+ {
+ "type": "tool_result",
+ "tool_use_id": content.id,
+ "content": result,
+ }
+ )
+
+ # Add assistant message
+ self.messages.append({"role": "assistant", "content": assistant_content})
+
+ # If there are tool results, add them and continue
+ if tool_results:
+ self.messages.append({"role": "user", "content": tool_results})
+ turn += 1
+ else:
+ # No more tool uses, we're done
+ print()
+ break
+
+ return {
+ "review": "\n".join(final_text),
+ "input_tokens": total_input_tokens,
+ "context_edits": context_edits_applied,
+ }
+
+ def start_new_session(self) -> None:
+ """Start a new conversation session (memory persists)."""
+ self.messages = []
+
+
+def run_session_1() -> None:
+ """Session 1: Learn debugging patterns."""
+ print("=" * 80)
+ print("SESSION 1: Learning from First Code Review")
+ print("=" * 80)
+
+ assistant = CodeReviewAssistant()
+
+ # Read sample code
+ with open("memory_demo/sample_code/web_scraper_v1.py", "r") as f:
+ code = f.read()
+
+ print("\n📋 Reviewing web_scraper_v1.py...")
+ print("\nMulti-threaded web scraper that sometimes loses results.\n")
+
+ result = assistant.review_code(
+ code=code,
+ filename="web_scraper_v1.py",
+ description="This scraper sometimes returns fewer results than expected. "
+ "The count is inconsistent across runs. Can you find the issue?",
+ )
+
+ print("\n🤖 Claude's Review:\n")
+ print(result["review"])
+ print(f"\n📊 Input tokens used: {result['input_tokens']:,}")
+
+ if result["context_edits"]:
+ print(f"\n🧹 Context edits applied: {result['context_edits']}")
+
+ print("\n✅ Session 1 complete - Claude learned debugging patterns!\n")
+
+
+def run_session_2() -> None:
+ """Session 2: Apply learned patterns."""
+ print("=" * 80)
+ print("SESSION 2: Applying Learned Patterns (New Conversation)")
+ print("=" * 80)
+
+ # New assistant instance (new conversation, but memory persists)
+ assistant = CodeReviewAssistant()
+
+ # Read different sample code with similar bug
+ with open("memory_demo/sample_code/api_client_v1.py", "r") as f:
+ code = f.read()
+
+ print("\n📋 Reviewing api_client_v1.py...")
+ print("\nAsync API client with concurrent requests.\n")
+
+ result = assistant.review_code(
+ code=code,
+ filename="api_client_v1.py",
+ description="Review this async API client. "
+ "It fetches multiple endpoints concurrently. Are there any issues?",
+ )
+
+ print("\n🤖 Claude's Review:\n")
+ print(result["review"])
+ print(f"\n📊 Input tokens used: {result['input_tokens']:,}")
+
+ print("\n✅ Session 2 complete - Claude applied learned patterns faster!\n")
+
+
+def run_session_3() -> None:
+ """Session 3: Long session with context editing."""
+ print("=" * 80)
+ print("SESSION 3: Long Session with Context Editing")
+ print("=" * 80)
+
+ assistant = CodeReviewAssistant()
+
+ # Read data processor code (has multiple issues)
+ with open("memory_demo/sample_code/data_processor_v1.py", "r") as f:
+ code = f.read()
+
+ print("\n📋 Reviewing data_processor_v1.py...")
+ print("\nLarge file with multiple concurrent processing classes.\n")
+
+ result = assistant.review_code(
+ code=code,
+ filename="data_processor_v1.py",
+ description="This data processor handles files concurrently. "
+ "There's also a SharedCache class. Review all components for issues.",
+ )
+
+ print("\n🤖 Claude's Review:\n")
+ print(result["review"])
+ print(f"\n📊 Input tokens used: {result['input_tokens']:,}")
+
+ if result["context_edits"]:
+ print("\n🧹 Context Management Applied:")
+ for edit in result["context_edits"]:
+ print(f" - Type: {edit.get('type')}")
+ print(f" - Cleared tool uses: {edit.get('cleared_tool_uses', 0)}")
+ print(f" - Tokens saved: {edit.get('cleared_input_tokens', 0):,}")
+
+ print("\n✅ Session 3 complete - Context editing kept conversation manageable!\n")
+
+
+def main() -> None:
+ """Run all three demo sessions."""
+ print("\n🚀 Code Review Assistant Demo\n")
+ print("This demo shows:")
+ print("1. Session 1: Claude learns debugging patterns")
+ print("2. Session 2: Claude applies learned patterns (new conversation)")
+ print("3. Session 3: Long session with context editing\n")
+
+ input("Press Enter to start Session 1...")
+ run_session_1()
+
+ input("Press Enter to start Session 2...")
+ run_session_2()
+
+ input("Press Enter to start Session 3...")
+ run_session_3()
+
+ print("=" * 80)
+ print("🎉 Demo Complete!")
+ print("=" * 80)
+ print("\nKey Takeaways:")
+ print("- Memory tool enabled cross-conversation learning")
+ print("- Claude got faster at recognizing similar bugs")
+ print("- Context editing handled long sessions gracefully")
+ print("\n💡 For production GitHub PR reviews, check out:")
+ print(" https://github.com/anthropics/claude-code-action\n")
+
+
+if __name__ == "__main__":
+ main()
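`code_review_demo.py` depends on `MemoryToolHandler` from `memory_tool.py`, which is not part of this diff. The path-traversal validation it is documented to perform might look roughly like the following sketch — `resolve_memory_path` and `UnsafePathError` are hypothetical names, not the handler's real API:

```python
from pathlib import Path


class UnsafePathError(ValueError):
    """Raised when a memory path tries to escape the storage root."""


def resolve_memory_path(base_path: str, memory_path: str) -> Path:
    """Map a /memories/... path onto base_path, rejecting traversal attempts."""
    if memory_path != "/memories" and not memory_path.startswith("/memories/"):
        raise UnsafePathError(f"Path must start with /memories: {memory_path}")
    relative = memory_path[len("/memories"):].lstrip("/")
    base = Path(base_path).resolve()
    full = (base / relative).resolve()
    # Reject anything that resolves outside the storage root (e.g. via ../..)
    if full != base and base not in full.parents:
        raise UnsafePathError(f"Path escapes memory root: {memory_path}")
    return full
```

Every memory command's `path` argument would pass through a check like this before any filesystem operation runs.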
diff --git a/tool_use/memory_demo/demo_helpers.py b/tool_use/memory_demo/demo_helpers.py
new file mode 100644
index 00000000..3f86f4bb
--- /dev/null
+++ b/tool_use/memory_demo/demo_helpers.py
@@ -0,0 +1,194 @@
+"""
+Helper functions for memory cookbook demos.
+
+This module provides reusable functions for running conversation loops
+with Claude, handling tool execution, and managing context.
+"""
+
+from typing import Any
+
+from anthropic import Anthropic
+from memory_tool import MemoryToolHandler
+
+
+def execute_tool(tool_use: Any, memory_handler: MemoryToolHandler) -> str:
+ """
+ Execute a tool use and return the result.
+
+ Args:
+ tool_use: The tool use object from Claude's response
+ memory_handler: The memory tool handler instance
+
+ Returns:
+ str: The result of the tool execution
+ """
+ if tool_use.name == "memory":
+ result = memory_handler.execute(**tool_use.input)
+ return result.get("success") or result.get("error", "Unknown error")
+ return f"Unknown tool: {tool_use.name}"
+
+
+def run_conversation_turn(
+ client: Anthropic,
+ model: str,
+ messages: list[dict[str, Any]],
+ memory_handler: MemoryToolHandler,
+ system: str,
+ context_management: dict[str, Any] | None = None,
+ max_tokens: int = 1024,
+ verbose: bool = False
+) -> tuple[Any, list[dict[str, Any]], list[dict[str, Any]]]:
+ """
+ Run a single conversation turn, handling tool uses.
+
+ Args:
+ client: Anthropic client instance
+ model: Model to use
+ messages: Current conversation messages
+ memory_handler: Memory tool handler instance
+ system: System prompt
+ context_management: Optional context management config
+ max_tokens: Max tokens for response
+ verbose: Whether to print tool operations
+
+ Returns:
+ Tuple of (response, assistant_content, tool_results)
+ """
+ memory_tool: dict[str, Any] = {"type": "memory_20250818", "name": "memory"}
+
+ request_params: dict[str, Any] = {
+ "model": model,
+ "max_tokens": max_tokens,
+ "system": system,
+ "messages": messages,
+ "tools": [memory_tool],
+ "betas": ["context-management-2025-06-27"]
+ }
+
+ if context_management:
+ request_params["extra_body"] = {"context_management": context_management}
+
+ response = client.beta.messages.create(**request_params)
+
+ assistant_content = []
+ tool_results = []
+
+ for content in response.content:
+ if content.type == "text":
+ if verbose:
+ print(f"💬 Claude: {content.text}\n")
+ assistant_content.append({"type": "text", "text": content.text})
+ elif content.type == "tool_use":
+ if verbose:
+ cmd = content.input.get('command')
+ path = content.input.get('path', '')
+ print(f" 🔧 Memory tool: {cmd} {path}")
+
+ result = execute_tool(content, memory_handler)
+
+ if verbose:
+ result_preview = result[:80] + "..." if len(result) > 80 else result
+ print(f" ✓ Result: {result_preview}")
+
+ assistant_content.append({
+ "type": "tool_use",
+ "id": content.id,
+ "name": content.name,
+ "input": content.input
+ })
+ tool_results.append({
+ "type": "tool_result",
+ "tool_use_id": content.id,
+ "content": result
+ })
+
+ return response, assistant_content, tool_results
+
+
+def run_conversation_loop(
+ client: Anthropic,
+ model: str,
+ messages: list[dict[str, Any]],
+ memory_handler: MemoryToolHandler,
+ system: str,
+ context_management: dict[str, Any] | None = None,
+ max_tokens: int = 1024,
+ max_turns: int = 5,
+ verbose: bool = False
+) -> Any:
+ """
+ Run a complete conversation loop until Claude stops using tools.
+
+ Args:
+ client: Anthropic client instance
+ model: Model to use
+ messages: Current conversation messages (will be modified in-place)
+ memory_handler: Memory tool handler instance
+ system: System prompt
+ context_management: Optional context management config
+ max_tokens: Max tokens for response
+ max_turns: Maximum number of turns to prevent infinite loops
+ verbose: Whether to print progress
+
+ Returns:
+ The final API response
+ """
+ turn = 1
+ response = None
+
+ while turn <= max_turns:
+ if verbose:
+ print(f"\n🔄 Turn {turn}:")
+
+ response, assistant_content, tool_results = run_conversation_turn(
+ client=client,
+ model=model,
+ messages=messages,
+ memory_handler=memory_handler,
+ system=system,
+ context_management=context_management,
+ max_tokens=max_tokens,
+ verbose=verbose
+ )
+
+ messages.append({"role": "assistant", "content": assistant_content})
+
+ if tool_results:
+ messages.append({"role": "user", "content": tool_results})
+ turn += 1
+ else:
+ # No more tool uses, conversation complete
+ break
+
+ return response
+
+
+def print_context_management_info(response: Any) -> tuple[bool, int]:
+ """
+ Print context management information from response.
+
+ Args:
+ response: API response to analyze
+
+ Returns:
+ Tuple of (context_cleared, saved_tokens)
+ """
+ context_cleared = False
+ saved_tokens = 0
+
+ if hasattr(response, "context_management") and response.context_management:
+ edits = response.context_management.get("applied_edits", [])
+ if edits:
+ context_cleared = True
+ cleared_uses = edits[0].get('cleared_tool_uses', 0)
+ saved_tokens = edits[0].get('cleared_input_tokens', 0)
+            print(" ✂️ Context editing triggered!")
+ print(f" • Cleared {cleared_uses} tool uses")
+ print(f" • Saved {saved_tokens:,} tokens")
+ print(f" • After clearing: {response.usage.input_tokens:,} tokens")
+        else:
+            print(" ℹ️ Context below threshold - no clearing triggered")
+    else:
+        print(" ℹ️ No context management applied")
+
+ return context_cleared, saved_tokens
\ No newline at end of file
diff --git a/tool_use/memory_demo/sample_code/api_client_v1.py b/tool_use/memory_demo/sample_code/api_client_v1.py
new file mode 100644
index 00000000..bd357994
--- /dev/null
+++ b/tool_use/memory_demo/sample_code/api_client_v1.py
@@ -0,0 +1,99 @@
+"""
+Async API client with similar concurrency issues.
+This demonstrates Claude applying thread-safety patterns to async code.
+"""
+
+import asyncio
+from typing import List, Dict, Optional, Any
+
+import aiohttp
+
+
+class AsyncAPIClient:
+ """Async API client for fetching data from multiple endpoints."""
+
+ def __init__(self, base_url: str):
+ self.base_url = base_url
+ self.responses = [] # BUG: Shared state accessed from multiple coroutines!
+ self.error_count = 0 # BUG: Race condition on counter increment!
+
+ async def fetch_endpoint(
+ self, session: aiohttp.ClientSession, endpoint: str
+ ) -> Dict[str, Any]:
+ """Fetch a single endpoint."""
+ url = f"{self.base_url}/{endpoint}"
+ try:
+ async with session.get(
+ url, timeout=aiohttp.ClientTimeout(total=5)
+ ) as response:
+ data = await response.json()
+ return {
+ "endpoint": endpoint,
+ "status": response.status,
+ "data": data,
+ }
+ except Exception as e:
+ return {
+ "endpoint": endpoint,
+ "error": str(e),
+ }
+
+ async def fetch_all(self, endpoints: List[str]) -> List[Dict[str, Any]]:
+ """
+ Fetch multiple endpoints concurrently.
+
+ BUG: Similar to the threading issue, multiple coroutines
+ modify self.responses and self.error_count without coordination!
+        Although asyncio is single-threaded, coroutines can interleave at
+        every await, so read-modify-write sequences that span an await are unsafe.
+ """
+ async with aiohttp.ClientSession() as session:
+ tasks = [self.fetch_endpoint(session, endpoint) for endpoint in endpoints]
+
+ for coro in asyncio.as_completed(tasks):
+ result = await coro
+
+ # RACE CONDITION: Multiple coroutines modify shared state
+ if "error" in result:
+ self.error_count += 1 # Not atomic!
+ else:
+ self.responses.append(result) # Not thread-safe in async context!
+
+ return self.responses
+
+ def get_summary(self) -> Dict[str, Any]:
+ """Get summary statistics."""
+ return {
+ "total_responses": len(self.responses),
+ "errors": self.error_count,
+ "success_rate": (
+ len(self.responses) / (len(self.responses) + self.error_count)
+ if (len(self.responses) + self.error_count) > 0
+ else 0
+ ),
+ }
+
+
+async def main():
+ """Test the async API client."""
+ client = AsyncAPIClient("https://jsonplaceholder.typicode.com")
+
+ endpoints = [
+ "posts/1",
+ "posts/2",
+ "posts/3",
+ "users/1",
+ "users/2",
+ "invalid/endpoint", # Will error
+ ] * 20 # 120 requests total
+
+ results = await client.fetch_all(endpoints)
+
+    print("Expected: ~100 successful responses")
+ print(f"Got: {len(results)} responses")
+ print(f"Summary: {client.get_summary()}")
+ print("\nNote: Counts may be incorrect due to race conditions!")
+
+
+if __name__ == "__main__":
+ asyncio.run(main())
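+
+# Illustrative fix (a sketch only - the bugs above are intentional demo
+# material): collect results locally with asyncio.gather() so no shared
+# attributes are mutated while requests are in flight, then assign once.
+#
+#     async def fetch_all_fixed(self, endpoints):
+#         async with aiohttp.ClientSession() as session:
+#             results = await asyncio.gather(
+#                 *(self.fetch_endpoint(session, ep) for ep in endpoints)
+#             )
+#         self.responses = [r for r in results if "error" not in r]
+#         self.error_count = len(results) - len(self.responses)
+#         return self.responses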
diff --git a/tool_use/memory_demo/sample_code/cache_manager.py b/tool_use/memory_demo/sample_code/cache_manager.py
new file mode 100644
index 00000000..6cc6f3ec
--- /dev/null
+++ b/tool_use/memory_demo/sample_code/cache_manager.py
@@ -0,0 +1,115 @@
+"""
+Cache manager with mutable default argument bug.
+This is one of Python's most common gotchas.
+"""
+
+from datetime import datetime
+from typing import Dict, List, Optional, Any
+
+
+class CacheManager:
+ """Manage cached data with TTL support."""
+
+ def __init__(self):
+ self.cache = {}
+
+ def add_items(
+ self, key: str, items: List[str] = [] # BUG: Mutable default argument!
+ ) -> None:
+ """
+ Add items to cache.
+
+ BUG: Using [] as default creates a SHARED list across all calls!
+ This is one of Python's classic gotchas.
+ """
+ # The items list is shared across ALL calls that don't provide items
+ items.append(f"Added at {datetime.now()}")
+ self.cache[key] = items
+
+ def add_items_fixed(self, key: str, items: Optional[List[str]] = None) -> None:
+ """Add items with proper default handling."""
+ if items is None:
+ items = []
+ items = items.copy() # Also make a copy to avoid mutation
+ items.append(f"Added at {datetime.now()}")
+ self.cache[key] = items
+
+ def merge_configs(
+ self, name: str, overrides: Dict[str, Any] = {} # BUG: Mutable default!
+ ) -> Dict[str, Any]:
+ """
+ Merge configuration with overrides.
+
+ BUG: The default dict is shared across all calls!
+ """
+ defaults = {"timeout": 30, "retries": 3, "cache_enabled": True}
+
+        # This modifies the SHARED overrides dict (and, as a second bug,
+        # lets defaults clobber any caller-supplied values)
+ overrides.update(defaults)
+ return overrides
+
+ def merge_configs_fixed(
+ self, name: str, overrides: Optional[Dict[str, Any]] = None
+ ) -> Dict[str, Any]:
+ """Merge configs properly."""
+ if overrides is None:
+ overrides = {}
+
+ defaults = {"timeout": 30, "retries": 3, "cache_enabled": True}
+
+ # Create new dict to avoid mutation
+ result = {**defaults, **overrides}
+ return result
+
+
+class DataProcessor:
+ """Another example of the mutable default bug."""
+
+ def process_batch(
+ self, data: List[int], filters: List[str] = [] # BUG: Mutable default!
+ ) -> List[int]:
+ """
+ Process data with optional filters.
+
+ BUG: filters list is shared across calls!
+ """
+ filters.append("default_filter") # Modifies shared list!
+
+ result = []
+ for item in data:
+ if "positive" in filters and item < 0:
+ continue
+ result.append(item * 2)
+ return result
+
+
+if __name__ == "__main__":
+ cache = CacheManager()
+
+ # Demonstrate the bug
+ print("=== Demonstrating Mutable Default Argument Bug ===\n")
+
+ # First call with no items
+ cache.add_items("key1")
+ print(f"key1: {cache.cache['key1']}")
+
+ # Second call with no items - SURPRISE! Gets the same list
+ cache.add_items("key2")
+ print(f"key2: {cache.cache['key2']}") # Will have TWO timestamps!
+
+ # Third call - even worse!
+ cache.add_items("key3")
+ print(f"key3: {cache.cache['key3']}") # Will have THREE timestamps!
+
+ print("\nAll keys share the same list object!")
+ print(f"key1 is key2: {cache.cache['key1'] is cache.cache['key2']}")
+
+ print("\n=== Using Fixed Version ===\n")
+ cache2 = CacheManager()
+ cache2.add_items_fixed("key1")
+ cache2.add_items_fixed("key2")
+ cache2.add_items_fixed("key3")
+ print(f"key1: {cache2.cache['key1']}")
+ print(f"key2: {cache2.cache['key2']}")
+ print(f"key3: {cache2.cache['key3']}")
+ print(f"\nkey1 is key2: {cache2.cache['key1'] is cache2.cache['key2']}")
\ No newline at end of file
diff --git a/tool_use/memory_demo/sample_code/data_processor_v1.py b/tool_use/memory_demo/sample_code/data_processor_v1.py
new file mode 100644
index 00000000..97a480ea
--- /dev/null
+++ b/tool_use/memory_demo/sample_code/data_processor_v1.py
@@ -0,0 +1,145 @@
+"""
+Data processor with multiple concurrency and thread-safety issues.
+Used for Session 3 to demonstrate context editing with multiple bugs.
+"""
+
+import json
+import threading
+from concurrent.futures import ThreadPoolExecutor
+from pathlib import Path
+from typing import List, Dict, Any
+
+
+class DataProcessor:
+ """Process data files concurrently with various thread-safety issues."""
+
+ def __init__(self, max_workers: int = 5):
+ self.max_workers = max_workers
+ self.processed_count = 0 # BUG: Race condition on counter
+ self.results = [] # BUG: Shared list without locking
+ self.errors = {} # BUG: Shared dict without locking
+ self.lock = threading.Lock() # Available but not used!
+
+ def process_file(self, file_path: str) -> Dict[str, Any]:
+ """Process a single file."""
+ try:
+ with open(file_path, "r") as f:
+ data = json.load(f)
+
+ # Simulate some processing
+ processed = {
+ "file": file_path,
+ "record_count": len(data) if isinstance(data, list) else 1,
+ "size_bytes": Path(file_path).stat().st_size,
+ }
+
+ return processed
+
+ except Exception as e:
+ return {"file": file_path, "error": str(e)}
+
+ def process_batch(self, file_paths: List[str]) -> List[Dict[str, Any]]:
+ """
+ Process multiple files concurrently.
+
+ MULTIPLE BUGS:
+ 1. self.processed_count is incremented without locking
+ 2. self.results is appended to from multiple threads
+ 3. self.errors is modified from multiple threads
+ 4. We have a lock but don't use it!
+ """
+ with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
+ futures = [executor.submit(self.process_file, fp) for fp in file_paths]
+
+ for future in futures:
+ result = future.result()
+
+ # RACE CONDITION: Increment counter without lock
+ self.processed_count += 1 # BUG!
+
+ if "error" in result:
+ # RACE CONDITION: Modify dict without lock
+ self.errors[result["file"]] = result["error"] # BUG!
+ else:
+ # RACE CONDITION: Append to list without lock
+ self.results.append(result) # BUG!
+
+ return self.results
+
+ def get_statistics(self) -> Dict[str, Any]:
+ """
+ Get processing statistics.
+
+ BUG: Accessing shared state without ensuring thread-safety.
+ If called while processing, could get inconsistent values.
+ """
+ total_files = self.processed_count
+ successful = len(self.results)
+ failed = len(self.errors)
+
+ # BUG: These counts might not add up correctly due to race conditions
+ return {
+ "total_processed": total_files,
+ "successful": successful,
+ "failed": failed,
+ "success_rate": successful / total_files if total_files > 0 else 0,
+ }
+
+ def reset(self):
+ """
+ Reset processor state.
+
+ BUG: No locking - if called during processing, causes corruption.
+ """
+ self.processed_count = 0 # RACE CONDITION
+ self.results = [] # RACE CONDITION
+ self.errors = {} # RACE CONDITION
+
+
+class SharedCache:
+ """
+ A shared cache with thread-safety issues.
+
+ BUG: Classic read-modify-write race condition pattern.
+ """
+
+ def __init__(self):
+ self.cache = {} # BUG: Shared dict without locking
+ self.hit_count = 0 # BUG: Race condition
+ self.miss_count = 0 # BUG: Race condition
+
+ def get(self, key: str) -> Any:
+ """Get from cache - RACE CONDITION on hit/miss counts."""
+ if key in self.cache:
+ self.hit_count += 1 # BUG: Not atomic!
+ return self.cache[key]
+ else:
+ self.miss_count += 1 # BUG: Not atomic!
+ return None
+
+ def set(self, key: str, value: Any):
+ """Set in cache - RACE CONDITION on dict modification."""
+ self.cache[key] = value # BUG: Dict access not synchronized!
+
+ def get_stats(self) -> Dict[str, Any]:
+ """Get cache statistics - may be inconsistent."""
+ total = self.hit_count + self.miss_count
+ return {
+ "hits": self.hit_count,
+ "misses": self.miss_count,
+ "hit_rate": self.hit_count / total if total > 0 else 0,
+ }
+
+
+if __name__ == "__main__":
+    # The referenced data files are not created, so every result is an
+    # error; this still exercises the shared-state code paths.
+ processor = DataProcessor(max_workers=10)
+
+ # Simulate processing many files
+ file_paths = [f"data/file_{i}.json" for i in range(100)]
+
+ print("Processing files concurrently...")
+ results = processor.process_batch(file_paths)
+
+ print(f"\nStatistics: {processor.get_statistics()}")
+ print("\nNote: Counts may be inconsistent due to race conditions!")
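+
+# Illustrative fix (a sketch only - the bugs above are intentional demo
+# material): DataProcessor already creates self.lock, so the shared-state
+# updates in process_batch can simply be serialized with it:
+#
+#     with self.lock:
+#         self.processed_count += 1
+#         if "error" in result:
+#             self.errors[result["file"]] = result["error"]
+#         else:
+#             self.results.append(result)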
diff --git a/tool_use/memory_demo/sample_code/sql_query_builder.py b/tool_use/memory_demo/sample_code/sql_query_builder.py
new file mode 100644
index 00000000..a3a931af
--- /dev/null
+++ b/tool_use/memory_demo/sample_code/sql_query_builder.py
@@ -0,0 +1,105 @@
+"""
+SQL query builder with SQL injection vulnerability.
+Demonstrates dangerous string formatting in SQL queries.
+"""
+
+from typing import List, Optional
+
+
+class UserDatabase:
+ """Simple database interface (mock)."""
+
+ def execute(self, query: str) -> List[dict]:
+ """Mock execute - just returns the query for inspection."""
+ print(f"Executing: {query}")
+ return []
+
+
+class QueryBuilder:
+ """Build SQL queries for user operations."""
+
+ def __init__(self, db: UserDatabase):
+ self.db = db
+
+ def get_user_by_name(self, username: str) -> Optional[dict]:
+ """
+ Get user by username.
+
+ BUG: SQL INJECTION VULNERABILITY!
+ Using string formatting with user input allows SQL injection.
+ """
+ # DANGEROUS: Never use f-strings or % formatting with user input!
+ query = f"SELECT * FROM users WHERE username = '{username}'"
+ results = self.db.execute(query)
+ return results[0] if results else None
+
+ def get_user_by_name_safe(self, username: str) -> Optional[dict]:
+ """Safe version using parameterized queries."""
+ # Use parameterized queries (this is pseudo-code for the concept)
+ query = "SELECT * FROM users WHERE username = ?"
+ # In real code: self.db.execute(query, (username,))
+ print(f"Safe query with parameter: {query}, params: ({username},)")
+ return None
+
+ def search_users(self, search_term: str, limit: int = 10) -> List[dict]:
+ """
+ Search users by term.
+
+ BUG: SQL INJECTION through LIKE clause!
+ """
+ # DANGEROUS: User input directly in LIKE clause
+ query = f"SELECT * FROM users WHERE name LIKE '%{search_term}%' LIMIT {limit}"
+ return self.db.execute(query)
+
+ def delete_user(self, user_id: str) -> bool:
+ """
+ Delete a user.
+
+ BUG: SQL INJECTION in DELETE statement!
+ This is especially dangerous as it can lead to data loss.
+ """
+ # DANGEROUS: Unvalidated user input in DELETE
+ query = f"DELETE FROM users WHERE id = {user_id}"
+ self.db.execute(query)
+ return True
+
+ def get_users_by_role(self, role: str, order_by: str = "name") -> List[dict]:
+ """
+ Get users by role.
+
+ BUG: SQL INJECTION in ORDER BY clause!
+ Even the ORDER BY clause can be exploited.
+ """
+ # DANGEROUS: User-controlled ORDER BY
+ query = f"SELECT * FROM users WHERE role = '{role}' ORDER BY {order_by}"
+ return self.db.execute(query)
+
+
+if __name__ == "__main__":
+ db = UserDatabase()
+ qb = QueryBuilder(db)
+
+ print("=== Demonstrating SQL Injection Vulnerabilities ===\n")
+
+ # Example 1: Basic injection
+ print("1. Basic username injection:")
+ qb.get_user_by_name("admin' OR '1'='1")
+ # Executes: SELECT * FROM users WHERE username = 'admin' OR '1'='1'
+ # Returns ALL users!
+
+ print("\n2. Search term injection:")
+ qb.search_users("test%' OR 1=1--")
+ # Can bypass the LIKE and return everything
+
+ print("\n3. DELETE injection:")
+ qb.delete_user("1 OR 1=1")
+ # Executes: DELETE FROM users WHERE id = 1 OR 1=1
+ # DELETES ALL USERS!
+
+ print("\n4. ORDER BY injection:")
+ qb.get_users_by_role("admin", "name; DROP TABLE users--")
+ # Can execute arbitrary SQL commands!
+
+ print("\n=== Safe Version ===")
+ qb.get_user_by_name_safe("admin' OR '1'='1")
+ # Parameters are properly escaped
\ No newline at end of file
diff --git a/tool_use/memory_demo/sample_code/web_scraper_v1.py b/tool_use/memory_demo/sample_code/web_scraper_v1.py
new file mode 100644
index 00000000..824a07cf
--- /dev/null
+++ b/tool_use/memory_demo/sample_code/web_scraper_v1.py
@@ -0,0 +1,84 @@
+"""
+Concurrent web scraper with a race condition bug.
+Multiple threads modify shared state without synchronization.
+"""
+
+import time
+from concurrent.futures import ThreadPoolExecutor, as_completed
+from typing import List, Dict, Any
+
+import requests
+
+
+class WebScraper:
+ """Web scraper that fetches multiple URLs concurrently."""
+
+ def __init__(self, max_workers: int = 10):
+ self.max_workers = max_workers
+ self.results = [] # BUG: Shared mutable state accessed by multiple threads!
+ self.failed_urls = [] # BUG: Another race condition!
+
+ def fetch_url(self, url: str) -> Dict[str, Any]:
+ """Fetch a single URL and return the result."""
+ try:
+ response = requests.get(url, timeout=5)
+ response.raise_for_status()
+ return {
+ "url": url,
+ "status": response.status_code,
+ "content_length": len(response.content),
+ }
+ except requests.exceptions.RequestException as e:
+ return {"url": url, "error": str(e)}
+
+ def scrape_urls(self, urls: List[str]) -> List[Dict[str, Any]]:
+ """
+ Scrape multiple URLs concurrently.
+
+ BUG: self.results is accessed from multiple threads without locking!
+ This causes race conditions where results can be lost or corrupted.
+ """
+ with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
+ futures = [executor.submit(self.fetch_url, url) for url in urls]
+
+ for future in as_completed(futures):
+ result = future.result()
+
+ # RACE CONDITION: Multiple threads append to self.results simultaneously
+ if "error" in result:
+ self.failed_urls.append(result["url"]) # RACE CONDITION
+ else:
+ self.results.append(result) # RACE CONDITION
+
+ return self.results
+
+ def get_stats(self) -> Dict[str, int]:
+ """Get scraping statistics."""
+ return {
+ "total_results": len(self.results),
+ "failed_urls": len(self.failed_urls),
+ "success_rate": (
+ len(self.results) / (len(self.results) + len(self.failed_urls))
+ if (len(self.results) + len(self.failed_urls)) > 0
+ else 0
+ ),
+ }
+
+
+if __name__ == "__main__":
+ # Test with multiple URLs
+ urls = [
+ "https://httpbin.org/delay/1",
+ "https://httpbin.org/status/200",
+ "https://httpbin.org/status/404",
+ "https://httpbin.org/delay/2",
+ "https://httpbin.org/status/500",
+ ] * 10 # 50 URLs total to increase race condition probability
+
+ scraper = WebScraper(max_workers=10)
+ results = scraper.scrape_urls(urls)
+
+    print("Expected: 30 successful results (the 404/500 URLs fail)")
+ print(f"Got: {len(results)} results")
+ print(f"Stats: {scraper.get_stats()}")
+ print("\nNote: Results count may be less than expected due to race condition!")
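+
+# Illustrative fix (a sketch only - the bug above is left in on purpose):
+# create a threading.Lock in __init__ and guard every update to shared
+# state, e.g.
+#
+#     self.lock = threading.Lock()           # in __init__
+#
+#     with self.lock:                        # in scrape_urls
+#         if "error" in result:
+#             self.failed_urls.append(result["url"])
+#         else:
+#             self.results.append(result)
+#
+# Alternatively, collect the futures' results into a local list and assign
+# it once after the executor exits, avoiding shared mutable state entirely.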
diff --git a/tool_use/memory_tool.py b/tool_use/memory_tool.py
new file mode 100644
index 00000000..43f5c770
--- /dev/null
+++ b/tool_use/memory_tool.py
@@ -0,0 +1,366 @@
+"""
+Production-ready memory tool handler for Claude's memory_20250818 tool.
+
+This implementation provides secure, client-side execution of memory operations
+with path validation, error handling, and comprehensive security measures.
+"""
+
+import shutil
+from pathlib import Path
+from typing import Any
+
+
+class MemoryToolHandler:
+ """
+ Handles execution of Claude's memory tool commands.
+
+ The memory tool enables Claude to read, write, and manage files in a memory
+ system through a standardized tool interface. This handler provides client-side
+ implementation with security controls.
+
+ Attributes:
+ base_path: Root directory for memory storage
+ memory_root: The /memories directory within base_path
+ """
+
+ def __init__(self, base_path: str = "./memory_storage"):
+ """
+ Initialize the memory tool handler.
+
+ Args:
+ base_path: Root directory for all memory operations
+ """
+ self.base_path = Path(base_path).resolve()
+ self.memory_root = self.base_path / "memories"
+ self.memory_root.mkdir(parents=True, exist_ok=True)
+
+ def _validate_path(self, path: str) -> Path:
+ """
+ Validate and resolve memory paths to prevent directory traversal attacks.
+
+ Args:
+ path: The path to validate (must start with /memories)
+
+ Returns:
+ Resolved absolute Path object within memory_root
+
+ Raises:
+ ValueError: If path is invalid or attempts to escape memory directory
+ """
+ if not path.startswith("/memories"):
+ raise ValueError(
+ f"Path must start with /memories, got: {path}. "
+ "All memory operations must be confined to the /memories directory."
+ )
+
+ # Remove /memories prefix and any leading slashes
+ relative_path = path[len("/memories") :].lstrip("/")
+
+ # Resolve to absolute path within memory_root
+ if relative_path:
+ full_path = (self.memory_root / relative_path).resolve()
+ else:
+ full_path = self.memory_root.resolve()
+
+ # Verify the resolved path is still within memory_root
+ try:
+ full_path.relative_to(self.memory_root.resolve())
+ except ValueError as e:
+ raise ValueError(
+ f"Path '{path}' would escape /memories directory. "
+ "Directory traversal attempts are not allowed."
+ ) from e
+
+ return full_path
+
+ def execute(self, **params: Any) -> dict[str, str]:
+ """
+ Execute a memory tool command.
+
+ Args:
+ **params: Command parameters from Claude's tool use
+
+ Returns:
+ Dict with either 'success' or 'error' key
+
+ Supported commands:
+ - view: Show directory contents or file contents
+ - create: Create or overwrite a file
+ - str_replace: Replace text in a file
+ - insert: Insert text at a specific line
+ - delete: Delete a file or directory
+ - rename: Rename or move a file/directory
+ """
+ command = params.get("command")
+
+ try:
+ if command == "view":
+ return self._view(params)
+ elif command == "create":
+ return self._create(params)
+ elif command == "str_replace":
+ return self._str_replace(params)
+ elif command == "insert":
+ return self._insert(params)
+ elif command == "delete":
+ return self._delete(params)
+ elif command == "rename":
+ return self._rename(params)
+ else:
+ return {
+ "error": f"Unknown command: '{command}'. "
+ "Valid commands are: view, create, str_replace, insert, delete, rename"
+ }
+ except ValueError as e:
+ return {"error": str(e)}
+ except Exception as e:
+ return {"error": f"Unexpected error executing {command}: {e}"}
+
+ def _view(self, params: dict[str, Any]) -> dict[str, str]:
+ """View directory contents or file contents."""
+ path = params.get("path")
+ view_range = params.get("view_range")
+
+ if not path:
+ return {"error": "Missing required parameter: path"}
+
+ full_path = self._validate_path(path)
+
+ # Handle directory listing
+ if full_path.is_dir():
+ try:
+ items = []
+ for item in sorted(full_path.iterdir()):
+ if item.name.startswith("."):
+ continue
+ items.append(f"{item.name}/" if item.is_dir() else item.name)
+
+ if not items:
+ return {"success": f"Directory: {path}\n(empty)"}
+
+ return {
+ "success": f"Directory: {path}\n"
+ + "\n".join([f"- {item}" for item in items])
+ }
+ except Exception as e:
+ return {"error": f"Cannot read directory {path}: {e}"}
+
+ # Handle file reading
+ elif full_path.is_file():
+ try:
+ content = full_path.read_text(encoding="utf-8")
+ lines = content.splitlines()
+
+ # Apply view range if specified
+ if view_range:
+ start_line = max(1, view_range[0]) - 1 # Convert to 0-indexed
+ end_line = len(lines) if view_range[1] == -1 else view_range[1]
+ lines = lines[start_line:end_line]
+ start_num = start_line + 1
+ else:
+ start_num = 1
+
+ # Format with line numbers
+ numbered_lines = [
+ f"{i + start_num:4d}: {line}" for i, line in enumerate(lines)
+ ]
+ return {"success": "\n".join(numbered_lines)}
+
+ except UnicodeDecodeError:
+ return {"error": f"Cannot read {path}: File is not valid UTF-8 text"}
+ except Exception as e:
+ return {"error": f"Cannot read file {path}: {e}"}
+
+ else:
+ return {"error": f"Path not found: {path}"}
+
+ def _create(self, params: dict[str, Any]) -> dict[str, str]:
+ """Create or overwrite a file."""
+ path = params.get("path")
+ file_text = params.get("file_text", "")
+
+ if not path:
+ return {"error": "Missing required parameter: path"}
+
+ full_path = self._validate_path(path)
+
+        # Restrict creation to known text-file extensions (this also
+        # prevents creating bare directories through this command)
+ if not path.endswith((".txt", ".md", ".json", ".py", ".yaml", ".yml")):
+ return {
+ "error": f"Cannot create {path}: Only text files are supported. "
+ "Use file extensions: .txt, .md, .json, .py, .yaml, .yml"
+ }
+
+ try:
+ # Create parent directories if needed
+ full_path.parent.mkdir(parents=True, exist_ok=True)
+
+ # Write the file
+ full_path.write_text(file_text, encoding="utf-8")
+ return {"success": f"File created successfully at {path}"}
+
+ except Exception as e:
+ return {"error": f"Cannot create file {path}: {e}"}
+
+ def _str_replace(self, params: dict[str, Any]) -> dict[str, str]:
+ """Replace text in a file."""
+ path = params.get("path")
+ old_str = params.get("old_str")
+ new_str = params.get("new_str", "")
+
+ if not path or old_str is None:
+ return {"error": "Missing required parameters: path, old_str"}
+
+ full_path = self._validate_path(path)
+
+ if not full_path.is_file():
+ return {"error": f"File not found: {path}"}
+
+ try:
+ content = full_path.read_text(encoding="utf-8")
+
+ # Check if old_str exists
+ count = content.count(old_str)
+ if count == 0:
+ return {
+ "error": f"String not found in {path}. "
+ "The exact text must exist in the file."
+ }
+ elif count > 1:
+ return {
+ "error": f"String appears {count} times in {path}. "
+ "The string must be unique. Use more specific context."
+ }
+
+ # Perform replacement
+ new_content = content.replace(old_str, new_str, 1)
+ full_path.write_text(new_content, encoding="utf-8")
+
+ return {"success": f"File {path} has been edited successfully"}
+
+ except Exception as e:
+ return {"error": f"Cannot edit file {path}: {e}"}
+
+ def _insert(self, params: dict[str, Any]) -> dict[str, str]:
+ """Insert text at a specific line."""
+ path = params.get("path")
+ insert_line = params.get("insert_line")
+ insert_text = params.get("insert_text", "")
+
+ if not path or insert_line is None:
+ return {"error": "Missing required parameters: path, insert_line"}
+
+ full_path = self._validate_path(path)
+
+ if not full_path.is_file():
+ return {"error": f"File not found: {path}"}
+
+ try:
+ lines = full_path.read_text(encoding="utf-8").splitlines()
+
+ # Validate insert_line
+ if insert_line < 0 or insert_line > len(lines):
+ return {
+ "error": f"Invalid insert_line {insert_line}. "
+ f"Must be between 0 and {len(lines)}"
+ }
+
+ # Insert the text
+ lines.insert(insert_line, insert_text.rstrip("\n"))
+
+ # Write back
+ full_path.write_text("\n".join(lines) + "\n", encoding="utf-8")
+
+ return {"success": f"Text inserted at line {insert_line} in {path}"}
+
+ except Exception as e:
+ return {"error": f"Cannot insert into {path}: {e}"}
+
+ def _delete(self, params: dict[str, Any]) -> dict[str, str]:
+ """Delete a file or directory."""
+ path = params.get("path")
+
+ if not path:
+ return {"error": "Missing required parameter: path"}
+
+ # Prevent deletion of root memories directory
+ if path == "/memories":
+ return {"error": "Cannot delete the /memories directory itself"}
+
+ full_path = self._validate_path(path)
+
+ # Verify the path is within /memories to prevent accidental deletion outside the memory directory
+ # This provides an additional safety check beyond _validate_path
+ try:
+ full_path.relative_to(self.memory_root.resolve())
+ except ValueError:
+ return {
+ "error": f"Invalid operation: Path '{path}' is not within /memories directory. "
+ "Only paths within /memories can be deleted."
+ }
+
+ if not full_path.exists():
+ return {"error": f"Path not found: {path}"}
+
+ try:
+ if full_path.is_file():
+ full_path.unlink()
+ return {"success": f"File deleted: {path}"}
+            elif full_path.is_dir():
+                shutil.rmtree(full_path)
+                return {"success": f"Directory deleted: {path}"}
+            else:
+                return {"error": f"Cannot delete {path}: not a regular file or directory"}
+
+ except Exception as e:
+ return {"error": f"Cannot delete {path}: {e}"}
+
+ def _rename(self, params: dict[str, Any]) -> dict[str, str]:
+ """Rename or move a file/directory."""
+ old_path = params.get("old_path")
+ new_path = params.get("new_path")
+
+ if not old_path or not new_path:
+ return {"error": "Missing required parameters: old_path, new_path"}
+
+ old_full_path = self._validate_path(old_path)
+ new_full_path = self._validate_path(new_path)
+
+ if not old_full_path.exists():
+ return {"error": f"Source path not found: {old_path}"}
+
+ if new_full_path.exists():
+ return {
+ "error": f"Destination already exists: {new_path}. "
+ "Cannot overwrite existing files/directories."
+ }
+
+ try:
+ # Create parent directories if needed
+ new_full_path.parent.mkdir(parents=True, exist_ok=True)
+
+ # Perform rename/move
+ old_full_path.rename(new_full_path)
+
+ return {"success": f"Renamed {old_path} to {new_path}"}
+
+ except Exception as e:
+ return {"error": f"Cannot rename {old_path} to {new_path}: {e}"}
+
+ def clear_all_memory(self) -> dict[str, str]:
+ """
+ Clear all memory files (useful for testing or starting fresh).
+
+ ⚠️ WARNING: This method is for demonstration and testing purposes only.
+ In production, you should carefully consider whether you need to delete
+ all memory files, as this will permanently remove all learned patterns
+ and stored knowledge. Consider using selective deletion instead.
+
+ Returns:
+ Dict with success message
+ """
+ try:
+ if self.memory_root.exists():
+ shutil.rmtree(self.memory_root)
+ self.memory_root.mkdir(parents=True, exist_ok=True)
+ return {"success": "All memory cleared successfully"}
+ except Exception as e:
+ return {"error": f"Cannot clear memory: {e}"}
diff --git a/tool_use/requirements.txt b/tool_use/requirements.txt
new file mode 100644
index 00000000..76d40390
--- /dev/null
+++ b/tool_use/requirements.txt
@@ -0,0 +1,3 @@
+anthropic>=0.18.0
+python-dotenv>=1.0.0
+ipykernel>=6.29.0 # For Jupyter in VSCode
\ No newline at end of file
diff --git a/tool_use/tests/test_memory_tool.py b/tool_use/tests/test_memory_tool.py
new file mode 100644
index 00000000..a9135043
--- /dev/null
+++ b/tool_use/tests/test_memory_tool.py
@@ -0,0 +1,426 @@
+"""
+Unit tests for the memory tool handler.
+
+Tests security validation, command execution, and error handling.
+"""
+
+import shutil
+import tempfile
+import unittest
+from pathlib import Path
+
+from memory_tool import MemoryToolHandler
+
+
+class TestMemoryToolHandler(unittest.TestCase):
+ """Test suite for MemoryToolHandler."""
+
+ def setUp(self):
+ """Create temporary directory for each test."""
+ self.test_dir = tempfile.mkdtemp()
+ self.handler = MemoryToolHandler(base_path=self.test_dir)
+
+ def tearDown(self):
+ """Clean up temporary directory after each test."""
+ shutil.rmtree(self.test_dir)
+
+ # Security Tests
+
+ def test_path_validation_requires_memories_prefix(self):
+ """Test that paths must start with /memories."""
+ result = self.handler.execute(command="view", path="/etc/passwd")
+ self.assertIn("error", result)
+ self.assertIn("must start with /memories", result["error"])
+
+ def test_path_validation_prevents_traversal_dotdot(self):
+ """Test that .. traversal is blocked."""
+ result = self.handler.execute(
+ command="view", path="/memories/../../../etc/passwd"
+ )
+ self.assertIn("error", result)
+ self.assertIn("escape", result["error"].lower())
+
+    def test_path_validation_prevents_traversal_encoded(self):
+        """Test that URL-encoded traversal cannot escape /memories."""
+        result = self.handler.execute(
+            command="view", path="/memories/%2e%2e/%2e%2e/etc/passwd"
+        )
+        # The handler does not URL-decode, so "%2e%2e" is treated as a
+        # literal (nonexistent) directory name and the lookup fails safely
+        self.assertIn("error", result)
+
+ def test_path_validation_allows_valid_paths(self):
+ """Test that valid memory paths are accepted."""
+ result = self.handler.execute(
+ command="create", path="/memories/test.txt", file_text="test"
+ )
+ self.assertIn("success", result)
+
+ # View Command Tests
+
+ def test_view_empty_directory(self):
+ """Test viewing an empty /memories directory."""
+ result = self.handler.execute(command="view", path="/memories")
+ self.assertIn("success", result)
+ self.assertIn("empty", result["success"].lower())
+
+ def test_view_directory_with_files(self):
+ """Test viewing a directory with files."""
+ # Create some test files
+ self.handler.execute(
+ command="create", path="/memories/file1.txt", file_text="content1"
+ )
+ self.handler.execute(
+ command="create", path="/memories/file2.txt", file_text="content2"
+ )
+
+ result = self.handler.execute(command="view", path="/memories")
+ self.assertIn("success", result)
+ self.assertIn("file1.txt", result["success"])
+ self.assertIn("file2.txt", result["success"])
+
+ def test_view_file_with_line_numbers(self):
+ """Test viewing a file with line numbers."""
+ content = "line 1\nline 2\nline 3"
+ self.handler.execute(
+ command="create", path="/memories/test.txt", file_text=content
+ )
+
+ result = self.handler.execute(command="view", path="/memories/test.txt")
+ self.assertIn("success", result)
+ self.assertIn(" 1: line 1", result["success"])
+ self.assertIn(" 2: line 2", result["success"])
+ self.assertIn(" 3: line 3", result["success"])
+
+ def test_view_file_with_range(self):
+ """Test viewing specific line range."""
+ content = "line 1\nline 2\nline 3\nline 4"
+ self.handler.execute(
+ command="create", path="/memories/test.txt", file_text=content
+ )
+
+ result = self.handler.execute(
+ command="view", path="/memories/test.txt", view_range=[2, 3]
+ )
+ self.assertIn("success", result)
+ self.assertIn(" 2: line 2", result["success"])
+ self.assertIn(" 3: line 3", result["success"])
+ self.assertNotIn("line 1", result["success"])
+ self.assertNotIn("line 4", result["success"])
+
+ def test_view_nonexistent_path(self):
+ """Test viewing a nonexistent path."""
+ result = self.handler.execute(command="view", path="/memories/notfound.txt")
+ self.assertIn("error", result)
+ self.assertIn("not found", result["error"].lower())
+
+ # Create Command Tests
+
+ def test_create_file(self):
+ """Test creating a file."""
+ result = self.handler.execute(
+ command="create", path="/memories/test.txt", file_text="Hello, World!"
+ )
+ self.assertIn("success", result)
+
+ # Verify file exists
+ file_path = Path(self.test_dir) / "memories" / "test.txt"
+ self.assertTrue(file_path.exists())
+ self.assertEqual(file_path.read_text(), "Hello, World!")
+
+ def test_create_file_in_subdirectory(self):
+ """Test creating a file in a subdirectory (auto-creates dirs)."""
+ result = self.handler.execute(
+ command="create",
+ path="/memories/subdir/test.txt",
+ file_text="Nested content",
+ )
+ self.assertIn("success", result)
+
+ file_path = Path(self.test_dir) / "memories" / "subdir" / "test.txt"
+ self.assertTrue(file_path.exists())
+
+ def test_create_requires_file_extension(self):
+ """Test that create only allows text file extensions."""
+ result = self.handler.execute(
+ command="create", path="/memories/noext", file_text="content"
+ )
+ self.assertIn("error", result)
+ self.assertIn("text files are supported", result["error"])
+
+ def test_create_overwrites_existing_file(self):
+ """Test that create overwrites existing files."""
+ self.handler.execute(
+ command="create", path="/memories/test.txt", file_text="original"
+ )
+ result = self.handler.execute(
+ command="create", path="/memories/test.txt", file_text="updated"
+ )
+ self.assertIn("success", result)
+
+ file_path = Path(self.test_dir) / "memories" / "test.txt"
+ self.assertEqual(file_path.read_text(), "updated")
+
+ # String Replace Command Tests
+
+ def test_str_replace_success(self):
+ """Test successful string replacement."""
+ self.handler.execute(
+ command="create",
+ path="/memories/test.txt",
+ file_text="Hello World",
+ )
+
+ result = self.handler.execute(
+ command="str_replace",
+ path="/memories/test.txt",
+ old_str="World",
+ new_str="Universe",
+ )
+ self.assertIn("success", result)
+
+ file_path = Path(self.test_dir) / "memories" / "test.txt"
+ self.assertEqual(file_path.read_text(), "Hello Universe")
+
+ def test_str_replace_string_not_found(self):
+ """Test replacement when string doesn't exist."""
+ self.handler.execute(
+ command="create", path="/memories/test.txt", file_text="Hello World"
+ )
+
+ result = self.handler.execute(
+ command="str_replace",
+ path="/memories/test.txt",
+ old_str="Missing",
+ new_str="Text",
+ )
+ self.assertIn("error", result)
+ self.assertIn("not found", result["error"].lower())
+
+ def test_str_replace_multiple_occurrences(self):
+ """Test that replacement fails with multiple occurrences."""
+ self.handler.execute(
+ command="create",
+ path="/memories/test.txt",
+ file_text="Hello World Hello World",
+ )
+
+ result = self.handler.execute(
+ command="str_replace",
+ path="/memories/test.txt",
+ old_str="Hello",
+ new_str="Hi",
+ )
+ self.assertIn("error", result)
+ self.assertIn("appears 2 times", result["error"])
+
+ def test_str_replace_file_not_found(self):
+ """Test replacement on nonexistent file."""
+ result = self.handler.execute(
+ command="str_replace",
+ path="/memories/notfound.txt",
+ old_str="old",
+ new_str="new",
+ )
+ self.assertIn("error", result)
+ self.assertIn("not found", result["error"].lower())
+
+ # Insert Command Tests
+
+ def test_insert_at_beginning(self):
+ """Test inserting at line 0 (beginning)."""
+ self.handler.execute(
+ command="create", path="/memories/test.txt", file_text="line 1\nline 2"
+ )
+
+ result = self.handler.execute(
+ command="insert",
+ path="/memories/test.txt",
+ insert_line=0,
+ insert_text="new line",
+ )
+ self.assertIn("success", result)
+
+ file_path = Path(self.test_dir) / "memories" / "test.txt"
+ self.assertEqual(file_path.read_text(), "new line\nline 1\nline 2\n")
+
+ def test_insert_in_middle(self):
+ """Test inserting in the middle."""
+ self.handler.execute(
+ command="create", path="/memories/test.txt", file_text="line 1\nline 2"
+ )
+
+ result = self.handler.execute(
+ command="insert",
+ path="/memories/test.txt",
+ insert_line=1,
+ insert_text="inserted",
+ )
+ self.assertIn("success", result)
+
+ file_path = Path(self.test_dir) / "memories" / "test.txt"
+ self.assertEqual(file_path.read_text(), "line 1\ninserted\nline 2\n")
+
+ def test_insert_at_end(self):
+ """Test inserting at the end."""
+ self.handler.execute(
+ command="create", path="/memories/test.txt", file_text="line 1\nline 2"
+ )
+
+ result = self.handler.execute(
+ command="insert",
+ path="/memories/test.txt",
+ insert_line=2,
+ insert_text="last line",
+ )
+ self.assertIn("success", result)
+
+ # Verify the appended line landed in the file (mirrors the other insert tests)
+ file_path = Path(self.test_dir) / "memories" / "test.txt"
+ self.assertIn("last line", file_path.read_text())
+
+ def test_insert_invalid_line(self):
+ """Test insert with invalid line number."""
+ self.handler.execute(
+ command="create", path="/memories/test.txt", file_text="line 1"
+ )
+
+ result = self.handler.execute(
+ command="insert",
+ path="/memories/test.txt",
+ insert_line=99,
+ insert_text="text",
+ )
+ self.assertIn("error", result)
+ self.assertIn("invalid", result["error"].lower())
+
+ # Delete Command Tests
+
+ def test_delete_file(self):
+ """Test deleting a file."""
+ self.handler.execute(
+ command="create", path="/memories/test.txt", file_text="content"
+ )
+
+ result = self.handler.execute(command="delete", path="/memories/test.txt")
+ self.assertIn("success", result)
+
+ file_path = Path(self.test_dir) / "memories" / "test.txt"
+ self.assertFalse(file_path.exists())
+
+ def test_delete_directory(self):
+ """Test deleting a directory."""
+ self.handler.execute(
+ command="create", path="/memories/subdir/test.txt", file_text="content"
+ )
+
+ result = self.handler.execute(command="delete", path="/memories/subdir")
+ self.assertIn("success", result)
+
+ dir_path = Path(self.test_dir) / "memories" / "subdir"
+ self.assertFalse(dir_path.exists())
+
+ def test_delete_cannot_delete_root(self):
+ """Test that root /memories directory cannot be deleted."""
+ result = self.handler.execute(command="delete", path="/memories")
+ self.assertIn("error", result)
+ self.assertIn("cannot delete", result["error"].lower())
+
+ def test_delete_nonexistent_path(self):
+ """Test deleting a nonexistent path."""
+ result = self.handler.execute(command="delete", path="/memories/notfound.txt")
+ self.assertIn("error", result)
+ self.assertIn("not found", result["error"].lower())
+
+ # Rename Command Tests
+
+ def test_rename_file(self):
+ """Test renaming a file."""
+ self.handler.execute(
+ command="create", path="/memories/old.txt", file_text="content"
+ )
+
+ result = self.handler.execute(
+ command="rename", old_path="/memories/old.txt", new_path="/memories/new.txt"
+ )
+ self.assertIn("success", result)
+
+ old_path = Path(self.test_dir) / "memories" / "old.txt"
+ new_path = Path(self.test_dir) / "memories" / "new.txt"
+ self.assertFalse(old_path.exists())
+ self.assertTrue(new_path.exists())
+
+ def test_rename_to_subdirectory(self):
+ """Test moving a file to a subdirectory."""
+ self.handler.execute(
+ command="create", path="/memories/file.txt", file_text="content"
+ )
+
+ result = self.handler.execute(
+ command="rename",
+ old_path="/memories/file.txt",
+ new_path="/memories/subdir/file.txt",
+ )
+ self.assertIn("success", result)
+
+ new_path = Path(self.test_dir) / "memories" / "subdir" / "file.txt"
+ self.assertTrue(new_path.exists())
+
+ def test_rename_source_not_found(self):
+ """Test rename when source doesn't exist."""
+ result = self.handler.execute(
+ command="rename",
+ old_path="/memories/notfound.txt",
+ new_path="/memories/new.txt",
+ )
+ self.assertIn("error", result)
+ self.assertIn("not found", result["error"].lower())
+
+ def test_rename_destination_exists(self):
+ """Test rename when destination already exists."""
+ self.handler.execute(
+ command="create", path="/memories/file1.txt", file_text="content1"
+ )
+ self.handler.execute(
+ command="create", path="/memories/file2.txt", file_text="content2"
+ )
+
+ result = self.handler.execute(
+ command="rename",
+ old_path="/memories/file1.txt",
+ new_path="/memories/file2.txt",
+ )
+ self.assertIn("error", result)
+ self.assertIn("already exists", result["error"].lower())
+
+ # Error Handling Tests
+
+ def test_unknown_command(self):
+ """Test handling of unknown command."""
+ result = self.handler.execute(command="invalid", path="/memories")
+ self.assertIn("error", result)
+ self.assertIn("unknown command", result["error"].lower())
+
+ def test_missing_required_parameters(self):
+ """Test error handling for missing parameters."""
+ result = self.handler.execute(command="view")
+ self.assertIn("error", result)
+
+ # Utility Tests
+
+ def test_clear_all_memory(self):
+ """Test clearing all memory."""
+ # Create some files
+ self.handler.execute(
+ command="create", path="/memories/file1.txt", file_text="content1"
+ )
+ self.handler.execute(
+ command="create", path="/memories/file2.txt", file_text="content2"
+ )
+
+ result = self.handler.clear_all_memory()
+ self.assertIn("success", result)
+
+ # Verify directory exists but is empty
+ memory_root = Path(self.test_dir) / "memories"
+ self.assertTrue(memory_root.exists())
+ self.assertEqual(len(list(memory_root.iterdir())), 0)
+
+
+if __name__ == "__main__":
+ unittest.main()