39
39
40
40
metadata = MetaData ()
41
41
42
+ # Define the default utc_timestampfunction.
43
+ # We overwrite it in the case of sqlite in the tests
44
+ # because sqlite does not know UTC_TIMESTAMP
45
+ utc_timestamp = func .utc_timestamp
42
46
43
47
fts3FileTable = Table (
44
48
"Files" ,
45
49
metadata ,
46
50
Column ("fileID" , Integer , primary_key = True ),
47
51
Column ("operationID" , Integer , ForeignKey ("Operations.operationID" , ondelete = "CASCADE" ), nullable = False ),
48
52
Column ("attempt" , Integer , server_default = "0" ),
49
- Column ("lastUpdate" , DateTime , onupdate = func . utc_timestamp ()),
53
+ Column ("lastUpdate" , DateTime , onupdate = utc_timestamp ()),
50
54
Column ("rmsFileID" , Integer , server_default = "0" ),
51
55
Column ("lfn" , String (1024 )),
52
56
Column ("checksum" , String (255 )),
67
71
Column ("jobID" , Integer , primary_key = True ),
68
72
Column ("operationID" , Integer , ForeignKey ("Operations.operationID" , ondelete = "CASCADE" ), nullable = False ),
69
73
Column ("submitTime" , DateTime ),
70
- Column ("lastUpdate" , DateTime , onupdate = func . utc_timestamp ()),
74
+ Column ("lastUpdate" , DateTime , onupdate = utc_timestamp ()),
71
75
Column ("lastMonitor" , DateTime ),
72
76
Column ("completeness" , Float ),
73
77
# Could be fetched from Operation, but bad for perf
99
103
Column ("activity" , String (255 )),
100
104
Column ("priority" , SmallInteger ),
101
105
Column ("creationTime" , DateTime ),
102
- Column ("lastUpdate" , DateTime , onupdate = func . utc_timestamp ()),
106
+ Column ("lastUpdate" , DateTime , onupdate = utc_timestamp ()),
103
107
Column ("status" , Enum (* FTS3Operation .ALL_STATES ), server_default = FTS3Operation .INIT_STATE , index = True ),
104
108
Column ("error" , String (1024 )),
105
109
Column ("type" , String (255 )),
@@ -176,7 +180,7 @@ def __getDBConnectionInfo(self, fullname):
176
180
self .dbPass = dbParameters ["Password" ]
177
181
self .dbName = dbParameters ["DBName" ]
178
182
179
- def __init__ (self , pool_size = 15 ):
183
+ def __init__ (self , pool_size = 15 , url = None ):
180
184
"""c'tor
181
185
182
186
:param self: self reference
@@ -185,12 +189,16 @@ def __init__(self, pool_size=15):
185
189
"""
186
190
187
191
self .log = gLogger .getSubLogger ("FTS3DB" )
188
- # Initialize the connection info
189
- self .__getDBConnectionInfo ("DataManagement/FTS3DB" )
192
+
193
+ if not url :
194
+ # Initialize the connection info
195
+ self .__getDBConnectionInfo ("DataManagement/FTS3DB" )
196
+
197
+ url = "mysql://%s:%s@%s:%s/%s" % (self .dbUser , self .dbPass , self .dbHost , self .dbPort , self .dbName )
190
198
191
199
runDebug = gLogger .getLevel () == "DEBUG"
192
200
self .engine = create_engine (
193
- "mysql://%s:%s@%s:%s/%s" % ( self . dbUser , self . dbPass , self . dbHost , self . dbPort , self . dbName ) ,
201
+ url ,
194
202
echo = runDebug ,
195
203
pool_size = pool_size ,
196
204
pool_recycle = 3600 ,
@@ -221,7 +229,7 @@ def persistOperation(self, operation):
221
229
# so that another agent can work on the request
222
230
operation .assignment = None
223
231
# because of the merge we have to explicitely set lastUpdate
224
- operation .lastUpdate = func . utc_timestamp ()
232
+ operation .lastUpdate = utc_timestamp ()
225
233
try :
226
234
227
235
# Merge it in case it already is in the DB
@@ -286,7 +294,6 @@ def getActiveJobs(self, limit=20, lastMonitor=None, jobAssignmentTag="Assigned")
286
294
:returns: list of FTS3Jobs
287
295
288
296
"""
289
-
290
297
session = self .dbSession (expire_on_commit = False )
291
298
292
299
try :
@@ -435,7 +442,7 @@ def updateJobStatus(self, jobStatusDict):
435
442
updateDict [FTS3Job .completeness ] = valueDict ["completeness" ]
436
443
437
444
if valueDict .get ("lastMonitor" ):
438
- updateDict [FTS3Job .lastMonitor ] = func . utc_timestamp ()
445
+ updateDict [FTS3Job .lastMonitor ] = utc_timestamp ()
439
446
440
447
updateDict [FTS3Job .assignment ] = None
441
448
@@ -530,12 +537,13 @@ def getNonFinishedOperations(self, limit=20, operationAssignmentTag="Assigned"):
530
537
ftsOperations = []
531
538
532
539
# We need to do the select in two times because the join clause that makes the limit difficult
540
+ # We get the list of operations ID that have associated jobs assigned
541
+ opIDsWithJobAssigned = session .query (FTS3Job .operationID ).filter (~ FTS3Job .assignment .is_ (None )).subquery ()
533
542
operationIDsQuery = (
534
543
session .query (FTS3Operation .operationID )
535
- .outerjoin (FTS3Job )
536
544
.filter (FTS3Operation .status .in_ (["Active" , "Processed" ]))
537
545
.filter (FTS3Operation .assignment .is_ (None ))
538
- .filter (FTS3Job . assignment . is_ ( None ))
546
+ .filter (~ FTS3Operation . operationID . in_ ( opIDsWithJobAssigned ))
539
547
.order_by (FTS3Operation .lastUpdate .asc ())
540
548
.limit (limit )
541
549
.distinct ()
@@ -591,8 +599,7 @@ def kickStuckOperations(self, limit=20, kickDelay=2):
591
599
ftsOps = (
592
600
session .query (FTS3Operation .operationID )
593
601
.filter (
594
- FTS3Operation .lastUpdate
595
- < (func .date_sub (func .utc_timestamp (), text ("INTERVAL %d HOUR" % kickDelay )))
602
+ FTS3Operation .lastUpdate < (func .date_sub (utc_timestamp (), text ("INTERVAL %d HOUR" % kickDelay )))
596
603
)
597
604
.filter (~ FTS3Operation .assignment .is_ (None ))
598
605
.limit (limit )
@@ -607,7 +614,7 @@ def kickStuckOperations(self, limit=20, kickDelay=2):
607
614
.where (FTS3Operation .operationID .in_ (opIDs ))
608
615
.where (
609
616
FTS3Operation .lastUpdate
610
- < (func .date_sub (func . utc_timestamp (), text ("INTERVAL %d HOUR" % kickDelay )))
617
+ < (func .date_sub (utc_timestamp (), text ("INTERVAL %d HOUR" % kickDelay )))
611
618
)
612
619
.values ({"assignment" : None })
613
620
.execution_options (synchronize_session = False ) # see comment about synchronize_session
@@ -641,9 +648,7 @@ def kickStuckJobs(self, limit=20, kickDelay=2):
641
648
642
649
ftsJobs = (
643
650
session .query (FTS3Job .jobID )
644
- .filter (
645
- FTS3Job .lastUpdate < (func .date_sub (func .utc_timestamp (), text ("INTERVAL %d HOUR" % kickDelay )))
646
- )
651
+ .filter (FTS3Job .lastUpdate < (func .date_sub (utc_timestamp (), text ("INTERVAL %d HOUR" % kickDelay ))))
647
652
.filter (~ FTS3Job .assignment .is_ (None ))
648
653
.limit (limit )
649
654
)
@@ -655,9 +660,7 @@ def kickStuckJobs(self, limit=20, kickDelay=2):
655
660
result = session .execute (
656
661
update (FTS3Job )
657
662
.where (FTS3Job .jobID .in_ (jobIDs ))
658
- .where (
659
- FTS3Job .lastUpdate < (func .date_sub (func .utc_timestamp (), text ("INTERVAL %d HOUR" % kickDelay )))
660
- )
663
+ .where (FTS3Job .lastUpdate < (func .date_sub (utc_timestamp (), text ("INTERVAL %d HOUR" % kickDelay ))))
661
664
.values ({"assignment" : None })
662
665
.execution_options (synchronize_session = False ) # see comment about synchronize_session
663
666
)
@@ -689,8 +692,7 @@ def deleteFinalOperations(self, limit=20, deleteDelay=180):
689
692
ftsOps = (
690
693
session .query (FTS3Operation .operationID )
691
694
.filter (
692
- FTS3Operation .lastUpdate
693
- < (func .date_sub (func .utc_timestamp (), text ("INTERVAL %d DAY" % deleteDelay )))
695
+ FTS3Operation .lastUpdate < (func .date_sub (utc_timestamp (), text ("INTERVAL %d DAY" % deleteDelay )))
694
696
)
695
697
.filter (FTS3Operation .status .in_ (FTS3Operation .FINAL_STATES ))
696
698
.limit (limit )
0 commit comments