from Adversarial_Observation import AdversarialTester, ParticleSwarm

from sklearn.metrics import roc_auc_score, average_precision_score
+import matplotlib.pyplot as plt

-import sys
+import sys, os, json

def evaluate_model(model, test_dataset):
    """
@@ -73,9 +74,10 @@ def load_MNIST_model(model_path=None):

def normalize_mnist(x):
    """Applies mean/std normalization to MNIST image"""
-    mean = 0.1307
-    std = 0.3081
-    return (x / 255.0 - mean) / std
+    # mean = 0.1307
+    # std = 0.3081
+    # return ((x / 255.0) - mean) / std
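+    # NOTE: with this change, inputs are only scaled to [0, 1]; the mean/std standardization is kept above for reference.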
+    return x / 255.0

def load_data(batch_size=32):
    """
@@ -143,7 +145,7 @@ def train(model: tf.keras.Model, train_dataset: tf.data.Dataset, epochs: int = 1

    return model

-def adversarial_attack_blackbox(model: tf.keras.Model, dataset: tf.data.Dataset, image_index: int, num_iterations: int = 30, num_particles: int = 100) -> tf.data.Dataset:
+def adversarial_attack_blackbox(model: tf.keras.Model, dataset: tf.data.Dataset, image_index: int, output_dir: str = 'results', num_iterations: int = 30, num_particles: int = 100) -> None:
    """
    Performs a black-box adversarial attack on a specific image in the dataset using Particle Swarm optimization.

@@ -178,7 +180,7 @@ def adversarial_attack_blackbox(model: tf.keras.Model, dataset: tf.data.Dataset,
        "Target classes should be different for misclassification."

    # Create a noisy input set for black-box attack
-    input_set = [single_image_input + np.random.normal(0, 0.75, single_image_input.shape) for _ in range(num_particles)]
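+    # Each particle starts from the original image plus uniform [0, 1) noise applied to a random ~90% of pixels (mask: rand < 0.9).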
+    input_set = [single_image_input + (np.random.uniform(0, 1, single_image_input.shape) * (np.random.rand(*single_image_input.shape) < 0.9)) for _ in range(num_particles)]
    input_set = np.stack(input_set)

    print(f"Original class: {single_image_target}")
@@ -187,17 +189,106 @@ def adversarial_attack_blackbox(model: tf.keras.Model, dataset: tf.data.Dataset,
    # Initialize the Particle Swarm optimizer with the model and input set
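+    # (Hyperparameter semantics below are assumed from the Adversarial_Observation ParticleSwarm API: epsilon bounds the
+    #  perturbation step, while the inertia/cognitive/social weights, momentum, and velocity clamp tune swarm dynamics.)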
    attacker = ParticleSwarm(
        model, input_set, single_misclassification_target, num_iterations=num_iterations,
-        epsilon=0.8, save_dir='results', inertia_weight=0.8, cognitive_weight=0.5,
-        social_weight=0.5, momentum=0.9, velocity_clamp=0.1
+        epsilon=1, save_dir=output_dir, inertia_weight=1, cognitive_weight=0.8,
+        social_weight=0.5, momentum=0.9, velocity_clamp=0.2
    )
+
    attacker.optimize()

+    analysis(attacker, single_image_input, single_misclassification_target)
+
+def analysis(attacker, single_misclassification_input: np.ndarray, single_misclassification_target):
+    """
+    Analyzes the results of the attack and generates plots.
+    - Saves the original input image and the misclassification target.
+    - For each particle and each position in the particle's history:
+        - Save the position (perturbed image).
+        - Save all confidence values.
+        - Save the maximum softmax confidence and its class.
+        - Save the difference from the original input.
+    """
+    # Save the original image and its classification
+    plt.imsave(os.path.join(attacker.save_dir, "original.png"), single_misclassification_input.squeeze(), cmap="gray", vmin=0, vmax=1)
+
+    analysis_results = {
+        "original_misclassification_input": single_misclassification_input.tolist(),
+        "original_misclassification_target": int(single_misclassification_target),
+        "particles": []
+    }
+
+    # Process each particle in the attacker's particles list
+    for particle_idx, particle in enumerate(attacker.particles):
+        print(f"Processing particle: {particle_idx}")
+        particle_data = {
+            "particle_index": particle_idx,
+            "positions": [],
+            "confidence_values": [],
+            "max_output_values": [],
+            "max_output_classes": [],
+            "differences_from_original": [],
+            "confidence_over_time": []  # Store confidence over time
+        }
+
+        for step_idx, position in enumerate(particle.history):
+            # Ensure 'position' is a numpy array.
+            if isinstance(position, tf.Tensor):
+                position_np = position.numpy()
+            else:
+                position_np = np.array(position)
+
+            output = attacker.model(position_np)
+
+            # Remove the batch dimension and apply softmax
+            softmax_output = tf.nn.softmax(tf.squeeze(output), axis=0)
+            confidence_values = softmax_output.numpy().tolist()
+            max_output_value = float(max(confidence_values))
+            max_output_class = confidence_values.index(max_output_value)
+
+            # Calculate pixel-wise difference from original image (before attack)
+            # diff_image = np.abs(position_np - single_misclassification_input)[0]
+            diff_image = (position_np - single_misclassification_input)[0]
+            # print(position_np)
+            # print(single_misclassification_input)
+            # print(diff_image)
+            # Save the difference image
+            iteration_folder = os.path.join(attacker.save_dir, f"iteration_{step_idx + 1}")
+            if not os.path.exists(iteration_folder):
+                os.makedirs(iteration_folder)
+            plt.imsave(os.path.join(iteration_folder, f"attack-vector_image_{particle_idx + 1}.png"), diff_image.squeeze(), cmap="seismic", vmin=-1, vmax=1)
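+            # The "seismic" colormap is centered at zero, so red/blue show pixels brightened/darkened relative to the original.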
+
+            # L2 distance from the original image (before attack)
+            difference_from_original = float(np.linalg.norm(position_np - single_misclassification_input))
+
+            # Add data for this step to the particle_data
+            particle_data["positions"].append(position_np.tolist())
+            particle_data["confidence_values"].append(confidence_values)
+            particle_data["max_output_values"].append(max_output_value)
+            particle_data["max_output_classes"].append(max_output_class)
+            particle_data["differences_from_original"].append(difference_from_original)
+            particle_data["confidence_over_time"].append(max_output_value)  # Store max output (confidence)
+
+        # Append the particle's data to the main analysis results
+        analysis_results["particles"].append(particle_data)
+
+    # Save the analysis results to a JSON file
+    output_dir = attacker.save_dir  # Use the save_dir from the attacker
+    os.makedirs(output_dir, exist_ok=True)
+    file_path = os.path.join(output_dir, "attack_analysis.json")
+
+    with open(file_path, "w") as f:
+        json.dump(analysis_results, f, indent=4)
+
+    print(f"Analysis results saved to {file_path}")
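+    # The saved JSON can be reloaded later (e.g. with json.load) to plot each particle's "confidence_over_time".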
+
def main() -> None:
    """
    Main function to execute the adversarial attack workflow.
    """
    parser = argparse.ArgumentParser(description="Adversarial attack workflow with optional pre-trained Keras model.")
    parser.add_argument('--model_path', type=str, default=None, help="Path to a pre-trained Keras model.")
+    parser.add_argument('--iterations', type=int, default=50, help="Number of iterations for the black-box attack.")
+    parser.add_argument('--particles', type=int, default=100, help="Number of particles for the black-box attack.")
+    parser.add_argument('--save_dir', type=str, default="analysis_results", help="Directory to save analysis results.")
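+    # Example invocation (script name illustrative): python mnist_attack.py --iterations 50 --particles 100 --save_dir results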
    args = parser.parse_args()

    #seed_everything(1252025)
@@ -219,7 +310,7 @@ def main() -> None:
    evaluate_model(model, test_dataset)

    # Perform adversarial attack
-    adversarial_dataset = adversarial_attack_blackbox(model, test_dataset, 0, 50, 100)
+    adversarial_attack_blackbox(model, test_dataset, 0, output_dir=args.save_dir, num_iterations=args.iterations, num_particles=args.particles)

if __name__ == "__main__":
    main()