Skip to content

Commit 2af11e9

Browse files
committed
internal/kubernetes/kubernetes.go: lazy K8S API calls
Change-Id: I50815baeea52d0ff11bf85a1861c2d38cc0f3fd1
1 parent 8a7c0b8 commit 2af11e9

File tree

1 file changed

+209
-22
lines changed

1 file changed

+209
-22
lines changed

internal/kubernetes/kubernetes.go

Lines changed: 209 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -18,27 +18,171 @@ package kubernetes
1818
import (
1919
"context"
2020
"fmt"
21+
"sync"
22+
"time"
2123

2224
"github.com/eiffel-community/etos-api/internal/config"
2325
"github.com/sirupsen/logrus"
2426
v1 "k8s.io/api/batch/v1"
27+
corev1 "k8s.io/api/core/v1"
2528
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2629
"k8s.io/client-go/kubernetes"
2730
"k8s.io/client-go/rest"
2831
)
2932

33+
// cacheEntry is a single cached value together with the moment it was stored.
// Readers compare timestamp against the cache TTL to decide whether data is
// still usable.
type cacheEntry struct {
	// data holds the cached API response; readers type-assert it back to the
	// concrete list type they stored.
	data interface{}
	// timestamp is when data was placed in the cache.
	timestamp time.Time
}
38+
39+
// kubernetesCache keeps recently fetched Kubernetes API list responses so that
// repeated lookups within cacheTTL do not hit the API server again.
type kubernetesCache struct {
	// jobs maps a cache key to a *cacheEntry holding a job list.
	jobs sync.Map
	// pods maps a cache key to a *cacheEntry holding a pod list.
	pods sync.Map
	// cacheTTL is how long an entry stays valid after being stored
	// (newKubernetesCache sets it to 5 seconds).
	cacheTTL time.Duration

	// jobsMutex and podsMutex serialize refreshes so that concurrent callers
	// missing the cache do not each issue their own API call.
	jobsMutex sync.Mutex
	podsMutex sync.Mutex
}
48+
49+
// newKubernetesCache creates a new cache with 5 second cache validity
50+
func newKubernetesCache() *kubernetesCache {
51+
return &kubernetesCache{
52+
cacheTTL: 5 * time.Second,
53+
}
54+
}
55+
56+
// getAllJobs retrieves all jobs from cache or API, making API calls if data is older than 5 seconds
57+
func (c *kubernetesCache) getAllJobs(namespace string, fetcher func() (*v1.JobList, error), logger *logrus.Entry) (*v1.JobList, error) {
58+
// Use namespace as cache key since we're caching all jobs in the namespace
59+
key := fmt.Sprintf("all_jobs_%s", namespace)
60+
61+
// Nested function to check cache and return data if fresh
62+
checkCache := func() (*v1.JobList, bool) {
63+
if cached, ok := c.jobs.Load(key); ok {
64+
if entry, ok := cached.(*cacheEntry); ok {
65+
if time.Since(entry.timestamp) < c.cacheTTL {
66+
if jobs, ok := entry.data.(*v1.JobList); ok {
67+
return jobs, true
68+
}
69+
}
70+
}
71+
}
72+
return nil, false
73+
}
74+
75+
// Check cache first (fast path - no locking)
76+
if jobs, found := checkCache(); found {
77+
logger.Infof("Returning cached jobs for namespace: %s (age: %v, count: %d)", namespace, time.Since(getTimestamp(&c.jobs, key)), len(jobs.Items))
78+
return jobs, nil
79+
}
80+
81+
// Use mutex to prevent concurrent API calls
82+
c.jobsMutex.Lock()
83+
defer c.jobsMutex.Unlock()
84+
85+
// Double-check cache after acquiring mutex (another goroutine might have updated it)
86+
if jobs, found := checkCache(); found {
87+
logger.Infof("Returning cached jobs for namespace: %s (age: %v, count: %d) [double-check]", namespace, time.Since(getTimestamp(&c.jobs, key)), len(jobs.Items))
88+
return jobs, nil
89+
}
90+
91+
// Fetch from API if no cache entry exists or data is older than 5 seconds
92+
logger.Infof("Making Kubernetes API call to fetch all jobs for namespace: %s", namespace)
93+
jobs, err := fetcher()
94+
if err != nil {
95+
logger.Errorf("Failed to fetch jobs from Kubernetes API for namespace %s: %v", namespace, err)
96+
return nil, err
97+
}
98+
99+
// Store in cache
100+
c.jobs.Store(key, &cacheEntry{
101+
data: jobs,
102+
timestamp: time.Now(),
103+
})
104+
105+
logger.Infof("Successfully fetched and cached %d jobs for namespace: %s", len(jobs.Items), namespace)
106+
return jobs, nil
107+
}
108+
109+
// getAllPods retrieves all pods from cache or API, making API calls if data is older than 5 seconds
110+
func (c *kubernetesCache) getAllPods(namespace string, fetcher func() (*corev1.PodList, error), logger *logrus.Entry) (*corev1.PodList, error) {
111+
// Use namespace as cache key since we're caching all pods in the namespace
112+
key := fmt.Sprintf("all_pods_%s", namespace)
113+
114+
// Nested function to check cache and return data if fresh
115+
checkCache := func() (*corev1.PodList, bool) {
116+
if cached, ok := c.pods.Load(key); ok {
117+
if entry, ok := cached.(*cacheEntry); ok {
118+
if time.Since(entry.timestamp) < c.cacheTTL {
119+
if pods, ok := entry.data.(*corev1.PodList); ok {
120+
return pods, true
121+
}
122+
}
123+
}
124+
}
125+
return nil, false
126+
}
127+
128+
// Check cache first (fast path - no locking)
129+
if pods, found := checkCache(); found {
130+
logger.Infof("Returning cached pods for namespace: %s (age: %v, count: %d)", namespace, time.Since(getTimestamp(&c.pods, key)), len(pods.Items))
131+
return pods, nil
132+
}
133+
134+
// Use mutex to prevent concurrent API calls
135+
c.podsMutex.Lock()
136+
defer c.podsMutex.Unlock()
137+
138+
// Double-check cache after acquiring mutex (another goroutine might have updated it)
139+
if pods, found := checkCache(); found {
140+
logger.Infof("Returning cached pods for namespace: %s (age: %v, count: %d) [double-check]", namespace, time.Since(getTimestamp(&c.pods, key)), len(pods.Items))
141+
return pods, nil
142+
}
143+
144+
// Fetch from API if no cache entry exists or data is older than 5 seconds
145+
logger.Infof("Making Kubernetes API call to fetch all pods for namespace: %s", namespace)
146+
pods, err := fetcher()
147+
if err != nil {
148+
logger.Errorf("Failed to fetch pods from Kubernetes API for namespace %s: %v", namespace, err)
149+
return nil, err
150+
}
151+
152+
// Store in cache
153+
c.pods.Store(key, &cacheEntry{
154+
data: pods,
155+
timestamp: time.Now(),
156+
})
157+
158+
logger.Infof("Successfully fetched and cached %d pods for namespace: %s", len(pods.Items), namespace)
159+
return pods, nil
160+
}
161+
162+
// getTimestamp is a helper function to get the timestamp of a cache entry
163+
func getTimestamp(cache *sync.Map, key string) time.Time {
164+
if cached, ok := cache.Load(key); ok {
165+
if entry, ok := cached.(*cacheEntry); ok {
166+
return entry.timestamp
167+
}
168+
}
169+
return time.Time{}
170+
}
171+
30172
type Kubernetes struct {
31173
logger *logrus.Entry
32174
config *rest.Config
33175
client *kubernetes.Clientset
34176
namespace string
177+
cache *kubernetesCache
35178
}
36179

37180
// New creates a new Kubernetes struct.
38181
func New(cfg config.Config, log *logrus.Entry) *Kubernetes {
39182
return &Kubernetes{
40183
logger: log,
41184
namespace: cfg.ETOSNamespace(),
185+
cache: newKubernetesCache(),
42186
}
43187
}
44188

@@ -59,35 +203,66 @@ func (k *Kubernetes) clientset() (*kubernetes.Clientset, error) {
59203
}
60204
k.config = cfg
61205
}
206+
207+
// Log rate limiter settings before creating client
208+
if k.config.RateLimiter != nil {
209+
k.logger.Info("Kubernetes client has custom rate limiter configured")
210+
}
211+
212+
// Log QPS and Burst settings
213+
if k.config.QPS > 0 || k.config.Burst > 0 {
214+
k.logger.Infof("Kubernetes client rate limiter settings - QPS: %.2f, Burst: %d",
215+
k.config.QPS, k.config.Burst)
216+
} else {
217+
k.logger.Info("Kubernetes client using default rate limiter settings")
218+
}
219+
62220
cli, err := kubernetes.NewForConfig(k.config)
63221
if err != nil {
222+
k.logger.Errorf("Failed to create Kubernetes client: %v", err)
64223
return nil, err
65224
}
66225
k.client = cli
226+
k.logger.Info("Kubernetes client created successfully")
67227
return k.client, nil
68228
}
69229

70230
// getJobsByIdentifier returns a list of jobs bound to the given testrun identifier.
71231
func (k *Kubernetes) getJobsByIdentifier(ctx context.Context, client *kubernetes.Clientset, identifier string) (*v1.JobList, error) {
232+
// Get all jobs from cache or API
233+
allJobs, err := k.cache.getAllJobs(k.namespace, func() (*v1.JobList, error) {
234+
// Fetch all jobs in the namespace without label selector
235+
return client.BatchV1().Jobs(k.namespace).List(ctx, metav1.ListOptions{})
236+
}, k.logger)
237+
if err != nil {
238+
return nil, err
239+
}
240+
241+
// Filter jobs by identifier in-memory
242+
filteredJobs := &v1.JobList{
243+
TypeMeta: allJobs.TypeMeta,
244+
ListMeta: allJobs.ListMeta,
245+
Items: []v1.Job{},
246+
}
247+
72248
// Try different labels for backward compatibility:
73249
// - etos.eiffel-community.github.io/id is v1alpha+
74250
// - id is v0 legacy
75-
for _, label := range []string{"etos.eiffel-community.github.io/id", "id"} {
76-
jobs, err := client.BatchV1().Jobs(k.namespace).List(
77-
ctx,
78-
metav1.ListOptions{
79-
LabelSelector: fmt.Sprintf("%s=%s", label, identifier),
80-
},
81-
)
82-
if err != nil {
83-
k.logger.Error(err)
84-
return nil, err
85-
}
86-
if len(jobs.Items) > 0 {
87-
return jobs, nil
251+
labelKeys := []string{"etos.eiffel-community.github.io/id", "id"}
252+
253+
for _, job := range allJobs.Items {
254+
for _, labelKey := range labelKeys {
255+
if labelValue, exists := job.Labels[labelKey]; exists && labelValue == identifier {
256+
filteredJobs.Items = append(filteredJobs.Items, job)
257+
break // Found match, no need to check other labels for this job
258+
}
88259
}
89260
}
90-
return &v1.JobList{}, nil
261+
262+
k.logger.Infof("Filtered %d jobs with identifier '%s' from %d total jobs",
263+
len(filteredJobs.Items), identifier, len(allJobs.Items))
264+
265+
return filteredJobs, nil
91266
}
92267

93268
// IsFinished checks if an ESR job is finished.
@@ -130,18 +305,30 @@ func (k *Kubernetes) LogListenerIP(ctx context.Context, identifier string) (stri
130305
}
131306
job := jobs.Items[0]
132307

133-
pods, err := client.CoreV1().Pods(k.namespace).List(
134-
ctx,
135-
metav1.ListOptions{
136-
LabelSelector: fmt.Sprintf("job-name=%s", job.Name),
137-
},
138-
)
308+
// Get all pods from cache or API
309+
allPods, err := k.cache.getAllPods(k.namespace, func() (*corev1.PodList, error) {
310+
// Fetch all pods in the namespace without label selector
311+
return client.CoreV1().Pods(k.namespace).List(ctx, metav1.ListOptions{})
312+
}, k.logger)
139313
if err != nil {
140314
return "", err
141315
}
142-
if len(pods.Items) == 0 {
316+
317+
// Filter pods by job name in-memory
318+
var matchingPods []corev1.Pod
319+
for _, pod := range allPods.Items {
320+
if jobName, exists := pod.Labels["job-name"]; exists && jobName == job.Name {
321+
matchingPods = append(matchingPods, pod)
322+
}
323+
}
324+
325+
if len(matchingPods) == 0 {
143326
return "", fmt.Errorf("could not find pod for job with id %s", identifier)
144327
}
145-
pod := pods.Items[0]
328+
329+
k.logger.Infof("Found %d pods for job '%s' with identifier '%s'",
330+
len(matchingPods), job.Name, identifier)
331+
332+
pod := matchingPods[0]
146333
return pod.Status.PodIP, nil
147334
}

0 commit comments

Comments
 (0)