diff --git a/apps/docs/components/icons.tsx b/apps/docs/components/icons.tsx index de0ab92021..6295969a0b 100644 --- a/apps/docs/components/icons.tsx +++ b/apps/docs/components/icons.tsx @@ -4061,6 +4061,31 @@ export function McpIcon(props: SVGProps) { ) } +export function A2AIcon(props: SVGProps) { + return ( + + + + + + + + + + ) +} + export function WordpressIcon(props: SVGProps) { return ( diff --git a/apps/docs/components/ui/icon-mapping.ts b/apps/docs/components/ui/icon-mapping.ts index 60d0351e57..77cff00caa 100644 --- a/apps/docs/components/ui/icon-mapping.ts +++ b/apps/docs/components/ui/icon-mapping.ts @@ -4,6 +4,7 @@ import type { ComponentType, SVGProps } from 'react' import { + A2AIcon, AhrefsIcon, AirtableIcon, ApifyIcon, @@ -126,6 +127,7 @@ import { type IconComponent = ComponentType> export const blockTypeToIconMap: Record = { + a2a: A2AIcon, ahrefs: AhrefsIcon, airtable: AirtableIcon, apify: ApifyIcon, diff --git a/apps/docs/content/docs/en/tools/a2a.mdx b/apps/docs/content/docs/en/tools/a2a.mdx new file mode 100644 index 0000000000..558f1f907e --- /dev/null +++ b/apps/docs/content/docs/en/tools/a2a.mdx @@ -0,0 +1,215 @@ +--- +title: A2A +description: Interact with external A2A-compatible agents +--- + +import { BlockInfoCard } from "@/components/ui/block-info-card" + + + +{/* MANUAL-CONTENT-START:intro */} +The A2A (Agent-to-Agent) protocol enables Sim to interact with external AI agents and systems that implement A2A-compatible APIs. With A2A, you can connect Sim’s automations and workflows to remote agents—such as LLM-powered bots, microservices, and other AI-based tools—using a standardized messaging format. + +Using the A2A tools in Sim, you can: + +- **Send Messages to External Agents**: Communicate directly with remote agents, providing prompts, commands, or data. +- **Receive and Stream Responses**: Get structured responses, artifacts, or real-time updates from the agent as the task progresses. +- **Continue Conversations or Tasks**: Carry on multi-turn conversations or workflows by referencing task and context IDs. +- **Integrate Third-Party AI and Automation**: Leverage external A2A-compatible services as part of your Sim workflows. + +These features allow you to build advanced workflows that combine Sim’s native capabilities with the intelligence and automation of external AIs or custom agents. To use A2A integrations, you’ll need the external agent’s endpoint URL and, if required, an API key or credentials. +{/* MANUAL-CONTENT-END */} + + +## Usage Instructions + +Use the A2A (Agent-to-Agent) protocol to interact with external AI agents. + + + +## Tools + +### `a2a_send_message` + +Send a message to an external A2A-compatible agent. + +#### Input + +| Parameter | Type | Required | Description | +| --------- | ---- | -------- | ----------- | +| `agentUrl` | string | Yes | The A2A agent endpoint URL | +| `message` | string | Yes | Message to send to the agent | +| `taskId` | string | No | Task ID for continuing an existing task | +| `contextId` | string | No | Context ID for conversation continuity | +| `apiKey` | string | No | API key for authentication | + +#### Output + +| Parameter | Type | Description | +| --------- | ---- | ----------- | +| `content` | string | The text response from the agent | +| `taskId` | string | Task ID for follow-up interactions | +| `contextId` | string | Context ID for conversation continuity | +| `state` | string | Task state | +| `artifacts` | array | Structured output artifacts | +| `history` | array | Full message history | + +### `a2a_get_task` + +Query the status of an existing A2A task. + +#### Input + +| Parameter | Type | Required | Description | +| --------- | ---- | -------- | ----------- | +| `agentUrl` | string | Yes | The A2A agent endpoint URL | +| `taskId` | string | Yes | Task ID to query | +| `apiKey` | string | No | API key for authentication | +| `historyLength` | number | No | Number of history messages to include | + +#### Output + +| Parameter | Type | Description | +| --------- | ---- | ----------- | +| `taskId` | string | Task ID | +| `contextId` | string | Context ID | +| `state` | string | Task state | +| `artifacts` | array | Output artifacts | +| `history` | array | Message history | + +### `a2a_cancel_task` + +Cancel a running A2A task. + +#### Input + +| Parameter | Type | Required | Description | +| --------- | ---- | -------- | ----------- | +| `agentUrl` | string | Yes | The A2A agent endpoint URL | +| `taskId` | string | Yes | Task ID to cancel | +| `apiKey` | string | No | API key for authentication | + +#### Output + +| Parameter | Type | Description | +| --------- | ---- | ----------- | +| `cancelled` | boolean | Whether cancellation was successful | +| `state` | string | Task state after cancellation | + +### `a2a_get_agent_card` + +Fetch the Agent Card (discovery document) for an A2A agent. + +#### Input + +| Parameter | Type | Required | Description | +| --------- | ---- | -------- | ----------- | +| `agentUrl` | string | Yes | The A2A agent endpoint URL | +| `apiKey` | string | No | API key for authentication \(if required\) | + +#### Output + +| Parameter | Type | Description | +| --------- | ---- | ----------- | +| `name` | string | Agent name | +| `description` | string | Agent description | +| `url` | string | Agent endpoint URL | +| `version` | string | Agent version | +| `capabilities` | object | Agent capabilities \(streaming, pushNotifications, etc.\) | +| `skills` | array | Skills the agent can perform | +| `defaultInputModes` | array | Default input modes \(text, file, data\) | +| `defaultOutputModes` | array | Default output modes \(text, file, data\) | + +### `a2a_resubscribe` + +Reconnect to an ongoing A2A task stream after connection interruption. + +#### Input + +| Parameter | Type | Required | Description | +| --------- | ---- | -------- | ----------- | +| `agentUrl` | string | Yes | The A2A agent endpoint URL | +| `taskId` | string | Yes | Task ID to resubscribe to | +| `apiKey` | string | No | API key for authentication | + +#### Output + +| Parameter | Type | Description | +| --------- | ---- | ----------- | +| `taskId` | string | Task ID | +| `contextId` | string | Context ID | +| `state` | string | Current task state | +| `isRunning` | boolean | Whether the task is still running | +| `artifacts` | array | Output artifacts | +| `history` | array | Message history | + +### `a2a_set_push_notification` + +Configure a webhook to receive task update notifications. + +#### Input + +| Parameter | Type | Required | Description | +| --------- | ---- | -------- | ----------- | +| `agentUrl` | string | Yes | The A2A agent endpoint URL | +| `taskId` | string | Yes | Task ID to configure notifications for | +| `webhookUrl` | string | Yes | HTTPS webhook URL to receive notifications | +| `token` | string | No | Token for webhook validation | +| `apiKey` | string | No | API key for authentication | + +#### Output + +| Parameter | Type | Description | +| --------- | ---- | ----------- | +| `url` | string | Configured webhook URL | +| `token` | string | Token for webhook validation | +| `success` | boolean | Whether configuration was successful | + +### `a2a_get_push_notification` + +Get the push notification webhook configuration for a task. + +#### Input + +| Parameter | Type | Required | Description | +| --------- | ---- | -------- | ----------- | +| `agentUrl` | string | Yes | The A2A agent endpoint URL | +| `taskId` | string | Yes | Task ID to get notification config for | +| `apiKey` | string | No | API key for authentication | + +#### Output + +| Parameter | Type | Description | +| --------- | ---- | ----------- | +| `url` | string | Configured webhook URL | +| `token` | string | Token for webhook validation | +| `exists` | boolean | Whether a push notification config exists | + +### `a2a_delete_push_notification` + +Delete the push notification webhook configuration for a task. + +#### Input + +| Parameter | Type | Required | Description | +| --------- | ---- | -------- | ----------- | +| `agentUrl` | string | Yes | The A2A agent endpoint URL | +| `taskId` | string | Yes | Task ID to delete notification config for | +| `pushNotificationConfigId` | string | No | Push notification configuration ID to delete \(optional - server can derive from taskId\) | +| `apiKey` | string | No | API key for authentication | + +#### Output + +| Parameter | Type | Description | +| --------- | ---- | ----------- | +| `success` | boolean | Whether deletion was successful | + + + +## Notes + +- Category: `tools` +- Type: `a2a` diff --git a/apps/docs/content/docs/en/tools/meta.json b/apps/docs/content/docs/en/tools/meta.json index 8a2bca881a..4971aa738c 100644 --- a/apps/docs/content/docs/en/tools/meta.json +++ b/apps/docs/content/docs/en/tools/meta.json @@ -1,6 +1,7 @@ { "pages": [ "index", + "a2a", "ahrefs", "airtable", "apify", diff --git a/apps/sim/app/api/a2a/agents/[agentId]/route.ts b/apps/sim/app/api/a2a/agents/[agentId]/route.ts new file mode 100644 index 0000000000..74c13af879 --- /dev/null +++ b/apps/sim/app/api/a2a/agents/[agentId]/route.ts @@ -0,0 +1,289 @@ +import { db } from '@sim/db' +import { a2aAgent, workflow } from '@sim/db/schema' +import { createLogger } from '@sim/logger' +import { eq } from 'drizzle-orm' +import { type NextRequest, NextResponse } from 'next/server' +import { generateAgentCard, generateSkillsFromWorkflow } from '@/lib/a2a/agent-card' +import type { AgentCapabilities, AgentSkill } from '@/lib/a2a/types' +import { checkHybridAuth } from '@/lib/auth/hybrid' +import { getRedisClient } from '@/lib/core/config/redis' +import { loadWorkflowFromNormalizedTables } from '@/lib/workflows/persistence/utils' + +const logger = createLogger('A2AAgentCardAPI') + +export const dynamic = 'force-dynamic' + +interface RouteParams { + agentId: string +} + +/** + * GET - Returns the Agent Card for discovery + */ +export async function GET(request: NextRequest, { params }: { params: Promise }) { + const { agentId } = await params + + try { + const [agent] = await db + .select({ + agent: a2aAgent, + workflow: workflow, + }) + .from(a2aAgent) + .innerJoin(workflow, eq(a2aAgent.workflowId, workflow.id)) + .where(eq(a2aAgent.id, agentId)) + .limit(1) + + if (!agent) { + return NextResponse.json({ error: 'Agent not found' }, { status: 404 }) + } + + if (!agent.agent.isPublished) { + const auth = await checkHybridAuth(request, { requireWorkflowId: false }) + if (!auth.success) { + return NextResponse.json({ error: 'Agent not published' }, { status: 404 }) + } + } + + const agentCard = generateAgentCard( + { + id: agent.agent.id, + name: agent.agent.name, + description: agent.agent.description, + version: agent.agent.version, + capabilities: agent.agent.capabilities as AgentCapabilities, + skills: agent.agent.skills as AgentSkill[], + }, + { + id: agent.workflow.id, + name: agent.workflow.name, + description: agent.workflow.description, + } + ) + + return NextResponse.json(agentCard, { + headers: { + 'Content-Type': 'application/json', + 'Cache-Control': agent.agent.isPublished ? 'public, max-age=3600' : 'private, no-cache', + }, + }) + } catch (error) { + logger.error('Error getting Agent Card:', error) + return NextResponse.json({ error: 'Internal server error' }, { status: 500 }) + } +} + +/** + * PUT - Update an agent + */ +export async function PUT(request: NextRequest, { params }: { params: Promise }) { + const { agentId } = await params + + try { + const auth = await checkHybridAuth(request, { requireWorkflowId: false }) + if (!auth.success || !auth.userId) { + return NextResponse.json({ error: 'Unauthorized' }, { status: 401 }) + } + + const [existingAgent] = await db + .select() + .from(a2aAgent) + .where(eq(a2aAgent.id, agentId)) + .limit(1) + + if (!existingAgent) { + return NextResponse.json({ error: 'Agent not found' }, { status: 404 }) + } + + const body = await request.json() + + if ( + body.skillTags !== undefined && + (!Array.isArray(body.skillTags) || + !body.skillTags.every((tag: unknown): tag is string => typeof tag === 'string')) + ) { + return NextResponse.json({ error: 'skillTags must be an array of strings' }, { status: 400 }) + } + + let skills = body.skills ?? existingAgent.skills + if (body.skillTags !== undefined) { + const agentName = body.name ?? existingAgent.name + const agentDescription = body.description ?? existingAgent.description + skills = generateSkillsFromWorkflow(agentName, agentDescription, body.skillTags) + } + + const [updatedAgent] = await db + .update(a2aAgent) + .set({ + name: body.name ?? existingAgent.name, + description: body.description ?? existingAgent.description, + version: body.version ?? existingAgent.version, + capabilities: body.capabilities ?? existingAgent.capabilities, + skills, + authentication: body.authentication ?? existingAgent.authentication, + isPublished: body.isPublished ?? existingAgent.isPublished, + publishedAt: + body.isPublished && !existingAgent.isPublished ? new Date() : existingAgent.publishedAt, + updatedAt: new Date(), + }) + .where(eq(a2aAgent.id, agentId)) + .returning() + + logger.info(`Updated A2A agent: ${agentId}`) + + return NextResponse.json({ success: true, agent: updatedAgent }) + } catch (error) { + logger.error('Error updating agent:', error) + return NextResponse.json({ error: 'Internal server error' }, { status: 500 }) + } +} + +/** + * DELETE - Delete an agent + */ +export async function DELETE(request: NextRequest, { params }: { params: Promise }) { + const { agentId } = await params + + try { + const auth = await checkHybridAuth(request, { requireWorkflowId: false }) + if (!auth.success || !auth.userId) { + return NextResponse.json({ error: 'Unauthorized' }, { status: 401 }) + } + + const [existingAgent] = await db + .select() + .from(a2aAgent) + .where(eq(a2aAgent.id, agentId)) + .limit(1) + + if (!existingAgent) { + return NextResponse.json({ error: 'Agent not found' }, { status: 404 }) + } + + await db.delete(a2aAgent).where(eq(a2aAgent.id, agentId)) + + logger.info(`Deleted A2A agent: ${agentId}`) + + return NextResponse.json({ success: true }) + } catch (error) { + logger.error('Error deleting agent:', error) + return NextResponse.json({ error: 'Internal server error' }, { status: 500 }) + } +} + +/** + * POST - Publish/unpublish an agent + */ +export async function POST(request: NextRequest, { params }: { params: Promise }) { + const { agentId } = await params + + try { + const auth = await checkHybridAuth(request, { requireWorkflowId: false }) + if (!auth.success || !auth.userId) { + logger.warn('A2A agent publish auth failed:', { error: auth.error, hasUserId: !!auth.userId }) + return NextResponse.json({ error: auth.error || 'Unauthorized' }, { status: 401 }) + } + + const [existingAgent] = await db + .select() + .from(a2aAgent) + .where(eq(a2aAgent.id, agentId)) + .limit(1) + + if (!existingAgent) { + return NextResponse.json({ error: 'Agent not found' }, { status: 404 }) + } + + const body = await request.json() + const action = body.action as 'publish' | 'unpublish' | 'refresh' + + if (action === 'publish') { + const [wf] = await db + .select({ isDeployed: workflow.isDeployed }) + .from(workflow) + .where(eq(workflow.id, existingAgent.workflowId)) + .limit(1) + + if (!wf?.isDeployed) { + return NextResponse.json( + { error: 'Workflow must be deployed before publishing agent' }, + { status: 400 } + ) + } + + await db + .update(a2aAgent) + .set({ + isPublished: true, + publishedAt: new Date(), + updatedAt: new Date(), + }) + .where(eq(a2aAgent.id, agentId)) + + const redis = getRedisClient() + if (redis) { + try { + await redis.del(`a2a:agent:${agentId}:card`) + } catch (err) { + logger.warn('Failed to invalidate agent card cache', { agentId, error: err }) + } + } + + logger.info(`Published A2A agent: ${agentId}`) + return NextResponse.json({ success: true, isPublished: true }) + } + + if (action === 'unpublish') { + await db + .update(a2aAgent) + .set({ + isPublished: false, + updatedAt: new Date(), + }) + .where(eq(a2aAgent.id, agentId)) + + const redis = getRedisClient() + if (redis) { + try { + await redis.del(`a2a:agent:${agentId}:card`) + } catch (err) { + logger.warn('Failed to invalidate agent card cache', { agentId, error: err }) + } + } + + logger.info(`Unpublished A2A agent: ${agentId}`) + return NextResponse.json({ success: true, isPublished: false }) + } + + if (action === 'refresh') { + const workflowData = await loadWorkflowFromNormalizedTables(existingAgent.workflowId) + if (!workflowData) { + return NextResponse.json({ error: 'Failed to load workflow' }, { status: 500 }) + } + + const [wf] = await db + .select({ name: workflow.name, description: workflow.description }) + .from(workflow) + .where(eq(workflow.id, existingAgent.workflowId)) + .limit(1) + + const skills = generateSkillsFromWorkflow(wf?.name || existingAgent.name, wf?.description) + + await db + .update(a2aAgent) + .set({ + skills, + updatedAt: new Date(), + }) + .where(eq(a2aAgent.id, agentId)) + + logger.info(`Refreshed skills for A2A agent: ${agentId}`) + return NextResponse.json({ success: true, skills }) + } + + return NextResponse.json({ error: 'Invalid action' }, { status: 400 }) + } catch (error) { + logger.error('Error with agent action:', error) + return NextResponse.json({ error: 'Internal server error' }, { status: 500 }) + } +} diff --git a/apps/sim/app/api/a2a/agents/route.ts b/apps/sim/app/api/a2a/agents/route.ts new file mode 100644 index 0000000000..e4229ea1e4 --- /dev/null +++ b/apps/sim/app/api/a2a/agents/route.ts @@ -0,0 +1,186 @@ +/** + * A2A Agents List Endpoint + * + * List and create A2A agents for a workspace. + */ + +import { db } from '@sim/db' +import { a2aAgent, workflow } from '@sim/db/schema' +import { createLogger } from '@sim/logger' +import { and, eq, sql } from 'drizzle-orm' +import { type NextRequest, NextResponse } from 'next/server' +import { v4 as uuidv4 } from 'uuid' +import { generateSkillsFromWorkflow } from '@/lib/a2a/agent-card' +import { A2A_DEFAULT_CAPABILITIES } from '@/lib/a2a/constants' +import { sanitizeAgentName } from '@/lib/a2a/utils' +import { checkHybridAuth } from '@/lib/auth/hybrid' +import { loadWorkflowFromNormalizedTables } from '@/lib/workflows/persistence/utils' +import { hasValidStartBlockInState } from '@/lib/workflows/triggers/trigger-utils' +import { getWorkspaceById } from '@/lib/workspaces/permissions/utils' + +const logger = createLogger('A2AAgentsAPI') + +export const dynamic = 'force-dynamic' + +/** + * GET - List all A2A agents for a workspace + */ +export async function GET(request: NextRequest) { + try { + const auth = await checkHybridAuth(request, { requireWorkflowId: false }) + if (!auth.success || !auth.userId) { + return NextResponse.json({ error: 'Unauthorized' }, { status: 401 }) + } + + const { searchParams } = new URL(request.url) + const workspaceId = searchParams.get('workspaceId') + + if (!workspaceId) { + return NextResponse.json({ error: 'workspaceId is required' }, { status: 400 }) + } + + const ws = await getWorkspaceById(workspaceId) + if (!ws) { + return NextResponse.json({ error: 'Workspace not found' }, { status: 404 }) + } + + const agents = await db + .select({ + id: a2aAgent.id, + workspaceId: a2aAgent.workspaceId, + workflowId: a2aAgent.workflowId, + name: a2aAgent.name, + description: a2aAgent.description, + version: a2aAgent.version, + capabilities: a2aAgent.capabilities, + skills: a2aAgent.skills, + authentication: a2aAgent.authentication, + isPublished: a2aAgent.isPublished, + publishedAt: a2aAgent.publishedAt, + createdAt: a2aAgent.createdAt, + updatedAt: a2aAgent.updatedAt, + workflowName: workflow.name, + workflowDescription: workflow.description, + isDeployed: workflow.isDeployed, + taskCount: sql`( + SELECT COUNT(*)::int + FROM "a2a_task" + WHERE "a2a_task"."agent_id" = "a2a_agent"."id" + )`.as('task_count'), + }) + .from(a2aAgent) + .leftJoin(workflow, eq(a2aAgent.workflowId, workflow.id)) + .where(eq(a2aAgent.workspaceId, workspaceId)) + .orderBy(a2aAgent.createdAt) + + logger.info(`Listed ${agents.length} A2A agents for workspace ${workspaceId}`) + + return NextResponse.json({ success: true, agents }) + } catch (error) { + logger.error('Error listing agents:', error) + return NextResponse.json({ error: 'Internal server error' }, { status: 500 }) + } +} + +/** + * POST - Create a new A2A agent from a workflow + */ +export async function POST(request: NextRequest) { + try { + const auth = await checkHybridAuth(request, { requireWorkflowId: false }) + if (!auth.success || !auth.userId) { + return NextResponse.json({ error: 'Unauthorized' }, { status: 401 }) + } + + const body = await request.json() + const { workspaceId, workflowId, name, description, capabilities, authentication, skillTags } = + body + + if (!workspaceId || !workflowId) { + return NextResponse.json( + { error: 'workspaceId and workflowId are required' }, + { status: 400 } + ) + } + + const [wf] = await db + .select({ + id: workflow.id, + name: workflow.name, + description: workflow.description, + workspaceId: workflow.workspaceId, + isDeployed: workflow.isDeployed, + }) + .from(workflow) + .where(and(eq(workflow.id, workflowId), eq(workflow.workspaceId, workspaceId))) + .limit(1) + + if (!wf) { + return NextResponse.json( + { error: 'Workflow not found or does not belong to workspace' }, + { status: 404 } + ) + } + + const workflowData = await loadWorkflowFromNormalizedTables(workflowId) + if (!workflowData || !hasValidStartBlockInState(workflowData)) { + return NextResponse.json( + { error: 'Workflow must have a Start block to be exposed as an A2A agent' }, + { status: 400 } + ) + } + + const [existing] = await db + .select({ id: a2aAgent.id }) + .from(a2aAgent) + .where(and(eq(a2aAgent.workspaceId, workspaceId), eq(a2aAgent.workflowId, workflowId))) + .limit(1) + + if (existing) { + return NextResponse.json( + { error: 'An agent already exists for this workflow' }, + { status: 409 } + ) + } + + const skills = generateSkillsFromWorkflow( + name || wf.name, + description || wf.description, + skillTags + ) + + const agentId = uuidv4() + const agentName = name || sanitizeAgentName(wf.name) + + const [agent] = await db + .insert(a2aAgent) + .values({ + id: agentId, + workspaceId, + workflowId, + createdBy: auth.userId, + name: agentName, + description: description || wf.description, + version: '1.0.0', + capabilities: { + ...A2A_DEFAULT_CAPABILITIES, + ...capabilities, + }, + skills, + authentication: authentication || { + schemes: ['bearer', 'apiKey'], + }, + isPublished: false, + createdAt: new Date(), + updatedAt: new Date(), + }) + .returning() + + logger.info(`Created A2A agent ${agentId} for workflow ${workflowId}`) + + return NextResponse.json({ success: true, agent }, { status: 201 }) + } catch (error) { + logger.error('Error creating agent:', error) + return NextResponse.json({ error: 'Internal server error' }, { status: 500 }) + } +} diff --git a/apps/sim/app/api/a2a/serve/[agentId]/route.ts b/apps/sim/app/api/a2a/serve/[agentId]/route.ts new file mode 100644 index 0000000000..dfd1f4e8f2 --- /dev/null +++ b/apps/sim/app/api/a2a/serve/[agentId]/route.ts @@ -0,0 +1,1263 @@ +import type { Artifact, Message, PushNotificationConfig, TaskState } from '@a2a-js/sdk' +import { db } from '@sim/db' +import { a2aAgent, a2aPushNotificationConfig, a2aTask, workflow } from '@sim/db/schema' +import { createLogger } from '@sim/logger' +import { eq } from 'drizzle-orm' +import { type NextRequest, NextResponse } from 'next/server' +import { v4 as uuidv4 } from 'uuid' +import { A2A_DEFAULT_TIMEOUT, A2A_MAX_HISTORY_LENGTH } from '@/lib/a2a/constants' +import { notifyTaskStateChange } from '@/lib/a2a/push-notifications' +import { + createAgentMessage, + extractWorkflowInput, + isTerminalState, + parseWorkflowSSEChunk, +} from '@/lib/a2a/utils' +import { checkHybridAuth } from '@/lib/auth/hybrid' +import { getBrandConfig } from '@/lib/branding/branding' +import { acquireLock, getRedisClient, releaseLock } from '@/lib/core/config/redis' +import { SSE_HEADERS } from '@/lib/core/utils/sse' +import { getBaseUrl } from '@/lib/core/utils/urls' +import { markExecutionCancelled } from '@/lib/execution/cancellation' +import { + A2A_ERROR_CODES, + A2A_METHODS, + buildExecuteRequest, + buildTaskResponse, + createError, + createResponse, + extractAgentContent, + formatTaskResponse, + generateTaskId, + isJSONRPCRequest, + type MessageSendParams, + type PushNotificationSetParams, + type TaskIdParams, +} from '@/app/api/a2a/serve/[agentId]/utils' + +const logger = createLogger('A2AServeAPI') + +export const dynamic = 'force-dynamic' +export const runtime = 'nodejs' + +interface RouteParams { + agentId: string +} + +/** + * GET - Returns the Agent Card (discovery document) + */ +export async function GET(_request: NextRequest, { params }: { params: Promise }) { + const { agentId } = await params + + const redis = getRedisClient() + const cacheKey = `a2a:agent:${agentId}:card` + + if (redis) { + try { + const cached = await redis.get(cacheKey) + if (cached) { + return NextResponse.json(JSON.parse(cached), { + headers: { + 'Content-Type': 'application/json', + 'Cache-Control': 'private, max-age=60', + 'X-Cache': 'HIT', + }, + }) + } + } catch (err) { + logger.warn('Redis cache read failed', { agentId, error: err }) + } + } + + try { + const [agent] = await db + .select({ + id: a2aAgent.id, + name: a2aAgent.name, + description: a2aAgent.description, + version: a2aAgent.version, + capabilities: a2aAgent.capabilities, + skills: a2aAgent.skills, + authentication: a2aAgent.authentication, + isPublished: a2aAgent.isPublished, + }) + .from(a2aAgent) + .where(eq(a2aAgent.id, agentId)) + .limit(1) + + if (!agent) { + return NextResponse.json({ error: 'Agent not found' }, { status: 404 }) + } + + if (!agent.isPublished) { + return NextResponse.json({ error: 'Agent not published' }, { status: 404 }) + } + + const baseUrl = getBaseUrl() + const brandConfig = getBrandConfig() + + const authConfig = agent.authentication as { schemes?: string[] } | undefined + const schemes = authConfig?.schemes || [] + const isPublic = schemes.includes('none') + + const agentCard = { + protocolVersion: '0.3.0', + name: agent.name, + description: agent.description || '', + url: `${baseUrl}/api/a2a/serve/${agent.id}`, + version: agent.version, + preferredTransport: 'JSONRPC', + documentationUrl: `${baseUrl}/docs/a2a`, + provider: { + organization: brandConfig.name, + url: baseUrl, + }, + capabilities: agent.capabilities, + skills: agent.skills || [], + ...(isPublic + ? {} + : { + securitySchemes: { + apiKey: { + type: 'apiKey' as const, + name: 'X-API-Key', + in: 'header' as const, + description: 'API key authentication', + }, + }, + security: [{ apiKey: [] }], + }), + defaultInputModes: ['text/plain', 'application/json'], + defaultOutputModes: ['text/plain', 'application/json'], + } + + if (redis) { + try { + await redis.set(cacheKey, JSON.stringify(agentCard), 'EX', 60) + } catch (err) { + logger.warn('Redis cache write failed', { agentId, error: err }) + } + } + + return NextResponse.json(agentCard, { + headers: { + 'Content-Type': 'application/json', + 'Cache-Control': 'private, max-age=60', + 'X-Cache': 'MISS', + }, + }) + } catch (error) { + logger.error('Error getting Agent Card:', error) + return NextResponse.json({ error: 'Internal server error' }, { status: 500 }) + } +} + +/** + * POST - Handle JSON-RPC requests + */ +export async function POST(request: NextRequest, { params }: { params: Promise }) { + const { agentId } = await params + + try { + const [agent] = await db + .select({ + id: a2aAgent.id, + name: a2aAgent.name, + workflowId: a2aAgent.workflowId, + workspaceId: a2aAgent.workspaceId, + isPublished: a2aAgent.isPublished, + capabilities: a2aAgent.capabilities, + authentication: a2aAgent.authentication, + }) + .from(a2aAgent) + .where(eq(a2aAgent.id, agentId)) + .limit(1) + + if (!agent) { + return NextResponse.json( + createError(null, A2A_ERROR_CODES.AGENT_UNAVAILABLE, 'Agent not found'), + { status: 404 } + ) + } + + if (!agent.isPublished) { + return NextResponse.json( + createError(null, A2A_ERROR_CODES.AGENT_UNAVAILABLE, 'Agent not published'), + { status: 404 } + ) + } + + const authSchemes = (agent.authentication as { schemes?: string[] })?.schemes || [] + const requiresAuth = !authSchemes.includes('none') + + if (requiresAuth) { + const auth = await checkHybridAuth(request, { requireWorkflowId: false }) + if (!auth.success || !auth.userId) { + return NextResponse.json( + createError(null, A2A_ERROR_CODES.AUTHENTICATION_REQUIRED, 'Unauthorized'), + { status: 401 } + ) + } + } + + const [wf] = await db + .select({ isDeployed: workflow.isDeployed }) + .from(workflow) + .where(eq(workflow.id, agent.workflowId)) + .limit(1) + + if (!wf?.isDeployed) { + return NextResponse.json( + createError(null, A2A_ERROR_CODES.AGENT_UNAVAILABLE, 'Workflow is not deployed'), + { status: 400 } + ) + } + + const body = await request.json() + + if (!isJSONRPCRequest(body)) { + return NextResponse.json( + createError(null, A2A_ERROR_CODES.INVALID_REQUEST, 'Invalid JSON-RPC request'), + { status: 400 } + ) + } + + const { id, method, params: rpcParams } = body + const apiKey = request.headers.get('X-API-Key') + + logger.info(`A2A request: ${method} for agent ${agentId}`) + + switch (method) { + case A2A_METHODS.MESSAGE_SEND: + return handleMessageSend(id, agent, rpcParams as MessageSendParams, apiKey) + + case A2A_METHODS.MESSAGE_STREAM: + return handleMessageStream(request, id, agent, rpcParams as MessageSendParams, apiKey) + + case A2A_METHODS.TASKS_GET: + return handleTaskGet(id, rpcParams as TaskIdParams) + + case A2A_METHODS.TASKS_CANCEL: + return handleTaskCancel(id, rpcParams as TaskIdParams) + + case A2A_METHODS.TASKS_RESUBSCRIBE: + return handleTaskResubscribe(request, id, rpcParams as TaskIdParams) + + case A2A_METHODS.PUSH_NOTIFICATION_SET: + return handlePushNotificationSet(id, rpcParams as PushNotificationSetParams) + + case A2A_METHODS.PUSH_NOTIFICATION_GET: + return handlePushNotificationGet(id, rpcParams as TaskIdParams) + + case A2A_METHODS.PUSH_NOTIFICATION_DELETE: + return handlePushNotificationDelete(id, rpcParams as TaskIdParams) + + default: + return NextResponse.json( + createError(id, A2A_ERROR_CODES.METHOD_NOT_FOUND, `Method not found: ${method}`), + { status: 404 } + ) + } + } catch (error) { + logger.error('Error handling A2A request:', error) + return NextResponse.json(createError(null, A2A_ERROR_CODES.INTERNAL_ERROR, 'Internal error'), { + status: 500, + }) + } +} + +/** + * Handle message/send - Send a message (v0.3) + */ +async function handleMessageSend( + id: string | number, + agent: { + id: string + name: string + workflowId: string + workspaceId: string + }, + params: MessageSendParams, + apiKey?: string | null +): Promise { + if (!params?.message) { + return NextResponse.json( + createError(id, A2A_ERROR_CODES.INVALID_PARAMS, 'Message is required'), + { status: 400 } + ) + } + + const message = params.message + const taskId = message.taskId || generateTaskId() + const contextId = message.contextId || uuidv4() + + // Distributed lock to prevent concurrent task processing + const lockKey = `a2a:task:${taskId}:lock` + const lockValue = uuidv4() + const acquired = await acquireLock(lockKey, lockValue, 60) + + if (!acquired) { + return NextResponse.json( + createError(id, A2A_ERROR_CODES.INTERNAL_ERROR, 'Task is currently being processed'), + { status: 409 } + ) + } + + try { + let existingTask: typeof a2aTask.$inferSelect | null = null + if (message.taskId) { + const [found] = await db.select().from(a2aTask).where(eq(a2aTask.id, message.taskId)).limit(1) + existingTask = found || null + + if (!existingTask) { + return NextResponse.json( + createError(id, A2A_ERROR_CODES.TASK_NOT_FOUND, 'Task not found'), + { status: 404 } + ) + } + + if (isTerminalState(existingTask.status as TaskState)) { + return NextResponse.json( + createError(id, A2A_ERROR_CODES.TASK_ALREADY_COMPLETE, 'Task already in terminal state'), + { status: 400 } + ) + } + } + + const history: Message[] = existingTask?.messages ? (existingTask.messages as Message[]) : [] + + history.push(message) + + if (history.length > A2A_MAX_HISTORY_LENGTH) { + history.splice(0, history.length - A2A_MAX_HISTORY_LENGTH) + } + + if (existingTask) { + await db + .update(a2aTask) + .set({ + status: 'working', + messages: history, + updatedAt: new Date(), + }) + .where(eq(a2aTask.id, taskId)) + } else { + await db.insert(a2aTask).values({ + id: taskId, + agentId: agent.id, + sessionId: contextId || null, + status: 'working', + messages: history, + metadata: {}, + createdAt: new Date(), + updatedAt: new Date(), + }) + } + + const { + url: executeUrl, + headers, + useInternalAuth, + } = await buildExecuteRequest({ + workflowId: agent.workflowId, + apiKey, + }) + + logger.info(`Executing workflow ${agent.workflowId} for A2A task ${taskId}`) + + try { + const workflowInput = extractWorkflowInput(message) + if (!workflowInput) { + return NextResponse.json( + createError( + id, + A2A_ERROR_CODES.INVALID_PARAMS, + 'Message must contain at least one part with content' + ), + { status: 400 } + ) + } + + const response = await fetch(executeUrl, { + method: 'POST', + headers, + body: JSON.stringify({ + ...workflowInput, + triggerType: 'api', + ...(useInternalAuth && { workflowId: agent.workflowId }), + }), + signal: AbortSignal.timeout(A2A_DEFAULT_TIMEOUT), + }) + + const executeResult = await response.json() + + const finalState: TaskState = response.ok ? 'completed' : 'failed' + + const agentContent = extractAgentContent(executeResult) + const agentMessage = createAgentMessage(agentContent) + agentMessage.taskId = taskId + if (contextId) agentMessage.contextId = contextId + history.push(agentMessage) + + const artifacts = executeResult.output?.artifacts || [] + + await db + .update(a2aTask) + .set({ + status: finalState, + messages: history, + artifacts, + executionId: executeResult.metadata?.executionId, + completedAt: new Date(), + updatedAt: new Date(), + }) + .where(eq(a2aTask.id, taskId)) + + if (isTerminalState(finalState)) { + notifyTaskStateChange(taskId, finalState).catch((err) => { + logger.error('Failed to trigger push notification', { taskId, error: err }) + }) + } + + const task = buildTaskResponse({ + taskId, + contextId, + state: finalState, + history, + artifacts, + }) + + return NextResponse.json(createResponse(id, task)) + } catch (error) { + const isTimeout = error instanceof Error && error.name === 'TimeoutError' + logger.error(`Error executing workflow for task ${taskId}:`, { error, isTimeout }) + + const errorMessage = isTimeout + ? `Workflow execution timed out after ${A2A_DEFAULT_TIMEOUT}ms` + : error instanceof Error + ? error.message + : 'Workflow execution failed' + + await db + .update(a2aTask) + .set({ + status: 'failed', + updatedAt: new Date(), + completedAt: new Date(), + }) + .where(eq(a2aTask.id, taskId)) + + notifyTaskStateChange(taskId, 'failed').catch((err) => { + logger.error('Failed to trigger push notification for failure', { taskId, error: err }) + }) + + return NextResponse.json(createError(id, A2A_ERROR_CODES.INTERNAL_ERROR, errorMessage), { + status: 500, + }) + } + } finally { + await releaseLock(lockKey, lockValue) + } +} + +/** + * Handle message/stream - Stream a message response (v0.3) + */ +async function handleMessageStream( + _request: NextRequest, + id: string | number, + agent: { + id: string + name: string + workflowId: string + workspaceId: string + }, + params: MessageSendParams, + apiKey?: string | null +): Promise { + if (!params?.message) { + return NextResponse.json( + createError(id, A2A_ERROR_CODES.INVALID_PARAMS, 'Message is required'), + { status: 400 } + ) + } + + const message = params.message + const contextId = message.contextId || uuidv4() + const taskId = message.taskId || generateTaskId() + + // Distributed lock to prevent concurrent task processing + const lockKey = `a2a:task:${taskId}:lock` + const lockValue = uuidv4() + const acquired = await acquireLock(lockKey, lockValue, 300) + + if (!acquired) { + const encoder = new TextEncoder() + const errorStream = new ReadableStream({ + start(controller) { + controller.enqueue( + encoder.encode( + `event: error\ndata: ${JSON.stringify({ code: A2A_ERROR_CODES.INTERNAL_ERROR, message: 'Task is currently being processed' })}\n\n` + ) + ) + controller.close() + }, + }) + return new NextResponse(errorStream, { headers: SSE_HEADERS }) + } + + let history: Message[] = [] + let existingTask: typeof a2aTask.$inferSelect | null = null + + if (message.taskId) { + const [found] = await db.select().from(a2aTask).where(eq(a2aTask.id, message.taskId)).limit(1) + existingTask = found || null + + if (!existingTask) { + await releaseLock(lockKey, lockValue) + return NextResponse.json(createError(id, A2A_ERROR_CODES.TASK_NOT_FOUND, 'Task not found'), { + status: 404, + }) + } + + if (isTerminalState(existingTask.status as TaskState)) { + await releaseLock(lockKey, lockValue) + return NextResponse.json( + createError(id, A2A_ERROR_CODES.TASK_ALREADY_COMPLETE, 'Task already in terminal state'), + { status: 400 } + ) + } + + history = existingTask.messages as Message[] + } + + history.push(message) + + if (history.length > A2A_MAX_HISTORY_LENGTH) { + history.splice(0, history.length - A2A_MAX_HISTORY_LENGTH) + } + + if (existingTask) { + await db + .update(a2aTask) + .set({ + status: 'working', + messages: history, + updatedAt: new Date(), + }) + .where(eq(a2aTask.id, taskId)) + } else { + await db.insert(a2aTask).values({ + id: taskId, + agentId: agent.id, + sessionId: contextId || null, + status: 'working', + messages: history, + metadata: {}, + createdAt: new Date(), + updatedAt: new Date(), + }) + } + + const encoder = new TextEncoder() + + const stream = new ReadableStream({ + async start(controller) { + const sendEvent = (event: string, data: unknown) => { + try { + const jsonRpcResponse = { + jsonrpc: '2.0' as const, + id, + result: data, + } + controller.enqueue( + encoder.encode(`event: ${event}\ndata: ${JSON.stringify(jsonRpcResponse)}\n\n`) + ) + } catch (error) { + logger.error('Error sending SSE event:', error) + } + } + + sendEvent('status', { + kind: 'status', + taskId, + contextId, + status: { state: 'working', timestamp: new Date().toISOString() }, + }) + + try { + const { + url: executeUrl, + headers, + useInternalAuth, + } = await buildExecuteRequest({ + workflowId: agent.workflowId, + apiKey, + stream: true, + }) + + const workflowInput = extractWorkflowInput(message) + if (!workflowInput) { + sendEvent('error', { + code: A2A_ERROR_CODES.INVALID_PARAMS, + message: 'Message must contain at least one part with content', + }) + await releaseLock(lockKey, lockValue) + controller.close() + return + } + + const response = await fetch(executeUrl, { + method: 'POST', + headers, + body: JSON.stringify({ + ...workflowInput, + triggerType: 'api', + stream: true, + ...(useInternalAuth && { workflowId: agent.workflowId }), + }), + signal: AbortSignal.timeout(A2A_DEFAULT_TIMEOUT), + }) + + if (!response.ok) { + let errorMessage = 'Workflow execution failed' + try { + const errorResult = await response.json() + errorMessage = errorResult.error || errorMessage + } catch { + // Response may not be JSON + } + throw new Error(errorMessage) + } + + const contentType = response.headers.get('content-type') || '' + const isStreamingResponse = + contentType.includes('text/event-stream') || contentType.includes('text/plain') + + if (response.body && isStreamingResponse) { + const reader = response.body.getReader() + const decoder = new TextDecoder() + let accumulatedContent = '' + let finalContent: string | undefined + + while (true) { + const { done, value } = await reader.read() + if (done) break + + const rawChunk = decoder.decode(value, { stream: true }) + const parsed = parseWorkflowSSEChunk(rawChunk) + + if (parsed.content) { + accumulatedContent += parsed.content + sendEvent('message', { + kind: 'message', + taskId, + contextId, + role: 'agent', + parts: [{ kind: 'text', text: parsed.content }], + final: false, + }) + } + + if (parsed.finalContent) { + finalContent = parsed.finalContent + } + } + + const messageContent = + (finalContent !== undefined && finalContent.length > 0 + ? finalContent + : accumulatedContent) || 'Task completed' + const agentMessage = createAgentMessage(messageContent) + agentMessage.taskId = taskId + if (contextId) agentMessage.contextId = contextId + history.push(agentMessage) + + await db + .update(a2aTask) + .set({ + status: 'completed', + messages: history, + completedAt: new Date(), + updatedAt: new Date(), + }) + .where(eq(a2aTask.id, taskId)) + + notifyTaskStateChange(taskId, 'completed').catch((err) => { + logger.error('Failed to trigger push notification', { taskId, error: err }) + }) + + sendEvent('task', { + kind: 'task', + id: taskId, + contextId, + status: { state: 'completed', timestamp: new Date().toISOString() }, + history, + artifacts: [], + }) + } else { + const result = await response.json() + + const content = extractAgentContent(result) + + sendEvent('message', { + kind: 'message', + taskId, + contextId, + role: 'agent', + parts: [{ kind: 'text', text: content }], + final: true, + }) + + const agentMessage = createAgentMessage(content) + agentMessage.taskId = taskId + if (contextId) agentMessage.contextId = contextId + history.push(agentMessage) + + const artifacts = (result.output?.artifacts as Artifact[]) || [] + + await db + .update(a2aTask) + .set({ + status: 'completed', + messages: history, + artifacts, + executionId: result.metadata?.executionId, + completedAt: new Date(), + updatedAt: new Date(), + }) + .where(eq(a2aTask.id, taskId)) + + notifyTaskStateChange(taskId, 'completed').catch((err) => { + logger.error('Failed to trigger push notification', { taskId, error: err }) + }) + + sendEvent('task', { + kind: 'task', + id: taskId, + contextId, + status: { state: 'completed', timestamp: new Date().toISOString() }, + history, + artifacts, + }) + } + } catch (error) { + const isTimeout = error instanceof Error && error.name === 'TimeoutError' + logger.error(`Streaming error for task ${taskId}:`, { error, isTimeout }) + + const errorMessage = isTimeout + ? `Workflow execution timed out after ${A2A_DEFAULT_TIMEOUT}ms` + : error instanceof Error + ? error.message + : 'Streaming failed' + + await db + .update(a2aTask) + .set({ + status: 'failed', + completedAt: new Date(), + updatedAt: new Date(), + }) + .where(eq(a2aTask.id, taskId)) + + notifyTaskStateChange(taskId, 'failed').catch((err) => { + logger.error('Failed to trigger push notification for failure', { taskId, error: err }) + }) + + sendEvent('error', { + code: A2A_ERROR_CODES.INTERNAL_ERROR, + message: errorMessage, + }) + } finally { + await releaseLock(lockKey, lockValue) + controller.close() + } + }, + }) + + return new NextResponse(stream, { + headers: { + ...SSE_HEADERS, + 'X-Task-Id': taskId, + }, + }) +} + +/** + * Handle tasks/get - Query task status + */ +async function handleTaskGet(id: string | number, params: TaskIdParams): Promise { + if (!params?.id) { + return NextResponse.json( + createError(id, A2A_ERROR_CODES.INVALID_PARAMS, 'Task ID is required'), + { status: 400 } + ) + } + + const historyLength = + params.historyLength !== undefined && params.historyLength >= 0 + ? params.historyLength + : undefined + + const [task] = await db.select().from(a2aTask).where(eq(a2aTask.id, params.id)).limit(1) + + if (!task) { + return NextResponse.json(createError(id, A2A_ERROR_CODES.TASK_NOT_FOUND, 'Task not found'), { + status: 404, + }) + } + + const taskResponse = buildTaskResponse({ + taskId: task.id, + contextId: task.sessionId || task.id, + state: task.status as TaskState, + history: task.messages as Message[], + artifacts: (task.artifacts as Artifact[]) || [], + }) + + const result = formatTaskResponse(taskResponse, historyLength) + + return NextResponse.json(createResponse(id, result)) +} + +/** + * Handle tasks/cancel - Cancel a running task + */ +async function handleTaskCancel(id: string | number, params: TaskIdParams): Promise { + if (!params?.id) { + return NextResponse.json( + createError(id, A2A_ERROR_CODES.INVALID_PARAMS, 'Task ID is required'), + { status: 400 } + ) + } + + const [task] = await db.select().from(a2aTask).where(eq(a2aTask.id, params.id)).limit(1) + + if (!task) { + return NextResponse.json(createError(id, A2A_ERROR_CODES.TASK_NOT_FOUND, 'Task not found'), { + status: 404, + }) + } + + if (isTerminalState(task.status as TaskState)) { + return NextResponse.json( + createError(id, A2A_ERROR_CODES.TASK_ALREADY_COMPLETE, 'Task already in terminal state'), + { status: 400 } + ) + } + + if (task.executionId) { + try { + await markExecutionCancelled(task.executionId) + logger.info('Cancelled workflow execution', { + taskId: task.id, + executionId: task.executionId, + }) + } catch (error) { + logger.warn('Failed to cancel workflow execution', { + taskId: task.id, + executionId: task.executionId, + error, + }) + } + } + + await db + .update(a2aTask) + .set({ + status: 'canceled', + updatedAt: new Date(), + completedAt: new Date(), + }) + .where(eq(a2aTask.id, params.id)) + + notifyTaskStateChange(params.id, 'canceled').catch((err) => { + logger.error('Failed to trigger push notification for cancellation', { + taskId: params.id, + error: err, + }) + }) + + const canceledTask = buildTaskResponse({ + taskId: task.id, + contextId: task.sessionId || task.id, + state: 'canceled', + history: task.messages as Message[], + artifacts: (task.artifacts as Artifact[]) || [], + }) + + return NextResponse.json(createResponse(id, canceledTask)) +} + +/** + * Handle tasks/resubscribe - Reconnect to SSE stream for an ongoing task + */ +async function handleTaskResubscribe( + request: NextRequest, + id: string | number, + params: TaskIdParams +): Promise { + if (!params?.id) { + return NextResponse.json( + createError(id, A2A_ERROR_CODES.INVALID_PARAMS, 'Task ID is required'), + { status: 400 } + ) + } + + const [task] = await db.select().from(a2aTask).where(eq(a2aTask.id, params.id)).limit(1) + + if (!task) { + return NextResponse.json(createError(id, A2A_ERROR_CODES.TASK_NOT_FOUND, 'Task not found'), { + status: 404, + }) + } + + const encoder = new TextEncoder() + + if (isTerminalState(task.status as TaskState)) { + const completedTask = buildTaskResponse({ + taskId: task.id, + contextId: task.sessionId || task.id, + state: task.status as TaskState, + history: task.messages as Message[], + artifacts: (task.artifacts as Artifact[]) || [], + }) + const jsonRpcResponse = { jsonrpc: '2.0' as const, id, result: completedTask } + const sseData = `event: task\ndata: ${JSON.stringify(jsonRpcResponse)}\n\n` + const stream = new ReadableStream({ + start(controller) { + controller.enqueue(encoder.encode(sseData)) + controller.close() + }, + }) + return new NextResponse(stream, { headers: SSE_HEADERS }) + } + let isCancelled = false + let pollTimeoutId: ReturnType | null = null + + const abortSignal = request.signal + abortSignal.addEventListener('abort', () => { + isCancelled = true + if (pollTimeoutId) { + clearTimeout(pollTimeoutId) + pollTimeoutId = null + } + }) + + const stream = new ReadableStream({ + async start(controller) { + const sendEvent = (event: string, data: unknown): boolean => { + if (isCancelled || abortSignal.aborted) return false + try { + const jsonRpcResponse = { jsonrpc: '2.0' as const, id, result: data } + controller.enqueue( + encoder.encode(`event: ${event}\ndata: ${JSON.stringify(jsonRpcResponse)}\n\n`) + ) + return true + } catch (error) { + logger.error('Error sending SSE event:', error) + isCancelled = true + return false + } + } + + const cleanup = () => { + isCancelled = true + if (pollTimeoutId) { + clearTimeout(pollTimeoutId) + pollTimeoutId = null + } + } + + if ( + !sendEvent('status', { + kind: 'status', + taskId: task.id, + contextId: task.sessionId, + status: { state: task.status, timestamp: new Date().toISOString() }, + }) + ) { + cleanup() + return + } + + const pollInterval = 3000 // 3 seconds + const maxPolls = 100 // 5 minutes max + + let polls = 0 + const poll = async () => { + if (isCancelled || abortSignal.aborted) { + cleanup() + return + } + + polls++ + if (polls > maxPolls) { + cleanup() + try { + controller.close() + } catch { + // Already closed + } + return + } + + try { + const [updatedTask] = await db + .select() + .from(a2aTask) + .where(eq(a2aTask.id, params.id)) + .limit(1) + + if (isCancelled) { + cleanup() + return + } + + if (!updatedTask) { + sendEvent('error', { code: A2A_ERROR_CODES.TASK_NOT_FOUND, message: 'Task not found' }) + cleanup() + try { + controller.close() + } catch { + // Already closed + } + return + } + + if (updatedTask.status !== task.status) { + if ( + !sendEvent('status', { + kind: 'status', + taskId: updatedTask.id, + contextId: updatedTask.sessionId, + status: { state: updatedTask.status, timestamp: new Date().toISOString() }, + final: isTerminalState(updatedTask.status as TaskState), + }) + ) { + cleanup() + return + } + } + + if (isTerminalState(updatedTask.status as TaskState)) { + const messages = updatedTask.messages as Message[] + const lastMessage = messages[messages.length - 1] + if (lastMessage && lastMessage.role === 'agent') { + sendEvent('message', { + ...lastMessage, + taskId: updatedTask.id, + contextId: updatedTask.sessionId || updatedTask.id, + final: true, + }) + } + + cleanup() + try { + controller.close() + } catch { + // Already closed + } + return + } + + pollTimeoutId = setTimeout(poll, pollInterval) + } catch (error) { + logger.error('Error during SSE poll:', error) + sendEvent('error', { + code: A2A_ERROR_CODES.INTERNAL_ERROR, + message: error instanceof Error ? error.message : 'Polling failed', + }) + cleanup() + try { + controller.close() + } catch { + // Already closed + } + } + } + + poll() + }, + cancel() { + isCancelled = true + if (pollTimeoutId) { + clearTimeout(pollTimeoutId) + pollTimeoutId = null + } + }, + }) + + return new NextResponse(stream, { + headers: { + ...SSE_HEADERS, + 'X-Task-Id': params.id, + }, + }) +} + +/** + * Handle tasks/pushNotificationConfig/set - Set webhook for task updates + */ +async function handlePushNotificationSet( + id: string | number, + params: PushNotificationSetParams +): Promise { + if (!params?.id) { + return NextResponse.json( + createError(id, A2A_ERROR_CODES.INVALID_PARAMS, 'Task ID is required'), + { status: 400 } + ) + } + + if (!params?.pushNotificationConfig?.url) { + return NextResponse.json( + createError(id, A2A_ERROR_CODES.INVALID_PARAMS, 'Push notification URL is required'), + { status: 400 } + ) + } + + try { + const url = new URL(params.pushNotificationConfig.url) + if (url.protocol !== 'https:') { + return NextResponse.json( + createError(id, A2A_ERROR_CODES.INVALID_PARAMS, 'Push notification URL must use HTTPS'), + { status: 400 } + ) + } + } catch { + return NextResponse.json( + createError(id, A2A_ERROR_CODES.INVALID_PARAMS, 'Invalid push notification URL'), + { status: 400 } + ) + } + + const [task] = await db.select().from(a2aTask).where(eq(a2aTask.id, params.id)).limit(1) + + if (!task) { + return NextResponse.json(createError(id, A2A_ERROR_CODES.TASK_NOT_FOUND, 'Task not found'), { + status: 404, + }) + } + + const [existingConfig] = await db + .select() + .from(a2aPushNotificationConfig) + .where(eq(a2aPushNotificationConfig.taskId, params.id)) + .limit(1) + + const config = params.pushNotificationConfig + + if (existingConfig) { + await db + .update(a2aPushNotificationConfig) + .set({ + url: config.url, + token: config.token || null, + isActive: true, + updatedAt: new Date(), + }) + .where(eq(a2aPushNotificationConfig.id, existingConfig.id)) + } else { + await db.insert(a2aPushNotificationConfig).values({ + id: uuidv4(), + taskId: params.id, + url: config.url, + token: config.token || null, + isActive: true, + createdAt: new Date(), + updatedAt: new Date(), + }) + } + + const result: PushNotificationConfig = { + url: config.url, + token: config.token, + } + + return NextResponse.json(createResponse(id, result)) +} + +/** + * Handle tasks/pushNotificationConfig/get - Get webhook config for a task + */ +async function handlePushNotificationGet( + id: string | number, + params: TaskIdParams +): Promise { + if (!params?.id) { + return NextResponse.json( + createError(id, A2A_ERROR_CODES.INVALID_PARAMS, 'Task ID is required'), + { status: 400 } + ) + } + + const [task] = await db.select().from(a2aTask).where(eq(a2aTask.id, params.id)).limit(1) + + if (!task) { + return NextResponse.json(createError(id, A2A_ERROR_CODES.TASK_NOT_FOUND, 'Task not found'), { + status: 404, + }) + } + + const [config] = await db + .select() + .from(a2aPushNotificationConfig) + .where(eq(a2aPushNotificationConfig.taskId, params.id)) + .limit(1) + + if (!config) { + return NextResponse.json( + createError(id, A2A_ERROR_CODES.TASK_NOT_FOUND, 'Push notification config not found'), + { status: 404 } + ) + } + + const result: PushNotificationConfig = { + url: config.url, + token: config.token || undefined, + } + + return NextResponse.json(createResponse(id, result)) +} + +/** + * Handle tasks/pushNotificationConfig/delete - Delete webhook config for a task + */ +async function handlePushNotificationDelete( + id: string | number, + params: TaskIdParams +): Promise { + if (!params?.id) { + return NextResponse.json( + createError(id, A2A_ERROR_CODES.INVALID_PARAMS, 'Task ID is required'), + { status: 400 } + ) + } + + const [task] = await db.select().from(a2aTask).where(eq(a2aTask.id, params.id)).limit(1) + + if (!task) { + return NextResponse.json(createError(id, A2A_ERROR_CODES.TASK_NOT_FOUND, 'Task not found'), { + status: 404, + }) + } + + const [config] = await db + .select() + .from(a2aPushNotificationConfig) + .where(eq(a2aPushNotificationConfig.taskId, params.id)) + .limit(1) + + if (!config) { + return NextResponse.json( + createError(id, A2A_ERROR_CODES.TASK_NOT_FOUND, 'Push notification config not found'), + { status: 404 } + ) + } + + await db.delete(a2aPushNotificationConfig).where(eq(a2aPushNotificationConfig.id, config.id)) + + return NextResponse.json(createResponse(id, { success: true })) +} diff --git a/apps/sim/app/api/a2a/serve/[agentId]/utils.ts b/apps/sim/app/api/a2a/serve/[agentId]/utils.ts new file mode 100644 index 0000000000..f157d1efb3 --- /dev/null +++ b/apps/sim/app/api/a2a/serve/[agentId]/utils.ts @@ -0,0 +1,176 @@ +import type { Artifact, Message, PushNotificationConfig, Task, TaskState } from '@a2a-js/sdk' +import { v4 as uuidv4 } from 'uuid' +import { generateInternalToken } from '@/lib/auth/internal' +import { getBaseUrl } from '@/lib/core/utils/urls' + +/** A2A v0.3 JSON-RPC method names */ +export const A2A_METHODS = { + MESSAGE_SEND: 'message/send', + MESSAGE_STREAM: 'message/stream', + TASKS_GET: 'tasks/get', + TASKS_CANCEL: 'tasks/cancel', + TASKS_RESUBSCRIBE: 'tasks/resubscribe', + PUSH_NOTIFICATION_SET: 'tasks/pushNotificationConfig/set', + PUSH_NOTIFICATION_GET: 'tasks/pushNotificationConfig/get', + PUSH_NOTIFICATION_DELETE: 'tasks/pushNotificationConfig/delete', +} as const + +/** A2A v0.3 error codes */ +export const A2A_ERROR_CODES = { + PARSE_ERROR: -32700, + INVALID_REQUEST: -32600, + METHOD_NOT_FOUND: -32601, + INVALID_PARAMS: -32602, + INTERNAL_ERROR: -32603, + TASK_NOT_FOUND: -32001, + TASK_ALREADY_COMPLETE: -32002, + AGENT_UNAVAILABLE: -32003, + AUTHENTICATION_REQUIRED: -32004, +} as const + +export interface JSONRPCRequest { + jsonrpc: '2.0' + id: string | number + method: string + params?: unknown +} + +export interface JSONRPCResponse { + jsonrpc: '2.0' + id: string | number | null + result?: unknown + error?: { + code: number + message: string + data?: unknown + } +} + +export interface MessageSendParams { + message: Message + configuration?: { + acceptedOutputModes?: string[] + historyLength?: number + pushNotificationConfig?: PushNotificationConfig + } +} + +export interface TaskIdParams { + id: string + historyLength?: number +} + +export interface PushNotificationSetParams { + id: string + pushNotificationConfig: PushNotificationConfig +} + +export function createResponse(id: string | number | null, result: unknown): JSONRPCResponse { + return { jsonrpc: '2.0', id, result } +} + +export function createError( + id: string | number | null, + code: number, + message: string, + data?: unknown +): JSONRPCResponse { + return { jsonrpc: '2.0', id, error: { code, message, data } } +} + +export function isJSONRPCRequest(obj: unknown): obj is JSONRPCRequest { + if (!obj || typeof obj !== 'object') return false + const r = obj as Record + return r.jsonrpc === '2.0' && typeof r.method === 'string' && r.id !== undefined +} + +export function generateTaskId(): string { + return uuidv4() +} + +export function createTaskStatus(state: TaskState): { state: TaskState; timestamp: string } { + return { state, timestamp: new Date().toISOString() } +} + +export function formatTaskResponse(task: Task, historyLength?: number): Task { + if (historyLength !== undefined && task.history) { + return { + ...task, + history: task.history.slice(-historyLength), + } + } + return task +} + +export interface ExecuteRequestConfig { + workflowId: string + apiKey?: string | null + stream?: boolean +} + +export interface ExecuteRequestResult { + url: string + headers: Record + useInternalAuth: boolean +} + +export async function buildExecuteRequest( + config: ExecuteRequestConfig +): Promise { + const url = `${getBaseUrl()}/api/workflows/${config.workflowId}/execute` + const headers: Record = { 'Content-Type': 'application/json' } + let useInternalAuth = false + + if (config.apiKey) { + headers['X-API-Key'] = config.apiKey + } else { + const internalToken = await generateInternalToken() + headers.Authorization = `Bearer ${internalToken}` + useInternalAuth = true + } + + if (config.stream) { + headers['X-Stream-Response'] = 'true' + } + + return { url, headers, useInternalAuth } +} + +export function extractAgentContent(executeResult: { + output?: { content?: string; [key: string]: unknown } + error?: string +}): string { + // Prefer explicit content field + if (executeResult.output?.content) { + return executeResult.output.content + } + + // If output is an object with meaningful data, stringify it + if (typeof executeResult.output === 'object' && executeResult.output !== null) { + const keys = Object.keys(executeResult.output) + // Skip empty objects or objects with only undefined values + if (keys.length > 0 && keys.some((k) => executeResult.output![k] !== undefined)) { + return JSON.stringify(executeResult.output) + } + } + + // Fallback to error message or default + return executeResult.error || 'Task completed' +} + +export function buildTaskResponse(params: { + taskId: string + contextId: string + state: TaskState + history: Message[] + artifacts?: Artifact[] +}): Task { + return { + kind: 'task', + id: params.taskId, + contextId: params.contextId, + status: createTaskStatus(params.state), + history: params.history, + artifacts: params.artifacts || [], + } +} diff --git a/apps/sim/app/api/memory/[id]/route.ts b/apps/sim/app/api/memory/[id]/route.ts index 617979ef16..2f5b5ae1cc 100644 --- a/apps/sim/app/api/memory/[id]/route.ts +++ b/apps/sim/app/api/memory/[id]/route.ts @@ -1,11 +1,12 @@ import { db } from '@sim/db' -import { memory, permissions, workspace } from '@sim/db/schema' +import { memory } from '@sim/db/schema' import { createLogger } from '@sim/logger' import { and, eq } from 'drizzle-orm' import { type NextRequest, NextResponse } from 'next/server' import { z } from 'zod' import { checkHybridAuth } from '@/lib/auth/hybrid' import { generateRequestId } from '@/lib/core/utils/request' +import { checkWorkspaceAccess } from '@/lib/workspaces/permissions/utils' const logger = createLogger('MemoryByIdAPI') @@ -29,46 +30,6 @@ const memoryPutBodySchema = z.object({ workspaceId: z.string().uuid('Invalid workspace ID format'), }) -async function checkWorkspaceAccess( - workspaceId: string, - userId: string -): Promise<{ hasAccess: boolean; canWrite: boolean }> { - const [workspaceRow] = await db - .select({ ownerId: workspace.ownerId }) - .from(workspace) - .where(eq(workspace.id, workspaceId)) - .limit(1) - - if (!workspaceRow) { - return { hasAccess: false, canWrite: false } - } - - if (workspaceRow.ownerId === userId) { - return { hasAccess: true, canWrite: true } - } - - const [permissionRow] = await db - .select({ permissionType: permissions.permissionType }) - .from(permissions) - .where( - and( - eq(permissions.userId, userId), - eq(permissions.entityType, 'workspace'), - eq(permissions.entityId, workspaceId) - ) - ) - .limit(1) - - if (!permissionRow) { - return { hasAccess: false, canWrite: false } - } - - return { - hasAccess: true, - canWrite: permissionRow.permissionType === 'write' || permissionRow.permissionType === 'admin', - } -} - async function validateMemoryAccess( request: NextRequest, workspaceId: string, @@ -86,8 +47,8 @@ async function validateMemoryAccess( } } - const { hasAccess, canWrite } = await checkWorkspaceAccess(workspaceId, authResult.userId) - if (!hasAccess) { + const access = await checkWorkspaceAccess(workspaceId, authResult.userId) + if (!access.exists || !access.hasAccess) { return { error: NextResponse.json( { success: false, error: { message: 'Workspace not found' } }, @@ -96,7 +57,7 @@ async function validateMemoryAccess( } } - if (action === 'write' && !canWrite) { + if (action === 'write' && !access.canWrite) { return { error: NextResponse.json( { success: false, error: { message: 'Write access denied' } }, diff --git a/apps/sim/app/api/memory/route.ts b/apps/sim/app/api/memory/route.ts index fe159b9664..072756c7a6 100644 --- a/apps/sim/app/api/memory/route.ts +++ b/apps/sim/app/api/memory/route.ts @@ -1,56 +1,17 @@ import { db } from '@sim/db' -import { memory, permissions, workspace } from '@sim/db/schema' +import { memory } from '@sim/db/schema' import { createLogger } from '@sim/logger' import { and, eq, isNull, like } from 'drizzle-orm' import { type NextRequest, NextResponse } from 'next/server' import { checkHybridAuth } from '@/lib/auth/hybrid' import { generateRequestId } from '@/lib/core/utils/request' +import { checkWorkspaceAccess } from '@/lib/workspaces/permissions/utils' const logger = createLogger('MemoryAPI') export const dynamic = 'force-dynamic' export const runtime = 'nodejs' -async function checkWorkspaceAccess( - workspaceId: string, - userId: string -): Promise<{ hasAccess: boolean; canWrite: boolean }> { - const [workspaceRow] = await db - .select({ ownerId: workspace.ownerId }) - .from(workspace) - .where(eq(workspace.id, workspaceId)) - .limit(1) - - if (!workspaceRow) { - return { hasAccess: false, canWrite: false } - } - - if (workspaceRow.ownerId === userId) { - return { hasAccess: true, canWrite: true } - } - - const [permissionRow] = await db - .select({ permissionType: permissions.permissionType }) - .from(permissions) - .where( - and( - eq(permissions.userId, userId), - eq(permissions.entityType, 'workspace'), - eq(permissions.entityId, workspaceId) - ) - ) - .limit(1) - - if (!permissionRow) { - return { hasAccess: false, canWrite: false } - } - - return { - hasAccess: true, - canWrite: permissionRow.permissionType === 'write' || permissionRow.permissionType === 'admin', - } -} - export async function GET(request: NextRequest) { const requestId = generateRequestId() @@ -76,8 +37,14 @@ export async function GET(request: NextRequest) { ) } - const { hasAccess } = await checkWorkspaceAccess(workspaceId, authResult.userId) - if (!hasAccess) { + const access = await checkWorkspaceAccess(workspaceId, authResult.userId) + if (!access.exists) { + return NextResponse.json( + { success: false, error: { message: 'Workspace not found' } }, + { status: 404 } + ) + } + if (!access.hasAccess) { return NextResponse.json( { success: false, error: { message: 'Access denied to this workspace' } }, { status: 403 } @@ -155,15 +122,21 @@ export async function POST(request: NextRequest) { ) } - const { hasAccess, canWrite } = await checkWorkspaceAccess(workspaceId, authResult.userId) - if (!hasAccess) { + const access = await checkWorkspaceAccess(workspaceId, authResult.userId) + if (!access.exists) { return NextResponse.json( { success: false, error: { message: 'Workspace not found' } }, { status: 404 } ) } + if (!access.hasAccess) { + return NextResponse.json( + { success: false, error: { message: 'Access denied to this workspace' } }, + { status: 403 } + ) + } - if (!canWrite) { + if (!access.canWrite) { return NextResponse.json( { success: false, error: { message: 'Write access denied to this workspace' } }, { status: 403 } @@ -282,15 +255,21 @@ export async function DELETE(request: NextRequest) { ) } - const { hasAccess, canWrite } = await checkWorkspaceAccess(workspaceId, authResult.userId) - if (!hasAccess) { + const access = await checkWorkspaceAccess(workspaceId, authResult.userId) + if (!access.exists) { return NextResponse.json( { success: false, error: { message: 'Workspace not found' } }, { status: 404 } ) } + if (!access.hasAccess) { + return NextResponse.json( + { success: false, error: { message: 'Access denied to this workspace' } }, + { status: 403 } + ) + } - if (!canWrite) { + if (!access.canWrite) { return NextResponse.json( { success: false, error: { message: 'Write access denied to this workspace' } }, { status: 403 } diff --git a/apps/sim/app/api/tools/a2a/cancel-task/route.ts b/apps/sim/app/api/tools/a2a/cancel-task/route.ts new file mode 100644 index 0000000000..9298273cee --- /dev/null +++ b/apps/sim/app/api/tools/a2a/cancel-task/route.ts @@ -0,0 +1,84 @@ +import type { Task } from '@a2a-js/sdk' +import { createLogger } from '@sim/logger' +import { type NextRequest, NextResponse } from 'next/server' +import { z } from 'zod' +import { createA2AClient } from '@/lib/a2a/utils' +import { checkHybridAuth } from '@/lib/auth/hybrid' +import { generateRequestId } from '@/lib/core/utils/request' + +const logger = createLogger('A2ACancelTaskAPI') + +export const dynamic = 'force-dynamic' + +const A2ACancelTaskSchema = z.object({ + agentUrl: z.string().min(1, 'Agent URL is required'), + taskId: z.string().min(1, 'Task ID is required'), + apiKey: z.string().optional(), +}) + +export async function POST(request: NextRequest) { + const requestId = generateRequestId() + + try { + const authResult = await checkHybridAuth(request, { requireWorkflowId: false }) + + if (!authResult.success) { + logger.warn(`[${requestId}] Unauthorized A2A cancel task attempt`) + return NextResponse.json( + { + success: false, + error: authResult.error || 'Authentication required', + }, + { status: 401 } + ) + } + + const body = await request.json() + const validatedData = A2ACancelTaskSchema.parse(body) + + logger.info(`[${requestId}] Canceling A2A task`, { + agentUrl: validatedData.agentUrl, + taskId: validatedData.taskId, + }) + + const client = await createA2AClient(validatedData.agentUrl, validatedData.apiKey) + + const task = (await client.cancelTask({ id: validatedData.taskId })) as Task + + logger.info(`[${requestId}] Successfully canceled A2A task`, { + taskId: validatedData.taskId, + state: task.status.state, + }) + + return NextResponse.json({ + success: true, + output: { + cancelled: true, + state: task.status.state, + }, + }) + } catch (error) { + if (error instanceof z.ZodError) { + logger.warn(`[${requestId}] Invalid A2A cancel task request`, { + errors: error.errors, + }) + return NextResponse.json( + { + success: false, + error: 'Invalid request data', + details: error.errors, + }, + { status: 400 } + ) + } + + logger.error(`[${requestId}] Error canceling A2A task:`, error) + return NextResponse.json( + { + success: false, + error: error instanceof Error ? error.message : 'Failed to cancel task', + }, + { status: 500 } + ) + } +} diff --git a/apps/sim/app/api/tools/a2a/delete-push-notification/route.ts b/apps/sim/app/api/tools/a2a/delete-push-notification/route.ts new file mode 100644 index 0000000000..f222ef8830 --- /dev/null +++ b/apps/sim/app/api/tools/a2a/delete-push-notification/route.ts @@ -0,0 +1,94 @@ +import { createLogger } from '@sim/logger' +import { type NextRequest, NextResponse } from 'next/server' +import { z } from 'zod' +import { createA2AClient } from '@/lib/a2a/utils' +import { checkHybridAuth } from '@/lib/auth/hybrid' +import { generateRequestId } from '@/lib/core/utils/request' + +export const dynamic = 'force-dynamic' + +const logger = createLogger('A2ADeletePushNotificationAPI') + +const A2ADeletePushNotificationSchema = z.object({ + agentUrl: z.string().min(1, 'Agent URL is required'), + taskId: z.string().min(1, 'Task ID is required'), + pushNotificationConfigId: z.string().optional(), + apiKey: z.string().optional(), +}) + +export async function POST(request: NextRequest) { + const requestId = generateRequestId() + + try { + const authResult = await checkHybridAuth(request, { requireWorkflowId: false }) + + if (!authResult.success) { + logger.warn( + `[${requestId}] Unauthorized A2A delete push notification attempt: ${authResult.error}` + ) + return NextResponse.json( + { + success: false, + error: authResult.error || 'Authentication required', + }, + { status: 401 } + ) + } + + logger.info( + `[${requestId}] Authenticated A2A delete push notification request via ${authResult.authType}`, + { + userId: authResult.userId, + } + ) + + const body = await request.json() + const validatedData = A2ADeletePushNotificationSchema.parse(body) + + logger.info(`[${requestId}] Deleting A2A push notification config`, { + agentUrl: validatedData.agentUrl, + taskId: validatedData.taskId, + pushNotificationConfigId: validatedData.pushNotificationConfigId, + }) + + const client = await createA2AClient(validatedData.agentUrl, validatedData.apiKey) + + await client.deleteTaskPushNotificationConfig({ + id: validatedData.taskId, + pushNotificationConfigId: validatedData.pushNotificationConfigId || validatedData.taskId, + }) + + logger.info(`[${requestId}] Push notification config deleted successfully`, { + taskId: validatedData.taskId, + }) + + return NextResponse.json({ + success: true, + output: { + success: true, + }, + }) + } catch (error) { + if (error instanceof z.ZodError) { + logger.warn(`[${requestId}] Invalid request data`, { errors: error.errors }) + return NextResponse.json( + { + success: false, + error: 'Invalid request data', + details: error.errors, + }, + { status: 400 } + ) + } + + logger.error(`[${requestId}] Error deleting A2A push notification:`, error) + + return NextResponse.json( + { + success: false, + error: error instanceof Error ? error.message : 'Failed to delete push notification', + }, + { status: 500 } + ) + } +} diff --git a/apps/sim/app/api/tools/a2a/get-agent-card/route.ts b/apps/sim/app/api/tools/a2a/get-agent-card/route.ts new file mode 100644 index 0000000000..c26ed764b6 --- /dev/null +++ b/apps/sim/app/api/tools/a2a/get-agent-card/route.ts @@ -0,0 +1,92 @@ +import { createLogger } from '@sim/logger' +import { type NextRequest, NextResponse } from 'next/server' +import { z } from 'zod' +import { createA2AClient } from '@/lib/a2a/utils' +import { checkHybridAuth } from '@/lib/auth/hybrid' +import { generateRequestId } from '@/lib/core/utils/request' + +export const dynamic = 'force-dynamic' + +const logger = createLogger('A2AGetAgentCardAPI') + +const A2AGetAgentCardSchema = z.object({ + agentUrl: z.string().min(1, 'Agent URL is required'), + apiKey: z.string().optional(), +}) + +export async function POST(request: NextRequest) { + const requestId = generateRequestId() + + try { + const authResult = await checkHybridAuth(request, { requireWorkflowId: false }) + + if (!authResult.success) { + logger.warn(`[${requestId}] Unauthorized A2A get agent card attempt: ${authResult.error}`) + return NextResponse.json( + { + success: false, + error: authResult.error || 'Authentication required', + }, + { status: 401 } + ) + } + + logger.info( + `[${requestId}] Authenticated A2A get agent card request via ${authResult.authType}`, + { + userId: authResult.userId, + } + ) + + const body = await request.json() + const validatedData = A2AGetAgentCardSchema.parse(body) + + logger.info(`[${requestId}] Fetching Agent Card`, { + agentUrl: validatedData.agentUrl, + }) + + const client = await createA2AClient(validatedData.agentUrl, validatedData.apiKey) + + const agentCard = await client.getAgentCard() + + logger.info(`[${requestId}] Agent Card fetched successfully`, { + agentName: agentCard.name, + }) + + return NextResponse.json({ + success: true, + output: { + name: agentCard.name, + description: agentCard.description, + url: agentCard.url, + version: agentCard.protocolVersion, + capabilities: agentCard.capabilities, + skills: agentCard.skills, + defaultInputModes: agentCard.defaultInputModes, + defaultOutputModes: agentCard.defaultOutputModes, + }, + }) + } catch (error) { + if (error instanceof z.ZodError) { + logger.warn(`[${requestId}] Invalid request data`, { errors: error.errors }) + return NextResponse.json( + { + success: false, + error: 'Invalid request data', + details: error.errors, + }, + { status: 400 } + ) + } + + logger.error(`[${requestId}] Error fetching Agent Card:`, error) + + return NextResponse.json( + { + success: false, + error: error instanceof Error ? error.message : 'Failed to fetch Agent Card', + }, + { status: 500 } + ) + } +} diff --git a/apps/sim/app/api/tools/a2a/get-push-notification/route.ts b/apps/sim/app/api/tools/a2a/get-push-notification/route.ts new file mode 100644 index 0000000000..5feedf4de1 --- /dev/null +++ b/apps/sim/app/api/tools/a2a/get-push-notification/route.ts @@ -0,0 +1,115 @@ +import { createLogger } from '@sim/logger' +import { type NextRequest, NextResponse } from 'next/server' +import { z } from 'zod' +import { createA2AClient } from '@/lib/a2a/utils' +import { checkHybridAuth } from '@/lib/auth/hybrid' +import { generateRequestId } from '@/lib/core/utils/request' + +export const dynamic = 'force-dynamic' + +const logger = createLogger('A2AGetPushNotificationAPI') + +const A2AGetPushNotificationSchema = z.object({ + agentUrl: z.string().min(1, 'Agent URL is required'), + taskId: z.string().min(1, 'Task ID is required'), + apiKey: z.string().optional(), +}) + +export async function POST(request: NextRequest) { + const requestId = generateRequestId() + + try { + const authResult = await checkHybridAuth(request, { requireWorkflowId: false }) + + if (!authResult.success) { + logger.warn( + `[${requestId}] Unauthorized A2A get push notification attempt: ${authResult.error}` + ) + return NextResponse.json( + { + success: false, + error: authResult.error || 'Authentication required', + }, + { status: 401 } + ) + } + + logger.info( + `[${requestId}] Authenticated A2A get push notification request via ${authResult.authType}`, + { + userId: authResult.userId, + } + ) + + const body = await request.json() + const validatedData = A2AGetPushNotificationSchema.parse(body) + + logger.info(`[${requestId}] Getting push notification config`, { + agentUrl: validatedData.agentUrl, + taskId: validatedData.taskId, + }) + + const client = await createA2AClient(validatedData.agentUrl, validatedData.apiKey) + + const result = await client.getTaskPushNotificationConfig({ + id: validatedData.taskId, + }) + + if (!result || !result.pushNotificationConfig) { + logger.info(`[${requestId}] No push notification config found for task`, { + taskId: validatedData.taskId, + }) + return NextResponse.json({ + success: true, + output: { + exists: false, + }, + }) + } + + logger.info(`[${requestId}] Push notification config retrieved successfully`, { + taskId: validatedData.taskId, + }) + + return NextResponse.json({ + success: true, + output: { + url: result.pushNotificationConfig.url, + token: result.pushNotificationConfig.token, + exists: true, + }, + }) + } catch (error) { + if (error instanceof z.ZodError) { + logger.warn(`[${requestId}] Invalid request data`, { errors: error.errors }) + return NextResponse.json( + { + success: false, + error: 'Invalid request data', + details: error.errors, + }, + { status: 400 } + ) + } + + if (error instanceof Error && error.message.includes('not found')) { + logger.info(`[${requestId}] Task not found, returning exists: false`) + return NextResponse.json({ + success: true, + output: { + exists: false, + }, + }) + } + + logger.error(`[${requestId}] Error getting A2A push notification:`, error) + + return NextResponse.json( + { + success: false, + error: error instanceof Error ? error.message : 'Failed to get push notification', + }, + { status: 500 } + ) + } +} diff --git a/apps/sim/app/api/tools/a2a/get-task/route.ts b/apps/sim/app/api/tools/a2a/get-task/route.ts new file mode 100644 index 0000000000..35aa5e278d --- /dev/null +++ b/apps/sim/app/api/tools/a2a/get-task/route.ts @@ -0,0 +1,95 @@ +import type { Task } from '@a2a-js/sdk' +import { createLogger } from '@sim/logger' +import { type NextRequest, NextResponse } from 'next/server' +import { z } from 'zod' +import { createA2AClient } from '@/lib/a2a/utils' +import { checkHybridAuth } from '@/lib/auth/hybrid' +import { generateRequestId } from '@/lib/core/utils/request' + +export const dynamic = 'force-dynamic' + +const logger = createLogger('A2AGetTaskAPI') + +const A2AGetTaskSchema = z.object({ + agentUrl: z.string().min(1, 'Agent URL is required'), + taskId: z.string().min(1, 'Task ID is required'), + apiKey: z.string().optional(), + historyLength: z.number().optional(), +}) + +export async function POST(request: NextRequest) { + const requestId = generateRequestId() + + try { + const authResult = await checkHybridAuth(request, { requireWorkflowId: false }) + + if (!authResult.success) { + logger.warn(`[${requestId}] Unauthorized A2A get task attempt: ${authResult.error}`) + return NextResponse.json( + { + success: false, + error: authResult.error || 'Authentication required', + }, + { status: 401 } + ) + } + + logger.info(`[${requestId}] Authenticated A2A get task request via ${authResult.authType}`, { + userId: authResult.userId, + }) + + const body = await request.json() + const validatedData = A2AGetTaskSchema.parse(body) + + logger.info(`[${requestId}] Getting A2A task`, { + agentUrl: validatedData.agentUrl, + taskId: validatedData.taskId, + historyLength: validatedData.historyLength, + }) + + const client = await createA2AClient(validatedData.agentUrl, validatedData.apiKey) + + const task = (await client.getTask({ + id: validatedData.taskId, + historyLength: validatedData.historyLength, + })) as Task + + logger.info(`[${requestId}] Successfully retrieved A2A task`, { + taskId: task.id, + state: task.status.state, + }) + + return NextResponse.json({ + success: true, + output: { + taskId: task.id, + contextId: task.contextId, + state: task.status.state, + artifacts: task.artifacts, + history: task.history, + }, + }) + } catch (error) { + if (error instanceof z.ZodError) { + logger.warn(`[${requestId}] Invalid request data`, { errors: error.errors }) + return NextResponse.json( + { + success: false, + error: 'Invalid request data', + details: error.errors, + }, + { status: 400 } + ) + } + + logger.error(`[${requestId}] Error getting A2A task:`, error) + + return NextResponse.json( + { + success: false, + error: error instanceof Error ? error.message : 'Failed to get task', + }, + { status: 500 } + ) + } +} diff --git a/apps/sim/app/api/tools/a2a/resubscribe/route.ts b/apps/sim/app/api/tools/a2a/resubscribe/route.ts new file mode 100644 index 0000000000..75c0d24aec --- /dev/null +++ b/apps/sim/app/api/tools/a2a/resubscribe/route.ts @@ -0,0 +1,119 @@ +import type { + Artifact, + Message, + Task, + TaskArtifactUpdateEvent, + TaskState, + TaskStatusUpdateEvent, +} from '@a2a-js/sdk' +import { createLogger } from '@sim/logger' +import { type NextRequest, NextResponse } from 'next/server' +import { z } from 'zod' +import { createA2AClient, extractTextContent, isTerminalState } from '@/lib/a2a/utils' +import { checkHybridAuth } from '@/lib/auth/hybrid' +import { generateRequestId } from '@/lib/core/utils/request' + +const logger = createLogger('A2AResubscribeAPI') + +export const dynamic = 'force-dynamic' + +const A2AResubscribeSchema = z.object({ + agentUrl: z.string().min(1, 'Agent URL is required'), + taskId: z.string().min(1, 'Task ID is required'), + apiKey: z.string().optional(), +}) + +export async function POST(request: NextRequest) { + const requestId = generateRequestId() + + try { + const authResult = await checkHybridAuth(request, { requireWorkflowId: false }) + + if (!authResult.success) { + logger.warn(`[${requestId}] Unauthorized A2A resubscribe attempt`) + return NextResponse.json( + { + success: false, + error: authResult.error || 'Authentication required', + }, + { status: 401 } + ) + } + + const body = await request.json() + const validatedData = A2AResubscribeSchema.parse(body) + + const client = await createA2AClient(validatedData.agentUrl, validatedData.apiKey) + + const stream = client.resubscribeTask({ id: validatedData.taskId }) + + let taskId = validatedData.taskId + let contextId: string | undefined + let state: TaskState = 'working' + let content = '' + let artifacts: Artifact[] = [] + let history: Message[] = [] + + for await (const event of stream) { + if (event.kind === 'message') { + const msg = event as Message + content = extractTextContent(msg) + taskId = msg.taskId || taskId + contextId = msg.contextId || contextId + state = 'completed' + } else if (event.kind === 'task') { + const task = event as Task + taskId = task.id + contextId = task.contextId + state = task.status.state + artifacts = task.artifacts || [] + history = task.history || [] + const lastAgentMessage = history.filter((m) => m.role === 'agent').pop() + if (lastAgentMessage) { + content = extractTextContent(lastAgentMessage) + } + } else if ('status' in event) { + const statusEvent = event as TaskStatusUpdateEvent + state = statusEvent.status.state + } else if ('artifact' in event) { + const artifactEvent = event as TaskArtifactUpdateEvent + artifacts.push(artifactEvent.artifact) + } + } + + logger.info(`[${requestId}] Successfully resubscribed to A2A task ${taskId}`) + + return NextResponse.json({ + success: true, + output: { + taskId, + contextId, + state, + isRunning: !isTerminalState(state), + artifacts, + history, + }, + }) + } catch (error) { + if (error instanceof z.ZodError) { + logger.warn(`[${requestId}] Invalid A2A resubscribe data`, { errors: error.errors }) + return NextResponse.json( + { + success: false, + error: 'Invalid request data', + details: error.errors, + }, + { status: 400 } + ) + } + + logger.error(`[${requestId}] Error resubscribing to A2A task:`, error) + return NextResponse.json( + { + success: false, + error: error instanceof Error ? error.message : 'Failed to resubscribe', + }, + { status: 500 } + ) + } +} diff --git a/apps/sim/app/api/tools/a2a/send-message-stream/route.ts b/apps/sim/app/api/tools/a2a/send-message-stream/route.ts new file mode 100644 index 0000000000..e30689a801 --- /dev/null +++ b/apps/sim/app/api/tools/a2a/send-message-stream/route.ts @@ -0,0 +1,150 @@ +import type { + Artifact, + Message, + Task, + TaskArtifactUpdateEvent, + TaskState, + TaskStatusUpdateEvent, +} from '@a2a-js/sdk' +import { createLogger } from '@sim/logger' +import { type NextRequest, NextResponse } from 'next/server' +import { z } from 'zod' +import { createA2AClient, extractTextContent, isTerminalState } from '@/lib/a2a/utils' +import { checkHybridAuth } from '@/lib/auth/hybrid' +import { generateRequestId } from '@/lib/core/utils/request' + +export const dynamic = 'force-dynamic' + +const logger = createLogger('A2ASendMessageStreamAPI') + +const A2ASendMessageStreamSchema = z.object({ + agentUrl: z.string().min(1, 'Agent URL is required'), + message: z.string().min(1, 'Message is required'), + taskId: z.string().optional(), + contextId: z.string().optional(), + apiKey: z.string().optional(), +}) + +export async function POST(request: NextRequest) { + const requestId = generateRequestId() + + try { + const authResult = await checkHybridAuth(request, { requireWorkflowId: false }) + + if (!authResult.success) { + logger.warn( + `[${requestId}] Unauthorized A2A send message stream attempt: ${authResult.error}` + ) + return NextResponse.json( + { + success: false, + error: authResult.error || 'Authentication required', + }, + { status: 401 } + ) + } + + logger.info( + `[${requestId}] Authenticated A2A send message stream request via ${authResult.authType}`, + { + userId: authResult.userId, + } + ) + + const body = await request.json() + const validatedData = A2ASendMessageStreamSchema.parse(body) + + logger.info(`[${requestId}] Sending A2A streaming message`, { + agentUrl: validatedData.agentUrl, + hasTaskId: !!validatedData.taskId, + hasContextId: !!validatedData.contextId, + }) + + const client = await createA2AClient(validatedData.agentUrl, validatedData.apiKey) + + const message: Message = { + kind: 'message', + messageId: crypto.randomUUID(), + role: 'user', + parts: [{ kind: 'text', text: validatedData.message }], + ...(validatedData.taskId && { taskId: validatedData.taskId }), + ...(validatedData.contextId && { contextId: validatedData.contextId }), + } + + const stream = client.sendMessageStream({ message }) + + let taskId = '' + let contextId: string | undefined + let state: TaskState = 'working' + let content = '' + let artifacts: Artifact[] = [] + let history: Message[] = [] + + for await (const event of stream) { + if (event.kind === 'message') { + const msg = event as Message + content = extractTextContent(msg) + taskId = msg.taskId || taskId + contextId = msg.contextId || contextId + state = 'completed' + } else if (event.kind === 'task') { + const task = event as Task + taskId = task.id + contextId = task.contextId + state = task.status.state + artifacts = task.artifacts || [] + history = task.history || [] + const lastAgentMessage = history.filter((m) => m.role === 'agent').pop() + if (lastAgentMessage) { + content = extractTextContent(lastAgentMessage) + } + } else if ('status' in event) { + const statusEvent = event as TaskStatusUpdateEvent + state = statusEvent.status.state + } else if ('artifact' in event) { + const artifactEvent = event as TaskArtifactUpdateEvent + artifacts.push(artifactEvent.artifact) + } + } + + logger.info(`[${requestId}] A2A streaming message completed`, { + taskId, + state, + artifactCount: artifacts.length, + }) + + return NextResponse.json({ + success: isTerminalState(state) && state !== 'failed', + output: { + content, + taskId, + contextId, + state, + artifacts, + history, + }, + }) + } catch (error) { + if (error instanceof z.ZodError) { + logger.warn(`[${requestId}] Invalid request data`, { errors: error.errors }) + return NextResponse.json( + { + success: false, + error: 'Invalid request data', + details: error.errors, + }, + { status: 400 } + ) + } + + logger.error(`[${requestId}] Error in A2A streaming:`, error) + + return NextResponse.json( + { + success: false, + error: error instanceof Error ? error.message : 'Streaming failed', + }, + { status: 500 } + ) + } +} diff --git a/apps/sim/app/api/tools/a2a/send-message/route.ts b/apps/sim/app/api/tools/a2a/send-message/route.ts new file mode 100644 index 0000000000..4d52fc710c --- /dev/null +++ b/apps/sim/app/api/tools/a2a/send-message/route.ts @@ -0,0 +1,126 @@ +import type { Message, Task } from '@a2a-js/sdk' +import { createLogger } from '@sim/logger' +import { type NextRequest, NextResponse } from 'next/server' +import { z } from 'zod' +import { createA2AClient, extractTextContent, isTerminalState } from '@/lib/a2a/utils' +import { checkHybridAuth } from '@/lib/auth/hybrid' +import { generateRequestId } from '@/lib/core/utils/request' + +export const dynamic = 'force-dynamic' + +const logger = createLogger('A2ASendMessageAPI') + +const A2ASendMessageSchema = z.object({ + agentUrl: z.string().min(1, 'Agent URL is required'), + message: z.string().min(1, 'Message is required'), + taskId: z.string().optional(), + contextId: z.string().optional(), + apiKey: z.string().optional(), +}) + +export async function POST(request: NextRequest) { + const requestId = generateRequestId() + + try { + const authResult = await checkHybridAuth(request, { requireWorkflowId: false }) + + if (!authResult.success) { + logger.warn(`[${requestId}] Unauthorized A2A send message attempt: ${authResult.error}`) + return NextResponse.json( + { + success: false, + error: authResult.error || 'Authentication required', + }, + { status: 401 } + ) + } + + logger.info( + `[${requestId}] Authenticated A2A send message request via ${authResult.authType}`, + { + userId: authResult.userId, + } + ) + + const body = await request.json() + const validatedData = A2ASendMessageSchema.parse(body) + + logger.info(`[${requestId}] Sending A2A message`, { + agentUrl: validatedData.agentUrl, + hasTaskId: !!validatedData.taskId, + hasContextId: !!validatedData.contextId, + }) + + const client = await createA2AClient(validatedData.agentUrl, validatedData.apiKey) + + const message: Message = { + kind: 'message', + messageId: crypto.randomUUID(), + role: 'user', + parts: [{ kind: 'text', text: validatedData.message }], + ...(validatedData.taskId && { taskId: validatedData.taskId }), + ...(validatedData.contextId && { contextId: validatedData.contextId }), + } + + const result = await client.sendMessage({ message }) + + if (result.kind === 'message') { + const responseMessage = result as Message + + logger.info(`[${requestId}] A2A message sent successfully (message response)`) + + return NextResponse.json({ + success: true, + output: { + content: extractTextContent(responseMessage), + taskId: responseMessage.taskId || '', + contextId: responseMessage.contextId, + state: 'completed', + }, + }) + } + + const task = result as Task + const lastAgentMessage = task.history?.filter((m) => m.role === 'agent').pop() + const content = lastAgentMessage ? extractTextContent(lastAgentMessage) : '' + + logger.info(`[${requestId}] A2A message sent successfully (task response)`, { + taskId: task.id, + state: task.status.state, + }) + + return NextResponse.json({ + success: isTerminalState(task.status.state) && task.status.state !== 'failed', + output: { + content, + taskId: task.id, + contextId: task.contextId, + state: task.status.state, + artifacts: task.artifacts, + history: task.history, + }, + }) + } catch (error) { + if (error instanceof z.ZodError) { + logger.warn(`[${requestId}] Invalid request data`, { errors: error.errors }) + return NextResponse.json( + { + success: false, + error: 'Invalid request data', + details: error.errors, + }, + { status: 400 } + ) + } + + logger.error(`[${requestId}] Error sending A2A message:`, error) + + return NextResponse.json( + { + success: false, + error: error instanceof Error ? error.message : 'Internal server error', + }, + { status: 500 } + ) + } +} diff --git a/apps/sim/app/api/tools/a2a/set-push-notification/route.ts b/apps/sim/app/api/tools/a2a/set-push-notification/route.ts new file mode 100644 index 0000000000..d407609418 --- /dev/null +++ b/apps/sim/app/api/tools/a2a/set-push-notification/route.ts @@ -0,0 +1,93 @@ +import { createLogger } from '@sim/logger' +import { type NextRequest, NextResponse } from 'next/server' +import { z } from 'zod' +import { createA2AClient } from '@/lib/a2a/utils' +import { checkHybridAuth } from '@/lib/auth/hybrid' +import { generateRequestId } from '@/lib/core/utils/request' + +export const dynamic = 'force-dynamic' + +const logger = createLogger('A2ASetPushNotificationAPI') + +const A2ASetPushNotificationSchema = z.object({ + agentUrl: z.string().min(1, 'Agent URL is required'), + taskId: z.string().min(1, 'Task ID is required'), + webhookUrl: z.string().min(1, 'Webhook URL is required'), + token: z.string().optional(), + apiKey: z.string().optional(), +}) + +export async function POST(request: NextRequest) { + const requestId = generateRequestId() + + try { + const authResult = await checkHybridAuth(request, { requireWorkflowId: false }) + + if (!authResult.success) { + logger.warn(`[${requestId}] Unauthorized A2A set push notification attempt`, { + error: authResult.error || 'Authentication required', + }) + return NextResponse.json( + { + success: false, + error: authResult.error || 'Authentication required', + }, + { status: 401 } + ) + } + + const body = await request.json() + const validatedData = A2ASetPushNotificationSchema.parse(body) + + logger.info(`[${requestId}] A2A set push notification request`, { + agentUrl: validatedData.agentUrl, + taskId: validatedData.taskId, + webhookUrl: validatedData.webhookUrl, + }) + + const client = await createA2AClient(validatedData.agentUrl, validatedData.apiKey) + + const result = await client.setTaskPushNotificationConfig({ + taskId: validatedData.taskId, + pushNotificationConfig: { + url: validatedData.webhookUrl, + token: validatedData.token, + }, + }) + + logger.info(`[${requestId}] A2A set push notification successful`, { + taskId: validatedData.taskId, + }) + + return NextResponse.json({ + success: true, + output: { + url: result.pushNotificationConfig.url, + token: result.pushNotificationConfig.token, + success: true, + }, + }) + } catch (error) { + if (error instanceof z.ZodError) { + logger.warn(`[${requestId}] Invalid request data`, { errors: error.errors }) + return NextResponse.json( + { + success: false, + error: 'Invalid request data', + details: error.errors, + }, + { status: 400 } + ) + } + + logger.error(`[${requestId}] Error setting A2A push notification:`, error) + + return NextResponse.json( + { + success: false, + error: error instanceof Error ? error.message : 'Failed to set push notification', + }, + { status: 500 } + ) + } +} diff --git a/apps/sim/app/api/workflows/[id]/execute/route.ts b/apps/sim/app/api/workflows/[id]/execute/route.ts index ca35437e5a..e045e6eaba 100644 --- a/apps/sim/app/api/workflows/[id]/execute/route.ts +++ b/apps/sim/app/api/workflows/[id]/execute/route.ts @@ -215,10 +215,10 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: workflowStateOverride, } = validation.data - // For API key auth, the entire body is the input (except for our control fields) + // For API key and internal JWT auth, the entire body is the input (except for our control fields) // For session auth, the input is explicitly provided in the input field const input = - auth.authType === 'api_key' + auth.authType === 'api_key' || auth.authType === 'internal_jwt' ? (() => { const { selectedOutputs, @@ -226,6 +226,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: stream, useDraftState, workflowStateOverride, + workflowId: _workflowId, // Also exclude workflowId used for internal JWT auth ...rest } = body return Object.keys(rest).length > 0 ? rest : validatedInput diff --git a/apps/sim/app/api/workflows/route.ts b/apps/sim/app/api/workflows/route.ts index 7c905ab7e6..81d4c885b9 100644 --- a/apps/sim/app/api/workflows/route.ts +++ b/apps/sim/app/api/workflows/route.ts @@ -1,12 +1,12 @@ import { db } from '@sim/db' -import { workflow, workspace } from '@sim/db/schema' +import { workflow } from '@sim/db/schema' import { createLogger } from '@sim/logger' import { eq } from 'drizzle-orm' import { type NextRequest, NextResponse } from 'next/server' import { z } from 'zod' import { getSession } from '@/lib/auth' import { generateRequestId } from '@/lib/core/utils/request' -import { getUserEntityPermissions } from '@/lib/workspaces/permissions/utils' +import { getUserEntityPermissions, workspaceExists } from '@/lib/workspaces/permissions/utils' import { verifyWorkspaceMembership } from '@/app/api/workflows/utils' const logger = createLogger('WorkflowAPI') @@ -36,13 +36,9 @@ export async function GET(request: Request) { const userId = session.user.id if (workspaceId) { - const workspaceExists = await db - .select({ id: workspace.id }) - .from(workspace) - .where(eq(workspace.id, workspaceId)) - .then((rows) => rows.length > 0) + const wsExists = await workspaceExists(workspaceId) - if (!workspaceExists) { + if (!wsExists) { logger.warn( `[${requestId}] Attempt to fetch workflows for non-existent workspace: ${workspaceId}` ) diff --git a/apps/sim/app/api/workspaces/[id]/api-keys/route.ts b/apps/sim/app/api/workspaces/[id]/api-keys/route.ts index 1232272366..c649972140 100644 --- a/apps/sim/app/api/workspaces/[id]/api-keys/route.ts +++ b/apps/sim/app/api/workspaces/[id]/api-keys/route.ts @@ -1,5 +1,5 @@ import { db } from '@sim/db' -import { apiKey, workspace } from '@sim/db/schema' +import { apiKey } from '@sim/db/schema' import { createLogger } from '@sim/logger' import { and, eq, inArray } from 'drizzle-orm' import { nanoid } from 'nanoid' @@ -9,7 +9,7 @@ import { createApiKey, getApiKeyDisplayFormat } from '@/lib/api-key/auth' import { getSession } from '@/lib/auth' import { PlatformEvents } from '@/lib/core/telemetry' import { generateRequestId } from '@/lib/core/utils/request' -import { getUserEntityPermissions } from '@/lib/workspaces/permissions/utils' +import { getUserEntityPermissions, getWorkspaceById } from '@/lib/workspaces/permissions/utils' const logger = createLogger('WorkspaceApiKeysAPI') @@ -34,8 +34,8 @@ export async function GET(request: NextRequest, { params }: { params: Promise<{ const userId = session.user.id - const ws = await db.select().from(workspace).where(eq(workspace.id, workspaceId)).limit(1) - if (!ws.length) { + const ws = await getWorkspaceById(workspaceId) + if (!ws) { return NextResponse.json({ error: 'Workspace not found' }, { status: 404 }) } diff --git a/apps/sim/app/api/workspaces/[id]/byok-keys/route.ts b/apps/sim/app/api/workspaces/[id]/byok-keys/route.ts index 84be273d12..82045c0c50 100644 --- a/apps/sim/app/api/workspaces/[id]/byok-keys/route.ts +++ b/apps/sim/app/api/workspaces/[id]/byok-keys/route.ts @@ -1,5 +1,5 @@ import { db } from '@sim/db' -import { workspace, workspaceBYOKKeys } from '@sim/db/schema' +import { workspaceBYOKKeys } from '@sim/db/schema' import { createLogger } from '@sim/logger' import { and, eq } from 'drizzle-orm' import { nanoid } from 'nanoid' @@ -10,7 +10,7 @@ import { isEnterpriseOrgAdminOrOwner } from '@/lib/billing/core/subscription' import { isHosted } from '@/lib/core/config/feature-flags' import { decryptSecret, encryptSecret } from '@/lib/core/security/encryption' import { generateRequestId } from '@/lib/core/utils/request' -import { getUserEntityPermissions } from '@/lib/workspaces/permissions/utils' +import { getUserEntityPermissions, getWorkspaceById } from '@/lib/workspaces/permissions/utils' const logger = createLogger('WorkspaceBYOKKeysAPI') @@ -48,8 +48,8 @@ export async function GET(request: NextRequest, { params }: { params: Promise<{ const userId = session.user.id - const ws = await db.select().from(workspace).where(eq(workspace.id, workspaceId)).limit(1) - if (!ws.length) { + const ws = await getWorkspaceById(workspaceId) + if (!ws) { return NextResponse.json({ error: 'Workspace not found' }, { status: 404 }) } diff --git a/apps/sim/app/api/workspaces/[id]/environment/route.ts b/apps/sim/app/api/workspaces/[id]/environment/route.ts index 9c1ee4eb04..f11da0ecc9 100644 --- a/apps/sim/app/api/workspaces/[id]/environment/route.ts +++ b/apps/sim/app/api/workspaces/[id]/environment/route.ts @@ -1,5 +1,5 @@ import { db } from '@sim/db' -import { environment, workspace, workspaceEnvironment } from '@sim/db/schema' +import { environment, workspaceEnvironment } from '@sim/db/schema' import { createLogger } from '@sim/logger' import { eq } from 'drizzle-orm' import { type NextRequest, NextResponse } from 'next/server' @@ -7,7 +7,7 @@ import { z } from 'zod' import { getSession } from '@/lib/auth' import { decryptSecret, encryptSecret } from '@/lib/core/security/encryption' import { generateRequestId } from '@/lib/core/utils/request' -import { getUserEntityPermissions } from '@/lib/workspaces/permissions/utils' +import { getUserEntityPermissions, getWorkspaceById } from '@/lib/workspaces/permissions/utils' const logger = createLogger('WorkspaceEnvironmentAPI') @@ -33,8 +33,8 @@ export async function GET(request: NextRequest, { params }: { params: Promise<{ const userId = session.user.id // Validate workspace exists - const ws = await db.select().from(workspace).where(eq(workspace.id, workspaceId)).limit(1) - if (!ws.length) { + const ws = await getWorkspaceById(workspaceId) + if (!ws) { return NextResponse.json({ error: 'Workspace not found' }, { status: 404 }) } diff --git a/apps/sim/app/playground/page.tsx b/apps/sim/app/playground/page.tsx index a1a6694cfb..4670b805e0 100644 --- a/apps/sim/app/playground/page.tsx +++ b/apps/sim/app/playground/page.tsx @@ -364,12 +364,30 @@ export default function PlaygroundPage() { + {}} /> + {}} /> {}} /> + +
+ true} + onRemove={() => {}} + placeholder='Add tags' + placeholderWithTags='Add another' + tagVariant='secondary' + triggerKeys={['Enter', ',']} + /> +
+
diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/chat/components/output-select/output-select.tsx b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/chat/components/output-select/output-select.tsx index 43fee221a3..cf6973216e 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/chat/components/output-select/output-select.tsx +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/chat/components/output-select/output-select.tsx @@ -1,16 +1,9 @@ 'use client' import type React from 'react' -import { useCallback, useEffect, useMemo, useRef, useState } from 'react' -import { Check, RepeatIcon, SplitIcon } from 'lucide-react' -import { - Badge, - Popover, - PopoverContent, - PopoverDivider, - PopoverItem, - PopoverTrigger, -} from '@/components/emcn' +import { useMemo } from 'react' +import { RepeatIcon, SplitIcon } from 'lucide-react' +import { Combobox, type ComboboxOptionGroup } from '@/components/emcn' import { extractFieldsFromSchema, parseResponseFormatSafely, @@ -21,7 +14,7 @@ import { useSubBlockStore } from '@/stores/workflows/subblock/store' import { useWorkflowStore } from '@/stores/workflows/workflow/store' /** - * Renders a tag icon with background color. + * Renders a tag icon with background color for block section headers. * * @param icon - Either a letter string or a Lucide icon component * @param color - Background color for the icon container @@ -62,14 +55,9 @@ interface OutputSelectProps { placeholder?: string /** Whether to emit output IDs or labels in onOutputSelect callback */ valueMode?: 'id' | 'label' - /** - * When true, renders the underlying popover content inline instead of in a portal. - * Useful when used inside dialogs or other portalled components that manage scroll locking. - */ - disablePopoverPortal?: boolean - /** Alignment of the popover relative to the trigger */ + /** Alignment of the dropdown relative to the trigger */ align?: 'start' | 'end' | 'center' - /** Maximum height of the popover content in pixels */ + /** Maximum height of the dropdown content in pixels */ maxHeight?: number } @@ -90,14 +78,9 @@ export function OutputSelect({ disabled = false, placeholder = 'Select outputs', valueMode = 'id', - disablePopoverPortal = false, align = 'start', maxHeight = 200, }: OutputSelectProps) { - const [open, setOpen] = useState(false) - const [highlightedIndex, setHighlightedIndex] = useState(-1) - const triggerRef = useRef(null) - const popoverRef = useRef(null) const blocks = useWorkflowStore((state) => state.blocks) const { isShowingDiff, isDiffReady, hasActiveDiff, baselineWorkflow } = useWorkflowDiffStore() const subBlockValues = useSubBlockStore((state) => @@ -206,21 +189,10 @@ export function OutputSelect({ shouldUseBaseline, ]) - /** - * Checks if an output is currently selected by comparing both ID and label - * @param o - The output object to check - * @returns True if the output is selected, false otherwise - */ - const isSelectedValue = useCallback( - (o: { id: string; label: string }) => - selectedOutputs.includes(o.id) || selectedOutputs.includes(o.label), - [selectedOutputs] - ) - /** * Gets display text for selected outputs */ - const selectedOutputsDisplayText = useMemo(() => { + const selectedDisplayText = useMemo(() => { if (!selectedOutputs || selectedOutputs.length === 0) { return placeholder } @@ -234,19 +206,27 @@ export function OutputSelect({ } if (validOutputs.length === 1) { - const output = workflowOutputs.find( - (o) => o.id === validOutputs[0] || o.label === validOutputs[0] - ) - return output?.label || placeholder + return '1 output' } return `${validOutputs.length} outputs` }, [selectedOutputs, workflowOutputs, placeholder]) /** - * Groups outputs by block and sorts by distance from starter block + * Gets the background color for a block output based on its type + * @param blockType - The type of the block + * @returns The hex color code for the block */ - const groupedOutputs = useMemo(() => { + const getOutputColor = (blockType: string) => { + const blockConfig = getBlock(blockType) + return blockConfig?.bgColor || '#2F55FF' + } + + /** + * Groups outputs by block and sorts by distance from starter block. + * Returns ComboboxOptionGroup[] for use with Combobox. + */ + const comboboxGroups = useMemo((): ComboboxOptionGroup[] => { const groups: Record = {} const blockDistances: Record = {} const edges = useWorkflowStore.getState().edges @@ -283,242 +263,75 @@ export function OutputSelect({ groups[output.blockName].push(output) }) - return Object.entries(groups) + const sortedGroups = Object.entries(groups) .map(([blockName, outputs]) => ({ blockName, outputs, distance: blockDistances[outputs[0]?.blockId] || 0, })) .sort((a, b) => b.distance - a.distance) - .reduce( - (acc, { blockName, outputs }) => { - acc[blockName] = outputs - return acc - }, - {} as Record - ) - }, [workflowOutputs, blocks]) - - /** - * Gets the background color for a block output based on its type - * @param blockId - The block ID (unused but kept for future extensibility) - * @param blockType - The type of the block - * @returns The hex color code for the block - */ - const getOutputColor = (blockId: string, blockType: string) => { - const blockConfig = getBlock(blockType) - return blockConfig?.bgColor || '#2F55FF' - } - - /** - * Flattened outputs for keyboard navigation - */ - const flattenedOutputs = useMemo(() => { - return Object.values(groupedOutputs).flat() - }, [groupedOutputs]) - - /** - * Handles output selection by toggling the selected state - * @param value - The output label to toggle - */ - const handleOutputSelection = useCallback( - (value: string) => { - const emittedValue = - valueMode === 'label' ? value : workflowOutputs.find((o) => o.label === value)?.id || value - const index = selectedOutputs.indexOf(emittedValue) - - const newSelectedOutputs = - index === -1 - ? [...new Set([...selectedOutputs, emittedValue])] - : selectedOutputs.filter((id) => id !== emittedValue) - - onOutputSelect(newSelectedOutputs) - }, - [valueMode, workflowOutputs, selectedOutputs, onOutputSelect] - ) - - /** - * Handles keyboard navigation within the output list - * Supports ArrowUp, ArrowDown, Enter, and Escape keys - */ - useEffect(() => { - if (!open || flattenedOutputs.length === 0) return - - const handleKeyboardEvent = (e: KeyboardEvent) => { - switch (e.key) { - case 'ArrowDown': - e.preventDefault() - e.stopPropagation() - setHighlightedIndex((prev) => { - if (prev === -1 || prev >= flattenedOutputs.length - 1) { - return 0 - } - return prev + 1 - }) - break - - case 'ArrowUp': - e.preventDefault() - e.stopPropagation() - setHighlightedIndex((prev) => { - if (prev <= 0) { - return flattenedOutputs.length - 1 - } - return prev - 1 - }) - break - - case 'Enter': - e.preventDefault() - e.stopPropagation() - setHighlightedIndex((currentIndex) => { - if (currentIndex >= 0 && currentIndex < flattenedOutputs.length) { - handleOutputSelection(flattenedOutputs[currentIndex].label) - } - return currentIndex - }) - break - case 'Escape': - e.preventDefault() - e.stopPropagation() - setOpen(false) - break + return sortedGroups.map(({ blockName, outputs }) => { + const firstOutput = outputs[0] + const blockConfig = getBlock(firstOutput.blockType) + const blockColor = getOutputColor(firstOutput.blockType) + + let blockIcon: string | React.ComponentType<{ className?: string }> = blockName + .charAt(0) + .toUpperCase() + + if (blockConfig?.icon) { + blockIcon = blockConfig.icon + } else if (firstOutput.blockType === 'loop') { + blockIcon = RepeatIcon + } else if (firstOutput.blockType === 'parallel') { + blockIcon = SplitIcon } - } - - window.addEventListener('keydown', handleKeyboardEvent, true) - return () => window.removeEventListener('keydown', handleKeyboardEvent, true) - }, [open, flattenedOutputs, handleOutputSelection]) - /** - * Reset highlighted index when popover opens/closes - */ - useEffect(() => { - if (open) { - const firstSelectedIndex = flattenedOutputs.findIndex((output) => isSelectedValue(output)) - setHighlightedIndex(firstSelectedIndex >= 0 ? firstSelectedIndex : -1) - } else { - setHighlightedIndex(-1) - } - }, [open, flattenedOutputs, isSelectedValue]) - - /** - * Scroll highlighted item into view - */ - useEffect(() => { - if (highlightedIndex >= 0 && popoverRef.current) { - const highlightedElement = popoverRef.current.querySelector( - `[data-option-index="${highlightedIndex}"]` - ) - if (highlightedElement) { - highlightedElement.scrollIntoView({ behavior: 'smooth', block: 'nearest' }) + return { + sectionElement: ( +
+ + {blockName} +
+ ), + items: outputs.map((output) => ({ + label: output.path, + value: valueMode === 'label' ? output.label : output.id, + })), } - } - }, [highlightedIndex]) + }) + }, [workflowOutputs, blocks, valueMode]) /** - * Closes popover when clicking outside + * Normalize selected values to match the valueMode */ - useEffect(() => { - if (!open) return - - const handleClickOutside = (event: MouseEvent) => { - const target = event.target as Node - const insideTrigger = triggerRef.current?.contains(target) - const insidePopover = popoverRef.current?.contains(target) - - if (!insideTrigger && !insidePopover) { - setOpen(false) - } - } - - document.addEventListener('mousedown', handleClickOutside) - return () => document.removeEventListener('mousedown', handleClickOutside) - }, [open]) + const normalizedSelectedValues = useMemo(() => { + return selectedOutputs + .map((val) => { + // Find the output that matches either id or label + const output = workflowOutputs.find((o) => o.id === val || o.label === val) + if (!output) return null + // Return in the format matching valueMode + return valueMode === 'label' ? output.label : output.id + }) + .filter((v): v is string => v !== null) + }, [selectedOutputs, workflowOutputs, valueMode]) return ( - - -
- { - if (disabled || workflowOutputs.length === 0) return - e.stopPropagation() - setOpen((prev) => !prev) - }} - > - {selectedOutputsDisplayText} - -
-
- -
- {Object.entries(groupedOutputs).map(([blockName, outputs], groupIndex, groupArray) => { - const startIndex = flattenedOutputs.findIndex((o) => o.blockName === blockName) - - const firstOutput = outputs[0] - const blockConfig = getBlock(firstOutput.blockType) - const blockColor = getOutputColor(firstOutput.blockId, firstOutput.blockType) - - let blockIcon: string | React.ComponentType<{ className?: string }> = blockName - .charAt(0) - .toUpperCase() - - if (blockConfig?.icon) { - blockIcon = blockConfig.icon - } else if (firstOutput.blockType === 'loop') { - blockIcon = RepeatIcon - } else if (firstOutput.blockType === 'parallel') { - blockIcon = SplitIcon - } - - return ( -
-
- - {blockName} -
- -
- {outputs.map((output, localIndex) => { - const globalIndex = startIndex + localIndex - const isHighlighted = globalIndex === highlightedIndex - - return ( - handleOutputSelection(output.label)} - onMouseEnter={() => setHighlightedIndex(globalIndex)} - > - {output.path} - {isSelectedValue(output) && } - - ) - })} -
- {groupIndex < groupArray.length - 1 && } -
- ) - })} -
-
-
+ ) } diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/deploy/components/deploy-modal/components/a2a/a2a.tsx b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/deploy/components/deploy-modal/components/a2a/a2a.tsx new file mode 100644 index 0000000000..f72c96bc00 --- /dev/null +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/deploy/components/deploy-modal/components/a2a/a2a.tsx @@ -0,0 +1,935 @@ +'use client' + +import { useCallback, useEffect, useMemo, useState } from 'react' +import { createLogger } from '@sim/logger' +import { Check, Clipboard } from 'lucide-react' +import { useParams } from 'next/navigation' +import { + Badge, + Button, + ButtonGroup, + ButtonGroupItem, + Checkbox, + Code, + Combobox, + type ComboboxOption, + Input, + Label, + TagInput, + Textarea, + Tooltip, +} from '@/components/emcn' +import { Skeleton } from '@/components/ui' +import type { AgentAuthentication, AgentCapabilities } from '@/lib/a2a/types' +import { getBaseUrl } from '@/lib/core/utils/urls' +import { normalizeInputFormatValue } from '@/lib/workflows/input-format-utils' +import { StartBlockPath, TriggerUtils } from '@/lib/workflows/triggers/triggers' +import { + useA2AAgentByWorkflow, + useCreateA2AAgent, + useDeleteA2AAgent, + usePublishA2AAgent, + useUpdateA2AAgent, +} from '@/hooks/queries/a2a/agents' +import { useCollaborativeWorkflow } from '@/hooks/use-collaborative-workflow' +import { useSubBlockStore } from '@/stores/workflows/subblock/store' +import { useWorkflowStore } from '@/stores/workflows/workflow/store' + +const logger = createLogger('A2ADeploy') + +interface InputFormatField { + id?: string + name?: string + type?: string + value?: unknown + collapsed?: boolean +} + +/** + * Check if a description is a default/placeholder value that should be filtered out + */ +function isDefaultDescription(desc: string | null | undefined, workflowName: string): boolean { + if (!desc) return true + const normalized = desc.toLowerCase().trim() + return ( + normalized === '' || normalized === 'new workflow' || normalized === workflowName.toLowerCase() + ) +} + +type CodeLanguage = 'curl' | 'python' | 'javascript' | 'typescript' + +const LANGUAGE_LABELS: Record = { + curl: 'cURL', + python: 'Python', + javascript: 'JavaScript', + typescript: 'TypeScript', +} + +const LANGUAGE_SYNTAX: Record = { + curl: 'javascript', + python: 'python', + javascript: 'javascript', + typescript: 'javascript', +} + +interface A2aDeployProps { + workflowId: string + workflowName: string + workflowDescription?: string | null + isDeployed: boolean + workflowNeedsRedeployment?: boolean + onSubmittingChange?: (submitting: boolean) => void + onCanSaveChange?: (canSave: boolean) => void + onAgentExistsChange?: (exists: boolean) => void + onPublishedChange?: (published: boolean) => void + onNeedsRepublishChange?: (needsRepublish: boolean) => void + onDeployWorkflow?: () => Promise +} + +type AuthScheme = 'none' | 'apiKey' + +export function A2aDeploy({ + workflowId, + workflowName, + workflowDescription, + isDeployed, + workflowNeedsRedeployment, + onSubmittingChange, + onCanSaveChange, + onAgentExistsChange, + onPublishedChange, + onNeedsRepublishChange, + onDeployWorkflow, +}: A2aDeployProps) { + const params = useParams() + const workspaceId = params.workspaceId as string + + const { data: existingAgent, isLoading } = useA2AAgentByWorkflow(workspaceId, workflowId) + + const createAgent = useCreateA2AAgent() + const updateAgent = useUpdateA2AAgent() + const deleteAgent = useDeleteA2AAgent() + const publishAgent = usePublishA2AAgent() + + const blocks = useWorkflowStore((state) => state.blocks) + const { collaborativeSetSubblockValue } = useCollaborativeWorkflow() + + const startBlockId = useMemo(() => { + if (!blocks || Object.keys(blocks).length === 0) return null + const candidate = TriggerUtils.findStartBlock(blocks, 'api') + if (!candidate || candidate.path !== StartBlockPath.UNIFIED) return null + return candidate.blockId + }, [blocks]) + + const startBlockInputFormat = useSubBlockStore((state) => { + if (!workflowId || !startBlockId) return null + const workflowValues = state.workflowValues[workflowId] + const fromStore = workflowValues?.[startBlockId]?.inputFormat + if (fromStore !== undefined) return fromStore + const startBlock = blocks[startBlockId] + return startBlock?.subBlocks?.inputFormat?.value ?? null + }) + + const missingFields = useMemo(() => { + if (!startBlockId) return { input: false, data: false, files: false, any: false } + const normalizedFields = normalizeInputFormatValue(startBlockInputFormat) + const existingNames = new Set( + normalizedFields + .map((field) => field.name) + .filter((n): n is string => typeof n === 'string' && n.trim() !== '') + .map((n) => n.trim().toLowerCase()) + ) + const missing = { + input: !existingNames.has('input'), + data: !existingNames.has('data'), + files: !existingNames.has('files'), + any: false, + } + missing.any = missing.input || missing.data || missing.files + return missing + }, [startBlockId, startBlockInputFormat]) + + const handleAddA2AInputs = useCallback(() => { + if (!startBlockId) return + + const normalizedExisting = normalizeInputFormatValue(startBlockInputFormat) + const newFields: InputFormatField[] = [] + + // Add input field if missing (for TextPart) + if (missingFields.input) { + newFields.push({ + id: crypto.randomUUID(), + name: 'input', + type: 'string', + value: '', + collapsed: false, + }) + } + + // Add data field if missing (for DataPart) + if (missingFields.data) { + newFields.push({ + id: crypto.randomUUID(), + name: 'data', + type: 'object', + value: '', + collapsed: false, + }) + } + + // Add files field if missing (for FilePart) + if (missingFields.files) { + newFields.push({ + id: crypto.randomUUID(), + name: 'files', + type: 'files', + value: '', + collapsed: false, + }) + } + + if (newFields.length > 0) { + const updatedFields = [...newFields, ...normalizedExisting] + collaborativeSetSubblockValue(startBlockId, 'inputFormat', updatedFields) + logger.info( + `Added A2A input fields to Start block: ${newFields.map((f) => f.name).join(', ')}` + ) + } + }, [startBlockId, startBlockInputFormat, missingFields, collaborativeSetSubblockValue]) + + const [name, setName] = useState('') + const [description, setDescription] = useState('') + const [authScheme, setAuthScheme] = useState('apiKey') + const [pushNotificationsEnabled, setPushNotificationsEnabled] = useState(false) + const [skillTags, setSkillTags] = useState(['workflow', 'automation']) + const [language, setLanguage] = useState('curl') + const [useStreamingExample, setUseStreamingExample] = useState(false) + const [copied, setCopied] = useState(false) + + useEffect(() => { + if (existingAgent) { + setName(existingAgent.name) + const savedDesc = existingAgent.description || '' + setDescription(isDefaultDescription(savedDesc, workflowName) ? '' : savedDesc) + setPushNotificationsEnabled(existingAgent.capabilities?.pushNotifications ?? false) + const schemes = existingAgent.authentication?.schemes || [] + if (schemes.includes('apiKey')) { + setAuthScheme('apiKey') + } else { + setAuthScheme('none') + } + const skills = existingAgent.skills as Array<{ tags?: string[] }> | undefined + const savedTags = skills?.[0]?.tags + setSkillTags(savedTags?.length ? savedTags : ['workflow', 'automation']) + } else { + setName(workflowName) + setDescription( + isDefaultDescription(workflowDescription, workflowName) ? '' : workflowDescription || '' + ) + setAuthScheme('apiKey') + setPushNotificationsEnabled(false) + setSkillTags(['workflow', 'automation']) + } + }, [existingAgent, workflowName, workflowDescription]) + + useEffect(() => { + onAgentExistsChange?.(!!existingAgent) + }, [existingAgent, onAgentExistsChange]) + + useEffect(() => { + onPublishedChange?.(existingAgent?.isPublished ?? false) + }, [existingAgent?.isPublished, onPublishedChange]) + + const hasFormChanges = useMemo(() => { + if (!existingAgent) return false + const savedSchemes = existingAgent.authentication?.schemes || [] + const savedAuthScheme = savedSchemes.includes('apiKey') ? 'apiKey' : 'none' + const savedDesc = existingAgent.description || '' + const normalizedSavedDesc = isDefaultDescription(savedDesc, workflowName) ? '' : savedDesc + const skills = existingAgent.skills as Array<{ tags?: string[] }> | undefined + const savedTags = skills?.[0]?.tags || ['workflow', 'automation'] + const tagsChanged = + skillTags.length !== savedTags.length || skillTags.some((t, i) => t !== savedTags[i]) + return ( + name !== existingAgent.name || + description !== normalizedSavedDesc || + pushNotificationsEnabled !== (existingAgent.capabilities?.pushNotifications ?? false) || + authScheme !== savedAuthScheme || + tagsChanged + ) + }, [ + existingAgent, + name, + description, + pushNotificationsEnabled, + authScheme, + skillTags, + workflowName, + ]) + + const hasWorkflowChanges = useMemo(() => { + if (!existingAgent) return false + return !!workflowNeedsRedeployment + }, [existingAgent, workflowNeedsRedeployment]) + + const needsRepublish = existingAgent && (hasFormChanges || hasWorkflowChanges) + + useEffect(() => { + onNeedsRepublishChange?.(!!needsRepublish) + }, [needsRepublish, onNeedsRepublishChange]) + + const authSchemeOptions: ComboboxOption[] = useMemo( + () => [ + { label: 'API Key', value: 'apiKey' }, + { label: 'None (Public)', value: 'none' }, + ], + [] + ) + + const canSave = name.trim().length > 0 && description.trim().length > 0 + useEffect(() => { + onCanSaveChange?.(canSave) + }, [canSave, onCanSaveChange]) + + const isSubmitting = + createAgent.isPending || + updateAgent.isPending || + deleteAgent.isPending || + publishAgent.isPending + + useEffect(() => { + onSubmittingChange?.(isSubmitting) + }, [isSubmitting, onSubmittingChange]) + + const handleCreateOrUpdate = useCallback(async () => { + const capabilities: AgentCapabilities = { + streaming: true, + pushNotifications: pushNotificationsEnabled, + stateTransitionHistory: true, + } + + const authentication: AgentAuthentication = { + schemes: authScheme === 'none' ? ['none'] : [authScheme], + } + + try { + if (existingAgent) { + await updateAgent.mutateAsync({ + agentId: existingAgent.id, + name: name.trim(), + description: description.trim() || undefined, + capabilities, + authentication, + skillTags, + }) + } else { + await createAgent.mutateAsync({ + workspaceId, + workflowId, + name: name.trim(), + description: description.trim() || undefined, + capabilities, + authentication, + skillTags, + }) + } + } catch (error) { + logger.error('Failed to save A2A agent:', error) + } + }, [ + existingAgent, + name, + description, + pushNotificationsEnabled, + authScheme, + skillTags, + workspaceId, + workflowId, + createAgent, + updateAgent, + ]) + + const handlePublish = useCallback(async () => { + if (!existingAgent) return + try { + await publishAgent.mutateAsync({ + agentId: existingAgent.id, + workspaceId, + action: 'publish', + }) + } catch (error) { + logger.error('Failed to publish A2A agent:', error) + } + }, [existingAgent, workspaceId, publishAgent]) + + const handleUnpublish = useCallback(async () => { + if (!existingAgent) return + try { + await publishAgent.mutateAsync({ + agentId: existingAgent.id, + workspaceId, + action: 'unpublish', + }) + } catch (error) { + logger.error('Failed to unpublish A2A agent:', error) + } + }, [existingAgent, workspaceId, publishAgent]) + + const handleDelete = useCallback(async () => { + if (!existingAgent) return + try { + await deleteAgent.mutateAsync({ + agentId: existingAgent.id, + workspaceId, + }) + setName(workflowName) + setDescription(workflowDescription || '') + } catch (error) { + logger.error('Failed to delete A2A agent:', error) + } + }, [existingAgent, workspaceId, deleteAgent, workflowName, workflowDescription]) + + const handlePublishNewAgent = useCallback(async () => { + const capabilities: AgentCapabilities = { + streaming: true, + pushNotifications: pushNotificationsEnabled, + stateTransitionHistory: true, + } + + const authentication: AgentAuthentication = { + schemes: authScheme === 'none' ? ['none'] : [authScheme], + } + + try { + if (!isDeployed && onDeployWorkflow) { + await onDeployWorkflow() + } + + const newAgent = await createAgent.mutateAsync({ + workspaceId, + workflowId, + name: name.trim(), + description: description.trim() || undefined, + capabilities, + authentication, + skillTags, + }) + + await publishAgent.mutateAsync({ + agentId: newAgent.id, + workspaceId, + action: 'publish', + }) + } catch (error) { + logger.error('Failed to publish A2A agent:', error) + } + }, [ + name, + description, + pushNotificationsEnabled, + authScheme, + skillTags, + workspaceId, + workflowId, + createAgent, + publishAgent, + isDeployed, + onDeployWorkflow, + ]) + + const handleUpdateAndRepublish = useCallback(async () => { + if (!existingAgent) return + + const capabilities: AgentCapabilities = { + streaming: true, + pushNotifications: pushNotificationsEnabled, + stateTransitionHistory: true, + } + + const authentication: AgentAuthentication = { + schemes: authScheme === 'none' ? ['none'] : [authScheme], + } + + try { + if (!isDeployed && onDeployWorkflow) { + await onDeployWorkflow() + } + + await updateAgent.mutateAsync({ + agentId: existingAgent.id, + name: name.trim(), + description: description.trim() || undefined, + capabilities, + authentication, + skillTags, + }) + + await publishAgent.mutateAsync({ + agentId: existingAgent.id, + workspaceId, + action: 'publish', + }) + } catch (error) { + logger.error('Failed to update and republish A2A agent:', error) + } + }, [ + existingAgent, + isDeployed, + onDeployWorkflow, + name, + description, + pushNotificationsEnabled, + authScheme, + skillTags, + workspaceId, + updateAgent, + publishAgent, + ]) + + const baseUrl = getBaseUrl() + const endpoint = existingAgent ? `${baseUrl}/api/a2a/serve/${existingAgent.id}` : null + + const additionalInputFields = useMemo(() => { + const allFields = normalizeInputFormatValue(startBlockInputFormat) + return allFields.filter( + (field): field is InputFormatField & { name: string } => + !!field.name && + field.name.toLowerCase() !== 'input' && + field.name.toLowerCase() !== 'data' && + field.name.toLowerCase() !== 'files' + ) + }, [startBlockInputFormat]) + + const getExampleInputData = useCallback((): Record => { + const data: Record = {} + for (const field of additionalInputFields) { + switch (field.type) { + case 'string': + data[field.name] = 'example' + break + case 'number': + data[field.name] = 42 + break + case 'boolean': + data[field.name] = true + break + case 'object': + data[field.name] = { key: 'value' } + break + case 'array': + data[field.name] = [1, 2, 3] + break + default: + data[field.name] = 'example' + } + } + return data + }, [additionalInputFields]) + + const getJsonRpcPayload = useCallback((): Record => { + const inputData = getExampleInputData() + const hasAdditionalData = Object.keys(inputData).length > 0 + + // Build parts array: TextPart for message text, DataPart for additional fields + const parts: Array> = [{ kind: 'text', text: 'Hello, agent!' }] + if (hasAdditionalData) { + parts.push({ kind: 'data', data: inputData }) + } + + return { + jsonrpc: '2.0', + id: '1', + method: useStreamingExample ? 'message/stream' : 'message/send', + params: { + message: { + role: 'user', + parts, + }, + }, + } + }, [getExampleInputData, useStreamingExample]) + + const getCurlCommand = useCallback((): string => { + if (!endpoint) return '' + const payload = getJsonRpcPayload() + const requiresAuth = authScheme !== 'none' + + switch (language) { + case 'curl': + return requiresAuth + ? `curl -X POST \\ + -H "X-API-Key: $SIM_API_KEY" \\ + -H "Content-Type: application/json" \\ + -d '${JSON.stringify(payload)}' \\ + ${endpoint}` + : `curl -X POST \\ + -H "Content-Type: application/json" \\ + -d '${JSON.stringify(payload)}' \\ + ${endpoint}` + + case 'python': + return requiresAuth + ? `import os +import requests + +response = requests.post( + "${endpoint}", + headers={ + "X-API-Key": os.environ.get("SIM_API_KEY"), + "Content-Type": "application/json" + }, + json=${JSON.stringify(payload, null, 4).replace(/\n/g, '\n ')} +) + +print(response.json())` + : `import requests + +response = requests.post( + "${endpoint}", + headers={"Content-Type": "application/json"}, + json=${JSON.stringify(payload, null, 4).replace(/\n/g, '\n ')} +) + +print(response.json())` + + case 'javascript': + return requiresAuth + ? `const response = await fetch("${endpoint}", { + method: "POST", + headers: { + "X-API-Key": process.env.SIM_API_KEY, + "Content-Type": "application/json" + }, + body: JSON.stringify(${JSON.stringify(payload)}) +}); + +const data = await response.json(); +console.log(data);` + : `const response = await fetch("${endpoint}", { + method: "POST", + headers: {"Content-Type": "application/json"}, + body: JSON.stringify(${JSON.stringify(payload)}) +}); + +const data = await response.json(); +console.log(data);` + + case 'typescript': + return requiresAuth + ? `const response = await fetch("${endpoint}", { + method: "POST", + headers: { + "X-API-Key": process.env.SIM_API_KEY, + "Content-Type": "application/json" + }, + body: JSON.stringify(${JSON.stringify(payload)}) +}); + +const data: Record = await response.json(); +console.log(data);` + : `const response = await fetch("${endpoint}", { + method: "POST", + headers: {"Content-Type": "application/json"}, + body: JSON.stringify(${JSON.stringify(payload)}) +}); + +const data: Record = await response.json(); +console.log(data);` + + default: + return '' + } + }, [endpoint, language, getJsonRpcPayload, authScheme]) + + const handleCopyCommand = useCallback(() => { + navigator.clipboard.writeText(getCurlCommand()) + setCopied(true) + setTimeout(() => setCopied(false), 2000) + }, [getCurlCommand]) + + if (isLoading) { + return ( +
+
+ + + +
+
+ + +
+
+ + +
+
+ + +
+
+ ) + } + + return ( +
{ + e.preventDefault() + handleCreateOrUpdate() + }} + className='-mx-1 space-y-[12px] overflow-y-auto px-1 pb-[16px]' + > + {/* Endpoint URL (shown when agent exists) */} + {existingAgent && endpoint && ( +
+ +
+
+ {baseUrl.replace(/^https?:\/\//, '')}/api/a2a/serve/ +
+
+ + + + + + + {copied ? 'Copied' : 'Copy'} + + +
+
+

+ The A2A endpoint URL where clients can discover and call your agent +

+
+ )} + + {/* Agent Name */} +
+ + setName(e.target.value)} + placeholder='Enter agent name' + required + /> +

+ Human-readable name shown in the Agent Card +

+
+ + {/* Description */} +
+ +