| title | description | ms.reviewer | ms.topic | ms.date |
|---|---|---|---|---|
Shuffle query |
This article describes Shuffle query. |
alexans |
reference |
08/11/2024 |
[!INCLUDE applies] [!INCLUDE fabric] [!INCLUDE azure-data-explorer] [!INCLUDE monitor] [!INCLUDE sentinel]
The shuffle query is a semantic-preserving transformation used with a set of operators that support the shuffle strategy. Depending on the data involved, querying with the shuffle strategy can yield better performance. It's better to use the shuffle query strategy when the shuffle key (a join key, summarize key, make-series key or partition key) has a high cardinality and the regular operator query hits query limits.
You can use the following operators with the shuffle command:
To use the shuffle query strategy, add the expression hint.strategy = shuffle or hint.shufflekey = <key>. When you use hint.strategy=shuffle, the operator data will be shuffled by all the keys. Use this expression when the compound key is unique but each key isn't unique enough, so you'll shuffle the data using all the keys of the shuffled operator.
When partitioning data with the shuffle strategy, the data load is shared on all cluster nodes. Each node processes one partition of the data. The default number of partitions is equal to the number of cluster nodes.
The partition number can be overridden by using the syntax hint.num_partitions = total_partitions, which will control the number of partitions. This is useful when the cluster has a small number of cluster nodes and the default partitions number will be small, and the query fails or takes a long execution time.
Note
Using many partitions may consume more cluster resources and degrade performance. Choose the partition number carefully by starting with the hint.strategy = shuffle and start increasing the partitions gradually.
In some cases, the hint.strategy = shuffle is ignored, and the query won't run in shuffle strategy. This can happen when:
- The
joinoperator has anothershuffle-compatible operator (join,summarize,make-seriesorpartition) on the left side or the right side. - The
summarizeoperator appears after anothershuffle-compatible operator (join,summarize,make-seriesorpartition) in the query.
T | DataExpression | join hint.strategy = shuffle ( DataExpression )
T | summarize hint.strategy = shuffle DataExpression
T | Query | partition hint.strategy = shuffle ( SubQuery )
T | DataExpression | join hint.shufflekey = key ( DataExpression )
T | summarize hint.shufflekey = key DataExpression
T | make-series hint.shufflekey = key DataExpression
T | Query | partition hint.shufflekey = key ( SubQuery )
[!INCLUDE syntax-conventions-note]
| Name | Type | Required | Description |
|---|---|---|---|
| T | string |
✔️ | The tabular source whose data is to be processed by the operator. |
| DataExpression | string |
An implicit or explicit tabular transformation expression. | |
| Query | string |
A transformation expression run on the records of T. | |
| key | string |
Use a join key, summarize key, make-series key or partition key. |
|
| SubQuery | string |
A transformation expression. |
Note
Either DataExpression or Query must be specified depending on the chosen syntax.
The example in this section shows how to use the syntax to help you get started.
[!INCLUDE help-cluster]
The shuffle strategy query with summarize operator shares the load on all cluster nodes, where each node processes one partition of the data.
:::moniker range="azure-data-explorer"
[!div class="nextstepaction"] Run the query ::: moniker-end
StormEvents
| summarize hint.strategy = shuffle count(), avg(InjuriesIndirect) by State
| count Output
| Count |
|---|
| 67 |
:::moniker range="azure-data-explorer"
[!div class="nextstepaction"] Run the query ::: moniker-end
StormEvents
| where State has "West"
| where EventType has "Flood"
| join hint.strategy=shuffle
(
StormEvents
| where EventType has "Hail"
| project EpisodeId, State, DamageProperty
)
on State
| countOutput
| Count |
|---|
| 103 |
:::moniker range="azure-data-explorer"
[!div class="nextstepaction"] Run the query ::: moniker-end
StormEvents
| where State has "North"
| make-series hint.shufflekey = State sum(DamageProperty) default = 0 on StartTime in range(datetime(2007-01-01 00:00:00.0000000), datetime(2007-01-31 23:59:00.0000000), 15d) by StateOutput
|State|sum_DamageProperty|StartTime| |---|---|---|---| |NORTH DAKOTA|[60000,0,0]|["2006-12-31T00:00:00.0000000Z","2007-01-15T00:00:00.0000000Z","2007-01-30T00:00:00.0000000Z"]| |NORTH CAROLINA|[20000,0,1000]|["2006-12-31T00:00:00.0000000Z","2007-01-15T00:00:00.0000000Z","2007-01-30T00:00:00.0000000Z"]| |ATLANTIC NORTH|[0,0,0]|["2006-12-31T00:00:00.0000000Z","2007-01-15T00:00:00.0000000Z","2007-01-30T00:00:00.0000000Z"]|
:::moniker range="azure-data-explorer"
[!div class="nextstepaction"] Run the query ::: moniker-end
StormEvents
| partition hint.strategy=shuffle by EpisodeId
(
top 3 by DamageProperty
| project EpisodeId, State, DamageProperty
)
| countOutput
| Count |
|---|
| 22345 |
When you use hint.strategy=shuffle, the shuffled operator will be shuffled by all the keys. In the following example, the query shuffles the data using both EpisodeId and EventId as keys:
:::moniker range="azure-data-explorer"
[!div class="nextstepaction"] Run the query ::: moniker-end
StormEvents
| where StartTime > datetime(2007-01-01 00:00:00.0000000)
| join kind = inner hint.strategy=shuffle (StormEvents | where DamageCrops > 62000000) on EpisodeId, EventId
| countOutput
| Count |
|---|
| 14 |
The following query uses hint.shufflekey = key. The query above is equivalent to this query.
:::moniker range="azure-data-explorer"
[!div class="nextstepaction"] Run the query ::: moniker-end
StormEvents
| where StartTime > datetime(2007-01-01 00:00:00.0000000)
| join kind = inner hint.shufflekey = EpisodeId hint.shufflekey = EventId (StormEvents | where DamageCrops > 62000000) on EpisodeId, EventIdOutput
| Count |
|---|
| 14 |
In some cases, the hint.strategy=shuffle will be ignored, and the query won't run in shuffle strategy. For example, in the following example, the join has summarize on its left side, so using hint.strategy=shuffle won't apply shuffle strategy to the query:
:::moniker range="azure-data-explorer"
[!div class="nextstepaction"] Run the query ::: moniker-end
StormEvents
| where StartTime > datetime(2007-01-01 00:00:00.0000000)
| summarize count() by EpisodeId, EventId
| join kind = inner hint.strategy=shuffle (StormEvents | where DamageCrops > 62000000) on EpisodeId, EventIdOutput
| EpisodeId | EventId | ... | EpisodeId1 | EventId1 | ... |
|---|---|---|---|---|---|
| 1030 | 4407 | ... | 1030 | 4407 | ... |
| 1030 | 13721 | ... | 1030 | 13721 | ... |
| 2477 | 12530 | ... | 2477 | 12530 | ... |
| 2103 | 10237 | ... | 2103 | 10237 | ... |
| 2103 | 10239 | ... | 2103 | 10239 | ... |
| ... | ... | ... | ... | ... | ... |
To overcome this issue and run in shuffle strategy, choose the key that is common for the summarize and join operations. In this case, this key is EpisodeId. Use the hint hint.shufflekey to specify the shuffle key on the join to hint.shufflekey = EpisodeId:
:::moniker range="azure-data-explorer"
[!div class="nextstepaction"] Run the query ::: moniker-end
StormEvents
| where StartTime > datetime(2007-01-01 00:00:00.0000000)
| summarize count() by EpisodeId, EventId
| join kind = inner hint.shufflekey=EpisodeId (StormEvents | where DamageCrops > 62000000) on EpisodeId, EventIdOutput
| EpisodeId | EventId | ... | EpisodeId1 | EventId1 | ... |
|---|---|---|---|---|---|
| 1030 | 4407 | ... | 1030 | 4407 | ... |
| 1030 | 13721 | ... | 1030 | 13721 | ... |
| 2477 | 12530 | ... | 2477 | 12530 | ... |
| 2103 | 10237 | ... | 2103 | 10237 | ... |
| 2103 | 10239 | ... | 2103 | 10239 | ... |
| ... | ... | ... | ... | ... | ... |
In this example, using the summarize operator with shuffle strategy improves performance. The source table has 150M records and the cardinality of the group by key is 10M, which is spread over 10 cluster nodes.
Using summarize operator without shuffle strategy, the query ends after 1:08 and the memory usage peak is ~3 GB:
orders
| summarize arg_max(o_orderdate, o_totalprice) by o_custkey
| where o_totalprice < 1000
| countOutput
| Count |
|---|
| 1086 |
While using shuffle strategy with summarize, the query ends after ~7 seconds and the memory usage peak is 0.43 GB:
orders
| summarize hint.strategy = shuffle arg_max(o_orderdate, o_totalprice) by o_custkey
| where o_totalprice < 1000
| countOutput
| Count |
|---|
| 1086 |
The following example demonstrates performance on a cluster that has two cluster nodes, with a table that has 60M records, where the cardinality of the group by key is 2M.
Running the query without hint.num_partitions will use only two partitions (as cluster nodes number) and the following query will take ~1:10 mins:
lineitem
| summarize hint.strategy = shuffle dcount(l_comment), dcount(l_shipdate) by l_partkey
| consumeIf setting the partitions number to 10, the query will end after 23 seconds:
lineitem
| summarize hint.strategy = shuffle hint.num_partitions = 10 dcount(l_comment), dcount(l_shipdate) by l_partkey
| consumeThe following example shows how using shuffle strategy with the join operator improves performance.
The examples were sampled on a cluster with 10 nodes where the data is spread over all these nodes.
The query's left-side source table has 15M records where the cardinality of the join key is ~14M. The query's right-side source has 150M records and the cardinality of the join key is 10M. The query ends after ~28 seconds and the memory usage peak is 1.43 GB:
customer
| join
orders
on $left.c_custkey == $right.o_custkey
| summarize sum(c_acctbal) by c_nationkeyWhen using shuffle strategy with a join operator, the query ends after ~4 seconds and the memory usage peak is 0.3 GB:
customer
| join
hint.strategy = shuffle orders
on $left.c_custkey == $right.o_custkey
| summarize sum(c_acctbal) by c_nationkeyIn another example, we try the same queries on a larger dataset with the following conditions:
- Left-side source of the
joinis 150M and the cardinality of the key is 148M. - Right-side source of the
joinis 1.5B, and the cardinality of the key is ~100M.
The query with just the join operator hits limits and times-out after 4 mins. However, when using shuffle strategy with the join operator, the query ends after ~34 seconds and the memory usage peak is 1.23 GB.
The following example shows the improvement on a cluster that has two cluster nodes, with a table of 60M records, where the cardinality of the join key is 2M.
Running the query without hint.num_partitions will use only two partitions (as cluster nodes number) and the following query will take ~1:10 mins:
lineitem
| summarize dcount(l_comment), dcount(l_shipdate) by l_partkey
| join
hint.shufflekey = l_partkey part
on $left.l_partkey == $right.p_partkey
| consumeWhen setting the partitions number to 10, the query will end after 23 seconds:
lineitem
| summarize dcount(l_comment), dcount(l_shipdate) by l_partkey
| join
hint.shufflekey = l_partkey hint.num_partitions = 10 part
on $left.l_partkey == $right.p_partkey
| consume