
Commit fe3e81d

refactor
1 parent fc73de3

3 files changed: +38 -22 lines
services/dask-sidecar/src/simcore_service_dask_sidecar/app_utils.py (new file)

Lines changed: 20 additions & 0 deletions

@@ -0,0 +1,20 @@
+import logging
+
+from servicelib.logging_utils import config_all_loggers
+
+from .settings import ApplicationSettings
+
+
+def setup_app_logging(settings: ApplicationSettings) -> None:
+    # set up logging
+    logging.basicConfig(level=settings.DASK_SIDECAR_LOGLEVEL.value)
+    logging.root.setLevel(level=settings.DASK_SIDECAR_LOGLEVEL.value)
+    # NOTE: Dask attaches a StreamHandler to the "distributed" logger;
+    # removing those handlers avoids dual propagation of log records
+    for handler in list(logging.getLogger("distributed").handlers):
+        logging.getLogger("distributed").removeHandler(handler)
+    config_all_loggers(
+        log_format_local_dev_enabled=settings.DASK_LOG_FORMAT_LOCAL_DEV_ENABLED,
+        logger_filter_mapping=settings.DASK_LOG_FILTER_MAPPING,
+        tracing_settings=None,  # no tracing for dask sidecar
+    )
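
A standalone sketch (not part of the commit; only the "distributed" logger name is taken from the code above) of the dual-propagation problem this helper addresses: a handler attached directly to the "distributed" logger, on top of the root handler installed by basicConfig, emits every record twice.

    import logging

    logging.basicConfig(level=logging.INFO)  # installs a StreamHandler on the root logger

    dist_logger = logging.getLogger("distributed")
    dist_logger.addHandler(logging.StreamHandler())  # stand-in for the handler dask attaches

    dist_logger.info("hello")  # emitted twice: by the direct handler, then by root via propagation

    # what setup_app_logging does: strip the direct handlers
    for handler in list(dist_logger.handlers):  # iterate over a copy while removing
        dist_logger.removeHandler(handler)

    dist_logger.info("hello again")  # emitted once, through the root handler only

The list(...) copy matters: removeHandler() mutates the very list being iterated, so looping over the live handlers attribute can skip entries when more than one handler is attached.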
services/dask-sidecar/src/simcore_service_dask_sidecar/scheduler.py

Lines changed: 13 additions & 6 deletions

@@ -1,20 +1,27 @@
 import logging
 
-import dask.config
 import distributed
+from servicelib.logging_utils import log_context
 
 from ._meta import print_dask_scheduler_banner
+from .app_utils import setup_app_logging
+from .settings import ApplicationSettings
 
 _logger = logging.getLogger(__name__)
 
 
 async def dask_setup(scheduler: distributed.Scheduler) -> None:
-    """This is a special function recognized by the dask worker when starting with flag --preload"""
-    _logger.info("Setting up scheduler...")
+    """This is a special function recognized by dask when starting with flag --preload"""
     assert scheduler  # nosec
-    print(f"dask config: {dask.config.config}", flush=True)  # noqa: T201
-    print_dask_scheduler_banner()
+
+    settings = ApplicationSettings.create_from_envs()
+    setup_app_logging(settings)
+
+    with log_context(_logger, logging.INFO, "Launch dask scheduler"):
+        _logger.info("app settings: %s", settings.model_dump_json(indent=1))
+        print_dask_scheduler_banner()
 
 
 async def dask_teardown(_worker: distributed.Worker) -> None:
-    _logger.info("Shutting down scheduler")
+    with log_context(_logger, logging.INFO, "Tear down dask scheduler"):
+        ...
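
Both preload modules rely on dask's convention of importing the module named by --preload and awaiting its dask_setup/dask_teardown hooks. A minimal sketch of that wiring, assuming the sidecar package is importable (the module path below is inferred from this diff's imports, not stated in the commit):

    import asyncio

    import distributed


    async def main() -> None:
        # Scheduler accepts preload= module paths; on startup dask imports
        # each module and awaits its dask_setup(scheduler) coroutine.
        # CLI equivalent: dask scheduler --preload <module-path>
        async with distributed.Scheduler(
            preload=["simcore_service_dask_sidecar.scheduler"]
        ) as scheduler:
            print(f"scheduler listening on {scheduler.address}")


    asyncio.run(main())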

services/dask-sidecar/src/simcore_service_dask_sidecar/worker.py

Lines changed: 5 additions & 16 deletions

@@ -11,11 +11,11 @@
     ContainerTaskParameters,
     LogFileUploadURL,
 )
-from distributed.worker import logger as dask_worker_logger
-from servicelib.logging_utils import config_all_loggers, log_context
+from servicelib.logging_utils import log_context
 from settings_library.s3 import S3Settings
 
 from ._meta import print_dask_sidecar_banner
+from .app_utils import setup_app_logging
 from .computational_sidecar.core import ComputationalSidecar
 from .dask_utils import TaskPublisher, get_current_task_resources, monitor_task_abortion
 from .rabbitmq_plugin import RabbitMQPlugin

@@ -53,21 +53,9 @@ def exit_gracefully(self, *_args):
 
 
 async def dask_setup(worker: distributed.Worker) -> None:
-    """This is a special function recognized by the dask worker when starting with flag --preload"""
+    """This is a special function recognized by dask when starting with flag --preload"""
     settings = ApplicationSettings.create_from_envs()
-    # set up logging
-    logging.basicConfig(level=settings.DASK_SIDECAR_LOGLEVEL.value)
-    logging.root.setLevel(level=settings.DASK_SIDECAR_LOGLEVEL.value)
-    dask_worker_logger.setLevel(level=settings.DASK_SIDECAR_LOGLEVEL.value)
-    # NOTE: Dask attaches a StreamHandler to the logger in distributed
-    # removing them solves dual propagation of logs
-    for handler in logging.getLogger("distributed").handlers:
-        logging.getLogger("distributed").removeHandler(handler)
-    config_all_loggers(
-        log_format_local_dev_enabled=settings.DASK_LOG_FORMAT_LOCAL_DEV_ENABLED,
-        logger_filter_mapping=settings.DASK_LOG_FILTER_MAPPING,
-        tracing_settings=None,  # no tracing for dask sidecar
-    )
+    setup_app_logging(settings)
 
     with log_context(_logger, logging.INFO, "Launch dask worker"):
         _logger.info("app settings: %s", settings.model_dump_json(indent=1))

@@ -79,6 +67,7 @@ async def dask_setup(worker: distributed.Worker) -> None:
 
     loop = asyncio.get_event_loop()
     _logger.info("We do have a running loop in the main thread: %s", f"{loop=}")
+
     if settings.DASK_SIDECAR_RABBITMQ:
         await worker.plugin_add(RabbitMQPlugin(settings.DASK_SIDECAR_RABBITMQ))
 
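The unchanged tail of this hunk still registers RabbitMQPlugin through worker.plugin_add(...), which expects an object implementing distributed's WorkerPlugin contract. A hypothetical sketch of that contract (illustrative names only; the commit's RabbitMQPlugin presumably implements the same hooks):

    import distributed


    class HypotheticalWorkerPlugin(distributed.WorkerPlugin):
        name = "hypothetical-plugin"

        async def setup(self, worker: distributed.Worker) -> None:
            # runs once when the plugin is registered on the worker
            worker.log_event("plugin", f"{self.name} attached to {worker.address}")

        async def teardown(self, worker: distributed.Worker) -> None:
            # runs when the worker shuts down or the plugin is removed
            worker.log_event("plugin", f"{self.name} detached")


    # inside an async dask_setup(worker) preload hook, registration looks like:
    #     await worker.plugin_add(HypotheticalWorkerPlugin())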