Skip to content

Commit dfc8651

Browse files
committed
feat: refactor clusterresourceplacementwatcher to support rp
Signed-off-by: Zhiying Lin <[email protected]>
1 parent 50594ee commit dfc8651

File tree

6 files changed

+172
-32
lines changed

6 files changed

+172
-32
lines changed

CLAUDE.md

Lines changed: 57 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,10 @@ KubeFleet is a CNCF sandbox project that provides multi-cluster application mana
1616
# Build all binaries
1717
make build
1818

19+
# Run specific agent binaries directly
20+
make run-hubagent
21+
make run-memberagent
22+
1923
# Run all tests (unit + integration)
2024
make test
2125

@@ -27,19 +31,28 @@ make integration-test
2731

2832
# Run E2E tests
2933
make e2e-tests
34+
35+
# Run custom E2E tests with labels
36+
make e2e-tests-custom
3037
```
3138

3239
### Code Quality
3340
```bash
3441
# Run linting (required before commits)
3542
make lint
3643

44+
# Run full linting (slower but more thorough)
45+
make lint-full
46+
3747
# Run static analysis
3848
make staticcheck
3949

4050
# Format code
4151
make fmt
4252

53+
# Run go vet
54+
make vet
55+
4356
# Run all quality checks
4457
make reviewable
4558
```
@@ -62,23 +75,42 @@ make setup-clusters
6275
make setup-clusters MEMBER_CLUSTER_COUNT=5
6376
make e2e-tests
6477

78+
# Collect logs after E2E tests
79+
make collect-e2e-logs
80+
6581
# Clean up test clusters
6682
make clean-e2e-tests
6783
```
6884

85+
### Docker and Images
86+
```bash
87+
# Build and push all images
88+
make push
89+
90+
# Build individual images
91+
make docker-build-hub-agent
92+
make docker-build-member-agent
93+
make docker-build-refresh-token
94+
```
95+
6996
## Architecture Overview
7097

7198
### Core API Types
7299
- **ClusterResourcePlacement (CRP)**: Main API for placing resources across clusters with scheduling policies
73100
- **MemberCluster**: Represents a member cluster with identity and heartbeat settings
74101
- **ClusterResourceBinding**: Represents scheduling decisions binding resources to clusters
75102
- **Work**: Contains manifests to be applied on member clusters
103+
- **ClusterResourceSnapshot**: Immutable snapshots of resources to be placed
104+
- **ClusterSchedulingPolicySnapshot**: Immutable snapshots of scheduling policies
76105

77106
### Key Controllers
78107
- **ClusterResourcePlacement Controller** (`pkg/controllers/clusterresourceplacement/`): Manages CRP lifecycle
79108
- **Scheduler** (`pkg/scheduler/`): Makes placement decisions using pluggable framework
80109
- **WorkGenerator** (`pkg/controllers/workgenerator/`): Generates Work objects from bindings
81110
- **WorkApplier** (`pkg/controllers/workapplier/`): Applies Work manifests on member clusters
111+
- **Resource Placement Watchers**: Monitor and react to changes in placement decisions
112+
- **ClusterResourceBinding Watcher** (`pkg/controllers/clusterresourcebindingwatcher/`): Watches binding changes
113+
- **ClusterResourcePlacement Watcher** (`pkg/controllers/clusterresourceplacementwatcher/`): Watches placement changes
82114

83115
### Scheduler Framework
84116
The scheduler uses a pluggable architecture similar to Kubernetes scheduler:
@@ -105,23 +137,39 @@ cmd/hubagent/ # Hub agent main and setup
105137
cmd/memberagent/ # Member agent main and setup
106138
```
107139

108-
## Testing Guidelines
140+
## Development Notes
141+
142+
- Always run `make reviewable` before submitting PRs
143+
- Follow [Uber Go Style Guide](https://github.com/uber-go/guide/blob/master/style.md) when possible
144+
- Favor standard library over third-party libraries
145+
- Controllers should be thoroughly tested with integration tests
146+
- New scheduler plugins should implement both Filter and Score interfaces
147+
- Use existing patterns from similar controllers when adding new functionality
148+
- Property providers should implement the `PropertyProvider` interface
149+
- PR titles must use prefixes: `feat:`, `fix:`, `docs:`, `test:`, `chore:`, `ci:`, `perf:`, `refactor:`, `revert:`
150+
151+
## Testing Patterns
109152

110153
### Unit Tests
111154
- Use `testify` for assertions
112155
- Controllers use `envtest` for integration testing with real etcd
113156
- Mock external dependencies with `gomock`
157+
- Unit test files: `<go_file>_test.go` in same directory
158+
- Table-driven test style preferred
114159

115-
### E2E Tests
116-
- Located in `test/e2e/`
160+
### Integration Tests
161+
- Located in `test/integration/` and `test/scheduler/`
117162
- Use Ginkgo/Gomega framework
118163
- Tests run against real Kind clusters
164+
- Files named: `<go_file>_integration_test.go`
119165
- Separate test suites for different placement strategies
120166

121-
### Integration Tests
122-
- Located in `test/integration/`
167+
### E2E Tests
168+
- Located in `test/e2e/`
169+
- Use Ginkgo/Gomega framework
123170
- Test cross-controller interactions
124171
- Use shared test manifests in `test/integration/manifests/`
172+
- Run with `make e2e-tests` against 3 Kind clusters
125173

126174
## Key Patterns
127175

@@ -146,10 +194,7 @@ All controllers follow standard Kubernetes controller patterns:
146194
- v1beta1 APIs are current stable version
147195
- Feature flags control API version enablement
148196

149-
## Development Notes
150-
151-
- Always run `make reviewable` before submitting PRs
152-
- Controllers should be thoroughly tested with integration tests
153-
- New scheduler plugins should implement both Filter and Score interfaces
154-
- Use existing patterns from similar controllers when adding new functionality
155-
- Property providers should implement the `PropertyProvider` interface
197+
### Watcher Pattern
198+
- Resource placement watchers monitor CRP and binding changes
199+
- Event-driven architecture for responsive placement decisions
200+
- Separate watchers for different resource types to enable focused reconciliation

cmd/hubagent/workload/setup.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,7 @@ func SetupControllers(ctx context.Context, wg *sync.WaitGroup, mgr ctrl.Manager,
198198
klog.Info("Setting up clusterResourcePlacement watcher")
199199
if err := (&clusterresourceplacementwatcher.Reconciler{
200200
PlacementController: clusterResourcePlacementControllerV1Beta1,
201-
}).SetupWithManager(mgr); err != nil {
201+
}).SetupWithManagerForClusterResourcePlacement(mgr); err != nil {
202202
klog.ErrorS(err, "Unable to set up the clusterResourcePlacement watcher")
203203
return err
204204
}

pkg/controllers/clusterresourceplacement/suite_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -141,13 +141,13 @@ var _ = BeforeSuite(func() {
141141

142142
err = (&clusterresourceplacementwatcher.Reconciler{
143143
PlacementController: crpController,
144-
}).SetupWithManager(mgr)
144+
}).SetupWithManagerForClusterResourcePlacement(mgr)
145145
Expect(err).Should(Succeed(), "failed to create clusterResourcePlacement watcher")
146146

147147
err = (&clusterresourcebindingwatcher.Reconciler{
148148
Client: mgr.GetClient(),
149149
PlacementController: crpController,
150-
}).SetupWithManager(mgr)
150+
}).SetupWithManagerForClusterResourceBinding(mgr)
151151
Expect(err).Should(Succeed(), "failed to create clusterResourceBinding watcher")
152152

153153
ctx, cancel = context.WithCancel(context.TODO())

pkg/controllers/clusterresourceplacementwatcher/suite_test.go

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ import (
2323

2424
. "github.com/onsi/ginkgo/v2"
2525
. "github.com/onsi/gomega"
26+
corev1 "k8s.io/api/core/v1"
27+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2628
"k8s.io/client-go/kubernetes/scheme"
2729
"k8s.io/client-go/rest"
2830
"k8s.io/klog/v2"
@@ -79,6 +81,14 @@ var _ = BeforeSuite(func() {
7981
Expect(err).Should(Succeed())
8082
Expect(k8sClient).NotTo(BeNil())
8183

84+
By("creating a test namespace")
85+
var ns = corev1.Namespace{
86+
ObjectMeta: metav1.ObjectMeta{
87+
Name: testNamespace,
88+
},
89+
}
90+
Expect(k8sClient.Create(ctx, &ns)).Should(Succeed(), "failed to create namespace")
91+
8292
By("starting the controller manager")
8393
klog.InitFlags(flag.CommandLine)
8494
flag.Parse()
@@ -96,7 +106,12 @@ var _ = BeforeSuite(func() {
96106

97107
err = (&Reconciler{
98108
PlacementController: fakePlacementController,
99-
}).SetupWithManager(mgr)
109+
}).SetupWithManagerForClusterResourcePlacement(mgr)
110+
Expect(err).Should(Succeed())
111+
112+
err = (&Reconciler{
113+
PlacementController: fakePlacementController,
114+
}).SetupWithManagerForResourcePlacement(mgr)
100115
Expect(err).Should(Succeed())
101116

102117
go func() {

pkg/controllers/clusterresourceplacementwatcher/watcher.go

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
1414
limitations under the License.
1515
*/
1616

17-
// Package clusterresourceplacementwatcher features a controller to watch the clusterResourcePlacement changes.
17+
// Package clusterresourceplacementwatcher features a controller to watch the clusterResourcePlacement and resourcePlacement changes.
1818
package clusterresourceplacementwatcher
1919

2020
import (
@@ -29,30 +29,38 @@ import (
2929
"github.com/kubefleet-dev/kubefleet/pkg/utils/controller"
3030
)
3131

32-
// Reconciler reconciles a clusterResourcePlacement object.
32+
// Reconciler reconciles both ClusterResourcePlacement and ResourcePlacement objects.
3333
type Reconciler struct {
3434
// PlacementController maintains a rate limited queue which used to store
35-
// the name of the clusterResourcePlacement and a reconcile function to consume the items in queue.
35+
// the name of the placement objects and a reconcile function to consume the items in queue.
3636
PlacementController controller.Controller
3737
}
3838

39-
// Reconcile triggers a single CRP reconcile round if CRP has changed.
39+
// Reconcile triggers a single placement reconcile round if placement has changed.
4040
func (r *Reconciler) Reconcile(_ context.Context, req ctrl.Request) (ctrl.Result, error) {
4141
startTime := time.Now()
42-
klog.V(2).InfoS("ClusterResourcePlacementWatcher reconciliation starts", "clusterResourcePlacement", req.Name)
42+
klog.V(2).InfoS("PlacementWatcher reconciliation starts", "placement", req.NamespacedName)
4343
defer func() {
4444
latency := time.Since(startTime).Milliseconds()
45-
klog.V(2).InfoS("ClusterResourcePlacementWatcher reconciliation ends", "clusterResourcePlacement", req.Name, "latency", latency)
45+
klog.V(2).InfoS("PlacementWatcher reconciliation ends", "placement", req.NamespacedName, "latency", latency)
4646
}()
4747

48-
r.PlacementController.Enqueue(req.Name)
48+
r.PlacementController.Enqueue(string(controller.GetObjectKeyFromRequest(req)))
4949
return ctrl.Result{}, nil
5050
}
5151

52-
// SetupWithManager sets up the controller with the Manager.
53-
func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error {
52+
// SetupWithManagerForClusterResourcePlacement sets up the controller with the Manager.
53+
func (r *Reconciler) SetupWithManagerForClusterResourcePlacement(mgr ctrl.Manager) error {
5454
return ctrl.NewControllerManagedBy(mgr).Named("clusterresourceplacement-watcher").
5555
For(&fleetv1beta1.ClusterResourcePlacement{}).
5656
WithEventFilter(predicate.GenerationChangedPredicate{}).
5757
Complete(r)
5858
}
59+
60+
// SetupWithManagerForResourcePlacement sets up the controller with the Manager.
61+
func (r *Reconciler) SetupWithManagerForResourcePlacement(mgr ctrl.Manager) error {
62+
return ctrl.NewControllerManagedBy(mgr).Named("resourceplacement-watcher").
63+
For(&fleetv1beta1.ResourcePlacement{}).
64+
WithEventFilter(predicate.GenerationChangedPredicate{}).
65+
Complete(r)
66+
}

pkg/controllers/clusterresourceplacementwatcher/watcher_integration_test.go

Lines changed: 79 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,17 @@ import (
2626
"k8s.io/apimachinery/pkg/types"
2727

2828
fleetv1beta1 "github.com/kubefleet-dev/kubefleet/apis/placement/v1beta1"
29+
"github.com/kubefleet-dev/kubefleet/pkg/utils/controller"
2930
)
3031

3132
const (
32-
testCRPName = "my-crp"
33+
testCRPName = "my-crp"
34+
testRPName = "my-rp"
35+
testNamespace = "test-namespace"
36+
37+
eventuallyTimeout = time.Second * 10
38+
consistentlyDuration = time.Second * 10
39+
interval = time.Millisecond * 250
3340
)
3441

3542
func clusterResourcePlacementForTest() *fleetv1beta1.ClusterResourcePlacement {
@@ -53,14 +60,30 @@ func clusterResourcePlacementForTest() *fleetv1beta1.ClusterResourcePlacement {
5360
}
5461
}
5562

63+
func resourcePlacementForTest() *fleetv1beta1.ResourcePlacement {
64+
return &fleetv1beta1.ResourcePlacement{
65+
ObjectMeta: metav1.ObjectMeta{
66+
Name: testRPName,
67+
Namespace: testNamespace,
68+
},
69+
Spec: fleetv1beta1.PlacementSpec{
70+
ResourceSelectors: []fleetv1beta1.ClusterResourceSelector{
71+
{
72+
Group: corev1.GroupName,
73+
Version: "v1",
74+
Kind: "ConfigMap",
75+
LabelSelector: &metav1.LabelSelector{
76+
MatchLabels: map[string]string{"env": "test"},
77+
},
78+
},
79+
},
80+
Policy: &fleetv1beta1.PlacementPolicy{},
81+
},
82+
}
83+
}
84+
5685
// This container cannot be run in parallel with other ITs because it uses a shared fakePlacementController.
5786
var _ = Describe("Test ClusterResourcePlacement Watcher", Serial, func() {
58-
const (
59-
eventuallyTimeout = time.Second * 10
60-
consistentlyDuration = time.Second * 10
61-
interval = time.Millisecond * 250
62-
)
63-
6487
var (
6588
createdCRP = &fleetv1beta1.ClusterResourcePlacement{}
6689
)
@@ -151,3 +174,52 @@ var _ = Describe("Test ClusterResourcePlacement Watcher", Serial, func() {
151174
})
152175
})
153176
})
177+
178+
var _ = Describe("Test ResourcePlacement Watcher", Serial, func() {
179+
var (
180+
createdRP = &fleetv1beta1.ResourcePlacement{}
181+
key = controller.GetObjectKeyFromNamespaceName(testNamespace, testRPName)
182+
)
183+
184+
BeforeEach(func() {
185+
fakePlacementController.ResetQueue()
186+
187+
By("By creating a new resourcePlacement")
188+
createdRP = resourcePlacementForTest()
189+
Expect(k8sClient.Create(ctx, createdRP)).Should(Succeed())
190+
191+
By("By checking the placement queue before resetting")
192+
// The event could arrive after the resetting, which causes the flakiness.
193+
// It makes sure the queue is clear before proceed.
194+
Eventually(func() bool {
195+
return fakePlacementController.Key() == key
196+
}, eventuallyTimeout, interval).Should(BeTrue(), "placementController should receive the RP namespaced name when creating RP")
197+
198+
By("By resetting the placement queue")
199+
// Reset the queue to avoid the multiple create events triggered.
200+
Consistently(func() error {
201+
if fakePlacementController.Key() == key {
202+
By("By finding the key and resetting the placement queue")
203+
fakePlacementController.ResetQueue()
204+
}
205+
return nil
206+
}, consistentlyDuration, interval).Should(Succeed(), "placementController queue should be stable empty after resetting")
207+
})
208+
209+
Context("When deleting resourcePlacement", func() {
210+
It("Should enqueue the event", func() {
211+
By("By deleting rp")
212+
Expect(k8sClient.Delete(ctx, createdRP)).Should(Succeed())
213+
214+
By("By checking rp")
215+
Eventually(func() bool {
216+
return errors.IsNotFound(k8sClient.Get(ctx, types.NamespacedName{Name: testRPName, Namespace: testNamespace}, createdRP))
217+
}, eventuallyTimeout, interval).Should(BeTrue(), "rp should be deleted")
218+
219+
By("By checking placement controller queue")
220+
Eventually(func() bool {
221+
return fakePlacementController.Key() == key
222+
}, eventuallyTimeout, interval).Should(BeTrue(), "placementController should receive the RP namespaced name")
223+
})
224+
})
225+
})

0 commit comments

Comments
 (0)