# -*- coding: utf-8 -*-
"""DynamicBackdoorGAN_Demo.ipynb

Automatically generated by Colab.

Original file is located at
    https://colab.research.google.com/drive/1aMV5GZ7Z0cwuUl36NxFUsBU5RoJunCGA
"""

pip install adversarial-robustness-toolbox

# Imports
import torch
import torch.nn as nn
import numpy as np
from torch.utils.data import Subset
from torchvision import datasets, transforms, models
from art.estimators.classification import PyTorchClassifier
from art.utils import to_categorical
from art.attacks.poisoning import PoisoningAttackBackdoor

# User Config
config = {
    "dataset": "CIFAR10",       # CIFAR10, CIFAR100, MNIST
    "model_name": "resnet18",   # resnet18, resnet50, mobilenetv2, densenet121
    "poison_ratio": 0.1,
    "target_label": 0,          # Target label to which poisoned samples are mapped
    "epochs": 30,
    "batch_size": 128,
    "epsilon": 0.5              # Trigger strength
}
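
# Note: run_dynamic_backdoor_experiment() and get_classifier() below also read a few
# optional keys from this dict via config.get(): "train_subset" and "test_subset"
# (limit how many samples are loaded) and "learning_rate" (defaults to 0.001).
# Illustrative smoke-test override (example values, not the settings used in this demo):
# config.update({"train_subset": 2000, "test_subset": 500, "epochs": 2})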

# Trigger Generator: a small CNN that learns to generate input-specific triggers
class TriggerGenerator(nn.Module):
    def __init__(self, input_channels=3):
        super().__init__()
        self.net = nn.Sequential(
            nn.Conv2d(input_channels, 32, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Conv2d(32, 32, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Conv2d(32, input_channels, kernel_size=3, padding=1),
            nn.Tanh()
        )

    def forward(self, x):
        return self.net(x)
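
# Quick sanity check (illustrative, not part of the attack itself): the generator
# preserves the input shape, and because of the final Tanh it emits trigger values in
# [-1, 1], which are later scaled by epsilon and clamped into [0, 1] in apply_trigger().
_gen = TriggerGenerator()
with torch.no_grad():
    _trig = _gen(torch.rand(4, 3, 32, 32))
print(_trig.shape, float(_trig.min()), float(_trig.max()))  # torch.Size([4, 3, 32, 32]), min/max inside (-1, 1)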

# Custom poisoning attack: DynamicBackdoorGAN - defines how to poison data using the GAN trigger generator
class DynamicBackdoorGAN(PoisoningAttackBackdoor):
    def __init__(self, generator, target_label, backdoor_rate, classifier, epsilon=0.5):
        super().__init__(perturbation=lambda x: x)
        self.classifier = classifier
        self.generator = generator.to(classifier.device)
        self.target_label = target_label
        self.backdoor_rate = backdoor_rate
        self.epsilon = epsilon

    # Add the trigger to a batch of images
    def apply_trigger(self, images):
        self.generator.eval()
        with torch.no_grad():
            images = nn.functional.interpolate(images, size=(32, 32), mode='bilinear')  # Resize images to a uniform dimension
            triggers = self.generator(images.to(self.classifier.device))  # Generate dynamic, input-specific triggers with the TriggerGenerator
            poisoned = (images.to(self.classifier.device) + self.epsilon * triggers).clamp(0, 1)  # Clamp pixel values to the valid [0, 1] range
        return poisoned

    # Poison the training data by injecting dynamic triggers and changing labels
    def poison(self, x, y):
        # Convert raw image data (x) to float tensors and one-hot labels (y) to class indices, as required here
        x_tensor = torch.tensor(x).float()
        y_tensor = torch.tensor(np.argmax(y, axis=1))
        # Calculate the total number of samples and how many of them should be poisoned (poison ratio = backdoor_rate)
        batch_size = x_tensor.shape[0]
        n_poison = int(self.backdoor_rate * batch_size)
        # Apply the learned trigger to the first n_poison samples
        poisoned = self.apply_trigger(x_tensor[:n_poison])
        # The remaining samples stay clean
        clean = x_tensor[n_poison:].to(self.classifier.device)
        # Combine poisoned and clean samples into a single batch
        poisoned_images = torch.cat([poisoned, clean], dim=0).cpu().numpy()
        # Relabel the poisoned samples with the attacker's target class
        new_labels = y_tensor.clone()
        new_labels[:n_poison] = self.target_label  # Set the poisoned labels to the desired misclassification
        # Convert all labels back to one-hot encoding (required by ART classifiers)
        new_labels = to_categorical(new_labels.numpy(), nb_classes=self.classifier.nb_classes)
        return poisoned_images.astype(np.float32), new_labels.astype(np.float32)

    # Evaluate the attack's success on test data
    def evaluate(self, x_clean, y_clean):
        x_tensor = torch.tensor(x_clean).float()
        poisoned_test = self.apply_trigger(x_tensor).cpu().numpy().astype(np.float32)  # Apply the trigger to every test image to create a poisoned test set

        preds = self.classifier.predict(poisoned_test)
        true_target = np.full((len(preds),), self.target_label)
        pred_labels = np.argmax(preds, axis=1)

        success = np.sum(pred_labels == true_target)
        asr = 100.0 * success / len(pred_labels)
        return asr
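
# Hypothetical mini-example (illustration only) of the label handling inside poison():
# with 4 samples and the first 2 selected for poisoning, their labels are overwritten
# with the attacker's target class and then one-hot encoded again for ART.
_y_onehot = to_categorical(np.array([3, 1, 2, 0]), nb_classes=10)
_y_idx = np.argmax(_y_onehot, axis=1)
_y_idx[:2] = config["target_label"]
print(to_categorical(_y_idx, nb_classes=10).argmax(axis=1))  # -> [0 0 2 0] with target_label = 0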

# ✅ Utility: Load Data
def get_data(dataset="CIFAR10", train_subset=None, test_subset=None):
    if dataset in ["CIFAR10", "CIFAR100"]:
        transform = transforms.Compose([transforms.Resize((32, 32)), transforms.ToTensor()])
    elif dataset == "MNIST":
        transform = transforms.Compose([
            transforms.Grayscale(num_output_channels=3),
            transforms.Resize((32, 32)),
            transforms.ToTensor()
        ])
    else:
        raise ValueError("Unsupported dataset")

    if dataset == "CIFAR10":
        dataset_cls = datasets.CIFAR10
        num_classes = 10
    elif dataset == "CIFAR100":
        dataset_cls = datasets.CIFAR100
        num_classes = 100
    elif dataset == "MNIST":
        dataset_cls = datasets.MNIST
        num_classes = 10

    train_set = dataset_cls(root="./data", train=True, download=True, transform=transform)
    test_set = dataset_cls(root="./data", train=False, download=True, transform=transform)

    if train_subset is not None:
        train_set = Subset(train_set, range(train_subset))
    if test_subset is not None:
        test_set = Subset(test_set, range(test_subset))

    x_train = torch.stack([x for x, _ in train_set]).numpy()
    y_train = to_categorical([y for _, y in train_set], nb_classes=num_classes)

    x_test = torch.stack([x for x, _ in test_set]).numpy()
    y_test = to_categorical([y for _, y in test_set], nb_classes=num_classes)

    return x_train, y_train, x_test, y_test, num_classes
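
# Example usage (illustrative; downloads MNIST to ./data the first time it runs):
# with small subsets the call is fast, and the Grayscale/Resize transforms above mean
# every dataset comes back as (N, 3, 32, 32) float arrays with one-hot labels.
_x_s, _y_s, _, _, _n_cls = get_data("MNIST", train_subset=64, test_subset=64)
print(_x_s.shape, _y_s.shape, _n_cls)  # expected: (64, 3, 32, 32) (64, 10) 10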

# Utility: Get ART Classifier - returns an ART-compatible classifier wrapped around the selected PyTorch model
def get_classifier(config):
    model_name = config["model_name"]
    nb_classes = config["nb_classes"]
    input_shape = config["input_shape"]
    lr = config.get("learning_rate", 0.001)

    if model_name == "resnet18":
        model = models.resnet18(num_classes=nb_classes)
    elif model_name == "resnet50":
        model = models.resnet50(num_classes=nb_classes)
    elif model_name == "mobilenetv2":
        model = models.mobilenet_v2(num_classes=nb_classes)
    elif model_name == "densenet121":
        model = models.densenet121(num_classes=nb_classes)
    else:
        raise ValueError(f"Unsupported model: {model_name}")

    loss = torch.nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)

    classifier = PyTorchClassifier(
        model=model,
        loss=loss,
        optimizer=optimizer,
        input_shape=input_shape,
        nb_classes=nb_classes,
        clip_values=(0.0, 1.0),
        device_type="gpu" if torch.cuda.is_available() else "cpu"
    )
    return classifier
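
# Quick sanity check (illustrative; the 10-class, 3x32x32 setup below is an assumed
# example, not a value taken from the experiment itself):
_demo_clf = get_classifier({"model_name": "resnet18", "nb_classes": 10, "input_shape": (3, 32, 32)})
print(_demo_clf.nb_classes, _demo_clf.input_shape, _demo_clf.device)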

# Full Experiment: runs both clean and poisoned training and evaluates the effectiveness of the backdoor attack
def run_dynamic_backdoor_experiment(config):
    x_train, y_train, x_test, y_test, num_classes = get_data(
        dataset=config["dataset"],
        train_subset=config.get("train_subset"),
        test_subset=config.get("test_subset")
    )
    config["nb_classes"] = num_classes
    config["input_shape"] = x_train.shape[1:]

    classifier = get_classifier(config)

    # Clean training
    classifier.fit(x_train, y_train, nb_epochs=config["epochs"], batch_size=config["batch_size"])
    clean_acc = np.mean(np.argmax(classifier.predict(x_test), axis=1) == np.argmax(y_test, axis=1))
    print(f"Clean Accuracy: {clean_acc * 100:.2f}%")

    # Poisoned training
    generator = TriggerGenerator()
    attack = DynamicBackdoorGAN(
        generator,
        config["target_label"],
        config["poison_ratio"],
        classifier,
        epsilon=config["epsilon"]
    )
    x_poison, y_poison = attack.poison(x_train, y_train)

    classifier.fit(x_poison, y_poison, nb_epochs=config["epochs"], batch_size=config["batch_size"])
    poisoned_acc = np.mean(np.argmax(classifier.predict(x_test), axis=1) == np.argmax(y_test, axis=1))
    print(f"Poisoned Accuracy: {poisoned_acc * 100:.2f}%")

    asr = attack.evaluate(x_test, y_test)
    print(f"Attack Success Rate (ASR): {asr:.2f}%")

# ✅ Run
run_dynamic_backdoor_experiment(config)
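
# How to read the three numbers printed above: Clean Accuracy is the baseline on the
# untouched test set; Poisoned Accuracy is measured on the same clean test set after
# retraining on the partially poisoned data (for a stealthy backdoor it should stay
# close to the baseline); ASR is the percentage of trigger-stamped test images that
# the poisoned model classifies as the attacker's target label.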