Skip to content

Commit 66b3dc1

Browse files
authored
Merge pull request kubernetes#128400 from tenzen-y/use-uid-typed-instead-of-string
Job: Refactor uncountedTerminatedPods to avoid casting everywhere
2 parents ce47f7b + a23e7a4 commit 66b3dc1

File tree

5 files changed

+53
-60
lines changed

5 files changed

+53
-60
lines changed

pkg/controller/job/indexed_job_utils_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"github.com/google/go-cmp/cmp"
2626
batch "k8s.io/api/batch/v1"
2727
v1 "k8s.io/api/core/v1"
28+
"k8s.io/apimachinery/pkg/types"
2829
"k8s.io/apimachinery/pkg/util/sets"
2930
"k8s.io/apiserver/pkg/util/feature"
3031
featuregatetesting "k8s.io/component-base/featuregate/testing"
@@ -405,7 +406,7 @@ func TestGetPodsWithDelayedDeletionPerIndex(t *testing.T) {
405406
cases := map[string]struct {
406407
job batch.Job
407408
pods []*v1.Pod
408-
expectedRmFinalizers sets.Set[string]
409+
expectedRmFinalizers sets.Set[types.UID]
409410
wantPodsWithDelayedDeletionPerIndex []string
410411
}{
411412
"failed pods are kept corresponding to non-failed indexes are kept": {
@@ -444,7 +445,7 @@ func TestGetPodsWithDelayedDeletionPerIndex(t *testing.T) {
444445
pods: []*v1.Pod{
445446
buildPod().uid("a").indexFailureCount("0").phase(v1.PodFailed).index("0").trackingFinalizer().Pod,
446447
},
447-
expectedRmFinalizers: sets.New("a"),
448+
expectedRmFinalizers: sets.New[types.UID]("a"),
448449
wantPodsWithDelayedDeletionPerIndex: []string{},
449450
},
450451
"failed pod with index outside of completions; the pod's deletion is not delayed": {

pkg/controller/job/job_controller.go

Lines changed: 28 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ type syncJobCtx struct {
136136
succeededIndexes orderedIntervals
137137
failedIndexes *orderedIntervals
138138
newBackoffRecord backoffRecord
139-
expectedRmFinalizers sets.Set[string]
139+
expectedRmFinalizers sets.Set[types.UID]
140140
uncounted *uncountedTerminatedPods
141141
podsWithDelayedDeletionPerIndex map[int]*v1.Pod
142142
terminating *int32
@@ -370,7 +370,7 @@ func (jm *Controller) updatePod(logger klog.Logger, old, cur interface{}) {
370370
if finalizerRemoved {
371371
key, err := controller.KeyFunc(job)
372372
if err == nil {
373-
jm.finalizerExpectations.finalizerRemovalObserved(logger, key, string(curPod.UID))
373+
jm.finalizerExpectations.finalizerRemovalObserved(logger, key, curPod.UID)
374374
}
375375
}
376376
jm.enqueueSyncJobBatched(logger, job)
@@ -386,7 +386,7 @@ func (jm *Controller) updatePod(logger klog.Logger, old, cur interface{}) {
386386
if finalizerRemoved {
387387
key, err := controller.KeyFunc(job)
388388
if err == nil {
389-
jm.finalizerExpectations.finalizerRemovalObserved(logger, key, string(curPod.UID))
389+
jm.finalizerExpectations.finalizerRemovalObserved(logger, key, curPod.UID)
390390
}
391391
}
392392
jm.enqueueSyncJobBatched(logger, job)
@@ -460,7 +460,7 @@ func (jm *Controller) deletePod(logger klog.Logger, obj interface{}, final bool)
460460
// Consider the finalizer removed if this is the final delete. Otherwise,
461461
// it's an update for the deletion timestamp, then check finalizer.
462462
if final || !hasFinalizer {
463-
jm.finalizerExpectations.finalizerRemovalObserved(logger, jobKey, string(pod.UID))
463+
jm.finalizerExpectations.finalizerRemovalObserved(logger, jobKey, pod.UID)
464464
}
465465

466466
jm.enqueueSyncJobBatched(logger, job)
@@ -1167,11 +1167,10 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job
11671167
// Sort to introduce completed Indexes in order.
11681168
sort.Sort(byCompletionIndex(jobCtx.pods))
11691169
}
1170-
uidsWithFinalizer := make(sets.Set[string], len(jobCtx.pods))
1170+
uidsWithFinalizer := make(sets.Set[types.UID], len(jobCtx.pods))
11711171
for _, p := range jobCtx.pods {
1172-
uid := string(p.UID)
1173-
if hasJobTrackingFinalizer(p) && !jobCtx.expectedRmFinalizers.Has(uid) {
1174-
uidsWithFinalizer.Insert(uid)
1172+
if hasJobTrackingFinalizer(p) && !jobCtx.expectedRmFinalizers.Has(p.UID) {
1173+
uidsWithFinalizer.Insert(p.UID)
11751174
}
11761175
}
11771176

@@ -1183,7 +1182,7 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job
11831182
podFailureCountByPolicyAction := map[string]int{}
11841183
reachedMaxUncountedPods := false
11851184
for _, pod := range jobCtx.pods {
1186-
if !hasJobTrackingFinalizer(pod) || jobCtx.expectedRmFinalizers.Has(string(pod.UID)) {
1185+
if !hasJobTrackingFinalizer(pod) || jobCtx.expectedRmFinalizers.Has(pod.UID) {
11871186
// This pod was processed in a previous sync.
11881187
continue
11891188
}
@@ -1192,7 +1191,7 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job
11921191
continue
11931192
}
11941193
podsToRemoveFinalizer = append(podsToRemoveFinalizer, pod)
1195-
if pod.Status.Phase == v1.PodSucceeded && !jobCtx.uncounted.failed.Has(string(pod.UID)) {
1194+
if pod.Status.Phase == v1.PodSucceeded && !jobCtx.uncounted.failed.Has(pod.UID) {
11961195
if isIndexed {
11971196
// The completion index is enough to avoid recounting succeeded pods.
11981197
// No need to track UIDs.
@@ -1201,14 +1200,14 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job
12011200
newSucceededIndexes = append(newSucceededIndexes, ix)
12021201
needsFlush = true
12031202
}
1204-
} else if !jobCtx.uncounted.succeeded.Has(string(pod.UID)) {
1203+
} else if !jobCtx.uncounted.succeeded.Has(pod.UID) {
12051204
needsFlush = true
12061205
uncountedStatus.Succeeded = append(uncountedStatus.Succeeded, pod.UID)
12071206
}
12081207
} else if considerPodFailed || (jobCtx.finishedCondition != nil && !isSuccessCriteriaMetCondition(jobCtx.finishedCondition)) {
12091208
// When the job is considered finished, every non-terminated pod is considered failed.
12101209
ix := getCompletionIndex(pod.Annotations)
1211-
if !jobCtx.uncounted.failed.Has(string(pod.UID)) && (!isIndexed || (ix != unknownCompletionIndex && ix < int(*jobCtx.job.Spec.Completions))) {
1210+
if !jobCtx.uncounted.failed.Has(pod.UID) && (!isIndexed || (ix != unknownCompletionIndex && ix < int(*jobCtx.job.Spec.Completions))) {
12121211
if jobCtx.job.Spec.PodFailurePolicy != nil {
12131212
_, countFailed, action := matchPodFailurePolicy(jobCtx.job.Spec.PodFailurePolicy, pod)
12141213
if action != nil {
@@ -1333,7 +1332,7 @@ func canRemoveFinalizer(logger klog.Logger, jobCtx *syncJobCtx, pod *v1.Pod, con
13331332
//
13341333
// Returns whether there are pending changes in the Job status that need to be
13351334
// flushed in subsequent calls.
1336-
func (jm *Controller) flushUncountedAndRemoveFinalizers(ctx context.Context, jobCtx *syncJobCtx, podsToRemoveFinalizer []*v1.Pod, uidsWithFinalizer sets.Set[string], oldCounters *batch.JobStatus, podFailureCountByPolicyAction map[string]int, needsFlush bool) (*batch.Job, bool, error) {
1335+
func (jm *Controller) flushUncountedAndRemoveFinalizers(ctx context.Context, jobCtx *syncJobCtx, podsToRemoveFinalizer []*v1.Pod, uidsWithFinalizer sets.Set[types.UID], oldCounters *batch.JobStatus, podFailureCountByPolicyAction map[string]int, needsFlush bool) (*batch.Job, bool, error) {
13371336
logger := klog.FromContext(ctx)
13381337
var err error
13391338
if needsFlush {
@@ -1367,7 +1366,7 @@ func (jm *Controller) flushUncountedAndRemoveFinalizers(ctx context.Context, job
13671366
rmSucceded, rmErr = jm.removeTrackingFinalizerFromPods(ctx, jobKey, podsToRemoveFinalizer)
13681367
for i, p := range podsToRemoveFinalizer {
13691368
if rmSucceded[i] {
1370-
uidsWithFinalizer.Delete(string(p.UID))
1369+
uidsWithFinalizer.Delete(p.UID)
13711370
}
13721371
}
13731372
}
@@ -1388,7 +1387,7 @@ func (jm *Controller) flushUncountedAndRemoveFinalizers(ctx context.Context, job
13881387
// .status.uncountedTerminatedPods for which the finalizer was successfully
13891388
// removed and increments the corresponding status counters.
13901389
// Returns whether there was any status change.
1391-
func cleanUncountedPodsWithoutFinalizers(status *batch.JobStatus, uidsWithFinalizer sets.Set[string]) bool {
1390+
func cleanUncountedPodsWithoutFinalizers(status *batch.JobStatus, uidsWithFinalizer sets.Set[types.UID]) bool {
13921391
updated := false
13931392
uncountedStatus := status.UncountedTerminatedPods
13941393
newUncounted := filterInUncountedUIDs(uncountedStatus.Succeeded, uidsWithFinalizer)
@@ -1414,9 +1413,9 @@ func (jm *Controller) removeTrackingFinalizerFromPods(ctx context.Context, jobKe
14141413
logger := klog.FromContext(ctx)
14151414
errCh := make(chan error, len(pods))
14161415
succeeded := make([]bool, len(pods))
1417-
uids := make([]string, len(pods))
1416+
uids := make([]types.UID, len(pods))
14181417
for i, p := range pods {
1419-
uids[i] = string(p.UID)
1418+
uids[i] = p.UID
14201419
}
14211420
if jobKey != "" {
14221421
err := jm.finalizerExpectations.expectFinalizersRemoved(logger, jobKey, uids)
@@ -1435,7 +1434,7 @@ func (jm *Controller) removeTrackingFinalizerFromPods(ctx context.Context, jobKe
14351434
// In case of any failure, we don't expect a Pod update for the
14361435
// finalizer removed. Clear expectation now.
14371436
if jobKey != "" {
1438-
jm.finalizerExpectations.finalizerRemovalObserved(logger, jobKey, string(pod.UID))
1437+
jm.finalizerExpectations.finalizerRemovalObserved(logger, jobKey, pod.UID)
14391438
}
14401439
if !apierrors.IsNotFound(err) {
14411440
errCh <- err
@@ -1495,10 +1494,10 @@ func (jm *Controller) recordJobFinished(job *batch.Job, finishedCond *batch.JobC
14951494
return true
14961495
}
14971496

1498-
func filterInUncountedUIDs(uncounted []types.UID, include sets.Set[string]) []types.UID {
1497+
func filterInUncountedUIDs(uncounted []types.UID, include sets.Set[types.UID]) []types.UID {
14991498
var newUncounted []types.UID
15001499
for _, uid := range uncounted {
1501-
if include.Has(string(uid)) {
1500+
if include.Has(uid) {
15021501
newUncounted = append(newUncounted, uid)
15031502
}
15041503
}
@@ -1852,14 +1851,12 @@ func (jm *Controller) patchJob(ctx context.Context, job *batch.Job, data []byte)
18521851
// getValidPodsWithFilter returns the valid pods that pass the filter.
18531852
// Pods are valid if they have a finalizer or in uncounted set
18541853
// and, for Indexed Jobs, a valid completion index.
1855-
func getValidPodsWithFilter(jobCtx *syncJobCtx, uncounted sets.Set[string], filter func(*v1.Pod) bool) []*v1.Pod {
1854+
func getValidPodsWithFilter(jobCtx *syncJobCtx, uncounted sets.Set[types.UID], filter func(*v1.Pod) bool) []*v1.Pod {
18561855
var result []*v1.Pod
18571856
for _, p := range jobCtx.pods {
1858-
uid := string(p.UID)
1859-
18601857
// Pods that don't have a completion finalizer are in the uncounted set or
18611858
// have already been accounted for in the Job status.
1862-
if !hasJobTrackingFinalizer(p) || uncounted.Has(uid) || jobCtx.expectedRmFinalizers.Has(uid) {
1859+
if !hasJobTrackingFinalizer(p) || uncounted.Has(p.UID) || jobCtx.expectedRmFinalizers.Has(p.UID) {
18631860
continue
18641861
}
18651862
if isIndexedJob(jobCtx.job) {
@@ -1906,32 +1903,25 @@ func removeTrackingFinalizerPatch(pod *v1.Pod) []byte {
19061903
}
19071904

19081905
type uncountedTerminatedPods struct {
1909-
succeeded sets.Set[string]
1910-
failed sets.Set[string]
1906+
succeeded sets.Set[types.UID]
1907+
failed sets.Set[types.UID]
19111908
}
19121909

19131910
func newUncountedTerminatedPods(in batch.UncountedTerminatedPods) *uncountedTerminatedPods {
1914-
obj := uncountedTerminatedPods{
1915-
succeeded: make(sets.Set[string], len(in.Succeeded)),
1916-
failed: make(sets.Set[string], len(in.Failed)),
1917-
}
1918-
for _, v := range in.Succeeded {
1919-
obj.succeeded.Insert(string(v))
1920-
}
1921-
for _, v := range in.Failed {
1922-
obj.failed.Insert(string(v))
1911+
return &uncountedTerminatedPods{
1912+
succeeded: sets.New(in.Succeeded...),
1913+
failed: sets.New(in.Failed...),
19231914
}
1924-
return &obj
19251915
}
19261916

1927-
func (u *uncountedTerminatedPods) Succeeded() sets.Set[string] {
1917+
func (u *uncountedTerminatedPods) Succeeded() sets.Set[types.UID] {
19281918
if u == nil {
19291919
return nil
19301920
}
19311921
return u.succeeded
19321922
}
19331923

1934-
func (u *uncountedTerminatedPods) Failed() sets.Set[string] {
1924+
func (u *uncountedTerminatedPods) Failed() sets.Set[types.UID] {
19351925
if u == nil {
19361926
return nil
19371927
}

pkg/controller/job/job_controller_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1453,7 +1453,7 @@ func TestGetNewFinshedPods(t *testing.T) {
14531453
cases := map[string]struct {
14541454
job batch.Job
14551455
pods []*v1.Pod
1456-
expectedRmFinalizers sets.Set[string]
1456+
expectedRmFinalizers sets.Set[types.UID]
14571457
wantSucceeded int32
14581458
wantFailed int32
14591459
}{
@@ -1510,7 +1510,7 @@ func TestGetNewFinshedPods(t *testing.T) {
15101510
},
15111511
},
15121512
},
1513-
expectedRmFinalizers: sets.New("b", "f"),
1513+
expectedRmFinalizers: sets.New[types.UID]("b", "f"),
15141514
pods: []*v1.Pod{
15151515
buildPod().uid("a").phase(v1.PodSucceeded).Pod,
15161516
buildPod().uid("b").phase(v1.PodSucceeded).trackingFinalizer().Pod,
@@ -1575,7 +1575,7 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) {
15751575
job batch.Job
15761576
pods []*v1.Pod
15771577
finishedCond *batch.JobCondition
1578-
expectedRmFinalizers sets.Set[string]
1578+
expectedRmFinalizers sets.Set[types.UID]
15791579
needsFlush bool
15801580
statusUpdateErr error
15811581
podControlErr error
@@ -1686,7 +1686,7 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) {
16861686
},
16871687
},
16881688
},
1689-
expectedRmFinalizers: sets.New("c", "d", "g", "h"),
1689+
expectedRmFinalizers: sets.New[types.UID]("c", "d", "g", "h"),
16901690
pods: []*v1.Pod{
16911691
buildPod().uid("a").phase(v1.PodSucceeded).trackingFinalizer().Pod,
16921692
buildPod().uid("b").phase(v1.PodFailed).trackingFinalizer().Pod,
@@ -7655,11 +7655,11 @@ func TestFinalizersRemovedExpectations(t *testing.T) {
76557655
pods := append(newPodList(2, v1.PodSucceeded, job), newPodList(2, v1.PodFailed, job)...)
76567656
podInformer := sharedInformers.Core().V1().Pods().Informer()
76577657
podIndexer := podInformer.GetIndexer()
7658-
uids := sets.New[string]()
7658+
uids := sets.New[types.UID]()
76597659
for i := range pods {
76607660
clientset.Tracker().Add(pods[i])
76617661
podIndexer.Add(pods[i])
7662-
uids.Insert(string(pods[i].UID))
7662+
uids.Insert(pods[i].UID)
76637663
}
76647664
jobKey := testutil.GetKey(job, t)
76657665

@@ -7725,7 +7725,7 @@ func TestFinalizersRemovedExpectations(t *testing.T) {
77257725
t.Errorf("Deleting pod that had finalizer: %v", err)
77267726
}
77277727

7728-
uids = sets.New(string(pods[2].UID))
7728+
uids = sets.New(pods[2].UID)
77297729
var diff string
77307730
if err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) {
77317731
gotExpectedUIDs = manager.finalizerExpectations.getExpectedUIDs(jobKey)

pkg/controller/job/tracking_utils.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222

2323
batch "k8s.io/api/batch/v1"
2424
v1 "k8s.io/api/core/v1"
25+
"k8s.io/apimachinery/pkg/types"
2526
"k8s.io/apimachinery/pkg/util/sets"
2627
"k8s.io/client-go/tools/cache"
2728
"k8s.io/klog/v2"
@@ -40,7 +41,7 @@ var uidSetKeyFunc = func(obj interface{}) (string, error) {
4041
// uidTrackingExpectations to remember which UID it has seen/still waiting for.
4142
type uidSet struct {
4243
sync.RWMutex
43-
set sets.Set[string]
44+
set sets.Set[types.UID]
4445
key string
4546
}
4647

@@ -60,7 +61,7 @@ func (u *uidTrackingExpectations) getSet(controllerKey string) *uidSet {
6061
return nil
6162
}
6263

63-
func (u *uidTrackingExpectations) getExpectedUIDs(controllerKey string) sets.Set[string] {
64+
func (u *uidTrackingExpectations) getExpectedUIDs(controllerKey string) sets.Set[types.UID] {
6465
uids := u.getSet(controllerKey)
6566
if uids == nil {
6667
return nil
@@ -74,14 +75,14 @@ func (u *uidTrackingExpectations) getExpectedUIDs(controllerKey string) sets.Set
7475
// ExpectDeletions records expectations for the given deleteKeys, against the
7576
// given job-key.
7677
// This is thread-safe across different job keys.
77-
func (u *uidTrackingExpectations) expectFinalizersRemoved(logger klog.Logger, jobKey string, deletedKeys []string) error {
78+
func (u *uidTrackingExpectations) expectFinalizersRemoved(logger klog.Logger, jobKey string, deletedKeys []types.UID) error {
7879
logger.V(4).Info("Expecting tracking finalizers removed", "key", jobKey, "podUIDs", deletedKeys)
7980

8081
uids := u.getSet(jobKey)
8182
if uids == nil {
8283
uids = &uidSet{
8384
key: jobKey,
84-
set: sets.New[string](),
85+
set: sets.New[types.UID](),
8586
}
8687
if err := u.store.Add(uids); err != nil {
8788
return err
@@ -94,7 +95,7 @@ func (u *uidTrackingExpectations) expectFinalizersRemoved(logger klog.Logger, jo
9495
}
9596

9697
// FinalizerRemovalObserved records the given deleteKey as a deletion, for the given job.
97-
func (u *uidTrackingExpectations) finalizerRemovalObserved(logger klog.Logger, jobKey, deleteKey string) {
98+
func (u *uidTrackingExpectations) finalizerRemovalObserved(logger klog.Logger, jobKey string, deleteKey types.UID) {
9899
uids := u.getSet(jobKey)
99100
if uids != nil {
100101
uids.Lock()

pkg/controller/job/tracking_utils_test.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
batch "k8s.io/api/batch/v1"
2626
v1 "k8s.io/api/core/v1"
2727
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28+
"k8s.io/apimachinery/pkg/types"
2829
"k8s.io/apimachinery/pkg/util/sets"
2930
"k8s.io/component-base/metrics/testutil"
3031
"k8s.io/klog/v2/ktesting"
@@ -35,23 +36,23 @@ func TestUIDTrackingExpectations(t *testing.T) {
3536
logger, _ := ktesting.NewTestContext(t)
3637
tracks := []struct {
3738
job string
38-
firstRound []string
39-
secondRound []string
39+
firstRound []types.UID
40+
secondRound []types.UID
4041
}{
4142
{
4243
job: "foo",
43-
firstRound: []string{"a", "b", "c", "d"},
44-
secondRound: []string{"e", "f"},
44+
firstRound: []types.UID{"a", "b", "c", "d"},
45+
secondRound: []types.UID{"e", "f"},
4546
},
4647
{
4748
job: "bar",
48-
firstRound: []string{"x", "y", "z"},
49-
secondRound: []string{"u", "v", "w"},
49+
firstRound: []types.UID{"x", "y", "z"},
50+
secondRound: []types.UID{"u", "v", "w"},
5051
},
5152
{
5253
job: "baz",
53-
firstRound: []string{"w"},
54-
secondRound: []string{"a"},
54+
firstRound: []types.UID{"w"},
55+
secondRound: []types.UID{"a"},
5556
},
5657
}
5758
expectations := newUIDTrackingExpectations()

0 commit comments

Comments
 (0)