Commit 9cbe554

fix some bad merging in the batch examples (#227)
1 parent 1b92568 commit 9cbe554

1 file changed

docs/02-use-cases/06-batch-job.md

Lines changed: 28 additions & 27 deletions
@@ -29,7 +29,7 @@ Heartbeating cadence activities are activities who emit their progress at an app
 
 - This is for high-throughput operations where work may able to fit into a single long-running activity, or partitioned across multiple activities which can run for a longer duration.
 - This addresses problems customers may have running workflows which are returning large blocks of data where the data is hitting up against Cadence activity limits
-- This is a good way avoid hitting Cadence workflow history limits Cadence History entries (since this only is a single activity which is long running vs many small short-lived activities).
+- Because heartbeat data is only temporarily recorded, this is a good way avoid hitting Cadence workflow limits on the number of history events: there only is a single activity which is long running vs many small short-lived activities (each of which needs multiple history events).
 
 ### High level concept:
 
@@ -39,16 +39,15 @@ The idea is to create an activity which will handle a lot of records and record
 func (a *ABatchActivity) Execute(ctx context.Context, params Params) error {
 
 	// in this case this is just a struct with a mutex protecting it
-	var state State
-
-	// when starting the activity, check at start time for a previous iteration
-	err := activity.GetHeartbeatDetails(ctx, &state)
-	if err != nil {
-		return err
-	}
-	log.Info("resuming from a previous state", zap.Any("state", state))
-	}
-}
+	var state State
+	if activity.HasHeartbeatDetails(ctx) {
+		// when starting the activity, check at start time for a previous iteration
+		err := activity.GetHeartbeatDetails(ctx, &state)
+		if err != nil {
+			return err
+		}
+		log.Info("resuming from a previous state", zap.Any("state", state))
+	}
 
 	// in the background, every 5 seconds, emit where we're up to
 	// so the cadence server knows the activity is still alive, and
@@ -61,6 +60,7 @@ func (a *ABatchActivity) Execute(ctx context.Context, params Params) error {
 		select {
 		case <-ctx.Done():
 			return
+		case <-ticker.C:
 			reportState := state.Clone()
 			activity.RecordHeartbeat(ctx, reportState)
 		}
@@ -71,14 +71,14 @@ func (a *ABatchActivity) Execute(ctx context.Context, params Params) error {
 	// records which will take a while to get through. Importantly,
 	// if we have to restart, don't start from the beginning, use the
 	// offset so we don't redo work.
-	batchDataToProcess := a.DB.GetDataFromOffset(state.GetOffset())
+	batchDataToProcess := a.DB.GetDataFromOffset(state.GetOffset())
 
 	// go through and process all the records through whatever side-effects are appropriate
 	for i := range batchDataToProcess {
 		a.rpc.UpdateRecord(i)
 		state.Finished(i)
 	}
-	return nil
+	return nil
 }
 ```
 
@@ -88,36 +88,37 @@ And run this activity in a workflow with settings:
 // retry if the activity gets stopped for any reason
 func setActivityOptions(ctx workflow.Context) workflow.Context {
 
-	ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
-		ScheduleToStartTimeout: time.Minute, // how long we expect this task to sit waiting to be picked up.
+	ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
+		ScheduleToStartTimeout: time.Minute, // how long we expect this task to sit waiting to be picked up.
 		// Typically subseconds unless heavily contended
-		StartToCloseTimeout: time.Hour, // however long this activity is expected to take, maximum, from end to end.
+		StartToCloseTimeout: time.Hour, // however long this activity is expected to take, maximum, from end to end.
 		// This is workload dependent
-		HeartbeatTimeout: time.Second * 30, // How long we should wait before deciding to restart the activity because the
+		HeartbeatTimeout: time.Second * 30, // How long we should wait before deciding to restart the activity because the
 		// background thread hasn't checked in. Half a a minute is probably a bit
 		// overgenous. In the example above we're picking 5 seconds to heartbeat
-
+
 		// It is unrealistic to assume that a long running activity will succeed
 		// so add a retry-policy to restart it when there's a failure.
 		RetryPolicy: &workflow.RetryPolicy{
-			InitialInterval: time.Second,
-			MaximumInterval: time.Minute * 10,
-		},
-	})
-	return ctx
+			InitialInterval: time.Second,
+			MaximumInterval: time.Minute * 10,
+			MaximumAttempts: 10, // we expect this to have to restart a maximum of 10 times before giving up.
+		},
+	})
+	return ctx
 }
 
 func Workflow(ctx workflow.Context, config entity.Config) error {
 
-	log := workflow.GetLogger(ctx)
-	ctx = setActivityOptions(ctx, config)
+	log := workflow.GetLogger(ctx)
+	ctx = setActivityOptions(ctx, config)
 	err := workflow.ExecuteActivity(ctx, ABatchActivityName, config).Get(ctx, nil)
 	if err != nil {
 		log.Error("failed to execute activity", zap.Error(err), zap.Any("input", config))
 
 	}
 
-	log.Info("Workflow complete")
-	return nil
+	log.Info("Workflow complete")
+	return nil
 }
 ```
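Taken together, the corrected example has three moving parts: resume from any previously recorded heartbeat details, a background ticker that reports progress, and a processing loop that continues from the recovered offset. The sketch below assembles that pattern against the Cadence Go client's `activity` package (`HasHeartbeatDetails`, `GetHeartbeatDetails`, `RecordHeartbeat`); the `Progress` payload, the `records` parameter, the `processRecord` helper and the 5-second cadence are illustrative stand-ins for the doc's `State`, `a.DB.GetDataFromOffset` and `a.rpc.UpdateRecord`, not part of this commit.

```go
package batch

import (
	"context"
	"sync"
	"time"

	"go.uber.org/cadence/activity"
	"go.uber.org/zap"
)

// Progress is the heartbeat payload: how far through the batch we have got.
// (Illustrative stand-in for the doc's mutex-protected State struct.)
type Progress struct {
	Offset int
}

// BatchActivity resumes from recorded heartbeat details, heartbeats every
// 5 seconds in the background, and processes records from the recovered offset.
func BatchActivity(ctx context.Context, records []string) error {
	logger := activity.GetLogger(ctx)

	var (
		mu       sync.Mutex
		progress Progress
	)
	// On a retry, pick up where the previous attempt's last heartbeat left off.
	if activity.HasHeartbeatDetails(ctx) {
		if err := activity.GetHeartbeatDetails(ctx, &progress); err != nil {
			return err
		}
		logger.Info("resuming from a previous state", zap.Int("offset", progress.Offset))
	}
	start := progress.Offset

	// Report progress periodically so the Cadence server knows the activity
	// is alive; the reported offset is what a retry will resume from.
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()
	go func() {
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				mu.Lock()
				snapshot := progress
				mu.Unlock()
				activity.RecordHeartbeat(ctx, snapshot)
			}
		}
	}()

	// Work through the batch, recording each completed record in the shared state.
	for i := start; i < len(records); i++ {
		processRecord(records[i]) // stand-in for the doc's a.rpc.UpdateRecord(i)
		mu.Lock()
		progress.Offset = i + 1
		mu.Unlock()
	}
	return nil
}

func processRecord(_ string) { /* whatever per-record side effect the batch needs */ }
```

This only pays off when the activity runs with the options shown in the second half of the diff: a heartbeat timeout so a silent activity is restarted, and a retry policy so the retry attempt receives the last recorded heartbeat details and resumes from them.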
