8
8
from DIRAC .Core .DISET .RequestHandler import RequestHandler
9
9
from DIRAC .Core .Utilities .JEncode import strToIntDict
10
10
from DIRAC .Core .Utilities .ObjectLoader import ObjectLoader
11
+ from DIRAC .RequestManagementSystem .Client .Operation import Operation
12
+ from DIRAC .RequestManagementSystem .Client .Request import Request
13
+ from DIRAC .TransformationSystem .Client import TransformationFilesStatus
14
+ from DIRAC .WorkloadManagementSystem .Client import JobStatus
11
15
from DIRAC .WorkloadManagementSystem .Service .JobPolicy import RIGHT_GET_INFO , JobPolicy
12
16
17
+ TASKS_STATE_NAMES = ["TotalCreated" , "Created" ] + sorted (
18
+ set (JobStatus .JOB_STATES ) | set (Request .ALL_STATES ) | set (Operation .ALL_STATES )
19
+ )
20
+ FILES_STATE_NAMES = ["PercentProcessed" , "Total" ] + TransformationFilesStatus .TRANSFORMATION_FILES_STATES
21
+
13
22
14
23
class WebAppHandler (RequestHandler ):
15
24
@classmethod
@@ -27,6 +36,11 @@ def initializeHandler(cls, serviceInfoDict):
27
36
return result
28
37
cls .jobDB = result ["Value" ](parentLogger = cls .log )
29
38
39
+ result = ObjectLoader ().loadObject ("TransformationSystem.DB.TransformationDB" , "TransformationDB" )
40
+ if not result ["OK" ]:
41
+ return result
42
+ cls .transformationDB = result ["Value" ](parentLogger = cls .log )
43
+
30
44
except RuntimeError as excp :
31
45
return S_ERROR (f"Can't connect to DB: { excp } " )
32
46
@@ -320,3 +334,198 @@ def export_getSiteSummarySelectors(cls):
320
334
resultDict ["Site" ] = siteList
321
335
322
336
return S_OK (resultDict )
337
+
338
+ ##############################################################################
339
+ # Transformations
340
+ ##############################################################################
341
+
342
+ types_getDistinctAttributeValues = [str , dict ]
343
+
344
+ @classmethod
345
+ def export_getDistinctAttributeValues (cls , attribute , selectDict ):
346
+ res = cls .transformationDB .getTableDistinctAttributeValues ("Transformations" , [attribute ], selectDict )
347
+ if not res ["OK" ]:
348
+ return res
349
+ return S_OK (res ["Value" ][attribute ])
350
+
351
+ types_getTransformationFilesSummaryWeb = [dict , list , int , int ]
352
+
353
+ def export_getTransformationFilesSummaryWeb (cls , selectDict , sortList , startItem , maxItems ):
354
+ selectColumns = (["TransformationID" , "Status" , "UsedSE" , "TargetSE" ],)
355
+ timeStamp = ("LastUpdate" ,)
356
+ statusColumn = ("Status" ,)
357
+ fromDate = selectDict .get ("FromDate" , None )
358
+ if fromDate :
359
+ del selectDict ["FromDate" ]
360
+ # if not fromDate:
361
+ # fromDate = last_update
362
+ toDate = selectDict .get ("ToDate" , None )
363
+ if toDate :
364
+ del selectDict ["ToDate" ]
365
+ # Sorting instructions. Only one for the moment.
366
+ if sortList :
367
+ orderAttribute = sortList [0 ][0 ] + ":" + sortList [0 ][1 ]
368
+ else :
369
+ orderAttribute = None
370
+ # Get the columns that match the selection
371
+ fcn = None
372
+ fcnName = "getTransformationFiles"
373
+ if hasattr (cls .transformationDB , fcnName ) and callable (getattr (cls .transformationDB , fcnName )):
374
+ fcn = getattr (cls .transformationDB , fcnName )
375
+ if not fcn :
376
+ return S_ERROR (f"Unable to invoke gTransformationDB.{ fcnName } , it isn't a member function" )
377
+ res = fcn (condDict = selectDict , older = toDate , newer = fromDate , timeStamp = timeStamp , orderAttribute = orderAttribute )
378
+ if not res ["OK" ]:
379
+ return res
380
+
381
+ # The full list of columns in contained here
382
+ allRows = res ["Value" ]
383
+ # Prepare the standard structure now within the resultDict dictionary
384
+ resultDict = {}
385
+ # Create the total records entry
386
+ resultDict ["TotalRecords" ] = len (allRows )
387
+
388
+ # Get the rows which are within the selected window
389
+ if resultDict ["TotalRecords" ] == 0 :
390
+ return S_OK (resultDict )
391
+ ini = startItem
392
+ last = ini + maxItems
393
+ if ini >= resultDict ["TotalRecords" ]:
394
+ return S_ERROR ("Item number out of range" )
395
+ if last > resultDict ["TotalRecords" ]:
396
+ last = resultDict ["TotalRecords" ]
397
+
398
+ selectedRows = allRows [ini :last ]
399
+ resultDict ["Records" ] = []
400
+ for row in selectedRows :
401
+ resultDict ["Records" ].append (list (row .values ()))
402
+
403
+ # Create the ParameterNames entry
404
+ resultDict ["ParameterNames" ] = list (selectedRows [0 ].keys ())
405
+ # Find which element in the tuple contains the requested status
406
+ if statusColumn not in resultDict ["ParameterNames" ]:
407
+ return S_ERROR ("Provided status column not present" )
408
+
409
+ # Generate the status dictionary
410
+ statusDict = {}
411
+ for row in selectedRows :
412
+ status = row [statusColumn ]
413
+ statusDict [status ] = statusDict .setdefault (status , 0 ) + 1
414
+ resultDict ["Extras" ] = statusDict
415
+
416
+ # Obtain the distinct values of the selection parameters
417
+ res = cls .transformationDB .getTableDistinctAttributeValues (
418
+ "TransformationFiles" , selectColumns , selectDict , older = toDate , newer = fromDate
419
+ )
420
+ distinctSelections = zip (selectColumns , [])
421
+ if res ["OK" ]:
422
+ distinctSelections = res ["Value" ]
423
+ resultDict ["Selections" ] = distinctSelections
424
+
425
+ return S_OK (resultDict )
426
+
427
+ types_getTransformationSummaryWeb = [dict , list , int , int ]
428
+
429
+ def export_getTransformationSummaryWeb (cls , selectDict , sortList , startItem , maxItems ):
430
+ """Get the summary of the transformation information for a given page in the generic format"""
431
+
432
+ # Obtain the timing information from the selectDict
433
+ last_update = selectDict .get ("CreationDate" , None )
434
+ if last_update :
435
+ del selectDict ["CreationDate" ]
436
+ fromDate = selectDict .get ("FromDate" , None )
437
+ if fromDate :
438
+ del selectDict ["FromDate" ]
439
+ if not fromDate :
440
+ fromDate = last_update
441
+ toDate = selectDict .get ("ToDate" , None )
442
+ if toDate :
443
+ del selectDict ["ToDate" ]
444
+ # Sorting instructions. Only one for the moment.
445
+ if sortList :
446
+ orderAttribute = []
447
+ for i in sortList :
448
+ orderAttribute += [i [0 ] + ":" + i [1 ]]
449
+ else :
450
+ orderAttribute = None
451
+
452
+ # Get the transformations that match the selection
453
+ res = cls .transformationDB .getTransformations (
454
+ condDict = selectDict , older = toDate , newer = fromDate , orderAttribute = orderAttribute
455
+ )
456
+ if not res ["OK" ]:
457
+ return res
458
+
459
+ ops = Operations ()
460
+ # Prepare the standard structure now within the resultDict dictionary
461
+ resultDict = {}
462
+ trList = res ["Records" ]
463
+ # Create the total records entry
464
+ nTrans = len (trList )
465
+ resultDict ["TotalRecords" ] = nTrans
466
+ # Create the ParameterNames entry
467
+ # As this list is a reference to the list in the DB, we cannot extend it, therefore copy it
468
+ resultDict ["ParameterNames" ] = list (res ["ParameterNames" ])
469
+ # Add the job states to the ParameterNames entry
470
+ taskStateNames = TASKS_STATE_NAMES + ops .getValue ("Transformations/AdditionalTaskStates" , [])
471
+ resultDict ["ParameterNames" ] += ["Jobs_" + x for x in taskStateNames ]
472
+ # Add the file states to the ParameterNames entry
473
+ fileStateNames = FILES_STATE_NAMES + ops .getValue ("Transformations/AdditionalFileStates" , [])
474
+ resultDict ["ParameterNames" ] += ["Files_" + x for x in fileStateNames ]
475
+
476
+ # Get the transformations which are within the selected window
477
+ if nTrans == 0 :
478
+ return S_OK (resultDict )
479
+ ini = startItem
480
+ last = ini + maxItems
481
+ if ini >= nTrans :
482
+ return S_ERROR ("Item number out of range" )
483
+ if last > nTrans :
484
+ last = nTrans
485
+ transList = trList [ini :last ]
486
+
487
+ statusDict = {}
488
+ extendableTranfs = ops .getValue ("Transformations/ExtendableTransfTypes" , ["Simulation" , "MCsimulation" ])
489
+ givenUpFileStatus = ops .getValue ("Transformations/GivenUpFileStatus" , ["MissingInFC" ])
490
+ problematicStatuses = ops .getValue ("Transformations/ProblematicStatuses" , ["Problematic" ])
491
+ # Add specific information for each selected transformation
492
+ for trans in transList :
493
+ transDict = dict (zip (resultDict ["ParameterNames" ], trans ))
494
+
495
+ # Update the status counters
496
+ status = transDict ["Status" ]
497
+ statusDict [status ] = statusDict .setdefault (status , 0 ) + 1
498
+
499
+ # Get the statistics on the number of jobs for the transformation
500
+ transID = transDict ["TransformationID" ]
501
+ res = cls .transformationDB .getTransformationTaskStats (transID )
502
+ taskDict = {}
503
+ if res ["OK" ] and res ["Value" ]:
504
+ taskDict = res ["Value" ]
505
+ for state in taskStateNames :
506
+ trans .append (taskDict .get (state , 0 ))
507
+
508
+ # Get the statistics for the number of files for the transformation
509
+ fileDict = {}
510
+ transType = transDict ["Type" ]
511
+ if transType .lower () in extendableTranfs :
512
+ fileDict ["PercentProcessed" ] = "-"
513
+ else :
514
+ res = cls .transformationDB .getTransformationStats (transID )
515
+ if res ["OK" ]:
516
+ fileDict = res ["Value" ]
517
+ total = fileDict ["Total" ]
518
+ for stat in givenUpFileStatus :
519
+ total -= fileDict .get (stat , 0 )
520
+ processed = fileDict .get (TransformationFilesStatus .PROCESSED , 0 )
521
+ fileDict ["PercentProcessed" ] = f"{ int (processed * 1000.0 / total ) / 10.0 :.1f} " if total else 0.0
522
+ problematic = 0
523
+ for stat in problematicStatuses :
524
+ problematic += fileDict .get (stat , 0 )
525
+ fileDict ["Problematic" ] = problematic
526
+ for state in fileStateNames :
527
+ trans .append (fileDict .get (state , 0 ))
528
+
529
+ resultDict ["Records" ] = transList
530
+ resultDict ["Extras" ] = statusDict
531
+ return S_OK (resultDict )
0 commit comments