
Commit cc490e7

Improve map_fn_microbenchmark
- add fixed window test
- add side input tests
- improve by using benchmark helpers
1 parent 649e33c

2 files changed: +108 -31 lines


sdks/python/apache_beam/tools/map_fn_microbenchmark.py

Lines changed: 83 additions & 24 deletions
@@ -23,7 +23,7 @@

 This executes the same codepaths that are run on the Fn API (and Dataflow)
 workers, but is generally easier to run (locally) and more stable. It does
-not, on the other hand, excercise any non-trivial amount of IO (e.g. shuffle).
+not, on the other hand, exercise any non-trivial amount of IO (e.g. shuffle).

 Run as

@@ -32,41 +32,100 @@

 # pytype: skip-file

+import argparse
 import logging
-import time
-
-from scipy import stats

 import apache_beam as beam
 from apache_beam.tools import utils
+from apache_beam.transforms.window import FixedWindows


-def run_benchmark(num_maps=100, num_runs=10, num_elements_step=1000):
-  timings = {}
-  for run in range(num_runs):
-    num_elements = num_elements_step * run + 1
-    start = time.time()
+def map_pipeline(num_elements, num_maps=100):
+  def _pipeline_runner():
     with beam.Pipeline() as p:
       pc = p | beam.Create(list(range(num_elements)))
       for ix in range(num_maps):
         pc = pc | 'Map%d' % ix >> beam.FlatMap(lambda x: (None, ))
-    timings[num_elements] = time.time() - start
-    print(
-        "%6d element%s %g sec" % (
-            num_elements,
-            " " if num_elements == 1 else "s",
-            timings[num_elements]))
-
-  print()
-  # pylint: disable=unused-variable
-  gradient, intercept, r_value, p_value, std_err = stats.linregress(
-      *list(zip(*list(timings.items()))))
-  print("Fixed cost ", intercept)
-  print("Per-element ", gradient / num_maps)
-  print("R^2 ", r_value**2)
+
+  return _pipeline_runner
+
+
+def map_with_global_side_input_pipeline(num_elements, num_maps=100):
+  def add(element, side_input):
+    return element + side_input
+
+  def _pipeline_runner():
+    with beam.Pipeline() as p:
+      side = p | 'CreateSide' >> beam.Create([1])
+      pc = p | 'CreateMain' >> beam.Create(list(range(num_elements)))
+      for ix in range(num_maps):
+        pc = pc | 'Map%d' % ix >> beam.Map(add, beam.pvalue.AsSingleton(side))
+
+  return _pipeline_runner
+
+
+def map_with_fixed_window_side_input_pipeline(num_elements, num_maps=100):
+  def add(element, side_input):
+    return element + side_input
+
+  def _pipeline_runner():
+    with beam.Pipeline() as p:
+      side = p | 'CreateSide' >> beam.Create(
+          [1]) | 'WindowSide' >> beam.WindowInto(FixedWindows(1000))
+      pc = p | 'CreateMain' >> beam.Create(list(range(
+          num_elements))) | 'WindowMain' >> beam.WindowInto(FixedWindows(1000))
+      for ix in range(num_maps):
+        pc = pc | 'Map%d' % ix >> beam.Map(add, beam.pvalue.AsSingleton(side))
+
+  return _pipeline_runner
+
+
+def run_benchmark(
+    starting_point=1,
+    num_runs=10,
+    num_elements_step=100,
+    verbose=True,
+    profile_filename_base=None,
+):
+  suite = [
+      utils.BenchmarkConfig(
+          map_with_fixed_window_side_input_pipeline,
+          starting_point * 1000,
+          num_runs,
+      ),
+      utils.LinearRegressionBenchmarkConfig(
+          map_pipeline, starting_point, num_elements_step, num_runs),
+      utils.BenchmarkConfig(
+          map_with_global_side_input_pipeline,
+          starting_point * 1000,
+          num_runs,
+      ),
+      utils.BenchmarkConfig(
+          map_with_global_side_input_pipeline_uncached,
+          starting_point * 1000,
+          num_runs,
+      ),
+  ]
+  return utils.run_benchmarks(
+      suite, verbose=verbose, profile_filename_base=profile_filename_base)


 if __name__ == '__main__':
   logging.basicConfig()
   utils.check_compiled('apache_beam.runners.common')
-  run_benchmark()
+
+  parser = argparse.ArgumentParser()
+  parser.add_argument('--num_runs', default=10, type=int)
+  parser.add_argument('--starting_point', default=1, type=int)
+  parser.add_argument('--increment', default=100, type=int)
+  parser.add_argument('--verbose', default=True, type=bool)
+  parser.add_argument('--profile_filename_base', default=None, type=str)
+  options = parser.parse_args()
+
+  run_benchmark(
+      options.starting_point,
+      options.num_runs,
+      options.increment,
+      options.verbose,
+      options.profile_filename_base,
+  )
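For reference, a minimal usage sketch (not part of the commit): the reworked run_benchmark() can be driven programmatically with the keyword arguments shown above, or from the command line via the --starting_point, --num_runs, --increment, --verbose and --profile_filename_base flags added in __main__ (--increment maps to num_elements_step). The import path is assumed from the file location, and the '/tmp/map_fn_' prefix is hypothetical. Note that the suite also references map_with_global_side_input_pipeline_uncached, which does not appear in the hunks shown here.

# Sketch only: assumes the module path implied by the file location and the
# run_benchmark() signature shown in the diff above.
from apache_beam.tools import map_fn_microbenchmark

results = map_fn_microbenchmark.run_benchmark(
    starting_point=1,
    num_runs=10,
    num_elements_step=100,
    verbose=True,
    profile_filename_base='/tmp/map_fn_',  # hypothetical output prefix
)

# utils.run_benchmarks() returns {benchmark name: [per-run times in seconds]},
# so a best-time summary per benchmark can be printed like this:
for name, timings in results.items():
  print(name, min(timings))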

sdks/python/apache_beam/tools/utils.py

Lines changed: 25 additions & 7 deletions
@@ -20,6 +20,7 @@
 # pytype: skip-file

 import collections
+import cProfile
 import gc
 import importlib
 import os
@@ -70,10 +71,11 @@ def __call__(self):
   size: int
   num_runs: int

+  def name(self):
+    return getattr(self.benchmark, '__name__', str(self.benchmark))
+
   def __str__(self):
-    return "%s, %s element(s)" % (
-        getattr(self.benchmark, '__name__', str(self.benchmark)),
-        str(self.size))
+    return "%s, %s element(s)" % (self.name(), str(self.size))


 class LinearRegressionBenchmarkConfig(NamedTuple):
@@ -102,14 +104,15 @@ def __call__(self):
   increment: int
   num_runs: int

+  def name(self):
+    return getattr(self.benchmark, '__name__', str(self.benchmark))
+
   def __str__(self):
     return "%s, %s element(s) at start, %s growth per run" % (
-        getattr(self.benchmark, '__name__', str(self.benchmark)),
-        str(self.starting_point),
-        str(self.increment))
+        self.name(), str(self.starting_point), str(self.increment))


-def run_benchmarks(benchmark_suite, verbose=True):
+def run_benchmarks(benchmark_suite, verbose=True, profile_filename_base=None):
   """Runs benchmarks, and collects execution times.

   A simple instrumentation to run a callable several times, collect and print
@@ -118,17 +121,25 @@ def run_benchmarks(benchmark_suite, verbose=True):
   Args:
     benchmark_suite: A list of BenchmarkConfig.
     verbose: bool, whether to print benchmark results to stdout.
+    profile_filename_base: str, if present each benchmark will be profiled and
+      have stats dumped to per-benchmark files beginning with this prefix.

   Returns:
     A dictionary of the form string -> list of floats. Keys of the dictionary
     are benchmark names, values are execution times in seconds for each run.
   """
+  profiler = cProfile.Profile() if profile_filename_base else None
+
   def run(benchmark: BenchmarkFactoryFn, size: int):
     # Contain each run of a benchmark inside a function so that any temporary
     # objects can be garbage-collected after the run.
     benchmark_instance_callable = benchmark(size)
     start = time.time()
+    if profiler:
+      profiler.enable()
     _ = benchmark_instance_callable()
+    if profiler:
+      profiler.disable()
     return time.time() - start

   cost_series = collections.defaultdict(list)
@@ -161,6 +172,13 @@ def run(benchmark: BenchmarkFactoryFn, size: int):

       # Incrementing the size of the benchmark run by the step size
       size += step
+
+    if profiler:
+      filename = profile_filename_base + benchmark_config.name() + '.prof'
+      if verbose:
+        print("Dumping profile to " + filename)
+      profiler.dump_stats(filename)
+
     if verbose:
       print("")
