From 911d3be8c160a081be8d96250759d5ca94909eb0 Mon Sep 17 00:00:00 2001 From: UmeshpJadhav Date: Thu, 29 Jan 2026 11:44:13 +0530 Subject: [PATCH 1/7] feat(mongodb): implement complete MongoDB memory adapter --- .changeset/mongodb-feature-storage.md | 5 + packages/mongodb/README.md | 40 + packages/mongodb/docker-compose.test.yaml | 18 + packages/mongodb/package.json | 55 + .../mongodb/src/index.integration.test.ts | 651 ++++++++++++ packages/mongodb/src/index.ts | 2 + packages/mongodb/src/memory-adapter.spec.ts | 976 +++++++++++++++++ packages/mongodb/src/memory-adapter.ts | 977 ++++++++++++++++++ packages/mongodb/tsconfig.json | 30 + packages/mongodb/tsup.config.ts | 19 + packages/mongodb/vitest.config.mts | 10 + .../mongodb/vitest.integration.config.mts | 10 + pnpm-lock.yaml | 103 +- 13 files changed, 2895 insertions(+), 1 deletion(-) create mode 100644 .changeset/mongodb-feature-storage.md create mode 100644 packages/mongodb/README.md create mode 100644 packages/mongodb/docker-compose.test.yaml create mode 100644 packages/mongodb/package.json create mode 100644 packages/mongodb/src/index.integration.test.ts create mode 100644 packages/mongodb/src/index.ts create mode 100644 packages/mongodb/src/memory-adapter.spec.ts create mode 100644 packages/mongodb/src/memory-adapter.ts create mode 100644 packages/mongodb/tsconfig.json create mode 100644 packages/mongodb/tsup.config.ts create mode 100644 packages/mongodb/vitest.config.mts create mode 100644 packages/mongodb/vitest.integration.config.mts diff --git a/.changeset/mongodb-feature-storage.md b/.changeset/mongodb-feature-storage.md new file mode 100644 index 000000000..59baf61ea --- /dev/null +++ b/.changeset/mongodb-feature-storage.md @@ -0,0 +1,5 @@ +--- +"@voltagent/mongodb": minor +--- + +Initial release of MongoDB memory storage adapter diff --git a/packages/mongodb/README.md b/packages/mongodb/README.md new file mode 100644 index 000000000..14232061e --- /dev/null +++ b/packages/mongodb/README.md @@ -0,0 +1,40 @@ +# @voltagent/mongodb + +MongoDB storage adapter for VoltAgent memory. + +## Installation + +```bash +npm install @voltagent/mongodb +``` + +## Usage + +```typescript +import { MongoDBMemoryAdapter } from "@voltagent/mongodb"; +import { Memory } from "@voltagent/core"; + +const memory = new Memory({ + storage: new MongoDBMemoryAdapter({ + connection: process.env.MONGO_URI, + database: "voltagent", // optional + collectionPrefix: "voltagent_memory", // optional + }), +}); +``` + +## Features + +- **Persistent Storage**: Stores messages, conversations, and workflow states in MongoDB. +- **Efficient Queries**: Indexed for fast retrieval by user, conversation, or date. +- **Type Safe**: Fully typed implementation of the VoltAgent StorageAdapter interface. +- **Workflow Support**: Native support for resuming suspended workflows. + +## Configuration + +| Option | Type | Default | Description | +| str | str | str | str | +| `connection` | `string` | required | MongoDB connection URI | +| `database` | `string` | `"voltagent"` | Database name | +| `collectionPrefix` | `string` | `"voltagent_memory"` | Prefix for collections | +| `debug` | `boolean` | `false` | Enable debug logging | diff --git a/packages/mongodb/docker-compose.test.yaml b/packages/mongodb/docker-compose.test.yaml new file mode 100644 index 000000000..0f5400cae --- /dev/null +++ b/packages/mongodb/docker-compose.test.yaml @@ -0,0 +1,18 @@ +services: + mongodb-test: + image: mongo:7.0 + container_name: 'voltagent-mongodb-test' + ports: + - '27017:27017' + environment: + MONGO_INITDB_DATABASE: voltagent_test + volumes: + - test_mongodb_data:/data/db + healthcheck: + test: ["CMD", "mongosh", "--eval", "db.adminCommand('ping')"] + interval: 2s + timeout: 5s + retries: 10 + +volumes: + test_mongodb_data: diff --git a/packages/mongodb/package.json b/packages/mongodb/package.json new file mode 100644 index 000000000..1524a7f57 --- /dev/null +++ b/packages/mongodb/package.json @@ -0,0 +1,55 @@ +{ + "name": "@voltagent/mongodb", + "description": "VoltAgent MongoDB - MongoDB Memory provider integration for VoltAgent", + "version": "2.0.2", + "dependencies": { + "mongodb": "^7.0.0" + }, + "devDependencies": { + "@vitest/coverage-v8": "^3.2.4", + "@voltagent/core": "^2.0.2", + "ai": "^6.0.0" + }, + "exports": { + ".": { + "import": { + "types": "./dist/index.d.mts", + "default": "./dist/index.mjs" + }, + "require": { + "types": "./dist/index.d.ts", + "default": "./dist/index.js" + } + } + }, + "files": [ + "dist" + ], + "license": "MIT", + "main": "dist/index.js", + "module": "dist/index.mjs", + "peerDependencies": { + "@voltagent/core": "^2.0.0", + "ai": "^6.0.0" + }, + "repository": { + "type": "git", + "url": "https://github.com/VoltAgent/voltagent.git", + "directory": "packages/mongodb" + }, + "scripts": { + "attw": "attw --pack", + "build": "tsup", + "dev": "tsup --watch", + "lint": "biome check .", + "lint:fix": "biome check . --write", + "publint": "publint --strict", + "test": "vitest", + "test:coverage": "vitest run --coverage", + "test:integration": "npm run test:integration:setup && vitest run --config vitest.integration.config.mts && npm run test:integration:teardown", + "test:integration:ci": "vitest run --config vitest.integration.config.mts", + "test:integration:setup": "docker compose -f docker-compose.test.yaml up -d && sleep 10", + "test:integration:teardown": "docker compose -f docker-compose.test.yaml down -v" + }, + "types": "dist/index.d.ts" +} diff --git a/packages/mongodb/src/index.integration.test.ts b/packages/mongodb/src/index.integration.test.ts new file mode 100644 index 000000000..9991af3aa --- /dev/null +++ b/packages/mongodb/src/index.integration.test.ts @@ -0,0 +1,651 @@ +/** + * Integration tests for MongoDB Memory Storage Adapter + * Tests against real MongoDB instance running in Docker + */ + +import { ConversationAlreadyExistsError, ConversationNotFoundError } from "@voltagent/core"; +import type { UIMessage } from "ai"; +import { afterAll, beforeAll, beforeEach, describe, expect, it } from "vitest"; +import { MongoDBMemoryAdapter } from "./memory-adapter"; + +describe("MongoDBMemoryAdapter - Integration Tests", () => { + let adapter: MongoDBMemoryAdapter; + + const MONGO_URI = process.env.MONGO_URI || "mongodb://localhost:27017"; + const TEST_DATABASE = "voltagent_test"; + + beforeAll(async () => { + // Create adapter with test database + adapter = new MongoDBMemoryAdapter({ + connection: MONGO_URI, + database: TEST_DATABASE, + collectionPrefix: "test_memory", + debug: false, + }); + + // Wait for initialization + await new Promise((resolve) => setTimeout(resolve, 1000)); + }); + + beforeEach(async () => { + // Clean up all collections before each test + const { MongoClient } = await import("mongodb"); + const client = new MongoClient(MONGO_URI); + await client.connect(); + const db = client.db(TEST_DATABASE); + + const collections = await db.listCollections().toArray(); + for (const collection of collections) { + if (collection.name.startsWith("test_memory_")) { + await db.collection(collection.name).deleteMany({}); + } + } + + await client.close(); + }); + + afterAll(async () => { + // Close adapter connection + await adapter.close(); + }); + + // ============================================================================ + // Message Operations Integration Tests + // ============================================================================ + + describe("Message Operations", () => { + it("should add and retrieve messages", async () => { + // Create conversation first + const conversation = await adapter.createConversation({ + id: "conv-1", + resourceId: "resource-1", + userId: "user-1", + title: "Test Conversation", + metadata: {}, + }); + + expect(conversation.id).toBe("conv-1"); + + // Add message + const message: UIMessage = { + id: "msg-1", + role: "user", + parts: [{ type: "text", text: "Hello, world!" }], + metadata: { custom: "data" }, + }; + + await adapter.addMessage(message, "user-1", "conv-1"); + + // Retrieve messages + const messages = await adapter.getMessages("user-1", "conv-1"); + + expect(messages).toHaveLength(1); + expect(messages[0].id).toBe("msg-1"); + expect(messages[0].role).toBe("user"); + expect(messages[0].parts).toEqual(message.parts); + expect(messages[0].metadata?.createdAt).toBeInstanceOf(Date); + }); + + it("should add multiple messages in batch", async () => { + await adapter.createConversation({ + id: "conv-2", + resourceId: "resource-1", + userId: "user-1", + title: "Test", + metadata: {}, + }); + + const messages: UIMessage[] = [ + { id: "msg-1", role: "user", parts: [{ type: "text", text: "Hello" }] }, + { id: "msg-2", role: "assistant", parts: [{ type: "text", text: "Hi there!" }] }, + { id: "msg-3", role: "user", parts: [{ type: "text", text: "How are you?" }] }, + ]; + + await adapter.addMessages(messages, "user-1", "conv-2"); + + const retrieved = await adapter.getMessages("user-1", "conv-2"); + + expect(retrieved).toHaveLength(3); + expect(retrieved.map((m) => m.id)).toEqual(["msg-1", "msg-2", "msg-3"]); + }); + + it("should filter messages by role", async () => { + await adapter.createConversation({ + id: "conv-3", + resourceId: "resource-1", + userId: "user-1", + title: "Test", + metadata: {}, + }); + + await adapter.addMessages( + [ + { id: "msg-1", role: "user", parts: [{ type: "text", text: "Hello" }] }, + { id: "msg-2", role: "assistant", parts: [{ type: "text", text: "Hi" }] }, + { id: "msg-3", role: "user", parts: [{ type: "text", text: "How are you?" }] }, + ], + "user-1", + "conv-3", + ); + + const userMessages = await adapter.getMessages("user-1", "conv-3", { roles: ["user"] }); + + expect(userMessages).toHaveLength(2); + expect(userMessages.every((m) => m.role === "user")).toBe(true); + }); + + it("should clear messages for a conversation", async () => { + await adapter.createConversation({ + id: "conv-4", + resourceId: "resource-1", + userId: "user-1", + title: "Test", + metadata: {}, + }); + + await adapter.addMessage( + { id: "msg-1", role: "user", parts: [{ type: "text", text: "Hello" }] }, + "user-1", + "conv-4", + ); + + await adapter.clearMessages("user-1", "conv-4"); + + const messages = await adapter.getMessages("user-1", "conv-4"); + expect(messages).toHaveLength(0); + }); + + it("should throw error when adding message to non-existent conversation", async () => { + const message: UIMessage = { + id: "msg-1", + role: "user", + parts: [{ type: "text", text: "Hello" }], + }; + + await expect(adapter.addMessage(message, "user-1", "non-existent")).rejects.toThrow( + ConversationNotFoundError, + ); + }); + }); + + // ============================================================================ + // Conversation Operations Integration Tests + // ============================================================================ + + describe("Conversation Operations", () => { + it("should create and retrieve conversation", async () => { + const input = { + id: "conv-create-test", + resourceId: "resource-1", + userId: "user-1", + title: "Test Conversation", + metadata: { custom: "field" }, + }; + + const created = await adapter.createConversation(input); + + expect(created.id).toBe(input.id); + expect(created.title).toBe(input.title); + expect(created.metadata).toEqual(input.metadata); + expect(created.createdAt).toBeTypeOf("string"); + expect(created.updatedAt).toBeTypeOf("string"); + + const retrieved = await adapter.getConversation("conv-create-test"); + + expect(retrieved).not.toBeNull(); + expect(retrieved?.id).toBe(input.id); + }); + + it("should throw error when creating duplicate conversation", async () => { + const input = { + id: "conv-duplicate", + resourceId: "resource-1", + userId: "user-1", + title: "Test", + metadata: {}, + }; + + await adapter.createConversation(input); + + await expect(adapter.createConversation(input)).rejects.toThrow( + ConversationAlreadyExistsError, + ); + }); + + it("should update conversation", async () => { + await adapter.createConversation({ + id: "conv-update", + resourceId: "resource-1", + userId: "user-1", + title: "Original Title", + metadata: {}, + }); + + const updated = await adapter.updateConversation("conv-update", { + title: "Updated Title", + metadata: { updated: true }, + }); + + expect(updated.title).toBe("Updated Title"); + expect(updated.metadata.updated).toBe(true); + }); + + it("should delete conversation and cascade to messages", async () => { + await adapter.createConversation({ + id: "conv-delete", + resourceId: "resource-1", + userId: "user-1", + title: "To Delete", + metadata: {}, + }); + + await adapter.addMessage( + { id: "msg-1", role: "user", parts: [{ type: "text", text: "Hello" }] }, + "user-1", + "conv-delete", + ); + + await adapter.deleteConversation("conv-delete"); + + const conversation = await adapter.getConversation("conv-delete"); + expect(conversation).toBeNull(); + + const messages = await adapter.getMessages("user-1", "conv-delete"); + expect(messages).toHaveLength(0); + }); + + it("should query conversations with pagination", async () => { + // Create multiple conversations + for (let i = 1; i <= 5; i++) { + await adapter.createConversation({ + id: `conv-query-${i}`, + resourceId: "resource-1", + userId: "user-1", + title: `Conversation ${i}`, + metadata: {}, + }); + // Small delay to ensure different timestamps + await new Promise((resolve) => setTimeout(resolve, 10)); + } + + const page1 = await adapter.queryConversations({ + userId: "user-1", + limit: 2, + offset: 0, + }); + + expect(page1).toHaveLength(2); + + const page2 = await adapter.queryConversations({ + userId: "user-1", + limit: 2, + offset: 2, + }); + + expect(page2).toHaveLength(2); + expect(page1[0].id).not.toBe(page2[0].id); + }); + + it("should get conversations by resourceId", async () => { + await adapter.createConversation({ + id: "conv-res-1", + resourceId: "resource-test", + userId: "user-1", + title: "Test 1", + metadata: {}, + }); + + await adapter.createConversation({ + id: "conv-res-2", + resourceId: "resource-test", + userId: "user-2", + title: "Test 2", + metadata: {}, + }); + + const conversations = await adapter.getConversations("resource-test"); + + expect(conversations).toHaveLength(2); + }); + }); + + // ============================================================================ + // Working Memory Integration Tests + // ============================================================================ + + describe("Working Memory Operations", () => { + it("should set and get conversation-scoped working memory", async () => { + await adapter.createConversation({ + id: "conv-memory", + resourceId: "resource-1", + userId: "user-1", + title: "Test", + metadata: {}, + }); + + await adapter.setWorkingMemory({ + conversationId: "conv-memory", + content: "Important context about this conversation", + scope: "conversation", + }); + + const memory = await adapter.getWorkingMemory({ + conversationId: "conv-memory", + scope: "conversation", + }); + + expect(memory).toBe("Important context about this conversation"); + }); + + it("should set and get user-scoped working memory", async () => { + await adapter.setWorkingMemory({ + userId: "user-memory-test", + content: "User preferences and context", + scope: "user", + }); + + const memory = await adapter.getWorkingMemory({ + userId: "user-memory-test", + scope: "user", + }); + + expect(memory).toBe("User preferences and context"); + }); + + it("should delete working memory", async () => { + await adapter.createConversation({ + id: "conv-del-memory", + resourceId: "resource-1", + userId: "user-1", + title: "Test", + metadata: {}, + }); + + await adapter.setWorkingMemory({ + conversationId: "conv-del-memory", + content: "Test memory", + scope: "conversation", + }); + + await adapter.deleteWorkingMemory({ + conversationId: "conv-del-memory", + scope: "conversation", + }); + + const memory = await adapter.getWorkingMemory({ + conversationId: "conv-del-memory", + scope: "conversation", + }); + + expect(memory).toBeNull(); + }); + }); + + // ============================================================================ + // Workflow State Integration Tests + // ============================================================================ + + describe("Workflow State Operations", () => { + it("should set and get workflow state", async () => { + const state: any = { + id: "exec-1", + workflowId: "workflow-1", + workflowName: "Test Workflow", + status: "running", + suspension: null, + events: [], + output: null, + cancellation: null, + userId: "user-1", + conversationId: "conv-1", + metadata: {}, + createdAt: new Date(), + updatedAt: new Date(), + }; + + await adapter.setWorkflowState("exec-1", state); + + const retrieved = await adapter.getWorkflowState("exec-1"); + + expect(retrieved).not.toBeNull(); + expect(retrieved?.id).toBe("exec-1"); + expect(retrieved?.status).toBe("running"); + }); + + it("should update workflow state", async () => { + const state: any = { + id: "exec-update", + workflowId: "workflow-1", + workflowName: "Test", + status: "running", + createdAt: new Date(), + updatedAt: new Date(), + }; + + await adapter.setWorkflowState("exec-update", state); + + await adapter.updateWorkflowState("exec-update", { + status: "completed", + output: { result: "success" }, + }); + + const updated = await adapter.getWorkflowState("exec-update"); + + expect(updated?.status).toBe("completed"); + expect(updated?.output).toEqual({ result: "success" }); + }); + + it("should query workflow runs", async () => { + const now = new Date(); + + for (let i = 1; i <= 3; i++) { + await adapter.setWorkflowState(`exec-query-${i}`, { + id: `exec-query-${i}`, + workflowId: "workflow-query", + workflowName: "Test", + status: i === 1 ? "completed" : "running", + createdAt: now, + updatedAt: now, + } as any); + } + + const allRuns = await adapter.queryWorkflowRuns({ + workflowId: "workflow-query", + }); + + expect(allRuns.length).toBeGreaterThanOrEqual(3); + + const completedRuns = await adapter.queryWorkflowRuns({ + workflowId: "workflow-query", + status: "completed", + }); + + expect(completedRuns).toHaveLength(1); + }); + + it("should get suspended workflow states", async () => { + const now = new Date(); + + await adapter.setWorkflowState("exec-suspended-1", { + id: "exec-suspended-1", + workflowId: "workflow-suspend", + workflowName: "Test", + status: "suspended", + createdAt: now, + updatedAt: now, + } as any); + + await adapter.setWorkflowState("exec-running-1", { + id: "exec-running-1", + workflowId: "workflow-suspend", + workflowName: "Test", + status: "running", + createdAt: now, + updatedAt: now, + } as any); + + const suspended = await adapter.getSuspendedWorkflowStates("workflow-suspend"); + + expect(suspended).toHaveLength(1); + expect(suspended[0].status).toBe("suspended"); + }); + }); + + // ============================================================================ + // Conversation Steps Integration Tests + // ============================================================================ + + describe("Conversation Steps Operations", () => { + it("should save and retrieve conversation steps", async () => { + await adapter.createConversation({ + id: "conv-steps", + resourceId: "resource-1", + userId: "user-1", + title: "Test", + metadata: {}, + }); + + const steps: any[] = [ + { + id: "step-1", + conversationId: "conv-steps", + userId: "user-1", + agentId: "agent-1", + agentName: "Test Agent", + operationId: "op-1", + stepIndex: 0, + type: "message", + role: "user", + content: "Hello", + }, + { + id: "step-2", + conversationId: "conv-steps", + userId: "user-1", + agentId: "agent-1", + agentName: "Test Agent", + operationId: "op-1", + stepIndex: 1, + type: "message", + role: "assistant", + content: "Hi there", + }, + ]; + + await adapter.saveConversationSteps(steps); + + const retrieved = await adapter.getConversationSteps("user-1", "conv-steps"); + + expect(retrieved).toHaveLength(2); + expect(retrieved[0].stepIndex).toBe(0); + expect(retrieved[1].stepIndex).toBe(1); + }); + + it("should filter steps by operationId", async () => { + await adapter.createConversation({ + id: "conv-steps-filter", + resourceId: "resource-1", + userId: "user-1", + title: "Test", + metadata: {}, + }); + + await adapter.saveConversationSteps([ + { + id: "step-op1", + conversationId: "conv-steps-filter", + userId: "user-1", + agentId: "agent-1", + operationId: "op-1", + stepIndex: 0, + type: "message", + role: "user", + } as any, + { + id: "step-op2", + conversationId: "conv-steps-filter", + userId: "user-1", + agentId: "agent-1", + operationId: "op-2", + stepIndex: 1, + type: "message", + role: "user", + } as any, + ]); + + const op1Steps = await adapter.getConversationSteps("user-1", "conv-steps-filter", { + operationId: "op-1", + }); + + expect(op1Steps).toHaveLength(1); + expect(op1Steps[0].operationId).toBe("op-1"); + }); + }); + + // ============================================================================ + // Edge Cases and Error Handling + // ============================================================================ + + describe("Edge Cases", () => { + it("should handle empty message arrays", async () => { + await adapter.createConversation({ + id: "conv-empty", + resourceId: "resource-1", + userId: "user-1", + title: "Test", + metadata: {}, + }); + + await adapter.addMessages([], "user-1", "conv-empty"); + const messages = await adapter.getMessages("user-1", "conv-empty"); + + expect(messages).toHaveLength(0); + }); + + it("should return empty array for non-existent conversation messages", async () => { + const messages = await adapter.getMessages("user-1", "non-existent"); + expect(messages).toHaveLength(0); + }); + + it("should handle messages without IDs", async () => { + await adapter.createConversation({ + id: "conv-no-id", + resourceId: "resource-1", + userId: "user-1", + title: "Test", + metadata: {}, + }); + + const message: UIMessage = { + role: "user", + parts: [{ type: "text", text: "Hello" }], + id: "", + }; + + await adapter.addMessage(message, "user-1", "conv-no-id"); + + const messages = await adapter.getMessages("user-1", "conv-no-id"); + + expect(messages).toHaveLength(1); + expect(messages[0].id).toBeTruthy(); + }); + + it("should handle pagination beyond available results", async () => { + await adapter.createConversation({ + id: "conv-pagination", + resourceId: "resource-1", + userId: "user-1", + title: "Test", + metadata: {}, + }); + + const conversations = await adapter.queryConversations({ + userId: "user-1", + limit: 10, + offset: 100, + }); + + expect(conversations).toHaveLength(0); + }); + }); +}); diff --git a/packages/mongodb/src/index.ts b/packages/mongodb/src/index.ts new file mode 100644 index 000000000..cfebb019d --- /dev/null +++ b/packages/mongodb/src/index.ts @@ -0,0 +1,2 @@ +export { MongoDBMemoryAdapter } from "./memory-adapter"; +export type { MongoDBMemoryOptions } from "./memory-adapter"; diff --git a/packages/mongodb/src/memory-adapter.spec.ts b/packages/mongodb/src/memory-adapter.spec.ts new file mode 100644 index 000000000..c87fee035 --- /dev/null +++ b/packages/mongodb/src/memory-adapter.spec.ts @@ -0,0 +1,976 @@ +/** + * MongoDB Storage Adapter for Memory + * Stores conversations and messages in MongoDB database + */ + +import { ConversationAlreadyExistsError, ConversationNotFoundError } from "@voltagent/core"; +import type { + Conversation, + ConversationQueryOptions, + ConversationStepRecord, + CreateConversationInput, + GetConversationStepsOptions, + GetMessagesOptions, + StorageAdapter, + WorkflowRunQuery, + WorkflowStateEntry, + WorkingMemoryScope, +} from "@voltagent/core"; +import type { UIMessage } from "ai"; +import { type Collection, type Db, type Document, MongoClient } from "mongodb"; + +/** + * MongoDB configuration options for Memory + */ +export interface MongoDBMemoryOptions { + /** + * MongoDB connection URI + * Examples: + * - "mongodb://localhost:27017" + * - "mongodb://username:password@localhost:27017" + * - "mongodb+srv://username:password@cluster.mongodb.net" + */ + connection: string; + + /** + * Database name to use for collections + * @default "voltagent" + */ + database?: string; + + /** + * Prefix for collection names + * @default "voltagent_memory" + */ + collectionPrefix?: string; + + /** + * Whether to enable debug logging + * @default false + */ + debug?: boolean; +} + +/** + * MongoDB Storage Adapter for Memory + * Production-ready storage for conversations and messages + */ +export class MongoDBMemoryAdapter implements StorageAdapter { + private client: MongoClient; + private db: Db | null = null; + private databaseName: string; + private collectionPrefix: string; + private initialized = false; + private initPromise: Promise | null = null; + private debug: boolean; + + constructor(options: MongoDBMemoryOptions) { + this.databaseName = options.database ?? "voltagent"; + this.collectionPrefix = options.collectionPrefix ?? "voltagent_memory"; + this.debug = options.debug ?? false; + + // Validate collection prefix + if ( + this.collectionPrefix.includes("\0") || + this.collectionPrefix.includes("$") || + this.collectionPrefix.startsWith("system.") + ) { + throw new Error(`Invalid collection prefix: ${this.collectionPrefix}`); + } + + // Create MongoDB client + this.client = new MongoClient(options.connection); + + this.log("MongoDB Memory adapter initialized"); + + // Start initialization but don't await it + this.initPromise = this.initialize(); + } + + /** + * Log debug messages + */ + private log(...args: any[]): void { + if (this.debug) { + console.log("[MongoDB Memory]", ...args); + } + } + + /** + * Generate a random ID + */ + private generateId(): string { + return ( + Math.random().toString(36).substring(2, 15) + Math.random().toString(36).substring(2, 15) + ); + } + + /** + * Get collection by name + */ + private getCollection(collectionName: string): Collection { + if (!this.db) { + throw new Error("Database not initialized"); + } + return this.db.collection(`${this.collectionPrefix}_${collectionName}`); + } + + /** + * Initialize database schema + */ + private async initialize(): Promise { + if (this.initialized) return; + + // Prevent multiple simultaneous initializations + if (this.initPromise && !this.initialized) { + return this.initPromise; + } + + try { + // Connect to MongoDB + await this.client.connect(); + this.db = this.client.db(this.databaseName); + + this.log(`Connected to MongoDB database: ${this.databaseName}`); + + // Create indexes for all collections + const conversationsCollection = this.getCollection("conversations"); + const messagesCollection = this.getCollection("messages"); + const workflowStatesCollection = this.getCollection("workflow_states"); + const stepsCollection = this.getCollection("steps"); + + // Users collection indexes (none needed beyond _id) + + // Conversations collection indexes + await conversationsCollection.createIndex({ userId: 1 }, { background: true }); + await conversationsCollection.createIndex({ resourceId: 1 }, { background: true }); + await conversationsCollection.createIndex({ updatedAt: -1 }, { background: true }); + + // Messages collection indexes + await messagesCollection.createIndex( + { conversationId: 1, createdAt: 1 }, + { background: true }, + ); + await messagesCollection.createIndex({ conversationId: 1 }, { background: true }); + // Unique compound index to enforce message uniqueness + await messagesCollection.createIndex( + { conversationId: 1, messageId: 1 }, + { unique: true, background: true }, + ); + + // Workflow states collection indexes + await workflowStatesCollection.createIndex({ workflowId: 1 }, { background: true }); + await workflowStatesCollection.createIndex({ status: 1 }, { background: true }); + await workflowStatesCollection.createIndex({ createdAt: -1 }, { background: true }); + + // Steps collection indexes + await stepsCollection.createIndex({ conversationId: 1, stepIndex: 1 }, { background: true }); + await stepsCollection.createIndex( + { conversationId: 1, operationId: 1 }, + { background: true }, + ); + + this.initialized = true; + this.log("Database schema initialized with indexes"); + } catch (error) { + throw new Error( + `Failed to connect to MongoDB: ${error instanceof Error ? error.message : String(error)}`, + ); + } + } + + /** + * Close MongoDB connection + */ + async close(): Promise { + await this.client.close(); + this.log("MongoDB connection closed"); + } + + // ============================================================================ + // Message Operations + // ============================================================================ + + /** + * Add a single message + */ + async addMessage(message: UIMessage, userId: string, conversationId: string): Promise { + await this.initPromise; + + const messagesCollection = this.getCollection("messages"); + + // Ensure conversation exists + const conversation = await this.getConversation(conversationId); + if (!conversation) { + throw new ConversationNotFoundError(conversationId); + } + + const messageId = message.id || this.generateId(); + + try { + await messagesCollection.insertOne({ + _id: undefined, // Let MongoDB generate ObjectId + conversationId, + messageId, + userId, + role: message.role, + parts: message.parts, + metadata: message.metadata || {}, + formatVersion: 2, + createdAt: new Date(), + } as any); + + this.log(`Added message ${messageId} to conversation ${conversationId}`); + } catch (error: any) { + if (error.code === 11000) { + throw new Error( + `Message with ID ${messageId} already exists in conversation ${conversationId}`, + ); + } + throw error; + } + } + + /** + * Add multiple messages + */ + async addMessages(messages: UIMessage[], userId: string, conversationId: string): Promise { + await this.initPromise; + + if (messages.length === 0) return; + + const messagesCollection = this.getCollection("messages"); + + // Ensure conversation exists + const conversation = await this.getConversation(conversationId); + if (!conversation) { + throw new ConversationNotFoundError(conversationId); + } + + const documentsToInsert = messages.map((message) => ({ + _id: undefined, // Let MongoDB generate ObjectId + conversationId, + messageId: message.id || this.generateId(), + userId, + role: message.role, + parts: message.parts, + metadata: message.metadata || {}, + formatVersion: 2, + createdAt: new Date(), + })); + + try { + await messagesCollection.insertMany(documentsToInsert as any); + this.log(`Added ${messages.length} messages to conversation ${conversationId}`); + } catch (error: any) { + if (error.code === 11000) { + throw new Error(`One or more messages already exist in conversation ${conversationId}`); + } + throw error; + } + } + + /** + * Get messages for a conversation + */ + async getMessages( + userId: string, + conversationId: string, + options?: GetMessagesOptions, + ): Promise[]> { + await this.initPromise; + + const messagesCollection = this.getCollection("messages"); + + const filter: any = { conversationId, userId }; + + if (options?.roles && options.roles.length > 0) { + filter.role = { $in: options.roles }; + } + + if (options?.after) { + filter.createdAt = { $gt: options.after }; + } + + if (options?.before) { + filter.createdAt = { ...filter.createdAt, $lt: options.before }; + } + + let cursor = messagesCollection.find(filter).sort({ createdAt: 1 }); + + if (options?.limit) { + cursor = cursor.limit(options.limit); + } + + const messages = await cursor.toArray(); + + return messages.map((msg: any) => ({ + id: msg.messageId, + role: msg.role, + parts: msg.parts, + metadata: { + ...msg.metadata, + createdAt: msg.createdAt, + }, + })); + } + + /** + * Clear messages for a conversation or all conversations for a user + */ + async clearMessages(userId: string, conversationId?: string): Promise { + await this.initPromise; + + const messagesCollection = this.getCollection("messages"); + const stepsCollection = this.getCollection("steps"); + + if (conversationId) { + // Clear messages for specific conversation + await messagesCollection.deleteMany({ conversationId, userId }); + await stepsCollection.deleteMany({ conversationId, userId }); + this.log(`Cleared messages for conversation ${conversationId}`); + } else { + // Clear all messages for user + // First get all conversation IDs for this user + const conversationsCollection = this.getCollection("conversations"); + const userConversations = await conversationsCollection + .find({ userId }) + .project({ _id: 1 }) + .toArray(); + + const conversationIds = userConversations.map((conv: any) => conv._id); + + if (conversationIds.length > 0) { + await messagesCollection.deleteMany({ conversationId: { $in: conversationIds } }); + await stepsCollection.deleteMany({ conversationId: { $in: conversationIds } }); + this.log(`Cleared all messages for user ${userId}`); + } + } + } + + // ============================================================================ + // Conversation Operations + // ============================================================================ + + /** + * Create a new conversation + */ + async createConversation(input: CreateConversationInput): Promise { + await this.initPromise; + + const conversationsCollection = this.getCollection("conversations"); + + // Check if conversation already exists + const existing = await conversationsCollection.findOne({ _id: input.id } as any); + if (existing) { + throw new ConversationAlreadyExistsError(input.id); + } + + const now = new Date(); + const conversation = { + _id: input.id, + resourceId: input.resourceId, + userId: input.userId, + title: input.title, + metadata: input.metadata || {}, + createdAt: now, + updatedAt: now, + }; + + await conversationsCollection.insertOne(conversation as any); + + this.log(`Created conversation ${input.id}`); + + return { + id: conversation._id, + resourceId: conversation.resourceId, + userId: conversation.userId, + title: conversation.title, + metadata: conversation.metadata, + createdAt: conversation.createdAt.toISOString(), + updatedAt: conversation.updatedAt.toISOString(), + }; + } + + /** + * Get a conversation by ID + */ + async getConversation(id: string): Promise { + await this.initPromise; + + const conversationsCollection = this.getCollection("conversations"); + const conversation = await conversationsCollection.findOne({ _id: id } as any); + + if (!conversation) { + return null; + } + + return { + id: (conversation as any)._id, + resourceId: (conversation as any).resourceId, + userId: (conversation as any).userId, + title: (conversation as any).title, + metadata: (conversation as any).metadata || {}, + createdAt: (conversation as any).createdAt.toISOString(), + updatedAt: (conversation as any).updatedAt.toISOString(), + }; + } + + /** + * Get all conversations for a resource + */ + async getConversations(resourceId: string): Promise { + await this.initPromise; + + const conversationsCollection = this.getCollection("conversations"); + const conversations = await conversationsCollection + .find({ resourceId } as any) + .sort({ updatedAt: -1 }) + .toArray(); + + return conversations.map((conv: any) => ({ + id: conv._id, + resourceId: conv.resourceId, + userId: conv.userId, + title: conv.title, + metadata: conv.metadata || {}, + createdAt: conv.createdAt.toISOString(), + updatedAt: conv.updatedAt.toISOString(), + })); + } + + /** + * Get all conversations for a user + */ + async getConversationsByUserId( + userId: string, + options?: Omit, + ): Promise { + return this.queryConversations({ ...options, userId }); + } + + /** + * Query conversations with filters + */ + async queryConversations(options: ConversationQueryOptions): Promise { + await this.initPromise; + + const conversationsCollection = this.getCollection("conversations"); + + const filter: any = {}; + + if (options.userId) { + filter.userId = options.userId; + } + + if (options.resourceId) { + filter.resourceId = options.resourceId; + } + + let cursor = conversationsCollection.find(filter).sort({ updatedAt: -1 }); + + if (options.limit) { + cursor = cursor.limit(options.limit); + } + + if (options.offset) { + cursor = cursor.skip(options.offset); + } + + const conversations = await cursor.toArray(); + + return conversations.map((conv: any) => ({ + id: conv._id, + resourceId: conv.resourceId, + userId: conv.userId, + title: conv.title, + metadata: conv.metadata || {}, + createdAt: conv.createdAt.toISOString(), + updatedAt: conv.updatedAt.toISOString(), + })); + } + + /** + * Update a conversation + */ + async updateConversation( + id: string, + updates: Partial>, + ): Promise { + await this.initPromise; + + const conversationsCollection = this.getCollection("conversations"); + + const updateDoc: any = { + updatedAt: new Date(), + }; + + if (updates.title !== undefined) { + updateDoc.title = updates.title; + } + + if (updates.metadata !== undefined) { + updateDoc.metadata = updates.metadata; + } + + if (updates.resourceId !== undefined) { + updateDoc.resourceId = updates.resourceId; + } + + if (updates.userId !== undefined) { + updateDoc.userId = updates.userId; + } + + const result = await conversationsCollection.findOneAndUpdate( + { _id: id } as any, + { $set: updateDoc }, + { returnDocument: "after" }, + ); + + if (!result) { + throw new ConversationNotFoundError(id); + } + + this.log(`Updated conversation ${id}`); + + return { + id: (result as any)._id, + resourceId: (result as any).resourceId, + userId: (result as any).userId, + title: (result as any).title, + metadata: (result as any).metadata || {}, + createdAt: (result as any).createdAt.toISOString(), + updatedAt: (result as any).updatedAt.toISOString(), + }; + } + + /** + * Delete a conversation + */ + async deleteConversation(id: string): Promise { + await this.initPromise; + + const conversationsCollection = this.getCollection("conversations"); + + // MongoDB will cascade delete messages and steps automatically via application logic + const messagesCollection = this.getCollection("messages"); + const stepsCollection = this.getCollection("steps"); + + await messagesCollection.deleteMany({ conversationId: id } as any); + await stepsCollection.deleteMany({ conversationId: id } as any); + await conversationsCollection.deleteOne({ _id: id } as any); + + this.log(`Deleted conversation ${id}`); + } + + // ============================================================================ + // Conversation Steps Operations + // ============================================================================ + + /** + * Save conversation steps + */ + async saveConversationSteps(steps: ConversationStepRecord[]): Promise { + await this.initPromise; + + if (steps.length === 0) return; + + const stepsCollection = this.getCollection("steps"); + + const operations = steps.map((step) => ({ + replaceOne: { + filter: { _id: step.id || this.generateId() }, + replacement: { + _id: step.id || this.generateId(), + conversationId: step.conversationId, + userId: step.userId, + agentId: step.agentId, + agentName: step.agentName, + operationId: step.operationId, + stepIndex: step.stepIndex, + type: step.type, + role: step.role, + content: step.content, + arguments: step.arguments, + result: step.result, + usage: step.usage, + subAgentId: step.subAgentId, + subAgentName: step.subAgentName, + createdAt: new Date(), + }, + upsert: true, + }, + })); + + await stepsCollection.bulkWrite(operations as any); + + this.log(`Saved ${steps.length} conversation steps`); + } + + /** + * Get conversation steps + */ + async getConversationSteps( + userId: string, + conversationId: string, + options?: GetConversationStepsOptions, + ): Promise { + await this.initPromise; + + const stepsCollection = this.getCollection("steps"); + + const filter: any = { conversationId, userId }; + + if (options?.operationId) { + filter.operationId = options.operationId; + } + + let cursor = stepsCollection.find(filter).sort({ stepIndex: 1 }); + + if (options?.limit) { + cursor = cursor.limit(options.limit); + } + + const steps = await cursor.toArray(); + + return steps.map((step: any) => ({ + id: step._id, + conversationId: step.conversationId, + userId: step.userId, + agentId: step.agentId, + agentName: step.agentName, + operationId: step.operationId, + stepIndex: step.stepIndex, + type: step.type, + role: step.role, + content: step.content, + arguments: step.arguments, + result: step.result, + usage: step.usage, + subAgentId: step.subAgentId, + subAgentName: step.subAgentName, + createdAt: step.createdAt.toISOString(), + })); + } + + // ============================================================================ + // Working Memory Operations + // ============================================================================ + + /** + * Get working memory + */ + async getWorkingMemory(params: { + conversationId?: string; + userId?: string; + scope: WorkingMemoryScope; + }): Promise { + await this.initPromise; + + if (params.scope === "conversation" && params.conversationId) { + const conversationsCollection = this.getCollection("conversations"); + const conversation = await conversationsCollection.findOne({ + _id: params.conversationId, + } as any); + + if (!conversation) { + return null; + } + + const workingMemory = (conversation as any).metadata?.workingMemory; + return workingMemory || null; + } + + if (params.scope === "user" && params.userId) { + const usersCollection = this.getCollection("users"); + const user = await usersCollection.findOne({ _id: params.userId } as any); + + if (!user) { + return null; + } + + const workingMemory = (user as any).metadata?.workingMemory; + return workingMemory || null; + } + + return null; + } + + /** + * Set working memory + */ + async setWorkingMemory(params: { + conversationId?: string; + userId?: string; + content: string; + scope: WorkingMemoryScope; + }): Promise { + await this.initPromise; + + if (params.scope === "conversation" && params.conversationId) { + const conversationsCollection = this.getCollection("conversations"); + + const conversation = await conversationsCollection.findOne({ + _id: params.conversationId, + } as any); + if (!conversation) { + throw new ConversationNotFoundError(params.conversationId); + } + + await conversationsCollection.updateOne({ _id: params.conversationId } as any, { + $set: { + "metadata.workingMemory": params.content, + updatedAt: new Date(), + }, + }); + + this.log(`Set working memory for conversation ${params.conversationId}`); + } else if (params.scope === "user" && params.userId) { + const usersCollection = this.getCollection("users"); + + // Upsert user document with working memory + await usersCollection.updateOne( + { _id: params.userId } as any, + { + $set: { + "metadata.workingMemory": params.content, + updatedAt: new Date(), + }, + $setOnInsert: { + createdAt: new Date(), + }, + }, + { upsert: true }, + ); + + this.log(`Set working memory for user ${params.userId}`); + } + } + + /** + * Delete working memory + */ + async deleteWorkingMemory(params: { + conversationId?: string; + userId?: string; + scope: WorkingMemoryScope; + }): Promise { + await this.initPromise; + + if (params.scope === "conversation" && params.conversationId) { + const conversationsCollection = this.getCollection("conversations"); + + await conversationsCollection.updateOne({ _id: params.conversationId } as any, { + $unset: { "metadata.workingMemory": "" }, + $set: { updatedAt: new Date() }, + }); + + this.log(`Deleted working memory for conversation ${params.conversationId}`); + } else if (params.scope === "user" && params.userId) { + const usersCollection = this.getCollection("users"); + + await usersCollection.updateOne({ _id: params.userId } as any, { + $unset: { "metadata.workingMemory": "" }, + $set: { updatedAt: new Date() }, + }); + + this.log(`Deleted working memory for user ${params.userId}`); + } + } + + // ============================================================================ + // Workflow State Operations + // ============================================================================ + + /** + * Get workflow state by execution ID + */ + async getWorkflowState(executionId: string): Promise { + await this.initPromise; + + const workflowStatesCollection = this.getCollection("workflow_states"); + const state = await workflowStatesCollection.findOne({ _id: executionId } as any); + + if (!state) { + return null; + } + + return { + id: (state as any)._id, + workflowId: (state as any).workflowId, + workflowName: (state as any).workflowName, + status: (state as any).status, + suspension: (state as any).suspension, + events: (state as any).events, + output: (state as any).output, + cancellation: (state as any).cancellation, + userId: (state as any).userId, + conversationId: (state as any).conversationId, + metadata: (state as any).metadata, + createdAt: (state as any).createdAt.toISOString(), + updatedAt: (state as any).updatedAt.toISOString(), + }; + } + + /** + * Query workflow runs with filters + */ + async queryWorkflowRuns(query: WorkflowRunQuery): Promise { + await this.initPromise; + + const workflowStatesCollection = this.getCollection("workflow_states"); + + const filter: any = {}; + + if (query.workflowId) { + filter.workflowId = query.workflowId; + } + + if (query.status) { + filter.status = query.status; + } + + if (query.from) { + filter.createdAt = { $gte: query.from }; + } + + if (query.to) { + filter.createdAt = { ...filter.createdAt, $lte: query.to }; + } + + let cursor = workflowStatesCollection.find(filter).sort({ createdAt: -1 }); + + if (query.limit) { + cursor = cursor.limit(query.limit); + } + + if (query.offset) { + cursor = cursor.skip(query.offset); + } + + const states = await cursor.toArray(); + + return states.map((state: any) => ({ + id: state._id, + workflowId: state.workflowId, + workflowName: state.workflowName, + status: state.status, + suspension: state.suspension, + events: state.events, + output: state.output, + cancellation: state.cancellation, + userId: state.userId, + conversationId: state.conversationId, + metadata: state.metadata, + createdAt: state.createdAt.toISOString(), + updatedAt: state.updatedAt.toISOString(), + })); + } + + /** + * Set workflow state (create or replace) + */ + async setWorkflowState(executionId: string, state: WorkflowStateEntry): Promise { + await this.initPromise; + + const workflowStatesCollection = this.getCollection("workflow_states"); + + const now = new Date(); + + await workflowStatesCollection.replaceOne( + { _id: executionId } as any, + { + _id: executionId, + workflowId: state.workflowId, + workflowName: state.workflowName, + status: state.status, + suspension: state.suspension, + events: state.events, + output: state.output, + cancellation: state.cancellation, + userId: state.userId, + conversationId: state.conversationId, + metadata: state.metadata, + createdAt: state.createdAt ? new Date(state.createdAt) : now, + updatedAt: now, + } as any, + { upsert: true }, + ); + + this.log(`Set workflow state ${executionId}`); + } + + /** + * Update workflow state (partial update) + */ + async updateWorkflowState( + executionId: string, + updates: Partial, + ): Promise { + await this.initPromise; + + const workflowStatesCollection = this.getCollection("workflow_states"); + + const updateDoc: any = { + updatedAt: new Date(), + }; + + if (updates.status !== undefined) { + updateDoc.status = updates.status; + } + + if (updates.suspension !== undefined) { + updateDoc.suspension = updates.suspension; + } + + if (updates.events !== undefined) { + updateDoc.events = updates.events; + } + + if (updates.output !== undefined) { + updateDoc.output = updates.output; + } + + if (updates.cancellation !== undefined) { + updateDoc.cancellation = updates.cancellation; + } + + if (updates.metadata !== undefined) { + updateDoc.metadata = updates.metadata; + } + + await workflowStatesCollection.updateOne({ _id: executionId } as any, { $set: updateDoc }); + + this.log(`Updated workflow state ${executionId}`); + } + + /** + * Get suspended workflow states + */ + async getSuspendedWorkflowStates(workflowId: string): Promise { + await this.initPromise; + + const workflowStatesCollection = this.getCollection("workflow_states"); + + const states = await workflowStatesCollection + .find({ workflowId, status: "suspended" } as any) + .sort({ createdAt: -1 }) + .toArray(); + + return states.map((state: any) => ({ + id: state._id, + workflowId: state.workflowId, + workflowName: state.workflowName, + status: state.status, + suspension: state.suspension, + events: state.events, + output: state.output, + cancellation: state.cancellation, + userId: state.userId, + conversationId: state.conversationId, + metadata: state.metadata, + createdAt: state.createdAt, + updatedAt: state.updatedAt, + })); + } +} diff --git a/packages/mongodb/src/memory-adapter.ts b/packages/mongodb/src/memory-adapter.ts new file mode 100644 index 000000000..b840d5559 --- /dev/null +++ b/packages/mongodb/src/memory-adapter.ts @@ -0,0 +1,977 @@ +/** + * MongoDB Storage Adapter for Memory + * Stores conversations and messages in MongoDB database + */ + +import { ConversationAlreadyExistsError, ConversationNotFoundError } from "@voltagent/core"; +import type { + Conversation, + ConversationQueryOptions, + ConversationStepRecord, + CreateConversationInput, + GetConversationStepsOptions, + GetMessagesOptions, + StorageAdapter, + WorkflowRunQuery, + WorkflowStateEntry, + WorkingMemoryScope, +} from "@voltagent/core"; +import type { UIMessage } from "ai"; +import { type Collection, type Db, type Document, MongoClient } from "mongodb"; + +/** + * MongoDB configuration options for Memory + */ +export interface MongoDBMemoryOptions { + /** + * MongoDB connection URI + * Examples: + * - "mongodb://localhost:27017" + * - "mongodb://username:password@localhost:27017" + * - "mongodb+srv://username:password@cluster.mongodb.net" + */ + connection: string; + + /** + * Database name to use for collections + * @default "voltagent" + */ + database?: string; + + /** + * Prefix for collection names + * @default "voltagent_memory" + */ + collectionPrefix?: string; + + /** + * Whether to enable debug logging + * @default false + */ + debug?: boolean; +} + +/** + * MongoDB Storage Adapter for Memory + * Production-ready storage for conversations and messages + */ +export class MongoDBMemoryAdapter implements StorageAdapter { + private client: MongoClient; + private db: Db | null = null; + private databaseName: string; + private collectionPrefix: string; + private initialized = false; + private initPromise: Promise | null = null; + private debug: boolean; + + constructor(options: MongoDBMemoryOptions) { + this.databaseName = options.database ?? "voltagent"; + this.collectionPrefix = options.collectionPrefix ?? "voltagent_memory"; + this.debug = options.debug ?? false; + + // Validate collection prefix + if ( + this.collectionPrefix.includes("\0") || + this.collectionPrefix.includes("$") || + this.collectionPrefix.startsWith("system.") + ) { + throw new Error(`Invalid collection prefix: ${this.collectionPrefix}`); + } + + // Create MongoDB client + this.client = new MongoClient(options.connection); + + this.log("MongoDB Memory adapter initialized"); + + // Start initialization but don't await it + this.initPromise = this.initialize(); + } + + /** + * Log debug messages + */ + private log(...args: any[]): void { + if (this.debug) { + console.log("[MongoDB Memory]", ...args); + } + } + + /** + * Generate a random ID + */ + private generateId(): string { + return ( + Math.random().toString(36).substring(2, 15) + Math.random().toString(36).substring(2, 15) + ); + } + + /** + * Get collection by name + */ + private getCollection(collectionName: string): Collection { + if (!this.db) { + throw new Error("Database not initialized"); + } + return this.db.collection(`${this.collectionPrefix}_${collectionName}`); + } + + /** + * Initialize database schema + */ + private async initialize(): Promise { + if (this.initialized) return; + + // Prevent multiple simultaneous initializations + if (this.initPromise && !this.initialized) { + return this.initPromise; + } + + try { + // Connect to MongoDB + await this.client.connect(); + this.db = this.client.db(this.databaseName); + + this.log(`Connected to MongoDB database: ${this.databaseName}`); + + // Create indexes for all collections + + const conversationsCollection = this.getCollection("conversations"); + const messagesCollection = this.getCollection("messages"); + const workflowStatesCollection = this.getCollection("workflow_states"); + const stepsCollection = this.getCollection("steps"); + + // Users collection indexes (none needed beyond _id) + + // Conversations collection indexes + await conversationsCollection.createIndex({ userId: 1 }, { background: true }); + await conversationsCollection.createIndex({ resourceId: 1 }, { background: true }); + await conversationsCollection.createIndex({ updatedAt: -1 }, { background: true }); + + // Messages collection indexes + await messagesCollection.createIndex( + { conversationId: 1, createdAt: 1 }, + { background: true }, + ); + await messagesCollection.createIndex({ conversationId: 1 }, { background: true }); + // Unique compound index to enforce message uniqueness + await messagesCollection.createIndex( + { conversationId: 1, messageId: 1 }, + { unique: true, background: true }, + ); + + // Workflow states collection indexes + await workflowStatesCollection.createIndex({ workflowId: 1 }, { background: true }); + await workflowStatesCollection.createIndex({ status: 1 }, { background: true }); + await workflowStatesCollection.createIndex({ createdAt: -1 }, { background: true }); + + // Steps collection indexes + await stepsCollection.createIndex({ conversationId: 1, stepIndex: 1 }, { background: true }); + await stepsCollection.createIndex( + { conversationId: 1, operationId: 1 }, + { background: true }, + ); + + this.initialized = true; + this.log("Database schema initialized with indexes"); + } catch (error) { + throw new Error( + `Failed to connect to MongoDB: ${error instanceof Error ? error.message : String(error)}`, + ); + } + } + + /** + * Close MongoDB connection + */ + async close(): Promise { + await this.client.close(); + this.log("MongoDB connection closed"); + } + + // ============================================================================ + // Message Operations + // ============================================================================ + + /** + * Add a single message + */ + async addMessage(message: UIMessage, userId: string, conversationId: string): Promise { + await this.initPromise; + + const messagesCollection = this.getCollection("messages"); + + // Ensure conversation exists + const conversation = await this.getConversation(conversationId); + if (!conversation) { + throw new ConversationNotFoundError(conversationId); + } + + const messageId = message.id || this.generateId(); + + try { + await messagesCollection.insertOne({ + _id: undefined, + conversationId, + messageId, + userId, + role: message.role, + parts: message.parts, + metadata: message.metadata || {}, + formatVersion: 2, + createdAt: new Date(), + } as any); + + this.log(`Added message ${messageId} to conversation ${conversationId}`); + } catch (error: any) { + if (error.code === 11000) { + throw new Error( + `Message with ID ${messageId} already exists in conversation ${conversationId}`, + ); + } + throw error; + } + } + + /** + * Add multiple messages + */ + async addMessages(messages: UIMessage[], userId: string, conversationId: string): Promise { + await this.initPromise; + + if (messages.length === 0) return; + + const messagesCollection = this.getCollection("messages"); + + // Ensure conversation exists + const conversation = await this.getConversation(conversationId); + if (!conversation) { + throw new ConversationNotFoundError(conversationId); + } + + const documentsToInsert = messages.map((message) => ({ + _id: undefined, // Let MongoDB generate ObjectId + conversationId, + messageId: message.id || this.generateId(), + userId, + role: message.role, + parts: message.parts, + metadata: message.metadata || {}, + formatVersion: 2, + createdAt: new Date(), + })); + + try { + await messagesCollection.insertMany(documentsToInsert as any); + this.log(`Added ${messages.length} messages to conversation ${conversationId}`); + } catch (error: any) { + if (error.code === 11000) { + throw new Error(`One or more messages already exist in conversation ${conversationId}`); + } + throw error; + } + } + + /** + * Get messages for a conversation + */ + async getMessages( + userId: string, + conversationId: string, + options?: GetMessagesOptions, + ): Promise[]> { + await this.initPromise; + + const messagesCollection = this.getCollection("messages"); + + const filter: any = { conversationId, userId }; + + if (options?.roles && options.roles.length > 0) { + filter.role = { $in: options.roles }; + } + + if (options?.after) { + filter.createdAt = { $gt: options.after }; + } + + if (options?.before) { + filter.createdAt = { ...filter.createdAt, $lt: options.before }; + } + + let cursor = messagesCollection.find(filter).sort({ createdAt: 1 }); + + if (options?.limit) { + cursor = cursor.limit(options.limit); + } + + const messages = await cursor.toArray(); + + return messages.map((msg: any) => ({ + id: msg.messageId, + role: msg.role, + parts: msg.parts, + metadata: { + ...msg.metadata, + createdAt: msg.createdAt, + }, + })); + } + + /** + * Clear messages for a conversation or all conversations for a user + */ + async clearMessages(userId: string, conversationId?: string): Promise { + await this.initPromise; + + const messagesCollection = this.getCollection("messages"); + const stepsCollection = this.getCollection("steps"); + + if (conversationId) { + // Clear messages for specific conversation + await messagesCollection.deleteMany({ conversationId, userId }); + await stepsCollection.deleteMany({ conversationId, userId }); + this.log(`Cleared messages for conversation ${conversationId}`); + } else { + // Clear all messages for user + // First get all conversation IDs for this user + const conversationsCollection = this.getCollection("conversations"); + const userConversations = await conversationsCollection + .find({ userId }) + .project({ _id: 1 }) + .toArray(); + + const conversationIds = userConversations.map((conv: any) => conv._id); + + if (conversationIds.length > 0) { + await messagesCollection.deleteMany({ conversationId: { $in: conversationIds } }); + await stepsCollection.deleteMany({ conversationId: { $in: conversationIds } }); + this.log(`Cleared all messages for user ${userId}`); + } + } + } + + // ============================================================================ + // Conversation Operations + // ============================================================================ + + /** + * Create a new conversation + */ + async createConversation(input: CreateConversationInput): Promise { + await this.initPromise; + + const conversationsCollection = this.getCollection("conversations"); + + // Check if conversation already exists + const existing = await conversationsCollection.findOne({ _id: input.id } as any); + if (existing) { + throw new ConversationAlreadyExistsError(input.id); + } + + const now = new Date(); + const conversation = { + _id: input.id, + resourceId: input.resourceId, + userId: input.userId, + title: input.title, + metadata: input.metadata || {}, + createdAt: now, + updatedAt: now, + }; + + await conversationsCollection.insertOne(conversation as any); + + this.log(`Created conversation ${input.id}`); + + return { + id: conversation._id, + resourceId: conversation.resourceId, + userId: conversation.userId, + title: conversation.title, + metadata: conversation.metadata, + createdAt: conversation.createdAt.toISOString(), + updatedAt: conversation.updatedAt.toISOString(), + }; + } + + /** + * Get a conversation by ID + */ + async getConversation(id: string): Promise { + await this.initPromise; + + const conversationsCollection = this.getCollection("conversations"); + const conversation = await conversationsCollection.findOne({ _id: id } as any); + + if (!conversation) { + return null; + } + + return { + id: (conversation as any)._id, + resourceId: (conversation as any).resourceId, + userId: (conversation as any).userId, + title: (conversation as any).title, + metadata: (conversation as any).metadata || {}, + createdAt: (conversation as any).createdAt.toISOString(), + updatedAt: (conversation as any).updatedAt.toISOString(), + }; + } + + /** + * Get all conversations for a resource + */ + async getConversations(resourceId: string): Promise { + await this.initPromise; + + const conversationsCollection = this.getCollection("conversations"); + const conversations = await conversationsCollection + .find({ resourceId } as any) + .sort({ updatedAt: -1 }) + .toArray(); + + return conversations.map((conv: any) => ({ + id: conv._id, + resourceId: conv.resourceId, + userId: conv.userId, + title: conv.title, + metadata: conv.metadata || {}, + createdAt: conv.createdAt.toISOString(), + updatedAt: conv.updatedAt.toISOString(), + })); + } + + /** + * Get all conversations for a user + */ + async getConversationsByUserId( + userId: string, + options?: Omit, + ): Promise { + return this.queryConversations({ ...options, userId }); + } + + /** + * Query conversations with filters + */ + async queryConversations(options: ConversationQueryOptions): Promise { + await this.initPromise; + + const conversationsCollection = this.getCollection("conversations"); + + const filter: any = {}; + + if (options.userId) { + filter.userId = options.userId; + } + + if (options.resourceId) { + filter.resourceId = options.resourceId; + } + + let cursor = conversationsCollection.find(filter).sort({ updatedAt: -1 }); + + if (options.limit) { + cursor = cursor.limit(options.limit); + } + + if (options.offset) { + cursor = cursor.skip(options.offset); + } + + const conversations = await cursor.toArray(); + + return conversations.map((conv: any) => ({ + id: conv._id, + resourceId: conv.resourceId, + userId: conv.userId, + title: conv.title, + metadata: conv.metadata || {}, + createdAt: conv.createdAt.toISOString(), + updatedAt: conv.updatedAt.toISOString(), + })); + } + + /** + * Update a conversation + */ + async updateConversation( + id: string, + updates: Partial>, + ): Promise { + await this.initPromise; + + const conversationsCollection = this.getCollection("conversations"); + + const updateDoc: any = { + updatedAt: new Date(), + }; + + if (updates.title !== undefined) { + updateDoc.title = updates.title; + } + + if (updates.metadata !== undefined) { + updateDoc.metadata = updates.metadata; + } + + if (updates.resourceId !== undefined) { + updateDoc.resourceId = updates.resourceId; + } + + if (updates.userId !== undefined) { + updateDoc.userId = updates.userId; + } + + const result = await conversationsCollection.findOneAndUpdate( + { _id: id } as any, + { $set: updateDoc }, + { returnDocument: "after" }, + ); + + if (!result) { + throw new ConversationNotFoundError(id); + } + + this.log(`Updated conversation ${id}`); + + return { + id: (result as any)._id, + resourceId: (result as any).resourceId, + userId: (result as any).userId, + title: (result as any).title, + metadata: (result as any).metadata || {}, + createdAt: (result as any).createdAt.toISOString(), + updatedAt: (result as any).updatedAt.toISOString(), + }; + } + + /** + * Delete a conversation + */ + async deleteConversation(id: string): Promise { + await this.initPromise; + + const conversationsCollection = this.getCollection("conversations"); + + // MongoDB will cascade delete messages and steps automatically via application logic + const messagesCollection = this.getCollection("messages"); + const stepsCollection = this.getCollection("steps"); + + await messagesCollection.deleteMany({ conversationId: id } as any); + await stepsCollection.deleteMany({ conversationId: id } as any); + await conversationsCollection.deleteOne({ _id: id } as any); + + this.log(`Deleted conversation ${id}`); + } + + // ============================================================================ + // Conversation Steps Operations + // ============================================================================ + + /** + * Save conversation steps + */ + async saveConversationSteps(steps: ConversationStepRecord[]): Promise { + await this.initPromise; + + if (steps.length === 0) return; + + const stepsCollection = this.getCollection("steps"); + + const operations = steps.map((step) => ({ + replaceOne: { + filter: { _id: step.id || this.generateId() }, + replacement: { + _id: step.id || this.generateId(), + conversationId: step.conversationId, + userId: step.userId, + agentId: step.agentId, + agentName: step.agentName, + operationId: step.operationId, + stepIndex: step.stepIndex, + type: step.type, + role: step.role, + content: step.content, + arguments: step.arguments, + result: step.result, + usage: step.usage, + subAgentId: step.subAgentId, + subAgentName: step.subAgentName, + createdAt: new Date(), + }, + upsert: true, + }, + })); + + await stepsCollection.bulkWrite(operations as any); + + this.log(`Saved ${steps.length} conversation steps`); + } + + /** + * Get conversation steps + */ + async getConversationSteps( + userId: string, + conversationId: string, + options?: GetConversationStepsOptions, + ): Promise { + await this.initPromise; + + const stepsCollection = this.getCollection("steps"); + + const filter: any = { conversationId, userId }; + + if (options?.operationId) { + filter.operationId = options.operationId; + } + + let cursor = stepsCollection.find(filter).sort({ stepIndex: 1 }); + + if (options?.limit) { + cursor = cursor.limit(options.limit); + } + + const steps = await cursor.toArray(); + + return steps.map((step: any) => ({ + id: step._id, + conversationId: step.conversationId, + userId: step.userId, + agentId: step.agentId, + agentName: step.agentName, + operationId: step.operationId, + stepIndex: step.stepIndex, + type: step.type, + role: step.role, + content: step.content, + arguments: step.arguments, + result: step.result, + usage: step.usage, + subAgentId: step.subAgentId, + subAgentName: step.subAgentName, + createdAt: step.createdAt.toISOString(), + })); + } + + // ============================================================================ + // Working Memory Operations + // ============================================================================ + + /** + * Get working memory + */ + async getWorkingMemory(params: { + conversationId?: string; + userId?: string; + scope: WorkingMemoryScope; + }): Promise { + await this.initPromise; + + if (params.scope === "conversation" && params.conversationId) { + const conversationsCollection = this.getCollection("conversations"); + const conversation = await conversationsCollection.findOne({ + _id: params.conversationId, + } as any); + + if (!conversation) { + return null; + } + + const workingMemory = (conversation as any).metadata?.workingMemory; + return workingMemory || null; + } + + if (params.scope === "user" && params.userId) { + const usersCollection = this.getCollection("users"); + const user = await usersCollection.findOne({ _id: params.userId } as any); + + if (!user) { + return null; + } + + const workingMemory = (user as any).metadata?.workingMemory; + return workingMemory || null; + } + + return null; + } + + /** + * Set working memory + */ + async setWorkingMemory(params: { + conversationId?: string; + userId?: string; + content: string; + scope: WorkingMemoryScope; + }): Promise { + await this.initPromise; + + if (params.scope === "conversation" && params.conversationId) { + const conversationsCollection = this.getCollection("conversations"); + + const conversation = await conversationsCollection.findOne({ + _id: params.conversationId, + } as any); + if (!conversation) { + throw new ConversationNotFoundError(params.conversationId); + } + + await conversationsCollection.updateOne({ _id: params.conversationId } as any, { + $set: { + "metadata.workingMemory": params.content, + updatedAt: new Date(), + }, + }); + + this.log(`Set working memory for conversation ${params.conversationId}`); + } else if (params.scope === "user" && params.userId) { + const usersCollection = this.getCollection("users"); + + // Upsert user document with working memory + await usersCollection.updateOne( + { _id: params.userId } as any, + { + $set: { + "metadata.workingMemory": params.content, + updatedAt: new Date(), + }, + $setOnInsert: { + createdAt: new Date(), + }, + }, + { upsert: true }, + ); + + this.log(`Set working memory for user ${params.userId}`); + } + } + + /** + * Delete working memory + */ + async deleteWorkingMemory(params: { + conversationId?: string; + userId?: string; + scope: WorkingMemoryScope; + }): Promise { + await this.initPromise; + + if (params.scope === "conversation" && params.conversationId) { + const conversationsCollection = this.getCollection("conversations"); + + await conversationsCollection.updateOne({ _id: params.conversationId } as any, { + $unset: { "metadata.workingMemory": "" }, + $set: { updatedAt: new Date() }, + }); + + this.log(`Deleted working memory for conversation ${params.conversationId}`); + } else if (params.scope === "user" && params.userId) { + const usersCollection = this.getCollection("users"); + + await usersCollection.updateOne({ _id: params.userId } as any, { + $unset: { "metadata.workingMemory": "" }, + $set: { updatedAt: new Date() }, + }); + + this.log(`Deleted working memory for user ${params.userId}`); + } + } + + // ============================================================================ + // Workflow State Operations + // ============================================================================ + + /** + * Get workflow state by execution ID + */ + async getWorkflowState(executionId: string): Promise { + await this.initPromise; + + const workflowStatesCollection = this.getCollection("workflow_states"); + const state = await workflowStatesCollection.findOne({ _id: executionId } as any); + + if (!state) { + return null; + } + + return { + id: (state as any)._id, + workflowId: (state as any).workflowId, + workflowName: (state as any).workflowName, + status: (state as any).status, + suspension: (state as any).suspension, + events: (state as any).events, + output: (state as any).output, + cancellation: (state as any).cancellation, + userId: (state as any).userId, + conversationId: (state as any).conversationId, + metadata: (state as any).metadata, + createdAt: (state as any).createdAt.toISOString(), + updatedAt: (state as any).updatedAt.toISOString(), + }; + } + + /** + * Query workflow runs with filters + */ + async queryWorkflowRuns(query: WorkflowRunQuery): Promise { + await this.initPromise; + + const workflowStatesCollection = this.getCollection("workflow_states"); + + const filter: any = {}; + + if (query.workflowId) { + filter.workflowId = query.workflowId; + } + + if (query.status) { + filter.status = query.status; + } + + if (query.from) { + filter.createdAt = { $gte: query.from }; + } + + if (query.to) { + filter.createdAt = { ...filter.createdAt, $lte: query.to }; + } + + let cursor = workflowStatesCollection.find(filter).sort({ createdAt: -1 }); + + if (query.limit) { + cursor = cursor.limit(query.limit); + } + + if (query.offset) { + cursor = cursor.skip(query.offset); + } + + const states = await cursor.toArray(); + + return states.map((state: any) => ({ + id: state._id, + workflowId: state.workflowId, + workflowName: state.workflowName, + status: state.status, + suspension: state.suspension, + events: state.events, + output: state.output, + cancellation: state.cancellation, + userId: state.userId, + conversationId: state.conversationId, + metadata: state.metadata, + createdAt: state.createdAt.toISOString(), + updatedAt: state.updatedAt.toISOString(), + })); + } + + /** + * Set workflow state (create or replace) + */ + async setWorkflowState(executionId: string, state: WorkflowStateEntry): Promise { + await this.initPromise; + + const workflowStatesCollection = this.getCollection("workflow_states"); + + const now = new Date(); + + await workflowStatesCollection.replaceOne( + { _id: executionId } as any, + { + _id: executionId, + workflowId: state.workflowId, + workflowName: state.workflowName, + status: state.status, + suspension: state.suspension, + events: state.events, + output: state.output, + cancellation: state.cancellation, + userId: state.userId, + conversationId: state.conversationId, + metadata: state.metadata, + createdAt: state.createdAt ? new Date(state.createdAt) : now, + updatedAt: now, + } as any, + { upsert: true }, + ); + + this.log(`Set workflow state ${executionId}`); + } + + /** + * Update workflow state (partial update) + */ + async updateWorkflowState( + executionId: string, + updates: Partial, + ): Promise { + await this.initPromise; + + const workflowStatesCollection = this.getCollection("workflow_states"); + + const updateDoc: any = { + updatedAt: new Date(), + }; + + if (updates.status !== undefined) { + updateDoc.status = updates.status; + } + + if (updates.suspension !== undefined) { + updateDoc.suspension = updates.suspension; + } + + if (updates.events !== undefined) { + updateDoc.events = updates.events; + } + + if (updates.output !== undefined) { + updateDoc.output = updates.output; + } + + if (updates.cancellation !== undefined) { + updateDoc.cancellation = updates.cancellation; + } + + if (updates.metadata !== undefined) { + updateDoc.metadata = updates.metadata; + } + + await workflowStatesCollection.updateOne({ _id: executionId } as any, { $set: updateDoc }); + + this.log(`Updated workflow state ${executionId}`); + } + + /** + * Get suspended workflow states + */ + async getSuspendedWorkflowStates(workflowId: string): Promise { + await this.initPromise; + + const workflowStatesCollection = this.getCollection("workflow_states"); + + const states = await workflowStatesCollection + .find({ workflowId, status: "suspended" } as any) + .sort({ createdAt: -1 }) + .toArray(); + + return states.map((state: any) => ({ + id: state._id, + workflowId: state.workflowId, + workflowName: state.workflowName, + status: state.status, + suspension: state.suspension, + events: state.events, + output: state.output, + cancellation: state.cancellation, + userId: state.userId, + conversationId: state.conversationId, + metadata: state.metadata, + createdAt: state.createdAt, + updatedAt: state.updatedAt, + })); + } +} diff --git a/packages/mongodb/tsconfig.json b/packages/mongodb/tsconfig.json new file mode 100644 index 000000000..fb0c6c836 --- /dev/null +++ b/packages/mongodb/tsconfig.json @@ -0,0 +1,30 @@ +{ + "compilerOptions": { + "target": "es2018", + "lib": ["dom", "dom.iterable", "esnext"], + "module": "esnext", + "moduleResolution": "node", + "declaration": true, + "declarationMap": true, + "sourceMap": true, + "outDir": "./dist", + "rootDir": "./", + "strict": true, + "noImplicitAny": true, + "strictNullChecks": true, + "strictFunctionTypes": true, + "strictBindCallApply": true, + "strictPropertyInitialization": true, + "noImplicitThis": true, + "noUnusedLocals": true, + "noUnusedParameters": true, + "noImplicitReturns": true, + "noFallthroughCasesInSwitch": true, + "esModuleInterop": true, + "skipLibCheck": true, + "forceConsistentCasingInFileNames": true, + "types": ["node", "vitest/globals"] + }, + "include": ["src/**/*.ts", "__tests__/**/*.ts"], + "exclude": ["node_modules", "dist"] +} diff --git a/packages/mongodb/tsup.config.ts b/packages/mongodb/tsup.config.ts new file mode 100644 index 000000000..0819104fd --- /dev/null +++ b/packages/mongodb/tsup.config.ts @@ -0,0 +1,19 @@ +import { defineConfig } from "tsup"; +import { markAsExternalPlugin } from "../shared/tsup-plugins/mark-as-external"; + +export default defineConfig({ + entry: ["src/index.ts"], + format: ["cjs", "esm"], + splitting: false, + sourcemap: true, + clean: false, + target: "es2022", + outDir: "dist", + minify: false, + dts: true, + esbuildPlugins: [markAsExternalPlugin], + esbuildOptions(options) { + options.keepNames = true; + return options; + }, +}); diff --git a/packages/mongodb/vitest.config.mts b/packages/mongodb/vitest.config.mts new file mode 100644 index 000000000..cc2bb3aa7 --- /dev/null +++ b/packages/mongodb/vitest.config.mts @@ -0,0 +1,10 @@ +import { defineConfig } from "vitest/config"; + +export default defineConfig({ + test: { + globals: true, + environment: "node", + include: ["src/**/*.{test,spec}.ts"], + exclude: ["src/**/*.integration.test.ts", "node_modules", "dist"], + }, +}); diff --git a/packages/mongodb/vitest.integration.config.mts b/packages/mongodb/vitest.integration.config.mts new file mode 100644 index 000000000..fc5c2ab76 --- /dev/null +++ b/packages/mongodb/vitest.integration.config.mts @@ -0,0 +1,10 @@ +import { defineConfig } from "vitest/config"; + +export default defineConfig({ + test: { + globals: true, + environment: "node", + include: ["src/**/*.integration.test.ts"], + testTimeout: 30000, + }, +}); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 9c77b482e..5f62cd597 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -4004,6 +4004,22 @@ importers: specifier: ^3.25.76 version: 3.25.76 + packages/mongodb: + dependencies: + mongodb: + specifier: ^7.0.0 + version: 7.0.0 + devDependencies: + '@vitest/coverage-v8': + specifier: ^3.2.4 + version: 3.2.4(vitest@3.2.4) + '@voltagent/core': + specifier: ^2.0.2 + version: link:../core + ai: + specifier: ^6.0.0 + version: 6.0.3(zod@4.2.1) + packages/postgres: dependencies: '@voltagent/internal': @@ -10979,6 +10995,12 @@ packages: os-filter-obj: 2.0.0 dev: true + /@mongodb-js/saslprep@1.4.4: + resolution: {integrity: sha512-p7X/ytJDIdwUfFL/CLOhKgdfJe1Fa8uw9seJYvdOmnP9JBWGWHW69HkOixXS6Wy9yvGf1MbhcS6lVmrhy4jm2g==} + dependencies: + sparse-bitfield: 3.0.3 + dev: false + /@mswjs/interceptors@0.40.0: resolution: {integrity: sha512-EFd6cVbHsgLa6wa4RljGj6Wk75qoHxUSyc5asLyyPSyuhIcdS2Q3Phw6ImS1q+CkALthJRShiYfKANcQMuMqsQ==} engines: {node: '>=18'} @@ -19120,6 +19142,16 @@ packages: resolution: {integrity: sha512-oIQLCGWtcFZy2JW77j9k8nHzAOpqMHLQejDA48XXMWH6tjCQHz5RCFz1bzsmROyL6PUm+LLnUiI4BCn221inxA==} dev: false + /@types/webidl-conversions@7.0.3: + resolution: {integrity: sha512-CiJJvcRtIgzadHCYXw7dqEnMNRjhGZlYK05Mj9OyktqV8uVT8fD2BFOB7S1uwBE3Kj2Z+4UyPmFw/Ixgw/LAlA==} + dev: false + + /@types/whatwg-url@13.0.0: + resolution: {integrity: sha512-N8WXpbE6Wgri7KUSvrmQcqrMllKZ9uxkYWMt+mCSGwNc0Hsw9VQTW7ApqI4XNrx6/SaM2QQJCzMPDEXE058s+Q==} + dependencies: + '@types/webidl-conversions': 7.0.3 + dev: false + /@types/wrap-ansi@3.0.0: resolution: {integrity: sha512-ltIpx+kM7g/MLRZfkbL7EsCEjfzCcScLpkg37eXEtx5kmrAKBkTJwd1GIAjDSL8wTpM6Hzn5YO4pSb91BEwu1g==} dev: false @@ -21781,6 +21813,11 @@ packages: node-int64: 0.4.0 dev: true + /bson@7.0.0: + resolution: {integrity: sha512-Kwc6Wh4lQ5OmkqqKhYGKIuELXl+EPYSCObVE6bWsp1T/cGkOCBN0I8wF/T44BiuhHyNi1mmKVPXk60d41xZ7kw==} + engines: {node: '>=20.19.0'} + dev: false + /buffer-crc32@0.2.13: resolution: {integrity: sha512-VO9Ht/+p3SN7SKWqcrgEzjGbRSJYTx+Q1pTQC0wrWqHx0vpJraQ6GtHx8tvcg1rlK1byhU5gccxgOgj7B0TDkQ==} dev: true @@ -30390,6 +30427,10 @@ packages: resolution: {integrity: sha512-rkpe71W0N0c0Xz6QD0eJETuWAJGnJ9afsl1srmwPrI+yBCkge5EycXXbYRyvL29zZVUWQCY7InPRCv3GDXuZNw==} dev: true + /memory-pager@1.5.0: + resolution: {integrity: sha512-ZS4Bp4r/Zoeq6+NLJpP+0Zzm0pR8whtGPf1XExKLJBAczGMnSi3It14OiNCStjQjM6NU1okjQGSxgEZN8eBYKg==} + dev: false + /meow@12.1.1: resolution: {integrity: sha512-BhXM0Au22RwUneMPwSCnyhTOizdWoIEPU9sp0Aqa1PnDMR5Wv2FGXYDjuzJEIX+Eo2Rb8xuYe5jrnm5QowQFkw==} engines: {node: '>=16.10'} @@ -31223,6 +31264,46 @@ packages: micro-memoize: 4.1.3 dev: true + /mongodb-connection-string-url@7.0.0: + resolution: {integrity: sha512-irhhjRVLE20hbkRl4zpAYLnDMM+zIZnp0IDB9akAFFUZp/3XdOfwwddc7y6cNvF2WCEtfTYRwYbIfYa2kVY0og==} + engines: {node: '>=20.19.0'} + dependencies: + '@types/whatwg-url': 13.0.0 + whatwg-url: 14.2.0 + dev: false + + /mongodb@7.0.0: + resolution: {integrity: sha512-vG/A5cQrvGGvZm2mTnCSz1LUcbOPl83hfB6bxULKQ8oFZauyox/2xbZOoGNl+64m8VBrETkdGCDBdOsCr3F3jg==} + engines: {node: '>=20.19.0'} + peerDependencies: + '@aws-sdk/credential-providers': ^3.806.0 + '@mongodb-js/zstd': ^7.0.0 + gcp-metadata: ^7.0.1 + kerberos: ^7.0.0 + mongodb-client-encryption: '>=7.0.0 <7.1.0' + snappy: ^7.3.2 + socks: ^2.8.6 + peerDependenciesMeta: + '@aws-sdk/credential-providers': + optional: true + '@mongodb-js/zstd': + optional: true + gcp-metadata: + optional: true + kerberos: + optional: true + mongodb-client-encryption: + optional: true + snappy: + optional: true + socks: + optional: true + dependencies: + '@mongodb-js/saslprep': 1.4.4 + bson: 7.0.0 + mongodb-connection-string-url: 7.0.0 + dev: false + /motion-dom@12.23.12: resolution: {integrity: sha512-RcR4fvMCTESQBD/uKQe49D5RUeDOokkGRmz4ceaJKDBgHYtZtntC/s2vLvY38gqGaytinij/yi3hMcWVcEF5Kw==} dependencies: @@ -36266,6 +36347,12 @@ packages: /space-separated-tokens@2.0.2: resolution: {integrity: sha512-PEGlAwrG8yXGXRjW32fGbg66JAlOAwbObuqVoJpv/mRgoWDQfgH1wDPvtzWyUSNAXBGSk8h755YDbbcEy3SH2Q==} + /sparse-bitfield@3.0.3: + resolution: {integrity: sha512-kvzhi7vqKTfkh0PZU+2D2PIllw2ymqJKujUcyPMd9Y75Nv4nPbGJZXNhxsgdQab2BmlDct1YnfQCguEvHr7VsQ==} + dependencies: + memory-pager: 1.5.0 + dev: false + /spawndamnit@3.0.1: resolution: {integrity: sha512-MmnduQUuHCoFckZoWnXsTg7JaiLBJrKFj9UI2MbRPGaJeVpsLcVBu6P/IGZovziM/YBsellCmsprgNA+w0CzVg==} dependencies: @@ -37373,6 +37460,13 @@ packages: punycode: 2.3.1 dev: true + /tr46@5.1.1: + resolution: {integrity: sha512-hdF5ZgjTqgAntKkklYw0R03MG2x/bSzTtkxmIRw/sTNV8YXsCJ1tfLAX23lhxhHJlEf3CRCOCGGWw3vI3GaSPw==} + engines: {node: '>=18'} + dependencies: + punycode: 2.3.1 + dev: false + /tree-kill@1.2.2: resolution: {integrity: sha512-L0Orpi8qGpRG//Nd+H90vFB+3iHnue1zSSGmNOOCh1GLJ7rUKVwV2HvijphGQS2UmhUZewS9VgvxYIdgr+fG1A==} hasBin: true @@ -39529,7 +39623,6 @@ packages: /webidl-conversions@7.0.0: resolution: {integrity: sha512-VwddBukDzu71offAQR975unBIGqfKZpM+8ZX6ySk8nYhVoo5CYaZyzt3YBvYtRtO+aoGlqxPg/B87NGVZ/fu6g==} engines: {node: '>=12'} - dev: true /webpack-node-externals@3.0.0: resolution: {integrity: sha512-LnL6Z3GGDPht/AigwRh2dvL9PQPFQ8skEpVrWZXLWBYmqcaojHNN0onvHzie6rq7EWKrrBfPYqNEzTJgiwEQDQ==} @@ -39621,6 +39714,14 @@ packages: webidl-conversions: 7.0.0 dev: true + /whatwg-url@14.2.0: + resolution: {integrity: sha512-De72GdQZzNTUBBChsXueQUnPKDkg/5A5zp7pFDuQAj5UFoENpiACU0wlCvzpAGnTkj++ihpKwKyYewn/XNUbKw==} + engines: {node: '>=18'} + dependencies: + tr46: 5.1.1 + webidl-conversions: 7.0.0 + dev: false + /whatwg-url@5.0.0: resolution: {integrity: sha512-saE57nupxk6v3HY35+jzBwYa0rKSy0XR8JSxZPwgLr7ys0IBzhGviA1/TUGJLmSVqs8pb9AnvICXEuOHLprYTw==} dependencies: From d59ad0e5e0f0322139e30c21abb365ef159eb6b7 Mon Sep 17 00:00:00 2001 From: UmeshpJadhav Date: Sat, 31 Jan 2026 11:00:16 +0530 Subject: [PATCH 2/7] fix: fixed the build failure and the implemented code review comments --- packages/mongodb/README.md | 12 ++--- packages/mongodb/src/memory-adapter.spec.ts | 57 +++++++++++---------- packages/mongodb/src/memory-adapter.ts | 57 +++++++++++---------- pnpm-lock.yaml | 24 +++++++-- 4 files changed, 86 insertions(+), 64 deletions(-) diff --git a/packages/mongodb/README.md b/packages/mongodb/README.md index 14232061e..f3e5ce150 100644 --- a/packages/mongodb/README.md +++ b/packages/mongodb/README.md @@ -32,9 +32,9 @@ const memory = new Memory({ ## Configuration -| Option | Type | Default | Description | -| str | str | str | str | -| `connection` | `string` | required | MongoDB connection URI | -| `database` | `string` | `"voltagent"` | Database name | -| `collectionPrefix` | `string` | `"voltagent_memory"` | Prefix for collections | -| `debug` | `boolean` | `false` | Enable debug logging | +| Option | Type | Default | Description | +| ------------------ | --------- | -------------------- | ---------------------- | +| `connection` | `string` | required | MongoDB connection URI | +| `database` | `string` | `"voltagent"` | Database name | +| `collectionPrefix` | `string` | `"voltagent_memory"` | Prefix for collections | +| `debug` | `boolean` | `false` | Enable debug logging | diff --git a/packages/mongodb/src/memory-adapter.spec.ts b/packages/mongodb/src/memory-adapter.spec.ts index c87fee035..96dac79f4 100644 --- a/packages/mongodb/src/memory-adapter.spec.ts +++ b/packages/mongodb/src/memory-adapter.spec.ts @@ -577,30 +577,33 @@ export class MongoDBMemoryAdapter implements StorageAdapter { const stepsCollection = this.getCollection("steps"); - const operations = steps.map((step) => ({ - replaceOne: { - filter: { _id: step.id || this.generateId() }, - replacement: { - _id: step.id || this.generateId(), - conversationId: step.conversationId, - userId: step.userId, - agentId: step.agentId, - agentName: step.agentName, - operationId: step.operationId, - stepIndex: step.stepIndex, - type: step.type, - role: step.role, - content: step.content, - arguments: step.arguments, - result: step.result, - usage: step.usage, - subAgentId: step.subAgentId, - subAgentName: step.subAgentName, - createdAt: new Date(), + const operations = steps.map((step) => { + const id = step.id || this.generateId(); + return { + replaceOne: { + filter: { _id: id }, + replacement: { + _id: id, + conversationId: step.conversationId, + userId: step.userId, + agentId: step.agentId, + agentName: step.agentName, + operationId: step.operationId, + stepIndex: step.stepIndex, + type: step.type, + role: step.role, + content: step.content, + arguments: step.arguments, + result: step.result, + usage: step.usage, + subAgentId: step.subAgentId, + subAgentName: step.subAgentName, + createdAt: new Date(), + }, + upsert: true, }, - upsert: true, - }, - })); + }; + }); await stepsCollection.bulkWrite(operations as any); @@ -807,8 +810,8 @@ export class MongoDBMemoryAdapter implements StorageAdapter { userId: (state as any).userId, conversationId: (state as any).conversationId, metadata: (state as any).metadata, - createdAt: (state as any).createdAt.toISOString(), - updatedAt: (state as any).updatedAt.toISOString(), + createdAt: (state as any).createdAt, + updatedAt: (state as any).updatedAt, }; } @@ -862,8 +865,8 @@ export class MongoDBMemoryAdapter implements StorageAdapter { userId: state.userId, conversationId: state.conversationId, metadata: state.metadata, - createdAt: state.createdAt.toISOString(), - updatedAt: state.updatedAt.toISOString(), + createdAt: state.createdAt, + updatedAt: state.updatedAt, })); } diff --git a/packages/mongodb/src/memory-adapter.ts b/packages/mongodb/src/memory-adapter.ts index b840d5559..942e6d4f0 100644 --- a/packages/mongodb/src/memory-adapter.ts +++ b/packages/mongodb/src/memory-adapter.ts @@ -578,30 +578,33 @@ export class MongoDBMemoryAdapter implements StorageAdapter { const stepsCollection = this.getCollection("steps"); - const operations = steps.map((step) => ({ - replaceOne: { - filter: { _id: step.id || this.generateId() }, - replacement: { - _id: step.id || this.generateId(), - conversationId: step.conversationId, - userId: step.userId, - agentId: step.agentId, - agentName: step.agentName, - operationId: step.operationId, - stepIndex: step.stepIndex, - type: step.type, - role: step.role, - content: step.content, - arguments: step.arguments, - result: step.result, - usage: step.usage, - subAgentId: step.subAgentId, - subAgentName: step.subAgentName, - createdAt: new Date(), + const operations = steps.map((step) => { + const id = step.id || this.generateId(); + return { + replaceOne: { + filter: { _id: id }, + replacement: { + _id: id, + conversationId: step.conversationId, + userId: step.userId, + agentId: step.agentId, + agentName: step.agentName, + operationId: step.operationId, + stepIndex: step.stepIndex, + type: step.type, + role: step.role, + content: step.content, + arguments: step.arguments, + result: step.result, + usage: step.usage, + subAgentId: step.subAgentId, + subAgentName: step.subAgentName, + createdAt: new Date(), + }, + upsert: true, }, - upsert: true, - }, - })); + }; + }); await stepsCollection.bulkWrite(operations as any); @@ -808,8 +811,8 @@ export class MongoDBMemoryAdapter implements StorageAdapter { userId: (state as any).userId, conversationId: (state as any).conversationId, metadata: (state as any).metadata, - createdAt: (state as any).createdAt.toISOString(), - updatedAt: (state as any).updatedAt.toISOString(), + createdAt: (state as any).createdAt, + updatedAt: (state as any).updatedAt, }; } @@ -863,8 +866,8 @@ export class MongoDBMemoryAdapter implements StorageAdapter { userId: state.userId, conversationId: state.conversationId, metadata: state.metadata, - createdAt: state.createdAt.toISOString(), - updatedAt: state.updatedAt.toISOString(), + createdAt: state.createdAt, + updatedAt: state.updatedAt, })); } diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 5f62cd597..146c7ab9d 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -3005,7 +3005,7 @@ importers: version: link:../../packages/server-hono '@voltagent/voice': specifier: ^2.0.2 - version: link:../../packages/voice + version: 2.1.0(@voltagent/core@packages+core)(zod@3.25.76) ai: specifier: ^6.0.0 version: 6.0.3(zod@3.25.76) @@ -3042,7 +3042,7 @@ importers: version: link:../../packages/server-hono '@voltagent/voice': specifier: ^2.0.2 - version: link:../../packages/voice + version: 2.1.0(@voltagent/core@packages+core)(zod@3.25.76) ai: specifier: ^6.0.0 version: 6.0.3(zod@3.25.76) @@ -3085,7 +3085,7 @@ importers: version: link:../../packages/server-hono '@voltagent/voice': specifier: ^2.0.2 - version: link:../../packages/voice + version: 2.1.0(@voltagent/core@packages+core)(zod@3.25.76) ai: specifier: ^6.0.0 version: 6.0.3(zod@3.25.76) @@ -4018,7 +4018,7 @@ importers: version: link:../core ai: specifier: ^6.0.0 - version: 6.0.3(zod@4.2.1) + version: 6.0.3(zod@4.3.5) packages/postgres: dependencies: @@ -20003,6 +20003,22 @@ packages: zod: 3.25.76 dev: false + /@voltagent/voice@2.1.0(@voltagent/core@packages+core)(zod@3.25.76): + resolution: {integrity: sha512-TByV1ci+aV4q6cgNImZCDsMAkEP6tRXhcEF+WrXHisldCNynNfp8WiJvRwNdRRPkLVaqqXmj20Mnj0j6Avr81w==} + peerDependencies: + '@voltagent/core': ^2.0.0 + dependencies: + '@voltagent/core': link:packages/core + '@xsai/generate-speech': 0.4.0-beta.1 + '@xsai/generate-transcription': 0.4.0-beta.1 + elevenlabs: 1.59.0 + openai: 4.104.0(ws@8.18.3)(zod@3.25.76) + transitivePeerDependencies: + - encoding + - ws + - zod + dev: false + /@vue-macros/common@3.0.0-beta.16(vue@3.5.22): resolution: {integrity: sha512-8O2gWxWFiaoNkk7PGi0+p7NPGe/f8xJ3/INUufvje/RZOs7sJvlI1jnR4lydtRFa/mU0ylMXUXXjSK0fHDEYTA==} engines: {node: '>=20.18.0'} From ef863b8e57a0f815577a96280e5e83cacd60e9fa Mon Sep 17 00:00:00 2001 From: UmeshpJadhav Date: Sat, 31 Jan 2026 11:14:31 +0530 Subject: [PATCH 3/7] fix: handle race condition in createConversation --- packages/mongodb/src/memory-adapter.ts | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/packages/mongodb/src/memory-adapter.ts b/packages/mongodb/src/memory-adapter.ts index 942e6d4f0..85541d414 100644 --- a/packages/mongodb/src/memory-adapter.ts +++ b/packages/mongodb/src/memory-adapter.ts @@ -361,12 +361,6 @@ export class MongoDBMemoryAdapter implements StorageAdapter { const conversationsCollection = this.getCollection("conversations"); - // Check if conversation already exists - const existing = await conversationsCollection.findOne({ _id: input.id } as any); - if (existing) { - throw new ConversationAlreadyExistsError(input.id); - } - const now = new Date(); const conversation = { _id: input.id, @@ -378,7 +372,14 @@ export class MongoDBMemoryAdapter implements StorageAdapter { updatedAt: now, }; - await conversationsCollection.insertOne(conversation as any); + try { + await conversationsCollection.insertOne(conversation as any); + } catch (error: any) { + if (error.code === 11000) { + throw new ConversationAlreadyExistsError(input.id); + } + throw error; + } this.log(`Created conversation ${input.id}`); From 76dab7a318548a5a72a8d84aaa4f80ed3cb2c9b7 Mon Sep 17 00:00:00 2001 From: UmeshpJadhav Date: Sat, 31 Jan 2026 11:15:59 +0530 Subject: [PATCH 4/7] fix: explicitly externalize dependencies in tsup build --- packages/mongodb/tsup.config.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/mongodb/tsup.config.ts b/packages/mongodb/tsup.config.ts index 0819104fd..ae0c8c510 100644 --- a/packages/mongodb/tsup.config.ts +++ b/packages/mongodb/tsup.config.ts @@ -11,6 +11,7 @@ export default defineConfig({ outDir: "dist", minify: false, dts: true, + external: ["@voltagent/core", "ai", "mongodb"], esbuildPlugins: [markAsExternalPlugin], esbuildOptions(options) { options.keepNames = true; From be1fd7143e5db61bb476c68708e54507ed22a541 Mon Sep 17 00:00:00 2001 From: UmeshpJadhav Date: Sat, 31 Jan 2026 11:29:08 +0530 Subject: [PATCH 5/7] fix: implement missing StorageAdapter methods --- packages/mongodb/src/memory-adapter.spec.ts | 17 ++++++------ packages/mongodb/src/memory-adapter.ts | 29 +++++++++++++++++++-- 2 files changed, 36 insertions(+), 10 deletions(-) diff --git a/packages/mongodb/src/memory-adapter.spec.ts b/packages/mongodb/src/memory-adapter.spec.ts index 96dac79f4..d96987eae 100644 --- a/packages/mongodb/src/memory-adapter.spec.ts +++ b/packages/mongodb/src/memory-adapter.spec.ts @@ -358,13 +358,7 @@ export class MongoDBMemoryAdapter implements StorageAdapter { async createConversation(input: CreateConversationInput): Promise { await this.initPromise; - const conversationsCollection = this.getCollection("conversations"); - - // Check if conversation already exists - const existing = await conversationsCollection.findOne({ _id: input.id } as any); - if (existing) { - throw new ConversationAlreadyExistsError(input.id); - } + const conversationsCollection = this.getCollection("conversations"); const now = new Date(); const conversation = { @@ -377,7 +371,14 @@ export class MongoDBMemoryAdapter implements StorageAdapter { updatedAt: now, }; - await conversationsCollection.insertOne(conversation as any); + try { + await conversationsCollection.insertOne(conversation as any); + } catch (error: any) { + if (error.code === 11000) { + throw new ConversationAlreadyExistsError(input.id); + } + throw error; + } this.log(`Created conversation ${input.id}`); diff --git a/packages/mongodb/src/memory-adapter.ts b/packages/mongodb/src/memory-adapter.ts index 85541d414..812d25391 100644 --- a/packages/mongodb/src/memory-adapter.ts +++ b/packages/mongodb/src/memory-adapter.ts @@ -316,6 +316,21 @@ export class MongoDBMemoryAdapter implements StorageAdapter { })); } + /** + * Delete all messages for a conversation + */ + async deleteMessages(conversationId: string): Promise { + await this.initPromise; + + const messagesCollection = this.getCollection("messages"); + const stepsCollection = this.getCollection("steps"); + + await messagesCollection.deleteMany({ conversationId }); + await stepsCollection.deleteMany({ conversationId }); + + this.log(`Deleted messages for conversation ${conversationId}`); + } + /** * Clear messages for a conversation or all conversations for a user */ @@ -359,7 +374,7 @@ export class MongoDBMemoryAdapter implements StorageAdapter { async createConversation(input: CreateConversationInput): Promise { await this.initPromise; - const conversationsCollection = this.getCollection("conversations"); + const conversationsCollection = this.getCollection("conversations"); const now = new Date(); const conversation = { @@ -373,7 +388,7 @@ export class MongoDBMemoryAdapter implements StorageAdapter { }; try { - await conversationsCollection.insertOne(conversation as any); + await conversationsCollection.insertOne(conversation); } catch (error: any) { if (error.code === 11000) { throw new ConversationAlreadyExistsError(input.id); @@ -394,6 +409,16 @@ export class MongoDBMemoryAdapter implements StorageAdapter { }; } + /** + * Count conversations for a user + */ + async countConversations(userId: string): Promise { + await this.initPromise; + + const conversationsCollection = this.getCollection("conversations"); + return conversationsCollection.countDocuments({ userId }); + } + /** * Get a conversation by ID */ From 545a62ab94d4d1005040c823db36054d474b359c Mon Sep 17 00:00:00 2001 From: UmeshpJadhav Date: Sat, 31 Jan 2026 11:37:40 +0530 Subject: [PATCH 6/7] fix: resolve security vulnerability in deleteMessages and implement missing methods --- packages/mongodb/src/memory-adapter.ts | 34 +++++++++++++++++++------- 1 file changed, 25 insertions(+), 9 deletions(-) diff --git a/packages/mongodb/src/memory-adapter.ts b/packages/mongodb/src/memory-adapter.ts index 812d25391..c4e17233b 100644 --- a/packages/mongodb/src/memory-adapter.ts +++ b/packages/mongodb/src/memory-adapter.ts @@ -317,18 +317,24 @@ export class MongoDBMemoryAdapter implements StorageAdapter { } /** - * Delete all messages for a conversation + * Delete specific messages */ - async deleteMessages(conversationId: string): Promise { + async deleteMessages( + messageIds: string[], + userId: string, + conversationId: string, + ): Promise { await this.initPromise; const messagesCollection = this.getCollection("messages"); - const stepsCollection = this.getCollection("steps"); - await messagesCollection.deleteMany({ conversationId }); - await stepsCollection.deleteMany({ conversationId }); + await messagesCollection.deleteMany({ + conversationId, + userId, + messageId: { $in: messageIds }, + }); - this.log(`Deleted messages for conversation ${conversationId}`); + this.log(`Deleted ${messageIds.length} messages from conversation ${conversationId}`); } /** @@ -410,13 +416,23 @@ export class MongoDBMemoryAdapter implements StorageAdapter { } /** - * Count conversations for a user + * Count conversations based on filters */ - async countConversations(userId: string): Promise { + async countConversations(options: ConversationQueryOptions): Promise { await this.initPromise; const conversationsCollection = this.getCollection("conversations"); - return conversationsCollection.countDocuments({ userId }); + const filter: any = {}; + + if (options.userId) { + filter.userId = options.userId; + } + + if (options.resourceId) { + filter.resourceId = options.resourceId; + } + + return conversationsCollection.countDocuments(filter); } /** From 7d0eb68835f0c41c402bc6a72c366c80f4c867b5 Mon Sep 17 00:00:00 2001 From: UmeshpJadhav Date: Sat, 31 Jan 2026 12:11:58 +0530 Subject: [PATCH 7/7] fix: refactor ID handling, update logic, and tests --- packages/mongodb/src/memory-adapter.spec.ts | 1047 ++----------------- packages/mongodb/src/memory-adapter.ts | 42 +- 2 files changed, 92 insertions(+), 997 deletions(-) diff --git a/packages/mongodb/src/memory-adapter.spec.ts b/packages/mongodb/src/memory-adapter.spec.ts index d96987eae..82caa89ba 100644 --- a/packages/mongodb/src/memory-adapter.spec.ts +++ b/packages/mongodb/src/memory-adapter.spec.ts @@ -1,980 +1,73 @@ -/** - * MongoDB Storage Adapter for Memory - * Stores conversations and messages in MongoDB database - */ - -import { ConversationAlreadyExistsError, ConversationNotFoundError } from "@voltagent/core"; -import type { - Conversation, - ConversationQueryOptions, - ConversationStepRecord, - CreateConversationInput, - GetConversationStepsOptions, - GetMessagesOptions, - StorageAdapter, - WorkflowRunQuery, - WorkflowStateEntry, - WorkingMemoryScope, -} from "@voltagent/core"; -import type { UIMessage } from "ai"; -import { type Collection, type Db, type Document, MongoClient } from "mongodb"; - -/** - * MongoDB configuration options for Memory - */ -export interface MongoDBMemoryOptions { - /** - * MongoDB connection URI - * Examples: - * - "mongodb://localhost:27017" - * - "mongodb://username:password@localhost:27017" - * - "mongodb+srv://username:password@cluster.mongodb.net" - */ - connection: string; - - /** - * Database name to use for collections - * @default "voltagent" - */ - database?: string; - - /** - * Prefix for collection names - * @default "voltagent_memory" - */ - collectionPrefix?: string; - - /** - * Whether to enable debug logging - * @default false - */ - debug?: boolean; -} - -/** - * MongoDB Storage Adapter for Memory - * Production-ready storage for conversations and messages - */ -export class MongoDBMemoryAdapter implements StorageAdapter { - private client: MongoClient; - private db: Db | null = null; - private databaseName: string; - private collectionPrefix: string; - private initialized = false; - private initPromise: Promise | null = null; - private debug: boolean; - - constructor(options: MongoDBMemoryOptions) { - this.databaseName = options.database ?? "voltagent"; - this.collectionPrefix = options.collectionPrefix ?? "voltagent_memory"; - this.debug = options.debug ?? false; - - // Validate collection prefix - if ( - this.collectionPrefix.includes("\0") || - this.collectionPrefix.includes("$") || - this.collectionPrefix.startsWith("system.") - ) { - throw new Error(`Invalid collection prefix: ${this.collectionPrefix}`); - } - - // Create MongoDB client - this.client = new MongoClient(options.connection); - - this.log("MongoDB Memory adapter initialized"); - - // Start initialization but don't await it - this.initPromise = this.initialize(); - } - - /** - * Log debug messages - */ - private log(...args: any[]): void { - if (this.debug) { - console.log("[MongoDB Memory]", ...args); - } - } - - /** - * Generate a random ID - */ - private generateId(): string { - return ( - Math.random().toString(36).substring(2, 15) + Math.random().toString(36).substring(2, 15) - ); - } - - /** - * Get collection by name - */ - private getCollection(collectionName: string): Collection { - if (!this.db) { - throw new Error("Database not initialized"); - } - return this.db.collection(`${this.collectionPrefix}_${collectionName}`); - } - - /** - * Initialize database schema - */ - private async initialize(): Promise { - if (this.initialized) return; - - // Prevent multiple simultaneous initializations - if (this.initPromise && !this.initialized) { - return this.initPromise; - } - - try { - // Connect to MongoDB - await this.client.connect(); - this.db = this.client.db(this.databaseName); - - this.log(`Connected to MongoDB database: ${this.databaseName}`); - - // Create indexes for all collections - const conversationsCollection = this.getCollection("conversations"); - const messagesCollection = this.getCollection("messages"); - const workflowStatesCollection = this.getCollection("workflow_states"); - const stepsCollection = this.getCollection("steps"); - - // Users collection indexes (none needed beyond _id) - - // Conversations collection indexes - await conversationsCollection.createIndex({ userId: 1 }, { background: true }); - await conversationsCollection.createIndex({ resourceId: 1 }, { background: true }); - await conversationsCollection.createIndex({ updatedAt: -1 }, { background: true }); - - // Messages collection indexes - await messagesCollection.createIndex( - { conversationId: 1, createdAt: 1 }, - { background: true }, - ); - await messagesCollection.createIndex({ conversationId: 1 }, { background: true }); - // Unique compound index to enforce message uniqueness - await messagesCollection.createIndex( - { conversationId: 1, messageId: 1 }, - { unique: true, background: true }, - ); - - // Workflow states collection indexes - await workflowStatesCollection.createIndex({ workflowId: 1 }, { background: true }); - await workflowStatesCollection.createIndex({ status: 1 }, { background: true }); - await workflowStatesCollection.createIndex({ createdAt: -1 }, { background: true }); - - // Steps collection indexes - await stepsCollection.createIndex({ conversationId: 1, stepIndex: 1 }, { background: true }); - await stepsCollection.createIndex( - { conversationId: 1, operationId: 1 }, - { background: true }, - ); - - this.initialized = true; - this.log("Database schema initialized with indexes"); - } catch (error) { - throw new Error( - `Failed to connect to MongoDB: ${error instanceof Error ? error.message : String(error)}`, - ); - } - } - - /** - * Close MongoDB connection - */ - async close(): Promise { - await this.client.close(); - this.log("MongoDB connection closed"); - } - - // ============================================================================ - // Message Operations - // ============================================================================ - - /** - * Add a single message - */ - async addMessage(message: UIMessage, userId: string, conversationId: string): Promise { - await this.initPromise; - - const messagesCollection = this.getCollection("messages"); - - // Ensure conversation exists - const conversation = await this.getConversation(conversationId); - if (!conversation) { - throw new ConversationNotFoundError(conversationId); - } - - const messageId = message.id || this.generateId(); - - try { - await messagesCollection.insertOne({ - _id: undefined, // Let MongoDB generate ObjectId - conversationId, - messageId, - userId, - role: message.role, - parts: message.parts, - metadata: message.metadata || {}, - formatVersion: 2, - createdAt: new Date(), - } as any); - - this.log(`Added message ${messageId} to conversation ${conversationId}`); - } catch (error: any) { - if (error.code === 11000) { - throw new Error( - `Message with ID ${messageId} already exists in conversation ${conversationId}`, - ); - } - throw error; - } - } - - /** - * Add multiple messages - */ - async addMessages(messages: UIMessage[], userId: string, conversationId: string): Promise { - await this.initPromise; - - if (messages.length === 0) return; - - const messagesCollection = this.getCollection("messages"); - - // Ensure conversation exists - const conversation = await this.getConversation(conversationId); - if (!conversation) { - throw new ConversationNotFoundError(conversationId); - } - - const documentsToInsert = messages.map((message) => ({ - _id: undefined, // Let MongoDB generate ObjectId - conversationId, - messageId: message.id || this.generateId(), - userId, - role: message.role, - parts: message.parts, - metadata: message.metadata || {}, - formatVersion: 2, - createdAt: new Date(), - })); - - try { - await messagesCollection.insertMany(documentsToInsert as any); - this.log(`Added ${messages.length} messages to conversation ${conversationId}`); - } catch (error: any) { - if (error.code === 11000) { - throw new Error(`One or more messages already exist in conversation ${conversationId}`); - } - throw error; - } - } - - /** - * Get messages for a conversation - */ - async getMessages( - userId: string, - conversationId: string, - options?: GetMessagesOptions, - ): Promise[]> { - await this.initPromise; - - const messagesCollection = this.getCollection("messages"); - - const filter: any = { conversationId, userId }; - - if (options?.roles && options.roles.length > 0) { - filter.role = { $in: options.roles }; - } - - if (options?.after) { - filter.createdAt = { $gt: options.after }; - } - - if (options?.before) { - filter.createdAt = { ...filter.createdAt, $lt: options.before }; - } - - let cursor = messagesCollection.find(filter).sort({ createdAt: 1 }); - - if (options?.limit) { - cursor = cursor.limit(options.limit); - } - - const messages = await cursor.toArray(); - - return messages.map((msg: any) => ({ - id: msg.messageId, - role: msg.role, - parts: msg.parts, - metadata: { - ...msg.metadata, - createdAt: msg.createdAt, - }, - })); - } - - /** - * Clear messages for a conversation or all conversations for a user - */ - async clearMessages(userId: string, conversationId?: string): Promise { - await this.initPromise; - - const messagesCollection = this.getCollection("messages"); - const stepsCollection = this.getCollection("steps"); - - if (conversationId) { - // Clear messages for specific conversation - await messagesCollection.deleteMany({ conversationId, userId }); - await stepsCollection.deleteMany({ conversationId, userId }); - this.log(`Cleared messages for conversation ${conversationId}`); - } else { - // Clear all messages for user - // First get all conversation IDs for this user - const conversationsCollection = this.getCollection("conversations"); - const userConversations = await conversationsCollection - .find({ userId }) - .project({ _id: 1 }) - .toArray(); - - const conversationIds = userConversations.map((conv: any) => conv._id); - - if (conversationIds.length > 0) { - await messagesCollection.deleteMany({ conversationId: { $in: conversationIds } }); - await stepsCollection.deleteMany({ conversationId: { $in: conversationIds } }); - this.log(`Cleared all messages for user ${userId}`); - } - } - } - - // ============================================================================ - // Conversation Operations - // ============================================================================ - - /** - * Create a new conversation - */ - async createConversation(input: CreateConversationInput): Promise { - await this.initPromise; - - const conversationsCollection = this.getCollection("conversations"); - - const now = new Date(); - const conversation = { - _id: input.id, - resourceId: input.resourceId, - userId: input.userId, - title: input.title, - metadata: input.metadata || {}, - createdAt: now, - updatedAt: now, - }; - - try { - await conversationsCollection.insertOne(conversation as any); - } catch (error: any) { - if (error.code === 11000) { - throw new ConversationAlreadyExistsError(input.id); - } - throw error; - } - - this.log(`Created conversation ${input.id}`); - - return { - id: conversation._id, - resourceId: conversation.resourceId, - userId: conversation.userId, - title: conversation.title, - metadata: conversation.metadata, - createdAt: conversation.createdAt.toISOString(), - updatedAt: conversation.updatedAt.toISOString(), - }; - } - - /** - * Get a conversation by ID - */ - async getConversation(id: string): Promise { - await this.initPromise; - - const conversationsCollection = this.getCollection("conversations"); - const conversation = await conversationsCollection.findOne({ _id: id } as any); - - if (!conversation) { - return null; - } - - return { - id: (conversation as any)._id, - resourceId: (conversation as any).resourceId, - userId: (conversation as any).userId, - title: (conversation as any).title, - metadata: (conversation as any).metadata || {}, - createdAt: (conversation as any).createdAt.toISOString(), - updatedAt: (conversation as any).updatedAt.toISOString(), - }; - } - - /** - * Get all conversations for a resource - */ - async getConversations(resourceId: string): Promise { - await this.initPromise; - - const conversationsCollection = this.getCollection("conversations"); - const conversations = await conversationsCollection - .find({ resourceId } as any) - .sort({ updatedAt: -1 }) - .toArray(); - - return conversations.map((conv: any) => ({ - id: conv._id, - resourceId: conv.resourceId, - userId: conv.userId, - title: conv.title, - metadata: conv.metadata || {}, - createdAt: conv.createdAt.toISOString(), - updatedAt: conv.updatedAt.toISOString(), - })); - } - - /** - * Get all conversations for a user - */ - async getConversationsByUserId( - userId: string, - options?: Omit, - ): Promise { - return this.queryConversations({ ...options, userId }); - } - - /** - * Query conversations with filters - */ - async queryConversations(options: ConversationQueryOptions): Promise { - await this.initPromise; - - const conversationsCollection = this.getCollection("conversations"); - - const filter: any = {}; - - if (options.userId) { - filter.userId = options.userId; - } - - if (options.resourceId) { - filter.resourceId = options.resourceId; - } - - let cursor = conversationsCollection.find(filter).sort({ updatedAt: -1 }); - - if (options.limit) { - cursor = cursor.limit(options.limit); - } - - if (options.offset) { - cursor = cursor.skip(options.offset); - } - - const conversations = await cursor.toArray(); - - return conversations.map((conv: any) => ({ - id: conv._id, - resourceId: conv.resourceId, - userId: conv.userId, - title: conv.title, - metadata: conv.metadata || {}, - createdAt: conv.createdAt.toISOString(), - updatedAt: conv.updatedAt.toISOString(), - })); - } - - /** - * Update a conversation - */ - async updateConversation( - id: string, - updates: Partial>, - ): Promise { - await this.initPromise; - - const conversationsCollection = this.getCollection("conversations"); - - const updateDoc: any = { - updatedAt: new Date(), - }; - - if (updates.title !== undefined) { - updateDoc.title = updates.title; - } - - if (updates.metadata !== undefined) { - updateDoc.metadata = updates.metadata; - } - - if (updates.resourceId !== undefined) { - updateDoc.resourceId = updates.resourceId; - } - - if (updates.userId !== undefined) { - updateDoc.userId = updates.userId; - } - - const result = await conversationsCollection.findOneAndUpdate( - { _id: id } as any, - { $set: updateDoc }, - { returnDocument: "after" }, - ); - - if (!result) { - throw new ConversationNotFoundError(id); - } - - this.log(`Updated conversation ${id}`); - - return { - id: (result as any)._id, - resourceId: (result as any).resourceId, - userId: (result as any).userId, - title: (result as any).title, - metadata: (result as any).metadata || {}, - createdAt: (result as any).createdAt.toISOString(), - updatedAt: (result as any).updatedAt.toISOString(), - }; - } - - /** - * Delete a conversation - */ - async deleteConversation(id: string): Promise { - await this.initPromise; - - const conversationsCollection = this.getCollection("conversations"); - - // MongoDB will cascade delete messages and steps automatically via application logic - const messagesCollection = this.getCollection("messages"); - const stepsCollection = this.getCollection("steps"); - - await messagesCollection.deleteMany({ conversationId: id } as any); - await stepsCollection.deleteMany({ conversationId: id } as any); - await conversationsCollection.deleteOne({ _id: id } as any); - - this.log(`Deleted conversation ${id}`); - } - - // ============================================================================ - // Conversation Steps Operations - // ============================================================================ - - /** - * Save conversation steps - */ - async saveConversationSteps(steps: ConversationStepRecord[]): Promise { - await this.initPromise; - - if (steps.length === 0) return; - - const stepsCollection = this.getCollection("steps"); - - const operations = steps.map((step) => { - const id = step.id || this.generateId(); - return { - replaceOne: { - filter: { _id: id }, - replacement: { - _id: id, - conversationId: step.conversationId, - userId: step.userId, - agentId: step.agentId, - agentName: step.agentName, - operationId: step.operationId, - stepIndex: step.stepIndex, - type: step.type, - role: step.role, - content: step.content, - arguments: step.arguments, - result: step.result, - usage: step.usage, - subAgentId: step.subAgentId, - subAgentName: step.subAgentName, - createdAt: new Date(), - }, - upsert: true, - }, - }; +import { MongoClient } from "mongodb"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { MongoDBMemoryAdapter } from "./memory-adapter"; + +vi.mock("mongodb", () => { + const collection = { + createIndex: vi.fn(), + insertOne: vi.fn(), + insertMany: vi.fn(), + find: vi.fn().mockReturnThis(), + sort: vi.fn().mockReturnThis(), + limit: vi.fn().mockReturnThis(), + skip: vi.fn().mockReturnThis(), + toArray: vi.fn().mockResolvedValue([]), + findOne: vi.fn(), + updateOne: vi.fn(), + deleteMany: vi.fn(), + deleteOne: vi.fn(), + countDocuments: vi.fn(), + bulkWrite: vi.fn(), + }; + + const db = { + collection: vi.fn().mockReturnValue(collection), + }; + + const client = { + connect: vi.fn(), + db: vi.fn().mockReturnValue(db), + close: vi.fn(), + }; + + return { + MongoClient: vi.fn().mockImplementation(() => client), + }; +}); + +describe("MongoDBMemoryAdapter", () => { + let adapter: MongoDBMemoryAdapter; + + beforeEach(() => { + vi.clearAllMocks(); + adapter = new MongoDBMemoryAdapter({ + connection: "mongodb://localhost:27017", }); - - await stepsCollection.bulkWrite(operations as any); - - this.log(`Saved ${steps.length} conversation steps`); - } - - /** - * Get conversation steps - */ - async getConversationSteps( - userId: string, - conversationId: string, - options?: GetConversationStepsOptions, - ): Promise { - await this.initPromise; - - const stepsCollection = this.getCollection("steps"); - - const filter: any = { conversationId, userId }; - - if (options?.operationId) { - filter.operationId = options.operationId; - } - - let cursor = stepsCollection.find(filter).sort({ stepIndex: 1 }); - - if (options?.limit) { - cursor = cursor.limit(options.limit); - } - - const steps = await cursor.toArray(); - - return steps.map((step: any) => ({ - id: step._id, - conversationId: step.conversationId, - userId: step.userId, - agentId: step.agentId, - agentName: step.agentName, - operationId: step.operationId, - stepIndex: step.stepIndex, - type: step.type, - role: step.role, - content: step.content, - arguments: step.arguments, - result: step.result, - usage: step.usage, - subAgentId: step.subAgentId, - subAgentName: step.subAgentName, - createdAt: step.createdAt.toISOString(), - })); - } - - // ============================================================================ - // Working Memory Operations - // ============================================================================ - - /** - * Get working memory - */ - async getWorkingMemory(params: { - conversationId?: string; - userId?: string; - scope: WorkingMemoryScope; - }): Promise { - await this.initPromise; - - if (params.scope === "conversation" && params.conversationId) { - const conversationsCollection = this.getCollection("conversations"); - const conversation = await conversationsCollection.findOne({ - _id: params.conversationId, - } as any); - - if (!conversation) { - return null; - } - - const workingMemory = (conversation as any).metadata?.workingMemory; - return workingMemory || null; - } - - if (params.scope === "user" && params.userId) { - const usersCollection = this.getCollection("users"); - const user = await usersCollection.findOne({ _id: params.userId } as any); - - if (!user) { - return null; - } - - const workingMemory = (user as any).metadata?.workingMemory; - return workingMemory || null; - } - - return null; - } - - /** - * Set working memory - */ - async setWorkingMemory(params: { - conversationId?: string; - userId?: string; - content: string; - scope: WorkingMemoryScope; - }): Promise { - await this.initPromise; - - if (params.scope === "conversation" && params.conversationId) { - const conversationsCollection = this.getCollection("conversations"); - - const conversation = await conversationsCollection.findOne({ - _id: params.conversationId, - } as any); - if (!conversation) { - throw new ConversationNotFoundError(params.conversationId); - } - - await conversationsCollection.updateOne({ _id: params.conversationId } as any, { - $set: { - "metadata.workingMemory": params.content, - updatedAt: new Date(), - }, - }); - - this.log(`Set working memory for conversation ${params.conversationId}`); - } else if (params.scope === "user" && params.userId) { - const usersCollection = this.getCollection("users"); - - // Upsert user document with working memory - await usersCollection.updateOne( - { _id: params.userId } as any, - { - $set: { - "metadata.workingMemory": params.content, - updatedAt: new Date(), - }, - $setOnInsert: { - createdAt: new Date(), - }, - }, - { upsert: true }, - ); - - this.log(`Set working memory for user ${params.userId}`); - } - } - - /** - * Delete working memory - */ - async deleteWorkingMemory(params: { - conversationId?: string; - userId?: string; - scope: WorkingMemoryScope; - }): Promise { - await this.initPromise; - - if (params.scope === "conversation" && params.conversationId) { - const conversationsCollection = this.getCollection("conversations"); - - await conversationsCollection.updateOne({ _id: params.conversationId } as any, { - $unset: { "metadata.workingMemory": "" }, - $set: { updatedAt: new Date() }, - }); - - this.log(`Deleted working memory for conversation ${params.conversationId}`); - } else if (params.scope === "user" && params.userId) { - const usersCollection = this.getCollection("users"); - - await usersCollection.updateOne({ _id: params.userId } as any, { - $unset: { "metadata.workingMemory": "" }, - $set: { updatedAt: new Date() }, - }); - - this.log(`Deleted working memory for user ${params.userId}`); - } - } - - // ============================================================================ - // Workflow State Operations - // ============================================================================ - - /** - * Get workflow state by execution ID - */ - async getWorkflowState(executionId: string): Promise { - await this.initPromise; - - const workflowStatesCollection = this.getCollection("workflow_states"); - const state = await workflowStatesCollection.findOne({ _id: executionId } as any); - - if (!state) { - return null; - } - - return { - id: (state as any)._id, - workflowId: (state as any).workflowId, - workflowName: (state as any).workflowName, - status: (state as any).status, - suspension: (state as any).suspension, - events: (state as any).events, - output: (state as any).output, - cancellation: (state as any).cancellation, - userId: (state as any).userId, - conversationId: (state as any).conversationId, - metadata: (state as any).metadata, - createdAt: (state as any).createdAt, - updatedAt: (state as any).updatedAt, + }); + + afterEach(async () => { + await adapter.close(); + }); + + it("should be defined", () => { + expect(adapter).toBeDefined(); + }); + + it("should initialize correctly", async () => { + await (adapter as any).initialize(); + expect(MongoClient).toHaveBeenCalledTimes(1); + }); + + it("should perform createConversation", async () => { + const input = { + id: "test-conv-id", + resourceId: "resource-1", + userId: "user-1", + title: "Test Conversation", + metadata: {}, }; - } - - /** - * Query workflow runs with filters - */ - async queryWorkflowRuns(query: WorkflowRunQuery): Promise { - await this.initPromise; - - const workflowStatesCollection = this.getCollection("workflow_states"); - - const filter: any = {}; - - if (query.workflowId) { - filter.workflowId = query.workflowId; - } - - if (query.status) { - filter.status = query.status; - } - - if (query.from) { - filter.createdAt = { $gte: query.from }; - } - - if (query.to) { - filter.createdAt = { ...filter.createdAt, $lte: query.to }; - } - - let cursor = workflowStatesCollection.find(filter).sort({ createdAt: -1 }); - - if (query.limit) { - cursor = cursor.limit(query.limit); - } - - if (query.offset) { - cursor = cursor.skip(query.offset); - } - - const states = await cursor.toArray(); - - return states.map((state: any) => ({ - id: state._id, - workflowId: state.workflowId, - workflowName: state.workflowName, - status: state.status, - suspension: state.suspension, - events: state.events, - output: state.output, - cancellation: state.cancellation, - userId: state.userId, - conversationId: state.conversationId, - metadata: state.metadata, - createdAt: state.createdAt, - updatedAt: state.updatedAt, - })); - } - - /** - * Set workflow state (create or replace) - */ - async setWorkflowState(executionId: string, state: WorkflowStateEntry): Promise { - await this.initPromise; - - const workflowStatesCollection = this.getCollection("workflow_states"); - - const now = new Date(); - - await workflowStatesCollection.replaceOne( - { _id: executionId } as any, - { - _id: executionId, - workflowId: state.workflowId, - workflowName: state.workflowName, - status: state.status, - suspension: state.suspension, - events: state.events, - output: state.output, - cancellation: state.cancellation, - userId: state.userId, - conversationId: state.conversationId, - metadata: state.metadata, - createdAt: state.createdAt ? new Date(state.createdAt) : now, - updatedAt: now, - } as any, - { upsert: true }, - ); - - this.log(`Set workflow state ${executionId}`); - } - - /** - * Update workflow state (partial update) - */ - async updateWorkflowState( - executionId: string, - updates: Partial, - ): Promise { - await this.initPromise; - - const workflowStatesCollection = this.getCollection("workflow_states"); - - const updateDoc: any = { - updatedAt: new Date(), - }; - - if (updates.status !== undefined) { - updateDoc.status = updates.status; - } - - if (updates.suspension !== undefined) { - updateDoc.suspension = updates.suspension; - } - - if (updates.events !== undefined) { - updateDoc.events = updates.events; - } - - if (updates.output !== undefined) { - updateDoc.output = updates.output; - } - - if (updates.cancellation !== undefined) { - updateDoc.cancellation = updates.cancellation; - } - - if (updates.metadata !== undefined) { - updateDoc.metadata = updates.metadata; - } - - await workflowStatesCollection.updateOne({ _id: executionId } as any, { $set: updateDoc }); - - this.log(`Updated workflow state ${executionId}`); - } - - /** - * Get suspended workflow states - */ - async getSuspendedWorkflowStates(workflowId: string): Promise { - await this.initPromise; - - const workflowStatesCollection = this.getCollection("workflow_states"); - - const states = await workflowStatesCollection - .find({ workflowId, status: "suspended" } as any) - .sort({ createdAt: -1 }) - .toArray(); - return states.map((state: any) => ({ - id: state._id, - workflowId: state.workflowId, - workflowName: state.workflowName, - status: state.status, - suspension: state.suspension, - events: state.events, - output: state.output, - cancellation: state.cancellation, - userId: state.userId, - conversationId: state.conversationId, - metadata: state.metadata, - createdAt: state.createdAt, - updatedAt: state.updatedAt, - })); - } -} + const conv = await adapter.createConversation(input); + expect(conv.id).toBe(input.id); + }); +}); diff --git a/packages/mongodb/src/memory-adapter.ts b/packages/mongodb/src/memory-adapter.ts index c4e17233b..91e5b5d94 100644 --- a/packages/mongodb/src/memory-adapter.ts +++ b/packages/mongodb/src/memory-adapter.ts @@ -210,7 +210,6 @@ export class MongoDBMemoryAdapter implements StorageAdapter { try { await messagesCollection.insertOne({ - _id: undefined, conversationId, messageId, userId, @@ -249,7 +248,6 @@ export class MongoDBMemoryAdapter implements StorageAdapter { } const documentsToInsert = messages.map((message) => ({ - _id: undefined, // Let MongoDB generate ObjectId conversationId, messageId: message.id || this.generateId(), userId, @@ -623,25 +621,29 @@ export class MongoDBMemoryAdapter implements StorageAdapter { const operations = steps.map((step) => { const id = step.id || this.generateId(); return { - replaceOne: { + updateOne: { filter: { _id: id }, - replacement: { - _id: id, - conversationId: step.conversationId, - userId: step.userId, - agentId: step.agentId, - agentName: step.agentName, - operationId: step.operationId, - stepIndex: step.stepIndex, - type: step.type, - role: step.role, - content: step.content, - arguments: step.arguments, - result: step.result, - usage: step.usage, - subAgentId: step.subAgentId, - subAgentName: step.subAgentName, - createdAt: new Date(), + update: { + $set: { + conversationId: step.conversationId, + userId: step.userId, + agentId: step.agentId, + agentName: step.agentName, + operationId: step.operationId, + stepIndex: step.stepIndex, + type: step.type, + role: step.role, + content: step.content, + arguments: step.arguments, + result: step.result, + usage: step.usage, + subAgentId: step.subAgentId, + subAgentName: step.subAgentName, + }, + $setOnInsert: { + _id: id, + createdAt: new Date(), + }, }, upsert: true, },