Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
fe62e38
add support of GetVersionOption, ExecuteWithVersion and ExecuteWithMi…
arzonus Jun 17, 2025
913dc2b
fix comment
arzonus Jun 17, 2025
69ef925
add tests
arzonus Jun 17, 2025
efefadd
add tests
arzonus Jun 17, 2025
17dbdd5
add public GetVersion
arzonus Jun 19, 2025
a27d3bb
add TestVersionedWorkflows case
arzonus Jun 19, 2025
145356e
add integration test for versioned workflow
arzonus Jun 19, 2025
117d88b
fix fmt
arzonus Jun 19, 2025
8b88e01
add V5 and V6 versions
arzonus Jun 20, 2025
9f5b299
change functional Option pattern to internal.GetVersionOptions
arzonus Jun 25, 2025
345765d
change description of ExecuteWithVersion and ExecuteWithMinVersion
arzonus Jun 25, 2025
6122a71
fix doc
arzonus Jun 26, 2025
5894d0d
unified tests
arzonus Jun 26, 2025
d7c35cf
simplified doc
arzonus Jun 26, 2025
793a697
move versioned workflows to integration tests
arzonus Jun 26, 2025
fded5f8
add support of ExecuteWithMinVersion and ExecuteWithVersion to testW…
arzonus Jun 20, 2025
e339579
add tests in Test_MockGetVersion
arzonus Jun 20, 2025
a1fa47d
update GetVersionOptions to non-functional option
arzonus Jun 26, 2025
ad549d5
fix goimports
arzonus Jun 26, 2025
ec9f09a
fix linter issues
arzonus Jun 26, 2025
95aff29
Apply suggestions from code review
arzonus Jun 27, 2025
5c742b9
rename versioned_workflow to versioned_workflow_test
arzonus Jun 27, 2025
37110cb
remove versioned_workflow jsons
arzonus Jun 27, 2025
7f69426
change usage of internal.GetVersionOption to GetVersionOption.apply i…
arzonus Jun 27, 2025
1da2064
fix doc
arzonus Jun 27, 2025
b4f77cb
fix doc
arzonus Jun 27, 2025
bbfe66e
simplify VersionedWorkflowV*
arzonus Jun 27, 2025
b5257fc
replace asserts to ElementsMatch
arzonus Jun 27, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions internal/interceptors.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ type WorkflowInterceptor interface {
GetSignalChannel(ctx Context, signalName string) Channel
SideEffect(ctx Context, f func(ctx Context) interface{}) Value
MutableSideEffect(ctx Context, id string, f func(ctx Context) interface{}, equals func(a, b interface{}) bool) Value
GetVersion(ctx Context, changeID string, minSupported, maxSupported Version) Version
GetVersion(ctx Context, changeID string, minSupported, maxSupported Version, opts ...GetVersionOption) Version
SetQueryHandler(ctx Context, queryType string, handler interface{}) error
IsReplaying(ctx Context) bool
HasLastCompletionResult(ctx Context) bool
Expand Down Expand Up @@ -158,8 +158,8 @@ func (t *WorkflowInterceptorBase) MutableSideEffect(ctx Context, id string, f fu
}

// GetVersion forwards to t.Next
func (t *WorkflowInterceptorBase) GetVersion(ctx Context, changeID string, minSupported, maxSupported Version) Version {
return t.Next.GetVersion(ctx, changeID, minSupported, maxSupported)
func (t *WorkflowInterceptorBase) GetVersion(ctx Context, changeID string, minSupported, maxSupported Version, opts ...GetVersionOption) Version {
return t.Next.GetVersion(ctx, changeID, minSupported, maxSupported, opts...)
}

// SetQueryHandler forwards to t.Next
Expand Down
44 changes: 37 additions & 7 deletions internal/internal_event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -602,28 +602,58 @@ func validateVersion(changeID string, version, minSupported, maxSupported Versio
}
}

func (wc *workflowEnvironmentImpl) GetVersion(changeID string, minSupported, maxSupported Version) Version {
func (wc *workflowEnvironmentImpl) GetVersion(changeID string, minSupported, maxSupported Version, opts ...GetVersionOption) Version {
// Check if the changeID already has a version assigned
// If it does, validate the version against the min and max supported versions
// ensuring it is within the acceptable range
if version, ok := wc.changeVersions[changeID]; ok {
validateVersion(changeID, version, minSupported, maxSupported)
return version
}

// Apply the functional options to get the configuration
config := &getVersionConfig{}
for _, opt := range opts {
opt.apply(config)
}

var version Version
if wc.isReplay {
// GetVersion for changeID is called first time in replay mode, use DefaultVersion
switch {

// GetVersion for changeID is called first time in replay mode, use DefaultVersion
case wc.isReplay:
version = DefaultVersion
} else {
// GetVersion for changeID is called first time (non-replay mode), generate a marker decision for it.
// Also upsert search attributes to enable ability to search by changeVersion.

// If ExecuteWithVersion option is used, use the custom version provided
case config.CustomVersion != nil:
version = *config.CustomVersion

// If ExecuteWithMinVersion option is set, use the minimum supported version
case config.UseMinVersion:
version = minSupported

// Otherwise, use the maximum supported version
default:
version = maxSupported
}

// Validate the version against the min and max supported versions
// ensuring it is within the acceptable range
validateVersion(changeID, version, minSupported, maxSupported)

// If the version is not the DefaultVersion, and it's not a replay, record it and update search attributes
// Keeping the DefaultVersion as a special case where no version marker is recorded
if !wc.isReplay && version != DefaultVersion {
// Record the version marker and update search attributes
wc.decisionsHelper.recordVersionMarker(changeID, version, wc.GetDataConverter())
err := wc.UpsertSearchAttributes(createSearchAttributesForChangeVersion(changeID, version, wc.changeVersions))
if err != nil {
wc.logger.Warn("UpsertSearchAttributes failed", zap.Error(err))
}
}

validateVersion(changeID, version, minSupported, maxSupported)
// Store the version to ensure that the version is stable
// during the workflow execution
wc.changeVersions[changeID] = version
return version
}
Expand Down
79 changes: 72 additions & 7 deletions internal/internal_event_handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -754,6 +754,22 @@ func TestGetVersion(t *testing.T) {
res := weh.GetVersion("test", 1, 3)
assert.Equal(t, Version(2), res)
})
t.Run("version exists, ExecuteWithVersion is used", func(t *testing.T) {
weh := testWorkflowExecutionEventHandler(t, newRegistry())
weh.changeVersions = map[string]Version{
"test": 2,
}
res := weh.GetVersion("test", 1, 3, ExecuteWithVersion(3))
assert.Equal(t, Version(2), res)
})
t.Run("version exists, ExecuteWithMinVersion is used", func(t *testing.T) {
weh := testWorkflowExecutionEventHandler(t, newRegistry())
weh.changeVersions = map[string]Version{
"test": 2,
}
res := weh.GetVersion("test", 1, 3, ExecuteWithMinVersion())
assert.Equal(t, Version(2), res)
})
t.Run("version doesn't exist in replay", func(t *testing.T) {
weh := testWorkflowExecutionEventHandler(t, newRegistry())
weh.isReplay = true
Expand All @@ -770,6 +786,55 @@ func TestGetVersion(t *testing.T) {
assert.Equal(t, Version(3), weh.changeVersions["test"])
assert.Equal(t, []byte(`["test-3"]`), weh.workflowInfo.SearchAttributes.IndexedFields[CadenceChangeVersion], "ensure search attributes are updated")
})
t.Run("version doesn't exist, ExecuteWithVersion is used", func(t *testing.T) {
weh := testWorkflowExecutionEventHandler(t, newRegistry())
res := weh.GetVersion("test", DefaultVersion, 3, ExecuteWithVersion(2))
assert.Equal(t, Version(2), res)
require.Contains(t, weh.changeVersions, "test")
assert.Equal(t, Version(2), weh.changeVersions["test"])
assert.Equal(t, []byte(`["test-2"]`), weh.workflowInfo.SearchAttributes.IndexedFields[CadenceChangeVersion], "ensure search attributes are updated")
})
t.Run("version doesn't exist, ExecuteWithVersion is used, DefaultVersion is used", func(t *testing.T) {
weh := testWorkflowExecutionEventHandler(t, newRegistry())
res := weh.GetVersion("test", DefaultVersion, 3, ExecuteWithVersion(DefaultVersion))
assert.Equal(t, DefaultVersion, res)
require.Contains(t, weh.changeVersions, "test")
assert.Equal(t, DefaultVersion, weh.changeVersions["test"])
require.Nil(t, weh.workflowInfo.SearchAttributes, "ensure search attributes are not updated")
})
t.Run("version doesn't exist, ExecuteWithMinVersion is used, min is non DefaultVersion", func(t *testing.T) {
weh := testWorkflowExecutionEventHandler(t, newRegistry())
res := weh.GetVersion("test", 1, 3, ExecuteWithMinVersion())
assert.Equal(t, Version(1), res)
require.Contains(t, weh.changeVersions, "test")
assert.Equal(t, Version(1), weh.changeVersions["test"])
assert.Equal(t, []byte(`["test-1"]`), weh.workflowInfo.SearchAttributes.IndexedFields[CadenceChangeVersion], "ensure search attributes are updated")
})
t.Run("version doesn't exist, ExecuteWithMinVersion is used, DefaultVersion is used", func(t *testing.T) {
weh := testWorkflowExecutionEventHandler(t, newRegistry())
res := weh.GetVersion("test", DefaultVersion, 3, ExecuteWithMinVersion())
assert.Equal(t, DefaultVersion, res)
require.Contains(t, weh.changeVersions, "test")
assert.Equal(t, DefaultVersion, weh.changeVersions["test"])
require.Nil(t, weh.workflowInfo.SearchAttributes, "ensure search attributes are not updated")
})

t.Run("version doesn't exist, ExecuteWithVersion is used, version > maximum version", func(t *testing.T) {
weh := testWorkflowExecutionEventHandler(t, newRegistry())
assert.PanicsWithValue(t, `Workflow code is too old to support version 10 for "test" changeID. The maximum supported version is 3`, func() {
weh.GetVersion("test", DefaultVersion, 3, ExecuteWithVersion(10))
})

require.Nil(t, weh.workflowInfo.SearchAttributes, "ensure search attributes are not updated")
})
t.Run("version doesn't exist, ExecuteWithVersion is used, version < minimum version", func(t *testing.T) {
weh := testWorkflowExecutionEventHandler(t, newRegistry())
assert.PanicsWithValue(t, `Workflow code removed support of version 0. for "test" changeID. The oldest supported version is 1`, func() {
weh.GetVersion("test", 1, 3, ExecuteWithVersion(0))
})

require.Nil(t, weh.workflowInfo.SearchAttributes, "ensure search attributes are not updated")
})
}

func TestMutableSideEffect(t *testing.T) {
Expand Down Expand Up @@ -982,6 +1047,13 @@ func TestWorkflowExecutionEnvironment_NewTimer_immediate_calls(t *testing.T) {
}

func testWorkflowExecutionEventHandler(t *testing.T, registry *registry) *workflowExecutionEventHandlerImpl {
var testWorkflowInfo = &WorkflowInfo{
WorkflowType: WorkflowType{
Name: "test",
Path: "",
},
}

return newWorkflowExecutionEventHandler(
testWorkflowInfo,
func(result []byte, err error) {},
Expand All @@ -996,13 +1068,6 @@ func testWorkflowExecutionEventHandler(t *testing.T, registry *registry) *workfl
).(*workflowExecutionEventHandlerImpl)
}

var testWorkflowInfo = &WorkflowInfo{
WorkflowType: WorkflowType{
Name: "test",
Path: "",
},
}

func getSerializedDetails[T, V any](t *testing.T, id T, data V) []byte {
converter := defaultDataConverter{}
res, err := converter.ToData(id, data)
Expand Down
2 changes: 1 addition & 1 deletion internal/internal_worker_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ type (
localActivityClient
workflowTimerClient
SideEffect(f func() ([]byte, error), callback resultHandler)
GetVersion(changeID string, minSupported, maxSupported Version) Version
GetVersion(changeID string, minSupported, maxSupported Version, opts ...GetVersionOption) Version
WorkflowInfo() *WorkflowInfo
Complete(result []byte, err error)
RegisterCancelHandler(handler func())
Expand Down
42 changes: 38 additions & 4 deletions internal/internal_workflow_testsuite.go
Original file line number Diff line number Diff line change
Expand Up @@ -1928,7 +1928,7 @@ func (env *testWorkflowEnvironmentImpl) SideEffect(f func() ([]byte, error), cal
callback(f())
}

func (env *testWorkflowEnvironmentImpl) GetVersion(changeID string, minSupported, maxSupported Version) (retVersion Version) {
func (env *testWorkflowEnvironmentImpl) GetVersion(changeID string, minSupported, maxSupported Version, opts ...GetVersionOption) (retVersion Version) {
if mockVersion, ok := env.getMockedVersion(changeID, changeID, minSupported, maxSupported); ok {
// GetVersion for changeID is mocked
env.UpsertSearchAttributes(createSearchAttributesForChangeVersion(changeID, mockVersion, env.changeVersions))
Expand All @@ -1947,9 +1947,43 @@ func (env *testWorkflowEnvironmentImpl) GetVersion(changeID string, minSupported
validateVersion(changeID, version, minSupported, maxSupported)
return version
}
env.UpsertSearchAttributes(createSearchAttributesForChangeVersion(changeID, maxSupported, env.changeVersions))
env.changeVersions[changeID] = maxSupported
return maxSupported

// Apply the functional options to get the configuration
config := &getVersionConfig{}
for _, opt := range opts {
opt.apply(config)
}

// Determine the version to use based on the options provided
var version Version
switch {
// If ExecuteWithVersion option is used, use the custom version provided
case config.CustomVersion != nil:
version = *config.CustomVersion

// If ExecuteWithMinVersion option is set, use the minimum supported version
case config.UseMinVersion:
version = minSupported

// Otherwise, use the maximum supported version
default:
version = maxSupported
}

// Validate the version against the min and max supported versions
// ensuring it is within the acceptable range
validateVersion(changeID, version, minSupported, maxSupported)

// If the version is not the DefaultVersion, update search attributes
// Keeping the DefaultVersion as a special case where no search attributes are updated
if version != DefaultVersion {
env.UpsertSearchAttributes(createSearchAttributesForChangeVersion(changeID, version, env.changeVersions))
}

// Store the version to ensure that the version is stable
// during the workflow execution
env.changeVersions[changeID] = version
return version
}

func (env *testWorkflowEnvironmentImpl) getMockedVersion(mockedChangeID, changeID string, minSupported, maxSupported Version) (Version, bool) {
Expand Down
Loading