FROM ubuntu:16.04
MAINTAINER Chet Nieter

#
# Image whose entry point runs NWChemJsonConversion.py from the
# NWChemOutputToJson repository.
#
# Run using the following command, bind-mounting the directory that holds
# the NWChem output file at /hpccloud (the container's working directory):
#   docker run --rm -v <output directory>:/hpccloud <image> <output file>
#

# Remove the apt package lists in the same layer that created them so they
# do not end up in the image.
RUN apt-get -y update && apt-get install -y \
    git \
    python \
&& rm -rf /var/lib/apt/lists/*

RUN mkdir /hpccloud
WORKDIR /hpccloud

RUN git clone https://github.com/wadejong/NWChemOutputToJson.git /opt/NWChemOutputToJson

ENTRYPOINT ["python", "/opt/NWChemOutputToJson/NWChemJsonConversion.py"]
@cumulus.taskflow.task
def create_json_output(task, upstream_result):
    """Convert the NWChem output of a finished job to JSON.

    The job's output file (``<job name>.out``) is copied from the job
    directory on the cluster into a local temporary directory, the
    ``chetnieter/nwchem-postprocess`` Docker image is run with that
    directory mounted at ``/hpccloud``, and ``<job name>.out.json`` is then
    copied from the temporary directory back into the job directory on the
    cluster.

    NOTE(review): this assumes the container writes ``<output file>.json``
    next to its input inside the mounted directory - confirm against the
    converter script.

    :param task: The taskflow task; provides the logger and the Girder token
        used to open connections to the cluster.
    :param upstream_result: Result of the previous task in the chain; must
        contain the ``cluster`` and ``job`` entries.
    :returns: ``upstream_result``, unchanged, for the next task in the chain.
    :raises Exception: If the Docker container exits with a non-zero code.
    """
    task.logger.info('Converting nwchem output to json format.')
    cluster = upstream_result['cluster']
    job = upstream_result['job']
    job_dir = job_directory(cluster, job)
    out_file = '%s.out' % (job['name'])

    with TemporaryDirectory() as tmp_dir:
        # Copy the nwchem output to server
        cluster_path = os.path.join(job_dir, out_file)
        local_path = os.path.join(tmp_dir, out_file)
        with open(local_path, 'w') as local_fp:
            with get_connection(task.taskflow.girder_token, cluster) as conn:
                with conn.get(cluster_path) as remote_fp:
                    local_fp.write(remote_fp.read())

        # Run docker container to post-process results - need to add docker
        # image to upstream_result
        command = ['docker', 'run', '--rm', '-v', '%s:/hpccloud' % tmp_dir,
                   'chetnieter/nwchem-postprocess', out_file]
        p = subprocess.Popen(args=command, stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        stdout, stderr = p.communicate()

        if p.returncode != 0:
            task.logger.error('Error running Docker container.')
            # Format rather than concatenate: the pipe contents may be bytes,
            # and a TypeError here would hide the actual Docker failure.
            task.logger.error('STDOUT: %s' % stdout)
            task.logger.error('STDERR: %s' % stderr)
            raise Exception('Docker returned code {}.'.format(p.returncode))

        # Copy json file back to cluster?
        cluster_path = os.path.join(job_dir, out_file + '.json')
        local_path = os.path.join(tmp_dir, out_file + '.json')
        with get_connection(task.taskflow.girder_token, cluster) as conn:
            with open(local_path, 'r') as local_fp:
                conn.put(local_fp, cluster_path)

    return upstream_result