Commit dcaec77

Maintain a stable order of children context, resolves a non-determinism around cancels (#1183)
After some tough-to-identify determinism issues in what appeared to be correct user workflows, and some investigations by both us and them:

This PR resolves a non-deterministic behavior involving child context cancellation propagation, in particular when unblocking selects based on those contexts (possibly transitively, e.g. via activity futures).

As this was previously non-deterministic behavior, both the previous and new code _could_ cause determinism failures after upgrading... but the random execution order previously stood a good chance of failing a few times and then automatically resolving itself. Unfortunately that is not maintained here - failures are likely to be permanent.

Resolving this is... probably not feasible currently. We do not record client-library versions in workflow history, so we cannot maintain backwards compatibility accurately in scenarios like this. We almost certainly _should_ record this on decisions, at least when it changes - we could randomly cancel entries in the list when replaying old decisions, and allow the random behavior to eventually choose a stable execution on a host somewhere.

In any case, for all future workflows this makes behavior deterministic, and should resolve the issue for good.

---

A full repro can be seen with:

1. Create multiple cancellable child contexts off a single cancellable parent context, populating its child-context map.
2. Base some behavior off each child context. Any one-shot logic works, but activities are pretty easy and occur a lot in practice (i.e. waiting on N activities, and being able to cancel many at once).
3. Block on the selector.
4. Cancel the parent context.

This will:

1. Cancel the parent context
2. Propagate that to a _random_ child context
3. Which will synchronously resolve the future(s) attached to the child context
4. Which will synchronously trigger any pending callbacks
5. One of which is a "first call wins" closure which the selector uses to choose which branch to execute

Maintaining the children contexts in _an_ order resolves this, as it ensures the same child is canceled first (then second, etc) each time.

Any order should work. For clearer semantics, I chose to implement it as a compacting FIFO list (as children can remove themselves if they are cancelled independently). This is not noticeably costly (maintenance in a large list will be dwarfed by any side effects of canceling) and it makes it very easy to define and hopefully maintain, as it _must not_ be changed.

---

This order decision _will not_ be a defined semantic of workflows, however. Cancellation of multiple futures / selector branches _should_ be treated as unordered, and implementing exactly the same behavior in other languages may not be efficient.

In a future implementation it may be worth making selectors choose from _any_ available branch pseudo-randomly, e.g. by run-ID, for the same reason Go explicitly randomizes these behaviors: it prevents accidentally depending on implementation details, by exposing logical flaws sooner.
1 parent aa89bb7 commit dcaec77

2 files changed: +176 -28 lines changed


internal/context.go

Lines changed: 44 additions & 28 deletions
@@ -230,10 +230,7 @@ func propagateCancel(parent Context, child canceler) {
 			child.cancel(false, p.err)
 		} else {
 			p.childrenLock.Lock()
-			if p.children == nil {
-				p.children = make(map[canceler]bool)
-			}
-			p.children[child] = true
+			p.children = append(p.children, child)
 			p.childrenLock.Unlock()
 			p.cancelLock.Unlock()
 		}
@@ -258,7 +255,7 @@ func parentCancelCtx(parent Context) (*cancelCtx, bool) {
 		case *cancelCtx:
 			return c, true
 		// TODO: Uncomment once timer story is implemented
-		//case *timerCtx:
+		// case *timerCtx:
 		// return c.cancelCtx, true
 		case *valueCtx:
 			parent = c.Context
@@ -278,10 +275,31 @@ func removeChild(parent Context, child canceler) {
 	p.childrenLock.Lock()
 	defer p.childrenLock.Unlock()
 	if p.children != nil {
-		delete(p.children, child)
+		removeChildFromSlice(p.children, child)
 	}
 }

+// Helper to remove a child from a context's canceler list.
+// There should only ever be one instance per list due to code elsewhere,
+// but this func does not check or enforce that.
+func removeChildFromSlice(children []canceler, child canceler) []canceler {
+	// This maintains the original order, mostly because it makes behavior easier to reason about
+	// in case that becomes necessary (e.g. bug hunting).
+	// Out-of-order (move last item into the gap) is equally correct and slightly more efficient,
+	// but this likely cannot be changed without changing the order of code execution.
+	found := -1
+	for idx, c := range children {
+		if c == child {
+			found = idx
+			break
+		}
+	}
+	if found >= 0 {
+		children = append(children[:found], children[found+1:]...)
+	}
+	return children
+}
+
 // A canceler is a context type that can be canceled directly. The
 // implementations are *cancelCtx and *timerCtx.
 type canceler interface {
@@ -300,8 +318,8 @@ type cancelCtx struct {
 	canceled bool

 	childrenLock sync.Mutex
-	children     map[canceler]bool // set to nil by the first cancel call
-	err          error             // set to non-nil by the first cancel call
+	children     []canceler
+	err          error // set to non-nil by the first cancel call
 }

 func (c *cancelCtx) Done() Channel {
func (c *cancelCtx) Done() Channel {
@@ -320,11 +338,9 @@ func (c *cancelCtx) getChildren() []canceler {
 	c.childrenLock.Lock()
 	defer c.childrenLock.Unlock()

-	out := []canceler{}
-	for key := range c.children {
-		out = append(out, key)
-	}
-	return out
+	dup := make([]canceler, len(c.children))
+	copy(dup, c.children)
+	return dup
 }

 // cancel closes c.done, cancels each of c's children, and, if
@@ -374,7 +390,7 @@ func (c *cancelCtx) cancel(removeFromParent bool, err error) {
 //
 // Canceling this context releases resources associated with it, so code should
 // call cancel as soon as the operations running in this Context complete.
-//func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc) {
+// func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc) {
 // if cur, ok := parent.Deadline(); ok && cur.Before(deadline) {
 // // The current deadline is already sooner than the new one.
 // return WithCancel(parent)
@@ -395,27 +411,27 @@ func (c *cancelCtx) cancel(removeFromParent bool, err error) {
 // })
 // }
 // return c, func() { c.cancel(true, Canceled) }
-//}
+// }
 //
-//// A timerCtx carries a timer and a deadline. It embeds a cancelCtx to
-//// implement Done and Err. It implements cancel by stopping its timer then
-//// delegating to cancelCtx.cancel.
-//type timerCtx struct {
+// // A timerCtx carries a timer and a deadline. It embeds a cancelCtx to
+// // implement Done and Err. It implements cancel by stopping its timer then
+// // delegating to cancelCtx.cancel.
+// type timerCtx struct {
 // *cancelCtx
 // timer *time.Timer // Under cancelCtx.mu.
 //
 // deadline time.Time
-//}
+// }
 //
-//func (c *timerCtx) Deadline() (deadline time.Time, ok bool) {
+// func (c *timerCtx) Deadline() (deadline time.Time, ok bool) {
 // return c.deadline, true
-//}
+// }
 //
-//func (c *timerCtx) String() string {
+// func (c *timerCtx) String() string {
 // return fmt.Sprintf("%v.WithDeadline(%s [%s])", c.cancelCtx.Context, c.deadline, c.deadline.Sub(time.Now()))
-//}
+// }
 //
-//func (c *timerCtx) cancel(removeFromParent bool, err error) {
+// func (c *timerCtx) cancel(removeFromParent bool, err error) {
 // c.cancelCtx.cancel(false, err)
 // if removeFromParent {
 // // Remove this timerCtx from its parent cancelCtx's children.
@@ -425,7 +441,7 @@ func (c *cancelCtx) cancel(removeFromParent bool, err error) {
 // c.timer.Stop()
 // c.timer = nil
 // }
-//}
+// }
 //
 // WithTimeout returns WithDeadline(parent, time.Now().Add(timeout)).
 //
@@ -437,9 +453,9 @@ func (c *cancelCtx) cancel(removeFromParent bool, err error) {
 // defer cancel() // releases resources if slowOperation completes before timeout elapses
 // return slowOperation(ctx)
 // }
-//func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {
+// func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {
 // return WithDeadline(parent, time.Now().Add(timeout))
-//}
+// }

 // WithValue returns a copy of parent in which the value associated with key is
 // val.

internal/context_test.go

Lines changed: 132 additions & 0 deletions
@@ -22,10 +22,13 @@
 package internal

 import (
+	"context"
 	"testing"
 	"time"

 	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/mock"
+	"github.com/stretchr/testify/require"
 )

 func TestContextChildParentCancelRace(t *testing.T) {
func TestContextChildParentCancelRace(t *testing.T) {
@@ -128,3 +131,132 @@ func TestContextAddChildCancelParentRace(t *testing.T) {
 	env.ExecuteWorkflow(wf)
 	assert.NoError(t, env.GetWorkflowError())
 }
+
+func TestContextCancellationOrderDeterminism(t *testing.T) {
+	/*
+		Previously, child-contexts were stored in a map, preventing deterministic order when propagating cancellation.
+		The order of branches being selected in this test was random, both for the first event and in following ones.
+
+		In principle this should be fine, but it's possible for the effects of cancellation to trigger a selector's
+		future-done callback, which currently records the *real-time*-first event as the branch to unblock, rather than
+		doing something more safe by design (e.g. choosing based on state when the selector's goroutine is unblocked).
+
+		Unfortunately, we cannot change the selector's behavior without introducing non-backwards-compatible changes to
+		currently-working workflows.
+
+		So the workaround for now is to maintain child-context order, so they are canceled in a consistent order.
+		As this order was not controlled before, and Go does a pretty good job at randomizing map iteration order,
+		converting non-determinism to determinism should be strictly no worse for backwards compatibility, and it
+		fixes the issue for future executions.
+	*/
+	check := func(t *testing.T, separateStart, separateSelect bool) {
+		env := newTestWorkflowEnv(t)
+		act := func(ctx context.Context) error {
+			return nil // will be mocked
+		}
+		wf := func(ctx Context) ([]int, error) {
+			ctx, cancel := WithCancel(ctx)
+			Go(ctx, func(ctx Context) {
+				_ = Sleep(ctx, time.Minute)
+				cancel()
+			})
+
+			// start some activities, which will not complete before the timeout cancels them
+			ctx = WithActivityOptions(ctx, ActivityOptions{
+				TaskList:               "",
+				ScheduleToCloseTimeout: time.Hour,
+				ScheduleToStartTimeout: time.Hour,
+				StartToCloseTimeout:    time.Hour,
+			})
+			s := NewSelector(ctx)
+			var result []int
+			for i := 0; i < 10; i++ {
+				i := i
+				// need a child context, a future alone is not enough as it does not become a child
+				cctx, ccancel := WithCancel(ctx)
+
+				s.AddFuture(ExecuteActivity(cctx, act), func(f Future) {
+					ccancel() // TODO: is this necessary to prevent leaks? if it is, how can we make it not?
+					err := f.Get(ctx, nil)
+					if err == nil || !IsCanceledError(err) {
+						// fail the test, this should not happen - activities must be canceled or it's not valid.
+						t.Errorf("activity completion or failure for some reason other than cancel: %v", err)
+					}
+					result = append(result, i)
+				})
+
+				if separateStart {
+					// yield so they are submitted one at a time, in case that matters
+					_ = Sleep(ctx, time.Second)
+				}
+			}
+			for i := 0; i < 10; i++ {
+				if separateSelect {
+					// yield so they are selected one at a time, in case that matters
+					_ = Sleep(ctx, time.Second)
+				}
+				s.Select(ctx)
+			}
+
+			return result, nil
+		}
+		env.RegisterWorkflow(wf)
+		env.RegisterActivity(act)
+
+		// activities must not complete in time
+		env.OnActivity(act, mock.Anything).After(5 * time.Minute).Return(nil)
+
+		env.ExecuteWorkflow(wf)
+		require.NoError(t, env.GetWorkflowError())
+		var result []int
+		require.NoError(t, env.GetWorkflowResult(&result))
+		require.NotEmpty(t, result)
+		assert.Equal(t, 0, result[0], "first activity to be created should be the first one canceled")
+		assert.Equal(t, []int{1, 2, 3, 4, 5, 6, 7, 8, 9}, result[1:], "other activities should finish in a consistent (but undefined) order")
+	}
+
+	type variant struct {
+		name           string
+		separateStart  bool
+		separateSelect bool
+	}
+	// all variants expose this behavior, but being a bit more exhaustive in the face
+	// of decision-scheduling differences seems good.
+	for _, test := range []variant{
+		{"many in one decision", false, false},
+		{"many started at once, selected slowly", false, true},
+		{"started slowly, selected quickly", true, false},
+		{"started and selected slowly", true, true},
+	} {
+		t.Run(test.name, func(t *testing.T) {
+			check(t, test.separateStart, test.separateSelect)
+		})
+	}
+}
+
+func BenchmarkSliceMaintenance(b *testing.B) {
+	// all essentially identical
+	b.Run("append", func(b *testing.B) {
+		data := []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
+		for i := 0; i < b.N; i++ {
+			data = append(data[:5], data[6:]...)
+			data = append(data, i) // keep the slice the same size for all iterations
+		}
+	})
+	b.Run("copy", func(b *testing.B) {
+		data := []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
+		for i := 0; i < b.N; i++ {
+			copy(data[5:], data[6:])
+			data = data[:9] // trim to actual size, as the last value is now duplicated. capacity is still 10.
+			data = append(data, i) // keep the slice the same size for all iterations
+		}
+	})
+	b.Run("copy explicit capacity", func(b *testing.B) {
+		data := []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
+		for i := 0; i < b.N; i++ {
+			copy(data[5:], data[6:])
+			data = data[:9:10] // trim to actual size, as the last value is now duplicated. explicitly reserve 10 cap.
+			data = append(data, i) // keep the slice the same size for all iterations
+		}
+	})
+}
