
Commit 146bbaa

add cb
1 parent dd2912a commit 146bbaa

File tree

3 files changed: +431 additions, -0 deletions


src/lightning/pytorch/callbacks/__init__.py

Lines changed: 2 additions & 0 deletions

@@ -15,6 +15,7 @@
 from lightning.pytorch.callbacks.callback import Callback
 from lightning.pytorch.callbacks.checkpoint import Checkpoint
 from lightning.pytorch.callbacks.device_stats_monitor import DeviceStatsMonitor
+from lightning.pytorch.callbacks.differential_privacy import DifferentialPrivacy
 from lightning.pytorch.callbacks.early_stopping import EarlyStopping
 from lightning.pytorch.callbacks.finetuning import BackboneFinetuning, BaseFinetuning
 from lightning.pytorch.callbacks.gradient_accumulation_scheduler import GradientAccumulationScheduler
@@ -58,4 +59,5 @@
     "ThroughputMonitor",
     "Timer",
     "TQDMProgressBar",
+    "DifferentialPrivacy",
 ]
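
With this registration, the new callback becomes importable from the public callbacks namespace. A minimal import sketch (not part of the diff; it assumes Opacus is installed, since the module raises an ImportError otherwise):

from lightning.pytorch.callbacks import DifferentialPrivacy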
src/lightning/pytorch/callbacks/differential_privacy.py (new file)

Lines changed: 325 additions & 0 deletions

@@ -0,0 +1,325 @@
__all__ = ["DifferentialPrivacy"]

import typing as ty
from copy import deepcopy

import torch
from torch.optim.optimizer import Optimizer
from torch.utils.data import DataLoader

try:
    from opacus import GradSampleModule
    from opacus.accountants import RDPAccountant
    from opacus.accountants.utils import get_noise_multiplier
    from opacus.data_loader import DPDataLoader
    from opacus.layers.dp_rnn import DPGRUCell
    from opacus.optimizers import DPOptimizer as OpacusDPOptimizer
except ImportError as ex:
    raise ImportError("Opacus is not installed. Please install it with `pip install opacus`.") from ex

import pytorch_lightning as pl

def replace_grucell(module: torch.nn.Module) -> None:
    """Replaces GRUCell modules with DP-counterparts."""
    for name, child in module.named_children():
        if isinstance(child, torch.nn.GRUCell) and not isinstance(child, DPGRUCell):
            replacement = copy_gru(child)
            setattr(module, name, replacement)
    for name, child in module.named_children():
        replace_grucell(child)


def copy_gru(grucell: torch.nn.GRUCell) -> DPGRUCell:
    """Creates a DP-GRUCell from a non-DP one."""
    input_size: int = grucell.input_size
    hidden_size: int = grucell.hidden_size
    bias: bool = grucell.bias
    dpgrucell = DPGRUCell(input_size, hidden_size, bias)
    for name, param in grucell.named_parameters():
        if "ih" in name:
            _set_layer_param(dpgrucell, name, param, "ih")
        elif "hh" in name:
            _set_layer_param(dpgrucell, name, param, "hh")
        else:
            raise AttributeError(f"Unknown parameter {name}")
    return dpgrucell

def _set_layer_param(
    dpgrucell: DPGRUCell,
    name: str,
    param: torch.Tensor,
    layer_name: str,
) -> None:
    """Helper: copies a weight or bias tensor into the given sub-layer of a DP-GRUCell."""
    layer = getattr(dpgrucell, layer_name)
    if "weight" in name:
        layer.weight = torch.nn.Parameter(deepcopy(param))
    elif "bias" in name:
        layer.bias = torch.nn.Parameter(deepcopy(param))
    else:
        raise AttributeError(f"Unknown parameter {name}")
    setattr(dpgrucell, layer_name, layer)

def params(
    optimizer: torch.optim.Optimizer,
    accepted_names: ty.Optional[list[str]] = None,
) -> list[torch.nn.Parameter]:
    """Return all parameters controlled by the optimizer.

    Args:
        optimizer (torch.optim.Optimizer):
            The optimizer whose parameter groups are inspected.
        accepted_names (list[str], optional):
            List of parameter group names you want to apply DP to.
            This allows you to apply DP only to specific parameter groups.
            Of course, this will work only if the optimizer has named parameter groups.
            If it doesn't, this argument is ignored and DP is applied to all parameter groups.

    Returns:
        (list[torch.nn.Parameter]): Flat list of parameters from all `param_groups`.
    """
    # lower-case the accepted names for a case-insensitive comparison
    if accepted_names is not None:
        accepted_names = [name.lower() for name in accepted_names]
    # unwrap parameters from the param_groups into a flat list
    ret = []
    for param_group in optimizer.param_groups:
        if accepted_names is not None and "name" in param_group:
            name: str = param_group["name"].lower()
            if name in accepted_names:
                ret += [p for p in param_group["params"] if p.requires_grad]
        else:
            ret += [p for p in param_group["params"] if p.requires_grad]
    return ret


class DPOptimizer(OpacusDPOptimizer):
    """Brainiac-2's DP-Optimizer: an Opacus ``DPOptimizer`` that can restrict DP to named parameter groups."""

    def __init__(
        self,
        *args: ty.Any,
        param_group_names: ty.Optional[list[str]] = None,
        **kwargs: ty.Any,
    ) -> None:
        """Constructor."""
        self.param_group_names = param_group_names
        super().__init__(*args, **kwargs)

    @property
    def params(self) -> list[torch.nn.Parameter]:
        """Returns a flat list of ``nn.Parameter`` objects managed by the optimizer."""
        return params(self, self.param_group_names)


class DifferentialPrivacy(pl.callbacks.EarlyStopping):
    """Enables differential privacy using Opacus.

    Converts optimizers to instances of the :class:`~opacus.optimizers.DPOptimizer` class.
    This callback inherits from `EarlyStopping`, so it is also able to stop the
    training when enough privacy budget has been spent.
    Please be aware that Opacus does not support multi-optimizer training.

    For more info, check the following links:
    * https://opacus.ai/tutorials/
    * https://blog.openmined.org/differentially-private-deep-learning-using-opacus-in-20-lines-of-code/
    """

    def __init__(
        self,
        budget: float = 1.0,
        noise_multiplier: float = 1.0,
        max_grad_norm: float = 1.0,
        delta: ty.Optional[float] = None,
        use_target_values: bool = False,
        idx: ty.Optional[ty.Sequence[int]] = None,
        log_spent_budget_as: str = "DP/spent-budget",
        param_group_names: ty.Optional[list[str]] = None,
        private_dataloader: bool = False,
        default_alphas: ty.Optional[ty.Sequence[ty.Union[float, int]]] = None,
        **gsm_kwargs: ty.Any,
    ) -> None:
        """Enables differential privacy using Opacus.

        Converts optimizers to instances of the :class:`~opacus.optimizers.DPOptimizer` class.
        This callback inherits from `EarlyStopping`,
        so it is also able to stop the training when enough privacy budget has been spent.

        Args:
            budget (float, optional): Defaults to 1.0.
                Maximum privacy budget (epsilon) to spend.
            noise_multiplier (float, optional): Defaults to 1.0.
                Noise multiplier.
            max_grad_norm (float, optional): Defaults to 1.0.
                Max grad norm used for gradient clipping.
            delta (float, optional): Defaults to None.
                The target δ of the (ϵ,δ)-differential privacy guarantee.
                Generally, it should be set to be less than the inverse of the size of the training dataset.
                If `None`, this will be set to the inverse of the size of the training dataset `N`: `1/N`.
            use_target_values (bool, optional):
                Whether to behave like `privacy_engine.make_private_with_epsilon()` instead of
                `privacy_engine.make_private()`. If `True`, the value of `noise_multiplier` is calibrated
                automatically so that the desired privacy budget is reached only at the end of the training.
            idx (ty.Sequence[int], optional):
                List of optimizer indices to make private. Useful when a model has more than one optimizer.
                By default, all optimizers are made private.
            log_spent_budget_as (str, optional):
                Name under which to log and expose the spent-budget value.
                Although this callback already stops the training when enough privacy budget
                has been spent (see the `budget` argument), this keyword argument can be used
                in combination with a separate `EarlyStopping` callback, so that the latter may
                use this value to stop the training when enough budget has been spent.
            param_group_names (list[str], optional):
                List of parameter group names you want to apply DP to. This allows you
                to apply DP only to specific parameter groups. Of course, this
                will work only if the optimizer has named parameter groups. If it
                doesn't, this argument is ignored and DP is applied to all parameter groups.
            private_dataloader (bool, optional):
                Whether to make the dataloader private. Defaults to False.
            default_alphas (ty.Sequence[ty.Union[float, int]], optional):
                RDP orders used by the privacy accountant. Defaults to
                `RDPAccountant.DEFAULT_ALPHAS` extended with the integers from 64 to 149.
            **gsm_kwargs:
                Input arguments for the :class:`~opacus.GradSampleModule` class.
        """
        # inputs
        self.budget = budget
        self.delta = delta
        self.noise_multiplier = noise_multiplier
        self.max_grad_norm = max_grad_norm
        self.use_target_values = use_target_values
        self.log_spent_budget_as = log_spent_budget_as
        self.param_group_names = param_group_names
        self.private_dataloader = private_dataloader
        self.gsm_kwargs = gsm_kwargs
        if default_alphas is None:
            self.default_alphas = RDPAccountant.DEFAULT_ALPHAS + list(range(64, 150))
        else:
            self.default_alphas = default_alphas
        # init early stopping callback
        super().__init__(
            monitor=self.log_spent_budget_as,
            mode="max",
            stopping_threshold=self.budget,
            check_on_train_epoch_end=True,
            # we do not want to stop if the spent budget does not increase; this may even be desirable
            min_delta=0,
            patience=1000000,
        )
        # attributes
        self.epsilon: float = 0.0
        self.best_alpha: float = 0.0
        self.accountant = RDPAccountant()
        self.idx = idx  # optimizers to privatize

    def setup(
        self,
        trainer: pl.Trainer,
        pl_module: pl.LightningModule,
        stage: ty.Optional[str] = None,
    ) -> None:
        """Call the GradSampleModule() wrapper to add attributes to pl_module."""
        if stage == "fit":
            replace_grucell(pl_module)
            try:
                pl_module = GradSampleModule(pl_module, **self.gsm_kwargs)
            except ImportError as ex:
                raise ImportError(f"{ex}. This may be due to a mismatch between Opacus and PyTorch version.") from ex

    def on_train_epoch_start(
        self,
        trainer: pl.Trainer,
        pl_module: pl.LightningModule,
    ) -> None:
        """Called when the training epoch begins. Use this to make the optimizers private."""
        # by default, privatize all optimizers
        if self.idx is None:
            self.idx = range(len(trainer.optimizers))

        # Replace current dataloaders with private counterparts
        expected_batch_size = 1
        cl = trainer.fit_loop._combined_loader
        if cl is not None:
            dp_dls: list[DPDataLoader] = []
            for i, dl in enumerate(cl.flattened):
                if isinstance(dl, DataLoader):
                    sample_rate: float = 1 / len(dl)
                    dataset_size: int = len(dl.dataset)  # type: ignore
                    expected_batch_size = int(dataset_size * sample_rate)
                    if self.private_dataloader:
                        dp_dl = DPDataLoader.from_data_loader(dl, distributed=False)
                        dp_dls.append(dp_dl)
            # the CombinedLoader's `flattened` setter also lets us easily replace the dataloaders
            if self.private_dataloader:
                cl.flattened = dp_dls

        # delta defaults to the inverse of the training dataset size
        if self.delta is None:
            self.delta = 1 / dataset_size

        # Make optimizers private
        optimizers: list[Optimizer] = []
        dp_optimizer: ty.Union[Optimizer, DPOptimizer]
        for i, optimizer in enumerate(trainer.optimizers):
            if not isinstance(optimizer, DPOptimizer) and i in self.idx:
                if self.use_target_values:
                    self.noise_multiplier = get_noise_multiplier(
                        target_epsilon=self.budget / 2,
                        target_delta=self.delta,
                        sample_rate=sample_rate,
                        epochs=trainer.max_epochs,
                        accountant="rdp",
                    )
                dp_optimizer = DPOptimizer(
                    optimizer=optimizer,
                    noise_multiplier=self.noise_multiplier,
                    max_grad_norm=self.max_grad_norm,
                    expected_batch_size=expected_batch_size,
                    param_group_names=self.param_group_names,
                )
                dp_optimizer.attach_step_hook(self.accountant.get_optimizer_hook_fn(sample_rate=sample_rate))
            else:
                dp_optimizer = optimizer
            optimizers.append(dp_optimizer)
        # Replace optimizers
        trainer.optimizers = optimizers

    def on_train_batch_end(  # pylint: disable=unused-argument # type: ignore
        self,
        trainer: pl.Trainer,
        pl_module: pl.LightningModule,
        outputs: ty.Any,
        batch: ty.Any,
        batch_idx: int,
        *args: ty.Any,
    ) -> None:
        """Called after the batch has been digested. Use this to decide whether to stop or not."""
        self._log_and_stop_criterion(trainer, pl_module)

    def on_train_epoch_end(
        self,
        trainer: pl.Trainer,
        pl_module: pl.LightningModule,
    ) -> None:
        """Run at the end of the training epoch."""

    def get_privacy_spent(self) -> tuple[float, float]:
        """Estimate the spent privacy budget."""
        # get privacy budget spent so far
        epsilon, best_alpha = self.accountant.get_privacy_spent(
            delta=self.delta,
            alphas=self.default_alphas,
        )
        return float(epsilon), float(best_alpha)

    def _log_and_stop_criterion(
        self,
        trainer: pl.Trainer,
        pl_module: pl.LightningModule,
    ) -> None:
        """Log the spent privacy budget (epsilon) and stop the training if it exceeds the target budget."""
        self.epsilon, self.best_alpha = self.get_privacy_spent()
        pl_module.log(
            self.log_spent_budget_as,
            self.epsilon,
            on_epoch=True,
            prog_bar=True,
        )
        if self.epsilon > self.budget:
            trainer.should_stop = True
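
A minimal usage sketch for the new callback (not part of the commit): it assumes Opacus is installed, `MyModel` and `MyDataModule` are hypothetical placeholders for your own LightningModule and datamodule, and the keyword values are illustrative, not tuned.

import lightning.pytorch as pl
from lightning.pytorch.callbacks import DifferentialPrivacy

from my_project import MyDataModule, MyModel  # hypothetical placeholders for your own code

# Stop training once the spent privacy budget (epsilon) exceeds 3.0, clipping
# per-sample gradients at norm 1.0 and adding noise with multiplier 1.0.
dp_callback = DifferentialPrivacy(
    budget=3.0,
    noise_multiplier=1.0,
    max_grad_norm=1.0,
    private_dataloader=True,  # also swap the train dataloader for an Opacus DPDataLoader
)

trainer = pl.Trainer(max_epochs=10, callbacks=[dp_callback])
trainer.fit(MyModel(), datamodule=MyDataModule())

Because the callback inherits from EarlyStopping and logs the spent budget under "DP/spent-budget", no extra stopping logic is needed; the run halts on its own once the budget is exhausted.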
