
Commit c9ab291

fix: Fall back to native_comet when object store not supported by native_iceberg_compat (#2251)
Parent: bffe60b

File tree

6 files changed, +209 -7 lines

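For orientation before the per-file diffs: with the `native_iceberg_compat` scan selected, an object store configuration that the native side cannot support (for example a custom S3 endpoint) now produces a recorded fallback reason and the scan falls back to `native_comet` instead of failing the query. A minimal sketch of that user-facing setup, assuming the `spark.plugins` and `spark.comet.scan.impl` config keys and illustrative endpoint and bucket values (none of these values are taken from this diff):

import org.apache.spark.sql.SparkSession

object ScanFallbackSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("comet-scan-fallback-sketch")
      // Assumed config keys/values for illustration; check the Comet docs for the exact names.
      .config("spark.plugins", "org.apache.spark.CometPlugin")
      .config("spark.comet.enabled", "true")
      .config("spark.comet.scan.impl", "native_iceberg_compat")
      // A custom S3 endpoint is rejected by native_iceberg_compat (see s3.rs below), so
      // CometScanRule records a fallback reason and the scan runs with native_comet instead.
      .config("spark.hadoop.fs.s3a.endpoint", "https://example-object-store.internal")
      .getOrCreate()

    spark.read.parquet("s3a://example-bucket/path/to/table").show()
  }
}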

common/src/main/java/org/apache/comet/parquet/Native.java

Lines changed: 7 additions & 0 deletions
@@ -241,6 +241,13 @@ public static native void setPageV2(
   // to arrow.
   // Add batch size, datetimeRebaseModeSpec, metrics(how?)...
 
+  /**
+   * Verify that object store options are valid. An exception will be thrown if the provided options
+   * are not valid.
+   */
+  public static native void validateObjectStoreConfig(
+      String filePath, Map<String, String> objectStoreOptions);
+
   /**
    * Initialize a record batch reader for a PartitionedFile
    *
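The new native method above is the validation entry point that `CometScanRule` (further down in this commit) calls before committing to `native_iceberg_compat`. A minimal JVM-side sketch of a direct call, assuming the Comet native library is loaded and using an illustrative path and option map:

import scala.collection.JavaConverters._

import org.apache.comet.parquet.Native

object ValidateConfigSketch {
  def main(args: Array[String]): Unit = {
    // Illustrative option; CometScanRule derives the real map via NativeConfig.extractObjectStoreOptions.
    val objectStoreOptions = Map(
      "fs.s3a.aws.credentials.provider" ->
        "com.amazonaws.auth.EnvironmentVariableCredentialsProvider").asJava

    // Throws an exception propagated from the native side if the options cannot
    // build a supported object store for the given path.
    Native.validateObjectStoreConfig("s3a://example-bucket/data/file.parquet", objectStoreOptions)
  }
}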

docs/source/user-guide/compatibility.md

Lines changed: 5 additions & 0 deletions
@@ -73,6 +73,11 @@ The `native_datafusion` scan has some additional limitations:
 [#1545]: https://github.com/apache/datafusion-comet/issues/1545
 [#1758]: https://github.com/apache/datafusion-comet/issues/1758
 
+### S3 Support with `native_iceberg_compat`
+
+- When using the default AWS S3 endpoint (no custom endpoint configured), a valid region is required. Comet
+  will attempt to resolve the region if it is not provided.
+
 ## ANSI Mode
 
 Comet will fall back to Spark for the following expressions when ANSI mode is enabled, unless
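Because region resolution can add an S3 round trip, the region can also be supplied explicitly. A minimal sketch, assuming the standard Hadoop S3A region key `fs.s3a.endpoint.region` and an illustrative region value:

import org.apache.spark.sql.SparkSession

object S3RegionSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("comet-s3-region-sketch")
      // Assumed Hadoop S3A key; supplying the bucket region up front avoids relying
      // on Comet's best-effort region resolution and the extra S3 call it can make.
      .config("spark.hadoop.fs.s3a.endpoint.region", "us-east-1")
      .getOrCreate()

    spark.read.parquet("s3a://example-bucket/path/to/table").show()
  }
}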

native/core/src/parquet/mod.rs

Lines changed: 32 additions & 3 deletions
@@ -666,6 +666,35 @@ pub fn get_object_store_options(
     Ok(collected_map)
 }
 
+/// # Safety
+/// This function is inherently unsafe since it deals with raw pointers passed from JNI.
+#[no_mangle]
+pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_validateObjectStoreConfig(
+    e: JNIEnv,
+    _jclass: JClass,
+    file_path: jstring,
+    object_store_options: jobject,
+) {
+    try_unwrap_or_throw(&e, |mut env| unsafe {
+        let session_config = SessionConfig::new();
+        let planner =
+            PhysicalPlanner::new(Arc::new(SessionContext::new_with_config(session_config)), 0);
+        let session_ctx = planner.session_ctx();
+        let path: String = env
+            .get_string(&JString::from_raw(file_path))
+            .unwrap()
+            .into();
+        let object_store_config =
+            get_object_store_options(&mut env, JObject::from_raw(object_store_options))?;
+        let (_, _) = prepare_object_store_with_configs(
+            session_ctx.runtime_env(),
+            path.clone(),
+            &object_store_config,
+        )?;
+        Ok(())
+    })
+}
+
 /// # Safety
 /// This function is inherently unsafe since it deals with raw pointers passed from JNI.
 #[no_mangle]
@@ -686,9 +715,9 @@ pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBat
 ) -> jlong {
     try_unwrap_or_throw(&e, |mut env| unsafe {
         let session_config = SessionConfig::new().with_batch_size(batch_size as usize);
-        let planer =
+        let planner =
             PhysicalPlanner::new(Arc::new(SessionContext::new_with_config(session_config)), 0);
-        let session_ctx = planer.session_ctx();
+        let session_ctx = planner.session_ctx();
 
         let path: String = env
             .get_string(&JString::from_raw(file_path))
@@ -716,7 +745,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBat
             let filter_buffer = env.convert_byte_array(&filter_array)?;
             let filter_expr = serde::deserialize_expr(filter_buffer.as_slice())?;
             Some(vec![
-                planer.create_expr(&filter_expr, Arc::clone(&data_schema))?
+                planner.create_expr(&filter_expr, Arc::clone(&data_schema))?
             ])
         } else {
             None

native/core/src/parquet/objectstore/s3.rs

Lines changed: 6 additions & 0 deletions
@@ -68,6 +68,12 @@ pub fn create_store(
     }
     let path = Path::parse(path)?;
 
+    if configs.contains_key("fs.s3a.endpoint") {
+        return Err(object_store::Error::NotSupported {
+            source: "Custom S3 endpoints are not supported".into(),
+        });
+    }
+
     let mut builder = AmazonS3Builder::new()
         .with_url(url.to_string())
         .with_allow_http(true);

spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala

Lines changed: 80 additions & 3 deletions
@@ -19,8 +19,13 @@
 
 package org.apache.comet.rules
 
+import java.net.URI
+
+import scala.collection.{mutable, JavaConverters}
 import scala.collection.mutable.ListBuffer
 
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, GenericInternalRow, PlanExpression}
 import org.apache.spark.sql.catalyst.rules.Rule
@@ -34,17 +39,19 @@ import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
-import org.apache.comet.{CometConf, DataTypeSupport}
+import org.apache.comet.{CometConf, CometNativeException, DataTypeSupport}
 import org.apache.comet.CometConf._
 import org.apache.comet.CometSparkSessionExtensions.{isCometLoaded, isCometScanEnabled, withInfo, withInfos}
 import org.apache.comet.DataTypeSupport.isComplexType
-import org.apache.comet.parquet.{CometParquetScan, SupportsComet}
+import org.apache.comet.objectstore.NativeConfig
+import org.apache.comet.parquet.{CometParquetScan, Native, SupportsComet}
 import org.apache.comet.shims.CometTypeShim
 
 /**
  * Spark physical optimizer rule for replacing Spark scans with Comet scans.
  */
 case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with CometTypeShim {
+  import CometScanRule._
 
   private lazy val showTransformations = CometConf.COMET_EXPLAIN_TRANSFORMATIONS.get()
 
@@ -295,8 +302,17 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
     val fallbackReasons = new ListBuffer[String]()
 
     // native_iceberg_compat only supports local filesystem and S3
-    if (!scanExec.relation.inputFiles
+    if (scanExec.relation.inputFiles
         .forall(path => path.startsWith("file://") || path.startsWith("s3a://"))) {
+
+      val filePath = scanExec.relation.inputFiles.headOption
+      if (filePath.exists(_.startsWith("s3a://"))) {
+        validateObjectStoreConfig(
+          filePath.get,
+          session.sparkContext.hadoopConfiguration,
+          fallbackReasons)
+      }
+    } else {
       fallbackReasons += s"$SCAN_NATIVE_ICEBERG_COMPAT only supports local filesystem and S3"
     }
 
@@ -376,3 +392,64 @@ case class CometScanTypeChecker(scanImpl: String) extends DataTypeSupport with C
     }
   }
 }
+
+object CometScanRule extends Logging {
+
+  /**
+   * Validating object store configs can cause requests to be made to S3 APIs (such as when
+   * resolving the region for a bucket). We use a cache to reduce the number of S3 calls.
+   *
+   * The key is the config map converted to a string. The value is the reason that the config is
+   * not valid, or None if the config is valid.
+   */
+  val configValidityMap = new mutable.HashMap[String, Option[String]]()
+
+  /**
+   * We do not expect to see a large number of unique configs within the lifetime of a Spark
+   * session, but we reset the cache once it reaches a fixed size to prevent it growing
+   * indefinitely.
+   */
+  val configValidityMapMaxSize = 1024
+
+  def validateObjectStoreConfig(
+      filePath: String,
+      hadoopConf: Configuration,
+      fallbackReasons: mutable.ListBuffer[String]): Unit = {
+    val objectStoreConfigMap =
+      NativeConfig.extractObjectStoreOptions(hadoopConf, URI.create(filePath))
+
+    val cacheKey = objectStoreConfigMap
+      .map { case (k, v) =>
+        s"$k=$v"
+      }
+      .toList
+      .sorted
+      .mkString("\n")
+
+    if (configValidityMap.size >= configValidityMapMaxSize) {
+      logWarning("Resetting S3 object store validity cache")
+      configValidityMap.clear()
+    }
+
+    configValidityMap.get(cacheKey) match {
+      case Some(Some(reason)) =>
+        fallbackReasons += reason
+        throw new CometNativeException(reason)
+      case Some(None) =>
+      // previously validated
+      case _ =>
+        try {
+          val objectStoreOptions = JavaConverters.mapAsJavaMap(objectStoreConfigMap)
+          Native.validateObjectStoreConfig(filePath, objectStoreOptions)
+        } catch {
+          case e: Exception =>
+            val reason = "Object store config not supported by " +
+              s"$SCAN_NATIVE_ICEBERG_COMPAT: ${e.getMessage}"
+            fallbackReasons += reason
+            configValidityMap.put(cacheKey, Some(reason))
+            throw e
+        }
+    }
+
+  }
+}
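The validity cache above keys on a canonical rendering of the extracted object store options. A small standalone sketch of that key construction (option values are illustrative), showing why map ordering never affects cache hits:

object CacheKeySketch {
  def main(args: Array[String]): Unit = {
    // Illustrative entries; real ones come from NativeConfig.extractObjectStoreOptions.
    val options = Map(
      "fs.s3a.endpoint.region" -> "us-east-1",
      "fs.s3a.aws.credentials.provider" ->
        "com.amazonaws.auth.EnvironmentVariableCredentialsProvider")

    // Same construction as validateObjectStoreConfig: sorted "key=value" lines joined by newlines.
    val cacheKey = options
      .map { case (k, v) => s"$k=$v" }
      .toList
      .sorted
      .mkString("\n")

    println(cacheKey)
  }
}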

spark/src/test/scala/org/apache/comet/objectstore/NativeConfigSuite.scala

Lines changed: 79 additions & 1 deletion
@@ -21,12 +21,23 @@ package org.apache.comet.objectstore
 
 import java.net.URI
 
+import scala.collection.mutable
+import scala.util.Try
+
+import org.scalatest.BeforeAndAfterEach
 import org.scalatest.funsuite.AnyFunSuite
 import org.scalatest.matchers.should.Matchers
 
 import org.apache.hadoop.conf.Configuration
 
-class NativeConfigSuite extends AnyFunSuite with Matchers {
+import org.apache.comet.CometNativeException
+import org.apache.comet.rules.CometScanRule
+
+class NativeConfigSuite extends AnyFunSuite with Matchers with BeforeAndAfterEach {
+
+  override protected def beforeEach(): Unit = {
+    CometScanRule.configValidityMap.clear()
+  }
 
   test("extractObjectStoreOptions - multiple cloud provider configurations") {
     val hadoopConf = new Configuration()
@@ -70,4 +81,71 @@
         new URI("unsupported://test-bucket/test-object"))
     assert(unsupportedOptions.isEmpty, "Unsupported scheme should return empty options")
   }
+
+  test("validate object store config - no provider") {
+    val hadoopConf = new Configuration()
+    validate(hadoopConf)
+  }
+
+  test("validate object store config - valid providers") {
+    val hadoopConf = new Configuration()
+    val provider1 = "com.amazonaws.auth.EnvironmentVariableCredentialsProvider"
+    val provider2 = "com.amazonaws.auth.WebIdentityTokenCredentialsProvider"
+    hadoopConf.set("fs.s3a.aws.credentials.provider", Seq(provider1, provider2).mkString(","))
+    validate(hadoopConf)
+  }
+
+  test("validate object store config - invalid provider") {
+    val hadoopConf = new Configuration()
+    hadoopConf.set("fs.s3a.aws.credentials.provider", "invalid")
+    val e = intercept[CometNativeException] {
+      validate(hadoopConf)
+    }
+    assert(e.getMessage.contains("Unsupported credential provider: invalid"))
+  }
+
+  test("validate object store config - mixed anonymous providers") {
+    val hadoopConf = new Configuration()
+    val provider1 = "com.amazonaws.auth.AnonymousAWSCredentials"
+    val provider2 = "software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider"
+    hadoopConf.set("fs.s3a.aws.credentials.provider", Seq(provider1, provider2).mkString(","))
+    val e = intercept[CometNativeException] {
+      validate(hadoopConf)
+    }
+    val expectedError =
+      "Anonymous credential provider cannot be mixed with other credential providers"
+    assert(e.getMessage.contains(expectedError))
+  }
+
+  test("validate object store config - custom s3 endpoint not supported") {
+    val hadoopConf = new Configuration()
+    hadoopConf.set("fs.s3a.endpoint", "https://acme.storage.com")
+    val e = intercept[CometNativeException] {
+      validate(hadoopConf)
+    }
+    val expectedError =
+      "Custom S3 endpoints are not supported"
+    assert(e.getMessage.contains(expectedError))
+  }
+
+  test("validity cache") {
+    val hadoopConf = new Configuration()
+    hadoopConf.set("fs.s3a.endpoint", "https://acme.storage.com")
+
+    assert(CometScanRule.configValidityMap.isEmpty)
+    for (_ <- 0 until 5) {
+      assert(Try(validate(hadoopConf)).isFailure)
+      assert(CometScanRule.configValidityMap.size == 1)
+    }
+
+    hadoopConf.set("fs.s3a.endpoint", "https://acme2.storage.com")
+    assert(Try(validate(hadoopConf)).isFailure)
+    assert(CometScanRule.configValidityMap.size == 2)
+  }
+
+  private def validate(hadoopConf: Configuration): Unit = {
+    val path = "s3a://path/to/file.parquet"
+    val fallbackReasons = mutable.ListBuffer[String]()
+    CometScanRule.validateObjectStoreConfig(path, hadoopConf, fallbackReasons)
+  }
 }
