Commit 7ae5925

more deterministic GPU scheduling; add timeout for entire Spark application to handle TF hangs
1 parent 5fc6aa7 · commit 7ae5925

File tree: 4 files changed (+64 −21 lines)

  tensorflowonspark/TFCluster.py
  tensorflowonspark/TFNode.py
  tensorflowonspark/TFSparkNode.py
  tensorflowonspark/gpu_info.py
tensorflowonspark/TFCluster.py (18 additions, 2 deletions)

@@ -25,6 +25,7 @@
 import logging
 import os
 import random
+import signal
 import sys
 import threading
 import time

@@ -111,12 +112,16 @@ def inference(self, dataRDD, feed_timeout=600, qname='input'):
     assert qname in self.queues, "Unknown queue: {}".format(qname)
     return dataRDD.mapPartitions(TFSparkNode.inference(self.cluster_info, feed_timeout=feed_timeout, qname=qname))

-  def shutdown(self, ssc=None, grace_secs=0):
+  def shutdown(self, ssc=None, grace_secs=0, timeout=259200):
     """Stops the distributed TensorFlow cluster.

+    For InputMode.SPARK, this will be executed AFTER the `TFCluster.train()` or `TFCluster.inference()` method completes.
+    For InputMode.TENSORFLOW, this will be executed IMMEDIATELY after `TFCluster.run()` and will wait until the TF worker nodes complete.
+
     Args:
       :ssc: *For Streaming applications only*. Spark StreamingContext
-      :grace_secs: Grace period to wait before terminating the Spark application, e.g. to allow the chief worker to perform any final/cleanup duties like exporting or evaluating the model.
+      :grace_secs: Grace period to wait after all executors have completed their tasks before terminating the Spark application, e.g. to allow the chief worker to perform any final/cleanup duties like exporting or evaluating the model. Default is 0.
+      :timeout: Time in seconds to wait for TF cluster to complete before terminating the Spark application. This can be useful if the TF code hangs for any reason. Default is 3 days.
     """
     logging.info("Stopping TensorFlow nodes")

@@ -125,6 +130,17 @@ def shutdown(self, ssc=None, grace_secs=0):
     for node in self.cluster_info:
       (ps_list if node['job_name'] == 'ps' else worker_list).append(node)

+    # setup execution timeout
+    def timeout_handler(signum, frame):
+      logging.error("TensorFlow execution timed out, exiting Spark application with error status")
+      self.sc.cancelAllJobs()
+      self.sc.stop()
+      sys.exit(1)
+
+    signal.signal(signal.SIGALRM, timeout_handler)
+    signal.alarm(timeout)
+
+    # wait for Spark Streaming termination or TF app completion for InputMode.TENSORFLOW
     if ssc is not None:
       # Spark Streaming
       while not ssc.awaitTerminationOrTimeout(1):
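
A minimal driver-side sketch of the new `timeout` parameter. Only the `shutdown(timeout=...)` call exercises this change; the `main_fun`/`args` names and the `TFCluster.run()` arguments are illustrative placeholders, not part of this commit.

from tensorflowonspark import TFCluster

# Hypothetical application setup (map function, argparse args, cluster sizing).
cluster = TFCluster.run(sc, main_fun, args, num_executors=4, num_ps=1,
                        tensorboard=False, input_mode=TFCluster.InputMode.TENSORFLOW)
# If the TF nodes hang, SIGALRM fires after 4 hours, cancels all Spark jobs,
# stops the SparkContext, and exits with a non-zero status.
cluster.shutdown(timeout=4 * 60 * 60)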

tensorflowonspark/TFNode.py (8 additions, 1 deletion)

@@ -82,6 +82,13 @@ def start_cluster_server(ctx, num_gpus=1, rdma=False):
   logging.info("{0}: Cluster spec: {1}".format(ctx.worker_num, cluster_spec))

   if tf.test.is_built_with_cuda() and num_gpus > 0:
+    # compute my index relative to other nodes placed on the same host (for GPU allocation)
+    my_addr = cluster_spec[ctx.job_name][ctx.task_index]
+    my_host = my_addr.split(':')[0]
+    flattened = [v for sublist in cluster_spec.values() for v in sublist]
+    local_peers = [p for p in flattened if p.startswith(my_host)]
+    my_index = local_peers.index(my_addr)
+
     # GPU
     gpu_initialized = False
     retries = 3

@@ -92,7 +99,7 @@ def start_cluster_server(ctx, num_gpus=1, rdma=False):
         num_gpus = 1

       # Find a free gpu(s) to use
-      gpus_to_use = gpu_info.get_gpus(num_gpus)
+      gpus_to_use = gpu_info.get_gpus(num_gpus, my_index)
       gpu_prompt = "GPU" if num_gpus == 1 else "GPUs"
       logging.info("{0}: Using {1}: {2}".format(ctx.worker_num, gpu_prompt, gpus_to_use))
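
To make the host-local index computation concrete, here is a standalone sketch of the same logic with a made-up `cluster_spec`; the hosts, ports, and task indices are hypothetical.

# Hypothetical cluster: three TF processes land on host1, one on host2.
cluster_spec = {
  'ps':     ['host1:2222'],
  'worker': ['host1:2223', 'host2:2222', 'host1:2224'],
}
job_name, task_index = 'worker', 2               # this node is worker:2 -> host1:2224

my_addr = cluster_spec[job_name][task_index]
my_host = my_addr.split(':')[0]
flattened = [v for sublist in cluster_spec.values() for v in sublist]
local_peers = [p for p in flattened if p.startswith(my_host)]
my_index = local_peers.index(my_addr)            # 2 -> third TF process placed on host1

print(my_index)                                  # used as the worker_index "hint" for gpu_info.get_gpus()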

tensorflowonspark/TFSparkNode.py (18 additions, 10 deletions)

@@ -139,9 +139,10 @@ def _mapfn(iter):
    for i in iter:
      executor_id = i

-    # run quick check of GPU infrastructure if using tensorflow-gpu
+    # check that there are enough available GPUs (if using tensorflow-gpu) before committing reservation on this node
    if tf.test.is_built_with_cuda():
-      gpus_to_use = gpu_info.get_gpus(1)
+      num_gpus = tf_args.num_gpus if 'num_gpus' in tf_args else 1
+      gpus_to_use = gpu_info.get_gpus(num_gpus)

    # assign TF job/task based on provided cluster_spec template (or use default/null values)
    job_name = 'default'

@@ -261,38 +262,45 @@ def _mapfn(iter):

    # construct a TensorFlow clusterspec from cluster_info
    sorted_cluster_info = sorted(cluster_info, key=lambda k: k['executor_id'])
-    spec = {}
+    cluster_spec = {}
    last_executor_id = -1
    for node in sorted_cluster_info:
      if (node['executor_id'] == last_executor_id):
        raise Exception("Duplicate worker/task in cluster_info")
      last_executor_id = node['executor_id']
      logging.info("node: {0}".format(node))
      (njob, nhost, nport) = (node['job_name'], node['host'], node['port'])
-      hosts = [] if njob not in spec else spec[njob]
+      hosts = [] if njob not in cluster_spec else cluster_spec[njob]
      hosts.append("{0}:{1}".format(nhost, nport))
-      spec[njob] = hosts
+      cluster_spec[njob] = hosts

    # update TF_CONFIG if cluster spec has a 'master' node (i.e. tf.estimator)
-    if 'master' in spec:
+    if 'master' in cluster_spec:
      tf_config = json.dumps({
-        'cluster': spec,
+        'cluster': cluster_spec,
        'task': {'type': job_name, 'index': task_index},
        'environment': 'cloud'
      })
      logging.info("export TF_CONFIG: {}".format(tf_config))
      os.environ['TF_CONFIG'] = tf_config

-    # reserve GPU
+    # reserve GPU(s) again, just before launching TF process (in case situation has changed)
    if tf.test.is_built_with_cuda():
+      # compute my index relative to other nodes on the same host (for GPU allocation)
+      my_addr = cluster_spec[job_name][task_index]
+      my_host = my_addr.split(':')[0]
+      flattened = [v for sublist in cluster_spec.values() for v in sublist]
+      local_peers = [p for p in flattened if p.startswith(my_host)]
+      my_index = local_peers.index(my_addr)
+
      num_gpus = tf_args.num_gpus if 'num_gpus' in tf_args else 1
-      gpus_to_use = gpu_info.get_gpus(num_gpus)
+      gpus_to_use = gpu_info.get_gpus(num_gpus, my_index)
      gpu_str = "GPUs" if num_gpus > 1 else "GPU"
      logging.debug("Requested {} {}, setting CUDA_VISIBLE_DEVICES={}".format(num_gpus, gpu_str, gpus_to_use))
      os.environ['CUDA_VISIBLE_DEVICES'] = gpus_to_use

    # create a context object to hold metadata for TF
-    ctx = TFNodeContext(executor_id, job_name, task_index, spec, cluster_meta['default_fs'], cluster_meta['working_dir'], TFSparkNode.mgr)
+    ctx = TFNodeContext(executor_id, job_name, task_index, cluster_spec, cluster_meta['default_fs'], cluster_meta['working_dir'], TFSparkNode.mgr)

    # release port reserved for TF as late as possible
    if tmp_sock is not None:
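
For reference, the TF_CONFIG block above produces JSON of the following shape. The addresses and the task assignment below are made up for illustration.

import json
import os

# Hypothetical cluster_spec containing a 'master' node (i.e. a tf.estimator-style cluster).
cluster_spec = {
  'master': ['host1:2222'],
  'ps':     ['host2:2222'],
  'worker': ['host3:2222'],
}
tf_config = json.dumps({
  'cluster': cluster_spec,
  'task': {'type': 'worker', 'index': 0},   # this executor's job_name / task_index
  'environment': 'cloud',
})
os.environ['TF_CONFIG'] = tf_config
# TF_CONFIG -> {"cluster": {"master": [...], "ps": [...], "worker": [...]},
#               "task": {"type": "worker", "index": 0}, "environment": "cloud"}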

tensorflowonspark/gpu_info.py (20 additions, 8 deletions)

@@ -40,13 +40,14 @@ def _get_gpu():
   return gpu


-def get_gpus(num_gpu=1):
+def get_gpus(num_gpu=1, worker_index=-1):
  """Get list of free GPUs according to nvidia-smi.

  This will retry for ``MAX_RETRIES`` times until the requested number of GPUs are available.

  Args:
    :num_gpu: number of GPUs desired.
+    :worker_index: index "hint" for allocation of available GPUs.

  Returns:
    Comma-delimited string of GPU ids, or raises an Exception if the requested number of GPUs could not be found.

@@ -63,9 +64,6 @@ def parse_gpu(gpu_str):
    return cols[5].split(')')[0], cols[1].split(':')[0]
  gpu_list = [parse_gpu(gpu) for gpu in gpus]

-  # randomize the search order to get a better distribution of GPUs
-  random.shuffle(gpu_list)
-
  free_gpus = []
  retries = 0
  while len(free_gpus) < num_gpu and retries < MAX_RETRIES:

@@ -77,19 +75,33 @@ def parse_gpu(gpu_str):
        free_gpus.append(index)

    if len(free_gpus) < num_gpu:
-      # keep trying indefinitely
      logging.warn("Unable to find available GPUs: requested={0}, available={1}".format(num_gpu, len(free_gpus)))
      retries += 1
      time.sleep(30 * retries)
      free_gpus = []

-  # if still can't find GPUs, raise exception
+  logging.info("Available GPUs: {}".format(free_gpus))
+
+  # if still can't find available GPUs, raise exception
  if len(free_gpus) < num_gpu:
    smi_output = subprocess.check_output(["nvidia-smi", "--format=csv", "--query-compute-apps=gpu_uuid,pid,process_name,used_gpu_memory"]).decode()
    logging.info(": {0}".format(smi_output))
-    raise Exception("Unable to find free GPU:\n{0}".format(smi_output))
+    raise Exception("Unable to find {} free GPU(s)\n{}".format(num_gpu, smi_output))
+
+  # Get logical placement
+  num_available = len(free_gpus)
+  if worker_index == -1:
+    # use original random placement
+    random.shuffle(free_gpus)
+    proposed_gpus = free_gpus[:num_gpu]
+  else:
+    # ordered by worker index
+    if worker_index + num_gpu > num_available:
+      worker_index = worker_index % num_available
+    proposed_gpus = free_gpus[worker_index:(worker_index + num_gpu)]
+  logging.info("Proposed GPUs: {}".format(proposed_gpus))

-  return ','.join(free_gpus[:num_gpu])
+  return ','.join(str(x) for x in proposed_gpus)


 # Function to get the gpu information
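
The new placement step can be read in isolation as follows. `propose()` is a hypothetical helper that mirrors only the slicing logic added above (not the nvidia-smi polling), and the GPU ids are made up.

import random

def propose(free_gpus, num_gpu, worker_index=-1):
  """Mirror of the placement step in get_gpus(): random when no hint is given,
  otherwise a deterministic slice starting at the host-local worker index."""
  num_available = len(free_gpus)
  if worker_index == -1:
    random.shuffle(free_gpus)                    # original behavior: random placement
    return free_gpus[:num_gpu]
  if worker_index + num_gpu > num_available:
    worker_index = worker_index % num_available  # wrap around if the slice would run past the end
  return free_gpus[worker_index:(worker_index + num_gpu)]

free = ['0', '1', '2', '3']
print(propose(list(free), num_gpu=1, worker_index=0))   # ['0']
print(propose(list(free), num_gpu=1, worker_index=2))   # ['2'] -- deterministic per local worker index
print(propose(list(free), num_gpu=2, worker_index=2))   # ['2', '3']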
