Skip to content

Commit a27c1cb

Browse files
committed
Add ConnectionManagerConcurrent
1 parent b5a5c55 commit a27c1cb

File tree

3 files changed

+111
-0
lines changed

3 files changed

+111
-0
lines changed

README.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,11 @@ any of the given `ConnectionManager`s in consecutive order until the first one s
8686
The `ConnectionManagerRandom($connectors)` works much like `ConnectionManagerConsecutive` but instead
8787
of using a fixed order, it always uses a randomly shuffled order.
8888

89+
### Concurrent
90+
91+
The `ConnectionManagerConcurrent($connectors)` establishes connections by trying to connect through
92+
ALL of the given `ConnectionManager`s at once, until the first one succeeds.
93+
8994
### Selective
9095

9196
The `ConnectionManagerSelective()` manages several `Connector`s and forwards connection through either of
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
<?php
2+
3+
namespace ConnectionManager\Extra\Multiple;
4+
5+
use ConnectionManager\Extra\Multiple\ConnectionManagerConsecutive;
6+
use React\Promise;
7+
use React\Promise\CancellablePromiseInterface;
8+
9+
class ConnectionManagerConcurrent extends ConnectionManagerConsecutive
10+
{
11+
public function create($host, $port)
12+
{
13+
$all = array();
14+
foreach ($this->managers as $connector) {
15+
/* @var $connection Connector */
16+
$all []= $connector->create($host, $port);
17+
}
18+
return Promise\any($all)->then(function ($conn) use ($all) {
19+
// a connection attempt succeeded
20+
// => cancel all pending connection attempts
21+
foreach ($all as $promise) {
22+
if ($promise instanceof CancellablePromiseInterface) {
23+
$promise->cancel();
24+
}
25+
26+
// if promise resolves despite cancellation, immediately close stream
27+
$promise->then(function ($stream) use ($conn) {
28+
if ($stream !== $conn) {
29+
$stream->close();
30+
}
31+
});
32+
}
33+
return $conn;
34+
});
35+
}
36+
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
<?php
2+
3+
use ConnectionManager\Extra\Multiple\ConnectionManagerConcurrent;
4+
use React\Promise;
5+
6+
class ConnectionManagerConcurrentTest extends TestCase
7+
{
8+
public function testEmptyRejects()
9+
{
10+
$connector = new ConnectionManagerConcurrent();
11+
12+
$promise = $connector->create('google.com', 80);
13+
14+
$this->assertPromiseReject($promise);
15+
}
16+
17+
public function testWillForwardToInnerConnector()
18+
{
19+
$pending = new Promise\Promise(function() { });
20+
21+
$only = $this->getMock('React\SocketClient\ConnectorInterface');
22+
$only->expects($this->once())->method('create')->with('google.com', 80)->willReturn($pending);
23+
24+
$connector = new ConnectionManagerConcurrent();
25+
$connector->addConnectionManager($only);
26+
27+
$promise = $connector->create('google.com', 80);
28+
29+
$promise->then($this->expectCallableNever(), $this->expectCallableNever());
30+
}
31+
32+
public function testWillCancelOtherIfOneResolves()
33+
{
34+
$resolved = Promise\resolve($this->getMock('React\Stream\DuplexStreamInterface'));
35+
$first = $this->getMock('React\SocketClient\ConnectorInterface');
36+
$first->expects($this->once())->method('create')->with('google.com', 80)->willReturn($resolved);
37+
38+
$pending = new Promise\Promise(function() { }, $this->expectCallableOnce());
39+
$second = $this->getMock('React\SocketClient\ConnectorInterface');
40+
$second->expects($this->once())->method('create')->with('google.com', 80)->willReturn($pending);
41+
42+
$connector = new ConnectionManagerConcurrent();
43+
$connector->addConnectionManager($first);
44+
$connector->addConnectionManager($second);
45+
46+
$promise = $connector->create('google.com', 80);
47+
48+
$this->assertPromiseResolve($promise);
49+
}
50+
51+
public function testWillCloseOtherIfOneResolves()
52+
{
53+
$resolved = Promise\resolve($this->getMock('React\Stream\DuplexStreamInterface'));
54+
$first = $this->getMock('React\SocketClient\ConnectorInterface');
55+
$first->expects($this->once())->method('create')->with('google.com', 80)->willReturn($resolved);
56+
57+
$slower = $this->getMock('React\Stream\DuplexStreamInterface');
58+
$slower->expects($this->once())->method('close');
59+
$second = $this->getMock('React\SocketClient\ConnectorInterface');
60+
$second->expects($this->once())->method('create')->with('google.com', 80)->willReturn(Promise\resolve($slower));
61+
62+
$connector = new ConnectionManagerConcurrent();
63+
$connector->addConnectionManager($first);
64+
$connector->addConnectionManager($second);
65+
66+
$promise = $connector->create('google.com', 80);
67+
68+
$this->assertPromiseResolve($promise);
69+
}
70+
}

0 commit comments

Comments
 (0)