1- import type { Artifact , Message , PushNotificationConfig , Task , TaskState } from '@a2a-js/sdk'
1+ import type { Artifact , Message , PushNotificationConfig , TaskState } from '@a2a-js/sdk'
22import { db } from '@sim/db'
33import { a2aAgent , a2aPushNotificationConfig , a2aTask , workflow } from '@sim/db/schema'
44import { createLogger } from '@sim/logger'
@@ -7,9 +7,13 @@ import { type NextRequest, NextResponse } from 'next/server'
77import { v4 as uuidv4 } from 'uuid'
88import { A2A_DEFAULT_TIMEOUT } from '@/lib/a2a/constants'
99import { notifyTaskStateChange } from '@/lib/a2a/push-notifications'
10- import { createAgentMessage , extractWorkflowInput , isTerminalState } from '@/lib/a2a/utils'
10+ import {
11+ createAgentMessage ,
12+ extractWorkflowInput ,
13+ isTerminalState ,
14+ parseWorkflowSSEChunk ,
15+ } from '@/lib/a2a/utils'
1116import { checkHybridAuth } from '@/lib/auth/hybrid'
12- import { generateInternalToken } from '@/lib/auth/internal'
1317import { getBrandConfig } from '@/lib/branding/branding'
1418import { acquireLock , getRedisClient , releaseLock } from '@/lib/core/config/redis'
1519import { SSE_HEADERS } from '@/lib/core/utils/sse'
@@ -18,9 +22,11 @@ import { markExecutionCancelled } from '@/lib/execution/cancellation'
1822import {
1923 A2A_ERROR_CODES ,
2024 A2A_METHODS ,
25+ buildExecuteRequest ,
26+ buildTaskResponse ,
2127 createError ,
2228 createResponse ,
23- createTaskStatus ,
29+ extractAgentContent ,
2430 formatTaskResponse ,
2531 generateTaskId ,
2632 isJSONRPCRequest ,
@@ -41,7 +47,7 @@ interface RouteParams {
4147/**
4248 * GET - Returns the Agent Card (discovery document)
4349 */
44- export async function GET ( request : NextRequest , { params } : { params : Promise < RouteParams > } ) {
50+ export async function GET ( _request : NextRequest , { params } : { params : Promise < RouteParams > } ) {
4551 const { agentId } = await params
4652
4753 const redis = getRedisClient ( )
@@ -346,21 +352,18 @@ async function handleMessageSend(
346352 } )
347353 }
348354
349- const executeUrl = `${ getBaseUrl ( ) } /api/workflows/${ agent . workflowId } /execute`
350- const headers : Record < string , string > = { 'Content-Type' : 'application/json' }
351- let useInternalAuth = false
352- if ( apiKey ) {
353- headers [ 'X-API-Key' ] = apiKey
354- } else {
355- const internalToken = await generateInternalToken ( )
356- headers . Authorization = `Bearer ${ internalToken } `
357- useInternalAuth = true
358- }
355+ const {
356+ url : executeUrl ,
357+ headers,
358+ useInternalAuth,
359+ } = await buildExecuteRequest ( {
360+ workflowId : agent . workflowId ,
361+ apiKey,
362+ } )
359363
360364 logger . info ( `Executing workflow ${ agent . workflowId } for A2A task ${ taskId } ` )
361365
362366 try {
363- // Extract workflow input from A2A message parts
364367 const workflowInput = extractWorkflowInput ( message )
365368 if ( ! workflowInput ) {
366369 return NextResponse . json (
@@ -388,12 +391,7 @@ async function handleMessageSend(
388391
389392 const finalState : TaskState = response . ok ? 'completed' : 'failed'
390393
391- const agentContent =
392- executeResult . output ?. content ||
393- ( typeof executeResult . output === 'object'
394- ? JSON . stringify ( executeResult . output )
395- : String ( executeResult . output || executeResult . error || 'Task completed' ) )
396-
394+ const agentContent = extractAgentContent ( executeResult )
397395 const agentMessage = createAgentMessage ( agentContent )
398396 agentMessage . taskId = taskId
399397 if ( contextId ) agentMessage . contextId = contextId
@@ -419,14 +417,13 @@ async function handleMessageSend(
419417 } )
420418 }
421419
422- const task : Task = {
423- kind : 'task' ,
424- id : taskId ,
420+ const task = buildTaskResponse ( {
421+ taskId,
425422 contextId,
426- status : createTaskStatus ( finalState ) ,
423+ state : finalState ,
427424 history,
428425 artifacts,
429- }
426+ } )
430427
431428 return NextResponse . json ( createResponse ( id , task ) )
432429 } catch ( error ) {
@@ -460,7 +457,7 @@ async function handleMessageSend(
460457 * Handle message/stream - Stream a message response (v0.3)
461458 */
462459async function handleMessageStream (
463- request : NextRequest ,
460+ _request : NextRequest ,
464461 id : string | number ,
465462 agent : {
466463 id : string
@@ -570,21 +567,16 @@ async function handleMessageStream(
570567 } )
571568
572569 try {
573- const executeUrl = `${ getBaseUrl ( ) } /api/workflows/${ agent . workflowId } /execute`
574- const headers : Record < string , string > = {
575- 'Content-Type' : 'application/json' ,
576- 'X-Stream-Response' : 'true' ,
577- }
578- let useInternalAuth = false
579- if ( apiKey ) {
580- headers [ 'X-API-Key' ] = apiKey
581- } else {
582- const internalToken = await generateInternalToken ( )
583- headers . Authorization = `Bearer ${ internalToken } `
584- useInternalAuth = true
585- }
570+ const {
571+ url : executeUrl ,
572+ headers,
573+ useInternalAuth,
574+ } = await buildExecuteRequest ( {
575+ workflowId : agent . workflowId ,
576+ apiKey,
577+ stream : true ,
578+ } )
586579
587- // Extract workflow input from A2A message parts
588580 const workflowInput = extractWorkflowInput ( message )
589581 if ( ! workflowInput ) {
590582 sendEvent ( 'error' , {
@@ -626,26 +618,35 @@ async function handleMessageStream(
626618 if ( response . body && isStreamingResponse ) {
627619 const reader = response . body . getReader ( )
628620 const decoder = new TextDecoder ( )
629- let fullContent = ''
621+ let accumulatedContent = ''
622+ let finalContent : string | undefined
630623
631624 while ( true ) {
632625 const { done, value } = await reader . read ( )
633626 if ( done ) break
634627
635- const chunk = decoder . decode ( value , { stream : true } )
636- fullContent += chunk
628+ const rawChunk = decoder . decode ( value , { stream : true } )
629+ const parsed = parseWorkflowSSEChunk ( rawChunk )
637630
638- sendEvent ( 'message' , {
639- kind : 'message' ,
640- taskId,
641- contextId,
642- role : 'agent' ,
643- parts : [ { kind : 'text' , text : chunk } ] ,
644- final : false ,
645- } )
631+ if ( parsed . content ) {
632+ accumulatedContent += parsed . content
633+ sendEvent ( 'message' , {
634+ kind : 'message' ,
635+ taskId,
636+ contextId,
637+ role : 'agent' ,
638+ parts : [ { kind : 'text' , text : parsed . content } ] ,
639+ final : false ,
640+ } )
641+ }
642+
643+ if ( parsed . finalContent ) {
644+ finalContent = parsed . finalContent
645+ }
646646 }
647647
648- const agentMessage = createAgentMessage ( fullContent || 'Task completed' )
648+ const messageContent = finalContent || accumulatedContent || 'Task completed'
649+ const agentMessage = createAgentMessage ( messageContent )
649650 agentMessage . taskId = taskId
650651 if ( contextId ) agentMessage . contextId = contextId
651652 history . push ( agentMessage )
@@ -674,11 +675,7 @@ async function handleMessageStream(
674675 } else {
675676 const result = await response . json ( )
676677
677- const content =
678- result . output ?. content ||
679- ( typeof result . output === 'object'
680- ? JSON . stringify ( result . output )
681- : String ( result . output || 'Task completed' ) )
678+ const content = extractAgentContent ( result )
682679
683680 sendEvent ( 'message' , {
684681 kind : 'message' ,
@@ -779,14 +776,13 @@ async function handleTaskGet(id: string | number, params: TaskIdParams): Promise
779776 } )
780777 }
781778
782- const taskResponse : Task = {
783- kind : 'task' ,
784- id : task . id ,
779+ const taskResponse = buildTaskResponse ( {
780+ taskId : task . id ,
785781 contextId : task . sessionId || task . id ,
786- status : createTaskStatus ( task . status as TaskState ) ,
782+ state : task . status as TaskState ,
787783 history : task . messages as Message [ ] ,
788784 artifacts : ( task . artifacts as Artifact [ ] ) || [ ] ,
789- }
785+ } )
790786
791787 const result = formatTaskResponse ( taskResponse , historyLength )
792788
@@ -851,14 +847,13 @@ async function handleTaskCancel(id: string | number, params: TaskIdParams): Prom
851847 } )
852848 } )
853849
854- const canceledTask : Task = {
855- kind : 'task' ,
856- id : task . id ,
850+ const canceledTask = buildTaskResponse ( {
851+ taskId : task . id ,
857852 contextId : task . sessionId || task . id ,
858- status : createTaskStatus ( 'canceled' ) ,
853+ state : 'canceled' ,
859854 history : task . messages as Message [ ] ,
860855 artifacts : ( task . artifacts as Artifact [ ] ) || [ ] ,
861- }
856+ } )
862857
863858 return NextResponse . json ( createResponse ( id , canceledTask ) )
864859}
@@ -887,14 +882,13 @@ async function handleTaskResubscribe(
887882 }
888883
889884 if ( isTerminalState ( task . status as TaskState ) ) {
890- const completedTask : Task = {
891- kind : 'task' ,
892- id : task . id ,
885+ const completedTask = buildTaskResponse ( {
886+ taskId : task . id ,
893887 contextId : task . sessionId || task . id ,
894- status : createTaskStatus ( task . status as TaskState ) ,
888+ state : task . status as TaskState ,
895889 history : task . messages as Message [ ] ,
896890 artifacts : ( task . artifacts as Artifact [ ] ) || [ ] ,
897- }
891+ } )
898892 return NextResponse . json ( createResponse ( id , completedTask ) )
899893 }
900894
0 commit comments