Skip to content

Commit 9326206

Browse files
committed
add external type for QueryConsistencyLevel, mappers to internal thrift types
1 parent 4aeb2f1 commit 9326206

File tree

7 files changed

+88
-13
lines changed

7 files changed

+88
-13
lines changed

client/client.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,11 @@ type (
8787
// ParentClosePolicy defines the behavior performed on a child workflow when its parent is closed
8888
ParentClosePolicy = internal.ParentClosePolicy
8989

90+
// QueryConsistencyLevel defines the level of consistency the query should respond with.
91+
// It will default to the cluster's configuration if not specified.
92+
// Valid values are QueryConsistencyLevelEventual (served by the receiving cluster), and QueryConsistencyLevelStrong (redirects to the active cluster).
93+
QueryConsistencyLevel = internal.QueryConsistencyLevel
94+
9095
// CancelOption values are functional options for the CancelWorkflow method.
9196
// Supported values can be created with:
9297
// - WithCancelReason(...)
@@ -492,6 +497,15 @@ const (
492497
ParentClosePolicyAbandon = internal.ParentClosePolicyAbandon
493498
)
494499

500+
const (
501+
// QueryConsistencyLevelUnspecified will use the default consistency level provided by the cluster.
502+
QueryConsistencyLevelUnspecified = internal.QueryConsistencyLevelUnspecified
503+
// QueryConsistencyLevelEventual passes the request to the receiving cluster and returns eventually consistent results
504+
QueryConsistencyLevelEventual = internal.QueryConsistencyLevelEventual
505+
// QueryConsistencyLevelStrong will redirect the request to the active cluster and returns strongly consistent results
506+
QueryConsistencyLevelStrong = internal.QueryConsistencyLevelStrong
507+
)
508+
495509
// NewClient creates an instance of a workflow client
496510
func NewClient(service workflowserviceclient.Interface, domain string, options *Options) Client {
497511
return internal.NewClient(service, domain, options)

internal/client.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -586,6 +586,11 @@ type (
586586
// ParentClosePolicy defines the action on children when parent is closed
587587
ParentClosePolicy int
588588

589+
// QueryConsistencyLevel defines the level of consistency the query should respond with.
590+
// It will default to the cluster's configuration if not specified.
591+
// Valid values are QueryConsistencyLevelEventual (served by the receiving cluster), and QueryConsistencyLevelStrong (redirects to the active cluster).
592+
QueryConsistencyLevel int
593+
589594
// ActiveClusterSelectionPolicy defines the policy for selecting the active cluster to start the workflow execution on for active-active domains.
590595
// Active-active domains can be configured to be active in multiple clusters (at most one in a given region).
591596
// Individual workflows can be configured to be active in one of the active clusters of the domain.
@@ -637,6 +642,15 @@ const (
637642
WorkflowIDReusePolicyTerminateIfRunning
638643
)
639644

645+
const (
646+
// QueryConsistencyLevelUnspecified will use the default consistency level provided by the cluster.
647+
QueryConsistencyLevelUnspecified QueryConsistencyLevel = iota
648+
// QueryConsistencyLevelEventual passes the request to the receiving cluster and returns eventually consistent results
649+
QueryConsistencyLevelEventual
650+
// QueryConsistencyLevelStrong will redirect the request to the active cluster and returns strongly consistent results
651+
QueryConsistencyLevelStrong
652+
)
653+
640654
func getFeatureFlags(options *ClientOptions) FeatureFlags {
641655
if options != nil {
642656
return FeatureFlags{

internal/convert.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,3 +72,14 @@ func convertActiveClusterSelectionPolicy(policy *ActiveClusterSelectionPolicy) (
7272
return nil, fmt.Errorf("invalid active cluster selection strategy: %d", policy.Strategy)
7373
}
7474
}
75+
76+
func convertQueryConsistencyLevel(level QueryConsistencyLevel) *s.QueryConsistencyLevel {
77+
switch level {
78+
case QueryConsistencyLevelEventual:
79+
return s.QueryConsistencyLevelEventual.Ptr()
80+
case QueryConsistencyLevelStrong:
81+
return s.QueryConsistencyLevelStrong.Ptr()
82+
default:
83+
return nil
84+
}
85+
}

internal/convert_test.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,3 +157,39 @@ func TestConvertActiveClusterSelectionPolicy(t *testing.T) {
157157
}
158158
}
159159
}
160+
161+
func TestConvertQueryConsistencyLevel(t *testing.T) {
162+
tests := []struct {
163+
name string
164+
level QueryConsistencyLevel
165+
expectedThrift *s.QueryConsistencyLevel
166+
}{
167+
{
168+
name: "QueryConsistencyLevelUnspecified - return nil",
169+
level: QueryConsistencyLevelUnspecified,
170+
expectedThrift: nil,
171+
},
172+
{
173+
name: "QueryConsistencyLevelEventual - return eventual",
174+
level: QueryConsistencyLevelEventual,
175+
expectedThrift: s.QueryConsistencyLevelEventual.Ptr(),
176+
},
177+
{
178+
name: "QueryConsistencyLevelStrong - return strong",
179+
level: QueryConsistencyLevelStrong,
180+
expectedThrift: s.QueryConsistencyLevelStrong.Ptr(),
181+
},
182+
{
183+
name: "invalid level (999) - return nil",
184+
level: QueryConsistencyLevel(999),
185+
expectedThrift: nil,
186+
},
187+
}
188+
189+
for _, test := range tests {
190+
t.Run(test.name, func(t *testing.T) {
191+
result := convertQueryConsistencyLevel(test.level)
192+
assert.Equal(t, test.expectedThrift, result)
193+
})
194+
}
195+
}

internal/internal_workflow_client.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -462,7 +462,7 @@ type GetWorkflowHistoryWithOptionsRequest struct {
462462
// QueryConsistencyLevel is an optional field used to specify the consistency level for the query.
463463
// QueryConsistencyLevelStrong will query the currently active cluster for this workflow - at the potential cost of additional latency.
464464
// If not set, server will use the default consistency level.
465-
QueryConsistencyLevel *s.QueryConsistencyLevel
465+
QueryConsistencyLevel QueryConsistencyLevel
466466
}
467467

468468
// GetWorkflowHistory return a channel which contains the history events of a given workflow
@@ -478,7 +478,7 @@ func (wc *workflowClient) GetWorkflowHistory(
478478
RunID: runID,
479479
IsLongPoll: isLongPoll,
480480
FilterType: filterType,
481-
QueryConsistencyLevel: nil, // Use server default
481+
QueryConsistencyLevel: QueryConsistencyLevelUnspecified,
482482
}
483483
iter, _ := wc.GetWorkflowHistoryWithOptions(ctx, request)
484484
return iter
@@ -503,7 +503,7 @@ func (wc *workflowClient) GetWorkflowHistoryWithOptions(ctx context.Context, req
503503
HistoryEventFilterType: &request.FilterType,
504504
NextPageToken: nextToken,
505505
SkipArchival: common.BoolPtr(request.IsLongPoll),
506-
QueryConsistencyLevel: request.QueryConsistencyLevel,
506+
QueryConsistencyLevel: convertQueryConsistencyLevel(request.QueryConsistencyLevel),
507507
}
508508

509509
var response *s.GetWorkflowExecutionHistoryResponse
@@ -830,7 +830,7 @@ type DescribeWorkflowExecutionWithOptionsRequest struct {
830830
// QueryConsistencyLevel is an optional field used to specify the consistency level for the query.
831831
// QueryConsistencyLevelStrong will query the currently active cluster for this workflow - at the potential cost of additional latency.
832832
// If not set, server will use the default consistency level.
833-
QueryConsistencyLevel *s.QueryConsistencyLevel
833+
QueryConsistencyLevel QueryConsistencyLevel
834834
}
835835

836836
// DescribeWorkflowExecution returns information about the specified workflow execution.
@@ -842,7 +842,7 @@ func (wc *workflowClient) DescribeWorkflowExecution(ctx context.Context, workflo
842842
request := &DescribeWorkflowExecutionWithOptionsRequest{
843843
WorkflowID: workflowID,
844844
RunID: runID,
845-
QueryConsistencyLevel: nil, // Use server default
845+
QueryConsistencyLevel: QueryConsistencyLevelUnspecified,
846846
}
847847
return wc.DescribeWorkflowExecutionWithOptions(ctx, request)
848848
}
@@ -860,7 +860,7 @@ func (wc *workflowClient) DescribeWorkflowExecutionWithOptions(ctx context.Conte
860860
WorkflowId: common.StringPtr(request.WorkflowID),
861861
RunId: common.StringPtr(request.RunID),
862862
},
863-
QueryConsistencyLevel: request.QueryConsistencyLevel,
863+
QueryConsistencyLevel: convertQueryConsistencyLevel(request.QueryConsistencyLevel),
864864
}
865865
var response *s.DescribeWorkflowExecutionResponse
866866
err := backoff.Retry(ctx,

internal/internal_workflow_client_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2056,7 +2056,7 @@ func (s *workflowClientTestSuite) TestDescribeWorkflowExecutionWithOptions() {
20562056
request: &DescribeWorkflowExecutionWithOptionsRequest{
20572057
WorkflowID: workflowID,
20582058
RunID: runID,
2059-
QueryConsistencyLevel: shared.QueryConsistencyLevelEventual.Ptr(),
2059+
QueryConsistencyLevel: QueryConsistencyLevelEventual,
20602060
},
20612061
requestValidator: func(req *shared.DescribeWorkflowExecutionRequest) {
20622062
s.Equal(domain, req.GetDomain())
@@ -2077,7 +2077,7 @@ func (s *workflowClientTestSuite) TestDescribeWorkflowExecutionWithOptions() {
20772077
request: &DescribeWorkflowExecutionWithOptionsRequest{
20782078
WorkflowID: workflowID,
20792079
RunID: runID,
2080-
QueryConsistencyLevel: shared.QueryConsistencyLevelStrong.Ptr(),
2080+
QueryConsistencyLevel: QueryConsistencyLevelStrong,
20812081
},
20822082
requestValidator: func(req *shared.DescribeWorkflowExecutionRequest) {
20832083
s.Equal(domain, req.GetDomain())
@@ -2673,7 +2673,7 @@ func (s *workflowClientTestSuite) TestGetWorkflowHistoryWithOptions() {
26732673
RunID: runID,
26742674
IsLongPoll: true,
26752675
FilterType: shared.HistoryEventFilterTypeCloseEvent,
2676-
QueryConsistencyLevel: shared.QueryConsistencyLevelEventual.Ptr(),
2676+
QueryConsistencyLevel: QueryConsistencyLevelEventual,
26772677
},
26782678
requestValidator: func(req *shared.GetWorkflowExecutionHistoryRequest) {
26792679
s.Equal(domain, req.GetDomain())
@@ -2709,7 +2709,7 @@ func (s *workflowClientTestSuite) TestGetWorkflowHistoryWithOptions() {
27092709
RunID: runID,
27102710
IsLongPoll: false,
27112711
FilterType: shared.HistoryEventFilterTypeAllEvent,
2712-
QueryConsistencyLevel: shared.QueryConsistencyLevelStrong.Ptr(),
2712+
QueryConsistencyLevel: QueryConsistencyLevelStrong,
27132713
},
27142714
requestValidator: func(req *shared.GetWorkflowExecutionHistoryRequest) {
27152715
s.Equal(domain, req.GetDomain())
@@ -2745,7 +2745,7 @@ func (s *workflowClientTestSuite) TestGetWorkflowHistoryWithOptions() {
27452745
RunID: runID,
27462746
IsLongPoll: false,
27472747
FilterType: shared.HistoryEventFilterTypeAllEvent,
2748-
QueryConsistencyLevel: shared.QueryConsistencyLevelStrong.Ptr(),
2748+
QueryConsistencyLevel: QueryConsistencyLevelStrong,
27492749
},
27502750
requestValidator: func(req *shared.GetWorkflowExecutionHistoryRequest) {
27512751
s.Equal(domain, req.GetDomain())

test/integration_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -320,7 +320,7 @@ func (ts *IntegrationTestSuite) TestConsistentQuery() {
320320
descResp, err := ts.libClient.DescribeWorkflowExecutionWithOptions(ctx, &client.DescribeWorkflowExecutionWithOptionsRequest{
321321
WorkflowID: "test-consistent-query",
322322
RunID: run.GetRunID(),
323-
QueryConsistencyLevel: shared.QueryConsistencyLevelStrong.Ptr(),
323+
QueryConsistencyLevel: client.QueryConsistencyLevelStrong,
324324
})
325325
ts.Nil(err)
326326
ts.NotNil(descResp)
@@ -334,7 +334,7 @@ func (ts *IntegrationTestSuite) TestConsistentQuery() {
334334
RunID: run.GetRunID(),
335335
IsLongPoll: false,
336336
FilterType: shared.HistoryEventFilterTypeAllEvent,
337-
QueryConsistencyLevel: shared.QueryConsistencyLevelStrong.Ptr(),
337+
QueryConsistencyLevel: client.QueryConsistencyLevelStrong,
338338
})
339339
ts.Nil(err)
340340
ts.NotNil(histIter)

0 commit comments

Comments
 (0)