diff --git a/.github/workflows/linux-php-8.0.yml b/.github/workflows/linux-php-8.0.yml deleted file mode 100644 index 3408ee8..0000000 --- a/.github/workflows/linux-php-8.0.yml +++ /dev/null @@ -1,15 +0,0 @@ -name: CI Tests on Linux with PHP 8.0 - -on: - push: - branches: - - main - pull_request: - schedule: - - cron: '0 0 * * 0' - -jobs: - linux_php_80: - uses: ./.github/workflows/ci-tests.yml - with: - php: 8.0 diff --git a/.github/workflows/linux-php-8.1.yml b/.github/workflows/linux-php-8.1.yml deleted file mode 100644 index 89c467a..0000000 --- a/.github/workflows/linux-php-8.1.yml +++ /dev/null @@ -1,15 +0,0 @@ -name: CI Tests on Linux with PHP 8.1 - -on: - push: - branches: - - main - pull_request: - schedule: - - cron: '0 0 * * 0' - -jobs: - linux_php_81: - uses: ./.github/workflows/ci-tests.yml - with: - php: 8.1 diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..d871b69 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,33 @@ +# Changelog + +All notable changes to **parallel-sdk** are documented in this file. The format follows [Keep a Changelog](https://keepachangelog.com/en/1.1.0/) and the project adheres to [Semantic Versioning](https://semver.org/). + +## `3.0.0` – 2025-07-04 + +### Added +- **Domain-specific exception hierarchy** + `ActionNotImplementedException`, `InvalidMessageReceivedException`, `NoWorkerDefinedException`, `TaskExecutionFailedException`, `WorkerAlreadyDefinedException`, `WorkerNotDefinedException`, plus a `ParallelException` value object to serialise task-side errors. +- `Scheduler::runTask()` now surfaces worker exceptions through `TaskExecutionFailedException` for clearer diagnostics. +- Adopted modern PHP 8.2 language features: + * `readonly` properties and promoted constructor parameters + * Precise return-type hints (e.g. `void` on closures) + * `match`-style strict comparisons and assorted PSR-12 tidy-ups. + +### Changed +- **BC-BREAK:** Minimum supported PHP version raised from 8.0 → **8.2**. + Consumers on <8.2 will remain on the `2.x` series. +- `Scheduler::using()` now throws `WorkerAlreadyDefinedException` when attempting to re-register an existing worker with constructor args. +- Error bubbling in `Scheduler::runTask()` changed from `RuntimeException` with string message to the typed `TaskExecutionFailedException`. + +### Removed +- CI workflows for PHP 8.0 & 8.1 ‒ the suite now runs on 8.2/8.3/8.4. +- Legacy catch-all `RuntimeException` branches replaced with specific domain exceptions. + +### Fixed +- Sporadic race-condition when starting the runner under PHP ≥ 8.1 by ensuring a micro-sleep and event handshake. +- Several docblock inaccuracies and progress-bar initialisation edge cases. + +--- + +## [2.1.4] – 2024-xx-xx +_Refer to Git history for details prior to the 3.x line._ diff --git a/README.md b/README.md index 7f120f8..f9ad3a2 100644 --- a/README.md +++ b/README.md @@ -7,8 +7,6 @@ An implementation of [krakjoe/parallel](https://github.com/krakjoe/parallel) PHP [![Monthly Downloads](https://img.shields.io/packagist/dm/hds-solutions/parallel-sdk?style=flat-square&color=747474&label)](https://packagist.org/packages/hds-solutions/parallel-sdk) [![Required PHP version](https://img.shields.io/packagist/dependency-v/hds-solutions/parallel-sdk/php?style=flat-square&color=006496&logo=php&logoColor=white)](https://packagist.org/packages/hds-solutions/parallel-sdk) -[![PHP 8.0](https://img.shields.io/github/actions/workflow/status/hds-solutions/parallel-sdk/linux-php-8.0.yml?style=flat-square&logo=github&label=PHP%208.0)](https://github.com/hschimpf/parallel-sdk/actions/workflows/linux-php-8.0.yml) -[![PHP 8.1](https://img.shields.io/github/actions/workflow/status/hds-solutions/parallel-sdk/linux-php-8.1.yml?style=flat-square&logo=github&label=PHP%208.1)](https://github.com/hschimpf/parallel-sdk/actions/workflows/linux-php-8.1.yml) [![PHP 8.2](https://img.shields.io/github/actions/workflow/status/hds-solutions/parallel-sdk/linux-php-8.2.yml?style=flat-square&logo=github&label=PHP%208.2)](https://github.com/hschimpf/parallel-sdk/actions/workflows/linux-php-8.2.yml) [![PHP 8.3](https://img.shields.io/github/actions/workflow/status/hds-solutions/parallel-sdk/linux-php-8.3.yml?style=flat-square&logo=github&label=PHP%208.3)](https://github.com/hschimpf/parallel-sdk/actions/workflows/linux-php-8.3.yml) [![PHP 8.4](https://img.shields.io/github/actions/workflow/status/hds-solutions/parallel-sdk/linux-php-8.4.yml?style=flat-square&logo=github&label=PHP%208.4)](https://github.com/hschimpf/parallel-sdk/actions/workflows/linux-php-8.4.yml) @@ -19,7 +17,7 @@ That allow that your code can be deployed in any environment, and if `parallel` ## Installation ### Dependencies You need these dependencies to execute tasks in parallel. -- PHP >= 8.0 with ZTS enabled +- PHP >= 8.2 with ZTS enabled - parallel PECL extension _(v1.2.5 or higher)_ Parallel extension documentation can be found on https://php.net/parallel. diff --git a/composer.json b/composer.json index 69da485..d051ed5 100644 --- a/composer.json +++ b/composer.json @@ -15,7 +15,7 @@ "symfony/console": "Allows usage of a shared ProgressBar between the Workers" }, "require": { - "php": "^8.0" + "php": "^8.2" }, "autoload": { "files": [ diff --git a/src/Exceptions/ActionNotImplementedException.php b/src/Exceptions/ActionNotImplementedException.php new file mode 100644 index 0000000..fb07e94 --- /dev/null +++ b/src/Exceptions/ActionNotImplementedException.php @@ -0,0 +1,13 @@ +getMessage()); + } + +} diff --git a/src/Exceptions/WorkerAlreadyDefinedException.php b/src/Exceptions/WorkerAlreadyDefinedException.php new file mode 100644 index 0000000..cdb3849 --- /dev/null +++ b/src/Exceptions/WorkerAlreadyDefinedException.php @@ -0,0 +1,15 @@ +$name; - } - } diff --git a/src/Internals/Commands/ProgressBar/EnableProgressBarMessage.php b/src/Internals/Commands/ProgressBar/EnableProgressBarMessage.php index 60c57b3..2e78d68 100644 --- a/src/Internals/Commands/ProgressBar/EnableProgressBarMessage.php +++ b/src/Internals/Commands/ProgressBar/EnableProgressBarMessage.php @@ -8,7 +8,7 @@ /** * Message sent to {@see Runner} to execute {@see Runner::enableProgressBar()} action */ -final class EnableProgressBarMessage extends ParallelCommandMessage { +final readonly class EnableProgressBarMessage extends ParallelCommandMessage { /** * @param string $identifier diff --git a/src/Internals/Commands/ProgressBar/ProgressBarActionMessage.php b/src/Internals/Commands/ProgressBar/ProgressBarActionMessage.php index d51a67a..fab0f7d 100644 --- a/src/Internals/Commands/ProgressBar/ProgressBarActionMessage.php +++ b/src/Internals/Commands/ProgressBar/ProgressBarActionMessage.php @@ -7,7 +7,7 @@ /** * Message sent to {@see ProgressBarWorker} to execute {@see ProgressBarWorker::progressBarAction()} */ -final class ProgressBarActionMessage extends ParallelCommandMessage { +final readonly class ProgressBarActionMessage extends ParallelCommandMessage { /** * @param string $action diff --git a/src/Internals/Commands/ProgressBar/ProgressBarRegistrationMessage.php b/src/Internals/Commands/ProgressBar/ProgressBarRegistrationMessage.php index dc975fb..414b195 100644 --- a/src/Internals/Commands/ProgressBar/ProgressBarRegistrationMessage.php +++ b/src/Internals/Commands/ProgressBar/ProgressBarRegistrationMessage.php @@ -8,7 +8,7 @@ /** * Message sent to {@see ProgressBarWorker} to execute {@see ProgressBarWorker::registerWorker()} action */ -final class ProgressBarRegistrationMessage extends ParallelCommandMessage { +final readonly class ProgressBarRegistrationMessage extends ParallelCommandMessage { /** * @param string $worker diff --git a/src/Internals/Commands/ProgressBar/StatsReportMessage.php b/src/Internals/Commands/ProgressBar/StatsReportMessage.php index 50f4b8d..ea4981a 100644 --- a/src/Internals/Commands/ProgressBar/StatsReportMessage.php +++ b/src/Internals/Commands/ProgressBar/StatsReportMessage.php @@ -7,7 +7,7 @@ /** * Message sent to {@see ProgressBarWorker} to execute {@see ProgressBarWorker::statsReport()} */ -final class StatsReportMessage extends ParallelCommandMessage { +final readonly class StatsReportMessage extends ParallelCommandMessage { /** * @param string $worker_id diff --git a/src/Internals/Commands/Runner/GetRegisteredWorkerMessage.php b/src/Internals/Commands/Runner/GetRegisteredWorkerMessage.php index a88dd86..9766a23 100644 --- a/src/Internals/Commands/Runner/GetRegisteredWorkerMessage.php +++ b/src/Internals/Commands/Runner/GetRegisteredWorkerMessage.php @@ -8,7 +8,7 @@ /** * Message sent to {@see Runner} to execute {@see Runner::getRegisteredWorker()} action */ -final class GetRegisteredWorkerMessage extends ParallelCommandMessage { +final readonly class GetRegisteredWorkerMessage extends ParallelCommandMessage { /** * @param string $worker diff --git a/src/Internals/Commands/Runner/GetTasksMessage.php b/src/Internals/Commands/Runner/GetTasksMessage.php index d8a7136..54771b9 100644 --- a/src/Internals/Commands/Runner/GetTasksMessage.php +++ b/src/Internals/Commands/Runner/GetTasksMessage.php @@ -8,7 +8,7 @@ /** * Message sent to {@see Runner} to execute {@see Runner::getTasks()} action */ -final class GetTasksMessage extends ParallelCommandMessage { +final readonly class GetTasksMessage extends ParallelCommandMessage { public function __construct() { parent::__construct('get_tasks'); diff --git a/src/Internals/Commands/Runner/QueueTaskMessage.php b/src/Internals/Commands/Runner/QueueTaskMessage.php index 3944e6f..79b0166 100644 --- a/src/Internals/Commands/Runner/QueueTaskMessage.php +++ b/src/Internals/Commands/Runner/QueueTaskMessage.php @@ -8,7 +8,7 @@ /** * Message sent to {@see Runner} to execute {@see Runner::queueTask()} action */ -final class QueueTaskMessage extends ParallelCommandMessage { +final readonly class QueueTaskMessage extends ParallelCommandMessage { /** * @param array $data diff --git a/src/Internals/Commands/Runner/RegisterWorkerMessage.php b/src/Internals/Commands/Runner/RegisterWorkerMessage.php index 5fc5d79..9a7e784 100644 --- a/src/Internals/Commands/Runner/RegisterWorkerMessage.php +++ b/src/Internals/Commands/Runner/RegisterWorkerMessage.php @@ -9,7 +9,7 @@ /** * Message sent to {@see Runner} to execute {@see Runner::registerWorker()} action */ -final class RegisterWorkerMessage extends ParallelCommandMessage { +final readonly class RegisterWorkerMessage extends ParallelCommandMessage { /** * @param string|Closure $worker diff --git a/src/Internals/Commands/Runner/RemoveAllTasksMessage.php b/src/Internals/Commands/Runner/RemoveAllTasksMessage.php index d869f58..3b4c52b 100644 --- a/src/Internals/Commands/Runner/RemoveAllTasksMessage.php +++ b/src/Internals/Commands/Runner/RemoveAllTasksMessage.php @@ -8,7 +8,7 @@ /** * Message sent to {@see Runner} to execute {@see Runner::removeAllTasks()} action */ -final class RemoveAllTasksMessage extends ParallelCommandMessage { +final readonly class RemoveAllTasksMessage extends ParallelCommandMessage { public function __construct() { parent::__construct('remove_all_tasks'); diff --git a/src/Internals/Commands/Runner/RemovePendingTasksMessage.php b/src/Internals/Commands/Runner/RemovePendingTasksMessage.php index ce14cc2..8030658 100644 --- a/src/Internals/Commands/Runner/RemovePendingTasksMessage.php +++ b/src/Internals/Commands/Runner/RemovePendingTasksMessage.php @@ -8,7 +8,7 @@ /** * Message sent to {@see Runner} to execute {@see Runner::removePendingTasks()} action */ -final class RemovePendingTasksMessage extends ParallelCommandMessage { +final readonly class RemovePendingTasksMessage extends ParallelCommandMessage { public function __construct() { parent::__construct('remove_pending_tasks'); diff --git a/src/Internals/Commands/Runner/RemoveTaskMessage.php b/src/Internals/Commands/Runner/RemoveTaskMessage.php index 12f5920..e2985ad 100644 --- a/src/Internals/Commands/Runner/RemoveTaskMessage.php +++ b/src/Internals/Commands/Runner/RemoveTaskMessage.php @@ -8,7 +8,7 @@ /** * Message sent to {@see Runner} to execute {@see Runner::removeTask()} action */ -final class RemoveTaskMessage extends ParallelCommandMessage { +final readonly class RemoveTaskMessage extends ParallelCommandMessage { /** * @param int $task_id diff --git a/src/Internals/Commands/Runner/SetMaxCpuUsage.php b/src/Internals/Commands/Runner/SetMaxCpuUsage.php index ebae7d9..4f72aad 100644 --- a/src/Internals/Commands/Runner/SetMaxCpuUsage.php +++ b/src/Internals/Commands/Runner/SetMaxCpuUsage.php @@ -7,7 +7,7 @@ /** * Message sent to {@see Runner} to execute {@see Runner::setMaxCpuCountUsage()} | {@see Runner::setMaxCpuPercentageUsage()} action */ -final class SetMaxCpuUsage extends ParallelCommandMessage { +final readonly class SetMaxCpuUsage extends ParallelCommandMessage { public function __construct(int | float $max, bool $percentage = false) { parent::__construct(sprintf('set_max_cpu_%s_usage', $percentage ? 'percentage' : 'count'), [ $max ]); diff --git a/src/Internals/Commands/Runner/StopRunningTasksMessage.php b/src/Internals/Commands/Runner/StopRunningTasksMessage.php index 2d6ec2b..69a0b6f 100644 --- a/src/Internals/Commands/Runner/StopRunningTasksMessage.php +++ b/src/Internals/Commands/Runner/StopRunningTasksMessage.php @@ -8,7 +8,7 @@ /** * Message sent to {@see Runner} to execute {@see Runner::stopRunningTasks()} action */ -final class StopRunningTasksMessage extends ParallelCommandMessage { +final readonly class StopRunningTasksMessage extends ParallelCommandMessage { public function __construct() { parent::__construct('stop_running_tasks', [ true ]); diff --git a/src/Internals/Commands/Runner/UpdateMessage.php b/src/Internals/Commands/Runner/UpdateMessage.php index 2e31417..5f63fa0 100644 --- a/src/Internals/Commands/Runner/UpdateMessage.php +++ b/src/Internals/Commands/Runner/UpdateMessage.php @@ -8,7 +8,7 @@ /** * Message sent to {@see Runner} to execute {@see Runner::update()} action */ -final class UpdateMessage extends ParallelCommandMessage { +final readonly class UpdateMessage extends ParallelCommandMessage { public function __construct() { parent::__construct('update'); diff --git a/src/Internals/Commands/Runner/WaitTasksCompletionMessage.php b/src/Internals/Commands/Runner/WaitTasksCompletionMessage.php index c5495eb..18767ab 100644 --- a/src/Internals/Commands/Runner/WaitTasksCompletionMessage.php +++ b/src/Internals/Commands/Runner/WaitTasksCompletionMessage.php @@ -10,7 +10,7 @@ /** * Message sent to {@see Runner} to execute {@see Runner::await()} action */ -final class WaitTasksCompletionMessage extends ParallelCommandMessage { +final readonly class WaitTasksCompletionMessage extends ParallelCommandMessage { /** * @param DateInterval|null $wait_until diff --git a/src/Internals/Common/ListenEventsAndExecuteActions.php b/src/Internals/Common/ListenEventsAndExecuteActions.php index 7c7d25e..7633bd1 100644 --- a/src/Internals/Common/ListenEventsAndExecuteActions.php +++ b/src/Internals/Common/ListenEventsAndExecuteActions.php @@ -2,9 +2,10 @@ namespace HDSSolutions\Console\Parallel\Internals\Common; +use HDSSolutions\Console\Parallel\Exceptions\ActionNotImplementedException; +use HDSSolutions\Console\Parallel\Exceptions\InvalidMessageReceivedException; use HDSSolutions\Console\Parallel\Exceptions\ParallelException; use HDSSolutions\Console\Parallel\Internals\Commands\ParallelCommandMessage; -use RuntimeException; use parallel\Channel; use parallel\Events\Event; use Throwable; @@ -12,7 +13,7 @@ trait ListenEventsAndExecuteActions { /** - * Watch for events. This is used only on a multi-threaded environment + * Watch for events. This is used only on a multithreaded environment */ final public function listen(): void { // notify successful start @@ -23,7 +24,7 @@ final public function listen(): void { try { // check if we got a valid message if ( !($message instanceof ParallelCommandMessage)) { - throw new RuntimeException('Invalid message received!'); + throw new InvalidMessageReceivedException; } // process message @@ -45,12 +46,12 @@ abstract protected function afterListening(): void; * @param ParallelCommandMessage $message * * @return mixed - * @throws RuntimeException If the requested action isn't implemented + * @throws ActionNotImplementedException If the requested action isn't implemented */ final public function processMessage(ParallelCommandMessage $message): mixed { // check if action is implemented if ( !method_exists($this, $method = lcfirst(implode('', array_map('ucfirst', explode('_', $message->action)))))) { - throw new RuntimeException(sprintf('Action "%s" not yet implemented', $message->action)); + throw new ActionNotImplementedException($message->action); } // execute action and return the result diff --git a/src/Internals/Communication/TwoWayChannel.php b/src/Internals/Communication/TwoWayChannel.php index dbb4eb6..3796a61 100644 --- a/src/Internals/Communication/TwoWayChannel.php +++ b/src/Internals/Communication/TwoWayChannel.php @@ -20,10 +20,7 @@ final class TwoWayChannel implements Contracts\TwoWayChannel { /** * Disable constructor */ - private function __construct( - private string $name, - private bool $creator = false, - ) {} + private function __construct() {} /** * Shall make an unbuffered two-way channel with the given name
@@ -35,7 +32,7 @@ private function __construct( * @throws Channel\Error\Existence if channel already exists */ public static function make(string $name): self { - $instance = new self($name, true); + $instance = new self; // create channels $instance->input = Channel::make("$name@input"); $instance->output = Channel::make("$name@output"); @@ -52,7 +49,7 @@ public static function make(string $name): self { * @throws Channel\Error\Existence if channel does not exist */ public static function open(string $name): self { - $instance = new self($name, false); + $instance = new self; // create channels $instance->input = Channel::open("$name@output"); $instance->output = Channel::open("$name@input"); diff --git a/src/Internals/ProgressBarWorker.php b/src/Internals/ProgressBarWorker.php index f1ec5e6..8b7839f 100644 --- a/src/Internals/ProgressBarWorker.php +++ b/src/Internals/ProgressBarWorker.php @@ -2,7 +2,6 @@ namespace HDSSolutions\Console\Parallel\Internals; -use HDSSolutions\Console\Parallel\Internals\Common; use Symfony\Component\Console\Helper\Helper; final class ProgressBarWorker { @@ -21,11 +20,8 @@ final class ProgressBarWorker { */ private array $items = []; - /** - * @param string $uuid - */ public function __construct( - private string $uuid, + private readonly string $uuid, ) { $this->openChannels(); $this->createProgressBar(); @@ -41,7 +37,7 @@ public function afterListening(): void { $this->closeChannels(); } - protected function registerWorker(string $worker, int $steps = 0): void { + private function registerWorker(string $worker, int $steps = 0): void { // check if ProgressBar isn't already started if ( !$this->progressBarStarted) { // start Worker ProgressBar @@ -56,7 +52,7 @@ protected function registerWorker(string $worker, int $steps = 0): void { $this->release(); } - protected function progressBarAction(string $action, array $args): void { + private function progressBarAction(string $action, array $args): void { // redirect action to ProgressBar instance $this->progressBar->$action(...$args); @@ -68,7 +64,7 @@ protected function progressBarAction(string $action, array $args): void { } } - protected function statsReport(string $worker_id, int $memory_usage): void { + private function statsReport(string $worker_id, int $memory_usage): void { // save memory usage of thread $this->threads_memory['current'][$worker_id] = $memory_usage; // update peak memory usage @@ -95,7 +91,7 @@ private function getMemoryUsage(): string { private function getItemsPerSecond(): string { // check for empty list - if (empty($this->items)) return '0'; + if ($this->items === []) return '0'; // keep only last 15s for average $this->items = array_slice($this->items, -15, preserve_keys: true); diff --git a/src/Internals/ProgressBarWorker/HasChannels.php b/src/Internals/ProgressBarWorker/HasChannels.php index 16ca048..f42c330 100644 --- a/src/Internals/ProgressBarWorker/HasChannels.php +++ b/src/Internals/ProgressBarWorker/HasChannels.php @@ -12,7 +12,7 @@ trait HasChannels { private TwoWayChannel $progressbar_channel; private function openChannels(): void { - if ( !PARALLEL_EXT_LOADED) return; + if (! PARALLEL_EXT_LOADED) return; // channel to receive and process ProgressBar events $this->progressbar_channel = TwoWayChannel::make(self::class.'@'.$this->uuid); @@ -27,7 +27,7 @@ protected function send(mixed $value): mixed { } protected function release(): void { - if ( !PARALLEL_EXT_LOADED) return; + if (! PARALLEL_EXT_LOADED) return; $this->progressbar_channel->release(); } diff --git a/src/Internals/ProgressBarWorker/HasProgressBar.php b/src/Internals/ProgressBarWorker/HasProgressBar.php index 189b59e..d35f822 100644 --- a/src/Internals/ProgressBarWorker/HasProgressBar.php +++ b/src/Internals/ProgressBarWorker/HasProgressBar.php @@ -18,13 +18,13 @@ trait HasProgressBar { private bool $progressBarStarted = false; private function createProgressBar(): void { - $this->progressBar = new ProgressBar(new ConsoleOutput()); + $this->progressBar = new ProgressBar(new ConsoleOutput); // configure ProgressBar settings - $this->progressBar->setBarWidth( 80 ); - $this->progressBar->setRedrawFrequency( 100 ); - $this->progressBar->minSecondsBetweenRedraws( 0.1 ); - $this->progressBar->maxSecondsBetweenRedraws( 0.2 ); + $this->progressBar->setBarWidth(80); + $this->progressBar->setRedrawFrequency(100); + $this->progressBar->minSecondsBetweenRedraws(0.1); + $this->progressBar->maxSecondsBetweenRedraws(0.2); $this->progressBar->setFormat(format: "%current% of %max%: %message%\n". "[%bar%] %percent:3s%%\n". diff --git a/src/Internals/Runner.php b/src/Internals/Runner.php index 941580d..0d48d07 100644 --- a/src/Internals/Runner.php +++ b/src/Internals/Runner.php @@ -3,10 +3,11 @@ namespace HDSSolutions\Console\Parallel\Internals; use Closure; -use HDSSolutions\Console\Parallel\Internals\Common; +use HDSSolutions\Console\Parallel\Exceptions\NoWorkerDefinedException; +use HDSSolutions\Console\Parallel\Exceptions\WorkerAlreadyDefinedException; +use HDSSolutions\Console\Parallel\Exceptions\WorkerNotDefinedException; use HDSSolutions\Console\Parallel\RegisteredWorker; use HDSSolutions\Console\Parallel\Task; -use RuntimeException; use Throwable; final class Runner { @@ -40,15 +41,15 @@ private function getMaxCpuUsage(): int { return $this->max_cpu_count ??= (isset($_SERVER['PARALLEL_MAX_COUNT']) ? (int) $_SERVER['PARALLEL_MAX_COUNT'] : cpu_count( (float) ($_SERVER['PARALLEL_MAX_PERCENT'] ?? 1.0) )); } - protected function setMaxCpuCountUsage(int $count): int { + private function setMaxCpuCountUsage(int $count): int { return $this->send($this->max_cpu_count = $count); } - protected function setMaxCpuPercentageUsage(float $percentage): int { + private function setMaxCpuPercentageUsage(float $percentage): int { return $this->send($this->max_cpu_count = max(1, cpu_count($percentage))); } - protected function getRegisteredWorker(string $worker): RegisteredWorker | false { + private function getRegisteredWorker(string $worker): RegisteredWorker | false { if ( !array_key_exists($worker, $this->workers_hashmap)) { return $this->send(false); } @@ -58,10 +59,10 @@ protected function getRegisteredWorker(string $worker): RegisteredWorker | false ->send($this->getSelectedWorker()); } - protected function registerWorker(string | Closure $worker, array $args = []): RegisteredWorker { + private function registerWorker(string | Closure $worker, array $args = []): RegisteredWorker { // check if worker is already registered if (is_string($worker) && array_key_exists($worker, $this->workers_hashmap)) { - throw new RuntimeException(sprintf('Worker class "%s" is already registered', $worker)); + throw new WorkerAlreadyDefinedException($worker); } // register worker @@ -79,10 +80,10 @@ protected function registerWorker(string | Closure $worker, array $args = []): R ->send($registered_worker); } - protected function queueTask(array $data): int { + private function queueTask(array $data): int { if (null === $worker = $this->getSelectedWorker()) { // reject task scheduling, no worker is defined - throw new RuntimeException('No worker is defined'); + throw new NoWorkerDefinedException; } // get next task id @@ -99,7 +100,7 @@ protected function queueTask(array $data): int { $this->pending_tasks[$task_id] = $task->getIdentifier(); // if we are on a non-threaded environment, - if ( !PARALLEL_EXT_LOADED) { + if (! PARALLEL_EXT_LOADED) { // just process the Task $this->startNextPendingTask(); // clean finished Task @@ -109,8 +110,8 @@ protected function queueTask(array $data): int { return $this->send($task->getIdentifier()); } - protected function getTasks(): array | false { - if ( !PARALLEL_EXT_LOADED) { + private function getTasks(): array | false { + if (! PARALLEL_EXT_LOADED) { return $this->tasks; } @@ -122,7 +123,7 @@ protected function getTasks(): array | false { return false; } - protected function removeTask(int $task_id): bool { + private function removeTask(int $task_id): bool { // remove it from pending tasks if (array_key_exists($task_id, $this->pending_tasks)) { unset($this->pending_tasks[$task_id]); @@ -147,7 +148,7 @@ protected function removeTask(int $task_id): bool { return $this->send(false); } - protected function removeAllTasks(): bool { + private function removeAllTasks(): bool { $this->stopRunningTasks(); $this->tasks = []; @@ -156,18 +157,18 @@ protected function removeAllTasks(): bool { return $this->send(true); } - protected function removePendingTasks(): bool { + private function removePendingTasks(): bool { // clear pending tasks $this->pending_tasks = []; return $this->send(true); } - protected function stopRunningTasks(bool $should_return = false): bool { + private function stopRunningTasks(bool $should_return = false): bool { // kill all running threads foreach ($this->running_tasks as $task_id => $running_task) { // check if future is already done working - if ( !PARALLEL_EXT_LOADED || $running_task->done()) { + if (! PARALLEL_EXT_LOADED || $running_task->done()) { // store the ProcessedTask try { // get the result of the process @@ -196,9 +197,9 @@ protected function stopRunningTasks(bool $should_return = false): bool { return true; } - protected function enableProgressBar(string $worker_id, int $steps): bool { + private function enableProgressBar(string $worker_id, int $steps): bool { if ( !array_key_exists($worker_id, $this->workers_hashmap)) { - throw new RuntimeException('Worker is not defined'); + throw new WorkerNotDefinedException; } // get registered Worker @@ -217,7 +218,7 @@ protected function enableProgressBar(string $worker_id, int $steps): bool { return $this->send(true); } - protected function update(): void { + private function update(): void { $this->cleanFinishedTasks(); while ($this->hasCpuAvailable() && $this->hasPendingTasks()) { $this->startNextPendingTask(); @@ -234,7 +235,7 @@ protected function update(): void { } } - protected function await(?int $wait_until = null): bool { + private function await(?int $wait_until = null): bool { if (PARALLEL_EXT_LOADED) { return $this->send(time() <= ($wait_until ?? time()) && ($this->hasPendingTasks() || $this->hasRunningTasks())); } diff --git a/src/Internals/Runner/HasChannels.php b/src/Internals/Runner/HasChannels.php index 0aecaa8..8ebe14e 100644 --- a/src/Internals/Runner/HasChannels.php +++ b/src/Internals/Runner/HasChannels.php @@ -23,7 +23,7 @@ trait HasChannels { private Channel $tasks_channel; private function openChannels(): void { - if ( !PARALLEL_EXT_LOADED) return; + if (! PARALLEL_EXT_LOADED) return; // channels to receive and process events $this->channel = TwoWayChannel::make(self::class.'@'.$this->uuid); diff --git a/src/Internals/Runner/HasEater.php b/src/Internals/Runner/HasEater.php index ea840c8..4bb402b 100644 --- a/src/Internals/Runner/HasEater.php +++ b/src/Internals/Runner/HasEater.php @@ -14,10 +14,10 @@ trait HasEater { private Future $eater; private function startEater(): void { - if ( !PARALLEL_EXT_LOADED) return; + if (! PARALLEL_EXT_LOADED) return; // run an eater to keep updating states - $this->eater = (new Runtime(PARALLEL_AUTOLOADER))->run(static function(string $uuid): void { + $this->eater = (new Runtime(PARALLEL_AUTOLOADER))->run(static function (string $uuid): void { // create communication channel $channel = TwoWayChannel::make(Runner::class.'@'.$uuid.':eater'); // open communication channel with the Runner diff --git a/src/Internals/Runner/HasSharedProgressBar.php b/src/Internals/Runner/HasSharedProgressBar.php index 770f11f..c698098 100644 --- a/src/Internals/Runner/HasSharedProgressBar.php +++ b/src/Internals/Runner/HasSharedProgressBar.php @@ -41,7 +41,7 @@ private function initProgressBar(): void { : new Internals\ProgressBarWorker($this->uuid); // check if progressbar is already started, or we are on a non-threaded environment - if ($this->progressbar_started || !PARALLEL_EXT_LOADED) return; + if ($this->progressbar_started || ! PARALLEL_EXT_LOADED) return; // open communication channel with the ProgressBar worker while ($this->progressbar_channel === null) { @@ -57,7 +57,7 @@ private function initProgressBar(): void { } private function stopProgressBar(): void { - if ( !PARALLEL_EXT_LOADED || !$this->progressbar_started) return; + if (! PARALLEL_EXT_LOADED || ! $this->progressbar_started) return; // stop ProgressBar worker instance $this->progressbar_channel->send(Event\Type::Close); diff --git a/src/Internals/Runner/ManagesTasks.php b/src/Internals/Runner/ManagesTasks.php index 45e2552..86c0cfe 100644 --- a/src/Internals/Runner/ManagesTasks.php +++ b/src/Internals/Runner/ManagesTasks.php @@ -47,7 +47,7 @@ private function cleanFinishedTasks(): void { $finished_tasks = []; foreach ($this->running_tasks as $idx => $running_task) { // check if future is already done working - if ( !PARALLEL_EXT_LOADED || $running_task->done()) { + if (! PARALLEL_EXT_LOADED || $running_task->done()) { // store the ProcessedTask try { // get the result of the process diff --git a/src/Internals/Worker.php b/src/Internals/Worker.php index 9972215..2410a4c 100644 --- a/src/Internals/Worker.php +++ b/src/Internals/Worker.php @@ -7,7 +7,7 @@ final class Worker extends ParallelWorker { - public function process(Closure $processor = null, ...$data): mixed { + public function process(?Closure $processor = null, ...$data): mixed { // execute original closure return $processor(...$data); } diff --git a/src/Internals/Worker/CommunicatesWithProgressBarWorker.php b/src/Internals/Worker/CommunicatesWithProgressBarWorker.php index cff2f08..e16919c 100644 --- a/src/Internals/Worker/CommunicatesWithProgressBarWorker.php +++ b/src/Internals/Worker/CommunicatesWithProgressBarWorker.php @@ -16,7 +16,7 @@ trait CommunicatesWithProgressBarWorker { private TwoWayChannel | Closure | null $progressbar_channel = null; final public function connectProgressBar(string | Closure $uuid, string $identifier = null): bool { - if ( !PARALLEL_EXT_LOADED) { + if (! PARALLEL_EXT_LOADED) { $this->progressbar_channel = $uuid; return true; diff --git a/src/RegisteredWorker.php b/src/RegisteredWorker.php index cc43b45..42b5b16 100644 --- a/src/RegisteredWorker.php +++ b/src/RegisteredWorker.php @@ -16,19 +16,19 @@ final class RegisteredWorker { private int $steps; public function __construct( - private string $uuid, - private int $identifier, - private string $worker_class, - private ?Closure $closure = null, - private array $args = [], + private readonly string $uuid, + private readonly int $identifier, + private readonly string $worker_class, + private readonly ?Closure $closure = null, + private readonly array $args = [], ) {} public function getIdentifier(): string { - if ($this->getClosure() === null) { - return $this->getWorkerClass(); + if ($this->closure === null) { + return $this->worker_class; } - return sprintf('%s@%.0u', $this->getWorkerClass(), $this->identifier); + return sprintf('%s@%.0u', $this->worker_class, $this->identifier); } /** @@ -41,8 +41,8 @@ public function withProgress(bool $with_progress = true, int $steps = 0): void { $this->with_progress = $with_progress; // check if caller is Runner - $caller = debug_backtrace(!DEBUG_BACKTRACE_PROVIDE_OBJECT | DEBUG_BACKTRACE_IGNORE_ARGS, 2)[1] ?? null; - if (($caller['class'] ?? null) === Internals\Runner::class || !PARALLEL_EXT_LOADED) { + $caller = debug_backtrace(! DEBUG_BACKTRACE_PROVIDE_OBJECT | DEBUG_BACKTRACE_IGNORE_ARGS, 2)[1] ?? null; + if (($caller['class'] ?? null) === Internals\Runner::class || ! PARALLEL_EXT_LOADED) { $this->steps = $steps; return; diff --git a/src/Scheduler.php b/src/Scheduler.php index 241f790..1344060 100644 --- a/src/Scheduler.php +++ b/src/Scheduler.php @@ -7,12 +7,13 @@ use Generator; use HDSSolutions\Console\Parallel\Contracts\Task; use HDSSolutions\Console\Parallel\Exceptions\ParallelException; +use HDSSolutions\Console\Parallel\Exceptions\TaskExecutionFailedException; +use HDSSolutions\Console\Parallel\Exceptions\WorkerAlreadyDefinedException; use HDSSolutions\Console\Parallel\Internals\Commands; use parallel; use parallel\Channel; use parallel\Events\Event; use parallel\Runtime; -use RuntimeException; use Throwable; final class Scheduler { @@ -104,13 +105,15 @@ public static function setMaxCpuPercentageUsage(float $percentage): int { * @param mixed ...$args Arguments passed to Worker constructor * * @return RegisteredWorker + * + * @throws WorkerAlreadyDefinedException if a worker is already registered and constructor parameters are specified */ public static function using(string | Closure $worker, ...$args): RegisteredWorker { // check if worker is already registered if (is_string($worker) && false !== $registered_worker = self::instance()->getRegisteredWorker($worker)) { - if ( !empty($args)) { + if ($args !== []) { // args must not be defined if worker already exists - throw new RuntimeException(sprintf('Worker "%s" is already defined, you can\'t specify new constructor parameters!', $worker)); + throw new WorkerAlreadyDefinedException($worker, with_parameters: true); } return $registered_worker; @@ -130,7 +133,8 @@ public static function using(string | Closure $worker, ...$args): RegisteredWork * @throws Runtime\Error\IllegalFunction if task is a closure created from an internal function * @throws Runtime\Error\IllegalInstruction if task contains illegal instructions * @throws Runtime\Error\IllegalParameter if task accepts or argv contains illegal variables - * @throws Runtime\Error\IllegalReturn | Throwable if task returns illegally + * @throws Runtime\Error\IllegalReturn if task returns illegally + * @throws TaskExecutionFailedException if task execution failed * * @see Runtime::run() for more details * @link https://www.php.net/manual/en/parallel.run @@ -144,7 +148,7 @@ public static function runTask(mixed ...$data): int { // get queued task and check if there was an exception thrown if (($task_id = self::instance()->recv()) instanceof ParallelException) { // redirect exception - throw new RuntimeException($task_id->getMessage()); + throw new TaskExecutionFailedException($task_id); } return $task_id; @@ -158,7 +162,7 @@ public static function runTask(mixed ...$data): int { * * @param DateInterval|null $wait_until Should wait until specified DateInterval or until all tasks finished. */ - public static function awaitTasksCompletion(DateInterval $wait_until = null): bool { + public static function awaitTasksCompletion(?DateInterval $wait_until = null): bool { $message = new Commands\Runner\WaitTasksCompletionMessage($wait_until); if (PARALLEL_EXT_LOADED) { @@ -261,7 +265,7 @@ public static function removeAllTasks(): bool { */ public static function stop(bool $force = true): void { // check if extension isn't loaded and just return - if ( !PARALLEL_EXT_LOADED) return; + if (! PARALLEL_EXT_LOADED) return; self::removePendingTasks(); if ($force) { @@ -280,7 +284,7 @@ public function __destruct() { self::removeAllTasks(); // check if extension isn't loaded and just return - if ( !PARALLEL_EXT_LOADED) return; + if (! PARALLEL_EXT_LOADED) return; try { // stop Runner instance diff --git a/src/Task.php b/src/Task.php index 94683de..47281de 100644 --- a/src/Task.php +++ b/src/Task.php @@ -24,10 +24,10 @@ final class Task implements Contracts\Task { * @param mixed $input Input of the Task */ public function __construct( - private int $identifier, - private string $worker_class, - private int $worker_id, - private mixed $input = null, + private readonly int $identifier, + private readonly string $worker_class, + private readonly int $worker_id, + private readonly mixed $input = null, ) {} public function getIdentifier(): int { @@ -44,7 +44,7 @@ public function getWorkerId(): int { /** @inheritdoc */ public function getData(): mixed { - return $this->getInput(); + return $this->input; } public function getInput(): mixed { @@ -71,7 +71,7 @@ public function setResult(mixed $result): self { /** @inheritdoc */ public function getResult(): mixed { - return $this->getOutput(); + return $this->output; } public function getOutput(): mixed { @@ -79,19 +79,19 @@ public function getOutput(): mixed { } public function isPending(): bool { - return $this->getState() === self::STATE_Pending; + return $this->state === self::STATE_Pending; } public function isBeingProcessed(): bool { - return $this->getState() === self::STATE_Processing; + return $this->state === self::STATE_Processing; } public function wasProcessed(): bool { - return $this->getState() === self::STATE_Processed; + return $this->state === self::STATE_Processed; } public function wasCancelled(): bool { - return $this->getState() === self::STATE_Cancelled; + return $this->state === self::STATE_Cancelled; } } diff --git a/src/autoload.php b/src/autoload.php index 88a93ef..d179ffa 100644 --- a/src/autoload.php +++ b/src/autoload.php @@ -2,7 +2,7 @@ require_once __DIR__.'/helpers.php'; -spl_autoload_register(static function($class) { +spl_autoload_register(static function($class): void { // build class file path $classfile = sprintf('%s/%s.php', __DIR__, // replace namespace and invert slashes diff --git a/src/helpers.php b/src/helpers.php index e82dbd4..4845c4b 100644 --- a/src/helpers.php +++ b/src/helpers.php @@ -29,7 +29,7 @@ function cpu_count(float $percent = 1.0): int { if (false !== $ps) { $output = stream_get_contents($ps); preg_match('/hw.ncpu: (\d+)/', $output, $matches); - if ( !empty($matches)) { + if ($matches !== []) { $cpu_count = (int) $matches[1][0]; } diff --git a/tests/Workers/AnotherWorker.php b/tests/Workers/AnotherWorker.php index bccfcf9..b937877 100644 --- a/tests/Workers/AnotherWorker.php +++ b/tests/Workers/AnotherWorker.php @@ -7,7 +7,7 @@ final class AnotherWorker extends ParallelWorker { public function __construct( - private array $multipliers, + private readonly array $multipliers, ) {} protected function process(int $number = 0): array { diff --git a/tests/Workers/TestWorker.php b/tests/Workers/TestWorker.php index c94decb..efa4747 100644 --- a/tests/Workers/TestWorker.php +++ b/tests/Workers/TestWorker.php @@ -7,7 +7,7 @@ final class TestWorker extends ParallelWorker { public function __construct( - private array $multipliers, + private readonly array $multipliers, ) {} protected function process(int $number = 0): array {