Skip to content

Commit 61495d9

Browse files
Support feature flags in client (#1103)
1 parent ca6e026 commit 61495d9

18 files changed

+1001
-122
lines changed

.gen/go/shared/shared.go

Lines changed: 746 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

client/client.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,9 @@ type (
4747
// Options are optional parameters for Client creation.
4848
Options = internal.ClientOptions
4949

50+
// FeatureFlags define which breaking changes can be enabled for client
51+
FeatureFlags = internal.FeatureFlags
52+
5053
// StartWorkflowOptions configuration parameters for starting a workflow execution.
5154
StartWorkflowOptions = internal.StartWorkflowOptions
5255

evictiontest/workflow_cache_eviction_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ func TestWorkersTestSuite(t *testing.T) {
8787
}
8888

8989
// this is the mock for yarpcCallOptions, make sure length are the same
90-
var callOptions = []interface{}{gomock.Any(), gomock.Any(), gomock.Any()}
90+
var callOptions = []interface{}{gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()}
9191

9292
func createTestEventWorkflowExecutionStarted(eventID int64, attr *m.WorkflowExecutionStartedEventAttributes) *m.HistoryEvent {
9393
return &m.HistoryEvent{

idls

Submodule idls updated from 68a720e to 7d2d225

internal/activity_test.go

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -55,11 +55,13 @@ func (s *activityTestSuite) TearDownTest() {
5555
}
5656

5757
// this is the mock for yarpcCallOptions, make sure length are the same
58-
var callOptions = []interface{}{gomock.Any(), gomock.Any(), gomock.Any()}
58+
var callOptions = []interface{}{gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()}
59+
60+
var featureFlags = FeatureFlags{}
5961

6062
func (s *activityTestSuite) TestActivityHeartbeat() {
6163
ctx, cancel := context.WithCancel(context.Background())
62-
invoker := newServiceInvoker([]byte("task-token"), "identity", s.service, cancel, 1, make(chan struct{}))
64+
invoker := newServiceInvoker([]byte("task-token"), "identity", s.service, cancel, 1, make(chan struct{}), featureFlags)
6365
ctx = context.WithValue(ctx, activityEnvContextKey, &activityEnvironment{serviceInvoker: invoker})
6466

6567
s.service.EXPECT().RecordActivityTaskHeartbeat(gomock.Any(), gomock.Any(), callOptions...).
@@ -70,7 +72,7 @@ func (s *activityTestSuite) TestActivityHeartbeat() {
7072

7173
func (s *activityTestSuite) TestActivityHeartbeat_InternalError() {
7274
ctx, cancel := context.WithCancel(context.Background())
73-
invoker := newServiceInvoker([]byte("task-token"), "identity", s.service, cancel, 1, make(chan struct{}))
75+
invoker := newServiceInvoker([]byte("task-token"), "identity", s.service, cancel, 1, make(chan struct{}), featureFlags)
7476
ctx = context.WithValue(ctx, activityEnvContextKey, &activityEnvironment{
7577
serviceInvoker: invoker,
7678
logger: getTestLogger(s.T())})
@@ -86,7 +88,7 @@ func (s *activityTestSuite) TestActivityHeartbeat_InternalError() {
8688

8789
func (s *activityTestSuite) TestActivityHeartbeat_CancelRequested() {
8890
ctx, cancel := context.WithCancel(context.Background())
89-
invoker := newServiceInvoker([]byte("task-token"), "identity", s.service, cancel, 1, make(chan struct{}))
91+
invoker := newServiceInvoker([]byte("task-token"), "identity", s.service, cancel, 1, make(chan struct{}), featureFlags)
9092
ctx = context.WithValue(ctx, activityEnvContextKey, &activityEnvironment{
9193
serviceInvoker: invoker,
9294
logger: getTestLogger(s.T())})
@@ -101,7 +103,7 @@ func (s *activityTestSuite) TestActivityHeartbeat_CancelRequested() {
101103

102104
func (s *activityTestSuite) TestActivityHeartbeat_EntityNotExist() {
103105
ctx, cancel := context.WithCancel(context.Background())
104-
invoker := newServiceInvoker([]byte("task-token"), "identity", s.service, cancel, 1, make(chan struct{}))
106+
invoker := newServiceInvoker([]byte("task-token"), "identity", s.service, cancel, 1, make(chan struct{}), featureFlags)
105107
ctx = context.WithValue(ctx, activityEnvContextKey, &activityEnvironment{
106108
serviceInvoker: invoker,
107109
logger: getTestLogger(s.T())})
@@ -116,7 +118,7 @@ func (s *activityTestSuite) TestActivityHeartbeat_EntityNotExist() {
116118

117119
func (s *activityTestSuite) TestActivityHeartbeat_SuppressContinousInvokes() {
118120
ctx, cancel := context.WithCancel(context.Background())
119-
invoker := newServiceInvoker([]byte("task-token"), "identity", s.service, cancel, 2, make(chan struct{}))
121+
invoker := newServiceInvoker([]byte("task-token"), "identity", s.service, cancel, 2, make(chan struct{}), featureFlags)
120122
ctx = context.WithValue(ctx, activityEnvContextKey, &activityEnvironment{
121123
serviceInvoker: invoker,
122124
logger: getTestLogger(s.T())})
@@ -131,7 +133,7 @@ func (s *activityTestSuite) TestActivityHeartbeat_SuppressContinousInvokes() {
131133

132134
// No HB timeout configured.
133135
service2 := workflowservicetest.NewMockClient(s.mockCtrl)
134-
invoker2 := newServiceInvoker([]byte("task-token"), "identity", service2, cancel, 0, make(chan struct{}))
136+
invoker2 := newServiceInvoker([]byte("task-token"), "identity", service2, cancel, 0, make(chan struct{}), featureFlags)
135137
ctx = context.WithValue(ctx, activityEnvContextKey, &activityEnvironment{
136138
serviceInvoker: invoker2,
137139
logger: getTestLogger(s.T())})
@@ -144,7 +146,7 @@ func (s *activityTestSuite) TestActivityHeartbeat_SuppressContinousInvokes() {
144146
// simulate batch picks before expiry.
145147
waitCh := make(chan struct{})
146148
service3 := workflowservicetest.NewMockClient(s.mockCtrl)
147-
invoker3 := newServiceInvoker([]byte("task-token"), "identity", service3, cancel, 2, make(chan struct{}))
149+
invoker3 := newServiceInvoker([]byte("task-token"), "identity", service3, cancel, 2, make(chan struct{}), featureFlags)
148150
ctx = context.WithValue(ctx, activityEnvContextKey, &activityEnvironment{
149151
serviceInvoker: invoker3,
150152
logger: getTestLogger(s.T())})
@@ -174,7 +176,7 @@ func (s *activityTestSuite) TestActivityHeartbeat_SuppressContinousInvokes() {
174176
// simulate batch picks before expiry, with out any progress specified.
175177
waitCh2 := make(chan struct{})
176178
service4 := workflowservicetest.NewMockClient(s.mockCtrl)
177-
invoker4 := newServiceInvoker([]byte("task-token"), "identity", service4, cancel, 2, make(chan struct{}))
179+
invoker4 := newServiceInvoker([]byte("task-token"), "identity", service4, cancel, 2, make(chan struct{}), featureFlags)
178180
ctx = context.WithValue(ctx, activityEnvContextKey, &activityEnvironment{
179181
serviceInvoker: invoker4,
180182
logger: getTestLogger(s.T())})
@@ -198,7 +200,7 @@ func (s *activityTestSuite) TestActivityHeartbeat_SuppressContinousInvokes() {
198200
func (s *activityTestSuite) TestActivityHeartbeat_WorkerStop() {
199201
ctx, cancel := context.WithCancel(context.Background())
200202
workerStopChannel := make(chan struct{})
201-
invoker := newServiceInvoker([]byte("task-token"), "identity", s.service, cancel, 5, workerStopChannel)
203+
invoker := newServiceInvoker([]byte("task-token"), "identity", s.service, cancel, 5, workerStopChannel, featureFlags)
202204
ctx = context.WithValue(ctx, activityEnvContextKey, &activityEnvironment{serviceInvoker: invoker})
203205

204206
heartBeatDetail := "testDetails"

internal/client.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -335,6 +335,7 @@ type (
335335
DataConverter DataConverter
336336
Tracer opentracing.Tracer
337337
ContextPropagators []ContextPropagator
338+
FeatureFlags FeatureFlags
338339
}
339340

340341
// StartWorkflowOptions configuration parameters for starting a workflow execution.
@@ -502,6 +503,15 @@ const (
502503
WorkflowIDReusePolicyTerminateIfRunning
503504
)
504505

506+
func getFeatureFlags(options *ClientOptions) FeatureFlags {
507+
if options != nil {
508+
return FeatureFlags{
509+
WorkflowExecutionAlreadyCompletedErrorEnabled: options.FeatureFlags.WorkflowExecutionAlreadyCompletedErrorEnabled,
510+
}
511+
}
512+
return FeatureFlags{}
513+
}
514+
505515
// NewClient creates an instance of a workflow client
506516
func NewClient(service workflowserviceclient.Interface, domain string, options *ClientOptions) Client {
507517
var identity string
@@ -532,6 +542,7 @@ func NewClient(service workflowserviceclient.Interface, domain string, options *
532542
} else {
533543
tracer = opentracing.NoopTracer{}
534544
}
545+
535546
return &workflowClient{
536547
workflowService: metrics.NewWorkflowServiceWrapper(service, metricScope),
537548
domain: domain,
@@ -541,6 +552,7 @@ func NewClient(service workflowserviceclient.Interface, domain string, options *
541552
dataConverter: dataConverter,
542553
contextPropagators: contextPropagators,
543554
tracer: tracer,
555+
featureFlags: getFeatureFlags(options),
544556
}
545557
}
546558

@@ -557,10 +569,12 @@ func NewDomainClient(service workflowserviceclient.Interface, options *ClientOpt
557569
metricScope = options.MetricsScope
558570
}
559571
metricScope = tagScope(metricScope, tagDomain, "domain-client", clientImplHeaderName, clientImplHeaderValue)
572+
560573
return &domainClient{
561574
workflowService: metrics.NewWorkflowServiceWrapper(service, metricScope),
562575
metricsScope: metricScope,
563576
identity: identity,
577+
featureFlags: getFeatureFlags(options),
564578
}
565579
}
566580

internal/internal_task_handlers.go

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,7 @@ type (
144144
workerStopCh <-chan struct{}
145145
contextPropagators []ContextPropagator
146146
tracer opentracing.Tracer
147+
featureFlags FeatureFlags
147148
}
148149

149150
// history wrapper method to help information about events.
@@ -1659,6 +1660,7 @@ func newActivityTaskHandlerWithCustomProvider(
16591660
workerStopCh: params.WorkerStopChannel,
16601661
contextPropagators: params.ContextPropagators,
16611662
tracer: params.Tracer,
1663+
featureFlags: params.FeatureFlags,
16621664
}
16631665
}
16641666

@@ -1674,6 +1676,7 @@ type cadenceInvoker struct {
16741676
lastDetailsReported *[]byte // Details that were reported in the last reporting interval.
16751677
closeCh chan struct{}
16761678
workerStopChannel <-chan struct{}
1679+
featureFlags FeatureFlags
16771680
}
16781681

16791682
func (i *cadenceInvoker) Heartbeat(details []byte) error {
@@ -1778,7 +1781,7 @@ func (i *cadenceInvoker) internalHeartBeat(details []byte) (bool, error) {
17781781
ctx, cancel := context.WithTimeout(context.Background(), timeout)
17791782
defer cancel()
17801783

1781-
err := recordActivityHeartbeat(ctx, i.service, i.identity, i.taskToken, details)
1784+
err := recordActivityHeartbeat(ctx, i.service, i.identity, i.taskToken, details, i.featureFlags)
17821785

17831786
switch err.(type) {
17841787
case *CanceledError:
@@ -1813,7 +1816,7 @@ func (i *cadenceInvoker) Close(flushBufferedHeartbeat bool) {
18131816
}
18141817

18151818
func (i *cadenceInvoker) SignalWorkflow(ctx context.Context, domain, workflowID, runID, signalName string, signalInput []byte) error {
1816-
return signalWorkflow(ctx, i.service, i.identity, domain, workflowID, runID, signalName, signalInput)
1819+
return signalWorkflow(ctx, i.service, i.identity, domain, workflowID, runID, signalName, signalInput, i.featureFlags)
18171820
}
18181821

18191822
func newServiceInvoker(
@@ -1823,6 +1826,7 @@ func newServiceInvoker(
18231826
cancelHandler func(),
18241827
heartBeatTimeoutInSec int32,
18251828
workerStopChannel <-chan struct{},
1829+
featureFlags FeatureFlags,
18261830
) ServiceInvoker {
18271831
return &cadenceInvoker{
18281832
taskToken: taskToken,
@@ -1832,6 +1836,7 @@ func newServiceInvoker(
18321836
heartBeatTimeoutInSec: heartBeatTimeoutInSec,
18331837
closeCh: make(chan struct{}),
18341838
workerStopChannel: workerStopChannel,
1839+
featureFlags: featureFlags,
18351840
}
18361841
}
18371842

@@ -1851,7 +1856,7 @@ func (ath *activityTaskHandlerImpl) Execute(taskList string, t *s.PollForActivit
18511856
canCtx, cancel := context.WithCancel(rootCtx)
18521857
defer cancel()
18531858

1854-
invoker := newServiceInvoker(t.TaskToken, ath.identity, ath.service, cancel, t.GetHeartbeatTimeoutSeconds(), ath.workerStopCh)
1859+
invoker := newServiceInvoker(t.TaskToken, ath.identity, ath.service, cancel, t.GetHeartbeatTimeoutSeconds(), ath.workerStopCh, ath.featureFlags)
18551860
defer func() {
18561861
_, activityCompleted := result.(*s.RespondActivityTaskCompletedRequest)
18571862
invoker.Close(!activityCompleted) // flush buffered heartbeat if activity was not successfully completed.
@@ -1977,6 +1982,7 @@ func signalWorkflow(
19771982
runID string,
19781983
signalName string,
19791984
signalInput []byte,
1985+
featureFlags FeatureFlags,
19801986
) error {
19811987
request := &s.SignalWorkflowExecutionRequest{
19821988
Domain: common.StringPtr(domain),
@@ -1991,7 +1997,7 @@ func signalWorkflow(
19911997

19921998
return backoff.Retry(ctx,
19931999
func() error {
1994-
tchCtx, cancel, opt := newChannelContext(ctx)
2000+
tchCtx, cancel, opt := newChannelContext(ctx, featureFlags)
19952001
defer cancel()
19962002
return service.SignalWorkflowExecution(tchCtx, request, opt...)
19972003
}, createDynamicServiceRetryPolicy(ctx), isServiceTransientError)
@@ -2002,6 +2008,7 @@ func recordActivityHeartbeat(
20022008
service workflowserviceclient.Interface,
20032009
identity string,
20042010
taskToken, details []byte,
2011+
featureFlags FeatureFlags,
20052012
) error {
20062013
request := &s.RecordActivityTaskHeartbeatRequest{
20072014
TaskToken: taskToken,
@@ -2011,7 +2018,7 @@ func recordActivityHeartbeat(
20112018
var heartbeatResponse *s.RecordActivityTaskHeartbeatResponse
20122019
heartbeatErr := backoff.Retry(ctx,
20132020
func() error {
2014-
tchCtx, cancel, opt := newChannelContext(ctx)
2021+
tchCtx, cancel, opt := newChannelContext(ctx, featureFlags)
20152022
defer cancel()
20162023

20172024
var err error
@@ -2032,6 +2039,7 @@ func recordActivityHeartbeatByID(
20322039
identity string,
20332040
domain, workflowID, runID, activityID string,
20342041
details []byte,
2042+
featureFlags FeatureFlags,
20352043
) error {
20362044
request := &s.RecordActivityTaskHeartbeatByIDRequest{
20372045
Domain: common.StringPtr(domain),
@@ -2044,7 +2052,7 @@ func recordActivityHeartbeatByID(
20442052
var heartbeatResponse *s.RecordActivityTaskHeartbeatResponse
20452053
heartbeatErr := backoff.Retry(ctx,
20462054
func() error {
2047-
tchCtx, cancel, opt := newChannelContext(ctx)
2055+
tchCtx, cancel, opt := newChannelContext(ctx, featureFlags)
20482056
defer cancel()
20492057

20502058
var err error

internal/internal_task_handlers_test.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1265,7 +1265,9 @@ func (t *TaskHandlersTestSuite) TestHeartBeat_NilResponseWithError() {
12651265
mockService,
12661266
func() {},
12671267
0,
1268-
make(chan struct{}))
1268+
make(chan struct{}),
1269+
featureFlags,
1270+
)
12691271

12701272
heartbeatErr := cadenceInvoker.BatchHeartbeat(nil)
12711273
t.NotNil(heartbeatErr)
@@ -1289,7 +1291,9 @@ func (t *TaskHandlersTestSuite) TestHeartBeat_NilResponseWithDomainNotActiveErro
12891291
mockService,
12901292
cancelHandler,
12911293
0,
1292-
make(chan struct{}))
1294+
make(chan struct{}),
1295+
featureFlags,
1296+
)
12931297

12941298
heartbeatErr := cadenceInvoker.BatchHeartbeat(nil)
12951299
t.NotNil(heartbeatErr)

0 commit comments

Comments
 (0)