Skip to content

Commit 266ac5b

Browse files
committed
add defaults for time and range partitioning for bq source
1 parent 49b2b31 commit 266ac5b

File tree

8 files changed

+444
-86
lines changed

8 files changed

+444
-86
lines changed

docs/BigQueryTable-batchsource.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,14 @@ name is not null', all output rows will have an 'age' over 50 and a value for th
5757
This is the same as the WHERE clause in BigQuery. More information can be found at
5858
https://cloud.google.com/bigquery/docs/reference/standard-sql/query-syntax#where_clause
5959

60+
**Order By**: The column or list of columns to order the data by. For
61+
example, `name asc, age desc`. More information can be found
62+
at https://cloud.google.com/bigquery/docs/reference/standard-sql/query-syntax#order_by_clause.
63+
64+
**Limit**: The maximum number of rows to read from the source table. More information can be
65+
found
66+
at https://cloud.google.com/bigquery/docs/reference/standard-sql/query-syntax#limit_and_offset_clause.
67+
6068
**Enable Querying Views**: Whether to allow querying views. Since BigQuery views are not materialized
6169
by default, querying them may have a performance overhead.
6270

src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySource.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,12 @@ private void configureBigQuerySource() {
241241
if (config.getViewMaterializationDataset() != null) {
242242
configuration.set(BigQueryConstants.CONFIG_VIEW_MATERIALIZATION_DATASET, config.getViewMaterializationDataset());
243243
}
244+
if (config.getOrderBy() != null) {
245+
configuration.set(BigQueryConstants.CONFIG_ORDER_BY, config.getOrderBy());
246+
}
247+
if (config.getLimit() != null) {
248+
configuration.set(BigQueryConstants.CONFIG_LIMIT, String.valueOf(config.getLimit()));
249+
}
244250
}
245251

246252
public Schema getSchema(FailureCollector collector) {

src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySourceConfig.java

Lines changed: 60 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ public final class BigQuerySourceConfig extends BigQueryBaseConfig {
5858
private static final String VALID_DATE_FORMAT = "yyyy-MM-dd";
5959
private static final String SCHEME = "gs://";
6060
private static final String WHERE = "WHERE";
61+
private static final String ORDER_BY = "ORDER BY";
6162
public static final Set<Schema.Type> SUPPORTED_TYPES =
6263
ImmutableSet.of(Schema.Type.LONG, Schema.Type.STRING, Schema.Type.DOUBLE, Schema.Type.BOOLEAN, Schema.Type.BYTES,
6364
Schema.Type.ARRAY, Schema.Type.RECORD);
@@ -70,6 +71,8 @@ public final class BigQuerySourceConfig extends BigQueryBaseConfig {
7071
public static final String NAME_ENABLE_QUERYING_VIEWS = "enableQueryingViews";
7172
public static final String NAME_VIEW_MATERIALIZATION_PROJECT = "viewMaterializationProject";
7273
public static final String NAME_VIEW_MATERIALIZATION_DATASET = "viewMaterializationDataset";
74+
public static final String NAME_LIMIT = "limit";
75+
public static final String NAME_ORDER_BY = "orderBy";
7376

7477
@Name(Constants.Reference.REFERENCE_NAME)
7578
@Nullable
@@ -110,6 +113,19 @@ public final class BigQuerySourceConfig extends BigQueryBaseConfig {
110113
"and discards all rows that do not return TRUE (that is, rows that return FALSE or NULL).")
111114
private String filter;
112115

116+
@Name(NAME_ORDER_BY)
117+
@Macro
118+
@Nullable
119+
@Description("The ORDER BY clause sorts the results of a query based on one or more columns. " +
120+
"For example, 'name asc, age desc'.")
121+
private String orderBy;
122+
123+
@Name(NAME_LIMIT)
124+
@Macro
125+
@Nullable
126+
@Description("The LIMIT clause restricts the number of rows returned by the query.")
127+
private Long limit;
128+
113129
@Name(NAME_ENABLE_QUERYING_VIEWS)
114130
@Macro
115131
@Nullable
@@ -176,6 +192,13 @@ public void validate(FailureCollector collector, Map<String, String> arguments)
176192
if (!containsMacro(NAME_CMEK_KEY)) {
177193
validateCmekKey(collector, arguments);
178194
}
195+
196+
if (!containsMacro(NAME_LIMIT) && limit != null) {
197+
if (limit < 0) {
198+
collector.addFailure("Invalid limit value.", "Limit must be a non-negative number.")
199+
.withConfigProperty(NAME_LIMIT);
200+
}
201+
}
179202
}
180203

181204
void validateCmekKey(FailureCollector collector, Map<String, String> arguments) {
@@ -271,17 +294,44 @@ public String getPartitionTo() {
271294

272295
@Nullable
273296
public String getFilter() {
274-
if (filter != null) {
275-
filter = filter.trim();
276-
if (filter.isEmpty()) {
277-
return null;
278-
}
279-
// remove the WHERE keyword from the filter if the user adds it at the begging of the expression
280-
if (filter.toUpperCase().startsWith(WHERE)) {
281-
filter = filter.substring(WHERE.length());
282-
}
297+
return cleanupSqlFragment(filter, WHERE);
298+
}
299+
300+
@Nullable
301+
public String getOrderBy() {
302+
return cleanupSqlFragment(orderBy, ORDER_BY);
303+
}
304+
305+
@Nullable
306+
public Long getLimit() {
307+
return limit;
308+
}
309+
310+
/**
311+
* Cleans up a SQL fragment by trimming whitespace and stripping a given keyword from the
312+
* beginning of the string in a case-insensitive way.
313+
*
314+
* @param fragment The input SQL string fragment.
315+
* @param keyword The SQL keyword to remove (e.g., "WHERE ", "ORDER BY ").
316+
* @return The cleaned fragment, or null if the input was null or empty.
317+
*/
318+
@Nullable
319+
private String cleanupSqlFragment(@Nullable String fragment, String keyword) {
320+
if (Strings.isNullOrEmpty(fragment)) {
321+
return null;
322+
}
323+
324+
fragment = fragment.trim();
325+
326+
if (fragment.isEmpty()) {
327+
return null;
283328
}
284-
return filter;
329+
330+
if (fragment.toUpperCase().startsWith(keyword)) {
331+
fragment = fragment.substring(keyword.length()).trim();
332+
}
333+
334+
return fragment.isEmpty() ? null : fragment;
285335
}
286336

287337
public boolean isEnableQueryingViews() {

src/main/java/io/cdap/plugin/gcp/bigquery/source/PartitionedBigQueryInputFormat.java

Lines changed: 68 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.google.api.services.bigquery.model.JobReference;
2323
import com.google.api.services.bigquery.model.Table;
2424
import com.google.api.services.bigquery.model.TableReference;
25+
import com.google.cloud.bigquery.RangePartitioning;
2526
import com.google.cloud.bigquery.StandardTableDefinition;
2627
import com.google.cloud.bigquery.TableDefinition.Type;
2728
import com.google.cloud.bigquery.TimePartitioning;
@@ -49,6 +50,8 @@
4950
import org.apache.hadoop.mapreduce.RecordReader;
5051
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
5152
import org.apache.hadoop.util.Progressable;
53+
import org.slf4j.Logger;
54+
import org.slf4j.LoggerFactory;
5255

5356
import java.io.IOException;
5457
import java.security.GeneralSecurityException;
@@ -65,6 +68,7 @@
6568
*/
6669
public class PartitionedBigQueryInputFormat extends AbstractBigQueryInputFormat<LongWritable, GenericData.Record> {
6770

71+
private static final Logger LOG = LoggerFactory.getLogger(PartitionedBigQueryInputFormat.class);
6872
private InputFormat<LongWritable, GenericData.Record> delegateInputFormat =
6973
new AvroBigQueryInputFormatWithScopes();
7074

@@ -128,17 +132,24 @@ private void processQuery(JobContext context) throws IOException, InterruptedExc
128132
String partitionFromDate = configuration.get(BigQueryConstants.CONFIG_PARTITION_FROM_DATE, null);
129133
String partitionToDate = configuration.get(BigQueryConstants.CONFIG_PARTITION_TO_DATE, null);
130134
String filter = configuration.get(BigQueryConstants.CONFIG_FILTER, null);
135+
String limit = configuration.get(BigQueryConstants.CONFIG_LIMIT, null);
136+
String orderBy = configuration.get(BigQueryConstants.CONFIG_ORDER_BY, null);
131137

132138
com.google.cloud.bigquery.Table bigQueryTable = BigQueryUtil.getBigQueryTable(
133-
datasetProjectId, datasetId, tableName, serviceAccount, isServiceAccountFilePath);
139+
datasetProjectId, datasetId, tableName, serviceAccount, isServiceAccountFilePath, null);
134140
Type type = Objects.requireNonNull(bigQueryTable).getDefinition().getType();
141+
Boolean isPartitionFilterRequired = bigQueryTable.getRequirePartitionFilter();
142+
StandardTableDefinition tableDefinition = Objects.requireNonNull(bigQueryTable).getDefinition();
135143

136144
String query;
137145
if (type == Type.VIEW || type == Type.MATERIALIZED_VIEW || type == Type.EXTERNAL) {
138-
query = generateQueryForMaterializingView(datasetProjectId, datasetId, tableName, filter);
146+
query = generateQueryForMaterializingView(datasetProjectId, datasetId, tableName, filter,
147+
limit, orderBy);
139148
} else {
140-
query = generateQuery(partitionFromDate, partitionToDate, filter, projectId, datasetProjectId, datasetId,
141-
tableName, serviceAccount, isServiceAccountFilePath);
149+
query = generateQuery(partitionFromDate, partitionToDate, filter, datasetProjectId,
150+
datasetId,
151+
tableName, limit, orderBy,
152+
isPartitionFilterRequired, tableDefinition);
142153
}
143154

144155
if (query != null) {
@@ -160,30 +171,35 @@ private void processQuery(JobContext context) throws IOException, InterruptedExc
160171
}
161172

162173
@VisibleForTesting
163-
String generateQuery(String partitionFromDate, String partitionToDate, String filter, String project,
164-
String datasetProject, String dataset, String table, @Nullable String serviceAccount,
165-
@Nullable Boolean isServiceAccountFilePath) {
166-
if (partitionFromDate == null && partitionToDate == null && filter == null) {
167-
return null;
168-
}
169-
String queryTemplate = "select * from `%s` where %s";
170-
com.google.cloud.bigquery.Table sourceTable = BigQueryUtil.getBigQueryTable(datasetProject, dataset, table,
171-
serviceAccount,
172-
isServiceAccountFilePath);
173-
StandardTableDefinition tableDefinition = Objects.requireNonNull(sourceTable).getDefinition();
174+
String generateQuery(String partitionFromDate, String partitionToDate, String filter,
175+
String datasetProject, String dataset, String table, String limit, String orderBy,
176+
Boolean isPartitionFilterRequired, StandardTableDefinition tableDefinition) {
177+
178+
RangePartitioning rangePartitioning = tableDefinition.getRangePartitioning();
174179
TimePartitioning timePartitioning = tableDefinition.getTimePartitioning();
175-
if (timePartitioning == null && filter == null) {
176-
return null;
177-
}
178180
StringBuilder condition = new StringBuilder();
181+
String partitionCondition = null;
179182

180183
if (timePartitioning != null) {
181-
String timePartitionCondition = BigQueryUtil.generateTimePartitionCondition(tableDefinition, partitionFromDate,
182-
partitionToDate);
183-
condition.append(timePartitionCondition);
184+
if (partitionFromDate == null && partitionToDate == null
185+
&& Objects.equals(isPartitionFilterRequired, Boolean.TRUE)) {
186+
partitionCondition = BigQueryUtil.generateDefaultTimePartitionCondition(tableDefinition);
187+
} else if (partitionFromDate != null || partitionToDate != null) {
188+
partitionCondition =
189+
BigQueryUtil.generateTimePartitionCondition(tableDefinition, partitionFromDate,
190+
partitionToDate);
191+
}
192+
} else if (rangePartitioning != null && Objects.equals(isPartitionFilterRequired,
193+
Boolean.TRUE)) {
194+
partitionCondition = BigQueryUtil.generateDefaultRangePartitionCondition(
195+
tableDefinition);
196+
}
197+
198+
if (!Strings.isNullOrEmpty(partitionCondition)) {
199+
condition.append("(").append(partitionCondition).append(")");
184200
}
185201

186-
if (filter != null) {
202+
if (!Strings.isNullOrEmpty(filter)) {
187203
if (condition.length() == 0) {
188204
condition.append(filter);
189205
} else {
@@ -192,20 +208,42 @@ String generateQuery(String partitionFromDate, String partitionToDate, String fi
192208
}
193209

194210
String tableName = datasetProject + "." + dataset + "." + table;
195-
return String.format(queryTemplate, tableName, condition.toString());
211+
StringBuilder query = new StringBuilder("select * from ").append(tableName);
212+
213+
if (condition.length() > 0) {
214+
query.append(" where ").append(condition);
215+
}
216+
217+
if (!Strings.isNullOrEmpty(orderBy)) {
218+
query.append(" order by ").append(orderBy);
219+
}
220+
221+
if (!Strings.isNullOrEmpty(limit)) {
222+
query.append(" limit ").append(limit);
223+
}
224+
225+
LOG.debug("Generated BigQuery query for job: {}", query);
226+
return query.toString();
196227
}
197228

198229
@VisibleForTesting
199-
String generateQueryForMaterializingView(String datasetProject, String dataset, String table, String filter) {
200-
String queryTemplate = "select * from `%s`%s";
201-
StringBuilder condition = new StringBuilder();
202-
230+
String generateQueryForMaterializingView(String datasetProject, String dataset, String table,
231+
String filter, String limit, String orderBy) {
232+
String tableName = String.format("`%s.%s.%s`", datasetProject, dataset, table);
233+
StringBuilder query = new StringBuilder("select * from ").append(tableName);
203234
if (!Strings.isNullOrEmpty(filter)) {
204-
condition.append(String.format(" where %s", filter));
235+
query.append(" where ").append(filter);
205236
}
206237

207-
String tableName = datasetProject + "." + dataset + "." + table;
208-
return String.format(queryTemplate, tableName, condition.toString());
238+
if (!Strings.isNullOrEmpty(orderBy)) {
239+
query.append(" order by ").append(orderBy);
240+
}
241+
242+
if (!Strings.isNullOrEmpty(limit)) {
243+
query.append(" limit ").append(limit);
244+
}
245+
246+
return query.toString();
209247
}
210248

211249
/**

src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryConstants.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ public interface BigQueryConstants {
3636
String CONFIG_TABLE_FIELDS = "cdap.bq.sink.table.fields";
3737
String CONFIG_JSON_STRING_FIELDS = "cdap.bq.sink.json.string.fields";
3838
String CONFIG_FILTER = "cdap.bq.source.filter";
39+
String CONFIG_LIMIT = "cdap.bq.source.limit";
40+
String CONFIG_ORDER_BY = "cdap.bq.source.order.by";
3941
String CONFIG_PARTITION_FILTER = "cdap.bq.sink.partition.filter";
4042
String CONFIG_JOB_ID = "cdap.bq.sink.job.id";
4143
String CONFIG_VIEW_MATERIALIZATION_PROJECT = "cdap.bq.source.view.materialization.project";

src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtil.java

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.google.cloud.bigquery.Field;
2525
import com.google.cloud.bigquery.FieldList;
2626
import com.google.cloud.bigquery.LegacySQLTypeName;
27+
import com.google.cloud.bigquery.RangePartitioning;
2728
import com.google.cloud.bigquery.StandardSQLTypeName;
2829
import com.google.cloud.bigquery.StandardTableDefinition;
2930
import com.google.cloud.bigquery.Table;
@@ -83,7 +84,7 @@ public final class BigQueryUtil {
8384

8485
private static final Logger LOG = LoggerFactory.getLogger(BigQueryUtil.class);
8586

86-
private static final String DEFAULT_PARTITION_COLUMN_NAME = "_PARTITIONTIME";
87+
public static final String DEFAULT_PARTITION_COLUMN_NAME = "_PARTITIONTIME";
8788
private static final String BIGQUERY_BUCKET_PREFIX_PROPERTY_NAME = "gcp.bigquery.bucket.prefix";
8889

8990
public static final String BUCKET_PATTERN = "[a-z0-9._-]+";
@@ -824,6 +825,43 @@ public static String generateTimePartitionCondition(StandardTableDefinition tabl
824825
return timePartitionCondition.toString();
825826
}
826827

828+
/**
829+
* Generates a default "IS NOT NULL OR IS NULL" partition condition for a time-partitioned table.
830+
*
831+
* @param tableDefinition The definition of the table.
832+
* @return The SQL condition string or an empty string if no condition is needed.
833+
*/
834+
public static String generateDefaultTimePartitionCondition(
835+
StandardTableDefinition tableDefinition) {
836+
TimePartitioning timePartitioning = tableDefinition.getTimePartitioning();
837+
if (timePartitioning == null) {
838+
return StringUtils.EMPTY;
839+
}
840+
841+
String columnName = timePartitioning.getField() != null ?
842+
timePartitioning.getField() : DEFAULT_PARTITION_COLUMN_NAME;
843+
844+
return String.format("`%s` IS NOT NULL OR `%s` IS NULL", columnName, columnName);
845+
}
846+
847+
/**
848+
* Generates a default "IS NOT NULL OR IS NULL" partition condition for a range-partitioned
849+
* table.
850+
*
851+
* @param tableDefinition The definition of the table.
852+
* @return The SQL condition string or an empty string if no condition is needed.
853+
*/
854+
public static String generateDefaultRangePartitionCondition(
855+
StandardTableDefinition tableDefinition) {
856+
RangePartitioning rangePartitioning = tableDefinition.getRangePartitioning();
857+
if (rangePartitioning == null || Strings.isNullOrEmpty(rangePartitioning.getField())) {
858+
return StringUtils.EMPTY;
859+
}
860+
861+
String columnName = rangePartitioning.getField();
862+
return String.format("`%s` IS NOT NULL OR `%s` IS NULL", columnName, columnName);
863+
}
864+
827865
/**
828866
* Get fully-qualified name (FQN) for a BQ table (FQN format:
829867
* bigquery:{projectId}.{datasetId}.{tableId}).

0 commit comments

Comments
 (0)