Commit f7d5e13 (1 parent: 220c74d)

jrmccluskey authored and tvalentyn committed

Write how-to doc on dataflow cost benchmarking (apache#33702)

* Write how-to doc on dataflow cost benchmarking
* trailing whitespace
* add streaming information, links
* add context for BQ stuff
* remove trailing whitespace
* Apply suggestions from code review
* Elaborate on requirements
* remove errant char

Co-authored-by: tvalentyn <tvalentyn@users.noreply.github.com>

File tree: 1 file changed, +122 −0

* sdks/python/apache_beam/testing/benchmarks
<!--
    Licensed to the Apache Software Foundation (ASF) under one
    or more contributor license agreements.  See the NOTICE file
    distributed with this work for additional information
    regarding copyright ownership.  The ASF licenses this file
    to you under the Apache License, Version 2.0 (the
    "License"); you may not use this file except in compliance
    with the License.  You may obtain a copy of the License at

      http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing,
    software distributed under the License is distributed on an
    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    KIND, either express or implied.  See the License for the
    specific language governing permissions and limitations
    under the License.
-->
# Writing a Dataflow Cost Benchmark

Writing a Dataflow Cost Benchmark to estimate the financial cost of executing a pipeline on Google Cloud Platform Dataflow requires 4 components in the repository:

1. A pipeline to execute (ideally one located in the examples directory)
1. A text file with pipeline options in the `.github/workflows/cost-benchmarks-pipeline-options` [directory](../../../../../.github/workflows/cost-benchmarks-pipeline-options)
1. A Python file with a class inheriting from the `DataflowCostBenchmark` [class](../load_tests/dataflow_cost_benchmark.py)
1. An entry to execute the pipeline as part of the cost benchmarks workflow action
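For the `wordcount` benchmark used as the running example in this doc, those four components live roughly at the following paths (shown for illustration, relative to the repository root; `<name>.txt` is a placeholder):

```
sdks/python/apache_beam/examples/wordcount.py                      # 1. the pipeline
.github/workflows/cost-benchmarks-pipeline-options/<name>.txt      # 2. pipeline options
sdks/python/apache_beam/testing/benchmarks/wordcount/wordcount.py  # 3. benchmark class
.github/workflows/beam_Python_CostBenchmarks_Dataflow.yml          # 4. workflow entry
```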
### Choosing a Pipeline

Pipelines that are worth benchmarking in terms of performance and cost have a few straightforward requirements.

1. The transforms used in the pipeline should be native to Beam *or* be lightweight and contain their source code in the pipeline code.
    * The performance impact of non-Beam transforms should be minimized, since the aim is to benchmark Beam transforms on Dataflow, not custom user code.
1. The pipeline itself should run on a consistent data set and have a consistent configuration.
    * For example, a `RunInference` benchmark should use the same model and version for each run, never pulling the latest release of a model for use.
    * The same focus on consistency extends to both the hardware and software configurations for the pipeline, from input data and model version all the way to which Google Cloud Platform region the Dataflow pipeline runs in. All of this configuration should be explicit and available in the repository as part of the benchmark's definition.
1. The pipeline should perform some sort of behavior that would be common enough for a user to create themselves.
    * Effectively, we want to read data from a source, do some sort of transformation, then write that data elsewhere. No need to overcomplicate things.

Additionally, the `run()` call for the pipeline should return a `PipelineResult` object, which the benchmark framework uses to query metrics from Dataflow after the run completes.
### Pipeline Options

Once you have a functioning pipeline to configure as a benchmark, the options need to be saved as a `.txt` file in the `.github/workflows/cost-benchmarks-pipeline-options` [directory](../../../../../.github/workflows/cost-benchmarks-pipeline-options).
The file needs the Apache 2.0 license header at the top, and each flag needs to be provided on a separate line. These arguments include:

* GCP Region (usually us-central1)
* Machine Type
* Number of Workers
* Disk Size
* Autoscaling Algorithm (set this to `NONE` for a more consistent benchmark signal)
* Staging and Temp locations
* A requirements file path in the repository (if additional dependencies are needed)
* Benchmark-specific values
* `publish_to_big_query`
    * This is always `true` for cost benchmarks
* Metrics Dataset for Output
    * For `RunInference` workloads this will be `beam_run_inference`
* Metrics Table for Output
    * This should be named after the benchmark being run
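Put together, an options file might look like the following sketch (the flag names follow standard Beam/Dataflow pipeline options and the load-test metrics options; the bucket, dataset, and table values are illustrative, and the required license header is omitted here for brevity):

```
--region=us-central1
--machine_type=n1-standard-2
--num_workers=5
--disk_size_gb=50
--autoscaling_algorithm=NONE
--staging_location=gs://your-benchmark-bucket/staging
--temp_location=gs://your-benchmark-bucket/temp
--publish_to_big_query=true
--metrics_dataset=beam_run_inference
--metrics_table=python_wordcount
```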
### Configuring the Test Class

With the pipeline itself chosen and the arguments set, we can build out the test class that will execute the pipeline. Navigate to [`sdks/python/apache_beam/testing/benchmarks`](../../testing/benchmarks/) and select an appropriate sub-directory (or create one if necessary).
The class for `wordcount` is shown below:

```py
import logging

from apache_beam.examples import wordcount
from apache_beam.testing.load_tests.dataflow_cost_benchmark import DataflowCostBenchmark


class WordcountCostBenchmark(DataflowCostBenchmark):
  def __init__(self):
    super().__init__()

  def test(self):
    extra_opts = {}
    extra_opts['output'] = self.pipeline.get_option('output_file')
    self.result = wordcount.run(
        self.pipeline.get_full_options_as_args(**extra_opts),
        save_main_session=False)


if __name__ == '__main__':
  logging.basicConfig(level=logging.INFO)
  WordcountCostBenchmark().run()
```
The important notes here: if there are any arguments with common arg names (like `input` and `output`), you can use the `extra_opts` dictionary to map to them from alternatives in the options file.

You should also make sure that you save the output of the `run()` call from the pipeline in the `self.result` field, as the framework will try to re-run the pipeline without the extra opts if that value is missing. Beyond those two key notes, the benchmarking framework does all of the other work in terms of setup, teardown, and metrics querying.
#### Streaming Workloads

If the pipeline is a streaming use case, two versions need to be created: one operating on a backlog of work items (e.g. the entire test corpus is placed into the streaming source before the pipeline begins) and one operating in steady state (e.g. elements are added to the streaming source at a regular rate). The former is relatively simple: add an extra step to the `test()` function to stage the input data into the streaming source being read from. For the latter, a separate Python thread should be spun up to stage one element at a time repeatedly over a given time interval (the interval between elements and the duration of the staging should be defined as part of the benchmark configuration). Once the streaming pipeline is out of data and does not receive more for an extended period of time, the pipeline will exit and the benchmarking framework will process the results in the same manner as the batch case. In the steady state case, remember to call `join()` to close the thread after execution.
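The steady-state staging thread can be sketched with plain Python threading (the `stage_at_interval` helper and the `publish` callable are hypothetical names, not part of the benchmark framework):

```python
import threading
import time


def stage_at_interval(publish, elements, interval_s):
  """Stage one element at a time into the streaming source at a fixed rate."""
  for element in elements:
    publish(element)
    time.sleep(interval_s)


# A list stands in for the streaming source here; a real benchmark would
# pass e.g. a Pub/Sub publish call as `publish` instead.
staged = []
stager = threading.Thread(
    target=stage_at_interval, args=(staged.append, range(5), 0.01))
stager.start()
# ... the pipeline would run here while elements arrive ...
stager.join()  # remember to close the thread after execution
```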
### Updating the Benchmark Workflow

Navigate to [`.github/workflows/beam_Python_CostBenchmarks_Dataflow.yml`](../../../../../.github/workflows/beam_Python_CostBenchmarks_Dataflow.yml) and make the following changes:

1. Add the pipeline options `.txt` file written above to the `argument-file-paths` list. This loads those pipeline options into the workflow environment as `env.beam_Python_Cost_Benchmarks_Dataflow_test_arguments_X`, where X is an integer corresponding to the position of the text file in the list of files, starting from `1`.
2. Create an entry for the benchmark. The entry for wordcount is as follows:
```yaml
- name: Run wordcount on Dataflow
  uses: ./.github/actions/gradle-command-self-hosted-action
  timeout-minutes: 30
  with:
    # ... (gradle command and surrounding argument boilerplate elided) ...
      -PloadTest.mainClass=apache_beam.testing.benchmarks.wordcount.wordcount \
      -Prunner=DataflowRunner \
      -PpythonVersion=3.10 \
      '-PloadTest.args=${{ env.beam_Inference_Python_Benchmarks_Dataflow_test_arguments_1 }} --job_name=benchmark-tests-wordcount-python-${{env.NOW_UTC}} --output_file=gs://temp-storage-for-end-to-end-tests/wordcount/result_wordcount-${{env.NOW_UTC}}.txt' \
```
The main class is the `DataflowCostBenchmark` subclass defined earlier; the runner and Python version are specified alongside it. The majority of pipeline arguments are loaded from the `.txt` file, with the job name and output file specified here. Be sure to set a reasonable timeout here as well.
