Skip to content

Commit b6f7375

Browse files
authored
feat(js): durable streaming for flows (experimental) (#3770)
1 parent 56b9e85 commit b6f7375

File tree

30 files changed

+4568
-141
lines changed

30 files changed

+4568
-141
lines changed

.gitignore

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,9 @@ js/testapps/flow-simple-ai/*.wav
4747
js/testapps/flow-simple-ai/meme-of-the-day.mp4
4848
js/testapps/flow-simple-ai/photo.mp4
4949
*.local.*
50+
js/plugins/firebase/database-debug.log
51+
js/plugins/firebase/firestore-debug.log
52+
.firebaserc
5053

5154
# auto-generated
5255
/js/core/src/__codegen
@@ -64,4 +67,4 @@ next-env.d.ts
6467

6568
# Code Coverage
6669
js/plugins/compat-oai/coverage/
67-
.firebaserc
70+

js/core/src/action.ts

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,25 @@ import { lazy } from './async.js';
2121
import { getContext, runWithContext, type ActionContext } from './context.js';
2222
import type { ActionType, Registry } from './registry.js';
2323
import { parseSchema } from './schema.js';
24+
import {
25+
type ActionStreamInput,
26+
type ActionStreamSubscriber,
27+
type StreamManager,
28+
} from './streaming.js';
2429
import {
2530
SPAN_TYPE_ATTR,
2631
runInNewSpan,
2732
setCustomMetadataAttributes,
2833
} from './tracing.js';
2934

3035
export { StatusCodes, StatusSchema, type Status } from './statusTypes.js';
31-
export type { JSONSchema7 };
36+
export { InMemoryStreamManager, StreamNotFoundError } from './streaming.js';
37+
export type {
38+
ActionStreamInput,
39+
ActionStreamSubscriber,
40+
JSONSchema7,
41+
StreamManager,
42+
};
3243

3344
const makeNoopAbortSignal = () => new AbortController().signal;
3445

js/core/src/async.ts

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ export interface Task<T> {
2727
}
2828

2929
/** Utility for creating Tasks. */
30-
function createTask<T>(): Task<T> {
30+
export function createTask<T>(): Task<T> {
3131
let resolve: unknown, reject: unknown;
3232
const promise = new Promise<T>(
3333
(res, rej) => ([resolve, reject] = [res, rej])
@@ -126,3 +126,51 @@ export function lazy<T>(fn: () => T | PromiseLike<T>): PromiseLike<T> {
126126
}
127127
});
128128
}
129+
130+
/**
131+
* Options for AsyncTaskQueue.
132+
*/
133+
export interface AsyncTaskQueueOptions {
134+
/**
135+
* If true, the queue will stop executing subsequent tasks if a task fails.
136+
* If false (default), the queue will continue executing subsequent tasks even if a task fails.
137+
*/
138+
stopOnError?: boolean;
139+
}
140+
141+
/**
142+
* A queue for asynchronous tasks. The queue ensures that only one task runs at a time in order.
143+
*/
144+
export class AsyncTaskQueue {
145+
private last: Promise<any> = Promise.resolve();
146+
private options: AsyncTaskQueueOptions;
147+
148+
constructor(options?: AsyncTaskQueueOptions) {
149+
this.options = options || {};
150+
}
151+
152+
/**
153+
* Adds a task to the queue.
154+
* The task will be executed when its turn comes up in the queue.
155+
* @param task A function that returns a value or a PromiseLike.
156+
*/
157+
enqueue(task: () => any | PromiseLike<any>) {
158+
if (this.options.stopOnError) {
159+
this.last = this.last.then(() => lazy(task)).then((res) => res);
160+
} else {
161+
this.last = this.last
162+
.catch(() => {})
163+
.then(() => lazy(task))
164+
.then((res) => res);
165+
}
166+
// Prevent unhandled promise rejections.
167+
this.last.catch(() => {});
168+
}
169+
170+
/**
171+
* Waits for all tasks currently in the queue to complete.
172+
*/
173+
async merge() {
174+
await this.last;
175+
}
176+
}

js/core/src/streaming.ts

Lines changed: 206 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,206 @@
1+
/**
2+
* Copyright 2024 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import { GenkitError } from './error';
18+
19+
/**
20+
* Error thrown when a stream cannot be found.
21+
*/
22+
export class StreamNotFoundError extends GenkitError {
23+
constructor(message: string) {
24+
super({ status: 'NOT_FOUND', message });
25+
this.name = 'StreamNotFoundError';
26+
}
27+
}
28+
29+
/**
30+
* Interface for writing content to a stream.
31+
* @template S The type of the stream chunks.
32+
* @template O The type of the final output.
33+
*/
34+
export interface ActionStreamInput<S, O> {
35+
/**
36+
* Writes a chunk to the stream.
37+
* @param chunk The chunk data to write.
38+
*/
39+
write(chunk: S): Promise<void>;
40+
/**
41+
* Closes the stream with a final output.
42+
* @param output The final output data.
43+
*/
44+
done(output: O): Promise<void>;
45+
/**
46+
* Closes the stream with an error.
47+
* @param err The error that occurred.
48+
*/
49+
error(err: any): Promise<void>;
50+
}
51+
52+
/**
53+
* Subscriber callbacks for receiving stream events.
54+
* @template S The type of the stream chunks.
55+
* @template O The type of the final output.
56+
*/
57+
export type ActionStreamSubscriber<S, O> = {
58+
/** Called when a chunk is received. */
59+
onChunk: (chunk: S) => void;
60+
/** Called when the stream completes successfully. */
61+
onDone: (output: O) => void;
62+
/** Called when the stream encounters an error. */
63+
onError: (error: any) => void;
64+
};
65+
66+
/**
67+
* Interface for managing streaming actions, allowing creation and subscription to streams.
68+
* Implementations can provide different storage backends (e.g., in-memory, database, cache).
69+
*/
70+
export interface StreamManager {
71+
/**
72+
* Opens a stream for writing.
73+
* @param streamId The unique identifier for the stream.
74+
* @returns An object to write to the stream.
75+
*/
76+
open<S, O>(streamId: string): Promise<ActionStreamInput<S, O>>;
77+
/**
78+
* Subscribes to a stream to receive its events.
79+
* @param streamId The unique identifier for the stream.
80+
* @param options The subscriber callbacks.
81+
* @returns A promise resolving to an object containing an unsubscribe function.
82+
*/
83+
subscribe<S, O>(
84+
streamId: string,
85+
options: ActionStreamSubscriber<S, O>
86+
): Promise<{ unsubscribe: () => void }>;
87+
}
88+
89+
type StreamState<S, O> =
90+
| {
91+
status: 'open';
92+
chunks: S[];
93+
subscribers: ActionStreamSubscriber<S, O>[];
94+
lastTouched: number;
95+
}
96+
| { status: 'done'; chunks: S[]; output: O; lastTouched: number }
97+
| { status: 'error'; chunks: S[]; error: any; lastTouched: number };
98+
99+
/**
100+
* An in-memory implementation of StreamManager.
101+
* Useful for testing or single-instance deployments where persistence is not required.
102+
*/
103+
export class InMemoryStreamManager implements StreamManager {
104+
private streams: Map<string, StreamState<any, any>> = new Map();
105+
106+
/**
107+
* @param options Configuration options.
108+
* @param options.ttlSeconds Time-to-live for streams in seconds. Defaults to 5 minutes.
109+
*/
110+
constructor(private options: { ttlSeconds?: number } = {}) {}
111+
112+
private _cleanup() {
113+
const ttl = (this.options.ttlSeconds ?? 5 * 60) * 1000;
114+
const now = Date.now();
115+
for (const [streamId, stream] of this.streams.entries()) {
116+
if (stream.status !== 'open' && now - stream.lastTouched > ttl) {
117+
this.streams.delete(streamId);
118+
}
119+
}
120+
}
121+
122+
async open<S, O>(streamId: string): Promise<ActionStreamInput<S, O>> {
123+
this._cleanup();
124+
if (this.streams.has(streamId)) {
125+
throw new Error(`Stream with id ${streamId} already exists.`);
126+
}
127+
this.streams.set(streamId, {
128+
status: 'open',
129+
chunks: [],
130+
subscribers: [],
131+
lastTouched: Date.now(),
132+
});
133+
134+
return {
135+
write: async (chunk: S) => {
136+
const stream = this.streams.get(streamId);
137+
if (stream?.status === 'open') {
138+
stream.chunks.push(chunk);
139+
stream.subscribers.forEach((s) => s.onChunk(chunk));
140+
stream.lastTouched = Date.now();
141+
}
142+
},
143+
done: async (output: O) => {
144+
const stream = this.streams.get(streamId);
145+
if (stream?.status === 'open') {
146+
this.streams.set(streamId, {
147+
status: 'done',
148+
chunks: stream.chunks,
149+
output,
150+
lastTouched: Date.now(),
151+
});
152+
stream.subscribers.forEach((s) => s.onDone(output));
153+
}
154+
},
155+
error: async (err: any) => {
156+
const stream = this.streams.get(streamId);
157+
if (stream?.status === 'open') {
158+
stream.subscribers.forEach((s) => s.onError(err));
159+
this.streams.set(streamId, {
160+
status: 'error',
161+
chunks: stream.chunks,
162+
error: err,
163+
lastTouched: Date.now(),
164+
});
165+
}
166+
},
167+
};
168+
}
169+
170+
async subscribe<S, O>(
171+
streamId: string,
172+
subscriber: ActionStreamSubscriber<S, O>
173+
): Promise<{ unsubscribe: () => void }> {
174+
const stream = this.streams.get(streamId);
175+
if (!stream) {
176+
throw new StreamNotFoundError(`Stream with id ${streamId} not found.`);
177+
}
178+
179+
if (stream.status === 'done') {
180+
for (const chunk of stream.chunks) {
181+
subscriber.onChunk(chunk);
182+
}
183+
subscriber.onDone(stream.output);
184+
} else if (stream.status === 'error') {
185+
for (const chunk of stream.chunks) {
186+
subscriber.onChunk(chunk);
187+
}
188+
subscriber.onError(stream.error);
189+
} else {
190+
stream.chunks.forEach((chunk) => subscriber.onChunk(chunk));
191+
stream.subscribers.push(subscriber);
192+
}
193+
194+
return {
195+
unsubscribe: () => {
196+
const currentStream = this.streams.get(streamId);
197+
if (currentStream?.status === 'open') {
198+
const index = currentStream.subscribers.indexOf(subscriber);
199+
if (index > -1) {
200+
currentStream.subscribers.splice(index, 1);
201+
}
202+
}
203+
},
204+
};
205+
}
206+
}

0 commit comments

Comments
 (0)