Merged. Changes from 9 commits.
9 changes: 7 additions & 2 deletions docs/deployment/reference.md
@@ -111,9 +111,14 @@ parameters for the [rabbitMQ](https://github.com/bitnami/charts/tree/master/bitn
 | `transformer.persistence.existingClaim` | Existing persistent volume claim | nil |
 | `transformer.subdir` | Subdirectory of the mount to write transformer results to (should end with a trailing /) | nil |
 | `dataLifecycle.enabled` | Enable deployment of data lifecycle jobs | false |
-| `dataLifecycle.image` | Default image for data lifecycle job | `sslhep/servicex_minio_cleanup` |
-| `dataLifecycle.tag` | Data lifecycle job image tag | |
+| `dataLifecycle.image` | Default image for data lifecycle job | `python` |
+| `dataLifecycle.tag` | Data lifecycle job image tag | `3.10` |
 | `dataLifecycle.pullPolicy` | Data lifecycle image pull policy | `Always` |
 | `dataLifecycle.schedule` | Schedule for the minioCleanup cronjob. See the [cron schedule syntax](https://kubernetes.io/docs/concepts/workloads/controllers/cron-jobs/#cron-schedule-syntax) reference for details on the fields | `* */8 * * *` (every 8 hours) |
 | `dataLifecycle.retention` | Transforms older than this are archived. Accepts the GNU `date --date` syntax; see the [date command](https://www.geeksforgeeks.org/date-command-linux-examples/#4-how-to-display-past-dates) examples | 7 days ago |
 | `dataLifecycle.maxDesiredCacheSize` | If the server-side cache is larger than this, the cleanup service keeps deleting transforms, moving forward in time, until the cache fits. Specify units as Mb, Gb, Tb or Pb | "1Tb" |
+| `datasetLifecycle.image` | Default image for dataset cache lifecycle job | `curlimages/curl` |
+| `datasetLifecycle.tag` | Dataset cache lifecycle job image tag | `8.17.0` |
+| `datasetLifecycle.pullPolicy` | Dataset cache lifecycle image pull policy | `Always` |
+| `datasetLifecycle.schedule` | Schedule for the dataset cache cleanup cronjob. See the [cron schedule syntax](https://kubernetes.io/docs/concepts/workloads/controllers/cron-jobs/#cron-schedule-syntax) reference for details on the fields | `0 * * * *` (top of every hour) |
+| `datasetLifecycle.cacheLifetime` | Lifetime of the dataset cache, in hours | 24 |
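As a quick illustration of how these settings compose, a site override for the dataset cache cleanup might look like the sketch below (the file name and the specific schedule/lifetime values are assumptions for illustration, not chart defaults):

```yaml
# values-site.yaml (hypothetical): overrides for the dataset cache lifecycle job
datasetLifecycle:
  image: curlimages/curl
  tag: "8.17.0"
  pullPolicy: IfNotPresent  # a pinned tag need not be re-pulled on every run
  schedule: "30 2 * * *"    # run daily at 02:30 instead of hourly
  cacheLifetime: 48         # keep cached dataset lookups for two days
```

Such a file would be applied with the usual `helm upgrade -f values-site.yaml` flow.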
34 changes: 34 additions & 0 deletions helm/servicex/templates/dataset-lifecycle/cronjob.yaml
@@ -0,0 +1,34 @@
apiVersion: batch/v1
kind: CronJob
metadata:
  name: {{ .Release.Name }}-dataset-lifecycle-job
spec:
  schedule: {{ .Values.datasetLifecycle.schedule | default "0 * * * *" | quote }}
  concurrencyPolicy: "Forbid"
  jobTemplate:
    spec:
      template:
        metadata:
          labels:
            app: {{ .Release.Name }}-dataset-lifecycle-job
        spec:
          containers:
            - name: {{ .Release.Name }}-dataset-lifecycle-job
              image: {{ .Values.datasetLifecycle.image }}:{{ .Values.datasetLifecycle.tag }}
              imagePullPolicy: {{ .Values.datasetLifecycle.pullPolicy }}
              env:
                - name: LIFETIME
                  # Env var values must be strings, so quote the numeric lifetime
                  value: {{ .Values.datasetLifecycle.cacheLifetime | quote }}
              # The image's entrypoint is curl; each flag must be its own list
              # element, and the age goes in a JSON body, which is what the
              # dataset-lifecycle endpoint reads
              args:
                - "--request"
                - "POST"
                - "--header"
                - "Content-Type: application/json"
                - "--data"
                - '{"age": $(LIFETIME)}'
                - "http://{{ .Release.Name }}-servicex-app:8000/servicex/internal/dataset-lifecycle"
          restartPolicy: OnFailure
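Kubernetes expands `$(LIFETIME)` references in `args` from the container's environment, so with the chart defaults and a release named `servicex` (an assumed name, for illustration) the container effectively runs curl with:

```yaml
# Effective container args after env substitution (sketch, release name "servicex")
args:
  - "--request"
  - "POST"
  - "--header"
  - "Content-Type: application/json"
  - "--data"
  - '{"age": 24}'  # $(LIFETIME) substituted from the env var at container start
  - "http://servicex-servicex-app:8000/servicex/internal/dataset-lifecycle"
```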
12 changes: 12 additions & 0 deletions helm/servicex/values.yaml
@@ -265,3 +265,15 @@ dataLifecycle:
 # The cleanup service will go beyond the retention date to keep the server-side cache size below
 # this threshold. Specify it as a string with Mb, Gb, Tb or Pb units
 maxDesiredCacheSize: "1Tb"
+
+# The dataset lifecycle job marks cached dataset lookups older than
+# "cacheLifetime" hours as stale. It does not touch transforms or output files.
+datasetLifecycle:
+  # The image must provide curl
+  image: curlimages/curl
+  tag: "8.17.0"
+  pullPolicy: Always
+  schedule: "0 * * * *"
+
+  # How long to keep dataset lookups in the cache, in hours
+  cacheLifetime: 24
28 changes: 16 additions & 12 deletions servicex_app/servicex_app/resources/datasets/delete_dataset.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2024, IRIS-HEP
+# Copyright (c) 2024-25, IRIS-HEP
 # All rights reserved.
 #
 # Redistribution and use in source and binary forms, with or without
@@ -30,18 +30,22 @@
 from servicex_app.resources.servicex_resource import ServiceXResource


-class DeleteDataset(ServiceXResource):
-    @auth_required
-    def delete(self, dataset_id):
-        dataset = Dataset.find_by_id(dataset_id)
+def delete_dataset(dataset_id):
+    dataset = Dataset.find_by_id(dataset_id)

-        if not dataset:
-            return {"message": f"Dataset {dataset_id} not found"}, 404
+    if not dataset:
+        return {"message": f"Dataset {dataset_id} not found"}, 404

-        if dataset.stale:
-            return {"message": f"Dataset {dataset_id} has already been deleted"}, 400
+    if dataset.stale:
+        return {"message": f"Dataset {dataset_id} has already been deleted"}, 400

-        dataset.stale = True
-        dataset.save_to_db()
+    dataset.stale = True
+    dataset.save_to_db()

-        return {"dataset-id": dataset_id, "stale": True}
+    return {"dataset-id": dataset_id, "stale": True}, 200
+
+
+class DeleteDataset(ServiceXResource):
+    @auth_required
+    def delete(self, dataset_id):
+        return delete_dataset(dataset_id)
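The point of the extraction is that the deletion logic no longer needs the Flask-RESTful dispatch; a minimal sketch of a direct caller (the `purge` helper is hypothetical, and an application context is assumed since `Dataset` uses the Flask-SQLAlchemy session):

```python
# Sketch: reusing delete_dataset from other server-side code
from servicex_app.resources.datasets.delete_dataset import delete_dataset


def purge(dataset_ids):
    """Hypothetical helper that obsoletes a batch of datasets."""
    for dataset_id in dataset_ids:
        body, status = delete_dataset(dataset_id)  # every branch returns (body, status)
        if status != 200:
            print(f"skipped {dataset_id}: {body['message']}")
```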
24 changes: 15 additions & 9 deletions servicex_app/servicex_app/resources/datasets/get_all.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2024, IRIS-HEP
+# Copyright (c) 2024-25, IRIS-HEP
 # All rights reserved.
 #
 # Redistribution and use in source and binary forms, with or without
@@ -31,20 +31,26 @@
 from servicex_app.models import Dataset
 from servicex_app.resources.servicex_resource import ServiceXResource

+from typing import List
+
 parser = reqparse.RequestParser()
 parser.add_argument("did-finder", type=str, location="args", required=False)
 parser.add_argument("show-deleted", type=bool, location="args", required=False)


+def get_all_datasets(args={}) -> List[Dataset]:
+    show_deleted = args["show-deleted"] if "show-deleted" in args else False
+    if "did-finder" in args and args["did-finder"]:
+        did_finder = args["did-finder"]
+        datasets = Dataset.get_by_did_finder(did_finder, show_deleted)
+    else:
+        datasets = Dataset.get_all(show_deleted)
+
+    return datasets
+
+
 class AllDatasets(ServiceXResource):
     @auth_required
     def get(self):
         args = parser.parse_args()
-        show_deleted = args["show-deleted"] if "show-deleted" in args else False
-        if "did-finder" in args and args["did-finder"]:
-            did_finder = args["did-finder"]
-            datasets = Dataset.get_by_did_finder(did_finder, show_deleted)
-        else:
-            datasets = Dataset.get_all(show_deleted)
-
-        return {"datasets": [dataset.to_json() for dataset in datasets]}
+        return {"datasets": [dataset.to_json() for dataset in get_all_datasets(args)]}
61 changes: 61 additions & 0 deletions servicex_app/servicex_app/resources/internal/dataset_lifecycle_ops.py
@@ -0,0 +1,61 @@
# Copyright (c) 2025, IRIS-HEP
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# * Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
from datetime import datetime, timedelta, timezone

from flask import request, current_app

from servicex_app.resources.servicex_resource import ServiceXResource
from ..datasets.get_all import get_all_datasets
from ..datasets.delete_dataset import delete_dataset


class DatasetLifecycleOps(ServiceXResource):
    def post(self):
        """
        Obsolete cached datasets older than N hours
        """
        now = datetime.now(timezone.utc)
        try:
            age = float(request.get_json().get("age", 24))
        except Exception:
            return {"message": "Invalid age parameter"}, 422
        delta = timedelta(hours=age)
        datasets = (
            get_all_datasets()
        )  # by default this will only give non-stale datasets
        todelete = [
            _.id for _ in datasets if _.last_updated and (now - _.last_updated) > delta
        ]
        current_app.logger.info(
            f"Obsoletion called for datasets older than {delta}. "
            f"Obsoleting {len(todelete)} datasets."
        )
        for dataset_id in todelete:
            delete_dataset(dataset_id)

        return {"message": "Success"}, 200
5 changes: 4 additions & 1 deletion servicex_app/servicex_app/routes.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2019, IRIS-HEP
+# Copyright (c) 2019-2025, IRIS-HEP
 # All rights reserved.
 #
 # Redistribution and use in source and binary forms, with or without
@@ -32,6 +32,7 @@
 from servicex_app.resources.datasets.get_one import OneDataset
 from servicex_app.resources.transformation.delete import DeleteTransform
 from servicex_app.resources.internal.data_lifecycle_ops import DataLifecycleOps
+from servicex_app.resources.internal.dataset_lifecycle_ops import DatasetLifecycleOps


 def add_routes(
@@ -198,3 +199,5 @@ def add_routes(

     DataLifecycleOps.make_api(object_store)
     api.add_resource(DataLifecycleOps, "/servicex/internal/data-lifecycle")
+
+    api.add_resource(DatasetLifecycleOps, "/servicex/internal/dataset-lifecycle")
@@ -0,0 +1,87 @@
# Copyright (c) 2025, IRIS-HEP
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# * Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
from datetime import datetime, timezone
from unittest.mock import patch

from pytest import fixture

from servicex_app.models import Dataset

from servicex_app_test.resource_test_base import ResourceTestBase


class TestDatasetLifecycle(ResourceTestBase):
    @fixture
    def fake_dataset_list(self):
        with patch(
            "servicex_app.resources.internal.dataset_lifecycle_ops.get_all_datasets"
        ) as dsfunc:
            dsfunc.return_value = [
                Dataset(
                    last_used=datetime(2022, 1, 1, tzinfo=timezone.utc),
                    last_updated=datetime(2022, 1, 1, tzinfo=timezone.utc),
                    id=1,
                    name="not-orphaned",
                    events=100,
                    size=1000,
                    n_files=1,
                    lookup_status="complete",
                    did_finder="rucio",
                ),
                Dataset(
                    last_used=datetime.now(timezone.utc),
                    last_updated=datetime.now(timezone.utc),
                    id=2,
                    name="orphaned",
                    events=100,
                    size=1000,
                    n_files=1,
                    lookup_status="complete",
                    did_finder="rucio",
                ),
            ]
            yield dsfunc

    def test_fail_on_bad_param(self, client):
        with client.application.app_context():
            response = client.post(
                "/servicex/internal/dataset-lifecycle", json={"age": "string"}
            )
            assert response.status_code == 422

    def test_deletion(self, fake_dataset_list, client):
        with client.application.app_context():
            with patch(
                "servicex_app.resources.internal.dataset_lifecycle_ops.delete_dataset"
            ) as deletion_obj:
                response = client.post(
                    "/servicex/internal/dataset-lifecycle", json={"age": 24}
                )
                fake_dataset_list.assert_called_once()
                deletion_obj.assert_called_once()
                assert response.status_code == 200