Skip to content

Commit f7da938

Browse files
trivialfiswbo4958
andauthored
[backport][pyspark] Support stage-level scheduling (dmlc#9519) (dmlc#9686)
Co-authored-by: Bobby Wang <[email protected]>
1 parent 6ab6577 commit f7da938

File tree

2 files changed

+176
-51
lines changed

2 files changed

+176
-51
lines changed

python-package/xgboost/spark/core.py

Lines changed: 169 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222

2323
import numpy as np
2424
import pandas as pd
25-
from pyspark import SparkContext, cloudpickle
25+
from pyspark import RDD, SparkContext, cloudpickle
2626
from pyspark.ml import Estimator, Model
2727
from pyspark.ml.functions import array_to_vector, vector_to_array
2828
from pyspark.ml.linalg import VectorUDT
@@ -44,6 +44,7 @@
4444
MLWritable,
4545
MLWriter,
4646
)
47+
from pyspark.resource import ResourceProfileBuilder, TaskResourceRequests
4748
from pyspark.sql import Column, DataFrame
4849
from pyspark.sql.functions import col, countDistinct, pandas_udf, rand, struct
4950
from pyspark.sql.types import (
@@ -88,6 +89,7 @@
8889
_get_rabit_args,
8990
_get_spark_session,
9091
_is_local,
92+
_is_standalone_or_localcluster,
9193
deserialize_booster,
9294
deserialize_xgb_model,
9395
get_class_name,
@@ -342,6 +344,54 @@ def _gen_predict_params_dict(self) -> Dict[str, Any]:
342344
predict_params[param.name] = self.getOrDefault(param)
343345
return predict_params
344346

347+
def _validate_gpu_params(self) -> None:
348+
"""Validate the gpu parameters and gpu configurations"""
349+
350+
if use_cuda(self.getOrDefault(self.device)) or self.getOrDefault(self.use_gpu):
351+
ss = _get_spark_session()
352+
sc = ss.sparkContext
353+
354+
if _is_local(sc):
355+
# Support GPU training in Spark local mode is just for debugging
356+
# purposes, so it's okay for printing the below warning instead of
357+
# checking the real gpu numbers and raising the exception.
358+
get_logger(self.__class__.__name__).warning(
359+
"You have enabled GPU in spark local mode. Please make sure your"
360+
" local node has at least %d GPUs",
361+
self.getOrDefault(self.num_workers),
362+
)
363+
else:
364+
executor_gpus = sc.getConf().get("spark.executor.resource.gpu.amount")
365+
if executor_gpus is None:
366+
raise ValueError(
367+
"The `spark.executor.resource.gpu.amount` is required for training"
368+
" on GPU."
369+
)
370+
371+
if not (ss.version >= "3.4.0" and _is_standalone_or_localcluster(sc)):
372+
# We will enable stage-level scheduling in spark 3.4.0+ which doesn't
373+
# require spark.task.resource.gpu.amount to be set explicitly
374+
gpu_per_task = sc.getConf().get("spark.task.resource.gpu.amount")
375+
if gpu_per_task is not None:
376+
if float(gpu_per_task) < 1.0:
377+
raise ValueError(
378+
"XGBoost doesn't support GPU fractional configurations. "
379+
"Please set `spark.task.resource.gpu.amount=spark.executor"
380+
".resource.gpu.amount`"
381+
)
382+
383+
if float(gpu_per_task) > 1.0:
384+
get_logger(self.__class__.__name__).warning(
385+
"%s GPUs for each Spark task is configured, but each "
386+
"XGBoost training task uses only 1 GPU.",
387+
gpu_per_task,
388+
)
389+
else:
390+
raise ValueError(
391+
"The `spark.task.resource.gpu.amount` is required for training"
392+
" on GPU."
393+
)
394+
345395
def _validate_params(self) -> None:
346396
# pylint: disable=too-many-branches
347397
init_model = self.getOrDefault("xgb_model")
@@ -421,53 +471,7 @@ def _validate_params(self) -> None:
421471
"`pyspark.ml.linalg.Vector` type."
422472
)
423473

424-
if use_cuda(self.getOrDefault(self.device)) or self.getOrDefault(self.use_gpu):
425-
gpu_per_task = (
426-
_get_spark_session()
427-
.sparkContext.getConf()
428-
.get("spark.task.resource.gpu.amount")
429-
)
430-
431-
is_local = _is_local(_get_spark_session().sparkContext)
432-
433-
if is_local:
434-
# checking spark local mode.
435-
if gpu_per_task is not None:
436-
raise RuntimeError(
437-
"The spark local mode does not support gpu configuration."
438-
"Please remove spark.executor.resource.gpu.amount and "
439-
"spark.task.resource.gpu.amount"
440-
)
441-
442-
# Support GPU training in Spark local mode is just for debugging
443-
# purposes, so it's okay for printing the below warning instead of
444-
# checking the real gpu numbers and raising the exception.
445-
get_logger(self.__class__.__name__).warning(
446-
"You have enabled GPU in spark local mode. Please make sure your"
447-
" local node has at least %d GPUs",
448-
self.getOrDefault(self.num_workers),
449-
)
450-
else:
451-
# checking spark non-local mode.
452-
if gpu_per_task is not None:
453-
if float(gpu_per_task) < 1.0:
454-
raise ValueError(
455-
"XGBoost doesn't support GPU fractional configurations. "
456-
"Please set `spark.task.resource.gpu.amount=spark.executor"
457-
".resource.gpu.amount`"
458-
)
459-
460-
if float(gpu_per_task) > 1.0:
461-
get_logger(self.__class__.__name__).warning(
462-
"%s GPUs for each Spark task is configured, but each "
463-
"XGBoost training task uses only 1 GPU.",
464-
gpu_per_task,
465-
)
466-
else:
467-
raise ValueError(
468-
"The `spark.task.resource.gpu.amount` is required for training"
469-
" on GPU."
470-
)
474+
self._validate_gpu_params()
471475

472476

473477
def _validate_and_convert_feature_col_as_float_col_list(
@@ -592,6 +596,8 @@ def __init__(self) -> None:
592596
arbitrary_params_dict={},
593597
)
594598

599+
self.logger = get_logger(self.__class__.__name__)
600+
595601
def setParams(self, **kwargs: Any) -> None: # pylint: disable=invalid-name
596602
"""
597603
Set params for the estimator.
@@ -894,6 +900,116 @@ def _get_xgb_parameters(
894900

895901
return booster_params, train_call_kwargs_params, dmatrix_kwargs
896902

903+
def _skip_stage_level_scheduling(self) -> bool:
904+
# pylint: disable=too-many-return-statements
905+
"""Check if stage-level scheduling is not needed,
906+
return true to skip stage-level scheduling"""
907+
908+
if use_cuda(self.getOrDefault(self.device)) or self.getOrDefault(self.use_gpu):
909+
ss = _get_spark_session()
910+
sc = ss.sparkContext
911+
912+
if ss.version < "3.4.0":
913+
self.logger.info(
914+
"Stage-level scheduling in xgboost requires spark version 3.4.0+"
915+
)
916+
return True
917+
918+
if not _is_standalone_or_localcluster(sc):
919+
self.logger.info(
920+
"Stage-level scheduling in xgboost requires spark standalone or "
921+
"local-cluster mode"
922+
)
923+
return True
924+
925+
executor_cores = sc.getConf().get("spark.executor.cores")
926+
executor_gpus = sc.getConf().get("spark.executor.resource.gpu.amount")
927+
if executor_cores is None or executor_gpus is None:
928+
self.logger.info(
929+
"Stage-level scheduling in xgboost requires spark.executor.cores, "
930+
"spark.executor.resource.gpu.amount to be set."
931+
)
932+
return True
933+
934+
if int(executor_cores) == 1:
935+
# there will be only 1 task running at any time.
936+
self.logger.info(
937+
"Stage-level scheduling in xgboost requires spark.executor.cores > 1 "
938+
)
939+
return True
940+
941+
if int(executor_gpus) > 1:
942+
# For spark.executor.resource.gpu.amount > 1, we suppose user knows how to configure
943+
# to make xgboost run successfully.
944+
#
945+
self.logger.info(
946+
"Stage-level scheduling in xgboost will not work "
947+
"when spark.executor.resource.gpu.amount>1"
948+
)
949+
return True
950+
951+
task_gpu_amount = sc.getConf().get("spark.task.resource.gpu.amount")
952+
953+
if task_gpu_amount is None:
954+
# The ETL tasks will not grab a gpu when spark.task.resource.gpu.amount is not set,
955+
# but with stage-level scheduling, we can make training task grab the gpu.
956+
return False
957+
958+
if float(task_gpu_amount) == float(executor_gpus):
959+
# spark.executor.resource.gpu.amount=spark.task.resource.gpu.amount "
960+
# results in only 1 task running at a time, which may cause perf issue.
961+
return True
962+
963+
# We can enable stage-level scheduling
964+
return False
965+
966+
# CPU training doesn't require stage-level scheduling
967+
return True
968+
969+
def _try_stage_level_scheduling(self, rdd: RDD) -> RDD:
970+
"""Try to enable stage-level scheduling"""
971+
972+
if self._skip_stage_level_scheduling():
973+
return rdd
974+
975+
ss = _get_spark_session()
976+
977+
# executor_cores will not be None
978+
executor_cores = ss.sparkContext.getConf().get("spark.executor.cores")
979+
assert executor_cores is not None
980+
981+
# Spark-rapids is a project to leverage GPUs to accelerate spark SQL.
982+
# If spark-rapids is enabled, to avoid GPU OOM, we don't allow other
983+
# ETL gpu tasks running alongside training tasks.
984+
spark_plugins = ss.conf.get("spark.plugins", " ")
985+
assert spark_plugins is not None
986+
spark_rapids_sql_enabled = ss.conf.get("spark.rapids.sql.enabled", "true")
987+
assert spark_rapids_sql_enabled is not None
988+
989+
task_cores = (
990+
int(executor_cores)
991+
if "com.nvidia.spark.SQLPlugin" in spark_plugins
992+
and "true" == spark_rapids_sql_enabled.lower()
993+
else (int(executor_cores) // 2) + 1
994+
)
995+
996+
# Each training task requires cpu cores > total executor cores//2 + 1 which can
997+
# make sure the tasks be sent to different executors.
998+
#
999+
# Please note that we can't use GPU to limit the concurrent tasks because of
1000+
# https://issues.apache.org/jira/browse/SPARK-45527.
1001+
1002+
task_gpus = 1.0
1003+
treqs = TaskResourceRequests().cpus(task_cores).resource("gpu", task_gpus)
1004+
rp = ResourceProfileBuilder().require(treqs).build
1005+
1006+
self.logger.info(
1007+
"XGBoost training tasks require the resource(cores=%s, gpu=%s).",
1008+
task_cores,
1009+
task_gpus,
1010+
)
1011+
return rdd.withResources(rp)
1012+
8971013
def _fit(self, dataset: DataFrame) -> "_SparkXGBModel":
8981014
# pylint: disable=too-many-statements, too-many-locals
8991015
self._validate_params()
@@ -994,14 +1110,16 @@ def _train_booster(
9941110
)
9951111

9961112
def _run_job() -> Tuple[str, str]:
997-
ret = (
1113+
rdd = (
9981114
dataset.mapInPandas(
999-
_train_booster, schema="config string, booster string" # type: ignore
1115+
_train_booster, # type: ignore
1116+
schema="config string, booster string",
10001117
)
10011118
.rdd.barrier()
10021119
.mapPartitions(lambda x: x)
1003-
.collect()[0]
10041120
)
1121+
rdd_with_resource = self._try_stage_level_scheduling(rdd)
1122+
ret = rdd_with_resource.collect()[0]
10051123
return ret[0], ret[1]
10061124

10071125
get_logger("XGBoost-PySpark").info(

python-package/xgboost/spark/utils.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,13 @@ def _is_local(spark_context: SparkContext) -> bool:
129129
return spark_context._jsc.sc().isLocal()
130130

131131

132+
def _is_standalone_or_localcluster(spark_context: SparkContext) -> bool:
133+
master = spark_context.getConf().get("spark.master")
134+
return master is not None and (
135+
master.startswith("spark://") or master.startswith("local-cluster")
136+
)
137+
138+
132139
def _get_gpu_id(task_context: TaskContext) -> int:
133140
"""Get the gpu id from the task resources"""
134141
if task_context is None:

0 commit comments

Comments
 (0)