Skip to content

Commit 2dcb15f

Browse files
olegkachur-eOleg Kachur
andauthored
Add Dataflow Apache Beam Java streaming system test (apache#47209)
- add system test - update docs - add sources to the build jar file Co-authored-by: Oleg Kachur <kachur@google.com>
1 parent e35cf2f commit 2dcb15f

File tree

6 files changed

+385
-0
lines changed

6 files changed

+385
-0
lines changed

providers/google/docs/operators/cloud/dataflow.rst

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,14 @@ Here is an example of creating and running a pipeline in Java with jar stored on
138138
:start-after: [START howto_operator_start_java_job_local_jar]
139139
:end-before: [END howto_operator_start_java_job_local_jar]
140140

141+
Here is an example of creating and running a streaming pipeline in Java with a jar stored on GCS:
142+
143+
.. exampleinclude:: /../../providers/google/tests/system/google/cloud/dataflow/example_dataflow_java_streaming.py
144+
:language: python
145+
:dedent: 4
146+
:start-after: [START howto_operator_start_java_streaming]
147+
:end-before: [END howto_operator_start_java_streaming]
148+
141149
.. _howto/operator:PythonSDKPipelines:
142150

143151
Python SDK pipelines
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing,
13+
# software distributed under the License is distributed on an
14+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
# KIND, either express or implied. See the License for the
16+
# specific language governing permissions and limitations
17+
# under the License.
18+
19+
"""
20+
Example Airflow DAG for testing Google Dataflow Beam Pipeline Operator with Java(streaming).
21+
22+
Important Note:
23+
This test downloads Java JAR file from the public bucket. In case the JAR file cannot be downloaded
24+
or is not compatible with the Java version used in the test.
25+
There is no streaming pipeline example for Apache Beam Java SDK, the source code and build instructions
26+
are located in `providers/google/tests/system/google/cloud/dataflow/resources/java_streaming_src/`.
27+
28+
You can follow the instructions on how to pack a self-executing jar here:
29+
https://beam.apache.org/documentation/runners/dataflow/
30+
31+
Requirements:
32+
These operators require the gcloud command and Java's JRE to run.
33+
"""
34+
35+
from __future__ import annotations
36+
37+
import os
38+
from datetime import datetime
39+
40+
from airflow.models.dag import DAG
41+
from airflow.providers.apache.beam.hooks.beam import BeamRunnerType
42+
from airflow.providers.apache.beam.operators.beam import BeamRunJavaPipelineOperator
43+
from airflow.providers.google.cloud.operators.dataflow import DataflowStopJobOperator
44+
from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
45+
from airflow.providers.google.cloud.operators.pubsub import (
46+
PubSubCreateTopicOperator,
47+
PubSubDeleteTopicOperator,
48+
)
49+
from airflow.providers.google.cloud.transfers.gcs_to_local import GCSToLocalFilesystemOperator
50+
from airflow.utils.trigger_rule import TriggerRule
51+
52+
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
53+
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
54+
DAG_ID = "dataflow_java_streaming"
55+
LOCATION = "europe-west3"
56+
BUCKET_NAME = f"bucket-{DAG_ID}-{ENV_ID}"
57+
GCS_TMP = f"gs://{BUCKET_NAME}/temp"
58+
GCS_OUTPUT = f"gs://{BUCKET_NAME}/DF_OUT"
59+
RESOURCE_BUCKET = "airflow-system-tests-resources"
60+
JAR_FILE_NAME = "stream-pubsub-example-bundled-v-0.1.jar"
61+
GCS_JAR_PATH = f"gs://{RESOURCE_BUCKET}/dataflow/java/{JAR_FILE_NAME}"
62+
# For the distributed system, we need to store the JAR file in a folder that can be accessed by multiple
63+
# worker.
64+
# For example in Composer the correct path is gcs/data/word-count-beam-bundled-0.1.jar.
65+
# Because gcs/data/ is shared folder for Airflow's workers.
66+
IS_COMPOSER = bool(os.environ.get("COMPOSER_ENVIRONMENT", ""))
67+
LOCAL_JAR = f"gcs/data/{JAR_FILE_NAME}" if IS_COMPOSER else JAR_FILE_NAME
68+
REMOTE_JAR_FILE_PATH = f"dataflow/java/{JAR_FILE_NAME}"
69+
70+
OUTPUT_TOPIC_ID = f"tp-{ENV_ID}-out"
71+
INPUT_TOPIC = "projects/pubsub-public-data/topics/taxirides-realtime"
72+
OUTPUT_TOPIC = f"projects/{PROJECT_ID}/topics/{OUTPUT_TOPIC_ID}"
73+
74+
75+
with DAG(
76+
DAG_ID,
77+
schedule="@once",
78+
start_date=datetime(2025, 2, 1),
79+
catchup=False,
80+
tags=["example", "dataflow", "java", "streaming"],
81+
) as dag:
82+
create_bucket = GCSCreateBucketOperator(task_id="create_bucket", bucket_name=BUCKET_NAME)
83+
download_file = GCSToLocalFilesystemOperator(
84+
task_id="download_file",
85+
object_name=f"dataflow/java/{JAR_FILE_NAME}",
86+
bucket=RESOURCE_BUCKET,
87+
filename=LOCAL_JAR,
88+
)
89+
create_output_pub_sub_topic = PubSubCreateTopicOperator(
90+
task_id="create_topic", topic=OUTPUT_TOPIC_ID, project_id=PROJECT_ID, fail_if_exists=False
91+
)
92+
# [START howto_operator_start_java_streaming]
93+
94+
start_java_streaming_job_dataflow = BeamRunJavaPipelineOperator(
95+
runner=BeamRunnerType.DataflowRunner,
96+
task_id="start_java_streaming_dataflow_job",
97+
jar=LOCAL_JAR,
98+
pipeline_options={
99+
"tempLocation": GCS_TMP,
100+
"input_topic": INPUT_TOPIC,
101+
"output_topic": OUTPUT_TOPIC,
102+
"streaming": True,
103+
},
104+
dataflow_config={
105+
"job_name": f"java-streaming-job-{ENV_ID}",
106+
"location": LOCATION,
107+
},
108+
)
109+
# [END howto_operator_start_java_streaming]
110+
stop_dataflow_job = DataflowStopJobOperator(
111+
task_id="stop_dataflow_job",
112+
location=LOCATION,
113+
job_id="{{ task_instance.xcom_pull(task_ids='start_java_streaming_dataflow_job')['dataflow_job_id'] }}",
114+
)
115+
delete_topic = PubSubDeleteTopicOperator(
116+
task_id="delete_topic", topic=OUTPUT_TOPIC_ID, project_id=PROJECT_ID
117+
)
118+
delete_topic.trigger_rule = TriggerRule.ALL_DONE
119+
delete_bucket = GCSDeleteBucketOperator(
120+
task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE
121+
)
122+
123+
(
124+
# TEST SETUP
125+
create_bucket
126+
>> download_file
127+
>> create_output_pub_sub_topic
128+
# TEST BODY
129+
>> start_java_streaming_job_dataflow
130+
# TEST TEARDOWN
131+
>> stop_dataflow_job
132+
>> delete_topic
133+
>> delete_bucket
134+
)
135+
136+
from tests_common.test_utils.watcher import watcher
137+
138+
# This test needs watcher in order to properly mark success/failure
139+
# when "teardown" task with trigger rule is part of the DAG
140+
list(dag.tasks) >> watcher()
141+
142+
from tests_common.test_utils.system_tests import get_test_run # noqa: E402
143+
144+
# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
145+
test_run = get_test_run(dag)
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
# Dataflow Java Streaming
2+
3+
This project is a streaming example built with the latest Apache Beam Java SDK (v2.63),
as there is no "official" streaming example yet: https://beam.apache.org/get-started/wordcount-example/.

It contains sample logic that forwards streaming data from an input Pub/Sub topic to an
output Pub/Sub topic, to work with an unbounded data source/destination.

It is used by the Java Dataflow streaming system tests.
10+
11+
12+
## Requirements
13+
14+
- **Java 17**: Ensure you have Java 17 installed and configured on your system.
15+
- **Maven**: Make sure Maven is installed and configured on your system.
16+
- Maven is used for dependency management and building the project.
17+
18+
## Project Structure
19+
20+
The project's structure is as follows:
21+
22+
```plaintext
23+
├── src
24+
│ ├── main
25+
│ │ ├── java
26+
│ │ │ └── org
27+
│ │ │ └── example
28+
│ │ │ └── pubsub
29+
│ │ │ └── StreamingExample.java
30+
├── pom.xml
31+
└── README.md
32+
```
33+
34+
## Build
35+
It was checked to build inside Breeze container with dependencies installed from the [requirements](#requirements).
36+
37+
The output artifact is `target/stream-pubsub-example-bundled-v-0.1.jar` executable.
38+
39+
40+
## Run
41+
To run, use:
```bash
java -jar target/stream-pubsub-example-bundled-v-0.1.jar \
  --runner=DataflowRunner \
  --project=<project_id> \
  --region=<location_id> \
  --jobName=<df_job_name> \
  --input_topic=<input_PubSub_topic> \
  --output_topic=<output_PubSub_topic>
```
51+
52+
optionally you might add the
53+
`'--labels=<custom_labels_dict>'` or `--tempLocation=<gcs_path>` and `--stagingLocation=<gcs_path>`
54+
or other dataflow pipeline options, if needed.
55+
56+
57+
## Runners
58+
The `DataflowRunner` and `DirectRunner` are supported.
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
<!--
    Licensed to the Apache Software Foundation (ASF) under one
    or more contributor license agreements. See the NOTICE file
    distributed with this work for additional information
    regarding copyright ownership. The ASF licenses this file
    to you under the Apache License, Version 2.0 (the
    "License"); you may not use this file except in compliance
    with the License. You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing,
    software distributed under the License is distributed on an
    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    KIND, either express or implied. See the License for the
    specific language governing permissions and limitations
    under the License.
-->

<!--
    Build configuration for the Dataflow Java streaming system-test example.
    Produces a self-executing shaded jar: target/stream-pubsub-example-bundled-v-0.1.jar
    (finalName = ${artifactId}-bundled-${version}).
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://www.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>stream-pubsub-example</artifactId>
    <version>v-0.1</version>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <beam.version>2.63.0</beam.version>
        <slf4j.version>2.0.16</slf4j.version>
        <google.cloud.version>1.113.0</google.cloud.version>
        <secretmanager.version>2.45.0</secretmanager.version>
        <iamcredentials.version>2.45.0</iamcredentials.version>
        <!-- The example targets Java 17 (see resources README). -->
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>

        <maven-compiler-plugin.version>3.8.1</maven-compiler-plugin.version>
        <maven-shade-plugin.version>3.2.4</maven-shade-plugin.version>
    </properties>
    <dependencies>
        <!-- Beam Dependencies-->
        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-sdks-java-core</artifactId>
            <version>${beam.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
            <version>${beam.version}</version>
        </dependency>
        <!-- Direct runner might be used for tests-->
        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-runners-direct-java</artifactId>
            <version>${beam.version}</version>
        </dependency>

        <!-- Provides PubsubIO used by StreamingExample. -->
        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
            <version>${beam.version}</version>
        </dependency>

        <dependency>
            <groupId>com.google.cloud</groupId>
            <artifactId>google-cloud-pubsub</artifactId>
            <version>${google.cloud.version}</version>
        </dependency>
        <!-- Log -->
        <!-- Add slf4j API frontend binding with JUL backend on runtime-->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-jdk14</artifactId>
            <version>${slf4j.version}</version>
            <scope>runtime</scope>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>${maven-compiler-plugin.version}</version>
            </plugin>
            <!-- Shade plugin bundles all dependencies into a single self-executing jar. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>${maven-shade-plugin.version}</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <finalName>${project.artifactId}-bundled-${project.version}</finalName>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <!-- Strip signature files so the shaded jar is not rejected as tampered. -->
                                    <excludes>
                                        <exclude>META-INF/LICENSE</exclude>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <!-- Merge META-INF/services entries from all shaded dependencies. -->
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>org.example.pubsub.StreamingExample</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
package org.example.pubsub;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
// NOTE(review): Logger (and the Default import above) appear unused — confirm and drop if so.
import java.util.logging.Logger;


/**
 * Minimal streaming Beam pipeline used by the Airflow Dataflow system tests:
 * reads string messages from an input Pub/Sub topic and republishes them
 * unchanged to an output Pub/Sub topic.
 */
public class StreamingExample {

    /**
     * Pipeline options for the example.
     *
     * The underscore getter/setter names are deliberate: Beam derives the
     * command-line option name from the getter (getInput_topic -> --input_topic),
     * which must match the "input_topic"/"output_topic" pipeline options passed
     * by the Airflow DAG. Do not rename to camelCase.
     */
    public interface StreamingExampleOptions extends PipelineOptions, StreamingOptions {
        @Description("Input Pub/Sub Topic")
        @Validation.Required
        String getInput_topic();
        void setInput_topic(String inputTopic);

        @Description("Output Pub/Sub Topic")
        @Validation.Required
        String getOutput_topic();
        void setOutput_topic(String outputTopic);
    }

    /**
     * Entry point: parses/validates options, forces streaming mode, and runs
     * the topic-to-topic forwarding pipeline. The job runs until cancelled
     * (the DAG stops it via DataflowStopJobOperator).
     */
    public static void main(String[] args) {
        StreamingExampleOptions options = PipelineOptionsFactory
                .fromArgs(args)
                .withValidation()
                .as(StreamingExampleOptions.class);
        // Always run as a streaming job, regardless of the --streaming flag.
        options.setStreaming(true);

        Pipeline pipeline = Pipeline.create(options);
        pipeline
                .apply("ReadFromPubSub", PubsubIO.readStrings().fromTopic(options.getInput_topic()))
                .apply("WriteToPubSub", PubsubIO.writeStrings().to(options.getOutput_topic()));

        pipeline.run();
    }
}

0 commit comments

Comments
 (0)