Skip to content

Commit 80f1c30

Browse files
authored
Display error if user tries to create subsources for single-output load generators (#32015)
Return appropriate error if user tries to create a single-output load generator with subsources (e.g. `FOR ALL TABLES`, `FOR TABLES ...`, `FOR SCHEMAS ...`) ### Motivation Fixes MaterializeInc/database-issues#9028 ### Checklist - [x] This PR has adequate test coverage / QA involvement has been duly considered. ([trigger-ci for additional test/nightly runs](https://trigger-ci.dev.materialize.com/)) - [ ] This PR has an associated up-to-date [design doc](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/README.md), is a design doc ([template](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/00000000_template.md)), or is sufficiently small to not require a design. <!-- Reference the design in the description. --> - [ ] If this PR evolves [an existing `$T ⇔ Proto$T` mapping](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/command-and-response-binary-encoding.md) (possibly in a backwards-incompatible way), then it is tagged with a `T-proto` label. - [ ] If this PR will require changes to cloud orchestration or tests, there is a companion cloud PR to account for those changes that is tagged with the release-blocker label ([example](MaterializeInc/cloud#5021)). <!-- Ask in #team-cloud on Slack if you need help preparing the cloud PR. --> - [ ] If this PR includes major [user-facing behavior changes](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/guide-changes.md#what-changes-require-a-release-note), I have pinged the relevant PM to schedule a changelog post.
1 parent 37c7f7c commit 80f1c30

File tree

5 files changed

+411
-9
lines changed

5 files changed

+411
-9
lines changed

src/sql/src/pure.rs

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1193,20 +1193,32 @@ async fn purify_create_source(
11931193
// Determine whether any reference would need to be created as a 'subsource',
11941194
// i.e. is not the default output of a single-output source.
11951195
// TODO(database-issues#8620): Remove once subsources are removed
1196-
let subsource_references = retrieved_source_references
1197-
.all_references()
1198-
.iter()
1199-
.filter(|r| {
1200-
r.load_generator_output().expect("is loadgen") != &LoadGeneratorOutput::Default
1201-
})
1202-
.collect::<Vec<_>>();
1196+
let multi_output_sources =
1197+
retrieved_source_references
1198+
.all_references()
1199+
.iter()
1200+
.any(|r| {
1201+
r.load_generator_output().expect("is loadgen")
1202+
!= &LoadGeneratorOutput::Default
1203+
});
12031204

12041205
match external_references {
12051206
Some(requested)
12061207
if matches!(reference_policy, SourceReferencePolicy::NotAllowed) =>
12071208
{
12081209
Err(PlanError::UseTablesForSources(requested.to_string()))?
12091210
}
1211+
Some(requested) if !multi_output_sources => match requested {
1212+
ExternalReferences::SubsetTables(_) => {
1213+
Err(LoadGeneratorSourcePurificationError::ForTables)?
1214+
}
1215+
ExternalReferences::SubsetSchemas(_) => {
1216+
Err(LoadGeneratorSourcePurificationError::ForSchemas)?
1217+
}
1218+
ExternalReferences::All => {
1219+
Err(LoadGeneratorSourcePurificationError::ForAllTables)?
1220+
}
1221+
},
12101222
Some(requested) => {
12111223
let requested_exports = retrieved_source_references
12121224
.requested_source_exports(Some(requested), source_name)?;
@@ -1232,8 +1244,8 @@ async fn purify_create_source(
12321244
}
12331245
}
12341246
None => {
1235-
if matches!(reference_policy, SourceReferencePolicy::Required)
1236-
&& !subsource_references.is_empty()
1247+
if multi_output_sources
1248+
&& matches!(reference_policy, SourceReferencePolicy::Required)
12371249
{
12381250
Err(LoadGeneratorSourcePurificationError::MultiOutputRequiresForAllTables)?
12391251
}
Lines changed: 298 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,298 @@
1+
# Copyright Materialize, Inc. and contributors. All rights reserved.
2+
#
3+
# Use of this software is governed by the Business Source License
4+
# included in the LICENSE file at the root of this repository.
5+
#
6+
# As of the Change Date specified in that file, in accordance with
7+
# the Business Source License, use of this software will be governed
8+
# by the Apache License, Version 2.0.
9+
10+
# Tests `LOAD GENERATOR KEY VALUE`
11+
12+
$ set-arg-default default-replica-size=1
13+
14+
$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
15+
ALTER SYSTEM SET storage_statistics_collection_interval = 1000
16+
ALTER SYSTEM SET storage_statistics_interval = 2000
17+
ALTER SYSTEM SET enable_load_generator_key_value = true
18+
19+
> CREATE CLUSTER lg_cluster SIZE '${arg.default-replica-size}'
20+
21+
$ skip-if
22+
SELECT mz_version_num() < 14300;
23+
24+
# Error if trying to create with subsources
25+
! CREATE SOURCE g FROM LOAD GENERATOR KEY VALUE(
26+
KEYS 8,
27+
PARTITIONS 1,
28+
SNAPSHOT ROUNDS 1,
29+
VALUE SIZE 1,
30+
SEED 42,
31+
BATCH SIZE 4
32+
) FOR ALL TABLES;
33+
contains:FOR ALL TABLES
34+
35+
! CREATE SOURCE g FROM LOAD GENERATOR KEY VALUE(
36+
KEYS 8,
37+
PARTITIONS 1,
38+
SNAPSHOT ROUNDS 1,
39+
VALUE SIZE 1,
40+
SEED 42,
41+
BATCH SIZE 4
42+
) FOR TABLES ("foo");
43+
contains:FOR TABLES
44+
45+
! CREATE SOURCE g FROM LOAD GENERATOR KEY VALUE(
46+
KEYS 8,
47+
PARTITIONS 1,
48+
SNAPSHOT ROUNDS 1,
49+
VALUE SIZE 1,
50+
SEED 42,
51+
BATCH SIZE 4
52+
) FOR SCHEMAS ("foo");
53+
contains:FOR SCHEMAS
54+
55+
$ skip-end
56+
57+
# A loadgen that only snapshots.
58+
> CREATE SOURCE up_no_update
59+
IN CLUSTER lg_cluster
60+
FROM LOAD GENERATOR KEY VALUE (
61+
KEYS 16,
62+
PARTITIONS 4,
63+
SNAPSHOT ROUNDS 3,
64+
SEED 123,
65+
VALUE SIZE 10,
66+
BATCH SIZE 2
67+
)
68+
ENVELOPE UPSERT
69+
70+
> CREATE SOURCE up_quick
71+
IN CLUSTER lg_cluster
72+
FROM LOAD GENERATOR KEY VALUE (
73+
KEYS 16,
74+
PARTITIONS 4,
75+
SNAPSHOT ROUNDS 3,
76+
TRANSACTIONAL SNAPSHOT false,
77+
SEED 123,
78+
VALUE SIZE 10,
79+
BATCH SIZE 2
80+
)
81+
INCLUDE KEY AS whatever
82+
ENVELOPE UPSERT
83+
84+
# Ensure data is spread as expected.
85+
> SELECT partition, count(*) FROM up_no_update GROUP BY partition
86+
0 4
87+
1 4
88+
2 4
89+
3 4
90+
91+
92+
> SELECT MAX(key) FROM up_no_update;
93+
15
94+
95+
> SELECT partition, count(*) FROM up_quick GROUP BY partition
96+
0 4
97+
1 4
98+
2 4
99+
3 4
100+
101+
> SELECT MAX(whatever) FROM up_quick;
102+
15
103+
104+
# 48 values produced (3 snapshot rounds with 16 keys).
105+
# For the `TRANSACTIONAL SNAPSHOT = false` source, we produce 48 updates from the 3 rounds.
106+
# We expect 6 quick-round offsets (based on the batch size)
107+
> SELECT
108+
s.name,
109+
u.offset_known,
110+
u.offset_committed,
111+
u.snapshot_records_known,
112+
u.snapshot_records_staged,
113+
u.messages_received,
114+
u.records_indexed
115+
FROM mz_sources s
116+
JOIN mz_internal.mz_source_statistics u ON s.id = u.id
117+
WHERE s.name IN ('up_no_update', 'up_quick')
118+
up_no_update 3 3 48 48 48 16
119+
up_quick 6 6 0 0 48 16
120+
121+
$ set-from-sql var=pre-rehydration
122+
SELECT
123+
encode(value, 'base64')
124+
FROM up_no_update
125+
WHERE
126+
key = 14
127+
128+
$ set-from-sql var=pre-rehydration-quick
129+
SELECT
130+
encode(value, 'base64')
131+
FROM up_quick
132+
WHERE
133+
whatever = 14
134+
135+
> ALTER CLUSTER lg_cluster SET (REPLICATION FACTOR 0);
136+
137+
> ALTER CLUSTER lg_cluster SET (REPLICATION FACTOR 1);
138+
139+
# Ensure that we rehydrate and keep the same value as before.
140+
> SELECT
141+
encode(value, 'base64') = '${pre-rehydration}'
142+
FROM up_no_update
143+
WHERE
144+
key = 14
145+
true
146+
147+
> SELECT
148+
encode(value, 'base64') = '${pre-rehydration-quick}'
149+
FROM up_quick
150+
WHERE
151+
whatever = 14
152+
true
153+
154+
> SELECT
155+
s.name,
156+
u.offset_known,
157+
u.offset_committed,
158+
u.snapshot_records_known,
159+
u.snapshot_records_staged,
160+
u.messages_received,
161+
u.records_indexed
162+
FROM mz_sources s
163+
JOIN mz_internal.mz_source_statistics u ON s.id = u.id
164+
WHERE s.name IN ('up_no_update', 'up_quick')
165+
up_no_update 3 3 48 48 48 16
166+
up_quick 6 6 0 0 48 16
167+
168+
> DROP SOURCE up_no_update
169+
> DROP SOURCE up_quick
170+
171+
# Create a source with 1s updates after snapshotting.
172+
> CREATE SOURCE up_with_update
173+
IN CLUSTER lg_cluster
174+
FROM LOAD GENERATOR KEY VALUE (
175+
KEYS 16,
176+
PARTITIONS 4,
177+
SNAPSHOT ROUNDS 3,
178+
SEED 123,
179+
VALUE SIZE 10,
180+
BATCH SIZE 2,
181+
TICK INTERVAL '1s'
182+
)
183+
ENVELOPE UPSERT
184+
185+
# Ensure data is partitioned correctly.
186+
> SELECT partition, count(*) FROM up_with_update GROUP BY partition
187+
0 4
188+
1 4
189+
2 4
190+
3 4
191+
192+
# Doesn't work reliably under high load in CI
193+
# Higher offsets than before, as we produce more values.
194+
# > SELECT
195+
# s.name,
196+
# u.offset_known > 3,
197+
# u.offset_committed = u.offset_known,
198+
# u.snapshot_records_known,
199+
# u.snapshot_records_staged,
200+
# u.messages_received > 48,
201+
# u.records_indexed
202+
# FROM mz_sources s
203+
# JOIN mz_internal.mz_source_statistics u ON s.id = u.id
204+
# WHERE s.name IN ('up_with_update')
205+
# up_with_update true true 48 48 true 16
206+
207+
# Also, despite the same seed, values should be different than the snapshot-only source.
208+
> SELECT
209+
encode(value, 'base64') != '${pre-rehydration}'
210+
FROM up_with_update
211+
WHERE
212+
key = 14
213+
true
214+
215+
> ALTER CLUSTER lg_cluster SET (REPLICATION FACTOR 0);
216+
217+
> ALTER CLUSTER lg_cluster SET (REPLICATION FACTOR 1);
218+
219+
$ set-from-sql var=pre-rehydration-with-update
220+
SELECT
221+
encode(value, 'base64')
222+
FROM up_with_update
223+
WHERE
224+
key = 14
225+
226+
# After restarting, we should also still see new updates override values.
227+
> SELECT
228+
encode(value, 'base64') != '${pre-rehydration-with-update}'
229+
FROM up_with_update
230+
WHERE
231+
key = 14
232+
true
233+
234+
# Test NONE-envelope
235+
> CREATE SOURCE kv_none
236+
IN CLUSTER lg_cluster
237+
FROM LOAD GENERATOR KEY VALUE (
238+
KEYS 16,
239+
PARTITIONS 4,
240+
SNAPSHOT ROUNDS 3,
241+
TRANSACTIONAL SNAPSHOT false,
242+
SEED 123,
243+
VALUE SIZE 10,
244+
BATCH SIZE 2
245+
)
246+
ENVELOPE NONE
247+
248+
> SELECT partition, count(*) FROM kv_none GROUP BY partition
249+
0 12
250+
1 12
251+
2 12
252+
3 12
253+
254+
# Test INCLUDE OFFSET
255+
> CREATE SOURCE kv_offset
256+
IN CLUSTER lg_cluster
257+
FROM LOAD GENERATOR KEY VALUE (
258+
KEYS 16,
259+
PARTITIONS 4,
260+
SNAPSHOT ROUNDS 3,
261+
TRANSACTIONAL SNAPSHOT false,
262+
SEED 123,
263+
VALUE SIZE 10,
264+
BATCH SIZE 2
265+
)
266+
INCLUDE OFFSET
267+
ENVELOPE NONE
268+
269+
> SELECT partition, MAX("offset") FROM kv_offset GROUP BY partition
270+
0 5
271+
1 5
272+
2 5
273+
3 5
274+
275+
> CREATE SOURCE kv_offset2
276+
IN CLUSTER lg_cluster
277+
FROM LOAD GENERATOR KEY VALUE (
278+
KEYS 16,
279+
PARTITIONS 4,
280+
SNAPSHOT ROUNDS 3,
281+
TRANSACTIONAL SNAPSHOT false,
282+
SEED 123,
283+
VALUE SIZE 10,
284+
BATCH SIZE 2
285+
)
286+
INCLUDE
287+
OFFSET AS something_else,
288+
KEY AS whatever
289+
ENVELOPE NONE
290+
291+
> SELECT partition, MAX(something_else) FROM kv_offset2 GROUP BY partition
292+
0 5
293+
1 5
294+
2 5
295+
3 5
296+
297+
> SELECT MAX(whatever) FROM kv_offset2;
298+
15

test/testdrive-old-kafka-src-syntax/load-generator.td

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,39 @@ exact:COUNTER load generators do not support SCALE FACTOR values
5050
IN CLUSTER ${arg.single-replica-cluster}
5151
FROM LOAD GENERATOR AUCTION (AS OF 300, UP TO 301);
5252

53+
$ skip-if
54+
SELECT mz_version_num() < 14300;
55+
56+
# Error if trying to create with subsources
57+
! CREATE SOURCE g FROM LOAD GENERATOR COUNTER FOR ALL TABLES;
58+
contains:FOR ALL TABLES is only valid for multi-output sources
59+
60+
! CREATE SOURCE g FROM LOAD GENERATOR CLOCK FOR ALL TABLES;
61+
contains:FOR ALL TABLES is only valid for multi-output sources
62+
63+
! CREATE SOURCE g FROM LOAD GENERATOR DATUMS FOR ALL TABLES;
64+
contains:FOR ALL TABLES is only valid for multi-output sources
65+
66+
! CREATE SOURCE g FROM LOAD GENERATOR COUNTER FOR TABLES ("foo");
67+
regex:.*FOR TABLES.*unsupported
68+
69+
! CREATE SOURCE g FROM LOAD GENERATOR CLOCK FOR TABLES ("foo");
70+
regex:.*FOR TABLES.*unsupported
71+
72+
! CREATE SOURCE g FROM LOAD GENERATOR DATUMS FOR TABLES ("foo");
73+
regex:.*FOR TABLES.*unsupported
74+
75+
! CREATE SOURCE g FROM LOAD GENERATOR COUNTER FOR SCHEMAS ("foo");
76+
regex:.*FOR SCHEMAS.*unsupported
77+
78+
! CREATE SOURCE g FROM LOAD GENERATOR CLOCK FOR SCHEMAS ("foo");
79+
regex:.*FOR SCHEMAS.*unsupported
80+
81+
! CREATE SOURCE g FROM LOAD GENERATOR DATUMS FOR SCHEMAS ("foo");
82+
regex:.*FOR SCHEMAS.*unsupported
83+
84+
$ skip-end
85+
5386
> CREATE TABLE accounts FROM SOURCE auction_house (REFERENCE accounts);
5487
> CREATE TABLE auctions FROM SOURCE auction_house (REFERENCE auctions);
5588
> CREATE TABLE bids FROM SOURCE auction_house (REFERENCE bids);

0 commit comments

Comments
 (0)