Skip to content

Commit 01b8f45

Browse files
authored
fix(langgraph-checkpoint-aws): Add max_results and limit attributes to AgentCoreMemorySaver (#728)
Addresses #719. This PR updates the AgentCore memory checkpointer integration to: - Add an `max_results` attribute to `AgentCoreMemorySaver`, allowing control of the `maxResults` value passed to [ListEvents API](https://docs.aws.amazon.com/bedrock-agentcore/latest/APIReference/API_ListEvents.html). - Effective default will remain at the previously hardcoded limit of 100 results. - Add another `limit` attribute to control the maximum events parsed when paginating on the ListEvents response. - Effective default will move from 100 -> `None` (unlimited). - In `AgentCoreEventClient,get_events()`, exit from the response processing loop early if the number of events found exceeds the `limit` attribute value. If this occurs, a warning will be returned to the user.
1 parent 1f9f649 commit 01b8f45

File tree

2 files changed

+64
-9
lines changed

2 files changed

+64
-9
lines changed

libs/langgraph-checkpoint-aws/langgraph_checkpoint_aws/agentcore/helpers.py

Lines changed: 39 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import datetime
99
import json
1010
import logging
11+
import warnings
1112
from collections import defaultdict
1213
from typing import Any, cast
1314

@@ -184,22 +185,37 @@ def store_blob_events_batch(
184185
)
185186

186187
def get_events(
187-
self, session_id: str, actor_id: str, limit: int = 100
188+
self,
189+
session_id: str,
190+
actor_id: str,
191+
limit: int | None = None,
192+
max_results: int | None = 100,
188193
) -> list[EventType]:
189-
"""Retrieve events from AgentCore Memory."""
194+
"""Retrieve events from AgentCore Memory.
190195
191-
if limit is not None and limit <= 0:
196+
Args:
197+
session_id: The session ID to retrieve events for
198+
actor_id: The actor ID to retrieve events for
199+
limit: The maximum number of events to parse from ListEvents
200+
max_results: Maximum number of results to retrieve. Defaults to 100.
201+
202+
Returns:
203+
List of retrieved events
204+
"""
205+
206+
if max_results is not None and max_results <= 0:
192207
return []
193208

194209
all_events = []
195210
next_token = None
211+
limit_reached = False
196212

197213
while True:
198214
params = {
199215
"memoryId": self.memory_id,
200216
"actorId": actor_id,
201217
"sessionId": session_id,
202-
"maxResults": 100,
218+
"maxResults": max_results,
203219
"includePayloads": True,
204220
}
205221

@@ -218,8 +234,26 @@ def get_events(
218234
except EventDecodingError as e:
219235
logger.warning(f"Failed to decode event: {e}")
220236

237+
if limit is not None and len(all_events) >= limit:
238+
limit_reached = True
239+
break
240+
241+
if limit_reached:
242+
break
243+
221244
next_token = response.get("nextToken")
222-
if not next_token or (limit is not None and len(all_events) >= limit):
245+
246+
if limit_reached and next_token:
247+
warnings.warn(
248+
f"Stopped retrieving events at limit of {limit}. "
249+
f"There may be additional checkpoints that were not retrieved. "
250+
f"Consider increasing the limit parameter, or set None for no "
251+
f"limit.",
252+
UserWarning,
253+
stacklevel=2,
254+
)
255+
256+
if limit_reached or not next_token:
223257
break
224258

225259
return all_events

libs/langgraph-checkpoint-aws/langgraph_checkpoint_aws/agentcore/saver.py

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,26 +49,43 @@ class AgentCoreMemorySaver(BaseCheckpointSaver[str]):
4949
Args:
5050
memory_id: the ID of the memory resource created in AgentCore Memory
5151
serde: serialization protocol to be used. Defaults to JSONPlusSerializer
52+
limit: maximum number of events to parse from ListEvents.
53+
max_results: maximum number of results to retrieve from AgentCore Memory.
54+
Defaults to 100
5255
"""
5356

5457
def __init__(
5558
self,
5659
memory_id: str,
5760
*,
5861
serde: SerializerProtocol | None = None,
62+
limit: int | None = None,
63+
max_results: int | None = 100,
5964
**boto3_kwargs: Any,
6065
) -> None:
6166
super().__init__(serde=serde)
6267

6368
self.memory_id = memory_id
69+
self.limit = limit
70+
self.max_results = max_results
6471
self.serializer = EventSerializer(self.serde)
6572
self.checkpoint_event_client = AgentCoreEventClient(
6673
memory_id, self.serializer, **boto3_kwargs
6774
)
6875
self.processor = EventProcessor()
6976

70-
def get_tuple(self, config: RunnableConfig) -> CheckpointTuple | None:
71-
"""Get a checkpoint tuple from Bedrock AgentCore Memory."""
77+
def get_tuple(
78+
self,
79+
config: RunnableConfig,
80+
) -> CheckpointTuple | None:
81+
"""Get a checkpoint tuple from Bedrock AgentCore Memory.
82+
83+
Args:
84+
config: The runnable config containing checkpoint information
85+
86+
Returns:
87+
CheckpointTuple if found, None otherwise
88+
"""
7289

7390
# TODO: There is room for caching here on the client side
7491

@@ -77,7 +94,10 @@ def get_tuple(self, config: RunnableConfig) -> CheckpointTuple | None:
7794
)
7895

7996
events = self.checkpoint_event_client.get_events(
80-
checkpoint_config.session_id, checkpoint_config.actor_id
97+
checkpoint_config.session_id,
98+
checkpoint_config.actor_id,
99+
self.limit,
100+
self.max_results,
81101
)
82102

83103
checkpoints, writes_by_checkpoint, channel_data = self.processor.process_events(
@@ -122,7 +142,8 @@ def list(
122142
events = self.checkpoint_event_client.get_events(
123143
checkpoint_config.session_id,
124144
checkpoint_config.actor_id,
125-
100 if limit is None else limit,
145+
limit,
146+
self.max_results,
126147
)
127148

128149
checkpoints, writes_by_checkpoint, channel_data = self.processor.process_events(

0 commit comments

Comments
 (0)