Skip to content

Commit 969e3a5

Browse files
author
Bingqin Zhou
committed
Remove boolean retrieveKey from retrieveSchema.
1 parent b3d490c commit 969e3a5

File tree

8 files changed

+24
-21
lines changed

8 files changed

+24
-21
lines changed

kcbq-api/src/main/java/com/wepay/kafka/connect/bigquery/api/SchemaRetriever.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,10 @@ public interface SchemaRetriever {
2222
* Retrieve the most current schema for the given topic.
2323
* @param table The table that will be created.
2424
* @param topic The topic to retrieve a schema for.
25+
* @param subjectType The type of kafka schema subject, either "value" or "key".
2526
* @return The Schema for the given table.
2627
*/
27-
public Schema retrieveSchema(TableId table, String topic, boolean retrieveKey);
28+
public Schema retrieveSchema(TableId table, String topic, String subjectType);
2829

2930
/**
3031
* Set the last seen schema for a given topic

kcbq-confluent/src/main/java/com/wepay/kafka/connect/bigquery/schemaregistry/schemaretriever/SchemaRegistrySchemaRetriever.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,8 @@ public void configure(Map<String, String> properties) {
6161
}
6262

6363
@Override
64-
public Schema retrieveSchema(TableId table, String topic, boolean retrieveKey) {
65-
String subject = getSubject(topic, retrieveKey);
64+
public Schema retrieveSchema(TableId table, String topic, String subjectType) {
65+
String subject = getSubject(topic, subjectType);
6666
try {
6767
logger.debug("Retrieving schema information for topic {} with subject {}", topic, subject);
6868
SchemaMetadata latestSchemaMetadata = schemaRegistryClient.getLatestSchemaMetadata(subject);
@@ -80,10 +80,7 @@ public Schema retrieveSchema(TableId table, String topic, boolean retrieveKey) {
8080
@Override
8181
public void setLastSeenSchema(TableId table, String topic, Schema schema) { }
8282

83-
private String getSubject(String topic, boolean retrieveKey) {
84-
if (retrieveKey) {
85-
return topic + "-key";
86-
}
87-
return topic + "-value";
83+
private String getSubject(String topic, String subjectType) {
84+
return topic + "-" + subjectType;
8885
}
8986
}

kcbq-confluent/src/test/java/com/wepay/kafka/connect/bigquery/schemaregistry/schemaretriever/SchemaRegistrySchemaRetrieverTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public void testRetrieveSchema() throws Exception {
4242
Schema expectedKafkaConnectSchema =
4343
SchemaBuilder.struct().field("f1", Schema.STRING_SCHEMA).name("testrecord").build();
4444

45-
assertEquals(expectedKafkaConnectSchema, testSchemaRetriever.retrieveSchema(table, testTopic, false));
46-
assertEquals(expectedKafkaConnectSchema, testSchemaRetriever.retrieveSchema(table, testTopic, true));
45+
assertEquals(expectedKafkaConnectSchema, testSchemaRetriever.retrieveSchema(table, testTopic, "value"));
46+
assertEquals(expectedKafkaConnectSchema, testSchemaRetriever.retrieveSchema(table, testTopic, "key"));
4747
}
4848
}

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/SchemaManager.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
*/
1919
public class SchemaManager {
2020
private static final Logger logger = LoggerFactory.getLogger(SchemaManager.class);
21+
private static final String VALUE = "value";
22+
private static final String KEY = "key";
2123

2224
private final SchemaRetriever schemaRetriever;
2325
private final SchemaConverter<com.google.cloud.bigquery.Schema> schemaConverter;
@@ -52,8 +54,8 @@ public SchemaManager(
5254
* @param topic The Kafka topic used to determine the schema.
5355
*/
5456
public void createTable(TableId table, String topic) {
55-
Schema kafkaValueSchema = schemaRetriever.retrieveSchema(table, topic, false);
56-
Schema kafkaKeySchema = includeKafkaKey ? schemaRetriever.retrieveSchema(table, topic, true) : null;
57+
Schema kafkaValueSchema = schemaRetriever.retrieveSchema(table, topic, VALUE);
58+
Schema kafkaKeySchema = includeKafkaKey ? schemaRetriever.retrieveSchema(table, topic, KEY) : null;
5759
bigQuery.create(constructTableInfo(table, kafkaKeySchema, kafkaValueSchema));
5860
}
5961

@@ -63,8 +65,8 @@ public void createTable(TableId table, String topic) {
6365
* @param topic The Kafka topic used to determine the schema.
6466
*/
6567
public void updateSchema(TableId table, String topic) {
66-
Schema kafkaValueSchema = schemaRetriever.retrieveSchema(table, topic, false);
67-
Schema kafkaKeySchema = includeKafkaKey ? schemaRetriever.retrieveSchema(table, topic, true) : null;
68+
Schema kafkaValueSchema = schemaRetriever.retrieveSchema(table, topic, VALUE);
69+
Schema kafkaKeySchema = includeKafkaKey ? schemaRetriever.retrieveSchema(table, topic, KEY) : null;
6870
TableInfo tableInfo = constructTableInfo(table, kafkaKeySchema, kafkaValueSchema);
6971
logger.info("Attempting to update table `{}` with schema {}",
7072
table, tableInfo.getDefinition().getSchema());

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/convert/KafkaDataConverter.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@
88
import java.util.HashMap;
99
import java.util.Map;
1010

11+
/**
12+
* Helper class to construct kafka data schema and kafka data record.
13+
*/
1114
public class KafkaDataConverter {
1215

1316
public static final String KAFKA_DATA_FIELD_NAME = "kafkaData";

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/retrieve/MemorySchemaRetriever.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public void configure(Map<String, String> properties) {
4040
}
4141

4242
@Override
43-
public Schema retrieveSchema(TableId table, String topic, boolean retrieveKey) {
43+
public Schema retrieveSchema(TableId table, String topic, String subjectType) {
4444
String tableName = table.getTable();
4545
Schema schema = schemaCache.get(getCacheKey(tableName, topic));
4646
if (schema != null) {

kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/BigQuerySinkConnectorTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public void configure(Map<String, String> properties) {
6161
}
6262

6363
@Override
64-
public Schema retrieveSchema(TableId table, String topic, boolean retrieveKey) {
64+
public Schema retrieveSchema(TableId table, String topic, String subjectType) {
6565
// Shouldn't be called
6666
return null;
6767
}

kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/retrieve/MemorySchemaRetrieverTest.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ public void testRetrieveSchemaWhenNoLastSeenSchemaReturnsEmptyStructSchema() {
2525
final TableId tableId = getTableId("testTable", "testDataset");
2626
SchemaRetriever retriever = new MemorySchemaRetriever();
2727
retriever.configure(new HashMap<>());
28-
Assert.assertEquals(retriever.retrieveSchema(tableId, topic, false), SchemaBuilder.struct().build());
28+
Assert.assertEquals(retriever.retrieveSchema(tableId, topic, "value"), SchemaBuilder.struct().build());
2929
}
3030

3131
@Test
@@ -38,7 +38,7 @@ public void testRetrieveSchemaWhenLastSeenExistsSucceeds() {
3838
Schema expectedSchema = Schema.OPTIONAL_FLOAT32_SCHEMA;
3939
retriever.setLastSeenSchema(tableId, topic, expectedSchema);
4040

41-
Assert.assertEquals(retriever.retrieveSchema(tableId, topic, true), expectedSchema);
41+
Assert.assertEquals(retriever.retrieveSchema(tableId, topic, "key"), expectedSchema);
4242
}
4343

4444
@Test
@@ -56,8 +56,8 @@ public void testRetrieveSchemaWithMultipleSchemasSucceeds() {
5656
retriever.setLastSeenSchema(intTableId, intSchemaTopic, expectedIntSchema);
5757

5858
Assert.assertEquals(
59-
retriever.retrieveSchema(floatTableId, floatSchemaTopic, false), expectedFloatSchema);
60-
Assert.assertEquals(retriever.retrieveSchema(intTableId, intSchemaTopic, true), expectedIntSchema);
59+
retriever.retrieveSchema(floatTableId, floatSchemaTopic, "value"), expectedFloatSchema);
60+
Assert.assertEquals(retriever.retrieveSchema(intTableId, intSchemaTopic, "key"), expectedIntSchema);
6161
}
6262

6363
@Test
@@ -72,6 +72,6 @@ public void testRetrieveSchemaRetrievesLastSeenSchema() {
7272
retriever.setLastSeenSchema(tableId, intSchemaTopic, firstSchema);
7373
retriever.setLastSeenSchema(tableId, intSchemaTopic, secondSchema);
7474

75-
Assert.assertEquals(retriever.retrieveSchema(tableId, intSchemaTopic, false), secondSchema);
75+
Assert.assertEquals(retriever.retrieveSchema(tableId, intSchemaTopic, "value"), secondSchema);
7676
}
7777
}

0 commit comments

Comments
 (0)