Skip to content

Commit 2343d9c

Browse files
committed
Take advantage of iterators instead of converting to array first
1 parent a58b179 commit 2343d9c

File tree

4 files changed

+141
-26
lines changed

4 files changed

+141
-26
lines changed

src/functions.php

Lines changed: 37 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -548,19 +548,10 @@ function parallel(iterable $tasks): PromiseInterface
548548
$pending = [];
549549
});
550550
$results = [];
551-
$errored = false;
551+
$continue = true;
552552

553-
if (!\is_array($tasks)) {
554-
$tasks = \iterator_to_array($tasks);
555-
}
556-
557-
$numTasks = count($tasks);
558-
if (0 === $numTasks) {
559-
$deferred->resolve($results);
560-
}
561-
562-
$taskErrback = function ($error) use (&$pending, $deferred, &$errored) {
563-
$errored = true;
553+
$taskErrback = function ($error) use (&$pending, $deferred, &$continue) {
554+
$continue = false;
564555
$deferred->reject($error);
565556

566557
foreach ($pending as $promise) {
@@ -572,25 +563,31 @@ function parallel(iterable $tasks): PromiseInterface
572563
};
573564

574565
foreach ($tasks as $i => $task) {
575-
$taskCallback = function ($result) use (&$results, &$pending, $numTasks, $i, $deferred) {
566+
$taskCallback = function ($result) use (&$results, &$pending, &$continue, $i, $deferred) {
576567
$results[$i] = $result;
568+
unset($pending[$i]);
577569

578-
if (count($results) === $numTasks) {
570+
if (!$pending && !$continue) {
579571
$deferred->resolve($results);
580572
}
581573
};
582574

583-
$promise = call_user_func($task);
575+
$promise = \call_user_func($task);
584576
assert($promise instanceof PromiseInterface);
585577
$pending[$i] = $promise;
586578

587579
$promise->then($taskCallback, $taskErrback);
588580

589-
if ($errored) {
581+
if (!$continue) {
590582
break;
591583
}
592584
}
593585

586+
$continue = false;
587+
if (!$pending) {
588+
$deferred->resolve($results);
589+
}
590+
594591
return $deferred->promise();
595592
}
596593

@@ -609,8 +606,9 @@ function series(iterable $tasks): PromiseInterface
609606
});
610607
$results = [];
611608

612-
if (!\is_array($tasks)) {
613-
$tasks = \iterator_to_array($tasks);
609+
if ($tasks instanceof \IteratorAggregate) {
610+
$tasks = $tasks->getIterator();
611+
assert($tasks instanceof \Iterator);
614612
}
615613

616614
/** @var callable():void $next */
@@ -620,13 +618,19 @@ function series(iterable $tasks): PromiseInterface
620618
};
621619

622620
$next = function () use (&$tasks, $taskCallback, $deferred, &$results, &$pending) {
623-
if (0 === count($tasks)) {
621+
if ($tasks instanceof \Iterator ? !$tasks->valid() : !$tasks) {
624622
$deferred->resolve($results);
625623
return;
626624
}
627625

628-
$task = array_shift($tasks);
629-
$promise = call_user_func($task);
626+
if ($tasks instanceof \Iterator) {
627+
$task = $tasks->current();
628+
$tasks->next();
629+
} else {
630+
$task = \array_shift($tasks);
631+
}
632+
633+
$promise = \call_user_func($task);
630634
assert($promise instanceof PromiseInterface);
631635
$pending = $promise;
632636

@@ -652,19 +656,26 @@ function waterfall(iterable $tasks): PromiseInterface
652656
$pending = null;
653657
});
654658

655-
if (!\is_array($tasks)) {
656-
$tasks = \iterator_to_array($tasks);
659+
if ($tasks instanceof \IteratorAggregate) {
660+
$tasks = $tasks->getIterator();
661+
assert($tasks instanceof \Iterator);
657662
}
658663

659664
/** @var callable $next */
660665
$next = function ($value = null) use (&$tasks, &$next, $deferred, &$pending) {
661-
if (0 === count($tasks)) {
666+
if ($tasks instanceof \Iterator ? !$tasks->valid() : !$tasks) {
662667
$deferred->resolve($value);
663668
return;
664669
}
665670

666-
$task = array_shift($tasks);
667-
$promise = call_user_func_array($task, func_get_args());
671+
if ($tasks instanceof \Iterator) {
672+
$task = $tasks->current();
673+
$tasks->next();
674+
} else {
675+
$task = \array_shift($tasks);
676+
}
677+
678+
$promise = \call_user_func_array($task, func_get_args());
668679
assert($promise instanceof PromiseInterface);
669680
$pending = $promise;
670681

tests/ParallelTest.php

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
use React;
66
use React\EventLoop\Loop;
77
use React\Promise\Promise;
8+
use function React\Promise\reject;
89

910
class ParallelTest extends TestCase
1011
{
@@ -126,6 +127,25 @@ function () use (&$called) {
126127
$this->assertSame(2, $called);
127128
}
128129

130+
public function testParallelWithErrorFromInfiniteGeneratorReturnsPromiseRejectedWithExceptionFromTaskAndStopsCallingAdditionalTasks()
131+
{
132+
$called = 0;
133+
134+
$tasks = (function () use (&$called) {
135+
while (true) {
136+
yield function () use (&$called) {
137+
return reject(new \RuntimeException('Rejected ' . ++$called));
138+
};
139+
}
140+
})();
141+
142+
$promise = React\Async\parallel($tasks);
143+
144+
$promise->then(null, $this->expectCallableOnceWith(new \RuntimeException('Rejected 1')));
145+
146+
$this->assertSame(1, $called);
147+
}
148+
129149
public function testParallelWithErrorWillCancelPendingPromises()
130150
{
131151
$cancelled = 0;

tests/SeriesTest.php

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
use React;
66
use React\EventLoop\Loop;
77
use React\Promise\Promise;
8+
use function React\Promise\reject;
89

910
class SeriesTest extends TestCase
1011
{
@@ -125,6 +126,47 @@ function () use (&$called) {
125126
$this->assertSame(1, $called);
126127
}
127128

129+
public function testSeriesWithErrorFromInfiniteGeneratorReturnsPromiseRejectedWithExceptionFromTaskAndStopsCallingAdditionalTasks()
130+
{
131+
$called = 0;
132+
133+
$tasks = (function () use (&$called) {
134+
while (true) {
135+
yield function () use (&$called) {
136+
return reject(new \RuntimeException('Rejected ' . ++$called));
137+
};
138+
}
139+
})();
140+
141+
$promise = React\Async\series($tasks);
142+
143+
$promise->then(null, $this->expectCallableOnceWith(new \RuntimeException('Rejected 1')));
144+
145+
$this->assertSame(1, $called);
146+
}
147+
148+
public function testSeriesWithErrorFromInfiniteIteratorAggregateReturnsPromiseRejectedWithExceptionFromTaskAndStopsCallingAdditionalTasks()
149+
{
150+
$tasks = new class() implements \IteratorAggregate {
151+
public $called = 0;
152+
153+
public function getIterator(): \Iterator
154+
{
155+
while (true) {
156+
yield function () {
157+
return reject(new \RuntimeException('Rejected ' . ++$this->called));
158+
};
159+
}
160+
}
161+
};
162+
163+
$promise = React\Async\series($tasks);
164+
165+
$promise->then(null, $this->expectCallableOnceWith(new \RuntimeException('Rejected 1')));
166+
167+
$this->assertSame(1, $tasks->called);
168+
}
169+
128170
public function testSeriesWillCancelFirstPendingPromiseWhenCallingCancelOnResultingPromise()
129171
{
130172
$cancelled = 0;

tests/WaterfallTest.php

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
use React;
66
use React\EventLoop\Loop;
77
use React\Promise\Promise;
8+
use function React\Promise\reject;
89

910
class WaterfallTest extends TestCase
1011
{
@@ -139,6 +140,47 @@ function () use (&$called) {
139140
$this->assertSame(1, $called);
140141
}
141142

143+
public function testWaterfallWithErrorFromInfiniteGeneratorReturnsPromiseRejectedWithExceptionFromTaskAndStopsCallingAdditionalTasks()
144+
{
145+
$called = 0;
146+
147+
$tasks = (function () use (&$called) {
148+
while (true) {
149+
yield function () use (&$called) {
150+
return reject(new \RuntimeException('Rejected ' . ++$called));
151+
};
152+
}
153+
})();
154+
155+
$promise = React\Async\waterfall($tasks);
156+
157+
$promise->then(null, $this->expectCallableOnceWith(new \RuntimeException('Rejected 1')));
158+
159+
$this->assertSame(1, $called);
160+
}
161+
162+
public function testWaterfallWithErrorFromInfiniteIteratorAggregateReturnsPromiseRejectedWithExceptionFromTaskAndStopsCallingAdditionalTasks()
163+
{
164+
$tasks = new class() implements \IteratorAggregate {
165+
public $called = 0;
166+
167+
public function getIterator(): \Iterator
168+
{
169+
while (true) {
170+
yield function () {
171+
return reject(new \RuntimeException('Rejected ' . ++$this->called));
172+
};
173+
}
174+
}
175+
};
176+
177+
$promise = React\Async\waterfall($tasks);
178+
179+
$promise->then(null, $this->expectCallableOnceWith(new \RuntimeException('Rejected 1')));
180+
181+
$this->assertSame(1, $tasks->called);
182+
}
183+
142184
public function testWaterfallWillCancelFirstPendingPromiseWhenCallingCancelOnResultingPromise()
143185
{
144186
$cancelled = 0;

0 commit comments

Comments
 (0)