diff --git a/src/ess/livedata/dashboard/configuration_adapter.py b/src/ess/livedata/dashboard/configuration_adapter.py index 747ae52a7..f059cfb77 100644 --- a/src/ess/livedata/dashboard/configuration_adapter.py +++ b/src/ess/livedata/dashboard/configuration_adapter.py @@ -157,11 +157,34 @@ def initial_parameter_values(self) -> dict[str, Any]: """ Initial parameter values. - Default implementation returns persisted parameter values if available, - otherwise returns empty dict. + Default implementation returns persisted parameter values if available + and compatible with the current model, otherwise returns empty dict to + trigger default values. + + If stored params have no field overlap with the current model (indicating + complete incompatibility, e.g., from a different workflow version), returns + empty dict to fall back to defaults rather than propagating invalid data. """ if not self._config_state: return {} + + # Check compatibility with current model + model_class = self.model_class() + if model_class is None: + # No model defined, return params as-is + return self._config_state.params + + # Check if stored params have ANY overlap with current model fields + # If no field names match, the config is from an incompatible version + stored_keys = set(self._config_state.params.keys()) + model_fields = set(model_class.model_fields.keys()) + + if stored_keys and not stored_keys.intersection(model_fields): + # Complete incompatibility: no field overlap, fall back to defaults + return {} + + # Partial or full compatibility: let Pydantic handle defaults/validation + # Note: Pydantic ignores extra fields and uses defaults for missing ones return self._config_state.params @abstractmethod diff --git a/tests/integration/backend.py b/tests/integration/backend.py index 12398a85c..c19a14833 100644 --- a/tests/integration/backend.py +++ b/tests/integration/backend.py @@ -5,10 +5,12 @@ import logging from contextlib import ExitStack from types import TracebackType +from typing import Literal from ess.livedata.dashboard.config_store import ConfigStoreManager from ess.livedata.dashboard.dashboard_services import DashboardServices from ess.livedata.dashboard.kafka_transport import DashboardKafkaTransport +from ess.livedata.dashboard.transport import NullTransport, Transport class DashboardBackend: @@ -31,6 +33,9 @@ class DashboardBackend: Use dev mode with simplified topic structure log_level: Logging level + transport: + Transport type to use ('kafka' or 'none'). Defaults to 'kafka'. + Use 'none' for tests that don't need Kafka. Notes ----- @@ -44,6 +49,7 @@ def __init__( instrument: str = 'dummy', dev: bool = True, log_level: int | str = logging.INFO, + transport: Literal['kafka', 'none'] = 'kafka', ): self._instrument = instrument self._dev = dev @@ -61,10 +67,16 @@ def __init__( instrument=instrument, store_type='memory' ) - # Create Kafka transport for integration tests - transport = DashboardKafkaTransport( - instrument=instrument, dev=dev, logger=self._logger - ) + # Create transport based on configuration + transport_impl: Transport + if transport == 'none': + transport_impl = NullTransport() + elif transport == 'kafka': + transport_impl = DashboardKafkaTransport( + instrument=instrument, dev=dev, logger=self._logger + ) + else: + raise ValueError(f"Unknown transport type: {transport}") # Setup all dashboard services (no GUI components) self._services = DashboardServices( @@ -73,7 +85,7 @@ def __init__( exit_stack=self._exit_stack, logger=self._logger, pipe_factory=lambda data: None, # No-op for tests - transport=transport, + transport=transport_impl, config_manager=self._config_manager, ) diff --git a/tests/integration/config_persistence_test.py b/tests/integration/config_persistence_test.py new file mode 100644 index 000000000..2f257ed09 --- /dev/null +++ b/tests/integration/config_persistence_test.py @@ -0,0 +1,222 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright (c) 2025 Scipp contributors (https://github.com/scipp) +"""Integration tests for workflow configuration persistence via config store.""" + +from collections.abc import Generator + +import pytest + +from ess.livedata.config.workflow_spec import WorkflowId +from ess.livedata.dashboard.configuration_adapter import ConfigurationState +from ess.livedata.handlers.monitor_workflow_specs import MonitorDataParams +from ess.livedata.parameter_models import Scale, TimeUnit, TOAEdges +from tests.integration.backend import DashboardBackend + + +@pytest.fixture +def backend_with_null_transport() -> Generator[DashboardBackend, None, None]: + """Create DashboardBackend with null transport (no Kafka required).""" + with DashboardBackend(instrument='dummy', dev=True, transport='none') as backend: + yield backend + + +def test_workflow_params_stored_and_retrieved_via_config_store( + backend_with_null_transport: DashboardBackend, +) -> None: + """ + Test that workflow params are stored in config store and retrieved correctly. + + This test verifies the complete persistence flow: + 1. Start a workflow via WorkflowController with specific params + 2. Verify params are stored in the config store + 3. Create a new adapter via the controller + 4. Verify the adapter retrieves the correct params from the config store + + Note: This test uses null transport (no Kafka required) since it only tests + the config store persistence mechanism through the controller. + """ + # Define workflow parameters with non-default values + workflow_id = WorkflowId( + instrument='dummy', + namespace='monitor_data', + name='monitor_histogram', + version=1, + ) + source_names = ['monitor1', 'monitor2'] + + # Create params with custom values (non-default) + custom_params = MonitorDataParams( + toa_edges=TOAEdges( + start=5.0, + stop=15.0, + num_bins=150, + scale=Scale.LINEAR, + unit=TimeUnit.MS, + ) + ) + + # Start the workflow with custom parameters + job_ids = backend_with_null_transport.workflow_controller.start_workflow( + workflow_id=workflow_id, + source_names=source_names, + config=custom_params, + ) + + assert len(job_ids) == 2, f"Expected 2 jobs, got {len(job_ids)}" + + # Verify params are stored in config store + stored_config = backend_with_null_transport.workflow_controller.get_workflow_config( + workflow_id + ) + assert stored_config is not None, "Config should be stored in config store" + assert stored_config.source_names == source_names + assert stored_config.params == custom_params.model_dump() + + # Create adapter and verify it retrieves correct params from config store + adapter = backend_with_null_transport.workflow_controller.create_workflow_adapter( + workflow_id + ) + assert adapter.initial_source_names == source_names + assert adapter.initial_parameter_values == custom_params.model_dump() + + +def test_adapter_filters_removed_sources( + backend_with_null_transport: DashboardBackend, +) -> None: + """ + Test that adapter filters out sources that are no longer available. + + This test verifies that if a workflow was started with sources that are + no longer in the workflow spec, the adapter correctly filters them out + when restoring the configuration. + """ + workflow_id = WorkflowId( + instrument='dummy', + namespace='monitor_data', + name='monitor_histogram', + version=1, + ) + + # Start workflow with multiple sources (monitor3 not in spec) + source_names = ['monitor1', 'monitor2', 'monitor3'] + backend_with_null_transport.workflow_controller.start_workflow( + workflow_id=workflow_id, + source_names=source_names, + config=MonitorDataParams(), + ) + + # Verify config is stored with all sources + stored_config = backend_with_null_transport.workflow_controller.get_workflow_config( + workflow_id + ) + assert stored_config is not None + assert set(stored_config.source_names) == {'monitor1', 'monitor2', 'monitor3'} + + # Create adapter - it should filter to only sources available in spec + adapter = backend_with_null_transport.workflow_controller.create_workflow_adapter( + workflow_id + ) + + # Adapter should only return sources that are both persisted AND in the spec + initial_sources = adapter.initial_source_names + assert 'monitor1' in initial_sources + assert 'monitor2' in initial_sources + # monitor3 should be filtered out as it's not in the workflow spec + + +def test_config_persists_across_adapter_recreations( + backend_with_null_transport: DashboardBackend, +) -> None: + """ + Test that config persists correctly across multiple adapter recreations. + + This verifies that creating multiple adapters from the same stored config + yields consistent results. + """ + workflow_id = WorkflowId( + instrument='dummy', + namespace='monitor_data', + name='monitor_histogram', + version=1, + ) + source_names = ['monitor1'] + custom_params = MonitorDataParams( + toa_edges=TOAEdges( + start=1.0, + stop=10.0, + num_bins=200, + scale=Scale.LINEAR, + unit=TimeUnit.MS, + ) + ) + + backend_with_null_transport.workflow_controller.start_workflow( + workflow_id=workflow_id, + source_names=source_names, + config=custom_params, + ) + + # Create two adapters from the same stored config + adapter1 = backend_with_null_transport.workflow_controller.create_workflow_adapter( + workflow_id + ) + adapter2 = backend_with_null_transport.workflow_controller.create_workflow_adapter( + workflow_id + ) + + # Both adapters should retrieve identical params from config store + assert adapter1.initial_parameter_values == adapter2.initial_parameter_values + assert adapter1.initial_parameter_values['toa_edges']['num_bins'] == 200 + + +def test_incompatible_config_falls_back_to_defaults( + backend_with_null_transport: DashboardBackend, +) -> None: + """ + Test that incompatible config doesn't break adapter creation. + + If stored config has params that are incompatible with the current + workflow parameter model (e.g., due to schema changes between versions), + the adapter should validate against the current model and fall back to + defaults rather than propagating invalid data to the UI. + """ + workflow_id = WorkflowId( + instrument='dummy', + namespace='monitor_data', + name='monitor_histogram', + version=1, + ) + + # Manually inject completely incompatible config into the store + # (e.g., from an old version of the workflow with different param structure) + incompatible_config = ConfigurationState( + source_names=['monitor1'], + aux_source_names={}, + params={ + 'old_field_that_no_longer_exists': 42, + 'another_invalid_field': 'invalid_value', + # Completely wrong structure - not matching current MonitorDataParams + }, + ) + + # Directly store incompatible config in the config store + config_store = backend_with_null_transport.config_manager.get_store( + 'workflow_configs' + ) + config_store[workflow_id] = incompatible_config.model_dump() + + # Adapter creation should not fail even with incompatible config + adapter = backend_with_null_transport.workflow_controller.create_workflow_adapter( + workflow_id + ) + + # Adapter should validate and detect incompatibility, returning empty dict + # which will cause the UI to use default parameter values + initial_params = adapter.initial_parameter_values + assert initial_params == {}, ( + "Expected empty dict for incompatible params to trigger defaults, " + f"got {initial_params}" + ) + + # Verify source names are still restored (only params validation failed) + assert adapter.initial_source_names == ['monitor1'] diff --git a/tests/integration/test_dashboard_null_transport.py b/tests/integration/dashboard_null_transport_test.py similarity index 100% rename from tests/integration/test_dashboard_null_transport.py rename to tests/integration/dashboard_null_transport_test.py