Skip to content

Commit 0983088

Browse files
authored
Merge pull request #1 from datastax/abort-signal
Adds ability to abort requests
2 parents 4f036aa + 4a059fb commit 0983088

File tree

5 files changed

+139
-70
lines changed

5 files changed

+139
-70
lines changed

README.md

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ This package provides an easy way to use the [Langflow API](https://docs.datasta
1515
- [Calling a flow](#calling-a-flow)
1616
- [Flow reponses](#flow-reponses)
1717
- [File upload](#file-upload)
18+
- [Aborting requests](#aborting-requests)
1819
- [Contributing](#contributing)
1920

2021
## Installation
@@ -158,6 +159,21 @@ const response = await flow.tweak("ChatInput-abcd": { files: file.filePath }).ru
158159
> [!WARNING]
159160
> DataStax Langflow doesn't make file upload available, you will receive a 501 Not Implemented error.
160161
162+
### Aborting requests
163+
164+
You can use the standard [`AbortController`](https://developer.mozilla.org/en-US/docs/Web/API/AbortController) to cancel requests by passing a `signal` to the `run` or `uploadFile` functions. The functions will reject with a `DOMException` error with the name `AbortError` or, if you use [`AbortSignal.timeout`](https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal/timeout_static), `TimeoutError`.
165+
166+
For example, when running the following code, if the entire request takes longer than 500ms, then the promise will reject and the error message will be, "The operation was aborted due to timeout".
167+
168+
```js
169+
const signal = AbortSignal.timeout(500);
170+
try {
171+
const response = await client.flow(flowId).run(input, { signal });
172+
} catch (error) {
173+
console.error(error.message);
174+
}
175+
```
176+
161177
## Contributing
162178

163179
To run and contribute to this library you can clone the repository:

src/flow.ts

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,12 @@ export class Flow {
5050
input_value: string,
5151
options: Partial<Omit<FlowRequestOptions, "input_value">> = {}
5252
) {
53-
const { input_type = "chat", output_type = "chat", session_id } = options;
53+
const {
54+
input_type = "chat",
55+
output_type = "chat",
56+
session_id,
57+
signal,
58+
} = options;
5459
const tweaks = { ...this.tweaks, ...options.tweaks };
5560
const body = JSON.stringify({
5661
input_type,
@@ -62,18 +67,20 @@ export class Flow {
6267
const headers = new Headers();
6368
headers.set("Content-Type", "application/json");
6469
headers.set("Accept", "application/json");
65-
const result = (await this.client.request(
66-
`/run/${this.id}`,
67-
"POST",
70+
const result = (await this.client.request({
71+
path: `/run/${this.id}`,
72+
method: "POST",
6873
body,
69-
headers
70-
)) as LangflowResponse;
74+
headers,
75+
signal,
76+
})) as LangflowResponse;
7177

7278
return new FlowResponse(result);
7379
}
7480

75-
async uploadFile(path: string) {
81+
async uploadFile(path: string, options: { signal?: AbortSignal } = {}) {
7682
const data = await readFile(path);
83+
const { signal } = options;
7784
const type = mime.getType(extname(path));
7885
const file = new File([data], basename(path), type ? { type } : {});
7986

@@ -83,12 +90,13 @@ export class Flow {
8390
const headers = new Headers();
8491
headers.set("Accept", "application/json");
8592

86-
const response = (await this.client.request(
87-
`/files/upload/${this.id}`,
88-
"POST",
89-
form,
90-
headers
91-
)) as LangflowUploadResponse;
93+
const response = (await this.client.request({
94+
path: `/files/upload/${this.id}`,
95+
method: "POST",
96+
body: form,
97+
headers,
98+
signal,
99+
})) as LangflowUploadResponse;
92100
return new UploadResponse(response);
93101
}
94102
}

src/index.ts

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -79,13 +79,15 @@ export class LangflowClient {
7979
return new Flow(this, flowId, tweaks);
8080
}
8181

82-
async request(
83-
path: string,
84-
method: string,
85-
body: string | FormData,
86-
headers: Headers,
87-
query?: Record<string, string>
88-
): Promise<unknown> {
82+
async request(options: {
83+
path: string;
84+
method: string;
85+
body: string | FormData;
86+
headers: Headers;
87+
query?: Record<string, string>;
88+
signal?: AbortSignal;
89+
}): Promise<unknown> {
90+
const { path, method, body, headers, query, signal } = options;
8991
const url = new URL(`${this.baseUrl}${this.basePath}${path}`);
9092
for (const [header, value] of this.defaultHeaders.entries()) {
9193
if (!headers.has(header)) {
@@ -101,16 +103,23 @@ export class LangflowClient {
101103
this.#setApiKey(this.apiKey, headers);
102104
}
103105
try {
104-
const response = await this.fetch(url, { method, body, headers });
106+
signal?.throwIfAborted();
107+
const response = await this.fetch(url, { method, body, headers, signal });
105108
if (!response.ok) {
106109
throw new LangflowError(
107110
`${response.status} - ${response.statusText}`,
108111
response
109112
);
110113
}
114+
signal?.throwIfAborted();
111115
return await response.json();
112116
} catch (error) {
113-
if (error instanceof LangflowError) {
117+
// If it is a LangflowError or the result of an aborted signal, rethrow it
118+
if (
119+
error instanceof LangflowError ||
120+
(error instanceof DOMException &&
121+
(error.name === "AbortError" || error.name === "TimeoutError"))
122+
) {
114123
throw error;
115124
}
116125
if (error instanceof Error) {

src/test/index.test.ts

Lines changed: 83 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -94,16 +94,16 @@ describe("LangflowClient", () => {
9494
apiKey,
9595
fetch: fetcher,
9696
});
97-
const response = await client.request(
98-
"/run/flow-id",
99-
"POST",
100-
JSON.stringify({
97+
const response = await client.request({
98+
path: "/run/flow-id",
99+
method: "POST",
100+
body: JSON.stringify({
101101
input_type: "chat",
102102
output_type: "chat",
103103
input_value: "Hello, world!",
104104
}),
105-
new Headers()
106-
);
105+
headers: new Headers(),
106+
});
107107
assert.deepEqual(response, { session_id: "session-id", outputs: [] });
108108
});
109109

@@ -122,16 +122,16 @@ describe("LangflowClient", () => {
122122
apiKey,
123123
fetch: fetcher,
124124
});
125-
await client.request(
126-
"/run/flow-id",
127-
"POST",
128-
JSON.stringify({
125+
await client.request({
126+
path: "/run/flow-id",
127+
method: "POST",
128+
body: JSON.stringify({
129129
input_type: "chat",
130130
output_type: "chat",
131131
input_value: "Hello, world!",
132132
}),
133-
new Headers()
134-
);
133+
headers: new Headers(),
134+
});
135135
});
136136

137137
it("includes a user agent in the headers", async () => {
@@ -154,16 +154,16 @@ describe("LangflowClient", () => {
154154
apiKey,
155155
fetch: fetcher,
156156
});
157-
await client.request(
158-
"/run/flow-id",
159-
"POST",
160-
JSON.stringify({
157+
await client.request({
158+
path: "/run/flow-id",
159+
method: "POST",
160+
body: JSON.stringify({
161161
input_type: "chat",
162162
output_type: "chat",
163163
input_value: "Hello, world!",
164164
}),
165-
new Headers()
166-
);
165+
headers: new Headers(),
166+
});
167167
});
168168

169169
it("throws a LangflowError if the response is not ok", async () => {
@@ -182,16 +182,16 @@ describe("LangflowClient", () => {
182182
});
183183

184184
try {
185-
await client.request(
186-
"/run/flow-id",
187-
"POST",
188-
JSON.stringify({
185+
await client.request({
186+
path: "/run/flow-id",
187+
method: "POST",
188+
body: JSON.stringify({
189189
input_type: "chat",
190190
output_type: "chat",
191191
input_value: "Hello, world!",
192192
}),
193-
new Headers()
194-
);
193+
headers: new Headers(),
194+
});
195195
assert.fail("Expected an error to be thrown");
196196
} catch (error) {
197197
assert.ok(error instanceof LangflowError);
@@ -214,22 +214,57 @@ describe("LangflowClient", () => {
214214
});
215215

216216
try {
217-
await client.request(
218-
"/run/flow-id",
219-
"POST",
220-
JSON.stringify({
217+
await client.request({
218+
path: "/run/flow-id",
219+
method: "POST",
220+
body: JSON.stringify({
221221
input_type: "chat",
222222
output_type: "chat",
223223
input_value: "Hello, world!",
224224
}),
225-
new Headers()
226-
);
225+
headers: new Headers(),
226+
});
227227
assert.fail("Expected an error to be thrown");
228228
} catch (error) {
229229
assert.ok(error instanceof LangflowRequestError);
230230
assert.equal(error.message, "Internal Server Error");
231231
}
232232
});
233+
234+
it("throws a DOMException AbortError if the request is aborted", async () => {
235+
const fetcher = createMockFetch(
236+
{ session_id: "session-id", outputs: [] },
237+
() => {
238+
assert.fail("Should not have made a request");
239+
}
240+
);
241+
242+
const client = new LangflowClient({
243+
baseUrl,
244+
langflowId,
245+
apiKey,
246+
fetch: fetcher,
247+
});
248+
const ac = new AbortController();
249+
ac.abort();
250+
try {
251+
await client.request({
252+
path: "/run/flow-id",
253+
method: "POST",
254+
body: JSON.stringify({
255+
input_type: "chat",
256+
output_type: "chat",
257+
input_value: "Hello, world!",
258+
}),
259+
headers: new Headers(),
260+
signal: ac.signal,
261+
});
262+
assert.fail("Expected an error to be thrown");
263+
} catch (error) {
264+
assert.ok(error instanceof DOMException);
265+
assert.equal(error.message, ac.signal.reason.message);
266+
}
267+
});
233268
});
234269
});
235270

@@ -268,16 +303,16 @@ describe("LangflowClient", () => {
268303
baseUrl,
269304
fetch: fetcher,
270305
});
271-
const response = await client.request(
272-
"/run/flow-id",
273-
"POST",
274-
JSON.stringify({
306+
const response = await client.request({
307+
path: "/run/flow-id",
308+
method: "POST",
309+
body: JSON.stringify({
275310
input_type: "chat",
276311
output_type: "chat",
277312
input_value: "Hello, world!",
278313
}),
279-
new Headers()
280-
);
314+
headers: new Headers(),
315+
});
281316
assert.deepEqual(response, { session_id: "session-id", outputs: [] });
282317
});
283318

@@ -295,16 +330,16 @@ describe("LangflowClient", () => {
295330
apiKey,
296331
fetch: fetcher,
297332
});
298-
await client.request(
299-
"/run/flow-id",
300-
"POST",
301-
JSON.stringify({
333+
await client.request({
334+
path: "/run/flow-id",
335+
method: "POST",
336+
body: JSON.stringify({
302337
input_type: "chat",
303338
output_type: "chat",
304339
input_value: "Hello, world!",
305340
}),
306-
new Headers()
307-
);
341+
headers: new Headers(),
342+
});
308343
});
309344

310345
it("throws a LangflowError if the response is not ok", async () => {
@@ -321,16 +356,16 @@ describe("LangflowClient", () => {
321356
});
322357

323358
try {
324-
await client.request(
325-
"/run/flow-id",
326-
"POST",
327-
JSON.stringify({
359+
await client.request({
360+
path: "/run/flow-id",
361+
method: "POST",
362+
body: JSON.stringify({
328363
input_type: "chat",
329364
output_type: "chat",
330365
input_value: "Hello, world!",
331366
}),
332-
new Headers()
333-
);
367+
headers: new Headers(),
368+
});
334369
assert.fail("Expected an error to be thrown");
335370
} catch (error) {
336371
assert.ok(error instanceof LangflowError);

src/types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ export interface FlowRequestOptions {
3333
input_value: string;
3434
tweaks?: Tweaks;
3535
session_id?: string;
36+
signal?: AbortSignal;
3637
}
3738

3839
export type Tweak = Record<string, string | number | null | boolean>;

0 commit comments

Comments
 (0)