diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index bef7e15e10..0d38af216f 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -2255,28 +2255,38 @@ object QueryPlanSerde extends Logging with CometExprShim { } - // TODO: Remove this constraint when we upgrade to new arrow-rs including - // https://github.com/apache/arrow-rs/pull/6225 + // scalastyle:off + /** + * Align w/ Arrow's + * [[https://github.com/apache/arrow-rs/blob/55.2.0/arrow-ord/src/rank.rs#L30-L40 can_rank]] and + * [[https://github.com/apache/arrow-rs/blob/55.2.0/arrow-ord/src/sort.rs#L193-L215 can_sort_to_indices]] + * + * TODO: Include SparkSQL's [[YearMonthIntervalType]] and [[DayTimeIntervalType]] + */ + // scalastyle:on def supportedSortType(op: SparkPlan, sortOrder: Seq[SortOrder]): Boolean = { def canRank(dt: DataType): Boolean = { dt match { case _: ByteType | _: ShortType | _: IntegerType | _: LongType | _: FloatType | - _: DoubleType | _: TimestampType | _: DecimalType | _: DateType => + _: DoubleType | _: DecimalType => true - case _: BinaryType | _: StringType => true + case _: DateType | _: TimestampType | _: TimestampNTZType => + true + case _: BooleanType | _: BinaryType | _: StringType => true case _ => false } } if (sortOrder.length == 1) { val canSort = sortOrder.head.dataType match { - case _: BooleanType => true case _: ByteType | _: ShortType | _: IntegerType | _: LongType | _: FloatType | - _: DoubleType | _: TimestampType | _: TimestampNTZType | _: DecimalType | - _: DateType => + _: DoubleType | _: DecimalType => + true + case _: DateType | _: TimestampType | _: TimestampNTZType => + true - case _: BinaryType | _: StringType => true + case _: BooleanType | _: BinaryType | _: StringType => true case ArrayType(elementType, _) => canRank(elementType) + case MapType(_, valueType, _) => 
canRank(valueType) case _ => false } if (!canSort) { diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala index 339f90e81c..10c6aaa201 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala @@ -201,6 +201,114 @@ class CometExecSuite extends CometTestBase { } } + test("Sort on array of boolean") { + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false", + CometConf.COMET_EXEC_ENABLED.key -> "true", + CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", + CometConf.COMET_SHUFFLE_MODE.key -> "jvm") { + + sql(""" + |CREATE OR REPLACE TEMPORARY VIEW test_list AS SELECT * FROM VALUES + | (array(true)), + | (array(false)), + | (array(false)), + | (array(false)) AS test(arr) + |""".stripMargin) + + val df = sql(""" + SELECT * FROM test_list ORDER BY arr + |""".stripMargin) + val sort = stripAQEPlan(df.queryExecution.executedPlan).collect { case s: CometSortExec => + s + }.headOption + assert(sort.isDefined) + } + } + + test("Sort on TimestampNTZType") { + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false", + CometConf.COMET_EXEC_ENABLED.key -> "true", + CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", + CometConf.COMET_SHUFFLE_MODE.key -> "jvm") { + + sql(""" + |CREATE OR REPLACE TEMPORARY VIEW test_list AS SELECT * FROM VALUES + | (TIMESTAMP_NTZ'2025-08-29 00:00:00'), + | (TIMESTAMP_NTZ'2023-07-07 00:00:00'), + | (convert_timezone('Asia/Kathmandu', 'UTC', TIMESTAMP_NTZ'2023-07-07 00:00:00')), + | (convert_timezone('America/Los_Angeles', 'UTC', TIMESTAMP_NTZ'2023-07-07 00:00:00')), + | (TIMESTAMP_NTZ'1969-12-31 00:00:00') AS test(ts_ntz) + |""".stripMargin) + + val df = sql(""" + SELECT * FROM test_list ORDER BY ts_ntz + |""".stripMargin) + 
checkSparkAnswer(df) + val sort = stripAQEPlan(df.queryExecution.executedPlan).collect { case s: CometSortExec => + s + }.headOption + assert(sort.isDefined) + } + } + + test("Sort on map w/ TimestampNTZType values") { + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false", + CometConf.COMET_EXEC_ENABLED.key -> "true", + CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", + CometConf.COMET_SHUFFLE_MODE.key -> "jvm") { + + sql(""" + |CREATE OR REPLACE TEMPORARY VIEW test_map AS SELECT * FROM VALUES + | (map('a', TIMESTAMP_NTZ'2025-08-29 00:00:00')), + | (map('b', TIMESTAMP_NTZ'2023-07-07 00:00:00')), + | (map('c', convert_timezone('Asia/Kathmandu', 'UTC', TIMESTAMP_NTZ'2023-07-07 00:00:00'))), + | (map('d', convert_timezone('America/Los_Angeles', 'UTC', TIMESTAMP_NTZ'2023-07-07 00:00:00'))) AS test(map) + |""".stripMargin) + + val df = sql(""" + SELECT * FROM test_map ORDER BY map_values(map) DESC + |""".stripMargin) + checkSparkAnswer(df) + val sort = stripAQEPlan(df.queryExecution.executedPlan).collect { case s: CometSortExec => + s + }.headOption + assert(sort.isDefined) + } + } + + test("Sort on map w/ boolean values") { + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false", + CometConf.COMET_EXEC_ENABLED.key -> "true", + CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", + CometConf.COMET_EXEC_SORT_ENABLED.key -> "true", + CometConf.COMET_SHUFFLE_MODE.key -> "jvm") { + + sql(""" + |CREATE OR REPLACE TEMPORARY VIEW test_map AS SELECT * FROM VALUES + | (map('a', true)), + | (map('b', true)), + | (map('c', false)), + | (map('d', true)) AS test(map) + |""".stripMargin) + + val df = sql(""" + SELECT * FROM test_map ORDER BY map_values(map) DESC + |""".stripMargin) + val sort = stripAQEPlan(df.queryExecution.executedPlan).collect { case s: CometSortExec => + s + }.headOption + assert(sort.isDefined) + } + } + test( "fall back 
to Spark when the partition spec and order spec are not the same for window function") { withTempView("test") {