Skip to content

Commit f56251c

Browse files
JoshVanL, artursouza
authored and committed
Fix Delete race and prioritize write ops over trigger execution (#31)
* Fix Delete race and prioritize write ops over trigger execution Fixes a race condition whereby a Delete operation on a job which was mid-execution on the same scheduler instance would not result in that job being seen as deleted from the in-memory queue. Adds a fix for this by tracking the names of active jobs, and backing out of an execution if the job is no longer in that cache (deleted). Using an RWMutex, prioritizes Delete/Write operations over job trigger execution when on the local node. Signed-off-by: joshvanl <[email protected]> * Update dapr/kit go.mod Signed-off-by: joshvanl <[email protected]> --------- Signed-off-by: joshvanl <[email protected]>
1 parent 30016dc commit f56251c

File tree

6 files changed

+83
-16
lines changed

6 files changed

+83
-16
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,4 +72,4 @@ GEN_PROTOS:=$(foreach ITEM,$(PROTOS),gen-proto-$(ITEM))
7272
gen-proto: check-proto-version $(GEN_PROTOS) modtidy
7373

7474
test:
75-
go test -timeout 300s --race ./...
75+
go test -count 1 -timeout 300s --race ./...

api.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,11 +95,17 @@ func (c *cron) Delete(ctx context.Context, name string) error {
9595
return err
9696
}
9797

98-
if _, err := c.client.Delete(ctx, c.key.JobKey(name)); err != nil {
98+
jobKey := c.key.JobKey(name)
99+
100+
c.queueLock.Lock(jobKey)
101+
defer c.queueLock.Unlock(jobKey)
102+
103+
if _, err := c.client.Delete(ctx, jobKey); err != nil {
99104
return err
100105
}
101106

102-
return nil
107+
c.queueCache.Delete(jobKey)
108+
return c.queue.Dequeue(jobKey)
103109
}
104110

105111
// validateName validates the name of a job.

cron.go

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,8 @@ type cron struct {
107107
wg sync.WaitGroup
108108
// queueLock prevents an informed schedule from overwriting a job as it is
109109
// being triggered, i.e. prevent a PUT and mid-trigger race condition.
110-
queueLock concurrency.MutexMap[string]
110+
queueLock concurrency.MutexMap[string]
111+
queueCache *sync.Map
111112
}
112113

113114
// New creates a new cron instance.
@@ -179,6 +180,7 @@ func New(opts Options) (Interface, error) {
179180
closeCh: make(chan struct{}),
180181
errCh: make(chan error),
181182
queueLock: concurrency.NewMutexMap[string](),
183+
queueCache: new(sync.Map),
182184
}, nil
183185
}
184186

@@ -191,19 +193,21 @@ func (c *cron) Run(ctx context.Context) error {
191193

192194
c.queue = queue.NewProcessor[string, *counter.Counter](
193195
func(counter *counter.Counter) {
194-
c.queueLock.Lock(counter.Key())
195-
if ctx.Err() != nil {
196-
c.queueLock.Unlock(counter.Key())
196+
c.queueLock.RLock(counter.Key())
197+
_, ok := c.queueCache.Load(counter.Key())
198+
if !ok || ctx.Err() != nil {
199+
c.queueLock.DeleteRUnlock(counter.Key())
197200
return
198201
}
199202

200203
c.wg.Add(1)
201204
go func() {
202205
defer c.wg.Done()
203206
if c.handleTrigger(ctx, counter) {
204-
c.queueLock.Unlock(counter.Key())
207+
c.queueLock.RUnlock(counter.Key())
205208
} else {
206-
c.queueLock.DeleteUnlock(counter.Key())
209+
c.queueCache.Delete(counter.Key())
210+
c.queueLock.DeleteRUnlock(counter.Key())
207211
}
208212
}()
209213
},
@@ -313,6 +317,7 @@ func (c *cron) handleInformerEvent(ctx context.Context, e *informer.Event) error
313317
}
314318

315319
defer c.queueLock.DeleteUnlock(string(e.Key))
320+
c.queueCache.Delete(string(e.Key))
316321
return c.queue.Dequeue(string(e.Key))
317322
}
318323

@@ -342,5 +347,6 @@ func (c *cron) schedule(ctx context.Context, name string, job *api.JobStored) er
342347
default:
343348
}
344349

350+
c.queueCache.Store(counter.Key(), nil)
345351
return c.queue.Enqueue(counter)
346352
}

cron_test.go

Lines changed: 59 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ func Test_repeat(t *testing.T) {
170170
helper := testCron(t, 1)
171171

172172
job := &api.Job{
173-
Schedule: ptr.Of("@every 1s"),
173+
Schedule: ptr.Of("@every 10ms"),
174174
Repeats: ptr.Of(uint32(3)),
175175
}
176176

@@ -523,7 +523,7 @@ func Test_parallel(t *testing.T) {
523523
var done atomic.Int32
524524
helper := testCronWithOptions(t, testCronOptions{
525525
total: total,
526-
triggerFn: func() {
526+
triggerFn: func(*api.TriggerRequest) {
527527
waiting.Add(1)
528528
<-releaseCh
529529
done.Add(1)
@@ -547,11 +547,66 @@ func Test_parallel(t *testing.T) {
547547
}
548548
}
549549

550+
func Test_DeleteRace(t *testing.T) {
551+
t.Parallel()
552+
553+
triggered := make([]atomic.Int64, 20)
554+
helper := testCronWithOptions(t, testCronOptions{
555+
total: 1,
556+
triggerFn: func(req *api.TriggerRequest) {
557+
i, err := strconv.Atoi(req.Name)
558+
require.NoError(t, err)
559+
triggered[i].Add(1)
560+
},
561+
})
562+
563+
jobNames := make([]string, 20)
564+
for i := range jobNames {
565+
jobNames[i] = strconv.Itoa(i)
566+
require.NoError(t, helper.cron.Add(helper.ctx, jobNames[i], &api.Job{
567+
Schedule: ptr.Of("@every 1s"),
568+
}))
569+
}
570+
571+
cron := helper.cron.(*cron)
572+
573+
for i, name := range jobNames {
574+
i := i
575+
name := name
576+
keyName := "abc/jobs/" + name
577+
t.Run(name, func(t *testing.T) {
578+
t.Parallel()
579+
580+
assert.EventuallyWithT(t, func(c *assert.CollectT) {
581+
_, ok := cron.queueCache.Load(keyName)
582+
assert.True(c, ok)
583+
}, 5*time.Second, time.Millisecond)
584+
585+
assert.EventuallyWithT(t, func(c *assert.CollectT) {
586+
assert.GreaterOrEqual(c, triggered[i].Load(), int64(1))
587+
}, 5*time.Second, time.Millisecond)
588+
589+
_, ok := cron.queueCache.Load(keyName)
590+
assert.True(t, ok)
591+
require.NoError(t, helper.cron.Delete(helper.ctx, name))
592+
593+
_, ok = cron.queueCache.Load(keyName)
594+
assert.False(t, ok)
595+
596+
currentTriggered := triggered[i].Load()
597+
time.Sleep(time.Second * 2)
598+
assert.Equal(t, currentTriggered, triggered[i].Load())
599+
_, ok = cron.queueCache.Load(keyName)
600+
assert.False(t, ok)
601+
})
602+
}
603+
}
604+
550605
type testCronOptions struct {
551606
total uint32
552607
returnOk *atomic.Bool
553608
gotCh chan *api.TriggerRequest
554-
triggerFn func()
609+
triggerFn func(*api.TriggerRequest)
555610
}
556611

557612
type helper struct {
@@ -592,7 +647,7 @@ func testCronWithOptions(t *testing.T, opts testCronOptions) *helper {
592647
opts.gotCh <- req
593648
}
594649
if opts.triggerFn != nil {
595-
opts.triggerFn()
650+
opts.triggerFn(req)
596651
}
597652
return ok
598653
},

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ module github.com/diagridio/go-etcd-cron
33
go 1.22.5
44

55
require (
6-
github.com/dapr/kit v0.13.1-0.20240722163453-58c6d9df14d3
6+
github.com/dapr/kit v0.13.1-0.20240724000121-26b564d9d0f5
77
github.com/go-logr/logr v1.3.0
88
github.com/go-logr/zapr v1.3.0
99
github.com/stretchr/testify v1.9.0

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@ github.com/coreos/go-semver v0.3.0 h1:wkHLiw0WNATZnSG7epLsujiMCgPAc9xhjJ4tgnAxmf
3333
github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
3434
github.com/coreos/go-systemd/v22 v22.3.2 h1:D9/bQk5vlXQFZ6Kwuu6zaiXJ9oTPe68++AzAJc1DzSI=
3535
github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
36-
github.com/dapr/kit v0.13.1-0.20240722163453-58c6d9df14d3 h1:+HZd67sGtxQq7UoEXpby0x0pE0XcZwTY03UuZc48P/M=
37-
github.com/dapr/kit v0.13.1-0.20240722163453-58c6d9df14d3/go.mod h1:Hz1W2LmWfA4UX/12MdA+brsf+np6f/1dJt6C6F63cjI=
36+
github.com/dapr/kit v0.13.1-0.20240724000121-26b564d9d0f5 h1:FQKdGOG6Zi3gBhtnxPQmrd8QFLs8e6JTGkts+aaXba0=
37+
github.com/dapr/kit v0.13.1-0.20240724000121-26b564d9d0f5/go.mod h1:Hz1W2LmWfA4UX/12MdA+brsf+np6f/1dJt6C6F63cjI=
3838
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
3939
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
4040
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=

0 commit comments

Comments
 (0)