Skip to content

Commit 08fa1f8

Browse files
psainicsvikasrathee-cs
authored and committed
Time Partitioning Type Support
1 parent afcd9c8 commit 08fa1f8

File tree

8 files changed

+219
-16
lines changed

8 files changed

+219
-16
lines changed

docs/BigQueryTable-batchsink.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,9 @@ is ignored if the table already exists.
9292
* When this is set to Integer, table will be created with range partitioning.
9393
* When this is set to None, table will be created without time partitioning.
9494

95+
**Time Partitioning Type**: Specifies the time partitioning type. Can be Daily, Hourly, Monthly, or Yearly.
96+
Default is Daily. Ignored when the table already exists.
97+
9598
**Range Start**: For integer partitioning, specifies the start of the range. Only used when table doesn’t
9699
exist already, and partitioning type is set to Integer.
97100
* The start value is inclusive.

src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQueryOutputFormat.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,9 @@ public void commitJob(JobContext jobContext) throws IOException {
229229
LOG.debug("Allow schema relaxation: '{}'", allowSchemaRelaxation);
230230
PartitionType partitionType = conf.getEnum(BigQueryConstants.CONFIG_PARTITION_TYPE, PartitionType.NONE);
231231
LOG.debug("Create Partitioned Table type: '{}'", partitionType);
232+
com.google.cloud.bigquery.TimePartitioning.Type timePartitioningType = conf.getEnum(
233+
BigQueryConstants.CONFIG_TIME_PARTITIONING_TYPE, com.google.cloud.bigquery.TimePartitioning.Type.DAY
234+
);
232235
Range range = partitionType == PartitionType.INTEGER ? createRangeForIntegerPartitioning(conf) : null;
233236
String partitionByField = conf.get(BigQueryConstants.CONFIG_PARTITION_BY_FIELD, null);
234237
LOG.debug("Partition Field: '{}'", partitionByField);
@@ -254,7 +257,7 @@ public void commitJob(JobContext jobContext) throws IOException {
254257

255258
try {
256259
importFromGcs(destProjectId, destTable, destSchema.orElse(null), kmsKeyName, outputFileFormat,
257-
writeDisposition, sourceUris, partitionType, range, partitionByField,
260+
writeDisposition, sourceUris, partitionType, timePartitioningType, range, partitionByField,
258261
requirePartitionFilter, clusteringOrderList, tableExists, conf);
259262
} catch (Exception e) {
260263
throw new IOException("Failed to import GCS into BigQuery. ", e);
@@ -298,8 +301,9 @@ public void abortJob(JobContext context, JobStatus.State state) throws IOExcepti
298301
*/
299302
private void importFromGcs(String projectId, TableReference tableRef, @Nullable TableSchema schema,
300303
@Nullable String kmsKeyName, BigQueryFileFormat sourceFormat, String writeDisposition,
301-
List<String> gcsPaths, PartitionType partitionType, @Nullable Range range,
302-
@Nullable String partitionByField, boolean requirePartitionFilter,
304+
List<String> gcsPaths, PartitionType partitionType,
305+
com.google.cloud.bigquery.TimePartitioning.Type timePartitioningType,
306+
@Nullable Range range, @Nullable String partitionByField, boolean requirePartitionFilter,
303307
List<String> clusteringOrderList, boolean tableExists, Configuration conf)
304308
throws IOException, InterruptedException {
305309
LOG.info("Importing into table '{}' from {} paths; path[0] is '{}'; awaitCompletion: {}",
@@ -357,7 +361,8 @@ private void importFromGcs(String projectId, TableReference tableRef, @Nullable
357361
if (!tableExists) {
358362
switch (partitionType) {
359363
case TIME:
360-
TimePartitioning timePartitioning = createTimePartitioning(partitionByField, requirePartitionFilter);
364+
TimePartitioning timePartitioning = createTimePartitioning(partitionByField, requirePartitionFilter,
365+
timePartitioningType);
361366
loadConfig.setTimePartitioning(timePartitioning);
362367
break;
363368
case INTEGER:
@@ -756,9 +761,10 @@ private Range createRangeForIntegerPartitioning(Configuration conf) {
756761
}
757762

758763
private TimePartitioning createTimePartitioning(
759-
@Nullable String partitionByField, boolean requirePartitionFilter) {
764+
@Nullable String partitionByField, boolean requirePartitionFilter,
765+
com.google.cloud.bigquery.TimePartitioning.Type timePartitioningType) {
760766
TimePartitioning timePartitioning = new TimePartitioning();
761-
timePartitioning.setType("DAY");
767+
timePartitioning.setType(timePartitioningType.name());
762768
if (partitionByField != null) {
763769
timePartitioning.setField(partitionByField);
764770
}

src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySink.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.google.cloud.bigquery.JobId;
2525
import com.google.cloud.bigquery.JobStatistics;
2626
import com.google.cloud.bigquery.Table;
27+
import com.google.cloud.bigquery.TimePartitioning;
2728
import com.google.cloud.hadoop.io.bigquery.output.BigQueryTableFieldSchema;
2829
import com.google.common.collect.ImmutableMap;
2930
import com.google.gson.Gson;
@@ -316,6 +317,9 @@ private void configureBigQuerySink() {
316317
PartitionType partitioningType = getConfig().getPartitioningType();
317318
baseConfiguration.setEnum(BigQueryConstants.CONFIG_PARTITION_TYPE, partitioningType);
318319

320+
TimePartitioning.Type timePartitioningType = getConfig().getTimePartitioningType();
321+
baseConfiguration.setEnum(BigQueryConstants.CONFIG_TIME_PARTITIONING_TYPE, timePartitioningType);
322+
319323
if (config.getRangeStart() != null) {
320324
baseConfiguration.setLong(BigQueryConstants.CONFIG_PARTITION_INTEGER_RANGE_START, config.getRangeStart());
321325
}

src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkConfig.java

Lines changed: 36 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ public final class BigQuerySinkConfig extends AbstractBigQuerySinkConfig {
6969
public static final String NAME_OPERATION = "operation";
7070
public static final String PARTITION_FILTER = "partitionFilter";
7171
public static final String NAME_PARTITIONING_TYPE = "partitioningType";
72+
public static final String NAME_TIME_PARTITIONING_TYPE = "timePartitioningType";
7273
public static final String NAME_RANGE_START = "rangeStart";
7374
public static final String NAME_RANGE_END = "rangeEnd";
7475
public static final String NAME_RANGE_INTERVAL = "rangeInterval";
@@ -102,6 +103,13 @@ public final class BigQuerySinkConfig extends AbstractBigQuerySinkConfig {
102103
+ "Ignored when table already exists")
103104
protected String partitioningType;
104105

106+
@Name(NAME_TIME_PARTITIONING_TYPE)
107+
@Macro
108+
@Nullable
109+
@Description("Specifies the time partitioning type. Can either be Daily or Hourly or Monthly or Yearly. "
110+
+ "Ignored when table already exists")
111+
protected String timePartitioningType;
112+
105113
@Name(NAME_RANGE_START)
106114
@Macro
107115
@Nullable
@@ -269,6 +277,11 @@ public PartitionType getPartitioningType() {
269277
: PartitionType.valueOf(partitioningType.toUpperCase());
270278
}
271279

280+
public TimePartitioning.Type getTimePartitioningType() {
281+
return Strings.isNullOrEmpty(timePartitioningType) ? TimePartitioning.Type.DAY :
282+
TimePartitioning.Type.valueOf(timePartitioningType.toUpperCase());
283+
}
284+
272285
/**
273286
* @return the schema of the dataset
274287
*/
@@ -458,7 +471,7 @@ private void validateColumnForPartition(@Nullable String columnName, @Nullable S
458471
Schema fieldSchema = field.getSchema();
459472
fieldSchema = fieldSchema.isNullable() ? fieldSchema.getNonNullable() : fieldSchema;
460473
if (partitioningType == PartitionType.TIME) {
461-
validateTimePartitioningColumn(columnName, collector, fieldSchema);
474+
validateTimePartitioningColumn(columnName, collector, fieldSchema, getTimePartitioningType());
462475
} else if (partitioningType == PartitionType.INTEGER) {
463476
validateIntegerPartitioningColumn(columnName, collector, fieldSchema);
464477
validateIntegerPartitioningRange(getRangeStart(), getRangeEnd(), getRangeInterval(), collector);
@@ -474,15 +487,30 @@ private void validateIntegerPartitioningColumn(String columnName, FailureCollect
474487
}
475488
}
476489

477-
private void validateTimePartitioningColumn(String columnName, FailureCollector collector, Schema fieldSchema) {
490+
private void validateTimePartitioningColumn(String columnName, FailureCollector collector,
491+
Schema fieldSchema, TimePartitioning.Type timePartitioningType) {
492+
478493
Schema.LogicalType logicalType = fieldSchema.getLogicalType();
479-
if (logicalType != LogicalType.DATE && logicalType != LogicalType.TIMESTAMP_MICROS
480-
&& logicalType != LogicalType.TIMESTAMP_MILLIS) {
494+
495+
boolean isTimestamp = logicalType == LogicalType.TIMESTAMP_MICROS || logicalType == LogicalType.TIMESTAMP_MILLIS;
496+
boolean isDate = logicalType == LogicalType.DATE;
497+
boolean isTimestampOrDate = isTimestamp || isDate;
498+
499+
// If timePartitioningType is HOUR, then logicalType cannot be DATE; only TIMESTAMP_MICROS and TIMESTAMP_MILLIS are allowed.
500+
if (timePartitioningType == TimePartitioning.Type.HOUR && !isTimestamp) {
481501
collector.addFailure(
482-
String.format("Partition column '%s' is of invalid type '%s'.", columnName, fieldSchema.getDisplayName()),
483-
"Partition column must be a date or timestamp.")
484-
.withConfigProperty(NAME_PARTITION_BY_FIELD)
485-
.withOutputSchemaField(columnName).withInputSchemaField(columnName);
502+
String.format("Partition column '%s' is of invalid type '%s'.",
503+
columnName, fieldSchema.getDisplayName()),
504+
"Partition column must be a timestamp.").withConfigProperty(NAME_PARTITION_BY_FIELD)
505+
.withOutputSchemaField(columnName).withInputSchemaField(columnName);
506+
507+
// For any other timePartitioningType (DAY, MONTH, YEAR) logicalType can be DATE, TIMESTAMP_MICROS, TIMESTAMP_MILLIS
508+
} else if (!isTimestampOrDate) {
509+
collector.addFailure(
510+
String.format("Partition column '%s' is of invalid type '%s'.",
511+
columnName, fieldSchema.getDisplayName()),
512+
"Partition column must be a date or timestamp.").withConfigProperty(NAME_PARTITION_BY_FIELD)
513+
.withOutputSchemaField(columnName).withInputSchemaField(columnName);
486514
}
487515
}
488516

src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQueryWrite.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -405,8 +405,9 @@ protected QueryJobConfiguration.Builder getUpdateUpsertQueryJobBuilder(TableId s
405405
* @return Time Partitioning configuration
406406
*/
407407
protected TimePartitioning getTimePartitioning(BigQuerySinkConfig config) {
408-
// Create time partitioning based on DAY
409-
TimePartitioning.Builder timePartitioningBuilder = TimePartitioning.newBuilder(TimePartitioning.Type.DAY);
408+
409+
// Default partitioning type is DAY if not specified
410+
TimePartitioning.Builder timePartitioningBuilder = TimePartitioning.newBuilder(config.getTimePartitioningType());
410411

411412
// Set partition field if specified
412413
if (config.getPartitionByField() != null) {

src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryConstants.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ public interface BigQueryConstants {
4040
String CONFIG_VIEW_MATERIALIZATION_PROJECT = "cdap.bq.source.view.materialization.project";
4141
String CONFIG_VIEW_MATERIALIZATION_DATASET = "cdap.bq.source.view.materialization.dataset";
4242
String CONFIG_PARTITION_TYPE = "cdap.bq.sink.partition.type";
43+
String CONFIG_TIME_PARTITIONING_TYPE = "cdap.bq.sink.time.partitioning.type";
4344
String CONFIG_PARTITION_INTEGER_RANGE_START = "cdap.bq.sink.partition.integer.range.start";
4445
String CONFIG_PARTITION_INTEGER_RANGE_END = "cdap.bq.sink.partition.integer.range.end";
4546
String CONFIG_PARTITION_INTEGER_RANGE_INTERVAL = "cdap.bq.sink.partition.integer.range.interval";
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/*
2+
* Copyright © 2023 Cask Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package io.cdap.plugin.gcp.bigquery.sink;
18+
19+
import com.google.cloud.bigquery.TimePartitioning;
20+
import io.cdap.cdap.api.data.schema.Schema;
21+
import io.cdap.cdap.etl.api.FailureCollector;
22+
import io.cdap.cdap.etl.mock.validation.MockFailureCollector;
23+
import org.junit.Assert;
24+
import org.junit.Before;
25+
import org.junit.Test;
26+
27+
import java.lang.reflect.InvocationTargetException;
28+
import java.lang.reflect.Method;
29+
30+
/**
31+
* Tests for {@link BigQuerySinkConfig}.
32+
*/
33+
34+
public class BigQuerySinkConfigTest {
35+
MockFailureCollector collector;
36+
BigQuerySinkConfig config;
37+
Method validateTimePartitioningColumnMethod;
38+
39+
@Before
40+
public void setup() throws NoSuchMethodException {
41+
collector = new MockFailureCollector();
42+
config = BigQuerySinkConfig.builder().build();
43+
validateTimePartitioningColumnMethod = config.getClass()
44+
.getDeclaredMethod("validateTimePartitioningColumn", String.class, FailureCollector.class,
45+
Schema.class, TimePartitioning.Type.class);
46+
validateTimePartitioningColumnMethod.setAccessible(true);
47+
}
48+
@Test
49+
public void testValidateTimePartitioningColumnWithHourAndDate() throws
50+
InvocationTargetException, IllegalAccessException {
51+
String columnName = "partitionFrom";
52+
Schema fieldSchema = Schema.recordOf("test", Schema.Field.of("partitionFrom",
53+
Schema.of(Schema.LogicalType.DATE)));
54+
TimePartitioning.Type timePartitioningType = TimePartitioning.Type.HOUR;
55+
56+
validateTimePartitioningColumnMethod.invoke(config, columnName, collector, fieldSchema, timePartitioningType);
57+
Assert.assertEquals(String.format("Partition column '%s' is of invalid type '%s'.",
58+
columnName, fieldSchema.getDisplayName()),
59+
collector.getValidationFailures().stream().findFirst().get().getMessage());
60+
}
61+
62+
@Test
63+
public void testValidateTimePartitioningColumnWithHourAndTimestamp() throws
64+
InvocationTargetException, IllegalAccessException {
65+
66+
String columnName = "partitionFrom";
67+
Schema schema = Schema.of(Schema.LogicalType.TIMESTAMP_MICROS);
68+
69+
Schema fieldSchema = schema.isNullable() ? schema.getNonNullable() : schema;
70+
TimePartitioning.Type timePartitioningType = TimePartitioning.Type.HOUR;
71+
72+
validateTimePartitioningColumnMethod.invoke(config, columnName, collector, fieldSchema, timePartitioningType);
73+
Assert.assertEquals(0, collector.getValidationFailures().size());
74+
}
75+
76+
@Test
77+
public void testValidateTimePartitioningColumnWithDayAndString() throws
78+
InvocationTargetException, IllegalAccessException {
79+
80+
String columnName = "partitionFrom";
81+
Schema schema = Schema.of(Schema.Type.STRING);
82+
83+
Schema fieldSchema = schema.isNullable() ? schema.getNonNullable() : schema;
84+
TimePartitioning.Type timePartitioningType = TimePartitioning.Type.DAY;
85+
86+
validateTimePartitioningColumnMethod.invoke(config, columnName, collector, fieldSchema, timePartitioningType);
87+
Assert.assertEquals(String.format("Partition column '%s' is of invalid type '%s'.",
88+
columnName, fieldSchema.getDisplayName()),
89+
collector.getValidationFailures().stream().findFirst().get().getMessage());
90+
}
91+
92+
@Test
93+
public void testValidateTimePartitioningColumnWithDayAndDate() throws
94+
InvocationTargetException, IllegalAccessException {
95+
96+
String columnName = "partitionFrom";
97+
Schema schema = Schema.of(Schema.LogicalType.DATE);
98+
99+
Schema fieldSchema = schema.isNullable() ? schema.getNonNullable() : schema;
100+
TimePartitioning.Type timePartitioningType = TimePartitioning.Type.DAY;
101+
102+
validateTimePartitioningColumnMethod.invoke(config, columnName, collector, fieldSchema, timePartitioningType);
103+
Assert.assertEquals(0, collector.getValidationFailures().size());
104+
}
105+
106+
@Test
107+
public void testValidateTimePartitioningColumnWithNullAndDate() throws
108+
InvocationTargetException, IllegalAccessException {
109+
110+
String columnName = "partitionFrom";
111+
Schema schema = Schema.of(Schema.LogicalType.DATE);
112+
113+
Schema fieldSchema = schema.isNullable() ? schema.getNonNullable() : schema;
114+
TimePartitioning.Type timePartitioningType = null;
115+
116+
validateTimePartitioningColumnMethod.invoke(config, columnName, collector, fieldSchema, timePartitioningType);
117+
// No error, as a null timePartitioningType will default to DAY
118+
Assert.assertEquals(0, collector.getValidationFailures().size());
119+
}
120+
121+
}

widgets/BigQueryTable-batchsink.json

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -280,6 +280,33 @@
280280
]
281281
}
282282
},
283+
{
284+
"widget-type": "radio-group",
285+
"label": "Time Partitioning Type",
286+
"name": "timePartitioningType",
287+
"widget-attributes": {
288+
"layout": "inline",
289+
"default": "DAY",
290+
"options": [
291+
{
292+
"id": "DAY",
293+
"label": "Daily"
294+
},
295+
{
296+
"id": "HOUR",
297+
"label": "Hourly"
298+
},
299+
{
300+
"id": "MONTH",
301+
"label": "Monthly"
302+
},
303+
{
304+
"id": "YEAR",
305+
"label": "Yearly"
306+
}
307+
]
308+
}
309+
},
283310
{
284311
"widget-type": "Number",
285312
"label": "Range Start (inclusive)",
@@ -375,6 +402,18 @@
375402
}
376403
]
377404
},
405+
{
406+
"name": "PartitioningTimeFieldsFilter",
407+
"condition": {
408+
"expression": "partitioningType == 'TIME'"
409+
},
410+
"show": [
411+
{
412+
"type": "property",
413+
"name": "timePartitioningType"
414+
}
415+
]
416+
},
378417
{
379418
"name": "PartitionFieldFilter",
380419
"condition": {

0 commit comments

Comments
 (0)