Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
8e020c0
refactor worker unreachable check
ashiven Jul 7, 2025
b49bd9d
reassign analysis of unreachable worker
ashiven Jul 7, 2025
63f23d9
fix update_system_info exception on unreachable - an attributeerror i…
ashiven Jul 7, 2025
2f1aec0
move reachable update into update_system_info
ashiven Jul 7, 2025
aa5f648
fix linter error
ashiven Jul 7, 2025
334a164
add missing attributeerror exceptions
ashiven Jul 7, 2025
385d06b
refactor error messages
ashiven Jul 7, 2025
2f50630
fix linter error and add changelog entry
ashiven Jul 7, 2025
52c8598
Merge branch 'main' into Issue-#85
ashiven Jul 7, 2025
531a1fd
refactor ssh_connect to correctly raise a paramiko.SSHException when …
ashiven Jul 7, 2025
6755b12
fix caught exceptions
ashiven Jul 7, 2025
50af69e
fix unused import
ashiven Jul 7, 2025
cadef8f
Merge branch 'main' into Issue-#85
ashiven Jul 7, 2025
519e04a
create new analysis from cancelled analysis
ashiven Jul 9, 2025
93d6351
implement handle reconnected worker
ashiven Jul 9, 2025
f26a2e7
Merge branch 'main' into Issue-#85
ashiven Jul 9, 2025
975e40f
add note for update system info
ashiven Jul 9, 2025
185b2b8
Merge branch 'Issue-#85' of https://github.com/amosproj/amos2025ss01-…
ashiven Jul 9, 2025
8ee6cb1
add orchestrator info endpoint for testing
ashiven Jul 12, 2025
df7754a
remove orchestrator info
ashiven Jul 13, 2025
f8e4097
Merge branch 'main' into Issue-#85
ashiven Jul 14, 2025
4b41412
add some convenience methods for interacting with the update queue of…
ashiven Jul 14, 2025
dec9914
fix incorrect client in stop remote analysis and add convenience meth…
ashiven Jul 14, 2025
c170dac
add another missing source to deps_host.sh and add todo for bug in de…
ashiven Jul 14, 2025
581b9c2
reset dependency used_by and worker update queues on application start
ashiven Jul 14, 2025
f11ab76
fix incorrect worker update reset
ashiven Jul 14, 2025
dca8570
fix function description
ashiven Jul 14, 2025
0e47575
delete blank lines
ashiven Jul 14, 2025
f4125e0
fix analysis reassignment and increase celery log level to info (too …
ashiven Jul 14, 2025
2c1dc3f
fix: Fix multiple concurrency errors
ClProsser Jul 15, 2025
4983a48
fix: Fix logreader logspam
ClProsser Jul 15, 2025
431e26e
Merge branch 'main' into Issue-#85
ClProsser Jul 15, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
- Trigger orchestrator at critical moments ([Issue](https://github.com/orgs/amosproj/projects/79/views/2?pane=issue&itemId=114047181&issue=amosproj%7Camos2025ss01-embark-orchestration-framework%7C59))
- Use celery for IP range scanning and update config UI ([Issue](https://github.com/orgs/amosproj/projects/79/views/2?pane=issue&itemId=118038092&issue=amosproj%7Camos2025ss01-embark-orchestration-framework%7C91))
- `process_update_queue` call once the analysis is finished ([Issue](https://github.com/orgs/amosproj/projects/79/views/2?pane=issue&itemId=114650622&issue=amosproj%7Camos2025ss01-embark-orchestration-framework%7C70))
- Handle unresponsive worker nodes ([Issue](https://github.com/orgs/amosproj/projects/79/views/2?pane=issue&itemId=115550958&issue=amosproj%7Camos2025ss01-embark-orchestration-framework%7C85))
- Wiki entries for API ([Issue](https://github.com/orgs/amosproj/projects/79/views/2?pane=issue&itemId=114744149&issue=amosproj%7Camos2025ss01-embark-orchestration-framework%7C71))
- Indicate currently installed dependency versions ([Issue](https://github.com/orgs/amosproj/projects/79/views/2?pane=issue&itemId=118051293&issue=amosproj%7Camos2025ss01-embark-orchestration-framework%7C94))

Expand Down
2 changes: 1 addition & 1 deletion dev-tools/debug-server-start.sh
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ python3 ./manage.py runapscheduler | tee -a ../logs/scheduler.log &
# systemctl start embark.service

# Start celery worker
celery -A embark worker --beat --scheduler django -l DEBUG --logfile=../logs/celery.log &
celery -A embark worker --beat --scheduler django -l INFO --logfile=../logs/celery.log &
CELERY_PID=$!
trap 'kill ${CELERY_PID} 2>/dev/null; exit' SIGINT SIGTERM EXIT

Expand Down
11 changes: 8 additions & 3 deletions embark/embark/logreader.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,9 +295,14 @@ def read_loop(self):
pass

while not self.finish:
emba_log_path = f"{self.analysis.path_to_logs}/emba.log"
if not pathlib.Path(emba_log_path).exists():
time.sleep(5)
continue

# look for new events in log
logger.debug("looking for events in %s", f"{self.analysis.path_to_logs}/emba.log")
got_event = self.inotify_events(f"{self.analysis.path_to_logs}/emba.log")
logger.debug("looking for events in %s", emba_log_path)
got_event = self.inotify_events(emba_log_path)

for eve in got_event:
for flag in flags.from_mask(eve.mask):
Expand All @@ -307,7 +312,7 @@ def read_loop(self):
# Act on file change
elif flag is flags.MODIFY:
# get the actual difference
tmp = self.get_diff(f"{self.analysis.path_to_logs}/emba.log")
tmp = self.get_diff(emba_log_path)
logger.debug("Got diff-output: %s", tmp)
# send changes to frontend
self.input_processing(tmp)
Expand Down
1 change: 1 addition & 0 deletions embark/embark/settings/deploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,7 @@
WORKER_FIRMWARE_DIR = "/root/firmware/"
WORKER_EMBA_LOGS = "/root/emba_logs/"
WORKER_UPDATE_QUEUE_SIZE = 50
WORKER_REACHABLE_TIMEOUT = 10

# Celery task queue
CELERY_BROKER_URL = f"redis://{REDIS_HOST}:{REDIS_PORT}/0"
Expand Down
1 change: 1 addition & 0 deletions embark/embark/settings/dev.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@
WORKER_FIRMWARE_DIR = "/root/firmware/"
WORKER_EMBA_LOGS = "/root/emba_logs/"
WORKER_UPDATE_QUEUE_SIZE = 50
WORKER_REACHABLE_TIMEOUT = 10

# Celery task queue
CELERY_BROKER_URL = f"redis://{REDIS_HOST}:{REDIS_PORT}/0"
Expand Down
25 changes: 25 additions & 0 deletions embark/workers/apps.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,28 @@ def reset_orchestrator(**kwargs):
orchestrator = get_orchestrator()
orchestrator.reset()
post_migrate.connect(reset_orchestrator)

def reset_dependency_users(**kwargs):
    """
    Reset the 'used_by' field for all DependencyState instances.

    An unexpected shutdown may leave this field populated with workers
    that no longer use the dependency, so clear it after every migrate.
    """
    from workers.models import DependencyState  # pylint: disable=import-outside-toplevel
    for state in DependencyState.objects.all():
        # ManyRelatedManager.clear() deletes the through-table rows
        # immediately; the follow-up state.save() was redundant (it
        # re-wrote an unchanged row) and has been dropped.
        state.used_by.clear()
post_migrate.connect(reset_dependency_users)

def reset_update_queues(**kwargs):
    """
    Reset the update queues for all workers.

    An unexpected shutdown may leave the update queues populated with
    updates that are no longer relevant, so drop them after every migrate.
    """
    from workers.models import WorkerUpdate  # pylint: disable=import-outside-toplevel
    # One bulk delete instead of one DELETE query per worker (N+1).
    # Filtering on worker__isnull=False removes exactly the rows the
    # old per-worker loop removed, even if the FK were ever nullable.
    WorkerUpdate.objects.filter(worker__isnull=False).delete()
post_migrate.connect(reset_update_queues)
9 changes: 7 additions & 2 deletions embark/workers/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ class ConfigStatus(models.TextChoices): # pylint: disable=too-many-ancestors
reachable = models.BooleanField(default=False)
status = models.CharField(max_length=1, choices=ConfigStatus, default=ConfigStatus.UNCONFIGURED)
analysis_id = models.UUIDField(blank=True, null=True, help_text="ID of the analysis currently running on this worker")
last_reached = models.DateTimeField(auto_now_add=True)

dependency_version = models.OneToOneField(
WorkerDependencyVersion,
Expand All @@ -91,15 +92,16 @@ def clean(self):
except ValueError as value_error:
raise ValidationError({"configuration": f"Invalid IP range: {value_error}"}) from value_error

def ssh_connect(self):
def ssh_connect(self, timeout=30):
"""
Tries to establish an ssh connection with each configuration and returns the first successful connection
:param timeout: max ssh connect timeout
"""
ssh_client = new_autoadd_client()

for configuration in self.configurations.all():
try:
ssh_client.connect(self.ip_address, username=configuration.ssh_user, password=configuration.ssh_password)
ssh_client.connect(self.ip_address, username=configuration.ssh_user, password=configuration.ssh_password, timeout=timeout)

# save the ssh user and password so they can later be used in commands
ssh_client.ssh_user = configuration.ssh_user
Expand All @@ -108,6 +110,9 @@ def ssh_connect(self):
except (paramiko.SSHException, socket.error):
continue

if ssh_client.get_transport() is None or not ssh_client.get_transport().is_active():
raise paramiko.SSHException("Failed to connect to worker with any configuration.")

return ssh_client


Expand Down
7 changes: 5 additions & 2 deletions embark/workers/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,19 +291,22 @@ def add_worker(self, worker: Worker):
def _remove_worker(self, worker: Worker, check: bool = True):
"""
Remove a worker from the orchestrator. The worker is removed from either the free or busy workers list.
We assume that any analysis running on the worker has been completed or cancelled before calling this method.

:param worker: The worker to be removed
:param check: If True, throws an error
:raises ValueError: If the worker does not exist in the orchestrator and check = True
"""
worker.analysis_id = None
worker.save()

if worker.ip_address in self.free_workers:
del self.free_workers[worker.ip_address]
elif worker.ip_address in self.busy_workers:
del self.busy_workers[worker.ip_address]
else:
if check:
raise ValueError(f"Worker with IP {worker.ip_address} does not exist.")

raise ValueError(f"Worker with IP {worker.ip_address} not registered in the orchestrator.")
return

logger.info("Worker: %s removed from orchestrator", worker.name)
Expand Down
Loading
Loading