diff --git a/Cargo.lock b/Cargo.lock
index 02b88e99fd1db..3c041c5445f5b 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -191,35 +191,6 @@ version = "1.0.100"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "a23eb6b1614318a8071c9b2521f36b424b2c83db5eb3a0fead4a6c0809af6e61"
 
-[[package]]
-name = "apache-avro"
-version = "0.21.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "36fa98bc79671c7981272d91a8753a928ff6a1cd8e4f20a44c45bd5d313840bf"
-dependencies = [
- "bigdecimal",
- "bon",
- "bzip2",
- "crc32fast",
- "digest",
- "liblzma",
- "log",
- "miniz_oxide",
- "num-bigint",
- "quad-rand",
- "rand 0.9.2",
- "regex-lite",
- "serde",
- "serde_bytes",
- "serde_json",
- "snap",
- "strum 0.27.2",
- "strum_macros 0.27.2",
- "thiserror",
- "uuid",
- "zstd",
-]
-
 [[package]]
 name = "arrayref"
 version = "0.3.9"
@@ -234,9 +205,8 @@ checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50"
 
 [[package]]
 name = "arrow"
-version = "57.1.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "cb372a7cbcac02a35d3fb7b3fc1f969ec078e871f9bb899bf00a2e1809bec8a3"
+version = "57.2.0"
+source = "git+https://github.com/jecsand838/arrow-rs?branch=avro-reader-projection#2b527656293781bbea03014c6e55ff5d4559371c"
 dependencies = [
  "arrow-arith",
  "arrow-array",
@@ -257,9 +227,8 @@ dependencies = [
 
 [[package]]
 name = "arrow-arith"
-version = "57.1.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0f377dcd19e440174596d83deb49cd724886d91060c07fec4f67014ef9d54049"
+version = "57.2.0"
+source = "git+https://github.com/jecsand838/arrow-rs?branch=avro-reader-projection#2b527656293781bbea03014c6e55ff5d4559371c"
 dependencies = [
  "arrow-array",
  "arrow-buffer",
@@ -271,9 +240,8 @@ dependencies = [
 
 [[package]]
 name = "arrow-array"
-version = "57.1.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a23eaff85a44e9fa914660fb0d0bb00b79c4a3d888b5334adb3ea4330c84f002"
+version = "57.2.0"
+source = "git+https://github.com/jecsand838/arrow-rs?branch=avro-reader-projection#2b527656293781bbea03014c6e55ff5d4559371c"
 dependencies = [
  "ahash 0.8.12",
  "arrow-buffer",
@@ -288,11 +256,32 @@ dependencies = [
  "num-traits",
 ]
 
+[[package]]
+name = "arrow-avro"
+version = "57.2.0"
+source = "git+https://github.com/jecsand838/arrow-rs?branch=avro-reader-projection#2b527656293781bbea03014c6e55ff5d4559371c"
+dependencies = [
+ "arrow-array",
+ "arrow-buffer",
+ "arrow-schema",
+ "bzip2",
+ "crc",
+ "flate2",
+ "indexmap 2.12.1",
+ "liblzma",
+ "rand 0.9.2",
+ "serde",
+ "serde_json",
+ "snap",
+ "strum_macros 0.27.2",
+ "uuid",
+ "zstd",
+]
+
 [[package]]
 name = "arrow-buffer"
-version = "57.1.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a2819d893750cb3380ab31ebdc8c68874dd4429f90fd09180f3c93538bd21626"
+version = "57.2.0"
+source = "git+https://github.com/jecsand838/arrow-rs?branch=avro-reader-projection#2b527656293781bbea03014c6e55ff5d4559371c"
 dependencies = [
  "bytes",
  "half",
@@ -302,9 +291,8 @@
 
 [[package]]
 name = "arrow-cast"
-version = "57.1.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e3d131abb183f80c450d4591dc784f8d7750c50c6e2bc3fcaad148afc8361271"
+version = "57.2.0"
+source = "git+https://github.com/jecsand838/arrow-rs?branch=avro-reader-projection#2b527656293781bbea03014c6e55ff5d4559371c"
 dependencies = [
  "arrow-array",
  "arrow-buffer",
@@ -324,9 +312,8 @@
 
 [[package]]
"arrow-csv" -version = "57.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2275877a0e5e7e7c76954669366c2aa1a829e340ab1f612e647507860906fb6b" +version = "57.2.0" +source = "git+https://github.com/jecsand838/arrow-rs?branch=avro-reader-projection#2b527656293781bbea03014c6e55ff5d4559371c" dependencies = [ "arrow-array", "arrow-cast", @@ -339,9 +326,8 @@ dependencies = [ [[package]] name = "arrow-data" -version = "57.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "05738f3d42cb922b9096f7786f606fcb8669260c2640df8490533bb2fa38c9d3" +version = "57.2.0" +source = "git+https://github.com/jecsand838/arrow-rs?branch=avro-reader-projection#2b527656293781bbea03014c6e55ff5d4559371c" dependencies = [ "arrow-buffer", "arrow-schema", @@ -352,9 +338,8 @@ dependencies = [ [[package]] name = "arrow-flight" -version = "57.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b5f57c3d39d1b1b7c1376a772ea86a131e7da310aed54ebea9363124bb885e3" +version = "57.2.0" +source = "git+https://github.com/jecsand838/arrow-rs?branch=avro-reader-projection#2b527656293781bbea03014c6e55ff5d4559371c" dependencies = [ "arrow-arith", "arrow-array", @@ -380,9 +365,8 @@ dependencies = [ [[package]] name = "arrow-ipc" -version = "57.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d09446e8076c4b3f235603d9ea7c5494e73d441b01cd61fb33d7254c11964b3" +version = "57.2.0" +source = "git+https://github.com/jecsand838/arrow-rs?branch=avro-reader-projection#2b527656293781bbea03014c6e55ff5d4559371c" dependencies = [ "arrow-array", "arrow-buffer", @@ -396,9 +380,8 @@ dependencies = [ [[package]] name = "arrow-json" -version = "57.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "371ffd66fa77f71d7628c63f209c9ca5341081051aa32f9c8020feb0def787c0" +version = "57.2.0" +source = "git+https://github.com/jecsand838/arrow-rs?branch=avro-reader-projection#2b527656293781bbea03014c6e55ff5d4559371c" dependencies = [ "arrow-array", "arrow-buffer", @@ -420,9 +403,8 @@ dependencies = [ [[package]] name = "arrow-ord" -version = "57.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cbc94fc7adec5d1ba9e8cd1b1e8d6f72423b33fe978bf1f46d970fafab787521" +version = "57.2.0" +source = "git+https://github.com/jecsand838/arrow-rs?branch=avro-reader-projection#2b527656293781bbea03014c6e55ff5d4559371c" dependencies = [ "arrow-array", "arrow-buffer", @@ -433,9 +415,8 @@ dependencies = [ [[package]] name = "arrow-row" -version = "57.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "169676f317157dc079cc5def6354d16db63d8861d61046d2f3883268ced6f99f" +version = "57.2.0" +source = "git+https://github.com/jecsand838/arrow-rs?branch=avro-reader-projection#2b527656293781bbea03014c6e55ff5d4559371c" dependencies = [ "arrow-array", "arrow-buffer", @@ -446,9 +427,8 @@ dependencies = [ [[package]] name = "arrow-schema" -version = "57.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d27609cd7dd45f006abae27995c2729ef6f4b9361cde1ddd019dc31a5aa017e0" +version = "57.2.0" +source = "git+https://github.com/jecsand838/arrow-rs?branch=avro-reader-projection#2b527656293781bbea03014c6e55ff5d4559371c" dependencies = [ "bitflags 2.9.4", "serde", @@ -458,9 +438,8 @@ dependencies = [ [[package]] name = "arrow-select" -version = "57.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"ae980d021879ea119dd6e2a13912d81e64abed372d53163e804dfe84639d8010" +version = "57.2.0" +source = "git+https://github.com/jecsand838/arrow-rs?branch=avro-reader-projection#2b527656293781bbea03014c6e55ff5d4559371c" dependencies = [ "ahash 0.8.12", "arrow-array", @@ -472,9 +451,8 @@ dependencies = [ [[package]] name = "arrow-string" -version = "57.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf35e8ef49dcf0c5f6d175edee6b8af7b45611805333129c541a8b89a0fc0534" +version = "57.2.0" +source = "git+https://github.com/jecsand838/arrow-rs?branch=avro-reader-projection#2b527656293781bbea03014c6e55ff5d4559371c" dependencies = [ "arrow-array", "arrow-buffer", @@ -1029,7 +1007,6 @@ dependencies = [ "num-bigint", "num-integer", "num-traits", - "serde", ] [[package]] @@ -1184,31 +1161,6 @@ dependencies = [ "serde_with", ] -[[package]] -name = "bon" -version = "3.8.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ebeb9aaf9329dff6ceb65c689ca3db33dbf15f324909c60e4e5eef5701ce31b1" -dependencies = [ - "bon-macros", - "rustversion", -] - -[[package]] -name = "bon-macros" -version = "3.8.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77e9d642a7e3a318e37c2c9427b5a6a48aa1ad55dcd986f3034ab2239045a645" -dependencies = [ - "darling", - "ident_case", - "prettyplease", - "proc-macro2", - "quote", - "rustversion", - "syn 2.0.111", -] - [[package]] name = "borsh" version = "1.5.7" @@ -1640,6 +1592,21 @@ dependencies = [ "libc", ] +[[package]] +name = "crc" +version = "3.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5eb8a2a1cd12ab0d987a5d5e825195d372001a4094a0376319d5a0ad71c1ba0d" +dependencies = [ + "crc-catalog", +] + +[[package]] +name = "crc-catalog" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5" + [[package]] name = "crc32fast" version = "1.5.0" @@ -2007,7 +1974,6 @@ name = "datafusion-common" version = "51.0.0" dependencies = [ "ahash 0.8.12", - "apache-avro", "arrow", "arrow-ipc", "chrono", @@ -2099,19 +2065,16 @@ dependencies = [ name = "datafusion-datasource-avro" version = "51.0.0" dependencies = [ - "apache-avro", "arrow", + "arrow-avro", "async-trait", "bytes", "datafusion-common", "datafusion-datasource", - "datafusion-physical-expr-common", "datafusion-physical-plan", "datafusion-session", "futures", - "num-traits", "object_store", - "serde_json", ] [[package]] @@ -4260,7 +4223,6 @@ checksum = "a5e44f723f1133c9deac646763579fdb3ac745e418f2a7af9cd0c431da1f20b9" dependencies = [ "num-integer", "num-traits", - "serde", ] [[package]] @@ -4461,14 +4423,12 @@ dependencies = [ [[package]] name = "parquet" -version = "57.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "be3e4f6d320dd92bfa7d612e265d7d08bba0a240bab86af3425e1d255a511d89" +version = "57.2.0" +source = "git+https://github.com/jecsand838/arrow-rs?branch=avro-reader-projection#2b527656293781bbea03014c6e55ff5d4559371c" dependencies = [ "ahash 0.8.12", "arrow-array", "arrow-buffer", - "arrow-cast", "arrow-data", "arrow-ipc", "arrow-schema", @@ -4930,12 +4890,6 @@ dependencies = [ "syn 1.0.109", ] -[[package]] -name = "quad-rand" -version = "0.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a651516ddc9168ebd67b24afd085a718be02f8858fe406591b013d101ce2f40" - [[package]] name = "quick-xml" version = "0.38.3" @@ -5640,16 +5594,6 @@ 
dependencies = [ "serde_derive", ] -[[package]] -name = "serde_bytes" -version = "0.11.19" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a5d440709e79d88e51ac01c4b72fc6cb7314017bb7da9eeff678aa94c10e3ea8" -dependencies = [ - "serde", - "serde_core", -] - [[package]] name = "serde_core" version = "1.0.228" @@ -6849,7 +6793,6 @@ checksum = "e2e054861b4bd027cd373e18e8d8d8e6548085000e41290d95ce0c373a654b4a" dependencies = [ "getrandom 0.3.4", "js-sys", - "serde_core", "wasm-bindgen", ] diff --git a/Cargo.toml b/Cargo.toml index c9afc5fcb54be..caee4499d1729 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -90,20 +90,26 @@ version = "51.0.0" ahash = { version = "0.8", default-features = false, features = [ "runtime-rng", ] } -apache-avro = { version = "0.21", default-features = false } -arrow = { version = "57.1.0", features = [ +arrow = { git = "https://github.com/jecsand838/arrow-rs", branch = "avro-reader-projection", features = [ "prettyprint", "chrono-tz", -] } -arrow-buffer = { version = "57.1.0", default-features = false } -arrow-flight = { version = "57.1.0", features = [ +] } # fixme +arrow-avro = { git = "https://github.com/jecsand838/arrow-rs", branch = "avro-reader-projection", default-features = false, features = [ + "deflate", + "snappy", + "zstd", + "bzip2", + "xz", +] }# fixme +arrow-buffer = { git = "https://github.com/jecsand838/arrow-rs", branch = "avro-reader-projection", default-features = false }# fixme +arrow-flight = { git = "https://github.com/jecsand838/arrow-rs", branch = "avro-reader-projection", features = [# fixme "flight-sql-experimental", ] } -arrow-ipc = { version = "57.1.0", default-features = false, features = [ +arrow-ipc = { git = "https://github.com/jecsand838/arrow-rs", branch = "avro-reader-projection", default-features = false, features = [# fixme "lz4", ] } -arrow-ord = { version = "57.1.0", default-features = false } -arrow-schema = { version = "57.1.0", default-features = false } +arrow-ord = { git = "https://github.com/jecsand838/arrow-rs", branch = "avro-reader-projection", default-features = false }# fixme +arrow-schema = { git = "https://github.com/jecsand838/arrow-rs", branch = "avro-reader-projection", default-features = false }# fixme async-trait = "0.1.89" bigdecimal = "0.4.8" bytes = "1.11" @@ -166,7 +172,7 @@ log = "^0.4" num-traits = { version = "0.2" } object_store = { version = "0.12.4", default-features = false } parking_lot = "0.12" -parquet = { version = "57.1.0", default-features = false, features = [ +parquet = { git = "https://github.com/jecsand838/arrow-rs", branch = "avro-reader-projection", default-features = false, features = [ # fixme "arrow", "async", "object_store", diff --git a/datafusion/common/Cargo.toml b/datafusion/common/Cargo.toml index 5c36e33c7263d..16bfaff88be40 100644 --- a/datafusion/common/Cargo.toml +++ b/datafusion/common/Cargo.toml @@ -41,7 +41,6 @@ workspace = true name = "datafusion_common" [features] -avro = ["apache-avro"] backtrace = [] parquet_encryption = [ "parquet", @@ -55,12 +54,6 @@ sql = ["sqlparser"] [dependencies] ahash = { workspace = true } -apache-avro = { workspace = true, features = [ - "bzip", - "snappy", - "xz", - "zstandard", -], optional = true } arrow = { workspace = true } arrow-ipc = { workspace = true } chrono = { workspace = true } diff --git a/datafusion/common/src/error.rs b/datafusion/common/src/error.rs index 4f681896dfc66..224e8831f3477 100644 --- a/datafusion/common/src/error.rs +++ b/datafusion/common/src/error.rs @@ -48,8 +48,6 @@ use std::sync::Arc; 
diff --git a/datafusion/common/src/error.rs b/datafusion/common/src/error.rs
index 4f681896dfc66..224e8831f3477 100644
--- a/datafusion/common/src/error.rs
+++ b/datafusion/common/src/error.rs
@@ -48,8 +48,6 @@ use std::sync::Arc;
 use crate::utils::datafusion_strsim::normalized_levenshtein;
 use crate::utils::quote_identifier;
 use crate::{Column, DFSchema, Diagnostic, TableReference};
-#[cfg(feature = "avro")]
-use apache_avro::Error as AvroError;
 use arrow::error::ArrowError;
 #[cfg(feature = "parquet")]
 use parquet::errors::ParquetError;
@@ -76,9 +74,6 @@ pub enum DataFusionError {
     /// Error when reading / writing Parquet data.
     #[cfg(feature = "parquet")]
     ParquetError(Box<ParquetError>),
-    /// Error when reading Avro data.
-    #[cfg(feature = "avro")]
-    AvroError(Box<AvroError>),
     /// Error when reading / writing to / from an object_store (e.g. S3 or LocalFile)
     #[cfg(feature = "object_store")]
     ObjectStore(Box<object_store::Error>),
@@ -332,13 +327,6 @@ impl From<ParquetError> for DataFusionError {
     }
 }
 
-#[cfg(feature = "avro")]
-impl From<AvroError> for DataFusionError {
-    fn from(e: AvroError) -> Self {
-        DataFusionError::AvroError(Box::new(e))
-    }
-}
-
 #[cfg(feature = "object_store")]
 impl From<object_store::Error> for DataFusionError {
     fn from(e: object_store::Error) -> Self {
@@ -389,8 +377,6 @@ impl Error for DataFusionError {
             DataFusionError::ArrowError(e, _) => Some(e.as_ref()),
             #[cfg(feature = "parquet")]
             DataFusionError::ParquetError(e) => Some(e.as_ref()),
-            #[cfg(feature = "avro")]
-            DataFusionError::AvroError(e) => Some(e.as_ref()),
             #[cfg(feature = "object_store")]
             DataFusionError::ObjectStore(e) => Some(e.as_ref()),
             DataFusionError::IoError(e) => Some(e),
@@ -520,8 +506,6 @@
             DataFusionError::ArrowError(_, _) => "Arrow error: ",
             #[cfg(feature = "parquet")]
             DataFusionError::ParquetError(_) => "Parquet error: ",
-            #[cfg(feature = "avro")]
-            DataFusionError::AvroError(_) => "Avro error: ",
             #[cfg(feature = "object_store")]
             DataFusionError::ObjectStore(_) => "Object Store error: ",
             DataFusionError::IoError(_) => "IO error: ",
@@ -561,8 +545,6 @@
             }
             #[cfg(feature = "parquet")]
             DataFusionError::ParquetError(ref desc) => Cow::Owned(desc.to_string()),
-            #[cfg(feature = "avro")]
-            DataFusionError::AvroError(ref desc) => Cow::Owned(desc.to_string()),
             DataFusionError::IoError(ref desc) => Cow::Owned(desc.to_string()),
             #[cfg(feature = "sql")]
             DataFusionError::SQL(ref desc, ref backtrace) => {
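Editor's note (not part of the upstream patch): with the `AvroError` variant and its `From<AvroError>` impl removed above, Avro decode failures from arrow-avro now surface through the existing `ArrowError` path. A minimal sketch of the fallout for downstream matchers, using only the variants visible in this hunk; the `classify` helper is illustrative, not from this PR:

use datafusion_common::DataFusionError;

// Code that previously matched DataFusionError::AvroError now sees
// Avro decode failures arrive through the ArrowError variant.
fn classify(err: &DataFusionError) -> &'static str {
    match err {
        DataFusionError::ArrowError(_, _) => "arrow (includes avro decode errors)",
        DataFusionError::IoError(_) => "io",
        _ => "other",
    }
}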
array_expressions = ["nested_expressions"] # Used to enable the avro format -avro = ["datafusion-common/avro", "datafusion-datasource-avro"] +avro = ["datafusion-datasource-avro"] backtrace = ["datafusion-common/backtrace"] compression = [ "liblzma", diff --git a/datafusion/core/src/datasource/file_format/avro.rs b/datafusion/core/src/datasource/file_format/avro.rs index b287b1ef3a4e9..31a84f81fa5ef 100644 --- a/datafusion/core/src/datasource/file_format/avro.rs +++ b/datafusion/core/src/datasource/file_format/avro.rs @@ -109,7 +109,7 @@ mod tests { "double_col: Float64", "date_string_col: Binary", "string_col: Binary", - "timestamp_col: Timestamp(Microsecond, None)", + "timestamp_col: Timestamp(Microsecond, Some(\"+00:00\"))", ], x ); @@ -118,18 +118,18 @@ mod tests { assert_eq!(batches.len(), 1); assert_snapshot!(batches_to_string(&batches),@r###" - +----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+ - | id | bool_col | tinyint_col | smallint_col | int_col | bigint_col | float_col | double_col | date_string_col | string_col | timestamp_col | - +----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+ - | 4 | true | 0 | 0 | 0 | 0 | 0.0 | 0.0 | 30332f30312f3039 | 30 | 2009-03-01T00:00:00 | - | 5 | false | 1 | 1 | 1 | 10 | 1.1 | 10.1 | 30332f30312f3039 | 31 | 2009-03-01T00:01:00 | - | 6 | true | 0 | 0 | 0 | 0 | 0.0 | 0.0 | 30342f30312f3039 | 30 | 2009-04-01T00:00:00 | - | 7 | false | 1 | 1 | 1 | 10 | 1.1 | 10.1 | 30342f30312f3039 | 31 | 2009-04-01T00:01:00 | - | 2 | true | 0 | 0 | 0 | 0 | 0.0 | 0.0 | 30322f30312f3039 | 30 | 2009-02-01T00:00:00 | - | 3 | false | 1 | 1 | 1 | 10 | 1.1 | 10.1 | 30322f30312f3039 | 31 | 2009-02-01T00:01:00 | - | 0 | true | 0 | 0 | 0 | 0 | 0.0 | 0.0 | 30312f30312f3039 | 30 | 2009-01-01T00:00:00 | - | 1 | false | 1 | 1 | 1 | 10 | 1.1 | 10.1 | 30312f30312f3039 | 31 | 2009-01-01T00:01:00 | - +----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+ + +----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+----------------------+ + | id | bool_col | tinyint_col | smallint_col | int_col | bigint_col | float_col | double_col | date_string_col | string_col | timestamp_col | + +----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+----------------------+ + | 4 | true | 0 | 0 | 0 | 0 | 0.0 | 0.0 | 30332f30312f3039 | 30 | 2009-03-01T00:00:00Z | + | 5 | false | 1 | 1 | 1 | 10 | 1.1 | 10.1 | 30332f30312f3039 | 31 | 2009-03-01T00:01:00Z | + | 6 | true | 0 | 0 | 0 | 0 | 0.0 | 0.0 | 30342f30312f3039 | 30 | 2009-04-01T00:00:00Z | + | 7 | false | 1 | 1 | 1 | 10 | 1.1 | 10.1 | 30342f30312f3039 | 31 | 2009-04-01T00:01:00Z | + | 2 | true | 0 | 0 | 0 | 0 | 0.0 | 0.0 | 30322f30312f3039 | 30 | 2009-02-01T00:00:00Z | + | 3 | false | 1 | 1 | 1 | 10 | 1.1 | 10.1 | 30322f30312f3039 | 31 | 2009-02-01T00:01:00Z | + | 0 | true | 0 | 0 | 0 | 0 | 0.0 | 0.0 | 30312f30312f3039 | 30 | 2009-01-01T00:00:00Z | + | 1 | false | 1 | 1 | 1 | 10 | 1.1 | 10.1 | 30312f30312f3039 | 31 | 2009-01-01T00:01:00Z | + +----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+----------------------+ "###); Ok(()) } diff --git 
diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs
index e83934a8e281d..09351d9117e63 100644
--- a/datafusion/core/src/lib.rs
+++ b/datafusion/core/src/lib.rs
@@ -787,7 +787,7 @@ pub use object_store;
 pub use parquet;
 
 #[cfg(feature = "avro")]
-pub use datafusion_datasource_avro::apache_avro;
+pub use datafusion_datasource_avro::arrow_avro;
 
 // re-export DataFusion sub-crates at the top level. Use `pub use *`
 // so that the contents of the subcrates appears in rustdocs
diff --git a/datafusion/datasource-avro/Cargo.toml b/datafusion/datasource-avro/Cargo.toml
index c9299aeb101da..28439c46addda 100644
--- a/datafusion/datasource-avro/Cargo.toml
+++ b/datafusion/datasource-avro/Cargo.toml
@@ -31,21 +31,18 @@ version.workspace = true
 all-features = true
 
 [dependencies]
-apache-avro = { workspace = true }
 arrow = { workspace = true }
+arrow-avro = { workspace = true }
 async-trait = { workspace = true }
 bytes = { workspace = true }
-datafusion-common = { workspace = true, features = ["object_store", "avro"] }
+datafusion-common = { workspace = true, features = ["object_store"] }
 datafusion-datasource = { workspace = true }
-datafusion-physical-expr-common = { workspace = true }
 datafusion-physical-plan = { workspace = true }
 datafusion-session = { workspace = true }
 futures = { workspace = true }
-num-traits = { workspace = true }
 object_store = { workspace = true }
 
 [dev-dependencies]
-serde_json = { workspace = true }
 
 # Note: add additional linter rules in lib.rs.
 # Rust does not support workspace + new linter rules in subcrates yet
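Editor's note (not part of the upstream patch): the roughly 1,800-line hand-rolled apache-avro-to-Arrow reader deleted below is what arrow-avro replaces. A minimal sketch of the replacement read path, assuming arrow-avro exposes a `ReaderBuilder` along these lines; the method names and batch size are my assumption, not verbatim from this PR:

use std::fs::File;
use std::io::BufReader;
use arrow_avro::reader::ReaderBuilder;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Decode an Avro Object Container File straight into Arrow RecordBatches.
    let file = BufReader::new(File::open("alltypes_plain.avro")?);
    let reader = ReaderBuilder::new().with_batch_size(1024).build(file)?;
    for batch in reader {
        println!("rows: {}", batch?.num_rows());
    }
    Ok(())
}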
diff --git a/datafusion/datasource-avro/src/avro_to_arrow/arrow_array_reader.rs b/datafusion/datasource-avro/src/avro_to_arrow/arrow_array_reader.rs
deleted file mode 100644
index ea676a7611db9..0000000000000
--- a/datafusion/datasource-avro/src/avro_to_arrow/arrow_array_reader.rs
+++ /dev/null
@@ -1,1807 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! Avro to Arrow array readers
-
-use apache_avro::schema::RecordSchema;
-use apache_avro::{
-    Error as AvroError, Reader as AvroReader,
-    error::Details as AvroErrorDetails,
-    schema::{Schema as AvroSchema, SchemaKind},
-    types::Value,
-};
-use arrow::array::{
-    Array, ArrayBuilder, ArrayData, ArrayDataBuilder, ArrayRef, BooleanBuilder,
-    LargeStringArray, ListBuilder, NullArray, OffsetSizeTrait, PrimitiveArray,
-    StringArray, StringBuilder, StringDictionaryBuilder, make_array,
-};
-use arrow::array::{BinaryArray, FixedSizeBinaryArray, GenericListArray};
-use arrow::buffer::{Buffer, MutableBuffer};
-use arrow::datatypes::{
-    ArrowDictionaryKeyType, ArrowNumericType, ArrowPrimitiveType, DataType, Date32Type,
-    Date64Type, Field, Float32Type, Float64Type, Int8Type, Int16Type, Int32Type,
-    Int64Type, Time32MillisecondType, Time32SecondType, Time64MicrosecondType,
-    Time64NanosecondType, TimeUnit, TimestampMicrosecondType, TimestampMillisecondType,
-    TimestampNanosecondType, TimestampSecondType, UInt8Type, UInt16Type, UInt32Type,
-    UInt64Type,
-};
-use arrow::datatypes::{Fields, SchemaRef};
-use arrow::error::ArrowError;
-use arrow::error::ArrowError::SchemaError;
-use arrow::error::Result as ArrowResult;
-use arrow::record_batch::RecordBatch;
-use arrow::util::bit_util;
-use datafusion_common::arrow_err;
-use datafusion_common::error::Result;
-use num_traits::NumCast;
-use std::collections::BTreeMap;
-use std::io::Read;
-use std::sync::Arc;
-
-type RecordSlice<'a> = &'a [&'a Vec<(String, Value)>];
-
-pub struct AvroArrowArrayReader<'a, R: Read> {
-    reader: AvroReader<'a, R>,
-    schema: SchemaRef,
-    schema_lookup: BTreeMap<String, usize>,
-}
-
-impl<R: Read> AvroArrowArrayReader<'_, R> {
-    pub fn try_new(reader: R, schema: SchemaRef) -> Result<Self> {
-        let reader = AvroReader::new(reader)?;
-        let writer_schema = reader.writer_schema().clone();
-        let schema_lookup = Self::schema_lookup(writer_schema)?;
-        Ok(Self {
-            reader,
-            schema,
-            schema_lookup,
-        })
-    }
-
-    pub fn schema_lookup(schema: AvroSchema) -> Result<BTreeMap<String, usize>> {
-        match schema {
-            AvroSchema::Record(RecordSchema {
-                fields, mut lookup, ..
-            }) => {
-                for field in fields {
-                    Self::child_schema_lookup(&field.name, &field.schema, &mut lookup)?;
-                }
-                Ok(lookup)
-            }
-            _ => arrow_err!(SchemaError(
-                "expected avro schema to be a record".to_string(),
-            )),
-        }
-    }
-
-    fn child_schema_lookup<'b>(
-        parent_field_name: &str,
-        schema: &AvroSchema,
-        schema_lookup: &'b mut BTreeMap<String, usize>,
-    ) -> Result<&'b BTreeMap<String, usize>> {
-        match schema {
-            AvroSchema::Union(us) => {
-                let has_nullable = us
-                    .find_schema_with_known_schemata::<AvroSchema>(
-                        &Value::Null,
-                        None,
-                        &None,
-                    )
-                    .is_some();
-                let sub_schemas = us.variants();
-                if has_nullable
-                    && sub_schemas.len() == 2
-                    && let Some(sub_schema) =
-                        sub_schemas.iter().find(|&s| !matches!(s, AvroSchema::Null))
-                {
-                    Self::child_schema_lookup(
-                        parent_field_name,
-                        sub_schema,
-                        schema_lookup,
-                    )?;
-                }
-            }
}) => { - lookup.iter().for_each(|(field_name, pos)| { - schema_lookup - .insert(format!("{parent_field_name}.{field_name}"), *pos); - }); - - for field in fields { - let sub_parent_field_name = - format!("{}.{}", parent_field_name, field.name); - Self::child_schema_lookup( - &sub_parent_field_name, - &field.schema, - schema_lookup, - )?; - } - } - AvroSchema::Array(schema) => { - Self::child_schema_lookup( - parent_field_name, - &schema.items, - schema_lookup, - )?; - } - _ => (), - } - Ok(schema_lookup) - } - - /// Read the next batch of records - pub fn next_batch(&mut self, batch_size: usize) -> Option> { - let rows_result = self - .reader - .by_ref() - .take(batch_size) - .map(|value| match value { - Ok(Value::Record(v)) => Ok(v), - Err(e) => Err(ArrowError::ParseError(format!( - "Failed to parse avro value: {e}" - ))), - other => Err(ArrowError::ParseError(format!( - "Row needs to be of type object, got: {other:?}" - ))), - }) - .collect::>>>(); - - let rows = match rows_result { - // Return error early - Err(e) => return Some(Err(e)), - // No rows: return None early - Ok(rows) if rows.is_empty() => return None, - Ok(rows) => rows, - }; - - let rows = rows.iter().collect::>>(); - let arrays = self.build_struct_array(&rows, "", self.schema.fields()); - - Some(arrays.and_then(|arr| RecordBatch::try_new(Arc::clone(&self.schema), arr))) - } - - fn build_boolean_array(&self, rows: RecordSlice, col_name: &str) -> ArrayRef { - let mut builder = BooleanBuilder::with_capacity(rows.len()); - for row in rows { - if let Some(value) = self.field_lookup(col_name, row) { - if let Some(boolean) = resolve_boolean(value) { - builder.append_value(boolean) - } else { - builder.append_null(); - } - } else { - builder.append_null(); - } - } - Arc::new(builder.finish()) - } - - fn build_primitive_array(&self, rows: RecordSlice, col_name: &str) -> ArrayRef - where - T: ArrowNumericType + Resolver, - T::Native: NumCast, - { - Arc::new( - rows.iter() - .map(|row| { - self.field_lookup(col_name, row) - .and_then(|value| resolve_item::(value)) - }) - .collect::>(), - ) - } - - #[inline(always)] - fn build_string_dictionary_builder( - &self, - row_len: usize, - ) -> StringDictionaryBuilder - where - T: ArrowPrimitiveType + ArrowDictionaryKeyType, - { - StringDictionaryBuilder::with_capacity(row_len, row_len, row_len) - } - - fn build_wrapped_list_array( - &self, - rows: RecordSlice, - col_name: &str, - key_type: &DataType, - ) -> ArrowResult { - match *key_type { - DataType::Int8 => { - let dtype = DataType::Dictionary( - Box::new(DataType::Int8), - Box::new(DataType::Utf8), - ); - self.list_array_string_array_builder::(&dtype, col_name, rows) - } - DataType::Int16 => { - let dtype = DataType::Dictionary( - Box::new(DataType::Int16), - Box::new(DataType::Utf8), - ); - self.list_array_string_array_builder::(&dtype, col_name, rows) - } - DataType::Int32 => { - let dtype = DataType::Dictionary( - Box::new(DataType::Int32), - Box::new(DataType::Utf8), - ); - self.list_array_string_array_builder::(&dtype, col_name, rows) - } - DataType::Int64 => { - let dtype = DataType::Dictionary( - Box::new(DataType::Int64), - Box::new(DataType::Utf8), - ); - self.list_array_string_array_builder::(&dtype, col_name, rows) - } - DataType::UInt8 => { - let dtype = DataType::Dictionary( - Box::new(DataType::UInt8), - Box::new(DataType::Utf8), - ); - self.list_array_string_array_builder::(&dtype, col_name, rows) - } - DataType::UInt16 => { - let dtype = DataType::Dictionary( - Box::new(DataType::UInt16), - Box::new(DataType::Utf8), - 
-                );
-                self.list_array_string_array_builder::<UInt16Type>(&dtype, col_name, rows)
-            }
-            DataType::UInt32 => {
-                let dtype = DataType::Dictionary(
-                    Box::new(DataType::UInt32),
-                    Box::new(DataType::Utf8),
-                );
-                self.list_array_string_array_builder::<UInt32Type>(&dtype, col_name, rows)
-            }
-            DataType::UInt64 => {
-                let dtype = DataType::Dictionary(
-                    Box::new(DataType::UInt64),
-                    Box::new(DataType::Utf8),
-                );
-                self.list_array_string_array_builder::<UInt64Type>(&dtype, col_name, rows)
-            }
-            ref e => Err(SchemaError(format!(
-                "Data type is currently not supported for dictionaries in list : {e}"
-            ))),
-        }
-    }
-
-    #[inline(always)]
-    fn list_array_string_array_builder<D>(
-        &self,
-        data_type: &DataType,
-        col_name: &str,
-        rows: RecordSlice,
-    ) -> ArrowResult<ArrayRef>
-    where
-        D: ArrowPrimitiveType + ArrowDictionaryKeyType,
-    {
-        let mut builder: Box<dyn ArrayBuilder> = match data_type {
-            DataType::Utf8 => {
-                let values_builder = StringBuilder::with_capacity(rows.len(), 5);
-                Box::new(ListBuilder::new(values_builder))
-            }
-            DataType::Dictionary(_, _) => {
-                let values_builder =
-                    self.build_string_dictionary_builder::<D>(rows.len() * 5);
-                Box::new(ListBuilder::new(values_builder))
-            }
-            e => {
-                return Err(SchemaError(format!(
-                    "Nested list data builder type is not supported: {e}"
-                )));
-            }
-        };
-
-        for row in rows {
-            if let Some(value) = self.field_lookup(col_name, row) {
-                let value = maybe_resolve_union(value);
-                // value can be an array or a scalar
-                let vals: Vec<Option<String>> = if let Value::String(v) = value {
-                    vec![Some(v.to_string())]
-                } else if let Value::Array(n) = value {
-                    n.iter()
-                        .map(resolve_string)
-                        .collect::<ArrowResult<Vec<Option<String>>>>()?
-                        .into_iter()
-                        .collect::<Vec<Option<String>>>()
-                } else if let Value::Null = value {
-                    vec![None]
-                } else if !matches!(value, Value::Record(_)) {
-                    vec![resolve_string(value)?]
-                } else {
-                    return Err(SchemaError(
-                        "Only scalars are currently supported in Avro arrays".to_string(),
-                    ));
-                };
-
-                // TODO: ARROW-10335: APIs of dictionary arrays and others are different. Unify
-                // them.
-                match data_type {
-                    DataType::Utf8 => {
-                        let builder = builder
-                            .as_any_mut()
-                            .downcast_mut::<ListBuilder<StringBuilder>>()
-                            .ok_or_else(||SchemaError(
-                                "Cast failed for ListBuilder<StringBuilder> during nested data parsing".to_string(),
-                            ))?;
-                        for val in vals {
-                            if let Some(v) = val {
-                                builder.values().append_value(&v)
-                            } else {
-                                builder.values().append_null()
-                            };
-                        }
-
-                        // Append to the list
-                        builder.append(true);
-                    }
-                    DataType::Dictionary(_, _) => {
-                        let builder = builder.as_any_mut().downcast_mut::<ListBuilder<StringDictionaryBuilder<D>>>().ok_or_else(||SchemaError(
-                            "Cast failed for ListBuilder<StringDictionaryBuilder> during nested data parsing".to_string(),
-                        ))?;
-                        for val in vals {
-                            if let Some(v) = val {
-                                let _ = builder.values().append(&v)?;
-                            } else {
-                                builder.values().append_null()
-                            };
-                        }
-
-                        // Append to the list
-                        builder.append(true);
-                    }
-                    e => {
-                        return Err(SchemaError(format!(
-                            "Nested list data builder type is not supported: {e}"
-                        )));
-                    }
-                }
-            }
-        }
-
-        Ok(builder.finish() as ArrayRef)
-    }
-
-    #[inline(always)]
-    fn build_dictionary_array<T>(
-        &self,
-        rows: RecordSlice,
-        col_name: &str,
-    ) -> ArrowResult<ArrayRef>
-    where
-        T::Native: NumCast,
-        T: ArrowPrimitiveType + ArrowDictionaryKeyType,
-    {
-        let mut builder: StringDictionaryBuilder<T> =
-            self.build_string_dictionary_builder(rows.len());
-        for row in rows {
-            if let Some(value) = self.field_lookup(col_name, row) {
-                if let Ok(Some(str_v)) = resolve_string(value) {
-                    builder.append(str_v).map(drop)?
-                } else {
-                    builder.append_null()
-                }
-            } else {
-                builder.append_null()
-            }
-        }
-        Ok(Arc::new(builder.finish()) as ArrayRef)
-    }
-
-    #[inline(always)]
-    fn build_string_dictionary_array(
-        &self,
-        rows: RecordSlice,
-        col_name: &str,
-        key_type: &DataType,
-        value_type: &DataType,
-    ) -> ArrowResult<ArrayRef> {
-        if let DataType::Utf8 = *value_type {
-            match *key_type {
-                DataType::Int8 => self.build_dictionary_array::<Int8Type>(rows, col_name),
-                DataType::Int16 => {
-                    self.build_dictionary_array::<Int16Type>(rows, col_name)
-                }
-                DataType::Int32 => {
-                    self.build_dictionary_array::<Int32Type>(rows, col_name)
-                }
-                DataType::Int64 => {
-                    self.build_dictionary_array::<Int64Type>(rows, col_name)
-                }
-                DataType::UInt8 => {
-                    self.build_dictionary_array::<UInt8Type>(rows, col_name)
-                }
-                DataType::UInt16 => {
-                    self.build_dictionary_array::<UInt16Type>(rows, col_name)
-                }
-                DataType::UInt32 => {
-                    self.build_dictionary_array::<UInt32Type>(rows, col_name)
-                }
-                DataType::UInt64 => {
-                    self.build_dictionary_array::<UInt64Type>(rows, col_name)
-                }
-                _ => Err(SchemaError("unsupported dictionary key type".to_string())),
-            }
-        } else {
-            Err(SchemaError(
-                "dictionary types other than UTF-8 not yet supported".to_string(),
-            ))
-        }
-    }
-
-    /// Build a nested GenericListArray from a list of unnested `Value`s
-    fn build_nested_list_array<OffsetSize: OffsetSizeTrait>(
-        &self,
-        parent_field_name: &str,
-        rows: &[&Value],
-        list_field: &Field,
-    ) -> ArrowResult<ArrayRef> {
-        // build list offsets
-        let mut cur_offset = OffsetSize::zero();
-        let list_len = rows.len();
-        let num_list_bytes = bit_util::ceil(list_len, 8);
-        let mut offsets = Vec::with_capacity(list_len + 1);
-        let mut list_nulls = MutableBuffer::from_len_zeroed(num_list_bytes);
-        offsets.push(cur_offset);
-        rows.iter().enumerate().for_each(|(i, v)| {
-            // TODO: unboxing Union(Array(Union(...))) should probably be done earlier
-            let v = maybe_resolve_union(v);
-            if let Value::Array(a) = v {
-                cur_offset += OffsetSize::from_usize(a.len()).unwrap();
-                bit_util::set_bit(&mut list_nulls, i);
-            } else if let Value::Null = v {
-                // value is null, not incremented
-            } else {
-                cur_offset += OffsetSize::one();
-            }
-            offsets.push(cur_offset);
-        });
-        let valid_len = cur_offset.to_usize().unwrap();
-        let array_data = match list_field.data_type() {
-            DataType::Null => NullArray::new(valid_len).into_data(),
-            DataType::Boolean => {
-                let num_bytes = bit_util::ceil(valid_len, 8);
-                let mut bool_values = MutableBuffer::from_len_zeroed(num_bytes);
-                let mut bool_nulls =
-                    MutableBuffer::new(num_bytes).with_bitset(num_bytes, true);
-                let mut curr_index = 0;
-                rows.iter().for_each(|v| {
-                    if let Value::Array(vs) = v {
-                        vs.iter().for_each(|value| {
-                            if let Value::Boolean(child) = value {
-                                // if valid boolean, append value
-                                if *child {
-                                    bit_util::set_bit(&mut bool_values, curr_index);
-                                }
-                            } else {
-                                // null slot
-                                bit_util::unset_bit(&mut bool_nulls, curr_index);
-                            }
-                            curr_index += 1;
-                        });
-                    }
-                });
-                ArrayData::builder(list_field.data_type().clone())
-                    .len(valid_len)
-                    .add_buffer(bool_values.into())
-                    .null_bit_buffer(Some(bool_nulls.into()))
-                    .build()
-                    .unwrap()
-            }
-            DataType::Int8 => self.read_primitive_list_values::<Int8Type>(rows),
-            DataType::Int16 => self.read_primitive_list_values::<Int16Type>(rows),
-            DataType::Int32 => self.read_primitive_list_values::<Int32Type>(rows),
-            DataType::Int64 => self.read_primitive_list_values::<Int64Type>(rows),
-            DataType::UInt8 => self.read_primitive_list_values::<UInt8Type>(rows),
-            DataType::UInt16 => self.read_primitive_list_values::<UInt16Type>(rows),
-            DataType::UInt32 => self.read_primitive_list_values::<UInt32Type>(rows),
-            DataType::UInt64 => self.read_primitive_list_values::<UInt64Type>(rows),
{ - return Err(SchemaError("Float16 not supported".to_string())); - } - DataType::Float32 => self.read_primitive_list_values::(rows), - DataType::Float64 => self.read_primitive_list_values::(rows), - DataType::Timestamp(_, _) - | DataType::Date32 - | DataType::Date64 - | DataType::Time32(_) - | DataType::Time64(_) => { - return Err(SchemaError( - "Temporal types are not yet supported, see ARROW-4803".to_string(), - )); - } - DataType::Utf8 => flatten_string_values(rows) - .into_iter() - .collect::() - .into_data(), - DataType::LargeUtf8 => flatten_string_values(rows) - .into_iter() - .collect::() - .into_data(), - DataType::List(field) => { - let child = self.build_nested_list_array::( - parent_field_name, - &flatten_values(rows), - field, - )?; - child.to_data() - } - DataType::LargeList(field) => { - let child = self.build_nested_list_array::( - parent_field_name, - &flatten_values(rows), - field, - )?; - child.to_data() - } - DataType::Struct(fields) => { - // extract list values, with non-lists converted to Value::Null - let array_item_count = rows - .iter() - .map(|row| match maybe_resolve_union(row) { - Value::Array(values) => values.len(), - _ => 1, - }) - .sum(); - let num_bytes = bit_util::ceil(array_item_count, 8); - let mut null_buffer = MutableBuffer::from_len_zeroed(num_bytes); - let mut struct_index = 0; - let null_struct_array = vec![("null".to_string(), Value::Null)]; - let rows: Vec<&Vec<(String, Value)>> = rows - .iter() - .map(|v| maybe_resolve_union(v)) - .flat_map(|row| { - if let Value::Array(values) = row { - values - .iter() - .map(maybe_resolve_union) - .map(|v| match v { - Value::Record(record) => { - bit_util::set_bit(&mut null_buffer, struct_index); - struct_index += 1; - record - } - Value::Null => { - struct_index += 1; - &null_struct_array - } - other => panic!("expected Record, got {other:?}"), - }) - .collect::>>() - } else { - struct_index += 1; - vec![&null_struct_array] - } - }) - .collect(); - - let arrays = self.build_struct_array(&rows, parent_field_name, fields)?; - let data_type = DataType::Struct(fields.clone()); - ArrayDataBuilder::new(data_type) - .len(rows.len()) - .null_bit_buffer(Some(null_buffer.into())) - .child_data(arrays.into_iter().map(|a| a.to_data()).collect()) - .build() - .unwrap() - } - datatype => { - return Err(SchemaError(format!( - "Nested list of {datatype} not supported" - ))); - } - }; - // build list - let list_data = ArrayData::builder(DataType::List(Arc::new(list_field.clone()))) - .len(list_len) - .add_buffer(Buffer::from_slice_ref(&offsets)) - .add_child_data(array_data) - .null_bit_buffer(Some(list_nulls.into())) - .build() - .unwrap(); - Ok(Arc::new(GenericListArray::::from(list_data))) - } - - /// Builds the child values of a `StructArray`, falling short of constructing the StructArray. - /// The function does not construct the StructArray as some callers would want the child arrays. - /// - /// *Note*: The function is recursive, and will read nested structs. 
-    fn build_struct_array(
-        &self,
-        rows: RecordSlice,
-        parent_field_name: &str,
-        struct_fields: &Fields,
-    ) -> ArrowResult<Vec<ArrayRef>> {
-        let arrays: ArrowResult<Vec<ArrayRef>> = struct_fields
-            .iter()
-            .map(|field| {
-                let field_path = if parent_field_name.is_empty() {
-                    field.name().to_string()
-                } else {
-                    format!("{}.{}", parent_field_name, field.name())
-                };
-                let arr = match field.data_type() {
-                    DataType::Null => Arc::new(NullArray::new(rows.len())) as ArrayRef,
-                    DataType::Boolean => self.build_boolean_array(rows, &field_path),
-                    DataType::Float64 => {
-                        self.build_primitive_array::<Float64Type>(rows, &field_path)
-                    }
-                    DataType::Float32 => {
-                        self.build_primitive_array::<Float32Type>(rows, &field_path)
-                    }
-                    DataType::Int64 => {
-                        self.build_primitive_array::<Int64Type>(rows, &field_path)
-                    }
-                    DataType::Int32 => {
-                        self.build_primitive_array::<Int32Type>(rows, &field_path)
-                    }
-                    DataType::Int16 => {
-                        self.build_primitive_array::<Int16Type>(rows, &field_path)
-                    }
-                    DataType::Int8 => {
-                        self.build_primitive_array::<Int8Type>(rows, &field_path)
-                    }
-                    DataType::UInt64 => {
-                        self.build_primitive_array::<UInt64Type>(rows, &field_path)
-                    }
-                    DataType::UInt32 => {
-                        self.build_primitive_array::<UInt32Type>(rows, &field_path)
-                    }
-                    DataType::UInt16 => {
-                        self.build_primitive_array::<UInt16Type>(rows, &field_path)
-                    }
-                    DataType::UInt8 => {
-                        self.build_primitive_array::<UInt8Type>(rows, &field_path)
-                    }
-                    // TODO: this is incomplete
-                    DataType::Timestamp(unit, _) => match unit {
-                        TimeUnit::Second => self
-                            .build_primitive_array::<TimestampSecondType>(
-                                rows,
-                                &field_path,
-                            ),
-                        TimeUnit::Microsecond => self
-                            .build_primitive_array::<TimestampMicrosecondType>(
-                                rows,
-                                &field_path,
-                            ),
-                        TimeUnit::Millisecond => self
-                            .build_primitive_array::<TimestampMillisecondType>(
-                                rows,
-                                &field_path,
-                            ),
-                        TimeUnit::Nanosecond => self
-                            .build_primitive_array::<TimestampNanosecondType>(
-                                rows,
-                                &field_path,
-                            ),
-                    },
-                    DataType::Date64 => {
-                        self.build_primitive_array::<Date64Type>(rows, &field_path)
-                    }
-                    DataType::Date32 => {
-                        self.build_primitive_array::<Date32Type>(rows, &field_path)
-                    }
-                    DataType::Time64(unit) => match unit {
-                        TimeUnit::Microsecond => self
-                            .build_primitive_array::<Time64MicrosecondType>(
-                                rows,
-                                &field_path,
-                            ),
-                        TimeUnit::Nanosecond => self
-                            .build_primitive_array::<Time64NanosecondType>(
-                                rows,
-                                &field_path,
-                            ),
-                        t => {
-                            return Err(SchemaError(format!(
-                                "TimeUnit {t:?} not supported with Time64"
-                            )));
-                        }
-                    },
-                    DataType::Time32(unit) => match unit {
-                        TimeUnit::Second => self
-                            .build_primitive_array::<Time32SecondType>(rows, &field_path),
-                        TimeUnit::Millisecond => self
-                            .build_primitive_array::<Time32MillisecondType>(
-                                rows,
-                                &field_path,
-                            ),
-                        t => {
-                            return Err(SchemaError(format!(
-                                "TimeUnit {t:?} not supported with Time32"
-                            )));
-                        }
-                    },
-                    DataType::Utf8 | DataType::LargeUtf8 => Arc::new(
-                        rows.iter()
-                            .map(|row| {
-                                let maybe_value = self.field_lookup(&field_path, row);
-                                match maybe_value {
-                                    None => Ok(None),
-                                    Some(v) => resolve_string(v),
-                                }
-                            })
-                            .collect::<ArrowResult<StringArray>>()?,
-                    )
-                        as ArrayRef,
-                    DataType::Binary | DataType::LargeBinary => Arc::new(
-                        rows.iter()
-                            .map(|row| {
-                                let maybe_value = self.field_lookup(&field_path, row);
-                                maybe_value.and_then(resolve_bytes)
-                            })
-                            .collect::<BinaryArray>(),
-                    )
-                        as ArrayRef,
-                    DataType::FixedSizeBinary(size) => {
-                        Arc::new(FixedSizeBinaryArray::try_from_sparse_iter_with_size(
-                            rows.iter().map(|row| {
-                                let maybe_value = self.field_lookup(&field_path, row);
-                                maybe_value.and_then(|v| resolve_fixed(v, *size as usize))
-                            }),
-                            *size,
-                        )?) as ArrayRef
-                    }
-                    DataType::List(list_field) => {
-                        match list_field.data_type() {
-                            DataType::Dictionary(key_ty, _) => {
-                                self.build_wrapped_list_array(rows, &field_path, key_ty)?
- } - _ => { - // extract rows by name - let extracted_rows = rows - .iter() - .map(|row| { - self.field_lookup(&field_path, row) - .unwrap_or(&Value::Null) - }) - .collect::>(); - self.build_nested_list_array::( - &field_path, - &extracted_rows, - list_field, - )? - } - } - } - DataType::Dictionary(key_ty, val_ty) => self - .build_string_dictionary_array( - rows, - &field_path, - key_ty, - val_ty, - )?, - DataType::Struct(fields) => { - let len = rows.len(); - let num_bytes = bit_util::ceil(len, 8); - let mut null_buffer = MutableBuffer::from_len_zeroed(num_bytes); - let empty_vec = vec![]; - let struct_rows = rows - .iter() - .enumerate() - .map(|(i, row)| (i, self.field_lookup(&field_path, row))) - .map(|(i, v)| { - let v = v.map(maybe_resolve_union); - match v { - Some(Value::Record(value)) => { - bit_util::set_bit(&mut null_buffer, i); - value - } - None | Some(Value::Null) => &empty_vec, - other => { - panic!("expected struct got {other:?}"); - } - } - }) - .collect::>>(); - let arrays = - self.build_struct_array(&struct_rows, &field_path, fields)?; - // construct a struct array's data in order to set null buffer - let data_type = DataType::Struct(fields.clone()); - let data = ArrayDataBuilder::new(data_type) - .len(len) - .null_bit_buffer(Some(null_buffer.into())) - .child_data(arrays.into_iter().map(|a| a.to_data()).collect()) - .build()?; - make_array(data) - } - _ => { - return Err(SchemaError(format!( - "type {} not supported", - field.data_type() - ))); - } - }; - Ok(arr) - }) - .collect(); - arrays - } - - /// Read the primitive list's values into ArrayData - fn read_primitive_list_values(&self, rows: &[&Value]) -> ArrayData - where - T: ArrowPrimitiveType + ArrowNumericType, - T::Native: NumCast, - { - let values = rows - .iter() - .flat_map(|row| { - let row = maybe_resolve_union(row); - if let Value::Array(values) = row { - values - .iter() - .map(resolve_item::) - .collect::>>() - } else if let Some(f) = resolve_item::(row) { - vec![Some(f)] - } else { - vec![] - } - }) - .collect::>>(); - let array = values.iter().collect::>(); - array.to_data() - } - - fn field_lookup<'b>( - &self, - name: &str, - row: &'b [(String, Value)], - ) -> Option<&'b Value> { - self.schema_lookup - .get(name) - .and_then(|i| row.get(*i)) - .map(|o| &o.1) - } -} - -/// Flattens a list of Avro values, by flattening lists, and treating all other values as -/// single-value lists. -/// This is used to read into nested lists (list of list, list of struct) and non-dictionary lists. -#[inline] -fn flatten_values<'a>(values: &[&'a Value]) -> Vec<&'a Value> { - values - .iter() - .flat_map(|row| { - let v = maybe_resolve_union(row); - if let Value::Array(values) = v { - values.iter().collect() - } else { - // we interpret a scalar as a single-value list to minimise data loss - vec![v] - } - }) - .collect() -} - -/// Flattens a list into string values, dropping Value::Null in the process. -/// This is useful for interpreting any Avro array as string, dropping nulls. -/// See `value_as_string`. -#[inline] -fn flatten_string_values(values: &[&Value]) -> Vec> { - values - .iter() - .flat_map(|row| { - let row = maybe_resolve_union(row); - if let Value::Array(values) = row { - values - .iter() - .map(|s| resolve_string(s).ok().flatten()) - .collect::>>() - } else if let Value::Null = row { - vec![] - } else { - vec![resolve_string(row).ok().flatten()] - } - }) - .collect::>>() -} - -/// Reads an Avro value as a string, regardless of its type. 
-/// This is useful if the expected datatype is a string, in which case we preserve
-/// all the values regardless of they type.
-fn resolve_string(v: &Value) -> ArrowResult<Option<String>> {
-    let v = if let Value::Union(_, b) = v { b } else { v };
-    match v {
-        Value::String(s) => Ok(Some(s.clone())),
-        Value::Bytes(bytes) => String::from_utf8(bytes.to_vec())
-            .map_err(|e| AvroError::new(AvroErrorDetails::ConvertToUtf8(e)))
-            .map(Some),
-        Value::Enum(_, s) => Ok(Some(s.clone())),
-        Value::Null => Ok(None),
-        other => Err(AvroError::new(AvroErrorDetails::GetString(other.clone()))),
-    }
-    .map_err(|e| SchemaError(format!("expected resolvable string : {e}")))
-}
-
-fn resolve_u8(v: &Value) -> Option<u8> {
-    let v = match v {
-        Value::Union(_, inner) => inner.as_ref(),
-        _ => v,
-    };
-
-    match v {
-        Value::Int(n) => u8::try_from(*n).ok(),
-        Value::Long(n) => u8::try_from(*n).ok(),
-        _ => None,
-    }
-}
-
-fn resolve_bytes(v: &Value) -> Option<Vec<u8>> {
-    let v = match v {
-        Value::Union(_, inner) => inner.as_ref(),
-        _ => v,
-    };
-
-    match v {
-        Value::Bytes(bytes) => Some(bytes.clone()),
-        Value::String(s) => Some(s.as_bytes().to_vec()),
-        Value::Array(items) => items.iter().map(resolve_u8).collect::<Option<Vec<u8>>>(),
-        _ => None,
-    }
-}
-
-fn resolve_fixed(v: &Value, size: usize) -> Option<Vec<u8>> {
-    let v = if let Value::Union(_, b) = v { b } else { v };
-    match v {
-        Value::Fixed(n, bytes) => {
-            if *n == size {
-                Some(bytes.clone())
-            } else {
-                None
-            }
-        }
-        _ => None,
-    }
-}
-
-fn resolve_boolean(value: &Value) -> Option<bool> {
-    let v = if let Value::Union(_, b) = value {
-        b
-    } else {
-        value
-    };
-    match v {
-        Value::Boolean(boolean) => Some(*boolean),
-        _ => None,
-    }
-}
-
-trait Resolver: ArrowPrimitiveType {
-    fn resolve(value: &Value) -> Option<Self::Native>;
-}
-
-fn resolve_item<T: Resolver>(value: &Value) -> Option<T::Native> {
-    T::resolve(value)
-}
-
-fn maybe_resolve_union(value: &Value) -> &Value {
-    if SchemaKind::from(value) == SchemaKind::Union {
-        // Pull out the Union, and attempt to resolve against it.
- match value { - Value::Union(_, b) => b, - _ => unreachable!(), - } - } else { - value - } -} - -impl Resolver for N -where - N: ArrowNumericType, - N::Native: NumCast, -{ - fn resolve(value: &Value) -> Option { - let value = maybe_resolve_union(value); - match value { - Value::Int(i) | Value::TimeMillis(i) | Value::Date(i) => NumCast::from(*i), - Value::Long(l) - | Value::TimeMicros(l) - | Value::TimestampMillis(l) - | Value::TimestampMicros(l) => NumCast::from(*l), - Value::Float(f) => NumCast::from(*f), - Value::Double(f) => NumCast::from(*f), - Value::Duration(_d) => unimplemented!(), // shenanigans type - Value::Null => None, - _ => unreachable!(), - } - } -} - -#[cfg(test)] -mod test { - use crate::avro_to_arrow::{Reader, ReaderBuilder}; - use arrow::array::Array; - use arrow::datatypes::{DataType, Fields}; - use arrow::datatypes::{Field, TimeUnit}; - use datafusion_common::assert_batches_eq; - use datafusion_common::cast::{ - as_int32_array, as_int64_array, as_list_array, as_timestamp_microsecond_array, - }; - use std::fs::File; - use std::sync::Arc; - - fn build_reader(name: &'_ str, batch_size: usize) -> Reader<'_, File> { - let testdata = datafusion_common::test_util::arrow_test_data(); - let filename = format!("{testdata}/avro/{name}"); - let builder = ReaderBuilder::new() - .read_schema() - .with_batch_size(batch_size); - builder.build(File::open(filename).unwrap()).unwrap() - } - - // TODO: Fixed, Enum, Dictionary - - #[test] - fn test_time_avro_milliseconds() { - let mut reader = build_reader("alltypes_plain.avro", 10); - let batch = reader.next().unwrap().unwrap(); - - assert_eq!(11, batch.num_columns()); - assert_eq!(8, batch.num_rows()); - - let schema = reader.schema(); - let batch_schema = batch.schema(); - assert_eq!(schema, batch_schema); - - let timestamp_col = schema.column_with_name("timestamp_col").unwrap(); - assert_eq!( - &DataType::Timestamp(TimeUnit::Microsecond, None), - timestamp_col.1.data_type() - ); - let timestamp_array = - as_timestamp_microsecond_array(batch.column(timestamp_col.0)).unwrap(); - for i in 0..timestamp_array.len() { - assert!(timestamp_array.is_valid(i)); - } - assert_eq!(1235865600000000, timestamp_array.value(0)); - assert_eq!(1235865660000000, timestamp_array.value(1)); - assert_eq!(1238544000000000, timestamp_array.value(2)); - assert_eq!(1238544060000000, timestamp_array.value(3)); - assert_eq!(1233446400000000, timestamp_array.value(4)); - assert_eq!(1233446460000000, timestamp_array.value(5)); - assert_eq!(1230768000000000, timestamp_array.value(6)); - assert_eq!(1230768060000000, timestamp_array.value(7)); - } - - #[test] - fn test_avro_read_list() { - let mut reader = build_reader("list_columns.avro", 3); - let schema = reader.schema(); - let (col_id_index, _) = schema.column_with_name("int64_list").unwrap(); - let batch = reader.next().unwrap().unwrap(); - assert_eq!(batch.num_columns(), 2); - assert_eq!(batch.num_rows(), 3); - let a_array = as_list_array(batch.column(col_id_index)).unwrap(); - assert_eq!( - *a_array.data_type(), - DataType::List(Arc::new(Field::new("element", DataType::Int64, true))) - ); - let array = a_array.value(0); - assert_eq!(*array.data_type(), DataType::Int64); - - assert_eq!( - 6, - as_int64_array(&array) - .unwrap() - .iter() - .flatten() - .sum::() - ); - } - #[test] - fn test_avro_read_nested_list() { - let mut reader = build_reader("nested_lists.snappy.avro", 3); - let batch = reader.next().unwrap().unwrap(); - assert_eq!(batch.num_columns(), 2); - assert_eq!(batch.num_rows(), 3); - } - - #[test] 
-    fn test_complex_list() {
-        let schema = apache_avro::Schema::parse_str(
-            r#"
-            {
-              "type": "record",
-              "name": "r1",
-              "fields": [
-                {
-                  "name": "headers",
-                  "type": ["null", {
-                      "type": "array",
-                      "items": ["null",{
-                          "name":"r2",
-                          "type": "record",
-                          "fields":[
-                              {"name":"name", "type": ["null", "string"], "default": null},
-                              {"name":"value", "type": ["null", "string"], "default": null}
-                          ]
-                      }]
-                  }],
-                  "default": null
-                }
-              ]
-            }"#,
-        )
-        .unwrap();
-        let r1 = apache_avro::to_value(serde_json::json!({
-            "headers": [
-                {
-                    "name": "a",
-                    "value": "b"
-                }
-            ]
-        }))
-        .unwrap()
-        .resolve(&schema)
-        .unwrap();
-
-        let mut w = apache_avro::Writer::new(&schema, vec![]);
-        w.append(r1).unwrap();
-        let bytes = w.into_inner().unwrap();
-
-        let mut reader = ReaderBuilder::new()
-            .read_schema()
-            .with_batch_size(2)
-            .build(std::io::Cursor::new(bytes))
-            .unwrap();
-
-        let batch = reader.next().unwrap().unwrap();
-        assert_eq!(batch.num_rows(), 1);
-        assert_eq!(batch.num_columns(), 1);
-        let expected = [
-            "+-----------------------+",
-            "| headers               |",
-            "+-----------------------+",
-            "| [{name: a, value: b}] |",
-            "+-----------------------+",
-        ];
-        assert_batches_eq!(expected, &[batch]);
-    }
-
-    #[test]
-    fn test_complex_struct() {
-        let schema = apache_avro::Schema::parse_str(
-            r#"
-            {
-              "type": "record",
-              "name": "r1",
-              "fields": [
-                {
-                  "name": "dns",
-                  "type": [
-                    "null",
-                    {
-                      "type": "record",
-                      "name": "r13",
-                      "fields": [
-                        {
-                          "name": "answers",
-                          "type": [
-                            "null",
-                            {
-                              "type": "array",
-                              "items": [
-                                "null",
-                                {
-                                  "type": "record",
-                                  "name": "r292",
-                                  "fields": [
-                                    {
-                                      "name": "class",
-                                      "type": ["null", "string"],
-                                      "default": null
-                                    },
-                                    {
-                                      "name": "data",
-                                      "type": ["null", "string"],
-                                      "default": null
-                                    },
-                                    {
-                                      "name": "name",
-                                      "type": ["null", "string"],
-                                      "default": null
-                                    },
-                                    {
-                                      "name": "ttl",
-                                      "type": ["null", "long"],
-                                      "default": null
-                                    },
-                                    {
-                                      "name": "type",
-                                      "type": ["null", "string"],
-                                      "default": null
-                                    }
-                                  ]
-                                }
-                              ]
-                            }
-                          ],
-                          "default": null
-                        },
-                        {
-                          "name": "header_flags",
-                          "type": [
-                            "null",
-                            {
-                              "type": "array",
-                              "items": ["null", "string"]
-                            }
-                          ],
-                          "default": null
-                        },
-                        {
-                          "name": "id",
-                          "type": ["null", "string"],
-                          "default": null
-                        },
-                        {
-                          "name": "op_code",
-                          "type": ["null", "string"],
-                          "default": null
-                        },
-                        {
-                          "name": "question",
-                          "type": [
-                            "null",
-                            {
-                              "type": "record",
-                              "name": "r288",
-                              "fields": [
-                                {
-                                  "name": "class",
-                                  "type": ["null", "string"],
-                                  "default": null
-                                },
-                                {
-                                  "name": "name",
-                                  "type": ["null", "string"],
-                                  "default": null
-                                },
-                                {
-                                  "name": "registered_domain",
-                                  "type": ["null", "string"],
-                                  "default": null
-                                },
-                                {
-                                  "name": "subdomain",
-                                  "type": ["null", "string"],
-                                  "default": null
-                                },
-                                {
-                                  "name": "top_level_domain",
-                                  "type": ["null", "string"],
-                                  "default": null
-                                },
-                                {
-                                  "name": "type",
-                                  "type": ["null", "string"],
-                                  "default": null
-                                }
-                              ]
-                            }
-                          ],
-                          "default": null
-                        },
-                        {
-                          "name": "resolved_ip",
-                          "type": [
-                            "null",
-                            {
-                              "type": "array",
-                              "items": ["null", "string"]
-                            }
-                          ],
-                          "default": null
-                        },
-                        {
-                          "name": "response_code",
-                          "type": ["null", "string"],
-                          "default": null
-                        },
-                        {
-                          "name": "type",
-                          "type": ["null", "string"],
-                          "default": null
-                        }
-                      ]
-                    }
-                  ],
-                  "default": null
-                }
-              ]
-            }"#,
-        )
-        .unwrap();
-
-        let jv1 = serde_json::json!({
-            "dns": {
-                "answers": [
-                    {
-                        "data": "CHNlY3VyaXR5BnVidW50dQMjb20AAAEAAQAAAAgABLl9vic=",
-                        "type": "1"
-                    },
-                    {
-                        "data": "CHNlY3VyaXR5BnVidW50dQNjb20AAAEAABAAAAgABLl9viQ=",
-                        "type": "1"
-                    },
-                    {
"CHNlT3VyaXR5BnVidW50dQNjb20AAAEAAQAAAAgABFu9Wyc=", - "type": "1" - } - ], - "question": { - "name": "security.ubuntu.com", - "type": "A" - }, - "resolved_ip": [ - "67.43.156.1", - "67.43.156.2", - "67.43.156.3" - ], - "response_code": "0" - } - }); - let r1 = apache_avro::to_value(jv1) - .unwrap() - .resolve(&schema) - .unwrap(); - - let mut w = apache_avro::Writer::new(&schema, vec![]); - w.append(r1).unwrap(); - let bytes = w.into_inner().unwrap(); - - let mut reader = ReaderBuilder::new() - .read_schema() - .with_batch_size(1) - .build(std::io::Cursor::new(bytes)) - .unwrap(); - - let batch = reader.next().unwrap().unwrap(); - assert_eq!(batch.num_rows(), 1); - assert_eq!(batch.num_columns(), 1); - - let expected = [ - "+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+", - "| dns |", - "+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+", - "| {answers: [{class: , data: CHNlY3VyaXR5BnVidW50dQMjb20AAAEAAQAAAAgABLl9vic=, name: , ttl: , type: 1}, {class: , data: CHNlY3VyaXR5BnVidW50dQNjb20AAAEAABAAAAgABLl9viQ=, name: , ttl: , type: 1}, {class: , data: CHNlT3VyaXR5BnVidW50dQNjb20AAAEAAQAAAAgABFu9Wyc=, name: , ttl: , type: 1}], header_flags: , id: , op_code: , question: {class: , name: security.ubuntu.com, registered_domain: , subdomain: , top_level_domain: , type: A}, resolved_ip: [67.43.156.1, 67.43.156.2, 67.43.156.3], response_code: 0, type: } |", - "+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+", - ]; - assert_batches_eq!(expected, &[batch]); - } - - #[test] - fn test_deep_nullable_struct() { - let schema = apache_avro::Schema::parse_str( - r#" - { - "type": "record", - "name": "r1", - "fields": [ - { - "name": "col1", - "type": [ - "null", - { - "type": "record", - "name": "r2", - "fields": [ - { - "name": "col2", - "type": [ - "null", - { - "type": "record", - "name": "r3", - "fields": [ - { - "name": "col3", - "type": [ - "null", - { - "type": "record", - "name": "r4", - "fields": [ - { - "name": "col4", - "type": [ - "null", - { - "type": "record", - "name": "r5", - "fields": [ - { - "name": "col5", - "type": ["null", "string"] - } - ] - } - ] - } - ] - } - ] - } - ] - } - ] - } - ] - } - ] - } - ] - } - "#, - ) - 
.unwrap(); - let r1 = apache_avro::to_value(serde_json::json!({ - "col1": { - "col2": { - "col3": { - "col4": { - "col5": "hello" - } - } - } - } - })) - .unwrap() - .resolve(&schema) - .unwrap(); - let r2 = apache_avro::to_value(serde_json::json!({ - "col1": { - "col2": { - "col3": { - "col4": { - "col5": null - } - } - } - } - })) - .unwrap() - .resolve(&schema) - .unwrap(); - let r3 = apache_avro::to_value(serde_json::json!({ - "col1": { - "col2": { - "col3": null - } - } - })) - .unwrap() - .resolve(&schema) - .unwrap(); - let r4 = apache_avro::to_value(serde_json::json!({ "col1": null })) - .unwrap() - .resolve(&schema) - .unwrap(); - - let mut w = apache_avro::Writer::new(&schema, vec![]); - w.append(r1).unwrap(); - w.append(r2).unwrap(); - w.append(r3).unwrap(); - w.append(r4).unwrap(); - let bytes = w.into_inner().unwrap(); - - let mut reader = ReaderBuilder::new() - .read_schema() - .with_batch_size(4) - .build(std::io::Cursor::new(bytes)) - .unwrap(); - - let batch = reader.next().unwrap().unwrap(); - - let expected = [ - "+---------------------------------------+", - "| col1 |", - "+---------------------------------------+", - "| {col2: {col3: {col4: {col5: hello}}}} |", - "| {col2: {col3: {col4: {col5: }}}} |", - "| {col2: {col3: }} |", - "| |", - "+---------------------------------------+", - ]; - assert_batches_eq!(expected, &[batch]); - } - - #[test] - fn test_avro_nullable_struct() { - let schema = apache_avro::Schema::parse_str( - r#" - { - "type": "record", - "name": "r1", - "fields": [ - { - "name": "col1", - "type": [ - "null", - { - "type": "record", - "name": "r2", - "fields": [ - { - "name": "col2", - "type": ["null", "string"] - } - ] - } - ], - "default": null - } - ] - }"#, - ) - .unwrap(); - let r1 = apache_avro::to_value(serde_json::json!({ "col1": null })) - .unwrap() - .resolve(&schema) - .unwrap(); - let r2 = apache_avro::to_value(serde_json::json!({ - "col1": { - "col2": "hello" - } - })) - .unwrap() - .resolve(&schema) - .unwrap(); - let r3 = apache_avro::to_value(serde_json::json!({ - "col1": { - "col2": null - } - })) - .unwrap() - .resolve(&schema) - .unwrap(); - - let mut w = apache_avro::Writer::new(&schema, vec![]); - w.append(r1).unwrap(); - w.append(r2).unwrap(); - w.append(r3).unwrap(); - let bytes = w.into_inner().unwrap(); - - let mut reader = ReaderBuilder::new() - .read_schema() - .with_batch_size(3) - .build(std::io::Cursor::new(bytes)) - .unwrap(); - let batch = reader.next().unwrap().unwrap(); - assert_eq!(batch.num_rows(), 3); - assert_eq!(batch.num_columns(), 1); - - let expected = [ - "+---------------+", - "| col1 |", - "+---------------+", - "| |", - "| {col2: hello} |", - "| {col2: } |", - "+---------------+", - ]; - assert_batches_eq!(expected, &[batch]); - } - - #[test] - fn test_avro_nullable_struct_array() { - let schema = apache_avro::Schema::parse_str( - r#" - { - "type": "record", - "name": "r1", - "fields": [ - { - "name": "col1", - "type": [ - "null", - { - "type": "array", - "items": { - "type": [ - "null", - { - "type": "record", - "name": "Item", - "fields": [ - { - "name": "id", - "type": "long" - } - ] - } - ] - } - } - ], - "default": null - } - ] - }"#, - ) - .unwrap(); - let jv1 = serde_json::json!({ - "col1": [ - { - "id": 234 - }, - { - "id": 345 - } - ] - }); - let r1 = apache_avro::to_value(jv1) - .unwrap() - .resolve(&schema) - .unwrap(); - let r2 = apache_avro::to_value(serde_json::json!({ "col1": null })) - .unwrap() - .resolve(&schema) - .unwrap(); - - let mut w = apache_avro::Writer::new(&schema, vec![]); - 
for _i in 0..5 { - w.append(r1.clone()).unwrap(); - } - w.append(r2).unwrap(); - let bytes = w.into_inner().unwrap(); - - let mut reader = ReaderBuilder::new() - .read_schema() - .with_batch_size(20) - .build(std::io::Cursor::new(bytes)) - .unwrap(); - let batch = reader.next().unwrap().unwrap(); - assert_eq!(batch.num_rows(), 6); - assert_eq!(batch.num_columns(), 1); - - let expected = [ - "+------------------------+", - "| col1 |", - "+------------------------+", - "| [{id: 234}, {id: 345}] |", - "| [{id: 234}, {id: 345}] |", - "| [{id: 234}, {id: 345}] |", - "| [{id: 234}, {id: 345}] |", - "| [{id: 234}, {id: 345}] |", - "| |", - "+------------------------+", - ]; - assert_batches_eq!(expected, &[batch]); - } - - #[test] - fn test_avro_iterator() { - let reader = build_reader("alltypes_plain.avro", 5); - let schema = reader.schema(); - let (col_id_index, _) = schema.column_with_name("id").unwrap(); - - let mut sum_num_rows = 0; - let mut num_batches = 0; - let mut sum_id = 0; - for batch in reader { - let batch = batch.unwrap(); - assert_eq!(11, batch.num_columns()); - sum_num_rows += batch.num_rows(); - num_batches += 1; - let batch_schema = batch.schema(); - assert_eq!(schema, batch_schema); - let a_array = as_int32_array(batch.column(col_id_index)).unwrap(); - sum_id += (0..a_array.len()).map(|i| a_array.value(i)).sum::<i32>(); - } - assert_eq!(8, sum_num_rows); - assert_eq!(2, num_batches); - assert_eq!(28, sum_id); - } - - #[test] - fn test_list_of_structs_with_custom_field_name() { - let schema = apache_avro::Schema::parse_str( - r#" - { - "type": "record", - "name": "root", - "fields": [ - { - "name": "items", - "type": { - "type": "array", - "items": { - "type": "record", - "name": "item_record", - "fields": [ - { - "name": "id", - "type": "long" - }, - { - "name": "name", - "type": "string" - } - ] - } - } - } - ] - }"#, - ) - .unwrap(); - - let r1 = apache_avro::to_value(serde_json::json!({ - "items": [ - { - "id": 1, - "name": "first" - }, - { - "id": 2, - "name": "second" - } - ] - })) - .unwrap() - .resolve(&schema) - .unwrap(); - - let mut w = apache_avro::Writer::new(&schema, vec![]); - w.append(r1).unwrap(); - let bytes = w.into_inner().unwrap(); - - // Create an Arrow schema where the list field is NOT named "element" - let arrow_schema = Arc::new(arrow::datatypes::Schema::new(vec![Field::new( - "items", - DataType::List(Arc::new(Field::new( - "item", // This is NOT "element" - DataType::Struct(Fields::from(vec![ - Field::new("id", DataType::Int64, false), - Field::new("name", DataType::Utf8, false), - ])), - false, - ))), - false, - )])); - - let mut reader = ReaderBuilder::new() - .with_schema(arrow_schema) - .with_batch_size(10) - .build(std::io::Cursor::new(bytes)) - .unwrap(); - - // This used to fail because schema_lookup would have "items.element.id" and "items.element.name" - // but build_struct_array would try to look up "items.item.id" and "items.item.name". - // Now it is simply "items.id" and "items.name" - let batch = reader.next().unwrap().unwrap(); - - let expected = [ - "+-----------------------------------------------+", - "| items |", - "+-----------------------------------------------+", - "| [{id: 1, name: first}, {id: 2, name: second}] |", - "+-----------------------------------------------+", - ]; - assert_batches_eq!(expected, &[batch]); - } -} diff --git a/datafusion/datasource-avro/src/avro_to_arrow/mod.rs b/datafusion/datasource-avro/src/avro_to_arrow/mod.rs deleted file mode 100644 index c1530a4880205..0000000000000 --- 
a/datafusion/datasource-avro/src/avro_to_arrow/mod.rs +++ /dev/null @@ -1,39 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! This module contains code for reading [Avro] data into `RecordBatch`es -//! -//! [Avro]: https://avro.apache.org/docs/1.2.0/ - -mod arrow_array_reader; -mod reader; -mod schema; - -use arrow::datatypes::Schema; -pub use reader::{Reader, ReaderBuilder}; - -pub use schema::to_arrow_schema; -use std::io::Read; - -/// Read Avro schema given a reader -pub fn read_avro_schema_from_reader<R: Read>( - reader: &mut R, -) -> datafusion_common::Result<Schema> { - let avro_reader = apache_avro::Reader::new(reader)?; - let schema = avro_reader.writer_schema(); - to_arrow_schema(schema) -} diff --git a/datafusion/datasource-avro/src/avro_to_arrow/reader.rs b/datafusion/datasource-avro/src/avro_to_arrow/reader.rs deleted file mode 100644 index bd96b47aea9e6..0000000000000 --- a/datafusion/datasource-avro/src/avro_to_arrow/reader.rs +++ /dev/null @@ -1,353 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use super::arrow_array_reader::AvroArrowArrayReader; -use arrow::datatypes::{Fields, SchemaRef}; -use arrow::error::Result as ArrowResult; -use arrow::record_batch::RecordBatch; -use datafusion_common::Result; -use std::io::{Read, Seek}; -use std::sync::Arc; - -/// Avro file reader builder -#[derive(Debug)] -pub struct ReaderBuilder { - /// Optional schema for the Avro file - /// - /// If the schema is not supplied, the reader will try to read the schema. - schema: Option<SchemaRef>, - /// Batch size (number of records to load each time) - /// - /// The default batch size when using the `ReaderBuilder` is 1024 records - batch_size: usize, - /// Optional projection for which columns to load (by column name) - projection: Option<Vec<String>>, -} - -impl Default for ReaderBuilder { - fn default() -> Self { - Self { - schema: None, - batch_size: 1024, - projection: None, - } - } } - -impl ReaderBuilder { - /// Create a new builder for configuring Avro parsing options. 
- /// - /// To convert a builder into a reader, call `ReaderBuilder::build` - /// - /// # Example - /// - /// ``` - /// use std::fs::File; - /// - /// use datafusion_datasource_avro::avro_to_arrow::{Reader, ReaderBuilder}; - /// - /// fn example() -> Reader<'static, File> { - /// let file = File::open("test/data/basic.avro").unwrap(); - /// - /// // create a builder, inferring the schema with the first 100 records - /// let builder = ReaderBuilder::new().read_schema().with_batch_size(100); - /// - /// let reader = builder.build::<File>(file).unwrap(); - /// - /// reader - /// } - /// ``` - pub fn new() -> Self { - Self::default() - } - - /// Set the Avro file's schema - pub fn with_schema(mut self, schema: SchemaRef) -> Self { - self.schema = Some(schema); - self - } - - /// Set the Avro reader to infer the schema of the file - pub fn read_schema(mut self) -> Self { - // remove any schema that is set - self.schema = None; - self - } - - /// Set the batch size (number of records to load at one time) - pub fn with_batch_size(mut self, batch_size: usize) -> Self { - self.batch_size = batch_size; - self - } - - /// Set the reader's column projection - pub fn with_projection(mut self, projection: Vec<String>) -> Self { - self.projection = Some(projection); - self - } - - /// Create a new `Reader` from the `ReaderBuilder` - pub fn build<'a, R>(self, source: R) -> Result<Reader<'a, R>> - where - R: Read + Seek, - { - let mut source = source; - - // check if schema should be inferred - let schema = match self.schema { - Some(schema) => schema, - None => Arc::new(super::read_avro_schema_from_reader(&mut source)?), - }; - source.rewind()?; - Reader::try_new(source, &schema, self.batch_size, self.projection.as_ref()) - } -} - -/// Avro file record reader -pub struct Reader<'a, R: Read> { - array_reader: AvroArrowArrayReader<'a, R>, - schema: SchemaRef, - batch_size: usize, -} - -impl<R: Read> Reader<'_, R> { - /// Create a new Avro Reader from any value that implements the `Read` trait. - /// - /// To customise the Reader, for example to enable schema - /// inference when reading a `File`, use `ReaderBuilder` instead. - /// - /// If projection is provided, it uses a schema with only the fields in the projection, respecting their order. - /// Only the first level of projection is handled. No further projection currently occurs, but would be - /// useful if plucking values from a struct, e.g. getting `a.b.c.e` from `a.b.c.{d, e}`. - pub fn try_new( - reader: R, - schema: &SchemaRef, - batch_size: usize, - projection: Option<&Vec<String>>, - ) -> Result<Self> { - let projected_schema = projection.as_ref().filter(|p| !p.is_empty()).map_or_else( - || Arc::clone(schema), - |proj| { - Arc::new(arrow::datatypes::Schema::new( - proj.iter() - .filter_map(|name| { - schema.column_with_name(name).map(|(_, f)| f.clone()) - }) - .collect::<Fields>(), - )) - }, - ); - - Ok(Self { - array_reader: AvroArrowArrayReader::try_new( - reader, - Arc::clone(&projected_schema), - )?, - schema: projected_schema, - batch_size, - }) - } - - /// Returns the schema of the reader, useful for getting the schema without reading - /// record batches - pub fn schema(&self) -> SchemaRef { - Arc::clone(&self.schema) - } -} - -impl<R: Read> Iterator for Reader<'_, R> { - type Item = ArrowResult<RecordBatch>; - - /// Returns the next batch of results (defined by `self.batch_size`), or `None` if there - /// are no more results. 
- fn next(&mut self) -> Option<Self::Item> { - self.array_reader.next_batch(self.batch_size) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use arrow::array::*; - use arrow::array::{ - BinaryArray, BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array, - TimestampMicrosecondArray, - }; - use arrow::datatypes::TimeUnit; - use arrow::datatypes::{DataType, Field}; - use std::fs::File; - - fn build_reader(name: &'_ str, projection: Option<Vec<String>>) -> Reader<'_, File> { - let testdata = datafusion_common::test_util::arrow_test_data(); - let filename = format!("{testdata}/avro/{name}"); - let mut builder = ReaderBuilder::new().read_schema().with_batch_size(64); - if let Some(projection) = projection { - builder = builder.with_projection(projection); - } - builder.build(File::open(filename).unwrap()).unwrap() - } - - fn get_col<'a, T: 'static>( - batch: &'a RecordBatch, - col: (usize, &Field), - ) -> Option<&'a T> { - batch.column(col.0).as_any().downcast_ref::<T>() - } - - #[test] - fn test_avro_basic() { - let mut reader = build_reader("alltypes_dictionary.avro", None); - let batch = reader.next().unwrap().unwrap(); - - assert_eq!(11, batch.num_columns()); - assert_eq!(2, batch.num_rows()); - - let schema = reader.schema(); - let batch_schema = batch.schema(); - assert_eq!(schema, batch_schema); - - let id = schema.column_with_name("id").unwrap(); - assert_eq!(0, id.0); - assert_eq!(&DataType::Int32, id.1.data_type()); - let col = get_col::<Int32Array>(&batch, id).unwrap(); - assert_eq!(0, col.value(0)); - assert_eq!(1, col.value(1)); - let bool_col = schema.column_with_name("bool_col").unwrap(); - assert_eq!(1, bool_col.0); - assert_eq!(&DataType::Boolean, bool_col.1.data_type()); - let col = get_col::<BooleanArray>(&batch, bool_col).unwrap(); - assert!(col.value(0)); - assert!(!col.value(1)); - let tinyint_col = schema.column_with_name("tinyint_col").unwrap(); - assert_eq!(2, tinyint_col.0); - assert_eq!(&DataType::Int32, tinyint_col.1.data_type()); - let col = get_col::<Int32Array>(&batch, tinyint_col).unwrap(); - assert_eq!(0, col.value(0)); - assert_eq!(1, col.value(1)); - let smallint_col = schema.column_with_name("smallint_col").unwrap(); - assert_eq!(3, smallint_col.0); - assert_eq!(&DataType::Int32, smallint_col.1.data_type()); - let col = get_col::<Int32Array>(&batch, smallint_col).unwrap(); - assert_eq!(0, col.value(0)); - assert_eq!(1, col.value(1)); - let int_col = schema.column_with_name("int_col").unwrap(); - assert_eq!(4, int_col.0); - let col = get_col::<Int32Array>(&batch, int_col).unwrap(); - assert_eq!(0, col.value(0)); - assert_eq!(1, col.value(1)); - assert_eq!(&DataType::Int32, int_col.1.data_type()); - let col = get_col::<Int32Array>(&batch, int_col).unwrap(); - assert_eq!(0, col.value(0)); - assert_eq!(1, col.value(1)); - let bigint_col = schema.column_with_name("bigint_col").unwrap(); - assert_eq!(5, bigint_col.0); - let col = get_col::<Int64Array>(&batch, bigint_col).unwrap(); - assert_eq!(0, col.value(0)); - assert_eq!(10, col.value(1)); - assert_eq!(&DataType::Int64, bigint_col.1.data_type()); - let float_col = schema.column_with_name("float_col").unwrap(); - assert_eq!(6, float_col.0); - let col = get_col::<Float32Array>(&batch, float_col).unwrap(); - assert_eq!(0.0, col.value(0)); - assert_eq!(1.1, col.value(1)); - assert_eq!(&DataType::Float32, float_col.1.data_type()); - let col = get_col::<Float32Array>(&batch, float_col).unwrap(); - assert_eq!(0.0, col.value(0)); - assert_eq!(1.1, col.value(1)); - let double_col = schema.column_with_name("double_col").unwrap(); - assert_eq!(7, double_col.0); - assert_eq!(&DataType::Float64, double_col.1.data_type()); - let col = 
get_col::<Float64Array>(&batch, double_col).unwrap(); - assert_eq!(0.0, col.value(0)); - assert_eq!(10.1, col.value(1)); - let date_string_col = schema.column_with_name("date_string_col").unwrap(); - assert_eq!(8, date_string_col.0); - assert_eq!(&DataType::Binary, date_string_col.1.data_type()); - let col = get_col::<BinaryArray>(&batch, date_string_col).unwrap(); - assert_eq!("01/01/09".as_bytes(), col.value(0)); - assert_eq!("01/01/09".as_bytes(), col.value(1)); - let string_col = schema.column_with_name("string_col").unwrap(); - assert_eq!(9, string_col.0); - assert_eq!(&DataType::Binary, string_col.1.data_type()); - let col = get_col::<BinaryArray>(&batch, string_col).unwrap(); - assert_eq!("0".as_bytes(), col.value(0)); - assert_eq!("1".as_bytes(), col.value(1)); - let timestamp_col = schema.column_with_name("timestamp_col").unwrap(); - assert_eq!(10, timestamp_col.0); - assert_eq!( - &DataType::Timestamp(TimeUnit::Microsecond, None), - timestamp_col.1.data_type() - ); - let col = get_col::<TimestampMicrosecondArray>(&batch, timestamp_col).unwrap(); - assert_eq!(1230768000000000, col.value(0)); - assert_eq!(1230768060000000, col.value(1)); - } - - #[test] - fn test_avro_with_projection() { - // Test projection to filter and reorder columns - let projection = Some(vec![ - "string_col".to_string(), - "double_col".to_string(), - "bool_col".to_string(), - ]); - let mut reader = build_reader("alltypes_dictionary.avro", projection); - let batch = reader.next().unwrap().unwrap(); - - // Only 3 columns should be present (not all 11) - assert_eq!(3, batch.num_columns()); - assert_eq!(2, batch.num_rows()); - - let schema = reader.schema(); - let batch_schema = batch.schema(); - assert_eq!(schema, batch_schema); - - // Verify columns are in the order specified in projection - // First column should be string_col (was at index 9 in original) - assert_eq!("string_col", schema.field(0).name()); - assert_eq!(&DataType::Binary, schema.field(0).data_type()); - let col = batch - .column(0) - .as_any() - .downcast_ref::<BinaryArray>() - .unwrap(); - assert_eq!("0".as_bytes(), col.value(0)); - assert_eq!("1".as_bytes(), col.value(1)); - - // Second column should be double_col (was at index 7 in original) - assert_eq!("double_col", schema.field(1).name()); - assert_eq!(&DataType::Float64, schema.field(1).data_type()); - let col = batch - .column(1) - .as_any() - .downcast_ref::<Float64Array>() - .unwrap(); - assert_eq!(0.0, col.value(0)); - assert_eq!(10.1, col.value(1)); - - // Third column should be bool_col (was at index 1 in original) - assert_eq!("bool_col", schema.field(2).name()); - assert_eq!(&DataType::Boolean, schema.field(2).data_type()); - let col = batch - .column(2) - .as_any() - .downcast_ref::<BooleanArray>() - .unwrap(); - assert!(col.value(0)); - assert!(!col.value(1)); - } -} diff --git a/datafusion/datasource-avro/src/avro_to_arrow/schema.rs b/datafusion/datasource-avro/src/avro_to_arrow/schema.rs deleted file mode 100644 index 0e8f2a4d56088..0000000000000 --- a/datafusion/datasource-avro/src/avro_to_arrow/schema.rs +++ /dev/null @@ -1,517 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. 
You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use apache_avro::Schema as AvroSchema; -use apache_avro::schema::{ - Alias, DecimalSchema, EnumSchema, FixedSchema, Name, RecordSchema, -}; -use apache_avro::types::Value; -use arrow::datatypes::{DataType, IntervalUnit, Schema, TimeUnit, UnionMode}; -use arrow::datatypes::{Field, UnionFields}; -use datafusion_common::error::Result; -use std::collections::HashMap; -use std::sync::Arc; - -/// Converts an avro schema to an arrow schema -pub fn to_arrow_schema(avro_schema: &apache_avro::Schema) -> Result<Schema> { - let mut schema_fields = vec![]; - match avro_schema { - AvroSchema::Record(RecordSchema { fields, .. }) => { - for field in fields { - schema_fields.push(schema_to_field_with_props( - &field.schema, - Some(&field.name), - field.is_nullable(), - Some(external_props(&field.schema)), - )?) - } - } - schema => schema_fields.push(schema_to_field(schema, Some(""), false)?), - } - - let schema = Schema::new(schema_fields); - Ok(schema) -} - -fn schema_to_field( - schema: &apache_avro::Schema, - name: Option<&str>, - nullable: bool, -) -> Result<Field> { - schema_to_field_with_props(schema, name, nullable, Default::default()) -} - -fn schema_to_field_with_props( - schema: &AvroSchema, - name: Option<&str>, - nullable: bool, - props: Option<HashMap<String, String>>, -) -> Result<Field> { - let mut nullable = nullable; - let field_type: DataType = match schema { - AvroSchema::Ref { .. } => todo!("Add support for AvroSchema::Ref"), - AvroSchema::Null => DataType::Null, - AvroSchema::Boolean => DataType::Boolean, - AvroSchema::Int => DataType::Int32, - AvroSchema::Long => DataType::Int64, - AvroSchema::Float => DataType::Float32, - AvroSchema::Double => DataType::Float64, - AvroSchema::Bytes => DataType::Binary, - AvroSchema::String => DataType::Utf8, - AvroSchema::Array(item_schema) => DataType::List(Arc::new( - schema_to_field_with_props(&item_schema.items, Some("element"), false, None)?, - )), - AvroSchema::Map(value_schema) => { - let value_field = schema_to_field_with_props( - &value_schema.types, - Some("value"), - false, - None, - )?; - DataType::Dictionary( - Box::new(DataType::Utf8), - Box::new(value_field.data_type().clone()), - ) - } - AvroSchema::Union(us) => { - // If there are only two variants and one of them is null, set the other type as the field data type - let has_nullable = us - .find_schema_with_known_schemata::<apache_avro::Schema>( - &Value::Null, - None, - &None, - ) - .is_some(); - let sub_schemas = us.variants(); - if has_nullable && sub_schemas.len() == 2 { - nullable = true; - if let Some(schema) = sub_schemas - .iter() - .find(|&schema| !matches!(schema, AvroSchema::Null)) - { - schema_to_field_with_props(schema, None, has_nullable, None)? - .data_type() - .clone() - } else { - return Err(apache_avro::Error::new( - apache_avro::error::Details::GetUnionDuplicate, - ) - .into()); - } - } else { - let fields = sub_schemas - .iter() - .map(|s| schema_to_field_with_props(s, None, has_nullable, None)) - .collect::<Result<Vec<Field>>>()?; - let type_ids = 0_i8..fields.len() as i8; - DataType::Union(UnionFields::new(type_ids, fields), UnionMode::Dense) - } - } - AvroSchema::Record(RecordSchema { fields, .. 
}) => { - let fields: Result<_> = fields - .iter() - .map(|field| { - let mut props = HashMap::new(); - if let Some(doc) = &field.doc { - props.insert("avro::doc".to_string(), doc.clone()); - } - /*if let Some(aliases) = fields.aliases { - props.insert("aliases", aliases); - }*/ - schema_to_field_with_props( - &field.schema, - Some(&field.name), - false, - Some(props), - ) - }) - .collect(); - DataType::Struct(fields?) - } - AvroSchema::Enum(EnumSchema { .. }) => DataType::Utf8, - AvroSchema::Fixed(FixedSchema { size, .. }) => { - DataType::FixedSizeBinary(*size as i32) - } - AvroSchema::Decimal(DecimalSchema { - precision, scale, .. - }) => DataType::Decimal128(*precision as u8, *scale as i8), - AvroSchema::BigDecimal => DataType::LargeBinary, - AvroSchema::Uuid => DataType::FixedSizeBinary(16), - AvroSchema::Date => DataType::Date32, - AvroSchema::TimeMillis => DataType::Time32(TimeUnit::Millisecond), - AvroSchema::TimeMicros => DataType::Time64(TimeUnit::Microsecond), - AvroSchema::TimestampMillis => DataType::Timestamp(TimeUnit::Millisecond, None), - AvroSchema::TimestampMicros => DataType::Timestamp(TimeUnit::Microsecond, None), - AvroSchema::TimestampNanos => DataType::Timestamp(TimeUnit::Nanosecond, None), - AvroSchema::LocalTimestampMillis => todo!(), - AvroSchema::LocalTimestampMicros => todo!(), - AvroSchema::LocalTimestampNanos => todo!(), - AvroSchema::Duration => DataType::Duration(TimeUnit::Millisecond), - }; - - let data_type = field_type.clone(); - let name = name.unwrap_or_else(|| default_field_name(&data_type)); - - let mut field = Field::new(name, field_type, nullable); - field.set_metadata(props.unwrap_or_default()); - Ok(field) -} - -fn default_field_name(dt: &DataType) -> &str { - match dt { - DataType::Null => "null", - DataType::Boolean => "bit", - DataType::Int8 => "tinyint", - DataType::Int16 => "smallint", - DataType::Int32 => "int", - DataType::Int64 => "bigint", - DataType::UInt8 => "uint1", - DataType::UInt16 => "uint2", - DataType::UInt32 => "uint4", - DataType::UInt64 => "uint8", - DataType::Float16 => "float2", - DataType::Float32 => "float4", - DataType::Float64 => "float8", - DataType::Date32 => "dateday", - DataType::Date64 => "datemilli", - DataType::Time32(tu) | DataType::Time64(tu) => match tu { - TimeUnit::Second => "timesec", - TimeUnit::Millisecond => "timemilli", - TimeUnit::Microsecond => "timemicro", - TimeUnit::Nanosecond => "timenano", - }, - DataType::Timestamp(tu, tz) => { - if tz.is_some() { - match tu { - TimeUnit::Second => "timestampsectz", - TimeUnit::Millisecond => "timestampmillitz", - TimeUnit::Microsecond => "timestampmicrotz", - TimeUnit::Nanosecond => "timestampnanotz", - } - } else { - match tu { - TimeUnit::Second => "timestampsec", - TimeUnit::Millisecond => "timestampmilli", - TimeUnit::Microsecond => "timestampmicro", - TimeUnit::Nanosecond => "timestampnano", - } - } - } - DataType::Duration(_) => "duration", - DataType::Interval(unit) => match unit { - IntervalUnit::YearMonth => "intervalyear", - IntervalUnit::DayTime => "intervalmonth", - IntervalUnit::MonthDayNano => "intervalmonthdaynano", - }, - DataType::Binary => "varbinary", - DataType::FixedSizeBinary(_) => "fixedsizebinary", - DataType::LargeBinary => "largevarbinary", - DataType::Utf8 => "varchar", - DataType::LargeUtf8 => "largevarchar", - DataType::List(_) => "list", - DataType::FixedSizeList(_, _) => "fixed_size_list", - DataType::LargeList(_) => "largelist", - DataType::Struct(_) => "struct", - DataType::Union(_, _) => "union", - DataType::Dictionary(_, _) => 
"map", - DataType::Map(_, _) => unimplemented!("Map support not implemented"), - DataType::RunEndEncoded(_, _) => { - unimplemented!("RunEndEncoded support not implemented") - } - DataType::Utf8View - | DataType::BinaryView - | DataType::ListView(_) - | DataType::LargeListView(_) => { - unimplemented!("View support not implemented") - } - DataType::Decimal32(_, _) => "decimal", - DataType::Decimal64(_, _) => "decimal", - DataType::Decimal128(_, _) => "decimal", - DataType::Decimal256(_, _) => "decimal", - } -} - -fn external_props(schema: &AvroSchema) -> HashMap { - let mut props = HashMap::new(); - match &schema { - AvroSchema::Record(RecordSchema { doc: Some(doc), .. }) - | AvroSchema::Enum(EnumSchema { doc: Some(doc), .. }) - | AvroSchema::Fixed(FixedSchema { doc: Some(doc), .. }) => { - props.insert("avro::doc".to_string(), doc.clone()); - } - _ => {} - } - match &schema { - AvroSchema::Record(RecordSchema { - name: Name { namespace, .. }, - aliases: Some(aliases), - .. - }) - | AvroSchema::Enum(EnumSchema { - name: Name { namespace, .. }, - aliases: Some(aliases), - .. - }) - | AvroSchema::Fixed(FixedSchema { - name: Name { namespace, .. }, - aliases: Some(aliases), - .. - }) => { - let aliases: Vec = aliases - .iter() - .map(|alias| aliased(alias, namespace.as_deref(), None)) - .collect(); - props.insert( - "avro::aliases".to_string(), - format!("[{}]", aliases.join(",")), - ); - } - _ => {} - } - props -} - -/// Returns the fully qualified name for a field -pub fn aliased( - alias: &Alias, - namespace: Option<&str>, - default_namespace: Option<&str>, -) -> String { - if alias.namespace().is_some() { - alias.fullname(None) - } else { - let namespace = namespace.as_ref().copied().or(default_namespace); - - match namespace { - Some(ref namespace) => format!("{}.{}", namespace, alias.name()), - None => alias.fullname(None), - } - } -} - -#[cfg(test)] -mod test { - use super::{aliased, external_props, to_arrow_schema}; - use apache_avro::Schema as AvroSchema; - use apache_avro::schema::{Alias, EnumSchema, FixedSchema, Name, RecordSchema}; - use arrow::datatypes::DataType::{Binary, Float32, Float64, Timestamp, Utf8}; - use arrow::datatypes::DataType::{Boolean, Int32, Int64}; - use arrow::datatypes::TimeUnit::Microsecond; - use arrow::datatypes::{Field, Schema}; - - fn alias(name: &str) -> Alias { - Alias::new(name).unwrap() - } - - #[test] - fn test_alias() { - assert_eq!(aliased(&alias("foo.bar"), None, None), "foo.bar"); - assert_eq!(aliased(&alias("bar"), Some("foo"), None), "foo.bar"); - assert_eq!(aliased(&alias("bar"), Some("foo"), Some("cat")), "foo.bar"); - assert_eq!(aliased(&alias("bar"), None, Some("cat")), "cat.bar"); - } - - #[test] - fn test_external_props() { - let record_schema = AvroSchema::Record(RecordSchema { - name: Name { - name: "record".to_string(), - namespace: None, - }, - aliases: Some(vec![alias("fooalias"), alias("baralias")]), - doc: Some("record documentation".to_string()), - fields: vec![], - lookup: Default::default(), - attributes: Default::default(), - }); - let props = external_props(&record_schema); - assert_eq!( - props.get("avro::doc"), - Some(&"record documentation".to_string()) - ); - assert_eq!( - props.get("avro::aliases"), - Some(&"[fooalias,baralias]".to_string()) - ); - let enum_schema = AvroSchema::Enum(EnumSchema { - name: Name { - name: "enum".to_string(), - namespace: None, - }, - aliases: Some(vec![alias("fooenum"), alias("barenum")]), - doc: Some("enum documentation".to_string()), - symbols: vec![], - default: None, - attributes: 
Default::default(), - }); - let props = external_props(&enum_schema); - assert_eq!( - props.get("avro::doc"), - Some(&"enum documentation".to_string()) - ); - assert_eq!( - props.get("avro::aliases"), - Some(&"[fooenum,barenum]".to_string()) - ); - let fixed_schema = AvroSchema::Fixed(FixedSchema { - name: Name { - name: "fixed".to_string(), - namespace: None, - }, - aliases: Some(vec![alias("foofixed"), alias("barfixed")]), - size: 1, - doc: None, - default: None, - attributes: Default::default(), - }); - let props = external_props(&fixed_schema); - assert_eq!( - props.get("avro::aliases"), - Some(&"[foofixed,barfixed]".to_string()) - ); - } - - #[test] - fn test_invalid_avro_schema() {} - - #[test] - fn test_plain_types_schema() { - let schema = AvroSchema::parse_str( - r#" - { - "type" : "record", - "name" : "topLevelRecord", - "fields" : [ { - "name" : "id", - "type" : [ "int", "null" ] - }, { - "name" : "bool_col", - "type" : [ "boolean", "null" ] - }, { - "name" : "tinyint_col", - "type" : [ "int", "null" ] - }, { - "name" : "smallint_col", - "type" : [ "int", "null" ] - }, { - "name" : "int_col", - "type" : [ "int", "null" ] - }, { - "name" : "bigint_col", - "type" : [ "long", "null" ] - }, { - "name" : "float_col", - "type" : [ "float", "null" ] - }, { - "name" : "double_col", - "type" : [ "double", "null" ] - }, { - "name" : "date_string_col", - "type" : [ "bytes", "null" ] - }, { - "name" : "string_col", - "type" : [ "bytes", "null" ] - }, { - "name" : "timestamp_col", - "type" : [ { - "type" : "long", - "logicalType" : "timestamp-micros" - }, "null" ] - } ] - }"#, - ); - assert!(schema.is_ok(), "{schema:?}"); - let arrow_schema = to_arrow_schema(&schema.unwrap()); - assert!(arrow_schema.is_ok(), "{arrow_schema:?}"); - let expected = Schema::new(vec![ - Field::new("id", Int32, true), - Field::new("bool_col", Boolean, true), - Field::new("tinyint_col", Int32, true), - Field::new("smallint_col", Int32, true), - Field::new("int_col", Int32, true), - Field::new("bigint_col", Int64, true), - Field::new("float_col", Float32, true), - Field::new("double_col", Float64, true), - Field::new("date_string_col", Binary, true), - Field::new("string_col", Binary, true), - Field::new("timestamp_col", Timestamp(Microsecond, None), true), - ]); - assert_eq!(arrow_schema.unwrap(), expected); - } - - #[test] - fn test_nested_schema() { - let avro_schema = apache_avro::Schema::parse_str( - r#" - { - "type": "record", - "name": "r1", - "fields": [ - { - "name": "col1", - "type": [ - "null", - { - "type": "record", - "name": "r2", - "fields": [ - { - "name": "col2", - "type": "string" - }, - { - "name": "col3", - "type": ["null", "string"], - "default": null - } - ] - } - ], - "default": null - } - ] - }"#, - ) - .unwrap(); - // should not use Avro Record names. 
- let expected_arrow_schema = Schema::new(vec![Field::new( - "col1", - arrow::datatypes::DataType::Struct( - vec![ - Field::new("col2", Utf8, false), - Field::new("col3", Utf8, true), - ] - .into(), - ), - true, - )]); - assert_eq!( - to_arrow_schema(&avro_schema).unwrap(), - expected_arrow_schema - ); - } - - #[test] - fn test_non_record_schema() { - let arrow_schema = to_arrow_schema(&AvroSchema::String); - assert!(arrow_schema.is_ok(), "{arrow_schema:?}"); - assert_eq!( - arrow_schema.unwrap(), - Schema::new(vec![Field::new("", Utf8, false)]) - ); - } -} diff --git a/datafusion/datasource-avro/src/file_format.rs b/datafusion/datasource-avro/src/file_format.rs index 2447c032e700d..602a1172bbd7e 100644 --- a/datafusion/datasource-avro/src/file_format.rs +++ b/datafusion/datasource-avro/src/file_format.rs @@ -16,13 +16,12 @@ // under the License. //! Apache Avro [`FileFormat`] abstractions - use std::any::Any; use std::collections::HashMap; use std::fmt; use std::sync::Arc; -use crate::avro_to_arrow::read_avro_schema_from_reader; +use crate::read_avro_schema_from_reader; use crate::source::AvroSource; use arrow::datatypes::Schema; diff --git a/datafusion/datasource-avro/src/mod.rs b/datafusion/datasource-avro/src/mod.rs index 22c40e203a014..946330166582b 100644 --- a/datafusion/datasource-avro/src/mod.rs +++ b/datafusion/datasource-avro/src/mod.rs @@ -28,9 +28,64 @@ //! An [Avro](https://avro.apache.org/) based [`FileSource`](datafusion_datasource::file::FileSource) implementation and related functionality. -pub mod avro_to_arrow; pub mod file_format; pub mod source; -pub use apache_avro; +use arrow::datatypes::Schema; +pub use arrow_avro; +use arrow_avro::reader::ReaderBuilder; pub use file_format::*; +use std::io::{BufReader, Read}; + +/// Read Avro schema given a reader +pub fn read_avro_schema_from_reader<R: Read>( + reader: &mut R, +) -> datafusion_common::Result<Schema> { + let avro_reader = ReaderBuilder::new().build(BufReader::new(reader))?; + Ok(avro_reader.schema().as_ref().clone()) +} + +#[cfg(test)] +mod test { + use super::*; + use datafusion_common::test_util::arrow_test_data; + use datafusion_common::Result as DFResult; + use std::fs::File; + use arrow::datatypes::{DataType, Field, TimeUnit}; + + fn avro_test_file(name: &str) -> String { + format!("{}/avro/{name}", arrow_test_data()) + } + + #[test] + fn test_read_avro_schema_from_reader() -> DFResult<()> { + let path = avro_test_file("alltypes_dictionary.avro"); + let mut file = File::open(&path)?; + let file_schema = read_avro_schema_from_reader(&mut file)?; + + let expected_fields = vec![ + Field::new("id", DataType::Int32, true), + Field::new("bool_col", DataType::Boolean, true), + Field::new("tinyint_col", DataType::Int32, true), + Field::new("smallint_col", DataType::Int32, true), + Field::new("int_col", DataType::Int32, true), + Field::new("bigint_col", DataType::Int64, true), + Field::new("float_col", DataType::Float32, true), + Field::new("double_col", DataType::Float64, true), + Field::new("date_string_col", DataType::Binary, true), + Field::new("string_col", DataType::Binary, true), + Field::new( + "timestamp_col", + DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())), + true, + ), + ]; + + assert_eq!(file_schema.fields.len(), expected_fields.len()); + for (i, field) in file_schema.fields.iter().enumerate() { + assert_eq!(field.as_ref(), &expected_fields[i]); + } + + Ok(()) + } +} diff --git a/datafusion/datasource-avro/src/source.rs b/datafusion/datasource-avro/src/source.rs index 33d6cf5272678..455806676ab4f 100644 
--- a/datafusion/datasource-avro/src/source.rs +++ b/datafusion/datasource-avro/src/source.rs @@ -20,16 +20,15 @@ use std::any::Any; use std::sync::Arc; -use crate::avro_to_arrow::Reader as AvroReader; - +use arrow_avro::reader::{Reader, ReaderBuilder}; +use arrow_avro::schema::AvroSchema; use datafusion_common::error::Result; -use datafusion_datasource::TableSchema; use datafusion_datasource::file::FileSource; use datafusion_datasource::file_scan_config::FileScanConfig; use datafusion_datasource::file_stream::FileOpener; use datafusion_datasource::projection::{ProjectionOpener, SplitProjection}; use datafusion_datasource::schema_adapter::SchemaAdapterFactory; -use datafusion_physical_expr_common::sort_expr::LexOrdering; +use datafusion_datasource::TableSchema; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; use datafusion_physical_plan::projection::ProjectionExprs; @@ -58,22 +57,15 @@ impl AvroSource { } } - fn open<R: std::io::Read>(&self, reader: R) -> Result<AvroReader<'static, R>> { - let file_schema = self.table_schema.file_schema(); - let projection = Some( - self.projection - .file_indices - .iter() - .map(|&idx| file_schema.field(idx).name().clone()) - .collect::<Vec<_>>(), - ); - AvroReader::try_new( - reader, - &Arc::clone(self.table_schema.file_schema()), - self.batch_size.expect("Batch size must set before open"), - projection.as_ref(), - ) + fn open<R: std::io::BufRead>(&self, reader: R) -> Result<Reader<R>> { + ReaderBuilder::new() + .with_reader_schema(AvroSchema::try_from(self.table_schema.file_schema().as_ref()).unwrap()) + .with_batch_size(self.batch_size.expect("Batch size must be set before open")) + .with_projection(self.projection.file_indices.clone()) + .build(reader) + .map_err(Into::into) } + } impl FileSource for AvroSource { @@ -133,16 +125,6 @@ impl FileSource for AvroSource { "avro" } - fn repartitioned( - &self, - _target_partitions: usize, - _repartition_file_min_size: usize, - _output_ordering: Option<LexOrdering>, - _config: &FileScanConfig, - ) -> Result<Option<Arc<dyn FileSource>>> { - Ok(None) - } - fn with_schema_adapter_factory( &self, schema_adapter_factory: Arc<dyn SchemaAdapterFactory>, ) @@ -160,9 +142,10 @@ impl FileSource for AvroSource { mod private { use super::*; + use std::io::BufReader; use bytes::Buf; - use datafusion_datasource::{PartitionedFile, file_stream::FileOpenFuture}; + use datafusion_datasource::{file_stream::FileOpenFuture, PartitionedFile}; use futures::StreamExt; use object_store::{GetResultPayload, ObjectStore}; @@ -173,22 +156,23 @@ mod private { impl FileOpener for AvroOpener { fn open(&self, partitioned_file: PartitionedFile) -> Result<FileOpenFuture> { - let config = Arc::clone(&self.config); let object_store = Arc::clone(&self.object_store); + let config = Arc::clone(&self.config); + Ok(Box::pin(async move { let r = object_store .get(&partitioned_file.object_meta.location) .await?; match r.payload { GetResultPayload::File(file, _) => { - let reader = config.open(file)?; + let reader = config.open(BufReader::new(file))?; Ok(futures::stream::iter(reader) .map(|r| r.map_err(Into::into)) .boxed()) } GetResultPayload::Stream(_) => { let bytes = r.bytes().await?; - let reader = config.open(bytes.reader())?; + let reader = config.open(BufReader::new(bytes.reader()))?; Ok(futures::stream::iter(reader) .map(|r| r.map_err(Into::into)) .boxed()) @@ -198,3 +182,178 @@ } } } + +#[cfg(test)] +mod tests { + use super::*; + use arrow::array::*; + use arrow::array::{ + BinaryArray, BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array, + TimestampMicrosecondArray, + }; + use arrow::datatypes::{DataType, Field}; + use arrow::datatypes::TimeUnit; + use 
std::fs::File; + use std::io::BufReader; + + fn build_reader(name: &'_ str, projection: Option<Vec<usize>>) -> Reader<BufReader<File>> { + let testdata = datafusion_common::test_util::arrow_test_data(); + let filename = format!("{testdata}/avro/{name}"); + let mut builder = ReaderBuilder::new() + .with_batch_size(64); + if let Some(proj) = projection { + builder = builder.with_projection(proj); + } + builder + .build(BufReader::new(File::open(filename).unwrap())) + .unwrap() + } + + fn get_col<'a, T: 'static>( + batch: &'a RecordBatch, + col: (usize, &Field), + ) -> Option<&'a T> { + batch.column(col.0).as_any().downcast_ref::<T>() + } + + #[test] + fn test_avro_basic() { + let mut reader = build_reader("alltypes_dictionary.avro", None); + let batch = reader.next().unwrap().unwrap(); + + assert_eq!(11, batch.num_columns()); + assert_eq!(2, batch.num_rows()); + + let schema = reader.schema(); + let batch_schema = batch.schema(); + assert_eq!(schema, batch_schema); + + let id = schema.column_with_name("id").unwrap(); + assert_eq!(0, id.0); + assert_eq!(&DataType::Int32, id.1.data_type()); + let col = get_col::<Int32Array>(&batch, id).unwrap(); + assert_eq!(0, col.value(0)); + assert_eq!(1, col.value(1)); + let bool_col = schema.column_with_name("bool_col").unwrap(); + assert_eq!(1, bool_col.0); + assert_eq!(&DataType::Boolean, bool_col.1.data_type()); + let col = get_col::<BooleanArray>(&batch, bool_col).unwrap(); + assert!(col.value(0)); + assert!(!col.value(1)); + let tinyint_col = schema.column_with_name("tinyint_col").unwrap(); + assert_eq!(2, tinyint_col.0); + assert_eq!(&DataType::Int32, tinyint_col.1.data_type()); + let col = get_col::<Int32Array>(&batch, tinyint_col).unwrap(); + assert_eq!(0, col.value(0)); + assert_eq!(1, col.value(1)); + let smallint_col = schema.column_with_name("smallint_col").unwrap(); + assert_eq!(3, smallint_col.0); + assert_eq!(&DataType::Int32, smallint_col.1.data_type()); + let col = get_col::<Int32Array>(&batch, smallint_col).unwrap(); + assert_eq!(0, col.value(0)); + assert_eq!(1, col.value(1)); + let int_col = schema.column_with_name("int_col").unwrap(); + assert_eq!(4, int_col.0); + let col = get_col::<Int32Array>(&batch, int_col).unwrap(); + assert_eq!(0, col.value(0)); + assert_eq!(1, col.value(1)); + assert_eq!(&DataType::Int32, int_col.1.data_type()); + let col = get_col::<Int32Array>(&batch, int_col).unwrap(); + assert_eq!(0, col.value(0)); + assert_eq!(1, col.value(1)); + let bigint_col = schema.column_with_name("bigint_col").unwrap(); + assert_eq!(5, bigint_col.0); + let col = get_col::<Int64Array>(&batch, bigint_col).unwrap(); + assert_eq!(0, col.value(0)); + assert_eq!(10, col.value(1)); + assert_eq!(&DataType::Int64, bigint_col.1.data_type()); + let float_col = schema.column_with_name("float_col").unwrap(); + assert_eq!(6, float_col.0); + let col = get_col::<Float32Array>(&batch, float_col).unwrap(); + assert_eq!(0.0, col.value(0)); + assert_eq!(1.1, col.value(1)); + assert_eq!(&DataType::Float32, float_col.1.data_type()); + let col = get_col::<Float32Array>(&batch, float_col).unwrap(); + assert_eq!(0.0, col.value(0)); + assert_eq!(1.1, col.value(1)); + let double_col = schema.column_with_name("double_col").unwrap(); + assert_eq!(7, double_col.0); + assert_eq!(&DataType::Float64, double_col.1.data_type()); + let col = get_col::<Float64Array>(&batch, double_col).unwrap(); + assert_eq!(0.0, col.value(0)); + assert_eq!(10.1, col.value(1)); + let date_string_col = schema.column_with_name("date_string_col").unwrap(); + assert_eq!(8, date_string_col.0); + assert_eq!(&DataType::Binary, date_string_col.1.data_type()); + let col = get_col::<BinaryArray>(&batch, date_string_col).unwrap(); + 
assert_eq!("01/01/09".as_bytes(), col.value(0)); + assert_eq!("01/01/09".as_bytes(), col.value(1)); + let string_col = schema.column_with_name("string_col").unwrap(); + assert_eq!(9, string_col.0); + assert_eq!(&DataType::Binary, string_col.1.data_type()); + let col = get_col::(&batch, string_col).unwrap(); + assert_eq!("0".as_bytes(), col.value(0)); + assert_eq!("1".as_bytes(), col.value(1)); + let timestamp_col = schema.column_with_name("timestamp_col").unwrap(); + assert_eq!(10, timestamp_col.0); + assert_eq!( + &DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())), + timestamp_col.1.data_type() + ); + let col = get_col::(&batch, timestamp_col).unwrap(); + assert_eq!(1230768000000000, col.value(0)); + assert_eq!(1230768060000000, col.value(1)); + } + + #[test] + fn test_avro_with_projection() { + // Test projection to filter and reorder columns + let projection = vec![9, 7, 1]; // string_col, double_col, bool_col + + let mut reader = + build_reader("alltypes_dictionary.avro", Some(projection)); + let batch = reader.next().unwrap().unwrap(); + + // Only 3 columns should be present (not all 11) + assert_eq!(3, batch.num_columns()); + assert_eq!(2, batch.num_rows()); + + let schema = reader.schema(); + let batch_schema = batch.schema(); + assert_eq!(schema, batch_schema); + + // Verify columns are in the order specified in projection + // First column should be string_col (was at index 9 in original) + assert_eq!("string_col", schema.field(0).name()); + assert_eq!(&DataType::Binary, schema.field(0).data_type()); + let col = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!("0".as_bytes(), col.value(0)); + assert_eq!("1".as_bytes(), col.value(1)); + + // Second column should be double_col (was at index 7 in original) + assert_eq!("double_col", schema.field(1).name()); + assert_eq!(&DataType::Float64, schema.field(1).data_type()); + let col = batch + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(0.0, col.value(0)); + assert_eq!(10.1, col.value(1)); + + // Third column should be bool_col (was at index 1 in original) + assert_eq!("bool_col", schema.field(2).name()); + assert_eq!(&DataType::Boolean, schema.field(2).data_type()); + let col = batch + .column(2) + .as_any() + .downcast_ref::() + .unwrap(); + assert!(col.value(0)); + assert!(!col.value(1)); + } +} diff --git a/datafusion/proto/Cargo.toml b/datafusion/proto/Cargo.toml index b00bd0dcc6bfd..06e38741e528a 100644 --- a/datafusion/proto/Cargo.toml +++ b/datafusion/proto/Cargo.toml @@ -41,7 +41,7 @@ name = "datafusion_proto" default = ["parquet"] json = ["pbjson", "serde", "serde_json", "datafusion-proto-common/json"] parquet = ["datafusion-datasource-parquet", "datafusion-common/parquet", "datafusion/parquet"] -avro = ["datafusion-datasource-avro", "datafusion-common/avro"] +avro = ["datafusion-datasource-avro"] # Note to developers: do *not* add `datafusion` as a dependency in # this crate. 
See https://github.com/apache/datafusion/issues/17713 diff --git a/datafusion/sqllogictest/test_files/avro.slt b/datafusion/sqllogictest/test_files/avro.slt index 2ad60c0082e87..eed8af475b406 100644 --- a/datafusion/sqllogictest/test_files/avro.slt +++ b/datafusion/sqllogictest/test_files/avro.slt @@ -31,7 +31,7 @@ CREATE EXTERNAL TABLE alltypes_plain ( float_col FLOAT NOT NULL, double_col DOUBLE NOT NULL, date_string_col BYTEA NOT NULL, - string_col VARCHAR NOT NULL, + string_col BYTEA NOT NULL, timestamp_col TIMESTAMP NOT NULL, ) STORED AS AVRO @@ -48,7 +48,7 @@ CREATE EXTERNAL TABLE alltypes_plain_snappy ( float_col FLOAT NOT NULL, double_col DOUBLE NOT NULL, date_string_col BYTEA NOT NULL, - string_col VARCHAR NOT NULL, + string_col BYTEA NOT NULL, timestamp_col TIMESTAMP NOT NULL, ) STORED AS AVRO @@ -65,7 +65,7 @@ CREATE EXTERNAL TABLE alltypes_plain_bzip2 ( float_col FLOAT NOT NULL, double_col DOUBLE NOT NULL, date_string_col BYTEA NOT NULL, - string_col VARCHAR NOT NULL, + string_col BYTEA NOT NULL, timestamp_col TIMESTAMP NOT NULL, ) STORED AS AVRO @@ -82,7 +82,7 @@ CREATE EXTERNAL TABLE alltypes_plain_xz ( float_col FLOAT NOT NULL, double_col DOUBLE NOT NULL, date_string_col BYTEA NOT NULL, - string_col VARCHAR NOT NULL, + string_col BYTEA NOT NULL, timestamp_col TIMESTAMP NOT NULL, ) STORED AS AVRO @@ -99,7 +99,7 @@ CREATE EXTERNAL TABLE alltypes_plain_zstandard ( float_col FLOAT NOT NULL, double_col DOUBLE NOT NULL, date_string_col BYTEA NOT NULL, - string_col VARCHAR NOT NULL, + string_col BYTEA NOT NULL, timestamp_col TIMESTAMP NOT NULL, ) STORED AS AVRO @@ -107,7 +107,7 @@ LOCATION '../../testing/data/avro/alltypes_plain.zstandard.avro'; statement ok CREATE EXTERNAL TABLE single_nan ( - mycol FLOAT + mycol DOUBLE ) STORED AS AVRO LOCATION '../../testing/data/avro/single_nan.avro'; @@ -260,7 +260,7 @@ physical_plan # test column projection order from avro file query ITII -SELECT id, string_col, int_col, bigint_col FROM alltypes_plain ORDER BY id LIMIT 5 +SELECT id, CAST(string_col AS varchar), int_col, bigint_col FROM alltypes_plain ORDER BY id LIMIT 5 ---- 0 0 0 0 1 1 1 10 diff --git a/datafusion/sqllogictest/test_files/repartition_scan.slt b/datafusion/sqllogictest/test_files/repartition_scan.slt index 06ea22761d92b..f198f06eb213f 100644 --- a/datafusion/sqllogictest/test_files/repartition_scan.slt +++ b/datafusion/sqllogictest/test_files/repartition_scan.slt @@ -279,7 +279,7 @@ query TT EXPLAIN SELECT * FROM avro_table ---- logical_plan TableScan: avro_table projection=[f1, f2, f3] -physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/avro/simple_enum.avro]]}, projection=[f1, f2, f3], file_type=avro +physical_plan DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/testing/data/avro/simple_enum.avro:0..103], [WORKSPACE_ROOT/testing/data/avro/simple_enum.avro:103..206], [WORKSPACE_ROOT/testing/data/avro/simple_enum.avro:206..309], [WORKSPACE_ROOT/testing/data/avro/simple_enum.avro:309..411]]}, projection=[f1, f2, f3], file_type=avro # Cleanup statement ok diff --git a/parquet-testing b/parquet-testing index 107b36603e051..a3d96a65e11e2 160000 --- a/parquet-testing +++ b/parquet-testing @@ -1 +1 @@ -Subproject commit 107b36603e051aee26bd93e04b871034f6c756c0 +Subproject commit a3d96a65e11e2bbca7d22a894e8313ede90a33a3
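For readers following the migration, a minimal sketch (not part of this patch) of the read path the diff moves to, assuming the pinned arrow-rs branch above and a hypothetical local data.avro file. It uses only builder calls that appear in this diff (with_batch_size, with_projection, build); everything else is illustrative.

use std::fs::File;
use std::io::BufReader;

use arrow_avro::reader::ReaderBuilder;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // arrow-avro reads the writer schema from the file header, so no
    // reader schema needs to be supplied for a plain file scan.
    let file = BufReader::new(File::open("data.avro")?);

    let reader = ReaderBuilder::new()
        .with_batch_size(64)
        // Positional projection, as AvroSource::open now passes
        // projection.file_indices: keep columns 0 and 2, in that order.
        .with_projection(vec![0, 2])
        .build(file)?;

    // The reader is an Iterator over Result<RecordBatch>, which is why
    // AvroOpener can wrap it directly with futures::stream::iter.
    for batch in reader {
        let batch = batch?;
        println!("rows={} cols={}", batch.num_rows(), batch.num_columns());
    }
    Ok(())
}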