Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .changeset/mongodb-ttl-support.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
---
"@langchain/langgraph-checkpoint-mongodb": minor
---

Add TTL support for automatic checkpoint expiration

- Add optional `ttl` parameter to MongoDBSaver (value in seconds)
- Add `setup()` method to create TTL indexes on collections
- Add `upserted_at` timestamp to documents when TTL is enabled
- Each write refreshes TTL (expires after inactivity, not creation)
27 changes: 27 additions & 0 deletions libs/checkpoint-mongodb/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,30 @@ for await (const checkpoint of checkpointer.list(readConfig)) {

await client.close();
```

## TTL (Time-To-Live) Support

Automatically expire old checkpoints using MongoDB's TTL indexes:

```ts
import { MongoClient } from "mongodb";
import { MongoDBSaver } from "@langchain/langgraph-checkpoint-mongodb";

const client = new MongoClient(process.env.MONGODB_URL);

// Create checkpointer with 1-hour TTL (in seconds)
const checkpointer = new MongoDBSaver({
client,
ttl: 3600,
});

// Create TTL indexes (call during deployment/startup)
await checkpointer.setup();
```

When TTL is enabled:
- An `upserted_at` timestamp is added to each document on every write
- MongoDB automatically deletes documents after the TTL expires
- Each update resets the expiration timer

The `setup()` method creates the required TTL indexes. Call it during application startup or deployment. It is idempotent and handles concurrent calls safely.
40 changes: 39 additions & 1 deletion libs/checkpoint-mongodb/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ export type MongoDBSaverParams = {
dbName?: string;
checkpointCollectionName?: string;
checkpointWritesCollectionName?: string;
ttl?: number; // Time to live in seconds
};

/**
Expand All @@ -26,6 +27,8 @@ export class MongoDBSaver extends BaseCheckpointSaver {

protected db: MongoDatabase;

protected ttl?: number;

checkpointCollectionName = "checkpoints";

checkpointWritesCollectionName = "checkpoint_writes";
Expand All @@ -36,6 +39,7 @@ export class MongoDBSaver extends BaseCheckpointSaver {
dbName,
checkpointCollectionName,
checkpointWritesCollectionName,
ttl,
}: MongoDBSaverParams,
serde?: SerializerProtocol
) {
Expand All @@ -45,12 +49,38 @@ export class MongoDBSaver extends BaseCheckpointSaver {
name: "langgraphjs_checkpoint_saver",
});
this.db = this.client.db(dbName);
this.ttl = ttl;
this.checkpointCollectionName =
checkpointCollectionName ?? this.checkpointCollectionName;
this.checkpointWritesCollectionName =
checkpointWritesCollectionName ?? this.checkpointWritesCollectionName;
}

/**
* Creates TTL indexes on the checkpoint collections if TTL is configured.
* This method is idempotent and safe to call multiple times.
* Returns an array of errors (empty if successful) so caller can decide how to handle.
*/
async setup(): Promise<Error[]> {
if (this.ttl == null) return [];

const ttlIndex = { upserted_at: 1 };
const options = { expireAfterSeconds: this.ttl };

const results = await Promise.allSettled([
this.db
.collection(this.checkpointCollectionName)
.createIndex(ttlIndex, options),
this.db
.collection(this.checkpointWritesCollectionName)
.createIndex(ttlIndex, options),
]);

return results
.filter((r): r is PromiseRejectedResult => r.status === "rejected")
.map((r) => r.reason as Error);
}

/**
* Retrieves a checkpoint from the MongoDB database based on the
* provided config. If the config contains a "checkpoint_id" key, the checkpoint with
Expand Down Expand Up @@ -237,6 +267,7 @@ export class MongoDBSaver extends BaseCheckpointSaver {
type: checkpointType,
checkpoint: serializedCheckpoint,
metadata: serializedMetadata,
...(this.ttl != null && { upserted_at: new Date() }),
};
const upsertQuery = {
thread_id,
Expand Down Expand Up @@ -292,7 +323,14 @@ export class MongoDBSaver extends BaseCheckpointSaver {
return {
updateOne: {
filter: upsertQuery,
update: { $set: { channel, type, value: serializedValue } },
update: {
$set: {
channel,
type,
value: serializedValue,
...(this.ttl != null && { upserted_at: new Date() }),
},
},
upsert: true,
},
};
Expand Down
133 changes: 130 additions & 3 deletions libs/checkpoint-mongodb/src/tests/checkpoints.int.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { describe, it, expect, afterAll } from "vitest";
import { describe, it, expect, afterAll, afterEach } from "vitest";
import { MongoClient } from "mongodb";
import {
Checkpoint,
Expand Down Expand Up @@ -49,8 +49,10 @@ const client = new MongoClient(getEnvironmentVariable("MONGODB_URL")!, {

afterAll(async () => {
const db = client.db();
await db.dropCollection("checkpoints");
await db.dropCollection("checkpoint_writes");
await db.dropCollection("checkpoints").catch(() => {});
await db.dropCollection("checkpoint_writes").catch(() => {});
await db.dropCollection("checkpoints_ttl").catch(() => {});
await db.dropCollection("checkpoint_writes_ttl").catch(() => {});
await client.close();
});

Expand Down Expand Up @@ -171,4 +173,129 @@ describe("MongoDBSaver", () => {
await saver.getTuple({ configurable: { thread_id: "2" } })
).toBeDefined();
});

describe("TTL support", () => {
const ttlCheckpointCollection = "checkpoints_ttl";
const ttlWritesCollection = "checkpoint_writes_ttl";

afterEach(async () => {
const db = client.db();
await db.collection(ttlCheckpointCollection).deleteMany({});
await db.collection(ttlWritesCollection).deleteMany({});
});

it("should create TTL indexes on setup()", async () => {
const saver = new MongoDBSaver({
client,
ttl: 3600,
checkpointCollectionName: ttlCheckpointCollection,
checkpointWritesCollectionName: ttlWritesCollection,
});

await saver.setup();

const db = client.db();
const checkpointIndexes = await db
.collection(ttlCheckpointCollection)
.indexes();
const writesIndexes = await db.collection(ttlWritesCollection).indexes();

const checkpointTtlIndex = checkpointIndexes.find(
(idx) => idx.key?.upserted_at === 1
);
const writesTtlIndex = writesIndexes.find(
(idx) => idx.key?.upserted_at === 1
);

expect(checkpointTtlIndex).toBeDefined();
expect(checkpointTtlIndex?.expireAfterSeconds).toBe(3600);
expect(writesTtlIndex).toBeDefined();
expect(writesTtlIndex?.expireAfterSeconds).toBe(3600);
});

it("should add upserted_at field to checkpoints when TTL is enabled", async () => {
const saver = new MongoDBSaver({
client,
ttl: 3600,
checkpointCollectionName: ttlCheckpointCollection,
checkpointWritesCollectionName: ttlWritesCollection,
});

const beforePut = new Date();
await saver.put(
{ configurable: { thread_id: "ttl-test-1" } },
checkpoint1,
{ source: "update", step: -1, parents: {} }
);
const afterPut = new Date();

const db = client.db();
const doc = await db
.collection(ttlCheckpointCollection)
.findOne({ thread_id: "ttl-test-1" });

expect(doc?.upserted_at).toBeDefined();
expect(doc?.upserted_at).toBeInstanceOf(Date);
expect(doc?.upserted_at.getTime()).toBeGreaterThanOrEqual(
beforePut.getTime()
);
expect(doc?.upserted_at.getTime()).toBeLessThanOrEqual(afterPut.getTime());
});

it("should add upserted_at field to writes when TTL is enabled", async () => {
const saver = new MongoDBSaver({
client,
ttl: 3600,
checkpointCollectionName: ttlCheckpointCollection,
checkpointWritesCollectionName: ttlWritesCollection,
});

const beforePut = new Date();
await saver.putWrites(
{
configurable: {
thread_id: "ttl-test-2",
checkpoint_ns: "",
checkpoint_id: checkpoint1.id,
},
},
[["channel1", "value1"]],
"task1"
);
const afterPut = new Date();

const db = client.db();
const doc = await db
.collection(ttlWritesCollection)
.findOne({ thread_id: "ttl-test-2" });

expect(doc?.upserted_at).toBeDefined();
expect(doc?.upserted_at).toBeInstanceOf(Date);
expect(doc?.upserted_at.getTime()).toBeGreaterThanOrEqual(
beforePut.getTime()
);
expect(doc?.upserted_at.getTime()).toBeLessThanOrEqual(afterPut.getTime());
});

it("should NOT add upserted_at field when TTL is not enabled", async () => {
const saver = new MongoDBSaver({
client,
checkpointCollectionName: ttlCheckpointCollection,
checkpointWritesCollectionName: ttlWritesCollection,
});

await saver.put(
{ configurable: { thread_id: "no-ttl-test" } },
checkpoint1,
{ source: "update", step: -1, parents: {} }
);

const db = client.db();
const doc = await db
.collection(ttlCheckpointCollection)
.findOne({ thread_id: "no-ttl-test" });

expect(doc?.upserted_at).toBeUndefined();
});
});
});
Loading