Skip to content

Commit b40e6ea

Browse files
authored
Merge pull request #327 from cschleiden/make-activity-attempt-available
Make retry # available for activity execution
2 parents 8256a79 + 454d9de commit b40e6ea

File tree

11 files changed

+67
-7
lines changed

11 files changed

+67
-7
lines changed

activity/logger.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,3 +11,8 @@ import (
1111
func Logger(ctx context.Context) *slog.Logger {
1212
return activity.GetActivityState(ctx).Logger
1313
}
14+
15+
// Attempt returns the current attempt of this activity execution
16+
func Attempt(ctx context.Context) int {
17+
return activity.GetActivityState(ctx).Attempt
18+
}

activitytester/activitytester.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,5 +15,5 @@ func WithActivityTestState(ctx context.Context, activityID, instanceID string, l
1515
logger = slog.Default()
1616
}
1717

18-
return activity.WithActivityState(ctx, activity.NewActivityState(activityID, core.NewWorkflowInstance(instanceID, ""), logger))
18+
return activity.WithActivityState(ctx, activity.NewActivityState(activityID, 0, core.NewWorkflowInstance(instanceID, ""), logger))
1919
}

backend/history/activity_scheduled.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import (
88
type ActivityScheduledAttributes struct {
99
Name string `json:"name,omitempty"`
1010

11+
Attempt int `json:"attempt,omitempty"`
12+
1113
Inputs []payload.Payload `json:"inputs,omitempty"`
1214

1315
Metadata *metadata.WorkflowMetadata `json:"metadata,omitempty"`

backend/test/e2e_activity.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"testing"
77
"time"
88

9+
"github.com/cschleiden/go-workflows/activity"
910
"github.com/cschleiden/go-workflows/client"
1011
"github.com/cschleiden/go-workflows/worker"
1112
"github.com/cschleiden/go-workflows/workflow"
@@ -75,6 +76,39 @@ var e2eActivityTests = []backendTest{
7576
require.NoError(t, err)
7677
},
7778
},
79+
{
80+
name: "Activity_ReceiveAttempt",
81+
f: func(t *testing.T, ctx context.Context, c *client.Client, w *worker.Worker, b TestBackend) {
82+
var maxAttempt int
83+
84+
a := func(ctx context.Context) error {
85+
attempt := activity.Attempt(ctx)
86+
maxAttempt = attempt
87+
88+
if attempt < 2 {
89+
return &CustomError{msg: "custom error"}
90+
}
91+
92+
return nil
93+
}
94+
95+
wf := func(ctx workflow.Context) error {
96+
_, err := workflow.ExecuteActivity[int](ctx, workflow.ActivityOptions{
97+
RetryOptions: workflow.RetryOptions{
98+
MaxAttempts: 4,
99+
},
100+
}, a).Get(ctx)
101+
102+
return err
103+
}
104+
register(t, ctx, w, []interface{}{wf}, []interface{}{a})
105+
106+
_, err := runWorkflowWithResult[bool](t, ctx, c, wf)
107+
require.NoError(t, err)
108+
109+
require.Equal(t, 2, maxAttempt)
110+
},
111+
},
78112
{
79113
name: "Activity_ExtendTask",
80114
customWorkerOptions: func(w *worker.Options) {

docs/source/includes/_guide.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -355,6 +355,17 @@ func Activity1(ctx context.Context, name string) (int, error) {
355355

356356
With the default `DefaultActivityOptions`, Activities are retried up to three times when they return an error. If you want to keep automatic retries, but want to avoid them when hitting certain error types, you can wrap an error with `workflow.NewPermanentError`.
357357

358+
```go
359+
func Activity1(ctx context.Context, name string) (int, error) {
360+
// Current retry attempt
361+
attempt := activity.Attempt(ctx)
362+
363+
return http.Do("POST", "https://example.com", name)
364+
}
365+
```
366+
367+
`activity.Attempt` returns the current attempt retry.
368+
358369
## `ContinueAsNew`
359370

360371
```go

internal/activity/activitystate.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,18 +10,21 @@ import (
1010

1111
type ActivityState struct {
1212
ActivityID string
13+
Attempt int
1314
Instance *workflow.Instance
1415
Logger *slog.Logger
1516
}
1617

17-
func NewActivityState(activityID string, instance *workflow.Instance, logger *slog.Logger) *ActivityState {
18+
func NewActivityState(activityID string, attempt int, instance *workflow.Instance, logger *slog.Logger) *ActivityState {
1819
return &ActivityState{
1920
activityID,
21+
attempt,
2022
instance,
2123
logger.With(
2224
log.ActivityIDKey, activityID,
2325
log.InstanceIDKey, instance.InstanceID,
2426
log.ExecutionIDKey, instance.ExecutionID,
27+
log.AttemptKey, attempt,
2528
)}
2629
}
2730

internal/activity/executor.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ func (e *Executor) ExecuteActivity(ctx context.Context, task *backend.ActivityTa
4949

5050
activity, err := e.r.GetActivity(a.Name)
5151
if err != nil {
52-
return nil, err
52+
return nil, workflowerrors.NewPermanentError(fmt.Errorf("activity not found: %w", err))
5353
}
5454

5555
activityFn := reflect.ValueOf(activity)
@@ -65,6 +65,7 @@ func (e *Executor) ExecuteActivity(ctx context.Context, task *backend.ActivityTa
6565
// Add activity state to context
6666
as := NewActivityState(
6767
task.Event.ID,
68+
a.Attempt,
6869
task.WorkflowInstance,
6970
e.logger)
7071
activityCtx := WithActivityState(ctx, as)
@@ -80,6 +81,7 @@ func (e *Executor) ExecuteActivity(ctx context.Context, task *backend.ActivityTa
8081
attribute.String(log.ActivityNameKey, a.Name),
8182
attribute.String(log.InstanceIDKey, task.WorkflowInstance.InstanceID),
8283
attribute.String(log.ActivityIDKey, task.ID),
84+
attribute.Int(log.AttemptKey, a.Attempt),
8385
))
8486
defer span.End()
8587

internal/activity/executor_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ func TestExecutor_ExecuteActivity(t *testing.T) {
3737
result: func(t *testing.T, result payload.Payload, err error) {
3838
require.Nil(t, result)
3939
require.Error(t, err)
40-
require.EqualError(t, err, "activity not found")
40+
require.EqualError(t, err, "activity not found: activity not found")
4141
},
4242
},
4343
{

internal/command/schedule_activity.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,19 +12,21 @@ type ScheduleActivityCommand struct {
1212

1313
Name string
1414
Inputs []payload.Payload
15+
Attempt int
1516
Metadata *metadata.WorkflowMetadata
1617
}
1718

1819
var _ Command = (*ScheduleActivityCommand)(nil)
1920

20-
func NewScheduleActivityCommand(id int64, name string, inputs []payload.Payload, metadata *metadata.WorkflowMetadata) *ScheduleActivityCommand {
21+
func NewScheduleActivityCommand(id int64, name string, inputs []payload.Payload, attempt int, metadata *metadata.WorkflowMetadata) *ScheduleActivityCommand {
2122
return &ScheduleActivityCommand{
2223
command: command{
2324
id: id,
2425
name: "ScheduleActivity",
2526
state: CommandState_Pending,
2627
},
2728
Name: name,
29+
Attempt: attempt,
2830
Inputs: inputs,
2931
Metadata: metadata,
3032
}
@@ -41,6 +43,7 @@ func (c *ScheduleActivityCommand) Execute(clock clock.Clock) *CommandResult {
4143
&history.ActivityScheduledAttributes{
4244
Name: c.Name,
4345
Inputs: c.Inputs,
46+
Attempt: c.Attempt,
4447
Metadata: c.Metadata,
4548
},
4649
history.ScheduleEventID(c.id))

internal/command/schedule_activity_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ func TestScheduleActivityCommand_StateTransitions(t *testing.T) {
3737
for _, tt := range tests {
3838
t.Run(tt.name, func(t *testing.T) {
3939
clock := clock.NewMock()
40-
cmd := NewScheduleActivityCommand(1, "activity", []payload.Payload{}, &metadata.WorkflowMetadata{})
40+
cmd := NewScheduleActivityCommand(1, "activity", []payload.Payload{}, 0, &metadata.WorkflowMetadata{})
4141

4242
tt.f(t, cmd, clock)
4343
})

0 commit comments

Comments
 (0)