Skip to content

Commit ef8165c

Browse files
committed
add support of GetVersionOption, ExecuteWithVersion and ExecuteWithMinVersion
1 parent 291e3dd commit ef8165c

File tree

5 files changed

+132
-17
lines changed

5 files changed

+132
-17
lines changed

internal/interceptors.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ type WorkflowInterceptor interface {
6363
GetSignalChannel(ctx Context, signalName string) Channel
6464
SideEffect(ctx Context, f func(ctx Context) interface{}) Value
6565
MutableSideEffect(ctx Context, id string, f func(ctx Context) interface{}, equals func(a, b interface{}) bool) Value
66-
GetVersion(ctx Context, changeID string, minSupported, maxSupported Version) Version
66+
GetVersion(ctx Context, changeID string, minSupported, maxSupported Version, opts ...GetVersionOption) Version
6767
SetQueryHandler(ctx Context, queryType string, handler interface{}) error
6868
IsReplaying(ctx Context) bool
6969
HasLastCompletionResult(ctx Context) bool
@@ -158,8 +158,8 @@ func (t *WorkflowInterceptorBase) MutableSideEffect(ctx Context, id string, f fu
158158
}
159159

160160
// GetVersion forwards to t.Next
161-
func (t *WorkflowInterceptorBase) GetVersion(ctx Context, changeID string, minSupported, maxSupported Version) Version {
162-
return t.Next.GetVersion(ctx, changeID, minSupported, maxSupported)
161+
func (t *WorkflowInterceptorBase) GetVersion(ctx Context, changeID string, minSupported, maxSupported Version, opts ...GetVersionOption) Version {
162+
return t.Next.GetVersion(ctx, changeID, minSupported, maxSupported, opts...)
163163
}
164164

165165
// SetQueryHandler forwards to t.Next

internal/internal_event_handlers.go

Lines changed: 37 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -602,29 +602,59 @@ func validateVersion(changeID string, version, minSupported, maxSupported Versio
602602
}
603603
}
604604

605-
func (wc *workflowEnvironmentImpl) GetVersion(changeID string, minSupported, maxSupported Version) Version {
605+
func (wc *workflowEnvironmentImpl) GetVersion(changeID string, minSupported, maxSupported Version, opts ...GetVersionOption) Version {
606+
// Check if the changeID already has a version assigned
607+
// If it does, validate the version against the min and max supported versions
606608
if version, ok := wc.changeVersions[changeID]; ok {
607609
validateVersion(changeID, version, minSupported, maxSupported)
608610
return version
609611
}
610612

613+
// Apply options to determine which version to use
614+
options := &getVersionOptions{}
615+
for _, opt := range opts {
616+
opt(options)
617+
}
618+
611619
var version Version
612-
if wc.isReplay {
613-
// GetVersion for changeID is called first time in replay mode, use DefaultVersion
620+
switch {
621+
622+
// GetVersion for changeID is called first time in replay mode, use DefaultVersion
623+
case wc.isReplay:
614624
version = DefaultVersion
615-
} else {
616-
// GetVersion for changeID is called first time (non-replay mode), generate a marker decision for it.
617-
// Also upsert search attributes to enable ability to search by changeVersion.
625+
626+
// If ExecuteWithVersion option is used, use the custom version provided
627+
case options.customVersion != nil:
628+
version = *options.customVersion
629+
630+
// If ExecuteWithMinVersion option is set, use the minimum supported version
631+
case options.useMinVersion:
632+
version = minSupported
633+
634+
// Otherwise, use the maximum supported version
635+
default:
618636
version = maxSupported
637+
}
638+
639+
// Validate the version against the min and max supported versions
640+
// ensuring it is within the acceptable range
641+
validateVersion(changeID, version, minSupported, maxSupported)
642+
643+
// If the version is not the DefaultVersion, and it's not a replay, record it and update search attributes
644+
// Keeping the DefaultVersion as a special case where no version marker is recorded
645+
if !wc.isReplay && version != DefaultVersion {
646+
// Record the version marker and update search attributes
619647
wc.decisionsHelper.recordVersionMarker(changeID, version, wc.GetDataConverter())
620648
err := wc.UpsertSearchAttributes(createSearchAttributesForChangeVersion(changeID, version, wc.changeVersions))
621649
if err != nil {
622650
wc.logger.Warn("UpsertSearchAttributes failed", zap.Error(err))
623651
}
624652
}
625653

626-
validateVersion(changeID, version, minSupported, maxSupported)
654+
// Store the version in the changeVersions
655+
// ensuring that it can be retrieved later
627656
wc.changeVersions[changeID] = version
657+
628658
return version
629659
}
630660

internal/internal_worker_base.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ type (
7676
localActivityClient
7777
workflowTimerClient
7878
SideEffect(f func() ([]byte, error), callback resultHandler)
79-
GetVersion(changeID string, minSupported, maxSupported Version) Version
79+
GetVersion(changeID string, minSupported, maxSupported Version, opts ...GetVersionOption) Version
8080
WorkflowInfo() *WorkflowInfo
8181
Complete(result []byte, err error)
8282
RegisterCancelHandler(handler func())

internal/internal_workflow_testsuite.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1928,7 +1928,7 @@ func (env *testWorkflowEnvironmentImpl) SideEffect(f func() ([]byte, error), cal
19281928
callback(f())
19291929
}
19301930

1931-
func (env *testWorkflowEnvironmentImpl) GetVersion(changeID string, minSupported, maxSupported Version) (retVersion Version) {
1931+
func (env *testWorkflowEnvironmentImpl) GetVersion(changeID string, minSupported, maxSupported Version, opts ...GetVersionOption) (retVersion Version) {
19321932
if mockVersion, ok := env.getMockedVersion(changeID, changeID, minSupported, maxSupported); ok {
19331933
// GetVersion for changeID is mocked
19341934
env.UpsertSearchAttributes(createSearchAttributesForChangeVersion(changeID, mockVersion, env.changeVersions))

internal/workflow.go

Lines changed: 90 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1567,11 +1567,96 @@ const DefaultVersion Version = -1
15671567
// CadenceChangeVersion is used as search attributes key to find workflows with specific change version.
15681568
const CadenceChangeVersion = "CadenceChangeVersion"
15691569

1570+
// GetVersionOption represents a function that configures GetVersion behavior
1571+
type GetVersionOption func(*getVersionOptions)
1572+
1573+
type getVersionOptions struct {
1574+
// customVersion is used to force GetVersion to return a specific version
1575+
// instead of maxSupported version. Set up via ExecuteWithVersion option.
1576+
customVersion *Version
1577+
1578+
// useMinVersion is used to force GetVersion to return minSupported version
1579+
// instead of maxSupported version. Set up via ExecuteWithMinVersion option.
1580+
useMinVersion bool
1581+
}
1582+
1583+
// ExecuteWithVersion returns a GetVersionOption that forces a specific version to be returned
1584+
// when executed for the first time, instead of returning maxSupported version.
1585+
//
1586+
// This option can be used when you want to separate the versioning of the workflow code and
1587+
// activation of the new logic in the workflow code, to ensure that your changes can be safely rolled back
1588+
// if needed. For example, initially a workflow has the following code:
1589+
//
1590+
// err = workflow.ExecuteActivity(ctx, foo).Get(ctx, nil)
1591+
//
1592+
// It should be updated to:
1593+
//
1594+
// err = workflow.ExecuteActivity(ctx, bar).Get(ctx, nil)
1595+
//
1596+
// Step 1
1597+
// To roll out your changes safely, both versions of your workflow code should be compatible with each other.
1598+
// To achieve that, you can use GetVersion with ExecuteWithVersion option.
1599+
// When GetVersion is executed for the first time, it will return DefaultVersion instead of maxSupported version:
1600+
//
1601+
// v := GetVersion(ctx, "fooChange", DefaultVersion, 1, ExecuteWithVersion(DefaultVersion))
1602+
// if v == DefaultVersion {
1603+
// err = workflow.ExecuteActivity(ctx, foo).Get(ctx, nil)
1604+
// } else {
1605+
// err = workflow.ExecuteActivity(ctx, bar).Get(ctx, nil)
1606+
// }
1607+
//
1608+
// At this step, the previous version of the code supports only DefaultVersion, however new version of the code
1609+
// supports both DefaultVersion and 1. At the same time, the new version of the code is not yet activated,
1610+
// so the workflow started on the new version of the code will still execute foo activity - previous version of the code.
1611+
// This makes it possible to safely roll back your changes if needed, as the previous code supports DefaultVersion.
1612+
//
1613+
// Step 2
1614+
// When the previous version of the code is no longer running, there is no need to start new workflow executions
1615+
// with DefaultVersion anymore, and you can the maxSupported version to activate the new code. To achieve that you can
1616+
// remove the usage of ExecuteWithVersion option. When GetVersion is executed for the first time, it will return maxSupported version:
1617+
//
1618+
// v := GetVersion(ctx, "fooChange", DefaultVersion, 1)
1619+
// if v == DefaultVersion {
1620+
// err = workflow.ExecuteActivity(ctx, foo).Get(ctx, nil)
1621+
// } else {
1622+
// err = workflow.ExecuteActivity(ctx, bar).Get(ctx, nil)
1623+
// }
1624+
//
1625+
// At this step, the previous and old versions of the code support both versions DefaultVersion and 1,
1626+
// however the new version of the code is activated, so the workflow started on the new version of the code
1627+
// will execute bar activity - new version of the code. This makes it possible to safely roll back your changes if needed,
1628+
// because both versions of the code support both versions DefaultVersion and 1.
1629+
//
1630+
// Step 3
1631+
// When there are no running previous version of the code and there are no workflow executions
1632+
// running DefaultVersion the correspondent branch can be removed:
1633+
//
1634+
// GetVersion(ctx, "fooChange", 1, 1)
1635+
// err = workflow.ExecuteActivity(ctx, bar).Get(ctx, nil)
1636+
//
1637+
// ExecuteWithVersion option is useful when you want to ensure that your changes can be safely rolled back if needed, as
1638+
// both versions of the workflow code are compatible with each other.
1639+
func ExecuteWithVersion(version Version) GetVersionOption {
1640+
return func(o *getVersionOptions) {
1641+
o.customVersion = &version
1642+
}
1643+
}
1644+
1645+
// ExecuteWithMinVersion returns a GetVersionOption that makes GetVersion return minSupported version
1646+
// when executed for the first time, instead of returning maxSupported version.
1647+
// To see how this option can be used, see the ExecuteWithVersion option
1648+
func ExecuteWithMinVersion() GetVersionOption {
1649+
return func(o *getVersionOptions) {
1650+
o.useMinVersion = true
1651+
}
1652+
}
1653+
15701654
// GetVersion is used to safely perform backwards incompatible changes to workflow definitions.
15711655
// It is not allowed to update workflow code while there are workflows running as it is going to break
15721656
// determinism. The solution is to have both old code that is used to replay existing workflows
15731657
// as well as the new one that is used when it is executed for the first time.
1574-
// GetVersion returns maxSupported version when is executed for the first time. This version is recorded into the
1658+
// GetVersion returns maxSupported version (to return another version, check GetVersionOption),
1659+
// when is executed for the first time. This version is recorded into the
15751660
// workflow history as a marker event. Even if maxSupported version is changed the version that was recorded is
15761661
// returned on replay. DefaultVersion constant contains version of code that wasn't versioned before.
15771662
// For example initially workflow has the following code:
@@ -1632,13 +1717,13 @@ const CadenceChangeVersion = "CadenceChangeVersion"
16321717
// } else {
16331718
// err = workflow.ExecuteActivity(ctx, qux, data).Get(ctx, nil)
16341719
// }
1635-
func GetVersion(ctx Context, changeID string, minSupported, maxSupported Version) Version {
1720+
func GetVersion(ctx Context, changeID string, minSupported, maxSupported Version, opts ...GetVersionOption) Version {
16361721
i := getWorkflowInterceptor(ctx)
1637-
return i.GetVersion(ctx, changeID, minSupported, maxSupported)
1722+
return i.GetVersion(ctx, changeID, minSupported, maxSupported, opts...)
16381723
}
16391724

1640-
func (wc *workflowEnvironmentInterceptor) GetVersion(ctx Context, changeID string, minSupported, maxSupported Version) Version {
1641-
return wc.env.GetVersion(changeID, minSupported, maxSupported)
1725+
func (wc *workflowEnvironmentInterceptor) GetVersion(ctx Context, changeID string, minSupported, maxSupported Version, opts ...GetVersionOption) Version {
1726+
return wc.env.GetVersion(changeID, minSupported, maxSupported, opts...)
16421727
}
16431728

16441729
// SetQueryHandler sets the query handler to handle workflow query. The queryType specify which query type this handler

0 commit comments

Comments
 (0)