
Commit ae67984

Author: Bingqin Zhou

Merge pull request #218 from wepay/include_kafka_key

Enable KCBQ to consume and stream Kafka Key into BigQuery.

2 parents: 6270dd8 + 8adaca0

23 files changed: +373 additions, -455 deletions
kcbq-api/src/main/java/com/wepay/kafka/connect/bigquery/api/KafkaSchemaRecordType.java

Lines changed: 21 additions & 0 deletions
@@ -0,0 +1,21 @@
+package com.wepay.kafka.connect.bigquery.api;
+
+
+/**
+ * Enum class for Kafka schema or record type, either value or key.
+ */
+public enum KafkaSchemaRecordType {
+
+  VALUE("value"),
+  KEY("key");
+
+  private final String str;
+
+  KafkaSchemaRecordType(String str) {
+    this.str = str;
+  }
+
+  public String toString() {
+    return this.str;
+  }
+}
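
The enum overrides toString() so that its string form matches the standard Schema Registry subject suffixes, which keeps call sites like getSubject(topic, schemaType) concise. A quick illustrative sketch (topic name hypothetical, not part of the commit):

    String topic = "pageviews";                                      // hypothetical topic name
    String keySubject = topic + "-" + KafkaSchemaRecordType.KEY;     // "pageviews-key"
    String valueSubject = topic + "-" + KafkaSchemaRecordType.VALUE; // "pageviews-value"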

kcbq-api/src/main/java/com/wepay/kafka/connect/bigquery/api/SchemaRetriever.java

Lines changed: 2 additions & 1 deletion
@@ -22,9 +22,10 @@ public interface SchemaRetriever {
    * Retrieve the most current schema for the given topic.
    * @param table The table that will be created.
    * @param topic The topic to retrieve a schema for.
+   * @param schemaType The type of kafka schema, either "value" or "key".
    * @return The Schema for the given table.
    */
-  public Schema retrieveSchema(TableId table, String topic);
+  public Schema retrieveSchema(TableId table, String topic, KafkaSchemaRecordType schemaType);
 
   /**
    * Set the last seen schema for a given topic
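
This is a breaking change for third-party SchemaRetriever implementations, which must adopt the new three-argument signature. A minimal sketch of a conforming implementation, assuming the interface's configure and setLastSeenSchema methods as they appear in the registry-backed implementation below (the class and its fixed schemas are hypothetical):

    import com.google.cloud.bigquery.TableId;
    import com.wepay.kafka.connect.bigquery.api.KafkaSchemaRecordType;
    import com.wepay.kafka.connect.bigquery.api.SchemaRetriever;
    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.SchemaBuilder;
    import java.util.Map;

    // Hypothetical retriever that serves fixed schemas, illustrating the new signature only.
    public class FixedSchemaRetriever implements SchemaRetriever {
      @Override
      public void configure(Map<String, String> properties) { }

      @Override
      public Schema retrieveSchema(TableId table, String topic, KafkaSchemaRecordType schemaType) {
        // Serve a different schema depending on whether the key or the value is requested.
        return schemaType == KafkaSchemaRecordType.KEY
            ? SchemaBuilder.struct().field("id", Schema.INT64_SCHEMA).build()
            : SchemaBuilder.struct().field("f1", Schema.STRING_SCHEMA).build();
      }

      @Override
      public void setLastSeenSchema(TableId table, String topic, Schema schema) { }
    }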

kcbq-confluent/src/main/java/com/wepay/kafka/connect/bigquery/schemaregistry/schemaretriever/SchemaRegistrySchemaRetriever.java

Lines changed: 5 additions & 4 deletions
@@ -2,6 +2,7 @@
 
 import com.google.cloud.bigquery.TableId;
 
+import com.wepay.kafka.connect.bigquery.api.KafkaSchemaRecordType;
 import com.wepay.kafka.connect.bigquery.api.SchemaRetriever;
 
 import io.confluent.connect.avro.AvroData;
@@ -61,8 +62,8 @@ public void configure(Map<String, String> properties) {
   }
 
   @Override
-  public Schema retrieveSchema(TableId table, String topic) {
-    String subject = getSubject(topic);
+  public Schema retrieveSchema(TableId table, String topic, KafkaSchemaRecordType schemaType) {
+    String subject = getSubject(topic, schemaType);
     try {
       logger.debug("Retrieving schema information for topic {} with subject {}", topic, subject);
       SchemaMetadata latestSchemaMetadata = schemaRegistryClient.getLatestSchemaMetadata(subject);
@@ -80,7 +81,7 @@ public Schema retrieveSchema(TableId table, String topic) {
   @Override
   public void setLastSeenSchema(TableId table, String topic, Schema schema) { }
 
-  private String getSubject(String topic) {
-    return topic + "-value";
+  private String getSubject(String topic, KafkaSchemaRecordType schemaType) {
+    return topic + "-" + schemaType.toString();
   }
 }
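
With this change the registry-backed retriever can fetch key schemas under the "<topic>-key" subject that Confluent's default TopicNameStrategy registers, alongside the existing "<topic>-value" lookup. A hedged usage sketch (retriever construction omitted, topic name hypothetical):

    Schema valueSchema = retriever.retrieveSchema(table, "pageviews", KafkaSchemaRecordType.VALUE); // subject "pageviews-value"
    Schema keySchema = retriever.retrieveSchema(table, "pageviews", KafkaSchemaRecordType.KEY);     // subject "pageviews-key"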

kcbq-confluent/src/test/java/com/wepay/kafka/connect/bigquery/schemaregistry/schemaretriever/SchemaRegistrySchemaRetrieverTest.java

Lines changed: 7 additions & 3 deletions
@@ -7,6 +7,7 @@
 
 import com.google.cloud.bigquery.TableId;
 
+import com.wepay.kafka.connect.bigquery.api.KafkaSchemaRecordType;
 import io.confluent.connect.avro.AvroData;
 
 import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
@@ -22,15 +23,17 @@ public class SchemaRegistrySchemaRetrieverTest {
   public void testRetrieveSchema() throws Exception {
     final TableId table = TableId.of("test", "kafka_topic");
     final String testTopic = "kafka-topic";
-    final String testSubject = "kafka-topic-value";
+    final String testSubjectValue = "kafka-topic-value";
+    final String testSubjectKey = "kafka-topic-key";
     final String testAvroSchemaString =
         "{\"type\": \"record\", "
         + "\"name\": \"testrecord\", "
         + "\"fields\": [{\"name\": \"f1\", \"type\": \"string\"}]}";
     final SchemaMetadata testSchemaMetadata = new SchemaMetadata(1, 1, testAvroSchemaString);
 
     SchemaRegistryClient schemaRegistryClient = mock(SchemaRegistryClient.class);
-    when(schemaRegistryClient.getLatestSchemaMetadata(testSubject)).thenReturn(testSchemaMetadata);
+    when(schemaRegistryClient.getLatestSchemaMetadata(testSubjectValue)).thenReturn(testSchemaMetadata);
+    when(schemaRegistryClient.getLatestSchemaMetadata(testSubjectKey)).thenReturn(testSchemaMetadata);
 
     SchemaRegistrySchemaRetriever testSchemaRetriever = new SchemaRegistrySchemaRetriever(
         schemaRegistryClient,
@@ -40,6 +43,7 @@ public void testRetrieveSchema() throws Exception {
     Schema expectedKafkaConnectSchema =
         SchemaBuilder.struct().field("f1", Schema.STRING_SCHEMA).name("testrecord").build();
 
-    assertEquals(expectedKafkaConnectSchema, testSchemaRetriever.retrieveSchema(table, testTopic));
+    assertEquals(expectedKafkaConnectSchema, testSchemaRetriever.retrieveSchema(table, testTopic, KafkaSchemaRecordType.VALUE));
+    assertEquals(expectedKafkaConnectSchema, testSchemaRetriever.retrieveSchema(table, testTopic, KafkaSchemaRecordType.KEY));
   }
 }

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkConnector.java

Lines changed: 4 additions & 1 deletion
@@ -47,6 +47,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 /**
  * A {@link SinkConnector} used to delegate BigQuery data writes to
@@ -102,7 +103,9 @@ private SchemaManager getSchemaManager(BigQuery bigQuery) {
     }
     SchemaRetriever schemaRetriever = config.getSchemaRetriever();
     SchemaConverter<com.google.cloud.bigquery.Schema> schemaConverter = config.getSchemaConverter();
-    return new SchemaManager(schemaRetriever, schemaConverter, bigQuery);
+    Optional<String> kafkaKeyFieldName = config.getKafkaKeyFieldName();
+    Optional<String> kafkaDataFieldName = config.getKafkaDataFieldName();
+    return new SchemaManager(schemaRetriever, schemaConverter, bigQuery, kafkaKeyFieldName, kafkaDataFieldName);
   }
 
   private void ensureExistingTables(
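
The connector now threads the two optional field names from its config into the SchemaManager, so table creation and row conversion see the same settings. A hedged sketch of the equivalent wiring (the column name is illustrative, not mandated by the commit):

    // Mirrors getSchemaManager() above; "kafkaKey" is an illustrative column name.
    SchemaManager schemaManager = new SchemaManager(
        schemaRetriever, schemaConverter, bigQuery,
        Optional.of("kafkaKey"),  // kafkaKeyFieldName configured
        Optional.empty());        // kafkaDataFieldName left unset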

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java

Lines changed: 15 additions & 3 deletions
@@ -25,9 +25,11 @@
 import com.google.cloud.storage.BucketInfo;
 import com.google.cloud.storage.Storage;
 import com.google.common.annotations.VisibleForTesting;
+import com.wepay.kafka.connect.bigquery.api.KafkaSchemaRecordType;
 import com.wepay.kafka.connect.bigquery.api.SchemaRetriever;
 import com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig;
 import com.wepay.kafka.connect.bigquery.config.BigQuerySinkTaskConfig;
+import com.wepay.kafka.connect.bigquery.convert.KafkaDataBuilder;
 import com.wepay.kafka.connect.bigquery.convert.RecordConverter;
 import com.wepay.kafka.connect.bigquery.convert.SchemaConverter;
 import com.wepay.kafka.connect.bigquery.exception.SinkConfigConnectException;
@@ -63,6 +65,7 @@
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.Optional;
 
 /**
  * A {@link SinkTask} used to translate Kafka Connect {@link SinkRecord SinkRecords} into BigQuery
@@ -150,11 +153,18 @@ private PartitionedTableId getRecordTable(SinkRecord record) {
   }
 
   private RowToInsert getRecordRow(SinkRecord record) {
-    Map<String,Object> convertedRecord = recordConverter.convertRecord(record);
+    Map<String, Object> convertedRecord = recordConverter.convertRecord(record, KafkaSchemaRecordType.VALUE);
+    Optional<String> kafkaKeyFieldName = config.getKafkaKeyFieldName();
+    if (kafkaKeyFieldName.isPresent()) {
+      convertedRecord.put(kafkaKeyFieldName.get(), recordConverter.convertRecord(record, KafkaSchemaRecordType.KEY));
+    }
+    Optional<String> kafkaDataFieldName = config.getKafkaDataFieldName();
+    if (kafkaDataFieldName.isPresent()) {
+      convertedRecord.put(kafkaDataFieldName.get(), KafkaDataBuilder.buildKafkaDataRecord(record));
+    }
     if (config.getBoolean(config.SANITIZE_FIELD_NAME_CONFIG)) {
       convertedRecord = FieldNameSanitizer.replaceInvalidKeys(convertedRecord);
     }
-
     return RowToInsert.of(getRowId(record), convertedRecord);
   }
 
@@ -241,7 +251,9 @@ private SchemaManager getSchemaManager(BigQuery bigQuery) {
     schemaRetriever = config.getSchemaRetriever();
     SchemaConverter<com.google.cloud.bigquery.Schema> schemaConverter =
         config.getSchemaConverter();
-    return new SchemaManager(schemaRetriever, schemaConverter, bigQuery);
+    Optional<String> kafkaKeyFieldName = config.getKafkaKeyFieldName();
+    Optional<String> kafkaDataFieldName = config.getKafkaDataFieldName();
+    return new SchemaManager(schemaRetriever, schemaConverter, bigQuery, kafkaKeyFieldName, kafkaDataFieldName);
   }
 
   private BigQueryWriter getBigQueryWriter() {
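
When either optional field name is configured, getRecordRow() nests an extra record into each converted row alongside the value's own fields. A hedged sketch of the resulting map for a one-field value (the column names and the exact metadata fields produced by KafkaDataBuilder are assumptions here, not confirmed by this diff):

    Map<String, Object> row = new HashMap<>();
    row.put("f1", "hello");                                    // converted record value
    row.put("kafkaKey", Collections.singletonMap("id", 42L));  // converted record key
    Map<String, Object> kafkaData = new HashMap<>();           // record metadata (assumed fields)
    kafkaData.put("topic", "pageviews");
    kafkaData.put("partition", 0);
    kafkaData.put("offset", 123L);
    row.put("kafkaData", kafkaData);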

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/SchemaManager.java

Lines changed: 44 additions & 12 deletions
@@ -1,19 +1,26 @@
 package com.wepay.kafka.connect.bigquery;
 
+
 import com.google.cloud.bigquery.BigQuery;
+import com.google.cloud.bigquery.Field;
+import com.google.cloud.bigquery.LegacySQLTypeName;
 import com.google.cloud.bigquery.StandardTableDefinition;
 import com.google.cloud.bigquery.TableId;
 import com.google.cloud.bigquery.TableInfo;
 import com.google.cloud.bigquery.TimePartitioning;
-
+import com.wepay.kafka.connect.bigquery.api.KafkaSchemaRecordType;
 import com.wepay.kafka.connect.bigquery.api.SchemaRetriever;
+import com.wepay.kafka.connect.bigquery.convert.KafkaDataBuilder;
 import com.wepay.kafka.connect.bigquery.convert.SchemaConverter;
 
-import com.wepay.kafka.connect.bigquery.write.row.AdaptiveBigQueryWriter;
 import org.apache.kafka.connect.data.Schema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
 /**
  * Class for managing Schemas of BigQuery tables (creating and updating).
  */
@@ -23,6 +30,8 @@ public class SchemaManager {
   private final SchemaRetriever schemaRetriever;
   private final SchemaConverter<com.google.cloud.bigquery.Schema> schemaConverter;
   private final BigQuery bigQuery;
+  private final Optional<String> kafkaKeyFieldName;
+  private final Optional<String> kafkaDataFieldName;
 
   /**
    * @param schemaRetriever Used to determine the Kafka Connect Schema that should be used for a
@@ -33,10 +42,14 @@ public class SchemaManager {
   public SchemaManager(
       SchemaRetriever schemaRetriever,
       SchemaConverter<com.google.cloud.bigquery.Schema> schemaConverter,
-      BigQuery bigQuery) {
+      BigQuery bigQuery,
+      Optional<String> kafkaKeyFieldName,
+      Optional<String> kafkaDataFieldName) {
    this.schemaRetriever = schemaRetriever;
    this.schemaConverter = schemaConverter;
    this.bigQuery = bigQuery;
+    this.kafkaKeyFieldName = kafkaKeyFieldName;
+    this.kafkaDataFieldName = kafkaDataFieldName;
  }
 
  /**
@@ -45,8 +58,9 @@ public SchemaManager(
   * @param topic The Kafka topic used to determine the schema.
   */
  public void createTable(TableId table, String topic) {
-    Schema kafkaConnectSchema = schemaRetriever.retrieveSchema(table, topic);
-    bigQuery.create(constructTableInfo(table, kafkaConnectSchema));
+    Schema kafkaValueSchema = schemaRetriever.retrieveSchema(table, topic, KafkaSchemaRecordType.VALUE);
+    Schema kafkaKeySchema = kafkaKeyFieldName.isPresent() ? schemaRetriever.retrieveSchema(table, topic, KafkaSchemaRecordType.KEY) : null;
+    bigQuery.create(constructTableInfo(table, kafkaKeySchema, kafkaValueSchema));
  }
 
  /**
@@ -55,26 +69,44 @@ public void createTable(TableId table, String topic) {
   * @param topic The Kafka topic used to determine the schema.
   */
  public void updateSchema(TableId table, String topic) {
-    Schema kafkaConnectSchema = schemaRetriever.retrieveSchema(table, topic);
-    TableInfo tableInfo = constructTableInfo(table, kafkaConnectSchema);
+    Schema kafkaValueSchema = schemaRetriever.retrieveSchema(table, topic, KafkaSchemaRecordType.VALUE);
+    Schema kafkaKeySchema = kafkaKeyFieldName.isPresent() ? schemaRetriever.retrieveSchema(table, topic, KafkaSchemaRecordType.KEY) : null;
+    TableInfo tableInfo = constructTableInfo(table, kafkaKeySchema, kafkaValueSchema);
    logger.info("Attempting to update table `{}` with schema {}",
        table, tableInfo.getDefinition().getSchema());
    bigQuery.update(tableInfo);
  }
 
  // package private for testing.
-  TableInfo constructTableInfo(TableId table, Schema kafkaConnectSchema) {
-    com.google.cloud.bigquery.Schema bigQuerySchema =
-        schemaConverter.convertSchema(kafkaConnectSchema);
+  TableInfo constructTableInfo(TableId table, Schema kafkaKeySchema, Schema kafkaValueSchema) {
+    com.google.cloud.bigquery.Schema bigQuerySchema = getBigQuerySchema(kafkaKeySchema, kafkaValueSchema);
    StandardTableDefinition tableDefinition = StandardTableDefinition.newBuilder()
        .setSchema(bigQuerySchema)
        .setTimePartitioning(TimePartitioning.of(TimePartitioning.Type.DAY))
        .build();
    TableInfo.Builder tableInfoBuilder =
        TableInfo.newBuilder(table, tableDefinition);
-    if (kafkaConnectSchema.doc() != null) {
-      tableInfoBuilder.setDescription(kafkaConnectSchema.doc());
+    if (kafkaValueSchema.doc() != null) {
+      tableInfoBuilder.setDescription(kafkaValueSchema.doc());
    }
    return tableInfoBuilder.build();
  }
+
+  private com.google.cloud.bigquery.Schema getBigQuerySchema(Schema kafkaKeySchema, Schema kafkaValueSchema) {
+    List<Field> allFields = new ArrayList<> ();
+    com.google.cloud.bigquery.Schema valueSchema = schemaConverter.convertSchema(kafkaValueSchema);
+    allFields.addAll(valueSchema.getFields());
+    if (kafkaKeyFieldName.isPresent()) {
+      com.google.cloud.bigquery.Schema keySchema = schemaConverter.convertSchema(kafkaKeySchema);
+      Field kafkaKeyField = Field.newBuilder(kafkaKeyFieldName.get(), LegacySQLTypeName.RECORD, keySchema.getFields())
+          .setMode(Field.Mode.NULLABLE).build();
+      allFields.add(kafkaKeyField);
+    }
+    if (kafkaDataFieldName.isPresent()) {
+      Field kafkaDataField = KafkaDataBuilder.buildKafkaDataField(kafkaDataFieldName.get());
+      allFields.add(kafkaDataField);
+    }
+    return com.google.cloud.bigquery.Schema.of(allFields);
+  }
+
 }
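
The table schema that getBigQuerySchema() assembles keeps every value field at the top level and adds the key as a NULLABLE RECORD column under the configured name. A hedged reconstruction of the result for a value schema {f1: STRING} and key schema {id: INT64}, with kafkaKeyFieldName set to the illustrative name "kafkaKey" and kafkaDataFieldName unset:

    com.google.cloud.bigquery.Schema expected = com.google.cloud.bigquery.Schema.of(
        Field.of("f1", LegacySQLTypeName.STRING),          // from the value schema
        Field.newBuilder("kafkaKey", LegacySQLTypeName.RECORD,
                Field.of("id", LegacySQLTypeName.INTEGER)) // from the key schema
            .setMode(Field.Mode.NULLABLE)
            .build());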

0 commit comments