Skip to content

Commit 9dd6315

Browse files
authored
Merge pull request #6856 from chaen/8.0_FIX_fts3Perf
[8.0] Speedup FTS3 queries
2 parents 31a9122 + 019b503 commit 9dd6315

File tree

5 files changed

+601
-433
lines changed

5 files changed

+601
-433
lines changed

src/DIRAC/DataManagementSystem/DB/FTS3DB.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,10 +127,11 @@
127127
),
128128
"ftsJobs": relationship(
129129
FTS3Job,
130-
lazy="subquery", # Immediately load the entirety of the object,
130+
lazy="selectin", # Immediately load the entirety of the object,
131131
# but use a subquery to do it
132132
# This is to avoid the cartesian product between the three tables.
133-
# https://docs.sqlalchemy.org/en/latest/orm/loading_relationships.html#subquery-eager-loading
133+
# https://docs.sqlalchemy.org/en/20/orm/queryguide/relationships.html#selectin-eager-loading
134+
# We use selectin and not subquery because of https://github.com/DIRACGrid/DIRAC/issues/6814
134135
cascade="all, delete-orphan", # if a File is removed from the list,
135136
# remove it from the DB
136137
passive_deletes=True, # used together with cascade='all, delete-orphan'
Lines changed: 346 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,346 @@
1+
"""
2+
We define here some tests that are meant to be ran as unit tests
3+
against an sqlite DB and as integration tests against a MySQL DB
4+
"""
5+
import random
6+
7+
from DIRAC.Core.Security.ProxyInfo import getProxyInfo
8+
from DIRAC.DataManagementSystem.Client.FTS3Operation import FTS3Operation, FTS3TransferOperation, FTS3StagingOperation
9+
from DIRAC.DataManagementSystem.Client.FTS3File import FTS3File
10+
from DIRAC.DataManagementSystem.Client.FTS3Job import FTS3Job
11+
12+
13+
def generateOperation(opType, nbFiles, dests, sources=None):
14+
"""Generate one FTS3Operation object with FTS3Files in it"""
15+
op = None
16+
if opType == "Transfer":
17+
op = FTS3TransferOperation()
18+
elif opType == "Staging":
19+
op = FTS3StagingOperation()
20+
# Get the username and group from the proxy
21+
# if we are in integration test
22+
try:
23+
proxyInfo = getProxyInfo()["Value"]
24+
op.username = proxyInfo["username"]
25+
op.userGroup = proxyInfo["group"]
26+
except:
27+
op.username = "username"
28+
op.userGroup = "group"
29+
op.sourceSEs = str(sources)
30+
for _i in range(nbFiles * len(dests)):
31+
for dest in dests:
32+
ftsFile = FTS3File()
33+
ftsFile.lfn = f"lfn{random.randint(0,100)}"
34+
ftsFile.targetSE = dest
35+
op.ftsFiles.append(ftsFile)
36+
37+
return op
38+
39+
40+
def base_test_operation(fts3db, fts3Client):
41+
"""
42+
Run basic operation tests
43+
44+
:param fts3db: an FTS3DB
45+
:param fts3Client: possibly an FTS3Client (integration test) or the same fts3db
46+
"""
47+
op = generateOperation("Transfer", 3, ["Target1", "Target2"], sources=["Source1", "Source2"])
48+
assert not op.isTotallyProcessed()
49+
50+
res = fts3Client.persistOperation(op)
51+
assert res["OK"], res
52+
opID = res["Value"]
53+
54+
res = fts3Client.getOperation(opID)
55+
assert res["OK"]
56+
57+
op2 = res["Value"]
58+
59+
assert isinstance(op2, FTS3TransferOperation)
60+
assert not op2.isTotallyProcessed()
61+
62+
for attr in ["username", "userGroup", "sourceSEs"]:
63+
assert getattr(op, attr) == getattr(op2, attr)
64+
65+
assert len(op.ftsFiles) == len(op2.ftsFiles)
66+
67+
assert op2.status == FTS3Operation.INIT_STATE
68+
69+
fileIds = []
70+
for ftsFile in op2.ftsFiles:
71+
fileIds.append(ftsFile.fileID)
72+
assert ftsFile.status == FTS3File.INIT_STATE
73+
74+
# Testing updating the status and error
75+
fileStatusDict = {}
76+
for fId in fileIds:
77+
fileStatusDict[fId] = {
78+
"status": "Finished" if fId % 2 else "Failed",
79+
"error": "" if fId % 2 else "Tough luck",
80+
}
81+
82+
res = fts3db.updateFileStatus(fileStatusDict)
83+
assert res["OK"]
84+
85+
res = fts3Client.getOperation(opID)
86+
op3 = res["Value"]
87+
assert res["OK"]
88+
89+
assert op3.ftsFiles
90+
for ftsFile in op3.ftsFiles:
91+
if ftsFile.fileID % 2:
92+
assert ftsFile.status == "Finished"
93+
assert not ftsFile.error
94+
else:
95+
assert ftsFile.status == "Failed"
96+
assert ftsFile.error == "Tough luck"
97+
98+
assert not op3.isTotallyProcessed()
99+
100+
# Testing updating only the status and to final states
101+
fileStatusDict = {}
102+
nbFinalStates = len(FTS3File.FINAL_STATES)
103+
for fId in fileIds:
104+
fileStatusDict[fId] = {"status": FTS3File.FINAL_STATES[fId % nbFinalStates]}
105+
106+
res = fts3db.updateFileStatus(fileStatusDict)
107+
assert res["OK"]
108+
109+
res = fts3Client.getOperation(opID)
110+
op4 = res["Value"]
111+
assert res["OK"]
112+
113+
assert op4.ftsFiles
114+
for ftsFile in op4.ftsFiles:
115+
if ftsFile.fileID % 2:
116+
# Files to finished cannot be changed
117+
assert ftsFile.status == "Finished"
118+
assert not ftsFile.error
119+
else:
120+
assert ftsFile.status == FTS3File.FINAL_STATES[ftsFile.fileID % nbFinalStates]
121+
assert ftsFile.error == "Tough luck"
122+
123+
# Now it should be considered as totally processed
124+
assert op4.isTotallyProcessed()
125+
res = fts3Client.persistOperation(op4)
126+
127+
128+
def base_test_job(fts3db, fts3Client):
129+
"""
130+
Run basic Job tests
131+
132+
:param fts3db: an FTS3DB
133+
:param fts3Client: possibly an FTS3Client (integration test) or the same fts3db
134+
"""
135+
op = generateOperation("Transfer", 3, ["Target1", "Target2"], sources=["Source1", "Source2"])
136+
137+
job1 = FTS3Job()
138+
job1.ftsGUID = "a-random-guid"
139+
job1.ftsServer = "fts3"
140+
141+
job1.username = op.username
142+
job1.userGroup = op.userGroup
143+
144+
op.ftsJobs.append(job1)
145+
146+
res = fts3Client.persistOperation(op)
147+
assert res["OK"], res
148+
opID = res["Value"]
149+
150+
res = fts3Client.getOperation(opID)
151+
assert res["OK"]
152+
153+
op2 = res["Value"]
154+
assert len(op2.ftsJobs) == 1
155+
job2 = op2.ftsJobs[0]
156+
assert job2.operationID == opID
157+
158+
for attr in ["ftsGUID", "ftsServer", "username", "userGroup"]:
159+
assert getattr(job1, attr) == getattr(job2, attr)
160+
161+
162+
def base_test_job_monitoring_racecondition(fts3db, fts3Client):
163+
"""We used to have a race condition resulting in duplicated transfers for a file.
164+
This test reproduces the race condition.
165+
166+
The scenario is as follow. Operation has two files File1 and File2.
167+
Job1 is submitted for File1 and File2.
168+
File1 fails, File2 is still ongoing.
169+
We submit Job2 for File1.
170+
Job1 is monitored again, and we update again File1 to failed (because it is so in Job1)
171+
A Job3 would be created for File1, despite Job2 still running on it.
172+
"""
173+
op = generateOperation("Transfer", 2, ["Target1"])
174+
175+
job1 = FTS3Job()
176+
job1.ftsGUID = "03-racecondition-job1"
177+
job1.ftsServer = "fts3"
178+
179+
job1.username = op.username
180+
job1.userGroup = op.userGroup
181+
182+
op.ftsJobs.append(job1)
183+
184+
res = fts3Client.persistOperation(op)
185+
opID = res["Value"]
186+
187+
# Get back the operation to update all the IDs
188+
res = fts3Client.getOperation(opID)
189+
op = res["Value"]
190+
191+
fileIds = []
192+
for ftsFile in op.ftsFiles:
193+
fileIds.append(ftsFile.fileID)
194+
195+
file1ID = min(fileIds)
196+
file2ID = max(fileIds)
197+
198+
# Now we monitor Job1, and find that the first file has failed, the second is still ongoing
199+
fileStatusDict = {
200+
file1ID: {"status": "Failed", "error": "Someone made a boo-boo"},
201+
file2ID: {"status": "Staging"},
202+
}
203+
204+
res = fts3db.updateFileStatus(fileStatusDict)
205+
assert res["OK"]
206+
207+
# We would then submit a second job
208+
job2 = FTS3Job()
209+
job2.ftsGUID = "03-racecondition-job2"
210+
job2.ftsServer = "fts3"
211+
212+
job2.username = op.username
213+
job2.userGroup = op.userGroup
214+
215+
op.ftsJobs.append(job2)
216+
res = fts3Client.persistOperation(op)
217+
218+
# Now we monitor Job2 & Job1 (in this order)
219+
fileStatusDictJob2 = {
220+
file1ID: {"status": "Staging"},
221+
}
222+
res = fts3db.updateFileStatus(fileStatusDictJob2)
223+
assert res["OK"]
224+
225+
# And in Job1, File1 is (and will remain) failed, while File2 is still ongoing
226+
fileStatusDictJob1 = {
227+
file1ID: {"status": "Failed", "error": "Someone made a boo-boo"},
228+
file2ID: {"status": "Staging"},
229+
}
230+
res = fts3db.updateFileStatus(fileStatusDictJob1)
231+
assert res["OK"]
232+
233+
# And now this is the problem, because If we check whether this operation still has
234+
# files to submit, it will tell me yes, while all the files are being taken care of
235+
res = fts3Client.getOperation(opID)
236+
op = res["Value"]
237+
238+
# isTotallyProcessed does not return S_OK struct
239+
filesToSubmit = op._getFilesToSubmit()
240+
assert filesToSubmit == [op.ftsFiles[0]]
241+
242+
243+
def base_test_job_monitoring_solve_racecondition(fts3db, fts3Client):
244+
"""We used to have a race condition resulting in duplicated transfers for a file.
245+
This test reproduces the race condition to make sure it is fixed.
246+
This test makes sure that the update only happens on files concerned by the job
247+
248+
The scenario is as follow. Operation has two files File1 and File2.
249+
Job1 is submitted for File1 and File2.
250+
File1 fails, File2 is still ongoing.
251+
We submit Job2 for File1.
252+
Job1 is monitored again, and we update again File1 to failed (because it is so in Job1)
253+
A Job3 would be created for File1, dispite Job2 still runing on it.
254+
"""
255+
op = generateOperation("Transfer", 2, ["Target1"])
256+
257+
job1 = FTS3Job()
258+
job1GUID = "04-racecondition-job1"
259+
job1.ftsGUID = job1GUID
260+
job1.ftsServer = "fts3"
261+
262+
job1.username = op.username
263+
job1.userGroup = op.userGroup
264+
265+
op.ftsJobs.append(job1)
266+
267+
# Now, when submitting the job, we specify the ftsGUID to which files are
268+
# assigned
269+
for ftsFile in op.ftsFiles:
270+
ftsFile.ftsGUID = job1GUID
271+
272+
res = fts3Client.persistOperation(op)
273+
opID = res["Value"]
274+
275+
# Get back the operation to update all the IDs
276+
res = fts3Client.getOperation(opID)
277+
op = res["Value"]
278+
279+
fileIds = []
280+
for ftsFile in op.ftsFiles:
281+
fileIds.append(ftsFile.fileID)
282+
283+
# Arbitrarilly decide that File1 has the smalled fileID
284+
file1ID = min(fileIds)
285+
file2ID = max(fileIds)
286+
287+
# Now we monitor Job1, and find that the first file has failed, the second is still ongoing
288+
# And since File1 is in an FTS final status, we set its ftsGUID to None
289+
fileStatusDict = {
290+
file1ID: {"status": "Failed", "error": "Someone made a boo-boo", "ftsGUID": None},
291+
file2ID: {"status": "Staging"},
292+
}
293+
294+
# And when updating, take care of specifying that you are updating for a given GUID
295+
res = fts3db.updateFileStatus(fileStatusDict, ftsGUID=job1GUID)
296+
assert res["OK"]
297+
298+
# We would then submit a second job
299+
job2 = FTS3Job()
300+
job2GUID = "04-racecondition-job2"
301+
job2.ftsGUID = job2GUID
302+
job2.ftsServer = "fts3"
303+
304+
job2.username = op.username
305+
job2.userGroup = op.userGroup
306+
307+
op.ftsJobs.append(job2)
308+
309+
# And do not forget to add the new FTSGUID to File1
310+
# assigned
311+
for ftsFile in op.ftsFiles:
312+
if ftsFile.fileID == file1ID:
313+
ftsFile.ftsGUID = job2GUID
314+
315+
res = fts3Client.persistOperation(op)
316+
317+
# Now we monitor Job2 & Job1 (in this order)
318+
fileStatusDictJob2 = {
319+
file1ID: {"status": "Staging"},
320+
}
321+
322+
# Again specify the GUID
323+
res = fts3db.updateFileStatus(fileStatusDictJob2, ftsGUID=job2GUID)
324+
assert res["OK"]
325+
326+
# And in Job1, File1 is (and will remain) failed, while File2 is still ongoing
327+
fileStatusDictJob1 = {
328+
file1ID: {"status": "Failed", "error": "Someone made a boo-boo"},
329+
file2ID: {"status": "Staging"},
330+
}
331+
332+
# And thanks to specifying the job GUID, File1 should not be touched !
333+
res = fts3db.updateFileStatus(fileStatusDictJob1, ftsGUID=job1GUID)
334+
assert res["OK"]
335+
336+
# And hopefully now there shouldn't be any file to submit
337+
res = fts3Client.getOperation(opID)
338+
op = res["Value"]
339+
340+
# isTotallyProcessed does not return S_OK struct
341+
filesToSubmit = op._getFilesToSubmit()
342+
assert not filesToSubmit
343+
344+
345+
# All base tests defined in this module
346+
allBaseTests = [test_func for testName, test_func in globals().items() if testName.startswith("base_test")]

src/DIRAC/DataManagementSystem/DB/test/Test_FTS3DB.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from DIRAC.DataManagementSystem.Client.FTS3Operation import FTS3Operation, FTS3TransferOperation, FTS3StagingOperation
1010
from DIRAC.DataManagementSystem.Client.FTS3File import FTS3File
1111
from DIRAC.DataManagementSystem.Client.FTS3Job import FTS3Job
12-
12+
import DIRAC.DataManagementSystem.DB.test.FTS3TestUtils as baseTestModule
1313

1414
gLogger.setLevel("DEBUG")
1515

@@ -189,3 +189,9 @@ def _makeFile():
189189
activeJobs = res["Value"]
190190
activeJobIDs = [op.jobID for op in activeJobs]
191191
assert activeJobIDs == [1, 6]
192+
193+
194+
@pytest.mark.parametrize("baseTest", baseTestModule.allBaseTests)
195+
def test_all_common_tests(fts3db, baseTest):
196+
"""Run all the tests in the FTS3TestUtils."""
197+
baseTest(fts3db, fts3db)

0 commit comments

Comments
 (0)