Skip to content

Commit 2629459

Browse files
committed
"Backported" un-bugfix in case people hit non-determinism errors
Introduces and uses a "bug backport" config, to allow selecting old behavior that could not be fixed in a completely transparent way. See code comments for detailed use for this new flag. In general we should treat this config as *somewhat desirable* to keep when the upkeep cost is low or zero, but it does represent known tech debt that we need to clean up at some point. When adding or removing fields on it, make sure to cover it in the version notes!
1 parent d40c5cc commit 2629459

File tree

7 files changed

+354
-16
lines changed

7 files changed

+354
-16
lines changed

internal/internal_workflow.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,7 @@ type (
183183
memo map[string]interface{}
184184
searchAttributes map[string]interface{}
185185
parentClosePolicy ParentClosePolicy
186+
bugports Bugports
186187
}
187188

188189
executeWorkflowParams struct {
@@ -596,7 +597,7 @@ func (c *channelImpl) Receive(ctx Context, valuePtr interface{}) (more bool) {
596597
hasResult = false
597598
v, ok, m := c.receiveAsyncImpl(callback)
598599

599-
if !ok && !m { //channel closed and empty
600+
if !ok && !m { // channel closed and empty
600601
return m
601602
}
602603

@@ -606,7 +607,7 @@ func (c *channelImpl) Receive(ctx Context, valuePtr interface{}) (more bool) {
606607
state.unblocked()
607608
return m
608609
}
609-
continue //corrupt signal. Drop and reset process
610+
continue // corrupt signal. Drop and reset process
610611
}
611612
for {
612613
if hasResult {
@@ -615,7 +616,7 @@ func (c *channelImpl) Receive(ctx Context, valuePtr interface{}) (more bool) {
615616
state.unblocked()
616617
return more
617618
}
618-
break //Corrupt signal. Drop and reset process.
619+
break // Corrupt signal. Drop and reset process.
619620
}
620621
state.yield(fmt.Sprintf("blocked on %s.Receive", c.name))
621622
}
@@ -631,7 +632,7 @@ func (c *channelImpl) ReceiveAsync(valuePtr interface{}) (ok bool) {
631632
func (c *channelImpl) ReceiveAsyncWithMoreFlag(valuePtr interface{}) (ok bool, more bool) {
632633
for {
633634
v, ok, more := c.receiveAsyncImpl(nil)
634-
if !ok && !more { //channel closed and empty
635+
if !ok && !more { // channel closed and empty
635636
return ok, more
636637
}
637638

@@ -774,7 +775,7 @@ func (c *channelImpl) Close() {
774775
// Takes a value and assigns that 'to' value. logs a metric if it is unable to deserialize
775776
func (c *channelImpl) assignValue(from interface{}, to interface{}) error {
776777
err := decodeAndAssignValue(c.dataConverter, from, to)
777-
//add to metrics
778+
// add to metrics
778779
if err != nil {
779780
c.env.GetLogger().Error(fmt.Sprintf("Corrupt signal received on channel %s. Error deserializing", c.name), zap.Error(err))
780781
c.env.GetMetricsScope().Counter(metrics.CorruptedSignalsCounter).Inc(1)

internal/internal_workflow_testsuite_test.go

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2793,7 +2793,7 @@ func (s *WorkflowTestSuiteUnitTest) Test_ChildWorkflowAlreadyRunning() {
27932793
ctx1 := WithChildWorkflowOptions(ctx, ChildWorkflowOptions{
27942794
WorkflowID: "Test_ChildWorkflowAlreadyRunning",
27952795
ExecutionStartToCloseTimeout: time.Minute,
2796-
//WorkflowIDReusePolicy: WorkflowIDReusePolicyAllowDuplicate,
2796+
// WorkflowIDReusePolicy: WorkflowIDReusePolicyAllowDuplicate,
27972797
})
27982798

27992799
var result1, result2 string
@@ -3109,7 +3109,7 @@ func (s *WorkflowTestSuiteUnitTest) Test_Regression_ExecuteChildWorkflowWithCanc
31093109
// - <0 == do not cancel
31103110
// - 0 == cancel synchronously
31113111
// - >0 == cancel after waiting that long
3112-
check := func(cancelTime time.Duration, expected string) {
3112+
check := func(cancelTime time.Duration, bugport bool, expected string) {
31133113
env := s.NewTestWorkflowEnvironment()
31143114
env.Test(s.T())
31153115
env.RegisterWorkflowWithOptions(func(ctx Context) error {
@@ -3129,6 +3129,9 @@ func (s *WorkflowTestSuiteUnitTest) Test_Regression_ExecuteChildWorkflowWithCanc
31293129
ctx = WithChildWorkflowOptions(ctx, ChildWorkflowOptions{
31303130
ExecutionStartToCloseTimeout: 2 * time.Minute,
31313131
TaskStartToCloseTimeout: 2 * time.Minute,
3132+
Bugports: Bugports{
3133+
StartChildWorkflowsOnCanceledContext: bugport,
3134+
},
31323135
})
31333136
err := ExecuteChildWorkflow(ctx, "child").Get(ctx, nil)
31343137

@@ -3150,14 +3153,20 @@ func (s *WorkflowTestSuiteUnitTest) Test_Regression_ExecuteChildWorkflowWithCanc
31503153
}
31513154
s.Run("sanity check", func() {
31523155
// workflow should run the child successfully normally...
3153-
check(-1, "no err")
3156+
check(-1, false, "no err")
31543157
})
31553158
s.Run("canceled after child starts", func() {
31563159
// ... and cancel the child when the child is canceled...
3157-
check(30*time.Second, "canceled")
3160+
check(30*time.Second, false, "canceled")
31583161
})
31593162
s.Run("canceled before child starts", func() {
31603163
// ... and should not start the child (i.e. be canceled) when canceled before it is started.
3161-
check(0, "canceled")
3164+
check(0, false, "canceled")
3165+
})
3166+
s.Run("canceled before child starts with bugport enabled", func() {
3167+
// prior to v0.18.4, canceling before the child was started would still start the child,
3168+
// and it would continue running.
3169+
// the bugport provides this old behavior to ease migration, at least until we feel the need to remove it.
3170+
check(0, true, "no err")
31623171
})
31633172
}

internal/workflow.go

Lines changed: 78 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -472,6 +472,65 @@ type (
472472
// ParentClosePolicy - Optional policy to decide what to do for the child.
473473
// Default is Terminate (if onboarded to this feature)
474474
ParentClosePolicy ParentClosePolicy
475+
476+
// Bugports allows opt-in enabling of older, possibly buggy behavior, primarily intended to allow temporarily
477+
// emulating old behavior until a fix is deployed.
478+
//
479+
// Bugports are always deprecated and may be removed in future versions.
480+
// Generally speaking they will *likely* remain in place for one minor version, and then they may be removed to
481+
// allow cleaning up the additional code complexity that they cause.
482+
//
483+
// deprecated
484+
Bugports Bugports
485+
}
486+
487+
// Bugports allows opt-in enabling of older, possibly buggy behavior, primarily intended to allow temporarily
488+
// emulating old behavior until a fix is deployed.
489+
// By default, bugs (especially rarely-occurring ones) are fixed and all users are opted into the new behavior.
490+
// Back-ported buggy behavior *may* be available via these flags.
491+
//
492+
// Fields in here are NOT guaranteed to be stable. They will almost certainly be removed in the next major
493+
// release, and might be removed earlier if a need arises, e.g. if the historical behavior causes too much of an
494+
// increase in code complexity.
495+
//
496+
// See each individual field for details.
497+
//
498+
// Bugports are always deprecated and may be removed in future versions.
499+
// Generally speaking they will *likely* remain in place for one minor version, and then they may be removed to
500+
// allow cleaning up the additional code complexity that they cause.
501+
//
502+
// deprecated
503+
Bugports struct {
504+
// StartChildWorkflowsOnCanceledContext allows emulating older, buggy behavior that existed prior to v0.18.4.
505+
//
506+
// Prior to the fix, child workflows would be started and keep running when their context was canceled in two
507+
// situations:
508+
// 1) when the context was canceled before ExecuteChildWorkflow is called, and
509+
// 2) when the context was canceled after ExecuteChildWorkflow but before the child workflow was started.
510+
//
511+
// 1 is unfortunately easy to trigger, though many workflows will encounter an error earlier and not reach the
512+
// child-workflow-executing code. 2 is expected to be very rare in practice.
513+
//
514+
// To permanently emulate old behavior, use a disconnected context when starting child workflows, and
515+
// cancel it only after `childfuture.GetWorkflowExecution().Get(...)` returns. This can be used when this flag
516+
// is removed in the future.
517+
//
518+
// If you have currently-broken workflows and need to repair them, there are two primary options:
519+
//
520+
// 1: Check the BinaryChecksum value of your new deploy and/or of the decision that is currently failing
521+
// workflows. Then set this flag when replaying history on those not-fixed checksums. Concretely, this means
522+
// checking both `workflow.GetInfo(ctx).BinaryChecksum` (note that sufficiently old clients may not have
523+
// recorded a value, and it may be nil) and `workflow.IsReplaying(ctx)`.
524+
//
525+
// 2: Reset broken workflows back to either before the buggy behavior was recorded, or before the fixed behavior
526+
// was deployed. A "bad binary" reset type can do the latter in bulk, see the CLI's
527+
// `cadence workflow reset-batch --reset_type BadBinary --help` for details. For the former, check the failing
528+
// histories, identify the point at which the bug occurred, and reset to prior to that decision task.
529+
//
530+
// Added in 0.18.4, this may be removed in or after v0.19.0, so please migrate off of it ASAP.
531+
//
532+
// deprecated
533+
StartChildWorkflowsOnCanceledContext bool
475534
}
476535
)
477536

@@ -896,15 +955,23 @@ func (wc *workflowEnvironmentInterceptor) ExecuteChildWorkflow(ctx Context, chil
896955
decodeFutureImpl: mainFuture.(*decodeFutureImpl),
897956
executionFuture: executionFuture.(*futureImpl),
898957
}
958+
// clients prior to v0.18.4 would incorrectly start child workflows that were started with cancelled contexts,
959+
// and did not react to cancellation between requested and started.
960+
correctChildCancellation := true
961+
workflowOptionsFromCtx := getWorkflowEnvOptions(ctx)
899962

900963
// Starting with a canceled context should immediately fail, no need to even try.
901964
if ctx.Err() != nil {
902-
mainSettable.SetError(ctx.Err())
903-
executionSettable.SetError(ctx.Err())
904-
return result
965+
if workflowOptionsFromCtx.bugports.StartChildWorkflowsOnCanceledContext {
966+
// backport the bug
967+
correctChildCancellation = false
968+
} else {
969+
mainSettable.SetError(ctx.Err())
970+
executionSettable.SetError(ctx.Err())
971+
return result
972+
}
905973
}
906974

907-
workflowOptionsFromCtx := getWorkflowEnvOptions(ctx)
908975
dc := workflowOptionsFromCtx.dataConverter
909976
env := getWorkflowEnvironment(ctx)
910977
wfType, input, err := getValidatedWorkflowFunction(childWorkflowType, args, dc, env.GetRegistry())
@@ -951,7 +1018,11 @@ func (wc *workflowEnvironmentInterceptor) ExecuteChildWorkflow(ctx Context, chil
9511018

9521019
// forward the delayed cancellation if necessary
9531020
if shouldCancelAsync && e == nil && !mainFuture.IsReady() {
954-
getWorkflowEnvironment(ctx).RequestCancelChildWorkflow(*options.domain, childWorkflowExecution.ID)
1021+
if workflowOptionsFromCtx.bugports.StartChildWorkflowsOnCanceledContext {
1022+
// do nothing: buggy behavior did not forward the cancellation
1023+
} else {
1024+
getWorkflowEnvironment(ctx).RequestCancelChildWorkflow(*options.domain, childWorkflowExecution.ID)
1025+
}
9551026
}
9561027
})
9571028

@@ -967,7 +1038,7 @@ func (wc *workflowEnvironmentInterceptor) ExecuteChildWorkflow(ctx Context, chil
9671038
if childWorkflowExecution != nil && !mainFuture.IsReady() {
9681039
// child workflow started, and ctx cancelled. forward cancel to the child.
9691040
getWorkflowEnvironment(ctx).RequestCancelChildWorkflow(*options.domain, childWorkflowExecution.ID)
970-
} else if childWorkflowExecution == nil {
1041+
} else if childWorkflowExecution == nil && correctChildCancellation {
9711042
// decision to start the child has been made, but it has not yet started.
9721043

9731044
// TODO: ideal, but not strictly necessary for correctness:
@@ -1294,6 +1365,7 @@ func WithChildWorkflowOptions(ctx Context, cwo ChildWorkflowOptions) Context {
12941365
wfOptions.memo = cwo.Memo
12951366
wfOptions.searchAttributes = cwo.SearchAttributes
12961367
wfOptions.parentClosePolicy = cwo.ParentClosePolicy
1368+
wfOptions.bugports = cwo.Bugports
12971369

12981370
return ctx1
12991371
}

0 commit comments

Comments
 (0)