Skip to content

Commit 7f03ec5

Browse files
authored
RUST-871 Serialize directly to BSON bytes in insert operations (#406)
1 parent 8edbf66 commit 7f03ec5

File tree

39 files changed

+763
-414
lines changed

39 files changed

+763
-414
lines changed

benchmarks/src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -444,7 +444,7 @@ fn parse_ids(matches: ArgMatches) -> Vec<bool> {
444444
ids
445445
}
446446

447-
#[cfg_attr(feature = "tokio-runtime", tokio::main)]
447+
#[cfg_attr(feature = "tokio-runtime", tokio::main(flavor = "current_thread"))]
448448
#[cfg_attr(feature = "async-std-runtime", async_std::main)]
449449
async fn main() {
450450
let matches = App::new("RustDriverBenchmark")

src/bson_util/mod.rs

Lines changed: 169 additions & 164 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,15 @@
1-
use std::{convert::TryFrom, io::Read, time::Duration};
1+
use std::{
2+
convert::{TryFrom, TryInto},
3+
io::{Read, Write},
4+
time::Duration,
5+
};
26

3-
use serde::{de::Error, ser, Deserialize, Deserializer, Serialize, Serializer};
7+
use bson::spec::ElementType;
8+
use serde::{de::Error as SerdeDeError, ser, Deserialize, Deserializer, Serialize, Serializer};
49

510
use crate::{
6-
bson::{doc, Binary, Bson, Document, JavaScriptCodeWithScope, Regex},
7-
error::{ErrorKind, Result},
11+
bson::{doc, Bson, Document},
12+
error::{Error, ErrorKind, Result},
813
runtime::{SyncLittleEndianRead, SyncLittleEndianWrite},
914
};
1015

@@ -164,128 +169,30 @@ where
164169
.ok_or_else(|| D::Error::custom(format!("could not deserialize u64 from {:?}", bson)))
165170
}
166171

167-
pub fn doc_size_bytes(doc: &Document) -> u64 {
168-
//
169-
// * i32 length prefix (4 bytes)
170-
// * for each element:
171-
// * type (1 byte)
172-
// * number of UTF-8 bytes in key
173-
// * null terminator for the key (1 byte)
174-
// * size of the value
175-
// * null terminator (1 byte)
176-
4 + doc
177-
.into_iter()
178-
.map(|(key, val)| 1 + key.len() as u64 + 1 + size_bytes(val))
179-
.sum::<u64>()
180-
+ 1
181-
}
182-
183-
pub fn size_bytes(val: &Bson) -> u64 {
184-
match val {
185-
Bson::Double(_) => 8,
186-
//
187-
// * length prefix (4 bytes)
188-
// * number of UTF-8 bytes
189-
// * null terminator (1 byte)
190-
Bson::String(s) => 4 + s.len() as u64 + 1,
191-
// An array is serialized as a document with the keys "0", "1", "2", etc., so the size of
192-
// an array is:
193-
//
194-
// * length prefix (4 bytes)
195-
// * for each element:
196-
// * type (1 byte)
197-
// * number of decimal digits in key
198-
// * null terminator for the key (1 byte)
199-
// * size of value
200-
// * null terminator (1 byte)
201-
Bson::Array(arr) => {
202-
4 + arr
203-
.iter()
204-
.enumerate()
205-
.map(|(i, val)| 1 + num_decimal_digits(i) + 1 + size_bytes(val))
206-
.sum::<u64>()
207-
+ 1
208-
}
209-
Bson::Document(doc) => doc_size_bytes(doc),
210-
Bson::Boolean(_) => 1,
211-
Bson::Null => 0,
212-
// for $pattern and $opts:
213-
// * number of UTF-8 bytes
214-
// * null terminator (1 byte)
215-
Bson::RegularExpression(Regex { pattern, options }) => {
216-
pattern.len() as u64 + 1 + options.len() as u64 + 1
217-
}
218-
//
219-
// * length prefix (4 bytes)
220-
// * number of UTF-8 bytes
221-
// * null terminator (1 byte)
222-
Bson::JavaScriptCode(code) => 4 + code.len() as u64 + 1,
223-
//
224-
// * i32 length prefix (4 bytes)
225-
// * i32 length prefix for code (4 bytes)
226-
// * number of UTF-8 bytes in code
227-
// * null terminator for code (1 byte)
228-
// * length of document
229-
Bson::JavaScriptCodeWithScope(JavaScriptCodeWithScope { code, scope }) => {
230-
4 + 4 + code.len() as u64 + 1 + doc_size_bytes(scope)
231-
}
232-
Bson::Int32(_) => 4,
233-
Bson::Int64(_) => 8,
234-
Bson::Timestamp(_) => 8,
235-
//
236-
// * i32 length prefix (4 bytes)
237-
// * subtype (1 byte)
238-
// * number of bytes
239-
Bson::Binary(Binary { bytes, .. }) => 4 + 1 + bytes.len() as u64,
240-
Bson::ObjectId(_) => 12,
241-
Bson::DateTime(_) => 8,
242-
//
243-
// * i32 length prefix (4 bytes)
244-
// * subtype (1 byte)
245-
// * number of UTF-8 bytes
246-
Bson::Symbol(s) => 4 + 1 + s.len() as u64,
247-
Bson::Decimal128(..) => 128 / 8,
248-
Bson::Undefined | Bson::MaxKey | Bson::MinKey => 0,
249-
// DbPointer doesn't have public details exposed by the BSON library, but it comprises of a
250-
// namespace and an ObjectId. Since our methods to calculate the size of BSON values are
251-
// only used to estimate the cutoff for batches when making a large insert, we can just
252-
// assume the largest possible size for a namespace, which is 120 bytes. Therefore, the size
253-
// is:
254-
//
255-
// * i32 length prefix (4 bytes)
256-
// * namespace (120 bytes)
257-
// * null terminator (1 byte)
258-
// * objectid (12 bytes)
259-
Bson::DbPointer(..) => 4 + 120 + 1 + 12,
260-
}
261-
}
262-
263172
/// The size in bytes of the provided document's entry in a BSON array at the given index.
264-
pub(crate) fn array_entry_size_bytes(index: usize, doc: &Document) -> u64 {
173+
pub(crate) fn array_entry_size_bytes(index: usize, doc_len: usize) -> u64 {
265174
//
266175
// * type (1 byte)
267176
// * number of decimal digits in key
268177
// * null terminator for the key (1 byte)
269178
// * size of value
270-
1 + num_decimal_digits(index) + 1 + doc_size_bytes(doc)
179+
180+
1 + num_decimal_digits(index) + 1 + doc_len as u64
271181
}
272182

273183
/// The number of digits in `n` in base 10.
274184
/// Useful for calculating the size of an array entry in BSON.
275-
fn num_decimal_digits(n: usize) -> u64 {
276-
let mut digits = 1;
277-
let mut curr = 10;
278-
279-
while curr < n {
280-
curr = match curr.checked_mul(10) {
281-
Some(val) => val,
282-
None => break,
283-
};
185+
fn num_decimal_digits(mut n: usize) -> u64 {
186+
let mut digits = 0;
284187

188+
loop {
189+
n /= 10;
285190
digits += 1;
286-
}
287191

288-
digits
192+
if n == 0 {
193+
return digits;
194+
}
195+
}
289196
}
290197

291198
/// Read a document's raw BSON bytes from the provided reader.
@@ -300,63 +207,161 @@ pub(crate) fn read_document_bytes<R: Read>(mut reader: R) -> Result<Vec<u8>> {
300207
Ok(bytes)
301208
}
302209

303-
/// Serialize the document to raw BSON and return a vec containing the bytes.
304-
#[cfg(test)]
305-
pub(crate) fn document_to_vec(doc: Document) -> Result<Vec<u8>> {
306-
let mut v = Vec::new();
307-
doc.to_writer(&mut v)?;
308-
Ok(v)
210+
/// Get the value for the provided key from a buffer containing a BSON document.
211+
/// If the key is not present, None will be returned.
212+
/// If the BSON is not properly formatted, an internal error will be returned.
213+
///
214+
/// TODO: RUST-924 replace this with raw document API usage.
215+
pub(crate) fn raw_get(doc: &[u8], key: &str) -> Result<Option<Bson>> {
216+
fn read_i32(reader: &mut std::io::Cursor<&[u8]>) -> Result<i32> {
217+
reader.read_i32().map_err(deserialize_error)
218+
}
219+
220+
fn read_u8(reader: &mut std::io::Cursor<&[u8]>) -> Result<u8> {
221+
reader.read_u8().map_err(deserialize_error)
222+
}
223+
224+
fn deserialize_error<T: std::error::Error>(_e: T) -> Error {
225+
deserialize_error_no_arg()
226+
}
227+
228+
fn deserialize_error_no_arg() -> Error {
229+
Error::from(ErrorKind::Internal {
230+
message: "failed to read from serialized document".to_string(),
231+
})
232+
}
233+
234+
let mut reader = std::io::Cursor::new(doc);
235+
let len: u64 = read_i32(&mut reader)?
236+
.try_into()
237+
.map_err(deserialize_error)?;
238+
239+
while reader.position() < len {
240+
let element_start: usize = reader.position().try_into().map_err(deserialize_error)?;
241+
242+
// read the element type
243+
let tag = read_u8(&mut reader)?;
244+
245+
// check if we reached the end of the document
246+
if tag == 0 && reader.position() == len {
247+
return Ok(None);
248+
}
249+
250+
let element_type = ElementType::from(tag).ok_or_else(deserialize_error_no_arg)?;
251+
252+
// walk through the document until a null byte is encountered
253+
while read_u8(&mut reader)? != 0 {
254+
if reader.position() >= len {
255+
return Err(deserialize_error_no_arg());
256+
}
257+
}
258+
259+
// parse the key
260+
let string_end: usize = reader
261+
.position()
262+
.checked_sub(1) // back from null byte
263+
.and_then(|u| usize::try_from(u).ok())
264+
.ok_or_else(deserialize_error_no_arg)?;
265+
let slice = &reader.get_ref()[(element_start + 1)..string_end];
266+
let k = std::str::from_utf8(slice).map_err(deserialize_error)?;
267+
268+
// move to the end of the element
269+
let skip_len = match element_type {
270+
ElementType::Array
271+
| ElementType::EmbeddedDocument
272+
| ElementType::JavaScriptCodeWithScope => {
273+
let l = read_i32(&mut reader)?;
274+
// length includes the 4 bytes for the length, so subtract them out
275+
l.checked_sub(4).ok_or_else(deserialize_error_no_arg)?
276+
}
277+
ElementType::Binary => read_i32(&mut reader)?
278+
.checked_add(1) // add one for subtype
279+
.ok_or_else(deserialize_error_no_arg)?,
280+
ElementType::Int32 => 4,
281+
ElementType::Int64 => 8,
282+
ElementType::String | ElementType::Symbol | ElementType::JavaScriptCode => {
283+
read_i32(&mut reader)?
284+
}
285+
ElementType::Boolean => 1,
286+
ElementType::Double => 8,
287+
ElementType::Timestamp => 8,
288+
ElementType::Decimal128 => 16,
289+
ElementType::MinKey
290+
| ElementType::MaxKey
291+
| ElementType::Null
292+
| ElementType::Undefined => 0,
293+
ElementType::DateTime => 8,
294+
ElementType::ObjectId => 12,
295+
ElementType::DbPointer => read_i32(&mut reader)?
296+
.checked_add(12) // add 12 for objectid
297+
.ok_or_else(deserialize_error_no_arg)?,
298+
ElementType::RegularExpression => {
299+
// read two cstr's
300+
for _i in 0..2 {
301+
while read_u8(&mut reader)? != 0 {
302+
if reader.position() >= len {
303+
return Err(deserialize_error_no_arg());
304+
}
305+
}
306+
}
307+
308+
0 // don't need to skip anymore since we already read the whole value
309+
}
310+
};
311+
let skip_len: u64 = skip_len.try_into().map_err(deserialize_error)?;
312+
reader.set_position(
313+
reader
314+
.position()
315+
.checked_add(skip_len)
316+
.ok_or_else(deserialize_error_no_arg)?,
317+
);
318+
319+
if k == key {
320+
// if this is the element we're looking for, extract it.
321+
let element_end: usize = reader.position().try_into().map_err(deserialize_error)?;
322+
let element_slice = &reader.get_ref()[element_start..element_end];
323+
let element_length: i32 = element_slice.len().try_into().map_err(deserialize_error)?;
324+
325+
// create a new temporary document which just has the element we want and grab the value
326+
let mut temp_doc = Vec::new();
327+
328+
// write the document length
329+
let temp_len: i32 = element_length
330+
.checked_add(4 + 1)
331+
.ok_or_else(deserialize_error_no_arg)?;
332+
temp_doc
333+
.write_all(&temp_len.to_le_bytes())
334+
.map_err(deserialize_error)?;
335+
336+
// add in the element
337+
temp_doc.extend(element_slice);
338+
339+
// write the null byte
340+
temp_doc.push(0);
341+
342+
let d = Document::from_reader(temp_doc.as_slice()).map_err(deserialize_error)?;
343+
return Ok(Some(
344+
d.get("_id").ok_or_else(deserialize_error_no_arg)?.clone(),
345+
));
346+
}
347+
}
348+
349+
// read all bytes but didn't reach null byte
350+
Err(deserialize_error_no_arg())
309351
}
310352

311353
#[cfg(test)]
312354
mod test {
313-
use crate::bson::{
314-
doc,
315-
oid::ObjectId,
316-
spec::BinarySubtype,
317-
Binary,
318-
Bson,
319-
DateTime,
320-
JavaScriptCodeWithScope,
321-
Regex,
322-
Timestamp,
323-
};
324-
325-
use super::doc_size_bytes;
355+
use crate::bson_util::num_decimal_digits;
326356

327357
#[cfg_attr(feature = "tokio-runtime", tokio::test)]
328358
#[cfg_attr(feature = "async-std-runtime", async_std::test)]
329-
async fn doc_size_bytes_eq_serialized_size_bytes() {
330-
let doc = doc! {
331-
"double": -12.3,
332-
"string": "foo",
333-
"array": ["foobar", -7, Bson::Null, Bson::Timestamp(Timestamp { time: 12345, increment: 67890 }), false],
334-
"document": {
335-
"x": 1,
336-
"yyz": "Rush is one of the greatest bands of all time",
337-
},
338-
"bool": true,
339-
"null": Bson::Null,
340-
"regex": Bson::RegularExpression(Regex { pattern: "foobar".into(), options: "i".into() }),
341-
"code": Bson::JavaScriptCode("foo(x) { return x + 1; }".into()),
342-
"code with scope": Bson::JavaScriptCodeWithScope(JavaScriptCodeWithScope {
343-
code: "foo(x) { return x + y; }".into(),
344-
scope: doc! { "y": -17 },
345-
}),
346-
"i32": 12i32,
347-
"i64": -126i64,
348-
"timestamp": Bson::Timestamp(Timestamp { time: 12233, increment: 34444 }),
349-
"binary": Bson::Binary(Binary{ subtype: BinarySubtype::Generic, bytes: vec![3, 222, 11] }),
350-
"objectid": ObjectId::from_bytes([1; 12]),
351-
"datetime": DateTime::from_millis(4444333221),
352-
"symbol": Bson::Symbol("foobar".into()),
353-
};
354-
355-
let size_bytes = doc_size_bytes(&doc);
356-
357-
let mut serialized_bytes = Vec::new();
358-
doc.to_writer(&mut serialized_bytes).unwrap();
359-
360-
assert_eq!(size_bytes, serialized_bytes.len() as u64);
359+
async fn num_digits() {
360+
assert_eq!(num_decimal_digits(0), 1);
361+
assert_eq!(num_decimal_digits(1), 1);
362+
assert_eq!(num_decimal_digits(10), 2);
363+
assert_eq!(num_decimal_digits(15), 2);
364+
assert_eq!(num_decimal_digits(100), 3);
365+
assert_eq!(num_decimal_digits(125), 3);
361366
}
362367
}

0 commit comments

Comments
 (0)