Skip to content

Commit f4db800

Browse files
committed
Only emit "monitor" event when actually receiving a monitor message
1 parent d574654 commit f4db800

File tree

3 files changed

+70
-2
lines changed

3 files changed

+70
-2
lines changed

src/StreamingClient.php

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,8 +95,8 @@ public function handleMessage(ModelInterface $message)
9595
{
9696
$this->emit('data', array($message, $this));
9797

98-
if ($this->monitoring && $message instanceof StatusReply) {
99-
$this->emit('monitor', array($message, $this));
98+
if ($this->monitoring && $this->isMonitorMessage($message)) {
99+
$this->emit('monitor', array($message));
100100
return;
101101
}
102102

@@ -152,4 +152,10 @@ public function close()
152152
$request->reject(new RuntimeException('Connection closing'));
153153
}
154154
}
155+
156+
private function isMonitorMessage(ModelInterface $message)
157+
{
158+
// Check status '1409172115.207170 [0 127.0.0.1:58567] "ping"' contains otherwise uncommon '] "'
159+
return ($message instanceof StatusReply && strpos($message->getValueNative(), '] "') !== false);
160+
}
155161
}

tests/FunctionalTest.php

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,21 @@ public function testMultiExecQueuedExecHasValues(StreamingClient $client)
103103
return $client;
104104
}
105105

106+
/**
107+
*
108+
* @param StreamingClient $client
109+
* @depends testPipeline
110+
*/
111+
public function testMonitorPing(StreamingClient $client)
112+
{
113+
$client->on('monitor', $this->expectCallableOnce());
114+
115+
$client->monitor()->then($this->expectCallableOnce('OK'));
116+
$client->ping()->then($this->expectCallableOnce('PONG'));
117+
118+
$this->waitFor($client);
119+
}
120+
106121
public function testPubSub()
107122
{
108123
$consumer = $this->createClient();

tests/StreamingClientTest.php

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
use Clue\Redis\Protocol\Model\IntegerReply;
66
use Clue\Redis\Protocol\Model\BulkReply;
77
use Clue\Redis\Protocol\Model\ErrorReply;
8+
use Clue\Redis\Protocol\Model\StatusReply;
89

910
class ClientTest extends TestCase
1011
{
@@ -91,6 +92,52 @@ public function testPingPong()
9192
$promise->then($this->expectCallableOnce('PONG'));
9293
}
9394

95+
/**
96+
* @expectedException UnderflowException
97+
*/
98+
public function testInvalidMonitor()
99+
{
100+
$this->client->handleMessage(new StatusReply('+1409171800.312243 [0 127.0.0.1:58542] "ping"'));
101+
}
102+
103+
public function testMonitor()
104+
{
105+
$this->serializer->expects($this->once())->method('getRequestMessage')->with($this->equalTo('monitor'));
106+
107+
$promise = $this->client->monitor();
108+
109+
$this->client->handleMessage(new StatusReply('OK'));
110+
111+
$this->expectPromiseResolve($promise);
112+
$promise->then($this->expectCallableOnce('OK'));
113+
114+
return $this->client;
115+
}
116+
117+
/**
118+
* @depends testMonitor
119+
* @param StreamingClient $client
120+
*/
121+
public function testMonitorEvent(StreamingClient $client)
122+
{
123+
$client->on('monitor', $this->expectCallableOnce());
124+
125+
$client->handleMessage(new StatusReply('1409171800.312243 [0 127.0.0.1:58542] "ping"'));
126+
}
127+
128+
/**
129+
* @depends testMonitor
130+
* @param StreamingClient $client
131+
*/
132+
public function testMonitorPing(StreamingClient $client)
133+
{
134+
$client->on('monitor', $this->expectCallableOnce());
135+
136+
$client->ping();
137+
$client->handleMessage(new StatusReply('1409171800.312243 [0 127.0.0.1:58542] "ping"'));
138+
$client->handleMessage(new StatusReply('PONG'));
139+
}
140+
94141
public function testErrorReply()
95142
{
96143
$promise = $this->client->invalid();

0 commit comments

Comments
 (0)