Skip to content

Commit 836ca2d

Browse files
committed
Refactor to move command queuing logic to MysqlClient
1 parent df9ac96 commit 836ca2d

File tree

6 files changed

+1269
-99
lines changed

6 files changed

+1269
-99
lines changed

src/Io/Connection.php

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@ class Connection extends EventEmitter
4040
*/
4141
private $stream;
4242

43+
/** @var Parser */
44+
private $parser;
45+
4346
/** @var LoopInterface */
4447
private $loop;
4548

@@ -57,13 +60,15 @@ class Connection extends EventEmitter
5760
*
5861
* @param SocketConnectionInterface $stream
5962
* @param Executor $executor
63+
* @param Parser $parser
6064
* @param LoopInterface $loop
6165
* @param ?float $idlePeriod
6266
*/
63-
public function __construct(SocketConnectionInterface $stream, Executor $executor, LoopInterface $loop, $idlePeriod)
67+
public function __construct(SocketConnectionInterface $stream, Executor $executor, Parser $parser, LoopInterface $loop, $idlePeriod)
6468
{
6569
$this->stream = $stream;
6670
$this->executor = $executor;
71+
$this->parser = $parser;
6772

6873
$this->loop = $loop;
6974
if ($idlePeriod !== null) {
@@ -74,6 +79,17 @@ public function __construct(SocketConnectionInterface $stream, Executor $executo
7479
$stream->on('close', [$this, 'handleConnectionClosed']);
7580
}
7681

82+
/**
83+
* busy executing some command such as query or ping
84+
*
85+
* @return bool
86+
* @throws void
87+
*/
88+
public function isBusy()
89+
{
90+
return $this->parser->isBusy() || !$this->executor->isIdle();
91+
}
92+
7793
/**
7894
* {@inheritdoc}
7995
*/

src/Io/Factory.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,7 @@ public function createConnection(
215215
$executor = new Executor();
216216
$parser = new Parser($stream, $executor);
217217

218-
$connection = new Connection($stream, $executor, $this->loop, $idlePeriod);
218+
$connection = new Connection($stream, $executor, $parser, $this->loop, $idlePeriod);
219219
$command = $executor->enqueue($authCommand);
220220
$parser->start();
221221

src/Io/Parser.php

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,17 @@ public function __construct(DuplexStreamInterface $stream, Executor $executor)
115115
});
116116
}
117117

118+
/**
119+
* busy executing some command such as query or ping
120+
*
121+
* @return bool
122+
* @throws void
123+
*/
124+
public function isBusy()
125+
{
126+
return $this->currCommand !== null;
127+
}
128+
118129
public function start()
119130
{
120131
$this->stream->on('data', [$this, 'handleData']);

src/MysqlClient.php

Lines changed: 105 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
use React\EventLoop\LoopInterface;
77
use React\Mysql\Io\Connection;
88
use React\Mysql\Io\Factory;
9+
use React\Promise\Deferred;
910
use React\Promise\Promise;
1011
use React\Socket\ConnectorInterface;
1112
use React\Stream\ReadableStreamInterface;
@@ -58,6 +59,13 @@ class MysqlClient extends EventEmitter
5859
/** @var ?Connection */
5960
private $connection;
6061

62+
/**
63+
* array of outstanding connection requests to send next commands once a connection becomes ready
64+
*
65+
* @var array<int,Deferred<Connection>>
66+
*/
67+
private $pending = [];
68+
6169
/**
6270
* set to true only between calling `quit()` and the connection closing in response
6371
*
@@ -77,44 +85,6 @@ public function __construct(
7785
$this->uri = $uri;
7886
}
7987

80-
/**
81-
* @return PromiseInterface<Connection>
82-
*/
83-
private function getConnection()
84-
{
85-
// happy path: reuse existing connection unless it is already closing after an idle timeout
86-
if ($this->connection !== null && ($this->quitting || $this->connection->state !== Connection::STATE_CLOSING)) {
87-
return \React\Promise\resolve($this->connection);
88-
}
89-
90-
if ($this->connecting !== null) {
91-
return $this->connecting;
92-
}
93-
94-
// force-close connection if still waiting for previous disconnection
95-
if ($this->connection !== null) {
96-
assert($this->connection->state === Connection::STATE_CLOSING);
97-
$this->connection->close();
98-
}
99-
100-
// create new connection if not already connected or connecting
101-
$this->connecting = $connecting = $this->factory->createConnection($this->uri);
102-
$this->connecting->then(function (Connection $connection) {
103-
$this->connection = $connection;
104-
$this->connecting = null;
105-
106-
// connection completed => remember only until closed
107-
$connection->on('close', function () {
108-
$this->connection = null;
109-
});
110-
}, function () {
111-
// connection failed => discard connection attempt
112-
$this->connecting = null;
113-
});
114-
115-
return $connecting;
116-
}
117-
11888
/**
11989
* Performs an async query.
12090
*
@@ -176,12 +146,18 @@ private function getConnection()
176146
*/
177147
public function query($sql, array $params = [])
178148
{
179-
if ($this->closed) {
149+
if ($this->closed || $this->quitting) {
180150
return \React\Promise\reject(new Exception('Connection closed'));
181151
}
182152

183153
return $this->getConnection()->then(function (Connection $connection) use ($sql, $params) {
184-
return $connection->query($sql, $params);
154+
return $connection->query($sql, $params)->then(function (MysqlResult $result) use ($connection) {
155+
$this->handleConnectionReady($connection);
156+
return $result;
157+
}, function (\Exception $e) use ($connection) {
158+
$this->handleConnectionReady($connection);
159+
throw $e;
160+
});
185161
});
186162
}
187163

@@ -246,13 +222,22 @@ public function query($sql, array $params = [])
246222
*/
247223
public function queryStream($sql, $params = [])
248224
{
249-
if ($this->closed) {
225+
if ($this->closed || $this->quitting) {
250226
throw new Exception('Connection closed');
251227
}
252228

253229
return \React\Promise\Stream\unwrapReadable(
254230
$this->getConnection()->then(function (Connection $connection) use ($sql, $params) {
255-
return $connection->queryStream($sql, $params);
231+
$stream = $connection->queryStream($sql, $params);
232+
233+
$stream->on('end', function () use ($connection) {
234+
$this->handleConnectionReady($connection);
235+
});
236+
$stream->on('error', function () use ($connection) {
237+
$this->handleConnectionReady($connection);
238+
});
239+
240+
return $stream;
256241
})
257242
);
258243
}
@@ -279,12 +264,17 @@ public function queryStream($sql, $params = [])
279264
*/
280265
public function ping()
281266
{
282-
if ($this->closed) {
267+
if ($this->closed || $this->quitting) {
283268
return \React\Promise\reject(new Exception('Connection closed'));
284269
}
285270

286271
return $this->getConnection()->then(function (Connection $connection) {
287-
return $connection->ping();
272+
return $connection->ping()->then(function () use ($connection) {
273+
$this->handleConnectionReady($connection);
274+
}, function (\Exception $e) use ($connection) {
275+
$this->handleConnectionReady($connection);
276+
throw $e;
277+
});
288278
});
289279
}
290280

@@ -312,7 +302,7 @@ public function ping()
312302
*/
313303
public function quit()
314304
{
315-
if ($this->closed) {
305+
if ($this->closed || $this->quitting) {
316306
return \React\Promise\reject(new Exception('Connection closed'));
317307
}
318308

@@ -379,7 +369,77 @@ public function close()
379369
$this->connecting = null;
380370
}
381371

372+
// clear all outstanding commands
373+
foreach ($this->pending as $deferred) {
374+
$deferred->reject(new \RuntimeException('Connection closed'));
375+
}
376+
$this->pending = [];
377+
382378
$this->emit('close');
383379
$this->removeAllListeners();
384380
}
381+
382+
383+
/**
384+
* @return PromiseInterface<Connection>
385+
*/
386+
private function getConnection()
387+
{
388+
$deferred = new Deferred();
389+
390+
// force-close connection if still waiting for previous disconnection due to idle timer
391+
if ($this->connection !== null && $this->connection->state === Connection::STATE_CLOSING) {
392+
$this->connection->close();
393+
$this->connection = null;
394+
}
395+
396+
// happy path: reuse existing connection unless it is currently busy executing another command
397+
if ($this->connection !== null && !$this->connection->isBusy()) {
398+
$deferred->resolve($this->connection);
399+
return $deferred->promise();
400+
}
401+
402+
// queue pending connection request until connection becomes ready
403+
$this->pending[] = $deferred;
404+
405+
// create new connection if not already connected or connecting
406+
if ($this->connection === null && $this->connecting === null) {
407+
$this->connecting = $this->factory->createConnection($this->uri);
408+
$this->connecting->then(function (Connection $connection) {
409+
// connection completed => remember only until closed
410+
$this->connecting = null;
411+
$this->connection = $connection;
412+
$connection->on('close', function () {
413+
$this->connection = null;
414+
});
415+
416+
// handle first command from queue when connection is ready
417+
$this->handleConnectionReady($connection);
418+
}, function (\Exception $e) {
419+
// connection failed => discard connection attempt
420+
$this->connecting = null;
421+
422+
foreach ($this->pending as $key => $deferred) {
423+
$deferred->reject($e);
424+
unset($this->pending[$key]);
425+
}
426+
});
427+
}
428+
429+
return $deferred->promise();
430+
}
431+
432+
private function handleConnectionReady(Connection $connection)
433+
{
434+
$deferred = \reset($this->pending);
435+
if ($deferred === false) {
436+
// nothing to do if there are no outstanding connection requests
437+
return;
438+
}
439+
440+
assert($deferred instanceof Deferred);
441+
unset($this->pending[\key($this->pending)]);
442+
443+
$deferred->resolve($connection);
444+
}
385445
}

0 commit comments

Comments
 (0)