Skip to content

Commit 4e4beb0

Browse files
committed
result streaming
1 parent 99fbde6 commit 4e4beb0

File tree

3 files changed

+302
-6
lines changed

3 files changed

+302
-6
lines changed
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
---
2+
slug: result-streaming
3+
version: v1.518.0
4+
title: Result streaming
5+
tags: ['streaming', 'result']
6+
description: >
7+
We introduce native result streaming in 1.518.0. Returns a stream (any `AsyncGenerator` or `iter` compatible object) as a result OR to stream text before the result is fully returned. It works with Typescript (Bun, Deno, Nativets), Python and is compatible with agent workers.
8+
If not returning the stream directly, we introduce 2 new functions on our SDK: `wmill.streamResult(stream)` (TS) and `wmill.stream_result(stream)` (Python), to do it mid-script.
9+
10+
It's made with LLM action in mind because many scripts nowadays interact with LLM that have streaming response that can be returned in a streaming manner as a result. We will progressively refactor all our relevant hub scripts that are compatible with streaming to leverage this new capability.
11+
12+
The stream only exists while the job is in the queue. Afterwards, the full stream becomes the result (or added as the field "wm_stream" if there is already a result).
13+
features:
14+
- return stream objects to stream directly results
15+
- compatible with LLM sdks to stream their response as-is
16+
- new SDK functions to stream within the job
17+
- once job is finished, full stream becomes the result
18+
docs: /docs/core_concepts/jobs#result-streaming
19+
---

docs/core_concepts/20_jobs/index.mdx

Lines changed: 279 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
import DocCard from '@site/src/components/DocCard';
22

3+
import Tabs from '@theme/Tabs';
4+
import TabItem from '@theme/TabItem';
5+
36
# Jobs
47

58
A job represents a past, present or future "task" or "work" to be executed by a
@@ -104,6 +107,137 @@ Jobs take a JSON object as input which can be empty. That input is passed as the
104107

105108
If the payload contains keys that are not defined as parameters in the main function, they will be ignored. This allows you to handle arbitrary JSON payloads, as you can choose which keys to define as parameters in your script and process the data accordingly.
106109

110+
## Result
111+
112+
Jobs have as result the return of the main function serialized as a json object. We highly recommend to return small objects as they will be stored directly in the Database. For larger objects, use [Object Storage](../38_object_storage_in_windmill/index.mdx).
113+
114+
### Result of jobs that failed
115+
116+
If the jobs fail, it will have result an error object of the following shape:
117+
118+
```
119+
{
120+
"error": {
121+
"name": "ErrorName",
122+
"message": "Error message",
123+
"stack": "full stack"
124+
}
125+
}
126+
```
127+
128+
In python and typescript, and similarly for all languages, this is constructed by extracting those information from the native Exception and Error objects that are raised by the code.
129+
130+
### Result streaming
131+
132+
In Python and Typescript (bun, nativets, deno), it's possible to stream back the result as a text stream (any `AsyncGenerator<string>` or `iter` compatible object) as a result OR to stream text before the result is fully returned.
133+
If not returning the stream directly, we introduce 2 new functions on our SDK: `wmill.streamResult(stream)` (TS) and `wmill.stream_result(stream)` (Python), to do it prior to the return.
134+
135+
The stream only exists while the job is in the queue. Afterwards, the full stream becomes the result (or added as the field "wm_stream" if there is already a result).
136+
137+
#### Returning a stream directly
138+
139+
<Tabs className="unique-tabs">
140+
<TabItem value="bun" label="TypeScript" attributes={{className: "text-xs p-4 !mt-0 !ml-0"}}>
141+
142+
```typescript
143+
// can work with //native and recommended
144+
145+
async function* streamWeatherReport(): AsyncGenerator<string> {
146+
const reportLines = [
147+
'📍 Current Weather Report\n',
148+
'Location: San Francisco, CA\n\n',
149+
'🌤️ Conditions: Partly Cloudy\n',
150+
'🌡️ Temperature: 72°F (22°C)\n',
151+
'💨 Wind: 8 mph SW\n',
152+
'💧 Humidity: 65%\n',
153+
'👁️ Visibility: 10 miles\n\n',
154+
"📊 Today's Forecast:\n",
155+
'Morning: Sunny, 68°F\n',
156+
'Afternoon: Partly cloudy, 75°F\n',
157+
'Evening: Clear skies, 70°F\n',
158+
'Night: Cool and clear, 62°F\n\n',
159+
'🔮 Tomorrow: Sunny with highs near 78°F\n',
160+
'Perfect weather for outdoor activities! ☀️\n'
161+
];
162+
163+
for (const line of reportLines) {
164+
yield line;
165+
// Sleep between 200-500ms for natural reading pace
166+
await new Promise((resolve) => setTimeout(resolve, 200 + Math.random() * 300));
167+
}
168+
}
169+
170+
export async function main(x: string) {
171+
return streamWeatherReport();
172+
}
173+
```
174+
175+
</TabItem>
176+
<TabItem value="python" label="Python" attributes={{className: "text-xs p-4 !mt-0 !ml-0"}}>
177+
178+
```python
179+
180+
import time
181+
from typing import AsyncGenerator
182+
183+
def stream_weather_report():
184+
report_lines = [
185+
"📍 Current Weather Report\n",
186+
"Location: San Francisco, CA\n\n",
187+
"🌤️ Conditions: Partly Cloudy\n",
188+
"🌡️ Temperature: 72°F (22°C)\n",
189+
"💨 Wind: 8 mph SW\n",
190+
"💧 Humidity: 65%\n",
191+
"👁️ Visibility: 10 miles\n\n",
192+
"📊 Today's Forecast:\n",
193+
"Morning: Sunny, 68°F\n",
194+
"Afternoon: Partly cloudy, 75°F\n",
195+
"Evening: Clear skies, 70°F\n",
196+
"Night: Cool and clear, 62°F\n\n",
197+
"🔮 Tomorrow: Sunny with highs near 78°F\n",
198+
"Perfect weather for outdoor activities! ☀️\n"
199+
]
200+
201+
for line in report_lines:
202+
yield line
203+
# Sleep 0.2s for reading peace
204+
time.sleep(0.2)
205+
206+
def main() -> AsyncGenerator[str, None]:
207+
return stream_weather_report()
208+
```
209+
210+
</TabItem>
211+
</Tabs>
212+
213+
#### Proxy the stream before returning the result
214+
215+
<Tabs className="unique-tabs">
216+
<TabItem value="bun" label="TypeScript" attributes={{className: "text-xs p-4 !mt-0 !ml-0"}}>
217+
218+
```typescript
219+
// similar as above
220+
221+
export async function main(x: string) {
222+
await wmill.streamResult(streamWeatherReport());
223+
return { foo: 42 };
224+
}
225+
```
226+
227+
</TabItem>
228+
<TabItem value="python" label="Python" attributes={{className: "text-xs p-4 !mt-0 !ml-0"}}>
229+
230+
```python
231+
# similar as above
232+
233+
def main() -> AsyncGenerator[str, None]:
234+
wmill.stream_result(stream_weather_report())
235+
return { "foo": 42}
236+
```
237+
238+
</TabItem>
239+
</Tabs>
240+
107241
## Retention policy
108242

109243
The retention policy for jobs runs details varies depending on your team's [plan](/pricing):
@@ -116,7 +250,7 @@ The retention policy for jobs runs details varies depending on your team's [plan
116250

117251
You can set a custom retention period for the jobs runs details. The retention period can be configured in the [instance settings](../../advanced/18_instance_settings/index.mdx#retention-period-in-secs), in the "Core" tab.
118252

119-
![Set Retention Period](./set_retention_policy.png "Set Retention Period")
253+
![Set Retention Period](./set_retention_policy.png 'Set Retention Period')
120254

121255
<div className="grid grid-cols-2 gap-6 mb-4">
122256
<DocCard
@@ -126,6 +260,148 @@ You can set a custom retention period for the jobs runs details. The retention p
126260
/>
127261
</div>
128262

263+
## Job Progress Stream API
264+
265+
This section is relevant if you want to use the API of Windmill to build your own client or frontend that watch the progress of a job, retrieve its new logs, result stream and get the job at completion with its result and all job metadata.
266+
267+
Windmill provides an [SSE endpoint](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events) to watch a given job for new logs or completion. The API endpoint is [api/job/get/w/\{workspace\}/jobs_u/getupdate_sse/\{id\}](https://app.windmill.dev/openapi.html#tag/job/get/w/{workspace}/jobs_u/getupdate_sse/{id})
268+
269+
It takes as query args:
270+
271+
### Job progress SSE Query args
272+
273+
``
274+
running (optional boolean, default false): was the job already running ? Optimization for the backend to not waste time checking if the job is running if we know it's already the case
275+
log_offset (optional integer, default 0): what was the last log offset known by the client to start from when streaming new logs
276+
stream_offset (optional integer, default 0): what was the last log offset known by the client to start from when streaming the result stream
277+
get_progress (optional integer, default false): should the updates contains [explicit job progress](/docs/advanced//19_explicit_progress/index.mdx)
278+
only_result (optional integer, default false): should we only care about the result and ignore all else
279+
no_logs (optionbal boolean, default false): should we skip streaming logs
280+
281+
````
282+
283+
### Job Progress Event Response
284+
285+
The response from the SSE endpoint is a stream of JSON objects, each with a `type` field indicating the kind of event. The possible shapes are:
286+
287+
- **Update**
288+
289+
The updates will only contain new updates since the last one, no update will be sent if there are no updates to share, otherwise the maximum frequency at which updates are sent is every 100ms.
290+
291+
with only_result = false (default):
292+
```json
293+
{
294+
"type": "update",
295+
"running": true, // optional, boolean: whether the job is running
296+
"completed": false, // optional, boolean: whether the job is completed
297+
"new_logs": "string", // optional, string: new logs since last update
298+
"new_result_stream": "string", // optional, string: new result stream data since last update
299+
"log_offset": 123, // optional, integer: current log offset
300+
"stream_offset": 456, // optional, integer: current result stream offset
301+
"mem_peak": 789, // optional, integer: peak memory usage
302+
"progress": 50, // optional, integer: explicit job progress (0-100)
303+
"flow_status": {...}, // optional, JSON: status of the flow (raw JSON)
304+
"workflow_as_code_status": {...}, // optional, JSON: status of workflow as code (raw JSON)
305+
"job": {...}, // optional, JSON: job metadata
306+
}
307+
````
308+
309+
Once completed, the job field will contain the full completed job data, including its result but not only
310+
311+
```
312+
313+
- **workspace_id** (string): The unique identifier of the workspace where the job was run.
314+
- **id** (UUID): The unique identifier of the job.
315+
- **parent_job** (UUID, optional): The ID of the parent job, if this job was triggered by another job.
316+
- **created_by** (string): The user who created the job.
317+
- **created_at** (datetime): The timestamp when the job was created.
318+
- **started_at** (datetime, optional): The timestamp when the job started running.
319+
- **duration_ms** (integer): The duration of the job in milliseconds.
320+
- **success** (boolean): Whether the job completed successfully.
321+
- **script_hash** (string, optional): The hash of the script that was executed.
322+
- **script_path** (string, optional): The path to the script that was executed.
323+
- **args** (object, optional): The arguments passed to the job, as a JSON object.
324+
- **result** (object, optional): The result of the job, as a JSON object.
325+
- **result_columns** (array of strings, optional): The columns of the result, if applicable.
326+
- **logs** (string, optional): The logs generated by the job.
327+
- **deleted** (boolean): Whether the job has been deleted.
328+
- **canceled** (boolean): Whether the job was canceled.
329+
- **canceled_by** (string, optional): The user who canceled the job, if applicable.
330+
- **canceled_reason** (string, optional): The reason the job was canceled, if provided.
331+
- **job_kind** (enum): The kind of job (e.g., script, flow, etc.).
332+
- **schedule_path** (string, optional): The path of the schedule that triggered the job, if any.
333+
- **permissioned_as** (string): The permission context under which the job ran.
334+
- **flow_status** (object, optional): The status of the flow, as a JSON object.
335+
- **workflow_as_code_status** (object, optional): The status of the workflow as code, as a JSON object.
336+
- **is_flow_step** (boolean): Whether this job is a step in a flow.
337+
- **language** (enum, optional): The language of the script executed.
338+
- **is_skipped** (boolean): Whether this job was skipped.
339+
- **email** (string): The email of the user who created the job.
340+
- **visible_to_owner** (boolean): Whether the job is visible to the owner.
341+
- **mem_peak** (integer, optional): The peak memory usage during the job.
342+
- **tag** (string): The tag associated with the job.
343+
- **priority** (integer, optional): The priority of the job.
344+
- **labels** (object, optional): Arbitrary labels associated with the job, as a JSON object.
345+
346+
This structure provides a comprehensive record of a completed job, including its metadata, execution details, results, and status.
347+
348+
```
349+
350+
with only_result = true, the updates are much lighter.
351+
352+
```json
353+
{
354+
"type": "update",
355+
"running": true, // optional, boolean: whether the job is running
356+
"completed": false, // optional, boolean: whether the job is completed
357+
"new_result_stream": "string", // optional, string: new result stream data since last update
358+
"stream_offset": 456, // optional, integer: current result stream offset
359+
"only_result": {...} // optional, JSON: only the result, only
360+
}
361+
```
362+
363+
- **Error**
364+
365+
```json
366+
{
367+
"type": "error",
368+
"0": "Error message as string"
369+
}
370+
```
371+
372+
- **NotFound**
373+
374+
```json
375+
{
376+
"type": "notfound"
377+
}
378+
```
379+
380+
- **Timeout**
381+
382+
```json
383+
{
384+
"type": "timeout"
385+
}
386+
```
387+
388+
- **Ping**
389+
```json
390+
{
391+
"type": "ping"
392+
}
393+
```
394+
395+
**Notes:**
396+
397+
- All fields except `type` are optional and may be omitted if not relevant for the event.
398+
- The `update` event contains job status, logs, result stream, and progress information.
399+
- The `error` event contains a string error message.
400+
- The `notfound`, `timeout`, and `ping` events are signals with no additional data.
401+
402+
```
403+
404+
129405
## High priority jobs
130406
131407
High priority jobs are jobs that are given a `priority` value between 1 and 100. Jobs with a higher priority value will be given precedence over jobs with a lower priority value in the job queue.
@@ -181,4 +457,5 @@ You can choose to use S3, Azure Blob Storage, AWS OIDC or Google Cloud Storage.
181457
| Field | Description |
182458
|-------|-------------|
183459
| Bucket | The name of your Google Cloud Storage bucket |
184-
| Service Account Key | The service account key for your Google Cloud Storage bucket in JSON format |
460+
| Service Account Key | The service account key for your Google Cloud Storage bucket in JSON format |
461+
```

docs/core_concepts/2_variables_and_secrets/index.mdx

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,20 +37,20 @@ wmill.set_variable("u/user/foo", value)
3737
```
3838

3939
</TabItem>
40-
<TabItem value="deno" label="TypeScript (Deno)" attributes={{className: "text-xs p-4 !mt-0 !ml-0"}}>
40+
<TabItem value="bun" label="TypeScript (Bun)" attributes={{className: "text-xs p-4 !mt-0 !ml-0"}}>
4141

4242
```ts
43-
import { getVariable, setVariable } from 'npm:windmill-client@1';
43+
import { getVariable, setVariable } from 'windmill-client';
4444

4545
getVariable('u/user/foo');
4646
setVariable('u/user/foo', value);
4747
```
4848

4949
</TabItem>
50-
<TabItem value="bun" label="TypeScript (Bun)" attributes={{className: "text-xs p-4 !mt-0 !ml-0"}}>
50+
<TabItem value="deno" label="TypeScript (Deno)" attributes={{className: "text-xs p-4 !mt-0 !ml-0"}}>
5151

5252
```ts
53-
import { getVariable, setVariable } from 'windmill-client';
53+
import { getVariable, setVariable } from 'npm:windmill-client@1';
5454

5555
getVariable('u/user/foo');
5656
setVariable('u/user/foo', value);

0 commit comments

Comments
 (0)