Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ given event loop instance.

#### query()

The `query(string $query, array $params = []): PromiseInterface<MysqlResult>` method can be used to
The `query(string $query, list<string|int|float|bool|null> $params = []): PromiseInterface<MysqlResult>` method can be used to
perform an async query.

This method returns a promise that will resolve with a `MysqlResult` on
Expand Down Expand Up @@ -258,7 +258,7 @@ suited for exposing multiple possible results.

#### queryStream()

The `queryStream(string $sql, array $params = []): ReadableStreamInterface` method can be used to
The `queryStream(string $sql, list<string|int|float|bool|null> $params = []): ReadableStreamInterface` method can be used to
perform an async query and stream the rows of the result set.

This method returns a readable stream that will emit each row of the
Expand Down
17 changes: 2 additions & 15 deletions src/Io/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -90,16 +90,8 @@ public function isBusy()
return $this->parser->isBusy() || !$this->executor->isIdle();
}

/**
* {@inheritdoc}
*/
public function query($sql, array $params = [])
public function query(Query $query)
{
$query = new Query($sql);
if ($params) {
$query->bindParamsFromArray($params);
}

$command = new QueryCommand();
$command->setQuery($query);
try {
Expand Down Expand Up @@ -146,13 +138,8 @@ public function query($sql, array $params = [])
return $deferred->promise();
}

public function queryStream($sql, $params = [])
public function queryStream(Query $query)
{
$query = new Query($sql);
if ($params) {
$query->bindParamsFromArray($params);
}

$command = new QueryCommand();
$command->setQuery($query);
$this->_doCommand($command);
Expand Down
58 changes: 12 additions & 46 deletions src/Io/Query.php
Original file line number Diff line number Diff line change
Expand Up @@ -41,46 +41,22 @@ class Query
//"_" => "\\_",
];

public function __construct($sql)
{
$this->sql = $this->builtSql = $sql;
}

/**
* Binding params for the query, multiple arguments support.
*
* @param mixed $param
* @return self
*/
public function bindParams()
{
$this->builtSql = null;
$this->params = func_get_args();

return $this;
}

public function bindParamsFromArray(array $params)
{
$this->builtSql = null;
$this->params = $params;

return $this;
}

/**
* Binding params for the query, multiple arguments support.
*
* @param mixed $param
* @return self
* @deprecated
* @param string $sql
* @param list<string|int|float|bool|null> $params
* @throws \InvalidArgumentException if given $params are invalid
*/
public function params()
public function __construct($sql, array $params = [])
{
$this->params = func_get_args();
$this->builtSql = null;
foreach ($params as $param) {
if (!\is_scalar($param) && $param !== null) {
throw new \InvalidArgumentException('Query param must be of type string|int|float|bool|null, ' . (\is_object($param) ? \get_class($param) : \gettype($param)) . ' given');
}
}

return $this;
$this->sql = $sql;
$this->builtSql = $params ? null : $sql;
$this->params = $params;
}

public function escape($str)
Expand All @@ -105,19 +81,9 @@ protected function resolveValueForSql($value)
case 'string':
$value = "'" . $this->escape($value) . "'";
break;
case 'array':
$nvalue = [];
foreach ($value as $v) {
$nvalue[] = $this->resolveValueForSql($v);
}
$value = implode(',', $nvalue);
break;
case 'NULL':
$value = 'NULL';
break;
default:
throw new \InvalidArgumentException(sprintf('Not supported value type of %s.', $type));
break;
}

return $value;
Expand Down
26 changes: 17 additions & 9 deletions src/MysqlClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
use React\EventLoop\LoopInterface;
use React\Mysql\Io\Connection;
use React\Mysql\Io\Factory;
use React\Mysql\Io\Query;
use React\Promise\Deferred;
use React\Promise\Promise;
use React\Promise\PromiseInterface;
Expand Down Expand Up @@ -152,19 +153,22 @@ public function __construct(
* could allow for possible SQL injection attacks and this API is not
* suited for exposing multiple possible results.
*
* @param string $sql SQL statement
* @param array $params Parameters which should be bound to query
* @param string $sql SQL statement
* @param list<string|int|float|bool|null> $params Parameters which should be bound to query
* @return PromiseInterface<MysqlResult>
* Resolves with a `MysqlResult` on success or rejects with an `Exception` on error.
* @throws \InvalidArgumentException if given $params are invalid
*/
public function query($sql, array $params = [])
{
$query = new Query($sql, $params);

if ($this->closed || $this->quitting) {
return \React\Promise\reject(new Exception('Connection closed'));
}

return $this->getConnection()->then(function (Connection $connection) use ($sql, $params) {
return $connection->query($sql, $params)->then(function (MysqlResult $result) use ($connection) {
return $this->getConnection()->then(function (Connection $connection) use ($query) {
return $connection->query($query)->then(function (MysqlResult $result) use ($connection) {
$this->handleConnectionReady($connection);
return $result;
}, function (\Exception $e) use ($connection) {
Expand Down Expand Up @@ -229,19 +233,23 @@ public function query($sql, array $params = [])
* could allow for possible SQL injection attacks and this API is not
* suited for exposing multiple possible results.
*
* @param string $sql SQL statement
* @param array $params Parameters which should be bound to query
* @param string $sql SQL statement
* @param list<string|int|float|bool|null> $params Parameters which should be bound to query
* @return ReadableStreamInterface
* @throws \InvalidArgumentException if given $params are invalid
* @throws Exception if connection is already closed/closing
*/
public function queryStream($sql, $params = [])
public function queryStream($sql, array $params = [])
{
$query = new Query($sql, $params);

if ($this->closed || $this->quitting) {
throw new Exception('Connection closed');
}

return \React\Promise\Stream\unwrapReadable(
$this->getConnection()->then(function (Connection $connection) use ($sql, $params) {
$stream = $connection->queryStream($sql, $params);
$this->getConnection()->then(function (Connection $connection) use ($query) {
$stream = $connection->queryStream($query);

$stream->on('end', function () use ($connection) {
$this->handleConnectionReady($connection);
Expand Down
39 changes: 20 additions & 19 deletions tests/Io/ConnectionTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
namespace React\Tests\Mysql\Io;

use React\Mysql\Io\Connection;
use React\Mysql\Io\Query;
use React\Tests\Mysql\BaseTestCase;

class ConnectionTest extends BaseTestCase
Expand All @@ -22,7 +23,7 @@ public function testIsBusyReturnsTrueWhenParserIsBusy()

$connection = new Connection($stream, $executor, $parser, $loop, null);

$connection->query('SELECT 1');
$connection->query(new Query('SELECT 1'));

$this->assertTrue($connection->isBusy());
}
Expand Down Expand Up @@ -57,7 +58,7 @@ public function testQueryWillEnqueueOneCommand()
$loop->expects($this->never())->method('addTimer');

$conn = new Connection($stream, $executor, $parser, $loop, null);
$conn->query('SELECT 1');
$conn->query(new Query('SELECT 1'));
}

public function testQueryWillReturnResolvedPromiseAndStartIdleTimerWhenQueryCommandEmitsSuccess()
Expand All @@ -81,7 +82,7 @@ public function testQueryWillReturnResolvedPromiseAndStartIdleTimerWhenQueryComm

$this->assertNull($currentCommand);

$promise = $connection->query('SELECT 1');
$promise = $connection->query(new Query('SELECT 1'));

$promise->then($this->expectCallableOnceWith($this->isInstanceOf('React\Mysql\MysqlResult')));

Expand Down Expand Up @@ -110,7 +111,7 @@ public function testQueryWillReturnResolvedPromiseAndStartIdleTimerWhenQueryComm

$this->assertNull($currentCommand);

$promise = $connection->query('SELECT 1');
$promise = $connection->query(new Query('SELECT 1'));

$promise->then($this->expectCallableOnceWith($this->isInstanceOf('React\Mysql\MysqlResult')));

Expand Down Expand Up @@ -139,7 +140,7 @@ public function testQueryWillReturnResolvedPromiseAndStartIdleTimerWhenIdlePerio

$this->assertNull($currentCommand);

$promise = $connection->query('SELECT 1');
$promise = $connection->query(new Query('SELECT 1'));

$promise->then($this->expectCallableOnceWith($this->isInstanceOf('React\Mysql\MysqlResult')));

Expand All @@ -166,7 +167,7 @@ public function testQueryWillReturnResolvedPromiseAndNotStartIdleTimerWhenIdlePe

$this->assertNull($currentCommand);

$promise = $connection->query('SELECT 1');
$promise = $connection->query(new Query('SELECT 1'));

$promise->then($this->expectCallableOnceWith($this->isInstanceOf('React\Mysql\MysqlResult')));

Expand Down Expand Up @@ -195,7 +196,7 @@ public function testQueryWillReturnRejectedPromiseAndStartIdleTimerWhenQueryComm

$this->assertNull($currentCommand);

$promise = $connection->query('SELECT 1');
$promise = $connection->query(new Query('SELECT 1'));

$promise->then(null, $this->expectCallableOnce());

Expand Down Expand Up @@ -230,7 +231,7 @@ public function testQueryFollowedByIdleTimerWillQuitUnderlyingConnectionAndEmitC

$this->assertNull($currentCommand);

$connection->query('SELECT 1');
$connection->query(new Query('SELECT 1'));

$this->assertNotNull($currentCommand);
$currentCommand->emit('success');
Expand Down Expand Up @@ -269,7 +270,7 @@ public function testQueryFollowedByIdleTimerWillQuitUnderlyingConnectionAndEmitC

$this->assertNull($currentCommand);

$connection->query('SELECT 1');
$connection->query(new Query('SELECT 1'));

$this->assertNotNull($currentCommand);
$currentCommand->emit('success');
Expand Down Expand Up @@ -300,8 +301,8 @@ public function testQueryTwiceWillEnqueueSecondQueryWithoutStartingIdleTimerWhen

$this->assertNull($currentCommand);

$connection->query('SELECT 1');
$connection->query('SELECT 2');
$connection->query(new Query('SELECT 1'));
$connection->query(new Query('SELECT 2'));

$this->assertNotNull($currentCommand);
$currentCommand->emit('success');
Expand All @@ -328,12 +329,12 @@ public function testQueryTwiceAfterIdleTimerWasStartedWillCancelIdleTimerAndEnqu

$this->assertNull($currentCommand);

$connection->query('SELECT 1');
$connection->query(new Query('SELECT 1'));

$this->assertNotNull($currentCommand);
$currentCommand->emit('success');

$connection->query('SELECT 2');
$connection->query(new Query('SELECT 2'));
}

public function testQueryStreamWillEnqueueOneCommand()
Expand All @@ -350,7 +351,7 @@ public function testQueryStreamWillEnqueueOneCommand()
$loop->expects($this->never())->method('addTimer');

$conn = new Connection($stream, $executor, $parser, $loop, null);
$conn->queryStream('SELECT 1');
$conn->queryStream(new Query('SELECT 1'));
}

public function testQueryStreamWillReturnStreamThatWillEmitEndEventAndStartIdleTimerWhenQueryCommandEmitsSuccess()
Expand All @@ -374,7 +375,7 @@ public function testQueryStreamWillReturnStreamThatWillEmitEndEventAndStartIdleT

$this->assertNull($currentCommand);

$stream = $connection->queryStream('SELECT 1');
$stream = $connection->queryStream(new Query('SELECT 1'));

$stream->on('end', $this->expectCallableOnce());
$stream->on('close', $this->expectCallableOnce());
Expand Down Expand Up @@ -404,7 +405,7 @@ public function testQueryStreamWillReturnStreamThatWillEmitErrorEventAndStartIdl

$this->assertNull($currentCommand);

$stream = $connection->queryStream('SELECT 1');
$stream = $connection->queryStream(new Query('SELECT 1'));

$stream->on('error', $this->expectCallableOnceWith($this->isInstanceOf('RuntimeException')));
$stream->on('close', $this->expectCallableOnce());
Expand Down Expand Up @@ -641,7 +642,7 @@ public function testQueryAfterQuitRejectsImmediately()

$conn = new Connection($stream, $executor, $parser, $loop, null);
$conn->quit();
$promise = $conn->query('SELECT 1');
$promise = $conn->query(new Query('SELECT 1'));

$promise->then(null, $this->expectCallableOnceWith(
$this->logicalAnd(
Expand All @@ -668,7 +669,7 @@ public function testQueryAfterCloseRejectsImmediately()

$conn = new Connection($stream, $executor, $parser, $loop, null);
$conn->close();
$promise = $conn->query('SELECT 1');
$promise = $conn->query(new Query('SELECT 1'));

$promise->then(null, $this->expectCallableOnceWith(
$this->logicalAnd(
Expand Down Expand Up @@ -697,7 +698,7 @@ public function testQueryStreamAfterQuitThrows()
$conn->quit();

try {
$conn->queryStream('SELECT 1');
$conn->queryStream(new Query('SELECT 1'));
} catch (\RuntimeException $e) {
$this->assertEquals('Connection closing (ENOTCONN)', $e->getMessage());
$this->assertEquals(defined('SOCKET_ENOTCONN') ? SOCKET_ENOTCONN : 107, $e->getCode());
Expand Down
Loading