This repository was archived by the owner on Nov 11, 2022. It is now read-only.

Commit 5c096ca

jasonkuster authored and dhalperi committed
Upgrade to Apache Beam, version 2.0.0 (#571)
* Upgrade to Apache Beam, version 2.0.0
  Signed-off-by: Jason Kuster <[email protected]>

* Revert enforcer plugin changes since they seem to be beam-only.
  Signed-off-by: Jason Kuster <[email protected]>

* Pull request comments.
  Signed-off-by: Jason Kuster <[email protected]>
1 parent bba951d commit 5c096ca


42 files changed (+883 −637 lines)

maven-archetypes/examples-java8/pom.xml

Lines changed: 1 addition & 0 deletions
@@ -74,6 +74,7 @@
       </plugins>
     </pluginManagement>
   </build>
+
   <!-- Dependency section from Beam omitted. Used in Beam for parallelizing build to ensure
        relevant sections are built, but that is not being done here. -->
 </project>

maven-archetypes/examples-java8/src/main/resources/archetype-resources/pom.xml

Lines changed: 11 additions & 3 deletions
@@ -27,6 +27,7 @@

   <properties>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    <surefire-plugin.version>2.20</surefire-plugin.version>
   </properties>

   <repositories>
@@ -58,7 +59,7 @@
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-surefire-plugin</artifactId>
-        <version>2.19.1</version>
+        <version>${surefire-plugin.version}</version>
         <configuration>
           <parallel>all</parallel>
           <threadCount>4</threadCount>
@@ -68,19 +69,26 @@
           <dependency>
             <groupId>org.apache.maven.surefire</groupId>
             <artifactId>surefire-junit47</artifactId>
-            <version>2.19.1</version>
+            <version>${surefire-plugin.version}</version>
           </dependency>
         </dependencies>
       </plugin>

+      <!-- Ensure that the Maven jar plugin runs before the Maven
+           shade plugin by listing the plugin higher within the file. -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+      </plugin>
+
       <!--
         Configures `mvn package` to produce a bundled jar ("fat jar") for runners
         that require this for job submission to a cluster.
       -->
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-shade-plugin</artifactId>
-        <version>2.4.1</version>
+        <version>3.0.0</version>
         <executions>
           <execution>
             <phase>package</phase>

maven-archetypes/examples-java8/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java

Lines changed: 11 additions & 13 deletions
@@ -22,14 +22,14 @@
 import java.util.regex.Pattern;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.slf4j.Logger;
@@ -51,7 +51,7 @@
  * <p>New Concepts:
  * <pre>
  *   1. Logging using SLF4J, even in a distributed environment
- *   2. Creating a custom aggregator (runners have varying levels of support)
+ *   2. Creating a custom metric (runners have varying levels of support)
  *   3. Testing your Pipeline via PAssert
  * </pre>
  *
@@ -90,29 +90,27 @@ public FilterTextFn(String pattern) {
     }

     /**
-     * Concept #2: A custom aggregator can track values in your pipeline as it runs. Each
-     * runner provides varying levels of support for aggregators, and may expose them
+     * Concept #2: A custom metric can track values in your pipeline as it runs. Each
+     * runner provides varying levels of support for metrics, and may expose them
      * in a dashboard, etc.
      */
-    private final Aggregator<Long, Long> matchedWords =
-        createAggregator("matchedWords", Sum.ofLongs());
-    private final Aggregator<Long, Long> unmatchedWords =
-        createAggregator("unmatchedWords", Sum.ofLongs());
+    private final Counter matchedWords = Metrics.counter(FilterTextFn.class, "matchedWords");
+    private final Counter unmatchedWords = Metrics.counter(FilterTextFn.class, "unmatchedWords");

     @ProcessElement
     public void processElement(ProcessContext c) {
       if (filter.matcher(c.element().getKey()).matches()) {
         // Log at the "DEBUG" level each element that we match. When executing this pipeline
         // these log lines will appear only if the log level is set to "DEBUG" or lower.
         LOG.debug("Matched: " + c.element().getKey());
-        matchedWords.addValue(1L);
+        matchedWords.inc();
         c.output(c.element());
       } else {
         // Log at the "TRACE" level each element that is not matched. Different log levels
         // can be used to control the verbosity of logging providing an effective mechanism
         // to filter less important information.
         LOG.trace("Did not match: " + c.element().getKey());
-        unmatchedWords.addValue(1L);
+        unmatchedWords.inc();
       }
     }
   }
@@ -138,7 +136,7 @@ public static void main(String[] args) {
     Pipeline p = Pipeline.create(options);

     PCollection<KV<String, Long>> filteredWords =
-        p.apply("ReadLines", TextIO.Read.from(options.getInputFile()))
+        p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
         .apply(new WordCount.CountWords())
         .apply(ParDo.of(new FilterTextFn(options.getFilterPattern())));

@@ -151,7 +149,7 @@ public static void main(String[] args) {
      * <p>Below we verify that the set of filtered words matches our expected counts. Note
      * that PAssert does not provide any output and that successful completion of the
      * Pipeline implies that the expectations were met. Learn more at
-     * https://cloud.google.com/dataflow/pipelines/testing-your-pipeline on how to test
+     * https://beam.apache.org/documentation/pipelines/test-your-pipeline/ on how to test
      * your Pipeline and see {@link DebuggingWordCountTest} for an example unit test.
      */
     List<KV<String, Long>> expectedResults = Arrays.asList(
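
Side note on the migration above: in Beam 2.0.0 the Aggregator API is gone, and counters are declared with Metrics.counter(namespace, name) and bumped with inc(). A minimal standalone sketch of the same pattern (the DoFn and counter names here are illustrative, not part of this commit):

import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;

// Hypothetical DoFn showing the Beam 2.0.0 metrics pattern used in this diff.
class CountShortWordsFn extends DoFn<String, String> {
  // Metrics.counter(namespace, name) replaces createAggregator(name, Sum.ofLongs()).
  private final Counter shortWords = Metrics.counter(CountShortWordsFn.class, "shortWords");

  @ProcessElement
  public void processElement(ProcessContext c) {
    if (c.element().length() < 4) {
      shortWords.inc(); // replaces addValue(1L) on the old Aggregator
    }
    c.output(c.element());
  }
}

Unlike aggregators, metric values are queried from the PipelineResult after submission rather than read off the transform, which is why the fields no longer need to be tied to the DoFn for reporting.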

maven-archetypes/examples-java8/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java

Lines changed: 4 additions & 3 deletions
@@ -17,6 +17,7 @@
  */
 package ${package};

+import ${package}.common.ExampleUtils;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -74,7 +75,7 @@ public static void main(String[] args) {
     // the input text (a set of Shakespeare's texts).

     // This example reads a public data set consisting of the complete works of Shakespeare.
-    p.apply(TextIO.Read.from("gs://apache-beam-samples/shakespeare/*"))
+    p.apply(TextIO.read().from("gs://apache-beam-samples/shakespeare/*"))

     // Concept #2: Apply a ParDo transform to our PCollection of text lines. This ParDo invokes a
     // DoFn (defined in-line) on each element that tokenizes the text line into individual words.
@@ -83,7 +84,7 @@ public static void main(String[] args) {
     .apply("ExtractWords", ParDo.of(new DoFn<String, String>() {
       @ProcessElement
       public void processElement(ProcessContext c) {
-        for (String word : c.element().split("[^a-zA-Z']+")) {
+        for (String word : c.element().split(ExampleUtils.TOKENIZER_PATTERN)) {
           if (!word.isEmpty()) {
             c.output(word);
           }
@@ -110,7 +111,7 @@ public String apply(KV<String, Long> input) {
     // formatted strings) to a series of text files.
     //
     // By default, it will write to a set of files with names like wordcount-00001-of-00005
-    .apply(TextIO.Write.to("wordcounts"));
+    .apply(TextIO.write().to("wordcounts"));

     // Run the pipeline.
     p.run().waitUntilFinish();
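
All of the TextIO changes in this commit follow one mechanical rule: the static inner classes TextIO.Read and TextIO.Write become the factory methods TextIO.read() and TextIO.write(). A minimal sketch of the 2.0.0-style call shape (class name and paths are placeholders, not from the commit):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class TextIoSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Beam 2.0.0 style: fluent factory methods instead of static inner classes.
    p.apply(TextIO.read().from("input.txt"))   // was: TextIO.Read.from(...)
     .apply(TextIO.write().to("wordcounts"));  // was: TextIO.Write.to(...)

    p.run().waitUntilFinish();
  }
}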

maven-archetypes/examples-java8/src/main/resources/archetype-resources/src/main/java/MinimalWordCountJava8.java

Lines changed: 7 additions & 7 deletions
@@ -55,17 +55,17 @@ public static void main(String[] args) {

     Pipeline p = Pipeline.create(options);

-    p.apply(TextIO.Read.from("gs://apache-beam-samples/shakespeare/*"))
-     .apply(FlatMapElements.via((String word) -> Arrays.asList(word.split("[^a-zA-Z']+")))
-         .withOutputType(TypeDescriptors.strings()))
+    p.apply(TextIO.read().from("gs://apache-beam-samples/shakespeare/*"))
+     .apply(FlatMapElements
+         .into(TypeDescriptors.strings())
+         .via((String word) -> Arrays.asList(word.split("[^\\p{L}]+"))))
      .apply(Filter.by((String word) -> !word.isEmpty()))
      .apply(Count.<String>perElement())
      .apply(MapElements
-         .via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue())
-         .withOutputType(TypeDescriptors.strings()))
-
+         .into(TypeDescriptors.strings())
+         .via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue()))
      // CHANGE 3/3: The Google Cloud Storage path is required for outputting the results to.
-     .apply(TextIO.Write.to("gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX"));
+     .apply(TextIO.write().to("gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX"));

     p.run().waitUntilFinish();
   }
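
The lambda-based transforms change shape too: withOutputType(...) after via(...) becomes into(...) before via(...). A sketch of just that piece, assuming some existing PCollection<String> of lines (the helper class here is hypothetical):

import java.util.Arrays;
import org.apache.beam.sdk.transforms.FlatMapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

class TokenizeSketch {
  // Hypothetical helper; 'lines' stands in for any PCollection<String> in a real pipeline.
  static PCollection<String> tokenize(PCollection<String> lines) {
    return lines.apply(
        FlatMapElements
            .into(TypeDescriptors.strings())  // declare the output type first...
            .via((String line) -> Arrays.asList(line.split("[^\\p{L}]+"))));  // ...then the lambda
  }
}

Declaring the output type up front lets the SDK infer a coder for the lambda's results, which is the motivation for the reordering.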

maven-archetypes/examples-java8/src/main/resources/archetype-resources/src/main/java/WindowedWordCount.java

Lines changed: 12 additions & 31 deletions
@@ -21,7 +21,7 @@
 import java.util.concurrent.ThreadLocalRandom;
 import ${package}.common.ExampleBigQueryTableOptions;
 import ${package}.common.ExampleOptions;
-import ${package}.common.WriteWindowedFilesDoFn;
+import ${package}.common.WriteOneFilePerWindow;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.io.TextIO;
@@ -31,11 +31,9 @@
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
@@ -53,7 +51,7 @@
  *
  * <p>Basic concepts, also in the MinimalWordCount, WordCount, and DebuggingWordCount examples:
  * Reading text files; counting a PCollection; writing to GCS; executing a Pipeline both locally
- * and using a selected runner; defining DoFns; creating a custom aggregator;
+ * and using a selected runner; defining DoFns;
  * user-defined PTransforms; defining PipelineOptions.
  *
  * <p>New Concepts:
@@ -163,12 +161,15 @@ public interface Options extends WordCount.WordCountOptions,
     @Default.InstanceFactory(DefaultToMinTimestampPlusOneHour.class)
     Long getMaxTimestampMillis();
     void setMaxTimestampMillis(Long value);
+
+    @Description("Fixed number of shards to produce per window, or null for runner-chosen sharding")
+    Integer getNumShards();
+    void setNumShards(Integer numShards);
   }

   public static void main(String[] args) throws IOException {
     Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
     final String output = options.getOutput();
-    final Duration windowSize = Duration.standardMinutes(options.getWindowSize());
     final Instant minTimestamp = new Instant(options.getMinTimestampMillis());
     final Instant maxTimestamp = new Instant(options.getMaxTimestampMillis());

@@ -180,7 +181,7 @@ public static void main(String[] args) throws IOException {
      */
     PCollection<String> input = pipeline
       /** Read from the GCS file. */
-      .apply(TextIO.Read.from(options.getInputFile()))
+      .apply(TextIO.read().from(options.getInputFile()))
       // Concept #2: Add an element timestamp, using an artificial time just to show windowing.
       // See AddTimestampFn for more detail on this.
       .apply(ParDo.of(new AddTimestampFn(minTimestamp, maxTimestamp)));
@@ -203,33 +204,13 @@ public static void main(String[] args) throws IOException {
     PCollection<KV<String, Long>> wordCounts = windowedWords.apply(new WordCount.CountWords());

     /**
-     * Concept #5: Customize the output format using windowing information
-     *
-     * <p>At this point, the data is organized by window. We're writing text files and have no
-     * late data, so for simplicity we can use the window as the key and {@link GroupByKey} to get
-     * one output file per window. (if we had late data this key would not be unique)
-     *
-     * <p>To access the window in a {@link DoFn}, add a {@link BoundedWindow} parameter. This will
-     * be automatically detected and populated with the window for the current element.
-     */
-    PCollection<KV<IntervalWindow, KV<String, Long>>> keyedByWindow =
-        wordCounts.apply(
-            ParDo.of(
-                new DoFn<KV<String, Long>, KV<IntervalWindow, KV<String, Long>>>() {
-                  @ProcessElement
-                  public void processElement(ProcessContext context, IntervalWindow window) {
-                    context.output(KV.of(window, context.element()));
-                  }
-                }));
-
-    /**
-     * Concept #6: Format the results and write to a sharded file partitioned by window, using a
+     * Concept #5: Format the results and write to a sharded file partitioned by window, using a
      * simple ParDo operation. Because there may be failures followed by retries, the
      * writes must be idempotent, but the details of writing to files is elided here.
      */
-    keyedByWindow
-        .apply(GroupByKey.<IntervalWindow, KV<String, Long>>create())
-        .apply(ParDo.of(new WriteWindowedFilesDoFn(output)));
+    wordCounts
+        .apply(MapElements.via(new WordCount.FormatAsTextFn()))
+        .apply(new WriteOneFilePerWindow(output, options.getNumShards()));

     PipelineResult result = pipeline.run();
     try {
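
The windowing step itself (upstream of this hunk) is unchanged by the commit; only the write path moves from the hand-rolled GroupByKey-per-window into the shared WriteOneFilePerWindow helper, whose implementation lives elsewhere in the examples. For context, the standard fixed-window assignment that feeds it looks like this (a sketch assuming a one-minute window size; the helper class is illustrative):

import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

class WindowingSketch {
  // Assigns each element of 'input' to a fixed one-minute window; downstream
  // aggregations such as WordCount.CountWords then produce one result per window.
  static PCollection<String> windowed(PCollection<String> input) {
    return input.apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))));
  }
}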

maven-archetypes/examples-java8/src/main/resources/archetype-resources/src/main/java/WordCount.java

Lines changed: 10 additions & 10 deletions
@@ -17,21 +17,22 @@
  */
 package ${package};

+import ${package}.common.ExampleUtils;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.Validation.Required;
-import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SimpleFunction;
-import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;

@@ -44,8 +45,8 @@
  * pipeline, for introduction of additional concepts.
  *
  * <p>For a detailed walkthrough of this example, see
- * <a href="http://beam.apache.org/use/walkthroughs/">
- * http://beam.apache.org/use/walkthroughs/
+ * <a href="https://beam.apache.org/get-started/wordcount-example/">
+ * https://beam.apache.org/get-started/wordcount-example/
  * </a>
  *
  * <p>Basic concepts, also in the MinimalWordCount example:
@@ -86,17 +87,16 @@ public class WordCount {
    * to a ParDo in the pipeline.
    */
   static class ExtractWordsFn extends DoFn<String, String> {
-    private final Aggregator<Long, Long> emptyLines =
-        createAggregator("emptyLines", Sum.ofLongs());
+    private final Counter emptyLines = Metrics.counter(ExtractWordsFn.class, "emptyLines");

     @ProcessElement
     public void processElement(ProcessContext c) {
       if (c.element().trim().isEmpty()) {
-        emptyLines.addValue(1L);
+        emptyLines.inc();
       }

       // Split the line into words.
-      String[] words = c.element().split("[^a-zA-Z']+");
+      String[] words = c.element().split(ExampleUtils.TOKENIZER_PATTERN);

       // Output each word encountered into the output PCollection.
       for (String word : words) {
@@ -176,10 +176,10 @@ public static void main(String[] args) {

     // Concepts #2 and #3: Our pipeline applies the composite CountWords transform, and passes the
     // static FormatAsTextFn() to the ParDo transform.
-    p.apply("ReadLines", TextIO.Read.from(options.getInputFile()))
+    p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
      .apply(new CountWords())
      .apply(MapElements.via(new FormatAsTextFn()))
-     .apply("WriteCounts", TextIO.Write.to(options.getOutput()));
+     .apply("WriteCounts", TextIO.write().to(options.getOutput()));

     p.run().waitUntilFinish();
   }
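
Centralizing the tokenizer in ExampleUtils.TOKENIZER_PATTERN also changes behavior: judging by the MinimalWordCountJava8 hunk above, the shared pattern is the Unicode-aware "[^\\p{L}]+" rather than the ASCII-only "[^a-zA-Z']+". A standalone comparison of the two (illustrative, not part of the commit):

import java.util.Arrays;

class TokenizerComparison {
  public static void main(String[] args) {
    String line = "café naïve";
    // The old ASCII pattern splits inside accented words:
    System.out.println(Arrays.toString(line.split("[^a-zA-Z']+"))); // [caf, na, ve]
    // The Unicode letter class \p{L} keeps them intact:
    System.out.println(Arrays.toString(line.split("[^\\p{L}]+")));  // [café, naïve]
  }
}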

maven-archetypes/examples-java8/src/main/resources/archetype-resources/src/main/java/common/ExampleBigQueryTableOptions.java

Lines changed: 1 addition & 1 deletion
@@ -18,10 +18,10 @@
 package ${package}.common;

 import com.google.api.services.bigquery.model.TableSchema;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.DefaultValueFactory;
 import org.apache.beam.sdk.options.Description;
-import org.apache.beam.sdk.options.GcpOptions;
 import org.apache.beam.sdk.options.PipelineOptions;

 /**

maven-archetypes/examples-java8/src/main/resources/archetype-resources/src/main/java/common/ExamplePubsubTopicAndSubscriptionOptions.java

Lines changed: 1 addition & 1 deletion
@@ -17,10 +17,10 @@
  */
 package ${package}.common;

+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.DefaultValueFactory;
 import org.apache.beam.sdk.options.Description;
-import org.apache.beam.sdk.options.GcpOptions;
 import org.apache.beam.sdk.options.PipelineOptions;

 /**

maven-archetypes/examples-java8/src/main/resources/archetype-resources/src/main/java/common/ExamplePubsubTopicOptions.java

Lines changed: 1 addition & 1 deletion
@@ -17,10 +17,10 @@
  */
 package ${package}.common;

+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.DefaultValueFactory;
 import org.apache.beam.sdk.options.Description;
-import org.apache.beam.sdk.options.GcpOptions;
 import org.apache.beam.sdk.options.PipelineOptions;

 /**
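
All three options files pick up the same relocation: in Beam 2.0.0, GcpOptions moves from org.apache.beam.sdk.options to org.apache.beam.sdk.extensions.gcp.options. Usage is unchanged; only the import differs, as in this sketch (the class name is illustrative):

import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

class GcpOptionsSketch {
  public static void main(String[] args) {
    // Same interface as before the move; only the package changed.
    GcpOptions options = PipelineOptionsFactory.fromArgs(args).as(GcpOptions.class);
    System.out.println("GCP project: " + options.getProject());
  }
}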
