Skip to content

Commit c2a14dd

Browse files
committed
ensure that requests are never scheduled in parallel
1 parent c30ab8b commit c2a14dd

File tree

3 files changed

+22
-9
lines changed

3 files changed

+22
-9
lines changed

internal/controllers/scheduler/controller.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,17 +38,14 @@ func NewClusterScheduler(setupLog *logging.Logger, platformCluster *clusters.Clu
3838
setupLog.WithName(ControllerName).Info("Initializing cluster scheduler", "scope", string(config.Scope), "strategy", string(config.Strategy), "knownPurposes", strings.Join(sets.List(sets.KeySet(config.PurposeMappings)), ","))
3939
}
4040
return &ClusterScheduler{
41-
Preemptive: PreemptiveScheduler{
42-
PlatformCluster: platformCluster,
43-
Config: config,
44-
},
41+
Preemptive: NewPreemptiveScheduler(platformCluster, config),
4542
PlatformCluster: platformCluster,
4643
Config: config,
4744
}, nil
4845
}
4946

5047
type ClusterScheduler struct {
51-
Preemptive PreemptiveScheduler
48+
Preemptive *PreemptiveScheduler
5249
PlatformCluster *clusters.Cluster
5350
Config *config.SchedulerConfig
5451
}
@@ -60,6 +57,8 @@ type ReconcileResult = ctrlutils.ReconcileResult[*clustersv1alpha1.ClusterReques
6057
func (r *ClusterScheduler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
6158
log := logging.FromContextOrPanic(ctx).WithName(ControllerName)
6259
ctx = logging.NewContext(ctx, log)
60+
r.Preemptive.Lock.Lock()
61+
defer r.Preemptive.Lock.Unlock()
6362
log.Info("Starting reconcile")
6463
rr := r.reconcile(ctx, req)
6564
// status update

internal/controllers/scheduler/controller_preemptive.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package scheduler
33
import (
44
"context"
55
"fmt"
6+
"sync"
67

78
apierrors "k8s.io/apimachinery/pkg/api/errors"
89
ctrl "sigs.k8s.io/controller-runtime"
@@ -24,17 +25,33 @@ import (
2425
const PreemptiveControllerName = "PreemptiveScheduler"
2526

2627
type PreemptiveScheduler struct {
28+
Lock *sync.Mutex
2729
PlatformCluster *clusters.Cluster
2830
Config *config.SchedulerConfig
2931
}
3032

33+
// NewPreemptiveScheduler creates a new PreemptiveScheduler.
34+
// Note that this is already called by the NewScheduler constructor function, there should be no need to call this outside of tests.
35+
func NewPreemptiveScheduler(platformCluster *clusters.Cluster, cfg *config.SchedulerConfig) *PreemptiveScheduler {
36+
if cfg == nil {
37+
cfg = &config.SchedulerConfig{}
38+
}
39+
return &PreemptiveScheduler{
40+
Lock: &sync.Mutex{},
41+
PlatformCluster: platformCluster,
42+
Config: cfg,
43+
}
44+
}
45+
3146
var _ reconcile.Reconciler = &PreemptiveScheduler{}
3247

3348
type PreemptiveReconcileResult = ctrlutils.ReconcileResult[*clustersv1alpha1.PreemptiveClusterRequest, clustersv1alpha1.ConditionStatus]
3449

3550
func (r *PreemptiveScheduler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
3651
log := logging.FromContextOrPanic(ctx).WithName(PreemptiveControllerName)
3752
ctx = logging.NewContext(ctx, log)
53+
r.Lock.Lock()
54+
defer r.Lock.Unlock()
3855
log.Info("Starting reconcile")
3956
rr := r.reconcile(ctx, req)
4057
// status update

internal/controllers/scheduler/controller_test.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,10 +60,7 @@ func defaultTestSetup(testDirPathSegments ...string) (*config.SchedulerConfig, *
6060
return r
6161
}, platform).
6262
WithReconcilerConstructor(pSchedulerRec, func(c ...client.Client) reconcile.Reconciler {
63-
return &scheduler.PreemptiveScheduler{
64-
PlatformCluster: clusters.NewTestClusterFromClient(platform, c[0]),
65-
Config: cfg.Scheduler,
66-
}
63+
return scheduler.NewPreemptiveScheduler(clusters.NewTestClusterFromClient(platform, c[0]), cfg.Scheduler)
6764
}, platform).
6865
WithUIDs(platform).
6966
Build()

0 commit comments

Comments
 (0)