Skip to content

Commit c7e8b2d

Browse files
authored
Merge pull request #1 from DACSoftware/master
Merge pool processing feature
2 parents d2d77dd + e2a71a1 commit c7e8b2d

File tree

6 files changed

+171
-37
lines changed

6 files changed

+171
-37
lines changed

src/Spork/AbstractJob.php

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
<?php
2+
3+
namespace Spork;
4+
5+
use Spork\Exception\UnexpectedTypeException;
6+
7+
abstract class AbstractJob
8+
{
9+
protected $manager;
10+
protected $data;
11+
protected $name;
12+
protected $callback;
13+
14+
public function __construct(ProcessManager $manager, $data = null)
15+
{
16+
$this->manager = $manager;
17+
$this->data = $data;
18+
$this->name = '<anonymous>';
19+
}
20+
21+
public function setName($name)
22+
{
23+
$this->name = $name;
24+
25+
return $this;
26+
}
27+
28+
public function setData($data)
29+
{
30+
$this->data = $data;
31+
32+
return $this;
33+
}
34+
35+
public function setCallback($callback)
36+
{
37+
if (!is_callable($callback)) {
38+
throw new UnexpectedTypeException($callback, 'callable');
39+
}
40+
41+
$this->callback = $callback;
42+
43+
return $this;
44+
}
45+
46+
public function execute($callback = null)
47+
{
48+
if (null !== $callback) {
49+
$this->setCallback($callback);
50+
}
51+
52+
return $this->manager->fork($this)->setName($this->name);
53+
}
54+
55+
/**
56+
* Runs in a child process.
57+
*
58+
* @see execute()
59+
*/
60+
abstract public function __invoke();
61+
62+
}

src/Spork/Batch/BatchJob.php

Lines changed: 6 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -14,29 +14,17 @@
1414
use Spork\Batch\Strategy\ChunkStrategy;
1515
use Spork\Batch\Strategy\StrategyInterface;
1616
use Spork\Exception\UnexpectedTypeException;
17+
use Spork\AbstractJob;
1718
use Spork\ProcessManager;
1819

19-
class BatchJob
20+
class BatchJob extends AbstractJob
2021
{
21-
private $manager;
22-
private $data;
23-
private $strategy;
24-
private $name;
25-
private $callback;
22+
protected $strategy;
2623

2724
public function __construct(ProcessManager $manager, $data = null, StrategyInterface $strategy = null)
2825
{
29-
$this->manager = $manager;
30-
$this->data = $data;
26+
parent::__construct($manager, $data);
3127
$this->strategy = $strategy ?: new ChunkStrategy();
32-
$this->name = '<anonymous>';
33-
}
34-
35-
public function setName($name)
36-
{
37-
$this->name = $name;
38-
39-
return $this;
4028
}
4129

4230
public function setStrategy(StrategyInterface $strategy)
@@ -46,31 +34,13 @@ public function setStrategy(StrategyInterface $strategy)
4634
return $this;
4735
}
4836

49-
public function setData($data)
50-
{
51-
$this->data = $data;
52-
53-
return $this;
54-
}
55-
56-
public function setCallback($callback)
57-
{
58-
if (!is_callable($callback)) {
59-
throw new UnexpectedTypeException($callback, 'callable');
60-
}
61-
62-
$this->callback = $callback;
63-
64-
return $this;
65-
}
66-
6737
public function execute($callback = null)
6838
{
6939
if (null !== $callback) {
7040
$this->setCallback($callback);
7141
}
7242

73-
return $this->manager->fork($this)->setName($this->name.' batch');
43+
return $this->manager->fork($this)->setName($this->name . ' batch');
7444
}
7545

7646
/**
@@ -84,8 +54,7 @@ public function __invoke()
8454
foreach ($this->strategy->createBatches($this->data) as $index => $batch) {
8555
$forks[] = $this->manager
8656
->fork($this->strategy->createRunner($batch, $this->callback))
87-
->setName(sprintf('%s batch #%d', $this->name, $index))
88-
;
57+
->setName(sprintf('%s batch #%d', $this->name, $index));
8958
}
9059

9160
// block until all forks have exited

src/Spork/Factory.php

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
use Spork\Batch\BatchJob;
1515
use Spork\Batch\Strategy\StrategyInterface;
16+
use Spork\Pool\PoolJob;
1617

1718
class Factory
1819
{
@@ -30,6 +31,18 @@ public function createBatchJob(ProcessManager $manager, $data = null, StrategyIn
3031
return new BatchJob($manager, $data, $strategy);
3132
}
3233

34+
/**
35+
* @param ProcessManager $manager
36+
* @param null $data
37+
* @param int $poolSize
38+
* @return AbstractJob
39+
*
40+
*/
41+
public function createPoolJob(ProcessManager $manager, $data = null, $poolSize = 3)
42+
{
43+
return new PoolJob($manager, $data, $poolSize);
44+
}
45+
3346
/**
3447
* Creates a new shared memory instance.
3548
*

src/Spork/Pool/PoolJob.php

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
<?php
2+
3+
namespace Spork\Pool;
4+
5+
use Spork\AbstractJob;
6+
use Spork\Batch\BatchRunner;
7+
use Spork\ProcessManager;
8+
9+
class PoolJob extends AbstractJob
10+
{
11+
protected $poolSize;
12+
13+
public function __construct(ProcessManager $manager, $data, $pollSize = 3)
14+
{
15+
parent::__construct($manager, $data);
16+
17+
$this->poolSize = $pollSize;
18+
}
19+
20+
/**
21+
* Runs in a child process.
22+
*
23+
* @see execute()
24+
*/
25+
public function __invoke()
26+
{
27+
$forks = array();
28+
$results = array();
29+
$batches = $this->data;
30+
$index = 0;
31+
while (count($batches) > 0) {
32+
while (count($forks) < $this->poolSize) {
33+
$batch = array_splice($batches, 0, 1);
34+
$fork = $this->manager->fork(new BatchRunner($batch, $this->callback))
35+
->setName(sprintf('%s part #%d', $this->name, $index));
36+
$forks[$fork->getPid()] = $fork;
37+
$index++;
38+
}
39+
do {
40+
$endedFork = $this->manager->waitForNext();
41+
} while (!isset($endedFork));
42+
43+
$results = array_merge($results, $endedFork->getResult());
44+
unset($forks[$endedFork->getPid()]);
45+
46+
$endedFork = null;
47+
}
48+
// block until all forks have exited
49+
$this->manager->wait();
50+
51+
52+
foreach ($forks as $fork) {
53+
$results = array_merge($results, $fork->getResult());
54+
}
55+
56+
return $results;
57+
}
58+
}

src/Spork/ProcessManager.php

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,11 +76,30 @@ public function createBatchJob($data = null, StrategyInterface $strategy = null)
7676
return $this->factory->createBatchJob($this, $data, $strategy);
7777
}
7878

79+
/**
80+
* @param $data
81+
* @param $callable
82+
* @param StrategyInterface|null $strategy
83+
*
84+
* @return Fork
85+
*/
7986
public function process($data, $callable, StrategyInterface $strategy = null)
8087
{
8188
return $this->createBatchJob($data, $strategy)->execute($callable);
8289
}
8390

91+
/**
92+
* @param $data
93+
* @param $callable
94+
* @param int $poolSize
95+
* @return Fork
96+
*
97+
*/
98+
public function parallel($data, $callable, $poolSize = 3)
99+
{
100+
return $this->factory->createPoolJob($this, $data, $poolSize)->execute($callable);
101+
}
102+
84103
/**
85104
* Forks something into another process and returns a deferred object.
86105
*/

tests/Spork/Test/ProcessManagerTest.php

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,19 @@ public function testBatchProcessing()
9797
$this->assertEquals($expected, $fork->getResult());
9898
}
9999

100+
public function testPoolProcessing()
101+
{
102+
$expected = range(50, 59);
103+
$fork = $this->manager->parallel($expected, function($item) {
104+
return $item;
105+
});
106+
107+
$this->manager->wait();
108+
109+
$actual = $fork->getResult();
110+
$this->assertEquals(sort($expected), sort($actual));
111+
}
112+
100113
/**
101114
* Test batch processing with return values containing a newline character
102115
*/

0 commit comments

Comments
 (0)