Commit 5a0192c

Author: Andrew Witten
RUST-54 Support for OP_COMPRESSED (#476)
Adds support for reading and writing OP_COMPRESSED
1 parent 8b8c29e commit 5a0192c

18 files changed, with 773 additions and 25 deletions

Cargo.toml

Lines changed: 7 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -35,13 +35,18 @@ bson-uuid-0_8 = ["bson/uuid-0_8"]
3535
# This can only be used with the tokio-runtime feature flag.
3636
aws-auth = ["reqwest"]
3737

38+
zstd-compression = ["zstd"]
39+
zlib-compression = ["flate2"]
40+
snappy-compression = ["snap"]
41+
3842
[dependencies]
3943
async-trait = "0.1.42"
4044
base64 = "0.13.0"
4145
bitflags = "1.1.0"
4246
bson = { git = "https://github.com/mongodb/bson-rust", branch = "master" }
4347
chrono = "0.4.7"
4448
derivative = "2.1.1"
49+
flate2 = { version = "1.0", optional = true }
4550
futures-core = "0.3.14"
4651
futures-io = "0.3.14"
4752
futures-util = { version = "0.3.14", features = ["io"] }
@@ -56,6 +61,7 @@ rand = { version = "0.8.3", features = ["small_rng"] }
5661
serde_with = "1.3.1"
5762
sha-1 = "0.9.4"
5863
sha2 = "0.9.3"
64+
snap = { version = "1.0.5", optional = true}
5965
socket2 = "0.4.0"
6066
stringprep = "0.1.2"
6167
strsim = "0.10.0"
@@ -67,6 +73,7 @@ typed-builder = "0.9.0"
6773
version_check = "0.9.1"
6874
webpki = "0.21.0"
6975
webpki-roots = "0.21.0"
76+
zstd = { version = "0.9.0", optional = true }
7077

7178
[dependencies.async-std]
7279
version = "1.9.0"

README.md

Lines changed: 8 additions & 5 deletions
@@ -70,6 +70,9 @@ features = ["sync"]
7070
| `aws-auth` | Enable support for the MONGODB-AWS authentication mechanism. | `reqwest` 0.11 | no |
7171
| `bson-uuid-0_8` | Enable support for v0.8 of the [`uuid`](docs.rs/uuid/0.8) crate in the public API of the re-exported `bson` crate. | n/a | no |
7272
| `bson-chrono-0_4` | Enable support for v0.4 of the [`chrono`](docs.rs/chrono/0.4) crate in the public API of the re-exported `bson` crate. | n/a | no |
73+
| `zlib-compression` | Enable support for compressing messages with [`zlib`](https://zlib.net/) | `flate2` 1.0 | no |
74+
| `zstd-compression` | Enable support for compressing messages with [`zstd`](http://facebook.github.io/zstd/). This flag requires Rust version 1.54. | `zstd` 0.9.0 | no |
75+
| `snappy-compression`| Enable support for compressing messages with [`snappy`](http://google.github.io/snappy/) | `snap` 1.0.5 | no |
7376

7477
## Example Usage
7578
Below are simple examples of using the driver. For more specific examples and the API reference, see the driver's [docs.rs page](https://docs.rs/mongodb/latest).
@@ -159,7 +162,7 @@ typed_collection.insert_many(books, None).await?;
159162
```
160163

161164
#### Finding documents in a collection
162-
Results from queries are generally returned via [`Cursor`](https://docs.rs/mongodb/latest/mongodb/struct.Cursor.html), a struct which streams the results back from the server as requested. The [`Cursor`](https://docs.rs/mongodb/latest/mongodb/struct.Cursor.html) type implements the [`Stream`](https://docs.rs/futures/latest/futures/stream/index.html) trait from the [`futures`](https://crates.io/crates/futures) crate, and in order to access its streaming functionality you need to import at least one of the [`StreamExt`](https://docs.rs/futures/latest/futures/stream/trait.StreamExt.html) or [`TryStreamExt`](https://docs.rs/futures/latest/futures/stream/trait.TryStreamExt.html) traits.
165+
Results from queries are generally returned via [`Cursor`](https://docs.rs/mongodb/latest/mongodb/struct.Cursor.html), a struct which streams the results back from the server as requested. The [`Cursor`](https://docs.rs/mongodb/latest/mongodb/struct.Cursor.html) type implements the [`Stream`](https://docs.rs/futures/latest/futures/stream/index.html) trait from the [`futures`](https://crates.io/crates/futures) crate, and in order to access its streaming functionality you need to import at least one of the [`StreamExt`](https://docs.rs/futures/latest/futures/stream/trait.StreamExt.html) or [`TryStreamExt`](https://docs.rs/futures/latest/futures/stream/trait.TryStreamExt.html) traits.
163166

164167
``` toml
165168
# In Cargo.toml, add the following dependency.
@@ -297,17 +300,17 @@ We encourage and would happily accept contributions in the form of GitHub pull r
297300
### Integration and unit tests
298301
In order to run the tests (which are mostly integration tests), you must have access to a MongoDB deployment. You may specify a [MongoDB connection string](https://docs.mongodb.com/manual/reference/connection-string/) in the `MONGODB_URI` environment variable, and the tests will use it to connect to the deployment. If `MONGODB_URI` is unset, the tests will attempt to connect to a local deployment on port 27017.
299302

300-
**Note:** The integration tests will clear out the databases/collections they need to use, but they do not clean up after themselves.
303+
**Note:** The integration tests will clear out the databases/collections they need to use, but they do not clean up after themselves.
301304

302305
To actually run the tests, you can use `cargo` like you would in any other crate:
303306
```bash
304307
cargo test --verbose # runs against localhost:27017
305-
export MONGODB_URI="mongodb://localhost:123"
308+
export MONGODB_URI="mongodb://localhost:123"
306309
cargo test --verbose # runs against localhost:123
307310
```
308311

309312
#### Auth tests
310-
The authentication tests will only be included in the test run if certain requirements are met:
313+
The authentication tests will only be included in the test run if certain requirements are met:
311314
- The deployment must have `--auth` enabled
312315
- Credentials must be specified in `MONGODB_URI`
313316
- The credentials specified in `MONGODB_URI` must be valid and have root privileges on the deployment
@@ -327,7 +330,7 @@ cargo test --verbose
327330
```
328331

329332
#### Run the tests with TLS/SSL
330-
To run the tests with TLS/SSL enabled, you must enable it on the deployment and in `MONGODB_URI`.
333+
To run the tests with TLS/SSL enabled, you must enable it on the deployment and in `MONGODB_URI`.
331334
```bash
332335
export MONGODB_URI="mongodb://localhost:27017/?tls=true&tlsCertificateKeyFile=cert.pem&tlsCAFile=ca.pem"
333336
cargo test --verbose

src/client/options/mod.rs

Lines changed: 35 additions & 12 deletions
@@ -43,6 +43,7 @@ use crate::{
4343
bson::{doc, Bson, Document},
4444
bson_util,
4545
client::auth::{AuthMechanism, Credential},
46+
compression::Compressor,
4647
concern::{Acknowledgment, ReadConcern, WriteConcern},
4748
error::{ErrorKind, Result},
4849
event::{cmap::CmapEventHandler, command::CommandEventHandler, sdam::SdamEventHandler},
@@ -370,8 +371,13 @@ pub struct ClientOptions {
370371
#[builder(default)]
371372
pub app_name: Option<String>,
372373

373-
#[builder(default, setter(skip))]
374-
pub(crate) compressors: Option<Vec<String>>,
374+
/// The compressors that the Client is willing to use in the order they are specified
375+
/// in the configuration. The Client sends this list of compressors to the server.
376+
/// The server responds with the intersection of this list and the compressors it supports.
377+
/// The order of compressors indicates preference of compressors.
378+
#[builder(default)]
379+
#[serde(skip)]
380+
pub compressors: Option<Vec<Compressor>>,
375381

376382
/// The handler that should process all Connection Monitoring and Pooling events. See the
377383
/// CmapEventHandler type documentation for more details.
@@ -525,9 +531,6 @@ pub struct ClientOptions {
525531
#[builder(default)]
526532
pub write_concern: Option<WriteConcern>,
527533

528-
#[builder(default, setter(skip))]
529-
pub(crate) zlib_compression: Option<i32>,
530-
531534
/// Information from the SRV URI that generated these client options, if applicable.
532535
#[builder(default, setter(skip))]
533536
#[serde(skip)]
@@ -593,7 +596,6 @@ impl Serialize for ClientOptions {
593596
#[derive(Serialize)]
594597
struct ClientOptionsHelper<'a> {
595598
appname: &'a Option<String>,
596-
compressors: &'a Option<Vec<String>>,
597599

598600
#[serde(serialize_with = "bson_util::serialize_duration_option_as_int_millis")]
599601
connecttimeoutms: &'a Option<Duration>,
@@ -650,7 +652,6 @@ impl Serialize for ClientOptions {
650652

651653
let client_options = ClientOptionsHelper {
652654
appname: &self.app_name,
653-
compressors: &self.compressors,
654655
connecttimeoutms: &self.connect_timeout,
655656
credential: &self.credential,
656657
directconnection: &self.direct_connection,
@@ -668,8 +669,8 @@ impl Serialize for ClientOptions {
668669
sockettimeoutms: &self.socket_timeout,
669670
tls: &self.tls,
670671
writeconcern: &self.write_concern,
671-
zlibcompressionlevel: &self.zlib_compression,
672672
loadbalanced: &self.load_balanced,
673+
zlibcompressionlevel: &None,
673674
};
674675

675676
client_options.serialize(serializer)
@@ -693,7 +694,7 @@ struct ClientOptionsParser {
693694
pub min_pool_size: Option<u32>,
694695
pub max_idle_time: Option<Duration>,
695696
pub wait_queue_timeout: Option<Duration>,
696-
pub compressors: Option<Vec<String>>,
697+
pub compressors: Option<Vec<Compressor>>,
697698
pub connect_timeout: Option<Duration>,
698699
pub retry_reads: Option<bool>,
699700
pub retry_writes: Option<bool>,
@@ -929,7 +930,6 @@ impl From<ClientOptionsParser> for ClientOptions {
929930
retry_reads: parser.retry_reads,
930931
retry_writes: parser.retry_writes,
931932
socket_timeout: parser.socket_timeout,
932-
zlib_compression: parser.zlib_compression,
933933
direct_connection: parser.direct_connection,
934934
driver_info: None,
935935
credential: parser.credential,
@@ -1180,6 +1180,12 @@ impl ClientOptions {
11801180
}
11811181
}
11821182

1183+
if let Some(ref compressors) = self.compressors {
1184+
for compressor in compressors {
1185+
compressor.validate()?;
1186+
}
1187+
}
1188+
11831189
Ok(())
11841190
}
11851191

@@ -1213,7 +1219,6 @@ impl ClientOptions {
12131219
socket_timeout,
12141220
tls,
12151221
write_concern,
1216-
zlib_compression,
12171222
original_srv_info,
12181223
original_uri
12191224
]
@@ -1564,6 +1569,16 @@ impl ClientOptionsParser {
15641569
}
15651570
}
15661571

1572+
// If both zlib and zlib_compression_level are specified, write the level into the
1573+
// `Compressor::Zlib` variant.
1574+
if let (Some(compressors), Some(zlib_compression_level)) =
1575+
(self.compressors.as_mut(), self.zlib_compression)
1576+
{
1577+
for compressor in compressors {
1578+
compressor.write_zlib_level(zlib_compression_level)
1579+
}
1580+
}
1581+
15671582
Ok(())
15681583
}
15691584

@@ -1668,7 +1683,15 @@ impl ClientOptionsParser {
16681683
self.auth_mechanism_properties = Some(doc);
16691684
}
16701685
"compressors" => {
1671-
self.compressors = Some(value.split(',').map(String::from).collect());
1686+
let compressors = value
1687+
.split(',')
1688+
.filter_map(|x| Compressor::parse_str(x).ok())
1689+
.collect::<Vec<Compressor>>();
1690+
self.compressors = if compressors.is_empty() {
1691+
None
1692+
} else {
1693+
Some(compressors)
1694+
}
16721695
}
16731696
k @ "connecttimeoutms" => {
16741697
self.connect_timeout = Some(Duration::from_millis(get_duration!(value, k)));
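
Reviewer sketch of the `compressors` parsing flow in this file: unknown names are silently dropped, an empty result becomes `None`, and a separately supplied zlib level is written into the zlib entry afterwards. The `Compressor` enum below is a hypothetical stand-in (the real type is defined in a file not shown in this excerpt, and its variants are gated behind feature flags), and `parse_compressors` is an illustrative helper name, not a function from the driver.

```rust
/// Hypothetical stand-in for the driver's `Compressor` type.
#[derive(Debug, Clone, PartialEq)]
pub enum Compressor {
    Zstd,
    Zlib { level: Option<i32> },
    Snappy,
}

impl Compressor {
    /// Parses one compressor name, returning an error for unknown names.
    pub fn parse_str(s: &str) -> Result<Self, String> {
        match s {
            "zstd" => Ok(Compressor::Zstd),
            "zlib" => Ok(Compressor::Zlib { level: None }),
            "snappy" => Ok(Compressor::Snappy),
            other => Err(format!("unsupported compressor: {}", other)),
        }
    }

    /// Records the zlib level on a `Zlib` variant and leaves others untouched.
    pub fn write_zlib_level(&mut self, new_level: i32) {
        if let Compressor::Zlib { level } = self {
            *level = Some(new_level);
        }
    }
}

/// Mirrors the flow in the diff: drop unparseable names, apply the zlib
/// level if one was given, and map an empty list to `None`.
pub fn parse_compressors(value: &str, zlib_level: Option<i32>) -> Option<Vec<Compressor>> {
    let mut compressors: Vec<Compressor> = value
        .split(',')
        .filter_map(|name| Compressor::parse_str(name).ok())
        .collect();
    if let Some(level) = zlib_level {
        for compressor in compressors.iter_mut() {
            compressor.write_zlib_level(level);
        }
    }
    if compressors.is_empty() {
        None
    } else {
        Some(compressors)
    }
}
```

Under these assumptions, `parse_compressors("zlib,bogus,snappy", Some(6))` keeps zlib (with level 6) and snappy in their original order, while a string containing only unknown names yields `None`.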

src/client/options/test.rs

Lines changed: 32 additions & 0 deletions
@@ -5,6 +5,7 @@ use crate::{
55
bson::{Bson, Document},
66
client::options::{ClientOptions, ClientOptionsParser, ServerAddress},
77
error::ErrorKind,
8+
options::Compressor,
89
test::run_spec_test,
910
};
1011
#[derive(Debug, Deserialize)]
@@ -40,6 +41,18 @@ async fn run_test(test_file: TestFile) {
4041
|| test_case.description.contains("serverSelectionTryOnce")
4142
|| test_case.description.contains("Unix")
4243
|| test_case.description.contains("relative path")
44+
// Compression is implemented but will only pass the tests if all
45+
// the appropriate feature flags are set. That is because
46+
// valid compressors are only parsed correctly if the corresponding feature flag is set.
47+
// (otherwise they are treated as invalid, and hence ignored)
48+
|| (test_case.description.contains("compress") &&
49+
!cfg!(
50+
all(feature = "zlib-compression",
51+
feature = "zstd-compression",
52+
feature = "snappy-compression"
53+
)
54+
)
55+
)
4356
{
4457
continue;
4558
}
@@ -105,6 +118,25 @@ async fn run_test(test_file: TestFile) {
105118
.filter(|(ref key, _)| json_options.contains_key(key))
106119
.collect();
107120

121+
// This is required because `Compressor` does not implement `Serialize`, but the spec tests
122+
// still expect to see serialized compressors.
123+
// This hardcodes the compressors into the options.
124+
if let Some(compressors) = options.compressors {
125+
options_doc.insert(
126+
"compressors",
127+
compressors
128+
.iter()
129+
.map(Compressor::name)
130+
.collect::<Vec<&str>>(),
131+
);
132+
#[cfg(feature = "zlib-compression")]
133+
for compressor in compressors {
134+
if let Compressor::Zlib { level: Some(level) } = compressor {
135+
options_doc.insert("zlibcompressionlevel", level);
136+
}
137+
}
138+
}
139+
108140
assert_eq!(options_doc, json_options, "{}", test_case.description)
109141
}
110142
// auth

src/cmap/conn/command.rs

Lines changed: 8 additions & 1 deletion
@@ -5,7 +5,7 @@ use serde::{de::DeserializeOwned, Serialize};
55
use super::wire::Message;
66
use crate::{
77
bson::Document,
8-
client::{options::ServerApi, ClusterTime},
8+
client::{options::ServerApi, ClusterTime, HELLO_COMMAND_NAMES, REDACTED_COMMANDS},
99
error::{Error, ErrorKind, Result},
1010
is_master::{IsMasterCommandResponse, IsMasterReply},
1111
operation::{CommandErrorBody, CommandResponse, Response},
@@ -22,6 +22,13 @@ pub(crate) struct RawCommand {
2222
pub(crate) bytes: Vec<u8>,
2323
}
2424

25+
impl RawCommand {
26+
pub(crate) fn should_compress(&self) -> bool {
27+
let name = self.name.to_lowercase();
28+
!REDACTED_COMMANDS.contains(name.as_str()) && !HELLO_COMMAND_NAMES.contains(name.as_str())
29+
}
30+
}
31+
2532
/// Driver-side model of a database command.
2633
#[serde_with::skip_serializing_none]
2734
#[derive(Clone, Debug, Serialize, Default)]
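
Reviewer sketch of the `should_compress` check added above: the command name is lowercased and the message is compressed only if the name appears in neither exclusion list. The list contents here are illustrative assumptions; the driver's actual `REDACTED_COMMANDS` and `HELLO_COMMAND_NAMES` are defined elsewhere and are not shown in this excerpt.

```rust
/// Illustrative values only; not the driver's real lists.
const REDACTED_COMMANDS: &[&str] = &[
    "authenticate",
    "saslstart",
    "saslcontinue",
    "getnonce",
    "createuser",
    "updateuser",
];

/// Illustrative values only; not the driver's real list.
const HELLO_COMMAND_NAMES: &[&str] = &["hello", "ismaster"];

/// Returns true when a command with this name may be sent compressed.
/// The comparison is case-insensitive because the name is lowercased first.
pub fn should_compress(command_name: &str) -> bool {
    let name = command_name.to_lowercase();
    !REDACTED_COMMANDS.contains(&name.as_str())
        && !HELLO_COMMAND_NAMES.contains(&name.as_str())
}
```

With these placeholder lists, `should_compress("find")` is true, while `should_compress("isMaster")` and `should_compress("saslStart")` are false.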

src/cmap/conn/mod.rs

Lines changed: 32 additions & 4 deletions
@@ -18,6 +18,7 @@ use crate::{
1818
options::{ConnectionOptions, StreamOptions},
1919
PoolGeneration,
2020
},
21+
compression::Compressor,
2122
error::{load_balanced_mode_mismatch, Error, ErrorKind, Result},
2223
event::cmap::{
2324
CmapEventHandler,
@@ -76,6 +77,14 @@ pub(crate) struct Connection {
7677

7778
stream: AsyncStream,
7879

80+
/// Compressor that the client will use before sending messages.
81+
/// This compressor does not get used to decompress server messages.
82+
/// The client will decompress server messages using whichever compressor
83+
/// the server indicates in its message. This compressor is the first
84+
/// compressor in the client's compressor list that also appears in the
85+
/// server's compressor list.
86+
pub(super) compressor: Option<Compressor>,
87+
7988
/// If the connection is pinned to a cursor or transaction, the channel sender to return this
8089
/// connection to the pin holder.
8190
pinned_sender: Option<mpsc::Sender<Connection>>,
@@ -109,6 +118,7 @@ impl Connection {
109118
stream_description: None,
110119
error: false,
111120
pinned_sender: None,
121+
compressor: None,
112122
};
113123

114124
Ok(conn)
@@ -244,9 +254,24 @@ impl Connection {
244254
}
245255
}
246256

247-
async fn send_message(&mut self, message: Message) -> Result<RawCommandResponse> {
257+
async fn send_message(
258+
&mut self,
259+
message: Message,
260+
to_compress: bool,
261+
) -> Result<RawCommandResponse> {
248262
self.command_executing = true;
249-
let write_result = message.write_to(&mut self.stream).await;
263+
264+
// If the client has agreed on a compressor with the server, and the command
265+
// is the right type of command, then compress the message.
266+
let write_result = match self.compressor {
267+
Some(ref compressor) if to_compress => {
268+
message
269+
.write_compressed_to(&mut self.stream, compressor)
270+
.await
271+
}
272+
_ => message.write_to(&mut self.stream).await,
273+
};
274+
250275
self.error = write_result.is_err();
251276
write_result?;
252277

@@ -267,8 +292,9 @@ impl Connection {
267292
command: Command,
268293
request_id: impl Into<Option<i32>>,
269294
) -> Result<RawCommandResponse> {
295+
let to_compress = command.should_compress();
270296
let message = Message::with_command(command, request_id.into())?;
271-
self.send_message(message).await
297+
self.send_message(message, to_compress).await
272298
}
273299

274300
/// Executes a `RawCommand` and returns a `CommandResponse` containing the result from the
@@ -282,8 +308,9 @@ impl Connection {
282308
command: RawCommand,
283309
request_id: impl Into<Option<i32>>,
284310
) -> Result<RawCommandResponse> {
311+
let to_compress = command.should_compress();
285312
let message = Message::with_raw_command(command, request_id.into());
286-
self.send_message(message).await
313+
self.send_message(message, to_compress).await
287314
}
288315

289316
/// Gets the connection's StreamDescription.
@@ -351,6 +378,7 @@ impl Connection {
351378
pool_manager: None,
352379
ready_and_available_time: None,
353380
pinned_sender: self.pinned_sender.clone(),
381+
compressor: self.compressor.clone(),
354382
}
355383
}
356384
}
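
Reviewer sketch of two rules described in this file's doc comments and in `send_message`: the connection's compressor is the first entry in the client's list that the server also lists, and a message is compressed only when a compressor was negotiated and the command allows it. The simplified `Compressor` enum, `negotiate`, `WritePath`, and `choose_write_path` are hypothetical names for illustration and do not appear in the driver.

```rust
/// Simplified stand-in for the driver's `Compressor` type.
#[derive(Debug, Clone, PartialEq)]
pub enum Compressor {
    Zstd,
    Zlib,
    Snappy,
}

impl Compressor {
    /// Name of the compressor as it would appear in a compressor list.
    pub fn name(&self) -> &'static str {
        match self {
            Compressor::Zstd => "zstd",
            Compressor::Zlib => "zlib",
            Compressor::Snappy => "snappy",
        }
    }
}

/// Picks the first client-preferred compressor that the server also lists.
pub fn negotiate(client: &[Compressor], server_names: &[&str]) -> Option<Compressor> {
    client
        .iter()
        .find(|c| server_names.contains(&c.name()))
        .cloned()
}

/// Which write routine a message would take.
#[derive(Debug, PartialEq)]
pub enum WritePath {
    Compressed(Compressor),
    Plain,
}

/// Same shape as the `match` in `send_message`: compress only when a
/// compressor was negotiated and the command is eligible.
pub fn choose_write_path(compressor: Option<&Compressor>, to_compress: bool) -> WritePath {
    match compressor {
        Some(c) if to_compress => WritePath::Compressed(c.clone()),
        _ => WritePath::Plain,
    }
}
```

Because the search walks the client's list, the client's ordering decides the outcome when several compressors overlap; the server's ordering plays no part in this sketch.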

src/cmap/conn/wire/header.rs

Lines changed: 2 additions & 0 deletions
@@ -12,6 +12,7 @@ pub(crate) enum OpCode {
1212
Reply = 1,
1313
Query = 2004,
1414
Message = 2013,
15+
Compressed = 2012,
1516
}
1617

1718
impl OpCode {
@@ -21,6 +22,7 @@ impl OpCode {
2122
1 => Ok(OpCode::Reply),
2223
2004 => Ok(OpCode::Query),
2324
2013 => Ok(OpCode::Message),
25+
2012 => Ok(OpCode::Compressed),
2426
other => Err(ErrorKind::InvalidResponse {
2527
message: format!("Invalid wire protocol opcode: {}", other),
2628
}
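
Reviewer sketch of what an OP_COMPRESSED (opcode 2012) frame looks like on the wire, to give context for the new opcode. The layout follows the publicly documented MongoDB wire protocol (standard 16-byte header, then `originalOpcode`, `uncompressedSize`, `compressorId`, and the compressed bytes); it is not taken from this commit's write path, which is not shown in this excerpt. To stay dependency-free the sketch uses the "noop" compressor (id 0), so the payload is copied unchanged. `frame_noop_compressed` and `opcode_at` are hypothetical helper names.

```rust
const OP_MSG: i32 = 2013;
const OP_COMPRESSED: i32 = 2012;
const NOOP_COMPRESSOR_ID: u8 = 0;

/// Wraps an OP_MSG body (everything after its 16-byte header) in an
/// OP_COMPRESSED frame using the noop compressor.
pub fn frame_noop_compressed(request_id: i32, response_to: i32, body: &[u8]) -> Vec<u8> {
    // header (16) + originalOpcode (4) + uncompressedSize (4) + compressorId (1)
    let total_len = 16 + 4 + 4 + 1 + body.len();
    let mut out = Vec::with_capacity(total_len);
    // Standard message header; all integers are little-endian.
    out.extend_from_slice(&(total_len as i32).to_le_bytes());
    out.extend_from_slice(&request_id.to_le_bytes());
    out.extend_from_slice(&response_to.to_le_bytes());
    out.extend_from_slice(&OP_COMPRESSED.to_le_bytes());
    // OP_COMPRESSED-specific fields.
    out.extend_from_slice(&OP_MSG.to_le_bytes());
    out.extend_from_slice(&(body.len() as i32).to_le_bytes());
    out.push(NOOP_COMPRESSOR_ID);
    // With the noop compressor the payload is the body itself.
    out.extend_from_slice(body);
    out
}

/// Reads the opcode field (bytes 12..16) from a frame, if it is long enough.
pub fn opcode_at(frame: &[u8]) -> Option<i32> {
    let b = frame.get(12..16)?;
    Some(i32::from_le_bytes([b[0], b[1], b[2], b[3]]))
}
```

A reader that sees opcode 2012 in the header would use `compressorId` to choose the decompressor and `originalOpcode` to decide how to interpret the decompressed bytes.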
