diff --git a/CHANGELOG.md b/CHANGELOG.md index 3f37a6bc..0e7b3da4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ - Indicate worker update availability (red dot) ([Issue](https://github.com/orgs/amosproj/projects/79/views/2?pane=issue&itemId=119737574&issue=amosproj%7Camos2025ss01-embark-orchestration-framework%7C108)) - Show HEAD hash for worker EMBA version ([Issue](https://github.com/orgs/amosproj/projects/79/views/2?pane=issue&itemId=119733872&issue=amosproj%7Camos2025ss01-embark-orchestration-framework%7C107)) - SSH key authentication ([Issue](https://github.com/orgs/amosproj/projects/79/views/2?pane=issue&itemId=118803738&issue=amosproj%7Camos2025ss01-embark-orchestration-framework%7C97)) +- Reassign analyses on worker soft reset ([Issue](https://github.com/orgs/amosproj/projects/79/views/2?pane=issue&itemId=119121939&issue=amosproj%7Camos2025ss01-embark-orchestration-framework%7C105)) ## [sprint-12](https://github.com/amosproj/amos2025ss01-embark/releases/tag/sprint-12-release) - 2025-07-09 diff --git a/embark/workers/orchestrator.py b/embark/workers/orchestrator.py index c5990a4e..c1cf21fe 100644 --- a/embark/workers/orchestrator.py +++ b/embark/workers/orchestrator.py @@ -148,7 +148,7 @@ def _reset(self): analysis.duration = str(analysis.scan_time) analysis.save() self._remove_worker(worker) - worker_soft_reset_task.delay(worker.id) + worker_soft_reset_task.delay(worker.id, only_reset=True) for worker_ip, worker in all_workers.items(): self.free_workers[worker_ip] = worker @@ -271,7 +271,7 @@ def _add_worker(self, worker: Worker): :raises ValueError: If the worker already exists in the orchestrator """ if worker.ip_address in self.free_workers or worker.ip_address in self.busy_workers: - raise ValueError(f"Worker with IP {worker.ip_address} already exists.") + raise ValueError(f"Worker with IP {worker.ip_address} already registered in the orchestrator.") self.free_workers[worker.ip_address] = worker logger.info("Worker: %s added to orchestrator", worker.name) diff --git a/embark/workers/tasks.py b/embark/workers/tasks.py index a4b41771..4102058c 100644 --- a/embark/workers/tasks.py +++ b/embark/workers/tasks.py @@ -99,7 +99,7 @@ def update_system_info(worker: Worker): return system_info -def _new_analysis_form(old_analysis: FirmwareAnalysis) -> FirmwareAnalysis: +def _new_analysis_from(old_analysis: FirmwareAnalysis) -> FirmwareAnalysis: """ Creates a new FirmwareAnalysis object based on the provided old_analysis. This can be used to restart a cancelled or failed analysis on a different worker @@ -202,7 +202,7 @@ def _handle_unreachable_worker(worker: Worker, force: bool = False): if reassign_analysis_id: firmware_analysis = FirmwareAnalysis.objects.get(id=reassign_analysis_id) firmware_file = FirmwareFile.objects.get(id=firmware_analysis.firmware.id) - new_analysis = _new_analysis_form(firmware_analysis) + new_analysis = _new_analysis_from(firmware_analysis) submit_firmware(new_analysis, firmware_file) except BaseException as error: logger.error("An error occurred while handling unreachable worker %s: %s", worker.name, error) @@ -589,14 +589,34 @@ def _get_head_time(repo): @shared_task -def worker_soft_reset_task(worker_id): +def worker_soft_reset_task(worker_id, only_reset=False): """ - Connects via SSH to the worker and performs the soft reset + Removes the worker from the orchestrator, reassigns the analysis if needed, + connects via SSH to the worker and performs the soft reset, and re-adds the worker to the orchestrator. + + Removes the worker from the orchestrator, reassigns the analysis if needed, + connects via SSH to the worker and performs the soft reset, and re-adds the worker to the orchestrator. + :param worker_id: ID of worker to soft reset + :param only_reset: If True, only performs the reset without reassigning the analysis or removing the worker from the orchestrator. """ ssh_client = None try: - worker = Worker.objects.get(id=worker_id) + if not only_reset: + # Remove the worker from the orchestrator + orchestrator = get_orchestrator() + worker = Worker.objects.get(id=worker_id) + orchestrator.remove_worker(worker, check=False) + + # Reassign the analysis running on the worker + reassign_analysis_id = worker.analysis_id + if reassign_analysis_id: + firmware_analysis = FirmwareAnalysis.objects.get(id=reassign_analysis_id) + firmware_file = FirmwareFile.objects.get(id=firmware_analysis.firmware.id) + new_analysis = _new_analysis_from(firmware_analysis) + submit_firmware(new_analysis, firmware_file) + + # Soft reset the worker ssh_client = worker.ssh_connect() homedir = "/root" if ssh_client.ssh_user == "root" else f"/home/{ssh_client.ssh_user}" exec_blocking_ssh(ssh_client, "sudo docker ps -aq | xargs -r sudo docker stop | xargs -r sudo docker rm || true") @@ -604,6 +624,13 @@ def worker_soft_reset_task(worker_id): exec_blocking_ssh(ssh_client, f"sudo rm -rf {settings.WORKER_FIRMWARE_DIR}") exec_blocking_ssh(ssh_client, f"sudo rm -rf {homedir}/emba_logs.zip*") # Also delete possible leftover tmp files exec_blocking_ssh(ssh_client, f"sudo rm -rf {homedir}/emba_run.log") + + if not only_reset: + # Re-add the worker to the orchestrator + try: + orchestrator.add_worker(worker) + except ValueError: + pass except Worker.DoesNotExist: logger.error("Worker Soft Reset: Invalid worker id") except (paramiko.SSHException, socket.error):