Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion src/server/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ export { DefaultExecutionEventBusManager } from './events/execution_event_bus_ma
export { ExecutionEventQueue } from './events/execution_event_queue.js';

export type { A2ARequestHandler } from './request_handler/a2a_request_handler.js';
export { DefaultRequestHandler } from './request_handler/default_request_handler.js';
export { RequestHandlerInterceptor as DefaultRequestHandler } from './request_handler/request_handler_interceptor.js';
export type { ExtendedAgentCardProvider } from './request_handler/default_request_handler.js';
export { ResultManager } from './result_manager.js';
export type { TaskStore } from './store.js';
Expand All @@ -31,3 +31,11 @@ export { InMemoryPushNotificationStore } from './push_notification/push_notifica

export type { User } from './authentication/user.js';
export { UnauthenticatedUser } from './authentication/user.js';

export {
HandlerInterceptor,
BeforeArgs,
AfterArgs,
HandlerCallInput,
HandlerCallResult,
} from './interceptors.js';
128 changes: 128 additions & 0 deletions src/server/interceptors.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
import { A2AStreamEventData } from './request_handler/a2a_request_handler.js';
import { ServerCallContext } from './context.js';
import { A2ARequestHandler } from './request_handler/a2a_request_handler.js';
import { AgentCard } from '../types.js';

export interface HandlerInterceptor {
/**
* Invoked before handler method.
*/
before(args: BeforeArgs): Promise<EarlyReturn | void>;

/**
* Invoked after handler method.
*/
after(args: AfterArgs): Promise<EarlyReturn | void>;
}

export interface EarlyReturn<K extends keyof A2ARequestHandler = keyof A2ARequestHandler> {
/**
* If set by the interceptor, stops execution, invokes "after"
* for executed interceptors and returns the result. Request Handler is not called.
*/
value: HandlerCallResult<K>;
}

export interface BeforeArgs<K extends keyof A2ARequestHandler = keyof A2ARequestHandler> {
/**
* Identifies the Handler method invoked and its payload.
* Payload inside the input object can be modified.
*/
readonly input: HandlerCallInput<K>;

/**
* Identifies the agent card used to create the handler
*/
readonly agentCard: AgentCard;

/**
* Identifies the context created by the server
*/
context?: ServerCallContext;
}

export interface AfterArgs<K extends keyof A2ARequestHandler = keyof A2ARequestHandler> {
/**
* Identifies the Handler method invoked and its result.
* Payload inside the result object can be modified.
*/
readonly result: HandlerCallResult<K>;

/**
* Identifies the agent card used to create the handler
*/
readonly agentCard: AgentCard;

/**
* Identifies the context created by the server
*/
context?: ServerCallContext;
}

export type HandlerCallInput<K extends keyof A2ARequestHandler = keyof A2ARequestHandler> =
MethodInput<A2ARequestHandler, K>;
export type HandlerCallResult<K extends keyof A2ARequestHandler = keyof A2ARequestHandler> =
MethodResult<A2ARequestHandler, K, ResultsOverrides>;

// Types below are helper types and are not exported to allow simplifying it without affecting
// public API if necessary. They are exported via type aliases HandlerXxx which can be replaced with explicit union if necessary.

/**
* For
*
* interface Foo {
* f1(arg: string): Promise<Result1>;
* f2(arg: number): Promise<Result2>;
* }
*
* MethodInputs<Foo> resolves to
*
* {
* readonly method: "f1";
* value: string;
* } | {
* readonly method: "f2";
* value: number;
* }
*/
type MethodInput<T, TMembers extends keyof T = keyof T> = {
[M in TMembers]: T[M] extends (context: ServerCallContext | undefined) => unknown
? { readonly method: M; value?: never }
: T[M] extends (payload: infer P) => unknown
? { readonly method: M; value: P }
: never;
}[TMembers];

/**
* For
*
* interface Foo {
* f1(): Promise<Result1>;
* f2(): Promise<Result2>;
* }
*
* MethodsResults<Foo> resolves to
*
* {
* readonly method: "f1";
* value: Result1;
* } | {
* readonly method: "f2";
* value: Result2;
* }
*/
type MethodResult<T, TMembers extends keyof T = keyof T, TOverrides = object> = {
[M in TMembers]: M extends keyof TOverrides // If there is an override, use it directly.
? { readonly method: M; value: TOverrides[M] }
: // Infer result, unwrap it from Promise and pack with method name.
T[M] extends (payload: unknown) => infer R
? { readonly method: M; value: Awaited<R> }
: never;
}[TMembers];

interface ResultsOverrides {
// sendMessageStream and resubscribeTask return async iterators and are intercepted on each item,
// which requires custom handling.
sendMessageStream: A2AStreamEventData;
resubscribe: A2AStreamEventData;
}
8 changes: 7 additions & 1 deletion src/server/request_handler/a2a_request_handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import {
} from '../../types.js';
import { ServerCallContext } from '../context.js';

export type A2AStreamEventData = Message | Task | TaskStatusUpdateEvent | TaskArtifactUpdateEvent;

export interface A2ARequestHandler {
getAgentCard(): Promise<AgentCard>;

Expand Down Expand Up @@ -56,5 +58,9 @@ export interface A2ARequestHandler {
resubscribe(
params: TaskIdParams,
context?: ServerCallContext
): AsyncGenerator<Task | TaskStatusUpdateEvent | TaskArtifactUpdateEvent, void, undefined>;
): AsyncGenerator<
Message | Task | TaskStatusUpdateEvent | TaskArtifactUpdateEvent,
void,
undefined
>;
}
16 changes: 3 additions & 13 deletions src/server/request_handler/default_request_handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import { AgentExecutionEvent } from '../events/execution_event_bus.js';
import { ExecutionEventQueue } from '../events/execution_event_queue.js';
import { ResultManager } from '../result_manager.js';
import { TaskStore } from '../store.js';
import { A2ARequestHandler } from './a2a_request_handler.js';
import { A2ARequestHandler, A2AStreamEventData } from './a2a_request_handler.js';
import {
InMemoryPushNotificationStore,
PushNotificationStore,
Expand Down Expand Up @@ -305,11 +305,7 @@ export class DefaultRequestHandler implements A2ARequestHandler {
async *sendMessageStream(
params: MessageSendParams,
context?: ServerCallContext
): AsyncGenerator<
Message | Task | TaskStatusUpdateEvent | TaskArtifactUpdateEvent,
void,
undefined
> {
): AsyncGenerator<A2AStreamEventData, void, undefined> {
const incomingMessage = params.message;
if (!incomingMessage.messageId) {
// For streams, messageId might be set by client, or server can generate if not present.
Expand Down Expand Up @@ -541,13 +537,7 @@ export class DefaultRequestHandler implements A2ARequestHandler {
async *resubscribe(
params: TaskIdParams,
_context?: ServerCallContext
): AsyncGenerator<
| Task // Initial task state
| TaskStatusUpdateEvent
| TaskArtifactUpdateEvent,
void,
undefined
> {
): AsyncGenerator<A2AStreamEventData, void, undefined> {
if (!this.agentCard.capabilities.streaming) {
throw A2AError.unsupportedOperation('Streaming (and thus resubscription) is not supported.');
}
Expand Down
Loading