Skip to content

Commit d8bc63c

Browse files
[DOP-22268] - move spark logs to json format (#172)
* [DOP-22268] - move spark logs to json format * [DOP-22268] - remove CORRELATION_CELERY_HEADER_ID setting
1 parent 7e4d1bc commit d8bc63c

File tree

3 files changed

+6
-31
lines changed

3 files changed

+6
-31
lines changed

docs/worker/log_url.rst

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,5 +9,5 @@ The configuration parameter is:
99
1010
SYNCMASTER__SERVER__LOG_URL_TEMPLATE=https://grafana.example.com?correlation_id={{ correlation_id }}&run_id={{ run.id }}
1111
12-
You can search for each run by either its correlation id ``CORRELATION_CELERY_HEADER_ID`` in http headers or the ``Run.Id``.
12+
You can search for each run by either its correlation id ``x-request-id`` in http headers or the ``Run.Id``.
1313

syncmaster/worker/settings/__init__.py

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,9 @@ class WorkerSettings(BaseSettings):
2020
2121
.. code-block:: bash
2222
23-
SYNCMASTER__WORKER__CORRELATION_CELERY_HEADER_ID=CORRELATION_ID_CELERY
23+
SYNCMASTER__WORKER__CREATE_SPARK_SESSION_FUNCTION=custom_syncmaster.spark.get_worker_spark_session
2424
"""
2525

26-
CORRELATION_CELERY_HEADER_ID: str = Field(
27-
"CORRELATION_CELERY_HEADER_ID",
28-
description="Header ID for correlation in Celery",
29-
)
3026
CREATE_SPARK_SESSION_FUNCTION: ImportString = Field(
3127
"syncmaster.worker.spark.get_worker_spark_session",
3228
description="Function to create Spark session for worker",
@@ -51,9 +47,6 @@ class WorkerAppSettings(BaseSettings):
5147
5248
.. code-block:: bash
5349
54-
# Example of setting a CORRELATION_CELERY_HEADER_ID via environment variable
55-
SYNCMASTER__WORKER__CORRELATION_CELERY_HEADER_ID=CORRELATION_ID_CELERY
56-
5750
# Example of setting a database URL via environment variable
5851
SYNCMASTER__DATABASE__URL=postgresql+asyncpg://user:password@localhost:5432/dbname
5952

syncmaster/worker/transfer.py

Lines changed: 4 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,10 @@
11
# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
22
# SPDX-License-Identifier: Apache-2.0
3-
import logging
43
from datetime import datetime, timezone
54

6-
import onetl
7-
from asgi_correlation_id import correlation_id
5+
from asgi_correlation_id.extensions.celery import load_correlation_ids
86
from celery import Celery
9-
from celery.signals import after_setup_logger, before_task_publish, task_prerun
7+
from celery.signals import after_setup_task_logger
108
from celery.utils.log import get_task_logger
119
from sqlalchemy import select
1210
from sqlalchemy.orm import Session, selectinload
@@ -20,14 +18,13 @@
2018
from syncmaster.worker.settings import WorkerAppSettings
2119

2220
logger = get_task_logger(__name__)
21+
load_correlation_ids()
2322

2423
WORKER_SETTINGS = WorkerAppSettings()
25-
CORRELATION_CELERY_HEADER_ID = WORKER_SETTINGS.worker.CORRELATION_CELERY_HEADER_ID
2624

2725

2826
@celery.task(name="run_transfer_task", bind=True, track_started=True)
2927
def run_transfer_task(self: Celery, run_id: int) -> None:
30-
onetl.log.setup_logging(level=logging.INFO)
3128
with Session(self.engine) as session:
3229
run_transfer(
3330
session=session,
@@ -83,21 +80,6 @@ def run_transfer(session: Session, run_id: int, settings: WorkerAppSettings):
8380
session.commit()
8481

8582

86-
@after_setup_logger.connect
83+
@after_setup_task_logger.connect
8784
def setup_loggers(*args, **kwargs):
8885
setup_logging(WorkerAppSettings().logging.get_log_config_path())
89-
90-
91-
@before_task_publish.connect()
92-
def transfer_correlation_id(headers, *args, **kwargs) -> None:
93-
# This is called before task.delay() finishes
94-
# Here we're able to transfer the correlation ID via the headers kept in our backend
95-
headers[CORRELATION_CELERY_HEADER_ID] = correlation_id.get()
96-
97-
98-
@task_prerun.connect()
99-
def load_correlation_id(task, *args, **kwargs) -> None:
100-
# This is called when the worker picks up the task
101-
# Here we're able to load the correlation ID from the headers
102-
id_value = task.request.get(CORRELATION_CELERY_HEADER_ID)
103-
correlation_id.set(id_value)

0 commit comments

Comments
 (0)