Skip to content

Commit cc82c01

Browse files
committed
Reject pending commands if connection is closed
1 parent 01962f3 commit cc82c01

File tree

5 files changed

+115
-1
lines changed

5 files changed

+115
-1
lines changed

src/Command.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ abstract class Command extends EventEmitter implements CommandInterface
138138
* @param integer $cmd
139139
* @param string $q
140140
*/
141-
public function __construct(Connection $connection)
141+
public function __construct(ConnectionInterface $connection)
142142
{
143143
$this->connection = $connection;
144144
}

src/Connection.php

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,16 @@ public function handleConnectionClosed()
282282
$this->state = self::STATE_CLOSED;
283283
$this->emit('error', [new \RuntimeException('mysql server has gone away'), $this]);
284284
}
285+
286+
// reject all pending commands if connection is closed
287+
while (!$this->executor->isIdle()) {
288+
$command = $this->executor->dequeue();
289+
$command->emit('error', array(
290+
new \RuntimeException('Connection lost'),
291+
$command,
292+
$this
293+
));
294+
}
285295
}
286296

287297
/**

src/Protocal/Parser.php

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -379,6 +379,12 @@ protected function onClose()
379379

380380
if ($command->equals(Command::QUIT)) {
381381
$command->emit('success');
382+
} else {
383+
$command->emit('error', array(
384+
new \RuntimeException('Connection lost'),
385+
$command,
386+
$command->getConnection()
387+
));
382388
}
383389
}
384390
}

tests/ConnectionTest.php

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
use React\MySQL\Connection;
66
use React\MySQL\Exception;
7+
use React\Socket\Server;
78

89
class ConnectionTest extends BaseTestCase
910
{
@@ -111,6 +112,59 @@ public function testCloseWhileConnectingWillBeQueuedAfterConnection()
111112
$loop->run();
112113
}
113114

115+
public function testPingAfterConnectWillEmitErrorWhenServerClosesConnection()
116+
{
117+
$this->expectOutputString('Connection lost');
118+
119+
$loop = \React\EventLoop\Factory::create();
120+
121+
$server = new Server(0, $loop);
122+
$server->on('connection', function ($connection) use ($server) {
123+
$server->close();
124+
$connection->close();
125+
});
126+
127+
$parts = parse_url($server->getAddress());
128+
$options = $this->getConnectionOptions();
129+
$options['host'] = $parts['host'];
130+
$options['port'] = $parts['port'];
131+
132+
$conn = new Connection($loop, $options);
133+
134+
$conn->connect(function ($err) {
135+
echo $err ? $err->getMessage() : 'OK';
136+
});
137+
138+
$loop->run();
139+
}
140+
141+
public function testConnectWillEmitErrorWhenServerClosesConnection()
142+
{
143+
$this->expectOutputString('Connection lost');
144+
145+
$loop = \React\EventLoop\Factory::create();
146+
147+
$server = new Server(0, $loop);
148+
$server->on('connection', function ($connection) use ($server) {
149+
$server->close();
150+
$connection->close();
151+
});
152+
153+
$parts = parse_url($server->getAddress());
154+
$options = $this->getConnectionOptions();
155+
$options['host'] = $parts['host'];
156+
$options['port'] = $parts['port'];
157+
158+
$conn = new Connection($loop, $options);
159+
160+
$conn->connect(function () { });
161+
$conn->ping(function ($err) {
162+
echo $err ? $err->getMessage() : 'OK';
163+
});
164+
165+
$loop->run();
166+
}
167+
114168
public function testPingAndCloseWhileConnectingWillBeQueuedAfterConnection()
115169
{
116170
$this->expectOutputString('connectedpingclosed');

tests/Protocal/ParserTest.php

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
<?php
2+
3+
namespace React\Tests\MySQL\Protocal;
4+
5+
use React\MySQL\Commands\QueryCommand;
6+
use React\MySQL\Executor;
7+
use React\MySQL\Protocal\Parser;
8+
use React\Stream\ThroughStream;
9+
use React\Tests\MySQL\BaseTestCase;
10+
11+
class ParserTest extends BaseTestCase
12+
{
13+
public function testClosingStreamEmitsCloseEvent()
14+
{
15+
$stream = new ThroughStream();
16+
$connection = $this->getMockBuilder('React\MySQL\ConnectionInterface')->disableOriginalConstructor()->getMock();
17+
$executor = new Executor($connection);
18+
19+
$parser = new Parser($stream, $executor);
20+
$parser->start();
21+
22+
$parser->on('close', $this->expectCallableOnce());
23+
24+
$stream->close();
25+
}
26+
27+
public function testClosingStreamEmitsErrorForCurrentCommand()
28+
{
29+
$stream = new ThroughStream();
30+
$connection = $this->getMockBuilder('React\MySQL\ConnectionInterface')->disableOriginalConstructor()->getMock();
31+
$executor = new Executor($connection);
32+
33+
$parser = new Parser($stream, $executor);
34+
$parser->start();
35+
36+
$command = new QueryCommand($connection);
37+
$command->on('error', $this->expectCallableOnce());
38+
39+
// hack to inject command as current command
40+
$parser->setOptions(array('currCommand' => $command));
41+
42+
$stream->close();
43+
}
44+
}

0 commit comments

Comments
 (0)