Skip to content
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
90 commits
Select commit Hold shift + click to select a range
4d9c497
Enable prism by default
damccorm Apr 11, 2025
99837ca
Clean up fallback code
damccorm Apr 11, 2025
e28e463
Clean up fallback code
damccorm Apr 11, 2025
837ba2f
Add missing base case
damccorm Apr 11, 2025
3233b69
Add missing base case
damccorm Apr 11, 2025
4929906
Fix tests
damccorm Apr 14, 2025
ad9b0dc
Fix tests
damccorm Apr 14, 2025
20370cb
exclude unsupported state
damccorm Apr 15, 2025
cbaa083
Merge branch 'master' of https://github.com/apache/beam into users/da…
damccorm Apr 16, 2025
7e13214
Merge branch 'master' of https://github.com/apache/beam into users/da…
damccorm May 12, 2025
a6db1a4
Exclude locally materialized results
damccorm May 12, 2025
1e49f55
Fix fallback/dupe execution
damccorm May 12, 2025
e6ca143
Fix some snippet tests
damccorm May 13, 2025
4e3a675
Disable argparse abbreviation
damccorm May 13, 2025
a9e07a7
Some test fixes for prism switch
damccorm May 14, 2025
94ad26a
Merge in master
damccorm May 14, 2025
bda68b1
lint
damccorm May 15, 2025
d8b3534
row fixes
damccorm May 15, 2025
c21dafa
Make row comp logic default
damccorm May 15, 2025
b8343e6
Add more yaml examples
damccorm May 15, 2025
e08ead5
Fix dataframes tests
damccorm May 16, 2025
095caca
Merge branch 'master' of https://github.com/apache/beam into users/da…
damccorm May 16, 2025
81e5c54
Examples fixes
damccorm May 16, 2025
9250510
type hints
damccorm May 16, 2025
3d613c5
Some more transform test fixes
damccorm May 17, 2025
81e5e8a
more test fixes
damccorm May 19, 2025
99c07a7
Merge branch 'master' of https://github.com/apache/beam into users/da…
damccorm May 29, 2025
d6ff73e
More generic error checks
damccorm May 29, 2025
9b8218a
Reshuffle tests
damccorm May 29, 2025
f62e442
enrichment error catching
damccorm May 30, 2025
22fafe7
more test fixes
damccorm May 30, 2025
5e75f32
Scope out external transforms
damccorm May 30, 2025
baf2468
simplify test
damccorm May 30, 2025
e057572
Fix more tests
damccorm May 30, 2025
e3e757b
Clean up test
damccorm Jun 2, 2025
77f682e
correct runner mode
damccorm Jun 2, 2025
a70a33f
ML tests
damccorm Jun 2, 2025
2678f7d
ib collect test
damccorm Jun 2, 2025
9b7703e
Make sure assertions dont fire in incorrect order
damccorm Jun 2, 2025
60f0918
ML test fixes
damccorm Jun 3, 2025
f04c952
typing
damccorm Jun 3, 2025
621749b
More fixes
damccorm Jun 3, 2025
81ac404
some more fixes
damccorm Jun 3, 2025
2b6c279
Another error fix
damccorm Jun 3, 2025
095b1a3
Temporarily set log level to debug
damccorm Jun 3, 2025
84ade39
yapf
damccorm Jun 3, 2025
998dccf
More error regex fixes
damccorm Jun 3, 2025
e3dfb34
Fix some error messages/metric tests
damccorm Jun 4, 2025
1af63b7
more generic tests
damccorm Jun 4, 2025
ce747b5
Upgrade logging to warning to see what is happening
damccorm Jun 5, 2025
9a7529d
Some more patches
damccorm Jun 6, 2025
8f76ca4
Wait until finish for test pipelines
damccorm Jun 9, 2025
7b43e33
Merge branch 'master' of https://github.com/apache/beam into users/da…
damccorm Jun 9, 2025
a10a882
fix test_always
damccorm Jun 9, 2025
8dccd5b
A few more small fixes
damccorm Jun 9, 2025
c0fc545
Merge in master
damccorm Jun 12, 2025
3f73418
Temporarily update logging
damccorm Jun 16, 2025
aa4b03b
Merge branch 'master' of https://github.com/apache/beam into users/da…
damccorm Jun 20, 2025
f21b94a
clean up merge
damccorm Jun 20, 2025
41c20e0
Some more exclusions
damccorm Jun 23, 2025
9aff0d1
Merge branch 'users/damccorm/prismByDefault' of https://github.com/ap…
damccorm Jun 23, 2025
8daca3b
regex isssues
damccorm Jun 23, 2025
34a4f13
Propogate original failure when using instruction cache
damccorm Jun 23, 2025
90edb60
Batching fix
damccorm Jun 23, 2025
544fe04
Trigger some postcommits
damccorm Jun 23, 2025
7f52afa
fmt
damccorm Jun 23, 2025
1efa877
More test fixes
damccorm Jun 23, 2025
6504c33
Fix a few more tests
damccorm Jun 24, 2025
50495a4
linting
damccorm Jun 24, 2025
2410ca6
Bump workflow timeout (#35420)
damccorm Jun 25, 2025
46bb8a9
linting/fixes
damccorm Jun 26, 2025
2d5cd07
Merge branch 'users/damccorm/prismByDefault' of https://github.com/ap…
damccorm Jun 26, 2025
051a4ed
Merge in master
damccorm Jun 26, 2025
7909c00
linting
damccorm Jun 26, 2025
2ca2f91
Avoid problem with temp file getting deleted
damccorm Jun 27, 2025
96d9f5b
Avoid problem with temp file getting deleted
damccorm Jun 27, 2025
8a8ed63
minor cleanup
damccorm Jun 27, 2025
83472ce
Some more minor fixes
damccorm Jun 27, 2025
6efda27
Merge branch 'master' of https://github.com/apache/beam into users/da…
damccorm Jun 27, 2025
8fde571
Merge in master
damccorm Jun 27, 2025
068d3a0
Fix test
damccorm Jun 27, 2025
a5dde5b
Fix another test with questionable assumptions
damccorm Jun 30, 2025
cb97e70
Dont wait on tmpfile being destroyed
damccorm Jun 30, 2025
3644d18
Merge branch 'master' of https://github.com/apache/beam into users/da…
damccorm Jul 1, 2025
9e2eb68
More regex checks
damccorm Jul 14, 2025
ae1054f
Merge in master
damccorm Jul 14, 2025
83ec444
Add more postcommits, clean up some error messages
damccorm Jul 14, 2025
c1e75b3
Clean up a few tests
damccorm Jul 14, 2025
fa1e54d
simplify test
damccorm Jul 14, 2025
9385804
Clean up test errors
damccorm Jul 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions sdks/python/apache_beam/dataframe/transforms_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,11 @@ def create_animal_speed_input(p):
reshuffle=False)

def test_loc_filter(self):
with beam.Pipeline() as p:
# TODO(https://github.com/apache/beam/issues/34549): This test relies on
# monitoring_metrics property of the FnApiRunner which does not exist on
# other runners like Prism.
# https://github.com/apache/beam/blob/5f9cd73b7c9a2f37f83971ace3a399d633201dd1/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py#L1590
with beam.Pipeline('FnApiRunner') as p:
_ = (
self.create_animal_speed_input(p)
| transforms.DataframeTransform(lambda df: df[df.Speed > 10]))
Expand All @@ -383,7 +387,11 @@ def set_column(df, name, s):
df[name] = s
return df

with beam.Pipeline() as p:
# TODO(https://github.com/apache/beam/issues/34549): This test relies on
# monitoring_metrics property of the FnApiRunner which does not exist on
# other runners like Prism.
# https://github.com/apache/beam/blob/5f9cd73b7c9a2f37f83971ace3a399d633201dd1/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py#L1590
with beam.Pipeline('FnApiRunner') as p:
_ = (
self.create_animal_speed_input(p)
| transforms.DataframeTransform(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def test_leader_board_users(self):
result = (
self.create_data(p)
| leader_board.CalculateUserScores(allowed_lateness=120))
assert_that(result, equal_to([]))
assert_that(result, equal_to([('user1_team1', 50), ('user2_team2', 2), ('user3_team3', 8), ('user4_team3', 5)]))


if __name__ == '__main__':
Expand Down
6 changes: 3 additions & 3 deletions sdks/python/apache_beam/examples/snippets/snippets_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ def test_bad_types(self):
# When running this pipeline, you'd get a runtime error,
# possibly on a remote machine, possibly very late.

with self.assertRaises(TypeError):
with self.assertRaisesRegex(RuntimeError, "TypeError"):
p.run()

# To catch this early, we can assert what types we expect.
Expand Down Expand Up @@ -372,7 +372,7 @@ def process(self, element):
# When running this pipeline, you'd get a runtime error,
# possibly on a remote machine, possibly very late.

with self.assertRaises(TypeError):
with self.assertRaisesRegex(RuntimeError, "TypeError"):
p.run()

# To catch this early, we can annotate process() with the expected types.
Expand Down Expand Up @@ -439,7 +439,7 @@ def test_runtime_checks_off(self):

def test_runtime_checks_on(self):
# pylint: disable=expression-not-assigned
with self.assertRaises(typehints.TypeCheckError):
with self.assertRaisesRegex(RuntimeError, "TypeCheckError"):
# [START type_hints_runtime_on]
p = TestPipeline(options=PipelineOptions(runtime_type_check=True))
p | beam.Create(['a']) | beam.Map(lambda x: 3).with_output_types(str)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@


def pardo_dofn_methods(test=None):
# Portable runners do not guarantee that teardown will be executed, so we
# use FnApiRunner instead of prism.
runner='FnApiRunner'
# [START pardo_dofn_methods]
import apache_beam as beam

Expand All @@ -60,9 +63,13 @@ def finish_bundle(self):
)

def teardown(self):
# Teardown is best effort and not guaranteed to be executed by all
# runners in all cases (for example, it may be skipped if the pipeline
# can otherwise complete). It should be used for best effort resource
# cleanup.
print('teardown')

with beam.Pipeline() as pipeline:
with beam.Pipeline(runner) as pipeline:
results = (
pipeline
| 'Create inputs' >> beam.Create(['🍓', '🥕', '🍆', '🍅', '🥔'])
Expand Down
4 changes: 2 additions & 2 deletions sdks/python/apache_beam/io/fileio_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ def test_match_files_one_directory_failure1(self):
files.append(self._create_temp_file(dir=directories[0]))
files.append(self._create_temp_file(dir=directories[0]))

with self.assertRaises(beam.io.filesystem.BeamIOError):
with self.assertRaisesRegex(RuntimeError, "BeamIOError"):
with TestPipeline() as p:
files_pc = (
p
Expand Down Expand Up @@ -259,7 +259,7 @@ def test_fail_on_directories(self):
files.append(self._create_temp_file(dir=tempdir, content=content))
files.append(self._create_temp_file(dir=tempdir, content=content))

with self.assertRaises(beam.io.filesystem.BeamIOError):
with self.assertRaisesRegex(RuntimeError, "BeamIOError"):
with TestPipeline() as p:
_ = (
p
Expand Down
5 changes: 2 additions & 3 deletions sdks/python/apache_beam/metrics/metric_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,16 +201,15 @@ def process(self, element):
# Verify user distribution counter.
metric_results = res.metrics().query()
matcher = MetricResultMatcher(
step='ApplyPardo',
step=hc.contains_string('ApplyPardo'),
namespace=hc.contains_string('SomeDoFn'),
name='element_dist',
committed=DistributionMatcher(
sum_value=hc.greater_than_or_equal_to(0),
count_value=hc.greater_than_or_equal_to(0),
min_value=hc.greater_than_or_equal_to(0),
max_value=hc.greater_than_or_equal_to(0)))
hc.assert_that(
metric_results['distributions'], hc.contains_inanyorder(matcher))
hc.assert_that(metric_results['distributions'], hc.has_item(matcher))

def test_create_counter_distribution(self):
sampler = statesampler.StateSampler('', counters.CounterFactory())
Expand Down
1 change: 1 addition & 0 deletions sdks/python/apache_beam/options/pipeline_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -608,6 +608,7 @@ class StandardOptions(PipelineOptions):
'apache_beam.runners.interactive.interactive_runner.InteractiveRunner',
'apache_beam.runners.portability.flink_runner.FlinkRunner',
'apache_beam.runners.portability.portable_runner.PortableRunner',
'apache_beam.runners.portability.fn_api_runner.FnApiRunner',
'apache_beam.runners.portability.prism_runner.PrismRunner',
'apache_beam.runners.portability.spark_runner.SparkRunner',
'apache_beam.runners.test.TestDirectRunner',
Expand Down
6 changes: 4 additions & 2 deletions sdks/python/apache_beam/pipeline_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ def test_pipeline_as_context(self):
def raise_exception(exn):
raise exn

with self.assertRaises(ValueError):
with self.assertRaisesRegex(RuntimeError, 'ValueError:'):
with Pipeline() as p:
# pylint: disable=expression-not-assigned
p | Create([ValueError('msg')]) | Map(raise_exception)
Expand Down Expand Up @@ -714,7 +714,9 @@ def test_incompatible_submission_and_runtime_envs_fail_pipeline(self):
RuntimeError,
'Pipeline construction environment and pipeline runtime '
'environment are not compatible.'):
with TestPipeline() as p:
# TODO(https://github.com/apache/beam/issues/34549): Prism doesn't
# pass through capabilities as part of the ProcessBundleDescriptor.
with TestPipeline('FnApiRunner') as p:
_ = p | Create([None])


Expand Down
56 changes: 36 additions & 20 deletions sdks/python/apache_beam/runners/direct/direct_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
from apache_beam.transforms.ptransform import PTransform
from apache_beam.transforms.timeutil import TimeDomain
from apache_beam.typehints import trivial_inference
from apache_beam.utils.interactive_utils import is_in_ipython

__all__ = ['BundleBasedDirectRunner', 'DirectRunner', 'SwitchingDirectRunner']

Expand Down Expand Up @@ -114,7 +115,11 @@ class _PrismRunnerSupportVisitor(PipelineVisitor):
"""Visitor determining if a Pipeline can be run on the PrismRunner."""
def accept(self, pipeline):
self.supported_by_prism_runner = True
pipeline.visit(self)
# TODO(https://github.com/apache/beam/issues/33623): Prism currently does not support interactive mode
if is_in_ipython():
self.supported_by_prism_runner = False
else:
pipeline.visit(self)
return self.supported_by_prism_runner

def visit_transform(self, applied_ptransform):
Expand All @@ -125,6 +130,12 @@ def visit_transform(self, applied_ptransform):
self.supported_by_prism_runner = False
if isinstance(transform, beam.ParDo):
dofn = transform.dofn

# https://github.com/apache/beam/issues/34549
# Remote once we can support local materialization
if (hasattr(dofn, 'is_materialize_values_do_fn') and
dofn.is_materialize_values_do_fn):
self.supported_by_prism_runner = False
# It's uncertain if the Prism Runner supports execution of CombineFns
# with deferred side inputs.
if isinstance(dofn, CombineValuesDoFn):
Expand All @@ -136,41 +147,33 @@ def visit_transform(self, applied_ptransform):
if userstate.is_stateful_dofn(dofn):
# https://github.com/apache/beam/issues/32786 -
# Remove once Real time clock is used.
_, timer_specs = userstate.get_dofn_specs(dofn)
state_specs, timer_specs = userstate.get_dofn_specs(dofn)
for timer in timer_specs:
if timer.time_domain == TimeDomain.REAL_TIME:
self.supported_by_prism_runner = False

tryingPrism = False
for state in state_specs:
if isinstance(state, userstate.CombiningValueStateSpec):
self.supported_by_prism_runner = False

# Use BundleBasedDirectRunner if other runners are missing needed features.
runner = BundleBasedDirectRunner()

# Check whether all transforms used in the pipeline are supported by the
# FnApiRunner, and the pipeline was not meant to be run as streaming.
if _FnApiRunnerSupportVisitor().accept(pipeline):
from apache_beam.portability.api import beam_provision_api_pb2
from apache_beam.runners.portability.fn_api_runner import fn_runner
from apache_beam.runners.portability.portable_runner import JobServiceHandle
all_options = options.get_all_options()
encoded_options = JobServiceHandle.encode_pipeline_options(all_options)
provision_info = fn_runner.ExtendedProvisionInfo(
beam_provision_api_pb2.ProvisionInfo(
pipeline_options=encoded_options))
runner = fn_runner.FnApiRunner(provision_info=provision_info)
elif _PrismRunnerSupportVisitor().accept(pipeline):
# PrismRunner
if _PrismRunnerSupportVisitor().accept(pipeline):
_LOGGER.info('Running pipeline with PrismRunner.')
from apache_beam.runners.portability import prism_runner
runner = prism_runner.PrismRunner()
tryingPrism = True
else:
runner = BundleBasedDirectRunner()

if tryingPrism:
try:
pr = runner.run_pipeline(pipeline, options)
# This is non-blocking, so if the state is *already* finished, something
# probably failed on job submission.
if (PipelineState.is_terminal(pr.state) and
pr.state != PipelineState.DONE):
_LOGGER.info(
'Pipeline failed on PrismRunner, falling back toDirectRunner.')
'Pipeline failed on PrismRunner, falling back to DirectRunner.')
runner = BundleBasedDirectRunner()
else:
return pr
Expand All @@ -182,6 +185,19 @@ def visit_transform(self, applied_ptransform):
_LOGGER.info('Falling back to DirectRunner')
runner = BundleBasedDirectRunner()

# Check whether all transforms used in the pipeline are supported by the
# FnApiRunner, and the pipeline was not meant to be run as streaming.
if _FnApiRunnerSupportVisitor().accept(pipeline):
from apache_beam.portability.api import beam_provision_api_pb2
from apache_beam.runners.portability.fn_api_runner import fn_runner
from apache_beam.runners.portability.portable_runner import JobServiceHandle
all_options = options.get_all_options()
encoded_options = JobServiceHandle.encode_pipeline_options(all_options)
provision_info = fn_runner.ExtendedProvisionInfo(
beam_provision_api_pb2.ProvisionInfo(
pipeline_options=encoded_options))
runner = fn_runner.FnApiRunner(provision_info=provision_info)

return runner.run_pipeline(pipeline, options)


Expand Down
7 changes: 4 additions & 3 deletions sdks/python/apache_beam/runners/direct/direct_runner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,10 @@ class DirectPipelineResultTest(unittest.TestCase):
def test_waiting_on_result_stops_executor_threads(self):
pre_test_threads = set(t.ident for t in threading.enumerate())

for runner in ['DirectRunner',
'BundleBasedDirectRunner',
'SwitchingDirectRunner']:
for runner in [
'BundleBasedDirectRunner',
'apache_beam.runners.portability.fn_api_runner.fn_runner.FnApiRunner'
]:
pipeline = test_pipeline.TestPipeline(runner=runner)
_ = (pipeline | beam.Create([{'foo': 'bar'}]))
result = pipeline.run()
Expand Down
36 changes: 35 additions & 1 deletion sdks/python/apache_beam/testing/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
'matches_all',
# open_shards is internal and has no backwards compatibility guarantees.
'open_shards',
'row_namedtuple_equals_fn',
'TestWindowedValue',
]

Expand Down Expand Up @@ -167,7 +168,7 @@ def _equal(actual, equals_fn=equals_fn):
# collection. It can also raise false negatives for types that don't have
# a deterministic sort order, like pyarrow Tables as of 0.14.1
if not equals_fn:
equals_fn = lambda e, a: e == a
equals_fn = row_namedtuple_equals_fn
try:
sorted_expected = sorted(expected)
sorted_actual = sorted(actual)
Expand Down Expand Up @@ -202,6 +203,33 @@ def _equal(actual, equals_fn=equals_fn):
return _equal


def row_namedtuple_equals_fn(expected, actual, fallback_equals_fn=None):
"""
equals_fn which can be used by equal_to which treats Rows and
NamedTuples as equivalent types. This can be useful since Beam converts
Rows to NamedTuples when they are sent across portability layers, so a Row
may be converted to a NamedTuple automatically by Beam.
"""
if fallback_equals_fn is None:
fallback_equals_fn = lambda e, a: e == a
if type(expected) is not pvalue.Row and not _is_named_tuple(expected):
return fallback_equals_fn(expected, actual)
if type(actual) is not pvalue.Row and not _is_named_tuple(actual):
return fallback_equals_fn(expected, actual)

expected_dict = expected._asdict()
actual_dict = actual._asdict()
if len(expected_dict) != len(actual_dict):
return False
for k, v in expected_dict.items():
if k not in actual_dict:
return False
if not row_namedtuple_equals_fn(v, actual_dict[k]):
return False

return True


def matches_all(expected):
"""Matcher used by assert_that to check a set of matchers.

Expand Down Expand Up @@ -386,5 +414,11 @@ def _sort_lists(result):
return result


def _is_named_tuple(obj) -> bool:
return (
isinstance(obj, tuple) and hasattr(obj, '_asdict') and
hasattr(obj, '_fields'))


# A utility transform that recursively sorts lists for easier testing.
SortLists = Map(_sort_lists)
Loading
Loading