Skip to content

Commit d104269

Browse files
committed
feat(store): enhance PostgresStore to support shared pg.Pool instances
- Updated PostgresStoreConfig to accept a preconfigured pg.Pool. - Added fromPool static method for creating PostgresStore instances from an existing pool. - Modified connection handling to allow sharing a pool between PostgresStore and PostgresSaver. - Updated stop method to conditionally close the pool based on ownership. - Added integration tests for pool sharing functionality.
1 parent d066129 commit d104269

File tree

4 files changed

+221
-9
lines changed

4 files changed

+221
-9
lines changed
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
---
2+
"@langchain/langgraph-checkpoint-postgres": minor
3+
---
4+
5+
Add support for passing a preconfigured `pg.Pool` to `PostgresStore`, enabling a single connection pool to be shared between `PostgresSaver` and `PostgresStore` for better resource management.
6+
7+
**What changed:**
8+
9+
- `PostgresStoreConfig` now accepts an optional `pool` field for passing a preconfigured `pg.Pool` instance directly. If both `pool` and `connectionOptions` are provided, `pool` takes precedence.
10+
- Added a new `PostgresStore.fromPool(pool, options?)` static factory method for convenience.
11+
- When an external pool is provided, `store.stop()` will no longer close the pool — the caller retains ownership of the pool lifecycle.
12+
13+
**Why:**
14+
15+
Previously, `PostgresStore` always created its own internal pool, which meant a server using both a `PostgresSaver` and a `PostgresStore` would hold two separate connection pools. This made it impossible to share connections across use cases and led to unnecessary resource consumption.
16+
17+
**How to update your code:**
18+
19+
```typescript
20+
import pg from "pg";
21+
import { PostgresSaver } from "@langchain/langgraph-checkpoint-postgres";
22+
import { PostgresStore } from "@langchain/langgraph-checkpoint-postgres/store";
23+
24+
// Create a single shared pool
25+
const pool = new pg.Pool({ connectionString: "postgresql://..." });
26+
27+
// Share it between the saver and the store
28+
const saver = new PostgresSaver(pool);
29+
const store = PostgresStore.fromPool(pool);
30+
31+
// Or pass it via the constructor
32+
const store2 = new PostgresStore({ pool });
33+
34+
await saver.setup();
35+
await store.setup();
36+
```

libs/checkpoint-postgres/src/store/index.ts

Lines changed: 64 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -54,14 +54,27 @@ export class PostgresStore extends BaseStore {
5454

5555
private ensureTables: boolean;
5656

57+
private poolOwned: boolean;
58+
5759
constructor(config: PostgresStoreConfig) {
5860
super();
5961

60-
// Create connection pool
61-
const pool =
62-
typeof config.connectionOptions === "string"
63-
? new Pool({ connectionString: config.connectionOptions })
64-
: new Pool(config.connectionOptions);
62+
// Use the provided pool directly, or create one from connectionOptions.
63+
let pool: pg.Pool;
64+
if (config.pool) {
65+
pool = config.pool;
66+
this.poolOwned = false;
67+
} else if (config.connectionOptions) {
68+
pool =
69+
typeof config.connectionOptions === "string"
70+
? new Pool({ connectionString: config.connectionOptions })
71+
: new Pool(config.connectionOptions);
72+
this.poolOwned = true;
73+
} else {
74+
throw new Error(
75+
'PostgresStoreConfig requires either "pool" or "connectionOptions".'
76+
);
77+
}
6578

6679
// Initialize core and modules
6780
this.core = new DatabaseCore(
@@ -186,14 +199,51 @@ export class PostgresStore extends BaseStore {
186199
*/
187200
static fromConnString(
188201
connectionString: string,
189-
options?: Omit<PostgresStoreConfig, "connectionOptions">
202+
options?: Omit<PostgresStoreConfig, "pool" | "connectionOptions">
190203
): PostgresStore {
191204
return new PostgresStore({
192205
connectionOptions: connectionString,
193206
...options,
194207
});
195208
}
196209

210+
/**
211+
* Creates a PostgresStore instance from a preconfigured pg.Pool.
212+
*
213+
* This enables sharing a single connection pool between PostgresSaver and
214+
* PostgresStore for better resource management.
215+
*
216+
* The pool will NOT be closed when the store is stopped -- the caller
217+
* retains ownership of the pool lifecycle.
218+
*
219+
* @param pool - A preconfigured pg.Pool instance.
220+
* @param options - Optional configuration (schema, TTL, index, etc.).
221+
* @returns A new PostgresStore instance backed by the provided pool.
222+
*
223+
* @example
224+
* ```typescript
225+
* import pg from "pg";
226+
* import { PostgresSaver } from "@langchain/langgraph-checkpoint-postgres";
227+
* import { PostgresStore } from "@langchain/langgraph-checkpoint-postgres/store";
228+
*
229+
* const pool = new pg.Pool({ connectionString: "postgresql://..." });
230+
* const saver = new PostgresSaver(pool);
231+
* const store = PostgresStore.fromPool(pool);
232+
*
233+
* await saver.setup();
234+
* await store.setup();
235+
* ```
236+
*/
237+
static fromPool(
238+
pool: pg.Pool,
239+
options?: Omit<PostgresStoreConfig, "pool" | "connectionOptions">
240+
): PostgresStore {
241+
return new PostgresStore({
242+
pool,
243+
...options,
244+
});
245+
}
246+
197247
/**
198248
* Initialize the store by running migrations to create necessary tables and indexes.
199249
*/
@@ -374,13 +424,19 @@ export class PostgresStore extends BaseStore {
374424
}
375425

376426
/**
377-
* Stop the store and close all database connections.
427+
* Stop the store and close database connections.
428+
*
429+
* If the pool was provided externally (e.g. via a pg.Pool instance in
430+
* connectionOptions or via {@link fromPool}), the pool will NOT be closed
431+
* because the caller retains ownership of the pool lifecycle.
378432
*/
379433
async stop(): Promise<void> {
380434
if (this.isClosed) return;
381435

382436
this.ttlManager.stop();
383-
await this.core.pool.end();
437+
if (this.poolOwned) {
438+
await this.core.pool.end();
439+
}
384440
this.isClosed = true;
385441
}
386442

libs/checkpoint-postgres/src/store/modules/types.ts

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,8 +181,31 @@ export interface PutOptions {
181181
}
182182

183183
export interface PostgresStoreConfig {
184+
/**
185+
* A preconfigured pg.Pool instance.
186+
*
187+
* When provided, this pool will be used directly and will NOT be closed
188+
* when the store is stopped -- the caller retains ownership of the pool
189+
* lifecycle. This enables sharing a single pool between PostgresSaver
190+
* and PostgresStore for better resource management.
191+
*
192+
* If both `pool` and `connectionOptions` are provided, `pool` takes
193+
* precedence.
194+
*
195+
* @example
196+
* ```typescript
197+
* const pool = new Pool({ connectionString: "postgresql://..." });
198+
* const saver = new PostgresSaver(pool);
199+
* const store = new PostgresStore({ pool });
200+
* ```
201+
*/
202+
pool?: pg.Pool;
203+
184204
/**
185205
* PostgreSQL connection string or connection configuration object.
206+
* A new pool will be created internally from these options.
207+
*
208+
* Ignored if `pool` is provided.
186209
*
187210
* @example
188211
* // Connection string
@@ -197,7 +220,7 @@ export interface PostgresStoreConfig {
197220
* password: "password"
198221
* }
199222
*/
200-
connectionOptions: string | pg.PoolConfig;
223+
connectionOptions?: string | pg.PoolConfig;
201224

202225
/**
203226
* Database schema name to use for store tables.

libs/checkpoint-postgres/src/tests/store.int.test.ts

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1527,3 +1527,100 @@ describe("PostgresStore Migration System", () => {
15271527
}
15281528
});
15291529
});
1530+
1531+
describe("PostgresStore Pool Sharing", () => {
1532+
let sharedPool: pg.Pool;
1533+
let dbName: string;
1534+
let dbConnectionString: string;
1535+
1536+
beforeEach(async () => {
1537+
dbName = `pool_share_test_${Date.now()}_${Math.floor(
1538+
Math.random() * 1000
1539+
)}`;
1540+
const adminPool = new Pool({ connectionString: TEST_POSTGRES_URL });
1541+
try {
1542+
await adminPool.query(`CREATE DATABASE ${dbName}`);
1543+
} finally {
1544+
await adminPool.end();
1545+
}
1546+
dbConnectionString = `${TEST_POSTGRES_URL!
1547+
.split("/")
1548+
.slice(0, -1)
1549+
.join("/")}/${dbName}`;
1550+
sharedPool = new Pool({ connectionString: dbConnectionString });
1551+
});
1552+
1553+
afterEach(async () => {
1554+
try {
1555+
await sharedPool.end();
1556+
} catch {
1557+
// Pool may already be ended; ignore.
1558+
}
1559+
const adminPool = new Pool({ connectionString: TEST_POSTGRES_URL });
1560+
try {
1561+
await adminPool.query(`DROP DATABASE IF EXISTS ${dbName} WITH (FORCE)`);
1562+
} finally {
1563+
await adminPool.end();
1564+
}
1565+
}, 30_000);
1566+
1567+
it("should accept a pg.Pool via the constructor", async () => {
1568+
const store = new PostgresStore({ pool: sharedPool });
1569+
await store.setup();
1570+
1571+
await store.put(["pool-test"], "key1", { data: "hello" });
1572+
const item = await store.get(["pool-test"], "key1");
1573+
expect(item).toBeDefined();
1574+
expect(item?.value).toEqual({ data: "hello" });
1575+
1576+
await store.stop();
1577+
});
1578+
1579+
it("should accept a pg.Pool via fromPool factory", async () => {
1580+
const store = PostgresStore.fromPool(sharedPool);
1581+
await store.setup();
1582+
1583+
await store.put(["pool-test"], "key1", { data: "fromPool" });
1584+
const item = await store.get(["pool-test"], "key1");
1585+
expect(item).toBeDefined();
1586+
expect(item?.value).toEqual({ data: "fromPool" });
1587+
1588+
await store.stop();
1589+
});
1590+
1591+
it("should not close an externally-provided pool on stop()", async () => {
1592+
const store = PostgresStore.fromPool(sharedPool);
1593+
await store.setup();
1594+
1595+
await store.put(["pool-test"], "key1", { data: "before-stop" });
1596+
await store.stop();
1597+
1598+
// The shared pool should still be usable after the store is stopped
1599+
const result = await sharedPool.query("SELECT 1 AS val");
1600+
expect(result.rows[0].val).toBe(1);
1601+
});
1602+
1603+
it("should allow a shared pool between two PostgresStore instances", async () => {
1604+
const store1 = PostgresStore.fromPool(sharedPool, { schema: "shared_a" });
1605+
const store2 = PostgresStore.fromPool(sharedPool, { schema: "shared_b" });
1606+
1607+
await store1.setup();
1608+
await store2.setup();
1609+
1610+
await store1.put(["ns"], "k1", { from: "store1" });
1611+
await store2.put(["ns"], "k1", { from: "store2" });
1612+
1613+
const item1 = await store1.get(["ns"], "k1");
1614+
const item2 = await store2.get(["ns"], "k1");
1615+
1616+
expect(item1?.value).toEqual({ from: "store1" });
1617+
expect(item2?.value).toEqual({ from: "store2" });
1618+
1619+
// Stop both stores -- the shared pool should remain open
1620+
await store1.stop();
1621+
await store2.stop();
1622+
1623+
const result = await sharedPool.query("SELECT 1 AS val");
1624+
expect(result.rows[0].val).toBe(1);
1625+
});
1626+
});

0 commit comments

Comments
 (0)