Skip to content

Commit 97c903b

Browse files
committed
Docs for queues
1 parent c3999c8 commit 97c903b

File tree

3 files changed

+85
-0
lines changed

3 files changed

+85
-0
lines changed

docs/source/includes/_faq.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,8 @@ and only if a workflow instance was created with a version of `>= 2` will `Activ
7575

7676
This kind of check is understandable for simple changes, but it becomes hard and a source of bugs for more complicated workflows. Therefore for now versioning is not supported and the guidance is to rely on **side-by-side** deployments. See also Azure's [Durable Functions](https://docs.microsoft.com/en-us/azure/azure-functions/durable/durable-functions-versioning) documentation for the same topic.
7777

78+
In addition to side-by-side deployments, you can use [Queues](#queues) to route workflows to different workers based on their version.
79+
7880
## How to safely upgrade?
7981

8082
All backend implementations have limited support for migrations which by default are automatically executed when a backend is started. This generally assumes only a single running worker. If you use multiple workers, you need to synchronize migration execution yourself.

docs/source/includes/_guide.md

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,52 @@ func Workflow2(ctx workflow.Context, msg string) (string, error) {
155155

156156
If you need to run any activities or make calls using `workflow.Context` you need to create a new context with `workflow.NewDisconnectedContext`, since the original context is canceled at this point.
157157

158+
## Workers
159+
160+
```go
161+
defaultWorker := worker.New(mb, &worker.Options{})
162+
163+
workflowWorker := worker.NewWorkflowWorker(b, &worker.WorkflowWorkerOptions{})
164+
165+
activityWorker := worker.NewActivityWorker(b, &worker.ActivityWorkerOptions{})
166+
```
167+
168+
There are three different types of workers:
169+
170+
- the default worker is a combined worker that listens to both workflow and activity queues
171+
- a workflow worker that only listens to workflow queues
172+
- an activity worker that only listens to activity queues
173+
174+
```go
175+
defaultWorker.RegisterWorkflow(Workflow1)
176+
defaultWorker.RegisterActivity(Activity1)
177+
178+
ctx, cancel := context.WithCancel(context.Background())
179+
defaultWorker.Start(ctx)
180+
181+
cancel()
182+
defaultWorker.WaitForCompletion()
183+
```
184+
185+
All workers have the same simple interface. You can register workflows and activities, start the worker, and when shutting down wait for all pending tasks to be finished.
186+
187+
## Queues
188+
189+
Workers can pull workflow and activity tasks from different queues. By default workers listen to two queues:
190+
191+
- `default`
192+
- `_system_` for system workflows and activities
193+
194+
For now, every worker will _always_ pull from `_system_`, but you can configure other queues you want to listen to. All worker options `struct`s take a `Queues` option.
195+
196+
When starting workflows, creating sub-workflow instances, or scheduling activities you can pass a queue you want the task to be scheduled on.
197+
198+
The default behavior if no explicit queue is given:
199+
200+
- **Starting a workflow**: the default queue is `default`.
201+
- **Creating a sub-workflow instance**: the default behavior is to inherit the queue from the parent workflow instance.
202+
- **Scheduling an activity**: the default behavior is to inherit the queue from the parent workflow instance.
203+
158204
## Executing activities
159205

160206
```go
@@ -168,6 +214,23 @@ log.Println(r1)
168214

169215
From a workflow, call `workflow.ExecuteActivity` to execute an activity. The call returns a `Future[T]` you can await to get the result or any error it might return.
170216

217+
<div style="clear: both"></div>
218+
219+
### Executing activities on a specific queue
220+
221+
```go
222+
r1, err := workflow.ExecuteActivity[int](ctx, workflow.ActivityOptions{
223+
Queue: "my-queue",
224+
}, Activity1, 35, 12, nil, "test").Get(ctx)
225+
if err != nil {
226+
panic("error getting activity 1 result")
227+
}
228+
229+
log.Println(r1)
230+
```
231+
232+
<div style="clear: both"></div>
233+
171234
### Canceling activities
172235

173236
Canceling activities is not supported at this time.
@@ -275,6 +338,22 @@ func SubWorkflow(ctx workflow.Context, msg string) (int, error) {
275338

276339
Call `workflow.CreateSubWorkflowInstance` to start a sub-workflow. The returned `Future` will resolve once the sub-workflow has finished.
277340

341+
<div style="clear: both"></div>
342+
343+
### Executing sub-workflows on a specific queue
344+
345+
```go
346+
result, err := workflow.CreateSubWorkflowInstance[int]
347+
ctx, workflow.SubWorkflowInstanceOptions{
348+
Queue: "my-queue",
349+
}, SubWorkflow, "some input").Get(ctx)
350+
if err != nil {
351+
return errors.Wrap(err, "could not get sub workflow result")
352+
}
353+
```
354+
355+
<div style="clear: both"></div>
356+
278357
### Canceling sub-workflows
279358

280359
Similar to timer cancellation, you can pass a cancelable context to `CreateSubWorkflowInstance` and cancel the sub-workflow that way. Reacting to the cancellation is the same as canceling a workflow via the `Client`. See [Canceling workflows](#canceling-workflows) for more details.

docs/source/includes/_samples.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,10 @@ Shows how to handle errors in workflows and activities.
3636

3737
Demonstrates sub-workflow and activity retries.
3838

39+
## Queues
40+
41+
Shows how to queue workflows and activities to different queues and how to create workers that only listen for specific queues or specific tasks (workflows or activities).
42+
3943
## Scale
4044

4145
Simple sample with a split worker and "starter" process.

0 commit comments

Comments
 (0)