Skip to content

Commit b38d730

Browse files
committed
Prepare initial release
1 parent 95c8981 commit b38d730

File tree

5 files changed

+51
-8
lines changed

5 files changed

+51
-8
lines changed

Cargo.toml

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,7 @@ authors = ["Neville Dipale <[email protected]>"]
55
edition = "2018"
66

77
[dependencies]
8-
bson = { version = "0.14", features = ["decimal128"] }
9-
mongodb = { git = "https://github.com/nevi-me/mongo-rust-driver", branch = "decimal-128-hack" }
8+
bson = "0.14"
9+
mongodb = "0.9.2"
1010
arrow = "0.16.0"
11-
mongodb-schema-parser = { git = "https://github.com/nevi-me/mongodb-schema-parser", branch = "write-bson", default-features = false }
1211
chrono = "0.4"

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ We are currently writing this library due to a need to read MongoDB data into dat
1111
## Features
1212

1313
- [X] Read from a collection to batches
14+
- [X] Write from batches to a collection
1415
- [ ] Infer collection schema
1516
- [ ] Projection predicate push-down
1617
- [ ] Data types

src/lib.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,12 @@
1+
//!
2+
//! MongoDB to Apache Arrow Connector
3+
//!
4+
//! This crate allows reading and writing MongoDB data in the Apache Arrow format.
5+
//! Data is read as `RecordBatch`es from a MongoDB database using the aggregation
6+
//! framework.
7+
//! Arrow `RecordBatch`es are written to a MongoDB collection using `insert_many`.
8+
9+
/// MongoDB reader
110
pub mod reader;
11+
/// MongoDB writer
212
pub mod writer;
3-
4-
#[macro_use(bson)]
5-
extern crate bson;

src/reader.rs

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,22 +10,35 @@ use mongodb::{
1010
options::{AggregateOptions, ClientOptions, StreamAddress},
1111
Client,
1212
};
13-
// use mongodb_schema_parser::SchemaParser;
1413

14+
/// Configuration for the MongoDB reader
1515
pub struct ReaderConfig<'a> {
16+
/// The hostname to connect to
1617
pub hostname: &'a str,
18+
/// An optional port, defaults to 27017
1719
pub port: Option<u16>,
18-
// read_preference,
20+
/// The name of the database to read from
1921
pub database: &'a str,
22+
/// The name of the collection to read from
2023
pub collection: &'a str,
2124
}
2225

26+
/// Database reader
2327
pub struct Reader {
28+
/// The MongoDB client, with a connection established
2429
client: Client,
30+
/// The name of the database to read from
2531
database: String,
32+
/// The name of the collection to read from
2633
collection: String,
34+
/// The schema of the data to read
2735
schema: Schema,
36+
/// An internal tracker of the current index that has been read
2837
current_index: usize,
38+
/// The preferred number of documents per batch.
39+
/// If the documents being read are fairly small, or can fit in memory,
40+
/// a larger batch size is more performant as it would result in
41+
/// fewer round trips to the database.
2942
batch_size: usize,
3043
}
3144

src/writer.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,28 +11,49 @@ use mongodb::{
1111
Client,
1212
};
1313

14+
/// Configuration for the MongoDB writer
1415
pub struct WriterConfig<'a> {
16+
/// The hostname to connect to
1517
pub hostname: &'a str,
18+
/// An optional port, defaults to 27017
1619
pub port: Option<u16>,
20+
/// The name of the database to write to
1721
pub database: &'a str,
22+
/// The name of the collection to write to
1823
pub collection: &'a str,
24+
/// The write mode, whether an existing collection should
25+
/// be appended to or overwritten
1926
pub write_mode: WriteMode,
27+
/// Whether compatible types should be coerced. For example,
28+
/// an Int8 value will be written as an Int32, as BSON doesn't have an Int8 type
2029
pub coerce_types: bool,
2130
}
2231

32+
/// The mode to write to the collection in
2333
pub enum WriteMode {
34+
/// Do not drop the collection, but append to the existing collection.
35+
/// If the collection does not exist, a new one is created.
2436
Append,
37+
/// Try to drop the collection if it exists.
38+
/// MongoDB returns an error if a collection that does not exist
39+
/// is dropped. We log this to the console, but do not return an error.
2540
Overwrite,
2641
}
2742

43+
/// Database writer
2844
pub struct Writer {
45+
/// The MongoDB client, with a connection established
2946
client: Client,
47+
/// The name of the database to write to
3048
database: String,
49+
/// The name of the collection to write to
3150
collection: String,
51+
/// The schema of the data to write
3252
schema: Schema,
3353
}
3454

3555
impl Writer {
56+
/// Try to create a new writer, with provided writer options and a schema
3657
pub fn try_new(config: &WriterConfig, schema: Schema) -> Result<Self, ()> {
3758
// check if data types can be written
3859
Writer::check_supported_schema(schema.fields(), config.coerce_types)?;
@@ -63,6 +84,7 @@ impl Writer {
6384
})
6485
}
6586

87+
/// Write a batch to the database
6688
pub fn write(&self, batch: &RecordBatch) -> Result<(), ()> {
6789
if batch.schema().as_ref() != &self.schema {
6890
eprintln!("Schema of record batch does not match writer");
@@ -140,6 +162,7 @@ impl Writer {
140162
}
141163
}
142164

165+
/// A private newtype struct that holds the documents to be written
143166
struct Documents(Vec<bson::Document>);
144167

145168
impl From<&RecordBatch> for Documents {

0 commit comments

Comments
 (0)