massbalancemachine/data_processing/Dataset.py (48 changes: 31 additions & 17 deletions)
@@ -59,6 +59,7 @@ class Dataset:
region_id (str): The region ID, used for saving the files accordingly and, if needed, downloading them
data_dir (str): Path to the directory containing the raw data and where intermediate results are saved
RGIIds (pd.Series): Series of RGI IDs from the data
output_format (str): Output file format, either 'csv' or 'parquet'
months_tail_pad (list of str): Months to pad the start of the hydrological year
months_head_pad (list of str): Months to pad the end of the hydrological year
"""
@@ -70,6 +71,7 @@ def __init__(
region_name: str,
region_id: int,
data_path: str,
output_format: str = "csv",
months_tail_pad=None, #: List[str] = ['aug_', 'sep_'], # before 'oct'
months_head_pad=None, #: List[str] = ['oct_'], # after 'sep'
):
@@ -81,7 +83,8 @@ def __init__(
self.RGIIds = self.data["RGIId"]
if not os.path.isdir(self.data_dir):
os.makedirs(self.data_dir, exist_ok=True)

assert output_format in ["csv", "parquet"], "format must be csv or parquet"
self.output_format = output_format
# Padding to allow for flexible month ranges (customize freely)
assert (months_head_pad is None) == (
months_tail_pad is None
@@ -101,7 +104,9 @@ def get_topo_features(self, vois: list[str], custom_working_dir: str = "") -> None:
vois (list[str]): A list of the topographical variables of interest
custom_working_dir (str, optional): The path to the custom working directory for OGGM data. Defaults to ''
"""
output_fname = self._get_output_filename("topographical_features")
output_fname = self._get_output_filename(
"topographical_features", self.output_format
)
self.data = get_topographical_features(
self.data, output_fname, vois, self.RGIIds, custom_working_dir, self.cfg
)
@@ -124,7 +129,7 @@ def get_climate_features(
change_units (bool, optional): A boolean indicating whether to change the units of the climate data. Defaults to False.
smoothing_vois (dict, optional): A dictionary containing the variables of interest for smoothing climate artifacts. Defaults to None.
"""
output_fname = self._get_output_filename("climate_features")
output_fname = self._get_output_filename("climate_features", self.output_format)

smoothing_vois = smoothing_vois or {} # Safely default to empty dict
vois_climate = smoothing_vois.get("vois_climate")
@@ -207,9 +212,14 @@ def convert_to_monthly(
"""
if meta_data_columns is None:
meta_data_columns = self.cfg.metaData
output_fname = self._get_output_filename("monthly_dataset")
output_fname = self._get_output_filename("monthly_dataset", self.output_format)
self.data = transform_to_monthly(
self.data, meta_data_columns, vois_climate, vois_topographical, output_fname
self.data,
meta_data_columns,
vois_climate,
vois_topographical,
output_fname,
self.output_format,
)

def get_glacier_mask(
@@ -254,17 +264,19 @@ def create_glacier_grid_RGI(self, custom_working_dir: str = "") -> pd.DataFrame:
df_grid = create_glacier_grid_RGI(ds, years, glacier_indices, gdir, rgi_gl)
return df_grid

def _get_output_filename(self, feature_type: str) -> str:
def _get_output_filename(self, feature_type: str, output_format: str) -> str:
"""
Generates the output filename for a given feature type.

Args:
feature_type (str): The type of feature (e.g., "topographical_features", "climate_features", "monthly")

output_format (str): Output file format, either 'csv' or 'parquet'
Returns:
str: The full path to the output file
"""
return os.path.join(self.data_dir, f"{self.region}_{feature_type}.csv")
return os.path.join(
self.data_dir, f"{self.region}_{feature_type}.{output_format}"
)

def _copy_padded_month_columns(
self, df: pd.DataFrame, prefixes=("pcsr",), overwrite: bool = False
@@ -380,7 +392,6 @@ def __init__(
self.metadata = metadata
self.metadataColumns = metadataColumns or self.cfg.metaData
self.targets = targets

assert len(self.features) > 0, "The features variable is empty."

_, self.month_pos = _rebuild_month_index(months_head_pad, months_tail_pad)
@@ -391,10 +402,8 @@ def __init__(
for i in range(len(self.metadata))
]
)
self.uniqueID = np.unique(self.ID)
self.maxConcatNb = max(
[len(np.argwhere(self.ID == id)[:, 0]) for id in self.uniqueID]
)
self.uniqueID, counts = np.unique(self.ID, return_counts=True)
self.maxConcatNb = counts.max()
self.nbFeatures = self.features.shape[1]
self.nbMetadata = self.metadata.shape[1]
self.norm = Normalizer({k: cfg.bnds[k] for k in cfg.featureColumns})
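A tiny self-contained check that the np.unique(..., return_counts=True) refactor above computes the same maxConcatNb as the previous argwhere scan; the IDs below are toy values chosen only for illustration.

import numpy as np

ids = np.array(["gl1_2001", "gl1_2001", "gl2_2003", "gl1_2001", "gl2_2003"])  # toy stake-year IDs

unique_ids, counts = np.unique(ids, return_counts=True)
max_concat_old = max(len(np.argwhere(ids == uid)[:, 0]) for uid in unique_ids)  # previous approach
max_concat_new = counts.max()                                                    # new single-pass approach
assert max_concat_old == max_concat_new == 3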
@@ -416,14 +425,19 @@ def mapSplitsToDataset(
corresponding indices the cross validation should use according to
the input splits variable.
"""
# Precompute the mapping of unique IDs to indices
uniqueID_to_indices = {
uid: np.where(self.uniqueID == uid)[0] for uid in self.uniqueID
}
ret = []
for split in splits:
t = []
for e in split:
uniqueSelectedId = np.unique(self.ID[e])
ind = np.argwhere(self.uniqueID[None, :] == uniqueSelectedId[:, None])[
:, 1
]
uniqueSelectedId = np.unique(self.ID[e]) # Get the unique selected IDs
# Use the precomputed mapping for fast lookups
ind = np.concatenate(
[uniqueID_to_indices[uid] for uid in uniqueSelectedId]
)
assert all(uniqueSelectedId == self.uniqueID[ind])
t.append(ind)
ret.append(tuple(t))
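To illustrate how the new output_format option flows through the Dataset pipeline, here is a minimal usage sketch. The cfg object, the stake DataFrame, the region name/ID, the data path, and the module path are placeholders or assumptions; only the keyword parameters and the _get_output_filename behaviour shown in this diff are taken from the code.

import pandas as pd
import massbalancemachine as mbm  # assumed top-level import name

stake_df = pd.DataFrame({"RGIId": ["RGI60-06.00001"]})  # placeholder stake data

dataset = mbm.data_processing.Dataset(   # module path and leading cfg/data parameters assumed
    cfg=cfg,                             # placeholder config object
    data=stake_df,
    region_name="iceland",
    region_id=6,
    data_path="./data/iceland/",
    output_format="parquet",             # new in this PR: "csv" (default) or "parquet"
)

# Intermediate files now carry the chosen extension, e.g. (assuming self.region == region_name)
# ./data/iceland/iceland_topographical_features.parquet
dataset.get_topo_features(vois=["aspect", "slope"])  # variable names are placeholders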
massbalancemachine/data_processing/transform_to_monthly.py (11 changes: 8 additions & 3 deletions)
@@ -18,6 +18,7 @@ def transform_to_monthly(
vois_climate: "list[str]",
vois_topographical: "list[str]",
output_fname: str,
output_format: str
) -> pd.DataFrame:
"""
Converts the DataFrame to a monthly format based on climate-related columns.
@@ -51,9 +52,13 @@

# Create the final dataframe with the new exploded climate data
result_df = _create_result_dataframe(df_exploded, column_names, vois_climate)

result_df.to_csv(output_fname, index=False)


if output_format == 'csv':
result_df.to_csv(output_fname, index=False)
elif output_format == 'parquet':
result_df.to_parquet(output_fname, index=False)
else:
print('output format must be csv or parquet')
return result_df


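As a quick check of what the new output_format branch produces, a standalone sketch using only pandas; the column names are placeholders for what _create_result_dataframe() would return, and writing parquet assumes pyarrow or fastparquet is installed.

import pandas as pd

# Placeholder for the monthly rows produced by _create_result_dataframe()
result_df = pd.DataFrame({"RGIId": ["RGI60-06.00001"], "MONTHS": ["oct"], "t2m": [-2.3]})

for output_format in ("csv", "parquet"):
    output_fname = f"monthly_dataset.{output_format}"
    # Same dispatch as added above in transform_to_monthly()
    if output_format == "csv":
        result_df.to_csv(output_fname, index=False)
    elif output_format == "parquet":
        result_df.to_parquet(output_fname, index=False)  # needs pyarrow or fastparquet

    reader = pd.read_csv if output_format == "csv" else pd.read_parquet
    assert reader(output_fname).shape == result_df.shape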
massbalancemachine/dataloader/DataLoader.py (57 changes: 44 additions & 13 deletions)
@@ -65,14 +65,18 @@ def __init__(
self.meta_data_columns = meta_data_columns or cfg.metaData

def set_train_test_split(
self, *, test_size: float = None, type_fold: str = "group-meas-id"
self,
*,
test_size: float = None,
type_fold: str = "group-meas-id",
random_state: bool = False,
) -> Tuple[Iterator[Any], Iterator[Any]]:
"""
Split the dataset into training and testing sets.

Args:
test_size (float): Proportion of the dataset to include in the test split.
type_fold (str): Type of splitting between train and test sets. Options are 'group-rgi', or 'group-meas-id'.
type_fold (str): Type of splitting between train and test sets. Options are 'group-rgi', 'group-c_region', or 'group-meas-id'.

Returns:
Tuple[Iterator[Any], Iterator[Any]]: Iterators for training and testing indices.
@@ -89,15 +93,25 @@ def set_train_test_split(
# I.e, one year of a stake is not split amongst test and train set

# From the data get the features, targets, and glacier IDS
X, y, glacier_ids, stake_meas_id = self._prepare_data_for_cv(
X, y, glacier_ids, stake_meas_id, regions = self._prepare_data_for_cv(
self.data, self.meta_data_columns
)
gss = GroupShuffleSplit(
n_splits=1, test_size=test_size, random_state=self.random_seed
)
groups = {"group-meas-id": stake_meas_id, "group-rgi": glacier_ids}.get(
type_fold
)
if random_state == False:
gss = GroupShuffleSplit(
n_splits=1,
test_size=test_size,
random_state=self.random_seed,  # commenting this out improves randomness
)
elif random_state == True:
gss = GroupShuffleSplit(
n_splits=1,
test_size=test_size,
)
groups = {
"group-meas-id": stake_meas_id,
"group-rgi": glacier_ids,
"group-c_region": regions,
}.get(type_fold)
train_indices, test_indices = next(gss.split(X, y, groups))

# Check that the intersection train and test ids is empty
@@ -108,9 +122,20 @@ def set_train_test_split(
# Make it iterators and set as an attribute of the class
self.train_indices = train_indices
self.test_indices = test_indices

return iter(self.train_indices), iter(self.test_indices)

def assign_train_test_indices(self, train_indices, test_indices, test_size):
"""
Dividing the train and test ensembles based on subregion requires making the sampling N times and then choosing the
train-test division closest to the 70-30 repartition. At each iteration the DataLoader object is redefined, as well as
self.train_indices and self.test_indices, meaning that the information held by the DataLoader object is that of the last
iteration and not that of the train-test division chosen after comparing to the 70-30 repartition.
This function corrects this by reassigning the indices of the chosen sampling.

Comment on lines +129 to +133 (Member):

Suggested change:
Assign `train_indices`, `test_indices`, as well as `test_size`, as attributes of the object.
Note:
This can be useful when you divide the train and test ensembles based on subregion, since this requires making the sampling N times and then choosing the
train-test division closest to the 70-30 repartition. At each iteration the DataLoader object is redefined, as well as
self.train_indices and self.test_indices, meaning that the information held by the DataLoader object is that of the last
iteration and not that of the train-test division chosen after comparing to the 70-30 repartition.
This function corrects this by reassigning the indices of the chosen sampling.

"""
self.train_indices = train_indices
self.test_indices = test_indices
self.test_size = test_size

def set_custom_train_test_indices(
self, train_indices: np.array, test_indices: np.array
):
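The assign_train_test_indices docstring above motivates drawing the split several times and keeping the draw closest to a 70-30 repartition. Below is a minimal sketch of that loop, assuming the monthly data carries a C_REGION column and that a DataLoader instance named dataloader already exists (its construction is only hinted at, since the constructor is not fully shown in this diff); the number of draws and the way the achieved test fraction is measured are illustrative choices, while set_train_test_split (with the new random_state and type_fold options) and assign_train_test_indices are used as defined here.

import numpy as np

# dataloader = DataLoader(cfg, data=monthly_df)   # hypothetical construction; exact signature not shown in this diff
TARGET_TEST_SIZE, N_DRAWS = 0.3, 20               # illustrative choices

best = None
for _ in range(N_DRAWS):
    # random_state=True leaves GroupShuffleSplit unseeded, so each draw differs
    train_it, test_it = dataloader.set_train_test_split(
        test_size=TARGET_TEST_SIZE, type_fold="group-c_region", random_state=True
    )
    train_idx = np.fromiter(train_it, dtype=int)
    test_idx = np.fromiter(test_it, dtype=int)
    achieved = len(test_idx) / (len(train_idx) + len(test_idx))
    if best is None or abs(achieved - TARGET_TEST_SIZE) < abs(best[2] - TARGET_TEST_SIZE):
        best = (train_idx, test_idx, achieved)

# The DataLoader currently holds the last draw; re-assign the chosen one
dataloader.assign_train_test_indices(best[0], best[1], best[2])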
@@ -157,13 +182,13 @@ def get_cv_split(
train_data = self._get_train_data()

# From the training data get the features, targets, and glacier IDS
X, y, glacier_ids, stake_meas_id = self._prepare_data_for_cv(
X, y, glacier_ids, stake_meas_id, regions = self._prepare_data_for_cv(
train_data, self.meta_data_columns
)

# Create the cross validation splits
splits = self._create_group_kfold_splits(
X, y, glacier_ids, stake_meas_id, type_fold
X, y, glacier_ids, stake_meas_id, regions, type_fold
)
self.cv_split = splits

@@ -239,14 +264,19 @@ def _prepare_data_for_cv(
y = train_data["POINT_BALANCE"]
glacier_ids = train_data["RGIId"].values
stake_meas_id = train_data["ID"].values # unique value per stake measurement
return X, y, glacier_ids, stake_meas_id
try:
regions = train_data["C_REGION"].values
except:
regions = type(np.array([]))

Comment on lines +267 to +270 (Member):

More Pythonic than a try/except, which can hide bugs.

Suggested change:
regions = train_data["C_REGION"].values if "C_REGION" in train_data.columns else np.array([])

return X, y, glacier_ids, stake_meas_id, regions

def _create_group_kfold_splits(
self,
X: pd.DataFrame,
y: pd.Series,
glacier_ids: np.ndarray,
stake_meas_id: np.ndarray,
regions: np.ndarray,
type_fold: str,
) -> List[Tuple[np.ndarray, np.ndarray]]:
"""
@@ -268,6 +298,7 @@ def _create_group_kfold_splits(
fold_types = {
"group-rgi": (GroupKFold, glacier_ids),
"group-meas-id": (GroupKFold, stake_meas_id),
"group-c_region": (GroupKFold, regions),
}

FoldClass, groups = fold_types.get(type_fold, (KFold, None))
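Finally, a sketch of where the new "group-c_region" fold type plugs into cross-validation. It assumes a DataLoader instance named dataloader (as in the sketch after assign_train_test_indices above) whose monthly data carries a C_REGION column, and it assumes get_cv_split accepts an n_splits argument, which is not shown in this diff.

# Hold out whole sub-regions in the test set
dataloader.set_train_test_split(test_size=0.3, type_fold="group-c_region")

# Build CV folds that never place the same sub-region in both train and validation
dataloader.get_cv_split(n_splits=5, type_fold="group-c_region")  # GroupKFold on the C_REGION values; n_splits name assumed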