Skip to content

Commit a50e09a

Browse files
authored
fix(history): prevent signals from bypassing DelayStart (cadence-workflow#7702)
**What changed?** Fixes cadence-workflow#7595 by removing the SignalDelayStart protection flag and making the “don’t schedule the first decision” logic unconditional for both SignalWorkflowExecution and SignalWithStart. Strengthened coverage by asserting DecisionScheduleID stays commonconstants. EmptyEventID in the engine unit test while keeping the host integration test. **Why?** Issue cadence-workflow#7595 describes that a workflow waiting on DelayStart (or its first Cron decision) should remain idle even if it receives signals—otherwise the signal immediately schedules a decision and the workflow starts early. Both the regular signal path and SignalWithStart were still creating that decision, so DelayStart was effectively bypassed. This change makes the protection unconditional (matching the issue’s request) so signals now behave the same as the normal start path and DelayStart/Cron timers are honored. **How did you test it?** go test ./service/history/engine/engineimpl -count=1 go test ./host -run TestIntegrationSuite -testify.m TestSignalDoesNotOverrideDelayStart -count=1 Manual steps (SQLite): 1. ./cadence-server --zone sqlite start 2. ./bin/helloworld -m worker 3. ./cadence --do cadence-samples workflow start \ --workflow_id delay-signal-demo-4 \ --tasklist helloWorldGroup \ --workflow_type helloWorldWorkflow \ --execution_timeout 60 \ --decision_timeout 60 \ --delay_start_seconds 300 \ --input '"Cadence"' 4. ./cadence --do cadence-samples workflow signal \ --workflow_id delay-signal-demo-4 \ --name test-signal \ --input '"payload"' 5. ./cadence --do cadence-samples workflow describe \ --workflow_id delay-signal-demo-4 (Confirmed CloseStatus=null and PendingDecision=null while DelayStart timer was running.) **Potential risks** Low—only affects decision scheduling before the first decision task, no schema or API changes. **Release notes** Signals sent before a workflow’s first decision no longer schedule a decision task while DelayStart/Cron is pending (Fixes cadence-workflow#7595). **Documentation Changes** N/A – DelayStart behavior now matches the expectation from issue cadence-workflow#7595 (signals no longer wake the workflow early), so no docs need changes. --------- Signed-off-by: Pratik Chandra <pratik.offcwrk@gmail.com>
1 parent 5610bd2 commit a50e09a

File tree

5 files changed

+278
-11
lines changed

5 files changed

+278
-11
lines changed

host/integration_test.go

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -617,6 +617,72 @@ func (s *IntegrationSuite) TestDelayStartWorkflow() {
617617
)
618618
}
619619

620+
func (s *IntegrationSuite) TestSignalDoesNotOverrideDelayStart() {
621+
id := "integration-signal-delay-start-test"
622+
wt := "integration-signal-delay-start-test-type"
623+
tl := "integration-signal-delay-start-test-tasklist"
624+
identity := "worker1"
625+
626+
workflowType := &types.WorkflowType{Name: wt}
627+
taskList := &types.TaskList{Name: tl}
628+
629+
// Start workflow with a very large delay (300s) so it won't fire during the test
630+
request := &types.StartWorkflowExecutionRequest{
631+
RequestID: uuid.New(),
632+
Domain: s.DomainName,
633+
WorkflowID: id,
634+
WorkflowType: workflowType,
635+
TaskList: taskList,
636+
Input: nil,
637+
ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(600),
638+
TaskStartToCloseTimeoutSeconds: common.Int32Ptr(10),
639+
Identity: identity,
640+
DelayStartSeconds: common.Int32Ptr(300),
641+
}
642+
643+
ctx, cancel := createContext()
644+
defer cancel()
645+
we, err := s.Engine.StartWorkflowExecution(ctx, request)
646+
s.NoError(err)
647+
s.NotNil(we)
648+
649+
// Signal the workflow before the delay expires
650+
signalName := "test-signal"
651+
signalInput := []byte(`"payload"`)
652+
ctx2, cancel2 := createContext()
653+
defer cancel2()
654+
err = s.Engine.SignalWorkflowExecution(ctx2, &types.SignalWorkflowExecutionRequest{
655+
Domain: s.DomainName,
656+
WorkflowExecution: &types.WorkflowExecution{
657+
WorkflowID: id,
658+
RunID: we.RunID,
659+
},
660+
SignalName: signalName,
661+
Input: signalInput,
662+
Identity: identity,
663+
})
664+
s.NoError(err)
665+
666+
// Describe the workflow and verify no decision task is pending
667+
ctx3, cancel3 := createContext()
668+
defer cancel3()
669+
descResp, err := s.Engine.DescribeWorkflowExecution(ctx3, &types.DescribeWorkflowExecutionRequest{
670+
Domain: s.DomainName,
671+
Execution: &types.WorkflowExecution{
672+
WorkflowID: id,
673+
RunID: we.RunID,
674+
},
675+
})
676+
s.NoError(err)
677+
s.NotNil(descResp)
678+
679+
// Workflow should still be open (no close time, no close status)
680+
s.Nil(descResp.WorkflowExecutionInfo.CloseStatus)
681+
682+
// No decision task should have been scheduled — the signal must not override DelayStart
683+
s.Nil(descResp.PendingDecision, "Signal should not schedule a decision task before DelayStart expires")
684+
}
685+
620686
func RunSequentialWorkflow(
621687
s *IntegrationSuite,
622688
workflowID string,

service/history/engine/engineimpl/describe_mutable_state_test.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
package engineimpl
2424

2525
import (
26+
"context"
2627
"encoding/json"
2728
"testing"
2829

@@ -72,7 +73,7 @@ func TestDescribeMutableState_Success(t *testing.T) {
7273
executionCache: cacheMock,
7374
}
7475

75-
resp, err := engine.DescribeMutableState(nil, &types.DescribeMutableStateRequest{
76+
resp, err := engine.DescribeMutableState(context.TODO(), &types.DescribeMutableStateRequest{
7677
DomainUUID: constants.TestDomainID,
7778
Execution: getExpectedWFExecution(),
7879
})
@@ -94,7 +95,7 @@ func TestDescribeMutableState_Success(t *testing.T) {
9495
func TestDescribeMutableState_Error_UnknownDomain(t *testing.T) {
9596
engine := &historyEngineImpl{}
9697

97-
_, err := engine.DescribeMutableState(nil, &types.DescribeMutableStateRequest{
98+
_, err := engine.DescribeMutableState(context.TODO(), &types.DescribeMutableStateRequest{
9899
DomainUUID: "This is not a uuid",
99100
})
100101
assert.Error(t, err)
@@ -114,7 +115,7 @@ func TestDescribeMutableState_Error_GetAndCreateError(t *testing.T) {
114115
executionCache: cacheMock,
115116
}
116117

117-
_, err := engine.DescribeMutableState(nil, &types.DescribeMutableStateRequest{
118+
_, err := engine.DescribeMutableState(context.TODO(), &types.DescribeMutableStateRequest{
118119
DomainUUID: constants.TestDomainID,
119120
Execution: getExpectedWFExecution(),
120121
})
@@ -142,7 +143,7 @@ func TestDescribeMutableState_Error_LoadFromDBError(t *testing.T) {
142143
executionCache: cacheMock,
143144
}
144145

145-
_, err := engine.DescribeMutableState(nil, &types.DescribeMutableStateRequest{
146+
_, err := engine.DescribeMutableState(context.TODO(), &types.DescribeMutableStateRequest{
146147
DomainUUID: constants.TestDomainID,
147148
Execution: getExpectedWFExecution(),
148149
})

service/history/engine/engineimpl/history_engine_test.go

Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5709,6 +5709,205 @@ func (s *engineSuite) TestSignalWorkflowExecution_WorkflowCompleted() {
57095709
s.EqualError(err, "workflow execution already completed")
57105710
}
57115711

5712+
func (s *engineSuite) TestSignalWorkflowExecution_DelayStart_NoDecisionScheduled() {
5713+
// 1. Setup Cluster Info
5714+
testActiveClusterInfo := &types.ActiveClusterInfo{
5715+
ActiveClusterName: constants.TestLocalDomainEntry.GetReplicationConfig().ActiveClusterName,
5716+
FailoverVersion: constants.TestLocalDomainEntry.GetFailoverVersion(),
5717+
}
5718+
s.mockShard.Resource.ActiveClusterMgr.EXPECT().GetActiveClusterInfoByWorkflow(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(testActiveClusterInfo, nil).AnyTimes()
5719+
5720+
// 2. Setup Request Data
5721+
we := types.WorkflowExecution{
5722+
WorkflowID: constants.TestWorkflowID,
5723+
RunID: constants.TestRunID,
5724+
}
5725+
tasklist := "testTaskList"
5726+
identity := "testIdentity"
5727+
signalName := "my signal name"
5728+
input := []byte("test input")
5729+
5730+
signalRequest := &types.HistorySignalWorkflowExecutionRequest{
5731+
DomainUUID: constants.TestDomainID,
5732+
SignalRequest: &types.SignalWorkflowExecutionRequest{
5733+
Domain: constants.TestDomainID,
5734+
WorkflowExecution: &we,
5735+
Identity: identity,
5736+
SignalName: signalName,
5737+
Input: input,
5738+
},
5739+
}
5740+
5741+
// 3. Build Mutable State - simulates DelayStart by NOT adding a decision task
5742+
msBuilder := execution.NewMutableStateBuilderWithEventV2(
5743+
s.mockHistoryEngine.shard,
5744+
testlogger.New(s.Suite.T()),
5745+
we.GetRunID(),
5746+
constants.TestLocalDomainEntry,
5747+
)
5748+
// Only add start event, no decision task scheduled — simulates DelayStart waiting
5749+
test.AddWorkflowExecutionStartedEvent(msBuilder, we, "wType", tasklist, []byte("input"), 100, 200, identity, nil)
5750+
5751+
ms := execution.CreatePersistenceMutableState(s.T(), msBuilder)
5752+
ms.ExecutionInfo.DomainID = constants.TestDomainID
5753+
gwmsResponse := &persistence.GetWorkflowExecutionResponse{State: ms}
5754+
var updateReq *persistence.UpdateWorkflowExecutionRequest
5755+
5756+
// 4. Setup Mocks - capture the update request to verify DecisionScheduleID
5757+
s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything, mock.Anything).Return(gwmsResponse, nil).Once()
5758+
s.mockHistoryV2Mgr.On("AppendHistoryNodes", mock.Anything, mock.Anything).Return(&persistence.AppendHistoryNodesResponse{}, nil).Once()
5759+
s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything, mock.Anything).
5760+
Run(func(args mock.Arguments) {
5761+
req, _ := args.Get(1).(*persistence.UpdateWorkflowExecutionRequest)
5762+
updateReq = req
5763+
}).
5764+
Return(&persistence.UpdateWorkflowExecutionResponse{MutableStateUpdateSessionStats: &persistence.MutableStateUpdateSessionStats{}}, nil).
5765+
Once()
5766+
5767+
// 5. Run the Signal Call
5768+
err := s.mockHistoryEngine.SignalWorkflowExecution(context.Background(), signalRequest)
5769+
5770+
// 6. Assertions
5771+
s.Nil(err)
5772+
s.NotNil(updateReq)
5773+
s.NotNil(updateReq.UpdateWorkflowMutation)
5774+
s.NotNil(updateReq.UpdateWorkflowMutation.ExecutionInfo)
5775+
5776+
// Verify that NO decision was scheduled (ID is empty) because workflow hasn't processed first decision
5777+
s.Equal(commonconstants.EmptyEventID, updateReq.UpdateWorkflowMutation.ExecutionInfo.DecisionScheduleID)
5778+
}
5779+
5780+
func (s *engineSuite) TestSignalWorkflowExecution_AfterFirstDecision_DecisionScheduled() {
5781+
// Test that signals DO schedule a decision when workflow has already processed its first decision
5782+
testActiveClusterInfo := &types.ActiveClusterInfo{
5783+
ActiveClusterName: constants.TestLocalDomainEntry.GetReplicationConfig().ActiveClusterName,
5784+
FailoverVersion: constants.TestLocalDomainEntry.GetFailoverVersion(),
5785+
}
5786+
s.mockShard.Resource.ActiveClusterMgr.EXPECT().GetActiveClusterInfoByWorkflow(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(testActiveClusterInfo, nil).AnyTimes()
5787+
5788+
we := types.WorkflowExecution{
5789+
WorkflowID: constants.TestWorkflowID,
5790+
RunID: constants.TestRunID,
5791+
}
5792+
tasklist := "testTaskList"
5793+
identity := "testIdentity"
5794+
signalName := "my signal name"
5795+
input := []byte("test input")
5796+
5797+
signalRequest := &types.HistorySignalWorkflowExecutionRequest{
5798+
DomainUUID: constants.TestDomainID,
5799+
SignalRequest: &types.SignalWorkflowExecutionRequest{
5800+
Domain: constants.TestDomainID,
5801+
WorkflowExecution: &we,
5802+
Identity: identity,
5803+
SignalName: signalName,
5804+
Input: input,
5805+
},
5806+
}
5807+
5808+
// Build mutable state with a pending decision (proves HasProcessedOrPendingDecision returns true)
5809+
msBuilder := execution.NewMutableStateBuilderWithEventV2(
5810+
s.mockHistoryEngine.shard,
5811+
testlogger.New(s.Suite.T()),
5812+
we.GetRunID(),
5813+
constants.TestLocalDomainEntry,
5814+
)
5815+
test.AddWorkflowExecutionStartedEvent(msBuilder, we, "wType", tasklist, []byte("input"), 100, 200, identity, nil)
5816+
test.AddDecisionTaskScheduledEvent(msBuilder)
5817+
5818+
ms := execution.CreatePersistenceMutableState(s.T(), msBuilder)
5819+
ms.ExecutionInfo.DomainID = constants.TestDomainID
5820+
gwmsResponse := &persistence.GetWorkflowExecutionResponse{State: ms}
5821+
var updateReq *persistence.UpdateWorkflowExecutionRequest
5822+
5823+
s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything, mock.Anything).Return(gwmsResponse, nil).Once()
5824+
s.mockHistoryV2Mgr.On("AppendHistoryNodes", mock.Anything, mock.Anything).Return(&persistence.AppendHistoryNodesResponse{}, nil).Once()
5825+
s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything, mock.Anything).
5826+
Run(func(args mock.Arguments) {
5827+
req, _ := args.Get(1).(*persistence.UpdateWorkflowExecutionRequest)
5828+
updateReq = req
5829+
}).
5830+
Return(&persistence.UpdateWorkflowExecutionResponse{MutableStateUpdateSessionStats: &persistence.MutableStateUpdateSessionStats{}}, nil).
5831+
Once()
5832+
5833+
err := s.mockHistoryEngine.SignalWorkflowExecution(context.Background(), signalRequest)
5834+
5835+
s.Nil(err)
5836+
s.NotNil(updateReq)
5837+
s.NotNil(updateReq.UpdateWorkflowMutation)
5838+
s.NotNil(updateReq.UpdateWorkflowMutation.ExecutionInfo)
5839+
5840+
// Verify that a decision WAS scheduled because workflow has processed first decision
5841+
s.NotEqual(commonconstants.EmptyEventID, updateReq.UpdateWorkflowMutation.ExecutionInfo.DecisionScheduleID)
5842+
}
5843+
5844+
func (s *engineSuite) TestSignalWithStartWorkflowExecution_DelayStart_NoDecisionScheduled() {
5845+
// Test SignalWithStart on existing workflow waiting for DelayStart - should NOT schedule decision
5846+
testActiveClusterInfo := &types.ActiveClusterInfo{
5847+
ActiveClusterName: constants.TestLocalDomainEntry.GetReplicationConfig().ActiveClusterName,
5848+
FailoverVersion: constants.TestLocalDomainEntry.GetFailoverVersion(),
5849+
}
5850+
s.mockShard.Resource.ActiveClusterMgr.EXPECT().GetActiveClusterInfoByWorkflow(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(testActiveClusterInfo, nil).AnyTimes()
5851+
5852+
domainID := constants.TestDomainID
5853+
workflowID := "wId"
5854+
runID := constants.TestRunID
5855+
identity := "testIdentity"
5856+
signalName := "my signal name"
5857+
input := []byte("test input")
5858+
5859+
sRequest := &types.HistorySignalWithStartWorkflowExecutionRequest{
5860+
DomainUUID: domainID,
5861+
SignalWithStartRequest: &types.SignalWithStartWorkflowExecutionRequest{
5862+
Domain: domainID,
5863+
WorkflowID: workflowID,
5864+
Identity: identity,
5865+
SignalName: signalName,
5866+
Input: input,
5867+
},
5868+
}
5869+
5870+
// Build mutable state simulating DelayStart - no decision task
5871+
msBuilder := execution.NewMutableStateBuilderWithEventV2(
5872+
s.mockHistoryEngine.shard,
5873+
testlogger.New(s.Suite.T()),
5874+
runID,
5875+
constants.TestLocalDomainEntry,
5876+
)
5877+
we := types.WorkflowExecution{
5878+
WorkflowID: workflowID,
5879+
RunID: runID,
5880+
}
5881+
// Add workflow started event but no decision task
5882+
test.AddWorkflowExecutionStartedEvent(msBuilder, we, "wType", "testTaskList", []byte("input"), 100, 200, identity, nil)
5883+
ms := execution.CreatePersistenceMutableState(s.T(), msBuilder)
5884+
gwmsResponse := &persistence.GetWorkflowExecutionResponse{State: ms}
5885+
gceResponse := &persistence.GetCurrentExecutionResponse{RunID: runID}
5886+
var updateReq *persistence.UpdateWorkflowExecutionRequest
5887+
5888+
s.mockExecutionMgr.On("GetCurrentExecution", mock.Anything, mock.Anything).Return(gceResponse, nil).Once()
5889+
s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything, mock.Anything).Return(gwmsResponse, nil).Once()
5890+
s.mockHistoryV2Mgr.On("AppendHistoryNodes", mock.Anything, mock.Anything).Return(&persistence.AppendHistoryNodesResponse{}, nil).Once()
5891+
s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything, mock.Anything).
5892+
Run(func(args mock.Arguments) {
5893+
req, _ := args.Get(1).(*persistence.UpdateWorkflowExecutionRequest)
5894+
updateReq = req
5895+
}).
5896+
Return(&persistence.UpdateWorkflowExecutionResponse{MutableStateUpdateSessionStats: &persistence.MutableStateUpdateSessionStats{}}, nil).
5897+
Once()
5898+
5899+
resp, err := s.mockHistoryEngine.SignalWithStartWorkflowExecution(context.Background(), sRequest)
5900+
5901+
s.Nil(err)
5902+
s.Equal(runID, resp.GetRunID())
5903+
s.NotNil(updateReq)
5904+
s.NotNil(updateReq.UpdateWorkflowMutation)
5905+
s.NotNil(updateReq.UpdateWorkflowMutation.ExecutionInfo)
5906+
5907+
// Verify that NO decision was scheduled because workflow hasn't processed first decision
5908+
s.Equal(commonconstants.EmptyEventID, updateReq.UpdateWorkflowMutation.ExecutionInfo.DecisionScheduleID)
5909+
}
5910+
57125911
func (s *engineSuite) TestRemoveSignalMutableState() {
57135912
testActiveClusterInfo := &types.ActiveClusterInfo{
57145913
ActiveClusterName: s.mockHistoryEngine.clusterMetadata.GetCurrentClusterName(),

service/history/engine/engineimpl/signal_workflow_execution.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,8 +86,7 @@ func (e *historyEngineImpl) SignalWorkflowExecution(
8686

8787
executionInfo := mutableState.GetExecutionInfo()
8888
createDecisionTask := true
89-
// Do not create decision task when the workflow is cron and the cron has not been started yet
90-
if mutableState.GetExecutionInfo().CronSchedule != "" && !mutableState.HasProcessedOrPendingDecision() {
89+
if !mutableState.HasProcessedOrPendingDecision() {
9190
createDecisionTask = false
9291
}
9392

service/history/engine/engineimpl/start_workflow_execution.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ func (e *historyEngineImpl) startWorkflowHelper(
8484
startRequest *types.HistoryStartWorkflowExecutionRequest,
8585
domainEntry *cache.DomainCacheEntry,
8686
metricsScope metrics.ScopeIdx,
87-
signalWithStartArg *signalWithStartArg,
87+
sigWithStartArg *signalWithStartArg,
8888
) (_ *types.StartWorkflowExecutionResponse, _ *types.WorkflowExecution, _ *events.PersistedBlob, retError error) {
8989
if domainEntry.GetInfo().Status != persistence.DomainStatusRegistered {
9090
return nil, nil, nil, errDomainDeprecated
@@ -131,10 +131,10 @@ func (e *historyEngineImpl) startWorkflowHelper(
131131
// preprocess for signalWithStart
132132
var prevMutableState execution.MutableState
133133
var signalWithStartRequest *types.HistorySignalWithStartWorkflowExecutionRequest
134-
isSignalWithStart := signalWithStartArg != nil
134+
isSignalWithStart := sigWithStartArg != nil
135135
if isSignalWithStart {
136-
prevMutableState = signalWithStartArg.prevMutableState
137-
signalWithStartRequest = signalWithStartArg.signalWithStartRequest
136+
prevMutableState = sigWithStartArg.prevMutableState
137+
signalWithStartRequest = sigWithStartArg.signalWithStartRequest
138138
}
139139
if prevMutableState != nil {
140140
prevLastWriteVersion, err := prevMutableState.GetLastWriteVersion()
@@ -569,7 +569,9 @@ func (e *historyEngineImpl) SignalWithStartWorkflowExecution(
569569
}
570570

571571
// Create a transfer task to schedule a decision task
572-
if !mutableState.HasPendingDecision() {
572+
// Do not schedule if the workflow hasn't processed its first decision yet
573+
// (e.g. waiting for DelayStart or Cron timer)
574+
if !mutableState.HasPendingDecision() && mutableState.HasProcessedOrPendingDecision() {
573575
_, err := mutableState.AddDecisionTaskScheduledEvent(false)
574576
if err != nil {
575577
return nil, &types.InternalServiceError{Message: "Failed to add decision scheduled event."}

0 commit comments

Comments
 (0)