
Commit de9f425

fix: [native_scans] Support CASE_SENSITIVE when reading Parquet (#1782)
1 parent 6663245 commit de9f425

9 files changed: +49 -9 lines changed

common/src/main/java/org/apache/comet/parquet/Native.java

Lines changed: 2 additions & 1 deletion
@@ -257,7 +257,8 @@ public static native long initRecordBatchReader(
       byte[] requiredSchema,
       byte[] dataSchema,
       String sessionTimezone,
-      int batchSize);
+      int batchSize,
+      boolean caseSensitive);
 
   // arrow native version of read batch
   /**

common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java

Lines changed: 7 additions & 1 deletion
@@ -65,6 +65,7 @@
 import org.apache.spark.sql.execution.datasources.parquet.ParquetColumn;
 import org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter;
 import org.apache.spark.sql.execution.metric.SQLMetric;
+import org.apache.spark.sql.internal.SQLConf;
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
@@ -405,6 +406,10 @@ public void init() throws Throwable {
         conf.getInt(
             CometConf.COMET_BATCH_SIZE().key(),
             (Integer) CometConf.COMET_BATCH_SIZE().defaultValue().get());
+    boolean caseSensitive =
+        conf.getBoolean(
+            SQLConf.CASE_SENSITIVE().key(),
+            (boolean) SQLConf.CASE_SENSITIVE().defaultValue().get());
     this.handle =
         Native.initRecordBatchReader(
             filePath,
@@ -415,7 +420,8 @@ public void init() throws Throwable {
             serializedRequestedArrowSchema,
             serializedDataArrowSchema,
             timeZoneId,
-            batchSize);
+            batchSize,
+            caseSensitive);
     }
     isInitialized = true;
   }

native/core/src/execution/planner.rs

Lines changed: 1 addition & 0 deletions
@@ -1190,6 +1190,7 @@ impl PhysicalPlanner {
             Some(data_filters?),
             default_values,
             scan.session_timezone.as_str(),
+            scan.case_sensitive,
         )?;
         Ok((
             vec![],

native/core/src/parquet/mod.rs

Lines changed: 3 additions & 1 deletion
@@ -60,7 +60,7 @@ use datafusion::physical_plan::ExecutionPlan;
 use datafusion::prelude::{SessionConfig, SessionContext};
 use futures::{poll, StreamExt};
 use jni::objects::{JBooleanArray, JByteArray, JLongArray, JPrimitiveArray, JString, ReleaseMode};
-use jni::sys::jstring;
+use jni::sys::{jstring, JNI_FALSE};
 use object_store::path::Path;
 use read::ColumnReader;
 use util::jni::{convert_column_descriptor, convert_encoding, deserialize_schema};
@@ -657,6 +657,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBatchReader(
     data_schema: jbyteArray,
     session_timezone: jstring,
     batch_size: jint,
+    case_sensitive: jboolean,
 ) -> jlong {
     try_unwrap_or_throw(&e, |mut env| unsafe {
         let session_config = SessionConfig::new().with_batch_size(batch_size as usize);
@@ -717,6 +718,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBatchReader(
             data_filters,
             None,
             session_timezone.as_str(),
+            case_sensitive != JNI_FALSE,
         )?;
 
         let partition_index: usize = 0;
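
Note on the JNI boundary: the flag arrives here as a jboolean (a u8 in the jni crate), so it is compared against JNI_FALSE rather than cast directly, treating any nonzero value as true. A minimal self-contained sketch of that conversion, with local stand-ins for the jni crate's definitions:

// Local stand-ins for jni::sys::jboolean and jni::sys::JNI_FALSE.
type Jboolean = u8;
const JNI_FALSE: Jboolean = 0;

// Treat any nonzero jboolean as true, mirroring `case_sensitive != JNI_FALSE`.
fn jboolean_to_bool(flag: Jboolean) -> bool {
    flag != JNI_FALSE
}

fn main() {
    assert!(jboolean_to_bool(1));
    assert!(!jboolean_to_bool(0));
}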

native/core/src/parquet/parquet_exec.rs

Lines changed: 8 additions & 3 deletions
@@ -65,8 +65,10 @@ pub(crate) fn init_datasource_exec(
     data_filters: Option<Vec<Arc<dyn PhysicalExpr>>>,
     default_values: Option<HashMap<usize, ScalarValue>>,
     session_timezone: &str,
+    case_sensitive: bool,
 ) -> Result<Arc<DataSourceExec>, ExecutionError> {
-    let (table_parquet_options, spark_parquet_options) = get_options(session_timezone);
+    let (table_parquet_options, spark_parquet_options) =
+        get_options(session_timezone, case_sensitive);
     let mut parquet_source =
         ParquetSource::new(table_parquet_options).with_schema_adapter_factory(Arc::new(
             SparkSchemaAdapterFactory::new(spark_parquet_options, default_values),
@@ -118,15 +120,18 @@
     Ok(Arc::new(DataSourceExec::new(Arc::new(file_scan_config))))
 }
 
-fn get_options(session_timezone: &str) -> (TableParquetOptions, SparkParquetOptions) {
+fn get_options(
+    session_timezone: &str,
+    case_sensitive: bool,
+) -> (TableParquetOptions, SparkParquetOptions) {
     let mut table_parquet_options = TableParquetOptions::new();
     table_parquet_options.global.pushdown_filters = true;
     table_parquet_options.global.reorder_filters = true;
     table_parquet_options.global.coerce_int96 = Some("us".to_string());
     let mut spark_parquet_options =
         SparkParquetOptions::new(EvalMode::Legacy, session_timezone, false);
     spark_parquet_options.allow_cast_unsigned_ints = true;
-    spark_parquet_options.case_sensitive = false;
+    spark_parquet_options.case_sensitive = case_sensitive;
    (table_parquet_options, spark_parquet_options)
 }
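
For context: get_options previously pinned case_sensitive to false, so the native scan matched column names case-insensitively no matter how spark.sql.caseSensitive was set. A standalone sketch of the behavioral difference the forwarded flag makes (resolve is a hypothetical stand-in for the schema adapter's name matching, simplified here to ASCII case folding):

// Hypothetical stand-in for the schema adapter's column resolution.
fn resolve(file_columns: &[&str], wanted: &str, case_sensitive: bool) -> Option<usize> {
    file_columns.iter().position(|c| {
        if case_sensitive {
            *c == wanted
        } else {
            c.eq_ignore_ascii_case(wanted)
        }
    })
}

fn main() {
    let cols = ["a"];
    // With the flag hard-coded to false, "A" silently resolved to file column "a".
    assert_eq!(resolve(&cols, "A", false), Some(0));
    // Honoring spark.sql.caseSensitive=true, "A" no longer matches "a".
    assert_eq!(resolve(&cols, "A", true), None);
}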

native/core/src/parquet/parquet_support.rs

Lines changed: 12 additions & 3 deletions
@@ -211,16 +211,25 @@ fn cast_struct_to_struct(
     // TODO some of this logic may be specific to converting Parquet to Spark
     let mut field_name_to_index_map = HashMap::new();
     for (i, field) in from_fields.iter().enumerate() {
-        field_name_to_index_map.insert(field.name(), i);
+        if parquet_options.case_sensitive {
+            field_name_to_index_map.insert(field.name().clone(), i);
+        } else {
+            field_name_to_index_map.insert(field.name().to_lowercase(), i);
+        }
     }
     assert_eq!(field_name_to_index_map.len(), from_fields.len());
     let mut cast_fields: Vec<ArrayRef> = Vec::with_capacity(to_fields.len());
     for i in 0..to_fields.len() {
         // Fields in the to_type schema may not exist in the from_type schema
         // i.e. the required schema may have fields that the file does not
         // have
-        if field_name_to_index_map.contains_key(to_fields[i].name()) {
-            let from_index = field_name_to_index_map[to_fields[i].name()];
+        let key = if parquet_options.case_sensitive {
+            to_fields[i].name().clone()
+        } else {
+            to_fields[i].name().to_lowercase()
+        };
+        if field_name_to_index_map.contains_key(&key) {
+            let from_index = field_name_to_index_map[&key];
             let cast_field = cast_array(
                 Arc::clone(array.column(from_index)),
                 to_fields[i].data_type(),
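
The scheme above keys the name-to-index map by lowercased names when matching is case-insensitive, and normalizes each lookup the same way. A minimal self-contained sketch of that approach, with plain &str slices standing in for Arrow's field lists:

use std::collections::HashMap;

// Map each source field name to its column index, lowercasing the key
// when matching should be case-insensitive.
fn build_index_map(from_fields: &[&str], case_sensitive: bool) -> HashMap<String, usize> {
    from_fields
        .iter()
        .enumerate()
        .map(|(i, name)| {
            let key = if case_sensitive {
                name.to_string()
            } else {
                name.to_lowercase()
            };
            (key, i)
        })
        .collect()
}

fn main() {
    let file_fields = ["Id", "Name"];

    // Case-insensitive: the required field "id" finds file column "Id".
    let map = build_index_map(&file_fields, false);
    assert_eq!(map.get("id"), Some(&0));

    // Case-sensitive: "id" no longer matches "Id".
    let map = build_index_map(&file_fields, true);
    assert!(map.get("id").is_none());
}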

native/proto/src/proto/operator.proto

Lines changed: 1 addition & 0 deletions
@@ -93,6 +93,7 @@ message NativeScan {
   string session_timezone = 9;
   repeated spark.spark_expression.Expr default_values = 10;
   repeated int64 default_values_indexes = 11;
+  bool case_sensitive = 12;
 }
 
 message Projection {

spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala

Lines changed: 1 addition & 0 deletions
@@ -2370,6 +2370,7 @@ object QueryPlanSerde extends Logging with CometExprShim {
     nativeScanBuilder.addAllRequiredSchema(requiredSchema.toIterable.asJava)
     nativeScanBuilder.addAllPartitionSchema(partitionSchema.toIterable.asJava)
     nativeScanBuilder.setSessionTimezone(conf.getConfString("spark.sql.session.timeZone"))
+    nativeScanBuilder.setCaseSensitive(conf.getConf[Boolean](SQLConf.CASE_SENSITIVE))
 
     Some(result.setNativeScan(nativeScanBuilder).build())

spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala

Lines changed: 14 additions & 0 deletions
@@ -44,6 +44,20 @@ class CometNativeReaderSuite extends CometTestBase with AdaptiveSparkPlanHelper
     })
   }
 
+  test("native reader case sensitivity") {
+    withTempPath { path =>
+      spark.range(10).toDF("a").write.parquet(path.toString)
+      Seq(true, false).foreach { caseSensitive =>
+        withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
+          val tbl = s"case_sensitivity_${caseSensitive}_${System.currentTimeMillis()}"
+          sql(s"create table $tbl (A long) using parquet options (path '" + path + "')")
+          val df = sql(s"select A from $tbl")
+          checkSparkAnswer(df)
+        }
+      }
+    }
+  }
+
   test("native reader - read simple STRUCT fields") {
     testSingleLineQuery(
       """
