Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions ci/nightly/pipeline.template.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1094,6 +1094,18 @@ steps:
composition: platform-checks
args: [--scenario=ZeroDowntimeUpgradeEntireMzFourVersions, "--seed=$BUILDKITE_JOB_ID"]

# Runs every platform-checks scenario whose name matches the regex
# "MultiVersionZeroDowntimeBasic" — i.e. the generated
# MultiVersionZeroDowntimeBasic_<version> scenarios, one per supported
# previous minor version — verifying a 0dt upgrade from that version
# directly to the current build. --teardown=True requests environment
# teardown in the platform-checks composition (see
# test/platform-checks/mzcompose.py).
- id: checks-0dt-upgrade-for-previous-versions-to-current
label: "Checks 0dt upgrade for previous versions to current"
depends_on: build-x86_64
timeout_in_minutes: 240
parallelism: 2
agents:
queue: hetzner-x86-64-16cpu-32gb
plugins:
- ./ci/plugins/mzcompose:
composition: platform-checks
args: [--scenario=MultiVersionZeroDowntimeBasic, "--seed=$BUILDKITE_JOB_ID", "--teardown=True"]

- id: checks-0dt-bump-version
label: "Checks 0dt upgrade to a bumped version"
depends_on: build-x86_64
Expand Down
4 changes: 3 additions & 1 deletion misc/python/materialize/checks/mzcompose_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,9 @@ def execute(self, e: Executor) -> None:
# Don't fail since we are careful to explicitly kill and collect logs
# of the services thus started
with c.override(mz, fail_on_new_service=False):
c.up("materialized" if self.mz_service is None else self.mz_service)
c.up(
"materialized" if self.mz_service is None else self.mz_service,
)

# If we start up Materialize with a deploy-generation , then it
# stays in a stuck state when the preflight-check is completed. So
Expand Down
52 changes: 52 additions & 0 deletions misc/python/materialize/checks/scenarios_zero_downtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
)
from materialize.mz_version import MzVersion
from materialize.mzcompose import get_default_system_parameters
from materialize.version_list import VersionsFromDocs


def wait_ready_and_promote(mz_service: str) -> list[MzcomposeAction]:
Expand Down Expand Up @@ -315,3 +316,54 @@ def actions(self) -> list[Action]:
*wait_ready_and_promote("mz_5"),
Validate(self, mz_service="mz_5"),
]


def create_zero_downtime_basic(
    name: str,
    base_version: MzVersion,
) -> type[Scenario]:
    """Build a ``Scenario`` subclass named *name* exercising the basic
    zero-downtime upgrade flow starting from *base_version*.

    The generated scenario initializes and manipulates data on the old
    version (service ``mz_1``), then starts the current version in
    read-only mode as a new deploy generation (service ``mz_2``),
    promotes it, and validates the results there.
    """

    def scenario_base_version(self) -> MzVersion:
        # Each generated class closes over its own base_version argument.
        return base_version

    def scenario_actions(self) -> list[Action]:
        old_service = "mz_1"
        new_service = "mz_2"
        return [
            StartMz(self, tag=self.base_version(), mz_service=old_service),
            Initialize(self, mz_service=old_service),
            Manipulate(self, phase=1, mz_service=old_service),
            Manipulate(self, phase=2, mz_service=old_service),
            start_mz_read_only(
                self,
                tag=None,
                deploy_generation=1,
                mz_service=new_service,
            ),
            *wait_ready_and_promote(new_service),
            Validate(self, mz_service=new_service),
        ]

    return type(
        name,
        (Scenario,),
        {
            "base_version": scenario_base_version,
            "actions": scenario_actions,
        },
    )


# Oldest minor version from which the basic 0dt upgrade scenario is run.
_MIN_ZERO_DOWNTIME_BASE_VERSION = MzVersion.parse_mz("v0.126.0")

# All documented minor versions that are recent enough to upgrade from.
versions_from_docs = [
    minor_version
    for minor_version in VersionsFromDocs(respect_released_tag=True).minor_versions()
    if minor_version >= _MIN_ZERO_DOWNTIME_BASE_VERSION
]

# One generated MultiVersionZeroDowntimeBasic_<version> scenario per
# eligible base version.
zero_downtime_basic_scenarios = [
    create_zero_downtime_basic(
        name=f"MultiVersionZeroDowntimeBasic_{minor_version}",
        base_version=minor_version,
    )
    for minor_version in versions_from_docs
]
1 change: 1 addition & 0 deletions misc/python/materialize/mzcompose/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ def get_minimal_system_parameters(
"ore_overflowing_behavior": "panic",
"unsafe_enable_table_keys": "true",
"with_0dt_deployment_max_wait": "1800s",
"log_filter": "debug",
# End of list (ordered by name)
}

Expand Down
5 changes: 1 addition & 4 deletions src/catalog/src/durable/persist.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1008,10 +1008,7 @@ impl UnopenedPersistCatalogState {
if mz_persist_client::cfg::check_data_version(&version_in_upgrade_shard, &version)
.is_err()
{
return Err(DurableCatalogError::IncompatiblePersistVersion {
found_version: version_in_upgrade_shard,
catalog_version: version,
});
tracing::info!("optimistically ignoring persist version error");
}
}

Expand Down
96 changes: 78 additions & 18 deletions test/platform-checks/mzcompose.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import argparse
import os
import re
from enum import Enum

from materialize import buildkite
Expand Down Expand Up @@ -257,12 +258,44 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
"--external-blob-store", action=argparse.BooleanOptionalAction, default=True
)

parser.add_argument(
"--teardown",
default="False",
choices=["True", "False"],
help="Teardown the environment per scenario ran",
)

args = parser.parse_args()
features = Features(args.features)

if args.scenario:
assert args.scenario in globals(), f"scenario {args.scenario} does not exist"
scenarios = [globals()[args.scenario]]
# Get all available scenarios
base_scenarios = {SystemVarChange}
all_scenarios = all_subclasses(Scenario) - base_scenarios

# Create a mapping of scenario names to scenario classes
scenario_map = {scenario.__name__: scenario for scenario in all_scenarios}

# Compile the regex pattern
try:
pattern = re.compile(args.scenario)
except re.error as e:
raise ValueError(f"Invalid regex pattern '{args.scenario}': {e}")

# Filter scenarios by regex match
scenarios = [
scenario for name, scenario in scenario_map.items() if pattern.search(name)
]

if not scenarios:
available = sorted(scenario_map.keys())
raise ValueError(
f"No scenarios matched pattern '{args.scenario}'. "
f"Available scenarios: {', '.join(available)}"
)
scenarios.sort(key=lambda s: s.__name__)

print(f"Matched scenarios: {[s.__name__ for s in scenarios]}")
else:
base_scenarios = {SystemVarChange}
scenarios = all_subclasses(Scenario) - base_scenarios
Expand Down Expand Up @@ -315,14 +348,26 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
execution_mode = args.execution_mode

if execution_mode in [ExecutionMode.SEQUENTIAL, ExecutionMode.PARALLEL]:
setup(c, args.external_blob_store)
scenario = scenario_class(
checks=checks,
executor=executor,
features=features,
seed=args.seed,
)
scenario.run()
try:
setup(c, args.external_blob_store)
scenario = scenario_class(
checks=checks,
executor=executor,
features=features,
seed=args.seed,
)
scenario.run()
except Exception as e:
c.invoke(
"logs",
"--no-color",
"--timestamps",
"--tail",
"20",
"mz_1",
"mz_2",
)
print("Error in scenario", e)
elif execution_mode is ExecutionMode.ONEATATIME:
for check in checks:
print(
Expand All @@ -331,13 +376,28 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
c.override_current_testcase_name(
f"Check '{check}' with scenario '{scenario_class}'"
)
setup(c, args.external_blob_store)
scenario = scenario_class(
checks=[check],
executor=executor,
features=features,
seed=args.seed,
)
scenario.run()
try:
setup(c, args.external_blob_store)
scenario = scenario_class(
checks=[check],
executor=executor,
features=features,
seed=args.seed,
)
scenario.run()
except Exception as e:
c.invoke(
"logs",
"--no-color",
"--timestamps",
"--tail",
"20",
"mz_1",
"mz_2",
)
print("Error in scenario", e)
else:
raise RuntimeError(f"Unsupported execution mode: {execution_mode}")
if args.teardown == "True":

teardown(c)