Skip to content

Commit bcf926a

Browse files
committed
feat: add scitag support
1 parent 9c79ff5 commit bcf926a

File tree

3 files changed

+167
-10
lines changed

3 files changed

+167
-10
lines changed

src/DIRAC/DataManagementSystem/Client/FTS3Job.py

Lines changed: 56 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,16 @@
22

33
import datetime
44
import errno
5-
from packaging.version import Version
5+
from threading import Lock
66

7+
import fts3.rest.client.easy as fts3
8+
import requests
9+
from cachetools import TTLCache, cached
710

811
# Requires at least version 3.3.3
912
from fts3 import __version__ as fts3_version
10-
import fts3.rest.client.easy as fts3
1113
from fts3.rest.client.exceptions import FTS3ClientException, NotFound
14+
from packaging.version import Version
1215

1316
# There is a breaking change in the API in 3.13
1417
# https://gitlab.cern.ch/fts/fts-rest-flask/-/commit/5faa283e0cd4b80a0139a547c4a6356522c8449d
@@ -22,21 +25,60 @@
2225
# default pycurl. See https://its.cern.ch/jira/browse/FTS-261
2326
from fts3.rest.client.request import Request as ftsSSLRequest
2427

25-
from DIRAC.Resources.Storage.StorageElement import StorageElement
26-
27-
from DIRAC.FrameworkSystem.Client.Logger import gLogger
28-
from DIRAC.FrameworkSystem.Client.TokenManagerClient import gTokenManager
29-
30-
from DIRAC.Core.Utilities.ReturnValues import S_OK, S_ERROR
3128
from DIRAC.Core.Utilities.DErrno import cmpError
32-
3329
from DIRAC.Core.Utilities.JEncode import JSerializable
30+
from DIRAC.Core.Utilities.ReturnValues import S_ERROR, S_OK
3431
from DIRAC.DataManagementSystem.Client.FTS3File import FTS3File
32+
from DIRAC.FrameworkSystem.Client.Logger import gLogger
33+
from DIRAC.FrameworkSystem.Client.TokenManagerClient import gTokenManager
34+
from DIRAC.Resources.Storage.StorageElement import StorageElement
3535

3636
# 3 days in seconds
3737
BRING_ONLINE_TIMEOUT = 259200
3838

3939

40+
# Create a TTL cache: max 1 item, expires after 3600 seconds (1 hour)
41+
scitag_cache = TTLCache(maxsize=10, ttl=3600, lock=Lock())
42+
43+
@cached(scitag_cache)
44+
def get_scitag(vo: str, activity: str = None):
45+
"""
46+
Get the scitag based on the VO and activity.
47+
If the VO is not found in the scitag.json, it defaults to 1.
48+
If no specific activity is provided, it defaults to the "default" activityName.
49+
50+
:param vo: The VO for which to get the scitag
51+
:param activity: The activity for which to get the scitag
52+
:return: The scitag value
53+
"""
54+
55+
# Create a TTL cache: max 1 item, expires after 84400 seconds (1 day)
56+
json_cache = TTLCache(maxsize=1, ttl=86400, lock=Lock())
57+
58+
@cached(json_cache)
59+
def get_remote_json():
60+
gLogger.verbose("Fetching https://scitags.org/api.json from the network")
61+
response = requests.get("https://scitags.org/api.json")
62+
response.raise_for_status()
63+
return response.json()
64+
65+
# Load the JSON data from the cache or network
66+
sj = get_remote_json()
67+
68+
vo_id = 1 # Default VO ID
69+
activity_id = 1 # Default activity ID
70+
71+
for experiment in sj.get("experiments", []):
72+
if experiment.get("expName") == vo.lower():
73+
vo_id = experiment.get("expId")
74+
for act in experiment.get("activities", []):
75+
if act.get("activityName") == activity:
76+
activity_id = act.get("activityId")
77+
78+
# Logic to determine the scitag based on vo and activity (this is what FTS wants)
79+
return vo_id << 6 | activity_id # Example logic, replace with actual implementation
80+
81+
4082
class FTS3Job(JSerializable):
4183
"""Abstract class to represent a job to be executed by FTS. It belongs
4284
to an FTS3Operation
@@ -467,6 +509,9 @@ def _constructTransferJob(self, pinTime, allLFNs, target_spacetoken, protocols=N
467509
# * stageProto://myFile -> stageProto://myFile
468510
# * srcProto://myFile -> destProto://myFile
469511

512+
# scitag 65 is 1 << 6 | 1 (default experiment, default activity)
513+
scitag = get_scitag(vo=self.vo, activity=self.activity)
514+
470515
if stageURL:
471516
# We do not set a fileID in the metadata
472517
# such that we do not update the DB when monitoring
@@ -485,6 +530,7 @@ def _constructTransferJob(self, pinTime, allLFNs, target_spacetoken, protocols=N
485530
filesize=ftsFile.size,
486531
metadata=stageTrans_metadata,
487532
activity=self.activity,
533+
scitag=scitag,
488534
)
489535
transfers.append(stageTrans)
490536

@@ -553,6 +599,7 @@ def _constructTransferJob(self, pinTime, allLFNs, target_spacetoken, protocols=N
553599
activity=self.activity,
554600
source_token=srcToken,
555601
destination_token=dstToken,
602+
scitag=scitag,
556603
)
557604

558605
transfers.append(trans)
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
from unittest.mock import Mock, patch
2+
3+
import pytest
4+
5+
from DIRAC.DataManagementSystem.Client.FTS3Job import get_scitag
6+
7+
8+
@pytest.fixture
9+
def sample_json_response():
10+
"""Sample JSON response for testing."""
11+
return {
12+
"experiments": [
13+
{
14+
"expName": "default",
15+
"expId": 1,
16+
"activities": [
17+
{"activityName": "default", "activityId": 1},
18+
{"activityName": "Data Challenge", "activityId": 4},
19+
],
20+
},
21+
{
22+
"expName": "atlas",
23+
"expId": 100,
24+
"activities": [
25+
{"activityName": "default", "activityId": 1},
26+
{"activityName": "analysis", "activityId": 2},
27+
{"activityName": "production", "activityId": 3},
28+
],
29+
},
30+
{
31+
"expName": "cms",
32+
"expId": 200,
33+
"activities": [{"activityName": "default", "activityId": 1}, {"activityName": "mc", "activityId": 4}],
34+
},
35+
]
36+
}
37+
38+
39+
@pytest.fixture
40+
def mock_requests_success(sample_json_response):
41+
"""Mock successful HTTP request."""
42+
with patch("requests.get") as mock_get:
43+
mock_response = Mock()
44+
mock_response.json.return_value = sample_json_response
45+
mock_response.raise_for_status.return_value = None
46+
mock_get.return_value = mock_response
47+
yield mock_get
48+
49+
50+
class TestGetScitag:
51+
def test_valid_vo_and_activity(self, mock_requests_success):
52+
"""Test get_scitag with valid VO and activity."""
53+
result = get_scitag("atlas", "analysis")
54+
expected = 100 << 6 | 2 # atlas expId=100, analysis activityId=2
55+
assert result == expected
56+
57+
def test_valid_vo_no_activity(self, mock_requests_success):
58+
"""Test get_scitag with valid VO but no specific activity (should use default)."""
59+
result = get_scitag("cms")
60+
expected = 200 << 6 | 1 # cms expId=200, default activityId=1
61+
assert result == expected
62+
63+
def test_invalid_vo(self, mock_requests_success):
64+
"""Test get_scitag with invalid VO (should use default vo_id=1)."""
65+
result = get_scitag("nonexistent")
66+
expected = 1 << 6 | 1 # default vo_id=1, default activity_id=1
67+
assert result == expected
68+
69+
def test_valid_vo_invalid_activity(self, mock_requests_success):
70+
"""Test get_scitag with valid VO but invalid activity."""
71+
result = get_scitag("atlas", "nonexistent_activity")
72+
expected = 100 << 6 | 1 # atlas expId=100, default activity_id=1
73+
assert result == expected
74+
75+
def test_case_insensitive_vo(self, mock_requests_success):
76+
"""Test that VO matching is case insensitive."""
77+
result = get_scitag("ATLAS", "production")
78+
expected = 100 << 6 | 3 # atlas expId=100, production activityId=3
79+
assert result == expected
80+
81+
82+
@pytest.mark.parametrize(
83+
"vo,activity,expected_vo_id,expected_activity_id",
84+
[
85+
("atlas", "analysis", 100, 2),
86+
("atlas", "production", 100, 3),
87+
("cms", "mc", 200, 4),
88+
("cms", "default", 200, 1),
89+
("nonexistent", "any", 1, 1), # defaults
90+
("atlas", "nonexistent", 100, 1), # valid vo, invalid activity
91+
],
92+
)
93+
def test_parametrized_scenarios(mock_requests_success, vo, activity, expected_vo_id, expected_activity_id):
94+
"""Parametrized test for various VO and activity combinations."""
95+
result = get_scitag(vo, activity)
96+
expected = expected_vo_id << 6 | expected_activity_id
97+
assert result == expected
98+
99+
100+
@pytest.mark.parametrize(
101+
"vo,expected_result",
102+
[
103+
("atlas", 100 << 6 | 1), # Should use default activity
104+
("cms", 200 << 6 | 1), # Should use default activity
105+
("unknown", 1 << 6 | 1), # Should use all defaults
106+
],
107+
)
108+
def test_no_activity_parameter(mock_requests_success, vo, expected_result):
109+
"""Test behavior when no activity parameter is provided."""
110+
result = get_scitag(vo)
111+
assert result == expected_result

src/DIRAC/DataManagementSystem/Client/test/Test_FTS3Objects.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import errno
2-
from unittest import mock
32

43
import pytest
54

0 commit comments

Comments
 (0)