Skip to content

Commit 73cf17b

Browse files
author
maxim-lixakov
committed
[DOP-22268] - remove CORRELATION_CELERY_HEADER_ID setting
1 parent 07f518f commit 73cf17b

File tree

4 files changed

+4
-31
lines changed

4 files changed

+4
-31
lines changed

docs/worker/log_url.rst

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,5 +9,5 @@ The configuration parameter is:
99
1010
SYNCMASTER__SERVER__LOG_URL_TEMPLATE=https://grafana.example.com?correlation_id={{ correlation_id }}&run_id={{ run.id }}
1111
12-
You can search for each run by either its correlation id ``CORRELATION_CELERY_HEADER_ID`` in http headers or the ``Run.Id``.
12+
You can search for each run by either its correlation id ``x-request-id`` in http headers or the ``Run.Id``.
1313

syncmaster/settings/log/json.yml

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,6 @@ filters:
99
(): asgi_correlation_id.CorrelationIdFilter
1010
uuid_length: 32
1111
default_value: '-'
12-
celery_tracing:
13-
(): asgi_correlation_id.CeleryTracingIdsFilter
14-
uuid_length: 32
1512

1613
formatters:
1714
json:
@@ -29,7 +26,7 @@ handlers:
2926
celery:
3027
class: logging.StreamHandler
3128
formatter: json
32-
filters: [correlation_id, celery_tracing]
29+
filters: [correlation_id]
3330
stream: ext://sys.stdout
3431

3532
loggers:

syncmaster/worker/settings/__init__.py

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,9 @@ class WorkerSettings(BaseSettings):
2020
2121
.. code-block:: bash
2222
23-
SYNCMASTER__WORKER__CORRELATION_CELERY_HEADER_ID=CORRELATION_ID_CELERY
23+
SYNCMASTER__WORKER__CREATE_SPARK_SESSION_FUNCTION=custom_syncmaster.spark.get_worker_spark_session
2424
"""
2525

26-
CORRELATION_CELERY_HEADER_ID: str = Field(
27-
"CORRELATION_ID",
28-
description="Header ID for correlation in Celery",
29-
)
3026
CREATE_SPARK_SESSION_FUNCTION: ImportString = Field(
3127
"syncmaster.worker.spark.get_worker_spark_session",
3228
description="Function to create Spark session for worker",
@@ -51,9 +47,6 @@ class WorkerAppSettings(BaseSettings):
5147
5248
.. code-block:: bash
5349
54-
# Example of setting a CORRELATION_CELERY_HEADER_ID via environment variable
55-
SYNCMASTER__WORKER__CORRELATION_CELERY_HEADER_ID=CORRELATION_ID_CELERY
56-
5750
# Example of setting a database URL via environment variable
5851
SYNCMASTER__DATABASE__URL=postgresql+asyncpg://user:password@localhost:5432/dbname
5952

syncmaster/worker/transfer.py

Lines changed: 1 addition & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,9 @@
22
# SPDX-License-Identifier: Apache-2.0
33
from datetime import datetime, timezone
44

5-
from asgi_correlation_id import correlation_id
65
from asgi_correlation_id.extensions.celery import load_correlation_ids
76
from celery import Celery
8-
from celery.signals import after_setup_task_logger, before_task_publish, task_prerun
7+
from celery.signals import after_setup_task_logger
98
from celery.utils.log import get_task_logger
109
from sqlalchemy import select
1110
from sqlalchemy.orm import Session, selectinload
@@ -22,7 +21,6 @@
2221
load_correlation_ids()
2322

2423
WORKER_SETTINGS = WorkerAppSettings()
25-
CORRELATION_CELERY_HEADER_ID = WORKER_SETTINGS.worker.CORRELATION_CELERY_HEADER_ID
2624

2725

2826
@celery.task(name="run_transfer_task", bind=True, track_started=True)
@@ -85,18 +83,3 @@ def run_transfer(session: Session, run_id: int, settings: WorkerAppSettings):
8583
@after_setup_task_logger.connect
8684
def setup_loggers(*args, **kwargs):
8785
setup_logging(WorkerAppSettings().logging.get_log_config_path())
88-
89-
90-
@before_task_publish.connect()
91-
def transfer_correlation_id(headers, *args, **kwargs) -> None:
92-
# This is called before task.delay() finishes
93-
# Here we're able to transfer the correlation ID via the headers kept in our backend
94-
headers[CORRELATION_CELERY_HEADER_ID] = correlation_id.get()
95-
96-
97-
@task_prerun.connect()
98-
def load_correlation_id(task, *args, **kwargs) -> None:
99-
# This is called when the worker picks up the task
100-
# Here we're able to load the correlation ID from the headers
101-
id_value = task.request.get(CORRELATION_CELERY_HEADER_ID)
102-
correlation_id.set(id_value)

0 commit comments

Comments
 (0)