2424from __future__ import absolute_import , division , print_function , unicode_literals
2525
2626import logging
27- from typing import TYPE_CHECKING , Tuple
27+ from typing import TYPE_CHECKING , Tuple , Union
2828
2929import numpy as np
3030import scipy .signal as ss
3131
3232from art .attacks .attack import EvasionAttack
33+ from art .estimators .estimator import BaseEstimator , LossGradientsMixin , NeuralNetworkMixin
34+ from art .estimators .pytorch import PyTorchEstimator
3335from art .estimators .speech_recognition .speech_recognizer import SpeechRecognizerMixin
3436from art .estimators .tensorflow import TensorFlowV2Estimator
3537from art .utils import pad_sequence_input
3638
3739if TYPE_CHECKING :
3840 from tensorflow .compat .v1 import Tensor
41+ from torch import Tensor as PTensor
42+
43+ from art .utils import SPEECH_RECOGNIZER_TYPE
3944
4045logger = logging .getLogger (__name__ )
4146
@@ -58,11 +63,11 @@ class ImperceptibleASR(EvasionAttack):
5863 "batch_size" ,
5964 ]
6065
61- _estimator_requirements = (TensorFlowV2Estimator , SpeechRecognizerMixin )
66+ _estimator_requirements = (NeuralNetworkMixin , LossGradientsMixin , BaseEstimator , SpeechRecognizerMixin )
6267
6368 def __init__ (
6469 self ,
65- estimator : "TensorFlowV2Estimator " ,
70+ estimator : "SPEECH_RECOGNIZER_TYPE " ,
6671 masker : "PsychoacousticMasker" ,
6772 eps : float = 2000.0 ,
6873 learning_rate_1 : float = 100.0 ,
@@ -85,10 +90,6 @@ def __init__(
8590 :param max_iter_2: Number of iterations for stage 2 of attack.
8691 :param batch_size: Batch size.
8792 """
88- import tensorflow .compat .v1 as tf1
89-
90- # disable eager execution as Lingvo uses tensorflow.compat.v1 API
91- tf1 .disable_eager_execution ()
9293
9394 # Super initialization
9495 super ().__init__ (estimator = estimator )
@@ -108,16 +109,34 @@ def __init__(
108109 self ._hop_size = masker .hop_size
109110 self ._sample_rate = masker .sample_rate
110111
111- # TensorFlow placeholders
112- self ._delta = tf1 .placeholder (tf1 .float32 , shape = [None , None ], name = "art_delta" )
113- self ._power_spectral_density_maximum_tf = tf1 .placeholder (tf1 .float32 , shape = [None , None ], name = "art_psd_max" )
114- self ._masking_threshold_tf = tf1 .placeholder (
115- tf1 .float32 , shape = [None , None , None ], name = "art_masking_threshold"
116- )
117- # TensorFlow loss gradient ops
118- self ._loss_gradient_masking_threshold_op_tf = self ._loss_gradient_masking_threshold_tf (
119- self ._delta , self ._power_spectral_density_maximum_tf , self ._masking_threshold_tf
120- )
112+ if isinstance (self .estimator , TensorFlowV2Estimator ):
113+ import tensorflow .compat .v1 as tf1
114+
115+ # set framework attribute
116+ self ._framework = "tensorflow"
117+
118+ # disable eager execution and use tensorflow.compat.v1 API, e.g. Lingvo uses TF2v1 API
119+ tf1 .disable_eager_execution ()
120+
121+ # TensorFlow placeholders
122+ self ._delta = tf1 .placeholder (tf1 .float32 , shape = [None , None ], name = "art_delta" )
123+ self ._power_spectral_density_maximum_tf = tf1 .placeholder (
124+ tf1 .float32 , shape = [None , None ], name = "art_psd_max"
125+ )
126+ self ._masking_threshold_tf = tf1 .placeholder (
127+ tf1 .float32 , shape = [None , None , None ], name = "art_masking_threshold"
128+ )
129+ # TensorFlow loss gradient ops
130+ self ._loss_gradient_masking_threshold_op_tf = self ._loss_gradient_masking_threshold_tf (
131+ self ._delta , self ._power_spectral_density_maximum_tf , self ._masking_threshold_tf
132+ )
133+
134+ elif isinstance (self .estimator , PyTorchEstimator ):
135+ # set framework attribute
136+ self ._framework = "pytorch"
137+ else :
138+ # set framework attribute
139+ self ._framework = None
121140
122141 def generate (self , x : np .ndarray , y : np .ndarray , ** kwargs ) -> np .ndarray :
123142 """
@@ -235,10 +254,12 @@ def _create_imperceptible(self, x: np.ndarray, x_adversarial: np.ndarray, y: np.
235254 if x .ndim != 1 :
236255 alpha = np .expand_dims (alpha , axis = - 1 )
237256
238- perturbation = x_adversarial - x
239257 x_perturbed = x_adversarial .copy ()
240258
241259 for i in range (self .max_iter_2 ):
260+ # get perturbation
261+ perturbation = x_perturbed - x
262+
242263 # get loss gradients of both losses
243264 gradients_net = self .estimator .loss_gradient (x_perturbed , y , batch_mode = True )
244265 gradients_theta , loss_theta = self ._loss_gradient_masking_threshold (perturbation , x )
@@ -305,13 +326,21 @@ def _loss_gradient_masking_threshold(
305326 masking_threshold_stabilized = 10 ** (masking_threshold * 0.1 )
306327 psd_maximum_stabilized = 10 ** (psd_maximum * 0.1 )
307328
308- # get loss gradients (TensorFlow)
309- feed_dict = {
310- self ._delta : perturbation_padded ,
311- self ._power_spectral_density_maximum_tf : psd_maximum_stabilized ,
312- self ._masking_threshold_tf : masking_threshold_stabilized ,
313- }
314- gradients_padded , loss = self .estimator ._sess .run (self ._loss_gradient_masking_threshold_op_tf , feed_dict )
329+ if self ._framework == "tensorflow" :
330+ # get loss gradients (TensorFlow)
331+ feed_dict = {
332+ self ._delta : perturbation_padded ,
333+ self ._power_spectral_density_maximum_tf : psd_maximum_stabilized ,
334+ self ._masking_threshold_tf : masking_threshold_stabilized ,
335+ }
336+ gradients_padded , loss = self .estimator ._sess .run (self ._loss_gradient_masking_threshold_op_tf , feed_dict )
337+ elif self ._framework == "pytorch" :
338+ # get loss gradients (PyTorch)
339+ gradients_padded , loss = self ._loss_gradient_masking_threshold_torch (
340+ perturbation_padded , psd_maximum_stabilized , masking_threshold_stabilized
341+ )
342+ else :
343+ raise NotImplementedError
315344
316345 # undo padding, i.e. change gradients shape from (nb_samples, max_length) to (nb_samples)
317346 lengths = delta_mask .sum (axis = 1 )
@@ -320,11 +349,11 @@ def _loss_gradient_masking_threshold(
320349 gradient = gradient_padded [:length ]
321350 gradients .append (gradient )
322351
323- return np .array (gradients ), loss
352+ return np .array (gradients , dtype = object ), loss
324353
325354 def _loss_gradient_masking_threshold_tf (
326355 self , perturbation : "Tensor" , psd_maximum_stabilized : "Tensor" , masking_threshold_stabilized : "Tensor"
327- ) -> "Tensor" :
356+ ) -> Union [ "Tensor" , "Tensor" ] :
328357 """
329358 Compute loss gradient of the masking threshold loss in TensorFlow.
330359
@@ -351,6 +380,41 @@ def _loss_gradient_masking_threshold_tf(
351380 loss_gradient = tf1 .gradients (loss , [perturbation ])[0 ]
352381 return loss_gradient , loss
353382
383+ def _loss_gradient_masking_threshold_torch (
384+ self , perturbation : np .ndarray , psd_maximum_stabilized : np .ndarray , masking_threshold_stabilized : np .ndarray
385+ ) -> Union [np .ndarray , np .ndarray ]:
386+ """
387+ Compute loss gradient of the masking threshold loss in PyTorch.
388+
389+ See also `ImperceptibleASR._loss_gradient_masking_threshold_tf`.
390+ """
391+ import torch
392+
393+ # define tensors
394+ perturbation_torch = torch .from_numpy (perturbation ).to (self .estimator ._device )
395+ masking_threshold_stabilized_torch = torch .from_numpy (masking_threshold_stabilized ).to (self .estimator ._device )
396+ psd_maximum_stabilized_torch = torch .from_numpy (psd_maximum_stabilized ).to (self .estimator ._device )
397+
398+ # track gradient of perturbation
399+ perturbation_torch .requires_grad = True
400+
401+ # calculate approximate power spectral density
402+ psd_perturbation = self ._approximate_power_spectral_density_torch (
403+ perturbation_torch , psd_maximum_stabilized_torch
404+ )
405+
406+ # calculate hinge loss
407+ loss = torch .mean (
408+ torch .nn .functional .relu (psd_perturbation - masking_threshold_stabilized_torch ), dim = (1 , 2 ), keepdims = False
409+ )
410+
411+ # compute loss gradient
412+ loss .sum ().backward ()
413+ loss_gradient = perturbation_torch .grad .cpu ().numpy ()
414+ loss_value = loss .detach ().cpu ().numpy ()
415+
416+ return loss_gradient , loss_value
417+
354418 def _approximate_power_spectral_density_tf (
355419 self , perturbation : "Tensor" , psd_maximum_stabilized : "Tensor"
356420 ) -> "Tensor" :
@@ -381,13 +445,37 @@ def _approximate_power_spectral_density_tf(
381445 # return PSD matrix such that shape is (batch_size, window_size // 2 + 1, frame_length)
382446 return tf1 .transpose (psd_matrix_approximated , [0 , 2 , 1 ])
383447
384- def _approximate_power_spectral_density_torch (self ):
385- """Approximate the power spectral density for a perturbation `perturbation` in PyTorch."""
386- raise NotImplementedError
448+ def _approximate_power_spectral_density_torch (
449+ self , perturbation : "PTensor" , psd_maximum_stabilized : "PTensor"
450+ ) -> "PTensor" :
451+ """
452+ Approximate the power spectral density for a perturbation `perturbation` in PyTorch.
453+
454+ See also `ImperceptibleASR._approximate_power_spectral_density_tf`.
455+ """
456+ import torch
457+
458+ # compute short-time Fourier transform (STFT)
459+ stft_matrix = torch .stft (
460+ perturbation ,
461+ n_fft = self ._window_size ,
462+ hop_length = self ._hop_size ,
463+ win_length = self ._window_size ,
464+ center = False ,
465+ window = torch .hann_window (self ._window_size ).to (self .estimator ._device ),
466+ ).to (self .estimator ._device )
467+ stft_matrix_abs = torch .sqrt (torch .sum (torch .square (stft_matrix ), - 1 ))
387468
388- def _loss_gradient_masking_threshold_torch (self ):
389- """Compute loss gradient of the masking threshold loss in PyTorch."""
390- raise NotImplementedError
469+ # compute power spectral density (PSD)
470+ # note: fixes implementation of Qin et al. by also considering the square root of gain_factor
471+ gain_factor = 8.0 / 3.0
472+ psd_matrix = gain_factor * torch .square (stft_matrix_abs / self ._window_size )
473+
474+ # approximate normalized psd: psd_matrix_approximated = 10^((96.0 - psd_matrix_max + psd_matrix)/10)
475+ psd_matrix_approximated = pow (10.0 , 9.6 ) / torch .unsqueeze (psd_maximum_stabilized , 1 ) * psd_matrix
476+
477+ # return PSD matrix such that shape is (batch_size, window_size // 2 + 1, frame_length)
478+ return psd_matrix_approximated
391479
392480 def _check_params (self ) -> None :
393481 """
@@ -408,8 +496,8 @@ def _check_params(self) -> None:
408496
409497 if not isinstance (self .max_iter_2 , int ):
410498 raise ValueError ("The maximum number of iterations for stage 2 must be of type int." )
411- if self .max_iter_2 <= 0 :
412- raise ValueError ("The maximum number of iterations for stage 2 must be greater than 0 ." )
499+ if self .max_iter_2 < 0 :
500+ raise ValueError ("The maximum number of iterations for stage 2 must be non-negative ." )
413501
414502 if not isinstance (self .learning_rate_1 , float ):
415503 raise ValueError ("The learning rate for stage 1 must be of type float." )
0 commit comments