Skip to content

Commit e44e039

Browse files
authored
Merge pull request #8 from clue-labs/all
Add all() helper to await successful fulfillment of all operations
2 parents e4b0331 + 3cd5241 commit e44e039

File tree

4 files changed

+360
-15
lines changed

4 files changed

+360
-15
lines changed

README.md

Lines changed: 80 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ much any API that already uses Promises.
3838
* [Promises](#promises)
3939
* [Cancellation](#cancellation)
4040
* [Timeout](#timeout)
41+
* [all()](#all)
4142
* [Blocking](#blocking)
4243
* [Install](#install)
4344
* [Tests](#tests)
@@ -256,6 +257,81 @@ $promise = Timer\timeout($q($url), 2.0, $loop);
256257
Please refer to [react/promise-timer](https://github.com/reactphp/promise-timer)
257258
for more details.
258259

260+
#### all()
261+
262+
The static `all(int $concurrency, array $jobs, callable $handler): PromiseInterface<mixed[]>` method can be used to
263+
concurrently process all given jobs through the given `$handler`.
264+
265+
This is a convenience method which uses the `Queue` internally to
266+
schedule all jobs while limiting concurrency to ensure no more than
267+
`$concurrency` jobs ever run at once. It will return a promise which
268+
resolves with the results of all jobs on success.
269+
270+
```php
271+
$loop = React\EventLoop\Factory::create();
272+
$browser = new Clue\React\Buzz\Browser($loop);
273+
274+
$promise = Queue:all(3, $urls, function ($url) use ($browser) {
275+
return $browser->get($url);
276+
});
277+
278+
$promise->then(function (array $responses) {
279+
echo 'All ' . count($responses) . ' successful!' . PHP_EOL;
280+
});
281+
```
282+
283+
If either of the jobs fail, it will reject the resulting promise and will
284+
try to cancel all outstanding jobs. Similarly, calling `cancel()` on the
285+
resulting promise will try to cancel all outstanding jobs. See
286+
[promises](#promises) and [cancellation](#cancellation) for details.
287+
288+
The `$concurrency` parameter sets a new soft limit for the maximum number
289+
of jobs to handle concurrently. Finding a good concurrency limit depends
290+
on your particular use case. It's common to limit concurrency to a rather
291+
small value, as doing more than a dozen of things at once may easily
292+
overwhelm the receiving side. Using a `1` value will ensure that all jobs
293+
are processed one after another, effectively creating a "waterfall" of
294+
jobs. Using a value less than 1 will reject with an
295+
`InvalidArgumentException` without processing any jobs.
296+
297+
```php
298+
// handle up to 10 jobs concurrently
299+
$promise = Queue:all(10, $jobs, $handler);
300+
```
301+
302+
```php
303+
// handle each job after another without concurrency (waterfall)
304+
$promise = Queue:all(1, $jobs, $handler);
305+
```
306+
307+
The `$jobs` parameter must be an array with all jobs to process. Each
308+
value in this array will be passed to the `$handler` to start one job.
309+
The array keys will be preserved in the resulting array, while the array
310+
values will be replaced with the job results as returned by the
311+
`$handler`. If this array is empty, this method will resolve with an
312+
empty array without processing any jobs.
313+
314+
The `$handler` parameter must be a valid callable that accepts your job
315+
parameters, invokes the appropriate operation and returns a Promise as a
316+
placeholder for its future result. If the given argument is not a valid
317+
callable, this method will reject with an `InvalidArgumentExceptionn`
318+
without processing any jobs.
319+
320+
```php
321+
// using a Closure as handler is usually recommended
322+
$promise = Queue::all(10, $jobs, function ($url) use ($browser) {
323+
return $browser->get($url);
324+
});
325+
```
326+
327+
```php
328+
// accepts any callable, so PHP's array notation is also supported
329+
$promise = Queue:all(10, $jobs, array($browser, 'get'));
330+
```
331+
332+
> Keep in mind that returning an array of response messages means that
333+
the whole response body has to be kept in memory.
334+
259335
#### Blocking
260336

261337
As stated above, this library provides you a powerful, async API by default.
@@ -272,18 +348,12 @@ use Clue\React\Block;
272348
$loop = React\EventLoop\Factory::create();
273349
$browser = new Clue\React\Buzz\Browser($loop);
274350

275-
$q = new Queue(10, null, function ($url) use ($browser) {
351+
$promise = Queue:all(3, $urls, function ($url) use ($browser) {
276352
return $browser->get($url);
277353
});
278354

279-
$promises = array(
280-
$q('http://example.com/'),
281-
$q('http://www.example.org/'),
282-
$q('http://example.net/'),
283-
);
284-
285355
try {
286-
$responses = Block\awaitAll($promises, $loop);
356+
$responses = Block\await($promise, $loop);
287357
// responses successfully received
288358
} catch (Exception $e) {
289359
// an error occured while performing the requests
@@ -306,16 +376,11 @@ function download(array $uris)
306376
$loop = React\EventLoop\Factory::create();
307377
$browser = new Clue\React\Buzz\Browser($loop);
308378

309-
$q = new Queue(10, null, function ($uri) use ($browser) {
379+
$promise = Queue::all(3, $uris, function ($uri) use ($browser) {
310380
return $browser->get($uri);
311381
});
312382

313-
$promises = array();
314-
foreach ($uris as $uri) {
315-
$promises[$uri] = $q($uri);
316-
}
317-
318-
return Clue\React\Block\awaitAll($promises, $loop);
383+
return Clue\React\Block\await($promise, $loop);
319384
}
320385
```
321386

examples/02-http-all.php

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
<?php
2+
3+
use Clue\React\Buzz\Browser;
4+
use Clue\React\Mq\Queue;
5+
use Psr\Http\Message\ResponseInterface;
6+
use React\EventLoop\Factory;
7+
8+
require __DIR__ . '/../vendor/autoload.php';
9+
10+
// list of all URLs you want to download
11+
// this list may potentially contain hundreds or thousands of entries
12+
$urls = array(
13+
'http://www.github.com/',
14+
'http://www.yahoo.com/',
15+
'http://www.bing.com/',
16+
'http://www.google.com/',
17+
//'http://httpbin.org/delay/2',
18+
);
19+
20+
$loop = Factory::create();
21+
$browser = new Browser($loop);
22+
23+
// each job should use the browser to GET a certain URL
24+
// limit number of concurrent jobs here to avoid using excessive network resources
25+
$promise = Queue::all(3, array_combine($urls, $urls), function ($url) use ($browser) {
26+
return $browser->get($url);
27+
});
28+
29+
$promise->then(
30+
function ($responses) {
31+
/* @var $responses ResponseInterface[] */
32+
echo 'All URLs succeeded!' . PHP_EOL;
33+
foreach ($responses as $url => $response) {
34+
echo $url . ' has ' . $response->getBody()->getSize() . ' bytes' . PHP_EOL;
35+
}
36+
},
37+
function ($e) {
38+
echo 'An error occured: ' . $e->getMessage() . PHP_EOL;
39+
}
40+
);
41+
42+
$loop->run();

src/Queue.php

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,120 @@ class Queue implements \Countable
2929
private $pending = 0;
3030
private $queue = array();
3131

32+
/**
33+
* Concurrently process all given jobs through the given `$handler`.
34+
*
35+
* This is a convenience method which uses the `Queue` internally to
36+
* schedule all jobs while limiting concurrency to ensure no more than
37+
* `$concurrency` jobs ever run at once. It will return a promise which
38+
* resolves with the results of all jobs on success.
39+
*
40+
* ```php
41+
* $loop = React\EventLoop\Factory::create();
42+
* $browser = new Clue\React\Buzz\Browser($loop);
43+
*
44+
* $promise = Queue:all(3, $urls, function ($url) use ($browser) {
45+
* return $browser->get($url);
46+
* });
47+
*
48+
* $promise->then(function (array $responses) {
49+
* echo 'All ' . count($responses) . ' successful!' . PHP_EOL;
50+
* });
51+
* ```
52+
*
53+
* If either of the jobs fail, it will reject the resulting promise and will
54+
* try to cancel all outstanding jobs. Similarly, calling `cancel()` on the
55+
* resulting promise will try to cancel all outstanding jobs. See
56+
* [promises](#promises) and [cancellation](#cancellation) for details.
57+
*
58+
* The `$concurrency` parameter sets a new soft limit for the maximum number
59+
* of jobs to handle concurrently. Finding a good concurrency limit depends
60+
* on your particular use case. It's common to limit concurrency to a rather
61+
* small value, as doing more than a dozen of things at once may easily
62+
* overwhelm the receiving side. Using a `1` value will ensure that all jobs
63+
* are processed one after another, effectively creating a "waterfall" of
64+
* jobs. Using a value less than 1 will reject with an
65+
* `InvalidArgumentException` without processing any jobs.
66+
*
67+
* ```php
68+
* // handle up to 10 jobs concurrently
69+
* $promise = Queue:all(10, $jobs, $handler);
70+
* ```
71+
*
72+
* ```php
73+
* // handle each job after another without concurrency (waterfall)
74+
* $promise = Queue:all(1, $jobs, $handler);
75+
* ```
76+
*
77+
* The `$jobs` parameter must be an array with all jobs to process. Each
78+
* value in this array will be passed to the `$handler` to start one job.
79+
* The array keys will be preserved in the resulting array, while the array
80+
* values will be replaced with the job results as returned by the
81+
* `$handler`. If this array is empty, this method will resolve with an
82+
* empty array without processing any jobs.
83+
*
84+
* The `$handler` parameter must be a valid callable that accepts your job
85+
* parameters, invokes the appropriate operation and returns a Promise as a
86+
* placeholder for its future result. If the given argument is not a valid
87+
* callable, this method will reject with an `InvalidArgumentExceptionn`
88+
* without processing any jobs.
89+
*
90+
* ```php
91+
* // using a Closure as handler is usually recommended
92+
* $promise = Queue::all(10, $jobs, function ($url) use ($browser) {
93+
* return $browser->get($url);
94+
* });
95+
* ```
96+
*
97+
* ```php
98+
* // accepts any callable, so PHP's array notation is also supported
99+
* $promise = Queue:all(10, $jobs, array($browser, 'get'));
100+
* ```
101+
*
102+
* > Keep in mind that returning an array of response messages means that
103+
* the whole response body has to be kept in memory.
104+
*
105+
* @param int $concurrency concurrency soft limit
106+
* @param array $jobs
107+
* @param callable $handler
108+
* @return PromiseInterface Returns a Promise<mixed[]> which resolves with an array of all resolution values
109+
* or rejects when any of the operations reject.
110+
*/
111+
public static function all($concurrency, array $jobs, $handler)
112+
{
113+
try {
114+
// limit number of concurrent operations
115+
$q = new self($concurrency, null, $handler);
116+
} catch (\InvalidArgumentException $e) {
117+
// reject if $concurrency or $handler is invalid
118+
return Promise\reject($e);
119+
}
120+
121+
// try invoking all operations and automatically queue excessive ones
122+
$promises = array_map($q, $jobs);
123+
124+
return new Promise\Promise(function ($resolve, $reject) use ($promises) {
125+
Promise\all($promises)->then($resolve, function ($e) use ($promises, $reject) {
126+
// cancel all pending promises if a single promise fails
127+
foreach (array_reverse($promises) as $promise) {
128+
if ($promise instanceof CancellablePromiseInterface) {
129+
$promise->cancel();
130+
}
131+
}
132+
133+
// reject with original rejection message
134+
$reject($e);
135+
});
136+
}, function () use ($promises) {
137+
// cancel all pending promises on cancellation
138+
foreach (array_reverse($promises) as $promise) {
139+
if ($promise instanceof CancellablePromiseInterface) {
140+
$promise->cancel();
141+
}
142+
}
143+
});
144+
}
145+
32146
/**
33147
* Instantiates a new queue object.
34148
*

0 commit comments

Comments
 (0)