Skip to content

Commit 5b54bb6

Browse files
RUST-877 Delay replacement document serialization until Operation::build (#942)
1 parent 289443e commit 5b54bb6

File tree

13 files changed

+347
-909
lines changed

13 files changed

+347
-909
lines changed

src/bson_util.rs

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,8 @@ use std::{
33
io::{Read, Write},
44
};
55

6-
use bson::RawBsonRef;
7-
86
use crate::{
9-
bson::{Bson, Document},
7+
bson::{Bson, Document, RawArrayBuf, RawBson, RawBsonRef, RawDocumentBuf},
108
error::{ErrorKind, Result},
119
runtime::SyncLittleEndianRead,
1210
};
@@ -48,6 +46,14 @@ pub(crate) fn to_bson_array(docs: &[Document]) -> Bson {
4846
Bson::Array(docs.iter().map(|doc| Bson::Document(doc.clone())).collect())
4947
}
5048

49+
pub(crate) fn to_raw_bson_array(docs: &[Document]) -> Result<RawBson> {
50+
let mut array = RawArrayBuf::new();
51+
for doc in docs {
52+
array.push(RawDocumentBuf::from_document(doc)?);
53+
}
54+
Ok(RawBson::Array(array))
55+
}
56+
5157
#[cfg(test)]
5258
pub(crate) fn sort_document(document: &mut Document) {
5359
let temp = std::mem::take(document);
@@ -62,11 +68,11 @@ pub(crate) fn first_key(document: &Document) -> Option<&str> {
6268
document.keys().next().map(String::as_str)
6369
}
6470

65-
pub(crate) fn replacement_document_check(replacement: &Document) -> Result<()> {
66-
match first_key(replacement) {
67-
Some(s) if !s.starts_with('$') => Ok(()),
71+
pub(crate) fn replacement_raw_document_check(replacement: &RawDocumentBuf) -> Result<()> {
72+
match replacement.iter().next().transpose()? {
73+
Some((key, _)) if !key.starts_with('$') => Ok(()),
6874
_ => Err(ErrorKind::InvalidArgument {
69-
message: "replace document must have first key not starting with '$".to_string(),
75+
message: "replace document must have first key not starting with '$'".to_string(),
7076
}
7177
.into()),
7278
}
@@ -119,6 +125,17 @@ pub(crate) fn read_document_bytes<R: Read>(mut reader: R) -> Result<Vec<u8>> {
119125
Ok(bytes)
120126
}
121127

128+
pub(crate) fn extend_raw_document_buf(
129+
this: &mut RawDocumentBuf,
130+
other: RawDocumentBuf,
131+
) -> Result<()> {
132+
for result in other.iter() {
133+
let (k, v) = result?;
134+
this.append(k, v.to_raw_bson());
135+
}
136+
Ok(())
137+
}
138+
122139
#[cfg(test)]
123140
mod test {
124141
use crate::bson_util::num_decimal_digits;

src/cmap/conn/command.rs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@ use serde::{de::DeserializeOwned, Deserialize, Serialize};
33

44
use super::wire::Message;
55
use crate::{
6-
bson::Document,
6+
bson::{rawdoc, Document},
7+
bson_util::extend_raw_document_buf,
78
client::{options::ServerApi, ClusterTime, HELLO_COMMAND_NAMES, REDACTED_COMMANDS},
89
error::{Error, ErrorKind, Result},
910
hello::{HelloCommandResponse, HelloReply},
@@ -177,6 +178,19 @@ impl<T> Command<T> {
177178
}
178179
}
179180

181+
impl Command<RawDocumentBuf> {
182+
pub(crate) fn into_bson_bytes(mut self) -> Result<Vec<u8>> {
183+
let mut command = self.body;
184+
185+
// Clear the body of the command to avoid re-serializing.
186+
self.body = rawdoc! {};
187+
let rest_of_command = bson::to_raw_document_buf(&self)?;
188+
189+
extend_raw_document_buf(&mut command, rest_of_command)?;
190+
Ok(command.into_bytes())
191+
}
192+
}
193+
180194
#[derive(Debug, Clone)]
181195
pub(crate) struct RawCommandResponse {
182196
pub(crate) source: ServerAddress,

src/coll.rs

Lines changed: 29 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use serde::{
1515

1616
use self::options::*;
1717
use crate::{
18-
bson::{doc, to_document_with_options, Bson, Document, SerializerOptions},
18+
bson::{doc, Bson, Document},
1919
bson_util,
2020
change_stream::{
2121
event::ChangeStreamEvent,
@@ -753,7 +753,14 @@ impl<T> Collection<T> {
753753
let mut options = options.into();
754754
resolve_write_concern_with_session!(self, options, session.as_ref())?;
755755

756-
let update = Update::new(self.namespace(), query, update, true, options);
756+
let update = Update::with_update(
757+
self.namespace(),
758+
query,
759+
update,
760+
true,
761+
options,
762+
self.inner.human_readable_serialization,
763+
);
757764
self.client().execute_operation(update, session).await
758765
}
759766

@@ -806,7 +813,14 @@ impl<T> Collection<T> {
806813
let mut options = options.into();
807814
resolve_write_concern_with_session!(self, options, session.as_ref())?;
808815

809-
let update = Update::new(self.namespace(), query, update, false, options);
816+
let update = Update::with_update(
817+
self.namespace(),
818+
query,
819+
update,
820+
false,
821+
options,
822+
self.inner.human_readable_serialization,
823+
);
810824
self.client().execute_operation(update, session).await
811825
}
812826

@@ -1018,7 +1032,7 @@ where
10181032
let mut options = options.into();
10191033
resolve_write_concern_with_session!(self, options, session.as_ref())?;
10201034

1021-
let op = FindAndModify::<T>::with_delete(self.namespace(), filter, options);
1035+
let op = FindAndModify::with_delete(self.namespace(), filter, options);
10221036
self.client().execute_operation(op, session).await
10231037
}
10241038

@@ -1067,7 +1081,7 @@ where
10671081
let mut options = options.into();
10681082
resolve_write_concern_with_session!(self, options, session.as_ref())?;
10691083

1070-
let op = FindAndModify::<T>::with_update(self.namespace(), filter, update, options)?;
1084+
let op = FindAndModify::with_update(self.namespace(), filter, update, options)?;
10711085
self.client().execute_operation(op, session).await
10721086
}
10731087

@@ -1123,18 +1137,16 @@ where
11231137
session: impl Into<Option<&mut ClientSession>>,
11241138
) -> Result<Option<T>> {
11251139
let mut options = options.into();
1126-
let replacement = to_document_with_options(
1127-
replacement.borrow(),
1128-
SerializerOptions::builder()
1129-
.human_readable(self.inner.human_readable_serialization)
1130-
.build(),
1131-
)?;
1132-
11331140
let session = session.into();
1134-
11351141
resolve_write_concern_with_session!(self, options, session.as_ref())?;
11361142

1137-
let op = FindAndModify::<T>::with_replace(self.namespace(), filter, replacement, options)?;
1143+
let op = FindAndModify::with_replace(
1144+
self.namespace(),
1145+
filter,
1146+
replacement.borrow(),
1147+
options,
1148+
self.inner.human_readable_serialization,
1149+
)?;
11381150
self.client().execute_operation(op, session).await
11391151
}
11401152

@@ -1395,25 +1407,18 @@ where
13951407
session: impl Into<Option<&mut ClientSession>>,
13961408
) -> Result<UpdateResult> {
13971409
let mut options = options.into();
1398-
let replacement = to_document_with_options(
1399-
replacement.borrow(),
1400-
SerializerOptions::builder()
1401-
.human_readable(self.inner.human_readable_serialization)
1402-
.build(),
1403-
)?;
1404-
1405-
bson_util::replacement_document_check(&replacement)?;
14061410

14071411
let session = session.into();
14081412

14091413
resolve_write_concern_with_session!(self, options, session.as_ref())?;
14101414

1411-
let update = Update::new(
1415+
let update = Update::with_replace(
14121416
self.namespace(),
14131417
query,
1414-
UpdateModifications::Document(replacement),
1418+
replacement.borrow(),
14151419
false,
14161420
options.map(UpdateOptions::from_replace_options),
1421+
self.inner.human_readable_serialization,
14171422
);
14181423
self.client().execute_operation(update, session).await
14191424
}

src/coll/options.rs

Lines changed: 8 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
use std::time::Duration;
22

3-
use bson::serde_helpers;
43
use serde::{de::Error, Deserialize, Deserializer, Serialize, Serializer};
54
use serde_with::skip_serializing_none;
65
use typed_builder::TypedBuilder;
76

87
use crate::{
9-
bson::{doc, Bson, Document},
8+
bson::{doc, serde_helpers, Bson, Document, RawBson, RawDocumentBuf},
109
concern::{ReadConcern, WriteConcern},
10+
error::Result,
1111
options::Collation,
1212
selection_criteria::SelectionCriteria,
1313
serde_util,
@@ -63,7 +63,7 @@ impl<'de> Deserialize<'de> for ReturnDocument {
6363
}
6464

6565
/// Specifies the index to use for an operation.
66-
#[derive(Clone, Debug, Deserialize, Serialize)]
66+
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
6767
#[serde(untagged)]
6868
#[non_exhaustive]
6969
pub enum Hint {
@@ -74,11 +74,11 @@ pub enum Hint {
7474
}
7575

7676
impl Hint {
77-
pub(crate) fn to_bson(&self) -> Bson {
78-
match self {
79-
Hint::Keys(ref d) => Bson::Document(d.clone()),
80-
Hint::Name(ref s) => Bson::String(s.clone()),
81-
}
77+
pub(crate) fn to_raw_bson(&self) -> Result<RawBson> {
78+
Ok(match self {
79+
Hint::Keys(ref d) => RawBson::Document(RawDocumentBuf::from_document(d)?),
80+
Hint::Name(ref s) => RawBson::String(s.clone()),
81+
})
8282
}
8383
}
8484

@@ -174,17 +174,6 @@ pub enum UpdateModifications {
174174
Pipeline(Vec<Document>),
175175
}
176176

177-
impl UpdateModifications {
178-
pub(crate) fn to_bson(&self) -> Bson {
179-
match self {
180-
UpdateModifications::Document(ref d) => Bson::Document(d.clone()),
181-
UpdateModifications::Pipeline(ref p) => {
182-
Bson::Array(p.iter().map(|d| Bson::Document(d.clone())).collect())
183-
}
184-
}
185-
}
186-
}
187-
188177
impl From<Document> for UpdateModifications {
189178
fn from(item: Document) -> Self {
190179
UpdateModifications::Document(item)

src/operation.rs

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use serde::{de::DeserializeOwned, Deserialize, Serialize};
3232

3333
use crate::{
3434
bson::{self, Bson, Document},
35-
bson_util,
35+
bson_util::{self, extend_raw_document_buf},
3636
client::{ClusterTime, HELLO_COMMAND_NAMES, REDACTED_COMMANDS},
3737
cmap::{conn::PinnedConnectionHandle, Command, RawCommandResponse, StreamDescription},
3838
error::{
@@ -73,7 +73,7 @@ pub(crate) use list_indexes::ListIndexes;
7373
pub(crate) use raw_output::RawOutput;
7474
pub(crate) use run_command::RunCommand;
7575
pub(crate) use run_cursor_command::RunCursorCommand;
76-
pub(crate) use update::Update;
76+
pub(crate) use update::{Update, UpdateOrReplace};
7777

7878
const SERVER_4_2_0_WIRE_VERSION: i32 = 8;
7979
const SERVER_4_4_0_WIRE_VERSION: i32 = 9;
@@ -224,28 +224,28 @@ impl From<CommandErrorBody> for Error {
224224
}
225225
}
226226

227-
/// Appends a serializable struct to the input document.
228-
/// The serializable struct MUST serialize to a Document, otherwise an error will be thrown.
227+
/// Appends a serializable struct to the input document. The serializable struct MUST serialize to a
228+
/// Document; otherwise, an error will be thrown.
229229
pub(crate) fn append_options<T: Serialize + Debug>(
230230
doc: &mut Document,
231231
options: Option<&T>,
232232
) -> Result<()> {
233-
match options {
234-
Some(options) => {
235-
let temp_doc = bson::to_bson(options)?;
236-
match temp_doc {
237-
Bson::Document(d) => {
238-
doc.extend(d);
239-
Ok(())
240-
}
241-
_ => Err(ErrorKind::Internal {
242-
message: format!("options did not serialize to a Document: {:?}", options),
243-
}
244-
.into()),
245-
}
246-
}
247-
None => Ok(()),
233+
if let Some(options) = options {
234+
let options_doc = bson::to_document(options)?;
235+
doc.extend(options_doc);
236+
}
237+
Ok(())
238+
}
239+
240+
pub(crate) fn append_options_to_raw_document<T: Serialize>(
241+
doc: &mut RawDocumentBuf,
242+
options: Option<&T>,
243+
) -> Result<()> {
244+
if let Some(options) = options {
245+
let options_raw_doc = bson::to_raw_document_buf(options)?;
246+
extend_raw_document_buf(doc, options_raw_doc)?;
248247
}
248+
Ok(())
249249
}
250250

251251
#[derive(Deserialize, Debug)]

0 commit comments

Comments
 (0)