Skip to content

Commit 1e82897

Browse files
authored
Merge pull request #18 from clue-labs/any
Add any() helper to await first successful fulfillment of operations
2 parents 2192293 + ed8369f commit 1e82897

File tree

6 files changed

+365
-2
lines changed

6 files changed

+365
-2
lines changed

README.md

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ much any API that already uses Promises.
3939
* [Cancellation](#cancellation)
4040
* [Timeout](#timeout)
4141
* [all()](#all)
42+
* [any()](#any)
4243
* [Blocking](#blocking)
4344
* [Install](#install)
4445
* [Tests](#tests)
@@ -332,6 +333,80 @@ $promise = Queue::all(10, $jobs, array($browser, 'get'));
332333
> Keep in mind that returning an array of response messages means that
333334
the whole response body has to be kept in memory.
334335

336+
#### any()
337+
338+
The static `any(int $concurrency, array $jobs, callable $handler): PromiseInterface<mixed>` method can be used to
339+
concurrently process given jobs through the given `$handler` and resolve
340+
with first resolution value.
341+
342+
This is a convenience method which uses the `Queue` internally to
343+
schedule all jobs while limiting concurrency to ensure no more than
344+
`$concurrency` jobs ever run at once. It will return a promise which
345+
resolves with the result of the first job on success and will then try
346+
to `cancel()` all outstanding jobs.
347+
348+
```php
349+
$loop = React\EventLoop\Factory::create();
350+
$browser = new Clue\React\Buzz\Browser($loop);
351+
352+
$promise = Queue::any(3, $urls, function ($url) use ($browser) {
353+
return $browser->get($url);
354+
});
355+
356+
$promise->then(function (ResponseInterface $response) {
357+
echo 'First response: ' . $response->getBody() . PHP_EOL;
358+
});
359+
```
360+
361+
If all of the jobs fail, it will reject the resulting promise. Similarly,
362+
calling `cancel()` on the resulting promise will try to cancel all
363+
outstanding jobs. See [promises](#promises) and
364+
[cancellation](#cancellation) for details.
365+
366+
The `$concurrency` parameter sets a new soft limit for the maximum number
367+
of jobs to handle concurrently. Finding a good concurrency limit depends
368+
on your particular use case. It's common to limit concurrency to a rather
369+
small value, as doing more than a dozen of things at once may easily
370+
overwhelm the receiving side. Using a `1` value will ensure that all jobs
371+
are processed one after another, effectively creating a "waterfall" of
372+
jobs. Using a value less than 1 will reject with an
373+
`InvalidArgumentException` without processing any jobs.
374+
375+
```php
376+
// handle up to 10 jobs concurrently
377+
$promise = Queue::any(10, $jobs, $handler);
378+
```
379+
380+
```php
381+
// handle each job after another without concurrency (waterfall)
382+
$promise = Queue::any(1, $jobs, $handler);
383+
```
384+
385+
The `$jobs` parameter must be an array with all jobs to process. Each
386+
value in this array will be passed to the `$handler` to start one job.
387+
The array keys have no effect, the promise will simply resolve with the
388+
job results of the first successful job as returned by the `$handler`.
389+
If this array is empty, this method will reject without processing any
390+
jobs.
391+
392+
The `$handler` parameter must be a valid callable that accepts your job
393+
parameters, invokes the appropriate operation and returns a Promise as a
394+
placeholder for its future result. If the given argument is not a valid
395+
callable, this method will reject with an `InvalidArgumentExceptionn`
396+
without processing any jobs.
397+
398+
```php
399+
// using a Closure as handler is usually recommended
400+
$promise = Queue::any(10, $jobs, function ($url) use ($browser) {
401+
return $browser->get($url);
402+
});
403+
```
404+
405+
```php
406+
// accepts any callable, so PHP's array notation is also supported
407+
$promise = Queue::any(10, $jobs, array($browser, 'get'));
408+
```
409+
335410
#### Blocking
336411

337412
As stated above, this library provides you a powerful, async API by default.

composer.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
},
2323
"require-dev": {
2424
"clue/block-react": "^1.0",
25-
"clue/buzz-react": "^2.0",
25+
"clue/buzz-react": "^2.4",
2626
"phpunit/phpunit": "^7.0 || ^6.4 || ^5.7 || ^4.8.35",
2727
"react/event-loop": "^1.0 || ^0.5 || ^0.4 || ^0.3"
2828
}

examples/03-http-any.php

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
<?php
2+
3+
use Clue\React\Buzz\Browser;
4+
use Clue\React\Mq\Queue;
5+
use Psr\Http\Message\ResponseInterface;
6+
use React\EventLoop\Factory;
7+
8+
require __DIR__ . '/../vendor/autoload.php';
9+
10+
// list of all URLs you want to try
11+
// this list may potentially contain hundreds or thousands of entries
12+
$urls = array(
13+
'http://www.github.com/invalid',
14+
'http://www.yahoo.com/invalid',
15+
'http://www.bing.com/invalid',
16+
'http://www.bing.com/',
17+
'http://www.google.com/',
18+
'http://www.google.com/invalid',
19+
);
20+
21+
$loop = Factory::create();
22+
$browser = new Browser($loop);
23+
24+
// each job should use the browser to GET a certain URL
25+
// limit number of concurrent jobs here to avoid using excessive network resources
26+
$promise = Queue::any(2, $urls, function ($url) use ($browser) {
27+
return $browser->get($url)->then(
28+
function (ResponseInterface $response) use ($url) {
29+
// return only the URL for the first successful response
30+
return $url;
31+
}
32+
);
33+
});
34+
35+
$promise->then(
36+
function ($url) {
37+
echo 'First successful URL is ' . $url . PHP_EOL;
38+
},
39+
function ($e) {
40+
echo 'An error occured: ' . $e->getMessage() . PHP_EOL;
41+
}
42+
);
43+
44+
$loop->run();

src/Queue.php

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,124 @@ public static function all($concurrency, array $jobs, $handler)
143143
});
144144
}
145145

146+
/**
147+
* Concurrently process given jobs through the given `$handler` and resolve
148+
* with first resolution value.
149+
*
150+
* This is a convenience method which uses the `Queue` internally to
151+
* schedule all jobs while limiting concurrency to ensure no more than
152+
* `$concurrency` jobs ever run at once. It will return a promise which
153+
* resolves with the result of the first job on success and will then try
154+
* to `cancel()` all outstanding jobs.
155+
*
156+
* ```php
157+
* $loop = React\EventLoop\Factory::create();
158+
* $browser = new Clue\React\Buzz\Browser($loop);
159+
*
160+
* $promise = Queue::any(3, $urls, function ($url) use ($browser) {
161+
* return $browser->get($url);
162+
* });
163+
*
164+
* $promise->then(function (ResponseInterface $response) {
165+
* echo 'First response: ' . $response->getBody() . PHP_EOL;
166+
* });
167+
* ```
168+
*
169+
* If all of the jobs fail, it will reject the resulting promise. Similarly,
170+
* calling `cancel()` on the resulting promise will try to cancel all
171+
* outstanding jobs. See [promises](#promises) and
172+
* [cancellation](#cancellation) for details.
173+
*
174+
* The `$concurrency` parameter sets a new soft limit for the maximum number
175+
* of jobs to handle concurrently. Finding a good concurrency limit depends
176+
* on your particular use case. It's common to limit concurrency to a rather
177+
* small value, as doing more than a dozen of things at once may easily
178+
* overwhelm the receiving side. Using a `1` value will ensure that all jobs
179+
* are processed one after another, effectively creating a "waterfall" of
180+
* jobs. Using a value less than 1 will reject with an
181+
* `InvalidArgumentException` without processing any jobs.
182+
*
183+
* ```php
184+
* // handle up to 10 jobs concurrently
185+
* $promise = Queue::any(10, $jobs, $handler);
186+
* ```
187+
*
188+
* ```php
189+
* // handle each job after another without concurrency (waterfall)
190+
* $promise = Queue::any(1, $jobs, $handler);
191+
* ```
192+
*
193+
* The `$jobs` parameter must be an array with all jobs to process. Each
194+
* value in this array will be passed to the `$handler` to start one job.
195+
* The array keys have no effect, the promise will simply resolve with the
196+
* job results of the first successful job as returned by the `$handler`.
197+
* If this array is empty, this method will reject without processing any
198+
* jobs.
199+
*
200+
* The `$handler` parameter must be a valid callable that accepts your job
201+
* parameters, invokes the appropriate operation and returns a Promise as a
202+
* placeholder for its future result. If the given argument is not a valid
203+
* callable, this method will reject with an `InvalidArgumentExceptionn`
204+
* without processing any jobs.
205+
*
206+
* ```php
207+
* // using a Closure as handler is usually recommended
208+
* $promise = Queue::any(10, $jobs, function ($url) use ($browser) {
209+
* return $browser->get($url);
210+
* });
211+
* ```
212+
*
213+
* ```php
214+
* // accepts any callable, so PHP's array notation is also supported
215+
* $promise = Queue::any(10, $jobs, array($browser, 'get'));
216+
* ```
217+
*
218+
* @param int $concurrency concurrency soft limit
219+
* @param array $jobs
220+
* @param callable $handler
221+
* @return PromiseInterface Returns a Promise<mixed> which resolves with a single resolution value
222+
* or rejects when all of the operations reject.
223+
*/
224+
public static function any($concurrency, array $jobs, $handler)
225+
{
226+
// explicitly reject with empty jobs (https://github.com/reactphp/promise/pull/34)
227+
if (!$jobs) {
228+
return Promise\reject(new \UnderflowException('No jobs given'));
229+
}
230+
231+
try {
232+
// limit number of concurrent operations
233+
$q = new self($concurrency, null, $handler);
234+
} catch (\InvalidArgumentException $e) {
235+
// reject if $concurrency or $handler is invalid
236+
return Promise\reject($e);
237+
}
238+
239+
// try invoking all operations and automatically queue excessive ones
240+
$promises = array_map($q, $jobs);
241+
242+
return new Promise\Promise(function ($resolve, $reject) use ($promises) {
243+
Promise\any($promises)->then(function ($result) use ($promises, $resolve) {
244+
// cancel all pending promises if a single result is ready
245+
foreach (array_reverse($promises) as $promise) {
246+
if ($promise instanceof CancellablePromiseInterface) {
247+
$promise->cancel();
248+
}
249+
}
250+
251+
// resolve with original resolution value
252+
$resolve($result);
253+
}, $reject);
254+
}, function () use ($promises) {
255+
// cancel all pending promises on cancellation
256+
foreach (array_reverse($promises) as $promise) {
257+
if ($promise instanceof CancellablePromiseInterface) {
258+
$promise->cancel();
259+
}
260+
}
261+
});
262+
}
263+
146264
/**
147265
* Instantiates a new queue object.
148266
*

tests/QueueAllTest.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,9 @@ public function testCancelResultingPromiseWillCancelPendingOperation()
7474
$promise->cancel();
7575
}
7676

77-
public function testPendingOperationWillBeCancelledIfOneOperationRejects22222222222()
77+
public function testPendingOperationWillBeStartedAndCancelledIfOneOperationRejects()
7878
{
79+
// second operation will only be started to be cancelled immediately
7980
$first = new Deferred();
8081
$second = new Promise(function () { }, $this->expectCallableOnce());
8182

0 commit comments

Comments
 (0)