Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
0128566
Add ptransform_timeout_duration option to pipeline options in python sdk
LEEKYE Jun 15, 2025
f634e5c
Propagate ptransform_timeout_duration Worker option to FnApiWorkerSta…
LEEKYE Jun 15, 2025
4dbbd32
fix import
LEEKYE Jun 15, 2025
6fd2641
Refactor: ptransform_timeout_duration -> element_processing_timeout
LEEKYE Jun 18, 2025
3e299d7
Propagate the value of element_processing_timeout
LEEKYE Jun 18, 2025
90975be
Update
LEEKYE Jun 22, 2025
cdcf613
restart lull unit test
LEEKYE Jun 24, 2025
a8d232b
wrap sys.exit call
LEEKYE Jun 24, 2025
5cdea91
fix test
LEEKYE Jun 24, 2025
2ec2da8
fix test
LEEKYE Jun 24, 2025
d215625
fix
LEEKYE Jun 25, 2025
7023cfb
fixes
LEEKYE Jun 26, 2025
4a972b5
fix lint issues
LEEKYE Jun 26, 2025
42e3e9a
Update sdks/python/apache_beam/runners/worker/sdk_worker.py
LEEKYE Jun 26, 2025
fda548e
fix formatter/lint issues
LEEKYE Jun 27, 2025
77b0558
fix formatter/lint issues
LEEKYE Jun 27, 2025
b19f8dc
fix formatter/lint issues
LEEKYE Jun 27, 2025
fd92e18
remove lower bound of timeout
LEEKYE Jul 31, 2025
c1f5f3f
Merge branch 'master' into add_dataflow_option_python
LEEKYE Jul 31, 2025
6609ff1
format fix
LEEKYE Jul 31, 2025
3d48062
Update sdks/python/apache_beam/options/pipeline_options.py
LEEKYE Jul 31, 2025
8115db5
minor update
LEEKYE Jul 31, 2025
cf578da
minor fix
LEEKYE Jul 31, 2025
8fea033
fix formatter issue
LEEKYE Jul 31, 2025
04f61eb
style fix
LEEKYE Aug 5, 2025
76b22e2
fix lint issues
LEEKYE Aug 6, 2025
3a02cf3
fix lint issues
LEEKYE Aug 6, 2025
173ae07
wording fix
LEEKYE Aug 12, 2025
673f5c1
combine 2 log lull methods
LEEKYE Aug 12, 2025
41c74b8
unit test
LEEKYE Aug 12, 2025
0ab9a0b
Remove redundant error message
LEEKYE Aug 12, 2025
87c0229
call flush log handler method from main thread in worker status
LEEKYE Aug 12, 2025
3bc7c69
remove
LEEKYE Aug 12, 2025
54e1da8
update
LEEKYE Aug 13, 2025
0d04cef
flush logger and shut down process if lull time is too long
LEEKYE Aug 18, 2025
2a1799a
initialized status handler with None
LEEKYE Aug 18, 2025
719bb53
fix re-def issue
LEEKYE Aug 18, 2025
335958d
formatter issue fix
LEEKYE Aug 19, 2025
b8a441e
update
LEEKYE Aug 19, 2025
85e509c
update
LEEKYE Aug 19, 2025
0e5412e
formatter issue fix
LEEKYE Aug 19, 2025
1f73611
formatter issue fix
LEEKYE Aug 19, 2025
ad5126d
update
LEEKYE Aug 20, 2025
90ae9c0
Use os._exit() to terminate the program
LEEKYE Aug 20, 2025
fbe8ca1
minor fix
LEEKYE Aug 27, 2025
01dbee2
minor fix
LEEKYE Aug 27, 2025
dda93bb
formatter issue fix
LEEKYE Aug 27, 2025
6f201f2
minor fix
LEEKYE Aug 28, 2025
92f6fe0
minor fix
LEEKYE Aug 28, 2025
c6eb5b5
formatter issue fix
LEEKYE Aug 28, 2025
6b8ae4c
fix lint error
LEEKYE Aug 28, 2025
bdaec05
Switch branches to reduce logging
tvalentyn Aug 28, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions sdks/python/apache_beam/options/pipeline_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -1460,9 +1460,9 @@ def _add_argparse_args(cls, parser):
type=int,
default=None,
help=(
'The time limit (in minutes) that an SDK worker allows for a PTransform'
' operation to process one element before signaling the runner harness'
' to restart the SDK worker.'))
'The time limit (in minutes) that an SDK worker allows for a'
' PTransform operation to process one element before signaling'
' the runner harness to restart the SDK worker.'))

def validate(self, validator):
errors = []
Expand Down
9 changes: 6 additions & 3 deletions sdks/python/apache_beam/runners/worker/sdk_worker_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
import logging
import os
import re
from signal import signal
import sys
from time import time
import traceback

from google.protobuf import text_format
Expand Down Expand Up @@ -224,12 +226,13 @@ def main(unused_argv):
fn_log_handler.close()


def terminate_sdk_harness():
  """Terminates the SDK harness process after a short grace period.

  Logs an error announcing the shutdown, sleeps for 5 seconds, closes the
  module-level FnApiLogRecordHandler if one is set, and finally sends SIGINT
  to the current process.
  """
  # Import the modules at function scope. The module header binds the names
  # `time` and `signal` to the *functions* `time.time` and `signal.signal`
  # (`from time import time`, `from signal import signal`), and those have no
  # `sleep` / `SIGINT` attributes; the local imports shadow them here.
  import signal
  import time

  _LOGGER.error('The SDK harness will be terminated in 5 seconds.')
  # NOTE(review): presumably the delay gives in-flight log records a chance
  # to be shipped before the handler is closed -- confirm.
  time.sleep(5)
  if _FN_LOG_HANDLER:
    _FN_LOG_HANDLER.close()
  os.kill(os.getpid(), signal.SIGINT)


def _load_pipeline_options(options_json):
Expand Down
83 changes: 44 additions & 39 deletions sdks/python/apache_beam/runners/worker/worker_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,45 +257,50 @@ def _log_lull_in_bundle_processor(self, bundle_process_cache):
self._log_lull_sampler_info(info, instruction)

def _log_lull_sampler_info(self, sampler_info, instruction):
  """Reports a lull for one sampled state and enforces the element timeout.

  Two independent conditions are evaluated against
  ``sampler_info.time_since_transition`` (nanoseconds):

  * lull logging: ``self._passed_lull_timeout_since_last_log()`` is truthy
    and the elapsed time exceeds ``self.log_lull_timeout_ns``; a warning
    with the current stack trace is logged.
  * element timeout: ``self._element_processing_timeout_ns`` is set and the
    elapsed time exceeds it; an error is logged and the SDK harness is
    terminated via ``sdk_worker_main.terminate_sdk_harness``.

  Args:
    sampler_info: state sampler info for the tracked operation; nothing is
      done when it is falsy or reports no time since the last transition.
    instruction: identifier of the bundle being processed, used only in the
      log messages.
  """
  if not sampler_info or not sampler_info.time_since_transition:
    return

  elapsed_ns = sampler_info.time_since_transition
  # Keep the throttle check first: it is only short-circuited away when it
  # returns a falsy value, exactly as in the original expression.
  log_lull = (
      self._passed_lull_timeout_since_last_log() and
      elapsed_ns > self.log_lull_timeout_ns)
  timeout_exceeded = (
      self._element_processing_timeout_ns and
      elapsed_ns > self._element_processing_timeout_ns)
  # Bail out only when there is nothing to report at all. Using `or` here
  # would suppress plain lull warnings whenever no timeout is configured.
  if not log_lull and not timeout_exceeded:
    return

  lull_seconds = elapsed_ns / 1e9
  step_name = sampler_info.state_name.step_name
  state_name = sampler_info.state_name.name
  if step_name and state_name:
    step_name_log = (
        ' for PTransform{name=%s, state=%s}' % (step_name, state_name))
  else:
    step_name_log = ''
  stack_trace = self._get_stack_trace(sampler_info)

  if log_lull:
    _LOGGER.warning(
        (
            'Operation ongoing in bundle %s%s for at least %.2f seconds'
            ' without outputting or completing.\n'
            'Current Traceback:\n%s'),
        instruction,
        step_name_log,
        lull_seconds,
        stack_trace,
    )
  if timeout_exceeded:
    _LOGGER.error(
        (
            'Operation ongoing in bundle %s%s for at least %.2f seconds'
            ' without outputting or completing.\n'
            'Current Traceback:\n%s'),
        instruction,
        step_name_log,
        # The placeholder is labelled "seconds", so report the elapsed
        # seconds rather than the configured limit converted to minutes.
        lull_seconds,
        stack_trace,
    )
    # Imported at call time rather than at module load.
    from apache_beam.runners.worker.sdk_worker_main import (
        terminate_sdk_harness)

    terminate_sdk_harness()

def _get_stack_trace(self, sampler_info):
exec_thread = getattr(sampler_info, 'tracked_thread', None)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ def get_state_sampler_info_for_lull(lull_duration_s):
now = time.time()
with mock.patch('logging.Logger.warning') as warn_mock:
with mock.patch(
'apache_beam.runners.worker.sdk_worker_main.flush_fn_log_handler'
'apache_beam.runners.worker.sdk_worker_main.terminate_sdk_harness'
) as flush_mock:
with mock.patch('time.time') as time_mock:
time_mock.return_value = now
Expand Down
Loading