Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 57 additions & 9 deletions src/DIRAC/DataManagementSystem/Client/FTS3Job.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@

import datetime
import errno
from packaging.version import Version
from threading import Lock

import fts3.rest.client.easy as fts3
import requests
from cachetools import TTLCache, cached

# Requires at least version 3.3.3
from fts3 import __version__ as fts3_version
import fts3.rest.client.easy as fts3
from fts3.rest.client.exceptions import FTS3ClientException, NotFound
from packaging.version import Version

# There is a breaking change in the API in 3.13
# https://gitlab.cern.ch/fts/fts-rest-flask/-/commit/5faa283e0cd4b80a0139a547c4a6356522c8449d
Expand All @@ -22,21 +25,61 @@
# default pycurl. See https://its.cern.ch/jira/browse/FTS-261
from fts3.rest.client.request import Request as ftsSSLRequest

from DIRAC.Resources.Storage.StorageElement import StorageElement

from DIRAC.FrameworkSystem.Client.Logger import gLogger
from DIRAC.FrameworkSystem.Client.TokenManagerClient import gTokenManager

from DIRAC.Core.Utilities.ReturnValues import S_OK, S_ERROR
from DIRAC.Core.Utilities.DErrno import cmpError

from DIRAC.Core.Utilities.JEncode import JSerializable
from DIRAC.Core.Utilities.ReturnValues import S_ERROR, S_OK
from DIRAC.DataManagementSystem.Client.FTS3File import FTS3File
from DIRAC.FrameworkSystem.Client.Logger import gLogger
from DIRAC.FrameworkSystem.Client.TokenManagerClient import gTokenManager
from DIRAC.Resources.Storage.StorageElement import StorageElement

# 3 days in seconds
BRING_ONLINE_TIMEOUT = 259200


# Create a TTL cache: max 1 item, expires after 3600 seconds (1 hour)
scitag_cache = TTLCache(maxsize=10, ttl=3600)


@cached(scitag_cache, lock=Lock())
def get_scitag(vo: str, activity: str = None):
"""
Get the scitag based on the VO and activity.
If the VO is not found in the scitag.json, it defaults to 1.
If no specific activity is provided, it defaults to the "default" activityName.

:param vo: The VO for which to get the scitag
:param activity: The activity for which to get the scitag
:return: The scitag value
"""

# Create a TTL cache: max 1 item, expires after 84400 seconds (1 day)
json_cache = TTLCache(maxsize=1, ttl=86400)

@cached(json_cache, lock=Lock())
def get_remote_json():
gLogger.verbose("Fetching https://scitags.org/api.json from the network")
response = requests.get("https://scitags.org/api.json")
response.raise_for_status()
return response.json()

# Load the JSON data from the cache or network
sj = get_remote_json()

vo_id = 1 # Default VO ID
activity_id = 1 # Default activity ID

for experiment in sj.get("experiments", []):
if experiment.get("expName") == vo.lower():
vo_id = experiment.get("expId")
for act in experiment.get("activities", []):
if act.get("activityName") == activity:
activity_id = act.get("activityId")

# Logic to determine the scitag based on vo and activity (this is what FTS wants)
return vo_id << 6 | activity_id # Example logic, replace with actual implementation


class FTS3Job(JSerializable):
"""Abstract class to represent a job to be executed by FTS. It belongs
to an FTS3Operation
Expand Down Expand Up @@ -467,6 +510,9 @@ def _constructTransferJob(self, pinTime, allLFNs, target_spacetoken, protocols=N
# * stageProto://myFile -> stageProto://myFile
# * srcProto://myFile -> destProto://myFile

# scitag 65 is 1 << 6 | 1 (default experiment, default activity)
scitag = get_scitag(vo=self.vo, activity=self.activity)

if stageURL:
# We do not set a fileID in the metadata
# such that we do not update the DB when monitoring
Expand All @@ -485,6 +531,7 @@ def _constructTransferJob(self, pinTime, allLFNs, target_spacetoken, protocols=N
filesize=ftsFile.size,
metadata=stageTrans_metadata,
activity=self.activity,
scitag=scitag,
)
transfers.append(stageTrans)

Expand Down Expand Up @@ -553,6 +600,7 @@ def _constructTransferJob(self, pinTime, allLFNs, target_spacetoken, protocols=N
activity=self.activity,
source_token=srcToken,
destination_token=dstToken,
scitag=scitag,
)

transfers.append(trans)
Expand Down
111 changes: 111 additions & 0 deletions src/DIRAC/DataManagementSystem/Client/test/Test_FTS3Job.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
from unittest.mock import Mock, patch

import pytest

from DIRAC.DataManagementSystem.Client.FTS3Job import get_scitag


@pytest.fixture
def sample_json_response():
"""Sample JSON response for testing."""
return {
"experiments": [
{
"expName": "default",
"expId": 1,
"activities": [
{"activityName": "default", "activityId": 1},
{"activityName": "Data Challenge", "activityId": 4},
],
},
{
"expName": "atlas",
"expId": 100,
"activities": [
{"activityName": "default", "activityId": 1},
{"activityName": "analysis", "activityId": 2},
{"activityName": "production", "activityId": 3},
],
},
{
"expName": "cms",
"expId": 200,
"activities": [{"activityName": "default", "activityId": 1}, {"activityName": "mc", "activityId": 4}],
},
]
}


@pytest.fixture
def mock_requests_success(sample_json_response):
"""Mock successful HTTP request."""
with patch("requests.get") as mock_get:
mock_response = Mock()
mock_response.json.return_value = sample_json_response
mock_response.raise_for_status.return_value = None
mock_get.return_value = mock_response
yield mock_get


class TestGetScitag:
def test_valid_vo_and_activity(self, mock_requests_success):
"""Test get_scitag with valid VO and activity."""
result = get_scitag("atlas", "analysis")
expected = 100 << 6 | 2 # atlas expId=100, analysis activityId=2
assert result == expected

def test_valid_vo_no_activity(self, mock_requests_success):
"""Test get_scitag with valid VO but no specific activity (should use default)."""
result = get_scitag("cms")
expected = 200 << 6 | 1 # cms expId=200, default activityId=1
assert result == expected

def test_invalid_vo(self, mock_requests_success):
"""Test get_scitag with invalid VO (should use default vo_id=1)."""
result = get_scitag("nonexistent")
expected = 1 << 6 | 1 # default vo_id=1, default activity_id=1
assert result == expected

def test_valid_vo_invalid_activity(self, mock_requests_success):
"""Test get_scitag with valid VO but invalid activity."""
result = get_scitag("atlas", "nonexistent_activity")
expected = 100 << 6 | 1 # atlas expId=100, default activity_id=1
assert result == expected

def test_case_insensitive_vo(self, mock_requests_success):
"""Test that VO matching is case insensitive."""
result = get_scitag("ATLAS", "production")
expected = 100 << 6 | 3 # atlas expId=100, production activityId=3
assert result == expected


@pytest.mark.parametrize(
"vo,activity,expected_vo_id,expected_activity_id",
[
("atlas", "analysis", 100, 2),
("atlas", "production", 100, 3),
("cms", "mc", 200, 4),
("cms", "default", 200, 1),
("nonexistent", "any", 1, 1), # defaults
("atlas", "nonexistent", 100, 1), # valid vo, invalid activity
],
)
def test_parametrized_scenarios(mock_requests_success, vo, activity, expected_vo_id, expected_activity_id):
"""Parametrized test for various VO and activity combinations."""
result = get_scitag(vo, activity)
expected = expected_vo_id << 6 | expected_activity_id
assert result == expected


@pytest.mark.parametrize(
"vo,expected_result",
[
("atlas", 100 << 6 | 1), # Should use default activity
("cms", 200 << 6 | 1), # Should use default activity
("unknown", 1 << 6 | 1), # Should use all defaults
],
)
def test_no_activity_parameter(mock_requests_success, vo, expected_result):
"""Test behavior when no activity parameter is provided."""
result = get_scitag(vo)
assert result == expected_result
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ def generateFTS3Job(sourceSE, targetSE, lfns, multiHopSE=None):
newJob.sourceSE = sourceSE
newJob.targetSE = targetSE
newJob.multiHopSE = multiHopSE
newJob.vo = "lhcb"
filesToSubmit = []

for i, lfn in enumerate(lfns, start=1):
Expand Down
Loading