diff --git a/.test.env b/.test.env index e7febe07..e356f966 100644 --- a/.test.env +++ b/.test.env @@ -1,7 +1,4 @@ -INPUT_DIR=tests/resources/datastores/TEST_DATASTORE_input -WORKING_DIR=tests/resources/datastores/TEST_DATASTORE_working DATASTORE_DIR=tests/resources/datastores/TEST_DATASTORE -RSA_KEYS_DIRECTORY=tests/resources/rsa_keys PSEUDONYM_SERVICE_URL=http://mock.pseudonym.service DATASTORE_API_URL=http://mock.job.service NUMBER_OF_WORKERS=4 diff --git a/job_executor/adapter/datastore_api/models.py b/job_executor/adapter/datastore_api/models.py index 3673be85..d101c37b 100644 --- a/job_executor/adapter/datastore_api/models.py +++ b/job_executor/adapter/datastore_api/models.py @@ -3,7 +3,7 @@ from pydantic import model_validator -from job_executor.adapter.local_storage.models.datastore_versions import ( +from job_executor.adapter.fs.models.datastore_versions import ( DatastoreVersion, ) from job_executor.common.models import CamelModel diff --git a/job_executor/adapter/fs/__init__.py b/job_executor/adapter/fs/__init__.py new file mode 100644 index 00000000..f45bf19d --- /dev/null +++ b/job_executor/adapter/fs/__init__.py @@ -0,0 +1,49 @@ +import shutil +from pathlib import Path +from typing import Protocol + +from job_executor.adapter.fs.datastore_files import DatastoreDirectory +from job_executor.adapter.fs.input_files import InputDirectory +from job_executor.adapter.fs.working_files import WorkingDirectory + + +class FileSystemAdapter(Protocol): + datastore_dir: DatastoreDirectory + working_dir: WorkingDirectory + input_dir: InputDirectory + + def move_working_dir_parquet_to_datastore( + self, dataset_name: str + ) -> None: ... + + +class LocalStorageAdapter: + datastore_dir: DatastoreDirectory + working_dir: WorkingDirectory + input_dir: InputDirectory + + def __init__(self, datastore_dir_path: Path) -> None: + self.datastore_dir = DatastoreDirectory(datastore_dir_path) + self.working_dir = WorkingDirectory( + Path(f"{datastore_dir_path}_working") + ) + self.input_dir = InputDirectory(Path(f"{datastore_dir_path}_input")) + + def move_working_dir_parquet_to_datastore(self, dataset_name: str) -> None: + """ + Moves the given parquet DRAFT file from the working directory to + the appropriate datastore sub directory. 
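+        Assumes the dataset sub-directory under the datastore data dir already exists (created via make_dataset_dir).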
+ + * dataset_name: str - name of dataset + """ + working_dir_parquet_path = self.working_dir._get_draft_parquet_path( + dataset_name + ) + shutil.move( + working_dir_parquet_path, + ( + self.datastore_dir.data_dir + / dataset_name + / working_dir_parquet_path.parts[-1] + ), + ) diff --git a/job_executor/adapter/fs/datastore_files.py b/job_executor/adapter/fs/datastore_files.py new file mode 100644 index 00000000..cb5908b6 --- /dev/null +++ b/job_executor/adapter/fs/datastore_files.py @@ -0,0 +1,362 @@ +import json +import os +import shutil +from datetime import UTC, datetime +from pathlib import Path + +from pydantic import ValidationError + +from job_executor.adapter.fs.models.datastore_versions import ( + DatastoreVersions, + DraftVersion, +) +from job_executor.adapter.fs.models.metadata import ( + MetadataAll, + MetadataAllDraft, +) +from job_executor.common.exceptions import LocalStorageError + + +class DatastoreDirectory: + root_dir: Path + vault_dir: Path + data_dir: Path + metadata_dir: Path + draft_metadata_all_path: Path + datastore_versions_path: Path + draft_version_path: Path + archive_dir: Path + + def __init__(self, root_dir: Path) -> None: + self.root_dir = root_dir + self.data_dir = root_dir / "data" + self.metadata_dir = root_dir / "datastore" + self.vault_dir = root_dir / "vault" + self.draft_version_path = self.metadata_dir / "draft_version.json" + self.archive_dir = self.root_dir / "archive" + self.draft_metadata_all_path = ( + self.metadata_dir / "metadata_all__DRAFT.json" + ) + self.datastore_versions_path = ( + self.metadata_dir / "datastore_versions.json" + ) + + def _get_draft_parquet_path(self, dataset_name: str) -> Path: + parquet_file_path = ( + self.data_dir / dataset_name / f"{dataset_name}__DRAFT.parquet" + ) + partitioned_parquet_path = ( + self.data_dir / dataset_name / f"{dataset_name}__DRAFT" + ) + if partitioned_parquet_path.is_dir(): + return partitioned_parquet_path + elif parquet_file_path.is_file(): + return parquet_file_path + else: + raise FileExistsError( + f"Invalid parquet path in {self.data_dir} for {dataset_name}" + ) + + def make_dataset_dir(self, dataset_name: str) -> None: + """ + Creates sub-directories for dataset_name in the datastore + /data directory. + + * dataset_name: str - name of dataset + """ + os.makedirs(self.data_dir / dataset_name, exist_ok=True) + + def get_data_versions(self, version: str | None) -> dict: + """ + Returns the data_versions json file for the given version as a dict. + Returns an empty dictionary if given version is None. + + * version: str - '__' formatted semantic version + """ + if version is None: + return {} + file_version = "_".join(version.split("_")[:-1]) + file_path = self.metadata_dir / f"data_versions__{file_version}.json" + with open(file_path, "r") as f: + return json.load(f) + + def write_data_versions(self, data_versions: dict, version: str) -> None: + """ + Writes given dict to a new data versions json file to the appropriate + datastore directory named with the given version. + + * data_versions: dict - data versions dict + * version: str - '__' formatted semantic version + """ + file_version = "_".join(version.split("_")[:-1]) + file_path = ( + self.root_dir / f"datastore/data_versions__{file_version}.json" + ) + with open(file_path, "w") as f: + return json.dump(data_versions, f, indent=2) + + def get_draft_version(self) -> DraftVersion: + """ + Reads the draft version file from the datastore. 
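+        Raises FileNotFoundError if the file is missing and a pydantic ValidationError if its contents are not a valid DraftVersion.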
+ """ + file_path = self.draft_version_path + with open(file_path, "r") as f: + return DraftVersion.model_validate(json.load(f)) + + def write_draft_version(self, draft_version: DraftVersion) -> None: + """ + Writes json representation of object to the draft version json file + by alias. + """ + file_path = self.draft_version_path + with open(file_path, "w") as f: + return json.dump( + draft_version.model_dump(by_alias=True), f, indent=2 + ) + + def get_datastore_versions(self) -> DatastoreVersions: + """ + Returns the contents of the datastore versions json file as + DatastoreVersions object. + """ + file_path = self.datastore_versions_path + with open(file_path, "r") as f: + return DatastoreVersions.model_validate(json.load(f)) + + def write_datastore_versions( + self, datastore_versions: DatastoreVersions + ) -> None: + """ + Writes json representation of object to the draft version json file + by alias. + """ + file_path = self.datastore_versions_path + with open(file_path, "w") as f: + return json.dump( + datastore_versions.model_dump(by_alias=True), f, indent=2 + ) + + def get_metadata_all(self, version: str) -> MetadataAll: + """ + Returns the metadata all json file for the given version. + + * version: str - '__' formatted semantic version + """ + file_path = self.metadata_dir / f"metadata_all__{version}.json" + with open(file_path, "r") as f: + return MetadataAll.model_validate(json.load(f)) + + def write_metadata_all( + self, metadata_all: MetadataAll, version: str + ) -> None: + """ + Writes given metadata all to the appropriate json file in the + datastore directory named with the given version. + + * metadata_all: MetadataAll - A MetadataAll object + * version: str - '__' + """ + file_path = self.metadata_dir / f"metadata_all__{version}.json" + with open(file_path, "w") as f: + return json.dump( + metadata_all.model_dump(by_alias=True, exclude_none=True), + f, + indent=2, + ) + + def get_metadata_all_draft(self) -> MetadataAllDraft: + """ + Returns the metadata all draft json file. + """ + file_path = self.draft_metadata_all_path + with open(file_path, "r") as f: + return MetadataAllDraft.model_validate(json.load(f)) + + def write_metadata_all_draft( + self, metadata_all_draft: MetadataAllDraft + ) -> None: + """ + Writes json representation of object to the metadata all draft json file + by alias. A tmp file will be written to first + to avoid downtime in consuming services due to incomplete json while + writing. + """ + tmp_file_path = f"{self.draft_metadata_all_path}.tmp" + with open(tmp_file_path, "w", encoding="utf-8") as f: + json.dump( + metadata_all_draft.model_dump(by_alias=True, exclude_none=True), + f, + indent=2, + ) + os.remove(self.draft_metadata_all_path) + shutil.move(tmp_file_path, self.draft_metadata_all_path) + + def rename_parquet_draft_to_release( + self, dataset_name: str, version: str + ) -> str: + """ + Renames the parquet DRAFT file or directory for the given dataset_name, + with the given version. + + * dataset_name: str - name of dataset + * version: str - '__' formatted semantic version + """ + draft_path = self._get_draft_parquet_path(dataset_name) + file_version = "_".join(version.split("_")[:-1]) + file_name = ( + draft_path.stem.replace("DRAFT", file_version) + draft_path.suffix + ) + release_path = draft_path.parent / file_name + shutil.move(draft_path, release_path) + return release_path.name + + def delete_parquet_draft(self, dataset_name: str) -> None: + """ + Deletes the parquet draft file or directory for the given dataset_name. 
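+        Does nothing if no draft parquet file or directory exists.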
+ + * dataset_name: str - name of dataset + """ + data_dir = self.root_dir / f"data/{dataset_name}" + partitioned_parquet_path = data_dir / f"{dataset_name}__DRAFT" + parquet_file_path = data_dir / f"{dataset_name}__DRAFT.parquet" + if partitioned_parquet_path.is_dir(): + shutil.rmtree(partitioned_parquet_path) + elif parquet_file_path.is_file(): + os.remove(parquet_file_path) + + def save_temporary_backup(self) -> None: + """ + Backs up metadata_all__DRAFT.json, datastore_versions.json and + draft_version.json from the datastore to a tmp directory + inside the datastore directory. + Raises `LocalStorageError` if tmp directory already exists. + """ + with open( + f"{self.root_dir}/datastore/datastore_versions.json", + encoding="utf-8", + ) as f: + datastore_versions = json.load(f) + with open(self.draft_version_path, encoding="utf-8") as f: + draft_version = json.load(f) + with open(self.draft_metadata_all_path, encoding="utf-8") as f: + metadata_all_draft = json.load(f) + tmp_dir = self.root_dir / "tmp" + if os.path.isdir(tmp_dir): + raise LocalStorageError("tmp directory already exists") + os.mkdir(tmp_dir) + with open(tmp_dir / "draft_version.json", "w", encoding="utf-8") as f: + json.dump(draft_version, f, indent=2) + with open( + tmp_dir / "metadata_all__DRAFT.json", "w", encoding="utf-8" + ) as f: + json.dump(metadata_all_draft, f, indent=2) + with open( + tmp_dir / "datastore_versions.json", "w", encoding="utf-8" + ) as f: + json.dump(datastore_versions, f, indent=2) + + def restore_from_temporary_backup(self) -> str | None: + """ + Restores the datastore from the tmp directory. + Raises `LocalStorageError`if there are any missing backup files. + + Returns None if no released version in backup, else returns the + latest release version number as dotted four part version. + """ + tmp_dir = Path(self.root_dir) / "tmp" + draft_version_backup = tmp_dir / "draft_version.json" + metadata_all_draft_backup = tmp_dir / "metadata_all__DRAFT.json" + datastore_versions_backup = tmp_dir / "datastore_versions.json" + backup_exists = ( + os.path.isdir(tmp_dir) + and os.path.isfile(draft_version_backup) + and os.path.isfile(metadata_all_draft_backup) + and os.path.isfile(datastore_versions_backup) + ) + if not backup_exists: + raise LocalStorageError("Missing tmp backup files") + try: + with open(datastore_versions_backup, "r") as f: + datastore_versions = json.load(f) + shutil.move(draft_version_backup, self.draft_version_path) + shutil.move(metadata_all_draft_backup, self.draft_metadata_all_path) + shutil.move(datastore_versions_backup, self.datastore_versions_path) + if datastore_versions["versions"] == []: + return None + else: + return datastore_versions["versions"][0]["version"] + except ValidationError as e: + raise LocalStorageError("Invalid backup file") from e + + def archive_temporary_backup(self) -> None: + """ + Archives the tmp directory within the datastore if the directory + exists. Raises `LocalStorageError` if there are any unrecognized files + in the directory. + """ + tmp_dir = Path(self.root_dir) / "tmp" + os.makedirs(self.archive_dir, exist_ok=True) + + if not os.path.isdir(Path(self.root_dir) / "tmp"): + raise LocalStorageError( + "Could not find a tmp directory to archive." + ) + for content in os.listdir(tmp_dir): + if content not in [ + "datastore_versions.json", + "metadata_all__DRAFT.json", + "draft_version.json", + ]: + raise LocalStorageError( + "Found unrecognized files and/or directories in the tmp " + "directory. Aborting tmp archiving." 
+ ) + timestamp = datetime.now(UTC).replace(tzinfo=None) + shutil.move( + self.root_dir / "tmp", self.archive_dir / f"tmp_{timestamp}" + ) + + def delete_temporary_backup(self) -> None: + """ + Deletes the tmp directory within the datastore if the directory + exists. Raises `LocalStorageError` if there are any unrecognized files + in the directory. + """ + tmp_dir = Path(self.root_dir) / "tmp" + os.makedirs(self.archive_dir, exist_ok=True) + + if not os.path.isdir(Path(self.root_dir) / "tmp"): + raise LocalStorageError("Could not find a tmp directory to delete.") + for content in os.listdir(tmp_dir): + if content not in [ + "datastore_versions.json", + "metadata_all__DRAFT.json", + "draft_version.json", + ]: + raise LocalStorageError( + "Found unrecognized files and/or directories in the tmp " + "directory. Aborting tmp deleting." + ) + shutil.rmtree(self.root_dir / "tmp") + + def temporary_backup_exists(self) -> bool: + """ + Returns a boolean representing if the tmp directory exists. + """ + tmp_dir = self.root_dir / "tmp" + return os.path.isdir(tmp_dir) + + def archive_draft_version(self, version: str) -> None: + """ + Archives the current draft json + * dataset_name: str - name of dataset draft + * version: str - version of the archived draft + """ + os.makedirs(self.archive_dir, exist_ok=True) + + timestamp = datetime.now(UTC).replace(tzinfo=None) + + archived_draft_version_path = ( + self.archive_dir / f"draft_version_{version}_{timestamp}.json" + ) + shutil.copyfile(self.draft_version_path, archived_draft_version_path) diff --git a/job_executor/adapter/fs/input_files.py b/job_executor/adapter/fs/input_files.py new file mode 100644 index 00000000..80dcdb38 --- /dev/null +++ b/job_executor/adapter/fs/input_files.py @@ -0,0 +1,44 @@ +import os +import shutil +from dataclasses import dataclass +from pathlib import Path + + +@dataclass +class InputDirectory: + path: Path + + def archive_importable(self, dataset_name: str) -> None: + """ + Archives the input .tar files if not already archived + """ + archive_dir = self.path / "archive" + tar_filename = f"{dataset_name}.tar" + archived_tar_file = archive_dir / tar_filename + tar_file = self.path / tar_filename + if not archive_dir.exists(): + os.makedirs(archive_dir, exist_ok=True) + if tar_file.exists() and not os.path.isfile(archived_tar_file): + shutil.move(str(tar_file), str(archive_dir)) + + def delete_archived_importable(self, dataset_name: str) -> None: + """ + Delete the archived .tar file from the archive directory. + """ + archived_file: Path = self.path / f"archive/{dataset_name}.tar" + if archived_file.is_file(): + os.remove(archived_file) + + def get_importable_tar_size_in_bytes(self, dataset_name: str) -> int: + """ + Checks the size in bytes of the dataset.tar file. + Returns size in bytes or 0 if the file does not exist. 
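+        Falls back to the archive sub-directory if the .tar file has already been archived.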
+ """ + tar_path = self.path / f"{dataset_name}.tar" + + if not tar_path.exists(): + tar_path = self.path / "archive" / f"{dataset_name}.tar" + + if tar_path.exists(): + return os.path.getsize(tar_path) + return 0 diff --git a/job_executor/adapter/local_storage/models/datastore_versions.py b/job_executor/adapter/fs/models/datastore_versions.py similarity index 100% rename from job_executor/adapter/local_storage/models/datastore_versions.py rename to job_executor/adapter/fs/models/datastore_versions.py diff --git a/job_executor/adapter/local_storage/models/metadata.py b/job_executor/adapter/fs/models/metadata.py similarity index 99% rename from job_executor/adapter/local_storage/models/metadata.py rename to job_executor/adapter/fs/models/metadata.py index 7d477aa7..2136755b 100644 --- a/job_executor/adapter/local_storage/models/metadata.py +++ b/job_executor/adapter/fs/models/metadata.py @@ -1,6 +1,6 @@ from collections.abc import Iterator -from job_executor.adapter.local_storage.models.datastore_versions import ( +from job_executor.adapter.fs.models.datastore_versions import ( DatastoreVersion, ) from job_executor.common.exceptions import ( diff --git a/job_executor/adapter/fs/working_files.py b/job_executor/adapter/fs/working_files.py new file mode 100644 index 00000000..ee7210ad --- /dev/null +++ b/job_executor/adapter/fs/working_files.py @@ -0,0 +1,104 @@ +import json +import os +import shutil +from dataclasses import dataclass +from pathlib import Path + +from job_executor.adapter.fs.models.metadata import Metadata + + +@dataclass +class WorkingDirectory: + path: Path + + def _get_draft_parquet_path(self, dataset_name: str) -> Path: + parquet_file_path = self.path / f"{dataset_name}__DRAFT.parquet" + partitioned_parquet_path = self.path / f"{dataset_name}__DRAFT" + if partitioned_parquet_path.is_dir(): + return partitioned_parquet_path + elif parquet_file_path.is_file(): + return parquet_file_path + else: + raise FileExistsError( + f"Invalid parquet path in {self.path} for {dataset_name}" + ) + + def write_metadata(self, dataset_name: str, metadata: Metadata) -> None: + """ + Writes a json to a the working directory as the processed metadata file + named: {dataset_name}__DRAFT.json + + * dataset_name: str - name of dataset + * metadata: Metadata - Metadata to write as json + """ + file_path = self.path / f"{dataset_name}__DRAFT.json" + with open(file_path, "w", encoding="utf-8") as f: + json.dump( + metadata.model_dump(by_alias=True, exclude_none=True), + f, + ) + + def get_metadata(self, dataset_name: str) -> Metadata: + """ + Returns the working dir metadata json file for given dataset_name + as a Metadata object. + + * dataset_name: str - name of dataset + """ + file_path = self.path / f"{dataset_name}__DRAFT.json" + with open(file_path, "r", encoding="utf-8") as f: + return Metadata.model_validate(json.load(f)) + + def delete_metadata(self, dataset_name: str) -> None: + """ + Deletes the metadata in working directory with postfix __DRAFT.json + + * dataset_name: str - name of dataset + """ + metadata_path = self.path / f"{dataset_name}__DRAFT.json" + if os.path.isfile(metadata_path): + os.remove(metadata_path) + + def get_input_metadata(self, dataset_name: str) -> dict: + """ + Returns the working dir metadata json file for given dataset_name. 
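+        This is the raw input metadata file ({dataset_name}.json), not the transformed {dataset_name}__DRAFT.json.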
+ + * dataset_name: str - name of dataset + """ + file_path = self.path / f"{dataset_name}.json" + with open(file_path, "r", encoding="utf-8") as f: + return json.load(f) + + def delete_input_metadata(self, dataset_name: str) -> None: + """ + Deletes the metadata in working directory with postfix __DRAFT.json + + * dataset_name: str - name of dataset + """ + metadata_path = self.path / f"{dataset_name}.json" + if os.path.isfile(metadata_path): + os.remove(metadata_path) + + def delete_file(self, file_name: str) -> None: + """ + Deletes a file from the working directory. + Intended to clean up left-over files. + + * file_name: str - name of temporary file + """ + file_path = self.path / file_name + if file_path.is_file(): + os.remove(file_path) + + def delete_sub_directory(self, directory_name: str) -> None: + """ + Deletes a directory from the working directory. + Intended to clean up left-over directories. + Raises a LocalStorageError if dirpath is not in + the working directory. + + * dir_path: str - name of temporary directory + """ + dir_path = self.path / directory_name + if dir_path.is_dir(): + shutil.rmtree(dir_path) diff --git a/job_executor/adapter/local_storage/__init__.py b/job_executor/adapter/local_storage/__init__.py deleted file mode 100644 index b5633579..00000000 --- a/job_executor/adapter/local_storage/__init__.py +++ /dev/null @@ -1,494 +0,0 @@ -import json -import os -import shutil -from datetime import UTC, datetime -from pathlib import Path - -from pydantic import ValidationError - -from job_executor.adapter.local_storage.models.datastore_versions import ( - DatastoreVersions, - DraftVersion, -) -from job_executor.adapter.local_storage.models.metadata import ( - Metadata, - MetadataAll, - MetadataAllDraft, -) -from job_executor.common.exceptions import LocalStorageError -from job_executor.config import environment - -WORKING_DIR = Path(environment.working_dir) -DATASTORE_DIR = Path(environment.datastore_dir) -INPUT_DIR = Path(environment.input_dir) - -DATASTORE_VERSIONS_PATH = DATASTORE_DIR / "datastore/datastore_versions.json" -DRAFT_METADATA_ALL_PATH = DATASTORE_DIR / "datastore/metadata_all__DRAFT.json" -DRAFT_VERSION_PATH = DATASTORE_DIR / "datastore/draft_version.json" -ARCHIVE_DIR = DATASTORE_DIR / "archive" - - -def _read_json(file_path: Path) -> dict: - with open(file_path, "r", encoding="utf-8") as f: - return json.load(f) - - -def _write_json( - content: dict, file_path: Path, indent: int | None = None -) -> None: - with open(file_path, "w", encoding="utf-8") as f: - json.dump(content, f, indent=indent) - - -def _write_json_with_tmp( - content: dict, file_path: Path, indent: int | None = None -) -> None: - tmp_file_path = f"{file_path}.tmp" - with open(tmp_file_path, "w", encoding="utf-8") as f: - json.dump(content, f, indent=indent) - os.remove(file_path) - shutil.move(tmp_file_path, file_path) - - -def _get_parquet_path(directory: Path, dataset_name: str) -> Path: - parquet_file_path = directory / f"{dataset_name}__DRAFT.parquet" - partitioned_parquet_path = directory / f"{dataset_name}__DRAFT" - if partitioned_parquet_path.is_dir(): - return partitioned_parquet_path - elif parquet_file_path.is_file(): - return parquet_file_path - else: - raise FileExistsError( - f"Invalid parquet path in {directory} for {dataset_name}" - ) - - -def _get_datastore_draft_parquet_path(dataset_name: str) -> Path: - return _get_parquet_path( - DATASTORE_DIR / f"data/{dataset_name}", dataset_name - ) - - -def _get_working_dir_draft_parquet_path(dataset_name: str) -> Path: - 
return _get_parquet_path(WORKING_DIR, dataset_name) - - -def make_dataset_dir(dataset_name: str) -> None: - """ - Creates sub-directories for dataset_name in the datastore /data directory. - - * dataset_name: str - name of dataset - """ - os.makedirs(DATASTORE_DIR / f"data/{dataset_name}", exist_ok=True) - - -def get_data_versions(version: str | None) -> dict: - """ - Returns the data_versions json file for the given version as a dict. - Returns an empty dictionary if given version is None. - - * version: str - '__' formatted semantic version - """ - if version is None: - return {} - file_version = "_".join(version.split("_")[:-1]) - return _read_json( - DATASTORE_DIR / f"datastore/data_versions__{file_version}.json" - ) - - -def write_data_versions(data_versions: dict, version: str) -> None: - """ - Writes given dict to a new data versions json file to the appropriate - datastore directory named with the given version. - - * data_versions: dict - data versions dict - * version: str - '__' formatted semantic version - """ - file_version = "_".join(version.split("_")[:-1]) - _write_json( - data_versions, - DATASTORE_DIR / f"datastore/data_versions__{file_version}.json", - indent=2, - ) - - -def get_draft_version() -> DraftVersion: - """ - Reads the draft version file from the datastore. - """ - return DraftVersion.model_validate(_read_json(DRAFT_VERSION_PATH)) - - -def write_draft_version(draft_version: DraftVersion) -> None: - """ - Writes json representation of object to the draft version json file - by alias. - """ - _write_json( - draft_version.model_dump(by_alias=True), DRAFT_VERSION_PATH, indent=2 - ) - - -def get_datastore_versions() -> DatastoreVersions: - """ - Returns the contents of the datastore versions json file as dict. - """ - return DatastoreVersions.model_validate(_read_json(DATASTORE_VERSIONS_PATH)) - - -def write_datastore_versions(datastore_versions: DatastoreVersions) -> None: - """ - Writes json representation of object to the draft version json file - by alias. - """ - _write_json( - datastore_versions.model_dump(by_alias=True), - DATASTORE_VERSIONS_PATH, - indent=2, - ) - - -def get_metadata_all(version: str) -> MetadataAll: - """ - Returns the metadata all json file for the given version. - - * version: str - '__' formatted semantic version - """ - return MetadataAll.model_validate( - _read_json(DATASTORE_DIR / f"datastore/metadata_all__{version}.json") - ) - - -def write_metadata_all(metadata_all: MetadataAll, version: str) -> None: - """ - Writes given metadata all to the appropriate json file in the - datastore directory named with the given version. - - * metadata_all: MetadataAll - A MetadataAll object - * version: str - '__' - """ - _write_json( - metadata_all.model_dump(by_alias=True, exclude_none=True), - DATASTORE_DIR / f"datastore/metadata_all__{version}.json", - ) - - -def get_metadata_all_draft() -> MetadataAllDraft: - """ - Returns the metadata all draft json file. - """ - return MetadataAllDraft.model_validate( - _read_json(DATASTORE_DIR / "datastore/metadata_all__DRAFT.json") - ) - - -def write_metadata_all_draft(metadata_all: MetadataAllDraft) -> None: - """ - Writes json representation of object to the metadata all draft json file - by alias. A tmp file will be written to first - to avoid downtime in consuming services due to incomplete json while - writing. 
- - """ - _write_json_with_tmp( - metadata_all.model_dump(by_alias=True, exclude_none=True), - DATASTORE_DIR / "datastore/metadata_all__DRAFT.json", - indent=2, - ) - - -def write_working_dir_metadata(dataset_name: str, metadata: Metadata) -> None: - """ - Writes a json to a the working directory as the processed metadata file - named: {dataset_name}__DRAFT.json - - * dataset_name: str - name of dataset - * metadata: Metadata - Metadata to write as json - """ - _write_json( - metadata.model_dump(by_alias=True, exclude_none=True), - WORKING_DIR / f"{dataset_name}__DRAFT.json", - ) - - -def get_working_dir_metadata(dataset_name: str) -> Metadata: - """ - Returns the working dir metadata json file for given dataset_name - as a Metadata object. - - * dataset_name: str - name of dataset - """ - return Metadata.model_validate( - _read_json(WORKING_DIR / f"{dataset_name}__DRAFT.json") - ) - - -def delete_working_dir_metadata(dataset_name: str) -> None: - """ - Deletes the metadata in working directory with postfix __DRAFT.json - - * dataset_name: str - name of dataset - """ - metadata_path = WORKING_DIR / f"{dataset_name}__DRAFT.json" - if os.path.isfile(metadata_path): - os.remove(metadata_path) - - -def get_working_dir_input_metadata(dataset_name: str) -> dict: - """ - Returns the working dir metadata json file for given dataset_name. - - * dataset_name: str - name of dataset - """ - return _read_json(WORKING_DIR / f"{dataset_name}.json") - - -def rename_parquet_draft_to_release(dataset_name: str, version: str) -> str: - """ - Renames the parquet DRAFT file or directory for the given dataset_name, - with the given version. - - * dataset_name: str - name of dataset - * version: str - '__' formatted semantic version - """ - draft_path = _get_datastore_draft_parquet_path(dataset_name) - file_version = "_".join(version.split("_")[:-1]) - file_name = ( - draft_path.stem.replace("DRAFT", file_version) + draft_path.suffix - ) - release_path = draft_path.parent / file_name - shutil.move(draft_path, release_path) - return release_path.name - - -def move_working_dir_parquet_to_datastore(dataset_name: str) -> None: - """ - Moves the given parquet DRAFT file from the working directory to - the appropriate datastore sub directory. - - * dataset_name: str - name of dataset - """ - working_dir_parquet_path = _get_working_dir_draft_parquet_path(dataset_name) - shutil.move( - working_dir_parquet_path, - ( - DATASTORE_DIR / f"data/{dataset_name}/" - f"{working_dir_parquet_path.parts[-1]}" - ), - ) - - -def delete_parquet_draft(dataset_name: str) -> None: - """ - Deletes the parquet draft file or directory for the given dataset_name. - - * dataset_name: str - name of dataset - """ - data_dir = DATASTORE_DIR / f"data/{dataset_name}" - partitioned_parquet_path = data_dir / f"{dataset_name}__DRAFT" - parquet_file_path = data_dir / f"{dataset_name}__DRAFT.parquet" - if partitioned_parquet_path.is_dir(): - shutil.rmtree(partitioned_parquet_path) - elif parquet_file_path.is_file(): - os.remove(parquet_file_path) - - -def delete_working_dir_file(file_path: Path) -> None: - """ - Deletes a file from the working directory. - Intended to clean up left-over files. - Raises a LocalStorageError if filepath is not in - the working directory. 
- - * file_name: str - name of temporary file - """ - if not str(file_path).startswith(str(WORKING_DIR)): - raise LocalStorageError(f"Filepath {file_path} is not in {WORKING_DIR}") - if file_path.is_file(): - os.remove(file_path) - - -def delete_working_dir_dir(dir_path: Path) -> None: - """ - Deletes a directory from the working directory. - Intended to clean up left-over directories. - Raises a LocalStorageError if dirpath is not in - the working directory. - - * dir_path: str - name of temporary directory - """ - if not str(dir_path).startswith(str(WORKING_DIR)): - raise LocalStorageError(f"Dirpath {dir_path} is not in {WORKING_DIR}") - if dir_path.is_dir(): - shutil.rmtree(dir_path) - - -def save_temporary_backup() -> None: - """ - Backs up metadata_all__DRAFT.json, datastore_versions.json and - draft_version.json from the datastore to a tmp directory - inside the datastore directory. - Raises `LocalStorageError` if tmp directory already exists. - """ - with open( - f"{DATASTORE_DIR}/datastore/datastore_versions.json", encoding="utf-8" - ) as f: - datastore_versions = json.load(f) - with open(DRAFT_VERSION_PATH, encoding="utf-8") as f: - draft_version = json.load(f) - with open(DRAFT_METADATA_ALL_PATH, encoding="utf-8") as f: - metadata_all_draft = json.load(f) - tmp_dir = Path(DATASTORE_DIR) / "tmp" - if os.path.isdir(tmp_dir): - raise LocalStorageError("tmp directory already exists") - os.mkdir(tmp_dir) - _write_json(draft_version, tmp_dir / "draft_version.json", indent=2) - _write_json(metadata_all_draft, tmp_dir / "metadata_all__DRAFT.json") - _write_json( - datastore_versions, tmp_dir / "datastore_versions.json", indent=2 - ) - - -def restore_from_temporary_backup() -> str | None: - """ - Restores the datastore from the tmp directory. - Raises `LocalStorageError`if there are any missing backup files. - - Returns None if no released version in backup, else returns the - latest release version number as dotted four part version. - """ - tmp_dir = Path(DATASTORE_DIR) / "tmp" - draft_version_backup = tmp_dir / "draft_version.json" - metadata_all_draft_backup = tmp_dir / "metadata_all__DRAFT.json" - datastore_versions_backup = tmp_dir / "datastore_versions.json" - backup_exists = ( - os.path.isdir(tmp_dir) - and os.path.isfile(draft_version_backup) - and os.path.isfile(metadata_all_draft_backup) - and os.path.isfile(datastore_versions_backup) - ) - if not backup_exists: - raise LocalStorageError("Missing tmp backup files") - try: - datastore_versions = _read_json(datastore_versions_backup) - shutil.move(draft_version_backup, DRAFT_VERSION_PATH) - shutil.move(metadata_all_draft_backup, DRAFT_METADATA_ALL_PATH) - shutil.move(datastore_versions_backup, DATASTORE_VERSIONS_PATH) - if datastore_versions["versions"] == []: - return None - else: - return datastore_versions["versions"][0]["version"] - except ValidationError as e: - raise LocalStorageError("Invalid backup file") from e - - -def archive_temporary_backup() -> None: - """ - Archives the tmp directory within the datastore if the directory - exists. Raises `LocalStorageError` if there are any unrecognized files - in the directory. 
- """ - tmp_dir = Path(DATASTORE_DIR) / "tmp" - os.makedirs(ARCHIVE_DIR, exist_ok=True) - - if not os.path.isdir(Path(DATASTORE_DIR) / "tmp"): - raise LocalStorageError("Could not find a tmp directory to archive.") - for content in os.listdir(tmp_dir): - if content not in [ - "datastore_versions.json", - "metadata_all__DRAFT.json", - "draft_version.json", - ]: - raise LocalStorageError( - "Found unrecognized files and/or directories in the tmp " - "directory. Aborting tmp archiving." - ) - timestamp = datetime.now(UTC).replace(tzinfo=None) - shutil.move(DATASTORE_DIR / "tmp", ARCHIVE_DIR / f"tmp_{timestamp}") - - -def delete_temporary_backup() -> None: - """ - Deletes the tmp directory within the datastore if the directory - exists. Raises `LocalStorageError` if there are any unrecognized files - in the directory. - """ - tmp_dir = Path(DATASTORE_DIR) / "tmp" - os.makedirs(ARCHIVE_DIR, exist_ok=True) - - if not os.path.isdir(Path(DATASTORE_DIR) / "tmp"): - raise LocalStorageError("Could not find a tmp directory to delete.") - for content in os.listdir(tmp_dir): - if content not in [ - "datastore_versions.json", - "metadata_all__DRAFT.json", - "draft_version.json", - ]: - raise LocalStorageError( - "Found unrecognized files and/or directories in the tmp " - "directory. Aborting tmp deleting." - ) - shutil.rmtree(DATASTORE_DIR / "tmp") - - -def temporary_backup_exists() -> bool: - """ - Returns a boolean representing if the tmp directory exists. - """ - tmp_dir = Path(DATASTORE_DIR) / "tmp" - return os.path.isdir(tmp_dir) - - -def archive_draft_version(version: str) -> None: - """ - Archives the current draft json - * dataset_name: str - name of dataset draft - * version: str - version of the archived draft - """ - os.makedirs(ARCHIVE_DIR, exist_ok=True) - - timestamp = datetime.now(UTC).replace(tzinfo=None) - - archived_draft_version_path = ( - ARCHIVE_DIR / f"draft_version_{version}_{timestamp}.json" - ) - shutil.copyfile(DRAFT_VERSION_PATH, archived_draft_version_path) - - -def archive_input_files(dataset_name: str) -> None: - """ - Archives the input .tar files if not already archived - """ - archive_dir = INPUT_DIR / "archive" - tar_filename = f"{dataset_name}.tar" - archived_tar_file = archive_dir / tar_filename - tar_file = INPUT_DIR / tar_filename - if not archive_dir.exists(): - os.makedirs(archive_dir, exist_ok=True) - if tar_file.exists() and not os.path.isfile(archived_tar_file): - shutil.move(str(tar_file), str(archive_dir)) - - -def delete_archived_input(dataset_name: str) -> None: - """ - Delete the archived .tar file from the archive directory. - """ - archived_file: Path = INPUT_DIR / f"archive/{dataset_name}.tar" - if archived_file.is_file(): - os.remove(archived_file) - - -def get_input_tar_size_in_bytes(dataset_name: str) -> int: - """ - Checks the size in bytes of the dataset.tar file. - Returns size in bytes or 0 if the file does not exist. 
- """ - tar_path = INPUT_DIR / f"{dataset_name}.tar" - - if not tar_path.exists(): - tar_path = INPUT_DIR / "archive" / f"{dataset_name}.tar" - - if tar_path.exists(): - return os.path.getsize(tar_path) - return 0 diff --git a/job_executor/config/__init__.py b/job_executor/config/__init__.py index f4ee1542..87a84a4c 100644 --- a/job_executor/config/__init__.py +++ b/job_executor/config/__init__.py @@ -6,9 +6,6 @@ @dataclass class Environment: datastore_dir: str - working_dir: str - input_dir: str - rsa_keys_directory: str secrets_file: str pseudonym_service_url: str datastore_api_url: str @@ -20,10 +17,7 @@ class Environment: def _initialize_environment() -> Environment: return Environment( - input_dir=os.environ["INPUT_DIR"], - working_dir=os.environ["WORKING_DIR"], datastore_dir=os.environ["DATASTORE_DIR"], - rsa_keys_directory=os.environ["RSA_KEYS_DIRECTORY"], secrets_file=os.environ["SECRETS_FILE"], pseudonym_service_url=os.environ["PSEUDONYM_SERVICE_URL"], datastore_api_url=os.environ["DATASTORE_API_URL"], diff --git a/job_executor/domain/datastores.py b/job_executor/domain/datastores.py index d91c2c37..f9a9f98d 100644 --- a/job_executor/domain/datastores.py +++ b/job_executor/domain/datastores.py @@ -1,14 +1,16 @@ import logging +from pathlib import Path -from job_executor.adapter import datastore_api, local_storage +from job_executor.adapter import datastore_api from job_executor.adapter.datastore_api.models import JobStatus -from job_executor.adapter.local_storage.models.datastore_versions import ( +from job_executor.adapter.fs import LocalStorageAdapter +from job_executor.adapter.fs.models.datastore_versions import ( DatastoreVersion, DatastoreVersions, DataStructureUpdate, DraftVersion, ) -from job_executor.adapter.local_storage.models.metadata import ( +from job_executor.adapter.fs.models.metadata import ( Metadata, MetadataAll, MetadataAllDraft, @@ -19,6 +21,7 @@ UnnecessaryUpdateException, VersioningException, ) +from job_executor.config import environment from job_executor.domain.rollback import ( rollback_bump, rollback_manager_phase_import_job, @@ -35,17 +38,24 @@ class Datastore: latest_version_number: str | None def __init__(self) -> None: - self.draft_version = local_storage.get_draft_version() - self.datastore_versions = local_storage.get_datastore_versions() + local_storage = LocalStorageAdapter(Path(environment.datastore_dir)) + self.draft_version = local_storage.datastore_dir.get_draft_version() + self.datastore_versions = ( + local_storage.datastore_dir.get_datastore_versions() + ) self.latest_version_number = ( self.datastore_versions.get_latest_version_number() ) - self.metadata_all_draft = local_storage.get_metadata_all_draft() + self.metadata_all_draft = ( + local_storage.datastore_dir.get_metadata_all_draft() + ) if self.latest_version_number is None: self.metadata_all_latest = None else: self.metadata_all_latest = MetadataAll.model_validate( - local_storage.get_metadata_all(self.latest_version_number) + local_storage.datastore_dir.get_metadata_all( + self.latest_version_number + ) ) @@ -65,6 +75,7 @@ def _get_release_status(datastore: Datastore, dataset_name: str) -> str | None: def _generate_new_metadata_all( datastore: Datastore, new_version: str, new_version_metadata: list[Metadata] ) -> None: + local_storage = LocalStorageAdapter(Path(environment.datastore_dir)) new_metadata_all_dict = datastore.metadata_all_draft.model_dump( by_alias=True, exclude_none=True ) @@ -74,11 +85,13 @@ def _generate_new_metadata_all( for dataset in new_version_metadata ] 
new_metadata_all = MetadataAll(**new_metadata_all_dict) - local_storage.write_metadata_all( + local_storage.datastore_dir.write_metadata_all( new_metadata_all, new_version, ) - datastore.metadata_all_latest = local_storage.get_metadata_all(new_version) + datastore.metadata_all_latest = ( + local_storage.datastore_dir.get_metadata_all(new_version) + ) def _version_pending_operations( @@ -87,6 +100,7 @@ def _version_pending_operations( release_updates: list[DataStructureUpdate], new_version: str, ) -> tuple[list[Metadata], dict]: + local_storage = LocalStorageAdapter(Path(environment.datastore_dir)) logger.info(f"{job_id}: Generating new metadata_all") new_metadata_datasets = ( [] @@ -95,7 +109,7 @@ def _version_pending_operations( ) logger.info(f"{job_id}: Generating new data_versions") - latest_data_versions = local_storage.get_data_versions( + latest_data_versions = local_storage.datastore_dir.get_data_versions( datastore.latest_version_number ) new_data_versions = { @@ -140,7 +154,7 @@ def _version_pending_operations( f"{job_id}: Renaming data file and updating data_versions" ) new_data_versions[dataset_name] = ( - local_storage.rename_parquet_draft_to_release( + local_storage.datastore_dir.rename_parquet_draft_to_release( dataset_name, new_version ) ) @@ -154,10 +168,11 @@ def patch_metadata( Patch metadata for a released dataset with updated metadata file. """ + local_storage = LocalStorageAdapter(Path(environment.datastore_dir)) if datastore.metadata_all_latest is None: raise NoSuchDraftException("There are no released versions to patch") logger.info(f"{job_id}: Saving temporary backup") - local_storage.save_temporary_backup() + local_storage.datastore_dir.save_temporary_backup() try: logger.info(f"{job_id}: importing") datastore_api.update_job_status(job_id, JobStatus.IMPORTING) @@ -167,7 +182,7 @@ def patch_metadata( "Can't patch metadata of dataset with status " f"{dataset_release_status}" ) - draft_metadata = local_storage.get_working_dir_metadata(dataset_name) + draft_metadata = local_storage.working_dir.get_metadata(dataset_name) released_metadata = datastore.metadata_all_latest.get(dataset_name) if released_metadata is None: raise NoSuchDraftException( @@ -175,7 +190,9 @@ def patch_metadata( ) patched_metadata = released_metadata.patch(draft_metadata) datastore.metadata_all_draft.update_one(dataset_name, patched_metadata) - local_storage.write_metadata_all_draft(datastore.metadata_all_draft) + local_storage.datastore_dir.write_metadata_all_draft( + datastore.metadata_all_draft + ) datastore.draft_version.add( DataStructureUpdate( name=dataset_name, @@ -184,13 +201,13 @@ def patch_metadata( release_status="DRAFT", ) ) - local_storage.write_draft_version(datastore.draft_version) + local_storage.datastore_dir.write_draft_version(datastore.draft_version) logger.info(f"{job_id}: completed") datastore_api.update_job_status(job_id, JobStatus.COMPLETED) logger.info(f"{job_id}: Deleting temporary backup") - local_storage.delete_temporary_backup() - local_storage.delete_working_dir_metadata(dataset_name) - local_storage.delete_archived_input(dataset_name) + local_storage.datastore_dir.delete_temporary_backup() + local_storage.working_dir.delete_metadata(dataset_name) + local_storage.input_dir.delete_archived_importable(dataset_name) except PatchingError as e: logger.error(f"{job_id}: Patching error occured") logger.exception(f"{job_id}: {str(e)}", exc_info=e) @@ -214,8 +231,9 @@ def add( Import metadata and data as draft for a new dataset that has not been released in a previous 
versions. """ + local_storage = LocalStorageAdapter(Path(environment.datastore_dir)) logger.info(f"{job_id}: Saving temporary backup") - local_storage.save_temporary_backup() + local_storage.datastore_dir.save_temporary_backup() try: logger.info(f"{job_id}: importing") datastore_api.update_job_status(job_id, JobStatus.IMPORTING) @@ -232,18 +250,20 @@ def add( release_status="DRAFT", ) ) - local_storage.write_draft_version(datastore.draft_version) - draft_metadata = local_storage.get_working_dir_metadata(dataset_name) - local_storage.make_dataset_dir(dataset_name) + local_storage.datastore_dir.write_draft_version(datastore.draft_version) + draft_metadata = local_storage.working_dir.get_metadata(dataset_name) + local_storage.datastore_dir.make_dataset_dir(dataset_name) datastore.metadata_all_draft.add(draft_metadata) - local_storage.write_metadata_all_draft(datastore.metadata_all_draft) + local_storage.datastore_dir.write_metadata_all_draft( + datastore.metadata_all_draft + ) local_storage.move_working_dir_parquet_to_datastore(dataset_name) logger.info(f"{job_id}: completed") datastore_api.update_job_status(job_id, JobStatus.COMPLETED) logger.info(f"{job_id}: Deleting temporary backup") - local_storage.delete_temporary_backup() - local_storage.delete_working_dir_metadata(dataset_name) - local_storage.delete_archived_input(dataset_name) + local_storage.datastore_dir.delete_temporary_backup() + local_storage.working_dir.delete_metadata(dataset_name) + local_storage.input_dir.delete_archived_importable(dataset_name) except Exception as e: logger.error(f"{job_id}: An unexpected error occured") logger.exception(f"{job_id}: {str(e)}", exc_info=e) @@ -259,9 +279,10 @@ def change( for a dataset that has already been released in a previous version. """ + local_storage = LocalStorageAdapter(Path(environment.datastore_dir)) try: logger.info(f"{job_id}: Saving temporary backup") - local_storage.save_temporary_backup() + local_storage.datastore_dir.save_temporary_backup() logger.info(f"{job_id}: importing") datastore_api.update_job_status(job_id, JobStatus.IMPORTING) @@ -271,9 +292,11 @@ def change( "Can't change data for dataset with release status" f"{dataset_release_status}" ) - draft_metadata = local_storage.get_working_dir_metadata(dataset_name) + draft_metadata = local_storage.working_dir.get_metadata(dataset_name) datastore.metadata_all_draft.update_one(dataset_name, draft_metadata) - local_storage.write_metadata_all_draft(datastore.metadata_all_draft) + local_storage.datastore_dir.write_metadata_all_draft( + datastore.metadata_all_draft + ) datastore.draft_version.add( DataStructureUpdate( name=dataset_name, @@ -282,14 +305,14 @@ def change( release_status="DRAFT", ) ) - local_storage.write_draft_version(datastore.draft_version) + local_storage.datastore_dir.write_draft_version(datastore.draft_version) local_storage.move_working_dir_parquet_to_datastore(dataset_name) logger.info(f"{job_id}: completed") datastore_api.update_job_status(job_id, JobStatus.COMPLETED) logger.info(f"{job_id}: Deleting temporary backup") - local_storage.delete_temporary_backup() - local_storage.delete_working_dir_metadata(dataset_name) - local_storage.delete_archived_input(dataset_name) + local_storage.datastore_dir.delete_temporary_backup() + local_storage.working_dir.delete_metadata(dataset_name) + local_storage.input_dir.delete_archived_importable(dataset_name) except Exception as e: logger.error(f"{job_id}: An unexpected error occured") logger.exception(f"{job_id}: {str(e)}", exc_info=e) @@ -304,6 +327,7 @@ def 
remove( Remove a released dataset that has been released in a previous version from future versions of the datastore. """ + local_storage = LocalStorageAdapter(Path(environment.datastore_dir)) logger.info(f"{job_id}: initiated") datastore_api.update_job_status(job_id, JobStatus.INITIATED) dataset_release_status = _get_release_status(datastore, dataset_name) @@ -313,7 +337,9 @@ def remove( ) if dataset_is_draft and dataset_operation == "REMOVE": datastore.metadata_all_draft.remove(dataset_name) - local_storage.write_metadata_all_draft(datastore.metadata_all_draft) + local_storage.datastore_dir.write_metadata_all_draft( + datastore.metadata_all_draft + ) log_message = "Dataset already in draft with operation REMOVE." logger.info(f"{job_id}: {log_message}") datastore_api.update_job_status( @@ -328,7 +354,9 @@ def remove( datastore_api.update_job_status(job_id, JobStatus.FAILED, log_message) else: datastore.metadata_all_draft.remove(dataset_name) - local_storage.write_metadata_all_draft(datastore.metadata_all_draft) + local_storage.datastore_dir.write_metadata_all_draft( + datastore.metadata_all_draft + ) datastore.draft_version.add( DataStructureUpdate( name=dataset_name, @@ -337,7 +365,7 @@ def remove( release_status="PENDING_DELETE", ) ) - local_storage.write_draft_version(datastore.draft_version) + local_storage.datastore_dir.write_draft_version(datastore.draft_version) datastore_api.update_job_status(job_id, JobStatus.COMPLETED) logger.info(f"{job_id}: completed") @@ -349,6 +377,7 @@ def delete_draft( Delete a dataset from the draft version of the datastore. """ logger.info(f"{job_id}: initiated") + local_storage = LocalStorageAdapter(Path(environment.datastore_dir)) datastore_api.update_job_status(job_id, JobStatus.INITIATED) dataset_is_draft = datastore.draft_version.contains(dataset_name) dataset_operation = datastore.draft_version.get_dataset_operation( @@ -381,14 +410,18 @@ def delete_draft( raise VersioningException(log_message) datastore.metadata_all_draft.remove(dataset_name) datastore.metadata_all_draft.add(released_metadata) - local_storage.write_metadata_all_draft(datastore.metadata_all_draft) + local_storage.datastore_dir.write_metadata_all_draft( + datastore.metadata_all_draft + ) if dataset_operation == "ADD": datastore.metadata_all_draft.remove(dataset_name) - local_storage.write_metadata_all_draft(datastore.metadata_all_draft) + local_storage.datastore_dir.write_metadata_all_draft( + datastore.metadata_all_draft + ) if dataset_operation in ["ADD", "CHANGE"]: - local_storage.delete_parquet_draft(dataset_name) + local_storage.datastore_dir.delete_parquet_draft(dataset_name) datastore.draft_version.delete_draft(dataset_name) - local_storage.write_draft_version(datastore.draft_version) + local_storage.datastore_dir.write_draft_version(datastore.draft_version) datastore_api.update_job_status(job_id, JobStatus.COMPLETED) @@ -398,13 +431,14 @@ def set_draft_release_status( """ Set a new release status for a dataset in the draft version. 
""" + local_storage = LocalStorageAdapter(Path(environment.datastore_dir)) try: logger.info(f"{job_id}: initiated") datastore_api.update_job_status(job_id, JobStatus.INITIATED) datastore.draft_version.set_draft_release_status( dataset_name, new_status ) - local_storage.write_draft_version(datastore.draft_version) + local_storage.datastore_dir.write_draft_version(datastore.draft_version) datastore_api.update_job_status(job_id, JobStatus.COMPLETED) logger.info(f"{job_id}: completed") except UnnecessaryUpdateException as e: @@ -426,8 +460,9 @@ def bump_version( Release a new version of the datastore with the pending operations in the draft version of the datastore. """ + local_storage = LocalStorageAdapter(Path(environment.datastore_dir)) logger.info(f"{job_id}: Saving temporary backup") - local_storage.save_temporary_backup() + local_storage.datastore_dir.save_temporary_backup() try: logger.info(f"{job_id}: initiated") @@ -443,24 +478,26 @@ def bump_version( job_id, JobStatus.FAILED, log_message ) logger.info(f"{job_id}: Archiving temporary backup") - local_storage.archive_temporary_backup() + local_storage.datastore_dir.archive_temporary_backup() return logger.info(f"{job_id}: Archiving draft version") - local_storage.archive_draft_version( + local_storage.datastore_dir.archive_draft_version( datastore.latest_version_number or "0.0.0.0" ) logger.info(f"{job_id}: Release pending operations from draft_version") release_updates, update_type = datastore.draft_version.release_pending() - local_storage.write_draft_version(datastore.draft_version) + local_storage.datastore_dir.write_draft_version(datastore.draft_version) # If there are no released versions update type is MAJOR if datastore.metadata_all_latest is None: update_type = "MAJOR" new_version = datastore.datastore_versions.add_new_release_version( release_updates, description, update_type ) - local_storage.write_datastore_versions(datastore.datastore_versions) + local_storage.datastore_dir.write_datastore_versions( + datastore.datastore_versions + ) logger.info( f"{job_id}: " f"Bumping from {datastore.latest_version_number} => {new_version}" @@ -474,7 +511,9 @@ def bump_version( ) if update_type in ["MINOR", "MAJOR"]: logger.info(f"{job_id}: Writing new data_versions to file") - local_storage.write_data_versions(new_data_versions, new_version) + local_storage.datastore_dir.write_data_versions( + new_data_versions, new_version + ) logger.info(f"{job_id}: Writing new metadata_all to file") _generate_new_metadata_all( @@ -488,11 +527,13 @@ def bump_version( datastore.metadata_all_latest.data_structures, datastore.draft_version, ) - local_storage.write_metadata_all_draft(datastore.metadata_all_draft) + local_storage.datastore_dir.write_metadata_all_draft( + datastore.metadata_all_draft + ) logger.info(f"{job_id}: completed BUMP") datastore_api.update_job_status(job_id, JobStatus.COMPLETED) logger.info(f"{job_id}: Archiving temporary backup") - local_storage.archive_temporary_backup() + local_storage.datastore_dir.archive_temporary_backup() except Exception as e: logger.error(f"{job_id}: An unexpected error occured") logger.exception(f"{job_id}: {str(e)}", exc_info=e) @@ -507,10 +548,11 @@ def delete_archived_input(job_id: str, dataset_name: str) -> None: """ Delete the archived dataset from archive directory. 
""" + local_storage = LocalStorageAdapter(Path(environment.datastore_dir)) try: logger.info(f"{job_id}: initiated") datastore_api.update_job_status(job_id, JobStatus.INITIATED) - local_storage.delete_archived_input(dataset_name) + local_storage.input_dir.delete_archived_importable(dataset_name) datastore_api.update_job_status(job_id, JobStatus.COMPLETED) except Exception as e: logger.error(f"{job_id}: An unexpected error occured") diff --git a/job_executor/domain/rollback.py b/job_executor/domain/rollback.py index 4ff06d00..861b70b8 100644 --- a/job_executor/domain/rollback.py +++ b/job_executor/domain/rollback.py @@ -3,10 +3,10 @@ import shutil from pathlib import Path -from job_executor.adapter import datastore_api, local_storage +from job_executor.adapter import datastore_api from job_executor.adapter.datastore_api.models import Job, JobStatus -from job_executor.adapter.local_storage import WORKING_DIR -from job_executor.adapter.local_storage.models.datastore_versions import ( +from job_executor.adapter.fs import LocalStorageAdapter +from job_executor.adapter.fs.models.datastore_versions import ( bump_dotted_version_number, dotted_to_underscored_version, underscored_to_dotted_version, @@ -16,15 +16,19 @@ RollbackException, StartupException, ) +from job_executor.config import environment -WORKING_DIR_PATH = Path(WORKING_DIR) +WORKING_DIR_PATH = Path(environment.datastore_dir + "_working") logger = logging.getLogger() def rollback_bump(job_id: str, bump_manifesto: dict) -> None: + local_storage = LocalStorageAdapter(Path(environment.datastore_dir)) try: logger.info(f"{job_id}: Restoring files from temporary backup") - restored_version_number = local_storage.restore_from_temporary_backup() + restored_version_number = ( + local_storage.datastore_dir.restore_from_temporary_backup() + ) update_type = bump_manifesto["updateType"] bumped_version_number = ( "1.0.0.0" @@ -53,8 +57,7 @@ def rollback_bump(job_id: str, bump_manifesto: dict) -> None: ) logger.info(f"{job_id}: Removing generated datastore files") - datastore_dir = Path(local_storage.DATASTORE_DIR) - datastore_info_dir = datastore_dir / "datastore" + datastore_info_dir = local_storage.datastore_dir.metadata_dir # No new data version has been built if update type was PATCH if update_type in ["MAJOR", "MINOR"]: @@ -84,7 +87,9 @@ def rollback_bump(job_id: str, bump_manifesto: dict) -> None: f"{job_id}: Update type is {update_type}. 
" f"Reverting {dataset} data file to DRAFT" ) - dataset_data_dir: Path = datastore_dir / "data" / dataset + dataset_data_dir: Path = ( + local_storage.datastore_dir.data_dir / dataset + ) partitioned_data_path: Path = ( dataset_data_dir / f"{dataset}__{bumped_version_data}" ) @@ -111,7 +116,7 @@ def rollback_bump(job_id: str, bump_manifesto: dict) -> None: dataset_data_dir / f"{dataset}__DRAFT.parquet", ) logger.info(f"{job_id}: Deleting temporary backup") - local_storage.archive_temporary_backup() + local_storage.datastore_dir.archive_temporary_backup() except LocalStorageError as e: logger.error(f"{job_id}: LocalStorageError when rolling back job") logger.exception(e) @@ -124,6 +129,7 @@ def rollback_bump(job_id: str, bump_manifesto: dict) -> None: def rollback_worker_phase_import_job( job_id: str, operation: str, dataset_name: str ) -> None: + local_storage = LocalStorageAdapter(Path(environment.datastore_dir)) logger.warning( f"{job_id}: Rolling back worker job " f'with target: "{dataset_name}" and operation "{operation}"' @@ -141,7 +147,7 @@ def rollback_worker_phase_import_job( generated_data_directory = f"{dataset_name}__DRAFT" for file in generated_metadata_files: - filepath = WORKING_DIR_PATH / file + filepath = local_storage.working_dir.path / file if filepath.exists(): logger.info(f'{job_id}: Deleting metadata file "{filepath}"') os.remove(filepath) @@ -174,18 +180,19 @@ def rollback_manager_phase_import_job( Exceptions are not handled here on purpose. It is a catastrophic thing if a rollback fails. """ + local_storage = LocalStorageAdapter(Path(environment.datastore_dir)) logger.warning( f"{job_id}: Rolling back import job " f'with target: "{dataset_name}" and operation "{operation}"' ) logger.info(f"{job_id}: Restoring files from temporary backup") - local_storage.restore_from_temporary_backup() + local_storage.datastore_dir.restore_from_temporary_backup() if operation in ["ADD", "CHANGE"]: logger.info(f"{job_id}: Deleting data file/directory") - local_storage.delete_parquet_draft(dataset_name) + local_storage.datastore_dir.delete_parquet_draft(dataset_name) logger.info(f"{job_id}: Deleting temporary backup") - local_storage.archive_temporary_backup() + local_storage.datastore_dir.archive_temporary_backup() def fix_interrupted_jobs() -> None: diff --git a/job_executor/domain/worker/build_dataset_worker.py b/job_executor/domain/worker/build_dataset_worker.py index c38f1ee6..6eea54bf 100644 --- a/job_executor/domain/worker/build_dataset_worker.py +++ b/job_executor/domain/worker/build_dataset_worker.py @@ -4,8 +4,9 @@ from pathlib import Path from time import perf_counter -from job_executor.adapter import datastore_api, local_storage +from job_executor.adapter import datastore_api from job_executor.adapter.datastore_api.models import JobStatus +from job_executor.adapter.fs import LocalStorageAdapter from job_executor.common.exceptions import BuilderStepError, HttpResponseError from job_executor.config import environment from job_executor.config.log import configure_worker_logger @@ -17,21 +18,18 @@ dataset_validator, ) -WORKING_DIR = Path(environment.working_dir) +DATASTORE_DIR = Path(environment.datastore_dir) +WORKING_DIR = Path(environment.datastore_dir + "_working") def _clean_working_dir(dataset_name: str) -> None: - generated_files = [ - WORKING_DIR / f"{dataset_name}.json", - WORKING_DIR / f"{dataset_name}.parquet", - WORKING_DIR / f"{dataset_name}_pseudonymized.parquet", - WORKING_DIR / dataset_name, - ] - for file_path in generated_files: - if file_path.is_dir(): - 
local_storage.delete_working_dir_dir(file_path) - else: - local_storage.delete_working_dir_file(file_path) + local_storage = LocalStorageAdapter(DATASTORE_DIR) # TODO + local_storage.working_dir.delete_metadata(dataset_name) + local_storage.working_dir.delete_file(f"{dataset_name}.parquet") + local_storage.working_dir.delete_file( + f"{dataset_name}_pseudonymized.parquet" + ) + local_storage.working_dir.delete_sub_directory(dataset_name) def _dataset_requires_pseudonymization(input_metadata: dict) -> bool: @@ -57,51 +55,60 @@ def run_worker(job_id: str, dataset_name: str, logging_queue: Queue) -> None: f"Starting dataset worker for dataset " f"{dataset_name} and job {job_id}" ) - - local_storage.archive_input_files(dataset_name) - + local_storage = LocalStorageAdapter(DATASTORE_DIR) # TODO + local_storage.input_dir.archive_importable(dataset_name) datastore_api.update_job_status(job_id, JobStatus.DECRYPTING) - dataset_decryptor.unpackage(dataset_name) - + dataset_decryptor.unpackage( + dataset_name, + local_storage.input_dir.path, + local_storage.working_dir.path, + local_storage.datastore_dir.vault_dir, + ) datastore_api.update_job_status(job_id, JobStatus.VALIDATING) - ( - data_path, - metadata_file_path, - ) = dataset_validator.run_for_dataset(dataset_name) - input_metadata = local_storage.get_working_dir_input_metadata( + (data_file_name, _) = dataset_validator.run_for_dataset( + dataset_name, local_storage.working_dir.path + ) + input_metadata = local_storage.working_dir.get_input_metadata( dataset_name ) description = input_metadata["dataRevision"]["description"][0]["value"] datastore_api.update_description(job_id, description) - local_storage.delete_working_dir_dir(WORKING_DIR / f"{dataset_name}") + local_storage.working_dir.delete_sub_directory(dataset_name) datastore_api.update_job_status(job_id, JobStatus.TRANSFORMING) transformed_metadata = dataset_transformer.run(input_metadata) - local_storage.write_working_dir_metadata( + local_storage.working_dir.write_metadata( dataset_name, transformed_metadata ) - local_storage.delete_working_dir_file(metadata_file_path) + local_storage.working_dir.delete_input_metadata(dataset_name) temporality_type = transformed_metadata.temporality if _dataset_requires_pseudonymization(input_metadata): datastore_api.update_job_status(job_id, JobStatus.PSEUDONYMIZING) - pre_pseudonymized_data_path = data_path - data_path = dataset_pseudonymizer.run( - data_path, transformed_metadata, job_id + pre_pseudo_data_file_name = data_file_name + data_file_name = dataset_pseudonymizer.run( + local_storage.working_dir.path / data_file_name, + transformed_metadata, + job_id, ) - local_storage.delete_working_dir_file(pre_pseudonymized_data_path) + local_storage.working_dir.delete_file(pre_pseudo_data_file_name) datastore_api.update_job_status(job_id, JobStatus.PARTITIONING) if temporality_type in ["STATUS", "ACCUMULATED"]: - dataset_partitioner.run(data_path, dataset_name) - local_storage.delete_working_dir_file(data_path) + dataset_partitioner.run( + local_storage.working_dir.path / data_file_name, dataset_name + ) + local_storage.working_dir.delete_file(data_file_name) else: - target_path = os.path.join( - os.path.dirname(data_path), - f"{dataset_name}__DRAFT.parquet", + target_path = ( + local_storage.working_dir.path + / f"{dataset_name}__DRAFT.parquet" + ) + os.rename( + local_storage.working_dir.path / data_file_name, + target_path, ) - os.rename(data_path, target_path) - local_storage.delete_archived_input(dataset_name) + 
local_storage.input_dir.delete_archived_importable(dataset_name) datastore_api.update_job_status(job_id, JobStatus.BUILT) logger.info("Dataset built successfully") except BuilderStepError as e: diff --git a/job_executor/domain/worker/build_metadata_worker.py b/job_executor/domain/worker/build_metadata_worker.py index ffd27f98..7616d9d7 100644 --- a/job_executor/domain/worker/build_metadata_worker.py +++ b/job_executor/domain/worker/build_metadata_worker.py @@ -3,8 +3,9 @@ from pathlib import Path from time import perf_counter -from job_executor.adapter import datastore_api, local_storage +from job_executor.adapter import datastore_api from job_executor.adapter.datastore_api.models import JobStatus +from job_executor.adapter.fs import LocalStorageAdapter from job_executor.common.exceptions import BuilderStepError, HttpResponseError from job_executor.config import environment from job_executor.config.log import configure_worker_logger @@ -14,19 +15,14 @@ dataset_validator, ) -WORKING_DIR = Path(environment.working_dir) +DATASTORE_DIR = Path(environment.datastore_dir) def _clean_working_dir(dataset_name: str) -> None: - generated_files = [ - WORKING_DIR / f"{dataset_name}.json", - WORKING_DIR / dataset_name, - ] - for file_path in generated_files: - if file_path.is_dir(): - local_storage.delete_working_dir_dir(file_path) - else: - local_storage.delete_working_dir_file(file_path) + local_storage = LocalStorageAdapter(DATASTORE_DIR) # TODO + local_storage.working_dir.delete_metadata(dataset_name) + local_storage.working_dir.delete_sub_directory(dataset_name) + local_storage.working_dir.delete_file(dataset_name) def run_worker(job_id: str, dataset_name: str, logging_queue: Queue) -> None: @@ -39,28 +35,38 @@ def run_worker(job_id: str, dataset_name: str, logging_queue: Queue) -> None: f"Starting metadata worker for dataset " f"{dataset_name} and job {job_id}" ) - - local_storage.archive_input_files(dataset_name) - + local_storage = LocalStorageAdapter( + DATASTORE_DIR + ) # TODO: get this info from manager + local_storage.input_dir.archive_importable(dataset_name) datastore_api.update_job_status(job_id, JobStatus.DECRYPTING) - dataset_decryptor.unpackage(dataset_name) - + dataset_decryptor.unpackage( + dataset_name, + local_storage.input_dir.path, + local_storage.working_dir.path, + local_storage.datastore_dir.vault_dir, + ) datastore_api.update_job_status(job_id, JobStatus.VALIDATING) - metadata_file_path = dataset_validator.run_for_metadata(dataset_name) - input_metadata = local_storage.get_working_dir_input_metadata( + dataset_validator.run_for_metadata( + dataset_name, + local_storage.working_dir.path, + ) + input_metadata = local_storage.working_dir.get_input_metadata( dataset_name ) + description = input_metadata["dataRevision"]["description"][0]["value"] datastore_api.update_description(job_id, description) - local_storage.delete_working_dir_dir(WORKING_DIR / f"{dataset_name}") + local_storage.working_dir.delete_sub_directory(dataset_name) datastore_api.update_job_status(job_id, JobStatus.TRANSFORMING) transformed_metadata_json = dataset_transformer.run(input_metadata) - local_storage.write_working_dir_metadata( + local_storage.working_dir.write_metadata( dataset_name, transformed_metadata_json ) - local_storage.delete_working_dir_file(metadata_file_path) - local_storage.delete_archived_input(dataset_name) + + local_storage.working_dir.delete_input_metadata(dataset_name) + local_storage.input_dir.delete_archived_importable(dataset_name) datastore_api.update_job_status(job_id, 
JobStatus.BUILT) except BuilderStepError as e: error_message = "Failed during building metdata" diff --git a/job_executor/domain/worker/steps/dataset_decryptor.py b/job_executor/domain/worker/steps/dataset_decryptor.py index 2ecad368..2cf20620 100644 --- a/job_executor/domain/worker/steps/dataset_decryptor.py +++ b/job_executor/domain/worker/steps/dataset_decryptor.py @@ -2,16 +2,16 @@ from microdata_tools import unpackage_dataset -from job_executor.adapter.local_storage import INPUT_DIR, WORKING_DIR -from job_executor.config import environment -RSA_KEYS_DIRECTORY = Path(environment.rsa_keys_directory) - - -def unpackage(dataset_name: str) -> None: - file_path = Path(f"{INPUT_DIR}/archive/{dataset_name}.tar") +def unpackage( + dataset_name: str, + input_directory_path: Path, + working_directory_path: Path, + rsa_keys_directory: Path, +) -> None: + file_path = Path(input_directory_path / "archive" / f"{dataset_name}.tar") unpackage_dataset( packaged_file_path=file_path, - rsa_keys_dir=RSA_KEYS_DIRECTORY, - output_dir=WORKING_DIR, + rsa_keys_dir=rsa_keys_directory, + output_dir=working_directory_path, ) diff --git a/job_executor/domain/worker/steps/dataset_pseudonymizer.py b/job_executor/domain/worker/steps/dataset_pseudonymizer.py index 85917524..3cc39c7e 100644 --- a/job_executor/domain/worker/steps/dataset_pseudonymizer.py +++ b/job_executor/domain/worker/steps/dataset_pseudonymizer.py @@ -8,7 +8,7 @@ from pyarrow import compute, dataset, parquet from job_executor.adapter import pseudonym_service -from job_executor.adapter.local_storage.models.metadata import Metadata +from job_executor.adapter.fs.models.metadata import Metadata from job_executor.common.exceptions import BuilderStepError logger = logging.getLogger() @@ -134,7 +134,7 @@ def _pseudonymize( return pseudonymized_table -def run(input_parquet_path: Path, metadata: Metadata, job_id: str) -> Path: +def run(input_parquet_path: Path, metadata: Metadata, job_id: str) -> str: """ Pseudonymizes the identifier & measure column of the dataset if. 
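# --- Editor's sketch (illustrative only, not part of the patch) ---------------
# dataset_pseudonymizer.run() now returns the pseudonymized parquet's *file
# name* (a str) rather than an absolute Path, so callers resolve it against
# the working directory, as build_dataset_worker.py does above. Assumptions:
# "EXAMPLE_DATASET" is a placeholder dataset name, `metadata` is a Metadata
# instance produced by the transformer step, and `job_id` is any job id.
from pathlib import Path

from job_executor.adapter.fs import LocalStorageAdapter
from job_executor.adapter.fs.models.metadata import Metadata
from job_executor.config import environment
from job_executor.domain.worker.steps import dataset_pseudonymizer


def pseudonymize_in_working_dir(metadata: Metadata, job_id: str) -> Path:
    local_storage = LocalStorageAdapter(Path(environment.datastore_dir))
    input_path = local_storage.working_dir.path / "EXAMPLE_DATASET.parquet"
    # run() returns e.g. "EXAMPLE_DATASET_pseudonymized.parquet" (name only).
    output_file_name = dataset_pseudonymizer.run(input_path, metadata, job_id)
    # Join the returned name with working_dir.path before reading/deleting it.
    return local_storage.working_dir.path / output_file_name
# -------------------------------------------------------------------------------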
@@ -172,15 +172,13 @@ def run(input_parquet_path: Path, metadata: Metadata, job_id: str) -> Path: measure_unit_id_type, job_id, ) - output_path = ( - input_parquet_path.parent - / f"{input_parquet_path.stem}_pseudonymized.parquet" - ) + output_file_name = f"{input_parquet_path.stem}_pseudonymized.parquet" + output_path = input_parquet_path.parent / output_file_name parquet.write_table(pseudonymized_table, output_path) logger.info(f"Pseudonymization step done {output_path}") - return output_path + return output_file_name except UnregisteredUnitTypeError as e: raise BuilderStepError( f"Failed to pseudonymize, UnregisteredUnitType: {str(e)}" diff --git a/job_executor/domain/worker/steps/dataset_transformer.py b/job_executor/domain/worker/steps/dataset_transformer.py index fe0a676e..4ab90be1 100644 --- a/job_executor/domain/worker/steps/dataset_transformer.py +++ b/job_executor/domain/worker/steps/dataset_transformer.py @@ -1,7 +1,7 @@ import logging from datetime import datetime, timezone -from job_executor.adapter.local_storage.models.metadata import ( +from job_executor.adapter.fs.models.metadata import ( DATA_TYPES_MAPPING, Metadata, ) diff --git a/job_executor/domain/worker/steps/dataset_validator.py b/job_executor/domain/worker/steps/dataset_validator.py index 630f2f7a..c983c8d3 100644 --- a/job_executor/domain/worker/steps/dataset_validator.py +++ b/job_executor/domain/worker/steps/dataset_validator.py @@ -4,25 +4,25 @@ from microdata_tools import validate_dataset, validate_metadata from job_executor.common.exceptions import BuilderStepError -from job_executor.config import environment logger = logging.getLogger() -WORKING_DIR = Path(environment.working_dir) -def run_for_dataset(dataset_name: str) -> tuple[Path, Path]: +def run_for_dataset( + dataset_name: str, working_directory: Path +) -> tuple[str, str]: """ Validates the data and metadata file in the working_directory using the microdata-tools. - Returns path to validated data and metadata in working directory. + Returns file name of validated data and metadata in working directory. """ validation_errors = [] try: validation_errors = validate_dataset( dataset_name, - input_directory=str(WORKING_DIR), - working_directory=str(WORKING_DIR), + input_directory=str(working_directory), + working_directory=str(working_directory), keep_temporary_files=True, ) @@ -41,24 +41,24 @@ def run_for_dataset(dataset_name: str) -> tuple[Path, Path]: ) return ( - WORKING_DIR / f"{dataset_name}.parquet", - WORKING_DIR / f"{dataset_name}.json", + f"{dataset_name}.parquet", + f"{dataset_name}.json", ) -def run_for_metadata(dataset_name: str) -> Path: +def run_for_metadata(dataset_name: str, working_directory: Path) -> str: """ Validates the metadata in the given file with the microdata-tools schema and moves file to working directory. - Returns path to validated metadata in working directory. + Returns file name of the validated metadata in working directory. """ validation_errors = [] try: validation_errors = validate_metadata( dataset_name, - input_directory=str(WORKING_DIR), - working_directory=str(WORKING_DIR), + input_directory=str(working_directory), + working_directory=str(working_directory), keep_temporary_files=True, ) @@ -75,4 +75,4 @@ def run_for_metadata(dataset_name: str) -> Path: "uploading. Remember to update to the latest version of " "microdata-tools. 
" ) - return WORKING_DIR / f"{dataset_name}.json" + return f"{dataset_name}.json" diff --git a/tests/resources/datastores/TEST_DATASTORE/vault/.gitkeep b/tests/resources/datastores/TEST_DATASTORE/vault/.gitkeep new file mode 100644 index 00000000..e69de29b diff --git a/tests/unit/adapter/test_local_storage.py b/tests/unit/adapter/test_local_storage.py index 2d9d95c3..ca0f4857 100644 --- a/tests/unit/adapter/test_local_storage.py +++ b/tests/unit/adapter/test_local_storage.py @@ -5,20 +5,22 @@ import pytest -from job_executor.adapter import local_storage -from job_executor.adapter.local_storage.models.datastore_versions import ( +from job_executor.adapter.fs import LocalStorageAdapter +from job_executor.adapter.fs.models.datastore_versions import ( DatastoreVersions, DraftVersion, ) -from job_executor.adapter.local_storage.models.metadata import ( +from job_executor.adapter.fs.models.metadata import ( MetadataAll, ) from job_executor.common.exceptions import LocalStorageError -WORKING_DIR = os.environ["WORKING_DIR"] DATASTORE_DIR = os.environ["DATASTORE_DIR"] +WORKING_DIR = DATASTORE_DIR + "_working" DATASTORE_DATA_DIR = f"{DATASTORE_DIR}/data" +local_storage = LocalStorageAdapter(Path(DATASTORE_DIR)) + DATASTORE_VERSIONS_PATH = f"{DATASTORE_DIR}/datastore/datastore_versions.json" DRAFT_METADATA_ALL_PATH = f"{DATASTORE_DIR}/datastore/metadata_all__draft.json" DRAFT_VERSION_PATH = f"{DATASTORE_DIR}/datastore/draft_version.json" @@ -56,61 +58,75 @@ def read_json(file_path: str) -> dict: def test_make_dataset_dir(): - local_storage.make_dataset_dir(WORKING_DIR_DATASET) + local_storage.datastore_dir.make_dataset_dir(WORKING_DIR_DATASET) assert os.path.isdir(f"{DATASTORE_DATA_DIR}/{WORKING_DIR_DATASET}") def test_get_data_versions(): - assert local_storage.get_data_versions("1_0_0") == read_json( + assert local_storage.datastore_dir.get_data_versions("1_0_0") == read_json( DATA_VERSIONS_PATH ) def test_write_data_versions(): - local_storage.write_data_versions({}, "1_0_0") + local_storage.datastore_dir.write_data_versions({}, "1_0_0") assert read_json(DATA_VERSIONS_PATH) == {} def test_get_draft_version(): - assert isinstance(local_storage.get_draft_version(), DraftVersion) + assert isinstance( + local_storage.datastore_dir.get_draft_version(), DraftVersion + ) def test_write_draft_version(): - draft_version = local_storage.get_draft_version() + draft_version = local_storage.datastore_dir.get_draft_version() draft_version.description = "updated" - local_storage.write_draft_version(draft_version) - assert local_storage.get_draft_version().description == "updated" + local_storage.datastore_dir.write_draft_version(draft_version) + assert ( + local_storage.datastore_dir.get_draft_version().description == "updated" + ) def test_get_datastore_versions(): - assert isinstance(local_storage.get_datastore_versions(), DatastoreVersions) + assert isinstance( + local_storage.datastore_dir.get_datastore_versions(), DatastoreVersions + ) def test_write_datastore_versions(): - datastore_versions = local_storage.get_datastore_versions() + datastore_versions = local_storage.datastore_dir.get_datastore_versions() datastore_versions.description = "updated" - local_storage.write_datastore_versions(datastore_versions) - assert local_storage.get_datastore_versions().description == "updated" + local_storage.datastore_dir.write_datastore_versions(datastore_versions) + assert ( + local_storage.datastore_dir.get_datastore_versions().description + == "updated" + ) def test_get_metadata_all(): - assert 
isinstance(local_storage.get_metadata_all("1_0_0"), MetadataAll) + assert isinstance( + local_storage.datastore_dir.get_metadata_all("1_0_0"), MetadataAll + ) def test_write_metadata_all(): - metadata_all = local_storage.get_metadata_all("1_0_0") + metadata_all = local_storage.datastore_dir.get_metadata_all("1_0_0") metadata_all.data_structures = [] - local_storage.write_metadata_all(metadata_all, "1_0_0") - assert local_storage.get_metadata_all("1_0_0").data_structures == [] + local_storage.datastore_dir.write_metadata_all(metadata_all, "1_0_0") + assert ( + local_storage.datastore_dir.get_metadata_all("1_0_0").data_structures + == [] + ) def delete_parquet_draft(): - local_storage.delete_parquet_draft(DRAFT_DATASET_NAME) + local_storage.datastore_dir.delete_parquet_draft(DRAFT_DATASET_NAME) assert not os.path.isfile(DRAFT_DATA_PATH) def test_rename_parquet_draft_to_release(): - release_path = local_storage.rename_parquet_draft_to_release( + release_path = local_storage.datastore_dir.rename_parquet_draft_to_release( DRAFT2_DATASET_NAME, "1_1_0" ) assert os.path.isdir(RELEASED_DRAFT2_DATA_PATH) @@ -118,17 +134,17 @@ def test_rename_parquet_draft_to_release(): def test_move_working_dir_parquet_to_datastore(): - local_storage.make_dataset_dir(WORKING_DIR_DATASET) + local_storage.datastore_dir.make_dataset_dir(WORKING_DIR_DATASET) local_storage.move_working_dir_parquet_to_datastore(WORKING_DIR_DATASET) assert os.path.isfile(MOVED_WORKING_DIR_DATASET_DATA_PATH) def test_make_temp_directory(): datastore_content = os.listdir(DATASTORE_DIR) - local_storage.save_temporary_backup() + local_storage.datastore_dir.save_temporary_backup() datastore_content_backup = os.listdir(DATASTORE_DIR) - assert len(datastore_content) == 2 - assert len(datastore_content_backup) == 3 + assert len(datastore_content) == 3 + assert len(datastore_content_backup) == 4 tmp_dir = Path(DATASTORE_DIR) / "tmp" assert os.path.isdir(tmp_dir) tmp_actual_content = os.listdir(tmp_dir) @@ -143,43 +159,43 @@ def test_make_temp_directory(): def test_make_temp_directory_already_exists(): - local_storage.save_temporary_backup() + local_storage.datastore_dir.save_temporary_backup() datastore_content = os.listdir(DATASTORE_DIR) assert "tmp" in datastore_content with pytest.raises(LocalStorageError) as e: - local_storage.save_temporary_backup() + local_storage.datastore_dir.save_temporary_backup() assert "tmp directory already exists" in str(e) def test_archive_temp_directory(): - local_storage.save_temporary_backup() + local_storage.datastore_dir.save_temporary_backup() datastore_content = os.listdir(DATASTORE_DIR) - local_storage.archive_temporary_backup() + local_storage.datastore_dir.archive_temporary_backup() datastore_content_archived = os.listdir(DATASTORE_DIR) - for dir in ["datastore", "data", "tmp"]: + for dir in ["datastore", "data", "tmp", "vault"]: assert dir in datastore_content - for dir in ["datastore", "data", "archive"]: + for dir in ["datastore", "data", "archive", "vault"]: assert dir in datastore_content_archived - assert len(datastore_content) == 3 - assert len(datastore_content_archived) == 3 + assert len(datastore_content) == 4 + assert len(datastore_content_archived) == 4 assert not os.path.isdir(Path(DATASTORE_DIR) / "tmp") def test_archived_temp_directory_unrecognized_files(): - local_storage.save_temporary_backup() + local_storage.datastore_dir.save_temporary_backup() tmp_dir = Path(DATASTORE_DIR) / "tmp" assert os.path.isdir(tmp_dir) (tmp_dir / "newfile.txt").touch() with pytest.raises(LocalStorageError) as e: 
- local_storage.archive_temporary_backup() + local_storage.datastore_dir.archive_temporary_backup() assert "Found unrecognized files" in str(e) def test_archive_or_delete_non_existent_tmp_dir(): with pytest.raises(LocalStorageError) as e: - local_storage.archive_temporary_backup() + local_storage.datastore_dir.archive_temporary_backup() assert "Could not find a tmp directory to archive." in str(e) with pytest.raises(LocalStorageError) as e: - local_storage.delete_temporary_backup() + local_storage.datastore_dir.delete_temporary_backup() assert "Could not find a tmp directory to delete." in str(e) diff --git a/tests/unit/config/test_environment.py b/tests/unit/config/test_environment.py index da60b8f4..26040d19 100644 --- a/tests/unit/config/test_environment.py +++ b/tests/unit/config/test_environment.py @@ -4,8 +4,6 @@ def test_config_from_environment(): - assert environment.input_dir == os.environ.get("INPUT_DIR") - assert environment.working_dir == os.environ.get("WORKING_DIR") assert environment.datastore_dir == os.environ.get("DATASTORE_DIR") assert environment.pseudonym_service_url == ( os.environ.get("PSEUDONYM_SERVICE_URL") diff --git a/tests/unit/domain/test_datastores.py b/tests/unit/domain/test_datastores.py index 84699662..e3ed31a4 100644 --- a/tests/unit/domain/test_datastores.py +++ b/tests/unit/domain/test_datastores.py @@ -6,7 +6,7 @@ from requests_mock import Mocker as RequestsMocker -from job_executor.adapter.local_storage.models.datastore_versions import ( +from job_executor.adapter.fs.models.datastore_versions import ( DatastoreVersion, ) from job_executor.domain import datastores @@ -17,7 +17,7 @@ DATASTORE_API_URL = os.getenv("DATASTORE_API_URL") JOB_ID = "123-123-123-123" DATASTORE_DIR = os.environ["DATASTORE_DIR"] -WORKING_DIR = os.environ["WORKING_DIR"] +WORKING_DIR = DATASTORE_DIR + "_working" DATASTORE_DATA_DIR = f"{DATASTORE_DIR}/data" DATASTORE_METADATA_DIR = f"{DATASTORE_DIR}/metadata" DATASTORE_INFO_DIR = f"{DATASTORE_DIR}/datastore" diff --git a/tests/unit/domain/test_empty_datastore.py b/tests/unit/domain/test_empty_datastore.py index 48326c81..4d214601 100644 --- a/tests/unit/domain/test_empty_datastore.py +++ b/tests/unit/domain/test_empty_datastore.py @@ -5,7 +5,7 @@ from requests_mock import Mocker as RequestsMocker -from job_executor.adapter.local_storage.models.datastore_versions import ( +from job_executor.adapter.fs.models.datastore_versions import ( DatastoreVersion, ) from job_executor.domain import datastores diff --git a/tests/unit/domain/test_rollback.py b/tests/unit/domain/test_rollback.py index 03597e7a..4552df85 100644 --- a/tests/unit/domain/test_rollback.py +++ b/tests/unit/domain/test_rollback.py @@ -3,9 +3,12 @@ import shutil from pathlib import Path -from job_executor.adapter.local_storage import DATASTORE_DIR, WORKING_DIR +from job_executor.config import environment from job_executor.domain import rollback +DATASTORE_DIR = environment.datastore_dir +WORKING_DIR = DATASTORE_DIR + "_working" + JOB_ID = "123-123-123-123" BUMP_MANIFESTO = { "version": "0.0.0.1635299291", diff --git a/tests/unit/model/test_data_structure_update.py b/tests/unit/model/test_data_structure_update.py index 9a8112f0..2c4073bc 100644 --- a/tests/unit/model/test_data_structure_update.py +++ b/tests/unit/model/test_data_structure_update.py @@ -1,7 +1,7 @@ import pytest from pydantic import ValidationError -from job_executor.adapter.local_storage.models.datastore_versions import ( +from job_executor.adapter.fs.models.datastore_versions import ( DataStructureUpdate, ) 
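# --- Editor's sketch (illustrative only, not part of the patch) ---------------
# Pattern the updated tests in this patch rely on: WORKING_DIR and INPUT_DIR
# are no longer separate environment variables; they are derived from
# DATASTORE_DIR with the "_working" / "_input" suffixes, and a module-level
# LocalStorageAdapter replaces the old `local_storage` helper functions, with
# datastore reads going through its `datastore_dir` attribute. The assert is
# only a plausible illustration of the pattern, not one of the real tests.
import os
from pathlib import Path

from job_executor.adapter.fs import LocalStorageAdapter
from job_executor.adapter.fs.models.datastore_versions import DraftVersion

DATASTORE_DIR = os.environ["DATASTORE_DIR"]
WORKING_DIR = DATASTORE_DIR + "_working"  # derived, no WORKING_DIR env var
INPUT_DIR = DATASTORE_DIR + "_input"  # derived, no INPUT_DIR env var

local_storage = LocalStorageAdapter(Path(DATASTORE_DIR))
draft_version = DraftVersion.model_validate(
    local_storage.datastore_dir.get_draft_version()
)
assert isinstance(draft_version, DraftVersion)
# -------------------------------------------------------------------------------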
from job_executor.common.exceptions import ReleaseStatusException diff --git a/tests/unit/model/test_datastore_version.py b/tests/unit/model/test_datastore_version.py index a9e410f7..bf4f6801 100644 --- a/tests/unit/model/test_datastore_version.py +++ b/tests/unit/model/test_datastore_version.py @@ -1,11 +1,12 @@ import json import os import shutil +from pathlib import Path import pytest -from job_executor.adapter import local_storage -from job_executor.adapter.local_storage.models.datastore_versions import ( +from job_executor.adapter.fs import LocalStorageAdapter +from job_executor.adapter.fs.models.datastore_versions import ( DatastoreVersion, DataStructureUpdate, DraftVersion, @@ -15,6 +16,7 @@ ExistingDraftException, NoSuchDraftException, ) +from job_executor.config import environment def load_json(file_path): @@ -29,7 +31,7 @@ def load_json(file_path): DRAFT_VERSION_ADDED_PENDING = load_json( f"{TEST_DIR}/draft_version_added_pending.json" ) - +local_storage = LocalStorageAdapter(Path(environment.datastore_dir)) DATASTORE_DIR = f"{os.environ['DATASTORE_DIR']}/datastore" DRAFT_VERSION_PATH = f"{DATASTORE_DIR}/draft_version.json" DATASTORE_VERSION = { @@ -89,7 +91,7 @@ def test_get_dataset_release_status(): def test_draft_version(): draft_version = DraftVersion.model_validate( - local_storage.get_draft_version() + local_storage.datastore_dir.get_draft_version() ) assert draft_version.model_dump( by_alias=True, exclude_none=True @@ -98,12 +100,12 @@ def test_draft_version(): def test_draft_version_delete_draft(): draft_version = DraftVersion.model_validate( - local_storage.get_draft_version() + local_storage.datastore_dir.get_draft_version() ) release_time = draft_version.release_time version = draft_version.version draft_version.delete_draft("BRUTTO_INNTEKT") - local_storage.write_draft_version(draft_version) + local_storage.datastore_dir.write_draft_version(draft_version) draft_version_file = load_json(DRAFT_VERSION_PATH) update_names = [ update["name"] for update in draft_version_file["dataStructureUpdates"] @@ -120,7 +122,7 @@ def test_draft_version_delete_draft(): def test_add_draft_version_already_existing_dataset(): draft_version = DraftVersion.model_validate( - local_storage.get_draft_version() + local_storage.datastore_dir.get_draft_version() ) with pytest.raises(ExistingDraftException) as e: draft_version.add( @@ -136,7 +138,7 @@ def test_add_draft_version_already_existing_dataset(): def test_draft_version_validate_bump_manifesto(): draft_version = DraftVersion.model_validate( - local_storage.get_draft_version() + local_storage.datastore_dir.get_draft_version() ) bump_manifesto = DatastoreVersion(**DRAFT_VERSION_IDENTICAL) @@ -155,7 +157,7 @@ def test_draft_version_validate_bump_manifesto(): def test_draft_version_release_pending(): draft_version = DraftVersion.model_validate( - local_storage.get_draft_version() + local_storage.datastore_dir.get_draft_version() ) release_time = draft_version.release_time version = draft_version.version @@ -180,7 +182,7 @@ def test_draft_version_release_pending(): def test_set_draft_release_status(): draft_version = DraftVersion.model_validate( - local_storage.get_draft_version() + local_storage.datastore_dir.get_draft_version() ) release_time = draft_version.release_time version = draft_version.version diff --git a/tests/unit/model/test_datastore_versions.py b/tests/unit/model/test_datastore_versions.py index f7d9a2ac..c2505a1b 100644 --- a/tests/unit/model/test_datastore_versions.py +++ b/tests/unit/model/test_datastore_versions.py @@ -1,9 
+1,10 @@ import json import os import shutil +from pathlib import Path -from job_executor.adapter import local_storage -from job_executor.adapter.local_storage.models.datastore_versions import ( +from job_executor.adapter.fs import LocalStorageAdapter +from job_executor.adapter.fs.models.datastore_versions import ( DatastoreVersions, DataStructureUpdate, ) @@ -13,8 +14,10 @@ def load_json(file_path): return json.load(open(file_path, encoding="utf")) -DATASTORE_DIR = f"{os.environ['DATASTORE_DIR']}/datastore" -DATASTORE_VERSIONS_PATH = f"{DATASTORE_DIR}/datastore_versions.json" +DATASTORE_DIR = f"{os.environ['DATASTORE_DIR']}" +METADATA_DIR = f"{os.environ['DATASTORE_DIR']}/datastore" +local_storage = LocalStorageAdapter(Path(DATASTORE_DIR)) +DATASTORE_VERSIONS_PATH = f"{METADATA_DIR}/datastore_versions.json" def setup_function(): @@ -31,7 +34,7 @@ def teardown_function(): def test_datastore_versions(): datastore_versions = DatastoreVersions.model_validate( - local_storage.get_datastore_versions() + local_storage.datastore_dir.get_datastore_versions() ) assert datastore_versions.model_dump( by_alias=True, exclude_none=True @@ -40,7 +43,7 @@ def test_datastore_versions(): def test_add_new_release_version(): datastore_versions = DatastoreVersions.model_validate( - local_storage.get_datastore_versions() + local_storage.datastore_dir.get_datastore_versions() ) datastore_versions.add_new_release_version( [ @@ -59,7 +62,7 @@ def test_add_new_release_version(): def test_get_dataset_release_status(): datastore_versions = DatastoreVersions.model_validate( - local_storage.get_datastore_versions() + local_storage.datastore_dir.get_datastore_versions() ) assert ( datastore_versions.get_dataset_release_status("SIVSTAND") == "RELEASED" diff --git a/tests/unit/model/test_metadata.py b/tests/unit/model/test_metadata.py index 763b7016..8b33e0a7 100644 --- a/tests/unit/model/test_metadata.py +++ b/tests/unit/model/test_metadata.py @@ -2,7 +2,7 @@ import os import shutil -from job_executor.adapter.local_storage.models.metadata import ( +from job_executor.adapter.fs.models.metadata import ( Metadata, MetadataAll, ) diff --git a/tests/unit/worker/steps/test_dataset_pseudonymizer.py b/tests/unit/worker/steps/test_dataset_pseudonymizer.py index fc1def94..c3c33bc0 100644 --- a/tests/unit/worker/steps/test_dataset_pseudonymizer.py +++ b/tests/unit/worker/steps/test_dataset_pseudonymizer.py @@ -8,7 +8,7 @@ from pyarrow import dataset, parquet from job_executor.adapter import pseudonym_service -from job_executor.adapter.local_storage.models.metadata import Metadata +from job_executor.adapter.fs.models.metadata import Metadata from job_executor.common.exceptions import BuilderStepError from job_executor.domain.worker.steps import dataset_pseudonymizer @@ -103,12 +103,11 @@ WORKING_DIR = Path("tests/resources/worker/steps/pseudonymizer") INPUT_PARQUET_PATH = WORKING_DIR / "input.parquet" -OUTPUT_PARQUET_PATH = WORKING_DIR / "input_pseudonymized.parquet" +OUTPUT_PARQUET_FILE_NAME = "input_pseudonymized.parquet" INPUT_PARQUET_PATH_START_YEAR = WORKING_DIR / "input_start_year.parquet" -OUTPUT_PARQUET_PATH_START_YEAR = ( - WORKING_DIR / "input_start_year_pseudonymized.parquet" -) +OUTPUT_PARQUET_FILE_START_YEAR = "input_start_year_pseudonymized.parquet" + JOB_ID = "123-123-123-123" PSEUDONYM_DICT = {f"i{count}": count for count in range(TABLE_SIZE)} @@ -130,11 +129,6 @@ PSEUDONYMIZE_ONLY_VALUE_METADATA = Metadata(**json.load(file)) -@pytest.fixture(autouse=True) -def set_working_dir(monkeypatch): - 
monkeypatch.setenv("WORKING_DIR", str(WORKING_DIR)) - - def setup_function(): if os.path.isdir(f"{WORKING_DIR}_backup"): shutil.rmtree(f"{WORKING_DIR}_backup") @@ -156,9 +150,11 @@ def test_pseudonymizer(mocker): ) assert str( dataset_pseudonymizer.run(INPUT_PARQUET_PATH, METADATA, JOB_ID) - ) == str(OUTPUT_PARQUET_PATH) + ) == str(OUTPUT_PARQUET_FILE_NAME) - actual_table = dataset.dataset(OUTPUT_PARQUET_PATH).to_table() + actual_table = dataset.dataset( + WORKING_DIR / OUTPUT_PARQUET_FILE_NAME + ).to_table() _validate_content(actual_table, EXPECTED_TABLE) expected_types = { @@ -169,7 +165,9 @@ def test_pseudonymizer(mocker): } # Checking the parquet schema is what we expect - _verify_parquet_schema(OUTPUT_PARQUET_PATH, expected_types) + _verify_parquet_schema( + WORKING_DIR / OUTPUT_PARQUET_FILE_NAME, expected_types + ) def test_pseudonymizer_unit_id_and_value(mocker): @@ -178,12 +176,14 @@ def test_pseudonymizer_unit_id_and_value(mocker): ) # Pseudonymize - pseudonymized_output_path = dataset_pseudonymizer.run( + pseudonymized_output_file = dataset_pseudonymizer.run( INPUT_PARQUET_PATH, PSEUDONYMIZE_UNIT_ID_AND_VALUE_METADATA, JOB_ID, ) - actual_table = dataset.dataset(pseudonymized_output_path).to_table() + actual_table = dataset.dataset( + WORKING_DIR / pseudonymized_output_file + ).to_table() _validate_content(actual_table, EXPECTED_TABLE_WITH_BOTH_PSEUDONYMIZED) expected_types = { @@ -194,7 +194,9 @@ def test_pseudonymizer_unit_id_and_value(mocker): } # Checking the parquet schema is what we expect - _verify_parquet_schema(OUTPUT_PARQUET_PATH, expected_types) + _verify_parquet_schema( + WORKING_DIR / OUTPUT_PARQUET_FILE_NAME, expected_types + ) def test_pseudonymizer_only_value(mocker): @@ -203,12 +205,14 @@ def test_pseudonymizer_only_value(mocker): ) # Pseudonymize - pseudonymized_output_path = dataset_pseudonymizer.run( + pseudonymized_output_file = dataset_pseudonymizer.run( INPUT_PARQUET_PATH, PSEUDONYMIZE_ONLY_VALUE_METADATA, JOB_ID, ) - actual_table = dataset.dataset(pseudonymized_output_path).to_table() + actual_table = dataset.dataset( + WORKING_DIR / pseudonymized_output_file + ).to_table() _validate_content( actual_table, EXPECTED_TABLE_WITH_ONLY_VALUE_PSEUDONYMIZED ) @@ -221,7 +225,9 @@ def test_pseudonymizer_only_value(mocker): } # Checking the parquet schema is what we expect - _verify_parquet_schema(OUTPUT_PARQUET_PATH, expected_types) + _verify_parquet_schema( + WORKING_DIR / OUTPUT_PARQUET_FILE_NAME, expected_types + ) def test_pseudonymizer_start_year(mocker): @@ -232,9 +238,11 @@ def test_pseudonymizer_start_year(mocker): dataset_pseudonymizer.run( INPUT_PARQUET_PATH_START_YEAR, METADATA, JOB_ID ) - ) == str(OUTPUT_PARQUET_PATH_START_YEAR) + ) == str(OUTPUT_PARQUET_FILE_START_YEAR) - actual_table = dataset.dataset(OUTPUT_PARQUET_PATH_START_YEAR).to_table() + actual_table = dataset.dataset( + WORKING_DIR / OUTPUT_PARQUET_FILE_START_YEAR + ).to_table() _validate_content(actual_table, EXPECTED_TABLE_START_YEAR) expected_types = { @@ -246,7 +254,9 @@ def test_pseudonymizer_start_year(mocker): } # Checking the parquet schema is what we expect - _verify_parquet_schema(OUTPUT_PARQUET_PATH_START_YEAR, expected_types) + _verify_parquet_schema( + WORKING_DIR / OUTPUT_PARQUET_FILE_START_YEAR, expected_types + ) def test_pseudonymizer_adapter_failure(): diff --git a/tests/unit/worker/test_build_dataset_worker.py b/tests/unit/worker/test_build_dataset_worker.py index 04b61d0b..20a1c107 100644 --- a/tests/unit/worker/test_build_dataset_worker.py +++ 
b/tests/unit/worker/test_build_dataset_worker.py @@ -9,11 +9,10 @@ from microdata_tools import package_dataset from requests_mock import Mocker as RequestsMocker -from job_executor.adapter.local_storage import INPUT_DIR from job_executor.config import environment from job_executor.domain.worker.build_dataset_worker import run_worker -RSA_KEYS_DIRECTORY = Path(environment.rsa_keys_directory) +RSA_KEYS_DIRECTORY = Path(environment.datastore_dir) / "vault" PARTITIONED_DATASET_NAME = "INNTEKT" @@ -21,7 +20,9 @@ NO_PSEUDONYM_DATASET_NAME = "KOMMUNE_FOLKETALL" NO_PSEUDONYM_FIXED_DATASET_NAME = "KOMMUNE_HOYESTE_PUNKT" JOB_ID = "1234-1234-1234-1234" -WORKING_DIR = os.environ["WORKING_DIR"] +DATASTORE_DIR = os.environ["DATASTORE_DIR"] +WORKING_DIR = DATASTORE_DIR + "_working" +INPUT_DIR = DATASTORE_DIR + "_input" INPUT_DIR_ARCHIVE = f"{INPUT_DIR}/archive" DATASTORE_API_URL = os.environ["DATASTORE_API_URL"] PSEUDONYM_SERVICE_URL = os.environ["PSEUDONYM_SERVICE_URL"] @@ -412,14 +413,11 @@ def _create_rsa_public_key(target_dir: Path): private_key = rsa.generate_private_key( public_exponent=65537, key_size=2048, backend=default_backend() ) - public_key = private_key.public_key() - microdata_public_key_pem = public_key.public_bytes( encoding=serialization.Encoding.PEM, format=serialization.PublicFormat.SubjectPublicKeyInfo, ) - public_key_location = target_dir / "microdata_public_key.pem" with open(public_key_location, "wb") as file: file.write(microdata_public_key_pem) diff --git a/tests/unit/worker/test_build_metadata_worker.py b/tests/unit/worker/test_build_metadata_worker.py index cf7302d5..5ba3f89d 100644 --- a/tests/unit/worker/test_build_metadata_worker.py +++ b/tests/unit/worker/test_build_metadata_worker.py @@ -7,16 +7,17 @@ from microdata_tools import package_dataset from requests_mock import Mocker as RequestsMocker -from job_executor.adapter.local_storage import INPUT_DIR from job_executor.config import environment from job_executor.domain.worker.build_metadata_worker import run_worker from tests.unit.worker.test_build_dataset_worker import _create_rsa_public_key -RSA_KEYS_DIRECTORY = Path(environment.rsa_keys_directory) +RSA_KEYS_DIRECTORY = Path(environment.datastore_dir) / "vault" DATASET_NAME = "KJOENN" JOB_ID = "1234-1234-1234-1234" -WORKING_DIR = os.environ["WORKING_DIR"] +DATASTORE_DIR = os.environ["DATASTORE_DIR"] +WORKING_DIR = DATASTORE_DIR + "_working" +INPUT_DIR = DATASTORE_DIR + "_input" INPUT_DIR_ARCHIVE = f"{INPUT_DIR}/archive" EXPECTED_DIR = "tests/resources/worker/build_metadata/expected" DATASTORE_API_URL = os.environ["DATASTORE_API_URL"] @@ -73,7 +74,6 @@ def test_import(requests_mock: RequestsMocker): requests_mock.put( f"{DATASTORE_API_URL}/jobs/{JOB_ID}", json={"message": "OK"} ) - run_worker(JOB_ID, DATASET_NAME, Queue()) with open( f"{WORKING_DIR}/{DATASET_NAME}__DRAFT.json", "r", encoding="utf-8"