Commit 44428fa

mvp for streaming read from bigQuery
1 parent fb1fab0 commit 44428fa

File tree

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryDynamicReadDescriptor.java
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java

2 files changed: +159 −0 lines changed

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryDynamicReadDescriptor.java

Lines changed: 47 additions & 0 deletions
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaCreate;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldName;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.checkerframework.dataflow.qual.Pure;
+
+/** Represents a BigQuery source description used for dynamic read. */
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class BigQueryDynamicReadDescriptor implements Serializable {
+  @SchemaFieldName("query")
+  @Pure
+  abstract @Nullable String getQuery();
+
+  @SchemaFieldName("table")
+  @Pure
+  abstract @Nullable String getTable();
+
+  @SchemaCreate
+  @SuppressWarnings("all")
+  public static BigQueryDynamicReadDescriptor create(
+      @Nullable String query, @Nullable String table) {
+    return new AutoValue_BigQueryDynamicReadDescriptor(query, table);
+  }
+}
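For orientation, a minimal sketch (not part of the commit) of how descriptors of this new type might be built. The table spec is the "project_id:dataset.table" form accepted by BigQueryHelpers.parseTableSpec; the project, dataset, and table names here are illustrative:

// Read a whole table: query is left null, table carries the spec.
BigQueryDynamicReadDescriptor tableDescriptor =
    BigQueryDynamicReadDescriptor.create(
        /* query= */ null, /* table= */ "my-project:my_dataset.my_table");

// A query-based descriptor can be constructed the same way, though note that
// only getTable() is consumed by the DynamicRead MVP added below.
BigQueryDynamicReadDescriptor queryDescriptor =
    BigQueryDynamicReadDescriptor.create(
        /* query= */ "SELECT * FROM `my-project.my_dataset.my_table`", /* table= */ null);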

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java

Lines changed: 112 additions & 0 deletions
@@ -52,8 +52,11 @@
 import com.google.protobuf.DynamicMessage;
 import com.google.protobuf.Message;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.io.Serializable;
 import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -74,9 +77,11 @@
 import org.apache.beam.sdk.PipelineRunner;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.StructuredCoder;
 import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
 import org.apache.beam.sdk.extensions.avro.io.AvroSource;
 import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
@@ -118,6 +123,7 @@
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Redistribute;
 import org.apache.beam.sdk.transforms.Reshuffle;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.SerializableFunctions;
@@ -669,6 +675,19 @@ public static TypedRead<TableRow> readTableRowsWithSchema() {
         BigQueryUtils.tableRowToBeamRow(),
         BigQueryUtils.tableRowFromBeamRow());
   }
+  /** @deprecated this method may have breaking changes introduced, use with caution */
+  @Deprecated
+  public static DynamicRead readDynamicallyTableRows() {
+    return new AutoValue_BigQueryIO_DynamicRead.Builder()
+        .setBigQueryServices(new BigQueryServicesImpl())
+        .setParseFn(new TableRowParser())
+        .setFormat(DataFormat.AVRO)
+        .setOutputCoder(TableRowJsonCoder.of())
+        .setProjectionPushdownApplied(false)
+        .setBadRecordErrorHandler(new DefaultErrorHandler<>())
+        .setBadRecordRouter(BadRecordRouter.THROWING_ROUTER)
+        .build();
+  }
 
   private static class TableSchemaFunction
       implements Serializable, Function<@Nullable String, @Nullable TableSchema> {
@@ -804,6 +823,99 @@ public TableRow apply(SchemaAndRecord schemaAndRecord) {
       return BigQueryAvroUtils.convertGenericRecordToTableRow(schemaAndRecord.getRecord());
     }
   }
+  /** @deprecated this class may have breaking changes introduced, use with caution */
+  @Deprecated
+  @AutoValue
+  public abstract static class DynamicRead
+      extends PTransform<PCollection<BigQueryDynamicReadDescriptor>, PCollection<TableRow>> {
+
+    abstract BigQueryServices getBigQueryServices();
+
+    abstract DataFormat getFormat();
+
+    abstract @Nullable SerializableFunction<SchemaAndRecord, TableRow> getParseFn();
+
+    abstract @Nullable ValueProvider<String> getRowRestriction();
+
+    abstract @Nullable Coder<TableRow> getOutputCoder();
+
+    abstract boolean getProjectionPushdownApplied();
+
+    abstract BadRecordRouter getBadRecordRouter();
+
+    abstract ErrorHandler<BadRecord, ?> getBadRecordErrorHandler();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+
+      abstract Builder setFormat(DataFormat format);
+
+      abstract Builder setBigQueryServices(BigQueryServices bigQueryServices);
+
+      abstract Builder setParseFn(SerializableFunction<SchemaAndRecord, TableRow> parseFn);
+
+      abstract Builder setRowRestriction(ValueProvider<String> rowRestriction);
+
+      abstract Builder setOutputCoder(Coder<TableRow> coder);
+
+      abstract Builder setProjectionPushdownApplied(boolean projectionPushdownApplied);
+
+      abstract Builder setBadRecordErrorHandler(ErrorHandler<BadRecord, ?> badRecordErrorHandler);
+
+      abstract Builder setBadRecordRouter(BadRecordRouter badRecordRouter);
+
+      abstract DynamicRead build();
+    }
+
+    DynamicRead() {}
+
+    class CreateBoundedSourceForTable
+        extends DoFn<BigQueryDynamicReadDescriptor, BigQueryStorageStreamSource<TableRow>> {
+
+      @ProcessElement
+      public void processElement(
+          OutputReceiver<BigQueryStorageStreamSource<TableRow>> receiver,
+          @Element BigQueryDynamicReadDescriptor descriptor,
+          PipelineOptions options)
+          throws Exception {
+        BigQueryStorageTableSource<TableRow> output =
+            BigQueryStorageTableSource.create(
+                StaticValueProvider.of(BigQueryHelpers.parseTableSpec(descriptor.getTable())),
+                getFormat(),
+                null,
+                getRowRestriction(),
+                getParseFn(),
+                getOutputCoder(),
+                getBigQueryServices(),
+                getProjectionPushdownApplied());
+        // 1mb --> 1 shard; 1gb --> 32 shards; 1tb --> 1000 shards, 1pb --> 32k shards
+        long desiredChunkSize =
+            Math.max(1 << 20, (long) (1000 * Math.sqrt(output.getEstimatedSizeBytes(options))));
+        List<BigQueryStorageStreamSource<TableRow>> split = output.split(desiredChunkSize, options);
+        split.stream().forEach(source -> receiver.output(source));
+      }
+    }
+
+    @Override
+    public PCollection<TableRow> expand(PCollection<BigQueryDynamicReadDescriptor> input) {
+      TupleTag<TableRow> rowTag = new TupleTag<>();
+      PCollectionTuple resultTuple =
+          input
+              .apply("convert", ParDo.of(new CreateBoundedSourceForTable()))
+              .apply("redistribute", Redistribute.arbitrarily())
+              .apply(
+                  "Read Storage Table Source",
+                  ParDo.of(
+                          new TypedRead.ReadTableSource<TableRow>(
+                              rowTag, getParseFn(), getBadRecordRouter()))
+                      .withOutputTags(rowTag, TupleTagList.of(BAD_RECORD_TAG)));
+      getBadRecordErrorHandler()
+          .addErrorCollection(
+              resultTuple.get(BAD_RECORD_TAG).setCoder(BadRecord.getCoder(input.getPipeline())));
+      return resultTuple.get(rowTag).setCoder(TableRowJsonCoder.of());
+    }
+  }
 
   /** Implementation of {@link BigQueryIO#read()}. */
   public static class Read extends PTransform<PBegin, PCollection<TableRow>> {
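A quick check of the sizing comment in CreateBoundedSourceForTable: with desiredChunkSize = max(2^20, 1000 * sqrt(bytes)), the expected stream count is bytes / desiredChunkSize ≈ sqrt(bytes) / 1000. For a 1 GB table, sqrt(2^30) ≈ 32,768, giving ~32 MB chunks and roughly 32 streams; for 1 TB, sqrt(2^40) / 1000 ≈ 1,049, i.e. about 1000 streams. That matches the "1mb --> 1 shard ... 1pb --> 32k shards" comment in the code.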

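Taken end to end, the new transform is driven by a PCollection of descriptors rather than a single configured table, which is what makes the read "dynamic". A usage sketch, assuming Create can infer a schema coder for the descriptor from its @DefaultSchema annotation (pipeline and table names are hypothetical):

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryDynamicReadDescriptor;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;

public class DynamicReadExample {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();

    // Each descriptor names one table; the set of tables is supplied as data,
    // not baked into the transform's configuration.
    PCollection<BigQueryDynamicReadDescriptor> descriptors =
        p.apply(
            "Descriptors",
            Create.of(
                BigQueryDynamicReadDescriptor.create(null, "my-project:my_dataset.table_a"),
                BigQueryDynamicReadDescriptor.create(null, "my-project:my_dataset.table_b")));

    // Splits each table into Storage API stream sources, redistributes them
    // across workers, and reads the rows of every stream as TableRows.
    PCollection<TableRow> rows = descriptors.apply(BigQueryIO.readDynamicallyTableRows());

    p.run();
  }
}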