Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions sdks/python/apache_beam/options/pipeline_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -1021,9 +1021,10 @@ def _add_argparse_args(cls, parser):
'updating-a-pipeline')
parser.add_argument(
'--enable_streaming_engine',
default=False,
default=True,
action='store_true',
help='Enable Windmill Service for this Dataflow job. ')
help='Deprecated. All Python streaming pipelines on Dataflow'
'use Streaming Engine.')
parser.add_argument(
'--dataflow_kms_key',
default=None,
Expand Down
19 changes: 2 additions & 17 deletions sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -633,23 +633,8 @@ def _check_and_add_missing_streaming_options(options):
# Runner v2 only supports using streaming engine (aka windmill service)
if options.view_as(StandardOptions).streaming:
debug_options = options.view_as(DebugOptions)
google_cloud_options = options.view_as(GoogleCloudOptions)
if (not google_cloud_options.enable_streaming_engine and
(debug_options.lookup_experiment("enable_windmill_service") or
debug_options.lookup_experiment("enable_streaming_engine"))):
raise ValueError(
"""Streaming engine both disabled and enabled:
--enable_streaming_engine flag is not set, but
enable_windmill_service and/or enable_streaming_engine experiments
are present. It is recommended you only set the
--enable_streaming_engine flag.""")

# Ensure that if we detected a streaming pipeline that streaming specific
# options and experiments.
options.view_as(StandardOptions).streaming = True
google_cloud_options.enable_streaming_engine = True
debug_options.add_experiment("enable_streaming_engine")
debug_options.add_experiment("enable_windmill_service")
debug_options.add_experiment('enable_streaming_engine')
debug_options.add_experiment('enable_windmill_service')


def _is_runner_v2_disabled(options):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -421,10 +421,9 @@ def test_min_cpu_platform_flag_is_propagated_to_experiments(self):
'min_cpu_platform=Intel Haswell',
remote_runner.job.options.view_as(DebugOptions).experiments)

def test_streaming_engine_flag_adds_windmill_experiments(self):
def test_streaming_adds_windmill_experiments(self):
remote_runner = DataflowRunner()
self.default_properties.append('--streaming')
self.default_properties.append('--enable_streaming_engine')
self.default_properties.append('--experiment=some_other_experiment')

with Pipeline(remote_runner, PipelineOptions(self.default_properties)) as p:
Expand Down
Loading