Skip to content

Commit daea24a

Browse files
authored
Add validations for quickstart, schema.string, schema.filename (#84)
1 parent 0116fab commit daea24a

File tree

8 files changed

+369
-111
lines changed

8 files changed

+369
-111
lines changed

pom.xml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,9 @@
3636
<guava.version>29.0-jre</guava.version>
3737
<avro.version>1.8.1</avro.version>
3838
<maven.release.plugin.version>2.5.3</maven.release.plugin.version>
39+
<!-- temporary fix by pinning the version until we upgrade to a version of common that contains this or newer version.
40+
See https://github.com/confluentinc/common/pull/332 for details -->
41+
<dependency.check.version>6.1.6</dependency.check.version>
3942
</properties>
4043

4144
<name>kafka-connect-datagen</name>
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/**
2+
* Copyright 2018 Confluent Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
**/
16+
17+
package io.confluent.kafka.connect.datagen;
18+
19+
import java.io.FileInputStream;
20+
import java.io.FileNotFoundException;
21+
import java.io.IOException;
22+
import java.io.InputStream;
23+
import org.apache.avro.Schema;
24+
import org.apache.avro.Schema.Parser;
25+
import org.apache.avro.SchemaParseException;
26+
import org.apache.kafka.common.config.ConfigException;
27+
import org.slf4j.Logger;
28+
import org.slf4j.LoggerFactory;
29+
30+
public class ConfigUtils {
31+
private static final Logger log = LoggerFactory.getLogger(ConfigUtils.class);
32+
33+
public static Schema getSchemaFromSchemaString(String schemaString) {
34+
Schema.Parser schemaParser = new Parser();
35+
Schema schema;
36+
try {
37+
schema = schemaParser.parse(schemaString);
38+
} catch (SchemaParseException e) {
39+
log.error("Unable to parse the provided schema", e);
40+
throw new ConfigException("Unable to parse the provided schema");
41+
}
42+
return schema;
43+
}
44+
45+
public static Schema getSchemaFromSchemaFileName(String schemaFileName) {
46+
Schema.Parser schemaParser = new Parser();
47+
Schema schema;
48+
try (InputStream stream = new FileInputStream(schemaFileName)) {
49+
schema = schemaParser.parse(stream);
50+
} catch (FileNotFoundException e) {
51+
try {
52+
if (DatagenTask.class.getClassLoader()
53+
.getResource(schemaFileName) == null) {
54+
throw new ConfigException("Unable to find the schema file");
55+
}
56+
schema = schemaParser.parse(
57+
DatagenTask.class.getClassLoader().getResourceAsStream(schemaFileName)
58+
);
59+
} catch (SchemaParseException | IOException i) {
60+
log.error("Unable to parse the provided schema", i);
61+
throw new ConfigException("Unable to parse the provided schema");
62+
}
63+
} catch (SchemaParseException | IOException e) {
64+
log.error("Unable to parse the provided schema", e);
65+
throw new ConfigException("Unable to parse the provided schema");
66+
}
67+
return schema;
68+
}
69+
}

src/main/java/io/confluent/kafka/connect/datagen/DatagenConnector.java

Lines changed: 43 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,12 @@
1616

1717
package io.confluent.kafka.connect.datagen;
1818

19-
import com.google.common.annotations.VisibleForTesting;
2019
import java.util.ArrayList;
2120
import java.util.HashMap;
2221
import java.util.List;
2322
import java.util.Map;
24-
2523
import java.util.stream.Collectors;
24+
import org.apache.avro.Schema;
2625
import org.apache.kafka.common.config.Config;
2726
import org.apache.kafka.common.config.ConfigDef;
2827
import org.apache.kafka.common.config.ConfigException;
@@ -38,10 +37,6 @@ public class DatagenConnector extends SourceConnector {
3837
private DatagenConnectorConfig config;
3938
private Map<String, String> props;
4039

41-
@VisibleForTesting
42-
static final String SCHEMA_SOURCE_ERR =
43-
"Must set exactly one of " + String.join(", ", DatagenConnectorConfig.schemaSourceKeys());
44-
4540
@Override
4641
public String version() {
4742
return VersionUtil.getVersion();
@@ -88,23 +83,61 @@ public ConfigDef config() {
8883
@Override
8984
public Config validate(Map<String, String> connectorConfigs) {
9085
Config config = super.validate(connectorConfigs);
91-
validateSchemaSource(config);
86+
boolean isSingleSchemaSource = validateSchemaSource(config);
87+
88+
// skip further validations if any single config validations have failed
89+
try {
90+
this.config = new DatagenConnectorConfig(connectorConfigs);
91+
} catch (ConfigException e) {
92+
return config;
93+
}
94+
95+
if (isSingleSchemaSource) {
96+
validateSchemaKeyField(config, this.config.getSchema());
97+
}
9298
return config;
9399
}
94100

95-
private void validateSchemaSource(Config config) {
101+
private boolean validateSchemaSource(Config config) {
96102
List<ConfigValue> schemaSources = config.configValues().stream()
97103
.filter(v -> DatagenConnectorConfig.isExplicitlySetSchemaSource(v.name(), v.value()))
98104
.collect(Collectors.toList());
105+
String schemaSourceError = "Must set exactly one of "
106+
+ String.join(", ", DatagenConnectorConfig.schemaSourceKeys());
99107
if (schemaSources.size() > 1) {
100108
for (ConfigValue v : schemaSources) {
101-
v.addErrorMessage(SCHEMA_SOURCE_ERR);
109+
v.addErrorMessage(schemaSourceError);
102110
}
111+
return false;
103112
}
104113
if (schemaSources.size() == 0) {
105114
config.configValues().stream()
106115
.filter(v -> DatagenConnectorConfig.schemaSourceKeys().contains(v.name()))
107-
.forEach(v -> v.addErrorMessage(SCHEMA_SOURCE_ERR));
116+
.forEach(v -> v.addErrorMessage(schemaSourceError));
117+
return false;
108118
}
119+
return true;
120+
}
121+
122+
private void validateSchemaKeyField(Config config, Schema schema) {
123+
ConfigValue schemaKeyField = getConfigValue(
124+
config,
125+
DatagenConnectorConfig.SCHEMA_KEYFIELD_CONF
126+
);
127+
128+
if (schemaKeyField != null && !schemaKeyField.value().equals("")) {
129+
if (schema.getField((String) schemaKeyField.value()) == null) {
130+
schemaKeyField.addErrorMessage(
131+
"The schema does not contain the field provided in '"
132+
+ DatagenConnectorConfig.SCHEMA_KEYFIELD_CONF + "'"
133+
);
134+
}
135+
}
136+
}
137+
138+
private ConfigValue getConfigValue(Config config, String configName) {
139+
return config.configValues().stream()
140+
.filter(value -> value.name().equals(configName))
141+
.findFirst().orElse(null);
109142
}
110143
}

src/main/java/io/confluent/kafka/connect/datagen/DatagenConnectorConfig.java

Lines changed: 94 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,16 @@
1717
package io.confluent.kafka.connect.datagen;
1818

1919
import com.google.common.collect.ImmutableList;
20+
import io.confluent.kafka.connect.datagen.DatagenTask.Quickstart;
2021
import java.util.List;
2122
import java.util.Map;
22-
23+
import org.apache.avro.Schema;
2324
import org.apache.kafka.common.config.AbstractConfig;
2425
import org.apache.kafka.common.config.ConfigDef;
25-
import org.apache.kafka.common.config.ConfigDef.Type;
2626
import org.apache.kafka.common.config.ConfigDef.Importance;
27+
import org.apache.kafka.common.config.ConfigDef.Type;
28+
import org.apache.kafka.common.config.ConfigDef.Validator;
29+
import org.apache.kafka.common.config.ConfigException;
2730

2831
public class DatagenConnectorConfig extends AbstractConfig {
2932

@@ -60,10 +63,33 @@ public static ConfigDef conf() {
6063
.define(KAFKA_TOPIC_CONF, Type.STRING, Importance.HIGH, KAFKA_TOPIC_DOC)
6164
.define(MAXINTERVAL_CONF, Type.LONG, 500L, Importance.HIGH, MAXINTERVAL_DOC)
6265
.define(ITERATIONS_CONF, Type.INT, -1, Importance.HIGH, ITERATIONS_DOC)
63-
.define(SCHEMA_STRING_CONF, Type.STRING, "", Importance.HIGH, SCHEMA_STRING_DOC)
64-
.define(SCHEMA_FILENAME_CONF, Type.STRING, "", Importance.HIGH, SCHEMA_FILENAME_DOC)
65-
.define(SCHEMA_KEYFIELD_CONF, Type.STRING, "", Importance.HIGH, SCHEMA_KEYFIELD_DOC)
66-
.define(QUICKSTART_CONF, Type.STRING, "", Importance.HIGH, QUICKSTART_DOC)
66+
.define(SCHEMA_STRING_CONF,
67+
Type.STRING,
68+
"",
69+
new SchemaStringValidator(),
70+
Importance.HIGH,
71+
SCHEMA_STRING_DOC
72+
)
73+
.define(SCHEMA_FILENAME_CONF,
74+
Type.STRING,
75+
"",
76+
new SchemaFileValidator(),
77+
Importance.HIGH,
78+
SCHEMA_FILENAME_DOC
79+
)
80+
.define(SCHEMA_KEYFIELD_CONF,
81+
Type.STRING,
82+
"",
83+
Importance.HIGH,
84+
SCHEMA_KEYFIELD_DOC
85+
)
86+
.define(QUICKSTART_CONF,
87+
Type.STRING,
88+
"",
89+
new QuickstartValidator(),
90+
Importance.HIGH,
91+
QUICKSTART_DOC
92+
)
6793
.define(RANDOM_SEED_CONF, Type.LONG, null, Importance.LOW, RANDOM_SEED_DOC);
6894
}
6995

@@ -84,6 +110,12 @@ public String getSchemaFilename() {
84110
}
85111

86112
public String getSchemaKeyfield() {
113+
if (this.getString(SCHEMA_KEYFIELD_CONF).isEmpty()) {
114+
String quickstart = this.getString(QUICKSTART_CONF);
115+
if (!quickstart.isEmpty()) {
116+
return Quickstart.valueOf(quickstart.toUpperCase()).getSchemaKeyField();
117+
}
118+
}
87119
return this.getString(SCHEMA_KEYFIELD_CONF);
88120
}
89121

@@ -99,12 +131,68 @@ public String getSchemaString() {
99131
return this.getString(SCHEMA_STRING_CONF);
100132
}
101133

134+
public Schema getSchema() {
135+
String quickstart = getQuickstart();
136+
if (quickstart != null && !quickstart.isEmpty()) {
137+
String schemaFilename = Quickstart.valueOf(quickstart.toUpperCase()).getSchemaFilename();
138+
return ConfigUtils.getSchemaFromSchemaFileName(schemaFilename);
139+
}
140+
String schemaString = getSchemaString();
141+
if (schemaString != null && !schemaString.isEmpty()) {
142+
return ConfigUtils.getSchemaFromSchemaString(schemaString);
143+
}
144+
String schemaFileName = getSchemaFilename();
145+
if (schemaFileName != null && !schemaFileName.isEmpty()) {
146+
return ConfigUtils.getSchemaFromSchemaFileName(schemaFileName);
147+
}
148+
return null;
149+
}
150+
102151
public static List<String> schemaSourceKeys() {
103152
return ImmutableList.of(SCHEMA_STRING_CONF, SCHEMA_FILENAME_CONF, QUICKSTART_CONF);
104153
}
105154

106155
public static boolean isExplicitlySetSchemaSource(String key, Object value) {
107156
return schemaSourceKeys().contains(key) && !("".equals(value));
108157
}
158+
159+
private static class QuickstartValidator implements Validator {
160+
161+
@Override
162+
public void ensureValid(String name, Object value) {
163+
if (((String) value).isEmpty()) {
164+
return;
165+
}
166+
if (!Quickstart.configValues.contains(((String) value).toLowerCase())) {
167+
throw new ConfigException(String.format(
168+
"%s must be one out of %s",
169+
name,
170+
String.join(",", DatagenTask.Quickstart.configValues)
171+
));
172+
}
173+
}
174+
}
175+
176+
private static class SchemaStringValidator implements Validator {
177+
178+
@Override
179+
public void ensureValid(String name, Object value) {
180+
if (((String) value).isEmpty()) {
181+
return;
182+
}
183+
ConfigUtils.getSchemaFromSchemaString((String) value);
184+
}
185+
}
186+
187+
private static class SchemaFileValidator implements Validator {
188+
189+
@Override
190+
public void ensureValid(String name, Object value) {
191+
if (((String) value).isEmpty()) {
192+
return;
193+
}
194+
ConfigUtils.getSchemaFromSchemaFileName((String) value);
195+
}
196+
}
109197
}
110198

0 commit comments

Comments
 (0)