Skip to content

Commit c08e4fd

Browse files
authored
Merge pull request #6 from redis-developer/add-cli
Add a CLI, fix compaction
2 parents f0104db + fe89c28 commit c08e4fd

File tree

12 files changed

+520
-1646
lines changed

12 files changed

+520
-1646
lines changed

README.md

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -228,10 +228,10 @@ First, you'll need to download this repository. After you've downloaded it, you
228228
#### MCP Server
229229
The MCP server can run in either SSE mode or stdio:
230230
```bash
231-
python -m agent_memory_server.mcp <sse|stdio>
231+
agent-memory mcp --mode <sse|stdio>
232232
```
233233

234-
**NOTE:** With uv, just prefix the command with `uv`, e.g.: `uv run python -m agent_memory_server.mcp sse`.
234+
**NOTE:** With uv, prefix the command with `uv`, e.g.: `uv run agent-memory mcp --mode sse`. If you installed from source, you'll probably need to add `--directory` to tell uv where to find the code: `uv run --directory <path/to/checkout> agent-memory mcp --mode stdio`.
235235

236236
### Docker Compose
237237

@@ -332,12 +332,12 @@ uv sync --all-extras
332332

333333
3. Run the API server:
334334
```bash
335-
python -m agent_memory_server.main
335+
agent-memory api
336336
```
337337

338-
4. In a separate terminal, run the MCP server (use either the "stdio" or "sse" options to set the running mode):
338+
4. In a separate terminal, run the MCP server (use either the "stdio" or "sse" options to set the running mode) if you want to test with tools like Cursor or Claude:
339339
```bash
340-
python -m agent_memory_server.mcp [stdio|sse]
340+
agent-memory mcp --mode <stdio|sse>
341341
```
342342

343343
### Running Tests
@@ -374,7 +374,12 @@ The memory compaction functionality optimizes storage by merging duplicate and s
374374

375375
### Running Compaction
376376

377-
Currently, memory compaction is only available as a function in `agent_memory_server.long_term_memory.compact_long_term_memories`. You can run it manually or trigger it (manually, via code) to run as a background task.
377+
Memory compaction is available as a task function in `agent_memory_server.long_term_memory.compact_long_term_memories`. You can trigger it manually
378+
by running the `agent-memory schedule-task` command:
379+
380+
```bash
381+
agent-memory schedule-task "agent_memory_server.long_term_memory.compact_long_term_memories"
382+
```
378383

379384
### Key Features
380385

agent_memory_server/cli.py

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
import datetime
77
import importlib
8+
import logging
89
import sys
910

1011
import click
@@ -39,17 +40,8 @@ def version():
3940
@click.option("--reload", is_flag=True, help="Enable auto-reload")
4041
def api(port: int, host: str, reload: bool):
4142
"""Run the REST API server."""
42-
import asyncio
43-
4443
from agent_memory_server.main import app, on_start_logger
4544

46-
async def setup_redis():
47-
redis = await get_redis_conn()
48-
await ensure_search_index_exists(redis)
49-
50-
# Run the async setup
51-
asyncio.run(setup_redis())
52-
5345
on_start_logger(port)
5446
uvicorn.run(
5547
app,
@@ -61,8 +53,13 @@ async def setup_redis():
6153

6254
@cli.command()
6355
@click.option("--port", default=settings.mcp_port, help="Port to run the MCP server on")
64-
@click.option("--sse", is_flag=True, help="Run the MCP server in SSE mode")
65-
def mcp(port: int, sse: bool):
56+
@click.option(
57+
"--mode",
58+
default="stdio",
59+
help="Run the MCP server in SSE or stdio mode",
60+
type=click.Choice(["stdio", "sse"]),
61+
)
62+
def mcp(port: int, mode: str):
6663
"""Run the MCP server."""
6764
import asyncio
6865

@@ -73,25 +70,28 @@ def mcp(port: int, sse: bool):
7370
from agent_memory_server.mcp import mcp_app
7471

7572
async def setup_and_run():
76-
redis = await get_redis_conn()
77-
await ensure_search_index_exists(redis)
73+
# Redis setup is handled by the MCP app before it starts
7874

7975
# Run the MCP server
80-
if sse:
76+
if mode == "sse":
77+
logger.info(f"Starting MCP server on port {port}\n")
8178
await mcp_app.run_sse_async()
82-
else:
79+
elif mode == "stdio":
80+
# Try to force all logging to stderr because stdio-mode MCP servers
81+
# use standard output for the protocol.
82+
logging.basicConfig(
83+
level=settings.log_level,
84+
stream=sys.stderr,
85+
force=True, # remove any existing handlers
86+
format="%(asctime)s %(name)s %(levelname)s %(message)s",
87+
)
8388
await mcp_app.run_stdio_async()
89+
else:
90+
raise ValueError(f"Invalid mode: {mode}")
8491

8592
# Update the port in settings
8693
settings.mcp_port = port
8794

88-
click.echo(f"Starting MCP server on port {port}")
89-
90-
if sse:
91-
click.echo("Running in SSE mode")
92-
else:
93-
click.echo("Running in stdio mode")
94-
9595
asyncio.run(setup_and_run())
9696

9797

agent_memory_server/config.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import os
2+
from typing import Literal
23

34
from dotenv import load_dotenv
45
from pydantic_settings import BaseSettings
@@ -34,5 +35,8 @@ class Settings(BaseSettings):
3435
docket_name: str = "memory-server"
3536
use_docket: bool = True
3637

38+
# Other Application settings
39+
log_level: Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] = "INFO"
40+
3741

3842
settings = Settings()

agent_memory_server/logging.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
1+
import logging
2+
import sys
3+
14
import structlog
25

6+
from agent_memory_server.config import settings
7+
38

49
_configured = False
510

@@ -10,14 +15,25 @@ def configure_logging():
1015
if _configured:
1116
return
1217

18+
# Configure standard library logging based on settings.log_level
19+
level = getattr(logging, settings.log_level.upper(), logging.INFO)
20+
handler = logging.StreamHandler(sys.stdout)
21+
handler.setLevel(level)
22+
logging.basicConfig(level=level, handlers=[handler], format="%(message)s")
23+
24+
# Configure structlog with processors honoring the log level and structured output
1325
structlog.configure(
1426
processors=[
15-
structlog.processors.TimeStamper(fmt="iso"),
27+
structlog.stdlib.filter_by_level,
28+
structlog.stdlib.add_logger_name,
1629
structlog.stdlib.add_log_level,
30+
structlog.processors.TimeStamper(fmt="iso"),
31+
structlog.processors.format_exc_info,
1732
structlog.processors.JSONRenderer(),
1833
],
1934
wrapper_class=structlog.stdlib.BoundLogger,
2035
logger_factory=structlog.stdlib.LoggerFactory(),
36+
cache_logger_on_first_use=True,
2137
)
2238
_configured = True
2339

agent_memory_server/long_term_memory.py

Lines changed: 47 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -253,17 +253,6 @@ async def compact_long_term_memories(
253253
f"semantic_duplicates={compact_semantic_duplicates}"
254254
)
255255

256-
# Get all memory keys using scan
257-
memory_keys = []
258-
pattern = "memory:*"
259-
# Scan for memory keys
260-
cursor = 0
261-
while True:
262-
cursor, keys = await redis_client.scan(cursor, match=pattern, count=limit)
263-
memory_keys.extend(keys)
264-
if cursor == 0 or len(memory_keys) >= limit:
265-
break
266-
267256
# Build filters for memory queries
268257
filters = []
269258
if namespace:
@@ -372,21 +361,30 @@ async def compact_long_term_memories(
372361
if compact_semantic_duplicates:
373362
logger.info("Starting semantic duplicate compaction")
374363
try:
375-
# Check if the index exists before proceeding
364+
# Get the correct index name
376365
index_name = Keys.search_index_name()
366+
logger.info(
367+
f"Using index '{index_name}' for semantic duplicate compaction."
368+
)
369+
370+
# Check if the index exists before proceeding
377371
try:
378372
await redis_client.execute_command(f"FT.INFO {index_name}")
379373
except Exception as info_e:
380374
if "unknown index name" in str(info_e).lower():
381-
# Index doesn't exist, create it
382375
logger.info(f"Search index {index_name} doesn't exist, creating it")
383-
await ensure_search_index_exists(redis_client)
376+
# Ensure 'get_search_index' is called with the correct name to create it if needed
377+
await ensure_search_index_exists(
378+
redis_client, index_name=index_name
379+
)
384380
else:
385-
logger.warning(f"Error checking index: {info_e}")
381+
logger.warning(
382+
f"Error checking index '{index_name}': {info_e} - attempting to proceed."
383+
)
386384

387-
# Get all memories matching the filters
388-
index = get_search_index(redis_client)
389-
query_str = filter_str if filter_str != "*" else ""
385+
# Get all memories matching the filters, using the correct index name
386+
index = get_search_index(redis_client, index_name=index_name)
387+
query_str = filter_str if filter_str != "*" else "*"
390388

391389
# Create a query to get all memories
392390
q = Query(query_str).paging(0, limit)
@@ -509,10 +507,9 @@ async def compact_long_term_memories(
509507
if filter_expression:
510508
vector_query.set_filter(filter_expression)
511509

512-
# Execute the vector search
513-
similar_results = None
510+
# Execute the vector search using the AsyncSearchIndex
514511
try:
515-
similar_results = await index.search(vector_query)
512+
vector_search_result = await index.search(vector_query)
516513
except Exception as e:
517514
logger.error(
518515
f"Error in vector search for memory {memory_id}: {e}"
@@ -521,14 +518,14 @@ async def compact_long_term_memories(
521518

522519
# Filter out the current memory and already processed memories
523520
similar_memories = []
524-
if similar_results:
525-
for doc in similar_results.docs:
526-
similar_id = doc.id.replace("memory:", "")
527-
if (
528-
similar_id != memory_id
529-
and similar_id not in processed_ids
530-
):
531-
similar_memories.append(doc)
521+
for doc in getattr(vector_search_result, "docs", []):
522+
# Extract the ID field safely
523+
similar_id = safe_get(doc, "id_").replace("memory:", "")
524+
if (
525+
similar_id != memory_id
526+
and similar_id not in processed_ids
527+
):
528+
similar_memories.append(doc)
532529

533530
# If we found similar memories, merge them
534531
if similar_memories:
@@ -541,7 +538,7 @@ async def compact_long_term_memories(
541538
similar_memory_keys = []
542539

543540
for similar_memory in similar_memories:
544-
similar_id = similar_memory.id.replace(
541+
similar_id = similar_memory["id_"].replace(
545542
"memory:", ""
546543
)
547544
similar_key = Keys.memory_key(
@@ -552,30 +549,30 @@ async def compact_long_term_memories(
552549
# Get similar memory data with error handling
553550
similar_data = {}
554551
try:
555-
# Use pipeline for Redis operations - only await the execute() method
556-
pipeline = redis_client.pipeline()
557-
pipeline.hgetall(similar_key)
558-
# Execute the pipeline and await the result
559-
similar_data_raw = await pipeline.execute()
552+
similar_data_raw = await redis_client.hgetall(
553+
similar_key # type: ignore
554+
)
560555

561-
if similar_data_raw and similar_data_raw[0]:
556+
# hgetall returns a dict of field to value
557+
if similar_data_raw:
562558
# Convert from bytes to strings
563559
similar_data = {
564-
k.decode()
565-
if isinstance(k, bytes)
566-
else k: v.decode()
567-
if isinstance(v, bytes)
568-
and k != b"vector"
569-
else v
570-
for k, v in similar_data_raw[0].items()
560+
(
561+
k.decode()
562+
if isinstance(k, bytes)
563+
else k
564+
): (
565+
v.decode()
566+
if isinstance(v, bytes)
567+
else v
568+
)
569+
for k, v in similar_data_raw.items()
571570
}
572-
similar_memory_data_list.append(
573-
similar_data
574-
)
575-
similar_memory_keys.append(similar_key)
576-
processed_ids.add(
577-
similar_id
578-
) # Mark as processed
571+
similar_memory_data_list.append(similar_data)
572+
similar_memory_keys.append(similar_key)
573+
processed_ids.add(
574+
similar_id
575+
) # Mark as processed
579576
except Exception as e:
580577
logger.error(
581578
f"Error retrieving similar memory {similar_id}: {e}"

0 commit comments

Comments
 (0)