@@ -146,11 +146,65 @@ def _mapfn(iter):
       executor_id = i
 
     # check that there are enough available GPUs (if using tensorflow-gpu) before committing reservation on this node
-    # note: for Spark 3+ w/ GPU allocation, the required number of GPUs should be guaranteed by the resource manager
-    if version.parse(pyspark.__version__).base_version < version.parse('3.0.0').base_version:
-      if gpu_info.is_gpu_available():
-        num_gpus = tf_args.num_gpus if 'num_gpus' in tf_args else 1
-        gpus_to_use = gpu_info.get_gpus(num_gpus)
+    def _get_gpus(cluster_spec=None):
+      gpus = []
+      is_k8s = 'SPARK_EXECUTOR_POD_IP' in os.environ
+
+      # handle explicitly configured tf_args.num_gpus
+      if 'num_gpus' in tf_args:
+        requested_gpus = tf_args.num_gpus
+        user_requested = True
+      else:
+        requested_gpus = 0
+        user_requested = False
+
+      # first, try Spark 3 resources API, returning all visible GPUs
+      # note: num_gpus arg is only used (if supplied) to limit/truncate visible devices
+      if version.parse(pyspark.__version__).base_version >= version.parse("3.0.0").base_version:
+        from pyspark import TaskContext
+        context = TaskContext()
+        if 'gpu' in context.resources():
+          # get all GPUs assigned by resource manager
+          gpus = context.resources()['gpu'].addresses
+          logger.info("Spark gpu resources: {}".format(gpus))
+          if user_requested:
+            if requested_gpus < len(gpus):
+              # override/truncate list, if explicitly configured
+              logger.warn("Requested {} GPU(s), but {} available".format(requested_gpus, len(gpus)))
+              gpus = gpus[:requested_gpus]
+          else:
+            # implicitly requested by Spark 3
+            requested_gpus = len(gpus)
+
+      # if not in K8s pod and GPUs available, just use original allocation code (defaulting to 1 GPU if available)
+      # Note: for K8s, there is a bug with the Nvidia device_plugin which can show GPUs for non-GPU pods that are hosted on GPU nodes
+      if not is_k8s and gpu_info.is_gpu_available() and not gpus:
+        # default to one GPU if not specified explicitly
+        requested_gpus = max(1, requested_gpus) if not user_requested else requested_gpus
+        if requested_gpus > 0:
+          if cluster_spec:
+            # compute my index relative to other nodes on the same host (for GPU allocation)
+            my_addr = cluster_spec[job_name][task_index]
+            my_host = my_addr.split(':')[0]
+            flattened = [v for sublist in cluster_spec.values() for v in sublist]
+            local_peers = [p for p in flattened if p.startswith(my_host)]
+            my_index = local_peers.index(my_addr)
+          else:
+            my_index = 0
+
+          # try to allocate a GPU
+          gpus = gpu_info.get_gpus(requested_gpus, my_index, format=gpu_info.AS_LIST)
+
+      if user_requested and len(gpus) < requested_gpus:
+        raise Exception("Unable to allocate {} GPU(s) from available GPUs: {}".format(requested_gpus, gpus))
+
+      gpus_to_use = ','.join(gpus)
+      if gpus:
+        logger.info("Requested {} GPU(s), setting CUDA_VISIBLE_DEVICES={}".format(requested_gpus if user_requested else len(gpus), gpus_to_use))
+        os.environ['CUDA_VISIBLE_DEVICES'] = gpus_to_use
+
+    # try GPU allocation at executor startup so we can fail fast if unsuccessful
+    _get_gpus()
 
     # assign TF job/task based on provided cluster_spec template (or use default/null values)
     job_name = 'default'
@@ -309,34 +363,7 @@ def _mapfn(iter):
 
     # reserve GPU(s) again, just before launching TF process (in case situation has changed)
     # and setup CUDA_VISIBLE_DEVICES accordingly
-    if gpu_info.is_gpu_available():
-
-      gpus_to_use = None
-      # For Spark 3+, try to get GPU resources from TaskContext first
-      if version.parse(pyspark.__version__).base_version >= version.parse("3.0.0").base_version:
-        from pyspark import TaskContext
-        context = TaskContext()
-        if 'gpu' in context.resources():
-          # use ALL GPUs assigned by resource manager
-          gpus = context.resources()['gpu'].addresses
-          num_gpus = len(gpus)
-          gpus_to_use = ','.join(gpus)
-
-      if not gpus_to_use:
-        # compute my index relative to other nodes on the same host (for GPU allocation)
-        my_addr = cluster_spec[job_name][task_index]
-        my_host = my_addr.split(':')[0]
-        flattened = [v for sublist in cluster_spec.values() for v in sublist]
-        local_peers = [p for p in flattened if p.startswith(my_host)]
-        my_index = local_peers.index(my_addr)
-
-        # default to one GPU if not specified explicitly
-        num_gpus = tf_args.num_gpus if 'num_gpus' in tf_args else 1
-        gpus_to_use = gpu_info.get_gpus(num_gpus, my_index)
-
-      gpu_str = "GPUs" if num_gpus > 1 else "GPU"
-      logger.info("Requested {} {}, setting CUDA_VISIBLE_DEVICES={}".format(num_gpus, gpu_str, gpus_to_use))
-      os.environ['CUDA_VISIBLE_DEVICES'] = gpus_to_use
+    _get_gpus(cluster_spec=cluster_spec)
 
     # create a context object to hold metadata for TF
     ctx = TFNodeContext(executor_id, job_name, task_index, cluster_spec, cluster_meta['default_fs'], cluster_meta['working_dir'], TFSparkNode.mgr)
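
Note: the Spark 3 branch of _get_gpus above relies on the stock TaskContext resources API, which only reports GPU addresses when the application was submitted with GPU resource settings (e.g. spark.task.resource.gpu.amount plus a discovery script). A minimal standalone sketch of that same pattern, independent of this patch and using a hypothetical train_fn, could look like:

  import os
  from pyspark import TaskContext

  def train_fn(partition):
    # read the GPU addresses assigned to this task by the resource manager, if any
    resources = TaskContext.get().resources()
    if 'gpu' in resources:
      addresses = resources['gpu'].addresses  # e.g. ['0', '1']
      os.environ['CUDA_VISIBLE_DEVICES'] = ','.join(addresses)
    # ... framework-specific training would run here ...
    return []

  # invoked from a Spark job, e.g.: rdd.mapPartitions(train_fn).collect()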
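
Note: the local-peer indexing in _get_gpus is easier to follow with concrete values; it gives each TF process on a shared host a distinct offset into that host's GPUs. A toy walkthrough with a hypothetical cluster_spec (two workers co-located on one host):

  # hypothetical cluster_spec: two workers sharing host 10.0.0.2
  cluster_spec = {'ps': ['10.0.0.1:2222'],
                  'worker': ['10.0.0.2:2222', '10.0.0.2:2223']}
  job_name, task_index = 'worker', 1

  my_addr = cluster_spec[job_name][task_index]   # '10.0.0.2:2223'
  my_host = my_addr.split(':')[0]                # '10.0.0.2'
  flattened = [v for sublist in cluster_spec.values() for v in sublist]
  local_peers = [p for p in flattened if p.startswith(my_host)]  # ['10.0.0.2:2222', '10.0.0.2:2223']
  my_index = local_peers.index(my_addr)          # 1, i.e. second GPU claimant on this host

Passing my_index to gpu_info.get_gpus is what lets co-located processes claim disjoint devices instead of all defaulting to the same GPU.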