Skip to content

Commit 9c7bf0c

Browse files
committed
Merge pull request clue#18 from clue/streaming-client
Rename Client to StreamingClient
2 parents a3f392d + a12027a commit 9c7bf0c

File tree

5 files changed

+181
-20
lines changed

5 files changed

+181
-20
lines changed

src/Factory.php

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
use React\SocketClient\ConnectorInterface;
66
use React\Stream\Stream;
7-
use Clue\React\Redis\Client;
7+
use Clue\React\Redis\StreamingClient;
88
use Clue\Redis\Protocol\Factory as ProtocolFactory;
99
use InvalidArgumentException;
1010
use BadMethodCallException;
@@ -38,11 +38,11 @@ public function createClient($target = null)
3838
$protocol = $this->protocol;
3939

4040
$promise = $this->connect($target)->then(function (Stream $stream) use ($protocol) {
41-
return new Client($stream, $protocol->createResponseParser(), $protocol->createSerializer());
41+
return new StreamingClient($stream, $protocol->createResponseParser(), $protocol->createSerializer());
4242
});
4343

4444
if ($auth !== null) {
45-
$promise = $promise->then(function (Client $client) use ($auth) {
45+
$promise = $promise->then(function (StreamingClient $client) use ($auth) {
4646
return $client->auth($auth)->then(
4747
function () use ($client) {
4848
return $client;
@@ -56,7 +56,7 @@ function ($error) use ($client) {
5656
}
5757

5858
if ($db !== null) {
59-
$promise = $promise->then(function (Client $client) use ($db) {
59+
$promise = $promise->then(function (StreamingClient $client) use ($db) {
6060
return $client->select($db)->then(
6161
function () use ($client) {
6262
return $client;

src/Client.php renamed to src/StreamingClient.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
use Clue\Redis\Protocol\Model\ErrorReply;
1616
use Clue\Redis\Protocol\Model\ModelInterface;
1717

18-
class Client extends EventEmitter
18+
class StreamingClient extends EventEmitter
1919
{
2020
private $stream;
2121
private $parser;

tests/FactoryTest.php

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
use Clue\React\Redis\Server;
66

7-
use Clue\React\Redis\Client;
7+
use Clue\React\Redis\StreamingClient;
88

99
use Clue\React\Redis\Factory;
1010

@@ -32,7 +32,7 @@ public function testClientDefaultSuccess()
3232
{
3333
$promise = $this->factory->createClient();
3434

35-
$this->expectPromiseResolve($promise)->then(function (Client $client) {
35+
$this->expectPromiseResolve($promise)->then(function (StreamingClient $client) {
3636
$client->end();
3737
});
3838

@@ -46,7 +46,7 @@ public function testClientAuthSelect()
4646
{
4747
$promise = $this->factory->createClient('tcp://[email protected]:6379/0');
4848

49-
$this->expectPromiseResolve($promise)->then(function (Client $client) {
49+
$this->expectPromiseResolve($promise)->then(function (StreamingClient $client) {
5050
$client->end();
5151
});
5252

@@ -60,7 +60,7 @@ public function testClientAuthenticationContainsColons()
6060
{
6161
$promise = $this->factory->createClient('tcp://authentication:can:contain:[email protected]:6379');
6262

63-
$this->expectPromiseResolve($promise)->then(function (Client $client) {
63+
$this->expectPromiseResolve($promise)->then(function (StreamingClient $client) {
6464
$client->end();
6565
});
6666

tests/FunctionalTest.php

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
use Clue\React\Redis\Factory;
88

9-
use Clue\React\Redis\Client;
9+
use Clue\React\Redis\StreamingClient;
1010

1111
class FunctionalTest extends TestCase
1212
{
@@ -40,10 +40,10 @@ public function testPing()
4040

4141
/**
4242
*
43-
* @param Client $client
43+
* @param StreamingClient $client
4444
* @depends testPing
4545
*/
46-
public function testPipeline(Client $client)
46+
public function testPipeline(StreamingClient $client)
4747
{
4848
$this->assertFalse($client->isBusy());
4949

@@ -61,10 +61,10 @@ public function testPipeline(Client $client)
6161

6262
/**
6363
*
64-
* @param Client $client
64+
* @param StreamingClient $client
6565
* @depends testPipeline
6666
*/
67-
public function testInvalidCommand(Client $client)
67+
public function testInvalidCommand(StreamingClient $client)
6868
{
6969
$client->doesnotexist(1, 2, 3)->then($this->expectCallableNever());
7070

@@ -75,10 +75,10 @@ public function testInvalidCommand(Client $client)
7575

7676
/**
7777
*
78-
* @param Client $client
78+
* @param StreamingClient $client
7979
* @depends testInvalidCommand
8080
*/
81-
public function testMultiExecEmpty(Client $client)
81+
public function testMultiExecEmpty(StreamingClient $client)
8282
{
8383
$client->multi()->then($this->expectCallableOnce('OK'));
8484
$client->exec()->then($this->expectCallableOnce(array()));
@@ -90,10 +90,10 @@ public function testMultiExecEmpty(Client $client)
9090

9191
/**
9292
*
93-
* @param Client $client
93+
* @param StreamingClient $client
9494
* @depends testMultiExecEmpty
9595
*/
96-
public function testMultiExecQueuedExecHasValues(Client $client)
96+
public function testMultiExecQueuedExecHasValues(StreamingClient $client)
9797
{
9898
$client->multi()->then($this->expectCallableOnce('OK'));
9999
$client->set('b', 10)->then($this->expectCallableOnce('QUEUED'));
@@ -191,7 +191,7 @@ protected function createClientResponse($response)
191191

192192
$stream = new Stream($fp, self::$loop);
193193

194-
return new Client($stream);
194+
return new StreamingClient($stream);
195195
}
196196

197197
protected function createServer($response)
@@ -201,7 +201,7 @@ protected function createServer($response)
201201

202202
}
203203

204-
protected function waitFor(Client $client)
204+
protected function waitFor(StreamingClient $client)
205205
{
206206
$this->assertTrue($client->isBusy());
207207

tests/StreamingClientTest.php

Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
<?php
2+
3+
use Clue\React\Redis\StreamingClient;
4+
use Clue\Redis\Protocol\Parser\ParserException;
5+
use Clue\Redis\Protocol\Model\IntegerReply;
6+
use Clue\Redis\Protocol\Model\BulkReply;
7+
use Clue\Redis\Protocol\Model\ErrorReply;
8+
9+
class ClientTest extends TestCase
10+
{
11+
private $stream;
12+
private $parser;
13+
private $serializer;
14+
private $client;
15+
16+
public function setUp()
17+
{
18+
$this->stream = $this->getMockBuilder('React\Stream\Stream')->disableOriginalConstructor()->setMethods(array('write', 'close', 'resume', 'pause'))->getMock();
19+
$this->parser = $this->getMock('Clue\Redis\Protocol\Parser\ParserInterface');
20+
$this->serializer = $this->getMock('Clue\Redis\Protocol\Serializer\SerializerInterface');
21+
22+
$this->client = new StreamingClient($this->stream, $this->parser, $this->serializer);
23+
}
24+
25+
public function testSending()
26+
{
27+
$this->serializer->expects($this->once())->method('getRequestMessage')->with($this->equalTo('ping'))->will($this->returnValue('message'));
28+
$this->stream->expects($this->once())->method('write')->with($this->equalTo('message'));
29+
30+
$this->client->ping();
31+
}
32+
33+
public function testClosingClientEmitsEvent()
34+
{
35+
//$this->client->on('close', $this->expectCallableOnce());
36+
37+
$this->client->close();
38+
}
39+
40+
public function testClosingStreamClosesClient()
41+
{
42+
$this->client->on('close', $this->expectCallableOnce());
43+
44+
$this->stream->emit('close');
45+
}
46+
47+
public function testReceiveParseErrorEmitsErrorEvent()
48+
{
49+
$this->client->on('error', $this->expectCallableOnce());
50+
//$this->client->on('close', $this->expectCallableOnce());
51+
52+
$this->parser->expects($this->once())->method('pushIncoming')->with($this->equalTo('message'))->will($this->throwException(new ParserException()));
53+
$this->stream->emit('data', array('message'));
54+
}
55+
56+
public function testReceiveMessageEmitsEvent()
57+
{
58+
$this->client->on('message', $this->expectCallableOnce());
59+
60+
$this->parser->expects($this->once())->method('pushIncoming')->with($this->equalTo('message'))->will($this->returnValue(array(new IntegerReply(2))));
61+
$this->stream->emit('data', array('message'));
62+
}
63+
64+
public function testReceiveThrowMessageEmitsErrorEvent()
65+
{
66+
$this->client->on('message', $this->expectCallableOnce());
67+
$this->client->on('message', function() {
68+
throw new UnderflowException();
69+
});
70+
71+
$this->client->on('error', $this->expectCallableOnce());
72+
73+
$this->parser->expects($this->once())->method('pushIncoming')->with($this->equalTo('message'))->will($this->returnValue(array(new IntegerReply(2))));
74+
$this->stream->emit('data', array('message'));
75+
}
76+
77+
public function testDefaultCtor()
78+
{
79+
$client = new StreamingClient($this->stream);
80+
}
81+
82+
public function testPingPong()
83+
{
84+
$this->serializer->expects($this->once())->method('getRequestMessage')->with($this->equalTo('ping'));
85+
86+
$promise = $this->client->ping();
87+
88+
$this->client->handleMessage(new BulkReply('PONG'));
89+
90+
$this->expectPromiseResolve($promise);
91+
$promise->then($this->expectCallableOnce('PONG'));
92+
}
93+
94+
public function testErrorReply()
95+
{
96+
$promise = $this->client->invalid();
97+
98+
$err = new ErrorReply("ERR unknown command 'invalid'");
99+
$this->client->handleMessage($err);
100+
101+
$this->expectPromiseReject($promise);
102+
$promise->then(null, $this->expectCallableOnce($err));
103+
}
104+
105+
public function testClosingClientRejectsAllRemainingRequests()
106+
{
107+
$promise = $this->client->ping();
108+
$this->assertTrue($this->client->isBusy());
109+
110+
$this->client->close();
111+
112+
$this->expectPromiseReject($promise);
113+
$this->assertFalse($this->client->isBusy());
114+
}
115+
116+
public function testClosedClientRejectsAllNewRequests()
117+
{
118+
$this->client->close();
119+
120+
$promise = $this->client->ping();
121+
122+
$this->expectPromiseReject($promise);
123+
$this->assertFalse($this->client->isBusy());
124+
}
125+
126+
public function testEndingNonBusyClosesClient()
127+
{
128+
$this->markTestIncomplete();
129+
130+
$this->assertFalse($this->client->isBusy());
131+
132+
$this->client->on('close', $this->expectCallableOnce());
133+
$this->client->end();
134+
}
135+
136+
public function testEndingBusyClosesClientWhenNotBusyAnymore()
137+
{
138+
$this->markTestIncomplete();
139+
140+
// count how often the "close" method has been called
141+
$closed = 0;
142+
$this->client->on('close', function() use (&$closed) {
143+
++$closed;
144+
});
145+
146+
$promise = $this->client->ping();
147+
$this->assertEquals(0, $closed);
148+
149+
$this->client->end();
150+
$this->assertEquals(0, $closed);
151+
152+
$this->client->handleMessage(new BulkReply('PONG'));
153+
$this->assertEquals(1, $closed);
154+
}
155+
156+
public function testReceivingUnexpectedMessageThrowsException()
157+
{
158+
$this->setExpectedException('UnderflowException');
159+
$this->client->handleMessage(new BulkReply('PONG'));
160+
}
161+
}

0 commit comments

Comments
 (0)