Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ type executor struct {
jobsOutCompleted chan uuid.UUID
// used to request jobs from the scheduler
jobOutRequest chan jobOutRequest
// used to request jobs from the scheduler
jobOutUpdateLockRequest chan jobOutUpdateLockRequest

// sends out job needs to update the next runs
jobUpdateNextRuns chan uuid.UUID
Expand Down Expand Up @@ -392,7 +394,14 @@ func (e *executor) runJob(j internalJob, jIn jobIn) {
e.sendOutForNextRunUpdate(&jIn)
return
}
defer func() { _ = lock.Unlock(j.ctx) }()
e.jobOutUpdateLockRequest <- jobOutUpdateLockRequest{
id: j.id,
lock: lock,
}

defer func() {
_ = lock.Unlock(j.ctx)
}()
} else if !j.disabledLocker && e.locker != nil {
lock, err := e.locker.Lock(j.ctx, j.name)
if err != nil {
Expand All @@ -402,7 +411,14 @@ func (e *executor) runJob(j internalJob, jIn jobIn) {
e.sendOutForNextRunUpdate(&jIn)
return
}
defer func() { _ = lock.Unlock(j.ctx) }()
e.jobOutUpdateLockRequest <- jobOutUpdateLockRequest{
id: j.id,
lock: lock,
}

defer func() {
_ = lock.Unlock(j.ctx)
}()
}

_ = callJobFuncWithParams(j.beforeJobRuns, j.id, j.name)
Expand Down
8 changes: 8 additions & 0 deletions job.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type internalJob struct {
nextScheduled []time.Time

lastRun time.Time
lastLock Lock
function any
parameters []any
timer clockwork.Timer
Expand Down Expand Up @@ -1124,6 +1125,7 @@ type Job interface {
RunNow() error
// Tags returns the job's string tags.
Tags() []string
Lock() Lock
}

var _ Job = (*job)(nil)
Expand Down Expand Up @@ -1224,3 +1226,9 @@ func (j job) RunNow() error {
}
return err
}

func (j job) Lock() Lock {
ij := requestJob(j.id, j.jobOutRequest)

return ij.lastLock
}
17 changes: 17 additions & 0 deletions mocks/job.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 9 additions & 7 deletions mocks/scheduler.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 22 additions & 6 deletions scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@ type jobOutRequest struct {
outChan chan internalJob
}

type jobOutUpdateLockRequest struct {
id uuid.UUID
lock Lock
}

type runJobRequest struct {
id uuid.UUID
outChan chan error
Expand All @@ -136,12 +141,13 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) {
logger: &noOpLogger{},
clock: clockwork.NewRealClock(),

jobsIn: make(chan jobIn),
jobsOutForRescheduling: make(chan uuid.UUID),
jobUpdateNextRuns: make(chan uuid.UUID),
jobsOutCompleted: make(chan uuid.UUID),
jobOutRequest: make(chan jobOutRequest, 1000),
done: make(chan error, 1),
jobsIn: make(chan jobIn),
jobsOutForRescheduling: make(chan uuid.UUID),
jobUpdateNextRuns: make(chan uuid.UUID),
jobsOutCompleted: make(chan uuid.UUID),
jobOutRequest: make(chan jobOutRequest, 1000),
jobOutUpdateLockRequest: make(chan jobOutUpdateLockRequest),
done: make(chan error, 1),
}

s := &scheduler{
Expand Down Expand Up @@ -197,6 +203,9 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) {
case out := <-s.jobOutRequestCh:
s.selectJobOutRequest(out)

case out := <-s.exec.jobOutUpdateLockRequest:
s.jobOutUpdateLockRequest(out)

case out := <-s.allJobsOutRequest:
s.selectAllJobsOutRequest(out)

Expand Down Expand Up @@ -471,6 +480,13 @@ func (s *scheduler) selectJobOutRequest(out jobOutRequest) {
close(out.outChan)
}

func (s *scheduler) jobOutUpdateLockRequest(out jobOutUpdateLockRequest) {
if j, ok := s.jobs[out.id]; ok {
j.lastLock = out.lock
s.jobs[out.id] = j
}
}

func (s *scheduler) selectNewJob(in newJobIn) {
j := in.job
if s.started {
Expand Down
42 changes: 42 additions & 0 deletions scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2828,3 +2828,45 @@ func TestScheduler_WithMonitor(t *testing.T) {
})
}
}

func TestJob_Lock(t *testing.T) {
locker := &testLocker{
notLocked: make(chan struct{}, 1),
}

s := newTestScheduler(t,
WithDistributedLocker(locker),
)

jobRan := make(chan struct{})
j, err := s.NewJob(
DurationJob(time.Millisecond*100),
NewTask(func() {
time.Sleep(50 * time.Millisecond)
jobRan <- struct{}{}
}),
)
require.NoError(t, err)

s.Start()
defer func(s Scheduler) {
err = s.Shutdown()
if err != nil {
require.NoError(t, err)
}
}(s)

select {
case <-jobRan:
// Job has run
case <-time.After(200 * time.Millisecond):
t.Fatal("Job did not run in time")
}

require.Eventually(t, func() bool {
return locker.jobLocked
}, 200*time.Millisecond, 100*time.Millisecond, "Job should be locked")

lock := j.Lock()
assert.NotNil(t, lock, "Job Lock() should return a non-nil Locker")
}