Skip to content

Commit bae0106

Browse files
committed
feat(cg): add lightweight scheduler simulator
Signed-off-by: Xuhui zhang <xuhui@juicedata.io>
1 parent e9418c9 commit bae0106

File tree

5 files changed

+465
-18
lines changed

5 files changed

+465
-18
lines changed

.gitignore

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,4 +26,6 @@ go.work
2626
*.swo
2727
*~
2828

29-
deploy_dev/*
29+
deploy_dev/*
30+
31+
__debug*

internal/controller/cachegroup_controller.go

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"context"
2121
"fmt"
2222
"reflect"
23+
"sort"
2324
"sync"
2425
"time"
2526

@@ -34,6 +35,7 @@ import (
3435
juicefsiov1 "github.com/juicedata/juicefs-operator/api/v1"
3536
"github.com/juicedata/juicefs-operator/pkg/builder"
3637
"github.com/juicedata/juicefs-operator/pkg/common"
38+
"github.com/juicedata/juicefs-operator/pkg/scheduler"
3739
"github.com/juicedata/juicefs-operator/pkg/utils"
3840

3941
appsv1 "k8s.io/api/apps/v1"
@@ -273,7 +275,6 @@ func (r *CacheGroupReconciler) parseExpectStateByNodeSelector(ctx context.Contex
273275
// parseExpectStateByScheduler
274276
// make cg respect the affinity and tolerations of the scheduler
275277
func (r *CacheGroupReconciler) parseExpectStateByScheduler(ctx context.Context, cg *juicefsiov1.CacheGroup) (map[string]juicefsiov1.CacheGroupWorkerTemplate, error) {
276-
log := log.FromContext(ctx)
277278
selector := labels.Everything()
278279
if cg.Spec.Worker.Template.NodeSelector != nil {
279280
selector = labels.SelectorFromSet(cg.Spec.Worker.Template.NodeSelector)
@@ -286,20 +287,27 @@ func (r *CacheGroupReconciler) parseExpectStateByScheduler(ctx context.Context,
286287
if len(checkNodes.Items) == 0 {
287288
return nil, nil
288289
}
290+
sim := scheduler.NewSchedulerSimulator(r.Client, checkNodes.Items)
289291
expectStates := r.parseExpectStateWithNode(cg, checkNodes.Items)
290-
for _, node := range checkNodes.Items {
292+
293+
nodes := checkNodes.Items
294+
sort.Slice(nodes, func(i, j int) bool {
295+
return nodes[i].Name < nodes[j].Name
296+
})
297+
for _, node := range nodes {
291298
state := expectStates[node.Name]
292-
pod := &corev1.Pod{
293-
Spec: corev1.PodSpec{
294-
NodeSelector: state.NodeSelector,
295-
Affinity: state.Affinity,
296-
Tolerations: state.Tolerations,
297-
},
298-
}
299-
fitsNodeAffinity, fitsTaints := utils.PodShouldBeOnNode(pod, &node, node.Spec.Taints)
300-
if !fitsNodeAffinity || !fitsTaints {
301-
log.V(1).Info("node does not fit the predicates, ignore", "node", node.Name, "fitsNodeAffinity", fitsNodeAffinity, "fitsTaints", fitsTaints)
302-
delete(expectStates, node.Name)
299+
podBuilder := builder.NewPodBuilder(cg, &corev1.Secret{}, node.Name, state, false)
300+
pod := podBuilder.NewCacheGroupWorker(ctx)
301+
if ok, err := sim.CanSchedulePodOnNode(ctx, pod, &node); err != nil {
302+
return nil, err
303+
} else {
304+
if !ok {
305+
delete(expectStates, node.Name)
306+
sim.AppendToBeDeletedPod(pod)
307+
continue
308+
}
309+
pod.Spec.NodeName = node.Name
310+
sim.AppendPendingPod(pod)
303311
}
304312
}
305313
return expectStates, nil

pkg/builder/cache_group_pod.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -128,9 +128,7 @@ func newBasicPod(cg *juicefsiov1.CacheGroup, nodeName string) *corev1.Pod {
128128
},
129129
}
130130
if cg.Spec.Replicas == nil {
131-
if cg.Spec.EnableScheduling {
132-
worker.Spec.Affinity = utils.AddNodeNameNodeAffinity(worker.Spec.Affinity, nodeName)
133-
} else {
131+
if !cg.Spec.EnableScheduling {
134132
worker.Spec.NodeName = nodeName
135133
}
136134
} else {
@@ -397,7 +395,7 @@ func (p *PodBuilder) NewCacheGroupWorker(ctx context.Context) *corev1.Pod {
397395
}
398396
if p.cg.Spec.Replicas == nil && spec.Affinity != nil {
399397
if p.cg.Spec.EnableScheduling {
400-
worker.Spec.Affinity = utils.AddNodeNameNodeAffinity(spec.Affinity, p.node)
398+
worker.Spec.Affinity = utils.AddNodeNameNodeAffinity(spec.Affinity.DeepCopy(), p.node)
401399
}
402400
}
403401
maps.Copy(worker.Labels, spec.Labels)
Lines changed: 206 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,206 @@
1+
/*
2+
Copyright 2025.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
package scheduler
17+
18+
import (
19+
"context"
20+
"fmt"
21+
22+
"github.com/samber/lo"
23+
corev1 "k8s.io/api/core/v1"
24+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
25+
"k8s.io/apimachinery/pkg/labels"
26+
v1helper "k8s.io/component-helpers/scheduling/corev1"
27+
"k8s.io/component-helpers/scheduling/corev1/nodeaffinity"
28+
"sigs.k8s.io/controller-runtime/pkg/client"
29+
"sigs.k8s.io/controller-runtime/pkg/log"
30+
)
31+
32+
// SchedulerSimulator performs a lightweight, client-side simulation of the
// Kubernetes scheduler's predicate checks (node affinity, taints/tolerations,
// pod affinity and anti-affinity). It does not model resources or any other
// scheduling constraint.
type SchedulerSimulator struct {
	// client is used to list existing pods for affinity term matching.
	client client.Client
	// pendingPods are pods this simulation has already decided to place
	// (not yet visible via the API server); they count toward affinity checks.
	pendingPods []*corev1.Pod
	// toBeDeletedPods are pods the simulation plans to remove, keyed by pod
	// name; they are excluded when matching affinity terms.
	toBeDeletedPods map[string]*corev1.Pod
	// nodes indexes the candidate nodes by name for topology-key lookups.
	nodes map[string]corev1.Node
}
38+
39+
func NewSchedulerSimulator(c client.Client, nodes []corev1.Node) *SchedulerSimulator {
40+
return &SchedulerSimulator{
41+
client: c,
42+
pendingPods: make([]*corev1.Pod, 0),
43+
toBeDeletedPods: make(map[string]*corev1.Pod),
44+
nodes: lo.KeyBy(nodes, func(n corev1.Node) string { return n.Name }),
45+
}
46+
}
47+
48+
// CanSchedulePodOnNode checks if a pod can be scheduled on a specific node
49+
// This is a lightweight simulation and does not cover all scheduling scenarios
50+
// It mainly checks node affinity, tolerations, pod affinity, and pod anti-affinity
51+
// It does not consider resource requests, limits, or other constraints
52+
// The returned result may be inaccurate, leading to scheduling failures and causing the Pod to remain in a Pending state.
53+
func (s *SchedulerSimulator) CanSchedulePodOnNode(ctx context.Context, pod *corev1.Pod, node *corev1.Node) (bool, error) {
54+
log := log.FromContext(ctx)
55+
if pod == nil || node == nil {
56+
return false, nil
57+
}
58+
log.WithValues("pod", pod.Name, "node", node.Name)
59+
if !s.checkNodeAffinity(pod, node) {
60+
log.V(1).Info("pod cannot be scheduled on node", "reason", "node affinity not match")
61+
return false, nil
62+
}
63+
64+
if !s.checkTolerations(pod, node) {
65+
log.V(1).Info("pod cannot be scheduled on node", "reason", "tolerations not match")
66+
return false, nil
67+
}
68+
69+
if v, err := s.checkPodAffinity(ctx, pod, node); err != nil || !v {
70+
log.V(1).Info("pod cannot be scheduled on node", "reason", "pod affinity not match")
71+
return false, err
72+
}
73+
74+
if v, err := s.checkPodAntiAffinity(ctx, pod, node); err != nil || !v {
75+
log.V(1).Info("pod cannot be scheduled on node", "reason", "pod anti-affinity not match")
76+
return false, err
77+
}
78+
79+
log.V(1).Info("pod can be scheduled on node")
80+
return true, nil
81+
}
82+
83+
// AppendPendingPod records a pod the simulation has decided to place, so that
// subsequent affinity checks see it as if it were already scheduled.
func (s *SchedulerSimulator) AppendPendingPod(pod *corev1.Pod) {
	s.pendingPods = append(s.pendingPods, pod)
}
86+
87+
// AppendToBeDeletedPod marks a pod (by name) as scheduled for deletion, so it
// is ignored when matching pod affinity / anti-affinity terms.
func (s *SchedulerSimulator) AppendToBeDeletedPod(pod *corev1.Pod) {
	s.toBeDeletedPods[pod.Name] = pod
}
90+
91+
func (s *SchedulerSimulator) checkNodeAffinity(pod *corev1.Pod, node *corev1.Node) bool {
92+
if pod.Spec.Affinity == nil || pod.Spec.Affinity.NodeAffinity == nil {
93+
return true
94+
}
95+
fitsNodeAffinity, _ := nodeaffinity.GetRequiredNodeAffinity(pod).Match(node)
96+
return fitsNodeAffinity
97+
}
98+
99+
func (s *SchedulerSimulator) checkTolerations(pod *corev1.Pod, node *corev1.Node) bool {
100+
_, hasUntoleratedTaint := v1helper.FindMatchingUntoleratedTaint(node.Spec.Taints, pod.Spec.Tolerations, func(t *corev1.Taint) bool {
101+
return t.Effect == corev1.TaintEffectNoExecute || t.Effect == corev1.TaintEffectNoSchedule
102+
})
103+
return !hasUntoleratedTaint
104+
}
105+
106+
func (s *SchedulerSimulator) checkPodAffinity(ctx context.Context, pod *corev1.Pod, node *corev1.Node) (bool, error) {
107+
if pod.Spec.Affinity == nil || pod.Spec.Affinity.PodAffinity == nil {
108+
return true, nil
109+
}
110+
terms := pod.Spec.Affinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution
111+
if len(terms) == 0 {
112+
return true, nil
113+
}
114+
for _, term := range terms {
115+
topologyValue, exists := node.Labels[term.TopologyKey]
116+
if !exists {
117+
return false, nil
118+
}
119+
pods, err := s.ListPodAffinityTermPods(ctx, pod.Name, term, topologyValue)
120+
if err != nil {
121+
return false, err
122+
}
123+
if len(pods) == 0 {
124+
return false, nil
125+
}
126+
}
127+
return true, nil
128+
}
129+
130+
func (s *SchedulerSimulator) checkPodAntiAffinity(ctx context.Context, pod *corev1.Pod, node *corev1.Node) (bool, error) {
131+
if pod.Spec.Affinity == nil || pod.Spec.Affinity.PodAntiAffinity == nil {
132+
return true, nil
133+
}
134+
terms := pod.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution
135+
if len(terms) == 0 {
136+
return true, nil
137+
}
138+
for _, term := range terms {
139+
topologyValue, exists := node.Labels[term.TopologyKey]
140+
if !exists {
141+
// If the topology key doesn't exist, the anti-affinity constraint is considered satisfied.
142+
continue
143+
}
144+
pods, err := s.ListPodAffinityTermPods(ctx, pod.Name, term, topologyValue)
145+
if err != nil {
146+
return false, err
147+
}
148+
if len(pods) > 0 {
149+
return false, nil
150+
}
151+
}
152+
return true, nil
153+
}
154+
155+
// ListPodAffinityTermPods lists pods that match the given PodAffinityTerms
156+
func (s *SchedulerSimulator) ListPodAffinityTermPods(ctx context.Context, currentPod string, term corev1.PodAffinityTerm, topologyValue string) ([]corev1.Pod, error) {
157+
var matchedPods []corev1.Pod
158+
selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector)
159+
if err != nil {
160+
return nil, fmt.Errorf("failed to convert label selector: %w", err)
161+
}
162+
listOpts := []client.ListOption{
163+
client.MatchingLabelsSelector{Selector: selector},
164+
}
165+
166+
var podLists []corev1.PodList
167+
if len(term.Namespaces) > 0 {
168+
for _, ns := range term.Namespaces {
169+
var podList corev1.PodList
170+
nsListOpts := append([]client.ListOption{client.InNamespace(ns)}, listOpts...)
171+
if err := s.client.List(ctx, &podList, nsListOpts...); err != nil {
172+
return nil, fmt.Errorf("failed to list pods in namespace %s: %w", ns, err)
173+
}
174+
podLists = append(podLists, podList)
175+
}
176+
} else {
177+
var podList corev1.PodList
178+
if err := s.client.List(ctx, &podList, listOpts...); err != nil {
179+
return nil, fmt.Errorf("failed to list pods: %w", err)
180+
}
181+
podLists = append(podLists, podList)
182+
}
183+
184+
for _, podList := range podLists {
185+
for _, pod := range podList.Items {
186+
node, exists := s.nodes[pod.Spec.NodeName]
187+
if !exists {
188+
continue
189+
}
190+
if node.Labels[term.TopologyKey] == topologyValue && pod.Name != currentPod && s.toBeDeletedPods[pod.Name] == nil {
191+
matchedPods = append(matchedPods, pod)
192+
}
193+
}
194+
}
195+
// check pending pods as well
196+
for _, pod := range s.pendingPods {
197+
node, exists := s.nodes[pod.Spec.NodeName]
198+
if !exists {
199+
continue
200+
}
201+
if node.Labels[term.TopologyKey] == topologyValue && selector.Matches(labels.Set(pod.Labels)) && pod.Name != currentPod {
202+
matchedPods = append(matchedPods, *pod)
203+
}
204+
}
205+
return matchedPods, nil
206+
}

0 commit comments

Comments
 (0)