File tree Expand file tree Collapse file tree 3 files changed +11
-5
lines changed
internal-packages/run-engine/src/run-queue
packages/redis-worker/src Expand file tree Collapse file tree 3 files changed +11
-5
lines changed Original file line number Diff line number Diff line change @@ -212,7 +212,7 @@ export class RunQueue {
212
212
scanConcurrencySets : {
213
213
...workerCatalog . scanConcurrencySets ,
214
214
cron : options . concurrencySweeper ?. scanSchedule ?? workerCatalog . scanConcurrencySets . cron ,
215
- jitter :
215
+ jitterInMs :
216
216
options . concurrencySweeper ?. scanJitterInMs ??
217
217
workerCatalog . scanConcurrencySets . jitterInMs ,
218
218
} ,
Original file line number Diff line number Diff line change @@ -13,7 +13,7 @@ const testOptions = {
13
13
tracer : trace . getTracer ( "rq" ) ,
14
14
workers : 1 ,
15
15
defaultEnvConcurrency : 25 ,
16
- logger : new Logger ( "RunQueue" , "warn " ) ,
16
+ logger : new Logger ( "RunQueue" , "debug " ) ,
17
17
retryOptions : {
18
18
maxAttempts : 5 ,
19
19
factor : 1.1 ,
@@ -59,6 +59,7 @@ describe("RunQueue Concurrency Sweeper", () => {
59
59
60
60
const queue = new RunQueue ( {
61
61
...testOptions ,
62
+ logLevel : "debug" ,
62
63
queueSelectionStrategy : new FairQueueSelectionStrategy ( {
63
64
redis : {
64
65
keyPrefix : "runqueue:test:" ,
@@ -67,6 +68,10 @@ describe("RunQueue Concurrency Sweeper", () => {
67
68
} ,
68
69
keys : testOptions . keys ,
69
70
} ) ,
71
+ workerOptions : {
72
+ pollIntervalMs : 100 ,
73
+ immediatePollIntervalMs : 100 ,
74
+ } ,
70
75
redis : {
71
76
keyPrefix : "runqueue:test:" ,
72
77
host : redisContainer . getHost ( ) ,
Original file line number Diff line number Diff line change @@ -693,9 +693,8 @@ class Worker<TCatalog extends WorkerCatalog> {
693
693
const scheduledAt = this . calculateNextScheduledAt ( cron , lastTimestamp ) ;
694
694
const identifier = [ job , this . timestampIdentifier ( scheduledAt ) ] . join ( ":" ) ;
695
695
// Calculate the availableAt date by calculating a random number between -jitter/2 and jitter/2 and adding it to the scheduledAt
696
- const availableAt = jitter
697
- ? new Date ( scheduledAt . getTime ( ) + Math . random ( ) * jitter - jitter / 2 )
698
- : scheduledAt ;
696
+ const appliedJitter = typeof jitter === "number" ? Math . random ( ) * jitter - jitter / 2 : 0 ;
697
+ const availableAt = new Date ( scheduledAt . getTime ( ) + appliedJitter ) ;
699
698
700
699
const enqueued = await this . enqueueOnce ( {
701
700
id : identifier ,
@@ -715,6 +714,8 @@ class Worker<TCatalog extends WorkerCatalog> {
715
714
scheduledAt,
716
715
enqueued,
717
716
availableAt,
717
+ appliedJitter,
718
+ jitter,
718
719
} ) ;
719
720
720
721
return {
You can’t perform that action at this time.
0 commit comments