Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 159 additions & 0 deletions src/cloud-resources/src/crd/materialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,71 @@ pub mod v1alpha1 {
}
}

/// This check isn't strictly required since environmentd will still be able to determine
/// if the upgrade is allowed or not. However, doing this check allows us to provide
/// the error as soon as possible and in a more user friendly way.
pub fn is_valid_upgrade_version(active_version: &Version, next_version: &Version) -> bool {
// Don't allow rolling back
// Note: semver comparison handles RC versions correctly:
// v26.0.0-rc.1 < v26.0.0-rc.2 < v26.0.0
if next_version < active_version {
return false;
}

if active_version.major == 0 {
// Self managed 25.2 to 26.0
if next_version.major != active_version.major {
if next_version.major == 26
&& active_version.major == 0
&& active_version.minor == 164
{
return true;
} else {
return false;
}
}
// Self managed 25.1 to 25.2
if next_version.minor == 147 && active_version.minor == 130 {
return true;
}
// only allow upgrading a single minor version at a time
return next_version.minor <= active_version.minor + 1;
} else if active_version.major >= 26 {
// For versions 26.X.X and onwards, we deny upgrades past 1 major version of the active version
return next_version.major <= active_version.major + 1;
}

true
}

/// Checks if the current environmentd image ref is within the upgrade window of the last
/// successful rollout.
pub fn within_upgrade_window(&self) -> bool {
let active_environmentd_version = self
.status
.as_ref()
.and_then(|status| {
status
.last_completed_rollout_environmentd_image_ref
.as_ref()
})
.and_then(|image_ref| parse_image_ref(image_ref));

if let (Some(next_environmentd_version), Some(active_environmentd_version)) = (
parse_image_ref(&self.spec.environmentd_image_ref),
active_environmentd_version,
) {
Self::is_valid_upgrade_version(
&active_environmentd_version,
&next_environmentd_version,
)
} else {
// If we fail to parse either version,
// we still allow the upgrade since environmentd will still error if the upgrade is not allowed.
true
}
}

pub fn managed_resource_meta(&self, name: String) -> ObjectMeta {
ObjectMeta {
namespace: Some(self.namespace()),
Expand Down Expand Up @@ -516,6 +581,11 @@ pub mod v1alpha1 {
.expect("valid int generation");
}

// Initialize the last completed rollout environmentd image ref to
// the current image ref if not already set.
status.last_completed_rollout_environmentd_image_ref =
Some(self.spec.environmentd_image_ref.clone());

status
})
}
Expand All @@ -530,6 +600,10 @@ pub mod v1alpha1 {
pub active_generation: u64,
/// The UUID of the last successfully completed rollout.
pub last_completed_rollout_request: Uuid,
/// The image ref of the environmentd image that was last successfully rolled out.
/// Used to deny upgrades past 1 major version from the last successful rollout.
/// When None, we upgrade anyways.
pub last_completed_rollout_environmentd_image_ref: Option<String>,
/// A hash calculated from the spec of resources to be created based on this Materialize
/// spec. This is used for detecting when the existing resources are up to date.
/// If you want to trigger a rollout without making other changes that would cause this
Expand Down Expand Up @@ -630,4 +704,89 @@ mod tests {
mz.spec.environmentd_image_ref = "my.private.registry:5000:v0.33.3".to_owned();
assert!(!mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
}

#[mz_ore::test]
fn within_upgrade_window() {
use super::v1alpha1::MaterializeStatus;

let mut mz = Materialize {
spec: MaterializeSpec {
environmentd_image_ref: "materialize/environmentd:v26.0.0".to_owned(),
..Default::default()
},
metadata: ObjectMeta {
..Default::default()
},
status: Some(MaterializeStatus {
last_completed_rollout_environmentd_image_ref: Some(
"materialize/environmentd:v26.0.0".to_owned(),
),
..Default::default()
}),
};

// Pass: upgrading from 26.0.0 to 27.7.3 (within 1 major version)
mz.spec.environmentd_image_ref = "materialize/environmentd:v27.7.3".to_owned();
assert!(mz.within_upgrade_window());

// Pass: upgrading from 26.0.0 to 27.7.8-dev.0 (within 1 major version, pre-release)
mz.spec.environmentd_image_ref = "materialize/environmentd:v27.7.8-dev.0".to_owned();
assert!(mz.within_upgrade_window());

// Fail: upgrading from 26.0.0 to 28.0.1 (more than 1 major version)
mz.spec.environmentd_image_ref = "materialize/environmentd:v28.0.1".to_owned();
assert!(!mz.within_upgrade_window());

// Pass: upgrading from 26.0.0 to 28.0.1.not_a_valid_version (invalid version, defaults to true)
mz.spec.environmentd_image_ref =
"materialize/environmentd:v28.0.1.not_a_valid_version".to_owned();
assert!(mz.within_upgrade_window());

// Pass: upgrading from 0.164.0 to 26.1.0 (self managed 25.2 to 26.0)
mz.status
.as_mut()
.unwrap()
.last_completed_rollout_environmentd_image_ref =
Some("materialize/environmentd:v0.164.0".to_owned());
mz.spec.environmentd_image_ref = "materialize/environmentd:v26.1.0".to_owned();
assert!(mz.within_upgrade_window());
}

#[mz_ore::test]
fn is_valid_upgrade_version() {
let success_tests = [
(Version::new(0, 83, 0), Version::new(0, 83, 0)),
(Version::new(0, 83, 0), Version::new(0, 84, 0)),
(Version::new(0, 9, 0), Version::new(0, 10, 0)),
(Version::new(0, 99, 0), Version::new(0, 100, 0)),
(Version::new(0, 83, 0), Version::new(0, 83, 1)),
(Version::new(0, 83, 0), Version::new(0, 83, 2)),
(Version::new(0, 83, 2), Version::new(0, 83, 10)),
(Version::new(0, 164, 0), Version::new(26, 0, 0)),
(Version::new(26, 0, 0), Version::new(26, 1, 0)),
(Version::new(26, 5, 3), Version::new(26, 10, 0)),
(Version::new(0, 130, 0), Version::new(0, 147, 0)),
];
for (active_version, next_version) in success_tests {
assert!(
Materialize::is_valid_upgrade_version(&active_version, &next_version),
"v{active_version} can upgrade to v{next_version}"
);
}

let failure_tests = [
(Version::new(0, 83, 0), Version::new(0, 82, 0)),
(Version::new(0, 83, 3), Version::new(0, 83, 2)),
(Version::new(0, 83, 3), Version::new(1, 83, 3)),
(Version::new(0, 83, 0), Version::new(0, 85, 0)),
(Version::new(26, 0, 0), Version::new(28, 0, 0)),
(Version::new(0, 130, 0), Version::new(26, 1, 0)),
];
for (active_version, next_version) in failure_tests {
assert!(
!Materialize::is_valid_upgrade_version(&active_version, &next_version),
"v{active_version} can't upgrade to v{next_version}"
);
}
}
}
47 changes: 46 additions & 1 deletion src/orchestratord/src/controller/materialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,9 @@ impl Context {
MaterializeStatus {
active_generation: desired_generation,
last_completed_rollout_request: mz.requested_reconciliation_id(),
last_completed_rollout_environmentd_image_ref: Some(
mz.spec.environmentd_image_ref.clone(),
),
resource_id: mz.status().resource_id,
resources_hash,
conditions: vec![Condition {
Expand Down Expand Up @@ -591,7 +594,7 @@ impl k8s_controller::Context for Context {
)
.await
}
// There are changes pending, and we want to appy them.
// There are changes pending, and we want to apply them.
(false, true, true) => {
// we remove the environment resources hash annotation here
// because if we fail halfway through applying the resources,
Expand All @@ -615,6 +618,8 @@ impl k8s_controller::Context for Context {
// we fail later on, we want to ensure that the
// rollout gets retried.
last_completed_rollout_request: status.last_completed_rollout_request,
last_completed_rollout_environmentd_image_ref: status
.last_completed_rollout_environmentd_image_ref,
resource_id: status.resource_id,
resources_hash: String::new(),
conditions: vec![Condition {
Expand All @@ -634,6 +639,38 @@ impl k8s_controller::Context for Context {
let mz = &mz;
let status = mz.status();

if !mz.within_upgrade_window() {
let last_completed_rollout_environmentd_image_ref =
status.last_completed_rollout_environmentd_image_ref;

self.update_status(
&mz_api,
mz,
MaterializeStatus {
active_generation,
last_completed_rollout_request: status.last_completed_rollout_request,
last_completed_rollout_environmentd_image_ref: last_completed_rollout_environmentd_image_ref.clone(),
resource_id: status.resource_id,
resources_hash: status.resources_hash,
conditions: vec![Condition {
type_: "UpToDate".into(),
status: "False".into(),
last_transition_time: Time(chrono::offset::Utc::now()),
message: format!(
"Refusing to upgrade from {} to {}. More than one major version from last successful rollout.",
last_completed_rollout_environmentd_image_ref.expect("should be set if upgrade window check fails"),
&mz.spec.environmentd_image_ref,
),
observed_generation: mz.meta().generation,
reason: "FailedDeploy".into(),
}],
},
active_generation != desired_generation,
)
.await?;
return Ok(None);
}

if mz.spec.rollout_strategy
== MaterializeRolloutStrategy::ImmediatelyPromoteCausingDowntime
{
Expand Down Expand Up @@ -673,6 +710,8 @@ impl k8s_controller::Context for Context {
// rollout gets retried.
last_completed_rollout_request: status
.last_completed_rollout_request,
last_completed_rollout_environmentd_image_ref: status
.last_completed_rollout_environmentd_image_ref,
resource_id: status.resource_id,
resources_hash: resources_hash.clone(),
conditions: vec![Condition {
Expand Down Expand Up @@ -710,6 +749,8 @@ impl k8s_controller::Context for Context {
// the rollout and we want to ensure it gets
// retried.
last_completed_rollout_request: status.last_completed_rollout_request,
last_completed_rollout_environmentd_image_ref: status
.last_completed_rollout_environmentd_image_ref,
resource_id: status.resource_id,
resources_hash: status.resources_hash,
conditions: vec![Condition {
Expand Down Expand Up @@ -746,6 +787,8 @@ impl k8s_controller::Context for Context {
MaterializeStatus {
active_generation,
last_completed_rollout_request: mz.requested_reconciliation_id(),
last_completed_rollout_environmentd_image_ref: status
.last_completed_rollout_environmentd_image_ref,
resource_id: status.resource_id,
resources_hash: status.resources_hash,
conditions: vec![Condition {
Expand Down Expand Up @@ -786,6 +829,8 @@ impl k8s_controller::Context for Context {
MaterializeStatus {
active_generation,
last_completed_rollout_request: mz.requested_reconciliation_id(),
last_completed_rollout_environmentd_image_ref: status
.last_completed_rollout_environmentd_image_ref,
resource_id: status.resource_id,
resources_hash: status.resources_hash,
conditions: vec![Condition {
Expand Down