Skip to content
Closed
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 41 additions & 1 deletion pkg/controllers/updaterun/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ import (
"context"
"errors"
"fmt"
"strconv"
"time"

"github.com/prometheus/client_golang/prometheus"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
Expand All @@ -31,6 +33,7 @@ import (
"go.goms.io/fleet/pkg/utils"
"go.goms.io/fleet/pkg/utils/condition"
"go.goms.io/fleet/pkg/utils/controller"
"go.goms.io/fleet/pkg/utils/controller/metrics"
"go.goms.io/fleet/pkg/utils/informer"
)

Expand Down Expand Up @@ -84,6 +87,9 @@ func (r *Reconciler) Reconcile(ctx context.Context, req runtime.Request) (runtim
return runtime.Result{}, err
}

// Emit the update run status metric based on status conditions in the updateRun.
defer emitUpdateRunStatusMetric(&updateRun)

var updatingStageIndex int
var toBeUpdatedBindings, toBeDeletedBindings []*placementv1beta1.ClusterResourceBinding
var err error
Expand Down Expand Up @@ -158,13 +164,17 @@ func (r *Reconciler) Reconcile(ctx context.Context, req runtime.Request) (runtim
// We delete all the dependent resources, including approvalRequest objects, of the clusterStagedUpdateRun object.
func (r *Reconciler) handleDelete(ctx context.Context, updateRun *placementv1beta1.ClusterStagedUpdateRun) (bool, time.Duration, error) {
runObjRef := klog.KObj(updateRun)
// delete all the associated approvalRequests.
// Delete all the associated approvalRequests.
approvalRequest := &placementv1beta1.ClusterApprovalRequest{}
if err := r.Client.DeleteAllOf(ctx, approvalRequest, client.MatchingLabels{placementv1beta1.TargetUpdateRunLabel: updateRun.GetName()}); err != nil {
klog.ErrorS(err, "Failed to delete all associated approvalRequests", "clusterStagedUpdateRun", runObjRef)
return false, 0, controller.NewAPIServerError(false, err)
}
klog.V(2).InfoS("Deleted all approvalRequests associated with the clusterStagedUpdateRun", "clusterStagedUpdateRun", runObjRef)

// Delete the update run status metric.
metrics.FleetUpdateRunStatusLastTimestampSeconds.DeletePartialMatch(prometheus.Labels{"name": updateRun.GetName()})

controllerutil.RemoveFinalizer(updateRun, placementv1beta1.ClusterStagedUpdateRunFinalizer)
if err := r.Client.Update(ctx, updateRun); err != nil {
klog.ErrorS(err, "Failed to remove updateRun finalizer", "clusterStagedUpdateRun", runObjRef)
Expand Down Expand Up @@ -290,3 +300,33 @@ func handleClusterApprovalRequest(oldObj, newObj client.Object, q workqueue.Type
NamespacedName: types.NamespacedName{Name: updateRun},
})
}

// emitUpdateRunStatusMetric emits the update run status metric based on status conditions in the updateRun.
// Conditions are checked from the most terminal to the least: Succeeded > Progressing > Initialized.
// The first condition observed at the current generation (whether True or False) is emitted, and
// exactly one metric sample is emitted per call.
func emitUpdateRunStatusMetric(updateRun *placementv1beta1.ClusterStagedUpdateRun) {
	generation := updateRun.Generation
	genStr := strconv.FormatInt(generation, 10)

	// Priority order matters: a terminal Succeeded condition supersedes Progressing,
	// which supersedes Initialized.
	condTypes := []placementv1beta1.StagedUpdateRunConditionType{
		placementv1beta1.StagedUpdateRunConditionSucceeded,
		placementv1beta1.StagedUpdateRunConditionProgressing,
		placementv1beta1.StagedUpdateRunConditionInitialized,
	}
	for _, condType := range condTypes {
		cond := meta.FindStatusCondition(updateRun.Status.Conditions, string(condType))
		// IsConditionStatusTrue/False both require the condition to be set at the current
		// generation, so a nil or stale condition falls through to the next type.
		if condition.IsConditionStatusTrue(cond, generation) || condition.IsConditionStatusFalse(cond, generation) {
			metrics.FleetUpdateRunStatusLastTimestampSeconds.WithLabelValues(updateRun.Name, genStr,
				string(condType), string(cond.Status), cond.Reason).SetToCurrentTime()
			return
		}
	}

	// We should rarely reach here, it can only happen when updating updateRun status fails.
	klog.V(2).InfoS("There's no valid status condition on updateRun, status updating failed possibly", "updateRun", klog.KObj(updateRun))
}
126 changes: 126 additions & 0 deletions pkg/controllers/updaterun/controller_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,20 @@ import (
"strconv"
"time"

"github.com/google/go-cmp/cmp"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

"github.com/prometheus/client_golang/prometheus"
prometheusclientmodel "github.com/prometheus/client_model/go"
rbacv1 "k8s.io/api/rbac/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

Expand All @@ -29,6 +33,8 @@ import (
placementv1beta1 "go.goms.io/fleet/apis/placement/v1beta1"
"go.goms.io/fleet/pkg/utils"
"go.goms.io/fleet/pkg/utils/condition"
"go.goms.io/fleet/pkg/utils/controller/metrics"
metricsutils "go.goms.io/fleet/pkg/utils/metrics"
)

const (
Expand Down Expand Up @@ -58,6 +64,7 @@ var (
testCROName string
updateRunNamespacedName types.NamespacedName
testNamespace []byte
customRegistry *prometheus.Registry
)

var _ = Describe("Test the clusterStagedUpdateRun controller", func() {
Expand All @@ -69,6 +76,15 @@ var _ = Describe("Test the clusterStagedUpdateRun controller", func() {
testUpdateStrategyName = "updatestrategy-" + utils.RandStr()
testCROName = "cro-" + utils.RandStr()
updateRunNamespacedName = types.NamespacedName{Name: testUpdateRunName}

customRegistry = initializeUpdateRunMetricsRegistry()
})

AfterEach(func() {
By("Checking the update run status metrics are removed")
// No metrics are emitted as all are removed after updateRun is deleted.
validateUpdateRunMetricsEmitted(customRegistry)
unregisterUpdateRunMetrics(customRegistry)
})

Context("Test reconciling a clusterStagedUpdateRun", func() {
Expand Down Expand Up @@ -212,6 +228,114 @@ var _ = Describe("Test the clusterStagedUpdateRun controller", func() {
})
})

// initializeUpdateRunMetricsRegistry returns a fresh prometheus registry with the
// update run status metric registered and its state cleared.
func initializeUpdateRunMetricsRegistry() *prometheus.Registry {
	registry := prometheus.NewRegistry()
	Expect(registry.Register(metrics.FleetUpdateRunStatusLastTimestampSeconds)).Should(Succeed())
	// Clear any samples left over from a previous test run.
	metrics.FleetUpdateRunStatusLastTimestampSeconds.Reset()
	return registry
}

// unregisterUpdateRunMetrics removes the update run status metric from the given registry,
// failing the test if the metric was not registered there.
func unregisterUpdateRunMetrics(registry *prometheus.Registry) {
	collector := metrics.FleetUpdateRunStatusLastTimestampSeconds
	Expect(registry.Unregister(collector)).Should(BeTrue())
}

// validateUpdateRunMetricsEmitted validates the update run status metrics are emitted and are emitted in the correct order.
// It polls the registry until the gathered samples of the update run status metric match
// wantMetrics (under metricsutils.MetricsCmpOptions), or the timeout elapses.
func validateUpdateRunMetricsEmitted(registry *prometheus.Registry, wantMetrics ...*prometheusclientmodel.Metric) {
	Eventually(func() error {
		families, err := registry.Gather()
		if err != nil {
			return fmt.Errorf("failed to gather metrics: %w", err)
		}

		var gotMetrics []*prometheusclientmodel.Metric
		for _, family := range families {
			if family.GetName() != "fleet_workload_update_run_status_last_timestamp_seconds" {
				continue
			}
			gotMetrics = family.GetMetric()
		}

		if diff := cmp.Diff(gotMetrics, wantMetrics, metricsutils.MetricsCmpOptions...); diff != "" {
			return fmt.Errorf("update run status metrics mismatch (-got, +want):\n%s", diff)
		}
		return nil
	}, timeout, interval).Should(Succeed(), "failed to validate the update run status metrics")
}

// generateMetricsLabels builds the expected label pairs for one update run status
// metric sample: name, generation, condition, status, and reason.
func generateMetricsLabels(
	updateRun *placementv1beta1.ClusterStagedUpdateRun,
	condition, status, reason string,
) []*prometheusclientmodel.LabelPair {
	genStr := strconv.FormatInt(updateRun.Generation, 10)
	return []*prometheusclientmodel.LabelPair{
		{Name: ptr.To("name"), Value: ptr.To(updateRun.Name)},
		{Name: ptr.To("generation"), Value: ptr.To(genStr)},
		{Name: ptr.To("condition"), Value: ptr.To(condition)},
		{Name: ptr.To("status"), Value: ptr.To(status)},
		{Name: ptr.To("reason"), Value: ptr.To(reason)},
	}
}

// generateUpdateRunMetric builds an expected update run status metric sample for the
// given condition type, status, and reason. The gauge value is set to the current time
// in seconds. NOTE(review): assumes metricsutils.MetricsCmpOptions tolerates small
// timestamp differences when comparing gauge values — confirm.
func generateUpdateRunMetric(updateRun *placementv1beta1.ClusterStagedUpdateRun, condType, status, reason string) *prometheusclientmodel.Metric {
	return &prometheusclientmodel.Metric{
		Label: generateMetricsLabels(updateRun, condType, status, reason),
		Gauge: &prometheusclientmodel.Gauge{
			Value: ptr.To(float64(time.Now().UnixNano()) / 1e9),
		},
	}
}

// generateInitializationFailedMetric returns the expected metric for an updateRun whose initialization failed.
func generateInitializationFailedMetric(updateRun *placementv1beta1.ClusterStagedUpdateRun) *prometheusclientmodel.Metric {
	return generateUpdateRunMetric(updateRun, string(placementv1beta1.StagedUpdateRunConditionInitialized),
		string(metav1.ConditionFalse), condition.UpdateRunInitializeFailedReason)
}

// generateProgressingMetric returns the expected metric for an updateRun that has started progressing.
func generateProgressingMetric(updateRun *placementv1beta1.ClusterStagedUpdateRun) *prometheusclientmodel.Metric {
	return generateUpdateRunMetric(updateRun, string(placementv1beta1.StagedUpdateRunConditionProgressing),
		string(metav1.ConditionTrue), condition.UpdateRunStartedReason)
}

// generateWaitingMetric returns the expected metric for an updateRun that is waiting.
func generateWaitingMetric(updateRun *placementv1beta1.ClusterStagedUpdateRun) *prometheusclientmodel.Metric {
	return generateUpdateRunMetric(updateRun, string(placementv1beta1.StagedUpdateRunConditionProgressing),
		string(metav1.ConditionFalse), condition.UpdateRunWaitingReason)
}

// generateStuckMetric returns the expected metric for an updateRun that is stuck.
func generateStuckMetric(updateRun *placementv1beta1.ClusterStagedUpdateRun) *prometheusclientmodel.Metric {
	return generateUpdateRunMetric(updateRun, string(placementv1beta1.StagedUpdateRunConditionProgressing),
		string(metav1.ConditionFalse), condition.UpdateRunStuckReason)
}

// generateFailedMetric returns the expected metric for an updateRun that failed.
func generateFailedMetric(updateRun *placementv1beta1.ClusterStagedUpdateRun) *prometheusclientmodel.Metric {
	return generateUpdateRunMetric(updateRun, string(placementv1beta1.StagedUpdateRunConditionSucceeded),
		string(metav1.ConditionFalse), condition.UpdateRunFailedReason)
}

// generateSucceededMetric returns the expected metric for an updateRun that succeeded.
func generateSucceededMetric(updateRun *placementv1beta1.ClusterStagedUpdateRun) *prometheusclientmodel.Metric {
	return generateUpdateRunMetric(updateRun, string(placementv1beta1.StagedUpdateRunConditionSucceeded),
		string(metav1.ConditionTrue), condition.UpdateRunSucceededReason)
}

func generateTestClusterStagedUpdateRun() *placementv1beta1.ClusterStagedUpdateRun {
return &placementv1beta1.ClusterStagedUpdateRun{
ObjectMeta: metav1.ObjectMeta{
Expand Down Expand Up @@ -553,6 +677,8 @@ func generateFalseCondition(obj client.Object, condType any) metav1.Condition {
reason = condition.UpdateRunInitializeFailedReason
case placementv1beta1.StagedUpdateRunConditionSucceeded:
reason = condition.UpdateRunFailedReason
case placementv1beta1.StagedUpdateRunConditionProgressing:
reason = condition.UpdateRunWaitingReason
}
typeStr = string(cond)
case placementv1beta1.StageUpdatingConditionType:
Expand Down
36 changes: 36 additions & 0 deletions pkg/controllers/updaterun/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ var (
// stageUpdatingWaitTime is the time to wait before rechecking the stage update status.
// Put it as a variable for convenient testing.
stageUpdatingWaitTime = 60 * time.Second

// updateRunStuckThreshold is the time to wait on a single cluster update before marking update run as stuck.
updateRunStuckThreshold = 60 * time.Second
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this seems to be a bit too aggressive, a normal deployment with many replicas can take a few mins to get ready

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me set it to 5 minutes for now. It will not cause the updateRun to stop; it just marks it in the status so users can investigate the CRP and see if there's an issue. If the issue is fixed or recovers automatically, the updateRun can continue and the status is removed. We also add metrics to track this so that we can better understand it and tweak the threshold.

)

// execute executes the update run by updating the clusters in the updating stage specified by updatingStageIndex.
Expand Down Expand Up @@ -161,17 +164,26 @@ func (r *Reconciler) executeUpdatingStage(
markClusterUpdatingFailed(clusterStatus, updateRun.Generation, unexpectedErr.Error())
return 0, fmt.Errorf("%w: %s", errStagedUpdatedAborted, unexpectedErr.Error())
}

finished, updateErr := checkClusterUpdateResult(binding, clusterStatus, updatingStageStatus, updateRun)
if finished {
finishedClusterCount++
continue
} else {
// If cluster update has been running for more than 1 minute, mark the update run as stuck.
timeElapsed := time.Since(clusterStartedCond.LastTransitionTime.Time)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

checkClusterUpdateResult wait until all the resources become available which depends on the "timeToReady" setting by the user. Therefore, 'updateRunStuckThreshold' has to be the greater of 60 seconds or that number

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I realized one thing: "timeToReady" is specific to rolling updates. It's under RollingUpdateConfig and cannot be set when the rollout type is set to External. The wait is done in the rollingUpdate controller, not in the workApplier. For untrackable resources, the workApplier simply sets them as Available with the reason set to "Availability not trackable".

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point, does it mean that we need to add this to the update run API too or create some wait between clusters when there are untrackable resources?

Copy link
Contributor

@zhiying-lin zhiying-lin Apr 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i notice this problem when reviewing the code. Should we keep the default value as 1 min to keep it consistent with the rollingUpdate config? later, we can support the customized value?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, looks like we need to do similar thing in updateRun. That will be a new feature.

if timeElapsed > updateRunStuckThreshold {
klog.V(2).InfoS("Time waiting for cluster update to finish passes threshold, mark the update run as stuck", "time elapsed", timeElapsed, "threshold", updateRunStuckThreshold, "cluster", clusterStatus.ClusterName, "stage", updatingStageStatus.StageName, "clusterStagedUpdateRun", updateRunRef)
markUpdateRunStuck(updateRun, updatingStageStatus.StageName, clusterStatus.ClusterName)
}
}
// No need to continue as we only support one cluster updating at a time for now.
return clusterUpdatingWaitTime, updateErr
}

if finishedClusterCount == len(updatingStageStatus.Clusters) {
// All the clusters in the stage have been updated.
markUpdateRunWaiting(updateRun, updatingStageStatus.StageName)
markStageUpdatingWaiting(updatingStageStatus, updateRun.Generation)
klog.V(2).InfoS("The stage has finished all cluster updating", "stage", updatingStageStatus.StageName, "clusterStagedUpdateRun", updateRunRef)
// Check if the after stage tasks are ready.
Expand All @@ -180,6 +192,8 @@ func (r *Reconciler) executeUpdatingStage(
return 0, err
}
if approved {
// Mark updateRun as progressing again.
markUpdateRunStarted(updateRun)
markStageUpdatingSucceeded(updatingStageStatus, updateRun.Generation)
// No need to wait to get to the next stage.
return 0, nil
Expand Down Expand Up @@ -448,6 +462,28 @@ func markUpdateRunStarted(updateRun *placementv1beta1.ClusterStagedUpdateRun) {
})
}

// markUpdateRunStuck marks the updateRun as stuck in memory.
// It flips the Progressing condition to False with the Stuck reason; the change is
// only applied to the in-memory object, not persisted to the API server here.
func markUpdateRunStuck(updateRun *placementv1beta1.ClusterStagedUpdateRun, stageName, clusterName string) {
	stuckCond := metav1.Condition{
		Type:               string(placementv1beta1.StagedUpdateRunConditionProgressing),
		Status:             metav1.ConditionFalse,
		ObservedGeneration: updateRun.Generation,
		Reason:             condition.UpdateRunStuckReason,
		Message:            fmt.Sprintf("The updateRun is stuck waiting for cluster %s in stage %s to finish updating", clusterName, stageName),
	}
	meta.SetStatusCondition(&updateRun.Status.Conditions, stuckCond)
}

// markUpdateRunWaiting marks the updateRun as waiting in memory.
// It flips the Progressing condition to False with the Waiting reason; the change is
// only applied to the in-memory object, not persisted to the API server here.
func markUpdateRunWaiting(updateRun *placementv1beta1.ClusterStagedUpdateRun, stageName string) {
	meta.SetStatusCondition(&updateRun.Status.Conditions, metav1.Condition{
		Type:               string(placementv1beta1.StagedUpdateRunConditionProgressing),
		Status:             metav1.ConditionFalse,
		ObservedGeneration: updateRun.Generation,
		Reason:             condition.UpdateRunWaitingReason,
		// Fix typo in the user-facing message: "instage" -> "in stage".
		Message: fmt.Sprintf("The updateRun is waiting for after-stage tasks in stage %s to complete", stageName),
	})
}

// markStageUpdatingStarted marks the stage updating status as started in memory.
func markStageUpdatingStarted(stageUpdatingStatus *placementv1beta1.StageUpdatingStatus, generation int64) {
if stageUpdatingStatus.StartTime == nil {
Expand Down
Loading
Loading