Skip to content

Commit 363bcc0

Browse files
authored
[ManagedIO] Fail expansion when encountering extra or unknown configuration (#34525)
* fail expansion when encountering extra or unknown configuration
* add another test
* spotless
* add skip validation option
* tests
* new api instead of pipeline option
* extend to python
* trigger tests
* default to false
* fix
* remove test case not applicable for unit test
* address comments
* fix after merge HEAD
1 parent fe2bc29 commit 363bcc0

File tree

7 files changed

+207
-24
lines changed

7 files changed

+207
-24
lines changed

sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -304,11 +304,6 @@ public void testBuildTransformWithManaged() {
304304
+ "bootstrap_servers: some bootstrap\n"
305305
+ "schema: '{\"type\":\"record\",\"name\":\"my_record\",\"fields\":[{\"name\":\"bool\",\"type\":\"boolean\"}]}'",
306306
"topic: topic_3\n"
307-
+ "bootstrap_servers: some bootstrap\n"
308-
+ "schema_registry_url: some-url\n"
309-
+ "schema_registry_subject: some-subject\n"
310-
+ "format: RAW",
311-
"topic: topic_4\n"
312307
+ "bootstrap_servers: some bootstrap\n"
313308
+ "format: PROTO\n"
314309
+ "schema: '"

sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,8 @@ public abstract static class ManagedTransform extends PTransform<PInput, PCollec
176176
@VisibleForTesting
177177
abstract List<String> getSupportedIdentifiers();
178178

179+
abstract @Nullable Boolean getSkipConfigValidation();
180+
179181
abstract Builder toBuilder();
180182

181183
@AutoValue.Builder
@@ -189,6 +191,8 @@ abstract static class Builder {
189191
@VisibleForTesting
190192
abstract Builder setSupportedIdentifiers(List<String> supportedIdentifiers);
191193

194+
abstract Builder setSkipConfigValidation(boolean skip);
195+
192196
abstract ManagedTransform build();
193197
}
194198

@@ -215,6 +219,14 @@ ManagedTransform withSupportedIdentifiers(List<String> supportedIdentifiers) {
215219
return toBuilder().setSupportedIdentifiers(supportedIdentifiers).build();
216220
}
217221

222+
/**
223+
* Skips configuration validation. If unset, the pipeline will fail at construction time if the
224+
* configuration includes unknown fields or is missing required fields.
225+
*/
226+
public ManagedTransform skipConfigValidation() {
227+
return toBuilder().setSkipConfigValidation(true).build();
228+
}
229+
218230
@Override
219231
public PCollectionRowTuple expand(PInput input) {
220232
PCollectionRowTuple inputTuple = resolveInput(input);
@@ -224,6 +236,7 @@ public PCollectionRowTuple expand(PInput input) {
224236
.setTransformIdentifier(getIdentifier())
225237
.setConfig(YamlUtils.yamlStringFromMap(getConfig()))
226238
.setConfigUrl(getConfigUrl())
239+
.setSkipConfigValidation(getSkipConfigValidation())
227240
.build();
228241

229242
SchemaTransform underlyingTransform =

sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java

Lines changed: 55 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,19 @@
2626
import java.io.InputStream;
2727
import java.nio.ByteBuffer;
2828
import java.nio.charset.StandardCharsets;
29+
import java.util.ArrayList;
2930
import java.util.Arrays;
3031
import java.util.Collection;
3132
import java.util.HashMap;
33+
import java.util.HashSet;
3234
import java.util.List;
3335
import java.util.Map;
3436
import java.util.ServiceLoader;
37+
import java.util.Set;
3538
import javax.annotation.Nullable;
3639
import org.apache.beam.sdk.io.FileSystems;
3740
import org.apache.beam.sdk.io.fs.MatchResult;
41+
import org.apache.beam.sdk.options.PipelineOptions;
3842
import org.apache.beam.sdk.schemas.AutoValueSchema;
3943
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
4044
import org.apache.beam.sdk.schemas.Schema;
@@ -98,6 +102,11 @@ public static Builder builder() {
98102
@SchemaFieldDescription("YAML string config used to build the underlying SchemaTransform.")
99103
public abstract @Nullable String getConfig();
100104

105+
@SchemaFieldDescription(
106+
"Skips configuration validation. If unset, the pipeline will fail at construction "
107+
+ "time if the configuration includes unknown fields or missing required fields.")
108+
public abstract @Nullable Boolean getSkipConfigValidation();
109+
101110
@AutoValue.Builder
102111
public abstract static class Builder {
103112
public abstract Builder setTransformIdentifier(String identifier);
@@ -106,6 +115,8 @@ public abstract static class Builder {
106115

107116
public abstract Builder setConfig(@Nullable String yamlConfig);
108117

118+
public abstract Builder setSkipConfigValidation(@Nullable Boolean skip);
119+
109120
public abstract ManagedConfig build();
110121
}
111122

@@ -154,28 +165,21 @@ protected SchemaTransform from(ManagedConfig managedConfig) {
154165

155166
static class ManagedSchemaTransform extends SchemaTransform {
156167
private final ManagedConfig managedConfig;
157-
private final Row underlyingRowConfig;
158168
private final SchemaTransformProvider underlyingTransformProvider;
159169

160170
ManagedSchemaTransform(
161171
ManagedConfig managedConfig, SchemaTransformProvider underlyingTransformProvider) {
162-
// parse config before expansion to check if it matches underlying transform's config schema
163-
Schema transformConfigSchema = underlyingTransformProvider.configurationSchema();
164-
Row underlyingRowConfig;
165-
try {
166-
underlyingRowConfig = getRowConfig(managedConfig, transformConfigSchema);
167-
} catch (Exception e) {
168-
throw new IllegalArgumentException(
169-
"Encountered an error when retrieving a configuration", e);
170-
}
171-
172-
this.underlyingRowConfig = underlyingRowConfig;
173172
this.underlyingTransformProvider = underlyingTransformProvider;
174173
this.managedConfig = managedConfig;
175174
}
176175

177176
@Override
178177
public PCollectionRowTuple expand(PCollectionRowTuple input) {
178+
Row underlyingRowConfig =
179+
getRowConfig(
180+
managedConfig,
181+
underlyingTransformProvider.configurationSchema(),
182+
input.getPipeline().getOptions());
179183
LOG.debug(
180184
"Building transform \"{}\" with configuration: {}",
181185
underlyingTransformProvider.identifier(),
@@ -184,6 +188,7 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
184188
return input.apply(underlyingTransformProvider.from(underlyingRowConfig));
185189
}
186190

191+
@VisibleForTesting
187192
public ManagedConfig getManagedConfig() {
188193
return this.managedConfig;
189194
}
@@ -206,7 +211,8 @@ Row getConfigurationRow() {
206211
// May return an empty row (perhaps the underlying transform doesn't have any required
207212
// parameters)
208213
@VisibleForTesting
209-
static Row getRowConfig(ManagedConfig config, Schema transformSchema) {
214+
static Row getRowConfig(
215+
ManagedConfig config, Schema transformConfigSchema, PipelineOptions options) {
210216
Map<String, Object> configMap = config.resolveUnderlyingConfig();
211217
// Build a config Row that will be used to build the underlying SchemaTransform.
212218
// If a mapping for the SchemaTransform exists, we use it to update parameter names to align
@@ -225,7 +231,42 @@ static Row getRowConfig(ManagedConfig config, Schema transformSchema) {
225231
configMap = remappedConfig;
226232
}
227233

228-
return YamlUtils.toBeamRow(configMap, transformSchema, false);
234+
@Nullable Boolean skipValidation = config.getSkipConfigValidation();
235+
if (skipValidation == null || !skipValidation) {
236+
validateUserConfig(
237+
config.getTransformIdentifier(),
238+
new HashSet<>(configMap.keySet()),
239+
transformConfigSchema);
240+
}
241+
242+
return YamlUtils.toBeamRow(configMap, transformConfigSchema, false);
243+
}
244+
245+
static void validateUserConfig(
246+
String transformId, Set<String> userParams, Schema transformConfigSchema) {
247+
List<String> missingRequiredFields = new ArrayList<>();
248+
for (Schema.Field field : transformConfigSchema.getFields()) {
249+
boolean inUserConfig = userParams.remove(field.getName());
250+
if (!field.getType().getNullable() && !inUserConfig) {
251+
missingRequiredFields.add(field.getName());
252+
}
253+
}
254+
255+
if (!missingRequiredFields.isEmpty() || !userParams.isEmpty()) {
256+
String msg = "Invalid config for transform '" + transformId + "':";
257+
if (!missingRequiredFields.isEmpty()) {
258+
msg += " Missing required fields: " + missingRequiredFields + ".";
259+
}
260+
if (!userParams.isEmpty()) {
261+
msg +=
262+
" Contains unknown fields: "
263+
+ userParams
264+
+ ". If you'd still like to pass "
265+
+ "these fields, use '.skipConfigValidation()'";
266+
}
267+
268+
throw new IllegalArgumentException(msg);
269+
}
229270
}
230271

231272
public static Map<String, Map<String, String>> getAliases() {

sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/testing/TestSchemaTransformProvider.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
*/
1818
package org.apache.beam.sdk.managed.testing;
1919

20+
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects.firstNonNull;
21+
2022
import com.google.auto.service.AutoService;
2123
import com.google.auto.value.AutoValue;
2224
import org.apache.beam.sdk.schemas.AutoValueSchema;
@@ -31,6 +33,7 @@
3133
import org.apache.beam.sdk.values.PCollectionRowTuple;
3234
import org.apache.beam.sdk.values.Row;
3335
import org.apache.beam.sdk.values.TypeDescriptors;
36+
import org.checkerframework.checker.nullness.qual.Nullable;
3437

3538
@AutoService(SchemaTransformProvider.class)
3639
public class TestSchemaTransformProvider
@@ -52,19 +55,27 @@ public static Builder builder() {
5255
@SchemaFieldDescription("Integer to add to each row element.")
5356
public abstract Integer getExtraInteger();
5457

58+
@SchemaFieldDescription("If true, will upper case the extra string. Default is false.")
59+
public abstract @Nullable Boolean getToggleUppercase();
60+
5561
@AutoValue.Builder
5662
public abstract static class Builder {
5763
public abstract Builder setExtraString(String extraString);
5864

5965
public abstract Builder setExtraInteger(Integer extraInteger);
6066

67+
public abstract Builder setToggleUppercase(Boolean toggleUppercase);
68+
6169
public abstract Config build();
6270
}
6371
}
6472

6573
@Override
6674
public SchemaTransform from(Config config) {
67-
String extraString = config.getExtraString();
75+
String extraString =
76+
firstNonNull(config.getToggleUppercase(), false)
77+
? config.getExtraString().toUpperCase()
78+
: config.getExtraString();
6879
Integer extraInteger = config.getExtraInteger();
6980
return new SchemaTransform() {
7081
@Override

sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProviderTest.java

Lines changed: 110 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,16 @@
2424
import java.net.URISyntaxException;
2525
import java.nio.file.Paths;
2626
import java.util.Arrays;
27+
import java.util.Map;
28+
import org.apache.beam.sdk.Pipeline;
2729
import org.apache.beam.sdk.managed.testing.TestSchemaTransformProvider;
30+
import org.apache.beam.sdk.options.PipelineOptionsFactory;
2831
import org.apache.beam.sdk.schemas.Schema;
32+
import org.apache.beam.sdk.schemas.utils.YamlUtils;
33+
import org.apache.beam.sdk.transforms.Create;
34+
import org.apache.beam.sdk.values.PCollectionRowTuple;
2935
import org.apache.beam.sdk.values.Row;
36+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
3037
import org.junit.Rule;
3138
import org.junit.Test;
3239
import org.junit.rules.ExpectedException;
@@ -36,6 +43,8 @@
3643
@RunWith(JUnit4.class)
3744
public class ManagedSchemaTransformProviderTest {
3845
@Rule public transient ExpectedException thrown = ExpectedException.none();
46+
private static final Schema EMPTY_SCHEMA = Schema.builder().build();
47+
private static final Row EMPTY_ROW = Row.nullRow(EMPTY_SCHEMA);
3948

4049
@Test
4150
public void testFailWhenNoConfigSpecified() {
@@ -49,6 +58,103 @@ public void testFailWhenNoConfigSpecified() {
4958
config.validate();
5059
}
5160

61+
@Test
62+
public void testFailWhenUnknownFieldsSpecified() {
63+
Map<String, Object> config =
64+
ImmutableMap.of(
65+
"extra_string",
66+
"str",
67+
"extra_integer",
68+
123,
69+
"toggle_uppercase",
70+
true,
71+
"unknown_field",
72+
"unknown");
73+
ManagedSchemaTransformProvider.ManagedConfig managedConfig =
74+
ManagedSchemaTransformProvider.ManagedConfig.builder()
75+
.setTransformIdentifier(TestSchemaTransformProvider.IDENTIFIER)
76+
.setConfig(YamlUtils.yamlStringFromMap(config))
77+
.build();
78+
79+
thrown.expect(IllegalArgumentException.class);
80+
thrown.expectMessage("Invalid config for transform");
81+
thrown.expectMessage(TestSchemaTransformProvider.IDENTIFIER);
82+
thrown.expectMessage("Contains unknown fields");
83+
thrown.expectMessage("unknown_field");
84+
Pipeline p = Pipeline.create();
85+
new ManagedSchemaTransformProvider(null)
86+
.from(managedConfig)
87+
.expand(
88+
PCollectionRowTuple.of(
89+
"input", p.apply(Create.of(EMPTY_ROW).withRowSchema(EMPTY_SCHEMA))));
90+
}
91+
92+
@Test
93+
public void testFailWhenMissingRequiredFields() {
94+
Map<String, Object> config = ImmutableMap.of("extra_string", "str", "toggle_uppercase", true);
95+
ManagedSchemaTransformProvider.ManagedConfig managedConfig =
96+
ManagedSchemaTransformProvider.ManagedConfig.builder()
97+
.setTransformIdentifier(TestSchemaTransformProvider.IDENTIFIER)
98+
.setConfig(YamlUtils.yamlStringFromMap(config))
99+
.build();
100+
101+
thrown.expect(IllegalArgumentException.class);
102+
thrown.expectMessage("Invalid config for transform");
103+
thrown.expectMessage(TestSchemaTransformProvider.IDENTIFIER);
104+
thrown.expectMessage("Missing required fields");
105+
thrown.expectMessage("extra_integer");
106+
Pipeline p = Pipeline.create();
107+
new ManagedSchemaTransformProvider(null)
108+
.from(managedConfig)
109+
.expand(
110+
PCollectionRowTuple.of(
111+
"input", p.apply(Create.of(EMPTY_ROW).withRowSchema(EMPTY_SCHEMA))));
112+
}
113+
114+
@Test
115+
public void testPassWhenMissingNullableFields() {
116+
Map<String, Object> config = ImmutableMap.of("extra_string", "str", "extra_integer", 123);
117+
ManagedSchemaTransformProvider.ManagedConfig managedConfig =
118+
ManagedSchemaTransformProvider.ManagedConfig.builder()
119+
.setTransformIdentifier(TestSchemaTransformProvider.IDENTIFIER)
120+
.setConfig(YamlUtils.yamlStringFromMap(config))
121+
.build();
122+
123+
Pipeline p = Pipeline.create();
124+
new ManagedSchemaTransformProvider(null)
125+
.from(managedConfig)
126+
.expand(
127+
PCollectionRowTuple.of(
128+
"input", p.apply(Create.of(EMPTY_ROW).withRowSchema(EMPTY_SCHEMA))));
129+
}
130+
131+
@Test
132+
public void testSkipConfigValidationWithUnknownFields() {
133+
Map<String, Object> config =
134+
ImmutableMap.of(
135+
"extra_string",
136+
"str",
137+
"extra_integer",
138+
123,
139+
"toggle_uppercase",
140+
true,
141+
"unknown_field",
142+
"unknown");
143+
ManagedSchemaTransformProvider.ManagedConfig managedConfig =
144+
ManagedSchemaTransformProvider.ManagedConfig.builder()
145+
.setTransformIdentifier(TestSchemaTransformProvider.IDENTIFIER)
146+
.setConfig(YamlUtils.yamlStringFromMap(config))
147+
.setSkipConfigValidation(true)
148+
.build();
149+
150+
Pipeline p = Pipeline.create();
151+
new ManagedSchemaTransformProvider(null)
152+
.from(managedConfig)
153+
.expand(
154+
PCollectionRowTuple.of(
155+
"input", p.apply(Create.of(EMPTY_ROW).withRowSchema(EMPTY_SCHEMA))));
156+
}
157+
52158
@Test
53159
public void testGetConfigRowFromYamlString() {
54160
String yamlString = "extra_string: abc\n" + "extra_integer: 123";
@@ -65,7 +171,8 @@ public void testGetConfigRowFromYamlString() {
65171
.build();
66172

67173
Row returnedRow =
68-
ManagedSchemaTransformProvider.getRowConfig(config, TestSchemaTransformProvider.SCHEMA);
174+
ManagedSchemaTransformProvider.getRowConfig(
175+
config, TestSchemaTransformProvider.SCHEMA, PipelineOptionsFactory.create());
69176

70177
assertEquals(expectedRow, returnedRow);
71178
}
@@ -88,7 +195,8 @@ public void testGetConfigRowFromYamlFile() throws URISyntaxException {
88195
.withFieldValue("extra_integer", 123)
89196
.build();
90197
Row configRow =
91-
ManagedSchemaTransformProvider.getRowConfig(config, TestSchemaTransformProvider.SCHEMA);
198+
ManagedSchemaTransformProvider.getRowConfig(
199+
config, TestSchemaTransformProvider.SCHEMA, PipelineOptionsFactory.create());
92200

93201
assertEquals(expectedRow, configRow);
94202
}

0 commit comments

Comments
 (0)