Skip to content

Commit 9f10eb3

Browse files
committed
Merge pull request clue#23 from clue/monitor
Support MONITOR command, forward "monitor" events
2 parents 3ec3c01 + f4db800 commit 9f10eb3

File tree

5 files changed

+109
-0
lines changed

5 files changed

+109
-0
lines changed

examples/monitor.php

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
<?php
2+
3+
use Clue\React\Redis\Client;
4+
use Clue\React\Redis\Factory;
5+
use Clue\Redis\Protocol\Model\StatusReply;
6+
7+
require __DIR__ . '/../vendor/autoload.php';
8+
9+
$loop = React\EventLoop\Factory::create();
10+
$factory = new Factory($loop);
11+
12+
$factory->createClient()->then(function (Client $client) {
13+
$client->monitor()->then(function ($result) {
14+
echo 'Now monitoring all commands' . PHP_EOL;
15+
});
16+
17+
$client->on('monitor', function (StatusReply $message) {
18+
echo 'Monitored: ' . $message->getValueNative() . PHP_EOL;
19+
});
20+
21+
$client->echo('initial echo');
22+
});
23+
24+
$loop->run();

src/Client.php

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
*
1212
* @event data(ModelInterface $messageModel, Client $thisClient)
1313
* @event close()
14+
*
15+
* @event monitor(ModelInterface $statusModel)
1416
*/
1517
interface Client extends EventEmitterInterface
1618
{

src/StreamingClient.php

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
use React\Promise\Deferred;
1515
use Clue\Redis\Protocol\Model\ErrorReply;
1616
use Clue\Redis\Protocol\Model\ModelInterface;
17+
use Clue\Redis\Protocol\Model\StatusReply;
1718

1819
class StreamingClient extends EventEmitter implements Client
1920
{
@@ -24,6 +25,8 @@ class StreamingClient extends EventEmitter implements Client
2425
private $ending = false;
2526
private $closed = false;
2627

28+
private $monitoring = false;
29+
2730
public function __construct(Stream $stream, ParserInterface $parser = null, SerializerInterface $serializer = null)
2831
{
2932
if ($parser === null || $serializer === null) {
@@ -78,13 +81,25 @@ public function __call($name, $args)
7881
$this->requests []= $request;
7982
}
8083

84+
if (strtolower($name) === 'monitor') {
85+
$monitoring =& $this->monitoring;
86+
$request->then(function () use (&$monitoring) {
87+
$monitoring = true;
88+
});
89+
}
90+
8191
return $request->promise();
8292
}
8393

8494
public function handleMessage(ModelInterface $message)
8595
{
8696
$this->emit('data', array($message, $this));
8797

98+
if ($this->monitoring && $this->isMonitorMessage($message)) {
99+
$this->emit('monitor', array($message));
100+
return;
101+
}
102+
88103
if (!$this->requests) {
89104
throw new UnderflowException('Unexpected reply received, no matching request found');
90105
}
@@ -137,4 +152,10 @@ public function close()
137152
$request->reject(new RuntimeException('Connection closing'));
138153
}
139154
}
155+
156+
private function isMonitorMessage(ModelInterface $message)
157+
{
158+
// Check status '1409172115.207170 [0 127.0.0.1:58567] "ping"' contains otherwise uncommon '] "'
159+
return ($message instanceof StatusReply && strpos($message->getValueNative(), '] "') !== false);
160+
}
140161
}

tests/FunctionalTest.php

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,21 @@ public function testMultiExecQueuedExecHasValues(StreamingClient $client)
103103
return $client;
104104
}
105105

106+
/**
107+
*
108+
* @param StreamingClient $client
109+
* @depends testPipeline
110+
*/
111+
public function testMonitorPing(StreamingClient $client)
112+
{
113+
$client->on('monitor', $this->expectCallableOnce());
114+
115+
$client->monitor()->then($this->expectCallableOnce('OK'));
116+
$client->ping()->then($this->expectCallableOnce('PONG'));
117+
118+
$this->waitFor($client);
119+
}
120+
106121
public function testPubSub()
107122
{
108123
$consumer = $this->createClient();

tests/StreamingClientTest.php

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
use Clue\Redis\Protocol\Model\IntegerReply;
66
use Clue\Redis\Protocol\Model\BulkReply;
77
use Clue\Redis\Protocol\Model\ErrorReply;
8+
use Clue\Redis\Protocol\Model\StatusReply;
89

910
class ClientTest extends TestCase
1011
{
@@ -91,6 +92,52 @@ public function testPingPong()
9192
$promise->then($this->expectCallableOnce('PONG'));
9293
}
9394

95+
/**
96+
* @expectedException UnderflowException
97+
*/
98+
public function testInvalidMonitor()
99+
{
100+
$this->client->handleMessage(new StatusReply('+1409171800.312243 [0 127.0.0.1:58542] "ping"'));
101+
}
102+
103+
public function testMonitor()
104+
{
105+
$this->serializer->expects($this->once())->method('getRequestMessage')->with($this->equalTo('monitor'));
106+
107+
$promise = $this->client->monitor();
108+
109+
$this->client->handleMessage(new StatusReply('OK'));
110+
111+
$this->expectPromiseResolve($promise);
112+
$promise->then($this->expectCallableOnce('OK'));
113+
114+
return $this->client;
115+
}
116+
117+
/**
118+
* @depends testMonitor
119+
* @param StreamingClient $client
120+
*/
121+
public function testMonitorEvent(StreamingClient $client)
122+
{
123+
$client->on('monitor', $this->expectCallableOnce());
124+
125+
$client->handleMessage(new StatusReply('1409171800.312243 [0 127.0.0.1:58542] "ping"'));
126+
}
127+
128+
/**
129+
* @depends testMonitor
130+
* @param StreamingClient $client
131+
*/
132+
public function testMonitorPing(StreamingClient $client)
133+
{
134+
$client->on('monitor', $this->expectCallableOnce());
135+
136+
$client->ping();
137+
$client->handleMessage(new StatusReply('1409171800.312243 [0 127.0.0.1:58542] "ping"'));
138+
$client->handleMessage(new StatusReply('PONG'));
139+
}
140+
94141
public function testErrorReply()
95142
{
96143
$promise = $this->client->invalid();

0 commit comments

Comments
 (0)