Commit 63ff44a

removing TaskLogEvent
1 parent 15ec26c commit 63ff44a

4 files changed: +28 -73 lines

packages/dask-task-models-library/src/dask_task_models_library/container_tasks/events.py

Lines changed: 0 additions & 47 deletions

@@ -1,6 +1,4 @@
-import logging
 from abc import ABC, abstractmethod
-from typing import TypeAlias
 
 import dask.typing
 from distributed.worker import get_worker
@@ -85,48 +83,3 @@ def ensure_between_0_1(cls, v):
         if 0 <= v <= 1:
             return v
         return min(max(0, v), 1)
-
-
-LogMessageStr: TypeAlias = str
-LogLevelInt: TypeAlias = int
-
-
-class TaskLogEvent(BaseTaskEvent):
-    log: LogMessageStr
-    log_level: LogLevelInt
-
-    @staticmethod
-    def topic_name() -> str:
-        return "task_logs"
-
-    @classmethod
-    def from_dask_worker(
-        cls, log: str, log_level: LogLevelInt, *, task_owner: TaskOwner
-    ) -> "TaskLogEvent":
-        worker = get_worker()
-        job_id = worker.get_current_task()
-        return cls(
-            job_id=_dask_key_to_dask_task_id(job_id),
-            log=log,
-            log_level=log_level,
-            task_owner=task_owner,
-        )
-
-    model_config = ConfigDict(
-        json_schema_extra={
-            "examples": [
-                {
-                    "job_id": "simcore/services/comp/sleeper:1.1.0:projectid_ec7e595a-63ee-46a1-a04a-901b11b649f8:nodeid_39467d89-b659-4914-9359-c40b1b6d1d6d:uuid_5ee5c655-450d-4711-a3ec-32ffe16bc580",
-                    "log": "some logs",
-                    "log_level": logging.INFO,
-                    "task_owner": {
-                        "user_id": 32,
-                        "project_id": "ec7e595a-63ee-46a1-a04a-901b11b649f8",
-                        "node_id": "39467d89-b659-4914-9359-c40b1b6d1d6d",
-                        "parent_project_id": None,
-                        "parent_node_id": None,
-                    },
-                },
-            ]
-        }
-    )
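
With TaskLogEvent removed, TaskProgressEvent is the only event in events.py that still travels over dask's pub/sub topics; log lines no longer have a dask-side event type. A minimal sketch of the remaining progress path, assuming a running distributed.Client (the publisher call mirrors TaskPublisher.__post_init__ below, the consumer mirrors the progress_sub fixture kept in test_tasks.py):

import distributed

from dask_task_models_library.container_tasks.events import TaskProgressEvent

client = distributed.Client()  # assumption: a local or already-running cluster

# Publisher side, as TaskPublisher.__post_init__ still does for progress events:
progress_pub = distributed.Pub(TaskProgressEvent.topic_name(), client=client)

# Consumer side, as the progress_sub fixture in test_tasks.py still does:
progress_sub = distributed.Sub(TaskProgressEvent.topic_name(), client=client)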

services/dask-sidecar/src/simcore_service_dask_sidecar/dask_utils.py

Lines changed: 0 additions & 9 deletions

@@ -9,7 +9,6 @@
 from dask_task_models_library.container_tasks.errors import TaskCancelledError
 from dask_task_models_library.container_tasks.events import (
     BaseTaskEvent,
-    TaskLogEvent,
     TaskProgressEvent,
 )
 from dask_task_models_library.container_tasks.io import TaskCancelEventName
@@ -66,11 +65,9 @@ class TaskPublisher:
     task_owner: TaskOwner
     progress: distributed.Pub = field(init=False)
     _last_published_progress_value: float = -1
-    logs: distributed.Pub = field(init=False)
 
     def __post_init__(self) -> None:
         self.progress = distributed.Pub(TaskProgressEvent.topic_name())
-        self.logs = distributed.Pub(TaskLogEvent.topic_name())
 
     def publish_progress(self, report: ProgressReport) -> None:
         rounded_value = round(report.percent_value, ndigits=2)
@@ -113,12 +110,6 @@ async def publish_logs(
         )
         await rabbitmq_client.publish(parent_message.channel_name, base_message)
 
-        publish_event(
-            self.logs,
-            TaskLogEvent.from_dask_worker(
-                log=message, log_level=log_level, task_owner=self.task_owner
-            ),
-        )
         _logger.log(log_level, message)
 
 
services/dask-sidecar/src/simcore_service_dask_sidecar/utils/__init__.py

Whitespace-only changes.

services/dask-sidecar/tests/unit/test_tasks.py

Lines changed: 28 additions & 17 deletions

@@ -8,7 +8,7 @@
 import json
 import logging
 import re
-from collections.abc import Callable, Coroutine, Iterable
+from collections.abc import AsyncIterator, Callable, Coroutine, Iterable
 
 # copied out from dask
 from dataclasses import dataclass
@@ -23,10 +23,7 @@
 from common_library.json_serialization import json_dumps
 from dask_task_models_library.container_tasks.docker import DockerBasicAuth
 from dask_task_models_library.container_tasks.errors import ServiceRuntimeError
-from dask_task_models_library.container_tasks.events import (
-    TaskLogEvent,
-    TaskProgressEvent,
-)
+from dask_task_models_library.container_tasks.events import TaskProgressEvent
 from dask_task_models_library.container_tasks.io import (
     FileUrl,
     TaskInputData,
@@ -39,12 +36,14 @@
 )
 from faker import Faker
 from models_library.basic_types import EnvVarKey
+from models_library.rabbitmq_messages import LoggerRabbitMessage
 from models_library.services import ServiceMetaDataPublished
 from models_library.services_resources import BootMode
 from packaging import version
 from pydantic import AnyUrl, SecretStr, TypeAdapter
 from pytest_mock.plugin import MockerFixture
 from pytest_simcore.helpers.typing_env import EnvVarsDict
+from servicelib.rabbitmq._client import RabbitMQClient
 from settings_library.s3 import S3Settings
 from simcore_service_dask_sidecar.computational_sidecar.docker_utils import (
     LEGACY_SERVICE_LOG_FILE_NAME,
@@ -466,6 +465,19 @@ def mocked_get_image_labels(
     )
 
 
+@pytest.fixture
+async def log_rabbit_client_parser(
+    create_rabbitmq_client: Callable[[str], RabbitMQClient], mocker: MockerFixture
+) -> AsyncIterator[mock.AsyncMock]:
+    client = create_rabbitmq_client("dask_sidecar_pytest_logs_consumer")
+    mock = mocker.AsyncMock(return_value=True)
+    queue_name, _ = await client.subscribe(
+        LoggerRabbitMessage.get_channel_name(), mock, exclusive_queue=False
+    )
+    yield mock
+    await client.unsubscribe(queue_name)
+
+
 def test_run_computational_sidecar_real_fct(
     caplog_info_level: pytest.LogCaptureFixture,
     event_loop: asyncio.AbstractEventLoop,
@@ -474,6 +486,7 @@ def test_run_computational_sidecar_real_fct(
     sleeper_task: ServiceExampleParam,
     mocked_get_image_labels: mock.Mock,
     s3_settings: S3Settings,
+    log_rabbit_client_parser: mock.AsyncMock,
 ):
     output_data = run_computational_sidecar(
         **sleeper_task.sidecar_params(),
@@ -484,10 +497,11 @@ def test_run_computational_sidecar_real_fct(
         sleeper_task.service_key,
         sleeper_task.service_version,
     )
-    for event in [TaskProgressEvent, TaskLogEvent]:
+    for event in [TaskProgressEvent]:
         dask_subsystem_mock["dask_event_publish"].assert_any_call(
             name=event.topic_name()
         )
+    log_rabbit_client_parser.assert_called_once()
 
     # check that the task produces expected logs
     for log in sleeper_task.expected_logs:
@@ -561,13 +575,6 @@ def test_run_multiple_computational_sidecar_dask(
     mocked_get_image_labels.assert_called()
 
 
-@pytest.fixture
-def log_sub(
-    dask_client: distributed.Client,
-) -> distributed.Sub:
-    return distributed.Sub(TaskLogEvent.topic_name(), client=dask_client)
-
-
 @pytest.fixture
 def progress_sub(dask_client: distributed.Client) -> distributed.Sub:
     return distributed.Sub(TaskProgressEvent.topic_name(), client=dask_client)
@@ -579,10 +586,10 @@ def progress_sub(dask_client: distributed.Client) -> distributed.Sub:
 async def test_run_computational_sidecar_dask(
     dask_client: distributed.Client,
     sleeper_task: ServiceExampleParam,
-    log_sub: distributed.Sub,
     progress_sub: distributed.Sub,
     mocked_get_image_labels: mock.Mock,
     s3_settings: S3Settings,
+    log_rabbit_client_parser: mock.AsyncMock,
 ):
     future = dask_client.submit(
         run_computational_sidecar,
@@ -607,7 +614,9 @@ async def test_run_computational_sidecar_dask(
     ), "ordering of progress values incorrectly sorted!"
     assert worker_progresses[0] == 0, "missing/incorrect initial progress value"
     assert worker_progresses[-1] == 1, "missing/incorrect final progress value"
-    worker_logs = [TaskLogEvent.model_validate_json(msg).log for msg in log_sub.buffer]
+    log_rabbit_client_parser.assert_called_once()
+    # worker_logs = [TaskLogEvent.model_validate_json(msg).log for msg in log_sub.buffer]
+    worker_logs = []
     print(f"<-- we got {len(worker_logs)} lines of logs")
 
     for log in sleeper_task.expected_logs:
@@ -641,9 +650,9 @@ async def test_run_computational_sidecar_dask(
 async def test_run_computational_sidecar_dask_does_not_lose_messages_with_pubsub(
     dask_client: distributed.Client,
     sidecar_task: Callable[..., ServiceExampleParam],
-    log_sub: distributed.Sub,
     progress_sub: distributed.Sub,
     mocked_get_image_labels: mock.Mock,
+    log_rabbit_client_parser: mock.AsyncMock,
 ):
     mocked_get_image_labels.assert_not_called()
     NUMBER_OF_LOGS = 20000
@@ -679,7 +688,9 @@ async def test_run_computational_sidecar_dask_does_not_lose_messages_with_pubsub
     assert worker_progresses[0] == 0, "missing/incorrect initial progress value"
     assert worker_progresses[-1] == 1, "missing/incorrect final progress value"
 
-    worker_logs = [TaskLogEvent.model_validate_json(msg).log for msg in log_sub.buffer]
+    log_rabbit_client_parser.assert_called_once()
+    # worker_logs = [TaskLogEvent.model_validate_json(msg).log for msg in log_sub.buffer]
+    worker_logs = []
     # check all the awaited logs are in there
     filtered_worker_logs = filter(lambda log: "This is iteration" in log, worker_logs)
     assert len(list(filtered_worker_logs)) == NUMBER_OF_LOGS
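
The two dask-based tests now only assert that the RabbitMQ consumer mock was called and leave worker_logs empty (the old log_sub line is kept as a comment). One way the received messages could be turned back into plain log lines inside these tests, a sketch that assumes each invocation of the mocked log_rabbit_client_parser callback received a single positional argument holding one serialized LoggerRabbitMessage body:

from models_library.rabbitmq_messages import LoggerRabbitMessage

# Rebuild the flat list of log lines from the captured RabbitMQ callback calls.
worker_logs = [
    line
    for call in log_rabbit_client_parser.call_args_list
    for line in LoggerRabbitMessage.model_validate_json(call.args[0]).messages
]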
