Skip to content

Commit 9853123

Browse files
authored
feat(cg): Introduce lightweight scheduler simulator (#83)
* feat(cg): make cg-worker respect nodeAffinity
  Signed-off-by: Xuhui zhang <xuhui@juicedata.io>
* feat(cg): add lightweight scheduler simulator
  Signed-off-by: Xuhui zhang <xuhui@juicedata.io>
---------
Signed-off-by: Xuhui zhang <xuhui@juicedata.io>
1 parent d2b8ebe commit 9853123

File tree

10 files changed

+563
-11
lines changed

10 files changed

+563
-11
lines changed

.gitignore

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,4 +26,6 @@ go.work
2626
*.swo
2727
*~
2828

29-
deploy_dev/*
29+
deploy_dev/*
30+
31+
__debug*

api/v1/cachegroup_types.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,11 @@ type CacheGroupSpec struct {
165165
// Default is 1 hour
166166
// +optional
167167
WaitingDeletedMaxDuration *metav1.Duration `json:"waitingDeletedMaxDuration,omitempty"`
168+
169+
// Whether to use the Kubernetes scheduler for worker pods.
170+
// If false, then the nodeName field will be used to bypass the scheduler, and `affinity` will be ignored.
171+
// +optional
172+
EnableScheduling bool `json:"enableScheduling,omitempty"`
168173
}
169174

170175
type CacheGroupPhase string

config/crd/bases/juicefs.io_cachegroups.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ spec:
5353
type: string
5454
cleanCache:
5555
type: boolean
56+
enableScheduling:
57+
type: boolean
5658
replicas:
5759
format: int32
5860
type: integer

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ require (
1313
k8s.io/api v0.32.2
1414
k8s.io/apimachinery v0.32.2
1515
k8s.io/client-go v0.32.2
16+
k8s.io/component-helpers v0.32.2
1617
k8s.io/klog/v2 v2.130.1
1718
k8s.io/kubectl v0.32.2
1819
sigs.k8s.io/controller-runtime v0.20.2

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,8 @@ k8s.io/client-go v0.32.2 h1:4dYCD4Nz+9RApM2b/3BtVvBHw54QjMFUl1OLcJG5yOA=
243243
k8s.io/client-go v0.32.2/go.mod h1:fpZ4oJXclZ3r2nDOv+Ux3XcJutfrwjKTCHz2H3sww94=
244244
k8s.io/component-base v0.32.2 h1:1aUL5Vdmu7qNo4ZsE+569PV5zFatM9hl+lb3dEea2zU=
245245
k8s.io/component-base v0.32.2/go.mod h1:PXJ61Vx9Lg+P5mS8TLd7bCIr+eMJRQTyXe8KvkrvJq0=
246+
k8s.io/component-helpers v0.32.2 h1:2usSAm3zNE5yu5DdAdrKBWLfSYNpU4OPjZywJY5ovP8=
247+
k8s.io/component-helpers v0.32.2/go.mod h1:fvQAoiiOP7jUEUBc9qR0PXiBPuB0I56WTxTkkpcI8g8=
246248
k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk=
247249
k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE=
248250
k8s.io/kube-openapi v0.0.0-20241105132330-32ad38e42d3f h1:GA7//TjRY9yWGy1poLzYYJJ4JRdzg3+O6e8I+e+8T5Y=

internal/controller/cachegroup_controller.go

Lines changed: 58 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,11 @@ import (
2020
"context"
2121
"fmt"
2222
"reflect"
23+
"sort"
2324
"sync"
2425
"time"
2526

27+
"k8s.io/apimachinery/pkg/labels"
2628
"k8s.io/apimachinery/pkg/runtime"
2729
ctrl "sigs.k8s.io/controller-runtime"
2830
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -33,6 +35,7 @@ import (
3335
juicefsiov1 "github.com/juicedata/juicefs-operator/api/v1"
3436
"github.com/juicedata/juicefs-operator/pkg/builder"
3537
"github.com/juicedata/juicefs-operator/pkg/common"
38+
"github.com/juicedata/juicefs-operator/pkg/scheduler"
3639
"github.com/juicedata/juicefs-operator/pkg/utils"
3740

3841
appsv1 "k8s.io/api/apps/v1"
@@ -230,6 +233,9 @@ func (r *CacheGroupReconciler) parseExpectState(ctx context.Context, cg *juicefs
230233
if cg.Spec.Replicas != nil {
231234
return r.parseExpectStateByReplicas(cg), nil
232235
}
236+
if cg.Spec.EnableScheduling {
237+
return r.parseExpectStateByScheduler(ctx, cg)
238+
}
233239
return r.parseExpectStateByNodeSelector(ctx, cg)
234240
}
235241

@@ -242,14 +248,9 @@ func (r *CacheGroupReconciler) parseExpectStateByReplicas(cg *juicefsiov1.CacheG
242248
return expectStates
243249
}
244250

245-
func (r *CacheGroupReconciler) parseExpectStateByNodeSelector(ctx context.Context, cg *juicefsiov1.CacheGroup) (map[string]juicefsiov1.CacheGroupWorkerTemplate, error) {
246-
expectAppliedNodes := corev1.NodeList{}
247-
err := r.List(ctx, &expectAppliedNodes, client.MatchingLabels(cg.Spec.Worker.Template.NodeSelector))
248-
if err != nil {
249-
return nil, err
250-
}
251+
func (r *CacheGroupReconciler) parseExpectStateWithNode(cg *juicefsiov1.CacheGroup, nodes []corev1.Node) map[string]juicefsiov1.CacheGroupWorkerTemplate {
251252
expectStates := map[string]juicefsiov1.CacheGroupWorkerTemplate{}
252-
for _, node := range expectAppliedNodes.Items {
253+
for _, node := range nodes {
253254
expectState := cg.Spec.Worker.Template.DeepCopy()
254255
for _, overwrite := range cg.Spec.Worker.Overwrite {
255256
if utils.SliceContains(overwrite.Nodes, node.Name) ||
@@ -259,6 +260,56 @@ func (r *CacheGroupReconciler) parseExpectStateByNodeSelector(ctx context.Contex
259260
}
260261
expectStates[node.Name] = *expectState
261262
}
263+
return expectStates
264+
}
265+
266+
func (r *CacheGroupReconciler) parseExpectStateByNodeSelector(ctx context.Context, cg *juicefsiov1.CacheGroup) (map[string]juicefsiov1.CacheGroupWorkerTemplate, error) {
267+
expectAppliedNodes := corev1.NodeList{}
268+
err := r.List(ctx, &expectAppliedNodes, client.MatchingLabels(cg.Spec.Worker.Template.NodeSelector))
269+
if err != nil {
270+
return nil, err
271+
}
272+
return r.parseExpectStateWithNode(cg, expectAppliedNodes.Items), nil
273+
}
274+
275+
// parseExpectStateByScheduler
276+
// make cg respect the affinity and tolerations of the scheduler
277+
func (r *CacheGroupReconciler) parseExpectStateByScheduler(ctx context.Context, cg *juicefsiov1.CacheGroup) (map[string]juicefsiov1.CacheGroupWorkerTemplate, error) {
278+
selector := labels.Everything()
279+
if cg.Spec.Worker.Template.NodeSelector != nil {
280+
selector = labels.SelectorFromSet(cg.Spec.Worker.Template.NodeSelector)
281+
}
282+
checkNodes := corev1.NodeList{}
283+
err := r.List(ctx, &checkNodes, client.MatchingLabelsSelector{Selector: selector})
284+
if err != nil {
285+
return nil, err
286+
}
287+
if len(checkNodes.Items) == 0 {
288+
return nil, nil
289+
}
290+
sim := scheduler.NewSchedulerSimulator(r.Client, checkNodes.Items)
291+
expectStates := r.parseExpectStateWithNode(cg, checkNodes.Items)
292+
293+
nodes := checkNodes.Items
294+
sort.Slice(nodes, func(i, j int) bool {
295+
return nodes[i].Name < nodes[j].Name
296+
})
297+
for _, node := range nodes {
298+
state := expectStates[node.Name]
299+
podBuilder := builder.NewPodBuilder(cg, &corev1.Secret{}, node.Name, state, false)
300+
pod := podBuilder.NewCacheGroupWorker(ctx)
301+
if ok, err := sim.CanSchedulePodOnNode(ctx, pod, &node); err != nil {
302+
return nil, err
303+
} else {
304+
if !ok {
305+
delete(expectStates, node.Name)
306+
sim.AppendToBeDeletedPod(pod)
307+
continue
308+
}
309+
pod.Spec.NodeName = node.Name
310+
sim.AppendPendingPod(pod)
311+
}
312+
}
262313
return expectStates, nil
263314
}
264315

pkg/builder/cache_group_pod.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,9 @@ func newBasicPod(cg *juicefsiov1.CacheGroup, nodeName string) *corev1.Pod {
128128
},
129129
}
130130
if cg.Spec.Replicas == nil {
131-
worker.Spec.NodeName = nodeName
131+
if !cg.Spec.EnableScheduling {
132+
worker.Spec.NodeName = nodeName
133+
}
132134
} else {
133135
if cg.Spec.Worker.Template.NodeSelector != nil {
134136
worker.Spec.NodeSelector = cg.Spec.Worker.Template.NodeSelector
@@ -391,8 +393,10 @@ func (p *PodBuilder) NewCacheGroupWorker(ctx context.Context) *corev1.Pod {
391393
worker.Spec.ImagePullSecrets = common.OperatorPod.Spec.ImagePullSecrets
392394
}
393395
}
394-
if spec.Affinity != nil {
395-
worker.Spec.Affinity = spec.Affinity
396+
if p.cg.Spec.Replicas == nil && spec.Affinity != nil {
397+
if p.cg.Spec.EnableScheduling {
398+
worker.Spec.Affinity = utils.AddNodeNameNodeAffinity(spec.Affinity.DeepCopy(), p.node)
399+
}
396400
}
397401
maps.Copy(worker.Labels, spec.Labels)
398402
maps.Copy(worker.Annotations, spec.Annotations)
Lines changed: 206 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,206 @@
1+
/*
2+
Copyright 2025.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
package scheduler
17+
18+
import (
19+
"context"
20+
"fmt"
21+
22+
"github.com/samber/lo"
23+
corev1 "k8s.io/api/core/v1"
24+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
25+
"k8s.io/apimachinery/pkg/labels"
26+
v1helper "k8s.io/component-helpers/scheduling/corev1"
27+
"k8s.io/component-helpers/scheduling/corev1/nodeaffinity"
28+
"sigs.k8s.io/controller-runtime/pkg/client"
29+
"sigs.k8s.io/controller-runtime/pkg/log"
30+
)
31+
32+
type SchedulerSimulator struct {
33+
client client.Client
34+
pendingPods []*corev1.Pod
35+
toBeDeletedPods map[string]*corev1.Pod
36+
nodes map[string]corev1.Node
37+
}
38+
39+
func NewSchedulerSimulator(c client.Client, nodes []corev1.Node) *SchedulerSimulator {
40+
return &SchedulerSimulator{
41+
client: c,
42+
pendingPods: make([]*corev1.Pod, 0),
43+
toBeDeletedPods: make(map[string]*corev1.Pod),
44+
nodes: lo.KeyBy(nodes, func(n corev1.Node) string { return n.Name }),
45+
}
46+
}
47+
48+
// CanSchedulePodOnNode checks if a pod can be scheduled on a specific node
49+
// This is a lightweight simulation and does not cover all scheduling scenarios
50+
// It mainly checks node affinity, tolerations, pod affinity, and pod anti-affinity
51+
// It does not consider resource requests, limits, or other constraints
52+
// The returned result may be inaccurate, leading to scheduling failures and causing the Pod to remain in a Pending state.
53+
func (s *SchedulerSimulator) CanSchedulePodOnNode(ctx context.Context, pod *corev1.Pod, node *corev1.Node) (bool, error) {
54+
log := log.FromContext(ctx)
55+
if pod == nil || node == nil {
56+
return false, nil
57+
}
58+
log = log.WithValues("pod", pod.Name, "node", node.Name)
59+
if !s.checkNodeAffinity(pod, node) {
60+
log.V(1).Info("pod cannot be scheduled on node", "reason", "node affinity not match")
61+
return false, nil
62+
}
63+
64+
if !s.checkTolerations(pod, node) {
65+
log.V(1).Info("pod cannot be scheduled on node", "reason", "tolerations not match")
66+
return false, nil
67+
}
68+
69+
if v, err := s.checkPodAffinity(ctx, pod, node); err != nil || !v {
70+
log.V(1).Info("pod cannot be scheduled on node", "reason", "pod affinity not match")
71+
return false, err
72+
}
73+
74+
if v, err := s.checkPodAntiAffinity(ctx, pod, node); err != nil || !v {
75+
log.V(1).Info("pod cannot be scheduled on node", "reason", "pod anti-affinity not match")
76+
return false, err
77+
}
78+
79+
log.V(1).Info("pod can be scheduled on node")
80+
return true, nil
81+
}
82+
83+
func (s *SchedulerSimulator) AppendPendingPod(pod *corev1.Pod) {
84+
s.pendingPods = append(s.pendingPods, pod)
85+
}
86+
87+
func (s *SchedulerSimulator) AppendToBeDeletedPod(pod *corev1.Pod) {
88+
s.toBeDeletedPods[pod.Name] = pod
89+
}
90+
91+
func (s *SchedulerSimulator) checkNodeAffinity(pod *corev1.Pod, node *corev1.Node) bool {
92+
if pod.Spec.Affinity == nil || pod.Spec.Affinity.NodeAffinity == nil {
93+
return true
94+
}
95+
fitsNodeAffinity, _ := nodeaffinity.GetRequiredNodeAffinity(pod).Match(node)
96+
return fitsNodeAffinity
97+
}
98+
99+
func (s *SchedulerSimulator) checkTolerations(pod *corev1.Pod, node *corev1.Node) bool {
100+
_, hasUntoleratedTaint := v1helper.FindMatchingUntoleratedTaint(node.Spec.Taints, pod.Spec.Tolerations, func(t *corev1.Taint) bool {
101+
return t.Effect == corev1.TaintEffectNoExecute || t.Effect == corev1.TaintEffectNoSchedule
102+
})
103+
return !hasUntoleratedTaint
104+
}
105+
106+
func (s *SchedulerSimulator) checkPodAffinity(ctx context.Context, pod *corev1.Pod, node *corev1.Node) (bool, error) {
107+
if pod.Spec.Affinity == nil || pod.Spec.Affinity.PodAffinity == nil {
108+
return true, nil
109+
}
110+
terms := pod.Spec.Affinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution
111+
if len(terms) == 0 {
112+
return true, nil
113+
}
114+
for _, term := range terms {
115+
topologyValue, exists := node.Labels[term.TopologyKey]
116+
if !exists {
117+
return false, nil
118+
}
119+
pods, err := s.ListPodAffinityTermPods(ctx, pod.Name, term, topologyValue)
120+
if err != nil {
121+
return false, err
122+
}
123+
if len(pods) == 0 {
124+
return false, nil
125+
}
126+
}
127+
return true, nil
128+
}
129+
130+
func (s *SchedulerSimulator) checkPodAntiAffinity(ctx context.Context, pod *corev1.Pod, node *corev1.Node) (bool, error) {
131+
if pod.Spec.Affinity == nil || pod.Spec.Affinity.PodAntiAffinity == nil {
132+
return true, nil
133+
}
134+
terms := pod.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution
135+
if len(terms) == 0 {
136+
return true, nil
137+
}
138+
for _, term := range terms {
139+
topologyValue, exists := node.Labels[term.TopologyKey]
140+
if !exists {
141+
// If the topology key doesn't exist, the anti-affinity constraint is considered satisfied.
142+
continue
143+
}
144+
pods, err := s.ListPodAffinityTermPods(ctx, pod.Name, term, topologyValue)
145+
if err != nil {
146+
return false, err
147+
}
148+
if len(pods) > 0 {
149+
return false, nil
150+
}
151+
}
152+
return true, nil
153+
}
154+
155+
// ListPodAffinityTermPods lists pods that match the given PodAffinityTerms
156+
func (s *SchedulerSimulator) ListPodAffinityTermPods(ctx context.Context, currentPod string, term corev1.PodAffinityTerm, topologyValue string) ([]corev1.Pod, error) {
157+
var matchedPods []corev1.Pod
158+
selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector)
159+
if err != nil {
160+
return nil, fmt.Errorf("failed to convert label selector: %w", err)
161+
}
162+
listOpts := []client.ListOption{
163+
client.MatchingLabelsSelector{Selector: selector},
164+
}
165+
166+
var podLists []corev1.PodList
167+
if len(term.Namespaces) > 0 {
168+
for _, ns := range term.Namespaces {
169+
var podList corev1.PodList
170+
nsListOpts := append([]client.ListOption{client.InNamespace(ns)}, listOpts...)
171+
if err := s.client.List(ctx, &podList, nsListOpts...); err != nil {
172+
return nil, fmt.Errorf("failed to list pods in namespace %s: %w", ns, err)
173+
}
174+
podLists = append(podLists, podList)
175+
}
176+
} else {
177+
var podList corev1.PodList
178+
if err := s.client.List(ctx, &podList, listOpts...); err != nil {
179+
return nil, fmt.Errorf("failed to list pods: %w", err)
180+
}
181+
podLists = append(podLists, podList)
182+
}
183+
184+
for _, podList := range podLists {
185+
for _, pod := range podList.Items {
186+
node, exists := s.nodes[pod.Spec.NodeName]
187+
if !exists {
188+
continue
189+
}
190+
if node.Labels[term.TopologyKey] == topologyValue && pod.Name != currentPod && s.toBeDeletedPods[pod.Name] == nil {
191+
matchedPods = append(matchedPods, pod)
192+
}
193+
}
194+
}
195+
// check pending pods as well
196+
for _, pod := range s.pendingPods {
197+
node, exists := s.nodes[pod.Spec.NodeName]
198+
if !exists {
199+
continue
200+
}
201+
if node.Labels[term.TopologyKey] == topologyValue && selector.Matches(labels.Set(pod.Labels)) && pod.Name != currentPod {
202+
matchedPods = append(matchedPods, *pod)
203+
}
204+
}
205+
return matchedPods, nil
206+
}

0 commit comments

Comments
 (0)