diff --git a/examples/ensemble_attack/configs/experiment_config.yaml b/examples/ensemble_attack/configs/experiment_config.yaml
index 3803d69d..73e25649 100644
--- a/examples/ensemble_attack/configs/experiment_config.yaml
+++ b/examples/ensemble_attack/configs/experiment_config.yaml
@@ -1,34 +1,35 @@
 # Ensemble experiment configuration
-# This config can be used to run both the Ensemble attack training (``run_attack.py``) and testing phases (``tets_attack_model.py``).
-base_experiment_dir: examples/ensemble_attack/tabddpm_20k_experiment_data # Processed data, and experiment artifacts will be stored here
-base_data_config_dir: examples/ensemble_attack/data_configs # Training and data type configs are saved under this directory
+# This config can be used to run both the Ensemble attack training (``run_attack.py``) and testing (``test_attack_model.py``) phases.
+base_experiment_dir: /projects/midst-experiments/ensemble_attack/tabddpm_10k_experiment_data/10k/ # Processed data and experiment artifacts will be stored under this directory.
+base_data_config_dir: examples/ensemble_attack/data_configs # Training and data type configs are saved under this directory.

-# Pipeline control
+# Training Pipeline Control
 pipeline:
   run_data_processing: true # Set this to false if you have already saved the processed data
   run_shadow_model_training: true # Set this to false if shadow models are already trained and saved
   run_metaclassifier_training: true

 target_model: # This is only used for testing the attack on a real target model.
-  # This is for models trained on 20k data and generating 20k synthetic data
-  target_model_directory: /projects/midst-experiments/all_tabddpms/tabddpm_trained_with_20k/train/
+  target_model_directory: /projects/midst-experiments/all_tabddpms/tabddpm_trained_with_10k/test/
   target_model_id: 21 # Will be overridden per SLURM array task
   target_model_name: tabddpm_${target_model.target_model_id}
-  target_synthetic_data_path: ${target_model.target_model_directory}/${target_model.target_model_name}/synthetic_data/20k/20k.csv
+  target_synthetic_data_path: ${target_model.target_model_directory}/${target_model.target_model_name}/synthetic_data/10k/10k.csv
   challenge_data_path: ${target_model.target_model_directory}/${target_model.target_model_name}/challenge_with_id.csv
   challenge_label_path: ${target_model.target_model_directory}/${target_model.target_model_name}/challenge_label.csv
-  target_attack_artifact_dir: ${base_experiment_dir}/target_${target_model.target_model_id}_attack_artifacts/
-  attack_probabilities_result_path: ${target_model.target_attack_artifact_dir}/attack_model_${target_model.target_model_id}_proba
-  target_shadow_models_output_path: ${target_model.target_attack_artifact_dir}/tabddpm_${target_model.target_model_id}_shadows_dir
+  target_shadow_models_output_path: ${base_experiment_dir}/test_all_targets # Sub-directory to store test shadows and results
+  attack_probabilities_result_path: ${target_model.target_shadow_models_output_path}/test_probabilities/attack_model_${target_model.target_model_id}_proba
+  attack_rmia_shadow_training_data_choice: "combined" # Options: "combined", "only_challenge", "only_train". This determines which data is used to train the RMIA attack model in the testing phase.
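(Editor's aside: the `${...}` entries above are OmegaConf interpolations and resolve lazily on access, which is what makes the per-task `target_model_id` override work. A minimal sketch, assuming the config is loaded with OmegaConf; the override value 25 is illustrative only.)

```python
from omegaconf import OmegaConf

# Load the experiment config; interpolations resolve on access, not at load time.
config = OmegaConf.load("examples/ensemble_attack/configs/experiment_config.yaml")

# Mirrors the "overridden per SLURM array task" comment above; 25 is an arbitrary example id.
config.target_model.target_model_id = 25

print(config.target_model.target_model_name)
# -> tabddpm_25
print(config.target_model.attack_probabilities_result_path)
# -> .../test_all_targets/test_probabilities/attack_model_25_proba
```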
 # Data paths
 data_paths:
-  midst_data_path: /projects/midst-experiments/all_tabddpms # Used to collect the data
-  population_path: ${base_experiment_dir}/population_data # Path where the collected population data will be stored
-  processed_attack_data_path: ${base_experiment_dir}/attack_data # Path where the processed attack real train and evaluation data is stored
-  attack_evaluation_result_path: ${base_experiment_dir}/evaluation_results # Path where the attack evaluation results will be stored
+  midst_data_path: /projects/midst-experiments/all_tabddpms/ # Used to collect the data (input) as defined in data_processing_config
+  processed_base_data_dir: ${base_experiment_dir} # To save new processed data for training, or read from previously collected and processed data (testing phase).
+  population_path: ${data_paths.processed_base_data_dir}/population_data # Path where the collected population data will be stored (output/input)
+  processed_attack_data_path: ${data_paths.processed_base_data_dir}/attack_data # Path where the processed attack real train and evaluation data is stored (output/input)
+  attack_evaluation_result_path: ${base_experiment_dir}/evaluation_results # Path where the attack (train phase) evaluation results will be stored (output)
+

 model_paths:
   metaclassifier_model_path: ${base_experiment_dir}/trained_models # Path where the trained metaclassifier model will be saved
@@ -38,23 +39,25 @@ model_paths:
 data_processing_config:
   population_attack_data_types_to_collect: [
-    "tabddpm_trained_with_20k",
+    "tabddpm_trained_with_10k",
   ]
   challenge_attack_data_types_to_collect: [
-    "tabddpm_trained_with_20k",
+    "tabddpm_trained_with_10k",
   ]
   population_splits: ["train"] # Data splits to be collected for population data
-  challenge_splits: ["train"] # Data splits to be collected for challenge points
+  challenge_splits: ["train", "test"] # Data splits to be collected for challenge points
+  original_population_data_path: /projects/midst-experiments/ensemble_attack/competition/population_data/ # Attack's collected population for DOMIAS
   # The column name in the data to be used for stratified splitting.
   column_to_stratify: "trans_type"
   # Attention: This value is not documented in the original codebase.
-  folder_ranges: #Specify folder ranges for any of the mentioned splits.
-    train: [[1, 20]] # Folders to be used for train data collection in the experiments
+  folder_ranges: # Specify folder ranges for any of the mentioned splits.
+    train: [[1, 21]] # Folders to be used for train data collection in the experiments
+    test: [[21, 31], [31, 41]]
   # File names in MIDST data directories.
   single_table_train_data_file_name: "train_with_id.csv"
   multi_table_train_data_file_name: "trans.csv"
   challenge_data_file_name: "challenge_with_id.csv"
-  population_sample_size: 40000 # Population size is the total data that your attack has access to.
+  population_sample_size: 20000 # Population size is the total data that your attack has access to.
   # In experiments, this is sampled out of all the collected training data in case the available data
   # is more than this number. Note that, half of this data is actually used for training, the other half
   # is used for evaluation. For example, with 40k population size, only 20k is used for training the attack model.
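(Editor's aside: `folder_ranges` is consumed by `expand_ranges` in `real_data_collection.py` below. A minimal sketch of the expected expansion, assuming half-open `[start, stop)` ranges; this assumption is consistent with `train: [[1, 21]]` covering folders 1-20 and `test: [[21, 31], [31, 41]]` covering folders 21-40, which matches `target_ids_to_test` further down.)

```python
# Hypothetical re-implementation for illustration only; the real expand_ranges
# lives in the toolkit and may differ in details.
def expand_ranges_sketch(ranges: list[list[int]]) -> list[int]:
    """Flatten [[start, stop), ...] pairs into a sorted list of folder ids."""
    folder_ids: set[int] = set()
    for start, stop in ranges:
        folder_ids.update(range(start, stop))
    return sorted(folder_ids)


assert expand_ranges_sketch([[1, 21]]) == list(range(1, 21))
assert expand_ranges_sketch([[21, 31], [31, 41]]) == list(range(21, 41))
```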
@@ -86,7 +89,7 @@ shadow_training:
   fine_tune_diffusion_iterations: 200000 # Original code: 200000
   fine_tune_classifier_iterations: 20000 # Original code: 20000
   pre_train_data_size: 60000 # Original code: 60000
-  number_of_points_to_synthesize: 20000 # Number of synthetic data samples to be generated by shadow models.
+  number_of_points_to_synthesize: 10000 # Number of synthetic data samples to be generated by shadow models.
   # Original code: 20000

@@ -104,7 +107,7 @@ metaclassifier:
   meta_classifier_model_name: ${metaclassifier.model_type}_metaclassifier_model

 attack_success_computation:
-  target_ids_to_test: [21,22,23] # List of target model IDs to compute the attack success for.
+  target_ids_to_test: [21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40] # List of target model IDs to compute the attack success for.

 # General settings
 random_seed: 42 # Set to null for no seed, or an integer for a fixed seed
diff --git a/examples/ensemble_attack/real_data_collection.py b/examples/ensemble_attack/real_data_collection.py
index 264f71ae..4a8362b1 100644
--- a/examples/ensemble_attack/real_data_collection.py
+++ b/examples/ensemble_attack/real_data_collection.py
@@ -4,12 +4,14 @@
 """

 from enum import Enum
+from logging import INFO
 from pathlib import Path

 import pandas as pd
 from omegaconf import DictConfig

 from midst_toolkit.attacks.ensemble.data_utils import load_dataframe, save_dataframe
+from midst_toolkit.common.logger import log


 class AttackType(Enum):
@@ -59,18 +61,15 @@ def collect_midst_attack_data(
     Args:
         attack_type: The attack setting.
         data_dir: The path where the data is stored.
-        data_split: Indicates if this is train, dev, or final data.
+        data_split: Indicates if this is train, dev, or final data. Note that this is, in fact,
+            the name of the folder that contains the model folders for data collection. For example,
+            f"{generation_name}_{i}" should be located under the ``data_split`` folder.
         dataset: The dataset to be collected. Either "train" or "challenge".
         data_processing_config: Configuration dictionary containing data specific information.

     Returns:
         pd.DataFrame: The specified dataset in this setting.
     """
-    assert data_split in [
-        "train",
-        "dev",
-        "final",
-    ], "data_split should be one of 'train', 'dev', or 'final'."
     # `data_id` is the folder numbering of each training or challenge dataset,
     # and is defined with the provided config.
     data_id = expand_ranges(data_processing_config.folder_ranges[data_split])
@@ -80,7 +79,7 @@ def collect_midst_attack_data(
     generation_name = attack_type.value.split("_")[0]
     if dataset == "challenge":
         file_name = data_processing_config.challenge_data_file_name
-    else:  # dataset == "train"
+    else:
         # Multi-table attacks have different file names.
         file_name = (
             data_processing_config.multi_table_train_data_file_name
@@ -108,13 +107,18 @@ def collect_midst_data(
 ) -> pd.DataFrame:
     """
     Collect train or challenge data of the specified attack type from the provided data folders
-    in the MIDST competition.
+    in the MIDST competition. The data is collected from all the folders specified in the
+    ``data_splits`` argument under each attack type folder. For example, if ``data_splits``
+    contains `train` and `dev`, the function collects data from both the `train` and `dev` folders
+    under each attack type folder. For more information about the data collection structure, see
+    the implementation of the ``collect_midst_attack_data`` function.

     Args:
         midst_data_input_dir: The path where the MIDST data folders are stored.
         attack_types: List of attack types for data collection.
-        data_splits: A list indicating the data split to be collected.
-            Could be any of train, dev, or final data splits.
+        data_splits: A list indicating the data split to be collected. This is a list of folder names
+            under each attack type folder from which the models' data is collected. For example, it could
+            contain strings like `train`, `dev`, `final`, or `test` based on the directory structure.
         dataset: The dataset to be collected. Either `train` or `challenge`.
         data_processing_config: Configuration dictionary containing data paths and file names.

@@ -133,7 +137,7 @@ def collect_midst_data(
             data_processing_config=data_processing_config,
         )

-        population.append(df_real)
+        population.append(df_real)

     return pd.concat(population).drop_duplicates()

@@ -142,26 +146,32 @@ def collect_midst_data(
 def collect_population_data_ensemble(
     midst_data_input_dir: Path,
     data_processing_config: DictConfig,
     save_dir: Path,
+    original_repo_population: pd.DataFrame,
     population_splits: list[str] | None = None,
     challenge_splits: list[str] | None = None,
 ) -> pd.DataFrame:
     """
     Collect the population data from the MIDST competition based on Ensemble Attack implementation.
     Returns real data population that consists of the train data of all the attacks
-    (black box and white box), and challenge points from `train`, `dev` and `final` of
-    "tabddpm_black_box" attack. The population data is saved in the provided path,
-    and returned as a dataframe.
+    (black box and white box) as specified in ``data_processing_config.population_attack_data_types_to_collect``,
+    and challenge points from `train`, `dev` and `final` of the attacks specified by
+    ``data_processing_config.challenge_attack_data_types_to_collect``. The collected population data is concatenated
+    with ``original_repo_population`` so that it is large enough for the attack (especially DOMIAS), and is then
+    saved to the provided path and returned as a dataframe.

     Args:
         midst_data_input_dir: The path where the MIDST data folders are stored.
         data_processing_config: Configuration dictionary containing data information and file names.
         save_dir: The path where the collected population data should be saved.
+        original_repo_population: The original population data collected from the MIDST challenge repository.
         population_splits: A list indicating the data splits to be collected for population data.
-            Could be any of `train`, `dev`, or `final` data splits. If None, the default list of ``["train"]``
-            is set in the function based on the original attack implementation.
+            This is a list of strings containing the folder names under the attack folders that are
+            considered for population collection. If None, the default list of ``["train"]`` is set in the
+            function based on the original attack implementation.
         challenge_splits: A list indicating the data splits to be collected for challenge points.
-            Could be any of `train`, `dev`, or `final` data splits. If None, the default list of
-            ``["train", "dev", "final"]`` is set in the function based on the original attack implementation.
+            This is a list of strings containing the folder names under the attack folders that are
+            considered for challenge data collection. If None, the default list of ``["train", "dev", "final"]``
+            is set in the function based on the original attack implementation.

     Returns:
         The collected population data as a dataframe.
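(Editor's aside: a tiny, self-contained illustration of the concatenation described in the docstring above, using toy frames rather than project data. Rows shared between the experiment's population and the original repo's population survive exactly once, which enlarges the DOMIAS reference set without double counting.)

```python
import pandas as pd

df_experiment = pd.DataFrame({"trans_id": [1, 2], "amount": [10.0, 20.0]})
df_original_repo = pd.DataFrame({"trans_id": [2, 3], "amount": [20.0, 30.0]})

# Mirrors the pd.concat([...]).drop_duplicates() call in collect_population_data_ensemble.
df_population = pd.concat([df_experiment, df_original_repo]).drop_duplicates()
assert len(df_population) == 3  # the shared row (trans_id=2) appears once
```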
@@ -176,17 +186,23 @@ def collect_population_data_ensemble( challenge_splits = ["train", "dev", "final"] # Ensemble Attack collects train data of all the attack types (black box and white box) - attack_names = data_processing_config.population_attack_data_types_to_collect + population_attack_names = data_processing_config.population_attack_data_types_to_collect # Provided attack name are valid based on AttackType enum - population_attack_types: list[AttackType] = [AttackType(attack_name) for attack_name in attack_names] + population_attack_types: list[AttackType] = [AttackType(attack_name) for attack_name in population_attack_names] - df_population = collect_midst_data( + df_population_experiment = collect_midst_data( midst_data_input_dir, population_attack_types, data_splits=population_splits, dataset="train", data_processing_config=data_processing_config, ) + + log(INFO, f"Collected experiment population data length before concatenation: {len(df_population_experiment)}") + + df_population = pd.concat([df_population_experiment, original_repo_population]).drop_duplicates() + log(INFO, f"Concatenated population data length: {len(df_population)}") + # Drop ids. df_population_no_id = df_population.drop(columns=["trans_id", "account_id"]) # Save the population data @@ -195,6 +211,7 @@ def collect_population_data_ensemble( challenge_attack_names = data_processing_config.challenge_attack_data_types_to_collect challenge_attack_types = [AttackType(attack_name) for attack_name in challenge_attack_names] + df_challenge = collect_midst_data( midst_data_input_dir, attack_types=challenge_attack_types, @@ -202,6 +219,7 @@ def collect_population_data_ensemble( dataset="challenge", data_processing_config=data_processing_config, ) + log(INFO, f"Collected challenge data length: {len(df_challenge)} from splits: {challenge_splits}") # Save the challenge points save_dataframe(df_challenge, save_dir, "challenge_points_all.csv") diff --git a/examples/ensemble_attack/run_attack.py b/examples/ensemble_attack/run_attack.py index 5592e08e..75fe7254 100644 --- a/examples/ensemble_attack/run_attack.py +++ b/examples/ensemble_attack/run_attack.py @@ -11,6 +11,7 @@ from omegaconf import DictConfig from examples.ensemble_attack.real_data_collection import collect_population_data_ensemble +from midst_toolkit.attacks.ensemble.data_utils import load_dataframe from midst_toolkit.attacks.ensemble.process_split_data import process_split_data from midst_toolkit.common.logger import log from midst_toolkit.common.random import set_all_random_seeds @@ -23,15 +24,25 @@ def run_data_processing(config: DictConfig) -> None: Args: config: Configuration object set in config.yaml. """ + # Load original repo's population to be concatenated to the experiment's population data. + # This is done to align the experiments with the original attack code because + # this attack needs a large population dataset, and only using the experiment's collected population + # is not enough. + original_population_data = load_dataframe( + Path(config.data_processing_config.original_population_data_path), + "population_all_with_challenge.csv", + ) log(INFO, "Running data processing pipeline...") # Collect the real data from the MIDST challenge resources. 
population_data = collect_population_data_ensemble( midst_data_input_dir=Path(config.data_paths.midst_data_path), data_processing_config=config.data_processing_config, save_dir=Path(config.data_paths.population_path), + original_repo_population=original_population_data, population_splits=config.data_processing_config.population_splits, challenge_splits=config.data_processing_config.challenge_splits, ) + # The following function saves the required dataframe splits in the specified processed_attack_data_path path. process_split_data( all_population_data=population_data, @@ -67,7 +78,11 @@ def main(config: DictConfig) -> None: # TODO: Investigate the source of error. if config.pipeline.run_shadow_model_training: shadow_pipeline = importlib.import_module("examples.ensemble_attack.run_shadow_model_training") - shadow_data_paths = shadow_pipeline.run_shadow_model_training(config) + df_master_challenge_train = load_dataframe( + Path(config.data_paths.processed_attack_data_path), + "master_challenge_train.csv", + ) + shadow_data_paths = shadow_pipeline.run_shadow_model_training(config, df_master_challenge_train) shadow_data_paths = [Path(path) for path in shadow_data_paths] target_model_synthetic_path = shadow_pipeline.run_target_model_training(config) diff --git a/examples/ensemble_attack/run_metaclassifier_training.py b/examples/ensemble_attack/run_metaclassifier_training.py index e6a9c8e5..47cfdd32 100644 --- a/examples/ensemble_attack/run_metaclassifier_training.py +++ b/examples/ensemble_attack/run_metaclassifier_training.py @@ -63,6 +63,7 @@ def run_metaclassifier_training( with open(model_path, "rb") as f: shadow_data_and_result = pickle.load(f) shadow_data_collection.append(shadow_data_and_result) + log(INFO, f"Shadow model data loaded from {model_path}.") assert Path(target_model_synthetic_path).exists(), ( f"No file found at {target_model_synthetic_path}. " @@ -71,6 +72,10 @@ def run_metaclassifier_training( # Load the target model's synthetic data target_synthetic_data = pd.read_csv(target_model_synthetic_path) + log( + INFO, + f"Target model's synthetic data loaded from {target_model_synthetic_path} with size {len(target_synthetic_data)}.", + ) assert target_synthetic_data is not None, "Target model's synthetic data is missing." target_synthetic_data = target_synthetic_data.copy() @@ -79,6 +84,10 @@ def run_metaclassifier_training( Path(config.data_paths.population_path), "population_all_with_challenge_no_id.csv", ) + log( + INFO, + f"Reference population data loaded from {config.data_paths.population_path} with size {len(df_reference)}.", + ) # Extract trans_id from both train and test dataframes assert "trans_id" in df_meta_train.columns, "Meta train data must have trans_id column" diff --git a/examples/ensemble_attack/run_shadow_model_training.py b/examples/ensemble_attack/run_shadow_model_training.py index d4a85cbc..ae69de80 100644 --- a/examples/ensemble_attack/run_shadow_model_training.py +++ b/examples/ensemble_attack/run_shadow_model_training.py @@ -2,6 +2,7 @@ from logging import INFO from pathlib import Path +import pandas as pd from omegaconf import DictConfig from midst_toolkit.attacks.ensemble.data_utils import load_dataframe @@ -79,12 +80,13 @@ def run_target_model_training(config: DictConfig) -> Path: return target_model_synthetic_path -def run_shadow_model_training(config: DictConfig) -> list[Path]: +def run_shadow_model_training(config: DictConfig, df_challenge_train: pd.DataFrame) -> list[Path]: """ Function to run the shadow model training for RMIA attack. 
Args: config: Configuration object set in config.yaml. + df_challenge_train: DataFrame containing the data that is used to train RMIA shadow models. Returns: Paths to the saved shadow model results for the three sets of shadow models. For more details, @@ -95,27 +97,22 @@ def run_shadow_model_training(config: DictConfig) -> list[Path]: # Load the required dataframes for shadow model training. # For shadow model training we need master_challenge_train and population data. # Master challenge is the main training (or fine-tuning) data for the shadow models. - df_master_challenge_train = load_dataframe( - Path(config.data_paths.processed_attack_data_path), - "master_challenge_train.csv", - ) # Population data is used to pre-train some of the shadow models. df_population_with_challenge = load_dataframe( Path(config.data_paths.population_path), "population_all_with_challenge.csv", ) # Make sure master challenge train and population data have the "trans_id" column. - assert "trans_id" in df_master_challenge_train.columns, ( + assert "trans_id" in df_challenge_train.columns, ( "trans_id column should be present in master train data for the shadow model pipeline." ) assert "trans_id" in df_population_with_challenge.columns - assert "trans_id" in df_master_challenge_train.columns # ``population_data`` in ensemble attack is used for shadow pre-training, and # ``master_challenge_df`` is used for fine-tuning for half of the shadow models. # For the other half of the shadow models, only ``master_challenge_df`` is used for training. first_set_result_path, second_set_result_path, third_set_result_path = train_three_sets_of_shadow_models( population_data=df_population_with_challenge, - master_challenge_data=df_master_challenge_train, + master_challenge_data=df_challenge_train, shadow_models_output_path=Path(config.shadow_training.shadow_models_output_path), training_json_config_paths=config.shadow_training.training_json_config_paths, fine_tuning_config=config.shadow_training.fine_tuning_config, diff --git a/examples/ensemble_attack/run_train.sh b/examples/ensemble_attack/run_train.sh index 72151c4b..d6a36201 100755 --- a/examples/ensemble_attack/run_train.sh +++ b/examples/ensemble_attack/run_train.sh @@ -3,16 +3,16 @@ #SBATCH --nodes=1 #SBATCH --ntasks=1 #SBATCH --ntasks-per-node=1 -#SBATCH --cpus-per-task=1 -#SBATCH --gres=gpu:1 -#SBATCH --mem=32G -#SBATCH --qos=m -#SBATCH --job-name=ensemble_attack_train +#SBATCH --cpus-per-task=2 +#SBATCH --gres=gpu:a100:1 +#SBATCH --mem=210G +#SBATCH --job-name=train10k #SBATCH --output=%j_%x.out #SBATCH --error=%j_%x.err -#SBATCH --time=12:00:00 +#SBATCH --time=14:00:00 +echo "Total memory allocated: $(($SLURM_MEM_PER_NODE / 1024)) GB" # This script sets up the environment and runs the ensemble attack example. 
source .venv/bin/activate @@ -21,6 +21,6 @@ which python echo "Experiments Launched" -python -m examples.ensemble_attack.run_attack +python -m examples.ensemble_attack.run_attack --config-name=experiment_config_10k echo "Experiments Completed" diff --git a/examples/ensemble_attack/test_attack_model.py b/examples/ensemble_attack/test_attack_model.py index 0ade47df..07446320 100644 --- a/examples/ensemble_attack/test_attack_model.py +++ b/examples/ensemble_attack/test_attack_model.py @@ -11,6 +11,7 @@ import pandas as pd from omegaconf import DictConfig +from examples.ensemble_attack.real_data_collection import AttackType, collect_midst_data from examples.ensemble_attack.run_shadow_model_training import run_shadow_model_training from midst_toolkit.attacks.ensemble.blending import BlendingPlusPlus, MetaClassifierType from midst_toolkit.attacks.ensemble.data_utils import load_dataframe @@ -18,21 +19,81 @@ from midst_toolkit.common.random import set_all_random_seeds -def run_rmia_shadow_training(config: DictConfig) -> list[dict[str, list[Any]]]: +def save_results( + attack_results_path: Path, metaclassifier_model_name: str, probabilities: np.ndarray, pred_score: float | None +) -> None: + """ + Saves the test prediction probabilities and metric results. + + Args: + attack_results_path: Path to save the attack results. + metaclassifier_model_name: Name of the metaclassifier model to be used to name score and prediction files. + probabilities: Prediction probabilities from the metaclassifier. + pred_score: Prediction score to be saved. + """ + file_name = attack_results_path / f"{metaclassifier_model_name}_test_pred_proba.npy" + np.save(file_name, probabilities) + log(INFO, f"Test prediction probabilities saved at {file_name}.") + + if pred_score is not None: + log(INFO, f"TPR at FPR=0.1: {pred_score:.4f}") + + # Save the metric results into a text file. + metric_save_path = attack_results_path / f"prediction_score_{metaclassifier_model_name}.txt" + with open(metric_save_path, "w") as f: + f.write(f"TPR at FPR=0.1: {pred_score:.4f}\n") + + +def extract_and_drop_id_column( + data_frame: pd.DataFrame, + data_types_file_path: Path, +) -> tuple[pd.DataFrame, pd.Series]: + """ + Extracts IDs from the data frame and drops the ID column. ID column is identified based on + the data types JSON file with "id_column_name" key. + + Args: + data_frame: Input data frame. + data_types_file_path: Path to the data types JSON file. + + Returns: + A tuple containing: + - The modified data frame with ID columns dropped. + - A Series containing the extracted data of ID columns. + """ + # Extract ID column from the dataframe + with open(data_types_file_path, "r") as f: + column_types = json.load(f) + + assert "id_column_name" in column_types, f"{data_types_file_path} must contain 'id_column_name' key." + id_column_name = column_types["id_column_name"] + + assert id_column_name in data_frame.columns, f"Dataframe must have {id_column_name} column" + data_trans_ids = data_frame[id_column_name] + + # Drop ID column from data + data_frame = data_frame.drop(columns=id_column_name) + + return data_frame, data_trans_ids + + +def run_rmia_shadow_training(config: DictConfig, df_challenge: pd.DataFrame) -> list[dict[str, list[Any]]]: """ Three sets of shadow models will be trained as a part of this attack. - Note that for every new target model, shadow models need to be trained. - RMIA signals (for the challenge points) are calculated based on these shadow models, - and will be fed into the metaclassifier. 
+ Note that shadow models need to be trained on the collection of challenge points once and used + for all the target models in a setting. In other words, in a standard setting, the + testing points (experiment challenge points) are used as training or included in training data + of the shadow models, and these shadow models are used to attack all target models. Args: config: Configuration object set in ``experiments_config.yaml``. + df_challenge: DataFrame containing the challenge data points for shadow model training. Return: A list containing three dictionaries, each representing a collection of shadow models with their training data and generated synthetic outputs. """ - shadow_model_paths = run_shadow_model_training(config) + shadow_model_paths = run_shadow_model_training(config, df_challenge_train=df_challenge) assert len(shadow_model_paths) == 3, "For testing, meta classifier needs the path to three sets of shadow models." @@ -49,18 +110,160 @@ def run_rmia_shadow_training(config: DictConfig) -> list[dict[str, list[Any]]]: return shadow_data_collection +def load_trained_rmia_shadows_for_test_phase( + shadow_data_paths: list[Path], +) -> tuple[list[dict[str, list[Any]]], bool]: + """ + Loads previously trained RMIA shadow models for the testing phase. Makes sure + all shadow models exist before loading. Otherwise, returns an empty list and False. + + Args: + shadow_data_paths: List of paths to the saved shadow model data. + + Returns: + A tuple containing: + - A list of dictionaries, each representing a collection of shadow + models with their training data and generated synthetic outputs. + - A boolean indicating whether all shadow models were successfully loaded. + """ + shadow_data_collection = [] + models_exists = True + for model_path in shadow_data_paths: + if model_path.exists(): + with open(model_path, "rb") as f: + shadow_data_and_result = pickle.load(f) + shadow_data_collection.append(shadow_data_and_result) + log(INFO, f"Loaded existing shadow model at {model_path}.") + else: + models_exists = False + shadow_data_collection = [] + break + return shadow_data_collection, models_exists + + +def collect_challenge_and_train_data( + data_processing_config: DictConfig, processed_attack_data_path: Path, targets_data_path: Path +) -> tuple[pd.DataFrame, pd.DataFrame]: + """ + Collect challenge experiment data and master train data. + + Args: + data_processing_config: Configuration object for data processing. + processed_attack_data_path: Path to the processed attack data. + targets_data_path: Path to the target model's data. + + Returns: + Tuple of (df_challenge_experiment, df_master_train). 
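(Editor's aside: a hedged usage sketch of the load-or-train flow that `load_trained_rmia_shadows_for_test_phase` enables. The second pickle name is hypothetical; only `rmia_shadows.pkl` and `rmia_shadows_third_set.pkl` appear in `shadow_model_training.py`. `config` is assumed to be an already-loaded Hydra/OmegaConf config, and `train_rmia_shadows_for_test_phase` is defined further below.)

```python
from pathlib import Path

# Illustrative paths; in the pipeline they come from
# config.shadow_training.final_shadow_models_path.
shadow_data_paths = [
    Path("test_all_targets/rmia_shadows.pkl"),
    Path("test_all_targets/rmia_shadows_second_set.pkl"),  # hypothetical file name
    Path("test_all_targets/rmia_shadows_third_set.pkl"),
]

shadow_data_collection, models_exist = load_trained_rmia_shadows_for_test_phase(shadow_data_paths)
if not models_exist:
    # Train once on the collected challenge points, then reuse for every target.
    shadow_data_collection = train_rmia_shadows_for_test_phase(config)
```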
+    """
+    # Collect all of the repo's challenge points
+    challenge_attack_names = data_processing_config.challenge_attack_data_types_to_collect
+    challenge_attack_types = [AttackType(attack_name) for attack_name in challenge_attack_names]
+    df_challenge_experiment = collect_midst_data(
+        midst_data_input_dir=targets_data_path,
+        attack_types=challenge_attack_types,
+        data_splits=["test"],  # For ensemble experiments, use ``test`` for 10k and ``final`` for 20k
+        dataset="challenge",
+        data_processing_config=data_processing_config,
+    )
+    log(
+        INFO,
+        f"Collected challenge data length: {len(df_challenge_experiment)} for the testing phase's shadow training.",
+    )
+
+    # Load master challenge train data
+    df_master_train = load_dataframe(
+        processed_attack_data_path,
+        "master_challenge_train.csv",
+    )
+    log(
+        INFO,
+        f"Loaded master challenge train data length: {len(df_master_train)} for the testing phase's shadow training.",
+    )
+
+    return df_challenge_experiment, df_master_train
+
+
+def select_challenge_data_for_training(
+    attack_rmia_shadow_training_data_choice: str, df_challenge_experiment: pd.DataFrame, df_master_train: pd.DataFrame
+) -> pd.DataFrame:
+    """
+    Select the appropriate challenge data based on the config choice.
+
+    Args:
+        attack_rmia_shadow_training_data_choice: Strategy for creating challenge train data for RMIA shadow training.
+            It can be one of the following:
+            - "only_challenge": Use only challenge experiment data (``df_challenge_experiment``).
+            - "only_train": Use only master train data (``df_master_train``). Note that this option contradicts
+                the original design and purpose of training RMIA shadow models on the challenge points, as
+                RMIA signals (IN train signals) for challenge points can only be computed if
+                shadow models are trained on these points.
+            - "combined": Combine both challenge experiment data and master train data. This can
+                potentially be advantageous in experiments, as the RMIA shadows are trained on
+                more data points.
+        df_challenge_experiment: Challenge points in this experiment.
+        df_master_train: Master train data used to train the meta classifier.
+
+    Raises:
+        ValueError: If an invalid choice is provided.
+
+    Returns:
+        Selected challenge data.
+    """
+    if attack_rmia_shadow_training_data_choice == "combined":
+        # Run RMIA shadow model training on the experiment's challenge points + master challenge train data
+        df_challenge = pd.concat([df_challenge_experiment, df_master_train]).drop_duplicates()
+        log(INFO, f"Combined challenge data length for RMIA shadow training: {len(df_challenge)}.")
+    elif attack_rmia_shadow_training_data_choice == "only_challenge":
+        df_challenge = df_challenge_experiment
+        log(INFO, "Using only challenge data points for RMIA shadow training.")
+    elif attack_rmia_shadow_training_data_choice == "only_train":
+        df_challenge = df_master_train
+        log(INFO, "Using only master challenge train data points for RMIA shadow training.")
+    else:
+        raise ValueError(
+            "Invalid choice for attack_rmia_shadow_training_data_choice. Must be one of 'combined', 'only_challenge', or 'only_train'."
+        )
+
+    return df_challenge
+
+
+def train_rmia_shadows_for_test_phase(config: DictConfig) -> list[dict[str, list[Any]]]:
+    """
+    Function to train RMIA shadow models for the testing phase using the dataset containing
+    challenge data points.
+
+    Args:
+        config: Configuration object set in ``experiments_config.yaml``.
+ + Returns: + A list containing three dictionaries, each representing a collection of shadow + models with their training data IDs and generated synthetic outputs. + """ + df_challenge_experiment, df_master_train = collect_challenge_and_train_data( + config.data_processing_config, + processed_attack_data_path=Path(config.data_paths.processed_attack_data_path), + targets_data_path=Path(config.data_paths.midst_data_path), + ) + # Load the challenge dataframe for training RMIA shadow models. + df_challenge = select_challenge_data_for_training( + str(config.target_model.attack_rmia_shadow_training_data_choice), df_challenge_experiment, df_master_train + ) + return run_rmia_shadow_training(config, df_challenge=df_challenge) + + @hydra.main(config_path="configs", config_name="experiment_config", version_base=None) def run_metaclassifier_testing( config: DictConfig, ) -> None: """ - Function to run the attack on a target model using a trained metaclassifier. - Note that RMIA shadow models need to be trained for every new target model's challenge dataset. - However, we load the previously trained metaclassifier model and use it for new target models. - Unlike the training phase, in the testing phase, we don't need to train a shadow target model + Function to run the attack on a single target model using a trained metaclassifier. + Note that RMIA shadow models need to be trained for every new set of target models on + their collected challenge data, but once they are trained for the first target, we can reuse them + for the other targets in the same experiment. + Unlike the training phase, in the testing phase, we don't need to train a target shadow model since we already have access to the synthetic data of a real target model. All the collected population data that is used for training, is still needed during testing to compute some - of the signals. + of the signals (DOMIAS). Test prediction probabilities are saved to the specified attack result path in the config. Args: @@ -84,12 +287,11 @@ def run_metaclassifier_testing( with open(mataclassifier_path, "rb") as f: trained_mataclassifier_model = pickle.load(f) - log(INFO, "Metaclassifier model loaded, starting the test...") + log(INFO, f"Metaclassifier model loaded from {mataclassifier_path}, starting the test...") # 2) Read target model's challenge data and synthetic data. - # Back-box attacker has only access to the target model's synthetic data and challenge points. - # We also load challenge labels to report the attack performance. + # We also load challenge labels to report the attack performance at the end. challenge_data_path = Path(config.target_model.challenge_data_path) challenge_label_path = Path(config.target_model.challenge_label_path) @@ -105,34 +307,35 @@ def run_metaclassifier_testing( INFO, f"Target synthetic data loaded from {target_synthetic_path} with a size of {len(target_synthetic_data)}." ) - # Extract trans_id from the test dataframe - with open(Path(config.metaclassifier.data_types_file_path), "r") as f: - column_types = json.load(f) - id_column_name = column_types["id_column_name"] - - assert id_column_name in test_data.columns, f"Test data must have {id_column_name} column" - - test_trans_ids = test_data[id_column_name] - - # Drop id columns from test data - id_column_names = [column_name for column_name in test_data.columns if column_name.endswith("_id")] - test_data = test_data.drop(columns=id_column_names) + # If the synthetic data has more points than specified in the config, take only the required number. 
+    if len(target_synthetic_data) > config.shadow_training.number_of_points_to_synthesize:
+        # Take only the required number of synthetic data points
+        target_synthetic_data = target_synthetic_data.head(config.shadow_training.number_of_points_to_synthesize)
+        log(INFO, f"Target synthetic data size adjusted to {len(target_synthetic_data)} based on the config setting.")

     # 3) Shadow Model Training Step.
-    # Make sure to assign a new path for shadow models trained for target's challenge points to
     # avoid overriding train's shadow models.
+    # TODO: Assign specific shadow collection path for test phase.
     config.shadow_training.shadow_models_output_path = config.target_model.target_shadow_models_output_path
-    shadow_data_collection = run_rmia_shadow_training(config)
+    shadow_data_paths = [Path(path) for path in config.shadow_training.final_shadow_models_path]
+    assert len(shadow_data_paths) == 3, "The final_shadow_models_path list must contain exactly three elements."

-    # 4) Initialize the attacker object, and assign the loaded metaclassifier to it.
-    target_synthetic_data = target_synthetic_data.copy()
+    # If shadows are already trained for test (``models_exists`` is True), we don't need to train them again.
+    # Load the shadow training collection from previously trained shadow models.
+    shadow_data_collection, models_exists = load_trained_rmia_shadows_for_test_phase(shadow_data_paths)

-    df_reference = load_dataframe(
-        Path(config.data_paths.population_path),
-        "population_all_with_challenge_no_id.csv",
-    )
+    if not models_exists:
+        log(INFO, "Shadow models for testing phase do not exist. Training RMIA shadow models...")
+        shadow_data_collection = train_rmia_shadows_for_test_phase(config)
+
+    else:
+        log(INFO, "All shadow models for testing phase found. Using existing RMIA shadow models...")
+
+    # Extract and drop id columns from the test data
+    test_data, test_trans_ids = extract_and_drop_id_column(test_data, Path(config.metaclassifier.data_types_file_path))

+    # 4) Initialize the attacker object, and assign the loaded metaclassifier to it.
     blending_attacker = BlendingPlusPlus(
         config=config,
         shadow_data_collection=shadow_data_collection,
@@ -145,6 +348,13 @@
     blending_attacker.trained_model = trained_mataclassifier_model

     # 5) Get predictions on the challenge data (test set).
+
+    # Load the reference population data for DOMIAS signals.
+    df_reference = load_dataframe(
+        Path(config.data_paths.population_path),
+        "population_all_with_challenge_no_id.csv",
+    )
+
     probabilities, pred_score = blending_attacker.predict(
         df_test=test_data,
         df_original_synthetic=target_synthetic_data,
@@ -156,17 +366,7 @@
     # Save the validation prediction probabilities
     attack_results_path = Path(config.target_model.attack_probabilities_result_path)
     attack_results_path.mkdir(parents=True, exist_ok=True)
-    file_name = attack_results_path / f"{metaclassifier_model_name}_test_pred_proba.npy"
-    np.save(file_name, probabilities)
-    log(INFO, f"Test prediction probabilities saved at {file_name}.")
-
-    if pred_score is not None:
-        log(INFO, f"TPR at FPR=0.1: {pred_score:.4f}")
-
-    # Save the metric results into a text file.
- metric_save_path = attack_results_path / f"prediction_score_{metaclassifier_model_name}.txt" - with open(metric_save_path, "w") as f: - f.write(f"TPR at FPR=0.1: {pred_score:.4f}\n") + save_results(attack_results_path, metaclassifier_model_name, probabilities, pred_score) if __name__ == "__main__": diff --git a/src/midst_toolkit/attacks/ensemble/rmia/rmia_calculation.py b/src/midst_toolkit/attacks/ensemble/rmia/rmia_calculation.py index 3700c4d4..4c64989b 100644 --- a/src/midst_toolkit/attacks/ensemble/rmia/rmia_calculation.py +++ b/src/midst_toolkit/attacks/ensemble/rmia/rmia_calculation.py @@ -4,9 +4,11 @@ https://github.com/CRCHUM-CITADEL/ensemble-mia. """ +from collections.abc import Sequence from enum import Enum from logging import INFO -from typing import Any +from multiprocessing import Pool +from typing import Any, TypeAlias import gower import numpy as np @@ -15,11 +17,122 @@ from midst_toolkit.common.logger import log +FloatDType: TypeAlias = type[np.float32] | type[np.float64] + + class Key(Enum): TRAINED_RESULTS = "trained_results" FINE_TUNED_RESULTS = "fine_tuned_results" +def compute_gower_batched( + df_x: pd.DataFrame, + df_y: pd.DataFrame, + cat_features: list[bool], + batch_size: int, + dtype: FloatDType, +) -> np.ndarray: + """ + Compute Gower distance matrix in batches to reduce peak memory usage. + + This processes df_x in batches, computing distances against all of df_y, + then stacks results. + + Args: + df_x: First dataframe. + df_y: Second dataframe. + cat_features: A boolean list indicating which columns are categorical. + batch_size: Number of rows from df_x to process in each batch. + dtype: The data type to which numerical columns will be cast. + + Returns: + Gower distance matrix as a numpy array. + """ + n_x = len(df_x) + n_y = len(df_y) + + # Pre-allocate output matrix + gower_matrix = np.zeros((n_x, n_y), dtype=dtype) + + for start_idx in range(0, n_x, batch_size): + end_idx = min(start_idx + batch_size, n_x) + + # Compute distance for this batch + batch_matrix = gower.gower_matrix( + data_x=df_x.iloc[start_idx:end_idx], + data_y=df_y, + cat_features=cat_features, + ) + + # Store in output matrix + gower_matrix[start_idx:end_idx] = batch_matrix.astype(dtype) + + return gower_matrix + + +def compute_gower_for_model( + args: tuple[ + int, + pd.DataFrame, + pd.DataFrame, + int, + str, + int | None, + list[bool], + Sequence[str], + FloatDType, + ], +) -> tuple[int, np.ndarray]: + """ + Computes the Gower distance matrix between df_input and a single synthetic dataframe. + + Args: + args: A tuple containing: + - i: Index of the model (for tracking purposes). + - df_synthetic_raw: The synthetic dataframe generated by the shadow model. + - df_input: The input dataframe to compare against. + - min_length: Minimum length for downsampling. + - id_column_name: Name of the ID column. + - random_seed: Random seed for reproducibility. + - categorical_features: A boolean list indicating which columns are categorical. + - numerical_columns: List of numerical column names. + - dtype: The data type to which numerical columns will be cast. + + Returns: + A tuple containing: + - i: Index of the model (same as input). + - gower_matrix: The computed Gower distance matrix as a numpy array. 
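(Editor's aside: a usage sketch for `compute_gower_batched` introduced above, with toy frames. The `batch_size` is deliberately tiny here to exercise the batching loop; the production call in `compute_gower_for_model` uses 5000. The `gower` dependency is assumed to be the PyPI `gower` library already used by this module.)

```python
import numpy as np
import pandas as pd

from midst_toolkit.attacks.ensemble.rmia.rmia_calculation import compute_gower_batched

df_x = pd.DataFrame({"amount": [1.0, 2.0, 3.0], "trans_type": ["a", "b", "a"]})
df_y = pd.DataFrame({"amount": [1.5, 2.5], "trans_type": ["a", "b"]})

distances = compute_gower_batched(
    df_x, df_y, cat_features=[False, True], batch_size=2, dtype=np.float32
)
assert distances.shape == (len(df_x), len(df_y))
# Gower distances are normalized to [0, 1] by construction.
assert ((distances >= 0) & (distances <= 1)).all()
```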
+ """ + ( + i, + df_synthetic_raw, + df_input, + min_length, + id_column_name, + random_seed, + categorical_features, + numerical_columns, + dtype, + ) = args + + df_synthetic = df_synthetic_raw.copy() + + # Convert numerical columns to float (otherwise error in the numpy divide) + df_synthetic[numerical_columns] = df_synthetic[numerical_columns].astype(dtype) + + # Sample synthetic data points if there's too many + if len(df_synthetic) > min_length: + df_synthetic = df_synthetic.sample(n=min_length, random_state=random_seed) + + if id_column_name in df_synthetic.columns: + df_synthetic = df_synthetic.drop(columns=[id_column_name]) + + # Batched computation to reduce peak memory + gower_matrix = compute_gower_batched(df_input, df_synthetic, categorical_features, batch_size=5000, dtype=dtype) + + return i, gower_matrix + + def get_rmia_gower( df_input: pd.DataFrame, model_data: list[pd.DataFrame], @@ -27,6 +140,9 @@ def get_rmia_gower( categorical_column_names: list[str], id_column_name: str, random_seed: int | None = None, + use_multiprocessing: bool = True, + n_jobs: int = 4, + dtype: FloatDType = np.float32, ) -> list[np.ndarray]: """ Computes the Gower distance between the challenge points and the synthetic data generated by the shadow models. @@ -40,6 +156,10 @@ def get_rmia_gower( categorical_column_names: A list of categorical column names. We assume that all other columns are numerical. id_column_name: Name of the ID column. random_seed: Random seed for reproducibility. + use_multiprocessing: Whether to use multiprocessing for parallel computation of Gower distances. + n_jobs: Number of parallel jobs to use for computation if ``use_multiprocessing`` is ``True``. Default is 4. + dtype: The data type to which numerical columns will be cast for Gower distance computation. np.float32 is more + memory efficient. Default is np.float32. Returns: A list of numpy arrays, each representing the Gower distance matrix between the input dataframe and the @@ -47,6 +167,7 @@ def get_rmia_gower( datasets provided within ``model_data``. """ + df_input = df_input.copy() # Check if any specified categorical columns are missing from the dataframe missing_categorical_columns = set(categorical_column_names) - set(df_input.columns) if missing_categorical_columns: @@ -58,32 +179,40 @@ def get_rmia_gower( categorical_features = [column in categorical_column_names for column in df_input.columns] - gower_matrices = [] - - numerical_columns = [col for col in df_input.columns if col not in categorical_column_names] - - df_input[numerical_columns] = df_input[numerical_columns].astype(float) - - for i in range(len(model_data)): - df_synthetic = model_data[i].copy() - - # Convert numerical columns to float (otherwise error in the numpy divide) - df_synthetic[numerical_columns] = df_synthetic[numerical_columns].astype(float) - - # Sample synthetic data points if there's too many - if len(df_synthetic) > min_length: - df_synthetic = df_synthetic.sample(n=min_length, random_state=random_seed) - - if id_column_name in df_synthetic.columns: - df_synthetic = df_synthetic.drop(columns=[id_column_name]) - - gower_matrix = gower.gower_matrix(data_x=df_input, data_y=df_synthetic, cat_features=categorical_features) - - assert np.all((gower_matrix >= 0) & (gower_matrix <= 1)), "Distances are falling outside of range [0, 1]." + # Defined the type as sequence to address mypy error. 
+ numerical_columns: Sequence[str] = [col for col in df_input.columns if col not in categorical_column_names] + + df_input[numerical_columns] = df_input[numerical_columns].astype(dtype) + + # Prepare arguments for each processing task + args_list = [ + ( + i, + df_synthetic, + df_input, + min_length, + id_column_name, + random_seed, + categorical_features, + numerical_columns, + dtype, + ) + for i, df_synthetic in enumerate(model_data) + ] - gower_matrices.append(gower_matrix) + # Process in parallel or sequentially based on use_multiprocessing + results = {} + if use_multiprocessing: + with Pool(processes=n_jobs) as pool: + for i, gower_matrix in pool.imap_unordered(compute_gower_for_model, args_list): + results[i] = gower_matrix + else: + for args in args_list: + i, gower_matrix = compute_gower_for_model(args) + results[i] = gower_matrix - return gower_matrices + # Return in original order + return [results[i] for i in range(len(model_data))] def conditional_average(values: np.ndarray, condition_mask: np.ndarray) -> np.ndarray: @@ -156,10 +285,10 @@ def calculate_rmia_signals( models with their training data and generated synthetic outputs. Each collection can contain multiple shadow models. The first two dictionaries have keys ``fine_tuning_sets`` and ``fine_tuned_results``, while the third has keys ``selected_sets`` and ``trained_results``. The ``fine_tuning_sets`` and - ``selected_sets`` keys map to lists of DataFrames containing the data used to fine-tune or train the shadow - models. The ``fine_tuned_results`` and ``trained_results`` keys map to lists of ``TrainingResult`` objects, - which store model training metadata and the corresponding generated synthetic data. - See ``train_three_sets_of_shadow_models`` in attacks/ensemble/rmia/shadow_model_training.py + ``selected_sets`` keys map to lists of DataFrames containing the data IDs used to fine-tune or train + the shadow models. The ``fine_tuned_results`` and ``trained_results`` keys map to lists of + DataFrame objects, which store model generated synthetic data. + See ``train_three_sets_of_shadow_models`` in ``attacks/ensemble/rmia/shadow_model_training.py`` for additional details. target_synthetic_data: Target model's synthetic data as a DataFrame. categorical_column_names: A list of categorical column names. 
@@ -202,9 +331,9 @@ def calculate_rmia_signals( trained_shadow_data = shadow_data_collection[2] all_lengths = [ - [len(data.synthetic_data) for data in fine_tuned_shadow_data_0["fine_tuned_results"]], - [len(data.synthetic_data) for data in fine_tuned_shadow_data_1["fine_tuned_results"]], - [len(data.synthetic_data) for data in trained_shadow_data["trained_results"]], + [len(data) for data in fine_tuned_shadow_data_0["fine_tuned_results"]], + [len(data) for data in fine_tuned_shadow_data_1["fine_tuned_results"]], + [len(data) for data in trained_shadow_data["trained_results"]], [len(data) for data in fine_tuned_shadow_data_0["fine_tuning_sets"]], [len(data) for data in fine_tuned_shadow_data_1["fine_tuning_sets"]], [len(data) for data in trained_shadow_data["selected_sets"]], @@ -218,9 +347,7 @@ def calculate_rmia_signals( if not (1 <= k <= min_length): raise ValueError(f"k={k} must be within [1, {min_length}]") - shadow_synthetic_list_0 = [ - train_result.synthetic_data for train_result in fine_tuned_shadow_data_0[Key.FINE_TUNED_RESULTS.value] - ] + shadow_synthetic_list_0 = list(fine_tuned_shadow_data_0[Key.FINE_TUNED_RESULTS.value]) shadow_model_gower_0 = get_rmia_gower( df_input=df_input, model_data=shadow_synthetic_list_0, @@ -228,10 +355,12 @@ def calculate_rmia_signals( categorical_column_names=categorical_column_names, id_column_name=id_column_name, random_seed=random_seed, + n_jobs=4, ) - shadow_synthetic_list_1 = [ - train_result.synthetic_data for train_result in fine_tuned_shadow_data_1[Key.FINE_TUNED_RESULTS.value] - ] + log(INFO, "Computed Gower distance for fine-tuned shadow models (first set of shadow models).") + + shadow_synthetic_list_1 = list(fine_tuned_shadow_data_1[Key.FINE_TUNED_RESULTS.value]) + shadow_model_gower_1 = get_rmia_gower( df_input=df_input, model_data=shadow_synthetic_list_1, @@ -239,10 +368,13 @@ def calculate_rmia_signals( categorical_column_names=categorical_column_names, id_column_name=id_column_name, random_seed=random_seed, + n_jobs=4, ) - shadow_synthetic_list_2 = [ - train_result.synthetic_data for train_result in trained_shadow_data[Key.TRAINED_RESULTS.value] - ] + log(INFO, "Computed Gower distance for fine-tuned shadow models (second set of shadow models).") + + shadow_synthetic_list_2 = list(trained_shadow_data[Key.TRAINED_RESULTS.value]) + + # shadow_synthetic_list_2 includes 8 shadow models shadow_model_gower_2 = get_rmia_gower( df_input=df_input, model_data=shadow_synthetic_list_2, @@ -250,18 +382,18 @@ def calculate_rmia_signals( categorical_column_names=categorical_column_names, id_column_name=id_column_name, random_seed=random_seed, + n_jobs=4, ) + log(INFO, "Computed Gower distance for trained shadow models (third set of shadow models).") + gower_shadows = np.vstack( [np.array(shadow_model_gower_0), np.array(shadow_model_gower_1), np.array(shadow_model_gower_2)] ) - # TODO: ideally remove hard-copied keys - shadow_training_data = ( - fine_tuned_shadow_data_0["fine_tuning_sets"] - + fine_tuned_shadow_data_1["fine_tuning_sets"] - + trained_shadow_data["selected_sets"] - ) + # Process shadow model distances. 
Gower_shadows is a 3D matrix of shape: + # ((total number of shadow models), len(df_input), len(shadow_synthetic)) + sorted_shadow_gower = np.sort(gower_shadows, axis=2) # TODO: check key after we have the official target model target_model_gower = get_rmia_gower( @@ -271,6 +403,7 @@ def calculate_rmia_signals( categorical_column_names=categorical_column_names, id_column_name=id_column_name, ) + log(INFO, "Computed Gower distance for the target model.") # gower_target is a 2D NumPy array that stores the Gower distances between # the input data (df_input) and the synthetic data generated by the target model. @@ -286,9 +419,12 @@ def calculate_rmia_signals( signal_target_k_mean = np.mean(k_nearest_target_distances, axis=1) signal_target_k_1 = gower_target[:, 0] # First element is the minimum - # Process shadow model distances. Gower_shadows is a 3D matrix of shape: - # ((total number of shadow models), len(df_input), len(shadow_synthetic)) - sorted_shadow_gower = np.sort(gower_shadows, axis=2) + # TODO: ideally remove hard-copied keys + shadow_training_data_ids = ( + fine_tuned_shadow_data_0["fine_tuning_sets"] + + fine_tuned_shadow_data_1["fine_tuning_sets"] + + trained_shadow_data["selected_sets"] + ) # For k nearest neighbors # Similar to target, we sort the distances in each shadow model's gower matrix, @@ -312,12 +448,11 @@ def calculate_rmia_signals( f"signal_shadow_k_{k}": signal_shadows, } ) - # Create masks for records in/out of training sets. We're creating masks for all the samples in train_df, # as opposed to the original implementation which only creates masks a sample of 200 records. We've also # changed the way the masks are created to improve efficiency. - shadow_training_id_data = [set(id_list) for id_list in shadow_training_data] + shadow_training_id_data = [set(id_list) for id_list in shadow_training_data_ids] mask_in_training = np.array( [results_df[id_column_name].isin(id_set).to_numpy() for id_set in shadow_training_id_data] diff --git a/src/midst_toolkit/attacks/ensemble/rmia/shadow_model_training.py b/src/midst_toolkit/attacks/ensemble/rmia/shadow_model_training.py index 672d2ec8..0f52d61a 100644 --- a/src/midst_toolkit/attacks/ensemble/rmia/shadow_model_training.py +++ b/src/midst_toolkit/attacks/ensemble/rmia/shadow_model_training.py @@ -184,7 +184,7 @@ def train_fine_tuned_shadow_models( INFO, f"Fine-tuned model {model_id} generated {len(train_result.synthetic_data)} synthetic samples.", ) - attack_data["fine_tuned_results"].append(train_result) + attack_data["fine_tuned_results"].append(train_result.synthetic_data) # Pickle dump the results result_path = Path(save_dir / "rmia_shadows.pkl") @@ -296,7 +296,7 @@ def train_shadow_on_half_challenge_data( f"Trained shadow model {model_id} generated {len(train_result.synthetic_data)} synthetic samples.", ) - attack_data["trained_results"].append(train_result) + attack_data["trained_results"].append(train_result.synthetic_data) # Pickle dump the results result_path = Path(save_dir, "rmia_shadows_third_set.pkl") @@ -438,4 +438,5 @@ def train_three_sets_of_shadow_models( INFO, f"Third set of shadow model training completed and saved at: {third_set_result_path}", ) + return first_set_result_path, second_set_result_path, third_set_result_path diff --git a/tests/integration/attacks/ensemble/test_shadow_model_training.py b/tests/integration/attacks/ensemble/test_shadow_model_training.py index 354053c3..4e2e6eb9 100644 --- a/tests/integration/attacks/ensemble/test_shadow_model_training.py +++ 
b/tests/integration/attacks/ensemble/test_shadow_model_training.py @@ -14,7 +14,6 @@ train_shadow_on_half_challenge_data, ) from midst_toolkit.attacks.ensemble.shadow_model_utils import ( - TrainingResult, fine_tune_tabddpm_and_synthesize, save_additional_tabddpm_config, train_tabddpm_and_synthesize, @@ -66,17 +65,10 @@ def test_train_fine_tuned_shadow_models(cfg: DictConfig, tmp_path: Path) -> None assert len(shadow_data["fine_tuning_sets"]) == 2 # n_models assert len(shadow_data["fine_tuned_results"]) == 2 # n_models - for result in shadow_data["fine_tuned_results"]: - assert type(result) is TrainingResult - assert result.synthetic_data is not None - assert result.tables is not None - assert result.models is not None - assert result.configs is not None - assert result.save_dir is not None - assert result.relation_order is not None - assert result.all_group_lengths_probabilities is not None - assert type(result.synthetic_data) is pd.DataFrame - assert len(result.synthetic_data) == 5 + for synthetic_data in shadow_data["fine_tuned_results"]: + assert type(synthetic_data) is pd.DataFrame + assert synthetic_data is not None + assert len(synthetic_data) == 5 # Fine tuning sets should be disjoint assert set(shadow_data["fine_tuning_sets"][0]).isdisjoint(set(shadow_data["fine_tuning_sets"][1])) @@ -112,17 +104,9 @@ def test_train_shadow_on_half_challenge_data(cfg: DictConfig, tmp_path: Path) -> assert len(shadow_data["selected_sets"]) == 2 # n_models assert len(shadow_data["trained_results"]) == 2 # n_models - for result in shadow_data["trained_results"]: - assert type(result) is TrainingResult - assert result.synthetic_data is not None - assert result.tables is not None - assert result.models is not None - assert result.configs is not None - assert result.save_dir is not None - assert result.relation_order is not None - assert result.all_group_lengths_probabilities is not None - assert type(result.synthetic_data) is pd.DataFrame - assert len(result.synthetic_data) == 5 + for synthetic_data in shadow_data["trained_results"]: + assert type(synthetic_data) is pd.DataFrame + assert len(synthetic_data) == 5 # Training sets should be disjoint assert set(shadow_data["selected_sets"][0]).isdisjoint(set(shadow_data["selected_sets"][1])) diff --git a/tests/unit/attacks/ensemble/test_rmia.py b/tests/unit/attacks/ensemble/test_rmia.py index 03ec8373..dfcb5021 100644 --- a/tests/unit/attacks/ensemble/test_rmia.py +++ b/tests/unit/attacks/ensemble/test_rmia.py @@ -1,4 +1,3 @@ -from collections import namedtuple from typing import Any import numpy as np @@ -15,9 +14,6 @@ ) -MockTrainingResult = namedtuple("TrainingResult", ["synthetic_data"]) - - @pytest.fixture def base_data() -> dict[str, Any]: """Provides base data for testing.""" @@ -48,11 +44,11 @@ def base_data() -> dict[str, Any]: model_data = { "trained_results": [ - MockTrainingResult(synthetic_data=df_syn1), - MockTrainingResult(synthetic_data=df_syn2), + df_syn1, + df_syn2, ], "fine_tuned_results": [ - MockTrainingResult(synthetic_data=df_syn1), + df_syn1, ], } @@ -81,19 +77,19 @@ def rmia_signal_data() -> dict[str, Any]: shadow_data_collection = [ { "fine_tuning_sets": [train_set_0["id"].tolist()], - "fine_tuned_results": [MockTrainingResult(syn_data_5.copy())], + "fine_tuned_results": [syn_data_5.copy()], }, { "fine_tuning_sets": [train_set_1["id"].tolist()], - "fine_tuned_results": [MockTrainingResult(syn_data_5.copy())], + "fine_tuned_results": [syn_data_5.copy()], }, { "selected_sets": [train_set_2["id"].tolist()], - "trained_results": 
[MockTrainingResult(syn_data_5.copy())], + "trained_results": [syn_data_5.copy()], }, ] - target_synthetic_data = MockTrainingResult(syn_data_5.copy()).synthetic_data + target_synthetic_data = syn_data_5.copy() return { "df_input": df_input, @@ -152,9 +148,7 @@ def test_get_rmia_gower_basic_run(self, base_data, mocker): ) min_length = 3 - shadow_synthetic_list = [ - train_result.synthetic_data for train_result in base_data["model_data"][Key.TRAINED_RESULTS.value] - ] + shadow_synthetic_list = list(base_data["model_data"][Key.TRAINED_RESULTS.value]) results = get_rmia_gower( df_input=base_data["df_input"], model_data=shadow_synthetic_list, @@ -162,25 +156,26 @@ def test_get_rmia_gower_basic_run(self, base_data, mocker): categorical_column_names=base_data["categorical_column_names"], id_column_name=base_data["id_column_name"], random_seed=base_data["random_seed"], + use_multiprocessing=False, # Disable multiprocessing to ensure mock is called as expected in the main process. ) assert len(results) == 2 - npt.assert_array_equal(results[0], np.array([[0.1, 0.2], [0.3, 0.4], [0.5, 0.6]])) + npt.assert_array_equal(results[0], np.array([[0.1, 0.2], [0.3, 0.4], [0.5, 0.6]], dtype=np.float32)) npt.assert_array_equal( results[1], - np.array([[0.7, 0.8, 0.9], [0.1, 0.2, 0.3], [0.4, 0.5, 0.6]]), + np.array([[0.7, 0.8, 0.9], [0.1, 0.2, 0.3], [0.4, 0.5, 0.6]], dtype=np.float32), ) assert mock_gower_matrix.call_count == 2 call_args_1 = mock_gower_matrix.call_args_list[0].kwargs pdt.assert_frame_equal(call_args_1["data_x"], base_data["df_input"], check_dtype=False) - syn_data_1_dropped = base_data["model_data"]["trained_results"][0].synthetic_data.drop(columns=["id"]) + syn_data_1_dropped = base_data["model_data"]["trained_results"][0].drop(columns=["id"]) pdt.assert_frame_equal(call_args_1["data_y"], syn_data_1_dropped, check_dtype=False) assert call_args_1["cat_features"] == [False, True, False] call_args_2 = mock_gower_matrix.call_args_list[1].kwargs - syn_data_2_dropped = base_data["model_data"]["trained_results"][1].synthetic_data.drop(columns=["id"]) + syn_data_2_dropped = base_data["model_data"]["trained_results"][1].drop(columns=["id"]) pdt.assert_frame_equal(call_args_2["data_y"], syn_data_2_dropped, check_dtype=False) def test_get_rmia_gower_with_sampling(self, base_data, mocker): @@ -190,12 +185,12 @@ def test_get_rmia_gower_with_sampling(self, base_data, mocker): return_value=np.array([[0.1], [0.2], [0.3]]), ) - original_syn_data = base_data["model_data"]["trained_results"][1].synthetic_data + original_syn_data = base_data["model_data"]["trained_results"][1] mock_sample = mocker.patch("pandas.DataFrame.sample", wraps=original_syn_data.sample) min_length = 2 - synthetic_data_list = [data.synthetic_data for data in base_data["model_data"][Key.TRAINED_RESULTS.value]] + synthetic_data_list = list(base_data["model_data"][Key.TRAINED_RESULTS.value]) get_rmia_gower( df_input=base_data["df_input"], model_data=synthetic_data_list, @@ -203,6 +198,7 @@ def test_get_rmia_gower_with_sampling(self, base_data, mocker): categorical_column_names=base_data["categorical_column_names"], id_column_name=base_data["id_column_name"], random_seed=base_data["random_seed"], + use_multiprocessing=False, # Disable multiprocessing to ensure mock is used in the main process ) assert mock_gower_matrix.call_count == 2 @@ -212,7 +208,12 @@ def test_get_rmia_gower_with_sampling(self, base_data, mocker): expected_sampled_data = original_syn_data.sample(n=min_length, random_state=base_data["random_seed"]).drop( 
columns=[base_data["id_column_name"]] ) - pdt.assert_frame_equal(call_args_2["data_y"], expected_sampled_data, check_dtype=False) + pdt.assert_frame_equal( + call_args_2["data_y"], + expected_sampled_data, + check_dtype=False, + obj=f"mistake in call args: {call_args_2['data_y'].columns} and {expected_sampled_data.columns}", + ) def test_get_rmia_gower_missing_categorical_column(self, base_data, mocker, caplog): """Tests that a warning is logged for missing categorical columns.""" @@ -223,9 +224,7 @@ def test_get_rmia_gower_missing_categorical_column(self, base_data, mocker, capl missing_cat_cols = ["city", "non_existent_column"] with caplog.at_level("INFO"): - synthetic_data_list = [ - data.synthetic_data for data in base_data["model_data"][Key.FINE_TUNED_RESULTS.value] - ] + synthetic_data_list = list(base_data["model_data"][Key.FINE_TUNED_RESULTS.value]) get_rmia_gower( df_input=base_data["df_input"], model_data=synthetic_data_list,
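(Editor's closing note: the tests above pass `use_multiprocessing=False` because `mocker.patch` only replaces `gower.gower_matrix` in the current process; `Pool` workers would call the real implementation and the call-count assertions would fail. Outside of tests, the sequential and parallel paths should agree, since `get_rmia_gower` re-orders worker results by model index before returning. A minimal sanity-check sketch with toy data; the `__main__` guard is required on spawn-based platforms.)

```python
import numpy as np
import pandas as pd

from midst_toolkit.attacks.ensemble.rmia.rmia_calculation import get_rmia_gower

if __name__ == "__main__":
    df_input = pd.DataFrame({"amount": [1.0, 2.0, 3.0], "city": ["a", "b", "a"]})
    synthetic = [
        pd.DataFrame({"id": [1, 2], "amount": [1.5, 2.5], "city": ["a", "b"]}),
        pd.DataFrame({"id": [3, 4], "amount": [0.5, 3.5], "city": ["b", "a"]}),
    ]
    common = dict(
        min_length=2,
        categorical_column_names=["city"],
        id_column_name="id",
        random_seed=42,
    )
    sequential = get_rmia_gower(df_input, synthetic, use_multiprocessing=False, **common)
    parallel = get_rmia_gower(df_input, synthetic, use_multiprocessing=True, n_jobs=2, **common)
    for seq_matrix, par_matrix in zip(sequential, parallel):
        np.testing.assert_allclose(seq_matrix, par_matrix)
```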