Skip to content
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
7213c80
Add NearestNeighbors SPMD API
ethanglaser Jun 17, 2025
7aea5a6
black format
ethanglaser Jun 17, 2025
fa48719
Merge branch 'main' into dev/eglaser-knn-spmd-search
ethanglaser Jun 17, 2025
3765c6c
extend gold data to have multiple rows per rank
ethanglaser Jun 25, 2025
b3c66af
Merge branch 'uxlfoundation:main' into dev/eglaser-knn-spmd-search
ethanglaser Jun 25, 2025
ced4aca
formatting
ethanglaser Jun 25, 2025
72fc707
Merge branch 'uxlfoundation:main' into dev/eglaser-knn-spmd-search
ethanglaser Jul 15, 2025
ca9408b
Merge branch 'uxlfoundation:main' into dev/eglaser-knn-spmd-search
ethanglaser Jul 16, 2025
e8c1ed9
Merge branch 'uxlfoundation:main' into dev/eglaser-knn-spmd-search
ethanglaser Jul 25, 2025
eca8bff
Merge branch 'uxlfoundation:main' into dev/eglaser-knn-spmd-search
ethanglaser Aug 20, 2025
47a7d93
Merge branch 'main' into dev/eglaser-knn-spmd-search
ethanglaser Sep 3, 2025
a56ee49
raw inputs support for kneighbors
ethanglaser Sep 5, 2025
544cca3
Merge branch 'uxlfoundation:main' into dev/eglaser-knn-spmd-search
ethanglaser Sep 9, 2025
8532fe3
Reduce rows of synthetic large test
ethanglaser Sep 11, 2025
b9eb2df
Merge branch 'uxlfoundation:main' into dev/eglaser-knn-spmd-search
ethanglaser Sep 11, 2025
ae7ade0
Merge branch 'uxlfoundation:main' into dev/eglaser-knn-spmd-search
ethanglaser Sep 17, 2025
19cde34
update search size and only use _spmd_assert_allclose
ethanglaser Sep 17, 2025
ffba570
support empty kneighbors()
ethanglaser Sep 26, 2025
0b7777e
Update sklearnex/spmd/neighbors/tests/test_neighbors_spmd.py
ethanglaser Sep 26, 2025
5faaa8e
Merge branch 'uxlfoundation:main' into dev/eglaser-knn-spmd-search
ethanglaser Sep 30, 2025
466e195
address comments
ethanglaser Sep 30, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions onedal/neighbors/neighbors.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ def _fit(self, X, y):
return result

def _kneighbors(self, X=None, n_neighbors=None, return_distance=True):
use_raw_input = _get_config().get("use_raw_input", False) is True
n_features = getattr(self, "n_features_in_", None)
shape = getattr(X, "shape", None)
if n_features and shape and len(shape) > 1 and shape[1] != n_features:
Expand All @@ -322,7 +323,8 @@ def _kneighbors(self, X=None, n_neighbors=None, return_distance=True):

if X is not None:
query_is_train = False
X = _check_array(X, accept_sparse="csr", dtype=[np.float64, np.float32])
if not use_raw_input:
X = _check_array(X, accept_sparse="csr", dtype=[np.float64, np.float32])
else:
query_is_train = True
X = self._fit_X
Expand Down Expand Up @@ -730,7 +732,6 @@ def __init__(
self,
n_neighbors=5,
*,
weights="uniform",
algorithm="auto",
p=2,
metric="minkowski",
Expand All @@ -745,7 +746,7 @@ def __init__(
metric_params=metric_params,
**kwargs,
)
self.weights = weights
self.requires_y = False

@bind_default_backend("neighbors.search")
def train(self, *args, **kwargs): ...
Expand Down Expand Up @@ -792,7 +793,7 @@ def _onedal_predict(self, model, X, params):
return self.infer(params, model, X)

@supports_queue
def fit(self, X, y, queue=None):
def fit(self, X, y=None, queue=None):
return self._fit(X, y)

@supports_queue
Expand Down
4 changes: 2 additions & 2 deletions onedal/spmd/neighbors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,6 @@
# limitations under the License.
# ==============================================================================

from .neighbors import KNeighborsClassifier, KNeighborsRegressor
from .neighbors import KNeighborsClassifier, KNeighborsRegressor, NearestNeighbors

__all__ = ["KNeighborsClassifier", "KNeighborsRegressor"]
__all__ = ["KNeighborsClassifier", "KNeighborsRegressor", "NearestNeighbors"]
27 changes: 27 additions & 0 deletions onedal/spmd/neighbors/neighbors.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from ...common._backend import bind_spmd_backend
from ...neighbors import KNeighborsClassifier as KNeighborsClassifier_Batch
from ...neighbors import KNeighborsRegressor as KNeighborsRegressor_Batch
from ...neighbors import NearestNeighbors as NearestNeighbors_Batch


class KNeighborsClassifier(KNeighborsClassifier_Batch):
Expand All @@ -30,6 +31,7 @@ def infer(self, *args, **kwargs): ...

@support_input_format
def fit(self, X, y, queue=None):
self.spmd_queue_ = queue
return super().fit(X, y, queue=queue)

@support_input_format
Expand All @@ -42,6 +44,8 @@ def predict_proba(self, X, queue=None):

@support_input_format
def kneighbors(self, X=None, n_neighbors=None, return_distance=True, queue=None):
if X is None and queue is None:
queue = getattr(self, "spmd_queue_", None)
return super().kneighbors(X, n_neighbors, return_distance, queue=queue)


Expand All @@ -62,6 +66,7 @@ def infer(self, *args, **kwargs): ...
@support_input_format
@supports_queue
def fit(self, X, y, queue=None):
self.spmd_queue_ = queue
if queue is not None and queue.sycl_device.is_gpu:
return self._fit(X, y)
else:
Expand All @@ -72,6 +77,8 @@ def fit(self, X, y, queue=None):

@support_input_format
def kneighbors(self, X=None, n_neighbors=None, return_distance=True, queue=None):
if X is None and queue is None:
queue = getattr(self, "spmd_queue_", None)
return super().kneighbors(X, n_neighbors, return_distance, queue=queue)

@support_input_format
Expand All @@ -84,3 +91,23 @@ def _get_onedal_params(self, X, y=None):
if "responses" not in params["result_option"]:
params["result_option"] += "|responses"
return params


class NearestNeighbors(NearestNeighbors_Batch):

@bind_spmd_backend("neighbors.search")
def train(self, *args, **kwargs): ...

@bind_spmd_backend("neighbors.search")
def infer(self, *args, **kwargs): ...

@support_input_format
def fit(self, X, y=None, queue=None):
self.spmd_queue_ = queue
return super().fit(X, y, queue=queue)

@support_input_format
def kneighbors(self, X=None, n_neighbors=None, return_distance=True, queue=None):
if X is None and queue is None:
queue = getattr(self, "spmd_queue_", None)
return super().kneighbors(X, n_neighbors, return_distance, queue=queue)
8 changes: 6 additions & 2 deletions sklearnex/spmd/neighbors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@
# limitations under the License.
# ==============================================================================

from onedal.spmd.neighbors import KNeighborsClassifier, KNeighborsRegressor
from onedal.spmd.neighbors import (
KNeighborsClassifier,
KNeighborsRegressor,
NearestNeighbors,
)

__all__ = ["KNeighborsClassifier", "KNeighborsRegressor"]
__all__ = ["KNeighborsClassifier", "KNeighborsRegressor", "NearestNeighbors"]
149 changes: 136 additions & 13 deletions sklearnex/spmd/neighbors/tests/test_neighbors_spmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@
)
from sklearnex import config_context
from sklearnex.tests.utils.spmd import (
_assert_unordered_allclose,
_generate_classification_data,
_generate_regression_data,
_generate_statistic_data,
_get_local_tensor,
_mpi_libs_and_gpu_available,
_spmd_assert_allclose,
Expand Down Expand Up @@ -94,8 +94,8 @@ def test_knncls_spmd_gold(dataframe, queue):
spmd_result = spmd_model.predict(local_dpt_X_test)
batch_result = batch_model.predict(X_test)

_assert_unordered_allclose(spmd_indcs, batch_indcs, localize=True)
_assert_unordered_allclose(spmd_dists, batch_dists, localize=True)
_spmd_assert_allclose(spmd_indcs, batch_indcs)
_spmd_assert_allclose(spmd_dists, batch_dists)
_spmd_assert_allclose(spmd_result, batch_result)


Expand Down Expand Up @@ -164,10 +164,8 @@ def test_knncls_spmd_synthetic(

tol = 1e-4
if dtype == np.float64:
_assert_unordered_allclose(spmd_indcs, batch_indcs, localize=True)
_assert_unordered_allclose(
spmd_dists, batch_dists, localize=True, rtol=tol, atol=tol
)
_spmd_assert_allclose(spmd_indcs, batch_indcs)
_spmd_assert_allclose(spmd_dists, batch_dists, rtol=tol, atol=tol)
_spmd_assert_allclose(spmd_result, batch_result)


Expand Down Expand Up @@ -231,8 +229,8 @@ def test_knnreg_spmd_gold(dataframe, queue):
spmd_result = spmd_model.predict(local_dpt_X_test)
batch_result = batch_model.predict(X_test)

_assert_unordered_allclose(spmd_indcs, batch_indcs, localize=True)
_assert_unordered_allclose(spmd_dists, batch_dists, localize=True)
_spmd_assert_allclose(spmd_indcs, batch_indcs)
_spmd_assert_allclose(spmd_dists, batch_dists)
_spmd_assert_allclose(spmd_result, batch_result)


Expand Down Expand Up @@ -303,8 +301,133 @@ def test_knnreg_spmd_synthetic(

tol = 0.005 if dtype == np.float32 else 1e-4
if dtype == np.float64:
_assert_unordered_allclose(spmd_indcs, batch_indcs, localize=True)
_assert_unordered_allclose(
spmd_dists, batch_dists, localize=True, rtol=tol, atol=tol
)
_spmd_assert_allclose(spmd_indcs, batch_indcs)
_spmd_assert_allclose(spmd_dists, batch_dists, rtol=tol, atol=tol)
_spmd_assert_allclose(spmd_result, batch_result, rtol=tol, atol=tol)


@pytest.mark.skipif(
not _mpi_libs_and_gpu_available,
reason="GPU device and MPI libs required for test",
)
@pytest.mark.parametrize(
"dataframe,queue",
get_dataframes_and_queues(dataframe_filter_="dpnp,dpctl", device_filter_="gpu"),
)
@pytest.mark.mpi
def test_knnsearch_spmd_gold(dataframe, queue):
# Import spmd and batch algo
from sklearnex.neighbors import NearestNeighbors as NearestNeighbors_Batch
from sklearnex.spmd.neighbors import NearestNeighbors as NearestNeighbors_SPMD

# Create gold data and convert to dataframe
X_train = np.array(
[[-1, -1], [-2, -1], [-3, -2], [1, 1], [2, 1], [3, 2], [10, 10], [9, 9]]
)
local_dpt_X_train = _convert_to_dataframe(
_get_local_tensor(X_train), sycl_queue=queue, target_df=dataframe
)

# Ensure predictions of batch algo match spmd
spmd_model = NearestNeighbors_SPMD(n_neighbors=2, algorithm="brute").fit(
local_dpt_X_train
)
batch_model = NearestNeighbors_Batch(n_neighbors=2, algorithm="brute").fit(X_train)
spmd_dists, spmd_indcs = spmd_model.kneighbors(local_dpt_X_train)
batch_dists, batch_indcs = batch_model.kneighbors(X_train)

_spmd_assert_allclose(spmd_indcs, batch_indcs)
_spmd_assert_allclose(spmd_dists, batch_dists)


@pytest.mark.skipif(
not _mpi_libs_and_gpu_available,
reason="GPU device and MPI libs required for test",
)
@pytest.mark.parametrize(
"dimensions", [{"n": 100, "m": 10, "k": 2}, {"n": 100000, "m": 100, "k": 100}]
)
@pytest.mark.parametrize(
"dataframe,queue",
get_dataframes_and_queues(dataframe_filter_="dpnp,dpctl", device_filter_="gpu"),
)
@pytest.mark.parametrize("dtype", [np.float32, np.float64])
@pytest.mark.mpi
def test_knnsearch_spmd_synthetic(
dimensions,
dataframe,
queue,
dtype,
):
if dimensions["n"] > 10000 and dtype == np.float32:
pytest.skip("Skipping large float32 test due to expected precision issues")

# Import spmd and batch algo
from sklearnex.neighbors import NearestNeighbors as NearestNeighbors_Batch
from sklearnex.spmd.neighbors import NearestNeighbors as NearestNeighbors_SPMD

# Generate data and convert to dataframe
X_train = _generate_statistic_data(dimensions["n"], dimensions["m"], dtype=dtype)

local_dpt_X_train = _convert_to_dataframe(
_get_local_tensor(X_train), sycl_queue=queue, target_df=dataframe
)

# Ensure search results of batch algo match spmd
spmd_model = NearestNeighbors_SPMD(
n_neighbors=dimensions["k"], algorithm="brute"
).fit(local_dpt_X_train)
batch_model = NearestNeighbors_Batch(
n_neighbors=dimensions["k"], algorithm="brute"
).fit(X_train)
spmd_dists, spmd_indcs = spmd_model.kneighbors(local_dpt_X_train)
batch_dists, batch_indcs = batch_model.kneighbors(X_train)

tol = 0.005 if dtype == np.float32 else 1e-6
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yikes on this float32 setting. Any info on it? Especially because there is a skip associated with it above (meaning an even worse value occurs?)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's true, and good observation. It's pretty tricky because this assert all close functionality will fail even if a single element is not within the threshold, hence why it is so loose - it would be nice if there was some sort of customization of that.

It's possible that we could still run the indices check for this case, but distances are more fragile.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not the only place in spmd test scope where drastically low thresholds are needed to support float32 tests passing though

_spmd_assert_allclose(spmd_indcs, batch_indcs)
_spmd_assert_allclose(spmd_dists, batch_dists, rtol=tol, atol=tol)


@pytest.mark.skipif(
not _mpi_libs_and_gpu_available,
reason="GPU device and MPI libs required for test",
)
@pytest.mark.parametrize(
"dataframe,queue",
get_dataframes_and_queues(dataframe_filter_="dpnp,dpctl", device_filter_="gpu"),
)
@pytest.mark.mpi
def test_knn_spmd_empty_kneighbors(dataframe, queue):
# Import spmd and batch algo
from sklearnex.neighbors import NearestNeighbors as NearestNeighbors_Batch
from sklearnex.spmd.neighbors import (
KNeighborsClassifier,
KNeighborsRegressor,
NearestNeighbors,
)

# Create gold data and convert to dataframe
X_train = np.array(
[[-1, -1], [-2, -1], [-3, -2], [1, 1], [2, 1], [3, 2], [10, 10], [9, 9]]
)
y_train = np.array([0, 1, 0, 1, 0, 1, 0, 1])
local_dpt_X_train = _convert_to_dataframe(
_get_local_tensor(X_train), sycl_queue=queue, target_df=dataframe
)
local_dpt_y_train = _convert_to_dataframe(
_get_local_tensor(y_train), sycl_queue=queue, target_df=dataframe
)

# Run each estimator without an input to kneighbors() and ensure functionality and equivalence
for CurrentEstimator in [KNeighborsClassifier, KNeighborsRegressor, NearestNeighbors]:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see why this was done, but is a bit painful to analyze if there is a failure. Ideally it would be parametrized over, but really isn't possible by the way it is imported. Would be worth adding some sort of message to figure out which is the CurrentEstimator (rather than having to dig through the pytest log for the CurrentEstimator current value was).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah - I am pretty open to ideas on this one. The loop is great because I run the exact same test on all 3 classes, but you are correct that analysis on a fail is trickier. I think scikit-learn may do things like this, I could check how they do it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess its easier there because in sklearn they import at top of file

spmd_model = CurrentEstimator(n_neighbors=1, algorithm="brute").fit(
local_dpt_X_train, local_dpt_y_train
)
batch_model = NearestNeighbors_Batch(n_neighbors=1, algorithm="brute").fit(
X_train, y_train
)
spmd_dists, spmd_indcs = spmd_model.kneighbors()
batch_dists, batch_indcs = batch_model.kneighbors()

_spmd_assert_allclose(spmd_indcs, batch_indcs)
_spmd_assert_allclose(spmd_dists, batch_dists)
Loading