Skip to content

Commit 41a9bab

Browse files
committed
ack PR comments
1 parent 034ad83 commit 41a9bab

File tree

20 files changed

+202
-78
lines changed

20 files changed

+202
-78
lines changed

apps/sim/app/api/a2a/agents/[agentId]/route.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,14 @@ export async function PUT(request: NextRequest, { params }: { params: Promise<Ro
103103

104104
const body = await request.json()
105105

106+
if (
107+
body.skillTags !== undefined &&
108+
(!Array.isArray(body.skillTags) ||
109+
!body.skillTags.every((tag: unknown): tag is string => typeof tag === 'string'))
110+
) {
111+
return NextResponse.json({ error: 'skillTags must be an array of strings' }, { status: 400 })
112+
}
113+
106114
let skills = body.skills ?? existingAgent.skills
107115
if (body.skillTags !== undefined) {
108116
const agentName = body.name ?? existingAgent.name

apps/sim/app/api/a2a/agents/route.ts

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,14 @@ export async function POST(request: NextRequest) {
122122
)
123123
}
124124

125+
const workflowData = await loadWorkflowFromNormalizedTables(workflowId)
126+
if (!workflowData || !hasValidStartBlockInState(workflowData)) {
127+
return NextResponse.json(
128+
{ error: 'Workflow must have a Start block to be exposed as an A2A agent' },
129+
{ status: 400 }
130+
)
131+
}
132+
125133
const [existing] = await db
126134
.select({ id: a2aAgent.id })
127135
.from(a2aAgent)
@@ -135,14 +143,6 @@ export async function POST(request: NextRequest) {
135143
)
136144
}
137145

138-
const workflowData = await loadWorkflowFromNormalizedTables(workflowId)
139-
if (!workflowData || !hasValidStartBlockInState(workflowData)) {
140-
return NextResponse.json(
141-
{ error: 'Workflow must have a Start block to be exposed as an A2A agent' },
142-
{ status: 400 }
143-
)
144-
}
145-
146146
const skills = generateSkillsFromWorkflow(
147147
name || wf.name,
148148
description || wf.description,

apps/sim/app/api/a2a/serve/[agentId]/route.ts

Lines changed: 45 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -224,9 +224,8 @@ export async function POST(request: NextRequest, { params }: { params: Promise<R
224224
}
225225

226226
const { id, method, params: rpcParams } = body
227-
const apiKey =
228-
request.headers.get('X-API-Key') ||
229-
request.headers.get('Authorization')?.replace('Bearer ', '')
227+
// Only accept API keys from X-API-Key header to maintain proper auth boundaries
228+
const apiKey = request.headers.get('X-API-Key')
230229

231230
logger.info(`A2A request: ${method} for agent ${agentId}`)
232231

@@ -294,6 +293,9 @@ async function handleMessageSend(
294293
const taskId = message.taskId || generateTaskId()
295294
const contextId = message.contextId || uuidv4()
296295

296+
// Distributed lock to prevent concurrent task processing
297+
// Note: When Redis is unavailable, acquireLock returns true (degraded mode).
298+
// In production, ensure Redis is available for proper distributed locking.
297299
const lockKey = `a2a:task:${taskId}:lock`
298300
const lockValue = uuidv4()
299301
const acquired = await acquireLock(lockKey, lockValue, 60)
@@ -427,9 +429,14 @@ async function handleMessageSend(
427429

428430
return NextResponse.json(createResponse(id, task))
429431
} catch (error) {
430-
logger.error(`Error executing workflow for task ${taskId}:`, error)
432+
const isTimeout = error instanceof Error && error.name === 'TimeoutError'
433+
logger.error(`Error executing workflow for task ${taskId}:`, { error, isTimeout })
431434

432-
const errorMessage = error instanceof Error ? error.message : 'Workflow execution failed'
435+
const errorMessage = isTimeout
436+
? `Workflow execution timed out after ${A2A_DEFAULT_TIMEOUT}ms`
437+
: error instanceof Error
438+
? error.message
439+
: 'Workflow execution failed'
433440

434441
await db
435442
.update(a2aTask)
@@ -479,9 +486,14 @@ async function handleMessageStream(
479486
const contextId = message.contextId || uuidv4()
480487
const taskId = message.taskId || generateTaskId()
481488

489+
// Distributed lock to prevent concurrent task processing
490+
// Note: When Redis is unavailable, acquireLock returns true (degraded mode).
491+
// Lock timeout: 5 minutes for streaming to handle long-running workflows.
492+
// If a streaming request fails without releasing the lock, subsequent requests
493+
// will be blocked until timeout. The lock is released in finally block below.
482494
const lockKey = `a2a:task:${taskId}:lock`
483495
const lockValue = uuidv4()
484-
const acquired = await acquireLock(lockKey, lockValue, 300) // 5 minute timeout for streaming
496+
const acquired = await acquireLock(lockKey, lockValue, 300)
485497

486498
if (!acquired) {
487499
const encoder = new TextEncoder()
@@ -645,7 +657,11 @@ async function handleMessageStream(
645657
}
646658
}
647659

648-
const messageContent = finalContent || accumulatedContent || 'Task completed'
660+
// Use finalContent if available and non-empty, otherwise fall back to accumulated content
661+
const messageContent =
662+
(finalContent !== undefined && finalContent.length > 0
663+
? finalContent
664+
: accumulatedContent) || 'Task completed'
649665
const agentMessage = createAgentMessage(messageContent)
650666
agentMessage.taskId = taskId
651667
if (contextId) agentMessage.contextId = contextId
@@ -718,7 +734,14 @@ async function handleMessageStream(
718734
})
719735
}
720736
} catch (error) {
721-
logger.error(`Streaming error for task ${taskId}:`, error)
737+
const isTimeout = error instanceof Error && error.name === 'TimeoutError'
738+
logger.error(`Streaming error for task ${taskId}:`, { error, isTimeout })
739+
740+
const errorMessage = isTimeout
741+
? `Workflow execution timed out after ${A2A_DEFAULT_TIMEOUT}ms`
742+
: error instanceof Error
743+
? error.message
744+
: 'Streaming failed'
722745

723746
await db
724747
.update(a2aTask)
@@ -735,7 +758,7 @@ async function handleMessageStream(
735758

736759
sendEvent('error', {
737760
code: A2A_ERROR_CODES.INTERNAL_ERROR,
738-
message: error instanceof Error ? error.message : 'Streaming failed',
761+
message: errorMessage,
739762
})
740763
} finally {
741764
await releaseLock(lockKey, lockValue)
@@ -862,7 +885,7 @@ async function handleTaskCancel(id: string | number, params: TaskIdParams): Prom
862885
* Handle tasks/resubscribe - Reconnect to SSE stream for an ongoing task
863886
*/
864887
async function handleTaskResubscribe(
865-
_request: NextRequest,
888+
request: NextRequest,
866889
id: string | number,
867890
params: TaskIdParams
868891
): Promise<NextResponse> {
@@ -896,10 +919,20 @@ async function handleTaskResubscribe(
896919
let isCancelled = false
897920
let pollTimeoutId: ReturnType<typeof setTimeout> | null = null
898921

922+
// Listen for client disconnection via request signal
923+
const abortSignal = request.signal
924+
abortSignal.addEventListener('abort', () => {
925+
isCancelled = true
926+
if (pollTimeoutId) {
927+
clearTimeout(pollTimeoutId)
928+
pollTimeoutId = null
929+
}
930+
})
931+
899932
const stream = new ReadableStream({
900933
async start(controller) {
901934
const sendEvent = (event: string, data: unknown): boolean => {
902-
if (isCancelled) return false
935+
if (isCancelled || abortSignal.aborted) return false
903936
try {
904937
controller.enqueue(encoder.encode(`event: ${event}\ndata: ${JSON.stringify(data)}\n\n`))
905938
return true
@@ -935,7 +968,7 @@ async function handleTaskResubscribe(
935968

936969
let polls = 0
937970
const poll = async () => {
938-
if (isCancelled) {
971+
if (isCancelled || abortSignal.aborted) {
939972
cleanup()
940973
return
941974
}

apps/sim/app/api/a2a/serve/[agentId]/utils.ts

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -140,12 +140,22 @@ export function extractAgentContent(executeResult: {
140140
output?: { content?: string; [key: string]: unknown }
141141
error?: string
142142
}): string {
143-
return (
144-
executeResult.output?.content ||
145-
(typeof executeResult.output === 'object'
146-
? JSON.stringify(executeResult.output)
147-
: String(executeResult.output || executeResult.error || 'Task completed'))
148-
)
143+
// Prefer explicit content field
144+
if (executeResult.output?.content) {
145+
return executeResult.output.content
146+
}
147+
148+
// If output is an object with meaningful data, stringify it
149+
if (typeof executeResult.output === 'object' && executeResult.output !== null) {
150+
const keys = Object.keys(executeResult.output)
151+
// Skip empty objects or objects with only undefined values
152+
if (keys.length > 0 && keys.some((k) => executeResult.output![k] !== undefined)) {
153+
return JSON.stringify(executeResult.output)
154+
}
155+
}
156+
157+
// Fallback to error message or default
158+
return executeResult.error || 'Task completed'
149159
}
150160

151161
export function buildTaskResponse(params: {

apps/sim/app/api/tools/a2a/cancel-task/route.ts

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
import type { Task } from '@a2a-js/sdk'
2-
import { ClientFactory } from '@a2a-js/sdk/client'
32
import { createLogger } from '@sim/logger'
43
import { type NextRequest, NextResponse } from 'next/server'
54
import { z } from 'zod'
5+
import { createA2AClient } from '@/lib/a2a/utils'
66
import { checkHybridAuth } from '@/lib/auth/hybrid'
77
import { generateRequestId } from '@/lib/core/utils/request'
88

@@ -41,8 +41,7 @@ export async function POST(request: NextRequest) {
4141
taskId: validatedData.taskId,
4242
})
4343

44-
const factory = new ClientFactory()
45-
const client = await factory.createFromUrl(validatedData.agentUrl)
44+
const client = await createA2AClient(validatedData.agentUrl, validatedData.apiKey)
4645

4746
const task = (await client.cancelTask({ id: validatedData.taskId })) as Task
4847

apps/sim/app/api/tools/a2a/delete-push-notification/route.ts

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1-
import { ClientFactory } from '@a2a-js/sdk/client'
21
import { createLogger } from '@sim/logger'
32
import { type NextRequest, NextResponse } from 'next/server'
43
import { z } from 'zod'
4+
import { createA2AClient } from '@/lib/a2a/utils'
55
import { checkHybridAuth } from '@/lib/auth/hybrid'
66
import { generateRequestId } from '@/lib/core/utils/request'
77

@@ -51,8 +51,7 @@ export async function POST(request: NextRequest) {
5151
pushNotificationConfigId: validatedData.pushNotificationConfigId,
5252
})
5353

54-
const factory = new ClientFactory()
55-
const client = await factory.createFromUrl(validatedData.agentUrl)
54+
const client = await createA2AClient(validatedData.agentUrl, validatedData.apiKey)
5655

5756
await client.deleteTaskPushNotificationConfig({
5857
id: validatedData.taskId,

apps/sim/app/api/tools/a2a/get-agent-card/route.ts

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1-
import { ClientFactory } from '@a2a-js/sdk/client'
21
import { createLogger } from '@sim/logger'
32
import { type NextRequest, NextResponse } from 'next/server'
43
import { z } from 'zod'
4+
import { createA2AClient } from '@/lib/a2a/utils'
55
import { checkHybridAuth } from '@/lib/auth/hybrid'
66
import { generateRequestId } from '@/lib/core/utils/request'
77

@@ -45,8 +45,7 @@ export async function POST(request: NextRequest) {
4545
agentUrl: validatedData.agentUrl,
4646
})
4747

48-
const factory = new ClientFactory()
49-
const client = await factory.createFromUrl(validatedData.agentUrl)
48+
const client = await createA2AClient(validatedData.agentUrl, validatedData.apiKey)
5049

5150
const agentCard = await client.getAgentCard()
5251

apps/sim/app/api/tools/a2a/get-push-notification/route.ts

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1-
import { ClientFactory } from '@a2a-js/sdk/client'
21
import { createLogger } from '@sim/logger'
32
import { type NextRequest, NextResponse } from 'next/server'
43
import { z } from 'zod'
4+
import { createA2AClient } from '@/lib/a2a/utils'
55
import { checkHybridAuth } from '@/lib/auth/hybrid'
66
import { generateRequestId } from '@/lib/core/utils/request'
77

@@ -49,8 +49,7 @@ export async function POST(request: NextRequest) {
4949
taskId: validatedData.taskId,
5050
})
5151

52-
const factory = new ClientFactory()
53-
const client = await factory.createFromUrl(validatedData.agentUrl)
52+
const client = await createA2AClient(validatedData.agentUrl, validatedData.apiKey)
5453

5554
const result = await client.getTaskPushNotificationConfig({
5655
id: validatedData.taskId,

apps/sim/app/api/tools/a2a/get-task/route.ts

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
import type { Task } from '@a2a-js/sdk'
2-
import { ClientFactory } from '@a2a-js/sdk/client'
32
import { createLogger } from '@sim/logger'
43
import { type NextRequest, NextResponse } from 'next/server'
54
import { z } from 'zod'
5+
import { createA2AClient } from '@/lib/a2a/utils'
66
import { checkHybridAuth } from '@/lib/auth/hybrid'
77
import { generateRequestId } from '@/lib/core/utils/request'
88

@@ -47,8 +47,7 @@ export async function POST(request: NextRequest) {
4747
historyLength: validatedData.historyLength,
4848
})
4949

50-
const factory = new ClientFactory()
51-
const client = await factory.createFromUrl(validatedData.agentUrl)
50+
const client = await createA2AClient(validatedData.agentUrl, validatedData.apiKey)
5251

5352
const task = (await client.getTask({
5453
id: validatedData.taskId,

apps/sim/app/api/tools/a2a/resubscribe/route.ts

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,10 @@ import type {
66
TaskState,
77
TaskStatusUpdateEvent,
88
} from '@a2a-js/sdk'
9-
import { ClientFactory } from '@a2a-js/sdk/client'
109
import { createLogger } from '@sim/logger'
1110
import { type NextRequest, NextResponse } from 'next/server'
1211
import { z } from 'zod'
13-
import { extractTextContent, isTerminalState } from '@/lib/a2a/utils'
12+
import { createA2AClient, extractTextContent, isTerminalState } from '@/lib/a2a/utils'
1413
import { checkHybridAuth } from '@/lib/auth/hybrid'
1514
import { generateRequestId } from '@/lib/core/utils/request'
1615

@@ -44,8 +43,7 @@ export async function POST(request: NextRequest) {
4443
const body = await request.json()
4544
const validatedData = A2AResubscribeSchema.parse(body)
4645

47-
const factory = new ClientFactory()
48-
const client = await factory.createFromUrl(validatedData.agentUrl)
46+
const client = await createA2AClient(validatedData.agentUrl, validatedData.apiKey)
4947

5048
const stream = client.resubscribeTask({ id: validatedData.taskId })
5149

0 commit comments

Comments
 (0)