@@ -13,6 +13,8 @@ import (
1313 "go.temporal.io/sdk/client"
1414 "go.temporal.io/sdk/temporal"
1515 "go.temporal.io/sdk/workflow"
16+ "go.temporal.io/server/common"
17+ "go.temporal.io/server/common/dynamicconfig"
1618 "go.temporal.io/server/common/payloads"
1719 "go.temporal.io/server/tests/testcore"
1820)
@@ -26,6 +28,17 @@ func TestMaxBufferedEventSuite(t *testing.T) {
2628 suite .Run (t , new (MaxBufferedEventSuite ))
2729}
2830
31+ func (s * MaxBufferedEventSuite ) SetupSuite () {
32+ dynamicConfigOverrides := map [dynamicconfig.Key ]any {
33+ // Set MaximumBufferedEventsSizeInBytes high so we don't hit that limit
34+ dynamicconfig .MaximumBufferedEventsSizeInBytes .Key (): 10 * 1024 * 1024 , // 10MB
35+ // Set MutableStateSizeLimitError low so buffered events exhaust mutable state size
36+ dynamicconfig .MutableStateSizeLimitWarn .Key (): 200 ,
37+ dynamicconfig .MutableStateSizeLimitError .Key (): 410 * 1024 , // 410KB
38+ }
39+ s .SetupSuiteWithCluster (testcore .WithDynamicConfigOverrides (dynamicConfigOverrides ))
40+ }
41+
2942func (s * MaxBufferedEventSuite ) TestMaxBufferedEventsLimit () {
3043 /*
3144 This test starts a workflow, and block its workflow task, then sending
@@ -118,10 +131,12 @@ func (s *MaxBufferedEventSuite) TestMaxBufferedEventsLimit() {
118131
119132func (s * MaxBufferedEventSuite ) TestBufferedEventsMutableStateSizeLimit () {
120133 /*
121- This test starts a workflow, and block its workflow task, then sending
122- signals to it which will be buffered. The default max mutable state
123- size is 2MB and each signal will have a 500KB payload, so when the 4th
124- signal is received, the workflow will be force terminated.
134+ This test starts a workflow, and blocks its workflow task, then sends
135+ signals to it which will be buffered. The test is configured with
136+ MaximumBufferedEventsSizeInBytes set to 10MB (high) and MutableStateSizeLimitError
137+ set to 410KB (low). Each signal has a 100KB payload. The first three signals
138+ succeed, and the fourth signal causes the mutable state size to exceed the limit,
139+ resulting in workflow termination.
125140 */
126141 closeStartChanOnce := sync.Once {}
127142 waitStartChan := make (chan struct {})
@@ -159,7 +174,7 @@ func (s *MaxBufferedEventSuite) TestBufferedEventsMutableStateSizeLimit() {
159174
160175 s .Worker ().RegisterWorkflow (workflowFn )
161176
162- testCtx , cancel := context .WithTimeout (context .Background (), time .Second * 20 )
177+ testCtx , cancel := context .WithTimeout (context .Background (), 40 * time .Second )
163178 defer cancel ()
164179
165180 wid := "test-max-buffered-events-limit"
@@ -174,40 +189,49 @@ func (s *MaxBufferedEventSuite) TestBufferedEventsMutableStateSizeLimit() {
174189 // block until workflow task started
175190 <- waitStartChan
176191
177- // now send 3 signals with 500KB payload each, all of them will be buffered
178- buf := make ([]byte , 500 * 1024 )
192+ // now send signals with 100KB payload each, which will be buffered
193+ buf := make ([]byte , 100 * 1024 ) // 100KB
179194 // fill the slice with random data to make sure the
180195 // encoder does not zero out the data
181196 _ , err := rand .Read (buf )
182197 s .NoError (err )
183198 largePayload := payloads .EncodeBytes (buf )
199+
200+ // Send signals until mutable state size limit is exceeded
201+ // With 410KB limit and 100KB payloads, the first 3 signals succeed but the 4th exceeds the limit
202+ // First three signals should succeed
184203 for i := 0 ; i < 3 ; i ++ {
185- err : = s .SdkClient ().SignalWorkflow (testCtx , wid , "" , "test-signal" , largePayload )
186- s .NoError (err )
204+ err = s .SdkClient ().SignalWorkflow (testCtx , wid , "" , "test-signal" , largePayload )
205+ s .NoError (err , "Signal %d should succeed" , i + 1 )
187206 }
188207
189- // send 4th signal, this will fail the started workflow task and force terminate the workflow
208+ // Fourth signal should fail due to mutable state size limit
190209 err = s .SdkClient ().SignalWorkflow (testCtx , wid , "" , "test-signal" , largePayload )
191- s .NoError (err )
210+ s .Error (err , "Fourth signal should fail due to mutable state size limit" )
211+ s .Contains (err .Error (), "mutable state size exceeds limit" , "Expected mutable state size limit error" )
192212
193213 // unblock goroutine that runs local activity
194214 close (waitSignalChan )
195215
196216 var sigCount int
197217 err = wf1 .Get (testCtx , & sigCount )
198- s . NoError ( err )
199- s .Equal ( 4 , sigCount )
218+ // The workflow should be terminated, so we expect an error
219+ s .Error ( err )
200220
201221 historyEvents := s .GetHistory (s .Namespace ().String (), & commonpb.WorkflowExecution {WorkflowId : wf1 .GetID ()})
202- // Not using historyrequire here because history is not deterministic.
203- var failedCause enumspb.WorkflowTaskFailedCause
204- var failedCount int
222+
223+ // Verify that the workflow was terminated due to mutable state size limit
224+ var terminated bool
225+ var terminationReason string
205226 for _ , evt := range historyEvents {
206- if evt .GetEventType () == enumspb .EVENT_TYPE_WORKFLOW_TASK_FAILED {
207- failedCause = evt .GetWorkflowTaskFailedEventAttributes ().Cause
208- failedCount ++
227+ if evt .GetEventType () == enumspb .EVENT_TYPE_WORKFLOW_EXECUTION_TERMINATED {
228+ terminated = true
229+ attrs := evt .GetWorkflowExecutionTerminatedEventAttributes ()
230+ terminationReason = attrs .GetReason ()
231+ break
209232 }
210233 }
211- s .Equal (enumspb .WORKFLOW_TASK_FAILED_CAUSE_FORCE_CLOSE_COMMAND , failedCause )
212- s .Equal (1 , failedCount )
234+ s .True (terminated , "Expected workflow to be terminated" )
235+ s .Equal (common .FailureReasonMutableStateSizeExceedsLimit , terminationReason ,
236+ "Expected workflow to be terminated due to mutable state size limit" )
213237}
0 commit comments