
Commit 5dbec33

Author: Bingqin Zhou

Remove includeKafkaKey and includeKafkaData.

1 parent 25ad35e

6 files changed: +58 additions, -71 deletions

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkConnector.java

Lines changed: 4 additions & 6 deletions
@@ -47,6 +47,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;

 /**
  * A {@link SinkConnector} used to delegate BigQuery data writes to
@@ -102,12 +103,9 @@ private SchemaManager getSchemaManager(BigQuery bigQuery) {
     }
     SchemaRetriever schemaRetriever = config.getSchemaRetriever();
     SchemaConverter<com.google.cloud.bigquery.Schema> schemaConverter = config.getSchemaConverter();
-    boolean includeKafkaKey = config.getBoolean(config.INCLUDE_KAFKA_KEY_CONFIG);
-    boolean includeKafkaData = config.getBoolean(config.INCLUDE_KAFKA_DATA_CONFIG);
-    String kafkaKeyFieldName = config.getString(config.KAFKA_KEY_FIELD_NAME_CONFIG);
-    String kafkaDataFieldName = config.getString(config.KAFKA_DATA_FIELD_NAME_CONFIG);
-    return new SchemaManager(schemaRetriever, schemaConverter, bigQuery,
-        includeKafkaKey, includeKafkaData, kafkaKeyFieldName, kafkaDataFieldName);
+    Optional<String> kafkaKeyFieldName = config.getKafkaKeyFieldName();
+    Optional<String> kafkaDataFieldName = config.getKafkaDataFieldName();
+    return new SchemaManager(schemaRetriever, schemaConverter, bigQuery, kafkaKeyFieldName, kafkaDataFieldName);
   }

   private void ensureExistingTables(
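The change at this call site is the heart of the commit: a boolean/String pair that could contradict itself (includeKafkaKey=false while a field name is set) collapses into a single Optional<String> whose presence is the opt-in. A minimal, self-contained sketch of that idea; class and method names here are illustrative, not from the connector:

import java.util.Optional;

// Illustrative sketch only: an Optional<String> field name carries both the
// "include it?" decision and the name itself, replacing the old boolean/String pair.
public class OptionalFieldNameDemo {
  static String describe(Optional<String> kafkaKeyFieldName) {
    return kafkaKeyFieldName
        .map(name -> "include Kafka key under field '" + name + "'")
        .orElse("omit Kafka key field");
  }

  public static void main(String[] args) {
    System.out.println(describe(Optional.of("kafkaKey"))); // include Kafka key under field 'kafkaKey'
    System.out.println(describe(Optional.empty()));        // omit Kafka key field
  }
}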

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java

Lines changed: 10 additions & 10 deletions
@@ -65,6 +65,7 @@
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.Optional;

 /**
  * A {@link SinkTask} used to translate Kafka Connect {@link SinkRecord SinkRecords} into BigQuery
@@ -153,11 +154,13 @@ private PartitionedTableId getRecordTable(SinkRecord record) {

   private RowToInsert getRecordRow(SinkRecord record) {
     Map<String, Object> convertedRecord = recordConverter.convertRecord(record, KafkaSchemaRecordType.VALUE);
-    if (config.getBoolean(config.INCLUDE_KAFKA_KEY_CONFIG)) {
-      convertedRecord.put(config.getString(config.KAFKA_KEY_FIELD_NAME_CONFIG), recordConverter.convertRecord(record, KafkaSchemaRecordType.KEY));
+    Optional<String> kafkaKeyFieldName = config.getKafkaKeyFieldName();
+    if (kafkaKeyFieldName.isPresent()) {
+      convertedRecord.put(kafkaKeyFieldName.get(), recordConverter.convertRecord(record, KafkaSchemaRecordType.KEY));
     }
-    if (config.getBoolean(config.INCLUDE_KAFKA_DATA_CONFIG)) {
-      convertedRecord.put(config.getString(config.KAFKA_DATA_FIELD_NAME_CONFIG), KafkaDataBuilder.buildKafkaDataRecord(record));
+    Optional<String> kafkaDataFieldName = config.getKafkaDataFieldName();
+    if (kafkaDataFieldName.isPresent()) {
+      convertedRecord.put(kafkaDataFieldName.get(), KafkaDataBuilder.buildKafkaDataRecord(record));
     }
     if (config.getBoolean(config.SANITIZE_FIELD_NAME_CONFIG)) {
       convertedRecord = FieldNameSanitizer.replaceInvalidKeys(convertedRecord);
@@ -248,12 +251,9 @@ private SchemaManager getSchemaManager(BigQuery bigQuery) {
     schemaRetriever = config.getSchemaRetriever();
     SchemaConverter<com.google.cloud.bigquery.Schema> schemaConverter =
         config.getSchemaConverter();
-    boolean includeKafkaKey = config.getBoolean(config.INCLUDE_KAFKA_KEY_CONFIG);
-    boolean includeKafkaData = config.getBoolean(config.INCLUDE_KAFKA_DATA_CONFIG);
-    String kafkaKeyFieldName = config.getString(config.KAFKA_KEY_FIELD_NAME_CONFIG);
-    String kafkaDataFieldName = config.getString(config.KAFKA_DATA_FIELD_NAME_CONFIG);
-    return new SchemaManager(schemaRetriever, schemaConverter, bigQuery,
-        includeKafkaKey, includeKafkaData, kafkaKeyFieldName, kafkaDataFieldName);
+    Optional<String> kafkaKeyFieldName = config.getKafkaKeyFieldName();
+    Optional<String> kafkaDataFieldName = config.getKafkaDataFieldName();
+    return new SchemaManager(schemaRetriever, schemaConverter, bigQuery, kafkaKeyFieldName, kafkaDataFieldName);
   }

   private BigQueryWriter getBigQueryWriter() {
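On the row path, the same presence check gates what gets written into each BigQuery row. A simplified, runnable sketch of the new getRecordRow flow, with a plain Map and string literals standing in for the real converters (recordConverter.convertRecord, KafkaDataBuilder.buildKafkaDataRecord); it uses ifPresent where the commit uses isPresent()/get(), which is equivalent here:

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Simplified stand-in for the new getRecordRow logic; not connector code.
public class RecordRowSketch {
  static Map<String, Object> buildRow(Optional<String> keyField, Optional<String> dataField) {
    Map<String, Object> row = new LinkedHashMap<>();
    row.put("some_value_field", "converted value");
    // A configured name is the only switch: no boolean flag to keep in sync.
    keyField.ifPresent(name -> row.put(name, "converted key"));
    dataField.ifPresent(name -> row.put(name, "topic/partition/offset block"));
    return row;
  }

  public static void main(String[] args) {
    System.out.println(buildRow(Optional.of("kafkaKey"), Optional.empty()));
    // {some_value_field=converted value, kafkaKey=converted key}
  }
}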

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/SchemaManager.java

Lines changed: 11 additions & 16 deletions
@@ -13,6 +13,7 @@

 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;

 /**
  * Class for managing Schemas of BigQuery tables (creating and updating).
@@ -23,10 +24,8 @@ public class SchemaManager {
   private final SchemaRetriever schemaRetriever;
   private final SchemaConverter<com.google.cloud.bigquery.Schema> schemaConverter;
   private final BigQuery bigQuery;
-  private final boolean includeKafkaKey;
-  private final boolean includeKafkaData;
-  private final String kafkaKeyFieldName;
-  private final String kafkaDataFieldName;
+  private final Optional<String> kafkaKeyFieldName;
+  private final Optional<String> kafkaDataFieldName;

   /**
    * @param schemaRetriever Used to determine the Kafka Connect Schema that should be used for a
@@ -38,15 +37,11 @@ public SchemaManager(
       SchemaRetriever schemaRetriever,
       SchemaConverter<com.google.cloud.bigquery.Schema> schemaConverter,
       BigQuery bigQuery,
-      boolean includeKafkaKey,
-      boolean includeKafkaData,
-      String kafkaKeyFieldName,
-      String kafkaDataFieldName) {
+      Optional<String> kafkaKeyFieldName,
+      Optional<String> kafkaDataFieldName) {
     this.schemaRetriever = schemaRetriever;
     this.schemaConverter = schemaConverter;
     this.bigQuery = bigQuery;
-    this.includeKafkaKey = includeKafkaKey;
-    this.includeKafkaData = includeKafkaData;
     this.kafkaKeyFieldName = kafkaKeyFieldName;
     this.kafkaDataFieldName = kafkaDataFieldName;
   }
@@ -58,7 +53,7 @@ public SchemaManager(
    */
   public void createTable(TableId table, String topic) {
     Schema kafkaValueSchema = schemaRetriever.retrieveSchema(table, topic, KafkaSchemaRecordType.VALUE.toString());
-    Schema kafkaKeySchema = includeKafkaKey ? schemaRetriever.retrieveSchema(table, topic, KafkaSchemaRecordType.KEY.toString()) : null;
+    Schema kafkaKeySchema = kafkaKeyFieldName.isPresent() ? schemaRetriever.retrieveSchema(table, topic, KafkaSchemaRecordType.KEY.toString()) : null;
     bigQuery.create(constructTableInfo(table, kafkaKeySchema, kafkaValueSchema));
   }

@@ -69,7 +64,7 @@ public void createTable(TableId table, String topic) {
    */
   public void updateSchema(TableId table, String topic) {
     Schema kafkaValueSchema = schemaRetriever.retrieveSchema(table, topic, KafkaSchemaRecordType.VALUE.toString());
-    Schema kafkaKeySchema = includeKafkaKey ? schemaRetriever.retrieveSchema(table, topic, KafkaSchemaRecordType.KEY.toString()) : null;
+    Schema kafkaKeySchema = kafkaKeyFieldName.isPresent() ? schemaRetriever.retrieveSchema(table, topic, KafkaSchemaRecordType.KEY.toString()) : null;
     TableInfo tableInfo = constructTableInfo(table, kafkaKeySchema, kafkaValueSchema);
     logger.info("Attempting to update table `{}` with schema {}",
         table, tableInfo.getDefinition().getSchema());
@@ -95,14 +90,14 @@ private com.google.cloud.bigquery.Schema getBigQuerySchema(Schema kafkaKeySchema
     List<Field> allFields = new ArrayList<> ();
     com.google.cloud.bigquery.Schema valueSchema = schemaConverter.convertSchema(kafkaValueSchema);
     allFields.addAll(valueSchema.getFields());
-    if (includeKafkaKey) {
+    if (kafkaKeyFieldName.isPresent()) {
       com.google.cloud.bigquery.Schema keySchema = schemaConverter.convertSchema(kafkaKeySchema);
-      Field kafkaKeyField = Field.newBuilder(kafkaKeyFieldName, LegacySQLTypeName.RECORD, keySchema.getFields())
+      Field kafkaKeyField = Field.newBuilder(kafkaKeyFieldName.get(), LegacySQLTypeName.RECORD, keySchema.getFields())
           .setMode(Field.Mode.NULLABLE).build();
       allFields.add(kafkaKeyField);
     }
-    if (includeKafkaData) {
-      Field kafkaDataField = KafkaDataBuilder.buildKafkaDataField(kafkaDataFieldName);
+    if (kafkaDataFieldName.isPresent()) {
+      Field kafkaDataField = KafkaDataBuilder.buildKafkaDataField(kafkaDataFieldName.get());
       allFields.add(kafkaDataField);
     }
     return com.google.cloud.bigquery.Schema.of(allFields);
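Note that createTable and updateSchema still convert the Optional back to a nullable Schema via a ternary on isPresent(). A small sketch of an alternative that stays in Optional form end to end; the method below is a hypothetical stand-in for schemaRetriever.retrieveSchema(table, topic, KafkaSchemaRecordType.KEY.toString()), not what the commit does:

import java.util.Optional;

// Hypothetical alternative: Optional.map() invokes the retriever only when a
// key field name is configured, avoiding the nullable Schema the ternary produces.
public class OptionalKeySchemaSketch {
  static Optional<String> retrieveKeySchema(Optional<String> kafkaKeyFieldName) {
    return kafkaKeyFieldName.map(name -> "key schema for field '" + name + "'");
  }

  public static void main(String[] args) {
    System.out.println(retrieveKeySchema(Optional.of("kafkaKey"))); // Optional[key schema for field 'kafkaKey']
    System.out.println(retrieveKeySchema(Optional.empty()));        // Optional.empty
  }
}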

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkConfig.java

Lines changed: 24 additions & 33 deletions
@@ -44,6 +44,7 @@
 import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import java.util.Optional;

 /**
  * Base class for connector and task configs; contains properties shared between the two of them.
@@ -166,34 +167,19 @@ public class BigQuerySinkConfig extends AbstractConfig {
       + "front of field name. Note: field a.b and a_b will have same value after sanitizing, "
       + "and might cause key duplication error.";

-  public static final String INCLUDE_KAFKA_DATA_CONFIG = "includeKafkaData";
-  public static final ConfigDef.Type INCLUDE_KAFKA_DATA_TYPE = ConfigDef.Type.BOOLEAN;
-  public static final Boolean INCLUDE_KAFKA_DATA_DEFAULT = false;
-  public static final ConfigDef.Importance INCLUDE_KAFKA_DATA_IMPORTANCE =
-      ConfigDef.Importance.LOW;
-  public static final String INCLUDE_KAFKA_DATA_DOC =
-      "Whether to include an extra block containing the Kafka source topic, offset, "
-      + "and partition information in the resulting BigQuery rows.";
-
-  public static final String INCLUDE_KAFKA_KEY_CONFIG = "includeKafkaKey";
-  public static final ConfigDef.Type INCLUDE_KAFKA_KEY_TYPE = ConfigDef.Type.BOOLEAN;
-  public static final Boolean INCLUDE_KAFKA_KEY_DEFAULT = false;
-  public static final ConfigDef.Importance INCLUDE_KAFKA_KEY_IMPORTANCE =
-      ConfigDef.Importance.LOW;
-  public static final String INCLUDE_KAFKA_KEY_DOC =
-      "Whether to include an extra block containing fields in Kafka key.";
-
   public static final String KAFKA_KEY_FIELD_NAME_CONFIG = "kafkaKeyFieldName";
   private static final ConfigDef.Type KAFKA_KEY_FIELD_NAME_TYPE = ConfigDef.Type.STRING;
-  public static final String KAFKA_KEY_FIELD_NAME_DEFAULT = "kafkaKey";
+  public static final String KAFKA_KEY_FIELD_NAME_DEFAULT = null;
   private static final ConfigDef.Importance KAFKA_KEY_FIELD_NAME_IMPORTANCE = ConfigDef.Importance.LOW;
-  private static final String KAFKA_KEY_FIELD_NAME_DOC = "The name of the field of Kafka key.";
+  private static final String KAFKA_KEY_FIELD_NAME_DOC = "The name of the field of Kafka key. " +
+      "Default to be null, which means Kafka Key Field will not be included.";

   public static final String KAFKA_DATA_FIELD_NAME_CONFIG = "kafkaDataFieldName";
   private static final ConfigDef.Type KAFKA_DATA_FIELD_NAME_TYPE = ConfigDef.Type.STRING;
-  public static final String KAFKA_DATA_FIELD_NAME_DEFAULT = "kafkaData";
+  public static final String KAFKA_DATA_FIELD_NAME_DEFAULT = null;
   private static final ConfigDef.Importance KAFKA_DATA_FIELD_NAME_IMPORTANCE = ConfigDef.Importance.LOW;
-  private static final String KAFKA_DATA_FIELD_NAME_DOC = "The name of the field of Kafka Data.";
+  private static final String KAFKA_DATA_FIELD_NAME_DOC = "The name of the field of Kafka Data. " +
+      "Default to be null, which means Kafka Data Field will not be included. ";

   public static final String AVRO_DATA_CACHE_SIZE_CONFIG = "avroDataCacheSize";
   private static final ConfigDef.Type AVRO_DATA_CACHE_SIZE_TYPE = ConfigDef.Type.INT;
@@ -307,18 +293,6 @@ public class BigQuerySinkConfig extends AbstractConfig {
           SANITIZE_FIELD_NAME_DEFAULT,
           SANITIZE_FIELD_NAME_IMPORTANCE,
           SANITIZE_FIELD_NAME_DOC
-      ).define(
-          INCLUDE_KAFKA_DATA_CONFIG,
-          INCLUDE_KAFKA_DATA_TYPE,
-          INCLUDE_KAFKA_DATA_DEFAULT,
-          INCLUDE_KAFKA_DATA_IMPORTANCE,
-          INCLUDE_KAFKA_DATA_DOC
-      ).define(
-          INCLUDE_KAFKA_KEY_CONFIG,
-          INCLUDE_KAFKA_KEY_TYPE,
-          INCLUDE_KAFKA_KEY_DEFAULT,
-          INCLUDE_KAFKA_KEY_IMPORTANCE,
-          INCLUDE_KAFKA_KEY_DOC
       ).define(
           KAFKA_KEY_FIELD_NAME_CONFIG,
           KAFKA_KEY_FIELD_NAME_TYPE,
@@ -598,6 +572,22 @@ public SchemaRetriever getSchemaRetriever() {
     return schemaRetriever;
   }

+  /**
+   *
+   * @return An optional class of KafkaKeyFieldName.
+   */
+  public Optional<String> getKafkaKeyFieldName() {
+    return Optional.ofNullable(getString(KAFKA_KEY_FIELD_NAME_CONFIG));
+  }
+
+  /**
+   *
+   * @return An optional class of KafkaDataFieldName.
+   */
+  public Optional<String> getKafkaDataFieldName() {
+    return Optional.ofNullable(getString(KAFKA_DATA_FIELD_NAME_CONFIG));
+  }
+
   /**
    * Verifies that a bucket is specified if GCS batch loading is enabled.
    * @throws ConfigException Exception thrown if no bucket is specified and batch loading is on.
@@ -628,4 +618,5 @@ public BigQuerySinkConfig(Map<String, String> properties) {
     super(config, properties);
     verifyBucketSpecified();
   }
+
 }
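The new getters pair Optional.ofNullable with the null defaults introduced above, so an unset property surfaces as Optional.empty() and every call site shown earlier skips its block. A self-contained sketch of that behavior, with a plain Map standing in for Kafka Connect's AbstractConfig.getString(...), which returns null for an unset STRING config whose default is null:

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Behavioral sketch of the new getters; the Map stands in for AbstractConfig.
public class ConfigGetterSketch {
  static Optional<String> getKafkaKeyFieldName(Map<String, String> props) {
    return Optional.ofNullable(props.get("kafkaKeyFieldName"));
  }

  public static void main(String[] args) {
    Map<String, String> unset = new HashMap<>();
    Map<String, String> set = new HashMap<>();
    set.put("kafkaKeyFieldName", "kafkaKey");

    System.out.println(getKafkaKeyFieldName(unset)); // Optional.empty
    System.out.println(getKafkaKeyFieldName(set));   // Optional[kafkaKey]
  }
}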

kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/SchemaManagerTest.java

Lines changed: 7 additions & 4 deletions
@@ -35,6 +35,8 @@
 import org.junit.Assert;
 import org.junit.Test;

+import java.util.Optional;
+
 public class SchemaManagerTest {

   @Test
@@ -50,13 +52,14 @@ public void testBQTableDescription() {
         (SchemaConverter<com.google.cloud.bigquery.Schema>) mock(SchemaConverter.class);
     BigQuery mockBigQuery = mock(BigQuery.class);

+    Optional<String> kafkaKeyFieldName = Optional.of("kafkaKey");
+    Optional<String> kafkaDataFieldName = Optional.of("kafkaData");
+
     SchemaManager schemaManager = new SchemaManager(mockSchemaRetriever,
                                                     mockSchemaConverter,
                                                     mockBigQuery,
-                                                    true,
-                                                    true,
-                                                    "kafkaKey",
-                                                    "kafkaData");
+                                                    kafkaKeyFieldName,
+                                                    kafkaDataFieldName);

     Schema mockKafkaSchema = mock(Schema.class);
     // we would prefer to mock this class, but it is final.

kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkConfigTest.java

Lines changed: 2 additions & 2 deletions
@@ -78,7 +78,7 @@ public void testTopicsToDatasets() {
   @Test
   public void testGetSchemaConverter() {
     Map<String, String> configProperties = propertiesFactory.getProperties();
-    configProperties.put(BigQuerySinkConfig.INCLUDE_KAFKA_DATA_CONFIG, "true");
+    configProperties.put(BigQuerySinkConfig.KAFKA_DATA_FIELD_NAME_CONFIG, "kafkaData");

     BigQuerySinkConfig testConfig = new BigQuerySinkConfig(configProperties);

@@ -88,7 +88,7 @@ public void testGetSchemaConverter() {
   @Test
   public void testGetRecordConverter() {
     Map<String, String> configProperties = propertiesFactory.getProperties();
-    configProperties.put(BigQuerySinkConfig.INCLUDE_KAFKA_DATA_CONFIG, "true");
+    configProperties.put(BigQuerySinkConfig.KAFKA_DATA_FIELD_NAME_CONFIG, "kafkaData");

     BigQuerySinkConfig testConfig = new BigQuerySinkConfig(configProperties);

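These test updates double as a migration guide: a config that previously opted in with the removed boolean keys (relying on the old field-name defaults) now opts in by naming the field. A sketch of the equivalent before/after properties, using a plain Map for illustration:

import java.util.HashMap;
import java.util.Map;

// Migration sketch: the commented-out lines show the pre-commit opt-in,
// the active lines the post-commit equivalent. Illustrative only.
public class MigrationSketch {
  public static void main(String[] args) {
    Map<String, String> props = new HashMap<>();
    // Before: props.put("includeKafkaKey", "true");   // name defaulted to "kafkaKey"
    //         props.put("includeKafkaData", "true");  // name defaulted to "kafkaData"
    props.put("kafkaKeyFieldName", "kafkaKey");
    props.put("kafkaDataFieldName", "kafkaData");
    System.out.println(props);
  }
}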