Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
0128566
Add ptransform_timeout_duration option to pipeline options in python sdk
LEEKYE Jun 15, 2025
f634e5c
Propagate ptransform_timeout_duration Worker option to FnApiWorkerSta…
LEEKYE Jun 15, 2025
4dbbd32
fix import
LEEKYE Jun 15, 2025
6fd2641
Refactor: ptransform_timeout_duration -> element_processing_timeout
LEEKYE Jun 18, 2025
3e299d7
Propagate the value of element_processing_timeout
LEEKYE Jun 18, 2025
90975be
Update
LEEKYE Jun 22, 2025
cdcf613
restart lull unit test
LEEKYE Jun 24, 2025
a8d232b
wrap sys.exit call
LEEKYE Jun 24, 2025
5cdea91
fix test
LEEKYE Jun 24, 2025
2ec2da8
fix test
LEEKYE Jun 24, 2025
d215625
fix
LEEKYE Jun 25, 2025
7023cfb
fixes
LEEKYE Jun 26, 2025
4a972b5
fix lint issues
LEEKYE Jun 26, 2025
42e3e9a
Update sdks/python/apache_beam/runners/worker/sdk_worker.py
LEEKYE Jun 26, 2025
fda548e
fix formatter/lint issues
LEEKYE Jun 27, 2025
77b0558
fix formatter/lint issues
LEEKYE Jun 27, 2025
b19f8dc
fix formatter/lint issues
LEEKYE Jun 27, 2025
fd92e18
remove lower bound of timeout
LEEKYE Jul 31, 2025
c1f5f3f
Merge branch 'master' into add_dataflow_option_python
LEEKYE Jul 31, 2025
6609ff1
format fix
LEEKYE Jul 31, 2025
3d48062
Update sdks/python/apache_beam/options/pipeline_options.py
LEEKYE Jul 31, 2025
8115db5
minor update
LEEKYE Jul 31, 2025
cf578da
minor fix
LEEKYE Jul 31, 2025
8fea033
fix formatter issue
LEEKYE Jul 31, 2025
04f61eb
style fix
LEEKYE Aug 5, 2025
76b22e2
fix lint issues
LEEKYE Aug 6, 2025
3a02cf3
fix lint issues
LEEKYE Aug 6, 2025
173ae07
wording fix
LEEKYE Aug 12, 2025
673f5c1
combine 2 log lull methods
LEEKYE Aug 12, 2025
41c74b8
unit test
LEEKYE Aug 12, 2025
0ab9a0b
Remove redundant error message
LEEKYE Aug 12, 2025
87c0229
call flush log handler method from main thread in worker status
LEEKYE Aug 12, 2025
3bc7c69
remove
LEEKYE Aug 12, 2025
54e1da8
update
LEEKYE Aug 13, 2025
0d04cef
flush logger and shut down process if lull time is too long
LEEKYE Aug 18, 2025
2a1799a
initialized status handler with None
LEEKYE Aug 18, 2025
719bb53
fix re-def issue
LEEKYE Aug 18, 2025
335958d
formatter issue fix
LEEKYE Aug 19, 2025
b8a441e
update
LEEKYE Aug 19, 2025
85e509c
update
LEEKYE Aug 19, 2025
0e5412e
formatter issue fix
LEEKYE Aug 19, 2025
1f73611
formatter issue fix
LEEKYE Aug 19, 2025
ad5126d
update
LEEKYE Aug 20, 2025
90ae9c0
Use os._exit() to terminate the program
LEEKYE Aug 20, 2025
fbe8ca1
minor fix
LEEKYE Aug 27, 2025
01dbee2
minor fix
LEEKYE Aug 27, 2025
dda93bb
formatter issue fix
LEEKYE Aug 27, 2025
6f201f2
minor fix
LEEKYE Aug 28, 2025
92f6fe0
minor fix
LEEKYE Aug 28, 2025
c6eb5b5
formatter issue fix
LEEKYE Aug 28, 2025
6b8ae4c
fix lint error
LEEKYE Aug 28, 2025
bdaec05
Switch branches to reduce logging
tvalentyn Aug 28, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions sdks/python/apache_beam/options/pipeline_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -1381,6 +1381,16 @@ def _add_argparse_args(cls, parser):
'responsible for executing the user code and communicating with '
'the runner. Depending on the runner, there may be more than one '
'SDK Harness process running on the same worker node.'))
parser.add_argument(
'--element_processing_timeout',
type=int,
default=None,
help=(
'The time limit (minute) that an SDK worker allows for a PTransform'
' operation before signaling the runner harness to restart the SDK'
' worker.'
)
)

def validate(self, validator):
errors = []
Expand Down
11 changes: 10 additions & 1 deletion sdks/python/apache_beam/options/pipeline_options_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -405,16 +405,25 @@ def test_experiments(self):
self.assertEqual(options.get_all_options()['experiments'], None)

def test_worker_options(self):
options = PipelineOptions(['--machine_type', 'abc', '--disk_type', 'def'])
options = PipelineOptions([
'--machine_type',
'abc',
'--disk_type',
'def',
'--element_processing_timeout',
'10',
])
worker_options = options.view_as(WorkerOptions)
self.assertEqual(worker_options.machine_type, 'abc')
self.assertEqual(worker_options.disk_type, 'def')
self.assertEqual(worker_options.element_processing_timeout, 10)

options = PipelineOptions(
['--worker_machine_type', 'abc', '--worker_disk_type', 'def'])
worker_options = options.view_as(WorkerOptions)
self.assertEqual(worker_options.machine_type, 'abc')
self.assertEqual(worker_options.disk_type, 'def')
self.assertIsNone(worker_options.element_processing_timeout)

def test_beam_services_empty(self):
with mock.patch.dict(os.environ, {}, clear=True):
Expand Down
21 changes: 16 additions & 5 deletions sdks/python/apache_beam/runners/worker/sdk_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ def __init__(
# that should be reported to the runner when processing the first bundle.
deferred_exception=None, # type: Optional[Exception]
runner_capabilities=frozenset(), # type: FrozenSet[str]
element_processing_timeout=None, # type: Optional[int]
):
# type: (...) -> None
self._alive = True
Expand Down Expand Up @@ -207,6 +208,7 @@ def __init__(
self._profiler_factory = profiler_factory
self.data_sampler = data_sampler
self.runner_capabilities = runner_capabilities
self.element_processing_timeout=element_processing_timeout

def default_factory(id):
# type: (str) -> beam_fn_api_pb2.ProcessBundleDescriptor
Expand All @@ -230,7 +232,12 @@ def default_factory(id):
status_address,
self._bundle_processor_cache,
self._state_cache,
enable_heap_dump) # type: Optional[FnApiWorkerStatusHandler]
enable_heap_dump,
element_processing_timeout=self.element_processing_timeout) # type: Optional[FnApiWorkerStatusHandler]
except TimeoutError as e:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this doesn't seem like the right place for handling this as this is just constructing the status handler. The error will be thrown later after processing has started on the thread that the status handler started:

 self._lull_logger = threading.Thread(

I'm not a Python expert so I'm not sure the best way to cleanup/terminate once we determine we want to. It could be that throwing unhandled error from that thread will terminate the sdk or perhaps there is some better way to do it.

Copy link
Contributor

@tvalentyn tvalentyn Aug 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, the TimeoutError won't be caught here - this try block finishes after status handler is constructed.

+1 - the error will be thrown on the status handler thread, uncaught, and it should simply terminate the status handler thread, but the main thread will keep running, SDK process shouldn't terminate ( did you see a different behavior when you tested this change? )

Exiting from the StatusHandler thread should work (if we called _shutdown_due_to_element_processing_timeout) from the status handler. However it would be better if we flushed the logs, as is attempted to do on the main thread

except: # pylint: disable=broad-except
_LOGGER.critical('Python sdk harness failed: ', exc_info=True)
raise
finally:
if fn_log_handler:
fn_log_handler.close()
.

We could do this:
- make logs handler a process-level global variable
- add a helpers in sdk_worker_main to flush log handler (if defined) + shut down the process. This would be callable from any thread.
- call these from worker_status thread when timeout is reached

Even better would be to catch the exception on the main thread but that requires either message passing from child threads to main thread or refactoring thread management using concurrent.futures.

self._shutdown_due_to_element_processing_timeout(
str(e) + 'The SDK harness will be terminated and restart.'
)
except Exception:
traceback_string = traceback.format_exc()
_LOGGER.warning(
Expand Down Expand Up @@ -409,6 +416,10 @@ def create_worker(self):
return SdkWorker(
self._bundle_processor_cache, profiler_factory=self._profiler_factory)

def _shutdown_due_to_element_processing_timeout(self, errMsg):
_LOGGER.error('%sThe SDK harness will be terminated.', errMsg)
sys.exit(1)


class BundleProcessorCache(object):
"""A cache for ``BundleProcessor``s.
Expand Down Expand Up @@ -1123,7 +1134,7 @@ def _request(self, request):
request.instruction_id = self._context.process_instruction_id
# Adding a new item to a dictionary is atomic in cPython
self._responses_by_id[request.id] = future = _Future[
beam_fn_api_pb2.StateResponse]()
beam_fn_api_pb2.StateResponse]()
# Request queue is thread-safe
self._requests.put(request)
return future
Expand Down Expand Up @@ -1191,9 +1202,9 @@ def process_instruction_id(self, bundle_id, cache_tokens):
user_state_cache_token = cache_token_struct.token
elif cache_token_struct.HasField("side_input"):
self._context.side_input_cache_tokens[
cache_token_struct.side_input.transform_id,
cache_token_struct.side_input.
side_input_id] = cache_token_struct.token
cache_token_struct.side_input.transform_id,
cache_token_struct.side_input.
side_input_id] = cache_token_struct.token
# TODO: Consider a two-level cache to avoid extra logic and locking
# for items cached at the bundle level.
self._context.bundle_cache_token = bundle_id
Expand Down
3 changes: 2 additions & 1 deletion sdks/python/apache_beam/runners/worker/sdk_worker_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,8 @@ def create_harness(environment, dry_run=False):
enable_heap_dump=enable_heap_dump,
data_sampler=data_sampler,
deferred_exception=deferred_exception,
runner_capabilities=runner_capabilities)
runner_capabilities=runner_capabilities,
element_processing_timeout=sdk_pipeline_options.view_as(WorkerOptions).element_processing_timeout)
return fn_log_handler, sdk_harness, sdk_pipeline_options


Expand Down
59 changes: 48 additions & 11 deletions sdks/python/apache_beam/runners/worker/worker_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
# 5 minutes * 60 seconds * 1000 millis * 1000 micros * 1000 nanoseconds
DEFAULT_LOG_LULL_TIMEOUT_NS = 5 * 60 * 1000 * 1000 * 1000

DEFAULT_RESTART_LULL_TIMEOUT_NS = 10 * 60 * 1000 * 1000 * 1000

# Full thread dump is performed at most every 20 minutes.
LOG_LULL_FULL_THREAD_DUMP_INTERVAL_S = 20 * 60

Expand Down Expand Up @@ -165,7 +167,9 @@ def __init__(
state_cache=None,
enable_heap_dump=False,
worker_id=None,
log_lull_timeout_ns=DEFAULT_LOG_LULL_TIMEOUT_NS):
log_lull_timeout_ns=DEFAULT_LOG_LULL_TIMEOUT_NS,
restart_lull_timeout_ns=DEFAULT_RESTART_LULL_TIMEOUT_NS,
element_processing_timeout=None):
"""Initialize FnApiWorkerStatusHandler.

Args:
Expand All @@ -184,6 +188,14 @@ def __init__(
self._status_channel)
self._responses = queue.Queue()
self.log_lull_timeout_ns = log_lull_timeout_ns
if element_processing_timeout:
self._element_processing_timeout_ns = max(
element_processing_timeout * 60 * 1e9, restart_lull_timeout_ns)
if element_processing_timeout * 60 * 1e9 < restart_lull_timeout_ns:
_LOGGER.error(
'element_processing_timeout overriden to %d ns', self._element_processing_timeout_ns)
else:
self._element_processing_timeout_ns = None
self._last_full_thread_dump_secs = 0.0
self._last_lull_logged_secs = 0.0
self._server = threading.Thread(
Expand All @@ -196,7 +208,10 @@ def __init__(
self._bundle_process_cache),
name='lull_operation_logger')
self._lull_logger.daemon = True
self._lull_logger.start()
try:
self._lull_logger.start()
except TimeoutError as e:
raise TimeoutError('%sThe SDK harness will be terminated.' % e) from e

def _get_responses(self):
while True:
Expand Down Expand Up @@ -250,14 +265,36 @@ def _log_lull_in_bundle_processor(self, bundle_process_cache):
if processor:
info = processor.state_sampler.get_info()
self._log_lull_sampler_info(info, instruction)
if self._element_processing_timeout_ns:
try:
self._restart_lull(info, instruction)
except TimeoutError as e:
raise e

def _restart_lull(self, sampler_info, instruction):
if (
sampler_info
and sampler_info.time_since_transition
and sampler_info.time_since_transition
> self._element_processing_timeout_ns
):
log_lull_msg = self._log_lull_msg(sampler_info, instruction)
raise TimeoutError(log_lull_msg + 'The SDK harness will be terminated.')

def _log_lull_sampler_info(self, sampler_info, instruction):
if not self._passed_lull_timeout_since_last_log():
return
if (sampler_info and sampler_info.time_since_transition and
sampler_info.time_since_transition > self.log_lull_timeout_ns):
if (
sampler_info
and sampler_info.time_since_transition
and sampler_info.time_since_transition > self.log_lull_timeout_ns
):
_LOGGER.warning(self._log_lull_msg(sampler_info, instruction))

def _log_lull_msg(self, sampler_info, instruction):
"""Logs a lull message for the given sampler info and instruction."""
if sampler_info:
lull_seconds = sampler_info.time_since_transition / 1e9

step_name = sampler_info.state_name.step_name
state_name = sampler_info.state_name.name
if step_name and state_name:
Expand All @@ -267,17 +304,17 @@ def _log_lull_sampler_info(self, sampler_info, instruction):
step_name_log = ''

stack_trace = self._get_stack_trace(sampler_info)

_LOGGER.warning(
(
'Operation ongoing in bundle %s%s for at least %.2f seconds'
' without outputting or completing.\n'
'Current Traceback:\n%s'),
return (
'Operation ongoing in bundle %s%s for at least %.2f seconds'
' without outputting or completing.\n'
'Current Traceback:\n%s'
) % (
instruction,
step_name_log,
lull_seconds,
stack_trace,
)
return ''

def _get_stack_trace(self, sampler_info):
exec_thread = getattr(sampler_info, 'tracked_thread', None)
Expand Down
19 changes: 19 additions & 0 deletions sdks/python/apache_beam/runners/worker/worker_status_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,25 @@ def get_state_sampler_info_for_lull(lull_duration_s):
bundle_id, sampler_info = get_state_sampler_info_for_lull(21 * 60)
self.fn_status_handler._log_lull_sampler_info(sampler_info, bundle_id)

# def test_restart_lull_in_bundle_processor(self):
# def get_state_sampler_info_for_lull(lull_duration_s):
# return "bundle-id", statesampler.StateSamplerInfo(
# CounterName('progress-msecs', 'stage_name', 'step_name'),
# 1,
# lull_duration_s * 1e9,
# threading.current_thread())
#
# now = time.time()
# with mock.patch('time.time') as time_mock:
# time_mock.return_value = now
# bundle_id, sampler_info = get_state_sampler_info_for_lull(50 * 60)
# self.fn_status_handler._restart_lull(sampler_info, bundle_id)
#
# with mock.patch('time.time') as time_mock:
# time_mock.return_value = now + 6 * 60 # 6 minutes
# bundle_id, sampler_info = get_state_sampler_info_for_lull(101 * 60)
# with self.assertRaises(TimeoutError):
# self.fn_status_handler._restart_lull(sampler_info, bundle_id)

class HeapDumpTest(unittest.TestCase):
@mock.patch('apache_beam.runners.worker.worker_status.hpy', None)
Expand Down
Loading