
Commit fc11163

feat: add experimental remote HDFS support for native DataFusion reader
1 parent ea6d950 commit fc11163

File tree

6 files changed: +243 −10 lines

Makefile

Lines changed: 4 additions & 1 deletion
```diff
@@ -21,6 +21,9 @@ define spark_jvm_17_extra_args
 	$(shell ./mvnw help:evaluate -Dexpression=extraJavaTestArgs | grep -v '\[')
 endef
 
+# Build optional Comet native features (e.g. hdfs)
+FEATURES_ARG := $(shell ! [ -z $(COMET_FEATURES) ] && echo '--features=$(COMET_FEATURES)')
+
 all: core jvm
 
 core:
@@ -95,7 +98,7 @@ release-linux: clean
 	cd native && RUSTFLAGS="-Ctarget-cpu=native -Ctarget-feature=-prefer-256-bit" cargo build --release
 	./mvnw install -Prelease -DskipTests $(PROFILES)
 release:
-	cd native && RUSTFLAGS="-Ctarget-cpu=native" cargo build --release
+	cd native && RUSTFLAGS="$(RUSTFLAGS) -Ctarget-cpu=native" && RUSTFLAGS=$$RUSTFLAGS cargo build --release $(FEATURES_ARG)
 	./mvnw install -Prelease -DskipTests $(PROFILES)
 release-nogit:
 	cd native && RUSTFLAGS="-Ctarget-cpu=native" cargo build --release
```

docs/source/user-guide/datasources.md

Lines changed: 76 additions & 0 deletions
The new content below is appended after the existing note that "Comet does not provide native JSON scan, but when `spark.comet.convert.json.enabled` is enabled, data is immediately converted into Arrow format, allowing native execution to happen after that."

# Supported Storages

## Local

In progress

## HDFS

The Apache DataFusion Comet native reader seamlessly scans files on remote HDFS for the [supported formats](#supported-spark-data-sources).

### Using the experimental native DataFusion reader

Unlike the native Comet reader, the DataFusion reader fully supports nested type processing. This reader is currently experimental.

Building Comet with the native DataFusion reader and remote HDFS support requires a JDK.

Example: build Comet for `spark-3.4`, providing the JDK path in `JAVA_HOME` and the JRE linker path in `RUSTFLAGS`. The linker path varies by system; typically the JRE linker is part of the installed JDK.

```shell
export JAVA_HOME="/opt/homebrew/opt/openjdk@11"
make release PROFILES="-Pspark-3.4" COMET_FEATURES=hdfs RUSTFLAGS="-L $JAVA_HOME/libexec/openjdk.jdk/Contents/Home/lib/server"
```

Start Comet with the experimental reader and HDFS support as [described](installation.md/#run-spark-shell-with-comet-enabled), adding these extra parameters:

```shell
--conf spark.comet.scan.impl=native_datafusion \
--conf spark.hadoop.fs.defaultFS="hdfs://namenode:9000" \
--conf spark.hadoop.dfs.client.use.datanode.hostname=true \
--conf dfs.client.use.datanode.hostname=true
```

Query a struct type from remote HDFS:

```shell
spark.read.parquet("hdfs://namenode:9000/user/data").show(false)

root
 |-- id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- personal_info: struct (nullable = true)
 |    |-- firstName: string (nullable = true)
 |    |-- lastName: string (nullable = true)
 |    |-- ageInYears: integer (nullable = true)

25/01/30 16:50:43 INFO core/src/lib.rs: Comet native library version 0.6.0 initialized
== Physical Plan ==
* CometColumnarToRow (2)
+- CometNativeScan: (1)

(1) CometNativeScan:
Output [3]: [id#0, first_name#1, personal_info#4]
Arguments: [id#0, first_name#1, personal_info#4]

(2) CometColumnarToRow [codegen id : 1]
Input [3]: [id#0, first_name#1, personal_info#4]

25/01/30 16:50:44 INFO fs-hdfs-0.1.12/src/hdfs.rs: Connecting to Namenode (hdfs://namenode:9000)
+---+----------+-----------------+
|id |first_name|personal_info    |
+---+----------+-----------------+
|2  |Jane      |{Jane, Smith, 34}|
|1  |John      |{John, Doe, 28}  |
+---+----------+-----------------+
```

Verify that the scan shown in the plan is `CometNativeScan`.

## S3

In progress

native/Cargo.lock

Lines changed: 119 additions & 1 deletion
Some generated files are not rendered by default.

native/core/Cargo.toml

Lines changed: 2 additions & 0 deletions
```diff
@@ -77,6 +77,7 @@ datafusion-comet-proto = { workspace = true }
 object_store = { workspace = true }
 url = { workspace = true }
 chrono = { workspace = true }
+datafusion-objectstore-hdfs = { git = "https://github.com/comphead/datafusion-objectstore-hdfs", branch = "master", optional = true }
 
 [dev-dependencies]
 pprof = { version = "0.14.0", features = ["flamegraph"] }
@@ -88,6 +89,7 @@ hex = "0.4.3"
 
 [features]
 default = []
+hdfs = ["datafusion-objectstore-hdfs"]
 
 [lib]
 name = "comet"
```

native/core/src/execution/planner.rs

Lines changed: 5 additions & 8 deletions
```diff
@@ -73,7 +73,7 @@ use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctio
 
 use crate::execution::shuffle::CompressionCodec;
 use crate::execution::spark_plan::SparkPlan;
-use crate::parquet::parquet_support::SparkParquetOptions;
+use crate::parquet::parquet_support::{register_object_store, SparkParquetOptions};
 use crate::parquet::schema_adapter::SparkSchemaAdapterFactory;
 use datafusion::datasource::listing::PartitionedFile;
 use datafusion::datasource::physical_plan::parquet::ParquetExecBuilder;
@@ -1155,12 +1155,9 @@ impl PhysicalPlanner {
                 ))
             });
 
-        let object_store = object_store::local::LocalFileSystem::new();
-        // register the object store with the runtime environment
-        let url = Url::try_from("file://").unwrap();
-        self.session_ctx
-            .runtime_env()
-            .register_object_store(&url, Arc::new(object_store));
+        // By default, the local FS object store is registered;
+        // if the `hdfs` feature is enabled, the HDFS object store is registered instead
+        register_object_store(Arc::clone(&self.session_ctx))?;
 
         // Generate file groups
         let mut file_groups: Vec<Vec<PartitionedFile>> =
@@ -1220,7 +1217,7 @@ impl PhysicalPlanner {
         // TODO: I think we can remove partition_count in the future, but leave for testing.
         assert_eq!(file_groups.len(), partition_count);
 
-        let object_store_url = ObjectStoreUrl::local_filesystem();
+        let object_store_url = ObjectStoreUrl::parse("hdfs://namenode:9000").unwrap();
         let partition_fields: Vec<Field> = partition_schema
             .fields()
             .iter()
```
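
The diff replaces the inline local-filesystem registration with a call to `register_object_store` in `parquet_support`, whose body is not part of this commit view. A minimal sketch of what it plausibly does, assuming the `HadoopFileSystem` type from `datafusion-objectstore-hdfs` and the same hard-coded namenode URL that `ObjectStoreUrl::parse` uses above:

```rust
// Hypothetical sketch of `register_object_store`; the HDFS crate's API names
// and the hard-coded namenode URL are assumptions for illustration.
use std::sync::Arc;

use datafusion::error::{DataFusionError, Result};
use datafusion::execution::context::SessionContext;
use url::Url;

#[cfg(feature = "hdfs")]
use datafusion_objectstore_hdfs::object_store::hdfs::HadoopFileSystem;
#[cfg(not(feature = "hdfs"))]
use object_store::local::LocalFileSystem;

pub(crate) fn register_object_store(session_ctx: Arc<SessionContext>) -> Result<()> {
    // With the `hdfs` feature: register an HDFS-backed object store for the
    // namenode. `HadoopFileSystem::new` is assumed to return `None` on failure.
    #[cfg(feature = "hdfs")]
    {
        let store = HadoopFileSystem::new("hdfs://namenode:9000").ok_or_else(|| {
            DataFusionError::Execution("Failed to create HDFS object store".to_string())
        })?;
        let url = Url::parse("hdfs://namenode:9000").unwrap();
        session_ctx
            .runtime_env()
            .register_object_store(&url, Arc::new(store));
    }

    // Without the feature: keep the previous behavior and register the
    // local filesystem object store under the `file://` scheme.
    #[cfg(not(feature = "hdfs"))]
    {
        let url = Url::parse("file://").unwrap();
        session_ctx
            .runtime_env()
            .register_object_store(&url, Arc::new(LocalFileSystem::new()));
    }

    Ok(())
}
```

Note that `ObjectStoreUrl::parse("hdfs://namenode:9000")` in the planner hard-codes the namenode used in the documentation example, consistent with the experimental status of this feature.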
