Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ CpuCoreCountFlexible::createFromClass('WyriHaximus\React\ChildProcess\Messenger\
});
```

## Updating options during run time

A new feature in `1.5` is the `setOptions` method on pools. Which allows you to update a pools settings. This feature allows you to scale the amount of workers up and down when needed.

## License ##

Copyright 2016 [Cees-Jan Kiewiet](http://wyrihaximus.net/)
Expand Down
2 changes: 1 addition & 1 deletion appveyor.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ init:
## Install PHP and composer, and run the appropriate composer command
install:
- IF EXIST c:\tools\php (SET PHP=0)
- ps: appveyor-retry cinst -y php --version ((choco search php --exact --all-versions -r | select-string -pattern $Env:php_ver_target | Select-Object -first 1) -replace '[php|]','')
- ps: appveyor-retry cinst -y php --ignore-checksums --version ((choco search php --exact --all-versions -r | select-string -pattern $Env:php_ver_target | Select-Object -first 1) -replace '[php|]','')
- cd c:\tools\php
- IF %PHP%==1 copy php.ini-production php.ini /Y
- IF %PHP%==1 echo date.timezone="UTC" >> php.ini
Expand Down
5 changes: 5 additions & 0 deletions examples/ping-pong-set-options/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
To start use the following command:

```sh
time php ping.php
```
97 changes: 97 additions & 0 deletions examples/ping-pong-set-options/ping.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
<?php

require dirname(dirname(__DIR__)) . '/vendor/autoload.php';

use React\ChildProcess\Process;
use React\EventLoop\Factory;
use WyriHaximus\React\ChildProcess\Pool\Factory\Fixed;
use WyriHaximus\React\ChildProcess\Pool\Factory\Flexible;
use WyriHaximus\React\ChildProcess\Pool\Options;
use WyriHaximus\React\ChildProcess\Pool\PoolInterface;

const POOL_PROCESS_COUNT = 10;
const I = 512;

echo 'Warning this example can be rather harsh on your hardware, stop now or continue with cation!!!!', PHP_EOL;
echo 'Starting a pool with ' . POOL_PROCESS_COUNT . ' child processes looping from 0 till ' . I . ' and calculating $i * $i * $i * $i in the child process.';
echo PHP_EOL;
echo 'Starting in:', PHP_EOL, '5', PHP_EOL;
sleep(1);
echo '4', PHP_EOL;
sleep(1);
echo '3', PHP_EOL;
sleep(1);
echo '2', PHP_EOL;
sleep(1);
echo '1', PHP_EOL;
sleep(1);

$loop = Factory::create();
//Flexible::create(new Process('php ' . dirname(dirname(__DIR__)) . '/examples/ping-pong/pong.php'), $loop)->then(function (PoolInterface $pool) use ($loop) {
Fixed::create(new Process('php ' . dirname(dirname(__DIR__)) . '/examples/ping-pong/pong.php'), $loop)->then(function (PoolInterface $pool) use ($loop) {
$pool->on('message', function ($message) {
var_export($message);
});

$pool->on('error', function ($e) {
echo 'Error: ', var_export($e, true), PHP_EOL;
});

for ($i = 0; $i < I; $i++) {
echo $i, PHP_EOL;
$j = $i;
$pool->rpc(\WyriHaximus\React\ChildProcess\Messenger\Messages\Factory::rpc('ping', [
'i' => $i,
's' => str_pad('', 512, '.'),
]))->then(function ($data) use ($j) {
echo 'Answer for ' . $j . ' * ' . $j . ' * ' . $j . ' * ' . $j . ': ', $data['result'], PHP_EOL;
}, function ($error) {
var_export($error);
die();
});
}

$timer = $loop->addPeriodicTimer(0.1, function () use ($pool) {
echo 'Pool status: ', PHP_EOL;
foreach ($pool->info() as $key => $value) {
echo "\t", $key, ': ', $value, PHP_EOL;
}
});

for ($i = 1; $i < 120; $i = $i + mt_rand(1, 20)) {
$loop->addTimer($i, function () use ($pool) {
//$pool->setOption(Options::MAX_SIZE, mt_rand(1, 20));
$pool->setOption(Options::SIZE, mt_rand(1, 20));
});
}

$loop->addTimer(10, function () use ($pool, $timer, $loop) {
for ($i = 0; $i < I; $i++) {
echo $i, PHP_EOL;
$j = $i;
$pool->rpc(\WyriHaximus\React\ChildProcess\Messenger\Messages\Factory::rpc('ping', [
'i' => $i,
's' => str_pad('', 512, '.'),
]))->then(function ($data) use ($j) {
echo 'Answer for ' . $j . ' * ' . $j . ' * ' . $j . ' * ' . $j . ': ', $data['result'], PHP_EOL;
}, function ($error) {
var_export($error);
die();
});
}

$pool->rpc(\WyriHaximus\React\ChildProcess\Messenger\Messages\Factory::rpc('ping', [
'i' => ++$i,
]))->then(function () use ($pool, $timer, $loop) {
echo 'Terminating pool', PHP_EOL;
$pool->terminate(\WyriHaximus\React\ChildProcess\Messenger\Messages\Factory::message([
'woeufh209h838392',
]));
$timer->cancel();
echo 'Done!!!', PHP_EOL;
$loop->stop();
});
});
});

$loop->run();
41 changes: 41 additions & 0 deletions examples/ping-pong-set-options/pong.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
<?php

require dirname(dirname(__DIR__)) . '/vendor/autoload.php';

use React\EventLoop\Factory;
use React\Promise\Deferred;
use WyriHaximus\React\ChildProcess\Messenger\Messages\Invoke;
use WyriHaximus\React\ChildProcess\Messenger\Messages\Payload;
use WyriHaximus\React\ChildProcess\Messenger\Messenger;
use WyriHaximus\React\ChildProcess\Messenger\Recipient;

$loop = Factory::create();

$recipient = \WyriHaximus\React\ChildProcess\Messenger\Factory::child($loop);
$recipient->on('message', function (Payload $payload, Messenger $messenger) {
$messenger->write(json_encode([
'type' => 'message',
'payload' => $payload,
]));

$messenger->getLoop()->addTimer(1, function () use ($messenger) {
$messenger->getLoop()->stop();
});
});
$recipient->registerRpc('ping', function (Payload $payload, Messenger $messenger) use ($loop) {
$stopAt = time() + mt_rand(1, 2);

do {
// Don nothing
} while ($stopAt >= time());

/*$messenger->getLoop()->addTimer(1, function () use ($messenger) {
$messenger->getLoop()->stop();
});*/

return \React\Promise\resolve([
'result' => $payload['i'] * $payload['i'] * $payload['i'] * $payload['i'],
]);
});

$loop->run();
13 changes: 8 additions & 5 deletions src/Info.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@

class Info
{
const BUSY = 'busy';
const CALLS = 'calls';
const IDLE = 'idle';
const SIZE = 'size';
const TOTAL = 'total';
const BUSY = 'busy';
const CALLS = 'calls';
const IDLE = 'idle';
const SIZE = 'size';
const STARTING = 'starting';
const RUNNING = 'running';
const TERMINATING = 'terminating';
const TOTAL = 'total';
}
86 changes: 74 additions & 12 deletions src/Manager/Fixed.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,32 +22,81 @@ class Fixed implements ManagerInterface
*/
protected $loop;

/**
* @var ProcessCollectionInterface
*/
protected $processesCollection;

/**
* @var WorkerInterface[]
*/
protected $workers = [];

/**
* @var array
*/
protected $options;

/**
* @var int
*/
protected $workerCount = 0;

/**
* @var int
*/
protected $terminatingCount = 0;

/**
* @var int
*/
protected $startingCount = 0;

public function __construct(ProcessCollectionInterface $processCollection, LoopInterface $loop, array $options = [])
{
$this->options = $options;
$this->loop = $loop;
$this->processesCollection = $processCollection;
$processCollection->rewind();
for ($i = 0; $i < $options[Options::SIZE]; $i++) {
$this->spawn($processCollection, $options);
$this->spawnWorkers($this->options[Options::SIZE]);
}

protected function spawnWorkers($count)
{
for ($i = 0; $i < $count; $i++) {
$this->spawn($this->processesCollection, $this->options);
}
}

protected function spawn($processCollection, $options)
{
$workerDone = function (WorkerInterface $worker) {
$this->workerAvailable($worker);
};
$this->startingCount++;
$current = $processCollection->current();
$promise = $current($this->loop, $options);
$promise->then(function (Messenger $messenger) use ($workerDone) {
$promise->then(function (Messenger $messenger) {
$this->startingCount--;
$this->workerCount++;
$worker = new Worker($messenger);
$this->workers[] = $worker;
$worker->on('done', $workerDone);
$this->workers[spl_object_hash($worker)] = $worker;
$worker->on('done', function (WorkerInterface $worker) {
if ($this->workerCount + $this->startingCount > $this->options[Options::SIZE]) {
$worker->terminate();
return;
}

$this->workerAvailable($worker);
});
$worker->on('terminating', function (WorkerInterface $worker) {
unset($this->workers[spl_object_hash($worker)]);
$this->workerCount--;
$this->terminatingCount++;
});
$worker->on('terminated', function (WorkerInterface $worker) {
$this->terminatingCount--;
});
$this->workerAvailable($worker);
}, function () {
$this->startingCount--;
});

$processCollection->next();
Expand All @@ -63,6 +112,10 @@ protected function workerAvailable(WorkerInterface $worker)

public function ping()
{
if ($this->workerCount + $this->startingCount < $this->options[Options::SIZE]) {
$this->spawnWorkers($this->options[Options::SIZE] - ($this->workerCount + $this->startingCount));
}

foreach ($this->workers as $worker) {
if (!$worker->isBusy()) {
$this->workerAvailable($worker);
Expand Down Expand Up @@ -90,17 +143,26 @@ public function terminate()

public function info()
{
$count = count($this->workers);
$busy = 0;
foreach ($this->workers as $worker) {
if ($worker->isBusy()) {
$busy++;
}
}

return [
Info::TOTAL => $count,
Info::BUSY => $busy,
Info::IDLE => $count - $busy,
Info::TOTAL => $this->workerCount + $this->terminatingCount,
Info::STARTING => $this->startingCount,
Info::RUNNING => $this->workerCount,
Info::TERMINATING => $this->terminatingCount,
Info::BUSY => $busy,
Info::IDLE => $this->workerCount - $busy,
];
}

public function setOptions(array $options)
{
$this->options = $options;
$this->ping();
}
}
Loading