From f7bb2582c3a01db0eee73794934b030a11e44c17 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 31 Mar 2026 21:36:57 +0000 Subject: [PATCH 1/4] Ensure logging is configured on remote task workers When tasks execute on remote Dask/Ray workers via a serialized context, the worker process may never have called setup_logging(), so the APILogHandler is missing from Prefect loggers and task logs are silently lost. Add ensure_logging_setup() API in logging/configuration.py that encapsulates the PROCESS_LOGGING_CONFIG check and only calls setup_logging() if needed. Call it from both SyncTaskRunEngine and AsyncTaskRunEngine initialize_run() when a serialized context is present. Flush APILogHandler in the finally block to ensure logs are sent before the task result is returned. Closes #18082 Co-authored-by: Alex Streed Co-Authored-By: alex.s --- src/prefect/logging/configuration.py | 23 ++++++++++++++++++++++ src/prefect/task_engine.py | 14 ++++++++++++++ tests/test_logging.py | 29 ++++++++++++++++++++++++++++ 3 files changed, 66 insertions(+) diff --git a/src/prefect/logging/configuration.py b/src/prefect/logging/configuration.py index 22dfadb81271..2fbcefa7680a 100644 --- a/src/prefect/logging/configuration.py +++ b/src/prefect/logging/configuration.py @@ -65,6 +65,29 @@ def load_logging_config(path: Path) -> dict[str, Any]: return flatdict_to_dict(flat_config) +def is_logging_configured() -> bool: + """ + Check whether Prefect logging has already been configured in this process. + + Returns `True` if `setup_logging` has been called at least once, meaning + handlers like `APILogHandler` are attached to the Prefect loggers. + """ + return bool(PROCESS_LOGGING_CONFIG) + + +def ensure_logging_setup() -> None: + """ + Ensure Prefect logging is configured in this process, calling + `setup_logging` only if it has not already been called. + + Use this in remote execution environments (e.g. Dask/Ray workers) where + the normal SDK entry point (`import prefect`) may not have triggered + logging configuration. + """ + if not PROCESS_LOGGING_CONFIG: + setup_logging() + + def setup_logging(incremental: bool | None = None) -> dict[str, Any]: """ Sets up logging. diff --git a/src/prefect/task_engine.py b/src/prefect/task_engine.py index fd7a698fe2ab..56d1a35d184d 100644 --- a/src/prefect/task_engine.py +++ b/src/prefect/task_engine.py @@ -73,6 +73,8 @@ TerminationSignal, UpstreamTaskError, ) +from prefect.logging.configuration import ensure_logging_setup +from prefect.logging.handlers import APILogHandler from prefect.logging.loggers import get_logger, patch_print, task_run_logger from prefect.results import ( ResultRecord, @@ -818,6 +820,9 @@ def initialize_run( """ with hydrated_context(self.context): + if self.context is not None: + ensure_logging_setup() + with SyncClientContext.get_or_create() as client_ctx: self._client = client_ctx.client self._is_started = True @@ -884,6 +889,10 @@ def initialize_run( self.handle_crash(exc) raise finally: + if self.context is not None: + for handler in logging.getLogger("prefect.task_runs").handlers: + if isinstance(handler, APILogHandler): + handler.flush() self.log_finished_message() self._is_started = False self._client = None @@ -1445,6 +1454,9 @@ async def initialize_run( """ with hydrated_context(self.context): + if self.context is not None: + ensure_logging_setup() + async with AsyncClientContext.get_or_create(): self._client = get_client() self._is_started = True @@ -1510,6 +1522,8 @@ async def initialize_run( await self.handle_crash(exc) raise finally: + if self.context is not None: + await APILogHandler.aflush() self.log_finished_message() self._is_started = False self._client = None diff --git a/tests/test_logging.py b/tests/test_logging.py index 141cd5172d9d..9025de626e57 100644 --- a/tests/test_logging.py +++ b/tests/test_logging.py @@ -33,6 +33,8 @@ from prefect.logging import LogEavesdropper from prefect.logging.configuration import ( DEFAULT_LOGGING_SETTINGS_PATH, + ensure_logging_setup, + is_logging_configured, load_logging_config, setup_logging, ) @@ -285,6 +287,33 @@ def test_setup_logging_applies_root_config_when_no_prior_configuration( assert called_config["root"]["handlers"] == ["console"] +def test_is_logging_configured_returns_false_when_not_configured( + dictConfigMock: MagicMock, +): + assert is_logging_configured() is False + + +def test_is_logging_configured_returns_true_after_setup(dictConfigMock: MagicMock): + setup_logging() + assert is_logging_configured() is True + + +def test_ensure_logging_setup_calls_setup_logging_when_not_configured( + dictConfigMock: MagicMock, +): + ensure_logging_setup() + dictConfigMock.assert_called_once() + + +def test_ensure_logging_setup_is_idempotent(dictConfigMock: MagicMock): + ensure_logging_setup() + ensure_logging_setup() + ensure_logging_setup() + # setup_logging should only be called once since PROCESS_LOGGING_CONFIG + # is populated after the first call + dictConfigMock.assert_called_once() + + def test_setting_aliases_respected_for_logging_config(tmp_path: Path): logging_config_content = """ loggers: From 6dba8a7f715efeeb7decd147f59cd1c3e1b9c1c9 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 1 Apr 2026 13:47:18 +0000 Subject: [PATCH 2/4] Remove APILogHandler flush from task engine finally blocks The flush was running on all task runs (including local ThreadPool runs where self.context is set to {cancel_event: ...}), disrupting the EventsWorker service. The core fix (ensure_logging_setup) is sufficient for the remote worker logging issue. Co-authored-by: Alexander Streed Co-Authored-By: alex.s --- src/prefect/task_engine.py | 7 ------- 1 file changed, 7 deletions(-) diff --git a/src/prefect/task_engine.py b/src/prefect/task_engine.py index 56d1a35d184d..46bac427595b 100644 --- a/src/prefect/task_engine.py +++ b/src/prefect/task_engine.py @@ -74,7 +74,6 @@ UpstreamTaskError, ) from prefect.logging.configuration import ensure_logging_setup -from prefect.logging.handlers import APILogHandler from prefect.logging.loggers import get_logger, patch_print, task_run_logger from prefect.results import ( ResultRecord, @@ -889,10 +888,6 @@ def initialize_run( self.handle_crash(exc) raise finally: - if self.context is not None: - for handler in logging.getLogger("prefect.task_runs").handlers: - if isinstance(handler, APILogHandler): - handler.flush() self.log_finished_message() self._is_started = False self._client = None @@ -1522,8 +1517,6 @@ async def initialize_run( await self.handle_crash(exc) raise finally: - if self.context is not None: - await APILogHandler.aflush() self.log_finished_message() self._is_started = False self._client = None From 8fae79c27d851297eee571b6307d3367767471c3 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 1 Apr 2026 14:22:28 +0000 Subject: [PATCH 3/4] Address review: move ensure_logging_setup into hydrated_context, remove is_logging_configured - Moved ensure_logging_setup() call from task_engine.py into hydrated_context() in context.py, so it runs automatically when a serialized context is provided (remote execution) - Removed is_logging_configured() since it was only used in tests - Removed ensure_logging_setup import/calls from task_engine.py Co-authored-by: Alexander Streed Co-Authored-By: alex.s --- src/prefect/context.py | 4 ++++ src/prefect/logging/configuration.py | 10 ---------- src/prefect/task_engine.py | 7 ------- tests/test_logging.py | 12 ------------ 4 files changed, 4 insertions(+), 29 deletions(-) diff --git a/src/prefect/context.py b/src/prefect/context.py index 01b3225de919..8191b7efd0c9 100644 --- a/src/prefect/context.py +++ b/src/prefect/context.py @@ -163,6 +163,10 @@ def hydrated_context( with ExitStack() as stack: if serialized_context: + from prefect.logging.configuration import ensure_logging_setup + + ensure_logging_setup() + # Set up settings context if settings_context := serialized_context.get("settings_context"): stack.enter_context(SettingsContext(**settings_context)) diff --git a/src/prefect/logging/configuration.py b/src/prefect/logging/configuration.py index 2fbcefa7680a..a294625685a1 100644 --- a/src/prefect/logging/configuration.py +++ b/src/prefect/logging/configuration.py @@ -65,16 +65,6 @@ def load_logging_config(path: Path) -> dict[str, Any]: return flatdict_to_dict(flat_config) -def is_logging_configured() -> bool: - """ - Check whether Prefect logging has already been configured in this process. - - Returns `True` if `setup_logging` has been called at least once, meaning - handlers like `APILogHandler` are attached to the Prefect loggers. - """ - return bool(PROCESS_LOGGING_CONFIG) - - def ensure_logging_setup() -> None: """ Ensure Prefect logging is configured in this process, calling diff --git a/src/prefect/task_engine.py b/src/prefect/task_engine.py index 46bac427595b..fd7a698fe2ab 100644 --- a/src/prefect/task_engine.py +++ b/src/prefect/task_engine.py @@ -73,7 +73,6 @@ TerminationSignal, UpstreamTaskError, ) -from prefect.logging.configuration import ensure_logging_setup from prefect.logging.loggers import get_logger, patch_print, task_run_logger from prefect.results import ( ResultRecord, @@ -819,9 +818,6 @@ def initialize_run( """ with hydrated_context(self.context): - if self.context is not None: - ensure_logging_setup() - with SyncClientContext.get_or_create() as client_ctx: self._client = client_ctx.client self._is_started = True @@ -1449,9 +1445,6 @@ async def initialize_run( """ with hydrated_context(self.context): - if self.context is not None: - ensure_logging_setup() - async with AsyncClientContext.get_or_create(): self._client = get_client() self._is_started = True diff --git a/tests/test_logging.py b/tests/test_logging.py index 9025de626e57..ab611971074c 100644 --- a/tests/test_logging.py +++ b/tests/test_logging.py @@ -34,7 +34,6 @@ from prefect.logging.configuration import ( DEFAULT_LOGGING_SETTINGS_PATH, ensure_logging_setup, - is_logging_configured, load_logging_config, setup_logging, ) @@ -287,17 +286,6 @@ def test_setup_logging_applies_root_config_when_no_prior_configuration( assert called_config["root"]["handlers"] == ["console"] -def test_is_logging_configured_returns_false_when_not_configured( - dictConfigMock: MagicMock, -): - assert is_logging_configured() is False - - -def test_is_logging_configured_returns_true_after_setup(dictConfigMock: MagicMock): - setup_logging() - assert is_logging_configured() is True - - def test_ensure_logging_setup_calls_setup_logging_when_not_configured( dictConfigMock: MagicMock, ): From f0eee82154198c4e28fdf2e7fa23c6ba274d1b00 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 1 Apr 2026 14:32:27 +0000 Subject: [PATCH 4/4] Move ensure_logging_setup import to top of context.py Co-authored-by: Alexander Streed Co-Authored-By: alex.s --- src/prefect/context.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/prefect/context.py b/src/prefect/context.py index 8191b7efd0c9..e6abedfdd678 100644 --- a/src/prefect/context.py +++ b/src/prefect/context.py @@ -37,6 +37,7 @@ from prefect.client.schemas.objects import RunType from prefect.events.worker import EventsWorker from prefect.exceptions import MissingContextError +from prefect.logging.configuration import ensure_logging_setup from prefect.results import ( ResultStore, get_default_persist_setting, @@ -163,8 +164,6 @@ def hydrated_context( with ExitStack() as stack: if serialized_context: - from prefect.logging.configuration import ensure_logging_setup - ensure_logging_setup() # Set up settings context