diff --git a/composer.json b/composer.json index cf82f5a..702979a 100644 --- a/composer.json +++ b/composer.json @@ -16,7 +16,7 @@ "require-dev": { "mockery/mockery": "^1.6", "phpunit/phpunit": "^12.1.5", - "doppar/framework": "^3.0.0" + "doppar/framework": "^3.0" }, "autoload": { "psr-4": { diff --git a/src/Commands/QueueRunCommand.php b/src/Commands/QueueRunCommand.php index bc51931..1362c6c 100644 --- a/src/Commands/QueueRunCommand.php +++ b/src/Commands/QueueRunCommand.php @@ -13,7 +13,7 @@ class QueueRunCommand extends Command * * @var string */ - protected $name = 'queue:run {--queue=default} {--sleep=3} {--memory=128} {--timeout=3600}'; + protected $name = 'queue:run {--queue=default} {--sleep=3} {--memory=128} {--timeout=3600} {--limit=}'; /** * The command description. @@ -50,7 +50,7 @@ public function __construct(QueueManager $manager) /** * Execute the console command. - * Example: php pool queue:run --queue=reports --sleep=10 --memory=1024 --timeout=3600 + * Example: php pool queue:run --queue=reports --sleep=10 --memory=1024 --timeout=3600 --limit= * * @return int */ @@ -61,9 +61,20 @@ protected function handle(): int $sleep = (int) $this->option('sleep', 3); $maxMemory = (int) $this->option('memory', 128); $maxTime = (int) $this->option('timeout', 3600); + $maxLimit = $this->option('limit'); - $this->info("Starting queue worker on queue: {$queue}"); - $this->info("Configuration: sleep={$sleep}s, memory={$maxMemory}MB, timeout={$maxTime}s"); + $maxLimit = $maxLimit !== null ? (int) $maxLimit : null; + + $this->displaySuccess("Starting queue worker on queue: {$queue}"); + $configInfo = "Configuration: sleep={$sleep}s, memory={$maxMemory}MB, timeout={$maxTime}s"; + + if (!empty($maxLimit)) { + $configInfo .= ", limit={$maxLimit} jobs"; + } else { + $configInfo .= ", limit=unlimited"; + } + + $this->info($configInfo); try { $this->worker->setOnJobProcessing(function ($job) { @@ -85,6 +96,7 @@ protected function handle(): int 'sleep' => $sleep, 'maxMemory' => $maxMemory, 'maxExecutionTime' => $maxTime, + 'maxJobs' => $maxLimit, ]); return Command::SUCCESS; diff --git a/src/QueueWorker.php b/src/QueueWorker.php index f05d850..7e54b84 100644 --- a/src/QueueWorker.php +++ b/src/QueueWorker.php @@ -42,6 +42,20 @@ class QueueWorker */ protected $maxMemory = 128; + /** + * The maximum number of jobs to process. + * + * @var int|null + */ + protected $maxJobs = null; + + /** + * The number of jobs processed. + * + * @var int + */ + protected $jobsProcessed = 0; + /** * Callback to be executed **before** a job is processed. * @@ -109,6 +123,12 @@ public function daemon(string $queue = 'default', array $options = []): void break; } + // Check if max jobs limit reached + if ($this->maxJobsReached()) { + $this->stop(0, "Maximum job limit of {$this->maxJobs} reached"); + break; + } + // Check memory usage if ($this->memoryExceeded()) { $this->stop(12, 'Memory limit exceeded'); @@ -143,6 +163,7 @@ protected function processNextJob(string $queue): void } $this->processJob($queueJob); + $this->jobsProcessed++; } catch (\Throwable $e) { $this->handleWorkerException($e); $this->sleep($this->sleep); @@ -261,6 +282,16 @@ protected function shouldQuit(): bool return $this->shouldQuit; } + /** + * Determine if the maximum number of jobs has been reached. + * + * @return bool + */ + protected function maxJobsReached(): bool + { + return !empty($this->maxJobs) && $this->jobsProcessed >= $this->maxJobs; + } + /** * Stop the worker. * @@ -352,6 +383,8 @@ protected function configureOptions(array $options): void if (isset($options['maxExecutionTime'])) { $this->maxExecutionTime = (int) $options['maxExecutionTime']; } + + $this->maxJobs = $options['maxJobs'] ?? null; } /** @@ -408,4 +441,25 @@ public function setMaxExecutionTime(int $seconds): void { $this->maxExecutionTime = $seconds; } + + /** + * Set the maximum number of jobs to process. + * + * @param int|null $maxJobs + * @return void + */ + public function setMaxJobs(?int $maxJobs): void + { + $this->maxJobs = $maxJobs; + } + + /** + * Get the number of jobs processed. + * + * @return int + */ + public function getJobsProcessed(): int + { + return $this->jobsProcessed; + } }