diff --git a/feature_engine/encoding/one_hot.py b/feature_engine/encoding/one_hot.py index 68a219790..49149698f 100644 --- a/feature_engine/encoding/one_hot.py +++ b/feature_engine/encoding/one_hot.py @@ -1,7 +1,7 @@ # Authors: Soledad Galli # License: BSD 3 clause -from typing import List, Optional, Union +from typing import Dict, Iterable, List, Optional, Union import numpy as np import pandas as pd @@ -45,6 +45,9 @@ class OneHotEncoder(CategoricalInitMixin, CategoricalMethodsMixin): majority of the observations in the dataset. This behaviour can be specified with the parameter `top_categories`. + + OneHotEncoder can also encode a user defined subset of categories for each variable. + See parameter `custom_categories`. + The encoder will encode only categorical variables by default (type 'object' or 'categorical'). You can pass a list of variables to encode. Alternatively, the encoder will find and encode all categorical variables (type 'object' or @@ -82,6 +85,17 @@ class OneHotEncoder(CategoricalInitMixin, CategoricalMethodsMixin): value 0 in all the binary variables. Note that if `top_categories` is not None, the parameter `drop_last` is ignored. + + If `top_categories` is being used, `custom_categories` must equal None. + + custom_categories: dict, default=None + Accepts a dictionary in which the keys are the variables that the user would like + to transform. The keys must match the values of `variables` param. + + The dictionary values are lists of the categories for each selected variable + that are to be one-hot encoded. + + If `custom_categories` is being used, `top_categories` must equal None. + drop_last: boolean, default=False Only used if `top_categories = None`. 
It indicates whether to create dummy variables for all the categories (k dummies), or if set to `True`, it will @@ -160,9 +174,12 @@ class OneHotEncoder(CategoricalInitMixin, CategoricalMethodsMixin): def __init__( self, top_categories: Optional[int] = None, + custom_categories: Optional[Dict] = None, drop_last: bool = False, drop_last_binary: bool = False, - variables: Union[None, int, str, List[Union[str, int]]] = None, + variables: Union[ + None, int, str, List[Union[str, int]], Iterable[Union[str, int]] + ] = None, ignore_format: bool = False, ) -> None: @@ -174,6 +191,40 @@ def __init__( f"Got {top_categories} instead" ) + + if top_categories is not None and custom_categories is not None: + raise ValueError( + "Both top_categories and custom_categories have values. " + "Only one of the two parameters may be used at a time. " + f"Got {top_categories} for top_categories. " + f"Got {custom_categories} for custom_categories." + ) + + if custom_categories is not None and not isinstance(custom_categories, dict): + raise ValueError( + "custom_categories must be a dictionary. " + f"Got {custom_categories} instead." + ) + + if custom_categories: + # check that all values of custom_categories key-value pairs are lists + non_lists_custom_categories = [ + val for val in custom_categories.values() if not isinstance(val, list) + ] + if len(non_lists_custom_categories) > 0: + raise ValueError( + "custom_categories must be a dictionary that has lists as " + f"its values. Got {custom_categories} instead." + ) + + # check that custom_categories variables match variables + cust_cat_vars = sorted(list(custom_categories.keys())) + if cust_cat_vars != sorted(variables): + raise ValueError( + "Variables listed in custom_categories must match features " + f"listed in the variables param. Got {cust_cat_vars} for " + f"custom_categories and {sorted(variables)} for variables." + ) + if not isinstance(drop_last, bool): raise ValueError( f"drop_last takes only True or False. 
Got {drop_last} instead." @@ -187,6 +238,7 @@ def __init__( super().__init__(variables, ignore_format) self.top_categories = top_categories + self.custom_categories = custom_categories self.drop_last = drop_last self.drop_last_binary = drop_last_binary @@ -212,6 +264,9 @@ def fit(self, X: pd.DataFrame, y: Optional[pd.Series] = None): variables_ = self._check_or_select_variables(X) _check_contains_na(X, variables_) + if self.custom_categories: + self._check_custom_categories_in_dataset(X) + self.encoder_dict_ = {} for var in variables_: @@ -226,6 +281,9 @@ def fit(self, X: pd.DataFrame, y: Optional[pd.Series] = None): .head(self.top_categories) .index ] + # assign custom_categories to encoder_dict_ + elif self.custom_categories: + self.encoder_dict_ = self.custom_categories else: category_ls = list(X[var].unique()) @@ -302,3 +360,17 @@ def _add_new_feature_names(self, feature_names) -> List: feature_names = [f for f in feature_names if f not in self.variables_] return feature_names + + def _check_custom_categories_in_dataset(self, X: pd.DataFrame) -> None: + """ + Raise an error if user entered categories in custom_categories that do + not exist within dataset. + + """ + for var, categories in self.custom_categories.items(): + unique_values = set(X[var].unique()) + if not set(categories).issubset(unique_values): + raise ValueError( + f"All categorical values provided in {var} of custom_categories " + "do not exist within the dataset." 
+ ) diff --git a/tests/test_encoding/test_onehot_encoder.py b/tests/test_encoding/test_onehot_encoder.py index 42448be12..1e5d2dfae 100644 --- a/tests/test_encoding/test_onehot_encoder.py +++ b/tests/test_encoding/test_onehot_encoder.py @@ -202,6 +202,37 @@ def test_raises_error_if_df_contains_na(df_enc_big, df_enc_big_na): assert str(record.value) == msg +def test_raises_error_using_top_and_custom_categories(df_enc): + with pytest.raises(ValueError): + OneHotEncoder( + top_categories=1, + custom_categories={"var_A": ["C"]}, + ) + + +@pytest.mark.parametrize("_custom_cat", [3, "hamberguesa", True, [3, 5, 7]]) +def test_raises_error_not_permitted_custom_categories(_custom_cat): + with pytest.raises(ValueError): + OneHotEncoder( + custom_categories=_custom_cat, + ) + + +@pytest.mark.parametrize( + "_custom_cat", + [ + {"var_A": ["ZZ", "YY"], "var_B": 3}, + {"var_M": "test", "var_S": ["T", "U"]}, + ], +) +def test_raises_error_non_permitted_custom_category_pair_values(_custom_cat): + with pytest.raises(ValueError): + OneHotEncoder( + custom_categories=_custom_cat, + variables=list(_custom_cat.keys()), + ) + + def test_encode_numerical_variables(df_enc_numeric): encoder = OneHotEncoder( top_categories=None, @@ -516,3 +547,62 @@ def test_inverse_transform_raises_not_implemented_error(df_enc_binary): enc = OneHotEncoder().fit(df_enc_binary) with pytest.raises(NotImplementedError): enc.inverse_transform(df_enc_binary) + + +def test_error_when_custom_categories_values_do_not_exist(df_enc): + encoder = OneHotEncoder( + top_categories=None, + custom_categories={"var_A": ["A", "C"], "var_B": ["B", "X"]}, + variables=["var_A", "var_B"], + ) + with pytest.raises(ValueError): + encoder._check_custom_categories_in_dataset(df_enc) + + +def test_error_when_custom_categories_does_not_match_variables(): + with pytest.raises(ValueError): + OneHotEncoder( + custom_categories={"var_Q": ["A"], "var_Y": ["G", "H"]}, + variables=["var_Y", "var_B"], + ) + + +def 
test_encode_custom_categories(df_enc_big): + encoder = OneHotEncoder( + custom_categories={ + "var_A": ["A", "F", "G"], + "var_C": ["B", "F", "E"], + }, + variables=["var_A", "var_C"], + ) + X = encoder.fit_transform(df_enc_big).reset_index() + X = X.drop("index", axis=1) + + expected_results_head = { + "var_B": ["A", "A", "A", "A", "A", "A", "A", "A", "A", "A"], + "var_A_A": [1, 1, 1, 1, 1, 1, 0, 0, 0, 0], + "var_A_F": [0, 0, 0, 0, 0, 0, 0, 0, 0, 0], + "var_A_G": [0, 0, 0, 0, 0, 0, 0, 0, 0, 0], + "var_C_B": [0, 0, 0, 0, 1, 1, 1, 1, 1, 1], + "var_C_F": [0, 0, 0, 0, 0, 0, 0, 0, 0, 0], + "var_C_E": [0, 0, 0, 0, 0, 0, 0, 0, 0, 0], + } + expected_results_head_df = pd.DataFrame(expected_results_head) + + expected_results_tail = { + "var_B": ["E", "E", "F", "F", "G", "G", "G", "G", "G", "G"], + "var_A_A": [0, 0, 0, 0, 0, 0, 0, 0, 0, 0], + "var_A_F": [0, 0, 1, 1, 0, 0, 0, 0, 0, 0], + "var_A_G": [0, 0, 0, 0, 1, 1, 1, 1, 1, 1], + "var_C_B": [0, 0, 0, 0, 0, 0, 0, 0, 0, 0], + "var_C_F": [0, 0, 1, 1, 0, 0, 0, 0, 0, 0], + "var_C_E": [1, 1, 0, 0, 0, 0, 0, 0, 0, 0], + } + expected_results_tail_df = pd.DataFrame( + data=expected_results_tail, + index=range(30, 40), + ) + + # test transform outputs + pd.testing.assert_frame_equal(X.head(10), expected_results_head_df) + pd.testing.assert_frame_equal(X.tail(10), expected_results_tail_df)