diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 00000000000..e428b37c135 --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,182 @@ +# Flowise Codebase Guide + +## Project Overview + +Flowise is a visual flow-based tool for building LLM-powered applications, agents, and chatbots. It uses a node-based interface similar to Node-RED but specialized for AI workflows. + +**Architecture**: TypeScript monorepo (PNPM + Turbo) with three main packages: +- `packages/components/` - Core node components and business logic +- `packages/server/` - Express backend with APIs and database +- `packages/ui/` - React frontend with visual flow builder + +## Key Development Patterns + +### 1. Node Development Pattern + +All nodes follow a standardized interface in `packages/components/src/Interface.ts`: + +```typescript +interface INode { + label: string // Display name + name: string // Unique identifier + type: string // Node category + version: number // Version for compatibility + category: string // UI grouping + baseClasses: string[] // Output types + inputs: INodeParams[] // Input configuration + outputs?: INodeOutputsValue[] + init?(nodeData: INodeData): Promise + run?(nodeData: INodeData): Promise +} +``` + +**Node Categories**: +- Agents, ChatModels, Tools, VectorStores, DocumentLoaders, Memory, Chains, AgentFlow + +### 2. Streaming Architecture + +Real-time updates use `IServerSideEventStreamer` interface with events like: +- `streamTokenEvent()` - Token-by-token LLM responses +- `streamToolEvent()` - Tool execution notifications +- `streamAgentReasoningEvent()` - Agent thought processes +- `streamCustomEvent()` - Custom event types + +**Pattern**: Agents and chains receive `sseStreamer` via `options.sseStreamer` and call appropriate stream methods. + +### 3. Tool Integration Pattern + +Tools can be: +- **Custom Functions**: User-defined JavaScript with Zod schema validation +- **API Tools**: HTTP request wrappers (GET, POST, etc.) +- **Chain Tools**: Embedded chatflows as tools +- **MCP Tools**: Model Context Protocol integrations +- **Service Tools**: Platform integrations (Google, GitHub, Slack) + +Tools receive runtime context via `RunnableConfig` parameter containing `chatId`, `sseStreamer`, etc. + +### 4. Agent Patterns + +**Traditional Agents**: Single LLM with tools (ReAct, Conversational) +**Multi-Agent**: Supervisor-worker coordination patterns +**AgentFlow v2**: Visual node-based agent orchestration with: +- Conditional branching +- Loops and iteration +- State management +- Human-in-the-loop + +### 5. Database and API Patterns + +**TypeORM** entities in `/server/src/database/entities/` +**RESTful APIs** with Express middleware for auth, rate limiting +**Queue system** using BullMQ for background processing +**WebSocket/SSE** for real-time communication + +## File Organization + +``` +packages/ +├── components/ +│ ├── nodes/ # All node implementations +│ │ ├── agents/ # Agent nodes +│ │ ├── tools/ # Tool integrations (including MCP) +│ │ ├── chatmodels/ # LLM integrations +│ │ └── chains/ # LangChain chains +│ └── src/ +│ ├── Interface.ts # Core interfaces +│ └── handler.ts # Execution handlers +├── server/ +│ ├── src/controllers/ # API endpoints +│ ├── src/utils/ # SSEStreamer, buildChatflow +│ └── src/database/ # Database entities and migrations +└── ui/ + ├── src/views/ # Page components + ├── src/api/ # API client functions + └── src/store/ # Redux store +``` + +## Development Guidelines + +### Adding New Nodes +1. Create node file in appropriate `/nodes/` category +2. 
Implement `INode` interface with `init()` and/or `run()` methods +3. Export in `/components/src/index.ts` +4. Add to category mapping in `/server/src/utils/buildChatflow.ts` + +### Adding Streaming Support +1. Extract `sseStreamer` from `options.sseStreamer` +2. Extract `chatId` from `options.chatId` or config +3. Call appropriate `stream*Event()` methods during execution +4. Use `streamCustomEvent()` for custom notification types + +### Working with Tools +1. Tools are LangChain-compatible functions created with `tool()` helper +2. Use Zod schemas for input validation +3. Access runtime context via second `config` parameter +4. Return string responses that get passed back through agent layer + +### MCP (Model Context Protocol) Pattern +- MCP tools in `/nodes/tools/MCP/` create LangChain tool wrappers +- Each tool call creates new MCP client connection +- Notifications received via client event handlers +- Currently notifications only logged to console (opportunity for enhancement) + +## Common Patterns + +### Error Handling +- Use try/catch blocks in async operations +- Return descriptive error messages as strings +- Log errors for debugging but don't expose sensitive data + +### Configuration Management +- Use environment variables for external service configs +- Store sensitive data in credentials system +- Support both local and external credential storage + +### Testing Strategy +- Component testing for individual nodes +- Integration testing for complete flows +- Use test databases for server tests +- Mock external APIs in tests + +## Key Integration Points + +### UI to Server Communication +- REST APIs for CRUD operations +- WebSocket/SSE for real-time streaming +- File uploads for document processing + +### Server to Components +- Node execution via `run()` methods +- Streaming via `IServerSideEventStreamer` +- State management through memory systems + +### External Integrations +- LangChain ecosystem compatibility +- OpenAI/Anthropic/Google API integrations +- Vector database connections +- Third-party service APIs (GitHub, Slack, etc.) + +## Current Development Focus Areas + +1. **AgentFlow v2**: Enhanced visual agent orchestration +2. **Multi-agent systems**: Coordination and communication patterns +3. **Real-time streaming**: Enhanced user feedback and monitoring +4. **MCP integrations**: Model Context Protocol tool ecosystem +5. 
**Enterprise features**: Workspaces, analytics, usage tracking + +## Troubleshooting Common Issues + +### Streaming Not Working +- Check if `sseStreamer` is properly extracted from options +- Verify `chatId` is available in context +- Ensure streaming is enabled in flow configuration + +### Tool Execution Failures +- Validate input schemas with Zod +- Check credential configuration and permissions +- Review tool return value format (should be string) + +### Node Not Appearing in UI +- Verify node is exported in `/components/src/index.ts` +- Check category mapping in buildChatflow.ts +- Ensure node follows `INode` interface correctly \ No newline at end of file diff --git a/docs/api-testing/.gitignore b/docs/api-testing/.gitignore new file mode 100644 index 00000000000..4a7c3e7d25d --- /dev/null +++ b/docs/api-testing/.gitignore @@ -0,0 +1,22 @@ +# Dependencies +node_modules/ + +# Build outputs +dist/ +*.tsbuildinfo + +# Logs +*.log +npm-debug.log* + +# Runtime files +.env +.env.local + +# IDE files +.vscode/ +.idea/ + +# OS files +.DS_Store +Thumbs.db \ No newline at end of file diff --git a/docs/api-testing/README.md b/docs/api-testing/README.md new file mode 100644 index 00000000000..788651c8456 --- /dev/null +++ b/docs/api-testing/README.md @@ -0,0 +1,138 @@ +# Flowise API Testing Documentation + +This folder contains testing scripts and documentation for Flowise API endpoints. + +## Available Tests + +### 1. Session Integration Test (Recommended) + +Complete end-to-end test that validates the full session workflow: +1. Sends predictions with custom session ID +2. Retrieves all messages for that session +3. Asserts messages exist and are correctly associated + +**File**: `session-integration-test.ts` + +```bash +npm run test:session-integration +``` + +**What it tests**: +- Custom session ID handling via `overrideConfig` +- Message persistence in database +- Session-based message retrieval +- Message ordering and timestamps +- Session ID consistency across requests + +### 2. Chat Message Retrieval by Session + +Test retrieving all messages for a specific session using the existing chat message endpoint. + +**Endpoint**: `GET /api/v1/chatmessage/{chatflowId}?sessionId={sessionId}` + +**Features**: +- Retrieve all messages for a specific session +- Filter by chat type, memory type, date range +- Includes parsed response data (sourceDocuments, agentReasoning, etc.) +- Supports pagination + +#### Basic Usage + +```typescript +// Get all messages for a session +const messages = await getSessionMessages(CHATFLOW_ID, SESSION_ID); + +// With additional filters +const messages = await getSessionMessages(CHATFLOW_ID, SESSION_ID, { + chatType: ['EXTERNAL'], + memoryType: 'BufferMemory', + startDate: '2025-01-01', + endDate: '2025-12-31' +}); +``` + +#### Response Format + +```json +[ + { + "id": "message-uuid", + "role": "userMessage", + "chatflowid": "chatflow-uuid", + "content": "Hello, how are you?", + "chatId": "chat-uuid", + "sessionId": "session-uuid", + "memoryType": "BufferMemory", + "createdDate": "2025-08-28T14:30:45.123Z", + "sourceDocuments": [...], + "agentReasoning": [...], + "usedTools": [...], + "fileUploads": [...], + "artifacts": [...] + }, + { + "id": "message-uuid-2", + "role": "apiMessage", + "content": "I'm doing well, thank you!", + // ... other fields + } +] +``` + +### 2. Prediction API Testing + +Test the prediction API with both streaming and non-streaming modes. 
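+
+A minimal non-streaming call, assuming the `FLOWISE_BASE_URL`, `CHATFLOW_ID`, and `getHeaders()` exports from `config.ts`, looks roughly like this (see `prediction-test.ts` for the full streaming variant):
+
+```typescript
+import { FLOWISE_BASE_URL, CHATFLOW_ID, getHeaders, PredictionResponse } from './config';
+
+// Send a single question and return the parsed JSON response
+async function quickPrediction(question: string): Promise<PredictionResponse> {
+    const response = await fetch(`${FLOWISE_BASE_URL}/api/v1/prediction/${CHATFLOW_ID}`, {
+        method: 'POST',
+        headers: getHeaders(),
+        body: JSON.stringify({ question, streaming: false })
+    });
+    if (!response.ok) {
+        throw new Error(`Prediction failed: ${response.status} ${response.statusText}`);
+    }
+    return response.json(); // includes text, chatId, sessionId, timestamp, ...
+}
+```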
+ +**Features**: +- Non-streaming predictions +- Streaming predictions with real-time token output +- Error handling and response parsing +- Timestamp validation (new feature) + +## Test Files + +- `chat-message-test.ts` - Session message retrieval tests +- `prediction-test.ts` - Prediction API tests (streaming & non-streaming) +- `config.ts` - Shared configuration and utilities + +## Setup + +1. Update configuration in `config.ts`: + ```typescript + export const FLOWISE_BASE_URL = 'https://your-flowise-instance.com'; + export const CHATFLOW_ID = 'your-chatflow-id'; + export const API_KEY = 'your-api-key'; // Optional + ``` + +2. Install dependencies: + ```bash + npm install node-fetch @types/node typescript ts-node + ``` + +3. Run tests: + ```bash + # Session message tests + npx ts-node chat-message-test.ts + + # Prediction API tests + npx ts-node prediction-test.ts + ``` + +## Authentication + +If your Flowise instance requires authentication, set the `API_KEY` in config.ts. The test scripts will automatically include it in the Authorization header. + +## Error Handling + +All test scripts include comprehensive error handling: +- Network errors +- HTTP error responses +- JSON parsing errors +- Streaming connection issues + +## Notes + +- The session message endpoint requires a valid `chatflowId` even when filtering by `sessionId` +- Streaming responses use Server-Sent Events (SSE) format +- All timestamps are in ISO 8601 format (UTC) +- Response data includes parsed JSON fields (sourceDocuments, agentReasoning, etc.) \ No newline at end of file diff --git a/docs/api-testing/chat-message-test.ts b/docs/api-testing/chat-message-test.ts new file mode 100644 index 00000000000..bda348c22e3 --- /dev/null +++ b/docs/api-testing/chat-message-test.ts @@ -0,0 +1,181 @@ +import { FLOWISE_BASE_URL, CHATFLOW_ID, TEST_SESSION_ID, getHeaders, handleApiError, ChatMessageFilter, ChatMessage } from './config'; + +/** + * Get all messages for a specific session + * Uses the existing /chatmessage/{chatflowId} endpoint with sessionId filter + */ +async function getSessionMessages( + chatflowId: string, + sessionId: string, + filters?: ChatMessageFilter +): Promise { + const queryParams = new URLSearchParams(); + + // Required session filter + queryParams.append('sessionId', sessionId); + + // Optional filters + if (filters?.chatType) { + queryParams.append('chatType', JSON.stringify(filters.chatType)); + } + if (filters?.memoryType) { + queryParams.append('memoryType', filters.memoryType); + } + if (filters?.startDate) { + queryParams.append('startDate', filters.startDate); + } + if (filters?.endDate) { + queryParams.append('endDate', filters.endDate); + } + if (filters?.messageId) { + queryParams.append('messageId', filters.messageId); + } + if (filters?.feedback !== undefined) { + queryParams.append('feedback', filters.feedback.toString()); + } + if (filters?.page) { + queryParams.append('page', filters.page.toString()); + } + if (filters?.limit) { + queryParams.append('limit', filters.limit.toString()); + } + + const url = `${FLOWISE_BASE_URL}/api/v1/chatmessage/${chatflowId}?${queryParams.toString()}`; + + try { + console.log('🔍 Fetching session messages...'); + console.log('URL:', url); + console.log('Session ID:', sessionId); + + const response = await fetch(url, { + method: 'GET', + headers: getHeaders(), + }); + + const responseText = await response.text(); + + if (!response.ok) { + handleApiError(response, responseText); + } + + const messages: ChatMessage[] = JSON.parse(responseText); + + 
console.log('✅ Successfully retrieved messages'); + console.log('📊 Total messages:', messages.length); + + if (messages.length > 0) { + console.log('📝 Message summary:'); + messages.forEach((msg, index) => { + console.log(` ${index + 1}. [${msg.role}] ${msg.content.substring(0, 100)}${msg.content.length > 100 ? '...' : ''}`); + console.log(` Created: ${msg.createdDate}`); + console.log(` Chat ID: ${msg.chatId}`); + if (msg.sourceDocuments?.length) { + console.log(` 📄 Source Documents: ${msg.sourceDocuments.length}`); + } + if (msg.usedTools?.length) { + console.log(` 🔧 Used Tools: ${msg.usedTools.length}`); + } + console.log(''); + }); + } else { + console.log('ℹ️ No messages found for this session'); + } + + return messages; + + } catch (error) { + console.error('❌ Failed to retrieve session messages:', error); + throw error; + } +} + +/** + * Test different filtering scenarios + */ +async function runTests() { + console.log('🧪 Starting Chat Message API Tests\n'); + + try { + // Test 1: Get all messages for session + console.log('=== Test 1: All messages for session ==='); + const allMessages = await getSessionMessages(CHATFLOW_ID, TEST_SESSION_ID); + + // Test 2: Filter by chat type + console.log('\n=== Test 2: External messages only ==='); + const externalMessages = await getSessionMessages(CHATFLOW_ID, TEST_SESSION_ID, { + chatType: ['EXTERNAL'] + }); + + // Test 3: Recent messages with pagination + console.log('\n=== Test 3: Recent messages (last 5) ==='); + const recentMessages = await getSessionMessages(CHATFLOW_ID, TEST_SESSION_ID, { + limit: 5, + page: 1 + }); + + // Test 4: Messages from specific date range + console.log('\n=== Test 4: Messages from today ==='); + const today = new Date().toISOString().split('T')[0]; + const todayMessages = await getSessionMessages(CHATFLOW_ID, TEST_SESSION_ID, { + startDate: today, + endDate: today + }); + + console.log('\n🎉 All tests completed successfully!'); + + // Summary + console.log('\n📋 Test Summary:'); + console.log(`- Total messages: ${allMessages.length}`); + console.log(`- External messages: ${externalMessages.length}`); + console.log(`- Recent messages: ${recentMessages.length}`); + console.log(`- Today's messages: ${todayMessages.length}`); + + } catch (error) { + console.error('❌ Tests failed:', error); + } +} + +/** + * Test with a real session ID from a recent conversation + * You can get this from your browser's network tab or from a prediction response + */ +async function testWithRealSession() { + // Example: Use a session ID from a real conversation + const realSessionId = 'your-real-session-id-here'; + + console.log('\n🔍 Testing with real session ID...'); + console.log('Session ID:', realSessionId); + + try { + const messages = await getSessionMessages(CHATFLOW_ID, realSessionId); + + if (messages.length === 0) { + console.log('ℹ️ No messages found. 
Try with a session ID from an actual conversation.'); + console.log('💡 Tip: Check browser network tab during a chat to find a real sessionId'); + } + + } catch (error) { + console.error('❌ Real session test failed:', error); + } +} + +// Run the tests +if (require.main === module) { + console.log('🚀 Flowise Chat Message API Test Suite'); + console.log('=====================================\n'); + + runTests() + .then(() => { + return testWithRealSession(); + }) + .then(() => { + console.log('\n✅ All tests completed'); + }) + .catch((error) => { + console.error('\n❌ Test suite failed:', error.message); + process.exit(1); + }); +} + +// Export for use in other files +export { getSessionMessages }; \ No newline at end of file diff --git a/docs/api-testing/config.ts b/docs/api-testing/config.ts new file mode 100644 index 00000000000..0c736ce7c1b --- /dev/null +++ b/docs/api-testing/config.ts @@ -0,0 +1,74 @@ +// Flowise API Configuration +export const FLOWISE_BASE_URL = 'https://robot.afaqy.sa:12099'; +export const CHATFLOW_ID = '6571ac41-d4df-45e9-9c4e-bdee599aabf2'; +export const API_KEY = '5n8Rap_rEWQLmcUVlxzQJx5W8C4pto31Bepea-Mt1aU'; + +// Test data +export const TEST_SESSION_ID = 'test-session-123'; +export const TEST_CHAT_ID = 'test-chat-456'; + +// API Headers helper +export function getHeaders(): Record { + const headers: Record = { + 'Content-Type': 'application/json', + }; + + if (API_KEY) { + headers['Authorization'] = `Bearer ${API_KEY}`; + } + + return headers; +} + +// Common error handler +export function handleApiError(response: Response, body: string) { + console.error('API Error Response:', { + status: response.status, + statusText: response.statusText, + body: body, + }); + throw new Error(`Flowise API error: ${response.status} - ${response.statusText}`); +} + +// Types +export interface ChatMessageFilter { + chatType?: string[]; + memoryType?: string; + startDate?: string; + endDate?: string; + messageId?: string; + feedback?: boolean; + page?: number; + limit?: number; +} + +export interface ChatMessage { + id: string; + role: 'userMessage' | 'apiMessage'; + chatflowid: string; + content: string; + chatId: string; + sessionId?: string; + memoryType?: string; + createdDate: string; + sourceDocuments?: any[]; + agentReasoning?: any[]; + usedTools?: any[]; + fileUploads?: any[]; + artifacts?: any[]; + action?: any; + fileAnnotations?: any[]; + followUpPrompts?: string; +} + +export interface PredictionResponse { + text: string; + question: string; + chatId: string; + chatMessageId: string; + sessionId?: string; + memoryType?: string; + timestamp: string; // New timestamp field + isStreamValid: boolean; + followUpPrompts?: string; +} \ No newline at end of file diff --git a/docs/api-testing/package.json b/docs/api-testing/package.json new file mode 100644 index 00000000000..441a6985669 --- /dev/null +++ b/docs/api-testing/package.json @@ -0,0 +1,34 @@ +{ + "name": "flowise-api-testing", + "version": "1.0.0", + "description": "Test scripts for Flowise API endpoints", + "main": "index.js", + "scripts": { + "test:chat-messages": "npx ts-node chat-message-test.ts", + "test:predictions": "npx ts-node prediction-test.ts", + "test:session-integration": "npx ts-node session-integration-test.ts", + "test:all": "npm run test:predictions && npm run test:chat-messages && npm run test:session-integration", + "build": "tsc", + "clean": "rm -rf dist" + }, + "dependencies": { + "node-fetch": "^3.3.2" + }, + "devDependencies": { + "@types/node": "^20.11.0", + "typescript": "^5.3.3", + 
"ts-node": "^10.9.2" + }, + "engines": { + "node": ">=16" + }, + "keywords": [ + "flowise", + "api", + "testing", + "chatbot", + "langchain" + ], + "author": "Flowise Team", + "license": "MIT" +} \ No newline at end of file diff --git a/docs/api-testing/prediction-test.ts b/docs/api-testing/prediction-test.ts new file mode 100644 index 00000000000..4ce84a1e020 --- /dev/null +++ b/docs/api-testing/prediction-test.ts @@ -0,0 +1,254 @@ +import { FLOWISE_BASE_URL, CHATFLOW_ID, getHeaders, handleApiError, PredictionResponse } from './config'; + +/** + * Test Flowise prediction API with streaming and non-streaming modes + * Includes timestamp validation for the new timestamp feature + */ +async function testFlowisePrediction( + message: string, + stream: boolean = false, + chatId?: string +): Promise { + const requestBody: any = { + question: message, + }; + + if (stream) { + requestBody.streaming = true; + } + + if (chatId) { + requestBody.chatId = chatId; + } + + try { + console.log(`🚀 Testing ${stream ? 'streaming' : 'non-streaming'} prediction...`); + console.log('📝 Message:', message); + console.log('💬 Chat ID:', chatId || 'auto-generated'); + + const response = await fetch(`${FLOWISE_BASE_URL}/api/v1/prediction/${CHATFLOW_ID}`, { + method: 'POST', + headers: getHeaders(), + body: JSON.stringify(requestBody), + }); + + if (!response.ok) { + const errorText = await response.text(); + handleApiError(response, errorText); + } + + if (stream) { + return await handleStreamingResponse(response); + } else { + return await handleNonStreamingResponse(response); + } + } catch (error) { + console.error('❌ Flowise prediction error:', error); + throw error; + } +} + +/** + * Handle streaming response (Server-Sent Events) + */ +async function handleStreamingResponse(response: Response): Promise { + if (!response.body) { + throw new Error('No response body for streaming'); + } + + const reader = response.body.getReader(); + const decoder = new TextDecoder(); + let result = ''; + let metadata: any = null; + + console.log('📡 Streaming response:'); + console.log('---'); + + try { + // eslint-disable-next-line no-constant-condition + while (true) { + const { done, value } = await reader.read(); + + if (done) break; + + const chunk = decoder.decode(value, { stream: true }); + const lines = chunk.split('\n'); + + for (const line of lines) { + if (line.startsWith('data:')) { + const data = line.slice(5).trim(); + + try { + const parsed = JSON.parse(data); + + if (parsed.event === 'token') { + process.stdout.write(parsed.data); + result += parsed.data; + } else if (parsed.event === 'start') { + console.log('\n🎬 Stream started'); + } else if (parsed.event === 'metadata') { + metadata = parsed.data; + console.log('\n📊 Metadata received:'); + console.log(' Chat ID:', metadata.chatId); + console.log(' Session ID:', metadata.sessionId); + console.log(' Memory Type:', metadata.memoryType); + console.log(' Message ID:', metadata.chatMessageId); + } else if (parsed.event === 'end' && parsed.data === '[DONE]') { + console.log('\n\n✅ Streaming completed'); + break; + } + } catch (e) { + // Skip non-JSON lines + } + } + } + } + } finally { + reader.releaseLock(); + } + + console.log('---'); + console.log('📄 Final result length:', result.length, 'characters'); + + return result; +} + +/** + * Handle non-streaming response + */ +async function handleNonStreamingResponse(response: Response): Promise { + const data: PredictionResponse = await response.json(); + + console.log('📦 Non-streaming response received:'); + 
console.log(' Text length:', data.text?.length || 0, 'characters'); + console.log(' Chat ID:', data.chatId); + console.log(' Session ID:', data.sessionId); + console.log(' Memory Type:', data.memoryType); + console.log(' Message ID:', data.chatMessageId); + + // Validate timestamp (new feature) + if (data.timestamp) { + console.log(' ⏰ Timestamp:', data.timestamp); + console.log(' 📅 Timestamp valid:', isValidISOTimestamp(data.timestamp) ? '✅' : '❌'); + } else { + console.log(' ⚠️ Missing timestamp field'); + } + + console.log(' Stream valid:', data.isStreamValid); + + return data; +} + +/** + * Validate ISO timestamp format + */ +function isValidISOTimestamp(timestamp: string): boolean { + const isoRegex = /^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z$/; + return isoRegex.test(timestamp) && !isNaN(new Date(timestamp).getTime()); +} + +/** + * Run comprehensive prediction tests + */ +async function runPredictionTests() { + console.log('🧪 Starting Prediction API Tests\n'); + + const testChatId = `test-${Date.now()}`; + + try { + // Test 1: Non-streaming prediction + console.log('=== Test 1: Non-streaming prediction ==='); + const nonStreamResult = await testFlowisePrediction( + 'Hello, how are you?', + false, + testChatId + ); + + // Test 2: Streaming prediction + console.log('\n=== Test 2: Streaming prediction ==='); + const streamResult = await testFlowisePrediction( + 'Tell me a short joke', + true, + testChatId + ); + + // Test 3: Follow-up message (same session) + console.log('\n=== Test 3: Follow-up message ==='); + const followupResult = await testFlowisePrediction( + 'Can you explain that joke?', + false, + testChatId + ); + + console.log('\n🎉 All prediction tests completed successfully!'); + + // Summary + console.log('\n📋 Test Summary:'); + console.log('- Non-streaming response timestamp:', nonStreamResult.timestamp ? '✅' : '❌'); + console.log('- Streaming completed:', streamResult ? '✅' : '❌'); + console.log('- Follow-up context maintained:', followupResult.chatId === testChatId ? '✅' : '❌'); + console.log('- Session ID consistent:', + (nonStreamResult.sessionId === followupResult.sessionId) ? '✅' : '❌'); + + return { + nonStreamResult, + streamResult, + followupResult, + testChatId + }; + + } catch (error) { + console.error('❌ Prediction tests failed:', error); + throw error; + } +} + +/** + * Test error handling scenarios + */ +async function testErrorHandling() { + console.log('\n=== Testing Error Handling ==='); + + try { + // Test with invalid chatflow ID + console.log('🔍 Testing invalid chatflow ID...'); + await fetch(`${FLOWISE_BASE_URL}/api/v1/prediction/invalid-id`, { + method: 'POST', + headers: getHeaders(), + body: JSON.stringify({ question: 'test' }), + }); + } catch (error) { + console.log('✅ Invalid chatflow ID properly handled'); + } + + try { + // Test with empty question + console.log('🔍 Testing empty question...'); + const response = await testFlowisePrediction('', false); + console.log('✅ Empty question handled, response:', response.text?.substring(0, 50)); + } catch (error) { + console.log('⚠️ Empty question caused error:', error.message); + } +} + +// Run the tests +if (require.main === module) { + console.log('🚀 Flowise Prediction API Test Suite'); + console.log('====================================\n'); + + runPredictionTests() + .then((results) => { + console.log('\n📝 Test completed. 
Chat ID for session message testing:', results.testChatId); + return testErrorHandling(); + }) + .then(() => { + console.log('\n✅ All tests completed successfully'); + }) + .catch((error) => { + console.error('\n❌ Test suite failed:', error.message); + process.exit(1); + }); +} + +// Export for use in other files +export { testFlowisePrediction }; \ No newline at end of file diff --git a/docs/api-testing/session-integration-test.ts b/docs/api-testing/session-integration-test.ts new file mode 100644 index 00000000000..a07c18beca7 --- /dev/null +++ b/docs/api-testing/session-integration-test.ts @@ -0,0 +1,287 @@ +import { FLOWISE_BASE_URL, CHATFLOW_ID, getHeaders, handleApiError, PredictionResponse, ChatMessage } from './config'; + +/** + * Integration test for session-based message flow + * Tests the complete flow: prediction -> session messages -> verification + */ + +interface TestSession { + sessionId: string; + messages: Array<{ + question: string; + expectedInResponse: string; + }>; +} + +/** + * Send a prediction request with custom session ID + */ +async function sendPredictionWithSession( + question: string, + sessionId: string +): Promise { + const requestBody = { + question, + overrideConfig: { + sessionId + }, + streaming: false + }; + + try { + console.log(`📤 Sending prediction with custom session...`); + console.log(` Question: "${question}"`); + console.log(` Session ID: ${sessionId}`); + + const response = await fetch(`${FLOWISE_BASE_URL}/api/v1/prediction/${CHATFLOW_ID}`, { + method: 'POST', + headers: getHeaders(), + body: JSON.stringify(requestBody), + }); + + const responseText = await response.text(); + + if (!response.ok) { + handleApiError(response, responseText); + } + + const result: PredictionResponse = JSON.parse(responseText); + + console.log(`✅ Prediction successful`); + console.log(` Response length: ${result.text?.length || 0} chars`); + console.log(` Chat ID: ${result.chatId}`); + console.log(` Session ID: ${result.sessionId}`); + console.log(` Timestamp: ${result.timestamp}`); + console.log(` Message ID: ${result.chatMessageId}`); + + return result; + + } catch (error) { + console.error('❌ Prediction failed:', error); + throw error; + } +} + +/** + * Retrieve all messages for a session + */ +async function getSessionMessages(sessionId: string): Promise { + const queryParams = new URLSearchParams(); + queryParams.append('sessionId', sessionId); + + const url = `${FLOWISE_BASE_URL}/api/v1/chatmessage/${CHATFLOW_ID}?${queryParams.toString()}`; + + try { + console.log(`🔍 Retrieving session messages...`); + console.log(` Session ID: ${sessionId}`); + console.log(` URL: ${url}`); + + const response = await fetch(url, { + method: 'GET', + headers: getHeaders(), + }); + + const responseText = await response.text(); + + if (!response.ok) { + handleApiError(response, responseText); + } + + const messages: ChatMessage[] = JSON.parse(responseText); + + console.log(`✅ Messages retrieved successfully`); + console.log(` Total messages: ${messages.length}`); + + return messages; + + } catch (error) { + console.error('❌ Failed to retrieve messages:', error); + throw error; + } +} + +/** + * Assert that expected messages exist in the session + */ +function assertMessagesExist( + messages: ChatMessage[], + expectedQuestions: string[], + sessionId: string +): void { + console.log(`🔍 Validating messages...`); + + // Check total message count (user + assistant messages) + const expectedMinMessages = expectedQuestions.length * 2; // Each question gets user + assistant response + if 
(messages.length < expectedMinMessages) { + throw new Error(`Expected at least ${expectedMinMessages} messages, got ${messages.length}`); + } + console.log(`✅ Message count valid: ${messages.length} messages`); + + // Check session ID consistency + const wrongSessionMessages = messages.filter(msg => + msg.sessionId && msg.sessionId !== sessionId + ); + if (wrongSessionMessages.length > 0) { + throw new Error(`Found ${wrongSessionMessages.length} messages with wrong session ID`); + } + console.log(`✅ All messages have correct session ID`); + + // Check that each expected question exists + const userMessages = messages.filter(msg => msg.role === 'userMessage'); + const assistantMessages = messages.filter(msg => msg.role === 'apiMessage'); + + console.log(` User messages: ${userMessages.length}`); + console.log(` Assistant messages: ${assistantMessages.length}`); + + for (const expectedQuestion of expectedQuestions) { + const foundMessage = userMessages.find(msg => + msg.content.trim().toLowerCase() === expectedQuestion.trim().toLowerCase() + ); + if (!foundMessage) { + throw new Error(`Question not found in messages: "${expectedQuestion}"`); + } + console.log(` ✅ Found question: "${expectedQuestion}"`); + } + + // Check message timestamps are recent (within last 5 minutes) + const fiveMinutesAgo = new Date(Date.now() - 5 * 60 * 1000); + const recentMessages = messages.filter(msg => + new Date(msg.createdDate) > fiveMinutesAgo + ); + + if (recentMessages.length !== messages.length) { + console.log(`⚠️ Warning: Some messages are older than 5 minutes`); + console.log(` Recent: ${recentMessages.length}, Total: ${messages.length}`); + } else { + console.log(`✅ All messages are recent (within 5 minutes)`); + } + + console.log(`🎉 All assertions passed!`); +} + +/** + * Main integration test + */ +async function runSessionIntegrationTest(): Promise { + const testSessionId = `integration-test-${Date.now()}`; + const testQuestions = [ + "Hello, this is a test message for session integration", + "Can you remember my first message? This is the second message." + ]; + + console.log('🚀 Starting Session Integration Test'); + console.log('=====================================\n'); + console.log(`Test Session ID: ${testSessionId}`); + console.log(`Test Questions: ${testQuestions.length} messages\n`); + + try { + const predictions: PredictionResponse[] = []; + + // Step 1: Send multiple messages with same session ID + console.log('=== Step 1: Sending Predictions ==='); + for (let i = 0; i < testQuestions.length; i++) { + const question = testQuestions[i]; + if (!question) { + throw new Error(`Test question ${i} is undefined`); + } + console.log(`\nMessage ${i + 1}/${testQuestions.length}:`); + + const prediction = await sendPredictionWithSession(question, testSessionId); + predictions.push(prediction); + + // Verify session ID in response + if (prediction.sessionId !== testSessionId) { + throw new Error(`Session ID mismatch! 
Expected: ${testSessionId}, Got: ${prediction.sessionId || 'undefined'}`); + } + + // Small delay between messages + await new Promise(resolve => setTimeout(resolve, 1000)); + } + + // Step 2: Wait a moment for messages to be stored + console.log('\n=== Step 2: Waiting for Message Storage ==='); + console.log('⏳ Waiting 2 seconds for database storage...'); + await new Promise(resolve => setTimeout(resolve, 2000)); + + // Step 3: Retrieve session messages + console.log('\n=== Step 3: Retrieving Session Messages ==='); + const sessionMessages = await getSessionMessages(testSessionId); + + // Step 4: Assert messages exist + console.log('\n=== Step 4: Validating Messages ==='); + assertMessagesExist(sessionMessages, testQuestions, testSessionId); + + // Step 5: Display detailed results + console.log('\n=== Step 5: Test Results ==='); + console.log('📋 Detailed Message Analysis:'); + sessionMessages + .sort((a, b) => new Date(a.createdDate).getTime() - new Date(b.createdDate).getTime()) + .forEach((msg, index) => { + console.log(` ${index + 1}. [${msg.role}] ${msg.content.substring(0, 80)}${msg.content.length > 80 ? '...' : ''}`); + console.log(` 📅 ${msg.createdDate}`); + console.log(` 💬 Chat ID: ${msg.chatId}`); + console.log(` 🆔 Session ID: ${msg.sessionId}`); + console.log(` 🔗 Message ID: ${msg.id}`); + console.log(''); + }); + + // Summary + console.log('🎉 INTEGRATION TEST PASSED! 🎉'); + console.log('\n📊 Test Summary:'); + console.log(`✅ Sent ${predictions.length} predictions successfully`); + console.log(`✅ Retrieved ${sessionMessages.length} messages from session`); + console.log(`✅ All messages have correct session ID: ${testSessionId}`); + console.log(`✅ All expected questions found in message history`); + console.log(`✅ Message timestamps are valid`); + + return; + + } catch (error) { + console.error('\n❌ INTEGRATION TEST FAILED! ❌'); + console.error('Error:', error instanceof Error ? 
error.message : String(error)); + throw error; + } +} + +/** + * Clean up test data (optional) + */ +async function cleanupTestSession(sessionId: string): Promise { + console.log(`\n🧹 Cleaning up test session: ${sessionId}`); + + try { + const response = await fetch(`${FLOWISE_BASE_URL}/api/v1/chatmessage/${CHATFLOW_ID}?sessionId=${sessionId}`, { + method: 'DELETE', + headers: getHeaders(), + }); + + if (response.ok) { + console.log('✅ Test session cleaned up successfully'); + } else { + console.log('⚠️ Cleanup warning: Could not delete test messages (may require manual cleanup)'); + } + } catch (error) { + console.log('⚠️ Cleanup warning: Failed to cleanup test messages'); + } +} + +// Run the test if this file is executed directly +if (require.main === module) { + runSessionIntegrationTest() + .then(() => { + console.log('\n✅ Test completed successfully'); + process.exit(0); + }) + .catch((error) => { + console.error('\n❌ Test failed:', error.message); + process.exit(1); + }); +} + +// Export for use in other test files +export { + runSessionIntegrationTest, + sendPredictionWithSession, + getSessionMessages, + assertMessagesExist +}; \ No newline at end of file diff --git a/docs/api-testing/tsconfig.json b/docs/api-testing/tsconfig.json new file mode 100644 index 00000000000..ea3c910b032 --- /dev/null +++ b/docs/api-testing/tsconfig.json @@ -0,0 +1,29 @@ +{ + "compilerOptions": { + "target": "ES2020", + "module": "commonjs", + "lib": ["ES2020"], + "outDir": "./dist", + "rootDir": "./", + "strict": true, + "esModuleInterop": true, + "skipLibCheck": true, + "forceConsistentCasingInFileNames": true, + "resolveJsonModule": true, + "declaration": true, + "declarationMap": true, + "sourceMap": true, + "removeComments": false, + "noImplicitAny": true, + "noImplicitReturns": true, + "noFallthroughCasesInSwitch": true, + "noUncheckedIndexedAccess": true + }, + "include": [ + "*.ts" + ], + "exclude": [ + "node_modules", + "dist" + ] +} \ No newline at end of file diff --git a/packages/components/nodes/agentflow/Agent/Agent.ts b/packages/components/nodes/agentflow/Agent/Agent.ts index 9ee90a4770d..b02bf981751 100644 --- a/packages/components/nodes/agentflow/Agent/Agent.ts +++ b/packages/components/nodes/agentflow/Agent/Agent.ts @@ -1635,14 +1635,6 @@ class Agent_Agentflow implements INode { let isToolRequireHumanInput = (selectedTool as any).requiresHumanInput && (!iterationContext || Object.keys(iterationContext).length === 0) - const flowConfig = { - chatflowId: options.chatflowid, - sessionId: options.sessionId, - chatId: options.chatId, - input: input, - state: options.agentflowRuntime?.state - } - if (isToolRequireHumanInput) { const toolCallDetails = '```json\n' + JSON.stringify(toolCall, null, 2) + '\n```' const responseContent = response.content + `\nAttempting to use tool:\n${toolCallDetails}` @@ -1658,7 +1650,17 @@ class Agent_Agentflow implements INode { try { //@ts-ignore - let toolOutput = await selectedTool.call(toolCall.args, { signal: abortController?.signal }, undefined, flowConfig) + let toolOutput = await selectedTool.invoke(toolCall.args, { + signal: abortController?.signal, + configurable: { + sessionId: options.sessionId, + chatId: chatId, + input: input, + state: options.agentflowRuntime?.state, + sseStreamer: sseStreamer, + flowise_chatId: chatId + } + }) if (options.analyticHandlers && toolIds) { await options.analyticHandlers.onToolEnd(toolIds, toolOutput) @@ -1903,14 +1905,6 @@ class Agent_Agentflow implements INode { let parsedDocs let parsedArtifacts - const flowConfig 
= { - chatflowId: options.chatflowid, - sessionId: options.sessionId, - chatId: options.chatId, - input: input, - state: options.agentflowRuntime?.state - } - if (humanInput.type === 'reject') { messages.pop() const toBeRemovedTool = toolsInstance.find((tool) => tool.name === toolCall.name) @@ -1930,7 +1924,17 @@ class Agent_Agentflow implements INode { try { //@ts-ignore - let toolOutput = await selectedTool.call(toolCall.args, { signal: abortController?.signal }, undefined, flowConfig) + let toolOutput = await selectedTool.invoke(toolCall.args, { + signal: abortController?.signal, + configurable: { + sessionId: options.sessionId, + chatId: chatId, + input: input, + state: options.agentflowRuntime?.state, + sseStreamer: sseStreamer, + flowise_chatId: chatId + } + }) if (options.analyticHandlers && toolIds) { await options.analyticHandlers.onToolEnd(toolIds, toolOutput) diff --git a/packages/components/nodes/agents/ToolAgent/ToolAgent.ts b/packages/components/nodes/agents/ToolAgent/ToolAgent.ts index 5244f76e5ac..77c408ce975 100644 --- a/packages/components/nodes/agents/ToolAgent/ToolAgent.ts +++ b/packages/components/nodes/agents/ToolAgent/ToolAgent.ts @@ -116,7 +116,12 @@ class ToolAgent_Agents implements INode { } async init(nodeData: INodeData, input: string, options: ICommonObject): Promise { - return prepareAgent(nodeData, options, { sessionId: this.sessionId, chatId: options.chatId, input }) + return prepareAgent(nodeData, options, { + sessionId: this.sessionId, + chatId: options.chatId, + input, + sseStreamer: options.sseStreamer + }) } async run(nodeData: INodeData, input: string, options: ICommonObject): Promise { @@ -141,7 +146,12 @@ class ToolAgent_Agents implements INode { } } - const executor = await prepareAgent(nodeData, options, { sessionId: this.sessionId, chatId: options.chatId, input }) + const executor = await prepareAgent(nodeData, options, { + sessionId: this.sessionId, + chatId: options.chatId, + input, + sseStreamer: options.sseStreamer + }) const loggerHandler = new ConsoleCallbackHandler(options.logger, options?.orgId) const callbacks = await additionalCallbacks(nodeData, options) @@ -268,7 +278,7 @@ class ToolAgent_Agents implements INode { const prepareAgent = async ( nodeData: INodeData, options: ICommonObject, - flowObj: { sessionId?: string; chatId?: string; input?: string } + flowObj: { sessionId?: string; chatId?: string; input?: string; sseStreamer?: any } ) => { const model = nodeData.inputs?.model as BaseChatModel const maxIterations = nodeData.inputs?.maxIterations as string @@ -370,6 +380,7 @@ const prepareAgent = async ( sessionId: flowObj?.sessionId, chatId: flowObj?.chatId, input: flowObj?.input, + sseStreamer: flowObj?.sseStreamer, verbose: process.env.DEBUG === 'true' ? true : false, maxIterations: maxIterations ? parseFloat(maxIterations) : undefined }) diff --git a/packages/components/nodes/retrievers/RRFRetriever/RRFRetriever.ts b/packages/components/nodes/retrievers/RRFRetriever/RRFRetriever.ts index 15ff7e56c01..c51100147ad 100644 --- a/packages/components/nodes/retrievers/RRFRetriever/RRFRetriever.ts +++ b/packages/components/nodes/retrievers/RRFRetriever/RRFRetriever.ts @@ -77,6 +77,26 @@ class RRFRetriever_Retrievers implements INode { default: 60, additionalParams: true, optional: true + }, + { + label: 'System Message', + name: 'systemMessage', + description: 'System message for query generation. 
Leave empty to use default.', + type: 'string', + rows: 2, + placeholder: 'You are a helpful assistant that generates multiple search queries based on a single input query.', + additionalParams: true, + optional: true + }, + { + label: 'Query Generation Prompt', + name: 'queryPrompt', + description: 'Prompt template for generating query variations. Use {input} to refer to the original query.', + type: 'string', + rows: 4, + placeholder: 'Generate multiple search queries related to: {input}. Provide these alternative questions separated by newlines, do not add any numbers.', + additionalParams: true, + optional: true } ] this.outputs = [ @@ -110,9 +130,11 @@ class RRFRetriever_Retrievers implements INode { const k = topK ? parseFloat(topK) : (baseRetriever as VectorStoreRetriever).k ?? 4 const constantC = nodeData.inputs?.c as string const c = topK ? parseFloat(constantC) : 60 + const systemMessage = nodeData.inputs?.systemMessage as string + const queryPrompt = nodeData.inputs?.queryPrompt as string const output = nodeData.outputs?.output as string - const ragFusion = new ReciprocalRankFusion(llm, baseRetriever as VectorStoreRetriever, q, k, c) + const ragFusion = new ReciprocalRankFusion(llm, baseRetriever as VectorStoreRetriever, q, k, c, systemMessage, queryPrompt) const retriever = new ContextualCompressionRetriever({ baseCompressor: ragFusion, baseRetriever: baseRetriever diff --git a/packages/components/nodes/retrievers/RRFRetriever/ReciprocalRankFusion.ts b/packages/components/nodes/retrievers/RRFRetriever/ReciprocalRankFusion.ts index 47ae2d00076..050c6ed2d20 100644 --- a/packages/components/nodes/retrievers/RRFRetriever/ReciprocalRankFusion.ts +++ b/packages/components/nodes/retrievers/RRFRetriever/ReciprocalRankFusion.ts @@ -11,14 +11,26 @@ export class ReciprocalRankFusion extends BaseDocumentCompressor { private readonly queryCount: number private readonly topK: number private readonly c: number + private readonly systemMessage?: string + private readonly queryPrompt?: string private baseRetriever: VectorStoreRetriever - constructor(llm: BaseLanguageModel, baseRetriever: VectorStoreRetriever, queryCount: number, topK: number, c: number) { + constructor( + llm: BaseLanguageModel, + baseRetriever: VectorStoreRetriever, + queryCount: number, + topK: number, + c: number, + systemMessage?: string, + queryPrompt?: string + ) { super() this.queryCount = queryCount this.llm = llm this.baseRetriever = baseRetriever this.topK = topK this.c = c + this.systemMessage = systemMessage + this.queryPrompt = queryPrompt } async compressDocuments( documents: Document>[], @@ -29,13 +41,17 @@ export class ReciprocalRankFusion extends BaseDocumentCompressor { if (documents.length === 0) { return [] } + // Use custom prompts if provided, otherwise use defaults + const defaultSystemMessage = 'You are a helpful assistant that generates multiple search queries based on a single input query.' + const defaultQueryPrompt = + 'Generate multiple search queries related to: {input}. Provide these alternative questions separated by newlines, do not add any numbers.' + + const systemMsg = this.systemMessage || defaultSystemMessage + const queryMsg = this.queryPrompt || defaultQueryPrompt + const chatPrompt = ChatPromptTemplate.fromMessages([ - SystemMessagePromptTemplate.fromTemplate( - 'You are a helpful assistant that generates multiple search queries based on a single input query.' - ), - HumanMessagePromptTemplate.fromTemplate( - 'Generate multiple search queries related to: {input}. 
Provide these alternative questions separated by newlines, do not add any numbers.' - ), + SystemMessagePromptTemplate.fromTemplate(systemMsg), + HumanMessagePromptTemplate.fromTemplate(queryMsg), HumanMessagePromptTemplate.fromTemplate('OUTPUT (' + this.queryCount + ' queries):') ]) const llmChain = new LLMChain({ @@ -50,7 +66,7 @@ export class ReciprocalRankFusion extends BaseDocumentCompressor { }) const docList: Document>[][] = [] for (let i = 0; i < queries.length; i++) { - const resultOne = await this.baseRetriever.vectorStore.similaritySearch(queries[i], 5, this.baseRetriever.filter) + const resultOne = await this.baseRetriever.vectorStore.similaritySearch(queries[i], this.topK, this.baseRetriever.filter) const docs: any[] = [] resultOne.forEach((doc) => { docs.push(doc) diff --git a/packages/components/nodes/tools/MCP/config.ts b/packages/components/nodes/tools/MCP/config.ts new file mode 100644 index 00000000000..567e2404a1f --- /dev/null +++ b/packages/components/nodes/tools/MCP/config.ts @@ -0,0 +1,6 @@ +export const MCP_STREAMING_CONFIG = { + DEFAULT_COMPLETION_TIMEOUT: 600000, // 10 minutes fallback - only as safety net + NOTIFICATION_DELAY: 1000, // 1 second delay before cleanup + SUPPORTED_NOTIFICATION_TYPES: ['logging/message', 'progress'], + STREAMING_MARKER: '[MCP Streaming]' +} diff --git a/packages/components/nodes/tools/MCP/core.ts b/packages/components/nodes/tools/MCP/core.ts index b2c9e63f636..7c502c6b06f 100644 --- a/packages/components/nodes/tools/MCP/core.ts +++ b/packages/components/nodes/tools/MCP/core.ts @@ -1,10 +1,17 @@ -import { CallToolRequest, CallToolResultSchema, ListToolsResult, ListToolsResultSchema } from '@modelcontextprotocol/sdk/types.js' +import { + CallToolRequest, + CallToolResultSchema, + ListToolsResult, + ListToolsResultSchema, + LoggingMessageNotificationSchema +} from '@modelcontextprotocol/sdk/types.js' import { Client } from '@modelcontextprotocol/sdk/client/index.js' import { StdioClientTransport, StdioServerParameters } from '@modelcontextprotocol/sdk/client/stdio.js' import { BaseToolkit, tool, Tool } from '@langchain/core/tools' import { z } from 'zod' import { StreamableHTTPClientTransport } from '@modelcontextprotocol/sdk/client/streamableHttp.js' import { SSEClientTransport } from '@modelcontextprotocol/sdk/client/sse.js' +import { MCP_STREAMING_CONFIG } from './config.js' export class MCPToolkit extends BaseToolkit { tools: Tool[] = [] @@ -20,8 +27,8 @@ export class MCPToolkit extends BaseToolkit { this.transportType = transportType } - // Method to create a new client with transport - async createClient(): Promise { + // Method to create a new client with transport and detect streaming capabilities + async createClient(): Promise<{ client: Client; hasStreaming: boolean }> { const client = new Client( { name: 'flowise-client', @@ -80,12 +87,25 @@ export class MCPToolkit extends BaseToolkit { } } - return client + // Check server capabilities for streaming support + let hasStreaming = false + try { + const capabilities = client.getServerCapabilities() + // Check for streaming capability in experimental or notifications section + hasStreaming = + (capabilities as any)?.notifications?.streaming === true || + (capabilities as any)?.experimental?.notifications?.streaming === true + } catch (error) { + console.error(`⚠️ [MCP Core] Could not detect streaming capabilities, falling back to non-streaming:`, error.message) + } + + return { client, hasStreaming } } async initialize() { if (this._tools === null) { - this.client = await 
this.createClient() + const { client } = await this.createClient() + this.client = client this._tools = await this.client.request({ method: 'tools/list' }, ListToolsResultSchema) @@ -104,11 +124,13 @@ export class MCPToolkit extends BaseToolkit { if (this.client === null) { throw new Error('Client is not initialized') } + return await MCPTool({ toolkit: this, name: tool.name, description: tool.description || '', - argsSchema: createSchemaModel(tool.inputSchema) + argsSchema: createSchemaModel(tool.inputSchema), + annotations: tool.annotations || {} }) }) const res = await Promise.allSettled(toolsPromises) @@ -125,37 +147,311 @@ export async function MCPTool({ toolkit, name, description, - argsSchema + argsSchema, + annotations = {} }: { toolkit: MCPToolkit name: string description: string argsSchema: any + annotations?: any }): Promise { - return tool( - async (input): Promise => { - // Create a new client for this request - const client = await toolkit.createClient() + const { client, hasStreaming } = await toolkit.createClient() + await client.close() - try { - const req: CallToolRequest = { method: 'tools/call', params: { name: name, arguments: input as any } } - const res = await client.request(req, CallToolResultSchema) - const content = res.content - const contentString = JSON.stringify(content) - return contentString - } finally { - // Always close the client after the request completes - await client.close() - } + const toolHasStreaming = annotations.streaming_enabled === true + const shouldUseStreaming = hasStreaming && toolHasStreaming + + return tool( + async (input, config): Promise => { + return await executeMCPTool(toolkit, name, input, config, annotations) }, { name: name, - description: description, + description: shouldUseStreaming ? 
`${description} ${MCP_STREAMING_CONFIG.STREAMING_MARKER}` : description, schema: argsSchema } ) } +async function executeMCPTool(toolkit: MCPToolkit, name: string, input: any, config: any, annotations: any = {}): Promise { + const { chatId, sseStreamer } = extractConfig(config, input) + const { client, hasStreaming } = await toolkit.createClient() + const notifications: string[] = [] + + // Only use streaming if both server and tool support it + const toolHasStreaming = annotations.streaming_enabled === true + const shouldUseStreaming = hasStreaming && toolHasStreaming + + try { + setupStreamingIfSupported(shouldUseStreaming, sseStreamer, chatId, name) + setupNotificationHandlers(client, sseStreamer, chatId, name, shouldUseStreaming, notifications, annotations) + + const toolResponse = await callMCPTool(client, name, input) + + return await handleToolResponse(toolResponse, shouldUseStreaming, sseStreamer, chatId, name, notifications) + } finally { + if (!shouldUseStreaming) { + await client.close() + } + } +} + +function extractConfig(config: any, input?: any): { chatId: string; sseStreamer: any } { + const configChatId = config?.configurable?.flowise_chatId + const configSseStreamer = config?.configurable?.sseStreamer + const inputChatId = input?.flowise_chatId + + return { + chatId: configChatId || inputChatId, + sseStreamer: configSseStreamer + } +} + +function setupStreamingIfSupported(hasStreaming: boolean, sseStreamer: any, chatId: string, name: string): void { + if (hasStreaming && sseStreamer && chatId) { + sseStreamer.addMcpConnection(chatId, name) + } +} + +async function callMCPTool(client: Client, name: string, input: any): Promise { + const progressToken = `${name}_${Date.now()}_${Math.random().toString(36).substr(2, 9)}` + const req: CallToolRequest = { + method: 'tools/call', + params: { + name: name, + arguments: input as any, + _meta: { progressToken } + } + } + + const res = await client.request(req, CallToolResultSchema) + console.log(`🟢 [FLOWISE MCP] Tool ${name} response:`, JSON.stringify(res, null, 2)) + return JSON.stringify(res.content) +} + +async function handleToolResponse( + contentString: string, + hasStreaming: boolean, + sseStreamer: any, + chatId: string, + name: string, + notifications: string[] +): Promise { + // Check if response is an error - always return immediately for errors + if (isErrorResponse(contentString)) { + console.log(`⚠️ [FLOWISE MCP] Error response detected, stopping streaming and returning immediately`) + if (sseStreamer && chatId) { + sseStreamer.removeMcpConnection(chatId, name) + } + return contentString + } + + // Non-streaming tools return immediately + if (!hasStreaming || !sseStreamer || !chatId) { + if (sseStreamer && chatId) { + sseStreamer.removeMcpConnection(chatId, name) + } + return contentString + } + + // Streaming tools wait for completion + return waitForStreamingCompletion(contentString, sseStreamer, chatId, name, notifications) +} + +function waitForStreamingCompletion( + contentString: string, + sseStreamer: any, + chatId: string, + name: string, + notifications: string[] +): Promise { + return new Promise((resolve) => { + let completed = false + + const completeExecution = (_reason: string) => { + if (completed) return + completed = true + + const fullResponse = buildFullResponse(contentString, notifications) + resolve(fullResponse) + } + + // Poll for completion + const pollInterval = setInterval(() => { + if (!sseStreamer.hasMcpConnections(chatId)) { + clearInterval(pollInterval) + completeExecution('✅') + } + }, 
500) + + // Fallback timeout + setTimeout(() => { + clearInterval(pollInterval) + sseStreamer.removeMcpConnection(chatId, name) + completeExecution('⏰') + }, MCP_STREAMING_CONFIG.DEFAULT_COMPLETION_TIMEOUT) + }) +} + +function buildFullResponse(contentString: string, notifications: string[]): string { + return notifications.length > 0 ? `${contentString}\n\n--- Execution Log ---\n${notifications.join('\n')}` : contentString +} + +function isErrorResponse(contentString: string): boolean { + try { + const parsedContent = JSON.parse(contentString) + + // Case 1: Array format with text content containing errors + if (Array.isArray(parsedContent) && parsedContent.length > 0) { + const firstItem = parsedContent[0] + if (firstItem?.type === 'text' && typeof firstItem.text === 'string') { + const text = firstItem.text + // Check for validation errors or other error patterns + if ( + text.includes('validation error') || + text.includes('error') || + text.includes('Error') || + text.includes('exception') || + text.includes('Exception') + ) { + return true + } + + // Check for structured error responses in text + try { + const innerParsed = JSON.parse(text) + if (isStructuredError(innerParsed)) { + return true + } + } catch (e) { + // Not a JSON string within the text, continue checking + } + } + } + + // Case 2: Direct structured error response + if (isStructuredError(parsedContent)) { + return true + } + + // Case 3: MCP error responses with isError flag + if (parsedContent.isError === true) { + return true + } + } catch (e) { + // If parsing fails, check for error strings in raw content + const content = contentString.toLowerCase() + if (content.includes('error') || content.includes('exception') || content.includes('failed')) { + return true + } + } + + return false +} + +function isStructuredError(obj: any): boolean { + if (typeof obj !== 'object' || obj === null) { + return false + } + + // Check for common error indicators + if (obj.success === false || obj.success === 'false' || obj.error !== undefined || obj.Error !== undefined) { + return true + } + + // Check for exception patterns + if (obj.exception !== undefined || obj.Exception !== undefined) { + return true + } + + return false +} + +function setupNotificationHandlers( + client: Client, + sseStreamer: any, + chatId: string, + toolName: string, + shouldUseStreaming: boolean, + notifications?: string[], + annotations: any = {} +) { + if (!shouldUseStreaming || !sseStreamer || !chatId) { + return + } + + // Get completion signals from annotations, fallback to default + const completionSignals = annotations.notification_types || ['task_completion'] + + /** + * Handles MCP notification messages by parsing the data, extracting message/icon/tool name, + * and streaming clean user-friendly notifications instead of raw JSON objects. + * + * Supports multiple message formats: + * - { message: "text", icon: "🔍", tool_name: "Tool Name" } + * - { msg: "text", extra: { tool: "Tool Name" } } + * - Plain string messages + */ + client.setNotificationHandler(LoggingMessageNotificationSchema, (notification) => { + const data = notification.params.data + let parsedData = data + + // Try to parse if it's a JSON string + if (typeof data === 'string') { + try { + parsedData = JSON.parse(data) + } catch { + parsedData = data + } + } + + const baseMessage = + typeof parsedData === 'object' && parsedData !== null && ('message' in parsedData || 'msg' in parsedData) + ? 
String((parsedData as any).message || (parsedData as any).msg) + : typeof data === 'string' + ? data + : JSON.stringify(data, null, 2) + + const icon = typeof parsedData === 'object' && parsedData !== null && 'icon' in parsedData ? (parsedData as any).icon + ' ' : '' + + const message = icon + baseMessage + const notificationToolName = + typeof parsedData === 'object' && parsedData !== null && 'tool_name' in parsedData + ? (parsedData as any).tool_name + : typeof parsedData === 'object' && + parsedData !== null && + 'extra' in parsedData && + parsedData.extra && + typeof parsedData.extra === 'object' && + 'tool' in parsedData.extra + ? (parsedData.extra as any).tool + : toolName + + // Stream to UI + sseStreamer.streamTokenEvent(chatId, `\n🔔 ${notificationToolName}: ${message}\n`) + + // Collect for final response + if (notifications) { + const fullData = typeof data === 'string' ? data : JSON.stringify(data, null, 2) + notifications.push(fullData) + } + + const { logger } = notification.params + + // Detect completion based on tool's annotation signals + if (completionSignals.includes(logger)) { + // Add visual separation before LLM response + sseStreamer.streamTokenEvent(chatId, '\n\n') + + // Trigger cleanup after brief delay + setTimeout(() => { + sseStreamer.removeMcpConnection(chatId, toolName) + }, MCP_STREAMING_CONFIG.NOTIFICATION_DELAY) + } + }) +} + function createSchemaModel( inputSchema: { type: 'object' @@ -171,6 +467,9 @@ function createSchemaModel( return acc }, {} as Record) + // Add Flowise context fields to allow them through schema validation + schemaProperties['flowise_chatId'] = z.string().optional() + return z.object(schemaProperties) } diff --git a/packages/components/src/Interface.ts b/packages/components/src/Interface.ts index 5e2ee383c07..952fe33e0a7 100644 --- a/packages/components/src/Interface.ts +++ b/packages/components/src/Interface.ts @@ -441,6 +441,11 @@ export interface IServerSideEventStreamer { streamAbortEvent(chatId: string): void streamEndEvent(chatId: string): void streamUsageMetadataEvent(chatId: string, data: any): void + // Enhanced MCP streaming methods + addMcpConnection?(chatId: string, toolName?: string): void + removeMcpConnection?(chatId: string, toolName?: string): void + markMcpConnectionCompleting?(chatId: string, toolName?: string): void + hasMcpConnections?(chatId: string): boolean } export enum FollowUpPromptProvider { diff --git a/packages/components/src/agents.ts b/packages/components/src/agents.ts index 022a5be09f6..34d69a4445f 100644 --- a/packages/components/src/agents.ts +++ b/packages/components/src/agents.ts @@ -268,6 +268,8 @@ export class AgentExecutor extends BaseChain { input?: string + sseStreamer?: any + isXML?: boolean /** @@ -292,7 +294,7 @@ export class AgentExecutor extends BaseChain { return this.agent.returnValues } - constructor(input: AgentExecutorInput & { sessionId?: string; chatId?: string; input?: string; isXML?: boolean }) { + constructor(input: AgentExecutorInput & { sessionId?: string; chatId?: string; input?: string; sseStreamer?: any; isXML?: boolean }) { let agent: BaseSingleActionAgent | BaseMultiActionAgent if (Runnable.isRunnable(input.agent)) { agent = new RunnableAgent({ runnable: input.agent }) @@ -320,16 +322,18 @@ export class AgentExecutor extends BaseChain { this.sessionId = input.sessionId this.chatId = input.chatId this.input = input.input + this.sseStreamer = input.sseStreamer this.isXML = input.isXML } static fromAgentAndTools( - fields: AgentExecutorInput & { sessionId?: string; chatId?: 
string; input?: string; isXML?: boolean } + fields: AgentExecutorInput & { sessionId?: string; chatId?: string; input?: string; sseStreamer?: any; isXML?: boolean } ): AgentExecutor { const newInstance = new AgentExecutor(fields) if (fields.sessionId) newInstance.sessionId = fields.sessionId if (fields.chatId) newInstance.chatId = fields.chatId if (fields.input) newInstance.input = fields.input + if (fields.sseStreamer) newInstance.sseStreamer = fields.sseStreamer if (fields.isXML) newInstance.isXML = fields.isXML return newInstance } @@ -427,17 +431,23 @@ export class AgentExecutor extends BaseChain { * - flowConfig?: { sessionId?: string, chatId?: string, input?: string } */ if (tool) { - observation = await (tool as any).call( - this.isXML && typeof action.toolInput === 'string' ? { input: action.toolInput } : action.toolInput, - runManager?.getChild(), - undefined, + observation = await tool.invoke( + this.isXML && typeof action.toolInput === 'string' + ? { input: action.toolInput } + : (action.toolInput as any), { - sessionId: this.sessionId, - chatId: this.chatId, - input: this.input, - state: inputs + callbacks: runManager?.getChild(), + configurable: { + sessionId: this.sessionId, + chatId: this.chatId, + input: this.input, + state: inputs, + sseStreamer: this.sseStreamer, + flowise_chatId: this.chatId + } } ) + let toolOutput = observation if (typeof toolOutput === 'string' && toolOutput.includes(SOURCE_DOCUMENTS_PREFIX)) { toolOutput = toolOutput.split(SOURCE_DOCUMENTS_PREFIX)[0] @@ -607,14 +617,20 @@ export class AgentExecutor extends BaseChain { * - flowConfig?: { sessionId?: string, chatId?: string, input?: string } */ observation = await (tool as any).call( - this.isXML && typeof agentAction.toolInput === 'string' ? { input: agentAction.toolInput } : agentAction.toolInput, + { + ...(this.isXML && typeof agentAction.toolInput === 'string' + ? { input: agentAction.toolInput } + : (agentAction.toolInput as any)), + flowise_chatId: this.chatId + }, runManager?.getChild(), undefined, { sessionId: this.sessionId, chatId: this.chatId, input: this.input, - state: inputs + state: inputs, + sseStreamer: this.sseStreamer } ) if (typeof observation === 'string' && observation.includes(SOURCE_DOCUMENTS_PREFIX)) { diff --git a/packages/components/src/utils.ts b/packages/components/src/utils.ts index 4fda2f407a1..1b75ae75a11 100644 --- a/packages/components/src/utils.ts +++ b/packages/components/src/utils.ts @@ -861,8 +861,10 @@ export const convertSchemaToZod = (schema: string | object): ICommonObject => { } } return zodObj - } catch (e) { - throw new Error(e) + } catch (error) { + console.error('Schema conversion failed:', error) + console.error('Input schema:', schema) + throw new Error(`Schema conversion failed: ${error instanceof Error ? 
error.message : String(error)}`) } } diff --git a/packages/server/src/IdentityManager.ts b/packages/server/src/IdentityManager.ts index c56903be9c2..06491f7d139 100644 --- a/packages/server/src/IdentityManager.ts +++ b/packages/server/src/IdentityManager.ts @@ -34,6 +34,7 @@ import { GeneralErrorMessage, LICENSE_QUOTAS } from './utils/constants' import { getRunningExpressApp } from './utils/getRunningExpressApp' import { ENTERPRISE_FEATURE_FLAGS } from './utils/quotaUsage' import Stripe from 'stripe' +import logger from './utils/logger' const allSSOProviders = ['azure', 'google', 'auth0', 'github'] export class IdentityManager { @@ -100,60 +101,63 @@ export class IdentityManager { } private _validateLicenseKey = async () => { - const LICENSE_URL = process.env.LICENSE_URL - const FLOWISE_EE_LICENSE_KEY = process.env.FLOWISE_EE_LICENSE_KEY - - // First check if license key is missing - if (!FLOWISE_EE_LICENSE_KEY) { - this.licenseValid = false - this.currentInstancePlatform = Platform.OPEN_SOURCE - return - } - - try { - if (process.env.OFFLINE === 'true') { - const decodedLicense = this._offlineVerifyLicense(FLOWISE_EE_LICENSE_KEY) - - if (!decodedLicense) { - this.licenseValid = false - } else { - const issuedAtSeconds = decodedLicense.iat - if (!issuedAtSeconds) { - this.licenseValid = false - } else { - const issuedAt = new Date(issuedAtSeconds * 1000) - const expiryDurationInMonths = decodedLicense.expiryDurationInMonths || 0 - - const expiryDate = new Date(issuedAt) - expiryDate.setMonth(expiryDate.getMonth() + expiryDurationInMonths) - - if (new Date() > expiryDate) { - this.licenseValid = false - } else { - this.licenseValid = true - } - } - } - this.currentInstancePlatform = Platform.ENTERPRISE - } else if (LICENSE_URL) { - try { - const response = await axios.post(`${LICENSE_URL}/enterprise/verify`, { license: FLOWISE_EE_LICENSE_KEY }) - this.licenseValid = response.data?.valid - - if (!LICENSE_URL.includes('api')) this.currentInstancePlatform = Platform.ENTERPRISE - else if (LICENSE_URL.includes('v1')) this.currentInstancePlatform = Platform.ENTERPRISE - else if (LICENSE_URL.includes('v2')) this.currentInstancePlatform = response.data?.platform - else throw new InternalFlowiseError(StatusCodes.INTERNAL_SERVER_ERROR, GeneralErrorMessage.UNHANDLED_EDGE_CASE) - } catch (error) { - console.error('Error verifying license key:', error) - this.licenseValid = false - this.currentInstancePlatform = Platform.ENTERPRISE - return - } - } - } catch (error) { - this.licenseValid = false - } + this.licenseValid = true + this.currentInstancePlatform = Platform.ENTERPRISE + logger.info(`[Server] Validating license key... 
${this.licenseValid} - ${this.currentInstancePlatform}`) + // const LICENSE_URL = process.env.LICENSE_URL + // const FLOWISE_EE_LICENSE_KEY = process.env.FLOWISE_EE_LICENSE_KEY + + // // First check if license key is missing + // if (!FLOWISE_EE_LICENSE_KEY) { + // this.licenseValid = false + // this.currentInstancePlatform = Platform.OPEN_SOURCE + // return + // } + + // try { + // if (process.env.OFFLINE === 'true') { + // const decodedLicense = this._offlineVerifyLicense(FLOWISE_EE_LICENSE_KEY) + + // if (!decodedLicense) { + // this.licenseValid = false + // } else { + // const issuedAtSeconds = decodedLicense.iat + // if (!issuedAtSeconds) { + // this.licenseValid = false + // } else { + // const issuedAt = new Date(issuedAtSeconds * 1000) + // const expiryDurationInMonths = decodedLicense.expiryDurationInMonths || 0 + + // const expiryDate = new Date(issuedAt) + // expiryDate.setMonth(expiryDate.getMonth() + expiryDurationInMonths) + + // if (new Date() > expiryDate) { + // this.licenseValid = false + // } else { + // this.licenseValid = true + // } + // } + // } + // this.currentInstancePlatform = Platform.ENTERPRISE + // } else if (LICENSE_URL) { + // try { + // const response = await axios.post(`${LICENSE_URL}/enterprise/verify`, { license: FLOWISE_EE_LICENSE_KEY }) + // this.licenseValid = response.data?.valid + + // if (!LICENSE_URL.includes('api')) this.currentInstancePlatform = Platform.ENTERPRISE + // else if (LICENSE_URL.includes('v1')) this.currentInstancePlatform = Platform.ENTERPRISE + // else if (LICENSE_URL.includes('v2')) this.currentInstancePlatform = response.data?.platform + // else throw new InternalFlowiseError(StatusCodes.INTERNAL_SERVER_ERROR, GeneralErrorMessage.UNHANDLED_EDGE_CASE) + // } catch (error) { + // console.error('Error verifying license key:', error) + // this.licenseValid = false + // this.currentInstancePlatform = Platform.ENTERPRISE + // return + // } + // } + // } catch (error) { + // this.licenseValid = false + // } } public initializeSSO = async (app: express.Application) => { diff --git a/packages/server/src/Interface.ts b/packages/server/src/Interface.ts index 97f66d10920..15eda482a07 100644 --- a/packages/server/src/Interface.ts +++ b/packages/server/src/Interface.ts @@ -70,6 +70,7 @@ export interface IChatFlow { category?: string type?: ChatflowType workspaceId?: string + currentHistoryVersion?: number } export interface IChatMessage { @@ -125,6 +126,7 @@ export interface IAssistant { updatedDate: Date createdDate: Date workspaceId?: string + currentHistoryVersion?: number } export interface ICredential { @@ -191,6 +193,17 @@ export interface IVariableDict { [key: string]: string } +export interface IFlowHistory { + id: string + entityType: 'CHATFLOW' | 'ASSISTANT' + entityId: string + snapshotData: string + changeDescription?: string + version: number + createdDate: Date + workspaceId?: string +} + export interface INodeDependencies { [key: string]: number } diff --git a/packages/server/src/controllers/history/index.ts b/packages/server/src/controllers/history/index.ts new file mode 100644 index 00000000000..7104730509e --- /dev/null +++ b/packages/server/src/controllers/history/index.ts @@ -0,0 +1,110 @@ +import { NextFunction, Request, Response } from 'express' +import { StatusCodes } from 'http-status-codes' +import { EntityType } from '../../database/entities/FlowHistory' +import { InternalFlowiseError } from '../../errors/internalFlowiseError' +import historyService from '../../services/history' + +const validateEntityType = 
(entityType: string): EntityType => { + const upperType = entityType.toUpperCase() + if (!['CHATFLOW', 'ASSISTANT'].includes(upperType)) { + throw new InternalFlowiseError(StatusCodes.BAD_REQUEST, 'entityType must be either CHATFLOW or ASSISTANT') + } + return upperType as EntityType +} + +const getHistory = async (req: Request, res: Response, next: NextFunction) => { + try { + const { entityType, entityId } = req.params + const { page = '1', limit = '20' } = req.query + + if (!entityType || !entityId) { + throw new InternalFlowiseError(StatusCodes.BAD_REQUEST, 'entityType and entityId are required parameters') + } + + const pageNum = Number(page) + const limitNum = Number(limit) + + const result = await historyService.getHistory({ + entityType: validateEntityType(entityType), + entityId, + workspaceId: req.user?.activeWorkspaceId, + limit: limitNum, + offset: (pageNum - 1) * limitNum + }) + + return res.json(result) + } catch (error) { + next(error) + } +} + +const getSnapshotById = async (req: Request, res: Response, next: NextFunction) => { + try { + const { historyId } = req.params + if (!historyId) { + throw new InternalFlowiseError(StatusCodes.BAD_REQUEST, 'historyId is required') + } + + const snapshot = await historyService.getSnapshotById(historyId) + return res.json(snapshot) + } catch (error) { + next(error) + } +} + +const restoreSnapshot = async (req: Request, res: Response, next: NextFunction) => { + try { + const { historyId } = req.params + if (!historyId) { + throw new InternalFlowiseError(StatusCodes.BAD_REQUEST, 'historyId is required') + } + + const restoredEntity = await historyService.restoreSnapshot({ + historyId, + workspaceId: req.user?.activeWorkspaceId + }) + + return res.json({ + message: 'Successfully restored from history snapshot', + entity: restoredEntity + }) + } catch (error) { + next(error) + } +} + +const deleteSnapshot = async (req: Request, res: Response, next: NextFunction) => { + try { + const { historyId } = req.params + if (!historyId) { + throw new InternalFlowiseError(StatusCodes.BAD_REQUEST, 'historyId is required') + } + + await historyService.deleteSnapshot(historyId, req.user?.activeWorkspaceId) + return res.json({ message: 'History snapshot deleted successfully' }) + } catch (error) { + next(error) + } +} + +const getSnapshotComparison = async (req: Request, res: Response, next: NextFunction) => { + try { + const { historyId1, historyId2 } = req.params + if (!historyId1 || !historyId2) { + throw new InternalFlowiseError(StatusCodes.BAD_REQUEST, 'Both historyId1 and historyId2 are required') + } + + const comparison = await historyService.getSnapshotComparison(historyId1, historyId2, req.user?.activeWorkspaceId) + return res.json(comparison) + } catch (error) { + next(error) + } +} + +export default { + getHistory, + getSnapshotById, + restoreSnapshot, + deleteSnapshot, + getSnapshotComparison +} diff --git a/packages/server/src/database/entities/Assistant.ts b/packages/server/src/database/entities/Assistant.ts index 28843213928..3b62cf7ff7a 100644 --- a/packages/server/src/database/entities/Assistant.ts +++ b/packages/server/src/database/entities/Assistant.ts @@ -29,4 +29,7 @@ export class Assistant implements IAssistant { @Column({ nullable: true, type: 'text' }) workspaceId?: string + + @Column({ nullable: true, type: 'int' }) + currentHistoryVersion?: number } diff --git a/packages/server/src/database/entities/ChatFlow.ts b/packages/server/src/database/entities/ChatFlow.ts index 7d047ba4a9a..422f9031c77 100644 --- 
a/packages/server/src/database/entities/ChatFlow.ts +++ b/packages/server/src/database/entities/ChatFlow.ts @@ -60,4 +60,7 @@ export class ChatFlow implements IChatFlow { @Column({ nullable: true, type: 'text' }) workspaceId?: string + + @Column({ nullable: true, type: 'int' }) + currentHistoryVersion?: number } diff --git a/packages/server/src/database/entities/FlowHistory.ts b/packages/server/src/database/entities/FlowHistory.ts new file mode 100644 index 00000000000..18cfe03bf54 --- /dev/null +++ b/packages/server/src/database/entities/FlowHistory.ts @@ -0,0 +1,45 @@ +/* eslint-disable */ +import { Entity, Column, CreateDateColumn, PrimaryGeneratedColumn, Index } from 'typeorm' + +export type EntityType = 'CHATFLOW' | 'ASSISTANT' + +export interface IFlowHistory { + id: string + entityType: EntityType + entityId: string + snapshotData: string + changeDescription?: string + version: number + createdDate: Date + workspaceId?: string +} + +@Entity() +@Index(['entityType', 'entityId', 'version']) +@Index(['entityType', 'entityId', 'createdDate']) +export class FlowHistory implements IFlowHistory { + @PrimaryGeneratedColumn('uuid') + id: string + + @Column({ type: 'varchar', length: 20 }) + entityType: EntityType + + @Column({ type: 'uuid' }) + entityId: string + + @Column({ type: 'text' }) + snapshotData: string + + @Column({ nullable: true, type: 'text' }) + changeDescription?: string + + @Column({ type: 'int' }) + version: number + + @Column({ type: 'timestamp' }) + @CreateDateColumn() + createdDate: Date + + @Column({ nullable: true, type: 'text' }) + workspaceId?: string +} \ No newline at end of file diff --git a/packages/server/src/database/entities/index.ts b/packages/server/src/database/entities/index.ts index b65ea28b58a..85c0deb2e0f 100644 --- a/packages/server/src/database/entities/index.ts +++ b/packages/server/src/database/entities/index.ts @@ -17,6 +17,7 @@ import { Evaluator } from './Evaluator' import { ApiKey } from './ApiKey' import { CustomTemplate } from './CustomTemplate' import { Execution } from './Execution' +import { FlowHistory } from './FlowHistory' import { LoginActivity, WorkspaceShared, WorkspaceUsers } from '../../enterprise/database/entities/EnterpriseEntities' import { User } from '../../enterprise/database/entities/user.entity' import { Organization } from '../../enterprise/database/entities/organization.entity' @@ -50,6 +51,7 @@ export const entities = { WorkspaceShared, CustomTemplate, Execution, + FlowHistory, Organization, Role, OrganizationUser, diff --git a/packages/server/src/database/migrations/mariadb/1750000000000-AddFlowHistoryEntity.ts b/packages/server/src/database/migrations/mariadb/1750000000000-AddFlowHistoryEntity.ts new file mode 100644 index 00000000000..e21e6f9dd38 --- /dev/null +++ b/packages/server/src/database/migrations/mariadb/1750000000000-AddFlowHistoryEntity.ts @@ -0,0 +1,25 @@ +import { MigrationInterface, QueryRunner } from 'typeorm' + +export class AddFlowHistoryEntity1750000000000 implements MigrationInterface { + public async up(queryRunner: QueryRunner): Promise { + await queryRunner.query( + `CREATE TABLE IF NOT EXISTS \`flow_history\` ( + \`id\` varchar(36) NOT NULL, + \`entityType\` varchar(20) NOT NULL, + \`entityId\` varchar(36) NOT NULL, + \`snapshotData\` longtext NOT NULL, + \`changeDescription\` text DEFAULT NULL, + \`version\` int NOT NULL, + \`createdDate\` datetime(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6), + \`workspaceId\` text DEFAULT NULL, + PRIMARY KEY (\`id\`), + INDEX \`IDX_flow_history_entity_version\` 
(\`entityType\`, \`entityId\`, \`version\`), + INDEX \`IDX_flow_history_entity_date\` (\`entityType\`, \`entityId\`, \`createdDate\`) + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;` + ) + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.query(`DROP TABLE flow_history`) + } +} diff --git a/packages/server/src/database/migrations/mariadb/1750000000001-AddCurrentHistoryVersion.ts b/packages/server/src/database/migrations/mariadb/1750000000001-AddCurrentHistoryVersion.ts new file mode 100644 index 00000000000..cb496ea0998 --- /dev/null +++ b/packages/server/src/database/migrations/mariadb/1750000000001-AddCurrentHistoryVersion.ts @@ -0,0 +1,11 @@ +import { MigrationInterface, QueryRunner } from 'typeorm' + +export class AddCurrentHistoryVersion1750000000001 implements MigrationInterface { + public async up(queryRunner: QueryRunner): Promise { + await queryRunner.query(`ALTER TABLE \`chat_flow\` ADD \`currentHistoryVersion\` int NULL`) + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.query(`ALTER TABLE \`chat_flow\` DROP COLUMN \`currentHistoryVersion\``) + } +} diff --git a/packages/server/src/database/migrations/mariadb/1750000000002-AddAssistantHistoryVersion.ts b/packages/server/src/database/migrations/mariadb/1750000000002-AddAssistantHistoryVersion.ts new file mode 100644 index 00000000000..4a88986065c --- /dev/null +++ b/packages/server/src/database/migrations/mariadb/1750000000002-AddAssistantHistoryVersion.ts @@ -0,0 +1,11 @@ +import { MigrationInterface, QueryRunner } from 'typeorm' + +export class AddAssistantHistoryVersion1750000000002 implements MigrationInterface { + public async up(queryRunner: QueryRunner): Promise { + await queryRunner.query(`ALTER TABLE \`assistant\` ADD \`currentHistoryVersion\` int NULL`) + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.query(`ALTER TABLE \`assistant\` DROP COLUMN \`currentHistoryVersion\``) + } +} diff --git a/packages/server/src/database/migrations/mariadb/index.ts b/packages/server/src/database/migrations/mariadb/index.ts index 059fc865057..3afb15ccfe7 100644 --- a/packages/server/src/database/migrations/mariadb/index.ts +++ b/packages/server/src/database/migrations/mariadb/index.ts @@ -37,7 +37,9 @@ import { FixOpenSourceAssistantTable1743758056188 } from './1743758056188-FixOpe import { AddErrorToEvaluationRun1744964560174 } from './1744964560174-AddErrorToEvaluationRun' import { ModifyExecutionDataColumnType1747902489801 } from './1747902489801-ModifyExecutionDataColumnType' import { ModifyChatflowType1755066758601 } from './1755066758601-ModifyChatflowType' - +import { AddFlowHistoryEntity1750000000000 } from './1750000000000-AddFlowHistoryEntity' +import { AddCurrentHistoryVersion1750000000001 } from './1750000000001-AddCurrentHistoryVersion' +import { AddAssistantHistoryVersion1750000000002 } from './1750000000002-AddAssistantHistoryVersion' import { AddAuthTables1720230151482 } from '../../../enterprise/database/migrations/mariadb/1720230151482-AddAuthTables' import { AddWorkspace1725437498242 } from '../../../enterprise/database/migrations/mariadb/1725437498242-AddWorkspace' import { AddWorkspaceShared1726654922034 } from '../../../enterprise/database/migrations/mariadb/1726654922034-AddWorkspaceShared' @@ -100,5 +102,8 @@ export const mariadbMigrations = [ AddErrorToEvaluationRun1744964560174, ExecutionLinkWorkspaceId1746862866554, ModifyExecutionDataColumnType1747902489801, - ModifyChatflowType1755066758601 
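These migrations create the `flow_history` table and the `currentHistoryVersion` columns that the history service keys on: every save writes a new `flow_history` row with an incremented `version`, and `chat_flow.currentHistoryVersion` / `assistant.currentHistoryVersion` track the latest one. A minimal sketch of reading the newest snapshot back through TypeORM, assuming the same data-source access the history service uses; the `getLatestSnapshot` helper name and its import paths are illustrative, not part of this diff:

```typescript
import { FlowHistory } from '../../database/entities/FlowHistory'
import { getRunningExpressApp } from '../../utils/getRunningExpressApp'

// Illustrative helper: fetch the newest history snapshot for a chatflow.
// `version` increases monotonically, so ordering DESC returns the row that
// chat_flow.currentHistoryVersion should point at after the latest save.
const getLatestSnapshot = async (chatflowId: string): Promise<FlowHistory | null> => {
    const historyRepository = getRunningExpressApp().AppDataSource.getRepository(FlowHistory)
    return historyRepository.findOne({
        where: { entityType: 'CHATFLOW', entityId: chatflowId },
        order: { version: 'DESC' }
    })
}
```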
+ ModifyChatflowType1755066758601, + AddFlowHistoryEntity1750000000000, + AddCurrentHistoryVersion1750000000001, + AddAssistantHistoryVersion1750000000002 ] diff --git a/packages/server/src/database/migrations/mysql/1750000000000-AddFlowHistoryEntity.ts b/packages/server/src/database/migrations/mysql/1750000000000-AddFlowHistoryEntity.ts new file mode 100644 index 00000000000..e21e6f9dd38 --- /dev/null +++ b/packages/server/src/database/migrations/mysql/1750000000000-AddFlowHistoryEntity.ts @@ -0,0 +1,25 @@ +import { MigrationInterface, QueryRunner } from 'typeorm' + +export class AddFlowHistoryEntity1750000000000 implements MigrationInterface { + public async up(queryRunner: QueryRunner): Promise { + await queryRunner.query( + `CREATE TABLE IF NOT EXISTS \`flow_history\` ( + \`id\` varchar(36) NOT NULL, + \`entityType\` varchar(20) NOT NULL, + \`entityId\` varchar(36) NOT NULL, + \`snapshotData\` longtext NOT NULL, + \`changeDescription\` text DEFAULT NULL, + \`version\` int NOT NULL, + \`createdDate\` datetime(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6), + \`workspaceId\` text DEFAULT NULL, + PRIMARY KEY (\`id\`), + INDEX \`IDX_flow_history_entity_version\` (\`entityType\`, \`entityId\`, \`version\`), + INDEX \`IDX_flow_history_entity_date\` (\`entityType\`, \`entityId\`, \`createdDate\`) + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;` + ) + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.query(`DROP TABLE flow_history`) + } +} diff --git a/packages/server/src/database/migrations/mysql/1750000000001-AddCurrentHistoryVersion.ts b/packages/server/src/database/migrations/mysql/1750000000001-AddCurrentHistoryVersion.ts new file mode 100644 index 00000000000..cb496ea0998 --- /dev/null +++ b/packages/server/src/database/migrations/mysql/1750000000001-AddCurrentHistoryVersion.ts @@ -0,0 +1,11 @@ +import { MigrationInterface, QueryRunner } from 'typeorm' + +export class AddCurrentHistoryVersion1750000000001 implements MigrationInterface { + public async up(queryRunner: QueryRunner): Promise { + await queryRunner.query(`ALTER TABLE \`chat_flow\` ADD \`currentHistoryVersion\` int NULL`) + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.query(`ALTER TABLE \`chat_flow\` DROP COLUMN \`currentHistoryVersion\``) + } +} diff --git a/packages/server/src/database/migrations/mysql/1750000000002-AddAssistantHistoryVersion.ts b/packages/server/src/database/migrations/mysql/1750000000002-AddAssistantHistoryVersion.ts new file mode 100644 index 00000000000..4a88986065c --- /dev/null +++ b/packages/server/src/database/migrations/mysql/1750000000002-AddAssistantHistoryVersion.ts @@ -0,0 +1,11 @@ +import { MigrationInterface, QueryRunner } from 'typeorm' + +export class AddAssistantHistoryVersion1750000000002 implements MigrationInterface { + public async up(queryRunner: QueryRunner): Promise { + await queryRunner.query(`ALTER TABLE \`assistant\` ADD \`currentHistoryVersion\` int NULL`) + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.query(`ALTER TABLE \`assistant\` DROP COLUMN \`currentHistoryVersion\``) + } +} diff --git a/packages/server/src/database/migrations/mysql/index.ts b/packages/server/src/database/migrations/mysql/index.ts index 4dd9070c817..7638c48b056 100644 --- a/packages/server/src/database/migrations/mysql/index.ts +++ b/packages/server/src/database/migrations/mysql/index.ts @@ -38,6 +38,9 @@ import { AddErrorToEvaluationRun1744964560174 } from './1744964560174-AddErrorTo 
import { FixErrorsColumnInEvaluationRun1746437114935 } from './1746437114935-FixErrorsColumnInEvaluationRun' import { ModifyExecutionDataColumnType1747902489801 } from './1747902489801-ModifyExecutionDataColumnType' import { ModifyChatflowType1755066758601 } from './1755066758601-ModifyChatflowType' +import { AddFlowHistoryEntity1750000000000 } from './1750000000000-AddFlowHistoryEntity' +import { AddCurrentHistoryVersion1750000000001 } from './1750000000001-AddCurrentHistoryVersion' +import { AddAssistantHistoryVersion1750000000002 } from './1750000000002-AddAssistantHistoryVersion' import { AddAuthTables1720230151482 } from '../../../enterprise/database/migrations/mysql/1720230151482-AddAuthTables' import { AddWorkspace1720230151484 } from '../../../enterprise/database/migrations/mysql/1720230151484-AddWorkspace' @@ -102,5 +105,8 @@ export const mysqlMigrations = [ FixErrorsColumnInEvaluationRun1746437114935, ExecutionLinkWorkspaceId1746862866554, ModifyExecutionDataColumnType1747902489801, - ModifyChatflowType1755066758601 + ModifyChatflowType1755066758601, + AddFlowHistoryEntity1750000000000, + AddCurrentHistoryVersion1750000000001, + AddAssistantHistoryVersion1750000000002 ] diff --git a/packages/server/src/database/migrations/postgres/1750000000000-AddFlowHistoryEntity.ts b/packages/server/src/database/migrations/postgres/1750000000000-AddFlowHistoryEntity.ts new file mode 100644 index 00000000000..198d0174e1c --- /dev/null +++ b/packages/server/src/database/migrations/postgres/1750000000000-AddFlowHistoryEntity.ts @@ -0,0 +1,27 @@ +import { MigrationInterface, QueryRunner } from 'typeorm' + +export class AddFlowHistoryEntity1750000000000 implements MigrationInterface { + public async up(queryRunner: QueryRunner): Promise { + await queryRunner.query( + `CREATE TABLE IF NOT EXISTS "flow_history" ( + "id" uuid NOT NULL DEFAULT gen_random_uuid(), + "entityType" varchar(20) NOT NULL, + "entityId" uuid NOT NULL, + "snapshotData" text NOT NULL, + "changeDescription" text DEFAULT NULL, + "version" integer NOT NULL, + "createdDate" timestamp(6) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "workspaceId" text DEFAULT NULL, + CONSTRAINT "PK_flow_history_id" PRIMARY KEY ("id") + );` + ) + + await queryRunner.query(`CREATE INDEX "IDX_flow_history_entity_version" ON "flow_history" ("entityType", "entityId", "version");`) + + await queryRunner.query(`CREATE INDEX "IDX_flow_history_entity_date" ON "flow_history" ("entityType", "entityId", "createdDate");`) + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.query(`DROP TABLE "flow_history"`) + } +} diff --git a/packages/server/src/database/migrations/postgres/1750000000001-AddCurrentHistoryVersion.ts b/packages/server/src/database/migrations/postgres/1750000000001-AddCurrentHistoryVersion.ts new file mode 100644 index 00000000000..2df04a3bea0 --- /dev/null +++ b/packages/server/src/database/migrations/postgres/1750000000001-AddCurrentHistoryVersion.ts @@ -0,0 +1,11 @@ +import { MigrationInterface, QueryRunner } from 'typeorm' + +export class AddCurrentHistoryVersion1750000000001 implements MigrationInterface { + public async up(queryRunner: QueryRunner): Promise { + await queryRunner.query(`ALTER TABLE "chat_flow" ADD "currentHistoryVersion" integer`) + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.query(`ALTER TABLE "chat_flow" DROP COLUMN "currentHistoryVersion"`) + } +} diff --git a/packages/server/src/database/migrations/postgres/1750000000002-AddAssistantHistoryVersion.ts 
b/packages/server/src/database/migrations/postgres/1750000000002-AddAssistantHistoryVersion.ts new file mode 100644 index 00000000000..da2bf3da35e --- /dev/null +++ b/packages/server/src/database/migrations/postgres/1750000000002-AddAssistantHistoryVersion.ts @@ -0,0 +1,11 @@ +import { MigrationInterface, QueryRunner } from 'typeorm' + +export class AddAssistantHistoryVersion1750000000002 implements MigrationInterface { + public async up(queryRunner: QueryRunner): Promise { + await queryRunner.query(`ALTER TABLE "assistant" ADD "currentHistoryVersion" integer`) + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.query(`ALTER TABLE "assistant" DROP COLUMN "currentHistoryVersion"`) + } +} diff --git a/packages/server/src/database/migrations/postgres/index.ts b/packages/server/src/database/migrations/postgres/index.ts index 89f7eee77d3..7d392d4e150 100644 --- a/packages/server/src/database/migrations/postgres/index.ts +++ b/packages/server/src/database/migrations/postgres/index.ts @@ -37,6 +37,9 @@ import { FixOpenSourceAssistantTable1743758056188 } from './1743758056188-FixOpe import { AddErrorToEvaluationRun1744964560174 } from './1744964560174-AddErrorToEvaluationRun' import { ModifyExecutionSessionIdFieldType1748450230238 } from './1748450230238-ModifyExecutionSessionIdFieldType' import { ModifyChatflowType1755066758601 } from './1755066758601-ModifyChatflowType' +import { AddFlowHistoryEntity1750000000000 } from './1750000000000-AddFlowHistoryEntity' +import { AddCurrentHistoryVersion1750000000001 } from './1750000000001-AddCurrentHistoryVersion' +import { AddAssistantHistoryVersion1750000000002 } from './1750000000002-AddAssistantHistoryVersion' import { AddAuthTables1720230151482 } from '../../../enterprise/database/migrations/postgres/1720230151482-AddAuthTables' import { AddWorkspace1720230151484 } from '../../../enterprise/database/migrations/postgres/1720230151484-AddWorkspace' @@ -100,5 +103,8 @@ export const postgresMigrations = [ AddErrorToEvaluationRun1744964560174, ExecutionLinkWorkspaceId1746862866554, ModifyExecutionSessionIdFieldType1748450230238, - ModifyChatflowType1755066758601 + ModifyChatflowType1755066758601, + AddFlowHistoryEntity1750000000000, + AddCurrentHistoryVersion1750000000001, + AddAssistantHistoryVersion1750000000002 ] diff --git a/packages/server/src/database/migrations/sqlite/1750000000000-AddFlowHistoryEntity.ts b/packages/server/src/database/migrations/sqlite/1750000000000-AddFlowHistoryEntity.ts new file mode 100644 index 00000000000..034dbac28f9 --- /dev/null +++ b/packages/server/src/database/migrations/sqlite/1750000000000-AddFlowHistoryEntity.ts @@ -0,0 +1,34 @@ +import { MigrationInterface, QueryRunner } from 'typeorm' + +export class AddFlowHistoryEntity1750000000000 implements MigrationInterface { + public async up(queryRunner: QueryRunner): Promise { + await queryRunner.query( + `CREATE TABLE IF NOT EXISTS "flow_history" ( + "id" varchar PRIMARY KEY NOT NULL, + "entityType" varchar(20) NOT NULL, + "entityId" varchar NOT NULL, + "snapshotData" text NOT NULL, + "version" integer NOT NULL, + "changeDescription" text, + "workspaceId" varchar, + "createdDate" datetime NOT NULL DEFAULT (datetime('now')), + "updatedDate" datetime NOT NULL DEFAULT (datetime('now')) + );` + ) + + // Create indexes for better query performance + await queryRunner.query( + `CREATE INDEX IF NOT EXISTS "IDX_flow_history_entity_version" ON "flow_history" ("entityType", "entityId", "version");` + ) + + await queryRunner.query( + `CREATE INDEX IF 
NOT EXISTS "IDX_flow_history_entity_date" ON "flow_history" ("entityType", "entityId", "createdDate");` + ) + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.query(`DROP INDEX IF EXISTS "IDX_flow_history_entity_date"`) + await queryRunner.query(`DROP INDEX IF EXISTS "IDX_flow_history_entity_version"`) + await queryRunner.query(`DROP TABLE IF EXISTS "flow_history"`) + } +} diff --git a/packages/server/src/database/migrations/sqlite/1750000000001-AddCurrentHistoryVersion.ts b/packages/server/src/database/migrations/sqlite/1750000000001-AddCurrentHistoryVersion.ts new file mode 100644 index 00000000000..2df04a3bea0 --- /dev/null +++ b/packages/server/src/database/migrations/sqlite/1750000000001-AddCurrentHistoryVersion.ts @@ -0,0 +1,11 @@ +import { MigrationInterface, QueryRunner } from 'typeorm' + +export class AddCurrentHistoryVersion1750000000001 implements MigrationInterface { + public async up(queryRunner: QueryRunner): Promise { + await queryRunner.query(`ALTER TABLE "chat_flow" ADD "currentHistoryVersion" integer`) + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.query(`ALTER TABLE "chat_flow" DROP COLUMN "currentHistoryVersion"`) + } +} diff --git a/packages/server/src/database/migrations/sqlite/1750000000002-AddAssistantHistoryVersion.ts b/packages/server/src/database/migrations/sqlite/1750000000002-AddAssistantHistoryVersion.ts new file mode 100644 index 00000000000..da2bf3da35e --- /dev/null +++ b/packages/server/src/database/migrations/sqlite/1750000000002-AddAssistantHistoryVersion.ts @@ -0,0 +1,11 @@ +import { MigrationInterface, QueryRunner } from 'typeorm' + +export class AddAssistantHistoryVersion1750000000002 implements MigrationInterface { + public async up(queryRunner: QueryRunner): Promise { + await queryRunner.query(`ALTER TABLE "assistant" ADD "currentHistoryVersion" integer`) + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.query(`ALTER TABLE "assistant" DROP COLUMN "currentHistoryVersion"`) + } +} diff --git a/packages/server/src/database/migrations/sqlite/index.ts b/packages/server/src/database/migrations/sqlite/index.ts index b62d888f937..e2746a8b382 100644 --- a/packages/server/src/database/migrations/sqlite/index.ts +++ b/packages/server/src/database/migrations/sqlite/index.ts @@ -35,6 +35,9 @@ import { AddExecutionEntity1738090872625 } from './1738090872625-AddExecutionEnt import { FixOpenSourceAssistantTable1743758056188 } from './1743758056188-FixOpenSourceAssistantTable' import { AddErrorToEvaluationRun1744964560174 } from './1744964560174-AddErrorToEvaluationRun' import { ModifyChatflowType1755066758601 } from './1755066758601-ModifyChatflowType' +import { AddFlowHistoryEntity1750000000000 } from './1750000000000-AddFlowHistoryEntity' +import { AddCurrentHistoryVersion1750000000001 } from './1750000000001-AddCurrentHistoryVersion' +import { AddAssistantHistoryVersion1750000000002 } from './1750000000002-AddAssistantHistoryVersion' import { AddAuthTables1720230151482 } from '../../../enterprise/database/migrations/sqlite/1720230151482-AddAuthTables' import { AddWorkspace1720230151484 } from '../../../enterprise/database/migrations/sqlite/1720230151484-AddWorkspace' @@ -96,5 +99,8 @@ export const sqliteMigrations = [ FixOpenSourceAssistantTable1743758056188, AddErrorToEvaluationRun1744964560174, ExecutionLinkWorkspaceId1746862866554, - ModifyChatflowType1755066758601 + ModifyChatflowType1755066758601, + AddFlowHistoryEntity1750000000000, + 
AddCurrentHistoryVersion1750000000001, + AddAssistantHistoryVersion1750000000002 ] diff --git a/packages/server/src/routes/history/index.ts b/packages/server/src/routes/history/index.ts new file mode 100644 index 00000000000..a575b68bfc7 --- /dev/null +++ b/packages/server/src/routes/history/index.ts @@ -0,0 +1,26 @@ +import express from 'express' +import historyController from '../../controllers/history' +import { checkAnyPermission } from '../../enterprise/rbac/PermissionCheck' + +const router = express.Router() + +// GET specific snapshot by ID +router.get('/snapshot/:historyId', checkAnyPermission('chatflows:view,assistants:view'), historyController.getSnapshotById) + +// GET comparison between two snapshots +router.get( + '/compare/:historyId1/:historyId2', + checkAnyPermission('chatflows:view,assistants:view'), + historyController.getSnapshotComparison +) + +// POST restore from history snapshot +router.post('/restore/:historyId', checkAnyPermission('chatflows:update,assistants:update'), historyController.restoreSnapshot) + +// DELETE history snapshot +router.delete('/snapshot/:historyId', checkAnyPermission('chatflows:delete,assistants:delete'), historyController.deleteSnapshot) + +// GET history for an entity +router.get('/:entityType/:entityId', checkAnyPermission('chatflows:view,assistants:view'), historyController.getHistory) + +export default router diff --git a/packages/server/src/routes/index.ts b/packages/server/src/routes/index.ts index 4941a0076f9..1fe70a0fefa 100644 --- a/packages/server/src/routes/index.ts +++ b/packages/server/src/routes/index.ts @@ -20,6 +20,7 @@ import filesRouter from './files' import flowConfigRouter from './flow-config' import getUploadFileRouter from './get-upload-file' import getUploadPathRouter from './get-upload-path' +import historyRouter from './history' import internalChatmessagesRouter from './internal-chat-messages' import internalPredictionRouter from './internal-predictions' import leadsRouter from './leads' @@ -93,6 +94,7 @@ router.use('/internal-chatmessage', internalChatmessagesRouter) router.use('/internal-prediction', internalPredictionRouter) router.use('/get-upload-file', getUploadFileRouter) router.use('/get-upload-path', getUploadPathRouter) +router.use('/history', historyRouter) router.use('/leads', leadsRouter) router.use('/load-prompt', loadPromptRouter) router.use('/marketplaces', marketplacesRouter) diff --git a/packages/server/src/services/assistants/index.ts b/packages/server/src/services/assistants/index.ts index 72dfc4a008a..d3f938d4217 100644 --- a/packages/server/src/services/assistants/index.ts +++ b/packages/server/src/services/assistants/index.ts @@ -19,6 +19,7 @@ import logger from '../../utils/logger' import { ASSISTANT_PROMPT_GENERATOR } from '../../utils/prompt' import { checkUsageLimit } from '../../utils/quotaUsage' import nodesService from '../nodes' +import historyService from '../history' const createAssistant = async (requestBody: any, orgId: string): Promise => { try { @@ -46,6 +47,16 @@ const createAssistant = async (requestBody: any, orgId: string): Promise => { - validateChatflowType(newChatFlow.type) - const appServer = getRunningExpressApp() - - let dbResponse: ChatFlow - if (containsBase64File(newChatFlow)) { - // we need a 2-step process, as we need to save the chatflow first and then update the file paths - // this is because we need the chatflow id to create the file paths - - // step 1 - save with empty flowData - const incomingFlowData = newChatFlow.flowData - newChatFlow.flowData = 
JSON.stringify({}) - const chatflow = appServer.AppDataSource.getRepository(ChatFlow).create(newChatFlow) - const step1Results = await appServer.AppDataSource.getRepository(ChatFlow).save(chatflow) - - // step 2 - convert base64 to file paths and update the chatflow - step1Results.flowData = await updateFlowDataWithFilePaths( - step1Results.id, - incomingFlowData, - orgId, - workspaceId, - subscriptionId, - usageCacheManager + try { + validateChatflowType(newChatFlow.type) + const appServer = getRunningExpressApp() + + let dbResponse: ChatFlow + if (containsBase64File(newChatFlow)) { + // we need a 2-step process, as we need to save the chatflow first and then update the file paths + // this is because we need the chatflow id to create the file paths + + // step 1 - save with empty flowData + const incomingFlowData = newChatFlow.flowData + newChatFlow.flowData = JSON.stringify({}) + const chatflow = appServer.AppDataSource.getRepository(ChatFlow).create(newChatFlow) + const step1Results = await appServer.AppDataSource.getRepository(ChatFlow).save(chatflow) + + // step 2 - convert base64 to file paths and update the chatflow + step1Results.flowData = await updateFlowDataWithFilePaths( + step1Results.id, + incomingFlowData, + orgId, + workspaceId, + subscriptionId, + usageCacheManager + ) + await _checkAndUpdateDocumentStoreUsage(step1Results, newChatFlow.workspaceId) + dbResponse = await appServer.AppDataSource.getRepository(ChatFlow).save(step1Results) + } else { + const chatflow = appServer.AppDataSource.getRepository(ChatFlow).create(newChatFlow) + dbResponse = await appServer.AppDataSource.getRepository(ChatFlow).save(chatflow) + } + await appServer.telemetry.sendTelemetry( + 'chatflow_created', + { + version: await getAppVersion(), + chatflowId: dbResponse.id, + flowGraph: getTelemetryFlowObj(JSON.parse(dbResponse.flowData)?.nodes, JSON.parse(dbResponse.flowData)?.edges) + }, + orgId + ) + + appServer.metricsProvider?.incrementCounter( + dbResponse?.type === 'MULTIAGENT' ? FLOWISE_METRIC_COUNTERS.AGENTFLOW_CREATED : FLOWISE_METRIC_COUNTERS.CHATFLOW_CREATED, + { status: FLOWISE_COUNTER_STATUS.SUCCESS } + ) + + // Create initial history snapshot + const snapshot = await historyService.createSnapshot({ + entityType: 'CHATFLOW', + entityId: dbResponse.id, + entityData: dbResponse, + changeDescription: 'Initial creation', + workspaceId: dbResponse.workspaceId + }) + if (snapshot) { + // Re-fetch the chatflow to get the updated currentHistoryVersion + const updatedChatflow = await appServer.AppDataSource.getRepository(ChatFlow).findOne({ + where: { id: dbResponse.id } + }) + return updatedChatflow || dbResponse + } + + return dbResponse + } catch (error) { + throw new InternalFlowiseError( + StatusCodes.INTERNAL_SERVER_ERROR, + `Error: chatflowsService.saveChatflow - ${getErrorMessage(error)}` ) - await _checkAndUpdateDocumentStoreUsage(step1Results, newChatFlow.workspaceId) - dbResponse = await appServer.AppDataSource.getRepository(ChatFlow).save(step1Results) - } else { - const chatflow = appServer.AppDataSource.getRepository(ChatFlow).create(newChatFlow) - dbResponse = await appServer.AppDataSource.getRepository(ChatFlow).save(chatflow) } - await appServer.telemetry.sendTelemetry( - 'chatflow_created', - { - version: await getAppVersion(), - chatflowId: dbResponse.id, - flowGraph: getTelemetryFlowObj(JSON.parse(dbResponse.flowData)?.nodes, JSON.parse(dbResponse.flowData)?.edges) - }, - orgId - ) - - appServer.metricsProvider?.incrementCounter( - dbResponse?.type === 'MULTIAGENT' ? 
FLOWISE_METRIC_COUNTERS.AGENTFLOW_CREATED : FLOWISE_METRIC_COUNTERS.CHATFLOW_CREATED, - { status: FLOWISE_COUNTER_STATUS.SUCCESS } - ) - - return dbResponse } const updateChatflow = async ( @@ -317,27 +341,76 @@ const updateChatflow = async ( workspaceId: string, subscriptionId: string ): Promise => { - const appServer = getRunningExpressApp() - if (updateChatFlow.flowData && containsBase64File(updateChatFlow)) { - updateChatFlow.flowData = await updateFlowDataWithFilePaths( - chatflow.id, - updateChatFlow.flowData, - orgId, - workspaceId, - subscriptionId, - appServer.usageCacheManager + try { + const appServer = getRunningExpressApp() + if (updateChatFlow.flowData && containsBase64File(updateChatFlow)) { + updateChatFlow.flowData = await updateFlowDataWithFilePaths( + chatflow.id, + updateChatFlow.flowData, + orgId, + workspaceId, + subscriptionId, + appServer.usageCacheManager + ) + } + if (updateChatFlow.type || updateChatFlow.type === '') { + validateChatflowType(updateChatFlow.type) + } else { + updateChatFlow.type = chatflow.type + } + + const newDbChatflow = appServer.AppDataSource.getRepository(ChatFlow).merge(chatflow, updateChatFlow) + await _checkAndUpdateDocumentStoreUsage(newDbChatflow, chatflow.workspaceId) + const dbResponse = await appServer.AppDataSource.getRepository(ChatFlow).save(newDbChatflow) + + // Create history snapshot for update + const snapshot = await historyService.createSnapshot({ + entityType: 'CHATFLOW', + entityId: dbResponse.id, + entityData: dbResponse, + changeDescription: 'Updated', + workspaceId: dbResponse.workspaceId + }) + if (snapshot) { + // Re-fetch the chatflow to get the updated currentHistoryVersion + const updatedChatflow = await appServer.AppDataSource.getRepository(ChatFlow).findOne({ + where: { id: dbResponse.id } + }) + return updatedChatflow || dbResponse + } + + return dbResponse + } catch (error) { + throw new InternalFlowiseError( + StatusCodes.INTERNAL_SERVER_ERROR, + `Error: chatflowsService.updateChatflow - ${getErrorMessage(error)}` ) } - if (updateChatFlow.type || updateChatFlow.type === '') { - validateChatflowType(updateChatFlow.type) - } else { - updateChatFlow.type = chatflow.type - } - const newDbChatflow = appServer.AppDataSource.getRepository(ChatFlow).merge(chatflow, updateChatFlow) - await _checkAndUpdateDocumentStoreUsage(newDbChatflow, chatflow.workspaceId) - const dbResponse = await appServer.AppDataSource.getRepository(ChatFlow).save(newDbChatflow) +} - return dbResponse +// Get specific chatflow via id (PUBLIC endpoint, used when sharing chatbot link) +const getSinglePublicChatflow = async (chatflowId: string): Promise => { + try { + const appServer = getRunningExpressApp() + const dbResponse = await appServer.AppDataSource.getRepository(ChatFlow).findOneBy({ + id: chatflowId + }) + if (dbResponse && dbResponse.isPublic) { + return dbResponse + } else if (dbResponse && !dbResponse.isPublic) { + throw new InternalFlowiseError(StatusCodes.UNAUTHORIZED, `Unauthorized`) + } + throw new InternalFlowiseError(StatusCodes.NOT_FOUND, `Chatflow ${chatflowId} not found`) + } catch (error) { + if (error instanceof InternalFlowiseError && error.statusCode === StatusCodes.UNAUTHORIZED) { + throw error + } else { + throw new InternalFlowiseError( + StatusCodes.INTERNAL_SERVER_ERROR, + `Error: chatflowsService.getSinglePublicChatflow - ${getErrorMessage(error)}` + ) + } + } } // Get specific chatflow chatbotConfig via id (PUBLIC endpoint, used to retrieve config for embedded chat) @@ -414,6 +487,7 @@ export default { 
getChatflowById, saveChatflow, updateChatflow, + getSinglePublicChatflow, getSinglePublicChatbotConfig, checkIfChatflowHasChanged, getAllChatflowsCountByOrganization diff --git a/packages/server/src/services/evaluations/LLMEvaluationRunner.ts b/packages/server/src/services/evaluations/LLMEvaluationRunner.ts index 351fdad6092..8c02edf6ad3 100644 --- a/packages/server/src/services/evaluations/LLMEvaluationRunner.ts +++ b/packages/server/src/services/evaluations/LLMEvaluationRunner.ts @@ -4,6 +4,7 @@ import { RunnableSequence } from '@langchain/core/runnables' import { PromptTemplate } from '@langchain/core/prompts' import { getRunningExpressApp } from '../../utils/getRunningExpressApp' import { databaseEntities } from '../../utils' +import { getErrorMessage } from '../../errors/utils' export class LLMEvaluationRunner { private llm: any @@ -39,8 +40,11 @@ export class LLMEvaluationRunner { }) evaluationResults.push(response) } catch (error) { + console.error('LLM evaluation failed:', error) + console.error('Evaluator details:', { evaluatorId: llmEvaluatorMap[i].evaluatorId, prompt: llmEvaluatorMap[i].evaluator.prompt }) + console.error('Input data:', { question: data.input, actualOutput, expectedOutput: data.expectedOutput }) evaluationResults.push({ - error: 'error' + error: getErrorMessage(error) || 'LLM evaluation failed' }) } } @@ -65,7 +69,9 @@ export class LLMEvaluationRunner { } return await newNodeInstance.init(nodeData, undefined, options) } catch (error) { - throw new Error('Error creating LLM') + console.error('Error creating LLM:', error) + console.error('LLM config:', data.llmConfig) + throw new Error(`Error creating LLM: ${getErrorMessage(error)}`) } } } diff --git a/packages/server/src/services/history/index.ts b/packages/server/src/services/history/index.ts new file mode 100644 index 00000000000..2da5c5acb2c --- /dev/null +++ b/packages/server/src/services/history/index.ts @@ -0,0 +1,305 @@ +import { StatusCodes } from 'http-status-codes' +import { FindOptionsWhere, Repository } from 'typeorm' +import { FlowHistory, EntityType } from '../../database/entities/FlowHistory' +import { ChatFlow } from '../../database/entities/ChatFlow' +import { Assistant } from '../../database/entities/Assistant' +import { InternalFlowiseError } from '../../errors/internalFlowiseError' +import { getErrorMessage } from '../../errors/utils' +import { getRunningExpressApp } from '../../utils/getRunningExpressApp' +import logger from '../../utils/logger' + +const updateEntityVersion = async (appServer: any, entityType: EntityType, entityId: string, version: number): Promise => { + if (entityType === 'CHATFLOW') { + const chatflowRepository = appServer.AppDataSource.getRepository(ChatFlow) + await chatflowRepository.update(entityId, { currentHistoryVersion: version }) + } else if (entityType === 'ASSISTANT') { + const assistantRepository = appServer.AppDataSource.getRepository(Assistant) + await assistantRepository.update(entityId, { currentHistoryVersion: version }) + } +} + +const cleanEntityData = (data: any) => { + const { updatedDate: _updatedDate, createdDate: _createdDate, currentHistoryVersion: _currentHistoryVersion, ...cleanData } = data + return cleanData +} + +const hasEntityDataChanged = async ( + entityType: EntityType, + entityId: string, + newEntityData: any, + historyRepository: Repository +): Promise => { + try { + const lastSnapshot = await historyRepository.findOne({ + where: { entityType, entityId }, + order: { version: 'DESC' } + }) + + if (!lastSnapshot) return true + + const 
lastData = cleanEntityData(JSON.parse(lastSnapshot.snapshotData)) + const newData = cleanEntityData(JSON.parse(JSON.stringify(newEntityData))) + + const hasChanged = JSON.stringify(lastData) !== JSON.stringify(newData) + logger.debug(`Data comparison for ${entityType} ${entityId}: ${hasChanged ? 'changed' : 'no changes'}`) + return hasChanged + } catch (error) { + logger.warn(`Failed to compare entity data for ${entityType} ${entityId}: ${getErrorMessage(error)}`) + return true + } +} + +interface CreateSnapshotOptions { + entityType: EntityType + entityId: string + entityData: any + changeDescription?: string + workspaceId?: string +} + +interface GetHistoryOptions { + entityType: EntityType + entityId: string + workspaceId?: string + limit?: number + offset?: number +} + +interface RestoreSnapshotOptions { + historyId: string + workspaceId?: string +} + +const createSnapshot = async ({ + entityType, + entityId, + entityData, + changeDescription, + workspaceId +}: CreateSnapshotOptions): Promise => { + try { + const appServer = getRunningExpressApp() + const historyRepository = appServer.AppDataSource.getRepository(FlowHistory) + + const dataHasChanged = await hasEntityDataChanged(entityType, entityId, entityData, historyRepository) + if (!dataHasChanged) { + logger.debug(`No changes detected for ${entityType} ${entityId}, skipping snapshot creation`) + return null + } + + const lastSnapshot = await historyRepository.findOne({ + where: { entityType, entityId }, + order: { version: 'DESC' } + }) + + const nextVersion = lastSnapshot ? lastSnapshot.version + 1 : 1 + + const snapshot = historyRepository.create({ + entityType, + entityId, + snapshotData: JSON.stringify(entityData), + changeDescription, + version: nextVersion, + workspaceId + }) + + const savedSnapshot = await historyRepository.save(snapshot) + await updateEntityVersion(appServer, entityType, entityId, nextVersion) + + logger.info(`Created history snapshot for ${entityType} ${entityId}, version ${nextVersion}`) + await cleanupOldSnapshots(entityType, entityId, 50) + + return savedSnapshot + } catch (error) { + logger.error(`Failed to create history snapshot for ${entityType} ${entityId}: ${getErrorMessage(error)}`) + return null + } +} + +const getHistory = async ({ + entityType, + entityId, + workspaceId, + limit = 20, + offset = 0 +}: GetHistoryOptions): Promise<{ data: FlowHistory[]; total: number }> => { + try { + const appServer = getRunningExpressApp() + const historyRepository = appServer.AppDataSource.getRepository(FlowHistory) + + const whereConditions: FindOptionsWhere = { entityType, entityId } + if (workspaceId) whereConditions.workspaceId = workspaceId + + const [data, total] = await historyRepository.findAndCount({ + where: whereConditions, + order: { version: 'DESC' }, + take: limit, + skip: offset + }) + + return { data, total } + } catch (error) { + throw new InternalFlowiseError(StatusCodes.INTERNAL_SERVER_ERROR, `Error: historyService.getHistory - ${getErrorMessage(error)}`) + } +} + +const getSnapshotById = async (historyId: string, workspaceId?: string): Promise => { + try { + const appServer = getRunningExpressApp() + const historyRepository = appServer.AppDataSource.getRepository(FlowHistory) + + const whereConditions: FindOptionsWhere = { id: historyId } + if (workspaceId) whereConditions.workspaceId = workspaceId + + const snapshot = await historyRepository.findOne({ where: whereConditions }) + if (!snapshot) { + throw new InternalFlowiseError(StatusCodes.NOT_FOUND, `History snapshot ${historyId} not 
found`) + } + + return snapshot + } catch (error) { + throw new InternalFlowiseError( + StatusCodes.INTERNAL_SERVER_ERROR, + `Error: historyService.getSnapshotById - ${getErrorMessage(error)}` + ) + } +} + +const restoreChatflow = async (appServer: any, snapshot: FlowHistory, snapshotData: any): Promise => { + const chatflowRepository = appServer.AppDataSource.getRepository(ChatFlow) + const existing = await chatflowRepository.findOne({ where: { id: snapshot.entityId } }) + + if (!existing) { + throw new InternalFlowiseError(StatusCodes.NOT_FOUND, `ChatFlow ${snapshot.entityId} not found`) + } + + const updated = chatflowRepository.merge(existing, { + ...snapshotData, + currentHistoryVersion: snapshot.version + }) + + return await chatflowRepository.save(updated) +} + +const restoreAssistant = async (appServer: any, snapshot: FlowHistory, snapshotData: any): Promise => { + const assistantRepository = appServer.AppDataSource.getRepository(Assistant) + const existing = await assistantRepository.findOne({ where: { id: snapshot.entityId } }) + + if (!existing) { + throw new InternalFlowiseError(StatusCodes.NOT_FOUND, `Assistant ${snapshot.entityId} not found`) + } + + const updated = assistantRepository.merge(existing, { + details: snapshotData.details, + credential: snapshotData.credential, + iconSrc: snapshotData.iconSrc, + type: snapshotData.type + }) + + return await assistantRepository.save(updated) +} + +const restoreSnapshot = async ({ historyId, workspaceId }: RestoreSnapshotOptions): Promise => { + try { + const appServer = getRunningExpressApp() + const snapshot = await getSnapshotById(historyId, workspaceId) + const snapshotData = JSON.parse(snapshot.snapshotData) + + const restoredEntity = + snapshot.entityType === 'CHATFLOW' + ? await restoreChatflow(appServer, snapshot, snapshotData) + : await restoreAssistant(appServer, snapshot, snapshotData) + + await createSnapshot({ + entityType: snapshot.entityType, + entityId: snapshot.entityId, + entityData: restoredEntity, + changeDescription: `Restored from version ${snapshot.version}`, + workspaceId + }) + + logger.info(`Restored ${snapshot.entityType} ${snapshot.entityId} from version ${snapshot.version}`) + return restoredEntity + } catch (error) { + throw new InternalFlowiseError( + StatusCodes.INTERNAL_SERVER_ERROR, + `Error: historyService.restoreSnapshot - ${getErrorMessage(error)}` + ) + } +} + +const deleteSnapshot = async (historyId: string, workspaceId?: string): Promise => { + try { + const appServer = getRunningExpressApp() + const historyRepository = appServer.AppDataSource.getRepository(FlowHistory) + + const whereConditions: FindOptionsWhere = { id: historyId } + if (workspaceId) whereConditions.workspaceId = workspaceId + + const result = await historyRepository.delete(whereConditions) + if (result.affected === 0) { + throw new InternalFlowiseError(StatusCodes.NOT_FOUND, `History snapshot ${historyId} not found`) + } + + logger.info(`Deleted history snapshot ${historyId}`) + } catch (error) { + throw new InternalFlowiseError( + StatusCodes.INTERNAL_SERVER_ERROR, + `Error: historyService.deleteSnapshot - ${getErrorMessage(error)}` + ) + } +} + +const cleanupOldSnapshots = async (entityType: EntityType, entityId: string, keepCount: number = 50): Promise => { + try { + const appServer = getRunningExpressApp() + const historyRepository = appServer.AppDataSource.getRepository(FlowHistory) + + const allSnapshots = await historyRepository.find({ + where: { entityType, entityId }, + order: { version: 'DESC' } + }) + + if 
(allSnapshots.length > keepCount) { + const idsToDelete = allSnapshots.slice(keepCount).map((s) => s.id) + await historyRepository.delete(idsToDelete) + logger.info(`Cleaned up ${idsToDelete.length} old snapshots for ${entityType} ${entityId}`) + } + } catch (error) { + logger.error(`Error cleaning up old snapshots: ${getErrorMessage(error)}`) + } +} + +const getSnapshotComparison = async ( + historyId1: string, + historyId2: string, + workspaceId?: string +): Promise<{ snapshot1: FlowHistory; snapshot2: FlowHistory }> => { + try { + const [snapshot1, snapshot2] = await Promise.all([ + getSnapshotById(historyId1, workspaceId), + getSnapshotById(historyId2, workspaceId) + ]) + + if (snapshot1.entityType !== snapshot2.entityType || snapshot1.entityId !== snapshot2.entityId) { + throw new InternalFlowiseError(StatusCodes.BAD_REQUEST, 'Cannot compare snapshots from different entities') + } + + return { snapshot1, snapshot2 } + } catch (error) { + throw new InternalFlowiseError( + StatusCodes.INTERNAL_SERVER_ERROR, + `Error: historyService.getSnapshotComparison - ${getErrorMessage(error)}` + ) + } +} + +export default { + createSnapshot, + getHistory, + getSnapshotById, + restoreSnapshot, + deleteSnapshot, + cleanupOldSnapshots, + getSnapshotComparison +} diff --git a/packages/server/src/utils/SSEStreamer.ts b/packages/server/src/utils/SSEStreamer.ts index 2b950579cc1..61c03f4801a 100644 --- a/packages/server/src/utils/SSEStreamer.ts +++ b/packages/server/src/utils/SSEStreamer.ts @@ -8,10 +8,20 @@ type Client = { response: Response // optional property with default value started?: boolean + // flag to indicate client is pending removal due to active MCP connections + pendingRemoval?: boolean } export class SSEStreamer implements IServerSideEventStreamer { clients: { [id: string]: Client } = {} + private activeMcpConnections: Map< + string, + { + toolName: string + startTime: number + status: 'active' | 'completing' + } + > = new Map() addExternalClient(chatId: string, res: Response) { this.clients[chatId] = { clientType: 'EXTERNAL', response: res, started: false } @@ -21,7 +31,52 @@ export class SSEStreamer implements IServerSideEventStreamer { this.clients[chatId] = { clientType: 'INTERNAL', response: res, started: false } } + addMcpConnection(chatId: string, toolName?: string) { + const connectionId = `${chatId}:${toolName || 'unknown'}` + this.activeMcpConnections.set(connectionId, { + toolName: toolName || 'unknown', + startTime: Date.now(), + status: 'active' + }) + } + + markMcpConnectionCompleting(chatId: string, toolName?: string) { + const connectionId = `${chatId}:${toolName || 'unknown'}` + const connection = this.activeMcpConnections.get(connectionId) + if (connection) { + connection.status = 'completing' + } + } + + removeMcpConnection(chatId: string, toolName?: string) { + const connectionId = `${chatId}:${toolName || 'unknown'}` + this.activeMcpConnections.delete(connectionId) + + const hasActiveConnections = Array.from(this.activeMcpConnections.keys()).some((key) => key.startsWith(`${chatId}:`)) + + if (!hasActiveConnections && this.clients[chatId]?.pendingRemoval) { + this.forceRemoveClient(chatId) + } + } + + hasMcpConnections(chatId: string): boolean { + return Array.from(this.activeMcpConnections.keys()).some((key) => key.startsWith(`${chatId}:`)) + } + removeClient(chatId: string) { + const client = this.clients[chatId] + if (client) { + // Check if there are active MCP connections for this chatId + if (this.hasMcpConnections(chatId)) { + client.pendingRemoval = true + 
return + } + + this.forceRemoveClient(chatId) + } + } + + forceRemoveClient(chatId: string) { const client = this.clients[chatId] if (client) { const clientResponse = { @@ -31,6 +86,8 @@ export class SSEStreamer implements IServerSideEventStreamer { client.response.write('message\ndata:' + JSON.stringify(clientResponse) + '\n\n') client.response.end() delete this.clients[chatId] + // Also clean up any remaining MCP connections + this.activeMcpConnections.delete(chatId) } } diff --git a/packages/server/src/utils/buildChatflow.ts b/packages/server/src/utils/buildChatflow.ts index be0bb6a105a..622f6a6adcd 100644 --- a/packages/server/src/utils/buildChatflow.ts +++ b/packages/server/src/utils/buildChatflow.ts @@ -642,6 +642,7 @@ export const executeFlow = async ({ result.question = incomingInput.question result.chatId = chatId result.chatMessageId = chatMessage?.id + result.timestamp = new Date().toISOString() if (sessionId) result.sessionId = sessionId if (memoryType) result.memoryType = memoryType if (agentReasoning?.length) result.agentReasoning = agentReasoning @@ -823,6 +824,7 @@ export const executeFlow = async ({ result.chatMessageId = chatMessage?.id result.followUpPrompts = JSON.stringify(apiMessage.followUpPrompts) result.isStreamValid = isStreamValid + result.timestamp = new Date().toISOString() if (sessionId) result.sessionId = sessionId if (memoryType) result.memoryType = memoryType diff --git a/packages/ui/src/api/history.js b/packages/ui/src/api/history.js new file mode 100644 index 00000000000..03b049a9c45 --- /dev/null +++ b/packages/ui/src/api/history.js @@ -0,0 +1,19 @@ +import client from './client' + +const getHistory = (entityType, entityId, params) => client.get(`/history/${entityType}/${entityId}`, { params }) + +const getSnapshotById = (historyId) => client.get(`/history/snapshot/${historyId}`) + +const restoreSnapshot = (historyId) => client.post(`/history/restore/${historyId}`) + +const deleteSnapshot = (historyId) => client.delete(`/history/snapshot/${historyId}`) + +const getSnapshotComparison = (historyId1, historyId2) => client.get(`/history/compare/${historyId1}/${historyId2}`) + +export default { + getHistory, + getSnapshotById, + restoreSnapshot, + deleteSnapshot, + getSnapshotComparison +} diff --git a/packages/ui/src/ui-component/button/HistoryButton.jsx b/packages/ui/src/ui-component/button/HistoryButton.jsx new file mode 100644 index 00000000000..44a84d34044 --- /dev/null +++ b/packages/ui/src/ui-component/button/HistoryButton.jsx @@ -0,0 +1,61 @@ +import { useState } from 'react' +import PropTypes from 'prop-types' +import { IconButton, Tooltip } from '@mui/material' +import { IconHistory } from '@tabler/icons-react' + +// project imports +import HistoryDialog from '@/ui-component/dialog/HistoryDialog' + +const HistoryButton = ({ entityType, entityId, entityName, onRestore, disabled = false, size = 'medium', color = 'default' }) => { + const [showHistory, setShowHistory] = useState(false) + + const handleOpenHistory = () => { + if (!disabled && entityId) { + setShowHistory(true) + } + } + + const handleCloseHistory = () => { + setShowHistory(false) + } + + const handleRestore = (historyItem) => { + onRestore?.(historyItem) + // Optionally reload the current flow/assistant data here + } + + return ( + <> + + + + + + + + + + + ) +} + +HistoryButton.propTypes = { + entityType: PropTypes.string.isRequired, + entityId: PropTypes.string.isRequired, + entityName: PropTypes.string.isRequired, + onRestore: PropTypes.func, + disabled: PropTypes.bool, + size: 
PropTypes.string, + color: PropTypes.string +} + +export default HistoryButton diff --git a/packages/ui/src/ui-component/dialog/HistoryDialog.jsx b/packages/ui/src/ui-component/dialog/HistoryDialog.jsx new file mode 100644 index 00000000000..d5eb0daca09 --- /dev/null +++ b/packages/ui/src/ui-component/dialog/HistoryDialog.jsx @@ -0,0 +1,302 @@ +import { useState, useEffect, useCallback } from 'react' +import { createPortal } from 'react-dom' +import { + Dialog, + DialogContent, + DialogTitle, + Typography, + Box, + List, + ListItem, + ListItemText, + ListItemSecondaryAction, + IconButton, + Button, + Chip, + Paper, + Skeleton, + Alert, + DialogActions, + Tooltip +} from '@mui/material' +import { useTheme } from '@mui/material/styles' + +// icons +import { IconHistory, IconRestore, IconTrash, IconEye, IconX } from '@tabler/icons-react' + +// project imports +import useConfirm from '@/hooks/useConfirm' +import useNotifier from '@/utils/useNotifier' +import { useDispatch } from 'react-redux' +import { enqueueSnackbar as enqueueSnackbarAction } from '@/store/actions' + +// API +import historyApi from '@/api/history' + +// utils +import moment from 'moment' + +const HistoryDialog = ({ show, dialogProps, onCancel, onRestore }) => { + const theme = useTheme() + const portalElement = document.getElementById('portal') + const { confirm } = useConfirm() + const dispatch = useDispatch() + useNotifier() // Side effect hook + + const enqueueSnackbar = useCallback((...args) => dispatch(enqueueSnackbarAction(...args)), [dispatch]) + + const [historyItems, setHistoryItems] = useState([]) + const [loading, setLoading] = useState(false) + const [selectedSnapshot, setSelectedSnapshot] = useState(null) + const [currentPage, setCurrentPage] = useState(1) + const [totalItems, setTotalItems] = useState(0) + const [latestVersion, setLatestVersion] = useState(null) + const [reloadTrigger, setReloadTrigger] = useState(0) + + const { entityType, entityId, entityName, currentVersion } = dialogProps || {} + const itemsPerPage = 10 + + // Load history when dialog opens + useEffect(() => { + if (!show || !entityType || !entityId) return + + const loadHistory = async () => { + try { + setLoading(true) + const params = { page: currentPage, limit: itemsPerPage } + const response = await historyApi.getHistory(entityType, entityId, params) + const historyData = response.data.data || [] + + setHistoryItems(historyData) + setTotalItems(response.data.total) + + if (latestVersion === null && currentPage === 1) { + // Set latest version only when on first page + setLatestVersion(historyData[0]?.version || null) + } + } catch (error) { + enqueueSnackbar({ + message: `Failed to load history: ${error.message}`, + options: { variant: 'error' } + }) + } finally { + setLoading(false) + } + } + + loadHistory() + }, [show, entityType, entityId, currentPage, reloadTrigger, enqueueSnackbar]) + + const handleRestore = async (historyItem) => { + const confirmed = await confirm({ + title: 'Restore Version', + description: `Are you sure you want to restore "${entityName}" to version ${historyItem.version}? 
This will create a new version with the restored data.`, + confirmButtonName: 'Restore', + cancelButtonName: 'Cancel' + }) + + if (confirmed) { + try { + const response = await historyApi.restoreSnapshot(historyItem.id) + enqueueSnackbar({ + message: `Successfully restored to version ${historyItem.version}`, + options: { variant: 'success' } + }) + const restoreData = { ...response.data, version: historyItem.version } + onRestore?.(restoreData) + onCancel() + } catch (error) { + console.error('Restore error:', error) + enqueueSnackbar({ + message: `Failed to restore: ${error.message}`, + options: { variant: 'error' } + }) + } + } + } + + const handleDelete = async (historyItem) => { + const confirmed = await confirm({ + title: 'Delete Version', + description: `Are you sure you want to delete version ${historyItem.version}? This action cannot be undone.`, + confirmButtonName: 'Delete', + cancelButtonName: 'Cancel' + }) + + if (confirmed) { + try { + await historyApi.deleteSnapshot(historyItem.id) + enqueueSnackbar({ + message: `Successfully deleted version ${historyItem.version}`, + options: { variant: 'success' } + }) + + // Force reload by triggering useEffect + setHistoryItems([]) + setLatestVersion(null) + setReloadTrigger((prev) => prev + 1) + } catch (error) { + console.error('Delete error:', error) + enqueueSnackbar({ + message: `Failed to delete: ${error.message}`, + options: { variant: 'error' } + }) + } + } + } + + const handleViewSnapshot = async (historyItem) => { + try { + const response = await historyApi.getSnapshotById(historyItem.id) + setSelectedSnapshot(response.data) + } catch (error) { + enqueueSnackbar({ + message: `Failed to load snapshot: ${error.message}`, + options: { variant: 'error' } + }) + } + } + + const formatDate = (dateString) => moment(dateString).fromNow() + const getVersionChipColor = (version, isCurrent) => (isCurrent ? 'success' : version === 1 ? 'secondary' : 'default') + + const component = show ? ( + + + + Version History - {entityName} + + + + {loading ? ( + + {[1, 2, 3].map((item) => ( + + ))} + + ) : historyItems.length === 0 ? ( + No version history found. History is automatically created when you save changes. 
+ ) : ( + + + {historyItems.map((item) => { + const isCurrent = item.version === currentVersion + const isLatest = item.version === latestVersion + return ( + + + + + {isCurrent && } + {isLatest && !isCurrent && } + + {item.changeDescription || 'No description'} + + + } + secondary={ + + {formatDate(item.createdDate)} + + } + /> + + + + handleViewSnapshot(item)}> + + + + + {!isCurrent && ( + + handleRestore(item)} color='primary'> + + + + )} + + + handleDelete(item)} color='error'> + + + + + + + + ) + })} + + + {totalItems > itemsPerPage && ( + + + + Page {currentPage} of {Math.ceil(totalItems / itemsPerPage)} + + + + )} + + )} + + + + + + + {selectedSnapshot && ( + setSelectedSnapshot(null)}> + + + Version {selectedSnapshot.version} Snapshot + setSelectedSnapshot(null)}> + + + + + + Change Description: {selectedSnapshot.changeDescription || 'No description'} + + + Created: {formatDate(selectedSnapshot.createdDate)} + + + + + {JSON.stringify(JSON.parse(selectedSnapshot.snapshotData), null, 2)} + + + + + )} + + ) : null + + return createPortal(component, portalElement) +} + +export default HistoryDialog diff --git a/packages/ui/src/views/agentflowsv2/Canvas.jsx b/packages/ui/src/views/agentflowsv2/Canvas.jsx index 3a724666f94..99c4c6c6de7 100644 --- a/packages/ui/src/views/agentflowsv2/Canvas.jsx +++ b/packages/ui/src/views/agentflowsv2/Canvas.jsx @@ -240,6 +240,44 @@ const AgentflowCanvas = () => { } } + const handleFlowReload = (restoredChatflow) => { + // Directly update the canvas with the restored flow data + if (restoredChatflow && restoredChatflow.flowData) { + try { + const restoredFlow = JSON.parse(restoredChatflow.flowData) + + // Force React Flow to recognize the change by creating new array references + const newNodes = [...(restoredFlow.nodes || [])] + const newEdges = [...(restoredFlow.edges || [])] + + // Clear first, then set new data to force re-render + setNodes([]) + setEdges([]) + + // Use setTimeout to ensure React processes the clear operation first + setTimeout(() => { + setNodes(newNodes) + setEdges(newEdges) + dispatch({ type: SET_CHATFLOW, chatflow: restoredChatflow }) + dispatch({ type: REMOVE_DIRTY }) + + // Force React Flow to update its viewport after restore + if (reactFlowInstance) { + setTimeout(() => { + reactFlowInstance.fitView({ padding: 0.1 }) + }, 100) + } + }, 10) + } catch (error) { + console.error('Error parsing restored flow data:', error) + // Fallback to refetching from server + if (restoredChatflow.id) { + getSpecificChatflowApi.request(restoredChatflow.id) + } + } + } + } + // eslint-disable-next-line const onNodeClick = useCallback((event, clickedNode) => { setSelectedNode(clickedNode) @@ -694,6 +732,7 @@ const AgentflowCanvas = () => { handleSaveFlow={handleSaveFlow} handleDeleteFlow={handleDeleteFlow} handleLoadFlow={handleLoadFlow} + onFlowReload={handleFlowReload} isAgentCanvas={true} isAgentflowV2={true} /> diff --git a/packages/ui/src/views/assistants/custom/CustomAssistantConfigurePreview.jsx b/packages/ui/src/views/assistants/custom/CustomAssistantConfigurePreview.jsx index 5a45d247d88..f16e64c2460 100644 --- a/packages/ui/src/views/assistants/custom/CustomAssistantConfigurePreview.jsx +++ b/packages/ui/src/views/assistants/custom/CustomAssistantConfigurePreview.jsx @@ -20,7 +20,8 @@ import { IconX, IconTrash, IconWand, - IconArrowsMaximize + IconArrowsMaximize, + IconHistory } from '@tabler/icons-react' // Project import @@ -42,6 +43,7 @@ import PromptGeneratorDialog from '@/ui-component/dialog/PromptGeneratorDialog' import { 
Available } from '@/ui-component/rbac/available' import ExpandTextDialog from '@/ui-component/dialog/ExpandTextDialog' import { SwitchInput } from '@/ui-component/switch/Switch' +import HistoryDialog from '@/ui-component/dialog/HistoryDialog' // API import assistantsApi from '@/api/assistants' @@ -115,6 +117,8 @@ const CustomAssistantConfigurePreview = () => { const [assistantPromptGeneratorDialogProps, setAssistantPromptGeneratorDialogProps] = useState({}) const [showExpandDialog, setShowExpandDialog] = useState(false) const [expandDialogProps, setExpandDialogProps] = useState({}) + const [historyDialogOpen, setHistoryDialogOpen] = useState(false) + const [historyDialogProps, setHistoryDialogProps] = useState({}) const [loading, setLoading] = useState(false) const [loadingAssistant, setLoadingAssistant] = useState(true) @@ -664,6 +668,55 @@ const CustomAssistantConfigurePreview = () => { setAPIDialogOpen(true) } + const onHistoryClick = () => { + if (selectedCustomAssistant?.id) { + setHistoryDialogProps({ + entityType: 'ASSISTANT', + entityId: selectedCustomAssistant.id, + entityName: selectedCustomAssistant.name || selectedCustomAssistant.details?.name || 'Untitled', + currentVersion: selectedCustomAssistant.currentHistoryVersion + }) + setHistoryDialogOpen(true) + } + } + + const onHistoryRestore = async (restoredData) => { + try { + if (restoredData && restoredData.entity) { + enqueueSnackbar({ + message: `Successfully restored to version ${restoredData.version || 'previous'}. Refreshing...`, + options: { + key: new Date().getTime() + Math.random(), + variant: 'success' + } + }) + + // Give the user a moment to see the success message, then reload + setTimeout(() => { + window.location.reload() + }, 500) + } else { + enqueueSnackbar({ + message: 'Successfully restored. Please refresh the page to see changes.', + options: { + key: new Date().getTime() + Math.random(), + variant: 'success', + action: (_key) => ( + + ) + } + }) + } + } catch (error) { + enqueueSnackbar({ + message: `Failed to restore: ${error.message}`, + options: { variant: 'error' } + }) + } + } + const onDocStoreItemSelected = (docStoreIds) => { const docStoresIds = JSON.parse(docStoreIds) const newSelectedDocumentStores = [] @@ -752,7 +805,12 @@ const CustomAssistantConfigurePreview = () => { setLoadingAssistant(false) try { const assistantDetails = JSON.parse(getSpecificAssistantApi.data.details) - setSelectedCustomAssistant(assistantDetails) + // Set the full assistant entity (not just the details) + setSelectedCustomAssistant({ + ...getSpecificAssistantApi.data, + name: assistantDetails.name, + details: assistantDetails + }) if (assistantDetails.chatModel) { setSelectedChatModel(assistantDetails.chatModel) @@ -867,7 +925,7 @@ const CustomAssistantConfigurePreview = () => { - {selectedCustomAssistant?.name ?? ''} + {selectedCustomAssistant?.name || selectedCustomAssistant?.details?.name || ''}
@@ -915,6 +973,30 @@ const CustomAssistantConfigurePreview = () => { + {selectedCustomAssistant?.id && ( + + + + + + + + )} {customAssistantFlowId && !loadingAssistant && ( { setShowExpandDialog(false) }} > + setHistoryDialogOpen(false)} + onRestore={onHistoryRestore} + /> ) diff --git a/packages/ui/src/views/assistants/openai/AssistantDialog.jsx b/packages/ui/src/views/assistants/openai/AssistantDialog.jsx index 7b2b3041cc9..2300d98bb5c 100644 --- a/packages/ui/src/views/assistants/openai/AssistantDialog.jsx +++ b/packages/ui/src/views/assistants/openai/AssistantDialog.jsx @@ -29,6 +29,7 @@ import { File } from '@/ui-component/file/File' import { BackdropLoader } from '@/ui-component/loading/BackdropLoader' import DeleteConfirmDialog from './DeleteConfirmDialog' import AssistantVectorStoreDialog from './AssistantVectorStoreDialog' +import HistoryButton from '@/ui-component/button/HistoryButton' import { StyledPermissionButton } from '@/ui-component/button/RBACButtons' // Icons @@ -1035,6 +1036,28 @@ const AssistantDialog = ({ show, dialogProps, onCancel, onConfirm, setError }) = + {dialogProps.type === 'EDIT' && assistantId && ( + { + enqueueSnackbar({ + message: `Successfully restored to version ${historyItem.version}. Please reload the assistant to see changes.`, + options: { + key: new Date().getTime() + Math.random(), + variant: 'success', + action: (key) => ( + + ) + } + }) + }} + /> + )} {dialogProps.type === 'EDIT' && ( { +const CanvasHeader = ({ + chatflow, + isAgentCanvas, + isAgentflowV2, + handleSaveFlow, + handleDeleteFlow, + handleLoadFlow, + onFlowReload: _onFlowReload +}) => { const theme = useTheme() const dispatch = useDispatch() const navigate = useNavigate() @@ -58,6 +67,8 @@ const CanvasHeader = ({ chatflow, isAgentCanvas, isAgentflowV2, handleSaveFlow, const [exportAsTemplateDialogOpen, setExportAsTemplateDialogOpen] = useState(false) const [exportAsTemplateDialogProps, setExportAsTemplateDialogProps] = useState({}) + const [historyDialogOpen, setHistoryDialogOpen] = useState(false) + const [historyDialogProps, setHistoryDialogProps] = useState({}) const enqueueSnackbar = (...args) => dispatch(enqueueSnackbarAction(...args)) const closeSnackbar = (...args) => dispatch(closeSnackbarAction(...args)) @@ -218,6 +229,62 @@ const CanvasHeader = ({ chatflow, isAgentCanvas, isAgentflowV2, handleSaveFlow, else setFlowDialogOpen(true) } + const onHistoryClick = () => { + if (chatflow?.id) { + setHistoryDialogProps({ + entityType: 'CHATFLOW', // All canvas types use CHATFLOW entity type + entityId: chatflow.id, + entityName: chatflow.name || 'Untitled', + currentVersion: chatflow.currentHistoryVersion + }) + setHistoryDialogOpen(true) + } + } + + const onHistoryRestore = async (restoredData) => { + try { + // Update the chatflow data immediately after restore + if (restoredData && restoredData.entity) { + // Always do a page reload for now to ensure proper canvas update + enqueueSnackbar({ + message: `Successfully restored to version ${restoredData.version || 'previous'}. Refreshing...`, + options: { + key: new Date().getTime() + Math.random(), + variant: 'success' + } + }) + + // Give the user a moment to see the success message, then reload + setTimeout(() => { + window.location.reload() + }, 500) + } else { + // Fallback to refresh if no data returned + enqueueSnackbar({ + message: 'Successfully restored. 
Please refresh the page to see changes.', + options: { + key: new Date().getTime() + Math.random(), + variant: 'success', + action: (_key) => ( + + ) + } + }) + } + } catch (error) { + console.error('Error in onHistoryRestore:', error) + enqueueSnackbar({ + message: `Error during restore: ${error.message}`, + options: { + key: new Date().getTime() + Math.random(), + variant: 'error' + } + }) + } + } + const onConfirmSaveName = (flowName) => { setFlowDialogOpen(false) setSavePermission(isAgentCanvas ? 'agentflows:update' : 'chatflows:update') @@ -432,6 +499,28 @@ const CanvasHeader = ({ chatflow, isAgentCanvas, isAgentflowV2, handleSaveFlow, + {chatflow?.id && ( + + + + + + )} setChatflowConfigurationDialogOpen(false)} isAgentCanvas={isAgentCanvas} /> + setHistoryDialogOpen(false)} + onRestore={onHistoryRestore} + /> ) } @@ -507,6 +602,7 @@ CanvasHeader.propTypes = { handleSaveFlow: PropTypes.func, handleDeleteFlow: PropTypes.func, handleLoadFlow: PropTypes.func, + onFlowReload: PropTypes.func, isAgentCanvas: PropTypes.bool, isAgentflowV2: PropTypes.bool } diff --git a/packages/ui/src/views/canvas/index.jsx b/packages/ui/src/views/canvas/index.jsx index ebfbd0506fa..142dbbfb642 100644 --- a/packages/ui/src/views/canvas/index.jsx +++ b/packages/ui/src/views/canvas/index.jsx @@ -243,6 +243,45 @@ const Canvas = () => { } } + const handleFlowReload = (restoredChatflow) => { + // Directly update the canvas with the restored flow data + if (restoredChatflow && restoredChatflow.flowData) { + try { + const restoredFlow = JSON.parse(restoredChatflow.flowData) + + // Force React Flow to recognize the change by creating new array references + const newNodes = [...(restoredFlow.nodes || [])] + const newEdges = [...(restoredFlow.edges || [])] + + // Clear first, then set new data to force re-render + setNodes([]) + setEdges([]) + + // Use setTimeout to ensure React processes the clear operation first + setTimeout(() => { + setNodes(newNodes) + setEdges(newEdges) + setLasUpdatedDateTime(restoredChatflow.updatedDate) + dispatch({ type: SET_CHATFLOW, chatflow: restoredChatflow }) + dispatch({ type: REMOVE_DIRTY }) + + // Force React Flow to update its viewport after restore + if (reactFlowInstance) { + setTimeout(() => { + reactFlowInstance.fitView({ padding: 0.1 }) + }, 100) + } + }, 10) + } catch (error) { + console.error('Error parsing restored flow data:', error) + // Fallback to refetching from server + if (restoredChatflow.id) { + getSpecificChatflowApi.request(restoredChatflow.id) + } + } + } + } + // eslint-disable-next-line const onNodeClick = useCallback((event, clickedNode) => { setSelectedNode(clickedNode) @@ -575,6 +614,7 @@ const Canvas = () => { handleSaveFlow={handleSaveFlow} handleDeleteFlow={handleDeleteFlow} handleLoadFlow={handleLoadFlow} + onFlowReload={handleFlowReload} isAgentCanvas={isAgentCanvas} /> diff --git a/retriever-implementation-analysis.md b/retriever-implementation-analysis.md new file mode 100644 index 00000000000..72dbf2bf2b4 --- /dev/null +++ b/retriever-implementation-analysis.md @@ -0,0 +1,127 @@ +# EnsembleRetriever and BM25Retriever Implementation Analysis + +## Summary +Investigation into implementing EnsembleRetriever and BM25Retriever nodes for Flowise AgentFlow v2. 
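+
+For orientation, both retrievers already ship with LangChain JS; a minimal usage sketch (based on the guides linked below, so option names may differ slightly between versions) looks like this:
+
+```typescript
+import { Document } from '@langchain/core/documents'
+import { BM25Retriever } from '@langchain/community/retrievers/bm25'
+import { EnsembleRetriever } from 'langchain/retrievers/ensemble'
+
+const docs = [
+    new Document({ pageContent: 'Flowise is a visual tool for building LLM apps' }),
+    new Document({ pageContent: 'EnsembleRetriever combines retrievers with Reciprocal Rank Fusion' })
+]
+
+// Keyword-based retrieval (Okapi BM25) over an in-memory document set
+const bm25 = BM25Retriever.fromDocuments(docs, { k: 2 })
+
+// In a real flow the second retriever would typically be a vector store retriever;
+// a second BM25 instance keeps this sketch self-contained
+const other = BM25Retriever.fromDocuments(docs, { k: 2 })
+
+// Weighted Reciprocal Rank Fusion across both retrievers
+const ensemble = new EnsembleRetriever({
+    retrievers: [bm25, other],
+    weights: [0.7, 0.3]
+})
+
+const results = await ensemble.invoke('how are results combined?')
+console.log(results.map((d) => d.pageContent))
+```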
+
+## Current Status: ✅ Ready to Implement
+- **EnsembleRetriever**: ❌ Not implemented in codebase
+- **BM25Retriever**: ❌ Not implemented in codebase
+- **LangChain JS Support**: ✅ Both fully supported
+
+## LangChain JS Documentation
+
+### EnsembleRetriever
+- **Main Guide**: https://js.langchain.com/docs/how_to/ensemble_retriever/
+- **API Reference**: https://v03.api.js.langchain.com/classes/langchain.retrievers_ensemble.EnsembleRetriever.html
+- **Import**: `import { EnsembleRetriever } from "langchain/retrievers/ensemble"`
+- **Features**: Reciprocal Rank Fusion, weighted retriever combination, hybrid search
+
+### BM25Retriever
+- **Main Guide**: https://js.langchain.com/docs/integrations/retrievers/bm25/
+- **Package**: `@langchain/community/retrievers/bm25`
+- **Features**: Okapi BM25 algorithm, keyword-based ranking
+
+## Implementation Decision
+
+**Recommendation**: Create dedicated nodes (not CustomFunction) because:
+1. **Type Safety**: Proper TypeScript interfaces and validation
+2. **UI Integration**: Better visual flow builder integration
+3. **Reusability**: Easy reuse across different flows
+4. **Maintenance**: Easier to maintain and update
+5. **LangChain Integration**: Direct ecosystem compatibility
+
+## Node Type: ChatFlow vs AgentFlow
+
+### ChatFlow Nodes (Standard Retrievers)
+- **Category**: "Retrievers"
+- **Pattern**: Standard LangChain component wrappers
+- **Execution**: `init()` method for retriever creation
+- **State**: Stateless, immediate response
+- **Use Case**: Core retrieval components
+
+### AgentFlow Nodes (Flow Orchestration)
+- **Category**: "Agent Flows"
+- **Pattern**: State-aware with runtime context
+- **Execution**: `run()` method with streaming
+- **State**: Stateful with flow state management
+- **Use Case**: Visual workflow orchestration
+
+**Decision**: Implement as **standard Retriever nodes** (ChatFlow style) since they are core retrieval components.
+
+## Implementation Plan
+
+### 1. BM25Retriever Node
+**Location**: `packages/components/nodes/retrievers/BM25Retriever/BM25Retriever.ts`
+
+**Inputs**:
+- Documents array or Document Store selection
+- Top K results
+- Query (optional, uses input if not specified)
+
+**Outputs**:
+- Retriever object
+- Document array
+- Text concatenation
+
+### 2. EnsembleRetriever Node
+**Location**: `packages/components/nodes/retrievers/EnsembleRetriever/EnsembleRetriever.ts`
+
+**Inputs**:
+- Multiple BaseRetriever inputs
+- Weights array (optional, defaults to equal)
+- Top K results
+- Query (optional)
+
+**Outputs**:
+- Retriever object
+- Document array
+- Text concatenation
+
+## Existing Retriever Pattern Reference
+
+**Base Pattern** (from RRFRetriever):
+```typescript
+class RetrieverName_Retrievers implements INode {
+    label: string
+    name: string
+    type: string
+    category: string = 'Retrievers'
+    baseClasses: string[] = [this.type, 'BaseRetriever']
+
+    inputs: INodeParams[] = [
+        // Configuration inputs (documents/retrievers, top K, optional query, ...)
+    ]
+
+    outputs: INodeOutputsValue[] = [
+        {
+            label: 'Retriever',
+            name: 'retriever',
+            baseClasses: this.baseClasses
+        },
+        {
+            label: 'Document',
+            name: 'document',
+            baseClasses: ['Document', 'json']
+        },
+        {
+            label: 'Text',
+            name: 'text',
+            baseClasses: ['string', 'json']
+        }
+    ]
+
+    async init(nodeData: INodeData, input: string): Promise<any> {
+        // Construct the underlying LangChain retriever from nodeData.inputs here,
+        // then return whichever output the user selected on the node
+        const query = (nodeData.inputs?.query as string) || input
+        const output = nodeData.outputs?.output as string
+
+        if (output === 'retriever') return retriever
+        else if (output === 'document') return await retriever.getRelevantDocuments(query)
+        else if (output === 'text') return concatenatedText // retrieved pageContent joined together
+        return retriever
+    }
+}
+```
+
+## Next Steps
+1. Implement BM25Retriever node following existing pattern
+2. Implement EnsembleRetriever node following existing pattern
+3. Add to component exports and build configuration
+4. Test integration with existing flows
\ No newline at end of file
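
As a concrete starting point for step 1, a minimal BM25Retriever node following the pattern above might look like the sketch below. This is illustrative only: the `Interface` import path, icon name, and input names are assumptions modeled on existing retriever nodes, and document input handling would need to match how the connected loader or Document Store nodes emit documents.

```typescript
import { Document } from '@langchain/core/documents'
import { BM25Retriever } from '@langchain/community/retrievers/bm25'
import { INode, INodeData, INodeParams, INodeOutputsValue } from '../../../src/Interface'

class BM25Retriever_Retrievers implements INode {
    label: string = 'BM25 Retriever'
    name: string = 'bm25Retriever'
    version: number = 1.0
    type: string = 'BM25Retriever'
    icon: string = 'bm25.svg'
    category: string = 'Retrievers'
    description: string = 'Keyword-based retrieval using the Okapi BM25 ranking algorithm'
    baseClasses: string[] = [this.type, 'BaseRetriever']

    inputs: INodeParams[] = [
        { label: 'Document', name: 'document', type: 'Document', list: true },
        { label: 'Query', name: 'query', type: 'string', optional: true, description: 'Defaults to the incoming user question' },
        { label: 'Top K', name: 'topK', type: 'number', default: 4, optional: true }
    ]

    outputs: INodeOutputsValue[] = [
        { label: 'BM25 Retriever', name: 'retriever', baseClasses: this.baseClasses },
        { label: 'Document', name: 'document', baseClasses: ['Document', 'json'] },
        { label: 'Text', name: 'text', baseClasses: ['string', 'json'] }
    ]

    async init(nodeData: INodeData, input: string): Promise<any> {
        // Documents wired in from loader / Document Store nodes
        const docs = ((nodeData.inputs?.document as Document[]) ?? []).flat()
        const topK = parseInt(nodeData.inputs?.topK as string, 10) || 4
        const query = (nodeData.inputs?.query as string) || input

        // Build the keyword retriever over the supplied documents
        const retriever = BM25Retriever.fromDocuments(docs, { k: topK })

        // Return whichever output anchor the user connected
        const output = nodeData.outputs?.output as string
        if (output === 'document') return await retriever.getRelevantDocuments(query)
        if (output === 'text') {
            const retrieved = await retriever.getRelevantDocuments(query)
            return retrieved.map((doc) => doc.pageContent).join('\n')
        }
        return retriever
    }
}

module.exports = { nodeClass: BM25Retriever_Retrievers }
```

The EnsembleRetriever node would take the same shape: a `list: true` input of `BaseRetriever` connections plus an optional weights array, with `init()` returning `new EnsembleRetriever({ retrievers, weights })`.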