@rishvin rishvin commented Aug 25, 2025

Which issue does this PR close?

Closes #1941

Rationale for this change

This PR introduces CometMapSort, a Spark-compatible implementation of the MapSort function. Spark introduced MapSort in Spark 4.0 to allow grouping on Map type: it sorts each map by its keys before the group-by, so that maps with the same entries in a different order land in the same group.
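To illustrate why sorting by key makes grouping possible, here is a minimal sketch in plain Rust (standard library only). It is not the code in this PR: the `MapValue` alias, the `group_count` helper, and this body of `map_sort` are assumptions made for illustration, and ascending key order is assumed.

```rust
use std::collections::HashMap;

/// A map value modelled as an ordered list of (key, value) entries.
/// Hypothetical stand-in for a single Map-typed cell.
type MapValue = Vec<(String, i64)>;

/// Return a copy of the map with its entries sorted by key (ascending).
/// Illustrative only; not the map_sort implementation from this PR.
fn map_sort(map: &MapValue) -> MapValue {
    let mut sorted = map.clone();
    sorted.sort_by(|a, b| a.0.cmp(&b.0));
    sorted
}

/// Count rows per distinct map, using the key-sorted form as the grouping key.
fn group_count(rows: &[MapValue]) -> HashMap<MapValue, usize> {
    let mut groups = HashMap::new();
    for row in rows {
        *groups.entry(map_sort(row)).or_insert(0) += 1;
    }
    groups
}
```

Without the sort, `[("x", 1), ("y", 2)]` and `[("y", 2), ("x", 1)]` compare as different entry lists; after `map_sort` they are identical, so `group_count` places both rows in one group.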

Today, DataFusion/Arrow does not support grouping on map type, so executing CometMapSort as a grouping expression would fail. To make it work, this PR includes additional changes that allow grouping on map type, although the support is limited at the moment.

What changes are included in this PR?

  • Adds the map_sort scalar function, which sorts a Map by its keys. It is compatible with Spark's MapSort.
  • Adds the map_to_list scalar function, which converts a Map to List<Struct<K, V>>. Map-typed grouping expressions are wrapped with this function before they are passed to the hash aggregate. The conversion from Map to List should be cheap because the physical layout is unchanged.
  • Adds the map_from_list scalar function, which converts List<Struct<K, V>> back to a Map. It is applied to the output of the hash aggregate so that the grouping keys it returns are still of Map type, which keeps the schema consistent. This conversion should also be cheap.
  • The three functions above are integrated with the hash aggregate through a helper class called HashAggregateMapConverter. It provides helpers to wrap grouping expressions with map_to_list and to wrap the output of the hash aggregate with map_from_list to get the Map type back.
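The wrap and unwrap flow described above can be sketched with a toy model in plain Rust (standard library only). This does not use the Arrow or DataFusion APIs and is not the code in this PR: `MapValue`, `EntryList`, and `count_by_map` are hypothetical names, and the function bodies are assumptions that only mirror the idea. `MapValue` deliberately does not implement `Hash`, to stand in for a type the aggregate cannot group on.

```rust
use std::collections::HashMap;

/// Toy stand-in for a Map value. It is intentionally not hashable,
/// modelling a type that the hash aggregate cannot group on.
#[derive(Debug, Clone, PartialEq)]
struct MapValue {
    entries: Vec<(String, i64)>,
}

/// Toy stand-in for List<Struct<K, V>>: the same entries, but hashable.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct EntryList(Vec<(String, i64)>);

/// Reinterpret a map as a list of entries. The Vec is moved, not copied.
fn map_to_list(map: MapValue) -> EntryList {
    EntryList(map.entries)
}

/// Reinterpret a list of entries as a map again. The Vec is moved, not copied.
fn map_from_list(list: EntryList) -> MapValue {
    MapValue { entries: list.0 }
}

/// Count rows per map key: wrap keys as lists on the way into the
/// aggregation, then unwrap them so the output keys are maps again.
fn count_by_map(rows: Vec<MapValue>) -> Vec<(MapValue, usize)> {
    let mut counts: HashMap<EntryList, usize> = HashMap::new();
    for row in rows {
        *counts.entry(map_to_list(row)).or_insert(0) += 1;
    }
    let mut out: Vec<(MapValue, usize)> = counts
        .into_iter()
        .map(|(key, n)| (map_from_list(key), n))
        .collect();
    // HashMap iteration order is unspecified, so sort for stable output.
    out.sort_by(|a, b| a.0.entries.cmp(&b.0.entries));
    out
}
```

Both conversions only move the underlying entries from one wrapper type to the other, which is the toy analogue of the "physical layout is maintained" argument. Callers of `count_by_map` see Map-typed keys on both the input and the output side.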

Limitations (Future work)

  • Partial aggregation is executed natively, but Final aggregation falls back to Spark. The cause is that map types are not yet supported in result expressions, and result expressions are sent only for Final aggregation. This work can be extended to support Final aggregation in the future.
  • Nested map types appear to fall back to Spark. Support for them is left for future work.

How are these changes tested?

  • Added unit tests for all mentioned scalar functions.
  • Added integration tests for grouping on map type.

codecov-commenter commented Aug 25, 2025

Codecov Report

❌ Patch coverage is 71.42857% with 4 lines in your changes missing coverage. Please review.
✅ Project coverage is 58.54%. Comparing base (f09f8af) to head (4c580f8).
⚠️ Report is 415 commits behind head on main.

Files with missing lines Patch % Lines
.../scala/org/apache/comet/serde/QueryPlanSerde.scala 50.00% 1 Missing and 3 partials ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##               main    #2221      +/-   ##
============================================
+ Coverage     56.12%   58.54%   +2.41%     
- Complexity      976     1281     +305     
============================================
  Files           119      143      +24     
  Lines         11743    13265    +1522     
  Branches       2251     2367     +116     
============================================
+ Hits           6591     7766    +1175     
- Misses         4012     4265     +253     
- Partials       1140     1234      +94     


rishvin commented Aug 26, 2025

PR Build (macOS) / macos-14/Spark 3.4, JDK 11, Scala 2.12 [exec] (pull_request)

It looks like this job failed with the following error:

CometTaskMetricsSuite:
*** RUN ABORTED *** (30 minutes, 22 seconds)
  java.net.BindException: Can't assign requested address: Service 'sparkDriver' failed after 100 retries (on a random free port)! Consider explicitly setting the appropriate binding address for the service 'sparkDriver' (for example spark.driver.bindAddress for SparkDriver) to the correct binding address.
  at java.base/sun.nio.ch.Net.bind0(Native Method)
  at java.base/sun.nio.ch.Net.bind(Net.java:459)
  at java.base/sun.nio.ch.Net.bind(Net.java:448)
  at java.base/sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:227)
  at io.netty.channel.socket.nio.NioServerSocketChannel.doBind(NioServerSocketChannel.java:141)
  at io.netty.channel.AbstractChannel$AbstractUnsafe.bind(AbstractChannel.java:562)
  at io.netty.channel.DefaultChannelPipeline$HeadContext.bind(DefaultChannelPipeline.java:1334)
  at io.netty.channel.AbstractChannelHandlerContext.invokeBind(AbstractChannelHandlerContext.java:600)
  at io.netty.channel.AbstractChannelHandlerContext.bind(AbstractChannelHandlerContext.java:579)
  at io.netty.channel.DefaultChannelPipeline.bind(DefaultChannelPipeline.java:973)

@comphead comphead left a comment

Thanks @rishvin. I feel this PR would be hard to review. Would it be possible to break it down into smaller parts, one function at a time?

rishvin commented Aug 28, 2025

> Thanks @rishvin. I feel this PR would be hard to review. Would it be possible to break it down into smaller parts, one function at a time?

Thanks @comphead for the feedback. Yes, I will open an individual PR for each function. I will also keep this PR open as a reference for how those functions are currently integrated. Later, I can use this same PR to make the final integration changes. I will change the status of this PR to draft for now.

Development

Successfully merging this pull request may close these issues.

Add support for MapSort expression in Spark 4.0.0