Skip to content

Commit 7fcb65b

Browse files
committed
Merge branch 'master' into pubsub
2 parents 8041e40 + 9f10eb3 commit 7fcb65b

File tree

5 files changed

+108
-0
lines changed

5 files changed

+108
-0
lines changed

examples/monitor.php

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
<?php
2+
3+
use Clue\React\Redis\Client;
4+
use Clue\React\Redis\Factory;
5+
use Clue\Redis\Protocol\Model\StatusReply;
6+
7+
require __DIR__ . '/../vendor/autoload.php';
8+
9+
$loop = React\EventLoop\Factory::create();
10+
$factory = new Factory($loop);
11+
12+
$factory->createClient()->then(function (Client $client) {
13+
$client->monitor()->then(function ($result) {
14+
echo 'Now monitoring all commands' . PHP_EOL;
15+
});
16+
17+
$client->on('monitor', function (StatusReply $message) {
18+
echo 'Monitored: ' . $message->getValueNative() . PHP_EOL;
19+
});
20+
21+
$client->echo('initial echo');
22+
});
23+
24+
$loop->run();

src/Client.php

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
* @event pmessage($pattern, $channel, $message)
2020
* @event psubscribe($channel, $numberOfChannels)
2121
* @event punsubscribe($channel, $numberOfChannels)
22+
*
23+
* @event monitor(ModelInterface $statusModel)
2224
*/
2325
interface Client extends EventEmitterInterface
2426
{

src/StreamingClient.php

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
use Clue\Redis\Protocol\Model\ErrorReply;
1616
use Clue\Redis\Protocol\Model\ModelInterface;
1717
use Clue\Redis\Protocol\Model\MultiBulkReply;
18+
use Clue\Redis\Protocol\Model\StatusReply;
1819

1920
class StreamingClient extends EventEmitter implements Client
2021
{
@@ -27,6 +28,7 @@ class StreamingClient extends EventEmitter implements Client
2728

2829
private $subscribed = 0;
2930
private $psubscribed = 0;
31+
private $monitoring = false;
3032

3133
public function __construct(Stream $stream, ParserInterface $parser = null, SerializerInterface $serializer = null)
3234
{
@@ -82,13 +84,25 @@ public function __call($name, $args)
8284
$this->requests []= $request;
8385
}
8486

87+
if (strtolower($name) === 'monitor') {
88+
$monitoring =& $this->monitoring;
89+
$request->then(function () use (&$monitoring) {
90+
$monitoring = true;
91+
});
92+
}
93+
8594
return $request->promise();
8695
}
8796

8897
public function handleMessage(ModelInterface $message)
8998
{
9099
$this->emit('data', array($message, $this));
91100

101+
if ($this->monitoring && $this->isMonitorMessage($message)) {
102+
$this->emit('monitor', array($message));
103+
return;
104+
}
105+
92106
if (/*($this->subscribed !== 0 || $this->psubscribed !== 0) &&*/ $message instanceof MultiBulkReply) {
93107
$array = $message->getValueNative();
94108
$first = array_shift($array);
@@ -156,4 +170,10 @@ public function close()
156170
$request->reject(new RuntimeException('Connection closing'));
157171
}
158172
}
173+
174+
private function isMonitorMessage(ModelInterface $message)
175+
{
176+
// Check status '1409172115.207170 [0 127.0.0.1:58567] "ping"' contains otherwise uncommon '] "'
177+
return ($message instanceof StatusReply && strpos($message->getValueNative(), '] "') !== false);
178+
}
159179
}

tests/FunctionalTest.php

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,21 @@ public function testMultiExecQueuedExecHasValues(StreamingClient $client)
117117
return $client;
118118
}
119119

120+
/**
121+
*
122+
* @param StreamingClient $client
123+
* @depends testPipeline
124+
*/
125+
public function testMonitorPing(StreamingClient $client)
126+
{
127+
$client->on('monitor', $this->expectCallableOnce());
128+
129+
$client->monitor()->then($this->expectCallableOnce('OK'));
130+
$client->ping()->then($this->expectCallableOnce('PONG'));
131+
132+
$this->waitFor($client);
133+
}
134+
120135
public function testPubSub()
121136
{
122137
$consumer = $this->createClient();

tests/StreamingClientTest.php

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
use Clue\Redis\Protocol\Model\ErrorReply;
88
use Clue\Redis\Protocol\Model\MultiBulkReply;
99
use Clue\React\Redis\Client;
10+
use Clue\Redis\Protocol\Model\StatusReply;
1011

1112
class ClientTest extends TestCase
1213
{
@@ -93,6 +94,52 @@ public function testPingPong()
9394
$promise->then($this->expectCallableOnce('PONG'));
9495
}
9596

97+
/**
98+
* @expectedException UnderflowException
99+
*/
100+
public function testInvalidMonitor()
101+
{
102+
$this->client->handleMessage(new StatusReply('+1409171800.312243 [0 127.0.0.1:58542] "ping"'));
103+
}
104+
105+
public function testMonitor()
106+
{
107+
$this->serializer->expects($this->once())->method('getRequestMessage')->with($this->equalTo('monitor'));
108+
109+
$promise = $this->client->monitor();
110+
111+
$this->client->handleMessage(new StatusReply('OK'));
112+
113+
$this->expectPromiseResolve($promise);
114+
$promise->then($this->expectCallableOnce('OK'));
115+
116+
return $this->client;
117+
}
118+
119+
/**
120+
* @depends testMonitor
121+
* @param StreamingClient $client
122+
*/
123+
public function testMonitorEvent(StreamingClient $client)
124+
{
125+
$client->on('monitor', $this->expectCallableOnce());
126+
127+
$client->handleMessage(new StatusReply('1409171800.312243 [0 127.0.0.1:58542] "ping"'));
128+
}
129+
130+
/**
131+
* @depends testMonitor
132+
* @param StreamingClient $client
133+
*/
134+
public function testMonitorPing(StreamingClient $client)
135+
{
136+
$client->on('monitor', $this->expectCallableOnce());
137+
138+
$client->ping();
139+
$client->handleMessage(new StatusReply('1409171800.312243 [0 127.0.0.1:58542] "ping"'));
140+
$client->handleMessage(new StatusReply('PONG'));
141+
}
142+
96143
public function testErrorReply()
97144
{
98145
$promise = $this->client->invalid();

0 commit comments

Comments
 (0)