Skip to content

Commit 3e93995

Browse files
committed
experiments with Spark conf
1 parent 69537c5 commit 3e93995

File tree

9 files changed

+170
-48
lines changed

9 files changed

+170
-48
lines changed

native/core/src/config.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use datafusion::common::extensions_options;
19+
use datafusion::config::ConfigExtension;
20+
21+
// Comet-specific native options, declared through DataFusion's `extensions_options!` macro.
extensions_options! {
22+
pub struct CometNativeOptions {
23+
// Default file system; `None` unless explicitly set. The JNI plan-creation code fills
// this in from the Spark conf key `spark.hadoop.fs.defaultFS` when that key is present.
24+
pub default_fs: Option<String>, default = None
25+
}
26+
}
27+
28+
// Lets `CometNativeOptions` be used as a DataFusion `ConfigExtension`.
impl ConfigExtension for CometNativeOptions {
29+
// Prefix associated with these options.
// NOTE(review): presumably option keys are addressed as `comet.<field>` — confirm against
// the DataFusion `ConfigExtension` documentation.
const PREFIX: &'static str = "comet";
30+
}

native/core/src/execution/jni_api.rs

Lines changed: 28 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use crate::{
2929
use arrow::array::RecordBatch;
3030
use arrow::datatypes::DataType as ArrowDataType;
3131
use datafusion::common::ScalarValue;
32+
use datafusion::config::Extensions;
3233
use datafusion::execution::memory_pool::{
3334
FairSpillPool, GreedyMemoryPool, MemoryPool, TrackConsumersPool, UnboundedMemoryPool,
3435
};
@@ -41,7 +42,8 @@ use datafusion::{
4142
use datafusion_comet_proto::spark_operator::Operator;
4243
use futures::poll;
4344
use futures::stream::StreamExt;
44-
use jni::objects::{JByteBuffer, JMap};
45+
use jni::objects::JByteBuffer;
46+
use jni::objects::JMap;
4547
use jni::sys::JNI_FALSE;
4648
use jni::{
4749
errors::Result as JNIResult,
@@ -63,6 +65,7 @@ use std::time::{Duration, Instant};
6365
use std::{collections::HashMap, sync::Arc, task::Poll};
6466
use tokio::runtime::Runtime;
6567

68+
use crate::config::CometNativeOptions;
6669
use crate::execution::fair_memory_pool::CometFairMemoryPool;
6770
use crate::execution::operators::ScanExec;
6871
use crate::execution::shuffle::{read_ipc_compressed, CompressionCodec};
@@ -127,8 +130,6 @@ struct ExecutionContext {
127130
pub explain_native: bool,
128131
/// Memory pool config
129132
pub memory_pool_config: MemoryPoolConfig,
130-
/// Apache Spark config
131-
pub spark_config: HashMap<String, String>,
132133
}
133134

134135
#[derive(PartialEq, Eq)]
@@ -249,11 +250,28 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
249250
local_dirs.push(local_dir.into());
250251
}
251252

252-
// We need to keep the session context alive. Some session state like temporary
253-
// dictionaries are stored in session context. If it is dropped, the temporary
253+
// Read the Apache Spark runtime config and store the relevant entries in the Comet options
254+
let spark_conf_map = JMap::from_env(&mut env, &spark_conf)?;
255+
let mut spark_conf_iter = spark_conf_map.iter(&mut env)?;
256+
let mut comet_options = CometNativeOptions::default();
257+
while let Some((key, value)) = spark_conf_iter.next(&mut env)? {
258+
let key: String = env.get_string(&JString::from(key)).unwrap().into();
259+
if key == "spark.hadoop.fs.defaultFS" {
260+
let value: String = env.get_string(&JString::from(value)).unwrap().into();
261+
comet_options.default_fs = Some(value);
262+
}
263+
}
264+
265+
// We need to keep the session context alive. Some session states,
266+
// like temporary dictionaries, are stored in the
267+
// session context. If it is dropped, the temporary
254268
// dictionaries will be dropped as well.
255-
let session =
256-
prepare_datafusion_session_context(batch_size as usize, memory_pool, local_dirs)?;
269+
let session = prepare_datafusion_session_context(
270+
batch_size as usize,
271+
memory_pool,
272+
local_dirs,
273+
Arc::from(comet_options),
274+
)?;
257275

258276
let plan_creation_time = start.elapsed();
259277

@@ -263,17 +281,6 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
263281
None
264282
};
265283

266-
// Read Apache Spark runtime config
267-
let spark_conf_map = JMap::from_env(&mut env, &spark_conf)?;
268-
let mut spark_conf_iter = spark_conf_map.iter(&mut env)?;
269-
let mut spark_conf = HashMap::new();
270-
271-
while let Some((key, value)) = spark_conf_iter.next(&mut env)? {
272-
let key: String = env.get_string(&JString::from(key)).unwrap().into();
273-
let value: String = env.get_string(&JString::from(value)).unwrap().into();
274-
spark_conf.insert(key, value);
275-
}
276-
277284
let exec_context = Box::new(ExecutionContext {
278285
id,
279286
task_attempt_id,
@@ -291,7 +298,6 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
291298
debug_native: debug_native == 1,
292299
explain_native: explain_native == 1,
293300
memory_pool_config,
294-
spark_config: spark_conf,
295301
});
296302

297303
Ok(Box::into_raw(exec_context) as i64)
@@ -303,6 +309,7 @@ fn prepare_datafusion_session_context(
303309
batch_size: usize,
304310
memory_pool: Arc<dyn MemoryPool>,
305311
local_dirs: Vec<String>,
312+
comet_opts: Arc<CometNativeOptions>,
306313
) -> CometResult<SessionContext> {
307314
let disk_manager_config =
308315
DiskManagerConfig::NewSpecified(local_dirs.into_iter().map(PathBuf::from).collect());
@@ -323,7 +330,8 @@ fn prepare_datafusion_session_context(
323330
// maximum value is 1.0, so we set the threshold a little higher just
324331
// to be safe
325332
&ScalarValue::Float64(Some(1.1)),
326-
);
333+
)
334+
.with_extension(comet_opts);
327335

328336
#[allow(deprecated)]
329337
let runtime = RuntimeEnv::try_new(rt_config)?;

native/core/src/execution/planner.rs

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1113,8 +1113,9 @@ impl PhysicalPlanner {
11131113
.and_then(|f| f.partitioned_file.first())
11141114
.map(|f| f.file_path.clone())
11151115
.ok_or(GeneralError("Failed to locate file".to_string()))?;
1116+
11161117
let (object_store_url, _) =
1117-
prepare_object_store(self.session_ctx.runtime_env(), one_file)?;
1118+
prepare_object_store(Arc::clone(&self.session_ctx), one_file)?;
11181119

11191120
// Generate file groups
11201121
let mut file_groups: Vec<Vec<PartitionedFile>> =
@@ -2501,6 +2502,10 @@ fn create_case_expr(
25012502

25022503
#[cfg(test)]
25032504
mod tests {
2505+
use datafusion::config::ConfigExtension;
2506+
use datafusion::config::ConfigOptions;
2507+
use datafusion::config::Extensions;
2508+
use datafusion::prelude::SessionConfig;
25042509
use std::{sync::Arc, task::Poll};
25052510

25062511
use futures::{poll, StreamExt};
@@ -2953,4 +2958,38 @@ mod tests {
29532958
}
29542959
});
29552960
}
2961+
2962+
use datafusion::common::extensions_options;
2963+
2964+
extensions_options! {
2965+
struct CometNativeOptions {
2966+
// Default File System
2967+
default_fs: Option<String>, default = Some("123".to_string())
2968+
}
2969+
}
2970+
2971+
impl ConfigExtension for CometNativeOptions {
2972+
const PREFIX: &'static str = "comet";
2973+
}
2974+
2975+
// Experiment: attaches an arbitrary `Arc`-wrapped struct to a `SessionConfig` via
// `with_extension` and looks it up again through `get_extension` on the session state.
// NOTE(review): the test name mentions arrays but no array is created here, and the looked-up
// value is only printed with `dbg!` — nothing is asserted, so this test cannot fail on a
// wrong result.
#[test]
2976+
fn test_create_array1() {
2977+
//let mut extensions = Extensions::new();
2978+
//extensions.insert(CometNativeOptions::default());
2979+
2980+
// Throwaway extension type local to this test; `Debug` is derived so `dbg!` can print it.
#[derive(Debug)]
2981+
struct Ext1 {
2982+
default_fs: String,
2983+
}
2984+
2985+
let ext1a = Arc::new(Ext1 {
2986+
default_fs: "123".to_string(),
2987+
});
2988+
2989+
//let config = SessionConfig::from(ConfigOptions::new().with_extensions(extensions));
2990+
// Register the extension on the session config and build a context from it.
let config = SessionConfig::default().with_extension(Arc::clone(&ext1a));
2991+
let session_ctx = SessionContext::new_with_config(config);
2992+
2993+
// Print (not assert) whatever the lookup by type returns.
dbg!(session_ctx.state().config().get_extension::<Ext1>());
2994+
}
29562995
}

native/core/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ use errors::{try_unwrap_or_throw, CometError, CometResult};
4646
mod errors;
4747
#[macro_use]
4848
pub mod common;
49+
mod config;
4950
pub mod execution;
5051
mod jvm_bridge;
5152
pub mod parquet;

native/core/src/parquet/mod.rs

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,11 @@ pub mod parquet_support;
2626
pub mod read;
2727
pub mod schema_adapter;
2828

29+
use crate::errors::{try_unwrap_or_throw, CometError};
30+
use std::collections::HashMap;
2931
use std::task::Poll;
3032
use std::{boxed::Box, ptr::NonNull, sync::Arc};
3133

32-
use crate::errors::{try_unwrap_or_throw, CometError};
33-
3434
use arrow::ffi::FFI_ArrowArray;
3535

3636
/// JNI exposed methods
@@ -59,7 +59,9 @@ use datafusion::execution::SendableRecordBatchStream;
5959
use datafusion::physical_plan::ExecutionPlan;
6060
use datafusion::prelude::{SessionConfig, SessionContext};
6161
use futures::{poll, StreamExt};
62-
use jni::objects::{JBooleanArray, JByteArray, JLongArray, JPrimitiveArray, JString, ReleaseMode};
62+
use jni::objects::{
63+
JBooleanArray, JByteArray, JLongArray, JMap, JObject, JPrimitiveArray, JString, ReleaseMode,
64+
};
6365
use jni::sys::jstring;
6466
use object_store::path::Path;
6567
use read::ColumnReader;
@@ -657,6 +659,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBat
657659
data_schema: jbyteArray,
658660
session_timezone: jstring,
659661
batch_size: jint,
662+
spark_conf: JObject,
660663
) -> jlong {
661664
try_unwrap_or_throw(&e, |mut env| unsafe {
662665
let session_config = SessionConfig::new().with_batch_size(batch_size as usize);
@@ -670,7 +673,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBat
670673
.into();
671674

672675
let (object_store_url, object_store_path) =
673-
prepare_object_store(session_ctx.runtime_env(), path.clone())?;
676+
prepare_object_store(Arc::clone(session_ctx), path.clone())?;
674677

675678
let required_schema_array = JByteArray::from_raw(required_schema);
676679
let required_schema_buffer = env.convert_byte_array(&required_schema_array)?;
@@ -706,6 +709,17 @@ pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBat
706709
.unwrap()
707710
.into();
708711

712+
// Read Apache Spark runtime config
713+
let spark_conf_map = JMap::from_env(&mut env, &spark_conf)?;
714+
let mut spark_conf_iter = spark_conf_map.iter(&mut env)?;
715+
let mut spark_conf = HashMap::new();
716+
717+
while let Some((key, value)) = spark_conf_iter.next(&mut env)? {
718+
let key: String = env.get_string(&JString::from(key)).unwrap().into();
719+
let value: String = env.get_string(&JString::from(value)).unwrap().into();
720+
spark_conf.insert(key, value);
721+
}
722+
709723
let scan = init_datasource_exec(
710724
required_schema,
711725
Some(data_schema),

native/core/src/parquet/parquet_support.rs

Lines changed: 33 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
use crate::config::CometNativeOptions;
1819
use crate::execution::operators::ExecutionError;
20+
use crate::parquet::SessionContext;
1921
use arrow::array::{new_null_array, DictionaryArray, StructArray};
2022
use arrow::datatypes::DataType;
2123
use arrow::{
@@ -25,7 +27,6 @@ use arrow::{
2527
};
2628
use datafusion::common::{Result as DataFusionResult, ScalarValue};
2729
use datafusion::execution::object_store::ObjectStoreUrl;
28-
use datafusion::execution::runtime_env::RuntimeEnv;
2930
use datafusion::physical_plan::ColumnarValue;
3031
use datafusion_comet_spark_expr::utils::array_with_timezone;
3132
use datafusion_comet_spark_expr::EvalMode;
@@ -213,7 +214,7 @@ fn cast_struct_to_struct(
213214
}
214215
}
215216

216-
// Mirrors object_store::parse::parse_url for the hdfs object store
217+
// Mirrors object_store::parse::parse_url for the hdfs object store
217218
#[cfg(feature = "hdfs")]
218219
fn parse_hdfs_url(url: &Url) -> Result<(Box<dyn ObjectStore>, Path), object_store::Error> {
219220
match datafusion_comet_objectstore_hdfs::object_store::hdfs::HadoopFileSystem::new(url.as_ref())
@@ -239,24 +240,36 @@ fn parse_hdfs_url(_url: &Url) -> Result<(Box<dyn ObjectStore>, Path), object_sto
239240

240241
/// Parses the url, registers the object store, and returns a tuple of the object store url and object store path
241242
pub(crate) fn prepare_object_store(
242-
runtime_env: Arc<RuntimeEnv>,
243+
session_ctx: Arc<SessionContext>,
243244
url: String,
244245
) -> Result<(ObjectStoreUrl, Path), ExecutionError> {
245246
let mut url = Url::parse(url.as_str())
246247
.map_err(|e| ExecutionError::GeneralError(format!("Error parsing URL {url}: {e}")))?;
248+
247249
let mut scheme = url.scheme();
250+
248251
if scheme == "s3a" {
249252
scheme = "s3";
250253
url.set_scheme("s3").map_err(|_| {
251254
ExecutionError::GeneralError("Could not convert scheme from s3a to s3".to_string())
252255
})?;
253256
}
257+
254258
let url_key = format!(
255259
"{}://{}",
256260
scheme,
257261
&url[url::Position::BeforeHost..url::Position::AfterPort],
258262
);
259263

264+
dbg!(&url_key);
265+
266+
// let x = match session_ctx.state().config().get_extension::<CometNativeOptions>() {
267+
// Some(opts) if scheme.is_empty() && opts.default_fs.is_some() => {
268+
// Url::parse(format!("{}/{}", opts.default_fs.as_deref().unwrap()))
269+
// },
270+
// _ => url
271+
// };
272+
260273
let (object_store, object_store_path): (Box<dyn ObjectStore>, Path) = if scheme == "hdfs" {
261274
parse_hdfs_url(&url)
262275
} else {
@@ -265,15 +278,20 @@ pub(crate) fn prepare_object_store(
265278
.map_err(|e| ExecutionError::GeneralError(e.to_string()))?;
266279

267280
let object_store_url = ObjectStoreUrl::parse(url_key.clone())?;
268-
runtime_env.register_object_store(&url, Arc::from(object_store));
281+
session_ctx
282+
.runtime_env()
283+
.register_object_store(&url, Arc::from(object_store));
269284
Ok((object_store_url, object_store_path))
270285
}
271286

272287
#[cfg(test)]
273288
mod tests {
289+
use crate::config::CometNativeOptions;
274290
use crate::parquet::parquet_support::prepare_object_store;
291+
use datafusion::common::ScalarValue;
275292
use datafusion::execution::object_store::ObjectStoreUrl;
276293
use datafusion::execution::runtime_env::RuntimeEnv;
294+
use datafusion::prelude::{SessionConfig, SessionContext};
277295
use object_store::path::Path;
278296
use std::sync::Arc;
279297
use url::Url;
@@ -306,7 +324,7 @@ mod tests {
306324

307325
for (i, url_str) in all_urls.iter().enumerate() {
308326
let url = &Url::parse(url_str).unwrap();
309-
let res = prepare_object_store(Arc::new(RuntimeEnv::default()), url.to_string());
327+
let res = prepare_object_store(Arc::new(SessionContext::default()), url.to_string());
310328

311329
let expected = expected.get(i).unwrap();
312330
match expected {
@@ -327,21 +345,15 @@ mod tests {
327345
}
328346

329347
#[test]
330-
#[cfg(feature = "hdfs")]
331-
fn test_prepare_object_store() {
332-
// we use a local file system url instead of an hdfs url because the latter requires
333-
// a running namenode
334-
let hdfs_url = "file:///comet/spark-warehouse/part-00000.snappy.parquet";
335-
let expected: (ObjectStoreUrl, Path) = (
336-
ObjectStoreUrl::parse("file://").unwrap(),
337-
Path::from("/comet/spark-warehouse/part-00000.snappy.parquet"),
338-
);
339-
340-
let url = &Url::parse(hdfs_url).unwrap();
341-
let res = prepare_object_store(Arc::new(RuntimeEnv::default()), url.to_string());
342-
343-
let res = res.unwrap();
344-
assert_eq!(res.0, expected.0);
345-
assert_eq!(res.1, expected.1);
348+
// Calls `prepare_object_store` with an unqualified path on a session whose
// `CometNativeOptions::default_fs` is set to an HDFS URI.
// NOTE(review): the result is only printed with `dbg!`; nothing is asserted.
// NOTE(review): `prepare_object_store` begins by calling `Url::parse` on its input and the
// default-fs fallback there is still commented out, so this bare relative path presumably
// yields an `Err` — confirm.
fn test_prepare_object_store_with_default_fs() {
349+
let unqualified_url = "file1.parquet";
350+
let comet_options = CometNativeOptions {
351+
default_fs: Some("hdfs://namenode:9000".to_string()),
352+
..Default::default()
353+
};
354+
// Attach the Comet options to the session config as an extension.
let session_config = SessionConfig::new().with_extension(Arc::from(comet_options));
355+
let session = SessionContext::new_with_config(session_config);
356+
// Despite its name, `object_store` holds the `Result` returned by `prepare_object_store`.
let object_store = prepare_object_store(Arc::new(session), unqualified_url.to_string());
357+
dbg!(object_store);
346358
}
347359
}

0 commit comments

Comments
 (0)