
Commit 2b623b1

Merge pull request #41 from redis/fix/hash-deduplication-issue-31
Fix hash-based deduplication FT.AGGREGATE query execution
2 parents: a2a4373 + 67d732c

21 files changed: +188 -304 lines

CLAUDE.md

Lines changed: 4 additions & 1 deletion
@@ -1,5 +1,9 @@
 # CLAUDE.md - Redis Agent Memory Server Project Context
 
+## Redis Version
+This project uses Redis 8, which is the redis:8 docker image.
+Do not use Redis Stack or other earlier versions of Redis.
+
 ## Frequently Used Commands
 Get started in a new environment by installing `uv`:
 ```bash
@@ -188,7 +192,6 @@ EMBEDDING_MODEL=text-embedding-3-small
 
 # Memory Configuration
 LONG_TERM_MEMORY=true
-WINDOW_SIZE=20
 ENABLE_TOPIC_EXTRACTION=true
 ENABLE_NER=true
 ```

agent-memory-client/agent_memory_client/client.py

Lines changed: 0 additions & 10 deletions
@@ -209,7 +209,6 @@ async def get_working_memory(
         session_id: str,
         user_id: str | None = None,
         namespace: str | None = None,
-        window_size: int | None = None,
         model_name: ModelNameLiteral | None = None,
         context_window_max: int | None = None,
     ) -> WorkingMemoryResponse:
@@ -220,7 +219,6 @@ async def get_working_memory(
             session_id: The session ID to retrieve working memory for
             user_id: The user ID to retrieve working memory for
             namespace: Optional namespace for the session
-            window_size: Optional number of messages to include
             model_name: Optional model name to determine context window size
             context_window_max: Optional direct specification of context window tokens
 
@@ -241,9 +239,6 @@
         elif self.config.default_namespace is not None:
             params["namespace"] = self.config.default_namespace
 
-        if window_size is not None:
-            params["window_size"] = str(window_size)
-
         # Use provided model_name or fall back to config default
         effective_model_name = model_name or self.config.default_model_name
         if effective_model_name is not None:
@@ -2139,7 +2134,6 @@ async def memory_prompt(
         query: str,
         session_id: str | None = None,
         namespace: str | None = None,
-        window_size: int | None = None,
         model_name: str | None = None,
         context_window_max: int | None = None,
         long_term_search: dict[str, Any] | None = None,
@@ -2154,7 +2148,6 @@ async def memory_prompt(
             query: The input text to find relevant context for
             session_id: Optional session ID to include session messages
             namespace: Optional namespace for the session
-            window_size: Optional number of messages to include
            model_name: Optional model name to determine context window size
            context_window_max: Optional direct specification of context window tokens
            long_term_search: Optional search parameters for long-term memory
@@ -2169,7 +2162,6 @@ async def memory_prompt(
            prompt = await client.memory_prompt(
                query="What are my UI preferences?",
                session_id="current_session",
-                window_size=10,
                long_term_search={
                    "topics": {"any": ["preferences", "ui"]},
                    "limit": 5
@@ -2190,8 +2182,6 @@ async def memory_prompt(
             session_params["namespace"] = namespace
         elif self.config.default_namespace is not None:
             session_params["namespace"] = self.config.default_namespace
-        if window_size is not None:
-            session_params["window_size"] = str(window_size)
         # Use provided model_name or fall back to config default
         effective_model_name = model_name or self.config.default_model_name
         if effective_model_name is not None:
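
For callers, truncation is now driven entirely by token budgets rather than message counts. A hypothetical migration sketch, assuming an already-constructed client instance (`client`) and that `"gpt-4o"` is an accepted model name; neither is established by this diff:

```python
# Before: window_size=10 limited the prompt to the 10 most recent messages.
# After: pass model info so the server can truncate by token count instead.
prompt = await client.memory_prompt(
    query="What are my UI preferences?",
    session_id="current_session",
    model_name="gpt-4o",  # or context_window_max=8000 to set the budget directly
    long_term_search={
        "topics": {"any": ["preferences", "ui"]},
        "limit": 5,
    },
)
```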

agent_memory_server/api.py

Lines changed: 24 additions & 31 deletions
@@ -582,19 +582,16 @@ async def memory_prompt(
     logger.debug(f"Memory prompt params: {params}")
 
     if params.session:
-        # Use token limit for memory prompt, fallback to message count for backward compatibility
+        # Use token limit for memory prompt - model info is required now
         if params.session.model_name or params.session.context_window_max:
             token_limit = _get_effective_token_limit(
                 model_name=params.session.model_name,
                 context_window_max=params.session.context_window_max,
             )
-            effective_window_size = (
-                token_limit  # We'll handle token-based truncation below
-            )
+            effective_token_limit = token_limit
         else:
-            effective_window_size = (
-                params.session.window_size
-            )  # Fallback to message count
+            # No model info provided - use all messages without truncation
+            effective_token_limit = None
         working_mem = await working_memory.get_working_memory(
             session_id=params.session.session_id,
             namespace=params.session.namespace,
@@ -616,46 +613,42 @@
             )
         )
         # Apply token-based truncation if model info is provided
-        if params.session.model_name or params.session.context_window_max:
+        if effective_token_limit is not None:
             # Token-based truncation
             if (
                 _calculate_messages_token_count(working_mem.messages)
-                > effective_window_size
+                > effective_token_limit
             ):
                 # Keep removing oldest messages until we're under the limit
                 recent_messages = working_mem.messages[:]
                 while len(recent_messages) > 1:  # Always keep at least 1 message
                     recent_messages = recent_messages[1:]  # Remove oldest
                     if (
                         _calculate_messages_token_count(recent_messages)
-                        <= effective_window_size
+                        <= effective_token_limit
                     ):
                         break
             else:
                 recent_messages = working_mem.messages
-
-            for msg in recent_messages:
-                if msg.role == "user":
-                    msg_class = base.UserMessage
-                else:
-                    msg_class = base.AssistantMessage
-                _messages.append(
-                    msg_class(
-                        content=TextContent(type="text", text=msg.content),
-                    )
-                )
         else:
-            # No token-based truncation - use all messages
-            for msg in working_mem.messages:
-                if msg.role == "user":
-                    msg_class = base.UserMessage
-                else:
-                    msg_class = base.AssistantMessage
-                _messages.append(
-                    msg_class(
-                        content=TextContent(type="text", text=msg.content),
-                    )
+            # No token limit provided - use all messages
+            recent_messages = working_mem.messages
+
+        for msg in recent_messages:
+            if msg.role == "user":
+                msg_class = base.UserMessage
+            elif msg.role == "assistant":
+                msg_class = base.AssistantMessage
+            else:
+                # For tool messages or other roles, treat as assistant for MCP compatibility
+                # since MCP base only supports UserMessage and AssistantMessage
+                msg_class = base.AssistantMessage
+
+            _messages.append(
+                msg_class(
+                    content=TextContent(type="text", text=msg.content),
                 )
+            )
 
     if params.long_term_search:
         logger.debug(
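
The truncation loop above drops the oldest messages one at a time until the remaining conversation fits the token budget, always keeping at least one message. A self-contained sketch of the same logic, where `count_tokens` is a crude stand-in for the server's `_calculate_messages_token_count` helper:

```python
# Sketch of the oldest-first, token-budget truncation used above.
# count_tokens is a stand-in, not the server's actual tokenizer.
def count_tokens(messages: list[str]) -> int:
    return sum(len(m) // 4 for m in messages)  # roughly 4 chars per token


def truncate_to_token_limit(messages: list[str], token_limit: int | None) -> list[str]:
    if token_limit is None:
        return messages  # no model info: use all messages, as in the diff
    if count_tokens(messages) <= token_limit:
        return messages
    recent = messages[:]
    while len(recent) > 1:  # always keep at least one message
        recent = recent[1:]  # drop the oldest
        if count_tokens(recent) <= token_limit:
            break
    return recent
```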

agent_memory_server/config.py

Lines changed: 1 addition & 2 deletions
@@ -52,7 +52,7 @@ class Settings(BaseSettings):
     long_term_memory: bool = True
     openai_api_key: str | None = None
     anthropic_api_key: str | None = None
-    generation_model: str = "gpt-4o-mini"
+    generation_model: str = "gpt-4o"
     embedding_model: str = "text-embedding-3-small"
     port: int = 8000
     mcp_port: int = 9000
@@ -118,7 +118,6 @@ class Settings(BaseSettings):
     auth0_client_secret: str | None = None
 
     # Working memory settings
-    window_size: int = 20  # Default number of recent messages to return
     summarization_threshold: float = (
         0.7  # Fraction of context window that triggers summarization
     )
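
As a worked example of `summarization_threshold`: with the default of 0.7 and a model whose context window is 128,000 tokens (an illustrative figure), summarization of working memory would trigger once the conversation grows past 0.7 × 128,000 = 89,600 tokens.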

agent_memory_server/long_term_memory.py

Lines changed: 29 additions & 11 deletions
@@ -324,21 +324,37 @@ async def compact_long_term_memories(
         index_name = Keys.search_index_name()
 
         # Create aggregation query to group by memory_hash and find duplicates
-        agg_query = (
-            f"FT.AGGREGATE {index_name} {filter_str} "
-            "GROUPBY 1 @memory_hash "
-            "REDUCE COUNT 0 AS count "
-            'FILTER "@count>1" '  # Only groups with more than 1 memory
-            "SORTBY 2 @count DESC "
-            f"LIMIT 0 {limit}"
-        )
+        agg_query = [
+            "FT.AGGREGATE",
+            index_name,
+            filter_str,
+            "GROUPBY",
+            str(1),
+            "@memory_hash",
+            "REDUCE",
+            "COUNT",
+            str(0),
+            "AS",
+            "count",
+            "FILTER",
+            "@count>1",  # Only groups with more than 1 memory
+            "SORTBY",
+            str(2),
+            "@count",
+            "DESC",
+            "LIMIT",
+            str(0),
+            str(limit),
+        ]
 
         # Execute aggregation to find duplicate groups
-        duplicate_groups = await redis_client.execute_command(agg_query)
+        duplicate_groups = await redis_client.execute_command(*agg_query)
 
         if duplicate_groups and duplicate_groups[0] > 0:
             num_groups = duplicate_groups[0]
-            logger.info(f"Found {num_groups} groups of hash-based duplicates")
+            logger.info(
+                f"Found {num_groups} groups with hash-based duplicates to process"
+            )
 
             # Process each group of duplicates
             for i in range(1, len(duplicate_groups), 2):
@@ -423,9 +439,11 @@ async def compact_long_term_memories(
                     )
                 except Exception as e:
                     logger.error(f"Error processing duplicate group: {e}")
+        else:
+            logger.info("No hash-based duplicates found")
 
         logger.info(
-            f"Completed hash-based deduplication. Merged {memories_merged} memories."
+            f"Completed hash-based deduplication. Removed {memories_merged} duplicate memories."
         )
     except Exception as e:
         logger.error(f"Error during hash-based duplicate compaction: {e}")

agent_memory_server/main.py

Lines changed: 0 additions & 1 deletion
@@ -135,7 +135,6 @@ async def lifespan(app: FastAPI):
 
     logger.info(
         "Redis Agent Memory Server initialized",
-        window_size=settings.window_size,
         generation_model=settings.generation_model,
         embedding_model=settings.embedding_model,
         long_term_memory=settings.long_term_memory,

agent_memory_server/mcp.py

Lines changed: 0 additions & 2 deletions
@@ -467,7 +467,6 @@ async def memory_prompt(
    query: str,
    session_id: SessionId | None = None,
    namespace: Namespace | None = None,
-    window_size: int = settings.window_size,
    model_name: ModelNameLiteral | None = None,
    context_window_max: int | None = None,
    topics: Topics | None = None,
@@ -579,7 +578,6 @@ async def memory_prompt(
        session_id=_session_id,
        namespace=namespace.eq if namespace and namespace.eq else None,
        user_id=user_id.eq if user_id and user_id.eq else None,
-        window_size=window_size,
        model_name=model_name,
        context_window_max=context_window_max,
    )

agent_memory_server/models.py

Lines changed: 0 additions & 2 deletions
@@ -7,7 +7,6 @@
 from pydantic import BaseModel, Field
 from ulid import ULID
 
-from agent_memory_server.config import settings
 from agent_memory_server.filters import (
     CreatedAt,
     Entities,
@@ -238,7 +237,6 @@ class WorkingMemoryRequest(BaseModel):
     session_id: str
     namespace: str | None = None
     user_id: str | None = None
-    window_size: int = settings.window_size
     model_name: ModelNameLiteral | None = None
     context_window_max: int | None = None
