From 7213c80f57dc2d64ca6a0e884f4182d294987729 Mon Sep 17 00:00:00 2001 From: ethanglaser Date: Tue, 17 Jun 2025 12:12:34 -0700 Subject: [PATCH 01/10] Add NearestNeighbors SPMD API --- onedal/neighbors/neighbors.py | 5 +- onedal/spmd/neighbors/__init__.py | 4 +- onedal/spmd/neighbors/neighbors.py | 18 ++++ sklearnex/spmd/neighbors/__init__.py | 4 +- .../neighbors/tests/test_neighbors_spmd.py | 83 +++++++++++++++++++ 5 files changed, 107 insertions(+), 7 deletions(-) diff --git a/onedal/neighbors/neighbors.py b/onedal/neighbors/neighbors.py index 64ffb7c5ca..c688ae826a 100755 --- a/onedal/neighbors/neighbors.py +++ b/onedal/neighbors/neighbors.py @@ -730,7 +730,6 @@ def __init__( self, n_neighbors=5, *, - weights="uniform", algorithm="auto", p=2, metric="minkowski", @@ -745,7 +744,7 @@ def __init__( metric_params=metric_params, **kwargs, ) - self.weights = weights + self.requires_y = False @bind_default_backend("neighbors.search") def train(self, *args, **kwargs): ... @@ -792,7 +791,7 @@ def _onedal_predict(self, model, X, params): return self.infer(params, model, X) @supports_queue - def fit(self, X, y, queue=None): + def fit(self, X, y=None, queue=None): return self._fit(X, y) @supports_queue diff --git a/onedal/spmd/neighbors/__init__.py b/onedal/spmd/neighbors/__init__.py index 1aa6247605..8036511d9f 100644 --- a/onedal/spmd/neighbors/__init__.py +++ b/onedal/spmd/neighbors/__init__.py @@ -14,6 +14,6 @@ # limitations under the License. # ============================================================================== -from .neighbors import KNeighborsClassifier, KNeighborsRegressor +from .neighbors import KNeighborsClassifier, KNeighborsRegressor, NearestNeighbors -__all__ = ["KNeighborsClassifier", "KNeighborsRegressor"] +__all__ = ["KNeighborsClassifier", "KNeighborsRegressor", "NearestNeighbors"] diff --git a/onedal/spmd/neighbors/neighbors.py b/onedal/spmd/neighbors/neighbors.py index b9f5f98d18..73beff141e 100644 --- a/onedal/spmd/neighbors/neighbors.py +++ b/onedal/spmd/neighbors/neighbors.py @@ -18,6 +18,7 @@ from ...common._backend import bind_spmd_backend from ...neighbors import KNeighborsClassifier as KNeighborsClassifier_Batch from ...neighbors import KNeighborsRegressor as KNeighborsRegressor_Batch +from ...neighbors import NearestNeighbors as NearestNeighbors_Batch class KNeighborsClassifier(KNeighborsClassifier_Batch): @@ -84,3 +85,20 @@ def _get_onedal_params(self, X, y=None): if "responses" not in params["result_option"]: params["result_option"] += "|responses" return params + + +class NearestNeighbors(NearestNeighbors_Batch): + + @bind_spmd_backend("neighbors.search") + def train(self, *args, **kwargs): ... + + @bind_spmd_backend("neighbors.search") + def infer(self, *args, **kwargs): ... + + @support_input_format + def fit(self, X, y=None, queue=None): + return super().fit(X, y, queue=queue) + + @support_input_format + def kneighbors(self, X=None, n_neighbors=None, return_distance=True, queue=None): + return super().kneighbors(X, n_neighbors, return_distance, queue=queue) diff --git a/sklearnex/spmd/neighbors/__init__.py b/sklearnex/spmd/neighbors/__init__.py index 7b1f9f646c..0dae450769 100644 --- a/sklearnex/spmd/neighbors/__init__.py +++ b/sklearnex/spmd/neighbors/__init__.py @@ -14,6 +14,6 @@ # limitations under the License. # ============================================================================== -from onedal.spmd.neighbors import KNeighborsClassifier, KNeighborsRegressor +from onedal.spmd.neighbors import KNeighborsClassifier, KNeighborsRegressor, NearestNeighbors -__all__ = ["KNeighborsClassifier", "KNeighborsRegressor"] +__all__ = ["KNeighborsClassifier", "KNeighborsRegressor", "NearestNeighbors"] diff --git a/sklearnex/spmd/neighbors/tests/test_neighbors_spmd.py b/sklearnex/spmd/neighbors/tests/test_neighbors_spmd.py index d362cf0bac..772192a119 100644 --- a/sklearnex/spmd/neighbors/tests/test_neighbors_spmd.py +++ b/sklearnex/spmd/neighbors/tests/test_neighbors_spmd.py @@ -27,6 +27,7 @@ _assert_unordered_allclose, _generate_classification_data, _generate_regression_data, + _generate_statistic_data, _get_local_tensor, _mpi_libs_and_gpu_available, _spmd_assert_allclose, @@ -308,3 +309,85 @@ def test_knnreg_spmd_synthetic( spmd_dists, batch_dists, localize=True, rtol=tol, atol=tol ) _spmd_assert_allclose(spmd_result, batch_result, rtol=tol, atol=tol) + + +@pytest.mark.skipif( + not _mpi_libs_and_gpu_available, + reason="GPU device and MPI libs required for test", +) +@pytest.mark.parametrize( + "dataframe,queue", + get_dataframes_and_queues(dataframe_filter_="dpnp,dpctl", device_filter_="gpu"), +) +@pytest.mark.mpi +def test_knnsearch_spmd_gold(dataframe, queue): + # Import spmd and batch algo + from sklearnex.neighbors import NearestNeighbors as NearestNeighbors_Batch + from sklearnex.spmd.neighbors import NearestNeighbors as NearestNeighbors_SPMD + + # Create gold data and convert to dataframe + X_train = np.array( + [[-1, -1], [-2, -1], [-3, -2], [1, 1], [2, 1], [3, 2]] + ) + local_dpt_X_train = _convert_to_dataframe( + _get_local_tensor(X_train), sycl_queue=queue, target_df=dataframe + ) + + # Ensure predictions of batch algo match spmd + spmd_model = NearestNeighbors_SPMD(n_neighbors=2, algorithm="brute").fit(local_dpt_X_train) + batch_model = NearestNeighbors_Batch(n_neighbors=2, algorithm="brute").fit(X_train) + spmd_dists, spmd_indcs = spmd_model.kneighbors(local_dpt_X_train) + batch_dists, batch_indcs = batch_model.kneighbors(X_train) + + _assert_unordered_allclose(spmd_indcs, batch_indcs, localize=True) + _assert_unordered_allclose(spmd_dists, batch_dists, localize=True) + + +@pytest.mark.skipif( + not _mpi_libs_and_gpu_available, + reason="GPU device and MPI libs required for test", +) +@pytest.mark.parametrize("dimensions", [{"n": 100, "m": 10, "k": 2}, {"n": 100000, "m": 100, "k": 100}]) +@pytest.mark.parametrize( + "dataframe,queue", + get_dataframes_and_queues(dataframe_filter_="dpnp,dpctl", device_filter_="gpu"), +) +@pytest.mark.parametrize("dtype", [np.float32, np.float64]) +@pytest.mark.mpi +def test_knnsearch_spmd_synthetic( + dimensions, + dataframe, + queue, + dtype, +): + if dimensions["n"] > 10000 and dtype == np.float32: + pytest.skip("Skipping large float32 test due to expected precision issues") + + # Import spmd and batch algo + from sklearnex.neighbors import NearestNeighbors as NearestNeighbors_Batch + from sklearnex.spmd.neighbors import NearestNeighbors as NearestNeighbors_SPMD + + # Generate data and convert to dataframe + X_train = _generate_statistic_data( + dimensions["n"], dimensions["m"], dtype=dtype + ) + + local_dpt_X_train = _convert_to_dataframe( + _get_local_tensor(X_train), sycl_queue=queue, target_df=dataframe + ) + + # Ensure search results of batch algo match spmd + spmd_model = NearestNeighbors_SPMD( + n_neighbors=dimensions["k"], algorithm="brute" + ).fit(local_dpt_X_train) + batch_model = NearestNeighbors_Batch( + n_neighbors=dimensions["k"], algorithm="brute" + ).fit(X_train) + spmd_dists, spmd_indcs = spmd_model.kneighbors(local_dpt_X_train) + batch_dists, batch_indcs = batch_model.kneighbors(X_train) + + tol = 0.005 if dtype == np.float32 else 1e-6 + _assert_unordered_allclose(spmd_indcs, batch_indcs, localize=True) + _assert_unordered_allclose( + spmd_dists, batch_dists, localize=True, rtol=tol, atol=tol + ) From 7aea5a647e99e0f259a791140ca167fc0d24d0e6 Mon Sep 17 00:00:00 2001 From: ethanglaser Date: Tue, 17 Jun 2025 12:14:19 -0700 Subject: [PATCH 02/10] black format --- sklearnex/spmd/neighbors/__init__.py | 6 +++++- .../neighbors/tests/test_neighbors_spmd.py | 20 +++++++++---------- 2 files changed, 14 insertions(+), 12 deletions(-) diff --git a/sklearnex/spmd/neighbors/__init__.py b/sklearnex/spmd/neighbors/__init__.py index 0dae450769..44cb849591 100644 --- a/sklearnex/spmd/neighbors/__init__.py +++ b/sklearnex/spmd/neighbors/__init__.py @@ -14,6 +14,10 @@ # limitations under the License. # ============================================================================== -from onedal.spmd.neighbors import KNeighborsClassifier, KNeighborsRegressor, NearestNeighbors +from onedal.spmd.neighbors import ( + KNeighborsClassifier, + KNeighborsRegressor, + NearestNeighbors, +) __all__ = ["KNeighborsClassifier", "KNeighborsRegressor", "NearestNeighbors"] diff --git a/sklearnex/spmd/neighbors/tests/test_neighbors_spmd.py b/sklearnex/spmd/neighbors/tests/test_neighbors_spmd.py index 772192a119..65f4dd820f 100644 --- a/sklearnex/spmd/neighbors/tests/test_neighbors_spmd.py +++ b/sklearnex/spmd/neighbors/tests/test_neighbors_spmd.py @@ -326,15 +326,15 @@ def test_knnsearch_spmd_gold(dataframe, queue): from sklearnex.spmd.neighbors import NearestNeighbors as NearestNeighbors_SPMD # Create gold data and convert to dataframe - X_train = np.array( - [[-1, -1], [-2, -1], [-3, -2], [1, 1], [2, 1], [3, 2]] - ) + X_train = np.array([[-1, -1], [-2, -1], [-3, -2], [1, 1], [2, 1], [3, 2]]) local_dpt_X_train = _convert_to_dataframe( _get_local_tensor(X_train), sycl_queue=queue, target_df=dataframe ) # Ensure predictions of batch algo match spmd - spmd_model = NearestNeighbors_SPMD(n_neighbors=2, algorithm="brute").fit(local_dpt_X_train) + spmd_model = NearestNeighbors_SPMD(n_neighbors=2, algorithm="brute").fit( + local_dpt_X_train + ) batch_model = NearestNeighbors_Batch(n_neighbors=2, algorithm="brute").fit(X_train) spmd_dists, spmd_indcs = spmd_model.kneighbors(local_dpt_X_train) batch_dists, batch_indcs = batch_model.kneighbors(X_train) @@ -347,7 +347,9 @@ def test_knnsearch_spmd_gold(dataframe, queue): not _mpi_libs_and_gpu_available, reason="GPU device and MPI libs required for test", ) -@pytest.mark.parametrize("dimensions", [{"n": 100, "m": 10, "k": 2}, {"n": 100000, "m": 100, "k": 100}]) +@pytest.mark.parametrize( + "dimensions", [{"n": 100, "m": 10, "k": 2}, {"n": 100000, "m": 100, "k": 100}] +) @pytest.mark.parametrize( "dataframe,queue", get_dataframes_and_queues(dataframe_filter_="dpnp,dpctl", device_filter_="gpu"), @@ -368,9 +370,7 @@ def test_knnsearch_spmd_synthetic( from sklearnex.spmd.neighbors import NearestNeighbors as NearestNeighbors_SPMD # Generate data and convert to dataframe - X_train = _generate_statistic_data( - dimensions["n"], dimensions["m"], dtype=dtype - ) + X_train = _generate_statistic_data(dimensions["n"], dimensions["m"], dtype=dtype) local_dpt_X_train = _convert_to_dataframe( _get_local_tensor(X_train), sycl_queue=queue, target_df=dataframe @@ -388,6 +388,4 @@ def test_knnsearch_spmd_synthetic( tol = 0.005 if dtype == np.float32 else 1e-6 _assert_unordered_allclose(spmd_indcs, batch_indcs, localize=True) - _assert_unordered_allclose( - spmd_dists, batch_dists, localize=True, rtol=tol, atol=tol - ) + _assert_unordered_allclose(spmd_dists, batch_dists, localize=True, rtol=tol, atol=tol) From 3765c6c855ce054336a8e0a93216d4032ec0484c Mon Sep 17 00:00:00 2001 From: ethanglaser <42726565+ethanglaser@users.noreply.github.com> Date: Wed, 25 Jun 2025 15:10:05 -0700 Subject: [PATCH 03/10] extend gold data to have multiple rows per rank --- sklearnex/spmd/neighbors/tests/test_neighbors_spmd.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sklearnex/spmd/neighbors/tests/test_neighbors_spmd.py b/sklearnex/spmd/neighbors/tests/test_neighbors_spmd.py index 65f4dd820f..ac0f7aef63 100644 --- a/sklearnex/spmd/neighbors/tests/test_neighbors_spmd.py +++ b/sklearnex/spmd/neighbors/tests/test_neighbors_spmd.py @@ -326,7 +326,7 @@ def test_knnsearch_spmd_gold(dataframe, queue): from sklearnex.spmd.neighbors import NearestNeighbors as NearestNeighbors_SPMD # Create gold data and convert to dataframe - X_train = np.array([[-1, -1], [-2, -1], [-3, -2], [1, 1], [2, 1], [3, 2]]) + X_train = np.array([[-1, -1], [-2, -1], [-3, -2], [1, 1], [2, 1], [3, 2], [10, 10], [9, 9]]) local_dpt_X_train = _convert_to_dataframe( _get_local_tensor(X_train), sycl_queue=queue, target_df=dataframe ) From ced4aca42205fb83e118e10dfc1b35c7906fd58c Mon Sep 17 00:00:00 2001 From: ethanglaser <42726565+ethanglaser@users.noreply.github.com> Date: Wed, 25 Jun 2025 15:15:25 -0700 Subject: [PATCH 04/10] formatting --- sklearnex/spmd/neighbors/tests/test_neighbors_spmd.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sklearnex/spmd/neighbors/tests/test_neighbors_spmd.py b/sklearnex/spmd/neighbors/tests/test_neighbors_spmd.py index ac0f7aef63..e72812037d 100644 --- a/sklearnex/spmd/neighbors/tests/test_neighbors_spmd.py +++ b/sklearnex/spmd/neighbors/tests/test_neighbors_spmd.py @@ -326,7 +326,9 @@ def test_knnsearch_spmd_gold(dataframe, queue): from sklearnex.spmd.neighbors import NearestNeighbors as NearestNeighbors_SPMD # Create gold data and convert to dataframe - X_train = np.array([[-1, -1], [-2, -1], [-3, -2], [1, 1], [2, 1], [3, 2], [10, 10], [9, 9]]) + X_train = np.array( + [[-1, -1], [-2, -1], [-3, -2], [1, 1], [2, 1], [3, 2], [10, 10], [9, 9]] + ) local_dpt_X_train = _convert_to_dataframe( _get_local_tensor(X_train), sycl_queue=queue, target_df=dataframe ) From a56ee49aa01155c4559e762e6f8ad5708eb0d48c Mon Sep 17 00:00:00 2001 From: ethanglaser Date: Fri, 5 Sep 2025 10:42:09 -0700 Subject: [PATCH 05/10] raw inputs support for kneighbors --- onedal/neighbors/neighbors.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/onedal/neighbors/neighbors.py b/onedal/neighbors/neighbors.py index c688ae826a..6a5627c9b8 100755 --- a/onedal/neighbors/neighbors.py +++ b/onedal/neighbors/neighbors.py @@ -296,6 +296,7 @@ def _fit(self, X, y): return result def _kneighbors(self, X=None, n_neighbors=None, return_distance=True): + use_raw_input = _get_config().get("use_raw_input", False) is True n_features = getattr(self, "n_features_in_", None) shape = getattr(X, "shape", None) if n_features and shape and len(shape) > 1 and shape[1] != n_features: @@ -320,9 +321,11 @@ def _kneighbors(self, X=None, n_neighbors=None, return_distance=True): "enter integer value" % type(n_neighbors) ) + # TODO: validate both when X is None and when not None if X is not None: query_is_train = False - X = _check_array(X, accept_sparse="csr", dtype=[np.float64, np.float32]) + if not use_raw_input: + X = _check_array(X, accept_sparse="csr", dtype=[np.float64, np.float32]) else: query_is_train = True X = self._fit_X From 8532fe381218ee54fbbda7db3cd04ee2898f02e9 Mon Sep 17 00:00:00 2001 From: ethanglaser <42726565+ethanglaser@users.noreply.github.com> Date: Thu, 11 Sep 2025 11:15:05 -0700 Subject: [PATCH 06/10] Reduce rows of synthetic large test --- sklearnex/spmd/neighbors/tests/test_neighbors_spmd.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sklearnex/spmd/neighbors/tests/test_neighbors_spmd.py b/sklearnex/spmd/neighbors/tests/test_neighbors_spmd.py index e72812037d..050dd5bd1d 100644 --- a/sklearnex/spmd/neighbors/tests/test_neighbors_spmd.py +++ b/sklearnex/spmd/neighbors/tests/test_neighbors_spmd.py @@ -350,7 +350,7 @@ def test_knnsearch_spmd_gold(dataframe, queue): reason="GPU device and MPI libs required for test", ) @pytest.mark.parametrize( - "dimensions", [{"n": 100, "m": 10, "k": 2}, {"n": 100000, "m": 100, "k": 100}] + "dimensions", [{"n": 100, "m": 10, "k": 2}, {"n": 20000, "m": 100, "k": 100}] ) @pytest.mark.parametrize( "dataframe,queue", From 19cde345bdef4d2d3197803bf565b5d432787ac3 Mon Sep 17 00:00:00 2001 From: ethanglaser Date: Wed, 17 Sep 2025 15:44:15 -0700 Subject: [PATCH 07/10] update search size and only use _spmd_assert_allclose --- .../neighbors/tests/test_neighbors_spmd.py | 31 ++++++++----------- 1 file changed, 13 insertions(+), 18 deletions(-) diff --git a/sklearnex/spmd/neighbors/tests/test_neighbors_spmd.py b/sklearnex/spmd/neighbors/tests/test_neighbors_spmd.py index 050dd5bd1d..5d5dfa343a 100644 --- a/sklearnex/spmd/neighbors/tests/test_neighbors_spmd.py +++ b/sklearnex/spmd/neighbors/tests/test_neighbors_spmd.py @@ -24,7 +24,6 @@ ) from sklearnex import config_context from sklearnex.tests.utils.spmd import ( - _assert_unordered_allclose, _generate_classification_data, _generate_regression_data, _generate_statistic_data, @@ -95,8 +94,8 @@ def test_knncls_spmd_gold(dataframe, queue): spmd_result = spmd_model.predict(local_dpt_X_test) batch_result = batch_model.predict(X_test) - _assert_unordered_allclose(spmd_indcs, batch_indcs, localize=True) - _assert_unordered_allclose(spmd_dists, batch_dists, localize=True) + _spmd_assert_allclose(spmd_indcs, batch_indcs) + _spmd_assert_allclose(spmd_dists, batch_dists) _spmd_assert_allclose(spmd_result, batch_result) @@ -165,10 +164,8 @@ def test_knncls_spmd_synthetic( tol = 1e-4 if dtype == np.float64: - _assert_unordered_allclose(spmd_indcs, batch_indcs, localize=True) - _assert_unordered_allclose( - spmd_dists, batch_dists, localize=True, rtol=tol, atol=tol - ) + _spmd_assert_allclose(spmd_indcs, batch_indcs) + _spmd_assert_allclose(spmd_dists, batch_dists, rtol=tol, atol=tol) _spmd_assert_allclose(spmd_result, batch_result) @@ -232,8 +229,8 @@ def test_knnreg_spmd_gold(dataframe, queue): spmd_result = spmd_model.predict(local_dpt_X_test) batch_result = batch_model.predict(X_test) - _assert_unordered_allclose(spmd_indcs, batch_indcs, localize=True) - _assert_unordered_allclose(spmd_dists, batch_dists, localize=True) + _spmd_assert_allclose(spmd_indcs, batch_indcs) + _spmd_assert_allclose(spmd_dists, batch_dists) _spmd_assert_allclose(spmd_result, batch_result) @@ -304,10 +301,8 @@ def test_knnreg_spmd_synthetic( tol = 0.005 if dtype == np.float32 else 1e-4 if dtype == np.float64: - _assert_unordered_allclose(spmd_indcs, batch_indcs, localize=True) - _assert_unordered_allclose( - spmd_dists, batch_dists, localize=True, rtol=tol, atol=tol - ) + _spmd_assert_allclose(spmd_indcs, batch_indcs) + _spmd_assert_allclose(spmd_dists, batch_dists, rtol=tol, atol=tol) _spmd_assert_allclose(spmd_result, batch_result, rtol=tol, atol=tol) @@ -341,8 +336,8 @@ def test_knnsearch_spmd_gold(dataframe, queue): spmd_dists, spmd_indcs = spmd_model.kneighbors(local_dpt_X_train) batch_dists, batch_indcs = batch_model.kneighbors(X_train) - _assert_unordered_allclose(spmd_indcs, batch_indcs, localize=True) - _assert_unordered_allclose(spmd_dists, batch_dists, localize=True) + _spmd_assert_allclose(spmd_indcs, batch_indcs) + _spmd_assert_allclose(spmd_dists, batch_dists) @pytest.mark.skipif( @@ -350,7 +345,7 @@ def test_knnsearch_spmd_gold(dataframe, queue): reason="GPU device and MPI libs required for test", ) @pytest.mark.parametrize( - "dimensions", [{"n": 100, "m": 10, "k": 2}, {"n": 20000, "m": 100, "k": 100}] + "dimensions", [{"n": 100, "m": 10, "k": 2}, {"n": 100000, "m": 100, "k": 100}] ) @pytest.mark.parametrize( "dataframe,queue", @@ -389,5 +384,5 @@ def test_knnsearch_spmd_synthetic( batch_dists, batch_indcs = batch_model.kneighbors(X_train) tol = 0.005 if dtype == np.float32 else 1e-6 - _assert_unordered_allclose(spmd_indcs, batch_indcs, localize=True) - _assert_unordered_allclose(spmd_dists, batch_dists, localize=True, rtol=tol, atol=tol) + _spmd_assert_allclose(spmd_indcs, batch_indcs) + _spmd_assert_allclose(spmd_dists, batch_dists, rtol=tol, atol=tol) From ffba570ba1f5ddf04c7fc93a8759426abdc50f66 Mon Sep 17 00:00:00 2001 From: ethanglaser Date: Thu, 25 Sep 2025 17:29:16 -0700 Subject: [PATCH 08/10] support empty kneighbors() --- onedal/neighbors/neighbors.py | 1 - onedal/spmd/neighbors/neighbors.py | 9 ++++ .../neighbors/tests/test_neighbors_spmd.py | 45 +++++++++++++++++++ 3 files changed, 54 insertions(+), 1 deletion(-) diff --git a/onedal/neighbors/neighbors.py b/onedal/neighbors/neighbors.py index 6a5627c9b8..3a5185d6e9 100755 --- a/onedal/neighbors/neighbors.py +++ b/onedal/neighbors/neighbors.py @@ -321,7 +321,6 @@ def _kneighbors(self, X=None, n_neighbors=None, return_distance=True): "enter integer value" % type(n_neighbors) ) - # TODO: validate both when X is None and when not None if X is not None: query_is_train = False if not use_raw_input: diff --git a/onedal/spmd/neighbors/neighbors.py b/onedal/spmd/neighbors/neighbors.py index 73beff141e..e3f6ef9e21 100644 --- a/onedal/spmd/neighbors/neighbors.py +++ b/onedal/spmd/neighbors/neighbors.py @@ -31,6 +31,7 @@ def infer(self, *args, **kwargs): ... @support_input_format def fit(self, X, y, queue=None): + self.spmd_queue_ = queue return super().fit(X, y, queue=queue) @support_input_format @@ -43,6 +44,8 @@ def predict_proba(self, X, queue=None): @support_input_format def kneighbors(self, X=None, n_neighbors=None, return_distance=True, queue=None): + if X is None and queue is None: + queue = getattr(self, "spmd_queue_", None) return super().kneighbors(X, n_neighbors, return_distance, queue=queue) @@ -63,6 +66,7 @@ def infer(self, *args, **kwargs): ... @support_input_format @supports_queue def fit(self, X, y, queue=None): + self.spmd_queue_ = queue if queue is not None and queue.sycl_device.is_gpu: return self._fit(X, y) else: @@ -73,6 +77,8 @@ def fit(self, X, y, queue=None): @support_input_format def kneighbors(self, X=None, n_neighbors=None, return_distance=True, queue=None): + if X is None and queue is None: + queue = getattr(self, "spmd_queue_", None) return super().kneighbors(X, n_neighbors, return_distance, queue=queue) @support_input_format @@ -97,8 +103,11 @@ def infer(self, *args, **kwargs): ... @support_input_format def fit(self, X, y=None, queue=None): + self.spmd_queue_ = queue return super().fit(X, y, queue=queue) @support_input_format def kneighbors(self, X=None, n_neighbors=None, return_distance=True, queue=None): + if X is None and queue is None: + queue = getattr(self, "spmd_queue_", None) return super().kneighbors(X, n_neighbors, return_distance, queue=queue) diff --git a/sklearnex/spmd/neighbors/tests/test_neighbors_spmd.py b/sklearnex/spmd/neighbors/tests/test_neighbors_spmd.py index 5d5dfa343a..0ad470ce9d 100644 --- a/sklearnex/spmd/neighbors/tests/test_neighbors_spmd.py +++ b/sklearnex/spmd/neighbors/tests/test_neighbors_spmd.py @@ -386,3 +386,48 @@ def test_knnsearch_spmd_synthetic( tol = 0.005 if dtype == np.float32 else 1e-6 _spmd_assert_allclose(spmd_indcs, batch_indcs) _spmd_assert_allclose(spmd_dists, batch_dists, rtol=tol, atol=tol) + + +@pytest.mark.skipif( + not _mpi_libs_and_gpu_available, + reason="GPU device and MPI libs required for test", +) +@pytest.mark.parametrize( + "dataframe,queue", + get_dataframes_and_queues(dataframe_filter_="dpnp,dpctl", device_filter_="gpu"), +) +@pytest.mark.mpi +def test_knn_spmd_empty_kneighbors(dataframe, queue): + # Import spmd and batch algo + from sklearnex.neighbors import NearestNeighbors as NearestNeighbors_Batch + from sklearnex.spmd.neighbors import ( + KNeighborsClassifier, + KNeighborsRegressor, + NearestNeighbors, + ) + + # Create gold data and convert to dataframe + X_train = np.array( + [[-1, -1], [-2, -1], [-3, -2], [1, 1], [2, 1], [3, 2], [10, 10], [9, 9]] + ) + y_train = np.array([0, 1, 0, 1, 0, 1, 0, 1]) + local_dpt_X_train = _convert_to_dataframe( + _get_local_tensor(X_train), sycl_queue=queue, target_df=dataframe + ) + local_dpt_y_train = _convert_to_dataframe( + _get_local_tensor(y_train), sycl_queue=queue, target_df=dataframe + ) + + # Run each estimator without an input to kneighbors() and ensure functionality and equivalence + for CurrentEstimator in [KNeighborsClassifier, KNeighborsRegressor, NearestNeighbors]: + spmd_model = CurrentEstimator(n_neighbors=2, algorithm="brute").fit( + local_dpt_X_train, local_dpt_y_train + ) + batch_model = NearestNeighbors_Batch(n_neighbors=2, algorithm="brute").fit( + X_train, y_train + ) + spmd_dists, spmd_indcs = spmd_model.kneighbors() + batch_dists, batch_indcs = batch_model.kneighbors() + + _spmd_assert_allclose(spmd_indcs, batch_indcs) + _spmd_assert_allclose(spmd_dists, batch_dists) From 0b7777e6826c09597855c63a8b84b0a3cb89319d Mon Sep 17 00:00:00 2001 From: ethanglaser <42726565+ethanglaser@users.noreply.github.com> Date: Fri, 26 Sep 2025 10:29:11 -0700 Subject: [PATCH 09/10] Update sklearnex/spmd/neighbors/tests/test_neighbors_spmd.py --- sklearnex/spmd/neighbors/tests/test_neighbors_spmd.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sklearnex/spmd/neighbors/tests/test_neighbors_spmd.py b/sklearnex/spmd/neighbors/tests/test_neighbors_spmd.py index 0ad470ce9d..ca41194701 100644 --- a/sklearnex/spmd/neighbors/tests/test_neighbors_spmd.py +++ b/sklearnex/spmd/neighbors/tests/test_neighbors_spmd.py @@ -420,10 +420,10 @@ def test_knn_spmd_empty_kneighbors(dataframe, queue): # Run each estimator without an input to kneighbors() and ensure functionality and equivalence for CurrentEstimator in [KNeighborsClassifier, KNeighborsRegressor, NearestNeighbors]: - spmd_model = CurrentEstimator(n_neighbors=2, algorithm="brute").fit( + spmd_model = CurrentEstimator(n_neighbors=1, algorithm="brute").fit( local_dpt_X_train, local_dpt_y_train ) - batch_model = NearestNeighbors_Batch(n_neighbors=2, algorithm="brute").fit( + batch_model = NearestNeighbors_Batch(n_neighbors=1, algorithm="brute").fit( X_train, y_train ) spmd_dists, spmd_indcs = spmd_model.kneighbors() From 466e1954b640dddc8c1c3dd2ae926e56c3e89588 Mon Sep 17 00:00:00 2001 From: ethanglaser Date: Tue, 30 Sep 2025 08:06:03 -0700 Subject: [PATCH 10/10] address comments --- onedal/spmd/neighbors/neighbors.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/onedal/spmd/neighbors/neighbors.py b/onedal/spmd/neighbors/neighbors.py index e3f6ef9e21..94deec6826 100644 --- a/onedal/spmd/neighbors/neighbors.py +++ b/onedal/spmd/neighbors/neighbors.py @@ -31,6 +31,7 @@ def infer(self, *args, **kwargs): ... @support_input_format def fit(self, X, y, queue=None): + # Store queue to use during inference if not provided (if X is none in kneighbors) self.spmd_queue_ = queue return super().fit(X, y, queue=queue) @@ -66,6 +67,7 @@ def infer(self, *args, **kwargs): ... @support_input_format @supports_queue def fit(self, X, y, queue=None): + # Store queue to use during inference if not provided (if X is none in kneighbors) self.spmd_queue_ = queue if queue is not None and queue.sycl_device.is_gpu: return self._fit(X, y) @@ -103,6 +105,7 @@ def infer(self, *args, **kwargs): ... @support_input_format def fit(self, X, y=None, queue=None): + # Store queue to use during inference if not provided (if X is none in kneighbors) self.spmd_queue_ = queue return super().fit(X, y, queue=queue)