diff --git a/servicex/__init__.py b/servicex/__init__.py index 8916dfbc..b9d6f3d8 100644 --- a/servicex/__init__.py +++ b/servicex/__init__.py @@ -28,6 +28,7 @@ from servicex.databinder_models import Sample, General, ServiceXSpec from servicex.servicex_client import deliver, ProgressBarFormat from .models import ResultDestination +from .query_cache_read import read_dir import servicex.dataset as dataset import servicex.query as query import importlib.metadata @@ -48,5 +49,6 @@ "dataset", "query", "ProgressBarFormat", + "read_dir", "__version__", ] diff --git a/servicex/query_cache_read.py b/servicex/query_cache_read.py new file mode 100644 index 00000000..2733f051 --- /dev/null +++ b/servicex/query_cache_read.py @@ -0,0 +1,103 @@ +# Copyright (c) 2026, IRIS-HEP +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# * Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# * Neither the name of the copyright holder nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. 
def read_dir(
    path: Optional[Union[str, Path]] = None,
    config_path: Optional[Union[str, Path]] = None,
    local_preferred: bool = True,
) -> dict[str, GuardList]:
    r"""
    Return the transformation output results previously saved to a directory. The format
    is the same as in the deliver() call.

    This function will not trigger any additional downloads. Either local copies or URLs will
    be returned, according to the policy set by `local_preferred`.

    If multiple results with the same name were saved to the same directory, only the most
    recent (largest ``submit_time``) is returned.

    The query cache opened for the lookup is closed again before this function returns.

    :param path: The directory path to read from, as a string or a :py:class:`Path` object.
        If None, the default cache path will be used from the ServiceX configuration.
    :param config_path: If `path` is :py:const:`None`, this determines the path from which the
        ServiceX configuration will be loaded to determine the default cache path.
        :py:const:`None` will search the default paths.
    :param local_preferred: Determines the behavior if both downloaded files and remote URL
        information are present. If :py:const:`True` (default) then downloaded copies of
        files are preferred, if available. If :py:const:`False` then remote URLs will
        be preferred, if available.
    :return: A dictionary mapping the name of each :py:class:`Sample` to a :py:class:`.GuardList`
        with the file names or URLs for the outputs.
    :raises ValueError: if `path` is given but is not an existing directory.
    :raises RuntimeError: if `path` exists but holds no ``.servicex/db.json`` file.
    """

    if path is None:
        # Load default
        config = Configuration.read(
            str(config_path) if config_path is not None else None
        )
    else:
        # Validate *before* building the cache object: the directory and its
        # database must already exist, and this read-only call must not be the
        # thing that creates them.
        cache_dir = Path(path)
        if not cache_dir.is_dir():
            raise ValueError(f"{path} is not an existing directory")
        if not (cache_dir / ".servicex" / "db.json").exists():
            raise RuntimeError(
                f"{path} does not contain a valid ServiceX download area"
            )
        # A minimal Configuration carrying just the cache path is sufficient here.
        config = Configuration(api_endpoints=[], cache_path=str(path))

    cache = QueryCache(config)
    try:
        # Materialize the records while the cache is still open.
        transforms = list(cache.cached_queries())
    finally:
        # Release the cache's handle so the directory can be moved or deleted
        # by the caller afterwards.
        cache.close()

    # Keep only the most recently submitted transform for each title.
    latest_transforms: dict[str, TransformedResults] = {}
    for transform in transforms:
        current = latest_transforms.get(transform.title)
        if current is None or transform.submit_time > current.submit_time:
            latest_transforms[transform.title] = transform

    results = {}
    for title, transform in latest_transforms.items():
        local_files = transform.file_list
        remote_urls = transform.signed_url_list
        # Fall back to the other list when the preferred one is empty.
        if local_preferred:
            chosen = local_files or remote_urls
        else:
            chosen = remote_urls or local_files
        results[title] = GuardList(chosen)
    return results
+# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# * Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# * Neither the name of the copyright holder nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
# Output locations used as stand-ins for downloaded files and signed URLs.
file_uris = ["/tmp/foo1.root", "/tmp/foo2.root"]
file_uris_2 = ["/tmp/bar1.root", "/tmp/bar2.root"]
remote_urls = ["http://remote/foo1.root", "http://remote/foo2.root"]


def _cache_completed(cache, request, status, *, file_list, signed_urls):
    """Mark *request* as COMPLETE in *cache* and store its outputs."""
    cache.update_transform_status(request.compute_hash(), "COMPLETE")
    cache.cache_transform(
        cache.transformed_results(
            transform=request,
            completed_status=status,
            data_dir="/foo/bar",
            file_list=file_list,
            signed_urls=signed_urls,
        )
    )


def test_read_cache(transform_request, completed_status):
    """read_dir returns local files by default and URLs when asked to."""
    with tempfile.TemporaryDirectory() as temp_dir:
        config = Configuration(cache_path=temp_dir, api_endpoints=[])  # type: ignore
        cache = QueryCache(config)
        try:
            _cache_completed(
                cache,
                transform_request,
                completed_status,
                file_list=file_uris,
                signed_urls=remote_urls,
            )

            data = read_dir(temp_dir)
            assert isinstance(data["Test submission"], GuardList)
            assert str(data) == str({"Test submission": file_uris})

            data = read_dir(temp_dir, local_preferred=False)
            assert isinstance(data["Test submission"], GuardList)
            assert str(data) == str({"Test submission": remote_urls})
        finally:
            # Close even when an assertion fails so the temporary directory
            # can always be cleaned up.
            cache.close()


def test_read_cache_with_config(transform_request, completed_status, mocker):
    """With no explicit path, read_dir takes the cache path from the config file."""
    mocker.patch("servicex.configuration.getuser", return_value="cache_user")
    with tempfile.TemporaryDirectory() as temp_dir:
        mocker.patch(
            "servicex.configuration.tempfile.gettempdir", return_value=str(temp_dir)
        )
        cfg = Path(temp_dir) / "servicex.yaml"
        cfg.write_text(f"""
api_endpoints:
  - endpoint: http://localhost:5000
    name: localhost
cache_path: {temp_dir}
""")
        config = Configuration(cache_path=temp_dir, api_endpoints=[])  # type: ignore
        cache = QueryCache(config)
        try:
            _cache_completed(
                cache,
                transform_request,
                completed_status,
                file_list=file_uris,
                signed_urls=remote_urls,
            )

            data = read_dir(config_path=cfg)
            assert str(data) == str({"Test submission": file_uris})
        finally:
            cache.close()


def test_no_cache():
    """A missing directory and a directory without a cache are both rejected."""
    with pytest.raises(ValueError):
        read_dir("let-us-assume-this-nonexists")

    with tempfile.TemporaryDirectory() as temp_dir:
        with pytest.raises(RuntimeError):
            read_dir(temp_dir)


def test_most_recent_cache(transform_request, completed_status):
    """Of several results sharing a title, only the latest submission is returned."""
    with tempfile.TemporaryDirectory() as temp_dir:
        config = Configuration(cache_path=temp_dir, api_endpoints=[])  # type: ignore
        cache = QueryCache(config)
        try:
            # Entry 1: submit_time comes from the fixture unchanged
            # (presumably earlier than entry 2 - the final assertion relies on it).
            _cache_completed(
                cache,
                transform_request,
                completed_status,
                file_list=file_uris,
                signed_urls=[],
            )

            # Entry 2: the newest submission; this is the one expected back.
            req2 = transform_request.model_copy(update={"codegen": "uproot-2"})
            status2 = completed_status.model_copy(
                update={
                    "request_id": "02c64494-4529-49a7-a4a6-95661ea3936e",
                    "submit_time": datetime.datetime(
                        2025, 12, 1, tzinfo=datetime.timezone.utc
                    ),
                }
            )
            _cache_completed(
                cache, req2, status2, file_list=file_uris_2, signed_urls=[]
            )

            # Entry 3: stored last but submitted long before the others, so
            # insertion order alone must not decide the winner.
            req3 = transform_request.model_copy(update={"codegen": "uproot-3"})
            status3 = completed_status.model_copy(
                update={
                    "request_id": "02c64494-4529-49a7-a4a6-95661ea3936a",
                    "submit_time": datetime.datetime(
                        1970, 12, 1, tzinfo=datetime.timezone.utc
                    ),
                }
            )
            _cache_completed(
                cache, req3, status3, file_list=[], signed_urls=remote_urls
            )

            data = read_dir(temp_dir)
            assert str(data) == str({"Test submission": file_uris_2})
        finally:
            # The original test never closed this cache.
            cache.close()