Skip to content

Commit 8b0f36b

Browse files
committed
ENH: Large SGE jobs were overloading on qstat
When large jobs were run with SGE, the accumulation of qstat queries was causing a massive load on the cluster server, and was affecting overall system performance. This modification queries based on userid and stores information about all of that user's jobs (both running and recently finished); subsequent queries are then addressed by looking at the cached values first, then only updating with a system qstat call periodically. This change ensures that qstat is called on a more reasonable replication time frame. The user can supply an external version of qmake. This prevents DOS-style attacks on the batch processing server by preventing continuous queries by many jobs. This was affecting the performance of the entire server, and the excess load of querying when jobs were done in the naive way was affecting the performance of dispatching new jobs. The following two scripts can be used as plugin_args to provide cached versions of qmake suitable for running huge jobs. plugin_args=dict(template=JOB_SCRIPT, qsub_args="-S /bin/bash -cwd -pe smp 1-12 -l h_vmem=19G,mem_free=9G -o /dev/null -e /dev/null " + CLUSTER_QUEUE, qstatProgramPath=qstat_immediate.sh, qstatCachedProgramPath=qstat_cached.sh)) =qstat_cached.sh=================================================== \#!/bin/bash \# \author Hans J. 
Johnson \# This file provides a wrapper around qstat to ensure that \# the qstat server is not overloaded with too many requests \#debug_file=/dev/null debug_file=/tmp/TESTINGLOG qstat_cache=/tmp/qstat.xml echo "USING EXTERNAL QSTAT: $@" >> ${debug_file} 2>&1 older_than_60_sec=$( find $(dirname ${qstat_cache}) -maxdepth 1 -name $(basename ${qstat_cache}) -mmin $(echo 5/60 |bc -l) ) if [ -z "${older_than_60_sec}" ]; then DoQstatNow=$(dirname ${0})/qstat_immediate.sh ${DoQstatNow} $@ else echo "using cache $(date)" >> ${debug_file} 2>&1 cat ${qstat_cache} fi =================================================================== =qstat_immediate.sh=============================================== \#!/bin/bash \# \author Hans J. Johnson \# This file provides a wrapper around qstat to ensure that \# the qstat server is not overloaded with too many requests \#debug_file=/dev/null debug_file=/tmp/TESTINGLOG qstat_cache=/tmp/qstat.xml echo "USING EXTERNAL QSTAT: $@" >> ${debug_file} 2>&1 echo "Refreshing $(date)" >> ${debug_file} 2>&1 cacheUpdated=0; while [ ${cacheUpdated} -eq 0 ]; do if [ ! -f ${qstat_cache}_lock ]; then touch ${qstat_cache}_lock DoQstatNow=$(which qstat) ${DoQstatNow} $@ > ${qstat_cache}_tmp 2>&1 mv ${qstat_cache}_tmp ${qstat_cache} rm ${qstat_cache}_lock let cacheUpdated=1 else echo "Waiting for contention lock $(date)" >> ${debug_file} 2>&1 sleep 1 fi done cat ${qstat_cache} ===================================================================
1 parent 99240e0 commit 8b0f36b

File tree

2 files changed

+345
-31
lines changed

2 files changed

+345
-31
lines changed

nipype/pipeline/plugins/base.py

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -303,8 +303,8 @@ def _submit_mapnode(self, jobid):
303303
self.mapnodesubids[self.depidx.shape[0] + i] = jobid
304304
self.procs.extend(mapnodesubids)
305305
self.depidx = ssp.vstack((self.depidx,
306-
ssp.lil_matrix(np.zeros((numnodes,
307-
self.depidx.shape[1])))),
306+
ssp.lil_matrix(np.zeros(
307+
(numnodes, self.depidx.shape[1])))),
308308
'lil')
309309
self.depidx = ssp.hstack((self.depidx,
310310
ssp.lil_matrix(
@@ -349,16 +349,19 @@ def _send_procs_to_workers(self, updatehash=False, slots=None, graph=None):
349349
if self._status_callback:
350350
self._status_callback(self.procs[jobid], 'start')
351351
continue_with_submission = True
352-
if str2bool(self.procs[jobid].config['execution']['local_hash_check']):
352+
if str2bool(self.procs[jobid].config['execution']
353+
['local_hash_check']):
353354
logger.debug('checking hash locally')
354355
try:
355356
hash_exists, _, _, _ = self.procs[
356357
jobid].hash_exists()
357358
logger.debug('Hash exists %s' % str(hash_exists))
358359
if (hash_exists and
359-
(self.procs[jobid].overwrite == False or
360-
(self.procs[jobid].overwrite == None and
361-
not self.procs[jobid]._interface.always_run))):
360+
(self.procs[jobid].overwrite == False or
361+
(self.procs[jobid].overwrite == None and
362+
not self.procs[jobid]._interface.always_run)
363+
)
364+
):
362365
continue_with_submission = False
363366
self._task_finished_cb(jobid)
364367
self._remove_node_dirs()
@@ -436,7 +439,8 @@ def _remove_node_dirs(self):
436439
"""Removes directories whose outputs have already been used up
437440
"""
438441
if str2bool(self._config['execution']['remove_node_directories']):
439-
for idx in np.nonzero((self.refidx.sum(axis=1) == 0).__array__())[0]:
442+
for idx in np.nonzero(
443+
(self.refidx.sum(axis=1) == 0).__array__())[0]:
440444
if idx in self.mapnodesubids:
441445
continue
442446
if self.proc_done[idx] and (not self.proc_pending[idx]):
@@ -506,9 +510,13 @@ def _get_result(self, taskid):
506510
'traceback': None}
507511
results_file = None
508512
try:
509-
raise IOError(('Job (%s) finished or terminated, but results file '
510-
'does not exist. Batch dir contains crashdump '
511-
'file if node raised an exception' % node_dir))
513+
error_message = ('Job id ({0}) finished or terminated, but '
514+
'results file does not exist after ({1}) '
515+
'seconds. Batch dir contains crashdump file '
516+
'if node raised an exception.\n'
517+
'Node working directory: ({2}) '.format(
518+
taskid,timeout,node_dir) )
519+
raise IOError(error_message)
512520
except IOError, e:
513521
result_data['traceback'] = format_exc()
514522
else:
@@ -582,13 +590,16 @@ def _get_args(self, node, keywords):
582590
value = getattr(self, "_" + keyword)
583591
if keyword == "template" and os.path.isfile(value):
584592
value = open(value).read()
585-
if hasattr(node, "plugin_args") and isinstance(node.plugin_args, dict) and keyword in node.plugin_args:
586-
if keyword == "template" and os.path.isfile(node.plugin_args[keyword]):
593+
if hasattr(node, "plugin_args") and
594+
isinstance(node.plugin_args, dict) and keyword in node.plugin_args:
595+
if keyword == "template" and
596+
os.path.isfile(node.plugin_args[keyword]):
587597
tmp_value = open(node.plugin_args[keyword]).read()
588598
else:
589599
tmp_value = node.plugin_args[keyword]
590600

591-
if 'overwrite' in node.plugin_args and node.plugin_args['overwrite']:
601+
if 'overwrite' in node.plugin_args and
602+
node.plugin_args['overwrite']:
592603
value = tmp_value
593604
else:
594605
value += tmp_value

0 commit comments

Comments
 (0)