@@ -575,8 +575,8 @@ def _try_assign_task_to_ec2_instance(
_logger.debug(
"%s",
f"assigned task with {task_required_resources=}, {task_required_ec2_instance=} to "
f"{instance.ec2_instance.id=}:{instance.ec2_instance.type}, "
f"remaining resources:{instance.available_resources}/{instance.ec2_instance.resources}",
f"{instance.ec2_instance.id=}:{instance.ec2_instance.type=}, "
f"{instance.available_resources=}, {instance.ec2_instance.resources=}",
)
return True
return False
@@ -599,8 +599,8 @@ def _try_assign_task_to_ec2_instance_type(
_logger.debug(
"%s",
f"assigned task with {task_required_resources=}, {task_required_ec2_instance=} to "
f"{instance.instance_type}, "
f"remaining resources:{instance.available_resources}/{instance.instance_type.resources}",
f"{instance.instance_type=}, "
f"{instance.available_resources=}, {instance.instance_type.resources=}",
)
return True
return False
@@ -1217,7 +1217,10 @@ async def _scale_down_unused_cluster_instances(
) -> Cluster:
if any(not instance.has_assigned_tasks() for instance in cluster.active_nodes):
# ask the provider to try to retire nodes actively
with log_catch(_logger, reraise=False):
with (
log_catch(_logger, reraise=False),
log_context(_logger, logging.INFO, "actively ask to retire unused nodes"),
):
await auto_scaling_mode.try_retire_nodes(app)
cluster = await _deactivate_empty_nodes(app, cluster)
return await _try_scale_down_cluster(app, cluster)
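The reworked log messages and the widened `with` statement above lean on two language features. Here is a minimal, self-contained sketch of both, with stand-in names (`log_context` below is an illustrative substitute for the servicelib helper, not its actual implementation): the self-documenting `f"{expr=}"` form and the parenthesized multi-context-manager `with` available since Python 3.10.

```python
import logging
from contextlib import contextmanager, suppress

logging.basicConfig(level=logging.DEBUG)
_logger = logging.getLogger(__name__)


@contextmanager
def log_context(logger: logging.Logger, level: int, msg: str):
    # rough stand-in for servicelib's log_context: logs entry and exit of a block
    logger.log(level, "starting: %s", msg)
    try:
        yield
    finally:
        logger.log(level, "done: %s", msg)


available_resources = {"CPU": 2, "RAM": 4_000_000_000}

# f"{expr=}" renders the expression text together with its value, which is what the
# updated debug lines use instead of hand-assembled "label:value" strings
_logger.debug("%s", f"{available_resources=}")

# since Python 3.10, several context managers can be grouped in parentheses
with (
    suppress(RuntimeError),  # similar in spirit to log_catch(..., reraise=False)
    log_context(_logger, logging.INFO, "actively ask to retire unused nodes"),
):
    ...  # e.g. ask the provider to retire unused nodes in the real code
```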
@@ -12,7 +12,6 @@
TaskProgressEvent,
)
from dask_task_models_library.container_tasks.io import TaskOutputData
from dask_task_models_library.container_tasks.utils import parse_dask_job_id
from models_library.clusters import BaseCluster
from models_library.errors import ErrorDict
from models_library.projects import ProjectID
@@ -370,17 +369,16 @@ async def _process_completed_tasks(
self._process_task_result(
task,
result,
comp_run.metadata,
iteration,
comp_run.run_id,
comp_run,
)
for task, result in zip(tasks, tasks_results, strict=True)
),
limit=MAX_CONCURRENT_PIPELINE_SCHEDULING,
):
with log_catch(_logger, reraise=False):
task_can_be_cleaned, job_id = await future
if task_can_be_cleaned:
if task_can_be_cleaned and job_id:
await client.release_task_result(job_id)

async def _handle_successful_run(
@@ -411,11 +409,9 @@ async def _handle_successful_run(
async def _handle_computational_retrieval_error(
self,
task: CompTaskAtDB,
user_id: UserID,
result: ComputationalBackendTaskResultsNotReadyError,
log_error_context: dict[str, Any],
) -> tuple[RunningState, SimcorePlatformStatus, list[ErrorDict], bool]:
assert task.job_id # nosec
_logger.warning(
**create_troubleshooting_log_kwargs(
f"Retrieval of task {task.job_id} result timed-out",
@@ -448,10 +444,7 @@ async def _handle_computational_retrieval_error(
type=_TASK_RETRIEVAL_ERROR_TYPE,
ctx={
_TASK_RETRIEVAL_ERROR_CONTEXT_TIME_KEY: f"{check_time}",
"user_id": user_id,
"project_id": f"{task.project_id}",
"node_id": f"{task.node_id}",
"job_id": task.job_id,
**log_error_context,
},
)
)
@@ -472,7 +465,6 @@ async def _handle_computational_backend_not_connected_error(
result: ComputationalBackendNotConnectedError,
log_error_context: dict[str, Any],
) -> tuple[RunningState, SimcorePlatformStatus, list[ErrorDict], bool]:
assert task.job_id # nosec
_logger.warning(
**create_troubleshooting_log_kwargs(
f"Computational backend disconnected when retrieving task {task.job_id} result",
@@ -492,8 +484,6 @@ async def _handle_task_error(
result: BaseException,
log_error_context: dict[str, Any],
) -> tuple[RunningState, SimcorePlatformStatus, list[ErrorDict], bool]:
assert task.job_id # nosec

# the task itself failed, check why
if isinstance(result, TaskCancelledError):
_logger.info(
@@ -529,102 +519,100 @@ async def _process_task_result(
self,
task: TaskStateTracker,
result: BaseException | TaskOutputData,
run_metadata: RunMetadataDict,
iteration: Iteration,
run_id: PositiveInt,
) -> tuple[bool, str]:
comp_run: CompRunsAtDB,
) -> tuple[bool, str | None]:
"""Returns True and the job ID if the task was successfully processed and can be released from the Dask cluster."""
_logger.debug("received %s result: %s", f"{task=}", f"{result=}")
assert task.current.job_id # nosec
(
_service_key,
_service_version,
user_id,
project_id,
node_id,
) = parse_dask_job_id(task.current.job_id)

assert task.current.project_id == project_id # nosec
assert task.current.node_id == node_id # nosec
log_error_context = {
"user_id": user_id,
"project_id": project_id,
"node_id": node_id,
"job_id": task.current.job_id,
}

if isinstance(result, TaskOutputData):
(
task_final_state,
simcore_platform_status,
task_errors,
task_completed,
) = await self._handle_successful_run(
task.current, result, log_error_context
)
with log_context(
_logger, logging.DEBUG, msg=f"{comp_run.run_id=}, {task=}, {result=}"
):
log_error_context = {
"user_id": comp_run.user_id,
"project_id": comp_run.project_uuid,
"node_id": task.current.node_id,
"job_id": task.current.job_id,
}

if isinstance(result, TaskOutputData):
(
task_final_state,
simcore_platform_status,
task_errors,
task_completed,
) = await self._handle_successful_run(
task.current, result, log_error_context
)

elif isinstance(result, ComputationalBackendTaskResultsNotReadyError):
(
task_final_state,
simcore_platform_status,
task_errors,
task_completed,
) = await self._handle_computational_retrieval_error(
task.current, user_id, result, log_error_context
)
elif isinstance(result, ComputationalBackendNotConnectedError):
(
task_final_state,
simcore_platform_status,
task_errors,
task_completed,
) = await self._handle_computational_backend_not_connected_error(
task.current, result, log_error_context
)
else:
(
task_final_state,
simcore_platform_status,
task_errors,
task_completed,
) = await self._handle_task_error(task.current, result, log_error_context)

# we need to remove any invalid files in the storage
await clean_task_output_and_log_files_if_invalid(
self.db_engine, user_id, project_id, node_id
)
elif isinstance(result, ComputationalBackendTaskResultsNotReadyError):
(
task_final_state,
simcore_platform_status,
task_errors,
task_completed,
) = await self._handle_computational_retrieval_error(
task.current, result, log_error_context
)
elif isinstance(result, ComputationalBackendNotConnectedError):
(
task_final_state,
simcore_platform_status,
task_errors,
task_completed,
) = await self._handle_computational_backend_not_connected_error(
task.current, result, log_error_context
)
else:
(
task_final_state,
simcore_platform_status,
task_errors,
task_completed,
) = await self._handle_task_error(
task.current, result, log_error_context
)

if task_completed:
# resource tracking
await publish_service_resource_tracking_stopped(
self.rabbitmq_client,
ServiceRunID.get_resource_tracking_run_id_for_computational(
user_id, project_id, node_id, iteration
),
simcore_platform_status=simcore_platform_status,
)
# instrumentation
await publish_service_stopped_metrics(
self.rabbitmq_client,
user_id=user_id,
simcore_user_agent=run_metadata.get(
"simcore_user_agent", UNDEFINED_DEFAULT_SIMCORE_USER_AGENT_VALUE
),
task=task.current,
task_final_state=task_final_state,
)
# we need to remove any invalid files in the storage
await clean_task_output_and_log_files_if_invalid(
self.db_engine,
comp_run.user_id,
comp_run.project_uuid,
task.current.node_id,
)

await CompTasksRepository(self.db_engine).update_project_tasks_state(
task.current.project_id,
run_id,
[task.current.node_id],
task_final_state if task_completed else task.previous.state,
errors=task_errors,
optional_progress=1 if task_completed else None,
optional_stopped=arrow.utcnow().datetime if task_completed else None,
)
if task_completed:
# resource tracking
await publish_service_resource_tracking_stopped(
self.rabbitmq_client,
ServiceRunID.get_resource_tracking_run_id_for_computational(
comp_run.user_id,
comp_run.project_uuid,
task.current.node_id,
iteration,
),
simcore_platform_status=simcore_platform_status,
)
# instrumentation
await publish_service_stopped_metrics(
self.rabbitmq_client,
user_id=comp_run.user_id,
simcore_user_agent=comp_run.metadata.get(
"simcore_user_agent", UNDEFINED_DEFAULT_SIMCORE_USER_AGENT_VALUE
),
task=task.current,
task_final_state=task_final_state,
)

await CompTasksRepository(self.db_engine).update_project_tasks_state(
task.current.project_id,
comp_run.run_id,
[task.current.node_id],
task_final_state if task_completed else task.previous.state,
errors=task_errors,
optional_progress=1 if task_completed else None,
optional_stopped=arrow.utcnow().datetime if task_completed else None,
)

return task_completed, task.current.job_id
return task_completed, task.current.job_id

async def _task_progress_change_handler(
self, event: tuple[UnixTimestamp, Any]
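The bulk of the hunk above is a signature change: `_process_task_result` now receives the whole run record (`comp_run: CompRunsAtDB`) instead of `run_metadata`, `iteration` and `run_id`, so user and project ids are read from the run rather than re-parsed out of the dask job id. A compressed sketch of that shape, with stand-in types and field names (not the project's actual models):

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class CompRun:  # stand-in for CompRunsAtDB
    run_id: int
    user_id: int
    project_uuid: str
    iteration: int
    metadata: dict[str, Any] = field(default_factory=dict)


# before (roughly): process_task_result(task, result, run_metadata, iteration, run_id)
# after:            process_task_result(task, result, comp_run)
def build_log_error_context(comp_run: CompRun, node_id: str, job_id: str) -> dict[str, Any]:
    # ids come from the run record; no string parsing of the dask job id needed
    return {
        "user_id": comp_run.user_id,
        "project_id": comp_run.project_uuid,
        "node_id": node_id,
        "job_id": job_id,
    }


print(build_log_error_context(CompRun(1, 42, "project-uuid", 1), "node-uuid", "job-123"))
```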
@@ -42,16 +42,12 @@ async def upsert_pipeline(
**pipeline_at_db.model_dump(mode="json", by_alias=True)
)
# FIXME: This is not a nice thing. this part of the information should be kept in comp_runs.
update_exclusion_policy = set()
if not dag_graph.nodes():
update_exclusion_policy.add("dag_adjacency_list")
on_update_stmt = insert_stmt.on_conflict_do_update(
index_elements=[comp_pipeline.c.project_id],
set_=pipeline_at_db.model_dump(
mode="json",
by_alias=True,
exclude_unset=True,
exclude=update_exclusion_policy,
),
)
async with self.db_engine.begin() as conn:
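After dropping the exclusion set, what remains is a plain PostgreSQL upsert. A self-contained sketch of that SQLAlchemy pattern (table and column definitions here are illustrative, not the project's schema):

```python
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
from sqlalchemy.dialects.postgresql import insert

metadata = sa.MetaData()
comp_pipeline = sa.Table(
    "comp_pipeline",
    metadata,
    sa.Column("project_id", sa.String, primary_key=True),
    sa.Column("dag_adjacency_list", sa.JSON, nullable=False),
    sa.Column("state", sa.String, nullable=False),
)

values = {
    "project_id": "some-project-uuid",
    "dag_adjacency_list": {"node-a": ["node-b"]},
    "state": "PUBLISHED",
}
insert_stmt = insert(comp_pipeline).values(**values)
upsert_stmt = insert_stmt.on_conflict_do_update(
    index_elements=[comp_pipeline.c.project_id],  # conflict target: the primary key
    set_=values,  # on conflict, overwrite the row with the incoming values
)
# render the statement to inspect the generated INSERT ... ON CONFLICT DO UPDATE
print(upsert_stmt.compile(dialect=postgresql.dialect()))
```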