[SPARK-54443][SS] Partition key extraction for all streaming stateful operators #53355
override def shortName: String = "applyInPandasWithState"
override def shortName: String =
  StatefulOperatorsUtils.FLAT_MAP_GROUPS_IN_PANDAS_WITH_STATE_EXEC_OP_NAME
nice - good to move these to utils
val SESSION_WINDOW_STATE_STORE_SAVE_EXEC_OP_NAME = "sessionWindowStateStoreSaveExec"
val FLAT_MAP_GROUPS_WITH_STATE_EXEC_OP_NAME = "flatMapGroupsWithState"
val FLAT_MAP_GROUPS_IN_PANDAS_WITH_STATE_EXEC_OP_NAME = "applyInPandasWithState"
val FLAT_MAP_GROUPS_OP_NAMES: Seq[String] = Seq(
Ah TWS node names are already here - ok nice
val DEDUPLICATE_WITHIN_WATERMARK_EXEC_OP_NAME = "dedupeWithinWatermark"
val SESSION_WINDOW_STATE_STORE_SAVE_EXEC_OP_NAME = "sessionWindowStateStoreSaveExec"
val FLAT_MAP_GROUPS_WITH_STATE_EXEC_OP_NAME = "flatMapGroupsWithState"
val FLAT_MAP_GROUPS_IN_PANDAS_WITH_STATE_EXEC_OP_NAME = "applyInPandasWithState"
nit: should we rename this to APPLY_IN_PANDAS_WITH_STATE_EXEC?
What changes were proposed in this pull request?
This PR extracts the partition key (the key used during shuffle to partition streaming state) from each stateful operator's state store key, so that the same key can be used to repartition the operator's state. The state store key typically includes the partition key, but each operator decides how it lays out data in the state store, so the extraction depends on the operator's state key schema.
This ensures that repartitioning is consistent with how the state is partitioned during query execution.
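To illustrate the idea only, here is a minimal sketch of per-operator partition key extraction. Every name in it (`StateKey`, `PartitionKeyRule`, `WholeKey`, `LeadingFields`, `PartitionKeyExtractor`) is hypothetical and is not taken from this PR or from Spark. It assumes the partition key is either the whole state key or a leading prefix of it, and it uses a plain hash modulo in place of Spark's actual hash partitioning.

```scala
// Hypothetical model: a state store key is an ordered list of field values.
final case class StateKey(fields: Vector[Any])

// Hypothetical per-operator rule describing where the partition key
// sits inside the state key.
sealed trait PartitionKeyRule
// The state key is exactly the partition key.
case object WholeKey extends PartitionKeyRule
// The partition key is the first n fields; the rest is operator-specific
// (for example a timestamp or an index appended to the grouping key).
final case class LeadingFields(n: Int) extends PartitionKeyRule

object PartitionKeyExtractor {
  // Extract the partition key from a state key according to the rule.
  def extract(rule: PartitionKeyRule, key: StateKey): Vector[Any] = rule match {
    case WholeKey => key.fields
    case LeadingFields(n) =>
      require(n >= 0 && n <= key.fields.length,
        s"rule expects $n leading fields, key has ${key.fields.length}")
      key.fields.take(n)
  }

  // Stand-in for hash partitioning: a non-negative modulo of the key's hash.
  // This is NOT Spark's hash function, only an illustration.
  def partitionId(partitionKey: Vector[Any], numPartitions: Int): Int = {
    require(numPartitions > 0, "numPartitions must be positive")
    Math.floorMod(partitionKey.hashCode(), numPartitions)
  }
}
```

With this sketch, two state keys that share a partition key but differ in their operator-specific suffix land in the same partition, which is the consistency property the description above asks for.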
Why are the changes needed?
To support offline state repartitioning.
Does this PR introduce any user-facing change?
No
How was this patch tested?
New tests for each operator
Was this patch authored or co-authored using generative AI tooling?
No