Skip to content

Commit c214145

Browse files
authored
ENH: SPMD interface for IncrementalEmpiricalCovariance (#1941)
1 parent 8229ded commit c214145

File tree

11 files changed

+398
-15
lines changed

11 files changed

+398
-15
lines changed
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
# ===============================================================================
2+
# Copyright 2024 Intel Corporation
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
# ===============================================================================
16+
17+
import dpctl
18+
import dpctl.tensor as dpt
19+
import numpy as np
20+
from mpi4py import MPI
21+
22+
from sklearnex.spmd.covariance import IncrementalEmpiricalCovariance
23+
24+
25+
def get_local_data(data, comm):
    """Return the contiguous slice of rows of ``data`` assigned to this rank.

    Rows are handed out in equal chunks of ``ceil(n_rows / n_ranks)`` rows,
    in rank order. When the row count is not a multiple of the communicator
    size, the trailing rank(s) receive a shorter, possibly empty, slice.

    Parameters
    ----------
    data : array-like with a ``shape`` attribute, sliceable along axis 0
        The full dataset, identical on every rank.
    comm : object exposing ``Get_rank()`` and ``Get_size()``
        Communicator used to determine which slice belongs to the caller.

    Returns
    -------
    The rows ``data[start : start + chunk]`` owned by the calling rank.
    """
    world_size = comm.Get_size()
    my_rank = comm.Get_rank()
    # Ceiling division expressed through floor division of the negation.
    chunk = -(-data.shape[0] // world_size)
    start = my_rank * chunk
    return data[start : start + chunk]
30+
31+
32+
# A SYCL queue and an MPI communicator are created so that the computation
# can be performed on multiple GPUs.

q = dpctl.SyclQueue("gpu")
comm = MPI.COMM_WORLD

# Problem setup: the same seeded random dataset is generated on every rank.
num_batches = 2
seed = 77
num_samples, num_features = 3000, 3
drng = np.random.default_rng(seed)
X = drng.random(size=(num_samples, num_features))

# Each rank takes its own slice of the data and splits it into batches.

X_local = get_local_data(X, comm)
X_split = np.array_split(X_local, num_batches)

cov = IncrementalEmpiricalCovariance()

# partial_fit is called once per batch on every rank; each batch is first
# moved to device memory associated with the SYCL queue.

for batch in X_split:
    dpt_X = dpt.asarray(batch, usm_type="device", sycl_queue=q)
    cov.partial_fit(dpt_X)

# Results are finalized lazily, when they are first requested, as in the
# non-SPMD incremental estimators.

print(f"Computed covariance values on rank {comm.Get_rank()}:\n", cov.covariance_)

onedal/covariance/covariance.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,9 +175,11 @@ ONEDAL_PY_INIT_MODULE(covariance) {
175175
using namespace dal::covariance;
176176

177177
auto sub = m.def_submodule("covariance");
178+
178179
#ifdef ONEDAL_DATA_PARALLEL_SPMD
179180
ONEDAL_PY_INSTANTIATE(init_compute_ops, sub, policy_spmd, task::compute);
180-
#else
181+
ONEDAL_PY_INSTANTIATE(init_finalize_compute_ops, sub, policy_spmd, task::compute);
182+
#else
181183
ONEDAL_PY_INSTANTIATE(init_compute_ops, sub, policy_list, task::compute);
182184
ONEDAL_PY_INSTANTIATE(init_partial_compute_ops, sub, policy_list, task::compute);
183185
ONEDAL_PY_INSTANTIATE(init_finalize_compute_ops, sub, policy_list, task::compute);

onedal/covariance/incremental_covariance.py

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,7 @@
1515
# ===============================================================================
1616
import numpy as np
1717

18-
from daal4py.sklearn._utils import daal_check_version, get_dtype, make2d
19-
from onedal import _backend
18+
from daal4py.sklearn._utils import daal_check_version, get_dtype
2019

2120
from ..datatypes import _convert_to_supported, from_table, to_table
2221
from ..utils import _check_array
@@ -86,10 +85,11 @@ def partial_fit(self, X, y=None, queue=None):
8685
"""
8786
X = _check_array(X, dtype=[np.float64, np.float32], ensure_2d=True)
8887

89-
if not hasattr(self, "_policy"):
90-
self._policy = self._get_policy(queue, X)
88+
self._queue = queue
9189

92-
X = _convert_to_supported(self._policy, X)
90+
policy = self._get_policy(queue, X)
91+
92+
X = _convert_to_supported(policy, X)
9393

9494
if not hasattr(self, "_dtype"):
9595
self._dtype = get_dtype(X)
@@ -100,7 +100,7 @@ def partial_fit(self, X, y=None, queue=None):
100100
"covariance",
101101
None,
102102
"partial_compute",
103-
self._policy,
103+
policy,
104104
params,
105105
self._partial_result,
106106
table_X,
@@ -114,19 +114,24 @@ def finalize_fit(self, queue=None):
114114
Parameters
115115
----------
116116
queue : dpctl.SyclQueue
117-
Not used here, added for API conformance
117+
If not None, use this queue for computations.
118118
119119
Returns
120120
-------
121121
self : object
122122
Returns the instance itself.
123123
"""
124124
params = self._get_onedal_params(self._dtype)
125+
if queue is not None:
126+
policy = self._get_policy(queue)
127+
else:
128+
policy = self._get_policy(self._queue)
129+
125130
result = self._get_backend(
126131
"covariance",
127132
None,
128133
"finalize_compute",
129-
self._policy,
134+
policy,
130135
params,
131136
self._partial_result,
132137
)

onedal/spmd/covariance/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,5 +15,6 @@
1515
# ==============================================================================
1616

1717
from .covariance import EmpiricalCovariance
18+
from .incremental_covariance import IncrementalEmpiricalCovariance
1819

19-
__all__ = ["EmpiricalCovariance"]
20+
__all__ = ["EmpiricalCovariance", "IncrementalEmpiricalCovariance"]
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
# ==============================================================================
2+
# Copyright 2024 Intel Corporation
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
# ==============================================================================
16+
17+
import numpy as np
18+
19+
from daal4py.sklearn._utils import get_dtype
20+
21+
from ...covariance import (
22+
IncrementalEmpiricalCovariance as base_IncrementalEmpiricalCovariance,
23+
)
24+
from ...datatypes import _convert_to_supported, to_table
25+
from ...utils import _check_array
26+
from .._base import BaseEstimatorSPMD
27+
28+
29+
class IncrementalEmpiricalCovariance(
    BaseEstimatorSPMD, base_IncrementalEmpiricalCovariance
):
    """SPMD variant of the oneDAL incremental empirical covariance estimator.

    Only ``_reset`` and ``partial_fit`` are overridden here; everything else
    is inherited from the bases.

    Both overrides use ``super(base_IncrementalEmpiricalCovariance, self)``.
    That starts attribute lookup *after* ``base_IncrementalEmpiricalCovariance``
    in the MRO, so any ``_get_policy`` / ``_get_backend`` defined on
    ``BaseEstimatorSPMD`` is bypassed for these calls.
    NOTE(review): presumably this makes per-batch partial results use the
    regular (non-SPMD) policy and backend, leaving only finalization to the
    SPMD backend - confirm against ``BaseEstimatorSPMD``.
    """

    def _reset(self):
        """Replace ``_partial_result`` with a new ``partial_compute_result``."""
        self._partial_result = super(
            base_IncrementalEmpiricalCovariance, self
        )._get_backend("covariance", None, "partial_compute_result")

    def partial_fit(self, X, y=None, queue=None):
        """
        Computes partial data for the covariance matrix
        from data batch X and saves it to `_partial_result`.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Training data batch, where `n_samples` is the number of samples
            in the batch, and `n_features` is the number of features.

        y : Ignored
            Not used, present for API consistency by convention.

        queue : dpctl.SyclQueue
            If not None, use this queue for computations.

        Returns
        -------
        self : object
            Returns the instance itself.
        """
        X = _check_array(X, dtype=[np.float64, np.float32], ensure_2d=True)

        # Stored on the instance so the queue of the latest batch remains
        # available after this call.
        self._queue = queue

        # Proxy that resolves attributes past base_IncrementalEmpiricalCovariance
        # in the MRO (see the class docstring).
        local_base = super(base_IncrementalEmpiricalCovariance, self)

        policy = local_base._get_policy(queue, X)

        X = _convert_to_supported(policy, X)

        # The dtype is fixed by the first batch and reused for later ones.
        if not hasattr(self, "_dtype"):
            self._dtype = get_dtype(X)

        params = self._get_onedal_params(self._dtype)
        table_X = to_table(X)
        # Fold this batch into the accumulated partial result.
        self._partial_result = local_base._get_backend(
            "covariance",
            None,
            "partial_compute",
            policy,
            params,
            self._partial_result,
            table_X,
        )

        # The docstring promises ``self``; the method previously returned None.
        return self

sklearnex/covariance/incremental_covariance.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -115,9 +115,9 @@ def _onedal_supported(self, method_name, *data):
115115
)
116116
return patching_status
117117

118-
def _onedal_finalize_fit(self):
118+
def _onedal_finalize_fit(self, queue=None):
119119
assert hasattr(self, "_onedal_estimator")
120-
self._onedal_estimator.finalize_fit()
120+
self._onedal_estimator.finalize_fit(queue=queue)
121121
self._need_to_finalize = False
122122

123123
if not daal_check_version((2024, "P", 400)) and self.assume_centered:
@@ -192,7 +192,7 @@ def _onedal_partial_fit(self, X, queue=None, check_input=True):
192192
else:
193193
self.n_samples_seen_ += X.shape[0]
194194

195-
self._onedal_estimator.partial_fit(X, queue)
195+
self._onedal_estimator.partial_fit(X, queue=queue)
196196
finally:
197197
self._need_to_finalize = True
198198

@@ -326,7 +326,7 @@ def _onedal_fit(self, X, queue=None):
326326
X_batch = X[batch]
327327
self._onedal_partial_fit(X_batch, queue=queue, check_input=False)
328328

329-
self._onedal_finalize_fit()
329+
self._onedal_finalize_fit(queue=queue)
330330

331331
return self
332332

sklearnex/covariance/tests/test_incremental_covariance.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
from sklearn.datasets import load_diabetes
2727
from sklearn.decomposition import PCA
2828

29+
from daal4py.sklearn._utils import daal_check_version
2930
from onedal.tests.utils._dataframes_support import (
3031
_as_numpy,
3132
_convert_to_dataframe,
@@ -37,6 +38,11 @@
3738
@pytest.mark.parametrize("dtype", [np.float32, np.float64])
3839
@pytest.mark.parametrize("assume_centered", [True, False])
3940
def test_sklearnex_partial_fit_on_gold_data(dataframe, queue, dtype, assume_centered):
41+
is_gpu = queue is not None and queue.sycl_device.is_gpu
42+
if assume_centered and is_gpu and not daal_check_version((2025, "P", 0)):
43+
pytest.skip(
44+
"Due to a bug on oneDAL side, means are not set to zero when assume_centered=True"
45+
)
4046
from sklearnex.covariance import IncrementalEmpiricalCovariance
4147

4248
X = np.array([[0, 1], [0, 1]])
@@ -143,6 +149,11 @@ def test_sklearnex_partial_fit_on_random_data(
143149
def test_sklearnex_fit_on_random_data(
144150
dataframe, queue, num_batches, row_count, column_count, dtype, assume_centered
145151
):
152+
is_gpu = queue is not None and queue.sycl_device.is_gpu
153+
if assume_centered and is_gpu and not daal_check_version((2025, "P", 0)):
154+
pytest.skip(
155+
"Due to a bug on oneDAL side, means are not set to zero when assume_centered=True"
156+
)
146157
from sklearnex.covariance import IncrementalEmpiricalCovariance
147158

148159
seed = 77

sklearnex/spmd/covariance/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,5 +15,6 @@
1515
# ==============================================================================
1616

1717
from .covariance import EmpiricalCovariance
18+
from .incremental_covariance import IncrementalEmpiricalCovariance
1819

19-
__all__ = ["EmpiricalCovariance"]
20+
__all__ = ["EmpiricalCovariance", "IncrementalEmpiricalCovariance"]
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
# ==============================================================================
2+
# Copyright 2024 Intel Corporation
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
# ==============================================================================
16+
17+
from onedal.spmd.covariance import (
18+
IncrementalEmpiricalCovariance as onedalSPMD_IncrementalEmpiricalCovariance,
19+
)
20+
21+
from ...covariance import (
22+
IncrementalEmpiricalCovariance as base_IncrementalEmpiricalCovariance,
23+
)
24+
25+
26+
class IncrementalEmpiricalCovariance(base_IncrementalEmpiricalCovariance):
    """
    Distributed (SPMD) incremental estimator of the empirical covariance.

    Computes the maximum likelihood estimate of the empirical covariance in
    a distributed fashion when the data are split into batches.

    API is the same as for `sklearnex.covariance.IncrementalEmpiricalCovariance`
    """

    # The oneDAL SPMD estimator class, stored on the class as a staticmethod.
    # NOTE(review): presumably this overrides the attribute the base class
    # uses to build its oneDAL estimator - confirm against the base class.
    _onedal_incremental_covariance = staticmethod(
        onedalSPMD_IncrementalEmpiricalCovariance
    )

0 commit comments

Comments
 (0)