Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
File renamed without changes.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM ghcr.io/astral-sh/uv:bookworm-slim as builder
FROM ghcr.io/astral-sh/uv:trixie-slim as builder

WORKDIR /app
COPY pyproject.toml uv.lock /app/
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ uv run pytest
### Build docker image

```
docker build --tag job_executor .
docker buildx build --tag job-executor:test-local .
```

## Built with
Expand Down
23 changes: 22 additions & 1 deletion job_executor/adapter/datastore_api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@
Operation,
)
from job_executor.common.exceptions import HttpRequestError, HttpResponseError
from job_executor.config import environment
from job_executor.config import environment, secrets

DATASTORE_API_URL = environment.datastore_api_url
DEFAULT_REQUESTS_TIMEOUT = (10, 60)  # (connect timeout, read timeout)
DATASTORE_API_SERVICE_KEY = secrets.datastore_api_service_key

logger = logging.getLogger()

Expand Down Expand Up @@ -117,6 +118,25 @@ def get_datastore_directory(rdn: str) -> Path:
return Path(DatastoreResponse.model_validate(response.json()).directory)


def post_public_key(datastore_rdn: str, public_key_pem: bytes) -> None:
    """
    Post the public RSA key to the datastore-api.

    :param datastore_rdn: The RDN of the datastore
    :param public_key_pem: The public key in PEM format as bytes
    """
    # Authenticate with the service key; the body is the raw PEM bytes.
    pem_headers = {
        "Content-Type": "application/x-pem-file",
        "X-API-Key": DATASTORE_API_SERVICE_KEY,
    }
    execute_request(
        "POST",
        f"{DATASTORE_API_URL}/datastores/{datastore_rdn}/public-key",
        data=public_key_pem,
        headers=pem_headers,
    )


def query_for_jobs() -> JobQueryResult:
"""
Retrieves different types of jobs based on the system's state
Expand Down Expand Up @@ -147,6 +167,7 @@ def query_for_jobs() -> JobQueryResult:
Operation.REMOVE,
Operation.ROLLBACK_REMOVE,
Operation.DELETE_ARCHIVE,
Operation.GENERATE_RSA_KEYS,
],
),
queued_worker_jobs=get_jobs(
Expand Down
1 change: 1 addition & 0 deletions job_executor/adapter/datastore_api/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class Operation(StrEnum):
REMOVE = "REMOVE"
ROLLBACK_REMOVE = "ROLLBACK_REMOVE"
DELETE_ARCHIVE = "DELETE_ARCHIVE"
GENERATE_RSA_KEYS = "GENERATE_RSA_KEYS"


class ReleaseStatus(StrEnum):
Expand Down
8 changes: 7 additions & 1 deletion job_executor/adapter/fs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@

from job_executor.adapter.fs.datastore_files import DatastoreDirectory
from job_executor.adapter.fs.input_files import InputDirectory
from job_executor.adapter.fs.private_keys_directory import PrivateKeysDirectory
from job_executor.adapter.fs.working_files import WorkingDirectory
from job_executor.config import environment


class FileSystemAdapter(Protocol):
Expand All @@ -21,13 +23,17 @@ class LocalStorageAdapter:
datastore_dir: DatastoreDirectory
working_dir: WorkingDirectory
input_dir: InputDirectory
private_keys_dir: PrivateKeysDirectory

def __init__(self, datastore_dir_path: Path) -> None:
def __init__(self, datastore_dir_path: Path, datastore_rdn: str) -> None:
    """Wire up the datastore, working, input and private-key directories."""
    # Sibling working/input directories live next to the datastore directory,
    # named by suffixing its path.
    base_path = str(datastore_dir_path)
    self.datastore_dir = DatastoreDirectory(datastore_dir_path)
    self.working_dir = WorkingDirectory(Path(base_path + "_working"))
    self.input_dir = InputDirectory(Path(base_path + "_input"))
    # Private keys are kept under a separate configured root, per datastore.
    self.private_keys_dir = PrivateKeysDirectory(
        Path(environment.private_keys_dir) / datastore_rdn
    )

def move_working_dir_parquet_to_datastore(self, dataset_name: str) -> None:
"""
Expand Down
2 changes: 0 additions & 2 deletions job_executor/adapter/fs/datastore_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

class DatastoreDirectory:
root_dir: Path
vault_dir: Path
data_dir: Path
metadata_dir: Path
draft_metadata_all_path: Path
Expand All @@ -31,7 +30,6 @@ def __init__(self, root_dir: Path) -> None:
self.root_dir = root_dir
self.data_dir = root_dir / "data"
self.metadata_dir = root_dir / "datastore"
self.vault_dir = root_dir / "vault"
self.draft_version_path = self.metadata_dir / "draft_version.json"
self.archive_dir = self.root_dir / "archive"
self.draft_metadata_all_path = (
Expand Down
27 changes: 27 additions & 0 deletions job_executor/adapter/fs/private_keys_directory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import os
from dataclasses import dataclass
from pathlib import Path


@dataclass
class PrivateKeysDirectory:
    """Filesystem location holding one datastore's private RSA key.

    The directory path already includes the datastore rdn
    (e.g. ``<private_keys_root>/<rdn>``); the key file inside it has a
    single canonical name, ``microdata_private_key.pem``.
    """

    # Directory for this datastore's key, e.g. /private_keys/<rdn>
    path_with_rdn: Path

    def create(self) -> bool:
        """Create the directory (including parents).

        :return: True if the directory was created, False if it existed.
        """
        if self.path_with_rdn.exists():
            return False
        # pathlib over os.makedirs; exist_ok stays False so a concurrent
        # creation surfaces as FileExistsError instead of passing silently.
        self.path_with_rdn.mkdir(parents=True)
        return True

    def save_private_key(self, microdata_private_key_pem: bytes) -> None:
        """Write the PEM-encoded private key into the directory."""
        key_path = self._get_private_key_location()
        key_path.write_bytes(microdata_private_key_pem)
        # A private key must not be readable by other users; default umask
        # would typically leave it world-readable.
        key_path.chmod(0o600)

    def clean_up(self) -> bool:
        """Delete the private key file if present.

        :return: True if a key file was deleted, False if none existed.
        """
        key_path = self._get_private_key_location()
        if not key_path.exists():
            return False
        key_path.unlink()
        return True

    def _get_private_key_location(self) -> Path:
        # Single canonical filename for the key within the directory.
        return self.path_with_rdn / "microdata_private_key.pem"
2 changes: 1 addition & 1 deletion job_executor/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def initialize_app() -> Manager:
rollback.fix_interrupted_jobs()
for rdn in datastore_api.get_datastores():
local_storage = LocalStorageAdapter(
datastore_api.get_datastore_directory(rdn)
datastore_api.get_datastore_directory(rdn), rdn
)
if local_storage.datastore_dir.temporary_backup_exists():
raise StartupException(f"tmp directory exists for {rdn}")
Expand Down
6 changes: 5 additions & 1 deletion job_executor/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ class Environment:
docker_host_name: str
commit_id: str
max_gb_all_workers: int
private_keys_dir: str


def _initialize_environment() -> Environment:
Expand All @@ -27,6 +28,7 @@ def _initialize_environment() -> Environment:
docker_host_name=os.environ["DOCKER_HOST_NAME"],
commit_id=os.environ["COMMIT_ID"],
max_gb_all_workers=int(os.environ["MAX_GB_ALL_WORKERS"]),
private_keys_dir=os.environ["PRIVATE_KEYS_DIR"],
)


Expand All @@ -36,13 +38,15 @@ def _initialize_environment() -> Environment:
@dataclass
class Secrets:
    # Secrets loaded from the JSON secrets file at startup.
    # API key used when calling the pseudonym service.
    pseudonym_service_api_key: str
    # Service key used to authenticate against the datastore-api.
    datastore_api_service_key: str


def _initialize_secrets() -> Secrets:
    """Load service secrets from the JSON file named in the environment."""
    with open(environment.secrets_file, encoding="utf-8") as secrets_fp:
        parsed = json.load(secrets_fp)
    return Secrets(
        pseudonym_service_api_key=parsed["PSEUDONYM_SERVICE_API_KEY"],
        datastore_api_service_key=parsed["DATASTORE_API_SERVICE_KEY"],
    )


Expand Down
49 changes: 49 additions & 0 deletions job_executor/domain/datastores.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
rollback_bump,
rollback_manager_phase_import_job,
)
from job_executor.domain.rsa_keys import generate_rsa_key_pair

logger = logging.getLogger()

Expand Down Expand Up @@ -586,3 +587,51 @@ def delete_archived_input(job_context: JobContext) -> None:
logger.error(f"{job_id}: An unexpected error occured")
logger.exception(f"{job_id}: {str(e)}", exc_info=e)
datastore_api.update_job_status(job_id, JobStatus.FAILED)


def generate_rsa_keys(
    job_context: JobContext,
) -> None:
    """
    Generate RSA key pair for a datastore.

    Stores the private key in /private_keys/{rdn}/microdata_private_key.pem
    and posts the public key to the datastore-api.

    :param job_context: context carrying the job and its local storage adapter
    """
    job_id = job_context.job.job_id
    datastore_rdn = job_context.job.datastore_rdn
    private_keys_dir = job_context.local_storage.private_keys_dir
    try:
        logger.info(f"{job_id}: initiated")
        datastore_api.update_job_status(job_id, JobStatus.INITIATED)

        logger.info(
            f"{job_id}: Checking private keys directory at "
            f"{private_keys_dir.path_with_rdn}"
        )
        # create() returns True only when the directory did not already exist
        if private_keys_dir.create():
            logger.info(f"{job_id}: Private keys directory created")

        logger.info(f"{job_id}: Generating RSA key pair")
        private_key_pem, public_key_pem = generate_rsa_key_pair()

        # Persist the private key before publishing the public key, so the
        # public key is never published without its stored counterpart.
        private_keys_dir.save_private_key(private_key_pem)
        logger.info(f"{job_id}: Saved private key")

        try:
            logger.info(f"{job_id}: Posting public key to datastore-api")
            datastore_api.post_public_key(datastore_rdn, public_key_pem)
        except Exception as post_error:
            # Delete the just-saved private key so a failed publish does not
            # leave an orphaned key on disk; re-raise so the outer handler
            # marks the job as failed.
            logger.error(
                f"{job_id}: Failed to post public key to datastore-api, "
                "cleaning up saved private key"
            )
            if private_keys_dir.clean_up():
                logger.info(f"{job_id}: Deleted private key due to error")
            raise post_error

        logger.info(f"{job_id}: completed")
        datastore_api.update_job_status(job_id, JobStatus.COMPLETED)
    except Exception as e:
        logger.error(f"{job_id}: Failed to generate RSA keys")
        logger.exception(f"{job_id}: {str(e)}", exc_info=e)
        datastore_api.update_job_status(job_id, JobStatus.FAILED, str(e))
2 changes: 2 additions & 0 deletions job_executor/domain/manager/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ def _handle_manager_job(self, job_context: JobContext) -> None:
datastores.delete_draft(job_context)
elif operation == Operation.DELETE_ARCHIVE:
datastores.delete_archived_input(job_context)
elif operation == Operation.GENERATE_RSA_KEYS:
datastores.generate_rsa_keys(job_context)
else:
datastore_api.update_job_status(
job_context.job.job_id,
Expand Down
3 changes: 2 additions & 1 deletion job_executor/domain/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ class JobContext:

def build_job_context(job: Job, handler: handler_type) -> JobContext:
local_storage = LocalStorageAdapter(
datastore_api.get_datastore_directory(job.datastore_rdn)
datastore_api.get_datastore_directory(job.datastore_rdn),
job.datastore_rdn,
)
job_size = (
local_storage.input_dir.get_importable_tar_size_in_bytes(
Expand Down
32 changes: 29 additions & 3 deletions job_executor/domain/rollback.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,16 @@
RollbackException,
StartupException,
)
from job_executor.config import environment

logger = logging.getLogger()


def rollback_bump(job: Job, bump_manifesto: DatastoreVersion) -> None:
job_id = job.job_id
local_storage = LocalStorageAdapter(
datastore_api.get_datastore_directory(job.datastore_rdn)
datastore_api.get_datastore_directory(job.datastore_rdn),
job.datastore_rdn,
)
try:
logger.info(f"{job_id}: Restoring files from temporary backup")
Expand Down Expand Up @@ -137,7 +139,8 @@ def rollback_worker_phase_import_job(
) -> None:
job_id = job.job_id
local_storage = LocalStorageAdapter(
datastore_api.get_datastore_directory(job.datastore_rdn)
datastore_api.get_datastore_directory(job.datastore_rdn),
job.datastore_rdn,
)
logger.warning(
f"{job_id}: Rolling back worker job "
Expand Down Expand Up @@ -193,7 +196,8 @@ def rollback_manager_phase_import_job(
"""
job_id = job.job_id
local_storage = LocalStorageAdapter(
datastore_api.get_datastore_directory(job.datastore_rdn)
datastore_api.get_datastore_directory(job.datastore_rdn),
job.datastore_rdn,
)
logger.warning(
f"{job_id}: Rolling back import job "
Expand Down Expand Up @@ -290,9 +294,31 @@ def fix_interrupted_job(job: Job) -> None:
JobStatus.FAILED,
"Bump operation was interrupted and rolled back.",
)
elif job_operation == "GENERATE_RSA_KEYS":
rollback_generate_rsa_keys_job(job)
logger.info(
f'{job.job_id}: Setting status to "failed" for interrupted job'
)
datastore_api.update_job_status(
job.job_id,
JobStatus.FAILED,
"Job was failed due to an unexpected interruption",
)
else:
log_message = (
f"Unrecognized job operation {job_operation} for job {job.job_id}"
)
logger.error(log_message)
raise RollbackException(log_message)


def rollback_generate_rsa_keys_job(job: Job) -> None:
    """Delete the private key saved by an interrupted GENERATE_RSA_KEYS job."""
    key_path = (
        Path(environment.private_keys_dir)
        / job.datastore_rdn
        / "microdata_private_key.pem"
    )
    if key_path.exists():
        os.remove(key_path)
        logger.info(
            f"{job.job_id}: Rollback: deleted private key at "
            f"{key_path}"
        )
27 changes: 27 additions & 0 deletions job_executor/domain/rsa_keys.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import logging

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa

logger = logging.getLogger()


def generate_rsa_key_pair(key_size: int = 2048) -> tuple[bytes, bytes]:
    """
    Generate an RSA key pair and return it PEM-encoded.

    :param key_size: RSA modulus size in bits; defaults to 2048 to keep
        existing callers unchanged
    :return: (private_key_pem, public_key_pem) — the private key as an
        unencrypted PKCS#8 PEM, the public key as a SubjectPublicKeyInfo PEM
    """
    # backend argument dropped: cryptography selects the default backend
    # automatically and the parameter has been deprecated since 3.1.
    private_key = rsa.generate_private_key(
        public_exponent=65537, key_size=key_size
    )

    private_key_pem = private_key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.PKCS8,
        # NOTE(review): key is serialized unencrypted; confirm the
        # private_keys volume is adequately access-restricted.
        encryption_algorithm=serialization.NoEncryption(),
    )

    public_key_pem = private_key.public_key().public_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PublicFormat.SubjectPublicKeyInfo,
    )

    return private_key_pem, public_key_pem
5 changes: 4 additions & 1 deletion job_executor/domain/worker/build_dataset_worker.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import logging
import os
from multiprocessing import Queue
from pathlib import Path
from time import perf_counter

from job_executor.adapter import datastore_api
from job_executor.adapter.datastore_api.models import JobStatus
from job_executor.adapter.fs import LocalStorageAdapter
from job_executor.common.exceptions import BuilderStepError, HttpResponseError
from job_executor.config import environment
from job_executor.config.log import configure_worker_logger
from job_executor.domain.models import JobContext
from job_executor.domain.worker.steps import (
Expand Down Expand Up @@ -48,6 +50,7 @@ def run_worker(job_context: JobContext, logging_queue: Queue) -> None:
job_id = job_context.job.job_id
local_storage = job_context.local_storage
dataset_name = job_context.job.parameters.target
datastore_rdn = job_context.job.datastore_rdn
try:
configure_worker_logger(logging_queue, job_id)
logger.info(
Expand All @@ -60,7 +63,7 @@ def run_worker(job_context: JobContext, logging_queue: Queue) -> None:
dataset_name,
local_storage.input_dir.path,
local_storage.working_dir.path,
local_storage.datastore_dir.vault_dir,
Path(environment.private_keys_dir) / datastore_rdn,
)
datastore_api.update_job_status(job_id, JobStatus.VALIDATING)
(data_file_name, _) = dataset_validator.run_for_dataset(
Expand Down
Loading