Skip to content

Commit 46e4cd2

Browse files
committed
Keep track of underlying connection and create new when connection lost
1 parent 82dc4c7 commit 46e4cd2

File tree

3 files changed

+104
-40
lines changed

3 files changed

+104
-40
lines changed

README.md

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -212,24 +212,34 @@ $client->end();
212212

213213
This method immediately returns a "virtual" connection implementing the
214214
[`Client`](#client) that can be used to interface with your Redis database.
215-
Internally, it lazily creates the underlying database connection (which may
216-
take some time) only once the first request is invoked on this instance and
217-
will queue all outstanding requests until the underlying connection is ready.
215+
Internally, it lazily creates the underlying database connection only on
216+
demand once the first request is invoked on this instance and will queue
217+
all outstanding requests until the underlying connection is ready.
218+
Additionally, it will keep track of this underlying connection and will
219+
create a new underlying connection on demand when the current connection
220+
is lost.
218221

219222
From a consumer side this means that you can start sending commands to the
220-
database right away while the actual connection may still be outstanding.
221-
It will ensure that all commands will be executed in the order they are
222-
enqueued once the connection is ready. If the database connection fails,
223-
it will emit an `error` event, reject all outstanding commands and `close`
224-
the connection as described in the `Client`. In other words, it behaves just
225-
like a real connection and frees you from having to deal with its async
226-
resolution.
223+
database right away while the underlying connection may still be
224+
outstanding. Because creating this underlying connection may take some
225+
time, it will enqueue all oustanding commands and will ensure that all
226+
commands will be executed in correct order once the connection is ready.
227+
In other words, this "virtual" connection behaves just like a "real"
228+
connection as described in the `Client` interface and frees you from having
229+
to deal with its async resolution.
230+
231+
If the underlying database connection fails, it will reject all
232+
outstanding commands and will return to the initial "idle" state. This
233+
means that you can keep sending additional commands at a later time which
234+
will again try to open the underlying connection.
227235

228236
Note that creating the underlying connection will be deferred until the
229237
first request is invoked. Accordingly, any eventual connection issues
230-
will be detected once this instance is first used. Similarly, calling
231-
`end()` on this instance before invoking any requests will succeed
232-
immediately and will not wait for an actual underlying connection.
238+
will be detected once this instance is first used. You can use the
239+
`end()` method to ensure that the "virtual" connection will be soft-closed
240+
and no further commands can be enqueued. Similarly, calling `end()` on
241+
this instance before invoking any requests will succeed immediately and
242+
will not wait for an actual underlying connection.
233243

234244
Depending on your particular use case, you may prefer this method or the
235245
underlying `createClient()` which resolves with a promise. For many

src/LazyClient.php

Lines changed: 16 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
namespace Clue\React\Redis;
44

55
use Evenement\EventEmitter;
6-
use React\Promise\PromiseInterface;
76
use React\Stream\Util;
87

98
/**
@@ -14,7 +13,6 @@ class LazyClient extends EventEmitter implements Client
1413
private $target;
1514
/** @var Factory */
1615
private $factory;
17-
private $ending = false;
1816
private $closed = false;
1917
private $promise;
2018

@@ -25,23 +23,26 @@ public function __construct($target, Factory $factory)
2523
{
2624
$this->target = $target;
2725
$this->factory = $factory;
28-
29-
$this->on('close', array($this, 'removeAllListeners'));
3026
}
3127

3228
private function client()
3329
{
34-
if ($this->promise instanceof PromiseInterface) {
30+
if ($this->promise !== null) {
3531
return $this->promise;
3632
}
3733

3834
$self = $this;
39-
return $this->promise = $this->factory->createClient($this->target)->then(function (Client $client) use ($self) {
35+
$pending =& $this->promise;
36+
return $pending = $this->factory->createClient($this->target)->then(function (Client $client) use ($self, &$pending) {
37+
// connection completed => remember only until closed
38+
$client->on('close', function () use (&$pending) {
39+
$pending = null;
40+
});
41+
4042
Util::forwardEvents(
4143
$client,
4244
$self,
4345
array(
44-
'error',
4546
'message',
4647
'subscribe',
4748
'unsubscribe',
@@ -51,16 +52,10 @@ private function client()
5152
)
5253
);
5354

54-
$client->on('close', array($self, 'close'));
55-
5655
return $client;
57-
}, function (\Exception $e) use ($self) {
58-
// connection failed => emit error if connection is not already closed
59-
if ($self->closed) {
60-
return;
61-
}
62-
$self->emit('error', array($e));
63-
$self->close();
56+
}, function (\Exception $e) use (&$pending) {
57+
// connection failed => discard connection attempt
58+
$pending = null;
6459

6560
throw $e;
6661
});
@@ -87,7 +82,11 @@ public function end()
8782
return;
8883
}
8984

90-
return $this->client()->then(function (Client $client) {
85+
$that = $this;
86+
return $this->client()->then(function (Client $client) use ($that) {
87+
$client->on('close', function () use ($that) {
88+
$that->close();
89+
});
9190
$client->end();
9291
});
9392
}

tests/LazyClientTest.php

Lines changed: 65 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -75,22 +75,54 @@ public function testPingWillRejectWhenUnderlyingClientRejectsPing()
7575
$promise->then(null, $this->expectCallableOnceWith($error));
7676
}
7777

78-
public function testPingWillRejectAndEmitErrorAndCloseWhenFactoryRejectsUnderlyingClient()
78+
public function testPingWillRejectAndNotEmitErrorOrCloseWhenFactoryRejectsUnderlyingClient()
7979
{
8080
$error = new \RuntimeException();
8181

8282
$deferred = new Deferred();
8383
$this->factory->expects($this->once())->method('createClient')->willReturn($deferred->promise());
8484

85-
$this->client->on('error', $this->expectCallableOnceWith($error));
86-
$this->client->on('close', $this->expectCallableOnce());
85+
$this->client->on('error', $this->expectCallableNever());
86+
$this->client->on('close', $this->expectCallableNever());
8787

8888
$promise = $this->client->ping();
8989
$deferred->reject($error);
9090

9191
$promise->then(null, $this->expectCallableOnceWith($error));
9292
}
9393

94+
public function testPingAfterPreviousFactoryRejectsUnderlyingClientWillCreateNewUnderlyingConnection()
95+
{
96+
$error = new \RuntimeException();
97+
98+
$deferred = new Deferred();
99+
$this->factory->expects($this->exactly(2))->method('createClient')->willReturnOnConsecutiveCalls(
100+
$deferred->promise(),
101+
new Promise(function () { })
102+
);
103+
104+
$this->client->ping();
105+
$deferred->reject($error);
106+
107+
$this->client->ping();
108+
}
109+
110+
public function testPingAfterPreviousUnderlyingClientAlreadyClosedWillCreateNewUnderlyingConnection()
111+
{
112+
$client = $this->getMockBuilder('Clue\React\Redis\StreamingClient')->disableOriginalConstructor()->setMethods(array('__call'))->getMock();
113+
$client->expects($this->once())->method('__call')->with('ping')->willReturn(\React\Promise\resolve('PONG'));
114+
115+
$this->factory->expects($this->exactly(2))->method('createClient')->willReturnOnConsecutiveCalls(
116+
\React\Promise\resolve($client),
117+
new Promise(function () { })
118+
);
119+
120+
$this->client->ping();
121+
$client->emit('close');
122+
123+
$this->client->ping();
124+
}
125+
94126
public function testPingAfterCloseWillRejectWithoutCreatingUnderlyingConnection()
95127
{
96128
$this->factory->expects($this->never())->method('createClient');
@@ -144,6 +176,7 @@ public function testCloseAfterPingWillEmitCloseWithoutErrorWhenUnderlyingClientC
144176
public function testCloseAfterPingWillCloseUnderlyingClientConnectionWhenAlreadyResolved()
145177
{
146178
$client = $this->getMockBuilder('Clue\React\Redis\Client')->getMock();
179+
$client->expects($this->once())->method('__call')->willReturn(\React\Promise\resolve());
147180
$client->expects($this->once())->method('close');
148181

149182
$deferred = new Deferred();
@@ -174,39 +207,61 @@ public function testEndAfterPingWillEndUnderlyingClient()
174207
$this->client->end();
175208
}
176209

177-
public function testEmitsErrorEventWhenUnderlyingClientEmitsError()
210+
public function testEndAfterPingWillCloseClientWhenUnderlyingClientEmitsClose()
211+
{
212+
$client = $this->getMockBuilder('Clue\React\Redis\StreamingClient')->disableOriginalConstructor()->setMethods(array('__call', 'end'))->getMock();
213+
//$client = $this->getMockBuilder('Clue\React\Redis\Client')->getMock();
214+
$client->expects($this->once())->method('__call')->with('ping')->willReturn(\React\Promise\resolve('PONG'));
215+
$client->expects($this->once())->method('end');
216+
217+
$deferred = new Deferred();
218+
$this->factory->expects($this->once())->method('createClient')->willReturn($deferred->promise());
219+
220+
$this->client->ping();
221+
$deferred->resolve($client);
222+
223+
$this->client->on('close', $this->expectCallableOnce());
224+
$this->client->end();
225+
226+
$client->emit('close');
227+
}
228+
229+
public function testEmitsNoErrorEventWhenUnderlyingClientEmitsError()
178230
{
179231
$error = new \RuntimeException();
180232

181-
$client = $this->getMockBuilder('Clue\React\Redis\StreamingClient')->disableOriginalConstructor()->setMethods(array('close'))->getMock();
233+
$client = $this->getMockBuilder('Clue\React\Redis\StreamingClient')->disableOriginalConstructor()->setMethods(array('__call'))->getMock();
234+
$client->expects($this->once())->method('__call')->willReturn(\React\Promise\resolve());
182235

183236
$deferred = new Deferred();
184237
$this->factory->expects($this->once())->method('createClient')->willReturn($deferred->promise());
185238

186239
$this->client->ping();
187240
$deferred->resolve($client);
188241

189-
$this->client->on('error', $this->expectCallableOnceWith($error));
242+
$this->client->on('error', $this->expectCallableNever());
190243
$client->emit('error', array($error));
191244
}
192245

193-
public function testEmitsCloseEventWhenUnderlyingClientEmitsClose()
246+
public function testEmitsNoCloseEventWhenUnderlyingClientEmitsClose()
194247
{
195-
$client = $this->getMockBuilder('Clue\React\Redis\StreamingClient')->disableOriginalConstructor()->setMethods(array('close'))->getMock();
248+
$client = $this->getMockBuilder('Clue\React\Redis\StreamingClient')->disableOriginalConstructor()->setMethods(array('__call'))->getMock();
249+
$client->expects($this->once())->method('__call')->willReturn(\React\Promise\resolve());
196250

197251
$deferred = new Deferred();
198252
$this->factory->expects($this->once())->method('createClient')->willReturn($deferred->promise());
199253

200254
$this->client->ping();
201255
$deferred->resolve($client);
202256

203-
$this->client->on('close', $this->expectCallableOnce());
257+
$this->client->on('close', $this->expectCallableNever());
204258
$client->emit('close');
205259
}
206260

207261
public function testEmitsMessageEventWhenUnderlyingClientEmitsMessageForPubSubChannel()
208262
{
209-
$client = $this->getMockBuilder('Clue\React\Redis\StreamingClient')->disableOriginalConstructor()->setMethods(array('close'))->getMock();
263+
$client = $this->getMockBuilder('Clue\React\Redis\StreamingClient')->disableOriginalConstructor()->setMethods(array('__call'))->getMock();
264+
$client->expects($this->once())->method('__call')->willReturn(\React\Promise\resolve());
210265

211266
$deferred = new Deferred();
212267
$this->factory->expects($this->once())->method('createClient')->willReturn($deferred->promise());

0 commit comments

Comments
 (0)