Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/pr.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ jobs:
python-version: "3.13"

- name: Install the latest version of uv
uses: astral-sh/setup-uv@v6
uses: astral-sh/setup-uv@eb1897b8dc4b5d5bfe39a428a8f2304605e0983c # v7

- name: Sync dependencies
run: uv sync
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/test-and-build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ jobs:
python-version: "3.13"

- name: Install the latest version of uv
uses: astral-sh/setup-uv@v6
uses: astral-sh/setup-uv@eb1897b8dc4b5d5bfe39a428a8f2304605e0983c # v7

- name: Sync dependencies
run: uv sync
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/update-tools-version.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ jobs:
python-version: "3.13"

- name: Install the latest version of uv
uses: astral-sh/setup-uv@v6
uses: astral-sh/setup-uv@eb1897b8dc4b5d5bfe39a428a8f2304605e0983c # v7

- name: Sync dependencies
run: uv sync
Expand Down
108 changes: 72 additions & 36 deletions job_executor/adapter/local_storage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,15 @@

from pydantic import ValidationError

from job_executor.adapter.local_storage.models.datastore_versions import (
DatastoreVersions,
DraftVersion,
)
from job_executor.adapter.local_storage.models.metadata import (
Metadata,
MetadataAll,
MetadataAllDraft,
)
from job_executor.common.exceptions import LocalStorageError
from job_executor.config import environment

Expand Down Expand Up @@ -104,88 +113,115 @@ def write_data_versions(data_versions: dict, version: str) -> None:
)


def get_draft_version() -> dict:
def get_draft_version() -> DraftVersion:
"""
Returns the contents of the draft version json file as dict.
Reads the draft version file from the datastore.
"""
return _read_json(DRAFT_VERSION_PATH)
return DraftVersion.model_validate(_read_json(DRAFT_VERSION_PATH))


def write_draft_version(draft_version: dict) -> None:
def write_draft_version(draft_version: DraftVersion) -> None:
"""
Writes given dict to the draft version json file.

* draft_version: dict - draft version dict
Writes json representation of object to the draft version json file
by alias.
"""
_write_json(draft_version, DRAFT_VERSION_PATH, indent=2)
_write_json(
draft_version.model_dump(by_alias=True), DRAFT_VERSION_PATH, indent=2
)


def get_datastore_versions() -> dict:
def get_datastore_versions() -> DatastoreVersions:
"""
Returns the contents of the datastore versions json file as dict.
"""
return _read_json(DATASTORE_VERSIONS_PATH)
return DatastoreVersions.model_validate(_read_json(DATASTORE_VERSIONS_PATH))


def write_datastore_versions(datastore_versions: dict) -> None:
def write_datastore_versions(datastore_versions: DatastoreVersions) -> None:
"""
Writes given dict to the datastore versions json file.

* datastore_versions: dict - datastore_versions dict
Writes json representation of object to the datastore versions json file
by alias.
"""
_write_json(
datastore_versions,
datastore_versions.model_dump(by_alias=True),
DATASTORE_VERSIONS_PATH,
indent=2,
)


def get_metadata_all(version: str) -> dict:
def get_metadata_all(version: str) -> MetadataAll:
"""
Returns the metadata all json file for the given version as a dict.
Returns the metadata all json file for the given version.

* version: str - '<MAJOR>_<MINOR>_<PATCH>' formatted semantic version
or 'DRAFT'
"""
return _read_json(DATASTORE_DIR / f"datastore/metadata_all__{version}.json")
return MetadataAll.model_validate(
_read_json(DATASTORE_DIR / f"datastore/metadata_all__{version}.json")
)


def write_metadata_all(metadata_all: dict, version: str) -> None:
def write_metadata_all(metadata_all: MetadataAll, version: str) -> None:
"""
Writes given dict to a metadata all json file to the appropriate
Writes given metadata all to the appropriate json file in the
datastore directory named with the given version.
If supplied version is 'DRAFT' a tmp file will be written to first

* metadata_all: MetadataAll - A MetadataAll object
* version: str - '<MAJOR>_<MINOR>_<PATCH>'
"""
_write_json(
metadata_all.model_dump(by_alias=True, exclude_none=True),
DATASTORE_DIR / f"datastore/metadata_all__{version}.json",
)


def get_metadata_all_draft() -> MetadataAllDraft:
"""
Returns the metadata all draft json file.
"""
return MetadataAllDraft.model_validate(
_read_json(DATASTORE_DIR / "datastore/metadata_all__DRAFT.json")
)


def write_metadata_all_draft(metadata_all: MetadataAllDraft) -> None:
"""
Writes json representation of object to the metadata all draft json file
by alias. A tmp file will be written to first
to avoid downtime in consuming services due to incomplete json while
writing.
* metadata_all: dict - metadata all dict
* version: str - '<MAJOR>_<MINOR>_<PATCH>' formatted semantic version
or 'DRAFT'

"""
file_path = DATASTORE_DIR / f"datastore/metadata_all__{version}.json"
if version == "DRAFT":
_write_json_with_tmp(metadata_all, file_path)
else:
_write_json(metadata_all, file_path)
_write_json_with_tmp(
metadata_all.model_dump(by_alias=True, exclude_none=True),
DATASTORE_DIR / "datastore/metadata_all__DRAFT.json",
indent=2,
)


def write_working_dir_metadata(dataset_name: str, metadata: dict) -> None:
def write_working_dir_metadata(dataset_name: str, metadata: Metadata) -> None:
"""
Writes a json to the working directory as the processed metadata file
named: {dataset_name}__DRAFT.json

* dataset_name: str - name of dataset
* metadata: dict - dictionary to write as json
* metadata: Metadata - Metadata to write as json
"""
_write_json(metadata, WORKING_DIR / f"{dataset_name}__DRAFT.json")
_write_json(
metadata.model_dump(by_alias=True, exclude_none=True),
WORKING_DIR / f"{dataset_name}__DRAFT.json",
)


def get_working_dir_metadata(dataset_name: str) -> dict:
def get_working_dir_metadata(dataset_name: str) -> Metadata:
"""
Returns the working dir metadata json file for given dataset_name.
Returns the working dir metadata json file for given dataset_name
as a Metadata object.

* dataset_name: str - name of dataset
"""
return _read_json(WORKING_DIR / f"{dataset_name}__DRAFT.json")
return Metadata.model_validate(
_read_json(WORKING_DIR / f"{dataset_name}__DRAFT.json")
)


def delete_working_dir_metadata(dataset_name: str) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ def release_pending(self) -> tuple[list[DataStructureUpdate], str]:
return pending_updates, update_type

def _write_to_file(self) -> None:
local_storage.write_draft_version(self.model_dump(by_alias=True))
local_storage.write_draft_version(self)

def set_draft_release_status(
self, dataset_name: str, new_status: str
Expand Down Expand Up @@ -215,7 +215,7 @@ class DatastoreVersions(CamelModel, extra="forbid"):
versions: list[DatastoreVersion]

def _write_to_file(self) -> None:
local_storage.write_datastore_versions(self.model_dump(by_alias=True))
local_storage.write_datastore_versions(self)

def _get_current_epoch_seconds(self) -> int:
return int(
Expand Down
6 changes: 2 additions & 4 deletions job_executor/adapter/local_storage/models/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@


class TimePeriod(CamelModel):
start: int | None
start: int | None = None
stop: int | None = None

def __eq__(self, other: object) -> bool:
Expand Down Expand Up @@ -409,9 +409,7 @@ def get(self, dataset_name: str) -> Metadata | None:

class MetadataAllDraft(MetadataAll):
def _write_to_file(self) -> None:
local_storage.write_metadata_all(
self.model_dump(by_alias=True, exclude_none=True), "DRAFT"
)
local_storage.write_metadata_all_draft(self)

def remove(self, dataset_name: str) -> None:
self.data_structures = [
Expand Down
30 changes: 8 additions & 22 deletions job_executor/domain/datastores.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,12 @@ class Datastore:
latest_version_number: str | None

def __init__(self) -> None:
self.draft_version = DraftVersion.model_validate(
local_storage.get_draft_version()
)
self.datastore_versions = DatastoreVersions.model_validate(
local_storage.get_datastore_versions()
)
self.draft_version = local_storage.get_draft_version()
self.datastore_versions = local_storage.get_datastore_versions()
self.latest_version_number = (
self.datastore_versions.get_latest_version_number()
)
self.metadata_all_draft = MetadataAllDraft.model_validate(
local_storage.get_metadata_all("DRAFT")
)
self.metadata_all_draft = local_storage.get_metadata_all_draft()
if self.latest_version_number is None:
self.metadata_all_latest = None
else:
Expand Down Expand Up @@ -81,12 +75,10 @@ def _generate_new_metadata_all(
]
new_metadata_all = MetadataAll(**new_metadata_all_dict)
local_storage.write_metadata_all(
new_metadata_all.model_dump(by_alias=True, exclude_none=True),
new_metadata_all,
new_version,
)
datastore.metadata_all_latest = MetadataAll(
**local_storage.get_metadata_all(new_version)
)
datastore.metadata_all_latest = local_storage.get_metadata_all(new_version)


def _version_pending_operations(
Expand Down Expand Up @@ -175,9 +167,7 @@ def patch_metadata(
"Can't patch metadata of dataset with status "
f"{dataset_release_status}"
)
draft_metadata = Metadata(
**local_storage.get_working_dir_metadata(dataset_name)
)
draft_metadata = local_storage.get_working_dir_metadata(dataset_name)
released_metadata = datastore.metadata_all_latest.get(dataset_name)
if released_metadata is None:
raise NoSuchDraftException(
Expand Down Expand Up @@ -240,9 +230,7 @@ def add(
release_status="DRAFT",
)
)
draft_metadata = Metadata(
**local_storage.get_working_dir_metadata(dataset_name)
)
draft_metadata = local_storage.get_working_dir_metadata(dataset_name)
local_storage.make_dataset_dir(dataset_name)
datastore.metadata_all_draft.add(draft_metadata)
local_storage.move_working_dir_parquet_to_datastore(dataset_name)
Expand Down Expand Up @@ -279,9 +267,7 @@ def change(
"Can't change data for dataset with release status"
f"{dataset_release_status}"
)
draft_metadata = Metadata(
**local_storage.get_working_dir_metadata(dataset_name)
)
draft_metadata = local_storage.get_working_dir_metadata(dataset_name)
datastore.metadata_all_draft.update_one(dataset_name, draft_metadata)
datastore.draft_version.add(
DataStructureUpdate(
Expand Down
6 changes: 2 additions & 4 deletions job_executor/domain/worker/build_dataset_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

from job_executor.adapter import datastore_api, local_storage
from job_executor.adapter.datastore_api.models import JobStatus
from job_executor.adapter.local_storage.models.metadata import Metadata
from job_executor.common.exceptions import BuilderStepError, HttpResponseError
from job_executor.config import environment
from job_executor.config.log import configure_worker_logger
Expand Down Expand Up @@ -77,12 +76,11 @@ def run_worker(job_id: str, dataset_name: str, logging_queue: Queue) -> None:

local_storage.delete_working_dir_dir(WORKING_DIR / f"{dataset_name}")
datastore_api.update_job_status(job_id, JobStatus.TRANSFORMING)
transformed_metadata_json = dataset_transformer.run(input_metadata)
transformed_metadata = dataset_transformer.run(input_metadata)
local_storage.write_working_dir_metadata(
dataset_name, transformed_metadata_json
dataset_name, transformed_metadata
)
local_storage.delete_working_dir_file(metadata_file_path)
transformed_metadata = Metadata(**transformed_metadata_json)

temporality_type = transformed_metadata.temporality
if _dataset_requires_pseudonymization(input_metadata):
Expand Down
7 changes: 4 additions & 3 deletions job_executor/domain/worker/steps/dataset_transformer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from job_executor.adapter.local_storage.models.metadata import (
DATA_TYPES_MAPPING,
Metadata,
)
from job_executor.common.exceptions import BuilderStepError

Expand Down Expand Up @@ -283,7 +284,7 @@ def _transform_temporal_end(temporal_end: dict) -> dict[str, str]:
return temporal_end_result


def _transform_metadata(metadata: dict) -> dict:
def _transform_metadata(metadata: dict) -> Metadata:
logger.info("Transforming metadata")
# These values are found by going through the data file.
# When we do transformation of metadata alone, we do not
Expand Down Expand Up @@ -326,10 +327,10 @@ def _transform_metadata(metadata: dict) -> dict:
metadata["dataRevision"]["temporalEnd"]
)
logger.info("Finished transformation")
return transformed
return Metadata.model_validate(transformed)


def run(metadata: dict) -> dict:
def run(metadata: dict) -> Metadata:
"""
Transforms a metadatafile from the input model to the SIKT
metadata model that is stored in the datastore.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"temporality": "EVENT",
"sensitivityLevel": "PERSON_GENERAL",
"subjectFields": ["Befolkning"],
"temporalCoverage": { "start": -18263, "stop": -6940 },
"temporalCoverage": {},
"identifierVariables": [
{
"variableRole": "Identifier",
Expand Down
Loading