
Commit a6dc6f0

Add support to configure BigQuery clustering (#265)
1 parent 5e2dd7c commit a6dc6f0
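For orientation before the diff, here is a minimal sketch of how a sink task might opt into the new behaviour. Only the clusteringPartitionFieldNames key is defined in this commit; the timestampPartitionFieldName and bigQueryPartitionDecorator key names are assumed from the existing validation message, and the field values are purely illustrative.

import java.util.HashMap;
import java.util.Map;

public class ClusteringConfigSketch {
  public static void main(String[] args) {
    // Hypothetical subset of the sink connector properties; all other
    // required connector settings are omitted for brevity.
    Map<String, String> props = new HashMap<>();
    // Clustering is only accepted on a partitioned table, so pair it with a
    // timestamp partitioning field (key name taken from the validation message).
    props.put("timestampPartitionFieldName", "event_time");
    // New in this commit: up to four comma-separated clustering field names.
    props.put("clusteringPartitionFieldNames", "customer_id,region");
    props.forEach((key, value) -> System.out.println(key + "=" + value));
  }
}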

File tree: 5 files changed (+168, -23 lines)

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java

Lines changed: 14 additions & 12 deletions

@@ -58,6 +58,7 @@
 import java.time.Instant;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
@@ -143,17 +144,17 @@ private PartitionedTableId getRecordTable(SinkRecord record) {
     TableId baseTableId = topicsToBaseTableIds.get(record.topic());

     PartitionedTableId.Builder builder = new PartitionedTableId.Builder(baseTableId);
-    if(usePartitionDecorator) {
-
-      if (useMessageTimeDatePartitioning) {
-        if (record.timestampType() == TimestampType.NO_TIMESTAMP_TYPE) {
-          throw new ConnectException(
-              "Message has no timestamp type, cannot use message timestamp to partition.");
-        }
-        builder.setDayPartition(record.timestamp());
-      } else {
-        builder.setDayPartitionForNow();
-      }
+    if (usePartitionDecorator) {
+
+      if (useMessageTimeDatePartitioning) {
+        if (record.timestampType() == TimestampType.NO_TIMESTAMP_TYPE) {
+          throw new ConnectException(
+              "Message has no timestamp type, cannot use message timestamp to partition.");
+        }
+        builder.setDayPartition(record.timestamp());
+      } else {
+        builder.setDayPartitionForNow();
+      }
     }

     return builder.build();
@@ -266,8 +267,9 @@ private SchemaManager getSchemaManager(BigQuery bigQuery) {
     Optional<String> kafkaKeyFieldName = config.getKafkaKeyFieldName();
     Optional<String> kafkaDataFieldName = config.getKafkaDataFieldName();
     Optional<String> timestampPartitionFieldName = config.getTimestampPartitionFieldName();
+    Optional<List<String>> clusteringFieldName = config.getClusteringPartitionFieldName();
     return new SchemaManager(schemaRetriever, schemaConverter, bigQuery, kafkaKeyFieldName,
-        kafkaDataFieldName, timestampPartitionFieldName);
+        kafkaDataFieldName, timestampPartitionFieldName, clusteringFieldName);
   }

   private BigQueryWriter getBigQueryWriter() {

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/SchemaManager.java

Lines changed: 17 additions & 5 deletions

@@ -7,6 +7,7 @@
 import com.google.cloud.bigquery.StandardTableDefinition;
 import com.google.cloud.bigquery.TableId;
 import com.google.cloud.bigquery.TableInfo;
+import com.google.cloud.bigquery.Clustering;
 import com.google.cloud.bigquery.TimePartitioning;
 import com.google.cloud.bigquery.TimePartitioning.Type;
 import com.wepay.kafka.connect.bigquery.api.KafkaSchemaRecordType;
@@ -34,6 +35,7 @@ public class SchemaManager {
   private final Optional<String> kafkaKeyFieldName;
   private final Optional<String> kafkaDataFieldName;
   private final Optional<String> timestampPartitionFieldName;
+  private final Optional<List<String>> clusteringFieldName;

   /**
    * @param schemaRetriever Used to determine the Kafka Connect Schema that should be used for a
@@ -51,13 +53,15 @@ public SchemaManager(
       BigQuery bigQuery,
       Optional<String> kafkaKeyFieldName,
       Optional<String> kafkaDataFieldName,
-      Optional<String> timestampPartitionFieldName) {
+      Optional<String> timestampPartitionFieldName,
+      Optional<List<String>> clusteringFieldName) {
     this.schemaRetriever = schemaRetriever;
     this.schemaConverter = schemaConverter;
     this.bigQuery = bigQuery;
     this.kafkaKeyFieldName = kafkaKeyFieldName;
     this.kafkaDataFieldName = kafkaDataFieldName;
     this.timestampPartitionFieldName = timestampPartitionFieldName;
+    this.clusteringFieldName = clusteringFieldName;
   }

   /**
@@ -90,14 +94,22 @@ TableInfo constructTableInfo(TableId table, Schema kafkaKeySchema, Schema kafkaV
     com.google.cloud.bigquery.Schema bigQuerySchema = getBigQuerySchema(kafkaKeySchema, kafkaValueSchema);

     TimePartitioning timePartitioning = TimePartitioning.of(Type.DAY);
-    if (timestampPartitionFieldName.isPresent()){
+    if (timestampPartitionFieldName.isPresent()) {
       timePartitioning = timePartitioning.toBuilder().setField(timestampPartitionFieldName.get()).build();
     }

-    StandardTableDefinition tableDefinition = StandardTableDefinition.newBuilder()
+    StandardTableDefinition.Builder builder = StandardTableDefinition.newBuilder()
         .setSchema(bigQuerySchema)
-        .setTimePartitioning(timePartitioning)
-        .build();
+        .setTimePartitioning(timePartitioning);
+
+    if (timestampPartitionFieldName.isPresent() && clusteringFieldName.isPresent()) {
+      Clustering clustering = Clustering.newBuilder()
+          .setFields(clusteringFieldName.get())
+          .build();
+      builder.setClustering(clustering);
+    }
+
+    StandardTableDefinition tableDefinition = builder.build();
     TableInfo.Builder tableInfoBuilder =
         TableInfo.newBuilder(table, tableDefinition);
     if (kafkaValueSchema.doc() != null) {
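For readers less familiar with the google-cloud-bigquery builder API used above, the following is a minimal standalone sketch of how a day-partitioned, clustered table definition is assembled. The dataset, table, and column names are illustrative, and the real SchemaManager derives the schema from the Kafka key/value schemas rather than declaring fields by hand.

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.Clustering;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.LegacySQLTypeName;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableInfo;
import com.google.cloud.bigquery.TimePartitioning;

import java.util.Arrays;

public class ClusteredTableSketch {
  public static void main(String[] args) {
    BigQuery bigQuery = BigQueryOptions.getDefaultInstance().getService();

    // Illustrative schema; the connector converts the Kafka Connect schema instead.
    Schema schema = Schema.of(
        Field.of("event_time", LegacySQLTypeName.TIMESTAMP),
        Field.of("customer_id", LegacySQLTypeName.STRING),
        Field.of("region", LegacySQLTypeName.STRING));

    // Day partitioning on a named timestamp field, mirroring the
    // timestampPartitionFieldName path in constructTableInfo.
    TimePartitioning timePartitioning = TimePartitioning.of(TimePartitioning.Type.DAY)
        .toBuilder()
        .setField("event_time")
        .build();

    // Clustering is only valid on a partitioned table; BigQuery accepts at most four fields.
    Clustering clustering = Clustering.newBuilder()
        .setFields(Arrays.asList("customer_id", "region"))
        .build();

    StandardTableDefinition tableDefinition = StandardTableDefinition.newBuilder()
        .setSchema(schema)
        .setTimePartitioning(timePartitioning)
        .setClustering(clustering)
        .build();

    // The connector builds the TableInfo here and creates the table elsewhere.
    TableInfo tableInfo =
        TableInfo.newBuilder(TableId.of("my_dataset", "my_table"), tableDefinition).build();
    bigQuery.create(tableInfo);
  }
}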

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkTaskConfig.java

Lines changed: 45 additions & 3 deletions

@@ -18,6 +18,7 @@
  */


+import java.util.List;
 import java.util.Optional;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigException;
@@ -118,6 +119,14 @@ public class BigQuerySinkTaskConfig extends BigQuerySinkConfig {
       + " and enable timestamp partitioning for each table. Leave this configuration blank,"
       + " to enable ingestion time partitioning for each table.";

+  public static final String BIGQUERY_CLUSTERING_FIELD_NAMES_CONFIG = "clusteringPartitionFieldNames";
+  private static final ConfigDef.Type BIGQUERY_CLUSTERING_FIELD_NAMES_TYPE = ConfigDef.Type.LIST;
+  private static final List<String> BIGQUERY_CLUSTERING_FIELD_NAMES_DEFAULT = null;
+  private static final ConfigDef.Importance BIGQUERY_CLUSTERING_FIELD_NAMES_IMPORTANCE =
+      ConfigDef.Importance.LOW;
+  private static final String BIGQUERY_CLUSTERING_FIELD_NAMES_DOC =
+      "List of fields on which data should be clustered in BigQuery, separated by commas";
+
   static {
     config = BigQuerySinkConfig.getConfig()
         .define(
@@ -172,6 +181,12 @@ public class BigQuerySinkTaskConfig extends BigQuerySinkConfig {
         BIGQUERY_TIMESTAMP_PARTITION_FIELD_NAME_DEFAULT,
         BIGQUERY_TIMESTAMP_PARTITION_FIELD_NAME_IMPORTANCE,
         BIGQUERY_TIMESTAMP_PARTITION_FIELD_NAME_DOC
+    ).define(
+        BIGQUERY_CLUSTERING_FIELD_NAMES_CONFIG,
+        BIGQUERY_CLUSTERING_FIELD_NAMES_TYPE,
+        BIGQUERY_CLUSTERING_FIELD_NAMES_DEFAULT,
+        BIGQUERY_CLUSTERING_FIELD_NAMES_IMPORTANCE,
+        BIGQUERY_CLUSTERING_FIELD_NAMES_DOC
     );
   }

@@ -200,18 +215,44 @@ public Optional<String> getTimestampPartitionFieldName() {
     return Optional.ofNullable(getString(BIGQUERY_TIMESTAMP_PARTITION_FIELD_NAME_CONFIG));
   }

+  /**
+   * Returns the field names to use for clustering.
+   * @return List of Strings that represent the field names.
+   */
+  public Optional<List<String>> getClusteringPartitionFieldName() {
+    return Optional.ofNullable(getList(BIGQUERY_CLUSTERING_FIELD_NAMES_CONFIG));
+  }
+
   /**
    * Check the validity of table partitioning configs.
    */
-  private void checkPartitionCofigs() {
-    if (getTimestampPartitionFieldName().isPresent() && getBoolean(BIGQUERY_PARTITION_DECORATOR_CONFIG)){
+  private void checkPartitionConfigs() {
+    if (getTimestampPartitionFieldName().isPresent() && getBoolean(BIGQUERY_PARTITION_DECORATOR_CONFIG)) {
       throw new ConfigException(
           "Only one partitioning configuration mode may be specified for the connector. "
           + "Use either bigQueryPartitionDecorator OR timestampPartitionFieldName."
       );
     }
   }

+  /**
+   * Check the validity of table clustering configs.
+   */
+  private void checkClusteringConfigs() {
+    if (getClusteringPartitionFieldName().isPresent()) {
+      if (!getTimestampPartitionFieldName().isPresent() && !getBoolean(BIGQUERY_PARTITION_DECORATOR_CONFIG)) {
+        throw new ConfigException(
+            "Clustering field names may be specified only on a partitioned table."
+        );
+      }
+      if (getClusteringPartitionFieldName().get().size() > 4) {
+        throw new ConfigException(
+            "You can only specify up to four clustering field names."
+        );
+      }
+    }
+  }
+
   public static ConfigDef getConfig() {
     return config;
   }
@@ -222,6 +263,7 @@ public static ConfigDef getConfig() {
   public BigQuerySinkTaskConfig(Map<String, String> properties) {
     super(config, properties);
     checkAutoUpdateSchemas();
-    checkPartitionCofigs();
+    checkPartitionConfigs();
+    checkClusteringConfigs();
   }
 }

kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/SchemaManagerTest.java

Lines changed: 25 additions & 2 deletions

@@ -37,6 +37,8 @@
 import org.junit.Before;
 import org.junit.Test;

+import java.util.Arrays;
+import java.util.List;
 import java.util.Optional;

 public class SchemaManagerTest {
@@ -68,7 +70,7 @@ public void testBQTableDescription() {
     Optional<String> kafkaKeyFieldName = Optional.of("kafkaKey");
     Optional<String> kafkaDataFieldName = Optional.of("kafkaData");
     SchemaManager schemaManager = new SchemaManager(mockSchemaRetriever, mockSchemaConverter,
-        mockBigQuery, kafkaKeyFieldName, kafkaDataFieldName, Optional.empty());
+        mockBigQuery, kafkaKeyFieldName, kafkaDataFieldName, Optional.empty(), Optional.empty());

     when(mockSchemaConverter.convertSchema(mockKafkaSchema)).thenReturn(fakeBigQuerySchema);
     when(mockKafkaSchema.doc()).thenReturn(testDoc);
@@ -86,7 +88,7 @@ public void testBQTableDescription() {
   public void testTimestampPartitionSet() {
     Optional<String> testField = Optional.of("testField");
     SchemaManager schemaManager = new SchemaManager(mockSchemaRetriever, mockSchemaConverter,
-        mockBigQuery, Optional.empty(), Optional.empty(), testField);
+        mockBigQuery, Optional.empty(), Optional.empty(), testField, Optional.empty());

     when(mockSchemaConverter.convertSchema(mockKafkaSchema)).thenReturn(fakeBigQuerySchema);
     when(mockKafkaSchema.doc()).thenReturn(testDoc);
@@ -101,4 +103,25 @@ public void testTimestampPartitionSet() {
         ((StandardTableDefinition) tableInfo.getDefinition()).getTimePartitioning().getField());
   }

+  @Test
+  public void testClusteringPartitionSet() {
+    Optional<String> timestampPartitionFieldName = Optional.of("testField");
+    Optional<List<String>> testField = Optional.of(Arrays.asList("column1", "column2"));
+    SchemaManager schemaManager = new SchemaManager(mockSchemaRetriever, mockSchemaConverter,
+        mockBigQuery, Optional.empty(), Optional.empty(), timestampPartitionFieldName, testField);
+
+    when(mockSchemaConverter.convertSchema(mockKafkaSchema)).thenReturn(fakeBigQuerySchema);
+    when(mockKafkaSchema.doc()).thenReturn(testDoc);
+
+    TableInfo tableInfo = schemaManager
+        .constructTableInfo(tableId, mockKafkaSchema, mockKafkaSchema);
+
+    Assert.assertEquals("Kafka doc does not match BigQuery table description",
+        testDoc, tableInfo.getDescription());
+    StandardTableDefinition definition = tableInfo.getDefinition();
+    Assert.assertNotNull(definition.getClustering());
+    Assert.assertEquals("The field names do not match the configured clustering fields",
+        testField.get(),
+        definition.getClustering().getFields());
+  }
 }

kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkTaskConfigTest.java

Lines changed: 67 additions & 1 deletion

@@ -18,9 +18,9 @@
  */


+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;

 import com.wepay.kafka.connect.bigquery.SinkTaskPropertiesFactory;

@@ -29,7 +29,11 @@
 import org.junit.Before;
 import org.junit.Test;

+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
+import java.util.Optional;

 public class BigQuerySinkTaskConfigTest {
   private SinkTaskPropertiesFactory propertiesFactory;
@@ -97,6 +101,68 @@ public void testTimestampPartitionFieldName() {
     assertFalse(testConfig.getBoolean(BigQuerySinkTaskConfig.BIGQUERY_PARTITION_DECORATOR_CONFIG));
   }

+  /**
+   * Test that the clustering field names default is not present.
+   */
+  @Test
+  public void testEmptyClusteringFieldNames() {
+    Map<String, String> configProperties = propertiesFactory.getProperties();
+    BigQuerySinkTaskConfig testConfig = new BigQuerySinkTaskConfig(configProperties);
+    assertFalse(testConfig.getClusteringPartitionFieldName().isPresent());
+  }
+
+  /**
+   * Test that non-empty clustering field names without any partitioning configuration raise an error.
+   */
+  @Test (expected = ConfigException.class)
+  public void testClusteringFieldNamesWithoutTimestampPartitionError() {
+    Map<String, String> configProperties = propertiesFactory.getProperties();
+    configProperties.put(BigQuerySinkTaskConfig.BIGQUERY_TIMESTAMP_PARTITION_FIELD_NAME_CONFIG, null);
+    configProperties.put(BigQuerySinkTaskConfig.BIGQUERY_PARTITION_DECORATOR_CONFIG, "false");
+    configProperties.put(
+        BigQuerySinkTaskConfig.BIGQUERY_CLUSTERING_FIELD_NAMES_CONFIG,
+        "column1,column2"
+    );
+    new BigQuerySinkTaskConfig(configProperties);
+  }
+
+  /**
+   * Test that more than four clustering field names raise an error.
+   */
+  @Test (expected = ConfigException.class)
+  public void testClusteringPartitionFieldNamesWithMoreThanFourFieldsError() {
+    Map<String, String> configProperties = propertiesFactory.getProperties();
+    configProperties.put(BigQuerySinkTaskConfig.BIGQUERY_PARTITION_DECORATOR_CONFIG, "true");
+    configProperties.put(
+        BigQuerySinkTaskConfig.BIGQUERY_CLUSTERING_FIELD_NAMES_CONFIG,
+        "column1,column2,column3,column4,column5"
+    );
+    new BigQuerySinkTaskConfig(configProperties);
+  }
+
+  /**
+   * Test that non-empty clustering field names together with a timestamp partitioning field work correctly.
+   */
+  @Test
+  public void testClusteringFieldNames() {
+    Map<String, String> configProperties = propertiesFactory.getProperties();
+    configProperties.put(BigQuerySinkTaskConfig.BIGQUERY_TIMESTAMP_PARTITION_FIELD_NAME_CONFIG, "name");
+    configProperties.put(BigQuerySinkTaskConfig.BIGQUERY_PARTITION_DECORATOR_CONFIG, "false");
+    configProperties.put(
+        BigQuerySinkTaskConfig.BIGQUERY_CLUSTERING_FIELD_NAMES_CONFIG,
+        "column1,column2"
+    );
+
+    ArrayList<String> expectedClusteringPartitionFieldName = new ArrayList<>(
+        Arrays.asList("column1", "column2")
+    );
+
+    BigQuerySinkTaskConfig testConfig = new BigQuerySinkTaskConfig(configProperties);
+    Optional<List<String>> testClusteringPartitionFieldName = testConfig.getClusteringPartitionFieldName();
+    assertTrue(testClusteringPartitionFieldName.isPresent());
+    assertEquals(expectedClusteringPartitionFieldName, testClusteringPartitionFieldName.get());
+  }
+
   @Test(expected = ConfigException.class)
   public void testAutoSchemaUpdateWithoutRetriever() {
     Map<String, String> badConfigProperties = propertiesFactory.getProperties();