Skip to content

Commit 7099196

Browse files
committed
Merge pull request clue#12 from clue/responseapi
Split off ResponseApi from Client
2 parents 8912e90 + cc901b6 commit 7099196

File tree

9 files changed

+329
-93
lines changed

9 files changed

+329
-93
lines changed

README.md

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,19 +13,21 @@ local redis server and send some requests:
1313

1414
$factory = new Factory($connector);
1515
$factory->createClient()->then(function (Client $client) use ($loop) {
16-
$client->SET('greeting', 'Hello world');
17-
$client->APPEND('greeting', '!');
16+
$api = new RequestApi($client);
1817

19-
$client->GET('greeting')->then(function ($greeting) {
18+
$api->set('greeting', 'Hello world');
19+
$api->append('greeting', '!');
20+
21+
$api->get('greeting')->then(function ($greeting) {
2022
echo $greeting . PHP_EOL;
2123
});
2224

23-
$client->INCR('invocation')->then(function ($n) {
25+
$api->incr('invocation')->then(function ($n) {
2426
echo 'count: ' . $n . PHP_EOL;
2527
});
2628

2729
// end connection once all pending requests have been resolved
28-
$client->end();
30+
$api->end();
2931
});
3032

3133
$loop->run();

example/cli.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
$line = fgets(STDIN);
3131
if ($line === false || $line === '') {
3232
echo '# CTRL-D -> Ending connection...' . PHP_EOL;
33-
$client->end();
33+
$client->close();
3434
} else {
3535
$line = rtrim($line);
3636

@@ -39,7 +39,7 @@
3939
} else {
4040
$params = explode(' ', $line);
4141
$method = array_shift($params);
42-
call_user_func_array(array($client, $method), $params);
42+
$client->send($method, $params);
4343
}
4444
}
4545
});

example/incr.php

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
use Clue\React\Redis\Client;
44
use Clue\React\Redis\Factory;
5+
use Clue\React\Redis\RequestApi;
56

67
require __DIR__ . '/../vendor/autoload.php';
78

@@ -12,13 +13,15 @@
1213
$factory = new Factory($connector);
1314

1415
$factory->createClient()->then(function (Client $client) {
15-
$client->incr('test');
16+
$api = new RequestApi($client);
1617

17-
$client->get('test')->then(function ($result) {
18+
$api->incr('test');
19+
20+
$api->get('test')->then(function ($result) {
1821
var_dump($result);
1922
});
2023

21-
$client->end();
24+
$api->end();
2225
});
2326

2427
$loop->run();

src/Client.php

Lines changed: 16 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,6 @@ class Client extends EventEmitter
2020
private $stream;
2121
private $parser;
2222
private $serializer;
23-
private $requests = array();
24-
private $ending = false;
2523

2624
public function __construct(Stream $stream, ParserInterface $parser = null, SerializerInterface $serializer = null)
2725
{
@@ -48,7 +46,7 @@ public function __construct(Stream $stream, ParserInterface $parser = null, Seri
4846

4947
foreach ($models as $data) {
5048
try {
51-
$that->handleMessage($data);
49+
$that->emit('message', array($data, $that));
5250
}
5351
catch (UnderflowException $error) {
5452
$that->emit('error', array($error));
@@ -67,73 +65,32 @@ public function __construct(Stream $stream, ParserInterface $parser = null, Seri
6765
$this->serializer = $serializer;
6866
}
6967

70-
public function __call($name, $args)
71-
{
72-
$request = new Deferred();
73-
74-
if ($this->ending) {
75-
$request->reject(new RuntimeException('Connection closed'));
76-
} else {
77-
$this->stream->write($this->serializer->getRequestMessage($name, $args));
78-
$this->requests []= $request;
79-
}
80-
81-
return $request->promise();
82-
}
83-
84-
public function handleMessage(ModelInterface $message)
85-
{
86-
$this->emit('message', array($message, $this));
87-
88-
if (!$this->requests) {
89-
throw new UnderflowException('Unexpected reply received, no matching request found');
90-
}
91-
92-
$request = array_shift($this->requests);
93-
/* @var $request Deferred */
94-
95-
if ($message instanceof ErrorReply) {
96-
$request->reject($message);
97-
} else {
98-
$request->resolve($message->getValueNative());
99-
}
100-
101-
if ($this->ending && !$this->isBusy()) {
102-
$this->close();
103-
}
104-
}
105-
106-
public function isBusy()
68+
/**
69+
* Sends command with given $name and additial $args
70+
*
71+
* @param name $name
72+
* @param array $args
73+
*/
74+
public function sendRequest($name, array $args = array())
10775
{
108-
return !!$this->requests;
76+
$this->stream->write($this->serializer->getRequestMessage($name, $args));
10977
}
11078

11179
/**
112-
* end connection once all pending requests have been replied to
80+
* Sends given message model (request message)
11381
*
114-
* @uses self::close() once all replies have been received
115-
* @see self::close() for closing the connection immediately
82+
* @param ModelInterface $message
11683
*/
117-
public function end()
84+
public function sendMessage(ModelInterface $message)
11885
{
119-
$this->ending = true;
120-
121-
if (!$this->isBusy()) {
122-
$this->close();
123-
}
86+
$this->stream->write($message->getMessageSerialized($this->serializer));
12487
}
12588

89+
/**
90+
* Immediately terminate the connection and discard incoming and outgoing buffers
91+
*/
12692
public function close()
12793
{
128-
$this->ending = true;
129-
13094
$this->stream->close();
131-
132-
// reject all remaining requests in the queue
133-
while($this->requests) {
134-
$request = array_shift($this->requests);
135-
/* @var $request Request */
136-
$request->reject(new RuntimeException('Connection closing'));
137-
}
13895
}
13996
}

src/Factory.php

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,8 @@ public function createClient($target = null)
4343

4444
if ($auth !== null) {
4545
$promise = $promise->then(function (Client $client) use ($auth) {
46-
return $client->auth($auth)->then(
46+
$api = new RequestApi($client);
47+
return $api->auth($auth)->then(
4748
function () use ($client) {
4849
return $client;
4950
},
@@ -57,7 +58,8 @@ function ($error) use ($client) {
5758

5859
if ($db !== null) {
5960
$promise = $promise->then(function (Client $client) use ($db) {
60-
return $client->select($db)->then(
61+
$api = new RequestApi($client);
62+
return $api->select($db)->then(
6163
function () use ($client) {
6264
return $client;
6365
},

src/RequestApi.php

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
<?php
2+
3+
namespace Clue\React\Redis;
4+
5+
use Clue\Redis\Protocol\Model\ModelInterface;
6+
use UnderflowException;
7+
use RuntimeException;
8+
use React\Promise\Deferred;
9+
use Clue\Redis\Protocol\Model\ErrorReply;
10+
11+
class RequestApi
12+
{
13+
private $client;
14+
private $requests = array();
15+
private $ending = false;
16+
17+
public function __construct(Client $client)
18+
{
19+
$this->client = $client;
20+
}
21+
22+
public function __call($name, $args)
23+
{
24+
$request = new Deferred();
25+
26+
if ($this->ending) {
27+
$request->reject(new RuntimeException('Connection closed'));
28+
} else {
29+
if (!$this->isBusy()) {
30+
$this->client->on('message', array($this, 'handleMessage'));
31+
$this->client->on('close', array($this, 'handleClose'));
32+
}
33+
34+
$this->client->sendRequest($name, $args);
35+
$this->requests []= $request;
36+
}
37+
38+
return $request->promise();
39+
}
40+
41+
public function handleMessage(ModelInterface $message)
42+
{
43+
$request = array_shift($this->requests);
44+
/* @var $request Deferred */
45+
46+
if ($message instanceof ErrorReply) {
47+
$request->reject($message);
48+
} else {
49+
$request->resolve($message->getValueNative());
50+
}
51+
52+
if (!$this->isBusy()) {
53+
$this->client->removeListener('message', array($this, 'handleMessage'));
54+
$this->client->removeListener('close', array($this, 'handleClose'));
55+
56+
if ($this->ending) {
57+
$this->client->close();
58+
}
59+
}
60+
}
61+
62+
public function handleClose()
63+
{
64+
$this->ending = true;
65+
66+
$this->client->removeListener('message', array($this, 'handleMessage'));
67+
$this->client->removeListener('close', array($this, 'handleClose'));
68+
69+
// reject all remaining requests in the queue
70+
while($this->requests) {
71+
$request = array_shift($this->requests);
72+
/* @var $request Request */
73+
$request->reject(new RuntimeException('Connection closing'));
74+
}
75+
76+
$this->requests = array();
77+
}
78+
79+
public function isBusy()
80+
{
81+
return !!$this->requests;
82+
}
83+
84+
/**
85+
* end connection once all pending requests have been replied to
86+
*
87+
* @uses self::close() once all replies have been received
88+
* @see self::close() for closing the connection immediately
89+
*/
90+
public function end()
91+
{
92+
$this->ending = true;
93+
94+
if (!$this->isBusy()) {
95+
$this->client->close();
96+
}
97+
}
98+
}

tests/ClientTest.php

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
<?php
2+
3+
use Clue\React\Redis\Client;
4+
use Clue\Redis\Protocol\Parser\ParserException;
5+
use Clue\Redis\Protocol\Model\IntegerReply;
6+
7+
class ClientTest extends TestCase
8+
{
9+
private $stream;
10+
private $parser;
11+
private $serializer;
12+
private $client;
13+
14+
public function setUp()
15+
{
16+
$this->stream = $this->getMockBuilder('React\Stream\Stream')->disableOriginalConstructor()->setMethods(array('write', 'close', 'resume', 'pause'))->getMock();
17+
$this->parser = $this->getMock('Clue\Redis\Protocol\Parser\ParserInterface');
18+
$this->serializer = $this->getMock('Clue\Redis\Protocol\Serializer\SerializerInterface');
19+
20+
$this->client = new Client($this->stream, $this->parser, $this->serializer);
21+
}
22+
23+
public function testSending()
24+
{
25+
$this->serializer->expects($this->once())->method('getRequestMessage')->with($this->equalTo('ping'))->will($this->returnValue('message'));
26+
$this->stream->expects($this->once())->method('write')->with($this->equalTo('message'));
27+
28+
$this->client->sendRequest('ping', array());
29+
}
30+
31+
public function testClosingClientEmitsEvent()
32+
{
33+
//$this->client->on('close', $this->expectCallableOnce());
34+
35+
$this->client->close();
36+
}
37+
38+
public function testClosingStreamClosesClient()
39+
{
40+
$this->client->on('close', $this->expectCallableOnce());
41+
42+
$this->stream->emit('close');
43+
}
44+
45+
public function testReceiveParseErrorEmitsErrorEvent()
46+
{
47+
$this->client->on('error', $this->expectCallableOnce());
48+
//$this->client->on('close', $this->expectCallableOnce());
49+
50+
$this->parser->expects($this->once())->method('pushIncoming')->with($this->equalTo('message'))->will($this->throwException(new ParserException()));
51+
$this->stream->emit('data', array('message'));
52+
}
53+
54+
public function testReceiveMessageEmitsEvent()
55+
{
56+
$this->client->on('message', $this->expectCallableOnce());
57+
58+
$this->parser->expects($this->once())->method('pushIncoming')->with($this->equalTo('message'))->will($this->returnValue(array(new IntegerReply(2))));
59+
$this->stream->emit('data', array('message'));
60+
}
61+
62+
public function testReceiveThrowMessageEmitsErrorEvent()
63+
{
64+
$this->client->on('message', $this->expectCallableOnce());
65+
$this->client->on('message', function() {
66+
throw new UnderflowException();
67+
});
68+
69+
$this->client->on('error', $this->expectCallableOnce());
70+
71+
$this->parser->expects($this->once())->method('pushIncoming')->with($this->equalTo('message'))->will($this->returnValue(array(new IntegerReply(2))));
72+
$this->stream->emit('data', array('message'));
73+
}
74+
75+
public function testDefaultCtor()
76+
{
77+
$client = new Client($this->stream);
78+
}
79+
}

0 commit comments

Comments
 (0)