Skip to content

Commit a102e8c

Browse files
author
Bingqin Zhou
committed
Move KafkaSchemaRecordType to api package.
1 parent 5dbec33 commit a102e8c

File tree

11 files changed

+25
-21
lines changed

11 files changed

+25
-21
lines changed
Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,6 @@
1-
package com.wepay.kafka.connect.bigquery.convert;
1+
package com.wepay.kafka.connect.bigquery.api;
22

33

4-
/**
5-
* Enum class for Kafka schema or record type, either value or key.
6-
*/
74
public enum KafkaSchemaRecordType {
85

96
VALUE("value"),

kcbq-api/src/main/java/com/wepay/kafka/connect/bigquery/api/SchemaRetriever.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ public interface SchemaRetriever {
2525
* @param schemaType The type of kafka schema, either "value" or "key".
2626
* @return The Schema for the given table.
2727
*/
28-
public Schema retrieveSchema(TableId table, String topic, String schemaType);
28+
public Schema retrieveSchema(TableId table, String topic, KafkaSchemaRecordType schemaType);
2929

3030
/**
3131
* Set the last seen schema for a given topic

kcbq-confluent/src/main/java/com/wepay/kafka/connect/bigquery/schemaregistry/schemaretriever/SchemaRegistrySchemaRetriever.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import com.google.cloud.bigquery.TableId;
44

5+
import com.wepay.kafka.connect.bigquery.api.KafkaSchemaRecordType;
56
import com.wepay.kafka.connect.bigquery.api.SchemaRetriever;
67

78
import io.confluent.connect.avro.AvroData;
@@ -61,7 +62,7 @@ public void configure(Map<String, String> properties) {
6162
}
6263

6364
@Override
64-
public Schema retrieveSchema(TableId table, String topic, String schemaType) {
65+
public Schema retrieveSchema(TableId table, String topic, KafkaSchemaRecordType schemaType) {
6566
String subject = getSubject(topic, schemaType);
6667
try {
6768
logger.debug("Retrieving schema information for topic {} with subject {}", topic, subject);
@@ -80,7 +81,7 @@ public Schema retrieveSchema(TableId table, String topic, String schemaType) {
8081
@Override
8182
public void setLastSeenSchema(TableId table, String topic, Schema schema) { }
8283

83-
private String getSubject(String topic, String schemaType) {
84-
return topic + "-" + schemaType;
84+
private String getSubject(String topic, KafkaSchemaRecordType schemaType) {
85+
return topic + "-" + schemaType.toString();
8586
}
8687
}

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,11 @@
2525
import com.google.cloud.storage.BucketInfo;
2626
import com.google.cloud.storage.Storage;
2727
import com.google.common.annotations.VisibleForTesting;
28+
import com.wepay.kafka.connect.bigquery.api.KafkaSchemaRecordType;
2829
import com.wepay.kafka.connect.bigquery.api.SchemaRetriever;
2930
import com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig;
3031
import com.wepay.kafka.connect.bigquery.config.BigQuerySinkTaskConfig;
3132
import com.wepay.kafka.connect.bigquery.convert.KafkaDataBuilder;
32-
import com.wepay.kafka.connect.bigquery.convert.KafkaSchemaRecordType;
3333
import com.wepay.kafka.connect.bigquery.convert.RecordConverter;
3434
import com.wepay.kafka.connect.bigquery.convert.SchemaConverter;
3535
import com.wepay.kafka.connect.bigquery.exception.SinkConfigConnectException;

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/SchemaManager.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@
22

33

44
import com.google.cloud.bigquery.*;
5+
import com.wepay.kafka.connect.bigquery.api.KafkaSchemaRecordType;
56
import com.wepay.kafka.connect.bigquery.api.SchemaRetriever;
67
import com.wepay.kafka.connect.bigquery.convert.KafkaDataBuilder;
7-
import com.wepay.kafka.connect.bigquery.convert.KafkaSchemaRecordType;
88
import com.wepay.kafka.connect.bigquery.convert.SchemaConverter;
99

1010
import org.apache.kafka.connect.data.Schema;
@@ -52,8 +52,8 @@ public SchemaManager(
5252
* @param topic The Kafka topic used to determine the schema.
5353
*/
5454
public void createTable(TableId table, String topic) {
55-
Schema kafkaValueSchema = schemaRetriever.retrieveSchema(table, topic, KafkaSchemaRecordType.VALUE.toString());
56-
Schema kafkaKeySchema = kafkaKeyFieldName.isPresent() ? schemaRetriever.retrieveSchema(table, topic, KafkaSchemaRecordType.KEY.toString()) : null;
55+
Schema kafkaValueSchema = schemaRetriever.retrieveSchema(table, topic, KafkaSchemaRecordType.VALUE);
56+
Schema kafkaKeySchema = kafkaKeyFieldName.isPresent() ? schemaRetriever.retrieveSchema(table, topic, KafkaSchemaRecordType.KEY) : null;
5757
bigQuery.create(constructTableInfo(table, kafkaKeySchema, kafkaValueSchema));
5858
}
5959

@@ -63,8 +63,8 @@ public void createTable(TableId table, String topic) {
6363
* @param topic The Kafka topic used to determine the schema.
6464
*/
6565
public void updateSchema(TableId table, String topic) {
66-
Schema kafkaValueSchema = schemaRetriever.retrieveSchema(table, topic, KafkaSchemaRecordType.VALUE.toString());
67-
Schema kafkaKeySchema = kafkaKeyFieldName.isPresent() ? schemaRetriever.retrieveSchema(table, topic, KafkaSchemaRecordType.KEY.toString()) : null;
66+
Schema kafkaValueSchema = schemaRetriever.retrieveSchema(table, topic, KafkaSchemaRecordType.VALUE);
67+
Schema kafkaKeySchema = kafkaKeyFieldName.isPresent() ? schemaRetriever.retrieveSchema(table, topic, KafkaSchemaRecordType.KEY) : null;
6868
TableInfo tableInfo = constructTableInfo(table, kafkaKeySchema, kafkaValueSchema);
6969
logger.info("Attempting to update table `{}` with schema {}",
7070
table, tableInfo.getDefinition().getSchema());

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/convert/BigQueryRecordConverter.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020

2121
import com.google.cloud.bigquery.InsertAllRequest.RowToInsert;
22+
import com.wepay.kafka.connect.bigquery.api.KafkaSchemaRecordType;
2223
import com.wepay.kafka.connect.bigquery.convert.logicaltype.DebeziumLogicalConverters;
2324
import com.wepay.kafka.connect.bigquery.convert.logicaltype.KafkaLogicalConverters;
2425
import com.wepay.kafka.connect.bigquery.convert.logicaltype.LogicalConverterRegistry;

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/convert/RecordConverter.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919

2020

21+
import com.wepay.kafka.connect.bigquery.api.KafkaSchemaRecordType;
2122
import org.apache.kafka.connect.sink.SinkRecord;
2223

2324
/**

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/retrieve/MemorySchemaRetriever.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import com.google.cloud.bigquery.TableId;
44

5+
import com.wepay.kafka.connect.bigquery.api.KafkaSchemaRecordType;
56
import com.wepay.kafka.connect.bigquery.api.SchemaRetriever;
67

78
import org.apache.kafka.common.cache.Cache;
@@ -40,7 +41,7 @@ public void configure(Map<String, String> properties) {
4041
}
4142

4243
@Override
43-
public Schema retrieveSchema(TableId table, String topic, String schemaType) {
44+
public Schema retrieveSchema(TableId table, String topic, KafkaSchemaRecordType schemaType) {
4445
String tableName = table.getTable();
4546
Schema schema = schemaCache.get(getCacheKey(tableName, topic));
4647
if (schema != null) {

kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/BigQuerySinkConnectorTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import com.google.cloud.bigquery.Table;
3434
import com.google.cloud.bigquery.TableId;
3535

36+
import com.wepay.kafka.connect.bigquery.api.KafkaSchemaRecordType;
3637
import com.wepay.kafka.connect.bigquery.api.SchemaRetriever;
3738

3839
import com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig;
@@ -61,7 +62,7 @@ public void configure(Map<String, String> properties) {
6162
}
6263

6364
@Override
64-
public Schema retrieveSchema(TableId table, String topic, String subjectType) {
65+
public Schema retrieveSchema(TableId table, String topic, KafkaSchemaRecordType schemaType) {
6566
// Shouldn't be called
6667
return null;
6768
}

kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/convert/BigQueryRecordConverterTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import static org.junit.Assert.assertEquals;
2222

23+
import com.wepay.kafka.connect.bigquery.api.KafkaSchemaRecordType;
2324
import com.wepay.kafka.connect.bigquery.exception.ConversionConnectException;
2425

2526
import org.apache.kafka.connect.data.Schema;

0 commit comments

Comments
 (0)