Skip to content

Commit ab92b3d

Browse files
authored
fix: race condition between tracked pods / boosts (#112)
1 parent 439c476 commit ab92b3d

File tree

10 files changed

+586
-309
lines changed

10 files changed

+586
-309
lines changed

internal/boost/manager.go

Lines changed: 160 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -43,17 +43,35 @@ const (
4343
)
4444

4545
type Manager interface {
46-
// AddStartupCPUBoost registers a new startup-cpu-boost is a manager.
47-
AddStartupCPUBoost(ctx context.Context, boost StartupCPUBoost) error
48-
// RemoveStartupCPUBoost removes a startup-cpu-boost from a manager
49-
RemoveStartupCPUBoost(ctx context.Context, namespace, name string)
50-
// UpdateStartupCPUBoost updates a startup-cpu-boost in a manager
51-
UpdateStartupCPUBoost(ctx context.Context, spec *autoscaling.StartupCPUBoost) error
52-
// StartupCPUBoost returns a startup-cpu-boost with a given name and namespace
53-
StartupCPUBoostForPod(ctx context.Context, pod *corev1.Pod) (StartupCPUBoost, bool)
54-
// StartupCPUBoostForPod returns a startup-cpu-boost that matches a given pod
55-
StartupCPUBoost(namespace, name string) (StartupCPUBoost, bool)
46+
// AddRegularCPUBoost registers new regular startup cpu boost in a manager.
47+
AddRegularCPUBoost(ctx context.Context, boost StartupCPUBoost) error
48+
49+
// DeleteRegularCPUBoost deletes a regular startup cpu boost from a manager.
50+
DeleteRegularCPUBoost(ctx context.Context, name, namespace string)
51+
52+
// UpdateRegularCPUBoost updates a regular startup cpu boost in a manager.
53+
UpdateRegularCPUBoost(ctx context.Context, spec *autoscaling.StartupCPUBoost) error
54+
55+
// GetRegularCPUBoost returns a regular startup cpu boost with a given name and namespace
56+
// if such is registered in a manager.
57+
GetRegularCPUBoost(ctx context.Context, name, namespace string) (StartupCPUBoost, bool)
58+
59+
// GetCPUBoostForPod returns a startup cpu boost that matches a given pod if such is registered
60+
// in a manager. If multiple boost types matches, the most specific is returned.
61+
GetCPUBoostForPod(ctx context.Context, pod *corev1.Pod) (StartupCPUBoost, bool)
62+
63+
// UpsertPod adds new or updates existing tracked POD to the manager and boosts.
64+
// If found, the matching cpu boost is returned.
65+
UpsertPod(ctx context.Context, pod *corev1.Pod) (StartupCPUBoost, error)
66+
67+
// DeletePod deletes the tracked POD from the manager and boosts.
68+
// If found, the matching cpu boost is returned.
69+
DeletePod(ctx context.Context, pod *corev1.Pod) (StartupCPUBoost, error)
70+
71+
// SetStartupCPUBoostReconciler sets the boost object reconciler for the manager.
5672
SetStartupCPUBoostReconciler(reconciler reconcile.Reconciler)
73+
74+
// Start starts the manager time based check loop.
5775
Start(ctx context.Context) error
5876
}
5977

@@ -80,21 +98,26 @@ func newTimeTickerImpl(d time.Duration) TimeTicker {
8098
}
8199
}
82100

83-
type managerImpl struct {
84-
sync.RWMutex
85-
client client.Client
86-
reconciler reconcile.Reconciler
87-
ticker TimeTicker
88-
checkInterval time.Duration
89-
startupCPUBoosts map[string]map[string]StartupCPUBoost
90-
timePolicyBoosts map[boostKey]StartupCPUBoost
91-
maxGoroutines int
92-
log logr.Logger
101+
type podRevertTask struct {
102+
boost StartupCPUBoost
103+
pod *corev1.Pod
93104
}
94105

95-
type boostKey struct {
96-
name string
97-
namespace string
106+
type managerImpl struct {
107+
sync.RWMutex
108+
client client.Client
109+
reconciler reconcile.Reconciler
110+
ticker TimeTicker
111+
checkInterval time.Duration
112+
maxGoroutines int
113+
log logr.Logger
114+
115+
// timedBoosts is a collection of boosts of any kind that have time duration policy set
116+
timedBoosts namespacedObjects[StartupCPUBoost]
117+
// regularBoost is collection of a regular, namespaced boosts
118+
regularBoosts namespacedObjects[StartupCPUBoost]
119+
// orphanedPods is a collection of tracked pods that have no matching boost registered
120+
orphanedPods namespacedObjects[*corev1.Pod]
98121
}
99122

100123
func NewManager(client client.Client) Manager {
@@ -103,94 +126,118 @@ func NewManager(client client.Client) Manager {
103126

104127
func NewManagerWithTicker(client client.Client, ticker TimeTicker) Manager {
105128
return &managerImpl{
106-
client: client,
107-
ticker: ticker,
108-
checkInterval: DefaultManagerCheckInterval,
109-
startupCPUBoosts: make(map[string]map[string]StartupCPUBoost),
110-
timePolicyBoosts: make(map[boostKey]StartupCPUBoost),
111-
maxGoroutines: DefaultMaxGoroutines,
112-
log: ctrl.Log.WithName("boost-manager"),
129+
client: client,
130+
ticker: ticker,
131+
checkInterval: DefaultManagerCheckInterval,
132+
timedBoosts: *newNamespacedObjects[StartupCPUBoost](),
133+
regularBoosts: *newNamespacedObjects[StartupCPUBoost](),
134+
orphanedPods: *newNamespacedObjects[*corev1.Pod](),
135+
maxGoroutines: DefaultMaxGoroutines,
136+
log: ctrl.Log.WithName("boost-manager"),
113137
}
114138
}
115139

116-
// AddStartupCPUBoost registers a new startup-cpu-boost is a manager.
117-
// If a boost with a given name and namespace already exists, it returns an error.
118-
func (m *managerImpl) AddStartupCPUBoost(ctx context.Context, boost StartupCPUBoost) error {
140+
// AddRegularCPUBoost registers new regular startup cpu boost in a manager.
141+
// Returns an error if a boost is already registered.
142+
func (m *managerImpl) AddRegularCPUBoost(ctx context.Context, boost StartupCPUBoost) error {
119143
m.Lock()
120144
defer m.Unlock()
121-
if _, ok := m.getStartupCPUBoost(boost.Namespace(), boost.Name()); ok {
145+
log := m.log.WithValues("boost", boost.Name(), "namespace", boost.Namespace())
146+
if _, ok := m.regularBoosts.Get(boost.Name(), boost.Namespace()); ok {
122147
return errStartupCPUBoostAlreadyExists
123148
}
124-
log := m.log.WithValues("boost", boost.Name(), "namespace", boost.Namespace())
125-
log.V(5).Info("handling boost registration")
126-
m.addStartupCPUBoost(boost)
149+
defer log.Info("regular boost registered successfully")
150+
defer m.postProcessNewBoost(ctx, boost)
151+
log.V(5).Info("handling regular boost registration")
152+
m.regularBoosts.Put(boost.Name(), boost.Namespace(), boost)
127153
metrics.NewBoostConfiguration(boost.Namespace())
128-
log.Info("boost registered successfully")
129154
return nil
130155
}
131156

132-
// RemoveStartupCPUBoost removes a startup-cpu-boost from a manager if registered.
133-
func (m *managerImpl) RemoveStartupCPUBoost(ctx context.Context, namespace, name string) {
157+
// DeleteRegularCPUBoost deletes a regular startup cpu boost from a manager.
158+
func (m *managerImpl) DeleteRegularCPUBoost(ctx context.Context, namespace, name string) {
134159
m.Lock()
135160
defer m.Unlock()
136161
log := m.log.WithValues("boost", name, "namespace", namespace)
137-
log.V(5).Info("handling boost deletion")
138-
if boosts, ok := m.startupCPUBoosts[namespace]; ok {
139-
delete(boosts, name)
140-
}
141-
key := boostKey{name: name, namespace: namespace}
142-
delete(m.timePolicyBoosts, key)
162+
log.V(5).Info("handling regular boost deletion")
163+
defer log.Info("boost deleted successfully")
164+
m.regularBoosts.Delete(name, namespace)
165+
m.timedBoosts.Delete(name, namespace)
143166
metrics.DeleteBoostConfiguration(namespace)
144-
log.Info("boost deleted successfully")
145167
}
146168

147-
func (m *managerImpl) UpdateStartupCPUBoost(ctx context.Context, spec *autoscaling.StartupCPUBoost) error {
169+
// UpdateRegularCPUBoost updates a regular startup cpu boost in a manager.
170+
func (m *managerImpl) UpdateRegularCPUBoost(ctx context.Context,
171+
spec *autoscaling.StartupCPUBoost) error {
148172
m.Lock()
149173
defer m.Unlock()
150174
log := m.log.WithValues("boost", spec.ObjectMeta.Name, "namespace", spec.ObjectMeta.Namespace)
151175
log.V(5).Info("handling boost update")
152-
boost, ok := m.getStartupCPUBoost(spec.ObjectMeta.Namespace, spec.ObjectMeta.Name)
176+
defer log.Info("boost updated successfully")
177+
boost, ok := m.regularBoosts.Get(spec.ObjectMeta.Name, spec.ObjectMeta.Namespace)
153178
if !ok {
154-
log.V(5).Info("boost object not found")
179+
log.V(5).Info("boost not found")
155180
return nil
156181
}
157-
if err := boost.UpdateFromSpec(ctx, spec); err != nil {
158-
return err
159-
}
160-
log.Info("boost updated successfully")
161-
return nil
182+
return boost.UpdateFromSpec(ctx, spec)
162183
}
163184

164-
// StartupCPUBoost returns a startup-cpu-boost with a given name and namespace
165-
// if registered in a manager.
166-
func (m *managerImpl) StartupCPUBoost(namespace string, name string) (StartupCPUBoost, bool) {
185+
// GetRegularCPUBoost returns a regular startup cpu boost with a given name and namespace
186+
// if such is registered in a manager.
187+
func (m *managerImpl) GetRegularCPUBoost(ctx context.Context, name string,
188+
namespace string) (StartupCPUBoost, bool) {
167189
m.RLock()
168190
defer m.RUnlock()
169-
return m.getStartupCPUBoost(namespace, name)
191+
return m.regularBoosts.Get(name, namespace)
170192
}
171193

172-
// StartupCPUBoostForPod returns a startup-cpu-boost that matches a given pod if such is registered
173-
// in a manager.
174-
func (m *managerImpl) StartupCPUBoostForPod(ctx context.Context, pod *corev1.Pod) (StartupCPUBoost, bool) {
194+
// GetCPUBoostForPod returns a startup cpu boost that matches a given pod if such is registered
195+
// in a manager. If multiple boost types matches, the most specific is returned.
196+
func (m *managerImpl) GetCPUBoostForPod(ctx context.Context,
197+
pod *corev1.Pod) (StartupCPUBoost, bool) {
175198
m.RLock()
176199
defer m.RUnlock()
177-
m.log.V(5).Info("handling boost pod lookup")
178-
nsBoosts, ok := m.startupCPUBoosts[pod.Namespace]
179-
if !ok {
180-
return nil, false
181-
}
182-
for _, boost := range nsBoosts {
183-
if boost.Matches(pod) {
184-
return boost, true
200+
return m.getMatchingBoost(pod)
201+
}
202+
203+
// UpsertPod adds new or updates existing tracked POD to the manager and boosts.
204+
// If found, the matching cpu boost is returned.
205+
func (m *managerImpl) UpsertPod(ctx context.Context, pod *corev1.Pod) (StartupCPUBoost, error) {
206+
m.Lock()
207+
defer m.Unlock()
208+
m.log.V(5).Info("handling pod upsert")
209+
if boost, ok := m.getMatchingBoost(pod); ok {
210+
err := boost.UpsertPod(ctx, pod)
211+
if err == nil {
212+
m.orphanedPods.Delete(pod.Name, pod.Namespace)
185213
}
214+
return boost, err
186215
}
187-
return nil, false
216+
m.log.V(5).Info("boost not found, registering orphaned pod")
217+
m.orphanedPods.Put(pod.Name, pod.Namespace, pod)
218+
return nil, nil
219+
}
220+
221+
// DeletePod deletes the tracked POD from the manager and boosts.
222+
// If found, the matching cpu boost is returned.
223+
func (m *managerImpl) DeletePod(ctx context.Context, pod *corev1.Pod) (StartupCPUBoost, error) {
224+
m.Lock()
225+
defer m.Unlock()
226+
m.log.V(5).Info("handling pod delete")
227+
if boost, ok := m.getMatchingBoost(pod); ok {
228+
return boost, boost.DeletePod(ctx, pod)
229+
}
230+
m.log.V(5).Info("boost not found, removing orphaned pod if exists")
231+
m.orphanedPods.Delete(pod.Name, pod.Namespace)
232+
return nil, nil
188233
}
189234

235+
// SetStartupCPUBoostReconciler sets the boost object reconciler for the manager.
190236
func (m *managerImpl) SetStartupCPUBoostReconciler(reconciler reconcile.Reconciler) {
191237
m.reconciler = reconciler
192238
}
193239

240+
// Start starts the manager time based check loop.
194241
func (m *managerImpl) Start(ctx context.Context) error {
195242
defer m.ticker.Stop()
196243
m.log.Info("starting")
@@ -205,33 +252,53 @@ func (m *managerImpl) Start(ctx context.Context) error {
205252
}
206253
}
207254

208-
// addStartupCPUBoost registers a new startup-cpu-boost in a manager.
209-
func (m *managerImpl) addStartupCPUBoost(boost StartupCPUBoost) {
210-
boosts, ok := m.startupCPUBoosts[boost.Namespace()]
211-
if !ok {
212-
boosts = make(map[string]StartupCPUBoost)
213-
m.startupCPUBoosts[boost.Namespace()] = boosts
214-
}
215-
boosts[boost.Name()] = boost
216-
if _, ok := boost.DurationPolicies()[duration.FixedDurationPolicyName]; ok {
217-
key := boostKey{name: boost.Name(), namespace: boost.Namespace()}
218-
m.timePolicyBoosts[key] = boost
255+
// PRIVATE FUNCS START below
256+
257+
// getMatchingBoost finds the most specific matching boost for a given pod.
258+
func (m *managerImpl) getMatchingBoost(pod *corev1.Pod) (StartupCPUBoost, bool) {
259+
namespaceBoosts := m.regularBoosts.List(pod.Namespace)
260+
for _, boost := range namespaceBoosts {
261+
if boost.Matches(pod) {
262+
return boost, true
263+
}
219264
}
265+
return nil, false
220266
}
221267

222-
// getStartupCPUBoost returns the startup-cpu-boost with a given name and namespace
223-
// if registered in a manager.
224-
func (m *managerImpl) getStartupCPUBoost(namespace string, name string) (StartupCPUBoost, bool) {
225-
if boosts, ok := m.startupCPUBoosts[namespace]; ok {
226-
boost, ok := boosts[name]
227-
return boost, ok
268+
// postProcessNewBoost performs additional post processing of a newly registered boost
269+
func (m *managerImpl) postProcessNewBoost(ctx context.Context, boost StartupCPUBoost) {
270+
log := m.log.WithValues("boost", boost.Name(), "namespace", boost.Namespace())
271+
if _, ok := boost.DurationPolicies()[duration.FixedDurationPolicyName]; ok {
272+
log.V(5).Info("adding boost to timedBoosts collection")
273+
m.timedBoosts.Put(boost.Name(), boost.Namespace(), boost)
274+
}
275+
if err := m.mapOrphanedPods(ctx, boost); err != nil {
276+
log.Error(err, "failed to map orphaned pods")
228277
}
229-
return nil, false
230278
}
231279

232-
type podRevertTask struct {
233-
boost StartupCPUBoost
234-
pod *corev1.Pod
280+
// mapOrphanedPods maps orphaned pods to the given boost if they match.
281+
// Matched pods are registered in a boost and removed from orphanedPods collection.
282+
func (m *managerImpl) mapOrphanedPods(ctx context.Context, boost StartupCPUBoost) error {
283+
log := m.log.WithValues("boost", boost.Name(), "namespace", boost.Namespace())
284+
errs := make([]error, 0)
285+
namespaceOrphanedPods := m.orphanedPods.List(boost.Namespace())
286+
mappedOrphanedPods := make([]*corev1.Pod, 0, len(namespaceOrphanedPods))
287+
for _, orphanedPod := range namespaceOrphanedPods {
288+
if boost.Matches(orphanedPod) {
289+
log := log.WithValues("pod", orphanedPod.Name)
290+
log.V(5).Info("matched orphaned pod")
291+
if err := boost.UpsertPod(ctx, orphanedPod); err != nil {
292+
errs = append(errs, err)
293+
} else {
294+
mappedOrphanedPods = append(mappedOrphanedPods, orphanedPod)
295+
}
296+
}
297+
}
298+
for _, orphanedPod := range mappedOrphanedPods {
299+
m.orphanedPods.Delete(orphanedPod.Name, orphanedPod.Namespace)
300+
}
301+
return errors.Join(errs...)
235302
}
236303

237304
// validateTimePolicyBoosts validates all time policy boosts in a manager
@@ -244,7 +311,7 @@ func (m *managerImpl) validateTimePolicyBoosts(ctx context.Context) {
244311
errors := make(chan error, m.maxGoroutines)
245312

246313
go func() {
247-
for _, boost := range m.timePolicyBoosts {
314+
for _, boost := range m.timedBoosts.ListAll() {
248315
for _, pod := range boost.ValidatePolicy(ctx, duration.FixedDurationPolicyName) {
249316
revertTasks <- &podRevertTask{
250317
boost: boost,

0 commit comments

Comments
 (0)