Skip to content

Commit 569dca4

Browse files
authored
RUST-1396 Respawn mongocryptd on server selection failure (#730)
1 parent ba3b349 commit 569dca4

File tree

8 files changed

+203
-64
lines changed

8 files changed

+203
-64
lines changed

Cargo.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,15 @@ default = ["tokio-runtime"]
3131
tokio-runtime = [
3232
"tokio/macros",
3333
"tokio/net",
34+
"tokio/process",
3435
"tokio/rt",
3536
"tokio/time",
3637
"serde_bytes",
3738
]
3839
async-std-runtime = [
3940
"async-std",
4041
"async-std/attributes",
42+
"async-std/unstable",
4143
"async-std-resolver",
4244
"tokio-util/compat",
4345
]
@@ -66,7 +68,7 @@ zlib-compression = ["flate2"]
6668
snappy-compression = ["snap"]
6769

6870
# DO NOT USE; see https://jira.mongodb.org/browse/RUST-569 for the status of CSFLE support in the Rust driver.
69-
csfle = ["mongocrypt", "which", "rayon"]
71+
csfle = ["mongocrypt", "rayon"]
7072

7173
[dependencies]
7274
async-trait = "0.1.42"
@@ -106,7 +108,6 @@ trust-dns-proto = "0.21.2"
106108
trust-dns-resolver = "0.21.2"
107109
typed-builder = "0.10.0"
108110
webpki-roots = "0.22.4"
109-
which = { version = "4.2.5", optional = true }
110111
zstd = { version = "0.11.2", optional = true }
111112

112113
[dependencies.async-std]

src/client/csfle.rs

Lines changed: 23 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,7 @@ pub mod client_encryption;
22
pub mod options;
33
mod state_machine;
44

5-
use std::{
6-
path::Path,
7-
process::{Command, Stdio},
8-
};
5+
use std::path::Path;
96

107
use derivative::Derivative;
118
use mongocrypt::Crypt;
@@ -26,7 +23,7 @@ use options::{
2623
EO_MONGOCRYPTD_URI,
2724
};
2825

29-
use self::state_machine::CryptExecutor;
26+
use self::state_machine::{CryptExecutor, MongocryptdOptions};
3027

3128
use super::WeakClient;
3229

@@ -49,15 +46,16 @@ struct AuxClients {
4946
impl ClientState {
5047
pub(super) async fn new(client: &Client, mut opts: AutoEncryptionOptions) -> Result<Self> {
5148
let crypt = Self::make_crypt(&opts)?;
52-
let mongocryptd_client = Self::spawn_mongocryptd_if_needed(&opts, &crypt).await?;
49+
let mongocryptd_opts = Self::make_mongocryptd_opts(&opts, &crypt)?;
5350
let aux_clients = Self::make_aux_clients(client, &opts)?;
54-
let exec = CryptExecutor::new(
51+
let exec = CryptExecutor::new_implicit(
5552
aux_clients.key_vault_client,
5653
opts.key_vault_namespace.clone(),
57-
mongocryptd_client,
58-
aux_clients.metadata_client,
5954
opts.tls_options.take(),
60-
)?;
55+
mongocryptd_opts,
56+
aux_clients.metadata_client,
57+
)
58+
.await?;
6159

6260
Ok(Self {
6361
crypt,
@@ -98,57 +96,37 @@ impl ClientState {
9896
Ok(crypt)
9997
}
10098

101-
/// If crypt_shared is unavailable and options have not disabled it, spawn mongocryptd. Returns
102-
/// a `Client` connected to the mongocryptd if one was spawned.
103-
async fn spawn_mongocryptd_if_needed(
99+
fn make_mongocryptd_opts(
104100
opts: &AutoEncryptionOptions,
105101
crypt: &Crypt,
106-
) -> Result<Option<Client>> {
102+
) -> Result<Option<MongocryptdOptions>> {
107103
if opts.bypass_auto_encryption == Some(true)
108104
|| opts.extra_option(&EO_MONGOCRYPTD_BYPASS_SPAWN)? == Some(true)
109105
|| crypt.shared_lib_version().is_some()
110106
|| opts.extra_option(&EO_CRYPT_SHARED_REQUIRED)? == Some(true)
111107
{
112108
return Ok(None);
113109
}
114-
let which_path;
115-
let bin_path = match opts.extra_option(&EO_MONGOCRYPTD_SPAWN_PATH)? {
116-
Some(s) => Path::new(s),
117-
None => {
118-
which_path = which::which("mongocryptd")
119-
.map_err(|e| Error::invalid_argument(format!("{}", e)))?;
120-
&which_path
121-
}
122-
};
123-
let mut args: Vec<&str> = vec![];
124-
let has_idle = if let Some(spawn_args) = opts.extra_option(&EO_MONGOCRYPTD_SPAWN_ARGS)? {
125-
let mut has_idle = false;
126-
for arg in spawn_args {
110+
let spawn_path = opts
111+
.extra_option(&EO_MONGOCRYPTD_SPAWN_PATH)?
112+
.map(std::path::PathBuf::from);
113+
let mut spawn_args = vec![];
114+
if let Some(args) = opts.extra_option(&EO_MONGOCRYPTD_SPAWN_ARGS)? {
115+
for arg in args {
127116
let str_arg = arg.as_str().ok_or_else(|| {
128117
Error::invalid_argument("non-string entry in mongocryptdSpawnArgs")
129118
})?;
130-
has_idle |= str_arg.starts_with("--idleShutdownTimeoutSecs");
131-
args.push(str_arg);
119+
spawn_args.push(str_arg.to_string());
132120
}
133-
has_idle
134-
} else {
135-
false
136-
};
137-
if !has_idle {
138-
args.push("--idleShutdownTimeoutSecs=60");
139121
}
140-
Command::new(bin_path)
141-
.args(&args)
142-
.stdout(Stdio::null())
143-
.stderr(Stdio::null())
144-
.spawn()?;
145-
146122
let uri = opts
147123
.extra_option(&EO_MONGOCRYPTD_URI)?
148-
.unwrap_or("mongodb://localhost:27020");
149-
let mut options = super::options::ClientOptions::parse_uri(uri, None).await?;
150-
options.server_selection_timeout = Some(std::time::Duration::from_millis(10_000));
151-
Ok(Some(Client::with_options(options)?))
124+
.map(|s| s.to_string());
125+
Ok(Some(MongocryptdOptions {
126+
spawn_path,
127+
spawn_args,
128+
uri,
129+
}))
152130
}
153131

154132
fn make_aux_clients(

src/client/csfle/client_encryption.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,9 @@ impl ClientEncryption {
3636
/// Create a new key vault handle with the given options.
3737
pub fn new(options: ClientEncryptionOptions) -> Result<Self> {
3838
let crypt = Crypt::builder().build()?;
39-
let exec = CryptExecutor::new(
39+
let exec = CryptExecutor::new_explicit(
4040
options.key_vault_client.weak(),
4141
options.key_vault_namespace.clone(),
42-
None,
43-
None,
4442
options.tls_options,
4543
)?;
4644
let key_vault = options

src/client/csfle/state_machine.rs

Lines changed: 120 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,24 @@
1-
use std::convert::TryInto;
1+
use std::{
2+
convert::TryInto,
3+
path::{Path, PathBuf},
4+
time::Duration,
5+
};
26

37
use bson::{Document, RawDocument, RawDocumentBuf};
48
use futures_util::{stream, TryStreamExt};
59
use mongocrypt::ctx::{Ctx, State};
610
use rayon::ThreadPool;
711
use tokio::{
812
io::{AsyncReadExt, AsyncWriteExt},
9-
sync::oneshot,
13+
sync::{oneshot, Mutex},
1014
};
1115

1216
use crate::{
1317
client::{options::ServerAddress, WeakClient},
1418
cmap::options::StreamOptions,
1519
error::{Error, Result},
1620
operation::{RawOutput, RunCommand},
17-
runtime::AsyncStream,
21+
runtime::{AsyncStream, Process},
1822
Client,
1923
Namespace,
2024
};
@@ -25,18 +29,16 @@ use super::options::KmsProvidersTlsOptions;
2529
pub(crate) struct CryptExecutor {
2630
key_vault_client: WeakClient,
2731
key_vault_namespace: Namespace,
28-
mongocryptd_client: Option<Client>,
29-
metadata_client: Option<WeakClient>,
3032
tls_options: Option<KmsProvidersTlsOptions>,
3133
crypto_threads: ThreadPool,
34+
mongocryptd: Option<Mongocryptd>,
35+
metadata_client: Option<WeakClient>,
3236
}
3337

3438
impl CryptExecutor {
35-
pub(crate) fn new(
39+
pub(crate) fn new_explicit(
3640
key_vault_client: WeakClient,
3741
key_vault_namespace: Namespace,
38-
mongocryptd_client: Option<Client>,
39-
metadata_client: Option<WeakClient>,
4042
tls_options: Option<KmsProvidersTlsOptions>,
4143
) -> Result<Self> {
4244
let num_cpus = std::thread::available_parallelism()?.get();
@@ -47,13 +49,30 @@ impl CryptExecutor {
4749
Ok(Self {
4850
key_vault_client,
4951
key_vault_namespace,
50-
mongocryptd_client,
51-
metadata_client,
5252
tls_options,
5353
crypto_threads,
54+
mongocryptd: None,
55+
metadata_client: None,
5456
})
5557
}
5658

59+
pub(crate) async fn new_implicit(
60+
key_vault_client: WeakClient,
61+
key_vault_namespace: Namespace,
62+
tls_options: Option<KmsProvidersTlsOptions>,
63+
mongocryptd_opts: Option<MongocryptdOptions>,
64+
metadata_client: Option<WeakClient>,
65+
) -> Result<Self> {
66+
let mongocryptd = match mongocryptd_opts {
67+
Some(opts) => Some(Mongocryptd::new(opts).await?),
68+
None => None,
69+
};
70+
let mut exec = Self::new_explicit(key_vault_client, key_vault_namespace, tls_options)?;
71+
exec.mongocryptd = mongocryptd;
72+
exec.metadata_client = metadata_client;
73+
Ok(exec)
74+
}
75+
5776
pub(crate) async fn run_ctx(&self, ctx: Ctx, db: Option<&str>) -> Result<RawDocumentBuf> {
5877
let mut result = None;
5978
// This needs to be a `Result` so that the `Ctx` can be temporarily owned by the processing
@@ -89,11 +108,25 @@ impl CryptExecutor {
89108
Error::internal("db required for NeedMongoMarkings state")
90109
})?;
91110
let op = RawOutput(RunCommand::new_raw(db.to_string(), command, None, None)?);
92-
let mongocryptd_client = self
93-
.mongocryptd_client
111+
let mongocryptd = self
112+
.mongocryptd
94113
.as_ref()
95114
.ok_or_else(|| Error::internal("mongocryptd client not found"))?;
96-
let response = mongocryptd_client.execute_operation(op, None).await?;
115+
let result = mongocryptd.client.execute_operation(op.clone(), None).await;
116+
let response = match result {
117+
Ok(r) => r,
118+
Err(e) if e.is_server_selection_error() => {
119+
mongocryptd.respawn().await?;
120+
match mongocryptd.client.execute_operation(op, None).await {
121+
Ok(r) => r,
122+
Err(new_e) if !new_e.is_server_selection_error() => {
123+
return Err(new_e)
124+
}
125+
Err(_) => return Err(e),
126+
}
127+
}
128+
Err(e) => return Err(e),
129+
};
97130
ctx.mongo_feed(response.raw_body())?;
98131
ctx.mongo_done()?;
99132
}
@@ -181,6 +214,80 @@ impl CryptExecutor {
181214
}
182215
}
183216

217+
#[derive(Debug)]
218+
struct Mongocryptd {
219+
opts: MongocryptdOptions,
220+
client: Client,
221+
child: Mutex<Result<Process>>,
222+
}
223+
224+
impl Mongocryptd {
225+
const DEFAULT_URI: &'static str = "mongodb://localhost:27020";
226+
const SERVER_SELECTION_TIMEOUT: Duration = Duration::from_millis(10_000);
227+
228+
async fn new(opts: MongocryptdOptions) -> Result<Self> {
229+
let child = Mutex::new(Ok(Self::spawn(&opts)?));
230+
let uri = opts.uri.as_deref().unwrap_or(Self::DEFAULT_URI);
231+
let mut options = crate::options::ClientOptions::parse_uri(uri, None).await?;
232+
options.server_selection_timeout = Some(Self::SERVER_SELECTION_TIMEOUT);
233+
let client = Client::with_options(options)?;
234+
Ok(Self {
235+
opts,
236+
client,
237+
child,
238+
})
239+
}
240+
241+
async fn respawn(&self) -> Result<()> {
242+
let mut child = match self.child.try_lock() {
243+
Ok(l) => l,
244+
_ => {
245+
// Another respawn is in progress. Lock to wait for it.
246+
return unit_err(&*self.child.lock().await);
247+
}
248+
};
249+
let new_child = Self::spawn(&self.opts);
250+
if new_child.is_ok() {
251+
if let Ok(old_child) = child.as_mut() {
252+
let _ = old_child.wait().await;
253+
}
254+
}
255+
*child = new_child;
256+
unit_err(&*child)
257+
}
258+
259+
fn spawn(opts: &MongocryptdOptions) -> Result<Process> {
260+
let bin_path = match &opts.spawn_path {
261+
Some(s) => s,
262+
None => Path::new("mongocryptd"),
263+
};
264+
let mut args: Vec<&str> = vec![];
265+
let mut has_idle = false;
266+
for arg in &opts.spawn_args {
267+
has_idle |= arg.starts_with("--idleShutdownTimeoutSecs");
268+
args.push(arg);
269+
}
270+
if !has_idle {
271+
args.push("--idleShutdownTimeoutSecs=60");
272+
}
273+
Process::spawn(bin_path, &args)
274+
}
275+
}
276+
277+
fn unit_err<T>(r: &Result<T>) -> Result<()> {
278+
match r {
279+
Ok(_) => Ok(()),
280+
Err(e) => Err(e.clone()),
281+
}
282+
}
283+
284+
#[derive(Debug)]
285+
pub(crate) struct MongocryptdOptions {
286+
pub(crate) spawn_path: Option<PathBuf>,
287+
pub(crate) spawn_args: Vec<String>,
288+
pub(crate) uri: Option<String>,
289+
}
290+
184291
fn result_ref<T>(r: &Result<T>) -> Result<&T> {
185292
r.as_ref().map_err(Error::clone)
186293
}

src/operation/raw_output.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use super::Operation;
77

88
/// Forwards all implementation to the wrapped `Operation`, but returns the response unparsed and
99
/// unvalidated as a `RawCommandResponse`.
10+
#[derive(Clone)]
1011
pub(crate) struct RawOutput<Op>(pub(crate) Op);
1112

1213
impl<Op: Operation> Operation for RawOutput<Op> {

src/operation/run_command/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use crate::{
1515
selection_criteria::SelectionCriteria,
1616
};
1717

18-
#[derive(Debug)]
18+
#[derive(Debug, Clone)]
1919
pub(crate) struct RunCommand<'conn> {
2020
db: String,
2121
command: RawDocumentBuf,

src/runtime/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ mod http;
33
#[cfg(feature = "async-std-runtime")]
44
mod interval;
55
mod join_handle;
6+
#[cfg(feature = "csfle")]
7+
mod process;
68
mod resolver;
79
mod stream;
810
mod sync_read_ext;
@@ -14,6 +16,8 @@ mod worker_handle;
1416

1517
use std::{future::Future, net::SocketAddr, time::Duration};
1618

19+
#[cfg(feature = "csfle")]
20+
pub(crate) use self::process::Process;
1721
pub(crate) use self::{
1822
acknowledged_message::AcknowledgedMessage,
1923
join_handle::AsyncJoinHandle,

0 commit comments

Comments
 (0)