
Commit fb2bfa8

Author: Bingqin Zhou
Commit message: Refactor.
Parent: 4bfbe01

File tree: 9 files changed, +66 -54 lines


kcbq-api/src/main/java/com/wepay/kafka/connect/bigquery/api/SchemaRetriever.java

Lines changed: 2 additions & 2 deletions
@@ -22,10 +22,10 @@ public interface SchemaRetriever {
    * Retrieve the most current schema for the given topic.
    * @param table The table that will be created.
    * @param topic The topic to retrieve a schema for.
-   * @param subjectType The type of kafka schema subject, either "value" or "key".
+   * @param schemaType The type of kafka schema, either "value" or "key".
    * @return The Schema for the given table.
    */
-  public Schema retrieveSchema(TableId table, String topic, String subjectType);
+  public Schema retrieveSchema(TableId table, String topic, String schemaType);
 
   /**
    * Set the last seen schema for a given topic
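For reference, a minimal sketch of a retriever written against the renamed parameter. The class and its fixed schema are hypothetical, and it assumes the interface's methods are the ones visible in this diff (configure, retrieveSchema, setLastSeenSchema):

import com.google.cloud.bigquery.TableId;
import com.wepay.kafka.connect.bigquery.api.SchemaRetriever;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

import java.util.Map;

// Hypothetical retriever that returns the same schema for every topic,
// regardless of whether schemaType is "value" or "key".
public class FixedSchemaRetriever implements SchemaRetriever {
  private static final Schema FIXED_SCHEMA =
      SchemaBuilder.struct().field("id", Schema.INT64_SCHEMA).build();

  @Override
  public void configure(Map<String, String> properties) { }

  @Override
  public Schema retrieveSchema(TableId table, String topic, String schemaType) {
    // schemaType is either "value" or "key", as documented above.
    return FIXED_SCHEMA;
  }

  @Override
  public void setLastSeenSchema(TableId table, String topic, Schema schema) { }
}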

kcbq-confluent/src/main/java/com/wepay/kafka/connect/bigquery/schemaregistry/schemaretriever/SchemaRegistrySchemaRetriever.java

Lines changed: 4 additions & 4 deletions
@@ -61,8 +61,8 @@ public void configure(Map<String, String> properties) {
   }
 
   @Override
-  public Schema retrieveSchema(TableId table, String topic, String subjectType) {
-    String subject = getSubject(topic, subjectType);
+  public Schema retrieveSchema(TableId table, String topic, String schemaType) {
+    String subject = getSubject(topic, schemaType);
     try {
       logger.debug("Retrieving schema information for topic {} with subject {}", topic, subject);
       SchemaMetadata latestSchemaMetadata = schemaRegistryClient.getLatestSchemaMetadata(subject);
@@ -80,7 +80,7 @@ public Schema retrieveSchema(TableId table, String topic, String subjectType) {
   @Override
   public void setLastSeenSchema(TableId table, String topic, Schema schema) { }
 
-  private String getSubject(String topic, String subjectType) {
-    return topic + "-" + subjectType;
+  private String getSubject(String topic, String schemaType) {
+    return topic + "-" + schemaType;
   }
 }
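The subject naming itself is unchanged by the rename: getSubject still concatenates the topic with "-value" or "-key". A trivial illustration (the topic name is made up):

// Mirrors the private getSubject logic in SchemaRegistrySchemaRetriever:
// subject = topic + "-" + schemaType.
public class SubjectNamingExample {
  static String subjectFor(String topic, String schemaType) {
    return topic + "-" + schemaType;
  }

  public static void main(String[] args) {
    System.out.println(subjectFor("orders", "value"));  // orders-value
    System.out.println(subjectFor("orders", "key"));    // orders-key
  }
}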

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java

Lines changed: 3 additions & 3 deletions
@@ -29,6 +29,7 @@
 import com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig;
 import com.wepay.kafka.connect.bigquery.config.BigQuerySinkTaskConfig;
 import com.wepay.kafka.connect.bigquery.convert.KafkaDataBuilder;
+import com.wepay.kafka.connect.bigquery.convert.KafkaSchemaRecordType;
 import com.wepay.kafka.connect.bigquery.convert.RecordConverter;
 import com.wepay.kafka.connect.bigquery.convert.SchemaConverter;
 import com.wepay.kafka.connect.bigquery.exception.SinkConfigConnectException;
@@ -48,7 +49,6 @@
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.record.TimestampType;
-import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.sink.SinkTask;
@@ -152,9 +152,9 @@ private PartitionedTableId getRecordTable(SinkRecord record) {
   }
 
   private RowToInsert getRecordRow(SinkRecord record) {
-    Map<String, Object> convertedRecord = recordConverter.convertRecord(record.valueSchema(), record.value());
+    Map<String, Object> convertedRecord = recordConverter.convertRecord(record, KafkaSchemaRecordType.VALUE);
     if (config.getBoolean(config.INCLUDE_KAFKA_KEY_CONFIG)) {
-      convertedRecord.put(config.getString(config.KAFKA_KEY_FIELD_NAME_CONFIG), recordConverter.convertRecord(record.keySchema(), record.key()));
+      convertedRecord.put(config.getString(config.KAFKA_KEY_FIELD_NAME_CONFIG), recordConverter.convertRecord(record, KafkaSchemaRecordType.KEY));
     }
     if (config.getBoolean(config.INCLUDE_KAFKA_DATA_CONFIG)) {
       convertedRecord.put(config.getString(config.KAFKA_DATA_FIELD_NAME_CONFIG), KafkaDataBuilder.buildKafkaDataRecord(record));
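When INCLUDE_KAFKA_KEY_CONFIG is enabled, getRecordRow now nests the converted key map inside the converted value map under the configured key field name. A rough sketch of the resulting shape; the field names and the "kafkaKey" key field name are hypothetical, not taken from the connector's defaults:

import java.util.HashMap;
import java.util.Map;

// Hypothetical shape of the row content produced by getRecordRow when the key is included.
public class RowShapeSketch {
  public static void main(String[] args) {
    Map<String, Object> convertedKey = new HashMap<>();    // convertRecord(record, KafkaSchemaRecordType.KEY)
    convertedKey.put("id", 42L);

    Map<String, Object> convertedRecord = new HashMap<>(); // convertRecord(record, KafkaSchemaRecordType.VALUE)
    convertedRecord.put("name", "example");
    convertedRecord.put("kafkaKey", convertedKey);         // key field name assumed to be "kafkaKey"

    System.out.println(convertedRecord);                   // e.g. {name=example, kafkaKey={id=42}}
  }
}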

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/SchemaManager.java

Lines changed: 5 additions & 6 deletions
@@ -4,6 +4,7 @@
 import com.google.cloud.bigquery.*;
 import com.wepay.kafka.connect.bigquery.api.SchemaRetriever;
 import com.wepay.kafka.connect.bigquery.convert.KafkaDataBuilder;
+import com.wepay.kafka.connect.bigquery.convert.KafkaSchemaRecordType;
 import com.wepay.kafka.connect.bigquery.convert.SchemaConverter;
 
 import org.apache.kafka.connect.data.Schema;
@@ -18,8 +19,6 @@
  */
 public class SchemaManager {
   private static final Logger logger = LoggerFactory.getLogger(SchemaManager.class);
-  private static final String VALUE = "value";
-  private static final String KEY = "key";
 
   private final SchemaRetriever schemaRetriever;
   private final SchemaConverter<com.google.cloud.bigquery.Schema> schemaConverter;
@@ -58,8 +57,8 @@ public SchemaManager(
    * @param topic The Kafka topic used to determine the schema.
    */
   public void createTable(TableId table, String topic) {
-    Schema kafkaValueSchema = schemaRetriever.retrieveSchema(table, topic, VALUE);
-    Schema kafkaKeySchema = includeKafkaKey ? schemaRetriever.retrieveSchema(table, topic, KEY) : null;
+    Schema kafkaValueSchema = schemaRetriever.retrieveSchema(table, topic, KafkaSchemaRecordType.VALUE.toString());
+    Schema kafkaKeySchema = includeKafkaKey ? schemaRetriever.retrieveSchema(table, topic, KafkaSchemaRecordType.KEY.toString()) : null;
     bigQuery.create(constructTableInfo(table, kafkaKeySchema, kafkaValueSchema));
   }
 
@@ -69,8 +68,8 @@ public void createTable(TableId table, String topic) {
    * @param topic The Kafka topic used to determine the schema.
    */
   public void updateSchema(TableId table, String topic) {
-    Schema kafkaValueSchema = schemaRetriever.retrieveSchema(table, topic, VALUE);
-    Schema kafkaKeySchema = includeKafkaKey ? schemaRetriever.retrieveSchema(table, topic, KEY) : null;
+    Schema kafkaValueSchema = schemaRetriever.retrieveSchema(table, topic, KafkaSchemaRecordType.VALUE.toString());
+    Schema kafkaKeySchema = includeKafkaKey ? schemaRetriever.retrieveSchema(table, topic, KafkaSchemaRecordType.KEY.toString()) : null;
     TableInfo tableInfo = constructTableInfo(table, kafkaKeySchema, kafkaValueSchema);
     logger.info("Attempting to update table `{}` with schema {}",
         table, tableInfo.getDefinition().getSchema());

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/convert/BigQueryRecordConverter.java

Lines changed: 7 additions & 7 deletions
@@ -65,15 +65,15 @@ public BigQueryRecordConverter(boolean shouldConvertDoubleSpecial) {
 
   /**
    * Convert a {@link SinkRecord} into the contents of a BigQuery {@link RowToInsert}.
-   * @param kafkaConnectSchema The schema of the kafka connect record to convert. Must be of type {@link Struct},
-   *                           in order to translate into a row format that requires each field to
-   *                           consist of both a name and a value.
-   * @param kafkaConnectStruct The struct of the kafka connect record to convert. Must be of type {@link Struct},
-   *                           in order to translate into a row format that requires each field to
-   *                           consist of both a name and a value.
+   * @param record The Kafka Connect record to convert. Must be of type {@link Struct},
+   *               in order to translate into a row format that requires each field to
+   *               consist of both a name and a value.
+   * @param recordType The type of the record to convert, either value or key.
    * @return The result BigQuery row content.
    */
-  public Map<String, Object> convertRecord(Schema kafkaConnectSchema, Object kafkaConnectStruct) {
+  public Map<String, Object> convertRecord(SinkRecord record, KafkaSchemaRecordType recordType) {
+    Schema kafkaConnectSchema = recordType == KafkaSchemaRecordType.KEY ? record.keySchema() : record.valueSchema();
+    Object kafkaConnectStruct = recordType == KafkaSchemaRecordType.KEY ? record.key() : record.value();
     if (kafkaConnectSchema == null) {
       if (kafkaConnectStruct instanceof Map) {
         return (Map<String, Object>) convertSchemalessRecord(kafkaConnectStruct);
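A usage sketch of the new signature, assuming the constructor shown in the hunk header (BigQueryRecordConverter(boolean shouldConvertDoubleSpecial)); the topic name and record contents are made up:

import com.wepay.kafka.connect.bigquery.convert.BigQueryRecordConverter;
import com.wepay.kafka.connect.bigquery.convert.KafkaSchemaRecordType;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;

import java.util.Map;

// A single SinkRecord now drives both the value and the key conversion.
public class ConverterUsageSketch {
  public static void main(String[] args) {
    Schema keySchema = SchemaBuilder.struct().field("id", Schema.INT64_SCHEMA).build();
    Schema valueSchema = SchemaBuilder.struct().field("name", Schema.STRING_SCHEMA).build();
    Struct key = new Struct(keySchema).put("id", 42L);
    Struct value = new Struct(valueSchema).put("name", "example");

    // Topic, partition, and offset are placeholders.
    SinkRecord record = new SinkRecord("some-topic", 0, keySchema, key, valueSchema, value, 0L);

    BigQueryRecordConverter converter = new BigQueryRecordConverter(false);
    Map<String, Object> valueRow = converter.convertRecord(record, KafkaSchemaRecordType.VALUE);
    Map<String, Object> keyRow = converter.convertRecord(record, KafkaSchemaRecordType.KEY);
    System.out.println(valueRow); // e.g. {name=example}
    System.out.println(keyRow);   // e.g. {id=42}
  }
}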
kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/convert/KafkaSchemaRecordType.java

Lines changed: 14 additions & 0 deletions

@@ -0,0 +1,14 @@
+package com.wepay.kafka.connect.bigquery.convert;
+
+
+public enum KafkaSchemaRecordType {
+  VALUE("value"),
+  KEY("key");
+  String str;
+  KafkaSchemaRecordType(String str) {
+    this.str = str;
+  }
+  public String toString() {
+    return this.str;
+  }
+}
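The enum's toString() yields the same "value"/"key" strings that SchemaManager previously kept as private VALUE/KEY constants, so existing SchemaRetriever implementations still receive the strings they expect. A quick check:

import com.wepay.kafka.connect.bigquery.convert.KafkaSchemaRecordType;

public class KafkaSchemaRecordTypeCheck {
  public static void main(String[] args) {
    System.out.println(KafkaSchemaRecordType.VALUE.toString()); // value
    System.out.println(KafkaSchemaRecordType.KEY.toString());   // key
  }
}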

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/convert/RecordConverter.java

Lines changed: 3 additions & 4 deletions
@@ -18,7 +18,6 @@
  */
 
 
-import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.sink.SinkRecord;
 
 /**
@@ -27,10 +26,10 @@
  */
 public interface RecordConverter<R> {
   /**
-   * @param schema The schema of the record to convert.
-   * @param struct The struct of the record to convert.
+   * @param record The record to convert.
+   * @param recordType The type of the record to convert, either value or key.
    * @return The converted record.
    */
-  R convertRecord(Schema schema, Object struct);
+  R convertRecord(SinkRecord record, KafkaSchemaRecordType recordType);
 
 }
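For reference, a minimal sketch of an implementer of the new two-argument method; the class is hypothetical, and it assumes convertRecord is the interface's only abstract method:

import com.wepay.kafka.connect.bigquery.convert.KafkaSchemaRecordType;
import com.wepay.kafka.connect.bigquery.convert.RecordConverter;
import org.apache.kafka.connect.sink.SinkRecord;

// Hypothetical converter that renders either the key or the value of a record as a String.
public class StringRecordConverter implements RecordConverter<String> {
  @Override
  public String convertRecord(SinkRecord record, KafkaSchemaRecordType recordType) {
    Object contents = recordType == KafkaSchemaRecordType.KEY ? record.key() : record.value();
    return String.valueOf(contents);
  }
}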

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/retrieve/MemorySchemaRetriever.java

Lines changed: 1 addition & 1 deletion
@@ -40,7 +40,7 @@ public void configure(Map<String, String> properties) {
   }
 
   @Override
-  public Schema retrieveSchema(TableId table, String topic, String subjectType) {
+  public Schema retrieveSchema(TableId table, String topic, String schemaType) {
     String tableName = table.getTable();
     Schema schema = schemaCache.get(getCacheKey(tableName, topic));
     if (schema != null) {
