diff --git a/src/DIRAC/DataManagementSystem/Client/FTS3Job.py b/src/DIRAC/DataManagementSystem/Client/FTS3Job.py
index 4d17ca1b430..5416b8c218a 100644
--- a/src/DIRAC/DataManagementSystem/Client/FTS3Job.py
+++ b/src/DIRAC/DataManagementSystem/Client/FTS3Job.py
@@ -2,13 +2,16 @@
 
 import datetime
 import errno
-from packaging.version import Version
+from threading import Lock
 
+import fts3.rest.client.easy as fts3
+import requests
+from cachetools import TTLCache, cached
 
 # Requires at least version 3.3.3
 from fts3 import __version__ as fts3_version
-import fts3.rest.client.easy as fts3
 from fts3.rest.client.exceptions import FTS3ClientException, NotFound
+from packaging.version import Version
 
 # There is a breaking change in the API in 3.13
 # https://gitlab.cern.ch/fts/fts-rest-flask/-/commit/5faa283e0cd4b80a0139a547c4a6356522c8449d
@@ -22,21 +25,71 @@
 # default pycurl. See https://its.cern.ch/jira/browse/FTS-261
 from fts3.rest.client.request import Request as ftsSSLRequest
 
-from DIRAC.Resources.Storage.StorageElement import StorageElement
-
-from DIRAC.FrameworkSystem.Client.Logger import gLogger
-from DIRAC.FrameworkSystem.Client.TokenManagerClient import gTokenManager
-
-from DIRAC.Core.Utilities.ReturnValues import S_OK, S_ERROR
 from DIRAC.Core.Utilities.DErrno import cmpError
-
 from DIRAC.Core.Utilities.JEncode import JSerializable
+from DIRAC.Core.Utilities.ReturnValues import S_ERROR, S_OK
 from DIRAC.DataManagementSystem.Client.FTS3File import FTS3File
+from DIRAC.FrameworkSystem.Client.Logger import gLogger
+from DIRAC.FrameworkSystem.Client.TokenManagerClient import gTokenManager
+from DIRAC.Resources.Storage.StorageElement import StorageElement
 
 # 3 days in seconds
 BRING_ONLINE_TIMEOUT = 259200
 
 
+# Cache of computed scitags, keyed on (vo, activity): max 10 items, each expires after 3600 seconds (1 hour)
+scitag_cache = TTLCache(maxsize=10, ttl=3600)
+
+# Cache of the scitags registry: max 1 item, expires after 86400 seconds (1 day).
+# It lives at module level so that every get_scitag call shares it: a cache
+# created inside get_scitag would be rebuilt, empty, on each call.
+scitag_json_cache = TTLCache(maxsize=1, ttl=86400)
+
+
+@cached(scitag_json_cache, lock=Lock())
+def _get_scitag_registry():
+    """
+    Download the scitags registry (https://scitags.org/api.json).
+
+    :return: The decoded JSON document
+    :raises requests.RequestException: if the request fails, times out or returns an HTTP error status
+    """
+    gLogger.verbose("Fetching https://scitags.org/api.json from the network")
+    # Always bound the wait: a request without a timeout can hang indefinitely
+    response = requests.get("https://scitags.org/api.json", timeout=30)
+    response.raise_for_status()
+    return response.json()
+
+
+@cached(scitag_cache, lock=Lock())
+def get_scitag(vo: str, activity: str = None):
+    """
+    Get the scitag based on the VO and activity.
+    If the VO is not found in the scitags registry, the experiment ID defaults to 1.
+    If the activity is not given, or is not found for that VO, the activity ID defaults to 1.
+
+    :param vo: The VO for which to get the scitag
+    :param activity: The activity for which to get the scitag
+    :return: The scitag value, i.e. experiment ID << 6 | activity ID
+    """
+
+    # Load the JSON data from the cache or network
+    sj = _get_scitag_registry()
+
+    vo_id = 1  # Default VO ID
+    activity_id = 1  # Default activity ID
+
+    for experiment in sj.get("experiments", []):
+        if experiment.get("expName") == vo.lower():
+            vo_id = experiment.get("expId")
+            for act in experiment.get("activities", []):
+                if act.get("activityName") == activity:
+                    activity_id = act.get("activityId")
+
+    # The experiment ID is shifted above the 6 low bits that hold the activity ID (this is what FTS wants)
+    return vo_id << 6 | activity_id
+
+
 class FTS3Job(JSerializable):
     """Abstract class to represent a job to be executed by FTS.
     It belongs to an FTS3Operation
@@ -467,6 +520,9 @@ def _constructTransferJob(self, pinTime, allLFNs, target_spacetoken, protocols=N
             # * stageProto://myFile -> stageProto://myFile
             # * srcProto://myFile -> destProto://myFile
 
+            # scitag 65 is 1 << 6 | 1 (default experiment, default activity)
+            scitag = get_scitag(vo=self.vo, activity=self.activity)
+
             if stageURL:
                 # We do not set a fileID in the metadata
                 # such that we do not update the DB when monitoring
@@ -485,6 +541,7 @@ def _constructTransferJob(self, pinTime, allLFNs, target_spacetoken, protocols=N
                     filesize=ftsFile.size,
                     metadata=stageTrans_metadata,
                     activity=self.activity,
+                    scitag=scitag,
                 )
                 transfers.append(stageTrans)
 
@@ -553,6 +610,7 @@ def _constructTransferJob(self, pinTime, allLFNs, target_spacetoken, protocols=N
                 activity=self.activity,
                 source_token=srcToken,
                 destination_token=dstToken,
+                scitag=scitag,
             )
 
             transfers.append(trans)
diff --git a/src/DIRAC/DataManagementSystem/Client/test/Test_FTS3Job.py b/src/DIRAC/DataManagementSystem/Client/test/Test_FTS3Job.py
new file mode 100644
index 00000000000..7e98f53db3c
--- /dev/null
+++ b/src/DIRAC/DataManagementSystem/Client/test/Test_FTS3Job.py
@@ -0,0 +1,111 @@
+from unittest.mock import Mock, patch
+
+import pytest
+
+from DIRAC.DataManagementSystem.Client.FTS3Job import get_scitag
+
+
+@pytest.fixture
+def sample_json_response():
+    """Sample JSON response for testing."""
+    return {
+        "experiments": [
+            {
+                "expName": "default",
+                "expId": 1,
+                "activities": [
+                    {"activityName": "default", "activityId": 1},
+                    {"activityName": "Data Challenge", "activityId": 4},
+                ],
+            },
+            {
+                "expName": "atlas",
+                "expId": 100,
+                "activities": [
+                    {"activityName": "default", "activityId": 1},
+                    {"activityName": "analysis", "activityId": 2},
+                    {"activityName": "production", "activityId": 3},
+                ],
+            },
+            {
+                "expName": "cms",
+                "expId": 200,
+                "activities": [{"activityName": "default", "activityId": 1}, {"activityName": "mc", "activityId": 4}],
+            },
+        ]
+    }
+
+
+@pytest.fixture
+def mock_requests_success(sample_json_response):
+    """Mock successful HTTP request."""
+    with patch("requests.get") as mock_get:
+        mock_response = Mock()
+        mock_response.json.return_value = sample_json_response
+        mock_response.raise_for_status.return_value = None
+        mock_get.return_value = mock_response
+        yield mock_get
+
+
+class TestGetScitag:
+    def test_valid_vo_and_activity(self, mock_requests_success):
+        """Test get_scitag with valid VO and activity."""
+        result = get_scitag("atlas", "analysis")
+        expected = 100 << 6 | 2  # atlas expId=100, analysis activityId=2
+        assert result == expected
+
+    def test_valid_vo_no_activity(self, mock_requests_success):
+        """Test get_scitag with valid VO but no specific activity (should use default)."""
+        result = get_scitag("cms")
+        expected = 200 << 6 | 1  # cms expId=200, default activityId=1
+        assert result == expected
+
+    def test_invalid_vo(self, mock_requests_success):
+        """Test get_scitag with invalid VO (should use default vo_id=1)."""
+        result = get_scitag("nonexistent")
+        expected = 1 << 6 | 1  # default vo_id=1, default activity_id=1
+        assert result == expected
+
+    def test_valid_vo_invalid_activity(self, mock_requests_success):
+        """Test get_scitag with valid VO but invalid activity."""
+        result = get_scitag("atlas", "nonexistent_activity")
+        expected = 100 << 6 | 1  # atlas expId=100, default activity_id=1
+        assert result == expected
+
+    def test_case_insensitive_vo(self, mock_requests_success):
+        """Test that VO matching is case insensitive."""
+        result = get_scitag("ATLAS", "production")
+        expected = 100 << 6 | 3  # atlas expId=100, production activityId=3
+        assert result == expected
+
+
+@pytest.mark.parametrize(
+    "vo,activity,expected_vo_id,expected_activity_id",
+    [
+        ("atlas", "analysis", 100, 2),
+        ("atlas", "production", 100, 3),
+        ("cms", "mc", 200, 4),
+        ("cms", "default", 200, 1),
+        ("nonexistent", "any", 1, 1),  # defaults
+        ("atlas", "nonexistent", 100, 1),  # valid vo, invalid activity
+    ],
+)
+def test_parametrized_scenarios(mock_requests_success, vo, activity, expected_vo_id, expected_activity_id):
+    """Parametrized test for various VO and activity combinations."""
+    result = get_scitag(vo, activity)
+    expected = expected_vo_id << 6 | expected_activity_id
+    assert result == expected
+
+
+@pytest.mark.parametrize(
+    "vo,expected_result",
+    [
+        ("atlas", 100 << 6 | 1),  # Should use default activity
+        ("cms", 200 << 6 | 1),  # Should use default activity
+        ("unknown", 1 << 6 | 1),  # Should use all defaults
+    ],
+)
+def test_no_activity_parameter(mock_requests_success, vo, expected_result):
+    """Test behavior when no activity parameter is provided."""
+    result = get_scitag(vo)
+    assert result == expected_result
diff --git a/src/DIRAC/DataManagementSystem/Client/test/Test_FTS3Objects.py b/src/DIRAC/DataManagementSystem/Client/test/Test_FTS3Objects.py
index 6471ec65d0b..020aec5d99b 100644
--- a/src/DIRAC/DataManagementSystem/Client/test/Test_FTS3Objects.py
+++ b/src/DIRAC/DataManagementSystem/Client/test/Test_FTS3Objects.py
@@ -204,6 +204,7 @@ def generateFTS3Job(sourceSE, targetSE, lfns, multiHopSE=None):
     newJob.sourceSE = sourceSE
     newJob.targetSE = targetSE
     newJob.multiHopSE = multiHopSE
+    newJob.vo = "lhcb"
     filesToSubmit = []
 
     for i, lfn in enumerate(lfns, start=1):