Skip to content

Commit 1374d03

Browse files
committed
Inspired by TypedRead: add capability for custom types, and add tests and integration tests.
1 parent 1a57ad3 commit 1374d03

File tree

9 files changed

+1618
-57
lines changed

9 files changed

+1618
-57
lines changed

examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryStreamingTornadoes.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,8 @@
4747
import org.slf4j.LoggerFactory;
4848

4949
/**
50-
* An example that reads periodically the public samples of weather data from BigQuery, counts the number of
51-
* tornadoes that occur in each month, and writes the results to BigQuery.
50+
* An example that periodically reads the public samples of weather data from BigQuery, counts the
51+
* number of tornadoes that occur in each month, and writes the results to BigQuery.
5252
*
5353
* <p>Concepts: Reading/writing BigQuery; counting a PCollection; user-defined PTransforms
5454
*
@@ -154,6 +154,7 @@ public interface Options extends PipelineOptions {
154154
BigQueryIO.Write.Method getWriteMethod();
155155

156156
void setWriteMethod(BigQueryIO.Write.Method value);
157+
157158
@Description(
158159
"BigQuery table to write to, specified as "
159160
+ "<project_id>:<dataset_id>.<table_id>. The dataset must already exist.")
@@ -177,7 +178,8 @@ public static void applyBigQueryStreamingTornadoes(Pipeline p, Options options)
177178
MapElements.into(TypeDescriptor.of(BigQueryDynamicReadDescriptor.class))
178179
.via(
179180
(Instant t) ->
180-
BigQueryDynamicReadDescriptor.create(null, WEATHER_SAMPLES_TABLE)));
181+
BigQueryDynamicReadDescriptor.table(
182+
WEATHER_SAMPLES_TABLE, null, null)));
181183

182184
PCollection<TableRow> readDynamically =
183185
descriptors.apply("Read dynamically", BigQueryIO.readDynamicallyTableRows());
@@ -193,7 +195,6 @@ public static void applyBigQueryStreamingTornadoes(Pipeline p, Options options)
193195
.withMethod(options.getWriteMethod()));
194196
}
195197

196-
197198
public static void runBigQueryTornadoes(Options options) {
198199
LOG.info("Running BigQuery Tornadoes with options " + options.toString());
199200
Pipeline p = Pipeline.create(options);

sdks/java/io/google-cloud-platform/build.gradle

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,8 @@ task integrationTest(type: Test, dependsOn: processTestResources) {
202202
exclude '**/BigQueryIOStorageQueryIT.class'
203203
exclude '**/BigQueryIOStorageReadIT.class'
204204
exclude '**/BigQueryIOStorageWriteIT.class'
205+
exclude '**/BigQueryIOStorageDynamicQueryIT.class'
206+
exclude '**/BigQueryIOStorageDynamicReadIT.class'
205207
exclude '**/BigQueryToTableIT.class'
206208

207209
maxParallelForks 4
@@ -271,6 +273,7 @@ task bigQueryEarlyRolloutIntegrationTest(type: Test, dependsOn: processTestResou
271273
include '**/BigQueryToTableIT.class'
272274
include '**/BigQueryIOJsonIT.class'
273275
include '**/BigQueryIOStorageReadTableRowIT.class'
276+
include '**/BigQueryIOStorageDynamicReadTableRowIT.class'
274277
// storage write api
275278
include '**/StorageApiDirectWriteProtosIT.class'
276279
include '**/StorageApiSinkFailedRowsIT.class'

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryDynamicReadDescriptor.java

Lines changed: 57 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,16 @@
1717
*/
1818
package org.apache.beam.sdk.io.gcp.bigquery;
1919

20+
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
21+
2022
import com.google.auto.value.AutoValue;
2123
import java.io.Serializable;
24+
import java.util.List;
2225
import org.apache.beam.sdk.schemas.AutoValueSchema;
2326
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
2427
import org.apache.beam.sdk.schemas.annotations.SchemaCreate;
2528
import org.apache.beam.sdk.schemas.annotations.SchemaFieldName;
29+
import org.apache.beam.sdk.schemas.annotations.SchemaFieldNumber;
2630
import org.checkerframework.checker.nullness.qual.Nullable;
2731
import org.checkerframework.dataflow.qual.Pure;
2832

@@ -31,17 +35,67 @@
3135
@AutoValue
3236
public abstract class BigQueryDynamicReadDescriptor implements Serializable {
3337
@SchemaFieldName("query")
38+
@SchemaFieldNumber("0")
3439
@Pure
3540
abstract @Nullable String getQuery();
3641

3742
@SchemaFieldName("table")
43+
@SchemaFieldNumber("1")
3844
@Pure
3945
abstract @Nullable String getTable();
4046

47+
@SchemaFieldName("flattenResults")
48+
@SchemaFieldNumber("2")
49+
@Pure
50+
abstract @Nullable Boolean getFlattenResults();
51+
52+
@SchemaFieldName("legacySql")
53+
@SchemaFieldNumber("3")
54+
@Pure
55+
abstract @Nullable Boolean getLegacySql();
56+
57+
@SchemaFieldName("selectedFields")
58+
@SchemaFieldNumber("4")
59+
@Pure
60+
abstract @Nullable List<String> getSelectedFields();
61+
62+
@SchemaFieldName("rowRestriction")
63+
@SchemaFieldNumber("5")
64+
@Pure
65+
abstract @Nullable String getRowRestriction();
66+
4167
@SchemaCreate
42-
@SuppressWarnings("all")
4368
public static BigQueryDynamicReadDescriptor create(
44-
@Nullable String query, @Nullable String table) {
45-
return new AutoValue_BigQueryDynamicReadDescriptor(query, table);
69+
@Nullable String query,
70+
@Nullable String table,
71+
@Nullable Boolean flattenResults,
72+
@Nullable Boolean legacySql,
73+
@Nullable List<String> selectedFields,
74+
@Nullable String rowRestriction) {
75+
checkArgument((query != null || table != null), "Either query or table has to be specified.");
76+
checkArgument(
77+
!(query != null && table != null), "Either query or table has to be specified not both.");
78+
checkArgument(
79+
!(table != null && (flattenResults != null || legacySql != null)),
80+
"Specifies a table with a result flattening preference or legacySql, which only applies to queries");
81+
checkArgument(
82+
!(query != null && (selectedFields != null || rowRestriction != null)),
83+
"Selected fields and row restriction are only applicable for table reads");
84+
checkArgument(
85+
!(query != null && (flattenResults == null || legacySql == null)),
86+
"If query is used, flattenResults and legacySql have to be set as well.");
87+
88+
return new AutoValue_BigQueryDynamicReadDescriptor(
89+
query, table, flattenResults, legacySql, selectedFields, rowRestriction);
90+
}
91+
92+
public static BigQueryDynamicReadDescriptor query(
93+
String query, Boolean flattenResults, Boolean legacySql) {
94+
return create(query, null, flattenResults, legacySql, null, null);
95+
}
96+
97+
public static BigQueryDynamicReadDescriptor table(
98+
String table, @Nullable List<String> selectedFields, @Nullable String rowRestriction) {
99+
return create(null, table, null, null, selectedFields, rowRestriction);
46100
}
47101
}

0 commit comments

Comments
 (0)