diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json index 05b94fa..975b9d8 100644 --- a/.devcontainer/devcontainer.json +++ b/.devcontainer/devcontainer.json @@ -7,6 +7,34 @@ "dockerfile": "Dockerfile" }, "features": { - "ghcr.io/rocker-org/devcontainer-features/miniforge:2": {} - } + "ghcr.io/devcontainers/features/anaconda:1": { + "version": "latest" + }, + "ghcr.io/devcontainers/features/nvidia-cuda:2": { + "installCudnn": true, + "installCudnnDev": true, + "installNvtx": true, + "installToolkit": true, + "cudaVersion": "11.8", + "cudnnVersion": "automatic" + }, + "ghcr.io/raucha/devcontainer-features/pytorch:1": {} + }, + "runArgs": [ + "--gpus=all" + ] + // Features to add to the dev container. More info: https://containers.dev/features. + // "features": {}, + + // Use 'forwardPorts' to make a list of ports inside the container available locally. + // "forwardPorts": [], + + // Use 'postCreateCommand' to run commands after the container is created. + // "postCreateCommand": "python --version", + + // Configure tool-specific properties. + // "customizations": {}, + + // Uncomment to connect as root instead. More info: https://aka.ms/dev-containers-non-root. + // "remoteUser": "root" } diff --git a/Adversarial_Observation/BirdParticle.py b/Adversarial_Observation/BirdParticle.py index c6d07ca..a9ebcce 100755 --- a/Adversarial_Observation/BirdParticle.py +++ b/Adversarial_Observation/BirdParticle.py @@ -9,10 +9,10 @@ class BirdParticle: fitness evaluation, and the updates to its velocity and position based on the PSO algorithm. """ - def __init__(self, model: tf.keras.Model, input_data: tf.Tensor, target_class: int, epsilon: float, + def __init__(self, model: tf.keras.Model, input_data: tf.Tensor, target_class: int, num_iterations: int = 20, velocity: tf.Tensor = None, inertia_weight: float = 0.5, - cognitive_weight: float = 1.0, social_weight: float = 1.0, momentum: float = 0.9, - velocity_clamp: float = 0.1): + cognitive_weight: float = 1.0, social_weight: float = 1.0, + momentum: float = 0.9, clip_value_position: float = 1.0): """ Initialize a particle in the PSO algorithm. @@ -20,30 +20,27 @@ def __init__(self, model: tf.keras.Model, input_data: tf.Tensor, target_class: i model (tf.keras.Model): The model to attack. input_data (tf.Tensor): The input data (image) to attack. target_class (int): The target class for misclassification. - epsilon (float): The perturbation bound (maximum amount the image can be altered). velocity (tf.Tensor, optional): The initial velocity for the particle's movement. Defaults to zero velocity if not provided. inertia_weight (float): The inertia weight for the velocity update. Default is 0.5. cognitive_weight (float): The cognitive weight for the velocity update. Default is 1.0. social_weight (float): The social weight for the velocity update. Default is 1.0. momentum (float): The momentum for the velocity update. Default is 0.9. - velocity_clamp (float): The velocity clamp for limiting the maximum velocity. Default is 0.1. """ self.model = model + self.num_iterations = num_iterations self.original_data = tf.identity(input_data) # Clone the input data self.target_class = target_class - self.epsilon = epsilon self.best_position = tf.identity(input_data) # Clone the input data self.best_score = -np.inf self.position = tf.identity(input_data) # Clone the input data self.velocity = velocity if velocity is not None else tf.zeros_like(input_data) self.history = [self.position] - + self.clip_value_position = clip_value_position # Class attributes self.inertia_weight = inertia_weight self.cognitive_weight = cognitive_weight self.social_weight = social_weight self.momentum = momentum - self.velocity_clamp = velocity_clamp def fitness(self) -> float: """ @@ -71,29 +68,29 @@ def update_velocity(self, global_best_position: tf.Tensor) -> None: cognitive = self.cognitive_weight * tf.random.uniform(self.position.shape) * (self.best_position - self.position) social = self.social_weight * tf.random.uniform(self.position.shape) * (global_best_position - self.position) - self.velocity = inertia + cognitive + social # Update velocity based on PSO formula - - # Apply momentum and velocity clamping - self.velocity = self.velocity * self.momentum # Apply momentum - self.velocity = tf.clip_by_value(self.velocity, -self.velocity_clamp, self.velocity_clamp) # Apply velocity clamp + # Apply momentum to velocity update: + self.velocity = self.momentum * self.velocity + inertia + cognitive + social # Apply momentum def update_position(self) -> None: """ Update the position of the particle based on the updated velocity. - Ensures that the position stays within the valid input range [0, 1] (normalized pixel values). + The position is updated directly without any bounds checking. """ - self.position = tf.clip_by_value(self.position + self.velocity, 0, 1) # Ensure position stays within bounds + self.position = self.position + self.velocity # Update position directly without clipping + self.position = tf.clip_by_value(self.position + self.velocity, 0, 1) # Ensure position stays within bounds self.history.append(tf.identity(self.position)) # Store the position history + self.position = tf.clip_by_value(self.position, -self.clip_value_position, self.clip_value_position) def evaluate(self) -> None: """ Evaluate the fitness of the current particle and update its personal best. The fitness score is calculated using the target class probability. If the current fitness score - is better than the personal best, update the personal best position and score. + is better than the personal best, update the best position and score. """ score = self.fitness() # Get the current fitness score based on the perturbation if score > self.best_score: # If score is better than the personal best, update the best position self.best_score = score self.best_position = tf.identity(self.position) # Clone the current position + diff --git a/Adversarial_Observation/Swarm.py b/Adversarial_Observation/Swarm.py index 6ede75b..eca34fe 100755 --- a/Adversarial_Observation/Swarm.py +++ b/Adversarial_Observation/Swarm.py @@ -2,7 +2,7 @@ import logging from typing import List from Adversarial_Observation.BirdParticle import BirdParticle - +from tqdm import tqdm import tensorflow as tf import numpy as np import matplotlib.pyplot as plt @@ -15,10 +15,10 @@ class ParticleSwarm: to misclassify it into the target class. """ - def __init__(self, model: tf.keras.Model, input_set: np.ndarray, target_class: int, - num_iterations: int = 20, epsilon: float = 0.8, save_dir: str = 'results', - inertia_weight: float = 0.5, cognitive_weight: float = .5, social_weight: float = .5, - momentum: float = 0.9, velocity_clamp: float = 0.1): + def __init__(self, model: tf.keras.Model, input_set: np.ndarray, starting_class: int, target_class: int, + num_iterations: int = 20, save_dir: str = 'results', inertia_weight: float = 0.5, + cognitive_weight: float = .5, social_weight: float = .5, momentum: float = 0.9, + clip_value_position: float = 0.2, enable_logging: bool = False, device: str = 'cpu'): """ Initialize the Particle Swarm Optimization (PSO) for adversarial attacks. @@ -27,25 +27,27 @@ def __init__(self, model: tf.keras.Model, input_set: np.ndarray, target_class: i input_set (np.ndarray): The batch of input images to attack as a NumPy array. target_class (int): The target class for misclassification. num_iterations (int): The number of optimization iterations. - epsilon (float): The perturbation bound. save_dir (str): The directory to save output images and logs. inertia_weight (float): The inertia weight for the velocity update. cognitive_weight (float): The cognitive weight for the velocity update. social_weight (float): The social weight for the velocity update. momentum (float): The momentum for the velocity update. - velocity_clamp (float): The velocity clamp to limit the velocity. + clip_value_position (float): The velocity clamp to limit the velocity. + device (str): The device for computation ('cpu' or 'gpu'). Default is 'cpu'. """ self.model = model self.input_set = tf.convert_to_tensor(input_set, dtype=tf.float32) # Convert NumPy array to TensorFlow tensor + self.start_class = starting_class # The starting class index self.target_class = target_class # The target class index self.num_iterations = num_iterations - self.epsilon = epsilon # Perturbation bound self.save_dir = save_dir # Directory to save perturbed images - + self.enable_logging = enable_logging + self.device = device # Device ('cpu' or 'gpu') + self.particles: List[BirdParticle] = [ - BirdParticle(model, self.input_set[i:i + 1], target_class, epsilon, - inertia_weight=inertia_weight, cognitive_weight=cognitive_weight, - social_weight=social_weight, momentum=momentum, velocity_clamp=velocity_clamp) + BirdParticle(model, self.input_set[i:i + 1], target_class, + inertia_weight=inertia_weight, cognitive_weight=cognitive_weight, social_weight=social_weight, + momentum=momentum, clip_value_position=clip_value_position) for i in range(len(input_set)) ] @@ -53,18 +55,20 @@ def __init__(self, model: tf.keras.Model, input_set: np.ndarray, target_class: i self.global_best_score = -float('inf') # Initialize with a very low score self.fitness_history: List[float] = [] # History of fitness scores to track progress - self.setup_logging() # Set up logging - self.log_progress(-1) # Log initial state (before optimization) + + # Make output folder + iteration_dir = self.save_dir + os.makedirs(iteration_dir, exist_ok=True) + if self.enable_logging: + self.setup_logging() + self.log_progress(-1) def setup_logging(self): """ Set up logging for each iteration. Each iteration will have a separate log file. Also prints logs to the terminal. """ - iteration_dir = self.save_dir - os.makedirs(iteration_dir, exist_ok=True) - - log_file = os.path.join(iteration_dir, f'iteration_log.log') + log_file = os.path.join(self.save_dir, f'iteration_log.log') self.logger = logging.getLogger() # Create a file handler to save logs to a file @@ -86,7 +90,6 @@ def setup_logging(self): self.logger.info(f"Model: {self.model.__class__.__name__}") self.logger.info(f"Target Class: {self.target_class} (This is the class we want to misclassify the image into)") self.logger.info(f"Number of Iterations: {self.num_iterations} (Optimization steps)") - self.logger.info(f"Epsilon (perturbation bound): {self.epsilon} (Maximum perturbation allowed)") self.logger.info(f"Save Directory: {self.save_dir}") self.logger.info(f"{'*' * 60}") @@ -97,13 +100,16 @@ def log_progress(self, iteration: int): Args: iteration (int): The current iteration of the optimization process. """ + if not self.enable_logging: + return + # Log the header for the iteration self.logger.info(f"\n{'-'*60}") self.logger.info(f"Iteration {iteration + 1}/{self.num_iterations}") self.logger.info(f"{'='*60}") # Table header - header = f"{'Particle':<10}{'Original Pred':<15}{'Perturbed Pred':<18}{'Orig Target Prob':<20}" \ + header = f"{'Particle':<10}{'Original Pred':<15}{'Perturbed Pred':<18}{'Orig Start Prob':<20}{'Pert Start Prob':<20}{'Orig Target Prob':<20}" \ f"{'Pert Target Prob':<20}{'Personal Best':<20}{'Global Best':<20}" self.logger.info(header) self.logger.info(f"{'-'*60}") @@ -121,51 +127,96 @@ def log_progress(self, iteration: int): # Get softmax probabilities original_probs = tf.nn.softmax(original_output, axis=1) perturbed_probs = tf.nn.softmax(perturbed_output, axis=1) - + + # Get starting class probabilities (how far away we've moved) + original_prob_start = original_probs[0, self.start_class].numpy().item() + perturbed_prob_start = perturbed_probs[0, self.start_class].numpy().item() + # Get target class probabilities original_prob_target = original_probs[0, self.target_class].numpy().item() perturbed_prob_target = perturbed_probs[0, self.target_class].numpy().item() # Log each particle's data in a formatted row - self.logger.info(f"{i+1:<10}{original_pred:<15}{perturbed_pred:<18}{original_prob_target:<20.4f}" - f"{perturbed_prob_target:<20.4f}{particle.best_score:<20.4f}{self.global_best_score:<20.4f}") + self.logger.info(f"{i+1:<10}{original_pred:<15}{perturbed_pred:<18}{original_prob_start:<20.4f}{perturbed_prob_start:<20.4f}" + f"{original_prob_target:<20.4f}{perturbed_prob_target:<20.4f}{particle.best_score:<20.4f}{self.global_best_score:<20.4f}") self.logger.info(f"{'='*60}") - def save_images(self, iteration: int): + def optimize(self): """ - Save the perturbed images for the current iteration. - - Args: - iteration (int): The current iteration number. + Run the Particle Swarm Optimization process to optimize the perturbations. """ - iteration_folder = os.path.join(self.save_dir, f"iteration_{iteration + 1}") - os.makedirs(iteration_folder, exist_ok=True) - - for i, particle in enumerate(self.particles): - # Convert TensorFlow tensor to NumPy array - position_numpy = particle.position.numpy() - # Remove extra batch dimension (if it exists) - position_numpy = np.squeeze(position_numpy) # Now shape is (28, 28) - plt.imsave(os.path.join(iteration_folder, f"perturbed_image_{i + 1}.png"), position_numpy, cmap="gray", vmin=0, vmax=1) + with tf.device(f"/{self.device}:0"): # Use the GPU/CPU based on the flag + for iteration in tqdm(range(self.num_iterations), desc="Running Swarm"): + # Update particles and velocities, evaluate them, and track global best + for particle in self.particles: + particle.evaluate() + particle.update_velocity(self.global_best_position) # No need to pass inertia_weight explicitly + particle.update_position() + + # Update the global best based on the personal best scores of particles + best_particle = max(self.particles, key=lambda p: p.best_score) + if best_particle.best_score > self.global_best_score: + self.global_best_score = best_particle.best_score + self.global_best_position = tf.identity(best_particle.best_position) + + self.log_progress(iteration) - def optimize(self): + def reduce_excess_perturbations(self, original_img: np.ndarray, target_label: int, model_shape: tuple = (1, 28, 28, 1)) -> np.ndarray: """ - Run the Particle Swarm Optimization process to optimize the perturbations. + Reduces excess perturbations in adversarial images while ensuring the misclassification remains. + + Args: + original_img (np.ndarray): The original (clean) image. + target_label (int): The label we want the adversarial image to produce. + model_shape (tuple): The expected shape of the model input. + + Returns: + list[np.ndarray]: A list of denoised adversarial images. """ - for iteration in range(self.num_iterations): - - # Update particles and velocities, evaluate them, and track global best - for particle in self.particles: - particle.evaluate() - particle.update_velocity(self.global_best_position) # No need to pass inertia_weight explicitly - particle.update_position() - - # Update the global best based on the personal best scores of particles - best_particle = max(self.particles, key=lambda p: p.best_score) - if best_particle.best_score > self.global_best_score: - self.global_best_score = best_particle.best_score - self.global_best_position = tf.identity(best_particle.best_position) - - self.save_images(iteration) - self.log_progress(iteration) + denoised_adv = [] + total_pixels = np.prod(original_img.shape) # Total pixels in the image + + for adv_particle in tqdm(self.particles, desc="Processing Particles", unit="particle"): + adv_img = np.copy(adv_particle.position).reshape(original_img.shape) # Copy to avoid modifying the original + + if original_img.shape != adv_img.shape: + raise ValueError("original_img and adv_img must have the same shape.") + + # Iterate over every pixel coordinate in the image with tqdm progress bar + with tqdm(total=total_pixels, desc="Processing Pixels", unit="pixel", leave=False) as pbar: + for idx in np.ndindex(original_img.shape): + if original_img[idx] == adv_img[idx]: # Ignore unchanged pixels + pbar.update(1) + continue + + # Store old adversarial value + old_val = adv_img[idx] + + # Try restoring the pixel to original + adv_img[idx] = original_img[idx] + + # Check if the label is still the target label + output = self.model(adv_img.reshape(model_shape)) + softmax_output = tf.nn.softmax(tf.squeeze(output), axis=0).numpy() + current_label = np.argmax(softmax_output) + + if current_label != target_label: + # If misclassification is lost, try halfway adjustment + adv_img[idx] = old_val + adv_img[idx] += (original_img[idx] - adv_img[idx]) * 0.5 + + # Recheck if the label is still the target + output = self.model(adv_img.reshape(model_shape)) + softmax_output = tf.nn.softmax(tf.squeeze(output), axis=0).numpy() + current_label = np.argmax(softmax_output) + + if current_label != target_label: + # If misclassification is still lost, revert back + adv_img[idx] = old_val + + pbar.update(1) # Update pixel progress + + denoised_adv.append(adv_img) + + return denoised_adv diff --git a/manuscripts/Posion25/.gitignore b/manuscripts/Posion25/.gitignore new file mode 100644 index 0000000..bcce73b --- /dev/null +++ b/manuscripts/Posion25/.gitignore @@ -0,0 +1,6 @@ +AudioMNIST* +results/ +*.pyc +__pycache__/ +*.pkl +*.keras \ No newline at end of file diff --git a/manuscripts/Posion25/1_trainModel.py b/manuscripts/Posion25/1_trainModel.py new file mode 100644 index 0000000..955dd8e --- /dev/null +++ b/manuscripts/Posion25/1_trainModel.py @@ -0,0 +1,28 @@ +import argparse +import os +from train import load_data, train_model, evaluate_model, train_model_and_save + + +def main(): + # Command-line arguments + parser = argparse.ArgumentParser() + + # Data and model type arguments + parser.add_argument('--data', type=str, choices=['MNIST', 'MNIST_Audio'], required=True, help='Dataset to use') + parser.add_argument('--model_type', type=str, choices=['normal', 'complex', 'complex_augmented'], required=True, help='Model type to use') + + # Training information arguments + parser.add_argument('--batch_size', type=int, default=1024, help='Batch size for training') + parser.add_argument('--epochs', type=int, default=5, help='Number of epochs for training') + + # Folder saving argument + parser.add_argument('--save_dir', type=str, default='results', help='Directory to save model and results') + + # Parse arguments + args = parser.parse_args() + + # Train the model and evaluate it + train_model_and_save(args) + +if __name__ == '__main__': + main() diff --git a/manuscripts/Posion25/2_attackModel.py b/manuscripts/Posion25/2_attackModel.py new file mode 100644 index 0000000..d720a48 --- /dev/null +++ b/manuscripts/Posion25/2_attackModel.py @@ -0,0 +1,102 @@ +import argparse +import os +import pickle +from taint import adversarial_attack_blackbox +from analysis import * +from train import train_model_and_save +import torch +import tensorflow as tf + +def attack_model(args, model, test_ds, save_dir, num_data=10): + # Get the labels by iterating through a batch from the test_ds + first_batch = next(iter(test_ds)) # Get the first batch + images, labels = first_batch # Unpack the images and labels from the first batch + + # Check if labels are a TensorFlow tensor or PyTorch tensor + if isinstance(labels, tf.Tensor): + # If using TensorFlow, convert labels to class indices (from one-hot encoded) + labels = tf.argmax(labels, axis=1).numpy() # Get class indices from one-hot encoded labels + elif isinstance(labels, torch.Tensor): + # If using PyTorch, convert labels to class indices (from one-hot encoded) + labels = torch.argmax(labels, dim=1).cpu().numpy() # Get class indices from one-hot encoded labels + + # Convert labels to a set of unique outputs + unique_outputs = set(labels) # Convert to a Python set for unique labels + + # Continue with the rest of the attack logic + for output in unique_outputs: + instances = [i for i, label in enumerate(labels) if label == output][:num_data] # Select `num_data` instances with the current output label + + for image_index in instances: + # Create a subdirectory for each image_index and its original output label + sub_dir = os.path.join(save_dir, f'image_{image_index}_label_{output}') + + # Ensure the directory exists + os.makedirs(sub_dir, exist_ok=True) + + # Correct dynamic pickle filename to include the original and target class + pickle_filename = f'attacker_{image_index}_{output}.pkl' + pickle_path = os.path.join(sub_dir, pickle_filename) + + # Check if the attacker pickle already exists for this image_index and output + if os.path.exists(pickle_path): + with open(pickle_path, 'rb') as f: + attacker = pickle.load(f) + print(f"Loaded attacker for image {image_index} with label {output} from {pickle_path}") + else: + print(f"Running adversarial attack for image {image_index} with label {output}...") + + # For the current `output`, target all other classes + for target_output in unique_outputs: + if target_output != output: # We want to target all other outputs + for _ in range(num_data): # Attack the target output `num_data` times + target_sub_dir = os.path.join(sub_dir, f'target_{target_output}') + os.makedirs(target_sub_dir, exist_ok=True) # Create a subdir for each target class + + # Correct dynamic pickle filename to include the original and target class + target_pickle_filename = f'attacker_{image_index}_{output}_to_{target_output}.pkl' + target_pickle_path = os.path.join(target_sub_dir, target_pickle_filename) + + # Perform the adversarial attack targeting `target_output` + attacker = adversarial_attack_blackbox( + model=model, + dataset=test_ds, + image_index=image_index, + output_dir=target_sub_dir, + num_iterations=args.iterations, + num_particles=args.particles, + target_class=target_output # Specify the target class for the attack + ) + print(f"Adversarial attack completed for image {image_index} targeting class {target_output}") + + # After performing the attack, save the attacker object to a pickle file + with open(target_pickle_path, 'wb') as f: + pickle.dump(attacker, f) + print(f"Saved attacker for image {image_index} with label {output} targeting {target_output} to {target_pickle_path}") + +def main(): + # Command-line arguments + parser = argparse.ArgumentParser() + + # Data and model type arguments (to align with the ones used in the training script) + parser.add_argument('--data', type=str, choices=['MNIST', 'MNIST_Audio'], required=True, help='Dataset to use') + parser.add_argument('--model_type', type=str, choices=['normal', 'complex', 'complex_augmented'], required=True, help='Model type to use') + + # Attack parameters + parser.add_argument('--iterations', type=int, default=10, help='Number of iterations for attack') + parser.add_argument('--particles', type=int, default=100, help='Number of particles for attack') + + # Folder saving argument + parser.add_argument('--save_dir', type=str, default='results', help='Directory to save model and results') + + # Parse arguments + args = parser.parse_args() + + # First, train the model and get the necessary details for attack + model, test_ds, save_dir, model_path = train_model_and_save(args) + + # Perform the adversarial attack + attack_model(args, model, test_ds, save_dir) + +if __name__ == '__main__': + main() diff --git a/manuscripts/Posion25/3_stats.py b/manuscripts/Posion25/3_stats.py new file mode 100644 index 0000000..e45660f --- /dev/null +++ b/manuscripts/Posion25/3_stats.py @@ -0,0 +1,146 @@ +import argparse +import os +import numpy as np +import pickle +import tensorflow as tf +import json +from tqdm import tqdm +from taint import adversarial_attack_blackbox +from train import train_model_and_save +from analysis import get_softmax_stats, save_softmax_stats + + +def collect_statistics(model, dataset, model_type, attack_iterations=10, attack_particles=100, image_index=0, output_dir='results'): + """ + Run adversarial attack for the given model and dataset combination and collect statistics. + + Args: + - model: The model to attack. + - dataset: The test dataset. + - model_type: The model type (for logging). + - attack_iterations: Number of iterations for attack. + - attack_particles: Number of particles for attack. + - image_index: Index of the image to perform the attack on. + - output_dir: Directory to save results. + + Returns: + - statistics: A dictionary with softmax output, attack success, and other relevant data. + """ + pickle_path = os.path.join(output_dir, f"{model_type}_attacker.pkl") + + dataset_list = list(dataset.as_numpy_iterator()) + all_images, all_labels = zip(*dataset_list) + all_images = np.concatenate(all_images, axis=0) + all_labels = np.concatenate(all_labels, axis=0) + + if image_index < 0 or image_index >= len(all_images): + raise ValueError(f"Image index {image_index} out of range") + + single_input = all_images[image_index] + single_target = np.argmax(all_labels[image_index]) + target_class = (single_target + 1) % 10 # Attack a different class + + # Load the attacker if pickle exists + if os.path.exists(pickle_path): + with open(pickle_path, 'rb') as f: + attacker = pickle.load(f) + print(f"Loaded attacker from {pickle_path}") + else: + # If attacker doesn't exist, run the attack and save it + adversarial_attack_blackbox( + model, dataset, image_index=image_index, output_dir=output_dir, + num_iterations=attack_iterations, num_particles=attack_particles + ) + with open(pickle_path, 'wb') as f: + pickle.dump(attacker, f) + print(f"Saved attacker to {pickle_path}") + + # Analyze the attack results + softmax_output, max_val, max_class = get_softmax_stats(model, single_input) + attack_success = max_class != target_class # Attack success is when max_class differs from target class + + stats = { + "model_type": model_type, + "target_class": target_class, + "attack_success": attack_success, + "softmax_output": softmax_output.tolist(), + "max_confidence": max_val, + "max_class": max_class, + } + + # Save softmax statistics for this model and image + save_softmax_stats(os.path.join(output_dir, f"{model_type}_softmax_stats.tsv"), softmax_output, max_class, max_val, target_class) + + return stats + + +def get_model_types_for_dataset(dataset): + """ + Dynamically search for model types for a given dataset. + + Args: + - dataset: The dataset name, e.g., 'MNIST' or 'AudioMNIST'. + + Returns: + - model_types: List of available model types for the dataset. + """ + model_types = ['normal', 'complex', 'complex_augmented'] # Can be extended if needed + + return model_types + + +def run_statistics(args): + # Define output directory to save results + results_dir = os.path.join(args.save_dir, f"{args.data}_stats") + os.makedirs(results_dir, exist_ok=True) + + # Store statistics for all combinations of dataset and model types + all_stats = [] + + # Get all possible model types for the dataset + model_types = get_model_types_for_dataset(args.data) + + # Iterate through all pairs of model types + for model_type in model_types: + print(f"Training model: {model_type}...") + model, test_ds, _, model_path = train_model_and_save(args) # Train the model for the current type + + # Run adversarial attack and collect stats for each combination of different model types + for other_model_type in model_types: + if model_type != other_model_type: # Skip 1-1 combinations + print(f"Attacking {other_model_type} model with {model_type} dataset...") + stats = collect_statistics(model, test_ds, other_model_type, attack_iterations=args.iterations, attack_particles=args.particles, output_dir=results_dir) + all_stats.append(stats) + + # Save the collected statistics as a JSON file for later analysis + stats_file = os.path.join(results_dir, f"{args.data}_attack_stats.json") + with open(stats_file, 'w') as f: + json.dump(all_stats, f, indent=4) + + print(f"Statistics saved to {stats_file}") + + +def main(): + # Command-line arguments + parser = argparse.ArgumentParser() + + # Data and model type arguments + parser.add_argument('--data', type=str, choices=['MNIST', 'MNIST_Audio'], required=True, help='Dataset to use') + parser.add_argument('--model_type', type=str, choices=['normal', 'complex', 'complex_augmented'], required=True, help='Model type to use') + + # Attack parameters + parser.add_argument('--iterations', type=int, default=10, help='Number of iterations for attack') + parser.add_argument('--particles', type=int, default=100, help='Number of particles for attack') + + # Folder saving argument + parser.add_argument('--save_dir', type=str, default='results', help='Directory to save model and results') + + # Parse arguments + args = parser.parse_args() + + # Run the statistics collection + run_statistics(args) + + +if __name__ == '__main__': + main() diff --git a/manuscripts/Posion25/__init__.py b/manuscripts/Posion25/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/manuscripts/Posion25/analysis.py b/manuscripts/Posion25/analysis.py new file mode 100644 index 0000000..8178963 --- /dev/null +++ b/manuscripts/Posion25/analysis.py @@ -0,0 +1,126 @@ +import numpy as np +import json +from tqdm import tqdm +import os +import tensorflow as tf +import matplotlib.pyplot as plt +from scipy.io import wavfile + + +def ensure_dir(directory): + os.makedirs(directory, exist_ok=True) + + +def save_ndarray_visualization(path, array, mode="auto", sample_rate=16000, **kwargs): + """ + Try to save an ndarray visualization depending on its shape. + For 2D/3D (image-like), use imsave. + For 1D or other, use line plot or imshow fallback. + Additionally, if the data looks like audio waveform (1D), + save it as a WAV audio file (AudioMNIST style). + + Params: + path: base path (extension used for image, wav saved alongside) + array: ndarray data + mode: "auto" or "image" (forces imsave) + sample_rate: used for wav saving (default 16kHz) + kwargs: extra params for imsave + """ + array = np.squeeze(array) + + try: + if mode == "image" or (mode == "auto" and array.ndim in [2, 3]): + # Image-like data: save as image + plt.imsave(path, array, **kwargs) + elif array.ndim == 1: + # 1D data - audio waveform? + # Save waveform plot + plt.figure() + plt.plot(array) + plt.title(os.path.basename(path)) + plt.savefig(path) + plt.close() + + # Also save as wav audio file (normalize to int16) + wav_path = os.path.splitext(path)[0] + ".wav" + audio = array + # Normalize audio to int16 range + max_abs = np.max(np.abs(audio)) + if max_abs > 0: + audio_norm = audio / max_abs + else: + audio_norm = audio + audio_int16 = (audio_norm * 32767).astype(np.int16) + wavfile.write(wav_path, sample_rate, audio_int16) + else: + # fallback for other dims: show as imshow + plt.figure() + plt.imshow(array, aspect='auto') + plt.title(os.path.basename(path)) + plt.savefig(path) + plt.close() + except Exception as e: + print(f"Failed to save visual for {path}: {e}") + + +def save_array_csv(path, array): + np.savetxt(path, [array.flatten()], delimiter=',') + + +def get_softmax_stats(model, x): + """ + Computes softmax stats for a single input (no batch). + Automatically reshapes to expected 4D (for CNNs) or leaves as-is. + """ + x = np.array(x) + x = np.squeeze(x) + + if x.ndim == 3: # likely [H, W, C] + x = x[np.newaxis, ...] # Add batch dim → [1, H, W, C] + elif x.ndim == 2: # likely [H, W] + x = x[np.newaxis, ..., np.newaxis] # Add batch and channel → [1, H, W, 1] + elif x.ndim == 1: + x = x[np.newaxis, :] # Just batch → [1, features] + elif x.ndim == 4: + pass # already [batch, H, W, C] + else: + raise ValueError(f"Unsupported input shape: {x.shape}") + + x_tensor = tf.convert_to_tensor(x, dtype=tf.float32) + logits = model(x_tensor) + softmax_output = tf.nn.softmax(tf.squeeze(logits)).numpy() + max_val = float(np.max(softmax_output)) + max_class = int(np.argmax(softmax_output)) + return softmax_output, max_val, max_class + + +def predict_class(model, x): + """ + Predicts class index. Same auto-shape handling as get_softmax_stats. + """ + x = np.array(x) + x = np.squeeze(x) + + if x.ndim == 3: + x = x[np.newaxis, ...] + elif x.ndim == 2: + x = x[np.newaxis, ..., np.newaxis] + elif x.ndim == 1: + x = x[np.newaxis, :] + elif x.ndim == 4: + pass + else: + raise ValueError(f"Unsupported input shape: {x.shape}") + + x_tensor = tf.convert_to_tensor(x, dtype=tf.float32) + logits = model(x_tensor) + return tf.argmax(logits[0]).numpy() + + +def save_softmax_stats(path, softmax_output, max_class, max_val, target): + with open(path, 'w') as f: + f.write("Class\tConfidence\n") + for i, val in enumerate(softmax_output): + f.write(f"{i}\t{val}\n") + f.write(f"\nBest Class\t{max_class}\nMax Confidence\t{max_val}\nTarget Class\t{target}\n") + diff --git a/manuscripts/Posion25/models.py b/manuscripts/Posion25/models.py new file mode 100644 index 0000000..40584b2 --- /dev/null +++ b/manuscripts/Posion25/models.py @@ -0,0 +1,137 @@ + +def load_MNIST_model(model_path=None): + from tensorflow.keras.models import Sequential + from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout, BatchNormalization + from tensorflow.keras.optimizers import Adam + + model = Sequential([ + Conv2D(32, (3, 3), padding='same', activation='relu', input_shape=(28, 28, 1)), + MaxPooling2D((2, 2)), + Conv2D(64, (3, 3), padding='same', activation='relu'), + MaxPooling2D((2, 2)), + Flatten(), + Dense(128, activation='relu'), + Dropout(0.5), + Dense(10, activation='softmax') + ]) + + model.compile(optimizer=Adam(), loss='categorical_crossentropy', metrics=['accuracy']) + return model + +def load_complex_MNIST_model(model_path=None): + from tensorflow.keras.models import Sequential + from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout, BatchNormalization, GlobalAveragePooling2D + from tensorflow.keras.optimizers import Adam + + model = Sequential([ + # First block: Convolution + Pooling + Conv2D(32, (3, 3), padding='same', activation='relu', input_shape=(28, 28, 1)), + BatchNormalization(), + MaxPooling2D((2, 2)), + + # Second block: Deeper convolution + Pooling + Conv2D(64, (3, 3), padding='same', activation='relu'), + BatchNormalization(), + MaxPooling2D((2, 2)), + + # Third block: Deeper convolution + Pooling + Conv2D(128, (3, 3), padding='same', activation='relu'), + BatchNormalization(), + MaxPooling2D((2, 2)), + + # Fourth block: Additional convolutions to add complexity + Conv2D(256, (3, 3), padding='same', activation='relu'), + BatchNormalization(), + MaxPooling2D((2, 2)), + + # Global Average Pooling instead of Flatten for better generalization + GlobalAveragePooling2D(), + + # Fully connected layers + Dense(512, activation='relu'), + Dropout(0.5), + + Dense(256, activation='relu'), + Dropout(0.4), + + Dense(128, activation='relu'), + Dropout(0.3), + + # Output layer + Dense(10, activation='softmax') + ]) + + # Compile the model with Adam optimizer and categorical crossentropy loss + model.compile(optimizer=Adam(), loss='categorical_crossentropy', metrics=['accuracy']) + + return model + + +def load_AudioMNIST_model(model_path=None): + from tensorflow.keras.models import Sequential + from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout, BatchNormalization + from tensorflow.keras.optimizers import Adam + from tensorflow.keras.layers import Conv1D, MaxPooling1D + + + model = Sequential([ + Conv1D(32, kernel_size=5, activation='relu', input_shape=(16000, 1)), + MaxPooling1D(pool_size=2), + Conv1D(64, kernel_size=5, activation='relu'), + MaxPooling1D(pool_size=2), + Flatten(), + Dense(128, activation='relu'), + Dropout(0.3), + Dense(10, activation='softmax') + ]) + + model.compile(optimizer=Adam(), loss='categorical_crossentropy', metrics=['accuracy']) + return model + +def load_complex_AudioMNIST_model(model_path=None): + from tensorflow.keras.models import Sequential + from tensorflow.keras.layers import Conv1D, MaxPooling1D, Flatten, Dense, Dropout, BatchNormalization + from tensorflow.keras.optimizers import Adam + + model = Sequential([ + # First block: Convolution + Pooling + Conv1D(64, kernel_size=5, activation='relu', input_shape=(16000, 1)), + BatchNormalization(), + MaxPooling1D(pool_size=2), + + # Second block: Deeper convolution + Pooling + Conv1D(128, kernel_size=5, activation='relu'), + BatchNormalization(), + MaxPooling1D(pool_size=2), + + # Third block: Even deeper convolution + Pooling + Conv1D(256, kernel_size=5, activation='relu'), + BatchNormalization(), + MaxPooling1D(pool_size=2), + + # Fourth block: Additional convolution for higher complexity + Conv1D(512, kernel_size=5, activation='relu'), + BatchNormalization(), + MaxPooling1D(pool_size=2), + + # Flatten and fully connected layers + Flatten(), + + # Fully connected layer with more units + Dense(1024, activation='relu'), + Dropout(0.5), + + Dense(512, activation='relu'), + Dropout(0.4), + + Dense(256, activation='relu'), + Dropout(0.3), + + # Output layer + Dense(10, activation='softmax') + ]) + + # Compile the model with Adam optimizer and categorical crossentropy loss + model.compile(optimizer=Adam(), loss='categorical_crossentropy', metrics=['accuracy']) + + return model diff --git a/manuscripts/Posion25/runall.sh b/manuscripts/Posion25/runall.sh new file mode 100644 index 0000000..46d2bed --- /dev/null +++ b/manuscripts/Posion25/runall.sh @@ -0,0 +1,26 @@ +#!/bin/bash + +# Define available datasets and model types +datasets=("MNIST") +# "MNIST_Audio") +model_types=("normal" "complex") +# "complex_augmented") +save_dir="results" + +# Iterate over all combinations of datasets and model types +for dataset in "${datasets[@]}"; do + for model_type in "${model_types[@]}"; do + echo "Training and attacking model for dataset: $dataset, model_type: $model_type" + + # Step 1: Train the model + python 1_trainModel.py --data $dataset --model_type $model_type --save_dir $save_dir + + # Step 2: Attack the model + python 2_attackModel.py --data $dataset --model_type $model_type --save_dir $save_dir + + echo "Finished training and attacking for $dataset - $model_type" + done + # python 3_stats.py --data $dataset --model_type $model_type --save_dir $save_dir + # echo "Statistics collected for $dataset - $model_type" +done + diff --git a/manuscripts/Posion25/setup.sh b/manuscripts/Posion25/setup.sh new file mode 100644 index 0000000..63ec0c2 --- /dev/null +++ b/manuscripts/Posion25/setup.sh @@ -0,0 +1,3 @@ +pip install librosa +conda install pytorch torchvision torchaudio pytorch-cuda=12.1 -c pytorch -c nvidia +# git clone https://github.com/soerenab/AudioMNIST.git \ No newline at end of file diff --git a/manuscripts/Posion25/taint.py b/manuscripts/Posion25/taint.py new file mode 100644 index 0000000..1740bac --- /dev/null +++ b/manuscripts/Posion25/taint.py @@ -0,0 +1,212 @@ +import os +import numpy as np +import tensorflow as tf +import pickle +from manuscripts.Posion25.analysis import * +from Adversarial_Observation.Swarm import ParticleSwarm +from analysis import * + +def adversarial_attack_blackbox(model, dataset, image_index, output_dir='results', num_iterations=30, num_particles=100, target_class=None): + + pickle_path = os.path.join(output_dir, 'attacker.pkl') + + dataset_list = list(dataset.as_numpy_iterator()) + all_images, all_labels = zip(*dataset_list) + all_images = np.concatenate(all_images, axis=0) + all_labels = np.concatenate(all_labels, axis=0) + + if image_index < 0 or image_index >= len(all_images): + raise ValueError(f"Image index {image_index} out of range") + + single_input = all_images[image_index] + single_target = np.argmax(all_labels[image_index]) + if target_class is None: + target_class = (single_target + 1) % 10 + + input_set = np.stack([ + single_input + (np.random.uniform(0, 1, single_input.shape) * (np.random.rand(*single_input.shape) < 0.9)) + for _ in range(num_particles) + ]) + + if os.path.exists(pickle_path): + with open(pickle_path, 'rb') as f: + attacker = pickle.load(f) + print(f"Loaded attacker from {pickle_path}") + else: + + attacker = ParticleSwarm( + model=model, input_set=input_set, starting_class=single_target, + target_class=target_class, num_iterations=num_iterations, + save_dir=output_dir, inertia_weight=0.01 + ) + attacker.optimize() + # save the attacker as a pickle + with open(pickle_path, 'wb') as f: + pickle.dump(attacker, f) + print(f"Saved attacker to {pickle_path}") + print("Adversarial attack completed. Analyzing results...") + analyze_attack(attacker, single_input, target_class) + return attacker + +def best_analysis(attacker, original_data, target): + adv = attacker.global_best_position.numpy() + save_dir = attacker.save_dir + ensure_dir(save_dir) + + # save the original data + save_array_csv(os.path.join(save_dir, "original_data.csv"), original_data) + save_ndarray_visualization(os.path.join(save_dir, "original_data.png"), original_data) + save_softmax_stats(os.path.join(save_dir, "original_data_stats.tsv"), + *get_softmax_stats(attacker.model, original_data), target) + + # Save best particle + save_array_csv(os.path.join(save_dir, "best_particle.csv"), adv) + save_ndarray_visualization(os.path.join(save_dir, "best_particle.png"), adv) + save_softmax_stats(os.path.join(save_dir, "best_particle_stats.tsv"), + *get_softmax_stats(attacker.model, adv), target) + + # Save difference + diff = original_data - adv + save_array_csv(os.path.join(save_dir, "attack_vector_best_particle.csv"), diff) + save_ndarray_visualization( + os.path.join(save_dir, "attack_vector_best_particle.png"), + diff, mode="auto", cmap="seismic", vmin=-1, vmax=1 + ) + + # Save stats + softmax_output, max_val, max_class = get_softmax_stats(attacker.model, adv) + save_softmax_stats(os.path.join(save_dir, "best_particle_stats.tsv"), softmax_output, max_class, max_val, target) + +def denoise_analysis(attacker, original_data, denoised_data, target): + save_dir = attacker.save_dir + ensure_dir(save_dir) + + save_array_csv(os.path.join(save_dir, "best_particle-clean.csv"), denoised_data) + save_ndarray_visualization(os.path.join(save_dir, "best_particle-clean.png"), denoised_data) + + diff = original_data - denoised_data + save_array_csv(os.path.join(save_dir, "attack_vector_best_particle-clean.csv"), diff) + save_ndarray_visualization( + os.path.join(save_dir, "attack_vector_best_particle-clean.png"), + diff, mode="auto", cmap="seismic", vmin=-1, vmax=1 + ) + + softmax_output, max_val, max_class = get_softmax_stats(attacker.model, denoised_data) + save_softmax_stats(os.path.join(save_dir, "best_particle-clean_stats.tsv"), softmax_output, max_class, max_val, target) + +def reduce_excess_perturbations(attacker, original_data, adv_data, target_label): + """ + Reduce unnecessary perturbations in adversarial data while maintaining misclassification. + Works for data of any shape. + """ + original_data = np.squeeze(original_data) + adv_data = np.squeeze(adv_data) + + if original_data.shape != adv_data.shape: + raise ValueError("Original and adversarial data must have the same shape after squeezing.") + + adv_data = adv_data.copy() + changed = True + + # Wrap the iteration with tqdm to monitor progress + while changed: + changed = False + indices = list(np.ndindex(original_data.shape)) + for idx in tqdm(indices, desc="Reducing perturbations"): + if np.isclose(original_data[idx], adv_data[idx]): + continue + + original_val = original_data[idx] + current_val = adv_data[idx] + + # Try reverting completely + adv_data[idx] = original_val + pred = predict_class(attacker.model, adv_data) + + if pred != target_label: + # Try partial revert + adv_data[idx] = current_val + 0.5 * (original_val - current_val) + pred = predict_class(attacker.model, adv_data) + if pred != target_label: + adv_data[idx] = current_val + else: + changed = True + else: + changed = True + + return adv_data + +def reduce_excess_perturbations_scale(attacker, original_data, adv_data, target_label, tol=1e-4, max_iter=20): + original_data = np.squeeze(original_data) + adv_data = np.squeeze(adv_data) + + if original_data.shape != adv_data.shape: + raise ValueError("Original and adversarial data must have the same shape after squeezing.") + + perturbation = adv_data - original_data + low, high = 0.0, 1.0 + best_scaled = adv_data + + for _ in range(max_iter): + mid = (low + high) / 2.0 + candidate = original_data + mid * perturbation + pred = predict_class(attacker.model, candidate) + + if pred == target_label: + best_scaled = candidate + high = mid + else: + low = mid + + if abs(high - low) < tol: + break + + return best_scaled + +def full_analysis(attacker, input_data, target): + """ + Save full analysis of all particles' histories and confidences. + """ + analysis = { + "original_misclassification_input": input_data.tolist(), + "original_misclassification_target": int(target), + "particles": [] + } + + for i, particle in tqdm(enumerate(attacker.particles), total=len(attacker.particles), desc="Full Analysis"): + pdata = { + "particle_index": i, + "positions": [], + "confidence_values": [], + "max_output_values": [], + "max_output_classes": [], + "differences_from_original": [] + } + + for pos in tqdm(particle.history, desc=f"Particle {i} history", leave=False): + pos_np = pos.numpy() if isinstance(pos, tf.Tensor) else np.array(pos) + softmax, max_val, max_class = get_softmax_stats(attacker.model, pos_np) + diff = float(np.linalg.norm(pos_np - input_data)) + + pdata["positions"].append(pos_np.tolist()) + pdata["confidence_values"].append(softmax.tolist()) + pdata["max_output_values"].append(max_val) + pdata["max_output_classes"].append(max_class) + pdata["differences_from_original"].append(diff) + + analysis["particles"].append(pdata) + + path = os.path.join(attacker.save_dir, "attack_analysis.json") + with open(path, "w") as f: + json.dump(analysis, f, indent=4) + + print(f"Full analysis saved to {path}") + +def analyze_attack(attacker, original_img, target): + print("Starting analysis of the adversarial attack...") + best_analysis(attacker, original_img, target) + print("Reducing excess perturbations...") + reduced_img = reduce_excess_perturbations(attacker, original_img, attacker.global_best_position.numpy(), target) + denoise_analysis(attacker, original_img, reduced_img, target) + print("Performing full analysis of the attack...") + full_analysis(attacker, original_img, target) diff --git a/manuscripts/Posion25/train.py b/manuscripts/Posion25/train.py new file mode 100644 index 0000000..09c829a --- /dev/null +++ b/manuscripts/Posion25/train.py @@ -0,0 +1,176 @@ +import os +import glob +import time +import tqdm +import librosa +import numpy as np +import tensorflow as tf +from sklearn.model_selection import train_test_split +from sklearn.metrics import roc_auc_score, average_precision_score + +from tensorflow.keras.models import Sequential, load_model +from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, BatchNormalization, Dropout, Conv1D, MaxPooling1D +from tensorflow.keras.optimizers import Adam +from tensorflow.keras.datasets import mnist +from tensorflow.keras.preprocessing.image import ImageDataGenerator +from tensorflow.keras.utils import to_categorical +from models import * +SAMPLING_RATE = 16000 +NUM_CLASSES = 10 +RANDOM_SEED = 42 +BATCH_SIZE = 32 + +def normalize_mnist(x): + return x / 255.0 + +def pad_audio(audio, max_len): + return audio[:max_len] if len(audio) > max_len else np.pad(audio, (0, max_len - len(audio)), 'constant') + +def load_dataset(data_path): + data, labels = [], [] + max_len = 0 + wav_files = glob.glob(os.path.join(data_path, '*', '*.wav')) + + for audio_path in tqdm.tqdm(wav_files, desc="Loading audio files"): + audio, sr = librosa.load(audio_path, sr=SAMPLING_RATE) + data.append(audio) + try: + label = int(audio_path.split('/')[-1][0]) + labels.append(label) + except ValueError: + print(f"Skipping file {audio_path} due to invalid class format") + max_len = max(max_len, len(audio)) + + data = [pad_audio(audio, max_len) for audio in data] + return np.array(data), np.array(labels), max_len + +def prepare_datasets(data, labels, max_len, test_size=0.2): + data = np.array([pad_audio(audio, max_len) for audio in data])[..., np.newaxis] + x_train, x_test, y_train, y_test = train_test_split(data, labels, test_size=test_size, random_state=RANDOM_SEED) + x_train = x_train.astype(np.float32) / np.max(np.abs(x_train)) + x_test = x_test.astype(np.float32) / np.max(np.abs(x_test)) + y_train = to_categorical(y_train, NUM_CLASSES) + y_test = to_categorical(y_test, NUM_CLASSES) + train_ds = tf.data.Dataset.from_tensor_slices((x_train, y_train)).batch(BATCH_SIZE) + test_ds = tf.data.Dataset.from_tensor_slices((x_test, y_test)).batch(BATCH_SIZE) + return train_ds, test_ds, (x_test, y_test) + +def load_audio_mnist_data(data_path): + if not os.path.exists(data_path): + raise FileNotFoundError(f"Data path {data_path} does not exist.") + data, labels, max_len = load_dataset(data_path) + train_ds, test_ds, _ = prepare_datasets(data, labels, max_len) + return train_ds, test_ds, max_len + + +def load_data(batch_size=32, dataset_type="MNIST", use_augmentation=False): + if dataset_type == "MNIST": + (x_train, y_train), (x_test, y_test) = mnist.load_data() + x_train = normalize_mnist(x_train.reshape(-1, 28, 28, 1).astype('float32')) + x_test = normalize_mnist(x_test.reshape(-1, 28, 28, 1).astype('float32')) + y_train = to_categorical(y_train, 10) + y_test = to_categorical(y_test, 10) + + if use_augmentation: + # Data Augmentation with ImageDataGenerator + datagen = ImageDataGenerator( + rotation_range=10, + zoom_range=0.10, + width_shift_range=0.1, + height_shift_range=0.1 + ) + datagen.fit(x_train) + train_datagen = datagen.flow(x_train, y_train, batch_size=batch_size) + train_dataset = tf.data.Dataset.from_generator( + lambda: train_datagen, + output_signature=( + tf.TensorSpec(shape=(batch_size, 28, 28, 1), dtype=tf.float32), + tf.TensorSpec(shape=(batch_size, 10), dtype=tf.float32) + ) + ) + else: + train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train)).batch(batch_size) + + test_dataset = tf.data.Dataset.from_tensor_slices((x_test, y_test)).batch(batch_size) + return train_dataset, test_dataset + + elif dataset_type == "MNIST_Audio": + data_path = "./AudioMNIST/data" + train_ds, test_ds, max_len = load_audio_mnist_data(data_path) + return train_ds, test_ds + + else: + raise ValueError(f"Unsupported dataset type: {dataset_type}") + +def train_model(model, train_dataset, epochs=10): + model.compile(optimizer=Adam(0.001), loss='categorical_crossentropy', metrics=['accuracy']) + for epoch in range(epochs): + print(f"\nEpoch {epoch + 1}/{epochs}:") + start = time.time() + loss, acc, batches = 0, 0, 0 + for x, y in train_dataset: + l, a = model.train_on_batch(x, y) + loss += l + acc += a + batches += 1 + print(f"Loss: {loss/batches:.4f}, Accuracy: {acc/batches:.4f}, Time: {time.time()-start:.2f}s") + return model + +def evaluate_model(model, test_dataset): + y_true = [] + loss, acc = model.evaluate(test_dataset, verbose=0) + for _, y in test_dataset: + y_true.extend(np.argmax(y.numpy(), axis=1)) + + y_pred = model.predict(test_dataset, verbose=0) + auroc = roc_auc_score(to_categorical(y_true, NUM_CLASSES), y_pred, multi_class='ovr') + auprc = average_precision_score(to_categorical(y_true, NUM_CLASSES), y_pred) + print(f"Test Loss: {loss:.4f}, Accuracy: {acc:.4f}, AUROC: {auroc:.4f}, AUPRC: {auprc:.4f}") + +def train_model_and_save(args): + # Create folder name based on dataset and model type + folder_name = f"{args.data}_{args.model_type}" + + save_dir = os.path.join(args.save_dir, folder_name) + + # Ensure the save directory exists + os.makedirs(save_dir, exist_ok=True) + model_path = os.path.join(save_dir, f'{folder_name}.keras') + + # Load or train the model based on the data and model type + if args.data == 'MNIST': + if args.model_type == 'normal': + model = load_MNIST_model(model_path) + train_ds, test_ds = load_data(dataset_type="MNIST", use_augmentation=False) + elif args.model_type == 'complex': + model = load_complex_MNIST_model(model_path) + train_ds, test_ds = load_data(dataset_type="MNIST", use_augmentation=False) + elif args.model_type == 'complex_augmented': + model = load_complex_MNIST_model(model_path) + train_ds, test_ds = load_data(dataset_type="MNIST", use_augmentation=True) + elif args.data == 'MNIST_Audio': + if args.model_type == 'normal': + model = load_AudioMNIST_model(model_path) + train_ds, test_ds = load_data(dataset_type="MNIST_Audio", use_augmentation=False) + elif args.model_type == 'complex': + model = load_complex_AudioMNIST_model(model_path) + train_ds, test_ds = load_data(dataset_type="MNIST_Audio", use_augmentation=False) + elif args.model_type == 'complex_augmented': + model = load_complex_AudioMNIST_model(model_path) + train_ds, test_ds = load_data(dataset_type="MNIST_Audio", use_augmentation=True) + + # Check if model weights exist, if not, train and save + if not os.path.exists(model_path): + print("Training the model...") + model = train_model(model, train_ds, epochs=args.epochs) + model.save(model_path) + print(f"Model saved to {model_path}") + else: + print(f"Model found. Loading weights from {model_path}") + model.load_weights(model_path) + + # Evaluate the model + print("Evaluating the model...") + evaluate_model(model, test_ds) + + return model, test_ds, save_dir, model_path diff --git a/setup.py b/setup.py index 5067cda..cda2b3e 100644 --- a/setup.py +++ b/setup.py @@ -18,7 +18,7 @@ author_email="Jamil-gafur@uiowa.edu", license='MIT', packages=[ - "Adversarial_Observation", + "Adversarial_Observation", ], python_requires='>=3.6', install_requires=[ @@ -28,6 +28,8 @@ "torch", "pandas", "scikit-learn", + "tensorflow[and-cuda]", + "librosa", ], examples_require=[ "torchvision", diff --git a/taint_MNIST.py b/taint_MNIST.py index 88fba29..76e2908 100644 --- a/taint_MNIST.py +++ b/taint_MNIST.py @@ -2,65 +2,58 @@ import time import numpy as np from tqdm import tqdm +<<<<<<< HEAD +import tensorflow as tf +from tensorflow.keras.models import Sequential +from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, BatchNormalization, Dropout +from tensorflow.keras.models import load_model # Use `load_model` from Keras +from tensorflow.keras.preprocessing.image import ImageDataGenerator +import numpy as np +import matplotlib.pyplot as plt +======= +>>>>>>> 2e3720e3acf18d382025057cfe30b34846348776 import tensorflow as tf from tensorflow.keras.models import load_model as load_keras_model -from tensorflow.keras.datasets import mnist +from tensorflow.keras.datasets import mnist, cifar10 from tensorflow.keras.utils import to_categorical from tensorflow.keras.models import Sequential from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense from tensorflow.keras.optimizers import Adam from Adversarial_Observation.utils import seed_everything from Adversarial_Observation import AdversarialTester, ParticleSwarm - -from sklearn.metrics import roc_auc_score, average_precision_score +import os +import json import matplotlib.pyplot as plt +from sklearn.metrics import roc_auc_score, average_precision_score -import sys, os, json -def evaluate_model(model, test_dataset): +# Helper Functions for Data Loading +def normalize_images(x, dataset_type='mnist'): """ - Evaluates the model on the test dataset and prints loss, accuracy, auROC, and auPRC. + Normalize images for a given dataset type (MNIST or CIFAR-10). """ - y_true = [] - - # Compute loss and accuracy - loss, accuracy = model.evaluate(test_dataset, verbose=0) - - # Collect ground truth labels - for images, labels in test_dataset: - y_true.extend(np.argmax(labels.numpy(), axis=1)) - - # Predict all at once to suppress excessive output - y_pred = model.predict(test_dataset, verbose=0) - - y_pred = np.array(y_pred) - y_true = np.array(y_true) - - auroc = roc_auc_score(to_categorical(y_true, num_classes=10), y_pred, multi_class='ovr') - auprc = average_precision_score(to_categorical(y_true, num_classes=10), y_pred) - - print(f"Test Loss: {loss:.4f}") - print(f"Test Accuracy: {accuracy:.4f}") - print(f"Test auROC: {auroc:.4f}") - print(f"Test auPRC: {auprc:.4f}") - -def load_MNIST_model(model_path=None): + if dataset_type == 'mnist': + return x / 255.0 + elif dataset_type == 'cifar10': + return x / 255.0 + return x + +<<<<<<< HEAD +def load_MNIST_model(model_path=None, experiment=1): """ Loads a pre-trained Keras model or creates a new one if no model is provided. Args: model_path (str, optional): Path to a pre-trained Keras model. Defaults to None. + experiment (int, optional): Defines which experiment model to create. Defaults to 1. Returns: tf.keras.Model: The Keras model. """ - if model_path: - # Load a pre-trained Keras model - model = load_keras_model(model_path) - return model - else: - # Create a new Keras model + + if experiment == 1: + # Create a new Keras model for experiment 1 model = Sequential([ Conv2D(32, kernel_size=(3, 3), padding='same', activation='relu', input_shape=(28, 28, 1)), MaxPooling2D(pool_size=(2, 2)), @@ -70,21 +63,51 @@ def load_MNIST_model(model_path=None): Dense(128, activation='relu'), Dense(10, activation='softmax') ]) - return model + + elif experiment == 2 or experiment == 3: + # Create a new Keras model for experiment 2 or 3 + model = Sequential() + + model.add(Conv2D(32, kernel_size=3, activation='relu', input_shape=(28, 28, 1))) + model.add(BatchNormalization()) + model.add(Conv2D(32, kernel_size=3, activation='relu')) + model.add(BatchNormalization()) + model.add(Conv2D(32, kernel_size=5, strides=2, padding='same', activation='relu')) + model.add(BatchNormalization()) + model.add(Dropout(0.4)) + + model.add(Conv2D(64, kernel_size=3, activation='relu')) + model.add(BatchNormalization()) + model.add(Conv2D(64, kernel_size=3, activation='relu')) + model.add(BatchNormalization()) + model.add(Conv2D(64, kernel_size=5, strides=2, padding='same', activation='relu')) + model.add(BatchNormalization()) + model.add(Dropout(0.4)) + + model.add(Conv2D(128, kernel_size=4, activation='relu')) + model.add(BatchNormalization()) + model.add(Flatten()) + model.add(Dropout(0.4)) + model.add(Dense(10, activation='softmax')) + + if model_path is not None and os.path.isfile(model_path): + # Load the model weights from the model path + model = load_model(model_path) + print(f"Model loaded from {model_path}") + + return model def normalize_mnist(x): """Applies mean/std normalization to MNIST image""" -# mean = 0.1307 -# std = 0.3081 -# return ((x / 255.0) - mean) / std return x / 255.0 -def load_data(batch_size=32): +def load_data(batch_size=32, experiment=1): """ Loads MNIST train and test data and prepares it for evaluation. Args: batch_size (int): The batch size for data loading. + experiment (int): Experiment number to define augmentation or preprocessing. Returns: tf.data.Dataset, tf.data.Dataset: The training and testing datasets. @@ -95,49 +118,219 @@ def load_data(batch_size=32): # Reshape and normalize the data x_train = normalize_mnist(x_train.reshape(-1, 28, 28, 1).astype('float32')) x_test = normalize_mnist(x_test.reshape(-1, 28, 28, 1).astype('float32')) +======= +def load_data(dataset_type='mnist', batch_size=32): + """ + Loads the specified dataset and prepares it for evaluation. + """ + if dataset_type == 'mnist': + (x_train, y_train), (x_test, y_test) = mnist.load_data() + x_train = x_train.reshape(-1, 28, 28, 1) + x_test = x_test.reshape(-1, 28, 28, 1) + elif dataset_type == 'cifar10': + (x_train, y_train), (x_test, y_test) = cifar10.load_data() + + # Normalize data + x_train = normalize_images(x_train, dataset_type) + x_test = normalize_images(x_test, dataset_type) +>>>>>>> 2e3720e3acf18d382025057cfe30b34846348776 # One-hot encode the labels y_train = to_categorical(y_train, 10) y_test = to_categorical(y_test, 10) - # Create TensorFlow datasets - train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train)).batch(batch_size) - test_dataset = tf.data.Dataset.from_tensor_slices((x_test, y_test)).batch(batch_size) + # Apply data augmentation if experiment == 3 + if experiment == 3: + datagen = ImageDataGenerator( + rotation_range=10, + zoom_range=0.10, + width_shift_range=0.1, + height_shift_range=0.1 + ) + # Fit the datagen on the training data + datagen.fit(x_train) + + # Create an augmented training data generator + train_datagen = datagen.flow(x_train, y_train, batch_size=batch_size) + + # Wrap it into a tf.data.Dataset for compatibility with TensorFlow model training + train_dataset = tf.data.Dataset.from_generator( + lambda: train_datagen, + output_signature=( + tf.TensorSpec(shape=(batch_size, 28, 28, 1), dtype=tf.float32), + tf.TensorSpec(shape=(batch_size, 10), dtype=tf.float32) + ) + ) + else: + # No augmentation, simply batch the data + train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train)).batch(batch_size) + # Create test dataset (no augmentation applied) + test_dataset = tf.data.Dataset.from_tensor_slices((x_test, y_test)).batch(batch_size) + return train_dataset, test_dataset -def train(model: tf.keras.Model, train_dataset: tf.data.Dataset, epochs: int = 10) -> tf.keras.Model: + +# Helper Functions for Model Creation +def create_model(dataset_type='mnist'): """ - Trains the model for a specified number of epochs. + Create a CNN model for the specified dataset type. + """ + if dataset_type == 'mnist': + model = Sequential([ + Conv2D(32, kernel_size=(3, 3), padding='same', activation='relu', input_shape=(28, 28, 1)), + MaxPooling2D(pool_size=(2, 2)), + Conv2D(64, kernel_size=(3, 3), padding='same', activation='relu'), + MaxPooling2D(pool_size=(2, 2)), + Flatten(), + Dense(128, activation='relu'), + Dense(10, activation='softmax') + ]) + elif dataset_type == 'cifar10': + model = Sequential([ + Conv2D(32, kernel_size=(3, 3), padding='same', activation='relu', input_shape=(32, 32, 3)), + MaxPooling2D(pool_size=(2, 2)), + Conv2D(64, kernel_size=(3, 3), padding='same', activation='relu'), + MaxPooling2D(pool_size=(2, 2)), + Flatten(), + Dense(128, activation='relu'), + Dense(10, activation='softmax') + ]) + return model - Args: - model (tf.keras.Model): The model to train. - train_dataset (tf.data.Dataset): The training dataset. - epochs (int, optional): Number of training epochs. Defaults to 10. - Returns: - tf.keras.Model: The trained model. +def load_model(model_path=None, dataset_type='mnist'): """ - # Compile the model - model.compile(optimizer=Adam(learning_rate=0.001), loss='categorical_crossentropy', metrics=['accuracy']) + Loads a pre-trained model or creates a new one if no model is provided. + """ +<<<<<<< HEAD + # Convert dataset to a list of images and labels for indexing + dataset_list = list(dataset.as_numpy_iterator()) + all_images, all_labels = zip(*dataset_list) # Unpack images and labels + all_images = np.concatenate(all_images, axis=0) + all_labels = np.concatenate(all_labels, axis=0) + + # Ensure the index is within bounds + if image_index < 0 or image_index >= len(all_images): + raise ValueError(f"Image index {image_index} is out of bounds. Dataset size: {len(all_images)}") + + # Select the specified image and its ground truth label + single_image_input = all_images[image_index] + single_image_target = np.argmax(all_labels[image_index]) # Use the actual label + + single_misclassification_target = (single_image_target + 1) % 10 # Change target to a different class + + # Ensure the targets are different to simulate misclassification + assert single_image_target != single_misclassification_target, \ + "Target classes should be different for misclassification." + + # Create a noisy input set for black-box attack + input_set = [single_image_input + (np.random.uniform(0, 1, single_image_input.shape) * (np.random.rand(*single_image_input.shape) < 0.9)) for _ in range(num_particles) ] + input_set = np.stack(input_set) + + print(f"Original class: {single_image_target}") + print(f"Misclassification target class: {single_misclassification_target}") + + # Initialize the Particle Swarm optimizer with the model and input set + attacker = ParticleSwarm( + model=model, input_set=input_set,starting_class=single_image_target, target_class=single_misclassification_target, num_iterations=num_iterations, + save_dir=output_dir, inertia_weight=1, cognitive_weight=0.8, + social_weight=0.5, momentum=0.9, clip_value_position=0.2 + ) + + attacker.optimize() + + analysis(attacker, single_image_input, single_misclassification_target) +======= + if model_path: + return load_keras_model(model_path) + else: + return create_model(dataset_type) +>>>>>>> 2e3720e3acf18d382025057cfe30b34846348776 + +def analysis(attacker, single_misclassification_input: np.ndarray, single_misclassification_target): + adv_img = attacker.reduce_excess_perturbations(single_misclassification_input.squeeze(), single_misclassification_target) + + for i in range(len(adv_img)): + fig, axs = plt.subplots(1, 5, figsize=(15, 5)) + + original = single_misclassification_input.squeeze().reshape(28,28) + perturbed = np.copy(attacker.particles[i].position).reshape(28,28) + + axs[0].imshow(original, cmap="gray") + axs[0].set_title("Original Image") + + axs[1].imshow(original - perturbed, cmap="gray") + axs[1].set_title("Original - Perturbation") + + axs[2].imshow(perturbed, cmap="gray") + axs[2].set_title("Perturbation") + + axs[3].imshow(perturbed - adv_img[i], cmap="gray") + axs[3].set_title("Perturbation - Adversarial") + + axs[4].imshow(adv_img[i], cmap="gray") + axs[4].set_title("Adversarial Image") + + for ax in axs: + ax.axis("off") - # Train the model + plt.tight_layout() + os.makedirs(f"{attacker.save_dir}/denoised", exist_ok=True) + plt.savefig(f"{attacker.save_dir}/denoised/{i}.png") + plt.close(fig) + +# Helper Function for Evaluation +def evaluate_model(model, test_dataset): + """ + Evaluates the model on the test dataset and prints loss, accuracy, auROC, and auPRC. + """ + y_true = [] + # Compute loss and accuracy + loss, accuracy = model.evaluate(test_dataset, verbose=0) + + # Collect ground truth labels + for images, labels in test_dataset: + y_true.extend(np.argmax(labels.numpy(), axis=1)) + + # Predict all at once to suppress excessive output + y_pred = model.predict(test_dataset, verbose=0) + + y_pred = np.array(y_pred) + y_true = np.array(y_true) + + auroc = roc_auc_score(to_categorical(y_true, num_classes=10), y_pred, multi_class='ovr') + auprc = average_precision_score(to_categorical(y_true, num_classes=10), y_pred) + + print(f"Test Loss: {loss:.4f}") + print(f"Test Accuracy: {accuracy:.4f}") + print(f"Test auROC: {auroc:.4f}") + print(f"Test auPRC: {auprc:.4f}") + + +# Helper Functions for Training +def train_model(model, train_dataset, epochs=10): + """ + Trains the model for a specified number of epochs. + """ + model.compile(optimizer=Adam(learning_rate=0.001), loss='categorical_crossentropy', metrics=['accuracy']) + for epoch in range(epochs): start_time = time.time() # Track time for each epoch print(f"\nEpoch {epoch + 1}/{epochs}:") - + running_loss = 0.0 running_accuracy = 0.0 num_batches = 0 # Use tqdm for a progress bar for images, labels in tqdm(train_dataset, desc="Training", unit="batch"): + # Use train_on_batch instead of fit to update the model on each batch loss, accuracy = model.train_on_batch(images, labels) running_loss += loss running_accuracy += accuracy num_batches += 1 - # Print average loss and accuracy for the epoch epoch_loss = running_loss / num_batches epoch_accuracy = running_accuracy / num_batches elapsed_time = time.time() - start_time @@ -145,23 +338,15 @@ def train(model: tf.keras.Model, train_dataset: tf.data.Dataset, epochs: int = 1 return model -def adversarial_attack_blackbox(model: tf.keras.Model, dataset: tf.data.Dataset, image_index: int, output_dir: str = 'results', num_iterations: int = 30, num_particles: int = 100) -> tf.data.Dataset: - """ - Performs a black-box adversarial attack on a specific image in the dataset using Particle Swarm optimization. - - Args: - model (tf.keras.Model): The trained model to attack. - dataset (tf.data.Dataset): The dataset containing the images. - image_index (int): The index of the image in the dataset to attack. - num_iterations (int, optional): Number of iterations for the attack. Defaults to 30. - num_particles (int, optional): Number of particles for the attack. Defaults to 100. - Returns: - tf.data.Dataset: A dataset containing adversarially perturbed images. +# Adversarial Attack Helper +def adversarial_attack_blackbox(model, dataset, image_index, output_dir='results', num_iterations=30, num_particles=100): + """ + Perform adversarial attack on a specific image using Particle Swarm optimization. """ # Convert dataset to a list of images and labels for indexing dataset_list = list(dataset.as_numpy_iterator()) - all_images, all_labels = zip(*dataset_list) # Unpack images and labels + all_images, all_labels = zip(*dataset_list) all_images = np.concatenate(all_images, axis=0) all_labels = np.concatenate(all_labels, axis=0) @@ -171,146 +356,64 @@ def adversarial_attack_blackbox(model: tf.keras.Model, dataset: tf.data.Dataset, # Select the specified image and its ground truth label single_image_input = all_images[image_index] - single_image_target = np.argmax(all_labels[image_index]) # Use the actual label + single_image_target = np.argmax(all_labels[image_index]) single_misclassification_target = (single_image_target + 1) % 10 # Change target to a different class - # Ensure the targets are different to simulate misclassification - assert single_image_target != single_misclassification_target, \ - "Target classes should be different for misclassification." - - # Create a noisy input set for black-box attack input_set = [single_image_input + (np.random.uniform(0, 1, single_image_input.shape) * (np.random.rand(*single_image_input.shape) < 0.9)) for _ in range(num_particles) ] input_set = np.stack(input_set) - print(f"Original class: {single_image_target}") - print(f"Misclassification target class: {single_misclassification_target}") - - # Initialize the Particle Swarm optimizer with the model and input set - attacker = ParticleSwarm( - model, input_set, single_misclassification_target, num_iterations=num_iterations, - epsilon=1, save_dir=output_dir, inertia_weight=1, cognitive_weight=0.8, - social_weight=0.5, momentum=0.9, velocity_clamp=0.2 - ) - + attacker = ParticleSwarm(model, input_set, single_misclassification_target, num_iterations=num_iterations, save_dir=output_dir, inertia_weight=.01) attacker.optimize() analysis(attacker, single_image_input, single_misclassification_target) -def analysis(attacker, single_misclassification_input: np.ndarray, single_misclassification_target): - """ - Analyzes the results of the attack and generates plots. - - Saves the original misclassification input and target. - - For each particle and each position in the particle's history: - - Save the position (perturbed image). - - Save all confidence values. - - Save the maximum output (softmax confidence). - - Save the difference from the original input. - """ - # Save the original image and its classification - plt.imsave(os.path.join(attacker.save_dir, "original.png"), single_misclassification_input.squeeze(), cmap="gray", vmin=0, vmax=1) - - analysis_results = { - "original_misclassification_input": single_misclassification_input.tolist(), - "original_misclassification_target": int(single_misclassification_target), - "particles": [] - } - - # Process each particle in the attacker's particles list - for particle_idx, particle in enumerate(attacker.particles): - print(f"Processing particle: {particle_idx}") - particle_data = { - "particle_index": particle_idx, - "positions": [], - "confidence_values": [], - "max_output_values": [], - "max_output_classes": [], - "differences_from_original": [], - "confidence_over_time": [] # Store confidence over time - } - - for step_idx, position in enumerate(particle.history): - # Ensure 'position' is a numpy array. - if isinstance(position, tf.Tensor): - position_np = position.numpy() - else: - position_np = np.array(position) - - output = attacker.model(position_np) - - # Remove the batch dimension and apply softmax - softmax_output = tf.nn.softmax(tf.squeeze(output), axis=0) - confidence_values = softmax_output.numpy().tolist() - max_output_value = float(max(confidence_values)) - max_output_class = confidence_values.index(max_output_value) - - # Calculate pixel-wise difference from original image (before attack) - #diff_image = np.abs(position_np - single_misclassification_input)[0] - diff_image = (position_np - single_misclassification_input)[0] - #print(position_np) - #print(single_misclassification_input) - #print(diff_image) - # Save the difference image - iteration_folder = os.path.join(attacker.save_dir, f"iteration_{step_idx + 1}") - if not os.path.exists(iteration_folder): - os.makedirs(iteration_folder) - plt.imsave(os.path.join(iteration_folder, f"attack-vector_image_{particle_idx + 1}.png"), diff_image.squeeze(), cmap="seismic", vmin=-1, vmax=1) - - # Calculate difference from original image (before attack) - difference_from_original = float(np.linalg.norm(position - single_misclassification_input)) - - # Add data for this step to the particle_data - particle_data["positions"].append(position_np.tolist()) - particle_data["confidence_values"].append(confidence_values) - particle_data["max_output_values"].append(max_output_value) - particle_data["max_output_classes"].append(max_output_class) - particle_data["differences_from_original"].append(difference_from_original) - particle_data["confidence_over_time"].append(max_output_value) # Store max output (confidence) - - # Append the particle's data to the main analysis results - analysis_results["particles"].append(particle_data) - - # Save the analysis results to a JSON file - output_dir = attacker.save_dir # Use the save_dir from the attacker - os.makedirs(output_dir, exist_ok=True) - file_path = os.path.join(output_dir, "attack_analysis.json") - - with open(file_path, "w") as f: - json.dump(analysis_results, f, indent=4) - - print(f"Analysis results saved to {file_path}") -def main() -> None: +# Main Function to Tie Everything Together +def main(): """ Main function to execute the adversarial attack workflow. """ + # Define argument parser parser = argparse.ArgumentParser(description="Adversarial attack workflow with optional pre-trained Keras model.") + parser.add_argument('--dataset', type=str, default='mnist', choices=['mnist', 'cifar10'], help="Dataset to use (mnist or cifar10).") parser.add_argument('--model_path', type=str, default=None, help="Path to a pre-trained Keras model.") - parser.add_argument('--iterations', type=int, default=50, help="Number of iterations for the black-box attack.") - parser.add_argument('--particles', type=int, default=100, help="Number of particles for the black-box attack.") + parser.add_argument('--iterations', type=int, default=5, help="Number of iterations for the black-box attack.") + parser.add_argument('--particles', type=int, default=10, help="Number of particles for the black-box attack.") parser.add_argument('--save_dir', type=str, default="analysis_results", help="Directory to save analysis results.") + parser.add_argument('--model_experiment', type=int, default=1, help="Which of the 3 experiments to run: (1 is simple model, 2 is advanced, 3 is advanced with augmentation)") + args = parser.parse_args() +<<<<<<< HEAD #seed_everything(1252025) # Load pre-trained model (MNIST model) or create a new one - model = load_MNIST_model(args.model_path) + model = load_MNIST_model(args.model_path, args.model_experiment) # Load MNIST dataset (train and test datasets) train_dataset, test_dataset = load_data() - if args.model_path is None: + if args.model_path is None or not os.path.isfile(args.model_path): # Train the model if no pre-trained model is provided model = train(model, train_dataset, epochs=5) - model.save('mnist_model.keras') - print("Model saved to mnist_model.keras") + model.save(f"mnist_model_{args.model_experiment}.keras") + print(f"Model saved to mnist_model_{args.model_experiment}.keras") +======= + model = load_model(args.model_path, args.dataset) + train_dataset, test_dataset = load_data(args.dataset) + + if args.model_path is None: + model = train_model(model, train_dataset, epochs=5) + model.save(f'{args.dataset}_model.keras') + print(f"Model saved to {args.dataset}_model.keras") +>>>>>>> 2e3720e3acf18d382025057cfe30b34846348776 - # Evaluate the model print("Model statistics on test dataset") evaluate_model(model, test_dataset) - # Perform adversarial attack - adversarial_dataset = adversarial_attack_blackbox(model, test_dataset, 0, output_dir=args.save_dir, num_iterations=args.iterations, num_particles=args.particles) + adversarial_attack_blackbox(model, test_dataset, image_index=0, output_dir=args.save_dir, num_iterations=args.iterations, num_particles=args.particles) + if __name__ == "__main__": main()