diff --git a/dist/clusters-presets.json b/dist/clusters-presets.json
index 43142fba..2e4bd53c 100644
--- a/dist/clusters-presets.json
+++ b/dist/clusters-presets.json
@@ -12,6 +12,7 @@
         "numberOfSlots": 32
       }
     },
+    "config.sparkmpi.sparkPath": "/data/kitware/SparkMPI",
     "config.paraview.installDir": "/data/kitware/opt/paraview",
     "config.pyfr.cuda": true,
     "config.pyfr.opencl": [],
@@ -38,6 +39,7 @@
         "numberOfSlots": 1
       }
     },
+    "config.sparkmpi.sparkPath": "",
     "config.paraview.installDir": "/opt/paraview",
     "config.pyfr.cuda": true,
     "config.pyfr.opencl": [],
@@ -65,6 +67,7 @@
         "numberOfSlots": 1
      }
     },
+    "config.sparkmpi.sparkPath": "",
     "config.paraview.installDir": "/opt/paraview",
     "config.pyfr.cuda": false,
     "config.pyfr.opencl": [],
diff --git a/samples/spark-mpi/README.md b/samples/spark-mpi/README.md
new file mode 100644
index 00000000..adbf1fe9
--- /dev/null
+++ b/samples/spark-mpi/README.md
@@ -0,0 +1,17 @@
+# Spark-MPI Workflow
+
+This workflow is currently a work in progress. This folder contains three project files for a demo:
+
+- `pvw-spark.py` - Spark script
+- `start.sh` - Start script
+- `TiltSeries_NanoParticle_doi.tif` - Input
+
+These files are based on [Kitware/spark-mpi-experimentation/14-spark-pipeline](https://github.com/Kitware/spark-mpi-experimentation/tree/master/experimentations/14-spark-pipeline).
+
+## ToDo
+
+Figure out how to get the Spark script to communicate back to HPCCloud. The main need is to trigger workflow status changes so the client knows when it can connect to the target machine for visualization.
+
+## Warning
+
+At the time of writing, the only machine this demo runs on is "Beast", a machine at KSW. To give an idea of the resources this workflow requires: Beast has about 64GB of RAM, and starting the workflow with the wrong configuration can make it run out of memory.
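+
+## Pipeline overview
+
+For reference, the sketch below shows the overall shape of the Spark pipeline that `pvw-spark.py` builds. The stage bodies are hypothetical stubs rather than the real reconstruction, thresholding, or visualization code; only the chaining of `partitionBy`, `mapPartitionsWithIndex`, and `collect` mirrors the actual script.
+
+```python
+# Pipeline shape only; in pvw-spark.py the stages do ART reconstruction and
+# MPI/ParaView visualization. The stage bodies here are placeholders.
+from pyspark import SparkContext
+
+N_SPARK = 4  # stand-in for SPARK_SIZE
+N_MPI = 2    # stand-in for MPI_SIZE
+
+def spark_partitioner(index):
+    return index % N_SPARK
+
+def mpi_partitioner(index):
+    return index % N_MPI
+
+def reconstruct(partition_id, iterator):
+    # placeholder for the per-partition reconstruction stage
+    for index, value in iterator:
+        yield (index, value * 2.0)
+
+def threshold(kv):
+    index, value = kv
+    return (index, 0.0 if value < 2.0 else value)
+
+def visualization(partition_id, iterator):
+    # placeholder for the MPI/ParaView stage; just counts its elements
+    yield (partition_id, sum(1 for _ in iterator))
+
+sc = SparkContext()
+data = sc.parallelize([float(i) for i in range(100)], N_SPARK)
+rdd = data.zipWithIndex().map(lambda kv: (kv[1], kv[0]))  # key by global index
+
+result = (rdd.partitionBy(N_SPARK, spark_partitioner)
+             .mapPartitionsWithIndex(reconstruct)
+             .map(threshold)
+             .partitionBy(N_MPI, mpi_partitioner)
+             .mapPartitionsWithIndex(visualization)
+             .collect())
+print(result)
+```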
\ No newline at end of file diff --git a/samples/spark-mpi/TiltSeries_NanoParticle_doi.tif b/samples/spark-mpi/TiltSeries_NanoParticle_doi.tif new file mode 100644 index 00000000..d99e4a81 Binary files /dev/null and b/samples/spark-mpi/TiltSeries_NanoParticle_doi.tif differ diff --git a/samples/spark-mpi/pvw-spark.py b/samples/spark-mpi/pvw-spark.py new file mode 100644 index 00000000..256dc815 --- /dev/null +++ b/samples/spark-mpi/pvw-spark.py @@ -0,0 +1,587 @@ +# based on https://github.com/Kitware/spark-mpi-experimentation/blob/master/experimentations/14-spark-pipeline/pvw-spark.py +from __future__ import print_function +import os +import sys +import time +from datetime import datetime +import pyspark + +from pyspark import SparkContext + +from paraview import simple +import vtk +import numpy as np +import scipy.sparse as ss +from vtk.numpy_interface import dataset_adapter as dsa +from vtk.util import numpy_support +from vtk.vtkCommonCore import vtkIntArray + +sc = SparkContext() + +# ------------------------------------------------------------------------- +# MPI configuration +# ------------------------------------------------------------------------- + +hostname = os.uname()[1] +hydra_proxy_port = os.getenv("HYDRA_PROXY_PORT") +pmi_port = hostname + ":" + hydra_proxy_port + +# ------------------------------------------------------------------------- +# Parallel configuration +# ------------------------------------------------------------------------- + +nbSparkPartition = int(os.environ["SPARK_SIZE"]) +nbMPIPartition = int(os.environ["MPI_SIZE"]) + +# ------------------------------------------------------------------------- +# Read Tiff file +# ------------------------------------------------------------------------- + +t0 = time.time() +print('### Start execution - %s' % str(datetime.now())) + +filePath = '%s/data-convert/TiltSeries_NanoParticle_doi_10.1021-nl103400a' + \ + '.tif' % os.environ["SPARK_MPI_PATH"] +reader = simple.TIFFSeriesReader(FileNames=[filePath]) +reader.UpdatePipeline() +imageData = reader.GetClientSideObject().GetOutputDataObject(0) +dataArray = imageData.GetPointData().GetScalars() +npArray = np.arange(dataArray.GetNumberOfTuples(), dtype=float) +for idx in range(dataArray.GetNumberOfTuples()): + npArray[idx] = dataArray.GetTuple1(idx) + +t1 = time.time() +print('### Tiff read - %s' % str(t1 - t0)) +t0 = t1 + +# ------------------------------------------------------------------------- +# Metadata extraction +# ------------------------------------------------------------------------- + +originalDimensions = imageData.GetDimensions() +sizeX = originalDimensions[0] +sizeY = originalDimensions[1] +sizeZ = originalDimensions[2] +sliceSize = sizeX * sizeY +globalMaxIndex = sizeX * sizeY * sizeZ + +print('Dimensions: %d x %d x %d' % (sizeX, sizeY, sizeZ)) + +def _ijk(index): + return [ + (index % sizeX), + (index / sizeX % sizeY), + int(index / sliceSize), + ] + +# ------------------------------------------------------------------------- +# Partition handling +# ------------------------------------------------------------------------- + +sparkStepX = int(sizeX / nbSparkPartition) +sparkStepX_ = sizeX % nbSparkPartition + +mpiStepX = int(sizeX / nbMPIPartition) +mpiStepX_ = sizeX % nbMPIPartition + +# ------------------------------------------------------------------------- + +def getSparkSizeX(partitionId): + if partitionId < sparkStepX_: + return sparkStepX + 1 + return sparkStepX + +def getSparkPartition(index): + x = index % sizeX + for i in 
range(nbSparkPartition): + chunkSize = getSparkSizeX(i) + if x < chunkSize: + return i + x -= chunkSize + print('Error invalid mpi partition %d %d %d' % (index, sizeX, nbSparkPartition)) + return nbSparkPartition + +# ------------------------------------------------------------------------- + +def getMPISizeX(partitionId): + if partitionId < mpiStepX_: + return mpiStepX + 1 + return mpiStepX + +def getMPIPartition(index): + x = index % sizeX + for i in range(nbMPIPartition): + chunkSize = getMPISizeX(i) + if x < chunkSize: + return i + x -= chunkSize + print('Error invalid mpi partition %d %d %d' % (index, sizeX, nbMPIPartition)) + return nbMPIPartition + +# ----------------------------------------------------------------------------- +# Helper: ParaViewWeb options +# ----------------------------------------------------------------------------- + +class Options(object): + debug = False + nosignalhandlers = True + host = 'localhost' + port = 9753 + timeout = 300 + content = '%s/runtime/visualizer/dist' % (os.environ["SPARK_MPI_PATH"]) + forceFlush = False + sslKey = '' + sslCert = '' + ws = 'ws' + lp = 'lp' + hp = 'hp' + nows = False + nobws = False + nolp = False + fsEndpoints = '' + uploadPath = None + testScriptPath = '' + baselineImgDir = '' + useBrowser = 'nobrowser' + tmpDirectory = '.' + testImgFile = '' + +# ----------------------------------------------------------------------------- +# Helper: Image reconstruction +# ----------------------------------------------------------------------------- + +def parallelRay(Nside, pixelWidth, angles, Nray, rayWidth): + # Suppress warning messages that pops up when dividing zeros + np.seterr(all='ignore') + Nproj = len(angles) # Number of projections + + # Ray coordinates at 0 degrees. + offsets = np.linspace(-(Nray * 1.0 - 1) / 2, + (Nray * 1.0 - 1) / 2, Nray) * rayWidth + # Intersection lines/grid Coordinates + xgrid = np.linspace(-Nside * 0.5, Nside * 0.5, Nside + 1) * pixelWidth + ygrid = np.linspace(-Nside * 0.5, Nside * 0.5, Nside + 1) * pixelWidth + # Initialize vectors that contain matrix elements and corresponding + # row/column numbers + rows = np.zeros(2 * Nside * Nproj * Nray) + cols = np.zeros(2 * Nside * Nproj * Nray) + vals = np.zeros(2 * Nside * Nproj * Nray) + idxend = 0 + + for i in range(0, Nproj): # Loop over projection angles + ang = angles[i] * np.pi / 180. 
+ # Points passed by rays at current angles + xrayRotated = np.cos(ang) * offsets + yrayRotated = np.sin(ang) * offsets + xrayRotated[np.abs(xrayRotated) < 1e-8] = 0 + yrayRotated[np.abs(yrayRotated) < 1e-8] = 0 + + a = -np.sin(ang) + a = rmepsilon(a) + b = np.cos(ang) + b = rmepsilon(b) + + for j in range(0, Nray): # Loop rays in current projection + #Ray: y = tx * x + intercept + t_xgrid = (xgrid - xrayRotated[j]) / a + y_xgrid = b * t_xgrid + yrayRotated[j] + + t_ygrid = (ygrid - yrayRotated[j]) / b + x_ygrid = a * t_ygrid + xrayRotated[j] + # Collect all points + t_grid = np.append(t_xgrid, t_ygrid) + xx = np.append(xgrid, x_ygrid) + yy = np.append(y_xgrid, ygrid) + # Sort the coordinates according to intersection time + I = np.argsort(t_grid) + xx = xx[I] + yy = yy[I] + + # Get rid of points that are outside the image grid + Ix = np.logical_and(xx >= -Nside / 2.0 * pixelWidth, + xx <= Nside / 2.0 * pixelWidth) + Iy = np.logical_and(yy >= -Nside / 2.0 * pixelWidth, + yy <= Nside / 2.0 * pixelWidth) + I = np.logical_and(Ix, Iy) + xx = xx[I] + yy = yy[I] + + # If the ray pass through the image grid + if (xx.size != 0 and yy.size != 0): + # Get rid of double counted points + I = np.logical_and(np.abs(np.diff(xx)) <= + 1e-8, np.abs(np.diff(yy)) <= 1e-8) + I2 = np.zeros(I.size + 1) + I2[0:-1] = I + xx = xx[np.logical_not(I2)] + yy = yy[np.logical_not(I2)] + + # Calculate the length within the cell + length = np.sqrt(np.diff(xx)**2 + np.diff(yy)**2) + #Count number of cells the ray passes through + numvals = length.size + + # Remove the rays that are on the boundary of the box in the + # top or to the right of the image grid + check1 = np.logical_and(b == 0, np.absolute( + yrayRotated[j] - Nside / 2 * pixelWidth) < 1e-15) + check2 = np.logical_and(a == 0, np.absolute( + xrayRotated[j] - Nside / 2 * pixelWidth) < 1e-15) + check = np.logical_not(np.logical_or(check1, check2)) + + if np.logical_and(numvals > 0, check): + # Calculate corresponding indices in measurement matrix + # First, calculate the mid points coord. between two + # adjacent grid points + midpoints_x = rmepsilon(0.5 * (xx[0:-1] + xx[1:])) + midpoints_y = rmepsilon(0.5 * (yy[0:-1] + yy[1:])) + #Calculate the pixel index for mid points + pixelIndicex = \ + (np.floor(Nside / 2.0 - midpoints_y / pixelWidth)) * \ + Nside + (np.floor(midpoints_x / + pixelWidth + Nside / 2.0)) + # Create the indices to store the values to the measurement + # matrix + idxstart = idxend + idxend = idxstart + numvals + idx = np.arange(idxstart, idxend) + # Store row numbers, column numbers and values + rows[idx] = i * Nray + j + cols[idx] = pixelIndicex + vals[idx] = length + else: + print("Ray No. %d at %f degree is out of image grid!" % + (j + 1, angles[i])) + + # Truncate excess zeros. 
+ rows = rows[:idxend] + cols = cols[:idxend] + vals = vals[:idxend] + A = ss.coo_matrix((vals, (rows, cols)), shape=(Nray * Nproj, Nside**2)) + return A + +# ----------------------------------------------------------------------------- + +def rmepsilon(input): + if (input.size > 1): + input[np.abs(input) < 1e-10] = 0 + else: + if np.abs(input) < 1e-10: + input = 0 + return input + +# ------------------------------------------------------------------------- +# Spark reconstruction +# ------------------------------------------------------------------------- + +def reconstruct(partitionId, iterator): + # Extract iOffset + iOffset = 0 + for i in range(partitionId): + iOffset += getSparkSizeX(i) + + # Copy data from iterator into data chunk + t0 = time.time() + + dataChunk = np.empty([getSparkSizeX(partitionId), sizeY, sizeZ], dtype=float, order='F') + for item in iterator: + globalIndex = item[0] + pixelValue = item[1] + ijk = _ijk(globalIndex) + ijk[0] -= iOffset + dataChunk[ijk[0]][ijk[1]][ijk[2]] = pixelValue + + t1 = time.time() + print('%d # Gather %s | ' % (partitionId, str(t1 - t0))) + t0 = t1 + + # Do reconstruction + tiltSeries = dataChunk + tiltAngles = range(-sizeZ + 1, sizeZ, 2) # Delta angle of 2 + (Nslice, Nray, Nproj) = tiltSeries.shape + Niter = 1 + + A = parallelRay(Nray, 1.0, tiltAngles, Nray, 1.0) # A is a sparse matrix + recon = np.empty([Nslice, Nray, Nray], dtype=float, order='F') + + A = A.todense() + + (Nrow, Ncol) = A.shape + rowInnerProduct = np.zeros(Nrow) + row = np.zeros(Ncol) + f = np.zeros(Ncol) # Placeholder for 2d image + beta = 1.0 + + # Calculate row inner product + for j in range(Nrow): + row[:] = A[j, ].copy() + rowInnerProduct[j] = np.dot(row, row) + + for s in range(Nslice): + f[:] = 0 + b = tiltSeries[s, :, :].transpose().flatten() + for i in range(Niter): + for j in range(Nrow): + row[:] = A[j, ].copy() + row_f_product = np.dot(row, f) + a = (b[j] - row_f_product) / rowInnerProduct[j] + f = f + row * a * beta + + recon[s, :, :] = f.reshape((Nray, Nray)) + + (iSize, jSize, kSize) = recon.shape + + t1 = time.time() + print('%d # Reconstruction %s | ' % (partitionId, str(t1 - t0))) + t0 = t1 + + for k in range(kSize): + for j in range(jSize): + for i in range(iSize): + gIdx = i + iOffset + (j * sizeX) + (k * sizeX * sizeY) + yield (gIdx, recon[i][j][k]) + +# ------------------------------------------------------------------------- +# Spark thresholding +# ------------------------------------------------------------------------- + +def threshold(value): + if value[1] < 2.0: + return (value[0], 0.0) + return (value[0], value[1]) + +# ------------------------------------------------------------------------- +# MPI data exchange + visualization +# ------------------------------------------------------------------------- + +def visualization(partitionId, iterator): + # Setup MPI context + import os + os.environ["PMI_PORT"] = pmi_port + os.environ["PMI_ID"] = str(partitionId) + os.environ["PV_ALLOW_BATCH_INTERACTION"] = "1" + os.environ["DISPLAY"] = ":0" + + # Extract iOffset + iOffset = 0 + localSizeX = getMPISizeX(partitionId) + size = localSizeX * sizeY * sizeY + for i in range(partitionId): + iOffset += getMPISizeX(i) + + # Copy data from iterator into data chunk + t0 = time.time() + count = 0 + dataChunk = np.arange(size, dtype=float) + for item in iterator: + count += 1 + globalIndex = item[0] + pixelValue = item[1] + # print('%d # %d: %f' % (partitionId, globalIndex, pixelValue)) + ijk = _ijk(globalIndex) + ijk[0] -= iOffset + destIdx = ijk[0] + (ijk[1] 
* localSizeX) + (ijk[2] * localSizeX * sizeY) + dataChunk[destIdx] = pixelValue + + t1 = time.time() + print('%d # MPI Gather %s | %d' % (partitionId, str(t1 - t0), count)) + t0 = t1 + + # Configure Paraview for MPI + import paraview + paraview.options.batch = True + + from vtk.vtkPVVTKExtensionsCore import vtkDistributedTrivialProducer + from vtk.vtkCommonCore import vtkIntArray, vtkUnsignedCharArray, vtkFloatArray + from vtk.vtkCommonDataModel import vtkImageData, vtkPointData + + # ------------------------------------------------------------------------- + # Data access helper + # ------------------------------------------------------------------------- + + def createSlice(): + size = sizeY * sizeY + array = np.arange(size, dtype=float) + return array + + def getSideSlice(offset, xSize): + size = sizeY * sizeY + slice = np.arange(size, dtype=float) + + for i in range(size): + slice[i] = dataChunk[int(offset + (i * xSize))] + return slice + + # ------------------------------------------------------------------------- + # Add ghost points from neighbors + # ------------------------------------------------------------------------- + + from mpi4py import MPI + comm = MPI.COMM_WORLD + remoteLowerSlice = None + remoteUpperSlice = None + + if partitionId + 1 < nbMPIPartition: + # Share upper slice + remoteUpperSlice = createSlice() + localUpperSlice = getSideSlice(localSizeX - 1, localSizeX) + comm.Sendrecv(localUpperSlice, (partitionId+1), (2*partitionId + 1), remoteUpperSlice, (partitionId+1), (2*partitionId)) + if partitionId > 0: + # Share lower slice + remoteLowerSlice = createSlice() + localLowerSlice = getSideSlice(0, localSizeX) + comm.Sendrecv(localLowerSlice, (partitionId-1), (2 * (partitionId - 1)), remoteLowerSlice, (partitionId-1), (2 * (partitionId - 1) + 1)) + + t1 = time.time() + print('%d # MPI share %s | ' % (partitionId, str(t1 - t0))) + t0 = t1 + + # ------------------------------------------------------------------------- + + dataset = vtkImageData() + minX = 0 + maxX = 0 + for i in range(partitionId + 1): + minX = maxX + maxX += getMPISizeX(i) + + # ------------------------------------------------------------------------- + # Add slice(s) to data + # ------------------------------------------------------------------------- + + arrayWithSlices = vtkFloatArray() + arrayWithSlices.SetName('Scalars') + if remoteLowerSlice != None and remoteUpperSlice != None: + # Add both slices + minX -= 1 + maxX += 1 + localSizeX = maxX - minX + newSize = localSizeX * sizeY * sizeY + arrayWithSlices.SetNumberOfTuples(newSize) + localOffset = 0 + for i in range(newSize): + if i % localSizeX == 0: + arrayWithSlices.SetTuple1(i, remoteLowerSlice[i / localSizeX]) + elif (i + 1) % localSizeX == 0: + arrayWithSlices.SetTuple1(i, remoteUpperSlice[((i + 1) / localSizeX) - 1]) + else: + arrayWithSlices.SetTuple1(i, dataChunk[localOffset]) + localOffset += 1 + + elif remoteLowerSlice != None: + # Add lower slice + minX -= 1 + localSizeX = maxX - minX + newSize = localSizeX * sizeY * sizeY + arrayWithSlices.SetNumberOfTuples(newSize) + localOffset = 0 + for i in range(newSize): + if i % localSizeX == 0: + arrayWithSlices.SetTuple1(i, remoteLowerSlice[i / localSizeX]) + else: + arrayWithSlices.SetTuple1(i, dataChunk[localOffset]) + localOffset += 1 + elif remoteUpperSlice != None: + # Add upper slice + maxX += 1 + localSizeX = maxX - minX + newSize = localSizeX * sizeY * sizeY + arrayWithSlices.SetNumberOfTuples(newSize) + localOffset = 0 + for i in range(newSize): + if (i + 1) % localSizeX == 0: + 
arrayWithSlices.SetTuple1(i, remoteUpperSlice[((i + 1) / localSizeX) - 1]) + else: + arrayWithSlices.SetTuple1(i, dataChunk[localOffset]) + localOffset += 1 + + dataset.SetExtent(minX, maxX - 1, 0, sizeY - 1, 0, sizeY - 1) + dataset.GetPointData().SetScalars(arrayWithSlices) + + t1 = time.time() + print('%d # build resutling image data %s | ' % (partitionId, str(t1 - t0))) + t0 = t1 + + # ------------------------------------------------------------------------- + + vtkDistributedTrivialProducer.SetGlobalOutput('Spark', dataset) + + from vtk.vtkPVClientServerCoreCore import vtkProcessModule + from paraview import simple + from vtk.web import server + from paraview.web import wamp as pv_wamp + from paraview.web import protocols as pv_protocols + + class _VisualizerServer(pv_wamp.PVServerProtocol): + dataDir = '/data' + groupRegex = "[0-9]+\\.[0-9]+\\.|[0-9]+\\." + excludeRegex = "^\\.|~$|^\\$" + allReaders = True + viewportScale=1.0 + viewportMaxWidth=2560 + viewportMaxHeight=1440 + + def initialize(self): + # Bring used components + self.registerVtkWebProtocol(pv_protocols.ParaViewWebFileListing(_VisualizerServer.dataDir, "Home", _VisualizerServer.excludeRegex, _VisualizerServer.groupRegex)) + self.registerVtkWebProtocol(pv_protocols.ParaViewWebProxyManager(baseDir=_VisualizerServer.dataDir, allowUnconfiguredReaders=_VisualizerServer.allReaders)) + self.registerVtkWebProtocol(pv_protocols.ParaViewWebColorManager()) + self.registerVtkWebProtocol(pv_protocols.ParaViewWebMouseHandler()) + self.registerVtkWebProtocol(pv_protocols.ParaViewWebViewPort(_VisualizerServer.viewportScale, _VisualizerServer.viewportMaxWidth, + _VisualizerServer.viewportMaxHeight)) + self.registerVtkWebProtocol(pv_protocols.ParaViewWebViewPortImageDelivery()) + self.registerVtkWebProtocol(pv_protocols.ParaViewWebViewPortGeometryDelivery()) + self.registerVtkWebProtocol(pv_protocols.ParaViewWebTimeHandler()) + self.registerVtkWebProtocol(pv_protocols.ParaViewWebSelectionHandler()) + self.registerVtkWebProtocol(pv_protocols.ParaViewWebWidgetManager()) + self.registerVtkWebProtocol(pv_protocols.ParaViewWebKeyValuePairStore()) + self.registerVtkWebProtocol(pv_protocols.ParaViewWebSaveData(baseSavePath=_VisualizerServer.dataDir)) + # Disable interactor-based render calls + simple.GetRenderView().EnableRenderOnInteraction = 0 + simple.GetRenderView().Background = [0,0,0] + # Update interaction mode + pxm = simple.servermanager.ProxyManager() + interactionProxy = pxm.GetProxy('settings', 'RenderViewInteractionSettings') + interactionProxy.Camera3DManipulators = ['Rotate', 'Pan', 'Zoom', 'Pan', 'Roll', 'Pan', 'Zoom', 'Rotate', 'Zoom'] + + + pm = vtkProcessModule.GetProcessModule() + + # ------------------------------------------------------------------------- + + print('%d # > Start visualization - %s | ' % (partitionId, str(datetime.now()))) + + # ------------------------------------------------------------------------- + + args = Options() + if pm.GetPartitionId() == 0: + print('%d # ==> %d' % (partitionId, pm.GetPartitionId())) + producer = simple.DistributedTrivialProducer() + producer.UpdateDataset = '' + producer.UpdateDataset = 'Spark' + producer.WholeExtent = [0, sizeX - 1, 0, sizeY - 1, 0, sizeY - 1] + server.start_webserver(options=args, protocol=_VisualizerServer) + pm.GetGlobalController().TriggerBreakRMIs() + + print('%d # < Stop visualization - %s | ' % (partitionId, str(datetime.now()))) + yield (partitionId, nbMPIPartition) + +# ------------------------------------------------------------------------- +# Spark 
pipeline +# ------------------------------------------------------------------------- + +def swap(kv): + return (kv[1], kv[0]) + +t0 = time.time() +data = sc.parallelize(npArray) +rdd = data.zipWithIndex().map(swap) + +rdd.partitionBy(nbSparkPartition, getSparkPartition).mapPartitionsWithIndex(reconstruct).map(threshold) \ + .partitionBy(nbMPIPartition, getMPIPartition).mapPartitionsWithIndex(visualization) \ + .collect() + +t1 = time.time() +print('### Total execution time - %s | ' % str(t1 - t0)) + +print('### Stop execution - %s' % str(datetime.now())) diff --git a/samples/spark-mpi/start.sh b/samples/spark-mpi/start.sh new file mode 100644 index 00000000..eee655ab --- /dev/null +++ b/samples/spark-mpi/start.sh @@ -0,0 +1,14 @@ +#!/usr/bin/env bash +# based on https://github.com/Kitware/spark-mpi-experimentation/blob/master/experimentations/14-spark-pipeline/start.sh + +export JAVA_HOME="/usr/lib/jvm/java-8-openjdk-amd64/" +export SPARK_HOME="{{ spark_path }}/spark-2.1.1-bin-hadoop2.7" +export SPARK_MPI_PATH="{{ spark_path }}" +export MPI_SIZE={{ mpiSize }} +export SPARK_SIZE={{ sparkSize }} + +export HYDRA_PROXY_PORT=55555 + +${SPARK_MPI_PATH}/spark-mpi/install/bin/pmiserv -n ${MPI_SIZE} hello & + +${SPARK_MPI_PATH}/spark-2.1.1-bin-hadoop2.7/bin/spark-submit --master spark://beast:7077 ./pvw-spark.py \ No newline at end of file diff --git a/server/taskflows/hpccloud/taskflow/sparkmpi/__init__.py b/server/taskflows/hpccloud/taskflow/sparkmpi/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/server/taskflows/hpccloud/taskflow/sparkmpi/sparkmpi.py b/server/taskflows/hpccloud/taskflow/sparkmpi/sparkmpi.py new file mode 100644 index 00000000..e62c721e --- /dev/null +++ b/server/taskflows/hpccloud/taskflow/sparkmpi/sparkmpi.py @@ -0,0 +1,129 @@ +import json +import os +from jsonpath_rw import parse + +import cumulus.taskflow.cluster +from cumulus.taskflow.cluster import create_girder_client +from cumulus.tasks.job import download_job_input_folders +from cumulus.tasks.job import submit_job, monitor_job +from cumulus.tasks.job import job_directory +# from cumulus.transport import get_connection +# from cumulus.transport.files.upload import upload_file + +# from hpccloud.taskflow.utility import * + + +class SparkMpiTaskFlow(cumulus.taskflow.cluster.ClusterProvisioningTaskFlow): + """ + { + "dataDir": + } + } + """ + + def start(self, *args, **kwargs): + # kwargs['image_spec'] = image_spec + kwargs['next'] = create_sparkmpi_job.s() + super(SparkMpiTaskFlow, self).start(self, *args, **kwargs) + + def terminate(self): + super(SparkMpiTaskFlow, self).terminate() + self.run_task(finish.s()) + + def delete(self): + super(SparkMpiTaskFlow, self).delete() + self.run_task(finish.s()) + + +@cumulus.taskflow.task +def create_sparkmpi_job(task, *args, **kwargs): + # Girder client + client = create_girder_client( + task.taskflow.girder_api_url, task.taskflow.girder_token) + + # Save the cluster in the taskflow for termination + cluster = kwargs.pop('cluster') + task.taskflow.set_metadata('cluster', cluster) + + # Create job definition + task.taskflow.logger.info('Creating SparkMPI job.') + + base_path = os.path.dirname(__file__) + script_path = os.path.join(base_path, 'start.sh') + + if not os.path.exists(script_path): + msg = 'Script path %s does not exists.' 
% script_path + task.logger.info(msg) + raise Exception(msg) + + with open(script_path, 'r') as fp: + commands = fp.read().splitlines() + + body = { + 'name': 'start.sh', + 'commands': commands, + 'input': [], + 'output': [] + } + + # Register job in girder + attach to taskflow + job = client.post('jobs', data=json.dumps(body)) + task.logger.info('SparkMPI job created: %s' % job['_id']) + task.taskflow.logger.info('SparkMPI job created.') + task.taskflow.set_metadata('jobs', [job]) + + # Capture job working directory + target_dir = job_directory(cluster, job) + task.taskflow.set_metadata('dataDir', target_dir) + + # Move to the next task + submit_sparkmpi_job.delay(cluster, job, *args, **kwargs) + + +@cumulus.taskflow.task +def submit_sparkmpi_job(task, cluster, job, *args, **kwargs): + # Now download job inputs + task.logger.info('Uploading input files to cluster.') + download_job_input_folders(cluster, job, + log_write_url=None, + girder_token=task.taskflow.girder_token, + submit=False) + task.logger.info('Uploading complete.') + + # Setup job parameters + task.taskflow.logger.info('Submitting job to cluster.') + job['params'] = { + 'sparkPath': parse('config.sparkmpi.path').find(cluster), + 'sparkSize': kwargs['sparkSize'], + 'mpiSize': kwargs['mpiSize'] + } + + # Submit job to the queue + submit_job(cluster, job, + log_write_url=None, + girder_token=task.taskflow.girder_token, + monitor=False) + + # Move to the next task + monitor_sparkmpi_job.delay(cluster, job, *args, **kwargs) + + +@cumulus.taskflow.task +def monitor_sparkmpi_job(task, cluster, job, *args, **kwargs): + task.logger.info('Monitoring job on cluster.') + + # Move to next task when monitor job is done + task.taskflow \ + .on_complete(monitor_job) \ + .run(finish.s(task)) + + # Monitor job in a loop manner + task.taskflow.run_task( + monitor_job.s(cluster, job, girder_token=task.taskflow.girder_token)) + + +@cumulus.taskflow.task +def finish(task): + task.taskflow.logger.info('Upload complete.') diff --git a/src/workflows/index.js b/src/workflows/index.js index 1201a60c..a753e69c 100644 --- a/src/workflows/index.js +++ b/src/workflows/index.js @@ -4,6 +4,7 @@ import NWChemNeb from './nwchem/nwchem-neb'; import OpenFOAMTutorial from './openfoam/tutorials'; import OpenFOAMWindTunnel from './openfoam/windtunnel'; import PyFr from './pyfr'; +import SparkMPI from './spark-mpi'; import Visualizer from './visualizer'; const Workflows = { @@ -13,6 +14,7 @@ const Workflows = { OpenFOAMTutorial, OpenFOAMWindTunnel, PyFr, + SparkMPI, Visualizer, }; diff --git a/src/workflows/spark-mpi/components/panels/RuntimeBackend.js b/src/workflows/spark-mpi/components/panels/RuntimeBackend.js new file mode 100644 index 00000000..93737f00 --- /dev/null +++ b/src/workflows/spark-mpi/components/panels/RuntimeBackend.js @@ -0,0 +1,75 @@ +import React from 'react'; +import formStyle from 'HPCCloudStyle/ItemEditor.mcss'; + +export default React.createClass({ + + displayName: 'pyfr-exec/RuntimeBackend', + + propTypes: { + owner: React.PropTypes.func, + parentState: React.PropTypes.object, + /* eslint-disable react/no-unused-prop-types */ + parentProps: React.PropTypes.object, + /* eslint-emable react/no-unused-prop-types */ + }, + + getInitialState() { + return { mpiSize: 2, sparkSize: 2 }; + }, + + // Automatically update backend when needed + componentDidUpdate() { + const active = this.state.active; + const value = this.state[active]; + if (!active || !this.props.owner() || !this.state) { + return; + } + + const backend = { type: active }; + if 
(active === 'cuda') {
+      backend['device-id'] = value;
+    } else if (this.state.backendProfile && this.state.backendProfile[active]) {
+      const addOn = this.state.backendProfile[active].find((item) => item.name === this.state[active]);
+      Object.assign(backend, addOn);
+    }
+
+    // Avoid repeated setState calls when the backend has not changed
+    const lastPush = JSON.stringify(backend);
+    if (this.lastPush !== lastPush) {
+      this.lastPush = lastPush;
+      this.props.owner().setState({ backend });
+    }
+  },
+
+  updateParam(event) {
+    const which = event.target.dataset.which;
+    const value = event.target.value;
+    this.setState({ [which]: value });
+  },
+
+  render() {
+    if (this.props.parentState.serverType !== 'Traditional') {
+      return null;
+    }
+
+    return (
+      <div className={formStyle.group}>
+        <label className={formStyle.label}>MPI size</label>
+        <input
+          className={formStyle.input}
+          type="number"
+          data-which="mpiSize"
+          value={this.state.mpiSize}
+          onChange={this.updateParam}
+        />
+        <label className={formStyle.label}>Spark size</label>
+        <input
+          className={formStyle.input}
+          type="number"
+          data-which="sparkSize"
+          value={this.state.sparkSize}
+          onChange={this.updateParam}
+        />
+      </div>
+    );
+  },
+});
diff --git a/src/workflows/spark-mpi/components/root/NewProject.js b/src/workflows/spark-mpi/components/root/NewProject.js
new file mode 100644
index 00000000..4f596591
--- /dev/null
+++ b/src/workflows/spark-mpi/components/root/NewProject.js
@@ -0,0 +1,17 @@
+import React from 'react';
+import { FileUploadEntry } from '../../../../panels/ItemEditor';
+
+export default function NewProject(props) {
+  return (
+    <div>
+      <FileUploadEntry name="startScript" label="Start script" owner={props.owner} />
+      <FileUploadEntry name="sparkScript" label="Spark script" owner={props.owner} />
+      <FileUploadEntry name="input" label="Input file" owner={props.owner} />
+    </div>
+  );
+}
+
+// ----------------------------------------------------------------------------
+
+NewProject.propTypes = {
+  owner: React.PropTypes.func,
+};
diff --git a/src/workflows/spark-mpi/components/root/NewSimulation.js b/src/workflows/spark-mpi/components/root/NewSimulation.js
new file mode 100644
index 00000000..4b78e484
--- /dev/null
+++ b/src/workflows/spark-mpi/components/root/NewSimulation.js
@@ -0,0 +1,10 @@
+import React from 'react';
+import { FileUploadEntry } from '../../../../panels/ItemEditor';
+
+const newSim = (props) => <FileUploadEntry name="dataset" label="Dataset" owner={props.owner} />;
+
+newSim.propTypes = {
+  owner: React.PropTypes.func,
+};
+
+export default newSim;
diff --git a/src/workflows/spark-mpi/components/steps/Introduction/content.html b/src/workflows/spark-mpi/components/steps/Introduction/content.html
new file mode 100644
index 00000000..23288d8c
--- /dev/null
+++ b/src/workflows/spark-mpi/components/steps/Introduction/content.html
@@ -0,0 +1,10 @@
+<div>
+  <b>ParaView</b>
+  <p>
+    This workflow combines Apache Spark with MPI parallel processing and ParaView.
+    It is a work in progress; see /samples/spark-mpi/README.md in the repository for more information.
+  </p>
+  <p>
+    <a href="https://github.com/Kitware/spark-mpi-experimentation">Spark-MPI experimentation</a>
+  </p>
+</div>
diff --git a/src/workflows/spark-mpi/components/steps/Introduction/index.js b/src/workflows/spark-mpi/components/steps/Introduction/index.js new file mode 100644 index 00000000..303f38ef --- /dev/null +++ b/src/workflows/spark-mpi/components/steps/Introduction/index.js @@ -0,0 +1,5 @@ +import React from 'react'; +import DocumentationHTML from '../../../../generic/components/steps/DocumentationHTML'; +import staticContent from './content.html'; + +export default (props) => ; diff --git a/src/workflows/spark-mpi/components/steps/Visualization/Start.js b/src/workflows/spark-mpi/components/steps/Visualization/Start.js new file mode 100644 index 00000000..e6b3349e --- /dev/null +++ b/src/workflows/spark-mpi/components/steps/Visualization/Start.js @@ -0,0 +1,58 @@ +import React from 'react'; +import RuntimeBackend from '../../panels/RuntimeBackend'; +import JobSubmission from '../../../../generic/components/steps/JobSubmission'; + +// ---------------------------------------------------------------------------- + +const actionList = [{ name: 'prepareJob', label: 'Start Visualization', icon: '' }]; + +// ---------------------------------------------------------------------------- + +function clusterFilter(cluster) { + return 'config' in cluster && 'paraview' in cluster.config && + 'installDir' in cluster.config.paraview && cluster.config.paraview.installDir; +} + +// ---------------------------------------------------------------------------- + +function getTaskflowMetaData(props) { + console.log('getTaskflowMetaData', props); + return { + sessionId: props.simulation._id, + }; +} + +// ---------------------------------------------------------------------------- + +function getPayload(props) { + console.log('getPayload', props); + const sessionKey = props.simulation._id; + + return { + sessionKey, // for pvw, we use this later for connecting, + input: { + file: { + id: props.simulation.metadata.inputFolder.files.dataset, + }, + }, + output: { + folder: { + id: props.simulation.metadata.outputFolder._id, + }, + }, + }; +} + +// ---------------------------------------------------------------------------- + +export default function openFoamStart(props) { + return ( + ); +} diff --git a/src/workflows/spark-mpi/components/steps/Visualization/View.js b/src/workflows/spark-mpi/components/steps/Visualization/View.js new file mode 100644 index 00000000..cbaf46d2 --- /dev/null +++ b/src/workflows/spark-mpi/components/steps/Visualization/View.js @@ -0,0 +1,77 @@ +import JobMonitoring from '../../../../generic/components/steps/JobMonitoring'; + +import getNetworkError from '../../../../../utils/getNetworkError'; +import { getDisabledButtons } from '../../../../../utils/getDisabledButtons'; +import get from '../../../../../utils/get'; + +import { connect } from 'react-redux'; +import { dispatch } from '../../../../../redux'; +import * as SimActions from '../../../../../redux/actions/projects'; + +// ---------------------------------------------------------------------------- + +function getActions(props) { + const { taskflow } = props; + const jobs = Object.keys(taskflow.jobMapById).map((id) => taskflow.jobMapById[id]); + const actions = []; + + taskflow.actions.forEach((action) => { + actions.push(action); + }); + + // name is paraview and status is running -> visualize + if (jobs.some((job) => job.name === props.primaryJob && job.status === 'running')) { + actions.push('visualize'); + } else if (taskflow.allComplete) { + actions.push('rerun'); + } + + return actions; +} + +// 
---------------------------------------------------------------------------- + +function onVisualize(props) { + const location = { + pathname: props.location.pathname, + query: Object.assign({}, props.location.query, { view: 'visualizer' }), + state: props.location.state, + }; + dispatch(SimActions.saveSimulation(props.simulation, null, location)); +} + +// ---------------------------------------------------------------------------- + +export default connect( + (state, props) => { + var taskflowId = null; + const activeProject = state.projects.active; + const activeSimulation = activeProject ? state.projects.simulations[activeProject].active : null; + + if (activeSimulation) { + const simulation = state.simulations.mapById[activeSimulation]; + taskflowId = simulation.steps.Visualization.metadata.taskflowId; + } + + let taskflow = null; + if (taskflowId) { + taskflow = state.taskflows.mapById[taskflowId]; + } + + let cluster = null; + if (get(taskflow, 'flow.meta.cluster._id')) { + const clusterId = taskflow.flow.meta.cluster._id; + cluster = state.preferences.clusters.mapById[clusterId]; + } + + return { + getActions, + taskflow, + taskflowId, + cluster, + disabledButtons: getDisabledButtons(state.network, taskflow), + error: getNetworkError(state, ['terminate_taskflow', 'delete_taskflow']), + actionFunctions: { onVisualize }, + }; + } +)(JobMonitoring); diff --git a/src/workflows/spark-mpi/index.js b/src/workflows/spark-mpi/index.js new file mode 100644 index 00000000..00f4d691 --- /dev/null +++ b/src/workflows/spark-mpi/index.js @@ -0,0 +1,69 @@ +import rootNewProject from './components/root/NewProject'; +import rootNewSimulation from './components/root/NewSimulation'; +import rootViewSimulation from '../generic/components/root/ViewSimulation'; + +import stepIntroduction from './components/steps/Introduction'; +import stepStartViz from './components/steps/Visualization/Start'; +import stepVisualizer from './components/steps/Visualization/View'; + +export default { + name: 'Spark-MPI', + logo: require('./logo.png'), + requiredAttachments: { + project: ['startScript', 'sparkScript', 'input'], + simulation: [], + }, + components: { + NewProject: rootNewProject, + NewSimulation: rootNewSimulation, + ViewSimulation: rootViewSimulation, + }, + config: { + cluster: { + 'config.sparkmpi.sparkPath': { + type: 'text', + label: 'Spark-MPI Directory', + description: 'Path to the SparkMPI directory.', + }, + }, + }, + steps: { + _order: ['Introduction', 'Visualization'], + _initial_state: { + Introduction: { + type: 'information', + metadata: { + alwaysAvailable: true, + }, + }, + Visualization: { + type: 'output', + metadata: { + disabled: false, + }, + }, + }, + Introduction: { + default: stepIntroduction, + }, + Visualization: { + default: stepStartViz, + run: stepVisualizer, + }, + }, + taskFlows: { + Visualization: 'hpccloud.taskflow.paraview.visualizer.ParaViewTaskFlow', + }, + primaryJobs: { + Visualization: 'paraview', + }, + labels: { + Introduction: { + default: 'Introduction', + }, + Visualization: { + default: 'Visualization', + run: 'Visualization (running)', + }, + }, +}; diff --git a/src/workflows/spark-mpi/logo.png b/src/workflows/spark-mpi/logo.png new file mode 100644 index 00000000..ab179b52 Binary files /dev/null and b/src/workflows/spark-mpi/logo.png differ
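
For reference, the sketch below illustrates how the `{{ ... }}` placeholders in `samples/spark-mpi/start.sh` line up with the parameters that `submit_sparkmpi_job` attaches to the job. Jinja2-style rendering and the concrete values are assumptions for illustration; the real rendering happens server-side. Two naming gaps are visible in this diff: the template reads `{{ spark_path }}` while the taskflow sets `sparkPath`, and the taskflow queries `config.sparkmpi.path` while the cluster presets and workflow config define `config.sparkmpi.sparkPath`.

```python
# Illustration only: Jinja2-style rendering is an assumption, and the values
# below are placeholders. It shows which keys start.sh expects versus what
# submit_sparkmpi_job currently provides.
from jinja2 import Template

start_sh = Template(
    'export SPARK_HOME="{{ spark_path }}/spark-2.1.1-bin-hadoop2.7"\n'
    'export SPARK_MPI_PATH="{{ spark_path }}"\n'
    'export MPI_SIZE={{ mpiSize }}\n'
    'export SPARK_SIZE={{ sparkSize }}\n'
)

# Parameters shaped like the ones built in submit_sparkmpi_job; note there is
# no 'spark_path' key, so the first two exports render with an empty path.
params = {'sparkPath': '/data/kitware/SparkMPI', 'sparkSize': 2, 'mpiSize': 2}
print(start_sh.render(**params))
```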