Skip to content

Commit f87e0d5

Browse files
committed
Split off ResponseApi from Client.
Handling the Request-Response pattern has been moved to a dedicated ResponseApi class. This helps keeping concerns separated and prepares this project to support the PubSub pattern.
1 parent 8912e90 commit f87e0d5

File tree

8 files changed

+311
-90
lines changed

8 files changed

+311
-90
lines changed

example/cli.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
$line = fgets(STDIN);
3131
if ($line === false || $line === '') {
3232
echo '# CTRL-D -> Ending connection...' . PHP_EOL;
33-
$client->end();
33+
$client->close();
3434
} else {
3535
$line = rtrim($line);
3636

@@ -39,7 +39,7 @@
3939
} else {
4040
$params = explode(' ', $line);
4141
$method = array_shift($params);
42-
call_user_func_array(array($client, $method), $params);
42+
$client->send($method, $params);
4343
}
4444
}
4545
});

example/incr.php

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
use Clue\React\Redis\Client;
44
use Clue\React\Redis\Factory;
5+
use Clue\React\Redis\ResponseApi;
56

67
require __DIR__ . '/../vendor/autoload.php';
78

@@ -12,13 +13,15 @@
1213
$factory = new Factory($connector);
1314

1415
$factory->createClient()->then(function (Client $client) {
15-
$client->incr('test');
16+
$api = new ResponseApi($client);
1617

17-
$client->get('test')->then(function ($result) {
18+
$api->incr('test');
19+
20+
$api->get('test')->then(function ($result) {
1821
var_dump($result);
1922
});
2023

21-
$client->end();
24+
$api->end();
2225
});
2326

2427
$loop->run();

src/Client.php

Lines changed: 3 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,6 @@ class Client extends EventEmitter
2020
private $stream;
2121
private $parser;
2222
private $serializer;
23-
private $requests = array();
24-
private $ending = false;
2523

2624
public function __construct(Stream $stream, ParserInterface $parser = null, SerializerInterface $serializer = null)
2725
{
@@ -48,7 +46,7 @@ public function __construct(Stream $stream, ParserInterface $parser = null, Seri
4846

4947
foreach ($models as $data) {
5048
try {
51-
$that->handleMessage($data);
49+
$that->emit('message', array($data, $that));
5250
}
5351
catch (UnderflowException $error) {
5452
$that->emit('error', array($error));
@@ -67,73 +65,13 @@ public function __construct(Stream $stream, ParserInterface $parser = null, Seri
6765
$this->serializer = $serializer;
6866
}
6967

70-
public function __call($name, $args)
68+
public function send($name, $args)
7169
{
72-
$request = new Deferred();
73-
74-
if ($this->ending) {
75-
$request->reject(new RuntimeException('Connection closed'));
76-
} else {
77-
$this->stream->write($this->serializer->getRequestMessage($name, $args));
78-
$this->requests []= $request;
79-
}
80-
81-
return $request->promise();
82-
}
83-
84-
public function handleMessage(ModelInterface $message)
85-
{
86-
$this->emit('message', array($message, $this));
87-
88-
if (!$this->requests) {
89-
throw new UnderflowException('Unexpected reply received, no matching request found');
90-
}
91-
92-
$request = array_shift($this->requests);
93-
/* @var $request Deferred */
94-
95-
if ($message instanceof ErrorReply) {
96-
$request->reject($message);
97-
} else {
98-
$request->resolve($message->getValueNative());
99-
}
100-
101-
if ($this->ending && !$this->isBusy()) {
102-
$this->close();
103-
}
104-
}
105-
106-
public function isBusy()
107-
{
108-
return !!$this->requests;
109-
}
110-
111-
/**
112-
* end connection once all pending requests have been replied to
113-
*
114-
* @uses self::close() once all replies have been received
115-
* @see self::close() for closing the connection immediately
116-
*/
117-
public function end()
118-
{
119-
$this->ending = true;
120-
121-
if (!$this->isBusy()) {
122-
$this->close();
123-
}
70+
$this->stream->write($this->serializer->getRequestMessage($name, $args));
12471
}
12572

12673
public function close()
12774
{
128-
$this->ending = true;
129-
13075
$this->stream->close();
131-
132-
// reject all remaining requests in the queue
133-
while($this->requests) {
134-
$request = array_shift($this->requests);
135-
/* @var $request Request */
136-
$request->reject(new RuntimeException('Connection closing'));
137-
}
13876
}
13977
}

src/Factory.php

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,8 @@ public function createClient($target = null)
4343

4444
if ($auth !== null) {
4545
$promise = $promise->then(function (Client $client) use ($auth) {
46-
return $client->auth($auth)->then(
46+
$api = new ResponseApi($client);
47+
return $api->auth($auth)->then(
4748
function () use ($client) {
4849
return $client;
4950
},
@@ -57,7 +58,8 @@ function ($error) use ($client) {
5758

5859
if ($db !== null) {
5960
$promise = $promise->then(function (Client $client) use ($db) {
60-
return $client->select($db)->then(
61+
$api = new ResponseApi($client);
62+
return $api->select($db)->then(
6163
function () use ($client) {
6264
return $client;
6365
},

src/ResponseApi.php

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
<?php
2+
3+
namespace Clue\React\Redis;
4+
5+
use Clue\Redis\Protocol\Model\ModelInterface;
6+
use UnderflowException;
7+
use RuntimeException;
8+
use React\Promise\Deferred;
9+
use Clue\Redis\Protocol\Model\ErrorReply;
10+
11+
class ResponseApi
12+
{
13+
private $client;
14+
private $requests = array();
15+
private $ending = false;
16+
17+
public function __construct(Client $client)
18+
{
19+
$this->client = $client;
20+
21+
$client->on('message', array($this, 'handleMessage'));
22+
$client->on('close', array($this, 'handleClose'));
23+
}
24+
25+
public function __call($name, $args)
26+
{
27+
$request = new Deferred();
28+
29+
if ($this->ending) {
30+
$request->reject(new RuntimeException('Connection closed'));
31+
} else {
32+
$this->client->send($name, $args);
33+
$this->requests []= $request;
34+
}
35+
36+
return $request->promise();
37+
}
38+
39+
public function handleMessage(ModelInterface $message)
40+
{
41+
if (!$this->requests) {
42+
throw new UnderflowException('Unexpected reply received, no matching request found');
43+
}
44+
45+
$request = array_shift($this->requests);
46+
/* @var $request Deferred */
47+
48+
if ($message instanceof ErrorReply) {
49+
$request->reject($message);
50+
} else {
51+
$request->resolve($message->getValueNative());
52+
}
53+
54+
if ($this->ending && !$this->isBusy()) {
55+
$this->client->close();
56+
}
57+
}
58+
59+
public function handleClose()
60+
{
61+
$this->ending = true;
62+
63+
$this->client->removeListener('message', array($this, 'handleMessage'));
64+
$this->client->removeListener('close', array($this, 'handleClose'));
65+
66+
// reject all remaining requests in the queue
67+
while($this->requests) {
68+
$request = array_shift($this->requests);
69+
/* @var $request Request */
70+
$request->reject(new RuntimeException('Connection closing'));
71+
}
72+
73+
$this->requests = array();
74+
}
75+
76+
public function isBusy()
77+
{
78+
return !!$this->requests;
79+
}
80+
81+
/**
82+
* end connection once all pending requests have been replied to
83+
*
84+
* @uses self::close() once all replies have been received
85+
* @see self::close() for closing the connection immediately
86+
*/
87+
public function end()
88+
{
89+
$this->ending = true;
90+
91+
if (!$this->isBusy()) {
92+
$this->client->close();
93+
}
94+
}
95+
}

tests/ClientTest.php

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
<?php
2+
3+
use Clue\React\Redis\Client;
4+
use Clue\Redis\Protocol\Parser\ParserException;
5+
use Clue\Redis\Protocol\Model\IntegerReply;
6+
7+
class ClientTest extends TestCase
8+
{
9+
private $stream;
10+
private $parser;
11+
private $serializer;
12+
private $client;
13+
14+
public function setUp()
15+
{
16+
$this->stream = $this->getMockBuilder('React\Stream\Stream')->disableOriginalConstructor()->setMethods(array('write', 'close', 'resume', 'pause'))->getMock();
17+
$this->parser = $this->getMock('Clue\Redis\Protocol\Parser\ParserInterface');
18+
$this->serializer = $this->getMock('Clue\Redis\Protocol\Serializer\SerializerInterface');
19+
20+
$this->client = new Client($this->stream, $this->parser, $this->serializer);
21+
}
22+
23+
public function testSending()
24+
{
25+
$this->serializer->expects($this->once())->method('getRequestMessage')->with($this->equalTo('ping'))->will($this->returnValue('message'));
26+
$this->stream->expects($this->once())->method('write')->with($this->equalTo('message'));
27+
28+
$this->client->send('ping', array());
29+
}
30+
31+
public function testClosingClientEmitsEvent()
32+
{
33+
//$this->client->on('close', $this->expectCallableOnce());
34+
35+
$this->client->close();
36+
}
37+
38+
public function testClosingStreamClosesClient()
39+
{
40+
$this->client->on('close', $this->expectCallableOnce());
41+
42+
$this->stream->emit('close');
43+
}
44+
45+
public function testReceiveParseErrorEmitsErrorEvent()
46+
{
47+
$this->client->on('error', $this->expectCallableOnce());
48+
//$this->client->on('close', $this->expectCallableOnce());
49+
50+
$this->parser->expects($this->once())->method('pushIncoming')->with($this->equalTo('message'))->will($this->throwException(new ParserException()));
51+
$this->stream->emit('data', array('message'));
52+
}
53+
54+
public function testReceiveMessageEmitsEvent()
55+
{
56+
$this->client->on('message', $this->expectCallableOnce());
57+
58+
$this->parser->expects($this->once())->method('pushIncoming')->with($this->equalTo('message'))->will($this->returnValue(array(new IntegerReply(2))));
59+
$this->stream->emit('data', array('message'));
60+
}
61+
62+
public function testReceiveThrowMessageEmitsErrorEvent()
63+
{
64+
$this->client->on('message', $this->expectCallableOnce());
65+
$this->client->on('message', function() {
66+
throw new UnderflowException();
67+
});
68+
69+
$this->client->on('error', $this->expectCallableOnce());
70+
71+
$this->parser->expects($this->once())->method('pushIncoming')->with($this->equalTo('message'))->will($this->returnValue(array(new IntegerReply(2))));
72+
$this->stream->emit('data', array('message'));
73+
}
74+
75+
public function testDefaultCtor()
76+
{
77+
$client = new Client($this->stream);
78+
}
79+
}

0 commit comments

Comments
 (0)