27 changes: 25 additions & 2 deletions src/ess/livedata/dashboard/configuration_adapter.py
@@ -157,11 +157,34 @@ def initial_parameter_values(self) -> dict[str, Any]:
"""
Initial parameter values.

Default implementation returns persisted parameter values if available
and compatible with the current model, otherwise returns empty dict to
trigger default values.

If stored params have no field overlap with the current model (indicating
complete incompatibility, e.g., from a different workflow version), returns
empty dict to fall back to defaults rather than propagating invalid data.
"""
if not self._config_state:
return {}

# Check compatibility with current model
model_class = self.model_class()
if model_class is None:
# No model defined, return params as-is
return self._config_state.params

# Check if stored params have ANY overlap with current model fields
# If no field names match, the config is from an incompatible version
stored_keys = set(self._config_state.params.keys())
model_fields = set(model_class.model_fields.keys())

if stored_keys and not stored_keys.intersection(model_fields):
# Complete incompatibility: no field overlap, fall back to defaults
return {}

# Partial or full compatibility: let Pydantic handle defaults/validation
# Note: Pydantic ignores extra fields and uses defaults for missing ones
return self._config_state.params

@abstractmethod
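
For illustration, a minimal standalone sketch of the overlap heuristic, assuming pydantic v2; CurrentParams and restore_params are hypothetical stand-ins for this example, not names from the codebase:

from pydantic import BaseModel


class CurrentParams(BaseModel):
    num_bins: int = 100
    scale: str = 'linear'


def restore_params(stored: dict) -> dict:
    # Mirror of the adapter logic above: return {} when no stored key
    # matches a field of the current model.
    stored_keys = set(stored)
    model_fields = set(CurrentParams.model_fields)
    if stored_keys and not stored_keys & model_fields:
        return {}
    return stored


# Partial overlap: pydantic ignores the unknown key and fills in the default.
print(CurrentParams(**restore_params({'num_bins': 42, 'old_field': 1})))
# num_bins=42 scale='linear'

# No overlap: restore_params() returns {}, so the model is built from defaults.
print(CurrentParams(**restore_params({'old_field': 1})))
# num_bins=100 scale='linear'
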
22 changes: 17 additions & 5 deletions tests/integration/backend.py
@@ -5,10 +5,12 @@
import logging
from contextlib import ExitStack
from types import TracebackType
from typing import Literal

from ess.livedata.dashboard.config_store import ConfigStoreManager
from ess.livedata.dashboard.dashboard_services import DashboardServices
from ess.livedata.dashboard.kafka_transport import DashboardKafkaTransport
from ess.livedata.dashboard.transport import NullTransport, Transport


class DashboardBackend:
@@ -31,6 +33,9 @@ class DashboardBackend:
Use dev mode with simplified topic structure
log_level:
Logging level
transport:
Transport type to use ('kafka' or 'none'). Defaults to 'kafka'.
Use 'none' for tests that don't need Kafka.

Notes
-----
@@ -44,6 +49,7 @@ def __init__(
instrument: str = 'dummy',
dev: bool = True,
log_level: int | str = logging.INFO,
transport: Literal['kafka', 'none'] = 'kafka',
):
self._instrument = instrument
self._dev = dev
@@ -61,10 +67,16 @@ def __init__(
instrument=instrument, store_type='memory'
)

# Create transport based on configuration
transport_impl: Transport
if transport == 'none':
transport_impl = NullTransport()
elif transport == 'kafka':
transport_impl = DashboardKafkaTransport(
instrument=instrument, dev=dev, logger=self._logger
)
else:
raise ValueError(f"Unknown transport type: {transport}")

# Setup all dashboard services (no GUI components)
self._services = DashboardServices(
@@ -73,7 +85,7 @@ def __init__(
exit_stack=self._exit_stack,
logger=self._logger,
pipe_factory=lambda data: None, # No-op for tests
transport=transport_impl,
config_manager=self._config_manager,
)

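
For reference, the new parameter is exercised exactly like the fixture in the test module below; transport='none' selects the NullTransport, so no Kafka broker needs to be running:

# Minimal usage sketch, mirroring the test fixture below.
with DashboardBackend(instrument='dummy', dev=True, transport='none') as backend:
    controller = backend.workflow_controller  # services wired to NullTransport
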
222 changes: 222 additions & 0 deletions tests/integration/config_persistence_test.py
@@ -0,0 +1,222 @@
# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2025 Scipp contributors (https://github.com/scipp)
"""Integration tests for workflow configuration persistence via config store."""

from collections.abc import Generator

import pytest

from ess.livedata.config.workflow_spec import WorkflowId
from ess.livedata.dashboard.configuration_adapter import ConfigurationState
from ess.livedata.handlers.monitor_workflow_specs import MonitorDataParams
from ess.livedata.parameter_models import Scale, TimeUnit, TOAEdges
from tests.integration.backend import DashboardBackend


@pytest.fixture
def backend_with_null_transport() -> Generator[DashboardBackend, None, None]:
"""Create DashboardBackend with null transport (no Kafka required)."""
with DashboardBackend(instrument='dummy', dev=True, transport='none') as backend:
yield backend


def test_workflow_params_stored_and_retrieved_via_config_store(
backend_with_null_transport: DashboardBackend,
) -> None:
"""
Test that workflow params are stored in config store and retrieved correctly.

This test verifies the complete persistence flow:
1. Start a workflow via WorkflowController with specific params
2. Verify params are stored in the config store
3. Create a new adapter via the controller
4. Verify the adapter retrieves the correct params from the config store

Note: This test uses null transport (no Kafka required) since it only tests
the config store persistence mechanism through the controller.
"""
# Define workflow parameters with non-default values
workflow_id = WorkflowId(
instrument='dummy',
namespace='monitor_data',
name='monitor_histogram',
version=1,
)
source_names = ['monitor1', 'monitor2']

# Create params with custom values (non-default)
custom_params = MonitorDataParams(
toa_edges=TOAEdges(
start=5.0,
stop=15.0,
num_bins=150,
scale=Scale.LINEAR,
unit=TimeUnit.MS,
)
)

# Start the workflow with custom parameters
job_ids = backend_with_null_transport.workflow_controller.start_workflow(
workflow_id=workflow_id,
source_names=source_names,
config=custom_params,
)

assert len(job_ids) == 2, f"Expected 2 jobs, got {len(job_ids)}"

# Verify params are stored in config store
stored_config = backend_with_null_transport.workflow_controller.get_workflow_config(
workflow_id
)
assert stored_config is not None, "Config should be stored in config store"
assert stored_config.source_names == source_names
assert stored_config.params == custom_params.model_dump()

# Create adapter and verify it retrieves correct params from config store
adapter = backend_with_null_transport.workflow_controller.create_workflow_adapter(
workflow_id
)
assert adapter.initial_source_names == source_names
assert adapter.initial_parameter_values == custom_params.model_dump()


def test_adapter_filters_removed_sources(
backend_with_null_transport: DashboardBackend,
) -> None:
"""
Test that adapter filters out sources that are no longer available.

This test verifies that if a workflow was started with sources that are
no longer in the workflow spec, the adapter correctly filters them out
when restoring the configuration.
"""
workflow_id = WorkflowId(
instrument='dummy',
namespace='monitor_data',
name='monitor_histogram',
version=1,
)

# Start workflow with multiple sources (monitor3 not in spec)
source_names = ['monitor1', 'monitor2', 'monitor3']
backend_with_null_transport.workflow_controller.start_workflow(
workflow_id=workflow_id,
source_names=source_names,
config=MonitorDataParams(),
)

# Verify config is stored with all sources
stored_config = backend_with_null_transport.workflow_controller.get_workflow_config(
workflow_id
)
assert stored_config is not None
assert set(stored_config.source_names) == {'monitor1', 'monitor2', 'monitor3'}

# Create adapter - it should filter to only sources available in spec
adapter = backend_with_null_transport.workflow_controller.create_workflow_adapter(
workflow_id
)

# Adapter should only return sources that are both persisted AND in the spec
initial_sources = adapter.initial_source_names
assert 'monitor1' in initial_sources
assert 'monitor2' in initial_sources
# monitor3 should be filtered out as it's not in the workflow spec
assert 'monitor3' not in initial_sources


def test_config_persists_across_adapter_recreations(
backend_with_null_transport: DashboardBackend,
) -> None:
"""
Test that config persists correctly across multiple adapter recreations.

This verifies that creating multiple adapters from the same stored config
yields consistent results.
"""
workflow_id = WorkflowId(
instrument='dummy',
namespace='monitor_data',
name='monitor_histogram',
version=1,
)
source_names = ['monitor1']
custom_params = MonitorDataParams(
toa_edges=TOAEdges(
start=1.0,
stop=10.0,
num_bins=200,
scale=Scale.LINEAR,
unit=TimeUnit.MS,
)
)

backend_with_null_transport.workflow_controller.start_workflow(
workflow_id=workflow_id,
source_names=source_names,
config=custom_params,
)

# Create two adapters from the same stored config
adapter1 = backend_with_null_transport.workflow_controller.create_workflow_adapter(
workflow_id
)
adapter2 = backend_with_null_transport.workflow_controller.create_workflow_adapter(
workflow_id
)

# Both adapters should retrieve identical params from config store
assert adapter1.initial_parameter_values == adapter2.initial_parameter_values
assert adapter1.initial_parameter_values['toa_edges']['num_bins'] == 200


def test_incompatible_config_falls_back_to_defaults(
backend_with_null_transport: DashboardBackend,
) -> None:
"""
Test that incompatible config doesn't break adapter creation.

If stored config has params that are incompatible with the current
workflow parameter model (e.g., due to schema changes between versions),
the adapter should validate against the current model and fall back to
defaults rather than propagating invalid data to the UI.
"""
workflow_id = WorkflowId(
instrument='dummy',
namespace='monitor_data',
name='monitor_histogram',
version=1,
)

# Manually inject completely incompatible config into the store
# (e.g., from an old version of the workflow with different param structure)
incompatible_config = ConfigurationState(
source_names=['monitor1'],
aux_source_names={},
params={
'old_field_that_no_longer_exists': 42,
'another_invalid_field': 'invalid_value',
# Completely wrong structure - not matching current MonitorDataParams
},
)

# Directly store incompatible config in the config store
config_store = backend_with_null_transport.config_manager.get_store(
'workflow_configs'
)
config_store[workflow_id] = incompatible_config.model_dump()

# Adapter creation should not fail even with incompatible config
adapter = backend_with_null_transport.workflow_controller.create_workflow_adapter(
workflow_id
)

# Adapter should validate and detect incompatibility, returning empty dict
# which will cause the UI to use default parameter values
initial_params = adapter.initial_parameter_values
assert initial_params == {}, (
"Expected empty dict for incompatible params to trigger defaults, "
f"got {initial_params}"
)

# Verify source names are still restored (only params validation failed)
assert adapter.initial_source_names == ['monitor1']