Skip to content

Commit 9eb27b0

Browse files
bound concurrent namespace scanning to less than k8s client QPS limit (#629)
1 parent fda38e9 commit 9eb27b0

File tree

2 files changed

+25
-2
lines changed

2 files changed

+25
-2
lines changed

internal/kube/kube.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,11 @@ import (
1515
"k8s.io/client-go/tools/clientcmd"
1616
)
1717

18+
const (
19+
clientQPS = 50
20+
clientBurst = 100
21+
)
22+
1823
// K8sEnvRequest represents the PUT request body to be sent to kosli from k8s
1924
type K8sEnvRequest struct {
2025
Artifacts []*PodData `json:"artifacts"`
@@ -86,8 +91,8 @@ func NewK8sClientSet(kubeconfigPath string) (*K8SConnection, error) {
8691

8792
// set the QPS and burst for the config to control the rate of requests to the API server
8893
// defaults are 5 QPS and 10 burst which is too low for large clusters
89-
config.QPS = 50
90-
config.Burst = 100
94+
config.QPS = clientQPS
95+
config.Burst = clientBurst
9196

9297
clientset, err := kubernetes.NewForConfig(config)
9398
if err != nil {
@@ -127,10 +132,15 @@ func (clientset *K8SConnection) GetPodsData(filter *filters.ResourceFilterOption
127132
ctx, cancel := context.WithCancel(context.Background())
128133
defer cancel() // Make sure it's called to release resources even if no errors
129134

135+
// semaphore to limit the number of concurrent requests to the API server
136+
sem := make(chan struct{}, (clientBurst/2)-1) // max concurrent requests: slightly lower than client's QPS to avoid throttling
137+
130138
for _, ns := range filteredNamespaces {
131139
wg.Add(1)
132140
go func(ns string) {
133141
defer wg.Done()
142+
sem <- struct{}{} // acquire the semaphore
143+
defer func() { <-sem }() // release the semaphore
134144
// Check if any error occurred in any other gorouties:
135145
select {
136146
case <-ctx.Done():

internal/kube/kube_test.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,19 @@ func (suite *KubeTestSuite) TestGetPodsData() {
192192
}
193193
}
194194

195+
func (suite *KubeTestSuite) TestGetPodsDataWithThrottling() {
196+
// create a large number of pods
197+
for i := 0; i < 200; i++ {
198+
suite.createNamespace(fmt.Sprintf("ns-%d", i))
199+
}
200+
// Get pods data with timeout check
201+
startTime := time.Now()
202+
_, err := suite.clientset.GetPodsData(&filters.ResourceFilterOptions{IncludeNamesRegex: []string{"^ns-.*"}}, logger.NewStandardLogger())
203+
duration := time.Since(startTime)
204+
require.NoErrorf(suite.Suite.T(), err, "error getting pods data for test GetPodsDataWithThrottling")
205+
require.LessOrEqual(suite.Suite.T(), duration, 5*time.Second, "GetPodsData should complete within 5 seconds, but took %v", duration)
206+
}
207+
195208
func (suite *KubeTestSuite) TestFilterNamespaces() {
196209
type args struct {
197210
nsList []corev1.Namespace

0 commit comments

Comments
 (0)