
Commit e75dd75

trivialfis and wbo4958 authored
[backport] [pyspark] support gpu transform (dmlc#9542) (dmlc#9559)
Co-authored-by: Bobby Wang <[email protected]>
1 parent 4d387cb, commit e75dd75

File tree: 5 files changed (+166 / -7 lines changed)


python-package/xgboost/compat.py

Lines changed: 12 additions & 0 deletions
@@ -88,6 +88,18 @@ def is_cudf_available() -> bool:
         return False
 
 
+def is_cupy_available() -> bool:
+    """Check cupy package available or not"""
+    if importlib.util.find_spec("cupy") is None:
+        return False
+    try:
+        import cupy
+
+        return True
+    except ImportError:
+        return False
+
+
 try:
     import scipy.sparse as scipy_sparse
     from scipy.sparse import csr_matrix as scipy_csr
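
For reference, the new helper mirrors the existing is_cudf_available pattern: probe the module spec first, then confirm the import actually succeeds. A minimal usage sketch of the availability check (the array values are illustrative, not part of the commit):

from xgboost.compat import is_cupy_available

if is_cupy_available():
    import cupy as cp  # safe: the spec lookup and trial import already passed

    values = cp.asarray([0.1, 0.2, 0.3])  # data lives on the GPU
else:
    import numpy as np

    values = np.asarray([0.1, 0.2, 0.3])  # fall back to a host array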

python-package/xgboost/spark/core.py

Lines changed: 92 additions & 4 deletions
@@ -59,7 +59,7 @@
 
 import xgboost
 from xgboost import XGBClassifier
-from xgboost.compat import is_cudf_available
+from xgboost.compat import is_cudf_available, is_cupy_available
 from xgboost.core import Booster, _check_distributed_params
 from xgboost.sklearn import DEFAULT_N_ESTIMATORS, XGBModel, _can_use_qdm
 from xgboost.training import train as worker_train
@@ -242,6 +242,13 @@ class _SparkXGBParams(
         TypeConverters.toList,
     )
 
+    def set_device(self, value: str) -> "_SparkXGBParams":
+        """Set device, optional value: cpu, cuda, gpu"""
+        _check_distributed_params({"device": value})
+        assert value in ("cpu", "cuda", "gpu")
+        self.set(self.device, value)
+        return self
+
     @classmethod
     def _xgb_cls(cls) -> Type[XGBModel]:
         """
@@ -1193,6 +1200,31 @@ def _post_transform(self, dataset: DataFrame, pred_col: Column) -> DataFrame:
             dataset = dataset.drop(pred_struct_col)
         return dataset
 
+    def _gpu_transform(self) -> bool:
+        """If gpu is used to do the prediction, true to gpu prediction"""
+
+        if _is_local(_get_spark_session().sparkContext):
+            # if it's local model, we just use the internal "device"
+            return use_cuda(self.getOrDefault(self.device))
+
+        gpu_per_task = (
+            _get_spark_session()
+            .sparkContext.getConf()
+            .get("spark.task.resource.gpu.amount")
+        )
+
+        # User don't set gpu configurations, just use cpu
+        if gpu_per_task is None:
+            if use_cuda(self.getOrDefault(self.device)):
+                get_logger("XGBoost-PySpark").warning(
+                    "Do the prediction on the CPUs since "
+                    "no gpu configurations are set"
+                )
+            return False
+
+        # User already sets the gpu configurations, we just use the internal "device".
+        return use_cuda(self.getOrDefault(self.device))
+
     def _transform(self, dataset: DataFrame) -> DataFrame:
         # pylint: disable=too-many-statements, too-many-locals
         # Save xgb_sklearn_model and predict_params to be local variable
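
On a non-local cluster, _gpu_transform keys off spark.task.resource.gpu.amount. A sketch of a session configured so the GPU path is actually taken; the resource amounts and the discovery-script path are assumptions, not part of this commit:

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config("spark.executor.resource.gpu.amount", "1")    # GPUs advertised per executor (assumed)
    .config("spark.task.resource.gpu.amount", "1")        # the key _gpu_transform reads
    .config("spark.executor.resource.gpu.discoveryScript", "./discover_gpu.sh")  # assumed path
    .getOrCreate()
)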
@@ -1216,21 +1248,77 @@ def _transform(self, dataset: DataFrame) -> DataFrame:
 
         _, schema = self._out_schema()
 
+        is_local = _is_local(_get_spark_session().sparkContext)
+        run_on_gpu = self._gpu_transform()
+
         @pandas_udf(schema)  # type: ignore
         def predict_udf(iterator: Iterator[pd.DataFrame]) -> Iterator[pd.Series]:
             assert xgb_sklearn_model is not None
             model = xgb_sklearn_model
+
+            from pyspark import TaskContext
+
+            context = TaskContext.get()
+            assert context is not None
+
+            dev_ordinal = -1
+
+            if is_cudf_available():
+                if is_local:
+                    if run_on_gpu and is_cupy_available():
+                        import cupy as cp  # pylint: disable=import-error
+
+                        total_gpus = cp.cuda.runtime.getDeviceCount()
+                        if total_gpus > 0:
+                            partition_id = context.partitionId()
+                            # For transform local mode, default the dev_ordinal to
+                            # (partition id) % gpus.
+                            dev_ordinal = partition_id % total_gpus
+                elif run_on_gpu:
+                    dev_ordinal = _get_gpu_id(context)
+
+                if dev_ordinal >= 0:
+                    device = "cuda:" + str(dev_ordinal)
+                    get_logger("XGBoost-PySpark").info(
+                        "Do the inference with device: %s", device
+                    )
+                    model.set_params(device=device)
+                else:
+                    get_logger("XGBoost-PySpark").info("Do the inference on the CPUs")
+            else:
+                msg = (
+                    "CUDF is unavailable, fallback the inference on the CPUs"
+                    if run_on_gpu
+                    else "Do the inference on the CPUs"
+                )
+                get_logger("XGBoost-PySpark").info(msg)
+
+            def to_gpu_if_possible(data: ArrayLike) -> ArrayLike:
+                """Move the data to gpu if possible"""
+                if dev_ordinal >= 0:
+                    import cudf  # pylint: disable=import-error
+                    import cupy as cp  # pylint: disable=import-error
+
+                    # We must set the device after import cudf, which will change the device id to 0
+                    # See https://github.com/rapidsai/cudf/issues/11386
+                    cp.cuda.runtime.setDevice(dev_ordinal)  # pylint: disable=I1101
+                    df = cudf.DataFrame(data)
+                    del data
+                    return df
+                return data
+
             for data in iterator:
                 if enable_sparse_data_optim:
                     X = _read_csr_matrix_from_unwrapped_spark_vec(data)
                 else:
                     if feature_col_names is not None:
-                        X = data[feature_col_names]
+                        tmp = data[feature_col_names]
                     else:
-                        X = stack_series(data[alias.data])
+                        tmp = stack_series(data[alias.data])
+                    X = to_gpu_if_possible(tmp)
 
                 if has_base_margin:
-                    base_margin = data[alias.margin].to_numpy()
+                    base_margin = to_gpu_if_possible(data[alias.margin])
                 else:
                     base_margin = None
 
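
A standalone sketch of the device-pinning pattern used by to_gpu_if_possible above, assuming cudf and cupy are installed and dev_ordinal names a valid GPU; the helper name move_to_gpu is hypothetical:

import cudf  # pylint: disable=import-error
import cupy as cp  # pylint: disable=import-error
import pandas as pd


def move_to_gpu(pdf: pd.DataFrame, dev_ordinal: int) -> cudf.DataFrame:
    """Copy a pandas DataFrame onto the GPU selected by dev_ordinal."""
    # Pin the device only after cudf has been imported; importing cudf resets
    # the current device to 0 (see https://github.com/rapidsai/cudf/issues/11386).
    cp.cuda.runtime.setDevice(dev_ordinal)
    return cudf.DataFrame(pdf)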

python-package/xgboost/spark/utils.py

Lines changed: 2 additions & 2 deletions
@@ -10,7 +10,7 @@
 from typing import Any, Callable, Dict, Optional, Set, Type
 
 import pyspark
-from pyspark import BarrierTaskContext, SparkContext, SparkFiles
+from pyspark import BarrierTaskContext, SparkContext, SparkFiles, TaskContext
 from pyspark.sql.session import SparkSession
 
 from xgboost import Booster, XGBModel, collective
@@ -129,7 +129,7 @@ def _is_local(spark_context: SparkContext) -> bool:
     return spark_context._jsc.sc().isLocal()
 
 
-def _get_gpu_id(task_context: BarrierTaskContext) -> int:
+def _get_gpu_id(task_context: TaskContext) -> int:
     """Get the gpu id from the task resources"""
     if task_context is None:
         # This is a safety check.
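
For context on why the signature widens to TaskContext: a regular (non-barrier) task context also exposes a resources() lookup, so the gpu id can be resolved from a plain prediction task, not only from a barrier training task. A rough sketch of that lookup; the "gpu" resource key and the -1 fallback are assumptions, not a copy of the upstream helper:

from pyspark import TaskContext


def gpu_id_from_context(context: TaskContext) -> int:
    """Return the first GPU address assigned to this task, or -1 if none."""
    resources = context.resources()  # Dict[str, ResourceInformation]
    if "gpu" not in resources:
        return -1
    return int(resources["gpu"].addresses[0])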

tests/test_distributed/test_gpu_with_spark/test_gpu_spark.py

Lines changed: 32 additions & 1 deletion
@@ -2,6 +2,7 @@
 import logging
 import subprocess
 
+import numpy as np
 import pytest
 import sklearn
 
@@ -13,7 +14,7 @@
 from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
 from pyspark.sql import SparkSession
 
-from xgboost.spark import SparkXGBClassifier, SparkXGBRegressor
+from xgboost.spark import SparkXGBClassifier, SparkXGBRegressor, SparkXGBRegressorModel
 
 gpu_discovery_script_path = "tests/test_distributed/test_gpu_with_spark/discover_gpu.sh"
 
@@ -242,3 +243,33 @@ def test_sparkxgb_regressor_feature_cols_with_gpu(spark_diabetes_dataset_feature
     evaluator = RegressionEvaluator(metricName="rmse")
     rmse = evaluator.evaluate(pred_result_df)
     assert rmse <= 65.0
+
+
+def test_gpu_transform(spark_diabetes_dataset) -> None:
+    regressor = SparkXGBRegressor(device="cuda", num_workers=num_workers)
+    train_df, test_df = spark_diabetes_dataset
+    model: SparkXGBRegressorModel = regressor.fit(train_df)
+
+    # The model trained with GPUs, and transform with GPU configurations.
+    assert model._gpu_transform()
+
+    model.set_device("cpu")
+    assert not model._gpu_transform()
+    # without error
+    cpu_rows = model.transform(test_df).select("prediction").collect()
+
+    regressor = SparkXGBRegressor(device="cpu", num_workers=num_workers)
+    model = regressor.fit(train_df)
+
+    # The model trained with CPUs. Even with GPU configurations,
+    # still prefer transforming with CPUs
+    assert not model._gpu_transform()
+
+    # Set gpu transform explicitly.
+    model.set_device("cuda")
+    assert model._gpu_transform()
+    # without error
+    gpu_rows = model.transform(test_df).select("prediction").collect()
+
+    for cpu, gpu in zip(cpu_rows, gpu_rows):
+        np.testing.assert_allclose(cpu.prediction, gpu.prediction, atol=1e-3)

tests/test_distributed/test_with_spark/test_spark_local.py

Lines changed: 28 additions & 0 deletions
@@ -888,6 +888,34 @@ def test_device_param(self, reg_data: RegData, clf_data: ClfData) -> None:
         clf = SparkXGBClassifier(device="cuda")
         clf._validate_params()
 
+    def test_gpu_transform(self, clf_data: ClfData) -> None:
+        """local mode"""
+        classifier = SparkXGBClassifier(device="cpu")
+        model: SparkXGBClassifierModel = classifier.fit(clf_data.cls_df_train)
+
+        with tempfile.TemporaryDirectory() as tmpdir:
+            path = "file:" + tmpdir
+            model.write().overwrite().save(path)
+
+            # The model trained with CPU, transform defaults to cpu
+            assert not model._gpu_transform()
+
+            # without error
+            model.transform(clf_data.cls_df_test).collect()
+
+            model.set_device("cuda")
+            assert model._gpu_transform()
+
+            model_loaded = SparkXGBClassifierModel.load(path)
+
+            # The model trained with CPU, transform defaults to cpu
+            assert not model_loaded._gpu_transform()
+            # without error
+            model_loaded.transform(clf_data.cls_df_test).collect()
+
+            model_loaded.set_device("cuda")
+            assert model_loaded._gpu_transform()
+
 
 class XgboostLocalTest(SparkTestCase):
     def setUp(self):
