Skip to content

Commit 8b58283

Browse files
author
maxim-lixakov
committed
[DOP-22268] - move spark logs to json format
1 parent 7e4d1bc commit 8b58283

File tree

2 files changed

+10
-4
lines changed

2 files changed

+10
-4
lines changed

syncmaster/settings/log/json.yml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@ filters:
99
(): asgi_correlation_id.CorrelationIdFilter
1010
uuid_length: 32
1111
default_value: '-'
12+
celery_tracing:
13+
(): asgi_correlation_id.CeleryTracingIdsFilter
14+
uuid_length: 32
1215

1316
formatters:
1417
json:
@@ -26,7 +29,7 @@ handlers:
2629
celery:
2730
class: logging.StreamHandler
2831
formatter: json
29-
filters: [correlation_id]
32+
filters: [correlation_id, celery_tracing]
3033
stream: ext://sys.stdout
3134

3235
loggers:

syncmaster/worker/transfer.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
22
# SPDX-License-Identifier: Apache-2.0
3-
import logging
43
from datetime import datetime, timezone
54

6-
import onetl
75
from asgi_correlation_id import correlation_id
6+
from asgi_correlation_id.extensions.celery import (
7+
load_celery_current_and_parent_ids,
8+
load_correlation_ids,
9+
)
810
from celery import Celery
911
from celery.signals import after_setup_logger, before_task_publish, task_prerun
1012
from celery.utils.log import get_task_logger
@@ -20,14 +22,15 @@
2022
from syncmaster.worker.settings import WorkerAppSettings
2123

2224
logger = get_task_logger(__name__)
25+
load_correlation_ids()
26+
load_celery_current_and_parent_ids()
2327

2428
WORKER_SETTINGS = WorkerAppSettings()
2529
CORRELATION_CELERY_HEADER_ID = WORKER_SETTINGS.worker.CORRELATION_CELERY_HEADER_ID
2630

2731

2832
@celery.task(name="run_transfer_task", bind=True, track_started=True)
2933
def run_transfer_task(self: Celery, run_id: int) -> None:
30-
onetl.log.setup_logging(level=logging.INFO)
3134
with Session(self.engine) as session:
3235
run_transfer(
3336
session=session,

0 commit comments

Comments
 (0)