Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
backports.tempfile==1.0rc1
jsonschema==2.4.0
jsonpath-rw==1.4.0
jsonpath-rw-ext==1.0.0
Expand Down
19 changes: 19 additions & 0 deletions server/docker/nwchem/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
FROM ubuntu:16.04
MAINTAINER Chet Nieter <[email protected]>

#
# Run using the following command on the host:
# docker run --rm -v <path to dir with nwchem output file>:/hpccloud <image_name> <nwchem output file>
#

# Install git (to fetch the converter) and python (to run it), then remove
# the apt package lists in the same layer to keep the image small.
RUN apt-get -y update && apt-get install -y \
    git \
    python \
    && rm -rf /var/lib/apt/lists/*

# /hpccloud is the working directory; callers bind-mount the directory
# containing the NWChem output file here (see run command above).
RUN mkdir /hpccloud
WORKDIR /hpccloud

RUN git clone https://github.com/wadejong/NWChemOutputToJson.git /opt/NWChemOutputToJson

# The converter takes the output filename (relative to /hpccloud) as its
# argument and writes <name>.json alongside it.
ENTRYPOINT ["python", "/opt/NWChemOutputToJson/NWChemJsonConversion.py"]
49 changes: 46 additions & 3 deletions server/taskflows/hpccloud/taskflow/nwchem/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
###############################################################################
import json
import os
import subprocess
import shutil
from backports.tempfile import TemporaryDirectory
from jsonpath_rw import parse
from celery.exceptions import Retry

Expand Down Expand Up @@ -71,6 +74,7 @@ def start(self, *args, **kwargs):
submit.s() | \
submit_nwchem_job.s() | \
monitor_nwchem_job.s().set(queue='monitor') | \
create_json_output.s() | \
upload_output.s() )

super(NWChemTaskFlow, self).start(self, *args, **kwargs)
Expand All @@ -84,6 +88,7 @@ def create_geometry_symlink(task, job, cluster, fileName):
with get_connection(task.taskflow.girder_token, cluster) as conn:
conn.execute('ln -s %s %s' % (filePath, linkPath))


@cumulus.taskflow.task
def setup_input(task, *args, **kwargs):
input_folder_id = kwargs['input']['folder']['id']
Expand Down Expand Up @@ -131,11 +136,10 @@ def create_job(task, upstream_result):
task.taskflow.logger.info('Create NWChem job.')
input_folder_id = upstream_result['input']['folder']['id']

# TODO: setup command to run with mpi
body = {
'name': 'nwchem_run',
'commands': [
"mpiexec -n %s nwchem input/%s" % (
"mpiexec -n %s nwchem input/%s &> nwchem_run.out" % (
upstream_result['numberOfProcs'],
upstream_result['nwFilename'])
],
Expand Down Expand Up @@ -210,11 +214,50 @@ def monitor_nwchem_job(task, upstream_result):
task.throws=(Retry,),

job = upstream_result['job']
    # TODO - We are currently reaching in and using a 'private' function
_monitor_jobs(task, cluster, [job], girder_token=girder_token, monitor_interval=30)

return upstream_result

@cumulus.taskflow.task
def create_json_output(task, upstream_result):
    """Post-process the NWChem output file into JSON.

    Copies the job's ``<job name>.out`` file from the cluster into a local
    temporary directory, runs the NWChemOutputToJson Docker container over
    it, and copies the resulting ``.json`` file back into the job directory
    on the cluster so the downstream upload task picks it up.

    :param task: The current taskflow task (provides logger and girder token).
    :param upstream_result: Result dict from the upstream task; must contain
        'cluster' and 'job'.
    :returns: upstream_result, unchanged, for the next task in the chain.
    :raises Exception: If the Docker container exits with a non-zero code.
    """
    task.logger.info('Converting nwchem output to json format.')
    cluster = upstream_result['cluster']
    job = upstream_result['job']
    job_dir = job_directory(cluster, job)
    out_file = '%s.out' % (job['name'])

    with TemporaryDirectory() as tmp_dir:
        # Copy the nwchem output from the cluster to the server
        cluster_path = os.path.join(job_dir, out_file)
        local_path = os.path.join(tmp_dir, out_file)
        with open(local_path, 'w') as local_fp:
            with get_connection(task.taskflow.girder_token, cluster) as conn:
                with conn.get(cluster_path) as remote_fp:
                    local_fp.write(remote_fp.read())

        # Run docker container to post-process results.
        # TODO: the image name is hard coded here; it should come from
        # upstream_result so other images can be used.
        command = ['docker', 'run', '--rm', '-v', '%s:/hpccloud' % tmp_dir,
                   'chetnieter/nwchem-postprocess', out_file]
        p = subprocess.Popen(args=command, stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        stdout, stderr = p.communicate()

        if p.returncode != 0:
            task.logger.error('Error running Docker container.')
            task.logger.error('STDOUT: ' + stdout)
            task.logger.error('STDERR: ' + stderr)
            raise Exception('Docker returned code {}.'.format(p.returncode))

        # Copy the generated json file back to the cluster's job directory
        # so it is uploaded to Girder along with the rest of the output.
        cluster_path = os.path.join(job_dir, out_file + '.json')
        local_path = os.path.join(tmp_dir, out_file + '.json')
        with get_connection(task.taskflow.girder_token, cluster) as conn:
            with open(local_path, 'r') as local_fp:
                conn.put(local_fp, cluster_path)

    return upstream_result

@cumulus.taskflow.task
def upload_output(task, upstream_result):
task.taskflow.logger.info('Uploading results from cluster')
Expand Down