diff --git a/README.md b/README.md index 31fb58f..4176ad0 100644 --- a/README.md +++ b/README.md @@ -53,6 +53,10 @@ CpuCoreCountFlexible::createFromClass('WyriHaximus\React\ChildProcess\Messenger\ }); ``` +## Updating options during run time + +A new feature in `1.5` is the `setOptions` method on pools. Which allows you to update a pools settings. This feature allows you to scale the amount of workers up and down when needed. + ## License ## Copyright 2016 [Cees-Jan Kiewiet](http://wyrihaximus.net/) diff --git a/appveyor.yml b/appveyor.yml index d559ba9..36e80ee 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -39,7 +39,7 @@ init: ## Install PHP and composer, and run the appropriate composer command install: - IF EXIST c:\tools\php (SET PHP=0) - - ps: appveyor-retry cinst -y php --version ((choco search php --exact --all-versions -r | select-string -pattern $Env:php_ver_target | Select-Object -first 1) -replace '[php|]','') + - ps: appveyor-retry cinst -y php --ignore-checksums --version ((choco search php --exact --all-versions -r | select-string -pattern $Env:php_ver_target | Select-Object -first 1) -replace '[php|]','') - cd c:\tools\php - IF %PHP%==1 copy php.ini-production php.ini /Y - IF %PHP%==1 echo date.timezone="UTC" >> php.ini diff --git a/examples/ping-pong-set-options/README.md b/examples/ping-pong-set-options/README.md new file mode 100644 index 0000000..3f144fc --- /dev/null +++ b/examples/ping-pong-set-options/README.md @@ -0,0 +1,5 @@ +To start use the following command: + +```sh +time php ping.php +``` diff --git a/examples/ping-pong-set-options/ping.php b/examples/ping-pong-set-options/ping.php new file mode 100644 index 0000000..acbc191 --- /dev/null +++ b/examples/ping-pong-set-options/ping.php @@ -0,0 +1,97 @@ +then(function (PoolInterface $pool) use ($loop) { +Fixed::create(new Process('php ' . dirname(dirname(__DIR__)) . '/examples/ping-pong/pong.php'), $loop)->then(function (PoolInterface $pool) use ($loop) { + $pool->on('message', function ($message) { + var_export($message); + }); + + $pool->on('error', function ($e) { + echo 'Error: ', var_export($e, true), PHP_EOL; + }); + + for ($i = 0; $i < I; $i++) { + echo $i, PHP_EOL; + $j = $i; + $pool->rpc(\WyriHaximus\React\ChildProcess\Messenger\Messages\Factory::rpc('ping', [ + 'i' => $i, + 's' => str_pad('', 512, '.'), + ]))->then(function ($data) use ($j) { + echo 'Answer for ' . $j . ' * ' . $j . ' * ' . $j . ' * ' . $j . ': ', $data['result'], PHP_EOL; + }, function ($error) { + var_export($error); + die(); + }); + } + + $timer = $loop->addPeriodicTimer(0.1, function () use ($pool) { + echo 'Pool status: ', PHP_EOL; + foreach ($pool->info() as $key => $value) { + echo "\t", $key, ': ', $value, PHP_EOL; + } + }); + + for ($i = 1; $i < 120; $i = $i + mt_rand(1, 20)) { + $loop->addTimer($i, function () use ($pool) { + //$pool->setOption(Options::MAX_SIZE, mt_rand(1, 20)); + $pool->setOption(Options::SIZE, mt_rand(1, 20)); + }); + } + + $loop->addTimer(10, function () use ($pool, $timer, $loop) { + for ($i = 0; $i < I; $i++) { + echo $i, PHP_EOL; + $j = $i; + $pool->rpc(\WyriHaximus\React\ChildProcess\Messenger\Messages\Factory::rpc('ping', [ + 'i' => $i, + 's' => str_pad('', 512, '.'), + ]))->then(function ($data) use ($j) { + echo 'Answer for ' . $j . ' * ' . $j . ' * ' . $j . ' * ' . $j . ': ', $data['result'], PHP_EOL; + }, function ($error) { + var_export($error); + die(); + }); + } + + $pool->rpc(\WyriHaximus\React\ChildProcess\Messenger\Messages\Factory::rpc('ping', [ + 'i' => ++$i, + ]))->then(function () use ($pool, $timer, $loop) { + echo 'Terminating pool', PHP_EOL; + $pool->terminate(\WyriHaximus\React\ChildProcess\Messenger\Messages\Factory::message([ + 'woeufh209h838392', + ])); + $timer->cancel(); + echo 'Done!!!', PHP_EOL; + $loop->stop(); + }); + }); +}); + +$loop->run(); diff --git a/examples/ping-pong-set-options/pong.php b/examples/ping-pong-set-options/pong.php new file mode 100644 index 0000000..9f8ea4d --- /dev/null +++ b/examples/ping-pong-set-options/pong.php @@ -0,0 +1,41 @@ +on('message', function (Payload $payload, Messenger $messenger) { + $messenger->write(json_encode([ + 'type' => 'message', + 'payload' => $payload, + ])); + + $messenger->getLoop()->addTimer(1, function () use ($messenger) { + $messenger->getLoop()->stop(); + }); +}); +$recipient->registerRpc('ping', function (Payload $payload, Messenger $messenger) use ($loop) { + $stopAt = time() + mt_rand(1, 2); + + do { + // Don nothing + } while ($stopAt >= time()); + + /*$messenger->getLoop()->addTimer(1, function () use ($messenger) { + $messenger->getLoop()->stop(); + });*/ + + return \React\Promise\resolve([ + 'result' => $payload['i'] * $payload['i'] * $payload['i'] * $payload['i'], + ]); +}); + +$loop->run(); diff --git a/src/Info.php b/src/Info.php index 92a25bc..c015a57 100644 --- a/src/Info.php +++ b/src/Info.php @@ -4,9 +4,12 @@ class Info { - const BUSY = 'busy'; - const CALLS = 'calls'; - const IDLE = 'idle'; - const SIZE = 'size'; - const TOTAL = 'total'; + const BUSY = 'busy'; + const CALLS = 'calls'; + const IDLE = 'idle'; + const SIZE = 'size'; + const STARTING = 'starting'; + const RUNNING = 'running'; + const TERMINATING = 'terminating'; + const TOTAL = 'total'; } diff --git a/src/Manager/Fixed.php b/src/Manager/Fixed.php index b614aee..b737ebc 100644 --- a/src/Manager/Fixed.php +++ b/src/Manager/Fixed.php @@ -22,32 +22,81 @@ class Fixed implements ManagerInterface */ protected $loop; + /** + * @var ProcessCollectionInterface + */ + protected $processesCollection; + /** * @var WorkerInterface[] */ protected $workers = []; + /** + * @var array + */ + protected $options; + + /** + * @var int + */ + protected $workerCount = 0; + + /** + * @var int + */ + protected $terminatingCount = 0; + + /** + * @var int + */ + protected $startingCount = 0; + public function __construct(ProcessCollectionInterface $processCollection, LoopInterface $loop, array $options = []) { + $this->options = $options; $this->loop = $loop; + $this->processesCollection = $processCollection; $processCollection->rewind(); - for ($i = 0; $i < $options[Options::SIZE]; $i++) { - $this->spawn($processCollection, $options); + $this->spawnWorkers($this->options[Options::SIZE]); + } + + protected function spawnWorkers($count) + { + for ($i = 0; $i < $count; $i++) { + $this->spawn($this->processesCollection, $this->options); } } protected function spawn($processCollection, $options) { - $workerDone = function (WorkerInterface $worker) { - $this->workerAvailable($worker); - }; + $this->startingCount++; $current = $processCollection->current(); $promise = $current($this->loop, $options); - $promise->then(function (Messenger $messenger) use ($workerDone) { + $promise->then(function (Messenger $messenger) { + $this->startingCount--; + $this->workerCount++; $worker = new Worker($messenger); - $this->workers[] = $worker; - $worker->on('done', $workerDone); + $this->workers[spl_object_hash($worker)] = $worker; + $worker->on('done', function (WorkerInterface $worker) { + if ($this->workerCount + $this->startingCount > $this->options[Options::SIZE]) { + $worker->terminate(); + return; + } + + $this->workerAvailable($worker); + }); + $worker->on('terminating', function (WorkerInterface $worker) { + unset($this->workers[spl_object_hash($worker)]); + $this->workerCount--; + $this->terminatingCount++; + }); + $worker->on('terminated', function (WorkerInterface $worker) { + $this->terminatingCount--; + }); $this->workerAvailable($worker); + }, function () { + $this->startingCount--; }); $processCollection->next(); @@ -63,6 +112,10 @@ protected function workerAvailable(WorkerInterface $worker) public function ping() { + if ($this->workerCount + $this->startingCount < $this->options[Options::SIZE]) { + $this->spawnWorkers($this->options[Options::SIZE] - ($this->workerCount + $this->startingCount)); + } + foreach ($this->workers as $worker) { if (!$worker->isBusy()) { $this->workerAvailable($worker); @@ -90,17 +143,26 @@ public function terminate() public function info() { - $count = count($this->workers); $busy = 0; foreach ($this->workers as $worker) { if ($worker->isBusy()) { $busy++; } } + return [ - Info::TOTAL => $count, - Info::BUSY => $busy, - Info::IDLE => $count - $busy, + Info::TOTAL => $this->workerCount + $this->terminatingCount, + Info::STARTING => $this->startingCount, + Info::RUNNING => $this->workerCount, + Info::TERMINATING => $this->terminatingCount, + Info::BUSY => $busy, + Info::IDLE => $this->workerCount - $busy, ]; } + + public function setOptions(array $options) + { + $this->options = $options; + $this->ping(); + } } diff --git a/src/Manager/Flexible.php b/src/Manager/Flexible.php index da7ff29..bfcaccf 100644 --- a/src/Manager/Flexible.php +++ b/src/Manager/Flexible.php @@ -6,6 +6,7 @@ use React\EventLoop\LoopInterface; use WyriHaximus\React\ChildProcess\Messenger\Messages\Message; use WyriHaximus\React\ChildProcess\Messenger\Messenger; +use WyriHaximus\React\ChildProcess\Pool\Info; use WyriHaximus\React\ChildProcess\Pool\ManagerInterface; use WyriHaximus\React\ChildProcess\Pool\Options; use WyriHaximus\React\ChildProcess\Pool\ProcessCollectionInterface; @@ -47,7 +48,17 @@ class Flexible implements ManagerInterface /** * @var int */ - protected $startingProcesses = 0; + protected $workerCount = 0; + + /** + * @var int + */ + protected $terminatingCount = 0; + + /** + * @var int + */ + protected $startingCount = 0; public function __construct(ProcessCollectionInterface $processCollection, LoopInterface $loop, array $options = []) { @@ -67,25 +78,28 @@ protected function workerAvailable(WorkerInterface $worker) protected function spawn() { - $this->startingProcesses++; + $this->startingCount++; $current = $this->processCollection->current(); $promise = $current($this->loop, $this->options); $promise->then(function (Messenger $messenger) { + $this->startingCount--; + $this->workerCount++; $worker = new Worker($messenger); - $this->workers[] = $worker; + $this->workers[spl_object_hash($worker)] = $worker; $worker->on('done', function (WorkerInterface $worker) { $this->workerAvailable($worker); }); $worker->on('terminating', function (WorkerInterface $worker) { - foreach ($this->workers as $key => $value) { - if ($worker === $value) { - unset($this->workers[$key]); - break; - } - } + unset($this->workers[spl_object_hash($worker)]); + $this->workerCount--; + $this->terminatingCount++; + }); + $worker->on('terminated', function (WorkerInterface $worker) { + $this->terminatingCount--; }); $this->workerAvailable($worker); - $this->startingProcesses--; + }, function () { + $this->startingCount--; }); $this->processCollection->next(); @@ -103,12 +117,12 @@ public function ping() } } - if (count($this->workers) + $this->startingProcesses < $this->options[Options::MIN_SIZE]) { + if ($this->workerCount + $this->startingCount < $this->options[Options::MIN_SIZE]) { $this->spawn(); return; } - if (count($this->workers) + $this->startingProcesses < $this->options[Options::MAX_SIZE]) { + if ($this->workerCount + $this->startingCount < $this->options[Options::MAX_SIZE]) { $this->spawn(); } } @@ -133,17 +147,25 @@ public function terminate() public function info() { - $count = count($this->workers); $busy = 0; foreach ($this->workers as $worker) { if ($worker->isBusy()) { $busy++; } } + return [ - 'total' => $count, - 'busy' => $busy, - 'idle' => $count - $busy, + Info::TOTAL => $this->workerCount + $this->terminatingCount, + Info::STARTING => $this->startingCount, + Info::RUNNING => $this->workerCount, + Info::TERMINATING => $this->terminatingCount, + Info::BUSY => $busy, + Info::IDLE => $this->workerCount - $busy, ]; } + + public function setOptions(array $options) + { + $this->options = $options; + } } diff --git a/src/ManagerInterface.php b/src/ManagerInterface.php index 97778f7..a2a2dc7 100644 --- a/src/ManagerInterface.php +++ b/src/ManagerInterface.php @@ -51,4 +51,12 @@ public function message(Message $message); * @return array */ public function info(); + + /** + * Overwrite the current options + * + * @param array $options + * @return void + */ + public function setOptions(array $options); } diff --git a/src/Pool/Dummy.php b/src/Pool/Dummy.php index bd948b8..727e14f 100644 --- a/src/Pool/Dummy.php +++ b/src/Pool/Dummy.php @@ -56,4 +56,8 @@ public function info() { return []; } + + public function setOption($key, $value) + { + } } diff --git a/src/Pool/Fixed.php b/src/Pool/Fixed.php index be7345b..2a973da 100644 --- a/src/Pool/Fixed.php +++ b/src/Pool/Fixed.php @@ -122,12 +122,20 @@ public function terminate(Message $message = null, $timeout = 5, $signal = null) */ public function info() { - $workers = $this->manager->info(); - return [ - Info::BUSY => $workers[Info::BUSY], - Info::CALLS => $this->queue->count(), - Info::IDLE => $workers[Info::IDLE], - Info::SIZE => $workers[Info::TOTAL], - ]; + $info = $this->manager->info(); + $info[Info::CALLS] = $this->queue->count(); + return $info; + } + + /** + * Override option + * + * @param $key + * @param $value + */ + public function setOption($key, $value) + { + $this->options[$key] = $value; + $this->manager->setOptions($this->options); } } diff --git a/src/Pool/Flexible.php b/src/Pool/Flexible.php index 75947bb..b20a976 100644 --- a/src/Pool/Flexible.php +++ b/src/Pool/Flexible.php @@ -153,12 +153,20 @@ public function terminate(Message $message = null, $timeout = 5, $signal = null) */ public function info() { - $workers = $this->manager->info(); - return [ - Info::BUSY => $workers[Info::BUSY], - Info::CALLS => $this->queue->count(), - Info::IDLE => $workers[Info::IDLE], - Info::SIZE => $workers[Info::TOTAL], - ]; + $info = $this->manager->info(); + $info[Info::CALLS] = $this->queue->count(); + return $info; + } + + /** + * Override option + * + * @param $key + * @param $value + */ + public function setOption($key, $value) + { + $this->options[$key] = $value; + $this->manager->setOptions($this->options); } } diff --git a/src/PoolInterface.php b/src/PoolInterface.php index 5cda7d8..0eca59a 100644 --- a/src/PoolInterface.php +++ b/src/PoolInterface.php @@ -43,4 +43,12 @@ public function message(Message $message); * @return mixed */ public function terminate(Message $message, $timeout = 5, $signal = null); + + /** + * Override option + * + * @param $key + * @param $value + */ + public function setOption($key, $value); } diff --git a/src/Worker.php b/src/Worker.php index e8aca94..8c06ce0 100644 --- a/src/Worker.php +++ b/src/Worker.php @@ -22,6 +22,11 @@ class Worker implements WorkerInterface */ protected $busy = false; + /** + * @var bool + */ + protected $terminating = false; + /** * @param Messenger $messenger */ @@ -59,13 +64,26 @@ public function isBusy() return $this->busy; } + /** + * @return bool + */ + public function isTerminating() + { + return $this->terminating; + } + /* * @return PromiseInterface */ public function terminate() { + $this->terminating = true; $this->busy = true; $this->emit('terminating', [$this]); - return $this->messenger->softTerminate(); + $promise = $this->messenger->softTerminate(); + $promise->always(function () { + $this->emit('terminated', [$this]); + }); + return $promise; } } diff --git a/src/WorkerInterface.php b/src/WorkerInterface.php index 9f489c3..2c31cf4 100644 --- a/src/WorkerInterface.php +++ b/src/WorkerInterface.php @@ -31,6 +31,11 @@ public function message(Message $message); */ public function isBusy(); + /** + * @return bool + */ + public function isTerminating(); + /** * @return PromiseInterface */ diff --git a/tests/FunctionsTest.php b/tests/FunctionsTest.php index 2c0566e..be6dadf 100644 --- a/tests/FunctionsTest.php +++ b/tests/FunctionsTest.php @@ -125,17 +125,19 @@ public function testGetQueue($options, $default, $loop, $output) public function providerGetManager() { + $processCollection = Phake::mock('WyriHaximus\React\ChildProcess\Pool\ProcessCollectionInterface'); + $r = []; $r[] = [ [ Options::SIZE => 0, ], - Phake::mock('WyriHaximus\React\ChildProcess\Pool\ProcessCollectionInterface'), + $processCollection, 'WyriHaximus\React\ChildProcess\Pool\Manager\Fixed', Factory::create(), new Fixed( - Phake::mock('WyriHaximus\React\ChildProcess\Pool\ProcessCollectionInterface'), + $processCollection, Factory::create(), [ Options::SIZE => 0, @@ -149,13 +151,12 @@ public function providerGetManager() Options::MANAGER => $mock, Options::SIZE => 0, ], - Phake::mock('WyriHaximus\React\ChildProcess\Pool\ProcessCollectionInterface'), + $processCollection, 'WyriHaximus\React\ChildProcess\Pool\Queue\Memory', Factory::create(), $mock, ]; - $processCollection = Phake::mock('WyriHaximus\React\ChildProcess\Pool\ProcessCollectionInterface'); $r[] = [ [ Options::MANAGER => new Flexible( @@ -168,7 +169,7 @@ public function providerGetManager() ), Options::SIZE => 0, ], - Phake::mock('WyriHaximus\React\ChildProcess\Pool\ProcessCollectionInterface'), + $processCollection, 'WyriHaximus\React\ChildProcess\Pool\Queue\Memory', Factory::create(), new Flexible( diff --git a/tests/Manager/FixedTest.php b/tests/Manager/FixedTest.php index d12ef8e..a34f5b6 100644 --- a/tests/Manager/FixedTest.php +++ b/tests/Manager/FixedTest.php @@ -72,6 +72,9 @@ public function testInfoAtStart() $this->assertSame([ Info::TOTAL => 0, + Info::STARTING => 0, + Info::RUNNING => 0, + Info::TERMINATING => 0, Info::BUSY => 0, Info::IDLE => 0, ], $this->manager->info()); @@ -122,6 +125,9 @@ public function testRpc() $this->assertSame([ Info::TOTAL => 1, + Info::STARTING => 0, + Info::RUNNING => 1, + Info::TERMINATING => 0, Info::BUSY => 0, Info::IDLE => 1, ], $this->manager->info()); @@ -130,6 +136,9 @@ public function testRpc() $this->assertSame([ Info::TOTAL => 1, + Info::STARTING => 0, + Info::RUNNING => 1, + Info::TERMINATING => 0, Info::BUSY => 1, Info::IDLE => 0, ], $this->manager->info()); @@ -138,6 +147,9 @@ public function testRpc() $this->assertSame([ Info::TOTAL => 1, + Info::STARTING => 0, + Info::RUNNING => 1, + Info::TERMINATING => 0, Info::BUSY => 0, Info::IDLE => 1, ], $this->manager->info()); @@ -145,9 +157,11 @@ public function testRpc() public function testTerminate() { + $terminateDeferred = new Deferred(); $workerDeferred = new Deferred(); $worker = null; $messenger = Phake::mock('WyriHaximus\React\ChildProcess\Messenger\Messenger'); + Phake::when($messenger)->softTerminate()->thenReturn($terminateDeferred->promise()); Phake::when($this->processCollection)->current()->thenReturnCallback(function () use ($workerDeferred) { return function () use ($workerDeferred) { @@ -165,11 +179,15 @@ public function testTerminate() $this->assertSame([ Info::TOTAL => 1, + Info::STARTING => 0, + Info::RUNNING => 1, + Info::TERMINATING => 0, Info::BUSY => 0, Info::IDLE => 1, ], $this->manager->info()); $emittedTerminate = false; + $worker->on('terminating', function ($worker) use (&$emittedTerminate) { $this->assertInstanceOf('WyriHaximus\React\ChildProcess\Pool\WorkerInterface', $worker); $emittedTerminate = true; @@ -179,7 +197,21 @@ public function testTerminate() $this->assertSame([ Info::TOTAL => 1, - Info::BUSY => 1, + Info::STARTING => 0, + Info::RUNNING => 0, + Info::TERMINATING => 1, + Info::BUSY => 0, + Info::IDLE => 0, + ], $this->manager->info()); + + $terminateDeferred->resolve(); + + $this->assertSame([ + Info::TOTAL => 0, + Info::STARTING => 0, + Info::RUNNING => 0, + Info::TERMINATING => 0, + Info::BUSY => 0, Info::IDLE => 0, ], $this->manager->info()); @@ -211,4 +243,16 @@ public function testMessage() Phake::verify($messenger)->message($message); } + + public function testSetOptions() + { + Phake::when($this->processCollection)->current()->thenReturn(function () { + return \React\Promise\resolve( + Phake::mock('WyriHaximus\React\ChildProcess\Messenger\Messenger') + ); + }); + $this->manager = new Fixed($this->processCollection, $this->loop, [ + Options::SIZE => 1, + ]); + } } diff --git a/tests/Manager/FlexibleTest.php b/tests/Manager/FlexibleTest.php index 858e524..eaba871 100644 --- a/tests/Manager/FlexibleTest.php +++ b/tests/Manager/FlexibleTest.php @@ -72,6 +72,9 @@ public function testInfoAtStart() $this->assertSame([ Info::TOTAL => 0, + Info::STARTING => 0, + Info::RUNNING => 0, + Info::TERMINATING => 0, Info::BUSY => 0, Info::IDLE => 0, ], $this->manager->info()); @@ -122,6 +125,9 @@ public function testRpc() $this->assertSame([ Info::TOTAL => 1, + Info::STARTING => 0, + Info::RUNNING => 1, + Info::TERMINATING => 0, Info::BUSY => 0, Info::IDLE => 1, ], $this->manager->info()); @@ -130,6 +136,9 @@ public function testRpc() $this->assertSame([ Info::TOTAL => 1, + Info::STARTING => 0, + Info::RUNNING => 1, + Info::TERMINATING => 0, Info::BUSY => 0, Info::IDLE => 1, ], $this->manager->info()); @@ -138,6 +147,9 @@ public function testRpc() $this->assertSame([ Info::TOTAL => 1, + Info::STARTING => 0, + Info::RUNNING => 1, + Info::TERMINATING => 0, Info::BUSY => 1, Info::IDLE => 0, ], $this->manager->info()); @@ -146,6 +158,9 @@ public function testRpc() $this->assertSame([ Info::TOTAL => 1, + Info::STARTING => 0, + Info::RUNNING => 1, + Info::TERMINATING => 0, Info::BUSY => 0, Info::IDLE => 1, ], $this->manager->info()); @@ -153,9 +168,11 @@ public function testRpc() public function testTerminate() { + $terminateDeferred = new Deferred(); $workerDeferred = new Deferred(); $worker = null; $messenger = Phake::mock('WyriHaximus\React\ChildProcess\Messenger\Messenger'); + Phake::when($messenger)->softTerminate()->thenReturn($terminateDeferred->promise()); Phake::when($this->processCollection)->current()->thenReturnCallback(function () use ($workerDeferred) { return function () use ($workerDeferred) { @@ -173,6 +190,9 @@ public function testTerminate() $this->assertSame([ Info::TOTAL => 1, + Info::STARTING => 0, + Info::RUNNING => 1, + Info::TERMINATING => 0, Info::BUSY => 0, Info::IDLE => 1, ], $this->manager->info()); @@ -181,6 +201,9 @@ public function testTerminate() $this->assertSame([ Info::TOTAL => 1, + Info::STARTING => 0, + Info::RUNNING => 1, + Info::TERMINATING => 0, Info::BUSY => 0, Info::IDLE => 1, ], $this->manager->info()); @@ -193,8 +216,22 @@ public function testTerminate() $this->manager->terminate(); + $this->assertSame([ + Info::TOTAL => 1, + Info::STARTING => 0, + Info::RUNNING => 0, + Info::TERMINATING => 1, + Info::BUSY => 0, + Info::IDLE => 0, + ], $this->manager->info()); + + $terminateDeferred->resolve(); + $this->assertSame([ Info::TOTAL => 0, + Info::STARTING => 0, + Info::RUNNING => 0, + Info::TERMINATING => 0, Info::BUSY => 0, Info::IDLE => 0, ], $this->manager->info()); @@ -207,6 +244,7 @@ public function testPingWorkerAvailable() { $rpc = Factory::rpc('foo', ['bar']); $messenger = Phake::mock('WyriHaximus\React\ChildProcess\Messenger\Messenger'); + Phake::when($messenger)->softTerminate()->thenReturn(new FulfilledPromise()); Phake::when($messenger)->rpc($rpc)->thenReturn((new Deferred())->promise()); $loop = Phake::mock('React\EventLoop\LoopInterface'); $processCollection = Phake::mock('WyriHaximus\React\ChildProcess\Pool\ProcessCollectionInterface'); diff --git a/tests/Pool/FixedTest.php b/tests/Pool/FixedTest.php index 0db9afc..36e5624 100644 --- a/tests/Pool/FixedTest.php +++ b/tests/Pool/FixedTest.php @@ -3,10 +3,13 @@ namespace WyriHaximus\React\Tests\ChildProcess\Pool\Pool; use Phake; +use React\EventLoop\Factory as LoopFactory; use WyriHaximus\React\ChildProcess\Messenger\Messages\Factory; use WyriHaximus\React\ChildProcess\Pool\Info; use WyriHaximus\React\ChildProcess\Pool\Options; use WyriHaximus\React\ChildProcess\Pool\Factory\Fixed; +use WyriHaximus\React\ChildProcess\Pool\Pool\Fixed as FixedPool; +use WyriHaximus\React\ChildProcess\Pool\ProcessCollection\Single; use WyriHaximus\React\Tests\ChildProcess\Pool\TestCase; class FixedTest extends TestCase @@ -67,10 +70,10 @@ public function testInfo() $this->assertTrue($promiseHasResolved); $this->assertSame([ + Info::TOTAL => 1, + Info::IDLE => 2, Info::BUSY => 3, Info::CALLS => 4, - Info::IDLE => 2, - Info::SIZE => 1, ], $poolInstance->info()); } @@ -140,4 +143,32 @@ public function testManagerReady() Phake::verify($worker)->rpc($message); } + + public function testSetOption() + { + $manager = Phake::mock('WyriHaximus\React\ChildProcess\Pool\Manager\Fixed'); + $options = [ + Options::SIZE => 1337, + Options::MANAGER => $manager, + ]; + $loop = LoopFactory::create(); + $processCollection = new Single(function () { + return Phake::mock('React\ChildProcess\Process'); + }); + $pool = new FixedPool($processCollection, $loop, $options); + $pool->setOption('key', 'value'); + $pool->setOption(Options::SIZE, 128); + Phake::inOrder( + Phake::verify($manager)->setOptions([ + 'key' => 'value', + Options::SIZE => 1337, + Options::MANAGER => $manager, + ]), + Phake::verify($manager)->setOptions([ + 'key' => 'value', + Options::SIZE => 128, + Options::MANAGER => $manager, + ]) + ); + } } diff --git a/tests/Pool/FlexibleTest.php b/tests/Pool/FlexibleTest.php index 79f34ad..1bb2ad7 100644 --- a/tests/Pool/FlexibleTest.php +++ b/tests/Pool/FlexibleTest.php @@ -3,10 +3,13 @@ namespace WyriHaximus\React\Tests\ChildProcess\Pool\Pool; use Phake; +use React\EventLoop\Factory as LoopFactory; use WyriHaximus\React\ChildProcess\Messenger\Messages\Factory; use WyriHaximus\React\ChildProcess\Pool\Info; use WyriHaximus\React\ChildProcess\Pool\Options; use WyriHaximus\React\ChildProcess\Pool\Factory\Flexible; +use WyriHaximus\React\ChildProcess\Pool\Pool\Flexible as FlexiblePool; +use WyriHaximus\React\ChildProcess\Pool\ProcessCollection\Single; use WyriHaximus\React\Tests\ChildProcess\Pool\TestCase; class FlexibleTest extends TestCase @@ -67,10 +70,10 @@ public function testInfo() $this->assertTrue($promiseHasResolved); $this->assertSame([ + Info::TOTAL => 1, + Info::IDLE => 2, Info::BUSY => 3, Info::CALLS => 4, - Info::IDLE => 2, - Info::SIZE => 1, ], $poolInstance->info()); } @@ -240,4 +243,32 @@ public function testManagerReadyQueueEmptyIsBusy() Phake::verify($timer)->cancel(); } + + public function testSetOption() + { + $manager = Phake::mock('WyriHaximus\React\ChildProcess\Pool\Manager\Fixed'); + $options = [ + Options::SIZE => 1337, + Options::MANAGER => $manager, + ]; + $loop = LoopFactory::create(); + $processCollection = new Single(function () { + return Phake::mock('React\ChildProcess\Process'); + }); + $pool = new FlexiblePool($processCollection, $loop, $options); + $pool->setOption('key', 'value'); + $pool->setOption(Options::SIZE, 128); + Phake::inOrder( + Phake::verify($manager)->setOptions([ + 'key' => 'value', + Options::SIZE => 1337, + Options::MANAGER => $manager, + ]), + Phake::verify($manager)->setOptions([ + 'key' => 'value', + Options::SIZE => 128, + Options::MANAGER => $manager, + ]) + ); + } } diff --git a/tests/WorkerTest.php b/tests/WorkerTest.php index 7edb9eb..05cd600 100644 --- a/tests/WorkerTest.php +++ b/tests/WorkerTest.php @@ -13,6 +13,7 @@ class WorkerTest extends TestCase public function testWorker() { $messenger = Phake::mock('WyriHaximus\React\ChildProcess\Messenger\Messenger'); + Phake::when($messenger)->softTerminate()->thenReturn(new FulfilledPromise()); $worker = new Worker($messenger); $this->assertFalse($worker->isBusy()); @@ -53,4 +54,17 @@ public function testMessage() $worker->message($message); Phake::verify($messenger)->message($message); } + + public function testTerminate() + { + $messenger = Phake::mock('WyriHaximus\React\ChildProcess\Messenger\Messenger'); + Phake::when($messenger)->softTerminate()->thenReturn(new FulfilledPromise()); + $worker = new Worker($messenger); + + $this->assertFalse($worker->isBusy()); + $this->assertFalse($worker->isTerminating()); + $worker->terminate(); + $this->assertTrue($worker->isBusy()); + $this->assertTrue($worker->isTerminating()); + } }