diff --git a/job_executor/app.py b/job_executor/app.py index cbb20ec4..35de05b9 100644 --- a/job_executor/app.py +++ b/job_executor/app.py @@ -1,9 +1,11 @@ import logging import time from multiprocessing import Queue +from pathlib import Path -from job_executor.adapter import datastore_api, local_storage +from job_executor.adapter import datastore_api from job_executor.adapter.datastore_api.models import JobStatus +from job_executor.adapter.fs import LocalStorageAdapter from job_executor.common.exceptions import StartupException from job_executor.config import environment from job_executor.config.log import initialize_logging_thread, setup_logging @@ -16,8 +18,9 @@ def initialize_app() -> None: try: + local_storage = LocalStorageAdapter(Path(environment.datastore_dir)) rollback.fix_interrupted_jobs() - if local_storage.temporary_backup_exists(): + if local_storage.datastore_dir.temporary_backup_exists(): raise StartupException("tmp directory exists") except Exception as e: raise StartupException("Exception when initializing") from e @@ -26,6 +29,7 @@ def initialize_app() -> None: def handle_jobs(manager: Manager, logging_queue: Queue) -> None: job_query_result = datastore_api.query_for_jobs() manager.clean_up_after_dead_workers() + local_storage = LocalStorageAdapter(Path(environment.datastore_dir)) if job_query_result.available_jobs_count: logger.info( f"Found {len(job_query_result.queued_worker_jobs)}" @@ -35,7 +39,7 @@ def handle_jobs(manager: Manager, logging_queue: Queue) -> None: ) for job in job_query_result.queued_worker_jobs: - job_size = local_storage.get_input_tar_size_in_bytes( + job_size = local_storage.input_dir.get_importable_tar_size_in_bytes( job.parameters.target ) if job_size == 0: