Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
import org.apache.beam.sdk.io.parquet.ParquetIO;
import org.apache.beam.sdk.options.Default;
Expand Down Expand Up @@ -108,14 +109,22 @@
* Creates ReadSession for schema extraction.
*
* @param client BigQueryStorage client used to create ReadSession.
* @param tableString String that represents table to export from.
* @param options BigQueryToParquetOptions options.
* @param tableReadOptions TableReadOptions that specify any fields in the table to filter on.
* @return session ReadSession object that contains the schema for the export.
*/
static ReadSession create(
BigQueryStorageClient client, String tableString, TableReadOptions tableReadOptions) {
BigQueryStorageClient client,
BigQueryToParquetOptions options,
TableReadOptions tableReadOptions) {
String tableString = options.getTableRef();
TableReference tableReference = BigQueryHelpers.parseTableSpec(tableString);
String parentProjectId = "projects/" + tableReference.getProjectId();
BigQueryOptions bigQueryOptions = options.as(BigQueryOptions.class);

Check warning on line 122 in v2/bigquery-to-parquet/src/main/java/com/google/cloud/teleport/v2/templates/BigQueryToParquet.java

View check run for this annotation

Codecov / codecov/patch

v2/bigquery-to-parquet/src/main/java/com/google/cloud/teleport/v2/templates/BigQueryToParquet.java#L122

Added line #L122 was not covered by tests
String parentProjectId =
bigQueryOptions.getBigQueryProject() == null
? bigQueryOptions.getProject()
: bigQueryOptions.getBigQueryProject();
String parentProjectIdResource = "projects/" + parentProjectId;

Check warning on line 127 in v2/bigquery-to-parquet/src/main/java/com/google/cloud/teleport/v2/templates/BigQueryToParquet.java

View check run for this annotation

Codecov / codecov/patch

v2/bigquery-to-parquet/src/main/java/com/google/cloud/teleport/v2/templates/BigQueryToParquet.java#L125-L127

Added lines #L125 - L127 were not covered by tests

TableReferenceProto.TableReference storageTableRef =
TableReferenceProto.TableReference.newBuilder()
Expand All @@ -126,7 +135,7 @@

CreateReadSessionRequest.Builder builder =
CreateReadSessionRequest.newBuilder()
.setParent(parentProjectId)
.setParent(parentProjectIdResource)

Check warning on line 138 in v2/bigquery-to-parquet/src/main/java/com/google/cloud/teleport/v2/templates/BigQueryToParquet.java

View check run for this annotation

Codecov / codecov/patch

v2/bigquery-to-parquet/src/main/java/com/google/cloud/teleport/v2/templates/BigQueryToParquet.java#L138

Added line #L138 was not covered by tests
.setReadOptions(tableReadOptions)
.setTableReference(storageTableRef);
try {
Expand Down Expand Up @@ -254,8 +263,7 @@

TableReadOptions tableReadOptions = builder.build();
BigQueryStorageClient client = BigQueryStorageClientFactory.create();
ReadSession session =
ReadSessionFactory.create(client, options.getTableRef(), tableReadOptions);
ReadSession session = ReadSessionFactory.create(client, options, tableReadOptions);

Check warning on line 266 in v2/bigquery-to-parquet/src/main/java/com/google/cloud/teleport/v2/templates/BigQueryToParquet.java

View check run for this annotation

Codecov / codecov/patch

v2/bigquery-to-parquet/src/main/java/com/google/cloud/teleport/v2/templates/BigQueryToParquet.java#L266

Added line #L266 was not covered by tests

// Extract schema from ReadSession
Schema schema = getTableSchema(session);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.cloud.bigquery.storage.v1beta1.Storage.ReadSession;
import com.google.cloud.teleport.v2.templates.BigQueryToParquet.ReadSessionFactory;
import org.apache.avro.Schema;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
Expand All @@ -48,8 +49,11 @@ public void testReadSessionFactoryBadTable() {
// Test input
final String badTableRef = "fantasmic-999999;great_data.table";
final TableReadOptions tableReadOptions = TableReadOptions.newBuilder().build();
BigQueryToParquet.BigQueryToParquetOptions options =
PipelineOptionsFactory.create().as(BigQueryToParquet.BigQueryToParquetOptions.class);
options.setTableRef(badTableRef);
ReadSessionFactory trsf = new ReadSessionFactory();
ReadSession trs = trsf.create(client, badTableRef, tableReadOptions);
ReadSession trs = trsf.create(client, options, tableReadOptions);
}

/** Test Schema Parser is working as expected. */
Expand Down
Loading