Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 18 additions & 8 deletions spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2255,28 +2255,38 @@ object QueryPlanSerde extends Logging with CometExprShim {

}

// TODO: Remove this constraint when we upgrade to new arrow-rs including
// https://github.com/apache/arrow-rs/pull/6225
// scalastyle:off
/**
* Align w/ Arrow's
* [[https://github.com/apache/arrow-rs/blob/55.2.0/arrow-ord/src/rank.rs#L30-L40 can_rank]] and
* [[https://github.com/apache/arrow-rs/blob/55.2.0/arrow-ord/src/sort.rs#L193-L215 can_sort_to_indices]]
*
* TODO: Include SparkSQL's [[YearMonthIntervalType]] and [[DayTimeIntervalType]]
*/
// scalastyle:off
def supportedSortType(op: SparkPlan, sortOrder: Seq[SortOrder]): Boolean = {
def canRank(dt: DataType): Boolean = {
dt match {
case _: ByteType | _: ShortType | _: IntegerType | _: LongType | _: FloatType |
_: DoubleType | _: TimestampType | _: DecimalType | _: DateType =>
_: DoubleType | _: DecimalType =>
true
case _: BinaryType | _: StringType => true
case _: DateType | _: TimestampType | _: TimestampNTZType =>
true
case _: BooleanType | _: BinaryType | _: StringType => true
case _ => false
}
}

if (sortOrder.length == 1) {
val canSort = sortOrder.head.dataType match {
case _: BooleanType => true
case _: ByteType | _: ShortType | _: IntegerType | _: LongType | _: FloatType |
_: DoubleType | _: TimestampType | _: TimestampNTZType | _: DecimalType |
_: DateType =>
_: DoubleType | _: DecimalType =>
true
case _: DateType | _: TimestampType | _: TimestampNTZType =>
true
case _: BinaryType | _: StringType => true
case _: BooleanType | _: BinaryType | _: StringType => true
case ArrayType(elementType, _) => canRank(elementType)
case MapType(_, valueType, _) => canRank(valueType)
case _ => false
}
if (!canSort) {
Expand Down
108 changes: 108 additions & 0 deletions spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,114 @@ class CometExecSuite extends CometTestBase {
}
}

test("Sort on array of boolean") {
withSQLConf(
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false",
CometConf.COMET_EXEC_ENABLED.key -> "true",
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
CometConf.COMET_SHUFFLE_MODE.key -> "jvm") {

sql("""
|CREATE OR REPLACE TEMPORARY VIEW test_list AS SELECT * FROM VALUES
| (array(true)),
| (array(false)),
| (array(false)),
| (array(false)) AS test(arr)
|""".stripMargin)

val df = sql("""
SELECT * FROM test_list ORDER BY arr
|""".stripMargin)
val sort = stripAQEPlan(df.queryExecution.executedPlan).collect { case s: CometSortExec =>
s
}.headOption
assert(sort.isDefined)
}
}

test("Sort on TimestampNTZType") {
withSQLConf(
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false",
CometConf.COMET_EXEC_ENABLED.key -> "true",
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
CometConf.COMET_SHUFFLE_MODE.key -> "jvm") {

sql("""
|CREATE OR REPLACE TEMPORARY VIEW test_list AS SELECT * FROM VALUES
| (TIMESTAMP_NTZ'2025-08-29 00:00:00'),
| (TIMESTAMP_NTZ'2023-07-07 00:00:00'),
| (convert_timezone('Asia/Kathmandu', 'UTC', TIMESTAMP_NTZ'2023-07-07 00:00:00')),
| (convert_timezone('America/Los_Angeles', 'UTC', TIMESTAMP_NTZ'2023-07-07 00:00:00')),
| (TIMESTAMP_NTZ'1969-12-31 00:00:00') AS test(ts_ntz)
|""".stripMargin)

val df = sql("""
SELECT * FROM test_list ORDER BY ts_ntz
|""".stripMargin)
checkSparkAnswer(df)
val sort = stripAQEPlan(df.queryExecution.executedPlan).collect { case s: CometSortExec =>
s
}.headOption
assert(sort.isDefined)
}
}

test("Sort on map w/ TimestampNTZType values") {
withSQLConf(
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false",
CometConf.COMET_EXEC_ENABLED.key -> "true",
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
CometConf.COMET_SHUFFLE_MODE.key -> "jvm") {

sql("""
|CREATE OR REPLACE TEMPORARY VIEW test_map AS SELECT * FROM VALUES
| (map('a', TIMESTAMP_NTZ'2025-08-29 00:00:00')),
| (map('b', TIMESTAMP_NTZ'2023-07-07 00:00:00')),
| (map('c', convert_timezone('Asia/Kathmandu', 'UTC', TIMESTAMP_NTZ'2023-07-07 00:00:00'))),
| (map('d', convert_timezone('America/Los_Angeles', 'UTC', TIMESTAMP_NTZ'2023-07-07 00:00:00'))) AS test(map)
|""".stripMargin)

val df = sql("""
SELECT * FROM test_map ORDER BY map_values(map) DESC
|""".stripMargin)
checkSparkAnswer(df)
val sort = stripAQEPlan(df.queryExecution.executedPlan).collect { case s: CometSortExec =>
s
}.headOption
assert(sort.isDefined)
}
}

test("Sort on map w/ boolean values") {
withSQLConf(
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false",
CometConf.COMET_EXEC_ENABLED.key -> "true",
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
CometConf.COMET_EXEC_SORT_ENABLED.key -> "true",
CometConf.COMET_SHUFFLE_MODE.key -> "jvm") {

sql("""
|CREATE OR REPLACE TEMPORARY VIEW test_map AS SELECT * FROM VALUES
| (map('a', true)),
| (map('b', true)),
| (map('c', false)),
| (map('d', true)) AS test(map)
|""".stripMargin)

val df = sql("""
SELECT * FROM test_map ORDER BY map_values(map) DESC
|""".stripMargin)
val sort = stripAQEPlan(df.queryExecution.executedPlan).collect { case s: CometSortExec =>
s
}.headOption
assert(sort.isDefined)
}
}

test(
"fall back to Spark when the partition spec and order spec are not the same for window function") {
withTempView("test") {
Expand Down