Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions libs/langgraph-checkpoint-aws/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ poetry add langgraph-checkpoint-aws
## Requirements
```text
Python >=3.9
langgraph-checkpoint >=2.0.0
langgraph >=0.2.55
langgraph-checkpoint >=2.1.0
langgraph >=0.5.3
boto3 >=1.37.3
```

Expand Down
35 changes: 1 addition & 34 deletions libs/langgraph-checkpoint-aws/langgraph_checkpoint_aws/saver.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,27 +263,6 @@ def _get_checkpoint_step(
)
).invocation_step

def _get_task_sends(
self, thread_id: str, checkpoint_ns: str, parent_checkpoint_id: Optional[str]
) -> list:
"""Get sorted task sends for parent checkpoint.

Args:
thread_id: Session thread identifier
checkpoint_ns: Checkpoint namespace
parent_checkpoint_id: Parent checkpoint identifier

Returns:
Sorted list of task sends
"""
if not parent_checkpoint_id:
return []

pending_writes = self._get_checkpoint_pending_writes(
thread_id, checkpoint_ns, parent_checkpoint_id
)
return transform_pending_task_writes(pending_writes)

def get_tuple(self, config: RunnableConfig) -> Optional[CheckpointTuple]:
"""Retrieve a checkpoint tuple from the Bedrock session.

Expand Down Expand Up @@ -319,18 +298,11 @@ def get_tuple(self, config: RunnableConfig) -> Optional[CheckpointTuple]:
invocation_step.invocation_step_id,
)

task_sends = self._get_task_sends(
session_thread_id,
checkpoint_namespace,
session_checkpoint.parent_checkpoint_id,
)

return construct_checkpoint_tuple(
session_thread_id,
checkpoint_namespace,
session_checkpoint,
pending_write_ops,
task_sends,
self.serde,
)

Expand Down Expand Up @@ -455,7 +427,7 @@ def put_writes(

def list(
self,
config: Optional[RunnableConfig],
config: Optional[RunnableConfig] = None,
*,
filter: Optional[dict[str, Any]] = None,
before: Optional[RunnableConfig] = None,
Expand Down Expand Up @@ -553,15 +525,10 @@ def list(
checkpoint.checkpoint_id,
)

task_sends = self._get_task_sends(
thread_id, checkpoint.checkpoint_ns, checkpoint.parent_checkpoint_id
)

yield construct_checkpoint_tuple(
thread_id,
checkpoint.checkpoint_ns,
checkpoint,
pending_write_ops,
task_sends,
self.serde,
)
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,6 @@ def construct_checkpoint_tuple(
checkpoint_ns: str,
session_checkpoint: SessionCheckpoint,
pending_writes: list[SessionPendingWrite],
sends: list,
serde: SerializerProtocol,
) -> CheckpointTuple:
"""Construct checkpoint tuple from components.
Expand All @@ -160,7 +159,6 @@ def construct_checkpoint_tuple(
checkpoint_ns: Checkpoint namespace
session_checkpoint: Checkpoint payload data
pending_writes: List of pending write operations
sends: List of task sends
serde: Protocol for serialization and deserialization of objects

Returns:
Expand All @@ -178,7 +176,6 @@ def construct_checkpoint_tuple(
Checkpoint,
{
**deserialize_from_base64(serde, *session_checkpoint.checkpoint),
"pending_sends": [serde.loads_typed(s[2]) for s in sends],
"channel_values": deserialize_from_base64(
serde, *session_checkpoint.channel_values
),
Expand Down Expand Up @@ -261,9 +258,6 @@ def create_session_checkpoint(
# Create copy to avoid modifying original checkpoint
checkpoint_copy = checkpoint.copy()

# Remove pending sends as they are handled separately
checkpoint_copy.pop("pending_sends")

# Extract required config values
thread_id = config["configurable"]["thread_id"]
checkpoint_ns = config["configurable"]["checkpoint_ns"]
Expand Down
22 changes: 11 additions & 11 deletions libs/langgraph-checkpoint-aws/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions libs/langgraph-checkpoint-aws/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ keywords = ["aws", "bedrock", "langchain", "langgraph", "checkpointer"]

[tool.poetry.dependencies]
python = ">=3.9,<4.0"
langgraph-checkpoint = ">=2.0.0"
langgraph = ">=0.2.55"
langgraph-checkpoint = ">=2.1.0"
langgraph = ">=0.5.3"
boto3 = ">=1.37.3"

[tool.poetry.group.dev]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,8 @@ def test_checkpoint_save_and_retrieve(self, boto_session_client, session_saver):
channel_values={"key": "value"},
channel_versions={},
versions_seen={},
pending_sends=[],
)
checkpoint_metadata = {"source": "input", "step": 1, "writes": {"key": "value"}}
checkpoint_metadata = {"source": "input", "step": 1}

try:
saved_config = session_saver.put(
Expand Down
2 changes: 0 additions & 2 deletions libs/langgraph-checkpoint-aws/tests/unit_tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,6 @@ def sample_checkpoint(sample_timestamp):
"node1": {"default": "v1", "tasks": "v2"},
"node2": {"results": "v1"},
},
pending_sends=[],
)


Expand All @@ -261,7 +260,6 @@ def sample_checkpoint_metadata(sample_timestamp):
return CheckpointMetadata(
source="input",
step=-1,
writes={"node1": ["write1", "write2"], "node2": {"key": "value"}},
parents={
"namespace1": "parent_checkpoint_1",
"namespace2": "parent_checkpoint_2",
Expand Down
56 changes: 0 additions & 56 deletions libs/langgraph-checkpoint-aws/tests/unit_tests/test_saver.py
Original file line number Diff line number Diff line change
Expand Up @@ -440,58 +440,6 @@ def test__get_checkpoint_step_empty_without_checkpoint_id(
)
mock_boto_client.get_invocation_step.assert_not_called()

def test__get_task_sends_without_parent_checkpoint_id(
self, session_saver, sample_session_checkpoint
):
# Arrange
thread_id = "test_thread_id"
checkpoint_ns = "test_namespace"

# Act
result = session_saver._get_task_sends(thread_id, checkpoint_ns, None)

# Assert
assert result == []

def test__get_task_sends(
self, session_saver, sample_session_pending_write_with_sends
):
# Arrange
thread_id = "test_thread_id"
checkpoint_ns = "test_namespace"
parent_checkpoint_id = "test_parent_checkpoint_id"

session_saver._get_checkpoint_pending_writes = Mock(
return_value=sample_session_pending_write_with_sends
)

# Act
result = session_saver._get_task_sends(
thread_id, checkpoint_ns, parent_checkpoint_id
)

# Assert
assert result == [
["2", "__pregel_tasks", ["json", b"eyJrMiI6ICJ2MiJ9"], "/test2/path2", 1],
["3", "__pregel_tasks", ["json", b"eyJrMyI6ICJ2MyJ9"], "/test3/path3", 1],
]

def test__get_task_sends_empty(self, session_saver):
# Arrange
thread_id = "test_thread_id"
checkpoint_ns = "test_namespace"
parent_checkpoint_id = "test_parent_checkpoint_id"

session_saver._get_checkpoint_pending_writes = Mock(return_value=[])

# Act
result = session_saver._get_task_sends(
thread_id, checkpoint_ns, parent_checkpoint_id
)

# Assert
assert result == []

@patch("langgraph_checkpoint_aws.saver.construct_checkpoint_tuple")
def test_get_tuple_success(
self,
Expand All @@ -517,7 +465,6 @@ def test_get_tuple_success(
session_saver._get_checkpoint_pending_writes = Mock(
return_value=sample_session_pending_write_with_sends
)
session_saver._get_task_sends = Mock(return_value=[])
mock_construct_checkpoint.return_value = Mock(spec=CheckpointTuple)

# Act
Expand Down Expand Up @@ -730,7 +677,6 @@ def test_list_success(
)
)
session_saver._get_checkpoint_pending_writes = Mock(return_value=[])
session_saver._get_task_sends = Mock(return_value=[])
mock_construct_checkpoint.return_value = Mock(spec=CheckpointTuple)

# Act
Expand Down Expand Up @@ -801,7 +747,6 @@ def test_list_with_limit(
)
)
session_saver._get_checkpoint_pending_writes = Mock(return_value=[])
session_saver._get_task_sends = Mock(return_value=[])
mock_construct_checkpoint.return_value = Mock(spec=CheckpointTuple)

# Act
Expand Down Expand Up @@ -836,7 +781,6 @@ def test_list_with_filter(
)
)
session_saver._get_checkpoint_pending_writes = Mock(return_value=[])
session_saver._get_task_sends = Mock(return_value=[])
session_saver._construct_checkpoint_tuple = Mock(
return_value=Mock(spec=CheckpointTuple)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ def test__construct_checkpoint_tuple(
checkpoint_ns,
sample_session_checkpoint,
[sample_session_pending_write],
[],
serde,
)

Expand Down