1- /**
2- * A2A Serve Endpoint (v0.3)
3- *
4- * Implements A2A protocol v0.3 for workflow agents using the SDK server components.
5- * Handles JSON-RPC 2.0 requests for message sending, task management, and push notifications.
6- */
7-
81import type { Artifact , Message , PushNotificationConfig , Task , TaskState } from '@a2a-js/sdk'
92import { db } from '@sim/db'
103import { a2aAgent , a2aPushNotificationConfig , a2aTask , workflow } from '@sim/db/schema'
@@ -14,8 +7,10 @@ import { type NextRequest, NextResponse } from 'next/server'
147import { v4 as uuidv4 } from 'uuid'
158import { A2A_DEFAULT_TIMEOUT } from '@/lib/a2a/constants'
169import { notifyTaskStateChange } from '@/lib/a2a/push-notifications'
17- import { createAgentMessage , extractTextContent , isTerminalState } from '@/lib/a2a/utils'
10+ import { createAgentMessage , extractWorkflowInput , isTerminalState } from '@/lib/a2a/utils'
1811import { checkHybridAuth } from '@/lib/auth/hybrid'
12+ import { generateInternalToken } from '@/lib/auth/internal'
13+ import { getBrandConfig } from '@/lib/branding/branding'
1914import { acquireLock , getRedisClient , releaseLock } from '@/lib/core/config/redis'
2015import { SSE_HEADERS } from '@/lib/core/utils/sse'
2116import { getBaseUrl } from '@/lib/core/utils/urls'
@@ -32,7 +27,7 @@ import {
3227 type MessageSendParams ,
3328 type PushNotificationSetParams ,
3429 type TaskIdParams ,
35- } from '. /utils'
30+ } from '@/app/api/a2a/serve/[agentId] /utils'
3631
3732const logger = createLogger ( 'A2AServeAPI' )
3833
@@ -94,22 +89,41 @@ export async function GET(request: NextRequest, { params }: { params: Promise<Ro
9489 }
9590
9691 const baseUrl = getBaseUrl ( )
92+ const brandConfig = getBrandConfig ( )
93+
94+ const authConfig = agent . authentication as { schemes ?: string [ ] } | undefined
95+ const schemes = authConfig ?. schemes || [ ]
96+ const isPublic = schemes . includes ( 'none' )
9797
9898 const agentCard = {
99+ protocolVersion : '0.3.0' ,
99100 name : agent . name ,
100- description : agent . description ,
101+ description : agent . description || '' ,
101102 url : `${ baseUrl } /api/a2a/serve/${ agent . id } ` ,
102103 version : agent . version ,
104+ preferredTransport : 'JSONRPC' ,
103105 documentationUrl : `${ baseUrl } /docs/a2a` ,
104106 provider : {
105- organization : 'Sim Studio' ,
107+ organization : brandConfig . name ,
106108 url : baseUrl ,
107109 } ,
108110 capabilities : agent . capabilities ,
109- skills : agent . skills ,
110- authentication : agent . authentication ,
111- defaultInputModes : [ 'text' ] ,
112- defaultOutputModes : [ 'text' ] ,
111+ skills : agent . skills || [ ] ,
112+ ...( isPublic
113+ ? { }
114+ : {
115+ securitySchemes : {
116+ apiKey : {
117+ type : 'apiKey' as const ,
118+ name : 'X-API-Key' ,
119+ in : 'header' as const ,
120+ description : 'API key authentication' ,
121+ } ,
122+ } ,
123+ security : [ { apiKey : [ ] } ] ,
124+ } ) ,
125+ defaultInputModes : [ 'text/plain' , 'application/json' ] ,
126+ defaultOutputModes : [ 'text/plain' , 'application/json' ] ,
113127 }
114128
115129 if ( redis ) {
@@ -148,6 +162,7 @@ export async function POST(request: NextRequest, { params }: { params: Promise<R
148162 workspaceId : a2aAgent . workspaceId ,
149163 isPublished : a2aAgent . isPublished ,
150164 capabilities : a2aAgent . capabilities ,
165+ authentication : a2aAgent . authentication ,
151166 } )
152167 . from ( a2aAgent )
153168 . where ( eq ( a2aAgent . id , agentId ) )
@@ -167,12 +182,17 @@ export async function POST(request: NextRequest, { params }: { params: Promise<R
167182 )
168183 }
169184
170- const auth = await checkHybridAuth ( request , { requireWorkflowId : false } )
171- if ( ! auth . success || ! auth . userId ) {
172- return NextResponse . json (
173- createError ( null , A2A_ERROR_CODES . AUTHENTICATION_REQUIRED , 'Unauthorized' ) ,
174- { status : 401 }
175- )
185+ const authSchemes = ( agent . authentication as { schemes ?: string [ ] } ) ?. schemes || [ ]
186+ const requiresAuth = ! authSchemes . includes ( 'none' )
187+
188+ if ( requiresAuth ) {
189+ const auth = await checkHybridAuth ( request , { requireWorkflowId : false } )
190+ if ( ! auth . success || ! auth . userId ) {
191+ return NextResponse . json (
192+ createError ( null , A2A_ERROR_CODES . AUTHENTICATION_REQUIRED , 'Unauthorized' ) ,
193+ { status : 401 }
194+ )
195+ }
176196 }
177197
178198 const [ wf ] = await db
@@ -266,7 +286,7 @@ async function handleMessageSend(
266286
267287 const message = params . message
268288 const taskId = message . taskId || generateTaskId ( )
269- const contextId = message . contextId || uuidv4 ( ) // Generate contextId if not provided
289+ const contextId = message . contextId || uuidv4 ( )
270290
271291 const lockKey = `a2a:task:${ taskId } :lock`
272292 const lockValue = uuidv4 ( )
@@ -328,19 +348,38 @@ async function handleMessageSend(
328348
329349 const executeUrl = `${ getBaseUrl ( ) } /api/workflows/${ agent . workflowId } /execute`
330350 const headers : Record < string , string > = { 'Content-Type' : 'application/json' }
331- if ( apiKey ) headers [ 'X-API-Key' ] = apiKey
351+ let useInternalAuth = false
352+ if ( apiKey ) {
353+ headers [ 'X-API-Key' ] = apiKey
354+ } else {
355+ const internalToken = await generateInternalToken ( )
356+ headers . Authorization = `Bearer ${ internalToken } `
357+ useInternalAuth = true
358+ }
332359
333360 logger . info ( `Executing workflow ${ agent . workflowId } for A2A task ${ taskId } ` )
334361
335362 try {
336- const messageText = extractTextContent ( message )
363+ // Extract workflow input from A2A message parts
364+ const workflowInput = extractWorkflowInput ( message )
365+ if ( ! workflowInput ) {
366+ return NextResponse . json (
367+ createError (
368+ id ,
369+ A2A_ERROR_CODES . INVALID_PARAMS ,
370+ 'Message must contain at least one part with content'
371+ ) ,
372+ { status : 400 }
373+ )
374+ }
337375
338376 const response = await fetch ( executeUrl , {
339377 method : 'POST' ,
340378 headers,
341379 body : JSON . stringify ( {
342- input : messageText ,
380+ ... workflowInput ,
343381 triggerType : 'api' ,
382+ ...( useInternalAuth && { workflowId : agent . workflowId } ) ,
344383 } ) ,
345384 signal : AbortSignal . timeout ( A2A_DEFAULT_TIMEOUT ) ,
346385 } )
@@ -440,7 +479,27 @@ async function handleMessageStream(
440479 }
441480
442481 const message = params . message
443- const contextId = message . contextId || uuidv4 ( ) // Generate contextId if not provided
482+ const contextId = message . contextId || uuidv4 ( )
483+ const taskId = message . taskId || generateTaskId ( )
484+
485+ const lockKey = `a2a:task:${ taskId } :lock`
486+ const lockValue = uuidv4 ( )
487+ const acquired = await acquireLock ( lockKey , lockValue , 300 ) // 5 minute timeout for streaming
488+
489+ if ( ! acquired ) {
490+ const encoder = new TextEncoder ( )
491+ const errorStream = new ReadableStream ( {
492+ start ( controller ) {
493+ controller . enqueue (
494+ encoder . encode (
495+ `event: error\ndata: ${ JSON . stringify ( { code : A2A_ERROR_CODES . INTERNAL_ERROR , message : 'Task is currently being processed' } ) } \n\n`
496+ )
497+ )
498+ controller . close ( )
499+ } ,
500+ } )
501+ return new NextResponse ( errorStream , { headers : SSE_HEADERS } )
502+ }
444503
445504 let history : Message [ ] = [ ]
446505 let existingTask : typeof a2aTask . $inferSelect | null = null
@@ -450,12 +509,14 @@ async function handleMessageStream(
450509 existingTask = found || null
451510
452511 if ( ! existingTask ) {
512+ await releaseLock ( lockKey , lockValue )
453513 return NextResponse . json ( createError ( id , A2A_ERROR_CODES . TASK_NOT_FOUND , 'Task not found' ) , {
454514 status : 404 ,
455515 } )
456516 }
457517
458518 if ( isTerminalState ( existingTask . status as TaskState ) ) {
519+ await releaseLock ( lockKey , lockValue )
459520 return NextResponse . json (
460521 createError ( id , A2A_ERROR_CODES . TASK_ALREADY_COMPLETE , 'Task already in terminal state' ) ,
461522 { status : 400 }
@@ -465,7 +526,6 @@ async function handleMessageStream(
465526 history = existingTask . messages as Message [ ]
466527 }
467528
468- const taskId = message . taskId || generateTaskId ( )
469529 history . push ( message )
470530
471531 if ( existingTask ) {
@@ -515,17 +575,35 @@ async function handleMessageStream(
515575 'Content-Type' : 'application/json' ,
516576 'X-Stream-Response' : 'true' ,
517577 }
518- if ( apiKey ) headers [ 'X-API-Key' ] = apiKey
578+ let useInternalAuth = false
579+ if ( apiKey ) {
580+ headers [ 'X-API-Key' ] = apiKey
581+ } else {
582+ const internalToken = await generateInternalToken ( )
583+ headers . Authorization = `Bearer ${ internalToken } `
584+ useInternalAuth = true
585+ }
519586
520- const messageText = extractTextContent ( message )
587+ // Extract workflow input from A2A message parts
588+ const workflowInput = extractWorkflowInput ( message )
589+ if ( ! workflowInput ) {
590+ sendEvent ( 'error' , {
591+ code : A2A_ERROR_CODES . INVALID_PARAMS ,
592+ message : 'Message must contain at least one part with content' ,
593+ } )
594+ await releaseLock ( lockKey , lockValue )
595+ controller . close ( )
596+ return
597+ }
521598
522599 const response = await fetch ( executeUrl , {
523600 method : 'POST' ,
524601 headers,
525602 body : JSON . stringify ( {
526- input : messageText ,
603+ ... workflowInput ,
527604 triggerType : 'api' ,
528605 stream : true ,
606+ ...( useInternalAuth && { workflowId : agent . workflowId } ) ,
529607 } ) ,
530608 signal : AbortSignal . timeout ( A2A_DEFAULT_TIMEOUT ) ,
531609 } )
@@ -663,6 +741,7 @@ async function handleMessageStream(
663741 message : error instanceof Error ? error . message : 'Streaming failed' ,
664742 } )
665743 } finally {
744+ await releaseLock ( lockKey , lockValue )
666745 controller . close ( )
667746 }
668747 } ,
@@ -788,7 +867,7 @@ async function handleTaskCancel(id: string | number, params: TaskIdParams): Prom
788867 * Handle tasks/resubscribe - Reconnect to SSE stream for an ongoing task
789868 */
790869async function handleTaskResubscribe (
791- request : NextRequest ,
870+ _request : NextRequest ,
792871 id : string | number ,
793872 params : TaskIdParams
794873) : Promise < NextResponse > {
0 commit comments