diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index e8fae9156d..8c95849141 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -430,6 +430,7 @@ jobs:
       - name: Set Environment Variables
         id: set-env
         run: |
+          python -c "import os; print(len(os.sched_getaffinity(0)))"
           echo "NO_DIST=1" >> "$GITHUB_ENV"
           # enable coverage report generation
           echo "COVERAGE_RCFILE=$(readlink -f .coveragerc)" >> "$GITHUB_ENV"
diff --git a/daal4py/sklearn/_n_jobs_support.py b/daal4py/sklearn/_n_jobs_support.py
index 78d9cf0208..7935053d22 100644
--- a/daal4py/sklearn/_n_jobs_support.py
+++ b/daal4py/sklearn/_n_jobs_support.py
@@ -19,53 +19,50 @@
 import threading
 from functools import wraps
 from inspect import Parameter, signature
-from multiprocessing import cpu_count
 from numbers import Integral
 from warnings import warn
 
 import threadpoolctl
+from joblib import cpu_count
 
-from daal4py import daalinit as set_n_threads
-from daal4py import num_threads as get_n_threads
+from daal4py import _get__daal_link_version__, daalinit, num_threads
 
 from ._utils import sklearn_check_version
 
 if sklearn_check_version("1.2"):
     from sklearn.utils._param_validation import validate_parameter_constraints
+else:
+    def validate_parameter_constraints(n_jobs):
+        if n_jobs is not None and not isinstance(n_jobs, Integral):
+            raise TypeError(
+                f"n_jobs must be an instance of int, not {n_jobs.__class__.__name__}."
+            )
+
+
+class oneDALLibController(threadpoolctl.LibController):
+    user_api = "onedal"
+    internal_api = "onedal"
+
+    filename_prefixes = ("libonedal_thread", "libonedal")
+
+    def get_num_threads(self):
+        return num_threads()
+
+    def set_num_threads(self, nthreads):
+        daalinit(nthreads)
+
+    def get_version(self):
+        return _get__daal_link_version__()
+
+
+threadpoolctl.register(oneDALLibController)
 
 
 # Note: getting controller in global scope of this module is required
 # to avoid overheads by its initialization per each function call
 threadpool_controller = threadpoolctl.ThreadpoolController()
 
 
-def get_suggested_n_threads(n_cpus):
-    """
-    Function to get `n_threads` limit
-    if `n_jobs` is set in upper parallelization context.
-    Usually, limit is equal to `n_logical_cpus` // `n_jobs`.
-    Returns None if limit is not set.
-    """
-    n_threads_map = {
-        lib_ctl.internal_api: lib_ctl.get_num_threads()
-        for lib_ctl in threadpool_controller.lib_controllers
-        if lib_ctl.internal_api != "mkl"
-    }
-    # openBLAS is limited to 24, 64 or 128 threads by default
-    # depending on SW/HW configuration.
-    # thus, these numbers of threads from openBLAS are uninformative
-    if "openblas" in n_threads_map and n_threads_map["openblas"] in [24, 64, 128]:
-        del n_threads_map["openblas"]
-    # remove default values equal to n_cpus as uninformative
-    for backend in list(n_threads_map.keys()):
-        if n_threads_map[backend] == n_cpus:
-            del n_threads_map[backend]
-    if len(n_threads_map) > 0:
-        return min(n_threads_map.values())
-    else:
-        return None
-
-
 def _run_with_n_jobs(method):
     """
     Decorator for running of methods containing oneDAL kernels with 'n_jobs'.
@@ -79,59 +76,46 @@ def _run_with_n_jobs(method):
     @wraps(method)
     def n_jobs_wrapper(self, *args, **kwargs):
         # threading parallel backend branch
-        if not isinstance(threading.current_thread(), threading._MainThread):
-            warn(
-                "'Threading' parallel backend is not supported by "
-                "Extension for Scikit-learn*. "
-                "Falling back to usage of all available threads."
-            )
-            result = method(self, *args, **kwargs)
-            return result
         # multiprocess parallel backends branch
         # preemptive validation of n_jobs parameter is required
         # because '_run_with_n_jobs' decorator is applied on top of method
         # where validation takes place
-        if sklearn_check_version("1.2") and hasattr(self, "_parameter_constraints"):
+        if sklearn_check_version("1.2"):
             validate_parameter_constraints(
                 parameter_constraints={"n_jobs": self._parameter_constraints["n_jobs"]},
                 params={"n_jobs": self.n_jobs},
                 caller_name=self.__class__.__name__,
             )
-        # search for specified n_jobs
-        n_jobs = self.n_jobs
-        n_cpus = cpu_count()
+        else:
+            validate_parameter_constraints(self.n_jobs)
+
         # receive n_threads limitation from upper parallelism context
         # using `threadpoolctl.ThreadpoolController`
-        n_threads = get_suggested_n_threads(n_cpus)
         # get real `n_jobs` number of threads for oneDAL
         # using sklearn rules and `n_threads` from upper parallelism context
-        if n_jobs is None or n_jobs == 0:
-            if n_threads is None:
-                # default branch with no setting for n_jobs
-                return method(self, *args, **kwargs)
-            else:
-                n_jobs = n_threads
-        elif n_jobs < 0:
-            if n_threads is None:
-                n_jobs = max(1, n_cpus + n_jobs + 1)
-            else:
-                n_jobs = max(1, n_threads + n_jobs + 1)
-        # branch with set n_jobs
-        old_n_threads = get_n_threads()
-        if n_jobs == old_n_threads:
+
+        if self.n_jobs:
+            n_jobs = (
+                self.n_jobs if self.n_jobs > 0 else max(1, cpu_count() + self.n_jobs + 1)
+            )
+        elif self.n_jobs == 0:
+            # This is a small variation on joblib's equivalent error
+            raise ValueError("n_jobs == 0 has no meaning")
+        else:
             return method(self, *args, **kwargs)
 
-        try:
+        # n_jobs value is attempting to be set
+        if (old_n_threads := num_threads()) != n_jobs:
             logger = logging.getLogger("sklearnex")
             cl = self.__class__
             logger.debug(
                 f"{cl.__module__}.{cl.__name__}.{method.__name__}: "
                 f"setting {n_jobs} threads (previous - {old_n_threads})"
             )
-            set_n_threads(n_jobs)
+            with threadpool_controller.limit(limits=n_jobs, user_api="onedal"):
+                return method(self, *args, **kwargs)
+        else:
             return method(self, *args, **kwargs)
-        finally:
-            set_n_threads(old_n_threads)
 
     return n_jobs_wrapper
 
@@ -185,6 +169,8 @@ def class_wrapper(original_class):
         ):
             parameter_constraints = original_class._parameter_constraints
             if "n_jobs" not in parameter_constraints:
+                # n_jobs = 0 is not allowed, but it is handled elsewhere
+                # This definition matches scikit-learn
                 parameter_constraints["n_jobs"] = [Integral, None]
 
         @wraps(original_init)
diff --git a/daal4py/sklearn/ensemble/AdaBoostClassifier.py b/daal4py/sklearn/ensemble/AdaBoostClassifier.py
index c344bce5ba..a4c11867e1 100644
--- a/daal4py/sklearn/ensemble/AdaBoostClassifier.py
+++ b/daal4py/sklearn/ensemble/AdaBoostClassifier.py
@@ -34,6 +34,10 @@
 
 @control_n_jobs(decorated_methods=["fit", "predict"])
 class AdaBoostClassifier(ClassifierMixin, BaseEstimator):
+
+    if sklearn_check_version("1.2"):
+        _parameter_constraints = {}
+
     def __init__(
         self,
         split_criterion="gini",
diff --git a/daal4py/sklearn/ensemble/GBTDAAL.py b/daal4py/sklearn/ensemble/GBTDAAL.py
index 0a871564a7..51dfe705d5 100644
--- a/daal4py/sklearn/ensemble/GBTDAAL.py
+++ b/daal4py/sklearn/ensemble/GBTDAAL.py
@@ -34,6 +34,10 @@
 
 
 class GBTDAALBase(BaseEstimator, d4p.mb.GBTDAALBaseModel):
+
+    if sklearn_check_version("1.2"):
+        _parameter_constraints = {}
+
     def __init__(
         self,
         split_method="inexact",
diff --git a/sklearnex/tests/test_memory_usage.py b/sklearnex/tests/test_memory_usage.py
index 9510ded22d..0f982493e7 100644
--- a/sklearnex/tests/test_memory_usage.py
+++ b/sklearnex/tests/test_memory_usage.py
@@ -114,7 +114,7 @@ def gen_functions(functions):
 
 data_shapes = [
     pytest.param((1000, 100), id="(1000, 100)"),
-    pytest.param((2000, 50), id="(2000, 50)"),
+    pytest.param((2000, 40), id="(2000, 40)"),
 ]
 
 EXTRA_MEMORY_THRESHOLD = 0.15
diff --git a/sklearnex/tests/test_n_jobs_support.py b/sklearnex/tests/test_n_jobs_support.py
index 45d63d8e90..1f96234bc9 100644
--- a/sklearnex/tests/test_n_jobs_support.py
+++ b/sklearnex/tests/test_n_jobs_support.py
@@ -16,19 +16,15 @@
 
 import inspect
 import logging
-from multiprocessing import cpu_count
+import os
 
 import pytest
+from joblib import cpu_count
 from sklearn.datasets import make_classification
 from sklearn.exceptions import NotFittedError
+from threadpoolctl import threadpool_info
 
-from sklearnex.tests.utils import (
-    PATCHED_MODELS,
-    SPECIAL_INSTANCES,
-    call_method,
-    gen_dataset,
-    gen_models_info,
-)
+from sklearnex.tests.utils import PATCHED_MODELS, SPECIAL_INSTANCES, call_method
 
 
 _X, _Y = make_classification(n_samples=40, n_features=4, random_state=42)
@@ -49,7 +45,7 @@ def _check_n_jobs_entry_in_logs(records, function_name, n_jobs):
         if f"{function_name}: setting {expected_n_jobs} threads" in rec:
             return True
     # False if n_jobs is set and not found in logs
-    return n_jobs is None
+    return n_jobs is None or expected_n_jobs == cpu_count()
 
 
 @pytest.mark.parametrize("estimator", {**PATCHED_MODELS, **SPECIAL_INSTANCES}.keys())
@@ -106,3 +102,31 @@ def test_n_jobs_support(estimator, n_jobs, caplog):
 
     messages = [msg.message for msg in caplog.records]
     assert _check_n_jobs_entry_in_logs(messages, method_name, n_jobs)
+
+
+@pytest.mark.skipif(
+    not hasattr(os, "sched_setaffinity") or len(os.sched_getaffinity(0)) < 4,
+    reason="python CPU affinity control unavailable or too few threads",
+)
+@pytest.mark.parametrize("estimator", {**PATCHED_MODELS, **SPECIAL_INSTANCES}.keys())
+def test_n_jobs_affinity(estimator, caplog):
+    # verify that n_jobs 1) starts at default value of cpu_count
+    # 2) respects os.sched_setaffinity on supported machines
+    n_t = next(i for i in threadpool_info() if i["user_api"] == "onedal")["num_threads"]
+
+    # get affinity mask of calling process
+    mask = os.sched_getaffinity(0)
+    # by default, oneDAL should match the number of threads made available to the sklearnex pytest suite
+    # This is currently disabled due to thread setting occurring in test_run_to_run_stability
+    # assert len(mask) == n_t
+
+    try:
+        # use half of the available threads
+        newmask = set(list(mask)[: len(mask) // 2])
+        os.sched_setaffinity(0, newmask)
+        # -2 is used as this forces n_jobs to be based on cpu_count and must value match in test
+        test_n_jobs_support(estimator, -2, caplog)
+
+    finally:
+        # reset affinity mask no matter what
+        os.sched_setaffinity(0, mask)