Skip to content

Commit 3065504

Browse files
authored
fix parent project in BigQueryToParquet - this should be transparent change as BQIO is following same logic, it was strange that for schema pulling, pipeline is using dataset's project, while for reading it is following options. now it will be aligned. (#2530)
1 parent 8e01ed6 commit 3065504

File tree

2 files changed

+19
-7
lines changed

2 files changed

+19
-7
lines changed

v2/bigquery-to-parquet/src/main/java/com/google/cloud/teleport/v2/templates/BigQueryToParquet.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
4343
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
4444
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
45+
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
4546
import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
4647
import org.apache.beam.sdk.io.parquet.ParquetIO;
4748
import org.apache.beam.sdk.options.Default;
@@ -108,14 +109,22 @@ static class ReadSessionFactory {
108109
* Creates ReadSession for schema extraction.
109110
*
110111
* @param client BigQueryStorage client used to create ReadSession.
111-
* @param tableString String that represents table to export from.
112+
* @param options BigQueryToParquetOptions options.
112113
* @param tableReadOptions TableReadOptions that specify any fields in the table to filter on.
113114
* @return session ReadSession object that contains the schema for the export.
114115
*/
115116
static ReadSession create(
116-
BigQueryStorageClient client, String tableString, TableReadOptions tableReadOptions) {
117+
BigQueryStorageClient client,
118+
BigQueryToParquetOptions options,
119+
TableReadOptions tableReadOptions) {
120+
String tableString = options.getTableRef();
117121
TableReference tableReference = BigQueryHelpers.parseTableSpec(tableString);
118-
String parentProjectId = "projects/" + tableReference.getProjectId();
122+
BigQueryOptions bigQueryOptions = options.as(BigQueryOptions.class);
123+
String parentProjectId =
124+
bigQueryOptions.getBigQueryProject() == null
125+
? bigQueryOptions.getProject()
126+
: bigQueryOptions.getBigQueryProject();
127+
String parentProjectIdResource = "projects/" + parentProjectId;
119128

120129
TableReferenceProto.TableReference storageTableRef =
121130
TableReferenceProto.TableReference.newBuilder()
@@ -126,7 +135,7 @@ static ReadSession create(
126135

127136
CreateReadSessionRequest.Builder builder =
128137
CreateReadSessionRequest.newBuilder()
129-
.setParent(parentProjectId)
138+
.setParent(parentProjectIdResource)
130139
.setReadOptions(tableReadOptions)
131140
.setTableReference(storageTableRef);
132141
try {
@@ -254,8 +263,7 @@ private static PipelineResult run(BigQueryToParquetOptions options) {
254263

255264
TableReadOptions tableReadOptions = builder.build();
256265
BigQueryStorageClient client = BigQueryStorageClientFactory.create();
257-
ReadSession session =
258-
ReadSessionFactory.create(client, options.getTableRef(), tableReadOptions);
266+
ReadSession session = ReadSessionFactory.create(client, options, tableReadOptions);
259267

260268
// Extract schema from ReadSession
261269
Schema schema = getTableSchema(session);

v2/bigquery-to-parquet/src/test/java/com/google/cloud/teleport/v2/templates/BigQueryToParquetTest.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.google.cloud.bigquery.storage.v1beta1.Storage.ReadSession;
2424
import com.google.cloud.teleport.v2.templates.BigQueryToParquet.ReadSessionFactory;
2525
import org.apache.avro.Schema;
26+
import org.apache.beam.sdk.options.PipelineOptionsFactory;
2627
import org.junit.Test;
2728
import org.junit.runner.RunWith;
2829
import org.junit.runners.JUnit4;
@@ -48,8 +49,11 @@ public void testReadSessionFactoryBadTable() {
4849
// Test input
4950
final String badTableRef = "fantasmic-999999;great_data.table";
5051
final TableReadOptions tableReadOptions = TableReadOptions.newBuilder().build();
52+
BigQueryToParquet.BigQueryToParquetOptions options =
53+
PipelineOptionsFactory.create().as(BigQueryToParquet.BigQueryToParquetOptions.class);
54+
options.setTableRef(badTableRef);
5155
ReadSessionFactory trsf = new ReadSessionFactory();
52-
ReadSession trs = trsf.create(client, badTableRef, tableReadOptions);
56+
ReadSession trs = trsf.create(client, options, tableReadOptions);
5357
}
5458

5559
/** Test Schema Parser is working as expected. */

0 commit comments

Comments
 (0)