Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/client/card-resolver.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { AGENT_CARD_PATH } from '../constants.js';
import { AgentCard } from '../index.js';
import { AgentCard as AgentCardValue, type AgentCard } from '../index.js';

export interface AgentCardResolverOptions {
path?: string;
Expand Down Expand Up @@ -54,7 +54,7 @@ export class DefaultAgentCardResolver implements AgentCardResolver {
*/
private normalizeAgentCard(card: unknown): AgentCard {
if (this.isProtoAgentCard(card)) {
const parsedProto = AgentCard.fromJSON(card);
const parsedProto = AgentCardValue.fromJSON(card);
return parsedProto;
}
return card as AgentCard;
Expand Down
13 changes: 5 additions & 8 deletions src/client/factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,16 +97,13 @@ export class ClientFactory {
* Creates a new client from the provided agent card.
*/
async createFromAgentCard(agentCard: AgentCard): Promise<Client> {
const agentCardPreferred = agentCard.preferredTransport ?? JsonRpcTransportFactory.name;
const additionalInterfaces = agentCard.additionalInterfaces ?? [];
const urlsPerAgentTransports = new CaseInsensitiveMap<string>([
[agentCardPreferred, agentCard.url],
...additionalInterfaces.map<[string, string]>((i) => [i.transport, i.url]),
]);
const supportedInterfaces = agentCard.supportedInterfaces || [];
const urlsPerAgentTransports = new CaseInsensitiveMap<string>(
supportedInterfaces.map((i) => [i.protocolBinding, i.url])
);
const transportsByPreference = [
...(this.options.preferredTransports ?? []),
agentCardPreferred,
...additionalInterfaces.map((i) => i.transport),
...supportedInterfaces.map((i) => i.protocolBinding),
];
for (const transport of transportsByPreference) {
const url = urlsPerAgentTransports.get(transport);
Expand Down
6 changes: 3 additions & 3 deletions src/client/interceptors.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { AgentCard, A2AStreamEventData } from '../index.js';
import { type AgentCard, A2AStreamEventData } from '../index.js';
import { Client } from './multitransport-client.js';
import { RequestOptions } from './multitransport-client.js';

Expand Down Expand Up @@ -126,8 +126,8 @@ type MethodResult<T, TMembers extends keyof T = keyof T, TOverrides = object> =
}[TMembers];

interface ResultsOverrides {
// sendMessageStream and resubscribeTask return async iterators and are intercepted on each item,
// sendMessageStream and subscribeToTask return async iterators and are intercepted on each item,
// which requires custom handling.
sendMessageStream: A2AStreamEventData;
resubscribeTask: A2AStreamEventData;
subscribeToTask: A2AStreamEventData;
}
63 changes: 34 additions & 29 deletions src/client/multitransport-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,20 @@ import { PushNotificationNotSupportedError } from '../errors.js';
import {
TaskPushNotificationConfig,
Task,
PushNotificationConfig,
AgentCard,
A2AStreamEventData,
SendMessageResult,
} from '../index.js';
import {
CancelTaskRequest,
CreateTaskPushNotificationConfigRequest,
DeleteTaskPushNotificationConfigRequest,
GetTaskPushNotificationConfigRequest,
GetTaskRequest,
ListTaskPushNotificationConfigRequest,
ListTaskPushNotificationConfigsRequest,
SendMessageConfiguration,
SendMessageRequest,
TaskSubscriptionRequest,
} from '../types/pb/a2a_types.js';
SubscribeToTaskRequest,
} from '../types/pb/a2a.js';
import { ClientCallContext } from './context.js';
import {
CallInterceptor,
Expand Down Expand Up @@ -45,7 +43,7 @@ export interface ClientConfig {
/**
* Specifies the default push notification configuration to apply for every Task.
*/
pushNotificationConfig?: PushNotificationConfig;
pushNotificationConfig?: Partial<TaskPushNotificationConfig>;

/**
* Interceptors invoked for each request.
Expand Down Expand Up @@ -82,10 +80,10 @@ export class Client {
* If the current agent card supports the extended feature, it will try to fetch the extended agent card from the server,
* Otherwise it will return the current agent card value.
*/
async getAgentCard(options?: RequestOptions): Promise<AgentCard> {
if (this.agentCard.supportsAuthenticatedExtendedCard) {
async getExtendedAgentCard(options?: RequestOptions): Promise<AgentCard> {
if (this.agentCard.capabilities?.extendedAgentCard) {
this.agentCard = await this.executeWithInterceptors(
{ method: 'getAgentCard' },
{ method: 'getExtendedAgentCard' },
options,
(_, options) => this.transport.getExtendedAgentCard(options)
);
Expand All @@ -100,7 +98,7 @@ export class Client {
sendMessage(params: SendMessageRequest, options?: RequestOptions): Promise<SendMessageResult> {
params = this.applyClientConfig({
params,
blocking: !(this.config?.polling ?? false),
returnImmediately: this.config?.polling ?? false,
});

return this.executeWithInterceptors(
Expand All @@ -120,7 +118,7 @@ export class Client {
): AsyncGenerator<A2AStreamEventData, void, undefined> {
const method = 'sendMessageStream';

params = this.applyClientConfig({ params, blocking: true });
params = this.applyClientConfig({ params, returnImmediately: false });
const beforeArgs: BeforeArgs<'sendMessageStream'> = {
input: { method, value: params },
agentCard: this.agentCard,
Expand Down Expand Up @@ -173,7 +171,7 @@ export class Client {
* Requires the server to have AgentCard.capabilities.pushNotifications: true.
*/
setTaskPushNotificationConfig(
params: CreateTaskPushNotificationConfigRequest,
params: TaskPushNotificationConfig,
options?: RequestOptions
): Promise<TaskPushNotificationConfig> {
if (!this.agentCard.capabilities?.pushNotifications) {
Expand Down Expand Up @@ -210,18 +208,18 @@ export class Client {
* Retrieves the associated push notification configurations for a specified task.
* Requires the server to have AgentCard.capabilities.pushNotifications: true.
*/
listTaskPushNotificationConfig(
params: ListTaskPushNotificationConfigRequest,
listTaskPushNotificationConfigs(
params: ListTaskPushNotificationConfigsRequest,
options?: RequestOptions
): Promise<TaskPushNotificationConfig[]> {
if (!this.agentCard.capabilities?.pushNotifications) {
throw new PushNotificationNotSupportedError();
}

return this.executeWithInterceptors(
{ method: 'listTaskPushNotificationConfig', value: params },
{ method: 'listTaskPushNotificationConfigs', value: params },
options,
this.transport.listTaskPushNotificationConfig.bind(this.transport)
this.transport.listTaskPushNotificationConfigs.bind(this.transport)
);
}

Expand Down Expand Up @@ -265,13 +263,13 @@ export class Client {
/**
* Allows a client to reconnect to an updates stream for an ongoing task after a previous connection was interrupted.
*/
async *resubscribeTask(
params: TaskSubscriptionRequest,
async *subscribeToTask(
params: SubscribeToTaskRequest,
options?: RequestOptions
): AsyncGenerator<A2AStreamEventData, void, undefined> {
const method = 'resubscribeTask';
const method = 'subscribeToTask';

const beforeArgs: BeforeArgs<'resubscribeTask'> = {
const beforeArgs: BeforeArgs<'subscribeToTask'> = {
input: { method, value: params },
agentCard: this.agentCard,
options,
Expand All @@ -280,7 +278,7 @@ export class Client {

if (beforeResult) {
const earlyReturn = beforeResult.earlyReturn.value;
const afterArgs: AfterArgs<'resubscribeTask'> = {
const afterArgs: AfterArgs<'subscribeToTask'> = {
result: { method, value: earlyReturn },
agentCard: this.agentCard,
options: beforeArgs.options,
Expand All @@ -290,11 +288,11 @@ export class Client {
return;
}

for await (const event of this.transport.resubscribeTask(
for await (const event of this.transport.subscribeToTask(
beforeArgs.input.value,
beforeArgs.options
)) {
const afterArgs: AfterArgs<'resubscribeTask'> = {
const afterArgs: AfterArgs<'subscribeToTask'> = {
result: { method, value: event },
agentCard: this.agentCard,
options: beforeArgs.options,
Expand All @@ -309,10 +307,10 @@ export class Client {

private applyClientConfig({
params,
blocking,
returnImmediately,
}: {
params: SendMessageRequest;
blocking: boolean;
returnImmediately: boolean;
}): SendMessageRequest {
const result = {
...params,
Expand All @@ -325,12 +323,19 @@ export class Client {
([] as string[]);
result.configuration.historyLength ??= 0;

if (!result.configuration.pushNotification && this.config?.pushNotificationConfig) {
if (params.request?.taskId !== undefined) {
result.configuration.pushNotification = this.config.pushNotificationConfig;
if (!result.configuration.taskTaskPushNotificationConfig && this.config?.pushNotificationConfig) {
if (params.message?.taskId !== undefined) {
result.configuration.taskTaskPushNotificationConfig = {
...this.config.pushNotificationConfig,
taskId: params.message.taskId,
id: this.config.pushNotificationConfig.id || '', // Must have ID? Or server generates it?
// If config is partial, we assume missing fields are handled by server or not needed?
// But TaskPushNotificationConfig expects fields.
// This is tricky if TaskPushNotificationConfig is strict.
} as TaskPushNotificationConfig;
}
}
result.configuration.blocking ??= blocking;
result.configuration.returnImmediately ??= returnImmediately;
return result;
}

Expand Down
51 changes: 27 additions & 24 deletions src/client/transports/grpc/grpc_transport.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
import * as grpc from '@grpc/grpc-js';
import { TransportProtocolName } from '../../../core.js';
import { A2AServiceClient, TaskPushNotificationConfig } from '../../../grpc/pb/a2a_services.js';
import { Task, AgentCard } from '../../../types/pb/a2a_types.js';
import { A2AServiceClient, TaskPushNotificationConfig } from '../../../grpc/pb/a2a.js';
import { type Task, type AgentCard } from '../../../types/pb/a2a.js';
import {
CancelTaskRequest,
CreateTaskPushNotificationConfigRequest,
DeleteTaskPushNotificationConfigRequest,
GetAgentCardRequest,
GetExtendedAgentCardRequest,
GetTaskPushNotificationConfigRequest,
GetTaskRequest,
ListTaskPushNotificationConfigRequest,
ListTaskPushNotificationConfigsRequest,
SendMessageRequest,
TaskSubscriptionRequest,
SubscribeToTaskRequest,
A2AStreamEventData,
SendMessageResult,
} from '../../../index.js';
Expand Down Expand Up @@ -60,11 +59,15 @@ export class GrpcTransport implements Transport {
}

async getExtendedAgentCard(options?: RequestOptions): Promise<AgentCard> {
const rpcResponse = await this._sendGrpcRequest<GetAgentCardRequest, AgentCard, AgentCard>(
'getAgentCard',
{},
const rpcResponse = await this._sendGrpcRequest<
GetExtendedAgentCardRequest,
AgentCard,
AgentCard
>(
'getExtendedAgentCard',
{ tenant: '' },
options,
this.grpcClient.getAgentCard.bind(this.grpcClient),
this.grpcClient.getExtendedAgentCard.bind(this.grpcClient),
(req) => req
);
return rpcResponse;
Expand Down Expand Up @@ -97,11 +100,11 @@ export class GrpcTransport implements Transport {
}

async setTaskPushNotificationConfig(
params: CreateTaskPushNotificationConfigRequest,
params: TaskPushNotificationConfig,
options?: RequestOptions
): Promise<TaskPushNotificationConfig> {
const rpcResponse = await this._sendGrpcRequest<
CreateTaskPushNotificationConfigRequest,
TaskPushNotificationConfig,
TaskPushNotificationConfig,
TaskPushNotificationConfig
>(
Expand Down Expand Up @@ -132,15 +135,15 @@ export class GrpcTransport implements Transport {
return rpcResponse;
}

async listTaskPushNotificationConfig(
params: ListTaskPushNotificationConfigRequest,
async listTaskPushNotificationConfigs(
params: ListTaskPushNotificationConfigsRequest,
options?: RequestOptions
): Promise<TaskPushNotificationConfig[]> {
const rpcResponse = await this._sendGrpcRequest(
'listTaskPushNotificationConfig',
'listTaskPushNotificationConfigs',
params,
options,
this.grpcClient.listTaskPushNotificationConfig.bind(this.grpcClient),
this.grpcClient.listTaskPushNotificationConfigs.bind(this.grpcClient),
FromProto.listTaskPushNotificationConfig
);
return rpcResponse;
Expand Down Expand Up @@ -181,15 +184,15 @@ export class GrpcTransport implements Transport {
return rpcResponse;
}

async *resubscribeTask(
params: TaskSubscriptionRequest,
async *subscribeToTask(
params: SubscribeToTaskRequest,
options?: RequestOptions
): AsyncGenerator<A2AStreamEventData, void, undefined> {
yield* this._sendGrpcStreamingRequest(
'taskSubscription',
'subscribeToTask',
params,
options,
this.grpcClient.taskSubscription.bind(this.grpcClient)
this.grpcClient.subscribeToTask.bind(this.grpcClient)
);
}

Expand Down Expand Up @@ -230,7 +233,7 @@ export class GrpcTransport implements Transport {
}

private async *_sendGrpcStreamingRequest<TReq, TRes>(
method: 'sendStreamingMessage' | 'taskSubscription',
method: 'sendStreamingMessage' | 'subscribeToTask',
params: TReq,
options: RequestOptions | undefined,
call: GrpcStreamCall<TReq, TRes>
Expand Down Expand Up @@ -290,7 +293,7 @@ export class GrpcTransport implements Transport {
if (method === 'cancelTask') {
return new TaskNotCancelableError(error.details);
}
if (method === 'getAgentCard') {
if (method === 'getExtendedAgentCard') {
return new AuthenticatedExtendedCardNotConfiguredError(error.details);
}
break;
Expand All @@ -300,12 +303,12 @@ export class GrpcTransport implements Transport {
'getTaskPushNotificationConfig',
'createTaskPushNotificationConfig',
'deleteTaskPushNotificationConfig',
'listTaskPushNotificationConfig',
'listTaskPushNotificationConfigs',
].includes(method)
) {
return new PushNotificationNotSupportedError(error.details);
}
if (['getAgentCard', 'taskSubscription'].includes(method)) {
if (['getExtendedAgentCard', 'subscribeToTask'].includes(method)) {
return new UnsupportedOperationError(error.details);
}
break;
Expand Down
Loading
Loading