Skip to content

Commit 58f8cba

Browse files
authored
Merge pull request #52 from clue-labs/close
Reject pending commands if connection is closed
2 parents bb713ac + cc82c01 commit 58f8cba

File tree

5 files changed

+160
-15
lines changed

5 files changed

+160
-15
lines changed

src/Command.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ abstract class Command extends EventEmitter implements CommandInterface
138138
* @param integer $cmd
139139
* @param string $q
140140
*/
141-
public function __construct(Connection $connection)
141+
public function __construct(ConnectionInterface $connection)
142142
{
143143
$this->connection = $connection;
144144
}

src/Connection.php

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,16 @@ public function handleConnectionClosed()
282282
$this->state = self::STATE_CLOSED;
283283
$this->emit('error', [new \RuntimeException('mysql server has gone away'), $this]);
284284
}
285+
286+
// reject all pending commands if connection is closed
287+
while (!$this->executor->isIdle()) {
288+
$command = $this->executor->dequeue();
289+
$command->emit('error', array(
290+
new \RuntimeException('Connection lost'),
291+
$command,
292+
$this
293+
));
294+
}
285295
}
286296

287297
/**

src/Protocal/Parser.php

Lines changed: 51 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,20 @@ class Parser extends EventEmitter
2525
protected $dbname = '';
2626

2727
/**
28-
* @var \React\MySQL\Command
28+
* Keeps a reference to the command that is currently being processed.
29+
*
30+
* The MySQL protocol is inherently sequential, the pending commands will be
31+
* stored in an `Executor` queue.
32+
*
33+
* The MySQL protocol communication starts with the server sending a
34+
* handshake message, so the current command will be `null` until it's our
35+
* turn.
36+
*
37+
* Similarly, when one command is finished, it will continue processing the
38+
* next command from the `Executor` queue. If no command is outstanding,
39+
* this will be reset to the `null` state.
40+
*
41+
* @var \React\MySQL\Command|null
2942
*/
3043
protected $currCommand;
3144

@@ -80,13 +93,20 @@ class Parser extends EventEmitter
8093
*/
8194
protected $executor;
8295

96+
/**
97+
* @deprecated
98+
* @see self::$currCommand
99+
*/
83100
protected $queue;
84101

85102
public function __construct($stream, $executor)
86103
{
87104
$this->stream = $stream;
88105
$this->executor = $executor;
89-
$this->queue = new \SplQueue($this);
106+
107+
// @deprecated unused, exists for BC only.
108+
$this->queue = new \SplQueue();
109+
90110
$executor->on('new', array($this, 'handleNewCommand'));
91111
}
92112

@@ -98,7 +118,7 @@ public function start()
98118

99119
public function handleNewCommand()
100120
{
101-
if ($this->queue->count() <= 0) {
121+
if ($this->currCommand === null) {
102122
$this->nextRequest();
103123
}
104124
}
@@ -191,8 +211,8 @@ public function parse($data)
191211
$this->errmsg = $this->read($this->pctSize - $len + $this->length());
192212
$this->debug(sprintf("Error Packet:%d %s\n", $this->errno, $this->errmsg));
193213

194-
$this->nextRequest();
195214
$this->onError();
215+
$this->nextRequest();
196216
} elseif ($fieldCount === 0x00) {
197217
// Empty OK Packet
198218
$this->debug('Ok Packet');
@@ -238,8 +258,8 @@ public function parse($data)
238258
// finalize this result set (all rows completed)
239259
$this->debug('result done');
240260

241-
$this->nextRequest();
242261
$this->onResultDone();
262+
$this->nextRequest();
243263
} else {
244264
// move to next part of result set (header->field->row)
245265
++$this->rsState;
@@ -298,14 +318,15 @@ private function onResultRow($row)
298318
{
299319
// $this->debug('row data: ' . json_encode($row));
300320
$this->resultRows[] = $row;
301-
$command = $this->queue->dequeue();
321+
$command = $this->currCommand;
302322
$command->emit('result', array($row, $command, $command->getConnection()));
303-
$this->queue->unshift($command);
304323
}
305324

306325
protected function onError()
307326
{
308-
$command = $this->queue->dequeue();
327+
$command = $this->currCommand;
328+
$this->currCommand = null;
329+
309330
$error = new Exception($this->errmsg, $this->errno);
310331
$command->setError($error);
311332
$command->emit('error', array($error, $command, $command->getConnection()));
@@ -315,7 +336,9 @@ protected function onError()
315336

316337
protected function onResultDone()
317338
{
318-
$command = $this->queue->dequeue();
339+
$command = $this->currCommand;
340+
$this->currCommand = null;
341+
319342
$command->resultRows = $this->resultRows;
320343
$command->resultFields = $this->resultFields;
321344
$command->emit('results', array($this->resultRows, $command, $command->getConnection()));
@@ -327,7 +350,9 @@ protected function onResultDone()
327350

328351
protected function onSuccess()
329352
{
330-
$command = $this->queue->dequeue();
353+
$command = $this->currCommand;
354+
$this->currCommand = null;
355+
331356
if ($command->equals(Command::QUERY)) {
332357
$command->affectedRows = $this->affectedRows;
333358
$command->insertId = $this->insertId;
@@ -339,17 +364,27 @@ protected function onSuccess()
339364

340365
protected function onAuthenticated()
341366
{
342-
$command = $this->queue->dequeue();
367+
$command = $this->currCommand;
368+
$this->currCommand = null;
369+
343370
$command->emit('authenticated', array($this->connectOptions));
344371
}
345372

346373
protected function onClose()
347374
{
348375
$this->emit('close');
349-
if ($this->queue->count()) {
350-
$command = $this->queue->dequeue();
376+
if ($this->currCommand !== null) {
377+
$command = $this->currCommand;
378+
$this->currCommand = null;
379+
351380
if ($command->equals(Command::QUIT)) {
352381
$command->emit('success');
382+
} else {
383+
$command->emit('error', array(
384+
new \RuntimeException('Connection lost'),
385+
$command,
386+
$command->getConnection()
387+
));
353388
}
354389
}
355390
}
@@ -525,9 +560,11 @@ protected function nextRequest($isHandshake = false)
525560
if (!$isHandshake && $this->phase != self::PHASE_HANDSHAKED) {
526561
return false;
527562
}
563+
528564
if (!$this->executor->isIdle()) {
529565
$command = $this->executor->dequeue();
530-
$this->queue->enqueue($command);
566+
$this->currCommand = $command;
567+
531568
if ($command->equals(Command::INIT_AUTHENTICATE)) {
532569
$this->authenticate();
533570
} else {

tests/ConnectionTest.php

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
use React\MySQL\Connection;
66
use React\MySQL\Exception;
7+
use React\Socket\Server;
78

89
class ConnectionTest extends BaseTestCase
910
{
@@ -111,6 +112,59 @@ public function testCloseWhileConnectingWillBeQueuedAfterConnection()
111112
$loop->run();
112113
}
113114

115+
public function testPingAfterConnectWillEmitErrorWhenServerClosesConnection()
116+
{
117+
$this->expectOutputString('Connection lost');
118+
119+
$loop = \React\EventLoop\Factory::create();
120+
121+
$server = new Server(0, $loop);
122+
$server->on('connection', function ($connection) use ($server) {
123+
$server->close();
124+
$connection->close();
125+
});
126+
127+
$parts = parse_url($server->getAddress());
128+
$options = $this->getConnectionOptions();
129+
$options['host'] = $parts['host'];
130+
$options['port'] = $parts['port'];
131+
132+
$conn = new Connection($loop, $options);
133+
134+
$conn->connect(function ($err) {
135+
echo $err ? $err->getMessage() : 'OK';
136+
});
137+
138+
$loop->run();
139+
}
140+
141+
public function testConnectWillEmitErrorWhenServerClosesConnection()
142+
{
143+
$this->expectOutputString('Connection lost');
144+
145+
$loop = \React\EventLoop\Factory::create();
146+
147+
$server = new Server(0, $loop);
148+
$server->on('connection', function ($connection) use ($server) {
149+
$server->close();
150+
$connection->close();
151+
});
152+
153+
$parts = parse_url($server->getAddress());
154+
$options = $this->getConnectionOptions();
155+
$options['host'] = $parts['host'];
156+
$options['port'] = $parts['port'];
157+
158+
$conn = new Connection($loop, $options);
159+
160+
$conn->connect(function () { });
161+
$conn->ping(function ($err) {
162+
echo $err ? $err->getMessage() : 'OK';
163+
});
164+
165+
$loop->run();
166+
}
167+
114168
public function testPingAndCloseWhileConnectingWillBeQueuedAfterConnection()
115169
{
116170
$this->expectOutputString('connectedpingclosed');

tests/Protocal/ParserTest.php

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
<?php
2+
3+
namespace React\Tests\MySQL\Protocal;
4+
5+
use React\MySQL\Commands\QueryCommand;
6+
use React\MySQL\Executor;
7+
use React\MySQL\Protocal\Parser;
8+
use React\Stream\ThroughStream;
9+
use React\Tests\MySQL\BaseTestCase;
10+
11+
class ParserTest extends BaseTestCase
12+
{
13+
public function testClosingStreamEmitsCloseEvent()
14+
{
15+
$stream = new ThroughStream();
16+
$connection = $this->getMockBuilder('React\MySQL\ConnectionInterface')->disableOriginalConstructor()->getMock();
17+
$executor = new Executor($connection);
18+
19+
$parser = new Parser($stream, $executor);
20+
$parser->start();
21+
22+
$parser->on('close', $this->expectCallableOnce());
23+
24+
$stream->close();
25+
}
26+
27+
public function testClosingStreamEmitsErrorForCurrentCommand()
28+
{
29+
$stream = new ThroughStream();
30+
$connection = $this->getMockBuilder('React\MySQL\ConnectionInterface')->disableOriginalConstructor()->getMock();
31+
$executor = new Executor($connection);
32+
33+
$parser = new Parser($stream, $executor);
34+
$parser->start();
35+
36+
$command = new QueryCommand($connection);
37+
$command->on('error', $this->expectCallableOnce());
38+
39+
// hack to inject command as current command
40+
$parser->setOptions(array('currCommand' => $command));
41+
42+
$stream->close();
43+
}
44+
}

0 commit comments

Comments
 (0)