Skip to content

Commit b788fd4

Browse files
author
Arif Hoque
committed
maximum job limit added for worker
1 parent a73a580 commit b788fd4

File tree

3 files changed

+73
-5
lines changed

3 files changed

+73
-5
lines changed

composer.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
"require-dev": {
1717
"mockery/mockery": "^1.6",
1818
"phpunit/phpunit": "^12.1.5",
19-
"doppar/framework": "^3.0.0"
19+
"doppar/framework": "^3.0"
2020
},
2121
"autoload": {
2222
"psr-4": {

src/Commands/QueueRunCommand.php

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ class QueueRunCommand extends Command
1313
*
1414
* @var string
1515
*/
16-
protected $name = 'queue:run {--queue=default} {--sleep=3} {--memory=128} {--timeout=3600}';
16+
protected $name = 'queue:run {--queue=default} {--sleep=3} {--memory=128} {--timeout=3600} {--limit=}';
1717

1818
/**
1919
* The command description.
@@ -50,7 +50,7 @@ public function __construct(QueueManager $manager)
5050

5151
/**
5252
* Execute the console command.
53-
* Example: php pool queue:run --queue=reports --sleep=10 --memory=1024 --timeout=3600
53+
* Example: php pool queue:run --queue=reports --sleep=10 --memory=1024 --timeout=3600 --limit=
5454
*
5555
* @return int
5656
*/
@@ -61,9 +61,20 @@ protected function handle(): int
6161
$sleep = (int) $this->option('sleep', 3);
6262
$maxMemory = (int) $this->option('memory', 128);
6363
$maxTime = (int) $this->option('timeout', 3600);
64+
$maxLimit = $this->option('limit');
6465

65-
$this->info("Starting queue worker on queue: {$queue}");
66-
$this->info("Configuration: sleep={$sleep}s, memory={$maxMemory}MB, timeout={$maxTime}s");
66+
$maxLimit = $maxLimit !== null ? (int) $maxLimit : null;
67+
68+
$this->displaySuccess("Starting queue worker on queue: {$queue}");
69+
$configInfo = "Configuration: sleep={$sleep}s, memory={$maxMemory}MB, timeout={$maxTime}s";
70+
71+
if (!empty($maxLimit)) {
72+
$configInfo .= ", limit={$maxLimit} jobs";
73+
} else {
74+
$configInfo .= ", limit=unlimited";
75+
}
76+
77+
$this->info($configInfo);
6778

6879
try {
6980
$this->worker->setOnJobProcessing(function ($job) {
@@ -85,6 +96,7 @@ protected function handle(): int
8596
'sleep' => $sleep,
8697
'maxMemory' => $maxMemory,
8798
'maxExecutionTime' => $maxTime,
99+
'maxJobs' => $maxLimit,
88100
]);
89101

90102
return Command::SUCCESS;

src/QueueWorker.php

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,20 @@ class QueueWorker
4242
*/
4343
protected $maxMemory = 128;
4444

45+
/**
46+
* The maximum number of jobs to process.
47+
*
48+
* @var int|null
49+
*/
50+
protected $maxJobs = null;
51+
52+
/**
53+
* The number of jobs processed.
54+
*
55+
* @var int
56+
*/
57+
protected $jobsProcessed = 0;
58+
4559
/**
4660
* Callback to be executed **before** a job is processed.
4761
*
@@ -109,6 +123,12 @@ public function daemon(string $queue = 'default', array $options = []): void
109123
break;
110124
}
111125

126+
// Check if max jobs limit reached
127+
if ($this->maxJobsReached()) {
128+
$this->stop(0, "Maximum job limit of {$this->maxJobs} reached");
129+
break;
130+
}
131+
112132
// Check memory usage
113133
if ($this->memoryExceeded()) {
114134
$this->stop(12, 'Memory limit exceeded');
@@ -143,6 +163,7 @@ protected function processNextJob(string $queue): void
143163
}
144164

145165
$this->processJob($queueJob);
166+
$this->jobsProcessed++;
146167
} catch (\Throwable $e) {
147168
$this->handleWorkerException($e);
148169
$this->sleep($this->sleep);
@@ -261,6 +282,16 @@ protected function shouldQuit(): bool
261282
return $this->shouldQuit;
262283
}
263284

285+
/**
286+
* Determine if the maximum number of jobs has been reached.
287+
*
288+
* @return bool
289+
*/
290+
protected function maxJobsReached(): bool
291+
{
292+
return $this->maxJobs !== null && $this->jobsProcessed >= $this->maxJobs;
293+
}
294+
264295
/**
265296
* Stop the worker.
266297
*
@@ -352,6 +383,10 @@ protected function configureOptions(array $options): void
352383
if (isset($options['maxExecutionTime'])) {
353384
$this->maxExecutionTime = (int) $options['maxExecutionTime'];
354385
}
386+
387+
if (isset($options['maxJobs'])) {
388+
$this->maxJobs = (int) $options['maxJobs'];
389+
}
355390
}
356391

357392
/**
@@ -408,4 +443,25 @@ public function setMaxExecutionTime(int $seconds): void
408443
{
409444
$this->maxExecutionTime = $seconds;
410445
}
446+
447+
/**
448+
* Set the maximum number of jobs to process.
449+
*
450+
* @param int|null $maxJobs
451+
* @return void
452+
*/
453+
public function setMaxJobs(?int $maxJobs): void
454+
{
455+
$this->maxJobs = $maxJobs;
456+
}
457+
458+
/**
459+
* Get the number of jobs processed.
460+
*
461+
* @return int
462+
*/
463+
public function getJobsProcessed(): int
464+
{
465+
return $this->jobsProcessed;
466+
}
411467
}

0 commit comments

Comments
 (0)