Add FSDP option for Flux2 #12860
base: main
@@ -6,10 +6,15 @@
 import re
 import warnings
 from contextlib import contextmanager
-from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
+from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, Type, Union

 import numpy as np
 import torch
+import torch.distributed as dist
+from torch.distributed.fsdp import CPUOffload, ShardingStrategy
+from torch.distributed.fsdp import FullyShardedDataParallel as FSDP
+from torch.distributed.fsdp.wrap import transformer_auto_wrap_policy
+
+from functools import partial

 from .models import UNet2DConditionModel
 from .pipelines import DiffusionPipeline
@@ -394,6 +399,78 @@ def find_nearest_bucket(h, w, bucket_options):
     return best_bucket_idx


+def get_fsdp_kwargs_from_accelerator(accelerator) -> dict:
+    """
+    Extract and convert the FSDP config from an Accelerator into PyTorch FSDP kwargs.
+    """
+    kwargs = {}
+    fsdp_plugin = accelerator.state.fsdp_plugin
+
+    if fsdp_plugin is None:
+        # FSDP is not enabled in Accelerate; fall back to full sharding.
+        kwargs["sharding_strategy"] = ShardingStrategy.FULL_SHARD
+    else:
+        # FSDP is enabled: use the plugin's strategy, or the default if it is None.
+        kwargs["sharding_strategy"] = fsdp_plugin.sharding_strategy or ShardingStrategy.FULL_SHARD
+
+    return kwargs
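For context, a minimal sketch of how this helper might be called in a training script; the `Accelerator` construction here is illustrative and not part of the diff:

    from accelerate import Accelerator

    accelerator = Accelerator()  # picks up FSDP settings from `accelerate config`, if any
    fsdp_kwargs = get_fsdp_kwargs_from_accelerator(accelerator)
    # With no FSDP plugin configured, this returns
    # {"sharding_strategy": ShardingStrategy.FULL_SHARD}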
+def wrap_with_fsdp(
+    model: torch.nn.Module,
+    device: Union[str, torch.device],
+    offload: bool = True,
+    use_orig_params: bool = True,
+    limit_all_gathers: bool = True,
+    fsdp_kwargs: Optional[Dict[str, Any]] = None,
+    transformer_layer_cls: Optional[Set[Type[torch.nn.Module]]] = None,
+) -> FSDP:
+    """
+    Wrap a model with FSDP using common defaults and optional transformer auto-wrapping.
+
+    Args:
+        model: Model to wrap.
+        device: Target device (e.g., `accelerator.device`).
+        offload: Whether to offload parameters to CPU.
+        use_orig_params: Whether FSDP should expose the original parameters.
+        limit_all_gathers: Whether to rate-limit all-gathers.
+        fsdp_kwargs: Extra FSDP arguments (`sharding_strategy`, etc.), usually from the Accelerate config.
+        transformer_layer_cls: Set of layer classes to auto-wrap (if not supplied via `fsdp_kwargs`).
+
+    Returns:
+        The FSDP-wrapped model.
+    """
+    if transformer_layer_cls is None:
+        # Default to the class of the model's transformer blocks. Note this is a
+        # set, matching the type expected by `transformer_auto_wrap_policy`.
+        transformer_layer_cls = {type(model.model.language_model.layers[0])}
+
+    # Auto-wrap every submodule whose class is in `transformer_layer_cls`.
+    auto_wrap_policy = partial(
+        transformer_auto_wrap_policy,
+        transformer_layer_cls=transformer_layer_cls,
+    )
+
+    config = {
+        "device_id": device,
+        "cpu_offload": CPUOffload(offload_params=offload) if offload else None,
+        "use_orig_params": use_orig_params,
+        "limit_all_gathers": limit_all_gathers,
+        "auto_wrap_policy": auto_wrap_policy,
+    }
+
+    if fsdp_kwargs:
+        config.update(fsdp_kwargs)
+
+    fsdp_model = FSDP(model, **config)
+    # Synchronize ranks after wrapping so all processes finish sharding together.
+    if dist.is_initialized():
+        dist.barrier()
+
+    return fsdp_model
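A sketch of how the two helpers might compose in a Flux2 training script; the model attribute names below are illustrative assumptions, not taken from the diff:

    fsdp_kwargs = get_fsdp_kwargs_from_accelerator(accelerator)
    transformer = wrap_with_fsdp(
        transformer,  # e.g., the Flux2 transformer being trained
        device=accelerator.device,
        offload=False,  # keep parameters on the GPU
        fsdp_kwargs=fsdp_kwargs,
        transformer_layer_cls={type(transformer.transformer_blocks[0])},  # illustrative block class
    )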
 # Adapted from torch-ema https://github.com/fadel/pytorch_ema/blob/master/torch_ema/ema.py#L14
 class EMAModel:
     """
Review comment: This should be guarded as well.
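A minimal sketch of one way such a guard could look, assuming the concern is the new top-level `torch.distributed.fsdp` imports; this is not the PR's actual fix:

    import torch.distributed as dist

    if dist.is_available():
        from torch.distributed.fsdp import CPUOffload, ShardingStrategy
        from torch.distributed.fsdp import FullyShardedDataParallel as FSDP
        from torch.distributed.fsdp.wrap import transformer_auto_wrap_policy
    else:
        # Placeholders so the module still imports on builds without distributed
        # support; the FSDP helpers will fail loudly only if actually used.
        CPUOffload = ShardingStrategy = FSDP = transformer_auto_wrap_policy = None

The `dist.barrier()` call in `wrap_with_fsdp` is already guarded by `dist.is_initialized()`, which follows the same pattern.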