Skip to content

Commit 764671e

Browse files
committed
remove usage of job_id
1 parent 0abee9b commit 764671e

File tree

1 file changed

+90
-93
lines changed
  • services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler

1 file changed

+90
-93
lines changed

services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_dask.py

Lines changed: 90 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
TaskProgressEvent,
1313
)
1414
from dask_task_models_library.container_tasks.io import TaskOutputData
15-
from dask_task_models_library.container_tasks.utils import parse_dask_job_id
1615
from models_library.clusters import BaseCluster
1716
from models_library.errors import ErrorDict
1817
from models_library.projects import ProjectID
@@ -370,9 +369,8 @@ async def _process_completed_tasks(
370369
self._process_task_result(
371370
task,
372371
result,
373-
comp_run.metadata,
374372
iteration,
375-
comp_run.run_id,
373+
comp_run,
376374
)
377375
for task, result in zip(tasks, tasks_results, strict=True)
378376
),
@@ -381,6 +379,7 @@ async def _process_completed_tasks(
381379
with log_catch(_logger, reraise=False):
382380
task_can_be_cleaned, job_id = await future
383381
if task_can_be_cleaned:
382+
assert job_id is not None # nosec
384383
await client.release_task_result(job_id)
385384

386385
async def _handle_successful_run(
@@ -529,102 +528,100 @@ async def _process_task_result(
529528
self,
530529
task: TaskStateTracker,
531530
result: BaseException | TaskOutputData,
532-
run_metadata: RunMetadataDict,
533531
iteration: Iteration,
534-
run_id: PositiveInt,
535-
) -> tuple[bool, str]:
532+
comp_run: CompRunsAtDB,
533+
) -> tuple[bool, str | None]:
536534
"""Returns True and the job ID if the task was successfully processed and can be released from the Dask cluster."""
537-
_logger.debug("received %s result: %s", f"{task=}", f"{result=}")
538-
assert task.current.job_id # nosec
539-
(
540-
_service_key,
541-
_service_version,
542-
user_id,
543-
project_id,
544-
node_id,
545-
) = parse_dask_job_id(task.current.job_id)
546-
547-
assert task.current.project_id == project_id # nosec
548-
assert task.current.node_id == node_id # nosec
549-
log_error_context = {
550-
"user_id": user_id,
551-
"project_id": project_id,
552-
"node_id": node_id,
553-
"job_id": task.current.job_id,
554-
}
555-
556-
if isinstance(result, TaskOutputData):
557-
(
558-
task_final_state,
559-
simcore_platform_status,
560-
task_errors,
561-
task_completed,
562-
) = await self._handle_successful_run(
563-
task.current, result, log_error_context
564-
)
535+
with log_context(
536+
_logger, logging.DEBUG, msg=f"{comp_run.run_id=}, {task=}, {result=}"
537+
):
538+
log_error_context = {
539+
"user_id": comp_run.user_id,
540+
"project_id": comp_run.project_uuid,
541+
"node_id": task.current.node_id,
542+
"job_id": task.current.job_id,
543+
}
544+
545+
if isinstance(result, TaskOutputData):
546+
(
547+
task_final_state,
548+
simcore_platform_status,
549+
task_errors,
550+
task_completed,
551+
) = await self._handle_successful_run(
552+
task.current, result, log_error_context
553+
)
565554

566-
elif isinstance(result, ComputationalBackendTaskResultsNotReadyError):
567-
(
568-
task_final_state,
569-
simcore_platform_status,
570-
task_errors,
571-
task_completed,
572-
) = await self._handle_computational_retrieval_error(
573-
task.current, user_id, result, log_error_context
574-
)
575-
elif isinstance(result, ComputationalBackendNotConnectedError):
576-
(
577-
task_final_state,
578-
simcore_platform_status,
579-
task_errors,
580-
task_completed,
581-
) = await self._handle_computational_backend_not_connected_error(
582-
task.current, result, log_error_context
583-
)
584-
else:
585-
(
586-
task_final_state,
587-
simcore_platform_status,
588-
task_errors,
589-
task_completed,
590-
) = await self._handle_task_error(task.current, result, log_error_context)
591-
592-
# we need to remove any invalid files in the storage
593-
await clean_task_output_and_log_files_if_invalid(
594-
self.db_engine, user_id, project_id, node_id
595-
)
555+
elif isinstance(result, ComputationalBackendTaskResultsNotReadyError):
556+
(
557+
task_final_state,
558+
simcore_platform_status,
559+
task_errors,
560+
task_completed,
561+
) = await self._handle_computational_retrieval_error(
562+
task.current, comp_run.user_id, result, log_error_context
563+
)
564+
elif isinstance(result, ComputationalBackendNotConnectedError):
565+
(
566+
task_final_state,
567+
simcore_platform_status,
568+
task_errors,
569+
task_completed,
570+
) = await self._handle_computational_backend_not_connected_error(
571+
task.current, result, log_error_context
572+
)
573+
else:
574+
(
575+
task_final_state,
576+
simcore_platform_status,
577+
task_errors,
578+
task_completed,
579+
) = await self._handle_task_error(
580+
task.current, result, log_error_context
581+
)
596582

597-
if task_completed:
598-
# resource tracking
599-
await publish_service_resource_tracking_stopped(
600-
self.rabbitmq_client,
601-
ServiceRunID.get_resource_tracking_run_id_for_computational(
602-
user_id, project_id, node_id, iteration
603-
),
604-
simcore_platform_status=simcore_platform_status,
605-
)
606-
# instrumentation
607-
await publish_service_stopped_metrics(
608-
self.rabbitmq_client,
609-
user_id=user_id,
610-
simcore_user_agent=run_metadata.get(
611-
"simcore_user_agent", UNDEFINED_DEFAULT_SIMCORE_USER_AGENT_VALUE
612-
),
613-
task=task.current,
614-
task_final_state=task_final_state,
615-
)
583+
# we need to remove any invalid files in the storage
584+
await clean_task_output_and_log_files_if_invalid(
585+
self.db_engine,
586+
comp_run.user_id,
587+
comp_run.project_uuid,
588+
task.current.node_id,
589+
)
616590

617-
await CompTasksRepository(self.db_engine).update_project_tasks_state(
618-
task.current.project_id,
619-
run_id,
620-
[task.current.node_id],
621-
task_final_state if task_completed else task.previous.state,
622-
errors=task_errors,
623-
optional_progress=1 if task_completed else None,
624-
optional_stopped=arrow.utcnow().datetime if task_completed else None,
625-
)
591+
if task_completed:
592+
# resource tracking
593+
await publish_service_resource_tracking_stopped(
594+
self.rabbitmq_client,
595+
ServiceRunID.get_resource_tracking_run_id_for_computational(
596+
comp_run.user_id,
597+
comp_run.project_uuid,
598+
task.current.node_id,
599+
iteration,
600+
),
601+
simcore_platform_status=simcore_platform_status,
602+
)
603+
# instrumentation
604+
await publish_service_stopped_metrics(
605+
self.rabbitmq_client,
606+
user_id=comp_run.user_id,
607+
simcore_user_agent=comp_run.metadata.get(
608+
"simcore_user_agent", UNDEFINED_DEFAULT_SIMCORE_USER_AGENT_VALUE
609+
),
610+
task=task.current,
611+
task_final_state=task_final_state,
612+
)
613+
614+
await CompTasksRepository(self.db_engine).update_project_tasks_state(
615+
task.current.project_id,
616+
comp_run.run_id,
617+
[task.current.node_id],
618+
task_final_state if task_completed else task.previous.state,
619+
errors=task_errors,
620+
optional_progress=1 if task_completed else None,
621+
optional_stopped=arrow.utcnow().datetime if task_completed else None,
622+
)
626623

627-
return task_completed, task.current.job_id
624+
return task_completed, task.current.job_id
628625

629626
async def _task_progress_change_handler(
630627
self, event: tuple[UnixTimestamp, Any]

0 commit comments

Comments
 (0)