Skip to content

Commit 116dec8

Browse files
authored
Merge pull request #25 from javier-aliaga/javi-activity-execution-id
chore: Support taskExecutionIds
2 parents 15af16c + 8fa1510 commit 116dec8

File tree

12 files changed

+2699
-2188
lines changed

12 files changed

+2699
-2188
lines changed

api/protos/orchestrator_service.pb.go

Lines changed: 2343 additions & 2154 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

api/protos/orchestrator_service_grpc.pb.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

backend/executor.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package backend
22

33
import (
4-
context "context"
4+
"context"
55
"errors"
66
"fmt"
77
"strconv"
@@ -179,6 +179,7 @@ func (executor *grpcExecutor) ExecuteActivity(ctx context.Context, iid api.Insta
179179
Input: task.Input,
180180
OrchestrationInstance: &protos.OrchestrationInstance{InstanceId: string(iid)},
181181
TaskId: e.EventId,
182+
TaskExecutionId: task.TaskExecutionId,
182183
},
183184
},
184185
}
@@ -212,6 +213,7 @@ func (executor *grpcExecutor) ExecuteActivity(ctx context.Context, iid api.Insta
212213
EventType: &protos.HistoryEvent_TaskFailed{
213214
TaskFailed: &protos.TaskFailedEvent{
214215
TaskScheduledId: result.response.TaskId,
216+
TaskExecutionId: task.TaskExecutionId,
215217
FailureDetails: failureDetails,
216218
},
217219
},
@@ -224,6 +226,7 @@ func (executor *grpcExecutor) ExecuteActivity(ctx context.Context, iid api.Insta
224226
TaskCompleted: &protos.TaskCompletedEvent{
225227
TaskScheduledId: result.response.TaskId,
226228
Result: result.response.Result,
229+
TaskExecutionId: task.TaskExecutionId,
227230
},
228231
},
229232
}

backend/runtimestate/runtimestate.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,13 @@ import (
55
"fmt"
66
"time"
77

8-
"github.com/dapr/durabletask-go/api"
9-
"github.com/dapr/durabletask-go/api/helpers"
10-
"github.com/dapr/durabletask-go/api/protos"
118
"github.com/google/uuid"
129
"google.golang.org/protobuf/types/known/timestamppb"
1310
"google.golang.org/protobuf/types/known/wrapperspb"
11+
12+
"github.com/dapr/durabletask-go/api"
13+
"github.com/dapr/durabletask-go/api/helpers"
14+
"github.com/dapr/durabletask-go/api/protos"
1415
)
1516

1617
var ErrDuplicateEvent = errors.New("duplicate event")
@@ -188,6 +189,7 @@ func ApplyActions(s *protos.OrchestrationRuntimeState, customStatus *wrapperspb.
188189
EventType: &protos.HistoryEvent_TaskScheduled{
189190
TaskScheduled: &protos.TaskScheduledEvent{
190191
Name: scheduleTask.Name,
192+
TaskExecutionId: scheduleTask.TaskExecutionId,
191193
Version: scheduleTask.Version,
192194
Input: scheduleTask.Input,
193195
ParentTraceContext: currentTraceContext,

client/worker_grpc.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,16 @@ import (
88
"time"
99

1010
"github.com/cenkalti/backoff/v4"
11-
"github.com/dapr/durabletask-go/api"
12-
"github.com/dapr/durabletask-go/api/protos"
13-
"github.com/dapr/durabletask-go/backend"
14-
"github.com/dapr/durabletask-go/task"
1511
"google.golang.org/grpc/codes"
1612
"google.golang.org/grpc/status"
1713
"google.golang.org/protobuf/types/known/emptypb"
1814
"google.golang.org/protobuf/types/known/timestamppb"
1915
"google.golang.org/protobuf/types/known/wrapperspb"
16+
17+
"github.com/dapr/durabletask-go/api"
18+
"github.com/dapr/durabletask-go/api/protos"
19+
"github.com/dapr/durabletask-go/backend"
20+
"github.com/dapr/durabletask-go/task"
2021
)
2122

2223
type workItemsStream interface {
@@ -183,6 +184,7 @@ func (c *TaskHubGrpcClient) processActivityWorkItem(
183184
Name: req.Name,
184185
Version: req.Version,
185186
Input: req.Input,
187+
TaskExecutionId: req.TaskExecutionId,
186188
ParentTraceContext: tc,
187189
},
188190
},
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"errors"
7+
"fmt"
8+
"log"
9+
"sync"
10+
"sync/atomic"
11+
"time"
12+
13+
"github.com/dapr/durabletask-go/backend"
14+
"github.com/dapr/durabletask-go/backend/sqlite"
15+
"github.com/dapr/durabletask-go/task"
16+
)
17+
18+
func main() {
19+
// Create a new task registry and add the orchestrator and activities
20+
r := task.NewTaskRegistry()
21+
must(r.AddOrchestrator(RetryActivityOrchestrator))
22+
must(r.AddActivity(RandomFailActivity))
23+
24+
// Init the client
25+
ctx := context.Background()
26+
client, worker, err := Init(ctx, r)
27+
if err != nil {
28+
log.Fatalf("Failed to initialize the client: %v", err)
29+
}
30+
defer func() {
31+
must(worker.Shutdown(ctx))
32+
}()
33+
34+
// Start a new orchestration
35+
id, err := client.ScheduleNewOrchestration(ctx, RetryActivityOrchestrator)
36+
if err != nil {
37+
log.Fatalf("Failed to schedule new orchestration: %v", err)
38+
}
39+
40+
// Wait for the orchestration to complete
41+
metadata, err := client.WaitForOrchestrationCompletion(ctx, id)
42+
if err != nil {
43+
log.Fatalf("Failed to wait for orchestration to complete: %v", err)
44+
}
45+
46+
// Print the results
47+
metadataEnc, err := json.MarshalIndent(metadata, "", " ")
48+
if err != nil {
49+
log.Fatalf("Failed to encode result to JSON: %v", err)
50+
}
51+
log.Printf("Orchestration completed: %v", string(metadataEnc))
52+
}
53+
54+
// Init creates and initializes an in-memory client and worker pair with default configuration.
55+
func Init(ctx context.Context, r *task.TaskRegistry) (backend.TaskHubClient, backend.TaskHubWorker, error) {
56+
logger := backend.DefaultLogger()
57+
58+
// Create an executor
59+
executor := task.NewTaskExecutor(r)
60+
61+
// Create a new backend
62+
// Use the in-memory sqlite provider by specifying ""
63+
be := sqlite.NewSqliteBackend(sqlite.NewSqliteOptions(""), logger)
64+
orchestrationWorker := backend.NewOrchestrationWorker(be, executor, logger)
65+
activityWorker := backend.NewActivityTaskWorker(be, executor, logger)
66+
taskHubWorker := backend.NewTaskHubWorker(be, orchestrationWorker, activityWorker, logger)
67+
68+
// Start the worker
69+
err := taskHubWorker.Start(ctx)
70+
if err != nil {
71+
return nil, nil, err
72+
}
73+
74+
// Get the client to the backend
75+
taskHubClient := backend.NewTaskHubClient(be)
76+
77+
return taskHubClient, taskHubWorker, nil
78+
}
79+
80+
func RetryActivityOrchestrator(ctx *task.OrchestrationContext) (any, error) {
81+
t := ctx.CallActivity(RandomFailActivity, task.WithActivityRetryPolicy(&task.RetryPolicy{
82+
MaxAttempts: 10,
83+
InitialRetryInterval: 100 * time.Millisecond,
84+
BackoffCoefficient: 2,
85+
MaxRetryInterval: 3 * time.Second,
86+
}))
87+
88+
t1 := ctx.CallActivity(RandomFailActivity, task.WithActivityRetryPolicy(&task.RetryPolicy{
89+
MaxAttempts: 10,
90+
InitialRetryInterval: 100 * time.Millisecond,
91+
BackoffCoefficient: 2,
92+
MaxRetryInterval: 3 * time.Second,
93+
}))
94+
95+
if err := t.Await(nil); err != nil {
96+
return nil, err
97+
}
98+
99+
if err := t1.Await(nil); err != nil {
100+
return nil, err
101+
}
102+
103+
return nil, nil
104+
}
105+
106+
var (
107+
counters = sync.Map{}
108+
)
109+
110+
// getCounter returns a Counter instance for the specified taskExecutionId.
111+
// If no counter exists for the taskExecutionId, a new one is created.
112+
func getCounter(taskExecutionId string) *atomic.Int32 {
113+
counter, _ := counters.LoadOrStore(taskExecutionId, &atomic.Int32{})
114+
return counter.(*atomic.Int32)
115+
}
116+
117+
func RandomFailActivity(ctx task.ActivityContext) (any, error) {
118+
log.Println(fmt.Sprintf("#### [%v] activity %v failure", ctx.GetTaskExecutionId(), ctx.GetTaskID()))
119+
counter := getCounter(ctx.GetTaskExecutionId())
120+
// The activity should fail 5 times before succeeding.
121+
if counter.Load() != 5 {
122+
log.Println("random activity failure")
123+
counter.Add(1)
124+
return "", errors.New("random activity failure")
125+
}
126+
127+
return "ok", nil
128+
}
129+
130+
func must(err error) {
131+
if err != nil {
132+
panic(err)
133+
}
134+
}

task/activity.go

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,9 @@ import (
66
"math"
77
"time"
88

9-
"github.com/dapr/durabletask-go/api/protos"
109
"google.golang.org/protobuf/types/known/wrapperspb"
10+
11+
"github.com/dapr/durabletask-go/api/protos"
1112
)
1213

1314
type callActivityOption func(*callActivityOptions) error
@@ -95,12 +96,15 @@ func WithActivityRetryPolicy(policy *RetryPolicy) callActivityOption {
9596
// ActivityContext is the context parameter type for activity implementations.
9697
type ActivityContext interface {
9798
GetInput(resultPtr any) error
99+
GetTaskID() int32
100+
GetTaskExecutionId() string
98101
Context() context.Context
99102
}
100103

101104
type activityContext struct {
102-
TaskID int32
103-
Name string
105+
TaskID int32
106+
TaskExecutionId string
107+
Name string
104108

105109
rawInput []byte
106110
ctx context.Context
@@ -111,10 +115,11 @@ type Activity func(ctx ActivityContext) (any, error)
111115

112116
func newTaskActivityContext(ctx context.Context, taskID int32, ts *protos.TaskScheduledEvent) *activityContext {
113117
return &activityContext{
114-
TaskID: taskID,
115-
Name: ts.Name,
116-
rawInput: []byte(ts.Input.GetValue()),
117-
ctx: ctx,
118+
TaskID: taskID,
119+
TaskExecutionId: ts.TaskExecutionId,
120+
Name: ts.Name,
121+
rawInput: []byte(ts.Input.GetValue()),
122+
ctx: ctx,
118123
}
119124
}
120125

@@ -126,3 +131,11 @@ func (actx *activityContext) GetInput(v any) error {
126131
func (actx *activityContext) Context() context.Context {
127132
return actx.ctx
128133
}
134+
135+
func (actx *activityContext) GetTaskID() int32 {
136+
return actx.TaskID
137+
}
138+
139+
func (actx *activityContext) GetTaskExecutionId() string {
140+
return actx.TaskExecutionId
141+
}

task/executor.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,12 @@ import (
66
"fmt"
77
"time"
88

9+
"google.golang.org/protobuf/types/known/timestamppb"
10+
"google.golang.org/protobuf/types/known/wrapperspb"
11+
912
"github.com/dapr/durabletask-go/api"
1013
"github.com/dapr/durabletask-go/api/protos"
1114
"github.com/dapr/durabletask-go/backend"
12-
"google.golang.org/protobuf/types/known/timestamppb"
13-
"google.golang.org/protobuf/types/known/wrapperspb"
1415
)
1516

1617
type taskExecutor struct {
@@ -42,6 +43,7 @@ func (te *taskExecutor) ExecuteActivity(ctx context.Context, id api.InstanceID,
4243
EventType: &protos.HistoryEvent_TaskFailed{
4344
TaskFailed: &protos.TaskFailedEvent{
4445
TaskScheduledId: e.EventId,
46+
TaskExecutionId: ts.GetTaskExecutionId(),
4547
FailureDetails: &protos.TaskFailureDetails{
4648
ErrorType: "TaskActivityNotRegistered",
4749
ErrorMessage: fmt.Sprintf("no task activity named '%s' was registered", ts.Name),
@@ -81,6 +83,7 @@ func (te *taskExecutor) ExecuteActivity(ctx context.Context, id api.InstanceID,
8183
EventType: &protos.HistoryEvent_TaskFailed{
8284
TaskFailed: &protos.TaskFailedEvent{
8385
TaskScheduledId: e.EventId,
86+
TaskExecutionId: ts.GetTaskExecutionId(),
8487
FailureDetails: &protos.TaskFailureDetails{
8588
ErrorType: fmt.Sprintf("%T", err),
8689
ErrorMessage: fmt.Sprintf("%+v", err),
@@ -98,6 +101,7 @@ func (te *taskExecutor) ExecuteActivity(ctx context.Context, id api.InstanceID,
98101
EventType: &protos.HistoryEvent_TaskFailed{
99102
TaskFailed: &protos.TaskFailedEvent{
100103
TaskScheduledId: e.EventId,
104+
TaskExecutionId: ts.GetTaskExecutionId(),
101105
FailureDetails: &protos.TaskFailureDetails{
102106
ErrorType: fmt.Sprintf("%T", err),
103107
ErrorMessage: fmt.Sprintf("%+v", err),
@@ -116,6 +120,7 @@ func (te *taskExecutor) ExecuteActivity(ctx context.Context, id api.InstanceID,
116120
EventType: &protos.HistoryEvent_TaskCompleted{
117121
TaskCompleted: &protos.TaskCompletedEvent{
118122
TaskScheduledId: e.EventId,
123+
TaskExecutionId: ts.GetTaskExecutionId(),
119124
Result: rawResult,
120125
},
121126
},

0 commit comments

Comments
 (0)