diff --git a/CHANGELOG.md b/CHANGELOG.md
index 28cb1c9850..554cd501f7 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -10,6 +10,7 @@ This project adheres to [Semantic Versioning](https://semver.org/).
 - [#3407](https://github.com/plotly/dash/pull/3407) Add `hidden` to callback arguments, hiding the callback from appearing in the devtool callback graph.
 - [#3424](https://github.com/plotly/dash/pull/3424) Adds support for `Patch` on clientside callbacks class `dash_clientside.Patch`, as well as supporting side updates, eg: (Running, SetProps).
 - [#3347](https://github.com/plotly/dash/pull/3347) Added 'api_endpoint' to `callback` to expose api endpoints at the provided path for use to be executed directly without dash.
+- [#3452](https://github.com/plotly/dash/issues/3452) Add a `CompressionManager` parameter to callback decorators, enabling automatic server-side compression/decompression of `dcc.Store` component data. Supports gzip, deflate, and Brotli compression with configurable compression levels and size thresholds to reduce network payload sizes for large data transfers.
 
 ## Fixed
 - [#3395](https://github.com/plotly/dash/pull/3395) Fix Components added through set_props() cannot trigger related callback functions. Fix [#3316](https://github.com/plotly/dash/issues/3316)
diff --git a/dash/_callback.py b/dash/_callback.py
index aacb8dbdde..6baebcb673 100644
--- a/dash/_callback.py
+++ b/dash/_callback.py
@@ -43,6 +43,7 @@
 from .background_callback.managers import BaseBackgroundCallbackManager
 from ._callback_context import context_value
 from ._no_update import NoUpdate
+from ._compression import get_compression_manager_from_kwargs
 
 
 async def _async_invoke_callback(
@@ -279,6 +280,7 @@ def insert_callback(
     no_output=False,
     optional=False,
     hidden=False,
+    compression_manager=None,
 ):
     if prevent_initial_call is None:
         prevent_initial_call = config_prevent_initial_callbacks
@@ -319,6 +321,7 @@
         "manager": manager,
         "allow_dynamic_callbacks": dynamic_creator,
         "no_output": no_output,
+        "compression_manager": compression_manager,
     }
 
     callback_list.append(callback_spec)
@@ -653,6 +656,7 @@ def register_callback(
         no_output=not has_output,
         optional=_kwargs.get("optional", False),
         hidden=_kwargs.get("hidden", False),
+        compression_manager=get_compression_manager_from_kwargs(_kwargs),
     )
 
     # pylint: disable=too-many-locals
@@ -670,6 +674,9 @@ def wrap_func(func):
             callback_id,
         )
 
+        # Get compression manager for this callback
+        compression_manager = get_compression_manager_from_kwargs(_kwargs)
+
        @wraps(func)
        def add_context(*args, **kwargs):
            """Handles synchronous callbacks with context management."""
@@ -687,6 +694,12 @@ def add_context(*args, **kwargs):
                 args, kwargs, inputs_state_indices, has_output, insert_output
             )
 
+            # Decompress inputs if compression manager is available
+            if compression_manager:
+                func_args = compression_manager.decompress_callback_inputs(
+                    func_args, inputs_state_indices
+                )
+
             response: dict = {"multi": True}
             jsonResponse = None
@@ -720,6 +733,12 @@
                     else:
                         raise err
 
+            # Compress outputs if compression manager is available
+            if compression_manager:
+                output_value = compression_manager.compress_callback_outputs(
+                    output_value, output_spec
+                )
+
             _prepare_response(
                 output_value,
                 output_spec,
@@ -759,6 +778,12 @@ async def async_add_context(*args, **kwargs):
                 args, kwargs, inputs_state_indices, has_output, insert_output
             )
 
+            # Decompress inputs if compression manager is available
+            if compression_manager:
+                func_args = compression_manager.decompress_callback_inputs(
+                    func_args, inputs_state_indices
+                )
+
             response: dict = {"multi": True}
 
             try:
@@ -792,6 +817,12 @@
                     else:
                         raise err
 
+            # Compress outputs if compression manager is available
+            if compression_manager:
+                output_value = compression_manager.compress_callback_outputs(
+                    output_value, output_spec
+                )
+
             _prepare_response(
                 output_value,
                 output_spec,
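Reviewer note: a minimal end-to-end sketch of how the new kwarg is meant to be wired up, based on the tests added below. The app layout, component ids, and sample data are illustrative only and not part of this diff:

```python
# Hypothetical usage sketch of the CompressionManager callback kwarg added in
# this diff; ids and data are illustrative, not from the PR itself.
from dash import Dash, Input, Output, dcc, html
from dash._compression import GzipCompressionManager  # module added by this diff

app = Dash(__name__)
app.layout = html.Div([
    dcc.Store(id="big-store"),
    html.Button("Load", id="load"),
])

# Compress only payloads larger than 1 KiB, at gzip level 9.
manager = GzipCompressionManager(level=9, threshold=1024)

@app.callback(
    Output("big-store", "data"),
    Input("load", "n_clicks"),
    CompressionManager=manager,  # kwarg name as used in this diff's tests
)
def load_data(n_clicks):
    # Large, repetitive data compresses well before crossing the network.
    return {"rows": list(range(100_000))}
```

When the same Store later feeds another callback as an `Input`/`State`, `decompress_callback_inputs` transparently restores the original object before the user function runs.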
diff --git a/dash/_compression.py b/dash/_compression.py
new file mode 100644
index 0000000000..0ff4c5d53c
--- /dev/null
+++ b/dash/_compression.py
@@ -0,0 +1,334 @@
+"""Store compression managers for Dash callbacks.
+
+This module provides compression managers enabling callback-level compression for
+Store components to reduce network payload sizes.
+"""
+
+from abc import ABC, abstractmethod
+import base64
+import json
+import gzip
+import zlib
+import logging
+from typing import Any, Dict, List, Union, Tuple, Optional
+
+try:
+    import brotli
+
+    BROTLI_AVAILABLE = True
+except ImportError:
+    BROTLI_AVAILABLE = False
+
+logger = logging.getLogger(__name__)
+
+
+class BaseStoreCompressionManager(ABC):
+    """
+    Abstract base class for Store compression managers.
+    """
+
+    def __init__(
+        self,
+        level: int = 6,
+        threshold: int = 0,  # by default always compress data
+    ):
+        """Initialize compression manager.
+
+        Args:
+            level: Compression level (1-9, algorithm dependent)
+            threshold: Minimum data size to compress (bytes)
+        """
+        self.level = self._validate_level(level)
+        if threshold < 0:
+            raise ValueError("threshold argument should not be negative")
+        self.threshold = threshold
+
+    def _validate_level(self, level: int) -> int:
+        """Validate compression level for this algorithm."""
+        if not isinstance(level, int) or level < 1 or level > 9:
+            raise ValueError(f"Compression level must be 1-9, got {level}")
+        return level
+
+    @property
+    @abstractmethod
+    def algorithm_name(self) -> str:
+        """Return the algorithm name for payload metadata."""
+
+    @abstractmethod
+    def _compress_bytes(self, data: bytes) -> bytes:
+        """Compress raw bytes using algorithm-specific method."""
+
+    @abstractmethod
+    def _decompress_bytes(self, data: bytes) -> bytes:
+        """Decompress raw bytes using algorithm-specific method."""
+
+    def should_compress(self, data: Any) -> bool:
+        """Check if data meets the compression threshold.
+
+        Args:
+            data: Data to potentially compress
+
+        Returns:
+            True if data should be compressed
+        """
+        if data is None:
+            return False
+        if self.threshold == 0:  # default threshold will always compress data
+            return True
+
+        # Convert to JSON to estimate size
+        try:
+            json_str = json.dumps(data, default=str)
+            return len(json_str.encode("utf-8")) >= self.threshold
+        except (TypeError, ValueError):
+            # If we can't serialize, don't compress
+            return False
+
+    def compress_store_data(self, data: Any) -> Union[Dict[str, Any], Any]:
+        """Compress Store data with metadata for later decompression.
+
+        Args:
+            data: Python object to compress
+
+        Returns:
+            Compressed payload dict, or the original data if compression
+            failed or was skipped
+        """
+        try:
+            # Check if we should compress
+            if not self.should_compress(data):
+                return data
+
+            # Serialize to JSON
+            json_str = json.dumps(data, default=str)
+            json_bytes = json_str.encode("utf-8")
+            original_size = len(json_bytes)
+
+            # Compress the data
+            compressed_bytes = self._compress_bytes(json_bytes)
+
+            compressed_size = len(compressed_bytes)
+
+            # Only return compressed if we actually saved space
+            if compressed_size >= original_size:
+                logger.debug(
+                    "Compression ineffective: %d -> %d", original_size, compressed_size
+                )
+                return data
+
+            # Return structured payload
+            return {
+                "compressed": True,
+                "algorithm": self.algorithm_name,
+                "level": self.level,
+                "data": base64.b64encode(compressed_bytes).decode("ascii"),
+                "original_size": original_size,
+                "compressed_size": compressed_size,
+            }
+
+        except (TypeError, ValueError, OSError, UnicodeError) as e:
+            # Graceful fallback on compression failure
+            logger.warning("Store compression failed: %s", e)
+            return data
+
+    def decompress_store_data(self, payload: Any) -> Any:
+        """Decompress a Store data payload.
+
+        Args:
+            payload: Data that may be a compressed payload or original data
+
+        Returns:
+            Decompressed Python object or original payload
+        """
+        # Check if this is a compressed payload
+        if not self._is_compressed_payload(payload):
+            return payload
+
+        try:
+            algorithm = payload["algorithm"]
+            if algorithm != self.algorithm_name:
+                logger.error(
+                    "Algorithm mismatch: expected %s, got %s",
+                    self.algorithm_name,
+                    algorithm,
+                )
+                return payload
+
+            # Decode and decompress
+            compressed_bytes = base64.b64decode(payload["data"])
+            json_bytes = self._decompress_bytes(compressed_bytes)
+            json_str = json_bytes.decode("utf-8")
+
+            return json.loads(json_str)
+
+        except (TypeError, ValueError, OSError, UnicodeError, KeyError) as e:
+            logger.error("Store decompression failed: %s", e)
+            # Return original payload as fallback
+            return payload
+
+    def _is_compressed_payload(self, payload: Any) -> bool:
+        """Check if payload is a compressed data structure."""
+        return (
+            isinstance(payload, dict)
+            and payload.get("compressed") is True
+            and "algorithm" in payload
+            and "data" in payload
+        )
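A quick round-trip illustration of the payload format implemented above; the sample data is arbitrary and the field values in the comment are placeholders:

```python
# Illustrative round trip through the payload format defined in _compression.py.
from dash._compression import GzipCompressionManager

manager = GzipCompressionManager()  # threshold=0: always compress
payload = manager.compress_store_data({"rows": ["x"] * 10_000})
# payload is a dict shaped like:
# {
#     "compressed": True,
#     "algorithm": "gzip",
#     "level": 6,
#     "data": "<base64>",
#     "original_size": ...,   # JSON bytes before compression
#     "compressed_size": ..., # bytes after compression
# }
assert manager.decompress_store_data(payload) == {"rows": ["x"] * 10_000}
```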
+
+    def compress_callback_outputs(
+        self, output_value: Any, output_spec: List[Dict[str, Any]]
+    ) -> Any:
+        """Compress ALL Store outputs in this callback with the same settings.
+
+        Args:
+            output_value: Callback return value (single value or tuple/list)
+            output_spec: List of output specifications
+
+        Returns:
+            Processed output_value with Store data compressed
+        """
+        if not output_spec:
+            return output_value
+
+        # Handle single output
+        if not isinstance(output_value, (list, tuple)):
+            if self._is_store_output(output_spec[0]):
+                return self.compress_store_data(output_value)
+            return output_value
+
+        # Handle multiple outputs
+        processed_outputs = []
+        for data, spec in zip(output_value, output_spec):
+            if self._is_store_output(spec):
+                processed_outputs.append(self.compress_store_data(data))
+            else:
+                processed_outputs.append(data)
+
+        return type(output_value)(processed_outputs)
+
+    def decompress_callback_inputs(
+        self, func_args: Tuple[Any, ...], input_spec: List[Dict[str, Any]]
+    ) -> Tuple[Any, ...]:
+        """Decompress ALL Store inputs in this callback.
+
+        Args:
+            func_args: Function arguments tuple
+            input_spec: List of input+state specifications
+
+        Returns:
+            Processed func_args with Store data decompressed
+        """
+        if not input_spec or not func_args:
+            return func_args
+
+        processed_args = []
+        for arg, spec in zip(func_args, input_spec):
+            if self._is_store_input(spec):
+                processed_args.append(self.decompress_store_data(arg))
+            else:
+                processed_args.append(arg)
+
+        return tuple(processed_args)
+
+    def _is_store_output(self, output_spec: Dict[str, Any]) -> bool:
+        """Check if output is a Store component data property."""
+        return (
+            output_spec.get("type") == "Store" and output_spec.get("property") == "data"
+        )
+
+    def _is_store_input(self, input_spec: Dict[str, Any]) -> bool:
+        """Check if input is a Store component data property."""
+        return (
+            input_spec.get("type") == "Store" and input_spec.get("property") == "data"
+        )
+
+
+class GzipCompressionManager(BaseStoreCompressionManager):
+    """Gzip compression manager for Store components.
+
+    Provides a good balance of compression ratio and speed.
+    The most widely supported compression algorithm.
+    """
+
+    @property
+    def algorithm_name(self) -> str:
+        return "gzip"
+
+    def _compress_bytes(self, data: bytes) -> bytes:
+        """Compress using the gzip algorithm."""
+        return gzip.compress(data, compresslevel=self.level)
+
+    def _decompress_bytes(self, data: bytes) -> bytes:
+        """Decompress using the gzip algorithm."""
+        return gzip.decompress(data)
+
+
+class DeflateCompressionManager(BaseStoreCompressionManager):
+    """Deflate compression manager for Store components.
+
+    Faster than gzip with slightly less compression.
+    Good for real-time applications where speed matters.
+    """
+
+    @property
+    def algorithm_name(self) -> str:
+        return "deflate"
+
+    def _compress_bytes(self, data: bytes) -> bytes:
+        """Compress using the deflate algorithm."""
+        return zlib.compress(data, level=self.level)
+
+    def _decompress_bytes(self, data: bytes) -> bytes:
+        """Decompress using the deflate algorithm."""
+        return zlib.decompress(data)
+
+
+class BrotliCompressionManager(BaseStoreCompressionManager):
+    """Brotli compression manager for Store components.
+
+    Best compression ratio but slower than gzip/deflate.
+    Ideal for large datasets where compression ratio matters most.
+    """
+
+    def __init__(self, *args, **kwargs):
+        if not BROTLI_AVAILABLE:
+            raise ImportError(
+                "Brotli compression requires the 'brotli' package. "
+                "Install with: pip install brotli"
+            )
+        super().__init__(*args, **kwargs)
+
+    @property
+    def algorithm_name(self) -> str:
+        return "brotli"
+
+    def _validate_level(self, level: int) -> int:
+        """Validate brotli compression level (0-11)."""
+        if not isinstance(level, int) or level < 0 or level > 11:
+            raise ValueError(f"Brotli compression level must be 0-11, got {level}")
+        return level
+
+    def _compress_bytes(self, data: bytes) -> bytes:
+        """Compress using the brotli algorithm."""
+        return brotli.compress(data, quality=self.level)
+
+    def _decompress_bytes(self, data: bytes) -> bytes:
+        """Decompress using the brotli algorithm."""
+        return brotli.decompress(data)
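For context on the level defaults, a rough side-by-side of the three managers on one payload; byte counts vary with the data and library versions, so treat this as a sketch (Brotli requires the optional `brotli` package):

```python
# Rough comparison of the three managers on the same repetitive payload.
from dash._compression import (
    GzipCompressionManager,
    DeflateCompressionManager,
    BrotliCompressionManager,  # raises ImportError if 'brotli' is not installed
)

data = {"series": list(range(5000))}
for manager in (
    GzipCompressionManager(),
    DeflateCompressionManager(),
    BrotliCompressionManager(level=11),
):
    payload = manager.compress_store_data(data)
    print(
        manager.algorithm_name,
        payload["original_size"],
        "->",
        payload["compressed_size"],
    )
```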
+
+
+def get_compression_manager_from_kwargs(
+    kwargs: dict,
+) -> Optional[BaseStoreCompressionManager]:
+    """Extract compression manager from kwargs dict.
+
+    Args:
+        kwargs: Dictionary that may contain a 'CompressionManager' key
+
+    Returns:
+        BaseStoreCompressionManager instance, or None if not found
+    """
+    return kwargs.get("CompressionManager", None)
+
+
+# Convenience alias
+StoreCompressionManager = GzipCompressionManager
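The helper and alias at the end of the module behave as follows; a trivial check, exercised the same way as the unit tests added below:

```python
# The helper simply reads the capitalized 'CompressionManager' kwarg that
# callers pass to @app.callback.
from dash._compression import (
    StoreCompressionManager,
    get_compression_manager_from_kwargs,
)

manager = StoreCompressionManager()  # alias for GzipCompressionManager
assert get_compression_manager_from_kwargs({"CompressionManager": manager}) is manager
assert get_compression_manager_from_kwargs({}) is None
```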
diff --git a/tests/unit/library/test_grouped_callbacks.py b/tests/unit/library/test_grouped_callbacks.py
index d332ea0561..bb10f58649 100644
--- a/tests/unit/library/test_grouped_callbacks.py
+++ b/tests/unit/library/test_grouped_callbacks.py
@@ -2,6 +2,7 @@
 from dash._grouping import make_grouping_by_index, grouping_len, flatten_grouping
 from dash._utils import create_callback_id
 from dash.dependencies import Input, State, Output, ClientsideFunction
+from dash._compression import GzipCompressionManager
 import mock
 import json
 import string
@@ -202,3 +203,87 @@ def test_clientside_callback_grouping_validation(grouping):
         make_dependency_grouping(grouping, [Output]),
         make_dependency_grouping(grouping, [Input]),
     )
+
+
+def test_callback_compression_manager_parameter():
+    """Test that compression_manager can be passed to callback decorator."""
+    app = dash.Dash()
+    compression_manager = GzipCompressionManager()
+
+    @app.callback(
+        Output("output", "children"),
+        Input("input", "value"),
+        CompressionManager=compression_manager,
+    )
+    def update_output(value):
+        return value
+
+    # Verify the callback was registered
+    assert len(app.callback_map) == 1
+
+    # Get the registered callback entry
+    callback_id = "output.children"
+    assert callback_id in app.callback_map
+    callback_entry = app.callback_map[callback_id]
+
+    # Verify compression manager is stored in the callback entry
+    assert "compression_manager" in callback_entry
+    assert callback_entry["compression_manager"] is compression_manager
+
+
+def test_callback_without_compression_manager():
+    """Test that callbacks work normally without compression_manager."""
+    app = dash.Dash()
+
+    @app.callback(Output("output", "children"), Input("input", "value"))
+    def update_output(value):
+        return value
+
+    # Verify the callback was registered
+    assert len(app.callback_map) == 1
+
+    # Get the registered callback entry
+    callback_id = "output.children"
+    callback_entry = app.callback_map[callback_id]
+
+    # Verify no compression manager is stored
+    assert callback_entry["compression_manager"] is None
+
+
+def test_multiple_callbacks_different_compression_managers():
+    """Test that different callbacks can have different compression managers."""
+    app = dash.Dash()
+    compression_manager1 = GzipCompressionManager(level=1)
+    compression_manager2 = GzipCompressionManager(level=9)
+
+    @app.callback(
+        Output("output1", "children"),
+        Input("input1", "value"),
+        CompressionManager=compression_manager1,
+    )
+    def update_output1(value):
+        return value
+
+    @app.callback(
+        Output("output2", "children"),
+        Input("input2", "value"),
+        CompressionManager=compression_manager2,
+    )
+    def update_output2(value):
+        return value
+
+    # Verify both callbacks were registered
+    assert len(app.callback_map) == 2
+
+    # Get the registered callback entries
+    callback_entry1 = app.callback_map["output1.children"]
+    callback_entry2 = app.callback_map["output2.children"]
+
+    # Extract compression managers
+    manager1 = callback_entry1["compression_manager"]
+    manager2 = callback_entry2["compression_manager"]
+
+    # Verify different managers are assigned
+    assert manager1 is compression_manager1
+    assert manager2 is compression_manager2
+    assert manager1.level != manager2.level
diff --git a/tests/unit/test_compression.py b/tests/unit/test_compression.py
new file mode 100644
index 0000000000..5df2781e8d
--- /dev/null
+++ b/tests/unit/test_compression.py
@@ -0,0 +1,485 @@
+"""Unit tests for Store compression managers."""
+
+import pytest
+import base64
+
+from dash._compression import (
+    GzipCompressionManager,
+    DeflateCompressionManager,
+    BrotliCompressionManager,
+    StoreCompressionManager,
+    get_compression_manager_from_kwargs,
+)
+
+
+class TestCompressionManagerCreation:
+    """Test compression manager instantiation and configuration."""
+
+    def test_gzip_manager_creation(self):
+        """Test GzipCompressionManager can be created with default settings."""
+        manager = GzipCompressionManager()
+        assert manager.algorithm_name == "gzip"
+        assert manager.level == 6
+        assert manager.threshold == 0
+
+    def test_deflate_manager_creation(self):
+        """Test DeflateCompressionManager can be created with default settings."""
+        manager = DeflateCompressionManager()
+        assert manager.algorithm_name == "deflate"
+        assert manager.level == 6
+        assert manager.threshold == 0
+
+    def test_brotli_manager_creation(self):
+        """Test BrotliCompressionManager creation (if brotli available)."""
+        try:
+            manager = BrotliCompressionManager()
+            assert manager.algorithm_name == "brotli"
+            assert manager.level == 6
+            assert manager.threshold == 0
+        except ImportError:
+            # Brotli not available, this is expected behavior
+            with pytest.raises(ImportError, match="Brotli compression requires"):
+                BrotliCompressionManager()
+
+    def test_custom_parameters(self):
+        """Test compression managers with custom parameters."""
+        manager = GzipCompressionManager(level=9, threshold=2048)
+        assert manager.level == 9
+        assert manager.threshold == 2048
+
+    def test_level_validation(self):
+        """Test compression level validation."""
+        # Valid levels
+        GzipCompressionManager(level=1)
+        GzipCompressionManager(level=9)
+
+        # Invalid levels
+        with pytest.raises(ValueError, match="Compression level must be 1-9"):
+            GzipCompressionManager(level=0)
+        with pytest.raises(ValueError, match="Compression level must be 1-9"):
+            GzipCompressionManager(level=10)
+
+    def test_brotli_level_validation(self):
+        """Test brotli-specific level validation (0-11)."""
+        try:
+            # Valid brotli levels
+            BrotliCompressionManager(level=0)
+            BrotliCompressionManager(level=11)
+
+            # Invalid brotli levels
+            with pytest.raises(
+                ValueError, match="Brotli compression level must be 0-11"
+            ):
+                BrotliCompressionManager(level=12)
+        except ImportError:
+            # Brotli not available, skip test
+            pytest.skip("Brotli not available")
+
+    def test_convenience_alias(self):
+        """Test that StoreCompressionManager is an alias for GzipCompressionManager."""
+        manager = StoreCompressionManager()
+        assert isinstance(manager, GzipCompressionManager)
+        assert manager.algorithm_name == "gzip"
+
+
+class TestCompressionThreshold:
+    """Test compression threshold behavior."""
+
+    def test_should_compress_none(self):
+        """Test that None data is not compressed."""
+        manager = GzipCompressionManager()
+        assert not manager.should_compress(None)
+
+    def test_should_compress_small_data(self):
+        """Test that small data below threshold is not compressed."""
+        manager = GzipCompressionManager(threshold=1000)
+        small_data = {"key": "value"}  # Much smaller than 1000 bytes
+        assert not manager.should_compress(small_data)
+
+    def test_should_compress_large_data(self):
+        """Test that large data above threshold is compressed."""
+        manager = GzipCompressionManager(threshold=100)
+        large_data = {"key": "x" * 200}  # Larger than 100 bytes when JSON serialized
+        assert manager.should_compress(large_data)
+
+    def test_should_compress_unserializable(self):
+        """Test that unserializable data is not compressed."""
+        manager = GzipCompressionManager(
+            threshold=100
+        )  # Set threshold > 0 to test serialization check
+
+        class UnserializableClass:
+            def __str__(self):
+                raise TypeError("Cannot convert to string")
+
+        unserializable = UnserializableClass()
+        assert not manager.should_compress(unserializable)
+
+
+class TestCompressionRoundTrip:
+    """Test compression and decompression round-trip behavior."""
+
+    @pytest.mark.parametrize(
+        "manager_class", [GzipCompressionManager, DeflateCompressionManager]
+    )
+    def test_basic_round_trip(self, manager_class):
+        """Test basic compression/decompression round trip."""
+        manager = manager_class(threshold=10)  # Low threshold to ensure compression
+
+        # Create data large enough to ensure compression
+        original_data = {
+            "numbers": list(range(100)),  # Much larger dataset
+            "text": "Hello, world! " * 50,  # Repeat text to make it larger
+            "nested": {"key": "value" * 20, "count": 42},
+        }
+
+        # Compress
+        compressed = manager.compress_store_data(original_data)
+
+        # Should return compressed payload
+        assert isinstance(compressed, dict)
+        assert compressed.get("compressed") is True
+        assert "algorithm" in compressed
+        assert "data" in compressed
+        assert "original_size" in compressed
+        assert "compressed_size" in compressed
+
+        # Decompress
+        decompressed = manager.decompress_store_data(compressed)
+
+        # Should match original
+        assert decompressed == original_data
+
+    def test_brotli_round_trip(self):
+        """Test brotli compression round trip (if available)."""
+        try:
+            manager = BrotliCompressionManager(threshold=10)
+
+            original_data = {"test": "data" * 50}  # Ensure it's above threshold
+
+            compressed = manager.compress_store_data(original_data)
+            assert compressed.get("algorithm") == "brotli"
+
+            decompressed = manager.decompress_store_data(compressed)
+            assert decompressed == original_data
+
+        except ImportError:
+            pytest.skip("Brotli not available")
+
+    def test_compression_effectiveness(self):
+        """Test that compression actually reduces size for compressible data."""
+        manager = GzipCompressionManager(threshold=10)
+
+        # Highly repetitive data should compress well
+        repetitive_data = {"repeated": "A" * 1000}
+
+        compressed = manager.compress_store_data(repetitive_data)
+
+        assert compressed.get("compressed") is True
+        assert compressed["compressed_size"] < compressed["original_size"]
+
+    def test_incompressible_data_fallback(self):
+        """Test fallback when compression doesn't reduce size."""
+        manager = GzipCompressionManager(threshold=10)
+
+        # Create data that might not compress well
+        # Note: This test might be flaky as gzip can compress almost anything
+        # We're testing the logic path, even if compression is usually effective
+        small_random_data = {"x": 42}
+
+        result = manager.compress_store_data(small_random_data)
+
+        # Either compressed or original data should be returned
+        if isinstance(result, dict) and result.get("compressed"):
+            # Was compressed
+            assert "algorithm" in result
+        else:
+            # Fell back to original
+            assert result == small_random_data
+
+
+class TestCompressionErrorHandling:
+    """Test error handling and graceful fallbacks."""
+
+    def test_compression_error_fallback(self):
+        """Test graceful fallback when compression fails."""
+        manager = GzipCompressionManager(threshold=10)
+
+        # Mock a compression failure by overriding _compress_bytes
+        original_compress = manager._compress_bytes
+
+        def failing_compress(data):
+            raise OSError("Compression failed")
+
+        manager._compress_bytes = failing_compress
+
+        data = {"test": "data" * 50}
+        result = manager.compress_store_data(data)
+
+        # Should fall back to original data
+        assert result == data
+
+        # Restore original method
+        manager._compress_bytes = original_compress
+
+    def test_decompression_error_fallback(self):
+        """Test graceful fallback when decompression fails."""
+        manager = GzipCompressionManager()
+
+        # Create invalid compressed payload
+        invalid_payload = {
+            "compressed": True,
+            "algorithm": "gzip",
+            "data": "invalid_base64_data!!!",
+            "original_size": 100,
+            "compressed_size": 50,
+        }
+
+        result = manager.decompress_store_data(invalid_payload)
+
+        # Should fall back to original payload
+        assert result == invalid_payload
+
+    def test_algorithm_mismatch_fallback(self):
+        """Test fallback when algorithm doesn't match."""
+        gzip_manager = GzipCompressionManager()
+
+        # Create payload with different algorithm
+        mismatched_payload = {
+            "compressed": True,
+            "algorithm": "deflate",  # Wrong algorithm
+            "data": base64.b64encode(b"test").decode("ascii"),
+            "original_size": 100,
+            "compressed_size": 50,
+        }
+
+        result = gzip_manager.decompress_store_data(mismatched_payload)
+
+        # Should fall back to original payload
+        assert result == mismatched_payload
+
+    def test_non_compressed_payload_passthrough(self):
+        """Test that non-compressed data passes through unchanged."""
+        manager = GzipCompressionManager()
+
+        regular_data = {"normal": "data"}
+        result = manager.decompress_store_data(regular_data)
+
+        assert result == regular_data
+
+
+class TestCallbackIntegration:
+    """Test callback-level compression and decompression methods."""
+
+    def test_compress_callback_outputs_single(self):
+        """Test compressing single callback output."""
+        manager = GzipCompressionManager(threshold=10)
+
+        output_value = {"large": "data" * 100}
+        output_spec = [{"type": "Store", "property": "data"}]
+
+        result = manager.compress_callback_outputs(output_value, output_spec)
+
+        # Should be compressed
+        assert isinstance(result, dict)
+        assert result.get("compressed") is True
+
+    def test_compress_callback_outputs_multiple(self):
+        """Test compressing multiple callback outputs."""
+        manager = GzipCompressionManager(threshold=10)
+
+        output_value = [
+            {"store": "data" * 100},  # Should be compressed
+            {"graph": "figure_data"},  # Should not be compressed (not Store)
+        ]
+        output_spec = [
+            {"type": "Store", "property": "data"},
+            {"type": "Graph", "property": "figure"},
+        ]
+
+        result = manager.compress_callback_outputs(output_value, output_spec)
+
+        assert isinstance(result, list)
+        assert len(result) == 2
+
+        # First should be compressed (Store)
+        assert isinstance(result[0], dict)
+        assert result[0].get("compressed") is True
+
+        # Second should be unchanged (not Store)
+        assert result[1] == {"graph": "figure_data"}
+
+    def test_decompress_callback_inputs(self):
+        """Test decompressing callback inputs."""
+        manager = GzipCompressionManager(threshold=10)
+
+        # Create compressed data
+        original_data = {"input": "data" * 100}
+        compressed_data = manager.compress_store_data(original_data)
+
+        func_args = (compressed_data, "other_arg")
+        input_spec = [
+            {"type": "Store", "property": "data"},
+            {"type": "Input", "property": "value"},
+        ]
+
+        result = manager.decompress_callback_inputs(func_args, input_spec)
+
+        assert isinstance(result, tuple)
+        assert len(result) == 2
+
+        # First should be decompressed
+        assert result[0] == original_data
+
+        # Second should be unchanged
+        assert result[1] == "other_arg"
+
+    def test_non_store_components_ignored(self):
+        """Test that non-Store components are ignored during compression."""
+        manager = GzipCompressionManager(threshold=10)
+
+        output_value = [{"data": "value1"}, {"data": "value2"}]
+        output_spec = [
+            {"type": "Input", "property": "value"},
+            {"type": "Div", "property": "children"},
+        ]
+
+        result = manager.compress_callback_outputs(output_value, output_spec)
+
+        # Should be unchanged since no Store components
+        assert result == output_value
+
+
+class TestKwargsHelperFunction:
+    """Test the get_compression_manager_from_kwargs helper function."""
+
+    def test_get_manager_from_kwargs_present(self):
+        """Test extracting compression manager when present in kwargs."""
+        manager = GzipCompressionManager()
+        kwargs = {"CompressionManager": manager, "other_param": "value"}
+
+        result = get_compression_manager_from_kwargs(kwargs)
+        assert result is manager
+
+    def test_get_manager_from_kwargs_absent(self):
+        """Test extracting compression manager when not present in kwargs."""
+        kwargs = {"other_param": "value"}
+
+        result = get_compression_manager_from_kwargs(kwargs)
+        assert result is None
+
+    def test_get_manager_from_kwargs_empty(self):
+        """Test extracting compression manager from empty kwargs."""
+        kwargs = {}
+
+        result = get_compression_manager_from_kwargs(kwargs)
+        assert result is None
+
+
+class TestStoreComponentDetection:
+    """Test Store component detection logic."""
+
+    def test_is_store_output_positive(self):
+        """Test detecting Store output components."""
+        manager = GzipCompressionManager()
+
+        store_spec = {"type": "Store", "property": "data"}
+        assert manager._is_store_output(store_spec)
+
+    def test_is_store_output_negative(self):
+        """Test rejecting non-Store output components."""
+        manager = GzipCompressionManager()
+
+        non_store_specs = [
+            {"type": "Input", "property": "value"},
+            {"type": "Store", "property": "clear_data"},  # Wrong property
+            {"type": "Div", "property": "children"},
+        ]
+
+        for spec in non_store_specs:
+            assert not manager._is_store_output(spec)
+
+    def test_is_store_input_positive(self):
+        """Test detecting Store input components."""
+        manager = GzipCompressionManager()
+
+        store_spec = {"type": "Store", "property": "data"}
+        assert manager._is_store_input(store_spec)
+
+    def test_is_store_input_negative(self):
+        """Test rejecting non-Store input components."""
+        manager = GzipCompressionManager()
+
+        non_store_specs = [
+            {"type": "Input", "property": "value"},
+            {"type": "Store", "property": "modified_timestamp"},  # Wrong property
+            {"type": "State", "property": "data"},
+        ]
+
+        for spec in non_store_specs:
+            assert not manager._is_store_input(spec)
+
+
+class TestCompressionPayloadStructure:
+    """Test the structure of compressed payloads."""
+
+    def test_compressed_payload_structure(self):
+        """Test that compressed payloads have the expected structure."""
+        manager = GzipCompressionManager(threshold=10)
+
+        data = {"test": "data" * 100}
+        compressed = manager.compress_store_data(data)
+
+        # Check required fields
+        required_fields = [
+            "compressed",
+            "algorithm",
+            "level",
+            "data",
+            "original_size",
+            "compressed_size",
+        ]
+        for field in required_fields:
+            assert field in compressed
+
+        # Check field types and values
+        assert compressed["compressed"] is True
+        assert isinstance(compressed["algorithm"], str)
+        assert isinstance(compressed["level"], int)
+        assert isinstance(compressed["data"], str)  # Base64 encoded
+        assert isinstance(compressed["original_size"], int)
+        assert isinstance(compressed["compressed_size"], int)
+
+        # Check that base64 data is valid
+        try:
+            base64.b64decode(compressed["data"])
+        except Exception:
+            pytest.fail("Invalid base64 data in compressed payload")
+
+    def test_is_compressed_payload_detection(self):
+        """Test detection of compressed vs uncompressed payloads."""
+        manager = GzipCompressionManager()
+
+        # Valid compressed payload
+        compressed_payload = {
+            "compressed": True,
+            "algorithm": "gzip",
+            "data": "eJzLSM3JyVcozy/KSVEEABxJBD4=",
+            "original_size": 20,
+            "compressed_size": 15,
+        }
+        assert manager._is_compressed_payload(compressed_payload)
+
+        # Invalid payloads
+        invalid_payloads = [
+            {"compressed": False, "algorithm": "gzip", "data": "test"},
+            {"algorithm": "gzip", "data": "test"},  # Missing compressed field
+            {"compressed": True, "data": "test"},  # Missing algorithm
+            {"compressed": True, "algorithm": "gzip"},  # Missing data
+            "not_a_dict",
+            None,
+            {"regular": "data"},
+        ]
+
+        for payload in invalid_payloads:
+            assert not manager._is_compressed_payload(payload)