Skip to content

Commit 4f128f8

Browse files
committed
[php] Use non-blocking I/O for DB communication
Just as AMPHP, ReactPHP requires asynchronous implementations for it to work as expected. When using PDO we will block the process when establishing the DB connection and sending queries. This replaces the implementation with a fully async MySQL client, also cleaning removing unnecessary extensions from the image. Signed-off-by: Luís Cobucci <[email protected]>
1 parent 28aedbd commit 4f128f8

File tree

4 files changed

+115
-103
lines changed

4 files changed

+115
-103
lines changed

frameworks/PHP/reactphp/app.php

Lines changed: 111 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -1,134 +1,147 @@
11
<?php
2+
3+
use Psr\Http\Message\ResponseInterface;
24
use Psr\Http\Message\ServerRequestInterface as Request;
5+
use React\EventLoop\Loop;
6+
use React\MySQL\ConnectionInterface as DbConnection;
7+
use React\MySQL\Factory as DbFactory;
38
use React\Http\Message\Response;
9+
use React\MySQL\QueryResult;
10+
use React\Promise\PromiseInterface;
411

5-
function init()
6-
{
7-
global $world, $fortune, $update;
8-
$pdo = new PDO(
9-
'mysql:host=tfb-database;dbname=hello_world',
10-
'benchmarkdbuser',
11-
'benchmarkdbpass',
12-
[
13-
PDO::ATTR_DEFAULT_FETCH_MODE => PDO::FETCH_ASSOC,
14-
PDO::ATTR_EMULATE_PREPARES => false
15-
]
16-
);
17-
$world = $pdo->prepare('SELECT id,randomNumber FROM World WHERE id=?');
18-
$update = $pdo->prepare('UPDATE World SET randomNumber=? WHERE id=?');
19-
$fortune = $pdo->prepare('SELECT id,message FROM Fortune');
20-
$fortune->setFetchMode(PDO::FETCH_KEY_PAIR);
21-
}
12+
use function React\Promise\all;
2213

23-
function router(Request $request)
14+
/** @return Closure(Request):ResponseInterface */
15+
function requestHandler(): Closure
2416
{
25-
return match($request->getUri()->getPath()) {
26-
'/plaintext' => text(),
27-
'/json' => json(),
28-
'/db' => db(),
29-
'/fortunes' => fortune(),
30-
'/query' => query($request),
31-
'/update' => updateraw($request),
32-
// '/info' => info(),
33-
default => new Response(404, [], 'Error 404'),
17+
$connection = establishDbConnection('benchmarkdbuser:benchmarkdbpass@tfb-database/hello_world?idle=0.5');
18+
19+
$world = static function (int $id) use ($connection): PromiseInterface {
20+
return $connection->query('SELECT id,randomNumber FROM World WHERE id=?', [$id]);
3421
};
35-
}
3622

37-
function text()
38-
{
39-
return new Response(200, [
40-
'Content-Type' => 'text/plain'
41-
], 'Hello, World!');
42-
}
23+
$fortune = static function () use ($connection): PromiseInterface {
24+
return $connection->query('SELECT id,message FROM Fortune');
25+
};
4326

44-
function json()
45-
{
46-
return new Response(200, [
47-
'Content-Type' => 'application/json'
48-
], json_encode(['message' => 'Hello, World!']));
27+
$update = static function (int $id, int $randomNumber) use ($connection): PromiseInterface {
28+
return $connection->query('UPDATE World SET randomNumber=? WHERE id=?', [$randomNumber, $id]);
29+
};
30+
31+
return static function (Request $request) use ($world, $fortune, $update): ResponseInterface | PromiseInterface {
32+
return match($request->getUri()->getPath()) {
33+
'/plaintext' => Response::plaintext('Hello, World!'),
34+
'/json' => Response::json(['message' => 'Hello, World!']),
35+
'/db' => db($world),
36+
'/fortunes' => fortune($fortune),
37+
'/query' => query(queryCount($request), $world),
38+
'/update' => updateraw(queryCount($request), $world, $update),
39+
// '/info' => info(),
40+
default => new Response(404, [], 'Error 404'),
41+
};
42+
};
4943
}
5044

51-
function db()
52-
{
53-
global $world;
45+
function establishDbConnection(
46+
#[SensitiveParameter]
47+
string $uri,
48+
): DbConnection {
49+
$connection = (new DbFactory())->createLazyConnection($uri);
50+
51+
$interrupt = $connection->quit(...);
5452

55-
$world->execute([mt_rand(1, 10000)]);
53+
$connection->on('close', static function () use (&$interrupt) {
54+
Loop::removeSignal(SIGINT, $interrupt);
55+
Loop::removeSignal(SIGTERM, $interrupt);
56+
});
5657

57-
return new Response(200, [
58-
'Content-Type' => 'application/json'
59-
], json_encode($world->fetch()));
58+
Loop::addSignal(SIGINT, $interrupt);
59+
Loop::addSignal(SIGTERM, $interrupt);
60+
61+
return $connection;
6062
}
6163

62-
function query($request)
64+
/** @param Closure(int):PromiseInterface $world */
65+
function db(Closure $world): PromiseInterface
6366
{
64-
global $world;
67+
$id = mt_rand(1, 10000);
6568

66-
$query_count = 1;
67-
$q = (int) $request->getQueryParams()['q'];
68-
if ($q > 1) {
69-
$query_count = min($q, 500);
70-
}
69+
return $world($id)->then(
70+
static fn (QueryResult $result): ResponseInterface => Response::json($result->resultRows[0]),
71+
);
72+
}
7173

72-
while ($query_count--) {
73-
$world->execute([mt_rand(1, 10000)]);
74-
$arr[] = $world->fetch();
74+
function queryCount(Request $request): int
75+
{
76+
$count = (int) ($request->getQueryParams()['q'] ?? 1);
77+
78+
if ($count > 1) {
79+
return min($count, 500);
7580
}
7681

77-
return new Response(200, [
78-
'Content-Type' => 'application/json'
79-
], json_encode($arr));
82+
return 1;
8083
}
8184

82-
function updateraw($request)
85+
/** @param Closure(int):PromiseInterface $world */
86+
function query(int $queryCount, Closure $world): PromiseInterface
8387
{
84-
global $world, $update;
88+
$processQueries = static function (int $count) use ($world): iterable {
89+
while ($count--) {
90+
$id = mt_rand(1, 10000);
8591

86-
$query_count = 1;
87-
$q = (int) $request->getQueryParams()['q'];
88-
if ($q > 1) {
89-
$query_count = min($q, 500);
90-
}
92+
yield $world($id)->then(static fn (QueryResult $result): array => $result->resultRows[0]);
93+
}
94+
};
9195

92-
while ($query_count--) {
93-
$id = mt_rand(1, 10000);
94-
$world->execute([$id]);
95-
$item = $world->fetch();
96-
$update->execute(
97-
[$item['randomNumber'] = mt_rand(1, 10000), $id]
98-
);
96+
return all($processQueries($queryCount))
97+
->then(static fn (array $result): ResponseInterface => Response::json($result));
98+
}
9999

100-
$arr[] = $item;
101-
}
100+
/**
101+
* @param Closure(int):PromiseInterface $world
102+
* @param Closure(int, int):PromiseInterface $update
103+
*/
104+
function updateraw(int $queryCount, Closure $world, Closure $update): PromiseInterface
105+
{
106+
$processQueries = static function (int $count) use ($world, $update): iterable {
107+
while ($count--) {
108+
$id = mt_rand(1, 10000);
109+
110+
yield $world($id)->then(
111+
static function (QueryResult $result) use ($update): PromiseInterface {
112+
$updated = $result->resultRows[0];
113+
$updated['randomNumber'] = mt_rand(1, 10000);
114+
115+
return $update($updated['id'], $updated['randomNumber'])
116+
->then(static fn (): array => $updated);
117+
}
118+
);
119+
}
120+
};
102121

103-
// $pdo->beginTransaction();
104-
// foreach($arr as $world) {
105-
// $update->execute([$world['randomNumber'], $world['id']]);
106-
// }
107-
// $pdo->commit();
108-
return new Response(200, [
109-
'Content-Type' => 'application/json'
110-
], json_encode($arr));
122+
return all($processQueries($queryCount))
123+
->then(static fn (array $result): ResponseInterface => Response::json($result));
111124
}
112125

113-
function fortune()
126+
function fortune(Closure $fortune): PromiseInterface
114127
{
115-
global $fortune;
128+
$formatResult = static function (array $rows): string {
129+
$rows[] = ['id' => 0, 'message' => 'Additional fortune added at request time.'];
130+
usort($rows, static fn (array $one, array $other) => $one['message'] <=> $other['message']);
116131

117-
$fortune->execute();
132+
$html = '';
118133

119-
$arr = $fortune->fetchAll();
120-
$arr[0] = 'Additional fortune added at request time.';
121-
asort($arr);
134+
foreach ($rows as $row) {
135+
$message = htmlspecialchars($row['message'], ENT_QUOTES, 'UTF-8');
122136

123-
$html = '';
124-
foreach ($arr as $id => $message) {
125-
$message = htmlspecialchars($message, ENT_QUOTES, 'UTF-8');
126-
$html .= "<tr><td>$id</td><td>$message</td></tr>";
127-
}
137+
$html .= "<tr><td>${row['id']}</td><td>${message}</td></tr>";
138+
}
139+
140+
return "<!DOCTYPE html><html><head><title>Fortunes</title></head><body><table><tr><th>id</th><th>message</th></tr>$html</table></body></html>";
141+
};
128142

129-
return new Response(200, [
130-
'Content-Type' => 'text/html; charset=UTF-8',
131-
], "<!DOCTYPE html><html><head><title>Fortunes</title></head><body><table><tr><th>id</th><th>message</th></tr>$html</table></body></html>"
143+
return $fortune()->then(
144+
static fn (QueryResult $result): ResponseInterface => Response::html($formatResult($result->resultRows)),
132145
);
133146
}
134147

@@ -138,4 +151,4 @@ function fortune()
138151
phpinfo();
139152
return new Response(200, ['Content-Type' => 'text/plain'], ob_get_clean());
140153
}
141-
*/
154+
*/

frameworks/PHP/reactphp/composer.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
"psr/http-message": "^1.0",
55
"react/event-loop": "^1.5",
66
"react/http": "^1.9",
7-
"react/socket": "^1.14"
7+
"react/socket": "^1.14",
8+
"react/mysql": "^0.6"
89
}
910
}

frameworks/PHP/reactphp/reactphp.dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ RUN apt-get update -yqq && apt-get install -yqq software-properties-common > /de
66
RUN LC_ALL=C.UTF-8 add-apt-repository ppa:ondrej/php
77
RUN apt-get update -yqq > /dev/null && \
88
apt-get install -yqq git unzip wget curl build-essential \
9-
php8.2-cli php8.2-mbstring php8.2-dev php8.2-xml php8.2-curl php8.2-mysql > /dev/null
9+
php8.2-cli php8.2-mbstring php8.2-dev > /dev/null
1010

1111
# An extension is required!
1212
# We deal with concurrencies over 1k, which stream_select doesn't support.

frameworks/PHP/reactphp/server.php

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,7 @@
77
require __DIR__ . '/vendor/autoload.php';
88
require_once __DIR__.'/app.php';
99

10-
init();
11-
12-
$server = new HttpServer(router(...));
10+
$server = new HttpServer(requestHandler());
1311
$socket = new SocketServer('0.0.0.0:8080');
1412
$server->listen($socket);
1513

0 commit comments

Comments
 (0)