Commit 439944b

feat: Support hadoop s3a config in native_iceberg_compat (apache#1925)
* Support hadoop s3a config in native_iceberg_compat
1 parent 94ca968 commit 439944b

7 files changed: +54 -25 lines changed

common/src/main/java/org/apache/comet/parquet/Native.java

Lines changed: 3 additions & 1 deletion
@@ -20,6 +20,7 @@
 package org.apache.comet.parquet;
 
 import java.nio.ByteBuffer;
+import java.util.Map;
 
 import org.apache.comet.NativeBase;
 
@@ -258,7 +259,8 @@ public static native long initRecordBatchReader(
       byte[] dataSchema,
       String sessionTimezone,
       int batchSize,
-      boolean caseSensitive);
+      boolean caseSensitive,
+      Map<String, String> objectStoreOptions);
 
   // arrow native version of read batch
   /**

common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java

Lines changed: 6 additions & 1 deletion
@@ -75,6 +75,7 @@
 
 import org.apache.comet.CometConf;
 import org.apache.comet.CometSchemaImporter;
+import org.apache.comet.objectstore.NativeConfig;
 import org.apache.comet.shims.ShimBatchReader;
 import org.apache.comet.shims.ShimFileFormat;
 import org.apache.comet.vector.CometVector;
@@ -253,6 +254,9 @@ public void init() throws Throwable {
     }
     ParquetReadOptions readOptions = builder.build();
 
+    Map<String, String> objectStoreOptions =
+        JavaConverters.mapAsJavaMap(NativeConfig.extractObjectStoreOptions(conf, file.pathUri()));
+
     // TODO: enable off-heap buffer when they are ready
     ReadOptions cometReadOptions = ReadOptions.builder(conf).build();
 
@@ -420,7 +424,8 @@ public void init() throws Throwable {
               serializedDataArrowSchema,
               timeZoneId,
               batchSize,
-              caseSensitive);
+              caseSensitive,
+              objectStoreOptions);
     }
     isInitialized = true;
   }

spark/src/main/scala/org/apache/comet/objectstore/NativeConfig.scala renamed to common/src/main/scala/org/apache/comet/objectstore/NativeConfig.scala

File renamed without changes.
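
Moving NativeConfig.scala into `common` is what lets the Java NativeBatchReader above call `NativeConfig.extractObjectStoreOptions`. Conceptually, that helper collects the Hadoop S3A settings that apply to the file being read so they can be handed across JNI as a plain map. The snippet below is only a minimal sketch of that idea, assuming a simple `fs.s3a.` prefix filter is enough; the `collectS3aOptions` name is hypothetical and the real implementation may translate or scope keys (for example per-bucket overrides) differently.

import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration

// Illustrative sketch only: gather Hadoop S3A properties from a Configuration as a
// plain Map[String, String] so they can be passed to native code. This is NOT the
// actual NativeConfig.extractObjectStoreOptions implementation.
object ObjectStoreOptionsSketch {
  def collectS3aOptions(conf: Configuration): Map[String, String] =
    conf.iterator().asScala
      .map(entry => entry.getKey -> entry.getValue)
      .filter { case (key, _) => key.startsWith("fs.s3a.") }
      .toMap
}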

docs/source/user-guide/datasources.md

Lines changed: 2 additions & 5 deletions
@@ -163,9 +163,9 @@ DataFusion Comet has [multiple Parquet scan implementations](./compatibility.md#
 
 The default `native_comet` Parquet scan implementation reads data from S3 using the [Hadoop-AWS module](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html), which is identical to the approach commonly used with vanilla Spark. AWS credential configuration and other Hadoop S3A configurations works the same way as in vanilla Spark.
 
-### `native_datafusion`
+### `native_datafusion` and `native_iceberg_compat`
 
-The `native_datafusion` Parquet scan implementation completely offloads data loading to native code. It uses the [`object_store` crate](https://crates.io/crates/object_store) to read data from S3 and supports configuring S3 access using standard [Hadoop S3A configurations](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#General_S3A_Client_configuration) by translating them to the `object_store` crate's format.
+The `native_datafusion` and `native_iceberg_compat` Parquet scan implementations completely offload data loading to native code. They use the [`object_store` crate](https://crates.io/crates/object_store) to read data from S3 and support configuring S3 access using standard [Hadoop S3A configurations](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#General_S3A_Client_configuration) by translating them to the `object_store` crate's format.
 
 This implementation maintains compatibility with existing Hadoop S3A configurations, so existing code will continue to work as long as the configurations are supported and can be translated without loss of functionality.
 
@@ -240,6 +240,3 @@ The S3 support of `native_datafusion` has the following limitations:
 
 2. **Custom credential providers**: Custom implementations of AWS credential providers are not supported. The implementation only supports the standard credential providers listed in the table above. We are planning to add support for custom credential providers through a JNI-based adapter that will allow calling Java credential providers from native code. See [issue #1829](https://github.com/apache/datafusion-comet/issues/1829) for more details.
 
-### `native_iceberg_compat`
-
-The `native_iceberg_compat` Parquet scan implementation does not support reading data from S3 yet, but we are working on it.
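
With this documentation change, both native scans are described as translating standard Hadoop S3A settings supplied through Spark. As a hedged illustration (the endpoint, credentials, bucket, and path below are placeholders, not values from this commit), such settings are typically passed with Spark's `spark.hadoop.` prefix:

import org.apache.spark.sql.SparkSession

// Standard Hadoop S3A properties passed through Spark's `spark.hadoop.` prefix.
// The native scans translate properties like these into the object_store crate's S3 options.
val spark = SparkSession.builder()
  .appName("comet-s3a-example")
  .config("spark.hadoop.fs.s3a.access.key", "<access-key>") // placeholder credential
  .config("spark.hadoop.fs.s3a.secret.key", "<secret-key>") // placeholder credential
  .config("spark.hadoop.fs.s3a.endpoint", "http://127.0.0.1:9000") // e.g. a local MinIO endpoint
  .config("spark.hadoop.fs.s3a.path.style.access", "true")
  .getOrCreate()

// Hypothetical bucket and path, shown only to demonstrate reading via s3a://.
spark.read.parquet("s3a://my-bucket/data/test-file.parquet").show()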

native/core/src/parquet/mod.rs

Lines changed: 32 additions & 4 deletions
@@ -28,6 +28,7 @@ pub mod schema_adapter;
 
 mod objectstore;
 
+use std::collections::HashMap;
 use std::task::Poll;
 use std::{boxed::Box, ptr::NonNull, sync::Arc};
 
@@ -53,15 +54,17 @@ use crate::execution::serde;
 use crate::execution::utils::SparkArrowConvert;
 use crate::parquet::data_type::AsBytes;
 use crate::parquet::parquet_exec::init_datasource_exec;
-use crate::parquet::parquet_support::prepare_object_store;
+use crate::parquet::parquet_support::prepare_object_store_with_configs;
 use arrow::array::{Array, RecordBatch};
 use arrow::buffer::{Buffer, MutableBuffer};
 use datafusion::datasource::listing::PartitionedFile;
 use datafusion::execution::SendableRecordBatchStream;
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion::prelude::{SessionConfig, SessionContext};
 use futures::{poll, StreamExt};
-use jni::objects::{JBooleanArray, JByteArray, JLongArray, JPrimitiveArray, JString, ReleaseMode};
+use jni::objects::{
+    JBooleanArray, JByteArray, JLongArray, JMap, JObject, JPrimitiveArray, JString, ReleaseMode,
+};
 use jni::sys::{jstring, JNI_FALSE};
 use object_store::path::Path;
 use read::ColumnReader;
@@ -644,6 +647,25 @@ fn get_file_groups_single_file(
     vec![groups]
 }
 
+pub fn get_object_store_options(
+    env: &mut JNIEnv,
+    map_object: JObject,
+) -> Result<HashMap<String, String>, CometError> {
+    let map = JMap::from_env(env, &map_object)?;
+    // Convert to a HashMap
+    let mut collected_map = HashMap::new();
+    map.iter(env).and_then(|mut iter| {
+        while let Some((key, value)) = iter.next(env)? {
+            let key_string: String = String::from(env.get_string(&JString::from(key))?);
+            let value_string: String = String::from(env.get_string(&JString::from(value))?);
+            collected_map.insert(key_string, value_string);
+        }
+        Ok(())
+    })?;
+
+    Ok(collected_map)
+}
+
 /// # Safety
 /// This function is inherently unsafe since it deals with raw pointers passed from JNI.
 #[no_mangle]
@@ -660,6 +682,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBat
     session_timezone: jstring,
     batch_size: jint,
     case_sensitive: jboolean,
+    object_store_options: jobject,
 ) -> jlong {
     try_unwrap_or_throw(&e, |mut env| unsafe {
         let session_config = SessionConfig::new().with_batch_size(batch_size as usize);
@@ -672,8 +695,13 @@ pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBat
             .unwrap()
             .into();
 
-        let (object_store_url, object_store_path) =
-            prepare_object_store(session_ctx.runtime_env(), path.clone())?;
+        let object_store_config =
+            get_object_store_options(&mut env, JObject::from_raw(object_store_options))?;
+        let (object_store_url, object_store_path) = prepare_object_store_with_configs(
+            session_ctx.runtime_env(),
+            path.clone(),
+            &object_store_config,
+        )?;
 
         let required_schema_array = JByteArray::from_raw(required_schema);
         let required_schema_buffer = env.convert_byte_array(&required_schema_array)?;

native/core/src/parquet/parquet_support.rs

Lines changed: 11 additions & 9 deletions
@@ -360,14 +360,6 @@ fn parse_hdfs_url(_url: &Url) -> Result<(Box<dyn ObjectStore>, Path), object_sto
     })
 }
 
-/// Parses the url, registers the object store, and returns a tuple of the object store url and object store path
-pub(crate) fn prepare_object_store(
-    runtime_env: Arc<RuntimeEnv>,
-    url: String,
-) -> Result<(ObjectStoreUrl, Path), ExecutionError> {
-    prepare_object_store_with_configs(runtime_env, url, &HashMap::new())
-}
-
 /// Parses the url, registers the object store with configurations, and returns a tuple of the object store url
 /// and object store path
 pub(crate) fn prepare_object_store_with_configs(
@@ -406,13 +398,23 @@ pub(crate) fn prepare_object_store_with_configs(
 
 #[cfg(test)]
 mod tests {
-    use crate::parquet::parquet_support::prepare_object_store;
+    use crate::execution::operators::ExecutionError;
+    use crate::parquet::parquet_support::prepare_object_store_with_configs;
     use datafusion::execution::object_store::ObjectStoreUrl;
     use datafusion::execution::runtime_env::RuntimeEnv;
     use object_store::path::Path;
+    use std::collections::HashMap;
     use std::sync::Arc;
     use url::Url;
 
+    /// Parses the url, registers the object store, and returns a tuple of the object store url and object store path
+    pub(crate) fn prepare_object_store(
+        runtime_env: Arc<RuntimeEnv>,
+        url: String,
+    ) -> Result<(ObjectStoreUrl, Path), ExecutionError> {
+        prepare_object_store_with_configs(runtime_env, url, &HashMap::new())
+    }
+
     #[cfg(not(feature = "hdfs"))]
     #[test]
     fn test_prepare_object_store() {

spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromS3Suite.scala

Lines changed: 0 additions & 5 deletions
@@ -34,8 +34,6 @@ import org.apache.spark.sql.comet.CometScanExec
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.functions.{col, sum}
 
-import org.apache.comet.CometConf.SCAN_NATIVE_ICEBERG_COMPAT
-
 import software.amazon.awssdk.auth.credentials.AwsBasicCredentials
 import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider
 import software.amazon.awssdk.services.s3.S3Client
@@ -105,9 +103,6 @@ class ParquetReadFromS3Suite extends CometTestBase with AdaptiveSparkPlanHelper
   }
 
   test("read parquet file from MinIO") {
-    // native_iceberg_compat mode does not have comprehensive S3 support, so we don't run tests
-    // under this mode.
-    assume(sys.env.getOrElse("COMET_PARQUET_SCAN_IMPL", "") != SCAN_NATIVE_ICEBERG_COMPAT)
 
     val testFilePath = s"s3a://$testBucketName/data/test-file.parquet"
     writeTestParquetFile(testFilePath)
