Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 24 additions & 5 deletions src/toil_lib/spark.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,26 @@

def spawn_spark_cluster(job,
numWorkers,
sparkMasterContainer="quay.io/ucsc_cgl/apache-spark-master:1.5.2",
sparkWorkerContainer="quay.io/ucsc_cgl/apache-spark-worker:1.5.2",
cores=None,
memory=None,
disk=None,
overrideLeaderIP=None):
'''
:param numWorkers: The number of worker nodes to have in the cluster. \
Must be greater than or equal to 1.
:param sparkMasterContainer: The Docker image to run for the Spark master.
:param sparkWorkerContainer: The Docker image to run for the Spark worker.
:param cores: Optional parameter to set the number of cores per node. \
If not provided, we use the number of cores on the node that launches \
the service.
:param memory: Optional parameter to set the memory requested per node.
:param disk: Optional parameter to set the disk requested per node.
:type leaderMemory: int or string convertable by bd2k.util.humanize.human2bytes to an int
:type numWorkers: int
:type sparkMasterContainer: str
:type sparkWorkerContainer: str
:type leaderMemory: int or string convertable by bd2k.util.humanize.human2bytes to an int
:type cores: int
:type memory: int or string convertable by bd2k.util.humanize.human2bytes to an int
:type disk: int or string convertable by bd2k.util.humanize.human2bytes to an int
Expand All @@ -49,13 +55,15 @@ def spawn_spark_cluster(job,
if numWorkers < 1:
raise ValueError("Must have more than one worker. %d given." % numWorkers)

leaderService = SparkService(cores=cores,
leaderService = SparkService(sparkMasterContainer,
cores=cores,
memory=memory,
disk=disk,
overrideLeaderIP=overrideLeaderIP)
leaderIP = job.addService(leaderService)
for i in range(numWorkers):
job.addService(WorkerService(leaderIP,
sparkWorkerContainer,
cores=cores,
disk=disk,
memory=memory),
Expand Down Expand Up @@ -98,16 +106,19 @@ class SparkService(Job.Service):
"""

def __init__(self,
sparkContainer,
memory=None,
disk=None,
cores=None,
overrideLeaderIP=None):
"""
:param sparkContainer: The Docker container name to run for Spark.
:param memory: The amount of memory to be requested for the Spark leader. Optional.
:param disk: The amount of disk to be requested for the Spark leader. Optional.
:param cores: Optional parameter to set the number of cores per node. \
If not provided, we use the number of cores on the node that launches \
the service.
:type sparkContainer: str
:type memory: int or string convertable by bd2k.util.humanize.human2bytes to an int
:type disk: int or string convertable by bd2k.util.humanize.human2bytes to an int
:type cores: int
Expand All @@ -117,6 +128,7 @@ def __init__(self,
cores = multiprocessing.cpu_count()

self.hostname = overrideLeaderIP
self.sparkContainer = sparkContainer

Job.Service.__init__(self, memory=memory, cores=cores, disk=disk)

Expand All @@ -135,9 +147,10 @@ def start(self, job):
self.sparkContainerID = dockerCheckOutput(job=job,
defer=STOP,
workDir=os.getcwd(),
tool="quay.io/ucsc_cgl/apache-spark-master:1.5.2",
tool=self.sparkContainer,
dockerParameters=["--net=host",
"-d",
"-v", "/var/run/docker.sock:/var/run/docker.sock",
"-v", "/mnt/ephemeral/:/ephemeral/:rw",
"-e", "SPARK_MASTER_IP=" + self.hostname,
"-e", "SPARK_LOCAL_DIRS=/ephemeral/spark/local",
Expand Down Expand Up @@ -188,19 +201,24 @@ class WorkerService(Job.Service):
Should not be called outside of `SparkService`.
"""

def __init__(self, masterIP, memory=None, cores=None, disk=None):
def __init__(self, masterIP, sparkContainer, memory=None, cores=None, disk=None):
"""
:param masterIP: The IP of the Spark master.
:param sparkContainer: The container to run for Spark.
:param memory: The memory requirement for each node in the cluster. Optional.
:param disk: The disk requirement for each node in the cluster. Optional.
:param cores: Optional parameter to set the number of cores per node. \
If not provided, we use the number of cores on the node that launches \
the service.
:type masterIP: str
:type sparkContainer: str
:type memory: int or string convertable by bd2k.util.humanize.human2bytes to an int
:type disk: int or string convertable by bd2k.util.humanize.human2bytes to an int
:type cores: int
"""

self.masterIP = masterIP
self.sparkContainer = sparkContainer

if cores is None:
cores = multiprocessing.cpu_count()
Expand All @@ -219,9 +237,10 @@ def start(self, job):
self.sparkContainerID = dockerCheckOutput(job=job,
defer=STOP,
workDir=os.getcwd(),
tool="quay.io/ucsc_cgl/apache-spark-worker:1.5.2",
tool=self.sparkContainer,
dockerParameters=["--net=host",
"-d",
"-v", "/var/run/docker.sock:/var/run/docker.sock",
"-v", "/mnt/ephemeral/:/ephemeral/:rw",
"-e",
"\"SPARK_MASTER_IP=" + self.masterIP + ":" + _SPARK_MASTER_PORT + "\"",
Expand Down
108 changes: 104 additions & 4 deletions src/toil_lib/tools/spark_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,21 +95,28 @@ def _make_parameters(master_ip, default_parameters, memory, arguments, override_
return parameters


def call_conductor(job, master_ip, src, dst, memory=None, override_parameters=None):
def call_conductor(job,
master_ip,
src,
dst,
container="quay.io/ucsc_cgl/conductor",
memory=None, override_parameters=None):
"""
Invokes the Conductor container to copy files between S3 and HDFS and vice versa.
Find Conductor at https://github.com/BD2KGenomics/conductor.

:param toil.Job.job job: The Toil Job calling this function
:param masterIP: The Spark leader IP address.
:param src: URL of file to copy.
:param src: URL of location to copy file to.
:param dst: URL of location to copy file to.
:param container: The container name to run.
:param memory: Gigabytes of memory to provision for Spark driver/worker.
:param override_parameters: Parameters passed by the user, that override our defaults.

:type masterIP: MasterAddress
:type src: string
:type dst: string
:type container: string
:type memory: int or None
:type override_parameters: list of string or None
"""
Expand All @@ -118,7 +125,7 @@ def call_conductor(job, master_ip, src, dst, memory=None, override_parameters=No

docker_parameters = ['--log-driver', 'none', master_ip.docker_parameters(["--net=host"])]
dockerCall(job=job,
tool="quay.io/ucsc_cgl/conductor",
tool=container,
parameters=_make_parameters(master_ip,
[], # no conductor specific spark configuration
memory,
Expand All @@ -128,6 +135,7 @@ def call_conductor(job, master_ip, src, dst, memory=None, override_parameters=No


def call_adam(job, master_ip, arguments,
container="quay.io/ucsc_cgl/adam:0.22.0--7add8b306862902b2bdd28a991e4e8dbc5292504",
memory=None,
override_parameters=None,
run_local=False,
Expand All @@ -138,6 +146,7 @@ def call_adam(job, master_ip, arguments,
:param toil.Job.job job: The Toil Job calling this function
:param masterIP: The Spark leader IP address.
:param arguments: Arguments to pass to ADAM.
:param container: The container name to run.
:param memory: Gigabytes of memory to provision for Spark driver/worker.
:param override_parameters: Parameters passed by the user, that override our defaults.
:param native_adam_path: Path to ADAM executable. If not provided, Docker is used.
Expand All @@ -146,6 +155,7 @@ def call_adam(job, master_ip, arguments,

:type masterIP: MasterAddress
:type arguments: list of string
:type container: string
:type memory: int or None
:type override_parameters: list of string or None
:type native_adam_path: string or None
Expand Down Expand Up @@ -179,7 +189,7 @@ def call_adam(job, master_ip, arguments,
if native_adam_path is None:
docker_parameters = ['--log-driver', 'none', master_ip.docker_parameters(["--net=host"])]
dockerCall(job=job,
tool="quay.io/ucsc_cgl/adam:962-ehf--6e7085f8cac4b9a927dc9fb06b48007957256b80",
tool=container,
dockerParameters=docker_parameters,
parameters=_make_parameters(master_ip,
default_params,
Expand All @@ -191,3 +201,93 @@ def call_adam(job, master_ip, arguments,
default_params +
arguments)


def call_avocado(job, master_ip, arguments,
                 container="quay.io/ucsc_cgl/avocado:fb20657172d2ce38e5dcd5542b0915db4de7eaa0--036b9354dbd46e62c4d326b4308c4786fc966d6a",
                 memory=None,
                 override_parameters=None,
                 run_local=False):
    """
    Invokes the Avocado container. Find Avocado at https://github.com/bigdatagenomics/avocado.

    :param toil.Job.job job: The Toil Job calling this function
    :param master_ip: The Spark leader IP address.
    :param arguments: Arguments to pass to Avocado.
    :param container: The container name to run.
    :param memory: Gigabytes of memory to provision for Spark driver/worker.
    :param override_parameters: Parameters passed by the user, that override our defaults.
    :param run_local: If true, runs Spark with the --master local[*] setting, which uses
        all cores on the local machine. The master_ip will be disregarded.

    :type master_ip: MasterAddress
    :type arguments: list of string
    :type container: string
    :type memory: int or None
    :type override_parameters: list of string or None
    :type run_local: boolean
    """
    if run_local:
        # Run Spark in local mode on the machine executing this job.
        master = ["--master", "local[*]"]
    else:
        # Point Spark at the standalone leader, and default the filesystem
        # to the HDFS namenode running alongside it.
        master = ["--master",
                  ("spark://%s:%s" % (master_ip, SPARK_MASTER_PORT)),
                  "--conf", ("spark.hadoop.fs.default.name=hdfs://%s:%s" % (master_ip, HDFS_MASTER_PORT)),]

    default_params = (master + [
        # set max result size to unlimited, see #177
        "--conf", "spark.driver.maxResultSize=0",
        "--conf", "spark.kryoserializer.buffer.max=2047m"
    ])

    # NOTE(review): master_ip is still used for Docker networking even when
    # run_local is set — confirm callers always pass a MasterAddress here.
    docker_parameters = ['--log-driver', 'none', master_ip.docker_parameters(["--net=host"])]
    dockerCall(job=job,
               tool=container,
               dockerParameters=docker_parameters,
               parameters=_make_parameters(master_ip,
                                           default_params,
                                           memory,
                                           arguments,
                                           override_parameters))


def call_cannoli(job, master_ip, arguments,
                 container="quay.io/ucsc_cgl/cannoli:0a9321a382fdfad1411cb308a0de1566bf4c8bb4--036b9354dbd46e62c4d326b4308c4786fc966d6a",
                 memory=None,
                 override_parameters=None,
                 run_local=False):
    """
    Invokes the Cannoli container. Find Cannoli at https://github.com/bigdatagenomics/cannoli.

    :param toil.Job.job job: The Toil Job calling this function
    :param master_ip: The Spark leader IP address.
    :param arguments: Arguments to pass to Cannoli.
    :param container: The container name to run.
    :param memory: Gigabytes of memory to provision for Spark driver/worker.
    :param override_parameters: Parameters passed by the user, that override our defaults.
    :param run_local: If true, runs Spark with the --master local[*] setting, which uses
        all cores on the local machine. The master_ip will be disregarded.

    :type master_ip: MasterAddress
    :type arguments: list of string
    :type container: string
    :type memory: int or None
    :type override_parameters: list of string or None
    :type run_local: boolean
    """
    if run_local:
        # Run Spark in local mode on the machine executing this job.
        master = ["--master", "local[*]"]
    else:
        # Point Spark at the standalone leader, and default the filesystem
        # to the HDFS namenode running alongside it.
        master = ["--master",
                  ("spark://%s:%s" % (master_ip, SPARK_MASTER_PORT)),
                  "--conf", ("spark.hadoop.fs.default.name=hdfs://%s:%s" % (master_ip, HDFS_MASTER_PORT)),]

    # NOTE(review): unlike call_adam/call_avocado, no spark.driver.maxResultSize
    # or kryoserializer overrides are applied here — confirm that is intentional.
    docker_parameters = ['--log-driver', 'none', master_ip.docker_parameters(["--net=host"])]
    dockerCall(job=job,
               tool=container,
               dockerParameters=docker_parameters,
               parameters=_make_parameters(master_ip,
                                           master,
                                           memory,
                                           arguments,
                                           override_parameters))