Skip to content

Commit 4baa105

Browse files
committed
Use react pcntl extension to get rid of cleanup
1 parent e02736c commit 4baa105

File tree

9 files changed

+75
-25
lines changed

9 files changed

+75
-25
lines changed

EProcess/Adapter/BaseAdapter.php

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
namespace EProcess\Adapter;
44

5+
use EProcess\Terminator;
56
use React\EventLoop\LoopInterface;
67

78
abstract class BaseAdapter
@@ -15,6 +16,11 @@ public function __construct(LoopInterface $loop)
1516
$this->node = uniqid('thread_');
1617
}
1718

19+
public function getUnixSocketFile()
20+
{
21+
return sprintf('%s/%s.sock', EPROCESS_SOCKET_DIR, $this->node);
22+
}
23+
1824
protected function createUnixSocket()
1925
{
2026
if (!defined('EPROCESS_SOCKET_DIR')) {
@@ -29,16 +35,7 @@ protected function createUnixSocket()
2935
throw new \RuntimeException(sprintf('Cannot write to "%s".', EPROCESS_SOCKET_DIR));
3036
}
3137

32-
$unixFile = sprintf('%s/%s.sock', EPROCESS_SOCKET_DIR, $this->node);
33-
$unix = sprintf('unix://%s', $unixFile);
34-
35-
$cleanup = function () use ($unixFile) {
36-
$this->loop->stop();
37-
@unlink($unixFile);
38-
};
39-
40-
register_shutdown_function($cleanup);
41-
pcntl_signal(SIGINT, $cleanup);
38+
$unix = sprintf('unix://%s', $this->getUnixSocketFile());
4239

4340
return $unix;
4441
}

EProcess/Adapter/ChildProcess.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ public function create($class, array $data = [])
7272
$this->process = new Process(sprintf('exec %s %s', $php, realpath($file)));
7373
$this->process->start($this->loop);
7474

75-
$this->loop->addTimer(3, function() use ($file) {
75+
$this->loop->addTimer(1, function() use ($file) {
7676
unlink($file);
7777
});
7878

EProcess/Application/Application.php

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,12 @@
44

55
use EProcess\Behaviour\UniversalSerializer;
66
use EProcess\Behaviour\Workable;
7+
use EProcess\Message;
8+
use EProcess\Messenger;
9+
use EProcess\Worker;
710
use Evenement\EventEmitterTrait;
11+
use MKraemer\ReactPCNTL\PCNTL;
812
use React\EventLoop\LoopInterface;
9-
use EProcess\Messenger;
10-
use EProcess\Message;
1113

1214
abstract class Application
1315
{
@@ -20,6 +22,30 @@ abstract class Application
2022
private $loop;
2123
private $messenger;
2224
private $data;
25+
private $pcntl;
26+
private $workers = [];
27+
28+
public function addWorker(Worker $worker)
29+
{
30+
$this->workers[] = $worker;
31+
}
32+
33+
public function cleanWorkers()
34+
{
35+
foreach ($this->workers as $worker) {
36+
$worker->emit('shutdown');
37+
unlink($worker->adapter()->getUnixSocketFile());
38+
}
39+
}
40+
41+
public function pcntl(PCNTL $pcntl = null)
42+
{
43+
if ($pcntl) {
44+
$this->pcntl = $pcntl;
45+
}
46+
47+
return $this->pcntl;
48+
}
2349

2450
public function loop(LoopInterface $loop = null)
2551
{
@@ -33,7 +59,7 @@ public function loop(LoopInterface $loop = null)
3359
public function messenger(Messenger $messenger = null)
3460
{
3561
if ($messenger) {
36-
$messenger->on('message', function(Message $message) {
62+
$messenger->on('message', function (Message $message) {
3763
$this->emitterEmit($message->getEvent(), [$message->getContent()]);
3864
});
3965

@@ -52,7 +78,7 @@ public function data(array $data = null)
5278
return $this->data;
5379
}
5480

55-
public function emit($event, $data)
81+
public function emit($event, $data = '')
5682
{
5783
$this->messenger->emit($event, $data);
5884
}

EProcess/Application/ApplicationFactory.php

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
namespace EProcess\Application;
44

5+
use EProcess\Worker;
6+
use MKraemer\ReactPCNTL\PCNTL;
57
use React\EventLoop\Factory;
68

79
class ApplicationFactory
@@ -29,6 +31,22 @@ public static function create($fqcn)
2931
$application = new $fqcn();
3032
$application->loop($loop);
3133

34+
$shutdown = function() use ($application) {
35+
$application->loop()->stop();
36+
$application->cleanWorkers();
37+
};
38+
39+
$pcntl = new PCNTL($loop);
40+
$pcntl->on(SIGINT, $shutdown);
41+
42+
$application->on('shutdown', $shutdown);
43+
44+
$application->pcntl($pcntl);
45+
46+
$application->on('worker.created', function(Worker $worker) use ($application) {
47+
$application->addWorker($worker);
48+
});
49+
3250
return $application;
3351
}
3452
}

EProcess/Behaviour/Workable.php

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,16 @@ trait Workable
88
{
99
public function createWorker($fqcn, array $data = [])
1010
{
11-
return new Worker($this->loop(), $fqcn, extension_loaded('pthreads') ? 'pthreads' : 'child_process', $data);
11+
$worker = new Worker(
12+
$this->loop(),
13+
$fqcn,
14+
extension_loaded('pthreads') ? 'pthreads' : 'child_process',
15+
$data
16+
);
17+
18+
$this->emitterEmit('worker.created', [$worker]);
19+
20+
return $worker;
1221
}
1322

1423
abstract public function loop();

EProcess/Messenger.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public function __construct($connection)
3737
}
3838
}
3939

40-
public function emit($event, $data)
40+
public function emit($event, $data = [])
4141
{
4242
$this->connection->send((string) new Message($event, $this->serialize($data)));
4343
}

EProcess/Worker.php

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,6 @@ public function __construct(LoopInterface $loop, $class, $adapter = null, array
3838
$this->messenger()->on('initialized', function() {
3939
$this->initialized = true;
4040
});
41-
42-
register_shutdown_function(function() {
43-
$this->kill();
44-
});
4541
}
4642

4743
public function kill()
@@ -54,7 +50,12 @@ public function messenger()
5450
return $this->messenger;
5551
}
5652

57-
public function emit($event, $data)
53+
public function adapter()
54+
{
55+
return $this->adapter;
56+
}
57+
58+
public function emit($event, $data = [])
5859
{
5960
if ($this->initialized) {
6061
$this->messenger->emit($event, $data);

composer.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@
1717
"jms/serializer": "^1.1",
1818
"doctrine/collections": "~1.3",
1919
"concerto/comms": "~0.8",
20-
"symfony/process": "^3.0"
20+
"symfony/process": "^3.0",
21+
"mkraemer/react-pcntl": "^2.0"
2122
},
2223
"license": "MIT",
2324
"require-dev": {

examples/autoload.php

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
<?php
22

3-
declare(ticks = 1);
4-
53
define('EPROCESS_AUTOLOAD', __FILE__);
64
define('EPROCESS_SOCKET_DIR', __DIR__ . '/../tmp/');
75

0 commit comments

Comments
 (0)