Skip to content

Commit 2faa869

Browse files
committed
add missing data handler
1 parent ce3cf11 commit 2faa869

File tree

1 file changed

+95
-0
lines changed

1 file changed

+95
-0
lines changed
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
import numpy as np
2+
import pandas as pd
3+
import scipy.stats as stats
4+
from sklearn.impute import SimpleImputer
5+
from sklearn.linear_model import LogisticRegression
6+
from sklearn.preprocessing import LabelEncoder
7+
from fancyimpute import IterativeImputer # For MICE and EM
8+
import warnings
9+
10+
class MissingDataHandler:
    """Guess each column's missingness mechanism and impute it accordingly.

    ``detect_missingness`` labels every column that contains missing values
    as ``"MCAR"``, ``"MAR"`` or ``"MNAR"`` using three screening heuristics;
    ``apply_imputation`` then fills the gaps with a strategy chosen from that
    label and from whether the column is numeric.

    NOTE(review): the detection stages are heuristics, not formal tests.
    Stage 1 is a chi-square goodness-of-fit test on the observed value
    counts (it is *not* Little's MCAR test), so a column whose observed
    values are all distinct is always reported as MCAR. Treat the labels as
    a rough guide and confirm the methodology before relying on them.
    """

    def __init__(self, *, alpha: float = 0.05, mar_accuracy: float = 0.6):
        """Create a handler.

        Args:
            alpha: Significance level used by the chi-square and
                Kolmogorov-Smirnov stages of ``detect_missingness``.
            mar_accuracy: Minimum in-sample accuracy the missingness
                classifier must exceed before a column is labelled "MAR".
        """
        self.alpha = alpha
        self.mar_accuracy = mar_accuracy
        # Kept for backward compatibility; no method of this class
        # populates it.
        self.imputers = {}

    @staticmethod
    def _other_numeric_columns(df: pd.DataFrame, col) -> pd.DataFrame:
        """Return the numeric columns of *df* other than *col*, as floats.

        Columns without a single observed value are dropped because they
        carry no information and cannot be mean-filled. Remaining NaNs are
        left in place.
        """
        others = df.drop(columns=[col]).select_dtypes(include="number")
        return others.astype(float).dropna(axis=1, how="all")

    def detect_missingness(self, df: pd.DataFrame) -> dict:
        """Label the missingness mechanism of every column with gaps.

        Args:
            df (pd.DataFrame): The input DataFrame.

        Returns:
            dict: Maps each column that has at least one missing value to
            "MCAR", "MAR" or "MNAR". Columns without missing values are
            omitted.
        """
        missingness = {}
        for col in df.columns:
            is_missing = df[col].isna().to_numpy()
            if not is_missing.any():
                continue  # No missing values -> nothing to classify
            missingness[col] = self._classify_column(df, col, is_missing)
        return missingness

    def _classify_column(self, df: pd.DataFrame, col, is_missing: np.ndarray) -> str:
        """Run the three screening stages for one column and return its label."""
        observed = df[col][~is_missing]
        if observed.empty:
            # Nothing observed: no stage can run, so use the same default
            # as the "uncertain" case at the end of this method.
            return "MAR"

        # Stage 1: chi-square test that the observed values are spread
        # evenly over their distinct values.
        # NOTE(review): this says nothing about *where* values are missing;
        # kept to preserve existing classifications.
        _, p_value = stats.chisquare(observed.value_counts())
        if p_value > self.alpha:
            return "MCAR"

        # Only numeric neighbours can feed the classifier and the KS test.
        others = self._other_numeric_columns(df, col)
        has_predictors = others.shape[1] > 0

        # Stage 2: can the other columns predict where values are missing?
        if has_predictors:
            features = others.fillna(others.mean()).to_numpy()
            target = is_missing.astype(int)
            model = LogisticRegression()
            model.fit(features, target)
            # Always guessing the majority class already scores this much,
            # so the model has to beat it as well as the fixed threshold.
            missing_rate = target.mean()
            majority_rate = max(missing_rate, 1 - missing_rate)
            required = max(self.mar_accuracy, majority_rate)
            if model.score(features, target) > required:
                return "MAR"

        # Stage 3: compare the observed values with the row means of the
        # other numeric columns on the rows where this column is missing.
        if has_predictors and pd.api.types.is_numeric_dtype(observed):
            row_means = others.loc[is_missing].mean(axis=1).dropna()
            if not row_means.empty:
                _, p_value = stats.ks_2samp(observed.astype(float), row_means)
                if p_value < self.alpha:
                    return "MNAR"

        return "MAR"  # Default to MAR if uncertain

    def apply_imputation(self, df: pd.DataFrame, missingness: dict) -> pd.DataFrame:
        """Impute each listed column with a strategy chosen from its label.

        Numeric columns: "MCAR" uses the column mean; "MAR" and "MNAR" use
        ``IterativeImputer`` with the other numeric columns as predictors.
        Non-numeric columns: "MCAR" uses the most frequent value; "MAR"
        predicts the label from the other numeric columns with logistic
        regression (falling back to the most frequent value when that is
        not possible); "MNAR" fills with the literal category "Missing".
        Unknown labels and columns with nothing to fill are left unchanged.

        Args:
            df (pd.DataFrame): Input data with missing values. Not modified.
            missingness (dict): Mapping of column names to missingness type.

        Returns:
            pd.DataFrame: A copy of *df* with the listed columns imputed.
        """
        df = df.copy()
        for col, mtype in missingness.items():
            # Assign the result back instead of using an in-place fillna on
            # df[col], which is a no-op under pandas Copy-on-Write.
            if pd.api.types.is_numeric_dtype(df[col]):
                df[col] = self._impute_numeric(df, col, mtype)
            else:
                df[col] = self._impute_categorical(df, col, mtype)
        return df

    def _impute_numeric(self, df: pd.DataFrame, col, mtype: str) -> pd.Series:
        """Return numeric column *col* of *df* with its gaps filled."""
        series = df[col]
        present = series.notna().to_numpy()
        if mtype not in ("MCAR", "MAR", "MNAR") or present.all() or not present.any():
            # Unknown label, nothing to fill, or nothing to learn from.
            return series

        values = series.astype(float)
        if mtype == "MCAR":
            return values.fillna(values.mean())  # Mean imputation

        # MAR / MNAR: put the target first and add the other numeric
        # columns so the iterative imputer has something to regress on;
        # on a single column it can only fall back to the column mean.
        others = self._other_numeric_columns(df, col)
        matrix = pd.concat([values, others], axis=1).to_numpy()
        imputed = IterativeImputer().fit_transform(matrix)
        return pd.Series(imputed[:, 0], index=series.index, name=series.name)

    def _impute_categorical(self, df: pd.DataFrame, col, mtype: str) -> pd.Series:
        """Return non-numeric column *col* of *df* with its gaps filled."""
        series = df[col]

        if mtype == "MNAR":
            # A categorical dtype rejects values outside its categories.
            if (
                isinstance(series.dtype, pd.CategoricalDtype)
                and "Missing" not in series.cat.categories
            ):
                series = series.cat.add_categories(["Missing"])
            return series.fillna("Missing")  # Add "Missing" category

        present = series.notna().to_numpy()
        if mtype not in ("MCAR", "MAR") or present.all() or not present.any():
            return series

        if mtype == "MAR":
            # Encode observed labels only, so NaN never becomes a class.
            codes, labels = pd.factorize(series[present])
            others = self._other_numeric_columns(df, col)
            if others.shape[1] > 0 and len(labels) > 1:
                features = others.fillna(others.mean()).to_numpy()
                model = LogisticRegression()
                model.fit(features[present], codes)
                predicted = model.predict(features[~present])
                filled = series.copy()
                filled[~present] = np.asarray(labels.take(predicted))
                return filled
            # No predictors or a single class: fall through to the mode.

        return series.fillna(series.mode().iloc[0])  # Mode imputation

0 commit comments

Comments
 (0)