Skip to content

Commit 97eba63

Browse files
authored
Add support for strong consistency on GetWorkflowHistory and DescribeWorkflowExecution (#1445)
What changed? Adds support to make requests with a QueryConsistencyLevel to: GetWorkflowHistory DescribeWorkflowExecution Using STRONG as the QueryConsistencyLevel will direct cadence to retrieve information from the "active" cluster, regardless of where the request is received (active or passive). Note that this will increase the latency of requests that are received in the passive cluster. The existing methods have been kept for compatibility and can continue to be used.
1 parent d867eeb commit 97eba63

File tree

8 files changed

+583
-15
lines changed

8 files changed

+583
-15
lines changed

client/client.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,9 +78,20 @@ type (
7878
// QueryWorkflowWithOptionsResponse defines the response to QueryWorkflowWithOptions
7979
QueryWorkflowWithOptionsResponse = internal.QueryWorkflowWithOptionsResponse
8080

81+
// GetWorkflowHistoryWithOptionsRequest defines the request to GetWorkflowHistoryWithOptions
82+
GetWorkflowHistoryWithOptionsRequest = internal.GetWorkflowHistoryWithOptionsRequest
83+
84+
// DescribeWorkflowExecutionWithOptionsRequest defines the request to DescribeWorkflowExecutionWithOptions
85+
DescribeWorkflowExecutionWithOptionsRequest = internal.DescribeWorkflowExecutionWithOptionsRequest
86+
8187
// ParentClosePolicy defines the behavior performed on a child workflow when its parent is closed
8288
ParentClosePolicy = internal.ParentClosePolicy
8389

90+
// QueryConsistencyLevel defines the level of consistency the query should respond with.
91+
// It will default to the cluster's configuration if not specified.
92+
// Valid values are QueryConsistencyLevelEventual (served by the receiving cluster), and QueryConsistencyLevelStrong (redirects to the active cluster).
93+
QueryConsistencyLevel = internal.QueryConsistencyLevel
94+
8495
// CancelOption values are functional options for the CancelWorkflow method.
8596
// Supported values can be created with:
8697
// - WithCancelReason(...)
@@ -226,6 +237,26 @@ type (
226237
// }
227238
GetWorkflowHistory(ctx context.Context, workflowID string, runID string, isLongPoll bool, filterType s.HistoryEventFilterType) HistoryEventIterator
228239

240+
// GetWorkflowHistoryWithOptions gets history events of a particular workflow.
241+
// See GetWorkflowHistoryWithOptionsRequest for more information.
242+
// Returns an iterator of HistoryEvents - see shared.HistoryEvent for more details.
243+
// Example:-
244+
// To iterate all events,
245+
// iter := GetWorkflowHistory(ctx, workflowID, runID, isLongPoll, filterType)
246+
// events := []*shared.HistoryEvent{}
247+
// for iter.HasNext() {
248+
// event, err := iter.Next()
249+
// if err != nil {
250+
// return err
251+
// }
252+
// events = append(events, event)
253+
// }
254+
// Returns the following errors:
255+
// - EntityNotExistsError
256+
// - BadRequestError
257+
// - InternalServiceError
258+
GetWorkflowHistoryWithOptions(ctx context.Context, request *GetWorkflowHistoryWithOptionsRequest) HistoryEventIterator
259+
229260
// CompleteActivity reports activity completed.
230261
// activity Execute method can return activity.ErrResultPending to
231262
// indicate the activity is not completed when it's Execute method returns. In that case, this CompleteActivity() method
@@ -383,6 +414,14 @@ type (
383414
// - EntityNotExistError
384415
DescribeWorkflowExecution(ctx context.Context, workflowID, runID string) (*s.DescribeWorkflowExecutionResponse, error)
385416

417+
// DescribeWorkflowExecutionWithOptions returns information about the specified workflow execution.
418+
// See DescribeWorkflowExecutionWithOptionsRequest for more information.
419+
// The errors it can return:
420+
// - BadRequestError
421+
// - InternalServiceError
422+
// - EntityNotExistError
423+
DescribeWorkflowExecutionWithOptions(ctx context.Context, request *DescribeWorkflowExecutionWithOptionsRequest) (*s.DescribeWorkflowExecutionResponse, error)
424+
386425
// DescribeTaskList returns information about the target tasklist, right now this API returns the
387426
// pollers which polled this tasklist in last few minutes.
388427
// The errors it can return:
@@ -458,6 +497,15 @@ const (
458497
ParentClosePolicyAbandon = internal.ParentClosePolicyAbandon
459498
)
460499

500+
const (
501+
// QueryConsistencyLevelUnspecified will use the default consistency level provided by the cluster.
502+
QueryConsistencyLevelUnspecified = internal.QueryConsistencyLevelUnspecified
503+
// QueryConsistencyLevelEventual passes the request to the receiving cluster and returns eventually consistent results
504+
QueryConsistencyLevelEventual = internal.QueryConsistencyLevelEventual
505+
// QueryConsistencyLevelStrong will redirect the request to the active cluster and returns strongly consistent results
506+
QueryConsistencyLevelStrong = internal.QueryConsistencyLevelStrong
507+
)
508+
461509
// NewClient creates an instance of a workflow client
462510
func NewClient(service workflowserviceclient.Interface, domain string, options *Options) Client {
463511
return internal.NewClient(service, domain, options)

internal/client.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,15 @@ type (
219219
// }
220220
GetWorkflowHistory(ctx context.Context, workflowID string, runID string, isLongPoll bool, filterType s.HistoryEventFilterType) HistoryEventIterator
221221

222+
// GetWorkflowHistoryWithOptions gets history events of a particular workflow.
223+
// See GetWorkflowHistoryWithOptionsRequest for more information.
224+
// Returns an iterator of HistoryEvents - see shared.HistoryEvent for more details.
225+
// The errors it can return:
226+
// - EntityNotExistsError
227+
// - BadRequestError
228+
// - InternalServiceError
229+
GetWorkflowHistoryWithOptions(ctx context.Context, request *GetWorkflowHistoryWithOptionsRequest) HistoryEventIterator
230+
222231
// CompleteActivity reports activity completed.
223232
// activity Execute method can return acitivity.activity.ErrResultPending to
224233
// indicate the activity is not completed when it's Execute method returns. In that case, this CompleteActivity() method
@@ -367,6 +376,14 @@ type (
367376
// - EntityNotExistError
368377
DescribeWorkflowExecution(ctx context.Context, workflowID, runID string) (*s.DescribeWorkflowExecutionResponse, error)
369378

379+
// DescribeWorkflowExecutionWithOptions returns information about the specified workflow execution.
380+
// See DescribeWorkflowExecutionWithOptionsRequest for more information.
381+
// The errors it can return:
382+
// - BadRequestError
383+
// - InternalServiceError
384+
// - EntityNotExistError
385+
DescribeWorkflowExecutionWithOptions(ctx context.Context, request *DescribeWorkflowExecutionWithOptionsRequest) (*s.DescribeWorkflowExecutionResponse, error)
386+
370387
// DescribeTaskList returns information about the target tasklist, right now this API returns the
371388
// pollers which polled this tasklist in last few minutes.
372389
// The errors it can return:
@@ -569,6 +586,11 @@ type (
569586
// ParentClosePolicy defines the action on children when parent is closed
570587
ParentClosePolicy int
571588

589+
// QueryConsistencyLevel defines the level of consistency the query should respond with.
590+
// It will default to the cluster's configuration if not specified.
591+
// Valid values are QueryConsistencyLevelEventual (served by the receiving cluster), and QueryConsistencyLevelStrong (redirects to the active cluster).
592+
QueryConsistencyLevel int
593+
572594
// ActiveClusterSelectionPolicy defines the policy for selecting the active cluster to start the workflow execution on for active-active domains.
573595
// Active-active domains can be configured to be active in multiple clusters (at most one in a given region).
574596
// Individual workflows can be configured to be active in one of the active clusters of the domain.
@@ -620,6 +642,15 @@ const (
620642
WorkflowIDReusePolicyTerminateIfRunning
621643
)
622644

645+
const (
646+
// QueryConsistencyLevelUnspecified will use the default consistency level provided by the cluster.
647+
QueryConsistencyLevelUnspecified QueryConsistencyLevel = iota
648+
// QueryConsistencyLevelEventual passes the request to the receiving cluster and returns eventually consistent results
649+
QueryConsistencyLevelEventual
650+
// QueryConsistencyLevelStrong will redirect the request to the active cluster and returns strongly consistent results
651+
QueryConsistencyLevelStrong
652+
)
653+
623654
func getFeatureFlags(options *ClientOptions) FeatureFlags {
624655
if options != nil {
625656
return FeatureFlags{

internal/convert.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,3 +72,14 @@ func convertActiveClusterSelectionPolicy(policy *ActiveClusterSelectionPolicy) (
7272
return nil, fmt.Errorf("invalid active cluster selection strategy: %d", policy.Strategy)
7373
}
7474
}
75+
76+
func convertQueryConsistencyLevel(level QueryConsistencyLevel) *s.QueryConsistencyLevel {
77+
switch level {
78+
case QueryConsistencyLevelEventual:
79+
return s.QueryConsistencyLevelEventual.Ptr()
80+
case QueryConsistencyLevelStrong:
81+
return s.QueryConsistencyLevelStrong.Ptr()
82+
default:
83+
return nil
84+
}
85+
}

internal/convert_test.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,3 +157,39 @@ func TestConvertActiveClusterSelectionPolicy(t *testing.T) {
157157
}
158158
}
159159
}
160+
161+
func TestConvertQueryConsistencyLevel(t *testing.T) {
162+
tests := []struct {
163+
name string
164+
level QueryConsistencyLevel
165+
expectedThrift *s.QueryConsistencyLevel
166+
}{
167+
{
168+
name: "QueryConsistencyLevelUnspecified - return nil",
169+
level: QueryConsistencyLevelUnspecified,
170+
expectedThrift: nil,
171+
},
172+
{
173+
name: "QueryConsistencyLevelEventual - return eventual",
174+
level: QueryConsistencyLevelEventual,
175+
expectedThrift: s.QueryConsistencyLevelEventual.Ptr(),
176+
},
177+
{
178+
name: "QueryConsistencyLevelStrong - return strong",
179+
level: QueryConsistencyLevelStrong,
180+
expectedThrift: s.QueryConsistencyLevelStrong.Ptr(),
181+
},
182+
{
183+
name: "invalid level (999) - return nil",
184+
level: QueryConsistencyLevel(999),
185+
expectedThrift: nil,
186+
},
187+
}
188+
189+
for _, test := range tests {
190+
t.Run(test.name, func(t *testing.T) {
191+
result := convertQueryConsistencyLevel(test.level)
192+
assert.Equal(t, test.expectedThrift, result)
193+
})
194+
}
195+
}

internal/internal_workflow_client.go

Lines changed: 82 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -445,6 +445,26 @@ func (wc *workflowClient) TerminateWorkflow(ctx context.Context, workflowID stri
445445
return err
446446
}
447447

448+
// GetWorkflowHistoryWithOptionsRequest is the request to GetWorkflowHistoryWithOptions
449+
type GetWorkflowHistoryWithOptionsRequest struct {
450+
// WorkflowID specifies the workflow to request. Required.
451+
WorkflowID string
452+
// RunID is an optional field used to identify a specific run of the workflow.
453+
// If RunID is not provided the latest run will be used.
454+
RunID string
455+
// IsLongPoll is an optional field indicating whether to continue polling for new events until the workflow is terminal.
456+
// If IsLongPoll is true, the client can continue to iterate on new events that occur after the initial request.
457+
// Note that this means the client request will remain open until the workflow is terminal.
458+
IsLongPoll bool
459+
// FilterType is an optional field used to specify which events to return.
460+
// CloseEvent will only return the last event in the workflow history.
461+
FilterType s.HistoryEventFilterType
462+
// QueryConsistencyLevel is an optional field used to specify the consistency level for the query.
463+
// QueryConsistencyLevelStrong will query the currently active cluster for this workflow - at the potential cost of additional latency.
464+
// If not set, server will use the default consistency level.
465+
QueryConsistencyLevel QueryConsistencyLevel
466+
}
467+
448468
// GetWorkflowHistory return a channel which contains the history events of a given workflow
449469
func (wc *workflowClient) GetWorkflowHistory(
450470
ctx context.Context,
@@ -453,19 +473,37 @@ func (wc *workflowClient) GetWorkflowHistory(
453473
isLongPoll bool,
454474
filterType s.HistoryEventFilterType,
455475
) HistoryEventIterator {
476+
request := &GetWorkflowHistoryWithOptionsRequest{
477+
WorkflowID: workflowID,
478+
RunID: runID,
479+
IsLongPoll: isLongPoll,
480+
FilterType: filterType,
481+
QueryConsistencyLevel: QueryConsistencyLevelUnspecified,
482+
}
483+
iter := wc.GetWorkflowHistoryWithOptions(ctx, request)
484+
return iter
485+
}
456486

487+
// GetWorkflowHistoryWithOptions gets history events with additional options including query consistency level.
488+
// See GetWorkflowHistoryWithOptionsRequest for more information.
489+
// The errors it can return:
490+
// - EntityNotExistsError
491+
// - BadRequestError
492+
// - InternalServiceError
493+
func (wc *workflowClient) GetWorkflowHistoryWithOptions(ctx context.Context, request *GetWorkflowHistoryWithOptionsRequest) HistoryEventIterator {
457494
domain := wc.domain
458495
paginate := func(nextToken []byte) (*s.GetWorkflowExecutionHistoryResponse, error) {
459-
request := &s.GetWorkflowExecutionHistoryRequest{
496+
req := &s.GetWorkflowExecutionHistoryRequest{
460497
Domain: common.StringPtr(domain),
461498
Execution: &s.WorkflowExecution{
462-
WorkflowId: common.StringPtr(workflowID),
463-
RunId: getRunID(runID),
499+
WorkflowId: common.StringPtr(request.WorkflowID),
500+
RunId: getRunID(request.RunID),
464501
},
465-
WaitForNewEvent: common.BoolPtr(isLongPoll),
466-
HistoryEventFilterType: &filterType,
502+
WaitForNewEvent: common.BoolPtr(request.IsLongPoll),
503+
HistoryEventFilterType: &request.FilterType,
467504
NextPageToken: nextToken,
468-
SkipArchival: common.BoolPtr(isLongPoll),
505+
SkipArchival: common.BoolPtr(request.IsLongPoll),
506+
QueryConsistencyLevel: convertQueryConsistencyLevel(request.QueryConsistencyLevel),
469507
}
470508

471509
var response *s.GetWorkflowExecutionHistoryResponse
@@ -477,7 +515,7 @@ func (wc *workflowClient) GetWorkflowHistory(
477515
func() error {
478516
var err1 error
479517
tchCtx, cancel, opt := newChannelContext(ctx, wc.featureFlags, func(builder *contextBuilder) {
480-
if isLongPoll {
518+
if request.IsLongPoll {
481519
builder.Timeout = defaultGetHistoryTimeoutInSecs * time.Second
482520
deadline, ok := ctx.Deadline()
483521
if ok && deadline.Before(time.Now().Add(builder.Timeout)) {
@@ -487,14 +525,14 @@ func (wc *workflowClient) GetWorkflowHistory(
487525
}
488526
})
489527
defer cancel()
490-
response, err1 = wc.workflowService.GetWorkflowExecutionHistory(tchCtx, request, opt...)
528+
response, err1 = wc.workflowService.GetWorkflowExecutionHistory(tchCtx, req, opt...)
491529

492530
if err1 != nil {
493531
return err1
494532
}
495533

496534
if response.RawHistory != nil {
497-
history, err := serializer.DeserializeBlobDataToHistoryEvents(response.RawHistory, filterType)
535+
history, err := serializer.DeserializeBlobDataToHistoryEvents(response.RawHistory, request.FilterType)
498536
if err != nil {
499537
return err
500538
}
@@ -511,15 +549,15 @@ func (wc *workflowClient) GetWorkflowHistory(
511549
if err != nil {
512550
return nil, err
513551
}
514-
if isLongPoll && len(response.History.Events) == 0 && len(response.NextPageToken) != 0 {
552+
if request.IsLongPoll && len(response.History.Events) == 0 && len(response.NextPageToken) != 0 {
515553
if isFinalLongPoll {
516554
// essentially a deadline exceeded, the last attempt did not get a result.
517555
// this is necessary because the server does not know if we are able to try again,
518556
// so it returns an empty result slightly before a timeout occurs, so the next
519557
// attempt's token can be returned if it wishes to retry.
520558
return nil, fmt.Errorf("timed out waiting for the workflow to finish: %w", context.DeadlineExceeded)
521559
}
522-
request.NextPageToken = response.NextPageToken
560+
req.NextPageToken = response.NextPageToken
523561
continue Loop
524562
}
525563
break Loop
@@ -782,26 +820,55 @@ func (wc *workflowClient) GetSearchAttributes(ctx context.Context) (*s.GetSearch
782820
return response, nil
783821
}
784822

823+
// DescribeWorkflowExecutionWithOptionsRequest is the request to DescribeWorkflowExecutionWithOptions
824+
type DescribeWorkflowExecutionWithOptionsRequest struct {
825+
// WorkflowID specifies the workflow to request. Required.
826+
WorkflowID string
827+
// RunID is an optional field used to identify a specific run of the workflow.
828+
// If RunID is not provided the latest run will be used.
829+
RunID string
830+
// QueryConsistencyLevel is an optional field used to specify the consistency level for the query.
831+
// QueryConsistencyLevelStrong will query the currently active cluster for this workflow - at the potential cost of additional latency.
832+
// If not set, server will use the default consistency level.
833+
QueryConsistencyLevel QueryConsistencyLevel
834+
}
835+
785836
// DescribeWorkflowExecution returns information about the specified workflow execution.
786837
// The errors it can return:
787838
// - BadRequestError
788839
// - InternalServiceError
789840
// - EntityNotExistError
790841
func (wc *workflowClient) DescribeWorkflowExecution(ctx context.Context, workflowID, runID string) (*s.DescribeWorkflowExecutionResponse, error) {
791-
request := &s.DescribeWorkflowExecutionRequest{
842+
request := &DescribeWorkflowExecutionWithOptionsRequest{
843+
WorkflowID: workflowID,
844+
RunID: runID,
845+
QueryConsistencyLevel: QueryConsistencyLevelUnspecified,
846+
}
847+
return wc.DescribeWorkflowExecutionWithOptions(ctx, request)
848+
}
849+
850+
// DescribeWorkflowExecutionWithOptions returns information about workflow execution with additional options including query consistency level.
851+
// See DescribeWorkflowExecutionWithOptionsRequest for more information.
852+
// The errors it can return:
853+
// - BadRequestError
854+
// - InternalServiceError
855+
// - EntityNotExistError
856+
func (wc *workflowClient) DescribeWorkflowExecutionWithOptions(ctx context.Context, request *DescribeWorkflowExecutionWithOptionsRequest) (*s.DescribeWorkflowExecutionResponse, error) {
857+
req := &s.DescribeWorkflowExecutionRequest{
792858
Domain: common.StringPtr(wc.domain),
793859
Execution: &s.WorkflowExecution{
794-
WorkflowId: common.StringPtr(workflowID),
795-
RunId: common.StringPtr(runID),
860+
WorkflowId: common.StringPtr(request.WorkflowID),
861+
RunId: common.StringPtr(request.RunID),
796862
},
863+
QueryConsistencyLevel: convertQueryConsistencyLevel(request.QueryConsistencyLevel),
797864
}
798865
var response *s.DescribeWorkflowExecutionResponse
799866
err := backoff.Retry(ctx,
800867
func() error {
801868
var err1 error
802869
tchCtx, cancel, opt := newChannelContext(ctx, wc.featureFlags)
803870
defer cancel()
804-
response, err1 = wc.workflowService.DescribeWorkflowExecution(tchCtx, request, opt...)
871+
response, err1 = wc.workflowService.DescribeWorkflowExecution(tchCtx, req, opt...)
805872
return err1
806873
}, createDynamicServiceRetryPolicy(ctx), isServiceTransientError)
807874
if err != nil {

0 commit comments

Comments
 (0)