Skip to content

Commit 8b4939d

Browse files
Added configuration to add/remove partition decorator for BigQuery inserts (#244)
1 parent 08f1a01 commit 8b4939d

File tree

3 files changed

+94
-9
lines changed

3 files changed

+94
-9
lines changed

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ public class BigQuerySinkTask extends SinkTask {
8181
private RecordConverter<Map<String, Object>> recordConverter;
8282
private Map<String, TableId> topicsToBaseTableIds;
8383
private boolean useMessageTimeDatePartitioning;
84+
private boolean usePartitionDecorator;
8485

8586
private TopicPartitionManager topicPartitionManager;
8687

@@ -144,15 +145,17 @@ private PartitionedTableId getRecordTable(SinkRecord record) {
144145
maybeCreateTable(record, baseTableId);
145146

146147
PartitionedTableId.Builder builder = new PartitionedTableId.Builder(baseTableId);
147-
if (useMessageTimeDatePartitioning) {
148-
if (record.timestampType() == TimestampType.NO_TIMESTAMP_TYPE) {
149-
throw new ConnectException(
150-
"Message has no timestamp type, cannot use message timestamp to partition.");
151-
}
152-
153-
builder.setDayPartition(record.timestamp());
154-
} else {
155-
builder.setDayPartitionForNow();
148+
if(usePartitionDecorator) {
149+
150+
if (useMessageTimeDatePartitioning) {
151+
if (record.timestampType() == TimestampType.NO_TIMESTAMP_TYPE) {
152+
throw new ConnectException(
153+
"Message has no timestamp type, cannot use message timestamp to partition.");
154+
}
155+
builder.setDayPartition(record.timestamp());
156+
} else {
157+
builder.setDayPartitionForNow();
158+
}
156159
}
157160

158161
return builder.build();
@@ -337,6 +340,8 @@ public void start(Map<String, String> properties) {
337340
topicPartitionManager = new TopicPartitionManager();
338341
useMessageTimeDatePartitioning =
339342
config.getBoolean(config.BIGQUERY_MESSAGE_TIME_PARTITIONING_CONFIG);
343+
usePartitionDecorator =
344+
config.getBoolean(config.BIGQUERY_PARTITION_DECORATOR_CONFIG);
340345
if (hasGCSBQTask) {
341346
startGCSToBQLoadTask();
342347
}

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkTaskConfig.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,19 @@ public class BigQuerySinkTaskConfig extends BigQuerySinkConfig {
9393
private static final String BIGQUERY_MESSAGE_TIME_PARTITIONING_DOC =
9494
"Whether or not to use the message time when inserting records. "
9595
+ "Default uses the connector processing time.";
96+
97+
public static final String BIGQUERY_PARTITION_DECORATOR_CONFIG =
98+
"bigQueryPartitionDecorator";
99+
private static final ConfigDef.Type BIGQUERY_PARTITION_DECORATOR_CONFIG_TYPE =
100+
ConfigDef.Type.BOOLEAN;
101+
// Defaults to true to preserve the existing behavior. Set it to false when the BigQuery table uses field-based partitioning, so that no partition decorator is appended.
102+
public static final Boolean BIGQUERY_PARTITION_DECORATOR_DEFAULT = true;
103+
private static final ConfigDef.Importance BIGQUERY_PARTITION_DECORATOR_IMPORTANCE =
104+
ConfigDef.Importance.HIGH;
105+
private static final String BIGQUERY_PARTITION_DECORATOR_DOC =
106+
"Whether or not to append partition decorator to BigQuery table name when inserting records. "
107+
+ "Default is true. Setting this to true appends partition decorator to table name (e.g. table$yyyyMMdd depending on the configuration set for bigQueryPartitionDecorator). "
108+
+ "Setting this to false bypasses the logic to append the partition decorator and uses raw table name for inserts.";
96109

97110
static {
98111
config = BigQuerySinkConfig.getConfig()
@@ -136,6 +149,12 @@ public class BigQuerySinkTaskConfig extends BigQuerySinkConfig {
136149
BIGQUERY_MESSAGE_TIME_PARTITIONING_DEFAULT,
137150
BIGQUERY_MESSAGE_TIME_PARTITIONING_IMPORTANCE,
138151
BIGQUERY_MESSAGE_TIME_PARTITIONING_DOC
152+
).define(
153+
BIGQUERY_PARTITION_DECORATOR_CONFIG,
154+
BIGQUERY_PARTITION_DECORATOR_CONFIG_TYPE,
155+
BIGQUERY_PARTITION_DECORATOR_DEFAULT,
156+
BIGQUERY_PARTITION_DECORATOR_IMPORTANCE,
157+
BIGQUERY_PARTITION_DECORATOR_DOC
139158
);
140159
}
141160

kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTaskTest.java

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,67 @@ public void testPutWhenPartitioningOnMessageTime() {
267267
verify(bigQuery, times(1)).insertAll(argument.capture());
268268
assertEquals("test-topic$20171026", argument.getValue().getTable().getTable());
269269
}
270+
271+
@Test
272+
public void testPutWhenPartitioningIsSetToTrue() {
273+
final String topic = "test-topic";
274+
275+
Map<String, String> properties = propertiesFactory.getProperties();
276+
properties.put(BigQuerySinkConfig.TOPICS_CONFIG, topic);
277+
properties.put(BigQuerySinkConfig.DATASETS_CONFIG, ".*=scratch");
278+
properties.put(BigQuerySinkTaskConfig.BIGQUERY_PARTITION_DECORATOR_CONFIG, "true");
279+
properties.put(BigQuerySinkTaskConfig.BIGQUERY_MESSAGE_TIME_PARTITIONING_CONFIG, "true");
280+
281+
BigQuery bigQuery = mock(BigQuery.class);
282+
Storage storage = mock(Storage.class);
283+
SinkTaskContext sinkTaskContext = mock(SinkTaskContext.class);
284+
InsertAllResponse insertAllResponse = mock(InsertAllResponse.class);
285+
286+
when(bigQuery.insertAll(anyObject())).thenReturn(insertAllResponse);
287+
when(insertAllResponse.hasErrors()).thenReturn(false);
288+
289+
BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, null, storage, null);
290+
testTask.initialize(sinkTaskContext);
291+
testTask.start(properties);
292+
293+
testTask.put(Collections.singletonList(spoofSinkRecord(topic, "value", "message text",
294+
TimestampType.CREATE_TIME, 1509007584334L)));
295+
testTask.flush(Collections.emptyMap());
296+
ArgumentCaptor<InsertAllRequest> argument = ArgumentCaptor.forClass(InsertAllRequest.class);
297+
298+
verify(bigQuery, times(1)).insertAll(argument.capture());
299+
assertEquals("test-topic$20171026", argument.getValue().getTable().getTable());
300+
}
301+
302+
@Test
303+
public void testPutWhenPartitioningIsSetToFalse() {
304+
final String topic = "test-topic";
305+
306+
Map<String, String> properties = propertiesFactory.getProperties();
307+
properties.put(BigQuerySinkConfig.TOPICS_CONFIG, topic);
308+
properties.put(BigQuerySinkConfig.DATASETS_CONFIG, ".*=scratch");
309+
properties.put(BigQuerySinkTaskConfig.BIGQUERY_PARTITION_DECORATOR_CONFIG, "false");
310+
311+
BigQuery bigQuery = mock(BigQuery.class);
312+
Storage storage = mock(Storage.class);
313+
SinkTaskContext sinkTaskContext = mock(SinkTaskContext.class);
314+
InsertAllResponse insertAllResponse = mock(InsertAllResponse.class);
315+
316+
when(bigQuery.insertAll(anyObject())).thenReturn(insertAllResponse);
317+
when(insertAllResponse.hasErrors()).thenReturn(false);
318+
319+
BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, null, storage, null);
320+
testTask.initialize(sinkTaskContext);
321+
testTask.start(properties);
322+
323+
testTask.put(Collections.singletonList(spoofSinkRecord(topic, "value", "message text",
324+
TimestampType.CREATE_TIME, 1509007584334L)));
325+
testTask.flush(Collections.emptyMap());
326+
ArgumentCaptor<InsertAllRequest> argument = ArgumentCaptor.forClass(InsertAllRequest.class);
327+
328+
verify(bigQuery, times(1)).insertAll(argument.capture());
329+
assertEquals("test-topic", argument.getValue().getTable().getTable());
330+
}
270331

271332
// Make sure a connect exception is thrown when the message has no timestamp type
272333
@Test(expected = ConnectException.class)

0 commit comments

Comments
 (0)