diff --git a/src/action/bulk_write.rs b/src/action/bulk_write.rs
index 49ed5a665..04b85530d 100644
--- a/src/action/bulk_write.rs
+++ b/src/action/bulk_write.rs
@@ -117,20 +117,6 @@ where
     }
 
     async fn execute_inner(mut self) -> Result<R> {
-        #[cfg(feature = "in-use-encryption")]
-        if self.client.should_auto_encrypt().await {
-            use mongocrypt::error::{Error as EncryptionError, ErrorKind as EncryptionErrorKind};
-
-            let error = EncryptionError {
-                kind: EncryptionErrorKind::Client,
-                code: None,
-                message: Some(
-                    "bulkWrite does not currently support automatic encryption".to_string(),
-                ),
-            };
-            return Err(ErrorKind::Encryption(error).into());
-        }
-
         resolve_write_concern_with_session!(
             self.client,
             self.options,
@@ -148,7 +134,8 @@ where
                 &self.models[total_attempted..],
                 total_attempted,
                 self.options.as_ref(),
-            );
+            )
+            .await;
             let result = self
                 .client
                 .execute_operation::<BulkWriteOperation<R>>(
diff --git a/src/bson_util.rs b/src/bson_util.rs
index 3712e8b5e..931cf3b81 100644
--- a/src/bson_util.rs
+++ b/src/bson_util.rs
@@ -19,7 +19,9 @@ use crate::{
         RawBsonRef,
         RawDocumentBuf,
     },
+    bson_compat::CStr,
     checked::Checked,
+    cmap::Command,
     error::{Error, ErrorKind, Result},
     runtime::SyncLittleEndianRead,
 };
@@ -246,6 +248,49 @@ pub(crate) fn get_or_prepend_id_field(doc: &mut RawDocumentBuf) -> Result<Bson> {
     }
 }
 
+/// A helper trait for working with collections of raw documents. This is useful for unifying
+/// command-building implementations that construct either a document sequence or a single
+/// command document, depending on the command's requirements.
+pub(crate) trait RawDocumentCollection: Default {
+    /// Calculates the total number of bytes that would be added to a collection of this type by the
+    /// given document.
+    fn bytes_added(index: usize, doc: &RawDocumentBuf) -> Result<usize>;
+
+    /// Adds the given document to the collection.
+    fn push(&mut self, doc: RawDocumentBuf);
+
+    /// Adds the collection of raw documents to the provided command.
+    fn add_to_command(self, identifier: &'static CStr, command: &mut Command);
+}
+
+impl RawDocumentCollection for Vec<RawDocumentBuf> {
+    fn bytes_added(_index: usize, doc: &RawDocumentBuf) -> Result<usize> {
+        Ok(doc.as_bytes().len())
+    }
+
+    fn push(&mut self, doc: RawDocumentBuf) {
+        self.push(doc);
+    }
+
+    fn add_to_command(self, identifier: &'static CStr, command: &mut Command) {
+        command.add_document_sequence(identifier, self);
+    }
+}
+
+impl RawDocumentCollection for RawArrayBuf {
+    fn bytes_added(index: usize, doc: &RawDocumentBuf) -> Result<usize> {
+        array_entry_size_bytes(index, doc.as_bytes().len())
+    }
+
+    fn push(&mut self, doc: RawDocumentBuf) {
+        self.push(doc);
+    }
+
+    fn add_to_command(self, identifier: &'static CStr, command: &mut Command) {
+        command.body.append(identifier, self);
+    }
+}
+
 #[cfg(test)]
 mod test {
     use crate::bson_util::num_decimal_digits;
diff --git a/src/client/csfle.rs b/src/client/csfle.rs
index 059f5f589..79736c33a 100644
--- a/src/client/csfle.rs
+++ b/src/client/csfle.rs
@@ -100,7 +100,8 @@ impl ClientState {
             .kms_providers(&opts.kms_providers.credentials_doc()?)?
             .use_need_kms_credentials_state()
             .retry_kms(true)?
-            .use_range_v2()?;
+            .use_range_v2()?
+            .use_need_mongo_collinfo_with_db_state();
         if let Some(m) = &opts.schema_map {
             builder = builder.schema_map(&crate::bson_compat::serialize_to_document(m)?)?;
         }
diff --git a/src/client/csfle/state_machine.rs b/src/client/csfle/state_machine.rs
index d8fc8e225..1d85b4a17 100644
--- a/src/client/csfle/state_machine.rs
+++ b/src/client/csfle/state_machine.rs
@@ -5,10 +5,7 @@ use std::{
     time::Duration,
 };
 
-use crate::{
-    bson::{rawdoc, Document, RawDocument, RawDocumentBuf},
-    bson_compat::{cstr, CString},
-};
+use crate::bson::{rawdoc, Document, RawDocument, RawDocumentBuf};
 use futures_util::{stream, TryStreamExt};
 use mongocrypt::ctx::{Ctx, KmsCtx, KmsProviderType, State};
 use rayon::ThreadPool;
@@ -95,6 +92,13 @@ impl CryptExecutor {
         self.mongocryptd_client.is_some()
     }
 
+    fn metadata_client(&self, state: &State) -> Result<Client> {
+        self.metadata_client
+            .as_ref()
+            .and_then(|w| w.upgrade())
+            .ok_or_else(|| Error::internal(format!("metadata client required for {state:?}")))
+    }
+
     pub(crate) async fn run_ctx(&self, ctx: Ctx, db: Option<&str>) -> Result<RawDocumentBuf> {
         let mut result = None;
         // This needs to be a `Result` so that the `Ctx` can be temporarily owned by the processing
@@ -104,16 +108,10 @@ impl CryptExecutor {
         loop {
            let state = result_ref(&ctx)?.state()?;
            match state {
-                State::NeedMongoCollinfo => {
+                State::NeedMongoCollinfo | State::NeedMongoCollinfoWithDb => {
                     let ctx = result_mut(&mut ctx)?;
                     let filter = raw_to_doc(ctx.mongo_op()?)?;
-                    let metadata_client = self
-                        .metadata_client
-                        .as_ref()
-                        .and_then(|w| w.upgrade())
-                        .ok_or_else(|| {
-                            Error::internal("metadata_client required for NeedMongoCollinfo state")
-                        })?;
+                    let metadata_client = self.metadata_client(&state)?;
                     let db = metadata_client.database(db.as_ref().ok_or_else(|| {
                         Error::internal("db required for NeedMongoCollinfo state")
                     })?);
@@ -245,7 +243,9 @@ impl CryptExecutor {
                             continue;
                         }
 
-                        let prov_name: CString = provider.as_string().try_into()?;
+                        #[cfg(any(feature = "aws-auth", feature = "azure-kms"))]
+                        let prov_name: crate::bson_compat::CString =
+                            provider.as_string().try_into()?;
                         match provider.provider_type() {
                             KmsProviderType::Aws => {
                                 #[cfg(feature = "aws-auth")]
                                             "secretAccessKey": aws_creds.secret_access_key().to_string(),
                                         };
                                         if let Some(token) = aws_creds.session_token() {
-                                            creds.append(cstr!("sessionToken"), token);
+                                            creds.append(
+                                                crate::bson_compat::cstr!("sessionToken"),
+                                                token,
+                                            );
                                         }
                                         kms_providers.append(prov_name, creds);
                                     }
@@ -326,7 +329,7 @@ impl CryptExecutor {
                     .await
                     .map_err(|e| kms_error(e.to_string()))?;
                 kms_providers.append(
-                    cstr!("gcp"),
+                    crate::bson_compat::cstr!("gcp"),
                     rawdoc! { "accessToken": response.access_token },
{ "accessToken": response.access_token }, ); } diff --git a/src/client/options/bulk_write.rs b/src/client/options/bulk_write.rs index 65d59c826..08994c525 100644 --- a/src/client/options/bulk_write.rs +++ b/src/client/options/bulk_write.rs @@ -7,9 +7,14 @@ use typed_builder::TypedBuilder; use crate::{ bson::{rawdoc, Array, Bson, Document, RawDocumentBuf}, - bson_compat::cstr, - bson_util::{get_or_prepend_id_field, replacement_document_check, update_document_check}, - error::Result, + bson_compat::{cstr, serialize_to_raw_document_buf}, + bson_util::{ + extend_raw_document_buf, + get_or_prepend_id_field, + replacement_document_check, + update_document_check, + }, + error::{Error, Result}, options::{UpdateModifications, WriteConcern}, serde_util::{serialize_bool_or_true, write_concern_is_empty}, Collection, @@ -371,9 +376,15 @@ impl WriteModel { } } - /// Returns the operation-specific fields that should be included in this model's entry in the - /// ops array. Also returns an inserted ID if this is an insert operation. - pub(crate) fn get_ops_document_contents(&self) -> Result<(RawDocumentBuf, Option)> { + /// Constructs the ops document for this write model given the nsInfo array index. + pub(crate) fn get_ops_document( + &self, + ns_info_index: usize, + ) -> Result<(RawDocumentBuf, Option)> { + let index = i32::try_from(ns_info_index) + .map_err(|_| Error::internal("nsInfo index exceeds i32::MAX"))?; + let mut ops_document = rawdoc! { self.operation_name(): index }; + if let Self::UpdateOne(UpdateOneModel { update, .. }) | Self::UpdateMany(UpdateManyModel { update, .. }) = self { @@ -384,22 +395,19 @@ impl WriteModel { replacement_document_check(replacement)?; } - let (mut model_document, inserted_id) = match self { - Self::InsertOne(model) => { - let mut insert_document = RawDocumentBuf::try_from(&model.document)?; - let inserted_id = get_or_prepend_id_field(&mut insert_document)?; - (rawdoc! { "document": insert_document }, Some(inserted_id)) - } - _ => { - let model_document = crate::bson_compat::serialize_to_raw_document_buf(&self)?; - (model_document, None) - } - }; - if let Some(multi) = self.multi() { - model_document.append(cstr!("multi"), multi); + ops_document.append(cstr!("multi"), multi); } - Ok((model_document, inserted_id)) + if let Self::InsertOne(model) = self { + let mut insert_document = RawDocumentBuf::try_from(&model.document)?; + let inserted_id = get_or_prepend_id_field(&mut insert_document)?; + ops_document.append(cstr!("document"), insert_document); + Ok((ops_document, Some(inserted_id))) + } else { + let model = serialize_to_raw_document_buf(&self)?; + extend_raw_document_buf(&mut ops_document, model)?; + Ok((ops_document, None)) + } } } diff --git a/src/error.rs b/src/error.rs index 6aec05dec..5e389f81d 100644 --- a/src/error.rs +++ b/src/error.rs @@ -50,8 +50,14 @@ pub type Result = std::result::Result; /// [`ErrorKind`](enum.ErrorKind.html) is wrapped in an `Box` to allow the errors to be /// cloned. #[derive(Clone, Debug, Error)] -#[cfg_attr(test, error("Kind: {kind}, labels: {labels:?}, backtrace: {bt}"))] -#[cfg_attr(not(test), error("Kind: {kind}, labels: {labels:?}"))] +#[cfg_attr( + test, + error("Kind: {kind}, labels: {labels:?}, source: {source:?}, backtrace: {bt}") +)] +#[cfg_attr( + not(test), + error("Kind: {kind}, labels: {labels:?}, source: {source:?}") +)] #[non_exhaustive] pub struct Error { /// The type of error that occurred. 
diff --git a/src/operation/bulk_write.rs b/src/operation/bulk_write.rs
index 616c7b9f4..810308879 100644
--- a/src/operation/bulk_write.rs
+++ b/src/operation/bulk_write.rs
@@ -6,14 +6,19 @@ use futures_core::TryStream;
 use futures_util::{FutureExt, TryStreamExt};
 
 use crate::{
-    bson::{rawdoc, Bson, RawDocumentBuf},
+    bson::{rawdoc, Bson, RawArrayBuf, RawDocumentBuf},
     bson_compat::{cstr, CStr},
-    bson_util::{self, extend_raw_document_buf},
+    bson_util::{self, RawDocumentCollection},
     checked::Checked,
     cmap::{Command, RawCommandResponse, StreamDescription},
     cursor::CursorSpecification,
     error::{BulkWriteError, Error, ErrorKind, Result},
-    operation::{run_command::RunCommand, GetMore, OperationWithDefaults},
+    operation::{
+        run_command::RunCommand,
+        GetMore,
+        OperationWithDefaults,
+        MAX_ENCRYPTED_WRITE_SIZE,
+    },
     options::{BulkWriteOptions, OperationType, WriteModel},
     results::{BulkWriteResult, DeleteResult, InsertOneResult, UpdateResult},
     BoxFuture,
@@ -33,11 +38,15 @@ use super::{
 
 use server_responses::*;
 
+const NS_INFO: &CStr = cstr!("nsInfo");
+const OPS: &CStr = cstr!("ops");
+
 pub(crate) struct BulkWrite<'a, R>
 where
     R: BulkWriteResult,
 {
     client: Client,
+    encrypted: bool,
     models: &'a [WriteModel],
     offset: usize,
     options: Option<&'a BulkWriteOptions>,
@@ -52,14 +61,16 @@ impl<'a, R> BulkWrite<'a, R>
 where
     R: BulkWriteResult,
 {
-    pub(crate) fn new(
+    pub(crate) async fn new(
         client: Client,
         models: &'a [WriteModel],
         offset: usize,
         options: Option<&'a BulkWriteOptions>,
     ) -> BulkWrite<'a, R> {
+        let encrypted = client.should_auto_encrypt().await;
         Self {
             client,
+            encrypted,
             models,
             offset,
             options,
@@ -222,39 +233,104 @@ where
             .and_then(|options| options.ordered)
             .unwrap_or(true)
     }
+
+    fn batch_split_models<T: RawDocumentCollection>(
+        &mut self,
+        command_body: RawDocumentBuf,
+        max_size: usize,
+        max_operations: usize,
+        max_bson_object_size: usize,
+    ) -> Result<Command> {
+        // For single-batch writes, ignore the lower maximum size defined by
+        // MAX_ENCRYPTED_WRITE_SIZE.
+        let first_write_max_encrypted_size = max_bson_object_size - command_body.as_bytes().len();
+
+        let mut namespace_info: NamespaceInfo<T> = NamespaceInfo::new();
+        let mut ops = T::default();
+        let mut current_size = Checked::new(0);
+
+        for (i, model) in self.models.iter().take(max_operations).enumerate() {
+            let (namespace_index, namespace_size) =
+                namespace_info.get_index_and_size(model.namespace())?;
+            let (model_document, inserted_id) = model.get_ops_document(namespace_index)?;
+
+            let operation_size = T::bytes_added(i, &model_document)?;
+            current_size += namespace_size + operation_size;
+            let current_size = current_size.get()?;
+
+            if current_size <= max_size
+                || self.encrypted && i == 0 && current_size <= first_write_max_encrypted_size
+            {
+                self.n_attempted += 1;
+                if let Some(inserted_id) = inserted_id {
+                    self.inserted_ids.insert(self.offset + i, inserted_id);
+                }
+                namespace_info.add_pending();
+                ops.push(model_document);
+            } else {
+                break;
+            }
+        }
+
+        if self.n_attempted == 0 {
+            return Err(Error::invalid_argument(format!(
+                "operation at index {} exceeds the maximum size",
+                self.offset
+            )));
+        }
+
+        let mut command = Command::new(Self::NAME, "admin", command_body);
+        namespace_info
+            .namespaces
+            .add_to_command(NS_INFO, &mut command);
+        ops.add_to_command(OPS, &mut command);
+        Ok(command)
+    }
 }
 
 /// A helper struct for tracking namespace information.
-struct NamespaceInfo<'a> {
-    namespaces: Vec<RawDocumentBuf>,
+struct NamespaceInfo<'a, T: RawDocumentCollection> {
+    namespaces: T,
+    pending_namespace: Option<RawDocumentBuf>,
     // Cache the namespaces and their indexes to avoid traversing the namespaces array each time a
     // namespace is looked up or added.
     cache: HashMap<&'a Namespace, usize>,
 }
 
-impl<'a> NamespaceInfo<'a> {
+impl<'a, T> NamespaceInfo<'a, T>
+where
+    T: RawDocumentCollection,
+{
     fn new() -> Self {
         Self {
-            namespaces: Vec::new(),
+            namespaces: Default::default(),
+            pending_namespace: None,
             cache: HashMap::new(),
         }
     }
 
-    /// Gets the index for the given namespace in the nsInfo list, adding it to the list if it is
-    /// not already present.
-    fn get_index(&mut self, namespace: &'a Namespace) -> (usize, usize) {
+    /// Gets the index for the given namespace in the nsInfo list and the number of bytes it would
+    /// add to the nsInfo list. Stores the namespace as a pending entry.
+    fn get_index_and_size(&mut self, namespace: &'a Namespace) -> Result<(usize, usize)> {
         match self.cache.get(namespace) {
-            Some(index) => (*index, 0),
+            Some(index) => Ok((*index, 0)),
             None => {
                 let namespace_doc = rawdoc! { "ns": namespace.to_string() };
-                let length_added = namespace_doc.as_bytes().len();
-                self.namespaces.push(namespace_doc);
-                let next_index = self.cache.len();
-                self.cache.insert(namespace, next_index);
-                (next_index, length_added)
+                let index = self.cache.len();
+                let bytes_added = T::bytes_added(index, &namespace_doc)?;
+                self.pending_namespace = Some(namespace_doc);
+                self.cache.insert(namespace, index);
+                Ok((index, bytes_added))
             }
         }
     }
+
+    /// Adds the pending namespace to the list, if any.
+    fn add_pending(&mut self) {
+        if let Some(pending) = self.pending_namespace.take() {
+            self.namespaces.push(pending);
+        }
+    }
 }
 
 impl<R> OperationWithDefaults for BulkWrite<'_, R>
 where
@@ -273,9 +349,9 @@
             .into());
         }
 
-        let max_message_size: usize =
-            Checked::new(description.max_message_size_bytes).try_into()?;
         let max_operations: usize = Checked::new(description.max_write_batch_size).try_into()?;
+        let max_bson_object_size: usize =
+            Checked::new(description.max_bson_object_size).try_into()?;
 
         let mut command_body = rawdoc! { Self::NAME: 1 };
         let mut options = match self.options {
@@ -285,56 +361,30 @@
         options.append(cstr!("errorsOnly"), R::errors_only());
         bson_util::extend_raw_document_buf(&mut command_body, options)?;
 
-        let max_document_sequences_size: usize = (Checked::new(max_message_size)
-            - OP_MSG_OVERHEAD_BYTES
-            - command_body.as_bytes().len())
-        .try_into()?;
-
-        let mut namespace_info = NamespaceInfo::new();
-        let mut ops = Vec::new();
-        let mut current_size = Checked::new(0);
-        for (i, model) in self.models.iter().take(max_operations).enumerate() {
-            let (namespace_index, namespace_size) = namespace_info.get_index(model.namespace());
-
-            let operation_namespace_index: i32 = Checked::new(namespace_index).try_into()?;
-            let mut operation = rawdoc! { model.operation_name(): operation_namespace_index };
-            let (model_doc, inserted_id) = model.get_ops_document_contents()?;
-            extend_raw_document_buf(&mut operation, model_doc)?;
-
-            let operation_size = operation.as_bytes().len();
-
-            current_size += namespace_size + operation_size;
-            if current_size.get()? > max_document_sequences_size {
-                // Remove the namespace doc from the list if one was added for this operation.
-                if namespace_size > 0 {
-                    let last_index = namespace_info.namespaces.len() - 1;
-                    namespace_info.namespaces.remove(last_index);
-                }
-                break;
-            }
-
-            if let Some(inserted_id) = inserted_id {
-                self.inserted_ids.insert(i, inserted_id);
-            }
-            ops.push(operation);
-        }
-
-        if ops.is_empty() {
-            return Err(ErrorKind::InvalidArgument {
-                message: format!(
-                    "operation at index {} exceeds the maximum message size ({} bytes)",
-                    self.offset, max_message_size
-                ),
-            }
-            .into());
+        // Auto-encryption does not support document sequences.
+        if self.encrypted {
+            let max_size =
+                (Checked::new(MAX_ENCRYPTED_WRITE_SIZE) - command_body.as_bytes().len()).get()?;
+            self.batch_split_models::<RawArrayBuf>(
+                command_body,
+                max_size,
+                max_operations,
+                max_bson_object_size,
+            )
+        } else {
+            let max_message_size: usize =
+                Checked::new(description.max_message_size_bytes).try_into()?;
+            let max_size = (Checked::new(max_message_size)
+                - OP_MSG_OVERHEAD_BYTES
+                - command_body.as_bytes().len())
+            .get()?;
+            self.batch_split_models::<Vec<RawDocumentBuf>>(
+                command_body,
+                max_size,
+                max_operations,
+                max_bson_object_size,
+            )
         }
-
-        self.n_attempted = ops.len();
-
-        let mut command = Command::new(Self::NAME, "admin", command_body);
-        command.add_document_sequence("nsInfo", namespace_info.namespaces);
-        command.add_document_sequence("ops", ops);
-        Ok(command)
     }
 
     fn handle_response_async<'b>(
diff --git a/src/test/csfle/prose.rs b/src/test/csfle/prose.rs
index aa6cf160b..41fbc2754 100644
--- a/src/test/csfle/prose.rs
+++ b/src/test/csfle/prose.rs
@@ -117,6 +117,8 @@ async fn custom_key_material() -> Result<()> {
 // Prose test 4. BSON Size Limits and Batch Splitting
 #[tokio::test]
 async fn bson_size_limits() -> Result<()> {
+    const STRING_LEN_2_MIB: usize = 2_097_152;
+
     // Setup: db initialization.
     let (client, datakeys) = init_client().await?;
     client
@@ -130,7 +132,7 @@ async fn bson_size_limits() -> Result<()> {
 
     // Setup: encrypted client.
     let mut opts = get_client_options().await.clone();
-    let buffer = EventBuffer::<CommandEvent>::new();
+    let mut buffer = EventBuffer::<CommandEvent>::new();
     opts.command_event_handler = Some(buffer.handler());
 
     let client_encrypted =
@@ -142,23 +144,27 @@ async fn bson_size_limits() -> Result<()> {
     let coll = client_encrypted
         .database("db")
         .collection::<Document>("coll");
+    let coll2 = client_encrypted
+        .database("db")
+        .collection::<Document>("coll2");
+    coll2.drop().await?;
 
     // Tests
     // Test operation 1
     coll.insert_one(doc! {
         "_id": "over_2mib_under_16mib",
-        "unencrypted": "a".repeat(2097152),
+        "unencrypted": "a".repeat(STRING_LEN_2_MIB),
     })
     .await?;
 
     // Test operation 2
     let mut doc: Document = load_testdata("limits/limits-doc.json")?;
     doc.insert("_id", "encryption_exceeds_2mib");
-    doc.insert("unencrypted", "a".repeat(2_097_152 - 2_000));
+    doc.insert("unencrypted", "a".repeat(STRING_LEN_2_MIB - 2_000));
     coll.insert_one(doc).await?;
 
     // Test operation 3
-    let value = "a".repeat(2_097_152);
+    let value = "a".repeat(STRING_LEN_2_MIB);
     let mut events = buffer.stream();
     coll.insert_many(vec![
         doc! {
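
Reviewer note on the size accounting exercised by this prose test and by `batch_split_models` above: under auto-encryption the batch cap drops to `MAX_ENCRYPTED_WRITE_SIZE` (imported from the `operation` module in this diff; assumed here to be 2 MiB), but the first operation of a batch may still grow up to `maxBsonObjectSize` so that a single oversized-but-legal write is not rejected outright. A hedged sketch of that acceptance condition; the constants and the `accepts_encrypted` helper are illustrative assumptions, not taken from the diff:

```rust
// Illustrative constants: the driver reads maxBsonObjectSize from the server's
// hello response, and MAX_ENCRYPTED_WRITE_SIZE is assumed to be 2 MiB here.
const MAX_ENCRYPTED_WRITE_SIZE: usize = 2 * 1024 * 1024;
const MAX_BSON_OBJECT_SIZE: usize = 16 * 1024 * 1024;

/// Mirrors the encrypted branch in batch_split_models: an operation joins the
/// batch if it fits under the encrypted cap, or if it is the first operation
/// and still fits in a single BSON document.
fn accepts_encrypted(index: usize, running_size: usize, command_overhead: usize) -> bool {
    let max_size = MAX_ENCRYPTED_WRITE_SIZE - command_overhead;
    let first_write_max = MAX_BSON_OBJECT_SIZE - command_overhead;
    running_size <= max_size || (index == 0 && running_size <= first_write_max)
}

fn main() {
    let overhead = 1_000; // assumed size of the bulkWrite command body itself
    let just_over_2_mib = 2_097_152 + 500;

    // A single over-2MiB write is still accepted as its own batch...
    assert!(accepts_encrypted(0, just_over_2_mib, overhead));
    // ...but a second write of that size must start a new batch, which is why
    // the prose test below expects two bulkWrite commands for two such docs.
    assert!(!accepts_encrypted(1, just_over_2_mib * 2, overhead));
    println!("encrypted batch-splitting condition holds");
}
```
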
@@ -185,7 +191,7 @@ async fn bson_size_limits() -> Result<()> {
     // Test operation 4
     let mut doc = load_testdata("limits/limits-doc.json")?;
     doc.insert("_id", "encryption_exceeds_2mib_1");
-    doc.insert("unencrypted", "a".repeat(2_097_152 - 2_000));
+    doc.insert("unencrypted", "a".repeat(STRING_LEN_2_MIB - 2_000));
     let mut doc2 = doc.clone();
     doc2.insert("_id", "encryption_exceeds_2mib_2");
     let mut events = buffer.stream();
@@ -219,6 +225,42 @@ async fn bson_size_limits() -> Result<()> {
         "unexpected error: {err}"
     );
 
+    // The remaining test operations use bulk_write.
+    if server_version_lt(8, 0).await {
+        return Ok(());
+    }
+
+    // Test operation 7
+    let long_string = "a".repeat(STRING_LEN_2_MIB - 1_500);
+    let write_models = vec![
+        coll2.insert_one_model(doc! { "_id": "over_2mib_3", "unencrypted": &long_string })?,
+        coll2.insert_one_model(doc! { "_id": "over_2mib_4", "unencrypted": &long_string })?,
+    ];
+    client_encrypted.bulk_write(write_models).await?;
+    let bulk_write_events = buffer.get_command_started_events(&["bulkWrite"]);
+    assert_eq!(bulk_write_events.len(), 2);
+
+    // Test operation 8
+    buffer.clear_cached_events();
+    let limits: Document = load_testdata("limits/limits-qe-doc.json")?;
+    let long_string = "a".repeat(STRING_LEN_2_MIB - 2_000 - 1_500);
+
+    let mut doc1 = limits.clone();
+    doc1.insert("_id", "encryption_exceeds_2mib_3");
+    doc1.insert("foo", &long_string);
+    let write_model1 = coll2.insert_one_model(doc1)?;
+
+    let mut doc2 = limits;
+    doc2.insert("_id", "encryption_exceeds_2mib_4");
+    doc2.insert("foo", &long_string);
+    let write_model2 = coll2.insert_one_model(doc2)?;
+
+    client_encrypted
+        .bulk_write(vec![write_model1, write_model2])
+        .await?;
+    let bulk_write_events = buffer.get_command_started_events(&["bulkWrite"]);
+    assert_eq!(bulk_write_events.len(), 2);
+
     Ok(())
 }
diff --git a/src/test/spec/json/client-side-encryption/unified/client-bulkWrite-qe.json b/src/test/spec/json/client-side-encryption/unified/client-bulkWrite-qe.json
new file mode 100644
index 000000000..b73279340
--- /dev/null
+++ b/src/test/spec/json/client-side-encryption/unified/client-bulkWrite-qe.json
@@ -0,0 +1,298 @@
+{
+  "description": "client bulkWrite with queryable encryption",
+  "schemaVersion": "1.23",
+  "runOnRequirements": [
+    {
+      "minServerVersion": "8.0",
+      "serverless": "forbid",
+      "csfle": true
+    }
+  ],
+  "createEntities": [
+    {
+      "client": {
+        "id": "client0",
+        "observeEvents": [
+          "commandStartedEvent",
+          "commandSucceededEvent"
+        ],
+        "autoEncryptOpts": {
+          "keyVaultNamespace": "keyvault.datakeys",
+          "kmsProviders": {
+            "local": {
+              "key": "Mng0NCt4ZHVUYUJCa1kxNkVyNUR1QURhZ2h2UzR2d2RrZzh0cFBwM3R6NmdWMDFBMUN3YkQ5aXRRMkhGRGdQV09wOGVNYUMxT2k3NjZKelhaQmRCZGJkTXVyZG9uSjFk"
+            }
+          }
+        }
+      }
+    },
+    {
+      "database": {
+        "id": "database0",
+        "client": "client0",
+        "databaseName": "crud-tests"
+      }
+    },
+    {
+      "collection": {
+        "id": "collection0",
+        "database": "database0",
+        "collectionName": "coll0"
+      }
+    },
+    {
+      "client": {
+        "id": "client1",
+        "observeEvents": [
+          "commandStartedEvent"
+        ]
+      }
+    },
+    {
+      "database": {
+        "id": "database1",
+        "client": "client0",
+        "databaseName": "keyvault"
+      }
+    },
+    {
+      "collection": {
+        "id": "collection1",
+        "database": "database0",
+        "collectionName": "datakeys"
+      }
+    },
+    {
+      "database": {
+        "id": "database2",
+        "client": "client1",
+        "databaseName": "crud-tests"
+      }
+    },
+    {
+      "collection": {
+        "id": "collection2",
+        "database": "database2",
+        "collectionName": "coll0"
+      }
+    }
+  ],
+  "initialData": [
+    {
+      "databaseName": "keyvault",
"collectionName": "datakeys", + "documents": [ + { + "_id": { + "$binary": { + "base64": "EjRWeBI0mHYSNBI0VniQEg==", + "subType": "04" + } + }, + "keyAltNames": [ + "local_key" + ], + "keyMaterial": { + "$binary": { + "base64": "sHe0kz57YW7v8g9VP9sf/+K1ex4JqKc5rf/URX3n3p8XdZ6+15uXPaSayC6adWbNxkFskuMCOifDoTT+rkqMtFkDclOy884RuGGtUysq3X7zkAWYTKi8QAfKkajvVbZl2y23UqgVasdQu3OVBQCrH/xY00nNAs/52e958nVjBuzQkSb1T8pKJAyjZsHJ60+FtnfafDZSTAIBJYn7UWBCwQ==", + "subType": "00" + } + }, + "creationDate": { + "$date": { + "$numberLong": "1641024000000" + } + }, + "updateDate": { + "$date": { + "$numberLong": "1641024000000" + } + }, + "status": 1, + "masterKey": { + "provider": "local" + } + } + ] + }, + { + "databaseName": "crud-tests", + "collectionName": "coll0", + "documents": [], + "createOptions": { + "encryptedFields": { + "fields": [ + { + "keyId": { + "$binary": { + "base64": "EjRWeBI0mHYSNBI0VniQEg==", + "subType": "04" + } + }, + "path": "encryptedInt", + "bsonType": "int", + "queries": { + "queryType": "equality", + "contention": { + "$numberLong": "0" + } + } + } + ] + } + } + } + ], + "_yamlAnchors": { + "namespace": "crud-tests.coll0" + }, + "tests": [ + { + "description": "client bulkWrite QE replaceOne", + "operations": [ + { + "object": "collection0", + "name": "insertMany", + "arguments": { + "documents": [ + { + "_id": 1, + "encryptedInt": 11 + }, + { + "_id": 2, + "encryptedInt": 22 + }, + { + "_id": 3, + "encryptedInt": 33 + } + ] + } + }, + { + "object": "client0", + "name": "clientBulkWrite", + "arguments": { + "models": [ + { + "replaceOne": { + "namespace": "crud-tests.coll0", + "filter": { + "encryptedInt": { + "$eq": 11 + } + }, + "replacement": { + "encryptedInt": 44 + } + } + } + ] + }, + "expectResult": { + "insertedCount": 0, + "upsertedCount": 0, + "matchedCount": 1, + "modifiedCount": 1, + "deletedCount": 0 + } + }, + { + "object": "collection0", + "name": "find", + "arguments": { + "filter": { + "encryptedInt": 44 + } + }, + "expectResult": [ + { + "_id": 1, + "encryptedInt": 44 + } + ] + }, + { + "object": "collection2", + "name": "find", + "arguments": { + "filter": {} + }, + "expectResult": [ + { + "_id": 1, + "encryptedInt": { + "$$type": "binData" + }, + "__safeContent__": { + "$$type": "array" + } + }, + { + "_id": 2, + "encryptedInt": { + "$$type": "binData" + }, + "__safeContent__": { + "$$type": "array" + } + }, + { + "_id": 3, + "encryptedInt": { + "$$type": "binData" + }, + "__safeContent__": { + "$$type": "array" + } + } + ] + } + ] + }, + { + "description": "client bulkWrite QE with multiple replace fails", + "operations": [ + { + "object": "client0", + "name": "clientBulkWrite", + "arguments": { + "models": [ + { + "replaceOne": { + "namespace": "crud-tests.coll0", + "filter": { + "encryptedInt": { + "$eq": 11 + } + }, + "replacement": { + "encryptedInt": 44 + } + } + }, + { + "replaceOne": { + "namespace": "crud-tests.coll0", + "filter": { + "encryptedInt": { + "$eq": 22 + } + }, + "replacement": { + "encryptedInt": 44 + } + } + } + ] + }, + "expectError": { + "isError": true, + "errorContains": "Only insert is supported in BulkWrite with multiple operations and Queryable Encryption" + } + } + ] + } + ] +} diff --git a/src/test/spec/json/testdata/client-side-encryption/limits/limits-qe-doc.json b/src/test/spec/json/testdata/client-side-encryption/limits/limits-qe-doc.json new file mode 100644 index 000000000..71efbf406 --- /dev/null +++ b/src/test/spec/json/testdata/client-side-encryption/limits/limits-qe-doc.json @@ -0,0 +1,3 @@ +{ + "foo": 
"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" +} \ No newline at end of file diff --git a/src/test/spec/unified_runner/operation.rs b/src/test/spec/unified_runner/operation.rs index 4a9ff6c18..9789d9aaa 100644 --- a/src/test/spec/unified_runner/operation.rs +++ b/src/test/spec/unified_runner/operation.rs @@ -217,7 +217,7 @@ impl Operation { let opt_entity = result.unwrap_or_else(|e| { panic!( "[{}] {} should succeed, but failed with the following error: {}", - description, self.name, e + description, self.name, e, ) }); if expected_value.is_some() || save_as_entity.is_some() { diff --git a/src/test/spec/unified_runner/test_runner.rs b/src/test/spec/unified_runner/test_runner.rs index 91cc1a105..9e0ddaf1e 100644 --- a/src/test/spec/unified_runner/test_runner.rs +++ b/src/test/spec/unified_runner/test_runner.rs @@ -532,6 +532,15 @@ impl TestRunner { if let Some(opts) = &client.auto_encrypt_opts { use crate::client::csfle::options::{AutoEncryptionOptions, KmsProviders}; + let mut extra_options = opts.extra_options.clone(); + if let Ok(val) = std::env::var("CRYPT_SHARED_LIB_PATH") { + if !val.is_empty() { + extra_options + .get_or_insert_default() + .insert("cryptSharedLibPath", val); + } + } + let real_opts = AutoEncryptionOptions { key_vault_client: None, key_vault_namespace: opts.key_vault_namespace.clone(), @@ -543,7 +552,7 @@ impl TestRunner { .unwrap(), schema_map: opts.schema_map.clone(), bypass_auto_encryption: opts.bypass_auto_encryption, - extra_options: opts.extra_options.clone(), + extra_options, encrypted_fields_map: opts.encrypted_fields_map.clone(), bypass_query_analysis: opts.bypass_query_analysis, disable_crypt_shared: None,