Skip to content

Commit 7099268

Browse files
committed
address comments
Signed-off-by: Zhiying Lin <[email protected]>
1 parent 5b1b94e commit 7099268

File tree

19 files changed

+609
-119
lines changed

19 files changed

+609
-119
lines changed

.github/workflows/ci.yml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,8 +94,10 @@ jobs:
9494
- customized-settings: default
9595
# to shorten the test duration, set the resource snapshot creation interval to 0
9696
resource-snapshot-creation-interval: 0m
97+
resource-changes-collection-duration: 0m
9798
- customized-settings: custom
98-
resource-snapshot-creation-interval: 1m
99+
resource-snapshot-creation-interval: 30s
100+
resource-changes-collection-duration: 30s
99101
runs-on: ubuntu-latest
100102
needs: [
101103
detect-noop,
@@ -144,4 +146,5 @@ jobs:
144146
# property provider once the AKS one is split out.
145147
PROPERTY_PROVIDER: 'azure'
146148
RESOURCE_SNAPSHOT_CREATION_INTERVAL: ${{ matrix.resource-snapshot-creation-interval }}
149+
RESOURCE_CHANGES_COLLECTION_DURATION: ${{ matrix.resource-changes-collection-duration }}
147150

apis/placement/v1/resourcesnapshot_types.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ import (
4141
// Each snapshot MUST have the following labels:
4242
// - `CRPTrackingLabel` which points to its owner CRP.
4343
// - `ResourceIndexLabel` which is the index of the snapshot group.
44+
//
45+
// The first snapshot of the index group MAY have the following labels:
4446
// - `IsLatestSnapshotLabel` which indicates whether the snapshot is the latest one.
4547
//
4648
// All the snapshots within the same index group must have the same ResourceIndexLabel.

apis/placement/v1beta1/resourcesnapshot_types.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,8 @@ func (c *ClusterResourceSnapshotList) GetResourceSnapshotObjs() []ResourceSnapsh
232232
// Each snapshot MUST have the following labels:
233233
// - `CRPTrackingLabel` which points to its owner resource placement.
234234
// - `ResourceIndexLabel` which is the index of the snapshot group.
235+
//
236+
// The first snapshot of the index group MAY have the following labels:
235237
// - `IsLatestSnapshotLabel` which indicates whether the snapshot is the latest one.
236238
//
237239
// All the snapshots within the same index group must have the same ResourceIndexLabel.

charts/hub-agent/README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,4 +40,5 @@ _See [helm install](https://helm.sh/docs/helm/helm_install/) for command documen
4040
| `ConcurrentResourceChangeSyncs` | Max concurrent resourceChange reconcilers | `20` |
4141
| `logFileMaxSize` | Max log file size before rotation | `1000000` |
4242
| `MaxFleetSizeSupported` | Max number of member clusters supported | `100` |
43-
| `resourceSnapshotCreationInterval` | Interval for resource snapshot creation | `1m` |
43+
| `resourceSnapshotCreationInterval` | Minimum interval between resource snapshot creations | `30s` |
44+
| `resourceChangesCollectionDuration` | Duration for collecting resource changes into one snapshot | `30s` |

charts/hub-agent/templates/deployment.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ spec:
4444
- --force-delete-wait-time={{ .Values.forceDeleteWaitTime }}
4545
- --cluster-unhealthy-threshold={{ .Values.clusterUnhealthyThreshold }}
4646
- --resource-snapshot-creation-interval={{ .Values.resourceSnapshotCreationInterval }}
47+
- --resource-changes-collection-duration={{ .Values.resourceChangesCollectionDuration }}
4748
ports:
4849
- name: metrics
4950
containerPort: 8080

charts/hub-agent/values.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@ enableGuardRail: true
1818
webhookClientConnectionType: service
1919
forceDeleteWaitTime: 15m0s
2020
clusterUnhealthyThreshold: 3m0s
21-
resourceSnapshotCreationInterval: 1m
22-
21+
resourceSnapshotCreationInterval: 30s
22+
resourceChangesCollectionDuration: 30s
2323
namespace:
2424
fleet-system
2525

cmd/hubagent/options/options.go

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,8 @@ type Options struct {
106106
DenyModifyMemberClusterLabels bool
107107
// ResourceSnapshotCreationInterval is the interval at which resource snapshots are created.
108108
ResourceSnapshotCreationInterval time.Duration
109+
// ResourceChangesCollectionDuration is the duration for collecting resource changes into one snapshot.
110+
ResourceChangesCollectionDuration time.Duration
109111
}
110112

111113
// NewOptions builds an empty options.
@@ -117,15 +119,16 @@ func NewOptions() *Options {
117119
ResourceNamespace: utils.FleetSystemNamespace,
118120
ResourceName: "136224848560.hub.fleet.azure.com",
119121
},
120-
MaxConcurrentClusterPlacement: 10,
121-
ConcurrentResourceChangeSyncs: 1,
122-
MaxFleetSizeSupported: 100,
123-
EnableV1Alpha1APIs: false,
124-
EnableClusterInventoryAPIs: true,
125-
EnableStagedUpdateRunAPIs: true,
126-
EnablePprof: false,
127-
PprofPort: 6065,
128-
ResourceSnapshotCreationInterval: 1 * time.Minute,
122+
MaxConcurrentClusterPlacement: 10,
123+
ConcurrentResourceChangeSyncs: 1,
124+
MaxFleetSizeSupported: 100,
125+
EnableV1Alpha1APIs: false,
126+
EnableClusterInventoryAPIs: true,
127+
EnableStagedUpdateRunAPIs: true,
128+
EnablePprof: false,
129+
PprofPort: 6065,
130+
ResourceSnapshotCreationInterval: 30 * time.Second,
131+
ResourceChangesCollectionDuration: 30 * time.Second,
129132
}
130133
}
131134

@@ -172,7 +175,8 @@ func (o *Options) AddFlags(flags *flag.FlagSet) {
172175
flags.BoolVar(&o.EnablePprof, "enable-pprof", false, "If set, the pprof profiling is enabled.")
173176
flags.IntVar(&o.PprofPort, "pprof-port", 6065, "The port for pprof profiling.")
174177
flags.BoolVar(&o.DenyModifyMemberClusterLabels, "deny-modify-member-cluster-labels", false, "If set, users not in the system:masters cannot modify member cluster labels.")
175-
flags.DurationVar(&o.ResourceSnapshotCreationInterval, "resource-snapshot-creation-interval", 1*time.Minute, "The interval at which resource snapshots are created.")
176-
178+
flags.DurationVar(&o.ResourceSnapshotCreationInterval, "resource-snapshot-creation-interval", 30*time.Second, "The interval at which resource snapshots could be created.")
179+
flags.DurationVar(&o.ResourceChangesCollectionDuration, "resource-changes-collection-duration", 30*time.Second,
180+
"The duration for collecting resource changes into one snapshot. The default is 30 seconds, which means that the controller will collect resource changes for 30 seconds before creating a resource snapshot.")
177181
o.RateLimiterOpts.AddFlags(flags)
178182
}

cmd/hubagent/workload/setup.go

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -153,15 +153,16 @@ func SetupControllers(ctx context.Context, wg *sync.WaitGroup, mgr ctrl.Manager,
153153

154154
// Set up a custom controller to reconcile cluster resource placement
155155
crpc := &clusterresourceplacement.Reconciler{
156-
Client: mgr.GetClient(),
157-
Recorder: mgr.GetEventRecorderFor(crpControllerName),
158-
RestMapper: mgr.GetRESTMapper(),
159-
InformerManager: dynamicInformerManager,
160-
ResourceConfig: resourceConfig,
161-
SkippedNamespaces: skippedNamespaces,
162-
Scheme: mgr.GetScheme(),
163-
UncachedReader: mgr.GetAPIReader(),
164-
ResourceSnapshotCreationInterval: opts.ResourceSnapshotCreationInterval,
156+
Client: mgr.GetClient(),
157+
Recorder: mgr.GetEventRecorderFor(crpControllerName),
158+
RestMapper: mgr.GetRESTMapper(),
159+
InformerManager: dynamicInformerManager,
160+
ResourceConfig: resourceConfig,
161+
SkippedNamespaces: skippedNamespaces,
162+
Scheme: mgr.GetScheme(),
163+
UncachedReader: mgr.GetAPIReader(),
164+
ResourceSnapshotCreationInterval: opts.ResourceSnapshotCreationInterval,
165+
ResourceChangesCollectionDuration: opts.ResourceChangesCollectionDuration,
165166
}
166167

167168
rateLimiter := options.DefaultControllerRateLimiter(opts.RateLimiterOpts)

pkg/controllers/clusterresourceplacement/controller.go

Lines changed: 20 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ import (
4545
"github.com/kubefleet-dev/kubefleet/pkg/utils/defaulter"
4646
"github.com/kubefleet-dev/kubefleet/pkg/utils/labels"
4747
"github.com/kubefleet-dev/kubefleet/pkg/utils/resource"
48+
fleettime "github.com/kubefleet-dev/kubefleet/pkg/utils/time"
4849
)
4950

5051
// The max size of an object in k8s is 1.5MB because of ETCD limit https://etcd.io/docs/v3.3/dev-guide/limit/.
@@ -219,16 +220,11 @@ func (r *Reconciler) handleUpdate(ctx context.Context, crp *fleetv1beta1.Cluster
219220
klog.ErrorS(err, "Failed to extract the resource index from the clusterResourceSnapshot", "clusterResourcePlacement", crpKObj, "clusterResourceSnapshot", latestResourceSnapshotKObj)
220221
return ctrl.Result{}, controller.NewUnexpectedBehaviorError(err)
221222
}
222-
selectedResourceIDs, err = controller.CollectResourceIdentifiersFromClusterResourceSnapshot(ctx, r.Client, crp.Name, strconv.Itoa(latestResourceSnapshotIndex))
223+
selectedResourceIDs, err = controller.CollectResourceIdentifiersUsingMasterClusterResourceSnapshot(ctx, r.Client, crp.Name, latestResourceSnapshot, strconv.Itoa(latestResourceSnapshotIndex))
223224
if err != nil {
224225
klog.ErrorS(err, "Failed to collect resource identifiers from the clusterResourceSnapshot", "clusterResourcePlacement", crpKObj, "clusterResourceSnapshot", latestResourceSnapshotKObj)
225226
return ctrl.Result{}, err
226227
}
227-
if selectedResourceIDs == nil {
228-
err := controller.NewUnexpectedBehaviorError(fmt.Errorf("not found clusterResourceSnapshot %q for clusterResourcePlacement %q", latestResourceSnapshot.Name, crp.Name))
229-
klog.ErrorS(err, "Latest resourceSnapshot should not be deleted", "clusterResourcePlacement", crpKObj, "clusterResourceSnapshot", latestResourceSnapshotKObj)
230-
return ctrl.Result{}, err
231-
}
232228
klog.V(2).InfoS("Fetched the selected resources from the lastestResourceSnapshot", "clusterResourcePlacement", crpKObj, "clusterResourceSnapshot", latestResourceSnapshotKObj, "generation", crp.Generation)
233229
}
234230

@@ -590,46 +586,39 @@ func (r *Reconciler) getOrCreateClusterResourceSnapshot(ctx context.Context, crp
590586

591587
// shouldCreateNewResourceSnapshotNow checks whether it is ready to create the new resource snapshot to avoid too frequent creation
592588
// based on the configured ResourceSnapshotCreationInterval and ResourceChangesCollectionDuration.
593-
func (r *Reconciler) shouldCreateNewResourceSnapshotNow(ctx context.Context, latestResourceSnapshot *fleetv1beta1.ClusterResourceSnapshot) (ctrl.Result, error) {
594-
if r.ResourceSnapshotCreationInterval <= 0 {
589+
func (r *Reconciler) shouldCreateNewResourceSnapshotNow(ctx context.Context, latestResourceSnapshot fleetv1beta1.ResourceSnapshotObj) (ctrl.Result, error) {
590+
if r.ResourceSnapshotCreationInterval <= 0 && r.ResourceChangesCollectionDuration <= 0 {
595591
return ctrl.Result{}, nil
596592
}
597593

598-
// We reserve half of the resourceSnapshotCreationInterval to allow the controller to bundle all the resource changes into one snapshot.
599-
// For example, if the interval is 1m, the first resource change will be captured starting from 30s.
600-
// And then the next 30s will be used to capture all the resource changes into one snapshot.
594+
// We respect the ResourceChangesCollectionDuration to allow the controller to bundle all the resource changes into one snapshot.
601595
snapshotKObj := klog.KObj(latestResourceSnapshot)
602596
now := time.Now()
603-
half := r.ResourceSnapshotCreationInterval / 2
604-
if since := now.Sub(latestResourceSnapshot.CreationTimestamp.Time); since < half {
605-
// If the latest resource snapshot is created less than configured the resourceSnapshotCreationInterval,
606-
// requeue the request to avoid too frequent update.
607-
klog.V(2).InfoS("The latest resource snapshot is just created, skipping the update",
608-
"clusterResourceSnapshot", snapshotKObj, "creationTime", latestResourceSnapshot.CreationTimestamp, "configuredResourceSnapshotCreationInterval", r.ResourceSnapshotCreationInterval, "afterDuration", half-since)
609-
return ctrl.Result{Requeue: true, RequeueAfter: half - since}, nil
610-
}
611597
nextResourceSnapshotCandidateDetectionTime, err := annotations.ExtractNextResourceSnapshotCandidateDetectionTimeFromResourceSnapshot(latestResourceSnapshot)
612598
if nextResourceSnapshotCandidateDetectionTime.IsZero() || err != nil {
613599
if err != nil {
614600
klog.ErrorS(controller.NewUnexpectedBehaviorError(err), "Failed to get the NextResourceSnapshotCandidateDetectionTimeAnnotation", "clusterResourceSnapshot", snapshotKObj)
615601
}
616602
// If the annotation is not set, set the next resource snapshot candidate detection time to now.
617-
if latestResourceSnapshot.Annotations == nil {
618-
latestResourceSnapshot.Annotations = make(map[string]string)
603+
if latestResourceSnapshot.GetAnnotations() == nil {
604+
latestResourceSnapshot.SetAnnotations(make(map[string]string))
619605
}
620-
latestResourceSnapshot.Annotations[fleetv1beta1.NextResourceSnapshotCandidateDetectionTimeAnnotation] = now.Format(time.RFC3339)
606+
latestResourceSnapshot.GetAnnotations()[fleetv1beta1.NextResourceSnapshotCandidateDetectionTimeAnnotation] = now.Format(time.RFC3339)
621607
if err := r.Client.Update(ctx, latestResourceSnapshot); err != nil {
622-
klog.ErrorS(err, "Failed to update the NextResourceSnapshotCandidateDetectionTimeAnnotation", "clusterResourceSnapshot", snapshotKObj)
608+
klog.ErrorS(err, "Failed to update the NextResourceSnapshotCandidateDetectionTime annotation", "clusterResourceSnapshot", snapshotKObj)
623609
return ctrl.Result{}, controller.NewUpdateIgnoreConflictError(err)
624610
}
625-
klog.V(2).InfoS("Set the next-resource-snapshot-candidate-detection-time to now", "clusterResourceSnapshot", snapshotKObj, "nextResourceSnapshotCandidateDetectionTime", now)
626-
return ctrl.Result{Requeue: true, RequeueAfter: half}, nil
627-
}
628-
if delay := now.Sub(nextResourceSnapshotCandidateDetectionTime); delay < half {
629-
// If the next resource snapshot candidate detection time is not reached, we requeue the request to avoid too frequent update.
630-
klog.V(2).InfoS("The next resource snapshot candidate has not reached the half of the ResourceSnapshotCreationInterval, skipping the creation",
631-
"clusterResourceSnapshot", snapshotKObj, "nextResourceSnapshotCandidateDetectionTime", nextResourceSnapshotCandidateDetectionTime, "configuredResourceSnapshotCreationInterval", r.ResourceSnapshotCreationInterval, "afterDuration", half-delay)
632-
return ctrl.Result{Requeue: true, RequeueAfter: half - delay}, nil
611+
nextResourceSnapshotCandidateDetectionTime = now
612+
klog.V(2).InfoS("Updated the NextResourceSnapshotCandidateDetectionTime annotation", "clusterResourceSnapshot", snapshotKObj, "nextResourceSnapshotCandidateDetectionTimeAnnotation", now.Format(time.RFC3339))
613+
}
614+
nextCreationTime := fleettime.MaxTime(nextResourceSnapshotCandidateDetectionTime.Add(r.ResourceChangesCollectionDuration), latestResourceSnapshot.GetCreationTimestamp().Add(r.ResourceSnapshotCreationInterval))
615+
if now.Before(nextCreationTime) {
616+
// If the next resource snapshot creation time has not been reached, we requeue the request to avoid too frequent updates.
617+
klog.V(2).InfoS("Delaying the new resourceSnapshot creation",
618+
"clusterResourceSnapshot", snapshotKObj, "nextCreationTime", nextCreationTime, "latestResourceSnapshotCreationTime", latestResourceSnapshot.GetCreationTimestamp(),
619+
"resourceSnapshotCreationInterval", r.ResourceSnapshotCreationInterval, "resourceChangesCollectionDuration", r.ResourceChangesCollectionDuration,
620+
"afterDuration", nextCreationTime.Sub(now))
621+
return ctrl.Result{Requeue: true, RequeueAfter: nextCreationTime.Sub(now)}, nil
633622
}
634623
return ctrl.Result{}, nil
635624
}

0 commit comments

Comments
 (0)