Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4,497 changes: 2,343 additions & 2,154 deletions api/protos/orchestrator_service.pb.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion api/protos/orchestrator_service_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion backend/executor.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package backend

import (
context "context"
"context"
"errors"
"fmt"
"strconv"
Expand Down Expand Up @@ -179,6 +179,7 @@ func (executor *grpcExecutor) ExecuteActivity(ctx context.Context, iid api.Insta
Input: task.Input,
OrchestrationInstance: &protos.OrchestrationInstance{InstanceId: string(iid)},
TaskId: e.EventId,
TaskExecutionId: task.TaskExecutionId,
},
},
}
Expand Down Expand Up @@ -212,6 +213,7 @@ func (executor *grpcExecutor) ExecuteActivity(ctx context.Context, iid api.Insta
EventType: &protos.HistoryEvent_TaskFailed{
TaskFailed: &protos.TaskFailedEvent{
TaskScheduledId: result.response.TaskId,
TaskExecutionId: task.TaskExecutionId,
FailureDetails: failureDetails,
},
},
Expand All @@ -224,6 +226,7 @@ func (executor *grpcExecutor) ExecuteActivity(ctx context.Context, iid api.Insta
TaskCompleted: &protos.TaskCompletedEvent{
TaskScheduledId: result.response.TaskId,
Result: result.response.Result,
TaskExecutionId: task.TaskExecutionId,
},
},
}
Expand Down
8 changes: 5 additions & 3 deletions backend/runtimestate/runtimestate.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ import (
"fmt"
"time"

"github.com/dapr/durabletask-go/api"
"github.com/dapr/durabletask-go/api/helpers"
"github.com/dapr/durabletask-go/api/protos"
"github.com/google/uuid"
"google.golang.org/protobuf/types/known/timestamppb"
"google.golang.org/protobuf/types/known/wrapperspb"

"github.com/dapr/durabletask-go/api"
"github.com/dapr/durabletask-go/api/helpers"
"github.com/dapr/durabletask-go/api/protos"
)

var ErrDuplicateEvent = errors.New("duplicate event")
Expand Down Expand Up @@ -188,6 +189,7 @@ func ApplyActions(s *protos.OrchestrationRuntimeState, customStatus *wrapperspb.
EventType: &protos.HistoryEvent_TaskScheduled{
TaskScheduled: &protos.TaskScheduledEvent{
Name: scheduleTask.Name,
TaskExecutionId: scheduleTask.TaskExecutionId,
Version: scheduleTask.Version,
Input: scheduleTask.Input,
ParentTraceContext: currentTraceContext,
Expand Down
10 changes: 6 additions & 4 deletions client/worker_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,16 @@ import (
"time"

"github.com/cenkalti/backoff/v4"
"github.com/dapr/durabletask-go/api"
"github.com/dapr/durabletask-go/api/protos"
"github.com/dapr/durabletask-go/backend"
"github.com/dapr/durabletask-go/task"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"
"google.golang.org/protobuf/types/known/timestamppb"
"google.golang.org/protobuf/types/known/wrapperspb"

"github.com/dapr/durabletask-go/api"
"github.com/dapr/durabletask-go/api/protos"
"github.com/dapr/durabletask-go/backend"
"github.com/dapr/durabletask-go/task"
)

type workItemsStream interface {
Expand Down Expand Up @@ -183,6 +184,7 @@ func (c *TaskHubGrpcClient) processActivityWorkItem(
Name: req.Name,
Version: req.Version,
Input: req.Input,
TaskExecutionId: req.TaskExecutionId,
ParentTraceContext: tc,
},
},
Expand Down
134 changes: 134 additions & 0 deletions samples/taskexecutionid/taskexecutionid.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package main

import (
"context"
"encoding/json"
"errors"
"fmt"
"log"
"sync"
"sync/atomic"
"time"

"github.com/dapr/durabletask-go/backend"
"github.com/dapr/durabletask-go/backend/sqlite"
"github.com/dapr/durabletask-go/task"
)

func main() {
// Create a new task registry and add the orchestrator and activities
r := task.NewTaskRegistry()
must(r.AddOrchestrator(RetryActivityOrchestrator))
must(r.AddActivity(RandomFailActivity))

// Init the client
ctx := context.Background()
client, worker, err := Init(ctx, r)
if err != nil {
log.Fatalf("Failed to initialize the client: %v", err)
}
defer func() {
must(worker.Shutdown(ctx))
}()

// Start a new orchestration
id, err := client.ScheduleNewOrchestration(ctx, RetryActivityOrchestrator)
if err != nil {
log.Fatalf("Failed to schedule new orchestration: %v", err)
}

// Wait for the orchestration to complete
metadata, err := client.WaitForOrchestrationCompletion(ctx, id)
if err != nil {
log.Fatalf("Failed to wait for orchestration to complete: %v", err)
}

// Print the results
metadataEnc, err := json.MarshalIndent(metadata, "", " ")
if err != nil {
log.Fatalf("Failed to encode result to JSON: %v", err)
}
log.Printf("Orchestration completed: %v", string(metadataEnc))
}

// Init creates and initializes an in-memory client and worker pair with default configuration.
func Init(ctx context.Context, r *task.TaskRegistry) (backend.TaskHubClient, backend.TaskHubWorker, error) {
logger := backend.DefaultLogger()

// Create an executor
executor := task.NewTaskExecutor(r)

// Create a new backend
// Use the in-memory sqlite provider by specifying ""
be := sqlite.NewSqliteBackend(sqlite.NewSqliteOptions(""), logger)
orchestrationWorker := backend.NewOrchestrationWorker(be, executor, logger)
activityWorker := backend.NewActivityTaskWorker(be, executor, logger)
taskHubWorker := backend.NewTaskHubWorker(be, orchestrationWorker, activityWorker, logger)

// Start the worker
err := taskHubWorker.Start(ctx)
if err != nil {
return nil, nil, err
}

// Get the client to the backend
taskHubClient := backend.NewTaskHubClient(be)

return taskHubClient, taskHubWorker, nil
}

func RetryActivityOrchestrator(ctx *task.OrchestrationContext) (any, error) {
t := ctx.CallActivity(RandomFailActivity, task.WithActivityRetryPolicy(&task.RetryPolicy{
MaxAttempts: 10,
InitialRetryInterval: 100 * time.Millisecond,
BackoffCoefficient: 2,
MaxRetryInterval: 3 * time.Second,
}))

t1 := ctx.CallActivity(RandomFailActivity, task.WithActivityRetryPolicy(&task.RetryPolicy{
MaxAttempts: 10,
InitialRetryInterval: 100 * time.Millisecond,
BackoffCoefficient: 2,
MaxRetryInterval: 3 * time.Second,
}))

if err := t.Await(nil); err != nil {
return nil, err
}

if err := t1.Await(nil); err != nil {
return nil, err
}

return nil, nil
}

var (
counters = sync.Map{}
)

// getCounter returns a Counter instance for the specified taskExecutionId.
// If no counter exists for the taskExecutionId, a new one is created.
func getCounter(taskExecutionId string) *atomic.Int32 {
counter, _ := counters.LoadOrStore(taskExecutionId, &atomic.Int32{})
return counter.(*atomic.Int32)
}

func RandomFailActivity(ctx task.ActivityContext) (any, error) {
log.Println(fmt.Sprintf("#### [%v] activity %v failure", ctx.GetTaskExecutionId(), ctx.GetTaskID()))
counter := getCounter(ctx.GetTaskExecutionId())
// The activity should fail 5 times before succeeding.
if counter.Load() != 5 {
log.Println("random activity failure")
counter.Add(1)
return "", errors.New("random activity failure")
}

return "ok", nil
}

func must(err error) {
if err != nil {
panic(err)
}
}
27 changes: 20 additions & 7 deletions task/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ import (
"math"
"time"

"github.com/dapr/durabletask-go/api/protos"
"google.golang.org/protobuf/types/known/wrapperspb"

"github.com/dapr/durabletask-go/api/protos"
)

type callActivityOption func(*callActivityOptions) error
Expand Down Expand Up @@ -95,12 +96,15 @@ func WithActivityRetryPolicy(policy *RetryPolicy) callActivityOption {
// ActivityContext is the context parameter type for activity implementations.
type ActivityContext interface {
GetInput(resultPtr any) error
GetTaskID() int32
GetTaskExecutionId() string
Context() context.Context
}

type activityContext struct {
TaskID int32
Name string
TaskID int32
TaskExecutionId string
Name string

rawInput []byte
ctx context.Context
Expand All @@ -111,10 +115,11 @@ type Activity func(ctx ActivityContext) (any, error)

func newTaskActivityContext(ctx context.Context, taskID int32, ts *protos.TaskScheduledEvent) *activityContext {
return &activityContext{
TaskID: taskID,
Name: ts.Name,
rawInput: []byte(ts.Input.GetValue()),
ctx: ctx,
TaskID: taskID,
TaskExecutionId: ts.TaskExecutionId,
Name: ts.Name,
rawInput: []byte(ts.Input.GetValue()),
ctx: ctx,
}
}

Expand All @@ -126,3 +131,11 @@ func (actx *activityContext) GetInput(v any) error {
func (actx *activityContext) Context() context.Context {
return actx.ctx
}

func (actx *activityContext) GetTaskID() int32 {
return actx.TaskID
}

func (actx *activityContext) GetTaskExecutionId() string {
return actx.TaskExecutionId
}
9 changes: 7 additions & 2 deletions task/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ import (
"fmt"
"time"

"google.golang.org/protobuf/types/known/timestamppb"
"google.golang.org/protobuf/types/known/wrapperspb"

"github.com/dapr/durabletask-go/api"
"github.com/dapr/durabletask-go/api/protos"
"github.com/dapr/durabletask-go/backend"
"google.golang.org/protobuf/types/known/timestamppb"
"google.golang.org/protobuf/types/known/wrapperspb"
)

type taskExecutor struct {
Expand Down Expand Up @@ -42,6 +43,7 @@ func (te *taskExecutor) ExecuteActivity(ctx context.Context, id api.InstanceID,
EventType: &protos.HistoryEvent_TaskFailed{
TaskFailed: &protos.TaskFailedEvent{
TaskScheduledId: e.EventId,
TaskExecutionId: ts.GetTaskExecutionId(),
FailureDetails: &protos.TaskFailureDetails{
ErrorType: "TaskActivityNotRegistered",
ErrorMessage: fmt.Sprintf("no task activity named '%s' was registered", ts.Name),
Expand Down Expand Up @@ -81,6 +83,7 @@ func (te *taskExecutor) ExecuteActivity(ctx context.Context, id api.InstanceID,
EventType: &protos.HistoryEvent_TaskFailed{
TaskFailed: &protos.TaskFailedEvent{
TaskScheduledId: e.EventId,
TaskExecutionId: ts.GetTaskExecutionId(),
FailureDetails: &protos.TaskFailureDetails{
ErrorType: fmt.Sprintf("%T", err),
ErrorMessage: fmt.Sprintf("%+v", err),
Expand All @@ -98,6 +101,7 @@ func (te *taskExecutor) ExecuteActivity(ctx context.Context, id api.InstanceID,
EventType: &protos.HistoryEvent_TaskFailed{
TaskFailed: &protos.TaskFailedEvent{
TaskScheduledId: e.EventId,
TaskExecutionId: ts.GetTaskExecutionId(),
FailureDetails: &protos.TaskFailureDetails{
ErrorType: fmt.Sprintf("%T", err),
ErrorMessage: fmt.Sprintf("%+v", err),
Expand All @@ -116,6 +120,7 @@ func (te *taskExecutor) ExecuteActivity(ctx context.Context, id api.InstanceID,
EventType: &protos.HistoryEvent_TaskCompleted{
TaskCompleted: &protos.TaskCompletedEvent{
TaskScheduledId: e.EventId,
TaskExecutionId: ts.GetTaskExecutionId(),
Result: rawResult,
},
},
Expand Down
Loading