
Commit 84b5c17

Merge pull request #1585 from data-integrations/cherry-pick-bq-plugin-issie-611
[🍒 ] Add defaults for Time and Range Partitioning in BQ source plugin + Bump up Snapshot
2 parents 1146f0b + 7020dfb commit 84b5c17

9 files changed: +502, -90 lines changed

docs/BigQueryTable-batchsource.md

Lines changed: 8 additions & 0 deletions
@@ -57,6 +57,14 @@ name is not null', all output rows will have an 'age' over 50 and a value for th
 This is the same as the WHERE clause in BigQuery. More information can be found at
 https://cloud.google.com/bigquery/docs/reference/standard-sql/query-syntax#where_clause
 
+**Order By**: The column or list of columns to order the data by. For
+example, `name asc, age desc`. More information can be found
+at https://cloud.google.com/bigquery/docs/reference/standard-sql/query-syntax#order_by_clause.
+
+**Limit**: The maximum number of rows to read from the source table. More information can be
+found
+at https://cloud.google.com/bigquery/docs/reference/standard-sql/query-syntax#limit_and_offset_clause.
+
 **Enable Querying Views**: Whether to allow querying views. Since BigQuery views are not materialized
 by default, querying them may have a performance overhead.
 

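To make the two new properties concrete: once Filter, Order By, and Limit are set, the source appends them as trailing clauses of the single query it issues against the table. A minimal sketch of that assembly, mirroring the StringBuilder logic added to PartitionedBigQueryInputFormat further down; the table name and property values here are made-up examples, not part of this change:

    // Hypothetical property values, for illustration only.
    String filter = "age > 50";              // Filter
    String orderBy = "name asc, age desc";   // Order By
    String limit = "1000";                   // Limit
    StringBuilder query = new StringBuilder("select * from myproject.mydataset.mytable");
    if (!filter.isEmpty()) {
      query.append(" where ").append(filter);
    }
    if (!orderBy.isEmpty()) {
      query.append(" order by ").append(orderBy);
    }
    if (!limit.isEmpty()) {
      query.append(" limit ").append(limit);
    }
    // query: select * from myproject.mydataset.mytable where age > 50 order by name asc, age desc limit 1000
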
pom.xml

Lines changed: 1 addition & 1 deletion
@@ -20,7 +20,7 @@
 
   <groupId>io.cdap.plugin</groupId>
   <artifactId>google-cloud</artifactId>
-  <version>0.24.1</version>
+  <version>0.24.2-SNAPSHOT</version>
   <name>Google Cloud Plugins</name>
   <packaging>jar</packaging>
   <description>Plugins for Google Big Query</description>

src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySource.java

Lines changed: 6 additions & 0 deletions
@@ -267,6 +267,12 @@ private void configureBigQuerySource() {
     if (config.getViewMaterializationDataset() != null) {
       configuration.set(BigQueryConstants.CONFIG_VIEW_MATERIALIZATION_DATASET, config.getViewMaterializationDataset());
     }
+    if (config.getOrderBy() != null) {
+      configuration.set(BigQueryConstants.CONFIG_ORDER_BY, config.getOrderBy());
+    }
+    if (config.getLimit() != null) {
+      configuration.set(BigQueryConstants.CONFIG_LIMIT, String.valueOf(config.getLimit()));
+    }
     configuration.set(BigQueryConstants.CONFIG_BQ_HTTP_READ_TIMEOUT, String.valueOf(config.getReadTimeout()));
   }
 

src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySourceConfig.java

Lines changed: 60 additions & 10 deletions
@@ -58,6 +58,7 @@ public final class BigQuerySourceConfig extends BigQueryBaseConfig {
   private static final String VALID_DATE_FORMAT = "yyyy-MM-dd";
   private static final String SCHEME = "gs://";
   private static final String WHERE = "WHERE";
+  private static final String ORDER_BY = "ORDER BY";
   private static final String NAME_READ_TIMEOUT = "readTimeout";
   public static final Set<Schema.Type> SUPPORTED_TYPES =
     ImmutableSet.of(Schema.Type.LONG, Schema.Type.STRING, Schema.Type.DOUBLE, Schema.Type.BOOLEAN, Schema.Type.BYTES,
@@ -71,6 +72,8 @@ public final class BigQuerySourceConfig extends BigQueryBaseConfig {
   public static final String NAME_ENABLE_QUERYING_VIEWS = "enableQueryingViews";
   public static final String NAME_VIEW_MATERIALIZATION_PROJECT = "viewMaterializationProject";
   public static final String NAME_VIEW_MATERIALIZATION_DATASET = "viewMaterializationDataset";
+  public static final String NAME_LIMIT = "limit";
+  public static final String NAME_ORDER_BY = "orderBy";
 
   @Name(Constants.Reference.REFERENCE_NAME)
   @Nullable
@@ -111,6 +114,19 @@ public final class BigQuerySourceConfig extends BigQueryBaseConfig {
     "and discards all rows that do not return TRUE (that is, rows that return FALSE or NULL).")
   private String filter;
 
+  @Name(NAME_ORDER_BY)
+  @Macro
+  @Nullable
+  @Description("The ORDER BY clause sorts the results of a query based on one or more columns. " +
+    "For example, 'name asc, age desc'.")
+  private String orderBy;
+
+  @Name(NAME_LIMIT)
+  @Macro
+  @Nullable
+  @Description("The LIMIT clause restricts the number of rows returned by the query.")
+  private Long limit;
+
   @Name(NAME_ENABLE_QUERYING_VIEWS)
   @Macro
   @Nullable
@@ -187,6 +203,13 @@ public void validate(FailureCollector collector, Map<String, String> arguments)
     if (!containsMacro(NAME_CMEK_KEY)) {
       validateCmekKey(collector, arguments);
     }
+
+    if (!containsMacro(NAME_LIMIT) && limit != null) {
+      if (limit < 0) {
+        collector.addFailure("Invalid limit value.", "Limit must be a non-negative number.")
+          .withConfigProperty(NAME_LIMIT);
+      }
+    }
   }
 
   void validateCmekKey(FailureCollector collector, Map<String, String> arguments) {
@@ -282,17 +305,44 @@ public String getPartitionTo() {
 
   @Nullable
   public String getFilter() {
-    if (filter != null) {
-      filter = filter.trim();
-      if (filter.isEmpty()) {
-        return null;
-      }
-      // remove the WHERE keyword from the filter if the user adds it at the begging of the expression
-      if (filter.toUpperCase().startsWith(WHERE)) {
-        filter = filter.substring(WHERE.length());
-      }
+    return cleanupSqlFragment(filter, WHERE);
+  }
+
+  @Nullable
+  public String getOrderBy() {
+    return cleanupSqlFragment(orderBy, ORDER_BY);
+  }
+
+  @Nullable
+  public Long getLimit() {
+    return limit;
+  }
+
+  /**
+   * Cleans up a SQL fragment by trimming whitespace and stripping a given keyword from the
+   * beginning of the string in a case-insensitive way.
+   *
+   * @param fragment The input SQL string fragment.
+   * @param keyword The SQL keyword to remove (e.g., "WHERE ", "ORDER BY ").
+   * @return The cleaned fragment, or null if the input was null or empty.
+   */
+  @Nullable
+  private String cleanupSqlFragment(@Nullable String fragment, String keyword) {
+    if (Strings.isNullOrEmpty(fragment)) {
+      return null;
+    }
+
+    fragment = fragment.trim();
+
+    if (fragment.isEmpty()) {
+      return null;
     }
-    return filter;
+
+    if (fragment.toUpperCase().startsWith(keyword)) {
+      fragment = fragment.substring(keyword.length()).trim();
+    }
+
+    return fragment.isEmpty() ? null : fragment;
   }
 
   public boolean isEnableQueryingViews() {

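The new cleanupSqlFragment helper centralizes what getFilter used to do inline: trim the value, strip a leading keyword case-insensitively, and collapse empty results to null. A standalone sketch of that behavior with a few illustrative inputs; the names mirror the diff but this is not the plugin source, and the prefix check remains purely textual, just as before:

    // Behavior sketch of the private helper added above (illustrative, not the actual class).
    static String cleanupSqlFragment(String fragment, String keyword) {
      if (fragment == null || fragment.trim().isEmpty()) {
        return null;
      }
      fragment = fragment.trim();
      if (fragment.toUpperCase().startsWith(keyword)) {
        // Strip the leading keyword, then trim again.
        fragment = fragment.substring(keyword.length()).trim();
      }
      return fragment.isEmpty() ? null : fragment;
    }

    // cleanupSqlFragment("WHERE age > 50", "WHERE")        -> "age > 50"
    // cleanupSqlFragment("order by name asc", "ORDER BY")  -> "name asc"   (match is case-insensitive)
    // cleanupSqlFragment("name asc, age desc", "ORDER BY") -> "name asc, age desc"
    // cleanupSqlFragment("   ", "WHERE")                   -> null
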
src/main/java/io/cdap/plugin/gcp/bigquery/source/PartitionedBigQueryInputFormat.java

Lines changed: 73 additions & 28 deletions
@@ -22,6 +22,7 @@
 import com.google.api.services.bigquery.model.JobReference;
 import com.google.api.services.bigquery.model.Table;
 import com.google.api.services.bigquery.model.TableReference;
+import com.google.cloud.bigquery.RangePartitioning;
 import com.google.cloud.bigquery.StandardTableDefinition;
 import com.google.cloud.bigquery.TableDefinition.Type;
 import com.google.cloud.bigquery.TimePartitioning;
@@ -52,6 +53,8 @@
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.hadoop.util.Progressable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.security.GeneralSecurityException;
@@ -68,6 +71,7 @@
  */
 public class PartitionedBigQueryInputFormat extends AbstractBigQueryInputFormat<LongWritable, GenericData.Record> {
 
+  private static final Logger LOG = LoggerFactory.getLogger(PartitionedBigQueryInputFormat.class);
   private InputFormat<LongWritable, GenericData.Record> delegateInputFormat =
     new AvroBigQueryInputFormatWithScopes();
 
@@ -132,19 +136,27 @@ private void processQuery(JobContext context) throws IOException, InterruptedExc
     String partitionFromDate = configuration.get(BigQueryConstants.CONFIG_PARTITION_FROM_DATE, null);
     String partitionToDate = configuration.get(BigQueryConstants.CONFIG_PARTITION_TO_DATE, null);
     String filter = configuration.get(BigQueryConstants.CONFIG_FILTER, null);
+    String limit = configuration.get(BigQueryConstants.CONFIG_LIMIT, null);
+    String orderBy = configuration.get(BigQueryConstants.CONFIG_ORDER_BY, null);
     Integer readTimeout = configuration.getInt(BigQueryConstants.CONFIG_BQ_HTTP_READ_TIMEOUT,
       GCPUtils.BQ_DEFAULT_READ_TIMEOUT_SECONDS);
 
     com.google.cloud.bigquery.Table bigQueryTable = BigQueryUtil.getBigQueryTable(
-      datasetProjectId, datasetId, tableName, serviceAccount, isServiceAccountFilePath, null, readTimeout);
+      datasetProjectId, datasetId, tableName, serviceAccount, isServiceAccountFilePath, null,
+      readTimeout);
     Type type = Objects.requireNonNull(bigQueryTable).getDefinition().getType();
+    Boolean isPartitionFilterRequired = bigQueryTable.getRequirePartitionFilter();
+    StandardTableDefinition tableDefinition = Objects.requireNonNull(bigQueryTable).getDefinition();
 
     String query;
     if (type == Type.VIEW || type == Type.MATERIALIZED_VIEW || type == Type.EXTERNAL) {
-      query = generateQueryForMaterializingView(datasetProjectId, datasetId, tableName, filter);
+      query = generateQueryForMaterializingView(datasetProjectId, datasetId, tableName, filter,
+        limit, orderBy);
     } else {
-      query = generateQuery(partitionFromDate, partitionToDate, filter, projectId, datasetProjectId, datasetId,
-        tableName, serviceAccount, isServiceAccountFilePath);
+      query = generateQuery(partitionFromDate, partitionToDate, filter, datasetProjectId,
+        datasetId,
+        tableName, limit, orderBy,
+        isPartitionFilterRequired, tableDefinition);
     }
 
     if (query != null) {
@@ -166,30 +178,41 @@ private void processQuery(JobContext context) throws IOException, InterruptedExc
   }
 
   @VisibleForTesting
-  String generateQuery(String partitionFromDate, String partitionToDate, String filter, String project,
-                       String datasetProject, String dataset, String table, @Nullable String serviceAccount,
-                       @Nullable Boolean isServiceAccountFilePath) {
-    if (partitionFromDate == null && partitionToDate == null && filter == null) {
+  String generateQuery(String partitionFromDate, String partitionToDate, String filter,
+      String datasetProject, String dataset, String table, String limit, String orderBy,
+      Boolean isPartitionFilterRequired, StandardTableDefinition tableDefinition) {
+
+    if (Strings.isNullOrEmpty(filter) && Strings.isNullOrEmpty(orderBy) && Strings.isNullOrEmpty(
+      limit)
+      && Strings.isNullOrEmpty(partitionFromDate) && Strings.isNullOrEmpty(partitionToDate)) {
       return null;
     }
-    String queryTemplate = "select * from `%s` where %s";
-    com.google.cloud.bigquery.Table sourceTable =
-      BigQueryUtil.getBigQueryTable(datasetProject, dataset, table, serviceAccount, isServiceAccountFilePath, null,
-                                    null);
-    StandardTableDefinition tableDefinition = Objects.requireNonNull(sourceTable).getDefinition();
+
+    RangePartitioning rangePartitioning = tableDefinition.getRangePartitioning();
     TimePartitioning timePartitioning = tableDefinition.getTimePartitioning();
-    if (timePartitioning == null && filter == null) {
-      return null;
-    }
     StringBuilder condition = new StringBuilder();
+    String partitionCondition = null;
 
     if (timePartitioning != null) {
-      String timePartitionCondition = BigQueryUtil.generateTimePartitionCondition(tableDefinition, partitionFromDate,
-                                                                                  partitionToDate);
-      condition.append(timePartitionCondition);
+      if (partitionFromDate == null && partitionToDate == null
+        && Objects.equals(isPartitionFilterRequired, Boolean.TRUE)) {
+        partitionCondition = BigQueryUtil.generateDefaultTimePartitionCondition(tableDefinition);
+      } else if (partitionFromDate != null || partitionToDate != null) {
+        partitionCondition =
+          BigQueryUtil.generateTimePartitionCondition(tableDefinition, partitionFromDate,
+            partitionToDate);
+      }
+    } else if (rangePartitioning != null && Objects.equals(isPartitionFilterRequired,
+      Boolean.TRUE)) {
+      partitionCondition = BigQueryUtil.generateDefaultRangePartitionCondition(
+        tableDefinition);
     }
 
-    if (filter != null) {
+    if (!Strings.isNullOrEmpty(partitionCondition)) {
+      condition.append("(").append(partitionCondition).append(")");
+    }
+
+    if (!Strings.isNullOrEmpty(filter)) {
       if (condition.length() == 0) {
        condition.append(filter);
      } else {
@@ -198,20 +221,42 @@ String generateQuery(String partitionFromDate, String partitionToDate, String fi
     }
 
     String tableName = datasetProject + "." + dataset + "." + table;
-    return String.format(queryTemplate, tableName, condition.toString());
+    StringBuilder query = new StringBuilder("select * from ").append(tableName);
+
+    if (condition.length() > 0) {
+      query.append(" where ").append(condition);
+    }
+
+    if (!Strings.isNullOrEmpty(orderBy)) {
+      query.append(" order by ").append(orderBy);
+    }
+
+    if (!Strings.isNullOrEmpty(limit)) {
+      query.append(" limit ").append(limit);
+    }
+
+    LOG.debug("Generated BigQuery query for job: {}", query);
+    return query.toString();
   }
 
   @VisibleForTesting
-  String generateQueryForMaterializingView(String datasetProject, String dataset, String table, String filter) {
-    String queryTemplate = "select * from `%s`%s";
-    StringBuilder condition = new StringBuilder();
-
+  String generateQueryForMaterializingView(String datasetProject, String dataset, String table,
+      String filter, String limit, String orderBy) {
+    String tableName = String.format("`%s.%s.%s`", datasetProject, dataset, table);
+    StringBuilder query = new StringBuilder("select * from ").append(tableName);
     if (!Strings.isNullOrEmpty(filter)) {
-      condition.append(String.format(" where %s", filter));
+      query.append(" where ").append(filter);
     }
 
-    String tableName = datasetProject + "." + dataset + "." + table;
-    return String.format(queryTemplate, tableName, condition.toString());
+    if (!Strings.isNullOrEmpty(orderBy)) {
+      query.append(" order by ").append(orderBy);
+    }
+
+    if (!Strings.isNullOrEmpty(limit)) {
+      query.append(" limit ").append(limit);
+    }
+
+    return query.toString();
   }
 
   /**

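The key behavioral change above: when a table enforces "require partition filter" and the user supplies no partition dates, the source now injects a default predicate on the partitioning column instead of leaving the query without one. A minimal sketch of the new branching with made-up values; the column name and the Boolean flag are illustrative only, not taken from the commit:

    // Illustrative values, not part of the commit.
    Boolean isPartitionFilterRequired = Boolean.TRUE;   // bigQueryTable.getRequirePartitionFilter()
    String partitionFromDate = null;
    String partitionToDate = null;
    String partitionColumn = "ts";                      // time-partitioning field, "_PARTITIONTIME" if unset

    String partitionCondition = null;
    if (partitionFromDate == null && partitionToDate == null
      && Objects.equals(isPartitionFilterRequired, Boolean.TRUE)) {
      // New default: a condition that references the partition column so the query
      // can still satisfy the table's partition-filter requirement without dropping rows.
      partitionCondition = String.format("`%s` IS NOT NULL OR `%s` IS NULL",
        partitionColumn, partitionColumn);
    }
    // partitionCondition is then wrapped in parentheses, combined with any user Filter
    // (the pre-existing combination logic sits outside the hunks shown above), and the
    // new " order by ..." / " limit ..." suffixes are appended when those properties are set.
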
src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryConstants.java

Lines changed: 2 additions & 0 deletions
@@ -36,6 +36,8 @@ public interface BigQueryConstants {
   String CONFIG_TABLE_FIELDS = "cdap.bq.sink.table.fields";
   String CONFIG_JSON_STRING_FIELDS = "cdap.bq.sink.json.string.fields";
   String CONFIG_FILTER = "cdap.bq.source.filter";
+  String CONFIG_LIMIT = "cdap.bq.source.limit";
+  String CONFIG_ORDER_BY = "cdap.bq.source.order.by";
   String CONFIG_PARTITION_FILTER = "cdap.bq.sink.partition.filter";
   String CONFIG_JOB_ID = "cdap.bq.sink.job.id";
   String CONFIG_VIEW_MATERIALIZATION_PROJECT = "cdap.bq.source.view.materialization.project";

src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtil.java

Lines changed: 39 additions & 1 deletion
@@ -24,6 +24,7 @@
 import com.google.cloud.bigquery.Field;
 import com.google.cloud.bigquery.FieldList;
 import com.google.cloud.bigquery.LegacySQLTypeName;
+import com.google.cloud.bigquery.RangePartitioning;
 import com.google.cloud.bigquery.StandardSQLTypeName;
 import com.google.cloud.bigquery.StandardTableDefinition;
 import com.google.cloud.bigquery.Table;
@@ -82,7 +83,7 @@ public final class BigQueryUtil {
 
   private static final Logger LOG = LoggerFactory.getLogger(BigQueryUtil.class);
 
-  private static final String DEFAULT_PARTITION_COLUMN_NAME = "_PARTITIONTIME";
+  public static final String DEFAULT_PARTITION_COLUMN_NAME = "_PARTITIONTIME";
   private static final String BIGQUERY_BUCKET_PREFIX_PROPERTY_NAME = "gcp.bigquery.bucket.prefix";
 
   public static final String BUCKET_PATTERN = "[a-z0-9._-]+";
@@ -778,6 +779,43 @@ public static String generateTimePartitionCondition(StandardTableDefinition tabl
     return timePartitionCondition.toString();
   }
 
+  /**
+   * Generates a default "IS NOT NULL OR IS NULL" partition condition for a time-partitioned table.
+   *
+   * @param tableDefinition The definition of the table.
+   * @return The SQL condition string or an empty string if no condition is needed.
+   */
+  public static String generateDefaultTimePartitionCondition(
+      StandardTableDefinition tableDefinition) {
+    TimePartitioning timePartitioning = tableDefinition.getTimePartitioning();
+    if (timePartitioning == null) {
+      return StringUtils.EMPTY;
+    }
+
+    String columnName = timePartitioning.getField() != null ?
+      timePartitioning.getField() : DEFAULT_PARTITION_COLUMN_NAME;
+
+    return String.format("`%s` IS NOT NULL OR `%s` IS NULL", columnName, columnName);
+  }
+
+  /**
+   * Generates a default "IS NOT NULL OR IS NULL" partition condition for a range-partitioned
+   * table.
+   *
+   * @param tableDefinition The definition of the table.
+   * @return The SQL condition string or an empty string if no condition is needed.
+   */
+  public static String generateDefaultRangePartitionCondition(
+      StandardTableDefinition tableDefinition) {
+    RangePartitioning rangePartitioning = tableDefinition.getRangePartitioning();
+    if (rangePartitioning == null || Strings.isNullOrEmpty(rangePartitioning.getField())) {
+      return StringUtils.EMPTY;
+    }
+
+    String columnName = rangePartitioning.getField();
+    return String.format("`%s` IS NOT NULL OR `%s` IS NULL", columnName, columnName);
+  }
+
   /**
    * Get fully-qualified name (FQN) for a BQ table (FQN format:
    * bigquery:{projectId}.{datasetId}.{tableId}).

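Expected output of the two new helpers, assuming the illustrative table definitions described in the comments; the column names are made up:

    // Time partitioning on a column named "ts":
    //   generateDefaultTimePartitionCondition(def)  -> "`ts` IS NOT NULL OR `ts` IS NULL"
    // Ingestion-time partitioning (no field set), falling back to the now-public constant:
    //   generateDefaultTimePartitionCondition(def)  -> "`_PARTITIONTIME` IS NOT NULL OR `_PARTITIONTIME` IS NULL"
    // Integer-range partitioning on a column named "customer_id":
    //   generateDefaultRangePartitionCondition(def) -> "`customer_id` IS NOT NULL OR `customer_id` IS NULL"

Because each condition only references the partitioning column, it should satisfy a table's "require partition filter" setting without excluding any rows, which is what the source needs when no partition dates are configured.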