Skip to content

Commit f619345

Browse files
authored
Clear workflow state when not cached and not complete (#1111)
This resolves a bug a user encountered, where a full sticky cache + a query for a non-cached workflow would result in a goroutine leak due to the associated event handler never being shut down. Since this leak retains all in-workflow data, it can eventually lead to an out-of-memory crash, though it does not cause any logic errors (these abandoned goroutines are forever idle once abandoned). There are probably also other scenarios where this is possible, hopefully all caught by this addition. --- Separately: this function seems to be far too complex, and is almost certainly duplicating checks made elsewhere, which should not be duplicated like this. There have been multiple issues with state-clearing that have lead to adding conditions to this func, which is a clear sign of a code smell. State / cache decisions like this should be made in exactly one place ever, and built up as obviously as possible, to ensure gaps like this never occur. In this case we'll likely need to invert the dependency flow somehow, so callers control when cache is cleared based on whether or not it is cached, rather than double-checking internally like this. We should also probably add something like go.uber.org/goleak to our tests, to help ensure we do not have goroutine leaks. This may not have been caught by that, as the steps leading to it are a bit odd and rely on singleton config (sticky cache size), but it may find or prevent others. With this PR we now have at least one test using it, but it'll probably take some time to roll out all over, and to add missing test-state cleanup funcs.
1 parent c4a7e03 commit f619345

File tree

3 files changed

+126
-11
lines changed

3 files changed

+126
-11
lines changed

Makefile

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -134,11 +134,13 @@ bins: thriftc $(ALL_SRC) $(BUILD)/copyright lint $(BUILD)/dummy
134134
unit_test: $(BUILD)/dummy
135135
@mkdir -p $(COVER_ROOT)
136136
@echo "mode: atomic" > $(UT_COVER_FILE)
137-
@for dir in $(UT_DIRS); do \
137+
@failed=0; \
138+
for dir in $(UT_DIRS); do \
138139
mkdir -p $(COVER_ROOT)/"$$dir"; \
139-
go test "$$dir" $(TEST_ARG) -coverprofile=$(COVER_ROOT)/"$$dir"/cover.out || exit 1; \
140+
go test "$$dir" $(TEST_ARG) -coverprofile=$(COVER_ROOT)/"$$dir"/cover.out || failed=1; \
140141
cat $(COVER_ROOT)/"$$dir"/cover.out | grep -v "mode: atomic" >> $(UT_COVER_FILE); \
141-
done;
142+
done; \
143+
exit $$failed
142144

143145
integ_test_sticky_off: $(BUILD)/dummy
144146
@mkdir -p $(COVER_ROOT)

internal/internal_task_handlers.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -476,16 +476,24 @@ func (w *workflowExecutionContextImpl) Lock() {
476476
}
477477

478478
func (w *workflowExecutionContextImpl) Unlock(err error) {
479+
cleared := false
480+
cached := getWorkflowCache().Exist(w.workflowInfo.WorkflowExecution.RunID)
479481
if err != nil || w.err != nil || w.isWorkflowCompleted || (w.wth.disableStickyExecution && !w.hasPendingLocalActivityWork()) {
480482
// TODO: in case of closed, it assumes the close decision always succeed. need server side change to return
481-
// error to indicate the close failure case. This should be rear case. For now, always remove the cache, and
483+
// error to indicate the close failure case. This should be rare case. For now, always remove the cache, and
482484
// if the close decision failed, the next decision will have to rebuild the state.
483-
if getWorkflowCache().Exist(w.workflowInfo.WorkflowExecution.RunID) {
485+
if cached {
486+
// also clears state asynchronously via cache eviction
484487
removeWorkflowContext(w.workflowInfo.WorkflowExecution.RunID)
485488
} else {
486-
// sticky is disabled, manually clear the workflow state.
487489
w.clearState()
488490
}
491+
cleared = true
492+
}
493+
// there are a variety of reasons a workflow may not have been put into the cache.
494+
// all of them mean we need to clear the state at this point, or any running goroutines will be orphaned.
495+
if !cleared && !cached {
496+
w.clearState()
489497
}
490498

491499
w.mutex.Unlock()

internal/internal_task_handlers_test.go

Lines changed: 110 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,18 +28,19 @@ import (
2828
"fmt"
2929
"reflect"
3030
"strings"
31+
"sync"
3132
"testing"
3233
"time"
3334

35+
"github.com/golang/mock/gomock"
3436
"github.com/opentracing/opentracing-go"
3537
"github.com/pborman/uuid"
36-
"github.com/stretchr/testify/suite"
37-
38-
"github.com/golang/mock/gomock"
3938
"github.com/stretchr/testify/require"
39+
"github.com/stretchr/testify/suite"
4040
"go.uber.org/cadence/.gen/go/cadence/workflowservicetest"
4141
s "go.uber.org/cadence/.gen/go/shared"
4242
"go.uber.org/cadence/internal/common"
43+
"go.uber.org/goleak"
4344
"go.uber.org/zap"
4445
)
4546

@@ -416,7 +417,7 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_ActivityTaskScheduled() {
416417
t.NotNil(response.Decisions[0].CompleteWorkflowExecutionDecisionAttributes)
417418
}
418419

419-
func (t *TaskHandlersTestSuite) TestWorkflowTask_QueryWorkflow_Sticky() {
420+
func (t *TaskHandlersTestSuite) TestWorkflowTask_QueryWorkflow() {
420421
// Schedule an activity and see if we complete workflow.
421422
taskList := "sticky-tl"
422423
execution := &s.WorkflowExecution{
@@ -463,8 +464,12 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_QueryWorkflow_Sticky() {
463464
t.verifyQueryResult(queryResp, "waiting-activity-result")
464465
}
465466

466-
func (t *TaskHandlersTestSuite) TestWorkflowTask_QueryWorkflow_NonSticky() {
467+
func (t *TaskHandlersTestSuite) TestWorkflowTask_QueryWorkflow_2() {
467468
// Schedule an activity and see if we complete workflow.
469+
470+
// This test appears to be just a finer-grained version of TestWorkflowTask_QueryWorkflow, though the older names
471+
// for them implied entirely different purposes. Likely it can be combined with TestWorkflowTask_QueryWorkflow
472+
// without losing anything useful.
468473
taskList := "tl1"
469474
testEvents := []*s.HistoryEvent{
470475
createTestEventWorkflowExecutionStarted(1, &s.WorkflowExecutionStartedEventAttributes{TaskList: &s.TaskList{Name: &taskList}}),
@@ -478,6 +483,8 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_QueryWorkflow_NonSticky() {
478483
}),
479484
createTestEventActivityTaskStarted(6, &s.ActivityTaskStartedEventAttributes{}),
480485
createTestEventActivityTaskCompleted(7, &s.ActivityTaskCompletedEventAttributes{ScheduledEventId: common.Int64Ptr(5)}),
486+
// TODO: below seems irrational. there's a start without a schedule, and this workflow does not respond to signals.
487+
// aside from this, the list of tasks is the same as TestWorkflowTask_QueryWorkflow
481488
createTestEventDecisionTaskStarted(8),
482489
createTestEventWorkflowExecutionSignaled(9, "test-signal"),
483490
}
@@ -1449,6 +1456,104 @@ func (t *TaskHandlersTestSuite) TestActivityExecutionWorkerStop() {
14491456
t.NotNil(r)
14501457
}
14511458

1459+
// a regrettably-hacky func to use goleak to count leaking goroutines.
1460+
// ideally there will be a structured way to do this in the future, rather than string parsing
1461+
func countLeaks(leaks error) int {
1462+
if leaks == nil {
1463+
return 0
1464+
}
1465+
// leak messages look something like:
1466+
// Goroutine 23 in state chan receive, with go.uber.org/cadence/internal.(*coroutineState).initialYield on top of the stack:
1467+
// ... stacktrace ...
1468+
//
1469+
// Goroutine 28 ... on top of the stack:
1470+
// ... stacktrace ...
1471+
return strings.Count(leaks.Error(), "on top of the stack")
1472+
}
1473+
1474+
func (t *TaskHandlersTestSuite) TestRegression_QueriesDoNotLeakGoroutines() {
1475+
// this test must not be run in parallel with most other tests, as it mutates global vars
1476+
var ridsToCleanUp []string
1477+
originalLeaks := goleak.Find()
1478+
defer func(size int) {
1479+
// empty the cache to clear out any newly-introduced leaks
1480+
current := getWorkflowCache()
1481+
for _, rid := range ridsToCleanUp {
1482+
current.Delete(rid)
1483+
}
1484+
// check the cleanup
1485+
currentLeaks := goleak.Find()
1486+
if countLeaks(currentLeaks) != countLeaks(originalLeaks) {
1487+
t.T().Errorf("failed to clean up goroutines.\nOriginal state:\n%v\n\nCurrent state:\n%v", originalLeaks, currentLeaks)
1488+
}
1489+
1490+
// reset everything to make it "normal".
1491+
// this does NOT restore the original workflow cache - that cannot be done correctly, initCacheOnce is not safe to copy (thus restore).
1492+
stickyCacheSize = size
1493+
workflowCache = nil
1494+
initCacheOnce = sync.Once{}
1495+
}(stickyCacheSize)
1496+
workflowCache = nil
1497+
initCacheOnce = sync.Once{}
1498+
// cache is intentionally not *disabled*, as that would go down no-cache code paths.
1499+
// also, there is an LRU-cache bug where the size allows N to enter, but then removes until N-1 remain,
1500+
// so a size of 2 actually means a size of 1.
1501+
SetStickyWorkflowCacheSize(2)
1502+
1503+
taskList := "tl1"
1504+
params := workerExecutionParameters{
1505+
TaskList: taskList,
1506+
Identity: "test-id-1",
1507+
Logger: t.logger,
1508+
DisableStickyExecution: false,
1509+
}
1510+
taskHandler := newWorkflowTaskHandler(testDomain, params, nil, t.registry)
1511+
1512+
// process a throw-away workflow to fill the cache. this is copied from TestWorkflowTask_QueryWorkflow since it's
1513+
// relatively simple, but any should work fine, as long as it can be queried.
1514+
testEvents := []*s.HistoryEvent{
1515+
createTestEventWorkflowExecutionStarted(1, &s.WorkflowExecutionStartedEventAttributes{TaskList: &s.TaskList{Name: &taskList}}),
1516+
createTestEventDecisionTaskScheduled(2, &s.DecisionTaskScheduledEventAttributes{TaskList: &s.TaskList{Name: &taskList}}),
1517+
createTestEventDecisionTaskStarted(3),
1518+
createTestEventDecisionTaskCompleted(4, &s.DecisionTaskCompletedEventAttributes{ScheduledEventId: common.Int64Ptr(2)}),
1519+
createTestEventActivityTaskScheduled(5, &s.ActivityTaskScheduledEventAttributes{
1520+
ActivityId: common.StringPtr("0"),
1521+
ActivityType: &s.ActivityType{Name: common.StringPtr("Greeter_Activity")},
1522+
TaskList: &s.TaskList{Name: &taskList},
1523+
}),
1524+
}
1525+
cachedTask := createWorkflowTask(testEvents[0:1], 1, "HelloWorld_Workflow")
1526+
cachedTask.WorkflowExecution.WorkflowId = common.StringPtr("cache-filling workflow id")
1527+
ridsToCleanUp = append(ridsToCleanUp, *cachedTask.WorkflowExecution.RunId)
1528+
_, err := taskHandler.ProcessWorkflowTask(&workflowTask{task: cachedTask}, nil)
1529+
1530+
// sanity check that the cache was indeed filled, and that it has created a goroutine
1531+
require.NoError(t.T(), err, "cache-filling must succeed")
1532+
require.Equal(t.T(), 1, getWorkflowCache().Size(), "workflow should be cached, but was not")
1533+
oneCachedLeak := goleak.Find()
1534+
require.Error(t.T(), oneCachedLeak, "expected at least one leaking goroutine")
1535+
require.Equal(t.T(), countLeaks(originalLeaks)+1, countLeaks(oneCachedLeak), // ideally == 1, but currently there are other leaks
1536+
"expected the cached workflow to leak one goroutine. original leaks:\n%v\n\nleaks after one workflow:\n%v", originalLeaks, oneCachedLeak)
1537+
1538+
// now query a different workflow ID / run ID
1539+
uncachedTask := createQueryTask(testEvents, 5, "HelloWorld_Workflow", queryType)
1540+
uncachedTask.WorkflowExecution.WorkflowId = common.StringPtr("should not leak this workflow id")
1541+
ridsToCleanUp = append(ridsToCleanUp, *uncachedTask.WorkflowExecution.RunId) // only necessary if the test fails
1542+
result, err := taskHandler.ProcessWorkflowTask(&workflowTask{task: uncachedTask}, nil)
1543+
require.NoError(t.T(), err)
1544+
t.verifyQueryResult(result, "waiting-activity-result") // largely a sanity check
1545+
1546+
// and finally the purpose of this test:
1547+
// verify that the cache has not been modified, and that there is no new leak
1548+
t.Equal(1, getWorkflowCache().Size(), "workflow cache should be the same size")
1549+
t.True(getWorkflowCache().Exist(cachedTask.WorkflowExecution.GetRunId()), "originally-cached workflow should still be cached")
1550+
t.False(getWorkflowCache().Exist(uncachedTask.WorkflowExecution.GetRunId()), "queried workflow should not be cached")
1551+
newLeaks := goleak.Find()
1552+
t.Error(newLeaks, "expected at least one leaking goroutine")
1553+
t.Equal(countLeaks(oneCachedLeak), countLeaks(newLeaks),
1554+
"expected the query to leak no new goroutines. before query:\n%v\n\nafter query:\n%v", oneCachedLeak, newLeaks)
1555+
}
1556+
14521557
func Test_NonDeterministicCheck(t *testing.T) {
14531558
decisionTypes := s.DecisionType_Values()
14541559
require.Equal(t, 13, len(decisionTypes), "If you see this error, you are adding new decision type. "+

0 commit comments

Comments
 (0)