
Commit 9f63db2

ENH: SPMD interface for IncrementalPCA (#1979)
1 parent d9f46b7

File tree

8 files changed (+458 lines, -27 lines)
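Before the per-file diffs, a minimal sketch of how the new estimator is driven at the onedal level. The method names and signatures (partial_fit, finalize_fit, predict, and the queue keyword) follow the diffs below; the MPI launch, the GPU queue, the constructor arguments, and the data values are illustrative assumptions, not part of the commit. Run under MPI, e.g. mpirun -n 2 python example.py:

import dpctl
import numpy as np
from mpi4py import MPI

from onedal.spmd.decomposition import IncrementalPCA

rank = MPI.COMM_WORLD.Get_rank()
queue = dpctl.SyclQueue("gpu")  # one SYCL queue per MPI rank (assumed setup)

# Each rank holds its own shard of the data (illustrative values).
X_local = np.random.RandomState(rank).rand(1000, 10)

est = IncrementalPCA(n_components=3)
for batch in np.array_split(X_local, 4):
    est.partial_fit(batch, queue=queue)  # local partial result per batch
est.finalize_fit()                       # collective reduction across ranks

X_t = est.predict(X_local, queue=queue)  # project onto the fitted components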

onedal/decomposition/incremental_pca.py

Lines changed: 29 additions & 17 deletions
@@ -96,13 +96,14 @@ def __init__(
         self.method = method
         self.is_deterministic = is_deterministic
         self.whiten = whiten
-        module = self._get_backend("decomposition", "dim_reduction")
-        self._partial_result = module.partial_train_result()
+        self._reset()

     def _reset(self):
-        module = self._get_backend("decomposition", "dim_reduction")
-        del self.components_
-        self._partial_result = module.partial_train_result()
+        self._partial_result = self._get_backend(
+            "decomposition", "dim_reduction", "partial_train_result"
+        )
+        if hasattr(self, "components_"):
+            del self.components_

     def partial_fit(self, X, queue):
         """Incremental fit with X. All of X is processed as a single batch.
@@ -116,9 +117,6 @@ def partial_fit(self, X, queue):
         y : Ignored
             Not used, present for API consistency by convention.

-        check_input : bool, default=True
-            Run check_array on X.
-
         Returns
         -------
         self : object
@@ -143,20 +141,24 @@ def partial_fit(self, X, queue):
         else:
             self.n_components_ = self.n_components

-        module = self._get_backend("decomposition", "dim_reduction")
-
-        if not hasattr(self, "_policy"):
-            self._policy = self._get_policy(queue, X)
+        self._queue = queue

-        X = _convert_to_supported(self._policy, X)
+        policy = self._get_policy(queue, X)
+        X = _convert_to_supported(policy, X)

         if not hasattr(self, "_dtype"):
             self._dtype = get_dtype(X)
             self._params = self._get_onedal_params(X)

         X_table = to_table(X)
-        self._partial_result = module.partial_train(
-            self._policy, self._params, self._partial_result, X_table
+        self._partial_result = self._get_backend(
+            "decomposition",
+            "dim_reduction",
+            "partial_train",
+            policy,
+            self._params,
+            self._partial_result,
+            X_table,
         )
         return self

@@ -175,8 +177,18 @@ def finalize_fit(self, queue=None):
         self : object
             Returns the instance itself.
         """
-        module = self._get_backend("decomposition", "dim_reduction")
-        result = module.finalize_train(self._policy, self._params, self._partial_result)
+        if queue is not None:
+            policy = self._get_policy(queue)
+        else:
+            policy = self._get_policy(self._queue)
+        result = self._get_backend(
+            "decomposition",
+            "dim_reduction",
+            "finalize_train",
+            policy,
+            self._params,
+            self._partial_result,
+        )
         self.mean_ = from_table(result.means).ravel()
         self.var_ = from_table(result.variances).ravel()
         self.components_ = from_table(result.eigenvectors)
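The common thread in these hunks is that the estimator no longer caches a backend `module` and a `_policy` across calls; each call hands `_get_backend` the routine name plus its arguments, and the policy is rebuilt from the queue on every partial step. That is what lets an SPMD subclass reroute training by overriding `_get_backend` alone. Roughly, and purely as an illustration (this is not the real onedal code):

# Illustrative-only mock of the dispatch idiom used above.
class MockEstimator:
    def __init__(self, backend_module):
        self._backend = backend_module  # e.g. a pybind11 extension module

    def _get_backend(self, module, submodule, method, *args, **kwargs):
        # Walk module -> submodule -> method on the bound backend, then
        # invoke the resolved callable with whatever arguments follow
        # (zero arguments for result/model constructors, the full policy,
        # params, and tables for train calls).
        target = getattr(getattr(getattr(self._backend, module), submodule), method)
        return target(*args, **kwargs)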

onedal/decomposition/pca.cpp

Lines changed: 1 addition & 0 deletions
@@ -207,6 +207,7 @@ ONEDAL_PY_INIT_MODULE(decomposition) {
     auto sub = m.def_submodule("decomposition");
 #ifdef ONEDAL_DATA_PARALLEL_SPMD
     ONEDAL_PY_INSTANTIATE(init_train_ops, sub, policy_spmd, task_list);
+    ONEDAL_PY_INSTANTIATE(init_finalize_train_ops, sub, policy_spmd, task_list);
 #else
     ONEDAL_PY_INSTANTIATE(init_train_ops, sub, policy_list, task_list);
     ONEDAL_PY_INSTANTIATE(init_infer_ops, sub, policy_list, task_list);
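This one-line addition instantiates the `finalize_train` operations under the SPMD policy, giving the distributed `finalize_fit` path added in this commit a backend entry point in SPMD-enabled builds.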

onedal/spmd/decomposition/__init__.py

Lines changed: 2 additions & 1 deletion
@@ -14,6 +14,7 @@
 # limitations under the License.
 # ==============================================================================

+from .incremental_pca import IncrementalPCA
 from .pca import PCA

-__all__ = ["PCA"]
+__all__ = ["IncrementalPCA", "PCA"]
onedal/spmd/decomposition/incremental_pca.py (new file)

Lines changed: 117 additions & 0 deletions

@@ -0,0 +1,117 @@
+# ==============================================================================
+# Copyright 2024 Intel Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ==============================================================================
+
+from daal4py.sklearn._utils import get_dtype
+
+from ...datatypes import _convert_to_supported, from_table, to_table
+from ...decomposition import IncrementalPCA as base_IncrementalPCA
+from ...utils import _check_array
+from .._base import BaseEstimatorSPMD
+
+
+class IncrementalPCA(BaseEstimatorSPMD, base_IncrementalPCA):
+    """
+    Distributed incremental estimator for PCA based on oneDAL implementation.
+    Allows for distributed PCA computation if data is split into batches.
+
+    API is the same as for `onedal.decomposition.IncrementalPCA`
+    """
+
+    def _reset(self):
+        self._partial_result = super(base_IncrementalPCA, self)._get_backend(
+            "decomposition", "dim_reduction", "partial_train_result"
+        )
+        if hasattr(self, "components_"):
+            del self.components_
+
+    def partial_fit(self, X, y=None, queue=None):
+        """Incremental fit with X. All of X is processed as a single batch.
+
+        Parameters
+        ----------
+        X : array-like of shape (n_samples, n_features)
+            Training data, where `n_samples` is the number of samples and
+            `n_features` is the number of features.
+
+        y : Ignored
+            Not used, present for API consistency by convention.
+
+        Returns
+        -------
+        self : object
+            Returns the instance itself.
+        """
+        X = _check_array(X)
+        n_samples, n_features = X.shape
+
+        first_pass = not hasattr(self, "components_")
+        if first_pass:
+            self.components_ = None
+            self.n_samples_seen_ = n_samples
+            self.n_features_in_ = n_features
+        else:
+            self.n_samples_seen_ += n_samples
+
+        if self.n_components is None:
+            if self.components_ is None:
+                self.n_components_ = min(n_samples, n_features)
+            else:
+                self.n_components_ = self.components_.shape[0]
+        else:
+            self.n_components_ = self.n_components
+
+        self._queue = queue
+
+        policy = super(base_IncrementalPCA, self)._get_policy(queue, X)
+        X = _convert_to_supported(policy, X)
+
+        if not hasattr(self, "_dtype"):
+            self._dtype = get_dtype(X)
+            self._params = self._get_onedal_params(X)
+
+        X_table = to_table(X)
+        self._partial_result = super(base_IncrementalPCA, self)._get_backend(
+            "decomposition",
+            "dim_reduction",
+            "partial_train",
+            policy,
+            self._params,
+            self._partial_result,
+            X_table,
+        )
+        return self
+
+    def _create_model(self):
+        m = super(base_IncrementalPCA, self)._get_backend(
+            "decomposition", "dim_reduction", "model"
+        )
+        m.eigenvectors = to_table(self.components_)
+        m.means = to_table(self.mean_)
+        if self.whiten:
+            m.eigenvalues = to_table(self.explained_variance_)
+        self._onedal_model = m
+        return m
+
+    def predict(self, X, queue=None):
+        policy = super(base_IncrementalPCA, self)._get_policy(queue, X)
+        model = self._create_model()
+        X = _convert_to_supported(policy, X)
+        params = self._get_onedal_params(X, stage="predict")
+
+        result = super(base_IncrementalPCA, self)._get_backend(
+            "decomposition", "dim_reduction", "infer", policy, params, model, to_table(X)
+        )
+        return from_table(result.transformed_data)
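A note on the `super(base_IncrementalPCA, self)` calls above: starting the attribute lookup past `base_IncrementalPCA` in the MRO skips `BaseEstimatorSPMD` as well, so `_get_policy` and the per-batch `partial_train` resolve to the plain (non-SPMD) backend and run locally on each rank, while the inherited `finalize_fit`, which goes through `self._get_backend`, resolves to the SPMD backend for the collective reduction. This reading is consistent with `pca.cpp` above instantiating only `init_finalize_train_ops` for the SPMD policy. A toy illustration of the lookup trick (class names invented for the example):

# Toy illustration of the super(<skip-past>, self) lookup used above.
class Backend:
    def which(self):
        return "local"

class SPMDMixin(Backend):
    def which(self):
        return "spmd"

class BatchEstimator(Backend):
    def finalize(self):
        return self.which()  # normal lookup: SPMDMixin wins in subclasses

class DistributedEstimator(SPMDMixin, BatchEstimator):
    def partial(self):
        # Start the lookup after BatchEstimator in the MRO: this skips
        # SPMDMixin and lands on the plain Backend, i.e. the local path.
        return super(BatchEstimator, self).which()

est = DistributedEstimator()
print(est.partial())   # "local"  -- per-rank partial step
print(est.finalize())  # "spmd"   -- collective finalize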

sklearnex/preview/decomposition/incremental_pca.py

Lines changed: 8 additions & 8 deletions
@@ -61,7 +61,7 @@ def _onedal_fit_transform(self, X, queue=None):
         return self._onedal_transform(X, queue)

     def _onedal_partial_fit(self, X, check_input=True, queue=None):
-        first_pass = not hasattr(self, "components_")
+        first_pass = not hasattr(self, "_onedal_estimator")

         if check_input:
             if sklearn_check_version("1.0"):
@@ -78,10 +78,10 @@ def _onedal_partial_fit(self, X, check_input=True, queue=None):
         n_samples, n_features = X.shape

         if self.n_components is None:
-            if not hasattr(self, "components_"):
+            if not hasattr(self, "_components_shape"):
                 self.n_components_ = min(n_samples, n_features)
-            else:
-                self.n_components_ = self.components_.shape[0]
+                self._components_shape = self.n_components_
+
         elif not self.n_components <= n_features:
             raise ValueError(
                 "n_components=%r invalid for n_features=%d, need "
@@ -106,12 +106,12 @@ def _onedal_partial_fit(self, X, check_input=True, queue=None):

         if not hasattr(self, "_onedal_estimator"):
             self._onedal_estimator = self._onedal_incremental_pca(**onedal_params)
-        self._onedal_estimator.partial_fit(X, queue)
+        self._onedal_estimator.partial_fit(X, queue=queue)
         self._need_to_finalize = True

-    def _onedal_finalize_fit(self):
+    def _onedal_finalize_fit(self, queue=None):
         assert hasattr(self, "_onedal_estimator")
-        self._onedal_estimator.finalize_fit()
+        self._onedal_estimator.finalize_fit(queue=queue)
         self._need_to_finalize = False

     def _onedal_fit(self, X, queue=None):
@@ -142,7 +142,7 @@ def _onedal_fit(self, X, queue=None):
             X_batch = X[batch]
             self._onedal_partial_fit(X_batch, queue=queue)

-        self._onedal_finalize_fit()
+        self._onedal_finalize_fit(queue=queue)

         return self
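Two details in these hunks deserve a note. The first-pass and `n_components` checks now key off `_onedal_estimator` and a cached `_components_shape` instead of `components_`, presumably because `components_` is produced lazily and probing it would force a premature finalization. And the queue seen at `partial_fit` time is now threaded through to `finalize_fit`, so finalization runs where the data lives. A stripped-down, illustrative mock of that deferred-finalize shape (not the actual sklearnex code):

# Illustrative-only mock of the deferred-finalization pattern.
class DeferredEstimator:
    def __init__(self):
        self._partials = []
        self._need_to_finalize = False

    def partial_fit(self, chunk, queue=None):
        self._queue = queue                # remembered for later finalization
        self._partials.append(sum(chunk))  # stand-in for a partial result
        self._need_to_finalize = True
        return self

    def finalize_fit(self, queue=None):
        queue = queue if queue is not None else self._queue
        # 'queue' would select the execution device here; inert in the mock.
        self.total_ = sum(self._partials)
        self._need_to_finalize = False
        return self

est = DeferredEstimator()
for chunk in ([1.0, 2.0], [3.0, 4.0]):
    est.partial_fit(chunk)
print(est.finalize_fit().total_)  # 10.0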

sklearnex/spmd/decomposition/__init__.py

Lines changed: 2 additions & 1 deletion
@@ -14,6 +14,7 @@
 # limitations under the License.
 # ==============================================================================

+from .incremental_pca import IncrementalPCA
 from .pca import PCA

-__all__ = ["PCA"]
+__all__ = ["IncrementalPCA", "PCA"]
sklearnex/spmd/decomposition/incremental_pca.py (new file)

Lines changed: 30 additions & 0 deletions

@@ -0,0 +1,30 @@
+# ==============================================================================
+# Copyright 2024 Intel Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ==============================================================================
+
+from onedal.spmd.decomposition import IncrementalPCA as onedalSPMD_IncrementalPCA
+
+from ...preview.decomposition import IncrementalPCA as base_IncrementalPCA
+
+
+class IncrementalPCA(base_IncrementalPCA):
+    """
+    Distributed incremental estimator for PCA based on sklearnex implementation.
+    Allows for distributed PCA computation if data is split into batches.
+
+    API is the same as for `sklearnex.decomposition.IncrementalPCA`
+    """
+
+    _onedal_incremental_pca = staticmethod(onedalSPMD_IncrementalPCA)
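The whole SPMD specialization at this layer is one attribute swap: the preview base class constructs its backend estimator only through `_onedal_incremental_pca`, so pointing that factory at the SPMD onedal class reroutes everything. A toy sketch of the pattern (names invented):

# Toy sketch of the factory-attribute override; names are invented.
class BatchEstimator:
    _backend_factory = staticmethod(dict)  # stand-in for the batch backend

    def fit(self):
        self._impl = self._backend_factory()  # whichever factory the class set
        return self

class SPMDEstimator(BatchEstimator):
    _backend_factory = staticmethod(list)  # stand-in for the SPMD backend

print(type(BatchEstimator().fit()._impl).__name__)  # dict
print(type(SPMDEstimator().fit()._impl).__name__)   # list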
