Skip to content

Commit 800a355

Browse files
committed
removed logs sub
1 parent 072d9aa commit 800a355

File tree

4 files changed

+1
-36
lines changed

4 files changed

+1
-36
lines changed

services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_dask.py

Lines changed: 0 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
import arrow
99
from dask_task_models_library.container_tasks.errors import TaskCancelledError
1010
from dask_task_models_library.container_tasks.events import (
11-
TaskLogEvent,
1211
TaskProgressEvent,
1312
)
1413
from dask_task_models_library.container_tasks.io import TaskOutputData
@@ -38,7 +37,6 @@
3837
)
3938
from ...utils.dask_client_utils import TaskHandlers
4039
from ...utils.rabbitmq import (
41-
publish_service_log,
4240
publish_service_progress,
4341
publish_service_resource_tracking_stopped,
4442
publish_service_stopped_metrics,
@@ -92,7 +90,6 @@ def __post_init__(self) -> None:
9290
self.dask_clients_pool.register_handlers(
9391
TaskHandlers(
9492
self._task_progress_change_handler,
95-
self._task_log_change_handler,
9693
)
9794
)
9895

@@ -378,27 +375,3 @@ async def _task_progress_change_handler(self, event: str) -> None:
378375
node_id=node_id,
379376
progress=task_progress_event.progress,
380377
)
381-
382-
async def _task_log_change_handler(self, event: str) -> None:
383-
with log_catch(_logger, reraise=False):
384-
task_log_event = TaskLogEvent.model_validate_json(event)
385-
_logger.debug("received task log update: %s", task_log_event)
386-
await publish_service_log(
387-
self.rabbitmq_client,
388-
user_id=task_log_event.task_owner.user_id,
389-
project_id=task_log_event.task_owner.project_id,
390-
node_id=task_log_event.task_owner.node_id,
391-
log=task_log_event.log,
392-
log_level=task_log_event.log_level,
393-
)
394-
if task_log_event.task_owner.has_parent:
395-
assert task_log_event.task_owner.parent_project_id # nosec
396-
assert task_log_event.task_owner.parent_node_id # nosec
397-
await publish_service_log(
398-
self.rabbitmq_client,
399-
user_id=task_log_event.task_owner.user_id,
400-
project_id=task_log_event.task_owner.parent_project_id,
401-
node_id=task_log_event.task_owner.parent_node_id,
402-
log=task_log_event.log,
403-
log_level=task_log_event.log_level,
404-
)

services/director-v2/src/simcore_service_director_v2/modules/dask_client.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,6 @@ async def delete(self) -> None:
187187
def register_handlers(self, task_handlers: TaskHandlers) -> None:
188188
_event_consumer_map = [
189189
(self.backend.progress_sub, task_handlers.task_progress_handler),
190-
(self.backend.logs_sub, task_handlers.task_log_handler),
191190
]
192191
self._subscribed_tasks = [
193192
asyncio.create_task(

services/director-v2/src/simcore_service_director_v2/utils/dask_client_utils.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66

77
import distributed
88
from dask_task_models_library.container_tasks.events import (
9-
TaskLogEvent,
109
TaskProgressEvent,
1110
)
1211
from models_library.clusters import ClusterAuthentication, TLSAuthentication
@@ -19,7 +18,6 @@
1918
@dataclass
2019
class TaskHandlers:
2120
task_progress_handler: Callable[[str], Awaitable[None]]
22-
task_log_handler: Callable[[str], Awaitable[None]]
2321

2422

2523
logger = logging.getLogger(__name__)
@@ -30,13 +28,11 @@ class DaskSubSystem:
3028
client: distributed.Client
3129
scheduler_id: str
3230
progress_sub: distributed.Sub = field(init=False)
33-
logs_sub: distributed.Sub = field(init=False)
3431

3532
def __post_init__(self) -> None:
3633
self.progress_sub = distributed.Sub(
3734
TaskProgressEvent.topic_name(), client=self.client
3835
)
39-
self.logs_sub = distributed.Sub(TaskLogEvent.topic_name(), client=self.client)
4036

4137
async def close(self) -> None:
4238
# NOTE: if the Sub are deleted before closing the connection,

services/director-v2/tests/unit/test_modules_dask_client.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1078,9 +1078,7 @@ def fake_remote_fct(
10781078

10791079
@pytest.fixture
10801080
async def fake_task_handlers(mocker: MockerFixture) -> TaskHandlers:
1081-
return TaskHandlers(
1082-
task_progress_handler=mocker.MagicMock(), task_log_handler=mocker.MagicMock()
1083-
)
1081+
return TaskHandlers(task_progress_handler=mocker.MagicMock())
10841082

10851083

10861084
async def test_dask_sub_handlers(
@@ -1154,7 +1152,6 @@ def fake_remote_fct(
11541152
fake_task_handlers.task_progress_handler.assert_called_with(
11551153
"my name is progress"
11561154
)
1157-
fake_task_handlers.task_log_handler.assert_called_with("my name is logs")
11581155
await _assert_wait_for_cb_call(mocked_user_completed_cb)
11591156

11601157

0 commit comments

Comments
 (0)