Skip to content

Commit 53f823f

Browse files
authored
šŸ›Computational backend: fix issue where job_id is inexistent + logs improvements (#8395)
1 parent 0abee9b commit 53f823f

File tree

3 files changed

+99
-112
lines changed

3 files changed

+99
-112
lines changed

services/autoscaling/src/simcore_service_autoscaling/modules/cluster_scaling/_auto_scaling_core.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -575,8 +575,8 @@ def _try_assign_task_to_ec2_instance(
575575
_logger.debug(
576576
"%s",
577577
f"assigned task with {task_required_resources=}, {task_required_ec2_instance=} to "
578-
f"{instance.ec2_instance.id=}:{instance.ec2_instance.type}, "
579-
f"remaining resources:{instance.available_resources}/{instance.ec2_instance.resources}",
578+
f"{instance.ec2_instance.id=}:{instance.ec2_instance.type=}, "
579+
f"{instance.available_resources=}, {instance.ec2_instance.resources=}",
580580
)
581581
return True
582582
return False
@@ -599,8 +599,8 @@ def _try_assign_task_to_ec2_instance_type(
599599
_logger.debug(
600600
"%s",
601601
f"assigned task with {task_required_resources=}, {task_required_ec2_instance=} to "
602-
f"{instance.instance_type}, "
603-
f"remaining resources:{instance.available_resources}/{instance.instance_type.resources}",
602+
f"{instance.instance_type=}, "
603+
f"{instance.available_resources=}, {instance.instance_type.resources=}",
604604
)
605605
return True
606606
return False
@@ -1217,7 +1217,10 @@ async def _scale_down_unused_cluster_instances(
12171217
) -> Cluster:
12181218
if any(not instance.has_assigned_tasks() for instance in cluster.active_nodes):
12191219
# ask the provider to try to retire nodes actively
1220-
with log_catch(_logger, reraise=False):
1220+
with (
1221+
log_catch(_logger, reraise=False),
1222+
log_context(_logger, logging.INFO, "actively ask to retire unused nodes"),
1223+
):
12211224
await auto_scaling_mode.try_retire_nodes(app)
12221225
cluster = await _deactivate_empty_nodes(app, cluster)
12231226
return await _try_scale_down_cluster(app, cluster)

services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_dask.py

Lines changed: 91 additions & 103 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
TaskProgressEvent,
1313
)
1414
from dask_task_models_library.container_tasks.io import TaskOutputData
15-
from dask_task_models_library.container_tasks.utils import parse_dask_job_id
1615
from models_library.clusters import BaseCluster
1716
from models_library.errors import ErrorDict
1817
from models_library.projects import ProjectID
@@ -370,17 +369,16 @@ async def _process_completed_tasks(
370369
self._process_task_result(
371370
task,
372371
result,
373-
comp_run.metadata,
374372
iteration,
375-
comp_run.run_id,
373+
comp_run,
376374
)
377375
for task, result in zip(tasks, tasks_results, strict=True)
378376
),
379377
limit=MAX_CONCURRENT_PIPELINE_SCHEDULING,
380378
):
381379
with log_catch(_logger, reraise=False):
382380
task_can_be_cleaned, job_id = await future
383-
if task_can_be_cleaned:
381+
if task_can_be_cleaned and job_id:
384382
await client.release_task_result(job_id)
385383

386384
async def _handle_successful_run(
@@ -411,11 +409,9 @@ async def _handle_successful_run(
411409
async def _handle_computational_retrieval_error(
412410
self,
413411
task: CompTaskAtDB,
414-
user_id: UserID,
415412
result: ComputationalBackendTaskResultsNotReadyError,
416413
log_error_context: dict[str, Any],
417414
) -> tuple[RunningState, SimcorePlatformStatus, list[ErrorDict], bool]:
418-
assert task.job_id # nosec
419415
_logger.warning(
420416
**create_troubleshooting_log_kwargs(
421417
f"Retrieval of task {task.job_id} result timed-out",
@@ -448,10 +444,7 @@ async def _handle_computational_retrieval_error(
448444
type=_TASK_RETRIEVAL_ERROR_TYPE,
449445
ctx={
450446
_TASK_RETRIEVAL_ERROR_CONTEXT_TIME_KEY: f"{check_time}",
451-
"user_id": user_id,
452-
"project_id": f"{task.project_id}",
453-
"node_id": f"{task.node_id}",
454-
"job_id": task.job_id,
447+
**log_error_context,
455448
},
456449
)
457450
)
@@ -472,7 +465,6 @@ async def _handle_computational_backend_not_connected_error(
472465
result: ComputationalBackendNotConnectedError,
473466
log_error_context: dict[str, Any],
474467
) -> tuple[RunningState, SimcorePlatformStatus, list[ErrorDict], bool]:
475-
assert task.job_id # nosec
476468
_logger.warning(
477469
**create_troubleshooting_log_kwargs(
478470
f"Computational backend disconnected when retrieving task {task.job_id} result",
@@ -492,8 +484,6 @@ async def _handle_task_error(
492484
result: BaseException,
493485
log_error_context: dict[str, Any],
494486
) -> tuple[RunningState, SimcorePlatformStatus, list[ErrorDict], bool]:
495-
assert task.job_id # nosec
496-
497487
# the task itself failed, check why
498488
if isinstance(result, TaskCancelledError):
499489
_logger.info(
@@ -529,102 +519,100 @@ async def _process_task_result(
529519
self,
530520
task: TaskStateTracker,
531521
result: BaseException | TaskOutputData,
532-
run_metadata: RunMetadataDict,
533522
iteration: Iteration,
534-
run_id: PositiveInt,
535-
) -> tuple[bool, str]:
523+
comp_run: CompRunsAtDB,
524+
) -> tuple[bool, str | None]:
536525
"""Returns True and the job ID if the task was successfully processed and can be released from the Dask cluster."""
537-
_logger.debug("received %s result: %s", f"{task=}", f"{result=}")
538-
assert task.current.job_id # nosec
539-
(
540-
_service_key,
541-
_service_version,
542-
user_id,
543-
project_id,
544-
node_id,
545-
) = parse_dask_job_id(task.current.job_id)
546-
547-
assert task.current.project_id == project_id # nosec
548-
assert task.current.node_id == node_id # nosec
549-
log_error_context = {
550-
"user_id": user_id,
551-
"project_id": project_id,
552-
"node_id": node_id,
553-
"job_id": task.current.job_id,
554-
}
555-
556-
if isinstance(result, TaskOutputData):
557-
(
558-
task_final_state,
559-
simcore_platform_status,
560-
task_errors,
561-
task_completed,
562-
) = await self._handle_successful_run(
563-
task.current, result, log_error_context
564-
)
526+
with log_context(
527+
_logger, logging.DEBUG, msg=f"{comp_run.run_id=}, {task=}, {result=}"
528+
):
529+
log_error_context = {
530+
"user_id": comp_run.user_id,
531+
"project_id": f"{comp_run.project_uuid}",
532+
"node_id": f"{task.current.node_id}",
533+
"job_id": task.current.job_id,
534+
}
535+
536+
if isinstance(result, TaskOutputData):
537+
(
538+
task_final_state,
539+
simcore_platform_status,
540+
task_errors,
541+
task_completed,
542+
) = await self._handle_successful_run(
543+
task.current, result, log_error_context
544+
)
565545

566-
elif isinstance(result, ComputationalBackendTaskResultsNotReadyError):
567-
(
568-
task_final_state,
569-
simcore_platform_status,
570-
task_errors,
571-
task_completed,
572-
) = await self._handle_computational_retrieval_error(
573-
task.current, user_id, result, log_error_context
574-
)
575-
elif isinstance(result, ComputationalBackendNotConnectedError):
576-
(
577-
task_final_state,
578-
simcore_platform_status,
579-
task_errors,
580-
task_completed,
581-
) = await self._handle_computational_backend_not_connected_error(
582-
task.current, result, log_error_context
583-
)
584-
else:
585-
(
586-
task_final_state,
587-
simcore_platform_status,
588-
task_errors,
589-
task_completed,
590-
) = await self._handle_task_error(task.current, result, log_error_context)
591-
592-
# we need to remove any invalid files in the storage
593-
await clean_task_output_and_log_files_if_invalid(
594-
self.db_engine, user_id, project_id, node_id
595-
)
546+
elif isinstance(result, ComputationalBackendTaskResultsNotReadyError):
547+
(
548+
task_final_state,
549+
simcore_platform_status,
550+
task_errors,
551+
task_completed,
552+
) = await self._handle_computational_retrieval_error(
553+
task.current, result, log_error_context
554+
)
555+
elif isinstance(result, ComputationalBackendNotConnectedError):
556+
(
557+
task_final_state,
558+
simcore_platform_status,
559+
task_errors,
560+
task_completed,
561+
) = await self._handle_computational_backend_not_connected_error(
562+
task.current, result, log_error_context
563+
)
564+
else:
565+
(
566+
task_final_state,
567+
simcore_platform_status,
568+
task_errors,
569+
task_completed,
570+
) = await self._handle_task_error(
571+
task.current, result, log_error_context
572+
)
596573

597-
if task_completed:
598-
# resource tracking
599-
await publish_service_resource_tracking_stopped(
600-
self.rabbitmq_client,
601-
ServiceRunID.get_resource_tracking_run_id_for_computational(
602-
user_id, project_id, node_id, iteration
603-
),
604-
simcore_platform_status=simcore_platform_status,
605-
)
606-
# instrumentation
607-
await publish_service_stopped_metrics(
608-
self.rabbitmq_client,
609-
user_id=user_id,
610-
simcore_user_agent=run_metadata.get(
611-
"simcore_user_agent", UNDEFINED_DEFAULT_SIMCORE_USER_AGENT_VALUE
612-
),
613-
task=task.current,
614-
task_final_state=task_final_state,
615-
)
574+
# we need to remove any invalid files in the storage
575+
await clean_task_output_and_log_files_if_invalid(
576+
self.db_engine,
577+
comp_run.user_id,
578+
comp_run.project_uuid,
579+
task.current.node_id,
580+
)
616581

617-
await CompTasksRepository(self.db_engine).update_project_tasks_state(
618-
task.current.project_id,
619-
run_id,
620-
[task.current.node_id],
621-
task_final_state if task_completed else task.previous.state,
622-
errors=task_errors,
623-
optional_progress=1 if task_completed else None,
624-
optional_stopped=arrow.utcnow().datetime if task_completed else None,
625-
)
582+
if task_completed:
583+
# resource tracking
584+
await publish_service_resource_tracking_stopped(
585+
self.rabbitmq_client,
586+
ServiceRunID.get_resource_tracking_run_id_for_computational(
587+
comp_run.user_id,
588+
comp_run.project_uuid,
589+
task.current.node_id,
590+
iteration,
591+
),
592+
simcore_platform_status=simcore_platform_status,
593+
)
594+
# instrumentation
595+
await publish_service_stopped_metrics(
596+
self.rabbitmq_client,
597+
user_id=comp_run.user_id,
598+
simcore_user_agent=comp_run.metadata.get(
599+
"simcore_user_agent", UNDEFINED_DEFAULT_SIMCORE_USER_AGENT_VALUE
600+
),
601+
task=task.current,
602+
task_final_state=task_final_state,
603+
)
604+
605+
await CompTasksRepository(self.db_engine).update_project_tasks_state(
606+
task.current.project_id,
607+
comp_run.run_id,
608+
[task.current.node_id],
609+
task_final_state if task_completed else task.previous.state,
610+
errors=task_errors,
611+
optional_progress=1 if task_completed else None,
612+
optional_stopped=arrow.utcnow().datetime if task_completed else None,
613+
)
626614

627-
return task_completed, task.current.job_id
615+
return task_completed, task.current.job_id
628616

629617
async def _task_progress_change_handler(
630618
self, event: tuple[UnixTimestamp, Any]

services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_pipelines.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,16 +42,12 @@ async def upsert_pipeline(
4242
**pipeline_at_db.model_dump(mode="json", by_alias=True)
4343
)
4444
# FIXME: This is not a nice thing. this part of the information should be kept in comp_runs.
45-
update_exclusion_policy = set()
46-
if not dag_graph.nodes():
47-
update_exclusion_policy.add("dag_adjacency_list")
4845
on_update_stmt = insert_stmt.on_conflict_do_update(
4946
index_elements=[comp_pipeline.c.project_id],
5047
set_=pipeline_at_db.model_dump(
5148
mode="json",
5249
by_alias=True,
5350
exclude_unset=True,
54-
exclude=update_exclusion_policy,
5551
),
5652
)
5753
async with self.db_engine.begin() as conn:

0 commit comments

Comments
(0)