import logging
import math
from typing import Union, Any, Optional, Callable, List

from typing_extensions import Literal

import eagerpy as ep
import numpy as np

from foolbox.attacks import LinearSearchBlendedUniformNoiseAttack
from foolbox.tensorboard import TensorBoard

from ..models import Model
from ..criteria import Criterion
from ..distances import l1, l2, linf
from ..devutils import atleast_kd, flatten

from .base import MinimizationAttack, T
from .base import get_criterion, get_is_adversarial, raise_if_kwargs


class HopSkipJump(MinimizationAttack):
    """A powerful adversarial attack that requires neither gradients
    nor probabilities [#Chen19]_.

    Args:
        init_attack : Attack to use to find a starting point. Defaults to
            LinearSearchBlendedUniformNoiseAttack. Only used if starting_points is None.
        steps : Number of optimization steps (each step is followed by a
            binary search back to the decision boundary).
        initial_gradient_eval_steps : Initial number of evaluations for gradient estimation.
            Larger values increase time efficiency, but may decrease
            query efficiency.
        max_gradient_eval_steps : Maximum number of evaluations for gradient estimation.
        stepsize_search : How to search for the stepsize; choices are 'geometric_progression'
            and 'grid_search'. 'geometric_progression' initializes the stepsize
            as ||x_t - x||_p / sqrt(iteration) and keeps halving it
            until reaching the target side of the boundary. 'grid_search'
            chooses the optimal epsilon over a grid on the scale of
            ||x_t - x||_p.
        gamma : The binary search threshold theta is gamma / d^1.5 for
            the l2 attack and gamma / d^2 for the linf attack.
        tensorboard : The log directory for TensorBoard summaries. If False, TensorBoard
            summaries will be disabled (default). If None, the logdir will be
            runs/CURRENT_DATETIME_HOSTNAME.
        constraint : Norm to minimize, either "l2" or "linf".

    References:
        .. [#Chen19] Jianbo Chen, Michael I. Jordan, Martin J. Wainwright,
            "HopSkipJumpAttack: A Query-Efficient Decision-Based Attack",
            https://arxiv.org/abs/1904.02144
    """

    # placeholder; overwritten in __init__ based on the chosen constraint
    distance = l1

    def __init__(
        self,
        init_attack: Optional[MinimizationAttack] = None,
        steps: int = 64,
        initial_gradient_eval_steps: int = 100,
        max_gradient_eval_steps: int = 10000,
        stepsize_search: Union[
            Literal["geometric_progression"], Literal["grid_search"]
        ] = "geometric_progression",
        gamma: float = 1.0,
        tensorboard: Union[Literal[False], None, str] = False,
        constraint: Union[Literal["linf"], Literal["l2"]] = "l2",
    ):
        if init_attack is not None and not isinstance(init_attack, MinimizationAttack):
            raise NotImplementedError
        self.init_attack = init_attack
        self.steps = steps
        self.initial_num_evals = initial_gradient_eval_steps
        self.max_num_evals = max_gradient_eval_steps
        self.stepsize_search = stepsize_search
        self.gamma = gamma
        self.tensorboard = tensorboard
        self.constraint = constraint

        assert constraint in ("l2", "linf")
        if constraint == "l2":
            self.distance = l2
        else:
            self.distance = linf

    def run(
        self,
        model: Model,
        inputs: T,
        criterion: Union[Criterion, T],
        *,
        early_stop: Optional[float] = None,
        starting_points: Optional[T] = None,
        **kwargs: Any,
    ) -> T:
        raise_if_kwargs(kwargs)
        originals, restore_type = ep.astensor_(inputs)
        del inputs, kwargs

        criterion = get_criterion(criterion)
        is_adversarial = get_is_adversarial(criterion, model)

        if starting_points is None:
            init_attack: MinimizationAttack
            if self.init_attack is None:
                init_attack = LinearSearchBlendedUniformNoiseAttack(steps=50)
                logging.info(
                    f"Neither starting_points nor init_attack given. Falling"
                    f" back to {init_attack!r} for initialization."
                )
            else:
                init_attack = self.init_attack
            # TODO: use call and support all types of attacks (once early_stop is
            # possible in __call__)
            x_advs = init_attack.run(model, originals, criterion, early_stop=early_stop)
        else:
            x_advs = ep.astensor(starting_points)

        is_adv = is_adversarial(x_advs)
        if not is_adv.all():
            failed = is_adv.logical_not().float32().sum()
            if starting_points is None:
                raise ValueError(
                    f"init_attack failed for {failed} of {len(is_adv)} inputs"
                )
            else:
                raise ValueError(
                    f"{failed} of {len(is_adv)} starting_points are not adversarial"
                )
        del starting_points

        tb = TensorBoard(logdir=self.tensorboard)

        # Project the initialization to the boundary.
        x_advs = self._binary_search(is_adversarial, originals, x_advs)

        assert ep.all(is_adversarial(x_advs))

        distances = self.distance(originals, x_advs)

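        # Each iteration performs the algorithm's three phases: estimate the
        # gradient direction at the current boundary point, jump along that
        # direction with a searched step size, then binary-search back onto
        # the decision boundary.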
        for step in range(self.steps):
            delta = self.select_delta(originals, distances, step)

            # Choose the number of gradient estimation steps (grows with
            # sqrt(step + 1), capped at max_num_evals).
            num_gradient_estimation_steps = int(
                min([self.initial_num_evals * math.sqrt(step + 1), self.max_num_evals])
            )

            gradients = self.approximate_gradients(
                is_adversarial, x_advs, num_gradient_estimation_steps, delta
            )

            if self.constraint == "linf":
                update = ep.sign(gradients)
            else:
                update = gradients

            if self.stepsize_search == "geometric_progression":
                # Find the step size: start at distance / sqrt(step + 1) and
                # halve it until the proposal is adversarial again.
                epsilons = distances / math.sqrt(step + 1)

                while True:
                    x_advs_proposals = ep.clip(
                        x_advs + atleast_kd(epsilons, x_advs.ndim) * update, 0, 1
                    )
                    success = is_adversarial(x_advs_proposals)
                    epsilons = ep.where(success, epsilons, epsilons / 2.0)

                    if ep.all(success):
                        break

                # Update the sample.
                x_advs = ep.clip(
                    x_advs + atleast_kd(epsilons, update.ndim) * update, 0, 1
                )

                assert ep.all(is_adversarial(x_advs))

                # Binary search to return to the boundary.
                x_advs = self._binary_search(is_adversarial, originals, x_advs)

                assert ep.all(is_adversarial(x_advs))

            elif self.stepsize_search == "grid_search":
                # Grid search for the stepsize.
                epsilons_grid = ep.expand_dims(
                    ep.from_numpy(
                        distances,
                        np.logspace(-4, 0, num=20, endpoint=True, dtype=np.float32),
                    ),
                    1,
                ) * ep.expand_dims(distances, 0)
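                # The grid consists of 20 log-spaced factors in [1e-4, 1],
                # each scaled by the current distance to the original.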

                proposals_list = []

                for epsilons in epsilons_grid:
                    x_advs_proposals = (
                        x_advs + atleast_kd(epsilons, update.ndim) * update
                    )
                    x_advs_proposals = ep.clip(x_advs_proposals, 0, 1)

                    mask = is_adversarial(x_advs_proposals)

                    x_advs_proposals = self._binary_search(
                        is_adversarial, originals, x_advs_proposals
                    )

                    # only use new values where the initial guess was already adversarial
                    x_advs_proposals = ep.where(
                        atleast_kd(mask, x_advs.ndim), x_advs_proposals, x_advs
                    )

                    proposals_list.append(x_advs_proposals)

                proposals = ep.stack(proposals_list, 0)
                proposals_distances = self.distance(
                    ep.expand_dims(originals, 0), proposals
                )
                minimal_idx = ep.argmin(proposals_distances, 0)

                x_advs = proposals[minimal_idx]

            distances = self.distance(originals, x_advs)

            # log stats
            tb.histogram("norms", distances, step)

        return restore_type(x_advs)

    def approximate_gradients(
        self,
        is_adversarial: Callable[[ep.Tensor], ep.Tensor],
        x_advs: ep.Tensor,
        steps: int,
        delta: ep.Tensor,
    ) -> ep.Tensor:
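        # Monte-Carlo estimate of the direction of the decision-boundary
        # normal, as in the HopSkipJump paper: sample random directions u_b,
        # query the model at x + delta * u_b, and average phi_b * u_b, where
        # phi_b is +1 if the perturbed point is adversarial and -1 otherwise;
        # the mean of the phi_b is subtracted as a baseline to reduce variance.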
        # (steps, bs, ...)
        noise_shape = tuple([steps] + list(x_advs.shape))
        if self.constraint == "l2":
            rv = ep.normal(x_advs, noise_shape)
        elif self.constraint == "linf":
            rv = ep.uniform(x_advs, low=-1, high=1, shape=noise_shape)
        rv /= atleast_kd(ep.norms.l2(flatten(rv, keep=1), -1), rv.ndim) + 1e-12

        scaled_rv = atleast_kd(ep.expand_dims(delta, 0), rv.ndim) * rv

        perturbed = ep.expand_dims(x_advs, 0) + scaled_rv
        perturbed = ep.clip(perturbed, 0, 1)

        # recompute the effective noise directions after clipping (the
        # constant factor is absorbed by the final normalization)
        rv = (perturbed - x_advs) / 2

        multipliers_list: List[ep.Tensor] = []
        for step in range(steps):
            decision = is_adversarial(perturbed[step])
            multipliers_list.append(
                ep.where(
                    decision,
                    ep.ones(x_advs, (len(x_advs),)),
                    -ep.ones(x_advs, (len(decision),)),
                )
            )
        # (steps, bs)
        multipliers = ep.stack(multipliers_list, 0)

        vals = ep.where(
            ep.abs(ep.mean(multipliers, axis=0, keepdims=True)) == 1,
            multipliers,
            multipliers - ep.mean(multipliers, axis=0, keepdims=True),
        )
        grad = ep.mean(atleast_kd(vals, rv.ndim) * rv, axis=0)

        # normalize the gradient estimate per sample
        grad /= atleast_kd(ep.norms.l2(flatten(grad, keep=1), -1), grad.ndim) + 1e-12

        return grad

    def _project(
        self, originals: ep.Tensor, perturbed: ep.Tensor, epsilons: ep.Tensor
    ) -> ep.Tensor:
        """Clips the perturbations to epsilon and returns the new perturbed inputs.

        Args:
            originals: A batch of reference inputs.
            perturbed: A batch of perturbed inputs.
            epsilons: A batch of norm values to project to (for linf) or
                interpolation factors in [0, 1] (for l2).
        Returns:
            A tensor like perturbed but with the perturbation clipped to epsilon.
        """
        epsilons = atleast_kd(epsilons, originals.ndim)
        if self.constraint == "linf":
            perturbation = perturbed - originals

            # ep.clip does not support tensors as min/max
            clipped_perturbed = ep.where(
                perturbation > epsilons, originals + epsilons, perturbed
            )
            clipped_perturbed = ep.where(
                perturbation < -epsilons, originals - epsilons, clipped_perturbed
            )
            return clipped_perturbed
        else:
            # for l2, epsilon acts as a blend factor between the original
            # and the perturbed input
            return (1.0 - epsilons) * originals + epsilons * perturbed

    def _binary_search(
        self,
        is_adversarial: Callable[[ep.Tensor], ep.Tensor],
        originals: ep.Tensor,
        perturbed: ep.Tensor,
    ) -> ep.Tensor:
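        # The search interval depends on the norm: for linf, epsilon is an
        # absolute clipping radius, so the upper bound is the current linf
        # distance; for l2, epsilon is an interpolation factor between the
        # original and the perturbed input, so the upper bound is 1.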
        # Choose upper thresholds in binary search based on constraint.
        d = np.prod(perturbed.shape[1:])
        if self.constraint == "linf":
            highs = linf(originals, perturbed)

            # TODO: Check if the threshold is correct
            # empirically this seems to be too low
            thresholds = highs * self.gamma / (d * d)
        else:
            highs = ep.ones(perturbed, len(perturbed))
            thresholds = self.gamma / (d * math.sqrt(d))

        lows = ep.zeros_like(highs)

        # use this variable to check when mids stays constant and the
        # binary search has converged
        old_mids = highs

        while ep.any(highs - lows > thresholds):
            mids = (lows + highs) / 2
            mids_perturbed = self._project(originals, perturbed, mids)
            is_adversarial_ = is_adversarial(mids_perturbed)

            highs = ep.where(is_adversarial_, mids, highs)
            lows = ep.where(is_adversarial_, lows, mids)

            # check if there is no more progress due to numerical imprecision
            reached_numerical_precision = (old_mids == mids).all()
            old_mids = mids

            if reached_numerical_precision:
                # TODO: warn user
                break

        res = self._project(originals, perturbed, highs)

        return res

    def select_delta(
        self, originals: ep.Tensor, distances: ep.Tensor, step: int
    ) -> ep.Tensor:
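        # Choose the perturbation size delta for gradient estimation as
        # suggested in the paper: proportional to theta and the current
        # distance to the original (both cases reduce to
        # gamma * distance / d); the first step uses a fixed 0.1.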
        result: ep.Tensor
        if step == 0:
            result = 0.1 * ep.ones_like(distances)
        else:
            d = np.prod(originals.shape[1:])

            if self.constraint == "linf":
                theta = self.gamma / (d * d)
                result = d * theta * distances
            else:
                theta = self.gamma / (d * np.sqrt(d))
                result = np.sqrt(d) * theta * distances

        return result
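

# Usage sketch (illustrative only, not part of the implementation): how this
# attack would typically be invoked through foolbox's standard interface.
# `net`, `images` and `labels` are placeholders the caller must provide.
#
#     import foolbox as fb
#
#     fmodel = fb.PyTorchModel(net.eval(), bounds=(0, 1))
#     attack = HopSkipJump(steps=32, constraint="l2")
#     advs = attack.run(fmodel, images, fb.criteria.Misclassification(labels))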