diff --git a/LICENSE b/LICENSE
index f20b198d808..c16e59652bb 100644
--- a/LICENSE
+++ b/LICENSE
@@ -9,6 +9,7 @@ Copyright (c) 2023 Apple Inc.
 Copyright (c) 2024 MediaTek Inc.
 Copyright 2023 NXP
 Copyright (c) 2025 Samsung Electronics Co. LTD
+Copyright (c) Intel Corporation
 
 Redistribution and use in source and binary forms, with or without
 modification, are permitted provided that the following conditions are met:
diff --git a/examples/models/stable_diffusion/__init__.py b/examples/models/stable_diffusion/__init__.py
new file mode 100644
index 00000000000..b749e67df66
--- /dev/null
+++ b/examples/models/stable_diffusion/__init__.py
@@ -0,0 +1,9 @@
+# Copyright (c) Intel Corporation
+#
+# Licensed under the BSD License (the "License"); you may not use this file
+# except in compliance with the License. See the license file found in the
+# LICENSE file in the root directory of this source tree.
+
+from .model import LCMModelLoader, TextEncoderWrapper, UNetWrapper, VAEDecoder
+
+__all__ = ["LCMModelLoader", "TextEncoderWrapper", "UNetWrapper", "VAEDecoder"]
diff --git a/examples/models/stable_diffusion/model.py b/examples/models/stable_diffusion/model.py
new file mode 100644
index 00000000000..6c55e2bb173
--- /dev/null
+++ b/examples/models/stable_diffusion/model.py
@@ -0,0 +1,193 @@
+# Copyright (c) Intel Corporation
+#
+# Licensed under the BSD License (the "License"); you may not use this file
+# except in compliance with the License. See the license file found in the
+# LICENSE file in the root directory of this source tree.
+
+"""
+Stable Diffusion / LCM model definitions.
+
+This module provides reusable model wrappers that can be used with any backend
+(OpenVINO, XNNPACK, etc.) for exporting Latent Consistency Models.
+"""
+
+import logging
+from typing import Any, Optional
+
+import torch
+
+try:
+    from diffusers import DiffusionPipeline
+except ImportError:
+    raise ImportError(
+        "Please install diffusers and transformers: pip install diffusers transformers"
+    )
+
+logger = logging.getLogger(__name__)
+
+
+class TextEncoderWrapper(torch.nn.Module):
+    """Wrapper for CLIP text encoder that extracts last_hidden_state"""
+
+    def __init__(self, text_encoder):
+        super().__init__()
+        self.text_encoder = text_encoder
+
+    def forward(self, input_ids):
+        # Call text encoder and extract last_hidden_state
+        output = self.text_encoder(input_ids, return_dict=True)
+        return output.last_hidden_state
+
+
+class UNetWrapper(torch.nn.Module):
+    """Wrapper for UNet that extracts sample tensor from output"""
+
+    def __init__(self, unet):
+        super().__init__()
+        self.unet = unet
+
+    def forward(self, latents, timestep, encoder_hidden_states):
+        # Call UNet and extract sample from the output
+        output = self.unet(latents, timestep, encoder_hidden_states, return_dict=True)
+        return output.sample
+
+
+class VAEDecoder(torch.nn.Module):
+    """Wrapper for VAE decoder with scaling and normalization"""
+
+    def __init__(self, vae):
+        super().__init__()
+        self.vae = vae
+
+    def forward(self, latents):
+        # Scale latents
+        latents = latents / self.vae.config.scaling_factor
+        # Decode
+        image = self.vae.decode(latents).sample
+        # Scale to [0, 1]
+        image = (image / 2 + 0.5).clamp(0, 1)
+        return image
+
+
+class LCMModelLoader:
+    """
+    Backend-agnostic loader for Latent Consistency Model components.
+
+    This class handles loading the LCM pipeline from HuggingFace and extracting
+    individual components (text_encoder, unet, vae) as PyTorch modules ready
+    for export to any backend.
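+
+    A minimal usage sketch (illustrative only; it assumes the default
+    HuggingFace checkpoint can be downloaded and uses torch.export directly):
+
+        loader = LCMModelLoader(dtype=torch.float16)
+        if loader.load_models():
+            unet = loader.get_unet_wrapper()
+            unet_inputs = loader.get_dummy_inputs()["unet"]
+            exported_unet = torch.export.export(unet, unet_inputs)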
+    """
+
+    def __init__(
+        self,
+        model_id: str = "SimianLuo/LCM_Dreamshaper_v7",
+        dtype: torch.dtype = torch.float16,
+    ):
+        """
+        Initialize the LCM model loader.
+
+        Args:
+            model_id: HuggingFace model ID for the LCM model
+            dtype: Target dtype for the models (fp16 or fp32)
+        """
+        self.model_id = model_id
+        self.dtype = dtype
+        self.pipeline: Optional[DiffusionPipeline] = None
+        self.text_encoder: Any = None
+        self.unet: Any = None
+        self.vae: Any = None
+        self.tokenizer: Any = None
+
+    def load_models(self) -> bool:
+        """
+        Load the LCM pipeline and extract components.
+
+        Returns:
+            True if successful, False otherwise
+        """
+        try:
+            logger.info(f"Loading LCM pipeline: {self.model_id} (dtype: {self.dtype})")
+            self.pipeline = DiffusionPipeline.from_pretrained(
+                self.model_id, use_safetensors=True
+            )
+
+            # Extract individual components and convert to desired dtype
+            self.text_encoder = self.pipeline.text_encoder.to(dtype=self.dtype)
+            self.unet = self.pipeline.unet.to(dtype=self.dtype)
+            self.vae = self.pipeline.vae.to(dtype=self.dtype)
+            self.tokenizer = self.pipeline.tokenizer
+
+            # Set models to evaluation mode
+            self.text_encoder.eval()
+            self.unet.eval()
+            self.vae.eval()
+
+            logger.info("Successfully loaded all LCM model components")
+            return True
+
+        except Exception as e:
+            logger.error(f"Failed to load models: {e}")
+            import traceback
+
+            traceback.print_exc()
+            return False
+
+    def get_text_encoder_wrapper(self) -> TextEncoderWrapper:
+        """Get wrapped text encoder ready for export"""
+        if self.text_encoder is None:
+            raise ValueError("Models not loaded. Call load_models() first.")
+        return TextEncoderWrapper(self.text_encoder)
+
+    def get_unet_wrapper(self) -> UNetWrapper:
+        """Get wrapped UNet ready for export"""
+        if self.unet is None:
+            raise ValueError("Models not loaded. Call load_models() first.")
+        return UNetWrapper(self.unet)
+
+    def get_vae_decoder(self) -> VAEDecoder:
+        """Get wrapped VAE decoder ready for export"""
+        if self.vae is None:
+            raise ValueError("Models not loaded. Call load_models() first.")
+        return VAEDecoder(self.vae)
+
+    def get_dummy_inputs(self):
+        """
+        Get dummy inputs for each model component.
+
+        Returns:
+            Dictionary with dummy inputs for text_encoder, unet, and vae_decoder
+        """
+        if self.unet is None:
+            raise ValueError("Models not loaded. Call load_models() first.")
+
+        # Text encoder dummy input
+        text_encoder_input = torch.ones(1, 77, dtype=torch.long)
+
+        # UNet dummy inputs
+        batch_size = 1
+        latent_channels = 4
+        latent_height = 64
+        latent_width = 64
+        text_embed_dim = self.unet.config.cross_attention_dim
+        text_seq_len = 77
+
+        unet_inputs = (
+            torch.randn(
+                batch_size,
+                latent_channels,
+                latent_height,
+                latent_width,
+                dtype=self.dtype,
+            ),
+            torch.tensor([981]),  # Example timestep
+            torch.randn(batch_size, text_seq_len, text_embed_dim, dtype=self.dtype),
+        )
+
+        # VAE decoder dummy input
+        vae_input = torch.randn(1, 4, 64, 64, dtype=self.dtype)
+
+        return {
+            "text_encoder": (text_encoder_input,),
+            "unet": unet_inputs,
+            "vae_decoder": (vae_input,),
+        }
diff --git a/examples/openvino/stable_diffusion/README.md b/examples/openvino/stable_diffusion/README.md
new file mode 100644
index 00000000000..fef1e3f50f9
--- /dev/null
+++ b/examples/openvino/stable_diffusion/README.md
@@ -0,0 +1,48 @@
+# Stable Diffusion LCM with OpenVINO Backend
+
+This example demonstrates how to run Latent Consistency Models (LCM) for fast text-to-image generation on Intel hardware using ExecuTorch with the OpenVINO backend.
+
+## Overview
+
+Latent Consistency Models (LCMs) are optimized diffusion models that generate high-quality images in just 4-8 steps, compared to the 25-50 steps required by traditional Stable Diffusion models.
+
+## Environment Setup
+
+Follow the **Prerequisites** and **Setup** [instructions](../../../backends/openvino/README.md) in `backends/openvino/README.md` to set up the OpenVINO backend.
+
+### Install dependencies
+
+```bash
+pip install -r requirements.txt
+```
+
+## Export the Model
+
+Export the LCM model:
+
+```bash
+python export_lcm.py \
+    --model_id SimianLuo/LCM_Dreamshaper_v7 \
+    --output_dir ./lcm_models \
+    --device CPU \
+    --dtype fp16
+```
+
+This will create three files in `./lcm_models/`:
+- `text_encoder.pte`
+- `unet.pte`
+- `vae_decoder.pte`
+
+### Generate Images
+
+Run inference with the exported models:
+
+```bash
+python openvino_lcm.py \
+    --models_dir ./lcm_models \
+    --prompt "a beautiful sunset over mountains" \
+    --steps 4 \
+    --dtype fp16
+```
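+
+### Run a single component from Python (optional)
+
+`openvino_lcm.py` drives the exported models through the `executorch.runtime`
+API. As a minimal sketch (illustrative; it assumes the export layout above and
+the 77-token CLIP input used by this example), the text encoder can be
+exercised on its own:
+
+```python
+import torch
+from executorch.runtime import Runtime
+
+runtime = Runtime.get()
+program = runtime.load_program("./lcm_models/text_encoder.pte")
+forward = program.load_method("forward")
+
+# A dummy prompt: a batch of 77 CLIP token ids
+token_ids = torch.ones(1, 77, dtype=torch.long)
+embeddings = forward.execute([token_ids])[0]
+print(embeddings.shape)
+```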
+
+## Supported Models
+
+This implementation supports LCM-based Stable Diffusion models:
+- **SimianLuo/LCM_Dreamshaper_v7**
+- **latent-consistency/lcm-sdxl**
\ No newline at end of file
diff --git a/examples/openvino/stable_diffusion/export_lcm.py b/examples/openvino/stable_diffusion/export_lcm.py
new file mode 100644
index 00000000000..3917b1abf6d
--- /dev/null
+++ b/examples/openvino/stable_diffusion/export_lcm.py
@@ -0,0 +1,274 @@
+# Copyright (c) Intel Corporation
+#
+# Licensed under the BSD License (the "License"); you may not use this file
+# except in compliance with the License. See the license file found in the
+# LICENSE file in the root directory of this source tree.
+
+# mypy: disable-error-code="union-attr,import-not-found"
+
+import argparse
+import logging
+import os
+
+import torch
+
+from executorch.backends.openvino.partitioner import OpenvinoPartitioner
+from executorch.examples.models.stable_diffusion.model import (  # type: ignore[import-untyped]
+    LCMModelLoader,
+)
+from executorch.exir import ExecutorchBackendConfig, to_edge_transform_and_lower
+from executorch.exir.backend.backend_details import CompileSpec
+from torch.export import export
+
+# Configure logging
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+
+class LCMOpenVINOExporter:
+    """Export Latent Consistency Model (LCM) components to OpenVINO PTE files"""
+
+    def __init__(
+        self,
+        model_id: str = "SimianLuo/LCM_Dreamshaper_v7",
+        dtype: torch.dtype = torch.float16,
+    ):
+        self.model_loader = LCMModelLoader(model_id=model_id, dtype=dtype)
+
+    def load_models(self) -> bool:
+        """Load the LCM pipeline and extract components"""
+        return self.model_loader.load_models()
+
+    def export_text_encoder(self, output_path: str, device: str = "CPU") -> bool:
+        """Export CLIP text encoder to PTE file"""
+        try:
+            logger.info("Exporting text encoder with OpenVINO backend...")
+
+            # Get wrapped model and dummy inputs
+            text_encoder_wrapper = self.model_loader.get_text_encoder_wrapper()
+            dummy_inputs = self.model_loader.get_dummy_inputs()
+
+            # Export to ATen graph
+            exported_program = export(
+                text_encoder_wrapper, dummy_inputs["text_encoder"]
+            )
+
+            # Configure OpenVINO compilation
+            compile_spec = [CompileSpec("device", device.encode())]
+            partitioner = OpenvinoPartitioner(compile_spec)
+
+            # Lower to edge dialect and apply OpenVINO backend
+            edge_manager = to_edge_transform_and_lower(
+                exported_program, partitioner=[partitioner]
+            )
+
+            # Convert to ExecuTorch program
+            executorch_program = edge_manager.to_executorch(
+                config=ExecutorchBackendConfig()
+            )
+
+            # Save to file
+            with open(output_path, "wb") as f:
+                f.write(executorch_program.buffer)
+
+            logger.info("✓ Text encoder exported successfully")
+            return True
+
+        except Exception as e:
+            logger.error(f"Failed to export text encoder: {e}")
+            import traceback
+
+            traceback.print_exc()
+            return False
+
+    def export_unet(self, output_path: str, device: str = "CPU") -> bool:
+        """Export UNet model to PTE file"""
+        try:
+            logger.info("Exporting UNet model with OpenVINO backend...")
+
+            # Get wrapped model and dummy inputs
+            unet_wrapper = self.model_loader.get_unet_wrapper()
+            dummy_inputs = self.model_loader.get_dummy_inputs()
+
+            # Export to ATen graph
+            exported_program = export(unet_wrapper, dummy_inputs["unet"])
+
+            # Configure OpenVINO compilation
+            compile_spec = [CompileSpec("device", device.encode())]
+            partitioner = OpenvinoPartitioner(compile_spec)
+
+            # Lower to edge dialect and apply OpenVINO backend
+            edge_manager = to_edge_transform_and_lower(
+                exported_program, partitioner=[partitioner]
+            )
+
+            # Convert to ExecuTorch program
+            executorch_program = edge_manager.to_executorch(
+                config=ExecutorchBackendConfig()
+            )
+
+            # Save to file
+            with open(output_path, "wb") as f:
+                f.write(executorch_program.buffer)
+
+            logger.info("✓ UNet exported successfully")
+            return True
+
+        except Exception as e:
+            logger.error(f"Failed to export UNet: {e}")
+            import traceback
+
+            traceback.print_exc()
+            return False
+
+    def export_vae_decoder(self, output_path: str, device: str = "CPU") -> bool:
+        """Export VAE decoder to PTE file"""
+        try:
+            logger.info("Exporting VAE decoder with OpenVINO backend...")
+
+            # Get wrapped model and dummy inputs
+            vae_decoder = self.model_loader.get_vae_decoder()
+            dummy_inputs = self.model_loader.get_dummy_inputs()
+
+            # Export to ATen graph
+            exported_program = export(vae_decoder, dummy_inputs["vae_decoder"])
+
+            # Configure OpenVINO compilation
+            compile_spec = [CompileSpec("device", device.encode())]
+            partitioner = OpenvinoPartitioner(compile_spec)
+
+            # Lower to edge dialect and apply OpenVINO backend
+            edge_manager = to_edge_transform_and_lower(
+                exported_program, partitioner=[partitioner]
+            )
+
+            # Convert to ExecuTorch program
+            executorch_program = edge_manager.to_executorch(
+                config=ExecutorchBackendConfig()
+            )
+
+            # Save to file
+            with open(output_path, "wb") as f:
+                f.write(executorch_program.buffer)
+
+            logger.info("✓ VAE decoder exported successfully")
+            return True
+
+        except Exception as e:
+            logger.error(f"Failed to export VAE decoder: {e}")
+            import traceback
+
+            traceback.print_exc()
+            return False
+
+    def export_all_components(self, output_dir: str, device: str = "CPU") -> bool:
+        """Export all LCM components"""
+        # Create output directory
+        os.makedirs(output_dir, exist_ok=True)
+
+        # Define output paths
+        text_encoder_path = os.path.join(output_dir, "text_encoder.pte")
+        unet_path = os.path.join(output_dir, "unet.pte")
+        vae_decoder_path = os.path.join(output_dir, "vae_decoder.pte")
+
+        # Export each component
+        success = True
+        success &= self.export_text_encoder(text_encoder_path, device)
+        success &= self.export_unet(unet_path, device)
+        success &= self.export_vae_decoder(vae_decoder_path, device)
+
+        if success:
+            logger.info(f"\n{'='*60}")
+            logger.info("✓ All components exported successfully!")
+            logger.info(f"Output directory: {output_dir}")
+            logger.info(f"{'='*60}")
+        else:
+            logger.error("Export failed")
+
+        return success
+
+
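+# Programmatic use (illustrative sketch, not a separate entry point): the CLI
+# below is the supported interface, but the exporter can also be driven
+# directly from Python, e.g.:
+#
+#   exporter = LCMOpenVINOExporter("SimianLuo/LCM_Dreamshaper_v7", dtype=torch.float16)
+#   if exporter.load_models():
+#       exporter.export_all_components("./lcm_models", device="CPU")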
+def create_argument_parser():
+    """Create command line argument parser"""
+    parser = argparse.ArgumentParser(
+        description="Export Latent Consistency Model (LCM) components to OpenVINO PTE files",
+        formatter_class=argparse.RawDescriptionHelpFormatter,
+        epilog="""
+Examples:
+  Export LCM_Dreamshaper_v7 (default):
+    python export_lcm.py --output_dir ./lcm_models
+""",
+    )
+
+    parser.add_argument(
+        "--model_id",
+        type=str,
+        default="SimianLuo/LCM_Dreamshaper_v7",
+        help="HuggingFace model ID for LCM (default: SimianLuo/LCM_Dreamshaper_v7)",
+    )
+
+    parser.add_argument(
+        "--output_dir",
+        type=str,
+        required=True,
+        help="Output directory for exported PTE files",
+    )
+
+    parser.add_argument(
+        "--device",
+        choices=["CPU", "GPU", "NPU"],
+        default="CPU",
+        help="Target OpenVINO device (default: CPU)",
+    )
+
+    parser.add_argument(
+        "--dtype",
+        choices=["fp16", "fp32"],
+        default="fp16",
+        help="Model data type (default: fp16)",
+    )
+
+    parser.add_argument("--verbose", action="store_true", help="Enable verbose logging")
+
+    return parser
+
+
+def main() -> int:
+    """Main execution function"""
+    parser = create_argument_parser()
+    args = parser.parse_args()
+
+    # Set logging level
+    if args.verbose:
+        logging.getLogger().setLevel(logging.DEBUG)
+
+    logger.info("=" * 60)
+    logger.info("LCM Model Export")
+    logger.info(f"Model: {args.model_id}")
+    logger.info(f"Device: {args.device} | Dtype: {args.dtype}")
+    logger.info("=" * 60)
+
+    # Map dtype string to torch dtype
+    dtype_map = {"fp16": torch.float16, "fp32": torch.float32}
+    dtype = dtype_map[args.dtype]
+
+    # Create exporter and load models
+    exporter = LCMOpenVINOExporter(args.model_id, dtype=dtype)
+
+    if not exporter.load_models():
+        logger.error("Failed to load models")
+        return 1
+
+    # Export all components
+    if not exporter.export_all_components(args.output_dir, args.device):
+        return 1
+
+    logger.info("\nTo run inference:")
+    logger.info(
+        f'  python openvino_lcm.py --models_dir {args.output_dir} --prompt "your prompt" --steps 4'
+    )
+    return 0
+
+
+if __name__ == "__main__":
+    raise SystemExit(main())
diff --git a/examples/openvino/stable_diffusion/openvino_lcm.py b/examples/openvino/stable_diffusion/openvino_lcm.py
new file mode 100644
index 00000000000..f9d68a633a3
--- /dev/null
+++ b/examples/openvino/stable_diffusion/openvino_lcm.py
@@ -0,0 +1,396 @@
+# Copyright (c) Intel Corporation
+#
+# Licensed under the BSD License (the "License"); you may not use this file
+# except in compliance with the License. See the license file found in the
+# LICENSE file in the root directory of this source tree.
+
+# mypy: disable-error-code="union-attr,import-not-found"
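+
+"""
+Run the exported LCM components with the ExecuTorch runtime.
+
+The pipeline mirrors a standard latent-diffusion flow: the prompt is encoded
+with the CLIP text encoder, the latents are denoised by the UNet under the
+LCM scheduler, and the result is decoded to an image by the VAE decoder.
+"""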
+
+import argparse
+import logging
+import os
+import time
+from typing import Any, Dict, Optional
+
+import torch
+from PIL import Image
+
+try:
+    from diffusers import LCMScheduler
+    from transformers import CLIPTokenizer
+except ImportError:
+    raise ImportError(
+        "Please install diffusers and transformers: pip install diffusers transformers"
+    )
+
+from executorch.runtime import Runtime
+
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+
+class OpenVINOLCMPipeline:
+    """OpenVINO optimized Latent Consistency Model pipeline for Intel hardware"""
+
+    def __init__(self, device: str = "CPU", dtype: torch.dtype = torch.float16):
+        self.device = device
+        self.dtype = dtype
+        self.models: Dict[str, Any] = {}
+        self.tokenizer: Optional[CLIPTokenizer] = None
+        self.scheduler: Optional[LCMScheduler] = None
+        self.runtime = Runtime.get()
+        self._initialized = False
+
+        # Cumulative timing metrics
+        self.models_load_time = 0.0
+        self.exec_time = 0.0
+
+    def load_tokenizer(self, vocab_path: str):
+        """Load CLIP tokenizer"""
+        try:
+            # Note: vocab_path is currently unused; the standard CLIP tokenizer
+            # is fetched from the HuggingFace hub instead.
+            self.tokenizer = CLIPTokenizer.from_pretrained(
+                "openai/clip-vit-base-patch32"
+            )
+            logger.info("✓ Tokenizer loaded")
+            return True
+        except Exception as e:
+            logger.error(f"Failed to load tokenizer: {e}")
+            return False
+
+    def initialize_scheduler(
+        self, original_model_id: str = "SimianLuo/LCM_Dreamshaper_v7"
+    ):
+        """Initialize the LCM scheduler"""
+        try:
+            self.scheduler = LCMScheduler.from_pretrained(
+                original_model_id, subfolder="scheduler"
+            )
+            logger.info("✓ Scheduler loaded")
+            return True
+        except Exception as e:
+            logger.error(f"Failed to load scheduler from {original_model_id}: {e}")
+            return False
+
+    def load_model_component(self, component_name: str, model_path: str):
+        """Load a model component"""
+        try:
+            if not os.path.exists(model_path):
+                logger.error(f"Model file not found: {model_path}")
+                return False
+
+            program = self.runtime.load_program(model_path)
+            self.models[component_name] = program
+            logger.info(f"✓ Loaded {component_name}")
+            return True
+        except Exception as e:
+            logger.error(f"Failed to load {component_name}: {e}")
+            return False
+
+    def encode_prompt(self, prompt: str):
+        """Encode text prompt using the text encoder"""
+        if "text_encoder" not in self.models or self.tokenizer is None:
+            logger.error("Text encoder or tokenizer not loaded")
+            return None
+
+        try:
+            inputs = self.tokenizer(
+                prompt,
+                padding="max_length",
+                max_length=77,
+                truncation=True,
+                return_tensors="pt",
+            )
+
+            load_start = time.time()
+            text_encoder_method = self.models["text_encoder"].load_method("forward")
+            load_time = time.time() - load_start
+            self.models_load_time += load_time
+
+            exec_start = time.time()
+            embeddings = text_encoder_method.execute([inputs.input_ids])[0]
+            exec_time = time.time() - exec_start
+            self.exec_time += exec_time
+
+            logger.info(
+                f"Text encoder - Load: {load_time:.3f}s, Execute: {exec_time:.3f}s"
+            )
+            return embeddings
+        except Exception as e:
+            logger.error(f"Failed to encode prompt: {e}")
+            return None
+
+    def denoise_latents(
+        self,
+        text_embeddings: torch.Tensor,
+        num_steps: int,
+        guidance_scale: float,
+        seed: Optional[int] = None,
+    ):
+        """Run the denoising process using the UNet model with LCM scheduler"""
+        if "unet" not in self.models:
+            logger.error("UNet model not loaded")
+            return None
+
+        try:
+            # Initialize latents
+            generator = torch.Generator()
+            if seed is not None:
+                generator.manual_seed(seed)
+
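+            # 1x4x64x64 latents correspond to a 512x512 output image: the SD
+            # VAE downscales spatially by 8x and uses a 4-channel latent space.
+            # Assumption: the UNet was exported with these default dimensions.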
+            latents = torch.randn(
+                (1, 4, 64, 64),  # Standard latent dimensions for SD
+                generator=generator,
+                dtype=self.dtype,
+            )
+
+            # Set timesteps for LCM
+            self.scheduler.set_timesteps(num_steps)
+
+            # Get UNet method
+            load_start = time.time()
+            unet_method = self.models["unet"].load_method("forward")
+            load_time = time.time() - load_start
+            self.models_load_time += load_time
+            logger.info(f"UNet - Load: {load_time:.3f}s")
+
+            # Denoising loop
+            logger.info(f"Running LCM denoising with {num_steps} steps...")
+            denoise_start = time.time()
+
+            for step, timestep in enumerate(self.scheduler.timesteps):
+                step_start = time.time()
+
+                latent_model_input = self.scheduler.scale_model_input(latents, timestep)
+                if latent_model_input.dtype != self.dtype:
+                    latent_model_input = latent_model_input.to(self.dtype)
+
+                timestep_tensor = torch.tensor(
+                    timestep.item(), dtype=torch.long
+                ).unsqueeze(0)
+                noise_pred = unet_method.execute(
+                    [latent_model_input, timestep_tensor, text_embeddings]
+                )[0]
+
+                if guidance_scale != 1.0:
+                    noise_pred = noise_pred * guidance_scale
+
+                latents = self.scheduler.step(noise_pred, timestep, latents).prev_sample
+                logger.info(
+                    f"  Step {step+1}/{num_steps} completed ({time.time() - step_start:.3f}s)"
+                )
+
+            exec_time = time.time() - denoise_start
+            self.exec_time += exec_time
+            logger.info(
+                f"UNet - Execute: {exec_time:.3f}s, avg {exec_time/num_steps:.3f}s/step"
+            )
+
+            return latents
+        except Exception as e:
+            logger.error(f"Failed during denoising: {e}")
+            return None
+
+    def decode_image(self, latents: torch.Tensor):
+        """Decode latents to final image using VAE decoder"""
+        if "vae_decoder" not in self.models:
+            logger.error("VAE decoder not loaded")
+            return None
+
+        try:
+            load_start = time.time()
+            vae_method = self.models["vae_decoder"].load_method("forward")
+            load_time = time.time() - load_start
+            self.models_load_time += load_time
+
+            exec_start = time.time()
+            decoded_image = vae_method.execute([latents])[0]
+            exec_time = time.time() - exec_start
+            self.exec_time += exec_time
+
+            # Convert from (1, 3, 512, 512) CHW to (512, 512, 3) HWC
+            conversion_start = time.time()
+            decoded_image = decoded_image.squeeze(0).permute(1, 2, 0)
+            decoded_image = (decoded_image * 255).clamp(0, 255).to(torch.uint8)
+            image = Image.fromarray(decoded_image.numpy())
+            postprocess_time = time.time() - conversion_start
+            self.exec_time += postprocess_time
+
+            logger.info(
+                f"VAE decoder - Load: {load_time:.3f}s, "
+                f"Execute: {exec_time:.3f}s, "
+                f"Post-process: {postprocess_time:.3f}s"
+            )
+
+            return image
+        except Exception as e:
+            logger.error(f"Failed to decode image: {e}")
+            return None
+
+    def generate_image(
+        self,
+        prompt: str,
+        num_steps: int = 4,
+        guidance_scale: float = 1.0,
+        seed: Optional[int] = None,
+    ):
+        """Complete image generation pipeline using LCM"""
+        if not self._initialized:
+            logger.error("Pipeline not initialized")
+            return None
+
+        logger.info("=" * 60)
+        logger.info(f"Prompt: '{prompt}'")
+        logger.info(f"Steps: {num_steps} | Guidance: {guidance_scale} | Seed: {seed}")
+        logger.info("=" * 60)
+
+        # Reset cumulative timers
+        self.models_load_time = 0.0
+        self.exec_time = 0.0
+
+        total_start = time.time()
+
+        text_embeddings = self.encode_prompt(prompt)
+        if text_embeddings is None:
+            return None
+
+        latents = self.denoise_latents(text_embeddings, num_steps, guidance_scale, seed)
+        if latents is None:
+            return None
+
+        image = self.decode_image(latents)
+        if image is None:
+            return None
+
+        total_time = time.time() - total_start
+
+        logger.info("=" * 60)
+        logger.info("✓ Generation completed!")
+        logger.info(f"  Total time: {total_time:.3f}s")
+        logger.info(f"  Total load time: {self.models_load_time:.3f}s")
+        logger.info(f"  Total Inference time: {self.exec_time:.3f}s")
+        logger.info("=" * 60)
+        return image
+
+    def initialize(
+        self,
+        text_encoder_path: str,
+        unet_path: str,
+        vae_path: str,
+        vocab_path: str,
+        original_model_id: str = "SimianLuo/LCM_Dreamshaper_v7",
+    ):
+        """Initialize the LCM pipeline"""
+        logger.info("Initializing pipeline...")
+
+        if not self.load_tokenizer(vocab_path):
+            return False
+
+        if not self.initialize_scheduler(original_model_id):
+            return False
+
+        components = {
+            "text_encoder": text_encoder_path,
+            "unet": unet_path,
+            "vae_decoder": vae_path,
+        }
+
+        for component, path in components.items():
+            if not self.load_model_component(component, path):
+                return False
+
+        self._initialized = True
+        logger.info("✓ Pipeline ready")
+        return True
+
+
+def create_argument_parser():
+    """Create command line argument parser"""
+    parser = argparse.ArgumentParser(
+        description="OpenVINO LCM (Latent Consistency Model) Inference",
+        formatter_class=argparse.RawDescriptionHelpFormatter,
+        epilog="""
+Example:
+  python openvino_lcm.py --models_dir ./lcm_models --prompt "sunset over mountains" --steps 4
+""",
+    )
+
+    parser.add_argument(
+        "--models_dir", type=str, required=True, help="Directory containing PTE models"
+    )
+    parser.add_argument(
+        "--prompt", type=str, default="a serene landscape", help="Text prompt"
+    )
+    parser.add_argument(
+        "--steps", type=int, default=4, help="Denoising steps (default: 4)"
+    )
+    parser.add_argument(
+        "--guidance", type=float, default=1.0, help="Guidance scale (default: 1.0)"
+    )
+    parser.add_argument("--seed", type=int, help="Random seed")
+    parser.add_argument(
+        "--device", choices=["CPU", "GPU"], default="CPU", help="Target device"
+    )
+    parser.add_argument(
+        "--dtype", choices=["fp16", "fp32"], default="fp16", help="Model dtype"
+    )
+    parser.add_argument(
+        "--output_dir", type=str, default="./lcm_outputs", help="Output directory"
+    )
+    parser.add_argument("--filename", type=str, help="Custom output filename")
+    parser.add_argument("--tokenizer_path", type=str, help="Tokenizer path (optional)")
+    parser.add_argument(
+        "--original_model_id",
+        type=str,
+        default="SimianLuo/LCM_Dreamshaper_v7",
+        help="Model ID for scheduler",
+    )
+
+    return parser
+
+
+def validate_model_files(models_dir: str):
+    """Validate required model files exist"""
+    for filename in ["text_encoder.pte", "unet.pte", "vae_decoder.pte"]:
+        if not os.path.exists(os.path.join(models_dir, filename)):
+            logger.error(f"Missing: {filename}")
+            return False
+    return True
+
+
+def main():
+    """Main execution function"""
+    args = create_argument_parser().parse_args()
+
+    if not validate_model_files(args.models_dir):
+        return
+
+    os.makedirs(args.output_dir, exist_ok=True)
+
+    dtype = torch.float16 if args.dtype == "fp16" else torch.float32
+    pipeline = OpenVINOLCMPipeline(device=args.device, dtype=dtype)
+
+    if not pipeline.initialize(
+        text_encoder_path=os.path.join(args.models_dir, "text_encoder.pte"),
+        unet_path=os.path.join(args.models_dir, "unet.pte"),
+        vae_path=os.path.join(args.models_dir, "vae_decoder.pte"),
+        vocab_path=args.tokenizer_path or "",
+        original_model_id=args.original_model_id,
+    ):
+        return
+
+    image = pipeline.generate_image(args.prompt, args.steps, args.guidance, args.seed)
+    if image is None:
+        return
+
+    # Save image
+    filename = args.filename or "output.jpg"
+    if not filename.endswith(".jpg"):
+        filename += ".jpg"
+
+    output_path = os.path.join(args.output_dir, filename)
+    image.save(output_path)
+    logger.info(f"Image saved: {output_path}")
+
+
+if __name__ == "__main__":
+    main()
diff --git a/examples/openvino/stable_diffusion/requirements.txt b/examples/openvino/stable_diffusion/requirements.txt
new file mode 100644
index 00000000000..7a55f6c19b8
--- /dev/null
+++ b/examples/openvino/stable_diffusion/requirements.txt
@@ -0,0 +1,2 @@
+diffusers>=0.29.0
+transformers
\ No newline at end of file