diff --git a/massbalancemachine/data_processing/Dataset.py b/massbalancemachine/data_processing/Dataset.py
index ed838ccd..ffaef416 100644
--- a/massbalancemachine/data_processing/Dataset.py
+++ b/massbalancemachine/data_processing/Dataset.py
@@ -59,6 +59,7 @@ class Dataset:
         region_id (str): The region ID, for saving the files accordingly and eventually downloading them if needed
         data_dir (str): Path to the directory containing the raw data, and save intermediate results
         RGIIds (pd.Series): Series of RGI IDs from the data
+        output_format (str): Output file format for intermediate results, either "csv" or "parquet"
         months_tail_pad (list of str): Months to pad the start of the hydrological year
         months_head_pad (list of str): Months to pad the end of the hydrological year
     """
@@ -70,6 +71,7 @@ def __init__(
         region_name: str,
         region_id: int,
         data_path: str,
+        output_format: str = "csv",
         months_tail_pad=None,  #: List[str] = ['aug_', 'sep_'],  # before 'oct'
         months_head_pad=None,  #: List[str] = ['oct_'],  # after 'sep'
     ):
@@ -81,7 +83,8 @@ def __init__(
         self.RGIIds = self.data["RGIId"]
         if not os.path.isdir(self.data_dir):
             os.makedirs(self.data_dir, exist_ok=True)
-
+        assert output_format in ["csv", "parquet"], "output_format must be 'csv' or 'parquet'"
+        self.output_format = output_format
         # Padding to allow for flexible month ranges (customize freely)
         assert (months_head_pad is None) == (
             months_tail_pad is None
@@ -101,7 +104,9 @@ def get_topo_features(self, vois: list[str], custom_working_dir: str = "") -> No
             vois (list[str]): A string containing the topographical variables of interest
             custom_working_dir (str, optional): The path to the custom working directory for OGGM data. Default to ''
         """
-        output_fname = self._get_output_filename("topographical_features")
+        output_fname = self._get_output_filename(
+            "topographical_features", self.output_format
+        )
         self.data = get_topographical_features(
             self.data, output_fname, vois, self.RGIIds, custom_working_dir, self.cfg
         )
@@ -124,7 +129,7 @@ def get_climate_features(
             change_units (bool, optional): A boolean indicating whether to change the units of the climate data. Default to False.
             smoothing_vois (dict, optional): A dictionary containing the variables of interest for smoothing climate artifacts. Default to None.
         """
-        output_fname = self._get_output_filename("climate_features")
+        output_fname = self._get_output_filename("climate_features", self.output_format)
         smoothing_vois = smoothing_vois or {}  # Safely default to empty dict

         vois_climate = smoothing_vois.get("vois_climate")
@@ -207,9 +212,14 @@ def convert_to_monthly(
         """
         if meta_data_columns is None:
             meta_data_columns = self.cfg.metaData
-        output_fname = self._get_output_filename("monthly_dataset")
+        output_fname = self._get_output_filename("monthly_dataset", self.output_format)
         self.data = transform_to_monthly(
-            self.data, meta_data_columns, vois_climate, vois_topographical, output_fname
+            self.data,
+            meta_data_columns,
+            vois_climate,
+            vois_topographical,
+            output_fname,
+            self.output_format,
         )

     def get_glacier_mask(
@@ -254,17 +264,19 @@ def create_glacier_grid_RGI(self, custom_working_dir: str = "") -> pd.DataFrame:
         df_grid = create_glacier_grid_RGI(ds, years, glacier_indices, gdir, rgi_gl)
         return df_grid

-    def _get_output_filename(self, feature_type: str) -> str:
+    def _get_output_filename(self, feature_type: str, output_format: str) -> str:
         """
         Generates the output filename for a given feature type.

         Args:
             feature_type (str): The type of feature (e.g., "topographical_features", "climate_features", "monthly")
-
+            output_format (str): The output file format, "csv" or "parquet"
         Returns:
             str: The full path to the output file
         """
-        return os.path.join(self.data_dir, f"{self.region}_{feature_type}.csv")
+        return os.path.join(
+            self.data_dir, f"{self.region}_{feature_type}.{output_format}"
+        )

     def _copy_padded_month_columns(
         self, df: pd.DataFrame, prefixes=("pcsr",), overwrite: bool = False
@@ -380,7 +392,6 @@ def __init__(
         self.metadata = metadata
         self.metadataColumns = metadataColumns or self.cfg.metaData
         self.targets = targets
-        assert len(self.features) > 0, "The features variable is empty."

         _, self.month_pos = _rebuild_month_index(months_head_pad, months_tail_pad)

@@ -391,10 +402,8 @@ def __init__(
                 for i in range(len(self.metadata))
             ]
         )
-        self.uniqueID = np.unique(self.ID)
-        self.maxConcatNb = max(
-            [len(np.argwhere(self.ID == id)[:, 0]) for id in self.uniqueID]
-        )
+        self.uniqueID, counts = np.unique(self.ID, return_counts=True)
+        self.maxConcatNb = counts.max()
         self.nbFeatures = self.features.shape[1]
         self.nbMetadata = self.metadata.shape[1]
         self.norm = Normalizer({k: cfg.bnds[k] for k in cfg.featureColumns})
@@ -416,14 +425,19 @@ def mapSplitsToDataset(
            corresponding indices the cross validation should use according to the input
            splits variable.
         """
+        # Precompute each unique ID's position once, for O(1) lookups below
+        uniqueID_to_index = {uid: i for i, uid in enumerate(self.uniqueID)}
         ret = []
         for split in splits:
             t = []
             for e in split:
-                uniqueSelectedId = np.unique(self.ID[e])
-                ind = np.argwhere(self.uniqueID[None, :] == uniqueSelectedId[:, None])[
-                    :, 1
-                ]
+                uniqueSelectedId = np.unique(self.ID[e])  # Unique IDs selected by this split
+                # Look up positions via the precomputed mapping instead of a
+                # quadratic array comparison
+                ind = np.array([uniqueID_to_index[uid] for uid in uniqueSelectedId])
                 assert all(uniqueSelectedId == self.uniqueID[ind])
                 t.append(ind)
             ret.append(tuple(t))
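For context, a minimal usage sketch of the new output_format option. This assumes Dataset is importable from massbalancemachine.data_processing.Dataset and that cfg and df are the config object and stake-measurement DataFrame the constructor already expects; those two, the region values, and the resulting file name are placeholders, and only output_format is new in this diff:

    from massbalancemachine.data_processing.Dataset import Dataset

    dataset = Dataset(
        cfg=cfg,                   # existing project config object (placeholder)
        data=df,                   # stake DataFrame with an "RGIId" column (placeholder)
        region_name="CH",          # hypothetical region name
        region_id=11,              # hypothetical RGI region number
        data_path="./data",        # directory for raw data and intermediate results
        output_format="parquet",   # new: intermediate files saved as .parquet
    )
    # Intermediate files now carry the chosen extension,
    # e.g. "<region>_topographical_features.parquet" instead of ".csv".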
diff --git a/massbalancemachine/data_processing/transform_to_monthly.py b/massbalancemachine/data_processing/transform_to_monthly.py
index c932a7ea..6554bcf7 100644
--- a/massbalancemachine/data_processing/transform_to_monthly.py
+++ b/massbalancemachine/data_processing/transform_to_monthly.py
@@ -18,6 +18,7 @@ def transform_to_monthly(
     vois_climate: "list[str]",
     vois_topographical: "list[str]",
     output_fname: str,
+    output_format: str,
 ) -> pd.DataFrame:
     """
     Converts the DataFrame to a monthly format based on climate-related columns.
@@ -51,9 +52,13 @@ def transform_to_monthly(

     # Create the final dataframe with the new exploded climate data
     result_df = _create_result_dataframe(df_exploded, column_names, vois_climate)
-
-    result_df.to_csv(output_fname, index=False)
-
+
+    if output_format == "csv":
+        result_df.to_csv(output_fname, index=False)
+    elif output_format == "parquet":
+        result_df.to_parquet(output_fname, index=False)
+    else:
+        raise ValueError("output_format must be 'csv' or 'parquet'")
     return result_df
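The dispatch above maps one-to-one onto the pandas writers. A self-contained sketch of the same pattern (file names are throwaways; parquet output requires a parquet engine such as pyarrow to be installed):

    import pandas as pd

    result_df = pd.DataFrame({"ID": [1, 1, 2], "month": ["oct", "nov", "oct"]})

    for output_fname, output_format in [("toy.csv", "csv"), ("toy.parquet", "parquet")]:
        if output_format == "csv":
            result_df.to_csv(output_fname, index=False)
        elif output_format == "parquet":
            result_df.to_parquet(output_fname, index=False)
        else:
            raise ValueError("output_format must be 'csv' or 'parquet'")

    # Unlike csv, parquet preserves dtypes on read-back:
    print(pd.read_parquet("toy.parquet").dtypes)

One practical consequence: code that reads the monthly dataset back must switch between pd.read_csv and pd.read_parquet to match the chosen format.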
diff --git a/massbalancemachine/dataloader/DataLoader.py b/massbalancemachine/dataloader/DataLoader.py
index 37a9bb0d..cadd312c 100644
--- a/massbalancemachine/dataloader/DataLoader.py
+++ b/massbalancemachine/dataloader/DataLoader.py
@@ -65,14 +65,18 @@
         self.meta_data_columns = meta_data_columns or cfg.metaData

     def set_train_test_split(
-        self, *, test_size: float = None, type_fold: str = "group-meas-id"
-    ) -> Tuple[Iterator[Any], Iterator[Any]]:
+        self,
+        *,
+        test_size: float = None,
+        type_fold: str = "group-meas-id",
+        random_state: bool = False,
+    ) -> None:
         """
         Split the dataset into training and testing sets.

         Args:
             test_size (float): Proportion of the dataset to include in the test split.
-            type_fold (str): Type of splitting between train and test sets. Options are 'group-rgi', or 'group-meas-id'.
+            type_fold (str): Type of splitting between train and test sets. Options are 'group-rgi', 'group-c_region', or 'group-meas-id'.
+            random_state (bool): If True, leave the split unseeded so each call draws a different split; if False (default), seed it with self.random_seed for reproducibility.

         Returns:
-            Tuple[Iterator[Any], Iterator[Any]]: Iterators for training and testing indices.
+            None. The train and test indices are stored on the instance as self.train_indices and self.test_indices.
@@ -89,15 +93,25 @@
         # I.e, one year of a stake is not split amongst test and train set

         # From the data get the features, targets, and glacier IDS
-        X, y, glacier_ids, stake_meas_id = self._prepare_data_for_cv(
+        X, y, glacier_ids, stake_meas_id, regions = self._prepare_data_for_cv(
             self.data, self.meta_data_columns
         )
-        gss = GroupShuffleSplit(
-            n_splits=1, test_size=test_size, random_state=self.random_seed
-        )
-        groups = {"group-meas-id": stake_meas_id, "group-rgi": glacier_ids}.get(
-            type_fold
-        )
+        if random_state:
+            # Unseeded: each call draws a different random split
+            gss = GroupShuffleSplit(n_splits=1, test_size=test_size)
+        else:
+            # Seeded with the configured seed for reproducible splits
+            gss = GroupShuffleSplit(
+                n_splits=1, test_size=test_size, random_state=self.random_seed
+            )
+        groups = {
+            "group-meas-id": stake_meas_id,
+            "group-rgi": glacier_ids,
+            "group-c_region": regions,
+        }.get(type_fold)

         train_indices, test_indices = next(gss.split(X, y, groups))

         # Check that the intersection train and test ids is empty
@@ -108,9 +122,20 @@
         # Make it iterators and set as an attribute of the class
         self.train_indices = train_indices
         self.test_indices = test_indices
-        return iter(self.train_indices), iter(self.test_indices)

+    def assign_train_test_indices(self, train_indices, test_indices, test_size):
+        """
+        Splitting train and test sets by subregion requires drawing the split N
+        times and keeping the draw whose train-test sizes are closest to the
+        target 70-30 partition. Each draw overwrites self.train_indices and
+        self.test_indices, so after the loop the DataLoader holds the indices
+        of the last draw rather than those of the chosen one. This method
+        corrects that by reassigning the indices (and test size) of the chosen draw.
+        """
+        self.train_indices = train_indices
+        self.test_indices = test_indices
+        self.test_size = test_size
+
     def set_custom_train_test_indices(
         self, train_indices: np.array, test_indices: np.array
     ):
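The docstring of assign_train_test_indices above describes an N-draw selection procedure. A minimal sketch of that loop, assuming an already-constructed DataLoader instance named dataloader; the draw count of 20 and storing the achieved ratio as test_size are choices of this sketch, not part of the diff:

    def choose_region_split(dataloader, n_draws=20, target=0.3):
        """Draw the grouped split n_draws times; keep the draw closest to target."""
        best = None
        for _ in range(n_draws):
            dataloader.set_train_test_split(
                test_size=target, type_fold="group-c_region", random_state=True
            )
            n_test = len(dataloader.test_indices)
            achieved = n_test / (n_test + len(dataloader.train_indices))
            # Whole subregions move together, so the achieved ratio rarely
            # matches the target exactly; keep the closest draw
            if best is None or abs(achieved - target) < abs(best[2] - target):
                best = (dataloader.train_indices, dataloader.test_indices, achieved)
        # The loop leaves the last draw on the object; restore the chosen one
        dataloader.assign_train_test_indices(best[0], best[1], best[2])
        return best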
@@ -157,13 +182,13 @@
         train_data = self._get_train_data()

         # From the training data get the features, targets, and glacier IDS
-        X, y, glacier_ids, stake_meas_id = self._prepare_data_for_cv(
+        X, y, glacier_ids, stake_meas_id, regions = self._prepare_data_for_cv(
             train_data, self.meta_data_columns
         )

         # Create the cross validation splits
         splits = self._create_group_kfold_splits(
-            X, y, glacier_ids, stake_meas_id, type_fold
+            X, y, glacier_ids, stake_meas_id, regions, type_fold
         )

         self.cv_split = splits
@@ -239,7 +264,11 @@
         y = train_data["POINT_BALANCE"]
         glacier_ids = train_data["RGIId"].values
         stake_meas_id = train_data["ID"].values  # unique value per stake measurement
-        return X, y, glacier_ids, stake_meas_id
+        if "C_REGION" in train_data.columns:
+            regions = train_data["C_REGION"].values
+        else:
+            regions = None  # subregion column absent; 'group-c_region' unavailable
+        return X, y, glacier_ids, stake_meas_id, regions

     def _create_group_kfold_splits(
         self,
@@ -247,6 +276,7 @@
         y: pd.Series,
         glacier_ids: np.ndarray,
         stake_meas_id: np.ndarray,
+        regions: np.ndarray,
         type_fold: str,
     ) -> List[Tuple[np.ndarray, np.ndarray]]:
         """
@@ -268,6 +298,7 @@
         fold_types = {
             "group-rgi": (GroupKFold, glacier_ids),
             "group-meas-id": (GroupKFold, stake_meas_id),
+            "group-c_region": (GroupKFold, regions),
         }

         FoldClass, groups = fold_types.get(type_fold, (KFold, None))
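For completeness, what the new 'group-c_region' fold type guarantees: the C_REGION values are passed as groups to scikit-learn's GroupKFold, so no subregion ever appears on both sides of a fold. A runnable toy check (region names and shapes are made up):

    import numpy as np
    from sklearn.model_selection import GroupKFold

    regions = np.array(["alps", "alps", "andes", "andes", "arctic", "arctic"])
    X = np.arange(12).reshape(6, 2)
    y = np.zeros(6)

    for train_idx, val_idx in GroupKFold(n_splits=3).split(X, y, groups=regions):
        # Each C_REGION value lands entirely on one side of the fold
        assert set(regions[train_idx]).isdisjoint(regions[val_idx])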