Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
- Indicate worker update availability (red dot) ([Issue](https://github.com/orgs/amosproj/projects/79/views/2?pane=issue&itemId=119737574&issue=amosproj%7Camos2025ss01-embark-orchestration-framework%7C108))
- Show HEAD hash for worker EMBA version ([Issue](https://github.com/orgs/amosproj/projects/79/views/2?pane=issue&itemId=119733872&issue=amosproj%7Camos2025ss01-embark-orchestration-framework%7C107))
- SSH key authentication ([Issue](https://github.com/orgs/amosproj/projects/79/views/2?pane=issue&itemId=118803738&issue=amosproj%7Camos2025ss01-embark-orchestration-framework%7C97))
- Reassign analyses on worker soft reset ([Issue](https://github.com/orgs/amosproj/projects/79/views/2?pane=issue&itemId=119121939&issue=amosproj%7Camos2025ss01-embark-orchestration-framework%7C105))

## [sprint-12](https://github.com/amosproj/amos2025ss01-embark/releases/tag/sprint-12-release) - 2025-07-09

Expand Down
4 changes: 2 additions & 2 deletions embark/workers/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ def _reset(self):
analysis.duration = str(analysis.scan_time)
analysis.save()
self._remove_worker(worker)
worker_soft_reset_task.delay(worker.id)
worker_soft_reset_task.delay(worker.id, only_reset=True)

for worker_ip, worker in all_workers.items():
self.free_workers[worker_ip] = worker
Expand Down Expand Up @@ -271,7 +271,7 @@ def _add_worker(self, worker: Worker):
:raises ValueError: If the worker already exists in the orchestrator
"""
if worker.ip_address in self.free_workers or worker.ip_address in self.busy_workers:
raise ValueError(f"Worker with IP {worker.ip_address} already exists.")
raise ValueError(f"Worker with IP {worker.ip_address} already registered in the orchestrator.")
self.free_workers[worker.ip_address] = worker

logger.info("Worker: %s added to orchestrator", worker.name)
Expand Down
37 changes: 32 additions & 5 deletions embark/workers/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def update_system_info(worker: Worker):
return system_info


def _new_analysis_form(old_analysis: FirmwareAnalysis) -> FirmwareAnalysis:
def _new_analysis_from(old_analysis: FirmwareAnalysis) -> FirmwareAnalysis:
"""
Creates a new FirmwareAnalysis object based on the provided old_analysis.
This can be used to restart a cancelled or failed analysis on a different worker
Expand Down Expand Up @@ -202,7 +202,7 @@ def _handle_unreachable_worker(worker: Worker, force: bool = False):
if reassign_analysis_id:
firmware_analysis = FirmwareAnalysis.objects.get(id=reassign_analysis_id)
firmware_file = FirmwareFile.objects.get(id=firmware_analysis.firmware.id)
new_analysis = _new_analysis_form(firmware_analysis)
new_analysis = _new_analysis_from(firmware_analysis)
submit_firmware(new_analysis, firmware_file)
except BaseException as error:
logger.error("An error occurred while handling unreachable worker %s: %s", worker.name, error)
Expand Down Expand Up @@ -589,21 +589,48 @@ def _get_head_time(repo):


@shared_task
def worker_soft_reset_task(worker_id):
def worker_soft_reset_task(worker_id, only_reset=False):
"""
Connects via SSH to the worker and performs the soft reset
Removes the worker from the orchestrator, reassigns the analysis if needed,
connects via SSH to the worker and performs the soft reset, and re-adds the worker to the orchestrator.

Removes the worker from the orchestrator, reassigns the analysis if needed,
connects via SSH to the worker and performs the soft reset, and re-adds the worker to the orchestrator.

:param worker_id: ID of worker to soft reset
:param only_reset: If True, only performs the reset without reassigning the analysis or removing the worker from the orchestrator.
"""
ssh_client = None
try:
worker = Worker.objects.get(id=worker_id)
if not only_reset:
# Remove the worker from the orchestrator
orchestrator = get_orchestrator()
worker = Worker.objects.get(id=worker_id)
orchestrator.remove_worker(worker, check=False)

# Reassign the analysis running on the worker
reassign_analysis_id = worker.analysis_id
if reassign_analysis_id:
firmware_analysis = FirmwareAnalysis.objects.get(id=reassign_analysis_id)
firmware_file = FirmwareFile.objects.get(id=firmware_analysis.firmware.id)
new_analysis = _new_analysis_from(firmware_analysis)
submit_firmware(new_analysis, firmware_file)

# Soft reset the worker
ssh_client = worker.ssh_connect()
homedir = "/root" if ssh_client.ssh_user == "root" else f"/home/{ssh_client.ssh_user}"
exec_blocking_ssh(ssh_client, "sudo docker ps -aq | xargs -r sudo docker stop | xargs -r sudo docker rm || true")
exec_blocking_ssh(ssh_client, f"sudo rm -rf {settings.WORKER_EMBA_LOGS}")
exec_blocking_ssh(ssh_client, f"sudo rm -rf {settings.WORKER_FIRMWARE_DIR}")
exec_blocking_ssh(ssh_client, f"sudo rm -rf {homedir}/emba_logs.zip*") # Also delete possible leftover tmp files
exec_blocking_ssh(ssh_client, f"sudo rm -rf {homedir}/emba_run.log")

if not only_reset:
# Re-add the worker to the orchestrator
try:
orchestrator.add_worker(worker)
except ValueError:
pass
except Worker.DoesNotExist:
logger.error("Worker Soft Reset: Invalid worker id")
except (paramiko.SSHException, socket.error):
Expand Down
Loading