Commit df041ef

feat: refactor clusterresourceplacementwatcher to support rp (#149)
--------- Signed-off-by: Zhiying Lin <[email protected]>
1 parent 4aedc78 commit df041ef

6 files changed, 171 additions and 31 deletions

CLAUDE.md

Lines changed: 57 additions & 12 deletions
@@ -16,6 +16,10 @@ KubeFleet is a CNCF sandbox project that provides multi-cluster application mana
 # Build all binaries
 make build

+# Run specific agent binaries directly
+make run-hubagent
+make run-memberagent
+
 # Run all tests (unit + integration)
 make test

@@ -27,19 +31,28 @@ make integration-test

 # Run E2E tests
 make e2e-tests
+
+# Run custom E2E tests with labels
+make e2e-tests-custom
 ```

 ### Code Quality
 ```bash
 # Run linting (required before commits)
 make lint

+# Run full linting (slower but more thorough)
+make lint-full
+
 # Run static analysis
 make staticcheck

 # Format code
 make fmt

+# Run go vet
+make vet
+
 # Run all quality checks
 make reviewable
 ```
@@ -64,23 +77,42 @@ make setup-clusters MEMBER_CLUSTER_COUNT=5
 # Run parallel E2E tests (default - excludes custom tests)
 make e2e-tests

+# Collect logs after E2E tests
+make collect-e2e-logs
+
 # Clean up test clusters
 make clean-e2e-tests
 ```

+### Docker and Images
+```bash
+# Build and push all images
+make push
+
+# Build individual images
+make docker-build-hub-agent
+make docker-build-member-agent
+make docker-build-refresh-token
+```
+
 ## Architecture Overview

 ### Core API Types
 - **ClusterResourcePlacement (CRP)**: Main API for placing resources across clusters with scheduling policies
 - **MemberCluster**: Represents a member cluster with identity and heartbeat settings
 - **ClusterResourceBinding**: Represents scheduling decisions binding resources to clusters
 - **Work**: Contains manifests to be applied on member clusters
+- **ClusterResourceSnapshot**: Immutable snapshots of resources to be placed
+- **ClusterSchedulingPolicySnapshot**: Immutable snapshots of scheduling policies

 ### Key Controllers
 - **ClusterResourcePlacement Controller** (`pkg/controllers/clusterresourceplacement/`): Manages CRP lifecycle
 - **Scheduler** (`pkg/scheduler/`): Makes placement decisions using pluggable framework
 - **WorkGenerator** (`pkg/controllers/workgenerator/`): Generates Work objects from bindings
 - **WorkApplier** (`pkg/controllers/workapplier/`): Applies Work manifests on member clusters
+- **Resource Placement Watchers**: Monitor and react to changes in placement decisions
+- **ClusterResourceBinding Watcher** (`pkg/controllers/clusterresourcebindingwatcher/`): Watches binding changes
+- **ClusterResourcePlacement Watcher** (`pkg/controllers/clusterresourceplacementwatcher/`): Watches placement changes

 ### Scheduler Framework
 The scheduler uses a pluggable architecture similar to Kubernetes scheduler:
@@ -107,23 +139,39 @@ cmd/hubagent/ # Hub agent main and setup
 cmd/memberagent/ # Member agent main and setup
 ```

-## Testing Guidelines
+## Development Notes
+
+- Always run `make reviewable` before submitting PRs
+- Follow [Uber Go Style Guide](https://github.com/uber-go/guide/blob/master/style.md) when possible
+- Favor standard library over third-party libraries
+- Controllers should be thoroughly tested with integration tests
+- New scheduler plugins should implement both Filter and Score interfaces
+- Use existing patterns from similar controllers when adding new functionality
+- Property providers should implement the `PropertyProvider` interface
+- PR titles must use prefixes: `feat:`, `fix:`, `docs:`, `test:`, `chore:`, `ci:`, `perf:`, `refactor:`, `revert:`
+
+## Testing Patterns

 ### Unit Tests
 - Use `testify` for assertions
 - Controllers use `envtest` for integration testing with real etcd
 - Mock external dependencies with `gomock`
+- Unit test files: `<go_file>_test.go` in same directory
+- Table-driven test style preferred

-### E2E Tests
-- Located in `test/e2e/`
+### Integration Tests
+- Located in `test/integration/` and `test/scheduler/`
 - Use Ginkgo/Gomega framework
 - Tests run against real Kind clusters
+- Files named: `<go_file>_integration_test.go`
 - Separate test suites for different placement strategies

-### Integration Tests
-- Located in `test/integration/`
+### E2E Tests
+- Located in `test/e2e/`
+- Use Ginkgo/Gomega framework
 - Test cross-controller interactions
 - Use shared test manifests in `test/integration/manifests/`
+- Run with `make e2e-tests` against 3 Kind clusters

 ## Key Patterns

@@ -148,10 +196,7 @@ All controllers follow standard Kubernetes controller patterns:
 - v1beta1 APIs are current stable version
 - Feature flags control API version enablement

-## Development Notes
-
-- Always run `make reviewable` before submitting PRs
-- Controllers should be thoroughly tested with integration tests
-- New scheduler plugins should implement both Filter and Score interfaces
-- Use existing patterns from similar controllers when adding new functionality
-- Property providers should implement the `PropertyProvider` interface
+### Watcher Pattern
+- Resource placement watchers monitor CRP and binding changes
+- Event-driven architecture for responsive placement decisions
+- Separate watchers for different resource types to enable focused reconciliation
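
The watcher pattern described in the CLAUDE.md notes above can be sketched in a few lines. This is a minimal, standalone illustration and not KubeFleet code: `keyQueue`, `watcher`, and `OnChange` are hypothetical names, and the real controllers rely on controller-runtime and a rate-limited work queue rather than this simplified deduplicating slice. It shows separate watchers forwarding object keys to one shared queue while doing no reconciliation work themselves.

```go
package main

import (
	"fmt"
	"sync"
)

// keyQueue is a hypothetical stand-in for the work queue owned by a
// placement controller. It drops keys that are already pending.
type keyQueue struct {
	mu      sync.Mutex
	pending map[string]bool
	order   []string
}

func newKeyQueue() *keyQueue {
	return &keyQueue{pending: make(map[string]bool)}
}

// Enqueue adds key unless it is already waiting to be processed.
func (q *keyQueue) Enqueue(key string) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.pending[key] {
		return
	}
	q.pending[key] = true
	q.order = append(q.order, key)
}

// Drain returns the pending keys in arrival order and empties the queue.
func (q *keyQueue) Drain() []string {
	q.mu.Lock()
	defer q.mu.Unlock()
	keys := q.order
	q.order = nil
	q.pending = make(map[string]bool)
	return keys
}

// watcher (hypothetical) reacts to a change event by forwarding the
// object's key; the consumer of the queue does the actual reconciliation.
type watcher struct {
	queue *keyQueue
}

func (w watcher) OnChange(key string) {
	w.queue.Enqueue(key)
}

func main() {
	q := newKeyQueue()
	crpWatcher := watcher{queue: q} // watches cluster-scoped placements
	rpWatcher := watcher{queue: q}  // watches namespaced placements

	crpWatcher.OnChange("my-crp")
	crpWatcher.OnChange("my-crp") // duplicate event, already pending
	rpWatcher.OnChange("test-namespace/my-rp")

	fmt.Println(q.Drain())
}
```

Running the sketch prints `[my-crp test-namespace/my-rp]`: the duplicate event collapses into a single pending key, which is the behaviour a shared queue needs when several watchers feed it.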

cmd/hubagent/workload/setup.go

Lines changed: 1 addition & 1 deletion
@@ -197,7 +197,7 @@ func SetupControllers(ctx context.Context, wg *sync.WaitGroup, mgr ctrl.Manager,
 		klog.Info("Setting up clusterResourcePlacement watcher")
 		if err := (&clusterresourceplacementwatcher.Reconciler{
 			PlacementController: clusterResourcePlacementControllerV1Beta1,
-		}).SetupWithManager(mgr); err != nil {
+		}).SetupWithManagerForClusterResourcePlacement(mgr); err != nil {
 			klog.ErrorS(err, "Unable to set up the clusterResourcePlacement watcher")
 			return err
 		}

pkg/controllers/clusterresourceplacement/suite_test.go

Lines changed: 1 addition & 1 deletion
@@ -141,7 +141,7 @@ var _ = BeforeSuite(func() {

 	err = (&clusterresourceplacementwatcher.Reconciler{
 		PlacementController: crpController,
-	}).SetupWithManager(mgr)
+	}).SetupWithManagerForClusterResourcePlacement(mgr)
 	Expect(err).Should(Succeed(), "failed to create clusterResourcePlacement watcher")

 	err = (&clusterresourcebindingwatcher.Reconciler{

pkg/controllers/clusterresourceplacementwatcher/suite_test.go

Lines changed: 16 additions & 1 deletion
@@ -23,6 +23,8 @@ import (

 	. "github.com/onsi/ginkgo/v2"
 	. "github.com/onsi/gomega"
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/client-go/kubernetes/scheme"
 	"k8s.io/client-go/rest"
 	"k8s.io/klog/v2"
@@ -79,6 +81,14 @@ var _ = BeforeSuite(func() {
 	Expect(err).Should(Succeed())
 	Expect(k8sClient).NotTo(BeNil())

+	By("creating a test namespace")
+	var ns = corev1.Namespace{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: testNamespace,
+		},
+	}
+	Expect(k8sClient.Create(ctx, &ns)).Should(Succeed(), "failed to create namespace")
+
 	By("starting the controller manager")
 	klog.InitFlags(flag.CommandLine)
 	flag.Parse()
@@ -96,7 +106,12 @@ var _ = BeforeSuite(func() {

 	err = (&Reconciler{
 		PlacementController: fakePlacementController,
-	}).SetupWithManager(mgr)
+	}).SetupWithManagerForClusterResourcePlacement(mgr)
+	Expect(err).Should(Succeed())
+
+	err = (&Reconciler{
+		PlacementController: fakePlacementController,
+	}).SetupWithManagerForResourcePlacement(mgr)
 	Expect(err).Should(Succeed())

 	go func() {

pkg/controllers/clusterresourceplacementwatcher/watcher.go

Lines changed: 17 additions & 9 deletions
@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */

-// Package clusterresourceplacementwatcher features a controller to watch the clusterResourcePlacement changes.
+// Package clusterresourceplacementwatcher features a controller to watch the clusterResourcePlacement and resourcePlacement changes.
 package clusterresourceplacementwatcher

 import (
@@ -29,30 +29,38 @@ import (
 	"github.com/kubefleet-dev/kubefleet/pkg/utils/controller"
 )

-// Reconciler reconciles a clusterResourcePlacement object.
+// Reconciler reconciles both ClusterResourcePlacement and ResourcePlacement objects.
 type Reconciler struct {
 	// PlacementController maintains a rate limited queue which used to store
-	// the name of the clusterResourcePlacement and a reconcile function to consume the items in queue.
+	// the name of the placement objects and a reconcile function to consume the items in queue.
 	PlacementController controller.Controller
 }

-// Reconcile triggers a single CRP reconcile round if CRP has changed.
+// Reconcile triggers a single placement reconcile round if placement has changed.
 func (r *Reconciler) Reconcile(_ context.Context, req ctrl.Request) (ctrl.Result, error) {
 	startTime := time.Now()
-	klog.V(2).InfoS("ClusterResourcePlacementWatcher reconciliation starts", "clusterResourcePlacement", req.Name)
+	klog.V(2).InfoS("PlacementWatcher reconciliation starts", "placement", req.NamespacedName)
 	defer func() {
 		latency := time.Since(startTime).Milliseconds()
-		klog.V(2).InfoS("ClusterResourcePlacementWatcher reconciliation ends", "clusterResourcePlacement", req.Name, "latency", latency)
+		klog.V(2).InfoS("PlacementWatcher reconciliation ends", "placement", req.NamespacedName, "latency", latency)
 	}()

-	r.PlacementController.Enqueue(req.Name)
+	r.PlacementController.Enqueue(string(controller.GetObjectKeyFromRequest(req)))
 	return ctrl.Result{}, nil
 }

-// SetupWithManager sets up the controller with the Manager.
-func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error {
+// SetupWithManagerForClusterResourcePlacement sets up the controller with the Manager.
+func (r *Reconciler) SetupWithManagerForClusterResourcePlacement(mgr ctrl.Manager) error {
 	return ctrl.NewControllerManagedBy(mgr).Named("clusterresourceplacement-watcher").
 		For(&fleetv1beta1.ClusterResourcePlacement{}).
 		WithEventFilter(predicate.GenerationChangedPredicate{}).
 		Complete(r)
 }
+
+// SetupWithManagerForResourcePlacement sets up the controller with the Manager.
+func (r *Reconciler) SetupWithManagerForResourcePlacement(mgr ctrl.Manager) error {
+	return ctrl.NewControllerManagedBy(mgr).Named("resourceplacement-watcher").
+		For(&fleetv1beta1.ResourcePlacement{}).
+		WithEventFilter(predicate.GenerationChangedPredicate{}).
+		Complete(r)
+}
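
The switch from `req.Name` to a key derived from the whole request matters once namespaced objects share the queue: two ResourcePlacements with the same name in different namespaces would otherwise enqueue the same key. The sketch below illustrates that idea with stand-in types. `request` and `objectKey` are hypothetical, and the `namespace/name` format is an assumption about what `controller.GetObjectKeyFromRequest` produces, since that helper is not shown in this diff.

```go
package main

import "fmt"

// request is a hypothetical stand-in for ctrl.Request. Namespace is empty
// for cluster-scoped objects such as ClusterResourcePlacement.
type request struct {
	Namespace string
	Name      string
}

// objectKey builds a queue key from a request. The format used here is an
// assumption for illustration, not the KubeFleet helper itself.
func objectKey(req request) string {
	if req.Namespace == "" {
		return req.Name // cluster-scoped: the name alone is unique
	}
	return req.Namespace + "/" + req.Name // namespaced: qualify the name
}

func main() {
	fmt.Println(objectKey(request{Name: "my-crp"}))
	fmt.Println(objectKey(request{Namespace: "test-namespace", Name: "my-rp"}))
	fmt.Println(objectKey(request{Namespace: "other-namespace", Name: "my-rp"}))
}
```

With this scheme a cluster-scoped placement keeps the bare name it used before the change, while the two `my-rp` objects get distinct keys.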

pkg/controllers/clusterresourceplacementwatcher/watcher_integration_test.go

Lines changed: 79 additions & 7 deletions
@@ -26,10 +26,17 @@ import (
 	"k8s.io/apimachinery/pkg/types"

 	fleetv1beta1 "github.com/kubefleet-dev/kubefleet/apis/placement/v1beta1"
+	"github.com/kubefleet-dev/kubefleet/pkg/utils/controller"
 )

 const (
-	testCRPName = "my-crp"
+	testCRPName   = "my-crp"
+	testRPName    = "my-rp"
+	testNamespace = "test-namespace"
+
+	eventuallyTimeout    = time.Second * 10
+	consistentlyDuration = time.Second * 10
+	interval             = time.Millisecond * 250
 )

 func clusterResourcePlacementForTest() *fleetv1beta1.ClusterResourcePlacement {
@@ -53,14 +60,30 @@ func clusterResourcePlacementForTest() *fleetv1beta1.ClusterResourcePlacement {
 	}
 }

+func resourcePlacementForTest() *fleetv1beta1.ResourcePlacement {
+	return &fleetv1beta1.ResourcePlacement{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      testRPName,
+			Namespace: testNamespace,
+		},
+		Spec: fleetv1beta1.PlacementSpec{
+			ResourceSelectors: []fleetv1beta1.ClusterResourceSelector{
+				{
+					Group:   corev1.GroupName,
+					Version: "v1",
+					Kind:    "ConfigMap",
+					LabelSelector: &metav1.LabelSelector{
+						MatchLabels: map[string]string{"env": "test"},
+					},
+				},
+			},
+			Policy: &fleetv1beta1.PlacementPolicy{},
+		},
+	}
+}
+
 // This container cannot be run in parallel with other ITs because it uses a shared fakePlacementController.
 var _ = Describe("Test ClusterResourcePlacement Watcher", Serial, func() {
-	const (
-		eventuallyTimeout    = time.Second * 10
-		consistentlyDuration = time.Second * 10
-		interval             = time.Millisecond * 250
-	)
-
 	var (
 		createdCRP = &fleetv1beta1.ClusterResourcePlacement{}
 	)
@@ -151,3 +174,52 @@ var _ = Describe("Test ClusterResourcePlacement Watcher", Serial, func() {
 		})
 	})
 })
+
+var _ = Describe("Test ResourcePlacement Watcher", Serial, func() {
+	var (
+		createdRP = &fleetv1beta1.ResourcePlacement{}
+		key       = controller.GetObjectKeyFromNamespaceName(testNamespace, testRPName)
+	)
+
+	BeforeEach(func() {
+		fakePlacementController.ResetQueue()
+
+		By("By creating a new resourcePlacement")
+		createdRP = resourcePlacementForTest()
+		Expect(k8sClient.Create(ctx, createdRP)).Should(Succeed())
+
+		By("By checking the placement queue before resetting")
+		// The event could arrive after the resetting, which causes the flakiness.
+		// It makes sure the queue is clear before proceed.
+		Eventually(func() bool {
+			return fakePlacementController.Key() == key
+		}, eventuallyTimeout, interval).Should(BeTrue(), "placementController should receive the RP namespaced name when creating RP")
+
+		By("By resetting the placement queue")
+		// Reset the queue to avoid the multiple create events triggered.
+		Consistently(func() error {
+			if fakePlacementController.Key() == key {
+				By("By finding the key and resetting the placement queue")
+				fakePlacementController.ResetQueue()
+			}
+			return nil
+		}, consistentlyDuration, interval).Should(Succeed(), "placementController queue should be stable empty after resetting")
+	})
+
+	Context("When deleting resourcePlacement", func() {
+		It("Should enqueue the event", func() {
+			By("By deleting rp")
+			Expect(k8sClient.Delete(ctx, createdRP)).Should(Succeed())
+
+			By("By checking rp")
+			Eventually(func() bool {
+				return errors.IsNotFound(k8sClient.Get(ctx, types.NamespacedName{Name: testRPName, Namespace: testNamespace}, createdRP))
+			}, eventuallyTimeout, interval).Should(BeTrue(), "rp should be deleted")
+
+			By("By checking placement controller queue")
+			Eventually(func() bool {
+				return fakePlacementController.Key() == key
+			}, eventuallyTimeout, interval).Should(BeTrue(), "placementController should receive the RP namespaced name")
+		})
+	})
+})
