Skip to content

Commit 14cecf7

Browse files
committed
CDAP-14542 add a property for the dataset project
Added an optional property for the dataset project, for use when the dataset does not reside in the project in which the job will run. Also fixed warning messages.
1 parent 8754263 commit 14cecf7

File tree

6 files changed

+80
-27
lines changed

6 files changed

+80
-27
lines changed

docs/BigQueryTable-batchsink.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@ Properties
2424
**Reference Name:** Name used to uniquely identify this sink for lineage, annotating metadata, etc.
2525

2626
**Project ID**: Google Cloud Project ID, which uniquely identifies a project.
27-
It can be found on the Dashboard in the Google Cloud Platform Console.
27+
It can be found on the Dashboard in the Google Cloud Platform Console. This is the project
28+
that the BigQuery job will run in. If a temporary bucket needs to be created, the service account
29+
must have permission in this project to create buckets.
2830

2931
**Dataset**: Dataset the table belongs to. A dataset is contained within a specific project.
3032
Datasets are top-level containers that are used to organize and control access to tables and views.

docs/BigQueryTable-batchsource.md

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,13 @@ Properties
2424
**Reference Name:** Name used to uniquely identify this source for lineage, annotating metadata, etc.
2525

2626
**Project ID**: Google Cloud Project ID, which uniquely identifies a project.
27-
It can be found on the Dashboard in the Google Cloud Platform Console.
27+
It can be found on the Dashboard in the Google Cloud Platform Console. This is the project
28+
that the BigQuery job will run in. If a temporary bucket needs to be created, the service account
29+
must have permission in this project to create buckets.
30+
31+
**Dataset Project**: Project the dataset belongs to. This is only required if the dataset is not
32+
in the same project that the BigQuery job will run in. If no value is given,
33+
it will default to the configured Project ID.
2834

2935
**Dataset**: Dataset the table belongs to. A dataset is contained within a specific project.
3036
Datasets are top-level containers that are used to organize and control access to tables and views.

src/main/java/co/cask/gcp/bigquery/BigQuerySource.java

Lines changed: 24 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -91,8 +91,8 @@ public void prepareRun(BatchSourceContext context) throws Exception {
9191
uuid = UUID.randomUUID();
9292
configuration = BigQueryUtils.getBigQueryConfig(config.getServiceAccountFilePath(), config.getProject());
9393

94-
String bucket = config.bucket;
95-
if (config.bucket == null) {
94+
String bucket = config.getBucket();
95+
if (bucket == null) {
9696
bucket = uuid.toString();
9797
// By default, this option is false, meaning the job can not delete the bucket. So enable it only when bucket name
9898
// is not provided.
@@ -106,7 +106,8 @@ public void prepareRun(BatchSourceContext context) throws Exception {
106106
String temporaryGcsPath = String.format("gs://%s/hadoop/input/%s", bucket, uuid);
107107
AvroBigQueryInputFormat.setTemporaryCloudStorageDirectory(configuration, temporaryGcsPath);
108108
AvroBigQueryInputFormat.setEnableShardedExport(configuration, false);
109-
BigQueryConfiguration.configureBigQueryInput(configuration, config.getProject(), config.dataset, config.table);
109+
BigQueryConfiguration.configureBigQueryInput(configuration, config.getDatasetProject(),
110+
config.getDataset(), config.getTable());
110111

111112
Job job = Job.getInstance(configuration);
112113
job.setOutputKeyClass(LongWritable.class);
@@ -141,7 +142,7 @@ public void transform(KeyValue<LongWritable, GenericData.Record> input, Emitter<
141142
public void onRunFinish(boolean succeeded, BatchSourceContext context) {
142143
org.apache.hadoop.fs.Path gcsPath = new org.apache.hadoop.fs.Path(String.format("gs://%s", uuid.toString()));
143144
try {
144-
if (config.bucket == null) {
145+
if (config.getBucket() == null) {
145146
FileSystem fs = gcsPath.getFileSystem(configuration);
146147
if (fs.exists(gcsPath)) {
147148
fs.delete(gcsPath, true);
@@ -163,18 +164,20 @@ public void onRunFinish(boolean succeeded, BatchSourceContext context) {
163164
*/
164165
@Path("getSchema")
165166
public Schema getSchema(BigQuerySourceConfig request) throws Exception {
166-
Table table = BigQueryUtils.getBigQueryTable(request.getServiceAccountFilePath(), request.getProject(),
167-
request.dataset, request.table);
167+
String dataset = request.getDataset();
168+
String table = request.getTable();
169+
String project = request.getDatasetProject();
170+
Table bqTable = BigQueryUtils.getBigQueryTable(request.getServiceAccountFilePath(), project, dataset, table);
168171
if (bqTable == null) {
169172
// Table does not exist
170-
throw new IllegalArgumentException(String.format("BigQuery table '%s.%s' does not exist",
171-
request.dataset, request.table));
173+
throw new IllegalArgumentException(String.format("BigQuery table '%s:%s.%s' does not exist",
174+
project, dataset, table));
172175
}
173176

174-
com.google.cloud.bigquery.Schema bgSchema = table.getDefinition().getSchema();
177+
com.google.cloud.bigquery.Schema bgSchema = bqTable.getDefinition().getSchema();
175178
if (bgSchema == null) {
176-
throw new IllegalArgumentException(String.format("Cannot read from table '%s.%s' because it has no schema.",
177-
request.dataset, request.table));
179+
throw new IllegalArgumentException(String.format("Cannot read from table '%s:%s.%s' because it has no schema.",
180+
project, dataset, table));
178181
}
179182
List<Schema.Field> fields = getSchemaFields(bgSchema);
180183
return Schema.recordOf("output", fields);
@@ -185,32 +188,34 @@ public Schema getSchema(BigQuerySourceConfig request) throws Exception {
185188
* {@link #getSchema(BigQuerySourceConfig)} method.
186189
*/
187190
private void validateOutputSchema() throws IOException {
188-
Table table = BigQueryUtils.getBigQueryTable(config.getServiceAccountFilePath(), config.getProject(),
189-
config.dataset, config.table);
191+
String dataset = config.getDataset();
192+
String tableName = config.getTable();
193+
String project = config.getDatasetProject();
194+
Table table = BigQueryUtils.getBigQueryTable(config.getServiceAccountFilePath(), project, dataset, tableName);
190195
if (table == null) {
191196
// Table does not exist
192-
throw new IllegalArgumentException(String.format("BigQuery table '%s.%s' does not exist.", config.dataset,
193-
config.table));
197+
throw new IllegalArgumentException(String.format("BigQuery table '%s:%s.%s' does not exist.",
198+
project, dataset, tableName));
194199
}
195200

196201
com.google.cloud.bigquery.Schema bgSchema = table.getDefinition().getSchema();
197202
if (bgSchema == null) {
198-
throw new IllegalArgumentException(String.format("Cannot read from table '%s.%s' because it has no schema.",
199-
config.dataset, config.table));
203+
throw new IllegalArgumentException(String.format("Cannot read from table '%s:%s.%s' because it has no schema.",
204+
project, dataset, tableName));
200205
}
201206

202207
// Output schema should not have more fields than BigQuery table
203208
List<String> diff = BigQueryUtils.getSchemaMinusBqFields(config.getSchema().getFields(), bgSchema.getFields());
204209
if (!diff.isEmpty()) {
205210
throw new IllegalArgumentException(String.format("Output schema has field(s) '%s' which are not present in table"
206-
+ " '%s.%s' schema.", diff, config.dataset, config.table));
211+
+ " '%s:%s.%s' schema.", diff, project, dataset, tableName));
207212
}
208213

209214
FieldList fields = bgSchema.getFields();
210215
// Match output schema field type with bigquery column type
211216
for (Schema.Field field : config.getSchema().getFields()) {
212217
validateSimpleTypes(field);
213-
BigQueryUtils.validateFieldSchemaMatches(fields.get(field.getName()), field, config.dataset, config.table);
218+
BigQueryUtils.validateFieldSchemaMatches(fields.get(field.getName()), field, dataset, tableName);
214219
}
215220
}
216221

src/main/java/co/cask/gcp/bigquery/BigQuerySourceConfig.java

Lines changed: 36 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@
2020
import co.cask.cdap.api.annotation.Macro;
2121
import co.cask.cdap.api.annotation.Name;
2222
import co.cask.cdap.api.data.schema.Schema;
23+
import co.cask.gcp.common.GCPConfig;
2324
import co.cask.gcp.common.GCPReferenceSourceConfig;
25+
import com.google.cloud.ServiceOptions;
2426

2527
import java.io.IOException;
2628
import javax.annotation.Nullable;
@@ -32,25 +34,53 @@ public final class BigQuerySourceConfig extends GCPReferenceSourceConfig {
3234
@Macro
3335
@Description("The dataset the table belongs to. A dataset is contained within a specific project. "
3436
+ "Datasets are top-level containers that are used to organize and control access to tables and views.")
35-
public String dataset;
37+
private String dataset;
3638

3739
@Macro
3840
@Description("The table to read from. A table contains individual records organized in rows. "
3941
+ "Each record is composed of columns (also called fields). "
4042
+ "Every table is defined by a schema that describes the column names, data types, and other information.")
41-
public String table;
43+
private String table;
4244

4345
@Macro
4446
@Nullable
4547
@Description("The Google Cloud Storage bucket to store temporary data in. "
4648
+ "It will be automatically created if it does not exist, but will not be automatically deleted. "
47-
+ "Temporary data will be deleted after it has been read. " +
48-
"If it is not provided, a unique bucket will be created and then deleted after the run finishes.")
49-
public String bucket;
49+
+ "Temporary data will be deleted after it has been read. "
50+
+ "If it is not provided, a unique bucket will be created and then deleted after the run finishes. "
51+
+ "The service account must have permission to create buckets in the configured project.")
52+
private String bucket;
5053

5154
@Macro
5255
@Description("The schema of the table to read.")
53-
public String schema;
56+
private String schema;
57+
58+
@Macro
59+
@Nullable
60+
@Description("The project the dataset belongs to. This is only required if the dataset is not "
61+
+ "in the same project that the BigQuery job will run in. If no value is given, it will default to the configured "
62+
+ "project ID.")
63+
private String datasetProject;
64+
65+
public String getDataset() {
66+
return dataset;
67+
}
68+
69+
public String getTable() {
70+
return table;
71+
}
72+
73+
@Nullable
74+
public String getBucket() {
75+
return bucket;
76+
}
77+
78+
public String getDatasetProject() {
79+
if (GCPConfig.AUTO_DETECT.equalsIgnoreCase(datasetProject)) {
80+
return ServiceOptions.getDefaultProjectId();
81+
}
82+
return datasetProject == null ? getProject() : datasetProject;
83+
}
5484

5585
/**
5686
* @return the schema of the dataset

src/main/java/co/cask/gcp/bigquery/BigQueryUtils.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package co.cask.gcp.bigquery;
1818

1919
import co.cask.cdap.api.data.schema.Schema;
20+
import co.cask.gcp.gcs.GCSConfigHelper;
2021
import com.google.cloud.bigquery.BigQuery;
2122
import com.google.cloud.bigquery.BigQueryOptions;
2223
import com.google.cloud.bigquery.Field;
@@ -99,6 +100,7 @@ static Configuration getBigQueryConfig(@Nullable String serviceAccountFilePath,
99100
configuration.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem");
100101
configuration.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS");
101102
configuration.set("fs.gs.project.id", projectId);
103+
configuration.set("fs.gs.working.dir", GCSConfigHelper.ROOT_DIR);
102104
configuration.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId);
103105
return configuration;
104106
}

widgets/BigQueryTable-batchsource.json

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,14 @@
2323
"default": "auto-detect"
2424
}
2525
},
26+
{
27+
"widget-type": "textbox",
28+
"label": "Dataset Project ID",
29+
"name": "datasetProject",
30+
"widget-attributes" : {
31+
"placeholder": "Project the dataset belongs to, if different from the Project ID."
32+
}
33+
},
2634
{
2735
"widget-type": "textbox",
2836
"label": "Dataset",

0 commit comments

Comments
 (0)