Skip to content

Commit 119e509

Browse files
committed
Ensure parallel() function cancels parallel tasks on error
1 parent ca4c692 commit 119e509

File tree

2 files changed

+49
-23
lines changed

2 files changed

+49
-23
lines changed

src/functions.php

Lines changed: 20 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
namespace React\Async;
44

55
use React\EventLoop\Loop;
6+
use React\Promise\CancellablePromiseInterface;
67
use React\Promise\Deferred;
78
use React\Promise\PromiseInterface;
89

@@ -96,48 +97,44 @@ function ($error) use (&$exception, &$rejected, &$wait) {
9697
*/
9798
function parallel(array $tasks)
9899
{
100+
$pending = array();
99101
$deferred = new Deferred();
100102
$results = array();
101-
$errors = array();
102-
103-
$done = function () use (&$results, &$errors, $deferred) {
104-
if (count($errors)) {
105-
$deferred->reject(reset($errors));
106-
return;
107-
}
108-
109-
$deferred->resolve($results);
110-
};
103+
$errored = false;
111104

112105
$numTasks = count($tasks);
113-
114106
if (0 === $numTasks) {
115-
$done();
107+
$deferred->resolve($results);
116108
}
117109

118-
$checkDone = function () use (&$results, &$errors, $numTasks, $done) {
119-
if ($numTasks === count($results) || count($errors)) {
120-
$done();
121-
}
122-
};
110+
$taskErrback = function ($error) use (&$pending, $deferred, &$errored) {
111+
$errored = true;
112+
$deferred->reject($error);
123113

124-
$taskErrback = function ($error) use (&$errors, $checkDone) {
125-
$errors[] = $error;
126-
$checkDone();
114+
foreach ($pending as $promise) {
115+
if ($promise instanceof CancellablePromiseInterface) {
116+
$promise->cancel();
117+
}
118+
}
119+
$pending = array();
127120
};
128121

129122
foreach ($tasks as $i => $task) {
130-
$taskCallback = function ($result) use (&$results, $i, $checkDone) {
123+
$taskCallback = function ($result) use (&$results, &$pending, $numTasks, $i, $deferred) {
131124
$results[$i] = $result;
132-
$checkDone();
125+
126+
if (count($results) === $numTasks) {
127+
$deferred->resolve($results);
128+
}
133129
};
134130

135131
$promise = call_user_func($task);
136132
assert($promise instanceof PromiseInterface);
133+
$pending[$i] = $promise;
137134

138135
$promise->then($taskCallback, $taskErrback);
139136

140-
if ($errors) {
137+
if ($errored) {
141138
break;
142139
}
143140
}

tests/ParallelTest.php

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,35 @@ function () use (&$called) {
8181
$this->assertSame(2, $called);
8282
}
8383

84+
public function testParallelWithErrorWillCancelPendingPromises()
85+
{
86+
$cancelled = 0;
87+
88+
$tasks = array(
89+
function () use (&$cancelled) {
90+
return new Promise(function () { }, function () use (&$cancelled) {
91+
$cancelled++;
92+
});
93+
},
94+
function () {
95+
return new Promise(function () {
96+
throw new \RuntimeException('whoops');
97+
});
98+
},
99+
function () use (&$cancelled) {
100+
return new Promise(function () { }, function () use (&$cancelled) {
101+
$cancelled++;
102+
});
103+
}
104+
);
105+
106+
$promise = React\Async\parallel($tasks);
107+
108+
$promise->then(null, $this->expectCallableOnceWith(new \RuntimeException('whoops')));
109+
110+
$this->assertSame(1, $cancelled);
111+
}
112+
84113
public function testParallelWithDelayedErrorReturnsPromiseRejectedWithExceptionFromTask()
85114
{
86115
$called = 0;

0 commit comments

Comments
 (0)