@@ -87,7 +87,11 @@ use crate::{
8787 STACKABLE_LOG_CONFIG_DIR , STACKABLE_LOG_DIR , authentication:: AuthenticationClassResolved ,
8888 v1alpha1,
8989 } ,
90- operations:: { graceful_shutdown:: add_graceful_shutdown_config, pdb:: add_pdbs} ,
90+ operations:: {
91+ graceful_shutdown:: add_graceful_shutdown_config,
92+ pdb:: add_pdbs,
93+ upgrade:: { self , ClusterVersionUpdateState } ,
94+ } ,
9195 product_logging:: extend_role_group_config_map,
9296 reporting_task:: { self , build_maybe_reporting_task, build_reporting_task_service_name} ,
9397 security:: {
@@ -346,6 +350,9 @@ pub enum Error {
346350 AddVolumeMount {
347351 source : builder:: pod:: container:: Error ,
348352 } ,
353+
354+ #[ snafu( display( "Failed to determine the state of the version upgrade procedure" ) ) ]
355+ ClusterVersionUpdateState { source : upgrade:: Error } ,
349356}
350357
351358type Result < T , E = Error > = std:: result:: Result < T , E > ;
@@ -356,13 +363,6 @@ impl ReconcilerError for Error {
356363 }
357364}
358365
359- #[ derive( Debug , PartialEq , Eq ) ]
360- pub enum VersionChangeState {
361- BeginChange ,
362- Stopped ,
363- NoChange ,
364- }
365-
366366pub async fn reconcile_nifi (
367367 nifi : Arc < DeserializeGuard < v1alpha1:: NifiCluster > > ,
368368 ctx : Arc < Ctx > ,
@@ -391,78 +391,33 @@ pub async fn reconcile_nifi(
391391 . await
392392 . context ( SecuritySnafu ) ?;
393393
394- // Handle full restarts for a version change
395- let version_change = if let Some ( deployed_version) = nifi
394+ // If rolling upgrade is supported, kubernetes takes care of the cluster scaling automatically
395+ // otherwise the operator handles it
396+ // manage our own flow for upgrade from 1.x.x to 1.x.x/2.x.x
397+ // TODO: this can be removed once 1.x.x is longer supported
398+ let mut cluster_version_update_state = ClusterVersionUpdateState :: NoVersionChange ;
399+ let deployed_version = nifi
396400 . status
397401 . as_ref ( )
398- . and_then ( |status| status. deployed_version . as_ref ( ) )
399- {
400- if deployed_version != & resolved_product_image. product_version {
401- // Check if statefulsets are already scaled to zero, if not - requeue
402- let selector = LabelSelector {
403- match_expressions : None ,
404- match_labels : Some (
405- Labels :: role_selector ( nifi, APP_NAME , & NifiRole :: Node . to_string ( ) )
406- . context ( LabelBuildSnafu ) ?
407- . into ( ) ,
408- ) ,
409- } ;
402+ . and_then ( |status| status. deployed_version . as_ref ( ) ) ;
403+ let rolling_upgrade_supported = resolved_product_image. product_version . starts_with ( "2." )
404+ && deployed_version. is_some_and ( |v| v. starts_with ( "2." ) ) ;
410405
411- // Retrieve the deployed statefulsets to check on the current status of the restart
412- let deployed_statefulsets = client
413- . list_with_label_selector :: < StatefulSet > ( namespace, & selector)
414- . await
415- . context ( FetchStatefulsetsSnafu ) ?;
416-
417- // Sum target replicas for all statefulsets
418- let target_replicas = deployed_statefulsets
419- . iter ( )
420- . filter_map ( |statefulset| statefulset. spec . as_ref ( ) )
421- . filter_map ( |spec| spec. replicas )
422- . sum :: < i32 > ( ) ;
423-
424- // Sum current ready replicas for all statefulsets
425- let current_replicas = deployed_statefulsets
426- . iter ( )
427- . filter_map ( |statefulset| statefulset. status . as_ref ( ) )
428- . map ( |status| status. replicas )
429- . sum :: < i32 > ( ) ;
430-
431- // If statefulsets have already been scaled to zero, but have remaining replicas
432- // we requeue to wait until a full stop has been performed.
433- if target_replicas == 0 && current_replicas > 0 {
434- tracing:: info!(
435- "Cluster is performing a full restart at the moment and still shutting down, remaining replicas: [{}] - requeueing to wait for shutdown to finish" ,
436- current_replicas
437- ) ;
438- return Ok ( Action :: await_change ( ) ) ;
439- }
406+ if !rolling_upgrade_supported {
407+ cluster_version_update_state = upgrade:: cluster_version_update_state (
408+ nifi,
409+ client,
410+ & resolved_product_image. product_version ,
411+ deployed_version,
412+ )
413+ . await
414+ . context ( ClusterVersionUpdateStateSnafu ) ?;
440415
441- // Otherwise we either still need to scale the statefulsets to 0 or all replicas have
442- // been stopped and we can restart the cluster.
443- // Both actions will be taken in the regular reconciliation, so we can simply continue
444- // here
445- if target_replicas > 0 {
446- tracing:: info!(
447- "Version change detected, we'll need to scale down the cluster for a full restart."
448- ) ;
449- VersionChangeState :: BeginChange
450- } else {
451- tracing:: info!( "Cluster has been stopped for a restart, will scale back up." ) ;
452- VersionChangeState :: Stopped
453- }
454- } else {
455- // No version change detected, propagate this to the reconciliation
456- VersionChangeState :: NoChange
416+ if cluster_version_update_state == ClusterVersionUpdateState :: UpdateInProgress {
417+ return Ok ( Action :: await_change ( ) ) ;
457418 }
458- } else {
459- // No deployed version set in status, this is probably the first reconciliation ever
460- // for this cluster, so just let it progress normally
461- tracing:: debug!(
462- "No deployed version found for this cluster, this is probably the first start, continue reconciliation"
463- ) ;
464- VersionChangeState :: NoChange
465- } ;
419+ }
420+ // end todo
466421
467422 let validated_config = validated_product_config (
468423 nifi,
@@ -570,6 +525,14 @@ pub async fn reconcile_nifi(
570525 )
571526 . await ?;
572527
528+ let role_group = role. role_groups . get ( & rolegroup. role_group ) ;
529+ let replicas =
530+ if cluster_version_update_state == ClusterVersionUpdateState :: UpdateRequested {
531+ Some ( 0 )
532+ } else {
533+ role_group. and_then ( |rg| rg. replicas ) . map ( i32:: from)
534+ } ;
535+
573536 let rg_statefulset = build_node_rolegroup_statefulset (
574537 nifi,
575538 & resolved_product_image,
@@ -579,7 +542,8 @@ pub async fn reconcile_nifi(
579542 rolegroup_config,
580543 & merged_config,
581544 & nifi_authentication_config,
582- & version_change,
545+ rolling_upgrade_supported,
546+ replicas,
583547 & rbac_sa. name_any ( ) ,
584548 )
585549 . await ?;
@@ -661,7 +625,7 @@ pub async fn reconcile_nifi(
661625
662626 // Update the deployed product version in the status after everything has been deployed, unless
663627 // we are still in the process of updating
664- let status = if version_change != VersionChangeState :: BeginChange {
628+ let status = if cluster_version_update_state != ClusterVersionUpdateState :: UpdateRequested {
665629 NifiStatus {
666630 deployed_version : Some ( resolved_product_image. product_version ) ,
667631 conditions,
@@ -907,7 +871,8 @@ async fn build_node_rolegroup_statefulset(
907871 rolegroup_config : & HashMap < PropertyNameKind , BTreeMap < String , String > > ,
908872 merged_config : & NifiConfig ,
909873 nifi_auth_config : & NifiAuthenticationConfig ,
910- version_change_state : & VersionChangeState ,
874+ rolling_update_supported : bool ,
875+ replicas : Option < i32 > ,
911876 sa_name : & str ,
912877) -> Result < StatefulSet > {
913878 tracing:: debug!( "Building statefulset" ) ;
@@ -1391,11 +1356,7 @@ async fn build_node_rolegroup_statefulset(
13911356 . build ( ) ,
13921357 spec : Some ( StatefulSetSpec {
13931358 pod_management_policy : Some ( "Parallel" . to_string ( ) ) ,
1394- replicas : if version_change_state == & VersionChangeState :: BeginChange {
1395- Some ( 0 )
1396- } else {
1397- role_group. and_then ( |rg| rg. replicas ) . map ( i32:: from)
1398- } ,
1359+ replicas,
13991360 selector : LabelSelector {
14001361 match_labels : Some (
14011362 Labels :: role_group_selector (
@@ -1412,7 +1373,11 @@ async fn build_node_rolegroup_statefulset(
14121373 service_name : rolegroup_ref. object_name ( ) ,
14131374 template : pod_template,
14141375 update_strategy : Some ( StatefulSetUpdateStrategy {
1415- type_ : Some ( "OnDelete" . to_string ( ) ) ,
1376+ type_ : if rolling_update_supported {
1377+ Some ( "RollingUpdate" . to_string ( ) )
1378+ } else {
1379+ Some ( "OnDelete" . to_string ( ) )
1380+ } ,
14161381 ..StatefulSetUpdateStrategy :: default ( )
14171382 } ) ,
14181383 volume_claim_templates : Some ( vec ! [
0 commit comments