diff --git a/src/diffusers/pipelines/pipeline_loading_utils.py b/src/diffusers/pipelines/pipeline_loading_utils.py
index 5eba1952e608..769bc84216f4 100644
--- a/src/diffusers/pipelines/pipeline_loading_utils.py
+++ b/src/diffusers/pipelines/pipeline_loading_utils.py
@@ -36,6 +36,7 @@
     deprecate,
     get_class_from_dynamic_module,
     is_accelerate_available,
+    is_accelerate_version,
     is_peft_available,
     is_transformers_available,
     logging,
@@ -947,3 +948,18 @@ def _get_ignore_patterns(
             )
 
     return ignore_patterns
+
+
+def model_has_device_map(model):
+    if not is_accelerate_available() or is_accelerate_version("<", "0.14.0"):
+        return False
+
+    # Check if the model has a device map that is not exclusively CPU
+    # `device_map` can only contain CPU when a model has sharded checkpoints.
+    # See here: https://github.com/huggingface/diffusers/blob/41e4779d988ead99e7acd78dc8e752de88777d0f/src/diffusers/models/modeling_utils.py#L883
+    device_map = getattr(model, "hf_device_map", None)
+    if device_map is not None:
+        unique_devices = set(device_map.values())
+        return len(unique_devices) > 1 or unique_devices != {"cpu"}
+
+    return False
diff --git a/tests/pipelines/kandinsky/test_kandinsky_prior.py b/tests/pipelines/kandinsky/test_kandinsky_prior.py
index 5f42447bd9d5..7545ec5bb5d3 100644
--- a/tests/pipelines/kandinsky/test_kandinsky_prior.py
+++ b/tests/pipelines/kandinsky/test_kandinsky_prior.py
@@ -13,6 +13,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import os
+import tempfile
 import unittest
 
 import numpy as np
@@ -28,11 +30,16 @@
 )
 
 from diffusers import KandinskyPriorPipeline, PriorTransformer, UnCLIPScheduler
-from diffusers.utils.testing_utils import enable_full_determinism, skip_mps, torch_device
+from diffusers.models.modeling_utils import ModelMixin
+from diffusers.utils import SAFE_WEIGHTS_INDEX_NAME
+from diffusers.utils.testing_utils import enable_full_determinism, is_accelerate_available, skip_mps, torch_device
 
 from ..test_pipelines_common import PipelineTesterMixin
 
 
+if is_accelerate_available():
+    from accelerate.utils import compute_module_sizes
+
 enable_full_determinism()
 
 
@@ -236,3 +243,31 @@ def test_attention_slicing_forward_pass(self):
             test_max_difference=test_max_difference,
             test_mean_pixel_difference=test_mean_pixel_difference,
         )
+
+    # It needs a different sharding ratio than the standard 0.75. So, we override it.
+    def test_sharded_components_can_be_device_placed(self):
+        components = self.get_dummy_components()
+
+        component_selected = None
+        for component_name in components:
+            if isinstance(components[component_name], ModelMixin) and hasattr(
+                components[component_name], "load_config"
+            ):
+                component_to_be_sharded = components[component_name]
+                component_cls = component_to_be_sharded.__class__
+                component_selected = component_name
+                break
+
+        assert component_selected, "No component selected that can be sharded."
+
+        model_size = compute_module_sizes(component_to_be_sharded)[""]
+        max_shard_size = int((model_size * 0.45) / (2**10))
+
+        with tempfile.TemporaryDirectory() as tmp_dir:
+            component_to_be_sharded.cpu().save_pretrained(tmp_dir, max_shard_size=f"{max_shard_size}KB")
+            self.assertTrue(os.path.exists(os.path.join(tmp_dir, SAFE_WEIGHTS_INDEX_NAME)))
+
+            loaded_sharded_component = component_cls.from_pretrained(tmp_dir)
+            _ = components.pop(component_selected)
+            components.update({component_selected: loaded_sharded_component})
+            _ = self.pipeline_class(**components).to(torch_device)
diff --git a/tests/pipelines/kandinsky2_2/test_kandinsky_prior.py b/tests/pipelines/kandinsky2_2/test_kandinsky_prior.py
index be0bc238d4da..55dbb302b274 100644
--- a/tests/pipelines/kandinsky2_2/test_kandinsky_prior.py
+++ b/tests/pipelines/kandinsky2_2/test_kandinsky_prior.py
@@ -14,6 +14,8 @@
 # limitations under the License.
 
 import inspect
+import os
+import tempfile
 import unittest
 
 import numpy as np
@@ -29,11 +31,16 @@
 )
 
 from diffusers import KandinskyV22PriorPipeline, PriorTransformer, UnCLIPScheduler
-from diffusers.utils.testing_utils import enable_full_determinism, skip_mps, torch_device
+from diffusers.models.modeling_utils import ModelMixin
+from diffusers.utils import SAFE_WEIGHTS_INDEX_NAME
+from diffusers.utils.testing_utils import enable_full_determinism, is_accelerate_available, skip_mps, torch_device
 
 from ..test_pipelines_common import PipelineTesterMixin
 
 
+if is_accelerate_available():
+    from accelerate.utils import compute_module_sizes
+
 enable_full_determinism()
 
 
@@ -277,3 +284,31 @@ def callback_inputs_test(pipe, i, t, callback_kwargs):
         output = pipe(**inputs)[0]
 
         assert output.abs().sum() == 0
+
+    # It needs a different sharding ratio than the standard 0.75. So, we override it.
+    def test_sharded_components_can_be_device_placed(self):
+        components = self.get_dummy_components()
+
+        component_selected = None
+        for component_name in components:
+            if isinstance(components[component_name], ModelMixin) and hasattr(
+                components[component_name], "load_config"
+            ):
+                component_to_be_sharded = components[component_name]
+                component_cls = component_to_be_sharded.__class__
+                component_selected = component_name
+                break
+
+        assert component_selected, "No component selected that can be sharded."
+
+        model_size = compute_module_sizes(component_to_be_sharded)[""]
+        max_shard_size = int((model_size * 0.45) / (2**10))
+
+        with tempfile.TemporaryDirectory() as tmp_dir:
+            component_to_be_sharded.cpu().save_pretrained(tmp_dir, max_shard_size=f"{max_shard_size}KB")
+            self.assertTrue(os.path.exists(os.path.join(tmp_dir, SAFE_WEIGHTS_INDEX_NAME)))
+
+            loaded_sharded_component = component_cls.from_pretrained(tmp_dir)
+            _ = components.pop(component_selected)
+            components.update({component_selected: loaded_sharded_component})
+            _ = self.pipeline_class(**components).to(torch_device)
diff --git a/tests/pipelines/kandinsky2_2/test_kandinsky_prior_emb2emb.py b/tests/pipelines/kandinsky2_2/test_kandinsky_prior_emb2emb.py
index e898824e2d17..751a667e19f9 100644
--- a/tests/pipelines/kandinsky2_2/test_kandinsky_prior_emb2emb.py
+++ b/tests/pipelines/kandinsky2_2/test_kandinsky_prior_emb2emb.py
@@ -13,7 +13,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import os
 import random
+import tempfile
 import unittest
 
 import numpy as np
@@ -30,9 +32,12 @@
 )
 
 from diffusers import KandinskyV22PriorEmb2EmbPipeline, PriorTransformer, UnCLIPScheduler
+from diffusers.models.modeling_utils import ModelMixin
+from diffusers.utils import SAFE_WEIGHTS_INDEX_NAME
 from diffusers.utils.testing_utils import (
     enable_full_determinism,
     floats_tensor,
+    is_accelerate_available,
     skip_mps,
     torch_device,
 )
@@ -40,6 +45,9 @@
 from ..test_pipelines_common import PipelineTesterMixin
 
 
+if is_accelerate_available():
+    from accelerate.utils import compute_module_sizes
+
 enable_full_determinism()
 
 
@@ -240,3 +248,31 @@ def test_attention_slicing_forward_pass(self):
             test_max_difference=test_max_difference,
             test_mean_pixel_difference=test_mean_pixel_difference,
         )
+
+    # It needs a different sharding ratio than the standard 0.75. So, we override it.
+    def test_sharded_components_can_be_device_placed(self):
+        components = self.get_dummy_components()
+
+        component_selected = None
+        for component_name in components:
+            if isinstance(components[component_name], ModelMixin) and hasattr(
+                components[component_name], "load_config"
+            ):
+                component_to_be_sharded = components[component_name]
+                component_cls = component_to_be_sharded.__class__
+                component_selected = component_name
+                break
+
+        assert component_selected, "No component selected that can be sharded."
+
+        model_size = compute_module_sizes(component_to_be_sharded)[""]
+        max_shard_size = int((model_size * 0.45) / (2**10))
+
+        with tempfile.TemporaryDirectory() as tmp_dir:
+            component_to_be_sharded.cpu().save_pretrained(tmp_dir, max_shard_size=f"{max_shard_size}KB")
+            self.assertTrue(os.path.exists(os.path.join(tmp_dir, SAFE_WEIGHTS_INDEX_NAME)))
+
+            loaded_sharded_component = component_cls.from_pretrained(tmp_dir)
+            _ = components.pop(component_selected)
+            components.update({component_selected: loaded_sharded_component})
+            _ = self.pipeline_class(**components).to(torch_device)
diff --git a/tests/pipelines/stable_unclip/test_stable_unclip.py b/tests/pipelines/stable_unclip/test_stable_unclip.py
index bb54d212a786..9740d28b0b14 100644
--- a/tests/pipelines/stable_unclip/test_stable_unclip.py
+++ b/tests/pipelines/stable_unclip/test_stable_unclip.py
@@ -1,4 +1,6 @@
 import gc
+import os
+import tempfile
 import unittest
 
 import torch
@@ -12,8 +14,17 @@
     StableUnCLIPPipeline,
     UNet2DConditionModel,
 )
+from diffusers.models.modeling_utils import ModelMixin
 from diffusers.pipelines.stable_diffusion.stable_unclip_image_normalizer import StableUnCLIPImageNormalizer
-from diffusers.utils.testing_utils import enable_full_determinism, load_numpy, nightly, require_torch_gpu, torch_device
+from diffusers.utils import SAFE_WEIGHTS_INDEX_NAME
+from diffusers.utils.testing_utils import (
+    enable_full_determinism,
+    is_accelerate_available,
+    load_numpy,
+    nightly,
+    require_torch_gpu,
+    torch_device,
+)
 
 from ..pipeline_params import TEXT_TO_IMAGE_BATCH_PARAMS, TEXT_TO_IMAGE_IMAGE_PARAMS, TEXT_TO_IMAGE_PARAMS
 from ..test_pipelines_common import (
@@ -24,6 +35,10 @@
 )
 
 
+if is_accelerate_available():
+    from accelerate.utils import compute_module_sizes
+
+
 enable_full_determinism()
 
 
@@ -184,6 +199,46 @@ def test_attention_slicing_forward_pass(self):
     def test_inference_batch_single_identical(self):
         self._test_inference_batch_single_identical(expected_max_diff=1e-3)
 
+    @unittest.skip("Test not supported.")
+    def test_calling_mco_raises_error_device_mapped_components(self):
+        pass
+
+    @unittest.skip("Test not supported.")
+    def test_calling_to_raises_error_device_mapped_components(self):
+        pass
+
+    @unittest.skip("Test not supported.")
+    def test_calling_sco_raises_error_device_mapped_components(self):
+        pass
+
+    # It needs a different sharding ratio than the standard 0.75. So, we override it.
+    def test_sharded_components_can_be_device_placed(self):
+        components = self.get_dummy_components()
+
+        component_selected = None
+        for component_name in components:
+            if isinstance(components[component_name], ModelMixin) and hasattr(
+                components[component_name], "load_config"
+            ):
+                component_to_be_sharded = components[component_name]
+                component_cls = component_to_be_sharded.__class__
+                component_selected = component_name
+                break
+
+        assert component_selected, "No component selected that can be sharded."
+
+        model_size = compute_module_sizes(component_to_be_sharded)[""]
+        max_shard_size = int((model_size * 0.45) / (2**10))
+
+        with tempfile.TemporaryDirectory() as tmp_dir:
+            component_to_be_sharded.cpu().save_pretrained(tmp_dir, max_shard_size=f"{max_shard_size}KB")
+            self.assertTrue(os.path.exists(os.path.join(tmp_dir, SAFE_WEIGHTS_INDEX_NAME)))
+
+            loaded_sharded_component = component_cls.from_pretrained(tmp_dir)
+            _ = components.pop(component_selected)
+            components.update({component_selected: loaded_sharded_component})
+            _ = self.pipeline_class(**components).to(torch_device)
+
 
 @nightly
 @require_torch_gpu
diff --git a/tests/pipelines/test_pipelines_common.py b/tests/pipelines/test_pipelines_common.py
index 295a94c1d2e4..359799a4cba9 100644
--- a/tests/pipelines/test_pipelines_common.py
+++ b/tests/pipelines/test_pipelines_common.py
@@ -32,17 +32,21 @@
 from diffusers.loaders import IPAdapterMixin
 from diffusers.models.attention_processor import AttnProcessor
 from diffusers.models.controlnet_xs import UNetControlNetXSModel
+from diffusers.models.modeling_utils import ModelMixin
 from diffusers.models.unets.unet_3d_condition import UNet3DConditionModel
 from diffusers.models.unets.unet_i2vgen_xl import I2VGenXLUNet
 from diffusers.models.unets.unet_motion_model import UNetMotionModel
 from diffusers.pipelines.pipeline_utils import StableDiffusionMixin
 from diffusers.schedulers import KarrasDiffusionSchedulers
-from diffusers.utils import logging
+from diffusers.utils import SAFE_WEIGHTS_INDEX_NAME, logging
 from diffusers.utils.import_utils import is_accelerate_available, is_accelerate_version, is_xformers_available
 from diffusers.utils.testing_utils import (
     CaptureLogger,
+    nightly,
     require_torch,
+    require_torch_multi_gpu,
     skip_mps,
+    slow,
     torch_device,
 )
 
@@ -59,6 +63,10 @@
 from ..others.test_utils import TOKEN, USER, is_staging_test
 
 
+if is_accelerate_available():
+    from accelerate.utils import compute_module_sizes
+
+
 def to_np(tensor):
     if isinstance(tensor, torch.Tensor):
         tensor = tensor.detach().cpu().numpy()
@@ -1908,6 +1916,105 @@ def test_StableDiffusionMixin_component(self):
             )
         )
 
+    @require_torch_multi_gpu
+    @slow
+    @nightly
+    def test_calling_to_raises_error_device_mapped_components(self, safe_serialization=True):
+        components = self.get_dummy_components()
+        pipe = self.pipeline_class(**components)
+        max_model_size = max(
+            compute_module_sizes(module)[""]
+            for _, module in pipe.components.items()
+            if isinstance(module, torch.nn.Module)
+        )
+        with tempfile.TemporaryDirectory() as tmpdir:
+            pipe.save_pretrained(tmpdir, safe_serialization=safe_serialization)
+            max_memory = {0: max_model_size, 1: max_model_size}
+            loaded_pipe = self.pipeline_class.from_pretrained(tmpdir, device_map="balanced", max_memory=max_memory)
+
+            with self.assertRaises(ValueError) as err_context:
+                loaded_pipe.to(torch_device)
+
+            self.assertTrue(
+                "The following pipeline components have been found" in str(err_context.exception)
+                and "This is incompatible with explicitly setting the device using `to()`" in str(err_context.exception)
+            )
+
+    @require_torch_multi_gpu
+    @slow
+    @nightly
+    def test_calling_mco_raises_error_device_mapped_components(self, safe_serialization=True):
+        components = self.get_dummy_components()
+        pipe = self.pipeline_class(**components)
+        max_model_size = max(
+            compute_module_sizes(module)[""]
+            for _, module in pipe.components.items()
+            if isinstance(module, torch.nn.Module)
+        )
+        with tempfile.TemporaryDirectory() as tmpdir:
+            pipe.save_pretrained(tmpdir, safe_serialization=safe_serialization)
+            max_memory = {0: max_model_size, 1: max_model_size}
+            loaded_pipe = self.pipeline_class.from_pretrained(tmpdir, device_map="balanced", max_memory=max_memory)
+
+            with self.assertRaises(ValueError) as err_context:
+                loaded_pipe.enable_model_cpu_offload()
+
+            self.assertTrue(
+                "The following pipeline components have been found" in str(err_context.exception)
+                and "This is incompatible with `enable_model_cpu_offload()`" in str(err_context.exception)
+            )
+
+    @require_torch_multi_gpu
+    @slow
+    @nightly
+    def test_calling_sco_raises_error_device_mapped_components(self, safe_serialization=True):
+        components = self.get_dummy_components()
+        pipe = self.pipeline_class(**components)
+        max_model_size = max(
+            compute_module_sizes(module)[""]
+            for _, module in pipe.components.items()
+            if isinstance(module, torch.nn.Module)
+        )
+        with tempfile.TemporaryDirectory() as tmpdir:
+            pipe.save_pretrained(tmpdir, safe_serialization=safe_serialization)
+            max_memory = {0: max_model_size, 1: max_model_size}
+            loaded_pipe = self.pipeline_class.from_pretrained(tmpdir, device_map="balanced", max_memory=max_memory)
+
+            with self.assertRaises(ValueError) as err_context:
+                loaded_pipe.enable_sequential_cpu_offload()
+
+            self.assertTrue(
+                "The following pipeline components have been found" in str(err_context.exception)
+                and "This is incompatible with `enable_sequential_cpu_offload()`" in str(err_context.exception)
+            )
+
+    def test_sharded_components_can_be_device_placed(self):
+        components = self.get_dummy_components()
+
+        component_selected = None
+        for component_name in components:
+            if isinstance(components[component_name], ModelMixin) and hasattr(
+                components[component_name], "load_config"
+            ):
+                component_to_be_sharded = components[component_name]
+                component_cls = component_to_be_sharded.__class__
+                component_selected = component_name
+                break
+
+        assert component_selected, "No component selected that can be sharded."
+
+        model_size = compute_module_sizes(component_to_be_sharded)[""]
+        max_shard_size = int((model_size * 0.75) / (2**10))
+
+        with tempfile.TemporaryDirectory() as tmp_dir:
+            component_to_be_sharded.cpu().save_pretrained(tmp_dir, max_shard_size=f"{max_shard_size}KB")
+            self.assertTrue(os.path.exists(os.path.join(tmp_dir, SAFE_WEIGHTS_INDEX_NAME)))
+
+            loaded_sharded_component = component_cls.from_pretrained(tmp_dir)
+            _ = components.pop(component_selected)
+            components.update({component_selected: loaded_sharded_component})
+            _ = self.pipeline_class(**components).to(torch_device)
+
 
 @is_staging_test
 class PipelinePushToHubTester(unittest.TestCase):
diff --git a/tests/pipelines/unclip/test_unclip.py b/tests/pipelines/unclip/test_unclip.py
index 07590c9db458..a5fe670105a1 100644
--- a/tests/pipelines/unclip/test_unclip.py
+++ b/tests/pipelines/unclip/test_unclip.py
@@ -14,6 +14,8 @@
 # limitations under the License.
 
 import gc
+import os
+import tempfile
 import unittest
 
 import numpy as np
@@ -21,9 +23,12 @@
 from transformers import CLIPTextConfig, CLIPTextModelWithProjection, CLIPTokenizer
 
 from diffusers import PriorTransformer, UnCLIPPipeline, UnCLIPScheduler, UNet2DConditionModel, UNet2DModel
+from diffusers.models.modeling_utils import ModelMixin
 from diffusers.pipelines.unclip.text_proj import UnCLIPTextProjModel
+from diffusers.utils import SAFE_WEIGHTS_INDEX_NAME
 from diffusers.utils.testing_utils import (
     enable_full_determinism,
+    is_accelerate_available,
     load_numpy,
     nightly,
     require_torch_gpu,
@@ -35,6 +40,10 @@
 from ..test_pipelines_common import PipelineTesterMixin, assert_mean_pixel_difference
 
 
+if is_accelerate_available():
+    from accelerate.utils import compute_module_sizes
+
+
 enable_full_determinism()
 
 
@@ -418,6 +427,34 @@ def test_save_load_optional_components(self):
     def test_float16_inference(self):
         super().test_float16_inference(expected_max_diff=1.0)
 
+    # It needs a different sharding ratio than the standard 0.75. So, we override it.
+    def test_sharded_components_can_be_device_placed(self):
+        components = self.get_dummy_components()
+
+        component_selected = None
+        for component_name in components:
+            if isinstance(components[component_name], ModelMixin) and hasattr(
+                components[component_name], "load_config"
+            ):
+                component_to_be_sharded = components[component_name]
+                component_cls = component_to_be_sharded.__class__
+                component_selected = component_name
+                break
+
+        assert component_selected, "No component selected that can be sharded."
+
+        model_size = compute_module_sizes(component_to_be_sharded)[""]
+        max_shard_size = int((model_size * 0.45) / (2**10))
+
+        with tempfile.TemporaryDirectory() as tmp_dir:
+            component_to_be_sharded.cpu().save_pretrained(tmp_dir, max_shard_size=f"{max_shard_size}KB")
+            self.assertTrue(os.path.exists(os.path.join(tmp_dir, SAFE_WEIGHTS_INDEX_NAME)))
+
+            loaded_sharded_component = component_cls.from_pretrained(tmp_dir)
+            _ = components.pop(component_selected)
+            components.update({component_selected: loaded_sharded_component})
+            _ = self.pipeline_class(**components).to(torch_device)
+
 
 @nightly
 class UnCLIPPipelineCPUIntegrationTests(unittest.TestCase):
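Note: this excerpt adds the `model_has_device_map` helper and the tests that exercise it, but the call sites in `pipeline_utils.py` that actually raise the `ValueError` the new tests expect are not shown here. As a rough sketch of how the helper could be consumed before `to()` or the offloading methods, under the assumption of a guard in `DiffusionPipeline` (the function name `_raise_if_device_mapped` and the exact error wording below are illustrative, not the actual implementation):

# Hypothetical sketch, not part of this patch: gate pipeline-wide device moves
# on components that were loaded with a multi-device `hf_device_map`.
import torch

from diffusers.pipelines.pipeline_loading_utils import model_has_device_map


def _raise_if_device_mapped(pipeline, operation):
    # Collect every torch.nn.Module component that still carries a device map.
    device_mapped = [
        name
        for name, component in pipeline.components.items()
        if isinstance(component, torch.nn.Module) and model_has_device_map(component)
    ]
    if device_mapped:
        raise ValueError(
            f"The following pipeline components have been found to use a device map: {device_mapped}. "
            f"This is incompatible with explicitly setting the device using `{operation}`."
        )

A guard like this would be called at the top of `to()`, `enable_model_cpu_offload()`, and `enable_sequential_cpu_offload()`, e.g. `_raise_if_device_mapped(pipe, "to()")`, which is consistent with the substrings the new tests assert on.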