diff --git a/native/core/Cargo.toml b/native/core/Cargo.toml
index 898d41dde9..d89757302f 100644
--- a/native/core/Cargo.toml
+++ b/native/core/Cargo.toml
@@ -74,6 +74,10 @@ aws-credential-types = { workspace = true }
 parking_lot = "0.12.3"
 datafusion-comet-objectstore-hdfs = { path = "../hdfs", optional = true, default-features = false, features = ["hdfs"] }
 
+object_store_opendal = { version = "0.54.0", optional = true }
+hdfs-sys = { version = "0.3", optional = true, features = ["hdfs_3_3"] }
+opendal = { version = "0.54.0", optional = true, features = ["services-hdfs"] }
+
 [target.'cfg(target_os = "linux")'.dependencies]
 procfs = "0.17.0"
 
@@ -89,11 +93,12 @@ datafusion-functions-nested = { version = "49.0.2" }
 [features]
 default = []
 hdfs = ["datafusion-comet-objectstore-hdfs"]
+hdfs-opendal = ["opendal", "object_store_opendal", "hdfs-sys"]
 jemalloc = ["tikv-jemallocator", "tikv-jemalloc-ctl"]
 
 # exclude optional packages from cargo machete verifications
 [package.metadata.cargo-machete]
-ignored = ["datafusion-comet-objectstore-hdfs"]
+ignored = ["datafusion-comet-objectstore-hdfs", "hdfs-sys"]
 
 [lib]
 name = "comet"
diff --git a/native/core/src/parquet/parquet_support.rs b/native/core/src/parquet/parquet_support.rs
index 5c425c6688..e8118a5550 100644
--- a/native/core/src/parquet/parquet_support.rs
+++ b/native/core/src/parquet/parquet_support.rs
@@ -352,7 +352,44 @@ fn parse_hdfs_url(url: &Url) -> Result<(Box<dyn ObjectStore>, Path), object_store::Error> {
     }
 }
 
-#[cfg(not(feature = "hdfs"))]
+#[cfg(feature = "hdfs-opendal")]
+fn parse_hdfs_url(url: &Url) -> Result<(Box<dyn ObjectStore>, Path), object_store::Error> {
+    let name_node = get_name_node_uri(url)?;
+    let builder = opendal::services::Hdfs::default().name_node(&name_node);
+
+    let op = opendal::Operator::new(builder)
+        .map_err(|error| object_store::Error::Generic {
+            store: "hdfs-opendal",
+            source: error.into(),
+        })?
+        .finish();
+    let store = object_store_opendal::OpendalStore::new(op);
+    let path = Path::parse(url.path())?;
+    Ok((Box::new(store), path))
+}
+
+#[cfg(feature = "hdfs-opendal")]
+fn get_name_node_uri(url: &Url) -> Result<String, object_store::Error> {
+    use std::fmt::Write;
+    if let Some(host) = url.host() {
+        let schema = url.scheme();
+        let mut uri_builder = String::new();
+        write!(&mut uri_builder, "{schema}://{host}").unwrap();
+
+        if let Some(port) = url.port() {
+            write!(&mut uri_builder, ":{port}").unwrap();
+        }
+        Ok(uri_builder)
+    } else {
+        Err(object_store::Error::InvalidPath {
+            source: object_store::path::Error::InvalidPath {
+                path: std::path::PathBuf::from(url.as_str()),
+            },
+        })
+    }
+}
+
+#[cfg(all(not(feature = "hdfs"), not(feature = "hdfs-opendal")))]
 fn parse_hdfs_url(_url: &Url) -> Result<(Box<dyn ObjectStore>, Path), object_store::Error> {
     Err(object_store::Error::Generic {
         store: "HadoopFileSystem",
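
Below is a minimal sketch (not part of the patch) of the URL split that the new hdfs-opendal code path performs: get_name_node_uri turns the scheme, host, and optional port into the opendal name-node URI, while the URL's path component is parsed separately as the object-store path. The example URL is hypothetical; the url crate is already in use by the surrounding code.

// Illustration only (hypothetical URL): mirrors the split performed by
// get_name_node_uri() and Path::parse(url.path()) in the patch above.
use url::Url;

fn main() {
    let url = Url::parse("hdfs://namenode:9000/warehouse/t1/part-0.parquet").unwrap();

    // Scheme + host (+ optional port) become the opendal name-node URI.
    let mut name_node = format!("{}://{}", url.scheme(), url.host().unwrap());
    if let Some(port) = url.port() {
        name_node.push_str(&format!(":{port}"));
    }
    assert_eq!(name_node, "hdfs://namenode:9000");

    // The path component is what Path::parse receives as the object path.
    assert_eq!(url.path(), "/warehouse/t1/part-0.parquet");
}

Note that the backend is opt-in: the new parse_hdfs_url implementation compiles only when the hdfs-opendal cargo feature is enabled, and the final cfg(all(not("hdfs"), not("hdfs-opendal"))) stub preserves the existing error-returning fallback for builds without either HDFS feature.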