From f7af0285a70148db61444da013b428f4e6435b5e Mon Sep 17 00:00:00 2001 From: SirGankalot <73303677+SirGankalot@users.noreply.github.com> Date: Sun, 13 Jul 2025 14:17:37 +0200 Subject: [PATCH 1/3] Informing orchestrator and moving analysis to different worker Co-authored-by: ashiven Signed-off-by: SirGankalot <73303677+SirGankalot@users.noreply.github.com> --- embark/workers/tasks.py | 31 ++++++++++++++++++++++++++++--- 1 file changed, 28 insertions(+), 3 deletions(-) diff --git a/embark/workers/tasks.py b/embark/workers/tasks.py index 1bfaf305d..c124a1af0 100644 --- a/embark/workers/tasks.py +++ b/embark/workers/tasks.py @@ -549,21 +549,46 @@ def _get_head_time(repo): @shared_task -def worker_soft_reset_task(worker_id): +def worker_soft_reset_task(worker_id, reassign=True): """ - Connects via SSH to the worker and performs the soft reset + Removes the worker from the orchestrator, reassigns the analysis if needed, + connects via SSH to the worker and performs the soft reset, and re-adds the worker to the orchestrator.
+ :param worker_id: ID of worker to soft reset + :param reassign: If True, reassigns the analysis running on the worker to another worker """ ssh_client = None try: + # Remove the worker from the orchestrator + orchestrator = get_orchestrator() worker = Worker.objects.get(id=worker_id) + try: + orchestrator.remove_worker(worker) + except ValueError: + pass + + # Reassign the analysis running on the worker + if reassign: + reassign_analysis_id = worker.analysis_id + if reassign_analysis_id: + firmware_analysis = FirmwareAnalysis.objects.get(id=reassign_analysis_id) + firmware_file = FirmwareFile.objects.get(id=firmware_analysis.firmware.id) + new_analysis = _new_analysis_from(firmware_analysis) + submit_firmware(new_analysis, firmware_file) + + # Soft reset the worker ssh_client = worker.ssh_connect() homedir = "/root" if ssh_client.ssh_user == "root" else f"/home/{ssh_client.ssh_user}" exec_blocking_ssh(ssh_client, "sudo docker ps -aq | xargs -r sudo docker stop | xargs -r sudo docker rm || true") exec_blocking_ssh(ssh_client, f"sudo rm -rf {settings.WORKER_EMBA_LOGS}") exec_blocking_ssh(ssh_client, f"sudo rm -rf {settings.WORKER_FIRMWARE_DIR}") exec_blocking_ssh(ssh_client, f"sudo rm -rf {homedir}/emba_logs.zip*") # Also delete possible leftover tmp files - exec_blocking_ssh(ssh_client, f"sudo rm -rf {homedir}/emba_run.log") + + # Re-add the worker to the orchestrator + try: + orchestrator.add_worker(worker) + except ValueError: + pass except Worker.DoesNotExist: logger.error("Worker Soft Reset: Invalid worker id") except (paramiko.SSHException, socket.error): From 2043cdf85c08a91c752ef7df71b3707cbb93cda6 Mon Sep 17 00:00:00 2001 From: ashiven Date: Tue, 15 Jul 2025 22:42:52 +0200 Subject: [PATCH 2/3] fix soft reset call from orchestrator reset Signed-off-by: ashiven --- embark/workers/orchestrator.py | 4 ++-- embark/workers/tasks.py | 34 +++++++++++++++++++++++++++++----- 2 files changed, 31 insertions(+), 7 deletions(-) diff --git 
a/embark/workers/orchestrator.py b/embark/workers/orchestrator.py index c5990a4ed..c1cf21fe0 100644 --- a/embark/workers/orchestrator.py +++ b/embark/workers/orchestrator.py @@ -148,7 +148,7 @@ def _reset(self): analysis.duration = str(analysis.scan_time) analysis.save() self._remove_worker(worker) - worker_soft_reset_task.delay(worker.id) + worker_soft_reset_task.delay(worker.id, only_reset=True) for worker_ip, worker in all_workers.items(): self.free_workers[worker_ip] = worker @@ -271,7 +271,7 @@ def _add_worker(self, worker: Worker): :raises ValueError: If the worker already exists in the orchestrator """ if worker.ip_address in self.free_workers or worker.ip_address in self.busy_workers: - raise ValueError(f"Worker with IP {worker.ip_address} already exists.") + raise ValueError(f"Worker with IP {worker.ip_address} already registered in the orchestrator.") self.free_workers[worker.ip_address] = worker logger.info("Worker: %s added to orchestrator", worker.name) diff --git a/embark/workers/tasks.py b/embark/workers/tasks.py index a4b417718..354b37df4 100644 --- a/embark/workers/tasks.py +++ b/embark/workers/tasks.py @@ -99,7 +99,7 @@ def update_system_info(worker: Worker): return system_info -def _new_analysis_form(old_analysis: FirmwareAnalysis) -> FirmwareAnalysis: +def _new_analysis_from(old_analysis: FirmwareAnalysis) -> FirmwareAnalysis: """ Creates a new FirmwareAnalysis object based on the provided old_analysis. 
This can be used to restart a cancelled or failed analysis on a different worker @@ -202,7 +202,7 @@ def _handle_unreachable_worker(worker: Worker, force: bool = False): if reassign_analysis_id: firmware_analysis = FirmwareAnalysis.objects.get(id=reassign_analysis_id) firmware_file = FirmwareFile.objects.get(id=firmware_analysis.firmware.id) - new_analysis = _new_analysis_form(firmware_analysis) + new_analysis = _new_analysis_from(firmware_analysis) submit_firmware(new_analysis, firmware_file) except BaseException as error: logger.error("An error occurred while handling unreachable worker %s: %s", worker.name, error) @@ -589,14 +589,31 @@ def _get_head_time(repo): @shared_task -def worker_soft_reset_task(worker_id): +def worker_soft_reset_task(worker_id, only_reset=False): """ - Connects via SSH to the worker and performs the soft reset + Removes the worker from the orchestrator, reassigns the analysis if needed, + connects via SSH to the worker and performs the soft reset, and re-adds the worker to the orchestrator. + :param worker_id: ID of worker to soft reset + :param only_reset: If True, only performs the reset without reassigning the analysis or removing the worker from the orchestrator. 
""" ssh_client = None try: - worker = Worker.objects.get(id=worker_id) + if not only_reset: + # Remove the worker from the orchestrator + orchestrator = get_orchestrator() + worker = Worker.objects.get(id=worker_id) + orchestrator.remove_worker(worker, check=False) + + # Reassign the analysis running on the worker + reassign_analysis_id = worker.analysis_id + if reassign_analysis_id: + firmware_analysis = FirmwareAnalysis.objects.get(id=reassign_analysis_id) + firmware_file = FirmwareFile.objects.get(id=firmware_analysis.firmware.id) + new_analysis = _new_analysis_from(firmware_analysis) + submit_firmware(new_analysis, firmware_file) + + # Soft reset the worker ssh_client = worker.ssh_connect() homedir = "/root" if ssh_client.ssh_user == "root" else f"/home/{ssh_client.ssh_user}" exec_blocking_ssh(ssh_client, "sudo docker ps -aq | xargs -r sudo docker stop | xargs -r sudo docker rm || true") @@ -604,6 +621,13 @@ def worker_soft_reset_task(worker_id): exec_blocking_ssh(ssh_client, f"sudo rm -rf {settings.WORKER_FIRMWARE_DIR}") exec_blocking_ssh(ssh_client, f"sudo rm -rf {homedir}/emba_logs.zip*") # Also delete possible leftover tmp files exec_blocking_ssh(ssh_client, f"sudo rm -rf {homedir}/emba_run.log") + + if not only_reset: + # Re-add the worker to the orchestrator + try: + orchestrator.add_worker(worker) + except ValueError: + pass except Worker.DoesNotExist: logger.error("Worker Soft Reset: Invalid worker id") except (paramiko.SSHException, socket.error): From 0b1b43bcceeddef941b6c7e03bba474d59f0d2f7 Mon Sep 17 00:00:00 2001 From: ashiven Date: Tue, 15 Jul 2025 22:56:00 +0200 Subject: [PATCH 3/3] add changelog entry Signed-off-by: ashiven --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3f37a6bcf..0e7b3da46 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ - Indicate worker update availability (red dot) 
([Issue](https://github.com/orgs/amosproj/projects/79/views/2?pane=issue&itemId=119737574&issue=amosproj%7Camos2025ss01-embark-orchestration-framework%7C108)) - Show HEAD hash for worker EMBA version ([Issue](https://github.com/orgs/amosproj/projects/79/views/2?pane=issue&itemId=119733872&issue=amosproj%7Camos2025ss01-embark-orchestration-framework%7C107)) - SSH key authentication ([Issue](https://github.com/orgs/amosproj/projects/79/views/2?pane=issue&itemId=118803738&issue=amosproj%7Camos2025ss01-embark-orchestration-framework%7C97)) +- Reassign analyses on worker soft reset ([Issue](https://github.com/orgs/amosproj/projects/79/views/2?pane=issue&itemId=119121939&issue=amosproj%7Camos2025ss01-embark-orchestration-framework%7C105)) ## [sprint-12](https://github.com/amosproj/amos2025ss01-embark/releases/tag/sprint-12-release) - 2025-07-09