diff --git a/CHANGELOG.md b/CHANGELOG.md
index b2682f7e..5a52449c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,10 @@ All notable changes to this project will be documented in this file.
 
 ## [Unreleased]
 
+### Added
+
+- Add rolling upgrade support for upgrades between NiFi 2 versions ([#771]).
+
 ### Changed
 
 - BREAKING: Replace stackable-operator `initialize_logging` with stackable-telemetry `Tracing` ([#767], [#776]).
@@ -21,6 +25,7 @@ All notable changes to this project will be documented in this file.
 - Fix a bug where changes to ConfigMaps that are referenced in the NifiCluster spec didn't trigger a reconciliation ([#772]).
 
 [#767]: https://github.com/stackabletech/nifi-operator/pull/767
+[#771]: https://github.com/stackabletech/nifi-operator/pull/771
 [#772]: https://github.com/stackabletech/nifi-operator/pull/772
 [#774]: https://github.com/stackabletech/nifi-operator/pull/774
 [#776]: https://github.com/stackabletech/nifi-operator/pull/776
diff --git a/docs/modules/nifi/pages/usage_guide/updating.adoc b/docs/modules/nifi/pages/usage_guide/updating.adoc
index 9cb7c478..d1bbde6d 100644
--- a/docs/modules/nifi/pages/usage_guide/updating.adoc
+++ b/docs/modules/nifi/pages/usage_guide/updating.adoc
@@ -17,10 +17,16 @@ spec:
 
 <1> Change the NiFi version here
 
-WARNING: NiFi clusters cannot be upgraded or downgraded in a rolling fashion due to a limitation in NiFi.
-Any change to the NiFi version in the CRD triggers a full cluster restart with brief downtime.
+[WARNING]
+====
+NiFi clusters cannot be upgraded or downgraded in a rolling fashion due to a limitation in NiFi prior to version 2.
+
+When upgrading between NiFi 1 versions or from NiFi 1 to NiFi 2, any change to the NiFi version in the CRD triggers a full cluster restart with brief downtime.
 However, the Stackable image version can be updated in a rolling manner, provided the NiFi version remains unchanged.
+For upgrades between NiFi 2 versions, e.g. from `2.0.0` to `2.2.0`, rolling upgrades are supported.
+====
+
 == NiFi 2.0.0
 
 Before you can upgrade to `2.0.0` you https://cwiki.apache.org/confluence/display/NIFI/Migration+Guidance[need to update] to at least version 1.27.x!
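A minimal sketch (commentary, not part of the patch) of the version gate the warning above and the controller.rs hunk below describe: a rolling upgrade is only attempted when both the currently deployed and the requested product version are NiFi 2.x. The helper name `is_rolling_upgrade_supported` is illustrative and not part of the operator's API.

```rust
/// Sketch of the decision behind the docs above: rolling upgrades only
/// between NiFi 2 versions; every other combination takes the full restart.
fn is_rolling_upgrade_supported(requested: &str, deployed: Option<&str>) -> bool {
    requested.starts_with("2.") && deployed.is_some_and(|v| v.starts_with("2."))
}

fn main() {
    assert!(is_rolling_upgrade_supported("2.2.0", Some("2.0.0"))); // rolling upgrade
    assert!(!is_rolling_upgrade_supported("2.0.0", Some("1.27.0"))); // full restart
    assert!(!is_rolling_upgrade_supported("1.28.1", Some("1.27.0"))); // full restart
    assert!(!is_rolling_upgrade_supported("2.2.0", None)); // first deployment, nothing to upgrade
}
```

The same flag also selects the StatefulSet update strategy further down in the patch: `RollingUpdate` when supported, `OnDelete` otherwise.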
diff --git a/rust/operator-binary/src/controller.rs b/rust/operator-binary/src/controller.rs
index 76ba4521..cca71af6 100644
--- a/rust/operator-binary/src/controller.rs
+++ b/rust/operator-binary/src/controller.rs
@@ -87,7 +87,11 @@ use crate::{
         STACKABLE_LOG_CONFIG_DIR, STACKABLE_LOG_DIR,
         authentication::AuthenticationClassResolved, v1alpha1,
     },
-    operations::{graceful_shutdown::add_graceful_shutdown_config, pdb::add_pdbs},
+    operations::{
+        graceful_shutdown::add_graceful_shutdown_config,
+        pdb::add_pdbs,
+        upgrade::{self, ClusterVersionUpdateState},
+    },
     product_logging::extend_role_group_config_map,
     reporting_task::{self, build_maybe_reporting_task, build_reporting_task_service_name},
     security::{
@@ -346,6 +350,9 @@ pub enum Error {
     AddVolumeMount {
         source: builder::pod::container::Error,
     },
+
+    #[snafu(display("Failed to determine the state of the version upgrade procedure"))]
+    ClusterVersionUpdateState { source: upgrade::Error },
 }
 
 type Result<T, E = Error> = std::result::Result<T, E>;
@@ -356,13 +363,6 @@ impl ReconcilerError for Error {
     }
 }
 
-#[derive(Debug, PartialEq, Eq)]
-pub enum VersionChangeState {
-    BeginChange,
-    Stopped,
-    NoChange,
-}
-
 pub async fn reconcile_nifi(
     nifi: Arc<DeserializeGuard<v1alpha1::NifiCluster>>,
     ctx: Arc<Ctx>,
@@ -391,78 +391,33 @@ pub async fn reconcile_nifi(
         .await
         .context(SecuritySnafu)?;
 
-    // Handle full restarts for a version change
-    let version_change = if let Some(deployed_version) = nifi
+    // If a rolling upgrade is supported, Kubernetes takes care of the cluster scaling automatically;
+    // otherwise the operator handles it and manages its own restart flow
+    // for upgrades from 1.x.x to 1.x.x or 2.x.x.
+    // TODO: this can be removed once 1.x.x is no longer supported
+    let mut cluster_version_update_state = ClusterVersionUpdateState::NoVersionChange;
+    let deployed_version = nifi
         .status
         .as_ref()
-        .and_then(|status| status.deployed_version.as_ref())
-    {
-        if deployed_version != &resolved_product_image.product_version {
-            // Check if statefulsets are already scaled to zero, if not - requeue
-            let selector = LabelSelector {
-                match_expressions: None,
-                match_labels: Some(
-                    Labels::role_selector(nifi, APP_NAME, &NifiRole::Node.to_string())
-                        .context(LabelBuildSnafu)?
-                        .into(),
-                ),
-            };
+        .and_then(|status| status.deployed_version.as_ref());
+    let rolling_upgrade_supported = resolved_product_image.product_version.starts_with("2.")
+        && deployed_version.is_some_and(|v| v.starts_with("2."));
 
-            // Retrieve the deployed statefulsets to check on the current status of the restart
-            let deployed_statefulsets = client
-                .list_with_label_selector::<StatefulSet>(namespace, &selector)
-                .await
-                .context(FetchStatefulsetsSnafu)?;
-
-            // Sum target replicas for all statefulsets
-            let target_replicas = deployed_statefulsets
-                .iter()
-                .filter_map(|statefulset| statefulset.spec.as_ref())
-                .filter_map(|spec| spec.replicas)
-                .sum::<i32>();
-
-            // Sum current ready replicas for all statefulsets
-            let current_replicas = deployed_statefulsets
-                .iter()
-                .filter_map(|statefulset| statefulset.status.as_ref())
-                .map(|status| status.replicas)
-                .sum::<i32>();
-
-            // If statefulsets have already been scaled to zero, but have remaining replicas
-            // we requeue to wait until a full stop has been performed.
-            if target_replicas == 0 && current_replicas > 0 {
-                tracing::info!(
-                    "Cluster is performing a full restart at the moment and still shutting down, remaining replicas: [{}] - requeueing to wait for shutdown to finish",
-                    current_replicas
-                );
-                return Ok(Action::await_change());
-            }
+    if !rolling_upgrade_supported {
+        cluster_version_update_state = upgrade::cluster_version_update_state(
+            nifi,
+            client,
+            &resolved_product_image.product_version,
+            deployed_version,
+        )
+        .await
+        .context(ClusterVersionUpdateStateSnafu)?;
 
-            // Otherwise we either still need to scale the statefulsets to 0 or all replicas have
-            // been stopped and we can restart the cluster.
-            // Both actions will be taken in the regular reconciliation, so we can simply continue
-            // here
-            if target_replicas > 0 {
-                tracing::info!(
-                    "Version change detected, we'll need to scale down the cluster for a full restart."
-                );
-                VersionChangeState::BeginChange
-            } else {
-                tracing::info!("Cluster has been stopped for a restart, will scale back up.");
-                VersionChangeState::Stopped
-            }
-        } else {
-            // No version change detected, propagate this to the reconciliation
-            VersionChangeState::NoChange
+        if cluster_version_update_state == ClusterVersionUpdateState::UpdateInProgress {
+            return Ok(Action::await_change());
         }
-    } else {
-        // No deployed version set in status, this is probably the first reconciliation ever
-        // for this cluster, so just let it progress normally
-        tracing::debug!(
-            "No deployed version found for this cluster, this is probably the first start, continue reconciliation"
-        );
-        VersionChangeState::NoChange
-    };
+    }
+    // end todo
 
     let validated_config = validated_product_config(
         nifi,
@@ -570,6 +525,14 @@ pub async fn reconcile_nifi(
         )
         .await?;
 
+        let role_group = role.role_groups.get(&rolegroup.role_group);
+        let replicas =
+            if cluster_version_update_state == ClusterVersionUpdateState::UpdateRequested {
+                Some(0)
+            } else {
+                role_group.and_then(|rg| rg.replicas).map(i32::from)
+            };
+
         let rg_statefulset = build_node_rolegroup_statefulset(
             nifi,
             &resolved_product_image,
@@ -579,7 +542,8 @@ pub async fn reconcile_nifi(
             rolegroup_config,
             &merged_config,
             &nifi_authentication_config,
-            &version_change,
+            rolling_upgrade_supported,
+            replicas,
             &rbac_sa.name_any(),
         )
         .await?;
@@ -661,7 +625,7 @@ pub async fn reconcile_nifi(
 
     // Update the deployed product version in the status after everything has been deployed, unless
    // we are still in the process of updating
-    let status = if version_change != VersionChangeState::BeginChange {
+    let status = if cluster_version_update_state != ClusterVersionUpdateState::UpdateRequested {
         NifiStatus {
             deployed_version: Some(resolved_product_image.product_version),
             conditions,
@@ -907,7 +871,8 @@ async fn build_node_rolegroup_statefulset(
     rolegroup_config: &HashMap<PropertyNameKind, BTreeMap<String, String>>,
     merged_config: &NifiConfig,
     nifi_auth_config: &NifiAuthenticationConfig,
-    version_change_state: &VersionChangeState,
+    rolling_update_supported: bool,
+    replicas: Option<i32>,
     sa_name: &str,
 ) -> Result<StatefulSet> {
     tracing::debug!("Building statefulset");
@@ -1391,11 +1356,7 @@ async fn build_node_rolegroup_statefulset(
             .build(),
         spec: Some(StatefulSetSpec {
             pod_management_policy: Some("Parallel".to_string()),
-            replicas: if version_change_state == &VersionChangeState::BeginChange {
-                Some(0)
-            } else {
-                role_group.and_then(|rg| rg.replicas).map(i32::from)
-            },
+            replicas,
             selector: LabelSelector {
                 match_labels: Some(
                     Labels::role_group_selector(
@@ -1412,7 +1373,11 @@ async fn build_node_rolegroup_statefulset(
             service_name: rolegroup_ref.object_name(),
             template: pod_template,
             update_strategy: Some(StatefulSetUpdateStrategy {
-                type_: Some("OnDelete".to_string()),
+                type_: if rolling_update_supported {
+                    Some("RollingUpdate".to_string())
+                } else {
+                    Some("OnDelete".to_string())
+                },
                 ..StatefulSetUpdateStrategy::default()
             }),
             volume_claim_templates: Some(vec![
diff --git a/rust/operator-binary/src/operations/mod.rs b/rust/operator-binary/src/operations/mod.rs
index 92ca2ec7..c62006d9 100644
--- a/rust/operator-binary/src/operations/mod.rs
+++ b/rust/operator-binary/src/operations/mod.rs
@@ -1,2 +1,3 @@
 pub mod graceful_shutdown;
 pub mod pdb;
+pub mod upgrade;
diff --git a/rust/operator-binary/src/operations/upgrade.rs b/rust/operator-binary/src/operations/upgrade.rs
new file mode 100644
index 00000000..69a01820
--- /dev/null
+++ b/rust/operator-binary/src/operations/upgrade.rs
@@ -0,0 +1,124 @@
+// TODO: This module can be removed once we don't support NiFi 1.x versions anymore
+// It manages the version upgrade procedure for NiFi versions prior to NiFi 2, since rolling upgrades are not supported there
+
+use snafu::{OptionExt, ResultExt, Snafu};
+use stackable_operator::{
+    client::Client,
+    k8s_openapi::{api::apps::v1::StatefulSet, apimachinery::pkg::apis::meta::v1::LabelSelector},
+    kvp::Labels,
+};
+
+use crate::crd::{APP_NAME, NifiRole, v1alpha1};
+
+#[derive(Snafu, Debug)]
+pub enum Error {
+    #[snafu(display("object defines no namespace"))]
+    ObjectHasNoNamespace,
+
+    #[snafu(display("failed to fetch deployed StatefulSets"))]
+    FetchStatefulsets {
+        source: stackable_operator::client::Error,
+    },
+
+    #[snafu(display("failed to build labels"))]
+    LabelBuild {
+        source: stackable_operator::kvp::LabelError,
+    },
+}
+
+type Result<T, E = Error> = std::result::Result<T, E>;
+
+// This enum is used for NiFi versions that do not support rolling upgrades, since in that case
+// we have to manage the restart process ourselves and need to track its state
+#[derive(Debug, PartialEq, Eq)]
+pub enum ClusterVersionUpdateState {
+    UpdateRequested,
+    UpdateInProgress,
+    ClusterStopped,
+    NoVersionChange,
+}
+
+pub async fn cluster_version_update_state(
+    nifi: &v1alpha1::NifiCluster,
+    client: &Client,
+    resolved_version: &String,
+    deployed_version: Option<&String>,
+) -> Result<ClusterVersionUpdateState> {
+    let namespace = &nifi
+        .metadata
+        .namespace
+        .clone()
+        .with_context(|| ObjectHasNoNamespaceSnafu {})?;
+
+    // Handle full restarts for a version change
+    match deployed_version {
+        Some(deployed_version) => {
+            if deployed_version != resolved_version {
+                // Check if statefulsets are already scaled to zero, if not - requeue
+                let selector = LabelSelector {
+                    match_expressions: None,
+                    match_labels: Some(
+                        Labels::role_selector(nifi, APP_NAME, &NifiRole::Node.to_string())
+                            .context(LabelBuildSnafu)?
+                            .into(),
+                    ),
+                };
+
+                // Retrieve the deployed statefulsets to check on the current status of the restart
+                let deployed_statefulsets = client
+                    .list_with_label_selector::<StatefulSet>(namespace, &selector)
+                    .await
+                    .context(FetchStatefulsetsSnafu)?;
+
+                // Sum target replicas for all statefulsets
+                let target_replicas = deployed_statefulsets
+                    .iter()
+                    .filter_map(|statefulset| statefulset.spec.as_ref())
+                    .filter_map(|spec| spec.replicas)
+                    .sum::<i32>();
+
+                // Sum current replicas for all statefulsets
+                let current_replicas = deployed_statefulsets
+                    .iter()
+                    .filter_map(|statefulset| statefulset.status.as_ref())
+                    .map(|status| status.replicas)
+                    .sum::<i32>();
+
+                // If statefulsets have already been scaled to zero, but have remaining replicas
+                // we requeue to wait until a full stop has been performed.
+                if target_replicas == 0 && current_replicas > 0 {
+                    tracing::info!(
+                        "Cluster is performing a full restart at the moment and still shutting down, remaining replicas: [{}] - requeueing to wait for shutdown to finish",
+                        current_replicas
+                    );
+                    return Ok(ClusterVersionUpdateState::UpdateInProgress);
+                }
+
+                // Otherwise we either still need to scale the statefulsets to 0 or all replicas have
+                // been stopped and we can restart the cluster.
+                // Both actions will be taken in the regular reconciliation, so we can simply continue
+                // here
+                if target_replicas > 0 {
+                    tracing::info!(
+                        "Version change detected, we'll need to scale down the cluster for a full restart."
+                    );
+                    Ok(ClusterVersionUpdateState::UpdateRequested)
+                } else {
+                    tracing::info!("Cluster has been stopped for a restart, will scale back up.");
+                    Ok(ClusterVersionUpdateState::ClusterStopped)
+                }
+            } else {
+                // No version change detected, propagate this to the reconciliation
+                Ok(ClusterVersionUpdateState::NoVersionChange)
+            }
+        }
+        None => {
+            // No deployed version set in status, this is probably the first reconciliation ever
+            // for this cluster, so just let it progress normally
+            tracing::debug!(
+                "No deployed version found for this cluster, this is probably the first start, continue reconciliation"
+            );
+            Ok(ClusterVersionUpdateState::NoVersionChange)
+        }
+    }
+}
diff --git a/tests/templates/kuttl/cluster_operation/20-install-nifi.yaml.j2 b/tests/templates/kuttl/cluster_operation/20-install-nifi.yaml.j2
index 7351d64b..deb17ac4 100644
--- a/tests/templates/kuttl/cluster_operation/20-install-nifi.yaml.j2
+++ b/tests/templates/kuttl/cluster_operation/20-install-nifi.yaml.j2
@@ -33,6 +33,7 @@ spec:
     custom: "{{ test_scenario['values']['nifi-latest'].split(',')[1] }}"
     productVersion: "{{ test_scenario['values']['nifi-latest'].split(',')[0] }}"
 {% else %}
+    custom: null
     productVersion: "{{ test_scenario['values']['nifi-latest'] }}"
 {% endif %}
     pullPolicy: IfNotPresent
diff --git a/tests/templates/kuttl/cluster_operation/30-stop-nifi.yaml.j2 b/tests/templates/kuttl/cluster_operation/30-stop-nifi.yaml.j2
index dd41e667..e78083bf 100644
--- a/tests/templates/kuttl/cluster_operation/30-stop-nifi.yaml.j2
+++ b/tests/templates/kuttl/cluster_operation/30-stop-nifi.yaml.j2
@@ -9,6 +9,7 @@ spec:
     custom: "{{ test_scenario['values']['nifi-latest'].split(',')[1] }}"
     productVersion: "{{ test_scenario['values']['nifi-latest'].split(',')[0] }}"
 {% else %}
+    custom: null
     productVersion: "{{ test_scenario['values']['nifi-latest'] }}"
 {% endif %}
     pullPolicy: IfNotPresent
diff --git a/tests/templates/kuttl/cluster_operation/40-pause-nifi.yaml.j2 b/tests/templates/kuttl/cluster_operation/40-pause-nifi.yaml.j2
index cb43d975..85f71533 100644
---
a/tests/templates/kuttl/cluster_operation/40-pause-nifi.yaml.j2 +++ b/tests/templates/kuttl/cluster_operation/40-pause-nifi.yaml.j2 @@ -9,6 +9,7 @@ spec: custom: "{{ test_scenario['values']['nifi-latest'].split(',')[1] }}" productVersion: "{{ test_scenario['values']['nifi-latest'].split(',')[0] }}" {% else %} + custom: null productVersion: "{{ test_scenario['values']['nifi-latest'] }}" {% endif %} pullPolicy: IfNotPresent diff --git a/tests/templates/kuttl/cluster_operation/50-restart-nifi.yaml.j2 b/tests/templates/kuttl/cluster_operation/50-restart-nifi.yaml.j2 index 6232efdb..4c50747e 100644 --- a/tests/templates/kuttl/cluster_operation/50-restart-nifi.yaml.j2 +++ b/tests/templates/kuttl/cluster_operation/50-restart-nifi.yaml.j2 @@ -9,6 +9,7 @@ spec: custom: "{{ test_scenario['values']['nifi-latest'].split(',')[1] }}" productVersion: "{{ test_scenario['values']['nifi-latest'].split(',')[0] }}" {% else %} + custom: null productVersion: "{{ test_scenario['values']['nifi-latest'] }}" {% endif %} pullPolicy: IfNotPresent diff --git a/tests/templates/kuttl/ldap/12-install-nifi.yaml.j2 b/tests/templates/kuttl/ldap/12-install-nifi.yaml.j2 index 8852e066..8df853c6 100644 --- a/tests/templates/kuttl/ldap/12-install-nifi.yaml.j2 +++ b/tests/templates/kuttl/ldap/12-install-nifi.yaml.j2 @@ -27,6 +27,7 @@ spec: custom: "{{ test_scenario['values']['nifi'].split(',')[1] }}" productVersion: "{{ test_scenario['values']['nifi'].split(',')[0] }}" {% else %} + custom: null productVersion: "{{ test_scenario['values']['nifi'] }}" {% endif %} pullPolicy: IfNotPresent diff --git a/tests/templates/kuttl/logging/04-install-nifi.yaml.j2 b/tests/templates/kuttl/logging/04-install-nifi.yaml.j2 index 65a22b80..c3926671 100644 --- a/tests/templates/kuttl/logging/04-install-nifi.yaml.j2 +++ b/tests/templates/kuttl/logging/04-install-nifi.yaml.j2 @@ -87,6 +87,7 @@ spec: custom: "{{ test_scenario['values']['nifi'].split(',')[1] }}" productVersion: "{{ test_scenario['values']['nifi'].split(',')[0] }}" {% else %} + custom: null productVersion: "{{ test_scenario['values']['nifi'] }}" {% endif %} pullPolicy: IfNotPresent diff --git a/tests/templates/kuttl/oidc/12_nifi.yaml.j2 b/tests/templates/kuttl/oidc/12_nifi.yaml.j2 index 764ab8bf..6edd2e42 100644 --- a/tests/templates/kuttl/oidc/12_nifi.yaml.j2 +++ b/tests/templates/kuttl/oidc/12_nifi.yaml.j2 @@ -24,6 +24,7 @@ spec: custom: "{{ test_scenario['values']['nifi'].split(',')[1] }}" productVersion: "{{ test_scenario['values']['nifi'].split(',')[0] }}" {% else %} + custom: null productVersion: "{{ test_scenario['values']['nifi'] }}" {% endif %} pullPolicy: IfNotPresent diff --git a/tests/templates/kuttl/orphaned_resources/02-install-nifi.yaml.j2 b/tests/templates/kuttl/orphaned_resources/02-install-nifi.yaml.j2 index c2cfe9e7..43ed9fb4 100644 --- a/tests/templates/kuttl/orphaned_resources/02-install-nifi.yaml.j2 +++ b/tests/templates/kuttl/orphaned_resources/02-install-nifi.yaml.j2 @@ -33,6 +33,7 @@ spec: custom: "{{ test_scenario['values']['nifi'].split(',')[1] }}" productVersion: "{{ test_scenario['values']['nifi'].split(',')[0] }}" {% else %} + custom: null productVersion: "{{ test_scenario['values']['nifi'] }}" {% endif %} pullPolicy: IfNotPresent diff --git a/tests/templates/kuttl/resources/02-install-nifi.yaml.j2 b/tests/templates/kuttl/resources/02-install-nifi.yaml.j2 index 3e55f070..d6aeee86 100644 --- a/tests/templates/kuttl/resources/02-install-nifi.yaml.j2 +++ b/tests/templates/kuttl/resources/02-install-nifi.yaml.j2 @@ -33,6 +33,7 @@ spec: custom: "{{ 
test_scenario['values']['nifi'].split(',')[1] }}" productVersion: "{{ test_scenario['values']['nifi'].split(',')[0] }}" {% else %} + custom: null productVersion: "{{ test_scenario['values']['nifi'] }}" {% endif %} pullPolicy: IfNotPresent diff --git a/tests/templates/kuttl/smoke/30-install-nifi.yaml.j2 b/tests/templates/kuttl/smoke/30-install-nifi.yaml.j2 index b6b3191f..aceebaf6 100644 --- a/tests/templates/kuttl/smoke/30-install-nifi.yaml.j2 +++ b/tests/templates/kuttl/smoke/30-install-nifi.yaml.j2 @@ -34,6 +34,7 @@ spec: custom: "{{ test_scenario['values']['nifi'].split(',')[1] }}" productVersion: "{{ test_scenario['values']['nifi'].split(',')[0] }}" {% else %} + custom: null productVersion: "{{ test_scenario['values']['nifi'] }}" {% endif %} pullPolicy: IfNotPresent diff --git a/tests/templates/kuttl/upgrade/02-assert.yaml.j2 b/tests/templates/kuttl/upgrade/02-assert.yaml.j2 index 4b1ce7d7..634f281b 100644 --- a/tests/templates/kuttl/upgrade/02-assert.yaml.j2 +++ b/tests/templates/kuttl/upgrade/02-assert.yaml.j2 @@ -16,4 +16,4 @@ kind: NifiCluster metadata: name: test-nifi status: - deployed_version: {{ test_scenario['values']['nifi_old'].split("-")[0] }} + deployed_version: {{ test_scenario['values']['nifi_old'].split(',')[0] }} diff --git a/tests/templates/kuttl/upgrade/02-install-nifi.yaml.j2 b/tests/templates/kuttl/upgrade/02-install-nifi.yaml.j2 index 10c879d4..47337032 100644 --- a/tests/templates/kuttl/upgrade/02-install-nifi.yaml.j2 +++ b/tests/templates/kuttl/upgrade/02-install-nifi.yaml.j2 @@ -33,6 +33,7 @@ spec: custom: "{{ test_scenario['values']['nifi_old'].split(',')[1] }}" productVersion: "{{ test_scenario['values']['nifi_old'].split(',')[0] }}" {% else %} + custom: null productVersion: "{{ test_scenario['values']['nifi_old'] }}" {% endif %} pullPolicy: IfNotPresent diff --git a/tests/templates/kuttl/upgrade/04-assert.yaml.j2 b/tests/templates/kuttl/upgrade/04-assert.yaml.j2 index 9f90b3f1..8bcf1c8e 100644 --- a/tests/templates/kuttl/upgrade/04-assert.yaml.j2 +++ b/tests/templates/kuttl/upgrade/04-assert.yaml.j2 @@ -4,7 +4,13 @@ kind: TestAssert timeout: 300 commands: - script: kubectl exec -n $NAMESPACE test-nifi-0 -- python /tmp/test_nifi.py -u admin -p supersecretpassword -n $NAMESPACE -c 3 -{% if test_scenario['values']['nifi_old'].startswith('1.') %} +{% if test_scenario['values']['nifi_old'].split(',')[0].startswith('1.') %} - script: kubectl exec -n $NAMESPACE test-nifi-0 -- python /tmp/test_nifi_metrics.py -n $NAMESPACE {% endif %} - - script: kubectl exec -n $NAMESPACE test-nifi-0 -- sh -c "python /tmp/flow.py -e https://test-nifi-node-default-0.test-nifi-node-default.$NAMESPACE.svc.cluster.local:8443 run -t /tmp/generate-and-log-flowfiles.xml > /tmp/old_input" +{% if test_scenario['values']['nifi_old'].split(',')[0] == '2.0.0' %} + - script: kubectl exec -n $NAMESPACE test-nifi-0 -- sh -c "python /tmp/flow.py -e https://test-nifi-node-default-0.test-nifi-node-default.$NAMESPACE.svc.cluster.local:8443 run json /tmp/generate-and-log-flowfiles.json > /tmp/old_input" +{% else %} + - script: kubectl exec -n $NAMESPACE test-nifi-0 -- sh -c "python /tmp/flow.py -e https://test-nifi-node-default-0.test-nifi-node-default.$NAMESPACE.svc.cluster.local:8443 run template /tmp/generate-and-log-flowfiles.xml > /tmp/old_input" +{% endif %} + # This tests if the output contains an Error or zero flow files are queued, which also indicates that something went wrong + - script: kubectl exec -n $NAMESPACE test-nifi-0 -- sh -c "cat /tmp/old_input | grep -Eov 'Error|\b0\b'" diff 
--git a/tests/templates/kuttl/upgrade/04-prepare-test-nifi.yaml b/tests/templates/kuttl/upgrade/04-prepare-test-nifi.yaml.j2 similarity index 70% rename from tests/templates/kuttl/upgrade/04-prepare-test-nifi.yaml rename to tests/templates/kuttl/upgrade/04-prepare-test-nifi.yaml.j2 index 9fe76b7b..fc488c1e 100644 --- a/tests/templates/kuttl/upgrade/04-prepare-test-nifi.yaml +++ b/tests/templates/kuttl/upgrade/04-prepare-test-nifi.yaml.j2 @@ -5,5 +5,9 @@ commands: - script: kubectl cp -n $NAMESPACE ./test_nifi_metrics.py test-nifi-0:/tmp - script: kubectl cp -n $NAMESPACE ./test_nifi.py test-nifi-0:/tmp - script: kubectl cp -n $NAMESPACE ./flow.py test-nifi-0:/tmp +{% if test_scenario['values']['nifi_old'].split(',')[0] == '2.0.0' %} + - script: kubectl cp -n $NAMESPACE ./generate-and-log-flowfiles.json test-nifi-0:/tmp +{% else %} - script: kubectl cp -n $NAMESPACE ./generate-and-log-flowfiles.xml test-nifi-0:/tmp +{% endif %} - script: kubectl cp -n $NAMESPACE ./cacert.pem test-nifi-0:/tmp diff --git a/tests/templates/kuttl/upgrade/05-upgrade-nifi.yaml.j2 b/tests/templates/kuttl/upgrade/05-upgrade-nifi.yaml.j2 index ad33cfae..ec6e3b36 100644 --- a/tests/templates/kuttl/upgrade/05-upgrade-nifi.yaml.j2 +++ b/tests/templates/kuttl/upgrade/05-upgrade-nifi.yaml.j2 @@ -9,5 +9,6 @@ spec: custom: "{{ test_scenario['values']['nifi_new'].split(',')[1] }}" productVersion: "{{ test_scenario['values']['nifi_new'].split(',')[0] }}" {% else %} + custom: null productVersion: "{{ test_scenario['values']['nifi_new'] }}" {% endif %} diff --git a/tests/templates/kuttl/upgrade/07-assert.yaml.j2 b/tests/templates/kuttl/upgrade/07-assert.yaml.j2 index 24abfbc8..a71c5a37 100644 --- a/tests/templates/kuttl/upgrade/07-assert.yaml.j2 +++ b/tests/templates/kuttl/upgrade/07-assert.yaml.j2 @@ -6,9 +6,11 @@ metadata: timeout: 300 commands: - script: kubectl exec -n $NAMESPACE test-nifi-0 -- python /tmp/test_nifi.py -u admin -p supersecretpassword -n $NAMESPACE -c 3 -{% if test_scenario['values']['nifi_new'].startswith('1.') %} +{% if test_scenario['values']['nifi_new'].split(',')[0].startswith('1.') %} - script: kubectl exec -n $NAMESPACE test-nifi-0 -- python /tmp/test_nifi_metrics.py -n $NAMESPACE {% endif %} - script: kubectl exec -n $NAMESPACE test-nifi-0 -- sh -c "python /tmp/flow.py -e https://test-nifi-node-default-0.test-nifi-node-default.$NAMESPACE.svc.cluster.local:8443 query > /tmp/new_input" + # This tests if the output contains an Error or zero flow files are queued, which also indicates that something went wrong + - script: kubectl exec -n $NAMESPACE test-nifi-0 -- sh -c "cat /tmp/new_input | grep -Eov 'Error|\b0\b'" # This tests that the number of input records stays the same after the upgrade. - script: kubectl exec -n $NAMESPACE test-nifi-0 -- diff /tmp/old_input /tmp/new_input diff --git a/tests/templates/kuttl/upgrade/flow.py b/tests/templates/kuttl/upgrade/flow.py index a5fa03ae..61834b43 100755 --- a/tests/templates/kuttl/upgrade/flow.py +++ b/tests/templates/kuttl/upgrade/flow.py @@ -3,7 +3,13 @@ # of seconds, stop it and then query the number of queued flow files. 
import nipyapi -from nipyapi.canvas import get_root_pg_id, schedule_process_group, list_all_controllers, schedule_controller, recurse_flow +from nipyapi.canvas import ( + get_root_pg_id, + schedule_process_group, + list_all_controllers, + schedule_controller, + recurse_flow, +) from nipyapi.security import service_login from nipyapi.templates import get_template, upload_template, deploy_template from nipyapi.nifi.models import FlowEntity @@ -22,6 +28,8 @@ PASSWORD = "supersecretpassword" TEMPLATE_NAME = "generate-and-log-flowfiles" TEMPLATE_FILE = f"{TEMPLATE_NAME}.xml" +JSON_NAME = "generate-and-log-flowfiles" +JSON_FILE = f"{JSON_NAME}.json" CONNECTION_NAME = "output-connection" @@ -31,9 +39,7 @@ class Error: def parse_args(args: List[str]) -> argparse.Namespace: - parser = argparse.ArgumentParser( - description="Setup a flow and query the status." - ) + parser = argparse.ArgumentParser(description="Setup a flow and query the status.") parser.add_argument( "-e", "--endpoint", @@ -55,11 +61,21 @@ def parse_args(args: List[str]) -> argparse.Namespace: "run", help="Install and run a flow.", ) - run_parser.add_argument( - "-t", - "--template", + format_parser = run_parser.add_subparsers(dest="formatcommand") + template_parser = format_parser.add_parser( + "template", help="Use the template format for the flow." + ) + template_parser.add_argument( + "template", help="Nifi template file.", ) + json_parser = format_parser.add_parser( + "json", help="Use the json format for the flow." + ) + json_parser.add_argument( + "json", + help="Nifi json file.", + ) subparsers.add_parser( "query", help="Query the flow.", @@ -73,53 +89,119 @@ def login(endpoint: str, user: str, passwd: str) -> bool: return service_login(username=user, password=passwd) -def flow_from_template(pg_id: str, template_file: str, template_name: str) -> FlowEntity: - upload_template(pg_id, template_file) - template_id = get_template(template_name).id - return deploy_template(pg_id, template_id, 200, 0) +def flow_from_template( + pg_id: str, template_file: str, template_name: str +) -> FlowEntity: + upload_template(pg_id, template_file) + template_id = get_template(template_name).id + return deploy_template(pg_id, template_id, 200, 0) def schedule(pg_id: str, toggle: bool) -> bool: - for controller in list_all_controllers(): - schedule_controller(controller, scheduled=toggle) - return schedule_process_group(pg_id, scheduled=toggle) + for controller in list_all_controllers(pg_id): + schedule_controller(controller, scheduled=toggle) + return schedule_process_group(pg_id, scheduled=toggle) def flow_files_queued(connection_name: str) -> Union[str, Error]: - """ Returns the current input record number for the given component""" - flow = recurse_flow() - c = [c for c in flow.process_group_flow.flow.connections if c.status.name == connection_name] - if c: - return c[0].status.aggregate_snapshot.flow_files_queued - else: - return Error(f"No connection named '{connection_name}' found.") + """Returns the current input record number for the given component""" + flow = recurse_flow() + c = [ + c + for c in flow.process_group_flow.flow.connections + if c.status.name == connection_name + ] + if c: + return c[0].status.aggregate_snapshot.flow_files_queued + else: + # for nifi 2.x the flow is loaded via api into a new process group + # we need to look for the connection in that process group instead + if flow.process_group_flow.flow.process_groups: + inner_flow = recurse_flow(flow.process_group_flow.flow.process_groups[0].id) + inner_c = [ + 
inner_c + for inner_c in inner_flow.process_group_flow.flow.connections + if inner_c.status.name == connection_name + ] + if inner_c: + return inner_c[0].status.aggregate_snapshot.flow_files_queued + else: + return Error(f"No connection named '{connection_name}' found.") + else: + return Error(f"No connection named '{connection_name}' found.") def main(): - # turn this on to debug Nifi api calls. Also need to import logging - # logging.getLogger().setLevel(logging.INFO) - - args = parse_args(sys.argv[1:]) - - urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) - - username = args.user or USERNAME - password = args.password or PASSWORD - - if args.subcommand == "run": - template_file = args.template or TEMPLATE_FILE - template_name = re.sub(r"\..+$", "", os.path.basename(template_file)) - - login(args.endpoint, username, password) - pg_id = get_root_pg_id() - flow_from_template(pg_id, template_file, template_name) - schedule(pg_id, True) # start - time.sleep(5) # give the flow 5 seconds to run - schedule(pg_id, False) # stop - print(flow_files_queued(CONNECTION_NAME)) - elif args.subcommand == "query": - login(args.endpoint, username, password) - print(flow_files_queued(CONNECTION_NAME)) + # turn this on to debug Nifi api calls. Also need to import logging + # logging.getLogger().setLevel(logging.INFO) + + args = parse_args(sys.argv[1:]) + + urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + + username = args.user or USERNAME + password = args.password or PASSWORD + + if args.subcommand == "run": + login(args.endpoint, username, password) + pg_id = get_root_pg_id() + + if args.formatcommand == "template": + template_file = args.template or TEMPLATE_FILE + template_name = re.sub(r"\..+$", "", os.path.basename(template_file)) + + flow_from_template(pg_id, template_file, template_name) + + elif args.formatcommand == "json": + json_file = args.json or JSON_FILE + json_name = re.sub(r"\..+$", "", os.path.basename(json_file)) + + if not nipyapi.config.nifi_config.api_client: + nipyapi.config.nifi_config.api_client = nipyapi.nifi.ApiClient() + + header_params = {} + header_params["Accept"] = ( + nipyapi.config.nifi_config.api_client.select_header_accept( + ["application/json"] + ) + ) + header_params["Content-Type"] = ( + nipyapi.config.nifi_config.api_client.select_header_content_type( + ["multipart/form-data"] + ) + ) + + nipyapi.config.nifi_config.api_client.call_api( + "/process-groups/{pg_id}/process-groups/upload", + "POST", + path_params={"pg_id": pg_id}, + header_params=header_params, + _return_http_data_only=True, + post_params=[ + ("id", pg_id), + ("groupName", json_name), + ("positionX", 100), + ("positionY", 10), + ("clientId", nipyapi.nifi.FlowApi().generate_client_id()), + ], + files={"file": json_file}, + auth_settings=["tokenAuth"], + ) + + # get the process group id of the newly created process group to pass it to the schedule() function + # for nifi 2.x we need to schedule the inner process group and instead of the root process group + flow = recurse_flow() + inner_flow_id = flow.process_group_flow.flow.process_groups[0].id + pg_id = inner_flow_id + + schedule(pg_id, True) # start + time.sleep(5) # give the flow 5 seconds to run + schedule(pg_id, False) # stop + print(flow_files_queued(CONNECTION_NAME)) + + elif args.subcommand == "query": + login(args.endpoint, username, password) + print(flow_files_queued(CONNECTION_NAME)) if __name__ == "__main__": diff --git a/tests/templates/kuttl/upgrade/generate-and-log-flowfiles.json 
b/tests/templates/kuttl/upgrade/generate-and-log-flowfiles.json new file mode 100644 index 00000000..27593ebb --- /dev/null +++ b/tests/templates/kuttl/upgrade/generate-and-log-flowfiles.json @@ -0,0 +1,338 @@ +{ + "flowContents": { + "identifier": "2fd98cd2-df08-3faa-8b9a-d5343ffd500d", + "instanceIdentifier": "f1d4875b-0195-1000-dc79-af7f2c011d90", + "name": "NiFi Flow", + "comments": "", + "position": { + "x": 0.0, + "y": 0.0 + }, + "processGroups": [], + "remoteProcessGroups": [], + "processors": [ + { + "identifier": "1a6cf8b8-6ab9-3b3f-a7e6-8b2ec3196af8", + "instanceIdentifier": "bd1a2413-49c5-3d4b-9ac9-8d6aae6cbf33", + "name": "GenerateFlowFile", + "comments": "", + "position": { + "x": 200.0, + "y": 0.0 + }, + "type": "org.apache.nifi.processors.standard.GenerateFlowFile", + "bundle": { + "group": "org.apache.nifi", + "artifact": "nifi-standard-nar", + "version": "2.0.0" + }, + "properties": { + "character-set": "UTF-8", + "File Size": "64B", + "mime-type": null, + "generate-ff-custom-text": null, + "Batch Size": "1", + "Unique FlowFiles": "false", + "Data Format": "Text" + }, + "propertyDescriptors": { + "character-set": { + "name": "character-set", + "displayName": "Character Set", + "identifiesControllerService": false, + "sensitive": false, + "dynamic": false + }, + "File Size": { + "name": "File Size", + "displayName": "File Size", + "identifiesControllerService": false, + "sensitive": false, + "dynamic": false + }, + "mime-type": { + "name": "mime-type", + "displayName": "Mime Type", + "identifiesControllerService": false, + "sensitive": false, + "dynamic": false + }, + "generate-ff-custom-text": { + "name": "generate-ff-custom-text", + "displayName": "Custom Text", + "identifiesControllerService": false, + "sensitive": false, + "dynamic": false + }, + "Batch Size": { + "name": "Batch Size", + "displayName": "Batch Size", + "identifiesControllerService": false, + "sensitive": false, + "dynamic": false + }, + "Unique FlowFiles": { + "name": "Unique FlowFiles", + "displayName": "Unique FlowFiles", + "identifiesControllerService": false, + "sensitive": false, + "dynamic": false + }, + "Data Format": { + "name": "Data Format", + "displayName": "Data Format", + "identifiesControllerService": false, + "sensitive": false, + "dynamic": false + } + }, + "style": {}, + "schedulingPeriod": "1 sec", + "schedulingStrategy": "TIMER_DRIVEN", + "executionNode": "ALL", + "penaltyDuration": "30 sec", + "yieldDuration": "1 sec", + "bulletinLevel": "WARN", + "runDurationMillis": 0, + "concurrentlySchedulableTaskCount": 1, + "autoTerminatedRelationships": [], + "scheduledState": "ENABLED", + "retryCount": 0, + "retriedRelationships": [], + "backoffMechanism": "PENALIZE_FLOWFILE", + "maxBackoffPeriod": "10 mins", + "componentType": "PROCESSOR", + "groupIdentifier": "2fd98cd2-df08-3faa-8b9a-d5343ffd500d" + }, + { + "identifier": "766493bb-db2d-3ec1-aac9-e9e667f6ff5f", + "instanceIdentifier": "83c0bba6-72a1-36d9-a3af-37d480b3a7fe", + "name": "LogAttribute", + "comments": "", + "position": { + "x": 1016.0, + "y": 0.0 + }, + "type": "org.apache.nifi.processors.standard.LogAttribute", + "bundle": { + "group": "org.apache.nifi", + "artifact": "nifi-standard-nar", + "version": "2.0.0" + }, + "properties": { + "Log prefix": null, + "character-set": "US-ASCII", + "Log FlowFile Properties": "true", + "Log Level": "info", + "attributes-to-ignore-regex": null, + "Attributes to Ignore": null, + "Attributes to Log": null, + "attributes-to-log-regex": ".*", + "Output Format": "Line per Attribute", + "Log 
Payload": "false" + }, + "propertyDescriptors": { + "Log prefix": { + "name": "Log prefix", + "displayName": "Log prefix", + "identifiesControllerService": false, + "sensitive": false, + "dynamic": false + }, + "character-set": { + "name": "character-set", + "displayName": "Character Set", + "identifiesControllerService": false, + "sensitive": false, + "dynamic": false + }, + "Log FlowFile Properties": { + "name": "Log FlowFile Properties", + "displayName": "Log FlowFile Properties", + "identifiesControllerService": false, + "sensitive": false, + "dynamic": false + }, + "Log Level": { + "name": "Log Level", + "displayName": "Log Level", + "identifiesControllerService": false, + "sensitive": false, + "dynamic": false + }, + "attributes-to-ignore-regex": { + "name": "attributes-to-ignore-regex", + "displayName": "Attributes to Ignore by Regular Expression", + "identifiesControllerService": false, + "sensitive": false, + "dynamic": false + }, + "Attributes to Ignore": { + "name": "Attributes to Ignore", + "displayName": "Attributes to Ignore", + "identifiesControllerService": false, + "sensitive": false, + "dynamic": false + }, + "Attributes to Log": { + "name": "Attributes to Log", + "displayName": "Attributes to Log", + "identifiesControllerService": false, + "sensitive": false, + "dynamic": false + }, + "attributes-to-log-regex": { + "name": "attributes-to-log-regex", + "displayName": "Attributes to Log by Regular Expression", + "identifiesControllerService": false, + "sensitive": false, + "dynamic": false + }, + "Output Format": { + "name": "Output Format", + "displayName": "Output Format", + "identifiesControllerService": false, + "sensitive": false, + "dynamic": false + }, + "Log Payload": { + "name": "Log Payload", + "displayName": "Log Payload", + "identifiesControllerService": false, + "sensitive": false, + "dynamic": false + } + }, + "style": {}, + "schedulingPeriod": "0 sec", + "schedulingStrategy": "TIMER_DRIVEN", + "executionNode": "ALL", + "penaltyDuration": "30 sec", + "yieldDuration": "1 sec", + "bulletinLevel": "WARN", + "runDurationMillis": 0, + "concurrentlySchedulableTaskCount": 1, + "autoTerminatedRelationships": [ + "success" + ], + "scheduledState": "ENABLED", + "retryCount": 0, + "retriedRelationships": [], + "backoffMechanism": "PENALIZE_FLOWFILE", + "maxBackoffPeriod": "10 mins", + "componentType": "PROCESSOR", + "groupIdentifier": "2fd98cd2-df08-3faa-8b9a-d5343ffd500d" + } + ], + "inputPorts": [], + "outputPorts": [ + { + "identifier": "0f402e28-8c6a-30d4-bed8-0a9c3a0e0d0f", + "instanceIdentifier": "91a1e057-83f6-37e8-8bb1-6d59dd861686", + "name": "Output", + "position": { + "x": 672.0, + "y": 224.0 + }, + "type": "OUTPUT_PORT", + "concurrentlySchedulableTaskCount": 1, + "scheduledState": "ENABLED", + "allowRemoteAccess": true, + "portFunction": "STANDARD", + "componentType": "OUTPUT_PORT", + "groupIdentifier": "2fd98cd2-df08-3faa-8b9a-d5343ffd500d" + } + ], + "connections": [ + { + "identifier": "7822c7dc-3306-3d3c-9e28-f3e3efd9897a", + "instanceIdentifier": "3a49a47d-0862-34fc-bde7-e83012d7e4fb", + "name": "", + "source": { + "id": "1a6cf8b8-6ab9-3b3f-a7e6-8b2ec3196af8", + "type": "PROCESSOR", + "groupId": "2fd98cd2-df08-3faa-8b9a-d5343ffd500d", + "name": "GenerateFlowFile", + "comments": "", + "instanceIdentifier": "bd1a2413-49c5-3d4b-9ac9-8d6aae6cbf33" + }, + "destination": { + "id": "766493bb-db2d-3ec1-aac9-e9e667f6ff5f", + "type": "PROCESSOR", + "groupId": "2fd98cd2-df08-3faa-8b9a-d5343ffd500d", + "name": "LogAttribute", + "comments": "", + 
"instanceIdentifier": "83c0bba6-72a1-36d9-a3af-37d480b3a7fe" + }, + "labelIndex": 1, + "zIndex": 0, + "selectedRelationships": [ + "success" + ], + "backPressureObjectThreshold": 10000, + "backPressureDataSizeThreshold": "1 GB", + "flowFileExpiration": "0 sec", + "prioritizers": [], + "bends": [], + "loadBalanceStrategy": "DO_NOT_LOAD_BALANCE", + "partitioningAttribute": "", + "loadBalanceCompression": "DO_NOT_COMPRESS", + "componentType": "CONNECTION", + "groupIdentifier": "2fd98cd2-df08-3faa-8b9a-d5343ffd500d" + }, + { + "identifier": "e46e6370-45fe-3387-8c6d-a22fbc846dc4", + "instanceIdentifier": "92c1103a-88bd-39a1-9fb0-498afa1214b3", + "name": "output-connection", + "source": { + "id": "1a6cf8b8-6ab9-3b3f-a7e6-8b2ec3196af8", + "type": "PROCESSOR", + "groupId": "2fd98cd2-df08-3faa-8b9a-d5343ffd500d", + "name": "GenerateFlowFile", + "comments": "", + "instanceIdentifier": "bd1a2413-49c5-3d4b-9ac9-8d6aae6cbf33" + }, + "destination": { + "id": "0f402e28-8c6a-30d4-bed8-0a9c3a0e0d0f", + "type": "OUTPUT_PORT", + "groupId": "2fd98cd2-df08-3faa-8b9a-d5343ffd500d", + "name": "Output", + "instanceIdentifier": "91a1e057-83f6-37e8-8bb1-6d59dd861686" + }, + "labelIndex": 1, + "zIndex": 0, + "selectedRelationships": [ + "success" + ], + "backPressureObjectThreshold": 10000, + "backPressureDataSizeThreshold": "1 GB", + "flowFileExpiration": "0 sec", + "prioritizers": [], + "bends": [], + "loadBalanceStrategy": "DO_NOT_LOAD_BALANCE", + "partitioningAttribute": "", + "loadBalanceCompression": "DO_NOT_COMPRESS", + "componentType": "CONNECTION", + "groupIdentifier": "2fd98cd2-df08-3faa-8b9a-d5343ffd500d" + } + ], + "labels": [], + "funnels": [], + "controllerServices": [], + "defaultFlowFileExpiration": "0 sec", + "defaultBackPressureObjectThreshold": 10000, + "defaultBackPressureDataSizeThreshold": "1 GB", + "scheduledState": "ENABLED", + "executionEngine": "INHERITED", + "maxConcurrentTasks": 1, + "statelessFlowTimeout": "1 min", + "flowFileConcurrency": "UNBOUNDED", + "flowFileOutboundPolicy": "STREAM_WHEN_AVAILABLE", + "componentType": "PROCESS_GROUP" + }, + "externalControllerServices": {}, + "parameterContexts": {}, + "flowEncodingVersion": "1.0", + "parameterProviders": {}, + "latest": false +}