Skip to content

Commit eea893b

Browse files
committed
WIP: AsyncGenerator streaming stub
1 parent 5c25ce4 commit eea893b

File tree

2 files changed

+93
-24
lines changed

2 files changed

+93
-24
lines changed

packages/agents/src/react.tsx

Lines changed: 84 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -145,11 +145,11 @@ type NonStreamingRPCMethod<T extends Method> =
145145
: never;
146146

147147
interface StreamingResponse<
148-
OnChunkT extends SerializableValue | unknown = unknown,
149-
OnDoneT extends SerializableValue | unknown = unknown
148+
Chunk extends SerializableValue | unknown = unknown,
149+
Done extends SerializableValue | unknown = unknown
150150
> {
151-
send(chunk: OnChunkT): void;
152-
end(finalChunk?: OnDoneT): void;
151+
send(chunk: Chunk): void;
152+
end(finalChunk?: Done): void;
153153
}
154154

155155
type StreamingRPCMethod<T extends Method> = T extends (
@@ -190,6 +190,14 @@ type StreamOptionsFrom<StreamingResponseT> =
190190
? StreamOptions<T, U>
191191
: never;
192192

193+
type ReturnAndChunkTypesFrom<StreamingResponseT extends StreamingResponse> =
194+
StreamingResponseT extends StreamingResponse<
195+
infer Chunk extends SerializableValue,
196+
infer Done extends SerializableValue
197+
>
198+
? [Chunk, Done]
199+
: never;
200+
193201
type RestParameters<T extends Method> =
194202
Parameters<StreamingRPCMethod<T>> extends [unknown, ...infer Rest]
195203
? Rest
@@ -266,20 +274,35 @@ type UntypedAgentMethodCall = <T = unknown>(
266274
) => Promise<T>;
267275

268276
type AgentStub<T> = {
277+
[K in keyof AgentMethods<T>]: AgentMethods<T>[K] extends NonStreamingRPCMethod<
278+
AgentMethods<T>[K]
279+
>
280+
? (
281+
...args: Parameters<AgentMethods<T>[K]>
282+
) => AgentPromiseReturnType<AgentMethods<T>, K>
283+
: never;
284+
};
285+
286+
type AgentStreamingStub<T> = {
269287
[K in keyof AgentMethods<T>]: AgentMethods<T>[K] extends StreamingRPCMethod<
270288
AgentMethods<T>[K]
271289
>
272290
? (
273-
options: StreamOptionsFrom<Parameters<AgentMethods<T>[K]>[0]>,
274291
...args: RestParameters<AgentMethods<T>[K]>
275-
) => void
276-
: (
277-
...args: Parameters<AgentMethods<T>[K]>
278-
) => AgentPromiseReturnType<AgentMethods<T>, K>;
292+
) => AsyncGenerator<
293+
ReturnAndChunkTypesFrom<
294+
Parameters<StreamingRPCMethod<AgentMethods<T>[K]>>[0]
295+
>[0],
296+
ReturnAndChunkTypesFrom<
297+
Parameters<StreamingRPCMethod<AgentMethods<T>[K]>>[0]
298+
>[1]
299+
>
300+
: never;
279301
};
280302

281303
// we neet to use Method instead of RPCMethod here for retro-compatibility
282304
type UntypedAgentStub = Record<string, Method>;
305+
type UntypedAgentStreamingStub = StreamingAgentMethods<unknown>;
283306

284307
/**
285308
* React hook for connecting to an Agent
@@ -292,6 +315,7 @@ export function useAgent<State = unknown>(
292315
setState: (state: State) => void;
293316
call: UntypedAgentMethodCall;
294317
stub: UntypedAgentStub;
318+
streamingStub: UntypedAgentStreamingStub;
295319
};
296320
export function useAgent<
297321
AgentT extends {
@@ -306,6 +330,7 @@ export function useAgent<
306330
setState: (state: State) => void;
307331
call: AgentMethodCall<AgentT>;
308332
stub: AgentStub<AgentT>;
333+
streamingStub: AgentStreamingStub<AgentT>;
309334
};
310335
export function useAgent<State>(
311336
options: UseAgentOptions<unknown>
@@ -462,6 +487,7 @@ export function useAgent<State>(
462487
setState: (state: State) => void;
463488
call: UntypedAgentMethodCall;
464489
stub: UntypedAgentStub;
490+
streamingStub: UntypedAgentStreamingStub;
465491
};
466492
// Create the call method
467493
const call = useCallback(
@@ -510,6 +536,55 @@ export function useAgent<State>(
510536
}
511537
}
512538
);
539+
// biome-ignore lint: suppressions/parse
540+
agent.streamingStub = new Proxy<any>(
541+
{},
542+
{
543+
get: (_target, method) => {
544+
return async function* (...args: unknown[]) {
545+
let resolve: (value: unknown) => void;
546+
let reject: (reason: unknown) => void;
547+
let promise = new Promise((res, rej) => {
548+
resolve = res;
549+
reject = rej;
550+
});
551+
552+
// 4. State flags
553+
let isDone = false;
554+
555+
// 5. Callback implementation
556+
const streamOptions: StreamOptions = {
557+
onChunk: (chunk: unknown) => {
558+
resolve(chunk);
559+
promise = new Promise((res, rej) => {
560+
resolve = res;
561+
reject = rej;
562+
});
563+
},
564+
onError: (error: unknown) => {
565+
isDone = true;
566+
reject(error);
567+
},
568+
onDone: (done: unknown) => {
569+
isDone = true;
570+
resolve(done);
571+
}
572+
};
573+
574+
call(method as string, args, streamOptions);
575+
576+
while (!isDone) {
577+
const result = await promise;
578+
if (isDone) {
579+
return result;
580+
} else {
581+
yield result;
582+
}
583+
}
584+
};
585+
}
586+
}
587+
);
513588

514589
// warn if agent isn't in lowercase
515590
if (agent.agent !== agent.agent.toLowerCase()) {

packages/agents/src/tests-d/example-stub.test-d.ts

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
import type { env } from "cloudflare:workers";
33
import { Agent, callable, type StreamingResponse } from "..";
44
import { useAgent } from "../react.tsx";
5-
import type { StreamOptions } from "../client.ts";
65

76
class MyAgent extends Agent<typeof env, {}> {
87
@callable()
@@ -57,7 +56,7 @@ class MyAgent extends Agent<typeof env, {}> {
5756
}
5857
}
5958

60-
const { stub } = useAgent<MyAgent, {}>({ agent: "my-agent" });
59+
const { stub, streamingStub } = useAgent<MyAgent, {}>({ agent: "my-agent" });
6160
// return type is promisified
6261
stub.sayHello() satisfies Promise<string>;
6362

@@ -76,21 +75,16 @@ await stub.nonRpc();
7675
// @ts-expect-error nonSerializable is not serializable
7776
await stub.nonSerializable("hello", new Date());
7877

79-
const streamOptions: StreamOptions<number, boolean> = {};
80-
81-
// biome-ignore lint: suspicious/noConfusingVoidType
82-
stub.performStream(streamOptions, "hello") satisfies void;
78+
const generator = streamingStub.performStream("hello") satisfies AsyncGenerator<
79+
number,
80+
boolean
81+
>;
82+
for await (const chunk of generator) {
83+
chunk satisfies number;
84+
}
8385

8486
// @ts-expect-error there's no 2nd argument
85-
stub.performStream(streamOptions, "hello", 1);
86-
87-
const invalidStreamOptions: StreamOptions<string, boolean> = {};
88-
89-
// @ts-expect-error streamOptions must be of type StreamOptions<number, boolean>
90-
stub.performStream(invalidStreamOptions, "hello");
91-
92-
// @ts-expect-error first argument is not a streamOptions
93-
stub.performStreamFirstArgNotStreamOptions("hello", streamOptions);
87+
streamingStub.performStream("hello", 1);
9488

9589
const { stub: stub2 } = useAgent<Omit<MyAgent, "nonRpc">, {}>({
9690
agent: "my-agent"

0 commit comments

Comments
 (0)