
Commit 75d0be7

feat: add status back-reporting support (2/, new controller) (#329)
* Added status back-reporting controller
* Minor fixes
* Minor fixes

Signed-off-by: michaelawyu <[email protected]>
1 parent f81a13d commit 75d0be7

File tree: 9 files changed, +1747 −88 lines

Lines changed: 291 additions & 0 deletions
@@ -0,0 +1,291 @@
/*
Copyright 2025 The KubeFleet Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package statusbackreporter

import (
	"context"
	"encoding/json"
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
	errorsutil "k8s.io/apimachinery/pkg/util/errors"
	"k8s.io/client-go/dynamic"
	"k8s.io/klog/v2"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/handler"

	placementv1beta1 "github.com/kubefleet-dev/kubefleet/apis/placement/v1beta1"
	"github.com/kubefleet-dev/kubefleet/pkg/utils/controller"
	parallelizerutil "github.com/kubefleet-dev/kubefleet/pkg/utils/parallelizer"
)

// Reconciler reconciles a Work object (specifically its status) to back-report
// statuses to their corresponding original resources in the hub cluster.
type Reconciler struct {
	hubClient        client.Client
	hubDynamicClient dynamic.Interface

	parallelizer parallelizerutil.Parallelizer
}

// NewReconciler creates a new Reconciler.
func NewReconciler(hubClient client.Client, hubDynamicClient dynamic.Interface, parallelizer parallelizerutil.Parallelizer) *Reconciler {
	if parallelizer == nil {
		klog.V(2).InfoS("parallelizer is not set; using the default parallelizer with a worker count of 1")
		parallelizer = parallelizerutil.NewParallelizer(1)
	}

	return &Reconciler{
		hubClient:        hubClient,
		hubDynamicClient: hubDynamicClient,
		parallelizer:     parallelizer,
	}
}

// Reconcile reconciles the Work object to back-report statuses to their corresponding
// original resources in the hub cluster.
func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	workRef := klog.KRef(req.Namespace, req.Name)
	startTime := time.Now()
	klog.V(2).InfoS("Reconciliation loop starts", "controller", "statusBackReporter", "work", workRef)
	defer func() {
		latency := time.Since(startTime).Milliseconds()
		klog.V(2).InfoS("Reconciliation loop ends", "controller", "statusBackReporter", "work", workRef, "latency", latency)
	}()

	work := &placementv1beta1.Work{}
	if err := r.hubClient.Get(ctx, req.NamespacedName, work); err != nil {
		klog.ErrorS(err, "Failed to retrieve Work object", "work", workRef)
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}

	// Perform a sanity check; make sure that mirroring back to original resources can be done, i.e.,
	// the scheduling policy is set to the PickFixed type with exactly one target cluster, or the PickN
	// type with the number of clusters set to 1. The logic also checks whether the report-back strategy
	// still allows status back-reporting.
	placementObj, shouldSkip, err := r.validatePlacementObjectForOriginalResourceStatusBackReporting(ctx, work)
	if err != nil {
		klog.ErrorS(err, "Failed to validate the placement object associated with the Work object for back-reporting statuses to original resources", "work", workRef)
		return ctrl.Result{}, err
	}
	if shouldSkip {
		klog.V(2).InfoS("Skip status back-reporting to original resources as the report-back strategy on the placement object forbids it", "work", workRef, "placement", klog.KObj(placementObj))
		return ctrl.Result{}, nil
	}

	// Prepare a map for quick lookup of whether a resource is enveloped.
	isResEnvelopedByIdStr := prepareIsResEnvelopedMap(placementObj)

	// Back-report statuses to original resources.

	// Prepare a child context.
	// Cancel the child context anyway to avoid leaks.
	childCtx, cancel := context.WithCancel(ctx)
	defer cancel()
	errs := make([]error, len(work.Status.ManifestConditions))
	doWork := func(pieces int) {
		manifestCond := &work.Status.ManifestConditions[pieces]
		resIdentifier := manifestCond.Identifier

		applyCond := meta.FindStatusCondition(work.Status.Conditions, placementv1beta1.WorkConditionTypeApplied)
		if applyCond == nil || applyCond.ObservedGeneration != work.Generation || applyCond.Status != metav1.ConditionTrue {
			// The resource has not been successfully applied yet. Skip back-reporting.
			klog.V(2).InfoS("Skip status back-reporting for the resource; the resource has not been successfully applied yet", "work", workRef, "resourceIdentifier", resIdentifier)
			return
		}

		// Skip the resource if there is no back-reported status.
		if manifestCond.BackReportedStatus == nil || len(manifestCond.BackReportedStatus.ObservedStatus.Raw) == 0 {
			klog.V(2).InfoS("Skip status back-reporting for the resource; there is no back-reported status", "work", workRef, "resourceIdentifier", resIdentifier)
			return
		}

		// Skip the resource if it is enveloped.
		idStr := formatWorkResourceIdentifier(&resIdentifier)
		isEnveloped, ok := isResEnvelopedByIdStr[idStr]
		if !ok {
			// The resource is not found in the list of selected resources as reported by the status of the placement object.
			//
			// This is not considered an error, as the resource might be absent for consistency reasons (i.e., it has
			// just been de-selected); the status back-reporter will skip the resource for now.
			klog.V(2).InfoS("Skip status back-reporting for the resource; the resource is not found in the list of selected resources in the placement object", "work", workRef, "resourceIdentifier", resIdentifier)
			return
		}
		if isEnveloped {
			// The resource is enveloped; skip back-reporting.
			klog.V(2).InfoS("Skip status back-reporting for the resource; the resource is enveloped", "work", workRef, "resourceIdentifier", resIdentifier)
			return
		}

		// Note that applied resources should always have a valid identifier set; for simplicity,
		// the back-reporter performs no further validation here.
		gvr := schema.GroupVersionResource{
			Group:    resIdentifier.Group,
			Version:  resIdentifier.Version,
			Resource: resIdentifier.Resource,
		}
		nsName := resIdentifier.Namespace
		resName := resIdentifier.Name
		unstructured, err := r.hubDynamicClient.Resource(gvr).Namespace(nsName).Get(ctx, resName, metav1.GetOptions{})
		if err != nil {
			wrappedErr := fmt.Errorf("failed to retrieve the target resource for status back-reporting: %w", err)
			klog.ErrorS(err, "Failed to retrieve the target resource for status back-reporting", "work", workRef, "resourceIdentifier", resIdentifier)
			errs[pieces] = wrappedErr
			return
		}

		// Set the back-reported status to the target resource.
		statusWrapper := make(map[string]interface{})
		if err := json.Unmarshal(manifestCond.BackReportedStatus.ObservedStatus.Raw, &statusWrapper); err != nil {
			wrappedErr := fmt.Errorf("failed to unmarshal back-reported status: %w", err)
			klog.ErrorS(err, "Failed to unmarshal back-reported status", "work", workRef, "resourceIdentifier", resIdentifier)
			errs[pieces] = wrappedErr
			return
		}

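		// A note on the expected blob shape (an inference from the unmarshal-and-index
		// logic around this point, with hypothetical field values): the raw back-reported
		// status is a wrapper object whose top-level "status" key holds the status to
		// mirror, e.g.,
		//
		//	{"status": {"observedGeneration": 2, "conditions": [...]}}
		//
		// so statusWrapper["status"] yields the object that gets written back.
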
		// Note that if the applied resource has a status sub-resource, it is usually safe for us to assume that
		// the original resource should also have a status sub-resource of the same format.
		unstructured.Object["status"] = statusWrapper["status"]
		_, err = r.hubDynamicClient.Resource(gvr).Namespace(nsName).UpdateStatus(ctx, unstructured, metav1.UpdateOptions{})
		if err != nil {
			// TO-DO (chenyu1): check for cases where the API definition is inconsistent between the member cluster
			// side and the hub cluster side, and single out the errors as user errors instead.
			wrappedErr := fmt.Errorf("failed to update status to the target resource: %w", err)
			klog.ErrorS(err, "Failed to update status to the target resource", "work", workRef, "resourceIdentifier", resIdentifier)
			errs[pieces] = wrappedErr
			return
		}
	}
	r.parallelizer.ParallelizeUntil(childCtx, len(work.Status.ManifestConditions), doWork, "backReportStatusToOriginalResources")
	return ctrl.Result{}, errorsutil.NewAggregate(errs)
}

// validatePlacementObjectForOriginalResourceStatusBackReporting validates whether
// the placement object associated with the given Work object is eligible for back-reporting
// statuses to original resources.
func (r *Reconciler) validatePlacementObjectForOriginalResourceStatusBackReporting(
	ctx context.Context, work *placementv1beta1.Work) (placementv1beta1.PlacementObj, bool, error) {
	// Read the `kubernetes-fleet.io/parent-CRP` label to retrieve the CRP/RP name.
	parentPlacementName, ok := work.Labels[placementv1beta1.PlacementTrackingLabel]
	if !ok || len(parentPlacementName) == 0 {
		// Normally this should never occur.
		wrappedErr := fmt.Errorf("the placement tracking label is absent or invalid (label value: %s)", parentPlacementName)
		return nil, false, controller.NewUnexpectedBehaviorError(wrappedErr)
	}

	// Read the `kubernetes-fleet.io/parent-namespace` label to retrieve the RP namespace (if any).
	parentPlacementNSName := work.Labels[placementv1beta1.ParentNamespaceLabel]

	var placementObj placementv1beta1.PlacementObj
	if len(parentPlacementNSName) == 0 {
		// Retrieve the CRP object.
		placementObj = &placementv1beta1.ClusterResourcePlacement{}
		if err := r.hubClient.Get(ctx, client.ObjectKey{Name: parentPlacementName}, placementObj); err != nil {
			wrappedErr := fmt.Errorf("failed to retrieve CRP object: %w", err)
			return nil, false, controller.NewAPIServerError(true, wrappedErr)
		}
	} else {
		// Retrieve the RP object.
		placementObj = &placementv1beta1.ResourcePlacement{}
		if err := r.hubClient.Get(ctx, client.ObjectKey{Namespace: parentPlacementNSName, Name: parentPlacementName}, placementObj); err != nil {
			wrappedErr := fmt.Errorf("failed to retrieve RP object: %w", err)
			return nil, false, controller.NewAPIServerError(true, wrappedErr)
		}
	}

	// Validate the scheduling policy of the placement object.
	schedulingPolicy := placementObj.GetPlacementSpec().Policy
	switch {
	case schedulingPolicy == nil:
		// The system uses a default scheduling policy of the PickAll placement type. Reject status back-reporting.
		wrappedErr := fmt.Errorf("no scheduling policy specified (the PickAll type is in use); cannot back-report status to original resources")
		return nil, false, controller.NewUserError(wrappedErr)
	case schedulingPolicy.PlacementType == placementv1beta1.PickAllPlacementType:
		wrappedErr := fmt.Errorf("the scheduling policy in use is of the PickAll type; cannot back-report status to original resources")
		return nil, false, controller.NewUserError(wrappedErr)
	case schedulingPolicy.PlacementType == placementv1beta1.PickFixedPlacementType && len(schedulingPolicy.ClusterNames) != 1:
		wrappedErr := fmt.Errorf("the scheduling policy in use is of the PickFixed type, but it does not have exactly one target cluster (%d clusters specified); cannot back-report status to original resources", len(schedulingPolicy.ClusterNames))
		return nil, false, controller.NewUserError(wrappedErr)
	case schedulingPolicy.PlacementType == placementv1beta1.PickNPlacementType && schedulingPolicy.NumberOfClusters == nil:
		// Normally this should never occur.
		wrappedErr := fmt.Errorf("the scheduling policy in use is of the PickN type, but no number of target clusters is specified; cannot back-report status to original resources")
		return nil, false, controller.NewUserError(wrappedErr)
	case schedulingPolicy.PlacementType == placementv1beta1.PickNPlacementType && *schedulingPolicy.NumberOfClusters != 1:
		wrappedErr := fmt.Errorf("the scheduling policy in use is of the PickN type, but the number of target clusters is not set to 1; cannot back-report status to original resources")
		return nil, false, controller.NewUserError(wrappedErr)
	}

	// Check if the report-back strategy on the placement object still allows status back-reporting to the original resources.
	reportBackStrategy := placementObj.GetPlacementSpec().Strategy.ReportBackStrategy
	switch {
	case reportBackStrategy == nil:
		klog.V(2).InfoS("Skip status back-reporting; the strategy has not been set", "placement", klog.KObj(placementObj))
		return placementObj, true, nil
	case reportBackStrategy.Type != placementv1beta1.ReportBackStrategyTypeMirror:
		klog.V(2).InfoS("Skip status back-reporting; it has been disabled in the strategy", "placement", klog.KObj(placementObj))
		return placementObj, true, nil
	case reportBackStrategy.Destination == nil:
		// This in theory should never occur; CEL-based validation should have rejected such strategies.
		klog.V(2).InfoS("Skip status back-reporting; destination has not been set in the strategy", "placement", klog.KObj(placementObj))
		return placementObj, true, nil
	case *reportBackStrategy.Destination != placementv1beta1.ReportBackDestinationOriginalResource:
		klog.V(2).InfoS("Skip status back-reporting; destination has been set to the Work API", "placement", klog.KObj(placementObj))
		return placementObj, true, nil
	}

	// Both the scheduling policy and the report-back strategy allow back-reporting statuses to original resources.
	return placementObj, false, nil
}
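
// For illustration only: a hypothetical sketch of a placement spec that passes all of the
// checks above. The field names are inferred from the Go types referenced in this file and
// are an assumption, not a confirmed API shape.
//
//	spec:
//	  policy:
//	    placementType: PickN
//	    numberOfClusters: 1
//	  strategy:
//	    reportBackStrategy:
//	      type: Mirror
//	      destination: OriginalResource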

// formatResourceIdentifier formats a ResourceIdentifier object to a string for keying purposes.
//
// The format in use is `[API-GROUP]/[API-VERSION]/[API-KIND]/[NAMESPACE]/[NAME]`, e.g., `/v1/Namespace//work`.
func formatResourceIdentifier(resourceIdentifier *placementv1beta1.ResourceIdentifier) string {
	return fmt.Sprintf("%s/%s/%s/%s/%s", resourceIdentifier.Group, resourceIdentifier.Version, resourceIdentifier.Kind, resourceIdentifier.Namespace, resourceIdentifier.Name)
}

// formatWorkResourceIdentifier formats a WorkResourceIdentifier object to a string for keying purposes.
//
// The format in use is `[API-GROUP]/[API-VERSION]/[API-KIND]/[NAMESPACE]/[NAME]`, e.g., `/v1/Namespace//work`.
func formatWorkResourceIdentifier(workResourceIdentifier *placementv1beta1.WorkResourceIdentifier) string {
	return fmt.Sprintf("%s/%s/%s/%s/%s", workResourceIdentifier.Group, workResourceIdentifier.Version, workResourceIdentifier.Kind, workResourceIdentifier.Namespace, workResourceIdentifier.Name)
}

// prepareIsResEnvelopedMap prepares a map for quick lookup of whether a resource is enveloped.
func prepareIsResEnvelopedMap(placementObj placementv1beta1.PlacementObj) map[string]bool {
	isResEnvelopedByIdStr := make(map[string]bool)

	selectedResources := placementObj.GetPlacementStatus().SelectedResources
	for idx := range selectedResources {
		selectedRes := selectedResources[idx]
		idStr := formatResourceIdentifier(&selectedRes)
		isResEnvelopedByIdStr[idStr] = selectedRes.Envelope != nil
	}

	return isResEnvelopedByIdStr
}
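
// For illustration only (hypothetical identifiers): for a placement that selects a Deployment
// directly and a ConfigMap whose Envelope field is set, prepareIsResEnvelopedMap would produce
// a map along the lines of
//
//	{
//	    "apps/v1/Deployment/ns-1/deploy-1": false,
//	    "/v1/ConfigMap/ns-1/cm-1":          true,
//	}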

// SetupWithManager sets up the controller with the given manager.
func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
		Named("status-back-reporter").
		Watches(&placementv1beta1.Work{}, &handler.EnqueueRequestForObject{}).
		Complete(r)
}
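
For context, a minimal wiring sketch for the new reconciler. This commit only adds the controller itself, so the manager setup, scheme registration, and the statusbackreporter import path below are assumptions based on standard controller-runtime and client-go usage, not code from this change:

package main

import (
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/client-go/dynamic"
	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
	ctrl "sigs.k8s.io/controller-runtime"

	placementv1beta1 "github.com/kubefleet-dev/kubefleet/apis/placement/v1beta1"
	// Hypothetical import path for the package added in this commit.
	"github.com/kubefleet-dev/kubefleet/pkg/controllers/statusbackreporter"
)

func main() {
	// Register both the built-in types and the placement API types so that the
	// manager's client can read Work, CRP, and RP objects.
	scheme := runtime.NewScheme()
	_ = clientgoscheme.AddToScheme(scheme)
	_ = placementv1beta1.AddToScheme(scheme)

	cfg := ctrl.GetConfigOrDie()
	mgr, err := ctrl.NewManager(cfg, ctrl.Options{Scheme: scheme})
	if err != nil {
		panic(err)
	}

	// The dynamic client is what lets the reconciler update the status of
	// arbitrary (GVR-addressed) original resources on the hub cluster.
	dynClient, err := dynamic.NewForConfig(cfg)
	if err != nil {
		panic(err)
	}

	// A nil parallelizer falls back to the single-worker default (see NewReconciler).
	r := statusbackreporter.NewReconciler(mgr.GetClient(), dynClient, nil)
	if err := r.SetupWithManager(mgr); err != nil {
		panic(err)
	}

	if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
		panic(err)
	}
}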
