Skip to content

Commit 9d150a0

Browse files
authored
sync thrift files, add RunWorkflow API which starts a workflow and ea… (#328)
* sync thrift files, add ExecuteWorkflow API which starts a workflow and wait until the workflow finishes, return the result, similar to ExecuteChildWorkflow. * change the signature of GetWorkflowHistory from returning whole history to history event iterator * expose WorkflowIDReusePolicy along with values
1 parent ac74a77 commit 9d150a0

File tree

14 files changed

+1837
-90
lines changed

14 files changed

+1837
-90
lines changed

.gen/go/shared/idl.go

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.gen/go/shared/types.go

Lines changed: 886 additions & 30 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

client/client.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,29 @@ type (
4646
// DomainClient is the client for managing operations on the domain.
4747
// CLI, tools, ... can use this layer to manager operations on domain.
4848
DomainClient = internal.DomainClient
49+
50+
// HistoryEventIterator is a iterator which can return history events
51+
HistoryEventIterator = internal.HistoryEventIterator
52+
53+
// WorkflowRun represents a started non child workflow
54+
WorkflowRun = internal.WorkflowRun
55+
56+
// WorkflowIDReusePolicy defines workflow ID reuse behavior.
57+
WorkflowIDReusePolicy = internal.WorkflowIDReusePolicy
58+
)
59+
60+
const (
61+
// WorkflowIDReusePolicyAllowDuplicateFailedOnly allow start a workflow execution
62+
// when workflow not running, and the last execution close state is in
63+
// [terminated, cancelled, timeouted, failed].
64+
WorkflowIDReusePolicyAllowDuplicateFailedOnly WorkflowIDReusePolicy = internal.WorkflowIDReusePolicyAllowDuplicateFailedOnly
65+
66+
// WorkflowIDReusePolicyAllowDuplicate allow start a workflow execution using
67+
// the same workflow ID,when workflow not running.
68+
WorkflowIDReusePolicyAllowDuplicate WorkflowIDReusePolicy = internal.WorkflowIDReusePolicyAllowDuplicate
69+
70+
// WorkflowIDReusePolicyRejectDuplicate do not allow start a workflow execution using the same workflow ID at all
71+
WorkflowIDReusePolicyRejectDuplicate WorkflowIDReusePolicy = internal.WorkflowIDReusePolicyRejectDuplicate
4972
)
5073

5174
// NewClient creates an instance of a workflow client

idl/github.com/uber/cadence/shared.thrift

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,24 @@ exception QueryFailedError {
5454
1: required string message
5555
}
5656

57+
enum WorkflowIdReusePolicy {
58+
/*
59+
* allow start a workflow execution using the same workflow ID,
60+
* when workflow not running, and the last execution close state is in
61+
* [terminated, cancelled, timeouted, failed].
62+
*/
63+
AllowDuplicateFailedOnly,
64+
/*
65+
* allow start a workflow execution using the same workflow ID,
66+
* when workflow not running.
67+
*/
68+
AllowDuplicate,
69+
/*
70+
* do not allow start a workflow execution using the same workflow ID at all
71+
*/
72+
RejectDuplicate,
73+
}
74+
5775
enum DomainStatus {
5876
REGISTERED,
5977
DEPRECATED,
@@ -167,6 +185,17 @@ enum QueryTaskCompletedType {
167185
FAILED,
168186
}
169187

188+
enum PendingActivityState {
189+
SCHEDULED,
190+
STARTED,
191+
CANCEL_REQUESTED,
192+
}
193+
194+
enum HistoryEventFilterType {
195+
ALL_EVENT,
196+
CLOSE_EVENT,
197+
}
198+
170199
struct WorkflowType {
171200
10: optional string name
172201
}
@@ -693,6 +722,7 @@ struct StartWorkflowExecutionRequest {
693722
70: optional i32 taskStartToCloseTimeoutSeconds
694723
80: optional string identity
695724
90: optional string requestId
725+
100: optional WorkflowIdReusePolicy workflowIdReusePolicy
696726
}
697727

698728
struct StartWorkflowExecutionResponse {
@@ -828,6 +858,7 @@ struct GetWorkflowExecutionHistoryRequest {
828858
30: optional i32 maximumPageSize
829859
40: optional binary nextPageToken
830860
50: optional bool waitForNewEvent
861+
60: optional HistoryEventFilterType HistoryEventFilterType
831862
}
832863

833864
struct GetWorkflowExecutionHistoryResponse {
@@ -907,7 +938,16 @@ struct DescribeWorkflowExecutionRequest {
907938
20: optional WorkflowExecution execution
908939
}
909940

941+
struct PendingActivityInfo {
942+
10: optional string activityID
943+
20: optional ActivityType activityType
944+
30: optional PendingActivityState state
945+
40: optional binary heartbeatDetails
946+
50: optional i64 (js.type = "Long") lastHeartbeatTimestamp
947+
}
948+
910949
struct DescribeWorkflowExecutionResponse {
911950
10: optional WorkflowExecutionConfiguration executionConfiguration
912951
20: optional WorkflowExecutionInfo workflowExecutionInfo
952+
30: optional list<PendingActivityInfo> pendingActivities
913953
}

internal/client.go

Lines changed: 85 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ package internal
2222

2323
import (
2424
"context"
25+
"fmt"
2526
"time"
2627

2728
"go.uber.org/cadence/encoded"
@@ -43,15 +44,42 @@ type (
4344
// StartWorkflow starts a workflow execution
4445
// The user can use this to start using a function or workflow type name.
4546
// Either by
46-
// StartWorkflow(ctx, options, "workflowTypeName", input)
47+
// StartWorkflow(ctx, options, "workflowTypeName", arg1, arg2, arg3)
4748
// or
4849
// StartWorkflow(ctx, options, workflowExecuteFn, arg1, arg2, arg3)
4950
// The errors it can return:
50-
// - EntityNotExistsError
51+
// - EntityNotExistsError, if domain does not exists
5152
// - BadRequestError
5253
// - WorkflowExecutionAlreadyStartedError
54+
// - InternalServiceError
5355
StartWorkflow(ctx context.Context, options StartWorkflowOptions, workflow interface{}, args ...interface{}) (*WorkflowExecution, error)
5456

57+
// ExecuteWorkflow starts a workflow execution and return a WorkflowRun instance and error
58+
// The user can use this to start using a function or workflow type name.
59+
// Either by
60+
// ExecuteWorkflow(ctx, options, "workflowTypeName", arg1, arg2, arg3)
61+
// or
62+
// ExecuteWorkflow(ctx, options, workflowExecuteFn, arg1, arg2, arg3)
63+
// The errors it can return:
64+
// - EntityNotExistsError, if domain does not exists
65+
// - BadRequestError
66+
// - WorkflowExecutionAlreadyStartedError
67+
// - InternalServiceError
68+
//
69+
// WorkflowRun has 2 methods:
70+
// - GetRunID() string: which return the first started workflow run ID (please see below)
71+
// - Get(ctx context.Context, valuePtr interface{}) error: which will fill the workflow
72+
// execution result to valuePtr, if workflow execution is a success, or return corresponding
73+
// error. This is a blocking API.
74+
// NOTE: if the started workflow return ContinueAsNewError during the workflow execution, the
75+
// return result of GetRunID() will be the started workflow run ID, not the new run ID caused by ContinueAsNewError,
76+
// however, Get(ctx context.Context, valuePtr interface{}) will return result from the run which did not return ContinueAsNewError.
77+
// Say ExecuteWorkflow started a workflow, in its first run, has run ID "run ID 1", and returned ContinueAsNewError,
78+
// the second run has run ID "run ID 2" and return some result other than ContinueAsNewError:
79+
// GetRunID() will always return "run ID 1" and Get(ctx context.Context, valuePtr interface{}) will return the result of second run.
80+
// NOTE: DO NOT USE THIS API INSIDE A WORKFLOW, USE workflow.ExecuteChildWorkflow instead
81+
ExecuteWorkflow(ctx context.Context, options StartWorkflowOptions, workflow interface{}, args ...interface{}) (WorkflowRun, error)
82+
5583
// SignalWorkflow sends a signals to a workflow in execution
5684
// - workflow ID of the workflow.
5785
// - runID can be default(empty string). if empty string then it will pick the running execution of that workflow ID.
@@ -80,14 +108,25 @@ type (
80108
// - InternalServiceError
81109
TerminateWorkflow(ctx context.Context, workflowID string, runID string, reason string, details []byte) error
82110

83-
// GetWorkflowHistory gets history of a particular workflow.
111+
// GetWorkflowHistory gets history events of a particular workflow
84112
// - workflow ID of the workflow.
85-
// - runID can be default(empty string). if empty string then it will pick the running execution of that workflow ID.
86-
// The errors it can return:
87-
// - EntityNotExistsError
88-
// - BadRequestError
89-
// - InternalServiceError
90-
GetWorkflowHistory(ctx context.Context, workflowID string, runID string) (*s.History, error)
113+
// - runID can be default(empty string). if empty string then it will pick the last running execution of that workflow ID.
114+
// - whether use long poll for tracking new events: when the workflow is running, there can be new events generated during iteration
115+
// of HistoryEventIterator, if isLongPoll == true, then iterator will do long poll, tracking new history event, i.e. the iteration
116+
// will not be finished until workflow is finished; if isLongPoll == false, then iterator will only return current history events.
117+
// - whether return all history events or just the last event, which contains the workflow execution end result
118+
// Example:-
119+
// To iterate all events,
120+
// iter := GetWorkflowHistory(ctx, workflowID, runID, isLongPoll, filterType)
121+
// events := []*shared.HistoryEvent{}
122+
// for iter.HasNext() {
123+
// event, err := iter.Next()
124+
// if err != nil {
125+
// return err
126+
// }
127+
// events = append(events, event)
128+
// }
129+
GetWorkflowHistory(ctx context.Context, workflowID string, runID string, isLongPoll bool, filterType s.HistoryEventFilterType) HistoryEventIterator
91130

92131
// CompleteActivity reports activity completed.
93132
// activity Execute method can return acitivity.activity.ErrResultPending to
@@ -147,7 +186,7 @@ type (
147186
// run. By default, cadence supports "__stack_trace" as a standard query type, which will return string value
148187
// representing the call stack of the target workflow. The target workflow could also setup different query handler
149188
// to handle custom query types.
150-
// See comments at cadence.SetQueryHandler(ctx Context, queryType string, handler interface{}) for more details
189+
// See comments at workflow.SetQueryHandler(ctx Context, queryType string, handler interface{}) for more details
151190
// on how to setup query handler within the target workflow.
152191
// - workflowID is required.
153192
// - runID can be default(empty string). if empty string then it will pick the running execution of that workflow ID.
@@ -196,6 +235,10 @@ type (
196235
// The resolution is seconds.
197236
// Optional: defaulted to 20 secs.
198237
DecisionTaskStartToCloseTimeout time.Duration
238+
239+
// WorkflowIDReusePolicy - Whether server allow reuse of workflow ID, can be useful
240+
// for dedup logic if set to WorkflowIdReusePolicyRejectDuplicate
241+
WorkflowIDReusePolicy WorkflowIDReusePolicy
199242
}
200243

201244
// DomainClient is the client for managing operations on the domain.
@@ -226,6 +269,23 @@ type (
226269
// - InternalServiceError
227270
Update(ctx context.Context, name string, domainInfo *s.UpdateDomainInfo, domainConfig *s.DomainConfiguration) error
228271
}
272+
273+
// WorkflowIDReusePolicy defines workflow ID reuse behavior.
274+
WorkflowIDReusePolicy int
275+
)
276+
277+
const (
278+
// WorkflowIDReusePolicyAllowDuplicateFailedOnly allow start a workflow execution
279+
// when workflow not running, and the last execution close state is in
280+
// [terminated, cancelled, timeouted, failed].
281+
WorkflowIDReusePolicyAllowDuplicateFailedOnly WorkflowIDReusePolicy = iota
282+
283+
// WorkflowIDReusePolicyAllowDuplicate allow start a workflow execution using
284+
// the same workflow ID,when workflow not running.
285+
WorkflowIDReusePolicyAllowDuplicate
286+
287+
// WorkflowIDReusePolicyRejectDuplicate do not allow start a workflow execution using the same workflow ID at all
288+
WorkflowIDReusePolicyRejectDuplicate
229289
)
230290

231291
// NewClient creates an instance of a workflow client
@@ -268,3 +328,18 @@ func NewDomainClient(service workflowserviceclient.Interface, options *ClientOpt
268328
identity: identity,
269329
}
270330
}
331+
332+
func (p WorkflowIDReusePolicy) toThriftPtr() *s.WorkflowIdReusePolicy {
333+
var policy s.WorkflowIdReusePolicy
334+
switch p {
335+
case WorkflowIDReusePolicyAllowDuplicate:
336+
policy = s.WorkflowIdReusePolicyAllowDuplicate
337+
case WorkflowIDReusePolicyAllowDuplicateFailedOnly:
338+
policy = s.WorkflowIdReusePolicyAllowDuplicateFailedOnly
339+
case WorkflowIDReusePolicyRejectDuplicate:
340+
policy = s.WorkflowIdReusePolicyRejectDuplicate
341+
default:
342+
panic(fmt.Sprintf("unknown workflow reuse policy %v", p))
343+
}
344+
return &policy
345+
}

internal/error.go

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -55,10 +55,10 @@ Workflow code could handle errors based on different types of error. Below is sa
5555
_, err := workflow.ExecuteActivity(ctx, MyActivity, ...).Get(nil)
5656
if err != nil {
5757
switch err := err.(type) {
58-
case *CustomError:
58+
case *workflow.CustomError:
5959
// handle activity errors (created via NewCustomError() API)
6060
switch err.Reason() {
61-
case cadence.CustomErrReasonA: // assume CustomErrReasonA is constant defined by activity implementation
61+
case CustomErrReasonA: // assume CustomErrReasonA is constant defined by activity implementation
6262
var detailMsg string // assuming activity return error by NewCustomError(CustomErrReasonA, "string details")
6363
err.Details(&detailMsg) // extract strong typed details (corresponding to CustomErrReasonA)
6464
// handle CustomErrReasonA
@@ -67,13 +67,13 @@ if err != nil {
6767
default:
6868
// newer version of activity could return new errors that workflow was not aware of.
6969
}
70-
case *cadence.GenericError:
70+
case *workflow.GenericError:
7171
// handle generic error (errors created other than using NewCustomError() API)
72-
case *cadence.CanceledError:
72+
case *workflow.CanceledError:
7373
// handle cancellation
74-
case *cadence.TimeoutError:
74+
case *workflow.TimeoutError:
7575
// handle timeout, could check timeout type by err.TimeoutType()
76-
case *cadence.PanicError:
76+
case *workflow.PanicError:
7777
// handle panic
7878
}
7979
}
@@ -106,6 +106,10 @@ type (
106106
details []byte
107107
}
108108

109+
// TerminatedError returned when workflow was terminated.
110+
TerminatedError struct {
111+
}
112+
109113
// PanicError contains information about panicked workflow/activity.
110114
PanicError struct {
111115
value string
@@ -276,3 +280,13 @@ func (e *PanicError) StackTrace() string {
276280
func (e *ContinueAsNewError) Error() string {
277281
return "ContinueAsNew"
278282
}
283+
284+
// newTerminatedError creates NewTerminatedError instance
285+
func newTerminatedError() *TerminatedError {
286+
return &TerminatedError{}
287+
}
288+
289+
// Error from error interface
290+
func (e *TerminatedError) Error() string {
291+
return "Terminated"
292+
}

internal/internal_event_handlers.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -813,7 +813,7 @@ func (weh *workflowExecutionEventHandlerImpl) handleChildWorkflowExecutionTermin
813813
if childWorkflow.handled {
814814
return nil
815815
}
816-
err := errors.New("terminated")
816+
err := newTerminatedError()
817817
childWorkflow.handle(nil, err)
818818

819819
return nil

internal/internal_task_handlers.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1244,7 +1244,7 @@ func recordActivityHeartbeat(
12441244
}
12451245

12461246
// This enables verbose logging in the client library.
1247-
// check Cadence.EnableVerboseLogging()
1247+
// check worker.EnableVerboseLogging()
12481248
func traceLog(fn func()) {
12491249
if enableVerboseLogging {
12501250
fn()

internal/internal_workflow.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ type (
8383
}
8484

8585
// Workflow is an interface that any workflow should implement.
86-
// Code of a workflow must be deterministic. It must use cadence.Channel, cadence.Selector, and workflow.Go instead of
86+
// Code of a workflow must be deterministic. It must use workflow.Channel, workflow.Selector, and workflow.Go instead of
8787
// native channels, select and go. It also must not use range operation over map as it is randomized by go runtime.
8888
// All time manipulation should use current time returned by GetTime(ctx) method.
8989
// Note that workflow.Context is used instead of context.Context to avoid use of raw channels.

0 commit comments

Comments
 (0)