diff --git a/client/client.go b/client/client.go index 58b7c9304..06d44f92a 100644 --- a/client/client.go +++ b/client/client.go @@ -78,9 +78,20 @@ type ( // QueryWorkflowWithOptionsResponse defines the response to QueryWorkflowWithOptions QueryWorkflowWithOptionsResponse = internal.QueryWorkflowWithOptionsResponse + // GetWorkflowHistoryWithOptionsRequest defines the request to GetWorkflowHistoryWithOptions + GetWorkflowHistoryWithOptionsRequest = internal.GetWorkflowHistoryWithOptionsRequest + + // DescribeWorkflowExecutionWithOptionsRequest defines the request to DescribeWorkflowExecutionWithOptions + DescribeWorkflowExecutionWithOptionsRequest = internal.DescribeWorkflowExecutionWithOptionsRequest + // ParentClosePolicy defines the behavior performed on a child workflow when its parent is closed ParentClosePolicy = internal.ParentClosePolicy + // QueryConsistencyLevel defines the level of consistency the query should respond with. + // It will default to the cluster's configuration if not specified. + // Valid values are QueryConsistencyLevelEventual (served by the receiving cluster), and QueryConsistencyLevelStrong (redirects to the active cluster). + QueryConsistencyLevel = internal.QueryConsistencyLevel + // CancelOption values are functional options for the CancelWorkflow method. // Supported values can be created with: // - WithCancelReason(...) @@ -226,6 +237,26 @@ type ( // } GetWorkflowHistory(ctx context.Context, workflowID string, runID string, isLongPoll bool, filterType s.HistoryEventFilterType) HistoryEventIterator + // GetWorkflowHistoryWithOptions gets history events of a particular workflow. + // See GetWorkflowHistoryWithOptionsRequest for more information. + // Returns an iterator of HistoryEvents - see shared.HistoryEvent for more details. + // Example:- + // To iterate all events, + // iter := GetWorkflowHistory(ctx, workflowID, runID, isLongPoll, filterType) + // events := []*shared.HistoryEvent{} + // for iter.HasNext() { + // event, err := iter.Next() + // if err != nil { + // return err + // } + // events = append(events, event) + // } + // Returns the following errors: + // - EntityNotExistsError + // - BadRequestError + // - InternalServiceError + GetWorkflowHistoryWithOptions(ctx context.Context, request *GetWorkflowHistoryWithOptionsRequest) HistoryEventIterator + // CompleteActivity reports activity completed. // activity Execute method can return activity.ErrResultPending to // indicate the activity is not completed when it's Execute method returns. In that case, this CompleteActivity() method @@ -383,6 +414,14 @@ type ( // - EntityNotExistError DescribeWorkflowExecution(ctx context.Context, workflowID, runID string) (*s.DescribeWorkflowExecutionResponse, error) + // DescribeWorkflowExecutionWithOptions returns information about the specified workflow execution. + // See DescribeWorkflowExecutionWithOptionsRequest for more information. + // The errors it can return: + // - BadRequestError + // - InternalServiceError + // - EntityNotExistError + DescribeWorkflowExecutionWithOptions(ctx context.Context, request *DescribeWorkflowExecutionWithOptionsRequest) (*s.DescribeWorkflowExecutionResponse, error) + // DescribeTaskList returns information about the target tasklist, right now this API returns the // pollers which polled this tasklist in last few minutes. // The errors it can return: @@ -458,6 +497,15 @@ const ( ParentClosePolicyAbandon = internal.ParentClosePolicyAbandon ) +const ( + // QueryConsistencyLevelUnspecified will use the default consistency level provided by the cluster. + QueryConsistencyLevelUnspecified = internal.QueryConsistencyLevelUnspecified + // QueryConsistencyLevelEventual passes the request to the receiving cluster and returns eventually consistent results + QueryConsistencyLevelEventual = internal.QueryConsistencyLevelEventual + // QueryConsistencyLevelStrong will redirect the request to the active cluster and returns strongly consistent results + QueryConsistencyLevelStrong = internal.QueryConsistencyLevelStrong +) + // NewClient creates an instance of a workflow client func NewClient(service workflowserviceclient.Interface, domain string, options *Options) Client { return internal.NewClient(service, domain, options) diff --git a/internal/client.go b/internal/client.go index 8c6d1128d..e8bd46fe0 100644 --- a/internal/client.go +++ b/internal/client.go @@ -219,6 +219,15 @@ type ( // } GetWorkflowHistory(ctx context.Context, workflowID string, runID string, isLongPoll bool, filterType s.HistoryEventFilterType) HistoryEventIterator + // GetWorkflowHistoryWithOptions gets history events of a particular workflow. + // See GetWorkflowHistoryWithOptionsRequest for more information. + // Returns an iterator of HistoryEvents - see shared.HistoryEvent for more details. + // The errors it can return: + // - EntityNotExistsError + // - BadRequestError + // - InternalServiceError + GetWorkflowHistoryWithOptions(ctx context.Context, request *GetWorkflowHistoryWithOptionsRequest) HistoryEventIterator + // CompleteActivity reports activity completed. // activity Execute method can return acitivity.activity.ErrResultPending to // indicate the activity is not completed when it's Execute method returns. In that case, this CompleteActivity() method @@ -367,6 +376,14 @@ type ( // - EntityNotExistError DescribeWorkflowExecution(ctx context.Context, workflowID, runID string) (*s.DescribeWorkflowExecutionResponse, error) + // DescribeWorkflowExecutionWithOptions returns information about the specified workflow execution. + // See DescribeWorkflowExecutionWithOptionsRequest for more information. + // The errors it can return: + // - BadRequestError + // - InternalServiceError + // - EntityNotExistError + DescribeWorkflowExecutionWithOptions(ctx context.Context, request *DescribeWorkflowExecutionWithOptionsRequest) (*s.DescribeWorkflowExecutionResponse, error) + // DescribeTaskList returns information about the target tasklist, right now this API returns the // pollers which polled this tasklist in last few minutes. // The errors it can return: @@ -569,6 +586,11 @@ type ( // ParentClosePolicy defines the action on children when parent is closed ParentClosePolicy int + // QueryConsistencyLevel defines the level of consistency the query should respond with. + // It will default to the cluster's configuration if not specified. + // Valid values are QueryConsistencyLevelEventual (served by the receiving cluster), and QueryConsistencyLevelStrong (redirects to the active cluster). + QueryConsistencyLevel int + // ActiveClusterSelectionPolicy defines the policy for selecting the active cluster to start the workflow execution on for active-active domains. // Active-active domains can be configured to be active in multiple clusters (at most one in a given region). // Individual workflows can be configured to be active in one of the active clusters of the domain. @@ -620,6 +642,15 @@ const ( WorkflowIDReusePolicyTerminateIfRunning ) +const ( + // QueryConsistencyLevelUnspecified will use the default consistency level provided by the cluster. + QueryConsistencyLevelUnspecified QueryConsistencyLevel = iota + // QueryConsistencyLevelEventual passes the request to the receiving cluster and returns eventually consistent results + QueryConsistencyLevelEventual + // QueryConsistencyLevelStrong will redirect the request to the active cluster and returns strongly consistent results + QueryConsistencyLevelStrong +) + func getFeatureFlags(options *ClientOptions) FeatureFlags { if options != nil { return FeatureFlags{ diff --git a/internal/convert.go b/internal/convert.go index f90899bd8..399ddf24a 100644 --- a/internal/convert.go +++ b/internal/convert.go @@ -72,3 +72,14 @@ func convertActiveClusterSelectionPolicy(policy *ActiveClusterSelectionPolicy) ( return nil, fmt.Errorf("invalid active cluster selection strategy: %d", policy.Strategy) } } + +func convertQueryConsistencyLevel(level QueryConsistencyLevel) *s.QueryConsistencyLevel { + switch level { + case QueryConsistencyLevelEventual: + return s.QueryConsistencyLevelEventual.Ptr() + case QueryConsistencyLevelStrong: + return s.QueryConsistencyLevelStrong.Ptr() + default: + return nil + } +} diff --git a/internal/convert_test.go b/internal/convert_test.go index 2b8d22aeb..3be8872c9 100644 --- a/internal/convert_test.go +++ b/internal/convert_test.go @@ -157,3 +157,39 @@ func TestConvertActiveClusterSelectionPolicy(t *testing.T) { } } } + +func TestConvertQueryConsistencyLevel(t *testing.T) { + tests := []struct { + name string + level QueryConsistencyLevel + expectedThrift *s.QueryConsistencyLevel + }{ + { + name: "QueryConsistencyLevelUnspecified - return nil", + level: QueryConsistencyLevelUnspecified, + expectedThrift: nil, + }, + { + name: "QueryConsistencyLevelEventual - return eventual", + level: QueryConsistencyLevelEventual, + expectedThrift: s.QueryConsistencyLevelEventual.Ptr(), + }, + { + name: "QueryConsistencyLevelStrong - return strong", + level: QueryConsistencyLevelStrong, + expectedThrift: s.QueryConsistencyLevelStrong.Ptr(), + }, + { + name: "invalid level (999) - return nil", + level: QueryConsistencyLevel(999), + expectedThrift: nil, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + result := convertQueryConsistencyLevel(test.level) + assert.Equal(t, test.expectedThrift, result) + }) + } +} diff --git a/internal/internal_workflow_client.go b/internal/internal_workflow_client.go index b90962dac..fa479c792 100644 --- a/internal/internal_workflow_client.go +++ b/internal/internal_workflow_client.go @@ -445,6 +445,26 @@ func (wc *workflowClient) TerminateWorkflow(ctx context.Context, workflowID stri return err } +// GetWorkflowHistoryWithOptionsRequest is the request to GetWorkflowHistoryWithOptions +type GetWorkflowHistoryWithOptionsRequest struct { + // WorkflowID specifies the workflow to request. Required. + WorkflowID string + // RunID is an optional field used to identify a specific run of the workflow. + // If RunID is not provided the latest run will be used. + RunID string + // IsLongPoll is an optional field indicating whether to continue polling for new events until the workflow is terminal. + // If IsLongPoll is true, the client can continue to iterate on new events that occur after the initial request. + // Note that this means the client request will remain open until the workflow is terminal. + IsLongPoll bool + // FilterType is an optional field used to specify which events to return. + // CloseEvent will only return the last event in the workflow history. + FilterType s.HistoryEventFilterType + // QueryConsistencyLevel is an optional field used to specify the consistency level for the query. + // QueryConsistencyLevelStrong will query the currently active cluster for this workflow - at the potential cost of additional latency. + // If not set, server will use the default consistency level. + QueryConsistencyLevel QueryConsistencyLevel +} + // GetWorkflowHistory return a channel which contains the history events of a given workflow func (wc *workflowClient) GetWorkflowHistory( ctx context.Context, @@ -453,19 +473,37 @@ func (wc *workflowClient) GetWorkflowHistory( isLongPoll bool, filterType s.HistoryEventFilterType, ) HistoryEventIterator { + request := &GetWorkflowHistoryWithOptionsRequest{ + WorkflowID: workflowID, + RunID: runID, + IsLongPoll: isLongPoll, + FilterType: filterType, + QueryConsistencyLevel: QueryConsistencyLevelUnspecified, + } + iter := wc.GetWorkflowHistoryWithOptions(ctx, request) + return iter +} +// GetWorkflowHistoryWithOptions gets history events with additional options including query consistency level. +// See GetWorkflowHistoryWithOptionsRequest for more information. +// The errors it can return: +// - EntityNotExistsError +// - BadRequestError +// - InternalServiceError +func (wc *workflowClient) GetWorkflowHistoryWithOptions(ctx context.Context, request *GetWorkflowHistoryWithOptionsRequest) HistoryEventIterator { domain := wc.domain paginate := func(nextToken []byte) (*s.GetWorkflowExecutionHistoryResponse, error) { - request := &s.GetWorkflowExecutionHistoryRequest{ + req := &s.GetWorkflowExecutionHistoryRequest{ Domain: common.StringPtr(domain), Execution: &s.WorkflowExecution{ - WorkflowId: common.StringPtr(workflowID), - RunId: getRunID(runID), + WorkflowId: common.StringPtr(request.WorkflowID), + RunId: getRunID(request.RunID), }, - WaitForNewEvent: common.BoolPtr(isLongPoll), - HistoryEventFilterType: &filterType, + WaitForNewEvent: common.BoolPtr(request.IsLongPoll), + HistoryEventFilterType: &request.FilterType, NextPageToken: nextToken, - SkipArchival: common.BoolPtr(isLongPoll), + SkipArchival: common.BoolPtr(request.IsLongPoll), + QueryConsistencyLevel: convertQueryConsistencyLevel(request.QueryConsistencyLevel), } var response *s.GetWorkflowExecutionHistoryResponse @@ -477,7 +515,7 @@ func (wc *workflowClient) GetWorkflowHistory( func() error { var err1 error tchCtx, cancel, opt := newChannelContext(ctx, wc.featureFlags, func(builder *contextBuilder) { - if isLongPoll { + if request.IsLongPoll { builder.Timeout = defaultGetHistoryTimeoutInSecs * time.Second deadline, ok := ctx.Deadline() if ok && deadline.Before(time.Now().Add(builder.Timeout)) { @@ -487,14 +525,14 @@ func (wc *workflowClient) GetWorkflowHistory( } }) defer cancel() - response, err1 = wc.workflowService.GetWorkflowExecutionHistory(tchCtx, request, opt...) + response, err1 = wc.workflowService.GetWorkflowExecutionHistory(tchCtx, req, opt...) if err1 != nil { return err1 } if response.RawHistory != nil { - history, err := serializer.DeserializeBlobDataToHistoryEvents(response.RawHistory, filterType) + history, err := serializer.DeserializeBlobDataToHistoryEvents(response.RawHistory, request.FilterType) if err != nil { return err } @@ -511,7 +549,7 @@ func (wc *workflowClient) GetWorkflowHistory( if err != nil { return nil, err } - if isLongPoll && len(response.History.Events) == 0 && len(response.NextPageToken) != 0 { + if request.IsLongPoll && len(response.History.Events) == 0 && len(response.NextPageToken) != 0 { if isFinalLongPoll { // essentially a deadline exceeded, the last attempt did not get a result. // this is necessary because the server does not know if we are able to try again, @@ -519,7 +557,7 @@ func (wc *workflowClient) GetWorkflowHistory( // attempt's token can be returned if it wishes to retry. return nil, fmt.Errorf("timed out waiting for the workflow to finish: %w", context.DeadlineExceeded) } - request.NextPageToken = response.NextPageToken + req.NextPageToken = response.NextPageToken continue Loop } break Loop @@ -782,18 +820,47 @@ func (wc *workflowClient) GetSearchAttributes(ctx context.Context) (*s.GetSearch return response, nil } +// DescribeWorkflowExecutionWithOptionsRequest is the request to DescribeWorkflowExecutionWithOptions +type DescribeWorkflowExecutionWithOptionsRequest struct { + // WorkflowID specifies the workflow to request. Required. + WorkflowID string + // RunID is an optional field used to identify a specific run of the workflow. + // If RunID is not provided the latest run will be used. + RunID string + // QueryConsistencyLevel is an optional field used to specify the consistency level for the query. + // QueryConsistencyLevelStrong will query the currently active cluster for this workflow - at the potential cost of additional latency. + // If not set, server will use the default consistency level. + QueryConsistencyLevel QueryConsistencyLevel +} + // DescribeWorkflowExecution returns information about the specified workflow execution. // The errors it can return: // - BadRequestError // - InternalServiceError // - EntityNotExistError func (wc *workflowClient) DescribeWorkflowExecution(ctx context.Context, workflowID, runID string) (*s.DescribeWorkflowExecutionResponse, error) { - request := &s.DescribeWorkflowExecutionRequest{ + request := &DescribeWorkflowExecutionWithOptionsRequest{ + WorkflowID: workflowID, + RunID: runID, + QueryConsistencyLevel: QueryConsistencyLevelUnspecified, + } + return wc.DescribeWorkflowExecutionWithOptions(ctx, request) +} + +// DescribeWorkflowExecutionWithOptions returns information about workflow execution with additional options including query consistency level. +// See DescribeWorkflowExecutionWithOptionsRequest for more information. +// The errors it can return: +// - BadRequestError +// - InternalServiceError +// - EntityNotExistError +func (wc *workflowClient) DescribeWorkflowExecutionWithOptions(ctx context.Context, request *DescribeWorkflowExecutionWithOptionsRequest) (*s.DescribeWorkflowExecutionResponse, error) { + req := &s.DescribeWorkflowExecutionRequest{ Domain: common.StringPtr(wc.domain), Execution: &s.WorkflowExecution{ - WorkflowId: common.StringPtr(workflowID), - RunId: common.StringPtr(runID), + WorkflowId: common.StringPtr(request.WorkflowID), + RunId: common.StringPtr(request.RunID), }, + QueryConsistencyLevel: convertQueryConsistencyLevel(request.QueryConsistencyLevel), } var response *s.DescribeWorkflowExecutionResponse err := backoff.Retry(ctx, @@ -801,7 +868,7 @@ func (wc *workflowClient) DescribeWorkflowExecution(ctx context.Context, workflo var err1 error tchCtx, cancel, opt := newChannelContext(ctx, wc.featureFlags) defer cancel() - response, err1 = wc.workflowService.DescribeWorkflowExecution(tchCtx, request, opt...) + response, err1 = wc.workflowService.DescribeWorkflowExecution(tchCtx, req, opt...) return err1 }, createDynamicServiceRetryPolicy(ctx), isServiceTransientError) if err != nil { diff --git a/internal/internal_workflow_client_test.go b/internal/internal_workflow_client_test.go index 93bd43117..e1ead5843 100644 --- a/internal/internal_workflow_client_test.go +++ b/internal/internal_workflow_client_test.go @@ -2023,6 +2023,107 @@ func (s *workflowClientTestSuite) TestDescribeWorkflowExecution() { } } +func (s *workflowClientTestSuite) TestDescribeWorkflowExecutionWithOptions() { + testcases := []struct { + name string + request *DescribeWorkflowExecutionWithOptionsRequest + requestValidator func(req *shared.DescribeWorkflowExecutionRequest) + rpcError error + response *shared.DescribeWorkflowExecutionResponse + responseValidator func(resp *shared.DescribeWorkflowExecutionResponse, err error) + }{ + { + name: "success without query consistency level", + request: &DescribeWorkflowExecutionWithOptionsRequest{ + WorkflowID: workflowID, + RunID: runID, + }, + requestValidator: func(req *shared.DescribeWorkflowExecutionRequest) { + s.Equal(domain, req.GetDomain()) + s.Equal(workflowID, req.GetExecution().GetWorkflowId()) + s.Equal(runID, req.GetExecution().GetRunId()) + s.Nil(req.QueryConsistencyLevel, "should be nil when not specified") + }, + rpcError: nil, + response: &shared.DescribeWorkflowExecutionResponse{}, + responseValidator: func(resp *shared.DescribeWorkflowExecutionResponse, err error) { + s.NoError(err) + s.NotNil(resp) + }, + }, + { + name: "success with query consistency level eventual", + request: &DescribeWorkflowExecutionWithOptionsRequest{ + WorkflowID: workflowID, + RunID: runID, + QueryConsistencyLevel: QueryConsistencyLevelEventual, + }, + requestValidator: func(req *shared.DescribeWorkflowExecutionRequest) { + s.Equal(domain, req.GetDomain()) + s.Equal(workflowID, req.GetExecution().GetWorkflowId()) + s.Equal(runID, req.GetExecution().GetRunId()) + s.NotNil(req.QueryConsistencyLevel) + s.Equal(shared.QueryConsistencyLevelEventual, *req.QueryConsistencyLevel) + }, + rpcError: nil, + response: &shared.DescribeWorkflowExecutionResponse{}, + responseValidator: func(resp *shared.DescribeWorkflowExecutionResponse, err error) { + s.NoError(err) + s.NotNil(resp) + }, + }, + { + name: "success with query consistency level strong", + request: &DescribeWorkflowExecutionWithOptionsRequest{ + WorkflowID: workflowID, + RunID: runID, + QueryConsistencyLevel: QueryConsistencyLevelStrong, + }, + requestValidator: func(req *shared.DescribeWorkflowExecutionRequest) { + s.Equal(domain, req.GetDomain()) + s.Equal(workflowID, req.GetExecution().GetWorkflowId()) + s.Equal(runID, req.GetExecution().GetRunId()) + s.NotNil(req.QueryConsistencyLevel) + s.Equal(shared.QueryConsistencyLevelStrong, *req.QueryConsistencyLevel) + }, + rpcError: nil, + response: &shared.DescribeWorkflowExecutionResponse{}, + responseValidator: func(resp *shared.DescribeWorkflowExecutionResponse, err error) { + s.NoError(err) + s.NotNil(resp) + }, + }, + { + name: "RPC failure", + request: &DescribeWorkflowExecutionWithOptionsRequest{ + WorkflowID: workflowID, + RunID: runID, + }, + requestValidator: func(req *shared.DescribeWorkflowExecutionRequest) {}, + rpcError: &shared.AccessDeniedError{}, + response: nil, + responseValidator: func(resp *shared.DescribeWorkflowExecutionResponse, err error) { + s.Equal(&shared.AccessDeniedError{}, err) + s.Nil(resp) + }, + }, + } + + for _, tt := range testcases { + s.Run(tt.name, func() { + s.service.EXPECT(). + DescribeWorkflowExecution(gomock.Any(), gomock.Any(), gomock.Any()). + Do(func(_ context.Context, req *shared.DescribeWorkflowExecutionRequest, _ ...interface{}) { + tt.requestValidator(req) + }). + Return(tt.response, tt.rpcError) + + resp, err := s.client.DescribeWorkflowExecutionWithOptions(context.Background(), tt.request) + tt.responseValidator(resp, err) + }) + } +} + func (s *workflowClientTestSuite) TestCompleteActivity() { testcases := []struct { name string @@ -2521,6 +2622,202 @@ func (s *workflowClientTestSuite) TestGetWorkflowHistory() { } } +func (s *workflowClientTestSuite) TestGetWorkflowHistoryWithOptions() { + testcases := []struct { + name string + request *GetWorkflowHistoryWithOptionsRequest + requestValidator func(req *shared.GetWorkflowExecutionHistoryRequest) + rpcError error + response *shared.GetWorkflowExecutionHistoryResponse + responseValidator func(iter HistoryEventIterator) + }{ + { + name: "success without query consistency level", + request: &GetWorkflowHistoryWithOptionsRequest{ + WorkflowID: workflowID, + RunID: runID, + IsLongPoll: false, + FilterType: shared.HistoryEventFilterTypeAllEvent, + }, + requestValidator: func(req *shared.GetWorkflowExecutionHistoryRequest) { + s.Equal(domain, req.GetDomain()) + s.Equal(workflowID, req.GetExecution().GetWorkflowId()) + s.Equal(runID, req.GetExecution().GetRunId()) + s.Equal(false, req.GetWaitForNewEvent()) + s.Equal(shared.HistoryEventFilterTypeAllEvent, req.GetHistoryEventFilterType()) + s.Nil(req.QueryConsistencyLevel, "should be nil when not specified") + }, + rpcError: nil, + response: &shared.GetWorkflowExecutionHistoryResponse{ + History: &shared.History{ + Events: []*shared.HistoryEvent{ + {EventId: common.Int64Ptr(1)}, + {EventId: common.Int64Ptr(2)}, + }, + }, + NextPageToken: nil, + }, + responseValidator: func(iter HistoryEventIterator) { + s.NotNil(iter) + s.True(iter.HasNext()) + event, nextErr := iter.Next() + s.NoError(nextErr) + s.Equal(int64(1), event.GetEventId()) + }, + }, + { + name: "success with query consistency level eventual", + request: &GetWorkflowHistoryWithOptionsRequest{ + WorkflowID: workflowID, + RunID: runID, + IsLongPoll: true, + FilterType: shared.HistoryEventFilterTypeCloseEvent, + QueryConsistencyLevel: QueryConsistencyLevelEventual, + }, + requestValidator: func(req *shared.GetWorkflowExecutionHistoryRequest) { + s.Equal(domain, req.GetDomain()) + s.Equal(workflowID, req.GetExecution().GetWorkflowId()) + s.Equal(runID, req.GetExecution().GetRunId()) + s.Equal(true, req.GetWaitForNewEvent()) + s.Equal(shared.HistoryEventFilterTypeCloseEvent, req.GetHistoryEventFilterType()) + s.NotNil(req.QueryConsistencyLevel) + s.Equal(shared.QueryConsistencyLevelEventual, *req.QueryConsistencyLevel) + }, + rpcError: nil, + response: &shared.GetWorkflowExecutionHistoryResponse{ + History: &shared.History{ + Events: []*shared.HistoryEvent{ + {EventId: common.Int64Ptr(10)}, + }, + }, + NextPageToken: nil, + }, + responseValidator: func(iter HistoryEventIterator) { + s.NotNil(iter) + s.True(iter.HasNext()) + event, nextErr := iter.Next() + s.NoError(nextErr) + s.Equal(int64(10), event.GetEventId()) + }, + }, + { + name: "success with query consistency level strong", + request: &GetWorkflowHistoryWithOptionsRequest{ + WorkflowID: workflowID, + RunID: runID, + IsLongPoll: false, + FilterType: shared.HistoryEventFilterTypeAllEvent, + QueryConsistencyLevel: QueryConsistencyLevelStrong, + }, + requestValidator: func(req *shared.GetWorkflowExecutionHistoryRequest) { + s.Equal(domain, req.GetDomain()) + s.Equal(workflowID, req.GetExecution().GetWorkflowId()) + s.Equal(runID, req.GetExecution().GetRunId()) + s.Equal(false, req.GetWaitForNewEvent()) + s.Equal(shared.HistoryEventFilterTypeAllEvent, req.GetHistoryEventFilterType()) + s.NotNil(req.QueryConsistencyLevel) + s.Equal(shared.QueryConsistencyLevelStrong, *req.QueryConsistencyLevel) + }, + rpcError: nil, + response: &shared.GetWorkflowExecutionHistoryResponse{ + History: &shared.History{ + Events: []*shared.HistoryEvent{ + {EventId: common.Int64Ptr(5)}, + }, + }, + NextPageToken: nil, + }, + responseValidator: func(iter HistoryEventIterator) { + s.NotNil(iter) + s.True(iter.HasNext()) + event, nextErr := iter.Next() + s.NoError(nextErr) + s.Equal(int64(5), event.GetEventId()) + }, + }, + { + name: "success with multiple events and pagination", + request: &GetWorkflowHistoryWithOptionsRequest{ + WorkflowID: workflowID, + RunID: runID, + IsLongPoll: false, + FilterType: shared.HistoryEventFilterTypeAllEvent, + QueryConsistencyLevel: QueryConsistencyLevelStrong, + }, + requestValidator: func(req *shared.GetWorkflowExecutionHistoryRequest) { + s.Equal(domain, req.GetDomain()) + s.Equal(workflowID, req.GetExecution().GetWorkflowId()) + s.Equal(runID, req.GetExecution().GetRunId()) + s.Equal(false, req.GetWaitForNewEvent()) + s.Equal(shared.HistoryEventFilterTypeAllEvent, req.GetHistoryEventFilterType()) + s.NotNil(req.QueryConsistencyLevel) + s.Equal(shared.QueryConsistencyLevelStrong, *req.QueryConsistencyLevel) + }, + rpcError: nil, + response: &shared.GetWorkflowExecutionHistoryResponse{ + History: &shared.History{ + Events: []*shared.HistoryEvent{ + {EventId: common.Int64Ptr(1)}, + {EventId: common.Int64Ptr(2)}, + {EventId: common.Int64Ptr(3)}, + {EventId: common.Int64Ptr(4)}, + }, + }, + NextPageToken: nil, + }, + responseValidator: func(iter HistoryEventIterator) { + s.NotNil(iter) + + // Check that the iterator returns all events in order + for i := 1; i <= 4; i++ { + s.True(iter.HasNext(), "should have event %d", i) + event, nextErr := iter.Next() + s.NoError(nextErr) + s.Equal(int64(i), event.GetEventId()) + } + + // Verify no more events + s.False(iter.HasNext(), "should not have more events") + }, + }, + { + name: "RPC failure", + request: &GetWorkflowHistoryWithOptionsRequest{ + WorkflowID: workflowID, + RunID: runID, + IsLongPoll: false, + FilterType: shared.HistoryEventFilterTypeAllEvent, + }, + requestValidator: func(req *shared.GetWorkflowExecutionHistoryRequest) {}, + rpcError: &shared.AccessDeniedError{}, + response: nil, + responseValidator: func(iter HistoryEventIterator) { + s.NotNil(iter, "iterator should be returned") + + // Error should be returned when iterator is used + s.True(iter.HasNext(), "should have next to trigger error") + event, iterErr := iter.Next() + s.Equal(&shared.AccessDeniedError{}, iterErr) + s.Nil(event) + }, + }, + } + + for _, tt := range testcases { + s.Run(tt.name, func() { + s.service.EXPECT(). + GetWorkflowExecutionHistory(gomock.Any(), gomock.Any(), gomock.Any()). + Do(func(_ context.Context, req *shared.GetWorkflowExecutionHistoryRequest, _ ...interface{}) { + tt.requestValidator(req) + }). + Return(tt.response, tt.rpcError) + + iter := s.client.GetWorkflowHistoryWithOptions(context.Background(), tt.request) + tt.responseValidator(iter) + }) + } +} + func TestGetWorkflowStartRequest(t *testing.T) { tests := []struct { name string diff --git a/mocks/Client.go b/mocks/Client.go index ed291db7d..8ad733b8f 100644 --- a/mocks/Client.go +++ b/mocks/Client.go @@ -168,6 +168,36 @@ func (_m *Client) DescribeWorkflowExecution(ctx context.Context, workflowID stri return r0, r1 } +// DescribeWorkflowExecutionWithOptions provides a mock function with given fields: ctx, request +func (_m *Client) DescribeWorkflowExecutionWithOptions(ctx context.Context, request *internal.DescribeWorkflowExecutionWithOptionsRequest) (*shared.DescribeWorkflowExecutionResponse, error) { + ret := _m.Called(ctx, request) + + if len(ret) == 0 { + panic("no return value specified for DescribeWorkflowExecutionWithOptions") + } + + var r0 *shared.DescribeWorkflowExecutionResponse + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *internal.DescribeWorkflowExecutionWithOptionsRequest) (*shared.DescribeWorkflowExecutionResponse, error)); ok { + return rf(ctx, request) + } + if rf, ok := ret.Get(0).(func(context.Context, *internal.DescribeWorkflowExecutionWithOptionsRequest) *shared.DescribeWorkflowExecutionResponse); ok { + r0 = rf(ctx, request) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*shared.DescribeWorkflowExecutionResponse) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *internal.DescribeWorkflowExecutionWithOptionsRequest) error); ok { + r1 = rf(ctx, request) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // ExecuteWorkflow provides a mock function with given fields: ctx, options, workflow, args func (_m *Client) ExecuteWorkflow(ctx context.Context, options internal.StartWorkflowOptions, workflow interface{}, args ...interface{}) (internal.WorkflowRun, error) { var _ca []interface{} @@ -271,6 +301,26 @@ func (_m *Client) GetWorkflowHistory(ctx context.Context, workflowID string, run return r0 } +// GetWorkflowHistoryWithOptions provides a mock function with given fields: ctx, request +func (_m *Client) GetWorkflowHistoryWithOptions(ctx context.Context, request *internal.GetWorkflowHistoryWithOptionsRequest) internal.HistoryEventIterator { + ret := _m.Called(ctx, request) + + if len(ret) == 0 { + panic("no return value specified for GetWorkflowHistoryWithOptions") + } + + var r0 internal.HistoryEventIterator + if rf, ok := ret.Get(0).(func(context.Context, *internal.GetWorkflowHistoryWithOptionsRequest) internal.HistoryEventIterator); ok { + r0 = rf(ctx, request) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(internal.HistoryEventIterator) + } + } + + return r0 +} + // ListArchivedWorkflow provides a mock function with given fields: ctx, request func (_m *Client) ListArchivedWorkflow(ctx context.Context, request *shared.ListArchivedWorkflowExecutionsRequest) (*shared.ListArchivedWorkflowExecutionsResponse, error) { ret := _m.Called(ctx, request) diff --git a/test/integration_test.go b/test/integration_test.go index 380023c47..c1e6dcfb3 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -315,6 +315,34 @@ func (ts *IntegrationTestSuite) TestConsistentQuery() { var queryResult string ts.NoError(value.QueryResult.Get(&queryResult)) ts.Equal("signal-input", queryResult) + + // Test DescribeWorkflowExecutionWithOptions with QueryConsistencyLevel + descResp, err := ts.libClient.DescribeWorkflowExecutionWithOptions(ctx, &client.DescribeWorkflowExecutionWithOptionsRequest{ + WorkflowID: "test-consistent-query", + RunID: run.GetRunID(), + QueryConsistencyLevel: client.QueryConsistencyLevelStrong, + }) + ts.Nil(err) + ts.NotNil(descResp) + ts.NotNil(descResp.WorkflowExecutionInfo) + ts.Equal("test-consistent-query", descResp.WorkflowExecutionInfo.GetExecution().GetWorkflowId()) + ts.Equal(run.GetRunID(), descResp.WorkflowExecutionInfo.GetExecution().GetRunId()) + + // Test GetWorkflowHistoryWithOptions with QueryConsistencyLevel + histIter := ts.libClient.GetWorkflowHistoryWithOptions(ctx, &client.GetWorkflowHistoryWithOptionsRequest{ + WorkflowID: "test-consistent-query", + RunID: run.GetRunID(), + IsLongPoll: false, + FilterType: shared.HistoryEventFilterTypeAllEvent, + QueryConsistencyLevel: client.QueryConsistencyLevelStrong, + }) + ts.Nil(err) + ts.NotNil(histIter) + ts.True(histIter.HasNext()) + firstEvent, err := histIter.Next() + ts.Nil(err) + ts.NotNil(firstEvent) + ts.Equal(shared.EventTypeWorkflowExecutionStarted, firstEvent.GetEventType()) } func (ts *IntegrationTestSuite) TestWorkflowIDReuseRejectDuplicate() {