
Commit de0be4b

feat: add experimental remote HDFS support for native DataFusion reader (#1359)
1 parent 19f07b0 commit de0be4b

File tree

4 files changed: +125, -11 lines

Makefile

Lines changed: 4 additions & 1 deletion
```diff
@@ -21,6 +21,9 @@ define spark_jvm_17_extra_args
 	$(shell ./mvnw help:evaluate -Dexpression=extraJavaTestArgs | grep -v '\[')
 endef
 
+# Build optional Comet native features (like hdfs e.g)
+FEATURES_ARG := $(shell ! [ -z $(COMET_FEATURES) ] && echo '--features=$(COMET_FEATURES)')
+
 all: core jvm
 
 core:
@@ -95,7 +98,7 @@ release-linux: clean
 	cd native && RUSTFLAGS="-Ctarget-cpu=native -Ctarget-feature=-prefer-256-bit" cargo build --release
 	./mvnw install -Prelease -DskipTests $(PROFILES)
 release:
-	cd native && RUSTFLAGS="-Ctarget-cpu=native" cargo build --release
+	cd native && RUSTFLAGS="$(RUSTFLAGS) -Ctarget-cpu=native" cargo build --release $(FEATURES_ARG)
 	./mvnw install -Prelease -DskipTests $(PROFILES)
 release-nogit:
 	cd native && RUSTFLAGS="-Ctarget-cpu=native" cargo build --release
```
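For reference, a minimal sketch of how the new `COMET_FEATURES` hook behaves; the Spark profile below is illustrative, and an `hdfs` build additionally needs the `RUSTFLAGS` linker path described in the documentation change further down.

```shell
# COMET_FEATURES unset: FEATURES_ARG expands to nothing, cargo builds default features only
make release PROFILES="-Pspark-3.4"

# COMET_FEATURES set: FEATURES_ARG expands to --features=hdfs and is appended to `cargo build --release`
make release PROFILES="-Pspark-3.4" COMET_FEATURES=hdfs
```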

docs/source/user-guide/datasources.md

Lines changed: 78 additions & 0 deletions
@@ -35,3 +35,81 @@

Comet does not provide native JSON scan, but when `spark.comet.convert.json.enabled` is enabled, data is immediately
converted into Arrow format, allowing native execution to happen after that.

# Supported Storages

## Local
In progress

## HDFS

The Apache DataFusion Comet native reader seamlessly scans files from remote HDFS for [supported formats](#supported-spark-data-sources).

### Using the experimental native DataFusion reader
Unlike the native Comet reader, the DataFusion reader fully supports processing of nested types. This reader is currently experimental.

To build Comet with the native DataFusion reader and remote HDFS support, a JDK must be installed.

Example: build Comet for `spark-3.4`, providing the JDK path in `JAVA_HOME` and the JRE linker path in `RUSTFLAGS`.
The linker path can vary depending on the system; typically the JRE linker is part of the installed JDK.

```shell
export JAVA_HOME="/opt/homebrew/opt/openjdk@11"
make release PROFILES="-Pspark-3.4" COMET_FEATURES=hdfs RUSTFLAGS="-L $JAVA_HOME/libexec/openjdk.jdk/Contents/Home/lib/server"
```

Start Comet with the experimental reader and HDFS support as [described](installation.md/#run-spark-shell-with-comet-enabled)
and add these additional parameters:

```shell
--conf spark.comet.scan.impl=native_datafusion \
--conf spark.hadoop.fs.defaultFS="hdfs://namenode:9000" \
--conf spark.hadoop.dfs.client.use.datanode.hostname=true \
--conf dfs.client.use.datanode.hostname=true
```
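Put together, a launch command might look like the following. This is a minimal sketch only: it assumes `$COMET_JAR` points at the jar built above, uses the plugin settings from the installation guide, reuses the example namenode address, and omits any other settings (such as the Comet shuffle manager) that the installation guide may require.

```shell
$SPARK_HOME/bin/spark-shell \
    --jars $COMET_JAR \
    --conf spark.plugins=org.apache.spark.CometPlugin \
    --conf spark.comet.enabled=true \
    --conf spark.comet.scan.impl=native_datafusion \
    --conf spark.hadoop.fs.defaultFS="hdfs://namenode:9000" \
    --conf spark.hadoop.dfs.client.use.datanode.hostname=true \
    --conf dfs.client.use.datanode.hostname=true
```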
Query a struct type from remote HDFS:

```shell
spark.read.parquet("hdfs://namenode:9000/user/data").show(false)

root
 |-- id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- personal_info: struct (nullable = true)
 |    |-- firstName: string (nullable = true)
 |    |-- lastName: string (nullable = true)
 |    |-- ageInYears: integer (nullable = true)

25/01/30 16:50:43 INFO core/src/lib.rs: Comet native library version 0.6.0 initialized
== Physical Plan ==
* CometColumnarToRow (2)
+- CometNativeScan: (1)

(1) CometNativeScan:
Output [3]: [id#0, first_name#1, personal_info#4]
Arguments: [id#0, first_name#1, personal_info#4]

(2) CometColumnarToRow [codegen id : 1]
Input [3]: [id#0, first_name#1, personal_info#4]

25/01/30 16:50:44 INFO fs-hdfs-0.1.12/src/hdfs.rs: Connecting to Namenode (hdfs://namenode:9000)
+---+----------+-----------------+
|id |first_name|personal_info    |
+---+----------+-----------------+
|2  |Jane      |{Jane, Smith, 34}|
|1  |John      |{John, Doe, 28}  |
+---+----------+-----------------+
```

Verify that the scan node in the physical plan is `CometNativeScan`, for example as shown below.
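A quick way to check, from the same spark-shell session (illustrative, reusing the path from the query above):

```shell
// In spark-shell: print the physical plan for the same path
spark.read.parquet("hdfs://namenode:9000/user/data").explain()
// The scan should appear as `CometNativeScan:` rather than a regular Spark file scan
```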
More on the [HDFS Reader](../../../native/hdfs/README.md)

## S3
In progress

native/core/src/execution/planner.rs

Lines changed: 4 additions & 10 deletions
```diff
@@ -74,7 +74,7 @@ use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctio
 
 use crate::execution::shuffle::CompressionCodec;
 use crate::execution::spark_plan::SparkPlan;
-use crate::parquet::parquet_support::SparkParquetOptions;
+use crate::parquet::parquet_support::{register_object_store, SparkParquetOptions};
 use crate::parquet::schema_adapter::SparkSchemaAdapterFactory;
 use datafusion::datasource::listing::PartitionedFile;
 use datafusion::datasource::physical_plan::parquet::ParquetExecBuilder;
@@ -106,7 +106,6 @@ use datafusion_common::{
     tree_node::{Transformed, TransformedResult, TreeNode, TreeNodeRecursion, TreeNodeRewriter},
     JoinType as DFJoinType, ScalarValue,
 };
-use datafusion_execution::object_store::ObjectStoreUrl;
 use datafusion_expr::type_coercion::other::get_coerce_type_for_case_expression;
 use datafusion_expr::{
     AggregateUDF, ReturnTypeArgs, ScalarUDF, WindowFrame, WindowFrameBound, WindowFrameUnits,
@@ -1165,12 +1164,9 @@ impl PhysicalPlanner {
             ))
         });
 
-        let object_store = object_store::local::LocalFileSystem::new();
-        // register the object store with the runtime environment
-        let url = Url::try_from("file://").unwrap();
-        self.session_ctx
-            .runtime_env()
-            .register_object_store(&url, Arc::new(object_store));
+        // By default, local FS object store registered
+        // if `hdfs` feature enabled then HDFS file object store registered
+        let object_store_url = register_object_store(Arc::clone(&self.session_ctx))?;
 
         // Generate file groups
         let mut file_groups: Vec<Vec<PartitionedFile>> =
@@ -1229,8 +1225,6 @@ impl PhysicalPlanner {
 
         // TODO: I think we can remove partition_count in the future, but leave for testing.
         assert_eq!(file_groups.len(), partition_count);
-
-        let object_store_url = ObjectStoreUrl::local_filesystem();
         let partition_fields: Vec<Field> = partition_schema
             .fields()
             .iter()
```

native/core/src/parquet/parquet_support.rs

Lines changed: 39 additions & 0 deletions
```diff
@@ -15,16 +15,19 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::execution::operators::ExecutionError;
 use arrow::{
     array::{cast::AsArray, types::Int32Type, Array, ArrayRef},
     compute::{cast_with_options, take, CastOptions},
     util::display::FormatOptions,
 };
 use arrow_array::{DictionaryArray, StructArray};
 use arrow_schema::DataType;
+use datafusion::prelude::SessionContext;
 use datafusion_comet_spark_expr::utils::array_with_timezone;
 use datafusion_comet_spark_expr::EvalMode;
 use datafusion_common::{Result as DataFusionResult, ScalarValue};
+use datafusion_execution::object_store::ObjectStoreUrl;
 use datafusion_expr::ColumnarValue;
 use std::collections::HashMap;
 use std::{fmt::Debug, hash::Hash, sync::Arc};
@@ -195,3 +198,39 @@ fn cast_struct_to_struct(
         _ => unreachable!(),
     }
 }
+
+// Default object store which is local filesystem
+#[cfg(not(feature = "hdfs"))]
+pub(crate) fn register_object_store(
+    session_context: Arc<SessionContext>,
+) -> Result<ObjectStoreUrl, ExecutionError> {
+    let object_store = object_store::local::LocalFileSystem::new();
+    let url = ObjectStoreUrl::parse("file://")?;
+    session_context
+        .runtime_env()
+        .register_object_store(url.as_ref(), Arc::new(object_store));
+    Ok(url)
+}
+
+// HDFS object store
+#[cfg(feature = "hdfs")]
+pub(crate) fn register_object_store(
+    session_context: Arc<SessionContext>,
+) -> Result<ObjectStoreUrl, ExecutionError> {
+    // TODO: read the namenode configuration from file schema or from spark.defaultFS
+    let url = ObjectStoreUrl::parse("hdfs://namenode:9000")?;
+    if let Some(object_store) =
+        datafusion_comet_objectstore_hdfs::object_store::hdfs::HadoopFileSystem::new(url.as_ref())
+    {
+        session_context
+            .runtime_env()
+            .register_object_store(url.as_ref(), Arc::new(object_store));
+
+        return Ok(url);
+    }
+
+    Err(ExecutionError::GeneralError(format!(
+        "HDFS object store cannot be created for {}",
+        url
+    )))
+}
```
