diff --git a/auto_round/compressors/base.py b/auto_round/compressors/base.py
index e9e0a66a..b3faa135 100644
--- a/auto_round/compressors/base.py
+++ b/auto_round/compressors/base.py
@@ -35,6 +35,7 @@
 from auto_round.export.export_to_gguf.config import GGUF_CONFIG, GGUF_INNER_CONFIG, ModelType
 from auto_round.logger import logger
 from auto_round.low_cpu_mem.utils import get_layers_before_block
+from auto_round.quantizers import BaseQuantizer, create_quantizer
 from auto_round.schemes import QuantizationScheme, preset_name_to_scheme
 from auto_round.sign_sgd import SignSGD
 from auto_round.special_model_handler import _handle_moe_model
@@ -98,6 +99,34 @@
 )
 from auto_round.wrapper import WrapperLinear, WrapperMultiblock, unwrapper_block, unwrapper_layer, wrapper_block
 
+SERIALIZATION_KEYS = [
+    "bits",
+    "group_size",
+    "sym",
+    "data_type",
+    "enable_quanted_input",
+    "enable_minmax_tuning",
+    "seqlen",
+    "batch_size",
+    "scale_dtype",
+    "lr",
+    "minmax_lr",
+    "gradient_accumulate_steps",
+    "iters",
+    "amp",
+    "nsamples",
+    "low_gpu_mem_usage",
+    "to_quant_block_names",
+    "enable_norm_bias_tuning",
+    "act_bits",
+    "act_group_size",
+    "act_sym",
+    "act_dynamic",
+    "act_data_type",
+    "super_bits",
+    "super_group_size",
+]
+
 
 class BaseCompressor(object):
     """Base compressor for LLM quantization
@@ -125,6 +154,8 @@ class BaseCompressor(object):
     super_bits: int | None
     super_group_size: int | None
 
+    quantizer: BaseQuantizer | None
+
     def __init__(
         self,
         model: Union[torch.nn.Module, str],
@@ -385,6 +416,8 @@ def __init__(
         import habana_frameworks.torch.core as htcore  # pylint: disable=E0401
         import habana_frameworks.torch.hpu as hthpu  # pylint: disable=E0401
 
+        self.quantizer = create_quantizer(self)
+
     def _set_device(self, device_map):
         if hasattr(self, "device") and self.device is not None:
             return
@@ -409,6 +442,141 @@ def _set_device(self, device_map):
         else:
             raise TypeError(f"device_map should be [str, torch.device, int, dict], but got {type(device_map)}")
 
+    def _set_layerwise_config(self, layer_config: dict) -> bool:
+        """
+        Sets the layer-wise configuration based on the provided `layer_config`.
+        By default, only quantize layers in blocks.
+
+        Args:
+            layer_config (dict): The configuration dictionary for each layer containing various configuration options.
+
+        Returns:
+            bool: Returns True if there are quantized layers outside the blocks (e.g., lm-head),
+            otherwise returns False.
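+
+        Example (illustrative; `ar` stands in for a compressor instance):
+            >>> # regex keys such as ".*\.q_proj" are expanded to every matching supported layer
+            >>> ar._set_layerwise_config({r".*\.q_proj": {"bits": 8}})  # doctest: +SKIP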
+ """ + # Get the names of layers in quantization blocks + supported_types = self.supported_types + layers_in_blocks = get_layer_names_in_block( + self.model, supported_types, self.quant_block_list, self.inner_supported_types + ) + ##process regex in layer_config + all_supported_layer_names = [] + # List of configuration keys + keys = get_quant_keys() + + for n, m in self.model.named_modules(): + # Delete previous configuration to avoid conflicts with prior tuning + for key in keys: + if hasattr(m, key): + delattr(m, key) + + if not isinstance(m, supported_types) and m.__class__.__name__ not in self.inner_supported_types: + continue + all_supported_layer_names.append(n) + + names_in_layer_config = list(layer_config.keys()) + for name in names_in_layer_config: + if name in all_supported_layer_names: + continue + matched_names = [] + for layer_name in all_supported_layer_names: + if re.search(re.compile(name), layer_name) is not None: + matched_names.append(layer_name) + if len(matched_names) > 0: + val = layer_config[name] + layer_config.pop(name) + for match_name in matched_names: + layer_config[match_name] = val + else: + tmp_m = get_module(self.model, name) + if not isinstance(tmp_m, torch.nn.Embedding): # TODO not good code style + raise ValueError(f"key {name} in layer_config is invalid, please have a double check") + + has_qlayer_outside_block = False # Flag to track if there are quantized layers outside blocks (e.g., lm-head) + + # Iterate through all modules in the model + is_gguf = hasattr(self, "formats") and any("gguf" in format_ for format_ in self.formats) + for n, m in self.model.named_modules(): + # Skip unsupported types + if not isinstance(m, supported_types) and m.__class__.__name__ not in self.inner_supported_types: + if n in self.layer_config: + if not isinstance(m, torch.nn.Embedding): + logger.warning(f"{n} is not supported, layer_config {n}: {layer_config[n]} will be ignored.") + self.layer_config.pop(n) + continue + if not is_gguf: + if not check_to_quantized(layer_config[n]): + self.layer_config.pop(n) + continue + else: + continue + + # If the layer is not in the config and is part of a quantization block, use default configuration + if n not in layer_config.keys() and n in layers_in_blocks: + layer_config[n] = {} + for key in keys: + layer_config[n][key] = getattr(self, key) + + # If the layer is partially configured, fill in missing values + elif n in layer_config.keys(): + if "data_type" in layer_config[n] and "bits" not in layer_config[n]: + tmp_bits = infer_bits_by_data_type(layer_config[n]["data_type"]) + if tmp_bits is not None and tmp_bits != self.bits: + logger.warning( + f"'data_type' do not match the specified 'bits' setting for {n}." + f" Resetting 'bits' to {tmp_bits}." + ) + layer_config[n]["bits"] = tmp_bits + if "act_data_type" in layer_config[n] and "act_bits" not in layer_config[n]: + tmp_bits = infer_bits_by_data_type(layer_config[n]["act_data_type"]) + if tmp_bits is not None and tmp_bits != self.act_bits: + logger.warning( + f"'act_data_type' do not match the specified 'act_bits' setting for {n}." + f" Resetting 'act_bits' to {tmp_bits}." 
+ ) + layer_config[n]["act_bits"] = tmp_bits + + for key in keys: + if key not in layer_config[n].keys(): + layer_config[n][key] = getattr(self, key) + layer_config[n]["fixed_by_user"] = True + + # If the layer is not in the config and not part of a quantization block, + # use default configuration and set specific values + else: + layer_config[n] = {} + for key in keys: + layer_config[n][key] = getattr(self, key) + layer_config[n]["bits"] = 16 + layer_config[n]["act_bits"] = 16 + + if n in layers_in_blocks: + layer_config[n]["in_blocks"] = True + else: + layer_config[n]["in_blocks"] = False + + # If the layer is outside a block and requires quantization, mark it as a quantized layer outside the block + if ( + n not in layers_in_blocks + and check_to_quantized(layer_config[n]) + and not isinstance(m, torch.nn.Embedding) + ): + has_qlayer_outside_block = True + + in_features, out_features = get_layer_features(m) + if in_features <= layer_config[n]["group_size"]: + layer_config[n]["group_size"] = -1 + + # Apply the configuration to the corresponding layer in the model + for key in keys: + setattr(m, key, layer_config[n][key]) + need_to_quantize_lm_head = self._check_need_to_quantize_lm_head_embedding() + if need_to_quantize_lm_head: + has_qlayer_outside_block = True + + # Return whether there are quantized layers outside the blocks + return has_qlayer_outside_block + def _parse_layer_config(self, layer_config: dict[str, Union[str, dict, QuantizationScheme]]) -> None: """Parse and set the layer-wise quantization configuration.""" # Some other quantization configs @@ -1102,226 +1270,6 @@ def _get_save_folder_name(self, format_str: str) -> str: return self.orig_output_dir - @torch.inference_mode() - def _quantize_embedding_layer(self): - """Quantizes embedding layers in the model according to the configuration. - - This method iterates through all modules in the model, identifies embedding - layers specified in `self.layer_config`, and applies the appropriate quantization - function based on bit precision, grouping strategy, and dtype. - - Returns: - bool: True if the quantization process completes without critical errors. 
- """ - is_quantized = False - for name, module in self.model.named_modules(): - # Skip non-Embedding modules or layers not in config - if not isinstance(module, torch.nn.Embedding) or name not in self.layer_config: - continue - - config = self.layer_config[name] - - # Skip layers that are not marked for quantization - if not check_to_quantized(config): - continue - is_quantized = True - config["scale_dtype"] = self.scale_dtype - dtype = config["data_type"] - - # Determine quantization function key with symmetry/asymmetry - if dtype not in QUANT_FUNC_WITH_DTYPE: - dtype = f"{dtype}_{'sym' if config['sym'] else 'asym'}" - - # Optionally use optimized rounding (RTN) variant - if not self.disable_opt_rtn and f"rtn_{dtype}" in QUANT_FUNC_WITH_DTYPE: - dtype = f"rtn_{dtype}" - - quant_func = QUANT_FUNC_WITH_DTYPE[dtype] - - # Attempt quantization on GPU, fall back to CPU if OOM - try: - weight, scale, zp = quant_func( - module.weight.to(self.device), - **{k: config[k] for k in ["bits", "group_size", "super_bits", "super_group_size", "scale_dtype"]}, - ) - except RuntimeError as e: - cuda_error_msg = traceback.format_exc() - try: - logger.error(cuda_error_msg) - logger.warning("falling back to CPU") - weight, scale, zp = quant_func( - module.weight.to("cpu"), - **{ - k: config[k] - for k in ["bits", "group_size", "super_bits", "super_group_size", "scale_dtype"] - }, - ) - except Exception as e: - raise - - # Overwrite the module's weights with the quantized version - module.weight.data.copy_(weight.cpu()) - - # Attach scale and zero point (zp) to the module - for param_name, value in zip(["scale", "zp"], [scale, zp]): - if isinstance(value, dict): - for k, v in value.items(): - setattr(module, k if k == "scale" else f"w_{k}", v.cpu()) - elif isinstance(value, torch.Tensor): - setattr(module, param_name, value.cpu()) - else: - setattr(module, param_name, value) - - # Update config - self.layer_config.setdefault(name, {}).update(config) - - # Release memory - clear_memory() - - return is_quantized - - def _quant_rtn_with_imatrix(self, all_to_quantized_module_names: list[str]) -> None: - """Performs RTN quantization using input activation statistics (imatrix). - - This method accumulates per-channel second-moment activation statistics (imatrix) - via forward hooks and uses them to perform RTN quantization. If CUDA memory runs out, - it falls back to CPU-based blockwise quantization. - - Args: - all_to_quantized_module_names (list[str]): - A list of module names (e.g., 'model.layers.0.self_attn.q_proj') to be quantized. 
-
-        Returns:
-            None
-        """
-        logger.info("start to compute imatrix for GGUF quantization")
-
-        # Load dataset
-        from auto_round.calib_dataset import get_dataloader
-
-        if _is_fp8_model(self.model):
-            convert_fp8_model_to_16b_model(self.model, self.amp_dtype)
-
-        if isinstance(self.dataset, str):
-            if self.tokenizer is None:
-                raise ValueError("A tokenizer must be set for the model when using a dataset string.")
-            dataset_name = self.dataset.replace(" ", "")
-            self.dataloader = get_dataloader(
-                self.tokenizer, self.seqlen, dataset_name, self.seed, self.batch_size, self.nsamples
-            )
-        else:
-            self.dataloader = self.dataset
-
-        model = self.model
-
-        # Dispatch multi-GPU model if necessary
-        if hasattr(model, "hf_device_map") and len(model.hf_device_map) > 1:
-            dispatch_model(model, model.hf_device_map)
-
-        def register_act_hook(model):
-            """Registers hooks to accumulate activation squared norms into `imatrix`."""
-
-            def get_imatrix_hook(module, input, output):
-                input = input[0] if isinstance(input, (tuple, list)) else input
-                flattened = input.reshape(-1, input.shape[-1]).to(torch.float32)
-                squared = torch.sum(torch.pow(flattened, 2), dim=0).to(torch.float32)
-
-                if not hasattr(module, "imatrix"):
-                    module.imatrix = squared
-                    module.imatrix_cnt = input.shape[0]
-                else:
-                    module.imatrix += squared.to(module.imatrix.device)
-                    module.imatrix_cnt += input.shape[0]
-
-            hook_handles = []
-            for name, module in model.named_modules():
-                if isinstance(module, self.supported_types) and check_to_quantized(module):
-                    hook = module.register_forward_hook(get_imatrix_hook)
-                    hook_handles.append(hook)
-            return hook_handles
-
-        hooks = register_act_hook(model)
-
-        try:
-            # Move model to target device
-            if hasattr(self.model, "hf_device_map") and len(self.model.hf_device_map) > 1:
-                dispatch_model(self.model, self.model.hf_device_map)
-            else:
-                model = model.to(self.device)
-            cnt = 0
-
-            # Run forward pass to accumulate imatrix
-            for data in self.dataloader:
-                cnt += data["input_ids"].shape[0]
-                data = to_device(data, self.device)
-                model(**data)
-                if cnt >= self.nsamples:
-                    break
-
-            # Remove hooks after data collection
-            for hook in hooks:
-                hook.remove()
-
-            # Normalize imatrix by count
-            for _, module in model.named_modules():
-                if hasattr(module, "imatrix"):
-                    module.imatrix /= module.imatrix_cnt
-            if hasattr(model, "hf_device_map") and len(model.hf_device_map) > 1:
-                import accelerate
-
-                accelerate.hooks.remove_hook_from_submodules(model)
-            # Perform quantization using RTN
-            pbar = tqdm(all_to_quantized_module_names)
-            block_names_cnt = len(flatten_list(get_block_names(self.model, True)))
-            clear_mem_freq = len(all_to_quantized_module_names) // block_names_cnt
-            if clear_mem_freq == 0:
-                clear_mem_freq = 1
-            cnt = 1
-            for name in pbar:
-                pbar.set_description(f"Quantizing {name}")
-                self._quantize_layer_via_rtn(name)
-                if cnt % clear_mem_freq == 0:
-                    clear_memory()
-                    cnt = 1
-                cnt += 1
-        except RuntimeError as e:
-            try:
-                if hasattr(model, "hf_device_map") and len(model.hf_device_map) > 1:
-                    import accelerate
-
-                    accelerate.hooks.remove_hook_from_submodules(model)
-                # Fallback: out-of-memory → try CPU blockwise quantization
-                logger.warning("Out of VRAM, falling back to blockwise quantization. Accuracy may degrade.")
-                model = model.to("cpu")
-                clear_memory()
-                self._quantize_via_rtn_blockwise(all_to_quantized_module_names)
-            except RuntimeError as e:
-                cuda_error_msg = traceback.format_exc()
-                try:
-                    logger.error(cuda_error_msg)
-                    # Final fallback: warn and use CPU-only quantization
-                    logger.warning(
-                        "Fallback to CPU. "
-                        "Consider enabling `low_gpu_mem_usage` or using more GPUs via `--device 0,1,2,3`."
-                    )
-                    model = model.to("cpu")
-                    clear_memory()
-                    if hasattr(model, "hf_device_map") and len(model.hf_device_map) > 1:
-                        import accelerate
-
-                        accelerate.hooks.remove_hook_from_submodules(model)
-
-                    orig_device = self.device
-                    self.device = "cpu"
-                    self._quantize_via_rtn_blockwise(all_to_quantized_module_names)
-                    self.device = orig_device
-                except Exception as e:
-                    raise
-        finally:
-            # Always remove hooks
-            for hook in hooks:
-                hook.remove()
-
     def _check_need_to_quantize_lm_head_embedding(self) -> bool:
         """Checks if LM head and embedding layers need quantization for GGUF format.
@@ -1408,301 +1356,6 @@ def _apply_config_to_layer(
         setattr(get_module(self.model, layer_name), "act_bits", act_bits)
         setattr(get_module(self.model, layer_name), "scale_dtype", scale_dtype)
 
-    def _quantize_layer_via_rtn(self, name: str) -> None:
-        """Quantizes a layer using RTN (Round-To-Nearest) if available.
-
-        This function attempts to quantize a layer by switching its data type to a
-        `rtn_*` version if supported, then wraps and unwraps the module to apply
-        quantization. If GPU memory is insufficient, it falls back to CPU.
-
-        If packing is enabled (`is_packing_immediate`), the function will also export
-        the quantized layer to the appropriate backend format.
-
-        Args:
-            name (str): Name of the layer to quantize.
-
-        Raises:
-            RuntimeError: If quantization fails for reasons unrelated to memory.
- """ - m = get_module(self.model, name) - - # if m.__class__.__name__ == "FP8Linear": - if _is_fp8_linear(m): - m = convert_fp8_layer_to_linear(m, self.amp_dtype) - set_module(self.model, name, m) - - # Step 1: Use optimized RTN data type if available - if not self.disable_opt_rtn and not m.data_type.startswith("rtn_"): - from auto_round.data_type import QUANT_FUNC_WITH_DTYPE - - rtn_dtype = "rtn_" + m.data_type - if rtn_dtype in QUANT_FUNC_WITH_DTYPE: - m.data_type = rtn_dtype - self.layer_config[name]["data_type"] = m.data_type - - # Step 2: Try quantization on GPU first, fall back to CPU if OOM - # if only export gguf, using gguf-packing instead of rtn - if self.is_packing_immediate and self.iters == 0 and "gguf" in self.formats[0] and not self.disable_opt_rtn: - m.scale = None - m.zp = None - else: - try: - m.to(self.device) - m = WrapperLinear( - m, - enable_minmax_tuning=False, - enable_norm_bias_tuning=False, - enable_round_tuning=False, - ) - m = m.unwrapper({}) - m.to("cpu") - except RuntimeError as e: - cuda_error_msg = traceback.format_exc() - m = m.orig_layer if hasattr(m, "orig_layer") else m - try: - logger.error(cuda_error_msg) - logger.warning("falling back to CPU.") - m.to("cpu") - m = WrapperLinear( - m, - enable_minmax_tuning=False, - enable_norm_bias_tuning=False, - enable_round_tuning=False, - ) - m = m.unwrapper({}) - except Exception as e: - raise - - # Step 3: Optional immediate packing/export - if self.is_packing_immediate: - from auto_round.export import PACKING_LAYER_WITH_FORMAT - - if check_to_quantized(m): - target_backend = self.formats[0].split(":")[0] if ":" in self.formats[0] else self.formats[0] - has_gguf = any("gguf" in fmt for fmt in self.formats) - - if has_gguf: - from auto_round.export.export_to_gguf.export import pack_gguf_layer - - output_dir = self._get_save_folder_name(self.formats[0]) - model_type = ModelType.MMPROJ if self.mllm else ModelType.TEXT - pack_gguf_layer( - name, - self.model, - self.formats[0], - output_dir, - self.layer_config, - self.tokenizer, - processor=self.processor if hasattr(self, "processor") else None, - image_processor=self.image_processor if hasattr(self, "image_processor") else None, - model_type=model_type, - ) - else: - PACKING_LAYER_WITH_FORMAT[target_backend](name, self.model, self.formats[0], device=self.device) - - # if self.low_gpu_mem_usage: - # clear_memory() - else: - set_module(self.model, name, m) - - @torch.inference_mode() - def _quantize_rtn(self) -> tuple[torch.nn.Module, dict[str, Any]]: - """Quantize all modules in the model using RTN (Round-To-Nearest) strategy. - - If the target format includes GGUF with `k`, and optimized RTN is enabled, - blockwise quantization with input caching and imatrix is used. - - Returns: - tuple[nn.Module, Dict[str, Any]]: The quantized model and the layer configuration. 
- """ - if self.amp and self.model.dtype != self.amp_dtype: - self.model.to(self.amp_dtype) - - all_to_quantized_module_names: list[str] = [n for n, m in self.model.named_modules() if check_to_quantized(m)] - - if is_nv_fp(self.data_type): - from auto_round.data_type.nvfp import calculate_gparam - from auto_round.data_type.utils import update_fused_layer_global_scales - - pbar = tqdm(all_to_quantized_module_names) - for name in pbar: - pbar.set_description(f"Calculate weight global scale: {name}") - m = get_module(self.model, name) - weight_global_scale = calculate_gparam(m.weight, self.group_size) - setattr(m, "weight_global_scale", weight_global_scale) - - modules = list(self.model.modules()) - for module in tqdm(modules, desc="Update weight global scale for fuse module"): - update_fused_layer_global_scales(module) - - has_gguf_k = any("gguf" in fmt and "k" in fmt for fmt in getattr(self, "formats", [])) - - self._quantize_embedding_layer() - - self.model.to("cpu") - if has_gguf_k and not self.disable_opt_rtn: - self._quant_rtn_with_imatrix(all_to_quantized_module_names) - elif self.act_bits <= 8 and check_need_act_calibration( - self.act_dynamic, self.act_data_type, self.act_bits - ): # TODO, mixed datatype has bug - hook_handles = self._register_act_max_hook(self.model) - try: - self._quantize_via_rtn_blockwise(all_to_quantized_module_names) - except RuntimeError as e: - logger.warning("Fallback to CPU. Consider using more GPUs via `--device 0,1,2,3`.") - self.model = self.model.to("cpu") - clear_memory() - if hasattr(self.model, "hf_device_map") and len(self.model.hf_device_map) > 1: - import accelerate - - accelerate.hooks.remove_hook_from_submodules(self.model) - orig_device = self.device - self.device = "cpu" - self._quantize_via_rtn_blockwise(all_to_quantized_module_names) - self.device = orig_device - for handle in hook_handles: - handle.remove() - else: - block_names_cnt = len(flatten_list(get_block_names(self.model, True))) - clear_mem_freq = len(all_to_quantized_module_names) // block_names_cnt - if clear_mem_freq == 0: - clear_mem_freq = 1 - pbar = tqdm(all_to_quantized_module_names) - cnt = 1 - for name in pbar: - pbar.set_description(f"Quantizing {name}") - self._quantize_layer_via_rtn(name) - if cnt % clear_mem_freq == 0: - clear_memory() - cnt = 1 - cnt += 1 - # Convert remaining fp8 - if _is_fp8_model(self.model): - convert_fp8_model_to_16b_model(self.model, self.amp_dtype) - self.quantized = True - return self.model, self.layer_config - - def _quantize_via_rtn_blockwise(self, all_to_quantized_module_names: list[str]) -> None: - """Quantize model layers block by block using cached inputs and imatrix. - - Args: - all_to_quantized_module_names (list[str]): Names of layers to be quantized. - """ - all_to_quantized_module_names = list(set(all_to_quantized_module_names)) - - all_blocks = self.quant_block_list if self.quant_block_list else get_block_names(self.model) - if not all_blocks: - raise ValueError("Could not find any blocks. 
Check the model or quant_block_list.") - - all_first_block_names = [block[0] for block in all_blocks] - if self.act_bits < 16 and not self.act_dynamic: - layer_names = self._get_quantized_layer_names_outside_blocks() - if len(layer_names) > 0: - logger.warning( - "quantize layers outside blocks for static activation quantizaiton" - " will significantly increase calibration time" - ) - all_inputs = self.try_cache_inter_data_gpucpu(all_first_block_names, self.nsamples, layer_names) - else: - all_inputs = self.cache_inter_data(all_first_block_names, self.nsamples) - - # Clear hooks for multi-GPU setups - if hasattr(self.model, "hf_device_map") and len(self.model.hf_device_map) > 1: - accelerate.hooks.remove_hook_from_submodules(self.model) - - pbar = tqdm(range(sum(len(block) for block in all_blocks))) - - for block_names in all_blocks: - first_block = block_names[0] - inputs = all_inputs.pop(first_block) - input_keys = [k for k in inputs if k.startswith("hidden_state")] - if len(input_keys) != 1: - raise RuntimeError( - "hidden_states arg mismatch. Please file an issue at https://github.com/intel/auto-round/issues" - ) - inputs["input_ids"] = inputs.pop(input_keys[0]) - - clear_memory(self.inputs) - - total_samples = len(inputs["input_ids"]) - if total_samples < self.batch_size: - self.batch_size = total_samples - logger.warning(f"Forcing batch size to {total_samples}") - - input_ids = to_device(inputs.pop("input_ids"), self.cache_device) - input_others = to_device(inputs, self.cache_device) - - tmp_dtype = self.amp_dtype if self.amp else torch.float32 - input_ids = [id_.to(tmp_dtype) for id_ in input_ids] - - for key, val in input_others.items(): - if isinstance(val, torch.Tensor) and val.dtype in (torch.float16, torch.bfloat16): - input_others[key] = val.to(tmp_dtype) - elif isinstance(val, list): - input_others[key] = [to_dtype(v, tmp_dtype) for v in val] - - for block_name in block_names: - pbar.set_description(f"Quantizing {block_name}") - block = get_module(self.model, block_name) - block = block.to(self.device) - if _is_fp8_model(self.model): - convert_fp8_model_to_16b_model(block, dtype=self.amp_dtype) - - if self.device_map == "auto": - self._set_auto_device_map_in_block(block, input_ids) - - # Dispatch model if needed - if self.device_map is not None: - from accelerate.hooks import AlignDevicesHook, add_hook_to_module - - for _, m in block.named_modules(): - if len(list(m.children())) != 0 or not hasattr(m, "tuning_device"): - continue - hook = AlignDevicesHook(m.tuning_device, io_same_device=True) - add_hook_to_module(m, hook, True) - else: - block = block.to(self.device) - input_ids = self._get_block_outputs( - block, - input_ids, - input_others, - self.batch_size * self.infer_bs_coeff, - self.device, - self.cache_device, - ) - if self.device_map is not None: - accelerate.hooks.remove_hook_from_submodules(block) - - if ( - is_nv_fp(self.act_data_type) and any("nv_fp" in format_ for format_ in self.formats) - ) or is_static_wfp8afp8(self): - # enable moe experts act_max automatic generation for Linear - set_amax_for_all_moe_layers(block, attr_name="act_max") - # Normalize imatrix and quantize layers - for _, m in block.named_modules(): - if hasattr(m, "imatrix"): - m.imatrix /= m.imatrix_cnt - if hasattr(m, "tmp_name") and m.tmp_name in all_to_quantized_module_names: - self._quantize_layer_via_rtn(m.tmp_name) - all_to_quantized_module_names.remove(m.tmp_name) - - mv_module_from_gpu(block, self.low_cpu_mem_usage) - pbar.update(1) - - pbar.close() - cnt = 1 - block_names_cnt = 
len(flatten_list(get_block_names(self.model, True))) - clear_mem_freq = len(all_to_quantized_module_names) // block_names_cnt - if clear_mem_freq == 0: - clear_mem_freq = 1 - # Process remaining layers not in blocks - for name in all_to_quantized_module_names: - self._quantize_layer_via_rtn(name) - if cnt % clear_mem_freq == 0: - clear_memory() - cnt = 1 - cnt += 1 - def quantize(self) -> tuple[torch.nn.Module, dict[str, Any]]: """Quantize the model and return the quantized model along with layer configurations.The entry of AutoRound. Returns: @@ -1748,1340 +1401,17 @@ def quantize(self) -> tuple[torch.nn.Module, dict[str, Any]]: and self.inplace ): self.is_packing_immediate = True - if self.iters == 0: - return self._quantize_rtn() - - if bool(self.quant_block_list): - all_blocks = self.quant_block_list - else: - all_blocks = get_block_names(self.model) - if len(all_blocks) == 0: - logger.warning("could not find blocks, exit with original model") - return self.model, self.layer_config + return self.quantizer.quantize() - if self.amp and self.model.dtype != self.amp_dtype: - self.model = self.model.to(self.amp_dtype) + @property + def scaler(self): + return self._get_scaler() - layer_names = self._get_quantized_layer_names_outside_blocks() - self.start_time = time.time() - all_first_block_names = [block[0] for block in all_blocks] - if len(layer_names) > 0: - logger.info( - "Starting to cache block inputs. This may be slow due to external block layers: %s", layer_names - ) - else: - logger.info("start to cache block inputs") - all_inputs = self.try_cache_inter_data_gpucpu(all_first_block_names, self.nsamples, layer_names=layer_names) - is_quantized_embedding = self._quantize_embedding_layer() - all_q_inputs = None - if is_quantized_embedding: - all_inputs = copy.deepcopy(self.inputs) - clear_memory(self.inputs) - all_q_inputs = self.try_cache_inter_data_gpucpu( - all_first_block_names, self.nsamples, layer_names=layer_names - ) - self.model = mv_module_from_gpu(self.model, self.low_cpu_mem_usage) - clear_memory() - if hasattr(self.model, "hf_device_map") and len(self.model.hf_device_map) > 1: - accelerate.hooks.remove_hook_from_submodules(self.model) # self.model.hf_device_map has not been changed - self.model = mv_module_from_gpu(self.model, self.low_cpu_mem_usage) - logger.info("caching done") - if len(all_blocks) > 1: - pbar = tqdm(range(0, sum([len(i) for i in all_blocks]), self.nblocks)) - else: - pbar = None # move the alg warning outside pbar - - for block_names in all_blocks: - inputs = all_inputs[block_names[0]] - all_inputs.pop(block_names[0]) - q_inputs = None - if all_q_inputs is not None: - q_inputs = all_q_inputs[block_names[0]] - all_q_inputs.pop(block_names[0]) - keys = inputs.keys() - input_id_str = [key for key in keys if key.startswith("hidden_state")] - if len(input_id_str) != 1: - raise RuntimeError( - "hidden_states arg mismatch error," - "please raise an issue in https://github.com/intel/auto-round/issues" - ) - inputs["input_ids"] = inputs.pop(input_id_str[0], None) - if q_inputs is not None: - q_inputs["input_ids"] = q_inputs.pop(input_id_str[0], None) - - clear_memory(self.inputs) - - if "input_ids" in inputs.keys(): - total_samples = len(inputs["input_ids"]) - if total_samples < self.batch_size: - self.batch_size = total_samples - logger.warning(f"force the train batch size to {total_samples}") - - self._quantize_blocks( - self.model, - inputs, - block_names, - q_input=q_inputs["input_ids"] if q_inputs is not None else None, - nblocks=self.nblocks, - 
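+    # NOTE: exposed as a property so the quantizer built by `create_quantizer`
+    # can reach the compressor's loss scaler (the base `_get_scaler` returns None).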
-        layer_names = self._get_quantized_layer_names_outside_blocks()
-        self.start_time = time.time()
-        all_first_block_names = [block[0] for block in all_blocks]
-        if len(layer_names) > 0:
-            logger.info(
-                "Starting to cache block inputs. This may be slow due to external block layers: %s", layer_names
-            )
-        else:
-            logger.info("start to cache block inputs")
-        all_inputs = self.try_cache_inter_data_gpucpu(all_first_block_names, self.nsamples, layer_names=layer_names)
-        is_quantized_embedding = self._quantize_embedding_layer()
-        all_q_inputs = None
-        if is_quantized_embedding:
-            all_inputs = copy.deepcopy(self.inputs)
-            clear_memory(self.inputs)
-            all_q_inputs = self.try_cache_inter_data_gpucpu(
-                all_first_block_names, self.nsamples, layer_names=layer_names
-            )
-        self.model = mv_module_from_gpu(self.model, self.low_cpu_mem_usage)
-        clear_memory()
-        if hasattr(self.model, "hf_device_map") and len(self.model.hf_device_map) > 1:
-            accelerate.hooks.remove_hook_from_submodules(self.model)  # self.model.hf_device_map has not been changed
-        self.model = mv_module_from_gpu(self.model, self.low_cpu_mem_usage)
-        logger.info("caching done")
-        if len(all_blocks) > 1:
-            pbar = tqdm(range(0, sum([len(i) for i in all_blocks]), self.nblocks))
-        else:
-            pbar = None  # move the alg warning outside pbar
-
-        for block_names in all_blocks:
-            inputs = all_inputs[block_names[0]]
-            all_inputs.pop(block_names[0])
-            q_inputs = None
-            if all_q_inputs is not None:
-                q_inputs = all_q_inputs[block_names[0]]
-                all_q_inputs.pop(block_names[0])
-            keys = inputs.keys()
-            input_id_str = [key for key in keys if key.startswith("hidden_state")]
-            if len(input_id_str) != 1:
-                raise RuntimeError(
-                    "hidden_states arg mismatch error, "
-                    "please raise an issue in https://github.com/intel/auto-round/issues"
-                )
-            inputs["input_ids"] = inputs.pop(input_id_str[0], None)
-            if q_inputs is not None:
-                q_inputs["input_ids"] = q_inputs.pop(input_id_str[0], None)
-
-            clear_memory(self.inputs)
-
-            if "input_ids" in inputs.keys():
-                total_samples = len(inputs["input_ids"])
-                if total_samples < self.batch_size:
-                    self.batch_size = total_samples
-                    logger.warning(f"force the train batch size to {total_samples}")
-
-            self._quantize_blocks(
-                self.model,
-                inputs,
-                block_names,
-                q_input=q_inputs["input_ids"] if q_inputs is not None else None,
-                nblocks=self.nblocks,
-                device=self.device,
-                pbar=pbar,
-            )
-            if self.is_packing_immediate and len(self.formats) != 1:
-                raise ValueError(
-                    f"Expected exactly one packing format when 'is_packing_immediate' is True, "
-                    f"but got {len(self.formats)} formats."
-                )
-
-        self._quantize_layers(layer_names, all_inputs)  ## TODO pack layer immediately
-
-        if _is_fp8_model(self.model):
-            for n, m in self.model.named_modules():
-                if _is_fp8_linear(m):
-                    new_layer = convert_fp8_layer_to_linear(m, self.amp_dtype).to("cpu")
-                    set_module(self.model, n, new_layer)
-
-        end_time = time.time()
-        cost_time = end_time - self.start_time
-        logger.info(f"quantization tuning time {cost_time}")
-
-        ## dump a summary
-        quantized_layers = []
-        unquantized_layers = []
-        for n, m in self.model.named_modules():
-            if isinstance(m, tuple(self.supported_types)):
-                if check_to_quantized(m):
-                    quantized_layers.append(n)
-                else:
-                    unquantized_layers.append(n)
-            elif hasattr(m, "scales") or hasattr(m, "scale"):  ## packing_immediately
-                quantized_layers.append(n)
-        summary_info = (
-            f"Summary: quantized {len(quantized_layers)}/{len(quantized_layers) + len(unquantized_layers)} in the model"
-        )
-        if len(unquantized_layers) > 0:
-            summary_info += f", {unquantized_layers} have not been quantized"
-        logger.info(summary_info)
-
-        self.quantized = True
-        return self.model, self.layer_config
-
-    def _quantize_layers(self, layer_names: list, layer_inputs: dict) -> None:
-        """Quantizes specified layers based on inputs and configuration.
-
-        Args:
-            layer_names (list): list of layer names to quantize.
-            layer_inputs (dict): Dictionary mapping layer names to input data.
-
-        Returns:
-            None
-        """
-        ## TODO currently we take all the layers outside blocks as post block layers which is not optimal
-        ## if there is no input for layer, we use rtn
-
-        for layer_name in copy.deepcopy(layer_names):
-            if layer_name not in layer_inputs:
-                logger.info(f"using rtn to quantize {layer_name}")
-                from auto_round.data_type import QUANT_FUNC_WITH_DTYPE
-
-                layer = get_module(self.model, layer_name)
-                if _is_fp8_model(self.model):
-                    new_layer = convert_fp8_layer_to_linear(layer, self.amp_dtype).to(self.device)
-                    set_module(self.model, layer_name, new_layer)
-                    layer = new_layer
-
-                if not self.disable_opt_rtn and "rtn_" + layer.data_type in QUANT_FUNC_WITH_DTYPE:
-                    layer.data_type = "rtn_" + layer.data_type
-                    logger.info("using optimized rtn method for quantizing %s", layer_name)
-                    self.layer_config[layer_name]["data_type"] = layer.data_type
-                wrapper_layer = WrapperLinear(
-                    layer,
-                    enable_round_tuning=False,
-                    enable_minmax_tuning=False,
-                    enable_norm_bias_tuning=False,
-                    device=self.device,
-                )
-                new_layer = wrapper_layer.unwrapper({})
-                set_module(self.model, layer_name, new_layer)
-                layer.cpu()
-                layer_names.remove(layer_name)
-        if len(layer_names) == 0:
-            return
-        q_layer_inputs = None
-        enable_quanted_input = self.enable_quanted_input
-        has_gguf = False
-        if hasattr(self, "formats"):
-            has_gguf = any("gguf" in format_ for format_ in self.formats)
-        if has_gguf and self.is_packing_immediate:
-            enable_quanted_input = False
-
-        if hasattr(self.model, "hf_device_map") and len(self.model.hf_device_map) > 1 and enable_quanted_input:
-            dispatch_model(self.model, self.model.hf_device_map)
-
-        if enable_quanted_input:
-            logger.info("starting to cache layer inputs for %s, this may be quite slow ", layer_names)
-            q_layer_inputs = self.try_cache_inter_data_gpucpu([], self.nsamples, layer_names=layer_names)
-            if hasattr(self.model, "hf_device_map") and len(self.model.hf_device_map) > 1:
-                accelerate.hooks.remove_hook_from_submodules(
-                    self.model
-                )  ## self.model.hf_device_map has not been changed
-
-        self.model = mv_module_from_gpu(self.model, self.low_cpu_mem_usage)
-        clear_memory()
-        if self.enable_torch_compile:
-            quant_layer = compile_func(self._quantize_layer, self.device)
-        else:
-            quant_layer = self._quantize_layer
-        for layer_name in layer_names:
-            layer_input = layer_inputs[layer_name]
-            layer_input = to_device(layer_input, self.cache_device)
-            q_layer_input = q_layer_inputs.get(layer_name, None) if q_layer_inputs is not None else None
-            q_layer_input = to_device(q_layer_input, self.cache_device)
-            quant_layer(layer_name, layer_input, q_layer_input, device=self.device)
-            del layer_input
-            clear_memory(q_layer_input)
-
-    def _set_layerwise_config(self, layer_config: dict) -> bool:
-        """
-        Sets the layer-wise configuration based on the provided `layer_config`.
-        By default, only quantize layers in blocks.
-
-        Args:
-            layer_config (dict): The configuration dictionary for each layer containing various configuration options.
-
-        Returns:
-            bool: Returns True if there are quantized layers outside the blocks (e.g., lm-head),
-            otherwise returns False.
-        """
-        # Get the names of layers in quantization blocks
-        supported_types = self.supported_types
-        layers_in_blocks = get_layer_names_in_block(
-            self.model, supported_types, self.quant_block_list, self.inner_supported_types
-        )
-        ##process regex in layer_config
-        all_supported_layer_names = []
-        # List of configuration keys
-        keys = get_quant_keys()
-
-        for n, m in self.model.named_modules():
-            # Delete previous configuration to avoid conflicts with prior tuning
-            for key in keys:
-                if hasattr(m, key):
-                    delattr(m, key)
-
-            if not isinstance(m, supported_types) and m.__class__.__name__ not in self.inner_supported_types:
-                continue
-            all_supported_layer_names.append(n)
-
-        names_in_layer_config = list(layer_config.keys())
-        for name in names_in_layer_config:
-            if name in all_supported_layer_names:
-                continue
-            matched_names = []
-            for layer_name in all_supported_layer_names:
-                if re.search(re.compile(name), layer_name) is not None:
-                    matched_names.append(layer_name)
-            if len(matched_names) > 0:
-                val = layer_config[name]
-                layer_config.pop(name)
-                for match_name in matched_names:
-                    layer_config[match_name] = val
-            else:
-                tmp_m = get_module(self.model, name)
-                if not isinstance(tmp_m, torch.nn.Embedding):  # TODO not good code style
-                    raise ValueError(f"key {name} in layer_config is invalid, please have a double check")
-
-        has_qlayer_outside_block = False  # Flag to track if there are quantized layers outside blocks (e.g., lm-head)
-
-        # Iterate through all modules in the model
-        is_gguf = hasattr(self, "formats") and any("gguf" in format_ for format_ in self.formats)
-        for n, m in self.model.named_modules():
-            # Skip unsupported types
-            if not isinstance(m, supported_types) and m.__class__.__name__ not in self.inner_supported_types:
-                if n in self.layer_config:
-                    if not isinstance(m, torch.nn.Embedding):
-                        logger.warning(f"{n} is not supported, layer_config {n}: {layer_config[n]} will be ignored.")
-                        self.layer_config.pop(n)
-                        continue
-                    if not is_gguf:
-                        if not check_to_quantized(layer_config[n]):
-                            self.layer_config.pop(n)
-                            continue
-                else:
-                    continue
-
-            # If the layer is not in the config and is part of a quantization block, use default configuration
-            if n not in layer_config.keys() and n in layers_in_blocks:
-                layer_config[n] = {}
-                for key in keys:
-                    layer_config[n][key] = getattr(self, key)
-
-            # If the layer is partially configured, fill in missing values
-            elif n in layer_config.keys():
-                if "data_type" in layer_config[n] and "bits" not in layer_config[n]:
-                    tmp_bits = infer_bits_by_data_type(layer_config[n]["data_type"])
-                    if tmp_bits is not None and tmp_bits != self.bits:
-                        logger.warning(
-                            f"'data_type' do not match the specified 'bits' setting for {n}."
-                            f" Resetting 'bits' to {tmp_bits}."
-                        )
-                        layer_config[n]["bits"] = tmp_bits
-                if "act_data_type" in layer_config[n] and "act_bits" not in layer_config[n]:
-                    tmp_bits = infer_bits_by_data_type(layer_config[n]["act_data_type"])
-                    if tmp_bits is not None and tmp_bits != self.act_bits:
-                        logger.warning(
-                            f"'act_data_type' do not match the specified 'act_bits' setting for {n}."
-                            f" Resetting 'act_bits' to {tmp_bits}."
-                        )
-                        layer_config[n]["act_bits"] = tmp_bits
-
-                for key in keys:
-                    if key not in layer_config[n].keys():
-                        layer_config[n][key] = getattr(self, key)
-                layer_config[n]["fixed_by_user"] = True
-
-            # If the layer is not in the config and not part of a quantization block,
-            # use default configuration and set specific values
-            else:
-                layer_config[n] = {}
-                for key in keys:
-                    layer_config[n][key] = getattr(self, key)
-                layer_config[n]["bits"] = 16
-                layer_config[n]["act_bits"] = 16
-
-            if n in layers_in_blocks:
-                layer_config[n]["in_blocks"] = True
-            else:
-                layer_config[n]["in_blocks"] = False
-
-            # If the layer is outside a block and requires quantization, mark it as a quantized layer outside the block
-            if (
-                n not in layers_in_blocks
-                and check_to_quantized(layer_config[n])
-                and not isinstance(m, torch.nn.Embedding)
-            ):
-                has_qlayer_outside_block = True
-
-            in_features, out_features = get_layer_features(m)
-            if in_features <= layer_config[n]["group_size"]:
-                layer_config[n]["group_size"] = -1
-
-            # Apply the configuration to the corresponding layer in the model
-            for key in keys:
-                setattr(m, key, layer_config[n][key])
-        need_to_quantize_lm_head = self._check_need_to_quantize_lm_head_embedding()
-        if need_to_quantize_lm_head:
-            has_qlayer_outside_block = True
-
-        # Return whether there are quantized layers outside the blocks
-        return has_qlayer_outside_block
-
-    @torch.no_grad()
-    def _get_block_outputs(
-        self,
-        block: torch.nn.Module,
-        input_ids: torch.Tensor,
-        input_others: torch.Tensor,
-        bs: int,
-        device: Union[str, torch.device],
-        cache_device: Union[str, torch.device],
-        save_output: bool = True,
-    ):
-        """Compute the output of a given block of the model for a given input.
-
-        Args:
-            block: The block of the model.
-            input_ids: The input tensor containing tokenized input ids.
-            input_others: A dictionary containing additional input data.
-            bs: The batch size for computing the output.
-            device: The device for computation.
-            cache_device: The device for storing the output.
-            batch_dim: The batch dimension of the output tensor.
-
-        Returns:
-            The output tensor of the block.
- """ - - output = [] - nsamples = len(input_ids) - for i in range(0, nsamples, bs): - end_index = min(nsamples, i + bs) - indices = torch.arange(i, end_index).to(torch.long) - tmp_input_ids, tmp_input_others = self._sampling_inputs( - input_ids, input_others, indices, self.seqlen, self.batch_dim, share_cache_keys=self.shared_cache_keys - ) - tmp_output = block_forward(block, tmp_input_ids, tmp_input_others, self.amp, self.amp_dtype, device).to( - cache_device - ) - if save_output: - if self.batch_size == 1: - output.append(tmp_output) - else: - output.extend(list(torch.split(tmp_output, 1, dim=self.batch_dim))) - if self.low_gpu_mem_usage: - clear_memory() - - return output - - @torch.no_grad() - def calib(self, nsamples, bs): - """Perform calibration for quantization. - - This method calibrates the model for quantization by processing a specified - number of samples from the calibration dataset. It ensures that the data is - properly formatted and feeds it to the model. If the number of samples processed - is less than the specified number, it logs a warning. If no samples are processed, - it logs an error and exits. - Args: - nsamples (int): The number of samples to use for calibration. - bs (int): The number of samples to use for calibration - """ - from auto_round.calib_dataset import get_dataloader - - if isinstance(self.dataset, str): - dataset = self.dataset.replace(" ", "") ##remove all whitespaces - - # slow here - self.dataloader = get_dataloader( - self.tokenizer, - self.seqlen, - dataset, - self.seed, - bs, - self.nsamples, - ) - else: - self.dataloader = self.dataset - total_cnt = 0 - - # load embed weight if use low_cpu_mem_usage - if self.low_cpu_mem_usage: - embed_layers = get_layers_before_block(self.model) - for n, m in embed_layers: - m = m.to(self.device) - - for data in self.dataloader: - if data is None: - continue - if isinstance(data, torch.Tensor): - input_ids = data.to(self.model.device) - data_new = input_ids - elif isinstance(data, str): - if self.tokenizer is None: - logger.error("please provide tokenizer for string input") - exit(-1) - data = self.tokenizer(data, truncation=True, max_length=self.seqlen, return_tensors="pt").data - data_new = {} - for key in data.keys(): - data_new[key] = data[key].to(self.model.device) - input_ids = data_new["input_ids"] - elif isinstance(data, tuple) or isinstance(data, list): - data_new = to_device(data) - input_ids = data_new[0] - else: - data_new = {} - for key in data.keys(): - data_new[key] = to_device(data[key], self.model.device) - if key == "images": - data_new[key] = to_dtype(data_new[key], self.model.dtype) - input_ids = data_new["input_ids"] - if input_ids.shape[-1] < self.seqlen: - continue - try: - if isinstance(data_new, torch.Tensor): - self.model(data_new, use_cache=False) - elif isinstance(data_new, tuple) or isinstance(data_new, list): - self.model(*data_new, use_cache=False) - else: - self.model(**data_new, use_cache=False) - except NotImplementedError: - pass - except RuntimeError as error: - error_msg = str(error) - if "The expanded size of the tensor" in str(error_msg) and "must match the existing size" in error_msg: - check_seqlen_compatible(self.seqlen, self.tokenizer, self.model) - logger.warning( - "When quantization encounters tensor shape mismatch error, " - "you can try to avoid it with batch_size=1" - ) - raise error - except Exception as error: - raise error - total_cnt += input_ids.shape[0] if len(input_ids.shape) > 1 else 1 - if total_cnt >= nsamples: - break - if total_cnt == 0: - 
logger.error( - f"no data has been cached, please provide more data with sequence length >={self.seqlen} in the " - f"dataset or decease the sequence length" - ) - exit(-1) - elif total_cnt < nsamples: - logger.warning( - f"An insufficient number of samples likely reduces the accuracy of the quantized model. " - f"Target samples count is {nsamples}, while valid samples count is {total_cnt}" - ) - - # clean embed weight to save memory - if self.low_cpu_mem_usage: - for n, m in embed_layers: - m = m.to("meta") - - @torch.no_grad() - def try_cache_inter_data_gpucpu(self, block_names, nsamples, layer_names=None, last_cache_name=None): - """Attempts to cache intermediate data on GPU, if failed, then using CPU. - - Args: - block_names (list): List of block names to cache data for. - nsamples (int): Number of samples to use for caching. - layer_names (list, optional): List of layer names to cache data for. Defaults to []. - last_cache_name (str, optional): Name of the last cache. Defaults to None. - - Returns: - all_inputs: Cached intermediate data. - - Raises: - Exception: If caching on GPU fails, switches to CPU and caches there. - """ - if _is_fp8_model(self.model): - layer_names = [] - if layer_names is None: - layer_names = [] - - if self.low_gpu_mem_usage or ( - len(block_names) == 1 - and len(layer_names) == 0 - and not self.has_qlayer_outside_block - and (last_cache_name is None or last_cache_name in block_names) - ): - # low_gpu_mem_usage or calibrate only the embedding layer, which is also very fast on CPU - all_inputs = self.cache_inter_data(block_names, nsamples, layer_names=[], last_cache_name=last_cache_name) - else: - try: - if not self.model.device.type == "meta": - if hasattr(self.model, "hf_device_map") and len(self.model.hf_device_map) > 1: - self.model = dispatch_model(self.model, device_map=self.model.hf_device_map) - else: - # Change this if new device is supported - if str(self.model.device) == "cpu" and ( - self.device.startswith("xpu") or self.device.startswith("cuda") - ): - max_memory = get_max_vram() # TODO model is not evenly split - no_split_modules = getattr(self.model, "_no_split_modules", []) - device_map = infer_auto_device_map( - self.model, max_memory=max_memory, no_split_module_classes=no_split_modules - ) - - self.model = dispatch_model(self.model, device_map=device_map) - else: - self.model = self.model.to(self.device) - - all_inputs = self.cache_inter_data( - block_names, nsamples, layer_names=layer_names, last_cache_name=last_cache_name - ) - if hasattr(self.model, "hf_device_map") and len(self.model.hf_device_map) > 1: - accelerate.hooks.remove_hook_from_submodules(self.model) - - except RuntimeError as e: - cuda_error_msg = traceback.format_exc() - try: - logger.info("switch to cpu to cache block inputs") - if self.has_qlayer_outside_block or self.__class__.__name__ == "AutoRoundMLLM": - logger.warning( - "we recommend using more GPUs in calibration." - " Otherwise, some layers may fall back to `rtn` mode, which can affect accuracy." 
-                        )
-                    if hasattr(self.model, "hf_device_map") and len(self.model.hf_device_map) > 1:
-                        accelerate.hooks.remove_hook_from_submodules(
-                            self.model
-                        )  ## self.model.hf_device_map has not been changed
-                    self.model = mv_module_from_gpu(self.model, self.low_cpu_mem_usage)
-                    clear_memory()
-                    ## Important change after v0.51: on cpu, we use rtn mode for layers in layer_names
-                    all_inputs = self.cache_inter_data(
-                        block_names, nsamples, layer_names=[], last_cache_name=last_cache_name
-                    )
-                except Exception as e:
-                    logger.error(cuda_error_msg)
-                    raise
-        return all_inputs
-
-    @torch.no_grad()
-    def cache_inter_data(self, block_names, nsamples, layer_names=None, last_cache_name=None):
-        """Save the inputs of block_name for calibration.
-
-        This method temporarily replaces the forward method of the model to capture
-        the inputs passing through the specified block. It then calibrates the model
-        using a specified number of samples. Finally, it restores the original forward
-        method and returns the inputs for the specified block.
-
-        Args:
-            block_names (list): The names of the blocks for which inputs are to be saved.
-            layer_names (list): The names of the layers for which inputs are to be saved.
-            nsamples (int): The number of samples to use for calibration.
-            last_cache_name (str, optional): The name of the last layer to be cached;
-                we can break the forward at this layer to save time.
-
-        Returns:
-            dict: A dictionary containing the inputs for the specified block.
-        """
-        if layer_names is None:
-            layer_names = []
-        self.inputs = {}
-        self.to_cached_layers = block_names + layer_names
-
-        tmp_dtype = None  # TODO delete this as most model is not fp32 now
-        ## have bug if block name is not the first block
-        if (len(block_names) > 1 or len(layer_names) > 0) and self.low_gpu_mem_usage:
-            tmp_dtype = self.model.dtype
-            if self.amp:
-                if self.model.dtype != self.model.dtype:
-                    self.model = self.model.to(torch.bfloat16)
-            else:
-                self.model = self.model.to(torch.float32)  ## model on cpu
-
-        self.last_cache_name = last_cache_name
-        if last_cache_name is None and len(block_names) + len(layer_names) == 1:
-            self.last_cache_name = block_names[0] if len(block_names) == 1 else layer_names[0]
-        # do not set last_cache_name for multimodal models
-        calib_bs = self.batch_size
-        self.hook_handles = []
-        self._replace_forward()
-        self.calib(nsamples, calib_bs)
-        self._recover_forward()
-        res = self.inputs
-        del self.last_cache_name
-        del self.to_cached_layers
-        if tmp_dtype is not None:
-            self.model = self.model.to(tmp_dtype)
-
-        return res
-
-    @torch.no_grad()
-    def _get_block_forward_func(self, name: str) -> Callable:
-        """Gets the forward function.
-
-        Args:
-            name (str): The name of the function.
-
-        Returns:
-            function: The forward function.
-        """
-
-        def post_process_cache_data(batch_size, data, data_name):
-            """
-            Processes store data for batch handling, reshaping if necessary.
-
-            Args:
-                batch_size (int): The size of the batch.
-                data: The data value to store, potentially for caching.
-                data_name (str): Name of the data.
-
-            Returns:
-                Processed data or None
-            """
-            new_data = data
-            if batch_size <= 1:
-                return new_data
-            if data_name in self.shared_cache_keys:
-                return None
-            if "alibi" in data_name:
-                if isinstance(data, torch.Tensor):
-                    alibi = data
-                    alibi = alibi.reshape(batch_size, -1, alibi.shape[1], alibi.shape[2])
-                    new_data = alibi
-            return new_data
-
-        def forward(m, hidden_states=None, *positional_inputs, **kwargs):
-            """Rewrite forward function, process and collect input data.
-
-            Args:
-                hidden_states (torch.Tensor): The hidden states tensor.
-                *positional_inputs: Variable number of positional arguments.
-                **kwargs: Variable number of keyword arguments.
-
-            Returns:
-                NotImplementedError: Getting the first layer inputs and then raise the error to save runtime.
-            """
-            if name not in self.inputs:
-                self.inputs[name] = {}
-                init_cache(positional_inputs, self.inputs[name])
-
-            if self.batch_dim is None:
-                self.batch_dim = 0
-                if hidden_states is not None and self.batch_size > 1:
-                    if hidden_states.shape[0] > self.batch_size:
-                        self.batch_dim = 1
-                        if len(hidden_states.shape) > 1 and hidden_states.shape[1] > self.batch_size:
-                            logger.error(
-                                "this model has not been supported, "
-                                "please raise an issue in https://github.com/intel/auto-round/issues"
-                                " or try to set the `batch_size` to 1 and "
-                                "`gradient_accumulate_steps` to your current batch size."
-                            )
-                            exit(-1)
-
-            if hidden_states is not None:
-                kwargs["hidden_states"] = hidden_states
-
-            for key in kwargs.keys():
-                if (
-                    isinstance(kwargs[key], torch.Tensor)
-                    or isinstance(kwargs[key], list)
-                    or isinstance(kwargs[key], tuple)
-                ):
-                    if key not in self.inputs[name].keys():  # initialization
-                        data = to_device(kwargs[key], device=torch.device("cpu"))
-                        if data is None or (self.batch_size > 1 and key in self.shared_cache_keys):
-                            self.inputs[name][key] = data
-                            continue
-                        if self.batch_size <= 1:
-                            self.inputs[name][key] = [data]
-                        else:
-                            data = post_process_cache_data(self.batch_size, data, key)
-                            self.inputs[name][key] = list(torch.split(data, 1, dim=self.batch_dim))
-                    else:  # append cache inputs
-                        new_data = post_process_cache_data(self.batch_size, kwargs[key], key)
-                        if new_data is None:  # shareable args or NoneType
-                            continue
-                        new_data = to_device(new_data, device=torch.device("cpu"))
-                        if self.batch_size <= 1:
-                            self.inputs[name][key].append(new_data)
-                        else:
-                            self.inputs[name][key].extend(list(torch.split(new_data, 1, dim=self.batch_dim)))
-                elif isinstance(kwargs[key], (str, bool, type(None))):
-                    if key not in self.inputs[name].keys():
-                        self.inputs[name][key] = kwargs[key]
-                else:
-                    # Parameters not to be cached
-                    if check_skippable_keywords(key):
-                        logger.warning_once(
-                            f"Please note that '{key}' key" " is not currently used in quantization fine-tuning."
-                        )
-            reset_params(self.inputs[name])
-            if name == self.last_cache_name:
-                raise NotImplementedError
-            else:
-                if hidden_states is not None:
-                    kwargs.pop("hidden_states")
-                    return m.orig_forward(hidden_states, *positional_inputs, **kwargs)
-                else:
-                    # Currently only for Llama-3.2-Vision-Instruct Series
-                    return m.orig_forward(*positional_inputs, **kwargs)
-
-        return forward
-
-    @torch.no_grad()
-    def _get_cache_data_hook_for_layer(self, name):
-        """A forward hook to save the input max of a module.
-
-        :param name: the module name
-        :return: A hook function.
-        """
-
-        def cache_input_hook(module, inputs, outputs):
-            input = inputs
-            if isinstance(inputs, tuple) or isinstance(input, list):
-                input = inputs[0]
-            if name in self.inputs:
-                self.inputs[name].extend(list(torch.split(input.to("cpu"), 1, dim=0)))
-            else:
-                self.inputs[name] = list(torch.split(input.to("cpu"), 1, dim=0))
-
-        return cache_input_hook
-
-    def _recover_forward(self):
-        """Recovers the forward function."""
-        for n, m in self.model.named_modules():
-            if hasattr(m, "orig_forward"):
-                m.forward = m.orig_forward
-                delattr(m, "orig_forward")
-        for hook_handle in self.hook_handles:
-            hook_handle.remove()
-        self.hook_handles = []
-
-    def _replace_forward(self):
-        """Replaces the forward function."""
-        from functools import partial
-
-        for n, m in self.model.named_modules():
-            if n in self.to_cached_layers and not isinstance(m, tuple(self.supported_types)):  ## block
-                m.orig_forward = m.forward
-                m.forward = partial(self._get_block_forward_func(n), m)
-            elif n in self.to_cached_layers:  ## linear layer or conv1d layer
-                hook_func = self._get_cache_data_hook_for_layer(n)
-                hook_handle = m.register_forward_hook(hook_func)
-                self.hook_handles.append(hook_handle)
-
-    def _quantize_layer(
-        self, layer_name: str, inputs: torch.Tensor, q_inputs: torch.Tensor = None, device: str = "cpu"
-    ):
-        """Quantize a specific layer of the model using the provided inputs.
-
-        Args:
-            layer_name (str): The name of the layer to quantize.
-            inputs (torch.Tensor): Input data for quantization.
-            q_inputs (torch.Tensor, optional): Quantized input data. Defaults to None.
-            device (torch.device, optional): The device to use for quantization. Defaults to torch.device("cpu").
-
-        Returns:
-            None
-        """
-        logger.info(f"quantizing layer {layer_name}")
-        layer = get_module(self.model, layer_name)
-        if hasattr(layer, "tuning_device"):
-            device = layer.tuning_device
-
-        layer = layer.to(device)
-        for i in range(len(inputs)):
-            inputs[i] = inputs[i].to(layer.weight.dtype)
-            if q_inputs is not None:
-                q_inputs[i] = q_inputs[i].to(layer.weight.dtype)
-
-        wrapper_linear = WrapperLinear(layer, enable_minmax_tuning=self.enable_minmax_tuning, device=device).to(device)
-        round_params = []
-        minmax_params = []
-        for key in wrapper_linear.params.keys():
-            if "min" in key or "max" in key:
-                minmax_params.append(wrapper_linear.params[key])
-            else:
-                round_params.append(wrapper_linear.value)
-        if len(round_params) + len(minmax_params) <= 0:
-            dump_info = f"quantized {layer_name}"
-            logger.info(dump_info)
-            with torch.no_grad():
-                unwrapper_layer(self.model, wrapper_linear, layer_name, {})
-            mv_module_from_gpu(layer, self.low_cpu_mem_usage)
-
-        lr = torch.tensor(self.lr)
-        minmax_lr = torch.tensor(self.minmax_lr)
-        if self.enable_minmax_tuning:
-            optimizer = self.optimizer(
-                [{"params": round_params}, {"params": minmax_params, "lr": minmax_lr}], lr=lr, weight_decay=0
-            )
-        else:
-            optimizer = self.optimizer(round_params, lr=lr, weight_decay=0)
-
-        if self.lr_scheduler is None:
-            lr_schedule = torch.optim.lr_scheduler.LinearLR(
-                optimizer, start_factor=1.0, end_factor=0.0, total_iters=self.iters
-            )
-        else:
-            lr_schedule = copy.deepcopy(self.lr_scheduler)
-        nsamples = len(inputs)
-        last_best_iter = 0
-        best_loss = torch.finfo(torch.float).max
-        scaler = self._get_scaler()  # pylint: disable=assignment-from-none
-        init_loss = None
-        gradient_accumulate_steps = self.batch_size  ## Force to low gpu
-        batch_size = 1  ## Force to low gpu
-        pick_samples = batch_size * gradient_accumulate_steps
-        pick_samples = min(nsamples, pick_samples)
-        if self.sampler != "rand":
-            whole_indices = torch.randperm(nsamples)[:pick_samples]
-        total_loss = 0
-        num_elm = 1
-        mse_reduction = "mean"
-        if gradient_accumulate_steps != 1:
-            mse_reduction = "sum"
-        mse_loss = torch.nn.MSELoss(reduction=mse_reduction).to(device)
-
-        for i in range(self.iters):
-            total_loss = 0
-            if self.sampler == "rand":
-                whole_indices = torch.randperm(nsamples)[:pick_samples]
-                if gradient_accumulate_steps != 1:
-                    if q_inputs is not None:
-                        current_input = [q_inputs[i] for i in whole_indices]
-                    else:
-                        current_input = [inputs[i] for i in whole_indices]
-                    num_elm = sum(id.numel() for id in current_input)
-            for tmp_step in range(gradient_accumulate_steps):
-                indices = whole_indices[tmp_step * batch_size : (tmp_step + 1) * batch_size]
-                if q_inputs is not None:
-                    current_input = [q_inputs[i] for i in indices]
-                    current_input = torch.cat(current_input, dim=0).to(device)
-                    org_input = [inputs[i] for i in indices]
-                    org_input = torch.cat(org_input, dim=0).to(device)
-                else:
-                    current_input = [inputs[i] for i in indices]
-                    current_input = torch.cat(current_input, dim=0).to(device)
-                    org_input = current_input
-                with torch.no_grad():
-                    current_output = layer(org_input)
-
-                if self.amp:
-                    with autocast(device_type=device.split(":")[0], dtype=self.amp_dtype):
-                        output_q = wrapper_linear(current_input)  # pylint: disable=not-callable
-                        loss = mse_loss(output_q, current_output)  # pylint: disable=not-callable
-                else:
-                    output_q = wrapper_linear(current_input)  # pylint: disable=not-callable
-                    loss = mse_loss(  # pylint: disable=not-callable
-                        output_q.to(torch.float32), current_output.to(torch.float32)
-                    )
-                total_loss += loss.item() / num_elm
-
-                self._scale_loss_and_backward(scaler, loss)
-            if i == 0:
-                init_loss = total_loss
-
-            if total_loss < best_loss:
-                best_loss = total_loss
-                if not self.not_use_best_mse:
-                    best_params = collect_best_params(wrapper_linear)
-                    last_best_iter = i
-            if self.not_use_best_mse and i == self.iters - 1:
-                best_params = collect_best_params(wrapper_linear)
-
-            if not self.not_use_best_mse:
-                if 0 < self.dynamic_max_gap <= i - last_best_iter:
-                    break
-            self._step(scaler, optimizer, lr_schedule)
-
-        last_loss = total_loss
-        best_iter = self.iters
-        if not self.not_use_best_mse:
-            last_loss = best_loss
-            best_iter = last_best_iter
-        with torch.no_grad():
-            unwrapper_layer(self.model, wrapper_linear, layer_name, best_params)
-        mv_module_from_gpu(layer, self.low_cpu_mem_usage)
-        dump_info = f"quantized {layer_name}, loss iter 0: {init_loss:.6f} -> iter {best_iter}: {last_loss:.6f}"
-        logger.info(dump_info)
-
-    def _register_act_max_hook(self, model):
-        def get_act_max_hook(module, input, output):
-            if isinstance(input, (tuple, list)):
-                input = input[0]
-            if input.numel() == 0:
-                return  # as no needs for act_max update
-            input, _, _ = reshape_pad_tensor_by_group_size(input, self.act_group_size)
-            act_max = torch.max(torch.abs(input), dim=-1).values
-            if not hasattr(module, "act_max") or module.act_max.numel() == 0:
-                module.act_max = act_max
-            else:
-                act_max = act_max.to(module.act_max.device)
-                if is_nv_fp(self.act_data_type):  ## for nvfp per-tensor input_global_scale calculation usage
-                    module.act_max = torch.max(
-                        torch.tensor([act_max.max(), module.act_max.max()], device=act_max.device)
-                    )
-                else:
-                    module.act_max = torch.max(act_max, module.act_max)
-
-        hook_handles = []
-
-        for n, m in model.named_modules():
-            if (
-                hasattr(m, "act_dynamic")
-                and check_need_act_calibration(m.act_dynamic, m.act_data_type, m.act_bits)
-                and check_to_quantized(m)
-            ):
-                hook = m.register_forward_hook(get_act_max_hook)
-                hook_handles.append(hook)
-                continue
-
-            # for whole model, RTN
-            if n in self.layer_config:
-                config = self.layer_config[n]
-                act_dynamic = config.get("act_dynamic", True)
-                act_data_type = config.get("act_data_type", None)
-                act_bits = config.get("act_data_type", 16)
-                if (
-                    config["bits"] <= 8
-                    and check_need_act_calibration(act_dynamic, act_data_type, act_bits)
-                    and check_to_quantized(config)
-                ):
-                    hook = m.register_forward_hook(get_act_max_hook)
-                    hook_handles.append(hook)
-                    continue
-        return hook_handles
-
-    def _quantize_block(
-        self,
-        block: torch.nn.Module,
-        input_ids: list[torch.Tensor],
-        input_others: dict,
-        q_input: Union[None, torch.Tensor] = None,
-        device: Union[str, torch.device] = "cpu",
-    ):
-        """Quantize the weights of a given block of the model.
-
-        Args:
-            block: The block of the model to be quantized.
-            input_ids: The input tensor containing tokenized input ids.
-            input_others: A dictionary containing additional input data.
-            q_input: The quantized input tensor.
-            device: The device for quantization.
- - Returns: - Tuple: (q_outputs, output) if self.enable_quanted_input is True, else (None, output) - """ - if _is_fp8_model(self.model): - for n, m in block.named_modules(): - if _is_fp8_linear(m): - new_layer = convert_fp8_layer_to_linear(m, self.amp_dtype).to(device) - set_module(block, n, new_layer) - - if self.device_map == "auto": - self._set_auto_device_map_in_block(block, input_ids) - - if self.device_map is not None: - for n, m in block.named_modules(): - if len(list(m.children())) != 0 or not hasattr(m, "tuning_device"): - continue - from accelerate.hooks import AlignDevicesHook, add_hook_to_module - - hook = AlignDevicesHook(m.tuning_device, io_same_device=True) - add_hook_to_module(m, hook, True) - - if q_input is None: - hook_handles = self._register_act_max_hook(block) - - output = self._get_block_outputs( - block, input_ids, input_others, self.batch_size * self.infer_bs_coeff, device, self.cache_device - ) - - for handle in hook_handles: - handle.remove() - else: - output = self._get_block_outputs( - block, input_ids, input_others, self.batch_size * self.infer_bs_coeff, device, self.cache_device - ) - hook_handles = self._register_act_max_hook(block) - if hook_handles: - self._get_block_outputs( - block, - q_input, - input_others, - self.batch_size * self.infer_bs_coeff, - device, - self.cache_device, - save_output=False, - ) - - for handle in hook_handles: - handle.remove() - - if q_input is not None: - if input_ids is not q_input: - clear_memory(input_ids) - else: - clear_memory() - input_ids = q_input - - quantized_layer_names, unquantized_layer_names = wrapper_block( - block, self.enable_minmax_tuning, self.enable_norm_bias_tuning, device=self.device - ) - if is_nv_fp(self.data_type): # enable qkv and moe structure global_scale fuse - from auto_round.data_type.utils import update_fused_layer_global_scales - - modules = block.modules() - for module in modules: - update_fused_layer_global_scales(module) - round_params = [] - minmax_params = [] - for n, m in block.named_modules(): - if hasattr(m, "orig_layer"): - for key in m.params.keys(): - if "min" in key or "max" in key: - minmax_params.append(m.params[key]) - else: - round_params.append(m.params[key]) - - lr = torch.tensor(self.lr) - minmax_lr = torch.tensor(self.minmax_lr) - if self.enable_minmax_tuning: - optimizer = self.optimizer( - [{"params": round_params}, {"params": minmax_params, "lr": minmax_lr}], lr=lr, weight_decay=0 - ) - else: - optimizer = self.optimizer(round_params, lr=lr, weight_decay=0) - - if len(round_params) + len(minmax_params) <= 0: - dump_info = ( - f"quantized {len(quantized_layer_names)}/{(len(quantized_layer_names) + len(unquantized_layer_names))} " - f"layers in the block" - ) - logger.info(dump_info) - unwrapper_block(block, {}) # TODO Quant layer should change - mv_module_from_gpu(block, self.low_cpu_mem_usage) - return output, output - - if self.lr_scheduler is None: - lr_schedule = torch.optim.lr_scheduler.LinearLR( - optimizer, start_factor=1.0, end_factor=0.0, total_iters=self.iters - ) - else: - lr_schedule = copy.deepcopy(self.lr_scheduler) - - nsamples = len(input_ids) - pick_samples = self.batch_size * self.gradient_accumulate_steps - pick_samples = min(nsamples, pick_samples) - if self.sampler != "rand": - whole_indices = torch.randperm(nsamples)[:pick_samples] - last_best_iter = 0 - best_loss = torch.finfo(torch.float).max - num_elm = 1 - mse_reduction = "mean" - if self.gradient_accumulate_steps != 1: - mse_reduction = "sum" - mse_loss = 
torch.nn.MSELoss(reduction=mse_reduction).to(device) - scaler = self._get_scaler() # pylint: disable=assignment-from-none - init_loss = None - best_params = {} - total_loss = 0 - for i in range(self.iters): - total_loss = 0 - if self.sampler == "rand": - whole_indices = torch.randperm(nsamples)[:pick_samples] - # We assume the block input and output shape is same - if self.gradient_accumulate_steps != 1: - current_input_ids = [input_ids[i] for i in whole_indices] - num_elm = sum(id.numel() for id in current_input_ids) - - for tmp_step in range(self.gradient_accumulate_steps): - indices = whole_indices[tmp_step * self.batch_size : (tmp_step + 1) * self.batch_size] - current_input_ids, current_input_others = self._sampling_inputs( - input_ids, - input_others, - indices, - seqlen=self.seqlen, - batch_dim=self.batch_dim, - share_cache_keys=self.shared_cache_keys, - ) - - current_output = [output[x] for x in indices] - current_output = torch.cat(current_output, dim=self.batch_dim) - current_output = to_device(current_output, device) - - output_q = block_forward( - block, current_input_ids, current_input_others, self.amp, self.amp_dtype, device - ) - if self.amp: - with autocast(device_type=device.split(":")[0], dtype=self.amp_dtype): - loss = mse_loss(output_q, current_output) # pylint: disable=not-callable - else: - loss = mse_loss( # pylint: disable=not-callable - output_q.to(torch.float32), current_output.to(torch.float32) - ) - - total_loss += loss.item() / num_elm - self._scale_loss_and_backward(scaler, loss) - - if i == 0: - init_loss = total_loss - - if total_loss < best_loss: - best_loss = total_loss - if not self.not_use_best_mse: - best_params = collect_best_params(block) - # print(f"get better result at iter {i}, the loss is {total_loss}", flush=True) - - last_best_iter = i - if self.not_use_best_mse and i == self.iters - 1: - best_params = collect_best_params(block) - - if not self.not_use_best_mse: - if 0 < self.dynamic_max_gap <= i - last_best_iter: - break - self._step(scaler, optimizer, lr_schedule) - - last_loss = total_loss - best_iter = self.iters - if not self.not_use_best_mse: - last_loss = best_loss - best_iter = last_best_iter - dump_info = ( - f"quantized {len(quantized_layer_names)}/{(len(quantized_layer_names) + len(unquantized_layer_names))} " - f"layers in the block, loss iter 0: {init_loss:.6f} -> iter {best_iter}: {last_loss:.6f}" - ) - logger.info(dump_info) - if len(unquantized_layer_names) != 0: - logger.info(f"{unquantized_layer_names} have not been quantized") - with torch.no_grad(): - unwrapper_block(block, best_params) - - if ( - is_nv_fp(self.act_data_type) - and hasattr(self, "formats") - and any("nv_fp" in format_ for format_ in self.formats) - ): - # enable moe experts act_max automatic generation for WrapperWALayer - set_amax_for_all_moe_layers(block, attr_name="orig_layer.act_max") - - if self.enable_quanted_input: - if self.low_cpu_mem_usage: - block = block.to(device) - clear_memory() - q_outputs = self._get_block_outputs( - block, - input_ids, - input_others, - self.batch_size * self.infer_bs_coeff, - device, - cache_device=self.cache_device, - ) - if self.device_map is not None: - accelerate.hooks.remove_hook_from_submodules(block) - mv_module_from_gpu(block, self.low_cpu_mem_usage) - clear_memory(input_ids) - - return q_outputs, output - - else: - if self.device_map is not None: - accelerate.hooks.remove_hook_from_submodules(block) - mv_module_from_gpu(block, self.low_cpu_mem_usage) - clear_memory(input_ids) - return None, output - - def 
_quantize_blocks( - self, - model: torch.nn.Module, - inputs: dict, - block_names: list, - q_input: torch.Tensor = None, - nblocks: int = 1, - device: str = "cpu", - pbar: tqdm = None, - ): - """Quantize and dequantize the weights of the specified blocks in the model. - - Args: - model: The PyTorch model to be quantized. - inputs: The input data for quantization. - block_names: The names of the blocks to be quantized and dequantized. - nblocks: The number of blocks to quantize and dequantize. - device: The device for quantization and dequantization. - - Returns: - None - """ - clear_memory() - for n, m in model.named_parameters(): - m.requires_grad_(False) - input_ids = inputs["input_ids"] - inputs.pop("input_ids", None) - input_others = inputs - clear_memory() - input_ids = to_device(input_ids, self.cache_device) - input_others = to_device(input_others, self.cache_device) - # As in calibration phase, we may use bf16 for calibration due to low_gpu_memory usage - tmp_dtype = self.amp_dtype if self.amp else torch.float32 - for i in range(len(input_ids)): - input_ids[i] = input_ids[i].to(tmp_dtype) - - for key in input_others.keys(): - if isinstance(input_others[key], torch.Tensor) and ( - input_others[key].dtype == torch.float16 or input_others[key].dtype == torch.bfloat16 - ): - input_others[key] = input_others[key].to(tmp_dtype) - elif isinstance(input_others[key], list): - for i in range(len(input_others[key])): - to_dtype(input_others[key][i], tmp_dtype) - - if ( - self.sym - and self.enable_alg_ext - and self.super_group_size is None - and ( - (self.data_type.startswith("int") and self.act_bits >= 8) - or self.data_type.startswith("mx") - or self.data_type.startswith("nv") - ) - ): - try: - from auto_round.alg_ext import quantize_block_ext - - BaseCompressor.quantize_block_ext = quantize_block_ext - quantize_block = self.quantize_block_ext # must use self.quantize_block_ext - if self.bits > 2 and (not self.data_type.startswith("mx") or not self.data_type.startswith("nv")): - logger.warning( - "algorithm extension has only undergone limited validation on " - "INT2,mxfp4 and nvfp4; use with caution." 
- ) - else: - logger.info("using algorithm extension for quantization.") - except (ImportError, ModuleNotFoundError): - quantize_block = self._quantize_block - if self.enable_torch_compile: - quantize_block = compile_func(quantize_block, device) - else: - quantize_block = quantize_block - else: - quantize_block = self._quantize_block - if self.enable_torch_compile: - quantize_block = compile_func(quantize_block, device) - - if pbar is None: - pbar = tqdm(range(0, len(block_names), nblocks)) - - for i in range(0, len(block_names), nblocks): - if i != 0: - pbar.update(1) - if nblocks == 1: - n = block_names[i] - pbar.set_description(f"Quantizing {n}") - m = get_module(model, n) - else: - names = block_names[i : min(i + nblocks, len(block_names))] - pbar.set_description(f"Quantizing [{i + 1}-{min(i + nblocks, len(block_names))}]/{len(block_names)}") - modules = [get_module(model, n) for n in names] - m = WrapperMultiblock(modules) - - if not self.model.device.type == "meta" or self.low_cpu_mem_usage: - m = m.to(device) - - q_input, input_ids = quantize_block( - m, - input_ids, - input_others, - q_input=q_input, - device=device, - ) - if self.is_packing_immediate: - from auto_round.export import PACKING_LAYER_WITH_FORMAT - - for _, tmp_m in m.named_modules(): - if not (hasattr(tmp_m, "bits") and check_to_quantized(tmp_m)): - continue - target_backend = self.formats[0].split(":")[0] if ":" in self.formats[0] else self.formats[0] - has_gguf = any("gguf" in format_ for format_ in self.formats) - if has_gguf: - from auto_round.export.export_to_gguf.export import pack_gguf_layer - - output_dir = self._get_save_folder_name(self.formats[0]) - model_type = ModelType.MMPROJ if self.mllm else ModelType.TEXT - pack_gguf_layer( - tmp_m.tmp_name, - self.model, - self.formats[0], - output_dir, - self.layer_config, - self.tokenizer, - processor=self.processor if hasattr(self, "processor") else None, - image_processor=self.image_processor if hasattr(self, "image_processor") else None, - model_type=model_type, - ) - else: - PACKING_LAYER_WITH_FORMAT[target_backend]( - tmp_m.tmp_name, self.model, self.formats[0], device=self.device - ) - pbar.set_description("Quantizing done") - pbar.update(1) - pbar.close() - self.model = mv_module_from_gpu(self.model, self.low_cpu_mem_usage) - for n, m in self.model.named_modules(): - if hasattr(m, "name"): - delattr(m, "name") - - del q_input - del input_ids - del input_others - del inputs - - clear_memory() - - def save_quantized( - self, output_dir: str = None, format: str = "auto_round", inplace: bool = True, **kwargs - ) -> torch.nn.Module: - """Save the quantized model to the specified output directory in the specified format. + def save_quantized( + self, output_dir: str = None, format: str = "auto_round", inplace: bool = True, **kwargs + ) -> torch.nn.Module: + """Save the quantized model to the specified output directory in the specified format. Args: output_dir (str, optional): The directory to save the quantized model. Defaults to None. 
@@ -3140,37 +1470,11 @@ def save_quantized(
             )
         if "awq" in format and not self.bits == 4:
             raise ValueError("The AWQ format only supports W4 quantization ")
-        serialization_keys = [
-            "bits",
-            "group_size",
-            "sym",
-            "data_type",
-            "enable_quanted_input",
-            "enable_minmax_tuning",
-            "seqlen",
-            "batch_size",
-            "scale_dtype",
-            "lr",
-            "minmax_lr",
-            "gradient_accumulate_steps",
-            "iters",
-            "amp",
-            "nsamples",
-            "low_gpu_mem_usage",
-            "to_quant_block_names",
-            "enable_norm_bias_tuning",
-            "act_bits",
-            "act_group_size",
-            "act_sym",
-            "act_dynamic",
-            "act_data_type",
-            "super_bits",
-            "super_group_size",
-        ]
+        # Copy the module-level constant so repeated saves do not mutate it
+        serialization_keys = list(SERIALIZATION_KEYS)
         if isinstance(self.dataset, str):
             serialization_keys.append("dataset")
         serialization_dict = {}
         for key in serialization_keys:
             serialization_dict[key] = getattr(self, key)

         from auto_round.version import __version__
@@ -3206,30 +1510,6 @@ def save_quantized(
             )
         return compressed_model

-    def _get_quantized_layer_names_outside_blocks(self) -> list:
-        """Gets the names of quantized layers outside blocks in the model.
-
-        Returns:
-            list: List of layer names outside blocks.
-        """
-        if self.layer_config is None or len(self.layer_config) == 0:
-            return []
-
-        layer_names = []
-        all_layers_in_block = get_layer_names_in_block(self.model, self.supported_types, self.quant_block_list)
-
-        for key in self.layer_config.keys():
-            if key in all_layers_in_block:
-                continue
-            layer = get_module(self.model, key)
-            if layer is None:
-                logger.error(f"could not find layer {key} in the model, exit...")
-                exit(-1)
-            if isinstance(layer, tuple(self.supported_types)) and check_to_quantized(self.layer_config[key]):
-                layer_names.append(key)
-
-        return layer_names
-
     def _set_amp_dtype(self) -> None:
         """Sets the automatic mixed precision (AMP) data type for the model based on the device and configuration."""
         self.amp_dtype = torch.bfloat16
@@ -3301,55 +1581,6 @@ def _step(self, scaler: Any, optimizer: Any, lr_schedule: Any):
         optimizer.zero_grad()
         lr_schedule.step()

-    @classmethod
-    @torch.no_grad()
-    def _sampling_inputs(
-        cls,
-        input_ids: list[torch.Tensor],
-        input_others: dict,
-        indices: list[int],
-        seqlen: int,
-        batch_dim: int = 0,
-        share_cache_keys: tuple = (),
-    ):
-        """Samples inputs based on the given indices and sequence length.
-
-        Args:
-            input_ids: The list of input tensor containing input_ids.
-            input_others: A dictionary containing other input data.
-            indices: The indices to sample from the input.
-            seqlen: The sequence length.
-
-        Returns:
-            current_input_ids: The sampled input IDs.
-            current_input_others: The sampled other input data.
-        """
-        current_input_ids = [input_ids[i] for i in indices]
-
-        current_input_ids = torch.cat(current_input_ids, dim=batch_dim)
-
-        current_input_others = {"positional_inputs": input_others["positional_inputs"]}
-        for key in input_others.keys():
-            if "positional_inputs" in key:
-                continue
-            if (key not in share_cache_keys or len(indices) == 1) and not isinstance(
-                input_others[key], (str, bool, type(None))
-            ):
-                current_input_others[key] = None
-                if input_others[key] is not None:
-                    current_input_others[key] = [input_others[key][i] for i in indices]
-                    if len(indices) == 1:
-                        current_input_others[key] = current_input_others[key][0]
-                    else:
-                        try:
-                            current_input_others[key] = torch.cat(current_input_others[key], dim=0)
-                        except TypeError as err:
-                            logger.warning_once("Please check the model cache inputs or try setting batch_size to 1.")
-            else:
-                current_input_others[key] = input_others[key]
-
-        return current_input_ids, current_input_others
-

 class LLMCompressor(BaseCompressor):
     pass
diff --git a/auto_round/quantizers/__init__.py b/auto_round/quantizers/__init__.py
new file mode 100644
index 00000000..0adf5035
--- /dev/null
+++ b/auto_round/quantizers/__init__.py
@@ -0,0 +1,36 @@
+# Copyright (c) 2025 Intel Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from __future__ import annotations

+from typing import TYPE_CHECKING
+
+from auto_round.quantizers.base import QuantizerType, BaseQuantizer
+from auto_round.quantizers.mode import TuningQuantizer, RTNQuantizer
+from auto_round.quantizers.model_type import LLMQuantizer
+from auto_round.quantizers.data_type import GGUFQuantizer
+
+if TYPE_CHECKING:
+    from auto_round.compressors import BaseCompressor
+
+
+def create_quantizer(compressor: "BaseCompressor"):
+    # Compose the quantizer from one mixin per axis; only the MODE axis is
+    # wired up for now, the other axes are placeholders.
+    quantizers = {
+        # QuantizerType.DATA_TYPE: GGUFQuantizer,
+        # QuantizerType.MODEL_TYPE: LLMQuantizer,
+        QuantizerType.MODE: RTNQuantizer if compressor.iters == 0 else TuningQuantizer,
+    }
+
+    dynamic_quantizer = type("AutoRoundQuantizer", tuple(quantizers.values()), {})
+    return dynamic_quantizer(compressor)
diff --git a/auto_round/quantizers/base.py b/auto_round/quantizers/base.py
new file mode 100644
index 00000000..dcf68617
--- /dev/null
+++ b/auto_round/quantizers/base.py
@@ -0,0 +1,145 @@
+# Copyright (c) 2025 Intel Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import annotations
+
+import traceback
+import types
+from enum import IntEnum
+from typing import TYPE_CHECKING, Any, Callable, Union
+
+import torch
+
+from auto_round.data_type import QUANT_FUNC_WITH_DTYPE
+from auto_round.logger import logger
+from auto_round.schemes import QuantizationScheme
+from auto_round.utils import check_to_quantized, clear_memory
+
+if TYPE_CHECKING:
+    from auto_round.compressors import BaseCompressor
+
+
+class QuantizerType(IntEnum):
+    MODE = 1
+    MODEL_TYPE = 2
+    DATA_TYPE = 3
+
+
+class BaseQuantizer:
+    _quantizer_classes: dict[QuantizerType, dict[str, type[BaseQuantizer]]] = {
+        QuantizerType.MODE: {},
+        QuantizerType.MODEL_TYPE: {},
+        QuantizerType.DATA_TYPE: {},
+    }
+    compressor: "BaseCompressor" = None
+
+    def __init__(self, compressor: "BaseCompressor"):
+        self.compressor = compressor
+
+    def quantize(self):
+        """Quantize the model and return it along with the layer configuration.
+
+        This is the entry point of a quantizer; subclasses must override it.
+
+        Returns:
+            The quantized model and layer configurations.
+        """
+        raise NotImplementedError
+
+    def __getattr__(self, name):
+        if hasattr(self.compressor, name) and not isinstance(getattr(self.compressor, name), types.MethodType):
+            return getattr(self.compressor, name)
+        raise AttributeError(f"'{type(self).__name__}' object has no attribute '{name}'")
+
+    def __setattr__(self, name, value):
+        if hasattr(self.compressor, name):
+            setattr(self.compressor, name, value)
+        else:
+            super().__setattr__(name, value)
+
+    @torch.inference_mode()
+    def _quantize_embedding_layer(self):
+        """Quantizes embedding layers in the model according to the configuration.
+
+        This method iterates through all modules in the model, identifies embedding
+        layers specified in `self.layer_config`, and applies the appropriate quantization
+        function based on bit precision, grouping strategy, and dtype.
+
+        Returns:
+            bool: True if at least one embedding layer was quantized.
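+
+        Note:
+            Quantization is attempted on ``self.device`` first and falls back to
+            CPU on out-of-memory errors; quantized weights, scales, and zero
+            points are stored back on CPU.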
+ """ + is_quantized = False + for name, module in self.model.named_modules(): + # Skip non-Embedding modules or layers not in config + if not isinstance(module, torch.nn.Embedding) or name not in self.layer_config: + continue + + config = self.layer_config[name] + + # Skip layers that are not marked for quantization + if not check_to_quantized(config): + continue + is_quantized = True + config["scale_dtype"] = self.scale_dtype + dtype = config["data_type"] + + # Determine quantization function key with symmetry/asymmetry + if dtype not in QUANT_FUNC_WITH_DTYPE: + dtype = f"{dtype}_{'sym' if config['sym'] else 'asym'}" + + # Optionally use optimized rounding (RTN) variant + if not self.disable_opt_rtn and f"rtn_{dtype}" in QUANT_FUNC_WITH_DTYPE: + dtype = f"rtn_{dtype}" + + quant_func = QUANT_FUNC_WITH_DTYPE[dtype] + + # Attempt quantization on GPU, fall back to CPU if OOM + try: + weight, scale, zp = quant_func( + module.weight.to(self.device), + **{k: config[k] for k in ["bits", "group_size", "super_bits", "super_group_size", "scale_dtype"]}, + ) + except RuntimeError as e: + cuda_error_msg = traceback.format_exc() + try: + logger.error(cuda_error_msg) + logger.warning("falling back to CPU") + weight, scale, zp = quant_func( + module.weight.to("cpu"), + **{ + k: config[k] + for k in ["bits", "group_size", "super_bits", "super_group_size", "scale_dtype"] + }, + ) + except Exception as e: + raise + + # Overwrite the module's weights with the quantized version + module.weight.data.copy_(weight.cpu()) + + # Attach scale and zero point (zp) to the module + for param_name, value in zip(["scale", "zp"], [scale, zp]): + if isinstance(value, dict): + for k, v in value.items(): + setattr(module, k if k == "scale" else f"w_{k}", v.cpu()) + elif isinstance(value, torch.Tensor): + setattr(module, param_name, value.cpu()) + else: + setattr(module, param_name, value) + + # Update config + self.layer_config.setdefault(name, {}).update(config) + + # Release memory + clear_memory() + + return is_quantized diff --git a/auto_round/quantizers/data_type.py b/auto_round/quantizers/data_type.py new file mode 100644 index 00000000..2f61ff14 --- /dev/null +++ b/auto_round/quantizers/data_type.py @@ -0,0 +1,19 @@ +# Copyright (c) 2025 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from auto_round.quantizers.base import BaseQuantizer + + +class GGUFQuantizer(BaseQuantizer): + pass diff --git a/auto_round/quantizers/mode/__init__.py b/auto_round/quantizers/mode/__init__.py new file mode 100644 index 00000000..02caff79 --- /dev/null +++ b/auto_round/quantizers/mode/__init__.py @@ -0,0 +1,16 @@ +# Copyright (c) 2025 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from auto_round.quantizers.mode.rtn import RTNQuantizer +from auto_round.quantizers.mode.tuning import TuningQuantizer diff --git a/auto_round/quantizers/mode/rtn.py b/auto_round/quantizers/mode/rtn.py new file mode 100644 index 00000000..3eb55b59 --- /dev/null +++ b/auto_round/quantizers/mode/rtn.py @@ -0,0 +1,525 @@ +# Copyright (c) 2025 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import copy +import time +import traceback +from typing import Any, Callable, Union + +import accelerate +import torch +from accelerate.big_modeling import dispatch_model, infer_auto_device_map +from tqdm import tqdm + +from auto_round.data_type import QUANT_FUNC_WITH_DTYPE +from auto_round.export.export_to_gguf.config import GGUF_CONFIG, GGUF_INNER_CONFIG, ModelType +from auto_round.logger import logger +from auto_round.quantizers.base import BaseQuantizer +from auto_round.utils import ( + INNER_SUPPORTED_LAYER_TYPES, + SUPPORTED_DTYPES, + SUPPORTED_FORMATS, + SUPPORTED_LAYER_TYPES, + TORCH_VERSION_AT_LEAST_2_6, + CpuInfo, + _gguf_args_check, + _is_fp8_linear, + _is_fp8_model, + block_forward, + check_and_mark_fp8_model, + check_is_cpu, + check_need_act_calibration, + check_seqlen_compatible, + check_skippable_keywords, + check_to_quantized, + clear_memory, + collect_best_params, + compile_func, + convert_dtype_str2torch, + convert_fp8_layer_to_linear, + convert_fp8_model_to_16b_model, + copy_python_files_from_model_cache, + detect_device, + estimate_tuning_block_mem, + find_matching_blocks, + flatten_list, + get_block_names, + get_device_memory, + get_fp_layer_names, + get_layer_config_by_gguf_format, + get_layer_features, + get_layer_names_in_block, + get_lm_head_name, + get_max_vram, + get_module, + get_quant_keys, + get_shared_keys, + htcore, + infer_bits_by_data_type, + init_cache, + is_debug_mode, + is_mx_fp, + is_nv_fp, + is_optimum_habana_available, + is_standard_fp, + is_static_wfp8afp8, + is_wfp8afp8, + llm_load_model, + mv_module_from_gpu, + reset_params, + set_amax_for_all_moe_layers, + set_module, + to_device, + to_dtype, + unsupport_meta_device, +) +from auto_round.wrapper import WrapperLinear, WrapperMultiblock, unwrapper_block, unwrapper_layer, wrapper_block + + +class RTNQuantizer(BaseQuantizer): + @torch.inference_mode() + def quantize(self) -> tuple[torch.nn.Module, dict[str, Any]]: + """Quantize all modules in the model using RTN (Round-To-Nearest) strategy. + + If the target format includes GGUF with `k`, and optimized RTN is enabled, + blockwise quantization with input caching and imatrix is used. 
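+        When activation calibration is required (``act_bits <= 8`` with an
+        activation scheme that needs static statistics), activation-max hooks
+        are registered and quantization proceeds block by block; otherwise
+        layers are quantized one by one.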
+ + Returns: + tuple[nn.Module, Dict[str, Any]]: The quantized model and the layer configuration. + """ + if self.amp and self.model.dtype != self.amp_dtype: + self.model.to(self.amp_dtype) + + all_to_quantized_module_names: list[str] = [n for n, m in self.model.named_modules() if check_to_quantized(m)] + + if is_nv_fp(self.data_type): + from auto_round.data_type.nvfp import calculate_gparam + from auto_round.data_type.utils import update_fused_layer_global_scales + + pbar = tqdm(all_to_quantized_module_names) + for name in pbar: + pbar.set_description(f"Calculate weight global scale: {name}") + m = get_module(self.model, name) + weight_global_scale = calculate_gparam(m.weight, self.group_size) + setattr(m, "weight_global_scale", weight_global_scale) + + modules = list(self.model.modules()) + for module in tqdm(modules, desc="Update weight global scale for fuse module"): + update_fused_layer_global_scales(module) + + has_gguf_k = any("gguf" in fmt and "k" in fmt for fmt in getattr(self, "formats", [])) + + self._quantize_embedding_layer() + + self.model.to("cpu") + if has_gguf_k and not self.disable_opt_rtn: + self._quant_rtn_with_imatrix(all_to_quantized_module_names) + elif self.act_bits <= 8 and check_need_act_calibration( + self.act_dynamic, self.act_data_type, self.act_bits + ): # TODO, mixed datatype has bug + hook_handles = self._register_act_max_hook(self.model) + try: + self._quantize_via_rtn_blockwise(all_to_quantized_module_names) + except RuntimeError as e: + logger.warning("Fallback to CPU. Consider using more GPUs via `--device 0,1,2,3`.") + self.model = self.model.to("cpu") + clear_memory() + if hasattr(self.model, "hf_device_map") and len(self.model.hf_device_map) > 1: + import accelerate + + accelerate.hooks.remove_hook_from_submodules(self.model) + orig_device = self.device + self.device = "cpu" + self._quantize_via_rtn_blockwise(all_to_quantized_module_names) + self.device = orig_device + for handle in hook_handles: + handle.remove() + else: + block_names_cnt = len(flatten_list(get_block_names(self.model, True))) + clear_mem_freq = len(all_to_quantized_module_names) // block_names_cnt + if clear_mem_freq == 0: + clear_mem_freq = 1 + pbar = tqdm(all_to_quantized_module_names) + cnt = 1 + for name in pbar: + pbar.set_description(f"Quantizing {name}") + self._quantize_layer_via_rtn(name) + if cnt % clear_mem_freq == 0: + clear_memory() + cnt = 1 + cnt += 1 + # Convert remaining fp8 + if _is_fp8_model(self.model): + convert_fp8_model_to_16b_model(self.model, self.amp_dtype) + self.quantized = True + return self.model, self.layer_config + + def _quantize_layer_via_rtn(self, name: str) -> None: + """Quantizes a layer using RTN (Round-To-Nearest) if available. + + This function attempts to quantize a layer by switching its data type to a + `rtn_*` version if supported, then wraps and unwraps the module to apply + quantization. If GPU memory is insufficient, it falls back to CPU. + + If packing is enabled (`is_packing_immediate`), the function will also export + the quantized layer to the appropriate backend format. + + Args: + name (str): Name of the layer to quantize. + + Raises: + RuntimeError: If quantization fails for reasons unrelated to memory. 
+ """ + m = get_module(self.model, name) + + # if m.__class__.__name__ == "FP8Linear": + if _is_fp8_linear(m): + m = convert_fp8_layer_to_linear(m, self.amp_dtype) + set_module(self.model, name, m) + + # Step 1: Use optimized RTN data type if available + if not self.disable_opt_rtn and not m.data_type.startswith("rtn_"): + from auto_round.data_type import QUANT_FUNC_WITH_DTYPE + + rtn_dtype = "rtn_" + m.data_type + if rtn_dtype in QUANT_FUNC_WITH_DTYPE: + m.data_type = rtn_dtype + self.layer_config[name]["data_type"] = m.data_type + + # Step 2: Try quantization on GPU first, fall back to CPU if OOM + # if only export gguf, using gguf-packing instead of rtn + if self.is_packing_immediate and self.iters == 0 and "gguf" in self.formats[0] and not self.disable_opt_rtn: + m.scale = None + m.zp = None + else: + try: + m.to(self.device) + m = WrapperLinear( + m, + enable_minmax_tuning=False, + enable_norm_bias_tuning=False, + enable_round_tuning=False, + ) + m = m.unwrapper({}) + m.to("cpu") + except RuntimeError as e: + cuda_error_msg = traceback.format_exc() + m = m.orig_layer if hasattr(m, "orig_layer") else m + try: + logger.error(cuda_error_msg) + logger.warning("falling back to CPU.") + m.to("cpu") + m = WrapperLinear( + m, + enable_minmax_tuning=False, + enable_norm_bias_tuning=False, + enable_round_tuning=False, + ) + m = m.unwrapper({}) + except Exception as e: + raise + + # Step 3: Optional immediate packing/export + if self.is_packing_immediate: + from auto_round.export import PACKING_LAYER_WITH_FORMAT + + if check_to_quantized(m): + target_backend = self.formats[0].split(":")[0] if ":" in self.formats[0] else self.formats[0] + has_gguf = any("gguf" in fmt for fmt in self.formats) + + if has_gguf: + from auto_round.export.export_to_gguf.export import pack_gguf_layer + + output_dir = self._get_save_folder_name(self.formats[0]) + model_type = ModelType.MMPROJ if self.mllm else ModelType.TEXT + pack_gguf_layer( + name, + self.model, + self.formats[0], + output_dir, + self.layer_config, + self.tokenizer, + processor=self.processor if hasattr(self, "processor") else None, + image_processor=self.image_processor if hasattr(self, "image_processor") else None, + model_type=model_type, + ) + else: + PACKING_LAYER_WITH_FORMAT[target_backend](name, self.model, self.formats[0], device=self.device) + + # if self.low_gpu_mem_usage: + # clear_memory() + else: + set_module(self.model, name, m) + + def _quant_rtn_with_imatrix(self, all_to_quantized_module_names: list[str]) -> None: + """Performs RTN quantization using input activation statistics (imatrix). + + This method accumulates per-channel second-moment activation statistics (imatrix) + via forward hooks and uses them to perform RTN quantization. If CUDA memory runs out, + it falls back to CPU-based blockwise quantization. + + Args: + all_to_quantized_module_names (list[str]): + A list of module names (e.g., 'model.layers.0.self_attn.q_proj') to be quantized. 
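+
+        Note:
+            The imatrix collected here is the per-channel sum of squared input
+            activations, normalized by the number of calibration samples.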
+ + Returns: + None + """ + logger.info("start to compute imatrix for GGUF quantization") + + # Load dataset + from auto_round.calib_dataset import get_dataloader + + if _is_fp8_model(self.model): + convert_fp8_model_to_16b_model(self.model, self.amp_dtype) + + if isinstance(self.dataset, str): + if self.tokenizer is None: + raise ValueError("A tokenizer must be set for the model when using a dataset string.") + dataset_name = self.dataset.replace(" ", "") + self.dataloader = get_dataloader( + self.tokenizer, self.seqlen, dataset_name, self.seed, self.batch_size, self.nsamples + ) + else: + self.dataloader = self.dataset + + model = self.model + + # Dispatch multi-GPU model if necessary + if hasattr(model, "hf_device_map") and len(model.hf_device_map) > 1: + dispatch_model(model, model.hf_device_map) + + def register_act_hook(model): + """Registers hooks to accumulate activation squared norms into `imatrix`.""" + + def get_imatrix_hook(module, input, output): + input = input[0] if isinstance(input, (tuple, list)) else input + flattened = input.reshape(-1, input.shape[-1]).to(torch.float32) + squared = torch.sum(torch.pow(flattened, 2), dim=0).to(torch.float32) + + if not hasattr(module, "imatrix"): + module.imatrix = squared + module.imatrix_cnt = input.shape[0] + else: + module.imatrix += squared.to(module.imatrix.device) + module.imatrix_cnt += input.shape[0] + + hook_handles = [] + for name, module in model.named_modules(): + if isinstance(module, self.supported_types) and check_to_quantized(module): + hook = module.register_forward_hook(get_imatrix_hook) + hook_handles.append(hook) + return hook_handles + + hooks = register_act_hook(model) + + try: + # Move model to target device + if hasattr(self.model, "hf_device_map") and len(self.model.hf_device_map) > 1: + dispatch_model(self.model, self.model.hf_device_map) + else: + model = model.to(self.device) + cnt = 0 + + # Run forward pass to accumulate imatrix + for data in self.dataloader: + cnt += data["input_ids"].shape[0] + data = to_device(data, self.device) + model(**data) + if cnt >= self.nsamples: + break + + # Remove hooks after data collection + for hook in hooks: + hook.remove() + + # Normalize imatrix by count + for _, module in model.named_modules(): + if hasattr(module, "imatrix"): + module.imatrix /= module.imatrix_cnt + if hasattr(model, "hf_device_map") and len(model.hf_device_map) > 1: + import accelerate + + accelerate.hooks.remove_hook_from_submodules(model) + # Perform quantization using RTN + pbar = tqdm(all_to_quantized_module_names) + block_names_cnt = len(flatten_list(get_block_names(self.model, True))) + clear_mem_freq = len(all_to_quantized_module_names) // block_names_cnt + if clear_mem_freq == 0: + clear_mem_freq = 1 + cnt = 1 + for name in pbar: + pbar.set_description(f"Quantizing {name}") + self._quantize_layer_via_rtn(name) + if cnt % clear_mem_freq == 0: + clear_memory() + cnt = 1 + cnt += 1 + except RuntimeError as e: + try: + if hasattr(model, "hf_device_map") and len(model.hf_device_map) > 1: + import accelerate + + accelerate.hooks.remove_hook_from_submodules(model) + # Fallback: out-of-memory → try CPU blockwise quantization + logger.warning("Out of VRAM, falling back to blockwise quantization. 
Accuracy may degrade.")
+                model = model.to("cpu")
+                clear_memory()
+                self._quantize_via_rtn_blockwise(all_to_quantized_module_names)
+            except RuntimeError as e:
+                cuda_error_msg = traceback.format_exc()
+                try:
+                    logger.error(cuda_error_msg)
+                    # Final fallback: warn and use CPU-only quantization
+                    logger.warning(
+                        "Fallback to CPU. "
+                        "Consider enabling `low_gpu_mem_usage` or using more GPUs via `--device 0,1,2,3`."
+                    )
+                    model = model.to("cpu")
+                    clear_memory()
+                    if hasattr(model, "hf_device_map") and len(model.hf_device_map) > 1:
+                        import accelerate
+
+                        accelerate.hooks.remove_hook_from_submodules(model)
+
+                    orig_device = self.device
+                    self.device = "cpu"
+                    self._quantize_via_rtn_blockwise(all_to_quantized_module_names)
+                    self.device = orig_device
+                except Exception as e:
+                    raise
+            finally:
+                # Always remove hooks
+                for hook in hooks:
+                    hook.remove()
+
+    def _quantize_via_rtn_blockwise(self, all_to_quantized_module_names: list[str]) -> None:
+        """Quantize model layers block by block using cached inputs and imatrix.
+
+        Args:
+            all_to_quantized_module_names (list[str]): Names of layers to be quantized.
+        """
+        all_to_quantized_module_names = list(set(all_to_quantized_module_names))
+
+        all_blocks = self.quant_block_list if self.quant_block_list else get_block_names(self.model)
+        if not all_blocks:
+            raise ValueError("Could not find any blocks. Check the model or quant_block_list.")
+
+        all_first_block_names = [block[0] for block in all_blocks]
+        if self.act_bits < 16 and not self.act_dynamic:
+            layer_names = self._get_quantized_layer_names_outside_blocks()
+            if len(layer_names) > 0:
+                logger.warning(
+                    "quantizing layers outside blocks for static activation quantization"
+                    " will significantly increase calibration time"
+                )
+            all_inputs = self.try_cache_inter_data_gpucpu(all_first_block_names, self.nsamples, layer_names)
+        else:
+            all_inputs = self.cache_inter_data(all_first_block_names, self.nsamples)
+
+        # Clear hooks for multi-GPU setups
+        if hasattr(self.model, "hf_device_map") and len(self.model.hf_device_map) > 1:
+            accelerate.hooks.remove_hook_from_submodules(self.model)
+
+        pbar = tqdm(range(sum(len(block) for block in all_blocks)))
+
+        for block_names in all_blocks:
+            first_block = block_names[0]
+            inputs = all_inputs.pop(first_block)
+            input_keys = [k for k in inputs if k.startswith("hidden_state")]
+            if len(input_keys) != 1:
+                raise RuntimeError(
+                    "hidden_states arg mismatch. 
Please file an issue at https://github.com/intel/auto-round/issues" + ) + inputs["input_ids"] = inputs.pop(input_keys[0]) + + clear_memory(self.inputs) + + total_samples = len(inputs["input_ids"]) + if total_samples < self.batch_size: + self.batch_size = total_samples + logger.warning(f"Forcing batch size to {total_samples}") + + input_ids = to_device(inputs.pop("input_ids"), self.cache_device) + input_others = to_device(inputs, self.cache_device) + + tmp_dtype = self.amp_dtype if self.amp else torch.float32 + input_ids = [id_.to(tmp_dtype) for id_ in input_ids] + + for key, val in input_others.items(): + if isinstance(val, torch.Tensor) and val.dtype in (torch.float16, torch.bfloat16): + input_others[key] = val.to(tmp_dtype) + elif isinstance(val, list): + input_others[key] = [to_dtype(v, tmp_dtype) for v in val] + + for block_name in block_names: + pbar.set_description(f"Quantizing {block_name}") + block = get_module(self.model, block_name) + block = block.to(self.device) + if _is_fp8_model(self.model): + convert_fp8_model_to_16b_model(block, dtype=self.amp_dtype) + + if self.device_map == "auto": + self._set_auto_device_map_in_block(block, input_ids) + + # Dispatch model if needed + if self.device_map is not None: + from accelerate.hooks import AlignDevicesHook, add_hook_to_module + + for _, m in block.named_modules(): + if len(list(m.children())) != 0 or not hasattr(m, "tuning_device"): + continue + hook = AlignDevicesHook(m.tuning_device, io_same_device=True) + add_hook_to_module(m, hook, True) + else: + block = block.to(self.device) + input_ids = self._get_block_outputs( + block, + input_ids, + input_others, + self.batch_size * self.infer_bs_coeff, + self.device, + self.cache_device, + ) + if self.device_map is not None: + accelerate.hooks.remove_hook_from_submodules(block) + + if ( + is_nv_fp(self.act_data_type) and any("nv_fp" in format_ for format_ in self.formats) + ) or is_static_wfp8afp8(self): + # enable moe experts act_max automatic generation for Linear + set_amax_for_all_moe_layers(block, attr_name="act_max") + # Normalize imatrix and quantize layers + for _, m in block.named_modules(): + if hasattr(m, "imatrix"): + m.imatrix /= m.imatrix_cnt + if hasattr(m, "tmp_name") and m.tmp_name in all_to_quantized_module_names: + self._quantize_layer_via_rtn(m.tmp_name) + all_to_quantized_module_names.remove(m.tmp_name) + + mv_module_from_gpu(block, self.low_cpu_mem_usage) + pbar.update(1) + + pbar.close() + cnt = 1 + block_names_cnt = len(flatten_list(get_block_names(self.model, True))) + clear_mem_freq = len(all_to_quantized_module_names) // block_names_cnt + if clear_mem_freq == 0: + clear_mem_freq = 1 + # Process remaining layers not in blocks + for name in all_to_quantized_module_names: + self._quantize_layer_via_rtn(name) + if cnt % clear_mem_freq == 0: + clear_memory() + cnt = 1 + cnt += 1 diff --git a/auto_round/quantizers/mode/tuning.py b/auto_round/quantizers/mode/tuning.py new file mode 100644 index 00000000..f5052d29 --- /dev/null +++ b/auto_round/quantizers/mode/tuning.py @@ -0,0 +1,1357 @@ +# Copyright (c) 2025 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +import copy +import time +import traceback +from typing import Any, Callable, Union + +import accelerate +import torch +from accelerate.big_modeling import dispatch_model, infer_auto_device_map +from torch import autocast +from tqdm import tqdm + +from auto_round.data_type import QUANT_FUNC_WITH_DTYPE +from auto_round.data_type.utils import reshape_pad_tensor_by_group_size +from auto_round.export.export_to_gguf.config import GGUF_CONFIG, GGUF_INNER_CONFIG, ModelType +from auto_round.logger import logger +from auto_round.low_cpu_mem.utils import get_layers_before_block +from auto_round.quantizers.base import BaseQuantizer +from auto_round.utils import ( + INNER_SUPPORTED_LAYER_TYPES, + SUPPORTED_DTYPES, + SUPPORTED_FORMATS, + SUPPORTED_LAYER_TYPES, + TORCH_VERSION_AT_LEAST_2_6, + CpuInfo, + _gguf_args_check, + _is_fp8_linear, + _is_fp8_model, + block_forward, + check_and_mark_fp8_model, + check_is_cpu, + check_need_act_calibration, + check_seqlen_compatible, + check_skippable_keywords, + check_to_quantized, + clear_memory, + collect_best_params, + compile_func, + convert_dtype_str2torch, + convert_fp8_layer_to_linear, + convert_fp8_model_to_16b_model, + copy_python_files_from_model_cache, + detect_device, + estimate_tuning_block_mem, + find_matching_blocks, + flatten_list, + get_block_names, + get_device_memory, + get_fp_layer_names, + get_layer_config_by_gguf_format, + get_layer_features, + get_layer_names_in_block, + get_lm_head_name, + get_max_vram, + get_module, + get_quant_keys, + get_shared_keys, + htcore, + infer_bits_by_data_type, + init_cache, + is_debug_mode, + is_mx_fp, + is_nv_fp, + is_optimum_habana_available, + is_standard_fp, + is_static_wfp8afp8, + is_wfp8afp8, + llm_load_model, + mv_module_from_gpu, + reset_params, + set_amax_for_all_moe_layers, + set_module, + to_device, + to_dtype, + unsupport_meta_device, +) +from auto_round.wrapper import WrapperLinear, WrapperMultiblock, unwrapper_block, unwrapper_layer, wrapper_block + + +class TuningQuantizer(BaseQuantizer): + def quantize(self) -> tuple[torch.nn.Module, dict[str, Any]]: + if bool(self.quant_block_list): + all_blocks = self.quant_block_list + else: + all_blocks = get_block_names(self.model) + + if len(all_blocks) == 0: + logger.warning("could not find blocks, exit with original model") + return self.model, self.layer_config + + if self.amp and self.model.dtype != self.amp_dtype: + self.model = self.model.to(self.amp_dtype) + + layer_names = self._get_quantized_layer_names_outside_blocks() + self.start_time = time.time() + all_first_block_names = [block[0] for block in all_blocks] + if len(layer_names) > 0: + logger.info( + "Starting to cache block inputs. 
This may be slow due to external block layers: %s", layer_names
+            )
+        else:
+            logger.info("start to cache block inputs")
+        all_inputs = self.try_cache_inter_data_gpucpu(all_first_block_names, self.nsamples, layer_names=layer_names)
+        is_quantized_embedding = self._quantize_embedding_layer()
+        all_q_inputs = None
+        if is_quantized_embedding:
+            all_inputs = copy.deepcopy(self.inputs)
+            clear_memory(self.inputs)
+            all_q_inputs = self.try_cache_inter_data_gpucpu(
+                all_first_block_names, self.nsamples, layer_names=layer_names
+            )
+        self.model = mv_module_from_gpu(self.model, self.low_cpu_mem_usage)
+        clear_memory()
+        if hasattr(self.model, "hf_device_map") and len(self.model.hf_device_map) > 1:
+            accelerate.hooks.remove_hook_from_submodules(self.model)  # self.model.hf_device_map has not been changed
+        self.model = mv_module_from_gpu(self.model, self.low_cpu_mem_usage)
+        logger.info("caching done")
+        if len(all_blocks) > 1:
+            pbar = tqdm(range(0, sum([len(i) for i in all_blocks]), self.nblocks))
+        else:
+            pbar = None  # move the alg warning outside pbar
+
+        for block_names in all_blocks:
+            inputs = all_inputs[block_names[0]]
+            all_inputs.pop(block_names[0])
+            q_inputs = None
+            if all_q_inputs is not None:
+                q_inputs = all_q_inputs[block_names[0]]
+                all_q_inputs.pop(block_names[0])
+            keys = inputs.keys()
+            input_id_str = [key for key in keys if key.startswith("hidden_state")]
+            if len(input_id_str) != 1:
+                raise RuntimeError(
+                    "hidden_states arg mismatch error, "
+                    "please raise an issue at https://github.com/intel/auto-round/issues"
+                )
+            inputs["input_ids"] = inputs.pop(input_id_str[0], None)
+            if q_inputs is not None:
+                q_inputs["input_ids"] = q_inputs.pop(input_id_str[0], None)
+
+            clear_memory(self.inputs)
+
+            if "input_ids" in inputs.keys():
+                total_samples = len(inputs["input_ids"])
+                if total_samples < self.batch_size:
+                    self.batch_size = total_samples
+                    logger.warning(f"forcing the training batch size to {total_samples}")
+
+            self._quantize_blocks(
+                self.model,
+                inputs,
+                block_names,
+                q_input=q_inputs["input_ids"] if q_inputs is not None else None,
+                nblocks=self.nblocks,
+                device=self.device,
+                pbar=pbar,
+            )
+        if self.is_packing_immediate and len(self.formats) != 1:
+            raise ValueError(
+                f"Expected exactly one packing format when 'is_packing_immediate' is True, "
+                f"but got {len(self.formats)} formats."
+            )
+
+        self._quantize_layers(layer_names, all_inputs)  ##TODO pack layer immediately
+
+        if _is_fp8_model(self.model):
+            for n, m in self.model.named_modules():
+                if _is_fp8_linear(m):
+                    new_layer = convert_fp8_layer_to_linear(m, self.amp_dtype).to("cpu")
+                    set_module(self.model, n, new_layer)
+
+        end_time = time.time()
+        cost_time = end_time - self.start_time
+        logger.info(f"quantization tuning time: {cost_time:.2f}s")
+
+        ## dump a summary
+        quantized_layers = []
+        unquantized_layers = []
+        for n, m in self.model.named_modules():
+            if isinstance(m, tuple(self.supported_types)):
+                if check_to_quantized(m):
+                    quantized_layers.append(n)
+                else:
+                    unquantized_layers.append(n)
+            elif hasattr(m, "scales") or hasattr(m, "scale"):  ##packing_immediately
+                quantized_layers.append(n)
+        summary_info = (
+            f"Summary: quantized {len(quantized_layers)}/{len(quantized_layers) + len(unquantized_layers)} "
+            f"layers in the model"
+        )
+        if len(unquantized_layers) > 0:
+            summary_info += f", {unquantized_layers} have not been quantized"
+        logger.info(summary_info)
+
+        self.quantized = True
+        return self.model, self.layer_config
+
+    def _get_quantized_layer_names_outside_blocks(self) -> list:
+        """Gets the names of quantized layers outside blocks in the model.
+
+        Returns:
+            list: List of layer names outside blocks.
+        """
+        if self.layer_config is None or len(self.layer_config) == 0:
+            return []
+
+        layer_names = []
+        all_layers_in_block = get_layer_names_in_block(self.model, self.supported_types, self.quant_block_list)
+
+        for key in self.layer_config.keys():
+            if key in all_layers_in_block:
+                continue
+            layer = get_module(self.model, key)
+            if layer is None:
+                logger.error(f"could not find layer {key} in the model, exit...")
+                exit(-1)
+            if isinstance(layer, tuple(self.supported_types)) and check_to_quantized(self.layer_config[key]):
+                layer_names.append(key)
+
+        return layer_names
+
+    @torch.no_grad()
+    def cache_inter_data(self, block_names, nsamples, layer_names=None, last_cache_name=None):
+        """Save the inputs of block_name for calibration.
+
+        This method temporarily replaces the forward method of the model to capture
+        the inputs passing through the specified block. It then calibrates the model
+        using a specified number of samples. Finally, it restores the original forward
+        method and returns the inputs for the specified block.
+
+        Args:
+            block_names (list): The names of the blocks for which inputs are to be saved.
+            layer_names (list): The names of the layers for which inputs are to be saved.
+            nsamples (int): The number of samples to use for calibration.
+            last_cache_name (str, optional): The name of the last layer to be cached;
+                the forward pass can be stopped at this layer to save time.
+
+        Returns:
+            dict: A dictionary containing the inputs for the specified block.
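+                Keys are block or layer names; values are the captured inputs.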
+        """
+        if layer_names is None:
+            layer_names = []
+        self.inputs = {}
+        self.to_cached_layers = block_names + layer_names
+
+        tmp_dtype = None  # TODO delete this as most model is not fp32 now
+        ## have bug if block name is not the first block
+        if (len(block_names) > 1 or len(layer_names) > 0) and self.low_gpu_mem_usage:
+            tmp_dtype = self.model.dtype
+            if self.amp:
+                if self.model.dtype != torch.bfloat16:
+                    self.model = self.model.to(torch.bfloat16)
+            else:
+                self.model = self.model.to(torch.float32)  ##model on cpu
+
+        self.last_cache_name = last_cache_name
+        if last_cache_name is None and len(block_names) + len(layer_names) == 1:
+            self.last_cache_name = block_names[0] if len(block_names) == 1 else layer_names[0]
+        # do not set last_cache_name for multimodal models
+        calib_bs = self.batch_size
+        self.hook_handles = []
+        self._replace_forward()
+        self.calib(nsamples, calib_bs)
+        self._recover_forward()
+        res = self.inputs
+        del self.last_cache_name
+        del self.to_cached_layers
+        if tmp_dtype is not None:
+            self.model = self.model.to(tmp_dtype)
+
+        return res
+
+    @torch.no_grad()
+    def try_cache_inter_data_gpucpu(self, block_names, nsamples, layer_names=None, last_cache_name=None):
+        """Attempts to cache intermediate data on GPU; if that fails, falls back to CPU.
+
+        Args:
+            block_names (list): List of block names to cache data for.
+            nsamples (int): Number of samples to use for caching.
+            layer_names (list, optional): List of layer names to cache data for. Defaults to [].
+            last_cache_name (str, optional): Name of the last cache. Defaults to None.
+
+        Returns:
+            all_inputs: Cached intermediate data.
+
+        Raises:
+            Exception: If caching fails on both GPU and CPU.
+        """
+        if _is_fp8_model(self.model):
+            layer_names = []
+        if layer_names is None:
+            layer_names = []
+
+        if self.low_gpu_mem_usage or (
+            len(block_names) == 1
+            and len(layer_names) == 0
+            and not self.has_qlayer_outside_block
+            and (last_cache_name is None or last_cache_name in block_names)
+        ):
+            # low_gpu_mem_usage or calibrate only the embedding layer, which is also very fast on CPU
+            all_inputs = self.cache_inter_data(block_names, nsamples, layer_names=[], last_cache_name=last_cache_name)
+        else:
+            try:
+                if not self.model.device.type == "meta":
+                    if hasattr(self.model, "hf_device_map") and len(self.model.hf_device_map) > 1:
+                        self.model = dispatch_model(self.model, device_map=self.model.hf_device_map)
+                    else:
+                        # Change this if new device is supported
+                        if str(self.model.device) == "cpu" and (
+                            self.device.startswith("xpu") or self.device.startswith("cuda")
+                        ):
+                            max_memory = get_max_vram()  # TODO model is not evenly split
+                            no_split_modules = getattr(self.model, "_no_split_modules", [])
+                            device_map = infer_auto_device_map(
+                                self.model, max_memory=max_memory, no_split_module_classes=no_split_modules
+                            )
+
+                            self.model = dispatch_model(self.model, device_map=device_map)
+                        else:
+                            self.model = self.model.to(self.device)
+
+                all_inputs = self.cache_inter_data(
+                    block_names, nsamples, layer_names=layer_names, last_cache_name=last_cache_name
+                )
+                if hasattr(self.model, "hf_device_map") and len(self.model.hf_device_map) > 1:
+                    accelerate.hooks.remove_hook_from_submodules(self.model)
+
+            except RuntimeError as e:
+                cuda_error_msg = traceback.format_exc()
+                try:
+                    logger.info("switch to cpu to cache block inputs")
+                    if self.has_qlayer_outside_block or self.__class__.__name__ == "AutoRoundMLLM":
+                        logger.warning(
+                            "we recommend using more GPUs in calibration."
+ " Otherwise, some layers may fall back to `rtn` mode, which can affect accuracy." + ) + if hasattr(self.model, "hf_device_map") and len(self.model.hf_device_map) > 1: + accelerate.hooks.remove_hook_from_submodules( + self.model + ) ##self.model.hf_device_map has not been changed + self.model = mv_module_from_gpu(self.model, self.low_cpu_mem_usage) + clear_memory() + ## Important change after v0.51, on cpu, we use rtn mode for layers in layer_names + all_inputs = self.cache_inter_data( + block_names, nsamples, layer_names=[], last_cache_name=last_cache_name + ) + except Exception as e: + logger.error(cuda_error_msg) + raise + return all_inputs + + def _quantize_block( + self, + block: torch.nn.Module, + input_ids: list[torch.Tensor], + input_others: dict, + q_input: Union[None, torch.Tensor] = None, + device: Union[str, torch.device] = "cpu", + ): + """Quantize the weights of a given block of the model. + + Args: + block: The block of the model to be quantized. + input_ids: The input tensor containing tokenized input ids. + input_others: A dictionary containing additional input data. + q_input: The quantized input tensor. + device: The device for quantization. + + Returns: + Tuple: (q_outputs, output) if self.enable_quanted_input is True, else (None, output) + """ + if _is_fp8_model(self.model): + for n, m in block.named_modules(): + if _is_fp8_linear(m): + new_layer = convert_fp8_layer_to_linear(m, self.amp_dtype).to(device) + set_module(block, n, new_layer) + + if self.device_map == "auto": + self._set_auto_device_map_in_block(block, input_ids) + + if self.device_map is not None: + for n, m in block.named_modules(): + if len(list(m.children())) != 0 or not hasattr(m, "tuning_device"): + continue + from accelerate.hooks import AlignDevicesHook, add_hook_to_module + + hook = AlignDevicesHook(m.tuning_device, io_same_device=True) + add_hook_to_module(m, hook, True) + + if q_input is None: + hook_handles = self._register_act_max_hook(block) + + output = self._get_block_outputs( + block, input_ids, input_others, self.batch_size * self.infer_bs_coeff, device, self.cache_device + ) + + for handle in hook_handles: + handle.remove() + else: + output = self._get_block_outputs( + block, input_ids, input_others, self.batch_size * self.infer_bs_coeff, device, self.cache_device + ) + hook_handles = self._register_act_max_hook(block) + if hook_handles: + self._get_block_outputs( + block, + q_input, + input_others, + self.batch_size * self.infer_bs_coeff, + device, + self.cache_device, + save_output=False, + ) + + for handle in hook_handles: + handle.remove() + + if q_input is not None: + if input_ids is not q_input: + clear_memory(input_ids) + else: + clear_memory() + input_ids = q_input + + quantized_layer_names, unquantized_layer_names = wrapper_block( + block, self.enable_minmax_tuning, self.enable_norm_bias_tuning, device=self.device + ) + if is_nv_fp(self.data_type): # enable qkv and moe structure global_scale fuse + from auto_round.data_type.utils import update_fused_layer_global_scales + + modules = block.modules() + for module in modules: + update_fused_layer_global_scales(module) + round_params = [] + minmax_params = [] + for n, m in block.named_modules(): + if hasattr(m, "orig_layer"): + for key in m.params.keys(): + if "min" in key or "max" in key: + minmax_params.append(m.params[key]) + else: + round_params.append(m.params[key]) + + lr = torch.tensor(self.lr) + minmax_lr = torch.tensor(self.minmax_lr) + if self.enable_minmax_tuning: + optimizer = self.optimizer( + [{"params": 
round_params}, {"params": minmax_params, "lr": minmax_lr}], lr=lr, weight_decay=0 + ) + else: + optimizer = self.optimizer(round_params, lr=lr, weight_decay=0) + + if len(round_params) + len(minmax_params) <= 0: + dump_info = ( + f"quantized {len(quantized_layer_names)}/{(len(quantized_layer_names) + len(unquantized_layer_names))} " + f"layers in the block" + ) + logger.info(dump_info) + unwrapper_block(block, {}) # TODO Quant layer should change + mv_module_from_gpu(block, self.low_cpu_mem_usage) + return output, output + + if self.lr_scheduler is None: + lr_schedule = torch.optim.lr_scheduler.LinearLR( + optimizer, start_factor=1.0, end_factor=0.0, total_iters=self.iters + ) + else: + lr_schedule = copy.deepcopy(self.lr_scheduler) + + nsamples = len(input_ids) + pick_samples = self.batch_size * self.gradient_accumulate_steps + pick_samples = min(nsamples, pick_samples) + if self.sampler != "rand": + whole_indices = torch.randperm(nsamples)[:pick_samples] + last_best_iter = 0 + best_loss = torch.finfo(torch.float).max + num_elm = 1 + mse_reduction = "mean" + if self.gradient_accumulate_steps != 1: + mse_reduction = "sum" + mse_loss = torch.nn.MSELoss(reduction=mse_reduction).to(device) + scaler = self.scaler # pylint: disable=assignment-from-none + init_loss = None + best_params = {} + total_loss = 0 + for i in range(self.iters): + total_loss = 0 + if self.sampler == "rand": + whole_indices = torch.randperm(nsamples)[:pick_samples] + # We assume the block input and output shape is same + if self.gradient_accumulate_steps != 1: + current_input_ids = [input_ids[i] for i in whole_indices] + num_elm = sum(id.numel() for id in current_input_ids) + + for tmp_step in range(self.gradient_accumulate_steps): + indices = whole_indices[tmp_step * self.batch_size : (tmp_step + 1) * self.batch_size] + current_input_ids, current_input_others = self._sampling_inputs( + input_ids, + input_others, + indices, + seqlen=self.seqlen, + batch_dim=self.batch_dim, + share_cache_keys=self.shared_cache_keys, + ) + + current_output = [output[x] for x in indices] + current_output = torch.cat(current_output, dim=self.batch_dim) + current_output = to_device(current_output, device) + + output_q = block_forward( + block, current_input_ids, current_input_others, self.amp, self.amp_dtype, device + ) + if self.amp: + with autocast(device_type=device.split(":")[0], dtype=self.amp_dtype): + loss = mse_loss(output_q, current_output) # pylint: disable=not-callable + else: + loss = mse_loss( # pylint: disable=not-callable + output_q.to(torch.float32), current_output.to(torch.float32) + ) + + total_loss += loss.item() / num_elm + self._scale_loss_and_backward(scaler, loss) + + if i == 0: + init_loss = total_loss + + if total_loss < best_loss: + best_loss = total_loss + if not self.not_use_best_mse: + best_params = collect_best_params(block) + # print(f"get better result at iter {i}, the loss is {total_loss}", flush=True) + + last_best_iter = i + if self.not_use_best_mse and i == self.iters - 1: + best_params = collect_best_params(block) + + if not self.not_use_best_mse: + if 0 < self.dynamic_max_gap <= i - last_best_iter: + break + self._step(scaler, optimizer, lr_schedule) + + last_loss = total_loss + best_iter = self.iters + if not self.not_use_best_mse: + last_loss = best_loss + best_iter = last_best_iter + dump_info = ( + f"quantized {len(quantized_layer_names)}/{(len(quantized_layer_names) + len(unquantized_layer_names))} " + f"layers in the block, loss iter 0: {init_loss:.6f} -> iter {best_iter}: {last_loss:.6f}" + ) + 
+        logger.info(dump_info)
+        if len(unquantized_layer_names) != 0:
+            logger.info(f"{unquantized_layer_names} have not been quantized")
+        with torch.no_grad():
+            unwrapper_block(block, best_params)
+
+        if (
+            is_nv_fp(self.act_data_type)
+            and hasattr(self, "formats")
+            and any("nv_fp" in format_ for format_ in self.formats)
+        ):
+            # enable moe experts act_max automatic generation for WrapperWALayer
+            set_amax_for_all_moe_layers(block, attr_name="orig_layer.act_max")
+
+        if self.enable_quanted_input:
+            if self.low_cpu_mem_usage:
+                block = block.to(device)
+            clear_memory()
+            q_outputs = self._get_block_outputs(
+                block,
+                input_ids,
+                input_others,
+                self.batch_size * self.infer_bs_coeff,
+                device,
+                cache_device=self.cache_device,
+            )
+            if self.device_map is not None:
+                accelerate.hooks.remove_hook_from_submodules(block)
+            mv_module_from_gpu(block, self.low_cpu_mem_usage)
+            clear_memory(input_ids)
+
+            return q_outputs, output
+
+        else:
+            if self.device_map is not None:
+                accelerate.hooks.remove_hook_from_submodules(block)
+            mv_module_from_gpu(block, self.low_cpu_mem_usage)
+            clear_memory(input_ids)
+            return None, output
+
+    def _quantize_blocks(
+        self,
+        model: torch.nn.Module,
+        inputs: dict,
+        block_names: list,
+        q_input: torch.Tensor = None,
+        nblocks: int = 1,
+        device: str = "cpu",
+        pbar: tqdm = None,
+    ):
+        """Quantize and dequantize the weights of the specified blocks in the model.
+
+        Args:
+            model: The PyTorch model to be quantized.
+            inputs: The input data for quantization.
+            block_names: The names of the blocks to be quantized and dequantized.
+            q_input: Quantized input produced for the first block, if any.
+            nblocks: The number of blocks to quantize and dequantize at a time.
+            device: The device for quantization and dequantization.
+            pbar: An optional tqdm progress bar to reuse.
+
+        Returns:
+            None
+        """
+        clear_memory()
+        for _, m in model.named_parameters():
+            m.requires_grad_(False)
+        input_ids = inputs["input_ids"]
+        inputs.pop("input_ids", None)
+        input_others = inputs
+        clear_memory()
+        input_ids = to_device(input_ids, self.cache_device)
+        input_others = to_device(input_others, self.cache_device)
+        # As in the calibration phase, we may use bf16 here due to low_gpu_mem_usage
+        tmp_dtype = self.amp_dtype if self.amp else torch.float32
+        for i in range(len(input_ids)):
+            input_ids[i] = input_ids[i].to(tmp_dtype)
+
+        for key in input_others.keys():
+            if isinstance(input_others[key], torch.Tensor) and (
+                input_others[key].dtype == torch.float16 or input_others[key].dtype == torch.bfloat16
+            ):
+                input_others[key] = input_others[key].to(tmp_dtype)
+            elif isinstance(input_others[key], list):
+                for i in range(len(input_others[key])):
+                    input_others[key][i] = to_dtype(input_others[key][i], tmp_dtype)
+
+        if (
+            self.sym
+            and self.enable_alg_ext
+            and self.super_group_size is None
+            and (
+                (self.data_type.startswith("int") and self.act_bits >= 8)
+                or self.data_type.startswith("mx")
+                or self.data_type.startswith("nv")
+            )
+        ):
+            try:
+                from auto_round.alg_ext import quantize_block_ext
+
+                BaseQuantizer.quantize_block_ext = quantize_block_ext
+                quantize_block = self.quantize_block_ext  # must use self.quantize_block_ext
+                if self.bits > 2 and not (self.data_type.startswith("mx") or self.data_type.startswith("nv")):
+                    logger.warning(
+                        "the algorithm extension has only undergone limited validation on "
+                        "INT2, MXFP4 and NVFP4; use with caution."
+                    )
+                else:
+                    logger.info("using algorithm extension for quantization.")
+            except (ImportError, ModuleNotFoundError):
+                quantize_block = self._quantize_block
+                if self.enable_torch_compile:
+                    quantize_block = compile_func(quantize_block, device)
+        else:
+            quantize_block = self._quantize_block
+            if self.enable_torch_compile:
+                quantize_block = compile_func(quantize_block, device)
+
+        if pbar is None:
+            pbar = tqdm(range(0, len(block_names), nblocks))
+
+        for i in range(0, len(block_names), nblocks):
+            if i != 0:
+                pbar.update(1)
+            if nblocks == 1:
+                n = block_names[i]
+                pbar.set_description(f"Quantizing {n}")
+                m = get_module(model, n)
+            else:
+                names = block_names[i : min(i + nblocks, len(block_names))]
+                pbar.set_description(f"Quantizing [{i + 1}-{min(i + nblocks, len(block_names))}]/{len(block_names)}")
+                modules = [get_module(model, n) for n in names]
+                m = WrapperMultiblock(modules)
+
+            if not self.model.device.type == "meta" or self.low_cpu_mem_usage:
+                m = m.to(device)
+
+            q_input, input_ids = quantize_block(
+                m,
+                input_ids,
+                input_others,
+                q_input=q_input,
+                device=device,
+            )
+            if self.is_packing_immediate:
+                from auto_round.export import PACKING_LAYER_WITH_FORMAT
+
+                for _, tmp_m in m.named_modules():
+                    if not (hasattr(tmp_m, "bits") and check_to_quantized(tmp_m)):
+                        continue
+                    target_backend = self.formats[0].split(":")[0] if ":" in self.formats[0] else self.formats[0]
+                    has_gguf = any("gguf" in format_ for format_ in self.formats)
+                    if has_gguf:
+                        from auto_round.export.export_to_gguf.export import pack_gguf_layer
+
+                        output_dir = self._get_save_folder_name(self.formats[0])
+                        model_type = ModelType.MMPROJ if self.mllm else ModelType.TEXT
+                        pack_gguf_layer(
+                            tmp_m.tmp_name,
+                            self.model,
+                            self.formats[0],
+                            output_dir,
+                            self.layer_config,
+                            self.tokenizer,
+                            processor=self.processor if hasattr(self, "processor") else None,
+                            image_processor=self.image_processor if hasattr(self, "image_processor") else None,
+                            model_type=model_type,
+                        )
+                    else:
+                        PACKING_LAYER_WITH_FORMAT[target_backend](
+                            tmp_m.tmp_name, self.model, self.formats[0], device=self.device
+                        )
+        pbar.set_description("Quantizing done")
+        pbar.update(1)
+        pbar.close()
+        self.model = mv_module_from_gpu(self.model, self.low_cpu_mem_usage)
+        for n, m in self.model.named_modules():
+            if hasattr(m, "name"):
+                delattr(m, "name")
+
+        del q_input
+        del input_ids
+        del input_others
+        del inputs
+
+        clear_memory()
+
+    def _quantize_layers(self, layer_names: list, layer_inputs: dict) -> None:
+        """Quantizes specified layers based on inputs and configuration.
+
+        Args:
+            layer_names (list): List of layer names to quantize.
+            layer_inputs (dict): Dictionary mapping layer names to input data.
+
+        Returns:
+            None
+        """
+        ## TODO currently we treat all the layers outside blocks as post-block layers, which is not optimal
+        ## if there is no input for a layer, we use rtn
+
+        for layer_name in copy.deepcopy(layer_names):
+            if layer_name not in layer_inputs:
+                logger.info(f"using rtn to quantize {layer_name}")
+                from auto_round.data_type import QUANT_FUNC_WITH_DTYPE
+
+                layer = get_module(self.model, layer_name)
+                if _is_fp8_model(self.model):
+                    new_layer = convert_fp8_layer_to_linear(layer, self.amp_dtype).to(self.device)
+                    set_module(self.model, layer_name, new_layer)
+                    layer = new_layer
+
+                if not self.disable_opt_rtn and "rtn_" + layer.data_type in QUANT_FUNC_WITH_DTYPE:
+                    layer.data_type = "rtn_" + layer.data_type
+                    logger.info("using optimized rtn method for quantizing %s", layer_name)
+                    self.layer_config[layer_name]["data_type"] = layer.data_type
+                wrapper_layer = WrapperLinear(
+                    layer,
+                    enable_round_tuning=False,
+                    enable_minmax_tuning=False,
+                    enable_norm_bias_tuning=False,
+                    device=self.device,
+                )
+                new_layer = wrapper_layer.unwrapper({})
+                set_module(self.model, layer_name, new_layer)
+                layer.cpu()
+                layer_names.remove(layer_name)
+        if len(layer_names) == 0:
+            return
+        q_layer_inputs = None
+        enable_quanted_input = self.enable_quanted_input
+        has_gguf = False
+        if hasattr(self, "formats"):
+            has_gguf = any("gguf" in format_ for format_ in self.formats)
+        if has_gguf and self.is_packing_immediate:
+            enable_quanted_input = False
+
+        if hasattr(self.model, "hf_device_map") and len(self.model.hf_device_map) > 1 and enable_quanted_input:
+            dispatch_model(self.model, self.model.hf_device_map)
+
+        if enable_quanted_input:
+            logger.info("starting to cache layer inputs for %s, this may be quite slow", layer_names)
+            q_layer_inputs = self.try_cache_inter_data_gpucpu([], self.nsamples, layer_names=layer_names)
+            if hasattr(self.model, "hf_device_map") and len(self.model.hf_device_map) > 1:
+                accelerate.hooks.remove_hook_from_submodules(
+                    self.model
+                )  ## self.model.hf_device_map has not been changed
+
+        self.model = mv_module_from_gpu(self.model, self.low_cpu_mem_usage)
+        clear_memory()
+        if self.enable_torch_compile:
+            quant_layer = compile_func(self._quantize_layer, self.device)
+        else:
+            quant_layer = self._quantize_layer
+        for layer_name in layer_names:
+            layer_input = layer_inputs[layer_name]
+            layer_input = to_device(layer_input, self.cache_device)
+            q_layer_input = q_layer_inputs.get(layer_name, None) if q_layer_inputs is not None else None
+            q_layer_input = to_device(q_layer_input, self.cache_device)
+            quant_layer(layer_name, layer_input, q_layer_input, device=self.device)
+            del layer_input
+            clear_memory(q_layer_input)
+
+    @torch.no_grad()
+    def _get_block_outputs(
+        self,
+        block: torch.nn.Module,
+        input_ids: torch.Tensor,
+        input_others: dict,
+        bs: int,
+        device: Union[str, torch.device],
+        cache_device: Union[str, torch.device],
+        save_output: bool = True,
+    ):
+        """Compute the output of a given block of the model for a given input.
+
+        Args:
+            block: The block of the model.
+            input_ids: The input tensor containing tokenized input ids.
+            input_others: A dictionary containing additional input data.
+            bs: The batch size for computing the output.
+            device: The device for computation.
+            cache_device: The device for storing the output.
+            save_output: Whether to store and return the block outputs.
+
+        Returns:
+            The output tensor of the block.
+ """ + + output = [] + nsamples = len(input_ids) + for i in range(0, nsamples, bs): + end_index = min(nsamples, i + bs) + indices = torch.arange(i, end_index).to(torch.long) + tmp_input_ids, tmp_input_others = self._sampling_inputs( + input_ids, input_others, indices, self.seqlen, self.batch_dim, share_cache_keys=self.shared_cache_keys + ) + tmp_output = block_forward(block, tmp_input_ids, tmp_input_others, self.amp, self.amp_dtype, device).to( + cache_device + ) + if save_output: + if self.batch_size == 1: + output.append(tmp_output) + else: + output.extend(list(torch.split(tmp_output, 1, dim=self.batch_dim))) + if self.low_gpu_mem_usage: + clear_memory() + + return output + + @torch.no_grad() + def calib(self, nsamples, bs): + """Perform calibration for quantization. + + This method calibrates the model for quantization by processing a specified + number of samples from the calibration dataset. It ensures that the data is + properly formatted and feeds it to the model. If the number of samples processed + is less than the specified number, it logs a warning. If no samples are processed, + it logs an error and exits. + Args: + nsamples (int): The number of samples to use for calibration. + bs (int): The number of samples to use for calibration + """ + from auto_round.calib_dataset import get_dataloader + + if isinstance(self.dataset, str): + dataset = self.dataset.replace(" ", "") ##remove all whitespaces + + # slow here + self.dataloader = get_dataloader( + self.tokenizer, + self.seqlen, + dataset, + self.seed, + bs, + self.nsamples, + ) + else: + self.dataloader = self.dataset + total_cnt = 0 + + # load embed weight if use low_cpu_mem_usage + if self.low_cpu_mem_usage: + embed_layers = get_layers_before_block(self.model) + for n, m in embed_layers: + m = m.to(self.device) + + for data in self.dataloader: + if data is None: + continue + if isinstance(data, torch.Tensor): + input_ids = data.to(self.model.device) + data_new = input_ids + elif isinstance(data, str): + if self.tokenizer is None: + logger.error("please provide tokenizer for string input") + exit(-1) + data = self.tokenizer(data, truncation=True, max_length=self.seqlen, return_tensors="pt").data + data_new = {} + for key in data.keys(): + data_new[key] = data[key].to(self.model.device) + input_ids = data_new["input_ids"] + elif isinstance(data, tuple) or isinstance(data, list): + data_new = to_device(data) + input_ids = data_new[0] + else: + data_new = {} + for key in data.keys(): + data_new[key] = to_device(data[key], self.model.device) + if key == "images": + data_new[key] = to_dtype(data_new[key], self.model.dtype) + input_ids = data_new["input_ids"] + if input_ids.shape[-1] < self.seqlen: + continue + try: + if isinstance(data_new, torch.Tensor): + self.model(data_new, use_cache=False) + elif isinstance(data_new, tuple) or isinstance(data_new, list): + self.model(*data_new, use_cache=False) + else: + self.model(**data_new, use_cache=False) + except NotImplementedError: + pass + except RuntimeError as error: + error_msg = str(error) + if "The expanded size of the tensor" in str(error_msg) and "must match the existing size" in error_msg: + check_seqlen_compatible(self.seqlen, self.tokenizer, self.model) + logger.warning( + "When quantization encounters tensor shape mismatch error, " + "you can try to avoid it with batch_size=1" + ) + raise error + except Exception as error: + raise error + total_cnt += input_ids.shape[0] if len(input_ids.shape) > 1 else 1 + if total_cnt >= nsamples: + break + if total_cnt == 0: + 
logger.error( + f"no data has been cached, please provide more data with sequence length >={self.seqlen} in the " + f"dataset or decease the sequence length" + ) + exit(-1) + elif total_cnt < nsamples: + logger.warning( + f"An insufficient number of samples likely reduces the accuracy of the quantized model. " + f"Target samples count is {nsamples}, while valid samples count is {total_cnt}" + ) + + # clean embed weight to save memory + if self.low_cpu_mem_usage: + for n, m in embed_layers: + m = m.to("meta") + + @torch.no_grad() + def _get_block_forward_func(self, name: str) -> Callable: + """Gets the forward function. + + Args: + name (str): The name of the function. + Returns: + function: The forward function. + """ + + def post_process_cache_data(batch_size, data, data_name): + """ + Processes store data for batch handling, reshaping if necessary. + + Args: + batch_size (int): The size of the batch. + data: The data value to store, potentially for caching. + data_name (str): Name of the data. + + Returns: + Processed data or None + """ + new_data = data + if batch_size <= 1: + return new_data + if data_name in self.shared_cache_keys: + return None + if "alibi" in data_name: + if isinstance(data, torch.Tensor): + alibi = data + alibi = alibi.reshape(batch_size, -1, alibi.shape[1], alibi.shape[2]) + new_data = alibi + return new_data + + def forward(m, hidden_states=None, *positional_inputs, **kwargs): + """Rewrite forward function, process and collect input data. + + Args: + hidden_states (torch.Tensor): The hidden states tensor. + *positional_inputs: Variable number of positional arguments. + **kwargs: Variable number of keyword arguments. + + Returns: + NotImplementedError: Getting the first layer inputs and then raise the error to save runtime. + """ + if name not in self.inputs: + self.inputs[name] = {} + init_cache(positional_inputs, self.inputs[name]) + + if self.batch_dim is None: + self.batch_dim = 0 + if hidden_states is not None and self.batch_size > 1: + if hidden_states.shape[0] > self.batch_size: + self.batch_dim = 1 + if len(hidden_states.shape) > 1 and hidden_states.shape[1] > self.batch_size: + logger.error( + "this model has not been supported, " + "please raise an issue in https://github.com/intel/auto-round/issues" + " or try to set the `batch_size` to 1 and " + "`gradient_accumulate_steps` to your current batch size." 
+                            )
+                            exit(-1)
+
+            if hidden_states is not None:
+                kwargs["hidden_states"] = hidden_states
+
+            for key in kwargs.keys():
+                if isinstance(kwargs[key], (torch.Tensor, list, tuple)):
+                    if key not in self.inputs[name].keys():  # initialization
+                        data = to_device(kwargs[key], device=torch.device("cpu"))
+                        if data is None or (self.batch_size > 1 and key in self.shared_cache_keys):
+                            self.inputs[name][key] = data
+                            continue
+                        if self.batch_size <= 1:
+                            self.inputs[name][key] = [data]
+                        else:
+                            data = post_process_cache_data(self.batch_size, data, key)
+                            self.inputs[name][key] = list(torch.split(data, 1, dim=self.batch_dim))
+                    else:  # append cache inputs
+                        new_data = post_process_cache_data(self.batch_size, kwargs[key], key)
+                        if new_data is None:  # shareable args or NoneType
+                            continue
+                        new_data = to_device(new_data, device=torch.device("cpu"))
+                        if self.batch_size <= 1:
+                            self.inputs[name][key].append(new_data)
+                        else:
+                            self.inputs[name][key].extend(list(torch.split(new_data, 1, dim=self.batch_dim)))
+                elif isinstance(kwargs[key], (str, bool, type(None))):
+                    if key not in self.inputs[name].keys():
+                        self.inputs[name][key] = kwargs[key]
+                else:
+                    # Parameters not to be cached
+                    if check_skippable_keywords(key):
+                        logger.warning_once(
+                            f"Please note that the '{key}' key is not currently used in quantization fine-tuning."
+                        )
+            reset_params(self.inputs[name])
+            if name == self.last_cache_name:
+                raise NotImplementedError
+            else:
+                if hidden_states is not None:
+                    kwargs.pop("hidden_states")
+                    return m.orig_forward(hidden_states, *positional_inputs, **kwargs)
+                else:
+                    # Currently only for the Llama-3.2-Vision-Instruct series
+                    return m.orig_forward(*positional_inputs, **kwargs)
+
+        return forward
+
+    @torch.no_grad()
+    def _get_cache_data_hook_for_layer(self, name):
+        """Returns a forward hook that caches the inputs of a module.
+
+        Args:
+            name (str): The module name.
+
+        Returns:
+            A hook function.
+        """
+
+        def cache_input_hook(module, inputs, outputs):
+            inp = inputs[0] if isinstance(inputs, (tuple, list)) else inputs
+            if name in self.inputs:
+                self.inputs[name].extend(list(torch.split(inp.to("cpu"), 1, dim=0)))
+            else:
+                self.inputs[name] = list(torch.split(inp.to("cpu"), 1, dim=0))
+
+        return cache_input_hook
+
+    def _recover_forward(self):
+        """Recovers the forward function."""
+        for n, m in self.model.named_modules():
+            if hasattr(m, "orig_forward"):
+                m.forward = m.orig_forward
+                delattr(m, "orig_forward")
+        for hook_handle in self.hook_handles:
+            hook_handle.remove()
+        self.hook_handles = []
+
+    def _replace_forward(self):
+        """Replaces the forward function."""
+        from functools import partial
+
+        for n, m in self.model.named_modules():
+            if n in self.to_cached_layers and not isinstance(m, tuple(self.supported_types)):  ## block
+                m.orig_forward = m.forward
+                m.forward = partial(self._get_block_forward_func(n), m)
+            elif n in self.to_cached_layers:  ## linear layer or conv1d layer
+                hook_func = self._get_cache_data_hook_for_layer(n)
+                hook_handle = m.register_forward_hook(hook_func)
+                self.hook_handles.append(hook_handle)
+
+    def _quantize_layer(
+        self, layer_name: str, inputs: torch.Tensor, q_inputs: torch.Tensor = None, device: str = "cpu"
+    ):
+        """Quantize a specific layer of the model using the provided inputs.
+
+        Args:
+            layer_name (str): The name of the layer to quantize.
+            inputs (torch.Tensor): Input data for quantization.
+            q_inputs (torch.Tensor, optional): Quantized input data. Defaults to None.
+            device (str, optional): The device to use for quantization. Defaults to "cpu".
+
+        Returns:
+            None
+        """
+        logger.info(f"quantizing layer {layer_name}")
+        layer = get_module(self.model, layer_name)
+        if hasattr(layer, "tuning_device"):
+            device = layer.tuning_device
+
+        layer = layer.to(device)
+        for i in range(len(inputs)):
+            inputs[i] = inputs[i].to(layer.weight.dtype)
+            if q_inputs is not None:
+                q_inputs[i] = q_inputs[i].to(layer.weight.dtype)
+
+        wrapper_linear = WrapperLinear(layer, enable_minmax_tuning=self.enable_minmax_tuning, device=device).to(device)
+        round_params = []
+        minmax_params = []
+        for key in wrapper_linear.params.keys():
+            if "min" in key or "max" in key:
+                minmax_params.append(wrapper_linear.params[key])
+            else:
+                round_params.append(wrapper_linear.params[key])
+        if len(round_params) + len(minmax_params) <= 0:
+            dump_info = f"quantized {layer_name}"
+            logger.info(dump_info)
+            with torch.no_grad():
+                unwrapper_layer(self.model, wrapper_linear, layer_name, {})
+            mv_module_from_gpu(layer, self.low_cpu_mem_usage)
+            return
+
+        lr = torch.tensor(self.lr)
+        minmax_lr = torch.tensor(self.minmax_lr)
+        if self.enable_minmax_tuning:
+            optimizer = self.optimizer(
+                [{"params": round_params}, {"params": minmax_params, "lr": minmax_lr}], lr=lr, weight_decay=0
+            )
+        else:
+            optimizer = self.optimizer(round_params, lr=lr, weight_decay=0)
+
+        if self.lr_scheduler is None:
+            lr_schedule = torch.optim.lr_scheduler.LinearLR(
+                optimizer, start_factor=1.0, end_factor=0.0, total_iters=self.iters
+            )
+        else:
+            lr_schedule = copy.deepcopy(self.lr_scheduler)
+        nsamples = len(inputs)
+        last_best_iter = 0
+        best_loss = torch.finfo(torch.float).max
+        scaler = self._get_scaler()  # pylint: disable=assignment-from-none
+        init_loss = None
+        gradient_accumulate_steps = self.batch_size  ## Force to low gpu
+        batch_size = 1  ## Force to low gpu
+        pick_samples = batch_size * gradient_accumulate_steps
+        pick_samples = min(nsamples, pick_samples)
+        if self.sampler != "rand":
+            whole_indices = torch.randperm(nsamples)[:pick_samples]
+        total_loss = 0
+        num_elm = 1
+        mse_reduction = "mean"
+        if gradient_accumulate_steps != 1:
+            mse_reduction = "sum"
+        mse_loss = torch.nn.MSELoss(reduction=mse_reduction).to(device)
+
+        for i in range(self.iters):
+            total_loss = 0
+            if self.sampler == "rand":
+                whole_indices = torch.randperm(nsamples)[:pick_samples]
+            if gradient_accumulate_steps != 1:
+                if q_inputs is not None:
+                    current_input = [q_inputs[idx] for idx in whole_indices]
+                else:
+                    current_input = [inputs[idx] for idx in whole_indices]
+                num_elm = sum(tensor.numel() for tensor in current_input)
+            for tmp_step in range(gradient_accumulate_steps):
+                indices = whole_indices[tmp_step * batch_size : (tmp_step + 1) * batch_size]
+                if q_inputs is not None:
+                    current_input = [q_inputs[idx] for idx in indices]
+                    current_input = torch.cat(current_input, dim=0).to(device)
+                    org_input = [inputs[idx] for idx in indices]
+                    org_input = torch.cat(org_input, dim=0).to(device)
+                else:
+                    current_input = [inputs[idx] for idx in indices]
+                    current_input = torch.cat(current_input, dim=0).to(device)
+                    org_input = current_input
+                with torch.no_grad():
+                    current_output = layer(org_input)
+
+                if self.amp:
+                    with autocast(device_type=device.split(":")[0], dtype=self.amp_dtype):
+                        output_q = wrapper_linear(current_input)  # pylint: disable=not-callable
+                        loss = mse_loss(output_q, current_output)  # pylint: disable=not-callable
+                else:
+                    output_q = wrapper_linear(current_input)  # pylint: disable=not-callable
+                    loss = mse_loss(  # pylint: disable=not-callable
+                        output_q.to(torch.float32), current_output.to(torch.float32)
+                    )
+                total_loss += loss.item() / num_elm
+
+                self._scale_loss_and_backward(scaler, loss)
+            if i == 0:
+                init_loss = total_loss
+
+            if total_loss < best_loss:
+                best_loss = total_loss
+                if not self.not_use_best_mse:
+                    best_params = collect_best_params(wrapper_linear)
+                last_best_iter = i
+            if self.not_use_best_mse and i == self.iters - 1:
+                best_params = collect_best_params(wrapper_linear)
+
+            if not self.not_use_best_mse:
+                if 0 < self.dynamic_max_gap <= i - last_best_iter:
+                    break
+            self._step(scaler, optimizer, lr_schedule)
+
+        last_loss = total_loss
+        best_iter = self.iters
+        if not self.not_use_best_mse:
+            last_loss = best_loss
+            best_iter = last_best_iter
+        with torch.no_grad():
+            unwrapper_layer(self.model, wrapper_linear, layer_name, best_params)
+        mv_module_from_gpu(layer, self.low_cpu_mem_usage)
+        dump_info = f"quantized {layer_name}, loss iter 0: {init_loss:.6f} -> iter {best_iter}: {last_loss:.6f}"
+        logger.info(dump_info)
+
+    def _register_act_max_hook(self, model):
+        def get_act_max_hook(module, input, output):
+            if isinstance(input, (tuple, list)):
+                input = input[0]
+            if input.numel() == 0:
+                return  # no need to update act_max
+            input, _, _ = reshape_pad_tensor_by_group_size(input, self.act_group_size)
+            act_max = torch.max(torch.abs(input), dim=-1).values
+            if not hasattr(module, "act_max") or module.act_max.numel() == 0:
+                module.act_max = act_max
+            else:
+                act_max = act_max.to(module.act_max.device)
+                if is_nv_fp(self.act_data_type):  ## for nvfp per-tensor input_global_scale calculation usage
+                    module.act_max = torch.max(
+                        torch.tensor([act_max.max(), module.act_max.max()], device=act_max.device)
+                    )
+                else:
+                    module.act_max = torch.max(act_max, module.act_max)
+
+        hook_handles = []
+
+        for n, m in model.named_modules():
+            if (
+                hasattr(m, "act_dynamic")
+                and check_need_act_calibration(m.act_dynamic, m.act_data_type, m.act_bits)
+                and check_to_quantized(m)
+            ):
+                hook = m.register_forward_hook(get_act_max_hook)
+                hook_handles.append(hook)
+                continue
+
+            # for RTN over the whole model
+            if n in self.layer_config:
+                config = self.layer_config[n]
+                act_dynamic = config.get("act_dynamic", True)
+                act_data_type = config.get("act_data_type", None)
+                act_bits = config.get("act_bits", 16)
+                if (
+                    config["bits"] <= 8
+                    and check_need_act_calibration(act_dynamic, act_data_type, act_bits)
+                    and check_to_quantized(config)
+                ):
+                    hook = m.register_forward_hook(get_act_max_hook)
+                    hook_handles.append(hook)
+                    continue
+        return hook_handles
+
+    @classmethod
+    @torch.no_grad()
+    def _sampling_inputs(
+        cls,
+        input_ids: list[torch.Tensor],
+        input_others: dict,
+        indices: list[int],
+        seqlen: int,
+        batch_dim: int = 0,
+        share_cache_keys: tuple = (),
+    ):
+        """Samples inputs based on the given indices and sequence length.
+
+        Args:
+            input_ids: The list of input tensors containing input_ids.
+            input_others: A dictionary containing other input data.
+            indices: The indices to sample from the input.
+            seqlen: The sequence length.
+            batch_dim: The batch dimension along which sampled tensors are concatenated.
+            share_cache_keys: Keys whose cached values are shared across all samples.
+
+        Returns:
+            current_input_ids: The sampled input IDs.
+            current_input_others: The sampled other input data.
+ """ + current_input_ids = [input_ids[i] for i in indices] + + current_input_ids = torch.cat(current_input_ids, dim=batch_dim) + + current_input_others = {"positional_inputs": input_others["positional_inputs"]} + for key in input_others.keys(): + if "positional_inputs" in key: + continue + if (key not in share_cache_keys or len(indices) == 1) and not isinstance( + input_others[key], (str, bool, type(None)) + ): + current_input_others[key] = None + if input_others[key] is not None: + current_input_others[key] = [input_others[key][i] for i in indices] + if len(indices) == 1: + current_input_others[key] = current_input_others[key][0] + else: + try: + current_input_others[key] = torch.cat(current_input_others[key], dim=0) + except TypeError as err: + logger.warning_once("Please check the model cache inputs or try setting batch_size to 1.") + else: + current_input_others[key] = input_others[key] + + return current_input_ids, current_input_others diff --git a/auto_round/quantizers/model_type.py b/auto_round/quantizers/model_type.py new file mode 100644 index 00000000..6a26bde0 --- /dev/null +++ b/auto_round/quantizers/model_type.py @@ -0,0 +1,23 @@ +# Copyright (c) 2025 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from auto_round.quantizers.base import BaseQuantizer + + +class LLMQuantizer(BaseQuantizer): + pass + + +class MLLMQuantizer(BaseQuantizer): + pass