diff --git a/misc/helm-charts/operator/README.md b/misc/helm-charts/operator/README.md index 4cbe9277b035e..6cf6ae48569d9 100644 --- a/misc/helm-charts/operator/README.md +++ b/misc/helm-charts/operator/README.md @@ -287,7 +287,9 @@ helm upgrade my-materialize-operator materialize/misc/helm-charts/operator -f my To upgrade your Materialize instances, you'll need to update the Materialize custom resource and trigger a rollout. -By default, the operator performs rolling upgrades (`inPlaceRollout: false`) which minimize downtime but require additional Kubernetes cluster resources during the transition. However, keep in mind that rolling upgrades typically take longer to complete due to the sequential rollout process. For environments where downtime is acceptable, you can opt for in-place upgrades (`inPlaceRollout: true`). +By default, the operator performs rolling upgrades (`rolloutStrategy: WaitUntilReady`), which minimize downtime but require additional Kubernetes cluster resources during the transition. + +For environments that do not have enough capacity for the `WaitUntilReady` strategy, and where downtime is acceptable, there is the `ImmediatelyPromoteCausingDowntime` strategy. This strategy causes downtime and is not recommended; if you think you need it, please reach out to Materialize engineering to discuss your situation. #### Determining the Version @@ -350,7 +352,6 @@ spec: environmentdImageRef: materialize/environmentd:v0.147.0 # Update version as needed requestRollout: 22222222-2222-2222-2222-222222222222 # Generate new UUID forceRollout: 33333333-3333-3333-3333-333333333333 # Optional: for forced rollouts - inPlaceRollout: false # When false, performs a rolling upgrade rather than in-place backendSecretName: materialize-backend ``` @@ -371,10 +372,6 @@ kubectl patch materialize \ -p "{\"spec\": {\"requestRollout\": \"$(uuidgen)\", \"forceRollout\": \"$(uuidgen)\"}}" ``` -The behavior of a forced rollout follows your `inPlaceRollout` setting: -- With `inPlaceRollout: false` (default): Creates new instances before terminating the old ones, temporarily requiring twice the resources during the transition -- With `inPlaceRollout: true`: Directly replaces the instances, causing downtime but without requiring additional resources - ### Verifying the Upgrade After initiating the rollout, you can monitor the status: @@ -392,9 +389,6 @@ kubectl logs -l app.kubernetes.io/name=materialize-operator -n materialize - `requestRollout` triggers a rollout only if there are actual changes to the instance (like image updates) - `forceRollout` triggers a rollout regardless of whether there are changes, which can be useful for debugging or when you need to force a rollout for other reasons - Both fields expect UUID values and each rollout requires a new, unique UUID value -- `inPlaceRollout`: - - When `false` (default): Performs a rolling upgrade by spawning new instances before terminating old ones. While this minimizes downtime, there may still be a brief interruption during the transition. - - When `true`: Directly replaces existing instances, which will cause downtime.
# Operational Guidelines diff --git a/misc/helm-charts/operator/README.md.gotmpl b/misc/helm-charts/operator/README.md.gotmpl index 97e12cebfb32e..3c9343e94c891 100644 --- a/misc/helm-charts/operator/README.md.gotmpl +++ b/misc/helm-charts/operator/README.md.gotmpl @@ -228,7 +228,9 @@ helm upgrade my-materialize-operator materialize/misc/helm-charts/operator -f my To upgrade your Materialize instances, you'll need to update the Materialize custom resource and trigger a rollout. -By default, the operator performs rolling upgrades (`inPlaceRollout: false`) which minimize downtime but require additional Kubernetes cluster resources during the transition. However, keep in mind that rolling upgrades typically take longer to complete due to the sequential rollout process. For environments where downtime is acceptable, you can opt for in-place upgrades (`inPlaceRollout: true`). +By default, the operator performs rolling upgrades (`rolloutStrategy: WaitUntilReady`), which minimize downtime but require additional Kubernetes cluster resources during the transition. + +For environments that do not have enough capacity for the `WaitUntilReady` strategy, and where downtime is acceptable, there is the `ImmediatelyPromoteCausingDowntime` strategy. This strategy causes downtime and is not recommended; if you think you need it, please reach out to Materialize engineering to discuss your situation. #### Determining the Version @@ -291,7 +293,6 @@ spec: environmentdImageRef: materialize/environmentd:v0.147.0 # Update version as needed requestRollout: 22222222-2222-2222-2222-222222222222 # Generate new UUID forceRollout: 33333333-3333-3333-3333-333333333333 # Optional: for forced rollouts - inPlaceRollout: false # When false, performs a rolling upgrade rather than in-place backendSecretName: materialize-backend ``` @@ -312,10 +313,6 @@ kubectl patch materialize \ -p "{\"spec\": {\"requestRollout\": \"$(uuidgen)\", \"forceRollout\": \"$(uuidgen)\"}}" ``` -The behavior of a forced rollout follows your `inPlaceRollout` setting: -- With `inPlaceRollout: false` (default): Creates new instances before terminating the old ones, temporarily requiring twice the resources during the transition -- With `inPlaceRollout: true`: Directly replaces the instances, causing downtime but without requiring additional resources - ### Verifying the Upgrade After initiating the rollout, you can monitor the status: @@ -333,9 +330,6 @@ kubectl logs -l app.kubernetes.io/name=materialize-operator -n materialize - `requestRollout` triggers a rollout only if there are actual changes to the instance (like image updates) - `forceRollout` triggers a rollout regardless of whether there are changes, which can be useful for debugging or when you need to force a rollout for other reasons - Both fields expect UUID values and each rollout requires a new, unique UUID value -- `inPlaceRollout`: - - When `false` (default): Performs a rolling upgrade by spawning new instances before terminating old ones. While this minimizes downtime, there may still be a brief interruption during the transition. - - When `true`: Directly replaces existing instances, which will cause downtime.
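To make the documented workflow concrete, here is an illustrative sketch (not part of this PR) of the merge patch that the README's `kubectl patch` example sends, extended with the new `rolloutStrategy` field. The field names are taken from the README above; actually sending the patch (via kubectl or a kube-rs client) is left out.

```rust
// Illustrative only: builds the merge patch from the README's
// `kubectl patch` example, plus the new `rolloutStrategy` field.
// deps (illustrative): serde_json, uuid (with the `v4` feature).
use serde_json::json;
use uuid::Uuid;

fn main() {
    let patch = json!({
        "spec": {
            // Every rollout request needs a new, unique UUID.
            "requestRollout": Uuid::new_v4().to_string(),
            // Optional: set forceRollout to roll out even without changes.
            "forceRollout": Uuid::new_v4().to_string(),
            // The default strategy; use ImmediatelyPromoteCausingDowntime only
            // after consulting Materialize engineering.
            "rolloutStrategy": "WaitUntilReady"
        }
    });
    // Equivalent to: kubectl patch materialize <name> --type='merge' -p '<this JSON>'
    println!("{}", serde_json::to_string_pretty(&patch).unwrap());
}
```

Omitting `rolloutStrategy` keeps the default `WaitUntilReady` behavior, since the CRD field introduced below carries `#[serde(default)]`.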
# Operational Guidelines diff --git a/src/cloud-resources/src/crd/materialize.rs b/src/cloud-resources/src/crd/materialize.rs index 71b39fc9c0d4d..2102123926ece 100644 --- a/src/cloud-resources/src/crd/materialize.rs +++ b/src/cloud-resources/src/crd/materialize.rs @@ -58,6 +58,30 @@ pub mod v1alpha1 { // Additional annotations and labels to include in the Certificate object. pub secret_template: Option, } + #[derive(Clone, Debug, PartialEq, Deserialize, Serialize, JsonSchema)] + pub enum MaterializeRolloutStrategy { + // Default. Create a new generation of pods, leaving the old generation around until the + // new ones are ready to take over. + // This minimizes downtime, and is what almost everyone should use. + WaitUntilReady, + + // WARNING!!! + // THIS WILL CAUSE YOUR MATERIALIZE INSTANCE TO BE UNAVAILABLE FOR SOME TIME!!! + // WARNING!!! + // + // Tear down the old generation of pods and promote the new generation of pods immediately, + // without waiting for the new generation of pods to be ready. + // + // This strategy should ONLY be used by customers with physical hardware who do not have + // enough hardware for the WaitUntilReady strategy. If you think you want this, please + // consult with Materialize engineering to discuss your situation. + ImmediatelyPromoteCausingDowntime, + } + impl Default for MaterializeRolloutStrategy { + fn default() -> Self { + Self::WaitUntilReady + } + } #[derive( CustomResource, Clone, Debug, Default, PartialEq, Deserialize, Serialize, JsonSchema, @@ -147,11 +171,12 @@ pub mod v1alpha1 { // even without making any meaningful changes. #[serde(default)] pub force_rollout: Uuid, - // If false (the default), orchestratord will use the leader - // promotion codepath to minimize downtime during rollouts. If true, - // it will just kill the environmentd pod directly. + // Deprecated and ignored. Use rollout_strategy instead. #[serde(default)] pub in_place_rollout: bool, + // Rollout strategy to use when upgrading this Materialize instance. + #[serde(default)] + pub rollout_strategy: MaterializeRolloutStrategy, // The name of a secret containing metadata_backend_url and persist_backend_url. // It may also contain external_login_password_mz_system, which will be used as // the password for the mz_system user if authenticator_kind is Password. 
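As a standalone illustration of the serde behavior the new field relies on (these are not the crate's actual types, and the camelCase renaming on the spec is assumed from the `rolloutStrategy` spelling used in the README):

```rust
// Sketch only: mirrors the defaulting behavior of the new CRD field.
// deps (illustrative): serde (derive), serde_yaml.
use serde::Deserialize;

#[derive(Debug, Default, PartialEq, Deserialize)]
enum RolloutStrategy {
    #[default]
    WaitUntilReady,
    ImmediatelyPromoteCausingDowntime,
}

#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct SpecFragment {
    #[serde(default)]
    rollout_strategy: RolloutStrategy,
}

fn main() {
    // Omitting the field falls back to the default, WaitUntilReady.
    let defaulted: SpecFragment = serde_yaml::from_str("{}").unwrap();
    assert_eq!(defaulted.rollout_strategy, RolloutStrategy::WaitUntilReady);

    // An explicit value parses to the matching variant.
    let explicit: SpecFragment =
        serde_yaml::from_str("rolloutStrategy: ImmediatelyPromoteCausingDowntime").unwrap();
    assert_eq!(
        explicit.rollout_strategy,
        RolloutStrategy::ImmediatelyPromoteCausingDowntime
    );
}
```

Because the field defaults to `WaitUntilReady`, existing Materialize custom resources keep the low-downtime rollout behavior without modification.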
@@ -368,10 +393,6 @@ pub mod v1alpha1 { self.spec.request_rollout } - pub fn in_place_rollout(&self) -> bool { - self.spec.in_place_rollout - } - pub fn rollout_requested(&self) -> bool { self.requested_reconciliation_id() != self @@ -386,6 +407,8 @@ pub mod v1alpha1 { pub fn should_force_promote(&self) -> bool { self.spec.force_promote == self.spec.request_rollout + || self.spec.rollout_strategy + == MaterializeRolloutStrategy::ImmediatelyPromoteCausingDowntime } pub fn conditions_need_update(&self) -> bool { diff --git a/src/orchestratord/src/controller/materialize.rs b/src/orchestratord/src/controller/materialize.rs index 559b3d20d869c..deb5d8952897f 100644 --- a/src/orchestratord/src/controller/materialize.rs +++ b/src/orchestratord/src/controller/materialize.rs @@ -28,7 +28,7 @@ use uuid::Uuid; use crate::metrics::Metrics; use mz_cloud_provider::CloudProvider; use mz_cloud_resources::crd::materialize::v1alpha1::{ - Materialize, MaterializeCertSpec, MaterializeStatus, + Materialize, MaterializeCertSpec, MaterializeRolloutStrategy, MaterializeStatus, }; use mz_license_keys::validate; use mz_orchestrator_kubernetes::KubernetesImagePullPolicy; @@ -431,8 +431,7 @@ impl k8s_controller::Context for Context { let has_current_changes = status.resources_hash != active_resources.generate_hash(); let active_generation = status.active_generation; let next_generation = active_generation + 1; - let increment_generation = has_current_changes && !mz.in_place_rollout(); - let desired_generation = if increment_generation { + let desired_generation = if has_current_changes { next_generation } else { active_generation @@ -449,8 +448,9 @@ impl k8s_controller::Context for Context { ); let resources_hash = resources.generate_hash(); - let mut result = if has_current_changes { - if mz.rollout_requested() { + let mut result = match (has_current_changes, mz.rollout_requested()) { + // There are changes pending, and we want to apply them. + (true, true) => { // we remove the environment resources hash annotation here // because if we fail halfway through applying the resources, // things will be in an inconsistent state, and we don't want @@ -492,14 +492,20 @@ impl k8s_controller::Context for Context { let mz = &mz; let status = mz.status(); + if mz.spec.rollout_strategy + == MaterializeRolloutStrategy::ImmediatelyPromoteCausingDowntime + { + // The only reason someone would choose this strategy is if they didn't have + // space for the two generations of pods. + // Let's make room for the new ones by deleting the old generation. + resources + .teardown_generation(&client, mz, active_generation) + .await?; + } + trace!("applying environment resources"); match resources - .apply( - &client, - increment_generation, - mz.should_force_promote(), - &mz.namespace(), - ) + .apply(&client, mz.should_force_promote(), &mz.namespace()) .await { Ok(Some(action)) => { @@ -510,12 +516,12 @@ impl k8s_controller::Context for Context { // do this last, so that we keep traffic pointing at // the previous environmentd until the new one is // fully ready + // TODO add condition saying we're about to promote, + // and check it before aborting anything.
resources.promote_services(&client, &mz.namespace()).await?; - if increment_generation { - resources - .teardown_generation(&client, mz, active_generation) - .await?; - } + resources + .teardown_generation(&client, mz, active_generation) + .await?; self.update_status( &mz_api, mz, @@ -578,7 +584,9 @@ impl k8s_controller::Context for Context { Err(e) } } - } else { + } + // There are changes pending, but we don't want to apply them yet. + (true, false) => { let mut needs_update = mz.conditions_need_update(); if mz.update_in_progress() { resources @@ -613,44 +621,46 @@ impl k8s_controller::Context for Context { debug!("changes detected, waiting for approval"); Ok(None) } - } else { - // this can happen if we update the environment, but then revert - // that update before the update was deployed. in this case, we - // don't want the environment to still show up as - // WaitingForApproval. - let mut needs_update = mz.conditions_need_update() || mz.rollout_requested(); - if mz.update_in_progress() { - resources - .teardown_generation(&client, mz, next_generation) + // No changes pending, but we might need to clean up a partially applied rollout. + (false, _) => { + // this can happen if we update the environment, but then revert + // that update before the update was deployed. in this case, we + // don't want the environment to still show up as + // WaitingForApproval. + let mut needs_update = mz.conditions_need_update() || mz.rollout_requested(); + if mz.update_in_progress() { + resources + .teardown_generation(&client, mz, next_generation) + .await?; + needs_update = true; + } + if needs_update { + self.update_status( + &mz_api, + mz, + MaterializeStatus { + active_generation, + last_completed_rollout_request: mz.requested_reconciliation_id(), + resource_id: status.resource_id, + resources_hash: status.resources_hash, + conditions: vec![Condition { + type_: "UpToDate".into(), + status: "True".into(), + last_transition_time: Time(chrono::offset::Utc::now()), + message: format!( + "No changes found from generation {active_generation}" + ), + observed_generation: mz.meta().generation, + reason: "Applied".into(), + }], + }, + active_generation != desired_generation, + ) .await?; - needs_update = true; - } - if needs_update { - self.update_status( - &mz_api, - mz, - MaterializeStatus { - active_generation, - last_completed_rollout_request: mz.requested_reconciliation_id(), - resource_id: status.resource_id, - resources_hash: status.resources_hash, - conditions: vec![Condition { - type_: "UpToDate".into(), - status: "True".into(), - last_transition_time: Time(chrono::offset::Utc::now()), - message: format!( - "No changes found from generation {active_generation}" - ), - observed_generation: mz.meta().generation, - reason: "Applied".into(), - }], - }, - active_generation != desired_generation, - ) - .await?; + } + debug!("no changes"); + Ok(None) } - debug!("no changes"); - Ok(None) }; // balancers rely on the environmentd service existing, which is diff --git a/src/orchestratord/src/controller/materialize/environmentd.rs b/src/orchestratord/src/controller/materialize/environmentd.rs index 5784b03442a02..c7effa59eca33 100644 --- a/src/orchestratord/src/controller/materialize/environmentd.rs +++ b/src/orchestratord/src/controller/materialize/environmentd.rs @@ -20,7 +20,7 @@ use k8s_openapi::{ apps::v1::{StatefulSet, StatefulSetSpec, StatefulSetUpdateStrategy}, core::v1::{ Capabilities, ConfigMap, ConfigMapVolumeSource, Container, ContainerPort, EnvVar, - EnvVarSource, KeyToPath, Pod, 
PodSecurityContext, PodSpec, PodTemplateSpec, Probe, + EnvVarSource, KeyToPath, PodSecurityContext, PodSpec, PodTemplateSpec, Probe, SeccompProfile, Secret, SecretKeySelector, SecretVolumeSource, SecurityContext, Service, ServiceAccount, ServicePort, ServiceSpec, TCPSocketAction, Toleration, Volume, VolumeMount, @@ -171,7 +171,6 @@ impl Resources { pub async fn apply( &self, client: &Client, - increment_generation: bool, force_promote: bool, namespace: &str, ) -> Result, anyhow::Error> { @@ -183,7 +182,6 @@ impl Resources { let role_api: Api = Api::namespaced(client.clone(), namespace); let role_binding_api: Api = Api::namespaced(client.clone(), namespace); let statefulset_api: Api = Api::namespaced(client.clone(), namespace); - let pod_api: Api = Api::namespaced(client.clone(), namespace); let certificate_api: Api = Api::namespaced(client.clone(), namespace); let configmap_api: Api = Api::namespaced(client.clone(), namespace); @@ -220,195 +218,166 @@ impl Resources { trace!("creating new environmentd statefulset"); apply_resource(&statefulset_api, &*self.environmentd_statefulset).await?; - // until we have full zero downtime upgrades, we have a tradeoff: if - // we use the graceful upgrade mechanism, we minimize environmentd - // unavailability but require a full clusterd rehydration every time, - // and if we use the in-place upgrade mechanism, we cause a few - // minutes of environmentd downtime, but as long as the environmentd - // version didn't change, the existing clusterds will remain running - // and won't need to rehydrate. during a version bump, the tradeoff - // here is obvious (we need to rehydrate either way, so minimizing - // environmentd downtime in the meantime is strictly better), but if - // we need to force a rollout some other time (for instance, to - // increase the environmentd memory request, or something like that), - // it is often better to accept the environmentd unavailability in - // order to get the environment as a whole back to a working state - // sooner. once clusterd rehydration gets moved ahead of the leader - // promotion step, this will no longer make a difference and we can - // remove the extra codepath. 
- if increment_generation { - let retry_action = Action::requeue(Duration::from_secs(thread_rng().gen_range(5..10))); - - let statefulset = get_resource( - &statefulset_api, - &self.environmentd_statefulset.name_unchecked(), - ) - .await?; - if statefulset - .and_then(|statefulset| statefulset.status) - .and_then(|status| status.ready_replicas) - .unwrap_or(0) - == 0 - { - trace!("environmentd statefulset is not ready yet..."); - return Ok(Some(retry_action)); - } + let retry_action = Action::requeue(Duration::from_secs(thread_rng().gen_range(5..10))); + + let statefulset = get_resource( + &statefulset_api, + &self.environmentd_statefulset.name_unchecked(), + ) + .await?; + if statefulset + .and_then(|statefulset| statefulset.status) + .and_then(|status| status.ready_replicas) + .unwrap_or(0) + == 0 + { + trace!("environmentd statefulset is not ready yet..."); + return Ok(Some(retry_action)); + } - let http_client = match &self.connection_info.mz_system_secret_name { - Some(mz_system_secret_name) => { - let http_client = reqwest::Client::builder() - .timeout(std::time::Duration::from_secs(10)) - .cookie_store(true) - // TODO add_root_certificate instead - .danger_accept_invalid_certs(true) - .build() + let http_client = match &self.connection_info.mz_system_secret_name { + Some(mz_system_secret_name) => { + let http_client = reqwest::Client::builder() + .timeout(std::time::Duration::from_secs(10)) + .cookie_store(true) + // TODO add_root_certificate instead + .danger_accept_invalid_certs(true) + .build() + .unwrap(); + if let Some(data) = secret_api.get(mz_system_secret_name).await?.data { + if let Some(password) = data.get("external_login_password_mz_system").cloned() { + let password = String::from_utf8_lossy(&password.0).to_string(); + let login_url = reqwest::Url::parse(&format!( + "{}/api/login", + self.connection_info.environmentd_url, + )) .unwrap(); - if let Some(data) = secret_api.get(mz_system_secret_name).await?.data { - if let Some(password) = - data.get("external_login_password_mz_system").cloned() + match http_client + .post(login_url) + .body(serde_json::to_string(&LoginCredentials { + username: "mz_system".to_owned(), + password, + })?) + .header("Content-Type", "application/json") + .send() + .await { - let password = String::from_utf8_lossy(&password.0).to_string(); - let login_url = reqwest::Url::parse(&format!( - "{}/api/login", - self.connection_info.environmentd_url, - )) - .unwrap(); - match http_client - .post(login_url) - .body(serde_json::to_string(&LoginCredentials { - username: "mz_system".to_owned(), - password, - })?) - .header("Content-Type", "application/json") - .send() - .await - { - Ok(response) => { - if let Err(e) = response.error_for_status() { - trace!( - "failed to login to environmentd, retrying... ({e})" - ); - return Ok(Some(retry_action)); - } - } - Err(e) => { - trace!("failed to connect to environmentd, retrying... ({e})"); + Ok(response) => { + if let Err(e) = response.error_for_status() { + trace!("failed to login to environmentd, retrying... 
({e})"); return Ok(Some(retry_action)); } - }; - } - }; - http_client - } - None => reqwest::Client::builder() - .timeout(std::time::Duration::from_secs(10)) - .build() - .unwrap(), - }; - let status_url = reqwest::Url::parse(&format!( - "{}/api/leader/status", - self.connection_info.environmentd_url, - )) - .unwrap(); - - match http_client.get(status_url.clone()).send().await { - Ok(response) => { - let response: BTreeMap = - match response.error_for_status() { - Ok(response) => response.json().await?, + } Err(e) => { - trace!("failed to get status of environmentd, retrying... ({e})"); + trace!("failed to connect to environmentd, retrying... ({e})"); return Ok(Some(retry_action)); } }; - if force_promote { - trace!("skipping cluster catchup"); - let skip_catchup_url = reqwest::Url::parse(&format!( - "{}/api/leader/skip-catchup", - self.connection_info.environmentd_url, - )) - .unwrap(); - let response = http_client.post(skip_catchup_url).send().await?; - if response.status() == StatusCode::BAD_REQUEST { - let err: SkipCatchupError = response.json().await?; - bail!("failed to skip catchup: {}", err.message); - } - } else if response["status"] == DeploymentStatus::Initializing { - trace!("environmentd is still initializing, retrying..."); + } + }; + http_client + } + None => reqwest::Client::builder() + .timeout(std::time::Duration::from_secs(10)) + .build() + .unwrap(), + }; + let status_url = reqwest::Url::parse(&format!( + "{}/api/leader/status", + self.connection_info.environmentd_url, + )) + .unwrap(); + + match http_client.get(status_url.clone()).send().await { + Ok(response) => { + let response: BTreeMap = match response.error_for_status() + { + Ok(response) => response.json().await?, + Err(e) => { + trace!("failed to get status of environmentd, retrying... ({e})"); return Ok(Some(retry_action)); - } else { - trace!("environmentd is ready"); } - } - Err(e) => { - trace!("failed to connect to environmentd, retrying... ({e})"); + }; + if force_promote { + trace!("skipping cluster catchup"); + let skip_catchup_url = reqwest::Url::parse(&format!( + "{}/api/leader/skip-catchup", + self.connection_info.environmentd_url, + )) + .unwrap(); + let response = http_client.post(skip_catchup_url).send().await?; + if response.status() == StatusCode::BAD_REQUEST { + let err: SkipCatchupError = response.json().await?; + bail!("failed to skip catchup: {}", err.message); + } + } else if response["status"] == DeploymentStatus::Initializing { + trace!("environmentd is still initializing, retrying..."); return Ok(Some(retry_action)); + } else { + trace!("environmentd is ready"); } } + Err(e) => { + trace!("failed to connect to environmentd, retrying... ({e})"); + return Ok(Some(retry_action)); + } + } - let promote_url = reqwest::Url::parse(&format!( - "{}/api/leader/promote", - self.connection_info.environmentd_url, - )) - .unwrap(); - - // !!!!!!!!!!!!!!!!!!!!!!!!!!! WARNING !!!!!!!!!!!!!!!!!!!!!!!!!!!!!! - // It is absolutely critical that this promotion is done last! - // - // If there are any failures in this method, the error handler in - // the caller will attempt to revert and delete the new environmentd. - // After promotion, the new environmentd is active, so that would - // cause an outage! - // !!!!!!!!!!!!!!!!!!!!!!!!!!! WARNING !!!!!!!!!!!!!!!!!!!!!!!!!!!!!! 
- trace!("promoting new environmentd to leader"); - let response = http_client.post(promote_url).send().await?; - let response: BecomeLeaderResponse = match response.error_for_status() { - Ok(response) => response.json().await?, - Err(e) => { - trace!("failed to promote environmentd, retrying... ({e})"); - return Ok(Some(retry_action)); - } - }; - if let BecomeLeaderResult::Failure { message } = response.result { - bail!("failed to promote new environmentd: {message}"); + let promote_url = reqwest::Url::parse(&format!( + "{}/api/leader/promote", + self.connection_info.environmentd_url, + )) + .unwrap(); + + // !!!!!!!!!!!!!!!!!!!!!!!!!!! WARNING !!!!!!!!!!!!!!!!!!!!!!!!!!!!!! + // It is absolutely critical that this promotion is done last! + // + // If there are any failures in this method, the error handler in + // the caller will attempt to revert and delete the new environmentd. + // After promotion, the new environmentd is active, so that would + // cause an outage! + // !!!!!!!!!!!!!!!!!!!!!!!!!!! WARNING !!!!!!!!!!!!!!!!!!!!!!!!!!!!!! + trace!("promoting new environmentd to leader"); + let response = http_client.post(promote_url).send().await?; + let response: BecomeLeaderResponse = match response.error_for_status() { + Ok(response) => response.json().await?, + Err(e) => { + trace!("failed to promote environmentd, retrying... ({e})"); + return Ok(Some(retry_action)); } + }; + if let BecomeLeaderResult::Failure { message } = response.result { + bail!("failed to promote new environmentd: {message}"); + } - // A successful POST to the promotion endpoint only indicates - // that the promotion process was kicked off. It does not - // guarantee that the environment will be successfully promoted - // (e.g., if the environment crashes immediately after responding - // to the request, but before executing the takeover, the - // promotion will be lost). - // - // To guarantee the environment has been promoted successfully, - // we must wait to see at least one `IsLeader` status returned - // from the environment. - - match http_client.get(status_url.clone()).send().await { - Ok(response) => { - let response: BTreeMap = response.json().await?; - if response["status"] != DeploymentStatus::IsLeader { - trace!( - "environmentd is still promoting (status: {:?}), retrying...", - response["status"] - ); - return Ok(Some(retry_action)); - } else { - trace!("environmentd is ready"); - } - } - Err(e) => { - trace!("failed to connect to environmentd, retrying... ({e})"); + // A successful POST to the promotion endpoint only indicates + // that the promotion process was kicked off. It does not + // guarantee that the environment will be successfully promoted + // (e.g., if the environment crashes immediately after responding + // to the request, but before executing the takeover, the + // promotion will be lost). + // + // To guarantee the environment has been promoted successfully, + // we must wait to see at least one `IsLeader` status returned + // from the environment. 
+ + match http_client.get(status_url.clone()).send().await { + Ok(response) => { + let response: BTreeMap = response.json().await?; + if response["status"] != DeploymentStatus::IsLeader { + trace!( + "environmentd is still promoting (status: {:?}), retrying...", + response["status"] + ); return Ok(Some(retry_action)); + } else { + trace!("environmentd is ready"); } } - } else { - trace!("restarting environmentd pod to pick up statefulset changes"); - delete_resource( - &pod_api, - &statefulset_pod_name(&*self.environmentd_statefulset, 0), - ) - .await?; + Err(e) => { + trace!("failed to connect to environmentd, retrying... ({e})"); + return Ok(Some(retry_action)); + } } Ok(None) @@ -1773,7 +1742,3 @@ enum BecomeLeaderResult { struct SkipCatchupError { message: String, } - -fn statefulset_pod_name(statefulset: &StatefulSet, idx: u64) -> String { - format!("{}-{}", statefulset.name_unchecked(), idx) -} diff --git a/test/terraform/azure-temporary/main.tf b/test/terraform/azure-temporary/main.tf index 8b97bc404f59d..0bb85386b1549 100644 --- a/test/terraform/azure-temporary/main.tf +++ b/test/terraform/azure-temporary/main.tf @@ -132,7 +132,6 @@ variable "materialize_instances" { memory_request = optional(string, "1Gi") memory_limit = optional(string, "1Gi") create_database = optional(bool, true) - in_place_rollout = optional(bool, false) request_rollout = optional(string) force_rollout = optional(string) })) diff --git a/test/terraform/mzcompose.py b/test/terraform/mzcompose.py index bb86a667afdb8..ecff7ff0f2c76 100644 --- a/test/terraform/mzcompose.py +++ b/test/terraform/mzcompose.py @@ -730,7 +730,6 @@ def upgrade(self, tag: str) -> None: "namespace": "materialize-environment", }, "spec": { - "inPlaceRollout": False, "requestRollout": f"12345678-9012-3456-7890-12345678901{self.version+3}", "environmentdImageRef": f"materialize/environmentd:{tag}", "environmentdResourceRequirements": {
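For readers tracing the re-indented block in `environmentd.rs`, here is a simplified, hypothetical sketch of the promotion handshake it performs against the leader endpoints shown in the diff (`/api/leader/status`, `/api/leader/skip-catchup`, `/api/leader/promote`). The login step, typed responses, and the controller's jittered requeue are omitted, and statuses are compared as plain strings.

```rust
// Simplified, illustrative sketch of the promotion handshake; not the
// operator's actual code. deps (illustrative): tokio, reqwest (with the
// `json` feature), serde_json, anyhow.
use std::time::Duration;

async fn promote_new_generation(base_url: &str, force_promote: bool) -> anyhow::Result<()> {
    let client = reqwest::Client::builder()
        .timeout(Duration::from_secs(10))
        .build()?;
    let status_url = format!("{base_url}/api/leader/status");

    // Wait for the new environmentd to finish initializing, or skip cluster
    // catchup outright when force-promotion was requested.
    loop {
        let status: serde_json::Value = client
            .get(&status_url)
            .send()
            .await?
            .error_for_status()?
            .json()
            .await?;
        if force_promote {
            client
                .post(format!("{base_url}/api/leader/skip-catchup"))
                .send()
                .await?
                .error_for_status()?;
            break;
        }
        if status["status"] != "Initializing" {
            break;
        }
        tokio::time::sleep(Duration::from_secs(5)).await;
    }

    // Promotion must come last: once it succeeds, the new generation is live.
    client
        .post(format!("{base_url}/api/leader/promote"))
        .send()
        .await?
        .error_for_status()?;

    // A successful promote request only starts the takeover; keep polling
    // until the environment actually reports that it is the leader.
    loop {
        let status: serde_json::Value = client
            .get(&status_url)
            .send()
            .await?
            .error_for_status()?
            .json()
            .await?;
        if status["status"] == "IsLeader" {
            return Ok(());
        }
        tokio::time::sleep(Duration::from_secs(5)).await;
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Hypothetical address for the new generation's environmentd service.
    promote_new_generation("http://environmentd.example.svc:6878", false).await
}
```

The ordering mirrors the warning in the diff: promotion is the last step, and a successful promote request is only trusted once the status endpoint reports `IsLeader`.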