@@ -395,7 +395,12 @@ def test_cleanup_by_a_pipeline(self):
395395 # Pipeline association is cleaned up.
396396 self .assertNotIn (p , self .clusters .pipelines )
397397 self .assertNotIn (p , dcm .pipelines )
398- self .assertEqual (options .view_as (FlinkRunnerOptions ).flink_master , '[auto]' )
398+ # The internal option in the pipeline is overwritten.
399+ self .assertEqual (
400+ p .options .view_as (FlinkRunnerOptions ).flink_master , '[auto]' )
401+ # The original option is unchanged.
402+ self .assertEqual (
403+ options .view_as (FlinkRunnerOptions ).flink_master , meta .master_url )
399404 # The cluster is unknown now.
400405 self .assertNotIn (meta , self .clusters .dataproc_cluster_managers )
401406 self .assertNotIn (meta .master_url , self .clusters .master_urls )
@@ -423,10 +428,17 @@ def test_not_cleanup_if_multiple_pipelines_share_a_manager(self):
423428 # Pipeline association of p is cleaned up.
424429 self .assertNotIn (p , self .clusters .pipelines )
425430 self .assertNotIn (p , dcm .pipelines )
426- self .assertEqual (options .view_as (FlinkRunnerOptions ).flink_master , '[auto]' )
431+ # The internal option in the pipeline is overwritten.
432+ self .assertEqual (
433+ p .options .view_as (FlinkRunnerOptions ).flink_master , '[auto]' )
434+ # The original option is unchanged.
435+ self .assertEqual (
436+ options .view_as (FlinkRunnerOptions ).flink_master , meta .master_url )
427437 # Pipeline association of p2 still presents.
428438 self .assertIn (p2 , self .clusters .pipelines )
429439 self .assertIn (p2 , dcm .pipelines )
440+ self .assertEqual (
441+ p2 .options .view_as (FlinkRunnerOptions ).flink_master , meta .master_url )
430442 self .assertEqual (
431443 options2 .view_as (FlinkRunnerOptions ).flink_master , meta .master_url )
432444 # The cluster is still known.
0 commit comments