Skip to content

Commit ad52fa0

Browse files
authored
internal/kubernetes/kubernetes.go: lazy K8S API calls (eiffel-community#117)
1 parent 8a7c0b8 commit ad52fa0

File tree

1 file changed

+202
-22
lines changed

1 file changed

+202
-22
lines changed

internal/kubernetes/kubernetes.go

Lines changed: 202 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -18,27 +18,171 @@ package kubernetes
1818
import (
1919
"context"
2020
"fmt"
21+
"sync"
22+
"time"
2123

2224
"github.com/eiffel-community/etos-api/internal/config"
2325
"github.com/sirupsen/logrus"
2426
v1 "k8s.io/api/batch/v1"
27+
corev1 "k8s.io/api/core/v1"
2528
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2629
"k8s.io/client-go/kubernetes"
2730
"k8s.io/client-go/rest"
2831
)
2932

33+
// cacheEntry is a single cached Kubernetes API response together with the
// time it was stored, used to decide whether the entry is still within TTL.
type cacheEntry struct {
	data      interface{} // cached value (*v1.JobList or *corev1.PodList)
	timestamp time.Time   // when data was fetched from the API
}
38+
39+
// kubernetesCache caches Kubernetes API list responses per namespace with a
// time-to-live so that frequent polling does not hammer the API server.
type kubernetesCache struct {
	jobs     sync.Map      // map[string]*cacheEntry holding a *v1.JobList per namespace key
	pods     sync.Map      // map[string]*cacheEntry holding a *corev1.PodList per namespace key
	cacheTTL time.Duration // how long a cached entry is considered fresh
	// Mutexes serialize refreshes so that at most one goroutine at a time
	// makes an API call for the same resource type; readers of fresh
	// entries never block on these.
	jobsMutex sync.Mutex
	podsMutex sync.Mutex
}
48+
49+
// newKubernetesCache creates a new cache with configured cache validity
50+
func newKubernetesCache() *kubernetesCache {
51+
return &kubernetesCache{
52+
cacheTTL: 5 * time.Second,
53+
}
54+
}
55+
56+
// getAllJobs retrieves all jobs from cache or API, making API calls if cached data is stale
57+
func (c *kubernetesCache) getAllJobs(ctx context.Context, client *kubernetes.Clientset, namespace string, logger *logrus.Entry) (*v1.JobList, error) {
58+
// Use namespace as cache key since we're caching all jobs in the namespace
59+
key := fmt.Sprintf("all_jobs_%s", namespace)
60+
61+
// Nested function to check cache and return data if fresh
62+
checkCache := func() (*v1.JobList, bool) {
63+
if cached, ok := c.jobs.Load(key); ok {
64+
if entry, ok := cached.(*cacheEntry); ok {
65+
if time.Since(entry.timestamp) < c.cacheTTL {
66+
if jobs, ok := entry.data.(*v1.JobList); ok {
67+
return jobs, true
68+
}
69+
}
70+
}
71+
}
72+
return nil, false
73+
}
74+
75+
// Check cache first (fast path - no locking)
76+
if jobs, found := checkCache(); found {
77+
logger.Debugf("Returning cached jobs for namespace: %s (age: %v, count: %d)", namespace, time.Since(getTimestamp(&c.jobs, key)), len(jobs.Items))
78+
return jobs, nil
79+
}
80+
81+
// Use mutex to prevent concurrent API calls
82+
c.jobsMutex.Lock()
83+
defer c.jobsMutex.Unlock()
84+
85+
// Double-check cache after acquiring mutex (another goroutine might have updated it)
86+
if jobs, found := checkCache(); found {
87+
logger.Debugf("Returning cached jobs for namespace: %s (age: %v, count: %d) [double-check]", namespace, time.Since(getTimestamp(&c.jobs, key)), len(jobs.Items))
88+
return jobs, nil
89+
}
90+
91+
// Fetch from API if no cache entry exists or cached data is stale
92+
logger.Debugf("Making Kubernetes API call to fetch all jobs for namespace: %s", namespace)
93+
jobs, err := client.BatchV1().Jobs(namespace).List(ctx, metav1.ListOptions{})
94+
if err != nil {
95+
logger.Errorf("Failed to fetch jobs from Kubernetes API for namespace %s: %v", namespace, err)
96+
return nil, err
97+
}
98+
99+
// Store in cache
100+
c.jobs.Store(key, &cacheEntry{
101+
data: jobs,
102+
timestamp: time.Now(),
103+
})
104+
105+
logger.Debugf("Successfully fetched and cached %d jobs for namespace: %s", len(jobs.Items), namespace)
106+
return jobs, nil
107+
}
108+
109+
// getAllPods retrieves all pods from cache or API, making API calls if cached data is stale
110+
func (c *kubernetesCache) getAllPods(ctx context.Context, client *kubernetes.Clientset, namespace string, logger *logrus.Entry) (*corev1.PodList, error) {
111+
// Use namespace as cache key since we're caching all pods in the namespace
112+
key := fmt.Sprintf("all_pods_%s", namespace)
113+
114+
// Nested function to check cache and return data if fresh
115+
checkCache := func() (*corev1.PodList, bool) {
116+
if cached, ok := c.pods.Load(key); ok {
117+
if entry, ok := cached.(*cacheEntry); ok {
118+
if time.Since(entry.timestamp) < c.cacheTTL {
119+
if pods, ok := entry.data.(*corev1.PodList); ok {
120+
return pods, true
121+
}
122+
}
123+
}
124+
}
125+
return nil, false
126+
}
127+
128+
// Check cache first (fast path - no locking)
129+
if pods, found := checkCache(); found {
130+
logger.Debugf("Returning cached pods for namespace: %s (age: %v, count: %d)", namespace, time.Since(getTimestamp(&c.pods, key)), len(pods.Items))
131+
return pods, nil
132+
}
133+
134+
// Use mutex to prevent concurrent API calls
135+
c.podsMutex.Lock()
136+
defer c.podsMutex.Unlock()
137+
138+
// Double-check cache after acquiring mutex (another goroutine might have updated it)
139+
if pods, found := checkCache(); found {
140+
logger.Debugf("Returning cached pods for namespace: %s (age: %v, count: %d) [double-check]", namespace, time.Since(getTimestamp(&c.pods, key)), len(pods.Items))
141+
return pods, nil
142+
}
143+
144+
// Fetch from API if no cache entry exists or cached data is stale
145+
logger.Debugf("Making Kubernetes API call to fetch all pods for namespace: %s", namespace)
146+
pods, err := client.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{})
147+
if err != nil {
148+
logger.Errorf("Failed to fetch pods from Kubernetes API for namespace %s: %v", namespace, err)
149+
return nil, err
150+
}
151+
152+
// Store in cache
153+
c.pods.Store(key, &cacheEntry{
154+
data: pods,
155+
timestamp: time.Now(),
156+
})
157+
158+
logger.Debugf("Successfully fetched and cached %d pods for namespace: %s", len(pods.Items), namespace)
159+
return pods, nil
160+
}
161+
162+
// getTimestamp is a helper function to get the timestamp of a cache entry
163+
func getTimestamp(cache *sync.Map, key string) time.Time {
164+
if cached, ok := cache.Load(key); ok {
165+
if entry, ok := cached.(*cacheEntry); ok {
166+
return entry.timestamp
167+
}
168+
}
169+
return time.Time{}
170+
}
171+
30172
// Kubernetes wraps a lazily created Kubernetes client together with the
// ETOS namespace it operates in and a TTL cache for API list responses.
type Kubernetes struct {
	logger    *logrus.Entry
	config    *rest.Config          // populated lazily by clientset()
	client    *kubernetes.Clientset // created lazily by clientset()
	namespace string
	cache     *kubernetesCache // TTL cache for job and pod list responses
}
36179

37180
// New creates a new Kubernetes struct.
38181
func New(cfg config.Config, log *logrus.Entry) *Kubernetes {
39182
return &Kubernetes{
40183
logger: log,
41184
namespace: cfg.ETOSNamespace(),
185+
cache: newKubernetesCache(),
42186
}
43187
}
44188

@@ -59,8 +203,23 @@ func (k *Kubernetes) clientset() (*kubernetes.Clientset, error) {
59203
}
60204
k.config = cfg
61205
}
206+
207+
// Log rate limiter settings before creating client
208+
if k.config.RateLimiter != nil {
209+
k.logger.Debug("Kubernetes client has custom rate limiter configured")
210+
}
211+
212+
// Log QPS and Burst settings
213+
if k.config.QPS > 0 || k.config.Burst > 0 {
214+
k.logger.Debugf("Kubernetes client rate limiter settings - QPS: %.2f, Burst: %d",
215+
k.config.QPS, k.config.Burst)
216+
} else {
217+
k.logger.Debug("Kubernetes client using default rate limiter settings")
218+
}
219+
62220
cli, err := kubernetes.NewForConfig(k.config)
63221
if err != nil {
222+
k.logger.Errorf("Failed to create Kubernetes client: %v", err)
64223
return nil, err
65224
}
66225
k.client = cli
@@ -69,25 +228,37 @@ func (k *Kubernetes) clientset() (*kubernetes.Clientset, error) {
69228

70229
// getJobsByIdentifier returns a list of jobs bound to the given testrun identifier.
71230
func (k *Kubernetes) getJobsByIdentifier(ctx context.Context, client *kubernetes.Clientset, identifier string) (*v1.JobList, error) {
231+
// Get all jobs from cache or API
232+
allJobs, err := k.cache.getAllJobs(ctx, client, k.namespace, k.logger)
233+
if err != nil {
234+
return nil, err
235+
}
236+
237+
// Filter jobs by identifier in-memory
238+
filteredJobs := &v1.JobList{
239+
TypeMeta: allJobs.TypeMeta,
240+
ListMeta: allJobs.ListMeta,
241+
Items: []v1.Job{},
242+
}
243+
72244
// Try different labels for backward compatibility:
73245
// - etos.eiffel-community.github.io/id is v1alpha+
74246
// - id is v0 legacy
75-
for _, label := range []string{"etos.eiffel-community.github.io/id", "id"} {
76-
jobs, err := client.BatchV1().Jobs(k.namespace).List(
77-
ctx,
78-
metav1.ListOptions{
79-
LabelSelector: fmt.Sprintf("%s=%s", label, identifier),
80-
},
81-
)
82-
if err != nil {
83-
k.logger.Error(err)
84-
return nil, err
85-
}
86-
if len(jobs.Items) > 0 {
87-
return jobs, nil
247+
labelKeys := []string{"etos.eiffel-community.github.io/id", "id"}
248+
249+
for _, job := range allJobs.Items {
250+
for _, labelKey := range labelKeys {
251+
if labelValue, exists := job.Labels[labelKey]; exists && labelValue == identifier {
252+
filteredJobs.Items = append(filteredJobs.Items, job)
253+
break // Found match, no need to check other labels for this job
254+
}
88255
}
89256
}
90-
return &v1.JobList{}, nil
257+
258+
k.logger.Debugf("Filtered %d jobs with identifier '%s' from %d total jobs",
259+
len(filteredJobs.Items), identifier, len(allJobs.Items))
260+
261+
return filteredJobs, nil
91262
}
92263

93264
// IsFinished checks if an ESR job is finished.
@@ -130,18 +301,27 @@ func (k *Kubernetes) LogListenerIP(ctx context.Context, identifier string) (stri
130301
}
131302
job := jobs.Items[0]
132303

133-
pods, err := client.CoreV1().Pods(k.namespace).List(
134-
ctx,
135-
metav1.ListOptions{
136-
LabelSelector: fmt.Sprintf("job-name=%s", job.Name),
137-
},
138-
)
304+
// Get all pods from cache or API
305+
allPods, err := k.cache.getAllPods(ctx, client, k.namespace, k.logger)
139306
if err != nil {
140307
return "", err
141308
}
142-
if len(pods.Items) == 0 {
309+
310+
// Filter pods by job name in-memory
311+
var matchingPods []corev1.Pod
312+
for _, pod := range allPods.Items {
313+
if jobName, exists := pod.Labels["job-name"]; exists && jobName == job.Name {
314+
matchingPods = append(matchingPods, pod)
315+
}
316+
}
317+
318+
if len(matchingPods) == 0 {
143319
return "", fmt.Errorf("could not find pod for job with id %s", identifier)
144320
}
145-
pod := pods.Items[0]
321+
322+
k.logger.Debugf("Found %d pods for job '%s' with identifier '%s'",
323+
len(matchingPods), job.Name, identifier)
324+
325+
pod := matchingPods[0]
146326
return pod.Status.PodIP, nil
147327
}

0 commit comments

Comments
 (0)