Skip to content

Commit 353876b

Browse files
committed
chore: Support taskExecutionIds
taskExecutionId is a new field, the values is unique among retries of the activity Signed-off-by: Javier Aliaga <[email protected]>
1 parent fb03c12 commit 353876b

File tree

12 files changed

+1526
-1353
lines changed

12 files changed

+1526
-1353
lines changed

api/protos/orchestrator_service.pb.go

Lines changed: 1361 additions & 1309 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

api/protos/orchestrator_service_grpc.pb.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

backend/backend.go

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,16 @@ package backend
22

33
import (
44
"context"
5+
"encoding/json"
56
"errors"
67
"fmt"
78

9+
"google.golang.org/protobuf/proto"
10+
"google.golang.org/protobuf/types/known/timestamppb"
11+
812
"github.com/dapr/durabletask-go/api"
913
"github.com/dapr/durabletask-go/api/protos"
1014
"github.com/dapr/durabletask-go/backend/runtimestate"
11-
"google.golang.org/protobuf/proto"
12-
"google.golang.org/protobuf/types/known/timestamppb"
1315
)
1416

1517
var (
@@ -137,6 +139,15 @@ func MarshalHistoryEvent(e *HistoryEvent) ([]byte, error) {
137139
}
138140
}
139141

142+
// MarshalJsonHistoryEvent serializes the [HistoryEvent] into a protobuf byte array.
143+
func MarshalJsonHistoryEvent(e *HistoryEvent) ([]byte, error) {
144+
if bytes, err := json.Marshal(e); err != nil {
145+
return nil, fmt.Errorf("failed to marshal history event: %w", err)
146+
} else {
147+
return bytes, nil
148+
}
149+
}
150+
140151
// UnmarshalHistoryEvent deserializes a [HistoryEvent] from a protobuf byte array.
141152
func UnmarshalHistoryEvent(bytes []byte) (*HistoryEvent, error) {
142153
e := &protos.HistoryEvent{}

backend/executor.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package backend
22

33
import (
4-
context "context"
4+
"context"
55
"errors"
66
"fmt"
77
"strconv"
@@ -172,6 +172,7 @@ func (executor *grpcExecutor) ExecuteActivity(ctx context.Context, iid api.Insta
172172
Input: task.Input,
173173
OrchestrationInstance: &protos.OrchestrationInstance{InstanceId: string(iid)},
174174
TaskId: e.EventId,
175+
TaskExecutionId: task.TaskExecutionId,
175176
},
176177
},
177178
}

backend/runtimestate/runtimestate.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,13 @@ import (
55
"fmt"
66
"time"
77

8-
"github.com/dapr/durabletask-go/api"
9-
"github.com/dapr/durabletask-go/api/helpers"
10-
"github.com/dapr/durabletask-go/api/protos"
118
"github.com/google/uuid"
129
"google.golang.org/protobuf/types/known/timestamppb"
1310
"google.golang.org/protobuf/types/known/wrapperspb"
11+
12+
"github.com/dapr/durabletask-go/api"
13+
"github.com/dapr/durabletask-go/api/helpers"
14+
"github.com/dapr/durabletask-go/api/protos"
1415
)
1516

1617
var ErrDuplicateEvent = errors.New("duplicate event")
@@ -188,6 +189,7 @@ func ApplyActions(s *protos.OrchestrationRuntimeState, customStatus *wrapperspb.
188189
EventType: &protos.HistoryEvent_TaskScheduled{
189190
TaskScheduled: &protos.TaskScheduledEvent{
190191
Name: scheduleTask.Name,
192+
TaskExecutionId: scheduleTask.TaskExecutionId,
191193
Version: scheduleTask.Version,
192194
Input: scheduleTask.Input,
193195
ParentTraceContext: currentTraceContext,

client/worker_grpc.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,16 @@ import (
88
"time"
99

1010
"github.com/cenkalti/backoff/v4"
11-
"github.com/dapr/durabletask-go/api"
12-
"github.com/dapr/durabletask-go/api/protos"
13-
"github.com/dapr/durabletask-go/backend"
14-
"github.com/dapr/durabletask-go/task"
1511
"google.golang.org/grpc/codes"
1612
"google.golang.org/grpc/status"
1713
"google.golang.org/protobuf/types/known/emptypb"
1814
"google.golang.org/protobuf/types/known/timestamppb"
1915
"google.golang.org/protobuf/types/known/wrapperspb"
16+
17+
"github.com/dapr/durabletask-go/api"
18+
"github.com/dapr/durabletask-go/api/protos"
19+
"github.com/dapr/durabletask-go/backend"
20+
"github.com/dapr/durabletask-go/task"
2021
)
2122

2223
type workItemsStream interface {
@@ -183,6 +184,7 @@ func (c *TaskHubGrpcClient) processActivityWorkItem(
183184
Name: req.Name,
184185
Version: req.Version,
185186
Input: req.Input,
187+
TaskExecutionId: req.TaskExecutionId,
186188
ParentTraceContext: tc,
187189
},
188190
},

samples/retries/retries.go

Lines changed: 78 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,9 @@ import (
44
"context"
55
"encoding/json"
66
"errors"
7+
"fmt"
78
"log"
8-
"math/rand"
9+
"sync"
910
"time"
1011

1112
"github.com/dapr/durabletask-go/backend"
@@ -16,16 +17,18 @@ import (
1617
func main() {
1718
// Create a new task registry and add the orchestrator and activities
1819
r := task.NewTaskRegistry()
19-
r.AddOrchestrator(RetryActivityOrchestrator)
20-
r.AddActivity(RandomFailActivity)
20+
must(r.AddOrchestrator(RetryActivityOrchestrator))
21+
must(r.AddActivity(RandomFailActivity))
2122

2223
// Init the client
2324
ctx := context.Background()
2425
client, worker, err := Init(ctx, r)
2526
if err != nil {
2627
log.Fatalf("Failed to initialize the client: %v", err)
2728
}
28-
defer worker.Shutdown(ctx)
29+
defer func() {
30+
must(worker.Shutdown(ctx))
31+
}()
2932

3033
// Start a new orchestration
3134
id, err := client.ScheduleNewOrchestration(ctx, RetryActivityOrchestrator)
@@ -56,7 +59,7 @@ func Init(ctx context.Context, r *task.TaskRegistry) (backend.TaskHubClient, bac
5659

5760
// Create a new backend
5861
// Use the in-memory sqlite provider by specifying ""
59-
be := sqlite.NewSqliteBackend(sqlite.NewSqliteOptions(""), logger)
62+
be := sqlite.NewSqliteBackend(sqlite.NewSqliteOptions("/Users/javi/projects/javi-durabletask-go/backend/sqlite/retries"), logger)
6063
orchestrationWorker := backend.NewOrchestrationWorker(be, executor, logger)
6164
activityWorker := backend.NewActivityTaskWorker(be, executor, logger)
6265
taskHubWorker := backend.NewTaskHubWorker(be, orchestrationWorker, activityWorker, logger)
@@ -74,22 +77,89 @@ func Init(ctx context.Context, r *task.TaskRegistry) (backend.TaskHubClient, bac
7477
}
7578

7679
func RetryActivityOrchestrator(ctx *task.OrchestrationContext) (any, error) {
77-
if err := ctx.CallActivity(RandomFailActivity, task.WithActivityRetryPolicy(&task.RetryPolicy{
80+
t := ctx.CallActivity(RandomFailActivity, task.WithActivityRetryPolicy(&task.RetryPolicy{
7881
MaxAttempts: 10,
7982
InitialRetryInterval: 100 * time.Millisecond,
8083
BackoffCoefficient: 2,
8184
MaxRetryInterval: 3 * time.Second,
82-
})).Await(nil); err != nil {
85+
}))
86+
87+
t1 := ctx.CallActivity(RandomFailActivity, task.WithActivityRetryPolicy(&task.RetryPolicy{
88+
MaxAttempts: 10,
89+
InitialRetryInterval: 100 * time.Millisecond,
90+
BackoffCoefficient: 2,
91+
MaxRetryInterval: 3 * time.Second,
92+
}))
93+
94+
if err := t.Await(nil); err != nil {
8395
return nil, err
8496
}
97+
98+
if err := t1.Await(nil); err != nil {
99+
return nil, err
100+
}
101+
85102
return nil, nil
86103
}
87104

105+
type Counter struct {
106+
c int32
107+
lock sync.Mutex
108+
}
109+
110+
func (c *Counter) Increment() {
111+
c.lock.Lock()
112+
defer c.lock.Unlock()
113+
c.c++
114+
}
115+
116+
func (c *Counter) GetValue() int32 {
117+
c.lock.Lock()
118+
defer c.lock.Unlock()
119+
return c.c
120+
}
121+
122+
var (
123+
counters = make(map[string]*Counter)
124+
countersLock sync.RWMutex
125+
)
126+
127+
// GetCounter returns a Counter instance for the specified taskExecutionId.
128+
// If no counter exists for the taskExecutionId, a new one is created.
129+
func GetCounter(taskExecutionId string) *Counter {
130+
countersLock.RLock()
131+
counter, exists := counters[taskExecutionId]
132+
countersLock.RUnlock()
133+
134+
if !exists {
135+
countersLock.Lock()
136+
// Check again to handle race conditions
137+
counter, exists = counters[taskExecutionId]
138+
if !exists {
139+
counter = &Counter{}
140+
counters[taskExecutionId] = counter
141+
}
142+
countersLock.Unlock()
143+
}
144+
145+
return counter
146+
}
147+
88148
func RandomFailActivity(ctx task.ActivityContext) (any, error) {
149+
log.Println(fmt.Sprintf("#### [%v] activity %v failure", ctx.GetTaskExecutionId(), ctx.GetTaskID()))
150+
89151
// 70% possibility for activity failure
90-
if rand.Intn(100) <= 70 {
152+
if GetCounter(ctx.GetTaskExecutionId()).GetValue() != 5 {
91153
log.Println("random activity failure")
154+
GetCounter(ctx.GetTaskExecutionId()).Increment()
92155
return "", errors.New("random activity failure")
93156
}
157+
94158
return "ok", nil
95159
}
160+
161+
func must(err error) {
162+
if err != nil {
163+
panic(err)
164+
}
165+
}

task/activity.go

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,17 @@ import (
66
"math"
77
"time"
88

9-
"github.com/dapr/durabletask-go/api/protos"
109
"google.golang.org/protobuf/types/known/wrapperspb"
10+
11+
"github.com/dapr/durabletask-go/api/protos"
1112
)
1213

1314
type callActivityOption func(*callActivityOptions) error
1415

1516
type callActivityOptions struct {
16-
rawInput *wrapperspb.StringValue
17-
retryPolicy *RetryPolicy
17+
rawInput *wrapperspb.StringValue
18+
retryPolicy *RetryPolicy
19+
taskExecutionId string
1820
}
1921

2022
type RetryPolicy struct {
@@ -95,12 +97,15 @@ func WithActivityRetryPolicy(policy *RetryPolicy) callActivityOption {
9597
// ActivityContext is the context parameter type for activity implementations.
9698
type ActivityContext interface {
9799
GetInput(resultPtr any) error
100+
GetTaskID() int32
101+
GetTaskExecutionId() string
98102
Context() context.Context
99103
}
100104

101105
type activityContext struct {
102-
TaskID int32
103-
Name string
106+
TaskID int32
107+
TaskExecutionId string
108+
Name string
104109

105110
rawInput []byte
106111
ctx context.Context
@@ -111,10 +116,11 @@ type Activity func(ctx ActivityContext) (any, error)
111116

112117
func newTaskActivityContext(ctx context.Context, taskID int32, ts *protos.TaskScheduledEvent) *activityContext {
113118
return &activityContext{
114-
TaskID: taskID,
115-
Name: ts.Name,
116-
rawInput: []byte(ts.Input.GetValue()),
117-
ctx: ctx,
119+
TaskID: taskID,
120+
TaskExecutionId: ts.TaskExecutionId,
121+
Name: ts.Name,
122+
rawInput: []byte(ts.Input.GetValue()),
123+
ctx: ctx,
118124
}
119125
}
120126

@@ -126,3 +132,11 @@ func (actx *activityContext) GetInput(v any) error {
126132
func (actx *activityContext) Context() context.Context {
127133
return actx.ctx
128134
}
135+
136+
func (actx *activityContext) GetTaskID() int32 {
137+
return actx.TaskID
138+
}
139+
140+
func (actx *activityContext) GetTaskExecutionId() string {
141+
return actx.TaskExecutionId
142+
}

task/executor.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,12 @@ import (
66
"fmt"
77
"time"
88

9+
"google.golang.org/protobuf/types/known/timestamppb"
10+
"google.golang.org/protobuf/types/known/wrapperspb"
11+
912
"github.com/dapr/durabletask-go/api"
1013
"github.com/dapr/durabletask-go/api/protos"
1114
"github.com/dapr/durabletask-go/backend"
12-
"google.golang.org/protobuf/types/known/timestamppb"
13-
"google.golang.org/protobuf/types/known/wrapperspb"
1415
)
1516

1617
type taskExecutor struct {
@@ -42,6 +43,7 @@ func (te *taskExecutor) ExecuteActivity(ctx context.Context, id api.InstanceID,
4243
EventType: &protos.HistoryEvent_TaskFailed{
4344
TaskFailed: &protos.TaskFailedEvent{
4445
TaskScheduledId: e.EventId,
46+
TaskExecutionId: ts.GetTaskExecutionId(),
4547
FailureDetails: &protos.TaskFailureDetails{
4648
ErrorType: "TaskActivityNotRegistered",
4749
ErrorMessage: fmt.Sprintf("no task activity named '%s' was registered", ts.Name),
@@ -81,6 +83,7 @@ func (te *taskExecutor) ExecuteActivity(ctx context.Context, id api.InstanceID,
8183
EventType: &protos.HistoryEvent_TaskFailed{
8284
TaskFailed: &protos.TaskFailedEvent{
8385
TaskScheduledId: e.EventId,
86+
TaskExecutionId: ts.GetTaskExecutionId(),
8487
FailureDetails: &protos.TaskFailureDetails{
8588
ErrorType: fmt.Sprintf("%T", err),
8689
ErrorMessage: fmt.Sprintf("%+v", err),
@@ -98,6 +101,7 @@ func (te *taskExecutor) ExecuteActivity(ctx context.Context, id api.InstanceID,
98101
EventType: &protos.HistoryEvent_TaskFailed{
99102
TaskFailed: &protos.TaskFailedEvent{
100103
TaskScheduledId: e.EventId,
104+
TaskExecutionId: ts.GetTaskExecutionId(),
101105
FailureDetails: &protos.TaskFailureDetails{
102106
ErrorType: fmt.Sprintf("%T", err),
103107
ErrorMessage: fmt.Sprintf("%+v", err),
@@ -116,6 +120,7 @@ func (te *taskExecutor) ExecuteActivity(ctx context.Context, id api.InstanceID,
116120
EventType: &protos.HistoryEvent_TaskCompleted{
117121
TaskCompleted: &protos.TaskCompletedEvent{
118122
TaskScheduledId: e.EventId,
123+
TaskExecutionId: ts.GetTaskExecutionId(),
119124
Result: rawResult,
120125
},
121126
},

0 commit comments

Comments
 (0)