15
15
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
16
16
# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
17
17
# SOFTWARE.
18
- from __future__ import absolute_import , division , print_function , unicode_literals , annotations
18
+ from __future__ import (
19
+ absolute_import ,
20
+ division ,
21
+ print_function ,
22
+ unicode_literals ,
23
+ annotations ,
24
+ )
19
25
20
26
import logging
21
27
import warnings
22
- from typing import TYPE_CHECKING , Tuple , Union , Dict , Any , List
28
+ from typing import TYPE_CHECKING , Any
23
29
24
30
import numpy as np
25
31
import tensorflow as tf
26
- from numpy ._typing import _64Bit
27
32
from sklearn .base import ClusterMixin
28
33
from sklearn .cluster import DBSCAN
29
34
from tensorflow .keras import Model , Sequential
39
44
tf .get_logger ().setLevel (logging .WARN )
40
45
41
46
42
- def _encode_labels (y : np .ndarray ) -> Tuple [np .ndarray , set , np .ndarray , dict ]:
47
+ def _encode_labels (y : np .ndarray ) -> tuple [np .ndarray , set , np .ndarray , dict ]:
43
48
"""
44
- Given the target column, it generates the label encoding and the reverse mapping to use in the classification process
49
+ Given the target column, it generates the label encoding and the reverse mapping to use in the
50
+ classification process
45
51
46
52
:param y: 1D np.ndarray with single values that represent the different classes
47
53
:return: (y_encoded, unique_classes, label_mapping, reverse_mapping) encoded column, set of unique classes,
@@ -74,25 +80,34 @@ def _calculate_centroid(selected_indices: np.ndarray, features: np.ndarray) -> n
74
80
75
81
76
82
def _class_clustering (
77
- y : np .ndarray , features : np .ndarray , label : Union [ int , str ] , clusterer : ClusterMixin
78
- ) -> Tuple [np .ndarray , np .ndarray ]:
83
+ y : np .ndarray , features : np .ndarray , label : int | str , clusterer : ClusterMixin
84
+ ) -> tuple [np .ndarray , np .ndarray ]:
79
85
"""
80
86
Given a class label, it clusters all the feature representations that map to that class
81
87
82
88
:param y: array of n class labels
83
89
:param label: class label in the classification task
84
90
:param features: numpy array d-dimensional features for n data entries
85
- :return: (cluster_labels, selected_indices) ndarrays of equal size with cluster labels and corresponding original indices.
91
+ :param clusterer: clustering algorithm used
92
+ :return: (cluster_labels, selected_indices) ndarrays of equal size with cluster labels and corresponding
93
+ original indices.
86
94
"""
87
- logging .info (f "Clustering class { label } ..." )
95
+ logging .info ("Clustering class %s ..." , label )
88
96
selected_indices = np .where (y == label )[0 ]
89
97
selected_features = features [selected_indices ]
90
98
cluster_labels = clusterer .fit_predict (selected_features )
91
99
return cluster_labels , selected_indices
92
100
93
101
94
102
@tf .function
95
- def _calculate_features (feature_representation_model , x ):
103
+ def _calculate_features (feature_representation_model : Model , x : np .ndarray ) -> np .ndarray :
104
+ """
105
+ Calculates the features using the first DNN slice
106
+
107
+ :param feature_representation_model: DNN submodel from input up to feature abstraction
108
+ :param x: input data
109
+ :return: features array
110
+ """
96
111
return feature_representation_model (x , training = False )
97
112
98
113
@@ -101,6 +116,7 @@ def _feature_extraction(x_train: np.ndarray, feature_representation_model: Model
101
116
Extract features from the model using the feature representation sub model.
102
117
103
118
:param x_train: numpy array d-dimensional features for n data entries. Features are extracted from here
119
+ :param feature_representation_model: DNN submodel from input up to feature abstraction
104
120
:return: features. numpy array of features
105
121
"""
106
122
# Convert data to TensorFlow tensors if needed
@@ -111,7 +127,7 @@ def _feature_extraction(x_train: np.ndarray, feature_representation_model: Model
111
127
# Process in batches to avoid memory issues
112
128
batch_size = 256
113
129
num_batches = int (np .ceil (len (data ) / batch_size ))
114
- features : List [tf .Tensor ] = []
130
+ features : list [tf .Tensor ] = []
115
131
116
132
for i in range (num_batches ):
117
133
start_idx = i * batch_size
@@ -131,25 +147,33 @@ def _feature_extraction(x_train: np.ndarray, feature_representation_model: Model
131
147
132
148
133
149
def _cluster_classes (
134
- y_train : np .ndarray , unique_classes : set [int ], features : np .ndarray , clusterer : ClusterMixin
135
- ) -> Tuple [np .ndarray , dict ]:
150
+ y_train : np .ndarray ,
151
+ unique_classes : set [int ],
152
+ features : np .ndarray ,
153
+ clusterer : ClusterMixin ,
154
+ ) -> tuple [np .ndarray , dict ]:
136
155
"""
137
156
Clusters all the classes in the given dataset into uniquely identifiable clusters.
138
157
158
+ :param y_train: numpy array of labels for n data entries
159
+ :param unique_classes: set of unique classes
139
160
:param features: feature representations' array of n rows
161
+ :param clusterer: clustering algorithm used
140
162
:return: (class_cluster_labels, cluster_class_mapping)
141
163
"""
142
164
# represents the number of clusters used up until now to differentiate clusters obtained in different
143
165
# clustering runs by classes
144
166
logging .info ("Clustering classes..." )
145
167
used_cluster_labels = 0
146
- cluster_class_mapping = dict ()
168
+ cluster_class_mapping = {}
147
169
class_cluster_labels = np .full (len (y_train ), - 1 )
148
170
149
- logging .info ( f "Unique classes are: { unique_classes } " )
171
+ logging .debug ( "Unique classes are: %s" , unique_classes )
150
172
151
173
for class_label in unique_classes :
152
- cluster_labels , selected_indices = _class_clustering (y_train , features , class_label , clusterer )
174
+ cluster_labels , selected_indices = _class_clustering (
175
+ y_train , features , class_label , clusterer
176
+ )
153
177
# label values are adjusted to account for labels of previous clustering tasks
154
178
cluster_labels [cluster_labels != - 1 ] += used_cluster_labels
155
179
used_cluster_labels += len (np .unique (cluster_labels [cluster_labels != - 1 ]))
@@ -187,39 +211,46 @@ class ClusteringCentroidAnalysis(PoisonFilteringDefence):
187
211
valid_clustering = ["DBSCAN" ]
188
212
valid_reduce = ["UMAP" ]
189
213
190
- def _get_benign_data (self ) -> Tuple [np .ndarray , np .ndarray ]:
214
+ def _get_benign_data (self ) -> tuple [np .ndarray , np .ndarray ]:
191
215
"""
192
216
Retrieves the benign data from the training data using benign indices
193
217
194
218
:return: (x_benign, y_benign) ndarrays with the benign data.
195
219
"""
196
220
if len (self .benign_indices ) == 0 :
197
- raise ValueError (f"Benign indices passed ({ len (self .benign_indices )} ) are not enough to run the algorithm" )
221
+ raise ValueError (
222
+ f"Benign indices passed ({ len (self .benign_indices )} ) are not enough to run the algorithm"
223
+ )
198
224
199
225
return self .x_train [self .benign_indices ], self .y_train [self .benign_indices ]
200
226
201
- def _extract_submodels (self , final_feature_layer_name : str ) -> Tuple [Model , Model ]:
227
+ def _extract_submodels (self , final_feature_layer_name : str ) -> tuple [Model , Model ]:
202
228
"""
203
229
Extracts the feature representation and final classifier submodels from the original classifier.
204
230
Composition of both models should result in the original model
205
231
206
- :param final_feature_layer_name: Name of the final layer in feature abstraction. Should be a ReLu-activated layer
207
- as suggested in the paper.
232
+ :param final_feature_layer_name: Name of the final layer in feature abstraction.
233
+ Should be a ReLU-activated layer as suggested in the paper.
208
234
:return: (feature_representation_submodel, classifying_submodel)
209
235
"""
210
236
logging .info ("Extracting submodels..." )
211
237
keras_model : Model = self .classifier .model
212
238
213
239
try :
214
240
final_feature_layer = keras_model .get_layer (name = final_feature_layer_name )
215
- except ValueError :
216
- raise ValueError (f"Layer with name '{ final_feature_layer_name } ' not found in the model." )
241
+ except ValueError as exc :
242
+ raise ValueError (
243
+ f"Layer with name '{ final_feature_layer_name } ' not found in the model."
244
+ ) from exc
217
245
218
246
if (
219
247
not hasattr (final_feature_layer , "activation" )
220
248
or final_feature_layer .activation != tf .keras .activations .relu
221
249
):
222
- warnings .warn (f"Final feature layer '{ final_feature_layer_name } ' must have a ReLU activation." , UserWarning )
250
+ warnings .warn (
251
+ f"Final feature layer '{ final_feature_layer_name } ' must have a ReLU activation." ,
252
+ UserWarning ,
253
+ )
223
254
224
255
# Create a feature representation submodel with weight sharing
225
256
feature_representation_model = Model (
@@ -284,11 +315,18 @@ def __init__(
284
315
self .reducer = reducer
285
316
self .clusterer = clusterer
286
317
self .benign_indices = benign_indices
287
- self .y_train , self .unique_classes , self .class_mapping , self .reverse_class_mapping = _encode_labels (y_train )
318
+ (
319
+ self .y_train ,
320
+ self .unique_classes ,
321
+ self .class_mapping ,
322
+ self .reverse_class_mapping ,
323
+ ) = _encode_labels (y_train )
288
324
289
325
self .x_benign , self .y_benign = self ._get_benign_data ()
290
326
291
- self .feature_representation_model , self .classifying_submodel = self ._extract_submodels (final_feature_layer_name )
327
+ self .feature_representation_model , self .classifying_submodel = self ._extract_submodels (
328
+ final_feature_layer_name
329
+ )
292
330
293
331
self .misclassification_threshold = np .float64 (misclassification_threshold )
294
332
logger .info ("CCA object created successfully." )
@@ -311,13 +349,16 @@ def evaluate_defence(self, is_clean: np.ndarray, **kwargs) -> str:
311
349
is_clean_by_class .append (is_clean [class_indices ])
312
350
313
351
# Create evaluator and analyze results
314
- errors_by_class , confusion_matrix_json = evaluator .analyze_correctness (
315
- assigned_clean_by_class = assigned_clean_by_class , is_clean_by_class = is_clean_by_class
352
+ _ , confusion_matrix_json = evaluator .analyze_correctness (
353
+ assigned_clean_by_class = assigned_clean_by_class ,
354
+ is_clean_by_class = is_clean_by_class ,
316
355
)
317
356
318
357
return confusion_matrix_json
319
358
320
- def _calculate_misclassification_rate (self , class_label : int , deviation : np .ndarray ) -> np .float64 :
359
+ def _calculate_misclassification_rate (
360
+ self , class_label : int , deviation : np .ndarray
361
+ ) -> np .float64 :
321
362
"""
322
363
Calculate the misclassification rate when applying a deviation to other classes.
323
364
@@ -404,16 +445,20 @@ def predict_with_deviation(features, deviation):
404
445
return np .float64 (0.0 )
405
446
406
447
all_f_vectors_np = np .concatenate (all_features , axis = 0 )
407
- logger .info (
408
- f"MR --> { class_label } , |f| = { np .linalg .norm (np .mean (all_f_vectors_np , axis = 0 ))} : { misclassified_elements } / { total_elements } = { np .float64 (misclassified_elements ) / np .float64 (total_elements )} "
448
+ logger .debug (
449
+ "MR --> %s , |f| = %s: %s / %s = %s" ,
450
+ class_label ,
451
+ np .linalg .norm (np .mean (all_f_vectors_np , axis = 0 )),
452
+ misclassified_elements ,
453
+ total_elements ,
454
+ np .float64 (misclassified_elements ) / np .float64 (total_elements ),
409
455
)
410
456
411
457
return np .float64 (misclassified_elements ) / np .float64 (total_elements )
412
458
413
- def detect_poison (self , ** kwargs ) -> Tuple [dict , List [int ]]:
414
-
459
+ def detect_poison (self , ** kwargs ) -> tuple [dict , list [int ]]:
415
460
# saves important information about the algorithm execution for further analysis
416
- report : Dict [str , Any ] = dict ()
461
+ report : dict [str , Any ] = {}
417
462
418
463
self .is_clean_np = np .ones (len (self .y_train ))
419
464
@@ -436,58 +481,75 @@ def detect_poison(self, **kwargs) -> Tuple[dict, List[int]]:
436
481
437
482
# cluster labels are saved in the report
438
483
report ["cluster_labels" ] = self .get_clusters ()
439
- report ["cluster_data" ] = dict ()
484
+ report ["cluster_data" ] = {}
440
485
441
486
logging .info ("Calculating real centroids..." )
442
- real_centroids = dict ()
487
+ real_centroids = {}
443
488
444
489
# for each cluster found for each target class
445
490
for label in np .unique (self .class_cluster_labels [self .class_cluster_labels != - 1 ]):
446
491
selected_elements = np .where (self .class_cluster_labels == label )[0 ]
447
492
real_centroids [label ] = _calculate_centroid (selected_elements , self .features )
448
493
449
- report ["cluster_data" ][label ] = dict ()
494
+ report ["cluster_data" ][label ] = {}
450
495
report ["cluster_data" ][label ]["size" ] = len (selected_elements )
451
496
452
497
logging .info ("Calculating benign centroids..." )
453
- benign_centroids = dict ()
498
+ benign_centroids = {}
454
499
455
- logger .info (f "Target classes are: { self .unique_classes } " )
500
+ logger .info ("Target classes are: %s" , self .unique_classes )
456
501
457
502
# for each target class
458
503
for class_label in self .unique_classes :
459
- benign_class_indices = np .intersect1d (self .benign_indices , np .where (self .y_train == class_label )[0 ])
504
+ benign_class_indices = np .intersect1d (
505
+ self .benign_indices , np .where (self .y_train == class_label )[0 ]
506
+ )
460
507
benign_centroids [class_label ] = _calculate_centroid (benign_class_indices , self .features )
461
508
462
509
logging .info ("Calculating misclassification rates..." )
463
- misclassification_rates = dict ()
510
+ misclassification_rates = {}
464
511
465
512
for cluster_label , centroid in real_centroids .items ():
466
513
class_label = self .cluster_class_mapping [cluster_label ]
467
514
# B^k_i
468
515
deviation = centroid - benign_centroids [class_label ]
469
516
470
517
# MR^k_i
471
- # with unique cluster labels for each cluster in each clustering run, the label already maps to a target class
472
- misclassification_rates [cluster_label ] = self ._calculate_misclassification_rate (class_label , deviation )
518
+ # with unique cluster labels for each cluster in each clustering run, the label
519
+ # already maps to a target class
520
+ misclassification_rates [cluster_label ] = self ._calculate_misclassification_rate (
521
+ class_label , deviation
522
+ )
473
523
logging .info (
474
- f"MR (k={ cluster_label } , i={ class_label } , |d|={ np .linalg .norm (deviation )} ) = { misclassification_rates [cluster_label ]} "
524
+ "MR (k=%s, i=%s, |d|=%s) = %s" ,
525
+ cluster_label ,
526
+ class_label ,
527
+ np .linalg .norm (deviation ), # NOTE: the norm is always computed eagerly; only the message formatting is deferred until the log is emitted
528
+ misclassification_rates [cluster_label ],
475
529
)
476
530
477
- report ["cluster_data" ][cluster_label ]["centroid_l2" ] = np .linalg .norm (real_centroids [cluster_label ])
531
+ report ["cluster_data" ][cluster_label ]["centroid_l2" ] = np .linalg .norm (
532
+ real_centroids [cluster_label ]
533
+ )
478
534
report ["cluster_data" ][cluster_label ]["deviation_l2" ] = np .linalg .norm (deviation )
479
535
report ["cluster_data" ][cluster_label ]["class" ] = class_label
480
- report ["cluster_data" ][cluster_label ]["misclassification_rate" ] = misclassification_rates [cluster_label ]
536
+ report ["cluster_data" ][cluster_label ]["misclassification_rate" ] = (
537
+ misclassification_rates [cluster_label ]
538
+ )
481
539
482
540
logging .info ("Evaluating cluster misclassification..." )
483
541
for cluster_label , mr in misclassification_rates .items ():
484
542
if mr >= 1 - self .misclassification_threshold :
485
543
cluster_indices = np .where (self .class_cluster_labels == cluster_label )[0 ]
486
544
self .is_clean_np [cluster_indices ] = 0
487
545
logging .info (
488
- f"Cluster k={ cluster_label } i={ self .cluster_class_mapping [cluster_label ]} considered poison ({ misclassification_rates [cluster_label ]} >= { 1 - self .misclassification_threshold } )"
546
+ "Cluster k=%s i=%s considered poison (%s >= %s)" ,
547
+ cluster_label ,
548
+ self .cluster_class_mapping [cluster_label ],
549
+ misclassification_rates [cluster_label ],
550
+ 1 - self .misclassification_threshold ,
489
551
)
490
552
491
553
# Forced conversion for interface consistency
492
- self .is_clean : List [int ] = self .is_clean_np .tolist ()
554
+ self .is_clean : list [int ] = self .is_clean_np .tolist ()
493
555
return report , self .is_clean .copy ()
0 commit comments