@@ -51,15 +51,28 @@ func chunkSizeFor(n, parallelism int) int {
51
51
return s
52
52
}
53
53
54
+ // numWorkersForChunkSize returns number of workers (goroutines)
55
+ // that will be created in workqueue.ParallelizeUntil
56
+ // for given parallelism, pieces and chunkSize values.
57
+ func numWorkersForChunkSize (parallelism , pieces , chunkSize int ) int {
58
+ chunks := (pieces + chunkSize - 1 ) / chunkSize
59
+ if chunks < parallelism {
60
+ return chunks
61
+ }
62
+ return parallelism
63
+ }
64
+
54
65
// Until is a wrapper around workqueue.ParallelizeUntil to use in scheduling algorithms.
55
66
// A given operation will be a label that is recorded in the goroutine metric.
56
67
func (p Parallelizer ) Until (ctx context.Context , pieces int , doWorkPiece workqueue.DoWorkPieceFunc , operation string ) {
68
+ chunkSize := chunkSizeFor (pieces , p .parallelism )
69
+ workers := numWorkersForChunkSize (p .parallelism , pieces , chunkSize )
70
+
57
71
goroutinesMetric := metrics .Goroutines .WithLabelValues (operation )
58
- withMetrics := func (piece int ) {
59
- goroutinesMetric .Inc ()
60
- doWorkPiece (piece )
61
- goroutinesMetric .Dec ()
62
- }
72
+ // Calling single Add with workers' count is more efficient than calling Inc or Dec per each work piece.
73
+ // This approach improves performance of some plugins (affinity, topology spreading) as well as preemption.
74
+ goroutinesMetric .Add (float64 (workers ))
75
+ defer goroutinesMetric .Add (float64 (- workers ))
63
76
64
- workqueue .ParallelizeUntil (ctx , p .parallelism , pieces , withMetrics , workqueue .WithChunkSize (chunkSizeFor ( pieces , p . parallelism ) ))
77
+ workqueue .ParallelizeUntil (ctx , p .parallelism , pieces , doWorkPiece , workqueue .WithChunkSize (chunkSize ))
65
78
}
0 commit comments