Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
"require-dev": {
"mockery/mockery": "^1.6",
"phpunit/phpunit": "^12.1.5",
"doppar/framework": "^3.0.0"
"doppar/framework": "^3.0"
},
"autoload": {
"psr-4": {
Expand Down
20 changes: 16 additions & 4 deletions src/Commands/QueueRunCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ class QueueRunCommand extends Command
*
* @var string
*/
protected $name = 'queue:run {--queue=default} {--sleep=3} {--memory=128} {--timeout=3600}';
protected $name = 'queue:run {--queue=default} {--sleep=3} {--memory=128} {--timeout=3600} {--limit=}';

/**
* The command description.
Expand Down Expand Up @@ -50,7 +50,7 @@ public function __construct(QueueManager $manager)

/**
* Execute the console command.
* Example: php pool queue:run --queue=reports --sleep=10 --memory=1024 --timeout=3600
* Example: php pool queue:run --queue=reports --sleep=10 --memory=1024 --timeout=3600 --limit=
*
* @return int
*/
Expand All @@ -61,9 +61,20 @@ protected function handle(): int
$sleep = (int) $this->option('sleep', 3);
$maxMemory = (int) $this->option('memory', 128);
$maxTime = (int) $this->option('timeout', 3600);
$maxLimit = $this->option('limit');

$this->info("Starting queue worker on queue: {$queue}");
$this->info("Configuration: sleep={$sleep}s, memory={$maxMemory}MB, timeout={$maxTime}s");
$maxLimit = $maxLimit !== null ? (int) $maxLimit : null;

$this->displaySuccess("Starting queue worker on queue: {$queue}");
$configInfo = "Configuration: sleep={$sleep}s, memory={$maxMemory}MB, timeout={$maxTime}s";

if (!empty($maxLimit)) {
$configInfo .= ", limit={$maxLimit} jobs";
} else {
$configInfo .= ", limit=unlimited";
}

$this->info($configInfo);

try {
$this->worker->setOnJobProcessing(function ($job) {
Expand All @@ -85,6 +96,7 @@ protected function handle(): int
'sleep' => $sleep,
'maxMemory' => $maxMemory,
'maxExecutionTime' => $maxTime,
'maxJobs' => $maxLimit,
]);

return Command::SUCCESS;
Expand Down
54 changes: 54 additions & 0 deletions src/QueueWorker.php
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,20 @@ class QueueWorker
*/
protected $maxMemory = 128;

/**
* The maximum number of jobs to process.
*
* @var int|null
*/
protected $maxJobs = null;

/**
* The number of jobs processed.
*
* @var int
*/
protected $jobsProcessed = 0;

/**
* Callback to be executed **before** a job is processed.
*
Expand Down Expand Up @@ -109,6 +123,12 @@ public function daemon(string $queue = 'default', array $options = []): void
break;
}

// Check if max jobs limit reached
if ($this->maxJobsReached()) {
$this->stop(0, "Maximum job limit of {$this->maxJobs} reached");
break;
}

// Check memory usage
if ($this->memoryExceeded()) {
$this->stop(12, 'Memory limit exceeded');
Expand Down Expand Up @@ -143,6 +163,7 @@ protected function processNextJob(string $queue): void
}

$this->processJob($queueJob);
$this->jobsProcessed++;
} catch (\Throwable $e) {
$this->handleWorkerException($e);
$this->sleep($this->sleep);
Expand Down Expand Up @@ -261,6 +282,16 @@ protected function shouldQuit(): bool
return $this->shouldQuit;
}

/**
* Determine if the maximum number of jobs has been reached.
*
* @return bool
*/
protected function maxJobsReached(): bool
{
return !empty($this->maxJobs) && $this->jobsProcessed >= $this->maxJobs;
}

/**
* Stop the worker.
*
Expand Down Expand Up @@ -352,6 +383,8 @@ protected function configureOptions(array $options): void
if (isset($options['maxExecutionTime'])) {
$this->maxExecutionTime = (int) $options['maxExecutionTime'];
}

$this->maxJobs = $options['maxJobs'] ?? null;
}

/**
Expand Down Expand Up @@ -408,4 +441,25 @@ public function setMaxExecutionTime(int $seconds): void
{
$this->maxExecutionTime = $seconds;
}

/**
* Set the maximum number of jobs to process.
*
* @param int|null $maxJobs
* @return void
*/
public function setMaxJobs(?int $maxJobs): void
{
$this->maxJobs = $maxJobs;
}

/**
* Get the number of jobs processed.
*
* @return int
*/
public function getJobsProcessed(): int
{
return $this->jobsProcessed;
}
}