Skip to content
247 changes: 247 additions & 0 deletions agent_memory_server/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from fastapi import APIRouter, Depends, Header, HTTPException, Query
from mcp.server.fastmcp.prompts import base
from mcp.types import TextContent
from ulid import ULID

from agent_memory_server import long_term_memory, working_memory
from agent_memory_server.auth import UserInfo, get_current_user
Expand All @@ -16,6 +17,7 @@
from agent_memory_server.models import (
AckResponse,
CreateMemoryRecordRequest,
CreateSummaryViewRequest,
EditMemoryRecordRequest,
GetSessionsQuery,
MemoryMessage,
Expand All @@ -24,14 +26,30 @@
MemoryRecord,
MemoryRecordResultsResponse,
ModelNameLiteral,
RunSummaryViewPartitionRequest,
RunSummaryViewRequest,
SearchRequest,
SessionListResponse,
SummaryView,
SummaryViewPartitionResult,
SystemMessage,
Task,
TaskStatusEnum,
TaskTypeEnum,
UpdateWorkingMemory,
WorkingMemory,
WorkingMemoryResponse,
)
from agent_memory_server.summarization import _incremental_summary
from agent_memory_server.summary_views import (
get_summary_view as get_summary_view_config,
list_partition_results,
list_summary_views,
save_partition_result,
save_summary_view,
summarize_partition_for_view,
)
from agent_memory_server.tasks import create_task, get_task
from agent_memory_server.utils.redis import get_redis_conn


Expand Down Expand Up @@ -1030,3 +1048,232 @@ async def memory_prompt(
)

return MemoryPromptResponse(messages=_messages)


def _validate_summary_view_keys(payload: CreateSummaryViewRequest) -> None:
"""Validate group_by and filter keys for a SummaryView.

For v1 we explicitly restrict these keys to a small, known set so we can
implement execution safely. We also currently only support long-term
memory as the source for SummaryViews.
"""

if payload.source != "long_term":
raise HTTPException(
status_code=400,
detail=(
"SummaryView.source must be 'long_term' for now; "
"'working_memory' is not yet supported."
Comment on lines +1109 to +1110
Copy link

Copilot AI Dec 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error message format is inconsistent with other validation error messages in the function. The other validation errors use parentheses and "Unsupported" prefix (e.g., "Unsupported group_by fields:"), while this one uses "must be" phrasing. For consistency, consider using a similar format like "SummaryView source must be 'long_term'; 'working_memory' is not yet supported" or reformatting all messages to use a consistent style.

Suggested change
"SummaryView.source must be 'long_term' for now; "
"'working_memory' is not yet supported."
f"Unsupported SummaryView.source: {payload.source!r} "
"(only 'long_term' is supported)."

Copilot uses AI. Check for mistakes.
),
)

allowed_group_by = {"user_id", "namespace", "session_id", "memory_type"}
allowed_filters = {
"user_id",
"namespace",
"session_id",
"memory_type",
}

invalid_group = [k for k in payload.group_by if k not in allowed_group_by]
if invalid_group:
raise HTTPException(
status_code=400,
detail=("Unsupported group_by fields: " + ", ".join(sorted(invalid_group))),
)
Comment on lines +1122 to +1127
Copy link

Copilot AI Dec 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing validation that group_by fields are not duplicated. If a user creates a SummaryView with group_by containing duplicate fields like ["user_id", "user_id"], this could lead to unexpected behavior in partition key encoding and group matching. Add validation to ensure all group_by fields are unique.

Copilot uses AI. Check for mistakes.

invalid_filters = [k for k in payload.filters if k not in allowed_filters]
if invalid_filters:
raise HTTPException(
status_code=400,
detail=("Unsupported filter fields: " + ", ".join(sorted(invalid_filters))),
)


Comment on lines +1135 to +1136
Copy link

Copilot AI Dec 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing validation that filter keys and group_by keys don't conflict. If a user specifies the same field in both filters and group_by (e.g., filters={"user_id": "alice"} and group_by=["user_id"]), the filter would be overridden by the group value in _build_long_term_filters_for_view. This could lead to unexpected results. Add validation to prevent this configuration or document the precedence behavior.

Suggested change
# Disallow configurations where the same field is used in both filters and group_by.
conflicting_keys = sorted(set(payload.group_by) & set(payload.filters.keys()))
if conflicting_keys:
raise HTTPException(
status_code=400,
detail=(
"SummaryView.group_by and SummaryView.filters may not use the same "
"fields: " + ", ".join(conflicting_keys)
),
)

Copilot uses AI. Check for mistakes.
@router.post("/v1/summary-views", response_model=SummaryView)
async def create_summary_view(
payload: CreateSummaryViewRequest,
current_user: UserInfo = Depends(get_current_user),
):
"""Create a new SummaryView configuration.

The server assigns an ID; the configuration can then be run on-demand or
by background workers.
"""

_validate_summary_view_keys(payload)

view = SummaryView(
id=str(ULID()),
name=payload.name,
source=payload.source,
group_by=payload.group_by,
filters=payload.filters,
time_window_days=payload.time_window_days,
continuous=payload.continuous,
prompt=payload.prompt,
model_name=payload.model_name,
)

await save_summary_view(view)
return view


@router.get("/v1/summary-views", response_model=list[SummaryView])
async def list_summary_views_endpoint(
current_user: UserInfo = Depends(get_current_user),
):
"""List all registered SummaryViews.

Filtering by source/continuous can be added later if needed.
"""

return await list_summary_views()


@router.get("/v1/summary-views/{view_id}", response_model=SummaryView)
async def get_summary_view(
view_id: str,
current_user: UserInfo = Depends(get_current_user),
):
"""Get a SummaryView configuration by ID."""

view = await get_summary_view_config(view_id)
if view is None:
raise HTTPException(status_code=404, detail=f"SummaryView {view_id} not found")
return view


@router.delete("/v1/summary-views/{view_id}", response_model=AckResponse)
async def delete_summary_view_endpoint(
view_id: str,
current_user: UserInfo = Depends(get_current_user),
):
"""Delete a SummaryView configuration.

Stored partition summaries are left as-is for now.
"""

from agent_memory_server.summary_views import delete_summary_view

await delete_summary_view(view_id)
return AckResponse(status="ok")
Copy link

Copilot AI Dec 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The delete endpoint doesn't remove partition summaries, which could lead to orphaned data accumulation. The docstring acknowledges this ("Stored partition summaries are left as-is for now"), but this creates a maintenance burden and potential confusion for users who expect deletion to be complete. Consider implementing cleanup of partition summaries, or at minimum, documenting this limitation in the API response or endpoint description.

Suggested change
return AckResponse(status="ok")
return AckResponse(
status=(
"ok (SummaryView deleted; stored partition summaries were not removed)"
)
)

Copilot uses AI. Check for mistakes.


@router.post(
"/v1/summary-views/{view_id}/partitions/run",
response_model=SummaryViewPartitionResult,
)
async def run_summary_view_partition(
view_id: str,
payload: RunSummaryViewPartitionRequest,
current_user: UserInfo = Depends(get_current_user),
):
"""Synchronously compute a summary for a single partition of a view.

For long-term memory views this will query the underlying memories
and run a real summarization. For other sources it currently returns
a placeholder summary.
"""

view = await get_summary_view_config(view_id)
if view is None:
raise HTTPException(status_code=404, detail=f"SummaryView {view_id} not found")

# Ensure the provided group keys match the view's group_by definition.
group_keys = set(payload.group.keys())
expected_keys = set(view.group_by)
if group_keys != expected_keys:
raise HTTPException(
status_code=400,
detail=(
f"group keys {sorted(group_keys)} must exactly match "
f"view.group_by {sorted(expected_keys)}"
),
)

result = await summarize_partition_for_view(view, payload.group)
# Persist the result so it appears in materialized listings.
await save_partition_result(result)
return result


@router.get(
"/v1/summary-views/{view_id}/partitions",
response_model=list[SummaryViewPartitionResult],
)
async def list_summary_view_partitions(
view_id: str,
user_id: str | None = None,
namespace: str | None = None,
session_id: str | None = None,
memory_type: str | None = None,
current_user: UserInfo = Depends(get_current_user),
):
"""List materialized partition summaries for a SummaryView.

This does not trigger recomputation; it simply reads stored
SummaryViewPartitionResult entries from Redis. Optional query
parameters filter by group fields when present.
"""

view = await get_summary_view_config(view_id)
if view is None:
raise HTTPException(status_code=404, detail=f"SummaryView {view_id} not found")

group_filter: dict[str, str] = {}
if user_id is not None:
group_filter["user_id"] = user_id
if namespace is not None:
group_filter["namespace"] = namespace
if session_id is not None:
group_filter["session_id"] = session_id
if memory_type is not None:
group_filter["memory_type"] = memory_type

Copy link

Copilot AI Dec 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The list_summary_view_partitions endpoint allows filtering by any combination of user_id, namespace, session_id, and memory_type, but doesn't validate that these fields are actually in the view's group_by definition. This could lead to confusing empty results when users filter by a field that isn't part of the grouping. Consider validating that filter parameters correspond to fields in view.group_by, or document this behavior clearly.

Suggested change
# Validate that all requested filter fields are part of the view's group_by definition.
# This avoids confusing empty results when filtering on a field that is not grouped.
if group_filter:
allowed_group_fields = list(getattr(view, "group_by", []) or [])
invalid_fields = [key for key in group_filter.keys() if key not in allowed_group_fields]
if invalid_fields:
raise HTTPException(
status_code=400,
detail=(
"Invalid partition filter field(s): "
f"{', '.join(invalid_fields)}. "
"Filters must correspond to fields in the SummaryView's group_by: "
f"{', '.join(allowed_group_fields) if allowed_group_fields else '(none)'}."
),
)

Copilot uses AI. Check for mistakes.
return await list_partition_results(view_id, group_filter or None)
Copy link

Copilot AI Dec 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The expression "group_filter or None" at line 1278 is redundant. If group_filter is an empty dict, it's truthy, so this expression will always pass group_filter (never None). If the intent is to pass None when the dict is empty, use "group_filter if group_filter else None" or just pass group_filter directly since the function handles empty dicts correctly.

Copilot uses AI. Check for mistakes.


@router.post("/v1/summary-views/{view_id}/run", response_model=Task)
async def run_summary_view_full(
view_id: str,
payload: RunSummaryViewRequest,
background_tasks: HybridBackgroundTasks,
current_user: UserInfo = Depends(get_current_user),
):
"""Trigger an asynchronous full recompute of all partitions for a view.

Returns a Task that can be polled for status. The actual work is
performed by a Docket worker running refresh_summary_view.
"""

view = await get_summary_view_config(view_id)
if view is None:
raise HTTPException(status_code=404, detail=f"SummaryView {view_id} not found")

task_id = payload.task_id or str(ULID())
task = Task(
id=task_id,
type=TaskTypeEnum.SUMMARY_VIEW_FULL_RUN,
status=TaskStatusEnum.PENDING,
view_id=view_id,
)
await create_task(task)

from agent_memory_server.summary_views import refresh_summary_view

background_tasks.add_task(refresh_summary_view, view_id=view_id, task_id=task_id)
return task


@router.get("/v1/tasks/{task_id}", response_model=Task)
async def get_task_status(
task_id: str,
current_user: UserInfo = Depends(get_current_user),
):
"""Get the status of a background Task by ID."""

task = await get_task(task_id)
if task is None:
raise HTTPException(status_code=404, detail=f"Task {task_id} not found")
return task
6 changes: 6 additions & 0 deletions agent_memory_server/docket_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@
update_last_accessed,
)
from agent_memory_server.summarization import summarize_session
from agent_memory_server.summary_views import (
periodic_refresh_summary_views,
refresh_summary_view,
)


logger = logging.getLogger(__name__)
Expand All @@ -38,6 +42,8 @@
forget_long_term_memories,
periodic_forget_long_term_memories,
update_last_accessed,
refresh_summary_view,
periodic_refresh_summary_views,
]


Expand Down
Loading
Loading