Skip to content

Commit e2194fc

Browse files
authored
Merge pull request #47080 from rmankel/rmankel-updateMPS
Update of Millepede production system (MPS): large campaigns
2 parents d7b551b + ae3403a commit e2194fc

File tree

6 files changed

+182
-38
lines changed

6 files changed

+182
-38
lines changed

Alignment/MillePedeAlignmentAlgorithm/python/mpslib/Mpslibclass.py

Lines changed: 86 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,9 @@
4242
import time
4343
import os
4444
import sys
45+
import re
46+
import math
47+
import fileinput
4548

4649
#-------------------------------------------------------------------------------
4750
class jobdatabase:
@@ -144,33 +147,49 @@ def print_memdb(self):
144147
print('pedeMem:\t', self.pedeMem, '\n')
145148

146149
#print interesting Job-level lists ---- to add: t/evt, fix remarks
147-
print('### dir jobid stat try rtime nevt remark weight name')
150+
headFmt = '### dir jobid stat try rtime nevt remark weight name'
151+
jobFmt = '%03d %6s %10s%6s %3d %5d %6d %12s %5s %s'
152+
mrgFmt = '%s %6s %10s%6s %3d %5d %6d %12s %5s %s'
153+
if self.nJobs>999:
154+
headFmt = '#### dir jobid stat try rtime nevt remark weight name'
155+
jobFmt = '%04d %7s %10s%6s %3d %5d %6d %12s %5s %s'
156+
mrgFmt = '%s %7s %10s%6s %3d %5d %6d %12s %5s %s'
157+
if self.nJobs>9999:
158+
jobFmt = '%d %s %10s%6s %3d %5d %6d %12s %5s %s'
159+
mrgFmt = '%s %8s %10s%6s %3d %5d %6d %12s %5s %s'
160+
print(headFmt)
148161
print("------------------------------------------------------------------------------")
149162
for i in range(self.nJobs):
150-
print('%03d %6s %9s %6s %3d %5d %8d %8s %5s %s' % (
163+
remarkField = self.JOBHOST[i]
164+
if self.JOBSTATUS[i] == "FAIL":
165+
remarkField = self.JOBREMARK[i]
166+
print(jobFmt % (
151167
self.JOBNUMBER[i],
152168
self.JOBDIR[i],
153169
self.JOBID[i],
154-
self.JOBSTATUS[i],
170+
self.JOBSTATUS[i][:6],
155171
self.JOBNTRY[i],
156172
self.JOBRUNTIME[i],
157173
self.JOBNEVT[i],
158-
self.JOBHOST[i],
174+
remarkField[:12],
159175
self.JOBSP2[i],
160176
self.JOBSP3[i]))
161177

162178
#print merge Jobs if merge mode
163179
if self.driver == 'merge':
164180
for i in range(self.nJobs,len(self.JOBDIR)):
165-
print('%s %6s %9s %6s %3d %5d %8d %8s %5s %s' % (
181+
remarkField = self.JOBHOST[i]
182+
if (self.JOBSTATUS[i] == "FAIL") or (self.JOBSTATUS[i] == "WARN"):
183+
remarkField = self.JOBREMARK[i]
184+
print(mrgFmt % (
166185
'MMM',
167186
self.JOBDIR[i],
168187
self.JOBID[i],
169-
self.JOBSTATUS[i],
188+
self.JOBSTATUS[i][:6],
170189
self.JOBNTRY[i],
171190
self.JOBRUNTIME[i],
172191
self.JOBNEVT[i],
173-
self.JOBHOST[i],
192+
remarkField[:12],
174193
self.JOBSP2[i],
175194
self.JOBSP3[i]))
176195

@@ -192,7 +211,7 @@ def print_memdb(self):
192211
#-------------------------------------------------------------------------------
193212
# writes a new mps.db file from the members. Replaces the old mps.db
194213
def write_db(self):
195-
self.header = "mps database schema 3.2"
214+
self.header = "mps database schema 4.0"
196215
self.currentTime = int(time.time())
197216
self.elapsedTime = 0;
198217
if self.updateTime != 0:
@@ -218,7 +237,7 @@ def write_db(self):
218237

219238
#write mps.db jobinfo
220239
for i in range(len(self.JOBID)):
221-
DBFILE.write('%03d:%s:%s:%s:%s:%s:%s:%s:%s:%s:%s:%s:%s\n' %
240+
DBFILE.write('%d:%s:%s:%s:%s:%s:%s:%s:%s:%s:%s:%s:%s\n' %
222241
(i+1,
223242
self.JOBDIR[i],
224243
self.JOBID[i],
@@ -252,3 +271,61 @@ def get_class(self, argument=''):
252271
else:
253272
print('\nget_class():\n Know class only for \'mille\' or \'pede\', not %s!\n\n' %argument)
254273
sys.exit(1)
274+
275+
#-------------------------------------------------------------------------------
276+
# Take the config template, substitute the ISN placeholder and insert the
277+
# input files listed in the modifier file as readFiles.extend([...]) blocks
278+
def mps_splice(self,inCfg='',modCfg='',outCfg='the.py',isn=0,skip_events=0,max_events=0):
    """Write a job configuration built from a template and a file list.

    The template ``inCfg`` is copied to ``outCfg`` with every occurrence of
    the text ``ISN`` replaced by ``isn``.  After each template line that
    starts with ``readFiles = cms.untracked.vstring(`` the input files
    listed in ``modCfg`` (one per line) are inserted as
    ``readFiles.extend([...])`` statements of at most 255 entries each.
    ``self`` is not used.

    Parameters:
        inCfg:       path of the configuration template to read.
        modCfg:      path of the file list; a first line containing
                     ``CastorPool=`` is dropped, blank lines are ignored.
        outCfg:      path of the configuration file to (over)write.
        isn:         job number substituted for ``ISN``; a string (e.g.
                     ``"007"``) or an int, which is converted with ``str``.
        skip_events: if non-zero, append a ``process.source.skipEvents``
                     line with this integer value.
        max_events:  if non-zero, append a ``process.maxEvents`` line with
                     this integer value.

    Raises:
        OSError: if one of the files cannot be opened.
    """
    with open(inCfg, 'r') as INFILE:
        body = INFILE.read()

    # read modifier file
    with open(modCfg, 'r') as MODFILE:
        fileNames = MODFILE.read().strip().split('\n')

    # Delete the first line if it is the pool directive, not a file name.
    if 'CastorPool=' in fileNames[0]:
        del fileNames[0]
    # Blank lines would otherwise end up as '' entries in readFiles.
    fileNames = [name.strip() for name in fileNames if name.strip()]

    # 'ISN' is a literal placeholder, so a plain replace is sufficient;
    # str() makes the int default (isn=0) work as well as the usual
    # zero-padded string.
    body = body.replace('ISN', str(isn))

    # One extend statement per chunk of 255 files, in file-list order.
    # NOTE(review): the chunk size is taken over unchanged -- presumably a
    # limit on the cms.vstring side; confirm before changing it.
    chunkSize = 255
    insertBlock = ''
    for start in range(0, len(fileNames), chunkSize):
        chunk = fileNames[start:start+chunkSize]
        insertBlock += ("readFiles.extend([\n "
                        + ",\n ".join("'" + entry + "'" for entry in chunk)
                        + "])\n")

    # Raw string with escaped dots and a literal opening parenthesis.
    marker = re.compile(r'readFiles\s*=\s*cms\.untracked\.vstring\s*\(')

    # Assemble the output in memory and write it once, instead of rewriting
    # the file in place for every chunk.
    pieces = []
    for line in body.splitlines(keepends=True):
        pieces.append(line)
        if marker.match(line):
            pieces.append(insertBlock)

    if skip_events != 0:
        pieces.append("process.source.skipEvents = cms.untracked.uint32({0:d})\n"
                      .format(skip_events))

    if max_events != 0:
        pieces.append("process.maxEvents = cms.untracked.PSet(input = "
                      "cms.untracked.int32({0:d}))\n".format(max_events))

    with open(outCfg, 'w') as OUTFILE:
        OUTFILE.write(''.join(pieces))
331+

Alignment/MillePedeAlignmentAlgorithm/scripts/mps_check.py

Lines changed: 51 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
batchExited = 0
3535
finished = 0
3636
endofjob = 0
37+
badInputFile = 0
3738
eofile = 1 # do not deal with timel yet
3839
timel = 0
3940
killed = 0
@@ -58,6 +59,11 @@
5859
quotaspace = 0
5960
copyerr=0
6061
ispede=0
62+
mismatchedInputFiles = 0
63+
missingBinaryfile = 0
64+
missingTreefile = 0
65+
missingMonitor = 0
66+
zeroEvents = 0
6167

6268
kill_reason = None
6369
pedeLogErrStr = ""
@@ -116,9 +122,15 @@
116122
if match:
117123
cpuFactor = 2.125
118124
cputime = int(round(int(match.group(1))/cpuFactor)) # match.group(1) is the matched digit
125+
if re.search(re.compile('Missing milleBinary',re.M), line):
126+
missingBinaryfile = 1
127+
if re.search(re.compile('Missing treeFile',re.M), line):
128+
missingTreefile = 1
129+
if re.search(re.compile('Missing Monitor',re.M), line):
130+
missingMonitor = 1
119131

120132
# gzip it afterwards:
121-
print('gzip -f '+stdOut)
133+
print('Checked '+stdOut)
122134
os.system('gzip -f '+stdOut)
123135
except IOError as e:
124136
if e.args == (2, "No such file or directory"):
@@ -132,7 +144,7 @@
132144
condor_log = subprocess.check_output(["condor_q", lib.JOBID[i],
133145
"-userlog", log_file,
134146
"-af",
135-
"RemoteSysCpu",
147+
"RemoteUserCpu",
136148
"JobStatus",
137149
"RemoveReason"],
138150
stderr = subprocess.STDOUT).decode()
@@ -186,6 +198,10 @@
186198
with open(eazeLog,'r') as INFILE:
187199
# scan records in input file
188200
for line in INFILE:
201+
# check whether any file could not be opened
202+
if re.search(re.compile('Failed to open the file',re.M), line):
203+
badInputFile = 1
204+
break
189205
# check if end of file has been reached
190206
if re.search(re.compile('\<StorageStatistics\>',re.M), line):
191207
eofile = 1
@@ -211,6 +227,10 @@
211227
# AP 07.09.2009 - Check that the job got to a normal end
212228
if re.search(re.compile('AlignmentProducerAsAnalyzer::endJob\(\)',re.M), line):
213229
endofjob = 1
230+
if re.search(re.compile('MismatchedInputFiles',re.M), line):
231+
mismatchedInputFiles = 1
232+
if re.search(re.compile('Did not process any events',re.M), line):
233+
zeroEvents = 1
214234
if re.search(re.compile('FwkReport -i main_input:sourc',re.M), line):
215235
array = line.split()
216236
nEvent = int(array[5])
@@ -221,6 +241,11 @@
221241
if nEvent==0 and re.search(re.compile('FwkReport -i AfterSource',re.M), line):
222242
array = line.split()
223243
nEvent = int(array[5])
244+
# RM 03.02.2023
245+
if nEvent==0:
246+
x = re.search(r"FwkReport -f AfterSource\s+(\d+)",line)
247+
if x:
248+
nEvent = int(x.group(1))
224249

225250
if logZipped == 'true':
226251
os.system('gzip -f '+eazeLog)
@@ -238,7 +263,6 @@
238263
#$mOutSize = `nsls -l $mssDir | grep $milleOut | head -1 | awk '{print \$5}'`;
239264
#$mOutSize = `cmsLs -l $mssDir | grep $milleOut | head -1 | awk '{print \$2}'`;
240265
mOutSize = 0
241-
#print(">>>eoslsoutput:", eoslsoutput, " \ttype(eoslsoutput):", type(eoslsoutput))
242266
for line in eoslsoutput:
243267
if milleOut in line:
244268
columns = line.split()
@@ -369,6 +393,10 @@
369393
farmhost = ' '
370394

371395
okStatus = 'OK'
396+
if badInputFile == 1:
397+
print(lib.JOBDIR[i],lib.JOBID[i],'had FileOpenError')
398+
okStatus = 'FAIL'
399+
remark = 'FileOpenError'
372400
if not eofile == 1:
373401
print(lib.JOBDIR[i],lib.JOBID[i],'did not reach end of file')
374402
okStatus = 'ABEND'
@@ -455,7 +483,26 @@
455483
remark = 'copy to eos failed'
456484
okStatus = 'FAIL'
457485

458-
486+
if missingBinaryfile == 1:
487+
print(lib.JOBDIR[i],lib.JOBID[i],'Missing binary file')
488+
remark = 'Missing binary file'
489+
okStatus = 'FAIL'
490+
if missingTreefile == 1:
491+
print(lib.JOBDIR[i],lib.JOBID[i],'Missing Treefile')
492+
remark = 'Missing Treefile'
493+
okStatus = 'FAIL'
494+
if missingMonitor == 1:
495+
print(lib.JOBDIR[i],lib.JOBID[i],'Missing Monitor')
496+
remark = 'Missing Monitor file'
497+
okStatus = 'FAIL'
498+
if zeroEvents == 1:
499+
print(lib.JOBDIR[i],lib.JOBID[i],'Did not process any events')
500+
remark = 'Did not process any events'
501+
okStatus = 'FAIL'
502+
if mismatchedInputFiles == 1:
503+
print(lib.JOBDIR[i],lib.JOBID[i],'MismatchedInputFiles')
504+
remark = 'MismatchedInputFiles'
505+
okStatus = 'FAIL'
459506
# print warning line to stdout
460507
if okStatus != "OK":
461508
print(lib.JOBDIR[i],lib.JOBID[i],' -------- ',okStatus)

Alignment/MillePedeAlignmentAlgorithm/scripts/mps_kill.pl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ BEGIN
3030

3131
# parse the arguments
3232
while (@ARGV) {
33-
$arg = shift(ARGV);
33+
$arg = shift(@ARGV);
3434
if ($arg =~ /\A-/) { # check for option
3535
if ($arg =~ "h") {
3636
$helpwanted = 1;

Alignment/MillePedeAlignmentAlgorithm/scripts/mps_retry.pl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ BEGIN
2626

2727
# parse the arguments
2828
while (@ARGV) {
29-
$arg = shift(ARGV);
29+
$arg = shift(@ARGV);
3030
if ($arg =~ /\A-/) { # check for option
3131
if ($arg =~ "h") {
3232
$helpwanted = 1;

Alignment/MillePedeAlignmentAlgorithm/scripts/mps_setup.py

Lines changed: 20 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
import Alignment.MillePedeAlignmentAlgorithm.mpslib.tools as mps_tools
1212
import Alignment.MillePedeAlignmentAlgorithm.mpslib.Mpslibclass as mpslib
1313

14-
1514
parser = argparse.ArgumentParser(description = "Setup local mps database")
1615
parser.add_argument("-m", "--setup-merge", dest = "setup_merge",
1716
action = "store_true", default = False,
@@ -157,28 +156,25 @@
157156
# Create the job directories
158157
nJobExist = 0
159158
if args.append and os.path.isdir("jobData"):
160-
# Append mode, and "jobData" exists
159+
# Append mode, and "jobData" exists. Find the highest existing job number
161160
jobs = os.listdir("jobData")
162-
job_regex = re.compile(r"job([0-9]{3})") # should we really restrict it to 3 digits?
163-
existing_jobs = [job_regex.search(item) for item in jobs]
164-
existing_jobs = [int(job.group(1)) for job in existing_jobs if job is not None]
165-
nJobExist = sorted(existing_jobs)[-1]
166-
167-
if nJobExist == 0 or nJobExist <=0 or nJobExist > 999: # quite rude method... -> enforce job number limit earlier?
168-
# Delete all
169-
mps_tools.remove_existing_object("jobData")
170-
os.makedirs("jobData")
171-
nJobExist = 0;
161+
# Collect the number of every "job<N>" entry found in jobData; entries that
# do not match the pattern are ignored.
job_regex = re.compile(r"job(\d+)") # can have any number of digits
existing_jobs_set = {int(found.group(1))
for found in map(job_regex.search, jobs) if found}
# default=0 keeps numbering at job001 when jobData exists but holds no
# job directories; a bare max() would raise ValueError on the empty set.
nJobExist = max(existing_jobs_set, default=0)
172170

173171
for j in range(1, args.n_jobs + 1):
174172
i = j+nJobExist
175173
jobdir = "job{0:03d}".format(i)
176-
print("jobdir", jobdir)
177174
os.makedirs(os.path.join("jobData", jobdir))
178175

179176
# build the absolute job directory path (needed by mps_script)
180177
theJobData = os.path.abspath("jobData")
181-
print("theJobData =", theJobData)
182178

183179
if args.append:
184180
# save current values
@@ -243,7 +239,7 @@
243239
cmd = ["mps_split.pl", args.input_file_list,
244240
str(j if args.max_events is None else 1),
245241
str(args.n_jobs if args.max_events is None else 1)]
246-
print(" ".join(cmd)+" > jobData/{}/theSplit".format(jobdir))
242+
#print(" ".join(cmd)+" > jobData/{}/theSplit".format(jobdir))
247243
with open("jobData/{}/theSplit".format(jobdir), "w") as f:
248244
try:
249245
subprocess.check_call(cmd, stdout = f)
@@ -253,19 +249,20 @@
253249
theIsn = "{0:03d}".format(i)
254250

255251
# create the cfg file
256-
cmd = ["mps_splice.py", args.config_template,
257-
"jobData/{}/theSplit".format(jobdir),
258-
"jobData/{}/the.py".format(jobdir), theIsn]
252+
skip_events = 0
253+
max_events = 0
259254
if args.max_events is not None:
260255
chunk_size = int(args.max_events/args.n_jobs)
261-
event_options = ["--skip-events", str(chunk_size*(j-1))]
256+
skip_events = chunk_size*(j-1)
262257
max_events = (args.max_events - (args.n_jobs-1)*chunk_size
263258
if j == args.n_jobs # last job gets the remaining events
264259
else chunk_size)
265-
event_options.extend(["--max-events", str(max_events)])
266-
cmd.extend(event_options)
267-
print(" ".join(cmd))
268-
mps_tools.run_checked(cmd)
260+
261+
# Forward the per-job event window computed above; without these two
# arguments mps_splice uses its defaults (0, 0) and writes neither
# skipEvents nor maxEvents, so every job would read the full input.
lib.mps_splice(args.config_template,
               "jobData/{}/theSplit".format(jobdir),
               "jobData/{}/the.py".format(jobdir),
               theIsn,
               skip_events=skip_events,
               max_events=max_events)
265+
269266

270267
# create the run script
271268
print("mps_script.pl {} jobData/{}/theScript.sh {}/{} the.py jobData/{}/theSplit {} {} {}".format(args.batch_script, jobdir, theJobData, jobdir, jobdir, theIsn, args.mss_dir, lib.mssDirPool))

0 commit comments

Comments
 (0)