Skip to content

Bedrock checkpoint saver failing with pop pending_sends #546

@sdole

Description

@sdole

Checked other resources

  • This is a bug, not a usage question. For questions, please use the LangChain Forum (https://forum.langchain.com/).
  • I added a clear and detailed title that summarizes the issue.
  • I read what a minimal reproducible example is (https://stackoverflow.com/help/minimal-reproducible-example).
  • I included a self-contained, minimal example that demonstrates the issue INCLUDING all the relevant imports. The code run AS IS to reproduce the issue.

Example Code

chat_config = {"configurable": {"thread_id": get_or_create_bedrock_session(connection_id)}}
    for msg, metadata in chat_graph.stream(messages_json, config=chat_config, stream_mode="messages", ):
        # for msg, metadata in chat_graph.stream(messages_json, stream_mode="messages", ):
        logger.debug("New message received", extra={"marker": "start"})
        logger.debug("Message content", extra={"UGHH": str(msg)})
        logger.debug("Message metadata", extra={"metadata": msg.response_metadata})
        logger.debug("End of message processing", extra={"marker": "end"})
        msg: AIMessageChunk = msg
        if not msg.response_metadata.get("finish_reason", "NOT FINISHED") == "stop":
            yield msg.content
        else:
            logger.debug("Message finished", extra={"finish_reason": msg.response_metadata.get("finish_reason")})
            yield "------END OF RESPONSE------"

def get_or_create_bedrock_session(connection_id: str):
    """
    Look for a bedrock session ID in the connections table for the given connection.
    If none is found, create a new bedrock checkpointer session and update the item.
    Returns the session ID.
    """
    try:
        response = connection_table.get_item(Key={"connectionId": connection_id})
        if "Item" in response:
            # Connection exists, check if it has a bedrock session ID
            item = response["Item"]
            if "bedrockSessionId" in item and item["bedrockSessionId"]:
                logger.debug("Found existing bedrock session ID", extra={"session_id": item["bedrockSessionId"]})
                return item["bedrockSessionId"]

        logger.debug("No bedrock session ID found, creating a new one")
        bedrock_session = checkpointer.session_client.create_session()
        bedrock_session_id = bedrock_session.session_id
        logger.debug("Created new bedrock session ID", extra={"session_id": bedrock_session_id})

        response = connection_table.update_item(
            Key={"connectionId": connection_id},
            UpdateExpression="SET bedrockSessionId = :sessionId",
            ExpressionAttributeValues={":sessionId": bedrock_session_id},
            # Create the item if it doesn't exist
            ReturnValues="UPDATED_NEW"
        )
        logger.debug("Updated connection table with bedrock session ID", extra=response)

        return bedrock_session_id

    except ClientError as e:
        logger.exception(f"Error getting or creating bedrock session: {e}")
        logger.error(f"Error details: {traceback.format_exc()}")
        raise e
    except Exception as e:
        logger.exception(f"Unexpected error in get_or_create_bedrock_session: {e}")
        raise e

Error Message and Stack Trace (if applicable)

..\.venv\Lib\site-packages\langgraph\pregel\__init__.py:2467: in stream
    with SyncPregelLoop(
..\.venv\Lib\site-packages\langgraph\pregel\loop.py:1051: in __exit__
    return self.stack.__exit__(exc_type, exc_value, traceback)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
D:\Tools\uv\pythons\cpython-3.13.5-windows-x86_64-none\Lib\contextlib.py:619: in __exit__
    raise exc
D:\Tools\uv\pythons\cpython-3.13.5-windows-x86_64-none\Lib\contextlib.py:604: in __exit__
    if cb(*exc_details):
       ^^^^^^^^^^^^^^^^
..\.venv\Lib\site-packages\langgraph\pregel\executor.py:118: in __exit__
    task.result()
D:\Tools\uv\pythons\cpython-3.13.5-windows-x86_64-none\Lib\concurrent\futures\_base.py:449: in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
D:\Tools\uv\pythons\cpython-3.13.5-windows-x86_64-none\Lib\concurrent\futures\_base.py:401: in __get_result
    raise self._exception
..\.venv\Lib\site-packages\langgraph\pregel\loop.py:952: in _checkpointer_put_after_previous
    prev.result()
D:\Tools\uv\pythons\cpython-3.13.5-windows-x86_64-none\Lib\concurrent\futures\_base.py:449: in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
D:\Tools\uv\pythons\cpython-3.13.5-windows-x86_64-none\Lib\concurrent\futures\_base.py:401: in __get_result
    raise self._exception
..\.venv\Lib\site-packages\langgraph\pregel\executor.py:81: in done
    task.result()
D:\Tools\uv\pythons\cpython-3.13.5-windows-x86_64-none\Lib\concurrent\futures\_base.py:449: in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
D:\Tools\uv\pythons\cpython-3.13.5-windows-x86_64-none\Lib\concurrent\futures\_base.py:401: in __get_result
    raise self._exception
D:\Tools\uv\pythons\cpython-3.13.5-windows-x86_64-none\Lib\concurrent\futures\thread.py:59: in run
    result = self.fn(*self.args, **self.kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
..\.venv\Lib\site-packages\langgraph\pregel\loop.py:954: in _checkpointer_put_after_previous
    cast(BaseCheckpointSaver, self.checkpointer).put(
..\.venv\Lib\site-packages\langgraph_checkpoint_aws\saver.py:364: in put
    session_checkpoint = create_session_checkpoint(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

checkpoint = {'channel_values': {'__start__': {'messages': [{'content': 'hello', 'role': 'user'}]}}, 'channel_versions': {'__start__': 1}, 'id': '1f06690f-388f-6732-bfff-8fdf16046c32', 'ts': '2025-07-22T00:15:19.564874+00:00', ...}
config = {'callbacks': None, 'configurable': {'checkpoint_ns': '', 'thread_id': 'ad9415f5-6b95-471b-a403-26ffe25f7e6e'}, 'metadata': ChainMap({'thread_id': 'ad9415f5-6b95-471b-a403-26ffe25f7e6e'}), 'recursion_limit': 25, ...}
metadata = {'parents': {}, 'source': 'input', 'step': -1}
serializer = <langgraph.checkpoint.serde.jsonplus.JsonPlusSerializer object at 0x0000025DC6A1EA50>
new_versions = {'__start__': 1}

    def create_session_checkpoint(
        checkpoint: Checkpoint,
        config: RunnableConfig,
        metadata: CheckpointMetadata,
        serializer: SerializerProtocol,
        new_versions: ChannelVersions,
    ) -> SessionCheckpoint:
        """
        Create a SessionCheckpoint object from the given checkpoint and related data.
    
        This function processes the checkpoint, extracts necessary information from the config,
        and serializes various components to create a SessionCheckpoint object.
    
        Args:
            checkpoint (Checkpoint): The checkpoint to process.
            config (RunnableConfig): Configuration containing thread and checkpoint information.
            metadata (CheckpointMetadata): Metadata associated with the checkpoint.
            serializer (SerializerProtocol): Serializer for data conversion.
            new_versions (ChannelVersions): New versions of channel data.
    
        Returns:
            SessionCheckpoint: A SessionCheckpoint object containing the processed and serialized data.
        """
        # Create copy to avoid modifying original checkpoint
        checkpoint_copy = checkpoint.copy()
    
        # Remove pending sends as they are handled separately
>       checkpoint_copy.pop("pending_sends")
E       KeyError: 'pending_sends'

..\.venv\Lib\site-packages\langgraph_checkpoint_aws\utils.py:265: KeyError

Description

This is happening in a server environment and I am able to reproduce it in my workstation. I will try to post complete code. I wanted to ask if there is something that everyone knows and I don't.

This is for Bedrock Session Saver.

System Info

System Information

OS: Windows
OS Version: 10.0.26100
Python Version: 3.13.5 (main, Jun 12 2025, 12:42:35) [MSC v.1943 64 bit (AMD64)]

Package Information

langchain_core: 0.3.69
langsmith: 0.4.6
langchain_aws: 0.2.28
langchain_openai: 0.3.28
langgraph_checkpoint_aws: 0.1.0
langgraph_sdk: 0.1.73

Optional packages not installed

langserve

Other Dependencies

boto3: 1.39.5
httpx: 0.28.1
httpx>=0.25.2: Installed. No version info available.
jsonpatch<2.0,>=1.33: Installed. No version info available.
langchain-core<1.0.0,>=0.3.68: Installed. No version info available.
langgraph: 0.5.3
langgraph-checkpoint: 2.1.0
langsmith-pyo3: Installed. No version info available.
langsmith>=0.3.45: Installed. No version info available.
numpy: 2.3.1
openai-agents: Installed. No version info available.
openai<2.0.0,>=1.86.0: Installed. No version info available.
opentelemetry-api: Installed. No version info available.
opentelemetry-exporter-otlp-proto-http: Installed. No version info available.
opentelemetry-sdk: Installed. No version info available.
orjson: 3.11.0
orjson>=3.10.1: Installed. No version info available.
packaging: 25.0
packaging>=23.2: Installed. No version info available.
pydantic: 2.11.7
pydantic>=2.7.4: Installed. No version info available.
pytest: 8.4.1
PyYAML>=5.3: Installed. No version info available.
requests: 2.32.4
requests-toolbelt: 1.0.0
rich: Installed. No version info available.
tenacity!=8.4.0,<10.0.0,>=8.1.0: Installed. No version info available.
tiktoken<1,>=0.7: Installed. No version info available.
typing-extensions>=4.7: Installed. No version info available.
zstandard: 0.23.0

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions