Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions sdks/python/apache_beam/options/pipeline_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -1021,9 +1021,9 @@ def _add_argparse_args(cls, parser):
'updating-a-pipeline')
parser.add_argument(
'--enable_streaming_engine',
default=False,
default=True,
action='store_true',
help='Enable Windmill Service for this Dataflow job. ')
help='Deprecated. All Python pipelines use Streaming Engine. ')
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: All Python pipelines on Dataflow?

parser.add_argument(
'--dataflow_kms_key',
default=None,
Expand Down
19 changes: 2 additions & 17 deletions sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -633,23 +633,8 @@ def _check_and_add_missing_streaming_options(options):
# Runner v2 only supports using streaming engine (aka windmill service)
if options.view_as(StandardOptions).streaming:
debug_options = options.view_as(DebugOptions)
google_cloud_options = options.view_as(GoogleCloudOptions)
if (not google_cloud_options.enable_streaming_engine and
(debug_options.lookup_experiment("enable_windmill_service") or
debug_options.lookup_experiment("enable_streaming_engine"))):
raise ValueError(
"""Streaming engine both disabled and enabled:
--enable_streaming_engine flag is not set, but
enable_windmill_service and/or enable_streaming_engine experiments
are present. It is recommended you only set the
--enable_streaming_engine flag.""")

# Ensure that if we detected a streaming pipeline that streaming specific
# options and experiments.
options.view_as(StandardOptions).streaming = True
google_cloud_options.enable_streaming_engine = True
debug_options.add_experiment("enable_streaming_engine")
debug_options.add_experiment("enable_windmill_service")
debug_options.add_experiment('enable_streaming_engine')
debug_options.add_experiment('enable_windmill_service')


def _is_runner_v2_disabled(options):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -421,10 +421,9 @@ def test_min_cpu_platform_flag_is_propagated_to_experiments(self):
'min_cpu_platform=Intel Haswell',
remote_runner.job.options.view_as(DebugOptions).experiments)

def test_streaming_engine_flag_adds_windmill_experiments(self):
def test_streaming_adds_windmill_experiments(self):
remote_runner = DataflowRunner()
self.default_properties.append('--streaming')
self.default_properties.append('--enable_streaming_engine')
self.default_properties.append('--experiment=some_other_experiment')

with Pipeline(remote_runner, PipelineOptions(self.default_properties)) as p:
Expand Down
Loading