Skip to content

Commit 36eb448

Browse files
committed
Calling cancel() on resulting promise should cancel all pending tasks
1 parent 119e509 commit 36eb448

File tree

4 files changed

+95
-5
lines changed

4 files changed

+95
-5
lines changed

src/functions.php

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,14 @@ function ($error) use (&$exception, &$rejected, &$wait) {
9898
function parallel(array $tasks)
9999
{
100100
$pending = array();
101-
$deferred = new Deferred();
101+
$deferred = new Deferred(function () use (&$pending) {
102+
foreach ($pending as $promise) {
103+
if ($promise instanceof CancellablePromiseInterface) {
104+
$promise->cancel();
105+
}
106+
}
107+
$pending = array();
108+
});
102109
$results = array();
103110
$errored = false;
104111

@@ -148,7 +155,13 @@ function parallel(array $tasks)
148155
*/
149156
function series(array $tasks)
150157
{
151-
$deferred = new Deferred();
158+
$pending = null;
159+
$deferred = new Deferred(function () use (&$pending) {
160+
if ($pending instanceof CancellablePromiseInterface) {
161+
$pending->cancel();
162+
}
163+
$pending = null;
164+
});
152165
$results = array();
153166

154167
/** @var callable():void $next */
@@ -157,7 +170,7 @@ function series(array $tasks)
157170
$next();
158171
};
159172

160-
$next = function () use (&$tasks, $taskCallback, $deferred, &$results) {
173+
$next = function () use (&$tasks, $taskCallback, $deferred, &$results, &$pending) {
161174
if (0 === count($tasks)) {
162175
$deferred->resolve($results);
163176
return;
@@ -166,6 +179,7 @@ function series(array $tasks)
166179
$task = array_shift($tasks);
167180
$promise = call_user_func($task);
168181
assert($promise instanceof PromiseInterface);
182+
$pending = $promise;
169183

170184
$promise->then($taskCallback, array($deferred, 'reject'));
171185
};
@@ -181,10 +195,16 @@ function series(array $tasks)
181195
*/
182196
function waterfall(array $tasks)
183197
{
184-
$deferred = new Deferred();
198+
$pending = null;
199+
$deferred = new Deferred(function () use (&$pending) {
200+
if ($pending instanceof CancellablePromiseInterface) {
201+
$pending->cancel();
202+
}
203+
$pending = null;
204+
});
185205

186206
/** @var callable $next */
187-
$next = function ($value = null) use (&$tasks, &$next, $deferred) {
207+
$next = function ($value = null) use (&$tasks, &$next, $deferred, &$pending) {
188208
if (0 === count($tasks)) {
189209
$deferred->resolve($value);
190210
return;
@@ -193,6 +213,7 @@ function waterfall(array $tasks)
193213
$task = array_shift($tasks);
194214
$promise = call_user_func_array($task, func_get_args());
195215
assert($promise instanceof PromiseInterface);
216+
$pending = $promise;
196217

197218
$promise->then($next, array($deferred, 'reject'));
198219
};

tests/ParallelTest.php

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,29 @@ function () use (&$cancelled) {
110110
$this->assertSame(1, $cancelled);
111111
}
112112

113+
public function testParallelWillCancelPendingPromisesWhenCallingCancelOnResultingPromise()
114+
{
115+
$cancelled = 0;
116+
117+
$tasks = array(
118+
function () use (&$cancelled) {
119+
return new Promise(function () { }, function () use (&$cancelled) {
120+
$cancelled++;
121+
});
122+
},
123+
function () use (&$cancelled) {
124+
return new Promise(function () { }, function () use (&$cancelled) {
125+
$cancelled++;
126+
});
127+
}
128+
);
129+
130+
$promise = React\Async\parallel($tasks);
131+
$promise->cancel();
132+
133+
$this->assertSame(2, $cancelled);
134+
}
135+
113136
public function testParallelWithDelayedErrorReturnsPromiseRejectedWithExceptionFromTask()
114137
{
115138
$called = 0;

tests/SeriesTest.php

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,4 +79,27 @@ function () use (&$called) {
7979

8080
$this->assertSame(1, $called);
8181
}
82+
83+
public function testSeriesWillCancelFirstPendingPromiseWhenCallingCancelOnResultingPromise()
84+
{
85+
$cancelled = 0;
86+
87+
$tasks = array(
88+
function () {
89+
return new Promise(function ($resolve) {
90+
$resolve();
91+
});
92+
},
93+
function () use (&$cancelled) {
94+
return new Promise(function () { }, function () use (&$cancelled) {
95+
$cancelled++;
96+
});
97+
}
98+
);
99+
100+
$promise = React\Async\series($tasks);
101+
$promise->cancel();
102+
103+
$this->assertSame(1, $cancelled);
104+
}
82105
}

tests/WaterfallTest.php

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,4 +86,27 @@ function () use (&$called) {
8686

8787
$this->assertSame(1, $called);
8888
}
89+
90+
public function testWaterfallWillCancelFirstPendingPromiseWhenCallingCancelOnResultingPromise()
91+
{
92+
$cancelled = 0;
93+
94+
$tasks = array(
95+
function () {
96+
return new Promise(function ($resolve) {
97+
$resolve();
98+
});
99+
},
100+
function () use (&$cancelled) {
101+
return new Promise(function () { }, function () use (&$cancelled) {
102+
$cancelled++;
103+
});
104+
}
105+
);
106+
107+
$promise = React\Async\waterfall($tasks);
108+
$promise->cancel();
109+
110+
$this->assertSame(1, $cancelled);
111+
}
89112
}

0 commit comments

Comments
 (0)