Skip to content

Commit 91a2927

Browse files
committed
feat: Added function to send pilot submission to ES
1 parent 59a4559 commit 91a2927

File tree

5 files changed

+59
-17
lines changed

5 files changed

+59
-17
lines changed

src/DIRAC/MonitoringSystem/Client/MonitoringReporter.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,6 @@
1515
a MQ service is available, if the MQ is not working a failover will be performed.
1616
1717
"""
18-
from __future__ import absolute_import
19-
from __future__ import division
20-
from __future__ import print_function
21-
22-
__RCSID__ = "$Id$"
2318

2419
import threading
2520
import json

src/DIRAC/MonitoringSystem/Client/Types/PilotMonitoring.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ def __init__(self):
1414

1515
self.monitoringFields = ["NumTotal", "NumSucceeded"]
1616

17-
self.index = "pilot-stats_index"
17+
self.index = "pilotstats_index"
1818

1919
self.addMapping(
2020
{

src/DIRAC/MonitoringSystem/Client/Types/WMSHistory.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,6 @@
33
44
Filled by the agent "WorkloadManagement/StatesAccountingAgent"
55
"""
6-
from __future__ import absolute_import
7-
from __future__ import division
8-
from __future__ import print_function
96

107
from DIRAC.MonitoringSystem.Client.Types.BaseType import BaseType
118

src/DIRAC/MonitoringSystem/DB/MonitoringDB.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,6 @@
3232
3333
3434
"""
35-
from __future__ import absolute_import
36-
from __future__ import division
37-
from __future__ import print_function
38-
39-
__RCSID__ = "$Id$"
4035

4136
import time
4237
import calendar

src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py

Lines changed: 58 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,17 @@
1515
from concurrent.futures import ThreadPoolExecutor
1616

1717
import DIRAC
18-
from DIRAC import S_OK, gConfig
18+
from DIRAC import S_OK, S_ERROR, gConfig
1919
from DIRAC.ConfigurationSystem.Client.Helpers import CSGlobals, Registry
2020
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
2121
from DIRAC.ConfigurationSystem.Client.Helpers.Resources import getCESiteMapping
2222
from DIRAC.Core.Base.AgentModule import AgentModule
23-
from DIRAC.Core.Utilities.Time import dateTime, second
23+
from DIRAC.Core.Utilities.Time import dateTime, second, toEpoch
2424
from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
2525
from DIRAC.FrameworkSystem.Client.ProxyManagerClient import gProxyManager
2626
from DIRAC.AccountingSystem.Client.Types.Pilot import Pilot as PilotAccounting
2727
from DIRAC.AccountingSystem.Client.Types.PilotSubmission import PilotSubmission as PilotSubmissionAccounting
28+
from DIRAC.MonitoringSystem.Client.MonitoringReporter import MonitoringReporter
2829
from DIRAC.MonitoringSystem.Client.Types.PilotMonitoring import PilotMonitoring as PilotSubmissionMonitoring
2930
from DIRAC.AccountingSystem.Client.DataStoreClient import gDataStoreClient
3031
from DIRAC.WorkloadManagementSystem.Client import PilotStatus
@@ -722,7 +723,15 @@ def _submitPilotsToQueue(self, pilotsToSubmit, ce, queue):
722723
0,
723724
"Failed",
724725
)
725-
726+
if self.sendSubmissionMonitoring:
727+
self.sendPilotSubmissionMonitoring(
728+
self.queueDict[queue]["Site"],
729+
self.queueDict[queue]["CEName"],
730+
self.queueDict[queue]["QueueName"],
731+
pilotsToSubmit,
732+
0,
733+
"Failed",
734+
)
726735
self.failedQueues[queue] += 1
727736
return submitResult
728737

@@ -745,6 +754,15 @@ def _submitPilotsToQueue(self, pilotsToSubmit, ce, queue):
745754
len(pilotList),
746755
"Succeeded",
747756
)
757+
if self.sendSubmissionMonitoring:
758+
self.sendPilotSubmissionMonitoring(
759+
self.queueDict[queue]["Site"],
760+
self.queueDict[queue]["CEName"],
761+
self.queueDict[queue]["QueueName"],
762+
len(pilotList),
763+
len(pilotList),
764+
"Succeeded",
765+
)
748766

749767
return S_OK((pilotList, stampDict))
750768

@@ -1365,3 +1383,40 @@ def sendPilotSubmissionAccounting(self, siteName, ceName, queueName, numTotal, n
13651383
if not result["OK"]:
13661384
self.log.error("Error in Commit:" + result["Message"])
13671385
return result
1386+
1387+
def sendPilotSubmissionMonitoring(self, siteName, ceName, queueName, numTotal, numSucceeded, status):
1388+
"""Send pilot submission accounting record
1389+
1390+
:param str siteName: Site name
1391+
:param str ceName: CE name
1392+
:param str queueName: queue Name
1393+
:param int numTotal: Total number of submission
1394+
:param int numSucceeded: Total number of submission succeeded
1395+
:param str status: 'Succeeded' or 'Failed'
1396+
1397+
:returns: S_OK / S_ERROR
1398+
"""
1399+
1400+
pilotMonitoringReporter = MonitoringReporter(monitoringType="PilotMonitoring")
1401+
pilotMonitoringData = [
1402+
{
1403+
"HostName": "",
1404+
"SiteDirector": "",
1405+
"Site": siteName,
1406+
"CE": ceName,
1407+
"Queue": queueName,
1408+
"Status": status,
1409+
"NumTotal": numTotal,
1410+
"NumSucceded": numSucceeded,
1411+
"timestamp": int(toEpoch(dateTime())),
1412+
}
1413+
]
1414+
pilotMonitoringReporter.addRecord(pilotMonitoringData)
1415+
result = pilotMonitoringReporter.commit()
1416+
1417+
self.log.verbose("Committing pilot submission to monitoring")
1418+
if not result["OK"]:
1419+
self.log.error("Couldn't commit pilot submission to monitoring", result["Message"])
1420+
return S_ERROR()
1421+
self.log.verbose("Done committing to monitoring")
1422+
return S_OK()

0 commit comments

Comments
 (0)