Skip to content

Commit 7996d81

Browse files
authored
RUST-246 Batch large inserts (#106)
1 parent 7c0b5e8 commit 7996d81

File tree

6 files changed

+499
-6
lines changed

6 files changed

+499
-6
lines changed

src/bson_util.rs

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,3 +107,146 @@ pub(crate) fn serialize_batch_size<S: Serializer>(
107107
)),
108108
}
109109
}
110+
111+
pub fn doc_size_bytes(doc: &Document) -> usize {
112+
//
113+
// * i32 length prefix (4 bytes)
114+
// * for each element:
115+
// * type (1 byte)
116+
// * number of UTF-8 bytes in key
117+
// * null terminator for the key (1 byte)
118+
// * size of the value
119+
// * null terminator (1 byte)
120+
4 + doc
121+
.into_iter()
122+
.map(|(key, val)| 1 + key.len() + 1 + size_bytes(val))
123+
.sum::<usize>()
124+
+ 1
125+
}
126+
127+
/// Returns the number of bytes the serialized form of `val` occupies within a
/// BSON document, excluding the element's type tag and key (those are counted
/// by the caller, e.g. `doc_size_bytes`).
pub fn size_bytes(val: &Bson) -> usize {
    match val {
        // f64 is serialized as 8 bytes.
        Bson::FloatingPoint(_) => 8,
        //
        // * length prefix (4 bytes)
        // * number of UTF-8 bytes
        // * null terminator (1 byte)
        Bson::String(s) => 4 + s.len() + 1,
        // An array is serialized as a document with the keys "0", "1", "2", etc., so the size of
        // an array is:
        //
        // * length prefix (4 bytes)
        // * for each element:
        //   * type (1 byte)
        //   * number of decimal digits in key
        //   * null terminator for the key (1 byte)
        //   * size of value
        // * null terminator (1 byte)
        Bson::Array(arr) => {
            4 + arr
                .iter()
                .enumerate()
                .map(|(i, val)| 1 + num_decimal_digits(i) + 1 + size_bytes(val))
                .sum::<usize>()
                + 1
        }
        Bson::Document(doc) => doc_size_bytes(doc),
        Bson::Boolean(_) => 1,
        // Null carries no value bytes; only the type tag (counted by the caller).
        Bson::Null => 0,
        // for $pattern and $opts:
        // * number of UTF-8 bytes
        // * null terminator (1 byte)
        Bson::RegExp(pattern, opts) => pattern.len() + 1 + opts.len() + 1,
        //
        // * length prefix (4 bytes)
        // * number of UTF-8 bytes
        // * null terminator (1 byte)
        Bson::JavaScriptCode(code) => 4 + code.len() + 1,
        //
        // * i32 length prefix for the whole code-with-scope value (4 bytes)
        // * i32 length prefix for code (4 bytes)
        // * number of UTF-8 bytes in code
        // * null terminator for code (1 byte)
        // * length of scope document
        Bson::JavaScriptCodeWithScope(code, scope) => {
            4 + 4 + code.len() + 1 + doc_size_bytes(scope)
        }
        Bson::I32(_) => 4,
        Bson::I64(_) => 8,
        // Timestamps are serialized as a single u64 (8 bytes).
        Bson::TimeStamp(_) => 8,
        //
        // * i32 length prefix (4 bytes)
        // * subtype (1 byte)
        // * number of bytes
        Bson::Binary(_, bytes) => 4 + 1 + bytes.len(),
        // ObjectId is a fixed 12-byte value.
        Bson::ObjectId(_) => 12,
        // Serialized as a 64-bit millisecond timestamp (8 bytes).
        Bson::UtcDatetime(_) => 8,
        // A symbol is serialized like a string:
        // * i32 length prefix (4 bytes)
        // * number of UTF-8 bytes
        // * null terminator (1 byte)
        Bson::Symbol(s) => 4 + 1 + s.len(),
    }
}
191+
192+
/// Returns the number of decimal digits in `n`, i.e. the number of bytes the
/// base-10 string representation of `n` occupies. Used to size the implicit
/// "0", "1", "2", ... keys of a serialized BSON array.
fn num_decimal_digits(n: usize) -> usize {
    let mut digits = 1;
    let mut power_of_ten: usize = 10;

    // `n` has more than `digits` digits exactly when n >= 10^digits.
    // (The original `power_of_ten < n` condition undercounted exact powers of
    // ten: it reported 1 digit for 10 and 2 digits for 100.)
    while power_of_ten <= n {
        digits += 1;

        power_of_ten = match power_of_ten.checked_mul(10) {
            Some(next) => next,
            // Overflow means n >= the largest representable power of ten, and
            // since n < 10 * that power, `digits` is already correct.
            None => break,
        };
    }

    digits
}
207+
208+
#[cfg(test)]
mod test {
    use bson::{bson, doc, oid::ObjectId, spec::BinarySubtype, Bson};
    use chrono::{DateTime, NaiveDateTime, Utc};

    use super::doc_size_bytes;

    /// Verifies `doc_size_bytes` against the ground truth: the byte length of
    /// the document actually serialized by `bson::encode_document`. The
    /// fixture deliberately exercises every `Bson` variant once.
    #[test]
    fn doc_size_bytes_eq_serialized_size_bytes() {
        let doc = doc! {
            "double": -12.3,
            "string": "foo",
            "array": ["foobar", -7, Bson::Null, Bson::TimeStamp(1278), false],
            "document": {
                "x": 1,
                "yyz": "Rush is one of the greatest bands of all time",
            },
            "bool": true,
            "null": Bson::Null,
            "regex": Bson::RegExp("foobar".into(), "i".into()),
            "code": Bson::JavaScriptCode("foo(x) { return x + 1; }".into()),
            "code with scope": Bson::JavaScriptCodeWithScope(
                "foo(x) { return x + y; }".into(),
                doc! { "y": -17 },
            ),
            "i32": 12i32,
            "i64": -126i64,
            "timestamp": Bson::TimeStamp(1223334444),
            "binary": Bson::Binary(BinarySubtype::Generic, vec![3, 222, 11]),
            "objectid": ObjectId::with_bytes([1; 12]),
            "datetime": DateTime::from_utc(
                NaiveDateTime::from_timestamp(4444333221, 0),
                Utc,
            ),
            "symbol": Bson::Symbol("foobar".into()),
        };

        let size_bytes = doc_size_bytes(&doc);

        let mut serialized_bytes = Vec::new();
        bson::encode_document(&mut serialized_bytes, &doc).unwrap();

        assert_eq!(size_bytes, serialized_bytes.len());
    }
}

src/coll/batch.rs

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
// Truncates `all` to the largest prefix whose cumulative size (per `get_size`) fits within
// `max_batch_size`, returning the leftover suffix. The first element is always retained in
// `all`, even when it alone exceeds the limit, so progress is guaranteed.
pub(crate) fn split_off_batch<T>(
    all: &mut Vec<T>,
    max_batch_size: usize,
    get_size: impl Fn(&T) -> usize,
) -> Vec<T> {
    // Seed the running total with the first element, which is unconditionally kept.
    let mut accumulated = match all.first() {
        Some(first) => get_size(first),
        None => return Vec::new(),
    };

    // Find the first index at which adding the element would exceed the budget.
    let boundary = (1..all.len()).find(|&idx| {
        let elem_size = get_size(&all[idx]);

        if accumulated + elem_size > max_batch_size {
            true
        } else {
            accumulated += elem_size;
            false
        }
    });

    match boundary {
        Some(idx) => all.split_off(idx),
        None => Vec::new(),
    }
}
26+
27+
#[cfg(test)]
mod test {
    use super::split_off_batch;

    // An empty input produces an empty remainder and is left untouched.
    #[test]
    fn split_empty_batch() {
        let mut all: Vec<i32> = Vec::new();

        assert!(split_off_batch(&mut all, 10, |_| 1).is_empty());
    }

    // When the whole input fits within the budget, nothing is split off.
    #[test]
    fn split_single_batch() {
        let mut all = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];

        assert!(split_off_batch(&mut all, 10, |_| 1).is_empty());
    }

    // When the input exceeds the budget, `all` keeps the fitting prefix and
    // the suffix is returned.
    #[test]
    fn split_multi_batch() {
        let mut all = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        let rest = split_off_batch(&mut all, 3, |_| 1);

        assert_eq!(all, vec![1, 2, 3]);
        assert_eq!(rest, vec![4, 5, 6, 7, 8, 9, 10]);
    }

    // Repeated splitting consumes the input in budget-sized chunks; the final
    // undersized chunk (here `[10]`) remains in `all` when the loop ends.
    #[test]
    fn split_batches_until_empty() {
        let mut batches = Vec::new();
        let mut all = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];

        loop {
            let batch = split_off_batch(&mut all, 3, |_| 1);
            if batch.is_empty() {
                break;
            }
            // Swap the remainder into `all` and collect the completed batch.
            batches.push(std::mem::replace(&mut all, batch));
        }

        assert_eq!(all, vec![10]);
        assert_eq!(batches.len(), 3);
        assert_eq!(batches[0], vec![1, 2, 3]);
        assert_eq!(batches[1], vec![4, 5, 6]);
        assert_eq!(batches[2], vec![7, 8, 9]);
    }

    // The first element is always kept even when it alone exceeds the budget,
    // guaranteeing forward progress.
    #[test]
    fn split_batch_with_too_large_element() {
        let mut all = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        let rest = split_off_batch(&mut all, 3, |_| 5);

        assert_eq!(all, vec![1]);
        assert_eq!(rest, vec![2, 3, 4, 5, 6, 7, 8, 9, 10]);
    }
}

src/coll/mod.rs

Lines changed: 78 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
mod batch;
12
pub mod options;
23

34
use std::{fmt, sync::Arc};
@@ -9,7 +10,7 @@ use self::options::*;
910
use crate::{
1011
bson_util,
1112
concern::{ReadConcern, WriteConcern},
12-
error::{convert_bulk_errors, ErrorKind, Result},
13+
error::{convert_bulk_errors, BulkWriteError, BulkWriteFailure, ErrorKind, Result},
1314
operation::{
1415
Aggregate,
1516
Count,
@@ -28,6 +29,11 @@ use crate::{
2829
Database,
2930
};
3031

32+
/// Maximum size in bytes of an insert batch.
33+
/// This is intentionally less than the actual max document size, which is 16*1024*1024 bytes, to
34+
/// allow for overhead in the command document.
35+
const MAX_INSERT_DOCS_BYTES: usize = 16 * 1000 * 1000;
36+
3137
/// `Collection` is the client-side abstraction of a MongoDB Collection. It can be used to
3238
/// perform collection-level operations such as CRUD operations. A `Collection` can be obtained
3339
/// through a [`Database`](struct.Database.html) by calling either
@@ -382,8 +388,77 @@ impl Collection {
382388
let mut options = options.into();
383389
resolve_options!(self, options, [write_concern]);
384390

385-
let insert = Insert::new(self.namespace(), docs.into_iter().collect(), options);
386-
self.client().execute_operation(&insert, None)
391+
let mut docs: Vec<Document> = docs.into_iter().collect();
392+
393+
if docs.is_empty() {
394+
return Err(ErrorKind::ArgumentError {
395+
message: "No documents provided to insert_many".to_string(),
396+
}
397+
.into());
398+
}
399+
400+
let ordered = options.as_ref().and_then(|o| o.ordered).unwrap_or(true);
401+
402+
let mut cumulative_failure: Option<BulkWriteFailure> = None;
403+
let mut cumulative_result: Option<InsertManyResult> = None;
404+
405+
let mut n_attempted = 0;
406+
407+
while !docs.is_empty() {
408+
let mut remaining_docs =
409+
batch::split_off_batch(&mut docs, MAX_INSERT_DOCS_BYTES, bson_util::doc_size_bytes);
410+
std::mem::swap(&mut docs, &mut remaining_docs);
411+
let current_batch = remaining_docs;
412+
413+
let current_batch_size = current_batch.len();
414+
n_attempted += current_batch_size;
415+
416+
let insert = Insert::new(self.namespace(), current_batch, options.clone());
417+
match self.client().execute_operation(&insert, None) {
418+
Ok(result) => {
419+
if cumulative_failure.is_none() {
420+
let cumulative_result =
421+
cumulative_result.get_or_insert_with(InsertManyResult::new);
422+
for (index, id) in result.inserted_ids {
423+
cumulative_result
424+
.inserted_ids
425+
.insert(index + n_attempted - current_batch_size, id);
426+
}
427+
}
428+
}
429+
Err(e) => match e.kind.as_ref() {
430+
ErrorKind::BulkWriteError(failure) => {
431+
let failure_ref =
432+
cumulative_failure.get_or_insert_with(BulkWriteFailure::new);
433+
if let Some(ref write_errors) = failure.write_errors {
434+
failure_ref
435+
.write_errors
436+
.get_or_insert_with(Default::default)
437+
.extend(write_errors.iter().map(|error| BulkWriteError {
438+
index: error.index + n_attempted - current_batch_size,
439+
..error.clone()
440+
}));
441+
}
442+
if let Some(ref write_concern_error) = failure.write_concern_error {
443+
failure_ref.write_concern_error = Some(write_concern_error.clone());
444+
}
445+
446+
if ordered {
447+
return Err(ErrorKind::BulkWriteError(
448+
cumulative_failure.unwrap_or_else(BulkWriteFailure::new),
449+
)
450+
.into());
451+
}
452+
}
453+
_ => return Err(e),
454+
},
455+
}
456+
}
457+
458+
match cumulative_failure {
459+
Some(failure) => Err(ErrorKind::BulkWriteError(failure).into()),
460+
None => Ok(cumulative_result.unwrap_or_else(InsertManyResult::new)),
461+
}
387462
}
388463

389464
/// Inserts `doc` into the collection.

src/error.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -305,7 +305,7 @@ pub struct WriteError {
305305
#[derive(Debug, PartialEq, Clone, Deserialize)]
306306
pub struct BulkWriteError {
307307
/// Index into the list of operations that this error corresponds to.
308-
pub index: i32,
308+
pub index: usize,
309309

310310
/// Identifies the type of write concern error.
311311
pub code: i32,
@@ -332,6 +332,15 @@ pub struct BulkWriteFailure {
332332
pub write_concern_error: Option<WriteConcernError>,
333333
}
334334

335+
impl BulkWriteFailure {
336+
pub(crate) fn new() -> Self {
337+
BulkWriteFailure {
338+
write_errors: None,
339+
write_concern_error: None,
340+
}
341+
}
342+
}
343+
335344
/// An error that occurred when trying to execute a write operation.
336345
#[derive(Clone, Debug)]
337346
pub enum WriteFailure {

src/results.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,14 @@ pub struct InsertManyResult {
3232
pub inserted_ids: HashMap<usize, Bson>,
3333
}
3434

35+
impl InsertManyResult {
36+
pub(crate) fn new() -> Self {
37+
InsertManyResult {
38+
inserted_ids: HashMap::new(),
39+
}
40+
}
41+
}
42+
3543
/// The result of a [`Collection::update_one`](../struct.Collection.html#method.update_one) or
3644
/// [`Collection::update_many`](../struct.Collection.html#method.update_many) operation.
3745
#[derive(Debug)]

0 commit comments

Comments
 (0)