@@ -19,10 +19,11 @@
 
 import logging
 import warnings
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Tuple, Union, Dict, Any, List
 
 import numpy as np
 import tensorflow as tf
+from numpy._typing import _64Bit
 from sklearn.base import ClusterMixin
 from sklearn.cluster import DBSCAN
 from tensorflow.keras import Model, Sequential
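A quick aside on the typing switch these imports enable (illustrative only, not part of the patch): np.array is a factory function rather than a type, and a parenthesized pair in a return annotation is just a runtime tuple, so static checkers flag both. The signatures below therefore move to np.ndarray and typing.Tuple, as in this minimal sketch with made-up function names:

# Minimal sketch, assuming mypy-style checking; the function names are illustrative.
import numpy as np
from typing import Tuple

def before(y: np.array) -> (np.ndarray, dict):  # flagged: np.array is a function, (A, B) is a tuple object
    ...

def after(y: np.ndarray) -> Tuple[np.ndarray, dict]:  # the style this PR converts signatures to
    ...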
@@ -38,11 +39,11 @@
 tf.get_logger().setLevel(logging.WARN)
 
 
-def _encode_labels(y: np.array) -> (np.array, set, np.array, dict):
+def _encode_labels(y: np.ndarray) -> Tuple[np.ndarray, set, np.ndarray, dict]:
     """
     Given the target column, it generates the label encoding and the reverse mapping to use in the classification process
 
-    :param y: 1D np.array with single values that represent the different classes
+    :param y: 1D np.ndarray with single values that represent the different classes
     :return: (y_encoded, unique_classes, label_mapping, reverse_mapping) encoded column, set of unique classes,
     mapping from class to numeric label, and mapping from numeric label to class
     """
@@ -58,7 +59,7 @@ def _calculate_centroid_tf(features):
     return tf.reduce_mean(features, axis=0)
 
 
-def _calculate_centroid(selected_indices: np.ndarray, features: np.array) -> np.ndarray:
+def _calculate_centroid(selected_indices: np.ndarray, features: np.ndarray) -> np.ndarray:
     """
     Returns the centroid of all data within a specific cluster that is classified as a specific class label
 
@@ -72,7 +73,9 @@ def _calculate_centroid(selected_indices: np.ndarray, features: np.array) -> np.
     return centroid.numpy()
 
 
-def _class_clustering(y: np.array, features: np.array, label: any, clusterer: ClusterMixin) -> (np.array, np.array):
+def _class_clustering(
+    y: np.ndarray, features: np.ndarray, label: Union[int, str], clusterer: ClusterMixin
+) -> Tuple[np.ndarray, np.ndarray]:
     """
     Given a class label, it clusters all the feature representations that map to that class
 
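For readers skimming the hunk above: a minimal sketch of what a per-class clustering helper with this signature plausibly does. The real body is outside the hunk, so treat this as an assumption rather than the project's implementation.

# Hypothetical stand-in for _class_clustering; not code from this diff.
import numpy as np
from sklearn.cluster import DBSCAN

def class_clustering_sketch(y, features, label, clusterer=None):
    clusterer = clusterer or DBSCAN()
    selected_indices = np.where(y == label)[0]              # rows mapped to this class
    cluster_labels = clusterer.fit_predict(features[selected_indices])
    return selected_indices, cluster_labels                 # Tuple[np.ndarray, np.ndarray]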
@@ -93,7 +96,7 @@ def _calculate_features(feature_representation_model, x):
     return feature_representation_model(x, training=False)
 
 
-def _feature_extraction(x_train: np.array, feature_representation_model: Model) -> np.ndarray:
+def _feature_extraction(x_train: np.ndarray, feature_representation_model: Model) -> np.ndarray:
     """
     Extract features from the model using the feature representation sub model.
 
@@ -108,7 +111,7 @@ def _feature_extraction(x_train: np.array, feature_representation_model: Model)
     # Process in batches to avoid memory issues
     batch_size = 256
     num_batches = int(np.ceil(len(data) / batch_size))
-    features = []
+    features: List[tf.Tensor] = []
 
     for i in range(num_batches):
         start_idx = i * batch_size
@@ -118,17 +121,18 @@ def _feature_extraction(x_train: np.array, feature_representation_model: Model)
         features.append(batch_features)
 
     # Concatenate all batches
+    final_features_tensor: tf.Tensor
     if len(features) > 1:
-        features = tf.concat(features, axis=0)
+        final_features_tensor = tf.concat(features, axis=0)
     else:
-        features = features[0]
+        final_features_tensor = features[0]
 
-    return features.numpy()
+    return final_features_tensor.numpy()
 
 
 def _cluster_classes(
-    y_train: np.array, unique_classes: set[int], features: np.array, clusterer: ClusterMixin
-) -> (np.array, dict):
+    y_train: np.ndarray, unique_classes: set[int], features: np.ndarray, clusterer: ClusterMixin
+) -> Tuple[np.ndarray, dict]:
     """
     Clusters all the classes in the given dataset into uniquely identifiable clusters.
 
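The hunk above only renames the accumulator so the variable keeps a single static type; for context, a condensed sketch of the batch-and-concatenate pattern it annotates, pieced together from the visible lines (model and data are placeholders, not the project's names):

# Condensed sketch of the batched feature-extraction pattern; illustrative only.
from typing import List
import numpy as np
import tensorflow as tf

def extract_features_sketch(model: tf.keras.Model, data: np.ndarray, batch_size: int = 256) -> np.ndarray:
    features: List[tf.Tensor] = []
    num_batches = int(np.ceil(len(data) / batch_size))
    for i in range(num_batches):
        batch = data[i * batch_size : (i + 1) * batch_size]
        features.append(model(batch, training=False))  # forward pass without training-time behaviour
    final = tf.concat(features, axis=0) if len(features) > 1 else features[0]
    return final.numpy()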
@@ -183,7 +187,7 @@ class ClusteringCentroidAnalysis(PoisonFilteringDefence):
     valid_clustering = ["DBSCAN"]
     valid_reduce = ["UMAP"]
 
-    def _get_benign_data(self) -> (np.ndarray, np.ndarray):
+    def _get_benign_data(self) -> Tuple[np.ndarray, np.ndarray]:
         """
         Retrieves the benign data from the training data using benign indices
 
@@ -194,7 +198,7 @@ def _get_benign_data(self) -> (np.ndarray, np.ndarray):
 
         return self.x_train[self.benign_indices], self.y_train[self.benign_indices]
 
-    def _extract_submodels(self, final_feature_layer_name: str) -> (Model, Model):
+    def _extract_submodels(self, final_feature_layer_name: str) -> Tuple[Model, Model]:
         """
         Extracts the feature representation and final classifier submodels from the original classifier.
         Composition of both models should result in the original model
@@ -204,7 +208,7 @@ def _extract_submodels(self, final_feature_layer_name: str) -> (Model, Model):
         :return: (feature_representation_submodel, classifying_submodel)
         """
         logging.info("Extracting submodels...")
-        keras_model = self.classifier.model
+        keras_model: Model = self.classifier.model
 
         try:
             final_feature_layer = keras_model.get_layer(name=final_feature_layer_name)
@@ -236,9 +240,9 @@ def _extract_submodels(self, final_feature_layer_name: str) -> (Model, Model):
 
         return feature_representation_model, classifying_submodel
 
-    def get_clusters(self) -> np.array:
+    def get_clusters(self) -> np.ndarray:
         """
-        :return: np.array with m+1 columns, where m is dimensionality of the dimensionality reducer's output.
+        :return: np.ndarray with m+1 columns, where m is dimensionality of the dimensionality reducer's output.
         m columns are used for feature representations and the last column is used for cluster label.
         """
         # Ensure features have been reduced and clustering has been performed
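To make the documented m+1 column layout concrete, a tiny illustration of the shape get_clusters() is described as returning (assumed from the docstring, not code in this patch):

# Illustrative layout only; the reduced features and labels here are random placeholders.
import numpy as np

reduced_features = np.random.rand(10, 2)             # m = 2 columns from the dimensionality reducer
cluster_labels = np.random.randint(0, 3, size=10)    # one cluster label per sample
clusters = np.column_stack([reduced_features, cluster_labels])  # shape (10, m + 1)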
@@ -259,7 +263,7 @@ def __init__(
         classifier: "CLASSIFIER_TYPE",
         x_train: np.ndarray,
         y_train: np.ndarray,
-        benign_indices: np.array,
+        benign_indices: np.ndarray,
         final_feature_layer_name: str,
         misclassification_threshold: float,
         reducer=UMAP(n_neighbors=5, min_dist=0),
@@ -313,7 +317,7 @@ def evaluate_defence(self, is_clean: np.ndarray, **kwargs) -> str:
 
         return confusion_matrix_json
 
-    def _calculate_misclassification_rate(self, class_label: int, deviation: np.array) -> np.float64:
+    def _calculate_misclassification_rate(self, class_label: int, deviation: np.ndarray) -> np.float64:
         """
         Calculate the misclassification rate when applying a deviation to other classes.
 
@@ -406,12 +410,12 @@ def predict_with_deviation(features, deviation):
 
         return np.float64(misclassified_elements) / np.float64(total_elements)
 
-    def detect_poison(self, **kwargs) -> (dict, list[int]):
+    def detect_poison(self, **kwargs) -> Tuple[dict, List[int]]:
 
         # saves important information about the algorithm execution for further analysis
-        report = dict()
+        report: Dict[str, Any] = dict()
 
-        self.is_clean = np.ones(len(self.y_train))
+        self.is_clean_np = np.ones(len(self.y_train))
 
         self.features = _feature_extraction(self.x_train, self.feature_representation_model)
 
@@ -428,7 +432,7 @@ def detect_poison(self, **kwargs) -> (dict, list[int]):
 
         # outliers are poisoned
         outlier_indices = np.where(self.class_cluster_labels == -1)[0]
-        self.is_clean[outlier_indices] = 0
+        self.is_clean_np[outlier_indices] = 0
 
         # cluster labels are saved in the report
         report["cluster_labels"] = self.get_clusters()
@@ -479,9 +483,11 @@ def detect_poison(self, **kwargs) -> (dict, list[int]):
         for cluster_label, mr in misclassification_rates.items():
             if mr >= 1 - self.misclassification_threshold:
                 cluster_indices = np.where(self.class_cluster_labels == cluster_label)[0]
-                self.is_clean[cluster_indices] = 0
+                self.is_clean_np[cluster_indices] = 0
                 logging.info(
                     f"Cluster k={cluster_label} i={self.cluster_class_mapping[cluster_label]} considered poison ({misclassification_rates[cluster_label]} >= {1 - self.misclassification_threshold})"
                 )
 
+        # Forced conversion for interface consistency
+        self.is_clean: List[int] = self.is_clean_np.tolist()
         return report, self.is_clean.copy()
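With the return annotation tightened to Tuple[dict, List[int]] and is_clean converted to a plain list before being returned, callers can rely on list semantics. A hedged usage sketch (defence stands for an already-configured ClusteringCentroidAnalysis instance; it is a placeholder, not code from this PR):

# Usage sketch only; 'defence' is assumed to be a constructed ClusteringCentroidAnalysis.
report, is_clean = defence.detect_poison()
poisoned_indices = [i for i, flag in enumerate(is_clean) if flag == 0]
print(f"{len(poisoned_indices)} samples flagged as poison")
print(report["cluster_labels"].shape)  # (n_samples, m + 1), per the get_clusters docstring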