Skip to content

Commit 1b92568

Browse files
adds basic example for batch operations (#226)
Updates some examples for batch operations
1 parent 61b90c9 commit 1b92568

File tree

1 file changed

+115
-5
lines changed

1 file changed

+115
-5
lines changed

docs/02-use-cases/06-batch-job.md

Lines changed: 115 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,120 @@ title: Batch job
44
permalink: /docs/use-cases/batch-job
55
---
66

7-
# Batch job
8-
9-
A lot of batch jobs are not pure data manipulation programs. For those, the existing big data frameworks are the best fit.
7+
## Batch job
8+
A lot of batch jobs are not pure data manipulation programs. For those, the existing big data frameworks are the best fit. Cadence is a more general orchestration mechanism and doesn't provide native SQL or worker data shuffle functionality out of the box, engineers wishing to rely on these would need to implement this functionality themselves.
109
But if processing a record requires external API calls that might fail and potentially take a long time, Cadence might be preferable.
1110

12-
One of our internal Uber customers use Cadence for end of month statement generation. Each statement requires calls to multiple
13-
microservices and some statements can be really large. Cadence was chosen because it provides hard guarantees around durability of the financial data and seamlessly deals with long running operations, retries, and intermittent failures.
11+
#### Use Case:
12+
13+
One of our internal Uber customers use Cadence for end of month statement generation. Each statement requires calls to multiple microservices and some statements can be really large. Cadence was chosen because it provides hard guarantees around durability of the financial data and seamlessly deals with long running operations, retries, and intermittent failures.
14+
15+
## Batch jobs with heartbeating
16+
17+
Cadence is able to coordinate, restart and track progress of large batch jobs by keeping track of their incremental progress and allowing them to resume if they're stopped for any reason. This predominantly relies on the `heartbeat` feature and activity retries.
18+
19+
This is used in production for customers who wish to work through large batch workloads
20+
21+
### Considerations before starting
22+
23+
Heartbeating cadence activities are activities who emit their progress at an appropriate interval (usually every few seconds) indicating where they are up to. Optionally, they may use progress information (like an offset number or iterator) to resume their progress. However, this necessarily implies that:
24+
25+
- If activities get restarted, they may redo some work, so this is not suitable for non-idempotent operations.
26+
- The activity will be handling all the progress, so apart from heartbeat information, debugging about the granular operations being performed is not necessarily visible as compared by doing each operation in a distinct activity.
27+
28+
### What problems this solves
29+
30+
- This is for high-throughput operations where work may able to fit into a single long-running activity, or partitioned across multiple activities which can run for a longer duration.
31+
- This addresses problems customers may have running workflows which are returning large blocks of data where the data is hitting up against Cadence activity limits
32+
- This is a good way avoid hitting Cadence workflow history limits Cadence History entries (since this only is a single activity which is long running vs many small short-lived activities).
33+
34+
### High level concept:
35+
36+
The idea is to create an activity which will handle a lot of records and record its progress:
37+
38+
```golang
39+
func (a *ABatchActivity) Execute(ctx context.Context, params Params) error {
40+
41+
// in this case this is just a struct with a mutex protecting it
42+
var state State
43+
44+
// when starting the activity, check at start time for a previous iteration
45+
err := activity.GetHeartbeatDetails(ctx, &state)
46+
if err != nil {
47+
return err
48+
}
49+
log.Info("resuming from a previous state", zap.Any("state", state))
50+
}
51+
}
52+
53+
// in the background, every 5 seconds, emit where we're up to
54+
// so the cadence server knows the activity is still alive, and
55+
// put the progress in the recordheartbeat call so it can be pulled in
56+
// if we have to restart
57+
go func() {
58+
ticker := time.NewTicker(time.Seconds * 5)
59+
defer ticker.Stop()
60+
for {
61+
select {
62+
case <-ctx.Done():
63+
return
64+
reportState := state.Clone()
65+
activity.RecordHeartbeat(ctx, reportState)
66+
}
67+
}
68+
}()
69+
70+
// here in this example, we may assume this is several thousand
71+
// records which will take a while to get through. Importantly,
72+
// if we have to restart, don't start from the beginning, use the
73+
// offset so we don't redo work.
74+
batchDataToProcess := a.DB.GetDataFromOffset(state.GetOffset())
75+
76+
// go through and process all the records through whatever side-effects are appropriate
77+
for i := range batchDataToProcess {
78+
a.rpc.UpdateRecord(i)
79+
state.Finished(i)
80+
}
81+
return nil
82+
}
83+
```
84+
85+
And run this activity in a workflow with settings:
86+
```golang
87+
// an example configuration for setting activity options to
88+
// retry if the activity gets stopped for any reason
89+
func setActivityOptions(ctx workflow.Context) workflow.Context {
90+
91+
ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
92+
ScheduleToStartTimeout: time.Minute, // how long we expect this task to sit waiting to be picked up.
93+
// Typically subseconds unless heavily contended
94+
StartToCloseTimeout: time.Hour, // however long this activity is expected to take, maximum, from end to end.
95+
// This is workload dependent
96+
HeartbeatTimeout: time.Second * 30, // How long we should wait before deciding to restart the activity because the
97+
// background thread hasn't checked in. Half a a minute is probably a bit
98+
// overgenous. In the example above we're picking 5 seconds to heartbeat
99+
100+
// It is unrealistic to assume that a long running activity will succeed
101+
// so add a retry-policy to restart it when there's a failure.
102+
RetryPolicy: &workflow.RetryPolicy{
103+
InitialInterval: time.Second,
104+
MaximumInterval: time.Minute * 10,
105+
},
106+
})
107+
return ctx
108+
}
109+
110+
func Workflow(ctx workflow.Context, config entity.Config) error {
111+
112+
log := workflow.GetLogger(ctx)
113+
ctx = setActivityOptions(ctx, config)
114+
err := workflow.ExecuteActivity(ctx, ABatchActivityName, config).Get(ctx, nil)
115+
if err != nil {
116+
log.Error("failed to execute activity", zap.Error(err), zap.Any("input", config))
117+
118+
}
119+
120+
log.Info("Workflow complete")
121+
return nil
122+
}
123+
```

0 commit comments

Comments
 (0)