
Commit d5b2389

Bingqin Zhou committed:
Make Kafka data field and key name configurable.

1 parent: 969e3a5

File tree: 7 files changed (+55, -20 lines)
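In short: the BigQuery field names for the optional Kafka key and Kafka data blocks, previously hard-coded as "kafkaKey" and "kafkaData", are now read from two new settings. A minimal sketch of how a user might override them, assuming the connector's other required properties are supplied elsewhere; the override values are illustrative:

import java.util.HashMap;
import java.util.Map;

public class FieldNameOverrideSketch {
  public static void main(String[] args) {
    // The two new keys added in this commit, with hypothetical overrides;
    // left unset, they default to "kafkaKey" and "kafkaData".
    Map<String, String> props = new HashMap<>();
    props.put("kafkaKeyFieldName", "recordKey");
    props.put("kafkaDataFieldName", "recordMetadata");
    props.forEach((k, v) -> System.out.println(k + "=" + v));
  }
}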

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkConnector.java

Lines changed: 4 additions & 1 deletion

@@ -104,7 +104,10 @@ private SchemaManager getSchemaManager(BigQuery bigQuery) {
     SchemaConverter<com.google.cloud.bigquery.Schema> schemaConverter = config.getSchemaConverter();
     boolean includeKafkaKey = config.getBoolean(config.INCLUDE_KAFKA_KEY_CONFIG);
     boolean includeKafkaData = config.getBoolean(config.INCLUDE_KAFKA_DATA_CONFIG);
-    return new SchemaManager(schemaRetriever, schemaConverter, bigQuery, includeKafkaKey, includeKafkaData);
+    String kafkaKeyFieldName = config.getString(config.KAFKA_KEY_FIELD_NAME_CONFIG);
+    String kafkaDataFieldName = config.getString(config.KAFKA_DATA_FIELD_NAME_CONFIG);
+    return new SchemaManager(schemaRetriever, schemaConverter, bigQuery,
+        includeKafkaKey, includeKafkaData, kafkaKeyFieldName, kafkaDataFieldName);
   }

   private void ensureExistingTables(

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java

Lines changed: 7 additions & 4 deletions

@@ -28,7 +28,7 @@
 import com.wepay.kafka.connect.bigquery.api.SchemaRetriever;
 import com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig;
 import com.wepay.kafka.connect.bigquery.config.BigQuerySinkTaskConfig;
-import com.wepay.kafka.connect.bigquery.convert.KafkaDataConverter;
+import com.wepay.kafka.connect.bigquery.convert.KafkaDataBuilder;
 import com.wepay.kafka.connect.bigquery.convert.RecordConverter;
 import com.wepay.kafka.connect.bigquery.convert.SchemaConverter;
 import com.wepay.kafka.connect.bigquery.exception.SinkConfigConnectException;

@@ -153,10 +153,10 @@ private PartitionedTableId getRecordTable(SinkRecord record) {
   private RowToInsert getRecordRow(SinkRecord record) {
     Map<String, Object> convertedRecord = recordConverter.convertRecord(record, false);
     if (config.getBoolean(config.INCLUDE_KAFKA_KEY_CONFIG)) {
-      convertedRecord.put(SchemaManager.KAFKA_KEY_FIELD_NAME, recordConverter.convertRecord(record, true));
+      convertedRecord.put(config.getString(config.KAFKA_KEY_FIELD_NAME_CONFIG), recordConverter.convertRecord(record, true));
     }
     if (config.getBoolean(config.INCLUDE_KAFKA_DATA_CONFIG)) {
-      convertedRecord.put(KafkaDataConverter.KAFKA_DATA_FIELD_NAME, KafkaDataConverter.getKafkaDataRecord(record));
+      convertedRecord.put(config.getString(config.KAFKA_DATA_FIELD_NAME_CONFIG), KafkaDataBuilder.getKafkaDataRecord(record));
     }
     if (config.getBoolean(config.SANITIZE_FIELD_NAME_CONFIG)) {
       convertedRecord = FieldNameSanitizer.replaceInvalidKeys(convertedRecord);

@@ -249,7 +249,10 @@ private SchemaManager getSchemaManager(BigQuery bigQuery) {
         config.getSchemaConverter();
     boolean includeKafkaKey = config.getBoolean(config.INCLUDE_KAFKA_KEY_CONFIG);
     boolean includeKafkaData = config.getBoolean(config.INCLUDE_KAFKA_DATA_CONFIG);
-    return new SchemaManager(schemaRetriever, schemaConverter, bigQuery, includeKafkaKey, includeKafkaData);
+    String kafkaKeyFieldName = config.getString(config.KAFKA_KEY_FIELD_NAME_CONFIG);
+    String kafkaDataFieldName = config.getString(config.KAFKA_DATA_FIELD_NAME_CONFIG);
+    return new SchemaManager(schemaRetriever, schemaConverter, bigQuery,
+        includeKafkaKey, includeKafkaData, kafkaKeyFieldName, kafkaDataFieldName);
   }

   private BigQueryWriter getBigQueryWriter() {
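With both options enabled, getRecordRow now nests the converted key and the Kafka metadata under the configured field names instead of the old constants. A rough sketch of the resulting row map, assuming the hypothetical configured names "recordKey" and "recordMetadata"; the value field and key contents are invented, and insertTime is a Double per KafkaDataConverterTest at the bottom of this commit:

import java.util.HashMap;
import java.util.Map;

public class RowShapeSketch {
  public static void main(String[] args) {
    Map<String, Object> row = new HashMap<>();
    row.put("amount", 9.99);               // fields converted from the record value

    Map<String, Object> key = new HashMap<>();
    key.put("id", 42L);                    // converted Kafka key fields
    row.put("recordKey", key);             // configured kafkaKeyFieldName

    Map<String, Object> kafkaData = new HashMap<>();
    kafkaData.put("topic", "orders");      // shape of KafkaDataBuilder.getKafkaDataRecord(record)
    kafkaData.put("partition", 0);
    kafkaData.put("offset", 1234L);
    kafkaData.put("insertTime", 1.6E9);    // a Double, per the test below
    row.put("recordMetadata", kafkaData);  // configured kafkaDataFieldName

    System.out.println(row);
  }
}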

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/SchemaManager.java

Lines changed: 10 additions & 6 deletions

@@ -3,7 +3,7 @@

 import com.google.cloud.bigquery.*;
 import com.wepay.kafka.connect.bigquery.api.SchemaRetriever;
-import com.wepay.kafka.connect.bigquery.convert.KafkaDataConverter;
+import com.wepay.kafka.connect.bigquery.convert.KafkaDataBuilder;
 import com.wepay.kafka.connect.bigquery.convert.SchemaConverter;

 import org.apache.kafka.connect.data.Schema;

@@ -26,8 +26,8 @@ public class SchemaManager {
   private final BigQuery bigQuery;
   private final boolean includeKafkaKey;
   private final boolean includeKafkaData;
-
-  public static final String KAFKA_KEY_FIELD_NAME = "kafkaKey";
+  private final String kafkaKeyFieldName;
+  private final String kafkaDataFieldName;

   /**
    * @param schemaRetriever Used to determine the Kafka Connect Schema that should be used for a

@@ -40,12 +40,16 @@ public SchemaManager(
       SchemaConverter<com.google.cloud.bigquery.Schema> schemaConverter,
       BigQuery bigQuery,
       boolean includeKafkaKey,
-      boolean includeKafkaData) {
+      boolean includeKafkaData,
+      String kafkaKeyFieldName,
+      String kafkaDataFieldName) {
     this.schemaRetriever = schemaRetriever;
     this.schemaConverter = schemaConverter;
     this.bigQuery = bigQuery;
     this.includeKafkaKey = includeKafkaKey;
     this.includeKafkaData = includeKafkaData;
+    this.kafkaKeyFieldName = kafkaKeyFieldName;
+    this.kafkaDataFieldName = kafkaDataFieldName;
   }

   /**

@@ -94,12 +98,12 @@ private com.google.cloud.bigquery.Schema getBigQuerySchema(Schema kafkaKeySchema
     allFields.addAll(valueSchema.getFields());
     if (includeKafkaKey) {
       com.google.cloud.bigquery.Schema keySchema = schemaConverter.convertSchema(kafkaKeySchema);
-      Field kafkaKeyField = Field.newBuilder(KAFKA_KEY_FIELD_NAME, LegacySQLTypeName.RECORD, keySchema.getFields())
+      Field kafkaKeyField = Field.newBuilder(kafkaKeyFieldName, LegacySQLTypeName.RECORD, keySchema.getFields())
           .setMode(Field.Mode.NULLABLE).build();
       allFields.add(kafkaKeyField);
     }
     if (includeKafkaData) {
-      Field kafkaDataField = KafkaDataConverter.getKafkaDataField();
+      Field kafkaDataField = KafkaDataBuilder.getKafkaDataField(kafkaDataFieldName);
       allFields.add(kafkaDataField);
     }
     return com.google.cloud.bigquery.Schema.of(allFields);
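The table schema getBigQuerySchema assembles is unchanged in shape: value fields first, then optional NULLABLE RECORD blocks, whose names are now instance state rather than constants. A minimal sketch under assumed inputs; the value field, key field, and the names "recordKey"/"recordMetadata" are illustrative, not values from this commit:

import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.LegacySQLTypeName;
import com.google.cloud.bigquery.Schema;
import com.wepay.kafka.connect.bigquery.convert.KafkaDataBuilder;

public class SchemaShapeSketch {
  public static void main(String[] args) {
    // One value field, plus the two optional RECORD blocks.
    Field valueField = Field.of("amount", LegacySQLTypeName.FLOAT);
    Field keyField = Field.newBuilder("recordKey", LegacySQLTypeName.RECORD,
            Field.of("id", LegacySQLTypeName.INTEGER))
        .setMode(Field.Mode.NULLABLE)
        .build();
    Field dataField = KafkaDataBuilder.getKafkaDataField("recordMetadata");

    Schema schema = Schema.of(valueField, keyField, dataField);
    System.out.println(schema.getFields().size());  // 3
  }
}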

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkConfig.java

Lines changed: 24 additions & 0 deletions

@@ -183,6 +183,18 @@ public class BigQuerySinkConfig extends AbstractConfig {
   public static final String INCLUDE_KAFKA_KEY_DOC =
       "Whether to include an extra block containing fields in Kafka key.";

+  public static final String KAFKA_KEY_FIELD_NAME_CONFIG = "kafkaKeyFieldName";
+  private static final ConfigDef.Type KAFKA_KEY_FIELD_NAME_TYPE = ConfigDef.Type.STRING;
+  public static final String KAFKA_KEY_FIELD_NAME_DEFAULT = "kafkaKey";
+  private static final ConfigDef.Importance KAFKA_KEY_FIELD_NAME_IMPORTANCE = ConfigDef.Importance.LOW;
+  private static final String KAFKA_KEY_FIELD_NAME_DOC = "The name of the field for the Kafka key.";
+
+  public static final String KAFKA_DATA_FIELD_NAME_CONFIG = "kafkaDataFieldName";
+  private static final ConfigDef.Type KAFKA_DATA_FIELD_NAME_TYPE = ConfigDef.Type.STRING;
+  public static final String KAFKA_DATA_FIELD_NAME_DEFAULT = "kafkaData";
+  private static final ConfigDef.Importance KAFKA_DATA_FIELD_NAME_IMPORTANCE = ConfigDef.Importance.LOW;
+  private static final String KAFKA_DATA_FIELD_NAME_DOC = "The name of the field for Kafka data.";
+
   public static final String AVRO_DATA_CACHE_SIZE_CONFIG = "avroDataCacheSize";
   private static final ConfigDef.Type AVRO_DATA_CACHE_SIZE_TYPE = ConfigDef.Type.INT;
   public static final Integer AVRO_DATA_CACHE_SIZE_DEFAULT = 100;

@@ -307,6 +319,18 @@ public class BigQuerySinkConfig extends AbstractConfig {
         INCLUDE_KAFKA_KEY_DEFAULT,
         INCLUDE_KAFKA_KEY_IMPORTANCE,
         INCLUDE_KAFKA_KEY_DOC
+    ).define(
+        KAFKA_KEY_FIELD_NAME_CONFIG,
+        KAFKA_KEY_FIELD_NAME_TYPE,
+        KAFKA_KEY_FIELD_NAME_DEFAULT,
+        KAFKA_KEY_FIELD_NAME_IMPORTANCE,
+        KAFKA_KEY_FIELD_NAME_DOC
+    ).define(
+        KAFKA_DATA_FIELD_NAME_CONFIG,
+        KAFKA_DATA_FIELD_NAME_TYPE,
+        KAFKA_DATA_FIELD_NAME_DEFAULT,
+        KAFKA_DATA_FIELD_NAME_IMPORTANCE,
+        KAFKA_DATA_FIELD_NAME_DOC
     ).define(
         AVRO_DATA_CACHE_SIZE_CONFIG,
         AVRO_DATA_CACHE_SIZE_TYPE,
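A self-contained sketch of how these definitions behave when parsed, mirroring the two define(...) calls above with plain Kafka ConfigDef; the keys, types, defaults, and importance come from this diff, while the override value is hypothetical:

import java.util.Collections;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;

public class FieldNameConfigSketch {
  public static void main(String[] args) {
    ConfigDef def = new ConfigDef()
        .define("kafkaKeyFieldName", ConfigDef.Type.STRING, "kafkaKey",
            ConfigDef.Importance.LOW, "The name of the field for the Kafka key.")
        .define("kafkaDataFieldName", ConfigDef.Type.STRING, "kafkaData",
            ConfigDef.Importance.LOW, "The name of the field for Kafka data.");

    // Override one name; the other falls back to its declared default.
    AbstractConfig parsed = new AbstractConfig(def,
        Collections.singletonMap("kafkaKeyFieldName", "recordKey"));
    System.out.println(parsed.getString("kafkaKeyFieldName"));   // recordKey
    System.out.println(parsed.getString("kafkaDataFieldName"));  // kafkaData
  }
}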
kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/convert/{KafkaDataConverter.java → KafkaDataBuilder.java}

Lines changed: 3 additions & 4 deletions

@@ -11,15 +11,14 @@
 /**
  * Helper class to construct kafka data schema and kafka data record.
  */
-public class KafkaDataConverter {
+public class KafkaDataBuilder {

-  public static final String KAFKA_DATA_FIELD_NAME = "kafkaData";
   public static final String KAFKA_DATA_TOPIC_FIELD_NAME = "topic";
   public static final String KAFKA_DATA_PARTITION_FIELD_NAME = "partition";
   public static final String KAFKA_DATA_OFFSET_FIELD_NAME = "offset";
   public static final String KAFKA_DATA_INSERT_TIME_FIELD_NAME = "insertTime";

-  public static Field getKafkaDataField() {
+  public static Field getKafkaDataField(String kafkaDataFieldName) {
     Field topicField = com.google.cloud.bigquery.Field.of(KAFKA_DATA_TOPIC_FIELD_NAME, LegacySQLTypeName.STRING);
     Field partitionField = com.google.cloud.bigquery.Field.of(KAFKA_DATA_PARTITION_FIELD_NAME, LegacySQLTypeName.INTEGER);
     Field offsetField = com.google.cloud.bigquery.Field.of(KAFKA_DATA_OFFSET_FIELD_NAME, LegacySQLTypeName.INTEGER);

@@ -28,7 +27,7 @@ public static Field getKafkaDataField() {
         .setMode(com.google.cloud.bigquery.Field.Mode.NULLABLE);
     Field insertTimeField = insertTimeBuilder.build();

-    return Field.newBuilder(KAFKA_DATA_FIELD_NAME, LegacySQLTypeName.RECORD,
+    return Field.newBuilder(kafkaDataFieldName, LegacySQLTypeName.RECORD,
         topicField, partitionField, offsetField, insertTimeField)
         .setMode(com.google.cloud.bigquery.Field.Mode.NULLABLE).build();
   }
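Callers of the renamed builder now pass the field name in; a minimal usage sketch, where the name "recordMetadata" is illustrative:

import com.google.cloud.bigquery.Field;
import com.wepay.kafka.connect.bigquery.convert.KafkaDataBuilder;

public class KafkaDataFieldSketch {
  public static void main(String[] args) {
    // Build the Kafka-data RECORD field under a caller-chosen name.
    Field metadata = KafkaDataBuilder.getKafkaDataField("recordMetadata");
    System.out.println(metadata.getName());  // recordMetadata
    System.out.println(metadata.getMode());  // NULLABLE
  }
}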

kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/SchemaManagerTest.java

Lines changed: 3 additions & 1 deletion

@@ -54,7 +54,9 @@ public void testBQTableDescription() {
         mockSchemaConverter,
         mockBigQuery,
         true,
-        true);
+        true,
+        "kafkaKey",
+        "kafkaData");

     Schema mockKafkaSchema = mock(Schema.class);
     // we would prefer to mock this class, but it is final.

kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/convert/KafkaDataConverterTest.java

Lines changed: 4 additions & 4 deletions

@@ -14,7 +14,7 @@

 public class KafkaDataConverterTest {

-  public static final String KAFKA_DATA_FIELD_NAME = "kafkaData";
+  public static final String kafkaDataFieldName = "kafkaData";
   private static final String kafkaDataTopicName = "topic";
   private static final String kafkaDataPartitionName = "partition";
   private static final String kafkaDataOffsetName = "offset";

@@ -33,7 +33,7 @@ public void testGetKafkaDataRecord() {
     expectedKafkaDataFields.put(kafkaDataOffsetName, kafkaDataOffsetValue);

     SinkRecord record = new SinkRecord(kafkaDataTopicValue, kafkaDataPartitionValue, null, null, null, null, kafkaDataOffsetValue);
-    Map<String, Object> actualKafkaDataFields = KafkaDataConverter.getKafkaDataRecord(record);
+    Map<String, Object> actualKafkaDataFields = KafkaDataBuilder.getKafkaDataRecord(record);

     assertTrue(actualKafkaDataFields.containsKey(kafkaDataInsertTimeName));
     assertTrue(actualKafkaDataFields.get(kafkaDataInsertTimeName) instanceof Double);

@@ -52,15 +52,15 @@ public void testGetKafkaDataField() {
         .setMode(Field.Mode.NULLABLE)
         .build();

-    Field expectedBigQuerySchema = Field.newBuilder(KAFKA_DATA_FIELD_NAME,
+    Field expectedBigQuerySchema = Field.newBuilder(kafkaDataFieldName,
         LegacySQLTypeName.RECORD,
         topicField,
         partitionField,
         offsetField,
         insertTimeField)
         .setMode(Field.Mode.NULLABLE)
         .build();
-    Field actualBigQuerySchema = KafkaDataConverter.getKafkaDataField();
+    Field actualBigQuerySchema = KafkaDataBuilder.getKafkaDataField(kafkaDataFieldName);
     assertEquals(expectedBigQuerySchema, actualBigQuerySchema);
   }
 }
