Skip to content

Commit 535d2cf

Browse files
authored
Fail with appropriate message instead of aborting with NPE when field specified in schema.keyfield doesn't exist in the schema (#83)
1 parent e195e3c commit 535d2cf

File tree

3 files changed

+19
-1
lines changed

3 files changed

+19
-1
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ For an example of using the the Protobuf converter with kafka-connect-datagen, s
141141
## Use a bundled schema specification
142142

143143
There are a few quickstart schema specifications bundled with `kafka-connect-datagen`, and they are listed in this [directory](https://github.com/confluentinc/kafka-connect-datagen/tree/master/src/main/resources).
144-
To use one of these bundled schema, refer to [this mapping](https://github.com/confluentinc/kafka-connect-datagen/blob/master/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java#L66-L73) and in the configuration file, set the parameter `quickstart` to the associated name.
144+
To use one of these bundled schema, refer to [this mapping](https://github.com/confluentinc/kafka-connect-datagen/blob/master/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java#L75-L86) and in the configuration file, set the parameter `quickstart` to the associated name.
145145
For example:
146146

147147
```bash

src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import io.confluent.connect.avro.AvroData;
3232
import org.apache.avro.generic.GenericData.Record;
3333
import org.apache.avro.generic.GenericRecord;
34+
import org.apache.kafka.common.config.ConfigException;
3435
import org.apache.kafka.connect.data.SchemaAndValue;
3536
import org.apache.kafka.connect.errors.ConnectException;
3637
import org.apache.kafka.connect.header.ConnectHeaders;
@@ -182,6 +183,12 @@ public void start(Map<String, String> props) {
182183
}
183184

184185
avroSchema = generator.schema();
186+
187+
if (!schemaKeyField.isEmpty() && avroSchema.getField(schemaKeyField) == null) {
188+
throw new ConfigException(DatagenConnectorConfig.SCHEMA_KEYFIELD_CONF, schemaKeyField,
189+
"Schema does not contain the specified field");
190+
}
191+
185192
avroData = new AvroData(1);
186193
ksqlSchema = avroData.toConnectSchema(avroSchema);
187194
}

src/test/java/io/confluent/kafka/connect/datagen/DatagenTaskTest.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535

3636
import java.util.function.Function;
3737
import java.util.stream.Collectors;
38+
import org.apache.kafka.common.config.ConfigException;
3839
import org.apache.kafka.connect.data.Field;
3940
import org.apache.kafka.connect.data.Schema;
4041
import org.apache.kafka.connect.data.Struct;
@@ -125,6 +126,8 @@ public void shouldGenerateFilesForPageviewsQuickstart() throws Exception {
125126
public void shouldGenerateFilesForStockTradesQuickstart() throws Exception {
126127
generateAndValidateRecordsFor(DatagenTask.Quickstart.STOCK_TRADES);
127128
}
129+
130+
@Test
128131
public void shouldGenerateFilesForProductQuickstart() throws Exception {
129132
generateAndValidateRecordsFor(Quickstart.PRODUCT);
130133
}
@@ -201,6 +204,14 @@ public void shouldFailToGenerateMoreRecordsThanSpecified() throws Exception {
201204
}
202205
}
203206

207+
@Test(expected = ConfigException.class)
208+
public void shouldFailIfSchemaKeyFieldNotPresentInSchema() throws Exception {
209+
config.put(DatagenConnectorConfig.QUICKSTART_CONF, DatagenTask.Quickstart.USERS.name());
210+
config.put(DatagenConnectorConfig.SCHEMA_KEYFIELD_CONF, "key_does_not_exist");
211+
createTask();
212+
generateRecords();
213+
}
214+
204215
private void generateAndValidateRecordsFor(DatagenTask.Quickstart quickstart) throws Exception {
205216
createTaskWith(quickstart);
206217
generateRecords();

0 commit comments

Comments
 (0)