@@ -25,8 +25,6 @@ class Harmonizer:
2525 ----------
2626 min_common_columns : int
2727 Minimum number of common columns required for vertical merge (default is 1).
28- similarity_threshold : float
29- Similarity threshold to consider for vertical merge (default is 0.8).
3028 nan_threshold : float
3129 Percentage threshold of ``NaN`` values to drop columns (default is 1.0).
3230 sample_frac : float or ``None``
@@ -58,7 +56,6 @@ class Harmonizer:
5856 """
5957 def __init__ (self ,
6058 min_common_columns : int = 1 ,
61- similarity_threshold : float = 1 ,
6259 nan_threshold : float = 1.0 ,
6360 sample_frac : Optional [float ] = None ,
6461 column_mapping : Optional [Union [Type [Enum ], Dict [str , Dict [str , str ]], str , Path ]] = None ,
@@ -77,7 +74,6 @@ def __init__(self,
7774 Initialize the Harmonizer class with default parameters.
7875 """
7976 self .min_common_columns = min_common_columns
80- self .similarity_threshold = similarity_threshold
8177 self .nan_threshold = nan_threshold
8278 self .sample_frac = sample_frac
8379 self .column_mapping = column_mapping
@@ -100,12 +96,7 @@ def __init__(self,
10096 def min_common_columns (self ) -> int :
10197 """Get the minimum number of common columns required for vertical merge."""
10298 return self ._min_common_columns
103-
104- @property
105- def similarity_threshold (self ) -> float :
106- """Get the similarity threshold for vertical merge."""
107- return self ._similarity_threshold
108-
99+
109100 @property
110101 def nan_threshold (self ) -> float :
111102 """Get the NaN threshold for column dropping."""
@@ -174,13 +165,6 @@ def min_common_columns(self, value: int):
174165 raise ValueError ("min_common_columns must be a non-negative integer" )
175166 self ._min_common_columns = value
176167
177- @similarity_threshold .setter
178- def similarity_threshold (self , value : float ):
179- """Set the similarity threshold for vertical merge."""
180- if not isinstance (value , (int , float )) or not 0 <= value <= 1 :
181- raise ValueError ("similarity_threshold must be a float between 0 and 1" )
182- self ._similarity_threshold = float (value )
183-
184168 @nan_threshold .setter
185169 def nan_threshold (self , value : float ):
186170 """Set the NaN threshold for column dropping."""
@@ -257,14 +241,20 @@ def extra_cols(self, value: List[str]):
257241 raise ValueError ("extra_cols must be a list of strings" )
258242 self ._extra_cols = value
259243
260- def s4h_vertical_merge (self , ddfs : List [dd .DataFrame ]) -> List [dd .DataFrame ]:
244+ def s4h_vertical_merge (self , ddfs : List [dd .DataFrame ], overlap_threshold : float = 1 , method : str = "union" ) -> List [dd .DataFrame ]:
261245 """
262246 Merge a list of `Dask <https://docs.dask.org>`_ DataFrames vertically using instance parameters.
263247
264248 Parameters
265249 ----------
266250 ddfs : list of `dask.dataframe.DataFrame <https://docs.dask.org/en/stable/generated/dask.dataframe.DataFrame.html>`_
267251 List of `Dask <https://docs.dask.org>`_ DataFrames to be merged.
252+ overlap_threshold : float, optional
253+ Overlap coefficient (Szymkiewicz–Simpson coefficient) threshold to consider for vertical merge (default is 1).
254+ method : str, optional
255+ Method to use for merging (default is "union").
256+ - "union": Merge all columns from all DataFrames, filling missing values with NaN.
257+ - "intersection": Merge only columns that are common to all DataFrames.
268258
269259 Returns
270260 -------
@@ -273,20 +263,28 @@ def s4h_vertical_merge(self, ddfs: List[dd.DataFrame]) -> List[dd.DataFrame]:
273263
274264 Notes
275265 -----
276- - DataFrames are grouped and merged if they share at least ``min_common_columns`` columns and their column similarity is above ``similarity_threshold ``.
266+ - DataFrames are grouped and merged if they share at least ``min_common_columns`` columns and their column overlap coefficient is above ``overlap_threshold ``.
277267 - Only columns with matching data types are considered compatible for merging.
278268 """
279269 if not ddfs :
280270 return []
281271
272+ if not isinstance (overlap_threshold , (int , float )) or not 0 <= overlap_threshold <= 1 :
273+ raise ValueError ("overlap_threshold must be a float between 0 and 1" )
274+
282275 groups = []
283276 used_indices = set ()
284277
285278 for i , df1 in enumerate (tqdm (ddfs , desc = "Grouping DataFrames" )):
286279 if i in used_indices :
287280 continue
288281
289- cols1 = set (df1 .columns )
282+ cols1 = set (
283+ df1
284+ .columns
285+ .str .upper ()
286+ .str .strip ()
287+ )
290288 dtypes1 = {col : str (df1 [col ].dtype ) for col in df1 .columns }
291289 current_group = [i ]
292290 used_indices .add (i )
@@ -296,12 +294,18 @@ def s4h_vertical_merge(self, ddfs: List[dd.DataFrame]) -> List[dd.DataFrame]:
296294 if j_actual in used_indices :
297295 continue
298296
299- cols2 = set (df2 .columns )
297+ cols2 = set (
298+ df2
299+ .columns
300+ .str .upper ()
301+ .str .strip ()
302+ )
300303 common_cols = cols1 & cols2
301- similarity = len (common_cols ) / max (len (cols1 ), len (cols2 ))
304+
305+ overlap = len (common_cols ) / min (len (cols1 ), len (cols2 )) if min (len (cols1 ), len (cols2 )) > 0 else 0
302306
303307 if (len (common_cols ) >= self .min_common_columns and
304- similarity >= self . similarity_threshold ):
308+ overlap >= overlap_threshold ):
305309
306310 compatible = True
307311 for col in common_cols :
@@ -325,16 +329,19 @@ def s4h_vertical_merge(self, ddfs: List[dd.DataFrame]) -> List[dd.DataFrame]:
325329 merged_dfs .append (ddfs [group_indices [0 ]])
326330 else :
327331 group_dfs = [ddfs [i ] for i in group_indices ]
328- common_cols = set (group_dfs [0 ].columns )
329- for df in group_dfs [1 :]:
330- common_cols .intersection_update (df .columns )
331-
332- aligned_dfs = []
333- for df in group_dfs :
334- common_cols_ordered = [col for col in df .columns if col in common_cols ]
335- other_cols = [col for col in df .columns if col not in common_cols ]
336- aligned_dfs .append (df [common_cols_ordered + other_cols ])
337-
332+ if method == "intersection" :
333+ common_cols = set (group_dfs [0 ].columns )
334+ for df in group_dfs [1 :]:
335+ common_cols .intersection_update (df .columns )
336+ aligned_dfs = [df [list (common_cols )] for df in group_dfs ]
337+ elif method == "union" :
338+ all_cols = set ()
339+ for df in group_dfs :
340+ all_cols .update (df .columns )
341+ all_cols = list (all_cols )
342+ aligned_dfs = [df .reindex (columns = all_cols ) for df in group_dfs ]
343+ else :
344+ raise ValueError ("method must be 'union' or 'intersection'" )
338345 merged_df = dd .concat (aligned_dfs , axis = 0 , ignore_index = True )
339346 merged_dfs .append (merged_df )
340347 if len (merged_dfs ) > 1 :
0 commit comments