Skip to content

Commit 705b140

Browse files
author
maxim-lixakov
committed
[DOP-22268] - move spark logs to json format
1 parent 7e4d1bc commit 705b140

File tree

3 files changed

+13
-7
lines changed

3 files changed

+13
-7
lines changed

syncmaster/settings/log/json.yml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@ filters:
99
(): asgi_correlation_id.CorrelationIdFilter
1010
uuid_length: 32
1111
default_value: '-'
12+
celery_tracing:
13+
(): asgi_correlation_id.CeleryTracingIdsFilter
14+
uuid_length: 32
1215

1316
formatters:
1417
json:
@@ -26,7 +29,7 @@ handlers:
2629
celery:
2730
class: logging.StreamHandler
2831
formatter: json
29-
filters: [correlation_id]
32+
filters: [correlation_id, celery_tracing]
3033
stream: ext://sys.stdout
3134

3235
loggers:

syncmaster/worker/settings/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ class WorkerSettings(BaseSettings):
2424
"""
2525

2626
CORRELATION_CELERY_HEADER_ID: str = Field(
27-
"CORRELATION_CELERY_HEADER_ID",
27+
"CORRELATION_ID",
2828
description="Header ID for correlation in Celery",
2929
)
3030
CREATE_SPARK_SESSION_FUNCTION: ImportString = Field(

syncmaster/worker/transfer.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
22
# SPDX-License-Identifier: Apache-2.0
3-
import logging
43
from datetime import datetime, timezone
54

6-
import onetl
75
from asgi_correlation_id import correlation_id
6+
from asgi_correlation_id.extensions.celery import (
7+
load_celery_current_and_parent_ids,
8+
load_correlation_ids,
9+
)
810
from celery import Celery
9-
from celery.signals import after_setup_logger, before_task_publish, task_prerun
11+
from celery.signals import after_setup_task_logger, before_task_publish, task_prerun
1012
from celery.utils.log import get_task_logger
1113
from sqlalchemy import select
1214
from sqlalchemy.orm import Session, selectinload
@@ -20,14 +22,15 @@
2022
from syncmaster.worker.settings import WorkerAppSettings
2123

2224
logger = get_task_logger(__name__)
25+
load_correlation_ids()
26+
load_celery_current_and_parent_ids()
2327

2428
WORKER_SETTINGS = WorkerAppSettings()
2529
CORRELATION_CELERY_HEADER_ID = WORKER_SETTINGS.worker.CORRELATION_CELERY_HEADER_ID
2630

2731

2832
@celery.task(name="run_transfer_task", bind=True, track_started=True)
2933
def run_transfer_task(self: Celery, run_id: int) -> None:
30-
onetl.log.setup_logging(level=logging.INFO)
3134
with Session(self.engine) as session:
3235
run_transfer(
3336
session=session,
@@ -83,7 +86,7 @@ def run_transfer(session: Session, run_id: int, settings: WorkerAppSettings):
8386
session.commit()
8487

8588

86-
@after_setup_logger.connect
89+
@after_setup_task_logger.connect
8790
def setup_loggers(*args, **kwargs):
8891
setup_logging(WorkerAppSettings().logging.get_log_config_path())
8992

0 commit comments

Comments
 (0)