Skip to content

Commit ebff2b1

Browse files
arzonusGroxx
andauthored
Introduce ExecuteWithVersion and ExecuteWithMinVersion to GetVersion (#1427)
* add support of GetVersionOption, ExecuteWithVersion and ExecuteWithMinVersion * fix comment * add tests * add tests * add public GetVersion * add TestVersionedWorkflows case * add integration test for versioned workflow * fix fmt * add V5 and V6 versions * change functional Option pattern to internal.GetVersionOptions * change description of ExecuteWithVersion and ExecuteWithMinVersion * fix doc * unified tests * simplified doc * move versioned workflows to integration tests * add support of ExecuteWithMinVersion and ExecuteWithVersion to testWorkflowEnvironmentImpl * add tests in Test_MockGetVersion * update GetVersionOptions to non-functional option * fix goimports * fix linter issues * Apply suggestions from code review Co-authored-by: Steven L <[email protected]> * rename versioned_workflow to versioned_workflow_test * remove versioned_workflow jsons * change usage of internal.GetVersionOption to GetVersionOption.apply interface * fix doc * fix doc * simplify VersionedWorkflowV* * replace asserts to ElementsMatch --------- Co-authored-by: Steven L <[email protected]>
1 parent 6f6a1fa commit ebff2b1

12 files changed

+847
-75
lines changed

internal/interceptors.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ type WorkflowInterceptor interface {
6363
GetSignalChannel(ctx Context, signalName string) Channel
6464
SideEffect(ctx Context, f func(ctx Context) interface{}) Value
6565
MutableSideEffect(ctx Context, id string, f func(ctx Context) interface{}, equals func(a, b interface{}) bool) Value
66-
GetVersion(ctx Context, changeID string, minSupported, maxSupported Version) Version
66+
GetVersion(ctx Context, changeID string, minSupported, maxSupported Version, opts ...GetVersionOption) Version
6767
SetQueryHandler(ctx Context, queryType string, handler interface{}) error
6868
IsReplaying(ctx Context) bool
6969
HasLastCompletionResult(ctx Context) bool
@@ -158,8 +158,8 @@ func (t *WorkflowInterceptorBase) MutableSideEffect(ctx Context, id string, f fu
158158
}
159159

160160
// GetVersion forwards to t.Next
161-
func (t *WorkflowInterceptorBase) GetVersion(ctx Context, changeID string, minSupported, maxSupported Version) Version {
162-
return t.Next.GetVersion(ctx, changeID, minSupported, maxSupported)
161+
func (t *WorkflowInterceptorBase) GetVersion(ctx Context, changeID string, minSupported, maxSupported Version, opts ...GetVersionOption) Version {
162+
return t.Next.GetVersion(ctx, changeID, minSupported, maxSupported, opts...)
163163
}
164164

165165
// SetQueryHandler forwards to t.Next

internal/internal_event_handlers.go

Lines changed: 37 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -602,28 +602,58 @@ func validateVersion(changeID string, version, minSupported, maxSupported Versio
602602
}
603603
}
604604

605-
func (wc *workflowEnvironmentImpl) GetVersion(changeID string, minSupported, maxSupported Version) Version {
605+
func (wc *workflowEnvironmentImpl) GetVersion(changeID string, minSupported, maxSupported Version, opts ...GetVersionOption) Version {
606+
// Check if the changeID already has a version assigned
607+
// If it does, validate the version against the min and max supported versions
608+
// ensuring it is within the acceptable range
606609
if version, ok := wc.changeVersions[changeID]; ok {
607610
validateVersion(changeID, version, minSupported, maxSupported)
608611
return version
609612
}
610613

614+
// Apply the functional options to get the configuration
615+
config := &getVersionConfig{}
616+
for _, opt := range opts {
617+
opt.apply(config)
618+
}
619+
611620
var version Version
612-
if wc.isReplay {
613-
// GetVersion for changeID is called first time in replay mode, use DefaultVersion
621+
switch {
622+
623+
// GetVersion for changeID is called first time in replay mode, use DefaultVersion
624+
case wc.isReplay:
614625
version = DefaultVersion
615-
} else {
616-
// GetVersion for changeID is called first time (non-replay mode), generate a marker decision for it.
617-
// Also upsert search attributes to enable ability to search by changeVersion.
626+
627+
// If ExecuteWithVersion option is used, use the custom version provided
628+
case config.CustomVersion != nil:
629+
version = *config.CustomVersion
630+
631+
// If ExecuteWithMinVersion option is set, use the minimum supported version
632+
case config.UseMinVersion:
633+
version = minSupported
634+
635+
// Otherwise, use the maximum supported version
636+
default:
618637
version = maxSupported
638+
}
639+
640+
// Validate the version against the min and max supported versions
641+
// ensuring it is within the acceptable range
642+
validateVersion(changeID, version, minSupported, maxSupported)
643+
644+
// If the version is not the DefaultVersion, and it's not a replay, record it and update search attributes
645+
// Keeping the DefaultVersion as a special case where no version marker is recorded
646+
if !wc.isReplay && version != DefaultVersion {
647+
// Record the version marker and update search attributes
619648
wc.decisionsHelper.recordVersionMarker(changeID, version, wc.GetDataConverter())
620649
err := wc.UpsertSearchAttributes(createSearchAttributesForChangeVersion(changeID, version, wc.changeVersions))
621650
if err != nil {
622651
wc.logger.Warn("UpsertSearchAttributes failed", zap.Error(err))
623652
}
624653
}
625654

626-
validateVersion(changeID, version, minSupported, maxSupported)
655+
// Store the version to ensure that the version is stable
656+
// during the workflow execution
627657
wc.changeVersions[changeID] = version
628658
return version
629659
}

internal/internal_event_handlers_test.go

Lines changed: 72 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -754,6 +754,22 @@ func TestGetVersion(t *testing.T) {
754754
res := weh.GetVersion("test", 1, 3)
755755
assert.Equal(t, Version(2), res)
756756
})
757+
t.Run("version exists, ExecuteWithVersion is used", func(t *testing.T) {
758+
weh := testWorkflowExecutionEventHandler(t, newRegistry())
759+
weh.changeVersions = map[string]Version{
760+
"test": 2,
761+
}
762+
res := weh.GetVersion("test", 1, 3, ExecuteWithVersion(3))
763+
assert.Equal(t, Version(2), res)
764+
})
765+
t.Run("version exists, ExecuteWithMinVersion is used", func(t *testing.T) {
766+
weh := testWorkflowExecutionEventHandler(t, newRegistry())
767+
weh.changeVersions = map[string]Version{
768+
"test": 2,
769+
}
770+
res := weh.GetVersion("test", 1, 3, ExecuteWithMinVersion())
771+
assert.Equal(t, Version(2), res)
772+
})
757773
t.Run("version doesn't exist in replay", func(t *testing.T) {
758774
weh := testWorkflowExecutionEventHandler(t, newRegistry())
759775
weh.isReplay = true
@@ -770,6 +786,55 @@ func TestGetVersion(t *testing.T) {
770786
assert.Equal(t, Version(3), weh.changeVersions["test"])
771787
assert.Equal(t, []byte(`["test-3"]`), weh.workflowInfo.SearchAttributes.IndexedFields[CadenceChangeVersion], "ensure search attributes are updated")
772788
})
789+
t.Run("version doesn't exist, ExecuteWithVersion is used", func(t *testing.T) {
790+
weh := testWorkflowExecutionEventHandler(t, newRegistry())
791+
res := weh.GetVersion("test", DefaultVersion, 3, ExecuteWithVersion(2))
792+
assert.Equal(t, Version(2), res)
793+
require.Contains(t, weh.changeVersions, "test")
794+
assert.Equal(t, Version(2), weh.changeVersions["test"])
795+
assert.Equal(t, []byte(`["test-2"]`), weh.workflowInfo.SearchAttributes.IndexedFields[CadenceChangeVersion], "ensure search attributes are updated")
796+
})
797+
t.Run("version doesn't exist, ExecuteWithVersion is used, DefaultVersion is used", func(t *testing.T) {
798+
weh := testWorkflowExecutionEventHandler(t, newRegistry())
799+
res := weh.GetVersion("test", DefaultVersion, 3, ExecuteWithVersion(DefaultVersion))
800+
assert.Equal(t, DefaultVersion, res)
801+
require.Contains(t, weh.changeVersions, "test")
802+
assert.Equal(t, DefaultVersion, weh.changeVersions["test"])
803+
require.Nil(t, weh.workflowInfo.SearchAttributes, "ensure search attributes are not updated")
804+
})
805+
t.Run("version doesn't exist, ExecuteWithMinVersion is used, min is non DefaultVersion", func(t *testing.T) {
806+
weh := testWorkflowExecutionEventHandler(t, newRegistry())
807+
res := weh.GetVersion("test", 1, 3, ExecuteWithMinVersion())
808+
assert.Equal(t, Version(1), res)
809+
require.Contains(t, weh.changeVersions, "test")
810+
assert.Equal(t, Version(1), weh.changeVersions["test"])
811+
assert.Equal(t, []byte(`["test-1"]`), weh.workflowInfo.SearchAttributes.IndexedFields[CadenceChangeVersion], "ensure search attributes are updated")
812+
})
813+
t.Run("version doesn't exist, ExecuteWithMinVersion is used, DefaultVersion is used", func(t *testing.T) {
814+
weh := testWorkflowExecutionEventHandler(t, newRegistry())
815+
res := weh.GetVersion("test", DefaultVersion, 3, ExecuteWithMinVersion())
816+
assert.Equal(t, DefaultVersion, res)
817+
require.Contains(t, weh.changeVersions, "test")
818+
assert.Equal(t, DefaultVersion, weh.changeVersions["test"])
819+
require.Nil(t, weh.workflowInfo.SearchAttributes, "ensure search attributes are not updated")
820+
})
821+
822+
t.Run("version doesn't exist, ExecuteWithVersion is used, version > maximum version", func(t *testing.T) {
823+
weh := testWorkflowExecutionEventHandler(t, newRegistry())
824+
assert.PanicsWithValue(t, `Workflow code is too old to support version 10 for "test" changeID. The maximum supported version is 3`, func() {
825+
weh.GetVersion("test", DefaultVersion, 3, ExecuteWithVersion(10))
826+
})
827+
828+
require.Nil(t, weh.workflowInfo.SearchAttributes, "ensure search attributes are not updated")
829+
})
830+
t.Run("version doesn't exist, ExecuteWithVersion is used, version < minimum version", func(t *testing.T) {
831+
weh := testWorkflowExecutionEventHandler(t, newRegistry())
832+
assert.PanicsWithValue(t, `Workflow code removed support of version 0. for "test" changeID. The oldest supported version is 1`, func() {
833+
weh.GetVersion("test", 1, 3, ExecuteWithVersion(0))
834+
})
835+
836+
require.Nil(t, weh.workflowInfo.SearchAttributes, "ensure search attributes are not updated")
837+
})
773838
}
774839

775840
func TestMutableSideEffect(t *testing.T) {
@@ -982,6 +1047,13 @@ func TestWorkflowExecutionEnvironment_NewTimer_immediate_calls(t *testing.T) {
9821047
}
9831048

9841049
func testWorkflowExecutionEventHandler(t *testing.T, registry *registry) *workflowExecutionEventHandlerImpl {
1050+
var testWorkflowInfo = &WorkflowInfo{
1051+
WorkflowType: WorkflowType{
1052+
Name: "test",
1053+
Path: "",
1054+
},
1055+
}
1056+
9851057
return newWorkflowExecutionEventHandler(
9861058
testWorkflowInfo,
9871059
func(result []byte, err error) {},
@@ -996,13 +1068,6 @@ func testWorkflowExecutionEventHandler(t *testing.T, registry *registry) *workfl
9961068
).(*workflowExecutionEventHandlerImpl)
9971069
}
9981070

999-
var testWorkflowInfo = &WorkflowInfo{
1000-
WorkflowType: WorkflowType{
1001-
Name: "test",
1002-
Path: "",
1003-
},
1004-
}
1005-
10061071
func getSerializedDetails[T, V any](t *testing.T, id T, data V) []byte {
10071072
converter := defaultDataConverter{}
10081073
res, err := converter.ToData(id, data)

internal/internal_worker_base.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ type (
7676
localActivityClient
7777
workflowTimerClient
7878
SideEffect(f func() ([]byte, error), callback resultHandler)
79-
GetVersion(changeID string, minSupported, maxSupported Version) Version
79+
GetVersion(changeID string, minSupported, maxSupported Version, opts ...GetVersionOption) Version
8080
WorkflowInfo() *WorkflowInfo
8181
Complete(result []byte, err error)
8282
RegisterCancelHandler(handler func())

internal/internal_workflow_testsuite.go

Lines changed: 38 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1928,7 +1928,7 @@ func (env *testWorkflowEnvironmentImpl) SideEffect(f func() ([]byte, error), cal
19281928
callback(f())
19291929
}
19301930

1931-
func (env *testWorkflowEnvironmentImpl) GetVersion(changeID string, minSupported, maxSupported Version) (retVersion Version) {
1931+
func (env *testWorkflowEnvironmentImpl) GetVersion(changeID string, minSupported, maxSupported Version, opts ...GetVersionOption) (retVersion Version) {
19321932
if mockVersion, ok := env.getMockedVersion(changeID, changeID, minSupported, maxSupported); ok {
19331933
// GetVersion for changeID is mocked
19341934
env.UpsertSearchAttributes(createSearchAttributesForChangeVersion(changeID, mockVersion, env.changeVersions))
@@ -1947,9 +1947,43 @@ func (env *testWorkflowEnvironmentImpl) GetVersion(changeID string, minSupported
19471947
validateVersion(changeID, version, minSupported, maxSupported)
19481948
return version
19491949
}
1950-
env.UpsertSearchAttributes(createSearchAttributesForChangeVersion(changeID, maxSupported, env.changeVersions))
1951-
env.changeVersions[changeID] = maxSupported
1952-
return maxSupported
1950+
1951+
// Apply the functional options to get the configuration
1952+
config := &getVersionConfig{}
1953+
for _, opt := range opts {
1954+
opt.apply(config)
1955+
}
1956+
1957+
// Determine the version to use based on the options provided
1958+
var version Version
1959+
switch {
1960+
// If ExecuteWithVersion option is used, use the custom version provided
1961+
case config.CustomVersion != nil:
1962+
version = *config.CustomVersion
1963+
1964+
// If ExecuteWithMinVersion option is set, use the minimum supported version
1965+
case config.UseMinVersion:
1966+
version = minSupported
1967+
1968+
// Otherwise, use the maximum supported version
1969+
default:
1970+
version = maxSupported
1971+
}
1972+
1973+
// Validate the version against the min and max supported versions
1974+
// ensuring it is within the acceptable range
1975+
validateVersion(changeID, version, minSupported, maxSupported)
1976+
1977+
// If the version is not the DefaultVersion, update search attributes
1978+
// Keeping the DefaultVersion as a special case where no search attributes are updated
1979+
if version != DefaultVersion {
1980+
env.UpsertSearchAttributes(createSearchAttributesForChangeVersion(changeID, version, env.changeVersions))
1981+
}
1982+
1983+
// Store the version to ensure that the version is stable
1984+
// during the workflow execution
1985+
env.changeVersions[changeID] = version
1986+
return version
19531987
}
19541988

19551989
func (env *testWorkflowEnvironmentImpl) getMockedVersion(mockedChangeID, changeID string, minSupported, maxSupported Version) (Version, bool) {

0 commit comments

Comments
 (0)