Skip to content

Commit 5491030

Browse files
authored
feat: jsonrpc server (a2aproject#91)
* Implemented `jsonrpc` server transport exposed as `a2asrv.NewJSONRPCHandler(requestHandler)`. * Created an `e2e` package for tests using real dependencies (except `AgentExecutor`). * Extracted `mockAgentExecutor` to `internal/testutil/testexecutor` package, unfortunately can't be used in `a2asrv` because of a circular reference. Will need to think about reorganizing tests later. * Extracted server-sent-events utilities to `internal/sse` package. * Extracted jsonrpc-related constants and jsonrpc error mapping to `internal/jsonrpc` to reuse between `a2asrv` and `a2aclient`.
1 parent f7aa465 commit 5491030

File tree

12 files changed

+1153
-223
lines changed

12 files changed

+1153
-223
lines changed

a2aclient/auth_test.go

Lines changed: 5 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -24,27 +24,13 @@ import (
2424
"github.com/a2aproject/a2a-go/a2agrpc"
2525
"github.com/a2aproject/a2a-go/a2asrv"
2626
"github.com/a2aproject/a2a-go/a2asrv/eventqueue"
27+
"github.com/a2aproject/a2a-go/internal/testutil/testexecutor"
2728
"github.com/google/go-cmp/cmp"
2829
"google.golang.org/grpc"
2930
"google.golang.org/grpc/credentials/insecure"
3031
"google.golang.org/grpc/test/bufconn"
3132
)
3233

33-
type mockAgentExecutor struct {
34-
ExecuteFn func(context.Context, *a2asrv.RequestContext, eventqueue.Queue) error
35-
}
36-
37-
func (e *mockAgentExecutor) Execute(ctx context.Context, reqCtx *a2asrv.RequestContext, q eventqueue.Queue) error {
38-
if e.ExecuteFn != nil {
39-
return e.ExecuteFn(ctx, reqCtx, q)
40-
}
41-
return nil
42-
}
43-
44-
func (e *mockAgentExecutor) Cancel(ctx context.Context, reqCtx *a2asrv.RequestContext, q eventqueue.Queue) error {
45-
return nil
46-
}
47-
4834
func startGRPCTestServer(t *testing.T, handler a2asrv.RequestHandler, listener *bufconn.Listener) {
4935
s := grpc.NewServer()
5036
grpcHandler := a2agrpc.NewHandler(handler)
@@ -68,12 +54,10 @@ func TestAuth_GRPC(t *testing.T) {
6854
listener := bufconn.Listen(1024 * 1024)
6955

7056
var capturedCallContext *a2asrv.CallContext
71-
executor := &mockAgentExecutor{
72-
ExecuteFn: func(ctx context.Context, reqCtx *a2asrv.RequestContext, q eventqueue.Queue) error {
73-
capturedCallContext, _ = a2asrv.CallContextFrom(ctx)
74-
return q.Write(ctx, a2a.NewMessage(a2a.MessageRoleAgent))
75-
},
76-
}
57+
executor := testexecutor.FromFunction(func(ctx context.Context, reqCtx *a2asrv.RequestContext, q eventqueue.Queue) error {
58+
capturedCallContext, _ = a2asrv.CallContextFrom(ctx)
59+
return q.Write(ctx, a2a.NewMessage(a2a.MessageRoleAgent))
60+
})
7761
handler := a2asrv.NewHandler(executor)
7862
go startGRPCTestServer(t, handler, listener)
7963

a2aclient/jsonrpc.go

Lines changed: 58 additions & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
package a2aclient
1616

1717
import (
18-
"bufio"
1918
"bytes"
2019
"context"
2120
"encoding/json"
@@ -26,33 +25,27 @@ import (
2625
"time"
2726

2827
"github.com/a2aproject/a2a-go/a2a"
28+
"github.com/a2aproject/a2a-go/internal/jsonrpc"
29+
"github.com/a2aproject/a2a-go/internal/sse"
2930
"github.com/a2aproject/a2a-go/log"
3031
"github.com/google/uuid"
3132
)
3233

33-
// JSON-RPC 2.0 protocol constants
34-
const (
35-
jsonrpcVersion = "2.0"
36-
37-
// HTTP headers
38-
contentTypeJSON = "application/json"
39-
acceptEventStream = "text/event-stream"
40-
41-
// JSON-RPC method names per A2A spec §7
42-
methodMessageSend = "message/send"
43-
methodMessageStream = "message/stream"
44-
methodTasksGet = "tasks/get"
45-
methodTasksCancel = "tasks/cancel"
46-
methodTasksResubscribe = "tasks/resubscribe"
47-
methodPushConfigGet = "tasks/pushNotificationConfig/get"
48-
methodPushConfigSet = "tasks/pushNotificationConfig/set"
49-
methodPushConfigList = "tasks/pushNotificationConfig/list"
50-
methodPushConfigDelete = "tasks/pushNotificationConfig/delete"
51-
methodGetAuthenticatedExtended = "agent/getAuthenticatedExtendedCard"
52-
53-
// SSE data prefix
54-
sseDataPrefix = "data: "
55-
)
34+
// jsonrpcRequest represents a JSON-RPC 2.0 request.
35+
type jsonrpcRequest struct {
36+
JSONRPC string `json:"jsonrpc"`
37+
Method string `json:"method"`
38+
Params any `json:"params,omitempty"`
39+
ID string `json:"id"`
40+
}
41+
42+
// jsonrpcResponse represents a JSON-RPC 2.0 response.
43+
type jsonrpcResponse struct {
44+
JSONRPC string `json:"jsonrpc"`
45+
ID string `json:"id"`
46+
Result json.RawMessage `json:"result,omitempty"`
47+
Error *jsonrpc.Error `json:"error,omitempty"`
48+
}
5649

5750
// JSONRPCOption configures optional parameters for the JSONRPC transport.
5851
// Options are applied during NewJSONRPCTransport initialization.
@@ -87,18 +80,17 @@ func WithJSONRPCTransport(opts ...JSONRPCOption) FactoryOption {
8780
return WithTransport(
8881
a2a.TransportProtocolJSONRPC,
8982
TransportFactoryFn(func(ctx context.Context, url string, card *a2a.AgentCard) (Transport, error) {
90-
return NewJSONRPCTransport(url, card, opts...), nil
83+
return NewJSONRPCTransport(url, opts...), nil
9184
}),
9285
)
9386
}
9487

9588
// NewJSONRPCTransport creates a new JSON-RPC transport for A2A protocol communication.
9689
// By default, an HTTP client with 5-second timeout is used (matching Python SDK behavior).
9790
// For custom timeout, retry logic, or connection pooling, provide a configured client via WithHTTPClient.
98-
func NewJSONRPCTransport(url string, card *a2a.AgentCard, opts ...JSONRPCOption) Transport {
91+
func NewJSONRPCTransport(url string, opts ...JSONRPCOption) Transport {
9992
t := &jsonrpcTransport{
100-
url: url,
101-
agentCard: card,
93+
url: url,
10294
httpClient: &http.Client{
10395
Timeout: 5 * time.Second, // Match Python SDK httpx.AsyncClient default
10496
},
@@ -115,45 +107,11 @@ func NewJSONRPCTransport(url string, card *a2a.AgentCard, opts ...JSONRPCOption)
115107
type jsonrpcTransport struct {
116108
url string
117109
httpClient *http.Client
118-
agentCard *a2a.AgentCard
119-
}
120-
121-
// jsonrpcRequest represents a JSON-RPC 2.0 request.
122-
type jsonrpcRequest struct {
123-
JSONRPC string `json:"jsonrpc"`
124-
Method string `json:"method"`
125-
Params any `json:"params,omitempty"`
126-
ID string `json:"id"`
127-
}
128-
129-
// jsonrpcResponse represents a JSON-RPC 2.0 response.
130-
type jsonrpcResponse struct {
131-
JSONRPC string `json:"jsonrpc"`
132-
ID string `json:"id"`
133-
Result json.RawMessage `json:"result,omitempty"`
134-
Error *jsonrpcError `json:"error,omitempty"`
135-
}
136-
137-
// jsonrpcError represents a JSON-RPC 2.0 error object.
138-
// TODO(yarolegovich): Convert to transport-agnostic error format so Client can use errors.Is(err, a2a.ErrMethodNotFound).
139-
// This needs to be implemented across all transports (currently not in grpc either).
140-
type jsonrpcError struct {
141-
Code int `json:"code"`
142-
Message string `json:"message"`
143-
Data json.RawMessage `json:"data,omitempty"`
144-
}
145-
146-
// Error implements the error interface for jsonrpcError.
147-
func (e *jsonrpcError) Error() string {
148-
if len(e.Data) > 0 {
149-
return fmt.Sprintf("jsonrpc error %d: %s (data: %s)", e.Code, e.Message, string(e.Data))
150-
}
151-
return fmt.Sprintf("jsonrpc error %d: %s", e.Code, e.Message)
152110
}
153111

154112
func (t *jsonrpcTransport) newHTTPRequest(ctx context.Context, method string, params any) (*http.Request, error) {
155113
req := jsonrpcRequest{
156-
JSONRPC: jsonrpcVersion,
114+
JSONRPC: jsonrpc.Version,
157115
Method: method,
158116
Params: params,
159117
ID: uuid.New().String(),
@@ -169,7 +127,7 @@ func (t *jsonrpcTransport) newHTTPRequest(ctx context.Context, method string, pa
169127
return nil, fmt.Errorf("failed to create HTTP request: %w", err)
170128
}
171129

172-
httpReq.Header.Set("Content-Type", contentTypeJSON)
130+
httpReq.Header.Set("Content-Type", jsonrpc.ContentJSON)
173131

174132
if callMeta, ok := CallMetaFrom(ctx); ok {
175133
for k, vals := range callMeta {
@@ -209,7 +167,7 @@ func (t *jsonrpcTransport) sendRequest(ctx context.Context, method string, param
209167
}
210168

211169
if resp.Error != nil {
212-
return nil, resp.Error
170+
return nil, resp.Error.ToA2AError()
213171
}
214172

215173
return resp.Result, nil
@@ -221,7 +179,7 @@ func (t *jsonrpcTransport) sendStreamingRequest(ctx context.Context, method stri
221179
if err != nil {
222180
return nil, fmt.Errorf("failed to create HTTP request: %w", err)
223181
}
224-
httpReq.Header.Set("Accept", acceptEventStream)
182+
httpReq.Header.Set("Accept", sse.ContentEventStream)
225183

226184
httpResp, err := t.httpClient.Do(httpReq)
227185
if err != nil {
@@ -241,43 +199,30 @@ func (t *jsonrpcTransport) sendStreamingRequest(ctx context.Context, method stri
241199
// parseSSEStream parses Server-Sent Events and yields JSON-RPC responses.
242200
func parseSSEStream(body io.Reader) iter.Seq2[json.RawMessage, error] {
243201
return func(yield func(json.RawMessage, error) bool) {
244-
scanner := bufio.NewScanner(body)
245-
prefixBytes := []byte(sseDataPrefix)
246-
247-
for scanner.Scan() {
248-
lineBytes := scanner.Bytes()
249-
250-
// SSE data lines start with "data: "
251-
if bytes.HasPrefix(lineBytes, prefixBytes) {
252-
data := lineBytes[len(prefixBytes):]
253-
254-
var resp jsonrpcResponse
255-
if err := json.Unmarshal(data, &resp); err != nil {
256-
yield(nil, fmt.Errorf("failed to parse SSE data: %w", err))
257-
return
258-
}
259-
260-
if resp.Error != nil {
261-
yield(nil, resp.Error)
262-
return
263-
}
264-
265-
if !yield(resp.Result, nil) {
266-
return
267-
}
202+
for data, err := range sse.ParseDataStream(body) {
203+
if err != nil {
204+
yield(nil, err)
205+
return
206+
}
207+
var resp jsonrpcResponse
208+
if err := json.Unmarshal(data, &resp); err != nil {
209+
yield(nil, fmt.Errorf("failed to parse SSE data: %w", err))
210+
return
211+
}
212+
if resp.Error != nil {
213+
yield(nil, resp.Error.ToA2AError())
214+
return
215+
}
216+
if !yield(resp.Result, nil) {
217+
return
268218
}
269-
// Ignore empty lines, comments, and other SSE event types
270-
}
271-
272-
if err := scanner.Err(); err != nil {
273-
yield(nil, fmt.Errorf("SSE stream error: %w", err))
274219
}
275220
}
276221
}
277222

278223
// SendMessage sends a non-streaming message to the agent.
279224
func (t *jsonrpcTransport) SendMessage(ctx context.Context, message *a2a.MessageSendParams) (a2a.SendMessageResult, error) {
280-
result, err := t.sendRequest(ctx, methodMessageSend, message)
225+
result, err := t.sendRequest(ctx, jsonrpc.MethodMessageSend, message)
281226
if err != nil {
282227
return nil, err
283228
}
@@ -335,12 +280,12 @@ func (t *jsonrpcTransport) streamRequestToEvents(ctx context.Context, method str
335280

336281
// SendStreamingMessage sends a streaming message to the agent.
337282
func (t *jsonrpcTransport) SendStreamingMessage(ctx context.Context, message *a2a.MessageSendParams) iter.Seq2[a2a.Event, error] {
338-
return t.streamRequestToEvents(ctx, methodMessageStream, message)
283+
return t.streamRequestToEvents(ctx, jsonrpc.MethodMessageStream, message)
339284
}
340285

341286
// GetTask retrieves the current state of a task.
342287
func (t *jsonrpcTransport) GetTask(ctx context.Context, query *a2a.TaskQueryParams) (*a2a.Task, error) {
343-
result, err := t.sendRequest(ctx, methodTasksGet, query)
288+
result, err := t.sendRequest(ctx, jsonrpc.MethodTasksGet, query)
344289
if err != nil {
345290
return nil, err
346291
}
@@ -355,7 +300,7 @@ func (t *jsonrpcTransport) GetTask(ctx context.Context, query *a2a.TaskQueryPara
355300

356301
// CancelTask requests cancellation of a task.
357302
func (t *jsonrpcTransport) CancelTask(ctx context.Context, id *a2a.TaskIDParams) (*a2a.Task, error) {
358-
result, err := t.sendRequest(ctx, methodTasksCancel, id)
303+
result, err := t.sendRequest(ctx, jsonrpc.MethodTasksCancel, id)
359304
if err != nil {
360305
return nil, err
361306
}
@@ -370,12 +315,12 @@ func (t *jsonrpcTransport) CancelTask(ctx context.Context, id *a2a.TaskIDParams)
370315

371316
// ResubscribeToTask reconnects to an SSE stream for an ongoing task.
372317
func (t *jsonrpcTransport) ResubscribeToTask(ctx context.Context, id *a2a.TaskIDParams) iter.Seq2[a2a.Event, error] {
373-
return t.streamRequestToEvents(ctx, methodTasksResubscribe, id)
318+
return t.streamRequestToEvents(ctx, jsonrpc.MethodTasksResubscribe, id)
374319
}
375320

376321
// GetTaskPushConfig retrieves the push notification configuration for a task.
377322
func (t *jsonrpcTransport) GetTaskPushConfig(ctx context.Context, params *a2a.GetTaskPushConfigParams) (*a2a.TaskPushConfig, error) {
378-
result, err := t.sendRequest(ctx, methodPushConfigGet, params)
323+
result, err := t.sendRequest(ctx, jsonrpc.MethodPushConfigGet, params)
379324
if err != nil {
380325
return nil, err
381326
}
@@ -390,7 +335,7 @@ func (t *jsonrpcTransport) GetTaskPushConfig(ctx context.Context, params *a2a.Ge
390335

391336
// ListTaskPushConfig lists push notification configurations.
392337
func (t *jsonrpcTransport) ListTaskPushConfig(ctx context.Context, params *a2a.ListTaskPushConfigParams) ([]*a2a.TaskPushConfig, error) {
393-
result, err := t.sendRequest(ctx, methodPushConfigList, params)
338+
result, err := t.sendRequest(ctx, jsonrpc.MethodPushConfigList, params)
394339
if err != nil {
395340
return nil, err
396341
}
@@ -405,7 +350,7 @@ func (t *jsonrpcTransport) ListTaskPushConfig(ctx context.Context, params *a2a.L
405350

406351
// SetTaskPushConfig sets or updates the push notification configuration for a task.
407352
func (t *jsonrpcTransport) SetTaskPushConfig(ctx context.Context, params *a2a.TaskPushConfig) (*a2a.TaskPushConfig, error) {
408-
result, err := t.sendRequest(ctx, methodPushConfigSet, params)
353+
result, err := t.sendRequest(ctx, jsonrpc.MethodPushConfigSet, params)
409354
if err != nil {
410355
return nil, err
411356
}
@@ -420,16 +365,22 @@ func (t *jsonrpcTransport) SetTaskPushConfig(ctx context.Context, params *a2a.Ta
420365

421366
// DeleteTaskPushConfig deletes a push notification configuration.
422367
func (t *jsonrpcTransport) DeleteTaskPushConfig(ctx context.Context, params *a2a.DeleteTaskPushConfigParams) error {
423-
_, err := t.sendRequest(ctx, methodPushConfigDelete, params)
368+
_, err := t.sendRequest(ctx, jsonrpc.MethodPushConfigDelete, params)
424369
return err
425370
}
426371

427372
// GetAgentCard retrieves the agent's card.
428373
func (t *jsonrpcTransport) GetAgentCard(ctx context.Context) (*a2a.AgentCard, error) {
429-
if t.agentCard == nil {
430-
return nil, fmt.Errorf("no agent card available")
374+
result, err := t.sendRequest(ctx, jsonrpc.MethodGetExtendedAgentCard, nil)
375+
if err != nil {
376+
return nil, err
377+
}
378+
379+
var card a2a.AgentCard
380+
if err := json.Unmarshal(result, &card); err != nil {
381+
return nil, fmt.Errorf("failed to unmarshal agent card: %w", err)
431382
}
432-
return t.agentCard, nil
383+
return &card, nil
433384
}
434385

435386
// Destroy closes the transport and releases resources.

0 commit comments

Comments
 (0)