Skip to content

Commit d558a6b

Browse files
[DOP-22729] Generate log_url on worker side (#220)
1 parent 3e3d663 commit d558a6b

File tree

9 files changed

+19
-25
lines changed

9 files changed

+19
-25
lines changed

.env.docker

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,12 @@ SYNCMASTER__BROKER__URL=amqp://guest:guest@rabbitmq:5672
1717

1818
# Server options
1919
SYNCMASTER__SERVER__SESSION__SECRET_KEY=generate_some_random_string
20-
SYNCMASTER__SERVER__LOG_URL_TEMPLATE=https://logs.location.example.com/syncmaster-worker?correlation_id={{ correlation_id }}&run_id={{ run.id }}
2120
# !!! NEVER USE ON PRODUCTION !!!
2221
SYNCMASTER__SERVER__DEBUG=true
2322

23+
# Worker options
24+
SYNCMASTER__WORKER__LOG_URL_TEMPLATE=https://logs.location.example.com/syncmaster-worker?correlation_id={{ correlation_id }}&run_id={{ run.id }}
25+
2426
# Keycloak Auth
2527
#SYNCMASTER__AUTH__PROVIDER=syncmaster.server.providers.auth.keycloak_provider.KeycloakAuthProvider
2628
SYNCMASTER__AUTH__KEYCLOAK__SERVER_URL=http://keycloak:8080

.env.local

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,12 @@ export SYNCMASTER__BROKER__URL=amqp://guest:guest@localhost:5672
1717

1818
# Server options
1919
export SYNCMASTER__SERVER__SESSION__SECRET_KEY=generate_some_random_string
20-
export SYNCMASTER__SERVER__LOG_URL_TEMPLATE="https://logs.location.example.com/syncmaster-worker?correlation_id={{ correlation_id }}&run_id={{ run.id }}"
2120
# !!! NEVER USE ON PRODUCTION !!!
2221
export SYNCMASTER__SERVER__DEBUG=true
2322

23+
# Worker options
24+
export SYNCMASTER__WORKER__LOG_URL_TEMPLATE="https://logs.location.example.com/syncmaster-worker?correlation_id={{ correlation_id }}&run_id={{ run.id }}"
25+
2426
# Keycloak Auth
2527
#export SYNCMASTER__AUTH__PROVIDER=syncmaster.server.providers.auth.keycloak_provider.KeycloakAuthProvider
2628
export SYNCMASTER__AUTH__KEYCLOAK__SERVER_URL=http://localhost:8080

docs/worker/log_url.rst

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,12 @@
33
Setting the `Run.log_url` value
44
===============================
55

6-
Each run in the system is linked to a log URL where the Celery worker logs are available. This log URL might point to an Elastic instance or another logging tool such as Grafana. The log URL is generated based on a template configured in the server.
6+
Each run in the system is linked to a log URL where the Celery worker logs are available. This log URL might point to an Elastic instance or another logging tool such as Grafana. The log URL is generated based on a template configured in the configuration.
77

88
The configuration parameter is:
99

1010
.. code-block:: bash
1111
12-
SYNCMASTER__SERVER__LOG_URL_TEMPLATE=https://grafana.example.com?correlation_id={{ correlation_id }}&run_id={{ run.id }}
12+
SYNCMASTER__WORKER__LOG_URL_TEMPLATE=https://grafana.example.com?correlation_id={{ correlation_id }}&run_id={{ run.id }}
1313
1414
You can search for each run by either its correlation id ``x-request-id`` in http headers or the ``Run.Id``.
15-

syncmaster/server/api/v1/runs.py

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,8 @@
44
from datetime import datetime
55
from typing import Annotated
66

7-
from asgi_correlation_id import correlation_id
87
from celery import Celery
98
from fastapi import APIRouter, Depends, Query
10-
from jinja2 import Template
119
from kombu.exceptions import KombuError
1210

1311
from syncmaster.db.models import RunType, Status, User
@@ -122,15 +120,6 @@ async def start_run(
122120
type=RunType.MANUAL,
123121
)
124122

125-
log_url = Template(settings.server.log_url_template).render(
126-
run=run,
127-
correlation_id=correlation_id.get(),
128-
)
129-
run = await unit_of_work.run.update(
130-
run_id=run.id,
131-
log_url=log_url,
132-
)
133-
134123
try:
135124
await asyncio.to_thread(
136125
celery.send_task,

syncmaster/server/settings/server/__init__.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,6 @@ class ServerSettings(BaseModel):
3535
""",
3636
),
3737
)
38-
log_url_template: str = Field(
39-
"",
40-
description=":ref:`URL template to access worker logs <worker-log-url>`",
41-
)
4238
request_id: RequestIDSettings = Field(
4339
default_factory=RequestIDSettings,
4440
)

syncmaster/worker/settings/__init__.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,17 @@ class WorkerSettings(BaseSettings):
2222
.. code-block:: bash
2323
2424
SYNCMASTER__WORKER__CREATE_SPARK_SESSION_FUNCTION=custom_syncmaster.spark.get_worker_spark_session
25+
SYNCMASTER__WORKER__LOG_URL_TEMPLATE=https://logs.location.example.com/syncmaster-worker?correlation_id={{ correlation_id }}&run_id={{ run.id }}
2526
"""
2627

2728
CREATE_SPARK_SESSION_FUNCTION: ImportString = Field(
2829
"syncmaster.worker.spark.get_worker_spark_session",
2930
description="Function to create Spark session for worker",
3031
)
32+
log_url_template: str = Field(
33+
"",
34+
description=":ref:`URL template to access worker logs <worker-log-url>`",
35+
)
3136

3237

3338
class WorkerAppSettings(BaseSettings):

syncmaster/worker/transfer.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,12 @@
22
# SPDX-License-Identifier: Apache-2.0
33
from datetime import datetime, timezone
44

5+
from asgi_correlation_id import correlation_id
56
from asgi_correlation_id.extensions.celery import load_correlation_ids
67
from celery import Celery
78
from celery.signals import after_setup_task_logger
89
from celery.utils.log import get_task_logger
10+
from jinja2 import Template
911
from sqlalchemy import select
1012
from sqlalchemy.orm import Session, selectinload
1113

@@ -20,8 +22,6 @@
2022
logger = get_task_logger(__name__)
2123
load_correlation_ids()
2224

23-
WORKER_SETTINGS = WorkerAppSettings()
24-
2525

2626
@celery.task(name="run_transfer_task", bind=True, track_started=True)
2727
def run_transfer_task(self: Celery, run_id: int) -> None:
@@ -50,6 +50,7 @@ def run_transfer(session: Session, run_id: int, settings: WorkerAppSettings):
5050

5151
run.status = Status.STARTED
5252
run.started_at = datetime.now(tz=timezone.utc)
53+
run.log_url = Template(settings.worker.log_url_template).render(run=run, correlation_id=correlation_id.get())
5354
session.add(run)
5455
session.commit()
5556

@@ -61,7 +62,7 @@ def run_transfer(session: Session, run_id: int, settings: WorkerAppSettings):
6162

6263
try:
6364
controller = TransferController(
64-
settings=WORKER_SETTINGS,
65+
settings=settings,
6566
run=run,
6667
source_connection=run.transfer.source_connection,
6768
target_connection=run.transfer.target_connection,

tests/test_unit/test_runs/test_create_run.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -157,8 +157,6 @@ async def test_superuser_can_create_run(
157157
"type": RunType.MANUAL,
158158
}
159159
assert result.status_code == 200
160-
assert "correlation_id" in response.get("log_url")
161-
assert "run_id" in response.get("log_url")
162160
mock_to_thread.assert_awaited_once_with(
163161
mock_send_task,
164162
"run_transfer_task",

tests/utils.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,8 @@ async def run_transfer_and_verify(
171171
token=user.token,
172172
)
173173
assert run_data["status"] == Status.FINISHED.value
174+
assert "correlation_id" in run_data["log_url"]
175+
assert "run_id" in run_data["log_url"]
174176
verify_transfer_auth_data(run_data, source_auth, target_auth)
175177

176178
return run_data

0 commit comments

Comments
 (0)