Skip to content

Commit 1d52ed7

Browse files
authored
RUST-1383 Create or drop auxiliary collections when doing so for a collection with encrypted fields. (#710)
1 parent 5489613 commit 1d52ed7

File tree

7 files changed

+200
-72
lines changed

7 files changed

+200
-72
lines changed

src/client/csfle.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use mongocrypt::Crypt;
1111
use crate::{
1212
error::{Error, Result},
1313
Client,
14+
Namespace,
1415
};
1516

1617
use options::{
@@ -174,3 +175,21 @@ impl ClientState {
174175
})
175176
}
176177
}
178+
179+
pub(crate) fn aux_collections(
180+
base_ns: &Namespace,
181+
enc_fields: &bson::Document,
182+
) -> Result<Vec<Namespace>> {
183+
let mut out = vec![];
184+
for &key in &["esc", "ecc", "ecoc"] {
185+
let coll = match enc_fields.get_str(format!("{}Collection", key)) {
186+
Ok(s) => s.to_string(),
187+
Err(_) => format!("enxcol_.{}.{}", base_ns.coll, key),
188+
};
189+
out.push(Namespace {
190+
coll,
191+
..base_ns.clone()
192+
});
193+
}
194+
Ok(out)
195+
}

src/client/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
pub mod auth;
22
#[cfg(feature = "csfle")]
3-
mod csfle;
3+
pub(crate) mod csfle;
44
mod executor;
55
pub mod options;
66
pub mod session;

src/coll/mod.rs

Lines changed: 89 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -217,18 +217,25 @@ impl<T> Collection<T> {
217217
self.inner.write_concern.as_ref()
218218
}
219219

220+
#[allow(clippy::needless_option_as_deref)]
220221
async fn drop_common(
221222
&self,
222223
options: impl Into<Option<DropCollectionOptions>>,
223224
session: impl Into<Option<&mut ClientSession>>,
224225
) -> Result<()> {
225-
let session = session.into();
226+
let mut session = session.into();
226227

227-
let mut options = options.into();
228+
let mut options: Option<DropCollectionOptions> = options.into();
228229
resolve_options!(self, options, [write_concern]);
229230

231+
#[cfg(feature = "csfle")]
232+
self.drop_aux_collections(options.as_ref(), session.as_deref_mut())
233+
.await?;
234+
230235
let drop = DropCollection::new(self.namespace(), options);
231-
self.client().execute_operation(drop, session).await
236+
self.client()
237+
.execute_operation(drop, session.as_deref_mut())
238+
.await
232239
}
233240

234241
/// Drops the collection, deleting all data and indexes stored in it.
@@ -246,6 +253,67 @@ impl<T> Collection<T> {
246253
self.drop_common(options, session).await
247254
}
248255

256+
#[cfg(feature = "csfle")]
257+
#[allow(clippy::needless_option_as_deref)]
258+
async fn drop_aux_collections(
259+
&self,
260+
options: Option<&DropCollectionOptions>,
261+
mut session: Option<&mut ClientSession>,
262+
) -> Result<()> {
263+
// Find associated `encrypted_fields`:
264+
// * from options to this call
265+
let mut enc_fields = options.and_then(|o| o.encrypted_fields.as_ref());
266+
let enc_opts = self.client().auto_encryption_opts().await;
267+
// * from client-wide `encrypted_fields_map`:
268+
let client_enc_fields = enc_opts
269+
.as_ref()
270+
.and_then(|eo| eo.encrypted_fields_map.as_ref());
271+
if enc_fields.is_none() {
272+
enc_fields =
273+
client_enc_fields.and_then(|efm| efm.get(&format!("{}", self.namespace())));
274+
}
275+
// * from a `list_collections` call:
276+
let found;
277+
if enc_fields.is_none() && client_enc_fields.is_some() {
278+
let filter = doc! { "name": self.name() };
279+
let mut specs: Vec<_> = match session.as_deref_mut() {
280+
Some(s) => {
281+
let mut cursor = self
282+
.inner
283+
.db
284+
.list_collections_with_session(filter, None, s)
285+
.await?;
286+
cursor.stream(s).try_collect().await?
287+
}
288+
None => {
289+
self.inner
290+
.db
291+
.list_collections(filter, None)
292+
.await?
293+
.try_collect()
294+
.await?
295+
}
296+
};
297+
if let Some(spec) = specs.pop() {
298+
if let Some(enc) = spec.options.encrypted_fields {
299+
found = enc;
300+
enc_fields = Some(&found);
301+
}
302+
}
303+
}
304+
305+
// Drop the collections.
306+
if let Some(enc_fields) = enc_fields {
307+
for ns in crate::client::csfle::aux_collections(&self.namespace(), enc_fields)? {
308+
let drop = DropCollection::new(ns, options.cloned());
309+
self.client()
310+
.execute_operation(drop, session.as_deref_mut())
311+
.await?;
312+
}
313+
}
314+
Ok(())
315+
}
316+
249317
/// Runs an aggregation operation.
250318
///
251319
/// See the documentation [here](https://www.mongodb.com/docs/manual/aggregation/) for more
@@ -387,7 +455,7 @@ impl<T> Collection<T> {
387455
.await
388456
}
389457

390-
async fn create_index_common(
458+
pub(crate) async fn create_index_common(
391459
&self,
392460
index: IndexModel,
393461
options: impl Into<Option<CreateIndexOptions>>,
@@ -1379,6 +1447,21 @@ impl Namespace {
13791447
coll: String::new(),
13801448
}
13811449
}
1450+
1451+
pub(crate) fn from_str(s: &str) -> Option<Self> {
1452+
let mut parts = s.split('.');
1453+
1454+
let db = parts.next();
1455+
let coll = parts.collect::<Vec<_>>().join(".");
1456+
1457+
match (db, coll) {
1458+
(Some(db), coll) if !coll.is_empty() => Some(Self {
1459+
db: db.to_string(),
1460+
coll,
1461+
}),
1462+
_ => None,
1463+
}
1464+
}
13821465
}
13831466

13841467
impl fmt::Display for Namespace {
@@ -1393,18 +1476,8 @@ impl<'de> Deserialize<'de> for Namespace {
13931476
D: Deserializer<'de>,
13941477
{
13951478
let s: String = Deserialize::deserialize(deserializer)?;
1396-
let mut parts = s.split('.');
1397-
1398-
let db = parts.next();
1399-
let coll = parts.collect::<Vec<_>>().join(".");
1400-
1401-
match (db, coll) {
1402-
(Some(db), coll) if !coll.is_empty() => Ok(Self {
1403-
db: db.to_string(),
1404-
coll,
1405-
}),
1406-
_ => Err(D::Error::custom("Missing one or more fields in namespace")),
1407-
}
1479+
Self::from_str(&s)
1480+
.ok_or_else(|| D::Error::custom("Missing one or more fields in namespace"))
14081481
}
14091482
}
14101483

src/coll/options.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -948,6 +948,11 @@ pub struct CreateIndexOptions {
948948
pub struct DropCollectionOptions {
949949
/// The write concern for the operation.
950950
pub write_concern: Option<WriteConcern>,
951+
952+
/// Map of encrypted fields for the collection.
953+
#[cfg(feature = "csfle")]
954+
#[serde(skip)]
955+
pub encrypted_fields: Option<Document>,
951956
}
952957

953958
/// Specifies the options to a

src/db/mod.rs

Lines changed: 81 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ pub mod options;
22

33
use std::{fmt::Debug, sync::Arc};
44

5+
#[cfg(feature = "csfle")]
6+
use bson::doc;
57
use futures_util::stream::TryStreamExt;
68

79
use crate::{
@@ -285,23 +287,93 @@ impl Database {
285287
.await
286288
}
287289

290+
#[allow(clippy::needless_option_as_deref)]
288291
async fn create_collection_common(
289292
&self,
290293
name: impl AsRef<str>,
291294
options: impl Into<Option<CreateCollectionOptions>>,
292295
session: impl Into<Option<&mut ClientSession>>,
293296
) -> Result<()> {
294-
let mut options = options.into();
297+
let mut options: Option<CreateCollectionOptions> = options.into();
295298
resolve_options!(self, options, [write_concern]);
299+
let mut session = session.into();
296300

297-
let create = Create::new(
298-
Namespace {
299-
db: self.name().to_string(),
300-
coll: name.as_ref().to_string(),
301-
},
302-
options,
303-
);
304-
self.client().execute_operation(create, session).await
301+
let ns = Namespace {
302+
db: self.name().to_string(),
303+
coll: name.as_ref().to_string(),
304+
};
305+
306+
#[cfg(feature = "csfle")]
307+
self.create_aux_collections(&ns, &mut options, session.as_deref_mut())
308+
.await?;
309+
310+
let create = Create::new(ns.clone(), options);
311+
self.client()
312+
.execute_operation(create, session.as_deref_mut())
313+
.await?;
314+
315+
#[cfg(feature = "csfle")]
316+
{
317+
let coll = self.collection::<Document>(&ns.coll);
318+
coll.create_index_common(
319+
crate::IndexModel {
320+
keys: doc! {"__safeContent__": 1},
321+
options: None,
322+
},
323+
None,
324+
session.as_deref_mut(),
325+
)
326+
.await?;
327+
}
328+
329+
Ok(())
330+
}
331+
332+
#[cfg(feature = "csfle")]
333+
#[allow(clippy::needless_option_as_deref)]
334+
async fn create_aux_collections(
335+
&self,
336+
base_ns: &Namespace,
337+
options: &mut Option<CreateCollectionOptions>,
338+
mut session: Option<&mut ClientSession>,
339+
) -> Result<()> {
340+
let has_encrypted_fields = options
341+
.as_ref()
342+
.and_then(|o| o.encrypted_fields.as_ref())
343+
.is_some();
344+
// If options does not have `associated_fields`, populate it from client-wide
345+
// `encrypted_fields_map`:
346+
if !has_encrypted_fields {
347+
let enc_opts = self.client().auto_encryption_opts().await;
348+
if let Some(enc_opts_fields) = enc_opts
349+
.as_ref()
350+
.and_then(|eo| eo.encrypted_fields_map.as_ref())
351+
.and_then(|efm| efm.get(&format!("{}", &base_ns)))
352+
{
353+
options
354+
.get_or_insert_with(Default::default)
355+
.encrypted_fields = Some(enc_opts_fields.clone());
356+
}
357+
}
358+
359+
if let Some(opts) = options.as_ref() {
360+
if let Some(enc_fields) = opts.encrypted_fields.as_ref() {
361+
for ns in crate::client::csfle::aux_collections(base_ns, enc_fields)? {
362+
let mut sub_opts = opts.clone();
363+
sub_opts.clustered_index = Some(self::options::ClusteredIndex {
364+
key: doc! { "_id": 1 },
365+
unique: true,
366+
name: None,
367+
v: None,
368+
});
369+
let create = Create::new(ns, Some(sub_opts));
370+
self.client()
371+
.execute_operation(create, session.as_deref_mut())
372+
.await?;
373+
}
374+
}
375+
}
376+
Ok(())
305377
}
306378

307379
/// Creates a new collection in the database with the given `name` and `options`.

src/db/options.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,10 @@ pub struct CreateCollectionOptions {
110110

111111
/// Options for clustered collections.
112112
pub clustered_index: Option<ClusteredIndex>,
113+
114+
/// Map of encrypted fields for the created collection.
115+
#[cfg(feature = "csfle")]
116+
pub encrypted_fields: Option<Document>,
113117
}
114118

115119
/// Specifies how strictly the database should apply validation rules to existing documents during

src/operation/drop_collection/test.rs

Lines changed: 1 addition & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1,54 +1,9 @@
11
use crate::{
22
bson::doc,
3-
cmap::StreamDescription,
4-
concern::{Acknowledgment, WriteConcern},
53
error::{ErrorKind, WriteFailure},
6-
operation::{test::handle_response_test, DropCollection, Operation},
7-
options::DropCollectionOptions,
8-
Namespace,
4+
operation::{test::handle_response_test, DropCollection},
95
};
106

11-
#[test]
12-
fn build() {
13-
let options = DropCollectionOptions {
14-
write_concern: Some(WriteConcern {
15-
w: Some(Acknowledgment::Custom("abc".to_string())),
16-
..Default::default()
17-
}),
18-
};
19-
20-
let ns = Namespace {
21-
db: "test_db".to_string(),
22-
coll: "test_coll".to_string(),
23-
};
24-
25-
let mut op = DropCollection::new(ns.clone(), Some(options));
26-
27-
let description = StreamDescription::new_testing();
28-
let cmd = op.build(&description).expect("build should succeed");
29-
30-
assert_eq!(cmd.name.as_str(), "drop");
31-
assert_eq!(cmd.target_db.as_str(), "test_db");
32-
assert_eq!(
33-
cmd.body,
34-
doc! {
35-
"drop": "test_coll",
36-
"writeConcern": { "w": "abc" }
37-
}
38-
);
39-
40-
let mut op = DropCollection::new(ns, None);
41-
let cmd = op.build(&description).expect("build should succeed");
42-
assert_eq!(cmd.name.as_str(), "drop");
43-
assert_eq!(cmd.target_db.as_str(), "test_db");
44-
assert_eq!(
45-
cmd.body,
46-
doc! {
47-
"drop": "test_coll",
48-
}
49-
);
50-
}
51-
527
#[test]
538
fn handle_success() {
549
let op = DropCollection::empty();

0 commit comments

Comments
 (0)