
Commit 30d7b34

execbuilder: add "average lookup ratio" parallelization heuristic for lookup joins
The DistSender API forces its users to choose between cross-range parallelism (needed for performance) and memory limits (needed for stability). The Streamer was introduced to address this limitation, but it comes with some requirements, one of which is access to LeafTxns. Mutation statements, however, must run in the RootTxn, so we never use the Streamer for them and instead fall back to using the DistSender API directly (via `txnKVFetcher`). There, we currently have the following heuristic:

- if we know that each input row results in at most one looked-up row, we consider the lookup "safe" for parallelization and disable memory limits on the BatchHeader. This is the case for index joins (where we always expect exactly one looked-up row) and for lookup joins with the "equality columns are key" property (where we expect at most one looked-up row).
- otherwise, for a multi-key lookup join, we use the default fetcher memory limits (TargetBytes of 10MiB), which disables cross-range parallelism.

This most commonly affects mutation statements, and the effect is more pronounced on multi-region tables, so this commit extends the heuristic for when we consider parallelization "safe". Namely, we now calculate the average lookup ratio from the lookup equality columns and the available table / column statistics, and if the ratio doesn't exceed the allowed limit, we enable parallelism. To a certain degree, this heuristic resembles the existing "equality columns are key" heuristic, but instead of a guaranteed maximum on the lookup ratio it uses the estimated average.

What both the existing and the new heuristics try to prevent is constructing a KV batch whose response will overwhelm (read: OOM) the node issuing it.
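The decision described above can be sketched in Go as follows. This is a minimal sketch under stated assumptions. The names `colStats`, `avgLookupRatio`, and `safeToParallelize` are hypothetical. The estimate "non-NULL rows per distinct value of the lookup equality columns" is one plausible reading of "average lookup ratio" and is not necessarily the optimizer's actual formula.

```go
package main

import "fmt"

// colStats is a hypothetical, simplified stand-in for the table/column
// statistics available on the lookup equality columns.
type colStats struct {
	rowCount      float64
	nullCount     float64
	distinctCount float64
}

// avgLookupRatio estimates how many rows one lookup returns on average.
// Assumption: non-NULL rows divided by the number of distinct values, since
// NULL keys never match an equality lookup. ok is false without stats.
func avgLookupRatio(s *colStats) (ratio float64, ok bool) {
	if s == nil || s.distinctCount <= 0 {
		return 0, false
	}
	nonNull := s.rowCount - s.nullCount
	if nonNull < 0 {
		nonNull = 0 // guard against inconsistent stats
	}
	return nonNull / s.distinctCount, true
}

// safeToParallelize combines the existing "at most one row per lookup" rule
// with the new estimate. A limit of 0 (or less) disables the new heuristic.
func safeToParallelize(eqColsAreKey bool, s *colStats, avgRatioLimit float64) bool {
	if eqColsAreKey {
		return true // existing heuristic: at most one looked-up row per input row
	}
	if avgRatioLimit <= 0 {
		return false
	}
	ratio, ok := avgLookupRatio(s)
	return ok && ratio <= avgRatioLimit
}

func main() {
	fmt.Println(safeToParallelize(true, nil, 10))                                            // true
	fmt.Println(safeToParallelize(false, nil, 10))                                           // false: no stats
	fmt.Println(safeToParallelize(false, &colStats{rowCount: 1000, distinctCount: 500}, 10)) // true: ratio 2
	fmt.Println(safeToParallelize(false, &colStats{rowCount: 1000, distinctCount: 10}, 10))  // false: ratio 100
}
```

The "no stats" case matches the regional_by_row_cascade logic test, where each lookup join only becomes parallel after its target table is analyzed.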
In the existing heuristic we say that "if the lookup ratio is guaranteed not to exceed one, then it should be safe". I believe the new heuristic should be safe in practice for most deployments, for the following reasons:

- we already have implicit limiting behavior in the join reader due to its execution model: it first buffers some number of rows (up to 2MiB in size when not using the Streamer), deduplicates the lookup spans, and performs the lookup of all those spans in a single KV batch. Empirical testing shows that we expect at most 25k lookups in that single KV batch.
- this only has an impact when the Streamer is not used, which most commonly means we're executing a mutation, and our docs advocate against performing large mutations. (I'm stretching things a bit here: even if we modify a small amount of data, computing it might require reading a lot, which could be destabilizing with KV limits disabled. Yet a similar argument could be made that our current "equality columns are key" heuristic is not safe either, since it's possible to construct a scenario where we look up large amounts of data.)

To prevent the new heuristic from exploding in some edge cases, two guardrails are added:

- to handle a lookup ratio that is not evenly distributed (i.e. different input rows can result in vastly different numbers of looked-up rows), we disable the heuristic if the max lookup ratio exceeds the allowed limit.
- to handle very large looked-up rows, we disable the heuristic if the estimated average lookup row size exceeds the allowed limit. (Note that the existing heuristics have no such protection.)
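The two guardrails can be layered on top of the average-ratio check. The hedged Go sketch below shows that layering. `lookupEstimates`, `guardrailLimits`, and `parallelizeReason` are hypothetical names. How the optimizer derives the max ratio and the average row size from statistics is assumed here and not shown. The example limits are the defaults listed later in this commit message.

```go
package main

import "fmt"

// lookupEstimates is a hypothetical bundle of optimizer estimates for one
// multi-key lookup join (assumed inputs, not the real optimizer types).
type lookupEstimates struct {
	avgRatio    float64 // expected looked-up rows per input row
	maxRatio    float64 // worst-case looked-up rows for a single input row
	avgRowBytes int64   // estimated size of one looked-up row
}

// guardrailLimits holds the three thresholds; field names are illustrative.
type guardrailLimits struct {
	avgRatio    float64
	maxRatio    float64
	avgRowBytes int64
}

// parallelizeReason returns "" when parallelization looks safe, and
// otherwise names the first check that failed.
func parallelizeReason(e lookupEstimates, l guardrailLimits) string {
	switch {
	case e.avgRatio > l.avgRatio:
		return "average lookup ratio too high"
	case e.maxRatio > l.maxRatio:
		return "skewed: max lookup ratio too high" // guardrail 1
	case e.avgRowBytes > l.avgRowBytes:
		return "looked-up rows too large" // guardrail 2
	}
	return ""
}

func main() {
	l := guardrailLimits{avgRatio: 10, maxRatio: 10000, avgRowBytes: 100 << 10}
	fmt.Printf("%q\n", parallelizeReason(lookupEstimates{2, 50, 200}, l))     // ""
	fmt.Printf("%q\n", parallelizeReason(lookupEstimates{2, 50000, 200}, l))  // "skewed: max lookup ratio too high"
	fmt.Printf("%q\n", parallelizeReason(lookupEstimates{2, 50, 1 << 20}, l)) // "looked-up rows too large"
}
```

Returning a reason string rather than a bool is only a convenience for the example. It makes clear which guardrail rejected a given plan.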
I plan to do more empirical runs to fine-tune the default values of the newly added session variables, but the current defaults are:

- `parallelize_multi_key_lookup_joins_avg_lookup_ratio = 10`
- `parallelize_multi_key_lookup_joins_max_lookup_ratio = 10000`
- `parallelize_multi_key_lookup_joins_avg_lookup_row_size = 100 KiB`

To de-risk the rollout of this feature, we will initially apply the new heuristic only to mutations of multi-region tables. The new session variable `parallelize_multi_key_lookup_joins_only_on_mr_mutations` can be set to `false` to apply the heuristic to all statements, regardless of whether the table is multi-region.

Release note (performance improvement): Mutation statements (UPDATEs and DELETEs) that perform lookup joins into multi-region tables (perhaps as part of a CASCADE) are now more likely to parallelize the lookups across ranges, which improves their performance.
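The rollout gate and the defaults can be pictured with the small Go sketch below. `parallelizeSettings`, `defaultSettings`, and `eligible` are hypothetical. The comments list the real session variable names from this commit. Modeling "mutations of multi-region tables" as two booleans is a simplifying assumption.

```go
package main

import "fmt"

// parallelizeSettings is a hypothetical mirror of the four new session
// variables; each comment gives the real variable name.
type parallelizeSettings struct {
	AvgLookupRatio    float64 // parallelize_multi_key_lookup_joins_avg_lookup_ratio
	MaxLookupRatio    float64 // parallelize_multi_key_lookup_joins_max_lookup_ratio
	AvgLookupRowSize  int64   // parallelize_multi_key_lookup_joins_avg_lookup_row_size (bytes)
	OnlyOnMRMutations bool    // parallelize_multi_key_lookup_joins_only_on_mr_mutations
}

// defaultSettings returns the defaults quoted in the commit message.
func defaultSettings() parallelizeSettings {
	return parallelizeSettings{
		AvgLookupRatio:    10,
		MaxLookupRatio:    10000,
		AvgLookupRowSize:  100 << 10, // 100 KiB
		OnlyOnMRMutations: true,
	}
}

// eligible reports whether a statement may use the new heuristic at all.
func (s parallelizeSettings) eligible(isMutation, multiRegion bool) bool {
	if !s.OnlyOnMRMutations {
		return true // rollout gate lifted: applies to every statement
	}
	return isMutation && multiRegion
}

func main() {
	s := defaultSettings()
	fmt.Println(s.eligible(true, true))  // true: mutation of a multi-region table
	fmt.Println(s.eligible(false, true)) // false: SELECT while the gate is on
	s.OnlyOnMRMutations = false
	fmt.Println(s.eligible(false, true)) // true: gate lifted
}
```

The last case mirrors the regional_by_row_query_behavior test. That test sets the variable to `false` so that a SELECT gets parallelized lookups.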
1 parent 90765a4 commit 30d7b34

17 files changed, with 546 additions and 51 deletions

pkg/ccl/logictestccl/testdata/logic_test/regional_by_row_cascade

Lines changed: 127 additions & 0 deletions
@@ -58,15 +58,36 @@ INSERT INTO child (c, p, crdb_region) VALUES (1000, 100, 'us-east-1'), (2000, 20
 statement ok
 ANALYZE great_grandparent;
 
+# Only the scan in the main query is parallelized when we don't have stats on
+# the descendant tables.
+query I
+SELECT count(*) FROM [EXPLAIN (VERBOSE) DELETE FROM great_grandparent WHERE i = 1] WHERE info LIKE '%parallel%';
+----
+1
+
 statement ok
 ANALYZE grandparent;
 
+# Now we also should parallelize lookup join into the grandparent table.
+query I
+SELECT count(*) FROM [EXPLAIN (VERBOSE) DELETE FROM great_grandparent WHERE i = 1] WHERE info LIKE '%parallel%';
+----
+2
+
 statement ok
 ANALYZE parent;
 
+# Now we also should parallelize lookup join into the parent table.
+query I
+SELECT count(*) FROM [EXPLAIN (VERBOSE) DELETE FROM great_grandparent WHERE i = 1] WHERE info LIKE '%parallel%';
+----
+3
+
 statement ok
 ANALYZE child;
 
+# Finally, all three lookup joins as well as the scan in the main query should
+# be parallelized.
 query T
 EXPLAIN (VERBOSE) DELETE FROM great_grandparent WHERE i = 1;
 ----
@@ -126,6 +147,7 @@ vectorized: true
 │ │ estimated row count: 3
 │ │ table: grandparent@grandparent_gg_idx
 │ │ lookup condition: (crdb_region IN ('ap-southeast-2', 'ca-central-1', 'us-east-1')) AND (gg = gg)
+│ │ parallel
 │ │
 │ └── • distinct
 │ │ columns: (gg)
@@ -163,6 +185,7 @@ vectorized: true
 │ │ estimated row count: 3
 │ │ table: parent@parent_g_idx
 │ │ lookup condition: (crdb_region IN ('ap-southeast-2', 'ca-central-1', 'us-east-1')) AND (g = g)
+│ │ parallel
 │ │
 │ └── • distinct
 │ │ columns: (g)
@@ -193,6 +216,7 @@ vectorized: true
 │ estimated row count: 3
 │ table: child@child_p_idx
 │ lookup condition: (crdb_region IN ('ap-southeast-2', 'ca-central-1', 'us-east-1')) AND (p = p)
+│ parallel
 
 └── • distinct
 │ columns: (p)
@@ -206,3 +230,106 @@ vectorized: true
 columns: (p, g, crdb_region)
 estimated row count: 100
 label: buffer 1000000
+
+statement ok
+SET parallelize_multi_key_lookup_joins_avg_lookup_ratio = 0;
+
+# Only the scan in the main query is parallelized when the "average lookup
+# ratio" heuristic is disabled.
+query I
+SELECT count(*) FROM [EXPLAIN (VERBOSE) DELETE FROM great_grandparent WHERE i = 1] WHERE info LIKE '%parallel%';
+----
+1
+
+statement ok
+RESET parallelize_multi_key_lookup_joins_avg_lookup_ratio;
+
+# All three lookup joins as well as the scan in the main query should be
+# parallelized.
+query I
+SELECT count(*) FROM [EXPLAIN (VERBOSE) DELETE FROM great_grandparent WHERE i = 1] WHERE info LIKE '%parallel%';
+----
+4
+
+# Inject the table stats for grandparent table to simulate the case when each
+# region stores 100k rows each. The lookup into the table should still be
+# parallelized (if it's not, then we're using the wrong ColumnIDs when
+# retrieving column stats).
+statement ok
+ALTER TABLE grandparent INJECT STATISTICS '[
+  {
+    "avg_size": 4,
+    "columns": [
+      "crdb_region"
+    ],
+    "created_at": "2025-01-01 00:00:00.000000",
+    "distinct_count": 3,
+    "histo_col_type": "",
+    "name": "__auto__",
+    "null_count": 0,
+    "row_count": 300000
+  },
+  {
+    "avg_size": 2,
+    "columns": [
+      "gg"
+    ],
+    "created_at": "2025-01-01 00:00:00.000000",
+    "distinct_count": 300000,
+    "histo_buckets": [
+      {"distinct_range": 0, "num_eq": 1, "num_range": 0, "upper_bound": "1"},
+      {"distinct_range": 299999, "num_eq": 1, "num_range": 299999, "upper_bound": "300000"}
+    ],
+    "histo_col_type": "INT8",
+    "histo_version": 3,
+    "name": "__auto__",
+    "null_count": 0,
+    "row_count": 300000
+  }
+]'
+
+query I
+SELECT count(*) FROM [EXPLAIN (VERBOSE) DELETE FROM great_grandparent WHERE i = 1] WHERE info LIKE '%parallel%';
+----
+4
+
+# Now simulate a scenario where many rows have NULLs in the lookup column 'gg'.
+# The lookup into the table should still be parallelized (if it's not, then
+# we're incorrectly considering NULLs in the heuristic).
+statement ok
+ALTER TABLE grandparent INJECT STATISTICS '[
+  {
+    "avg_size": 4,
+    "columns": [
+      "crdb_region"
+    ],
+    "created_at": "2025-01-01 00:00:00.000000",
+    "distinct_count": 3,
+    "histo_col_type": "",
+    "name": "__auto__",
+    "null_count": 0,
+    "row_count": 1000000
+  },
+  {
+    "avg_size": 2,
+    "columns": [
+      "gg"
+    ],
+    "created_at": "2025-01-01 00:00:00.000000",
+    "distinct_count": 300000,
+    "histo_buckets": [
+      {"distinct_range": 0, "num_eq": 1, "num_range": 0, "upper_bound": "1"},
+      {"distinct_range": 299999, "num_eq": 1, "num_range": 299999, "upper_bound": "300000"}
+    ],
+    "histo_col_type": "INT8",
+    "histo_version": 3,
+    "name": "__auto__",
+    "null_count": 700000,
+    "row_count": 1000000
+  }
+]'
+
+query I
+SELECT count(*) FROM [EXPLAIN (VERBOSE) DELETE FROM great_grandparent WHERE i = 1] WHERE info LIKE '%parallel%';
+----
+4
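Below is a back-of-the-envelope check of the second injected statistics block for `grandparent.gg`. It rests on two hypothetical assumptions:

- the average ratio is estimated as non-NULL rows per distinct value;
- the skew guardrail looks at the most frequent key.

Under those assumptions, excluding NULLs keeps the average ratio at 1. Counting NULL as an ordinary key would make the max ratio 700,000, well above the default max-ratio limit of 10,000. This illustrates the arithmetic only. It is not the optimizer's code, and the real estimator may differ.

```go
package main

import "fmt"

// Values copied from the second INJECT STATISTICS block for grandparent.gg.
const (
	rowCount            = 1_000_000.0
	nullCount           = 700_000.0
	distinctCount       = 300_000.0
	maxLookupRatioLimit = 10_000.0 // default ..._max_lookup_ratio
)

// avgRatioExcludingNulls assumes NULL keys never satisfy gg = gg.
func avgRatioExcludingNulls(rows, nulls, distinct float64) float64 {
	return (rows - nulls) / distinct
}

// maxRatioIfNullIsAKey models the mistake the test guards against: treating
// NULL as an ordinary key value makes it by far the most frequent "key".
func maxRatioIfNullIsAKey(nulls, maxNonNullFreq float64) float64 {
	if nulls > maxNonNullFreq {
		return nulls
	}
	return maxNonNullFreq
}

func main() {
	avg := avgRatioExcludingNulls(rowCount, nullCount, distinctCount)
	bad := maxRatioIfNullIsAKey(nullCount, 1) // histogram buckets have num_eq = 1
	fmt.Printf("avg ratio (NULLs excluded): %.2f\n", avg)
	fmt.Printf("max ratio if NULL counted: %.0f (exceeds limit: %t)\n", bad, bad > maxLookupRatioLimit)
}
```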

pkg/ccl/logictestccl/testdata/logic_test/regional_by_row_query_behavior

Lines changed: 16 additions & 0 deletions
@@ -3626,6 +3626,22 @@ vectorized: true
 table: abc@abc_id1_id2_idx
 spans: /"@"/"m\xa4\xf3V\xe5&Kx\xb9\xf9\xbb\xb1\xa7\xfc\x12\xd6"/"h\b\x87\x06\x02\xc6Gѹ\x93\xa4!\xcdv\x1f+"-/"@"/"m\xa4\xf3V\xe5&Kx\xb9\xf9\xbb\xb1\xa7\xfc\x12\xd6"/"h\b\x87\x06\x02\xc6Gѹ\x93\xa4!\xcdv\x1f+"/PrefixEnd
 
+# Same query as above but ensure that the "average lookup ratio" parallelization
+# heuristic applies to the SELECT statement (both the index join and the lookup
+# join should be parallelized).
+statement ok
+SET parallelize_multi_key_lookup_joins_only_on_mr_mutations = false;
+
+query I
+SELECT count(*) FROM [
+EXPLAIN (VERBOSE) SELECT xyz.str, abc.id, abc.id1, abc.id2, abc.created_at, abc.updated_at FROM abc JOIN xyz ON xyz.abc_id = abc.id AND xyz.id2 = abc.id2 AND xyz.crdb_region = abc.crdb_region WHERE abc.id1 = '6da4f356-e526-4b78-b9f9-bbb1a7fc12d6' AND abc.id2 = '68088706-02c6-47d1-b993-a421cd761f2b' AND abc.crdb_region = 'ap-southeast-2' AND xyz.crdb_region = 'ap-southeast-2'
+] WHERE info LIKE '%parallel%';
+----
+2
+
+statement ok
+RESET parallelize_multi_key_lookup_joins_only_on_mr_mutations;
+
 # The following should use a string of 4 lookup/index joins with a cost under 200.
 query T retry
 EXPLAIN(opt,verbose) SELECT

pkg/sql/exec_util.go

Lines changed: 16 additions & 0 deletions
@@ -3871,6 +3871,22 @@ func (m *sessionDataMutator) SetParallelizeMultiKeyLookupJoinsEnabled(val bool)
 	m.data.ParallelizeMultiKeyLookupJoinsEnabled = val
 }
 
+func (m *sessionDataMutator) SetParallelizeMultiKeyLookupJoinsAvgLookupRatio(val float64) {
+	m.data.ParallelizeMultiKeyLookupJoinsAvgLookupRatio = val
+}
+
+func (m *sessionDataMutator) SetParallelizeMultiKeyLookupJoinsMaxLookupRatio(val float64) {
+	m.data.ParallelizeMultiKeyLookupJoinsMaxLookupRatio = val
+}
+
+func (m *sessionDataMutator) SetParallelizeMultiKeyLookupJoinsAvgLookupRowSize(val int64) {
+	m.data.ParallelizeMultiKeyLookupJoinsAvgLookupRowSize = val
+}
+
+func (m *sessionDataMutator) SetParallelizeMultiKeyLookupJoinsOnlyOnMRMutations(val bool) {
+	m.data.ParallelizeMultiKeyLookupJoinsOnlyOnMRMutations = val
+}
+
 // TODO(harding): Remove this when costing scans based on average column size
 // is fully supported.
 func (m *sessionDataMutator) SetCostScansWithDefaultColSize(val bool) {

pkg/sql/logictest/testdata/logic_test/information_schema

Lines changed: 4 additions & 0 deletions
@@ -4080,7 +4080,11 @@ optimizer_use_provided_ordering_fix on
 optimizer_use_trigram_similarity_optimization on
 optimizer_use_virtual_computed_column_stats on
 override_multi_region_zone_config off
+parallelize_multi_key_lookup_joins_avg_lookup_ratio 10
+parallelize_multi_key_lookup_joins_avg_lookup_row_size 100 KiB
 parallelize_multi_key_lookup_joins_enabled off
+parallelize_multi_key_lookup_joins_max_lookup_ratio 10000
+parallelize_multi_key_lookup_joins_only_on_mr_mutations on
 password_encryption scram-sha-256
 pg_trgm.similarity_threshold 0.3
 plan_cache_mode auto

pkg/sql/logictest/testdata/logic_test/pg_catalog

Lines changed: 12 additions & 0 deletions
@@ -3083,7 +3083,11 @@ optimizer_use_provided_ordering_fix on
 optimizer_use_trigram_similarity_optimization on NULL NULL NULL string
 optimizer_use_virtual_computed_column_stats on NULL NULL NULL string
 override_multi_region_zone_config off NULL NULL NULL string
+parallelize_multi_key_lookup_joins_avg_lookup_ratio 10 NULL NULL NULL string
+parallelize_multi_key_lookup_joins_avg_lookup_row_size 100 KiB NULL NULL NULL string
 parallelize_multi_key_lookup_joins_enabled off NULL NULL NULL string
+parallelize_multi_key_lookup_joins_max_lookup_ratio 10000 NULL NULL NULL string
+parallelize_multi_key_lookup_joins_only_on_mr_mutations on NULL NULL NULL string
 password_encryption scram-sha-256 NULL NULL NULL string
 pg_trgm.similarity_threshold 0.3 NULL NULL NULL string
 plan_cache_mode auto NULL NULL NULL string
@@ -3320,7 +3324,11 @@ optimizer_use_provided_ordering_fix on
 optimizer_use_trigram_similarity_optimization on NULL user NULL on on
 optimizer_use_virtual_computed_column_stats on NULL user NULL on on
 override_multi_region_zone_config off NULL user NULL off off
+parallelize_multi_key_lookup_joins_avg_lookup_ratio 10 NULL user NULL 10 10
+parallelize_multi_key_lookup_joins_avg_lookup_row_size 100 KiB B user NULL 100 KiB 100 KiB
 parallelize_multi_key_lookup_joins_enabled off NULL user NULL off off
+parallelize_multi_key_lookup_joins_max_lookup_ratio 10000 NULL user NULL 10000 10000
+parallelize_multi_key_lookup_joins_only_on_mr_mutations on NULL user NULL on on
 password_encryption scram-sha-256 NULL user NULL scram-sha-256 scram-sha-256
 pg_trgm.similarity_threshold 0.3 NULL user NULL 0.3 0.3
 plan_cache_mode auto NULL user NULL auto auto
@@ -3548,7 +3556,11 @@ optimizer_use_provided_ordering_fix NULL NULL
 optimizer_use_trigram_similarity_optimization NULL NULL NULL NULL NULL
 optimizer_use_virtual_computed_column_stats NULL NULL NULL NULL NULL
 override_multi_region_zone_config NULL NULL NULL NULL NULL
+parallelize_multi_key_lookup_joins_avg_lookup_ratio NULL NULL NULL NULL NULL
+parallelize_multi_key_lookup_joins_avg_lookup_row_size NULL NULL NULL NULL NULL
 parallelize_multi_key_lookup_joins_enabled NULL NULL NULL NULL NULL
+parallelize_multi_key_lookup_joins_max_lookup_ratio NULL NULL NULL NULL NULL
+parallelize_multi_key_lookup_joins_only_on_mr_mutations NULL NULL NULL NULL NULL
 password_encryption NULL NULL NULL NULL NULL
 pg_trgm.similarity_threshold NULL NULL NULL NULL NULL
 plan_cache_mode NULL NULL NULL NULL NULL
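In pg_catalog, the `parallelize_multi_key_lookup_joins_avg_lookup_row_size` row displays `100 KiB` with unit `B`. Meanwhile, its setter in exec_util.go takes an `int64`. The hedged Go sketch below shows how such a display string maps to a byte count. `parseByteSize` is a hypothetical helper, not CockroachDB's actual parsing code, and it only understands the binary units B, KiB, MiB, and GiB.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseByteSize converts strings such as "100 KiB" or "512" into bytes.
// Hypothetical helper: only B, KiB, MiB, and GiB are recognized.
func parseByteSize(s string) (int64, error) {
	fields := strings.Fields(s)
	if len(fields) == 0 || len(fields) > 2 {
		return 0, fmt.Errorf("invalid size %q", s)
	}
	n, err := strconv.ParseFloat(fields[0], 64)
	if err != nil || n < 0 {
		return 0, fmt.Errorf("invalid size %q", s)
	}
	unit := "B" // a bare number is interpreted as bytes
	if len(fields) == 2 {
		unit = fields[1]
	}
	multipliers := map[string]float64{"B": 1, "KiB": 1 << 10, "MiB": 1 << 20, "GiB": 1 << 30}
	m, ok := multipliers[unit]
	if !ok {
		return 0, fmt.Errorf("unknown unit %q", unit)
	}
	return int64(n * m), nil
}

func main() {
	v, err := parseByteSize("100 KiB")
	fmt.Println(v, err) // 102400 <nil>
}
```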

pkg/sql/logictest/testdata/logic_test/show_source

Lines changed: 4 additions & 0 deletions
@@ -187,7 +187,11 @@ optimizer_use_provided_ordering_fix on
 optimizer_use_trigram_similarity_optimization on
 optimizer_use_virtual_computed_column_stats on
 override_multi_region_zone_config off
+parallelize_multi_key_lookup_joins_avg_lookup_ratio 10
+parallelize_multi_key_lookup_joins_avg_lookup_row_size 100 KiB
 parallelize_multi_key_lookup_joins_enabled off
+parallelize_multi_key_lookup_joins_max_lookup_ratio 10000
+parallelize_multi_key_lookup_joins_only_on_mr_mutations on
 password_encryption scram-sha-256
 pg_trgm.similarity_threshold 0.3
 plan_cache_mode auto
