Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 30 additions & 2 deletions .devcontainer/devcontainer.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,34 @@
"dockerfile": "Dockerfile"
},
"features": {
"ghcr.io/rocker-org/devcontainer-features/miniforge:2": {}
}
"ghcr.io/devcontainers/features/anaconda:1": {
"version": "latest"
},
"ghcr.io/devcontainers/features/nvidia-cuda:2": {
"installCudnn": true,
"installCudnnDev": true,
"installNvtx": true,
"installToolkit": true,
"cudaVersion": "11.8",
"cudnnVersion": "automatic"
},
"ghcr.io/raucha/devcontainer-features/pytorch:1": {}
},
"runArgs": [
"--gpus=all"
]
// Features to add to the dev container. More info: https://containers.dev/features.
// "features": {},

// Use 'forwardPorts' to make a list of ports inside the container available locally.
// "forwardPorts": [],

// Use 'postCreateCommand' to run commands after the container is created.
// "postCreateCommand": "python --version",

// Configure tool-specific properties.
// "customizations": {},

// Uncomment to connect as root instead. More info: https://aka.ms/dev-containers-non-root.
// "remoteUser": "root"
}
29 changes: 13 additions & 16 deletions Adversarial_Observation/BirdParticle.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,41 +9,38 @@ class BirdParticle:
fitness evaluation, and the updates to its velocity and position based on the PSO algorithm.
"""

def __init__(self, model: tf.keras.Model, input_data: tf.Tensor, target_class: int, epsilon: float,
def __init__(self, model: tf.keras.Model, input_data: tf.Tensor, target_class: int, num_iterations: int = 20,
velocity: tf.Tensor = None, inertia_weight: float = 0.5,
cognitive_weight: float = 1.0, social_weight: float = 1.0, momentum: float = 0.9,
velocity_clamp: float = 0.1):
cognitive_weight: float = 1.0, social_weight: float = 1.0,
momentum: float = 0.9, clip_value_position: float = 1.0):
"""
Initialize a particle in the PSO algorithm.

Args:
model (tf.keras.Model): The model to attack.
input_data (tf.Tensor): The input data (image) to attack.
target_class (int): The target class for misclassification.
epsilon (float): The perturbation bound (maximum amount the image can be altered).
velocity (tf.Tensor, optional): The initial velocity for the particle's movement. Defaults to zero velocity if not provided.
inertia_weight (float): The inertia weight for the velocity update. Default is 0.5.
cognitive_weight (float): The cognitive weight for the velocity update. Default is 1.0.
social_weight (float): The social weight for the velocity update. Default is 1.0.
momentum (float): The momentum for the velocity update. Default is 0.9.
velocity_clamp (float): The velocity clamp for limiting the maximum velocity. Default is 0.1.
"""
self.model = model
self.num_iterations = num_iterations
self.original_data = tf.identity(input_data) # Clone the input data
self.target_class = target_class
self.epsilon = epsilon
self.best_position = tf.identity(input_data) # Clone the input data
self.best_score = -np.inf
self.position = tf.identity(input_data) # Clone the input data
self.velocity = velocity if velocity is not None else tf.zeros_like(input_data)
self.history = [self.position]

self.clip_value_position = clip_value_position
# Class attributes
self.inertia_weight = inertia_weight
self.cognitive_weight = cognitive_weight
self.social_weight = social_weight
self.momentum = momentum
self.velocity_clamp = velocity_clamp

def fitness(self) -> float:
"""
Expand Down Expand Up @@ -71,29 +68,29 @@ def update_velocity(self, global_best_position: tf.Tensor) -> None:
cognitive = self.cognitive_weight * tf.random.uniform(self.position.shape) * (self.best_position - self.position)
social = self.social_weight * tf.random.uniform(self.position.shape) * (global_best_position - self.position)

self.velocity = inertia + cognitive + social # Update velocity based on PSO formula

# Apply momentum and velocity clamping
self.velocity = self.velocity * self.momentum # Apply momentum
self.velocity = tf.clip_by_value(self.velocity, -self.velocity_clamp, self.velocity_clamp) # Apply velocity clamp
# Apply momentum to velocity update:
self.velocity = self.momentum * self.velocity + inertia + cognitive + social # Apply momentum

def update_position(self) -> None:
"""
Update the position of the particle based on the updated velocity.

Ensures that the position stays within the valid input range [0, 1] (normalized pixel values).
The position is updated directly without any bounds checking.
"""
self.position = tf.clip_by_value(self.position + self.velocity, 0, 1) # Ensure position stays within bounds
self.position = self.position + self.velocity # Update position directly without clipping
self.position = tf.clip_by_value(self.position + self.velocity, 0, 1) # Ensure position stays within bounds
self.history.append(tf.identity(self.position)) # Store the position history
self.position = tf.clip_by_value(self.position, -self.clip_value_position, self.clip_value_position)

def evaluate(self) -> None:
"""
Evaluate the fitness of the current particle and update its personal best.

The fitness score is calculated using the target class probability. If the current fitness score
is better than the personal best, update the personal best position and score.
is better than the personal best, update the best position and score.
"""
score = self.fitness() # Get the current fitness score based on the perturbation
if score > self.best_score: # If score is better than the personal best, update the best position
self.best_score = score
self.best_position = tf.identity(self.position) # Clone the current position

161 changes: 106 additions & 55 deletions Adversarial_Observation/Swarm.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import logging
from typing import List
from Adversarial_Observation.BirdParticle import BirdParticle

from tqdm import tqdm
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
Expand All @@ -15,10 +15,10 @@ class ParticleSwarm:
to misclassify it into the target class.
"""

def __init__(self, model: tf.keras.Model, input_set: np.ndarray, target_class: int,
num_iterations: int = 20, epsilon: float = 0.8, save_dir: str = 'results',
inertia_weight: float = 0.5, cognitive_weight: float = .5, social_weight: float = .5,
momentum: float = 0.9, velocity_clamp: float = 0.1):
def __init__(self, model: tf.keras.Model, input_set: np.ndarray, starting_class: int, target_class: int,
num_iterations: int = 20, save_dir: str = 'results', inertia_weight: float = 0.5,
cognitive_weight: float = .5, social_weight: float = .5, momentum: float = 0.9,
clip_value_position: float = 0.2, enable_logging: bool = False, device: str = 'cpu'):
"""
Initialize the Particle Swarm Optimization (PSO) for adversarial attacks.

Expand All @@ -27,44 +27,48 @@ def __init__(self, model: tf.keras.Model, input_set: np.ndarray, target_class: i
input_set (np.ndarray): The batch of input images to attack as a NumPy array.
target_class (int): The target class for misclassification.
num_iterations (int): The number of optimization iterations.
epsilon (float): The perturbation bound.
save_dir (str): The directory to save output images and logs.
inertia_weight (float): The inertia weight for the velocity update.
cognitive_weight (float): The cognitive weight for the velocity update.
social_weight (float): The social weight for the velocity update.
momentum (float): The momentum for the velocity update.
velocity_clamp (float): The velocity clamp to limit the velocity.
clip_value_position (float): The velocity clamp to limit the velocity.
device (str): The device for computation ('cpu' or 'gpu'). Default is 'cpu'.
"""
self.model = model
self.input_set = tf.convert_to_tensor(input_set, dtype=tf.float32) # Convert NumPy array to TensorFlow tensor
self.start_class = starting_class # The starting class index
self.target_class = target_class # The target class index
self.num_iterations = num_iterations
self.epsilon = epsilon # Perturbation bound
self.save_dir = save_dir # Directory to save perturbed images

self.enable_logging = enable_logging
self.device = device # Device ('cpu' or 'gpu')

self.particles: List[BirdParticle] = [
BirdParticle(model, self.input_set[i:i + 1], target_class, epsilon,
inertia_weight=inertia_weight, cognitive_weight=cognitive_weight,
social_weight=social_weight, momentum=momentum, velocity_clamp=velocity_clamp)
BirdParticle(model, self.input_set[i:i + 1], target_class,
inertia_weight=inertia_weight, cognitive_weight=cognitive_weight, social_weight=social_weight,
momentum=momentum, clip_value_position=clip_value_position)
for i in range(len(input_set))
]

self.global_best_position = tf.zeros_like(self.input_set[0]) # Global best position
self.global_best_score = -float('inf') # Initialize with a very low score

self.fitness_history: List[float] = [] # History of fitness scores to track progress
self.setup_logging() # Set up logging
self.log_progress(-1) # Log initial state (before optimization)

# Make output folder
iteration_dir = self.save_dir
os.makedirs(iteration_dir, exist_ok=True)
if self.enable_logging:
self.setup_logging()
self.log_progress(-1)

def setup_logging(self):
"""
Set up logging for each iteration. Each iteration will have a separate log file.
Also prints logs to the terminal.
"""
iteration_dir = self.save_dir
os.makedirs(iteration_dir, exist_ok=True)

log_file = os.path.join(iteration_dir, f'iteration_log.log')
log_file = os.path.join(self.save_dir, f'iteration_log.log')
self.logger = logging.getLogger()

# Create a file handler to save logs to a file
Expand All @@ -86,7 +90,6 @@ def setup_logging(self):
self.logger.info(f"Model: {self.model.__class__.__name__}")
self.logger.info(f"Target Class: {self.target_class} (This is the class we want to misclassify the image into)")
self.logger.info(f"Number of Iterations: {self.num_iterations} (Optimization steps)")
self.logger.info(f"Epsilon (perturbation bound): {self.epsilon} (Maximum perturbation allowed)")
self.logger.info(f"Save Directory: {self.save_dir}")
self.logger.info(f"{'*' * 60}")

Expand All @@ -97,13 +100,16 @@ def log_progress(self, iteration: int):
Args:
iteration (int): The current iteration of the optimization process.
"""
if not self.enable_logging:
return

# Log the header for the iteration
self.logger.info(f"\n{'-'*60}")
self.logger.info(f"Iteration {iteration + 1}/{self.num_iterations}")
self.logger.info(f"{'='*60}")

# Table header
header = f"{'Particle':<10}{'Original Pred':<15}{'Perturbed Pred':<18}{'Orig Target Prob':<20}" \
header = f"{'Particle':<10}{'Original Pred':<15}{'Perturbed Pred':<18}{'Orig Start Prob':<20}{'Pert Start Prob':<20}{'Orig Target Prob':<20}" \
f"{'Pert Target Prob':<20}{'Personal Best':<20}{'Global Best':<20}"
self.logger.info(header)
self.logger.info(f"{'-'*60}")
Expand All @@ -121,51 +127,96 @@ def log_progress(self, iteration: int):
# Get softmax probabilities
original_probs = tf.nn.softmax(original_output, axis=1)
perturbed_probs = tf.nn.softmax(perturbed_output, axis=1)


# Get starting class probabilities (how far away we've moved)
original_prob_start = original_probs[0, self.start_class].numpy().item()
perturbed_prob_start = perturbed_probs[0, self.start_class].numpy().item()

# Get target class probabilities
original_prob_target = original_probs[0, self.target_class].numpy().item()
perturbed_prob_target = perturbed_probs[0, self.target_class].numpy().item()

# Log each particle's data in a formatted row
self.logger.info(f"{i+1:<10}{original_pred:<15}{perturbed_pred:<18}{original_prob_target:<20.4f}"
f"{perturbed_prob_target:<20.4f}{particle.best_score:<20.4f}{self.global_best_score:<20.4f}")
self.logger.info(f"{i+1:<10}{original_pred:<15}{perturbed_pred:<18}{original_prob_start:<20.4f}{perturbed_prob_start:<20.4f}"
f"{original_prob_target:<20.4f}{perturbed_prob_target:<20.4f}{particle.best_score:<20.4f}{self.global_best_score:<20.4f}")

self.logger.info(f"{'='*60}")

def save_images(self, iteration: int):
def optimize(self):
"""
Save the perturbed images for the current iteration.

Args:
iteration (int): The current iteration number.
Run the Particle Swarm Optimization process to optimize the perturbations.
"""
iteration_folder = os.path.join(self.save_dir, f"iteration_{iteration + 1}")
os.makedirs(iteration_folder, exist_ok=True)

for i, particle in enumerate(self.particles):
# Convert TensorFlow tensor to NumPy array
position_numpy = particle.position.numpy()
# Remove extra batch dimension (if it exists)
position_numpy = np.squeeze(position_numpy) # Now shape is (28, 28)
plt.imsave(os.path.join(iteration_folder, f"perturbed_image_{i + 1}.png"), position_numpy, cmap="gray", vmin=0, vmax=1)
with tf.device(f"/{self.device}:0"): # Use the GPU/CPU based on the flag
for iteration in tqdm(range(self.num_iterations), desc="Running Swarm"):
# Update particles and velocities, evaluate them, and track global best
for particle in self.particles:
particle.evaluate()
particle.update_velocity(self.global_best_position) # No need to pass inertia_weight explicitly
particle.update_position()

# Update the global best based on the personal best scores of particles
best_particle = max(self.particles, key=lambda p: p.best_score)
if best_particle.best_score > self.global_best_score:
self.global_best_score = best_particle.best_score
self.global_best_position = tf.identity(best_particle.best_position)

self.log_progress(iteration)

def optimize(self):
def reduce_excess_perturbations(self, original_img: np.ndarray, target_label: int, model_shape: tuple = (1, 28, 28, 1)) -> np.ndarray:
"""
Run the Particle Swarm Optimization process to optimize the perturbations.
Reduces excess perturbations in adversarial images while ensuring the misclassification remains.

Args:
original_img (np.ndarray): The original (clean) image.
target_label (int): The label we want the adversarial image to produce.
model_shape (tuple): The expected shape of the model input.

Returns:
list[np.ndarray]: A list of denoised adversarial images.
"""
for iteration in range(self.num_iterations):

# Update particles and velocities, evaluate them, and track global best
for particle in self.particles:
particle.evaluate()
particle.update_velocity(self.global_best_position) # No need to pass inertia_weight explicitly
particle.update_position()

# Update the global best based on the personal best scores of particles
best_particle = max(self.particles, key=lambda p: p.best_score)
if best_particle.best_score > self.global_best_score:
self.global_best_score = best_particle.best_score
self.global_best_position = tf.identity(best_particle.best_position)

self.save_images(iteration)
self.log_progress(iteration)
denoised_adv = []
total_pixels = np.prod(original_img.shape) # Total pixels in the image

for adv_particle in tqdm(self.particles, desc="Processing Particles", unit="particle"):
adv_img = np.copy(adv_particle.position).reshape(original_img.shape) # Copy to avoid modifying the original

if original_img.shape != adv_img.shape:
raise ValueError("original_img and adv_img must have the same shape.")

# Iterate over every pixel coordinate in the image with tqdm progress bar
with tqdm(total=total_pixels, desc="Processing Pixels", unit="pixel", leave=False) as pbar:
for idx in np.ndindex(original_img.shape):
if original_img[idx] == adv_img[idx]: # Ignore unchanged pixels
pbar.update(1)
continue

# Store old adversarial value
old_val = adv_img[idx]

# Try restoring the pixel to original
adv_img[idx] = original_img[idx]

# Check if the label is still the target label
output = self.model(adv_img.reshape(model_shape))
softmax_output = tf.nn.softmax(tf.squeeze(output), axis=0).numpy()
current_label = np.argmax(softmax_output)

if current_label != target_label:
# If misclassification is lost, try halfway adjustment
adv_img[idx] = old_val
adv_img[idx] += (original_img[idx] - adv_img[idx]) * 0.5

# Recheck if the label is still the target
output = self.model(adv_img.reshape(model_shape))
softmax_output = tf.nn.softmax(tf.squeeze(output), axis=0).numpy()
current_label = np.argmax(softmax_output)

if current_label != target_label:
# If misclassification is still lost, revert back
adv_img[idx] = old_val

pbar.update(1) # Update pixel progress

denoised_adv.append(adv_img)

return denoised_adv
6 changes: 6 additions & 0 deletions manuscripts/Posion25/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
AudioMNIST*
results/
*.pyc
__pycache__/
*.pkl
*.keras
Loading
Loading