Skip to content
Merged
48 changes: 48 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,20 @@ type (
// QueryWorkflowWithOptionsResponse defines the response to QueryWorkflowWithOptions
QueryWorkflowWithOptionsResponse = internal.QueryWorkflowWithOptionsResponse

// GetWorkflowHistoryWithOptionsRequest defines the request to GetWorkflowHistoryWithOptions
GetWorkflowHistoryWithOptionsRequest = internal.GetWorkflowHistoryWithOptionsRequest

// DescribeWorkflowExecutionWithOptionsRequest defines the request to DescribeWorkflowExecutionWithOptions
DescribeWorkflowExecutionWithOptionsRequest = internal.DescribeWorkflowExecutionWithOptionsRequest

// ParentClosePolicy defines the behavior performed on a child workflow when its parent is closed
ParentClosePolicy = internal.ParentClosePolicy

// QueryConsistencyLevel defines the level of consistency the query should respond with.
// It will default to the cluster's configuration if not specified.
// Valid values are QueryConsistencyLevelEventual (served by the receiving cluster), and QueryConsistencyLevelStrong (redirects to the active cluster).
QueryConsistencyLevel = internal.QueryConsistencyLevel

// CancelOption values are functional options for the CancelWorkflow method.
// Supported values can be created with:
// - WithCancelReason(...)
Expand Down Expand Up @@ -226,6 +237,26 @@ type (
// }
GetWorkflowHistory(ctx context.Context, workflowID string, runID string, isLongPoll bool, filterType s.HistoryEventFilterType) HistoryEventIterator

// GetWorkflowHistoryWithOptions gets history events of a particular workflow.
// See GetWorkflowHistoryWithOptionsRequest for more information.
// Returns an iterator of HistoryEvents - see shared.HistoryEvent for more details.
// Example:-
// To iterate all events,
// iter := GetWorkflowHistory(ctx, workflowID, runID, isLongPoll, filterType)
// events := []*shared.HistoryEvent{}
// for iter.HasNext() {
// event, err := iter.Next()
// if err != nil {
// return err
// }
// events = append(events, event)
// }
// Returns the following errors:
// - EntityNotExistsError
// - BadRequestError
// - InternalServiceError
GetWorkflowHistoryWithOptions(ctx context.Context, request *GetWorkflowHistoryWithOptionsRequest) (HistoryEventIterator, error)

// CompleteActivity reports activity completed.
// activity Execute method can return activity.ErrResultPending to
// indicate the activity is not completed when it's Execute method returns. In that case, this CompleteActivity() method
Expand Down Expand Up @@ -383,6 +414,14 @@ type (
// - EntityNotExistError
DescribeWorkflowExecution(ctx context.Context, workflowID, runID string) (*s.DescribeWorkflowExecutionResponse, error)

// DescribeWorkflowExecutionWithOptions returns information about the specified workflow execution.
// See DescribeWorkflowExecutionWithOptionsRequest for more information.
// The errors it can return:
// - BadRequestError
// - InternalServiceError
// - EntityNotExistError
DescribeWorkflowExecutionWithOptions(ctx context.Context, request *DescribeWorkflowExecutionWithOptionsRequest) (*s.DescribeWorkflowExecutionResponse, error)

// DescribeTaskList returns information about the target tasklist, right now this API returns the
// pollers which polled this tasklist in last few minutes.
// The errors it can return:
Expand Down Expand Up @@ -458,6 +497,15 @@ const (
ParentClosePolicyAbandon = internal.ParentClosePolicyAbandon
)

const (
// QueryConsistencyLevelUnspecified will use the default consistency level provided by the cluster.
QueryConsistencyLevelUnspecified = internal.QueryConsistencyLevelUnspecified
// QueryConsistencyLevelEventual passes the request to the receiving cluster and returns eventually consistent results
QueryConsistencyLevelEventual = internal.QueryConsistencyLevelEventual
// QueryConsistencyLevelStrong will redirect the request to the active cluster and returns strongly consistent results
QueryConsistencyLevelStrong = internal.QueryConsistencyLevelStrong
)

// NewClient creates an instance of a workflow client
func NewClient(service workflowserviceclient.Interface, domain string, options *Options) Client {
return internal.NewClient(service, domain, options)
Expand Down
31 changes: 31 additions & 0 deletions internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,15 @@ type (
// }
GetWorkflowHistory(ctx context.Context, workflowID string, runID string, isLongPoll bool, filterType s.HistoryEventFilterType) HistoryEventIterator

// GetWorkflowHistoryWithOptions gets history events of a particular workflow.
// See GetWorkflowHistoryWithOptionsRequest for more information.
// Returns an iterator of HistoryEvents - see shared.HistoryEvent for more details.
// The errors it can return:
// - EntityNotExistsError
// - BadRequestError
// - InternalServiceError
GetWorkflowHistoryWithOptions(ctx context.Context, request *GetWorkflowHistoryWithOptionsRequest) (HistoryEventIterator, error)

// CompleteActivity reports activity completed.
// activity Execute method can return acitivity.activity.ErrResultPending to
// indicate the activity is not completed when it's Execute method returns. In that case, this CompleteActivity() method
Expand Down Expand Up @@ -367,6 +376,14 @@ type (
// - EntityNotExistError
DescribeWorkflowExecution(ctx context.Context, workflowID, runID string) (*s.DescribeWorkflowExecutionResponse, error)

// DescribeWorkflowExecutionWithOptions returns information about the specified workflow execution.
// See DescribeWorkflowExecutionWithOptionsRequest for more information.
// The errors it can return:
// - BadRequestError
// - InternalServiceError
// - EntityNotExistError
DescribeWorkflowExecutionWithOptions(ctx context.Context, request *DescribeWorkflowExecutionWithOptionsRequest) (*s.DescribeWorkflowExecutionResponse, error)

// DescribeTaskList returns information about the target tasklist, right now this API returns the
// pollers which polled this tasklist in last few minutes.
// The errors it can return:
Expand Down Expand Up @@ -569,6 +586,11 @@ type (
// ParentClosePolicy defines the action on children when parent is closed
ParentClosePolicy int

// QueryConsistencyLevel defines the level of consistency the query should respond with.
// It will default to the cluster's configuration if not specified.
// Valid values are QueryConsistencyLevelEventual (served by the receiving cluster), and QueryConsistencyLevelStrong (redirects to the active cluster).
QueryConsistencyLevel int

// ActiveClusterSelectionPolicy defines the policy for selecting the active cluster to start the workflow execution on for active-active domains.
// Active-active domains can be configured to be active in multiple clusters (at most one in a given region).
// Individual workflows can be configured to be active in one of the active clusters of the domain.
Expand Down Expand Up @@ -620,6 +642,15 @@ const (
WorkflowIDReusePolicyTerminateIfRunning
)

const (
// QueryConsistencyLevelUnspecified will use the default consistency level provided by the cluster.
QueryConsistencyLevelUnspecified QueryConsistencyLevel = iota
// QueryConsistencyLevelEventual passes the request to the receiving cluster and returns eventually consistent results
QueryConsistencyLevelEventual
// QueryConsistencyLevelStrong will redirect the request to the active cluster and returns strongly consistent results
QueryConsistencyLevelStrong
)

func getFeatureFlags(options *ClientOptions) FeatureFlags {
if options != nil {
return FeatureFlags{
Expand Down
11 changes: 11 additions & 0 deletions internal/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,14 @@ func convertActiveClusterSelectionPolicy(policy *ActiveClusterSelectionPolicy) (
return nil, fmt.Errorf("invalid active cluster selection strategy: %d", policy.Strategy)
}
}

func convertQueryConsistencyLevel(level QueryConsistencyLevel) *s.QueryConsistencyLevel {
switch level {
case QueryConsistencyLevelEventual:
return s.QueryConsistencyLevelEventual.Ptr()
case QueryConsistencyLevelStrong:
return s.QueryConsistencyLevelStrong.Ptr()
default:
return nil
}
}
36 changes: 36 additions & 0 deletions internal/convert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,3 +157,39 @@ func TestConvertActiveClusterSelectionPolicy(t *testing.T) {
}
}
}

func TestConvertQueryConsistencyLevel(t *testing.T) {
tests := []struct {
name string
level QueryConsistencyLevel
expectedThrift *s.QueryConsistencyLevel
}{
{
name: "QueryConsistencyLevelUnspecified - return nil",
level: QueryConsistencyLevelUnspecified,
expectedThrift: nil,
},
{
name: "QueryConsistencyLevelEventual - return eventual",
level: QueryConsistencyLevelEventual,
expectedThrift: s.QueryConsistencyLevelEventual.Ptr(),
},
{
name: "QueryConsistencyLevelStrong - return strong",
level: QueryConsistencyLevelStrong,
expectedThrift: s.QueryConsistencyLevelStrong.Ptr(),
},
{
name: "invalid level (999) - return nil",
level: QueryConsistencyLevel(999),
expectedThrift: nil,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
result := convertQueryConsistencyLevel(test.level)
assert.Equal(t, test.expectedThrift, result)
})
}
}
99 changes: 83 additions & 16 deletions internal/internal_workflow_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,26 @@ func (wc *workflowClient) TerminateWorkflow(ctx context.Context, workflowID stri
return err
}

// GetWorkflowHistoryWithOptionsRequest is the request to GetWorkflowHistoryWithOptions
type GetWorkflowHistoryWithOptionsRequest struct {
// WorkflowID specifies the workflow to request. Required.
WorkflowID string
// RunID is an optional field used to identify a specific run of the workflow.
// If RunID is not provided the latest run will be used.
RunID string
// IsLongPoll is an optional field indicating whether to continue polling for new events until the workflow is terminal.
// If IsLongPoll is true, the client can continue to iterate on new events that occur after the initial request.
// Note that this means the client request will remain open until the workflow is terminal.
IsLongPoll bool
// FilterType is an optional field used to specify which events to return.
// CloseEvent will only return the last event in the workflow history.
FilterType s.HistoryEventFilterType
// QueryConsistencyLevel is an optional field used to specify the consistency level for the query.
// QueryConsistencyLevelStrong will query the currently active cluster for this workflow - at the potential cost of additional latency.
// If not set, server will use the default consistency level.
QueryConsistencyLevel QueryConsistencyLevel
}

// GetWorkflowHistory return a channel which contains the history events of a given workflow
func (wc *workflowClient) GetWorkflowHistory(
ctx context.Context,
Expand All @@ -453,19 +473,37 @@ func (wc *workflowClient) GetWorkflowHistory(
isLongPoll bool,
filterType s.HistoryEventFilterType,
) HistoryEventIterator {
request := &GetWorkflowHistoryWithOptionsRequest{
WorkflowID: workflowID,
RunID: runID,
IsLongPoll: isLongPoll,
FilterType: filterType,
QueryConsistencyLevel: QueryConsistencyLevelUnspecified,
}
iter, _ := wc.GetWorkflowHistoryWithOptions(ctx, request)
return iter
}

// GetWorkflowHistoryWithOptions gets history events with additional options including query consistency level.
// See GetWorkflowHistoryWithOptionsRequest for more information.
// The errors it can return:
// - EntityNotExistsError
// - BadRequestError
// - InternalServiceError
func (wc *workflowClient) GetWorkflowHistoryWithOptions(ctx context.Context, request *GetWorkflowHistoryWithOptionsRequest) (HistoryEventIterator, error) {
domain := wc.domain
paginate := func(nextToken []byte) (*s.GetWorkflowExecutionHistoryResponse, error) {
request := &s.GetWorkflowExecutionHistoryRequest{
req := &s.GetWorkflowExecutionHistoryRequest{
Domain: common.StringPtr(domain),
Execution: &s.WorkflowExecution{
WorkflowId: common.StringPtr(workflowID),
RunId: getRunID(runID),
WorkflowId: common.StringPtr(request.WorkflowID),
RunId: getRunID(request.RunID),
},
WaitForNewEvent: common.BoolPtr(isLongPoll),
HistoryEventFilterType: &filterType,
WaitForNewEvent: common.BoolPtr(request.IsLongPoll),
HistoryEventFilterType: &request.FilterType,
NextPageToken: nextToken,
SkipArchival: common.BoolPtr(isLongPoll),
SkipArchival: common.BoolPtr(request.IsLongPoll),
QueryConsistencyLevel: convertQueryConsistencyLevel(request.QueryConsistencyLevel),
}

var response *s.GetWorkflowExecutionHistoryResponse
Expand All @@ -477,7 +515,7 @@ func (wc *workflowClient) GetWorkflowHistory(
func() error {
var err1 error
tchCtx, cancel, opt := newChannelContext(ctx, wc.featureFlags, func(builder *contextBuilder) {
if isLongPoll {
if request.IsLongPoll {
builder.Timeout = defaultGetHistoryTimeoutInSecs * time.Second
deadline, ok := ctx.Deadline()
if ok && deadline.Before(time.Now().Add(builder.Timeout)) {
Expand All @@ -487,14 +525,14 @@ func (wc *workflowClient) GetWorkflowHistory(
}
})
defer cancel()
response, err1 = wc.workflowService.GetWorkflowExecutionHistory(tchCtx, request, opt...)
response, err1 = wc.workflowService.GetWorkflowExecutionHistory(tchCtx, req, opt...)

if err1 != nil {
return err1
}

if response.RawHistory != nil {
history, err := serializer.DeserializeBlobDataToHistoryEvents(response.RawHistory, filterType)
history, err := serializer.DeserializeBlobDataToHistoryEvents(response.RawHistory, request.FilterType)
if err != nil {
return err
}
Expand All @@ -511,15 +549,15 @@ func (wc *workflowClient) GetWorkflowHistory(
if err != nil {
return nil, err
}
if isLongPoll && len(response.History.Events) == 0 && len(response.NextPageToken) != 0 {
if request.IsLongPoll && len(response.History.Events) == 0 && len(response.NextPageToken) != 0 {
if isFinalLongPoll {
// essentially a deadline exceeded, the last attempt did not get a result.
// this is necessary because the server does not know if we are able to try again,
// so it returns an empty result slightly before a timeout occurs, so the next
// attempt's token can be returned if it wishes to retry.
return nil, fmt.Errorf("timed out waiting for the workflow to finish: %w", context.DeadlineExceeded)
}
request.NextPageToken = response.NextPageToken
req.NextPageToken = response.NextPageToken
continue Loop
}
break Loop
Expand All @@ -529,7 +567,7 @@ func (wc *workflowClient) GetWorkflowHistory(

return &historyEventIteratorImpl{
paginate: paginate,
}
}, nil
}

func isEntityNonExistFromPassive(err error) bool {
Expand Down Expand Up @@ -782,26 +820,55 @@ func (wc *workflowClient) GetSearchAttributes(ctx context.Context) (*s.GetSearch
return response, nil
}

// DescribeWorkflowExecutionWithOptionsRequest is the request to DescribeWorkflowExecutionWithOptions
type DescribeWorkflowExecutionWithOptionsRequest struct {
// WorkflowID specifies the workflow to request. Required.
WorkflowID string
// RunID is an optional field used to identify a specific run of the workflow.
// If RunID is not provided the latest run will be used.
RunID string
// QueryConsistencyLevel is an optional field used to specify the consistency level for the query.
// QueryConsistencyLevelStrong will query the currently active cluster for this workflow - at the potential cost of additional latency.
// If not set, server will use the default consistency level.
QueryConsistencyLevel QueryConsistencyLevel
}

// DescribeWorkflowExecution returns information about the specified workflow execution.
// The errors it can return:
// - BadRequestError
// - InternalServiceError
// - EntityNotExistError
func (wc *workflowClient) DescribeWorkflowExecution(ctx context.Context, workflowID, runID string) (*s.DescribeWorkflowExecutionResponse, error) {
request := &s.DescribeWorkflowExecutionRequest{
request := &DescribeWorkflowExecutionWithOptionsRequest{
WorkflowID: workflowID,
RunID: runID,
QueryConsistencyLevel: QueryConsistencyLevelUnspecified,
}
return wc.DescribeWorkflowExecutionWithOptions(ctx, request)
}

// DescribeWorkflowExecutionWithOptions returns information about workflow execution with additional options including query consistency level.
// See DescribeWorkflowExecutionWithOptionsRequest for more information.
// The errors it can return:
// - BadRequestError
// - InternalServiceError
// - EntityNotExistError
func (wc *workflowClient) DescribeWorkflowExecutionWithOptions(ctx context.Context, request *DescribeWorkflowExecutionWithOptionsRequest) (*s.DescribeWorkflowExecutionResponse, error) {
req := &s.DescribeWorkflowExecutionRequest{
Domain: common.StringPtr(wc.domain),
Execution: &s.WorkflowExecution{
WorkflowId: common.StringPtr(workflowID),
RunId: common.StringPtr(runID),
WorkflowId: common.StringPtr(request.WorkflowID),
RunId: common.StringPtr(request.RunID),
},
QueryConsistencyLevel: convertQueryConsistencyLevel(request.QueryConsistencyLevel),
}
var response *s.DescribeWorkflowExecutionResponse
err := backoff.Retry(ctx,
func() error {
var err1 error
tchCtx, cancel, opt := newChannelContext(ctx, wc.featureFlags)
defer cancel()
response, err1 = wc.workflowService.DescribeWorkflowExecution(tchCtx, request, opt...)
response, err1 = wc.workflowService.DescribeWorkflowExecution(tchCtx, req, opt...)
return err1
}, createDynamicServiceRetryPolicy(ctx), isServiceTransientError)
if err != nil {
Expand Down
Loading