Skip to content

Commit 5b0bd27

Browse files
Update Dataflow BigQuery samples to use Managed I/O (#10029)
1 parent 3c9133e commit 5b0bd27

File tree

3 files changed

+43
-38
lines changed

3 files changed

+43
-38
lines changed

dataflow/snippets/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
<maven.compiler.source>11</maven.compiler.source>
3838
<maven.compiler.target>11</maven.compiler.target>
3939
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
40-
<apache_beam.version>2.58.0</apache_beam.version>
40+
<apache_beam.version>2.63.0</apache_beam.version>
4141
<slf4j.version>2.0.12</slf4j.version>
4242
<parquet.version>1.14.0</parquet.version>
4343
<iceberg.version>1.4.2</iceberg.version>

dataflow/snippets/src/main/java/com/example/dataflow/BigQueryReadFromQuery.java

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,14 @@
1717
package com.example.dataflow;
1818

1919
// [START dataflow_bigquery_read_query]
20-
import com.google.api.services.bigquery.model.TableRow;
20+
import com.google.common.collect.ImmutableMap;
2121
import org.apache.beam.sdk.Pipeline;
22-
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
23-
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
22+
import org.apache.beam.sdk.managed.Managed;
2423
import org.apache.beam.sdk.options.PipelineOptions;
2524
import org.apache.beam.sdk.options.PipelineOptionsFactory;
2625
import org.apache.beam.sdk.transforms.MapElements;
27-
import org.apache.beam.sdk.values.TypeDescriptor;
26+
import org.apache.beam.sdk.values.Row;
27+
import org.apache.beam.sdk.values.TypeDescriptors;
2828

2929
public class BigQueryReadFromQuery {
3030
public static void main(String[] args) {
@@ -39,20 +39,23 @@ public static void main(String[] args) {
3939
PipelineOptions options = PipelineOptionsFactory.fromArgs(args)
4040
.withValidation().create();
4141

42+
ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
43+
.put("query", queryString)
44+
.build();
45+
4246
// Create a pipeline and apply transforms.
4347
Pipeline pipeline = Pipeline.create(options);
4448
pipeline
45-
// Read the query results into TableRow objects.
46-
.apply(BigQueryIO.readTableRows()
47-
.fromQuery(queryString)
48-
.usingStandardSql()
49-
.withMethod(TypedRead.Method.DIRECT_READ))
50-
// The output from the previous step is a PCollection<TableRow>.
49+
.apply(Managed.read(Managed.BIGQUERY).withConfig(config)).getSinglePCollection()
5150
.apply(MapElements
52-
.into(TypeDescriptor.of(TableRow.class))
53-
.via((TableRow row) -> {
54-
System.out.printf("Repo: %s, commits: %s%n", row.get("repo"), row.get("count"));
55-
return row;
51+
.into(TypeDescriptors.strings())
52+
// Access individual fields in the row.
53+
.via((Row row) -> {
54+
String output = String.format("Repo: %s, commits: %d%n",
55+
row.getString("repo"),
56+
row.getInt64("count"));
57+
System.out.println(output);
58+
return output;
5659
}));
5760
pipeline.run().waitUntilFinish();
5861
}

dataflow/snippets/src/main/java/com/example/dataflow/BigQueryReadWithProjectionAndFiltering.java

Lines changed: 25 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,14 @@
1717
package com.example.dataflow;
1818

1919
// [START dataflow_bigquery_read_projection_and_filtering]
20-
import com.google.api.services.bigquery.model.TableRow;
21-
import java.util.Arrays;
20+
import com.google.common.collect.ImmutableMap;
21+
import java.util.List;
2222
import org.apache.beam.sdk.Pipeline;
23-
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
24-
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
23+
import org.apache.beam.sdk.managed.Managed;
2524
import org.apache.beam.sdk.options.PipelineOptionsFactory;
2625
import org.apache.beam.sdk.transforms.MapElements;
27-
import org.apache.beam.sdk.values.TypeDescriptor;
26+
import org.apache.beam.sdk.values.Row;
27+
import org.apache.beam.sdk.values.TypeDescriptors;
2828

2929
public class BigQueryReadWithProjectionAndFiltering {
3030
public static void main(String[] args) {
@@ -36,28 +36,30 @@ public static void main(String[] args) {
3636
.withValidation()
3737
.as(ExamplePipelineOptions.class);
3838

39+
String tableSpec = String.format("%s:%s.%s",
40+
options.getProjectId(),
41+
options.getDatasetName(),
42+
options.getTableName());
43+
44+
ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
45+
.put("table", tableSpec)
46+
.put("row_restriction", "age > 18")
47+
.put("fields", List.of("user_name", "age"))
48+
.build();
49+
3950
// Create a pipeline and apply transforms.
4051
Pipeline pipeline = Pipeline.create(options);
4152
pipeline
42-
.apply(BigQueryIO.readTableRows()
43-
// Read rows from a specified table.
44-
.from(String.format("%s:%s.%s",
45-
options.getProjectId(),
46-
options.getDatasetName(),
47-
options.getTableName()))
48-
.withMethod(TypedRead.Method.DIRECT_READ)
49-
.withSelectedFields(Arrays.asList("user_name", "age"))
50-
.withRowRestriction("age > 18")
51-
)
52-
// The output from the previous step is a PCollection<TableRow>.
53+
.apply(Managed.read(Managed.BIGQUERY).withConfig(config)).getSinglePCollection()
5354
.apply(MapElements
54-
.into(TypeDescriptor.of(TableRow.class))
55-
// Use TableRow to access individual fields in the row.
56-
.via((TableRow row) -> {
57-
var name = (String) row.get("user_name");
58-
var age = row.get("age");
59-
System.out.printf("Name: %s, Age: %s%n", name, age);
60-
return row;
55+
.into(TypeDescriptors.strings())
56+
// Access individual fields in the row.
57+
.via((Row row) -> {
58+
String output = String.format("Name: %s, Age: %s%n",
59+
row.getString("user_name"),
60+
row.getInt64("age"));
61+
System.out.println(output);
62+
return output;
6163
}));
6264
pipeline.run().waitUntilFinish();
6365
}

0 commit comments

Comments (0)