Skip to content

Commit 9522fb0

Browse files
authored
Merge pull request kubernetes#83233 from hprateek43/heapPackageMovement
Move heap into its own internal package
2 parents a8e8e54 + f462a31 commit 9522fb0

File tree

8 files changed

+79
-40
lines changed

8 files changed

+79
-40
lines changed

pkg/scheduler/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ filegroup(
102102
"//pkg/scheduler/factory:all-srcs",
103103
"//pkg/scheduler/framework:all-srcs",
104104
"//pkg/scheduler/internal/cache:all-srcs",
105+
"//pkg/scheduler/internal/heap:all-srcs",
105106
"//pkg/scheduler/internal/queue:all-srcs",
106107
"//pkg/scheduler/metrics:all-srcs",
107108
"//pkg/scheduler/nodeinfo:all-srcs",

pkg/scheduler/internal/heap/BUILD

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package(default_visibility = ["//visibility:public"])
2+
3+
load(
4+
"@io_bazel_rules_go//go:def.bzl",
5+
"go_library",
6+
"go_test",
7+
)
8+
9+
go_test(
10+
name = "go_default_test",
11+
srcs = ["heap_test.go"],
12+
embed = [":go_default_library"],
13+
)
14+
15+
go_library(
16+
name = "go_default_library",
17+
srcs = ["heap.go"],
18+
importpath = "k8s.io/kubernetes/pkg/scheduler/internal/heap",
19+
deps = [
20+
"//pkg/scheduler/metrics:go_default_library",
21+
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
22+
],
23+
)
24+
25+
filegroup(
26+
name = "package-srcs",
27+
srcs = glob(["**"]),
28+
tags = ["automanaged"],
29+
visibility = ["//visibility:private"],
30+
)
31+
32+
filegroup(
33+
name = "all-srcs",
34+
srcs = [":package-srcs"],
35+
tags = ["automanaged"],
36+
)

pkg/scheduler/util/heap.go renamed to pkg/scheduler/internal/heap/heap.go

Lines changed: 22 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ limitations under the License.
1818
// as cache.heap, however, this heap does not perform synchronization. It leaves
1919
// synchronization to the SchedulingQueue.
2020

21-
package util
21+
package heap
2222

2323
import (
2424
"container/heap"
@@ -41,9 +41,9 @@ type itemKeyValue struct {
4141
obj interface{}
4242
}
4343

44-
// heapData is an internal struct that implements the standard heap interface
44+
// data is an internal struct that implements the standard heap interface
4545
// and keeps the data stored in the heap.
46-
type heapData struct {
46+
type data struct {
4747
// items is a map from key of the objects to the objects and their index.
4848
// We depend on the property that items in the map are in the queue and vice versa.
4949
items map[string]*heapItem
@@ -56,16 +56,16 @@ type heapData struct {
5656
// should be deterministic.
5757
keyFunc KeyFunc
5858
// lessFunc is used to compare two objects in the heap.
59-
lessFunc LessFunc
59+
lessFunc lessFunc
6060
}
6161

6262
var (
63-
_ = heap.Interface(&heapData{}) // heapData is a standard heap
63+
_ = heap.Interface(&data{}) // heapData is a standard heap
6464
)
6565

6666
// Less compares two objects and returns true if the first one should go
6767
// in front of the second one in the heap.
68-
func (h *heapData) Less(i, j int) bool {
68+
func (h *data) Less(i, j int) bool {
6969
if i > len(h.queue) || j > len(h.queue) {
7070
return false
7171
}
@@ -81,11 +81,11 @@ func (h *heapData) Less(i, j int) bool {
8181
}
8282

8383
// Len returns the number of items in the Heap.
84-
func (h *heapData) Len() int { return len(h.queue) }
84+
func (h *data) Len() int { return len(h.queue) }
8585

8686
// Swap implements swapping of two elements in the heap. This is a part of standard
8787
// heap interface and should never be called directly.
88-
func (h *heapData) Swap(i, j int) {
88+
func (h *data) Swap(i, j int) {
8989
h.queue[i], h.queue[j] = h.queue[j], h.queue[i]
9090
item := h.items[h.queue[i]]
9191
item.index = i
@@ -94,15 +94,15 @@ func (h *heapData) Swap(i, j int) {
9494
}
9595

9696
// Push is supposed to be called by heap.Push only.
97-
func (h *heapData) Push(kv interface{}) {
97+
func (h *data) Push(kv interface{}) {
9898
keyValue := kv.(*itemKeyValue)
9999
n := len(h.queue)
100100
h.items[keyValue.key] = &heapItem{keyValue.obj, n}
101101
h.queue = append(h.queue, keyValue.key)
102102
}
103103

104104
// Pop is supposed to be called by heap.Pop only.
105-
func (h *heapData) Pop() interface{} {
105+
func (h *data) Pop() interface{} {
106106
key := h.queue[len(h.queue)-1]
107107
h.queue = h.queue[0 : len(h.queue)-1]
108108
item, ok := h.items[key]
@@ -115,7 +115,7 @@ func (h *heapData) Pop() interface{} {
115115
}
116116

117117
// Peek is supposed to be called by heap.Peek only.
118-
func (h *heapData) Peek() interface{} {
118+
func (h *data) Peek() interface{} {
119119
if len(h.queue) > 0 {
120120
return h.items[h.queue[0]].obj
121121
}
@@ -127,7 +127,7 @@ func (h *heapData) Peek() interface{} {
127127
type Heap struct {
128128
// data stores objects and has a queue that keeps their ordering according
129129
// to the heap invariant.
130-
data *heapData
130+
data *data
131131
// metricRecorder updates the counter when elements of a heap get added or
132132
// removed, and it does nothing if it's nil
133133
metricRecorder metrics.MetricRecorder
@@ -239,15 +239,15 @@ func (h *Heap) Len() int {
239239
return len(h.data.queue)
240240
}
241241

242-
// NewHeap returns a Heap which can be used to queue up items to process.
243-
func NewHeap(keyFn KeyFunc, lessFn LessFunc) *Heap {
244-
return NewHeapWithRecorder(keyFn, lessFn, nil)
242+
// New returns a Heap which can be used to queue up items to process.
243+
func New(keyFn KeyFunc, lessFn lessFunc) *Heap {
244+
return NewWithRecorder(keyFn, lessFn, nil)
245245
}
246246

247-
// NewHeapWithRecorder wraps an optional metricRecorder to compose a Heap object.
248-
func NewHeapWithRecorder(keyFn KeyFunc, lessFn LessFunc, metricRecorder metrics.MetricRecorder) *Heap {
247+
// NewWithRecorder wraps an optional metricRecorder to compose a Heap object.
248+
func NewWithRecorder(keyFn KeyFunc, lessFn lessFunc, metricRecorder metrics.MetricRecorder) *Heap {
249249
return &Heap{
250-
data: &heapData{
250+
data: &data{
251251
items: map[string]*heapItem{},
252252
queue: []string{},
253253
keyFunc: keyFn,
@@ -256,3 +256,7 @@ func NewHeapWithRecorder(keyFn KeyFunc, lessFn LessFunc, metricRecorder metrics.
256256
metricRecorder: metricRecorder,
257257
}
258258
}
259+
260+
// lessFunc is a function that receives two items and returns true if the first
261+
// item should be placed before the second one when the list is sorted.
262+
type lessFunc = func(item1, item2 interface{}) bool

pkg/scheduler/util/heap_test.go renamed to pkg/scheduler/internal/heap/heap_test.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ limitations under the License.
1717
// This file was copied from client-go/tools/cache/heap.go and modified
1818
// for our non thread-safe heap
1919

20-
package util
20+
package heap
2121

2222
import (
2323
"testing"
@@ -44,7 +44,7 @@ func compareInts(val1 interface{}, val2 interface{}) bool {
4444

4545
// TestHeapBasic tests Heap invariant
4646
func TestHeapBasic(t *testing.T) {
47-
h := NewHeap(testHeapObjectKeyFunc, compareInts)
47+
h := New(testHeapObjectKeyFunc, compareInts)
4848
const amount = 500
4949
var i int
5050

@@ -67,7 +67,7 @@ func TestHeapBasic(t *testing.T) {
6767

6868
// Tests Heap.Add and ensures that heap invariant is preserved after adding items.
6969
func TestHeap_Add(t *testing.T) {
70-
h := NewHeap(testHeapObjectKeyFunc, compareInts)
70+
h := New(testHeapObjectKeyFunc, compareInts)
7171
h.Add(mkHeapObj("foo", 10))
7272
h.Add(mkHeapObj("bar", 1))
7373
h.Add(mkHeapObj("baz", 11))
@@ -97,7 +97,7 @@ func TestHeap_Add(t *testing.T) {
9797
// TestHeap_AddIfNotPresent tests Heap.AddIfNotPresent and ensures that heap
9898
// invariant is preserved after adding items.
9999
func TestHeap_AddIfNotPresent(t *testing.T) {
100-
h := NewHeap(testHeapObjectKeyFunc, compareInts)
100+
h := New(testHeapObjectKeyFunc, compareInts)
101101
h.AddIfNotPresent(mkHeapObj("foo", 10))
102102
h.AddIfNotPresent(mkHeapObj("bar", 1))
103103
h.AddIfNotPresent(mkHeapObj("baz", 11))
@@ -133,7 +133,7 @@ func TestHeap_AddIfNotPresent(t *testing.T) {
133133
// TestHeap_Delete tests Heap.Delete and ensures that heap invariant is
134134
// preserved after deleting items.
135135
func TestHeap_Delete(t *testing.T) {
136-
h := NewHeap(testHeapObjectKeyFunc, compareInts)
136+
h := New(testHeapObjectKeyFunc, compareInts)
137137
h.Add(mkHeapObj("foo", 10))
138138
h.Add(mkHeapObj("bar", 1))
139139
h.Add(mkHeapObj("bal", 31))
@@ -178,7 +178,7 @@ func TestHeap_Delete(t *testing.T) {
178178
// TestHeap_Update tests Heap.Update and ensures that heap invariant is
179179
// preserved after adding items.
180180
func TestHeap_Update(t *testing.T) {
181-
h := NewHeap(testHeapObjectKeyFunc, compareInts)
181+
h := New(testHeapObjectKeyFunc, compareInts)
182182
h.Add(mkHeapObj("foo", 10))
183183
h.Add(mkHeapObj("bar", 1))
184184
h.Add(mkHeapObj("bal", 31))
@@ -202,7 +202,7 @@ func TestHeap_Update(t *testing.T) {
202202

203203
// TestHeap_Get tests Heap.Get.
204204
func TestHeap_Get(t *testing.T) {
205-
h := NewHeap(testHeapObjectKeyFunc, compareInts)
205+
h := New(testHeapObjectKeyFunc, compareInts)
206206
h.Add(mkHeapObj("foo", 10))
207207
h.Add(mkHeapObj("bar", 1))
208208
h.Add(mkHeapObj("bal", 31))
@@ -222,7 +222,7 @@ func TestHeap_Get(t *testing.T) {
222222

223223
// TestHeap_GetByKey tests Heap.GetByKey and is very similar to TestHeap_Get.
224224
func TestHeap_GetByKey(t *testing.T) {
225-
h := NewHeap(testHeapObjectKeyFunc, compareInts)
225+
h := New(testHeapObjectKeyFunc, compareInts)
226226
h.Add(mkHeapObj("foo", 10))
227227
h.Add(mkHeapObj("bar", 1))
228228
h.Add(mkHeapObj("bal", 31))
@@ -241,7 +241,7 @@ func TestHeap_GetByKey(t *testing.T) {
241241

242242
// TestHeap_List tests Heap.List function.
243243
func TestHeap_List(t *testing.T) {
244-
h := NewHeap(testHeapObjectKeyFunc, compareInts)
244+
h := New(testHeapObjectKeyFunc, compareInts)
245245
list := h.List()
246246
if len(list) != 0 {
247247
t.Errorf("expected an empty list")

pkg/scheduler/internal/queue/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ go_library(
1313
"//pkg/scheduler/algorithm/predicates:go_default_library",
1414
"//pkg/scheduler/algorithm/priorities/util:go_default_library",
1515
"//pkg/scheduler/framework/v1alpha1:go_default_library",
16+
"//pkg/scheduler/internal/heap:go_default_library",
1617
"//pkg/scheduler/metrics:go_default_library",
1718
"//pkg/scheduler/util:go_default_library",
1819
"//staging/src/k8s.io/api/core/v1:go_default_library",

pkg/scheduler/internal/queue/scheduling_queue.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ import (
4040
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
4141
priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util"
4242
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
43+
"k8s.io/kubernetes/pkg/scheduler/internal/heap"
4344
"k8s.io/kubernetes/pkg/scheduler/metrics"
4445
"k8s.io/kubernetes/pkg/scheduler/util"
4546
)
@@ -116,10 +117,10 @@ type PriorityQueue struct {
116117

117118
// activeQ is heap structure that scheduler actively looks at to find pods to
118119
// schedule. Head of heap is the highest priority pod.
119-
activeQ *util.Heap
120+
activeQ *heap.Heap
120121
// podBackoffQ is a heap ordered by backoff expiry. Pods which have completed backoff
121122
// are popped from this heap before the scheduler looks at activeQ
122-
podBackoffQ *util.Heap
123+
podBackoffQ *heap.Heap
123124
// unschedulableQ holds pods that have been tried and determined unschedulable.
124125
unschedulableQ *UnschedulablePodsMap
125126
// nominatedPods is a structures that stores pods which are nominated to run
@@ -183,13 +184,13 @@ func NewPriorityQueueWithClock(stop <-chan struct{}, clock util.Clock, fwk frame
183184
clock: clock,
184185
stop: stop,
185186
podBackoff: NewPodBackoffMap(1*time.Second, 10*time.Second),
186-
activeQ: util.NewHeapWithRecorder(podInfoKeyFunc, comp, metrics.NewActivePodsRecorder()),
187+
activeQ: heap.NewWithRecorder(podInfoKeyFunc, comp, metrics.NewActivePodsRecorder()),
187188
unschedulableQ: newUnschedulablePodsMap(metrics.NewUnschedulablePodsRecorder()),
188189
nominatedPods: newNominatedPodMap(),
189190
moveRequestCycle: -1,
190191
}
191192
pq.cond.L = &pq.lock
192-
pq.podBackoffQ = util.NewHeapWithRecorder(podInfoKeyFunc, pq.podsCompareBackoffCompleted, metrics.NewBackoffPodsRecorder())
193+
pq.podBackoffQ = heap.NewWithRecorder(podInfoKeyFunc, pq.podsCompareBackoffCompleted, metrics.NewBackoffPodsRecorder())
193194

194195
pq.run()
195196

pkg/scheduler/util/BUILD

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ go_test(
1010
name = "go_default_test",
1111
srcs = [
1212
"error_channel_test.go",
13-
"heap_test.go",
1413
"utils_test.go",
1514
],
1615
embed = [":go_default_library"],
@@ -28,17 +27,14 @@ go_library(
2827
srcs = [
2928
"clock.go",
3029
"error_channel.go",
31-
"heap.go",
3230
"utils.go",
3331
],
3432
importpath = "k8s.io/kubernetes/pkg/scheduler/util",
3533
deps = [
3634
"//pkg/api/v1/pod:go_default_library",
3735
"//pkg/scheduler/apis/extender/v1:go_default_library",
38-
"//pkg/scheduler/metrics:go_default_library",
3936
"//staging/src/k8s.io/api/core/v1:go_default_library",
4037
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
41-
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
4238
"//vendor/k8s.io/klog:go_default_library",
4339
],
4440
)

pkg/scheduler/util/utils.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,10 @@ func GetPodStartTime(pod *v1.Pod) *metav1.Time {
6262
return &metav1.Time{Time: time.Now()}
6363
}
6464

65+
// lessFunc is a function that receives two items and returns true if the first
66+
// item should be placed before the second one when the list is sorted.
67+
type lessFunc = func(item1, item2 interface{}) bool
68+
6569
// GetEarliestPodStartTime returns the earliest start time of all pods that
6670
// have the highest priority among all victims.
6771
func GetEarliestPodStartTime(victims *extenderv1.Victims) *metav1.Time {
@@ -91,13 +95,9 @@ func GetEarliestPodStartTime(victims *extenderv1.Victims) *metav1.Time {
9195
// SortableList is a list that implements sort.Interface.
9296
type SortableList struct {
9397
Items []interface{}
94-
CompFunc LessFunc
98+
CompFunc lessFunc
9599
}
96100

97-
// LessFunc is a function that receives two items and returns true if the first
98-
// item should be placed before the second one when the list is sorted.
99-
type LessFunc func(item1, item2 interface{}) bool
100-
101101
var _ = sort.Interface(&SortableList{})
102102

103103
func (l *SortableList) Len() int { return len(l.Items) }

0 commit comments

Comments
 (0)