Skip to content

Commit 223cee3

Browse files
authored
RUST-360 Streaming monitoring protocol (#721)
This also fixes RUST-1464
1 parent d37c1bb commit 223cee3

File tree

132 files changed

+9743
-6545
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

132 files changed

+9743
-6545
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,6 @@ md-5 = "0.10.1"
8888
mongocrypt = { git = "https://github.com/mongodb/libmongocrypt-rust.git", branch = "main", optional = true }
8989
openssl = { version = "0.10.38", optional = true }
9090
openssl-probe = { version = "0.1.5", optional = true }
91-
os_info = { version = "3.0.1", default-features = false }
9291
percent-encoding = "2.0.0"
9392
rand = { version = "0.8.3", features = ["small_rng"] }
9493
rayon = { version = "1.5.3", optional = true }
@@ -170,6 +169,7 @@ serde = { version = "*", features = ["rc"] }
170169
serde_json = "1.0.64"
171170
semver = "1.0.0"
172171
time = "0.3.9"
172+
regex = "1.6.0"
173173

174174
[package.metadata.docs.rs]
175175
rustdoc-args = ["--cfg", "docsrs"]

src/client/csfle/state_machine.rs

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,9 @@ use tokio::{
1515

1616
use crate::{
1717
client::{options::ServerAddress, WeakClient},
18-
cmap::options::StreamOptions,
1918
error::{Error, Result},
2019
operation::{RawOutput, RunCommand},
21-
runtime::{AsyncStream, Process},
20+
runtime::{AsyncStream, Process, TlsConfig},
2221
Client,
2322
Namespace,
2423
};
@@ -165,13 +164,9 @@ impl CryptExecutor {
165164
.and_then(|tls| tls.get(&provider))
166165
.cloned()
167166
.unwrap_or_default();
168-
let mut stream = AsyncStream::connect(
169-
StreamOptions::builder()
170-
.address(addr)
171-
.tls_options(tls_options)
172-
.build(),
173-
)
174-
.await?;
167+
let mut stream =
168+
AsyncStream::connect(addr, Some(&TlsConfig::new(tls_options)?))
169+
.await?;
175170
stream.write_all(kms_ctx.message()?).await?;
176171
let mut buf = vec![0];
177172
while kms_ctx.bytes_needed() > 0 {

src/client/executor.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -620,6 +620,7 @@ impl Client {
620620
let raw_cmd = RawCommand {
621621
name: cmd_name.clone(),
622622
target_db,
623+
exhaust_allowed: false,
623624
bytes: serialized,
624625
};
625626

src/client/mod.rs

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -409,22 +409,21 @@ impl Client {
409409
if let Some(server) = server_selection::attempt_to_select_server(
410410
criteria,
411411
&state.description,
412-
&state.servers,
412+
&state.servers(),
413413
)? {
414414
return Ok(server);
415415
}
416416

417-
self.inner.topology.request_update();
417+
watcher.request_immediate_check();
418418

419419
let change_occurred = start_time.elapsed() < timeout
420420
&& watcher
421421
.wait_for_update(timeout - start_time.elapsed())
422422
.await;
423423
if !change_occurred {
424424
return Err(ErrorKind::ServerSelection {
425-
message: self
426-
.inner
427-
.topology
425+
message: state
426+
.description
428427
.server_selection_timeout_error_message(criteria),
429428
}
430429
.into());
@@ -433,12 +432,13 @@ impl Client {
433432
}
434433

435434
#[cfg(all(test, not(feature = "sync"), not(feature = "tokio-sync")))]
436-
pub(crate) async fn get_hosts(&self) -> Vec<String> {
435+
pub(crate) fn get_hosts(&self) -> Vec<String> {
437436
let watcher = self.inner.topology.watch();
438437
let state = watcher.peek_latest();
439438

440-
let servers = state.servers.keys();
441-
servers
439+
state
440+
.servers()
441+
.keys()
442442
.map(|stream_address| format!("{}", stream_address))
443443
.collect()
444444
}
@@ -458,6 +458,11 @@ impl Client {
458458
.clone()
459459
}
460460

461+
#[cfg(test)]
462+
pub(crate) fn topology(&self) -> &crate::sdam::Topology {
463+
&self.inner.topology
464+
}
465+
461466
#[cfg(feature = "csfle")]
462467
pub(crate) fn weak(&self) -> WeakClient {
463468
WeakClient {
@@ -475,11 +480,6 @@ impl Client {
475480
})
476481
.ok()
477482
}
478-
479-
#[cfg(test)]
480-
pub(crate) fn topology(&self) -> &Topology {
481-
&self.inner.topology
482-
}
483483
}
484484

485485
#[cfg(feature = "csfle")]

src/client/options/mod.rs

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ use crate::{
3333
error::{Error, ErrorKind, Result},
3434
event::{cmap::CmapEventHandler, command::CommandEventHandler, sdam::SdamEventHandler},
3535
options::ReadConcernLevel,
36-
sdam::{DEFAULT_HEARTBEAT_FREQUENCY, IDLE_WRITE_PERIOD, MIN_HEARTBEAT_FREQUENCY},
36+
sdam::{verify_max_staleness, DEFAULT_HEARTBEAT_FREQUENCY, MIN_HEARTBEAT_FREQUENCY},
3737
selection_criteria::{ReadPreference, SelectionCriteria, TagSet},
3838
srv::{OriginalSrvInfo, SrvResolver},
3939
};
@@ -568,7 +568,7 @@ pub struct ClientOptions {
568568
#[derive(Debug, Clone, Default)]
569569
pub(crate) struct TestOptions {
570570
/// Override MIN_HEARTBEAT_FREQUENCY.
571-
pub(crate) heartbeat_freq: Option<Duration>,
571+
pub(crate) min_heartbeat_freq: Option<Duration>,
572572

573573
/// Disable server and SRV-polling monitor threads.
574574
pub(crate) disable_monitoring_threads: bool,
@@ -1318,17 +1318,10 @@ impl ClientOptions {
13181318

13191319
if let Some(SelectionCriteria::ReadPreference(ref rp)) = self.selection_criteria {
13201320
if let Some(max_staleness) = rp.max_staleness() {
1321-
let smallest_max_staleness = std::cmp::max(
1322-
Duration::from_secs(90),
1323-
self.heartbeat_freq.unwrap_or(DEFAULT_HEARTBEAT_FREQUENCY) + IDLE_WRITE_PERIOD,
1324-
);
1325-
1326-
if max_staleness < smallest_max_staleness {
1327-
return Err(Error::invalid_argument(format!(
1328-
"invalid maxStaleness value: must be at least {} seconds",
1329-
smallest_max_staleness.as_secs()
1330-
)));
1331-
}
1321+
verify_max_staleness(
1322+
max_staleness,
1323+
self.heartbeat_freq.unwrap_or(DEFAULT_HEARTBEAT_FREQUENCY),
1324+
)?;
13321325
}
13331326
}
13341327

src/cmap/conn/command.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
use std::time::Duration;
2-
31
use bson::{RawDocument, RawDocumentBuf};
42
use serde::{de::DeserializeOwned, Deserialize, Serialize};
53

@@ -20,6 +18,8 @@ use crate::{
2018
pub(crate) struct RawCommand {
2119
pub(crate) name: String,
2220
pub(crate) target_db: String,
21+
/// Whether or not the server may respond to this command multiple times via the moreToComeBit.
22+
pub(crate) exhaust_allowed: bool,
2323
pub(crate) bytes: Vec<u8>,
2424
}
2525

@@ -38,6 +38,9 @@ pub(crate) struct Command<T = Document> {
3838
#[serde(skip)]
3939
pub(crate) name: String,
4040

41+
#[serde(skip)]
42+
pub(crate) exhaust_allowed: bool,
43+
4144
#[serde(flatten)]
4245
pub(crate) body: T,
4346

@@ -71,6 +74,7 @@ impl<T> Command<T> {
7174
Self {
7275
name,
7376
target_db,
77+
exhaust_allowed: false,
7478
body,
7579
lsid: None,
7680
cluster_time: None,
@@ -93,6 +97,7 @@ impl<T> Command<T> {
9397
Self {
9498
name,
9599
target_db,
100+
exhaust_allowed: false,
96101
body,
97102
lsid: None,
98103
cluster_time: None,
@@ -245,15 +250,14 @@ impl RawCommandResponse {
245250
.map_err(|_| Error::invalid_authentication_response(mechanism_name))
246251
}
247252

248-
pub(crate) fn into_hello_reply(self, round_trip_time: Duration) -> Result<HelloReply> {
253+
pub(crate) fn into_hello_reply(self) -> Result<HelloReply> {
249254
match self.body::<CommandResponse<HelloCommandResponse>>() {
250255
Ok(response) if response.is_success() => {
251256
let server_address = self.source_address().clone();
252257
let cluster_time = response.cluster_time().cloned();
253258
Ok(HelloReply {
254259
server_address,
255260
command_response: response.body,
256-
round_trip_time,
257261
cluster_time,
258262
raw_command_response: self.into_raw_document_buf(),
259263
})

0 commit comments

Comments
 (0)