Skip to content

Commit ebe93bb

Browse files
committed
Make retry # available for activity execution
1 parent 8256a79 commit ebe93bb

File tree

10 files changed

+67
-6
lines changed

10 files changed

+67
-6
lines changed

activity/logger.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,3 +11,8 @@ import (
1111
func Logger(ctx context.Context) *slog.Logger {
1212
return activity.GetActivityState(ctx).Logger
1313
}
14+
15+
// Attempt returns the current attempt of this activity execution
16+
func Attempt(ctx context.Context) int {
17+
return activity.GetActivityState(ctx).Attempt
18+
}

activitytester/activitytester.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,5 +15,5 @@ func WithActivityTestState(ctx context.Context, activityID, instanceID string, l
1515
logger = slog.Default()
1616
}
1717

18-
return activity.WithActivityState(ctx, activity.NewActivityState(activityID, core.NewWorkflowInstance(instanceID, ""), logger))
18+
return activity.WithActivityState(ctx, activity.NewActivityState(activityID, 0, core.NewWorkflowInstance(instanceID, ""), logger))
1919
}

backend/history/activity_scheduled.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import (
88
type ActivityScheduledAttributes struct {
99
Name string `json:"name,omitempty"`
1010

11+
Attempt int `json:"attempt,omitempty"`
12+
1113
Inputs []payload.Payload `json:"inputs,omitempty"`
1214

1315
Metadata *metadata.WorkflowMetadata `json:"metadata,omitempty"`

backend/test/e2e_activity.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"testing"
77
"time"
88

9+
"github.com/cschleiden/go-workflows/activity"
910
"github.com/cschleiden/go-workflows/client"
1011
"github.com/cschleiden/go-workflows/worker"
1112
"github.com/cschleiden/go-workflows/workflow"
@@ -75,6 +76,39 @@ var e2eActivityTests = []backendTest{
7576
require.NoError(t, err)
7677
},
7778
},
79+
{
80+
name: "Activity_ReceiveAttempt",
81+
f: func(t *testing.T, ctx context.Context, c *client.Client, w *worker.Worker, b TestBackend) {
82+
var maxAttempt int
83+
84+
a := func(context.Context) error {
85+
attempt := activity.Attempt(ctx)
86+
maxAttempt = attempt
87+
88+
if attempt < 2 {
89+
return &CustomError{msg: "custom error"}
90+
}
91+
92+
return nil
93+
}
94+
95+
wf := func(ctx workflow.Context) error {
96+
_, err := workflow.ExecuteActivity[int](ctx, workflow.ActivityOptions{
97+
RetryOptions: workflow.RetryOptions{
98+
MaxAttempts: 4,
99+
},
100+
}, a).Get(ctx)
101+
102+
return err
103+
}
104+
register(t, ctx, w, []interface{}{wf}, []interface{}{a})
105+
106+
_, err := runWorkflowWithResult[bool](t, ctx, c, wf)
107+
require.NoError(t, err)
108+
109+
require.Equal(t, 2, maxAttempt)
110+
},
111+
},
78112
{
79113
name: "Activity_ExtendTask",
80114
customWorkerOptions: func(w *worker.Options) {

docs/source/includes/_guide.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -355,6 +355,17 @@ func Activity1(ctx context.Context, name string) (int, error) {
355355

356356
With the default `DefaultActivityOptions`, Activities are retried up to three times when they return an error. If you want to keep automatic retries, but want to avoid them when hitting certain error types, you can wrap an error with `workflow.NewPermanentError`.
357357

358+
```go
359+
func Activity1(ctx context.Context, name string) (int, error) {
360+
// Current retry attempt
361+
attempt := activity.Attempt(ctx)
362+
363+
return http.Do("POST", "https://example.com", name)
364+
}
365+
```
366+
367+
`activity.Attempt` returns the current attempt retry.
368+
358369
## `ContinueAsNew`
359370

360371
```go

internal/activity/activitystate.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,18 +10,21 @@ import (
1010

1111
type ActivityState struct {
1212
ActivityID string
13+
Attempt int
1314
Instance *workflow.Instance
1415
Logger *slog.Logger
1516
}
1617

17-
func NewActivityState(activityID string, instance *workflow.Instance, logger *slog.Logger) *ActivityState {
18+
func NewActivityState(activityID string, attempt int, instance *workflow.Instance, logger *slog.Logger) *ActivityState {
1819
return &ActivityState{
1920
activityID,
21+
attempt,
2022
instance,
2123
logger.With(
2224
log.ActivityIDKey, activityID,
2325
log.InstanceIDKey, instance.InstanceID,
2426
log.ExecutionIDKey, instance.ExecutionID,
27+
log.AttemptKey, attempt,
2528
)}
2629
}
2730

internal/activity/executor.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"github.com/cschleiden/go-workflows/registry"
1818
wf "github.com/cschleiden/go-workflows/workflow"
1919
"go.opentelemetry.io/otel/attribute"
20+
"go.opentelemetry.io/otel/codes"
2021
"go.opentelemetry.io/otel/trace"
2122
)
2223

@@ -49,7 +50,7 @@ func (e *Executor) ExecuteActivity(ctx context.Context, task *backend.ActivityTa
4950

5051
activity, err := e.r.GetActivity(a.Name)
5152
if err != nil {
52-
return nil, err
53+
return nil, workflowerrors.NewPermanentError(fmt.Errorf("activity not found %w", err))
5354
}
5455

5556
activityFn := reflect.ValueOf(activity)
@@ -65,6 +66,7 @@ func (e *Executor) ExecuteActivity(ctx context.Context, task *backend.ActivityTa
6566
// Add activity state to context
6667
as := NewActivityState(
6768
task.Event.ID,
69+
a.Attempt,
6870
task.WorkflowInstance,
6971
e.logger)
7072
activityCtx := WithActivityState(ctx, as)
@@ -80,6 +82,7 @@ func (e *Executor) ExecuteActivity(ctx context.Context, task *backend.ActivityTa
8082
attribute.String(log.ActivityNameKey, a.Name),
8183
attribute.String(log.InstanceIDKey, task.WorkflowInstance.InstanceID),
8284
attribute.String(log.ActivityIDKey, task.ID),
85+
attribute.Int(log.AttemptKey, a.Attempt),
8386
))
8487
defer span.End()
8588

internal/command/schedule_activity.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,19 +12,21 @@ type ScheduleActivityCommand struct {
1212

1313
Name string
1414
Inputs []payload.Payload
15+
Attempt int
1516
Metadata *metadata.WorkflowMetadata
1617
}
1718

1819
var _ Command = (*ScheduleActivityCommand)(nil)
1920

20-
func NewScheduleActivityCommand(id int64, name string, inputs []payload.Payload, metadata *metadata.WorkflowMetadata) *ScheduleActivityCommand {
21+
func NewScheduleActivityCommand(id int64, name string, inputs []payload.Payload, attempt int, metadata *metadata.WorkflowMetadata) *ScheduleActivityCommand {
2122
return &ScheduleActivityCommand{
2223
command: command{
2324
id: id,
2425
name: "ScheduleActivity",
2526
state: CommandState_Pending,
2627
},
2728
Name: name,
29+
Attempt: attempt,
2830
Inputs: inputs,
2931
Metadata: metadata,
3032
}
@@ -41,6 +43,7 @@ func (c *ScheduleActivityCommand) Execute(clock clock.Clock) *CommandResult {
4143
&history.ActivityScheduledAttributes{
4244
Name: c.Name,
4345
Inputs: c.Inputs,
46+
Attempt: c.Attempt,
4447
Metadata: c.Metadata,
4548
},
4649
history.ScheduleEventID(c.id))

internal/command/schedule_activity_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ func TestScheduleActivityCommand_StateTransitions(t *testing.T) {
3737
for _, tt := range tests {
3838
t.Run(tt.name, func(t *testing.T) {
3939
clock := clock.NewMock()
40-
cmd := NewScheduleActivityCommand(1, "activity", []payload.Payload{}, &metadata.WorkflowMetadata{})
40+
cmd := NewScheduleActivityCommand(1, "activity", []payload.Payload{}, 0, &metadata.WorkflowMetadata{})
4141

4242
tt.f(t, cmd, clock)
4343
})

workflow/activity.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ func executeActivity[TResult any](ctx Context, options ActivityOptions, attempt
7171
return f
7272
}
7373

74-
cmd := command.NewScheduleActivityCommand(scheduleEventID, name, inputs, metadata)
74+
cmd := command.NewScheduleActivityCommand(scheduleEventID, name, inputs, attempt, metadata)
7575
wfState.AddCommand(cmd)
7676
wfState.TrackFuture(scheduleEventID, workflowstate.AsDecodingSettable(cv, fmt.Sprintf("activity: %s", name), f))
7777

0 commit comments

Comments
 (0)