
Commit b3b6b20

feat: Support hdfs with OpenDAL
1 parent 28d874c

File tree: 2 files changed, +46 -2

native/core/Cargo.toml (4 additions, 0 deletions)

@@ -74,6 +74,9 @@ aws-credential-types = { workspace = true }
 parking_lot = "0.12.3"
 datafusion-comet-objectstore-hdfs = { path = "../hdfs", optional = true, default-features = false, features = ["hdfs"] }

+object_store_opendal = { version = "0.54.0", optional = true }
+opendal = { version = "0.54.0", optional = true, features = ["services-hdfs"] }
+
 [target.'cfg(target_os = "linux")'.dependencies]
 procfs = "0.17.0"

@@ -89,6 +92,7 @@ datafusion-functions-nested = { version = "49.0.2" }
 [features]
 default = []
 hdfs = ["datafusion-comet-objectstore-hdfs"]
+hdfs-opendal = ["opendal", "object_store_opendal"]
 jemalloc = ["tikv-jemallocator", "tikv-jemalloc-ctl"]

 # exclude optional packages from cargo machete verifications
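
Both new dependencies are optional, so the OpenDAL-backed HDFS store is only compiled when the new hdfs-opendal feature is enabled, e.g. with cargo build --features hdfs-opendal. A minimal sketch of checking that gate at compile time follows; the function name is illustrative and not part of this commit:

// Sketch only: the feature name comes from the Cargo.toml diff above;
// the helper function itself is hypothetical.
pub fn opendal_hdfs_enabled() -> bool {
    // cfg! evaluates to true only in builds with --features hdfs-opendal.
    cfg!(feature = "hdfs-opendal")
}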

native/core/src/parquet/parquet_support.rs (42 additions, 2 deletions)

@@ -38,8 +38,13 @@ use datafusion_comet_spark_expr::EvalMode;
 use object_store::path::Path;
 use object_store::{parse_url, ObjectStore};
 use std::collections::HashMap;
+use std::path::PathBuf;
 use std::time::Duration;
-use std::{fmt::Debug, hash::Hash, sync::Arc};
+use std::{
+    fmt::{Debug, Write},
+    hash::Hash,
+    sync::Arc,
+};
 use url::Url;

 use super::objectstore;

@@ -352,7 +357,42 @@ fn parse_hdfs_url(url: &Url) -> Result<(Box<dyn ObjectStore>, Path), object_store::Error> {
     }
 }

-#[cfg(not(feature = "hdfs"))]
+#[cfg(feature = "hdfs-opendal")]
+fn parse_hdfs_url(url: &Url) -> Result<(Box<dyn ObjectStore>, Path), object_store::Error> {
+    let name_node = get_name_node_uri(url)?;
+    let builder = opendal::services::Hdfs::default().name_node(&name_node);
+
+    let op = opendal::Operator::new(builder)
+        .map_err(|error| object_store::Error::Generic {
+            store: "hdfs-opendal",
+            source: error.into(),
+        })?
+        .finish();
+    let store = object_store_opendal::OpendalStore::new(op);
+    let path = Path::parse(url.path())?;
+    Ok((Box::new(store), path))
+}
+
+fn get_name_node_uri(url: &Url) -> Result<String, object_store::Error> {
+    if let Some(host) = url.host() {
+        let schema = url.scheme();
+        let mut uri_builder = String::new();
+        write!(&mut uri_builder, "{schema}://{host}").unwrap();
+
+        if let Some(port) = url.port() {
+            write!(&mut uri_builder, ":{port}").unwrap();
+        }
+        Ok(uri_builder)
+    } else {
+        Err(object_store::Error::InvalidPath {
+            source: object_store::path::Error::InvalidPath {
+                path: PathBuf::from(url.as_str()),
+            },
+        })
+    }
+}
+
+#[cfg(all(not(feature = "hdfs"), not(feature = "hdfs-opendal")))]
 fn parse_hdfs_url(_url: &Url) -> Result<(Box<dyn ObjectStore>, Path), object_store::Error> {
     Err(object_store::Error::Generic {
         store: "HadoopFileSystem",
