Skip to content

Commit 51fdd55

Browse files
committed
use sync map for the cache
1 parent 547c005 commit 51fdd55

File tree

2 files changed

+8
-32
lines changed

2 files changed

+8
-32
lines changed

pkg/controller/job/job_controller.go

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -124,9 +124,9 @@ type Controller struct {
124124
// recreation in case of pod failures.
125125
podBackoffStore *backoffStore
126126

127-
// completedJobStore contains the job ids for which the job status is updated to completed
127+
// finishedJobStore contains the job ids for which the job status is finished
128128
// but the corresponding event is not yet received.
129-
completedJobStore *jobUIDCache
129+
finishedJobStore sync.Map
130130
}
131131

132132
type syncJobCtx struct {
@@ -188,9 +188,7 @@ func newControllerWithClock(ctx context.Context, podInformer coreinformers.PodIn
188188
recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"}),
189189
clock: clock,
190190
podBackoffStore: newBackoffStore(),
191-
completedJobStore: &jobUIDCache{
192-
set: sets.New[types.UID](),
193-
},
191+
finishedJobStore: sync.Map{},
194192
}
195193

196194
if _, err := jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
@@ -543,7 +541,7 @@ func (jm *Controller) deleteJob(logger klog.Logger, obj interface{}) {
543541
return
544542
}
545543
}
546-
jm.completedJobStore.remove(jobObj.UID)
544+
jm.finishedJobStore.Delete(jobObj.UID)
547545
jm.enqueueLabelSelector(jobObj)
548546
}
549547

@@ -828,6 +826,7 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
828826
}
829827
return err
830828
}
829+
831830
// Skip syncing of the job it is managed by another controller.
832831
// We cannot rely solely on skipping of queueing such jobs for synchronization,
833832
// because it is possible a synchronization task is queued for a job, without
@@ -848,10 +847,10 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
848847
// re-syncing here as the record has to be removed for finished/deleted jobs
849848
return fmt.Errorf("error removing backoff record %w", err)
850849
}
851-
jm.completedJobStore.remove(job.UID)
850+
jm.finishedJobStore.Delete(job.UID)
852851
return nil
853852
}
854-
if jm.completedJobStore.exists(job.UID) {
853+
if _, ok := jm.finishedJobStore.Load(job.UID); ok {
855854
logger.V(2).Info("Skip syncing the job as its marked completed but the completed update event is not yet received", "uid", job.UID, "key", key)
856855
return nil
857856
}
@@ -1316,7 +1315,7 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job
13161315
}
13171316
if jobFinished {
13181317
jm.recordJobFinished(jobCtx.job, jobCtx.finishedCondition)
1319-
jm.completedJobStore.add(jobCtx.job.UID)
1318+
jm.finishedJobStore.Store(jobCtx.job.UID, struct{}{})
13201319
}
13211320
recordJobPodFinished(logger, jobCtx.job, oldCounters)
13221321
}

pkg/controller/job/tracking_utils.go

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -151,26 +151,3 @@ func isFinishedPodWithTrackingFinalizer(pod *v1.Pod) bool {
151151
}
152152
return (pod.Status.Phase == v1.PodFailed || pod.Status.Phase == v1.PodSucceeded) && hasJobTrackingFinalizer(pod)
153153
}
154-
155-
type jobUIDCache struct {
156-
sync.RWMutex
157-
set sets.Set[types.UID]
158-
}
159-
160-
func (j *jobUIDCache) add(uid types.UID) {
161-
j.Lock()
162-
defer j.Unlock()
163-
j.set.Insert(uid)
164-
}
165-
166-
func (j *jobUIDCache) remove(uid types.UID) {
167-
j.Lock()
168-
defer j.Unlock()
169-
j.set.Delete(uid)
170-
}
171-
172-
func (j *jobUIDCache) exists(uid types.UID) bool {
173-
j.RLock()
174-
defer j.RUnlock()
175-
return j.set.Has(uid)
176-
}

0 commit comments

Comments
 (0)