From 13fcd44567b4c7c932628a7fe1c682a94dc206d7 Mon Sep 17 00:00:00 2001 From: Varshith-Yadav Date: Wed, 26 Nov 2025 15:52:45 +0530 Subject: [PATCH 1/3] [NNCF] Enable data-aware weight compression for MatMul with transpose_b=False --- .../algorithms/weight_compression/gptq.py | 140 +++++++++++++----- .../weight_compression/openvino_backend.py | 9 +- .../weight_compression/weight_lowering.py | 6 +- 3 files changed, 113 insertions(+), 42 deletions(-) diff --git a/src/nncf/quantization/algorithms/weight_compression/gptq.py b/src/nncf/quantization/algorithms/weight_compression/gptq.py index b90f2e0574b..45b646d5a5a 100644 --- a/src/nncf/quantization/algorithms/weight_compression/gptq.py +++ b/src/nncf/quantization/algorithms/weight_compression/gptq.py @@ -212,18 +212,25 @@ def _quantize_weights( if wc_params.node_with_weight.metatype in self._backend_entity.convolution_metatypes: msg = "Convolution metatypes are not supported" raise RuntimeError(msg) - if not wc_params.node_with_weight.layer_attributes.constant_attributes[wc_params.weight_port_id]["transpose"]: - msg = "Transpose is not supported" - raise RuntimeError(msg) weight_tensor = self._backend_entity.get_weight( wc_params.node_with_weight, wc_params.weight_port_id, model, graph ) weight_tensor = fns.astype(weight_tensor, TensorDataType.float32) + + # Get transpose_b value to handle weight shape correctly + transpose_b = wc_params.node_with_weight.layer_attributes.constant_attributes[wc_params.weight_port_id]["transpose"] dead_indices = fns.diag(hessian) == 0 hessian[dead_indices, dead_indices] = 1 - weight_tensor[:, dead_indices] = 0 + + # Handle weight shape based on transpose_b + if transpose_b: + # transpose_b=True: weight shape is [out_features, in_features] + weight_tensor[:, dead_indices] = 0 + else: + # transpose_b=False: weight shape is [in_features, out_features] + weight_tensor[dead_indices, :] = 0 scales = [] zero_points = [] @@ -235,7 +242,7 @@ def _quantize_weights( group_size = ( wc_params.compression_config.group_size if wc_params.compression_config.group_size != -1 - else weight_tensor.shape[1] + else (weight_tensor.shape[1] if transpose_b else weight_tensor.shape[0]) ) reduction_axes = wc_params.reduction_axes block_compression_config = WeightCompressionConfig( @@ -254,38 +261,69 @@ def _quantize_weights( i2 = min(i1 + self._block_size, columns) count = i2 - i1 - weight_block = weight_tensor[:, i1:i2].clone() + # Handle weight indexing based on transpose_b + if transpose_b: + # transpose_b=True: weight shape is [out_features, in_features] + weight_block = weight_tensor[:, i1:i2].clone() + else: + # transpose_b=False: weight shape is [in_features, out_features] + weight_block = weight_tensor[i1:i2, :].clone() quantized_block = fns.zeros_like(weight_block) error_block = fns.zeros_like(weight_block) loss_block = fns.zeros_like(weight_block) hessian_inv_block = hessian_inv[i1:i2, i1:i2] for i in range(count): - weight_col = weight_block[:, i] + if transpose_b: + weight_col = weight_block[:, i] + else: + weight_col = weight_block[i, :] hessian_diag_val = hessian_inv_block[i, i] if (i1 + i) % group_size == 0: if not block_compression_config.is_integer: - scale = calculate_float_quantization_params( - weight_tensor[:, (i1 + i) : (i1 + i + group_size)], reduction_axes, block_compression_config - ) + if transpose_b: + scale = calculate_float_quantization_params( + weight_tensor[:, (i1 + i) : (i1 + i + group_size)], reduction_axes, block_compression_config + ) + else: + scale = calculate_float_quantization_params( + 
weight_tensor[(i1 + i) : (i1 + i + group_size), :], reduction_axes, block_compression_config
+                            )
                     scales.append(scale)
                 else:
                     if self._scale_estimation and block_compression_config.num_bits == 4:
-                        activations = [inp[..., (i1 + i) : (i1 + i + group_size)] for inp in inputs]
-                        wc_statistics = ScaleEstimation.activations_to_wc_statistics(activations)
-                        scale, zero_point = ScaleEstimation.calculate_quantization_params(
-                            wc_statistics,
-                            weight_tensor[:, (i1 + i) : (i1 + i + group_size)],
-                            reduction_axes,
-                            block_compression_config,
-                        )
+                        if transpose_b:
+                            activations = [inp[..., (i1 + i) : (i1 + i + group_size)] for inp in inputs]
+                            wc_statistics = ScaleEstimation.activations_to_wc_statistics(activations)
+                            scale, zero_point = ScaleEstimation.calculate_quantization_params(
+                                wc_statistics,
+                                weight_tensor[:, (i1 + i) : (i1 + i + group_size)],
+                                reduction_axes,
+                                block_compression_config,
+                            )
+                        else:
+                            activations = [inp[..., (i1 + i) : (i1 + i + group_size)] for inp in inputs]
+                            wc_statistics = ScaleEstimation.activations_to_wc_statistics(activations)
+                            scale, zero_point = ScaleEstimation.calculate_quantization_params(
+                                wc_statistics,
+                                weight_tensor[(i1 + i) : (i1 + i + group_size), :],
+                                reduction_axes,
+                                block_compression_config,
+                            )
                     else:
-                        scale, zero_point = calculate_integer_quantization_params(
-                            weight_tensor[:, (i1 + i) : (i1 + i + group_size)],
-                            reduction_axes,
-                            block_compression_config,
-                        )
+                        if transpose_b:
+                            scale, zero_point = calculate_integer_quantization_params(
+                                weight_tensor[:, (i1 + i) : (i1 + i + group_size)],
+                                reduction_axes,
+                                block_compression_config,
+                            )
+                        else:
+                            scale, zero_point = calculate_integer_quantization_params(
+                                weight_tensor[(i1 + i) : (i1 + i + group_size), :],
+                                reduction_axes,
+                                block_compression_config,
+                            )
                     scales.append(scale)
                     zero_points.append(zero_point)
@@ -303,19 +341,40 @@ def _quantize_weights(
                     precomputed_zero_point=zero_points[-1],
                 )
                 quantized_col = fns.flatten(quantized_col)
-                quantized_block[:, i] = quantized_col
-                loss_block[:, i] = (weight_col - quantized_col) ** 2 / hessian_diag_val**2
+                if transpose_b:
+                    quantized_block[:, i] = quantized_col
+                else:
+                    quantized_block[i, :] = quantized_col
+                loss_col = (weight_col - quantized_col) ** 2 / hessian_diag_val**2
+                if transpose_b:
+                    loss_block[:, i] = loss_col
+                else:
+                    loss_block[i, :] = loss_col
                 error_col = (weight_col - quantized_col) / hessian_diag_val
-                weight_block[:, i:] -= fns.matmul(
-                    fns.unsqueeze(error_col, 1), fns.unsqueeze(hessian_inv_block[i, i:], 0)
-                )
-                error_block[:, i] = error_col
-
-            quantized_tensor[:, i1:i2] = quantized_block
-            losses[:, i1:i2] = loss_block / 2
-
-            weight_tensor[:, i2:] -= fns.matmul(error_block, hessian_inv[i1:i2, i2:])
+                if transpose_b:
+                    weight_block[:, i:] -= fns.matmul(
+                        fns.unsqueeze(error_col, 1), fns.unsqueeze(hessian_inv_block[i, i:], 0)
+                    )
+                    error_block[:, i] = error_col
+                else:
+                    # Outer product of the inverse-Hessian column with the error row:
+                    # [count - i, 1] @ [1, out_features] -> [count - i, out_features]
+                    weight_block[i:, :] -= fns.matmul(
+                        fns.unsqueeze(hessian_inv_block[i:, i], 1), fns.unsqueeze(error_col, 0)
+                    )
+                    error_block[i, :] = error_col
+
+            if transpose_b:
+                quantized_tensor[:, i1:i2] = quantized_block
+                losses[:, i1:i2] = loss_block / 2
+                weight_tensor[:, i2:] -= fns.matmul(error_block, hessian_inv[i1:i2, i2:])
+            else:
+                quantized_tensor[i1:i2, :] = quantized_block
+                losses[i1:i2, :] = loss_block / 2
+                # For transpose_b=False: error_block has shape [i2-i1, out_features] and
+                # hessian_inv[i2:, i1:i2] has shape [columns-i2, i2-i1], so
+                # hessian_inv[i2:, i1:i2] @ error_block yields [columns-i2, out_features]
+                weight_tensor[i2:, :] -= fns.matmul(hessian_inv[i2:, i1:i2], error_block)
 
         quantized_tensor = quantized_tensor.reshape(weight_tensor.shape).astype(weight_tensor.dtype)
         self._backend_entity.set_weight(
@@ -325,13 +384,14 @@ def _quantize_weights(
         scales = fns.stack(scales, axis=1)
         if wc_params.compression_config.group_size == -1:
             scales = fns.squeeze(scales, axis=-1)
-        if wc_params.compression_config.mode in [
+
+        zero_points_tensor = None
+        if zero_points and zero_points[0] is not None and wc_params.compression_config.mode in [
             CompressWeightsMode.INT8_ASYM,
             CompressWeightsMode.INT4_ASYM,
         ]:
-            zero_points = fns.stack(zero_points, axis=1)
+            zero_points_tensor = fns.stack(zero_points, axis=1)
             if wc_params.compression_config.group_size == -1:
-                zero_points = fns.squeeze(zero_points, axis=-1)
-        else:
-            zero_points = None
-        return scales, zero_points
+                zero_points_tensor = fns.squeeze(zero_points_tensor, axis=-1)
+
+        return scales, zero_points_tensor
diff --git a/src/nncf/quantization/algorithms/weight_compression/openvino_backend.py b/src/nncf/quantization/algorithms/weight_compression/openvino_backend.py
index 3ec241b36c6..4c719906d80 100644
--- a/src/nncf/quantization/algorithms/weight_compression/openvino_backend.py
+++ b/src/nncf/quantization/algorithms/weight_compression/openvino_backend.py
@@ -177,6 +177,11 @@ def insert_adapters(
         activation_dtype = input_node.get_element_type()
         should_add_convert_node = activation_dtype != ov.Type.f16
         mm_node = self.name_to_node_mapping[wc_params.node_with_weight.node_name]
+
+        # Get the original MatMul's transpose attributes
+        node_attributes = mm_node.get_attributes()
+        transpose_a = node_attributes.get("transpose_a", False)
+        transpose_b = node_attributes.get("transpose_b", True)  # Default to True for backward compatibility
 
         if int8_lora:
             const_node_name = wc_params.node_with_weight.node_name
@@ -203,7 +208,9 @@ def insert_adapters(
         A_W = opset.constant(lora_A.data)
         B_W = opset.constant(lora_B.data)
 
-        A_MM = opset.matmul(input_node, A_W, transpose_a=False, transpose_b=True)
+        # LoRA adapters: input @ A^T @ B^T
+        # Always keep transpose_b=True to ensure the adapter aligns with the MatMul output shape
+        A_MM = opset.matmul(input_node, A_W, transpose_a=transpose_a, transpose_b=True)
         B_MM = opset.matmul(A_MM, B_W, transpose_a=False, transpose_b=True)
 
         node_output_port = mm_node.output(0)
diff --git a/src/nncf/quantization/algorithms/weight_compression/weight_lowering.py b/src/nncf/quantization/algorithms/weight_compression/weight_lowering.py
index d0c96e952fb..be9b2c710e3 100644
--- a/src/nncf/quantization/algorithms/weight_compression/weight_lowering.py
+++ b/src/nncf/quantization/algorithms/weight_compression/weight_lowering.py
@@ -643,7 +643,11 @@ def _calculate_integer_quantized_weight(
     compressed_weights = weight / scale
     if zero_point is not None:
-        compressed_weights += zero_point.astype(weight.dtype)
+        zp = zero_point.astype(weight.dtype)
+        if zp.ndim < compressed_weights.ndim:
+            new_shape = list(zp.shape) + [1] * (compressed_weights.ndim - zp.ndim)
+            zp = fns.reshape(zp, new_shape)
+        compressed_weights += zp
     compressed_weights = fns.round(compressed_weights)
     compressed_weights = fns.clip(compressed_weights, level_low, level_high).astype(dtype)
 

From ea6e5e3c59ad32a25f7a63b890800b14fd983553 Mon Sep 17 00:00:00 2001
From: Varshith-Yadav
Date: Tue, 2 Dec 2025 02:17:14 +0530
Subject: [PATCH 2/3] Refactor: Use slice_weight helper instead of full transpose

---
 .../algorithms/weight_compression/gptq.py    | 108
++++++------------ .../algorithms/weight_compression/utils.py | 102 +++++++++++++++++ .../quantization/test_utils_slice_weight.py | 93 +++++++++++++++ 3 files changed, 233 insertions(+), 70 deletions(-) create mode 100644 src/nncf/quantization/algorithms/weight_compression/utils.py create mode 100644 tests/openvino/native/quantization/test_utils_slice_weight.py diff --git a/src/nncf/quantization/algorithms/weight_compression/gptq.py b/src/nncf/quantization/algorithms/weight_compression/gptq.py index 45b646d5a5a..083f1864710 100644 --- a/src/nncf/quantization/algorithms/weight_compression/gptq.py +++ b/src/nncf/quantization/algorithms/weight_compression/gptq.py @@ -27,6 +27,13 @@ from nncf.quantization.algorithms.weight_compression.config import WeightCompressionParameters from nncf.quantization.algorithms.weight_compression.parameters import CompressedWeight from nncf.quantization.algorithms.weight_compression.scale_estimation import ScaleEstimation +from nncf.quantization.algorithms.weight_compression.utils import ( + assign_weight_column, + assign_weight_slice, + extract_weight_column, + slice_weight, + zero_mask_columns, +) from nncf.quantization.algorithms.weight_compression.weight_lowering import calculate_float_quantization_params from nncf.quantization.algorithms.weight_compression.weight_lowering import calculate_integer_quantization_params from nncf.quantization.algorithms.weight_compression.weight_lowering import float_quantize_dequantize_weight @@ -224,13 +231,8 @@ def _quantize_weights( dead_indices = fns.diag(hessian) == 0 hessian[dead_indices, dead_indices] = 1 - # Handle weight shape based on transpose_b - if transpose_b: - # transpose_b=True: weight shape is [out_features, in_features] - weight_tensor[:, dead_indices] = 0 - else: - # transpose_b=False: weight shape is [in_features, out_features] - weight_tensor[dead_indices, :] = 0 + # Zero out dead indices using utility helper + zero_mask_columns(weight_tensor, dead_indices, transpose_b) scales = [] zero_points = [] @@ -261,69 +263,41 @@ def _quantize_weights( i2 = min(i1 + self._block_size, columns) count = i2 - i1 - # Handle weight indexing based on transpose_b - if transpose_b: - # transpose_b=True: weight shape is [out_features, in_features] - weight_block = weight_tensor[:, i1:i2].clone() - else: - # transpose_b=False: weight shape is [in_features, out_features] - weight_block = weight_tensor[i1:i2, :].clone() + # Extract weight block using utility helper + weight_block = slice_weight(weight_tensor, i1, i2, transpose_b).clone() quantized_block = fns.zeros_like(weight_block) error_block = fns.zeros_like(weight_block) loss_block = fns.zeros_like(weight_block) hessian_inv_block = hessian_inv[i1:i2, i1:i2] for i in range(count): - if transpose_b: - weight_col = weight_block[:, i] - else: - weight_col = weight_block[i, :] + weight_col = extract_weight_column(weight_block, i, transpose_b) hessian_diag_val = hessian_inv_block[i, i] if (i1 + i) % group_size == 0: if not block_compression_config.is_integer: - if transpose_b: - scale = calculate_float_quantization_params( - weight_tensor[:, (i1 + i) : (i1 + i + group_size)], reduction_axes, block_compression_config - ) - else: - scale = calculate_float_quantization_params( - weight_tensor[(i1 + i) : (i1 + i + group_size), :], reduction_axes, block_compression_config - ) + weight_slice = slice_weight(weight_tensor, i1 + i, i1 + i + group_size, transpose_b) + scale = calculate_float_quantization_params( + weight_slice, reduction_axes, block_compression_config + ) 
                    scales.append(scale)
                 else:
+                    weight_slice = slice_weight(weight_tensor, i1 + i, i1 + i + group_size, transpose_b)
                     if self._scale_estimation and block_compression_config.num_bits == 4:
-                        if transpose_b:
-                            activations = [inp[..., (i1 + i) : (i1 + i + group_size)] for inp in inputs]
-                            wc_statistics = ScaleEstimation.activations_to_wc_statistics(activations)
-                            scale, zero_point = ScaleEstimation.calculate_quantization_params(
-                                wc_statistics,
-                                weight_tensor[:, (i1 + i) : (i1 + i + group_size)],
-                                reduction_axes,
-                                block_compression_config,
-                            )
-                        else:
-                            activations = [inp[..., (i1 + i) : (i1 + i + group_size)] for inp in inputs]
-                            wc_statistics = ScaleEstimation.activations_to_wc_statistics(activations)
-                            scale, zero_point = ScaleEstimation.calculate_quantization_params(
-                                wc_statistics,
-                                weight_tensor[(i1 + i) : (i1 + i + group_size), :],
-                                reduction_axes,
-                                block_compression_config,
-                            )
+                        activations = [inp[..., (i1 + i) : (i1 + i + group_size)] for inp in inputs]
+                        wc_statistics = ScaleEstimation.activations_to_wc_statistics(activations)
+                        scale, zero_point = ScaleEstimation.calculate_quantization_params(
+                            wc_statistics,
+                            weight_slice,
+                            reduction_axes,
+                            block_compression_config,
+                        )
                     else:
-                        if transpose_b:
-                            scale, zero_point = calculate_integer_quantization_params(
-                                weight_tensor[:, (i1 + i) : (i1 + i + group_size)],
-                                reduction_axes,
-                                block_compression_config,
-                            )
-                        else:
-                            scale, zero_point = calculate_integer_quantization_params(
-                                weight_tensor[(i1 + i) : (i1 + i + group_size), :],
-                                reduction_axes,
-                                block_compression_config,
-                            )
+                        scale, zero_point = calculate_integer_quantization_params(
+                            weight_slice,
+                            reduction_axes,
+                            block_compression_config,
+                        )
                     scales.append(scale)
                     zero_points.append(zero_point)
@@ -341,35 +315,29 @@ def _quantize_weights(
                     precomputed_zero_point=zero_points[-1],
                 )
                 quantized_col = fns.flatten(quantized_col)
-                if transpose_b:
-                    quantized_block[:, i] = quantized_col
-                else:
-                    quantized_block[i, :] = quantized_col
+                assign_weight_column(quantized_block, i, quantized_col, transpose_b)
                 loss_col = (weight_col - quantized_col) ** 2 / hessian_diag_val**2
-                if transpose_b:
-                    loss_block[:, i] = loss_col
-                else:
-                    loss_block[i, :] = loss_col
+                assign_weight_column(loss_block, i, loss_col, transpose_b)
                 error_col = (weight_col - quantized_col) / hessian_diag_val
                 if transpose_b:
                     weight_block[:, i:] -= fns.matmul(
                         fns.unsqueeze(error_col, 1), fns.unsqueeze(hessian_inv_block[i, i:], 0)
                     )
-                    error_block[:, i] = error_col
+                    assign_weight_column(error_block, i, error_col, transpose_b)
                 else:
                     # Outer product of the inverse-Hessian column with the error row:
                     # [count - i, 1] @ [1, out_features] -> [count - i, out_features]
                     weight_block[i:, :] -= fns.matmul(
                         fns.unsqueeze(hessian_inv_block[i:, i], 1), fns.unsqueeze(error_col, 0)
                     )
-                    error_block[i, :] = error_col
+                    assign_weight_column(error_block, i, error_col, transpose_b)
+
+            assign_weight_slice(quantized_tensor, i1, i2, quantized_block, transpose_b)
+            assign_weight_slice(losses, i1, i2, loss_block / 2, transpose_b)
+
+            # Update remaining weights with error propagation
             if transpose_b:
-                quantized_tensor[:, i1:i2] = quantized_block
-                losses[:, i1:i2] = loss_block / 2
                 weight_tensor[:, i2:] -= fns.matmul(error_block, hessian_inv[i1:i2, i2:])
             else:
-                quantized_tensor[i1:i2, :] = quantized_block
-                losses[i1:i2, :] = loss_block / 2
                 # For transpose_b=False: error_block has shape [i2-i1, out_features] and
                 # hessian_inv[i2:, i1:i2] has shape [columns-i2, i2-i1], so
                 # hessian_inv[i2:, i1:i2] @ error_block yields [columns-i2, out_features]
diff --git a/src/nncf/quantization/algorithms/weight_compression/utils.py b/src/nncf/quantization/algorithms/weight_compression/utils.py
new file
mode 100644 index 00000000000..4c150c7ec7d --- /dev/null +++ b/src/nncf/quantization/algorithms/weight_compression/utils.py @@ -0,0 +1,102 @@ +# Copyright (c) 2025 Intel Corporation +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from nncf.tensor import Tensor + + +def slice_weight(weight: Tensor, start: int, end: int, transpose_b: bool) -> Tensor: + """ + Return a view/clone of the requested block without transposing the whole tensor. + + If transpose_b is True, weight layout is [out_features, in_features] + and we return weight[:, start:end] (in_features slice). + If transpose_b is False, layout is [in_features, out_features] + and we return weight[start:end, :] (in_features slice). + + :param weight: The weight tensor to slice. + :param start: Start index for the slice (inclusive). + :param end: End index for the slice (exclusive). + :param transpose_b: Whether the weight is transposed (True) or not (False). + :return: A slice of the weight tensor. + """ + if transpose_b: + return weight[:, start:end] + else: + return weight[start:end, :] + + +def extract_weight_column(weight: Tensor, index: int, transpose_b: bool) -> Tensor: + """ + Extract a single column/row from weight based on transpose_b. + + If transpose_b is True: returns weight[:, index] (a column) + If transpose_b is False: returns weight[index, :] (a row) + + :param weight: The weight tensor to extract from. + :param index: The index of the column/row to extract. + :param transpose_b: Whether the weight is transposed (True) or not (False). + :return: A single column or row from the weight tensor. + """ + if transpose_b: + return weight[:, index] + else: + return weight[index, :] + + +def assign_weight_slice(target_weight: Tensor, start: int, end: int, block: Tensor, transpose_b: bool) -> None: + """ + Assign block back to target_weight in the same orientation used by slice_weight. + This performs in-place assignment. + + :param target_weight: The target weight tensor to assign to. + :param start: Start index for the slice (inclusive). + :param end: End index for the slice (exclusive). + :param block: The block of data to assign. + :param transpose_b: Whether the weight is transposed (True) or not (False). + """ + if transpose_b: + target_weight[:, start:end] = block + else: + target_weight[start:end, :] = block + + +def assign_weight_column(target_weight: Tensor, index: int, column: Tensor, transpose_b: bool) -> None: + """ + Assign a single column/row back to target_weight. + This performs in-place assignment. + + :param target_weight: The target weight tensor to assign to. + :param index: The index of the column/row to assign. + :param column: The column/row data to assign. + :param transpose_b: Whether the weight is transposed (True) or not (False). + """ + if transpose_b: + target_weight[:, index] = column + else: + target_weight[index, :] = column + + +def zero_mask_columns(weight: Tensor, mask: Tensor, transpose_b: bool) -> None: + """ + Zero out columns/rows based on boolean mask. 
+ + If transpose_b is True: zeros weight[:, mask] (columns) + If transpose_b is False: zeros weight[mask, :] (rows) + + :param weight: The weight tensor to modify in-place. + :param mask: Boolean mask indicating which columns/rows to zero. + :param transpose_b: Whether the weight is transposed (True) or not (False). + """ + if transpose_b: + weight[:, mask] = 0 + else: + weight[mask, :] = 0 + diff --git a/tests/openvino/native/quantization/test_utils_slice_weight.py b/tests/openvino/native/quantization/test_utils_slice_weight.py new file mode 100644 index 00000000000..c2d04160d64 --- /dev/null +++ b/tests/openvino/native/quantization/test_utils_slice_weight.py @@ -0,0 +1,93 @@ +import numpy as np +import pytest +import torch +from nncf.quantization.algorithms.weight_compression import utils + + +@pytest.mark.parametrize( + "shape, transpose_b, start, end", + [ + # transpose_b=True means weight layout is [out_features, in_features] -> slice columns + ((5, 8), True, 1, 4), + ((3, 6), True, 0, 3), + # transpose_b=False means weight layout is [in_features, out_features] -> slice rows + ((8, 5), False, 2, 6), + ((6, 3), False, 0, 2), + ], +) +def test_slice_and_assign_weight_block(shape, transpose_b, start, end): + """ + Verify slice_weight returns the expected sub-block and assign_weight_slice writes it back + in the correct orientation for both transpose_b True and False. + """ + + weight = np.arange(np.prod(shape), dtype=np.int64).reshape(shape) + block = utils.slice_weight(weight, start, end, transpose_b) + + # Expected block depending on transpose_b semantics + if transpose_b: + expected_block = weight[:, start:end] + else: + expected_block = weight[start:end, :] + + # The returned block should match the expected slice + np.testing.assert_array_equal(block, expected_block) + + # Prepare a new block to assign (different values) + new_block = np.full(expected_block.shape, fill_value=123, dtype=weight.dtype) + + # Assign it back using the helper + utils.assign_weight_slice(weight, start, end, new_block, transpose_b) + if transpose_b: + np.testing.assert_array_equal(weight[:, start:end], new_block) + else: + np.testing.assert_array_equal(weight[start:end, :], new_block) + +def test_zero_mask_columns(): + """ + Verifies that zero_mask_columns correctly zeros out channels + based on the boolean mask and transpose_b setting. + """ + shape = (4, 4) + # Create a mask: e.g., index 1 and 3 are True (should be zeroed) + mask = np.array([False, True, False, True]) + + # CASE 1: transpose_b=True (Layout [Out, In] -> Columns are inputs) + weight = np.ones(shape, dtype=np.int32) + utils.zero_mask_columns(weight, mask, transpose_b=True) + + # Columns 1 and 3 should be 0, others 1 + expected = np.ones(shape, dtype=np.int32) + expected[:, mask] = 0 + np.testing.assert_array_equal(weight, expected) + + # CASE 2: transpose_b=False (Layout [In, Out] -> Rows are inputs) + weight = np.ones(shape, dtype=np.int32) + utils.zero_mask_columns(weight, mask, transpose_b=False) + + # Rows 1 and 3 should be 0, others 1 + expected = np.ones(shape, dtype=np.int32) + expected[mask, :] = 0 + np.testing.assert_array_equal(weight, expected) + + + + +def test_slice_utils_pytorch_compatibility(): + """ + Ensures the helpers work with torch.Tensor objects, not just numpy arrays. + """ + # [In, Out] = [4, 2] + # transpose_b=False + weight = torch.tensor([[1, 2], [3, 4], [5, 6], [7, 8]]) + + # 1. 
Test Slicing (taking middle 2 rows) + block = utils.slice_weight(weight, 1, 3, transpose_b=False) + assert torch.equal(block, torch.tensor([[3, 4], [5, 6]])) + + # 2. Test Assigning + new_data = torch.tensor([[10, 10], [10, 10]]) + utils.assign_weight_slice(weight, 1, 3, new_data, transpose_b=False) + + expected = torch.tensor([[1, 2], [10, 10], [10, 10], [7, 8]]) + assert torch.equal(weight, expected) From 80ef438d78a4bd187539f6db84b9ae1def57805f Mon Sep 17 00:00:00 2001 From: Varshith-Yadav Date: Wed, 10 Dec 2025 00:36:13 +0530 Subject: [PATCH 3/3] Refactor weight compression to support transpose_b=False across all algorithms --- .../algorithms/weight_compression/awq.py | 32 +++-- .../algorithms/weight_compression/gptq.py | 54 ++++---- .../weight_compression/lora_correction.py | 11 +- .../weight_compression/openvino_backend.py | 8 +- .../weight_compression/scale_estimation.py | 8 +- .../weight_compression/tensor_slicing.py | 55 ++++++++ .../algorithms/weight_compression/utils.py | 102 -------------- .../quantization/test_utils_slice_weight.py | 130 +++++++----------- 8 files changed, 171 insertions(+), 229 deletions(-) create mode 100644 src/nncf/quantization/algorithms/weight_compression/tensor_slicing.py delete mode 100644 src/nncf/quantization/algorithms/weight_compression/utils.py diff --git a/src/nncf/quantization/algorithms/weight_compression/awq.py b/src/nncf/quantization/algorithms/weight_compression/awq.py index 508ad57060d..b2950f9787a 100644 --- a/src/nncf/quantization/algorithms/weight_compression/awq.py +++ b/src/nncf/quantization/algorithms/weight_compression/awq.py @@ -29,6 +29,8 @@ from nncf.quantization.algorithms.weight_compression.activation_stats import process_stats from nncf.quantization.algorithms.weight_compression.backend import WeightCompressionAlgoBackend from nncf.quantization.algorithms.weight_compression.config import WeightCompressionParameters +from nncf.quantization.algorithms.weight_compression.tensor_slicing import get_weight_slice +from nncf.quantization.algorithms.weight_compression.tensor_slicing import set_weight_slice from nncf.quantization.algorithms.weight_compression.weight_lowering import float_quantize_dequantize_weight from nncf.quantization.algorithms.weight_compression.weight_lowering import integer_quantize_dequantize_weight from nncf.quantization.passes import transform_to_inference_graph @@ -181,7 +183,7 @@ def apply( prev_weight = self._backend_entity.get_weight(merge_node, prev_weight_port_id, model, graph) prev_statistics = statistics[merge_node.node_name] - scale = self._data_aware_step(wp, weight, statistics[k], prev_weight, prev_statistics) + scale = self._data_aware_step(wp, weight, statistics[k], prev_weight, prev_statistics, weight_port_id) w_scale = fns.unsqueeze(scale, 1 - wp.reduction_axes[0]) a_scale = fns.unsqueeze(1.0 / scale, wp.reduction_axes[0]) @@ -210,7 +212,7 @@ def apply( return transformed_model - def _data_aware_step(self, wp, weight, statistics, prev_weight=None, prev_statistics=None): + def _data_aware_step(self, wp, weight, statistics, prev_weight=None, prev_statistics=None, weight_port_id=None): alpha_step = (self._alpha_max - self._alpha_min) / self._steps config = wp.compression_config s, X = process_stats(statistics, self._subset_size) @@ -220,6 +222,9 @@ def _data_aware_step(self, wp, weight, statistics, prev_weight=None, prev_statis assert isinstance(wp.reduction_axes, tuple) and len(wp.reduction_axes) == 1 reduction_axis = wp.reduction_axes[0] + # Get transpose_b value to handle weight shape correctly + 
transpose_b = wp.node_with_weight.layer_attributes.constant_attributes[weight_port_id]["transpose"]
+
         prev_s, prev_w = None, None
         if prev_statistics is not None and prev_weight is not None:
             prev_s, _ = process_stats(prev_statistics, self._subset_size)
@@ -239,9 +244,8 @@ def _data_aware_step(
         groups_to_correct = list(groups_to_correct)
 
-        if reduction_axis == 0:
-            weight = fns.transpose(weight)
-            reduction_axis = 1
+        # The old code transposed the whole weight tensor here; get_weight_slice
+        # below handles both layouts without materializing a transposed copy.
 
         shape_vector = fns.mean(X, axis=1)
         scale = fns.ones_like(shape_vector)
@@ -257,7 +263,10 @@ def _data_aware_step(
             a_max = 1e2
             gscale = fns.clip(gscale, a_min=a_min, a_max=a_max)
 
-            gweight = weight[:, offset : offset + group_size]
+            # Use get_weight_slice instead of hardcoded slicing
+            gweight = get_weight_slice(weight, slice(offset, offset + group_size), transpose_b)
+            if not transpose_b:
+                # [group_size, out_features] -> [out_features, group_size] so that gweight @ gacts is valid
+                gweight = fns.transpose(gweight)
             gacts = X[offset : offset + group_size, :]
 
             fp32_out = fns.matmul(gweight, gacts)
@@ -274,18 +281,14 @@ def _data_aware_step(
                 )  # take the threshold from the fp16 type with some margin
                 # per channel magnitudes for the previous MatMul
                 # mean(abs(prev_weight)) * max(abs((prev_activation))) * prev_weight.shape[reduction_axis]
-                magnitudes = (
-                    (prev_w[offset : offset + group_size] / cur_scale) * prev_s * prev_weight.shape[reduction_axis]
-                )
+                prev_w_slice = prev_w[offset : offset + group_size]
+                magnitudes = (prev_w_slice / cur_scale) * prev_s * prev_weight.shape[reduction_axis]
                 if magnitudes.max() >= threshold:
                     cur_scale = AWQ._clamp_scale(
                         magnitudes,
                         threshold,
                         cur_scale,
-                        prev_w[offset : offset + group_size]
-                        * prev_s
-                        * prev_weight.shape[reduction_axis]
-                        / threshold,
+                        prev_w_slice * prev_s * prev_weight.shape[reduction_axis] / threshold,
                     )
 
             weights_to_fake_quantize = gweight * cur_scale
@@ -307,7 +310,8 @@ def _data_aware_step(
                 alpha += alpha_step
 
         if best_scale is not None:
+            # scale is a 1-D per-channel vector, so plain 1-D slicing applies for both layouts
             scale.data[offset : offset + group_size] = best_scale.data
 
         return scale
diff --git a/src/nncf/quantization/algorithms/weight_compression/gptq.py b/src/nncf/quantization/algorithms/weight_compression/gptq.py
index 083f1864710..002cb356bcd 100644
--- a/src/nncf/quantization/algorithms/weight_compression/gptq.py
+++ b/src/nncf/quantization/algorithms/weight_compression/gptq.py
@@ -27,13 +27,8 @@
 from nncf.quantization.algorithms.weight_compression.config import WeightCompressionParameters
 from nncf.quantization.algorithms.weight_compression.parameters import CompressedWeight
 from nncf.quantization.algorithms.weight_compression.scale_estimation import ScaleEstimation
-from nncf.quantization.algorithms.weight_compression.utils import (
-    assign_weight_column,
-    assign_weight_slice,
-    extract_weight_column,
-    slice_weight,
-    zero_mask_columns,
-)
+from nncf.quantization.algorithms.weight_compression.tensor_slicing import get_weight_slice
+from nncf.quantization.algorithms.weight_compression.tensor_slicing import set_weight_slice
 from nncf.quantization.algorithms.weight_compression.weight_lowering import calculate_float_quantization_params
 from nncf.quantization.algorithms.weight_compression.weight_lowering import calculate_integer_quantization_params
 from nncf.quantization.algorithms.weight_compression.weight_lowering import float_quantize_dequantize_weight
@@ -224,15 +219,17 @@ def _quantize_weights(
             wc_params.node_with_weight, wc_params.weight_port_id, model, graph
         )
         weight_tensor = fns.astype(weight_tensor, TensorDataType.float32)
 
         # Get transpose_b value to handle weight shape correctly
-        transpose_b = wc_params.node_with_weight.layer_attributes.constant_attributes[wc_params.weight_port_id]["transpose"]
+        transpose_b = wc_params.node_with_weight.layer_attributes.constant_attributes[wc_params.weight_port_id][
+            "transpose"
+        ]
 
         dead_indices = fns.diag(hessian) == 0
         hessian[dead_indices, dead_indices] = 1
 
         # Zero out dead indices using utility helper
-        zero_mask_columns(weight_tensor, dead_indices, transpose_b)
+        set_weight_slice(weight_tensor, dead_indices, 0, transpose_b)
 
         scales = []
         zero_points = []
@@ -264,25 +261,25 @@ def _quantize_weights(
             count = i2 - i1
 
             # Extract weight block using utility helper
-            weight_block = slice_weight(weight_tensor, i1, i2, transpose_b).clone()
+            weight_block = get_weight_slice(weight_tensor, slice(i1, i2), transpose_b).clone()
             quantized_block = fns.zeros_like(weight_block)
             error_block = fns.zeros_like(weight_block)
             loss_block = fns.zeros_like(weight_block)
             hessian_inv_block = hessian_inv[i1:i2, i1:i2]
 
             for i in range(count):
-                weight_col = extract_weight_column(weight_block, i, transpose_b)
+                weight_col = get_weight_slice(weight_block, i, transpose_b)
                 hessian_diag_val = hessian_inv_block[i, i]
 
                 if (i1 + i) % group_size == 0:
                     if not block_compression_config.is_integer:
-                        weight_slice = slice_weight(weight_tensor, i1 + i, i1 + i + group_size, transpose_b)
+                        weight_slice = get_weight_slice(weight_tensor, slice(i1 + i, i1 + i + group_size), transpose_b)
                         scale = calculate_float_quantization_params(
                             weight_slice, reduction_axes, block_compression_config
                         )
                         scales.append(scale)
                     else:
-                        weight_slice = slice_weight(weight_tensor, i1 + i, i1 + i + group_size, transpose_b)
+                        weight_slice = get_weight_slice(weight_tensor, slice(i1 + i, i1 + i + group_size), transpose_b)
                         if self._scale_estimation and block_compression_config.num_bits == 4:
                             activations = [inp[..., (i1 + i) : (i1 + i + group_size)] for inp in inputs]
                             wc_statistics = ScaleEstimation.activations_to_wc_statistics(activations)
@@ -315,25 +312,25 @@ def _quantize_weights(
                     precomputed_zero_point=zero_points[-1],
                 )
                 quantized_col = fns.flatten(quantized_col)
-                assign_weight_column(quantized_block, i, quantized_col, transpose_b)
+                set_weight_slice(quantized_block, i, quantized_col, transpose_b)
                 loss_col = (weight_col - quantized_col) ** 2 / hessian_diag_val**2
-                assign_weight_column(loss_block, i, loss_col, transpose_b)
+                set_weight_slice(loss_block, i, loss_col, transpose_b)
                 error_col = (weight_col - quantized_col) / hessian_diag_val
                 if transpose_b:
                     weight_block[:, i:] -= fns.matmul(
                         fns.unsqueeze(error_col, 1), fns.unsqueeze(hessian_inv_block[i, i:], 0)
                     )
-                    assign_weight_column(error_block, i, error_col, transpose_b)
+                    set_weight_slice(error_block, i, error_col, transpose_b)
                 else:
                     # Outer product of the inverse-Hessian column with the error row:
                     # [count - i, 1] @ [1, out_features] -> [count - i, out_features]
                     weight_block[i:, :] -= fns.matmul(
                         fns.unsqueeze(hessian_inv_block[i:, i], 1), fns.unsqueeze(error_col, 0)
                     )
-                    assign_weight_column(error_block, i, error_col, transpose_b)
+                    set_weight_slice(error_block, i, error_col, transpose_b)
+
+            set_weight_slice(quantized_tensor, slice(i1, i2), quantized_block, transpose_b)
+            set_weight_slice(losses, slice(i1, i2), loss_block / 2, transpose_b)
-            assign_weight_slice(quantized_tensor, i1, i2,
quantized_block, transpose_b) - assign_weight_slice(losses, i1, i2, loss_block / 2, transpose_b) - # Update remaining weights with error propagation if transpose_b: weight_tensor[:, i2:] -= fns.matmul(error_block, hessian_inv[i1:i2, i2:]) @@ -354,10 +351,15 @@ def _quantize_weights( scales = fns.squeeze(scales, axis=-1) zero_points_tensor = None - if zero_points and zero_points[0] is not None and wc_params.compression_config.mode in [ - CompressWeightsMode.INT8_ASYM, - CompressWeightsMode.INT4_ASYM, - ]: + if ( + zero_points + and zero_points[0] is not None + and wc_params.compression_config.mode + in [ + CompressWeightsMode.INT8_ASYM, + CompressWeightsMode.INT4_ASYM, + ] + ): zero_points_tensor = fns.stack(zero_points, axis=1) if wc_params.compression_config.group_size == -1: zero_points_tensor = fns.squeeze(zero_points_tensor, axis=-1) diff --git a/src/nncf/quantization/algorithms/weight_compression/lora_correction.py b/src/nncf/quantization/algorithms/weight_compression/lora_correction.py index 0fe478dfab5..2035b9068d5 100644 --- a/src/nncf/quantization/algorithms/weight_compression/lora_correction.py +++ b/src/nncf/quantization/algorithms/weight_compression/lora_correction.py @@ -121,6 +121,12 @@ def calculate_adapters( layer_name = wc_params.node_with_weight.node_name layer_statistics = self._statistics[layer_name] is_debug = self._debug_interface is not None + + # Get transpose_b value to handle weight shape correctly + transpose_b = wc_params.node_with_weight.layer_attributes.constant_attributes[wc_params.weight_port_id][ + "transpose" + ] + lora_A, lora_B, mean_noises = self.calculate_low_rank_matrices( weight, compressed_weight, @@ -129,6 +135,7 @@ def calculate_adapters( self._lora_correction_params, layer_statistics, is_debug, + transpose_b=transpose_b, ) if is_debug: self._debug_interface.add_noises(layer_name, mean_noises) @@ -143,6 +150,7 @@ def calculate_low_rank_matrices( lora_correction_params: AdvancedLoraCorrectionParameters, layer_statistics: WCTensorStatistic, is_debug: Optional[bool] = False, + transpose_b: bool = True, # Add this parameter with default True for backward compatibility ): """ Calculates low rank matrices for a given original and compressed weights. @@ -190,7 +198,8 @@ def calculate_low_rank_matrices( # O stands for output dimension, H - input dimension or hidden size, SS - samples size, R - rank. # reduction axes is all axes except output dimension in linear/conv layers. 
- if reduction_axes[0] == 1: + # Use transpose_b directly instead of inferring from reduction_axes + if not transpose_b: svd_residual = fns.transpose(svd_residual) residual = svd_residual.clone() # [H, O] diff --git a/src/nncf/quantization/algorithms/weight_compression/openvino_backend.py b/src/nncf/quantization/algorithms/weight_compression/openvino_backend.py index 4c719906d80..2fe1dd5dd0a 100644 --- a/src/nncf/quantization/algorithms/weight_compression/openvino_backend.py +++ b/src/nncf/quantization/algorithms/weight_compression/openvino_backend.py @@ -63,6 +63,7 @@ from nncf.quantization.algorithms.weight_compression.parameters import CompressedWeight from nncf.quantization.algorithms.weight_compression.weight_lowering import compress_weight from nncf.tensor import Tensor +from nncf.tensor import functions as fns from nncf.tensor.definitions import TensorDataType from nncf.tensor.functions.openvino_numeric import DTYPE_MAP_REV @@ -177,12 +178,17 @@ def insert_adapters( activation_dtype = input_node.get_element_type() should_add_convert_node = activation_dtype != ov.Type.f16 mm_node = self.name_to_node_mapping[wc_params.node_with_weight.node_name] - + # Get the original MatMul's transpose attributes node_attributes = mm_node.get_attributes() transpose_a = node_attributes.get("transpose_a", False) transpose_b = node_attributes.get("transpose_b", True) # Default to True for backward compatibility + # Transpose lora_B if the original MatMul had transpose_b=False + # This ensures the matrix multiplication A_MM @ B_W has compatible dimensions + if not transpose_b: + lora_B = fns.transpose(lora_B) + if int8_lora: const_node_name = wc_params.node_with_weight.node_name int8_compression_config = WeightCompressionConfig(mode=CompressWeightsMode.INT8_ASYM, group_size=-1) diff --git a/src/nncf/quantization/algorithms/weight_compression/scale_estimation.py b/src/nncf/quantization/algorithms/weight_compression/scale_estimation.py index 4ad557b9868..565790c95a6 100644 --- a/src/nncf/quantization/algorithms/weight_compression/scale_estimation.py +++ b/src/nncf/quantization/algorithms/weight_compression/scale_estimation.py @@ -141,6 +141,9 @@ def apply( weight = self._backend_entity.get_weight(wp.node_with_weight, weight_port_id, model, graph) + # Get transpose_b value to handle weight shape correctly + transpose_b = wp.node_with_weight.layer_attributes.constant_attributes[weight_port_id]["transpose"] + scale, zero_point = self.calculate_quantization_params( stats, weight, @@ -150,6 +153,7 @@ def apply( self._initial_steps, self._scale_steps, self._weight_penalty, + transpose_b=transpose_b, ) res[weight_name] = CompressedWeight(None, scale, zero_point, None) @@ -165,6 +169,7 @@ def calculate_quantization_params( initial_steps: int = 5, scale_steps: int = 10, weight_penalty: float = -1.0, + transpose_b: bool = True, # Add this parameter with default True for backward compatibility ) -> Tensor: """ Calculates the quantization parameters for a given set of weights and activations. 
@@ -199,7 +204,8 @@ def calculate_quantization_params( is_3d_weight = len(weight.shape) == 3 was_transposed = False - if reduction_axis == 0 or (reduction_axis == 1 and is_3d_weight): + # Use transpose_b directly instead of inferring from reduction_axis + if not transpose_b or (reduction_axis == 1 and is_3d_weight): # Weights # 3D: [num_experts, hidden_dimension, out_features] -> [num_experts, out_features, hidden_dimension] # 2D: [hidden_dimension, out_features] -> [out_features, hidden_dimension] diff --git a/src/nncf/quantization/algorithms/weight_compression/tensor_slicing.py b/src/nncf/quantization/algorithms/weight_compression/tensor_slicing.py new file mode 100644 index 00000000000..ce2360046fb --- /dev/null +++ b/src/nncf/quantization/algorithms/weight_compression/tensor_slicing.py @@ -0,0 +1,55 @@ +# Copyright (c) 2025 Intel Corporation +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Union + +from nncf.tensor import Tensor + +# slice is a built-in type, so we don't need to import it. +# slice_obj can be: an int (index), a slice (start:end), or a Tensor/Array (mask/indices) + + +def get_weight_slice( + weight: Tensor, + slice_obj: Union[int, slice, Tensor], + is_transposed: bool, +) -> Tensor: + """ + Generic helper to get a subset of weights along the input channel dimension. + + :param weight: The weight tensor. + :param slice_obj: An integer index, a slice(start, end), or a boolean mask/index tensor. + :param is_transposed: True if weight is [Out, In], False if [In, Out]. + :return: A slice of the weight tensor. + """ + if is_transposed: + return weight[:, slice_obj] + return weight[slice_obj, :] + + +def set_weight_slice( + weight: Tensor, + slice_obj: Union[int, slice, Tensor], + value: Union[Tensor, float, int], + is_transposed: bool, +) -> None: + """ + Generic helper to set a subset of weights along the input channel dimension. + + :param weight: The target tensor to modify in-place. + :param slice_obj: An integer index, a slice(start, end), or a boolean mask/index tensor. + :param value: The value(s) to assign. + :param is_transposed: True if weight is [Out, In], False if [In, Out]. + """ + if is_transposed: + weight[:, slice_obj] = value + else: + weight[slice_obj, :] = value diff --git a/src/nncf/quantization/algorithms/weight_compression/utils.py b/src/nncf/quantization/algorithms/weight_compression/utils.py deleted file mode 100644 index 4c150c7ec7d..00000000000 --- a/src/nncf/quantization/algorithms/weight_compression/utils.py +++ /dev/null @@ -1,102 +0,0 @@ -# Copyright (c) 2025 Intel Corporation -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# http://www.apache.org/licenses/LICENSE-2.0 -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
-# See the License for the specific language governing permissions and -# limitations under the License. - -from nncf.tensor import Tensor - - -def slice_weight(weight: Tensor, start: int, end: int, transpose_b: bool) -> Tensor: - """ - Return a view/clone of the requested block without transposing the whole tensor. - - If transpose_b is True, weight layout is [out_features, in_features] - and we return weight[:, start:end] (in_features slice). - If transpose_b is False, layout is [in_features, out_features] - and we return weight[start:end, :] (in_features slice). - - :param weight: The weight tensor to slice. - :param start: Start index for the slice (inclusive). - :param end: End index for the slice (exclusive). - :param transpose_b: Whether the weight is transposed (True) or not (False). - :return: A slice of the weight tensor. - """ - if transpose_b: - return weight[:, start:end] - else: - return weight[start:end, :] - - -def extract_weight_column(weight: Tensor, index: int, transpose_b: bool) -> Tensor: - """ - Extract a single column/row from weight based on transpose_b. - - If transpose_b is True: returns weight[:, index] (a column) - If transpose_b is False: returns weight[index, :] (a row) - - :param weight: The weight tensor to extract from. - :param index: The index of the column/row to extract. - :param transpose_b: Whether the weight is transposed (True) or not (False). - :return: A single column or row from the weight tensor. - """ - if transpose_b: - return weight[:, index] - else: - return weight[index, :] - - -def assign_weight_slice(target_weight: Tensor, start: int, end: int, block: Tensor, transpose_b: bool) -> None: - """ - Assign block back to target_weight in the same orientation used by slice_weight. - This performs in-place assignment. - - :param target_weight: The target weight tensor to assign to. - :param start: Start index for the slice (inclusive). - :param end: End index for the slice (exclusive). - :param block: The block of data to assign. - :param transpose_b: Whether the weight is transposed (True) or not (False). - """ - if transpose_b: - target_weight[:, start:end] = block - else: - target_weight[start:end, :] = block - - -def assign_weight_column(target_weight: Tensor, index: int, column: Tensor, transpose_b: bool) -> None: - """ - Assign a single column/row back to target_weight. - This performs in-place assignment. - - :param target_weight: The target weight tensor to assign to. - :param index: The index of the column/row to assign. - :param column: The column/row data to assign. - :param transpose_b: Whether the weight is transposed (True) or not (False). - """ - if transpose_b: - target_weight[:, index] = column - else: - target_weight[index, :] = column - - -def zero_mask_columns(weight: Tensor, mask: Tensor, transpose_b: bool) -> None: - """ - Zero out columns/rows based on boolean mask. - - If transpose_b is True: zeros weight[:, mask] (columns) - If transpose_b is False: zeros weight[mask, :] (rows) - - :param weight: The weight tensor to modify in-place. - :param mask: Boolean mask indicating which columns/rows to zero. - :param transpose_b: Whether the weight is transposed (True) or not (False). 
- """ - if transpose_b: - weight[:, mask] = 0 - else: - weight[mask, :] = 0 - diff --git a/tests/openvino/native/quantization/test_utils_slice_weight.py b/tests/openvino/native/quantization/test_utils_slice_weight.py index c2d04160d64..ecd2389538c 100644 --- a/tests/openvino/native/quantization/test_utils_slice_weight.py +++ b/tests/openvino/native/quantization/test_utils_slice_weight.py @@ -1,93 +1,55 @@ +# Copyright (c) 2025 Intel Corporation +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + import numpy as np +import openvino as ov import pytest -import torch -from nncf.quantization.algorithms.weight_compression import utils +from openvino import opset13 as opset +import nncf +from nncf import CompressWeightsMode -@pytest.mark.parametrize( - "shape, transpose_b, start, end", - [ - # transpose_b=True means weight layout is [out_features, in_features] -> slice columns - ((5, 8), True, 1, 4), - ((3, 6), True, 0, 3), - # transpose_b=False means weight layout is [in_features, out_features] -> slice rows - ((8, 5), False, 2, 6), - ((6, 3), False, 0, 2), - ], -) -def test_slice_and_assign_weight_block(shape, transpose_b, start, end): - """ - Verify slice_weight returns the expected sub-block and assign_weight_slice writes it back - in the correct orientation for both transpose_b True and False. - """ - weight = np.arange(np.prod(shape), dtype=np.int64).reshape(shape) - block = utils.slice_weight(weight, start, end, transpose_b) +def get_transpose_b_false_model(): + """Creates model with [In, Out] weight layout (transpose_b=False)""" + input_shape = [1, 32] + input_node = opset.parameter(input_shape, name="Input") + # Weight shape [32, 16] -> Input=32, Output=16 + weight_data = np.random.rand(32, 16).astype(np.float32) + matmul_node = opset.matmul(input_node, weight_data, transpose_a=False, transpose_b=False, name="MatMul") + result_node = opset.result(matmul_node, name="Result") + return ov.Model([result_node], [input_node], "transpose_b_false_model") - # Expected block depending on transpose_b semantics - if transpose_b: - expected_block = weight[:, start:end] - else: - expected_block = weight[start:end, :] - # The returned block should match the expected slice - np.testing.assert_array_equal(block, expected_block) - - # Prepare a new block to assign (different values) - new_block = np.full(expected_block.shape, fill_value=123, dtype=weight.dtype) - - # Assign it back using the helper - utils.assign_weight_slice(weight, start, end, new_block, transpose_b) - if transpose_b: - np.testing.assert_array_equal(weight[:, start:end], new_block) - else: - np.testing.assert_array_equal(weight[start:end, :], new_block) - -def test_zero_mask_columns(): - """ - Verifies that zero_mask_columns correctly zeros out channels - based on the boolean mask and transpose_b setting. 
- """ - shape = (4, 4) - # Create a mask: e.g., index 1 and 3 are True (should be zeroed) - mask = np.array([False, True, False, True]) - - # CASE 1: transpose_b=True (Layout [Out, In] -> Columns are inputs) - weight = np.ones(shape, dtype=np.int32) - utils.zero_mask_columns(weight, mask, transpose_b=True) - - # Columns 1 and 3 should be 0, others 1 - expected = np.ones(shape, dtype=np.int32) - expected[:, mask] = 0 - np.testing.assert_array_equal(weight, expected) - - # CASE 2: transpose_b=False (Layout [In, Out] -> Rows are inputs) - weight = np.ones(shape, dtype=np.int32) - utils.zero_mask_columns(weight, mask, transpose_b=False) - - # Rows 1 and 3 should be 0, others 1 - expected = np.ones(shape, dtype=np.int32) - expected[mask, :] = 0 - np.testing.assert_array_equal(weight, expected) - - - - -def test_slice_utils_pytorch_compatibility(): +@pytest.mark.parametrize( + "params", [{"awq": True}, {"gptq": True}, {"scale_estimation": True}, {"lora_correction": True}] +) +def test_compress_weights_algorithms_transpose_b_false(params): """ - Ensures the helpers work with torch.Tensor objects, not just numpy arrays. + Checks that ALL data-aware algorithms support transpose_b=False + without crashing. """ - # [In, Out] = [4, 2] - # transpose_b=False - weight = torch.tensor([[1, 2], [3, 4], [5, 6], [7, 8]]) - - # 1. Test Slicing (taking middle 2 rows) - block = utils.slice_weight(weight, 1, 3, transpose_b=False) - assert torch.equal(block, torch.tensor([[3, 4], [5, 6]])) - - # 2. Test Assigning - new_data = torch.tensor([[10, 10], [10, 10]]) - utils.assign_weight_slice(weight, 1, 3, new_data, transpose_b=False) - - expected = torch.tensor([[1, 2], [10, 10], [10, 10], [7, 8]]) - assert torch.equal(weight, expected) + model = get_transpose_b_false_model() + + # Dummy dataset for calibration + dataset = nncf.Dataset([np.random.rand(1, 32).astype(np.float32) for _ in range(3)]) + + # We use INT4_ASYM as it supports all these advanced algorithms + try: + nncf.compress_weights( + model, + mode=CompressWeightsMode.INT4_ASYM, + dataset=dataset, + subset_size=2, + **params, # Unpacks to awq=True, gptq=True, etc. + ) + except Exception as e: + pytest.fail(f"Algorithm {list(params.keys())[0]} failed for transpose_b=False. Error: {e}")