Skip to content

Commit 78b3560

Browse files
committed
Create vanity workflow client
Create a `workflow` vanity package to rename to more idiomatic naming. Deprecates ID ReusePolicy. Signed-off-by: joshvanl <[email protected]>
1 parent 2a27b62 commit 78b3560

File tree

11 files changed

+518
-18
lines changed

11 files changed

+518
-18
lines changed

api/orchestration.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ func WithInstanceID(id InstanceID) NewOrchestrationOptions {
8080
}
8181

8282
// WithOrchestrationIdReusePolicy configures Orchestration ID reuse policy.
83+
// Deprecated.
8384
func WithOrchestrationIdReusePolicy(policy *protos.OrchestrationIdReusePolicy) NewOrchestrationOptions {
8485
return func(req *protos.CreateInstanceRequest) error {
8586
// initialize CreateInstanceOption

task/activity.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import (
1111
"github.com/dapr/durabletask-go/api/protos"
1212
)
1313

14-
type callActivityOption func(*callActivityOptions) error
14+
type CallActivityOption func(*callActivityOptions) error
1515

1616
type callActivityOptions struct {
1717
rawInput *wrapperspb.StringValue
@@ -59,7 +59,7 @@ func (policy *RetryPolicy) Validate() error {
5959
return nil
6060
}
6161

62-
func WithActivityAppID(targetAppID string) callActivityOption {
62+
func WithActivityAppID(targetAppID string) CallActivityOption {
6363
return func(opt *callActivityOptions) error {
6464
opt.targetAppID = &targetAppID
6565
return nil
@@ -68,7 +68,7 @@ func WithActivityAppID(targetAppID string) callActivityOption {
6868

6969
// WithActivityInput configures an input for an activity invocation.
7070
// The specified input must be JSON serializable.
71-
func WithActivityInput(input any) callActivityOption {
71+
func WithActivityInput(input any) CallActivityOption {
7272
return func(opt *callActivityOptions) error {
7373
data, err := marshalData(input)
7474
if err != nil {
@@ -80,14 +80,14 @@ func WithActivityInput(input any) callActivityOption {
8080
}
8181

8282
// WithRawActivityInput configures a raw input for an activity invocation.
83-
func WithRawActivityInput(input *wrapperspb.StringValue) callActivityOption {
83+
func WithRawActivityInput(input *wrapperspb.StringValue) CallActivityOption {
8484
return func(opt *callActivityOptions) error {
8585
opt.rawInput = input
8686
return nil
8787
}
8888
}
8989

90-
func WithActivityRetryPolicy(policy *RetryPolicy) callActivityOption {
90+
func WithActivityRetryPolicy(policy *RetryPolicy) CallActivityOption {
9191
return func(opt *callActivityOptions) error {
9292
if policy == nil {
9393
return nil

task/orchestrator.go

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -58,14 +58,14 @@ type callSubOrchestratorOptions struct {
5858
retryPolicy *RetryPolicy
5959
}
6060

61-
// subOrchestratorOption is a functional option type for the CallSubOrchestrator orchestrator method.
62-
type subOrchestratorOption func(*callSubOrchestratorOptions) error
61+
// SubOrchestratorOption is a functional option type for the CallSubOrchestrator orchestrator method.
62+
type SubOrchestratorOption func(*callSubOrchestratorOptions) error
6363

6464
// ContinueAsNewOption is a functional option type for the ContinueAsNew orchestrator method.
6565
type ContinueAsNewOption func(*OrchestrationContext)
6666

6767
// WithSubOrchestratorAppID is a functional option type for the CallSubOrchestrator orchestrator method that specifies the app ID of the target activity.
68-
func WithSubOrchestratorAppID(appID string) subOrchestratorOption {
68+
func WithSubOrchestratorAppID(appID string) SubOrchestratorOption {
6969
return func(opts *callSubOrchestratorOptions) error {
7070
opts.targetAppID = &appID
7171
return nil
@@ -82,7 +82,7 @@ func WithKeepUnprocessedEvents() ContinueAsNewOption {
8282

8383
// WithSubOrchestratorInput is a functional option type for the CallSubOrchestrator
8484
// orchestrator method that takes an input value and marshals it to JSON.
85-
func WithSubOrchestratorInput(input any) subOrchestratorOption {
85+
func WithSubOrchestratorInput(input any) SubOrchestratorOption {
8686
return func(opts *callSubOrchestratorOptions) error {
8787
bytes, err := marshalData(input)
8888
if err != nil {
@@ -95,7 +95,7 @@ func WithSubOrchestratorInput(input any) subOrchestratorOption {
9595

9696
// WithRawSubOrchestratorInput is a functional option type for the CallSubOrchestrator
9797
// orchestrator method that takes a raw input value.
98-
func WithRawSubOrchestratorInput(input *wrapperspb.StringValue) subOrchestratorOption {
98+
func WithRawSubOrchestratorInput(input *wrapperspb.StringValue) SubOrchestratorOption {
9999
return func(opts *callSubOrchestratorOptions) error {
100100
opts.rawInput = input
101101
return nil
@@ -104,14 +104,14 @@ func WithRawSubOrchestratorInput(input *wrapperspb.StringValue) subOrchestratorO
104104

105105
// WithSubOrchestrationInstanceID is a functional option type for the CallSubOrchestrator
106106
// orchestrator method that specifies the instance ID of the sub-orchestration.
107-
func WithSubOrchestrationInstanceID(instanceID string) subOrchestratorOption {
107+
func WithSubOrchestrationInstanceID(instanceID string) SubOrchestratorOption {
108108
return func(opts *callSubOrchestratorOptions) error {
109109
opts.instanceID = instanceID
110110
return nil
111111
}
112112
}
113113

114-
func WithSubOrchestrationRetryPolicy(policy *RetryPolicy) subOrchestratorOption {
114+
func WithSubOrchestrationRetryPolicy(policy *RetryPolicy) SubOrchestratorOption {
115115
return func(opt *callSubOrchestratorOptions) error {
116116
if policy == nil {
117117
return nil
@@ -267,7 +267,7 @@ func (octx *OrchestrationContext) GetInput(v any) error {
267267
// CallActivity schedules an asynchronous invocation of an activity function. The [activity]
268268
// parameter can be either the name of an activity as a string or can be a pointer to the function
269269
// that implements the activity, in which case the name is obtained via reflection.
270-
func (ctx *OrchestrationContext) CallActivity(activity interface{}, opts ...callActivityOption) Task {
270+
func (ctx *OrchestrationContext) CallActivity(activity interface{}, opts ...CallActivityOption) Task {
271271
options := new(callActivityOptions)
272272
for _, configure := range opts {
273273
if err := configure(options); err != nil {
@@ -317,8 +317,7 @@ func (ctx *OrchestrationContext) internalScheduleActivity(activityName, taskExec
317317
return task
318318
}
319319

320-
// TODO: cassie wire appID into suborchestration options too for cross app wf
321-
func (ctx *OrchestrationContext) CallSubOrchestrator(orchestrator interface{}, opts ...subOrchestratorOption) Task {
320+
func (ctx *OrchestrationContext) CallSubOrchestrator(orchestrator interface{}, opts ...SubOrchestratorOption) Task {
322321
options := new(callSubOrchestratorOptions)
323322
for _, configure := range opts {
324323
if err := configure(options); err != nil {
@@ -421,7 +420,7 @@ func computeNextDelay(currentTimeUtc time.Time, policy RetryPolicy, attempt int,
421420
}
422421

423422
// CreateTimer schedules a durable timer that expires after the specified delay.
424-
func (ctx *OrchestrationContext) CreateTimer(delay time.Duration, opts ...createTimerOption) Task {
423+
func (ctx *OrchestrationContext) CreateTimer(delay time.Duration, opts ...CreateTimerOption) Task {
425424
options := new(createTimerOptions)
426425
for _, configure := range opts {
427426
if err := configure(options); err != nil {

task/timer.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
package task
22

3-
type createTimerOption func(*createTimerOptions) error
3+
type CreateTimerOption func(*createTimerOptions) error
44

55
type createTimerOptions struct {
66
name *string
77
}
88

9-
func WithTimerName(name string) createTimerOption {
9+
func WithTimerName(name string) CreateTimerOption {
1010
return func(opt *createTimerOptions) error {
1111
opt.name = &name
1212
return nil

workflow/activity.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package workflow
2+
3+
import (
4+
"time"
5+
6+
"github.com/dapr/durabletask-go/task"
7+
"google.golang.org/protobuf/types/known/wrapperspb"
8+
)
9+
10+
type CallActivityOption task.CallActivityOption
11+
12+
type RetryPolicy struct {
13+
// Max number of attempts to try the activity call, first execution inclusive
14+
MaxAttempts int
15+
// Timespan to wait for the first retry
16+
InitialRetryInterval time.Duration
17+
// Used to determine rate of increase of back-off
18+
BackoffCoefficient float64
19+
// Max timespan to wait for a retry
20+
MaxRetryInterval time.Duration
21+
// Total timeout across all the retries performed
22+
RetryTimeout time.Duration
23+
// Optional function to control if retries should proceed
24+
Handle func(error) bool
25+
}
26+
27+
func WithActivityAppID(targetAppID string) CallActivityOption {
28+
return CallActivityOption(task.WithActivityAppID(targetAppID))
29+
}
30+
31+
// WithActivityInput configures an input for an activity invocation. The
32+
// specified input must be JSON serializable.
33+
func WithActivityInput(input any) CallActivityOption {
34+
return CallActivityOption(task.WithActivityInput(input))
35+
}
36+
37+
// WithRawActivityInput configures a raw input for an activity invocation.
38+
func WithRawActivityInput(input *wrapperspb.StringValue) CallActivityOption {
39+
return CallActivityOption(task.WithRawActivityInput(input))
40+
}
41+
42+
func WithActivityRetryPolicy(policy *RetryPolicy) CallActivityOption {
43+
return CallActivityOption(task.WithActivityRetryPolicy((*task.RetryPolicy)(policy)))
44+
}
45+
46+
// ActivityContext is the context parameter type for activity implementations.
47+
type ActivityContext task.ActivityContext
48+
49+
// Activity is the functional interface for activity implementations.
50+
type Activity func(ActivityContext) (any, error)

workflow/api.go

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
package workflow
2+
3+
import (
4+
"time"
5+
6+
"github.com/dapr/durabletask-go/api"
7+
"github.com/dapr/durabletask-go/api/protos"
8+
"github.com/dapr/kit/ptr"
9+
"google.golang.org/protobuf/types/known/wrapperspb"
10+
)
11+
12+
type NewWorkflowOptions api.NewOrchestrationOptions
13+
type FetchWorkflowMetadataOptions api.FetchOrchestrationMetadataOptions
14+
type RaiseEventOptions api.RaiseEventOptions
15+
type TerminateOptions api.TerminateOptions
16+
type PurgeOptions api.PurgeOptions
17+
type RerunOptions api.RerunOptions
18+
19+
type WorkflowMetadata protos.OrchestrationMetadata
20+
21+
// WithInstanceID configures an explicit workflow instance ID. If not
22+
// specified, a random UUID value will be used for the workflow instance ID.
23+
func WithInstanceID(id string) NewWorkflowOptions {
24+
return NewWorkflowOptions(api.WithInstanceID(api.InstanceID(id)))
25+
}
26+
27+
// WithInput configures an input for the workflow. The specified input must be
28+
// serializable.
29+
func WithInput(input any) NewWorkflowOptions {
30+
return NewWorkflowOptions(api.WithInput(input))
31+
}
32+
33+
// WithRawInput configures an input for the workflow. The specified input must
34+
// be a string.
35+
func WithRawInput(rawInput *wrapperspb.StringValue) NewWorkflowOptions {
36+
return NewWorkflowOptions(api.WithRawInput(rawInput))
37+
}
38+
39+
// WithStartTime configures a start time at which the workflow should start
40+
// running. Note that the actual start time could be later than the specified
41+
// start time if the task hub is under load or if the app is not running at the
42+
// specified start time.
43+
func WithStartTime(startTime time.Time) NewWorkflowOptions {
44+
return NewWorkflowOptions(api.WithStartTime(startTime))
45+
}
46+
47+
// WithFetchPayloads configures whether to load workflow inputs, outputs, and
48+
// custom status values, which could be large.
49+
func WithFetchPayloads(fetchPayloads bool) FetchWorkflowMetadataOptions {
50+
return FetchWorkflowMetadataOptions(api.WithFetchPayloads(fetchPayloads))
51+
}
52+
53+
// WithEventPayload configures an event payload. The specified payload must be
54+
// serializable.
55+
func WithEventPayload(data any) RaiseEventOptions {
56+
return RaiseEventOptions(api.WithEventPayload(data))
57+
}
58+
59+
// WithRawEventData configures an event payload that is a raw, unprocessed
60+
// string (e.g. JSON data).
61+
func WithRawEventData(data *wrapperspb.StringValue) RaiseEventOptions {
62+
return RaiseEventOptions(api.WithRawEventData(data))
63+
}
64+
65+
// WithOutput configures an output for the terminated workflow. The specified
66+
// output must be serializable.
67+
func WithOutput(data any) TerminateOptions {
68+
return TerminateOptions(api.WithOutput(data))
69+
}
70+
71+
// WithRawOutput configures a raw, unprocessed output (i.e. pre-serialized) for
72+
// the terminated workflow.
73+
func WithRawOutput(data *wrapperspb.StringValue) TerminateOptions {
74+
return TerminateOptions(api.WithRawOutput(data))
75+
}
76+
77+
// WithRecursiveTerminate configures whether to terminate all child-workflows
78+
// created by the target workflow.
79+
func WithRecursiveTerminate(recursive bool) TerminateOptions {
80+
return TerminateOptions(api.WithRecursiveTerminate(recursive))
81+
}
82+
83+
// WithRecursivePurge configures whether to purge all child-workflows created
84+
// by the target workflow.
85+
func WithRecursivePurge(recursive bool) PurgeOptions {
86+
return PurgeOptions(api.WithRecursivePurge(recursive))
87+
}
88+
89+
func WorkflowMetadataIsRunning(o *WorkflowMetadata) bool {
90+
return api.OrchestrationMetadataIsComplete(ptr.Of(protos.OrchestrationMetadata(*o)))
91+
}
92+
93+
func WorkflowMetadataIsComplete(o *WorkflowMetadata) bool {
94+
return api.OrchestrationMetadataIsComplete(ptr.Of(protos.OrchestrationMetadata(*o)))
95+
}
96+
97+
func WithRerunInput(input any) RerunOptions {
98+
return RerunOptions(api.WithRerunInput(input))
99+
}
100+
101+
func WithRerunNewInstanceID(id string) RerunOptions {
102+
return RerunOptions(api.WithRerunNewInstanceID(api.InstanceID(id)))
103+
}

0 commit comments

Comments
 (0)