Skip to content

Commit 0e300f8

Browse files
authored
Merge pull request #62 from xp-forge/feature/compression
Implement compression
2 parents 2af113d + 7fadc04 commit 0e300f8

File tree

8 files changed

+427
-32
lines changed

8 files changed

+427
-32
lines changed
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
<?php namespace com\mongodb\io;
2+
3+
use lang\Value;
4+
use util\Comparison;
5+
6+
/**
7+
* Compression negotiation and selection.
8+
*
9+
* @see https://github.com/mongodb/specifications/blob/master/source/compression/OP_COMPRESSED.md
10+
* @test com.mongodb.unittest.CompressionTest
11+
*/
12+
class Compression implements Value {
13+
use Comparison;
14+
15+
private static $negotiable= [];
16+
private $compressors;
17+
18+
static function __static() {
19+
extension_loaded('zlib') && self::$negotiable['zlib']= fn($options) => new Zlib($options['zlibCompressionLevel'] ?? -1);
20+
extension_loaded('zstd') && self::$negotiable['zstd']= fn($options) => new Zstd($options['zstdCompressionLevel'] ?? -1);
21+
}
22+
23+
/** @param [:com.mongodb.io.Compressor] $compressors */
24+
public function __construct(array $compressors= []) {
25+
$this->compressors= $compressors;
26+
}
27+
28+
/** Registers a given compressor */
29+
public function with(Compressor $compressor): self {
30+
$this->compressors[$compressor->id]= $compressor;
31+
return $this;
32+
}
33+
34+
/**
35+
* Negotiate compression. Returns NULL if no compressors apply.
36+
*
37+
* @param string[] $server
38+
* @param [:string] $options
39+
* @return ?self
40+
*/
41+
public static function negotiate($server, $options= []) {
42+
$negotiated= [];
43+
foreach ($server as $preference) {
44+
if ($new= self::$negotiable[$preference] ?? null) {
45+
$compressor= $new($options);
46+
$negotiated[$compressor->id]= $compressor;
47+
}
48+
}
49+
return $negotiated ? new self($negotiated) : null;
50+
}
51+
52+
/**
53+
* Selects the compressor for a given ID. Returns NULL if compressor by this
54+
* ID is present.
55+
*
56+
* @param int $id
57+
* @return ?com.mongodb.io.Compressor
58+
*/
59+
public function select($id) {
60+
return $this->compressors[$id] ?? null;
61+
}
62+
63+
/**
64+
* Returns compressor for given input sections and length. Returns NULL if no
65+
* compression should be used.
66+
*
67+
* @param [:var] $sectionts
68+
* @param int $length
69+
* @return ?com.mongodb.io.Compressor
70+
*/
71+
public function for($sections, $length) {
72+
if (empty($this->compressors) || (
73+
isset($sections['hello']) ||
74+
isset($sections['isMaster']) ||
75+
isset($sections['saslStart']) ||
76+
isset($sections['saslContinue']) ||
77+
isset($sections['getnonce']) ||
78+
isset($sections['authenticate']) ||
79+
isset($sections['createUser']) ||
80+
isset($sections['updateUser']) ||
81+
isset($sections['copydbSaslStart']) ||
82+
isset($sections['copydbgetnonce']) ||
83+
isset($sections['copydb'])
84+
)) return null;
85+
86+
// When compressing, clients MUST use the first compressor in the client's
87+
// configured compressors list that is also in the servers list.
88+
return current($this->compressors);
89+
}
90+
91+
/** @return string */
92+
public function toString() {
93+
$s= nameof($this)."@[\n";
94+
foreach ($this->compressors as $compressor) {
95+
$s.= ' '.$compressor->toString()."\n";
96+
}
97+
return $s.']';
98+
}
99+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
<?php namespace com\mongodb\io;
2+
3+
use lang\Value;
4+
5+
abstract class Compressor implements Value {
6+
public $id;
7+
8+
public abstract function compress($data);
9+
10+
public abstract function decompress($compressed);
11+
12+
public function toString() { return nameof($this).'(id: '.$this->id.')'; }
13+
14+
public function hashCode() { return 'C'.$this->id; }
15+
16+
public function compareTo($value) { return $value instanceof self ? $this->id <=> $value->id : 1; }
17+
18+
}

src/main/php/com/mongodb/io/Connection.class.php

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ class Connection {
3232

3333
private $socket, $bson;
3434
private $packet= 1;
35+
public $compression= null;
3536
public $server= null;
3637
public $lastUsed= null;
3738

@@ -98,6 +99,10 @@ public function establish($options= [], $auth= null) {
9899
'driver' => ['name' => 'XP MongoDB Connectivity', 'version' => '3.0.0'],
99100
'os' => ['name' => php_uname('s'), 'type' => PHP_OS, 'architecture' => php_uname('m'), 'version' => php_uname('r')]
100101
],
102+
'compression' => ($param= ($options['params']['compressors'] ?? null))
103+
? explode(',', $param)
104+
: []
105+
,
101106
];
102107

103108
// If the optional field saslSupportedMechs is specified, the command also returns
@@ -113,6 +118,7 @@ public function establish($options= [], $auth= null) {
113118

114119
try {
115120
$this->server= $this->hello($params);
121+
$this->compression= Compression::negotiate($this->server['compression'] ?? [], $options['params'] ?? []);
116122
} catch (ProtocolException $e) {
117123
throw new ConnectException('Server handshake failed @ '.$this->address(), $e);
118124
}
@@ -230,14 +236,30 @@ public function send($operation, $header, $sections, $readPreference= null) {
230236

231237
$this->packet > 2147483647 ? $this->packet= 1 : $this->packet++;
232238
$body= $header.$this->bson->sections($sections);
233-
$payload= pack('VVVV', strlen($body) + 16, $this->packet, 0, $operation).$body;
239+
$length= strlen($body);
240+
241+
if ($this->compression && $compressor= $this->compression->for($sections, $length)) {
242+
$compressed= $compressor->compress($body);
243+
$this->socket->write(pack(
244+
'VVVVVVCa*',
245+
strlen($compressed) + 25,
246+
$this->packet,
247+
0,
248+
self::OP_COMPRESSED,
249+
$operation,
250+
$length,
251+
$compressor->id,
252+
$compressed
253+
));
254+
} else {
255+
$this->socket->write(pack('VVVV', $length + 16, $this->packet, 0, $operation).$body);
256+
}
234257

235-
$this->socket->write($payload);
236258
$meta= unpack('VmessageLength/VrequestID/VresponseTo/VopCode', $this->read0(16));
237259
$response= $this->read0($meta['messageLength'] - 16);
238260
$this->lastUsed= time();
239261

240-
switch ($meta['opCode']) {
262+
opcode: switch ($meta['opCode']) {
241263
case self::OP_MSG:
242264
$flags= unpack('V', $response, 4)[1];
243265
if ("\x00" === $response[4]) {
@@ -258,6 +280,17 @@ public function send($operation, $header, $sections, $readPreference= null) {
258280

259281
return $reply;
260282

283+
case self::OP_COMPRESSED:
284+
$compressed= unpack('VoriginalOpcode/VuncompressedSize/CcompressorId', $response);
285+
286+
if ($this->compression && $compressor= $this->compression->select($compressed['compressorId'] ?? null)) {
287+
$response= $compressor->decompress(substr($response, 9));
288+
$meta['opCode']= $compressed['originalOpcode'];
289+
goto opcode;
290+
}
291+
292+
throw new ProtocolException('Unsupported compressorId '.$compressed['compressorId']);
293+
261294
default:
262295
return ['opCode' => $meta['opCode']];
263296
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
<?php namespace com\mongodb\io;
2+
3+
class Zlib extends Compressor {
4+
public $id= 2;
5+
public $level;
6+
7+
/** @param int $level */
8+
public function __construct($level= -1) {
9+
$this->level= $level;
10+
}
11+
12+
public function compress($data) {
13+
return gzcompress($data, $this->level);
14+
}
15+
16+
public function decompress($compressed) {
17+
return gzuncompress($compressed);
18+
}
19+
20+
/** @return string */
21+
public function toString() {
22+
return nameof($this).'(id: '.$this->id.', level: '.$this->level.')';
23+
}
24+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
<?php namespace com\mongodb\io;
2+
3+
class Zstd extends Compressor {
4+
public $id= 3;
5+
public $level;
6+
7+
/** @param int $level */
8+
public function __construct($level= -1) {
9+
$this->level= $level;
10+
}
11+
12+
public function compress($data) {
13+
return zstd_compress($data);
14+
}
15+
16+
public function decompress($compressed) {
17+
return zstd_uncompress($compressed);
18+
}
19+
20+
/** @return string */
21+
public function toString() {
22+
return nameof($this).'(id: '.$this->id.', level: '.$this->level.')';
23+
}
24+
}
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
<?php namespace com\mongodb\unittest;
2+
3+
use com\mongodb\io\{Compression, Compressor, Zlib, Zstd};
4+
use test\verify\Runtime;
5+
use test\{Assert, Before, Test, Values};
6+
7+
class CompressionTest {
8+
private $compressor;
9+
10+
#[Before]
11+
public function compressor() {
12+
$this->compressor= new class() extends Compressor {
13+
public $id= 9;
14+
public function compress($data) { /** Not implemented */ }
15+
public function decompress($compressed) { /** Not implemented */ }
16+
};
17+
}
18+
19+
20+
#[Test]
21+
public function can_create() {
22+
new Compression();
23+
}
24+
25+
#[Test]
26+
public function select() {
27+
Assert::equals($this->compressor, (new Compression())->with($this->compressor)->select($this->compressor->id));
28+
}
29+
30+
#[Test]
31+
public function select_empty() {
32+
Assert::null((new Compression())->select($this->compressor->id));
33+
}
34+
35+
#[Test]
36+
public function select_non_existant() {
37+
Assert::null((new Compression())->with($this->compressor)->select(2));
38+
}
39+
40+
#[Test]
41+
public function not_for_hello() {
42+
Assert::null((new Compression())->with($this->compressor)->for(['hello' => 1], 128));
43+
}
44+
45+
#[Test]
46+
public function for_insert() {
47+
Assert::equals($this->compressor, (new Compression())->with($this->compressor)->for(['insert' => 'collection'], 1024));
48+
}
49+
50+
#[Test]
51+
public function negotiate_empty() {
52+
Assert::null(Compression::negotiate([]));
53+
}
54+
55+
#[Test]
56+
public function negotiate_unsupported() {
57+
Assert::null(Compression::negotiate(['unsupported']));
58+
}
59+
60+
#[Test, Runtime(extensions: ['zlib'])]
61+
public function negotiate_zlib() {
62+
Assert::instance(Zlib::class, Compression::negotiate(['unsupported', 'zlib'])->select(2));
63+
}
64+
65+
#[Test, Runtime(extensions: ['zlib']), Values([[[], -1], [['zlibCompressionLevel' => 6], 6]])]
66+
public function negotiate_zlib_with($options, $level) {
67+
$compressor= Compression::negotiate(['zlib'], $options)->select(2);
68+
69+
Assert::instance(Zlib::class, $compressor);
70+
Assert::equals($level, $compressor->level);
71+
}
72+
73+
#[Test, Runtime(extensions: ['zstd'])]
74+
public function negotiate_zstd() {
75+
Assert::instance(Zstd::class, Compression::negotiate(['unsupported', 'zstd'])->select(3));
76+
}
77+
78+
#[Test, Runtime(extensions: ['zstd']), Values([[[], -1], [['zstdCompressionLevel' => 6], 6]])]
79+
public function negotiate_zstd_with($options, $level) {
80+
$compressor= Compression::negotiate(['zstd'], $options)->select(3);
81+
82+
Assert::instance(Zstd::class, $compressor);
83+
Assert::equals($level, $compressor->level);
84+
}
85+
}

0 commit comments

Comments
 (0)