Skip to content

Commit 4bfbe01

Browse files
author
Bingqin Zhou
committed
Change Kafka data function names.
1 parent cf01bf6 commit 4bfbe01

File tree

4 files changed

+8
-8
lines changed

4 files changed

+8
-8
lines changed

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ private RowToInsert getRecordRow(SinkRecord record) {
157157
convertedRecord.put(config.getString(config.KAFKA_KEY_FIELD_NAME_CONFIG), recordConverter.convertRecord(record.keySchema(), record.key()));
158158
}
159159
if (config.getBoolean(config.INCLUDE_KAFKA_DATA_CONFIG)) {
160-
convertedRecord.put(config.getString(config.KAFKA_DATA_FIELD_NAME_CONFIG), KafkaDataBuilder.getKafkaDataRecord(record));
160+
convertedRecord.put(config.getString(config.KAFKA_DATA_FIELD_NAME_CONFIG), KafkaDataBuilder.buildKafkaDataRecord(record));
161161
}
162162
if (config.getBoolean(config.SANITIZE_FIELD_NAME_CONFIG)) {
163163
convertedRecord = FieldNameSanitizer.replaceInvalidKeys(convertedRecord);

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/SchemaManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ private com.google.cloud.bigquery.Schema getBigQuerySchema(Schema kafkaKeySchema
103103
allFields.add(kafkaKeyField);
104104
}
105105
if (includeKafkaData) {
106-
Field kafkaDataField = KafkaDataBuilder.getKafkaDataField(kafkaDataFieldName);
106+
Field kafkaDataField = KafkaDataBuilder.buildKafkaDataField(kafkaDataFieldName);
107107
allFields.add(kafkaDataField);
108108
}
109109
return com.google.cloud.bigquery.Schema.of(allFields);

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/convert/KafkaDataBuilder.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ public class KafkaDataBuilder {
1818
public static final String KAFKA_DATA_OFFSET_FIELD_NAME = "offset";
1919
public static final String KAFKA_DATA_INSERT_TIME_FIELD_NAME = "insertTime";
2020

21-
public static Field getKafkaDataField(String kafkaDataFieldName) {
21+
public static Field buildKafkaDataField(String kafkaDataFieldName) {
2222
Field topicField = com.google.cloud.bigquery.Field.of(KAFKA_DATA_TOPIC_FIELD_NAME, LegacySQLTypeName.STRING);
2323
Field partitionField = com.google.cloud.bigquery.Field.of(KAFKA_DATA_PARTITION_FIELD_NAME, LegacySQLTypeName.INTEGER);
2424
Field offsetField = com.google.cloud.bigquery.Field.of(KAFKA_DATA_OFFSET_FIELD_NAME, LegacySQLTypeName.INTEGER);
@@ -32,7 +32,7 @@ public static Field getKafkaDataField(String kafkaDataFieldName) {
3232
.setMode(com.google.cloud.bigquery.Field.Mode.NULLABLE).build();
3333
}
3434

35-
public static Map<String, Object> getKafkaDataRecord(SinkRecord kafkaConnectRecord) {
35+
public static Map<String, Object> buildKafkaDataRecord(SinkRecord kafkaConnectRecord) {
3636
HashMap<String, Object> kafkaData = new HashMap<>();
3737
kafkaData.put(KAFKA_DATA_TOPIC_FIELD_NAME, kafkaConnectRecord.topic());
3838
kafkaData.put(KAFKA_DATA_PARTITION_FIELD_NAME, kafkaConnectRecord.kafkaPartition());

kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/convert/KafkaDataConverterTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ public class KafkaDataConverterTest {
2121
private static final String kafkaDataInsertTimeName = "insertTime";
2222

2323
@Test
24-
public void testGetKafkaDataRecord() {
24+
public void testBuildKafkaDataRecord() {
2525

2626
final String kafkaDataTopicValue = "testTopic";
2727
final int kafkaDataPartitionValue = 101;
@@ -33,7 +33,7 @@ public void testGetKafkaDataRecord() {
3333
expectedKafkaDataFields.put(kafkaDataOffsetName, kafkaDataOffsetValue);
3434

3535
SinkRecord record = new SinkRecord(kafkaDataTopicValue, kafkaDataPartitionValue, null, null, null, null, kafkaDataOffsetValue);
36-
Map<String, Object> actualKafkaDataFields = KafkaDataBuilder.getKafkaDataRecord(record);
36+
Map<String, Object> actualKafkaDataFields = KafkaDataBuilder.buildKafkaDataRecord(record);
3737

3838
assertTrue(actualKafkaDataFields.containsKey(kafkaDataInsertTimeName));
3939
assertTrue(actualKafkaDataFields.get(kafkaDataInsertTimeName) instanceof Double);
@@ -44,7 +44,7 @@ public void testGetKafkaDataRecord() {
4444
}
4545

4646
@Test
47-
public void testGetKafkaDataField() {
47+
public void testBuildKafkaDataField() {
4848
Field topicField = Field.of("topic", LegacySQLTypeName.STRING);
4949
Field partitionField = Field.of("partition", LegacySQLTypeName.INTEGER);
5050
Field offsetField = Field.of("offset", LegacySQLTypeName.INTEGER);
@@ -60,7 +60,7 @@ public void testGetKafkaDataField() {
6060
insertTimeField)
6161
.setMode(Field.Mode.NULLABLE)
6262
.build();
63-
Field actualBigQuerySchema = KafkaDataBuilder.getKafkaDataField(kafkaDataFieldName);
63+
Field actualBigQuerySchema = KafkaDataBuilder.buildKafkaDataField(kafkaDataFieldName);
6464
assertEquals(expectedBigQuerySchema, actualBigQuerySchema);
6565
}
6666
}

0 commit comments

Comments (0)