Skip to content

Commit 97f0d47

Browse files
authored
KNN SPMD python interfaces (#1208)
* initial draft of python knn SPMD interfaces * conditionally adding responses to regression * forgot this * trying moving get_queue * reverting last and restoring cpp to test * trying just infer * resolved issue and examples run * removing commented lines * ci update * reverting intel-dpcpp-cpp-compiler version change * reverting * reverting several commits... * addressing comments * flake8 * addressing comments * flake8 * specifying onedal version for spmd neighbors setup * temporary commit for debugging external CI fails * printing import error * adding conditional to neighbors.cp * adding version import * removing all logging and re-adding onedal version conditional in setup * addressing last comments * removing comm size warnings and cleaning up examples * example rename * update run_examples.py after rebase * flake8 * addressing non gpu spmd call, better multiline string
1 parent 2405630 commit 97f0d47

File tree

13 files changed

+283
-10
lines changed

13 files changed

+283
-10
lines changed
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
#===============================================================================
2+
# Copyright 2023 Intel Corporation
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
#===============================================================================
16+
17+
import numpy as np
18+
from sklearn.metrics import accuracy_score
19+
from warnings import warn
20+
from mpi4py import MPI
21+
import dpctl
22+
from sklearnex.spmd.neighbors import KNeighborsClassifier
23+
24+
25+
def generate_X_y(par, seed):
26+
ns, nf = par['ns'], par['nf']
27+
28+
drng = np.random.default_rng(seed)
29+
data = drng.uniform(-1, 1, size=(ns, nf))
30+
resp = (data > 0) @ (2 ** np.arange(nf))
31+
32+
return data, resp
33+
34+
35+
comm = MPI.COMM_WORLD
36+
rank = comm.Get_rank()
37+
size = comm.Get_size()
38+
39+
if dpctl.has_gpu_devices:
40+
q = dpctl.SyclQueue("gpu")
41+
else:
42+
raise RuntimeError('GPU devices unavailable. Currently, '
43+
'SPMD execution mode is implemented only for this device type.')
44+
45+
params_train = {'ns': 100000, 'nf': 8}
46+
params_test = {'ns': 100, 'nf': 8}
47+
48+
X_train, y_train = generate_X_y(params_train, rank)
49+
X_test, y_test = generate_X_y(params_test, rank + 99)
50+
51+
model_spmd = KNeighborsClassifier(algorithm='brute',
52+
n_neighbors=20,
53+
weights='uniform',
54+
p=2,
55+
metric='minkowski')
56+
model_spmd.fit(X_train, y_train, queue=q)
57+
58+
y_predict = model_spmd.predict(X_test, queue=q)
59+
60+
print("Brute Force Distributed kNN classification results:")
61+
print("Ground truth (first 5 observations on rank {}):\n{}".format(rank, y_test[:5]))
62+
print("Classification results (first 5 observations on rank {}):\n{}"
63+
.format(rank, y_predict[:5]))
64+
print("Accuracy for entire rank {} (256 classes): {}\n"
65+
.format(rank, accuracy_score(y_test, y_predict)))
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
#===============================================================================
2+
# Copyright 2023 Intel Corporation
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
#===============================================================================
16+
17+
import numpy as np
18+
from sklearn.metrics import mean_squared_error
19+
from warnings import warn
20+
from mpi4py import MPI
21+
import dpctl
22+
from numpy.testing import assert_allclose
23+
from sklearnex.spmd.neighbors import KNeighborsRegressor
24+
25+
26+
def generate_X_y(par, coef_seed, data_seed):
27+
ns, nf = par['ns'], par['nf']
28+
29+
crng = np.random.default_rng(coef_seed)
30+
coef = crng.uniform(-10, 10, size=(nf,))
31+
32+
drng = np.random.default_rng(data_seed)
33+
data = drng.uniform(-100, 100, size=(ns, nf))
34+
resp = data @ coef
35+
36+
return data, resp, coef
37+
38+
39+
comm = MPI.COMM_WORLD
40+
rank = comm.Get_rank()
41+
size = comm.Get_size()
42+
43+
if dpctl.has_gpu_devices:
44+
q = dpctl.SyclQueue("gpu")
45+
else:
46+
raise RuntimeError('GPU devices unavailable. Currently, '
47+
'SPMD execution mode is implemented only for this device type.')
48+
49+
params_train = {'ns': 1000000, 'nf': 3}
50+
params_test = {'ns': 100, 'nf': 3}
51+
52+
X_train, y_train, coef_train = generate_X_y(params_train, 10, rank)
53+
X_test, y_test, coef_test = generate_X_y(params_test, 10, rank + 99)
54+
55+
assert_allclose(coef_train, coef_test)
56+
57+
model_spmd = KNeighborsRegressor(algorithm='brute',
58+
n_neighbors=5,
59+
weights='uniform',
60+
p=2,
61+
metric='minkowski')
62+
model_spmd.fit(X_train, y_train, queue=q)
63+
64+
y_predict = model_spmd.predict(X_test, queue=q)
65+
66+
print("Brute Force Distributed kNN regression results:")
67+
print("Ground truth (first 5 observations on rank {}):\n{}".format(rank, y_test[:5]))
68+
print("Regression results (first 5 observations on rank {}):\n{}"
69+
.format(rank, y_predict[:5]))
70+
print("RMSE for entire rank {}: {}\n"
71+
.format(rank, mean_squared_error(y_test, y_predict, squared=False)))

onedal/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,4 +49,5 @@
4949
__all__ += ['basic_statistics', 'linear_model']
5050

5151
if _is_dpc_backend:
52-
__all__ += ['spmd.basic_statistics', 'spmd.decomposition', 'spmd.linear_model',]
52+
__all__ += ['spmd.basic_statistics', 'spmd.decomposition',
53+
'spmd.linear_model', 'spmd.neighbors']

onedal/neighbors/neighbors.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#include "oneapi/dal/algo/knn.hpp"
1818

1919
#include "onedal/common.hpp"
20+
#include "onedal/version.hpp"
2021
#include "onedal/primitives/pairwise_distances.hpp"
2122
#include <regex>
2223

@@ -313,8 +314,13 @@ ONEDAL_PY_INIT_MODULE(neighbors) {
313314
using task_list = types<task::classification, task::regression, task::search>;
314315
auto sub = m.def_submodule("neighbors");
315316

317+
#if defined(ONEDAL_DATA_PARALLEL_SPMD) && defined(ONEDAL_VERSION) && ONEDAL_VERSION >= 20230100
318+
ONEDAL_PY_INSTANTIATE(init_train_ops, sub, policy_list_spmd, task_list);
319+
ONEDAL_PY_INSTANTIATE(init_infer_ops, sub, policy_list_spmd, task_list);
320+
#else // defined(ONEDAL_DATA_PARALLEL_SPMD) && defined(ONEDAL_VERSION) && ONEDAL_VERSION >= 20230100
316321
ONEDAL_PY_INSTANTIATE(init_train_ops, sub, policy_list, task_list);
317322
ONEDAL_PY_INSTANTIATE(init_infer_ops, sub, policy_list, task_list);
323+
#endif // defined(ONEDAL_DATA_PARALLEL_SPMD) && defined(ONEDAL_VERSION) && ONEDAL_VERSION >= 20230100
318324

319325
ONEDAL_PY_INSTANTIATE(init_model, sub, task_list);
320326
ONEDAL_PY_INSTANTIATE(init_train_result, sub, task_list);

onedal/neighbors/neighbors.py

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,9 @@
4444

4545

4646
class NeighborsCommonBase(metaclass=ABCMeta):
47+
def _get_policy(self, queue, *data):
48+
return _get_policy(queue, *data)
49+
4750
def _parse_auto_method(self, method, n_samples, n_features):
4851
result_method = method
4952

@@ -402,7 +405,7 @@ def _onedal_fit(self, X, y, queue):
402405

403406
return train_alg(**params).compute(X, y).model
404407

405-
policy = _get_policy(queue, X, y)
408+
policy = self._get_policy(queue, X, y)
406409
X, y = _convert_to_supported(policy, X, y)
407410
params = self._get_onedal_params(X, y)
408411
train_alg = _backend.neighbors.classification.train(policy, params,
@@ -421,7 +424,7 @@ def _onedal_predict(self, model, X, params, queue):
421424

422425
return predict_alg(**params).compute(X, model)
423426

424-
policy = _get_policy(queue, X)
427+
policy = self._get_policy(queue, X)
425428
X = _convert_to_supported(policy, X)
426429
if hasattr(self, '_onedal_model'):
427430
model = self._onedal_model
@@ -549,7 +552,8 @@ def _onedal_fit(self, X, y, queue):
549552

550553
return train_alg(**params).compute(X, y).model
551554

552-
policy = _get_policy(queue, X, y)
555+
policy = self._get_policy(queue, X, y)
556+
X, y = _convert_to_supported(policy, X, y)
553557
params = self._get_onedal_params(X, y)
554558
train_alg_regr = _backend.neighbors.regression.train
555559
train_alg_srch = _backend.neighbors.search.train
@@ -568,7 +572,8 @@ def _onedal_predict(self, model, X, params, queue):
568572

569573
return predict_alg(**params).compute(X, model)
570574

571-
policy = _get_policy(queue, X)
575+
policy = self._get_policy(queue, X)
576+
X = _convert_to_supported(policy, X)
572577
backend = _backend.neighbors.regression if gpu_device \
573578
else _backend.neighbors.search
574579

@@ -678,7 +683,8 @@ def _onedal_fit(self, X, y, queue):
678683

679684
return train_alg(**params).compute(X, y).model
680685

681-
policy = _get_policy(queue, X, y)
686+
policy = self._get_policy(queue, X, y)
687+
X, y = _convert_to_supported(policy, X, y)
682688
params = self._get_onedal_params(X, y)
683689
train_alg = _backend.neighbors.search.train(policy, params,
684690
to_table(X))
@@ -696,7 +702,8 @@ def _onedal_predict(self, model, X, params, queue):
696702

697703
return predict_alg(**params).compute(X, model)
698704

699-
policy = _get_policy(queue, X)
705+
policy = self._get_policy(queue, X)
706+
X = _convert_to_supported(policy, X)
700707
if hasattr(self, '_onedal_model'):
701708
model = self._onedal_model
702709
else:

onedal/spmd/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,4 +14,4 @@
1414
# limitations under the License.
1515
#===============================================================================
1616

17-
__all__ = ['basic_statistics', 'decomposition', 'linear_model']
17+
__all__ = ['basic_statistics', 'decomposition', 'linear_model', 'neighbors']

onedal/spmd/neighbors/__init__.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
#===============================================================================
2+
# Copyright 2023 Intel Corporation
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
#===============================================================================
16+
17+
from .neighbors import KNeighborsClassifier, KNeighborsRegressor, NearestNeighbors
18+
19+
__all__ = ['KNeighborsClassifier', 'KNeighborsRegressor', 'NearestNeighbors']

onedal/spmd/neighbors/neighbors.py

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
#===============================================================================
2+
# Copyright 2023 Intel Corporation
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
#===============================================================================
16+
17+
from abc import ABC
18+
from ...common._spmd_policy import _get_spmd_policy
19+
from onedal.neighbors import KNeighborsClassifier as KNeighborsClassifier_Batch
20+
from onedal.neighbors import KNeighborsRegressor as KNeighborsRegressor_Batch
21+
22+
23+
class NeighborsCommonBaseSPMD(ABC):
24+
def _get_policy(self, queue, *data):
25+
return _get_spmd_policy(queue)
26+
27+
28+
class KNeighborsClassifier(NeighborsCommonBaseSPMD, KNeighborsClassifier_Batch):
29+
def predict_proba(self, X, queue=None):
30+
raise NotImplementedError("predict_proba not supported in distributed mode.")
31+
32+
33+
class KNeighborsRegressor(NeighborsCommonBaseSPMD, KNeighborsRegressor_Batch):
34+
def fit(self, X, y, queue=None):
35+
if queue is not None and queue.sycl_device.is_gpu:
36+
return super()._fit(X, y, queue=queue)
37+
else:
38+
raise ValueError('SPMD version of kNN is not implemented for '
39+
'CPU. Consider running on it on GPU.')
40+
41+
def predict(self, X, queue=None):
42+
return self._predict_gpu(X, queue=queue)
43+
44+
def _get_onedal_params(self, X, y=None):
45+
params = super()._get_onedal_params(X, y)
46+
if 'responses' not in params['result_option']:
47+
params['result_option'] += '|responses'
48+
return params
49+
50+
51+
class NearestNeighbors(NeighborsCommonBaseSPMD):
52+
pass

setup.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -478,7 +478,9 @@ def run(self):
478478
'onedal.spmd.basic_statistics',
479479
'onedal.spmd.decomposition',
480480
'onedal.spmd.linear_model'
481-
] if build_distribute else [])),
481+
] + (['onedal.spmd.neighbors']
482+
if ONEDAL_VERSION >= 20230100 else [])
483+
if build_distribute else [])),
482484
package_data={
483485
'daal4py.oneapi': [
484486
'liboneapi_backend.so',

sklearnex/spmd/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,4 +14,4 @@
1414
# limitations under the License.
1515
#===============================================================================
1616

17-
__all__ = ['basic_statistics', 'decomposition', 'linear_model']
17+
__all__ = ['basic_statistics', 'decomposition', 'linear_model', 'neighbors']

0 commit comments

Comments
 (0)