Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
File renamed without changes.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM ghcr.io/astral-sh/uv:bookworm-slim as builder
FROM ghcr.io/astral-sh/uv:trixie-slim as builder

WORKDIR /app
COPY pyproject.toml uv.lock /app/
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ uv run pytest
### Build docker image

```
docker build --tag job_executor .
docker buildx build --tag job-executor:test-local .
```

## Built with
Expand Down
23 changes: 22 additions & 1 deletion job_executor/adapter/datastore_api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@
Operation,
)
from job_executor.common.exceptions import HttpRequestError, HttpResponseError
from job_executor.config import environment
from job_executor.config import environment, secrets

DATASTORE_API_URL = environment.datastore_api_url
DEFAULT_REQUESTS_TIMEOUT = (10, 60)  # (connect timeout, read timeout)
DATASTORE_API_SERVICE_KEY = secrets.datastore_api_service_key

logger = logging.getLogger()

Expand Down Expand Up @@ -117,6 +118,25 @@ def get_datastore_directory(rdn: str) -> Path:
return Path(DatastoreResponse.model_validate(response.json()).directory)


def post_public_key(datastore_rdn: str, public_key_pem: bytes) -> None:
    """
    Upload a datastore's public RSA key to the datastore-api.

    The key is sent as a raw PEM body, authenticated with the
    service API key.

    :param datastore_rdn: The RDN of the datastore
    :param public_key_pem: The public key in PEM format as bytes
    """
    headers = {
        "Content-Type": "application/x-pem-file",
        "X-API-Key": DATASTORE_API_SERVICE_KEY,
    }
    execute_request(
        "POST",
        f"{DATASTORE_API_URL}/datastores/{datastore_rdn}/public-key",
        data=public_key_pem,
        headers=headers,
    )


def query_for_jobs() -> JobQueryResult:
"""
Retrieves different types of jobs based on the system's state
Expand Down Expand Up @@ -147,6 +167,7 @@ def query_for_jobs() -> JobQueryResult:
Operation.REMOVE,
Operation.ROLLBACK_REMOVE,
Operation.DELETE_ARCHIVE,
Operation.GENERATE_RSA_KEYS,
],
),
queued_worker_jobs=get_jobs(
Expand Down
1 change: 1 addition & 0 deletions job_executor/adapter/datastore_api/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class Operation(StrEnum):
REMOVE = "REMOVE"
ROLLBACK_REMOVE = "ROLLBACK_REMOVE"
DELETE_ARCHIVE = "DELETE_ARCHIVE"
GENERATE_RSA_KEYS = "GENERATE_RSA_KEYS"


class ReleaseStatus(StrEnum):
Expand Down
2 changes: 0 additions & 2 deletions job_executor/adapter/fs/datastore_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

class DatastoreDirectory:
root_dir: Path
vault_dir: Path
data_dir: Path
metadata_dir: Path
draft_metadata_all_path: Path
Expand All @@ -31,7 +30,6 @@ def __init__(self, root_dir: Path) -> None:
self.root_dir = root_dir
self.data_dir = root_dir / "data"
self.metadata_dir = root_dir / "datastore"
self.vault_dir = root_dir / "vault"
self.draft_version_path = self.metadata_dir / "draft_version.json"
self.archive_dir = self.root_dir / "archive"
self.draft_metadata_all_path = (
Expand Down
6 changes: 5 additions & 1 deletion job_executor/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ class Environment:
docker_host_name: str
commit_id: str
max_gb_all_workers: int
private_keys_dir: str


def _initialize_environment() -> Environment:
Expand All @@ -27,6 +28,7 @@ def _initialize_environment() -> Environment:
docker_host_name=os.environ["DOCKER_HOST_NAME"],
commit_id=os.environ["COMMIT_ID"],
max_gb_all_workers=int(os.environ["MAX_GB_ALL_WORKERS"]),
private_keys_dir=os.environ["PRIVATE_KEYS_DIR"],
)


Expand All @@ -36,13 +38,15 @@ def _initialize_environment() -> Environment:
@dataclass
class Secrets:
pseudonym_service_api_key: str
datastore_api_service_key: str


def _initialize_secrets() -> Secrets:
with open(environment.secrets_file, encoding="utf-8") as f:
secrets_file = json.load(f)
return Secrets(
pseudonym_service_api_key=secrets_file["PSEUDONYM_SERVICE_API_KEY"]
pseudonym_service_api_key=secrets_file["PSEUDONYM_SERVICE_API_KEY"],
datastore_api_service_key=secrets_file["DATASTORE_API_SERVICE_KEY"],
)


Expand Down
76 changes: 76 additions & 0 deletions job_executor/domain/datastores.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
import logging
import os
from pathlib import Path

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa

from job_executor.adapter import datastore_api
from job_executor.adapter.datastore_api.models import JobStatus
Expand All @@ -19,6 +25,7 @@
UnnecessaryUpdateException,
VersioningException,
)
from job_executor.config import environment
from job_executor.domain.models import JobContext
from job_executor.domain.rollback import (
rollback_bump,
Expand Down Expand Up @@ -586,3 +593,72 @@ def delete_archived_input(job_context: JobContext) -> None:
logger.error(f"{job_id}: An unexpected error occured")
logger.exception(f"{job_id}: {str(e)}", exc_info=e)
datastore_api.update_job_status(job_id, JobStatus.FAILED)


def generate_rsa_keys(
    job_context: JobContext,
) -> None:
    """
    Generate an RSA key pair for a datastore.

    Stores the private key at
    ``{environment.private_keys_dir}/{rdn}/microdata_private_key.pem``
    (owner-read/write only) and posts the public key to the
    datastore-api. If posting the public key fails, the freshly written
    private key is deleted so no orphaned key is left behind.

    :param job_context: context holding the job (id, datastore RDN)
    """
    job_id = job_context.job.job_id
    datastore_rdn = job_context.job.datastore_rdn
    try:
        logger.info(f"{job_id}: initiated")
        datastore_api.update_job_status(job_id, JobStatus.INITIATED)

        target_dir = Path(environment.private_keys_dir) / datastore_rdn

        # mkdir(parents=True, exist_ok=True) is atomic with respect to
        # concurrent creation, unlike an exists() check + makedirs().
        logger.info(
            f"{job_id}: Ensuring private keys directory at {target_dir}"
        )
        target_dir.mkdir(parents=True, exist_ok=True)

        logger.info(f"{job_id}: Generating RSA key pair")
        private_key = rsa.generate_private_key(
            public_exponent=65537, key_size=2048, backend=default_backend()
        )
        public_key = private_key.public_key()

        microdata_private_key_pem = private_key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.PKCS8,
            encryption_algorithm=serialization.NoEncryption(),
        )

        private_key_location = target_dir / "microdata_private_key.pem"
        # Create the key file with 0o600 so the private key is never
        # readable by other users, even transiently.
        fd = os.open(
            private_key_location,
            os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
            0o600,
        )
        with os.fdopen(fd, "wb") as file:
            file.write(microdata_private_key_pem)
        logger.info(f"{job_id}: Saved private key to {private_key_location}")

        microdata_public_key_pem = public_key.public_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PublicFormat.SubjectPublicKeyInfo,
        )

        try:
            logger.info(f"{job_id}: Posting public key to datastore-api")
            datastore_api.post_public_key(
                datastore_rdn, microdata_public_key_pem
            )
        except Exception:
            # Clean up the private key so a failed upload does not leave
            # a key pair whose public half the datastore-api never saw.
            logger.error(
                f"{job_id}: Failed to post public key to datastore-api, "
                "cleaning up saved private key"
            )
            if private_key_location.exists():
                private_key_location.unlink()
                logger.info(
                    f"{job_id}: Deleted private key at {private_key_location}"
                )
            # Bare raise preserves the original traceback.
            raise

        logger.info(f"{job_id}: completed")
        datastore_api.update_job_status(job_id, JobStatus.COMPLETED)
    except Exception as e:
        logger.error(f"{job_id}: Failed to generate RSA keys")
        logger.exception(f"{job_id}: {str(e)}", exc_info=e)
        datastore_api.update_job_status(job_id, JobStatus.FAILED, str(e))
2 changes: 2 additions & 0 deletions job_executor/domain/manager/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ def _handle_manager_job(self, job_context: JobContext) -> None:
datastores.delete_draft(job_context)
elif operation == Operation.DELETE_ARCHIVE:
datastores.delete_archived_input(job_context)
elif operation == Operation.GENERATE_RSA_KEYS:
datastores.generate_rsa_keys(job_context)
else:
datastore_api.update_job_status(
job_context.job.job_id,
Expand Down
23 changes: 23 additions & 0 deletions job_executor/domain/rollback.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
RollbackException,
StartupException,
)
from job_executor.config import environment

logger = logging.getLogger()

Expand Down Expand Up @@ -290,9 +291,31 @@ def fix_interrupted_job(job: Job) -> None:
JobStatus.FAILED,
"Bump operation was interrupted and rolled back.",
)
elif job_operation == "GENERATE_RSA_KEYS":
rollback_generate_rsa_keys_job(job)
logger.info(
f'{job.job_id}: Setting status to "failed" for interrupted job'
)
datastore_api.update_job_status(
job.job_id,
JobStatus.FAILED,
"Job was failed due to an unexpected interruption",
)
else:
log_message = (
f"Unrecognized job operation {job_operation} for job {job.job_id}"
)
logger.error(log_message)
raise RollbackException(log_message)


def rollback_generate_rsa_keys_job(job: Job) -> None:
    """
    Roll back an interrupted GENERATE_RSA_KEYS job.

    Deletes the private key file written for the job's datastore (if it
    exists) so a retried job starts from a clean state.

    :param job: the interrupted job, providing job_id and datastore_rdn
    """
    rdn = job.datastore_rdn
    target_dir = Path(environment.private_keys_dir) / rdn
    private_key_location = target_dir / "microdata_private_key.pem"
    if private_key_location.exists():
        # Path.unlink() over os.remove(): idiomatic pathlib usage,
        # consistent with the Path objects used above.
        private_key_location.unlink()
        logger.info(
            f"{job.job_id}: Rollback: deleted private key at "
            f"{private_key_location}"
        )
5 changes: 4 additions & 1 deletion job_executor/domain/worker/build_dataset_worker.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import logging
import os
from multiprocessing import Queue
from pathlib import Path
from time import perf_counter

from job_executor.adapter import datastore_api
from job_executor.adapter.datastore_api.models import JobStatus
from job_executor.adapter.fs import LocalStorageAdapter
from job_executor.common.exceptions import BuilderStepError, HttpResponseError
from job_executor.config import environment
from job_executor.config.log import configure_worker_logger
from job_executor.domain.models import JobContext
from job_executor.domain.worker.steps import (
Expand Down Expand Up @@ -48,6 +50,7 @@ def run_worker(job_context: JobContext, logging_queue: Queue) -> None:
job_id = job_context.job.job_id
local_storage = job_context.local_storage
dataset_name = job_context.job.parameters.target
datastore_rdn = job_context.job.datastore_rdn
try:
configure_worker_logger(logging_queue, job_id)
logger.info(
Expand All @@ -60,7 +63,7 @@ def run_worker(job_context: JobContext, logging_queue: Queue) -> None:
dataset_name,
local_storage.input_dir.path,
local_storage.working_dir.path,
local_storage.datastore_dir.vault_dir,
Path(environment.private_keys_dir) / datastore_rdn,
)
datastore_api.update_job_status(job_id, JobStatus.VALIDATING)
(data_file_name, _) = dataset_validator.run_for_dataset(
Expand Down
5 changes: 4 additions & 1 deletion job_executor/domain/worker/build_metadata_worker.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import logging
from multiprocessing import Queue
from pathlib import Path
from time import perf_counter

from job_executor.adapter import datastore_api
from job_executor.adapter.datastore_api.models import JobStatus
from job_executor.adapter.fs import LocalStorageAdapter
from job_executor.common.exceptions import BuilderStepError, HttpResponseError
from job_executor.config import environment
from job_executor.config.log import configure_worker_logger
from job_executor.domain.models import JobContext
from job_executor.domain.worker.steps import (
Expand All @@ -29,6 +31,7 @@ def run_worker(job_context: JobContext, logging_queue: Queue) -> None:
job_id = job_context.job.job_id
local_storage = job_context.local_storage
dataset_name = job_context.job.parameters.target
datastore_rdn = job_context.job.datastore_rdn
try:
configure_worker_logger(logging_queue, job_id)
logger.info(
Expand All @@ -41,7 +44,7 @@ def run_worker(job_context: JobContext, logging_queue: Queue) -> None:
dataset_name,
local_storage.input_dir.path,
local_storage.working_dir.path,
local_storage.datastore_dir.vault_dir,
Path(environment.private_keys_dir) / datastore_rdn,
)
datastore_api.update_job_status(job_id, JobStatus.VALIDATING)
dataset_validator.run_for_metadata(
Expand Down
1 change: 1 addition & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@
os.environ["DOCKER_HOST_NAME"] = "localhost"
os.environ["COMMIT_ID"] = "abc123"
os.environ["MAX_GB_ALL_WORKERS"] = "50"
os.environ["PRIVATE_KEYS_DIR"] = "tests/integration/resources/private_keys"
16 changes: 8 additions & 8 deletions tests/integration/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,10 @@
from cryptography.hazmat.primitives.asymmetric import rsa
from microdata_tools import package_dataset

PRIVATE_KEYS_DIR = Path("tests/integration/resources/private_keys")

def _create_key_pair(vault_dir: Path):
if not vault_dir.exists():
os.makedirs(vault_dir)

def _create_key_pair(public_key_dir: Path, private_key_dir: Path):
private_key = rsa.generate_private_key(
public_exponent=65537, key_size=2048, backend=default_backend()
)
Expand All @@ -21,11 +20,11 @@ def _create_key_pair(vault_dir: Path):
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo,
)
public_key_location = vault_dir / "microdata_public_key.pem"
public_key_location = public_key_dir / "microdata_public_key.pem"
with open(public_key_location, "wb") as file:
file.write(microdata_public_key_pem)

with open(vault_dir / "microdata_private_key.pem", "wb") as file:
with open(private_key_dir / "microdata_private_key.pem", "wb") as file:
file.write(
private_key.private_bytes(
encoding=serialization.Encoding.PEM,
Expand Down Expand Up @@ -74,11 +73,12 @@ def _render_metadata_all(metadata_all_file: Path) -> None:
def _package_to_input(datastore_dir: str):
    # Package every test dataset into the datastore's input directory,
    # encrypting with a freshly generated RSA key pair.
    package_dir = Path("tests/integration/resources/input_datasets")
    input_dir = Path(f"{datastore_dir}_input")
    public_key_dir = Path(f"{datastore_dir}/vault")
    # Private keys now live under the shared PRIVATE_KEYS_DIR, keyed by
    # the datastore directory name.
    private_key_dir = PRIVATE_KEYS_DIR / Path(datastore_dir).name
    # NOTE(review): this function no longer creates public_key_dir or
    # private_key_dir before writing into them (the old makedirs call in
    # _create_key_pair was removed) — confirm _create_key_pair or the
    # test setup creates these directories, or writing the keys will
    # raise FileNotFoundError. TODO confirm.
    _create_key_pair(public_key_dir, private_key_dir)
    for dataset in os.listdir(package_dir):
        package_dataset(
            rsa_keys_dir=public_key_dir,
            dataset_dir=Path(package_dir / dataset),
            output_dir=Path(input_dir),
        )
Expand Down
Empty file.
Loading