Skip to content

Commit 56de3f6

Browse files
committed
add stream
1 parent 4201564 commit 56de3f6

File tree

4 files changed

+295
-3
lines changed

4 files changed

+295
-3
lines changed

examples/stream.ts

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
#!/usr/bin/env -S npm run tsn -T
2+
3+
import { BrowserUse } from 'browser-use-sdk';
4+
5+
async function main() {
6+
// gets API Key from environment variable BROWSER_USE_API_KEY
7+
const browseruse = new BrowserUse();
8+
9+
console.log('Creating task and starting stream...\n');
10+
11+
// Create a task and get the stream
12+
const stream = browseruse.tasks.stream({
13+
task: 'What is the weather in San Francisco?',
14+
});
15+
16+
// Get a reader from the stream
17+
const reader = stream.getReader();
18+
const decoder = new TextDecoder();
19+
20+
try {
21+
// Read the stream chunk by chunk
22+
while (true) {
23+
const { done, value } = await reader.read();
24+
25+
if (done) {
26+
console.log('\nStream completed');
27+
break;
28+
}
29+
30+
// Decode the chunk and parse the Server-Sent Events format
31+
const chunk = decoder.decode(value, { stream: true });
32+
const lines = chunk.split('\n');
33+
34+
for (const line of lines) {
35+
if (line.startsWith('event: ')) {
36+
const event = line.slice(7);
37+
process.stdout.write(`\n[${event}] `);
38+
} else if (line.startsWith('data: ')) {
39+
const data = line.slice(6);
40+
if (data.trim() && data !== '{}') {
41+
try {
42+
const parsed = JSON.parse(data) as BrowserUse.TaskView;
43+
44+
process.stdout.write(`${parsed.status}`);
45+
if (parsed.sessionLiveUrl) {
46+
process.stdout.write(` | Live URL: ${parsed.sessionLiveUrl}`);
47+
}
48+
49+
if (parsed.steps.length > 0) {
50+
const latestStep = parsed.steps[parsed.steps.length - 1];
51+
process.stdout.write(` | ${latestStep!.nextGoal}`);
52+
}
53+
54+
if (parsed.status === 'finished') {
55+
process.stdout.write(`\n\nOUTPUT: ${parsed.doneOutput}`);
56+
// Close the reader and exit the main loop when task is finished
57+
reader.releaseLock();
58+
return;
59+
}
60+
} catch (e) {
61+
process.stdout.write(`Raw data: ${data}`);
62+
}
63+
}
64+
}
65+
}
66+
}
67+
} catch (error) {
68+
console.error('Error reading stream:', error);
69+
} finally {
70+
reader.releaseLock();
71+
}
72+
}
73+
74+
main().catch(console.error);

src/lib/stream.ts

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
import type { TaskView, TaskStepView } from '../resources/tasks';
2+
import { ExhaustiveSwitchCheck } from './types';
3+
4+
export type ReducerEvent = TaskView | null;
5+
6+
export type BrowserState = Readonly<{
7+
taskId: string;
8+
sessionId: string;
9+
10+
liveUrl: string | null;
11+
12+
steps: ReadonlyArray<TaskStepView>;
13+
}> | null;
14+
15+
type BrowserAction = {
16+
kind: 'status';
17+
status: TaskView;
18+
};
19+
20+
export function reducer(state: BrowserState, action: BrowserAction): [BrowserState, ReducerEvent] {
21+
switch (action.kind) {
22+
case 'status': {
23+
// INIT
24+
25+
if (state == null) {
26+
const liveUrl = action.status.sessionLiveUrl ?? null;
27+
28+
const state: BrowserState = {
29+
taskId: action.status.id,
30+
sessionId: action.status.sessionId,
31+
liveUrl: liveUrl,
32+
steps: action.status.steps,
33+
};
34+
35+
return [state, action.status];
36+
}
37+
38+
// UPDATE
39+
40+
const liveUrl = action.status.sessionLiveUrl ?? null;
41+
const steps: TaskStepView[] = [...state.steps];
42+
43+
if (action.status.steps != null) {
44+
const newSteps = action.status.steps.slice(state.steps.length);
45+
46+
for (const step of newSteps) {
47+
steps.push(step);
48+
}
49+
}
50+
51+
const newState: BrowserState = { ...state, liveUrl, steps };
52+
53+
// CHANGES
54+
55+
if ((state.liveUrl == null && liveUrl != null) || state.steps.length !== steps.length) {
56+
const update: ReducerEvent = {
57+
...action.status,
58+
steps: steps,
59+
sessionLiveUrl: liveUrl,
60+
};
61+
62+
return [newState, update];
63+
}
64+
65+
return [newState, null];
66+
}
67+
default:
68+
throw new ExhaustiveSwitchCheck(action.kind);
69+
}
70+
}

src/lib/types.ts

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/**
2+
* Distributive Pick - does not collapse unions into a "shared type" only to
3+
* run Pick on it. Instead, it "picks" from each union item separately.
4+
*
5+
* See https://github.com/klimashkin/css-modules-theme/pull/8
6+
*
7+
* Example:
8+
* Pick<{ type: "pick" } | { type: "omit" }, "type">
9+
* produces { type: "pick" | "omit" }
10+
*
11+
* UnionPick<{ type: "pick" } | { type: "omit" }, "type">
12+
* produces { type: "pick" } | { type: "omit" }
13+
*/
14+
export type UnionPick<T, K extends keyof T> = T extends unknown ? Pick<T, K> : never;
15+
16+
/**
17+
* Like UnionPick, but for Omit
18+
*/
19+
export type UnionOmit<T, K extends keyof T> = T extends unknown ? Omit<T, K> : never;
20+
21+
/**
22+
* Utility type for properties that may be undefined until loaded.
23+
*/
24+
export type Loadable<T> = ({ loading: true } & { [K in keyof T]?: never }) | ({ loading: false } & T);
25+
26+
/**
27+
* Utility type that removes null fields from a type.
28+
*/
29+
export type DeepRequired<T> = {
30+
[P in keyof T]: Exclude<T[P], null>;
31+
};
32+
33+
/**
34+
* Makes a type check that is only valid when all cases of a switch
35+
* statement have been convered.
36+
*/
37+
export class ExhaustiveSwitchCheck extends Error {
38+
constructor(val: never) {
39+
super(`Unreachable case: ${JSON.stringify(val)}`);
40+
}
41+
}
42+
43+
/**
44+
* A utiliy type that lets you extract a union member by its `kind` property.
45+
*
46+
* @example
47+
*
48+
* type Shape =
49+
* | { kind: 'circle'; radius: number }
50+
* | { kind: 'square'; sideLength: number }
51+
* | { kind: 'rectangle'; width: number; height: number };
52+
*
53+
* type Circle = ExtractKind<Shape, 'circle'>; // { kind: 'circle'; radius: number }
54+
*/
55+
export type ExtractKind<T, K> = T extends { kind: K } ? T : never;
56+
57+
/**
58+
* A utiliy type that lets you extract a union member by its `ok` property.
59+
*
60+
* @example
61+
*
62+
* type Result = { ok: true; value: string } | { ok: false; error: string };
63+
*
64+
* type Ok = ExtractResult<Result, true>; // { ok: true; value: string }
65+
*/
66+
export type ExtractResult<T, K extends boolean> = T extends { ok: K } ? T : never;
67+
68+
/**
69+
* Creates a deep readonly object mutable.
70+
*/
71+
export type DeepMutable<T> = {
72+
-readonly [P in keyof T]: T[P] extends object ? DeepMutable<T[P]> : T[P];
73+
};

src/resources/tasks.ts

Lines changed: 78 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,18 @@
22

33
import type { ZodType } from 'zod';
44

5-
import { APIResource } from '../core/resource';
6-
import * as TasksAPI from './tasks';
75
import { APIPromise } from '../core/api-promise';
6+
import { APIResource } from '../core/resource';
87
import { RequestOptions } from '../internal/request-options';
98
import { path } from '../internal/utils/path';
109
import {
1110
parseStructuredTaskOutput,
1211
stringifyStructuredOutput,
13-
type TaskViewWithSchema,
1412
type TaskCreateParamsWithSchema,
13+
type TaskViewWithSchema,
1514
} from '../lib/parse';
15+
import { BrowserState, reducer } from '../lib/stream';
16+
import * as TasksAPI from './tasks';
1617

1718
export class Tasks extends APIResource {
1819
/**
@@ -90,6 +91,80 @@ export class Tasks extends APIResource {
9091
return this._client.post('/tasks', { body, ...options });
9192
}
9293

94+
private async *watch(
95+
data: TaskCreateParams,
96+
config: { interval: number },
97+
options?: RequestOptions,
98+
): AsyncGenerator<{ event: 'status'; data: TaskView }> {
99+
const tick: { current: number } = { current: 0 };
100+
const state: { current: BrowserState } = { current: null };
101+
102+
poll: do {
103+
if (options?.signal?.aborted) {
104+
break poll;
105+
}
106+
107+
tick.current++;
108+
109+
let status: TaskView;
110+
111+
// NOTE: We take action on each tick.
112+
if (state.current == null) {
113+
status = await this.create(data, options);
114+
} else {
115+
status = await this.retrieve(state.current.taskId);
116+
}
117+
118+
const [newState, event] = reducer(state.current, { kind: 'status', status });
119+
120+
if (event != null) {
121+
yield { event: 'status', data: event };
122+
123+
if (event.status === 'finished') {
124+
break;
125+
}
126+
}
127+
128+
state.current = newState;
129+
130+
await new Promise((resolve) => setTimeout(resolve, config.interval));
131+
} while (true);
132+
}
133+
134+
stream(body: TaskCreateParams, options?: RequestOptions) {
135+
const self = this;
136+
137+
const enc = new TextEncoder();
138+
139+
const stream = new ReadableStream<Uint8Array>({
140+
async start(controller) {
141+
// open the SSE stream quickly
142+
controller.enqueue(enc.encode(': connected\n\n'));
143+
144+
try {
145+
for await (const msg of self.watch(body, { interval: 500 }, options)) {
146+
if (options?.signal?.aborted) {
147+
break;
148+
}
149+
150+
const data = JSON.stringify(msg.data);
151+
152+
const payload = `event: ${msg.event}\ndata: ${data}\n\n`;
153+
controller.enqueue(enc.encode(payload));
154+
}
155+
156+
controller.enqueue(enc.encode('event: end\ndata: {}\n\n'));
157+
} catch (e) {
158+
controller.enqueue(enc.encode(`event: error\ndata: ${JSON.stringify({ message: String(e) })}\n\n`));
159+
} finally {
160+
controller.close();
161+
}
162+
},
163+
});
164+
165+
return stream;
166+
}
167+
93168
/**
94169
* Get detailed information about a specific AI agent task.
95170
*

0 commit comments

Comments
 (0)