
Commit 691f7a8

craig[bot] and yuzefovich committed
Merge #148186
148186: execbuilder: add "average lookup ratio" parallelization heuristic for lookup joins r=yuzefovich a=yuzefovich

**sql: use correct context in recursive CTE iterations**

Previously, the `execbuilder.Builder` that we use in recursive CTE iterations referenced the same context that the Builder captured on the main query path. This is problematic since that context might have a tracing span that has already been finished. This commit fixes the issue by explicitly passing the context argument into the iteration function. The bug was only exposed because we added some logging in the following commit, but it has been present for a while.

**logictest: extend a few EXPLAIN tests a bit**

This commit adjusts a few existing EXPLAIN tests to use VERBOSE and adds another RBR setup with CASCADE FKs. These will be used to highlight the parallelization change for multi-key lookup joins in the following commit.

**execbuilder: add "average lookup ratio" parallelization heuristic for lookup joins**

The DistSender API forces its users to choose between cross-range parallelism (which is needed for performance) and setting memory limits (which is needed for stability). The Streamer was introduced to address this limitation, but it comes with some requirements, one of which is that it needs access to LeafTxns. However, mutation statements must run in the RootTxn, so we never use the Streamer for those and fall back to using the DistSender API directly (via `txnKVFetcher`). There, we currently have the following heuristic:
- If we know that each input row results in at most one lookup row, then we consider such a lookup "safe" to parallelize, so we disable usage of memory limits on the BatchHeader. This is the case for index joins (where we always expect exactly one looked-up row) as well as lookup joins that have the "equality columns are key" property (where we expect at most one looked-up row).
- Otherwise, if we have a multi-key lookup join, we use the default fetcher memory limits (TargetBytes of 10MiB), which disables cross-range parallelism.

Most commonly this affects mutation statements, with a more pronounced effect on multi-region tables, so this commit extends the heuristic for when we consider parallelization "safe". Namely, we now calculate the average lookup ratio based on the lookup equality columns and the available table / column statistics, and if the ratio doesn't exceed the allowed limit, we enable the parallelism. To a certain degree, this heuristic resembles the "equality columns are key" heuristic that we already utilize, but instead of a guaranteed maximum on the lookup ratio we use the estimated average.

What we're trying to prevent, with both the existing and the new heuristics, is constructing a KV batch whose response will overwhelm (read "will OOM") the node issuing it. The existing heuristic says "if the lookup ratio is guaranteed to not exceed one, then it should be safe". I believe that the new heuristic should be safe in practice for most deployments for the following reasons:
- We already have an implicit limiting behavior in the join reader due to its execution model (it first buffers some number of rows, up to 2MiB in size when not using the streamer, deduplicates the lookup spans, and performs the lookup of all those spans in a single KV batch). Empirical testing shows that we expect at most 25k lookups in that single KV batch.
- This will have an impact only when the streamer is not used, which most commonly means we're executing a mutation, and in our docs we advocate for not performing large mutations. (I'm stretching things a bit here since even if we modify a small amount of data, computing it might require reading a lot, which could be destabilizing if we disable KV limits. Yet a similar argument could be made that our current "equality columns are key" heuristic is not safe - it's possible to construct a scenario where we look up large amounts of data.)

To prevent this new heuristic from exploding in some edge cases, two guardrails are added:
- To handle a scenario where the lookup ratio is not evenly distributed (i.e. different input rows can result in vastly different numbers of looked-up rows), we disable the heuristic if the max lookup ratio exceeds the allowed limit.
- To handle a scenario where looked-up rows are very large, we disable the heuristic if the estimated average lookup row size exceeds the allowed limit. (Note that we don't have this kind of protection in the existing heuristics.)

I plan to do some more empirical runs to fine-tune the default values of the newly added session variables, but the current defaults are:
- `parallelize_multi_key_lookup_joins_avg_lookup_ratio = 10`
- `parallelize_multi_key_lookup_joins_max_lookup_ratio = 10000`
- `parallelize_multi_key_lookup_joins_avg_lookup_row_size = 100 KiB`

To de-risk the rollout of this feature, we will initially apply the new heuristic only to mutations of multi-region tables. The new session variable `parallelize_multi_key_lookup_joins_only_on_mr_mutations` can be set to `false` to apply the heuristic to all statements, regardless of whether the table is multi-region.

Fixes: #134351.

Epic: CRDB-44104

Release note (performance improvement): Mutation statements (UPDATEs and DELETEs) that perform lookup joins into multi-region tables (perhaps as part of a CASCADE) are now more likely to parallelize the lookups across ranges, which improves their performance.

Co-authored-by: Yahor Yuzefovich <[email protected]>
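To make the heuristic concrete, here is a minimal sketch of the decision logic described above, assuming invented names (`lookupEstimates`, `limits`, `parallelizeLookup`). It is not CockroachDB's actual implementation: the real code lives in the execbuilder and derives these estimates from table/column statistics. Only the shape of the checks, the two guardrails, and the default limits are taken from the commit message.

```go
// Hypothetical sketch of the parallelization decision; not CockroachDB code.
package main

import "fmt"

// lookupEstimates holds the statistics-derived estimates the heuristic uses.
type lookupEstimates struct {
	avgLookupRatio float64 // estimated average looked-up rows per input row
	maxLookupRatio float64 // estimated worst-case looked-up rows per input row
	avgRowSize     float64 // estimated average looked-up row size, in bytes
}

// limits mirrors the new session variables with their current defaults.
type limits struct {
	avgLookupRatio float64 // parallelize_multi_key_lookup_joins_avg_lookup_ratio
	maxLookupRatio float64 // parallelize_multi_key_lookup_joins_max_lookup_ratio
	avgRowSize     float64 // parallelize_multi_key_lookup_joins_avg_lookup_row_size
}

var defaults = limits{avgLookupRatio: 10, maxLookupRatio: 10000, avgRowSize: 100 << 10}

// parallelizeLookup reports whether cross-range parallelism (i.e. no memory
// limit on the KV batch) is considered safe for a lookup join.
func parallelizeLookup(equalityColsAreKey bool, est *lookupEstimates, lim limits) bool {
	if equalityColsAreKey {
		// Pre-existing heuristic: at most one looked-up row per input row.
		return true
	}
	if est == nil || lim.avgLookupRatio == 0 {
		// No statistics, or the new heuristic is disabled (ratio set to 0):
		// keep the default memory limits and give up cross-range parallelism.
		return false
	}
	// Guardrail 1: an uneven distribution can hide a huge per-row fan-out
	// behind a small average, so bound the maximum ratio as well.
	if est.maxLookupRatio > lim.maxLookupRatio {
		return false
	}
	// Guardrail 2: very wide looked-up rows make even a small ratio risky.
	if est.avgRowSize > lim.avgRowSize {
		return false
	}
	return est.avgLookupRatio <= lim.avgLookupRatio
}

func main() {
	// 300k non-NULL rows over 300k distinct lookup values: ratio 1, safe.
	fmt.Println(parallelizeLookup(false,
		&lookupEstimates{avgLookupRatio: 1, maxLookupRatio: 1, avgRowSize: 64}, defaults)) // true
	// An average ratio of 50 exceeds the default limit of 10: not parallelized.
	fmt.Println(parallelizeLookup(false,
		&lookupEstimates{avgLookupRatio: 50, maxLookupRatio: 50, avgRowSize: 64}, defaults)) // false
}
```

The logictest added by this commit exercises the same paths: `SET parallelize_multi_key_lookup_joins_avg_lookup_ratio = 0` disables the heuristic, and injected statistics drive the estimated average ratio.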
2 parents 4e225c5 + 30d7b34 commit 691f7a8

21 files changed: +796 -74 lines changed
Lines changed: 335 additions & 0 deletions

@@ -0,0 +1,335 @@
# LogicTest: multiregion-9node-3region-3azs

# Set the closed timestamp interval to be short to shorten the amount of time
# we need to wait for the system config to propagate.
statement ok
SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '10ms';

statement ok
SET CLUSTER SETTING kv.closed_timestamp.target_duration = '10ms';

statement ok
SET CLUSTER SETTING kv.rangefeed.closed_timestamp_refresh_interval = '10ms';

statement ok
CREATE DATABASE multi_region_test_db PRIMARY REGION "ca-central-1" REGIONS "ap-southeast-2", "us-east-1" SURVIVE REGION FAILURE;

statement ok
USE multi_region_test_db

statement ok
CREATE TABLE great_grandparent (
  i INT NOT NULL PRIMARY KEY,
  gg INT NOT NULL,
  UNIQUE INDEX (gg),
  FAMILY (i, gg)
) LOCALITY REGIONAL BY ROW;

statement ok
CREATE TABLE grandparent (
  g INT NOT NULL PRIMARY KEY,
  gg INT NOT NULL REFERENCES great_grandparent (gg) ON DELETE CASCADE ON UPDATE CASCADE,
  INDEX (gg),
  FAMILY (g, gg)
) LOCALITY REGIONAL BY ROW;

statement ok
CREATE TABLE parent (
  p INT NOT NULL PRIMARY KEY,
  g INT NOT NULL REFERENCES grandparent (g) ON DELETE CASCADE ON UPDATE CASCADE,
  INDEX (g),
  FAMILY (p, g)
) LOCALITY REGIONAL BY ROW;

statement ok
CREATE TABLE child (
  c INT NOT NULL PRIMARY KEY,
  p INT NOT NULL REFERENCES parent (p) ON DELETE CASCADE ON UPDATE CASCADE,
  INDEX (p),
  FAMILY (c, p)
) LOCALITY REGIONAL BY ROW;

statement ok
INSERT INTO great_grandparent (i, gg, crdb_region) VALUES (1, 1, 'us-east-1'), (2, 2, 'us-east-1'), (3, 3, 'us-east-1');
INSERT INTO grandparent (g, gg, crdb_region) VALUES (10, 1, 'us-east-1'), (20, 2, 'us-east-1'), (30, 3, 'us-east-1');
INSERT INTO parent (p, g, crdb_region) VALUES (100, 10, 'us-east-1'), (200, 20, 'us-east-1'), (300, 30, 'us-east-1');
INSERT INTO child (c, p, crdb_region) VALUES (1000, 100, 'us-east-1'), (2000, 200, 'us-east-1'), (3000, 300, 'us-east-1');

statement ok
ANALYZE great_grandparent;

# Only the scan in the main query is parallelized when we don't have stats on
# the descendant tables.
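# (Without stats on a descendant table, the average lookup ratio into it
# cannot be estimated, so the new heuristic doesn't apply to that lookup join.)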
query I
SELECT count(*) FROM [EXPLAIN (VERBOSE) DELETE FROM great_grandparent WHERE i = 1] WHERE info LIKE '%parallel%';
----
1

statement ok
ANALYZE grandparent;

# Now the lookup join into the grandparent table should also be parallelized.
query I
SELECT count(*) FROM [EXPLAIN (VERBOSE) DELETE FROM great_grandparent WHERE i = 1] WHERE info LIKE '%parallel%';
----
2

statement ok
ANALYZE parent;

# Now the lookup join into the parent table should also be parallelized.
query I
SELECT count(*) FROM [EXPLAIN (VERBOSE) DELETE FROM great_grandparent WHERE i = 1] WHERE info LIKE '%parallel%';
----
3

statement ok
ANALYZE child;

# Finally, all three lookup joins as well as the scan in the main query should
# be parallelized.
query T
EXPLAIN (VERBOSE) DELETE FROM great_grandparent WHERE i = 1;
----
distribution: local
vectorized: true
·
• root
│ columns: ()
│
├── • delete
│ │ columns: ()
│ │ estimated row count: 0 (missing stats)
│ │ from: great_grandparent
│ │
│ └── • buffer
│ │ columns: (i, gg, crdb_region)
│ │ label: buffer 1
│ │
│ └── • union all
│ │ columns: (i, gg, crdb_region)
│ │ estimated row count: 1
│ │ limit: 1
│ │
│ ├── • scan
│ │ columns: (i, gg, crdb_region)
│ │ estimated row count: 0 (<0.01% of the table; stats collected <hidden> ago)
│ │ table: great_grandparent@great_grandparent_pkey
│ │ spans: /"@"/1/0
│ │
│ └── • scan
│ columns: (i, gg, crdb_region)
│ estimated row count: 1 (33% of the table; stats collected <hidden> ago)
│ table: great_grandparent@great_grandparent_pkey
│ spans: /"\x80"/1/0 /"\xc0"/1/0
│ parallel
│
└── • fk-cascade
│ fk: grandparent_gg_fkey
│
└── • root
│ columns: ()
│
├── • delete
│ │ columns: ()
│ │ estimated row count: 0 (missing stats)
│ │ from: grandparent
│ │
│ └── • buffer
│ │ columns: (g, gg, crdb_region)
│ │ label: buffer 1
│ │
│ └── • project
│ │ columns: (g, gg, crdb_region)
│ │
│ └── • lookup join (inner)
│ │ columns: (gg, g, gg, crdb_region)
│ │ estimated row count: 3
│ │ table: grandparent@grandparent_gg_idx
│ │ lookup condition: (crdb_region IN ('ap-southeast-2', 'ca-central-1', 'us-east-1')) AND (gg = gg)
│ │ parallel
│ │
│ └── • distinct
│ │ columns: (gg)
│ │ estimated row count: 10
│ │ distinct on: gg
│ │
│ └── • project
│ │ columns: (gg)
│ │
│ └── • scan buffer
│ columns: (i, gg, crdb_region)
│ estimated row count: 100
│ label: buffer 1000000
│
└── • fk-cascade
│ fk: parent_g_fkey
│
└── • root
│ columns: ()
│
├── • delete
│ │ columns: ()
│ │ estimated row count: 0 (missing stats)
│ │ from: parent
│ │
│ └── • buffer
│ │ columns: (p, g, crdb_region)
│ │ label: buffer 1
│ │
│ └── • project
│ │ columns: (p, g, crdb_region)
│ │
│ └── • lookup join (inner)
│ │ columns: (g, p, g, crdb_region)
│ │ estimated row count: 3
│ │ table: parent@parent_g_idx
│ │ lookup condition: (crdb_region IN ('ap-southeast-2', 'ca-central-1', 'us-east-1')) AND (g = g)
│ │ parallel
│ │
│ └── • distinct
│ │ columns: (g)
│ │ estimated row count: 10
│ │ distinct on: g
│ │
│ └── • project
│ │ columns: (g)
│ │
│ └── • scan buffer
│ columns: (g, gg, crdb_region)
│ estimated row count: 100
│ label: buffer 1000000
│
└── • fk-cascade
│ fk: child_p_fkey
│
└── • delete
│ columns: ()
│ estimated row count: 0 (missing stats)
│ from: child
│
└── • project
│ columns: (c, p, crdb_region)
│
└── • lookup join (inner)
│ columns: (p, c, p, crdb_region)
│ estimated row count: 3
│ table: child@child_p_idx
│ lookup condition: (crdb_region IN ('ap-southeast-2', 'ca-central-1', 'us-east-1')) AND (p = p)
│ parallel
│
└── • distinct
│ columns: (p)
│ estimated row count: 10
│ distinct on: p
│
└── • project
│ columns: (p)
│
└── • scan buffer
columns: (p, g, crdb_region)
estimated row count: 100
label: buffer 1000000

statement ok
SET parallelize_multi_key_lookup_joins_avg_lookup_ratio = 0;

# Only the scan in the main query is parallelized when the "average lookup
# ratio" heuristic is disabled.
query I
SELECT count(*) FROM [EXPLAIN (VERBOSE) DELETE FROM great_grandparent WHERE i = 1] WHERE info LIKE '%parallel%';
----
1

statement ok
RESET parallelize_multi_key_lookup_joins_avg_lookup_ratio;

# All three lookup joins as well as the scan in the main query should be
# parallelized.
query I
SELECT count(*) FROM [EXPLAIN (VERBOSE) DELETE FROM great_grandparent WHERE i = 1] WHERE info LIKE '%parallel%';
----
4

# Inject the table stats for the grandparent table to simulate the case where
# each region stores 100k rows. The lookup into the table should still be
# parallelized (if it's not, then we're using the wrong ColumnIDs when
# retrieving column stats).
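# (The stats injected below give 'gg' 300k distinct values over 300k non-NULL
# rows, i.e. an average lookup ratio of 300000 / 300000 = 1 looked-up row per
# input row, well under the default limit of 10.)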
statement ok
ALTER TABLE grandparent INJECT STATISTICS '[
  {
    "avg_size": 4,
    "columns": [
      "crdb_region"
    ],
    "created_at": "2025-01-01 00:00:00.000000",
    "distinct_count": 3,
    "histo_col_type": "",
    "name": "__auto__",
    "null_count": 0,
    "row_count": 300000
  },
  {
    "avg_size": 2,
    "columns": [
      "gg"
    ],
    "created_at": "2025-01-01 00:00:00.000000",
    "distinct_count": 300000,
    "histo_buckets": [
      {"distinct_range": 0, "num_eq": 1, "num_range": 0, "upper_bound": "1"},
      {"distinct_range": 299999, "num_eq": 1, "num_range": 299999, "upper_bound": "300000"}
    ],
    "histo_col_type": "INT8",
    "histo_version": 3,
    "name": "__auto__",
    "null_count": 0,
    "row_count": 300000
  }
]'

query I
SELECT count(*) FROM [EXPLAIN (VERBOSE) DELETE FROM great_grandparent WHERE i = 1] WHERE info LIKE '%parallel%';
----
4

# Now simulate a scenario where many rows have NULLs in the lookup column 'gg'.
# The lookup into the table should still be parallelized (if it's not, then
# we're incorrectly considering NULLs in the heuristic).
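# (NULLs never match an equality lookup, so only the 300k non-NULL rows out of
# 1M count: the average lookup ratio is (1000000 - 700000) / 300000 = 1.)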
statement ok
ALTER TABLE grandparent INJECT STATISTICS '[
  {
    "avg_size": 4,
    "columns": [
      "crdb_region"
    ],
    "created_at": "2025-01-01 00:00:00.000000",
    "distinct_count": 3,
    "histo_col_type": "",
    "name": "__auto__",
    "null_count": 0,
    "row_count": 1000000
  },
  {
    "avg_size": 2,
    "columns": [
      "gg"
    ],
    "created_at": "2025-01-01 00:00:00.000000",
    "distinct_count": 300000,
    "histo_buckets": [
      {"distinct_range": 0, "num_eq": 1, "num_range": 0, "upper_bound": "1"},
      {"distinct_range": 299999, "num_eq": 1, "num_range": 299999, "upper_bound": "300000"}
    ],
    "histo_col_type": "INT8",
    "histo_version": 3,
    "name": "__auto__",
    "null_count": 700000,
    "row_count": 1000000
  }
]'

query I
SELECT count(*) FROM [EXPLAIN (VERBOSE) DELETE FROM great_grandparent WHERE i = 1] WHERE info LIKE '%parallel%';
----
4
