diff --git a/onedal/neighbors/neighbors.py b/onedal/neighbors/neighbors.py index 64ffb7c5ca..3a5185d6e9 100755 --- a/onedal/neighbors/neighbors.py +++ b/onedal/neighbors/neighbors.py @@ -296,6 +296,7 @@ def _fit(self, X, y): return result def _kneighbors(self, X=None, n_neighbors=None, return_distance=True): + use_raw_input = _get_config().get("use_raw_input", False) is True n_features = getattr(self, "n_features_in_", None) shape = getattr(X, "shape", None) if n_features and shape and len(shape) > 1 and shape[1] != n_features: @@ -322,7 +323,8 @@ def _kneighbors(self, X=None, n_neighbors=None, return_distance=True): if X is not None: query_is_train = False - X = _check_array(X, accept_sparse="csr", dtype=[np.float64, np.float32]) + if not use_raw_input: + X = _check_array(X, accept_sparse="csr", dtype=[np.float64, np.float32]) else: query_is_train = True X = self._fit_X @@ -730,7 +732,6 @@ def __init__( self, n_neighbors=5, *, - weights="uniform", algorithm="auto", p=2, metric="minkowski", @@ -745,7 +746,7 @@ def __init__( metric_params=metric_params, **kwargs, ) - self.weights = weights + self.requires_y = False @bind_default_backend("neighbors.search") def train(self, *args, **kwargs): ... @@ -792,7 +793,7 @@ def _onedal_predict(self, model, X, params): return self.infer(params, model, X) @supports_queue - def fit(self, X, y, queue=None): + def fit(self, X, y=None, queue=None): return self._fit(X, y) @supports_queue diff --git a/onedal/spmd/neighbors/__init__.py b/onedal/spmd/neighbors/__init__.py index 1aa6247605..8036511d9f 100644 --- a/onedal/spmd/neighbors/__init__.py +++ b/onedal/spmd/neighbors/__init__.py @@ -14,6 +14,6 @@ # limitations under the License. # ============================================================================== -from .neighbors import KNeighborsClassifier, KNeighborsRegressor +from .neighbors import KNeighborsClassifier, KNeighborsRegressor, NearestNeighbors -__all__ = ["KNeighborsClassifier", "KNeighborsRegressor"] +__all__ = ["KNeighborsClassifier", "KNeighborsRegressor", "NearestNeighbors"] diff --git a/onedal/spmd/neighbors/neighbors.py b/onedal/spmd/neighbors/neighbors.py index b9f5f98d18..94deec6826 100644 --- a/onedal/spmd/neighbors/neighbors.py +++ b/onedal/spmd/neighbors/neighbors.py @@ -18,6 +18,7 @@ from ...common._backend import bind_spmd_backend from ...neighbors import KNeighborsClassifier as KNeighborsClassifier_Batch from ...neighbors import KNeighborsRegressor as KNeighborsRegressor_Batch +from ...neighbors import NearestNeighbors as NearestNeighbors_Batch class KNeighborsClassifier(KNeighborsClassifier_Batch): @@ -30,6 +31,8 @@ def infer(self, *args, **kwargs): ... @support_input_format def fit(self, X, y, queue=None): + # Store queue to use during inference if not provided (if X is none in kneighbors) + self.spmd_queue_ = queue return super().fit(X, y, queue=queue) @support_input_format @@ -42,6 +45,8 @@ def predict_proba(self, X, queue=None): @support_input_format def kneighbors(self, X=None, n_neighbors=None, return_distance=True, queue=None): + if X is None and queue is None: + queue = getattr(self, "spmd_queue_", None) return super().kneighbors(X, n_neighbors, return_distance, queue=queue) @@ -62,6 +67,8 @@ def infer(self, *args, **kwargs): ... @support_input_format @supports_queue def fit(self, X, y, queue=None): + # Store queue to use during inference if not provided (if X is none in kneighbors) + self.spmd_queue_ = queue if queue is not None and queue.sycl_device.is_gpu: return self._fit(X, y) else: @@ -72,6 +79,8 @@ def fit(self, X, y, queue=None): @support_input_format def kneighbors(self, X=None, n_neighbors=None, return_distance=True, queue=None): + if X is None and queue is None: + queue = getattr(self, "spmd_queue_", None) return super().kneighbors(X, n_neighbors, return_distance, queue=queue) @support_input_format @@ -84,3 +93,24 @@ def _get_onedal_params(self, X, y=None): if "responses" not in params["result_option"]: params["result_option"] += "|responses" return params + + +class NearestNeighbors(NearestNeighbors_Batch): + + @bind_spmd_backend("neighbors.search") + def train(self, *args, **kwargs): ... + + @bind_spmd_backend("neighbors.search") + def infer(self, *args, **kwargs): ... + + @support_input_format + def fit(self, X, y=None, queue=None): + # Store queue to use during inference if not provided (if X is none in kneighbors) + self.spmd_queue_ = queue + return super().fit(X, y, queue=queue) + + @support_input_format + def kneighbors(self, X=None, n_neighbors=None, return_distance=True, queue=None): + if X is None and queue is None: + queue = getattr(self, "spmd_queue_", None) + return super().kneighbors(X, n_neighbors, return_distance, queue=queue) diff --git a/sklearnex/spmd/neighbors/__init__.py b/sklearnex/spmd/neighbors/__init__.py index 7b1f9f646c..44cb849591 100644 --- a/sklearnex/spmd/neighbors/__init__.py +++ b/sklearnex/spmd/neighbors/__init__.py @@ -14,6 +14,10 @@ # limitations under the License. # ============================================================================== -from onedal.spmd.neighbors import KNeighborsClassifier, KNeighborsRegressor +from onedal.spmd.neighbors import ( + KNeighborsClassifier, + KNeighborsRegressor, + NearestNeighbors, +) -__all__ = ["KNeighborsClassifier", "KNeighborsRegressor"] +__all__ = ["KNeighborsClassifier", "KNeighborsRegressor", "NearestNeighbors"] diff --git a/sklearnex/spmd/neighbors/tests/test_neighbors_spmd.py b/sklearnex/spmd/neighbors/tests/test_neighbors_spmd.py index d362cf0bac..ca41194701 100644 --- a/sklearnex/spmd/neighbors/tests/test_neighbors_spmd.py +++ b/sklearnex/spmd/neighbors/tests/test_neighbors_spmd.py @@ -24,9 +24,9 @@ ) from sklearnex import config_context from sklearnex.tests.utils.spmd import ( - _assert_unordered_allclose, _generate_classification_data, _generate_regression_data, + _generate_statistic_data, _get_local_tensor, _mpi_libs_and_gpu_available, _spmd_assert_allclose, @@ -94,8 +94,8 @@ def test_knncls_spmd_gold(dataframe, queue): spmd_result = spmd_model.predict(local_dpt_X_test) batch_result = batch_model.predict(X_test) - _assert_unordered_allclose(spmd_indcs, batch_indcs, localize=True) - _assert_unordered_allclose(spmd_dists, batch_dists, localize=True) + _spmd_assert_allclose(spmd_indcs, batch_indcs) + _spmd_assert_allclose(spmd_dists, batch_dists) _spmd_assert_allclose(spmd_result, batch_result) @@ -164,10 +164,8 @@ def test_knncls_spmd_synthetic( tol = 1e-4 if dtype == np.float64: - _assert_unordered_allclose(spmd_indcs, batch_indcs, localize=True) - _assert_unordered_allclose( - spmd_dists, batch_dists, localize=True, rtol=tol, atol=tol - ) + _spmd_assert_allclose(spmd_indcs, batch_indcs) + _spmd_assert_allclose(spmd_dists, batch_dists, rtol=tol, atol=tol) _spmd_assert_allclose(spmd_result, batch_result) @@ -231,8 +229,8 @@ def test_knnreg_spmd_gold(dataframe, queue): spmd_result = spmd_model.predict(local_dpt_X_test) batch_result = batch_model.predict(X_test) - _assert_unordered_allclose(spmd_indcs, batch_indcs, localize=True) - _assert_unordered_allclose(spmd_dists, batch_dists, localize=True) + _spmd_assert_allclose(spmd_indcs, batch_indcs) + _spmd_assert_allclose(spmd_dists, batch_dists) _spmd_assert_allclose(spmd_result, batch_result) @@ -303,8 +301,133 @@ def test_knnreg_spmd_synthetic( tol = 0.005 if dtype == np.float32 else 1e-4 if dtype == np.float64: - _assert_unordered_allclose(spmd_indcs, batch_indcs, localize=True) - _assert_unordered_allclose( - spmd_dists, batch_dists, localize=True, rtol=tol, atol=tol - ) + _spmd_assert_allclose(spmd_indcs, batch_indcs) + _spmd_assert_allclose(spmd_dists, batch_dists, rtol=tol, atol=tol) _spmd_assert_allclose(spmd_result, batch_result, rtol=tol, atol=tol) + + +@pytest.mark.skipif( + not _mpi_libs_and_gpu_available, + reason="GPU device and MPI libs required for test", +) +@pytest.mark.parametrize( + "dataframe,queue", + get_dataframes_and_queues(dataframe_filter_="dpnp,dpctl", device_filter_="gpu"), +) +@pytest.mark.mpi +def test_knnsearch_spmd_gold(dataframe, queue): + # Import spmd and batch algo + from sklearnex.neighbors import NearestNeighbors as NearestNeighbors_Batch + from sklearnex.spmd.neighbors import NearestNeighbors as NearestNeighbors_SPMD + + # Create gold data and convert to dataframe + X_train = np.array( + [[-1, -1], [-2, -1], [-3, -2], [1, 1], [2, 1], [3, 2], [10, 10], [9, 9]] + ) + local_dpt_X_train = _convert_to_dataframe( + _get_local_tensor(X_train), sycl_queue=queue, target_df=dataframe + ) + + # Ensure predictions of batch algo match spmd + spmd_model = NearestNeighbors_SPMD(n_neighbors=2, algorithm="brute").fit( + local_dpt_X_train + ) + batch_model = NearestNeighbors_Batch(n_neighbors=2, algorithm="brute").fit(X_train) + spmd_dists, spmd_indcs = spmd_model.kneighbors(local_dpt_X_train) + batch_dists, batch_indcs = batch_model.kneighbors(X_train) + + _spmd_assert_allclose(spmd_indcs, batch_indcs) + _spmd_assert_allclose(spmd_dists, batch_dists) + + +@pytest.mark.skipif( + not _mpi_libs_and_gpu_available, + reason="GPU device and MPI libs required for test", +) +@pytest.mark.parametrize( + "dimensions", [{"n": 100, "m": 10, "k": 2}, {"n": 100000, "m": 100, "k": 100}] +) +@pytest.mark.parametrize( + "dataframe,queue", + get_dataframes_and_queues(dataframe_filter_="dpnp,dpctl", device_filter_="gpu"), +) +@pytest.mark.parametrize("dtype", [np.float32, np.float64]) +@pytest.mark.mpi +def test_knnsearch_spmd_synthetic( + dimensions, + dataframe, + queue, + dtype, +): + if dimensions["n"] > 10000 and dtype == np.float32: + pytest.skip("Skipping large float32 test due to expected precision issues") + + # Import spmd and batch algo + from sklearnex.neighbors import NearestNeighbors as NearestNeighbors_Batch + from sklearnex.spmd.neighbors import NearestNeighbors as NearestNeighbors_SPMD + + # Generate data and convert to dataframe + X_train = _generate_statistic_data(dimensions["n"], dimensions["m"], dtype=dtype) + + local_dpt_X_train = _convert_to_dataframe( + _get_local_tensor(X_train), sycl_queue=queue, target_df=dataframe + ) + + # Ensure search results of batch algo match spmd + spmd_model = NearestNeighbors_SPMD( + n_neighbors=dimensions["k"], algorithm="brute" + ).fit(local_dpt_X_train) + batch_model = NearestNeighbors_Batch( + n_neighbors=dimensions["k"], algorithm="brute" + ).fit(X_train) + spmd_dists, spmd_indcs = spmd_model.kneighbors(local_dpt_X_train) + batch_dists, batch_indcs = batch_model.kneighbors(X_train) + + tol = 0.005 if dtype == np.float32 else 1e-6 + _spmd_assert_allclose(spmd_indcs, batch_indcs) + _spmd_assert_allclose(spmd_dists, batch_dists, rtol=tol, atol=tol) + + +@pytest.mark.skipif( + not _mpi_libs_and_gpu_available, + reason="GPU device and MPI libs required for test", +) +@pytest.mark.parametrize( + "dataframe,queue", + get_dataframes_and_queues(dataframe_filter_="dpnp,dpctl", device_filter_="gpu"), +) +@pytest.mark.mpi +def test_knn_spmd_empty_kneighbors(dataframe, queue): + # Import spmd and batch algo + from sklearnex.neighbors import NearestNeighbors as NearestNeighbors_Batch + from sklearnex.spmd.neighbors import ( + KNeighborsClassifier, + KNeighborsRegressor, + NearestNeighbors, + ) + + # Create gold data and convert to dataframe + X_train = np.array( + [[-1, -1], [-2, -1], [-3, -2], [1, 1], [2, 1], [3, 2], [10, 10], [9, 9]] + ) + y_train = np.array([0, 1, 0, 1, 0, 1, 0, 1]) + local_dpt_X_train = _convert_to_dataframe( + _get_local_tensor(X_train), sycl_queue=queue, target_df=dataframe + ) + local_dpt_y_train = _convert_to_dataframe( + _get_local_tensor(y_train), sycl_queue=queue, target_df=dataframe + ) + + # Run each estimator without an input to kneighbors() and ensure functionality and equivalence + for CurrentEstimator in [KNeighborsClassifier, KNeighborsRegressor, NearestNeighbors]: + spmd_model = CurrentEstimator(n_neighbors=1, algorithm="brute").fit( + local_dpt_X_train, local_dpt_y_train + ) + batch_model = NearestNeighbors_Batch(n_neighbors=1, algorithm="brute").fit( + X_train, y_train + ) + spmd_dists, spmd_indcs = spmd_model.kneighbors() + batch_dists, batch_indcs = batch_model.kneighbors() + + _spmd_assert_allclose(spmd_indcs, batch_indcs) + _spmd_assert_allclose(spmd_dists, batch_dists)