-
Notifications
You must be signed in to change notification settings - Fork 3.2k
feat(workflow-as-mcp): wrapping workflows in the same workspace as MCP tools to particular MCP servers #2415
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 25 commits
Commits
Show all changes
48 commits
Select commit
Hold shift + click to select a range
9321743
added a workflow as mcp
a15ac73
fixed the issue of UI rendering for deleted mcp servers
57f3697
fixing lint issues
85af046
using mcn components
2259bfc
fixing merge conflicts
1f22d7a
fix
cfbe4a4
fix lint errors
f2450d3
refactored code to use hasstartblock from the tirgger utils
896e967
removing unecessary auth
6c10f31
using official mcp sdk and added description fields
fe4f895
using normalised input schema function
9743a2f
Merge branch 'staging' into feat/workflow-as-mcp
4bb9a8d
ui fixes part 1
Sg312 b94c716
remove migration before merge
icecrasher321 e48aab1
Merge branch 'staging' into feat/workflow-as-mcp
icecrasher321 6771195
fix merge conflicts
icecrasher321 8b4f2a0
remove migration to prep merge
icecrasher321 a40243f
Merge branch 'staging' into feat/workflow-as-mcp
icecrasher321 e90fdb4
re-add migration
icecrasher321 cfdbcee
cleanup code to use mcp sdk types
icecrasher321 6856b26
fix discovery calls
icecrasher321 46f6383
Merge branch 'staging' into feat/workflow-as-mcp
icecrasher321 8489dd5
add migration
icecrasher321 da144ab
ui improvements
icecrasher321 b2153cd
fix lint
icecrasher321 ab44c11
fix types
icecrasher321 6dac845
fix lint
icecrasher321 24d58ea
fix spacing
icecrasher321 feacd08
remove migration to prep merge
icecrasher321 90e0b93
Merge branch 'staging' into feat/workflow-as-mcp
icecrasher321 fe3ed9f
add migration back
icecrasher321 c50b067
Merge branch 'staging' into feat/workflow-as-mcp
icecrasher321 fc77143
fix imports
icecrasher321 1998efa
Merge staging into feat/workflow-as-mcp
icecrasher321 1c97aab
Merge staging into feat/workflow-as-mcp
icecrasher321 404b46f
fix tool refresh ux
icecrasher321 08909ec
fix test failures
icecrasher321 216fb42
fix tests
icecrasher321 0346aac
cleanup code
icecrasher321 d6a0036
styling improvements, ability to edit mcp server description, etc
waleedlatif1 599a5db
fixed ui in light mode api keys modal
waleedlatif1 0badbdd
update docs
icecrasher321 725cdac
Merge branch 'feat/workflow-as-mcp' of github.com:simstudioai/sim int…
icecrasher321 6b6b57b
deprecated unused input components, shifted to emcn
waleedlatif1 62bc844
updated playground, simplified components
waleedlatif1 ae74f83
Merge branch 'staging' into feat/workflow-as-mcp
waleedlatif1 6a5ffa2
move images and videos
waleedlatif1 98b9845
updated more docs images
waleedlatif1 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,88 @@ | ||
| import { db } from '@sim/db' | ||
| import { permissions, workflowMcpServer, workspace } from '@sim/db/schema' | ||
| import { and, eq, sql } from 'drizzle-orm' | ||
| import { type NextRequest, NextResponse } from 'next/server' | ||
| import { checkHybridAuth } from '@/lib/auth/hybrid' | ||
| import { getBaseUrl } from '@/lib/core/utils/urls' | ||
| import { createLogger } from '@/lib/logs/console/logger' | ||
|
|
||
| const logger = createLogger('McpDiscoverAPI') | ||
|
|
||
| export const dynamic = 'force-dynamic' | ||
|
|
||
| /** | ||
| * Discover all MCP servers available to the authenticated user. | ||
| */ | ||
| export async function GET(request: NextRequest) { | ||
| try { | ||
| const auth = await checkHybridAuth(request, { requireWorkflowId: false }) | ||
|
|
||
| if (!auth.success || !auth.userId) { | ||
| return NextResponse.json( | ||
| { success: false, error: 'Authentication required. Provide X-API-Key header.' }, | ||
| { status: 401 } | ||
| ) | ||
| } | ||
|
|
||
| const userId = auth.userId | ||
|
|
||
| const userWorkspacePermissions = await db | ||
| .select({ entityId: permissions.entityId }) | ||
| .from(permissions) | ||
| .where(and(eq(permissions.userId, userId), eq(permissions.entityType, 'workspace'))) | ||
|
|
||
| const workspaceIds = userWorkspacePermissions.map((w) => w.entityId) | ||
|
|
||
| if (workspaceIds.length === 0) { | ||
| return NextResponse.json({ success: true, servers: [] }) | ||
| } | ||
|
|
||
| const servers = await db | ||
| .select({ | ||
| id: workflowMcpServer.id, | ||
| name: workflowMcpServer.name, | ||
| description: workflowMcpServer.description, | ||
| workspaceId: workflowMcpServer.workspaceId, | ||
| workspaceName: workspace.name, | ||
| createdAt: workflowMcpServer.createdAt, | ||
| toolCount: sql<number>`( | ||
| SELECT COUNT(*)::int | ||
| FROM "workflow_mcp_tool" | ||
| WHERE "workflow_mcp_tool"."server_id" = "workflow_mcp_server"."id" | ||
| )`.as('tool_count'), | ||
| }) | ||
| .from(workflowMcpServer) | ||
| .leftJoin(workspace, eq(workflowMcpServer.workspaceId, workspace.id)) | ||
| .where(sql`${workflowMcpServer.workspaceId} IN ${workspaceIds}`) | ||
| .orderBy(workflowMcpServer.name) | ||
|
|
||
| const baseUrl = getBaseUrl() | ||
|
|
||
| const formattedServers = servers.map((server) => ({ | ||
| id: server.id, | ||
| name: server.name, | ||
| description: server.description, | ||
| workspace: { id: server.workspaceId, name: server.workspaceName }, | ||
| toolCount: server.toolCount || 0, | ||
| createdAt: server.createdAt, | ||
| url: `${baseUrl}/api/mcp/serve/${server.id}`, | ||
| })) | ||
|
|
||
| logger.info(`User ${userId} discovered ${formattedServers.length} MCP servers`) | ||
|
|
||
| return NextResponse.json({ | ||
| success: true, | ||
| servers: formattedServers, | ||
| authentication: { | ||
| method: 'API Key', | ||
| header: 'X-API-Key', | ||
| }, | ||
| }) | ||
| } catch (error) { | ||
| logger.error('Error discovering MCP servers:', error) | ||
| return NextResponse.json( | ||
| { success: false, error: 'Failed to discover MCP servers' }, | ||
| { status: 500 } | ||
| ) | ||
| } | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,306 @@ | ||
| /** | ||
| * MCP Serve Endpoint - Implements MCP protocol for workflow servers using SDK types. | ||
| */ | ||
|
|
||
| import { | ||
| type CallToolResult, | ||
| ErrorCode, | ||
| type InitializeResult, | ||
| isJSONRPCNotification, | ||
| isJSONRPCRequest, | ||
| type JSONRPCError, | ||
| type JSONRPCMessage, | ||
| type JSONRPCResponse, | ||
| type ListToolsResult, | ||
| type RequestId, | ||
| } from '@modelcontextprotocol/sdk/types.js' | ||
| import { db } from '@sim/db' | ||
| import { workflow, workflowMcpServer, workflowMcpTool } from '@sim/db/schema' | ||
| import { eq } from 'drizzle-orm' | ||
| import { type NextRequest, NextResponse } from 'next/server' | ||
| import { checkHybridAuth } from '@/lib/auth/hybrid' | ||
| import { getBaseUrl } from '@/lib/core/utils/urls' | ||
| import { createLogger } from '@/lib/logs/console/logger' | ||
|
|
||
| const logger = createLogger('WorkflowMcpServeAPI') | ||
|
|
||
| export const dynamic = 'force-dynamic' | ||
|
|
||
| interface RouteParams { | ||
| serverId: string | ||
| } | ||
|
|
||
| function createResponse(id: RequestId, result: unknown): JSONRPCResponse { | ||
| return { | ||
| jsonrpc: '2.0', | ||
| id, | ||
| result: result as JSONRPCResponse['result'], | ||
| } | ||
| } | ||
|
|
||
| function createError(id: RequestId, code: ErrorCode | number, message: string): JSONRPCError { | ||
| return { | ||
| jsonrpc: '2.0', | ||
| id, | ||
| error: { code, message }, | ||
| } | ||
| } | ||
|
|
||
| async function getServer(serverId: string) { | ||
| const [server] = await db | ||
| .select({ | ||
| id: workflowMcpServer.id, | ||
| name: workflowMcpServer.name, | ||
| workspaceId: workflowMcpServer.workspaceId, | ||
| }) | ||
| .from(workflowMcpServer) | ||
| .where(eq(workflowMcpServer.id, serverId)) | ||
| .limit(1) | ||
|
|
||
| return server | ||
| } | ||
|
|
||
| export async function GET(request: NextRequest, { params }: { params: Promise<RouteParams> }) { | ||
| const { serverId } = await params | ||
|
|
||
| try { | ||
| const server = await getServer(serverId) | ||
| if (!server) { | ||
| return NextResponse.json({ error: 'Server not found' }, { status: 404 }) | ||
| } | ||
|
|
||
| return NextResponse.json({ | ||
| name: server.name, | ||
| version: '1.0.0', | ||
| protocolVersion: '2024-11-05', | ||
| capabilities: { tools: {} }, | ||
| }) | ||
| } catch (error) { | ||
| logger.error('Error getting MCP server info:', error) | ||
| return NextResponse.json({ error: 'Internal server error' }, { status: 500 }) | ||
| } | ||
| } | ||
|
|
||
| export async function POST(request: NextRequest, { params }: { params: Promise<RouteParams> }) { | ||
| const { serverId } = await params | ||
|
|
||
| try { | ||
| const server = await getServer(serverId) | ||
| if (!server) { | ||
| return NextResponse.json({ error: 'Server not found' }, { status: 404 }) | ||
| } | ||
|
|
||
| const auth = await checkHybridAuth(request, { requireWorkflowId: false }) | ||
| if (!auth.success || !auth.userId) { | ||
| return NextResponse.json({ error: 'Unauthorized' }, { status: 401 }) | ||
| } | ||
|
|
||
| const body = await request.json() | ||
| const message = body as JSONRPCMessage | ||
|
|
||
| if (isJSONRPCNotification(message)) { | ||
| logger.info(`Received notification: ${message.method}`) | ||
| return new NextResponse(null, { status: 202 }) | ||
| } | ||
|
|
||
| if (!isJSONRPCRequest(message)) { | ||
| return NextResponse.json( | ||
| createError(0, ErrorCode.InvalidRequest, 'Invalid JSON-RPC message'), | ||
| { | ||
| status: 400, | ||
| } | ||
| ) | ||
| } | ||
|
|
||
| const { id, method, params: rpcParams } = message | ||
| const apiKey = | ||
| request.headers.get('X-API-Key') || | ||
| request.headers.get('Authorization')?.replace('Bearer ', '') | ||
|
|
||
| switch (method) { | ||
| case 'initialize': { | ||
| const result: InitializeResult = { | ||
| protocolVersion: '2024-11-05', | ||
| capabilities: { tools: {} }, | ||
| serverInfo: { name: server.name, version: '1.0.0' }, | ||
| } | ||
| return NextResponse.json(createResponse(id, result)) | ||
| } | ||
|
|
||
| case 'ping': | ||
| return NextResponse.json(createResponse(id, {})) | ||
|
|
||
| case 'tools/list': | ||
| return handleToolsList(id, serverId) | ||
|
|
||
| case 'tools/call': | ||
| return handleToolsCall( | ||
| id, | ||
| serverId, | ||
| rpcParams as { name: string; arguments?: Record<string, unknown> }, | ||
| apiKey | ||
| ) | ||
|
|
||
| default: | ||
| return NextResponse.json( | ||
| createError(id, ErrorCode.MethodNotFound, `Method not found: ${method}`), | ||
| { | ||
| status: 404, | ||
| } | ||
| ) | ||
| } | ||
| } catch (error) { | ||
| logger.error('Error handling MCP request:', error) | ||
| return NextResponse.json(createError(0, ErrorCode.InternalError, 'Internal error'), { | ||
| status: 500, | ||
| }) | ||
| } | ||
| } | ||
|
|
||
| async function handleToolsList(id: RequestId, serverId: string): Promise<NextResponse> { | ||
| try { | ||
| const tools = await db | ||
| .select({ | ||
| toolName: workflowMcpTool.toolName, | ||
| toolDescription: workflowMcpTool.toolDescription, | ||
| parameterSchema: workflowMcpTool.parameterSchema, | ||
| }) | ||
| .from(workflowMcpTool) | ||
| .where(eq(workflowMcpTool.serverId, serverId)) | ||
|
|
||
| const result: ListToolsResult = { | ||
| tools: tools.map((tool) => { | ||
| const schema = tool.parameterSchema as { | ||
| type?: string | ||
| properties?: Record<string, unknown> | ||
| required?: string[] | ||
| } | null | ||
| return { | ||
| name: tool.toolName, | ||
| description: tool.toolDescription || `Execute workflow: ${tool.toolName}`, | ||
| inputSchema: { | ||
| type: 'object' as const, | ||
| properties: schema?.properties || {}, | ||
| ...(schema?.required && schema.required.length > 0 && { required: schema.required }), | ||
| }, | ||
| } | ||
| }), | ||
| } | ||
|
|
||
| return NextResponse.json(createResponse(id, result)) | ||
| } catch (error) { | ||
| logger.error('Error listing tools:', error) | ||
| return NextResponse.json(createError(id, ErrorCode.InternalError, 'Failed to list tools'), { | ||
| status: 500, | ||
| }) | ||
| } | ||
| } | ||
|
|
||
| async function handleToolsCall( | ||
| id: RequestId, | ||
| serverId: string, | ||
| params: { name: string; arguments?: Record<string, unknown> } | undefined, | ||
| apiKey?: string | null | ||
| ): Promise<NextResponse> { | ||
| try { | ||
| if (!params?.name) { | ||
| return NextResponse.json(createError(id, ErrorCode.InvalidParams, 'Tool name required'), { | ||
| status: 400, | ||
| }) | ||
| } | ||
|
|
||
| const tools = await db | ||
| .select({ | ||
| toolName: workflowMcpTool.toolName, | ||
| workflowId: workflowMcpTool.workflowId, | ||
| }) | ||
| .from(workflowMcpTool) | ||
| .where(eq(workflowMcpTool.serverId, serverId)) | ||
|
|
||
| const tool = tools.find((t) => t.toolName === params.name) | ||
| if (!tool) { | ||
| return NextResponse.json( | ||
| createError(id, ErrorCode.InvalidParams, `Tool not found: ${params.name}`), | ||
| { | ||
| status: 404, | ||
| } | ||
| ) | ||
| } | ||
|
|
||
| const [wf] = await db | ||
| .select({ isDeployed: workflow.isDeployed }) | ||
| .from(workflow) | ||
| .where(eq(workflow.id, tool.workflowId)) | ||
| .limit(1) | ||
|
|
||
| if (!wf?.isDeployed) { | ||
| return NextResponse.json( | ||
| createError(id, ErrorCode.InternalError, 'Workflow is not deployed'), | ||
| { | ||
| status: 400, | ||
| } | ||
| ) | ||
| } | ||
|
|
||
| const executeUrl = `${getBaseUrl()}/api/workflows/${tool.workflowId}/execute` | ||
| const headers: Record<string, string> = { 'Content-Type': 'application/json' } | ||
| if (apiKey) headers['X-API-Key'] = apiKey | ||
|
|
||
| logger.info(`Executing workflow ${tool.workflowId} via MCP tool ${params.name}`) | ||
|
|
||
| const response = await fetch(executeUrl, { | ||
| method: 'POST', | ||
| headers, | ||
| body: JSON.stringify({ input: params.arguments || {}, triggerType: 'mcp' }), | ||
| }) | ||
icecrasher321 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| const executeResult = await response.json() | ||
|
|
||
| if (!response.ok) { | ||
| return NextResponse.json( | ||
| createError( | ||
| id, | ||
| ErrorCode.InternalError, | ||
| executeResult.error || 'Workflow execution failed' | ||
| ), | ||
| { status: 500 } | ||
| ) | ||
| } | ||
|
|
||
| const result: CallToolResult = { | ||
| content: [ | ||
| { type: 'text', text: JSON.stringify(executeResult.output || executeResult, null, 2) }, | ||
| ], | ||
| isError: !executeResult.success, | ||
| } | ||
|
|
||
| return NextResponse.json(createResponse(id, result)) | ||
| } catch (error) { | ||
| logger.error('Error calling tool:', error) | ||
| return NextResponse.json(createError(id, ErrorCode.InternalError, 'Tool execution failed'), { | ||
| status: 500, | ||
| }) | ||
| } | ||
| } | ||
|
|
||
| export async function DELETE(request: NextRequest, { params }: { params: Promise<RouteParams> }) { | ||
| const { serverId } = await params | ||
|
|
||
| try { | ||
| const server = await getServer(serverId) | ||
| if (!server) { | ||
| return NextResponse.json({ error: 'Server not found' }, { status: 404 }) | ||
| } | ||
|
|
||
| const auth = await checkHybridAuth(request, { requireWorkflowId: false }) | ||
| if (!auth.success || !auth.userId) { | ||
| return NextResponse.json({ error: 'Unauthorized' }, { status: 401 }) | ||
| } | ||
|
|
||
| logger.info(`MCP session terminated for server ${serverId}`) | ||
| return new NextResponse(null, { status: 204 }) | ||
| } catch (error) { | ||
| logger.error('Error handling MCP DELETE request:', error) | ||
| return NextResponse.json({ error: 'Internal server error' }, { status: 500 }) | ||
| } | ||
| } | ||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.