@@ -3,8 +3,9 @@ package scheduler_test
33import (
44 "errors"
55 "testing"
6+ "time"
67
7- "github.com/stretchr/testify/suite "
8+ "github.com/stretchr/testify/require "
89 "go.temporal.io/server/chasm"
910 "go.temporal.io/server/chasm/lib/scheduler"
1011 "go.temporal.io/server/chasm/lib/scheduler/gen/schedulerpb/v1"
@@ -16,129 +17,139 @@ import (
1617 "google.golang.org/protobuf/types/known/timestamppb"
1718)
1819
19- type generatorTasksSuite struct {
20- schedulerSuite
21- executor * scheduler.GeneratorTaskExecutor
22- }
23-
24- func TestGeneratorTasksSuite (t * testing.T ) {
25- suite .Run (t , & generatorTasksSuite {})
26- }
27-
28- func (s * generatorTasksSuite ) SetupTest () {
29- s .schedulerSuite .SetupTest ()
30- s .executor = scheduler .NewGeneratorTaskExecutor (scheduler.GeneratorTaskExecutorOptions {
20+ func newGeneratorExecutor (env * testEnv ) * scheduler.GeneratorTaskExecutor {
21+ return scheduler .NewGeneratorTaskExecutor (scheduler.GeneratorTaskExecutorOptions {
3122 Config : defaultConfig (),
3223 MetricsHandler : metrics .NoopMetricsHandler ,
33- BaseLogger : s . logger ,
34- SpecProcessor : s . specProcessor ,
24+ BaseLogger : env . Logger ,
25+ SpecProcessor : env . SpecProcessor ,
3526 SpecBuilder : legacyscheduler .NewSpecBuilder (),
3627 })
3728}
3829
39- func (s * generatorTasksSuite ) TestExecute_ProcessTimeRangeFails () {
40- sched := s .scheduler
41- ctx := s .newMutableContext ()
30+ func TestGeneratorTask_Execute_ProcessTimeRangeFails (t * testing.T ) {
31+ // Create a custom mock spec processor that fails on ProcessTimeRange.
32+ ctrl := gomock .NewController (t )
33+ specProcessor := scheduler .NewMockSpecProcessor (ctrl )
34+ now := time .Now ()
4235
43- // If ProcessTimeRange fails, we should fail the task as an internal error.
44- s .specProcessor .EXPECT ().ProcessTimeRange (
36+ // First call during newTestEnv's CloseTransaction should succeed.
37+ specProcessor .EXPECT ().ProcessTimeRange (
38+ gomock .Any (), gomock .Any (), gomock .Any (), gomock .Any (), gomock .Any (), gomock .Any (), gomock .Any (), gomock .Any (),
39+ ).Return (& scheduler.ProcessedTimeRange {
40+ NextWakeupTime : now .Add (defaultInterval ),
41+ LastActionTime : now ,
42+ }, nil ).Times (1 )
43+
44+ // Second call during test should fail.
45+ specProcessor .EXPECT ().ProcessTimeRange (
4546 gomock .Any (), gomock .Any (), gomock .Any (), gomock .Any (), gomock .Any (), gomock .Any (), gomock .Any (), gomock .Any (),
4647 ).Return (nil , errors .New ("processTimeRange bug" ))
4748
48- // Execute the generate task.
49- generator := sched .Generator .Get (ctx )
50- err := s .executor .Execute (ctx , generator , chasm.TaskAttributes {}, & schedulerpb.GeneratorTask {})
49+ specProcessor .EXPECT ().NextTime (gomock .Any (), gomock .Any ()).Return (legacyscheduler.GetNextTimeResult {
50+ Next : now .Add (defaultInterval ),
51+ Nominal : now .Add (defaultInterval ),
52+ }, nil ).AnyTimes ()
53+
54+ env := newTestEnv (t , withSpecProcessor (specProcessor ))
55+ executor := newGeneratorExecutor (env )
56+
57+ ctx := env .MutableContext ()
58+ generator := env .Scheduler .Generator .Get (ctx )
59+
60+ // If ProcessTimeRange fails, we should fail the task as an internal error.
61+ err := executor .Execute (ctx , generator , chasm.TaskAttributes {}, & schedulerpb.GeneratorTask {})
5162 var target * queueerrors.UnprocessableTaskError
52- s .ErrorAs (err , & target )
53- s .Equal ("failed to process a time range: processTimeRange bug" , target .Message )
63+ require .ErrorAs (t , err , & target )
64+ require .Equal (t , "failed to process a time range: processTimeRange bug" , target .Message )
5465}
5566
56- func ( s * generatorTasksSuite ) TestExecuteBufferTask_Basic ( ) {
57- ctx := s . newMutableContext ( )
58- sched := s . scheduler
67+ func TestGeneratorTask_ExecuteBufferTask_Basic ( t * testing. T ) {
68+ env := newTestEnv ( t )
69+ executor := newGeneratorExecutor ( env )
5970
71+ ctx := env .MutableContext ()
72+ sched := env .Scheduler
6073 generator := sched .Generator .Get (ctx )
6174
62- // Use a real SpecProcessor implementation.
63- specProcessor := newTestSpecProcessor (s .controller )
64- s .executor .SpecProcessor = specProcessor
65-
6675 // Move high water mark back in time (Generator always compares high water mark
6776 // against system time) to generate buffered actions.
6877 highWatermark := ctx .Now (generator ).UTC ().Add (- defaultInterval * 5 )
6978 generator .LastProcessedTime = timestamppb .New (highWatermark )
7079
7180 // Execute the generate task.
72- err := s . executor .Execute (ctx , generator , chasm.TaskAttributes {}, & schedulerpb.GeneratorTask {})
73- s .NoError (err )
81+ err := executor .Execute (ctx , generator , chasm.TaskAttributes {}, & schedulerpb.GeneratorTask {})
82+ require .NoError (t , err )
7483
7584 // We expect 5 buffered starts.
7685 invoker := sched .Invoker .Get (ctx )
77- s . Equal ( 5 , len ( invoker .BufferedStarts ) )
86+ require . Len ( t , invoker .BufferedStarts , 5 )
7887
79- // Validate RequestId -> WorkflowId mapping
88+ // Validate RequestId -> WorkflowId mapping.
8089 for _ , start := range invoker .BufferedStarts {
81- s .Equal (start .WorkflowId , invoker .RunningWorkflowID (start .RequestId ))
90+ require .Equal (t , start .WorkflowId , invoker .RunningWorkflowID (start .RequestId ))
8291 }
8392
8493 // Generator's high water mark should have advanced.
8594 newHighWatermark := generator .LastProcessedTime .AsTime ()
86- s .True (newHighWatermark .After (highWatermark ))
95+ require .True (t , newHighWatermark .After (highWatermark ))
8796
8897 // Ensure we scheduled a physical side-effect task on the tree at immediate time.
8998 // The InvokerExecuteTask is a side-effect task that starts workflows.
9099 // The InvokerProcessBufferTask (pure) executes inline during CloseTransaction.
91- _ , err = s .node .CloseTransaction ()
92- s .NoError (err )
93- s .True (s .hasTask (& tasks.ChasmTask {}, chasm .TaskScheduledTimeImmediate ))
100+ require .NoError (t , env .CloseTransaction ())
101+ require .True (t , env .HasTask (& tasks.ChasmTask {}, chasm .TaskScheduledTimeImmediate ))
94102}
95103
96- func (s * generatorTasksSuite ) TestUpdateFutureActionTimes_UnlimitedActions () {
97- ctx := s .newMutableContext ()
98- sched := s .scheduler
99- generator := sched .Generator .Get (ctx )
104+ func TestGeneratorTask_UpdateFutureActionTimes_UnlimitedActions (t * testing.T ) {
105+ env := newTestEnv (t )
106+ executor := newGeneratorExecutor (env )
100107
101- s .executor .SpecProcessor = newTestSpecProcessor (s .controller )
108+ ctx := env .MutableContext ()
109+ generator := env .Scheduler .Generator .Get (ctx )
102110
103- err := s . executor .Execute (ctx , generator , chasm.TaskAttributes {}, & schedulerpb.GeneratorTask {})
104- s .NoError (err )
111+ err := executor .Execute (ctx , generator , chasm.TaskAttributes {}, & schedulerpb.GeneratorTask {})
112+ require .NoError (t , err )
105113
106- s .NotEmpty (generator .FutureActionTimes )
107- s . Require (). Len (generator .FutureActionTimes , 10 )
114+ require .NotEmpty (t , generator .FutureActionTimes )
115+ require . Len (t , generator .FutureActionTimes , 10 )
108116}
109117
110- func (s * generatorTasksSuite ) TestUpdateFutureActionTimes_LimitedActions () {
111- ctx := s .newMutableContext ()
112- sched := s .scheduler
118+ func TestGeneratorTask_UpdateFutureActionTimes_LimitedActions (t * testing.T ) {
119+ env := newTestEnv (t )
120+ executor := newGeneratorExecutor (env )
121+
122+ ctx := env .MutableContext ()
123+ sched := env .Scheduler
113124 generator := sched .Generator .Get (ctx )
114125
115126 sched .Schedule .State .LimitedActions = true
116127 sched .Schedule .State .RemainingActions = 2
117- s .executor .SpecProcessor = newTestSpecProcessor (s .controller )
118128
119- err := s . executor .Execute (ctx , generator , chasm.TaskAttributes {}, & schedulerpb.GeneratorTask {})
120- s .NoError (err )
129+ err := executor .Execute (ctx , generator , chasm.TaskAttributes {}, & schedulerpb.GeneratorTask {})
130+ require .NoError (t , err )
121131
122- s .Len (generator .FutureActionTimes , 2 )
132+ require .Len (t , generator .FutureActionTimes , 2 )
123133}
124134
125- func (s * generatorTasksSuite ) TestUpdateFutureActionTimes_SkipsBeforeUpdateTime () {
126- ctx := s .newMutableContext ()
127- sched := s .scheduler
128- generator := sched .Generator .Get (ctx )
135+ func TestGeneratorTask_UpdateFutureActionTimes_SkipsBeforeUpdateTime (t * testing.T ) {
136+ env := newTestEnv (t )
137+ executor := newGeneratorExecutor (env )
129138
130- s .executor .SpecProcessor = newTestSpecProcessor (s .controller )
139+ ctx := env .MutableContext ()
140+ sched := env .Scheduler
141+ generator := sched .Generator .Get (ctx )
131142
132143 // UpdateTime acts as a floor - action times at or before it are skipped.
133144 baseTime := ctx .Now (generator ).UTC ()
134145 updateTime := baseTime .Add (defaultInterval / 2 )
135146 sched .Info .UpdateTime = timestamppb .New (updateTime )
136147
137- err := s . executor .Execute (ctx , generator , chasm.TaskAttributes {}, & schedulerpb.GeneratorTask {})
138- s .NoError (err )
148+ err := executor .Execute (ctx , generator , chasm.TaskAttributes {}, & schedulerpb.GeneratorTask {})
149+ require .NoError (t , err )
139150
140- s . Require (). NotEmpty (generator .FutureActionTimes )
151+ require . NotEmpty (t , generator .FutureActionTimes )
141152 for _ , futureTime := range generator .FutureActionTimes {
142- s .True (futureTime .AsTime ().After (updateTime ))
153+ require .True (t , futureTime .AsTime ().After (updateTime ))
143154 }
144155}
0 commit comments