-
Hi there, I had a huge spike in traffic on my server and ran out of Postgres connections directly. The docs show an example of how to set up the checkpointer with a direct connection to the database through a Postgres connection pool. For my use case this is not enough: I need to put PgBouncer in front of the database to scale the number of connections I can handle concurrently on a single server. With the same code I had before, I am not able to get PgBouncer working. I changed the db_uri to the new PgBouncer address and added this to my connection:
connection_kwargs = {
# CRITICAL: Disable prepared statements for PgBouncer transaction mode
# This prevents LangGraph's PostgresSaver from creating conflicting prepared statements
# when connections are reused across different client sessions
"prepare_threshold": 0,
# Connection behavior
"autocommit": True,
# Keepalive settings optimized for PgBouncer
"keepalives": 1,
"keepalives_idle": 600, # 10 minutes - longer since PgBouncer manages pooling
"keepalives_interval": 30, # 30 seconds - less frequent checks
"keepalives_count": 3, # Standard retry count
# TCP settings for PgBouncer connection (not direct to PostgreSQL)
"tcp_user_timeout": 30000, # 30 seconds - faster failover since connecting to PgBouncer
"connect_timeout": 10, # 10 seconds - faster since PgBouncer is local
# Application identification
"application_name": "websocket",
}
I am running PgBouncer in transaction pooling mode, not session mode. I am getting this error: PostgresSaver initialization failed: prepared statement "_pg3_0" already exists. Any pointers on what I can do here?
Beta Was this translation helpful? Give feedback.
Replies: 1 comment
-
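Short answer to my own question: yes, they do, but you need to use your own instance (a ConnectionPool, as in the script below) rather than the example in the docs that uses a Postgres connection string. You need to set prepare_threshold to None, not 0. Setting it to None completely disables prepared statements. I reproduced this with a script and tested it.
#!/usr/bin/env python3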
"""
Simple test for PostgresSaver with connection string.
Tests the "prepared statement already exists" error.
"""
import os
import sys
import logging
from langgraph.checkpoint.postgres import PostgresSaver
from langgraph.checkpoint.base import Checkpoint
from psycopg_pool import ConnectionPool
# Configure logging for the whole script and create this module's logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Make the "config" directory that sits next to this script importable.
# NOTE(review): nothing in the visible script imports from that directory;
# confirm the path entry is still needed.
sys.path.append(
    os.path.join(os.path.dirname(__file__), 'config')
)
def get_db_uri():
    """Return the database URI read from the ``POSTGRES_DB_URI`` variable.

    Returns:
        The variable's value, or None when it is unset or empty.
    """
    # An empty string counts as "not configured", just like a missing variable.
    return os.environ.get('POSTGRES_DB_URI') or None
def test_postgres_saver():
    """Run one PostgresSaver round trip through a psycopg ConnectionPool.

    The pool's connections are created with ``prepare_threshold=None`` and
    ``autocommit=True``. The saver then runs ``setup()``, stores a single
    checkpoint and reads it back. Exceptions are logged and turned into a
    ``False`` result instead of propagating.

    Returns:
        bool: True if every step completed; False if no database URI is
        available or any step raised.
    """
    logger.info("Testing PostgresSaver with from_conn_string...")
    db_uri = get_db_uri()
    if not db_uri:
        logger.error("No database URI found")
        return False

    pool = None
    try:
        # Connection URIs usually embed "user:password@". Log only what
        # follows the last "@" so credentials never reach the log output.
        _, _, target = db_uri.rpartition("@")
        logger.info(f"Using DB URI: {target[:50]}...")

        # prepare_threshold=None switches prepared statements off entirely;
        # a value of 0 does not (see psycopg's prepare_threshold docs).
        pool = ConnectionPool(
            conninfo=db_uri,
            max_size=10,
            min_size=2,
            kwargs={
                "prepare_threshold": None,
                "autocommit": True,
            },
        )

        # Use PostgresSaver with the pool
        checkpointer = PostgresSaver(pool)
        logger.info("✓ PostgresSaver created successfully")

        # Setup tables
        checkpointer.setup()
        logger.info("✓ PostgresSaver setup completed")

        # Test checkpoint operations
        config = {
            "configurable": {
                "thread_id": "test_thread_123",
                "checkpoint_ns": "test",
            }
        }
        checkpoint = Checkpoint(
            v=1,
            id="test_checkpoint_1",
            ts="2024-01-01T00:00:00Z",
            channel_values={"test": "data"},
            channel_versions={"test": 1},
            versions_seen={"test": {"test": 1}},
            pending_sends=[],
        )

        # This is where the prepared statement error usually occurs
        checkpointer.put(config, checkpoint, {}, {})
        logger.info("✓ Checkpoint created successfully")

        # Test retrieval
        retrieved = checkpointer.get(config)
        if retrieved:
            logger.info("✓ Checkpoint retrieved successfully")
            # Retrieved checkpoint is a dict, not a Checkpoint object
            logger.info(f"Retrieved checkpoint ID: {retrieved.get('id', 'no id found')}")
        else:
            logger.warning("⚠ Checkpoint retrieval returned None")

        logger.info("✓ PostgresSaver test PASSED")
        return True
    except Exception as e:
        # Top-level boundary of a pass/fail script: report and signal failure.
        if "prepared statement" in str(e).lower():
            logger.error(f"✗ PREPARED STATEMENT ERROR: {e}")
            # Name the setting this test actually applies (None, not 0).
            logger.error("The prepare_threshold=None setting is not working!")
        else:
            logger.error(f"✗ PostgresSaver test FAILED: {e}")
        return False
    finally:
        # Close the pool on every path once it has been created.
        if pool is not None:
            pool.close()
def test_multiple_operations():
    """Write several checkpoints in a row through one pooled PostgresSaver.

    Five checkpoints are stored, each under its own thread id, to provoke
    any prepared-statement conflict between successive operations.

    Returns:
        bool: True if all writes completed; False if no database URI is
        available or any step raised (the error is logged, not propagated).
    """
    logger.info("Testing multiple operations...")
    conninfo = get_db_uri()
    if not conninfo:
        logger.error("No database URI found")
        return False

    conn_pool = None
    try:
        # Connection options: prepared statements disabled, autocommit on.
        connection_options = {
            "prepare_threshold": None,
            "autocommit": True,
        }
        conn_pool = ConnectionPool(
            conninfo=conninfo,
            max_size=10,
            min_size=2,
            kwargs=connection_options,
        )
        saver = PostgresSaver(conn_pool)
        saver.setup()

        # One checkpoint per thread id, written back to back.
        for index in range(5):
            thread_config = {
                "configurable": {
                    "thread_id": f"test_thread_{index}",
                    "checkpoint_ns": "multiple_test",
                }
            }
            snapshot = Checkpoint(
                v=1,
                id=f"checkpoint_{index}",
                ts=f"2024-01-01T00:00:0{index}Z",
                channel_values={"test": index},
                channel_versions={"test": 1},
                versions_seen={"test": {"test": 1}},
                pending_sends=[],
            )
            saver.put(thread_config, snapshot, {}, {})
            logger.info(f"✓ Checkpoint {index} created successfully")

        logger.info("✓ Multiple operations test PASSED")
        return True
    except Exception as exc:
        # Call out prepared-statement failures separately from other errors.
        message = (
            f"✗ PREPARED STATEMENT ERROR: {exc}"
            if "prepared statement" in str(exc).lower()
            else f"✗ Multiple operations test FAILED: {exc}"
        )
        logger.error(message)
        return False
    finally:
        # Release the pool if it was created.
        if conn_pool:
            conn_pool.close()
def main():
    """Run both PostgresSaver tests and log a pass/fail summary.

    Returns:
        bool: True only when every test reported success.
    """
    rule = "=" * 50

    logger.info("Starting PostgresSaver tests...")
    logger.info(rule)

    # Run in order: the basic round trip first, then the multi-write test.
    outcomes = [
        test_postgres_saver(),
        test_multiple_operations(),
    ]

    # Summary banner
    for line in (rule, "TEST SUMMARY", rule):
        logger.info(line)

    passed, total = sum(outcomes), len(outcomes)
    logger.info(f"Tests passed: {passed}/{total}")

    all_passed = passed == total
    verdict = (
        "🎉 ALL TESTS PASSED!"
        if all_passed
        else "⚠ Some tests failed - check logs above"
    )
    logger.info(verdict)
    return all_passed
if __name__ == "__main__":
    # Propagate the overall outcome as the process exit status:
    # 0 when every test passed, 1 otherwise.
    success = main()
    sys.exit(int(not success))
Beta Was this translation helpful? Give feedback.
Short answer to my own question: yes, they do, but you need to use your own instance rather than the example in the docs that uses a Postgres connection string. You need to set prepare_threshold to None, not 0. Setting it to None completely disables prepared statements. I reproduced this with a script and tested it.