Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions batch-queue/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
This sample shows how to implement a workflow that simulates a queue, using the event history and workflow input. It continuously listens for incoming tasks via a channel, while also listening on a different channel to understand when to write a batch. The tasks are sent from a different workflow through Temporal signals and a repeating timer writes on the other channel to signal the workflow when to write a batch.

The workflow also uses continue-as-new to avoid large history size caused by the timers and signals. This is done when a certain number of events (signal received, timer fired) have been recorded.

The example also illustrates how to simulate a ticker using timers.

### Steps to run this sample:
1) Run a [Temporal service](https://github.com/temporalio/samples-go/tree/main/#how-to-use).
2) Run the following command to start the worker
```
go run forever-batch-operations/worker/main.go
```
3) Run the following command to start the example
```
go run forever-batch-operations/starter/main.go
```

Note that the workflows will continue running even after you stop the worker.

Compare the values_received.txt and values_sent.txt files. Values should be written in the same order.
117 changes: 117 additions & 0 deletions batch-queue/accumulate_and_batch_workflow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package batch_queue

import (
"context"
"fmt"
"os"
"strings"
"time"

"go.temporal.io/sdk/workflow"
)

const (
SIGNAL_READ_VALS = "read_vals"
SIGNAL_COMMIT_BATCH = "commit_batch"
)

func GetAccumulateAndBatchWorkflowID() string {
return "AccumulateAndBatchWorkflowID"
}

type WorkflowTicker struct {
d time.Duration
C workflow.Channel
}

func NewWorkflowTicker(ctx workflow.Context, d time.Duration) *WorkflowTicker {
wt := WorkflowTicker{
d: d,
C: workflow.NewChannel(ctx),
}

workflow.Go(ctx, func(ctx workflow.Context) {
for {
t := workflow.NewTimer(ctx, d)
err := t.Get(ctx, nil)
if err != nil {
workflow.GetLogger(ctx).Error("timer failed, restarting...", "error", err)
continue
}
wt.C.Send(ctx, nil)
}
})

return &wt
}

// AccumulateAndBatchWorkflow keeps accumulating data via signals. A timer
// decides when the accumulated data should be processed as a batch.
func AccumulateAndBatchWorkflow(ctx workflow.Context, vals []string) error {

logger := workflow.GetLogger(ctx)
logger.Info("AccumulateAndBatchWorkflow workflow started", "vals", vals)

// listen for incoming data
readValsCh := workflow.GetSignalChannel(ctx, SIGNAL_READ_VALS)

// batch every 10 seconds
t := NewWorkflowTicker(ctx, 10*time.Second)

eventsReceived := 0

selector := workflow.NewSelector(ctx)
selector.AddReceive(readValsCh, func(c workflow.ReceiveChannel, more bool) {
eventsReceived += 1
var incoming string
c.Receive(ctx, &incoming)
vals = append(vals, incoming)
})
selector.AddReceive(t.C, func(c workflow.ReceiveChannel, more bool) {
eventsReceived += 1
c.Receive(ctx, nil)
logger.Info("commiting batch...")

ao := workflow.ActivityOptions{
StartToCloseTimeout: 1 * time.Minute,
}
actx := workflow.WithActivityOptions(ctx, ao)
err := workflow.ExecuteActivity(actx, WriteBatchToFile, vals).Get(ctx, nil)
if err != nil {
// Couldn't write the batch, so do not discard the data. Note that
// activity should be idempotent, so you may need to dedupe. This
// example doesn't dedupe.
logger.Error("failed to write batch", "error", err)
return
}

// Discard the written data.
vals = []string{}
})

// Batch when you received enough events. Using a low number just to
// illustrate the batching, you can go much higher.
for {
selector.Select(ctx)
if eventsReceived > 15 {
// Pass in the existing vals since the signal history is no longer
// available.
return workflow.NewContinueAsNewError(ctx, AccumulateAndBatchWorkflow, vals)
}
}
}

func WriteBatchToFile(ctx context.Context, vals []string) error {
// Write the values to this file. We can compare values here to values sent
// to the workflow to see the durability.
f, err := os.OpenFile("values_received.txt", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return fmt.Errorf("failed to open file: %w", err)
}
defer f.Close()

f.WriteString(strings.Join(vals, "\n"))
f.WriteString("\n")

return nil
}
90 changes: 90 additions & 0 deletions batch-queue/signal_new_values_workflow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package batch_queue

import (
"context"
"fmt"
"math/rand/v2"
"os"
"strconv"
"time"

"go.temporal.io/sdk/workflow"
)

func GetSignalNewValuesWorkflowID() string {
return "SignalNewValuesWorkflowID"
}

// SignalNewValuesWorkflow is a workflow that keeps signaling new values to
// a forever-running workflow periodically. After each value is signaled, this
// workflow also writes it to a file. This file and the file where the batching
// workflow writes all values can be compared to see the durability.
func SignalNewValuesWorkflow(ctx workflow.Context) error {

logger := workflow.GetLogger(ctx)
logger.Info("SignalNewValuesWorkflow workflow started")

var sendValue int
sendValueFuture := workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} {
return rand.IntN(1_000)
})
err := sendValueFuture.Get(&sendValue)
if err != nil {
return err
}

// Note that to durably send, you can use activities. Here we don't handle
// that, just fail early.
for range 1000 {

var sendValue int
sendValueFuture := workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} {
return rand.IntN(1_000)
})
err := sendValueFuture.Get(&sendValue)
if err != nil {
return err
}

// The workflow we're signaling does continue-as-new, so do not pass a
// run ID here.
//
// We can also signal in an activity if we want retries.
err = workflow.SignalExternalWorkflow(ctx, GetAccumulateAndBatchWorkflowID(), "", SIGNAL_READ_VALS, strconv.Itoa(sendValue)).Get(ctx, nil)
if err != nil {
return err
}

ao := workflow.ActivityOptions{
StartToCloseTimeout: 1 * time.Minute,
}
actx := workflow.WithActivityOptions(ctx, ao)
err = workflow.ExecuteActivity(actx, WriteValToFile, sendValue).Get(ctx, nil)
if err != nil {
return err
}

err = workflow.Sleep(ctx, 1*time.Second)
if err != nil {
return err
}
}

// avoid big history size
return workflow.NewContinueAsNewError(ctx, SignalNewValuesWorkflow)
}

func WriteValToFile(ctx context.Context, val int) error {
// Write the values to this file. We can compare values here to values sent
// to the workflow to see the durability.
f, err := os.OpenFile("values_sent.txt", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return fmt.Errorf("failed to open file: %w", err)
}
defer f.Close()

f.WriteString(strconv.Itoa(val))
f.WriteString("\n")

return nil
}
39 changes: 39 additions & 0 deletions batch-queue/starter/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package main

import (
"context"
"log"

"go.temporal.io/sdk/client"

batch_queue "github.com/temporalio/samples-go/batch-queue"
)

func main() {
// The client is a heavyweight object that should be created once per process.
c, err := client.Dial(client.Options{})
if err != nil {
log.Fatalln("Unable to create client", err)
}
defer c.Close()

// start forever batching workflow
workflowOptions := client.StartWorkflowOptions{
ID: batch_queue.GetAccumulateAndBatchWorkflowID(),
TaskQueue: "batch",
}
_, err = c.ExecuteWorkflow(context.Background(), workflowOptions, batch_queue.AccumulateAndBatchWorkflow, nil)
if err != nil {
log.Fatalln("Unable to execute workflow", err)
}

// start signaling workflow
workflowOptions = client.StartWorkflowOptions{
ID: batch_queue.GetSignalNewValuesWorkflowID(),
TaskQueue: "batch",
}
_, err = c.ExecuteWorkflow(context.Background(), workflowOptions, batch_queue.SignalNewValuesWorkflow)
if err != nil {
log.Fatalln("Unable to execute workflow", err)
}
}
54 changes: 54 additions & 0 deletions batch-queue/values_received.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
557
255
884
82
385
477
121
989
673
883
448
796
823
381
451
747
157
876
620
743
490
842
554
632
493
741
198
930
630
530
451
29
383
968
421
861
24
356
511
430
774
510
671
281
212
636
627
374
371
762
147
203
408
559
64 changes: 64 additions & 0 deletions batch-queue/values_sent.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
557
255
884
82
385
477
121
989
673
883
448
796
823
381
451
747
157
876
620
743
490
842
554
632
493
741
198
930
630
530
451
29
383
968
421
861
24
356
511
430
774
510
671
281
212
636
627
374
371
762
147
203
408
559
201
541
34
22
993
0
502
709
970
837
Loading
Loading