Skip to content

Commit 063894a

Browse files
authored
Merge pull request #2 from datastax/streaming
Implements easy handling of streaming
2 parents 0983088 + 24a4480 commit 063894a

File tree

9 files changed

+482
-33
lines changed

9 files changed

+482
-33
lines changed

README.md

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ This package provides an easy way to use the [Langflow API](https://docs.datasta
1414
- [Initialization](#initialization)
1515
- [Calling a flow](#calling-a-flow)
1616
- [Flow reponses](#flow-reponses)
17+
- [Streaming](#streaming)
1718
- [File upload](#file-upload)
1819
- [Aborting requests](#aborting-requests)
1920
- [Contributing](#contributing)
@@ -142,6 +143,48 @@ const response = client.flow(flowId).run(input);
142143
console.log(response.chatOutputText());
143144
```
144145

146+
### Streaming
147+
148+
The Langflow API supports streaming responses. Instead of calling `run` on a `Flow` object, you can call `stream` with the same arguments and the response will be a [`ReadableStream`](https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream) of objects.
149+
150+
```js
151+
const response = await client.flow(flowId).stream(input);
152+
153+
for await (const event of response) {
154+
console.log(event);
155+
}
156+
```
157+
158+
There are three different events: `add_message`, `token`, and `end`. The events mean:
159+
160+
- `add_message`: a message being added to the chat and can refer to a human input message or a response from an AI
161+
- `token`: a token that is emitted as part of a message being generated by the flow
162+
- `end`: all tokens have been returned, this message will also contain a full `FlowResponse`
163+
164+
Event objects have the format:
165+
166+
```typescript
167+
{
168+
event: "add_message" | "token" | "end",
169+
data: object
170+
}
171+
```
172+
173+
The `event.data` is different per event type. The `token` event type is the simplest and looks like this:
174+
175+
```typescript
176+
{
177+
"event": "token",
178+
"data": {
179+
"chunk": "hello ",
180+
"id": "6686ff20-0c95-40bb-8879-fd90ed3d634e",
181+
"timestamp": "2025-02-12 22:18:04 UTC"
182+
}
183+
}
184+
```
185+
186+
There's more [documentation and examples of a streaming response in the Langflow docs](https://docs.langflow.org/api-reference-api-examples#run-flow).
187+
145188
### File upload
146189

147190
Chat input components support files as input as well as text. You need to upload your file first, using the file upload function, then provide the file path to the flow as a tweak.

src/errors.ts

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,21 +21,11 @@ export class LangflowError extends Error {
2121
super(message, { cause: response });
2222
this.cause = response;
2323
}
24-
25-
response(): Response {
26-
return this.cause;
27-
}
2824
}
2925

3026
export class LangflowRequestError extends Error {
31-
cause: Error;
32-
3327
constructor(message: string, error: Error) {
3428
super(message, { cause: error });
3529
this.cause = error;
3630
}
37-
38-
error(): Error {
39-
return this.cause;
40-
}
4131
}

src/flow.ts

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,37 @@ export class Flow {
7878
return new FlowResponse(result);
7979
}
8080

81+
async stream(
82+
input_value: string,
83+
options: Partial<Omit<FlowRequestOptions, "input_value">> = {}
84+
) {
85+
const {
86+
input_type = "chat",
87+
output_type = "chat",
88+
session_id,
89+
signal,
90+
} = options;
91+
const tweaks = { ...this.tweaks, ...options.tweaks };
92+
const body = JSON.stringify({
93+
input_type,
94+
output_type,
95+
input_value,
96+
tweaks,
97+
session_id,
98+
});
99+
const headers = new Headers();
100+
headers.set("Content-Type", "application/json");
101+
headers.set("Accept", "application/json");
102+
const streamingResult = await this.client.stream({
103+
path: `/run/${this.id}`,
104+
method: "POST",
105+
body,
106+
headers,
107+
signal,
108+
});
109+
return streamingResult;
110+
}
111+
81112
async uploadFile(path: string, options: { signal?: AbortSignal } = {}) {
82113
const data = await readFile(path);
83114
const { signal } = options;

src/index.ts

Lines changed: 61 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,21 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
import { fetch, FormData } from "undici";
15+
import { fetch } from "undici";
1616

1717
import pkg from "../package.json" with { type: "json" };
1818
import { LangflowError, LangflowRequestError } from "./errors.js";
1919
import { Flow } from "./flow.js";
20-
import type { LangflowClientOptions, Tweaks } from "./types.js";
20+
import type {
21+
LangflowClientOptions,
22+
RequestOptions,
23+
StreamEvent,
24+
Tweaks,
25+
} from "./types.js";
2126
import { DATASTAX_LANGFLOW_BASE_URL } from "./consts.js";
2227

2328
import { platform, arch } from "os";
29+
import { NDJSONStream } from "./ndjson.js";
2430

2531
export class LangflowClient {
2632
baseUrl: string;
@@ -75,33 +81,25 @@ export class LangflowClient {
7581
}
7682
}
7783

78-
flow(flowId: string, tweaks?: Tweaks): Flow {
79-
return new Flow(this, flowId, tweaks);
80-
}
81-
82-
async request(options: {
83-
path: string;
84-
method: string;
85-
body: string | FormData;
86-
headers: Headers;
87-
query?: Record<string, string>;
88-
signal?: AbortSignal;
89-
}): Promise<unknown> {
90-
const { path, method, body, headers, query, signal } = options;
91-
const url = new URL(`${this.baseUrl}${this.basePath}${path}`);
84+
#setHeaders(headers: Headers) {
9285
for (const [header, value] of this.defaultHeaders.entries()) {
9386
if (!headers.has(header)) {
9487
headers.set(header, value);
9588
}
9689
}
97-
if (query) {
98-
Object.entries(query).forEach(([key, value]) => {
99-
url.searchParams.set(key, value);
100-
});
101-
}
10290
if (this.apiKey) {
10391
this.#setApiKey(this.apiKey, headers);
10492
}
93+
}
94+
95+
flow(flowId: string, tweaks?: Tweaks): Flow {
96+
return new Flow(this, flowId, tweaks);
97+
}
98+
99+
async request(options: RequestOptions): Promise<unknown> {
100+
const { path, method, body, headers, signal } = options;
101+
const url = new URL(`${this.baseUrl}${this.basePath}${path}`);
102+
this.#setHeaders(headers);
105103
try {
106104
signal?.throwIfAborted();
107105
const response = await this.fetch(url, { method, body, headers, signal });
@@ -128,4 +126,46 @@ export class LangflowClient {
128126
throw error;
129127
}
130128
}
129+
130+
async stream(options: RequestOptions): Promise<ReadableStream<StreamEvent>> {
131+
const { path, method, body, headers, signal } = options;
132+
const url = new URL(`${this.baseUrl}${this.basePath}${path}`);
133+
url.searchParams.set("stream", "true");
134+
this.#setHeaders(headers);
135+
try {
136+
signal?.throwIfAborted();
137+
const response = await this.fetch(url, {
138+
method,
139+
body,
140+
headers,
141+
signal,
142+
});
143+
if (!response.ok) {
144+
throw new LangflowError(
145+
`${response.status} - ${response.statusText}`,
146+
response
147+
);
148+
}
149+
const ndjsonStream = NDJSONStream<StreamEvent>();
150+
if (response.body) {
151+
return response.body
152+
.pipeThrough(new TextDecoderStream(), { signal })
153+
.pipeThrough(ndjsonStream, { signal });
154+
} else {
155+
throw new LangflowError("No body in the response", response);
156+
}
157+
} catch (error) {
158+
if (
159+
error instanceof LangflowError ||
160+
(error instanceof DOMException &&
161+
(error.name === "AbortError" || error.name === "TimeoutError"))
162+
) {
163+
throw error;
164+
}
165+
if (error instanceof Error) {
166+
throw new LangflowRequestError(error.message, error);
167+
}
168+
throw error;
169+
}
170+
}
131171
}

src/ndjson.ts

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
// Copyright DataStax, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
import { QueuingStrategy } from "node:stream/web";
16+
17+
export function NDJSONStream<T>(
18+
writableStrategy?: QueuingStrategy<string>,
19+
readableStrategy?: QueuingStrategy<T>
20+
) {
21+
let buffer = "";
22+
return new TransformStream<string, T>(
23+
{
24+
transform(chunk, controller) {
25+
const text = chunk.toString();
26+
const lines = text.split("\n");
27+
for (const line of lines) {
28+
buffer += line;
29+
try {
30+
const result = JSON.parse(buffer);
31+
controller.enqueue(result);
32+
buffer = "";
33+
} catch {
34+
// Not a valid JSON object
35+
}
36+
}
37+
},
38+
},
39+
writableStrategy,
40+
readableStrategy
41+
);
42+
}

0 commit comments

Comments
 (0)