Skip to content

Commit e5624e5

Browse files
committed
feat: add scitag support
1 parent 9c79ff5 commit e5624e5

File tree

2 files changed

+168
-9
lines changed

2 files changed

+168
-9
lines changed

src/DIRAC/DataManagementSystem/Client/FTS3Job.py

Lines changed: 57 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,16 @@
22

33
import datetime
44
import errno
5-
from packaging.version import Version
5+
from threading import Lock
66

7+
import fts3.rest.client.easy as fts3
8+
import requests
9+
from cachetools import TTLCache, cached
710

811
# Requires at least version 3.3.3
912
from fts3 import __version__ as fts3_version
10-
import fts3.rest.client.easy as fts3
1113
from fts3.rest.client.exceptions import FTS3ClientException, NotFound
14+
from packaging.version import Version
1215

1316
# There is a breaking change in the API in 3.13
1417
# https://gitlab.cern.ch/fts/fts-rest-flask/-/commit/5faa283e0cd4b80a0139a547c4a6356522c8449d
@@ -22,21 +25,61 @@
2225
# default pycurl. See https://its.cern.ch/jira/browse/FTS-261
2326
from fts3.rest.client.request import Request as ftsSSLRequest
2427

25-
from DIRAC.Resources.Storage.StorageElement import StorageElement
26-
27-
from DIRAC.FrameworkSystem.Client.Logger import gLogger
28-
from DIRAC.FrameworkSystem.Client.TokenManagerClient import gTokenManager
29-
30-
from DIRAC.Core.Utilities.ReturnValues import S_OK, S_ERROR
3128
from DIRAC.Core.Utilities.DErrno import cmpError
32-
3329
from DIRAC.Core.Utilities.JEncode import JSerializable
30+
from DIRAC.Core.Utilities.ReturnValues import S_ERROR, S_OK
3431
from DIRAC.DataManagementSystem.Client.FTS3File import FTS3File
32+
from DIRAC.FrameworkSystem.Client.Logger import gLogger
33+
from DIRAC.FrameworkSystem.Client.TokenManagerClient import gTokenManager
34+
from DIRAC.Resources.Storage.StorageElement import StorageElement
3535

3636
# 3 days in seconds
3737
BRING_ONLINE_TIMEOUT = 259200
3838

3939

40+
# Create a TTL cache for computed scitags: max 10 items, each expires after 3600 seconds (1 hour)
41+
scitag_cache = TTLCache(maxsize=10, ttl=3600)
42+
43+
44+
# Cache for the downloaded scitags registry: max 1 item, expires after
# 86400 seconds (1 day). It is defined at module level so that it survives
# between calls of get_scitag; a cache created inside the function would be
# thrown away at the end of every call and never produce a hit.
_scitag_json_cache = TTLCache(maxsize=1, ttl=86400)

# Upper bound (in seconds) on the registry download, so that an unresponsive
# server cannot block the construction of a transfer job forever.
_SCITAG_REQUEST_TIMEOUT = 30


@cached(_scitag_json_cache, lock=Lock())
def _get_remote_scitag_json():
    """
    Download and decode the scitags registry.

    The decoded document is kept in ``_scitag_json_cache``, so the network
    is contacted at most once per cache lifetime. Failures are not cached.

    :return: The decoded JSON document
    :raises requests.RequestException: on timeout, connection error or HTTP error status
    """
    gLogger.verbose("Fetching https://scitags.org/api.json from the network")
    response = requests.get("https://scitags.org/api.json", timeout=_SCITAG_REQUEST_TIMEOUT)
    response.raise_for_status()
    return response.json()


@cached(scitag_cache, lock=Lock())
def get_scitag(vo: str, activity: str = None):
    """
    Get the scitag based on the VO and activity.

    The VO is compared in lower case with the ``expName`` of each experiment
    of the registry, and the activity is compared as-is with the
    ``activityName`` entries of the matching experiment.
    If the VO is not found (or is empty), the experiment ID defaults to 1.
    If the activity is not found (or not given), the activity ID defaults to 1.

    Results are memoised per ``(vo, activity)`` in ``scitag_cache``.

    :param vo: The VO for which to get the scitag
    :param activity: The activity for which to get the scitag
    :return: The scitag value, i.e. ``(experiment ID << 6) | activity ID``
    :raises requests.RequestException: if the registry cannot be downloaded
    """

    # Load the JSON data from the cache or network
    sj = _get_remote_scitag_json()

    vo_id = 1  # Default VO ID
    activity_id = 1  # Default activity ID

    # An empty or missing VO cannot match anything: keep the defaults
    # instead of failing on ``None.lower()``
    vo_name = vo.lower() if vo else None

    if vo_name:
        for experiment in sj.get("experiments", []):
            if experiment.get("expName") == vo_name:
                # Keep the current value when the registry entry has no ID,
                # so that the bit shift below never receives None
                vo_id = experiment.get("expId", vo_id)
                for act in experiment.get("activities", []):
                    if act.get("activityName") == activity:
                        activity_id = act.get("activityId", activity_id)

    # The experiment ID goes in the high bits and the activity ID in the
    # low 6 bits (this is what FTS wants); the defaults give 1 << 6 | 1 == 65
    return (vo_id << 6) | activity_id
81+
82+
4083
class FTS3Job(JSerializable):
4184
"""Abstract class to represent a job to be executed by FTS. It belongs
4285
to an FTS3Operation
@@ -467,6 +510,9 @@ def _constructTransferJob(self, pinTime, allLFNs, target_spacetoken, protocols=N
467510
# * stageProto://myFile -> stageProto://myFile
468511
# * srcProto://myFile -> destProto://myFile
469512

513+
# scitag 65 is 1 << 6 | 1 (default experiment, default activity)
514+
scitag = get_scitag(vo=self.vo, activity=self.activity)
515+
470516
if stageURL:
471517
# We do not set a fileID in the metadata
472518
# such that we do not update the DB when monitoring
@@ -485,6 +531,7 @@ def _constructTransferJob(self, pinTime, allLFNs, target_spacetoken, protocols=N
485531
filesize=ftsFile.size,
486532
metadata=stageTrans_metadata,
487533
activity=self.activity,
534+
scitag=scitag,
488535
)
489536
transfers.append(stageTrans)
490537

@@ -553,6 +600,7 @@ def _constructTransferJob(self, pinTime, allLFNs, target_spacetoken, protocols=N
553600
activity=self.activity,
554601
source_token=srcToken,
555602
destination_token=dstToken,
603+
scitag=scitag,
556604
)
557605

558606
transfers.append(trans)
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
from unittest.mock import Mock, patch
2+
3+
import pytest
4+
5+
from DIRAC.DataManagementSystem.Client.FTS3Job import get_scitag
6+
7+
8+
@pytest.fixture
def sample_json_response():
    """Provide a small registry-like document with three experiments.

    The layout is the one read by ``get_scitag``: a top-level ``experiments``
    list whose items carry ``expName``, ``expId`` and an ``activities`` list
    of ``activityName`` / ``activityId`` pairs.
    """

    def _activity(name, ident):
        # One entry of an experiment's "activities" list
        return {"activityName": name, "activityId": ident}

    def _experiment(name, ident, activities):
        # One entry of the top-level "experiments" list
        return {"expName": name, "expId": ident, "activities": activities}

    default_exp = _experiment(
        "default",
        1,
        [_activity("default", 1), _activity("Data Challenge", 4)],
    )
    atlas_exp = _experiment(
        "atlas",
        100,
        [_activity("default", 1), _activity("analysis", 2), _activity("production", 3)],
    )
    cms_exp = _experiment(
        "cms",
        200,
        [_activity("default", 1), _activity("mc", 4)],
    )

    return {"experiments": [default_exp, atlas_exp, cms_exp]}
37+
38+
39+
@pytest.fixture
def mock_requests_success(sample_json_response):
    """Mock successful HTTP request.

    ``requests.get`` is replaced by a mock whose response decodes to
    ``sample_json_response`` and whose ``raise_for_status`` does nothing.

    ``get_scitag`` memoises its results in the module-level ``scitag_cache``,
    so the cache is emptied before and after each test. Without that, a value
    computed by an earlier test would be returned directly and the lookup
    would not run against the mocked response.

    :param sample_json_response: the fixture providing the fake registry
    :return: yields the mock standing in for ``requests.get``
    """
    # Imported here to keep this fixture self-contained
    from DIRAC.DataManagementSystem.Client.FTS3Job import scitag_cache

    scitag_cache.clear()
    with patch("requests.get") as mock_get:
        mock_response = Mock()
        mock_response.json.return_value = sample_json_response
        mock_response.raise_for_status.return_value = None
        mock_get.return_value = mock_response
        yield mock_get
    # Do not leave values derived from the fake registry behind
    scitag_cache.clear()
48+
49+
50+
class TestGetScitag:
    """Checks of ``get_scitag`` against the mocked registry.

    Every expected value is written as ``(experiment ID << 6) | activity ID``.
    """

    def test_valid_vo_and_activity(self, mock_requests_success):
        """A known VO with a known activity uses both registry IDs."""
        # atlas expId=100, analysis activityId=2
        assert get_scitag("atlas", "analysis") == (100 << 6) | 2

    def test_valid_vo_no_activity(self, mock_requests_success):
        """A known VO without activity keeps the default activity ID."""
        # cms expId=200, default activityId=1
        assert get_scitag("cms") == (200 << 6) | 1

    def test_invalid_vo(self, mock_requests_success):
        """An unknown VO falls back to the default IDs."""
        # default vo_id=1, default activity_id=1
        assert get_scitag("nonexistent") == (1 << 6) | 1

    def test_valid_vo_invalid_activity(self, mock_requests_success):
        """A known VO with an unknown activity keeps the default activity ID."""
        # atlas expId=100, default activity_id=1
        assert get_scitag("atlas", "nonexistent_activity") == (100 << 6) | 1

    def test_case_insensitive_vo(self, mock_requests_success):
        """The VO name is matched regardless of its case."""
        # atlas expId=100, production activityId=3
        assert get_scitag("ATLAS", "production") == (100 << 6) | 3
80+
81+
82+
@pytest.mark.parametrize(
    "vo,activity,expected_vo_id,expected_activity_id",
    [
        ("atlas", "analysis", 100, 2),
        ("atlas", "production", 100, 3),
        ("cms", "mc", 200, 4),
        ("cms", "default", 200, 1),
        # unknown VO: both IDs fall back to their default
        ("nonexistent", "any", 1, 1),
        # known VO, unknown activity: only the activity ID falls back
        ("atlas", "nonexistent", 100, 1),
    ],
)
def test_parametrized_scenarios(mock_requests_success, vo, activity, expected_vo_id, expected_activity_id):
    """Check several VO / activity pairs against the IDs they must resolve to."""
    # The experiment ID sits above the low 6 bits holding the activity ID
    assert get_scitag(vo, activity) == (expected_vo_id << 6) | expected_activity_id
98+
99+
100+
@pytest.mark.parametrize(
    "vo,expected_result",
    [
        # known VOs: registry experiment ID, default activity ID
        ("atlas", (100 << 6) | 1),
        ("cms", (200 << 6) | 1),
        # unknown VO: default experiment ID and default activity ID
        ("unknown", (1 << 6) | 1),
    ],
)
def test_no_activity_parameter(mock_requests_success, vo, expected_result):
    """Check the value returned when ``get_scitag`` is called with a VO only."""
    scitag = get_scitag(vo)
    assert scitag == expected_result

0 commit comments

Comments
 (0)