Skip to content

Commit 17d2fec

Browse files
committed
update dependencies
closes #4
1 parent f90d5b5 commit 17d2fec

File tree

3 files changed

+47
-28
lines changed

3 files changed

+47
-28
lines changed

Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ readme = "README.md"
1111
repository = "https://github.com/TheDataEngine/mongodb-arrow-connector/"
1212

1313
[dependencies]
14-
bson = "0.14"
15-
mongodb = "0.9.2"
16-
arrow = "0.16.0"
14+
bson = "1.0"
15+
mongodb = { version = "1.0", default-features = false, features = ["sync"] }
16+
arrow = "1.0"
1717
chrono = "0.4"

src/reader.rs

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,8 @@ use arrow::{
66
record_batch::{RecordBatch, RecordBatchReader},
77
};
88
use bson::{doc, Bson};
9-
use mongodb::{
10-
options::{AggregateOptions, ClientOptions, StreamAddress},
11-
Client,
12-
};
9+
use mongodb::options::{AggregateOptions, ClientOptions, StreamAddress};
10+
use mongodb::sync::Client;
1311

1412
/// Configuration for the MongoDB reader
1513
pub struct ReaderConfig<'a> {
@@ -74,11 +72,11 @@ impl Reader {
7472
}
7573

7674
/// Read the next record batch
77-
pub fn next(&mut self) -> Result<Option<RecordBatch>, ()> {
75+
pub fn next_batch(&mut self) -> Result<Option<RecordBatch>, ()> {
7876
let mut criteria = doc! {};
7977
let mut project = doc! {};
8078
for field in self.schema.fields() {
81-
project.insert(field.name(), bson::Bson::I32(1));
79+
project.insert(field.name(), bson::Bson::Int32(1));
8280
}
8381
criteria.insert("$project", project);
8482
let coll = self
@@ -108,7 +106,7 @@ impl Reader {
108106
}
109107

110108
let docs_len = docs.len();
111-
self.current_index = self.current_index + docs_len;
109+
self.current_index += docs_len;
112110
if docs_len == 0 {
113111
return Ok(None);
114112
}
@@ -135,11 +133,11 @@ impl Reader {
135133
TimeUnit::Millisecond => builder
136134
.field_builder::<TimestampMillisecondBuilder>(i)
137135
.unwrap(),
138-
t @ _ => panic!("Timestamp arrays can only be read as milliseconds, found {:?}. \nPlease read as milliseconds then cast to desired resolution.", t)
136+
t => panic!("Timestamp arrays can only be read as milliseconds, found {:?}. \nPlease read as milliseconds then cast to desired resolution.", t)
139137
};
140138
for v in 0..docs_len {
141139
let doc: &_ = docs.get(v).unwrap();
142-
match doc.get_utc_datetime(field.name()) {
140+
match doc.get_datetime(field.name()) {
143141
Ok(val) => field_builder.append_value(val.timestamp_millis()).unwrap(),
144142
Err(_) => field_builder.append_null().unwrap(),
145143
};
@@ -194,7 +192,7 @@ impl Reader {
194192
}
195193
DataType::List(_dtype) => panic!("Creating lists not yet implemented"),
196194
DataType::Struct(_fields) => panic!("Creating nested structs not yet implemented"),
197-
t @ _ => panic!("Data type {:?} not supported when reading from MongoDB", t),
195+
t => panic!("Data type {:?} not supported when reading from MongoDB", t),
198196
}
199197
}
200198
// append true to all struct records
@@ -206,11 +204,11 @@ impl Reader {
206204
}
207205

208206
impl RecordBatchReader for Reader {
209-
fn schema(&mut self) -> Arc<Schema> {
207+
fn schema(&self) -> Arc<Schema> {
210208
Arc::new(self.schema.clone())
211209
}
212210
fn next_batch(&mut self) -> arrow::error::Result<Option<RecordBatch>> {
213-
self.next().map_err(|_| {
211+
self.next_batch().map_err(|_| {
214212
arrow::error::ArrowError::IoError("Unable to read next batch from MongoDB".to_string())
215213
})
216214
}
@@ -259,7 +257,7 @@ mod tests {
259257
// write results to CSV as the schema would allow
260258
let file = File::create("./target/debug/delays.csv").unwrap();
261259
let mut writer = csv::Writer::new(file);
262-
while let Ok(Some(batch)) = reader.next() {
260+
while let Ok(Some(batch)) = reader.next_batch() {
263261
writer.write(&batch).unwrap();
264262
}
265263
Ok(())

src/writer.rs

Lines changed: 33 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,8 @@ use arrow::{
66
};
77
use bson::doc;
88
use chrono::{DateTime, NaiveDateTime, Utc};
9-
use mongodb::{
10-
options::{ClientOptions, StreamAddress},
11-
Client,
12-
};
9+
use mongodb::options::{ClientOptions, StreamAddress};
10+
use mongodb::sync::Client;
1311

1412
/// Configuration for the MongoDB writer
1513
pub struct WriterConfig<'a> {
@@ -71,7 +69,7 @@ impl Writer {
7169
.database(config.database)
7270
.collection(config.collection)
7371
.drop(None);
74-
if let Err(_) = drop {
72+
if drop.is_err() {
7573
println!("Collection does not exist, and was not dropped");
7674
}
7775
}
@@ -105,7 +103,7 @@ impl Writer {
105103
}
106104

107105
/// MongoDB supports a subset of Apache Arrow supported types, check if schema can be written
108-
fn check_supported_schema(fields: &Vec<Field>, coerce_types: bool) -> Result<(), ()> {
106+
fn check_supported_schema(fields: &[Field], coerce_types: bool) -> Result<(), ()> {
109107
for field in fields {
110108
let t = field.data_type();
111109
match t {
@@ -131,16 +129,19 @@ impl Writer {
131129
| DataType::Float32
132130
| DataType::Float64
133131
| DataType::Utf8
132+
| DataType::LargeUtf8
134133
| DataType::Timestamp(_, _) => {
135134
// data types supported without coercion
136135
}
137136
DataType::Float16 => {
138137
eprintln!("Float16 arrays not supported");
139138
return Err(());
140139
}
141-
DataType::List(data_type) | DataType::FixedSizeList(data_type, _) => {
140+
DataType::List(data_type)
141+
| DataType::LargeList(data_type)
142+
| DataType::FixedSizeList(data_type, _) => {
142143
Writer::check_supported_schema(
143-
&vec![Field::new(field.name().as_str(), *data_type.clone(), false)],
144+
&[Field::new(field.name().as_str(), *data_type.clone(), false)],
144145
coerce_types,
145146
)?;
146147
}
@@ -152,13 +153,26 @@ impl Writer {
152153
| DataType::Duration(_)
153154
| DataType::Interval(_)
154155
| DataType::Binary
156+
| DataType::LargeBinary
155157
| DataType::FixedSizeBinary(_) => {
156158
eprintln!("Data type {:?} is not supported", t);
157159
return Err(());
158160
}
161+
DataType::Null => {
162+
eprintln!("Data type {:?} is not supported", t);
163+
return Err(());
164+
}
165+
DataType::Union(_) => {
166+
eprintln!("Data type {:?} is not supported", t);
167+
return Err(());
168+
}
169+
DataType::Dictionary(_, _) => {
170+
eprintln!("Data type {:?} is not supported", t);
171+
return Err(());
172+
}
159173
}
160174
}
161-
return Ok(());
175+
Ok(())
162176
}
163177
}
164178

@@ -179,6 +193,7 @@ impl From<&RecordBatch> for Documents {
179193
.as_any()
180194
.downcast_ref::<BooleanArray>()
181195
.expect("Unable to unwrap array");
196+
#[allow(clippy::needless_range_loop)]
182197
for i in 0..len {
183198
if !array.is_null(i) {
184199
documents[i].insert(field.name(), array.value(i));
@@ -196,6 +211,7 @@ impl From<&RecordBatch> for Documents {
196211
.as_any()
197212
.downcast_ref::<Int32Array>()
198213
.expect("Unable to unwrap array");
214+
#[allow(clippy::needless_range_loop)]
199215
for i in 0..len {
200216
if !array.is_null(i) {
201217
documents[i].insert(field.name(), array.value(i));
@@ -208,6 +224,7 @@ impl From<&RecordBatch> for Documents {
208224
.as_any()
209225
.downcast_ref::<Int64Array>()
210226
.expect("Unable to unwrap array");
227+
#[allow(clippy::needless_range_loop)]
211228
for i in 0..len {
212229
if !array.is_null(i) {
213230
documents[i].insert(field.name(), array.value(i));
@@ -220,6 +237,7 @@ impl From<&RecordBatch> for Documents {
220237
.as_any()
221238
.downcast_ref::<Float32Array>()
222239
.expect("Unable to unwrap array");
240+
#[allow(clippy::needless_range_loop)]
223241
for i in 0..len {
224242
if !array.is_null(i) {
225243
documents[i].insert(field.name(), array.value(i));
@@ -231,6 +249,7 @@ impl From<&RecordBatch> for Documents {
231249
.as_any()
232250
.downcast_ref::<Float64Array>()
233251
.expect("Unable to unwrap array");
252+
#[allow(clippy::needless_range_loop)]
234253
for i in 0..len {
235254
if !array.is_null(i) {
236255
documents[i].insert(field.name(), array.value(i));
@@ -244,12 +263,13 @@ impl From<&RecordBatch> for Documents {
244263
.as_any()
245264
.downcast_ref::<TimestampMillisecondArray>()
246265
.expect("Unable to unwrap array");
266+
#[allow(clippy::needless_range_loop)]
247267
for i in 0..len {
248268
if !array.is_null(i) {
249269
let value = array.value(i);
250270
documents[i].insert(
251271
field.name(),
252-
bson::Bson::UtcDatetime(DateTime::<Utc>::from_utc(
272+
bson::Bson::DateTime(DateTime::<Utc>::from_utc(
253273
NaiveDateTime::from_timestamp(value / 1000, 0),
254274
Utc,
255275
)),
@@ -268,6 +288,7 @@ impl From<&RecordBatch> for Documents {
268288
.as_any()
269289
.downcast_ref::<StringArray>()
270290
.expect("Unable to unwrap array");
291+
#[allow(clippy::needless_range_loop)]
271292
for i in 0..len {
272293
if !array.is_null(i) {
273294
documents[i].insert(field.name(), array.value(i));
@@ -279,7 +300,7 @@ impl From<&RecordBatch> for Documents {
279300
panic!("Write support for lists not yet implemented")
280301
}
281302
DataType::Struct(_) => panic!("Write support for structs not yet implemented"),
282-
t @ _ => panic!("Encountered unwritable data type {:?}", t),
303+
t => panic!("Encountered unwritable data type {:?}", t),
283304
});
284305

285306
Self(documents)
@@ -334,7 +355,7 @@ mod tests {
334355
let writer = Writer::try_new(&writer_config, schema)?;
335356

336357
// read from a collection and write to another
337-
while let Ok(Some(batch)) = reader.next() {
358+
while let Ok(Some(batch)) = reader.next_batch() {
338359
writer.write(&batch)?
339360
}
340361
Ok(())

0 commit comments

Comments (0)