Skip to content

Commit 3d23332

Browse files
authored
refactor: extract request handlers from A2AExpressApp (#162)
# Description As a preparation step for REST support proposed in #142, extract Express.js handlers from `A2AExpressApp` to allow assembling your own app using Express.js API directly for specifying individual paths and middlewares, rather than wrapping it in `A2AExpressApp`: ```ts // const a2aRequestHandler = . . . const app = express(); // Minimal example: app.use(`/${AGENT_CARD_PATH}`, agentCardHandler({ agentCardProvider: a2aRequestHandler })); app.use(jsonRpcHandler({ requestHandler: a2aRequestHandler })); // Advanced example: app.use(myCustomMiddleware()); app.use('/.well-known/custom-agent-card.json', myCustomAgentCardMiddleware, agentCardHandler({ agentCardProvider: a2aRequestHandler })); app.use('/a2a/jsonrpc', myJsonRpcMiddleware, jsonRpcHandler({ requestHandler: a2aRequestHandler })); app.use('/a2a/api', restHandler({ requestHandler: a2aRequestHandler })); ``` No logic changes to existing handlers, they are moved as is. Also `A2AExpressApp` is switched to use new handlers without behavior changes. **Note:** new functions **are not** exported via `index.ts` yet, it will be done together with updating docs and examples once REST support is done. Re #137
1 parent 67869e4 commit 3d23332

File tree

4 files changed

+168
-110
lines changed

4 files changed

+168
-110
lines changed
Lines changed: 15 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,16 @@
1-
import express, { Request, Response, Express, RequestHandler, ErrorRequestHandler, NextFunction } from 'express';
1+
import express, { Express, RequestHandler, ErrorRequestHandler } from 'express';
22

3-
import { A2AError } from "../error.js";
4-
import { JSONRPCErrorResponse, JSONRPCSuccessResponse, JSONRPCResponse } from "../../index.js";
53
import { A2ARequestHandler } from "../request_handler/a2a_request_handler.js";
64
import { JsonRpcTransportHandler } from "../transports/jsonrpc_transport_handler.js";
75
import { AGENT_CARD_PATH } from "../../constants.js";
6+
import { jsonErrorHandler, jsonRpcHandler } from './json_rpc_handler.js';
7+
import { agentCardHandler } from './agent_card_handler.js';
88

99
export class A2AExpressApp {
10-
private requestHandler: A2ARequestHandler; // Kept for getAgentCard
11-
private jsonRpcTransportHandler: JsonRpcTransportHandler;
10+
private requestHandler: A2ARequestHandler;
1211

1312
constructor(requestHandler: A2ARequestHandler) {
14-
this.requestHandler = requestHandler; // DefaultRequestHandler instance
15-
this.jsonRpcTransportHandler = new JsonRpcTransportHandler(requestHandler);
13+
this.requestHandler = requestHandler;
1614
}
1715

1816
/**
@@ -30,97 +28,21 @@ export class A2AExpressApp {
3028
agentCardPath: string = AGENT_CARD_PATH
3129
): Express {
3230
const router = express.Router();
33-
router.use(express.json(), ...(middlewares ?? []));
3431

35-
router.use((err: any, req: Request, res: Response, next: NextFunction) => {
36-
// Handle JSON parse errors from express.json() (https://github.com/expressjs/body-parser/issues/122)
37-
if (err instanceof SyntaxError && 'body' in err) {
38-
const a2aError = A2AError.parseError('Invalid JSON payload.');
39-
const errorResponse: JSONRPCErrorResponse = {
40-
jsonrpc: '2.0',
41-
id: null,
42-
error: a2aError.toJSONRPCError(),
43-
};
44-
return res.status(400).json(errorResponse);
45-
}
46-
next(err);
47-
});
32+
// Doing it here to maintain previous behaviour of invoking provided middlewares
33+
// after JSON body is parsed, jsonRpcHandler registers JSON parsing on the local router.
34+
// body-parser used by express.json() ignores subsequent calls and is safe to be added twice:
35+
// https://github.com/expressjs/body-parser/blob/168afff3470302aa28050a8ae6681fa1fdaf71a2/lib/read.js#L41.
36+
router.use(express.json(), jsonErrorHandler);
4837

49-
router.get(`/${agentCardPath}`, async (req: Request, res: Response) => {
50-
try {
51-
// getAgentCard is on A2ARequestHandler, which DefaultRequestHandler implements
52-
const agentCard = await this.requestHandler.getAgentCard();
53-
res.json(agentCard);
54-
} catch (error: any) {
55-
console.error("Error fetching agent card:", error);
56-
res.status(500).json({ error: "Failed to retrieve agent card" });
57-
}
58-
});
38+
if (middlewares && middlewares.length > 0) {
39+
router.use(middlewares);
40+
}
5941

60-
router.post("/", async (req: Request, res: Response) => {
61-
try {
62-
const rpcResponseOrStream = await this.jsonRpcTransportHandler.handle(req.body);
63-
64-
// Check if it's an AsyncGenerator (stream)
65-
if (typeof (rpcResponseOrStream as any)?.[Symbol.asyncIterator] === 'function') {
66-
const stream = rpcResponseOrStream as AsyncGenerator<JSONRPCSuccessResponse, void, undefined>;
67-
68-
res.setHeader('Content-Type', 'text/event-stream');
69-
res.setHeader('Cache-Control', 'no-cache');
70-
res.setHeader('Connection', 'keep-alive');
71-
res.flushHeaders();
72-
73-
try {
74-
for await (const event of stream) {
75-
// Each event from the stream is already a JSONRPCResult
76-
res.write(`id: ${new Date().getTime()}\n`);
77-
res.write(`data: ${JSON.stringify(event)}\n\n`);
78-
}
79-
} catch (streamError: any) {
80-
console.error(`Error during SSE streaming (request ${req.body?.id}):`, streamError);
81-
// If the stream itself throws an error, send a final JSONRPCErrorResponse
82-
const a2aError = streamError instanceof A2AError ? streamError : A2AError.internalError(streamError.message || 'Streaming error.');
83-
const errorResponse: JSONRPCErrorResponse = {
84-
jsonrpc: '2.0',
85-
id: req.body?.id || null, // Use original request ID if available
86-
error: a2aError.toJSONRPCError(),
87-
};
88-
if (!res.headersSent) { // Should not happen if flushHeaders worked
89-
res.status(500).json(errorResponse); // Should be JSON, not SSE here
90-
} else {
91-
// Try to send as last SSE event if possible, though client might have disconnected
92-
res.write(`id: ${new Date().getTime()}\n`);
93-
res.write(`event: error\n`); // Custom event type for client-side handling
94-
res.write(`data: ${JSON.stringify(errorResponse)}\n\n`);
95-
}
96-
} finally {
97-
if (!res.writableEnded) {
98-
res.end();
99-
}
100-
}
101-
} else { // Single JSON-RPC response
102-
const rpcResponse = rpcResponseOrStream as JSONRPCResponse;
103-
res.status(200).json(rpcResponse);
104-
}
105-
} catch (error: any) { // Catch errors from jsonRpcTransportHandler.handle itself (e.g., initial parse error)
106-
console.error("Unhandled error in A2AExpressApp POST handler:", error);
107-
const a2aError = error instanceof A2AError ? error : A2AError.internalError('General processing error.');
108-
const errorResponse: JSONRPCErrorResponse = {
109-
jsonrpc: '2.0',
110-
id: req.body?.id || null,
111-
error: a2aError.toJSONRPCError(),
112-
};
113-
if (!res.headersSent) {
114-
res.status(500).json(errorResponse);
115-
} else if (!res.writableEnded) {
116-
// If headers sent (likely during a stream attempt that failed early), try to end gracefully
117-
res.end();
118-
}
119-
}
120-
});
42+
router.use(jsonRpcHandler({ requestHandler: this.requestHandler }));
43+
router.use(`/${agentCardPath}`, agentCardHandler({ agentCardProvider: this.requestHandler }));
12144

12245
app.use(baseUrl, router);
123-
// The separate /stream endpoint is no longer needed.
12446
return app;
12547
}
12648
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
import express, { Request, RequestHandler, Response } from "express";
2+
import { AgentCard } from "../../types.js";
3+
4+
export interface AgentCardHandlerOptions {
5+
agentCardProvider: AgentCardProvider;
6+
}
7+
8+
export type AgentCardProvider =
9+
{ getAgentCard(): Promise<AgentCard>; }
10+
| (() => Promise<AgentCard>);
11+
12+
/**
13+
* Creates Express.js middleware to handle agent card requests.
14+
* @example
15+
* // With an existing A2ARequestHandler instance:
16+
* app.use('/.well-known/agent-card.json', agentCardHandler({ agentCardProvider: a2aRequestHandler }));
17+
* // or with a factory lambda:
18+
* app.use('/.well-known/agent-card.json', agentCardHandler({ agentCardProvider: async () => agentCard }));
19+
*/
20+
export function agentCardHandler(options: AgentCardHandlerOptions): RequestHandler {
21+
const router = express.Router()
22+
23+
const provider = typeof options.agentCardProvider === 'function'
24+
? options.agentCardProvider
25+
: options.agentCardProvider.getAgentCard.bind(options.agentCardProvider);
26+
27+
router.get('/', async (_req: Request, res: Response) => {
28+
try {
29+
const agentCard = await provider();
30+
res.json(agentCard);
31+
} catch (error: any) {
32+
console.error("Error fetching agent card:", error);
33+
res.status(500).json({ error: "Failed to retrieve agent card" });
34+
}
35+
})
36+
37+
return router
38+
}
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
import express, { Router, Request, Response, ErrorRequestHandler, NextFunction, RequestHandler } from "express";
2+
import { JSONRPCErrorResponse, JSONRPCSuccessResponse, JSONRPCResponse } from "../../types.js";
3+
import { A2AError } from "../error.js";
4+
import { A2ARequestHandler } from "../request_handler/a2a_request_handler.js";
5+
import { JsonRpcTransportHandler } from "../transports/jsonrpc_transport_handler.js";
6+
7+
export interface JsonRpcHandlerOptions {
8+
requestHandler: A2ARequestHandler;
9+
}
10+
11+
/**
12+
* Creates Express.js middleware to handle A2A JSON-RPC requests.
13+
* @example
14+
* // Handle at root
15+
* app.use(jsonRpcHandler({ requestHandler: a2aRequestHandler }));
16+
* // or
17+
* app.use('/a2a/json-rpc', jsonRpcHandler({ requestHandler: a2aRequestHandler }));
18+
*/
19+
export function jsonRpcHandler(options: JsonRpcHandlerOptions): RequestHandler {
20+
const jsonRpcTransportHandler = new JsonRpcTransportHandler(options.requestHandler);
21+
22+
const router = express.Router()
23+
24+
router.use(express.json(), jsonErrorHandler);
25+
26+
router.post("/", async (req: Request, res: Response) => {
27+
try {
28+
const rpcResponseOrStream = await jsonRpcTransportHandler.handle(req.body);
29+
30+
// Check if it's an AsyncGenerator (stream)
31+
if (typeof (rpcResponseOrStream as any)?.[Symbol.asyncIterator] === 'function') {
32+
const stream = rpcResponseOrStream as AsyncGenerator<JSONRPCSuccessResponse, void, undefined>;
33+
34+
res.setHeader('Content-Type', 'text/event-stream');
35+
res.setHeader('Cache-Control', 'no-cache');
36+
res.setHeader('Connection', 'keep-alive');
37+
res.flushHeaders();
38+
39+
try {
40+
for await (const event of stream) {
41+
// Each event from the stream is already a JSONRPCResult
42+
res.write(`id: ${new Date().getTime()}\n`);
43+
res.write(`data: ${JSON.stringify(event)}\n\n`);
44+
}
45+
} catch (streamError: any) {
46+
console.error(`Error during SSE streaming (request ${req.body?.id}):`, streamError);
47+
// If the stream itself throws an error, send a final JSONRPCErrorResponse
48+
const a2aError = streamError instanceof A2AError ? streamError : A2AError.internalError(streamError.message || 'Streaming error.');
49+
const errorResponse: JSONRPCErrorResponse = {
50+
jsonrpc: '2.0',
51+
id: req.body?.id || null, // Use original request ID if available
52+
error: a2aError.toJSONRPCError(),
53+
};
54+
if (!res.headersSent) { // Should not happen if flushHeaders worked
55+
res.status(500).json(errorResponse); // Should be JSON, not SSE here
56+
} else {
57+
// Try to send as last SSE event if possible, though client might have disconnected
58+
res.write(`id: ${new Date().getTime()}\n`);
59+
res.write(`event: error\n`); // Custom event type for client-side handling
60+
res.write(`data: ${JSON.stringify(errorResponse)}\n\n`);
61+
}
62+
} finally {
63+
if (!res.writableEnded) {
64+
res.end();
65+
}
66+
}
67+
} else { // Single JSON-RPC response
68+
const rpcResponse = rpcResponseOrStream as JSONRPCResponse;
69+
res.status(200).json(rpcResponse);
70+
}
71+
} catch (error: any) { // Catch errors from jsonRpcTransportHandler.handle itself (e.g., initial parse error)
72+
console.error("Unhandled error in JSON-RPC POST handler:", error);
73+
const a2aError = error instanceof A2AError ? error : A2AError.internalError('General processing error.');
74+
const errorResponse: JSONRPCErrorResponse = {
75+
jsonrpc: '2.0',
76+
id: req.body?.id || null,
77+
error: a2aError.toJSONRPCError(),
78+
};
79+
if (!res.headersSent) {
80+
res.status(500).json(errorResponse);
81+
} else if (!res.writableEnded) {
82+
// If headers sent (likely during a stream attempt that failed early), try to end gracefully
83+
res.end();
84+
}
85+
}
86+
});
87+
88+
return router
89+
}
90+
91+
export const jsonErrorHandler: ErrorRequestHandler = (err: any, _req: Request, res: Response, next: NextFunction) => {
92+
// Handle JSON parse errors from express.json() (https://github.com/expressjs/body-parser/issues/122)
93+
if (err instanceof SyntaxError && 'body' in err) {
94+
const a2aError = A2AError.parseError('Invalid JSON payload.');
95+
const errorResponse: JSONRPCErrorResponse = {
96+
jsonrpc: '2.0',
97+
id: null,
98+
error: a2aError.toJSONRPCError(),
99+
};
100+
return res.status(400).json(errorResponse);
101+
}
102+
next(err);
103+
}

test/server/a2a_express_app.spec.ts

Lines changed: 12 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import 'mocha';
2-
import { assert, expect } from 'chai';
2+
import { assert } from 'chai';
33
import sinon, { SinonStub } from 'sinon';
44
import express, { Express, Request, Response } from 'express';
55
import request from 'supertest';
@@ -10,13 +10,12 @@ import { JsonRpcTransportHandler } from '../../src/server/transports/jsonrpc_tra
1010
import { AgentCard, JSONRPCSuccessResponse, JSONRPCErrorResponse } from '../../src/index.js';
1111
import { AGENT_CARD_PATH } from '../../src/constants.js';
1212
import { A2AError } from '../../src/server/error.js';
13-
import { parseArgs } from 'util';
1413

1514
describe('A2AExpressApp', () => {
1615
let mockRequestHandler: A2ARequestHandler;
17-
let mockJsonRpcTransportHandler: JsonRpcTransportHandler;
1816
let app: A2AExpressApp;
1917
let expressApp: Express;
18+
let handleStub: SinonStub;
2019

2120
// Helper function to create JSON-RPC request bodies
2221
const createRpcRequest = (id: string | null, method = 'message/send', params: object = {}) => ({
@@ -60,10 +59,7 @@ describe('A2AExpressApp', () => {
6059
app = new A2AExpressApp(mockRequestHandler);
6160
expressApp = express();
6261

63-
// Mock the JsonRpcTransportHandler - accessing private property for testing
64-
// Note: This is a necessary testing approach given current A2AExpressApp design
65-
mockJsonRpcTransportHandler = sinon.createStubInstance(JsonRpcTransportHandler);
66-
(app as any).jsonRpcTransportHandler = mockJsonRpcTransportHandler;
62+
handleStub = sinon.stub(JsonRpcTransportHandler.prototype, 'handle')
6763
});
6864

6965
afterEach(() => {
@@ -75,7 +71,6 @@ describe('A2AExpressApp', () => {
7571
const newApp = new A2AExpressApp(mockRequestHandler);
7672
assert.instanceOf(newApp, A2AExpressApp);
7773
assert.equal((newApp as any).requestHandler, mockRequestHandler);
78-
assert.instanceOf((newApp as any).jsonRpcTransportHandler, JsonRpcTransportHandler);
7974
});
8075
});
8176

@@ -139,7 +134,7 @@ describe('A2AExpressApp', () => {
139134
result: { message: 'success' }
140135
};
141136

142-
(mockJsonRpcTransportHandler.handle as SinonStub).resolves(mockResponse);
137+
handleStub.resolves(mockResponse);
143138

144139
const requestBody = createRpcRequest('test-id');
145140

@@ -149,7 +144,7 @@ describe('A2AExpressApp', () => {
149144
.expect(200);
150145

151146
assert.deepEqual(response.body, mockResponse);
152-
assert.isTrue((mockJsonRpcTransportHandler.handle as SinonStub).calledOnceWith(requestBody));
147+
assert.isTrue(handleStub.calledOnceWith(requestBody));
153148
});
154149

155150
it('should handle streaming JSON-RPC response', async () => {
@@ -160,7 +155,7 @@ describe('A2AExpressApp', () => {
160155
}
161156
};
162157

163-
(mockJsonRpcTransportHandler.handle as SinonStub).resolves(mockStreamResponse);
158+
handleStub.resolves(mockStreamResponse);
164159

165160
const requestBody = createRpcRequest('stream-test', 'message/stream');
166161

@@ -186,7 +181,7 @@ describe('A2AExpressApp', () => {
186181
}
187182
};
188183

189-
(mockJsonRpcTransportHandler.handle as SinonStub).resolves(mockErrorStream);
184+
handleStub.resolves(mockErrorStream);
190185

191186
const requestBody = createRpcRequest('stream-error-test', 'message/stream');
192187

@@ -207,7 +202,7 @@ describe('A2AExpressApp', () => {
207202
}
208203
};
209204

210-
(mockJsonRpcTransportHandler.handle as SinonStub).resolves(mockImmediateErrorStream);
205+
handleStub.resolves(mockImmediateErrorStream);
211206

212207
const requestBody = createRpcRequest('immediate-stream-error-test', 'message/stream');
213208

@@ -228,7 +223,7 @@ describe('A2AExpressApp', () => {
228223

229224
it('should handle general processing error', async () => {
230225
const error = new A2AError(-32603, 'Processing error');
231-
(mockJsonRpcTransportHandler.handle as SinonStub).rejects(error);
226+
handleStub.rejects(error);
232227

233228
const requestBody = createRpcRequest('error-test');
234229

@@ -251,7 +246,7 @@ describe('A2AExpressApp', () => {
251246

252247
it('should handle non-A2AError with fallback error handling', async () => {
253248
const genericError = new Error('Generic error');
254-
(mockJsonRpcTransportHandler.handle as SinonStub).rejects(genericError);
249+
handleStub.rejects(genericError);
255250

256251
const requestBody = createRpcRequest('generic-error-test');
257252

@@ -267,7 +262,7 @@ describe('A2AExpressApp', () => {
267262

268263
it('should handle request without id', async () => {
269264
const error = new A2AError(-32600, 'No ID error');
270-
(mockJsonRpcTransportHandler.handle as SinonStub).rejects(error);
265+
handleStub.rejects(error);
271266

272267
const requestBody = createRpcRequest(null);
273268

@@ -343,7 +338,7 @@ describe('A2AExpressApp', () => {
343338
.send(requestBody)
344339
.expect(200);
345340

346-
assert.isTrue((mockJsonRpcTransportHandler.handle as SinonStub).calledOnceWith(requestBody));
341+
assert.isTrue(handleStub.calledOnceWith(requestBody));
347342
});
348343

349344
it('should handle malformed json request', async () => {

0 commit comments

Comments
 (0)