
Commit 97a5b08

[pyspark] Use quantile dmatrix. (dmlc#8284)
1 parent ce0382d commit 97a5b08

File tree

9 files changed: +226, -121 lines changed


doc/parameter.rst

Lines changed: 1 addition & 1 deletion
@@ -349,7 +349,7 @@ Specify the learning task and the corresponding learning objective. The objectiv
 - ``reg:squaredlogerror``: regression with squared log loss :math:`\frac{1}{2}[log(pred + 1) - log(label + 1)]^2`. All input labels are required to be greater than -1. Also, see metric ``rmsle`` for possible issue with this objective.
 - ``reg:logistic``: logistic regression.
 - ``reg:pseudohubererror``: regression with Pseudo Huber loss, a twice differentiable alternative to absolute loss.
-- ``reg:absoluteerror``: Regression with L1 error. When tree model is used, leaf value is refreshed after tree construction.
+- ``reg:absoluteerror``: Regression with L1 error. When tree model is used, leaf value is refreshed after tree construction. If used in distributed training, the leaf value is calculated as the mean value from all workers, which is not guaranteed to be optimal.
 - ``binary:logistic``: logistic regression for binary classification, output probability
 - ``binary:logitraw``: logistic regression for binary classification, output score before logistic transformation
 - ``binary:hinge``: hinge loss for binary classification. This makes predictions of 0 or 1, rather than producing probabilities.
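
For context, the documented objective can be exercised with the standard training API. A minimal sketch with synthetic data (illustrative only; it assumes an XGBoost build that ships ``reg:absoluteerror``, i.e. 1.7 or later):

import numpy as np
import xgboost as xgb

# Synthetic regression data, purely illustrative.
rng = np.random.default_rng(0)
X = rng.normal(size=(256, 8))
y = X[:, 0] + rng.normal(scale=0.1, size=256)

dtrain = xgb.DMatrix(X, label=y)
booster = xgb.train(
    {"objective": "reg:absoluteerror", "tree_method": "hist"},
    dtrain,
    num_boost_round=10,
)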

python-package/xgboost/core.py

Lines changed: 31 additions & 12 deletions
@@ -105,6 +105,11 @@ def from_cstr_to_pystr(data: CStrPptr, length: c_bst_ulong) -> List[str]:
     return res
 
 
+def make_jcargs(**kwargs: Any) -> bytes:
+    "Make JSON-based arguments for C functions."
+    return from_pystr_to_cstr(json.dumps(kwargs))
+
+
 IterRange = TypeVar("IterRange", Optional[Tuple[int, int]], Tuple[int, int])
 
 
@@ -1256,7 +1261,7 @@ def __init__(self) -> None:  # pylint: disable=super-init-not-called
     def _set_data_from_cuda_interface(self, data: DataType) -> None:
         """Set data from CUDA array interface."""
         interface = data.__cuda_array_interface__
-        interface_str = bytes(json.dumps(interface, indent=2), "utf-8")
+        interface_str = bytes(json.dumps(interface), "utf-8")
         _check_call(
             _LIB.XGProxyDMatrixSetDataCudaArrayInterface(self.handle, interface_str)
         )
@@ -1357,6 +1362,26 @@ def __init__(  # pylint: disable=super-init-not-called
                 "Only one of the eval_qid or eval_group for each evaluation "
                 "dataset should be provided."
             )
+        if isinstance(data, DataIter):
+            if any(
+                info is not None
+                for info in (
+                    label,
+                    weight,
+                    base_margin,
+                    feature_names,
+                    feature_types,
+                    group,
+                    qid,
+                    label_lower_bound,
+                    label_upper_bound,
+                    feature_weights,
+                )
+            ):
+                raise ValueError(
+                    "If data iterator is used as input, data like label should be "
+                    "specified as batch argument."
+                )
 
         self._init(
             data,
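
The new check above means that when a ``DataIter`` is passed, labels and other metadata must be supplied per batch through the ``input_data`` callback rather than to the constructor. A minimal sketch of that pattern (the iterator class and data below are illustrative; ``xgboost.DataIter`` and ``xgboost.QuantileDMatrix`` are the public API the guard applies to):

import numpy as np
import xgboost as xgb

class BatchIter(xgb.DataIter):
    # Hypothetical iterator: each batch carries its own label.
    def __init__(self, batches):
        self._batches = batches  # list of (X, y) pairs
        self._it = 0
        super().__init__()

    def next(self, input_data):
        if self._it == len(self._batches):
            return 0  # signal: no more batches
        X, y = self._batches[self._it]
        input_data(data=X, label=y)  # metadata goes in as a batch argument
        self._it += 1
        return 1

    def reset(self):
        self._it = 0

rng = np.random.default_rng(0)
batches = [(rng.normal(size=(64, 4)), rng.normal(size=64)) for _ in range(4)]
Xy = xgb.QuantileDMatrix(BatchIter(batches))
# QuantileDMatrix(BatchIter(batches), label=...) would now raise the ValueError above.
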
@@ -1405,12 +1430,9 @@ def _init(
                 "in iterator to fix this error."
             )
 
-        args = {
-            "nthread": self.nthread,
-            "missing": self.missing,
-            "max_bin": self.max_bin,
-        }
-        config = from_pystr_to_cstr(json.dumps(args))
+        config = make_jcargs(
+            nthread=self.nthread, missing=self.missing, max_bin=self.max_bin
+        )
         ret = _LIB.XGQuantileDMatrixCreateFromCallback(
             None,
             it.proxy.handle,
@@ -2375,7 +2397,7 @@ def save_raw(self, raw_format: str = "deprecated") -> bytearray:
         """
         length = c_bst_ulong()
         cptr = ctypes.POINTER(ctypes.c_char)()
-        config = from_pystr_to_cstr(json.dumps({"format": raw_format}))
+        config = make_jcargs(format=raw_format)
         _check_call(
             _LIB.XGBoosterSaveModelToBuffer(
                 self.handle, config, ctypes.byref(length), ctypes.byref(cptr)
@@ -2570,9 +2592,6 @@ def get_score(
         `n_classes`, otherwise they're scalars.
         """
         fmap = os.fspath(os.path.expanduser(fmap))
-        args = from_pystr_to_cstr(
-            json.dumps({"importance_type": importance_type, "feature_map": fmap})
-        )
         features = ctypes.POINTER(ctypes.c_char_p)()
         scores = ctypes.POINTER(ctypes.c_float)()
         n_out_features = c_bst_ulong()
@@ -2582,7 +2601,7 @@ def get_score(
         _check_call(
             _LIB.XGBoosterFeatureScore(
                 self.handle,
-                args,
+                make_jcargs(importance_type=importance_type, feature_map=fmap),
                 ctypes.byref(n_out_features),
                 ctypes.byref(features),
                 ctypes.byref(out_dim),
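
All of the call sites touched above now build their JSON configuration through the new ``make_jcargs`` helper. A standalone approximation of what it produces (the function name below is only a sketch; the real helper routes through the library's ``from_pystr_to_cstr``):

import json

def make_jcargs_sketch(**kwargs):
    # Keyword arguments become a UTF-8 encoded JSON config for the C API.
    return json.dumps(kwargs).encode("utf-8")

# e.g. save_raw("ubj") ends up passing something like b'{"format": "ubj"}'.
config = make_jcargs_sketch(format="ubj")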

python-package/xgboost/dask.py

Lines changed: 9 additions & 4 deletions
@@ -573,6 +573,7 @@ def __init__(
         label_upper_bound: Optional[List[Any]] = None,
         feature_names: Optional[FeatureNames] = None,
         feature_types: Optional[Union[Any, List[Any]]] = None,
+        feature_weights: Optional[Any] = None,
     ) -> None:
         self._data = data
         self._label = label
@@ -583,6 +584,7 @@ def __init__(
         self._label_upper_bound = label_upper_bound
         self._feature_names = feature_names
         self._feature_types = feature_types
+        self._feature_weights = feature_weights
 
         assert isinstance(self._data, collections.abc.Sequence)
 
@@ -633,6 +635,7 @@ def next(self, input_data: Callable) -> int:
             label_upper_bound=self._get("_label_upper_bound"),
             feature_names=feature_names,
             feature_types=self._feature_types,
+            feature_weights=self._feature_weights,
         )
         self._iter += 1
         return 1
@@ -731,19 +734,21 @@ def _create_quantile_dmatrix(
         return d
 
     unzipped_dict = _get_worker_parts(parts)
-    it = DaskPartitionIter(**unzipped_dict)
+    it = DaskPartitionIter(
+        **unzipped_dict,
+        feature_types=feature_types,
+        feature_names=feature_names,
+        feature_weights=feature_weights,
+    )
 
     dmatrix = QuantileDMatrix(
         it,
         missing=missing,
-        feature_names=feature_names,
-        feature_types=feature_types,
         nthread=nthread,
         max_bin=max_bin,
         ref=ref,
         enable_categorical=enable_categorical,
     )
-    dmatrix.set_info(feature_weights=feature_weights)
     return dmatrix
 
 
python-package/xgboost/spark/core.py

Lines changed: 7 additions & 8 deletions
@@ -747,6 +747,7 @@ def _fit(self, dataset):
             k: v for k, v in train_call_kwargs_params.items() if v is not None
         }
         dmatrix_kwargs = {k: v for k, v in dmatrix_kwargs.items() if v is not None}
+        use_qdm = booster_params.get("tree_method") in ("hist", "gpu_hist")
 
         def _train_booster(pandas_df_iter):
             """Takes in an RDD partition and outputs a booster for that partition after
@@ -759,20 +760,17 @@ def _train_booster(pandas_df_iter):
             context.barrier()
 
             gpu_id = None
+
+            if use_qdm and (booster_params.get("max_bin", None) is not None):
+                dmatrix_kwargs["max_bin"] = booster_params["max_bin"]
+
             if use_gpu:
                 gpu_id = context.partitionId() if is_local else _get_gpu_id(context)
                 booster_params["gpu_id"] = gpu_id
 
-            # max_bin is needed for qdm
-            if (
-                features_cols_names is not None
-                and booster_params.get("max_bin", None) is not None
-            ):
-                dmatrix_kwargs["max_bin"] = booster_params["max_bin"]
-
             _rabit_args = {}
             if context.partitionId() == 0:
-                get_logger("XGBoostPySpark").info(
+                get_logger("XGBoostPySpark").debug(
                     "booster params: %s\n"
                     "train_call_kwargs_params: %s\n"
                     "dmatrix_kwargs: %s",
@@ -791,6 +789,7 @@ def _train_booster(pandas_df_iter):
                 pandas_df_iter,
                 features_cols_names,
                 gpu_id,
+                use_qdm,
                 dmatrix_kwargs,
                 enable_sparse_data_optim=enable_sparse_data_optim,
                 has_validation_col=has_validation_col,
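
The gating added in this file is small enough to restate on its own: a QuantileDMatrix is only used when a histogram-based tree method is selected, and ``max_bin`` is copied into the DMatrix kwargs so the quantile sketch matches the booster configuration. A sketch of the flag (the helper name is illustrative; the tuple of tree methods comes straight from the diff):

def _use_quantile_dmatrix(booster_params: dict) -> bool:
    # Mirrors the use_qdm flag computed in _fit.
    return booster_params.get("tree_method") in ("hist", "gpu_hist")

assert _use_quantile_dmatrix({"tree_method": "hist"})
assert _use_quantile_dmatrix({"tree_method": "gpu_hist"})
assert not _use_quantile_dmatrix({"tree_method": "approx"})
assert not _use_quantile_dmatrix({})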
