Skip to content

Commit 0fee65a

Browse files
committed
feat: add scitag support
1 parent 4949af4 commit 0fee65a

File tree

4 files changed

+86
-42
lines changed

4 files changed

+86
-42
lines changed

src/DIRAC/DataManagementSystem/Agent/RequestOperations/ReplicateAndRegister.py

Lines changed: 58 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -25,25 +25,22 @@
2525
import re
2626
from collections import defaultdict
2727

28+
import requests
29+
from cachetools import TTLCache, cached
30+
2831
# # from DIRAC
29-
from DIRAC import S_OK, S_ERROR, gLogger
30-
from DIRAC.Core.Utilities.Adler import compareAdler, hexAdlerToInt, intAdlerToHex
32+
from DIRAC import S_ERROR, S_OK, gLogger
3133
from DIRAC.Core.Security.ProxyInfo import getVOfromProxyGroup
32-
33-
from DIRAC.DataManagementSystem.Client.DataManager import DataManager
34+
from DIRAC.Core.Utilities.Adler import compareAdler, hexAdlerToInt, intAdlerToHex
3435
from DIRAC.DataManagementSystem.Agent.RequestOperations.DMSRequestOperationsBase import DMSRequestOperationsBase
35-
36-
from DIRAC.Resources.Storage.StorageElement import StorageElement
37-
from DIRAC.Resources.Catalog.FileCatalog import FileCatalog
38-
39-
from DIRAC.DataManagementSystem.Client.FTS3Operation import FTS3TransferOperation
40-
from DIRAC.DataManagementSystem.Client.FTS3File import FTS3File
36+
from DIRAC.DataManagementSystem.Client.DataManager import DataManager
4137
from DIRAC.DataManagementSystem.Client.FTS3Client import FTS3Client
38+
from DIRAC.DataManagementSystem.Client.FTS3File import FTS3File
39+
from DIRAC.DataManagementSystem.Client.FTS3Operation import FTS3TransferOperation
4240
from DIRAC.DataManagementSystem.private.FTS3Utilities import getFTS3Plugin
43-
44-
from DIRAC.ConfigurationSystem.Client.Helpers import Registry
45-
4641
from DIRAC.MonitoringSystem.Client.MonitoringReporter import MonitoringReporter
42+
from DIRAC.Resources.Catalog.FileCatalog import FileCatalog
43+
from DIRAC.Resources.Storage.StorageElement import StorageElement
4744

4845

4946
def filterReplicas(opFile, logger=None, dataManager=None, opSources=None, activeReplicas=None):
@@ -154,6 +151,44 @@ def filterReplicas(opFile, logger=None, dataManager=None, opSources=None, active
154151
return S_OK(result)
155152

156153

154+
def get_scitag(vo: str, activity: str = None):
155+
"""
156+
Get the scitag based on the VO and activity.
157+
If the VO is not found in the scitag.json, it defaults to 1.
158+
If no specific activity is provided, it defaults to the "default" activityName.
159+
160+
:param vo: The VO for which to get the scitag
161+
:param activity: The activity for which to get the scitag
162+
:return: The scitag value
163+
"""
164+
165+
# Create a TTL cache: max 1 item, expires after 84400 seconds (1 day)
166+
json_cache = TTLCache(maxsize=1, ttl=86400)
167+
168+
@cached(json_cache)
169+
def get_remote_json():
170+
gLogger.verbose("Fetching https://scitags.org/api.json from the network")
171+
response = requests.get("https://scitags.org/api.json")
172+
response.raise_for_status()
173+
return response.json()
174+
175+
# Load the JSON data from the cache or network
176+
sj = get_remote_json()
177+
178+
vo_id = 1 # Default VO ID
179+
activity_id = 1 # Default activity ID
180+
181+
for experiment in sj.get("experiments", []):
182+
if experiment.get("expName") == vo.lower():
183+
vo_id = experiment.get("expId")
184+
for act in experiment.get("activities", []):
185+
if act.get("activityName") == activity:
186+
activity_id = act.get("activityId")
187+
188+
# Logic to determine the scitag based on vo and activity (this is what FTS wants)
189+
return vo_id << 6 | activity_id # Example logic, replace with actual implementation
190+
191+
157192
########################################################################
158193
class ReplicateAndRegister(DMSRequestOperationsBase):
159194
"""
@@ -472,14 +507,23 @@ def fts3Transfer(self):
472507
fts3Operation = FTS3TransferOperation.fromRMSObjects(self.request, self.operation)
473508
fts3Operation.ftsFiles = fts3Files
474509

510+
vo = getVOfromProxyGroup().get("Value")
475511
try:
476512
if not fts3Operation.activity:
477-
vo = getVOfromProxyGroup().get("Value")
478513
fts3Plugin = getFTS3Plugin(vo=vo)
479514
fts3Operation.activity = fts3Plugin.inferFTSActivity(fts3Operation, self.request, self.operation)
480515
except Exception:
481516
pass
482517

518+
try:
519+
if not fts3Operation.scitag:
520+
fts3Operation.scitag = get_scitag(
521+
vo=vo,
522+
activity=fts3Operation.activity if fts3Operation.activity else None,
523+
)
524+
except Exception:
525+
pass
526+
483527
ftsSchedule = FTS3Client().persistOperation(fts3Operation)
484528
if not ftsSchedule["OK"]:
485529
self.log.error("Completely failed to schedule to FTS3:", ftsSchedule["Message"])

src/DIRAC/DataManagementSystem/Client/FTS3Job.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,13 @@
22

33
import datetime
44
import errno
5-
from packaging.version import Version
65

6+
import fts3.rest.client.easy as fts3
77

88
# Requires at least version 3.3.3
99
from fts3 import __version__ as fts3_version
10-
import fts3.rest.client.easy as fts3
1110
from fts3.rest.client.exceptions import FTS3ClientException, NotFound
11+
from packaging.version import Version
1212

1313
# There is a breaking change in the API in 3.13
1414
# https://gitlab.cern.ch/fts/fts-rest-flask/-/commit/5faa283e0cd4b80a0139a547c4a6356522c8449d
@@ -22,16 +22,13 @@
2222
# default pycurl. See https://its.cern.ch/jira/browse/FTS-261
2323
from fts3.rest.client.request import Request as ftsSSLRequest
2424

25-
from DIRAC.Resources.Storage.StorageElement import StorageElement
26-
27-
from DIRAC.FrameworkSystem.Client.Logger import gLogger
28-
from DIRAC.FrameworkSystem.Client.TokenManagerClient import gTokenManager
29-
30-
from DIRAC.Core.Utilities.ReturnValues import S_OK, S_ERROR
3125
from DIRAC.Core.Utilities.DErrno import cmpError
32-
3326
from DIRAC.Core.Utilities.JEncode import JSerializable
27+
from DIRAC.Core.Utilities.ReturnValues import S_ERROR, S_OK
3428
from DIRAC.DataManagementSystem.Client.FTS3File import FTS3File
29+
from DIRAC.FrameworkSystem.Client.Logger import gLogger
30+
from DIRAC.FrameworkSystem.Client.TokenManagerClient import gTokenManager
31+
from DIRAC.Resources.Storage.StorageElement import StorageElement
3532

3633
# 3 days in seconds
3734
BRING_ONLINE_TIMEOUT = 259200
@@ -104,6 +101,7 @@ def __init__(self):
104101
self.targetSE = None
105102
self.filesToSubmit = []
106103
self.activity = None
104+
self.scitag = None
107105
self.priority = None
108106
self.vo = None
109107
self.rmsReqID = None
@@ -485,6 +483,7 @@ def _constructTransferJob(self, pinTime, allLFNs, target_spacetoken, protocols=N
485483
filesize=ftsFile.size,
486484
metadata=stageTrans_metadata,
487485
activity=self.activity,
486+
scitag=self.scitag,
488487
)
489488
transfers.append(stageTrans)
490489

@@ -553,6 +552,7 @@ def _constructTransferJob(self, pinTime, allLFNs, target_spacetoken, protocols=N
553552
activity=self.activity,
554553
source_token=srcToken,
555554
destination_token=dstToken,
555+
scitag=self.scitag,
556556
)
557557

558558
transfers.append(trans)

src/DIRAC/DataManagementSystem/Client/FTS3Operation.py

Lines changed: 18 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,27 @@
11
import datetime
22
import errno
33
import json
4+
45
from sqlalchemy import orm
56

7+
from DIRAC import S_ERROR, S_OK
8+
from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getVOForGroup
9+
from DIRAC.Core.Utilities.DErrno import cmpError
10+
from DIRAC.Core.Utilities.JEncode import JSerializable
11+
from DIRAC.Core.Utilities.List import breakListIntoChunks
12+
from DIRAC.Core.Utilities.ReturnValues import returnSingleResult
13+
from DIRAC.DataManagementSystem.Client.DataManager import DataManager
14+
from DIRAC.DataManagementSystem.Client.FTS3File import FTS3File
615
from DIRAC.DataManagementSystem.Client.FTS3Job import FTS3Job
716
from DIRAC.DataManagementSystem.private import FTS3Utilities
817
from DIRAC.DataManagementSystem.Utilities.DMSHelpers import DMSHelpers
9-
10-
from DIRAC.DataManagementSystem.Client.DataManager import DataManager
1118
from DIRAC.FrameworkSystem.Client.Logger import gLogger
12-
13-
from DIRAC.Resources.Storage.StorageElement import StorageElement
14-
from DIRAC.Core.Utilities.ReturnValues import returnSingleResult
15-
from DIRAC.Core.Utilities.DErrno import cmpError
16-
17-
18-
from DIRAC import S_OK, S_ERROR
19-
20-
from DIRAC.Core.Utilities.List import breakListIntoChunks
21-
from DIRAC.ResourceStatusSystem.Client.ResourceStatus import ResourceStatus
22-
from DIRAC.DataManagementSystem.Client.FTS3File import FTS3File
23-
from DIRAC.Core.Utilities.JEncode import JSerializable
24-
25-
from DIRAC.RequestManagementSystem.Client.ReqClient import ReqClient
26-
from DIRAC.RequestManagementSystem.Client.Operation import Operation as rmsOperation
2719
from DIRAC.RequestManagementSystem.Client.File import File as rmsFile
20+
from DIRAC.RequestManagementSystem.Client.Operation import Operation as rmsOperation
21+
from DIRAC.RequestManagementSystem.Client.ReqClient import ReqClient
2822
from DIRAC.RequestManagementSystem.Client.Request import Request as rmsRequest
29-
30-
from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getVOForGroup
23+
from DIRAC.Resources.Storage.StorageElement import StorageElement
24+
from DIRAC.ResourceStatusSystem.Client.ResourceStatus import ResourceStatus
3125

3226

3327
class FTS3Operation(JSerializable):
@@ -83,6 +77,7 @@ def __init__(
8377
sourceSEs=None,
8478
activity=None,
8579
priority=None,
80+
scitag=None,
8681
):
8782
"""
8883
:param ftsFiles: list of FTS3Files object that belongs to the operation
@@ -93,6 +88,7 @@ def __init__(
9388
:param sourceSEs: list of SE to be used as source (if applicable)
9489
:param activity: FTS activity to use
9590
:param priority: FTS priority to use
91+
:param scitag: optional scitag to be used for the transfers
9692
9793
"""
9894
############################
@@ -113,6 +109,7 @@ def __init__(
113109

114110
self.activity = activity
115111
self.priority = priority
112+
self.scitag = scitag
116113

117114
self.ftsJobs = []
118115

@@ -254,6 +251,7 @@ def _createNewJob(self, jobType, ftsFiles, targetSE, sourceSE=None, multiHopSE=N
254251
newJob.targetSE = targetSE
255252
newJob.activity = self.activity
256253
newJob.priority = self.priority
254+
newJob.scitag = self.scitag
257255
newJob.username = self.username
258256
newJob.userGroup = self.userGroup
259257
newJob.vo = self.vo
@@ -387,6 +385,7 @@ def fromRMSObjects(cls, rmsReq, rmsOp):
387385

388386
ftsOp.activity = argumentDic["activity"]
389387
ftsOp.priority = argumentDic["priority"]
388+
ftsOp.scitag = argumentDic.get("scitag", 65) # 65 is 1 << 6 | 1 (default experiment, default activity)
390389
except Exception:
391390
pass
392391

src/DIRAC/DataManagementSystem/DB/FTS3DB.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@
104104
Column("sourceSEs", String(255)),
105105
Column("activity", String(255)),
106106
Column("priority", SmallInteger),
107+
Column("scitag", SmallInteger, server_default="65"), # 65 is 1 << 6 | 1 (default experiment, default activity)
107108
Column("creationTime", DateTime),
108109
Column("lastUpdate", DateTime, onupdate=utc_timestamp()),
109110
Column("status", Enum(*FTS3Operation.ALL_STATES), server_default=FTS3Operation.INIT_STATE, index=True),

0 commit comments

Comments
 (0)