Commit c2e72ac

Streaming read for BigQuery (#36668)
* mvp for streaming read from BigQuery
* mvp for streaming read from BigQuery - example
* inspired by TypedRead, add capability for custom types, add tests, integration tests
* review
* spotless
* try increasing memory as multiple tests are OOMing periodically
* fix names
* fix names
1 parent 11f9b0c commit c2e72ac
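
In short, the commit lets a pipeline decide at runtime what to read: upstream elements are mapped to BigQueryDynamicReadDescriptor values, and BigQueryIO.readDynamicallyTableRows() expands each descriptor into TableRows. A minimal sketch distilled from the example file added in this commit (all names are taken from the diff below; p is an existing Pipeline):

    // Sketch of the new dynamic-read path added by this commit.
    // Every 60 seconds, emit a descriptor for the sample table and re-read it.
    PCollection<TableRow> rows =
        p.apply(PeriodicImpulse.create().withInterval(Duration.standardSeconds(60)))
            .apply(
                MapElements.into(TypeDescriptor.of(BigQueryDynamicReadDescriptor.class))
                    .via(
                        (Instant t) ->
                            BigQueryDynamicReadDescriptor.table(
                                "apache-beam-testing.samples.weather_stations",
                                null, // selectedFields: read all columns
                                null))) // rowRestriction: no filter
            .apply(BigQueryIO.readDynamicallyTableRows());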

File tree: 9 files changed, +1882 −0 lines
Lines changed: 209 additions & 0 deletions (new file)

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.examples.cookbook;

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryDynamicReadDescriptor;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.PeriodicImpulse;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An example that periodically reads the public samples of weather data from BigQuery, counts the
 * number of tornadoes that occur in each month, and writes the results to BigQuery.
 *
 * <p>Concepts: Reading/writing BigQuery; counting a PCollection; user-defined PTransforms
 *
 * <p>Note: Before running this example, you must create a BigQuery dataset to contain your output
 * table.
 *
 * <p>To execute this pipeline locally, specify the BigQuery table for the output with the form:
 *
 * <pre>{@code
 * --output=YOUR_PROJECT_ID:DATASET_ID.TABLE_ID
 * }</pre>
 *
 * <p>To change the runner, specify:
 *
 * <pre>{@code
 * --runner=YOUR_SELECTED_RUNNER
 * }</pre>
 *
 * <p>See examples/java/README.md for instructions about how to configure different runners.
 *
 * <p>The BigQuery input table defaults to {@code apache-beam-testing.samples.weather_stations} and
 * can be overridden with {@code --input}.
 */
public class BigQueryStreamingTornadoes {
  private static final Logger LOG = LoggerFactory.getLogger(BigQueryStreamingTornadoes.class);

  // Default input: the public weather stations sample table.
  private static final String WEATHER_SAMPLES_TABLE =
      "apache-beam-testing.samples.weather_stations";

  /**
   * Examines each row in the input table. If a tornado was recorded in that sample, the month in
   * which it occurred is output.
   */
  static class ExtractTornadoesFn extends DoFn<TableRow, Integer> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      TableRow row = c.element();
      if (Boolean.TRUE.equals(row.get("tornado"))) {
        c.output(Integer.parseInt((String) row.get("month")));
      }
    }
  }

  /**
   * Prepares the data for writing to BigQuery by building a TableRow object containing an integer
   * representation of month and the number of tornadoes that occurred in each month.
   */
  static class FormatCountsFn extends DoFn<KV<Integer, Long>, TableRow> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      TableRow row =
          new TableRow()
              .set("ts", c.timestamp().toString())
              .set("month", c.element().getKey())
              .set("tornado_count", c.element().getValue());
      c.output(row);
    }
  }

  /**
   * Takes rows from a table and generates a table of counts.
   *
   * <p>The input schema is described by https://developers.google.com/bigquery/docs/dataset-gsod .
   * The output contains the total number of tornadoes found in each month in the following schema:
   *
   * <ul>
   *   <li>month: integer
   *   <li>tornado_count: integer
   * </ul>
   */
  static class CountTornadoes extends PTransform<PCollection<TableRow>, PCollection<TableRow>> {
    @Override
    public PCollection<TableRow> expand(PCollection<TableRow> rows) {

      // row... => month...
      PCollection<Integer> tornadoes = rows.apply(ParDo.of(new ExtractTornadoesFn()));

      // month... => <month,count>...
      PCollection<KV<Integer, Long>> tornadoCounts = tornadoes.apply(Count.perElement());

      // <month,count>... => row...
      PCollection<TableRow> results = tornadoCounts.apply(ParDo.of(new FormatCountsFn()));

      return results;
    }
  }

  /**
   * Options supported by {@link BigQueryStreamingTornadoes}.
   *
   * <p>Inherits standard configuration options.
   */
  public interface Options extends PipelineOptions {
    @Description("Table to read from, specified as <project_id>:<dataset_id>.<table_id>")
    @Default.String(WEATHER_SAMPLES_TABLE)
    String getInput();

    void setInput(String value);

    @Description("Write method to use to write to BigQuery")
    @Default.Enum("DEFAULT")
    BigQueryIO.Write.Method getWriteMethod();

    void setWriteMethod(BigQueryIO.Write.Method value);

    @Description(
        "BigQuery table to write to, specified as "
            + "<project_id>:<dataset_id>.<table_id>. The dataset must already exist.")
    @Validation.Required
    String getOutput();

    void setOutput(String value);
  }

  public static void applyBigQueryStreamingTornadoes(Pipeline p, Options options) {
    List<TableFieldSchema> fields = new ArrayList<>();
    fields.add(new TableFieldSchema().setName("ts").setType("STRING"));
    fields.add(new TableFieldSchema().setName("month").setType("INTEGER"));
    fields.add(new TableFieldSchema().setName("tornado_count").setType("INTEGER"));
    TableSchema schema = new TableSchema().setFields(fields);

    // Resolve the --input table up front so the serialized lambda captures a plain String.
    String inputTable = options.getInput();

    PCollection<BigQueryDynamicReadDescriptor> descriptors =
        p.apply("Impulse", PeriodicImpulse.create().withInterval(Duration.standardSeconds(60)))
            .apply(
                "Create query",
                MapElements.into(TypeDescriptor.of(BigQueryDynamicReadDescriptor.class))
                    .via((Instant t) -> BigQueryDynamicReadDescriptor.table(inputTable, null, null)));

    PCollection<TableRow> readDynamically =
        descriptors.apply("Read dynamically", BigQueryIO.readDynamicallyTableRows());
    readDynamically
        .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
        .apply(new CountTornadoes())
        .apply(
            BigQueryIO.writeTableRows()
                .to(options.getOutput())
                .withSchema(schema)
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                .withMethod(options.getWriteMethod()));
  }

  public static void runBigQueryTornadoes(Options options) {
    LOG.info("Running BigQuery Tornadoes with options " + options.toString());
    Pipeline p = Pipeline.create(options);
    applyBigQueryStreamingTornadoes(p, options);
    p.run().waitUntilFinish();
  }

  public static void main(String[] args) {
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    runBigQueryTornadoes(options);
  }
}
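
The example above exercises only the table path. The descriptor introduced later in this diff also supports a query path; a hedged sketch of what the "Create query" step would look like with a query descriptor — the SQL text is illustrative, not part of the commit:

    // Hypothetical query-based variant of the "Create query" step above.
    // For query descriptors, flattenResults and useLegacySql must both be set
    // (enforced by BigQueryDynamicReadDescriptor.create).
    MapElements.into(TypeDescriptor.of(BigQueryDynamicReadDescriptor.class))
        .via(
            (Instant t) ->
                BigQueryDynamicReadDescriptor.query(
                    "SELECT month, tornado FROM `apache-beam-testing.samples.weather_stations`",
                    /* flattenResults= */ false,
                    /* useLegacySql= */ false));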

sdks/java/io/google-cloud-platform/build.gradle

Lines changed: 3 additions & 0 deletions

@@ -212,6 +212,8 @@ task integrationTest(type: Test, dependsOn: processTestResources) {
   exclude '**/BigQueryIOStorageQueryIT.class'
   exclude '**/BigQueryIOStorageReadIT.class'
   exclude '**/BigQueryIOStorageWriteIT.class'
+  exclude '**/BigQueryIODynamicQueryIT.class'
+  exclude '**/BigQueryIODynamicReadIT.class'
   exclude '**/BigQueryToTableIT.class'

   maxParallelForks 4
@@ -281,6 +283,7 @@ task bigQueryEarlyRolloutIntegrationTest(type: Test, dependsOn: processTestResou
   include '**/BigQueryToTableIT.class'
   include '**/BigQueryIOJsonIT.class'
   include '**/BigQueryIOStorageReadTableRowIT.class'
+  include '**/BigQueryIODynamicReadTableRowIT.class'
   // storage write api
   include '**/StorageApiDirectWriteProtosIT.class'
   include '**/StorageApiSinkFailedRowsIT.class'
Lines changed: 101 additions & 0 deletions (new file)

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.sdk.io.gcp.bigquery;

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;

import com.google.auto.value.AutoValue;
import java.io.Serializable;
import java.util.List;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaCreate;
import org.apache.beam.sdk.schemas.annotations.SchemaFieldName;
import org.apache.beam.sdk.schemas.annotations.SchemaFieldNumber;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.dataflow.qual.Pure;

/** Represents a BigQuery source description used for dynamic reads. */
@DefaultSchema(AutoValueSchema.class)
@AutoValue
public abstract class BigQueryDynamicReadDescriptor implements Serializable {
  @SchemaFieldName("query")
  @SchemaFieldNumber("0")
  @Pure
  abstract @Nullable String getQuery();

  @SchemaFieldName("table")
  @SchemaFieldNumber("1")
  @Pure
  abstract @Nullable String getTable();

  @SchemaFieldName("flattenResults")
  @SchemaFieldNumber("2")
  @Pure
  abstract @Nullable Boolean getFlattenResults();

  @SchemaFieldName("legacySql")
  @SchemaFieldNumber("3")
  @Pure
  abstract @Nullable Boolean getUseLegacySql();

  @SchemaFieldName("selectedFields")
  @SchemaFieldNumber("4")
  @Pure
  abstract @Nullable List<String> getSelectedFields();

  @SchemaFieldName("rowRestriction")
  @SchemaFieldNumber("5")
  @Pure
  abstract @Nullable String getRowRestriction();

  @SchemaCreate
  public static BigQueryDynamicReadDescriptor create(
      @Nullable String query,
      @Nullable String table,
      @Nullable Boolean flattenResults,
      @Nullable Boolean useLegacySql,
      @Nullable List<String> selectedFields,
      @Nullable String rowRestriction) {
    checkArgument(query != null || table != null, "Either query or table has to be specified.");
    checkArgument(
        !(query != null && table != null),
        "Only one of query or table may be specified, not both.");
    checkArgument(
        !(table != null && (flattenResults != null || useLegacySql != null)),
        "flattenResults and legacySql apply only to query reads, not table reads.");
    checkArgument(
        !(query != null && (selectedFields != null || rowRestriction != null)),
        "Selected fields and row restriction are only applicable for table reads.");
    checkArgument(
        !(query != null && (flattenResults == null || useLegacySql == null)),
        "If query is used, flattenResults and legacySql have to be set as well.");

    return new AutoValue_BigQueryDynamicReadDescriptor(
        query, table, flattenResults, useLegacySql, selectedFields, rowRestriction);
  }

  public static BigQueryDynamicReadDescriptor query(
      String query, Boolean flattenResults, Boolean useLegacySql) {
    return create(query, null, flattenResults, useLegacySql, null, null);
  }

  public static BigQueryDynamicReadDescriptor table(
      String table, @Nullable List<String> selectedFields, @Nullable String rowRestriction) {
    return create(null, table, null, null, selectedFields, rowRestriction);
  }
}
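
For illustration, here is how the factory methods interact with the create-time checks above; a sketch assuming only what this file defines (table names, column names, and filter strings are placeholders; Arrays is java.util.Arrays):

    // Valid: table read, optionally projecting columns and filtering rows.
    BigQueryDynamicReadDescriptor byTable =
        BigQueryDynamicReadDescriptor.table(
            "project:dataset.table", Arrays.asList("month", "tornado"), "tornado = true");

    // Valid: query read; both query-only flags are non-null.
    BigQueryDynamicReadDescriptor byQuery =
        BigQueryDynamicReadDescriptor.query("SELECT 1 AS one", false, false);

    // Rejected by checkArgument (IllegalArgumentException): neither query nor table...
    // BigQueryDynamicReadDescriptor.create(null, null, null, null, null, null);

    // ...and a table combined with a query-only option is rejected too.
    // BigQueryDynamicReadDescriptor.create(null, "project:dataset.table", true, null, null, null);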
