Skip to content

Commit 3412954

Browse files
committed
WIP on summary views
1 parent 8a06b27 commit 3412954

File tree

6 files changed

+1120
-2
lines changed

6 files changed

+1120
-2
lines changed

agent_memory_server/api.py

Lines changed: 247 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from fastapi import APIRouter, Depends, Header, HTTPException, Query
66
from mcp.server.fastmcp.prompts import base
77
from mcp.types import TextContent
8+
from ulid import ULID
89

910
from agent_memory_server import long_term_memory, working_memory
1011
from agent_memory_server.auth import UserInfo, get_current_user
@@ -16,6 +17,7 @@
1617
from agent_memory_server.models import (
1718
AckResponse,
1819
CreateMemoryRecordRequest,
20+
CreateSummaryViewRequest,
1921
EditMemoryRecordRequest,
2022
GetSessionsQuery,
2123
MemoryMessage,
@@ -24,14 +26,30 @@
2426
MemoryRecord,
2527
MemoryRecordResultsResponse,
2628
ModelNameLiteral,
29+
RunSummaryViewPartitionRequest,
30+
RunSummaryViewRequest,
2731
SearchRequest,
2832
SessionListResponse,
33+
SummaryView,
34+
SummaryViewPartitionResult,
2935
SystemMessage,
36+
Task,
37+
TaskStatusEnum,
38+
TaskTypeEnum,
3039
UpdateWorkingMemory,
3140
WorkingMemory,
3241
WorkingMemoryResponse,
3342
)
3443
from agent_memory_server.summarization import _incremental_summary
44+
from agent_memory_server.summary_views import (
45+
get_summary_view as get_summary_view_config,
46+
list_partition_results,
47+
list_summary_views,
48+
save_partition_result,
49+
save_summary_view,
50+
summarize_partition_for_view,
51+
)
52+
from agent_memory_server.tasks import create_task, get_task
3553
from agent_memory_server.utils.redis import get_redis_conn
3654

3755

@@ -1030,3 +1048,232 @@ async def memory_prompt(
10301048
)
10311049

10321050
return MemoryPromptResponse(messages=_messages)
1051+
1052+
1053+
def _validate_summary_view_keys(payload: CreateSummaryViewRequest) -> None:
1054+
"""Validate group_by and filter keys for a SummaryView.
1055+
1056+
For v1 we explicitly restrict these keys to a small, known set so we can
1057+
implement execution safely. We also currently only support long-term
1058+
memory as the source for SummaryViews.
1059+
"""
1060+
1061+
if payload.source != "long_term":
1062+
raise HTTPException(
1063+
status_code=400,
1064+
detail=(
1065+
"SummaryView.source must be 'long_term' for now; "
1066+
"'working_memory' is not yet supported."
1067+
),
1068+
)
1069+
1070+
allowed_group_by = {"user_id", "namespace", "session_id", "memory_type"}
1071+
allowed_filters = {
1072+
"user_id",
1073+
"namespace",
1074+
"session_id",
1075+
"memory_type",
1076+
}
1077+
1078+
invalid_group = [k for k in payload.group_by if k not in allowed_group_by]
1079+
if invalid_group:
1080+
raise HTTPException(
1081+
status_code=400,
1082+
detail=("Unsupported group_by fields: " + ", ".join(sorted(invalid_group))),
1083+
)
1084+
1085+
invalid_filters = [k for k in payload.filters if k not in allowed_filters]
1086+
if invalid_filters:
1087+
raise HTTPException(
1088+
status_code=400,
1089+
detail=("Unsupported filter fields: " + ", ".join(sorted(invalid_filters))),
1090+
)
1091+
1092+
1093+
@router.post("/v1/summary-views", response_model=SummaryView)
1094+
async def create_summary_view(
1095+
payload: CreateSummaryViewRequest,
1096+
current_user: UserInfo = Depends(get_current_user),
1097+
):
1098+
"""Create a new SummaryView configuration.
1099+
1100+
The server assigns an ID; the configuration can then be run on-demand or
1101+
by background workers.
1102+
"""
1103+
1104+
_validate_summary_view_keys(payload)
1105+
1106+
view = SummaryView(
1107+
id=str(ULID()),
1108+
name=payload.name,
1109+
source=payload.source,
1110+
group_by=payload.group_by,
1111+
filters=payload.filters,
1112+
time_window_days=payload.time_window_days,
1113+
continuous=payload.continuous,
1114+
prompt=payload.prompt,
1115+
model_name=payload.model_name,
1116+
)
1117+
1118+
await save_summary_view(view)
1119+
return view
1120+
1121+
1122+
@router.get("/v1/summary-views", response_model=list[SummaryView])
1123+
async def list_summary_views_endpoint(
1124+
current_user: UserInfo = Depends(get_current_user),
1125+
):
1126+
"""List all registered SummaryViews.
1127+
1128+
Filtering by source/continuous can be added later if needed.
1129+
"""
1130+
1131+
return await list_summary_views()
1132+
1133+
1134+
@router.get("/v1/summary-views/{view_id}", response_model=SummaryView)
1135+
async def get_summary_view(
1136+
view_id: str,
1137+
current_user: UserInfo = Depends(get_current_user),
1138+
):
1139+
"""Get a SummaryView configuration by ID."""
1140+
1141+
view = await get_summary_view_config(view_id)
1142+
if view is None:
1143+
raise HTTPException(status_code=404, detail=f"SummaryView {view_id} not found")
1144+
return view
1145+
1146+
1147+
@router.delete("/v1/summary-views/{view_id}", response_model=AckResponse)
1148+
async def delete_summary_view_endpoint(
1149+
view_id: str,
1150+
current_user: UserInfo = Depends(get_current_user),
1151+
):
1152+
"""Delete a SummaryView configuration.
1153+
1154+
Stored partition summaries are left as-is for now.
1155+
"""
1156+
1157+
from agent_memory_server.summary_views import delete_summary_view
1158+
1159+
await delete_summary_view(view_id)
1160+
return AckResponse(status="ok")
1161+
1162+
1163+
@router.post(
1164+
"/v1/summary-views/{view_id}/partitions/run",
1165+
response_model=SummaryViewPartitionResult,
1166+
)
1167+
async def run_summary_view_partition(
1168+
view_id: str,
1169+
payload: RunSummaryViewPartitionRequest,
1170+
current_user: UserInfo = Depends(get_current_user),
1171+
):
1172+
"""Synchronously compute a summary for a single partition of a view.
1173+
1174+
For long-term memory views this will query the underlying memories
1175+
and run a real summarization. For other sources it currently returns
1176+
a placeholder summary.
1177+
"""
1178+
1179+
view = await get_summary_view_config(view_id)
1180+
if view is None:
1181+
raise HTTPException(status_code=404, detail=f"SummaryView {view_id} not found")
1182+
1183+
# Ensure the provided group keys match the view's group_by definition.
1184+
group_keys = set(payload.group.keys())
1185+
expected_keys = set(view.group_by)
1186+
if group_keys != expected_keys:
1187+
raise HTTPException(
1188+
status_code=400,
1189+
detail=(
1190+
f"group keys {sorted(group_keys)} must exactly match "
1191+
f"view.group_by {sorted(expected_keys)}"
1192+
),
1193+
)
1194+
1195+
result = await summarize_partition_for_view(view, payload.group)
1196+
# Persist the result so it appears in materialized listings.
1197+
await save_partition_result(result)
1198+
return result
1199+
1200+
1201+
@router.get(
1202+
"/v1/summary-views/{view_id}/partitions",
1203+
response_model=list[SummaryViewPartitionResult],
1204+
)
1205+
async def list_summary_view_partitions(
1206+
view_id: str,
1207+
user_id: str | None = None,
1208+
namespace: str | None = None,
1209+
session_id: str | None = None,
1210+
memory_type: str | None = None,
1211+
current_user: UserInfo = Depends(get_current_user),
1212+
):
1213+
"""List materialized partition summaries for a SummaryView.
1214+
1215+
This does not trigger recomputation; it simply reads stored
1216+
SummaryViewPartitionResult entries from Redis. Optional query
1217+
parameters filter by group fields when present.
1218+
"""
1219+
1220+
view = await get_summary_view_config(view_id)
1221+
if view is None:
1222+
raise HTTPException(status_code=404, detail=f"SummaryView {view_id} not found")
1223+
1224+
group_filter: dict[str, str] = {}
1225+
if user_id is not None:
1226+
group_filter["user_id"] = user_id
1227+
if namespace is not None:
1228+
group_filter["namespace"] = namespace
1229+
if session_id is not None:
1230+
group_filter["session_id"] = session_id
1231+
if memory_type is not None:
1232+
group_filter["memory_type"] = memory_type
1233+
1234+
return await list_partition_results(view_id, group_filter or None)
1235+
1236+
1237+
@router.post("/v1/summary-views/{view_id}/run", response_model=Task)
1238+
async def run_summary_view_full(
1239+
view_id: str,
1240+
payload: RunSummaryViewRequest,
1241+
background_tasks: HybridBackgroundTasks,
1242+
current_user: UserInfo = Depends(get_current_user),
1243+
):
1244+
"""Trigger an asynchronous full recompute of all partitions for a view.
1245+
1246+
Returns a Task that can be polled for status. The actual work is
1247+
performed by a Docket worker running refresh_summary_view.
1248+
"""
1249+
1250+
view = await get_summary_view_config(view_id)
1251+
if view is None:
1252+
raise HTTPException(status_code=404, detail=f"SummaryView {view_id} not found")
1253+
1254+
task_id = payload.task_id or str(ULID())
1255+
task = Task(
1256+
id=task_id,
1257+
type=TaskTypeEnum.SUMMARY_VIEW_FULL_RUN,
1258+
status=TaskStatusEnum.PENDING,
1259+
view_id=view_id,
1260+
)
1261+
await create_task(task)
1262+
1263+
from agent_memory_server.summary_views import refresh_summary_view
1264+
1265+
background_tasks.add_task(refresh_summary_view, view_id=view_id, task_id=task_id)
1266+
return task
1267+
1268+
1269+
@router.get("/v1/tasks/{task_id}", response_model=Task)
1270+
async def get_task_status(
1271+
task_id: str,
1272+
current_user: UserInfo = Depends(get_current_user),
1273+
):
1274+
"""Get the status of a background Task by ID."""
1275+
1276+
task = await get_task(task_id)
1277+
if task is None:
1278+
raise HTTPException(status_code=404, detail=f"Task {task_id} not found")
1279+
return task

agent_memory_server/docket_tasks.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@
2121
update_last_accessed,
2222
)
2323
from agent_memory_server.summarization import summarize_session
24+
from agent_memory_server.summary_views import (
25+
periodic_refresh_summary_views,
26+
refresh_summary_view,
27+
)
2428

2529

2630
logger = logging.getLogger(__name__)
@@ -38,6 +42,8 @@
3842
forget_long_term_memories,
3943
periodic_forget_long_term_memories,
4044
update_last_accessed,
45+
refresh_summary_view,
46+
periodic_refresh_summary_views,
4147
]
4248

4349

0 commit comments

Comments
 (0)