diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 00000000000..e428b37c135 --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,182 @@ +# Flowise Codebase Guide + +## Project Overview + +Flowise is a visual flow-based tool for building LLM-powered applications, agents, and chatbots. It uses a node-based interface similar to Node-RED but specialized for AI workflows. + +**Architecture**: TypeScript monorepo (PNPM + Turbo) with three main packages: +- `packages/components/` - Core node components and business logic +- `packages/server/` - Express backend with APIs and database +- `packages/ui/` - React frontend with visual flow builder + +## Key Development Patterns + +### 1. Node Development Pattern + +All nodes follow a standardized interface in `packages/components/src/Interface.ts`: + +```typescript +interface INode { + label: string // Display name + name: string // Unique identifier + type: string // Node category + version: number // Version for compatibility + category: string // UI grouping + baseClasses: string[] // Output types + inputs: INodeParams[] // Input configuration + outputs?: INodeOutputsValue[] + init?(nodeData: INodeData): Promise + run?(nodeData: INodeData): Promise +} +``` + +**Node Categories**: +- Agents, ChatModels, Tools, VectorStores, DocumentLoaders, Memory, Chains, AgentFlow + +### 2. Streaming Architecture + +Real-time updates use `IServerSideEventStreamer` interface with events like: +- `streamTokenEvent()` - Token-by-token LLM responses +- `streamToolEvent()` - Tool execution notifications +- `streamAgentReasoningEvent()` - Agent thought processes +- `streamCustomEvent()` - Custom event types + +**Pattern**: Agents and chains receive `sseStreamer` via `options.sseStreamer` and call appropriate stream methods. + +### 3. Tool Integration Pattern + +Tools can be: +- **Custom Functions**: User-defined JavaScript with Zod schema validation +- **API Tools**: HTTP request wrappers (GET, POST, etc.) +- **Chain Tools**: Embedded chatflows as tools +- **MCP Tools**: Model Context Protocol integrations +- **Service Tools**: Platform integrations (Google, GitHub, Slack) + +Tools receive runtime context via `RunnableConfig` parameter containing `chatId`, `sseStreamer`, etc. + +### 4. Agent Patterns + +**Traditional Agents**: Single LLM with tools (ReAct, Conversational) +**Multi-Agent**: Supervisor-worker coordination patterns +**AgentFlow v2**: Visual node-based agent orchestration with: +- Conditional branching +- Loops and iteration +- State management +- Human-in-the-loop + +### 5. Database and API Patterns + +**TypeORM** entities in `/server/src/database/entities/` +**RESTful APIs** with Express middleware for auth, rate limiting +**Queue system** using BullMQ for background processing +**WebSocket/SSE** for real-time communication + +## File Organization + +``` +packages/ +├── components/ +│ ├── nodes/ # All node implementations +│ │ ├── agents/ # Agent nodes +│ │ ├── tools/ # Tool integrations (including MCP) +│ │ ├── chatmodels/ # LLM integrations +│ │ └── chains/ # LangChain chains +│ └── src/ +│ ├── Interface.ts # Core interfaces +│ └── handler.ts # Execution handlers +├── server/ +│ ├── src/controllers/ # API endpoints +│ ├── src/utils/ # SSEStreamer, buildChatflow +│ └── src/database/ # Database entities and migrations +└── ui/ + ├── src/views/ # Page components + ├── src/api/ # API client functions + └── src/store/ # Redux store +``` + +## Development Guidelines + +### Adding New Nodes +1. Create node file in appropriate `/nodes/` category +2. Implement `INode` interface with `init()` and/or `run()` methods +3. Export in `/components/src/index.ts` +4. Add to category mapping in `/server/src/utils/buildChatflow.ts` + +### Adding Streaming Support +1. Extract `sseStreamer` from `options.sseStreamer` +2. Extract `chatId` from `options.chatId` or config +3. Call appropriate `stream*Event()` methods during execution +4. Use `streamCustomEvent()` for custom notification types + +### Working with Tools +1. Tools are LangChain-compatible functions created with `tool()` helper +2. Use Zod schemas for input validation +3. Access runtime context via second `config` parameter +4. Return string responses that get passed back through agent layer + +### MCP (Model Context Protocol) Pattern +- MCP tools in `/nodes/tools/MCP/` create LangChain tool wrappers +- Each tool call creates new MCP client connection +- Notifications received via client event handlers +- Currently notifications only logged to console (opportunity for enhancement) + +## Common Patterns + +### Error Handling +- Use try/catch blocks in async operations +- Return descriptive error messages as strings +- Log errors for debugging but don't expose sensitive data + +### Configuration Management +- Use environment variables for external service configs +- Store sensitive data in credentials system +- Support both local and external credential storage + +### Testing Strategy +- Component testing for individual nodes +- Integration testing for complete flows +- Use test databases for server tests +- Mock external APIs in tests + +## Key Integration Points + +### UI to Server Communication +- REST APIs for CRUD operations +- WebSocket/SSE for real-time streaming +- File uploads for document processing + +### Server to Components +- Node execution via `run()` methods +- Streaming via `IServerSideEventStreamer` +- State management through memory systems + +### External Integrations +- LangChain ecosystem compatibility +- OpenAI/Anthropic/Google API integrations +- Vector database connections +- Third-party service APIs (GitHub, Slack, etc.) + +## Current Development Focus Areas + +1. **AgentFlow v2**: Enhanced visual agent orchestration +2. **Multi-agent systems**: Coordination and communication patterns +3. **Real-time streaming**: Enhanced user feedback and monitoring +4. **MCP integrations**: Model Context Protocol tool ecosystem +5. **Enterprise features**: Workspaces, analytics, usage tracking + +## Troubleshooting Common Issues + +### Streaming Not Working +- Check if `sseStreamer` is properly extracted from options +- Verify `chatId` is available in context +- Ensure streaming is enabled in flow configuration + +### Tool Execution Failures +- Validate input schemas with Zod +- Check credential configuration and permissions +- Review tool return value format (should be string) + +### Node Not Appearing in UI +- Verify node is exported in `/components/src/index.ts` +- Check category mapping in buildChatflow.ts +- Ensure node follows `INode` interface correctly \ No newline at end of file diff --git a/packages/components/nodes/agentflow/Agent/Agent.ts b/packages/components/nodes/agentflow/Agent/Agent.ts index bdd6eae048f..a50298c73aa 100644 --- a/packages/components/nodes/agentflow/Agent/Agent.ts +++ b/packages/components/nodes/agentflow/Agent/Agent.ts @@ -1411,14 +1411,6 @@ class Agent_Agentflow implements INode { let isToolRequireHumanInput = (selectedTool as any).requiresHumanInput && (!iterationContext || Object.keys(iterationContext).length === 0) - const flowConfig = { - chatflowId: options.chatflowid, - sessionId: options.sessionId, - chatId: options.chatId, - input: input, - state: options.agentflowRuntime?.state - } - if (isToolRequireHumanInput) { const toolCallDetails = '```json\n' + JSON.stringify(toolCall, null, 2) + '\n```' const responseContent = response.content + `\nAttempting to use tool:\n${toolCallDetails}` @@ -1434,7 +1426,17 @@ class Agent_Agentflow implements INode { try { //@ts-ignore - let toolOutput = await selectedTool.call(toolCall.args, { signal: abortController?.signal }, undefined, flowConfig) + let toolOutput = await selectedTool.invoke(toolCall.args, { + signal: abortController?.signal, + configurable: { + sessionId: options.sessionId, + chatId: chatId, + input: input, + state: options.agentflowRuntime?.state, + sseStreamer: sseStreamer, + flowise_chatId: chatId + } + }) if (options.analyticHandlers && toolIds) { await options.analyticHandlers.onToolEnd(toolIds, toolOutput) @@ -1679,14 +1681,6 @@ class Agent_Agentflow implements INode { let parsedDocs let parsedArtifacts - const flowConfig = { - chatflowId: options.chatflowid, - sessionId: options.sessionId, - chatId: options.chatId, - input: input, - state: options.agentflowRuntime?.state - } - if (humanInput.type === 'reject') { messages.pop() const toBeRemovedTool = toolsInstance.find((tool) => tool.name === toolCall.name) @@ -1706,7 +1700,17 @@ class Agent_Agentflow implements INode { try { //@ts-ignore - let toolOutput = await selectedTool.call(toolCall.args, { signal: abortController?.signal }, undefined, flowConfig) + let toolOutput = await selectedTool.invoke(toolCall.args, { + signal: abortController?.signal, + configurable: { + sessionId: options.sessionId, + chatId: chatId, + input: input, + state: options.agentflowRuntime?.state, + sseStreamer: sseStreamer, + flowise_chatId: chatId + } + }) if (options.analyticHandlers && toolIds) { await options.analyticHandlers.onToolEnd(toolIds, toolOutput) diff --git a/packages/components/nodes/agents/ToolAgent/ToolAgent.ts b/packages/components/nodes/agents/ToolAgent/ToolAgent.ts index 5244f76e5ac..77c408ce975 100644 --- a/packages/components/nodes/agents/ToolAgent/ToolAgent.ts +++ b/packages/components/nodes/agents/ToolAgent/ToolAgent.ts @@ -116,7 +116,12 @@ class ToolAgent_Agents implements INode { } async init(nodeData: INodeData, input: string, options: ICommonObject): Promise { - return prepareAgent(nodeData, options, { sessionId: this.sessionId, chatId: options.chatId, input }) + return prepareAgent(nodeData, options, { + sessionId: this.sessionId, + chatId: options.chatId, + input, + sseStreamer: options.sseStreamer + }) } async run(nodeData: INodeData, input: string, options: ICommonObject): Promise { @@ -141,7 +146,12 @@ class ToolAgent_Agents implements INode { } } - const executor = await prepareAgent(nodeData, options, { sessionId: this.sessionId, chatId: options.chatId, input }) + const executor = await prepareAgent(nodeData, options, { + sessionId: this.sessionId, + chatId: options.chatId, + input, + sseStreamer: options.sseStreamer + }) const loggerHandler = new ConsoleCallbackHandler(options.logger, options?.orgId) const callbacks = await additionalCallbacks(nodeData, options) @@ -268,7 +278,7 @@ class ToolAgent_Agents implements INode { const prepareAgent = async ( nodeData: INodeData, options: ICommonObject, - flowObj: { sessionId?: string; chatId?: string; input?: string } + flowObj: { sessionId?: string; chatId?: string; input?: string; sseStreamer?: any } ) => { const model = nodeData.inputs?.model as BaseChatModel const maxIterations = nodeData.inputs?.maxIterations as string @@ -370,6 +380,7 @@ const prepareAgent = async ( sessionId: flowObj?.sessionId, chatId: flowObj?.chatId, input: flowObj?.input, + sseStreamer: flowObj?.sseStreamer, verbose: process.env.DEBUG === 'true' ? true : false, maxIterations: maxIterations ? parseFloat(maxIterations) : undefined }) diff --git a/packages/components/nodes/tools/MCP/config.ts b/packages/components/nodes/tools/MCP/config.ts new file mode 100644 index 00000000000..567e2404a1f --- /dev/null +++ b/packages/components/nodes/tools/MCP/config.ts @@ -0,0 +1,6 @@ +export const MCP_STREAMING_CONFIG = { + DEFAULT_COMPLETION_TIMEOUT: 600000, // 10 minutes fallback - only as safety net + NOTIFICATION_DELAY: 1000, // 1 second delay before cleanup + SUPPORTED_NOTIFICATION_TYPES: ['logging/message', 'progress'], + STREAMING_MARKER: '[MCP Streaming]' +} diff --git a/packages/components/nodes/tools/MCP/core.ts b/packages/components/nodes/tools/MCP/core.ts index b2c9e63f636..570ce27487d 100644 --- a/packages/components/nodes/tools/MCP/core.ts +++ b/packages/components/nodes/tools/MCP/core.ts @@ -1,10 +1,17 @@ -import { CallToolRequest, CallToolResultSchema, ListToolsResult, ListToolsResultSchema } from '@modelcontextprotocol/sdk/types.js' +import { + CallToolRequest, + CallToolResultSchema, + ListToolsResult, + ListToolsResultSchema, + LoggingMessageNotificationSchema +} from '@modelcontextprotocol/sdk/types.js' import { Client } from '@modelcontextprotocol/sdk/client/index.js' import { StdioClientTransport, StdioServerParameters } from '@modelcontextprotocol/sdk/client/stdio.js' import { BaseToolkit, tool, Tool } from '@langchain/core/tools' import { z } from 'zod' import { StreamableHTTPClientTransport } from '@modelcontextprotocol/sdk/client/streamableHttp.js' import { SSEClientTransport } from '@modelcontextprotocol/sdk/client/sse.js' +import { MCP_STREAMING_CONFIG } from './config.js' export class MCPToolkit extends BaseToolkit { tools: Tool[] = [] @@ -20,8 +27,8 @@ export class MCPToolkit extends BaseToolkit { this.transportType = transportType } - // Method to create a new client with transport - async createClient(): Promise { + // Method to create a new client with transport and detect streaming capabilities + async createClient(): Promise<{ client: Client; hasStreaming: boolean }> { const client = new Client( { name: 'flowise-client', @@ -80,12 +87,25 @@ export class MCPToolkit extends BaseToolkit { } } - return client + // Check server capabilities for streaming support + let hasStreaming = false + try { + const capabilities = client.getServerCapabilities() + // Check for streaming capability in experimental or notifications section + hasStreaming = + (capabilities as any)?.notifications?.streaming === true || + (capabilities as any)?.experimental?.notifications?.streaming === true + } catch (error) { + console.error(`⚠️ [MCP Core] Could not detect streaming capabilities, falling back to non-streaming:`, error.message) + } + + return { client, hasStreaming } } async initialize() { if (this._tools === null) { - this.client = await this.createClient() + const { client } = await this.createClient() + this.client = client this._tools = await this.client.request({ method: 'tools/list' }, ListToolsResultSchema) @@ -104,11 +124,13 @@ export class MCPToolkit extends BaseToolkit { if (this.client === null) { throw new Error('Client is not initialized') } + return await MCPTool({ toolkit: this, name: tool.name, description: tool.description || '', - argsSchema: createSchemaModel(tool.inputSchema) + argsSchema: createSchemaModel(tool.inputSchema), + annotations: tool.annotations || {} }) }) const res = await Promise.allSettled(toolsPromises) @@ -125,37 +147,189 @@ export async function MCPTool({ toolkit, name, description, - argsSchema + argsSchema, + annotations = {} }: { toolkit: MCPToolkit name: string description: string argsSchema: any + annotations?: any }): Promise { - return tool( - async (input): Promise => { - // Create a new client for this request - const client = await toolkit.createClient() + const { client, hasStreaming } = await toolkit.createClient() + await client.close() - try { - const req: CallToolRequest = { method: 'tools/call', params: { name: name, arguments: input as any } } - const res = await client.request(req, CallToolResultSchema) - const content = res.content - const contentString = JSON.stringify(content) - return contentString - } finally { - // Always close the client after the request completes - await client.close() - } + const toolHasStreaming = annotations.streaming_enabled === true + const shouldUseStreaming = hasStreaming && toolHasStreaming + + return tool( + async (input, config): Promise => { + return await executeMCPTool(toolkit, name, input, config, annotations) }, { name: name, - description: description, + description: shouldUseStreaming ? `${description} ${MCP_STREAMING_CONFIG.STREAMING_MARKER}` : description, schema: argsSchema } ) } +async function executeMCPTool(toolkit: MCPToolkit, name: string, input: any, config: any, annotations: any = {}): Promise { + const { chatId, sseStreamer } = extractConfig(config, input) + const { client, hasStreaming } = await toolkit.createClient() + const notifications: string[] = [] + + // Only use streaming if both server and tool support it + const toolHasStreaming = annotations.streaming_enabled === true + const shouldUseStreaming = hasStreaming && toolHasStreaming + + try { + setupStreamingIfSupported(shouldUseStreaming, sseStreamer, chatId, name) + setupNotificationHandlers(client, sseStreamer, chatId, name, shouldUseStreaming, notifications, annotations) + + const toolResponse = await callMCPTool(client, name, input) + + return await handleToolResponse(toolResponse, shouldUseStreaming, sseStreamer, chatId, name, notifications) + } finally { + if (!shouldUseStreaming) { + await client.close() + } + } +} + +function extractConfig(config: any, input?: any): { chatId: string; sseStreamer: any } { + const configChatId = config?.configurable?.flowise_chatId + const configSseStreamer = config?.configurable?.sseStreamer + const inputChatId = input?.flowise_chatId + + return { + chatId: configChatId || inputChatId, + sseStreamer: configSseStreamer + } +} + +function setupStreamingIfSupported(hasStreaming: boolean, sseStreamer: any, chatId: string, name: string): void { + if (hasStreaming && sseStreamer && chatId) { + sseStreamer.addMcpConnection(chatId, name) + } +} + +async function callMCPTool(client: Client, name: string, input: any): Promise { + const progressToken = `${name}_${Date.now()}_${Math.random().toString(36).substr(2, 9)}` + const req: CallToolRequest = { + method: 'tools/call', + params: { + name: name, + arguments: input as any, + _meta: { progressToken } + } + } + + const res = await client.request(req, CallToolResultSchema) + return JSON.stringify(res.content) +} + +async function handleToolResponse( + contentString: string, + hasStreaming: boolean, + sseStreamer: any, + chatId: string, + name: string, + notifications: string[] +): Promise { + // Non-streaming tools return immediately + if (!hasStreaming || !sseStreamer || !chatId) { + if (sseStreamer && chatId) { + sseStreamer.removeMcpConnection(chatId, name) + } + return contentString + } + + // Streaming tools wait for completion + return waitForStreamingCompletion(contentString, sseStreamer, chatId, name, notifications) +} + +function waitForStreamingCompletion( + contentString: string, + sseStreamer: any, + chatId: string, + name: string, + notifications: string[] +): Promise { + return new Promise((resolve) => { + let completed = false + + const completeExecution = (_reason: string) => { + if (completed) return + completed = true + + const fullResponse = buildFullResponse(contentString, notifications) + resolve(fullResponse) + } + + // Poll for completion + const pollInterval = setInterval(() => { + if (!sseStreamer.hasMcpConnections(chatId)) { + clearInterval(pollInterval) + completeExecution('✅') + } + }, 500) + + // Fallback timeout + setTimeout(() => { + clearInterval(pollInterval) + sseStreamer.removeMcpConnection(chatId, name) + completeExecution('⏰') + }, MCP_STREAMING_CONFIG.DEFAULT_COMPLETION_TIMEOUT) + }) +} + +function buildFullResponse(contentString: string, notifications: string[]): string { + return notifications.length > 0 ? `${contentString}\n\n--- Execution Log ---\n${notifications.join('\n')}` : contentString +} + +function setupNotificationHandlers( + client: Client, + sseStreamer: any, + chatId: string, + toolName: string, + shouldUseStreaming: boolean, + notifications?: string[], + annotations: any = {} +) { + if (!shouldUseStreaming || !sseStreamer || !chatId) { + return + } + + // Get completion signals from annotations, fallback to default + const completionSignals = annotations.notification_types || ['task_completion'] + + client.setNotificationHandler(LoggingMessageNotificationSchema, (notification) => { + const message = String(notification.params.data) + + // Stream to UI + sseStreamer.streamTokenEvent(chatId, `\n🔔 ${toolName}: ${message}\n`) + + // Collect for final response + if (notifications) { + notifications.push(message) + } + + const { logger } = notification.params + + // Detect completion based on tool's annotation signals + if (completionSignals.includes(logger)) { + // Add visual separation before LLM response + sseStreamer.streamTokenEvent(chatId, '\n\n') + + // Trigger cleanup after brief delay + setTimeout(() => { + sseStreamer.removeMcpConnection(chatId, toolName) + }, MCP_STREAMING_CONFIG.NOTIFICATION_DELAY) + } + }) +} + function createSchemaModel( inputSchema: { type: 'object' @@ -171,6 +345,9 @@ function createSchemaModel( return acc }, {} as Record) + // Add Flowise context fields to allow them through schema validation + schemaProperties['flowise_chatId'] = z.string().optional() + return z.object(schemaProperties) } diff --git a/packages/components/src/Interface.ts b/packages/components/src/Interface.ts index 5e2ee383c07..952fe33e0a7 100644 --- a/packages/components/src/Interface.ts +++ b/packages/components/src/Interface.ts @@ -441,6 +441,11 @@ export interface IServerSideEventStreamer { streamAbortEvent(chatId: string): void streamEndEvent(chatId: string): void streamUsageMetadataEvent(chatId: string, data: any): void + // Enhanced MCP streaming methods + addMcpConnection?(chatId: string, toolName?: string): void + removeMcpConnection?(chatId: string, toolName?: string): void + markMcpConnectionCompleting?(chatId: string, toolName?: string): void + hasMcpConnections?(chatId: string): boolean } export enum FollowUpPromptProvider { diff --git a/packages/components/src/agents.ts b/packages/components/src/agents.ts index 022a5be09f6..34d69a4445f 100644 --- a/packages/components/src/agents.ts +++ b/packages/components/src/agents.ts @@ -268,6 +268,8 @@ export class AgentExecutor extends BaseChain { input?: string + sseStreamer?: any + isXML?: boolean /** @@ -292,7 +294,7 @@ export class AgentExecutor extends BaseChain { return this.agent.returnValues } - constructor(input: AgentExecutorInput & { sessionId?: string; chatId?: string; input?: string; isXML?: boolean }) { + constructor(input: AgentExecutorInput & { sessionId?: string; chatId?: string; input?: string; sseStreamer?: any; isXML?: boolean }) { let agent: BaseSingleActionAgent | BaseMultiActionAgent if (Runnable.isRunnable(input.agent)) { agent = new RunnableAgent({ runnable: input.agent }) @@ -320,16 +322,18 @@ export class AgentExecutor extends BaseChain { this.sessionId = input.sessionId this.chatId = input.chatId this.input = input.input + this.sseStreamer = input.sseStreamer this.isXML = input.isXML } static fromAgentAndTools( - fields: AgentExecutorInput & { sessionId?: string; chatId?: string; input?: string; isXML?: boolean } + fields: AgentExecutorInput & { sessionId?: string; chatId?: string; input?: string; sseStreamer?: any; isXML?: boolean } ): AgentExecutor { const newInstance = new AgentExecutor(fields) if (fields.sessionId) newInstance.sessionId = fields.sessionId if (fields.chatId) newInstance.chatId = fields.chatId if (fields.input) newInstance.input = fields.input + if (fields.sseStreamer) newInstance.sseStreamer = fields.sseStreamer if (fields.isXML) newInstance.isXML = fields.isXML return newInstance } @@ -427,17 +431,23 @@ export class AgentExecutor extends BaseChain { * - flowConfig?: { sessionId?: string, chatId?: string, input?: string } */ if (tool) { - observation = await (tool as any).call( - this.isXML && typeof action.toolInput === 'string' ? { input: action.toolInput } : action.toolInput, - runManager?.getChild(), - undefined, + observation = await tool.invoke( + this.isXML && typeof action.toolInput === 'string' + ? { input: action.toolInput } + : (action.toolInput as any), { - sessionId: this.sessionId, - chatId: this.chatId, - input: this.input, - state: inputs + callbacks: runManager?.getChild(), + configurable: { + sessionId: this.sessionId, + chatId: this.chatId, + input: this.input, + state: inputs, + sseStreamer: this.sseStreamer, + flowise_chatId: this.chatId + } } ) + let toolOutput = observation if (typeof toolOutput === 'string' && toolOutput.includes(SOURCE_DOCUMENTS_PREFIX)) { toolOutput = toolOutput.split(SOURCE_DOCUMENTS_PREFIX)[0] @@ -607,14 +617,20 @@ export class AgentExecutor extends BaseChain { * - flowConfig?: { sessionId?: string, chatId?: string, input?: string } */ observation = await (tool as any).call( - this.isXML && typeof agentAction.toolInput === 'string' ? { input: agentAction.toolInput } : agentAction.toolInput, + { + ...(this.isXML && typeof agentAction.toolInput === 'string' + ? { input: agentAction.toolInput } + : (agentAction.toolInput as any)), + flowise_chatId: this.chatId + }, runManager?.getChild(), undefined, { sessionId: this.sessionId, chatId: this.chatId, input: this.input, - state: inputs + state: inputs, + sseStreamer: this.sseStreamer } ) if (typeof observation === 'string' && observation.includes(SOURCE_DOCUMENTS_PREFIX)) { diff --git a/packages/server/src/Interface.ts b/packages/server/src/Interface.ts index 97f66d10920..15eda482a07 100644 --- a/packages/server/src/Interface.ts +++ b/packages/server/src/Interface.ts @@ -70,6 +70,7 @@ export interface IChatFlow { category?: string type?: ChatflowType workspaceId?: string + currentHistoryVersion?: number } export interface IChatMessage { @@ -125,6 +126,7 @@ export interface IAssistant { updatedDate: Date createdDate: Date workspaceId?: string + currentHistoryVersion?: number } export interface ICredential { @@ -191,6 +193,17 @@ export interface IVariableDict { [key: string]: string } +export interface IFlowHistory { + id: string + entityType: 'CHATFLOW' | 'ASSISTANT' + entityId: string + snapshotData: string + changeDescription?: string + version: number + createdDate: Date + workspaceId?: string +} + export interface INodeDependencies { [key: string]: number } diff --git a/packages/server/src/controllers/history/index.ts b/packages/server/src/controllers/history/index.ts new file mode 100644 index 00000000000..7104730509e --- /dev/null +++ b/packages/server/src/controllers/history/index.ts @@ -0,0 +1,110 @@ +import { NextFunction, Request, Response } from 'express' +import { StatusCodes } from 'http-status-codes' +import { EntityType } from '../../database/entities/FlowHistory' +import { InternalFlowiseError } from '../../errors/internalFlowiseError' +import historyService from '../../services/history' + +const validateEntityType = (entityType: string): EntityType => { + const upperType = entityType.toUpperCase() + if (!['CHATFLOW', 'ASSISTANT'].includes(upperType)) { + throw new InternalFlowiseError(StatusCodes.BAD_REQUEST, 'entityType must be either CHATFLOW or ASSISTANT') + } + return upperType as EntityType +} + +const getHistory = async (req: Request, res: Response, next: NextFunction) => { + try { + const { entityType, entityId } = req.params + const { page = '1', limit = '20' } = req.query + + if (!entityType || !entityId) { + throw new InternalFlowiseError(StatusCodes.BAD_REQUEST, 'entityType and entityId are required parameters') + } + + const pageNum = Number(page) + const limitNum = Number(limit) + + const result = await historyService.getHistory({ + entityType: validateEntityType(entityType), + entityId, + workspaceId: req.user?.activeWorkspaceId, + limit: limitNum, + offset: (pageNum - 1) * limitNum + }) + + return res.json(result) + } catch (error) { + next(error) + } +} + +const getSnapshotById = async (req: Request, res: Response, next: NextFunction) => { + try { + const { historyId } = req.params + if (!historyId) { + throw new InternalFlowiseError(StatusCodes.BAD_REQUEST, 'historyId is required') + } + + const snapshot = await historyService.getSnapshotById(historyId) + return res.json(snapshot) + } catch (error) { + next(error) + } +} + +const restoreSnapshot = async (req: Request, res: Response, next: NextFunction) => { + try { + const { historyId } = req.params + if (!historyId) { + throw new InternalFlowiseError(StatusCodes.BAD_REQUEST, 'historyId is required') + } + + const restoredEntity = await historyService.restoreSnapshot({ + historyId, + workspaceId: req.user?.activeWorkspaceId + }) + + return res.json({ + message: 'Successfully restored from history snapshot', + entity: restoredEntity + }) + } catch (error) { + next(error) + } +} + +const deleteSnapshot = async (req: Request, res: Response, next: NextFunction) => { + try { + const { historyId } = req.params + if (!historyId) { + throw new InternalFlowiseError(StatusCodes.BAD_REQUEST, 'historyId is required') + } + + await historyService.deleteSnapshot(historyId, req.user?.activeWorkspaceId) + return res.json({ message: 'History snapshot deleted successfully' }) + } catch (error) { + next(error) + } +} + +const getSnapshotComparison = async (req: Request, res: Response, next: NextFunction) => { + try { + const { historyId1, historyId2 } = req.params + if (!historyId1 || !historyId2) { + throw new InternalFlowiseError(StatusCodes.BAD_REQUEST, 'Both historyId1 and historyId2 are required') + } + + const comparison = await historyService.getSnapshotComparison(historyId1, historyId2, req.user?.activeWorkspaceId) + return res.json(comparison) + } catch (error) { + next(error) + } +} + +export default { + getHistory, + getSnapshotById, + restoreSnapshot, + deleteSnapshot, + getSnapshotComparison +} diff --git a/packages/server/src/database/entities/Assistant.ts b/packages/server/src/database/entities/Assistant.ts index 28843213928..3b62cf7ff7a 100644 --- a/packages/server/src/database/entities/Assistant.ts +++ b/packages/server/src/database/entities/Assistant.ts @@ -29,4 +29,7 @@ export class Assistant implements IAssistant { @Column({ nullable: true, type: 'text' }) workspaceId?: string + + @Column({ nullable: true, type: 'int' }) + currentHistoryVersion?: number } diff --git a/packages/server/src/database/entities/ChatFlow.ts b/packages/server/src/database/entities/ChatFlow.ts index 4c14e99c1c4..bce0f1d8f69 100644 --- a/packages/server/src/database/entities/ChatFlow.ts +++ b/packages/server/src/database/entities/ChatFlow.ts @@ -53,4 +53,7 @@ export class ChatFlow implements IChatFlow { @Column({ nullable: true, type: 'text' }) workspaceId?: string + + @Column({ nullable: true, type: 'int' }) + currentHistoryVersion?: number } diff --git a/packages/server/src/database/entities/FlowHistory.ts b/packages/server/src/database/entities/FlowHistory.ts new file mode 100644 index 00000000000..18cfe03bf54 --- /dev/null +++ b/packages/server/src/database/entities/FlowHistory.ts @@ -0,0 +1,45 @@ +/* eslint-disable */ +import { Entity, Column, CreateDateColumn, PrimaryGeneratedColumn, Index } from 'typeorm' + +export type EntityType = 'CHATFLOW' | 'ASSISTANT' + +export interface IFlowHistory { + id: string + entityType: EntityType + entityId: string + snapshotData: string + changeDescription?: string + version: number + createdDate: Date + workspaceId?: string +} + +@Entity() +@Index(['entityType', 'entityId', 'version']) +@Index(['entityType', 'entityId', 'createdDate']) +export class FlowHistory implements IFlowHistory { + @PrimaryGeneratedColumn('uuid') + id: string + + @Column({ type: 'varchar', length: 20 }) + entityType: EntityType + + @Column({ type: 'uuid' }) + entityId: string + + @Column({ type: 'text' }) + snapshotData: string + + @Column({ nullable: true, type: 'text' }) + changeDescription?: string + + @Column({ type: 'int' }) + version: number + + @Column({ type: 'timestamp' }) + @CreateDateColumn() + createdDate: Date + + @Column({ nullable: true, type: 'text' }) + workspaceId?: string +} \ No newline at end of file diff --git a/packages/server/src/database/entities/index.ts b/packages/server/src/database/entities/index.ts index b65ea28b58a..85c0deb2e0f 100644 --- a/packages/server/src/database/entities/index.ts +++ b/packages/server/src/database/entities/index.ts @@ -17,6 +17,7 @@ import { Evaluator } from './Evaluator' import { ApiKey } from './ApiKey' import { CustomTemplate } from './CustomTemplate' import { Execution } from './Execution' +import { FlowHistory } from './FlowHistory' import { LoginActivity, WorkspaceShared, WorkspaceUsers } from '../../enterprise/database/entities/EnterpriseEntities' import { User } from '../../enterprise/database/entities/user.entity' import { Organization } from '../../enterprise/database/entities/organization.entity' @@ -50,6 +51,7 @@ export const entities = { WorkspaceShared, CustomTemplate, Execution, + FlowHistory, Organization, Role, OrganizationUser, diff --git a/packages/server/src/database/migrations/mariadb/1750000000000-AddFlowHistoryEntity.ts b/packages/server/src/database/migrations/mariadb/1750000000000-AddFlowHistoryEntity.ts new file mode 100644 index 00000000000..e21e6f9dd38 --- /dev/null +++ b/packages/server/src/database/migrations/mariadb/1750000000000-AddFlowHistoryEntity.ts @@ -0,0 +1,25 @@ +import { MigrationInterface, QueryRunner } from 'typeorm' + +export class AddFlowHistoryEntity1750000000000 implements MigrationInterface { + public async up(queryRunner: QueryRunner): Promise { + await queryRunner.query( + `CREATE TABLE IF NOT EXISTS \`flow_history\` ( + \`id\` varchar(36) NOT NULL, + \`entityType\` varchar(20) NOT NULL, + \`entityId\` varchar(36) NOT NULL, + \`snapshotData\` longtext NOT NULL, + \`changeDescription\` text DEFAULT NULL, + \`version\` int NOT NULL, + \`createdDate\` datetime(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6), + \`workspaceId\` text DEFAULT NULL, + PRIMARY KEY (\`id\`), + INDEX \`IDX_flow_history_entity_version\` (\`entityType\`, \`entityId\`, \`version\`), + INDEX \`IDX_flow_history_entity_date\` (\`entityType\`, \`entityId\`, \`createdDate\`) + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;` + ) + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.query(`DROP TABLE flow_history`) + } +} diff --git a/packages/server/src/database/migrations/mariadb/1750000000001-AddCurrentHistoryVersion.ts b/packages/server/src/database/migrations/mariadb/1750000000001-AddCurrentHistoryVersion.ts new file mode 100644 index 00000000000..cb496ea0998 --- /dev/null +++ b/packages/server/src/database/migrations/mariadb/1750000000001-AddCurrentHistoryVersion.ts @@ -0,0 +1,11 @@ +import { MigrationInterface, QueryRunner } from 'typeorm' + +export class AddCurrentHistoryVersion1750000000001 implements MigrationInterface { + public async up(queryRunner: QueryRunner): Promise { + await queryRunner.query(`ALTER TABLE \`chat_flow\` ADD \`currentHistoryVersion\` int NULL`) + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.query(`ALTER TABLE \`chat_flow\` DROP COLUMN \`currentHistoryVersion\``) + } +} diff --git a/packages/server/src/database/migrations/mariadb/1750000000002-AddAssistantHistoryVersion.ts b/packages/server/src/database/migrations/mariadb/1750000000002-AddAssistantHistoryVersion.ts new file mode 100644 index 00000000000..4a88986065c --- /dev/null +++ b/packages/server/src/database/migrations/mariadb/1750000000002-AddAssistantHistoryVersion.ts @@ -0,0 +1,11 @@ +import { MigrationInterface, QueryRunner } from 'typeorm' + +export class AddAssistantHistoryVersion1750000000002 implements MigrationInterface { + public async up(queryRunner: QueryRunner): Promise { + await queryRunner.query(`ALTER TABLE \`assistant\` ADD \`currentHistoryVersion\` int NULL`) + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.query(`ALTER TABLE \`assistant\` DROP COLUMN \`currentHistoryVersion\``) + } +} diff --git a/packages/server/src/database/migrations/mariadb/index.ts b/packages/server/src/database/migrations/mariadb/index.ts index 272a6bb1ff2..fcdebb2d876 100644 --- a/packages/server/src/database/migrations/mariadb/index.ts +++ b/packages/server/src/database/migrations/mariadb/index.ts @@ -36,6 +36,9 @@ import { AddExecutionEntity1738090872625 } from './1738090872625-AddExecutionEnt import { FixOpenSourceAssistantTable1743758056188 } from './1743758056188-FixOpenSourceAssistantTable' import { AddErrorToEvaluationRun1744964560174 } from './1744964560174-AddErrorToEvaluationRun' import { ModifyExecutionDataColumnType1747902489801 } from './1747902489801-ModifyExecutionDataColumnType' +import { AddFlowHistoryEntity1750000000000 } from './1750000000000-AddFlowHistoryEntity' +import { AddCurrentHistoryVersion1750000000001 } from './1750000000001-AddCurrentHistoryVersion' +import { AddAssistantHistoryVersion1750000000002 } from './1750000000002-AddAssistantHistoryVersion' import { AddAuthTables1720230151482 } from '../../../enterprise/database/migrations/mariadb/1720230151482-AddAuthTables' import { AddWorkspace1725437498242 } from '../../../enterprise/database/migrations/mariadb/1725437498242-AddWorkspace' @@ -98,5 +101,8 @@ export const mariadbMigrations = [ FixOpenSourceAssistantTable1743758056188, AddErrorToEvaluationRun1744964560174, ExecutionLinkWorkspaceId1746862866554, - ModifyExecutionDataColumnType1747902489801 + ModifyExecutionDataColumnType1747902489801, + AddFlowHistoryEntity1750000000000, + AddCurrentHistoryVersion1750000000001, + AddAssistantHistoryVersion1750000000002 ] diff --git a/packages/server/src/database/migrations/mysql/1750000000000-AddFlowHistoryEntity.ts b/packages/server/src/database/migrations/mysql/1750000000000-AddFlowHistoryEntity.ts new file mode 100644 index 00000000000..e21e6f9dd38 --- /dev/null +++ b/packages/server/src/database/migrations/mysql/1750000000000-AddFlowHistoryEntity.ts @@ -0,0 +1,25 @@ +import { MigrationInterface, QueryRunner } from 'typeorm' + +export class AddFlowHistoryEntity1750000000000 implements MigrationInterface { + public async up(queryRunner: QueryRunner): Promise { + await queryRunner.query( + `CREATE TABLE IF NOT EXISTS \`flow_history\` ( + \`id\` varchar(36) NOT NULL, + \`entityType\` varchar(20) NOT NULL, + \`entityId\` varchar(36) NOT NULL, + \`snapshotData\` longtext NOT NULL, + \`changeDescription\` text DEFAULT NULL, + \`version\` int NOT NULL, + \`createdDate\` datetime(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6), + \`workspaceId\` text DEFAULT NULL, + PRIMARY KEY (\`id\`), + INDEX \`IDX_flow_history_entity_version\` (\`entityType\`, \`entityId\`, \`version\`), + INDEX \`IDX_flow_history_entity_date\` (\`entityType\`, \`entityId\`, \`createdDate\`) + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;` + ) + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.query(`DROP TABLE flow_history`) + } +} diff --git a/packages/server/src/database/migrations/mysql/1750000000001-AddCurrentHistoryVersion.ts b/packages/server/src/database/migrations/mysql/1750000000001-AddCurrentHistoryVersion.ts new file mode 100644 index 00000000000..cb496ea0998 --- /dev/null +++ b/packages/server/src/database/migrations/mysql/1750000000001-AddCurrentHistoryVersion.ts @@ -0,0 +1,11 @@ +import { MigrationInterface, QueryRunner } from 'typeorm' + +export class AddCurrentHistoryVersion1750000000001 implements MigrationInterface { + public async up(queryRunner: QueryRunner): Promise { + await queryRunner.query(`ALTER TABLE \`chat_flow\` ADD \`currentHistoryVersion\` int NULL`) + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.query(`ALTER TABLE \`chat_flow\` DROP COLUMN \`currentHistoryVersion\``) + } +} diff --git a/packages/server/src/database/migrations/mysql/1750000000002-AddAssistantHistoryVersion.ts b/packages/server/src/database/migrations/mysql/1750000000002-AddAssistantHistoryVersion.ts new file mode 100644 index 00000000000..4a88986065c --- /dev/null +++ b/packages/server/src/database/migrations/mysql/1750000000002-AddAssistantHistoryVersion.ts @@ -0,0 +1,11 @@ +import { MigrationInterface, QueryRunner } from 'typeorm' + +export class AddAssistantHistoryVersion1750000000002 implements MigrationInterface { + public async up(queryRunner: QueryRunner): Promise { + await queryRunner.query(`ALTER TABLE \`assistant\` ADD \`currentHistoryVersion\` int NULL`) + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.query(`ALTER TABLE \`assistant\` DROP COLUMN \`currentHistoryVersion\``) + } +} diff --git a/packages/server/src/database/migrations/mysql/index.ts b/packages/server/src/database/migrations/mysql/index.ts index c51ebb8a945..97908339193 100644 --- a/packages/server/src/database/migrations/mysql/index.ts +++ b/packages/server/src/database/migrations/mysql/index.ts @@ -37,6 +37,9 @@ import { FixOpenSourceAssistantTable1743758056188 } from './1743758056188-FixOpe import { AddErrorToEvaluationRun1744964560174 } from './1744964560174-AddErrorToEvaluationRun' import { FixErrorsColumnInEvaluationRun1746437114935 } from './1746437114935-FixErrorsColumnInEvaluationRun' import { ModifyExecutionDataColumnType1747902489801 } from './1747902489801-ModifyExecutionDataColumnType' +import { AddFlowHistoryEntity1750000000000 } from './1750000000000-AddFlowHistoryEntity' +import { AddCurrentHistoryVersion1750000000001 } from './1750000000001-AddCurrentHistoryVersion' +import { AddAssistantHistoryVersion1750000000002 } from './1750000000002-AddAssistantHistoryVersion' import { AddAuthTables1720230151482 } from '../../../enterprise/database/migrations/mysql/1720230151482-AddAuthTables' import { AddWorkspace1720230151484 } from '../../../enterprise/database/migrations/mysql/1720230151484-AddWorkspace' @@ -100,5 +103,8 @@ export const mysqlMigrations = [ AddErrorToEvaluationRun1744964560174, FixErrorsColumnInEvaluationRun1746437114935, ExecutionLinkWorkspaceId1746862866554, - ModifyExecutionDataColumnType1747902489801 + ModifyExecutionDataColumnType1747902489801, + AddFlowHistoryEntity1750000000000, + AddCurrentHistoryVersion1750000000001, + AddAssistantHistoryVersion1750000000002 ] diff --git a/packages/server/src/database/migrations/postgres/1750000000000-AddFlowHistoryEntity.ts b/packages/server/src/database/migrations/postgres/1750000000000-AddFlowHistoryEntity.ts new file mode 100644 index 00000000000..198d0174e1c --- /dev/null +++ b/packages/server/src/database/migrations/postgres/1750000000000-AddFlowHistoryEntity.ts @@ -0,0 +1,27 @@ +import { MigrationInterface, QueryRunner } from 'typeorm' + +export class AddFlowHistoryEntity1750000000000 implements MigrationInterface { + public async up(queryRunner: QueryRunner): Promise { + await queryRunner.query( + `CREATE TABLE IF NOT EXISTS "flow_history" ( + "id" uuid NOT NULL DEFAULT gen_random_uuid(), + "entityType" varchar(20) NOT NULL, + "entityId" uuid NOT NULL, + "snapshotData" text NOT NULL, + "changeDescription" text DEFAULT NULL, + "version" integer NOT NULL, + "createdDate" timestamp(6) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "workspaceId" text DEFAULT NULL, + CONSTRAINT "PK_flow_history_id" PRIMARY KEY ("id") + );` + ) + + await queryRunner.query(`CREATE INDEX "IDX_flow_history_entity_version" ON "flow_history" ("entityType", "entityId", "version");`) + + await queryRunner.query(`CREATE INDEX "IDX_flow_history_entity_date" ON "flow_history" ("entityType", "entityId", "createdDate");`) + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.query(`DROP TABLE "flow_history"`) + } +} diff --git a/packages/server/src/database/migrations/postgres/1750000000001-AddCurrentHistoryVersion.ts b/packages/server/src/database/migrations/postgres/1750000000001-AddCurrentHistoryVersion.ts new file mode 100644 index 00000000000..2df04a3bea0 --- /dev/null +++ b/packages/server/src/database/migrations/postgres/1750000000001-AddCurrentHistoryVersion.ts @@ -0,0 +1,11 @@ +import { MigrationInterface, QueryRunner } from 'typeorm' + +export class AddCurrentHistoryVersion1750000000001 implements MigrationInterface { + public async up(queryRunner: QueryRunner): Promise { + await queryRunner.query(`ALTER TABLE "chat_flow" ADD "currentHistoryVersion" integer`) + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.query(`ALTER TABLE "chat_flow" DROP COLUMN "currentHistoryVersion"`) + } +} diff --git a/packages/server/src/database/migrations/postgres/1750000000002-AddAssistantHistoryVersion.ts b/packages/server/src/database/migrations/postgres/1750000000002-AddAssistantHistoryVersion.ts new file mode 100644 index 00000000000..da2bf3da35e --- /dev/null +++ b/packages/server/src/database/migrations/postgres/1750000000002-AddAssistantHistoryVersion.ts @@ -0,0 +1,11 @@ +import { MigrationInterface, QueryRunner } from 'typeorm' + +export class AddAssistantHistoryVersion1750000000002 implements MigrationInterface { + public async up(queryRunner: QueryRunner): Promise { + await queryRunner.query(`ALTER TABLE "assistant" ADD "currentHistoryVersion" integer`) + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.query(`ALTER TABLE "assistant" DROP COLUMN "currentHistoryVersion"`) + } +} diff --git a/packages/server/src/database/migrations/postgres/index.ts b/packages/server/src/database/migrations/postgres/index.ts index 4da17daa4ab..c64e1a30a61 100644 --- a/packages/server/src/database/migrations/postgres/index.ts +++ b/packages/server/src/database/migrations/postgres/index.ts @@ -36,6 +36,9 @@ import { AddExecutionEntity1738090872625 } from './1738090872625-AddExecutionEnt import { FixOpenSourceAssistantTable1743758056188 } from './1743758056188-FixOpenSourceAssistantTable' import { AddErrorToEvaluationRun1744964560174 } from './1744964560174-AddErrorToEvaluationRun' import { ModifyExecutionSessionIdFieldType1748450230238 } from './1748450230238-ModifyExecutionSessionIdFieldType' +import { AddFlowHistoryEntity1750000000000 } from './1750000000000-AddFlowHistoryEntity' +import { AddCurrentHistoryVersion1750000000001 } from './1750000000001-AddCurrentHistoryVersion' +import { AddAssistantHistoryVersion1750000000002 } from './1750000000002-AddAssistantHistoryVersion' import { AddAuthTables1720230151482 } from '../../../enterprise/database/migrations/postgres/1720230151482-AddAuthTables' import { AddWorkspace1720230151484 } from '../../../enterprise/database/migrations/postgres/1720230151484-AddWorkspace' @@ -98,5 +101,8 @@ export const postgresMigrations = [ FixOpenSourceAssistantTable1743758056188, AddErrorToEvaluationRun1744964560174, ExecutionLinkWorkspaceId1746862866554, - ModifyExecutionSessionIdFieldType1748450230238 + ModifyExecutionSessionIdFieldType1748450230238, + AddFlowHistoryEntity1750000000000, + AddCurrentHistoryVersion1750000000001, + AddAssistantHistoryVersion1750000000002 ] diff --git a/packages/server/src/database/migrations/sqlite/1750000000000-AddFlowHistoryEntity.ts b/packages/server/src/database/migrations/sqlite/1750000000000-AddFlowHistoryEntity.ts new file mode 100644 index 00000000000..034dbac28f9 --- /dev/null +++ b/packages/server/src/database/migrations/sqlite/1750000000000-AddFlowHistoryEntity.ts @@ -0,0 +1,34 @@ +import { MigrationInterface, QueryRunner } from 'typeorm' + +export class AddFlowHistoryEntity1750000000000 implements MigrationInterface { + public async up(queryRunner: QueryRunner): Promise { + await queryRunner.query( + `CREATE TABLE IF NOT EXISTS "flow_history" ( + "id" varchar PRIMARY KEY NOT NULL, + "entityType" varchar(20) NOT NULL, + "entityId" varchar NOT NULL, + "snapshotData" text NOT NULL, + "version" integer NOT NULL, + "changeDescription" text, + "workspaceId" varchar, + "createdDate" datetime NOT NULL DEFAULT (datetime('now')), + "updatedDate" datetime NOT NULL DEFAULT (datetime('now')) + );` + ) + + // Create indexes for better query performance + await queryRunner.query( + `CREATE INDEX IF NOT EXISTS "IDX_flow_history_entity_version" ON "flow_history" ("entityType", "entityId", "version");` + ) + + await queryRunner.query( + `CREATE INDEX IF NOT EXISTS "IDX_flow_history_entity_date" ON "flow_history" ("entityType", "entityId", "createdDate");` + ) + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.query(`DROP INDEX IF EXISTS "IDX_flow_history_entity_date"`) + await queryRunner.query(`DROP INDEX IF EXISTS "IDX_flow_history_entity_version"`) + await queryRunner.query(`DROP TABLE IF EXISTS "flow_history"`) + } +} diff --git a/packages/server/src/database/migrations/sqlite/1750000000001-AddCurrentHistoryVersion.ts b/packages/server/src/database/migrations/sqlite/1750000000001-AddCurrentHistoryVersion.ts new file mode 100644 index 00000000000..2df04a3bea0 --- /dev/null +++ b/packages/server/src/database/migrations/sqlite/1750000000001-AddCurrentHistoryVersion.ts @@ -0,0 +1,11 @@ +import { MigrationInterface, QueryRunner } from 'typeorm' + +export class AddCurrentHistoryVersion1750000000001 implements MigrationInterface { + public async up(queryRunner: QueryRunner): Promise { + await queryRunner.query(`ALTER TABLE "chat_flow" ADD "currentHistoryVersion" integer`) + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.query(`ALTER TABLE "chat_flow" DROP COLUMN "currentHistoryVersion"`) + } +} diff --git a/packages/server/src/database/migrations/sqlite/1750000000002-AddAssistantHistoryVersion.ts b/packages/server/src/database/migrations/sqlite/1750000000002-AddAssistantHistoryVersion.ts new file mode 100644 index 00000000000..da2bf3da35e --- /dev/null +++ b/packages/server/src/database/migrations/sqlite/1750000000002-AddAssistantHistoryVersion.ts @@ -0,0 +1,11 @@ +import { MigrationInterface, QueryRunner } from 'typeorm' + +export class AddAssistantHistoryVersion1750000000002 implements MigrationInterface { + public async up(queryRunner: QueryRunner): Promise { + await queryRunner.query(`ALTER TABLE "assistant" ADD "currentHistoryVersion" integer`) + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.query(`ALTER TABLE "assistant" DROP COLUMN "currentHistoryVersion"`) + } +} diff --git a/packages/server/src/database/migrations/sqlite/index.ts b/packages/server/src/database/migrations/sqlite/index.ts index 0b15e26938f..a35846d867e 100644 --- a/packages/server/src/database/migrations/sqlite/index.ts +++ b/packages/server/src/database/migrations/sqlite/index.ts @@ -34,6 +34,9 @@ import { AddSeqNoToDatasetRow1733752119696 } from './1733752119696-AddSeqNoToDat import { AddExecutionEntity1738090872625 } from './1738090872625-AddExecutionEntity' import { FixOpenSourceAssistantTable1743758056188 } from './1743758056188-FixOpenSourceAssistantTable' import { AddErrorToEvaluationRun1744964560174 } from './1744964560174-AddErrorToEvaluationRun' +import { AddFlowHistoryEntity1750000000000 } from './1750000000000-AddFlowHistoryEntity' +import { AddCurrentHistoryVersion1750000000001 } from './1750000000001-AddCurrentHistoryVersion' +import { AddAssistantHistoryVersion1750000000002 } from './1750000000002-AddAssistantHistoryVersion' import { AddAuthTables1720230151482 } from '../../../enterprise/database/migrations/sqlite/1720230151482-AddAuthTables' import { AddWorkspace1720230151484 } from '../../../enterprise/database/migrations/sqlite/1720230151484-AddWorkspace' @@ -94,5 +97,8 @@ export const sqliteMigrations = [ AddExecutionEntity1738090872625, FixOpenSourceAssistantTable1743758056188, AddErrorToEvaluationRun1744964560174, - ExecutionLinkWorkspaceId1746862866554 + ExecutionLinkWorkspaceId1746862866554, + AddFlowHistoryEntity1750000000000, + AddCurrentHistoryVersion1750000000001, + AddAssistantHistoryVersion1750000000002 ] diff --git a/packages/server/src/routes/history/index.ts b/packages/server/src/routes/history/index.ts new file mode 100644 index 00000000000..a575b68bfc7 --- /dev/null +++ b/packages/server/src/routes/history/index.ts @@ -0,0 +1,26 @@ +import express from 'express' +import historyController from '../../controllers/history' +import { checkAnyPermission } from '../../enterprise/rbac/PermissionCheck' + +const router = express.Router() + +// GET specific snapshot by ID +router.get('/snapshot/:historyId', checkAnyPermission('chatflows:view,assistants:view'), historyController.getSnapshotById) + +// GET comparison between two snapshots +router.get( + '/compare/:historyId1/:historyId2', + checkAnyPermission('chatflows:view,assistants:view'), + historyController.getSnapshotComparison +) + +// POST restore from history snapshot +router.post('/restore/:historyId', checkAnyPermission('chatflows:update,assistants:update'), historyController.restoreSnapshot) + +// DELETE history snapshot +router.delete('/snapshot/:historyId', checkAnyPermission('chatflows:delete,assistants:delete'), historyController.deleteSnapshot) + +// GET history for an entity +router.get('/:entityType/:entityId', checkAnyPermission('chatflows:view,assistants:view'), historyController.getHistory) + +export default router diff --git a/packages/server/src/routes/index.ts b/packages/server/src/routes/index.ts index 4941a0076f9..1fe70a0fefa 100644 --- a/packages/server/src/routes/index.ts +++ b/packages/server/src/routes/index.ts @@ -20,6 +20,7 @@ import filesRouter from './files' import flowConfigRouter from './flow-config' import getUploadFileRouter from './get-upload-file' import getUploadPathRouter from './get-upload-path' +import historyRouter from './history' import internalChatmessagesRouter from './internal-chat-messages' import internalPredictionRouter from './internal-predictions' import leadsRouter from './leads' @@ -93,6 +94,7 @@ router.use('/internal-chatmessage', internalChatmessagesRouter) router.use('/internal-prediction', internalPredictionRouter) router.use('/get-upload-file', getUploadFileRouter) router.use('/get-upload-path', getUploadPathRouter) +router.use('/history', historyRouter) router.use('/leads', leadsRouter) router.use('/load-prompt', loadPromptRouter) router.use('/marketplaces', marketplacesRouter) diff --git a/packages/server/src/services/assistants/index.ts b/packages/server/src/services/assistants/index.ts index 72dfc4a008a..d3f938d4217 100644 --- a/packages/server/src/services/assistants/index.ts +++ b/packages/server/src/services/assistants/index.ts @@ -19,6 +19,7 @@ import logger from '../../utils/logger' import { ASSISTANT_PROMPT_GENERATOR } from '../../utils/prompt' import { checkUsageLimit } from '../../utils/quotaUsage' import nodesService from '../nodes' +import historyService from '../history' const createAssistant = async (requestBody: any, orgId: string): Promise => { try { @@ -46,6 +47,16 @@ const createAssistant = async (requestBody: any, orgId: string): Promise => { @@ -298,6 +299,22 @@ const saveChatflow = async ( { status: FLOWISE_COUNTER_STATUS.SUCCESS } ) + // Create initial history snapshot + const snapshot = await historyService.createSnapshot({ + entityType: 'CHATFLOW', + entityId: dbResponse.id, + entityData: dbResponse, + changeDescription: 'Initial creation', + workspaceId: dbResponse.workspaceId + }) + if (snapshot) { + // Re-fetch the chatflow to get the updated currentHistoryVersion + const updatedChatflow = await appServer.AppDataSource.getRepository(ChatFlow).findOne({ + where: { id: dbResponse.id } + }) + return updatedChatflow || dbResponse + } + return dbResponse } catch (error) { throw new InternalFlowiseError( @@ -330,6 +347,22 @@ const updateChatflow = async ( await _checkAndUpdateDocumentStoreUsage(newDbChatflow, chatflow.workspaceId) const dbResponse = await appServer.AppDataSource.getRepository(ChatFlow).save(newDbChatflow) + // Create history snapshot for update + const snapshot = await historyService.createSnapshot({ + entityType: 'CHATFLOW', + entityId: dbResponse.id, + entityData: dbResponse, + changeDescription: 'Updated', + workspaceId: dbResponse.workspaceId + }) + if (snapshot) { + // Re-fetch the chatflow to get the updated currentHistoryVersion + const updatedChatflow = await appServer.AppDataSource.getRepository(ChatFlow).findOne({ + where: { id: dbResponse.id } + }) + return updatedChatflow || dbResponse + } + return dbResponse } catch (error) { throw new InternalFlowiseError( diff --git a/packages/server/src/services/history/index.ts b/packages/server/src/services/history/index.ts new file mode 100644 index 00000000000..2da5c5acb2c --- /dev/null +++ b/packages/server/src/services/history/index.ts @@ -0,0 +1,305 @@ +import { StatusCodes } from 'http-status-codes' +import { FindOptionsWhere, Repository } from 'typeorm' +import { FlowHistory, EntityType } from '../../database/entities/FlowHistory' +import { ChatFlow } from '../../database/entities/ChatFlow' +import { Assistant } from '../../database/entities/Assistant' +import { InternalFlowiseError } from '../../errors/internalFlowiseError' +import { getErrorMessage } from '../../errors/utils' +import { getRunningExpressApp } from '../../utils/getRunningExpressApp' +import logger from '../../utils/logger' + +const updateEntityVersion = async (appServer: any, entityType: EntityType, entityId: string, version: number): Promise => { + if (entityType === 'CHATFLOW') { + const chatflowRepository = appServer.AppDataSource.getRepository(ChatFlow) + await chatflowRepository.update(entityId, { currentHistoryVersion: version }) + } else if (entityType === 'ASSISTANT') { + const assistantRepository = appServer.AppDataSource.getRepository(Assistant) + await assistantRepository.update(entityId, { currentHistoryVersion: version }) + } +} + +const cleanEntityData = (data: any) => { + const { updatedDate: _updatedDate, createdDate: _createdDate, currentHistoryVersion: _currentHistoryVersion, ...cleanData } = data + return cleanData +} + +const hasEntityDataChanged = async ( + entityType: EntityType, + entityId: string, + newEntityData: any, + historyRepository: Repository +): Promise => { + try { + const lastSnapshot = await historyRepository.findOne({ + where: { entityType, entityId }, + order: { version: 'DESC' } + }) + + if (!lastSnapshot) return true + + const lastData = cleanEntityData(JSON.parse(lastSnapshot.snapshotData)) + const newData = cleanEntityData(JSON.parse(JSON.stringify(newEntityData))) + + const hasChanged = JSON.stringify(lastData) !== JSON.stringify(newData) + logger.debug(`Data comparison for ${entityType} ${entityId}: ${hasChanged ? 'changed' : 'no changes'}`) + return hasChanged + } catch (error) { + logger.warn(`Failed to compare entity data for ${entityType} ${entityId}: ${getErrorMessage(error)}`) + return true + } +} + +interface CreateSnapshotOptions { + entityType: EntityType + entityId: string + entityData: any + changeDescription?: string + workspaceId?: string +} + +interface GetHistoryOptions { + entityType: EntityType + entityId: string + workspaceId?: string + limit?: number + offset?: number +} + +interface RestoreSnapshotOptions { + historyId: string + workspaceId?: string +} + +const createSnapshot = async ({ + entityType, + entityId, + entityData, + changeDescription, + workspaceId +}: CreateSnapshotOptions): Promise => { + try { + const appServer = getRunningExpressApp() + const historyRepository = appServer.AppDataSource.getRepository(FlowHistory) + + const dataHasChanged = await hasEntityDataChanged(entityType, entityId, entityData, historyRepository) + if (!dataHasChanged) { + logger.debug(`No changes detected for ${entityType} ${entityId}, skipping snapshot creation`) + return null + } + + const lastSnapshot = await historyRepository.findOne({ + where: { entityType, entityId }, + order: { version: 'DESC' } + }) + + const nextVersion = lastSnapshot ? lastSnapshot.version + 1 : 1 + + const snapshot = historyRepository.create({ + entityType, + entityId, + snapshotData: JSON.stringify(entityData), + changeDescription, + version: nextVersion, + workspaceId + }) + + const savedSnapshot = await historyRepository.save(snapshot) + await updateEntityVersion(appServer, entityType, entityId, nextVersion) + + logger.info(`Created history snapshot for ${entityType} ${entityId}, version ${nextVersion}`) + await cleanupOldSnapshots(entityType, entityId, 50) + + return savedSnapshot + } catch (error) { + logger.error(`Failed to create history snapshot for ${entityType} ${entityId}: ${getErrorMessage(error)}`) + return null + } +} + +const getHistory = async ({ + entityType, + entityId, + workspaceId, + limit = 20, + offset = 0 +}: GetHistoryOptions): Promise<{ data: FlowHistory[]; total: number }> => { + try { + const appServer = getRunningExpressApp() + const historyRepository = appServer.AppDataSource.getRepository(FlowHistory) + + const whereConditions: FindOptionsWhere = { entityType, entityId } + if (workspaceId) whereConditions.workspaceId = workspaceId + + const [data, total] = await historyRepository.findAndCount({ + where: whereConditions, + order: { version: 'DESC' }, + take: limit, + skip: offset + }) + + return { data, total } + } catch (error) { + throw new InternalFlowiseError(StatusCodes.INTERNAL_SERVER_ERROR, `Error: historyService.getHistory - ${getErrorMessage(error)}`) + } +} + +const getSnapshotById = async (historyId: string, workspaceId?: string): Promise => { + try { + const appServer = getRunningExpressApp() + const historyRepository = appServer.AppDataSource.getRepository(FlowHistory) + + const whereConditions: FindOptionsWhere = { id: historyId } + if (workspaceId) whereConditions.workspaceId = workspaceId + + const snapshot = await historyRepository.findOne({ where: whereConditions }) + if (!snapshot) { + throw new InternalFlowiseError(StatusCodes.NOT_FOUND, `History snapshot ${historyId} not found`) + } + + return snapshot + } catch (error) { + throw new InternalFlowiseError( + StatusCodes.INTERNAL_SERVER_ERROR, + `Error: historyService.getSnapshotById - ${getErrorMessage(error)}` + ) + } +} + +const restoreChatflow = async (appServer: any, snapshot: FlowHistory, snapshotData: any): Promise => { + const chatflowRepository = appServer.AppDataSource.getRepository(ChatFlow) + const existing = await chatflowRepository.findOne({ where: { id: snapshot.entityId } }) + + if (!existing) { + throw new InternalFlowiseError(StatusCodes.NOT_FOUND, `ChatFlow ${snapshot.entityId} not found`) + } + + const updated = chatflowRepository.merge(existing, { + ...snapshotData, + currentHistoryVersion: snapshot.version + }) + + return await chatflowRepository.save(updated) +} + +const restoreAssistant = async (appServer: any, snapshot: FlowHistory, snapshotData: any): Promise => { + const assistantRepository = appServer.AppDataSource.getRepository(Assistant) + const existing = await assistantRepository.findOne({ where: { id: snapshot.entityId } }) + + if (!existing) { + throw new InternalFlowiseError(StatusCodes.NOT_FOUND, `Assistant ${snapshot.entityId} not found`) + } + + const updated = assistantRepository.merge(existing, { + details: snapshotData.details, + credential: snapshotData.credential, + iconSrc: snapshotData.iconSrc, + type: snapshotData.type + }) + + return await assistantRepository.save(updated) +} + +const restoreSnapshot = async ({ historyId, workspaceId }: RestoreSnapshotOptions): Promise => { + try { + const appServer = getRunningExpressApp() + const snapshot = await getSnapshotById(historyId, workspaceId) + const snapshotData = JSON.parse(snapshot.snapshotData) + + const restoredEntity = + snapshot.entityType === 'CHATFLOW' + ? await restoreChatflow(appServer, snapshot, snapshotData) + : await restoreAssistant(appServer, snapshot, snapshotData) + + await createSnapshot({ + entityType: snapshot.entityType, + entityId: snapshot.entityId, + entityData: restoredEntity, + changeDescription: `Restored from version ${snapshot.version}`, + workspaceId + }) + + logger.info(`Restored ${snapshot.entityType} ${snapshot.entityId} from version ${snapshot.version}`) + return restoredEntity + } catch (error) { + throw new InternalFlowiseError( + StatusCodes.INTERNAL_SERVER_ERROR, + `Error: historyService.restoreSnapshot - ${getErrorMessage(error)}` + ) + } +} + +const deleteSnapshot = async (historyId: string, workspaceId?: string): Promise => { + try { + const appServer = getRunningExpressApp() + const historyRepository = appServer.AppDataSource.getRepository(FlowHistory) + + const whereConditions: FindOptionsWhere = { id: historyId } + if (workspaceId) whereConditions.workspaceId = workspaceId + + const result = await historyRepository.delete(whereConditions) + if (result.affected === 0) { + throw new InternalFlowiseError(StatusCodes.NOT_FOUND, `History snapshot ${historyId} not found`) + } + + logger.info(`Deleted history snapshot ${historyId}`) + } catch (error) { + throw new InternalFlowiseError( + StatusCodes.INTERNAL_SERVER_ERROR, + `Error: historyService.deleteSnapshot - ${getErrorMessage(error)}` + ) + } +} + +const cleanupOldSnapshots = async (entityType: EntityType, entityId: string, keepCount: number = 50): Promise => { + try { + const appServer = getRunningExpressApp() + const historyRepository = appServer.AppDataSource.getRepository(FlowHistory) + + const allSnapshots = await historyRepository.find({ + where: { entityType, entityId }, + order: { version: 'DESC' } + }) + + if (allSnapshots.length > keepCount) { + const idsToDelete = allSnapshots.slice(keepCount).map((s) => s.id) + await historyRepository.delete(idsToDelete) + logger.info(`Cleaned up ${idsToDelete.length} old snapshots for ${entityType} ${entityId}`) + } + } catch (error) { + logger.error(`Error cleaning up old snapshots: ${getErrorMessage(error)}`) + } +} + +const getSnapshotComparison = async ( + historyId1: string, + historyId2: string, + workspaceId?: string +): Promise<{ snapshot1: FlowHistory; snapshot2: FlowHistory }> => { + try { + const [snapshot1, snapshot2] = await Promise.all([ + getSnapshotById(historyId1, workspaceId), + getSnapshotById(historyId2, workspaceId) + ]) + + if (snapshot1.entityType !== snapshot2.entityType || snapshot1.entityId !== snapshot2.entityId) { + throw new InternalFlowiseError(StatusCodes.BAD_REQUEST, 'Cannot compare snapshots from different entities') + } + + return { snapshot1, snapshot2 } + } catch (error) { + throw new InternalFlowiseError( + StatusCodes.INTERNAL_SERVER_ERROR, + `Error: historyService.getSnapshotComparison - ${getErrorMessage(error)}` + ) + } +} + +export default { + createSnapshot, + getHistory, + getSnapshotById, + restoreSnapshot, + deleteSnapshot, + cleanupOldSnapshots, + getSnapshotComparison +} diff --git a/packages/server/src/utils/SSEStreamer.ts b/packages/server/src/utils/SSEStreamer.ts index 2b950579cc1..61c03f4801a 100644 --- a/packages/server/src/utils/SSEStreamer.ts +++ b/packages/server/src/utils/SSEStreamer.ts @@ -8,10 +8,20 @@ type Client = { response: Response // optional property with default value started?: boolean + // flag to indicate client is pending removal due to active MCP connections + pendingRemoval?: boolean } export class SSEStreamer implements IServerSideEventStreamer { clients: { [id: string]: Client } = {} + private activeMcpConnections: Map< + string, + { + toolName: string + startTime: number + status: 'active' | 'completing' + } + > = new Map() addExternalClient(chatId: string, res: Response) { this.clients[chatId] = { clientType: 'EXTERNAL', response: res, started: false } @@ -21,7 +31,52 @@ export class SSEStreamer implements IServerSideEventStreamer { this.clients[chatId] = { clientType: 'INTERNAL', response: res, started: false } } + addMcpConnection(chatId: string, toolName?: string) { + const connectionId = `${chatId}:${toolName || 'unknown'}` + this.activeMcpConnections.set(connectionId, { + toolName: toolName || 'unknown', + startTime: Date.now(), + status: 'active' + }) + } + + markMcpConnectionCompleting(chatId: string, toolName?: string) { + const connectionId = `${chatId}:${toolName || 'unknown'}` + const connection = this.activeMcpConnections.get(connectionId) + if (connection) { + connection.status = 'completing' + } + } + + removeMcpConnection(chatId: string, toolName?: string) { + const connectionId = `${chatId}:${toolName || 'unknown'}` + this.activeMcpConnections.delete(connectionId) + + const hasActiveConnections = Array.from(this.activeMcpConnections.keys()).some((key) => key.startsWith(`${chatId}:`)) + + if (!hasActiveConnections && this.clients[chatId]?.pendingRemoval) { + this.forceRemoveClient(chatId) + } + } + + hasMcpConnections(chatId: string): boolean { + return Array.from(this.activeMcpConnections.keys()).some((key) => key.startsWith(`${chatId}:`)) + } + removeClient(chatId: string) { + const client = this.clients[chatId] + if (client) { + // Check if there are active MCP connections for this chatId + if (this.hasMcpConnections(chatId)) { + client.pendingRemoval = true + return + } + + this.forceRemoveClient(chatId) + } + } + + forceRemoveClient(chatId: string) { const client = this.clients[chatId] if (client) { const clientResponse = { @@ -31,6 +86,8 @@ export class SSEStreamer implements IServerSideEventStreamer { client.response.write('message\ndata:' + JSON.stringify(clientResponse) + '\n\n') client.response.end() delete this.clients[chatId] + // Also clean up any remaining MCP connections + this.activeMcpConnections.delete(chatId) } } diff --git a/packages/ui/src/api/history.js b/packages/ui/src/api/history.js new file mode 100644 index 00000000000..03b049a9c45 --- /dev/null +++ b/packages/ui/src/api/history.js @@ -0,0 +1,19 @@ +import client from './client' + +const getHistory = (entityType, entityId, params) => client.get(`/history/${entityType}/${entityId}`, { params }) + +const getSnapshotById = (historyId) => client.get(`/history/snapshot/${historyId}`) + +const restoreSnapshot = (historyId) => client.post(`/history/restore/${historyId}`) + +const deleteSnapshot = (historyId) => client.delete(`/history/snapshot/${historyId}`) + +const getSnapshotComparison = (historyId1, historyId2) => client.get(`/history/compare/${historyId1}/${historyId2}`) + +export default { + getHistory, + getSnapshotById, + restoreSnapshot, + deleteSnapshot, + getSnapshotComparison +} diff --git a/packages/ui/src/ui-component/button/HistoryButton.jsx b/packages/ui/src/ui-component/button/HistoryButton.jsx new file mode 100644 index 00000000000..44a84d34044 --- /dev/null +++ b/packages/ui/src/ui-component/button/HistoryButton.jsx @@ -0,0 +1,61 @@ +import { useState } from 'react' +import PropTypes from 'prop-types' +import { IconButton, Tooltip } from '@mui/material' +import { IconHistory } from '@tabler/icons-react' + +// project imports +import HistoryDialog from '@/ui-component/dialog/HistoryDialog' + +const HistoryButton = ({ entityType, entityId, entityName, onRestore, disabled = false, size = 'medium', color = 'default' }) => { + const [showHistory, setShowHistory] = useState(false) + + const handleOpenHistory = () => { + if (!disabled && entityId) { + setShowHistory(true) + } + } + + const handleCloseHistory = () => { + setShowHistory(false) + } + + const handleRestore = (historyItem) => { + onRestore?.(historyItem) + // Optionally reload the current flow/assistant data here + } + + return ( + <> + + + + + + + + + + + ) +} + +HistoryButton.propTypes = { + entityType: PropTypes.string.isRequired, + entityId: PropTypes.string.isRequired, + entityName: PropTypes.string.isRequired, + onRestore: PropTypes.func, + disabled: PropTypes.bool, + size: PropTypes.string, + color: PropTypes.string +} + +export default HistoryButton diff --git a/packages/ui/src/ui-component/dialog/HistoryDialog.jsx b/packages/ui/src/ui-component/dialog/HistoryDialog.jsx new file mode 100644 index 00000000000..d5eb0daca09 --- /dev/null +++ b/packages/ui/src/ui-component/dialog/HistoryDialog.jsx @@ -0,0 +1,302 @@ +import { useState, useEffect, useCallback } from 'react' +import { createPortal } from 'react-dom' +import { + Dialog, + DialogContent, + DialogTitle, + Typography, + Box, + List, + ListItem, + ListItemText, + ListItemSecondaryAction, + IconButton, + Button, + Chip, + Paper, + Skeleton, + Alert, + DialogActions, + Tooltip +} from '@mui/material' +import { useTheme } from '@mui/material/styles' + +// icons +import { IconHistory, IconRestore, IconTrash, IconEye, IconX } from '@tabler/icons-react' + +// project imports +import useConfirm from '@/hooks/useConfirm' +import useNotifier from '@/utils/useNotifier' +import { useDispatch } from 'react-redux' +import { enqueueSnackbar as enqueueSnackbarAction } from '@/store/actions' + +// API +import historyApi from '@/api/history' + +// utils +import moment from 'moment' + +const HistoryDialog = ({ show, dialogProps, onCancel, onRestore }) => { + const theme = useTheme() + const portalElement = document.getElementById('portal') + const { confirm } = useConfirm() + const dispatch = useDispatch() + useNotifier() // Side effect hook + + const enqueueSnackbar = useCallback((...args) => dispatch(enqueueSnackbarAction(...args)), [dispatch]) + + const [historyItems, setHistoryItems] = useState([]) + const [loading, setLoading] = useState(false) + const [selectedSnapshot, setSelectedSnapshot] = useState(null) + const [currentPage, setCurrentPage] = useState(1) + const [totalItems, setTotalItems] = useState(0) + const [latestVersion, setLatestVersion] = useState(null) + const [reloadTrigger, setReloadTrigger] = useState(0) + + const { entityType, entityId, entityName, currentVersion } = dialogProps || {} + const itemsPerPage = 10 + + // Load history when dialog opens + useEffect(() => { + if (!show || !entityType || !entityId) return + + const loadHistory = async () => { + try { + setLoading(true) + const params = { page: currentPage, limit: itemsPerPage } + const response = await historyApi.getHistory(entityType, entityId, params) + const historyData = response.data.data || [] + + setHistoryItems(historyData) + setTotalItems(response.data.total) + + if (latestVersion === null && currentPage === 1) { + // Set latest version only when on first page + setLatestVersion(historyData[0]?.version || null) + } + } catch (error) { + enqueueSnackbar({ + message: `Failed to load history: ${error.message}`, + options: { variant: 'error' } + }) + } finally { + setLoading(false) + } + } + + loadHistory() + }, [show, entityType, entityId, currentPage, reloadTrigger, enqueueSnackbar]) + + const handleRestore = async (historyItem) => { + const confirmed = await confirm({ + title: 'Restore Version', + description: `Are you sure you want to restore "${entityName}" to version ${historyItem.version}? This will create a new version with the restored data.`, + confirmButtonName: 'Restore', + cancelButtonName: 'Cancel' + }) + + if (confirmed) { + try { + const response = await historyApi.restoreSnapshot(historyItem.id) + enqueueSnackbar({ + message: `Successfully restored to version ${historyItem.version}`, + options: { variant: 'success' } + }) + const restoreData = { ...response.data, version: historyItem.version } + onRestore?.(restoreData) + onCancel() + } catch (error) { + console.error('Restore error:', error) + enqueueSnackbar({ + message: `Failed to restore: ${error.message}`, + options: { variant: 'error' } + }) + } + } + } + + const handleDelete = async (historyItem) => { + const confirmed = await confirm({ + title: 'Delete Version', + description: `Are you sure you want to delete version ${historyItem.version}? This action cannot be undone.`, + confirmButtonName: 'Delete', + cancelButtonName: 'Cancel' + }) + + if (confirmed) { + try { + await historyApi.deleteSnapshot(historyItem.id) + enqueueSnackbar({ + message: `Successfully deleted version ${historyItem.version}`, + options: { variant: 'success' } + }) + + // Force reload by triggering useEffect + setHistoryItems([]) + setLatestVersion(null) + setReloadTrigger((prev) => prev + 1) + } catch (error) { + console.error('Delete error:', error) + enqueueSnackbar({ + message: `Failed to delete: ${error.message}`, + options: { variant: 'error' } + }) + } + } + } + + const handleViewSnapshot = async (historyItem) => { + try { + const response = await historyApi.getSnapshotById(historyItem.id) + setSelectedSnapshot(response.data) + } catch (error) { + enqueueSnackbar({ + message: `Failed to load snapshot: ${error.message}`, + options: { variant: 'error' } + }) + } + } + + const formatDate = (dateString) => moment(dateString).fromNow() + const getVersionChipColor = (version, isCurrent) => (isCurrent ? 'success' : version === 1 ? 'secondary' : 'default') + + const component = show ? ( + + + + Version History - {entityName} + + + + {loading ? ( + + {[1, 2, 3].map((item) => ( + + ))} + + ) : historyItems.length === 0 ? ( + No version history found. History is automatically created when you save changes. + ) : ( + + + {historyItems.map((item) => { + const isCurrent = item.version === currentVersion + const isLatest = item.version === latestVersion + return ( + + + + + {isCurrent && } + {isLatest && !isCurrent && } + + {item.changeDescription || 'No description'} + + + } + secondary={ + + {formatDate(item.createdDate)} + + } + /> + + + + handleViewSnapshot(item)}> + + + + + {!isCurrent && ( + + handleRestore(item)} color='primary'> + + + + )} + + + handleDelete(item)} color='error'> + + + + + + + + ) + })} + + + {totalItems > itemsPerPage && ( + + + + Page {currentPage} of {Math.ceil(totalItems / itemsPerPage)} + + + + )} + + )} + + + + + + + {selectedSnapshot && ( + setSelectedSnapshot(null)}> + + + Version {selectedSnapshot.version} Snapshot + setSelectedSnapshot(null)}> + + + + + + Change Description: {selectedSnapshot.changeDescription || 'No description'} + + + Created: {formatDate(selectedSnapshot.createdDate)} + + + + + {JSON.stringify(JSON.parse(selectedSnapshot.snapshotData), null, 2)} + + + + + )} + + ) : null + + return createPortal(component, portalElement) +} + +export default HistoryDialog diff --git a/packages/ui/src/views/agentflowsv2/Canvas.jsx b/packages/ui/src/views/agentflowsv2/Canvas.jsx index 3a724666f94..99c4c6c6de7 100644 --- a/packages/ui/src/views/agentflowsv2/Canvas.jsx +++ b/packages/ui/src/views/agentflowsv2/Canvas.jsx @@ -240,6 +240,44 @@ const AgentflowCanvas = () => { } } + const handleFlowReload = (restoredChatflow) => { + // Directly update the canvas with the restored flow data + if (restoredChatflow && restoredChatflow.flowData) { + try { + const restoredFlow = JSON.parse(restoredChatflow.flowData) + + // Force React Flow to recognize the change by creating new array references + const newNodes = [...(restoredFlow.nodes || [])] + const newEdges = [...(restoredFlow.edges || [])] + + // Clear first, then set new data to force re-render + setNodes([]) + setEdges([]) + + // Use setTimeout to ensure React processes the clear operation first + setTimeout(() => { + setNodes(newNodes) + setEdges(newEdges) + dispatch({ type: SET_CHATFLOW, chatflow: restoredChatflow }) + dispatch({ type: REMOVE_DIRTY }) + + // Force React Flow to update its viewport after restore + if (reactFlowInstance) { + setTimeout(() => { + reactFlowInstance.fitView({ padding: 0.1 }) + }, 100) + } + }, 10) + } catch (error) { + console.error('Error parsing restored flow data:', error) + // Fallback to refetching from server + if (restoredChatflow.id) { + getSpecificChatflowApi.request(restoredChatflow.id) + } + } + } + } + // eslint-disable-next-line const onNodeClick = useCallback((event, clickedNode) => { setSelectedNode(clickedNode) @@ -694,6 +732,7 @@ const AgentflowCanvas = () => { handleSaveFlow={handleSaveFlow} handleDeleteFlow={handleDeleteFlow} handleLoadFlow={handleLoadFlow} + onFlowReload={handleFlowReload} isAgentCanvas={true} isAgentflowV2={true} /> diff --git a/packages/ui/src/views/assistants/custom/CustomAssistantConfigurePreview.jsx b/packages/ui/src/views/assistants/custom/CustomAssistantConfigurePreview.jsx index 5a45d247d88..f16e64c2460 100644 --- a/packages/ui/src/views/assistants/custom/CustomAssistantConfigurePreview.jsx +++ b/packages/ui/src/views/assistants/custom/CustomAssistantConfigurePreview.jsx @@ -20,7 +20,8 @@ import { IconX, IconTrash, IconWand, - IconArrowsMaximize + IconArrowsMaximize, + IconHistory } from '@tabler/icons-react' // Project import @@ -42,6 +43,7 @@ import PromptGeneratorDialog from '@/ui-component/dialog/PromptGeneratorDialog' import { Available } from '@/ui-component/rbac/available' import ExpandTextDialog from '@/ui-component/dialog/ExpandTextDialog' import { SwitchInput } from '@/ui-component/switch/Switch' +import HistoryDialog from '@/ui-component/dialog/HistoryDialog' // API import assistantsApi from '@/api/assistants' @@ -115,6 +117,8 @@ const CustomAssistantConfigurePreview = () => { const [assistantPromptGeneratorDialogProps, setAssistantPromptGeneratorDialogProps] = useState({}) const [showExpandDialog, setShowExpandDialog] = useState(false) const [expandDialogProps, setExpandDialogProps] = useState({}) + const [historyDialogOpen, setHistoryDialogOpen] = useState(false) + const [historyDialogProps, setHistoryDialogProps] = useState({}) const [loading, setLoading] = useState(false) const [loadingAssistant, setLoadingAssistant] = useState(true) @@ -664,6 +668,55 @@ const CustomAssistantConfigurePreview = () => { setAPIDialogOpen(true) } + const onHistoryClick = () => { + if (selectedCustomAssistant?.id) { + setHistoryDialogProps({ + entityType: 'ASSISTANT', + entityId: selectedCustomAssistant.id, + entityName: selectedCustomAssistant.name || selectedCustomAssistant.details?.name || 'Untitled', + currentVersion: selectedCustomAssistant.currentHistoryVersion + }) + setHistoryDialogOpen(true) + } + } + + const onHistoryRestore = async (restoredData) => { + try { + if (restoredData && restoredData.entity) { + enqueueSnackbar({ + message: `Successfully restored to version ${restoredData.version || 'previous'}. Refreshing...`, + options: { + key: new Date().getTime() + Math.random(), + variant: 'success' + } + }) + + // Give the user a moment to see the success message, then reload + setTimeout(() => { + window.location.reload() + }, 500) + } else { + enqueueSnackbar({ + message: 'Successfully restored. Please refresh the page to see changes.', + options: { + key: new Date().getTime() + Math.random(), + variant: 'success', + action: (_key) => ( + + ) + } + }) + } + } catch (error) { + enqueueSnackbar({ + message: `Failed to restore: ${error.message}`, + options: { variant: 'error' } + }) + } + } + const onDocStoreItemSelected = (docStoreIds) => { const docStoresIds = JSON.parse(docStoreIds) const newSelectedDocumentStores = [] @@ -752,7 +805,12 @@ const CustomAssistantConfigurePreview = () => { setLoadingAssistant(false) try { const assistantDetails = JSON.parse(getSpecificAssistantApi.data.details) - setSelectedCustomAssistant(assistantDetails) + // Set the full assistant entity (not just the details) + setSelectedCustomAssistant({ + ...getSpecificAssistantApi.data, + name: assistantDetails.name, + details: assistantDetails + }) if (assistantDetails.chatModel) { setSelectedChatModel(assistantDetails.chatModel) @@ -867,7 +925,7 @@ const CustomAssistantConfigurePreview = () => { - {selectedCustomAssistant?.name ?? ''} + {selectedCustomAssistant?.name || selectedCustomAssistant?.details?.name || ''}
@@ -915,6 +973,30 @@ const CustomAssistantConfigurePreview = () => { + {selectedCustomAssistant?.id && ( + + + + + + + + )} {customAssistantFlowId && !loadingAssistant && ( { setShowExpandDialog(false) }} > + setHistoryDialogOpen(false)} + onRestore={onHistoryRestore} + /> ) diff --git a/packages/ui/src/views/assistants/openai/AssistantDialog.jsx b/packages/ui/src/views/assistants/openai/AssistantDialog.jsx index 7b2b3041cc9..2300d98bb5c 100644 --- a/packages/ui/src/views/assistants/openai/AssistantDialog.jsx +++ b/packages/ui/src/views/assistants/openai/AssistantDialog.jsx @@ -29,6 +29,7 @@ import { File } from '@/ui-component/file/File' import { BackdropLoader } from '@/ui-component/loading/BackdropLoader' import DeleteConfirmDialog from './DeleteConfirmDialog' import AssistantVectorStoreDialog from './AssistantVectorStoreDialog' +import HistoryButton from '@/ui-component/button/HistoryButton' import { StyledPermissionButton } from '@/ui-component/button/RBACButtons' // Icons @@ -1035,6 +1036,28 @@ const AssistantDialog = ({ show, dialogProps, onCancel, onConfirm, setError }) = + {dialogProps.type === 'EDIT' && assistantId && ( + { + enqueueSnackbar({ + message: `Successfully restored to version ${historyItem.version}. Please reload the assistant to see changes.`, + options: { + key: new Date().getTime() + Math.random(), + variant: 'success', + action: (key) => ( + + ) + } + }) + }} + /> + )} {dialogProps.type === 'EDIT' && ( { +const CanvasHeader = ({ + chatflow, + isAgentCanvas, + isAgentflowV2, + handleSaveFlow, + handleDeleteFlow, + handleLoadFlow, + onFlowReload: _onFlowReload +}) => { const theme = useTheme() const dispatch = useDispatch() const navigate = useNavigate() @@ -58,6 +67,8 @@ const CanvasHeader = ({ chatflow, isAgentCanvas, isAgentflowV2, handleSaveFlow, const [exportAsTemplateDialogOpen, setExportAsTemplateDialogOpen] = useState(false) const [exportAsTemplateDialogProps, setExportAsTemplateDialogProps] = useState({}) + const [historyDialogOpen, setHistoryDialogOpen] = useState(false) + const [historyDialogProps, setHistoryDialogProps] = useState({}) const enqueueSnackbar = (...args) => dispatch(enqueueSnackbarAction(...args)) const closeSnackbar = (...args) => dispatch(closeSnackbarAction(...args)) @@ -218,6 +229,62 @@ const CanvasHeader = ({ chatflow, isAgentCanvas, isAgentflowV2, handleSaveFlow, else setFlowDialogOpen(true) } + const onHistoryClick = () => { + if (chatflow?.id) { + setHistoryDialogProps({ + entityType: 'CHATFLOW', // All canvas types use CHATFLOW entity type + entityId: chatflow.id, + entityName: chatflow.name || 'Untitled', + currentVersion: chatflow.currentHistoryVersion + }) + setHistoryDialogOpen(true) + } + } + + const onHistoryRestore = async (restoredData) => { + try { + // Update the chatflow data immediately after restore + if (restoredData && restoredData.entity) { + // Always do a page reload for now to ensure proper canvas update + enqueueSnackbar({ + message: `Successfully restored to version ${restoredData.version || 'previous'}. Refreshing...`, + options: { + key: new Date().getTime() + Math.random(), + variant: 'success' + } + }) + + // Give the user a moment to see the success message, then reload + setTimeout(() => { + window.location.reload() + }, 500) + } else { + // Fallback to refresh if no data returned + enqueueSnackbar({ + message: 'Successfully restored. Please refresh the page to see changes.', + options: { + key: new Date().getTime() + Math.random(), + variant: 'success', + action: (_key) => ( + + ) + } + }) + } + } catch (error) { + console.error('Error in onHistoryRestore:', error) + enqueueSnackbar({ + message: `Error during restore: ${error.message}`, + options: { + key: new Date().getTime() + Math.random(), + variant: 'error' + } + }) + } + } + const onConfirmSaveName = (flowName) => { setFlowDialogOpen(false) setSavePermission(isAgentCanvas ? 'agentflows:update' : 'chatflows:update') @@ -432,6 +499,28 @@ const CanvasHeader = ({ chatflow, isAgentCanvas, isAgentflowV2, handleSaveFlow, + {chatflow?.id && ( + + + + + + )} setChatflowConfigurationDialogOpen(false)} isAgentCanvas={isAgentCanvas} /> + setHistoryDialogOpen(false)} + onRestore={onHistoryRestore} + /> ) } @@ -507,6 +602,7 @@ CanvasHeader.propTypes = { handleSaveFlow: PropTypes.func, handleDeleteFlow: PropTypes.func, handleLoadFlow: PropTypes.func, + onFlowReload: PropTypes.func, isAgentCanvas: PropTypes.bool, isAgentflowV2: PropTypes.bool } diff --git a/packages/ui/src/views/canvas/index.jsx b/packages/ui/src/views/canvas/index.jsx index ebfbd0506fa..142dbbfb642 100644 --- a/packages/ui/src/views/canvas/index.jsx +++ b/packages/ui/src/views/canvas/index.jsx @@ -243,6 +243,45 @@ const Canvas = () => { } } + const handleFlowReload = (restoredChatflow) => { + // Directly update the canvas with the restored flow data + if (restoredChatflow && restoredChatflow.flowData) { + try { + const restoredFlow = JSON.parse(restoredChatflow.flowData) + + // Force React Flow to recognize the change by creating new array references + const newNodes = [...(restoredFlow.nodes || [])] + const newEdges = [...(restoredFlow.edges || [])] + + // Clear first, then set new data to force re-render + setNodes([]) + setEdges([]) + + // Use setTimeout to ensure React processes the clear operation first + setTimeout(() => { + setNodes(newNodes) + setEdges(newEdges) + setLasUpdatedDateTime(restoredChatflow.updatedDate) + dispatch({ type: SET_CHATFLOW, chatflow: restoredChatflow }) + dispatch({ type: REMOVE_DIRTY }) + + // Force React Flow to update its viewport after restore + if (reactFlowInstance) { + setTimeout(() => { + reactFlowInstance.fitView({ padding: 0.1 }) + }, 100) + } + }, 10) + } catch (error) { + console.error('Error parsing restored flow data:', error) + // Fallback to refetching from server + if (restoredChatflow.id) { + getSpecificChatflowApi.request(restoredChatflow.id) + } + } + } + } + // eslint-disable-next-line const onNodeClick = useCallback((event, clickedNode) => { setSelectedNode(clickedNode) @@ -575,6 +614,7 @@ const Canvas = () => { handleSaveFlow={handleSaveFlow} handleDeleteFlow={handleDeleteFlow} handleLoadFlow={handleLoadFlow} + onFlowReload={handleFlowReload} isAgentCanvas={isAgentCanvas} />