Skip to content

Commit 533ee9a

Browse files
authored
Merge pull request #6 from browser-use/feat/structured
feat: fix stream and structured on stream
2 parents f730bbb + 1cd2157 commit 533ee9a

File tree

4 files changed

+98
-87
lines changed

4 files changed

+98
-87
lines changed

examples/stream-zod.ts

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
#!/usr/bin/env -S npm run tsn -T
2+
3+
import { BrowserUse } from 'browser-use-sdk';
4+
import z from 'zod';
5+
6+
const HackerNewsResponse = z.object({
7+
title: z.string(),
8+
url: z.string(),
9+
score: z.number(),
10+
});
11+
12+
const TaskOutput = z.object({
13+
posts: z.array(HackerNewsResponse),
14+
});
15+
16+
async function main() {
17+
// gets API Key from environment variable BROWSER_USE_API_KEY
18+
const browseruse = new BrowserUse();
19+
20+
console.log('Creating task and starting stream...\n');
21+
22+
// Create a task and get the stream
23+
const stream = browseruse.tasks.stream({
24+
task: 'Extract top 10 Hacker News posts and return the title, url, and score',
25+
structuredOutputJson: TaskOutput,
26+
});
27+
28+
for await (const msg of stream) {
29+
// Regular
30+
process.stdout.write(`${msg.data.status}`);
31+
if (msg.data.sessionLiveUrl) {
32+
process.stdout.write(` | Live URL: ${msg.data.sessionLiveUrl}`);
33+
}
34+
35+
if (msg.data.steps.length > 0) {
36+
const latestStep = msg.data.steps[msg.data.steps.length - 1];
37+
process.stdout.write(` | ${latestStep!.nextGoal}`);
38+
}
39+
40+
process.stdout.write('\n');
41+
42+
// Output
43+
if (msg.data.status === 'finished') {
44+
process.stdout.write(`\n\nOUTPUT:`);
45+
46+
for (const post of msg.data.doneOutput!.posts) {
47+
process.stdout.write(`\n - ${post.title} (${post.score}) ${post.url}`);
48+
}
49+
}
50+
}
51+
52+
console.log('\nStream completed');
53+
}
54+
55+
main().catch(console.error);

examples/stream.ts

Lines changed: 6 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -6,69 +6,18 @@ async function main() {
66
// gets API Key from environment variable BROWSER_USE_API_KEY
77
const browseruse = new BrowserUse();
88

9-
console.log('Creating task and starting stream...\n');
9+
console.log('Creating task and starting stream...');
1010

1111
// Create a task and get the stream
12-
const stream = browseruse.tasks.stream({
12+
const gen = browseruse.tasks.stream({
1313
task: 'What is the weather in San Francisco?',
1414
});
1515

16-
// Get a reader from the stream
17-
const reader = stream.getReader();
18-
const decoder = new TextDecoder();
19-
20-
try {
21-
// Read the stream chunk by chunk
22-
while (true) {
23-
const { done, value } = await reader.read();
24-
25-
if (done) {
26-
console.log('\nStream completed');
27-
break;
28-
}
29-
30-
// Decode the chunk and parse the Server-Sent Events format
31-
const chunk = decoder.decode(value, { stream: true });
32-
const lines = chunk.split('\n');
33-
34-
for (const line of lines) {
35-
if (line.startsWith('event: ')) {
36-
const event = line.slice(7);
37-
process.stdout.write(`\n[${event}] `);
38-
} else if (line.startsWith('data: ')) {
39-
const data = line.slice(6);
40-
if (data.trim() && data !== '{}') {
41-
try {
42-
const parsed = JSON.parse(data) as BrowserUse.TaskView;
43-
44-
process.stdout.write(`${parsed.status}`);
45-
if (parsed.sessionLiveUrl) {
46-
process.stdout.write(` | Live URL: ${parsed.sessionLiveUrl}`);
47-
}
48-
49-
if (parsed.steps.length > 0) {
50-
const latestStep = parsed.steps[parsed.steps.length - 1];
51-
process.stdout.write(` | ${latestStep!.nextGoal}`);
52-
}
53-
54-
if (parsed.status === 'finished') {
55-
process.stdout.write(`\n\nOUTPUT: ${parsed.doneOutput}`);
56-
// Close the reader and exit the main loop when task is finished
57-
reader.releaseLock();
58-
return;
59-
}
60-
} catch (e) {
61-
process.stdout.write(`Raw data: ${data}`);
62-
}
63-
}
64-
}
65-
}
66-
}
67-
} catch (error) {
68-
console.error('Error reading stream:', error);
69-
} finally {
70-
reader.releaseLock();
16+
for await (const msg of gen) {
17+
console.log(msg);
7118
}
19+
20+
console.log('\nStream completed');
7221
}
7322

7423
main().catch(console.error);
File renamed without changes.

src/resources/tasks.ts

Lines changed: 37 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -131,38 +131,45 @@ export class Tasks extends APIResource {
131131
} while (true);
132132
}
133133

134-
stream(body: TaskCreateParams, options?: RequestOptions) {
135-
const self = this;
136-
137-
const enc = new TextEncoder();
138-
139-
const stream = new ReadableStream<Uint8Array>({
140-
async start(controller) {
141-
// open the SSE stream quickly
142-
controller.enqueue(enc.encode(': connected\n\n'));
143-
144-
try {
145-
for await (const msg of self.watch(body, { interval: 500 }, options)) {
146-
if (options?.signal?.aborted) {
147-
break;
148-
}
149-
150-
const data = JSON.stringify(msg.data);
151-
152-
const payload = `event: ${msg.event}\ndata: ${data}\n\n`;
153-
controller.enqueue(enc.encode(payload));
154-
}
134+
stream<T extends ZodType>(
135+
body: TaskCreateParamsWithSchema<T>,
136+
options?: RequestOptions,
137+
): AsyncGenerator<{ event: 'status'; data: TaskViewWithSchema<T> }>;
138+
stream(
139+
body: TaskCreateParams,
140+
options?: RequestOptions,
141+
): AsyncGenerator<{ event: 'status'; data: TaskView }>;
142+
async *stream(
143+
body: TaskCreateParams | TaskCreateParamsWithSchema<ZodType>,
144+
options?: RequestOptions,
145+
): AsyncGenerator<unknown> {
146+
let req: TaskCreateParams;
147+
148+
if (
149+
'structuredOutputJson' in body &&
150+
body.structuredOutputJson != null &&
151+
typeof body.structuredOutputJson === 'object'
152+
) {
153+
req = {
154+
...body,
155+
structuredOutputJson: stringifyStructuredOutput(body.structuredOutputJson),
156+
};
157+
} else {
158+
req = body as TaskCreateParams;
159+
}
155160

156-
controller.enqueue(enc.encode('event: end\ndata: {}\n\n'));
157-
} catch (e) {
158-
controller.enqueue(enc.encode(`event: error\ndata: ${JSON.stringify({ message: String(e) })}\n\n`));
159-
} finally {
160-
controller.close();
161-
}
162-
},
163-
});
161+
for await (const msg of this.watch(req, { interval: 500 }, options)) {
162+
if (options?.signal?.aborted) {
163+
break;
164+
}
164165

165-
return stream;
166+
if (body.structuredOutputJson != null && typeof body.structuredOutputJson === 'object') {
167+
const parsed = parseStructuredTaskOutput<ZodType>(msg.data, body.structuredOutputJson);
168+
yield { event: 'status', data: parsed };
169+
} else {
170+
yield { event: 'status', data: msg.data };
171+
}
172+
}
166173
}
167174

168175
/**

0 commit comments

Comments
 (0)