@@ -570,7 +570,7 @@ def __init__(self, transform):
570570 # id_label and timestamp_attribute.
571571 # Only raise errors for DirectRunner or batch pipelines
572572 pipeline_options = transform .pipeline_options
573- should_raise_error = False
573+ output_labels_supported = True
574574
575575 if pipeline_options :
576576 from apache_beam .options .pipeline_options import StandardOptions
@@ -585,18 +585,18 @@ def __init__(self, transform):
585585 if (runner_name is None or
586586 (runner_name in StandardOptions .LOCAL_RUNNERS or 'DirectRunner'
587587 in str (runner_name ) or 'TestDirectRunner' in str (runner_name ))):
588- should_raise_error = True
588+ output_labels_supported = False
589589 except Exception :
590590 # If we can't determine runner, assume DirectRunner for safety
591- should_raise_error = True
591+ output_labels_supported = False
592592
593593 # Check if in batch mode (not streaming)
594594 standard_options = pipeline_options .view_as (StandardOptions )
595595 if not standard_options .streaming :
596- should_raise_error = True
596+ output_labels_supported = False
597597 else :
598598 # If no pipeline options available, fall back to original behavior
599- should_raise_error = True
599+ output_labels_supported = False
600600
601601 # Log debug information for troubleshooting
602602 import logging
@@ -616,7 +616,7 @@ def __init__(self, transform):
616616 runner_info ,
617617 streaming_info )
618618
619- if should_raise_error :
619+ if not output_labels_supported :
620620
621621 if transform .id_label :
622622 raise NotImplementedError (
0 commit comments