Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .test.env
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ WORKING_DIR=tests/resources/datastores/TEST_DATASTORE_working
DATASTORE_DIR=tests/resources/datastores/TEST_DATASTORE
RSA_KEYS_DIRECTORY=tests/resources/rsa_keys
PSEUDONYM_SERVICE_URL=http://mock.pseudonym.service
JOB_SERVICE_URL=http://mock.job.service
DATASTORE_API_URL=http://mock.job.service
NUMBER_OF_WORKERS=4
SECRETS_FILE=tests/resources/secrets/secrets.json
DOCKER_HOST_NAME=localhost
COMMIT_ID=abc123
MAX_GB_ALL_WORKERS=50
MAX_GB_ALL_WORKERS=50
Original file line number Diff line number Diff line change
Expand Up @@ -6,44 +6,22 @@
from requests.adapters import HTTPAdapter
from urllib3 import Retry

from job_executor.adapter.datastore_api.models import (
Job,
JobQueryResult,
JobStatus,
MaintenanceStatus,
Operation,
)
from job_executor.config import environment
from job_executor.exception import HttpRequestError, HttpResponseError
from job_executor.model.job import Job, JobStatus, Operation
from job_executor.model.maintenance_status import MaintenanceStatus

JOB_SERVICE_URL = environment.get("JOB_SERVICE_URL")
DATASTORE_API_URL = environment.get("DATASTORE_API_URL")
DEFAULT_REQUESTS_TIMEOUT = (10, 60) # (connect timeout, read timeout) — requests interprets the tuple in this order

logger = logging.getLogger()


class JobQueryResult:
    """Result of a job query, with the returned jobs grouped into three lists.

    The groups (worker-queued, built, manager-queued) are kept as plain
    lists of ``Job`` so callers can iterate or extend them directly.
    """

    queued_worker_jobs: list[Job]
    built_jobs: list[Job]
    queued_manager_jobs: list[Job]

    def __init__(
        self,
        queued_worker_jobs: list[Job] | None = None,
        built_jobs: list[Job] | None = None,
        queued_manager_jobs: list[Job] | None = None,
    ) -> None:
        # None sentinels instead of mutable `[]` defaults: a literal list
        # default is created once at function definition and shared by every
        # call, so mutating one instance's list would leak jobs into all
        # later instances constructed with the default.
        self.queued_worker_jobs = (
            [] if queued_worker_jobs is None else queued_worker_jobs
        )
        self.built_jobs = [] if built_jobs is None else built_jobs
        self.queued_manager_jobs = (
            [] if queued_manager_jobs is None else queued_manager_jobs
        )

    @property
    def available_jobs_count(self) -> int:
        """Total number of jobs across all three groups."""
        return (
            len(self.queued_worker_jobs)
            + len(self.built_jobs)
            + len(self.queued_manager_jobs)
        )

    def queued_manager_and_built_jobs(self) -> list[Job]:
        """Return a new list: queued manager jobs followed by built jobs."""
        return self.queued_manager_jobs + self.built_jobs


def get_jobs(
job_status: JobStatus | None = None,
operations: list[Operation] | None = None,
Expand All @@ -57,7 +35,7 @@ def get_jobs(
if ignore_completed is not None:
query_fields.append(f"ignoreCompleted={str(ignore_completed).lower()}")

request_url = f"{JOB_SERVICE_URL}/jobs"
request_url = f"{DATASTORE_API_URL}/jobs"
if query_fields:
request_url += f"?{'&'.join(query_fields)}"

Expand All @@ -71,19 +49,19 @@ def update_job_status(
payload: dict[str, JobStatus | str] = {"status": str(new_status)}
if log is not None:
payload.update({"log": log})
execute_request("PUT", f"{JOB_SERVICE_URL}/jobs/{job_id}", json=payload)
execute_request("PUT", f"{DATASTORE_API_URL}/jobs/{job_id}", json=payload)


def update_description(job_id: str, new_description: str) -> None:
    """Overwrite the description of the job identified by *job_id*."""
    url = f"{DATASTORE_API_URL}/jobs/{job_id}"
    payload = {"description": new_description}
    execute_request("PUT", url, json=payload)


def get_maintenance_status() -> MaintenanceStatus:
    """Fetch the most recent maintenance status from the datastore API."""
    response = execute_request(
        "GET", f"{DATASTORE_API_URL}/maintenance-statuses/latest", True
    )
    return MaintenanceStatus(**response.json())

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@
from job_executor.model.camelcase_model import CamelModel


class MaintenanceStatus(CamelModel):
    """Maintenance state of the datastore.

    Constructed from the datastore API's ``/maintenance-statuses/latest``
    response by ``get_maintenance_status`` in the datastore_api adapter.
    """

    paused: bool  # presumably True while the datastore is paused for maintenance — confirm with API docs
    msg: str  # free-text message accompanying the status
    timestamp: str  # when the status was recorded; format not shown here — presumably ISO 8601


class JobStatus(StrEnum):
QUEUED = "queued"
INITIATED = "initiated"
Expand Down Expand Up @@ -87,3 +93,30 @@ class Job(CamelModel, use_enum_values=True):
log: list[Log] | None = []
created_at: str
created_by: UserInfo


class JobQueryResult:
    """Result of a job query, with the returned jobs grouped into three lists.

    The groups (worker-queued, built, manager-queued) are kept as plain
    lists of ``Job`` so callers can iterate or extend them directly.
    """

    queued_worker_jobs: list[Job]
    built_jobs: list[Job]
    queued_manager_jobs: list[Job]

    def __init__(
        self,
        queued_worker_jobs: list[Job] | None = None,
        built_jobs: list[Job] | None = None,
        queued_manager_jobs: list[Job] | None = None,
    ) -> None:
        # None sentinels instead of mutable `[]` defaults: a literal list
        # default is created once at function definition and shared by every
        # call, so mutating one instance's list would leak jobs into all
        # later instances constructed with the default.
        self.queued_worker_jobs = (
            [] if queued_worker_jobs is None else queued_worker_jobs
        )
        self.built_jobs = [] if built_jobs is None else built_jobs
        self.queued_manager_jobs = (
            [] if queued_manager_jobs is None else queued_manager_jobs
        )

    @property
    def available_jobs_count(self) -> int:
        """Total number of jobs across all three groups."""
        return (
            len(self.queued_worker_jobs)
            + len(self.built_jobs)
            + len(self.queued_manager_jobs)
        )

    def queued_manager_and_built_jobs(self) -> list[Job]:
        """Return a new list: queued manager jobs followed by built jobs."""
        return self.queued_manager_jobs + self.built_jobs
10 changes: 5 additions & 5 deletions job_executor/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
import time
from multiprocessing import Queue

from job_executor.adapter import job_service, local_storage
from job_executor.adapter import datastore_api, local_storage
from job_executor.adapter.datastore_api.models import JobStatus
from job_executor.config import environment
from job_executor.config.log import initialize_logging_thread, setup_logging
from job_executor.domain import rollback
from job_executor.exception import StartupException
from job_executor.manager import Manager
from job_executor.model.job import JobStatus

logger = logging.getLogger()
setup_logging()
Expand All @@ -24,7 +24,7 @@ def initialize_app() -> None:


def handle_jobs(manager: Manager, logging_queue: Queue) -> None:
job_query_result = job_service.query_for_jobs()
job_query_result = datastore_api.query_for_jobs()
manager.clean_up_after_dead_workers()
if job_query_result.available_jobs_count:
logger.info(
Expand All @@ -40,7 +40,7 @@ def handle_jobs(manager: Manager, logging_queue: Queue) -> None:
)
if job_size == 0:
logger.error(f"{job.job_id} Failed to get the size of the dataset.")
job_service.update_job_status(
datastore_api.update_job_status(
job.job_id,
JobStatus.FAILED,
log="No such dataset available for import",
Expand All @@ -50,7 +50,7 @@ def handle_jobs(manager: Manager, logging_queue: Queue) -> None:
logger.warning(
f"{job.job_id} Exceeded the maximum size for all workers."
)
job_service.update_job_status(
datastore_api.update_job_status(
job.job_id,
JobStatus.FAILED,
log="Dataset too large for import",
Expand Down
2 changes: 1 addition & 1 deletion job_executor/config/environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ def _initialize_environment() -> dict:
"DATASTORE_DIR": os.environ["DATASTORE_DIR"],
"RSA_KEYS_DIRECTORY": os.environ["RSA_KEYS_DIRECTORY"],
"PSEUDONYM_SERVICE_URL": os.environ["PSEUDONYM_SERVICE_URL"],
"JOB_SERVICE_URL": os.environ["JOB_SERVICE_URL"],
"DATASTORE_API_URL": os.environ["DATASTORE_API_URL"],
"NUMBER_OF_WORKERS": int(os.environ["NUMBER_OF_WORKERS"]),
"SECRETS_FILE": os.environ["SECRETS_FILE"],
"DOCKER_HOST_NAME": os.environ["DOCKER_HOST_NAME"],
Expand Down
Loading