# --- tests/unit/adapter/test_datastore_api.py ---
import os

import pytest
from requests_mock import Mocker as RequestsMocker

from job_executor.adapter import datastore_api
from job_executor.adapter.datastore_api.models import (
Job,
JobParameters,
JobStatus,
Operation,
ReleaseStatus,
UserInfo,
)
from job_executor.common.exceptions import HttpResponseError

# Connection details for the datastore API, injected via the environment
# (tests fail fast with KeyError if the variables are unset).
DATASTORE_API_URL = os.environ["DATASTORE_API_URL"]
DATASTORE_RDN = os.environ["DATASTORE_RDN"]
# Shared fixture data reused by every test in this module.
JOB_ID = "123"
# Two representative jobs: a CHANGE operation and a SET_STATUS operation
# (the latter carries a release_status).
JOB_LIST = [
    Job(
        job_id=JOB_ID,
        datastore_rdn=DATASTORE_RDN,
        status=JobStatus.QUEUED,
        parameters=JobParameters(target="INNTEKT", operation=Operation.CHANGE),
        log=[],
        created_at="2022-05-18T11:40:22.519222",
        created_by=UserInfo(
            user_id="123-123-123", first_name="Data", last_name="Admin"
        ),
    ),
    Job(
        job_id=JOB_ID,
        datastore_rdn=DATASTORE_RDN,
        status=JobStatus.QUEUED,
        parameters=JobParameters(
            operation=Operation.SET_STATUS,
            target="KJOENN",
            release_status=ReleaseStatus.PENDING_RELEASE,
        ),
        log=[],
        created_at="2022-05-18T11:40:22.519222",
        created_by=UserInfo(
            user_id="123-123-123", first_name="Data", last_name="Admin"
        ),
    ),
]
LOG_MESSAGE = "log message"
DESCRIPTION = "new description"
# Body text returned by mocked 500 responses; must surface in raised errors.
ERROR_RESPONSE = "Internal Server Error"


def test_get_jobs(requests_mock: RequestsMocker):
    """get_jobs parses the API response back into the same Job models."""
    payload = [
        job.model_dump(by_alias=True, exclude_none=True) for job in JOB_LIST
    ]
    requests_mock.get(f"{DATASTORE_API_URL}/jobs", json=payload)

    assert datastore_api.get_jobs() == JOB_LIST
    # Exactly one HTTP request should have been issued.
    assert len(requests_mock.request_history) == 1


def test_update_job_status(requests_mock: RequestsMocker):
    """update_job_status PUTs the status, optionally with a log message."""
    requests_mock.put(
        f"{DATASTORE_API_URL}/jobs/{JOB_ID}", json={"message": "OK"}
    )

    datastore_api.update_job_status(JOB_ID, JobStatus.QUEUED)
    datastore_api.update_job_status(JOB_ID, JobStatus.QUEUED, LOG_MESSAGE)

    history = requests_mock.request_history
    assert len(history) == 2
    without_log, with_log = (request.json() for request in history)
    assert without_log == {"status": "queued"}
    assert with_log == {"status": "queued", "log": LOG_MESSAGE}


def test_update_description(requests_mock: RequestsMocker):
    """update_description PUTs the new description for the given job."""
    requests_mock.put(
        f"{DATASTORE_API_URL}/jobs/{JOB_ID}", json={"message": "OK"}
    )

    datastore_api.update_description(JOB_ID, DESCRIPTION)

    history = requests_mock.request_history
    assert len(history) == 1
    assert history[0].json() == {"description": DESCRIPTION}


def test_no_connection(requests_mock: RequestsMocker):
    """Every adapter call raises HttpResponseError on a 500 response.

    The response body text must be propagated into the raised exception's
    message.
    """
    requests_mock.get(
        f"{DATASTORE_API_URL}/jobs", status_code=500, text=ERROR_RESPONSE
    )
    requests_mock.put(
        f"{DATASTORE_API_URL}/jobs/{JOB_ID}",
        status_code=500,
        text=ERROR_RESPONSE,
    )
    with pytest.raises(HttpResponseError) as e:
        datastore_api.get_jobs()
    # Assert on the exception itself (e.value); str(e) stringifies the
    # ExceptionInfo wrapper, which pytest documents as the fragile form.
    assert ERROR_RESPONSE in str(e.value)
    with pytest.raises(HttpResponseError) as e:
        datastore_api.update_job_status(JOB_ID, JobStatus.QUEUED)
    assert ERROR_RESPONSE in str(e.value)
    with pytest.raises(HttpResponseError) as e:
        datastore_api.update_description(JOB_ID, DESCRIPTION)
    assert ERROR_RESPONSE in str(e.value)


def test_get_maintenance_status(requests_mock: RequestsMocker):
    """get_maintenance_status parses the latest maintenance status."""
    status_payload = {
        "paused": False,
        "msg": "OK",
        "timestamp": "2023-05-08T06:31:00.519222",
    }
    requests_mock.get(
        f"{DATASTORE_API_URL}/maintenance-statuses/latest",
        json=status_payload,
    )

    assert datastore_api.get_maintenance_status().paused is False


def test_get_maintenance_status_error(requests_mock: RequestsMocker):
    """A 500 from the maintenance endpoint raises HttpResponseError."""
    requests_mock.get(
        f"{DATASTORE_API_URL}/maintenance-statuses/latest",
        status_code=500,
        text=ERROR_RESPONSE,
    )
    with pytest.raises(HttpResponseError) as e:
        datastore_api.get_maintenance_status()
    # Assert on the exception message itself (e.value), not the
    # ExceptionInfo wrapper's string form.
    assert ERROR_RESPONSE in str(e.value)


@pytest.mark.parametrize(
    "is_paused,expected_result",
    [
        (
            True,
            datastore_api.JobQueryResult(
                built_jobs=JOB_LIST,
                queued_manager_jobs=[],
                queued_worker_jobs=[],
            ),
        ),
        (
            False,
            datastore_api.JobQueryResult(
                built_jobs=JOB_LIST,
                queued_manager_jobs=JOB_LIST,
                queued_worker_jobs=JOB_LIST,
            ),
        ),
    ],
)
def test_query_for_jobs(is_paused, expected_result, requests_mock, monkeypatch):
    """query_for_jobs always returns built jobs, but queued jobs only when
    the system is not paused.
    """
    monkeypatch.setattr(
        "job_executor.adapter.datastore_api.is_system_paused", lambda: is_paused
    )

    # Always return built jobs even if the system is paused.
    # If the system is paused, return an empty list for queued job statuses.
    def mock_get_jobs(job_status=None, operations=None):
        if job_status == "built":
            return JOB_LIST
        if job_status in ("queued", "queued_manager"):
            return [] if is_paused else JOB_LIST
        return None

    monkeypatch.setattr(
        "job_executor.adapter.datastore_api.get_jobs", mock_get_jobs
    )

    result = datastore_api.query_for_jobs()
    # Compare against the parametrized expectation — `expected_result` was
    # previously unused, leaving half of each parametrization dead code.
    assert result == expected_result
# --- tests/unit/adapter/test_local_storage.py ---
import json
import os
import shutil
from pathlib import Path

import pytest

from job_executor.adapter.fs import LocalStorageAdapter
from job_executor.adapter.fs.models.datastore_versions import (
DatastoreVersions,
DraftVersion,
)
from job_executor.adapter.fs.models.metadata import (
MetadataAll,
)
from job_executor.common.exceptions import LocalStorageError

# Root of the on-disk test fixture datastore; restored between tests by
# setup_function/teardown_function below.
DATASTORE_DIR = "tests/unit/resources/adapter/local_storage/TEST_DATASTORE"
WORKING_DIR = DATASTORE_DIR + "_working"
DATASTORE_DATA_DIR = f"{DATASTORE_DIR}/data"

# Adapter under test, bound to the fixture datastore directory.
local_storage = LocalStorageAdapter(Path(DATASTORE_DIR))

# Paths to the datastore's metadata documents used in assertions.
DATASTORE_VERSIONS_PATH = f"{DATASTORE_DIR}/datastore/datastore_versions.json"
DRAFT_METADATA_ALL_PATH = f"{DATASTORE_DIR}/datastore/metadata_all__draft.json"
DRAFT_VERSION_PATH = f"{DATASTORE_DIR}/datastore/draft_version.json"
DATA_VERSIONS_PATH = f"{DATASTORE_DIR}/datastore/data_versions__1_0.json"
METADATA_ALL_PATH = f"{DATASTORE_DIR}/datastore/metadata_all__1_0_0.json"

# Dataset with an existing DRAFT parquet file.
DRAFT_DATASET_NAME = "UTDANNING"
DRAFT_DATA_PATH = f"{DATASTORE_DATA_DIR}/UTDANNING/UTDANNING__DRAFT.parquet"

# Dataset whose draft gets renamed to a versioned release.
DRAFT2_DATASET_NAME = "BRUTTO_INNTEKT"
RELEASED_DRAFT2_DATA_PATH = (
    f"{DATASTORE_DATA_DIR}/BRUTTO_INNTEKT/BRUTTO_INNTEKT__1_1"
)

# Dataset whose parquet is moved from the working dir into the datastore.
WORKING_DIR_DATASET = "FOEDESTED"
MOVED_WORKING_DIR_DATASET_DATA_PATH = (
    f"{DATASTORE_DATA_DIR}/FOEDESTED/FOEDESTED__DRAFT.parquet"
)

def setup_function():
    """Snapshot the resources tree so each test starts from a clean copy."""
    backup_dir = "tests/unit/resources_backup"
    if os.path.isdir(backup_dir):
        shutil.rmtree(backup_dir)
    shutil.copytree("tests/unit/resources", backup_dir)


def teardown_function():
    """Discard the mutated resources tree and restore the pristine backup."""
    resources = "tests/unit/resources"
    shutil.rmtree(resources)
    shutil.move(f"{resources}_backup", resources)


def read_json(file_path: str) -> dict:
    """Load and return the JSON document stored at *file_path*."""
    return json.loads(Path(file_path).read_text(encoding="utf-8"))


def test_make_dataset_dir():
    """make_dataset_dir creates the dataset's directory under data/."""
    expected_dir = Path(DATASTORE_DATA_DIR) / WORKING_DIR_DATASET
    local_storage.datastore_dir.make_dataset_dir(WORKING_DIR_DATASET)
    assert expected_dir.is_dir()


def test_get_data_versions():
    """get_data_versions matches the on-disk data_versions document."""
    expected = read_json(DATA_VERSIONS_PATH)
    actual = local_storage.datastore_dir.get_data_versions("1_0_0")
    assert actual == expected


def test_write_data_versions():
    """write_data_versions persists the given document to disk."""
    local_storage.datastore_dir.write_data_versions({}, "1_0_0")
    on_disk = read_json(DATA_VERSIONS_PATH)
    assert on_disk == {}


def test_get_draft_version():
    """get_draft_version deserializes into a DraftVersion model."""
    draft = local_storage.datastore_dir.get_draft_version()
    assert isinstance(draft, DraftVersion)


def test_write_draft_version():
    """A modified draft version round-trips through write and read."""
    draft = local_storage.datastore_dir.get_draft_version()
    draft.description = "updated"
    local_storage.datastore_dir.write_draft_version(draft)

    reread = local_storage.datastore_dir.get_draft_version()
    assert reread.description == "updated"


def test_get_datastore_versions():
    """get_datastore_versions deserializes into a DatastoreVersions model."""
    versions = local_storage.datastore_dir.get_datastore_versions()
    assert isinstance(versions, DatastoreVersions)


def test_write_datastore_versions():
    """A modified datastore_versions document round-trips through write/read."""
    versions = local_storage.datastore_dir.get_datastore_versions()
    versions.description = "updated"
    local_storage.datastore_dir.write_datastore_versions(versions)

    reread = local_storage.datastore_dir.get_datastore_versions()
    assert reread.description == "updated"


def test_get_metadata_all():
    """get_metadata_all deserializes into a MetadataAll model."""
    metadata = local_storage.datastore_dir.get_metadata_all("1_0_0")
    assert isinstance(metadata, MetadataAll)


def test_write_metadata_all():
    """A modified metadata_all document round-trips through write and read."""
    metadata = local_storage.datastore_dir.get_metadata_all("1_0_0")
    metadata.data_structures = []
    local_storage.datastore_dir.write_metadata_all(metadata, "1_0_0")

    reread = local_storage.datastore_dir.get_metadata_all("1_0_0")
    assert reread.data_structures == []


def test_delete_parquet_draft():
    """delete_parquet_draft removes the dataset's DRAFT parquet file.

    The function was originally named ``delete_parquet_draft`` (no
    ``test_`` prefix), so pytest never collected it and its assertion
    never ran.
    """
    local_storage.datastore_dir.delete_parquet_draft(DRAFT_DATASET_NAME)
    assert not os.path.isfile(DRAFT_DATA_PATH)


# Backward-compatible alias for any caller using the old, uncollected name.
delete_parquet_draft = test_delete_parquet_draft


def test_rename_parquet_draft_to_release():
    """Renaming a draft parquet yields the versioned release path."""
    release_path = local_storage.datastore_dir.rename_parquet_draft_to_release(
        DRAFT2_DATASET_NAME, "1_1_0"
    )
    assert release_path == f"{DRAFT2_DATASET_NAME}__1_1"
    # The versioned data directory now exists on disk.
    assert os.path.isdir(RELEASED_DRAFT2_DATA_PATH)


def test_move_working_dir_parquet_to_datastore():
    """A working-dir parquet is moved into the datastore's dataset dir."""
    local_storage.datastore_dir.make_dataset_dir(WORKING_DIR_DATASET)
    local_storage.move_working_dir_parquet_to_datastore(WORKING_DIR_DATASET)
    assert Path(MOVED_WORKING_DIR_DATASET_DATA_PATH).is_file()


def test_make_temp_directory():
    """save_temporary_backup creates a tmp dir holding the backup files."""
    datastore_path = Path(DATASTORE_DIR) / "datastore"
    content_before = os.listdir(datastore_path)

    local_storage.datastore_dir.save_temporary_backup()

    # Exactly one new entry (the tmp dir) appears in the datastore dir.
    content_after = os.listdir(datastore_path)
    assert len(content_after) == len(content_before) + 1

    tmp_dir = datastore_path / "tmp"
    assert tmp_dir.is_dir()
    assert sorted(os.listdir(tmp_dir)) == sorted(
        [
            "metadata_all__DRAFT.json",
            "datastore_versions.json",
            "draft_version.json",
        ]
    )


def test_make_temp_directory_already_exists():
    """A second backup attempt fails while a tmp dir already exists."""
    local_storage.datastore_dir.save_temporary_backup()
    datastore_content = os.listdir(Path(DATASTORE_DIR) / "datastore")
    assert "tmp" in datastore_content
    with pytest.raises(LocalStorageError) as e:
        local_storage.datastore_dir.save_temporary_backup()
    # Assert on the exception message itself (e.value), not the
    # ExceptionInfo wrapper's string form.
    assert "tmp directory already exists" in str(e.value)


def test_archive_temp_directory():
    """archive_temporary_backup removes the tmp dir from the datastore."""
    datastore_path = Path(DATASTORE_DIR) / "datastore"
    local_storage.datastore_dir.save_temporary_backup()
    content_with_tmp = os.listdir(datastore_path)

    local_storage.datastore_dir.archive_temporary_backup()

    content_after_archive = os.listdir(datastore_path)
    # Archiving removes exactly one entry: the tmp directory.
    assert len(content_with_tmp) == len(content_after_archive) + 1
    assert not (datastore_path / "tmp").is_dir()


def test_archived_temp_directory_unrecognized_files():
    """Archiving fails when the tmp dir contains unexpected files."""
    local_storage.datastore_dir.save_temporary_backup()
    tmp_dir = Path(DATASTORE_DIR) / "datastore" / "tmp"
    assert os.path.isdir(tmp_dir)
    # Plant a file that the backup routine does not recognize.
    (tmp_dir / "newfile.txt").touch()

    with pytest.raises(LocalStorageError) as e:
        local_storage.datastore_dir.archive_temporary_backup()
    # Assert on the exception message itself (e.value), not the
    # ExceptionInfo wrapper's string form.
    assert "Found unrecognized files" in str(e.value)


def test_archive_or_delete_non_existent_tmp_dir():
    """Archiving or deleting without a tmp dir raises LocalStorageError."""
    with pytest.raises(LocalStorageError) as e:
        local_storage.datastore_dir.archive_temporary_backup()
    # Assert on the exception message itself (e.value), not the
    # ExceptionInfo wrapper's string form.
    assert "Could not find a tmp directory to archive." in str(e.value)
    with pytest.raises(LocalStorageError) as e:
        local_storage.datastore_dir.delete_temporary_backup()
    assert "Could not find a tmp directory to delete." in str(e.value)
# (end of pasted diff)