Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
196 changes: 191 additions & 5 deletions job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -629,13 +629,11 @@
NewAtTime(0, 0, 0),
),
),
func(t *testing.T, iteration int, previousRun, nextRun time.Time) {

Check failure on line 632 in job_test.go

View workflow job for this annotation

GitHub Actions / lint and test (1.25)

unused-parameter: parameter 'iteration' seems to be unused, consider removing or renaming it as _ (revive)

Check failure on line 632 in job_test.go

View workflow job for this annotation

GitHub Actions / lint and test (1.24)

unused-parameter: parameter 'iteration' seems to be unused, consider removing or renaming it as _ (revive)
// With the fix for NextRun accuracy, the immediate run (Jan 1) is removed
// from nextScheduled after it completes. So all intervals should be 14 days
// (2 weeks as configured).
diff := time.Hour * 14 * 24
if iteration == 1 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The linter is complaining that the `iteration` parameter passed in is now unused. It looks like it can be removed entirely from the assertion func.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for catching! Made quick fix here 0f55e2a based on feedback 🙏

// because the job is run immediately, the first run is on
// Saturday 1/1/2000. The following run is then on Tuesday 1/11/2000
diff = time.Hour * 10 * 24
}
assert.Equal(t, previousRun.Add(diff).Day(), nextRun.Day())
},
},
Expand Down Expand Up @@ -1197,3 +1195,191 @@
assert.Less(t, timeSinceStart.Seconds(), 1.0,
"First run should happen quickly with WithStartImmediately")
}

func TestJob_NextRun_MultipleJobsSimultaneously(t *testing.T) {
	// Regression test: when several jobs finish at roughly the same moment,
	// NextRun() used to report stale values because of races while cleaning
	// up nextScheduled. Every job below starts immediately on a fake clock,
	// so after the first round each NextRun() must equal start + interval.

	start := time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC)
	clock := clockwork.NewFakeClockAt(start)

	s := newTestScheduler(t,
		WithClock(clock),
		WithLocation(time.UTC),
	)

	// One entry per job: its name, its interval, and the failure message
	// reported if NextRun() is wrong after the immediate run.
	cases := []struct {
		name     string
		interval time.Duration
		msg      string
	}{
		{"job1", 1 * time.Minute, "job1 NextRun should be 1 minute from start"},
		{"job2", 2 * time.Minute, "job2 NextRun should be 2 minutes from start"},
		{"job3", 3 * time.Minute, "job3 NextRun should be 3 minutes from start"},
		{"job4", 4 * time.Minute, "job4 NextRun should be 4 minutes from start"},
	}

	// Buffered so the tasks of the immediate runs never block on the send.
	done := make(chan struct{}, len(cases))

	// Register the jobs; they have different intervals but all fire
	// immediately, so their first runs complete around the same time.
	jobs := make([]Job, len(cases))
	for i, c := range cases {
		j, err := s.NewJob(
			DurationJob(c.interval),
			NewTask(func() {
				done <- struct{}{}
			}),
			WithName(c.name),
			WithStartAt(WithStartImmediately()),
		)
		require.NoError(t, err)
		jobs[i] = j
	}

	s.Start()

	// Block until every job has signalled completion of its immediate run.
	for range cases {
		<-done
	}

	// Let the scheduler process the completions and reschedule.
	time.Sleep(50 * time.Millisecond)

	// Each job must now report its next scheduled time rather than the
	// timestamp of the run that just completed.
	for i, c := range cases {
		got, err := jobs[i].NextRun()
		require.NoError(t, err)
		assert.Equal(t, start.Add(c.interval), got, c.msg)
	}

	// Move the fake clock forward far enough to trigger job1's next run.
	clock.Advance(1 * time.Minute)

	// Wait for that run to complete and for the scheduler to catch up.
	<-done
	time.Sleep(50 * time.Millisecond)

	// Having run a second time, job1 should be scheduled for start + 2 minutes.
	got, err := jobs[0].NextRun()
	require.NoError(t, err)
	assert.Equal(t, start.Add(2*time.Minute), got, "job1 NextRun should be 2 minutes from start after first interval")

	require.NoError(t, s.Shutdown())
}

func TestJob_NextRun_ConcurrentCompletions(t *testing.T) {
	// This test verifies that when multiple jobs complete at exactly the same time,
	// their NextRun() values are correctly updated without race conditions.

	testTime := time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC)
	fakeClock := clockwork.NewFakeClockAt(testTime)

	s := newTestScheduler(t,
		WithClock(fakeClock),
		WithLocation(time.UTC), // Set scheduler to use UTC to match our test time
	)

	var wg sync.WaitGroup
	jobCompletionBarrier := make(chan struct{})

	// createJob registers an immediately-starting job whose task signals that
	// it has started (wg.Done) and then parks on the barrier, so that all
	// tasks can be released, and therefore complete, at the same instant.
	createJob := func(name string, interval time.Duration) Job {
		job, err := s.NewJob(
			DurationJob(interval),
			NewTask(func() {
				wg.Done()
				<-jobCompletionBarrier // Wait until all jobs are ready to complete
			}),
			WithName(name),
			WithStartAt(WithStartImmediately()),
		)
		require.NoError(t, err)
		return job
	}

	specs := []struct {
		name     string
		interval time.Duration
	}{
		{"concurrent-job1", 1 * time.Minute},
		{"concurrent-job2", 2 * time.Minute},
		{"concurrent-job3", 3 * time.Minute},
		{"concurrent-job4", 4 * time.Minute},
	}

	// Account for every task before any job exists so wg.Done can never run
	// ahead of the matching wg.Add.
	wg.Add(len(specs))
	jobs := make([]Job, len(specs))
	for i, spec := range specs {
		jobs[i] = createJob(spec.name, spec.interval)
	}

	s.Start()

	// Once every task is running, release them all together.
	wg.Wait()
	close(jobCompletionBarrier)

	// Give the scheduler time to process all completions
	time.Sleep(100 * time.Millisecond)

	// Verify NextRun() for all jobs concurrently to stress test the race condition
	var testWg sync.WaitGroup
	testWg.Add(len(jobs))

	for i, spec := range specs {
		// The job and its expected time are passed as arguments so each
		// goroutine owns its values regardless of loop-variable semantics.
		go func(j Job, want time.Time) {
			defer testWg.Done()
			for n := 0; n < 10; n++ {
				nextRun, err := j.NextRun()
				// Use assert, not require, here: require stops the test via
				// t.FailNow, which must only be called from the goroutine
				// running the test function, not from goroutines it spawns.
				if !assert.NoError(t, err) {
					return
				}
				assert.Equal(t, want, nextRun)
			}
		}(jobs[i], testTime.Add(spec.interval))
	}

	testWg.Wait()
	require.NoError(t, s.Shutdown())
}
14 changes: 7 additions & 7 deletions scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,11 +443,11 @@ func (s *scheduler) updateNextScheduled(id uuid.UUID) {
return
}
var newNextScheduled []time.Time
now := s.now()
for _, t := range j.nextScheduled {
if t.Before(s.now()) {
continue
if t.After(now) { // Changed to match selectExecJobsOutCompleted
newNextScheduled = append(newNextScheduled, t)
}
newNextScheduled = append(newNextScheduled, t)
}
j.nextScheduled = newNextScheduled
s.jobs[id] = j
Expand All @@ -460,13 +460,13 @@ func (s *scheduler) selectExecJobsOutCompleted(id uuid.UUID) {
}

// if the job has nextScheduled time in the past,
// we need to remove any that are in the past.
// we need to remove any that are in the past or at the current time (just executed).
var newNextScheduled []time.Time
now := s.now()
for _, t := range j.nextScheduled {
if t.Before(s.now()) {
continue
if t.After(now) {
newNextScheduled = append(newNextScheduled, t)
}
newNextScheduled = append(newNextScheduled, t)
}
j.nextScheduled = newNextScheduled

Expand Down
Loading