Skip to content

Commit 02f7688

Browse files
Danielwicknicks
and authored
CC-7246: add ability to partition based on timestamp of a record value field (#246)
* removed unused method getSchemaManager in BigQuerySinkConnector * added functionality to partition based on field name * added tests for SchemaManager and BigQuerySinkTask * Update kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkTaskConfigTest.java Co-Authored-By: Arjun Satish <[email protected]> * PR nit fixes Co-authored-by: Arjun Satish <[email protected]>
1 parent e6dfa04 commit 02f7688

File tree

6 files changed

+137
-36
lines changed

6 files changed

+137
-36
lines changed

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkConnector.java

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -96,17 +96,6 @@ private BigQuery getBigQuery() {
9696
return new BigQueryHelper().setKeySource(keySource).connect(projectName, key);
9797
}
9898

99-
private SchemaManager getSchemaManager(BigQuery bigQuery) {
100-
if (testSchemaManager != null) {
101-
return testSchemaManager;
102-
}
103-
SchemaRetriever schemaRetriever = config.getSchemaRetriever();
104-
SchemaConverter<com.google.cloud.bigquery.Schema> schemaConverter = config.getSchemaConverter();
105-
Optional<String> kafkaKeyFieldName = config.getKafkaKeyFieldName();
106-
Optional<String> kafkaDataFieldName = config.getKafkaDataFieldName();
107-
return new SchemaManager(schemaRetriever, schemaConverter, bigQuery, kafkaKeyFieldName, kafkaDataFieldName);
108-
}
109-
11099
private void ensureExistingTables() {
111100
BigQuery bigQuery = getBigQuery();
112101
Map<String, TableId> topicsToTableIds = TopicToTableResolver.getTopicsToTables(config);

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -265,7 +265,9 @@ private SchemaManager getSchemaManager(BigQuery bigQuery) {
265265
config.getSchemaConverter();
266266
Optional<String> kafkaKeyFieldName = config.getKafkaKeyFieldName();
267267
Optional<String> kafkaDataFieldName = config.getKafkaDataFieldName();
268-
return new SchemaManager(schemaRetriever, schemaConverter, bigQuery, kafkaKeyFieldName, kafkaDataFieldName);
268+
Optional<String> timestampPartitionFieldName = config.getTimestampPartitionFieldName();
269+
return new SchemaManager(schemaRetriever, schemaConverter, bigQuery, kafkaKeyFieldName,
270+
kafkaDataFieldName, timestampPartitionFieldName);
269271
}
270272

271273
private BigQueryWriter getBigQueryWriter() {

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/SchemaManager.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import com.google.cloud.bigquery.TableId;
99
import com.google.cloud.bigquery.TableInfo;
1010
import com.google.cloud.bigquery.TimePartitioning;
11+
import com.google.cloud.bigquery.TimePartitioning.Type;
1112
import com.wepay.kafka.connect.bigquery.api.KafkaSchemaRecordType;
1213
import com.wepay.kafka.connect.bigquery.api.SchemaRetriever;
1314
import com.wepay.kafka.connect.bigquery.convert.KafkaDataBuilder;
@@ -32,6 +33,7 @@ public class SchemaManager {
3233
private final BigQuery bigQuery;
3334
private final Optional<String> kafkaKeyFieldName;
3435
private final Optional<String> kafkaDataFieldName;
36+
private final Optional<String> timestampPartitionFieldName;
3537

3638
/**
3739
* @param schemaRetriever Used to determine the Kafka Connect Schema that should be used for a
@@ -48,12 +50,14 @@ public SchemaManager(
4850
SchemaConverter<com.google.cloud.bigquery.Schema> schemaConverter,
4951
BigQuery bigQuery,
5052
Optional<String> kafkaKeyFieldName,
51-
Optional<String> kafkaDataFieldName) {
53+
Optional<String> kafkaDataFieldName,
54+
Optional<String> timestampPartitionFieldName) {
5255
this.schemaRetriever = schemaRetriever;
5356
this.schemaConverter = schemaConverter;
5457
this.bigQuery = bigQuery;
5558
this.kafkaKeyFieldName = kafkaKeyFieldName;
5659
this.kafkaDataFieldName = kafkaDataFieldName;
60+
this.timestampPartitionFieldName = timestampPartitionFieldName;
5761
}
5862

5963
/**
@@ -84,9 +88,15 @@ public void updateSchema(TableId table, String topic) {
8488
// package private for testing.
8589
TableInfo constructTableInfo(TableId table, Schema kafkaKeySchema, Schema kafkaValueSchema) {
8690
com.google.cloud.bigquery.Schema bigQuerySchema = getBigQuerySchema(kafkaKeySchema, kafkaValueSchema);
91+
92+
TimePartitioning timePartitioning = TimePartitioning.of(Type.DAY);
93+
if (timestampPartitionFieldName.isPresent()){
94+
timePartitioning = timePartitioning.toBuilder().setField(timestampPartitionFieldName.get()).build();
95+
}
96+
8797
StandardTableDefinition tableDefinition = StandardTableDefinition.newBuilder()
8898
.setSchema(bigQuerySchema)
89-
.setTimePartitioning(TimePartitioning.of(TimePartitioning.Type.DAY))
99+
.setTimePartitioning(timePartitioning)
90100
.build();
91101
TableInfo.Builder tableInfoBuilder =
92102
TableInfo.newBuilder(table, tableDefinition);

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkTaskConfig.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919

2020

21+
import java.util.Optional;
2122
import org.apache.kafka.common.config.ConfigDef;
2223
import org.apache.kafka.common.config.ConfigException;
2324

@@ -107,6 +108,16 @@ public class BigQuerySinkTaskConfig extends BigQuerySinkConfig {
107108
+ "Default is true. Setting this to true appends partition decorator to table name (e.g. table$yyyyMMdd depending on the configuration set for bigQueryPartitionDecorator). "
108109
+ "Setting this to false bypasses the logic to append the partition decorator and uses raw table name for inserts.";
109110

111+
public static final String BIGQUERY_TIMESTAMP_PARTITION_FIELD_NAME_CONFIG = "timestampPartitionFieldName";
112+
private static final ConfigDef.Type BIGQUERY_TIMESTAMP_PARTITION_FIELD_NAME_TYPE = ConfigDef.Type.STRING;
113+
private static final String BIGQUERY_TIMESTAMP_PARTITION_FIELD_NAME_DEFAULT = null;
114+
private static final ConfigDef.Importance BIGQUERY_TIMESTAMP_PARTITION_FIELD_NAME_IMPORTANCE =
115+
ConfigDef.Importance.LOW;
116+
private static final String BIGQUERY_TIMESTAMP_PARTITION_FIELD_NAME_DOC =
117+
"The name of the field in the value that contains the timestamp to partition by in BigQuery"
118+
+ " and enable timestamp partitioning for each table. Leave this configuration blank,"
119+
+ " to enable ingestion time partitioning for each table.";
120+
110121
static {
111122
config = BigQuerySinkConfig.getConfig()
112123
.define(
@@ -155,6 +166,12 @@ public class BigQuerySinkTaskConfig extends BigQuerySinkConfig {
155166
BIGQUERY_PARTITION_DECORATOR_DEFAULT,
156167
BIGQUERY_PARTITION_DECORATOR_IMPORTANCE,
157168
BIGQUERY_PARTITION_DECORATOR_DOC
169+
).define(
170+
BIGQUERY_TIMESTAMP_PARTITION_FIELD_NAME_CONFIG,
171+
BIGQUERY_TIMESTAMP_PARTITION_FIELD_NAME_TYPE,
172+
BIGQUERY_TIMESTAMP_PARTITION_FIELD_NAME_DEFAULT,
173+
BIGQUERY_TIMESTAMP_PARTITION_FIELD_NAME_IMPORTANCE,
174+
BIGQUERY_TIMESTAMP_PARTITION_FIELD_NAME_DOC
158175
);
159176
}
160177

@@ -175,6 +192,26 @@ private void checkAutoUpdateSchemas() {
175192
}
176193
}
177194

195+
/**
196+
* Returns the field name to use for timestamp partitioning.
197+
* @return String that represents the field name.
198+
*/
199+
public Optional<String> getTimestampPartitionFieldName() {
200+
return Optional.ofNullable(getString(BIGQUERY_TIMESTAMP_PARTITION_FIELD_NAME_CONFIG));
201+
}
202+
203+
/**
204+
* Check the validity of table partitioning configs.
205+
*/
206+
private void checkPartitionCofigs() {
207+
if (getTimestampPartitionFieldName().isPresent() && getBoolean(BIGQUERY_PARTITION_DECORATOR_CONFIG)){
208+
throw new ConfigException(
209+
"Only one partitioning configuration mode may be specified for the connector. "
210+
+ "Use either bigQueryPartitionDecorator OR timestampPartitionFieldName."
211+
);
212+
}
213+
}
214+
178215
public static ConfigDef getConfig() {
179216
return config;
180217
}
@@ -185,5 +222,6 @@ public static ConfigDef getConfig() {
185222
public BigQuerySinkTaskConfig(Map<String, String> properties) {
186223
super(config, properties);
187224
checkAutoUpdateSchemas();
225+
checkPartitionCofigs();
188226
}
189227
}

kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/SchemaManagerTest.java

Lines changed: 49 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.google.cloud.bigquery.BigQuery;
2525
import com.google.cloud.bigquery.Field;
2626
import com.google.cloud.bigquery.LegacySQLTypeName;
27+
import com.google.cloud.bigquery.StandardTableDefinition;
2728
import com.google.cloud.bigquery.TableId;
2829
import com.google.cloud.bigquery.TableInfo;
2930

@@ -33,45 +34,71 @@
3334
import org.apache.kafka.connect.data.Schema;
3435

3536
import org.junit.Assert;
37+
import org.junit.Before;
3638
import org.junit.Test;
3739

3840
import java.util.Optional;
3941

4042
public class SchemaManagerTest {
4143

42-
@Test
43-
public void testBQTableDescription() {
44-
final String testTableName = "testTable";
45-
final String testDatasetName = "testDataset";
46-
final String testDoc = "test doc";
47-
final TableId tableId = TableId.of(testDatasetName, testTableName);
48-
49-
SchemaRetriever mockSchemaRetriever = mock(SchemaRetriever.class);
50-
@SuppressWarnings("unchecked")
51-
SchemaConverter<com.google.cloud.bigquery.Schema> mockSchemaConverter =
44+
private String testTableName = "testTable";
45+
private String testDatasetName = "testDataset";
46+
private String testDoc = "test doc";
47+
private TableId tableId = TableId.of(testDatasetName, testTableName);
48+
49+
private SchemaRetriever mockSchemaRetriever;
50+
private SchemaConverter<com.google.cloud.bigquery.Schema> mockSchemaConverter;
51+
private BigQuery mockBigQuery;
52+
private Schema mockKafkaSchema;
53+
private com.google.cloud.bigquery.Schema fakeBigQuerySchema;
54+
55+
@Before
56+
public void before() {
57+
mockSchemaRetriever = mock(SchemaRetriever.class);
58+
mockSchemaConverter =
5259
(SchemaConverter<com.google.cloud.bigquery.Schema>) mock(SchemaConverter.class);
53-
BigQuery mockBigQuery = mock(BigQuery.class);
60+
mockBigQuery = mock(BigQuery.class);
61+
mockKafkaSchema = mock(Schema.class);
62+
fakeBigQuerySchema = com.google.cloud.bigquery.Schema.of(
63+
Field.of("mock field", LegacySQLTypeName.STRING));
64+
}
5465

66+
@Test
67+
public void testBQTableDescription() {
5568
Optional<String> kafkaKeyFieldName = Optional.of("kafkaKey");
5669
Optional<String> kafkaDataFieldName = Optional.of("kafkaData");
70+
SchemaManager schemaManager = new SchemaManager(mockSchemaRetriever, mockSchemaConverter,
71+
mockBigQuery, kafkaKeyFieldName, kafkaDataFieldName, Optional.empty());
5772

58-
SchemaManager schemaManager = new SchemaManager(mockSchemaRetriever,
59-
mockSchemaConverter,
60-
mockBigQuery,
61-
kafkaKeyFieldName,
62-
kafkaDataFieldName);
73+
when(mockSchemaConverter.convertSchema(mockKafkaSchema)).thenReturn(fakeBigQuerySchema);
74+
when(mockKafkaSchema.doc()).thenReturn(testDoc);
6375

64-
Schema mockKafkaSchema = mock(Schema.class);
65-
// we would prefer to mock this class, but it is final.
66-
com.google.cloud.bigquery.Schema fakeBigQuerySchema =
67-
com.google.cloud.bigquery.Schema.of(Field.of("mock field", LegacySQLTypeName.STRING));
76+
TableInfo tableInfo = schemaManager
77+
.constructTableInfo(tableId, mockKafkaSchema, mockKafkaSchema);
78+
79+
Assert.assertEquals("Kafka doc does not match BigQuery table description",
80+
testDoc, tableInfo.getDescription());
81+
Assert.assertNull("Timestamp partition field name is not null",
82+
((StandardTableDefinition) tableInfo.getDefinition()).getTimePartitioning().getField());
83+
}
84+
85+
@Test
86+
public void testTimestampPartitionSet() {
87+
Optional<String> testField = Optional.of("testField");
88+
SchemaManager schemaManager = new SchemaManager(mockSchemaRetriever, mockSchemaConverter,
89+
mockBigQuery, Optional.empty(), Optional.empty(), testField);
6890

6991
when(mockSchemaConverter.convertSchema(mockKafkaSchema)).thenReturn(fakeBigQuerySchema);
7092
when(mockKafkaSchema.doc()).thenReturn(testDoc);
7193

72-
TableInfo tableInfo = schemaManager.constructTableInfo(tableId, mockKafkaSchema, mockKafkaSchema);
94+
TableInfo tableInfo = schemaManager
95+
.constructTableInfo(tableId, mockKafkaSchema, mockKafkaSchema);
7396

7497
Assert.assertEquals("Kafka doc does not match BigQuery table description",
75-
testDoc, tableInfo.getDescription());
98+
testDoc, tableInfo.getDescription());
99+
Assert.assertEquals("The field name does not match the field name of time partition",
100+
testField.get(),
101+
((StandardTableDefinition) tableInfo.getDefinition()).getTimePartitioning().getField());
76102
}
103+
77104
}

kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkTaskConfigTest.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
*/
1919

2020

21+
import static org.junit.Assert.assertFalse;
22+
import static org.junit.Assert.assertTrue;
2123
import static org.junit.Assert.fail;
2224

2325
import com.wepay.kafka.connect.bigquery.SinkTaskPropertiesFactory;
@@ -62,6 +64,39 @@ public void testMaxWriteSize() {
6264
*/
6365
}
6466

67+
/**
68+
* Test the default for the field name is not present.
69+
*/
70+
@Test
71+
public void testEmptyTimestampPartitionFieldName() {
72+
Map<String, String> configProperties = propertiesFactory.getProperties();
73+
BigQuerySinkTaskConfig testConfig = new BigQuerySinkTaskConfig(configProperties);
74+
assertFalse(testConfig.getTimestampPartitionFieldName().isPresent());
75+
}
76+
77+
/**
78+
* Test if the field name being non-empty and the decorator default (true) errors correctly.
79+
*/
80+
@Test (expected = ConfigException.class)
81+
public void testTimestampPartitionFieldNameError() {
82+
Map<String, String> configProperties = propertiesFactory.getProperties();
83+
configProperties.put(BigQuerySinkTaskConfig.BIGQUERY_TIMESTAMP_PARTITION_FIELD_NAME_CONFIG, "name");
84+
new BigQuerySinkTaskConfig(configProperties);
85+
}
86+
87+
/**
88+
* Test the field name being non-empty and the decorator set to false works correctly.
89+
*/
90+
@Test
91+
public void testTimestampPartitionFieldName() {
92+
Map<String, String> configProperties = propertiesFactory.getProperties();
93+
configProperties.put(BigQuerySinkTaskConfig.BIGQUERY_TIMESTAMP_PARTITION_FIELD_NAME_CONFIG, "name");
94+
configProperties.put(BigQuerySinkTaskConfig.BIGQUERY_PARTITION_DECORATOR_CONFIG, "false");
95+
BigQuerySinkTaskConfig testConfig = new BigQuerySinkTaskConfig(configProperties);
96+
assertTrue(testConfig.getTimestampPartitionFieldName().isPresent());
97+
assertFalse(testConfig.getBoolean(BigQuerySinkTaskConfig.BIGQUERY_PARTITION_DECORATOR_CONFIG));
98+
}
99+
65100
@Test(expected = ConfigException.class)
66101
public void testAutoSchemaUpdateWithoutRetriever() {
67102
Map<String, String> badConfigProperties = propertiesFactory.getProperties();

0 commit comments

Comments (0)