Skip to content

Commit 30016dc

Browse files
artursouza and JoshVanL authored
Queue lock by key (by Josh) (#30)
* Lock by key when executing cron queue
  Use a per-job-key mutex to lock execution of a trigger. A per-key lock prevents writes of new jobs from being blocked by actively executing triggers.
  Signed-off-by: joshvanl <[email protected]>
* Update go.mod
  Signed-off-by: joshvanl <[email protected]>
* Lint
  Signed-off-by: joshvanl <[email protected]>
* Lint
  Signed-off-by: joshvanl <[email protected]>
* Fix go.mod + update dapr/kit
  Signed-off-by: Artur Souza <[email protected]>
* Fix .golangci.yaml to match pinned lint version.
  Signed-off-by: Artur Souza <[email protected]>
* Pin go version without using toolchain.
  Signed-off-by: Artur Souza <[email protected]>
* Fix a bunch of lint after go update.
  Signed-off-by: Artur Souza <[email protected]>
---------
Signed-off-by: joshvanl <[email protected]>
Signed-off-by: Artur Souza <[email protected]>
Co-authored-by: joshvanl <[email protected]>
1 parent a46af5f commit 30016dc

File tree

16 files changed

+313
-395
lines changed

16 files changed

+313
-395
lines changed

.golangci.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ linters:
1919
- gci
2020
- funlen
2121
- maintidx
22+
- containedctx
2223
linters-settings:
2324
goimports:
2425
local-prefixes: github.com/diagridio

api.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ func (c *cron) Delete(ctx context.Context, name string) error {
9999
return err
100100
}
101101

102-
return c.queue.Dequeue(c.key.JobKey(name))
102+
return nil
103103
}
104104

105105
// validateName validates the name of a job.

api_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -495,7 +495,8 @@ func Test_validateName(t *testing.T) {
495495
}
496496

497497
for _, test := range tests {
498-
test := test
498+
name := test.name
499+
expErr := test.expErr
499500
t.Run(test.name, func(t *testing.T) {
500501
t.Parallel()
501502
c, err := New(Options{
@@ -506,8 +507,8 @@ func Test_validateName(t *testing.T) {
506507
TriggerFn: func(context.Context, *api.TriggerRequest) bool { return true },
507508
})
508509
require.NoError(t, err)
509-
err = c.(*cron).validateName(test.name)
510-
assert.Equal(t, test.expErr, err != nil, "%v", err)
510+
err = c.(*cron).validateName(name)
511+
assert.Equal(t, expErr, err != nil, "%v", err)
511512
})
512513
}
513514
}

cron.go

Lines changed: 24 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ type cron struct {
107107
wg sync.WaitGroup
108108
// queueLock prevents an informed schedule from overwriting a job as it is
109109
// being triggered, i.e. prevent a PUT and mid-trigger race condition.
110-
queueLock sync.RWMutex
110+
queueLock concurrency.MutexMap[string]
111111
}
112112

113113
// New creates a new cron instance.
@@ -178,6 +178,7 @@ func New(opts Options) (Interface, error) {
178178
readyCh: make(chan struct{}),
179179
closeCh: make(chan struct{}),
180180
errCh: make(chan error),
181+
queueLock: concurrency.NewMutexMap[string](),
181182
}, nil
182183
}
183184

@@ -190,12 +191,20 @@ func (c *cron) Run(ctx context.Context) error {
190191

191192
c.queue = queue.NewProcessor[string, *counter.Counter](
192193
func(counter *counter.Counter) {
193-
c.queueLock.RLock()
194+
c.queueLock.Lock(counter.Key())
195+
if ctx.Err() != nil {
196+
c.queueLock.Unlock(counter.Key())
197+
return
198+
}
199+
194200
c.wg.Add(1)
195201
go func() {
196-
defer c.queueLock.RUnlock()
197202
defer c.wg.Done()
198-
c.handleTrigger(ctx, counter)
203+
if c.handleTrigger(ctx, counter) {
204+
c.queueLock.Unlock(counter.Key())
205+
} else {
206+
c.queueLock.DeleteUnlock(counter.Key())
207+
}
199208
}()
200209
},
201210
).WithClock(c.clock)
@@ -259,7 +268,8 @@ func (c *cron) Run(ctx context.Context) error {
259268
}
260269

261270
// handleTrigger handles triggering a schedule job.
262-
func (c *cron) handleTrigger(ctx context.Context, counter *counter.Counter) {
271+
// Returns true if the job is being re-enqueued, false otherwise.
272+
func (c *cron) handleTrigger(ctx context.Context, counter *counter.Counter) bool {
263273
if !c.triggerFn(ctx, counter.TriggerRequest()) {
264274
// If the trigger function returns false, i.e. failed client side,
265275
// re-enqueue the job immediately.
@@ -269,7 +279,7 @@ func (c *cron) handleTrigger(ctx context.Context, counter *counter.Counter) {
269279
case c.errCh <- err:
270280
}
271281
}
272-
return
282+
return true
273283
}
274284

275285
ok, err := counter.Trigger(ctx)
@@ -283,29 +293,26 @@ func (c *cron) handleTrigger(ctx context.Context, counter *counter.Counter) {
283293
case c.errCh <- err:
284294
}
285295
}
296+
297+
return true
286298
}
299+
300+
return false
287301
}
288302

289303
// handleInformerEvent handles an etcd informed event.
290-
// TODO: @joshvanl: add a safe per key read lock to prevent locking all
291-
// triggers and an unrelated write. Must be able to handle a key being
292-
// de-queued and unlocked (deleted) whilst an Add schedule is waiting on the
293-
// lock, and visa versa. I don't think there is much if any we gain though as
294-
// we _always_ hack to lock somewhere..
295304
func (c *cron) handleInformerEvent(ctx context.Context, e *informer.Event) error {
296-
c.queueLock.Lock()
297-
defer c.queueLock.Unlock()
298-
299-
select {
300-
case <-ctx.Done():
305+
if ctx.Err() != nil {
301306
return ctx.Err()
302-
default:
303307
}
304308

309+
c.queueLock.Lock(string(e.Key))
305310
if e.IsPut {
311+
defer c.queueLock.Unlock(string(e.Key))
306312
return c.schedule(ctx, c.key.JobName(e.Key), e.Job)
307313
}
308314

315+
defer c.queueLock.DeleteUnlock(string(e.Key))
309316
return c.queue.Dequeue(string(e.Key))
310317
}
311318

0 commit comments

Comments
 (0)