Skip to content

Commit 9f0709f

Browse files
committed
updated pmonitor, minor corrections
1 parent db66bf3 commit 9f0709f

File tree

4 files changed

+50
-19
lines changed

4 files changed

+50
-19
lines changed

cems/src/main/python/jasmin/jasmin_job_monitor.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ def run(self, pm_request):
3737

3838
return 0
3939
except (RuntimeError, ValueError) as e:
40-
sys.stderr.write(e)
40+
sys.stderr.write(str(e))
4141
return 1
4242

4343
@staticmethod
@@ -220,7 +220,8 @@ def parse_jobs_call(self, message):
220220
def _extract_id_and_status(self, line):
221221
tokens = line.split()
222222

223-
if len(tokens) < 8:
223+
# seven tokens are used for canceled jobs, 8 is standard tb 2022-09-21
224+
if len(tokens) < 7:
224225
raise ValueError("unable to handle 'squeue' result: " + line)
225226

226227
status_code = self._status_to_enum(tokens[4])

cems/src/main/python/pmonitor.py

Lines changed: 43 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
__author__ = "Martin Böttcher, Brockmann Consult GmbH"
77
__copyright__ = "Copyright 2016, Brockmann Consult GmbH"
88
__license__ = "For use with Calvalus processing systems"
9-
__version__ = "1.9"
9+
__version__ = "1.12"
1010
__email__ = "[email protected]"
1111
__status__ = "Production"
1212

@@ -30,6 +30,12 @@
3030
# fix simulation in case of async request submission
3131
# changes in 1.9
3232
# use utf-8 encoding in communication with sub-processes
33+
# changes in 1.10
34+
# include polling and retry pending requests also in wait_for_idle for ingestion of offline products from DHuS
35+
# changes in 1.11
36+
# synchronize access to _running for combinations of RMonitor with async steps
37+
# changes in 1.12
38+
# handle cases of duplicate status inquiry in case of slow backend and drop second event instead of raising an exception
3339

3440
import glob
3541
import os
@@ -182,7 +188,7 @@ def __init__(self, inputs, request='', hosts=[('localhost',4)], types=[], weight
182188
cache: directory where working directories are created for steps, defaults to None for no working dir creation
183189
logdir: directory for step log files
184190
simulation: switch to skip actual execution of step, defaults to False
185-
delay: milliseconds to wait after step scheduling to avoid races between concurrent steps, defaults to None for no waiting
191+
delay: seconds to wait after step scheduling to avoid races between concurrent steps, defaults to None for no waiting, can be fraction, 0.01
186192
fair: handles steps in sequence of their generation (and avoids looking through complete list of steps), defaults to True
187193
script: generic step execution script to be used as prefix of each command line, defaults to None for the different steps being executable scripts themselves
188194
polling: generic status polling script called with a list of external request identifiers to inquire the status in form of CSV <id>,<state>[,<msg>]\n
@@ -339,6 +345,11 @@ def wait_for_idle(self, calls):
339345
if not self._backlog_condition:
340346
self._backlog_condition = threading.Condition()
341347
time.sleep(1)
348+
first_iteration = True
349+
if self._polling or self._retrycode:
350+
timeout = self._period
351+
else:
352+
timeout = None
342353
while True:
343354
#print '... mutex 3 acquiring'
344355
with self._mutex:
@@ -358,12 +369,17 @@ def wait_for_idle(self, calls):
358369
if not in_backlog:
359370
#print "no more backlog, return from waiting"
360371
break;
372+
if not first_iteration and self._polling:
373+
self._inquire_status()
374+
elif self._retrycode and self._retry_timestamp and datetime.datetime.now() >= self._retry_timestamp:
375+
self._retry_pending()
361376
#print "waiting for " + in_backlog
362377
#print '... mutex 3 released'
363378
with self._backlog_condition:
364379
#print '... backlog condition wait'
365-
self._backlog_condition.wait()
380+
self._backlog_condition.wait(timeout)
366381
#print '... backlog condition resume'
382+
first_iteration = False
367383

368384

369385
def current_load(self, calls):
@@ -473,9 +489,9 @@ def _process_step(self, call, task_id, parameters, inputs, outputs, host, log_pr
473489
self._observe_step(call, inputs, outputs, parameters, 0)
474490
elif command in self._running and isinstance(self._running[command], str) and not self._simulation:
475491
# async running request, mark as to be inquired
476-
self._running[command] = PMonitor.Args(call, task_id, parameters, inputs, outputs, host, log_prefix, async_, external_id=self._running[command])
477-
sys.__stdout__.write('reconsidering {0}\n'.format(command))
478492
with self._mutex:
493+
self._running[command] = PMonitor.Args(call, task_id, parameters, inputs, outputs, host, log_prefix, async_, external_id=self._running[command])
494+
sys.__stdout__.write('reconsidering {0}\n'.format(command))
479495
self._release_constraint(call, host, hostOnly=True)
480496
self._check_for_mature_tasks()
481497
self._write_status()
@@ -488,20 +504,20 @@ def _process_step(self, call, task_id, parameters, inputs, outputs, host, log_pr
488504
code = 0
489505
if async_ and code == 0 and not self._simulation:
490506
# async submission succeeded, memorise external ID
491-
self._running[command] = PMonitor.Args(call, task_id, parameters, inputs, outputs, host, log_prefix, async_, external_id=output_paths[0])
492507
with self._mutex:
508+
self._running[command] = PMonitor.Args(call, task_id, parameters, inputs, outputs, host, log_prefix, async_, external_id=output_paths[0])
493509
self._release_constraint(call, host, hostOnly=True)
494510
self._check_for_mature_tasks()
495511
elif async_ and not self._simulation:
496512
# async submission failed, mark as pending
497-
self._running[command] = PMonitor.Args(call, task_id, parameters, inputs, outputs, host, log_prefix, async_, external_id='pending')
498513
with self._mutex:
514+
self._running[command] = PMonitor.Args(call, task_id, parameters, inputs, outputs, host, log_prefix, async_, external_id='pending')
499515
self._release_constraint(call, host, hostOnly=True)
500516
self._check_for_mature_tasks()
501517
elif code == self._retrycode:
502518
# sync execution temporarily failed, mark as pending
503-
self._running[command] = PMonitor.Args(call, task_id, parameters, inputs, outputs, host, log_prefix, async_, external_id='pending')
504519
with self._mutex:
520+
self._running[command] = PMonitor.Args(call, task_id, parameters, inputs, outputs, host, log_prefix, async_, external_id='pending')
505521
if not self._retry_timestamp:
506522
self._retry_timestamp = datetime.datetime.now() + datetime.timedelta(seconds=self._period)
507523
self._release_constraint(call, host, hostOnly=True)
@@ -590,7 +606,7 @@ def _inquire_status(self):
590606
inquiry = {}
591607
# retrieve status with polling script
592608
if external_ids:
593-
# print('retrieving status with ' + self._polling + ' ' + ' '.join(external_ids))
609+
print('retrieving status with ' + self._polling + ' ' + ' '.join(external_ids))
594610
process = subprocess.Popen(self._polling + ' ' + ' '.join(external_ids),
595611
shell=True, bufsize=1, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
596612
for l in process.stdout:
@@ -620,13 +636,25 @@ def _inquire_status(self):
620636
wd = self._cache + '/' + self._request + '/' + '{0:04d}'.format(args.task_id)
621637
if 'cache' in wd:
622638
subprocess.call(['rm', '-rf', wd])
623-
self._finalise_step(args.call, 0, command, args.host, [], args.outputs, typeOnly=True)
624-
self._observe_step(args.call, args.inputs, args.outputs, args.parameters, 0)
625-
some_final_steps = True
639+
try:
640+
self._finalise_step(args.call, 0, command, args.host, [], args.outputs, typeOnly=True)
641+
self._observe_step(args.call, args.inputs, args.outputs, args.parameters, 0)
642+
some_final_steps = True
643+
except KeyError as e:
644+
if command in self._running:
645+
raise e
646+
print('status of ' + command + ' handled before. ignoring')
647+
return
626648
elif status == PMonitor.State.FAILED or status == PMonitor.State.CANCELLED:
627-
self._finalise_step(args.call, 1, command, args.host, [], args.outputs, typeOnly=True)
628-
self._observe_step(args.call, args.inputs, args.outputs, args.parameters, 1)
629-
some_final_steps = True
649+
try:
650+
self._finalise_step(args.call, 1, command, args.host, [], args.outputs, typeOnly=True)
651+
self._observe_step(args.call, args.inputs, args.outputs, args.parameters, 1)
652+
some_final_steps = True
653+
except KeyError as e:
654+
if command in self._running:
655+
raise e
656+
print('status of ' + command + ' handled before. ignoring')
657+
return
630658
elif status == PMonitor.State.DROPPED or status == PMonitor.State.NOT_FOUND or status == PMonitor.State._LOST:
631659
with self._mutex:
632660
# release and re-schedule request

cems/src/main/python/workflow.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ def _get_sensor_pairs(self):
158158
try:
159159
sensor_pair = SensorPair(p, p, self.get_production_period())
160160
sensor_pairs.add(sensor_pair)
161-
except exceptions.ValueError:
161+
except ValueError:
162162
pass
163163
return sorted(list(sensor_pairs), reverse=True)
164164

@@ -203,7 +203,7 @@ def _add_inp_preconditions(self, preconditions):
203203
sensors = self._get_primary_sensors()
204204
for sensor in sensors:
205205
sensor_period = sensor.get_period()
206-
date = sensor_period.get_start_date()- datetime.timedelta(days=1)
206+
date = sensor_period.get_start_date() - datetime.timedelta(days=1)
207207
while date < sensor_period.get_end_date():
208208
chunk = self._get_next_period(date)
209209
if chunk is None:

core/src/main/java/com/bc/fiduceo/db/AbstractDriver.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,8 @@ Integer getSensorId(String sensorName) throws SQLException {
152152
final Statement statement = connection.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_UPDATABLE);
153153
final ResultSet resultSet = statement.executeQuery("SELECT ID FROM SENSOR WHERE NAME = '" + sensorName + "'");
154154

155+
connection.commit();
156+
155157
if (resultSet.first()) {
156158
return resultSet.getInt("ID");
157159
} else {

0 commit comments

Comments
 (0)