Commit 6a26fbd

Merge pull request #1137 from data-integrations/bq-direct-source_final
[PLUGIN-871] Adding support for BQ source pushdown in BQ Plugin
2 parents 6b0f7d4 + 3123d96

File tree

6 files changed (+613, -34 lines)


src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySource.java

Lines changed: 33 additions & 2 deletions
@@ -29,6 +29,8 @@
 import com.google.cloud.kms.v1.CryptoKeyName;
 import com.google.cloud.storage.Storage;
 import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableMap;
+import com.google.gson.Gson;
 import io.cdap.cdap.api.annotation.Description;
 import io.cdap.cdap.api.annotation.Metadata;
 import io.cdap.cdap.api.annotation.MetadataProperty;
@@ -46,9 +48,13 @@
 import io.cdap.cdap.etl.api.batch.BatchSource;
 import io.cdap.cdap.etl.api.batch.BatchSourceContext;
 import io.cdap.cdap.etl.api.connector.Connector;
+import io.cdap.cdap.etl.api.engine.sql.SQLEngineInput;
 import io.cdap.cdap.etl.api.validation.ValidationFailure;
 import io.cdap.plugin.common.LineageRecorder;
 import io.cdap.plugin.gcp.bigquery.connector.BigQueryConnector;
+import io.cdap.plugin.gcp.bigquery.sqlengine.BigQueryReadDataset;
+import io.cdap.plugin.gcp.bigquery.sqlengine.BigQuerySQLEngine;
+import io.cdap.plugin.gcp.bigquery.sqlengine.BigQueryWrite;
 import io.cdap.plugin.gcp.bigquery.util.BigQueryConstants;
 import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil;
 import io.cdap.plugin.gcp.common.CmekUtils;
@@ -61,6 +67,7 @@
 
 import java.time.DateTimeException;
 import java.time.LocalDate;
+import java.util.List;
 import java.util.UUID;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
@@ -76,6 +83,7 @@
 @Metadata(properties = {@MetadataProperty(key = Connector.PLUGIN_TYPE, value = BigQueryConnector.NAME)})
 public final class BigQuerySource extends BatchSource<LongWritable, GenericData.Record, StructuredRecord> {
   private static final Logger LOG = LoggerFactory.getLogger(BigQuerySource.class);
+  private static final Gson GSON = new Gson();
   public static final String NAME = "BigQueryTable";
   private BigQuerySourceConfig config;
   private Schema outputSchema;
@@ -165,7 +173,7 @@ public void prepareRun(BatchSourceContext context) throws Exception {
     // We call emitLineage before since it creates the dataset with schema.
     Type sourceTableType = config.getSourceTableType();
     emitLineage(context, configuredSchema, sourceTableType, config.getTable());
-    setInputFormat(context);
+    setInputFormat(context, configuredSchema);
   }
 
   @Override
@@ -335,8 +343,31 @@ private void validatePartitionProperties(FailureCollector collector) {
     }
   }
 
-  private void setInputFormat(BatchSourceContext context) {
+  private void setInputFormat(BatchSourceContext context,
+                              Schema configuredSchema) {
+    // Set input for Spark
     context.setInput(Input.of(config.referenceName, new BigQueryInputFormatProvider(configuration)));
+
+    // Add input for SQL Engine direct read
+    ImmutableMap.Builder<String, String> arguments = new ImmutableMap.Builder<>();
+
+    if (configuredSchema == null) {
+      LOG.debug("BigQuery SQL Engine Input was not initialized. Schema was empty.");
+      return;
+    }
+
+    List<String> fieldNames = configuredSchema.getFields().stream().map(f -> f.getName()).collect(Collectors.toList());
+
+    arguments
+      .put(BigQueryReadDataset.SQL_INPUT_CONFIG, GSON.toJson(config))
+      .put(BigQueryReadDataset.SQL_INPUT_SCHEMA, GSON.toJson(configuredSchema))
+      .put(BigQueryReadDataset.SQL_INPUT_FIELDS, GSON.toJson(fieldNames));
+
+    Input sqlEngineInput = new SQLEngineInput(config.referenceName,
+                                              context.getStageName(),
+                                              BigQuerySQLEngine.class.getName(),
+                                              arguments.build());
+    context.setInput(sqlEngineInput);
   }
 
   private void emitLineage(BatchSourceContext context, Schema schema, Type sourceTableType,
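In this change, "pushdown" means the source stage now registers two alternative inputs: the regular Hadoop/Spark input format, plus a SQLEngineInput that lets the BigQuery SQL engine read the table directly when pushdown is enabled. The three SQL_INPUT_* arguments carry a Gson-serialized copy of the source config, the configured schema, and the field names across that boundary. As a minimal sketch of what the consuming side has to do (the real logic lives in BigQueryReadDataset, which this diff does not show; the class, method, and key value below are hypothetical):

  import com.google.gson.Gson;
  import com.google.gson.reflect.TypeToken;
  import java.lang.reflect.Type;
  import java.util.List;
  import java.util.Map;

  // Hypothetical consumer-side sketch; BigQueryReadDataset's actual constant
  // values and deserialization logic may differ.
  public class SqlEngineInputUnpacker {
    private static final Gson GSON = new Gson();

    // Assumed key value; the real constant is BigQueryReadDataset.SQL_INPUT_FIELDS.
    static final String SQL_INPUT_FIELDS = "sql.input.fields";

    // Inverse of the GSON.toJson(fieldNames) call in setInputFormat() above.
    static List<String> extractFieldNames(Map<String, String> arguments) {
      Type listType = new TypeToken<List<String>>() { }.getType();
      return GSON.fromJson(arguments.get(SQL_INPUT_FIELDS), listType);
    }
  }

Passing everything as a string map keeps SQLEngineInput serializable, which is presumably why the config and schema are shipped as JSON rather than as live objects.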

src/main/java/io/cdap/plugin/gcp/bigquery/source/PartitionedBigQueryInputFormat.java

Lines changed: 2 additions & 32 deletions
@@ -48,7 +48,6 @@
  * in order to create input splits.
  */
 public class PartitionedBigQueryInputFormat extends AbstractBigQueryInputFormat<LongWritable, GenericData.Record> {
-  private static final String DEFAULT_COLUMN_NAME = "_PARTITIONTIME";
 
   private InputFormat<LongWritable, GenericData.Record> delegateInputFormat =
     new AvroBigQueryInputFormat();
@@ -160,8 +159,8 @@ String generateQuery(String partitionFromDate, String partitionToDate, String fi
     StringBuilder condition = new StringBuilder();
 
     if (timePartitioning != null) {
-      String timePartitionCondition = generateTimePartitionCondition(tableDefinition, timePartitioning,
-                                                                     partitionFromDate, partitionToDate);
+      String timePartitionCondition = BigQueryUtil.generateTimePartitionCondition(tableDefinition, partitionFromDate,
+                                                                                  partitionToDate);
       condition.append(timePartitionCondition);
     }
 
@@ -289,33 +288,4 @@ private static JobReference getJobReference(Configuration conf, BigQueryHelper b
     }
     return new JobReference().setProjectId(projectId).setJobId(savedJobId).setLocation(location);
   }
-
-  private String generateTimePartitionCondition(StandardTableDefinition tableDefinition,
-                                                TimePartitioning timePartitioning, String partitionFromDate,
-                                                String partitionToDate) {
-    StringBuilder timePartitionCondition = new StringBuilder();
-    String columnName = timePartitioning.getField() != null ? timePartitioning.getField() : DEFAULT_COLUMN_NAME;
-
-    LegacySQLTypeName columnType = null;
-    if (!DEFAULT_COLUMN_NAME.equals(columnName)) {
-      columnType = tableDefinition.getSchema().getFields().get(columnName).getType();
-    }
-
-    String columnNameTS = columnName;
-    if (!LegacySQLTypeName.TIMESTAMP.equals(columnType)) {
-      columnNameTS = "TIMESTAMP(`" + columnNameTS + "`)";
-    }
-    if (partitionFromDate != null) {
-      timePartitionCondition.append(columnNameTS).append(" >= ").append("TIMESTAMP(\"")
-        .append(partitionFromDate).append("\")");
-    }
-    if (partitionFromDate != null && partitionToDate != null) {
-      timePartitionCondition.append(" and ");
-    }
-    if (partitionToDate != null) {
-      timePartitionCondition.append(columnNameTS).append(" < ").append("TIMESTAMP(\"")
-        .append(partitionToDate).append("\")");
-    }
-    return timePartitionCondition.toString();
-  }
 }
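The deleted helper is not gone; the second hunk shows callers switching to BigQueryUtil.generateTimePartitionCondition, which drops the explicit TimePartitioning parameter. The diff does not include BigQueryUtil, so the following is a plausible sketch of the relocated method, reconstructed from the code removed above, under the assumption that the utility now reads the partitioning spec off the StandardTableDefinition itself:

  // Sketch only: reconstructed from the deleted private method; the real code in
  // BigQueryUtil may differ. Uses com.google.cloud.bigquery.StandardTableDefinition,
  // TimePartitioning, and LegacySQLTypeName. Callers invoke this only for
  // time-partitioned tables, so getTimePartitioning() is assumed non-null here.
  public static String generateTimePartitionCondition(StandardTableDefinition tableDefinition,
                                                      String partitionFromDate, String partitionToDate) {
    String defaultColumn = "_PARTITIONTIME";
    TimePartitioning timePartitioning = tableDefinition.getTimePartitioning();
    String columnName = timePartitioning.getField() != null ? timePartitioning.getField() : defaultColumn;

    // _PARTITIONTIME is a pseudo-column on ingestion-time partitioned tables and is
    // absent from the schema, so only look up the type of real partition columns.
    LegacySQLTypeName columnType = null;
    if (!defaultColumn.equals(columnName)) {
      columnType = tableDefinition.getSchema().getFields().get(columnName).getType();
    }

    // Wrap non-TIMESTAMP partition columns (e.g. DATE) so both bounds compare as timestamps.
    String columnNameTS = columnName;
    if (!LegacySQLTypeName.TIMESTAMP.equals(columnType)) {
      columnNameTS = "TIMESTAMP(`" + columnNameTS + "`)";
    }

    StringBuilder condition = new StringBuilder();
    if (partitionFromDate != null) {
      condition.append(columnNameTS).append(" >= TIMESTAMP(\"").append(partitionFromDate).append("\")");
    }
    if (partitionFromDate != null && partitionToDate != null) {
      condition.append(" and ");
    }
    if (partitionToDate != null) {
      condition.append(columnNameTS).append(" < TIMESTAMP(\"").append(partitionToDate).append("\")");
    }
    return condition.toString();
  }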
