Skip to content

Commit 015d06b

Browse files
committed
Internal channels for scheduler
1 parent 60df45f commit 015d06b

File tree

10 files changed

+22
-15
lines changed

10 files changed

+22
-15
lines changed

pkg/scheduler/core/generic_scheduler.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ import (
2828
"time"
2929

3030
"k8s.io/klog"
31-
"k8s.io/kubernetes/pkg/scheduler/internal/parallelize"
3231

3332
v1 "k8s.io/api/core/v1"
3433
policy "k8s.io/api/policy/v1beta1"
@@ -40,6 +39,7 @@ import (
4039
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
4140
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
4241
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
42+
"k8s.io/kubernetes/pkg/scheduler/internal/parallelize"
4343
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
4444
"k8s.io/kubernetes/pkg/scheduler/listers"
4545
"k8s.io/kubernetes/pkg/scheduler/metrics"
@@ -438,7 +438,7 @@ func (g *genericScheduler) findNodesThatPassFilters(ctx context.Context, prof *p
438438
return filtered, nil
439439
}
440440

441-
errCh := util.NewErrorChannel()
441+
errCh := parallelize.NewErrorChannel()
442442
var statusesLock sync.Mutex
443443
var filteredLen int32
444444
ctx, cancel := context.WithCancel(ctx)

pkg/scheduler/framework/plugins/interpodaffinity/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ go_library(
1313
"//pkg/scheduler/framework/v1alpha1:go_default_library",
1414
"//pkg/scheduler/internal/parallelize:go_default_library",
1515
"//pkg/scheduler/listers:go_default_library",
16+
"//pkg/scheduler/nodeinfo:go_default_library",
1617
"//pkg/scheduler/types:go_default_library",
1718
"//pkg/scheduler/util:go_default_library",
1819
"//staging/src/k8s.io/api/core/v1:go_default_library",

pkg/scheduler/framework/plugins/interpodaffinity/filtering.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,14 @@ import (
2121
"fmt"
2222
"sync"
2323

24-
"k8s.io/api/core/v1"
24+
v1 "k8s.io/api/core/v1"
2525
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2626
"k8s.io/apimachinery/pkg/labels"
2727
"k8s.io/apimachinery/pkg/util/sets"
2828
"k8s.io/klog"
2929
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
3030
"k8s.io/kubernetes/pkg/scheduler/internal/parallelize"
31+
"k8s.io/kubernetes/pkg/scheduler/nodeinfo"
3132
schedulertypes "k8s.io/kubernetes/pkg/scheduler/types"
3233
schedutil "k8s.io/kubernetes/pkg/scheduler/util"
3334
)
@@ -209,8 +210,8 @@ func podMatchesAllAffinityTerms(pod *v1.Pod, terms []*affinityTerm) bool {
209210
// getTPMapMatchingExistingAntiAffinity calculates the following for each existing pod on each node:
210211
// (1) Whether it has PodAntiAffinity
211212
// (2) Whether any AffinityTerm matches the incoming pod
212-
func getTPMapMatchingExistingAntiAffinity(pod *v1.Pod, allNodes []*schedulertypes.NodeInfo) (topologyToMatchedTermCount, error) {
213-
errCh := schedutil.NewErrorChannel()
213+
func getTPMapMatchingExistingAntiAffinity(pod *v1.Pod, allNodes []*nodeinfo.NodeInfo) (topologyToMatchedTermCount, error) {
214+
errCh := parallelize.NewErrorChannel()
214215
var lock sync.Mutex
215216
topologyMap := make(topologyToMatchedTermCount)
216217

pkg/scheduler/framework/plugins/interpodaffinity/scoring.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,7 @@ func (pl *InterPodAffinity) PreScore(
231231
antiAffinityTerms: antiAffinityTerms,
232232
}
233233

234-
errCh := schedutil.NewErrorChannel()
234+
errCh := parallelize.NewErrorChannel()
235235
ctx, cancel := context.WithCancel(pCtx)
236236
processNode := func(i int) {
237237
nodeInfo := allNodes[i]

pkg/scheduler/framework/v1alpha1/BUILD

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ go_library(
1919
"//pkg/scheduler/listers:go_default_library",
2020
"//pkg/scheduler/metrics:go_default_library",
2121
"//pkg/scheduler/types:go_default_library",
22-
"//pkg/scheduler/util:go_default_library",
2322
"//staging/src/k8s.io/api/core/v1:go_default_library",
2423
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
2524
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",

pkg/scheduler/framework/v1alpha1/framework.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ import (
3535
schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers"
3636
"k8s.io/kubernetes/pkg/scheduler/metrics"
3737
schedulertypes "k8s.io/kubernetes/pkg/scheduler/types"
38-
schedutil "k8s.io/kubernetes/pkg/scheduler/util"
3938
)
4039

4140
const (
@@ -510,7 +509,7 @@ func (f *framework) RunScorePlugins(ctx context.Context, state *CycleState, pod
510509
pluginToNodeScores[pl.Name()] = make(NodeScoreList, len(nodes))
511510
}
512511
ctx, cancel := context.WithCancel(ctx)
513-
errCh := schedutil.NewErrorChannel()
512+
errCh := parallelize.NewErrorChannel()
514513

515514
// Run Score method for each node in parallel.
516515
parallelize.Until(ctx, len(nodes), func(index int) {

pkg/scheduler/internal/parallelize/BUILD

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
1-
load("@io_bazel_rules_go//go:def.bzl", "go_library")
1+
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
22

33
go_library(
44
name = "go_default_library",
5-
srcs = ["parallelism.go"],
5+
srcs = [
6+
"error_channel.go",
7+
"parallelism.go",
8+
],
69
importpath = "k8s.io/kubernetes/pkg/scheduler/internal/parallelize",
710
visibility = ["//pkg/scheduler:__subpackages__"],
811
deps = ["//staging/src/k8s.io/client-go/util/workqueue:go_default_library"],
@@ -21,3 +24,9 @@ filegroup(
2124
tags = ["automanaged"],
2225
visibility = ["//visibility:public"],
2326
)
27+
28+
go_test(
29+
name = "go_default_test",
30+
srcs = ["error_channel_test.go"],
31+
embed = [":go_default_library"],
32+
)

pkg/scheduler/util/error_channel.go renamed to pkg/scheduler/internal/parallelize/error_channel.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
1414
limitations under the License.
1515
*/
1616

17-
package util
17+
package parallelize
1818

1919
import "context"
2020

pkg/scheduler/util/error_channel_test.go renamed to pkg/scheduler/internal/parallelize/error_channel_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
1414
limitations under the License.
1515
*/
1616

17-
package util
17+
package parallelize
1818

1919
import (
2020
"context"

pkg/scheduler/util/BUILD

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ load(
99
go_test(
1010
name = "go_default_test",
1111
srcs = [
12-
"error_channel_test.go",
1312
"non_zero_test.go",
1413
"topologies_test.go",
1514
"utils_test.go",
@@ -31,7 +30,6 @@ go_library(
3130
name = "go_default_library",
3231
srcs = [
3332
"clock.go",
34-
"error_channel.go",
3533
"non_zero.go",
3634
"topologies.go",
3735
"utils.go",

0 commit comments

Comments
 (0)