Skip to content

Commit 13439aa

Browse files
make sure project is where job is run and datasetProject is where dataset is (#719) (#721)
1 parent 5cb5a73 commit 13439aa

File tree

6 files changed

+31
-45
lines changed

6 files changed

+31
-45
lines changed

src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySink.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@
4040

4141
import java.io.IOException;
4242
import java.util.List;
43-
import java.util.Map;
4443
import java.util.Objects;
4544
import java.util.UUID;
4645
import java.util.stream.Collectors;
@@ -218,7 +217,7 @@ private void configureBigQuerySink() {
218217
*/
219218
private void configureTable(Schema schema) {
220219
AbstractBigQuerySinkConfig config = getConfig();
221-
Table table = BigQueryUtil.getBigQueryTable(config.getDatasetProject(), config.getDataset(),
220+
Table table = BigQueryUtil.getBigQueryTable(config.getProject(), config.getDatasetProject(), config.getDataset(),
222221
config.getTable(),
223222
config.getServiceAccount(),
224223
config.isServiceAccountFilePath());

src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySource.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ public void prepareRun(BatchSourceContext context) throws Exception {
123123
// Create BigQuery client
124124
String serviceAccount = config.getServiceAccount();
125125
Credentials credentials = BigQuerySourceUtils.getCredentials(config.getConnection());
126-
BigQuery bigQuery = GCPUtils.getBigQuery(config.getDatasetProject(), credentials);
126+
BigQuery bigQuery = GCPUtils.getBigQuery(config.getProject(), credentials);
127127

128128
// Get Configuration for this run
129129
bucketPath = UUID.randomUUID().toString();

src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySourceConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,7 @@ private void validateTable(FailureCollector collector) {
242242
*/
243243
public Type getSourceTableType() {
244244
Table sourceTable =
245-
BigQueryUtil.getBigQueryTable(
245+
BigQueryUtil.getBigQueryTable(getProject(),
246246
getDatasetProject(), getDataset(), table, getServiceAccount(), isServiceAccountFilePath());
247247
return sourceTable != null ? sourceTable.getDefinition().getType() : null;
248248
}

src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySourceUtils.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import com.google.auth.Credentials;
2020
import com.google.cloud.bigquery.BigQuery;
2121
import com.google.cloud.bigquery.Dataset;
22+
import com.google.cloud.bigquery.DatasetId;
2223
import com.google.cloud.bigquery.TableId;
2324
import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration;
2425
import io.cdap.plugin.gcp.bigquery.connector.BigQueryConnectorConfig;
@@ -78,7 +79,7 @@ public static String getOrCreateBucket(Configuration configuration,
7879
configuration.setBoolean("fs.gs.bucket.delete.enable", true);
7980

8081
// the dataset existence is validated before, so this cannot be null
81-
Dataset dataset = bigQuery.getDataset(config.getDataset());
82+
Dataset dataset = bigQuery.getDataset(DatasetId.of(config.getDatasetProject(), config.getDataset()));
8283
GCPUtils.createBucket(GCPUtils.getStorage(config.getProject(), credentials),
8384
bucket,
8485
dataset.getLocation(),
@@ -158,8 +159,8 @@ public static void deleteBigQueryTemporaryTable(Configuration configuration, Big
158159
String temporaryTable = configuration.get(BigQueryConstants.CONFIG_TEMPORARY_TABLE_NAME);
159160
try {
160161
Credentials credentials = getCredentials(config.getConnection());
161-
BigQuery bigQuery = GCPUtils.getBigQuery(config.getDatasetProject(), credentials);
162-
bigQuery.delete(TableId.of(config.getProject(), config.getDataset(), temporaryTable));
162+
BigQuery bigQuery = GCPUtils.getBigQuery(config.getProject(), credentials);
163+
bigQuery.delete(TableId.of(config.getDatasetProject(), config.getDataset(), temporaryTable));
163164
LOG.debug("Deleted temporary table '{}'", temporaryTable);
164165
} catch (IOException e) {
165166
LOG.error("Failed to load service account credentials: {}", e.getMessage(), e);

src/main/java/io/cdap/plugin/gcp/bigquery/source/PartitionedBigQueryInputFormat.java

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,8 @@ private void processQuery(JobContext context) throws IOException, InterruptedExc
8282
}
8383
Map<String, String> mandatoryConfig = ConfigurationUtil.getMandatoryConfig(
8484
configuration, BigQueryConfiguration.MANDATORY_CONFIG_PROPERTIES_INPUT);
85-
String inputProjectId = mandatoryConfig.get(BigQueryConfiguration.INPUT_PROJECT_ID_KEY);
85+
String projectId = mandatoryConfig.get(BigQueryConfiguration.PROJECT_ID_KEY);
86+
String datasetProjectId = mandatoryConfig.get(BigQueryConfiguration.INPUT_PROJECT_ID_KEY);
8687
String datasetId = mandatoryConfig.get(BigQueryConfiguration.INPUT_DATASET_ID_KEY);
8788
String tableName = mandatoryConfig.get(BigQueryConfiguration.INPUT_TABLE_ID_KEY);
8889
String serviceAccount = configuration.get(BigQueryConstants.CONFIG_SERVICE_ACCOUNT, null);
@@ -93,26 +94,26 @@ private void processQuery(JobContext context) throws IOException, InterruptedExc
9394
String partitionToDate = configuration.get(BigQueryConstants.CONFIG_PARTITION_TO_DATE, null);
9495
String filter = configuration.get(BigQueryConstants.CONFIG_FILTER, null);
9596

96-
com.google.cloud.bigquery.Table bigQueryTable =
97-
BigQueryUtil.getBigQueryTable(inputProjectId, datasetId, tableName, serviceAccount, isServiceAccountFilePath);
97+
com.google.cloud.bigquery.Table bigQueryTable = BigQueryUtil.getBigQueryTable(
98+
projectId, datasetProjectId, datasetId, tableName, serviceAccount, isServiceAccountFilePath);
9899
Type type = Objects.requireNonNull(bigQueryTable).getDefinition().getType();
99100

100101
String query;
101102
if (type == Type.VIEW || type == Type.MATERIALIZED_VIEW) {
102-
query = generateQueryForMaterializingView(datasetId, tableName, filter);
103+
query = generateQueryForMaterializingView(datasetProjectId, datasetId, tableName, filter);
103104
} else {
104-
query = generateQuery(partitionFromDate, partitionToDate, filter, inputProjectId, datasetId, tableName,
105-
serviceAccount, isServiceAccountFilePath);
105+
query = generateQuery(partitionFromDate, partitionToDate, filter, projectId, datasetProjectId, datasetId,
106+
tableName, serviceAccount, isServiceAccountFilePath);
106107
}
107108

108109
if (query != null) {
109-
TableReference sourceTable = new TableReference().setDatasetId(datasetId).setProjectId(inputProjectId)
110+
TableReference sourceTable = new TableReference().setDatasetId(datasetId).setProjectId(datasetProjectId)
110111
.setTableId(tableName);
111112
String location = bigQueryHelper.getTable(sourceTable).getLocation();
112113
String temporaryTableName = configuration.get(BigQueryConstants.CONFIG_TEMPORARY_TABLE_NAME);
113-
TableReference exportTableReference = createExportTableReference(type, inputProjectId, datasetId,
114+
TableReference exportTableReference = createExportTableReference(type, datasetProjectId, datasetId,
114115
temporaryTableName, configuration);
115-
runQuery(configuration, bigQueryHelper, inputProjectId, exportTableReference, query, location);
116+
runQuery(configuration, bigQueryHelper, projectId, exportTableReference, query, location);
116117
if (type == Type.VIEW || type == Type.MATERIALIZED_VIEW) {
117118
configuration.set(BigQueryConfiguration.INPUT_PROJECT_ID_KEY,
118119
configuration.get(BigQueryConstants.CONFIG_VIEW_MATERIALIZATION_PROJECT));
@@ -123,14 +124,14 @@ private void processQuery(JobContext context) throws IOException, InterruptedExc
123124
}
124125
}
125126

126-
private String generateQuery(String partitionFromDate, String partitionToDate, String filter,
127-
String project, String dataset, String table, @Nullable String serviceAccount,
127+
private String generateQuery(String partitionFromDate, String partitionToDate, String filter, String project,
128+
String datasetProject, String dataset, String table, @Nullable String serviceAccount,
128129
@Nullable Boolean isServiceAccountFilePath) {
129130
if (partitionFromDate == null && partitionToDate == null && filter == null) {
130131
return null;
131132
}
132133
String queryTemplate = "select * from %s where %s";
133-
com.google.cloud.bigquery.Table sourceTable = BigQueryUtil.getBigQueryTable(project, dataset, table,
134+
com.google.cloud.bigquery.Table sourceTable = BigQueryUtil.getBigQueryTable(project, datasetProject, dataset, table,
134135
serviceAccount,
135136
isServiceAccountFilePath);
136137
StandardTableDefinition tableDefinition = Objects.requireNonNull(sourceTable).getDefinition();
@@ -154,35 +155,35 @@ private String generateQuery(String partitionFromDate, String partitionToDate, S
154155
}
155156
}
156157

157-
String tableName = dataset + "." + table;
158+
String tableName = datasetProject + "." + dataset + "." + table;
158159
return String.format(queryTemplate, tableName, condition.toString());
159160
}
160161

161-
private String generateQueryForMaterializingView(String dataset, String table, String filter) {
162+
private String generateQueryForMaterializingView(String datasetProject, String dataset, String table, String filter) {
162163
String queryTemplate = "select * from %s %s";
163164
StringBuilder condition = new StringBuilder();
164165

165166
if (!Strings.isNullOrEmpty(filter)) {
166167
condition.append(String.format(" where %s", filter));
167168
}
168169

169-
String tableName = dataset + "." + table;
170+
String tableName = datasetProject + "." + dataset + "." + table;
170171
return String.format(queryTemplate, tableName, condition.toString());
171172
}
172173

173174
/**
174175
* Create {@link TableReference} to export Table or View
175176
*
176177
* @param type BigQuery table type
177-
* @param inputProjectId project id of source table
178+
* @param datasetProjectId project id of source table
178179
* @param datasetId dataset id of source table
179180
* @param tableId The ID of the table
180181
* @param configuration Configuration that contains View Materialization ProjectId and View
181182
* Materialization Dataset Id
182183
* @return {@link TableReference}
183184
*/
184185
private TableReference createExportTableReference(
185-
Type type, String inputProjectId,
186+
Type type, String datasetProjectId,
186187
String datasetId,
187188
String tableId,
188189
Configuration configuration) {
@@ -193,7 +194,7 @@ private TableReference createExportTableReference(
193194
tableReference.setProjectId(configuration.get(BigQueryConstants.CONFIG_VIEW_MATERIALIZATION_PROJECT));
194195
tableReference.setDatasetId(configuration.get(BigQueryConstants.CONFIG_VIEW_MATERIALIZATION_DATASET));
195196
} else {
196-
tableReference.setProjectId(inputProjectId);
197+
tableReference.setProjectId(datasetProjectId);
197198
tableReference.setDatasetId(datasetId);
198199
}
199200

src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtil.java

Lines changed: 5 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -476,33 +476,18 @@ public static ValidationFailure validateArraySchema(Schema arraySchema, String n
476476

477477
/**
478478
* Get BigQuery table.
479-
*
480-
* @param projectId BigQuery project ID
481-
* @param datasetId BigQuery dataset ID
482-
* @param tableName BigQuery table name
483-
* @param serviceAccountPath service account file path
484-
* @return BigQuery table
485-
*/
486-
@Nullable
487-
public static Table getBigQueryTable(String projectId, String datasetId, String tableName,
488-
@Nullable String serviceAccountPath) {
489-
return getBigQueryTable(projectId, datasetId, tableName, serviceAccountPath, true);
490-
}
491-
492-
/**
493-
* Get BigQuery table.
494-
*
495-
* @param projectId BigQuery project ID
479+
* @param project project where the BQ job will be run
480+
* @param datasetProject project where dataset is in
496481
* @param datasetId BigQuery dataset ID
497482
* @param tableName BigQuery table name
498483
* @param serviceAccount service account file path or JSON content
499484
* @param isServiceAccountFilePath indicator for whether service account is file or json
500485
* @return BigQuery table
501486
*/
502487
@Nullable
503-
public static Table getBigQueryTable(String projectId, String datasetId, String tableName,
488+
public static Table getBigQueryTable(String project, String datasetProject, String datasetId, String tableName,
504489
@Nullable String serviceAccount, boolean isServiceAccountFilePath) {
505-
TableId tableId = TableId.of(projectId, datasetId, tableName);
490+
TableId tableId = TableId.of(datasetProject, datasetId, tableName);
506491

507492
com.google.auth.Credentials credentials = null;
508493
if (serviceAccount != null) {
@@ -514,7 +499,7 @@ public static Table getBigQueryTable(String projectId, String datasetId, String
514499
"serviceFilePath");
515500
}
516501
}
517-
BigQuery bigQuery = GCPUtils.getBigQuery(projectId, credentials);
502+
BigQuery bigQuery = GCPUtils.getBigQuery(project, credentials);
518503

519504
Table table;
520505
try {

0 commit comments

Comments
 (0)