|
2 | 2 | # SPDX-License-Identifier: Apache-2.0 |
3 | 3 | from datetime import datetime, timezone |
4 | 4 |
|
5 | | -from asgi_correlation_id import correlation_id |
6 | 5 | from asgi_correlation_id.extensions.celery import load_correlation_ids |
7 | 6 | from celery import Celery |
8 | | -from celery.signals import after_setup_task_logger, before_task_publish, task_prerun |
| 7 | +from celery.signals import after_setup_task_logger |
9 | 8 | from celery.utils.log import get_task_logger |
10 | 9 | from sqlalchemy import select |
11 | 10 | from sqlalchemy.orm import Session, selectinload |
|
22 | 21 | load_correlation_ids() |
23 | 22 |
|
24 | 23 | WORKER_SETTINGS = WorkerAppSettings() |
25 | | -CORRELATION_CELERY_HEADER_ID = WORKER_SETTINGS.worker.CORRELATION_CELERY_HEADER_ID |
26 | 24 |
|
27 | 25 |
|
28 | 26 | @celery.task(name="run_transfer_task", bind=True, track_started=True) |
@@ -85,18 +83,3 @@ def run_transfer(session: Session, run_id: int, settings: WorkerAppSettings): |
85 | 83 | @after_setup_task_logger.connect |
86 | 84 | def setup_loggers(*args, **kwargs): |
87 | 85 | setup_logging(WorkerAppSettings().logging.get_log_config_path()) |
88 | | - |
89 | | - |
90 | | -@before_task_publish.connect() |
91 | | -def transfer_correlation_id(headers, *args, **kwargs) -> None: |
92 | | - # This is called before task.delay() finishes |
93 | | - # Here we're able to transfer the correlation ID via the headers kept in our backend |
94 | | - headers[CORRELATION_CELERY_HEADER_ID] = correlation_id.get() |
95 | | - |
96 | | - |
97 | | -@task_prerun.connect() |
98 | | -def load_correlation_id(task, *args, **kwargs) -> None: |
99 | | - # This is called when the worker picks up the task |
100 | | - # Here we're able to load the correlation ID from the headers |
101 | | - id_value = task.request.get(CORRELATION_CELERY_HEADER_ID) |
102 | | - correlation_id.set(id_value) |
0 commit comments