feat(a2a): added a2a protocol #2784
base: staging
Conversation
Greptile Overview

Greptile Summary

This PR introduces comprehensive support for the A2A (Agent-to-Agent) protocol v0.3, enabling Sim Studio workflows to be exposed as A2A-compatible agents and to interact with external A2A agents.

Key Additions
- Core Protocol Implementation:
- Database Schema:
- Authentication & Security:
- Developer Experience:

Issues Identified
- Critical (Logic):
- Important (Logic):
- Best Practices (Style):
Architectural Strengths
Confidence Score: 3/5
What's Good (+):
Issues Requiring Attention (-):
Why not higher: The authentication and concurrency issues are functional problems that could impact production usage, and the API key extraction flaw is a security risk that should be fixed.
Why not lower: The core implementation is sound, the changes were tested manually, and most issues are fixable without major refactoring. The architecture supports the requirements well.
Important Files Changed

File Analysis
Sequence Diagram

```mermaid
sequenceDiagram
    participant Client as External A2A Client
    participant API as /api/a2a/serve/[agentId]
    participant Auth as Authentication
    participant DB as Database
    participant Redis as Redis (Optional)
    participant Workflow as Workflow Executor
    participant Trigger as Trigger.dev (Optional)
    participant Webhook as Client Webhook

    Note over Client,Webhook: A2A Message Send Flow (message/send)
    Client->>API: POST JSON-RPC request (message/send)
    API->>Auth: Check authentication (session/API key/internal JWT)
    Auth-->>API: Authentication result
    alt Authentication Failed
        API-->>Client: 401 Unauthorized
    end
    API->>DB: Query agent and workflow
    alt Agent not published or workflow not deployed
        API-->>Client: 404 Agent unavailable
    end
    API->>Redis: Acquire task lock (if Redis available)
    alt Lock not acquired
        API-->>Client: 409 Task being processed
    end
    API->>DB: Create or update task (status: working)
    API->>Workflow: POST /api/workflows/[id]/execute
    Note right of Workflow: Execute with workflow input,<br/>files, and data parts
    alt Workflow execution succeeds
        Workflow-->>API: Success response with output
        API->>DB: Update task (status: completed, save result)
        API->>Trigger: Queue push notification (if configured)
        Trigger->>Webhook: POST task update notification
        API-->>Client: Task response (completed)
    else Workflow execution fails
        Workflow-->>API: Error response
        API->>DB: Update task (status: failed)
        API->>Trigger: Queue push notification (if configured)
        API-->>Client: Error response
    end
    API->>Redis: Release task lock

    Note over Client,Webhook: A2A Streaming Flow (message/stream)
    Client->>API: POST JSON-RPC request (message/stream)
    API->>Auth: Check authentication
    API->>DB: Query agent and workflow
    API->>Redis: Acquire task lock (5min timeout)
    API->>DB: Create or update task
    API->>Workflow: POST /api/workflows/[id]/execute (stream=true)
    Note over API: Opens SSE stream
    loop Stream chunks
        Workflow-->>API: SSE chunk events
        API-->>Client: Forward as A2A SSE events
    end
    Workflow-->>API: Final event (completed)
    API->>DB: Update task (status: completed)
    API->>Trigger: Queue push notification
    API->>Redis: Release task lock
    API-->>Client: Close stream

    Note over Client,Webhook: Task Query Flow (tasks/get)
    Client->>API: POST JSON-RPC request (tasks/get)
    API->>DB: Query task by ID
    alt Task found
        API-->>Client: Task with history/artifacts
    else Task not found
        API-->>Client: 404 Task not found
    end

    Note over Client,Webhook: Agent Discovery Flow (GET /api/a2a/serve/[agentId])
    Client->>API: GET request for agent card
    API->>Redis: Check cache (if available)
    alt Cache hit
        API-->>Client: Cached agent card
    else Cache miss
        API->>DB: Query agent details
        API->>Redis: Store in cache (1 hour TTL)
        API-->>Client: Agent card with capabilities/skills
    end
```
22 files reviewed, 22 comments
apps/sim/app/api/a2a/serve/[agentId]/route.ts, lines 387-388:

```ts
  signal: AbortSignal.timeout(A2A_DEFAULT_TIMEOUT),
})
```
The `AbortSignal.timeout(A2A_DEFAULT_TIMEOUT)` call throws when the timeout is reached, and that error is caught by the catch block at line 429. If the fetch is aborted due to timeout, however, the agent message added to history (lines 395-398) will not reflect the actual failure state.
The task should be marked as 'failed' with an appropriate timeout message rather than letting the catch block handle it generically. Consider checking specifically for a timeout error:
```suggestion
  signal: AbortSignal.timeout(A2A_DEFAULT_TIMEOUT),
})
} catch (error) {
  const isTimeout = error instanceof Error && error.name === 'TimeoutError'
  logger.error(`Error executing workflow for task ${taskId}:`, { error, isTimeout })
  const errorMessage = isTimeout
    ? `Workflow execution timed out after ${A2A_DEFAULT_TIMEOUT}ms`
    : (error instanceof Error ? error.message : 'Workflow execution failed')
```
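To make the timeout terminal as well, a minimal follow-up sketch, assuming the `db`, `a2aTask`, `eq`, `createAgentMessage`, and `history` bindings this route already uses, plus the `errorMessage` computed above:

```ts
// Sketch only: persist the timeout as a failed terminal state so that
// subsequent tasks/get calls report it accurately. Column names mirror the
// completed-path update elsewhere in this route.
const failureMessage = createAgentMessage(errorMessage) // the route's own helper
failureMessage.taskId = taskId
history.push(failureMessage)

await db
  .update(a2aTask)
  .set({
    status: 'failed',
    messages: history,
    updatedAt: new Date(),
  })
  .where(eq(a2aTask.id, taskId))
```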
apps/sim/app/api/a2a/serve/[agentId]/route.ts, lines 298-305:
```ts
const lockValue = uuidv4()
const acquired = await acquireLock(lockKey, lockValue, 60)

if (!acquired) {
  return NextResponse.json(
    createError(id, A2A_ERROR_CODES.INTERNAL_ERROR, 'Task is currently being processed'),
    { status: 409 }
  )
```
Race condition: if two requests arrive simultaneously for the same task, both could pass the `acquireLock` check before either updates the database. Redis locking is attempted, but when Redis is unavailable (line 92 in redis.ts returns `true`), no lock is actually acquired.
Without Redis, concurrent requests could:
- both read the same task state,
- both attempt to execute the workflow, and
- both update the task with different execution results.
This is particularly problematic for terminal-state checks. Consider adding a database-level check after acquiring the lock, or implementing optimistic locking with a version field in the task table (sketched below).
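A minimal sketch of the optimistic-locking option, assuming a hypothetical `version` integer column on `a2aTask`, a previously loaded `task` row, and the `db`/`and`/`eq` Drizzle bindings used elsewhere in this route:

```ts
// Sketch only: the WHERE clause pins the version we read, so exactly one
// concurrent request can win this update; the rest fall into the 409 branch.
const updated = await db
  .update(a2aTask)
  .set({ status: 'working', version: task.version + 1, updatedAt: new Date() })
  .where(and(eq(a2aTask.id, taskId), eq(a2aTask.version, task.version)))
  .returning({ id: a2aTask.id })

if (updated.length === 0) {
  // Another request won the race; mirror the Redis-path response.
  return NextResponse.json(
    createError(id, A2A_ERROR_CODES.INTERNAL_ERROR, 'Task is currently being processed'),
    { status: 409 }
  )
}
```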
apps/sim/app/api/a2a/serve/[agentId]/route.ts, lines 414-418:
```ts
if (isTerminalState(finalState)) {
  notifyTaskStateChange(taskId, finalState).catch((err) => {
    logger.error('Failed to trigger push notification', { taskId, error: err })
  })
}
```
The push notification is triggered with `notifyTaskStateChange`, but errors are only logged, not handled. If push notification delivery fails silently, external clients won't receive task completion updates.
While this is acceptable for fire-and-forget notifications, consider:
- at least tracking failed notification attempts in metrics/logs for monitoring (see the sketch below),
- adding a retry mechanism for critical notifications, and
- documenting that push notifications are best-effort delivery.
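A minimal sketch of the first option, assuming a hypothetical `incrementCounter` metrics helper; the call otherwise mirrors the snippet above:

```ts
// Sketch only: surface delivery failures in metrics as well as logs, so
// best-effort notifications that fail silently still show up on a dashboard.
notifyTaskStateChange(taskId, finalState).catch((err) => {
  logger.error('Failed to trigger push notification', { taskId, error: err })
  incrementCounter('a2a.push_notification.failed') // hypothetical metrics hook
})
```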
apps/sim/app/api/a2a/serve/[agentId]/route.ts, lines 646-662:
```ts
}

const messageContent = finalContent || accumulatedContent || 'Task completed'
const agentMessage = createAgentMessage(messageContent)
agentMessage.taskId = taskId
if (contextId) agentMessage.contextId = contextId
history.push(agentMessage)

await db
  .update(a2aTask)
  .set({
    status: 'completed',
    messages: history,
    completedAt: new Date(),
    updatedAt: new Date(),
  })
  .where(eq(a2aTask.id, taskId))
```
In the streaming handler, when parsing SSE chunks, if parsed.finalContent is available but empty, it will overwrite the accumulatedContent. This could result in losing streamed content if the final event has an empty content field.
```suggestion
const messageContent = finalContent || accumulatedContent || 'Task completed'
```
Consider `const messageContent = (finalContent !== undefined ? finalContent : accumulatedContent) || 'Task completed'` to properly handle empty strings vs undefined; a short demonstration follows.
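A quick, self-contained illustration of the difference between the two fallbacks (values are hypothetical):

```ts
const accumulatedContent = 'streamed partial output'
const finalContent: string | undefined = '' // final SSE event carried no content

// With ||, the empty string is falsy, so the accumulated content wins.
const withOr = finalContent || accumulatedContent || 'Task completed'
// -> 'streamed partial output'

// With an explicit undefined check, the empty string is treated as the real
// final value, and only then does the generic fallback apply.
const withCheck =
  (finalContent !== undefined ? finalContent : accumulatedContent) || 'Task completed'
// -> 'Task completed'

console.log({ withOr, withCheck })
```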
apps/sim/app/api/a2a/serve/[agentId]/route.ts, lines 933-935:
```ts
const pollInterval = 3000 // 3 seconds (reduced from 1s to lower DB load)
const maxPolls = 100 // 5 minutes max (100 * 3s = 300s)
```
The polling mechanism in `handleTaskResubscribe` uses a 3-second interval with a maximum of 100 polls (5 minutes total), but there is no graceful degradation if the client disconnects early.
The `cancel()` method is implemented (lines 1031-1037), but consider adding a connection check or using `request.signal` to detect early disconnection and stop polling immediately, reducing unnecessary database load; a sketch follows.
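A minimal sketch of the `request.signal` approach, reusing the interval and cap from the snippet above (the actual task query and SSE payload are elided):

```ts
// Sketch only: bail out of the polling loop as soon as the client is gone,
// instead of running up to 100 DB queries against a closed connection.
function pollTaskUpdates(request: Request): ReadableStream<Uint8Array> {
  const pollInterval = 3000
  const maxPolls = 100
  const encoder = new TextEncoder()

  return new ReadableStream({
    async start(controller) {
      for (let i = 0; i < maxPolls; i++) {
        if (request.signal.aborted) break // client disconnected: stop polling
        // ... query the task and enqueue real SSE events here ...
        controller.enqueue(encoder.encode(': keep-alive\n\n'))
        await new Promise((resolve) => setTimeout(resolve, pollInterval))
      }
      controller.close()
    },
  })
}
```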
apps/sim/lib/workspaces/permissions/utils.ts, lines 26-33:
```ts
 * @returns The workspace if found, null otherwise
 */
export async function getWorkspaceById(workspaceId: string): Promise<WorkspaceBasic | null> {
  const [ws] = await db
    .select({ id: workspace.id })
    .from(workspace)
    .where(eq(workspace.id, workspaceId))
    .limit(1)
```
The `getWorkspaceById` function only selects the `id` field, which is already provided as input, so the function is essentially an existence check.
Consider renaming it to `workspaceExists` and returning a boolean directly, or, if the workspace object is needed for future use, document why only the ID is selected:
```suggestion
/**
 * Check if a workspace exists by ID
 *
 * @param workspaceId - The workspace ID to check
 * @returns True if workspace exists, false otherwise
 */
export async function workspaceExists(workspaceId: string): Promise<boolean> {
  const [ws] = await db
    .select({ id: workspace.id })
    .from(workspace)
    .where(eq(workspace.id, workspaceId))
    .limit(1)
  return !!ws
}
```
Note: there is already a `workspaceExists` function at lines 59-62, but it calls `getWorkspaceById`, which seems circular.
apps/sim/background/a2a-push-notification-delivery.ts, lines 13-21:
```ts
export const a2aPushNotificationTask = task({
  id: 'a2a-push-notification-delivery',
  retry: {
    maxAttempts: 5,
    minTimeoutInMs: 1000,
    maxTimeoutInMs: 60000,
    factor: 2,
  },
  run: async (params: A2APushNotificationParams) => {
```
The Trigger.dev task configuration allows at most 5 retry attempts with exponential backoff (factor: 2), which means:
- Attempt 1: immediate
- Attempt 2: ~1s later
- Attempt 3: ~2s later
- Attempt 4: ~4s later
- Attempt 5: ~8s later
That is roughly 15 seconds of retries in total. For webhook deliveries this schedule may be too aggressive: it hits a temporarily-down recipient five times within seconds and then gives up. Consider:
- raising `minTimeoutInMs` to space retries out (raising `maxTimeoutInMs` alone has no effect here, since backoff starting at 1s with factor 2 never reaches the 60s cap within 5 attempts),
- reducing `maxAttempts` but with longer delays, and
- adding jitter to prevent a thundering herd if many notifications fail simultaneously (see the sketch below).
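A sketch of a gentler profile, assuming the same `task` import from `@trigger.dev/sdk/v3` and the `A2APushNotificationParams` type from this file; `randomize` is Trigger.dev's built-in jitter option. Treat the exact numbers as a starting point, not a recommendation:

```ts
import { task } from '@trigger.dev/sdk/v3'

// Sketch only: delays of roughly 5s, 15s, 45s, 135s (a bit over 3 minutes
// in total) give a temporarily-down recipient time to recover, and
// randomize jitters each delay so mass failures don't retry in lockstep.
export const a2aPushNotificationTask = task({
  id: 'a2a-push-notification-delivery',
  retry: {
    maxAttempts: 5,
    minTimeoutInMs: 5_000,
    maxTimeoutInMs: 300_000,
    factor: 3,
    randomize: true,
  },
  run: async (params: A2APushNotificationParams) => {
    // ... unchanged webhook delivery logic ...
  },
})
```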
apps/sim/app/api/a2a/serve/[agentId]/route.ts, lines 484-499:
```ts
const acquired = await acquireLock(lockKey, lockValue, 300) // 5 minute timeout for streaming

if (!acquired) {
  const encoder = new TextEncoder()
  const errorStream = new ReadableStream({
    start(controller) {
      controller.enqueue(
        encoder.encode(
          `event: error\ndata: ${JSON.stringify({ code: A2A_ERROR_CODES.INTERNAL_ERROR, message: 'Task is currently being processed' })}\n\n`
        )
      )
      controller.close()
    },
  })
  return new NextResponse(errorStream, { headers: SSE_HEADERS })
}
```
When the streaming lock cannot be acquired (line 486), an error stream is created and returned. However, the lock timeout for streaming is set to 300 seconds (5 minutes, line 484), which is quite long.
If a streaming request hangs or fails without properly releasing the lock, subsequent streaming requests for the same task will be blocked for up to 5 minutes. Consider:
- using a shorter lock timeout for streaming operations,
- implementing a heartbeat mechanism to extend the lock while streaming is active (sketched below), and
- adding cleanup logic to detect and remove stale locks.
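A sketch of the heartbeat option. `extendLock` is hypothetical here: a helper that refreshes the TTL only if the stored value still matches `lockValue` (typically a small Lua script, like the usual check-and-release pattern), while `acquireLock`/`releaseLock` are the helpers this route already uses:

```ts
// Sketch only: hold a short lock and keep extending it while streaming.
const acquired = await acquireLock(lockKey, lockValue, 60) // 60s instead of 300s

const heartbeat = setInterval(async () => {
  const stillOurs = await extendLock(lockKey, lockValue, 60) // hypothetical helper
  if (!stillOurs) clearInterval(heartbeat) // lock expired or was taken over
}, 30_000)

try {
  // ... stream workflow output to the client ...
} finally {
  clearInterval(heartbeat)
  await releaseLock(lockKey, lockValue)
}
```

With this shape, a request that dies without reaching `finally` leaves a lock that expires within about a minute instead of five.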
packages/db/schema.ts, lines 1892-1902:
```ts
    sessionIdIdx: index('a2a_task_session_id_idx').on(table.sessionId),
    statusIdx: index('a2a_task_status_idx').on(table.status),
    executionIdIdx: index('a2a_task_execution_id_idx').on(table.executionId),
    createdAtIdx: index('a2a_task_created_at_idx').on(table.createdAt),
  })
)

/**
 * A2A Push Notification Config - Webhook configuration for task updates
 * Stores push notification webhooks for async task updates
 */
```
The `a2aTask` table stores `messages` as JSONB with a default of `'[]'`, but there is no database-level constraint on array size. With `A2A_MAX_HISTORY_LENGTH` set to 100 in constants.ts, message arrays could grow unbounded if the application-level truncation fails.
Consider:
- adding application-level cleanup of old messages when updating tasks (see the sketch below),
- documenting the expected maximum size for capacity planning, and
- adding monitoring for tasks with excessive message counts.
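A minimal sketch of the first option, assuming `A2A_MAX_HISTORY_LENGTH` from constants.ts and the `db`/`a2aTask`/`eq` bindings used elsewhere in this PR:

```ts
// Sketch only: clamp the history to the newest entries on every write, so
// the JSONB column cannot outgrow the cap even if earlier truncation failed.
const bounded =
  history.length > A2A_MAX_HISTORY_LENGTH
    ? history.slice(history.length - A2A_MAX_HISTORY_LENGTH)
    : history

await db
  .update(a2aTask)
  .set({ messages: bounded, updatedAt: new Date() })
  .where(eq(a2aTask.id, taskId))
```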
apps/sim/app/api/a2a/serve/[agentId]/utils.ts, lines 139-149:
```ts
export function extractAgentContent(executeResult: {
  output?: { content?: string; [key: string]: unknown }
  error?: string
}): string {
  return (
    executeResult.output?.content ||
    (typeof executeResult.output === 'object'
      ? JSON.stringify(executeResult.output)
      : String(executeResult.output || executeResult.error || 'Task completed'))
  )
}
```
The `extractAgentContent` function has a fallback chain that can return misleading results: if `executeResult.output` is an empty object, it is stringified to `'{}'` instead of falling through to the fallback message.
Consider checking for meaningful content before stringifying objects, so callers never see an unhelpful empty-object representation; one possible reshaping is sketched below.
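A hedged sketch with the same signature, only reordering the fallbacks:

```ts
// Sketch only: prefer explicit content, then non-empty structured output,
// then the error message, then the generic fallback. An empty output object
// no longer leaks through as the string "{}".
export function extractAgentContent(executeResult: {
  output?: { content?: string; [key: string]: unknown }
  error?: string
}): string {
  if (executeResult.output?.content) return executeResult.output.content
  if (executeResult.output && Object.keys(executeResult.output).length > 0) {
    return JSON.stringify(executeResult.output)
  }
  return executeResult.error || 'Task completed'
}
```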
Summary
Type of Change
Testing
Tested manually
Checklist