Description
What happened?
The main points of my issue are as follows:
When a DataflowPythonJobOp task is submitted to run a Beam pipeline, the script is executed in a subprocess. apache_beam.runners.dataflow.internal.apiclient prints the Job ID and URL with logging.info(), but it does not change the default log level (Python's default is WARNING), so these messages are never emitted. As a result, create_python_job cannot match the Job ID, and the task fails.
Although it’s possible to forcibly set the log level in the script using:
logging.getLogger('apache_beam.runners.dataflow.internal.apiclient').setLevel(logging.INFO)
this is not a very user-friendly solution.
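For reference, a minimal sketch of that workaround as a helper that could be called at the top of the pipeline script. Only the logger name comes from this issue; the helper name enable_job_info_logging is hypothetical, and the call to logging.basicConfig() is an assumption that the script has not already configured a handler.

```python
import logging

# Logger name taken from this issue; everything else here is illustrative.
APICLIENT_LOGGER = "apache_beam.runners.dataflow.internal.apiclient"


def enable_job_info_logging(logger_name: str = APICLIENT_LOGGER) -> logging.Logger:
    """Let INFO records from the named logger reach a handler."""
    # The root logger defaults to WARNING, so INFO records are dropped
    # unless a lower level is set on the logger (or one of its parents).
    logger = logging.getLogger(logger_name)
    logger.setLevel(logging.INFO)
    # basicConfig() attaches a stderr handler to the root logger only if
    # the root logger has no handlers yet; otherwise it does nothing.
    logging.basicConfig()
    return logger


if __name__ == "__main__":
    log = enable_job_info_logging()
    log.info("INFO records from this logger are now emitted")
```

Every pipeline script would still need to call this, which is why it is not a user-friendly fix.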
Additional Context:
When Vertex Pipelines runs DataflowPythonJobOp, it needs to match the submitted Job URL in the subprocess output in order to call create_python_job. Because the module does not change the default log level, the Job URL is never printed, which leads to a RuntimeError.
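To illustrate the failure mode described above, here is a rough sketch of a launcher scanning subprocess output for a job URL. This is not the actual google_cloud_pipeline_components code: the regular expression, the function name find_job_id, the assumed URL shape, and the error message are all hypothetical.

```python
import re
from typing import Iterable

# Hypothetical pattern. It assumes the job URL has the shape
# https://console.cloud.google.com/dataflow/jobs/<location>/<job_id>?project=<project>
JOB_URL_RE = re.compile(
    r"https://console\.cloud\.google\.com/dataflow/jobs/"
    r"(?P<location>[a-z0-9-]+)/(?P<job_id>[A-Za-z0-9_-]+)"
    r"\?project=(?P<project>[A-Za-z0-9_.:-]+)"
)


def find_job_id(output_lines: Iterable[str]) -> str:
    """Return the first job ID found in the subprocess output lines."""
    for line in output_lines:
        match = JOB_URL_RE.search(line)
        if match:
            return match.group("job_id")
    # If the INFO line carrying the URL was suppressed, nothing matches.
    raise RuntimeError("No Dataflow job URL found in subprocess output")
```

With INFO logging suppressed, the output contains no line for the pattern to match, so a scan of this kind ends in the RuntimeError branch.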
# run_pipeline.py
from google_cloud_pipeline_components.v1.dataflow import DataflowPythonJobOp
...
@dsl.pipeline(name='test dataflow')
def test_dataflow():
    dataflow_task = DataflowPythonJobOp(
        project=project,
        location=region,
        python_module_path=dataflow_clean_local_path,
        requirements_file_path=requirements_file_path,
        temp_location=temp_location,
        args=[
            "--project", project,
            "--region", region,
            "--temp_location", temp_location,
            "--job_name", f"dataflow-clean-{time.strftime('%Y%m%d-%H%M%S', time.gmtime())}",
            "--save_main_session",
            "--runner", "DataflowRunner",
        ],
    )
...
# data_clean.py
p = beam.Pipeline(options=options)
p | .....
result = p.run()
result.wait_until_finish()
return result
PR: #34952
Issue Priority
Priority: 1 (data loss / total loss of function)
Issue Components
- Component: Python SDK
- Component: Java SDK
- Component: Go SDK
- Component: Typescript SDK
- Component: IO connector
- Component: Beam YAML
- Component: Beam examples
- Component: Beam playground
- Component: Beam katas
- Component: Website
- Component: Infrastructure
- Component: Spark Runner
- Component: Flink Runner
- Component: Samza Runner
- Component: Twister2 Runner
- Component: Hazelcast Jet Runner
- Component: Google Cloud Dataflow Runner
