diff --git a/.gitignore b/.gitignore index 5fa5c30..9c387e2 100644 --- a/.gitignore +++ b/.gitignore @@ -30,3 +30,7 @@ MANIFEST .coverage coverage.xml report.xml + +# IDEs +.idea + diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 79b09bd..8845c8f 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,7 +1,7 @@ repos: - repo: https://github.com/pre-commit/pre-commit-hooks - rev: v3.4.0 + rev: v5.0.0 hooks: - id: trailing-whitespace - id: end-of-file-fixer @@ -9,23 +9,22 @@ repos: - id: check-json - id: check-yaml - repo: https://github.com/pre-commit/mirrors-mypy - rev: v0.991 + rev: v1.15.0 hooks: - id: mypy + exclude: ^slkspec/tests - repo: https://github.com/psf/black - rev: 22.10.0 + rev: 25.1.0 hooks: - id: black + args: ["--line-length=88", "--target-version=py310"] - repo: https://github.com/pycqa/flake8 - rev: 5.0.4 + rev: 7.1.2 hooks: - id: flake8 + args: ["--max-line-length=88", "--max-complexity=10", "--doctests"] - repo: https://github.com/pre-commit/mirrors-isort - rev: v5.7.0 + rev: v5.10.1 hooks: - id: isort - - repo: https://github.com/PyCQA/docformatter - rev: v1.5.0 - hooks: - - id: docformatter - args: [--in-place] + args: ["--profile", "black"] diff --git a/CHANGELOG.md b/CHANGELOG.md index caeaaa7..9e2ee55 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,10 @@ # Changelog +## v0.0.3 +- adapted to ``pyslk`` version >= 2.0.0 +- improved and parallelized retrieval workflow +- dependency on ``pyslk`` available via PyPI (and not DKRZ GitLab Repo) + ## v0.0.2 - Add CHANGELOG [#50](https://github.com/observingClouds/slkspec/pull/50) - Modernize packaging with pyproject.toml [#49](https://github.com/observingClouds/slkspec/pull/49) diff --git a/pyproject.toml b/pyproject.toml index 95d64a8..7053f3e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -17,7 +17,7 @@ classifiers = [ requires-python = ">=3.9" dependencies = [ "fsspec>=0.9.0", - "pyslk @ git+https://gitlab.dkrz.de/hsm-tools/pyslk.git@master" + 
"pyslk>=2.2.9", ] [project.entry-points."fsspec.specs"] diff --git a/slkspec/core.py b/slkspec/core.py index 3ebb625..eedabb0 100644 --- a/slkspec/core.py +++ b/slkspec/core.py @@ -1,11 +1,14 @@ from __future__ import annotations import io +import json import logging import os import threading import time import warnings +from collections import defaultdict +from datetime import datetime from getpass import getuser from pathlib import Path from queue import Queue @@ -22,16 +25,31 @@ overload, ) +import pandas as pd import pyslk from fsspec.spec import AbstractFileSystem logger = logging.getLogger("slkspec") logger.setLevel(logging.INFO) - MAX_RETRIES = 2 +MAX_PARALLEL_RECALLS = 3 +MAX_RETRIES_RECALL = 3 FileQueue: Queue[Tuple[str, str]] = Queue(maxsize=-1) -FileInfo = TypedDict("FileInfo", {"name": str, "size": Literal[None], "type": str}) +FileInfo = TypedDict("FileInfo", {"name": str, "size": int, "type": str}) +TapeGroup = TypedDict( + "TapeGroup", + { + "id": int, + "location": str, + "description": str, + "barcode": str, + "status": str, + "file_count": int, + "files": list[str], + "file_ids": list[int], + }, +) _retrieval_lock = threading.Lock() @@ -49,17 +67,19 @@ class SLKFile(io.IOBase): retrieve data from tape. override: bool, default: False Override existing files - touch: bool, default: True + touch: bool, default: False Update existing files on the temporary storage to prevent them - from being deleted. + from being deleted. // not necessary as they are read. mode: str, default: rb Specify the mode in which the files are opened 'r' open for reading (default) 'b' binary mode (default) 't' text mode - file_permissions: int, default 0o3777 - Permission when creating directories and files. + file_permissions: int, default: 0o644 + Permission when creating files. + dir_permissions: int, default: 0o3775 + Permission when creating directories. **kwargs: Additional keyword arguments passed to the open file descriptor method. 
@@ -91,8 +111,9 @@ def __init__( *, override: bool = True, mode: str = "rb", - touch: bool = True, - file_permissions: int = 0o3777, + touch: bool = False, + file_permissions: int = 0o644, + dir_permissions: int = 0o3775, delay: int = 2, _lock: threading.Lock = _retrieval_lock, _file_queue: Queue[Tuple[str, str]] = FileQueue, @@ -107,6 +128,7 @@ def __init__( self.slk_cache = Path(slk_cache) self.touch = touch self.file_permissions = file_permissions + self.dir_permissions = dir_permissions self._order_num = 0 self._file_obj: Optional[IO[Any]] = None self._lock = _lock @@ -118,7 +140,6 @@ def __init__( self.write_through = False self.delay = delay self._file_queue = _file_queue - print(self._file) with _lock: if not Path(self._file).exists() or override: self._file_queue.put((self._url, str(Path(self._file).parent))) @@ -135,22 +156,92 @@ def name(self) -> str: return self._url def _retrieve_items(self, retrieve_files: list[tuple[str, str]]) -> None: - """Get items from the tape archive.""" - - retrieval_requests: List[str] = list() - logger.debug("Retrieving %i items from tape", len(retrieve_files)) - for inp_file, _ in retrieve_files: - retrieval_requests.append(inp_file) - logger.debug("Creating slk query for %i files", len(retrieve_files)) - search_id = pyslk.search(pyslk.slk_gen_file_query(retrieval_requests)) - if search_id is None: - raise FileNotFoundError("No files found in archive.") - logger.debug("Retrieving files for search id: %i", search_id) - pyslk.slk_retrieve(search_id, str(self.slk_cache), preserve_path=True) - logger.debug("Adjusting file permissions") - for out_file in retrieval_requests: - local_path = self.slk_cache / Path(out_file.strip("/")) - local_path.chmod(self.file_permissions) + """Get items from the tape archive. + Retrieves items using given list of files and performs necessary operations. 
+ + Parameters: + - retrieve_files: a list of tuples containing file source and destination + + Returns: None + """ + logger.debug("retrieval routine initializing") + retrieve_files_corrected: list[tuple[str, str]] = _reformat_retrieve_files_list( + retrieve_files=retrieve_files, + dir_permissions=self.dir_permissions, + ) + # declare variables + files_retrieval_failed: dict[str, str] = dict() + # start + logger.debug( + "Retrieving %i items from tape (%i already available)", + len(retrieve_files_corrected), + len(retrieve_files) - len(retrieve_files_corrected), + ) + # instantiate recall and retrieval classes + slk_recall: SLKRecall = SLKRecall(retrieve_files_corrected) + slk_retrieval: SLKRetrieval = SLKRetrieval( + slk_recall, + retrieve_files_corrected, + files_retrieval_failed, + self.file_permissions, + ) + + # iterate as long as there are files to retrieve; but first start recalls + # (after each retrieval, start recalls; done in retrieval class) + iterations: int = 0 + retrieve_timer: float + slk_recall.start_recalls() + # we do not generally remove files_recall_failed from to_be_retrieved because + # some files of failed recalls might have been recalled + while slk_retrieval.number_files_still_to_be_retrieved_realistically() > 0: + iterations += 1 + retrieve_timer = time.time() + logger.info( + ( + "retrieve/recall iteration %i; %i requested files missing (of " + + "which %i failed) => %i realistically to be retrieved. " + + "Currently, trying %i files. %i recall jobs running " + + "for %i files." 
+ ), + iterations, + len(slk_retrieval.files_retrieval_requested), + len(slk_retrieval.files_retrieval_failed), + ( + len(slk_retrieval.files_retrieval_requested) + - len(slk_retrieval.files_retrieval_failed) + ), + len(slk_retrieval.files_retrieval_reasonable), + slk_recall.number_active_jobs(), + slk_recall.number_files_in_active_jobs(), + ) + slk_retrieval.run_retrieval() + if ( + len(slk_retrieval.files_retrieval_reasonable) > 0 + and time.time() - retrieve_timer < 60 + ): + logger.info( + f"Waiting for {int(60 - (time.time() - retrieve_timer))} " + + "seconds before next retrieval." + ) + time.sleep(60 - (time.time() - retrieve_timer)) + + # print files which are not available + _write_file_lists(slk_recall, slk_retrieval, self.slk_cache) + + # throw error if not all files were retrieved + if ( + len(slk_retrieval.files_retrieval_failed) + + len(slk_recall.files_recall_failed) + > 0 + ): + tmp_sum = len(slk_retrieval.files_retrieval_failed) + len( + slk_recall.files_recall_failed + ) + raise pyslk.PySlkException( + f"{tmp_sum} of requested" + + f"{len(slk_retrieval.retrieve_files_corrected)} files could not be " + + "retrieved. Please check previous error messages for affected files." + ) def _cache_files(self) -> None: time.sleep(self.delay) @@ -205,7 +296,7 @@ def seekable() -> Literal[True]: return True def read(self, size: int = -1) -> Any: - """The the content of a file-stream. + """The content of a file-stream. size: int, default: -1 read at most size characters from the stream, -1 means everything @@ -247,13 +338,16 @@ class SLKFileSystem(AbstractFileSystem): retrieve data from tape. block_size: int, default: None Some indication of buffering - this is a value in bytes - file_permissions: int, default: 0o3777 - Permission when creating directories and files. + file_permissions: int, default: 0o644 + Permission when creating files. + dir_permissions: int, default: 0o3775 + Permission when creating directories. 
override: bool, default: False Override existing files - touch: bool, default: True - Update existing files on the temporary storage to prevent them - from being deleted. + touch: bool, default: False + Update `mtime` of temporary files to prevent them from being deleted. Depending + on the implemented method to delete temporary files this might not be necessary + or have no effect. **storage_options: Additional options passed to the AbstractFileSystem class. """ @@ -266,8 +360,9 @@ def __init__( self, block_size: Optional[int] = None, slk_cache: Optional[Union[str, Path]] = None, - file_permissions: int = 0o3777, - touch: bool = True, + file_permissions: int = 0o644, + dir_permissions: int = 0o3775, + touch: bool = False, delay: int = 2, override: bool = False, **storage_options: Any, @@ -298,6 +393,7 @@ def __init__( self.override = override self.delay = delay self.file_permissions = file_permissions + self.dir_permissions = dir_permissions @overload def ls( @@ -332,19 +428,19 @@ def ls( information dicts if detail is True. 
""" path = Path(path) - filelist = pyslk.slk_list(str(path)).split("\n") + filelist: pd.DataFrame = pyslk.ls(str(path), full_path=True) detail_list: List[FileInfo] = [] types = {"d": "directory", "-": "file"} - for file_entry in filelist[:-2]: + for index, row in filelist.iterrows(): entry: FileInfo = { - "name": str(path / " ".join(file_entry.split()[8:])), - "size": None, # sizes are human readable not in bytes - "type": types[file_entry[0]], + "name": str(row.filename), + "size": int(row.filesize), + "type": types[row.permissions[0]], } detail_list.append(entry) if detail: return detail_list - return [d["name"] for d in detail_list] + return filelist.filename.tolist() def _open( self, @@ -367,4 +463,800 @@ def _open( delay=self.delay, encoding=kwargs.get("encoding"), file_permissions=self.file_permissions, + dir_permissions=self.dir_permissions, + ) + + +class SLKRecall: + + def __init__(self, retrieve_files_corrected: list[tuple[str, str]]): + # some internal control variables + self.grouping_initialized = False + # tape_job_mapping: {"": [, , ...]} + self.tape_job_mapping: dict[str, list[int]] = defaultdict(list) + self.multi_tape_file_job_mapping: dict[int, list[int]] = defaultdict(list) + # overview over active jobs + self.job_tape_mapping: dict[int, str] = dict() + self.job_multi_tape_file_mapping: dict[int, int] = dict() + # store tapes and multi-tape-files to process later + self.file_ids_multiple_tapes: list[int] = list() + self.tapes: list[str] + # all tapes / all multi-tape-files done + self.all_tapes_done: bool + self.all_multi_tape_files_done: bool + # list of failed (after MAX_RETRIES_RECALL recalls) and successful recalls + self.tapes_success: set[str] = set() + self.tapes_active: set[str] = set() + self.tapes_failed: dict[str, str] = dict() + self.multi_tape_files_success: set[int] = set() + self.multi_tape_files_active: set[int] = set() + self.multi_tape_files_failed: dict[int, str] = dict() + # list of files which recalls failed + 
self.files_recall_failed: dict[str, str] = dict() + # list of files for which recall started + self.files_recall_started: list[str] = list() + self._files_recall_newly_started: list[str] = list() + # list of files which were cached from the beginning + self.files_cached_from_beginning: list[str] = list() + # mapping from tape to file list + self.tape_file_mapping: dict[str, list[int]] = defaultdict(list) + # initialize file-tape-grouping + self._initialize_grouping(retrieve_files_corrected) + + def _initialize_grouping( + self, retrieve_files_corrected: list[tuple[str, str]] + ) -> None: + # tape grouping + file_tape_grouping: list[TapeGroup] = pyslk.group_files_by_tape( + [inp_file for inp_file, out_dir in retrieve_files_corrected] + ) + # get list of all tape barcodes (volume_ids) + self.tapes = [ + tape_group["barcode"] + for tape_group in file_tape_grouping + if tape_group.get("id", -1) > 0 + ] + self.all_tapes_done = len(self.tapes) == 0 + # get list of file split amongst multiple tapes + for tape_group in file_tape_grouping: + # check if there are files on multiple tapes included + if ( + tape_group.get("id", 0) == -1 + and tape_group.get("location", "") == "tape" + ): + self.file_ids_multiple_tapes = tape_group["file_ids"] + if ( + tape_group.get("location", "") == "tape" + and tape_group.get("id", -1) > 0 + ): + self.tape_file_mapping[tape_group["barcode"]] = tape_group["file_ids"] + if ( + tape_group.get("id", 0) == -1 + and tape_group.get("location", "") == "cache" + ): + self.files_cached_from_beginning = [ + str(file_path) for file_path in tape_group["files"] + ] + self.all_multi_tape_files_done = len(self.file_ids_multiple_tapes) == 0 + self.grouping_initialized = True + + def start_recalls(self) -> None: + logger.debug("Recall function started") + + # +---------------------------------------------------------- + # | CHECK IF WE NEED TO RUN THIS FUNCTION + # +---------------------------------------------------------- + # all_tapes_done is not set => 
check if conditions to set this are fulfilled => + # print user info once + remaining_tapes: list = [ + tape + for tape in self.tapes + if tape not in self.tapes_success and tape not in self.tapes_failed + ] + self.all_tapes_done = len(remaining_tapes) == 0 and not self.all_tapes_done + + if self.all_tapes_done: + logger.info("All tapes have been processed.") + + # all_multi_tape_files_done is not set => check if conditions to set this are + # fulfilled + # => print user info once + remaining_split_files: list = [ + file_id + for file_id in self.file_ids_multiple_tapes + if file_id not in self.multi_tape_files_success + and file_id not in self.multi_tape_files_failed + ] + self.all_multi_tape_files_done = ( + len(remaining_split_files) == 0 and not self.all_multi_tape_files_done + ) + if self.all_multi_tape_files_done: + logger.info("All files split amongst multiple tapes have been processed.") + + # leave directly if all tapes and all multi-tape-files have been processed + if self.all_tapes_done and self.all_multi_tape_files_done: + logger.debug("Recall function has nothing to do; leaving") + return + + # +---------------------------------------------------------- + # | CHECK STATUS OF RUNNING JOBS + # +---------------------------------------------------------- + # check if there are jobs running for whole tapes: + logger.info( + "Number of running jobs based on tape: %i", + len(self.job_tape_mapping), + ) + if not self.all_tapes_done and len(self.job_tape_mapping) > 0: + self._check_normal_recall_jobs() + + logger.info( + "Number of running jobs based on split files: %i", + len(self.job_multi_tape_file_mapping), + ) + # check if there are jobs running for files split amongst multiple tapes: + if ( + not self.all_multi_tape_files_done + and len(self.job_multi_tape_file_mapping) > 0 + ): + self._check_split_file_recall_jobs() + + # +---------------------------------------------------------- + # | SUBMIT NEW JOBS IF NECESSARY + # 
+---------------------------------------------------------- + # start new recalls if there are less than the max number of these jobs are + # running and tapes are free + tapes_available: list = [ + tape + for tape in self.tapes + if ( + tape not in self.tapes_success + and tape not in self.tapes_failed + and tape not in self.tapes_active + ) + ] + logger.debug( + "Number of tapes for which recalls need to be submitted: %i", + len(tapes_available), + ) + logger.debug( + "Number of running jobs: %i", + len(self.job_tape_mapping) + len(self.job_multi_tape_file_mapping), + ) + logger.debug("Maximum allowed number of jobs: %i", MAX_PARALLEL_RECALLS) + if ( + len(tapes_available) > 0 + and len(self.job_tape_mapping) + len(self.job_multi_tape_file_mapping) + < MAX_PARALLEL_RECALLS + ): + self._start_normal_recall_jobs(tapes_available) + + # iterate over files stored on multiple tapes each + multi_tape_files_available = [ + file_id + for file_id in self.file_ids_multiple_tapes + if ( + file_id not in self.multi_tape_files_success + and file_id not in self.multi_tape_files_failed + and file_id not in self.multi_tape_files_active + ) + ] + logger.debug( + "Number of files for which recalls need to be submitted: %i", + len(multi_tape_files_available), + ) + logger.debug( + "Number of running jobs: %i", + len(self.job_tape_mapping) + len(self.job_multi_tape_file_mapping), + ) + logger.debug("Maximum allowed number of jobs: %i", MAX_PARALLEL_RECALLS) + if ( + len(multi_tape_files_available) > 0 + and len(self.job_tape_mapping) + len(self.job_multi_tape_file_mapping) + < MAX_PARALLEL_RECALLS + ): + self._start_split_files_recall_jobs(multi_tape_files_available) + + logger.debug("Recall function ended") + + def _check_normal_recall_jobs(self) -> None: + job_ids_to_be_removed: set[int] = set() + job_id: int + msg: str + job_status: pyslk.StatusJob + # iterate all ids of running jobs + for job_id, tape_barcode in self.job_tape_mapping.items(): + logger.debug("Checking status of 
job %i (tape: %s)", job_id, tape_barcode) + job_status = pyslk.get_job_status(job_id) + logger.debug("Job status: %s", job_status.get_status_name()) + # DIFFERENT JOB STATES + if job_status.is_successful(): + # SUCCESS => mark tape as successful; remember job id to be + # considered as free; consider this job to be done + msg = f"Job {job_id} ended successfully (tape: {tape_barcode})." + logger.debug(msg) + self.tapes_active.remove(tape_barcode) + self.tapes_success.add(tape_barcode) + job_ids_to_be_removed.add(job_id) + elif job_status.is_queued() or job_status.is_processing(): + # STILL WAITING OR BEING PROCESSED => do nothing; just wait further + msg = f"Job {job_id} not finished yet (tape: {tape_barcode})." + logger.debug(msg) + pass + elif job_status.is_paused(): + # PAUSED => something is wrong => admins manually paused job BUT + # keep this job as it is log warning message; but do nothing else + msg = ( + f"Job {job_id} in paused state (tape: {tape_barcode}). " + + "Waiting for this job." + ) + logger.warning(msg) + elif job_status.has_failed(): + # FAILED => something went wrong => can be restarted + # DO NOT RESTART if MAX_RETRIES_RECALL of retries have been + # reached log warning message; but do nothing else + msg = ( + f"Job {job_id} has failed (tape: {tape_barcode}). " + + f"{4 - len(self.tape_job_mapping.get(tape_barcode, list()))} " + + f"of {MAX_RETRIES_RECALL} retries left" + ) + logger.warning(msg) + self.tapes_active.remove(tape_barcode) + job_ids_to_be_removed.add(job_id) + if ( + len(self.tape_job_mapping.get(tape_barcode, list())) + >= MAX_RETRIES_RECALL + 1 + ): + # consider this tape to fail permanently + # we do this tmp_list_? 
in order to prevent line breaks + tmp_list_a = self.tape_job_mapping[tape_barcode] + msg = ( + "max retries reached (jobs failed: " + + f"{', '.join([str(job_id) for job_id in tmp_list_a])})" + ) + self.tapes_failed[tape_barcode] = msg + logger.error(msg) + # get file ids + file_ids = self.tape_file_mapping[tape_barcode] + for file_id in file_ids: + self.files_recall_failed[pyslk.get_resource_path(file_id)] = msg + else: + # SOMETHING ELSE ... + # unexpected state; log warning message; but do nothing else + msg = ( + f"Job {job_id} has unexpected state (tape id: {tape_barcode}): " + + f"{job_status.get_status_name()}. Do not proceed with this " + + "tape." + ) + logger.error(msg) + job_ids_to_be_removed.add(job_id) + self.tapes_active.remove(tape_barcode) + self.tapes_failed[tape_barcode] = msg + # get file ids + file_ids = self.tape_file_mapping[tape_barcode] + for file_id in file_ids: + self.files_recall_failed[pyslk.get_resource_path(file_id)] = msg + # remove ids of jobs which ended + for job_id_to_be_removed in job_ids_to_be_removed: + del self.job_tape_mapping[job_id_to_be_removed] + + def _check_split_file_recall_jobs(self) -> None: + job_ids_to_be_removed: set[int] = set() + job_id: int + msg: str + job_status: pyslk.StatusJob + job_ids_to_be_removed = set() + # iterate all ids of running jobs + for job_id, file_id in self.job_multi_tape_file_mapping.items(): + logger.debug("Checking status of job %i (file id: %i)", job_id, file_id) + job_status = pyslk.get_job_status(job_id) + # DIFFERENT JOB STATES + if job_status.is_successful(): + # SUCCESS => mark tape as successful; remember job id to be + # considered as free; consider this job to be done + msg = f"Job {job_id} ended successfully (file id: {file_id})." 
+ logger.debug(msg) + self.multi_tape_files_success.add(file_id) + job_ids_to_be_removed.add(job_id) + elif job_status.is_queued() or job_status.is_processing(): + # STILL WAITING OR BEING PROCESSED => do nothing; just wait further + msg = f"Job {job_id} not finished yet (file id: {file_id})." + logger.debug(msg) + pass + elif job_status.is_paused(): + # PAUSED => something is wrong => admins manually paused job BUT + # keep this job as it is log warning message; but do nothing else + msg = ( + f"Job {job_id} in paused state (file id: {file_id}). Waiting" + + " for this job." + ) + logger.warning(msg) + elif job_status.has_failed(): + # FAILED => something went wrong => can be restarted + # DO NOT RESTART if MAX_RETRIES_RECALL of retries have been + # reached log warning message; but do nothing else + tmp_len = len(self.multi_tape_file_job_mapping.get(file_id, list())) + msg = ( + f"Job {job_id} has failed (file id: {file_id}). " + + f"{4 - tmp_len} of " + + f"{MAX_RETRIES_RECALL} retries left" + ) + logger.warning(msg) + job_ids_to_be_removed.add(job_id) + if ( + len(self.multi_tape_file_job_mapping.get(file_id, list())) + >= MAX_RETRIES_RECALL + 1 + ): + tmp_list_c = self.multi_tape_file_job_mapping[file_id] + # consider this job to be done + msg = ( + "max retries reached (jobs failed: " + + f"{', '.join([str(job_id) for job_id in tmp_list_c])})" + ) + logger.error(msg) + self.multi_tape_files_failed[file_id] = msg + self.files_recall_failed[pyslk.get_resource_path(file_id)] = msg + else: + # SOMETHING ELSE ... + # unexpected state; log warning message; but do nothing else + msg = ( + f"Job {job_id} has unexpected state (file id: {file_id}): " + + f"{job_status.get_status_name()}. Do not proceed with this " + + "tape." 
+ ) + logger.error(msg) + job_ids_to_be_removed.add(job_id) + self.multi_tape_files_failed[file_id] = msg + self.files_recall_failed[pyslk.get_resource_path(file_id)] = msg + # remove ids of jobs which ended + for job_id_to_be_removed in job_ids_to_be_removed: + del self.job_multi_tape_file_mapping[job_id_to_be_removed] + + def _start_normal_recall_jobs(self, tapes_available: list) -> None: + job_id: int + msg: str + # iterate over all tapes until + # (a) all tapes were iterated or + # (b) the maximum number of parallel recalls has been reached + for tape in tapes_available: + if ( + len(self.job_tape_mapping) + len(self.job_multi_tape_file_mapping) + >= MAX_PARALLEL_RECALLS + ): + logger.debug( + "Submitting no additional recalls because max number of " + + "parallel recalls has been reached." + ) + break + logger.debug("Considering tape %s for next recall.", tape) + # go through tape list and start new recalls + tape_status = pyslk.get_tape_status(tape) + logger.debug("Tape %s status: %s", tape, tape_status) + if tape_status == "BLOCKED": + # do nothing + msg = f"Tape {tape} is blocked. Skip it until next time." + logger.debug(msg) + elif tape_status == "FAILED": + msg = ( + f"Tape {tape} is in failed state. Do not proceed getting " + + "data from this tape." + ) + logger.error(msg) + self.tapes_failed[tape] = msg + # get file ids + file_ids = self.tape_file_mapping[tape] + for file_id in file_ids: + self.files_recall_failed[pyslk.get_resource_path(file_id)] = msg + elif tape_status == "AVAILABLE": + # start new job + msg = f"Tape {tape} is available. Starting recall from tape." 
+ logger.debug(msg) + # get file ids + file_ids = self.tape_file_mapping[tape] + # really start new job here + job_id = pyslk.recall_single(file_ids, resource_ids=True) + logger.info(f"Recall job started for tape {tape}: {str(job_id)}") + # bijective job id <-> tape + self.job_tape_mapping[job_id] = tape + # tape -> multiple job ids + self.tape_job_mapping[tape].append(job_id) + # just the active tapes + self.tapes_active.add(tape) + # append list of files which recall started to respective lists + for file_id in file_ids: + file_name_tmp: str = str(pyslk.get_resource_path(file_id)) + self.files_recall_started.append(file_name_tmp) + self._files_recall_newly_started.append(file_name_tmp) + else: + # unexpected state; log warning message; but do nothing else + msg = ( + f"Tape {tape} has unexpected state: {tape_status}. Do not " + + "proceed with this tape." + ) + logger.error(msg) + self.tapes_failed[tape] = msg + # get file ids + file_ids = self.tape_file_mapping[tape] + for file_id in file_ids: + self.files_recall_failed[pyslk.get_resource_path(file_id)] = msg + + def _start_split_files_recall_jobs(self, multi_tape_files_available: list) -> None: + job_id: int + file_id: int + msg: str + tmp_tapes_available: List[bool] + tmp_tapes: List[str] + # for loop over file ids + for file_id in multi_tape_files_available: + if ( + len(self.job_tape_mapping) + len(self.job_multi_tape_file_mapping) + >= MAX_PARALLEL_RECALLS + ): + logger.debug( + "Submitting no additional recalls because max number of " + + "parallel recalls has been reached." 
+ ) + break + logger.debug("Considering file %i for next recall.", file_id) + # TEST file: + # /arch/pd1309/forcings/reanalyses/ERA5/year2009/ERA5_2009_09_part5.tar + tmp_tapes = list() + tmp_tapes_available = list() + for tape_id, tape_barcode in pyslk.get_resource_tape( + pyslk.get_resource_path(file_id) + ).items(): + tmp_tapes.append(tape_barcode) + if len(tmp_tapes) < 2: + msg = ( + f"File {file_id} is in list of split files but it seems to be " + + f"stored on {len(tmp_tapes)} tape." + ) + logger.error(msg) + raise pyslk.PySlkException(msg) + logger.debug(f"File {file_id} on tapes: {', '.join(tmp_tapes)}") + for tape in tmp_tapes: + # go through tape list and start new recalls + tape_status = pyslk.get_tape_status(tape) + if tape_status == "BLOCKED": + # do nothing + msg = ( + f"Tape {tape} is blocked (file {file_id}). Skip split " + + "file until next time." + ) + logger.debug(msg) + tmp_tapes_available.append(False) + elif tape_status == "FAILED": + msg = ( + f"Tape {tape} is in failed state (file {file_id}). Do not " + + "proceed getting split file." + ) + logger.error(msg) + self.multi_tape_files_failed[file_id] = msg + self.files_recall_failed[pyslk.get_resource_path(file_id)] = msg + tmp_tapes_available.append(False) + elif tape_status == "AVAILABLE": + # start new job + msg = f"Tape {tape} is available (file {file_id})." + logger.debug(msg) + tmp_tapes_available.append(True) + else: + # unexpected state; log warning message; but do nothing else + msg = ( + f"Tape {tape} has unexpected state (file {file_id}): " + + f"{tape_status}. Do not proceed with this tape." 
+ ) + logger.error(msg) + self.multi_tape_files_failed[file_id] = msg + self.files_recall_failed[pyslk.get_resource_path(file_id)] = msg + tmp_tapes_available.append(False) + if all(tmp_tapes_available): + # really start new job here + job_id = pyslk.recall_single(file_id, resource_ids=True) + # bijective job id <-> tape + self.job_multi_tape_file_mapping[job_id] = file_id + # tape -> multiple job ids + self.multi_tape_file_job_mapping[file_id].append(job_id) + # just the active tapes + self.multi_tape_files_active.add(file_id) + + def recall_of_file_failed(self, file_path: str) -> bool: + return file_path in self.files_recall_failed.keys() + + def number_active_jobs(self) -> int: + return len(self.job_tape_mapping) + len(self.job_multi_tape_file_mapping) + + def number_files_in_active_jobs(self) -> int: + return sum( + [ + len(self.tape_file_mapping[tape]) + for tape in self.job_tape_mapping.values() + ] + ) + len(self.job_multi_tape_file_mapping) + + def get_files_recall_newly_started(self) -> list[str]: + output: list[str] = self._files_recall_newly_started + self._files_recall_newly_started = list() + return output + + +class SLKRetrieval: + + def __init__( + self, + slk_recall: SLKRecall, + retrieve_files_corrected: list[tuple[str, str]], + files_retrieval_failed: dict[str, str], + file_permissions: int, + ) -> None: + self.slk_recall: SLKRecall = slk_recall + self.retrieve_files_corrected: list[tuple[str, str]] = retrieve_files_corrected + # self.files_retrieval_reasonable: set[str] = set( + # [inp_file for inp_file, out_dir in self.retrieve_files_corrected] + # ) + self.files_retrieval_destination: dict[str, str] = { + inp_file: out_dir for inp_file, out_dir in self.retrieve_files_corrected + } + self.files_retrieval_requested: set[str] = { + inp_file for inp_file, out_dir in self.retrieve_files_corrected + } + self.files_retrieval_reasonable: set[str] = set( + slk_recall.files_cached_from_beginning + ) + self.files_retrieval_failed: dict[str, str] = 
files_retrieval_failed + self.files_retrieval_succeeded: list[str] = list() + self.file_permissions: int = file_permissions + self.recall_timer: float = time.time() + + def number_files_still_to_be_retrieved_in_total(self) -> int: + return len(self.files_retrieval_requested) + + def number_files_still_to_be_retrieved_realistically(self) -> int: + return len( + [ + file_path + for file_path in self.files_retrieval_requested + if not ( + self.slk_recall.recall_of_file_failed(file_path) + or file_path in self.files_retrieval_failed + ) + ] + ) + + def run_retrieval(self) -> None: + logger.info("Retrieving files started") + retrieve_counter: int = 0 + files_retrieval_done: set = set() + inp_file: str + for inp_file in self.files_retrieval_requested: + if inp_file in self.files_retrieval_reasonable: + # check if recalls need to be started before retrieving + # check every 5 minutes whether additional recalls need to be started + if time.time() - self.recall_timer > 300: + self.slk_recall.start_recalls() + self.recall_timer = time.time() + # append files which recall started to 'files_retrieval_reasonable' + self.files_retrieval_reasonable.update( + set(self.slk_recall.get_files_recall_newly_started()) + ) + # skip files which do not need to be retrieved anymore + out_dir: str = self.files_retrieval_destination[inp_file] + Path(out_dir).mkdir( + parents=True, exist_ok=True, mode=self.file_permissions + ) + # check if file should be retrieved or not + output_dry_retrieve = pyslk.retrieve_improved( + inp_file, out_dir, dry_run=True, preserve_path=False + ) + self._eval_output_dry_retrieval( + output_dry_retrieve, + inp_file, + out_dir, + retrieve_counter, + files_retrieval_done, + ) + for inp_file in files_retrieval_done: + self.files_retrieval_requested.remove(inp_file) + self.files_retrieval_succeeded.append(str(inp_file)) + + if retrieve_counter == 0: + logger.info("No files retrieved") + else: + logger.info(f"{retrieve_counter} files retrieved") + + def 
_eval_output_dry_retrieval( + self, + output_retrieve: dict, + inp_file: str, + out_dir: str, + retrieve_counter: int, + files_retrieval_done: set, + ) -> None: + # example output of pyslk.retrieve_improved: + """ + { + 'SKIPPED': { + 'SKIPPED_TARGET_EXISTS': ['/arch/bm0146/k204221/iow/INDEX.txt'] + }, + 'FILES': { + '/arch/bm0146/k204221/iow/INDEX.txt': '/home/k204221/tmp/INDEX.txt'} + } + + # dry run + { + 'ENVISAGED': {'ENVISAGED': ['/arch/bm0146/k204221/iow/INDEX.txt']}, + 'FILES': { + '/arch/bm0146/k204221/iow/INDEX.txt': + '/home/k204221/tmp/abcdef2/INDEX.txt'} + } + + # after successful retrieval + { + 'ENVISAGED': { + 'ENVISAGED': [] + }, + 'FILES': { + '/arch/bm0146/k204221/iow/INDEX.txt': + '/home/k204221/tmp/INDEX.txt' + }, + 'SUCCESS': { + 'SUCCESS': + ['/arch/bm0146/k204221/iow/INDEX.txt'] + } + } + + { + 'FAILED': { + 'FAILED_NOT_CACHED': ['/arch/bm0146/k204221/iow/iow_data5_001.tar'] + }, + 'FILES': { + '/arch/bm0146/k204221/iow/iow_data5_001.tar': + '/home/k204221/tmp/iow_data5_001.tar' + } + } + """ + if "ENVISAGED" in output_retrieve: + # we can try to retrieve the file + # message on which file is retrieved to where + logger.debug(f"Retrieving file {inp_file} to {out_dir}") + # new retrieve command + output_retrieve = pyslk.retrieve_improved( + inp_file, out_dir, dry_run=False, preserve_path=False + ) + if "SUCCESS" in output_retrieve: + # check if file was successfully retrieved + logger.debug( + f"File {inp_file} was successfully retrieved to {out_dir}." + ) + logger.debug("Adjusting file permissions") + Path( + os.path.join(os.path.expanduser(out_dir), Path(inp_file).name) + ).chmod(self.file_permissions) + self.files_retrieval_reasonable.remove(inp_file) + files_retrieval_done.add(inp_file) + retrieve_counter = retrieve_counter + 1 + return + + # check if file should be skipped + if "SKIPPED" in output_retrieve: + logger.debug(f"File {inp_file} does already exist in {out_dir}. 
Skip.") + self.files_retrieval_reasonable.remove(inp_file) + files_retrieval_done.add(inp_file) + elif "FAILED" in output_retrieve: + # check if file somehow cannot be retrieved + if "FAILED_NOT_CACHED" in output_retrieve["FAILED"]: + logger.debug(f"File {inp_file} is not cached yet. Retry later.") + else: + logger.error( + f"File {inp_file} cannot be retrieved for unknown reasons. " + + "Ignore." + ) + self.files_retrieval_reasonable.remove(inp_file) + self.files_retrieval_failed[inp_file] = next( + iter(output_retrieve["FAILED"]) + ) + else: + logger.error( + f"Retrieval request for file {inp_file} yielded unexpected output. " + + f"Ignore. Output: {output_retrieve}" + ) + self.files_retrieval_failed[inp_file] = ( + "unexpected JSON output of pyslk.retrieve_improved: " + + f"{json.dumps(output_retrieve)}" + ) + self.files_retrieval_reasonable.remove(inp_file) + + +def _write_file_lists( + slk_recall: SLKRecall, slk_retrieval: SLKRetrieval, slk_cache: Path +) -> None: + missing_files: list[str] = [ + file_path + for file_path in slk_retrieval.files_retrieval_reasonable + if file_path not in slk_recall.files_recall_failed.keys() + and file_path not in slk_retrieval.files_retrieval_failed.keys() + ] + tmp_str: str + if ( + len(slk_retrieval.files_retrieval_reasonable) > 0 + or len(slk_recall.files_recall_failed.keys()) > 0 + or len(slk_retrieval.files_retrieval_failed.keys()) > 0 + ): + timestamp: str = datetime.now().strftime("%Y%m%dT%H%M%S") + file_failed_base: str = f"files_failed_{timestamp}" + file_failed_recall: str = f"{file_failed_base}_recall.txt" + file_failed_retrieve: str = f"{file_failed_base}_retrieve.txt" + file_failed_other: str = f"{file_failed_base}_other.txt" + logger.error( + "One or more files could not be retrieved from the tape archive. They " + + f"are printed below and written into files '{file_failed_base}_*.txt'" + + f"in directory '{str(slk_cache)}'." 
) + if len(slk_recall.files_recall_failed) > 0: + tmp_str = "\n ".join(slk_recall.files_recall_failed) + logger.error(f"files, recall failed:\n {tmp_str}") + with open(os.path.join(slk_cache, file_failed_recall), "w") as f: + for file_path, reason in slk_recall.files_recall_failed.items(): + f.write(f"{file_path}: {reason}\n") + if len(slk_retrieval.files_retrieval_failed) > 0: + tmp_str = "\n ".join(slk_retrieval.files_retrieval_failed) + logger.error(f"files, retrieval failed (recall successful):\n {tmp_str}") + with open(os.path.join(slk_cache, file_failed_retrieve), "w") as f: + for ( + file_path, + reason, + ) in slk_retrieval.files_retrieval_failed.items(): + f.write(f"{file_path}: {reason}\n") + if len(missing_files) > 0: + tmp_str = "\n ".join(missing_files) + logger.error(f"files, missing for other reasons:\n {tmp_str}") + with open(os.path.join(slk_cache, file_failed_other), "w") as f: + for file_path in missing_files: + f.write(f"{file_path}: failed for unknown reasons\n") + + +def _mkdirs(path: Union[str, Path], dir_permissions: int) -> None: + rp = os.path.realpath(path) + if os.access(rp, os.F_OK): + if not os.access(rp, os.W_OK): + raise PermissionError( + f"Cannot write to directory, {rp}, needed for downloading data. " + + "Probably, you lack access privileges." + ) + return + components = Path(rp).parts[1:] + for i in range(len(components)): + subpath = Path("/", *components[: i + 1]) + if not os.access(subpath, os.F_OK): + try: + os.mkdir(subpath) + except PermissionError as e: + raise PermissionError( + f"Cannot create or access directory, {e.filename}, needed for " + + "downloading data." 
+ ) + os.chmod(subpath, dir_permissions) + + +def _reformat_retrieve_files_list( + retrieve_files: list[tuple[str, str]], dir_permissions: int +) -> list[tuple[str, str]]: + retrieve_files_corrected: list[tuple[str, str]] = list() + for inp_file, out_dir in retrieve_files: + _mkdirs(out_dir, dir_permissions) + # this `mkdir` indirectly sets proper access permissions for this folder + out_file: str = os.path.join(os.path.expanduser(out_dir), Path(inp_file).name) + if os.path.exists(out_file): + details_inp_file = pyslk.list_clone_file( + inp_file, print_timestamps_as_seconds_since_1970=True + ) + size_out_file = os.path.getsize(out_file) + mtime_out_file = os.path.getmtime(out_file) + if ( + int(details_inp_file.filesize.iloc[0]) == size_out_file + and int(details_inp_file.timestamp_mtime.iloc[0]) == mtime_out_file + ): + # do not retrieve file because it exists already in destination and has + # same size and timestamp + continue + retrieve_files_corrected.append((str(inp_file), str(out_dir))) + + return retrieve_files_corrected diff --git a/slkspec/tests/conftest.py b/slkspec/tests/conftest.py index 18bf9f1..8bcbd46 100644 --- a/slkspec/tests/conftest.py +++ b/slkspec/tests/conftest.py @@ -3,11 +3,13 @@ from __future__ import annotations import builtins +import inspect import shutil +from datetime import datetime from pathlib import Path from subprocess import PIPE, run from tempfile import TemporaryDirectory -from typing import Generator +from typing import Generator, Optional, Union import mock import numpy as np @@ -15,12 +17,244 @@ import pytest import xarray as xr +PYSLK_DEFAULT_LIST_COLUMNS = [ + "permissions", + "owner", + "group", + "filesize", + "timestamp", + "filename", +] + class SLKMock: """A mock that emulates what pyslk is doing.""" + class StatusJob: + """mock the pyslk.StatusJob""" + + PAUSED, QUEUED, PROCESSING, COMPLETED, SUCCESSFUL, FAILED, ABORTED = range( + -4, 3 + ) + STATI = {} + + # PROCESSING includes COMPLETING + # ABORTED includes 
ABORTING + + def __init__(self, status_str: str): + """ + Converts a string representing the status of a recall job into a + status_job object + + Possible values for status_str are: + PAUSED, PAUSING, QUEUED, PROCESSING, COMPLETED, COMPLETING, + ABORTED, ABORTING + + :param status_str: status of a recall job + :type status_str: str + """ + if not isinstance(status_str, str): + raise TypeError( + f"pyslk.StatusJob.{inspect.stack()[0][3]}: wrong type " + + "provided; need 'str'; " + + f"got '{type(status_str).__name__}'" + ) + + self.status: int + + self.STATI[self.PAUSED] = "PAUSED" + self.STATI[self.QUEUED] = "QUEUED" + self.STATI[self.PROCESSING] = "PROCESSING" + self.STATI[self.COMPLETED] = "COMPLETED" + self.STATI[self.SUCCESSFUL] = "SUCCESSFUL" + self.STATI[self.FAILED] = "FAILED" + self.STATI[self.ABORTED] = "ABORTED" + + if status_str == "SUCCESSFUL": + self.status = self.SUCCESSFUL + elif status_str == "FAILED": + self.status = self.FAILED + elif status_str == "COMPLETED": + self.status = self.COMPLETED + elif status_str in ["PROCESSING", "COMPLETING"]: + self.status = self.PROCESSING + elif status_str[0:6] == "QUEUED": + self.status = self.QUEUED + elif status_str in ["PAUSED", "PAUSING"]: + self.status = self.PAUSED + elif status_str in ["ABORTED", "ABORTING"]: + self.status = self.ABORTED + else: + raise ValueError( + f"pyslk.StatusJob.{inspect.stack()[0][3]}: provided " + + f"status cannot be processed: {status_str}; " + + "please contact support@dkrz.de" + ) + + def get_status(self) -> int: + """ + return the status as integer + + Meaning of the output + * -4: PAUSED / PAUSING + * -3: QUEUED + * -2: PROCESSING / COMPLETING + * -1: COMPLETED + * 0: SUCCESSFUL + * 1: FAILED + * 2: ABORTED / ABORTING + + :return: status as integer value (-4 to 2) + :rtype: int + """ + return self.status + + def get_status_name(self) -> str: + return self.__str__() + + def __str__(self) -> str: + return self.STATI[self.status] + + def get_possible_stati(self) -> dict: + 
return self.STATI
+
+        def is_paused(self) -> bool:
+            return self.status == self.PAUSED
+
+        def is_queued(self) -> bool:
+            return self.status == self.QUEUED
+
+        def is_processing(self) -> bool:
+            return self.status == self.PROCESSING
+
+        def is_finished(self) -> bool:
+            return self.status in [
+                self.COMPLETED,
+                self.SUCCESSFUL,
+                self.FAILED,
+                self.ABORTED,
+            ]
+
+        def is_successful(self) -> bool:
+            return self.status == self.SUCCESSFUL
+
+        def is_completed(self) -> bool:
+            return self.status == self.COMPLETED
+
+        def has_failed(self) -> bool:
+            return self.status in [self.FAILED, self.ABORTED]
+
+        def __eq__(self, other: object) -> bool:
+            if isinstance(other, type(self)):
+                return self.status == other.get_status()
+            if isinstance(other, int):
+                return self.status == other
+            raise TypeError(
+                f"pyslk.StatusJob.{inspect.stack()[0][3]}: wrong type "
+                + "provided; need 'int' or 'StatusJob'; "
+                + f"got '{type(other).__name__}'"
+            )
+
+        def __ne__(self, other: object) -> bool:
+            if isinstance(other, type(self)):
+                return self.status != other.get_status()
+            if isinstance(other, int):
+                return self.status != other
+            raise TypeError(
+                f"pyslk.StatusJob.{inspect.stack()[0][3]}: wrong type "
+                + "provided; need 'int' or 'StatusJob'; "
+                + f"got '{type(other).__name__}'"
+            )
+
+    tape_barcode2id: dict[str, int] = {
+        "M12345M8": 12345,
+        "M23456M8": 23456,
+        "M34567M8": 34567,
+    }
+    tape_status_default: dict[str, str] = {
+        "M12345M8": "AVAILABLE",
+        "M23456M8": "AVAILABLE",
+        "M34567M8": "AVAILABLE",
+    }
+    tape_job_ids: dict[str, int] = {
+        "M12345M8": 123456,
+        "M23456M8": 234567,
+        "M34567M8": 345678,
+    }
+    job_stati: dict[str, list[StatusJob]] = {
+        "123456": [
+            StatusJob("PROCESSING"),
+            StatusJob("PROCESSING"),
+            StatusJob("SUCCESSFUL"),
+        ],
+        "234567": [
+            StatusJob("QUEUED"),
+            StatusJob("PROCESSING"),
+            StatusJob("PROCESSING"),
+            StatusJob("SUCCESSFUL"),
+        ],
+        "345678": [
+            StatusJob("PROCESSING"),
+            StatusJob("PROCESSING"),
+            StatusJob("SUCCESSFUL"),
+        ],
+    }
+    
tape_files: dict[str, list[str]] = { + "M12345M8": [ + "/arch/ab1234/c567890/fileA.nc", + "/arch/ab1234/c567890/fileB.nc", + "/arch/ab1234/c567890/fileC.nc", + ], + "M23456M8": [ + "/arch/ab1234/c567890/fileD.nc", + "/arch/ab1234/c567890/fileE.nc", + "/arch/ab1234/c567890/fileF.nc", + ], + "M34567M8": [ + "/arch/ab1234/c567890/fileG.nc", + "/arch/ab1234/c567890/fileH.nc", + "/arch/ab1234/c567890/fileI.nc", + ], + } + files_path2id: dict[str, int] = { + "/arch/ab1234/c567890/fileA.nc": 40000001010, + "/arch/ab1234/c567890/fileB.nc": 40000002010, + "/arch/ab1234/c567890/fileC.nc": 40000003010, + "/arch/ab1234/c567890/fileD.nc": 40000004010, + "/arch/ab1234/c567890/fileE.nc": 40000005010, + "/arch/ab1234/c567890/fileF.nc": 40000006010, + "/arch/ab1234/c567890/fileG.nc": 40000007010, + "/arch/ab1234/c567890/fileH.nc": 40000008010, + "/arch/ab1234/c567890/fileI.nc": 40000009010, + } + def __init__(self, _cache: dict[int, builtins.list[str]] = {}) -> None: self._cache = _cache + self.resource_tape: dict[str, str] = dict() + self.files_special: list[str] = list() + self.tape_special_ids: list[int] = list() + for k, v in self.tape_files.items(): + for i in v: + self.resource_tape[i] = k + self.files_special.append(i) + for k, v in self.tape_barcode2id.items(): + self.tape_special_ids.append(v) + self.job_counters: dict[str, int] = dict() + self.job_active: dict[str, bool] = dict() + for k in self.job_counters.keys(): + self.job_counters[str(k)] = 0 + self.job_active[str(k)] = False + # 80000000000 + resource_counter*1000 + self.resource_counter: int = 0 + # 400000 + job_counter + self.job_counter: int = 0 + # 40000 + tape_counter + self.tape_counter: int = 0 + self.resource_id2path: dict[str, str] = dict() + self.resource_path2id: dict[str, int] = dict() + for k, v in self.files_path2id.items(): + self.resource_id2path[str(v)] = k + self.resource_path2id[k] = v def slk_list(self, inp_path: str) -> str: """Mock the slk_list method.""" @@ -31,7 +265,7 @@ def slk_list(self, 
inp_path: str) -> str: ) return "\n".join(res[1:] + [res[0]]) - def search(self, inp_f: builtins.list[str]) -> int | None: + def search(self, inp_f: list[str]) -> int | None: """Mock slk_search.""" if not inp_f: return None @@ -39,19 +273,429 @@ def search(self, inp_f: builtins.list[str]) -> int | None: self._cache[hash_value] = inp_f return hash_value - def slk_gen_file_query(self, inp_files: builtins.list[str]) -> builtins.list[str]: - """Mock slk_gen_file_qeury.""" - return [f for f in inp_files if Path(f).exists()] - - def slk_retrieve(self, search_id: int, out_dir: str, preserve_path: bool) -> None: - """Mock slk_retrieve.""" - for inp_file in map(Path, self._cache[search_id]): - if preserve_path: - outfile = Path(out_dir) / Path(str(inp_file).strip(inp_file.root)) - outfile.parent.mkdir(parents=True, exist_ok=True) - shutil.copy(inp_file, outfile) + def ls( + self, + path_or_id: Union[ + str, + int, + Path, + list[str], + list[int], + list[Path], + set[str], + set[int], + set[Path], + ], + show_hidden: bool = False, + numeric_ids: bool = False, + recursive: bool = False, + column_names: list = PYSLK_DEFAULT_LIST_COLUMNS, + parse_dates: bool = True, + parse_sizes: bool = True, + full_path: bool = True, + ) -> pd.DataFrame: + """Mock slk_ls.""" + return pd.DataFrame( + [ + [ + "-rwxr-xr-x-", + "k204221", + "bm0146", + 1268945, + datetime.now(), + "/arch/bm0146/k204221/iow/INDEX.txt", + ] + ], + columns=column_names, + ) + + def is_cached(self, resource_path: Union[Path, str]) -> bool: + """Mock pyslk.is_cached.""" + if not self._are_resources_special(resource_path): + return True + else: + tape_barcode: str = self.resource_tape[str(resource_path)] + job_id: int = self.tape_job_ids[tape_barcode] + if self.get_job_status(job_id).is_finished(): + return True + else: + return False + + def group_files_by_tape( + self, + resource_path: Union[Path, str, list, None] = None, + resource_ids: Union[str, list, int, None] = None, + search_id: Union[str, int, None] = None, 
+ search_query: Union[str, None] = None, + recursive: bool = False, + max_tape_number_per_search: int = -1, + run_search_query: bool = False, + evaluate_regex_in_input: bool = False, + ) -> list[dict]: + """Mock slk_group_files_by_tape.""" + if resource_path is None: + raise ValueError("'None' for 'resource_path' not implemented in mock") + if isinstance(resource_path, (Path, str)): + return self.group_files_by_tape( + resource_path=[resource_path], + resource_ids=resource_ids, + search_id=search_id, + search_query=search_query, + recursive=recursive, + max_tape_number_per_search=max_tape_number_per_search, + run_search_query=run_search_query, + evaluate_regex_in_input=evaluate_regex_in_input, + ) + # define output + tmp_result: dict[str, dict] = dict() + result: list[dict] = [] + # iterate resources + for resource in resource_path: + # check if file is cached + if self.is_cached(resource): + # if 'cached' entry in tmp_results => append resource + # if not => create entry + if "cached" in tmp_result: + tmp_result["cached"]["file_count"] = ( + tmp_result["cached"]["file_count"] + 1 + ) + tmp_result["cached"]["files"].append(resource) + tmp_result["cached"]["file_ids"].append( + self.get_resource_id(resource) + ) + else: + tmp_result["cached"] = { + "id": -1, + "location": "cache", + "description": "files currently stored in the HSM cache", + "barcode": "", + "status": "", + "file_count": 1, + "files": [resource], + "file_ids": [self.get_resource_id(resource)], + } + else: + # this resource is not cached but has to be retrieved from a tape + tape_barcode: str = self.resource_tape[str(resource)] + if tape_barcode in tmp_result: + tmp_result[tape_barcode]["file_count"] = ( + tmp_result[tape_barcode]["file_count"] + 1 + ) + tmp_result[tape_barcode]["files"].append(resource) + tmp_result[tape_barcode]["file_ids"].append( + self.get_resource_id(resource) + ) + else: + tmp_result[tape_barcode] = { + "id": self.get_tape_id(tape_barcode), + "location": "tape", + 
"description": "files currently stored in tape",
+                        "status": "",
+                        "file_count": 1,
+                        "files": [resource],
+                        "file_ids": [self.get_resource_id(resource)],
+                    }
+        for v in tmp_result.values():
+            result.append(v)
+
+        return result
+
+    def get_tape_status(self, tape: int | str) -> str | None:
+        special_barcode: str | None = None
+        # check if input tape is in list of special tapes
+        if isinstance(tape, str):
+            if tape in self.tape_barcode2id.keys():
+                special_barcode = tape
+        elif isinstance(tape, int):
+            if tape in self.tape_special_ids:
+                special_barcode = {v: k for k, v in self.tape_barcode2id.items()}[tape]
+        if special_barcode is not None:
+            job_id: int = self.tape_job_ids[special_barcode]
+            # if job status indicates that the job is not finished,
+            # then tape is blocked
+            if not self._get_job_status(job_id).is_finished():
+                return "BLOCKED"
+        # all else:
+        return "AVAILABLE"
+
+    def _get_job_status(self, job_id: int) -> StatusJob | None:
+        if str(job_id) in self.job_active and self.job_active[str(job_id)]:
+            return self.job_stati[str(job_id)][self.job_counters[str(job_id)]]
+        # default
+        return self.StatusJob("SUCCESSFUL")
+
+    def get_job_status(self, job_id: int) -> StatusJob | None:
+        tmp_job_status: self.StatusJob = self._get_job_status(job_id)
+        # if job is active then increment the job status counter by one
+        if str(job_id) in self.job_active and self.job_active[str(job_id)]:
+            self.job_counters[str(job_id)] = self.job_counters[str(job_id)] + 1
+            # set job to inactive when all stati were iterated
+            if self.job_counters[str(job_id)] >= len(self.job_stati[str(job_id)]):
+                self.job_active[str(job_id)] = False
+        # StatusJob(
+        return tmp_job_status
+
+    def _are_resources_special(
+        self,
+        resources: (
+            Path
+            | str
+            | int
+            | list[Path]
+            | list[str]
+            | list[int]
+            | set[Path]
+            | set[str]
+            | set[int]
+        ),
+    ) -> bool:
+        output: bool = False
+        if isinstance(resources, (Path, str, int)):
+            resource_path: str
+            if isinstance(resources, (Path, str)):
+                resource_path = str(resources)
+            else:
+                resource_path 
= self.get_resource_path(resources)
+            if resource_path in self.files_path2id:
+                output = True
+        else:
+            for resource in resources:
+                output = output or self._are_resources_special(resource)
+        return output
+
+    def _get_tape_of_special_resources(
+        self,
+        resources: (
+            Path
+            | str
+            | int
+            | list[Path]
+            | list[str]
+            | list[int]
+            | set[Path]
+            | set[str]
+            | set[int]
+        ),
+    ) -> str:
+        output: str | None = None
+        tmp_tape: str
+        if isinstance(resources, (Path, str, int)):
+            resource_path: str
+            if isinstance(resources, (Path, str)):
+                resource_path = str(resources)
             else:
-                shutil.copy(inp_file, Path(out_dir) / inp_file.name)
+                resource_path = self.get_resource_path(resources)
+            output = self.get_resource_tape(resource_path)
+        else:
+            for resource in resources:
+                tmp_tape = self._get_tape_of_special_resources(resource)
+                if output is None:
+                    output = tmp_tape
+                else:
+                    if output != tmp_tape:
+                        raise ValueError("All resources must be in the same tape")
+        return output
+
+    def recall_single(
+        self,
+        resources: (
+            Path
+            | str
+            | int
+            | list[Path]
+            | list[str]
+            | list[int]
+            | set[Path]
+            | set[str]
+            | set[int]
+        ),
+        destination: Path | str | None = None,
+        resource_ids: bool = False,
+        search_id: bool = False,
+        recursive: bool = False,
+        preserve_path: bool = True,
+    ) -> int:
+        if not self._are_resources_special(resources):
+            job_id: int = 300000 + self.job_counter
+            self.job_counter = self.job_counter + 1
+            return job_id
+        else:
+            return self.tape_job_ids[self._get_tape_of_special_resources(resources)]
+
+    def get_tape_barcode(self, tape_id: int) -> str:
+        return f"M{tape_id // 10}M8"
+
+    def get_tape_id(self, tape_barcode: str) -> int:
+        return int(tape_barcode[1:6]) * 10
+
+    def get_resource_tape(self, resource_path: str | Path) -> str | dict[int, str]:
+        if str(resource_path) in self.resource_tape:
+            return self.resource_tape[str(resource_path)]
+        else:
+            # construct a tape
+            tmp_tape_id: int = 40000 + self.tape_counter
+            tmp_tape_barcode: str = 
self.get_tape_barcode(tmp_tape_id) + self.resource_tape[str(resource_path)] = tmp_tape_barcode + return {tmp_tape_id: tmp_tape_barcode} + + def get_resource_path(self, resource_id: int) -> Path | None: + if str(resource_id) in self.resource_id2path: + return self.resource_id2path[str(resource_id)] + else: + raise ValueError(f"Resource id {resource_id} not found") + + def get_resource_id(self, resource_path: str | Path) -> int | None: + if str(resource_path) in self.resource_path2id: + return self.resource_path2id[str(resource_path)] + else: + # construct a resource + tmp_resource_id: int = 80000000000 + self.resource_counter * 1000 + self.resource_counter = self.resource_counter + 1 + self.resource_path2id[str(resource_path)] = tmp_resource_id + self.resource_id2path[str(tmp_resource_id)] = resource_path + return tmp_resource_id + + def _resource_status(self, resource_path: Path | str) -> tuple[str, str]: + if resource_path not in self.files_special: + return "ENVISAGED", "ENVISAGED" + else: + tape: str = self.resource_tape[str(resource_path)] + job_id: int = self.tape_job_ids[tape] + if self._get_job_status(job_id).is_finished(): + return "ENVISAGED", "ENVISAGED" + else: + return "FAILED", "FAILED_NOT_CACHED" + + def _retrieve( + self, + resource: Path | str | int, + destination: Path | str, + dry_run: bool = False, + force_overwrite: bool = False, + ignore_existing: bool = False, + resource_ids: bool = False, + search_id: bool = False, + recursive: bool = False, + stop_on_failed_retrieval: bool = False, + preserve_path: bool = True, + verbose: bool = False, + ) -> tuple[str, str, str, str, int]: + status1: str + status2: str + resource_id: int + resource_path: Path + # get resource path/id + if isinstance(resource, int): + if resource_ids: + resource_id = resource + resource_path = self.get_resource_path(resource) + else: + raise ValueError( + "If resource_ids is False, resource must not be an int. " + + "Search ids are not implemented yet." 
+ ) + else: + resource_id = self.get_resource_id(resource) + resource_path = Path(resource) + # modify status for special files + status1, status2 = self._resource_status(resource_path) + # set destination paths + outfile: Path + if preserve_path: + outfile = Path(destination) / Path( + str(resource_path).strip(resource_path.root) + ) + else: + outfile = Path(destination) / resource_path.name + # check if destination exists: + if outfile.exists() and status1 != "FAILED" and not force_overwrite: + status1 = "SKIPPED" + status2 = "SKIPPED_TARGET_EXISTS" + # copy resource and change status + if not dry_run and status1 in ["ENVISAGED"]: + shutil.copy(resource_path, outfile) + status1 = "SUCCESS" + status2 = "SUCCESS" + # return + return status1, status2, str(resource_path), str(outfile), resource_id + + def retrieve_improved( + self, + resources: ( + Path + | str + | int + | list[Path] + | list[str] + | list[int] + | set[Path] + | set[str] + | set[int] + ), + destination: Path | str, + dry_run: bool = False, + force_overwrite: bool = False, + ignore_existing: bool = False, + resource_ids: bool = False, + search_id: bool = False, + recursive: bool = False, + stop_on_failed_retrieval: bool = False, + preserve_path: bool = True, + verbose: bool = False, + ) -> Optional[dict] | None: + output: dict = dict() + # structure of tmp_tuple + # (val0, val1, val2, val3, val4) + # val0: str => "ENVISAGED"|"SKIPPED"|"FAILED"|"SUCCESS" + # val1: str => "ENVISAGED"|"SKIPPED_TARGET_EXISTS" + # |"FAILED_NOT_CACHED"|"SUCCESS" + # val2: str => resource path source + # val3: str => resource path destination + # val4: int => resource id + tmp_tuple: tuple[str, str, str, str, int] + if isinstance(resources, (Path, str, int)): + tmp_tuple = self._retrieve( + resources, + destination, + dry_run, + force_overwrite, + ignore_existing, + resource_ids, + search_id, + recursive, + stop_on_failed_retrieval, + preserve_path, + verbose, + ) + output[tmp_tuple[0]] = {tmp_tuple[1]: [tmp_tuple[2]]} + 
output["FILES"] = {tmp_tuple[2]: tmp_tuple[3]} + elif isinstance(resources, (list, set)): + output["FILES"] = dict() + for resource in resources: + tmp_tuple = self._retrieve( + resource, + destination, + dry_run, + force_overwrite, + ignore_existing, + resource_ids, + search_id, + recursive, + stop_on_failed_retrieval, + preserve_path, + verbose, + ) + if tmp_tuple[0] not in output: + output[tmp_tuple[0]] = dict() + if tmp_tuple[1] not in output[tmp_tuple[0]]: + output[tmp_tuple[0]][tmp_tuple[1]] = list() + output[tmp_tuple[0]][tmp_tuple[1]].append(tmp_tuple[2]) + output["FILES"][tmp_tuple[2]] = tmp_tuple[3] + return output + + class PySlkException(BaseException): + pass def create_data(variable_name: str, size: int) -> xr.Dataset: diff --git a/slkspec/tests/test_open_dataset.py b/slkspec/tests/test_open_dataset.py index 441f323..dc0b93b 100644 --- a/slkspec/tests/test_open_dataset.py +++ b/slkspec/tests/test_open_dataset.py @@ -133,4 +133,4 @@ def test_list_files(patch_dir: Path, netcdf_files: Path) -> None: assert "name" in info assert "type" in info assert "size" in info - assert info["size"] is None + assert isinstance(info["size"], int) diff --git a/slkspec/tests/verify_slk.py b/slkspec/tests/verify_slk.py new file mode 100644 index 0000000..6827a4e --- /dev/null +++ b/slkspec/tests/verify_slk.py @@ -0,0 +1,42 @@ +import os +from pathlib import Path + +import pyslk + +test_files = [ + dict( + name=( + "/arch/bk1040/dyamond_winter_post_processed/ECMWF/IFS-4km/DW-CPL/" + + "atmos/1hr/tas/r1i1p1f1/2d/gn/tas_1hr_IFS-4km_DW-CPL_r1i1p1f1_2" + + "d_gn_20200220000000-20200220230000.nc" + ), + size=0.8215, + query=( + '{"$and":[{"path":{"$gte":"/arch/bk1040/dyamond_winter_post_proce' + + 'ssed/ECMWF/IFS-4km/DW-CPL/atmos/1hr/tas/r1i1p1f1/2d/gn","$max_' + + 'depth":1}},{"resources.name":{"$regex":"tas_1hr_IFS-4km_DW-CPL' + + '_r1i1p1f1_2d_gn_20200220000000-20200220230000.nc"}}]}' + ), + ), + dict( + name=( + 
"/arch/bk1040/dyamond_winter_post_processed/ECMWF/IFS-4km/DW-CPL/" + + "atmos/1hr/tas/r1i1p1f1/2d/gn/tas_1hr_IFS-4km_DW-CPL_r1i1p1f1_2" + + "d_gn_2020022900000d_gn_20200229000000-20200229000000.nc" + ) + ), +] + + +def test_retrieve_improved() -> None: + import tempfile + + filename = "/arch/bm0146/k204221/iow/INDEX.txt" + td = tempfile.TemporaryDirectory() + pyslk.retrieve_improved(filename, td.name, preserve_path=True) + assert os.stat(Path(td.name, filename[1:])).st_size == 1268945 + td.cleanup() + + +def test_search() -> None: + assert isinstance(pyslk.search(test_files[0]["query"]), int)