Skip to content

Commit e27e1f8

Browse files
committed
draft: using arrow-avro
1 parent 35f45b5 commit e27e1f8

File tree

6 files changed

+28
-22
lines changed

6 files changed

+28
-22
lines changed

Cargo.toml

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ version = "50.0.0"
8989
ahash = { version = "0.8", default-features = false, features = [
9090
"runtime-rng",
9191
] }
92-
apache-avro = { version = "0.20", default-features = false }
92+
#apache-avro = { version = "0.20", default-features = false }
9393
arrow = { version = "56.2.0", features = [
9494
"prettyprint",
9595
"chrono-tz",
@@ -103,6 +103,7 @@ arrow-ipc = { version = "56.2.0", default-features = false, features = [
103103
] }
104104
arrow-ord = { version = "56.2.0", default-features = false }
105105
arrow-schema = { version = "56.2.0", default-features = false }
106+
arrow-avro = { version = "56.2.0", default-features = false }
106107
async-trait = "0.1.89"
107108
bigdecimal = "0.4.8"
108109
bytes = "1.10"
@@ -196,6 +197,12 @@ unexpected_cfgs = { level = "warn", check-cfg = [
196197
] }
197198
unused_qualifications = "deny"
198199

200+
201+
# TEMPORARY: override arrow and arrow-avro with a pinned arrow-rs git revision for testing
202+
[patch.crates-io]
203+
arrow = { git = "https://github.com/apache/arrow-rs.git", rev = "b444ea7127ebc8136564bc9af036353d0c90991b" }
204+
arrow-avro = { git = "https://github.com/apache/arrow-rs.git", rev = "b444ea7127ebc8136564bc9af036353d0c90991b" }
205+
199206
# --------------------
200207
# Compilation Profiles
201208
# --------------------

datafusion/common/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ log = { workspace = true }
7171
object_store = { workspace = true, optional = true }
7272
parquet = { workspace = true, optional = true, default-features = true }
7373
paste = "1.0.15"
74-
pyo3 = { version = "0.25", optional = true }
74+
pyo3 = { version = "0.26.0", optional = true }
7575
recursive = { workspace = true, optional = true }
7676
sqlparser = { workspace = true, optional = true }
7777
tokio = { workspace = true }

datafusion/datasource-avro/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,8 @@ version.workspace = true
3131
all-features = true
3232

3333
[dependencies]
34-
apache-avro = { workspace = true }
3534
arrow = { workspace = true }
35+
arrow-avro = { workspace = true }
3636
async-trait = { workspace = true }
3737
bytes = { workspace = true }
3838
datafusion-common = { workspace = true, features = ["object_store", "avro"] }

datafusion/datasource-avro/src/avro_to_arrow/mod.rs

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,21 +19,20 @@
1919
//!
2020
//! [Avro]: https://avro.apache.org/docs/1.2.0/
2121
22-
mod arrow_array_reader;
23-
mod reader;
24-
mod schema;
22+
// mod arrow_array_reader;
23+
// mod reader;
24+
// mod schema;
2525

2626
use arrow::datatypes::Schema;
27-
pub use reader::{Reader, ReaderBuilder};
27+
use arrow_avro::reader::ReaderBuilder;
2828

29-
pub use schema::to_arrow_schema;
29+
// pub use schema::to_arrow_schema;
3030
use std::io::Read;
3131

3232
/// Read Avro schema given a reader
3333
pub fn read_avro_schema_from_reader<R: Read>(
3434
reader: &mut R,
3535
) -> datafusion_common::Result<Schema> {
36-
let avro_reader = apache_avro::Reader::new(reader)?;
37-
let schema = avro_reader.writer_schema();
38-
to_arrow_schema(schema)
36+
let avro_reader = ReaderBuilder::new().build(reader)?;
37+
Ok(avro_reader.schema().as_ref().clone())
3938
}

datafusion/datasource-avro/src/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,5 +30,4 @@ pub mod avro_to_arrow;
3030
pub mod file_format;
3131
pub mod source;
3232

33-
pub use apache_avro;
3433
pub use file_format::*;

datafusion/datasource-avro/src/source.rs

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@
2020
use std::any::Any;
2121
use std::sync::Arc;
2222

23-
use crate::avro_to_arrow::Reader as AvroReader;
24-
25-
use arrow::datatypes::SchemaRef;
23+
use arrow::datatypes::{Schema, SchemaRef};
24+
use arrow_avro::reader::{Reader, ReaderBuilder};
25+
use arrow_avro::schema::AvroSchema;
2626
use datafusion_common::error::Result;
2727
use datafusion_common::Statistics;
2828
use datafusion_datasource::file::FileSource;
@@ -51,13 +51,14 @@ impl AvroSource {
5151
Self::default()
5252
}
5353

54-
fn open<R: std::io::Read>(&self, reader: R) -> Result<AvroReader<'static, R>> {
55-
AvroReader::try_new(
56-
reader,
57-
Arc::clone(self.schema.as_ref().expect("Schema must set before open")),
58-
self.batch_size.expect("Batch size must set before open"),
59-
self.projection.clone(),
60-
)
54+
fn open<R: std::io::Read>(&self, reader: R) -> Result<Reader<R>> {
55+
let schema: &Schema = self.schema.as_ref().expect("Schema must set before open");
56+
let avro_schema = AvroSchema::try_from(schema)?;
57+
ReaderBuilder::new()
58+
.with_reader_schema(avro_schema)
59+
.with_batch_size(self.batch_size.expect("Batch size must set before open"))
60+
.build(reader)
61+
.map_err(Into::into)
6162
}
6263
}
6364

0 commit comments

Comments
 (0)