Skip to content

Commit 82a6397

Browse files
authored
PHPLIB-1237 Implement Parallel Benchmarks (#1166)
Parallel Benchmarks specs: LDJSON multi-file import (https://github.com/mongodb/specifications/blob/e09b41df206f9efaa36ba4c332c47d04ddb7d6d1/source/benchmarking/benchmarking.rst#ldjson-multi-file-import) Implementations: - Using Driver's BulkWrite in a single thread - Using library's Collection::insertMany in a single thread - Using multiple forked threads - Using amphp/parallel-functions with worker pool To get the fastest result: - Reading files is done using `stream_get_line` - Document insertion is done using Driver's BulkInsert
1 parent ec6c431 commit 82a6397

23 files changed

+259
-9
lines changed

.github/workflows/benchmark.yml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,14 +64,15 @@ jobs:
6464
- name: "Install dependencies with Composer"
6565
uses: "ramsey/[email protected]"
6666
with:
67-
composer-options: "--no-suggest"
67+
composer-options: "--no-suggest --working-dir=./benchmark"
6868

6969
- name: "Run phpbench"
70+
working-directory: "./benchmark"
7071
run: "vendor/bin/phpbench run --report=aggregate --report=bar_chart_time --report=env --output html"
7172

7273
- name: Upload HTML report
7374
uses: actions/upload-artifact@v3
7475
with:
7576
name: phpbench-${{ github.sha }}.html
76-
path: .phpbench/html/index.html
77+
path: ./benchmark/.phpbench/html/index.html
7778
retention-days: 3

benchmark/composer.json

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
{
2+
"name": "mongodb/mongodb-benchmark",
3+
"type": "project",
4+
"repositories": [
5+
{
6+
"type": "path",
7+
"url": "../",
8+
"symlink": true
9+
}
10+
],
11+
"require": {
12+
"php": ">=8.1",
13+
"ext-pcntl": "*",
14+
"amphp/parallel-functions": "^1.1",
15+
"mongodb/mongodb": "@dev",
16+
"phpbench/phpbench": "^1.2"
17+
},
18+
"autoload": {
19+
"psr-4": {
20+
"MongoDB\\Benchmark\\": "src/"
21+
}
22+
},
23+
"scripts": {
24+
"benchmark": "phpbench run --report=aggregate"
25+
}
26+
}

phpbench.json.dist renamed to benchmark/phpbench.json.dist

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
"runner.env_enabled_providers": ["mongodb","sampler","git","opcache","php","uname","unix_sysload"],
55
"runner.bootstrap": "vendor/autoload.php",
66
"runner.file_pattern": "*Bench.php",
7-
"runner.path": "benchmark",
7+
"runner.path": "src",
88
"runner.php_config": { "memory_limit": "1G" },
99
"runner.iterations": 3,
1010
"runner.revs": 10
Lines changed: 216 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,216 @@
1+
<?php
2+
3+
namespace MongoDB\Benchmark\DriverBench;
4+
5+
use Amp\Parallel\Worker\DefaultPool;
6+
use Generator;
7+
use MongoDB\Benchmark\Fixtures\Data;
8+
use MongoDB\Benchmark\Utils;
9+
use MongoDB\BSON\Document;
10+
use MongoDB\Driver\BulkWrite;
11+
use PhpBench\Attributes\AfterClassMethods;
12+
use PhpBench\Attributes\BeforeClassMethods;
13+
use PhpBench\Attributes\BeforeMethods;
14+
use PhpBench\Attributes\Iterations;
15+
use PhpBench\Attributes\ParamProviders;
16+
use PhpBench\Attributes\Revs;
17+
use RuntimeException;
18+
19+
use function Amp\ParallelFunctions\parallelMap;
20+
use function Amp\Promise\wait;
21+
use function array_map;
22+
use function count;
23+
use function fclose;
24+
use function fgets;
25+
use function file_get_contents;
26+
use function file_put_contents;
27+
use function fopen;
28+
use function is_dir;
29+
use function mkdir;
30+
use function pcntl_fork;
31+
use function pcntl_waitpid;
32+
use function range;
33+
use function sprintf;
34+
use function str_repeat;
35+
use function stream_get_line;
36+
use function sys_get_temp_dir;
37+
use function unlink;
38+
39+
/**
40+
* For accurate results, run benchmarks on a standalone server.
41+
*
42+
* @see https://github.com/mongodb/specifications/blob/ddfc8b583d49aaf8c4c19fa01255afb66b36b92e/source/benchmarking/benchmarking.rst#ldjson-multi-file-import
43+
*/
44+
#[BeforeClassMethods('beforeClass')]
45+
#[AfterClassMethods('afterClass')]
46+
#[BeforeMethods('beforeIteration')]
47+
#[Iterations(1)]
48+
#[Revs(1)]
49+
final class ParallelMultiFileImportBench
50+
{
51+
public static function beforeClass(): void
52+
{
53+
// Generate files
54+
$fileContents = str_repeat(file_get_contents(Data::LDJSON_FILE_PATH), 5_000);
55+
foreach (self::getFileNames() as $file) {
56+
file_put_contents($file, $fileContents);
57+
}
58+
}
59+
60+
public static function afterClass(): void
61+
{
62+
foreach (self::getFileNames() as $file) {
63+
unlink($file);
64+
}
65+
}
66+
67+
public function beforeIteration(): void
68+
{
69+
$database = Utils::getDatabase();
70+
$database->drop();
71+
$database->createCollection(Utils::getCollectionName());
72+
}
73+
74+
/**
75+
* Using Driver's BulkWrite in a single thread
76+
*/
77+
public function benchMultiFileImportBulkWrite(): void
78+
{
79+
foreach (self::getFileNames() as $file) {
80+
self::importFile($file);
81+
}
82+
}
83+
84+
/**
85+
* Using library's Collection::insertMany in a single thread
86+
*/
87+
public function benchMultiFileImportInsertMany(): void
88+
{
89+
$collection = Utils::getCollection();
90+
foreach (self::getFileNames() as $file) {
91+
$docs = [];
92+
// Read file contents into BSON documents
93+
$fh = fopen($file, 'r');
94+
while (($line = fgets($fh)) !== false) {
95+
if ($line !== '') {
96+
$docs[] = Document::fromJSON($line);
97+
}
98+
}
99+
100+
fclose($fh);
101+
102+
// Insert documents in bulk
103+
$collection->insertMany($docs);
104+
}
105+
}
106+
107+
/**
108+
* Using multiple forked threads
109+
*
110+
* @param array{processes:int, files:string[], batchSize:int} $params
111+
*/
112+
#[ParamProviders(['provideProcessesParameter'])]
113+
public function benchMultiFileImportFork(array $params): void
114+
{
115+
$pids = [];
116+
foreach (self::getFileNames() as $file) {
117+
// Wait for a child process to finish if we have reached the maximum number of processes
118+
if (count($pids) >= $params['processes']) {
119+
$pid = pcntl_waitpid(-1, $status);
120+
unset($pids[$pid]);
121+
}
122+
123+
$pid = pcntl_fork();
124+
if ($pid === 0) {
125+
// Reset to ensure that the existing libmongoc client (via the Manager) is not re-used by the child
126+
// process. When the child process constructs a new Manager, the differing PID will result in creation
127+
// of a new libmongoc client.
128+
Utils::reset();
129+
self::importFile($file);
130+
131+
// Exit the child process
132+
exit(0);
133+
}
134+
135+
if ($pid === -1) {
136+
throw new RuntimeException('Failed to fork');
137+
}
138+
139+
// Keep the forked process id to wait for it later
140+
$pids[$pid] = true;
141+
}
142+
143+
// Wait for all child processes to finish
144+
while ($pids !== []) {
145+
$pid = pcntl_waitpid(-1, $status);
146+
unset($pids[$pid]);
147+
}
148+
}
149+
150+
/**
151+
* Using amphp/parallel-functions with worker pool
152+
*
153+
* @param array{processes:int, files:string[], batchSize:int} $params
154+
*/
155+
#[ParamProviders(['provideProcessesParameter'])]
156+
public function benchMultiFileImportAmp(array $params): void
157+
{
158+
wait(parallelMap(
159+
self::getFileNames(),
160+
// Uses array callable instead of closure to skip complex serialization
161+
[self::class, 'importFile'],
162+
// The pool size is the number of processes
163+
new DefaultPool($params['processes']),
164+
));
165+
}
166+
167+
public static function provideProcessesParameter(): Generator
168+
{
169+
yield '1 proc' => ['processes' => 1]; // 100 sequences, to compare to the single thread baseline
170+
yield '2 proc' => ['processes' => 2]; // 50 sequences
171+
yield '4 proc' => ['processes' => 4]; // 25 sequences
172+
yield '8 proc' => ['processes' => 8]; // 13 sequences
173+
yield '13 proc' => ['processes' => 13]; // 8 sequences
174+
yield '20 proc' => ['processes' => 20]; // 5 sequences
175+
yield '34 proc' => ['processes' => 34]; // 3 sequences
176+
}
177+
178+
/**
179+
* We benchmarked the following solutions to read a file line by line:
180+
* - file
181+
* - SplFileObject
182+
* - fgets
183+
* - stream_get_line 🏆
184+
*/
185+
public static function importFile(string $file): void
186+
{
187+
$namespace = sprintf('%s.%s', Utils::getDatabaseName(), Utils::getCollectionName());
188+
189+
$bulkWrite = new BulkWrite();
190+
$fh = fopen($file, 'r');
191+
while (($line = stream_get_line($fh, 10_000, "\n")) !== false) {
192+
$bulkWrite->insert(Document::fromJSON($line));
193+
}
194+
195+
fclose($fh);
196+
Utils::getClient()->getManager()->executeBulkWrite($namespace, $bulkWrite);
197+
}
198+
199+
/**
200+
* Using a method to regenerate the file names because we cannot cache the result of the method in a static
201+
* property. The benchmark runner will call the method in a different process, so the static property will not be
202+
* populated.
203+
*/
204+
private static function getFileNames(): array
205+
{
206+
$tempDir = sys_get_temp_dir() . '/mongodb-php-benchmark';
207+
if (! is_dir($tempDir)) {
208+
mkdir($tempDir);
209+
}
210+
211+
return array_map(
212+
static fn (int $i) => sprintf('%s/%03d.txt', $tempDir, $i),
213+
range(0, 99),
214+
);
215+
}
216+
}

0 commit comments

Comments
 (0)