Skip to content
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
0128566
Add ptransform_timeout_duration option to pipeline options in python sdk
LEEKYE Jun 15, 2025
f634e5c
Propagate ptransform_timeout_duration Worker option to FnApiWorkerSta…
LEEKYE Jun 15, 2025
4dbbd32
fix import
LEEKYE Jun 15, 2025
6fd2641
Refactor: ptransform_timeout_duration -> element_processing_timeout
LEEKYE Jun 18, 2025
3e299d7
Propagate the value of element_processing_timeout
LEEKYE Jun 18, 2025
90975be
Update
LEEKYE Jun 22, 2025
cdcf613
restart lull unit test
LEEKYE Jun 24, 2025
a8d232b
wrap sys.exit call
LEEKYE Jun 24, 2025
5cdea91
fix test
LEEKYE Jun 24, 2025
2ec2da8
fix test
LEEKYE Jun 24, 2025
d215625
fix
LEEKYE Jun 25, 2025
7023cfb
fixes
LEEKYE Jun 26, 2025
4a972b5
fix lint issues
LEEKYE Jun 26, 2025
42e3e9a
Update sdks/python/apache_beam/runners/worker/sdk_worker.py
LEEKYE Jun 26, 2025
fda548e
fix formatter/lint issues
LEEKYE Jun 27, 2025
77b0558
fix formatter/lint issues
LEEKYE Jun 27, 2025
b19f8dc
fix formatter/lint issues
LEEKYE Jun 27, 2025
fd92e18
remove lower bound of timeout
LEEKYE Jul 31, 2025
c1f5f3f
Merge branch 'master' into add_dataflow_option_python
LEEKYE Jul 31, 2025
6609ff1
format fix
LEEKYE Jul 31, 2025
3d48062
Update sdks/python/apache_beam/options/pipeline_options.py
LEEKYE Jul 31, 2025
8115db5
minor update
LEEKYE Jul 31, 2025
cf578da
minor fix
LEEKYE Jul 31, 2025
8fea033
fix formatter issue
LEEKYE Jul 31, 2025
04f61eb
style fix
LEEKYE Aug 5, 2025
76b22e2
fix lint issues
LEEKYE Aug 6, 2025
3a02cf3
fix lint issues
LEEKYE Aug 6, 2025
173ae07
wording fix
LEEKYE Aug 12, 2025
673f5c1
combine 2 log lull methods
LEEKYE Aug 12, 2025
41c74b8
unit test
LEEKYE Aug 12, 2025
0ab9a0b
Remove redundant error message
LEEKYE Aug 12, 2025
87c0229
call flush log handler method from main thread in worker status
LEEKYE Aug 12, 2025
3bc7c69
remove
LEEKYE Aug 12, 2025
54e1da8
update
LEEKYE Aug 13, 2025
0d04cef
flush logger and shut down process if lull time is too long
LEEKYE Aug 18, 2025
2a1799a
initialized status handler with None
LEEKYE Aug 18, 2025
719bb53
fix re-def issue
LEEKYE Aug 18, 2025
335958d
formatter issue fix
LEEKYE Aug 19, 2025
b8a441e
update
LEEKYE Aug 19, 2025
85e509c
update
LEEKYE Aug 19, 2025
0e5412e
formatter issue fix
LEEKYE Aug 19, 2025
1f73611
formatter issue fix
LEEKYE Aug 19, 2025
ad5126d
update
LEEKYE Aug 20, 2025
90ae9c0
Use os._exit() to terminate the program
LEEKYE Aug 20, 2025
fbe8ca1
minor fix
LEEKYE Aug 27, 2025
01dbee2
minor fix
LEEKYE Aug 27, 2025
dda93bb
formatter issue fix
LEEKYE Aug 27, 2025
6f201f2
minor fix
LEEKYE Aug 28, 2025
92f6fe0
minor fix
LEEKYE Aug 28, 2025
c6eb5b5
formatter issue fix
LEEKYE Aug 28, 2025
6b8ae4c
fix lint error
LEEKYE Aug 28, 2025
bdaec05
Switch branches to reduce logging
tvalentyn Aug 28, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions sdks/python/apache_beam/options/pipeline_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -1455,6 +1455,14 @@ def _add_argparse_args(cls, parser):
'responsible for executing the user code and communicating with '
'the runner. Depending on the runner, there may be more than one '
'SDK Harness process running on the same worker node.'))
parser.add_argument(
'--element_processing_timeout_minutes',
type=int,
default=None,
help=(
'The time limit (minute) that an SDK worker allows for a PTransform'
' operation to process one element before signaling the runner harness'
' to restart the SDK worker.'))

def validate(self, validator):
errors = []
Expand Down
10 changes: 9 additions & 1 deletion sdks/python/apache_beam/options/pipeline_options_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -405,10 +405,18 @@ def test_experiments(self):
self.assertEqual(options.get_all_options()['experiments'], None)

def test_worker_options(self):
options = PipelineOptions(['--machine_type', 'abc', '--disk_type', 'def'])
options = PipelineOptions([
'--machine_type',
'abc',
'--disk_type',
'def',
'--element_processing_timeout_minutes',
'10',
])
worker_options = options.view_as(WorkerOptions)
self.assertEqual(worker_options.machine_type, 'abc')
self.assertEqual(worker_options.disk_type, 'def')
self.assertEqual(worker_options.element_processing_timeout_minutes, 10)

options = PipelineOptions(
['--worker_machine_type', 'abc', '--worker_disk_type', 'def'])
Expand Down
14 changes: 13 additions & 1 deletion sdks/python/apache_beam/runners/worker/sdk_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ def __init__(
# that should be reported to the runner when processing the first bundle.
deferred_exception=None, # type: Optional[Exception]
runner_capabilities=frozenset(), # type: FrozenSet[str]
element_processing_timeout_minutes=None, # type: Optional[int]
):
# type: (...) -> None
self._alive = True
Expand Down Expand Up @@ -207,6 +208,7 @@ def __init__(
self._profiler_factory = profiler_factory
self.data_sampler = data_sampler
self.runner_capabilities = runner_capabilities
self.element_processing_timeout_minutes = element_processing_timeout_minutes

def default_factory(id):
# type: (str) -> beam_fn_api_pb2.ProcessBundleDescriptor
Expand All @@ -230,7 +232,12 @@ def default_factory(id):
status_address,
self._bundle_processor_cache,
self._state_cache,
enable_heap_dump) # type: Optional[FnApiWorkerStatusHandler]
enable_heap_dump,
element_processing_timeout_minutes=self.
element_processing_timeout_minutes
) # type: Optional[FnApiWorkerStatusHandler]
except TimeoutError as e:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this doesn't seem like the right place for handling this as this is just constructing the status handler. The error will be thrown later after processing has started on the thread that the status handler started:

 self._lull_logger = threading.Thread(

I'm not a Python expert so I'm not sure the best way to cleanup/terminate once we determine we want to. It could be that throwing unhandled error from that thread will terminate the sdk or perhaps there is some better way to do it.

Copy link
Contributor

@tvalentyn tvalentyn Aug 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, the TimeoutError won't be caught here - this try block finishes after status handler is constructed.

+1 - the error will be thrown on the status handler thread, uncaught, and it should simply terminate the status handler thread, but the main thread will keep running, SDK process shouldn't terminate ( did you see a different behavior when you tested this change? )

Exiting from the StatusHandler thread should work (if we called _shutdown_due_to_element_processing_timeout from the status handler). However, it would be better if we flushed the logs, as is attempted on the main thread

except: # pylint: disable=broad-except
_LOGGER.critical('Python sdk harness failed: ', exc_info=True)
raise
finally:
if fn_log_handler:
fn_log_handler.close()
.

We could do this:
- make logs handler a process-level global variable
- add a helpers in sdk_worker_main to flush log handler (if defined) + shut down the process. This would be callable from any thread.
- call these from worker_status thread when timeout is reached

Even better would be to catch the exception on the main thread, but that requires either message passing from child threads to the main thread or refactoring thread management using concurrent.futures.

self._shutdown_due_to_element_processing_timeout(e)
except Exception:
traceback_string = traceback.format_exc()
_LOGGER.warning(
Expand Down Expand Up @@ -409,6 +416,11 @@ def create_worker(self):
return SdkWorker(
self._bundle_processor_cache, profiler_factory=self._profiler_factory)

def _shutdown_due_to_element_processing_timeout(
    self, err: TimeoutError) -> None:
  """Terminate the SDK harness process after an element-processing timeout.

  This is invoked when a PTransform has been processing a single element
  longer than ``element_processing_timeout_minutes``. The TimeoutError is
  raised on the worker-status thread, not the main thread, so
  ``sys.exit(1)`` would only raise SystemExit in that thread and leave the
  rest of the SDK process running. ``os._exit`` terminates the whole
  process unconditionally; logging handlers are flushed first so the error
  message is not lost.

  Args:
    err: the TimeoutError describing the stuck bundle/PTransform.
  """
  import logging
  import os  # local imports: keep this block self-contained
  _LOGGER.error('%sThe SDK harness will be terminated.', str(err))
  # os._exit() skips atexit hooks and buffered-handler cleanup, so flush
  # and close all logging handlers explicitly before the hard exit.
  logging.shutdown()
  os._exit(1)


class BundleProcessorCache(object):
"""A cache for ``BundleProcessor``s.
Expand Down
4 changes: 3 additions & 1 deletion sdks/python/apache_beam/runners/worker/sdk_worker_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,9 @@ def create_harness(environment, dry_run=False):
enable_heap_dump=enable_heap_dump,
data_sampler=data_sampler,
deferred_exception=deferred_exception,
runner_capabilities=runner_capabilities)
runner_capabilities=runner_capabilities,
element_processing_timeout_minutes=sdk_pipeline_options.view_as(
WorkerOptions).element_processing_timeout_minutes)
return fn_log_handler, sdk_harness, sdk_pipeline_options


Expand Down
31 changes: 29 additions & 2 deletions sdks/python/apache_beam/runners/worker/worker_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,8 @@ def __init__(
state_cache=None,
enable_heap_dump=False,
worker_id=None,
log_lull_timeout_ns=DEFAULT_LOG_LULL_TIMEOUT_NS):
log_lull_timeout_ns=DEFAULT_LOG_LULL_TIMEOUT_NS,
element_processing_timeout_minutes=None):
"""Initialize FnApiWorkerStatusHandler.

Args:
Expand All @@ -184,6 +185,10 @@ def __init__(
self._status_channel)
self._responses = queue.Queue()
self.log_lull_timeout_ns = log_lull_timeout_ns
if element_processing_timeout_minutes:
self._element_processing_timeout_ns = element_processing_timeout_minutes * 60 * 1e9
else:
self._element_processing_timeout_ns = None
self._last_full_thread_dump_secs = 0.0
self._last_lull_logged_secs = 0.0
self._server = threading.Thread(
Expand Down Expand Up @@ -250,14 +255,36 @@ def _log_lull_in_bundle_processor(self, bundle_process_cache):
if processor:
info = processor.state_sampler.get_info()
self._log_lull_sampler_info(info, instruction)
if self._element_processing_timeout_ns:
self._terminate_sdk_worker_lull(info, instruction)

def _terminate_sdk_worker_lull(self, sampler_info, instruction):
if (sampler_info and sampler_info.time_since_transition and
sampler_info.time_since_transition
> self._element_processing_timeout_ns):
lull_seconds = sampler_info.time_since_transition / 1e9
step_name = sampler_info.state_name.step_name
state_name = sampler_info.state_name.name
if step_name and state_name:
step_name_log = (
' for PTransform{name=%s, state=%s}' % (step_name, state_name))
else:
step_name_log = ''

stack_trace = self._get_stack_trace(sampler_info)
log_lull_msg = (
'Operation ongoing in bundle %s%s for at least %.2f seconds'
' without outputting or completing.\n'
'Current Traceback:\n%s.' %
(instruction, step_name_log, lull_seconds, stack_trace))
raise TimeoutError(log_lull_msg + 'The SDK harness will be terminated.')

def _log_lull_sampler_info(self, sampler_info, instruction):
if not self._passed_lull_timeout_since_last_log():
return
if (sampler_info and sampler_info.time_since_transition and
sampler_info.time_since_transition > self.log_lull_timeout_ns):
lull_seconds = sampler_info.time_since_transition / 1e9

step_name = sampler_info.state_name.step_name
state_name = sampler_info.state_name.name
if step_name and state_name:
Expand Down
39 changes: 27 additions & 12 deletions sdks/python/apache_beam/runners/worker/worker_status_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ def setUp(self):
self.test_port = self.server.add_insecure_port('[::]:0')
self.server.start()
self.url = 'localhost:%s' % self.test_port
self.fn_status_handler = FnApiWorkerStatusHandler(self.url)
self.fn_status_handler = FnApiWorkerStatusHandler(
self.url, element_processing_timeout_minutes=10)

def tearDown(self):
self.server.stop(5)
Expand All @@ -86,19 +87,19 @@ def test_generate_error(self, mock_method):
self.assertIsNotNone(response.error)
self.fn_status_handler.close()

def test_log_lull_in_bundle_processor(self):
def get_state_sampler_info_for_lull(lull_duration_s):
return "bundle-id", statesampler.StateSamplerInfo(
CounterName('progress-msecs', 'stage_name', 'step_name'),
1,
lull_duration_s * 1e9,
threading.current_thread())
def get_state_sampler_info_for_lull(self, lull_duration_s):
  """Build a (bundle_id, StateSamplerInfo) pair simulating a lull.

  Fix: this helper was hoisted from a nested function to a method of the
  test class, but the ``self`` parameter was not added, while call sites
  invoke it as ``self.get_state_sampler_info_for_lull(...)`` — that would
  raise TypeError (instance bound to ``lull_duration_s`` plus an extra
  positional arg). Adding ``self`` restores the intended method contract.

  Args:
    lull_duration_s: simulated time (seconds) since the last state
      transition; converted to nanoseconds for StateSamplerInfo.

  Returns:
    A ("bundle-id", StateSamplerInfo) tuple whose tracked thread is the
    current thread, suitable for the lull-logging/termination tests.
  """
  return "bundle-id", statesampler.StateSamplerInfo(
      CounterName('progress-msecs', 'stage_name', 'step_name'),
      1,
      lull_duration_s * 1e9,
      threading.current_thread())

def test_log_lull_in_bundle_processor(self):
now = time.time()
with mock.patch('logging.Logger.warning') as warn_mock:
with mock.patch('time.time') as time_mock:
time_mock.return_value = now
bundle_id, sampler_info = get_state_sampler_info_for_lull(21 * 60)
bundle_id, sampler_info = self.get_state_sampler_info_for_lull(21 * 60)
self.fn_status_handler._log_lull_sampler_info(sampler_info, bundle_id)

bundle_id_template = warn_mock.call_args[0][1]
Expand All @@ -113,19 +114,33 @@ def get_state_sampler_info_for_lull(lull_duration_s):

with mock.patch('time.time') as time_mock:
time_mock.return_value = now + 6 * 60 # 6 minutes
bundle_id, sampler_info = get_state_sampler_info_for_lull(21 * 60)
bundle_id, sampler_info = self.get_state_sampler_info_for_lull(21 * 60)
self.fn_status_handler._log_lull_sampler_info(sampler_info, bundle_id)

with mock.patch('time.time') as time_mock:
time_mock.return_value = now + 21 * 60 # 21 minutes
bundle_id, sampler_info = get_state_sampler_info_for_lull(10 * 60)
bundle_id, sampler_info = self.get_state_sampler_info_for_lull(10 * 60)
self.fn_status_handler._log_lull_sampler_info(sampler_info, bundle_id)

with mock.patch('time.time') as time_mock:
time_mock.return_value = now + 42 * 60 # 21 minutes after previous one
bundle_id, sampler_info = get_state_sampler_info_for_lull(21 * 60)
bundle_id, sampler_info = self.get_state_sampler_info_for_lull(21 * 60)
self.fn_status_handler._log_lull_sampler_info(sampler_info, bundle_id)

def test__terminate_sdk_worker_lull_in_bundle_processor(self):
  """Lulls below the configured timeout pass; longer ones raise."""
  start = time.time()
  # A 5-minute lull is under the 10-minute timeout: no error expected.
  with mock.patch('time.time') as frozen_clock:
    frozen_clock.return_value = start
    bundle_id, sampler_info = self.get_state_sampler_info_for_lull(5 * 60)
    self.fn_status_handler._terminate_sdk_worker_lull(sampler_info, bundle_id)

  # An 11-minute lull exceeds the timeout and must raise TimeoutError.
  with mock.patch('time.time') as frozen_clock:
    frozen_clock.return_value = start + 6 * 60  # 6 minutes
    bundle_id, sampler_info = self.get_state_sampler_info_for_lull(11 * 60)
    with self.assertRaises(TimeoutError):
      self.fn_status_handler._terminate_sdk_worker_lull(
          sampler_info, bundle_id)


class HeapDumpTest(unittest.TestCase):
@mock.patch('apache_beam.runners.worker.worker_status.hpy', None)
Expand Down
Loading