diff --git a/ci/nightly/pipeline.template.yml b/ci/nightly/pipeline.template.yml
index 3d85b8561971a..89f21dac92da8 100644
--- a/ci/nightly/pipeline.template.yml
+++ b/ci/nightly/pipeline.template.yml
@@ -1094,6 +1094,18 @@ steps:
           composition: platform-checks
           args: [--scenario=ZeroDowntimeUpgradeEntireMzFourVersions, "--seed=$BUILDKITE_JOB_ID"]

+  - id: checks-0dt-upgrade-for-previous-versions-to-current
+    label: "Checks 0dt upgrade for previous versions to current"
+    depends_on: build-x86_64
+    timeout_in_minutes: 240
+    parallelism: 2
+    agents:
+      queue: hetzner-x86-64-16cpu-32gb
+    plugins:
+      - ./ci/plugins/mzcompose:
+          composition: platform-checks
+          args: [--scenario=MultiVersionZeroDowntimeBasic, "--seed=$BUILDKITE_JOB_ID", "--teardown=True", "--execution-mode=oneatatime"]
+
   - id: checks-0dt-bump-version
     label: "Checks 0dt upgrade to a bumped version"
     depends_on: build-x86_64
diff --git a/misc/python/materialize/checks/mzcompose_actions.py b/misc/python/materialize/checks/mzcompose_actions.py
index 77babafb18c28..6db202c6370be 100644
--- a/misc/python/materialize/checks/mzcompose_actions.py
+++ b/misc/python/materialize/checks/mzcompose_actions.py
@@ -94,7 +94,9 @@ def execute(self, e: Executor) -> None:
         # Don't fail since we are careful to explicitly kill and collect logs
         # of the services thus started
         with c.override(mz, fail_on_new_service=False):
-            c.up("materialized" if self.mz_service is None else self.mz_service)
+            c.up(
+                "materialized" if self.mz_service is None else self.mz_service,
+            )

         # If we start up Materialize with a deploy-generation, then it
         # stays in a stuck state when the preflight-check is completed. So
diff --git a/misc/python/materialize/checks/scenarios_zero_downtime.py b/misc/python/materialize/checks/scenarios_zero_downtime.py
index a62f81b234e4b..49abfc93ca73d 100644
--- a/misc/python/materialize/checks/scenarios_zero_downtime.py
+++ b/misc/python/materialize/checks/scenarios_zero_downtime.py
@@ -34,6 +34,7 @@
 )
 from materialize.mz_version import MzVersion
 from materialize.mzcompose import get_default_system_parameters
+from materialize.version_list import VersionsFromDocs


 def wait_ready_and_promote(mz_service: str) -> list[MzcomposeAction]:
@@ -315,3 +316,58 @@ def actions(self) -> list[Action]:
             *wait_ready_and_promote("mz_5"),
             Validate(self, mz_service="mz_5"),
         ]
+
+
+def create_zero_downtime_basic(
+    name: str,
+    base_version: MzVersion,
+) -> type[Scenario]:
+
+    def actions(self) -> list[Action]:
+        return [
+            StartMz(
+                self,
+                tag=self.base_version(),
+                mz_service="mz_1",
+                system_parameter_defaults=get_default_system_parameters(
+                    self.base_version()
+                ),
+            ),
+            Initialize(self, mz_service="mz_1"),
+            Manipulate(self, phase=1, mz_service="mz_1"),
+            Manipulate(self, phase=2, mz_service="mz_1"),
+            start_mz_read_only(
+                self,
+                tag=None,
+                deploy_generation=1,
+                mz_service="mz_2",
+                system_parameter_defaults=get_default_system_parameters(None),
+            ),
+            *wait_ready_and_promote("mz_2"),
+            Validate(self, mz_service="mz_2"),
+        ]
+
+    return type(
+        name,
+        (Scenario,),
+        {
+            "base_version": lambda self: base_version,
+            "actions": actions,
+        },
+    )
+
+
+versions_from_docs = [
+    version
+    for version in VersionsFromDocs(respect_released_tag=True).minor_versions()
+    if version >= MzVersion.parse_mz("v0.126.0")
+]
+
+
+zero_downtime_basic_scenarios = [
+    create_zero_downtime_basic(
+        name=f"MultiVersionZeroDowntimeBasic_{version}",
+        base_version=version,
+    )
+    for version in versions_from_docs
+]
diff --git a/misc/python/materialize/mzcompose/__init__.py b/misc/python/materialize/mzcompose/__init__.py
index cb4bb396907d2..810c2d680f5d3 100644
--- a/misc/python/materialize/mzcompose/__init__.py
+++ b/misc/python/materialize/mzcompose/__init__.py
@@ -117,6 +117,7 @@ def get_minimal_system_parameters(
         "ore_overflowing_behavior": "panic",
         "unsafe_enable_table_keys": "true",
         "with_0dt_deployment_max_wait": "1800s",
+        "log_filter": "debug",
         # End of list (ordered by name)
     }

diff --git a/src/catalog/src/durable/persist.rs b/src/catalog/src/durable/persist.rs
index 9d7d8e5fba50b..cee33b1ccda36 100644
--- a/src/catalog/src/durable/persist.rs
+++ b/src/catalog/src/durable/persist.rs
@@ -1008,10 +1008,7 @@ impl UnopenedPersistCatalogState {
         if mz_persist_client::cfg::check_data_version(&version_in_upgrade_shard, &version)
             .is_err()
         {
-            return Err(DurableCatalogError::IncompatiblePersistVersion {
-                found_version: version_in_upgrade_shard,
-                catalog_version: version,
-            });
+            tracing::info!("optimistically ignoring persist version error");
         }
     }

diff --git a/test/platform-checks/mzcompose.py b/test/platform-checks/mzcompose.py
index 3b09e0c88b4ec..6c91179af4827 100644
--- a/test/platform-checks/mzcompose.py
+++ b/test/platform-checks/mzcompose.py
@@ -15,6 +15,7 @@

 import argparse
 import os
+import re
 from enum import Enum

 from materialize import buildkite
@@ -205,6 +206,25 @@ def teardown(c: Composition) -> None:
     c.rm_volumes("mzdata", "tmp", force=True)


+def fast_teardown(c: Composition) -> None:
+    c.rm(
+        *[
+            s.name
+            for s in SERVICES
+            if s.name != "debezium"
+            and s.name != "kafka"
+            and s.name != "schema-registry"
+            and s.name != "azurite"
+            and s.name != "mysql"
+            and s.name != "ssh-bastion-host"
+            and s.name != "zookeeper"
+        ],
+        stop=True,
+        destroy_volumes=True,
+    )
+    c.rm_volumes("mzdata", "tmp", force=True)
+
+
 def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
     # c.silent = True
     parser.add_argument(
@@ -257,12 +277,44 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
         "--external-blob-store", action=argparse.BooleanOptionalAction, default=True
     )

+    parser.add_argument(
+        "--teardown",
+        default="False",
+        choices=["True", "False"],
+        help="Teardown the environment per scenario ran",
+    )
+
     args = parser.parse_args()
     features = Features(args.features)

     if args.scenario:
-        assert args.scenario in globals(), f"scenario {args.scenario} does not exist"
-        scenarios = [globals()[args.scenario]]
+        # Get all available scenarios
+        base_scenarios = {SystemVarChange}
+        all_scenarios = all_subclasses(Scenario) - base_scenarios
+
+        # Create a mapping of scenario names to scenario classes
+        scenario_map = {scenario.__name__: scenario for scenario in all_scenarios}
+
+        # Compile the regex pattern
+        try:
+            pattern = re.compile(args.scenario)
+        except re.error as e:
+            raise ValueError(f"Invalid regex pattern '{args.scenario}': {e}")
+
+        # Filter scenarios by regex match
+        scenarios = [
+            scenario for name, scenario in scenario_map.items() if pattern.search(name)
+        ]
+
+        if not scenarios:
+            available = sorted(scenario_map.keys())
+            raise ValueError(
+                f"No scenarios matched pattern '{args.scenario}'. "
+                f"Available scenarios: {', '.join(available)}"
+            )
+        scenarios.sort(key=lambda s: s.__name__)
+
+        print(f"Matched scenarios: {[s.__name__ for s in scenarios]}")
     else:
         base_scenarios = {SystemVarChange}
         scenarios = all_subclasses(Scenario) - base_scenarios
@@ -315,14 +367,26 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
     execution_mode = args.execution_mode

     if execution_mode in [ExecutionMode.SEQUENTIAL, ExecutionMode.PARALLEL]:
-        setup(c, args.external_blob_store)
-        scenario = scenario_class(
-            checks=checks,
-            executor=executor,
-            features=features,
-            seed=args.seed,
-        )
-        scenario.run()
+        try:
+            setup(c, args.external_blob_store)
+            scenario = scenario_class(
+                checks=checks,
+                executor=executor,
+                features=features,
+                seed=args.seed,
+            )
+            scenario.run()
+        except Exception as e:
+            c.invoke(
+                "logs",
+                "--no-color",
+                "--timestamps",
+                "--tail",
+                "20",
+                "mz_1",
+                "mz_2",
+            )
+            print("Error in scenario", e)
     elif execution_mode is ExecutionMode.ONEATATIME:
         for check in checks:
             print(
@@ -331,13 +395,38 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
             c.override_current_testcase_name(
                 f"Check '{check}' with scenario '{scenario_class}'"
             )
-            setup(c, args.external_blob_store)
-            scenario = scenario_class(
-                checks=[check],
-                executor=executor,
-                features=features,
-                seed=args.seed,
-            )
-            scenario.run()
+            try:
+                setup(c, args.external_blob_store)
+                scenario = scenario_class(
+                    checks=[check],
+                    executor=executor,
+                    features=features,
+                    seed=args.seed,
+                )
+                scenario.run()
+            except Exception as e:
+                c.invoke(
+                    "logs",
+                    "--no-color",
+                    "--timestamps",
+                    "--tail",
+                    "20",
+                    "mz_1",
+                    "mz_2",
+                )
+                print(
+                    f"Error: {e}"
+                    + (
+                        f" (version: {executor.current_mz_version})"
+                        if hasattr(executor, "current_mz_version")
+                        else ""
+                    )
+                )
+            finally:
+                if args.teardown == "True":
+                    fast_teardown(c)
+
     else:
         raise RuntimeError(f"Unsupported execution mode: {execution_mode}")
+    if args.teardown == "True":
+        fast_teardown(c)