Skip to content

Commit 7445d71

Browse files
committed
update protos and fix conflicts
Signed-off-by: Cassandra Coyle <[email protected]>
2 parents 476e079 + 07b13de commit 7445d71

File tree

12 files changed

+379
-49
lines changed

12 files changed

+379
-49
lines changed

backend/executor.go

Lines changed: 26 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package backend
22

33
import (
4-
context "context"
4+
"context"
55
"errors"
66
"fmt"
77
"strconv"
@@ -49,15 +49,16 @@ type Executor interface {
4949
type grpcExecutor struct {
5050
protos.UnimplementedTaskHubSidecarServiceServer
5151

52-
workItemQueue chan *protos.WorkItem
53-
pendingOrchestrators *sync.Map // map[api.InstanceID]*ExecutionResults
54-
pendingActivities *sync.Map // map[string]*activityExecutionResult
55-
backend Backend
56-
logger Logger
57-
onWorkItemConnection func(context.Context) error
58-
onWorkItemDisconnect func(context.Context) error
59-
streamShutdownChan <-chan any
60-
streamSendTimeout *time.Duration
52+
workItemQueue chan *protos.WorkItem
53+
pendingOrchestrators *sync.Map // map[api.InstanceID]*ExecutionResults
54+
pendingActivities *sync.Map // map[string]*activityExecutionResult
55+
backend Backend
56+
logger Logger
57+
onWorkItemConnection func(context.Context) error
58+
onWorkItemDisconnect func(context.Context) error
59+
streamShutdownChan <-chan any
60+
streamSendTimeout *time.Duration
61+
skipWaitForInstanceStart bool
6162
}
6263

6364
type grpcExecutorOptions func(g *grpcExecutor)
@@ -98,6 +99,12 @@ func WithStreamSendTimeout(d time.Duration) grpcExecutorOptions {
9899
}
99100
}
100101

102+
func WithSkipWaitForInstanceStart() grpcExecutorOptions {
103+
return func(g *grpcExecutor) {
104+
g.skipWaitForInstanceStart = true
105+
}
106+
}
107+
101108
// NewGrpcExecutor returns the Executor object and a method to invoke to register the gRPC server in the executor.
102109
func NewGrpcExecutor(be Backend, logger Logger, opts ...grpcExecutorOptions) (executor Executor, registerServerFn func(grpcServer grpc.ServiceRegistrar)) {
103110
grpcExecutor := &grpcExecutor{
@@ -172,6 +179,7 @@ func (executor *grpcExecutor) ExecuteActivity(ctx context.Context, iid api.Insta
172179
Input: task.Input,
173180
OrchestrationInstance: &protos.OrchestrationInstance{InstanceId: string(iid)},
174181
TaskId: e.EventId,
182+
TaskExecutionId: task.TaskExecutionId,
175183
},
176184
},
177185
}
@@ -205,6 +213,7 @@ func (executor *grpcExecutor) ExecuteActivity(ctx context.Context, iid api.Insta
205213
EventType: &protos.HistoryEvent_TaskFailed{
206214
TaskFailed: &protos.TaskFailedEvent{
207215
TaskScheduledId: result.response.TaskId,
216+
TaskExecutionId: task.TaskExecutionId,
208217
FailureDetails: failureDetails,
209218
},
210219
},
@@ -218,6 +227,7 @@ func (executor *grpcExecutor) ExecuteActivity(ctx context.Context, iid api.Insta
218227
TaskCompleted: &protos.TaskCompletedEvent{
219228
TaskScheduledId: result.response.TaskId,
220229
Result: result.response.Result,
230+
TaskExecutionId: task.TaskExecutionId,
221231
},
222232
},
223233
Router: e.Router,
@@ -508,12 +518,14 @@ func (g *grpcExecutor) StartInstance(ctx context.Context, req *protos.CreateInst
508518
},
509519
}
510520
if err := g.backend.CreateOrchestrationInstance(ctx, e, WithOrchestrationIdReusePolicy(req.OrchestrationIdReusePolicy)); err != nil {
511-
return nil, err
521+
return nil, fmt.Errorf("failed to create orchestration instance: %w", err)
512522
}
513523

514-
_, err := g.WaitForInstanceStart(ctx, &protos.GetInstanceRequest{InstanceId: instanceID})
515-
if err != nil {
516-
return nil, err
524+
if !g.skipWaitForInstanceStart {
525+
_, err := g.WaitForInstanceStart(ctx, &protos.GetInstanceRequest{InstanceId: instanceID})
526+
if err != nil {
527+
return nil, err
528+
}
517529
}
518530

519531
return &protos.CreateInstanceResponse{InstanceId: instanceID}, nil

backend/runtimestate/runtimestate.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,13 @@ import (
55
"fmt"
66
"time"
77

8-
"github.com/dapr/durabletask-go/api"
9-
"github.com/dapr/durabletask-go/api/helpers"
10-
"github.com/dapr/durabletask-go/api/protos"
118
"github.com/google/uuid"
129
"google.golang.org/protobuf/types/known/timestamppb"
1310
"google.golang.org/protobuf/types/known/wrapperspb"
11+
12+
"github.com/dapr/durabletask-go/api"
13+
"github.com/dapr/durabletask-go/api/helpers"
14+
"github.com/dapr/durabletask-go/api/protos"
1415
)
1516

1617
var ErrDuplicateEvent = errors.New("duplicate event")
@@ -192,6 +193,7 @@ func ApplyActions(s *protos.OrchestrationRuntimeState, customStatus *wrapperspb.
192193
EventType: &protos.HistoryEvent_TaskScheduled{
193194
TaskScheduled: &protos.TaskScheduledEvent{
194195
Name: scheduleTask.Name,
196+
TaskExecutionId: scheduleTask.TaskExecutionId,
195197
Version: scheduleTask.Version,
196198
Input: scheduleTask.Input,
197199
ParentTraceContext: currentTraceContext,

client/worker_grpc.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,16 @@ import (
88
"time"
99

1010
"github.com/cenkalti/backoff/v4"
11-
"github.com/dapr/durabletask-go/api"
12-
"github.com/dapr/durabletask-go/api/protos"
13-
"github.com/dapr/durabletask-go/backend"
14-
"github.com/dapr/durabletask-go/task"
1511
"google.golang.org/grpc/codes"
1612
"google.golang.org/grpc/status"
1713
"google.golang.org/protobuf/types/known/emptypb"
1814
"google.golang.org/protobuf/types/known/timestamppb"
1915
"google.golang.org/protobuf/types/known/wrapperspb"
16+
17+
"github.com/dapr/durabletask-go/api"
18+
"github.com/dapr/durabletask-go/api/protos"
19+
"github.com/dapr/durabletask-go/backend"
20+
"github.com/dapr/durabletask-go/task"
2021
)
2122

2223
type workItemsStream interface {
@@ -183,6 +184,7 @@ func (c *TaskHubGrpcClient) processActivityWorkItem(
183184
Name: req.Name,
184185
Version: req.Version,
185186
Input: req.Input,
187+
TaskExecutionId: req.TaskExecutionId,
186188
ParentTraceContext: tc,
187189
},
188190
},

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ go 1.23.1
44

55
require (
66
github.com/cenkalti/backoff/v4 v4.3.0
7-
github.com/dapr/kit v0.13.1-0.20250110192255-fb195706966f
7+
github.com/dapr/kit v0.15.3-0.20250616160611-598b032bce69
88
github.com/google/uuid v1.6.0
99
github.com/jackc/pgx/v5 v5.7.4
1010
github.com/stretchr/testify v1.10.0

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
22
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
3-
github.com/dapr/kit v0.13.1-0.20250110192255-fb195706966f h1:gugkO8r833phJ31v/HhCUOHxznAkuxdsC6KMh391NOE=
4-
github.com/dapr/kit v0.13.1-0.20250110192255-fb195706966f/go.mod h1:HwFsBKEbcyLanWlDZE7u/jnaDCD/tU+n3pkFNUctQNw=
3+
github.com/dapr/kit v0.15.3-0.20250616160611-598b032bce69 h1:I1Uoy3fn906AZZdG8+n8fHitgY7Wn9c+smz4WQdOy1Q=
4+
github.com/dapr/kit v0.15.3-0.20250616160611-598b032bce69/go.mod h1:6w2Pr38zOAtBn+ld/jknwI4kgMfwanCIcFVnPykdPZQ=
55
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
66
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
77
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"errors"
7+
"fmt"
8+
"log"
9+
"sync"
10+
"sync/atomic"
11+
"time"
12+
13+
"github.com/dapr/durabletask-go/backend"
14+
"github.com/dapr/durabletask-go/backend/sqlite"
15+
"github.com/dapr/durabletask-go/task"
16+
)
17+
18+
func main() {
19+
// Create a new task registry and add the orchestrator and activities
20+
r := task.NewTaskRegistry()
21+
must(r.AddOrchestrator(RetryActivityOrchestrator))
22+
must(r.AddActivity(RandomFailActivity))
23+
24+
// Init the client
25+
ctx := context.Background()
26+
client, worker, err := Init(ctx, r)
27+
if err != nil {
28+
log.Fatalf("Failed to initialize the client: %v", err)
29+
}
30+
defer func() {
31+
must(worker.Shutdown(ctx))
32+
}()
33+
34+
// Start a new orchestration
35+
id, err := client.ScheduleNewOrchestration(ctx, RetryActivityOrchestrator)
36+
if err != nil {
37+
log.Fatalf("Failed to schedule new orchestration: %v", err)
38+
}
39+
40+
// Wait for the orchestration to complete
41+
metadata, err := client.WaitForOrchestrationCompletion(ctx, id)
42+
if err != nil {
43+
log.Fatalf("Failed to wait for orchestration to complete: %v", err)
44+
}
45+
46+
// Print the results
47+
metadataEnc, err := json.MarshalIndent(metadata, "", " ")
48+
if err != nil {
49+
log.Fatalf("Failed to encode result to JSON: %v", err)
50+
}
51+
log.Printf("Orchestration completed: %v", string(metadataEnc))
52+
}
53+
54+
// Init creates and initializes an in-memory client and worker pair with default configuration.
55+
func Init(ctx context.Context, r *task.TaskRegistry) (backend.TaskHubClient, backend.TaskHubWorker, error) {
56+
logger := backend.DefaultLogger()
57+
58+
// Create an executor
59+
executor := task.NewTaskExecutor(r)
60+
61+
// Create a new backend
62+
// Use the in-memory sqlite provider by specifying ""
63+
be := sqlite.NewSqliteBackend(sqlite.NewSqliteOptions(""), logger)
64+
orchestrationWorker := backend.NewOrchestrationWorker(be, executor, logger)
65+
activityWorker := backend.NewActivityTaskWorker(be, executor, logger)
66+
taskHubWorker := backend.NewTaskHubWorker(be, orchestrationWorker, activityWorker, logger)
67+
68+
// Start the worker
69+
err := taskHubWorker.Start(ctx)
70+
if err != nil {
71+
return nil, nil, err
72+
}
73+
74+
// Get the client to the backend
75+
taskHubClient := backend.NewTaskHubClient(be)
76+
77+
return taskHubClient, taskHubWorker, nil
78+
}
79+
80+
func RetryActivityOrchestrator(ctx *task.OrchestrationContext) (any, error) {
81+
t := ctx.CallActivity(RandomFailActivity, task.WithActivityRetryPolicy(&task.RetryPolicy{
82+
MaxAttempts: 10,
83+
InitialRetryInterval: 100 * time.Millisecond,
84+
BackoffCoefficient: 2,
85+
MaxRetryInterval: 3 * time.Second,
86+
}))
87+
88+
t1 := ctx.CallActivity(RandomFailActivity, task.WithActivityRetryPolicy(&task.RetryPolicy{
89+
MaxAttempts: 10,
90+
InitialRetryInterval: 100 * time.Millisecond,
91+
BackoffCoefficient: 2,
92+
MaxRetryInterval: 3 * time.Second,
93+
}))
94+
95+
if err := t.Await(nil); err != nil {
96+
return nil, err
97+
}
98+
99+
if err := t1.Await(nil); err != nil {
100+
return nil, err
101+
}
102+
103+
return nil, nil
104+
}
105+
106+
var (
107+
counters = sync.Map{}
108+
)
109+
110+
// getCounter returns a Counter instance for the specified taskExecutionId.
111+
// If no counter exists for the taskExecutionId, a new one is created.
112+
func getCounter(taskExecutionId string) *atomic.Int32 {
113+
counter, _ := counters.LoadOrStore(taskExecutionId, &atomic.Int32{})
114+
return counter.(*atomic.Int32)
115+
}
116+
117+
func RandomFailActivity(ctx task.ActivityContext) (any, error) {
118+
log.Println(fmt.Sprintf("#### [%v] activity %v failure", ctx.GetTaskExecutionId(), ctx.GetTaskID()))
119+
counter := getCounter(ctx.GetTaskExecutionId())
120+
// The activity should fail 5 times before succeeding.
121+
if counter.Load() != 5 {
122+
log.Println("random activity failure")
123+
counter.Add(1)
124+
return "", errors.New("random activity failure")
125+
}
126+
127+
return "ok", nil
128+
}
129+
130+
func must(err error) {
131+
if err != nil {
132+
panic(err)
133+
}
134+
}

submodules/durabletask-protobuf

Submodule durabletask-protobuf deleted from 1b7094d

task/activity.go

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,9 @@ import (
66
"math"
77
"time"
88

9-
"github.com/dapr/durabletask-go/api/protos"
109
"google.golang.org/protobuf/types/known/wrapperspb"
10+
11+
"github.com/dapr/durabletask-go/api/protos"
1112
)
1213

1314
type callActivityOption func(*callActivityOptions) error
@@ -103,12 +104,15 @@ func WithActivityRetryPolicy(policy *RetryPolicy) callActivityOption {
103104
// ActivityContext is the context parameter type for activity implementations.
104105
type ActivityContext interface {
105106
GetInput(resultPtr any) error
107+
GetTaskID() int32
108+
GetTaskExecutionId() string
106109
Context() context.Context
107110
}
108111

109112
type activityContext struct {
110-
TaskID int32
111-
Name string
113+
TaskID int32
114+
TaskExecutionId string
115+
Name string
112116

113117
rawInput []byte
114118
ctx context.Context
@@ -119,10 +123,11 @@ type Activity func(ctx ActivityContext) (any, error)
119123

120124
func newTaskActivityContext(ctx context.Context, taskID int32, ts *protos.TaskScheduledEvent) *activityContext {
121125
return &activityContext{
122-
TaskID: taskID,
123-
Name: ts.Name,
124-
rawInput: []byte(ts.Input.GetValue()),
125-
ctx: ctx,
126+
TaskID: taskID,
127+
TaskExecutionId: ts.TaskExecutionId,
128+
Name: ts.Name,
129+
rawInput: []byte(ts.Input.GetValue()),
130+
ctx: ctx,
126131
}
127132
}
128133

@@ -134,3 +139,11 @@ func (actx *activityContext) GetInput(v any) error {
134139
func (actx *activityContext) Context() context.Context {
135140
return actx.ctx
136141
}
142+
143+
func (actx *activityContext) GetTaskID() int32 {
144+
return actx.TaskID
145+
}
146+
147+
func (actx *activityContext) GetTaskExecutionId() string {
148+
return actx.TaskExecutionId
149+
}

0 commit comments

Comments
 (0)