Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 8 additions & 14 deletions .devcontainer/devcontainer.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,16 @@
"dockerfile": "Dockerfile"
},
"features": {
"ghcr.io/devcontainers/features/anaconda:1": {
"version": "latest"
},
"ghcr.io/devcontainers/features/nvidia-cuda:2": {
"installCudnn": true,
"installCudnnDev": true,
"installNvtx": true,
"installToolkit": true,
"cudaVersion": "11.8",
"cudnnVersion": "automatic"
},
"ghcr.io/raucha/devcontainer-features/pytorch:1": {}
"ghcr.io/devcontainers/features/anaconda:1": {},
"ghcr.io/devcontainers/features/nvidia-cuda:2": {},
"ghcr.io/rocker-org/devcontainer-features/miniforge:2": {}
},
"runArgs": [
"--gpus=all"
]
"--gpus", "all"
],
// allow gpu


// Features to add to the dev container. More info: https://containers.dev/features.
// "features": {},

Expand Down
160 changes: 89 additions & 71 deletions Adversarial_Observation/Attacks.py
Original file line number Diff line number Diff line change
@@ -1,74 +1,92 @@
import numpy as np
import torch
import torch.nn.functional as F
import matplotlib.pyplot as plt
import logging
import os
from datetime import datetime
from torch.nn import Softmax
from .utils import fgsm_attack, compute_success_rate, log_metrics, visualize_adversarial_examples
from .utils import seed_everything

class AdversarialTester:
def __init__(self, model: torch.nn.Module, epsilon: float = 0.1, attack_method: str = 'fgsm', alpha: float = 0.01,
num_steps: int = 40, device=None, save_dir: str = './results', seed: int = 42):
seed_everything(seed)
self.model = model

# Set up logging
logging.basicConfig(level=logging.INFO)

def fgsm_attack(input_batch_data: torch.Tensor, model: torch.nn.Module, input_shape: tuple, epsilon: float) -> torch.Tensor:
"""
Apply the FGSM attack to input images given a pre-trained PyTorch model.
"""
model.eval()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
input_batch_data = input_batch_data.to(device)

adversarial_batch_data = []

for img in input_batch_data:
# Make a copy of the image and enable gradient tracking
img = img.clone().detach().unsqueeze(0).to(device)
img.requires_grad = True

# Forward pass
preds = model(img)
target = torch.argmax(preds, dim=1)
loss = F.cross_entropy(preds, target)

# Backward pass
model.zero_grad()
loss.backward()

# Generate perturbation
grad = img.grad.data
adversarial_img = img + epsilon * grad.sign()
adversarial_img = torch.clamp(adversarial_img, 0, 1)

adversarial_batch_data.append(adversarial_img.squeeze(0).detach())

return torch.stack(adversarial_batch_data)

def compute_gradients(model, img, target_class):
preds = model(img)
target_score = preds[0, target_class]
return torch.autograd.grad(target_score, img)[0]

def generate_adversarial_examples(input_batch_data, model, method='fgsm', **kwargs):
if method == 'fgsm':
return fgsm_attack(input_batch_data, model, **kwargs)
# Implement other attack methods as needed

def gradient_ascent(input_image, model, input_shape, target_class, num_iterations=100, step_size=0.01):
model.eval()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
input_image = input_image.to(device).detach().requires_grad_(True)

for _ in range(num_iterations):
gradients = compute_gradients(model, input_image.reshape(input_shape), target_class)
input_image = input_image + step_size * gradients.sign()
input_image = torch.clamp(input_image, 0, 1)
input_image = input_image.detach().requires_grad_(True)

return input_image.cpu().detach().numpy()

def gradient_map(input_image, model, input_shape):
model.eval()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
input_image = torch.tensor(input_image).to(device).detach().requires_grad_(True)

preds = model(input_image.reshape(input_shape))
target_class = torch.argmax(preds)
loss = F.cross_entropy(preds, target_class.unsqueeze(0))

model.zero_grad()
loss.backward()

gradient = input_image.grad.data.cpu().numpy()
gradient = np.abs(gradient).mean(axis=1) # Average over channels if needed
return gradient

def visualize_adversarial_examples(original, adversarial):
# Code to visualize original vs adversarial images
pass

def log_metrics(success_rate, average_perturbation):
logging.info(f'Success Rate: {success_rate}, Average Perturbation: {average_perturbation}')

class Config:
def __init__(self, epsilon=0.1, attack_method='fgsm'):
self.epsilon = epsilon
self.attack_method = attack_method
self.alpha = alpha
self.num_steps = num_steps
self.device = device or ("cuda" if torch.cuda.is_available() else "cpu")
self.save_dir = save_dir

# Create save directory if it doesn't exist
os.makedirs(self.save_dir, exist_ok=True)
self.model.to(self.device)
self.model.eval()

self._setup_logging()

def _setup_logging(self):
log_file = os.path.join(self.save_dir, f"attack_log_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log")
logging.basicConfig(filename=log_file, level=logging.DEBUG)
logging.info(f"Started adversarial testing at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
logging.info(f"Using model: {self.model.__class__.__name__}")
logging.info(f"Attack Method: {self.attack_method}, Epsilon: {self.epsilon}, Alpha: {self.alpha}, Steps: {self.num_steps}")

def test_attack(self, input_batch_data: torch.Tensor):
input_batch_data = input_batch_data.to(self.device)
adversarial_images = self._generate_adversarial_images(input_batch_data)

# Save and log images
self._save_images(input_batch_data, adversarial_images)
self._compute_and_log_metrics(input_batch_data, adversarial_images)

def _generate_adversarial_images(self, input_batch_data: torch.Tensor):
logging.info(f"Starting attack with method: {self.attack_method}")
if self.attack_method == 'fgsm':
return fgsm_attack(input_batch_data, self.model, self.epsilon, self.device)
else:
raise ValueError(f"Unsupported attack method: {self.attack_method}")

def _save_images(self, original_images: torch.Tensor, adversarial_images: torch.Tensor):
for i in range(original_images.size(0)):
original_image_path = os.path.join(self.save_dir, f"original_{i}.png")
adversarial_image_path = os.path.join(self.save_dir, f"adversarial_{i}.png")
visualize_adversarial_examples(original_images, adversarial_images, original_image_path, adversarial_image_path)

def _compute_and_log_metrics(self, original_images: torch.Tensor, adversarial_images: torch.Tensor):
original_predictions = torch.argmax(self.model(original_images), dim=1)
adversarial_predictions = torch.argmax(self.model(adversarial_images), dim=1)

success_rate = compute_success_rate(original_predictions, adversarial_predictions)
average_perturbation = torch.mean(torch.abs(adversarial_images - original_images)).item()

log_metrics(success_rate, average_perturbation)
self._save_metrics(success_rate, average_perturbation)

logging.info(f"Success Rate: {success_rate:.4f}, Average Perturbation: {average_perturbation:.4f}")

def _save_metrics(self, success_rate: float, avg_perturbation: float):
"""
Save the metrics (success rate and average perturbation) to a file.
"""
metrics_file = os.path.join(self.save_dir, "attack_metrics.txt")
with open(metrics_file, 'a') as f:
f.write(f"Success Rate: {success_rate:.4f}, Average Perturbation: {avg_perturbation:.4f}\n")
self.attack_method = attack_method
107 changes: 52 additions & 55 deletions Adversarial_Observation/BirdParticle.py
Original file line number Diff line number Diff line change
@@ -1,42 +1,44 @@
import tensorflow as tf
import torch
import torch.nn.functional as F
import numpy as np

class BirdParticle:
"""
Represents a particle in the Particle Swarm Optimization (PSO) algorithm for adversarial attacks.

The BirdParticle class encapsulates the state of each particle, including its position, velocity,
fitness evaluation, and the updates to its velocity and position based on the PSO algorithm.
Represents a particle in the Particle Swarm Optimization (PSO) algorithm for adversarial attacks (PyTorch version).
"""

def __init__(self, model: tf.keras.Model, input_data: tf.Tensor, target_class: int, num_iterations: int = 20,
velocity: tf.Tensor = None, inertia_weight: float = 0.5,
def __init__(self, model: torch.nn.Module, input_data: torch.Tensor, target_class: int, num_iterations: int = 20,
velocity: torch.Tensor = None, inertia_weight: float = 0.5,
cognitive_weight: float = 1.0, social_weight: float = 1.0,
momentum: float = 0.9, clip_value_position: float = 1.0):
momentum: float = 0.9, clip_value_position: float = 1.0, device='cpu'):
"""
Initialize a particle in the PSO algorithm.

Args:
model (tf.keras.Model): The model to attack.
input_data (tf.Tensor): The input data (image) to attack.
model (torch.nn.Module): The model to attack.
input_data (torch.Tensor): The input data (image) to attack.
target_class (int): The target class for misclassification.
velocity (tf.Tensor, optional): The initial velocity for the particle's movement. Defaults to zero velocity if not provided.
inertia_weight (float): The inertia weight for the velocity update. Default is 0.5.
cognitive_weight (float): The cognitive weight for the velocity update. Default is 1.0.
social_weight (float): The social weight for the velocity update. Default is 1.0.
momentum (float): The momentum for the velocity update. Default is 0.9.
velocity (torch.Tensor, optional): Initial velocity; defaults to zero.
inertia_weight (float): Inertia weight for velocity update.
cognitive_weight (float): Cognitive weight for velocity update.
social_weight (float): Social weight for velocity update.
momentum (float): Momentum for velocity update.
clip_value_position (float): Max absolute value to clip position.
device (str): Device to run on.
"""
self.model = model
self.device = device
self.model = model.to(device)
self.num_iterations = num_iterations
self.original_data = tf.identity(input_data) # Clone the input data
self.original_data = input_data.clone().detach().to(device)
self.position = input_data.clone().detach().to(device)
self.target_class = target_class
self.best_position = tf.identity(input_data) # Clone the input data
self.velocity = velocity.clone().detach().to(device) if velocity is not None else torch.zeros_like(input_data).to(device)
self.best_position = self.position.clone().detach()
self.best_score = -np.inf
self.position = tf.identity(input_data) # Clone the input data
self.velocity = velocity if velocity is not None else tf.zeros_like(input_data)
self.history = [self.position]
self.history = [self.position.clone().detach()]
self.clip_value_position = clip_value_position
# Class attributes

# PSO hyperparameters
self.inertia_weight = inertia_weight
self.cognitive_weight = cognitive_weight
self.social_weight = social_weight
Expand All @@ -45,52 +47,47 @@ def __init__(self, model: tf.keras.Model, input_data: tf.Tensor, target_class: i
def fitness(self) -> float:
"""
Compute the fitness score for the particle, which is the softmax probability of the target class.

Higher fitness scores correspond to better success in the attack (misclassifying the image into the target class).

Returns:
float: Fitness score for this particle (higher is better).
float: Target class softmax probability.
"""
output = self.model(self.position) # Add batch dimension and pass through the model
probabilities = tf.nn.softmax(output, axis=1) # Get probabilities for each class
target_prob = probabilities[:, self.target_class] # Target class probability

return target_prob.numpy().item() # Return the target class probability as fitness score
self.model.eval()
with torch.no_grad():
input_tensor = self.position
output = self.model(input_tensor.to(self.device))
probabilities = F.softmax(output, dim=1)
target_prob = probabilities[:, self.target_class]
return target_prob.item()

def update_velocity(self, global_best_position: tf.Tensor) -> None:
def update_velocity(self, global_best_position: torch.Tensor) -> None:
"""
Update the velocity of the particle based on the PSO update rule.
Update the particle's velocity using the PSO rule.

Args:
global_best_position (tf.Tensor): The global best position found by the swarm.
global_best_position (torch.Tensor): Global best position in the swarm.
"""
r1 = torch.rand_like(self.position).to(self.device)
r2 = torch.rand_like(self.position).to(self.device)

inertia = self.inertia_weight * self.velocity
cognitive = self.cognitive_weight * tf.random.uniform(self.position.shape) * (self.best_position - self.position)
social = self.social_weight * tf.random.uniform(self.position.shape) * (global_best_position - self.position)

# Apply momentum to velocity update:
self.velocity = self.momentum * self.velocity + inertia + cognitive + social # Apply momentum
cognitive = self.cognitive_weight * r1 * (self.best_position - self.position)
social = self.social_weight * r2 * (global_best_position.to(self.device) - self.position)

self.velocity = self.momentum * self.velocity + inertia + cognitive + social

def update_position(self) -> None:
"""
Update the position of the particle based on the updated velocity.

The position is updated directly without any bounds checking.
Update the particle's position based on the new velocity.
"""
self.position = self.position + self.velocity # Update position directly without clipping
self.position = tf.clip_by_value(self.position + self.velocity, 0, 1) # Ensure position stays within bounds
self.history.append(tf.identity(self.position)) # Store the position history
self.position = tf.clip_by_value(self.position, -self.clip_value_position, self.clip_value_position)
self.position = self.position + self.velocity
self.position = torch.clamp(self.position, 0.0, 1.0) # Keep values in [0, 1]
self.position = torch.clamp(self.position, -self.clip_value_position, self.clip_value_position)
self.history.append(self.position.clone().detach())

def evaluate(self) -> None:
"""
Evaluate the fitness of the current particle and update its personal best.

The fitness score is calculated using the target class probability. If the current fitness score
is better than the personal best, update the best position and score.
Evaluate current fitness and update personal best if needed.
"""
score = self.fitness() # Get the current fitness score based on the perturbation
if score > self.best_score: # If score is better than the personal best, update the best position
score = self.fitness()
if score > self.best_score:
self.best_score = score
self.best_position = tf.identity(self.position) # Clone the current position

self.best_position = self.position.clone().detach()
Loading
Loading