Skip to content

Commit 9564839

Browse files
authored
Merge pull request #48 from JoshVanL/workflow-vanity-client
Create vanity `workflow` client
2 parents 2a27b62 + 411f994 commit 9564839

File tree

12 files changed

+571
-18
lines changed

12 files changed

+571
-18
lines changed

api/orchestration.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ func WithInstanceID(id InstanceID) NewOrchestrationOptions {
8080
}
8181

8282
// WithOrchestrationIdReusePolicy configures Orchestration ID reuse policy.
83+
// Deprecated.
8384
func WithOrchestrationIdReusePolicy(policy *protos.OrchestrationIdReusePolicy) NewOrchestrationOptions {
8485
return func(req *protos.CreateInstanceRequest) error {
8586
// initialize CreateInstanceOption

task/activity.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import (
1111
"github.com/dapr/durabletask-go/api/protos"
1212
)
1313

14-
type callActivityOption func(*callActivityOptions) error
14+
type CallActivityOption func(*callActivityOptions) error
1515

1616
type callActivityOptions struct {
1717
rawInput *wrapperspb.StringValue
@@ -59,7 +59,7 @@ func (policy *RetryPolicy) Validate() error {
5959
return nil
6060
}
6161

62-
func WithActivityAppID(targetAppID string) callActivityOption {
62+
func WithActivityAppID(targetAppID string) CallActivityOption {
6363
return func(opt *callActivityOptions) error {
6464
opt.targetAppID = &targetAppID
6565
return nil
@@ -68,7 +68,7 @@ func WithActivityAppID(targetAppID string) callActivityOption {
6868

6969
// WithActivityInput configures an input for an activity invocation.
7070
// The specified input must be JSON serializable.
71-
func WithActivityInput(input any) callActivityOption {
71+
func WithActivityInput(input any) CallActivityOption {
7272
return func(opt *callActivityOptions) error {
7373
data, err := marshalData(input)
7474
if err != nil {
@@ -80,14 +80,14 @@ func WithActivityInput(input any) callActivityOption {
8080
}
8181

8282
// WithRawActivityInput configures a raw input for an activity invocation.
83-
func WithRawActivityInput(input *wrapperspb.StringValue) callActivityOption {
83+
func WithRawActivityInput(input *wrapperspb.StringValue) CallActivityOption {
8484
return func(opt *callActivityOptions) error {
8585
opt.rawInput = input
8686
return nil
8787
}
8888
}
8989

90-
func WithActivityRetryPolicy(policy *RetryPolicy) callActivityOption {
90+
func WithActivityRetryPolicy(policy *RetryPolicy) CallActivityOption {
9191
return func(opt *callActivityOptions) error {
9292
if policy == nil {
9393
return nil

task/orchestrator.go

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -58,14 +58,14 @@ type callSubOrchestratorOptions struct {
5858
retryPolicy *RetryPolicy
5959
}
6060

61-
// subOrchestratorOption is a functional option type for the CallSubOrchestrator orchestrator method.
62-
type subOrchestratorOption func(*callSubOrchestratorOptions) error
61+
// SubOrchestratorOption is a functional option type for the CallSubOrchestrator orchestrator method.
62+
type SubOrchestratorOption func(*callSubOrchestratorOptions) error
6363

6464
// ContinueAsNewOption is a functional option type for the ContinueAsNew orchestrator method.
6565
type ContinueAsNewOption func(*OrchestrationContext)
6666

6767
// WithSubOrchestratorAppID is a functional option type for the CallSubOrchestrator orchestrator method that specifies the app ID of the target activity.
68-
func WithSubOrchestratorAppID(appID string) subOrchestratorOption {
68+
func WithSubOrchestratorAppID(appID string) SubOrchestratorOption {
6969
return func(opts *callSubOrchestratorOptions) error {
7070
opts.targetAppID = &appID
7171
return nil
@@ -82,7 +82,7 @@ func WithKeepUnprocessedEvents() ContinueAsNewOption {
8282

8383
// WithSubOrchestratorInput is a functional option type for the CallSubOrchestrator
8484
// orchestrator method that takes an input value and marshals it to JSON.
85-
func WithSubOrchestratorInput(input any) subOrchestratorOption {
85+
func WithSubOrchestratorInput(input any) SubOrchestratorOption {
8686
return func(opts *callSubOrchestratorOptions) error {
8787
bytes, err := marshalData(input)
8888
if err != nil {
@@ -95,7 +95,7 @@ func WithSubOrchestratorInput(input any) subOrchestratorOption {
9595

9696
// WithRawSubOrchestratorInput is a functional option type for the CallSubOrchestrator
9797
// orchestrator method that takes a raw input value.
98-
func WithRawSubOrchestratorInput(input *wrapperspb.StringValue) subOrchestratorOption {
98+
func WithRawSubOrchestratorInput(input *wrapperspb.StringValue) SubOrchestratorOption {
9999
return func(opts *callSubOrchestratorOptions) error {
100100
opts.rawInput = input
101101
return nil
@@ -104,14 +104,14 @@ func WithRawSubOrchestratorInput(input *wrapperspb.StringValue) subOrchestratorO
104104

105105
// WithSubOrchestrationInstanceID is a functional option type for the CallSubOrchestrator
106106
// orchestrator method that specifies the instance ID of the sub-orchestration.
107-
func WithSubOrchestrationInstanceID(instanceID string) subOrchestratorOption {
107+
func WithSubOrchestrationInstanceID(instanceID string) SubOrchestratorOption {
108108
return func(opts *callSubOrchestratorOptions) error {
109109
opts.instanceID = instanceID
110110
return nil
111111
}
112112
}
113113

114-
func WithSubOrchestrationRetryPolicy(policy *RetryPolicy) subOrchestratorOption {
114+
func WithSubOrchestrationRetryPolicy(policy *RetryPolicy) SubOrchestratorOption {
115115
return func(opt *callSubOrchestratorOptions) error {
116116
if policy == nil {
117117
return nil
@@ -267,7 +267,7 @@ func (octx *OrchestrationContext) GetInput(v any) error {
267267
// CallActivity schedules an asynchronous invocation of an activity function. The [activity]
268268
// parameter can be either the name of an activity as a string or can be a pointer to the function
269269
// that implements the activity, in which case the name is obtained via reflection.
270-
func (ctx *OrchestrationContext) CallActivity(activity interface{}, opts ...callActivityOption) Task {
270+
func (ctx *OrchestrationContext) CallActivity(activity interface{}, opts ...CallActivityOption) Task {
271271
options := new(callActivityOptions)
272272
for _, configure := range opts {
273273
if err := configure(options); err != nil {
@@ -317,8 +317,7 @@ func (ctx *OrchestrationContext) internalScheduleActivity(activityName, taskExec
317317
return task
318318
}
319319

320-
// TODO: cassie wire appID into suborchestration options too for cross app wf
321-
func (ctx *OrchestrationContext) CallSubOrchestrator(orchestrator interface{}, opts ...subOrchestratorOption) Task {
320+
func (ctx *OrchestrationContext) CallSubOrchestrator(orchestrator interface{}, opts ...SubOrchestratorOption) Task {
322321
options := new(callSubOrchestratorOptions)
323322
for _, configure := range opts {
324323
if err := configure(options); err != nil {
@@ -421,7 +420,7 @@ func computeNextDelay(currentTimeUtc time.Time, policy RetryPolicy, attempt int,
421420
}
422421

423422
// CreateTimer schedules a durable timer that expires after the specified delay.
424-
func (ctx *OrchestrationContext) CreateTimer(delay time.Duration, opts ...createTimerOption) Task {
423+
func (ctx *OrchestrationContext) CreateTimer(delay time.Duration, opts ...CreateTimerOption) Task {
425424
options := new(createTimerOptions)
426425
for _, configure := range opts {
427426
if err := configure(options); err != nil {

task/timer.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
package task
22

3-
type createTimerOption func(*createTimerOptions) error
3+
type CreateTimerOption func(*createTimerOptions) error
44

55
type createTimerOptions struct {
66
name *string
77
}
88

9-
func WithTimerName(name string) createTimerOption {
9+
func WithTimerName(name string) CreateTimerOption {
1010
return func(opt *createTimerOptions) error {
1111
opt.name = &name
1212
return nil

workflow/activity.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package workflow
2+
3+
import (
4+
"time"
5+
6+
"github.com/dapr/durabletask-go/task"
7+
"google.golang.org/protobuf/types/known/wrapperspb"
8+
)
9+
10+
type CallActivityOption task.CallActivityOption
11+
12+
type RetryPolicy struct {
13+
// Max number of attempts to try the activity call, first execution inclusive
14+
MaxAttempts int
15+
// Timespan to wait for the first retry
16+
InitialRetryInterval time.Duration
17+
// Used to determine rate of increase of back-off
18+
BackoffCoefficient float64
19+
// Max timespan to wait for a retry
20+
MaxRetryInterval time.Duration
21+
// Total timeout across all the retries performed
22+
RetryTimeout time.Duration
23+
// Optional function to control if retries should proceed
24+
Handle func(error) bool
25+
}
26+
27+
func WithActivityAppID(targetAppID string) CallActivityOption {
28+
return CallActivityOption(task.WithActivityAppID(targetAppID))
29+
}
30+
31+
// WithActivityInput configures an input for an activity invocation. The
32+
// specified input must be JSON serializable.
33+
func WithActivityInput(input any) CallActivityOption {
34+
return CallActivityOption(task.WithActivityInput(input))
35+
}
36+
37+
// WithRawActivityInput configures a raw input for an activity invocation.
38+
func WithRawActivityInput(input *wrapperspb.StringValue) CallActivityOption {
39+
return CallActivityOption(task.WithRawActivityInput(input))
40+
}
41+
42+
func WithActivityRetryPolicy(policy *RetryPolicy) CallActivityOption {
43+
return CallActivityOption(task.WithActivityRetryPolicy((*task.RetryPolicy)(policy)))
44+
}
45+
46+
// ActivityContext is the context parameter type for activity implementations.
47+
type ActivityContext task.ActivityContext
48+
49+
// Activity is the functional interface for activity implementations.
50+
type Activity func(ActivityContext) (any, error)

workflow/api.go

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
package workflow
2+
3+
import (
4+
"time"
5+
6+
"github.com/dapr/durabletask-go/api"
7+
"github.com/dapr/durabletask-go/api/protos"
8+
"github.com/dapr/kit/ptr"
9+
"google.golang.org/protobuf/types/known/wrapperspb"
10+
)
11+
12+
type NewWorkflowOptions api.NewOrchestrationOptions
13+
type FetchWorkflowMetadataOptions api.FetchOrchestrationMetadataOptions
14+
type RaiseEventOptions api.RaiseEventOptions
15+
type TerminateOptions api.TerminateOptions
16+
type PurgeOptions api.PurgeOptions
17+
type RerunOptions api.RerunOptions
18+
19+
// WithInstanceID configures an explicit workflow instance ID. If not
20+
// specified, a random UUID value will be used for the workflow instance ID.
21+
func WithInstanceID(id string) NewWorkflowOptions {
22+
return NewWorkflowOptions(api.WithInstanceID(api.InstanceID(id)))
23+
}
24+
25+
// WithInput configures an input for the workflow. The specified input must be
26+
// serializable.
27+
func WithInput(input any) NewWorkflowOptions {
28+
return NewWorkflowOptions(api.WithInput(input))
29+
}
30+
31+
// WithRawInput configures an input for the workflow. The specified input must
32+
// be a string.
33+
func WithRawInput(rawInput *wrapperspb.StringValue) NewWorkflowOptions {
34+
return NewWorkflowOptions(api.WithRawInput(rawInput))
35+
}
36+
37+
// WithStartTime configures a start time at which the workflow should start
38+
// running. Note that the actual start time could be later than the specified
39+
// start time if the task hub is under load or if the app is not running at the
40+
// specified start time.
41+
func WithStartTime(startTime time.Time) NewWorkflowOptions {
42+
return NewWorkflowOptions(api.WithStartTime(startTime))
43+
}
44+
45+
// WithFetchPayloads configures whether to load workflow inputs, outputs, and
46+
// custom status values, which could be large.
47+
func WithFetchPayloads(fetchPayloads bool) FetchWorkflowMetadataOptions {
48+
return FetchWorkflowMetadataOptions(api.WithFetchPayloads(fetchPayloads))
49+
}
50+
51+
// WithEventPayload configures an event payload. The specified payload must be
52+
// serializable.
53+
func WithEventPayload(data any) RaiseEventOptions {
54+
return RaiseEventOptions(api.WithEventPayload(data))
55+
}
56+
57+
// WithRawEventData configures an event payload that is a raw, unprocessed
58+
// string (e.g. JSON data).
59+
func WithRawEventData(data *wrapperspb.StringValue) RaiseEventOptions {
60+
return RaiseEventOptions(api.WithRawEventData(data))
61+
}
62+
63+
// WithOutput configures an output for the terminated workflow. The specified
64+
// output must be serializable.
65+
func WithOutput(data any) TerminateOptions {
66+
return TerminateOptions(api.WithOutput(data))
67+
}
68+
69+
// WithRawOutput configures a raw, unprocessed output (i.e. pre-serialized) for
70+
// the terminated workflow.
71+
func WithRawOutput(data *wrapperspb.StringValue) TerminateOptions {
72+
return TerminateOptions(api.WithRawOutput(data))
73+
}
74+
75+
// WithRecursiveTerminate configures whether to terminate all child-workflows
76+
// created by the target workflow.
77+
func WithRecursiveTerminate(recursive bool) TerminateOptions {
78+
return TerminateOptions(api.WithRecursiveTerminate(recursive))
79+
}
80+
81+
// WithRecursivePurge configures whether to purge all child-workflows created
82+
// by the target workflow.
83+
func WithRecursivePurge(recursive bool) PurgeOptions {
84+
return PurgeOptions(api.WithRecursivePurge(recursive))
85+
}
86+
87+
func WorkflowMetadataIsRunning(o *WorkflowMetadata) bool {
88+
return api.OrchestrationMetadataIsComplete(ptr.Of(protos.OrchestrationMetadata(*o)))
89+
}
90+
91+
func WorkflowMetadataIsComplete(o *WorkflowMetadata) bool {
92+
return api.OrchestrationMetadataIsComplete(ptr.Of(protos.OrchestrationMetadata(*o)))
93+
}
94+
95+
func WithRerunInput(input any) RerunOptions {
96+
return RerunOptions(api.WithRerunInput(input))
97+
}
98+
99+
func WithRerunNewInstanceID(id string) RerunOptions {
100+
return RerunOptions(api.WithRerunNewInstanceID(api.InstanceID(id)))
101+
}

0 commit comments

Comments
 (0)