diff --git a/examples/stream-zod.ts b/examples/stream-zod.ts new file mode 100755 index 0000000..9867523 --- /dev/null +++ b/examples/stream-zod.ts @@ -0,0 +1,55 @@ +#!/usr/bin/env -S npm run tsn -T + +import { BrowserUse } from 'browser-use-sdk'; +import z from 'zod'; + +const HackerNewsResponse = z.object({ + title: z.string(), + url: z.string(), + score: z.number(), +}); + +const TaskOutput = z.object({ + posts: z.array(HackerNewsResponse), +}); + +async function main() { + // gets API Key from environment variable BROWSER_USE_API_KEY + const browseruse = new BrowserUse(); + + console.log('Creating task and starting stream...\n'); + + // Create a task and get the stream + const stream = browseruse.tasks.stream({ + task: 'Extract top 10 Hacker News posts and return the title, url, and score', + structuredOutputJson: TaskOutput, + }); + + for await (const msg of stream) { + // Regular + process.stdout.write(`${msg.data.status}`); + if (msg.data.sessionLiveUrl) { + process.stdout.write(` | Live URL: ${msg.data.sessionLiveUrl}`); + } + + if (msg.data.steps.length > 0) { + const latestStep = msg.data.steps[msg.data.steps.length - 1]; + process.stdout.write(` | ${latestStep!.nextGoal}`); + } + + process.stdout.write('\n'); + + // Output + if (msg.data.status === 'finished') { + process.stdout.write(`\n\nOUTPUT:`); + + for (const post of msg.data.doneOutput!.posts) { + process.stdout.write(`\n - ${post.title} (${post.score}) ${post.url}`); + } + } + } + + console.log('\nStream completed'); +} + +main().catch(console.error); diff --git a/examples/stream.ts b/examples/stream.ts index d04fbfb..6c38fad 100755 --- a/examples/stream.ts +++ b/examples/stream.ts @@ -6,69 +6,18 @@ async function main() { // gets API Key from environment variable BROWSER_USE_API_KEY const browseruse = new BrowserUse(); - console.log('Creating task and starting stream...\n'); + console.log('Creating task and starting stream...'); // Create a task and get the stream - const stream = browseruse.tasks.stream({ + const gen = browseruse.tasks.stream({ task: 'What is the weather in San Francisco?', }); - // Get a reader from the stream - const reader = stream.getReader(); - const decoder = new TextDecoder(); - - try { - // Read the stream chunk by chunk - while (true) { - const { done, value } = await reader.read(); - - if (done) { - console.log('\nStream completed'); - break; - } - - // Decode the chunk and parse the Server-Sent Events format - const chunk = decoder.decode(value, { stream: true }); - const lines = chunk.split('\n'); - - for (const line of lines) { - if (line.startsWith('event: ')) { - const event = line.slice(7); - process.stdout.write(`\n[${event}] `); - } else if (line.startsWith('data: ')) { - const data = line.slice(6); - if (data.trim() && data !== '{}') { - try { - const parsed = JSON.parse(data) as BrowserUse.TaskView; - - process.stdout.write(`${parsed.status}`); - if (parsed.sessionLiveUrl) { - process.stdout.write(` | Live URL: ${parsed.sessionLiveUrl}`); - } - - if (parsed.steps.length > 0) { - const latestStep = parsed.steps[parsed.steps.length - 1]; - process.stdout.write(` | ${latestStep!.nextGoal}`); - } - - if (parsed.status === 'finished') { - process.stdout.write(`\n\nOUTPUT: ${parsed.doneOutput}`); - // Close the reader and exit the main loop when task is finished - reader.releaseLock(); - return; - } - } catch (e) { - process.stdout.write(`Raw data: ${data}`); - } - } - } - } - } - } catch (error) { - console.error('Error reading stream:', error); - } finally { - reader.releaseLock(); + for await (const msg of gen) { + console.log(msg); } + + console.log('\nStream completed'); } main().catch(console.error); diff --git a/examples/structured-output.ts b/examples/zod.ts similarity index 100% rename from examples/structured-output.ts rename to examples/zod.ts diff --git a/src/resources/tasks.ts b/src/resources/tasks.ts index 02d7c36..9b3e6ea 100644 --- a/src/resources/tasks.ts +++ b/src/resources/tasks.ts @@ -131,38 +131,45 @@ export class Tasks extends APIResource { } while (true); } - stream(body: TaskCreateParams, options?: RequestOptions) { - const self = this; - - const enc = new TextEncoder(); - - const stream = new ReadableStream({ - async start(controller) { - // open the SSE stream quickly - controller.enqueue(enc.encode(': connected\n\n')); - - try { - for await (const msg of self.watch(body, { interval: 500 }, options)) { - if (options?.signal?.aborted) { - break; - } - - const data = JSON.stringify(msg.data); - - const payload = `event: ${msg.event}\ndata: ${data}\n\n`; - controller.enqueue(enc.encode(payload)); - } + stream( + body: TaskCreateParamsWithSchema, + options?: RequestOptions, + ): AsyncGenerator<{ event: 'status'; data: TaskViewWithSchema }>; + stream( + body: TaskCreateParams, + options?: RequestOptions, + ): AsyncGenerator<{ event: 'status'; data: TaskView }>; + async *stream( + body: TaskCreateParams | TaskCreateParamsWithSchema, + options?: RequestOptions, + ): AsyncGenerator { + let req: TaskCreateParams; + + if ( + 'structuredOutputJson' in body && + body.structuredOutputJson != null && + typeof body.structuredOutputJson === 'object' + ) { + req = { + ...body, + structuredOutputJson: stringifyStructuredOutput(body.structuredOutputJson), + }; + } else { + req = body as TaskCreateParams; + } - controller.enqueue(enc.encode('event: end\ndata: {}\n\n')); - } catch (e) { - controller.enqueue(enc.encode(`event: error\ndata: ${JSON.stringify({ message: String(e) })}\n\n`)); - } finally { - controller.close(); - } - }, - }); + for await (const msg of this.watch(req, { interval: 500 }, options)) { + if (options?.signal?.aborted) { + break; + } - return stream; + if (body.structuredOutputJson != null && typeof body.structuredOutputJson === 'object') { + const parsed = parseStructuredTaskOutput(msg.data, body.structuredOutputJson); + yield { event: 'status', data: parsed }; + } else { + yield { event: 'status', data: msg.data }; + } + } } /**