Skip to content

Commit 4e21c28

Browse files
committed
Adding streaming support
1 parent 4d8896e commit 4e21c28

File tree

15 files changed

+273
-67
lines changed

15 files changed

+273
-67
lines changed

packages/core/src/v3/apiClient/index.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,14 +61,13 @@ import {
6161
SubscribeToRunsQueryParams,
6262
UpdateEnvironmentVariableParams,
6363
} from "./types.js";
64-
import type { AsyncIterableStream } from "./stream.js";
64+
import { AsyncIterableStream } from "../streams/asyncIterableStream.js";
6565

6666
export type {
6767
CreateEnvironmentVariableParams,
6868
ImportEnvironmentVariablesParams,
6969
SubscribeToRunsQueryParams,
7070
UpdateEnvironmentVariableParams,
71-
AsyncIterableStream,
7271
};
7372

7473
export type ClientTriggerOptions = {

packages/core/src/v3/apiClient/runStream.ts

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,11 @@ import {
1616
} from "../utils/ioSerialization.js";
1717
import { ApiError } from "./errors.js";
1818
import { ApiClient } from "./index.js";
19+
import { LineTransformStream, zodShapeStream } from "./stream.js";
1920
import {
2021
AsyncIterableStream,
2122
createAsyncIterableReadable,
22-
LineTransformStream,
23-
zodShapeStream,
24-
} from "./stream.js";
23+
} from "../streams/asyncIterableStream.js";
2524

2625
export type RunShape<TRunTypes extends AnyRunTypes> = TRunTypes extends AnyRunTypes
2726
? {

packages/core/src/v3/apiClient/stream.ts

Lines changed: 1 addition & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import {
99
type Row,
1010
type ShapeStreamInterface,
1111
} from "@electric-sql/client";
12+
import { AsyncIterableStream, createAsyncIterableStream } from "../streams/asyncIterableStream.js";
1213

1314
export type ZodShapeStreamOptions = {
1415
headers?: Record<string, string>;
@@ -82,57 +83,6 @@ export function zodShapeStream<TShapeSchema extends z.ZodTypeAny>(
8283
};
8384
}
8485

85-
export type AsyncIterableStream<T> = AsyncIterable<T> & ReadableStream<T>;
86-
87-
export function createAsyncIterableStream<S, T>(
88-
source: ReadableStream<S>,
89-
transformer: Transformer<S, T>
90-
): AsyncIterableStream<T> {
91-
const transformedStream: any = source.pipeThrough(new TransformStream(transformer));
92-
93-
transformedStream[Symbol.asyncIterator] = () => {
94-
const reader = transformedStream.getReader();
95-
return {
96-
async next(): Promise<IteratorResult<string>> {
97-
const { done, value } = await reader.read();
98-
return done ? { done: true, value: undefined } : { done: false, value };
99-
},
100-
};
101-
};
102-
103-
return transformedStream;
104-
}
105-
106-
export function createAsyncIterableReadable<S, T>(
107-
source: ReadableStream<S>,
108-
transformer: Transformer<S, T>,
109-
signal: AbortSignal
110-
): AsyncIterableStream<T> {
111-
return new ReadableStream<T>({
112-
async start(controller) {
113-
const transformedStream = source.pipeThrough(new TransformStream(transformer));
114-
const reader = transformedStream.getReader();
115-
116-
signal.addEventListener("abort", () => {
117-
queueMicrotask(() => {
118-
reader.cancel();
119-
controller.close();
120-
});
121-
});
122-
123-
while (true) {
124-
const { done, value } = await reader.read();
125-
if (done) {
126-
controller.close();
127-
break;
128-
}
129-
130-
controller.enqueue(value);
131-
}
132-
},
133-
}) as AsyncIterableStream<T>;
134-
}
135-
13686
class ReadableShapeStream<T extends Row<unknown> = Row> {
13787
readonly #stream: ShapeStreamInterface<T>;
13888
readonly #currentState: Map<string, T> = new Map();

packages/core/src/v3/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ export * from "./types/index.js";
2121
export { links } from "./links.js";
2222
export * from "./jwt.js";
2323
export * from "./idempotencyKeys.js";
24+
export * from "./streams/asyncIterableStream.js";
2425
export {
2526
formatDuration,
2627
formatDurationInDays,

packages/core/src/v3/logger/index.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,10 @@ export class LoggerAPI implements TaskLogger {
5151
return this.#getTaskLogger().trace(name, fn, options);
5252
}
5353

54+
public startSpan(name: string, options?: SpanOptions): Span {
55+
return this.#getTaskLogger().startSpan(name, options);
56+
}
57+
5458
#getTaskLogger(): TaskLogger {
5559
return getGlobal(API_NAME) ?? NOOP_TASK_LOGGER;
5660
}

packages/core/src/v3/logger/taskLogger.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ export interface TaskLogger {
2424
warn(message: string, properties?: Record<string, unknown>): void;
2525
error(message: string, properties?: Record<string, unknown>): void;
2626
trace<T>(name: string, fn: (span: Span) => Promise<T>, options?: SpanOptions): Promise<T>;
27+
startSpan(name: string, options?: SpanOptions): Span;
2728
}
2829

2930
export class OtelTaskLogger implements TaskLogger {
@@ -90,6 +91,10 @@ export class OtelTaskLogger implements TaskLogger {
9091
return this._config.tracer.startActiveSpan(name, fn, options);
9192
}
9293

94+
startSpan(name: string, options?: SpanOptions): Span {
95+
return this._config.tracer.startSpan(name, options);
96+
}
97+
9398
#getTimestampInHrTime(): ClockTime {
9499
return clock.preciseNow();
95100
}
@@ -104,6 +109,9 @@ export class NoopTaskLogger implements TaskLogger {
104109
trace<T>(name: string, fn: (span: Span) => Promise<T>): Promise<T> {
105110
return fn({} as Span);
106111
}
112+
startSpan(): Span {
113+
return {} as Span;
114+
}
107115
}
108116

109117
function safeJsonProcess(value?: Record<string, unknown>): Record<string, unknown> | undefined {

packages/core/src/v3/runMetadata/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { DeserializedJson } from "../../schemas/json.js";
2-
import { AsyncIterableStream } from "../apiClient/stream.js";
2+
import { AsyncIterableStream } from "../streams/asyncIterableStream.js";
33
import { getGlobal, registerGlobal } from "../utils/globals.js";
44
import { ApiRequestOptions } from "../zodfetch.js";
55
import { NoopRunMetadataManager } from "./noopManager.js";

packages/core/src/v3/runMetadata/manager.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
import { dequal } from "dequal/lite";
22
import { DeserializedJson } from "../../schemas/json.js";
33
import { ApiClient } from "../apiClient/index.js";
4-
import { AsyncIterableStream } from "../apiClient/stream.js";
54
import { FlushedRunMetadata, RunMetadataChangeOperation } from "../schemas/common.js";
65
import { ApiRequestOptions } from "../zodfetch.js";
76
import { MetadataStream } from "./metadataStream.js";
87
import { applyMetadataOperations } from "./operations.js";
98
import { RunMetadataManager, RunMetadataUpdater } from "./types.js";
9+
import { AsyncIterableStream } from "../streams/asyncIterableStream.js";
1010

1111
const MAXIMUM_ACTIVE_STREAMS = 5;
1212
const MAXIMUM_TOTAL_STREAMS = 10;

packages/core/src/v3/runMetadata/noopManager.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { DeserializedJson } from "../../schemas/json.js";
2-
import { AsyncIterableStream } from "../apiClient/stream.js";
2+
import { AsyncIterableStream } from "../streams/asyncIterableStream.js";
33
import { ApiRequestOptions } from "../zodfetch.js";
44
import type { RunMetadataManager, RunMetadataUpdater } from "./types.js";
55

packages/core/src/v3/runMetadata/types.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { DeserializedJson } from "../../schemas/json.js";
2-
import { AsyncIterableStream } from "../apiClient/stream.js";
2+
import { AsyncIterableStream } from "../streams/asyncIterableStream.js";
33
import { ApiRequestOptions } from "../zodfetch.js";
44

55
export interface RunMetadataUpdater {

0 commit comments

Comments
 (0)