@@ -279,12 +279,8 @@ def s4h_vertical_merge(self, ddfs: List[dd.DataFrame], overlap_threshold: float
279279 if i in used_indices :
280280 continue
281281
282- cols1 = set (
283- df1
284- .columns
285- .str .upper ()
286- .str .strip ()
287- )
282+ df1 .columns = df1 .columns .str .upper ().str .strip ()
283+ cols1 = set (df1 .columns )
288284 dtypes1 = {col : str (df1 [col ].dtype ) for col in df1 .columns }
289285 current_group = [i ]
290286 used_indices .add (i )
@@ -294,12 +290,8 @@ def s4h_vertical_merge(self, ddfs: List[dd.DataFrame], overlap_threshold: float
294290 if j_actual in used_indices :
295291 continue
296292
297- cols2 = set (
298- df2
299- .columns
300- .str .upper ()
301- .str .strip ()
302- )
293+ df2 .columns = df2 .columns .str .upper ().str .strip ()
294+ cols2 = set (df2 .columns )
303295 common_cols = cols1 & cols2
304296
305297 overlap = len (common_cols ) / min (len (cols1 ), len (cols2 )) if min (len (cols1 ), len (cols2 )) > 0 else 0
@@ -377,6 +369,8 @@ def drop_nan_columns(self, ddf_or_ddfs: Union[dd.DataFrame, List[dd.DataFrame]])
377369 raise ValueError ("Threshold must be between 0 and 1" )
378370
379371 def process_ddf (ddf ):
372+ ddf .columns = ddf .columns .str .upper ().str .strip ()
373+ #ddf = ddf.loc[:, ~ddf.columns.duplicated()]
380374 if self .sample_frac is not None :
381375 if not 0 < self .sample_frac <= 1 :
382376 raise ValueError ("sample_frac must be between 0 and 1" )
@@ -432,6 +426,9 @@ def s4h_get_available_columns(df_or_dfs: Union[dd.DataFrame, pd.DataFrame, List[
432426 for df in df_or_dfs :
433427 if not isinstance (df , (dd .DataFrame , pd .DataFrame )):
434428 raise TypeError ("All elements in the list must be DataFrames (Dask or pandas)" )
429+ # Clean columns: uppercase, strip, deduplicate
430+ df .columns = df .columns .str .upper ().str .strip ()
431+ df = df .loc [:, ~ df .columns .duplicated ()]
435432 unique_columns .update (df .columns )
436433
437434 return sorted (unique_columns )
@@ -493,6 +490,9 @@ def get_country_mapping(mapping_obj, country):
493490
494491 def process_dataframe (df : dd .DataFrame , country : str ) -> dd .DataFrame :
495492 """Process a single dataframe"""
493+ # Clean columns: uppercase, strip, deduplicate
494+ df .columns = df .columns .str .upper ().str .strip ()
495+ df = df .loc [:, ~ df .columns .duplicated ()]
496496 # Get mappings for this country
497497 col_map = get_country_mapping (column_mapping , country )
498498 val_maps = get_country_mapping (value_mappings , country )
@@ -600,7 +600,9 @@ def s4h_data_selector(self, ddfs: List[dd.DataFrame]) -> List[dd.DataFrame]:
600600
601601 filtered_ddfs = []
602602 for ddf in ddfs :
603- ddf .columns = ddf .columns .str .upper ()
603+ # Clean columns: uppercase, strip, deduplicate
604+ ddf .columns = ddf .columns .str .upper ().str .strip ()
605+ ddf = ddf .loc [:, ~ ddf .columns .duplicated ()]
604606
605607 if self .key_col and self .key_val :
606608 if key_column_upper not in ddf .columns :
@@ -664,6 +666,9 @@ def s4h_join_data(self, ddfs: List[dd.DataFrame]) -> pd.DataFrame:
664666 Merged DataFrame with duplicate columns removed.
665667 """
666668 pandas_dfs = [df .compute () for df in ddfs ]
669+ # Clean columns: uppercase, strip, deduplicate
670+ pandas_dfs = [df .rename (columns = lambda x : str (x ).upper ().strip ()) for df in pandas_dfs ]
671+ pandas_dfs = [df .loc [:, ~ df .columns .duplicated ()] for df in pandas_dfs ]
667672
668673 def identify_primary_df (dfs ):
669674 candidates = []
0 commit comments