Skip to content

Commit a58b179

Browse files
committed
Support iterable type for parallel() + series() + waterfall()
1 parent cfd52ac commit a58b179

File tree

5 files changed

+163
-9
lines changed

5 files changed

+163
-9
lines changed

README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -396,7 +396,7 @@ $promise->then(function (int $bytes) {
396396

397397
### parallel()
398398

399-
The `parallel(array<callable():PromiseInterface<mixed,Exception>> $tasks): PromiseInterface<array<mixed>,Exception>` function can be used
399+
The `parallel(iterable<callable():PromiseInterface<mixed,Exception>> $tasks): PromiseInterface<array<mixed>,Exception>` function can be used
400400
like this:
401401

402402
```php
@@ -438,7 +438,7 @@ React\Async\parallel([
438438

439439
### series()
440440

441-
The `series(array<callable():PromiseInterface<mixed,Exception>> $tasks): PromiseInterface<array<mixed>,Exception>` function can be used
441+
The `series(iterable<callable():PromiseInterface<mixed,Exception>> $tasks): PromiseInterface<array<mixed>,Exception>` function can be used
442442
like this:
443443

444444
```php
@@ -480,7 +480,7 @@ React\Async\series([
480480

481481
### waterfall()
482482

483-
The `waterfall(array<callable(mixed=):PromiseInterface<mixed,Exception>> $tasks): PromiseInterface<mixed,Exception>` function can be used
483+
The `waterfall(iterable<callable(mixed=):PromiseInterface<mixed,Exception>> $tasks): PromiseInterface<mixed,Exception>` function can be used
484484
like this:
485485

486486
```php

src/functions.php

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -533,10 +533,10 @@ function coroutine(callable $function, mixed ...$args): PromiseInterface
533533
}
534534

535535
/**
536-
* @param array<callable():PromiseInterface<mixed,Exception>> $tasks
536+
* @param iterable<callable():PromiseInterface<mixed,Exception>> $tasks
537537
* @return PromiseInterface<array<mixed>,Exception>
538538
*/
539-
function parallel(array $tasks): PromiseInterface
539+
function parallel(iterable $tasks): PromiseInterface
540540
{
541541
$pending = [];
542542
$deferred = new Deferred(function () use (&$pending) {
@@ -550,6 +550,10 @@ function parallel(array $tasks): PromiseInterface
550550
$results = [];
551551
$errored = false;
552552

553+
if (!\is_array($tasks)) {
554+
$tasks = \iterator_to_array($tasks);
555+
}
556+
553557
$numTasks = count($tasks);
554558
if (0 === $numTasks) {
555559
$deferred->resolve($results);
@@ -591,10 +595,10 @@ function parallel(array $tasks): PromiseInterface
591595
}
592596

593597
/**
594-
* @param array<callable():PromiseInterface<mixed,Exception>> $tasks
598+
* @param iterable<callable():PromiseInterface<mixed,Exception>> $tasks
595599
* @return PromiseInterface<array<mixed>,Exception>
596600
*/
597-
function series(array $tasks): PromiseInterface
601+
function series(iterable $tasks): PromiseInterface
598602
{
599603
$pending = null;
600604
$deferred = new Deferred(function () use (&$pending) {
@@ -605,6 +609,10 @@ function series(array $tasks): PromiseInterface
605609
});
606610
$results = [];
607611

612+
if (!\is_array($tasks)) {
613+
$tasks = \iterator_to_array($tasks);
614+
}
615+
608616
/** @var callable():void $next */
609617
$taskCallback = function ($result) use (&$results, &$next) {
610618
$results[] = $result;
@@ -631,10 +639,10 @@ function series(array $tasks): PromiseInterface
631639
}
632640

633641
/**
634-
* @param array<callable(mixed=):PromiseInterface<mixed,Exception>> $tasks
642+
* @param iterable<callable(mixed=):PromiseInterface<mixed,Exception>> $tasks
635643
* @return PromiseInterface<mixed,Exception>
636644
*/
637-
function waterfall(array $tasks): PromiseInterface
645+
function waterfall(iterable $tasks): PromiseInterface
638646
{
639647
$pending = null;
640648
$deferred = new Deferred(function () use (&$pending) {
@@ -644,6 +652,10 @@ function waterfall(array $tasks): PromiseInterface
644652
$pending = null;
645653
});
646654

655+
if (!\is_array($tasks)) {
656+
$tasks = \iterator_to_array($tasks);
657+
}
658+
647659
/** @var callable $next */
648660
$next = function ($value = null) use (&$tasks, &$next, $deferred, &$pending) {
649661
if (0 === count($tasks)) {

tests/ParallelTest.php

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,19 @@ public function testParallelWithoutTasks()
1717
$promise->then($this->expectCallableOnceWith(array()));
1818
}
1919

20+
public function testParallelWithoutTasksFromEmptyGeneratorResolvesWithEmptyArray()
21+
{
22+
$tasks = (function () {
23+
if (false) {
24+
yield;
25+
}
26+
})();
27+
28+
$promise = React\Async\parallel($tasks);
29+
30+
$promise->then($this->expectCallableOnceWith([]));
31+
}
32+
2033
public function testParallelWithTasks()
2134
{
2235
$tasks = array(
@@ -49,6 +62,38 @@ function () {
4962
$timer->assertInRange(0.1, 0.2);
5063
}
5164

65+
public function testParallelWithTasksFromGeneratorResolvesWithArrayOfFulfillmentValues()
66+
{
67+
$tasks = (function () {
68+
yield function () {
69+
return new Promise(function ($resolve) {
70+
Loop::addTimer(0.1, function () use ($resolve) {
71+
$resolve('foo');
72+
});
73+
});
74+
};
75+
yield function () {
76+
return new Promise(function ($resolve) {
77+
Loop::addTimer(0.11, function () use ($resolve) {
78+
$resolve('bar');
79+
});
80+
});
81+
};
82+
})();
83+
84+
$promise = React\Async\parallel($tasks);
85+
86+
$promise->then($this->expectCallableOnceWith(array('foo', 'bar')));
87+
88+
$timer = new Timer($this);
89+
$timer->start();
90+
91+
Loop::run();
92+
93+
$timer->stop();
94+
$timer->assertInRange(0.1, 0.2);
95+
}
96+
5297
public function testParallelWithErrorReturnsPromiseRejectedWithExceptionFromTaskAndStopsCallingAdditionalTasks()
5398
{
5499
$called = 0;

tests/SeriesTest.php

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,19 @@ public function testSeriesWithoutTasks()
1717
$promise->then($this->expectCallableOnceWith(array()));
1818
}
1919

20+
public function testSeriesWithoutTasksFromEmptyGeneratorResolvesWithEmptyArray()
21+
{
22+
$tasks = (function () {
23+
if (false) {
24+
yield;
25+
}
26+
})();
27+
28+
$promise = React\Async\series($tasks);
29+
30+
$promise->then($this->expectCallableOnceWith([]));
31+
}
32+
2033
public function testSeriesWithTasks()
2134
{
2235
$tasks = array(
@@ -49,6 +62,38 @@ function () {
4962
$timer->assertInRange(0.10, 0.20);
5063
}
5164

65+
public function testSeriesWithTasksFromGeneratorResolvesWithArrayOfFulfillmentValues()
66+
{
67+
$tasks = (function () {
68+
yield function () {
69+
return new Promise(function ($resolve) {
70+
Loop::addTimer(0.051, function () use ($resolve) {
71+
$resolve('foo');
72+
});
73+
});
74+
};
75+
yield function () {
76+
return new Promise(function ($resolve) {
77+
Loop::addTimer(0.051, function () use ($resolve) {
78+
$resolve('bar');
79+
});
80+
});
81+
};
82+
})();
83+
84+
$promise = React\Async\series($tasks);
85+
86+
$promise->then($this->expectCallableOnceWith(array('foo', 'bar')));
87+
88+
$timer = new Timer($this);
89+
$timer->start();
90+
91+
Loop::run();
92+
93+
$timer->stop();
94+
$timer->assertInRange(0.10, 0.20);
95+
}
96+
5297
public function testSeriesWithError()
5398
{
5499
$called = 0;

tests/WaterfallTest.php

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,19 @@ public function testWaterfallWithoutTasks()
1717
$promise->then($this->expectCallableOnceWith(null));
1818
}
1919

20+
public function testWaterfallWithoutTasksFromEmptyGeneratorResolvesWithNull()
21+
{
22+
$tasks = (function () {
23+
if (false) {
24+
yield;
25+
}
26+
})();
27+
28+
$promise = React\Async\waterfall($tasks);
29+
30+
$promise->then($this->expectCallableOnceWith(null));
31+
}
32+
2033
public function testWaterfallWithTasks()
2134
{
2235
$tasks = array(
@@ -56,6 +69,45 @@ function ($bar) {
5669
$timer->assertInRange(0.15, 0.30);
5770
}
5871

72+
public function testWaterfallWithTasksFromGeneratorResolvesWithFinalFulfillmentValue()
73+
{
74+
$tasks = (function () {
75+
yield function ($foo = 'foo') {
76+
return new Promise(function ($resolve) use ($foo) {
77+
Loop::addTimer(0.05, function () use ($resolve, $foo) {
78+
$resolve($foo);
79+
});
80+
});
81+
};
82+
yield function ($foo) {
83+
return new Promise(function ($resolve) use ($foo) {
84+
Loop::addTimer(0.05, function () use ($resolve, $foo) {
85+
$resolve($foo . 'bar');
86+
});
87+
});
88+
};
89+
yield function ($bar) {
90+
return new Promise(function ($resolve) use ($bar) {
91+
Loop::addTimer(0.05, function () use ($resolve, $bar) {
92+
$resolve($bar . 'baz');
93+
});
94+
});
95+
};
96+
})();
97+
98+
$promise = React\Async\waterfall($tasks);
99+
100+
$promise->then($this->expectCallableOnceWith('foobarbaz'));
101+
102+
$timer = new Timer($this);
103+
$timer->start();
104+
105+
Loop::run();
106+
107+
$timer->stop();
108+
$timer->assertInRange(0.15, 0.30);
109+
}
110+
59111
public function testWaterfallWithError()
60112
{
61113
$called = 0;

0 commit comments

Comments
 (0)