
Commit 38beead

fix: support map_keys (#1788)
* fix: support `map_keys`
1 parent c8d4762 commit 38beead

4 files changed: +151 −6 lines


native/core/src/execution/planner.rs

Lines changed: 110 additions & 4 deletions
@@ -3083,9 +3083,6 @@ mod tests {
         let (mut scans, datafusion_plan) =
             planner.create_plan(&projection, &mut vec![], 1).unwrap();
 
-        // Feed the data into plan
-        //scans[0].set_input_batch(input_batch);
-
         // Start executing the plan in a separate thread
         // The plan waits for incoming batches and emitting result as input comes
         let mut stream = datafusion_plan.native_plan.execute(0, task_ctx).unwrap();
@@ -3149,7 +3146,7 @@
     select array(named_struct('a', 1, 'b', 'n', 'c', 'x')) arr)
     */
     #[tokio::test]
-    async fn test_nested_types() -> Result<(), DataFusionError> {
+    async fn test_nested_types_list_of_struct_by_index() -> Result<(), DataFusionError> {
         let session_ctx = SessionContext::new();
 
         // generate test data in the temp folder
@@ -3233,4 +3230,113 @@
 
         Ok(())
     }
+
+    /*
+    Testing a nested types scenario map[struct, struct]
+
+    select map_keys(m).b from (
+    select map(named_struct('a', 1, 'b', 'n', 'c', 'x'), named_struct('a', 1, 'b', 'n', 'c', 'x')) m
+    */
+    #[tokio::test]
+    async fn test_nested_types_map_keys() -> Result<(), DataFusionError> {
+        let session_ctx = SessionContext::new();
+
+        // generate test data in the temp folder
+        let test_data = "select map([named_struct('a', 1, 'b', 'n', 'c', 'x')], [named_struct('a', 2, 'b', 'm', 'c', 'y')]) c0";
+        let tmp_dir = TempDir::new()?;
+        let test_path = tmp_dir.path().to_str().unwrap().to_string();
+
+        let plan = session_ctx
+            .sql(test_data)
+            .await?
+            .create_physical_plan()
+            .await?;
+
+        // Write a parquet file into temp folder
+        session_ctx
+            .write_parquet(plan, test_path.clone(), None)
+            .await?;
+
+        // Register all parquet with temp data as file groups
+        let mut file_groups: Vec<FileGroup> = vec![];
+        for entry in std::fs::read_dir(&test_path)? {
+            let entry = entry?;
+            let path = entry.path();
+
+            if path.extension().and_then(|ext| ext.to_str()) == Some("parquet") {
+                if let Some(path_str) = path.to_str() {
+                    file_groups.push(FileGroup::new(vec![PartitionedFile::from_path(
+                        path_str.into(),
+                    )?]));
+                }
+            }
+        }
+
+        let source = Arc::new(
+            ParquetSource::default().with_schema_adapter_factory(Arc::new(
+                SparkSchemaAdapterFactory::new(
+                    SparkParquetOptions::new(EvalMode::Ansi, "", false),
+                    None,
+                ),
+            )),
+        );
+
+        // Define schema Comet reads with
+        let required_schema = Schema::new(Fields::from(vec![Field::new(
+            "c0",
+            DataType::Map(
+                Field::new(
+                    "entries",
+                    DataType::Struct(Fields::from(vec![
+                        Field::new(
+                            "key",
+                            DataType::Struct(Fields::from(vec![Field::new(
+                                "b",
+                                DataType::Utf8,
+                                true,
+                            )])),
+                            false,
+                        ),
+                        Field::new(
+                            "value",
+                            DataType::Struct(Fields::from(vec![
+                                Field::new("a", DataType::Int64, true),
+                                Field::new("b", DataType::Utf8, true),
+                                Field::new("c", DataType::Utf8, true),
+                            ])),
+                            true,
+                        ),
+                    ] as Vec<Field>)),
+                    false,
+                )
+                .into(),
+                false,
+            ),
+            true,
+        )]));
+
+        let object_store_url = ObjectStoreUrl::local_filesystem();
+        let file_scan_config =
+            FileScanConfigBuilder::new(object_store_url, required_schema.into(), source)
+                .with_file_groups(file_groups)
+                .build();
+
+        // Run native read
+        let scan = Arc::new(DataSourceExec::new(Arc::new(file_scan_config.clone())));
+        let stream = scan.execute(0, session_ctx.task_ctx())?;
+        let result: Vec<_> = stream.collect().await;
+
+        let actual = result.first().unwrap().as_ref().unwrap();
+
+        let expected = [
+            "+------------------------------+",
+            "| c0                           |",
+            "+------------------------------+",
+            "| {{b: n}: {a: 2, b: m, c: y}} |",
+            "+------------------------------+",
+        ];
+        assert_batches_eq!(expected, &[actual.clone()]);
+
+        Ok(())
+    }
 }
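
For context on what the new Rust test exercises: the Parquet file stores a full map<struct<a, b, c>, struct<a, b, c>> column, while required_schema prunes the key struct down to the single field b, leaving the SparkSchemaAdapterFactory to reconcile the two. Below is a minimal Spark-side sketch of the same round trip; it assumes a live SparkSession named spark and a scratch path, both hypothetical here.

    // Sketch only: write the same map[struct, struct] shape the Rust test
    // generates, then project a single nested key field back out.
    val path = "/tmp/map_keys_demo" // hypothetical scratch location
    spark
      .sql("""select map(named_struct('a', 1, 'b', 'n', 'c', 'x'),
             |           named_struct('a', 2, 'b', 'm', 'c', 'y')) c0""".stripMargin)
      .write.mode("overwrite").parquet(path)

    // Reading only map_keys(c0).b means just key field b has to survive
    // schema pruning -- the case required_schema models on the native side.
    spark.read.parquet(path).selectExpr("map_keys(c0).b").show()
    // expect a single row containing [n]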

spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala

Lines changed: 8 additions & 0 deletions
@@ -2044,6 +2044,14 @@ object QueryPlanSerde extends Logging with CometExprShim {
         convert(CometArrayCompact)
       case _: ArrayExcept =>
         convert(CometArrayExcept)
+      case mk: MapKeys =>
+        val childExpr = exprToProtoInternal(mk.child, inputs, binding)
+        scalarFunctionExprToProto("map_keys", childExpr)
+      // commented out because of correctness issue
+      // https://github.com/apache/datafusion-comet/issues/1789
+      // case mv: MapValues =>
+      //   val childExpr = exprToProtoInternal(mv.child, inputs, binding)
+      //   scalarFunctionExprToProto("map_values", childExpr)
       case _ =>
         withInfo(expr, s"${expr.prettyName} is not supported", expr.children: _*)
         None
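
The new serde arm forwards Spark's MapKeys expression straight to DataFusion's map_keys scalar function via scalarFunctionExprToProto, with no dedicated Comet expression class in between. A hedged spark-shell sketch of how the translation can be observed; the config keys follow Comet's documented names, and the plugin is assumed to be on the classpath:

    // Illustrative only, not part of this commit.
    spark.conf.set("spark.comet.enabled", "true")
    spark.conf.set("spark.comet.exec.enabled", "true")

    val df = spark.sql("select map_keys(map('k1', 1, 'k2', 2)) as keys")
    df.explain() // with this fix, map_keys should stay on the Comet native path
    df.show()    // [k1, k2]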

spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala

Lines changed: 29 additions & 0 deletions
@@ -317,4 +317,33 @@ class CometNativeReaderSuite extends CometTestBase with AdaptiveSparkPlanHelper
         |""".stripMargin,
       "select c0[0].a, c0[0].a from tbl")
   }
+
+  test("native reader - select nested field from a complex map[struct, struct] using map_keys") {
+    testSingleLineQuery(
+      """
+        | select map(str0, str1) c0 from
+        | (
+        |   select named_struct('a', cast(1 as long), 'b', cast(2 as long), 'c', cast(3 as long)) str0,
+        |          named_struct('x', cast(8 as long), 'y', cast(9 as long), 'z', cast(0 as long)) str1 union all
+        |   select named_struct('a', cast(3 as long), 'b', cast(4 as long), 'c', cast(5 as long)) str0,
+        |          named_struct('x', cast(6 as long), 'y', cast(7 as long), 'z', cast(8 as long)) str1
+        | )
+        |""".stripMargin,
+      "select map_keys(c0).b from tbl")
+  }
+
+  // commented out because of correctness issue https://github.com/apache/datafusion-comet/issues/1789
+  // test("native reader - select nested field from a complex map[struct, struct] using map_values") {
+  //   testSingleLineQuery(
+  //     """
+  //       | select map(str0, str1) c0 from
+  //       | (
+  //       |   select named_struct('a', cast(1 as long), 'b', cast(2 as long), 'c', cast(3 as long)) str0,
+  //       |          named_struct('x', cast(8 as long), 'y', cast(9 as long), 'z', cast(0 as long)) str1 union all
+  //       |   select named_struct('a', cast(3 as long), 'b', cast(4 as long), 'c', cast(5 as long)) str0,
+  //       |          named_struct('x', cast(6 as long), 'y', cast(7 as long), 'z', cast(8 as long)) str1
+  //       | )
+  //       |""".stripMargin,
+  //     "select map_values(c0).b from tbl")
+  // }
 }
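
The projection in the new test is the interesting part: map_keys(c0) yields array<struct<a, b, c>>, and the trailing .b projects that one field across every element, so each row comes back as array<bigint>. A standalone sketch of that shape, runnable in a plain Spark shell independently of Comet:

    // One map entry whose key is struct(a=1, b=2, c=3); map_keys(...).b
    // therefore evaluates to the single-element array [2].
    spark.sql(
      """select map_keys(
        |         map(named_struct('a', 1L, 'b', 2L, 'c', 3L),
        |             named_struct('x', 8L, 'y', 9L, 'z', 0L))).b as b
        |""".stripMargin).show()
    // +---+
    // |  b|
    // +---+
    // |[2]|
    // +---+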

spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala

Lines changed: 4 additions & 2 deletions
@@ -862,7 +862,8 @@ abstract class CometTestBase
       testName: String = "test",
       tableName: String = "tbl",
       sqlConf: Seq[(String, String)] = Seq.empty,
-      debugCometDF: DataFrame => Unit = _ => ()): Unit = {
+      debugCometDF: DataFrame => Unit = _ => (),
+      checkCometOperator: Boolean = true): Unit = {
 
     withTempDir { dir =>
       val path = new Path(dir.toURI.toString, testName).toUri.toString
@@ -881,7 +882,8 @@ abstract class CometTestBase
       withSQLConf(sqlConf: _*) {
         val cometDF = sql(testQuery)
         debugCometDF(cometDF)
-        checkSparkAnswerAndOperator(cometDF)
+        if (checkCometOperator) checkSparkAnswerAndOperator(cometDF)
+        else checkSparkAnswer(cometDF)
       }
     }
   }
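
The new checkCometOperator flag gives suites an escape hatch: the default true keeps asserting that the plan actually ran on Comet operators, while false downgrades the check to answer parity against Spark. A hypothetical caller, useful once an expression is known to fall back (for example map_values until issue #1789 is resolved):

    // Sketch only: compare results, but do not require Comet operators.
    testSingleLineQuery(
      "select map(named_struct('a', 1L), named_struct('x', 2L)) c0",
      "select map_values(c0).x from tbl",
      checkCometOperator = false)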
