Skip to content

Commit e06a975

Browse files
Add cache to services reporting (#222)
1 parent a63feec commit e06a975

File tree

4 files changed

+254
-6
lines changed

4 files changed

+254
-6
lines changed

src/go.mod

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/mapper/pkg/config/config.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,10 @@ const (
3434
DNSClientIntentsUpdateIntervalDefault = 1 * time.Second
3535
DNSClientIntentsUpdateEnabledKey = "dns-client-intents-update-enabled"
3636
DNSClientIntentsUpdateEnabledDefault = true
37+
ServiceCacheTTLDurationKey = "service-cache-ttl-duration"
38+
ServiceCacheTTLDurationDefault = 1 * time.Minute
39+
ServiceCacheSizeKey = "service-cache-size"
40+
ServiceCacheSizeDefault = 10000
3741

3842
EnableIstioCollectionKey = "enable-istio-collection"
3943
EnableIstioCollectionDefault = false
@@ -71,5 +75,7 @@ func init() {
7175
viper.SetDefault(IstioCooldownIntervalKey, IstioCooldownIntervalDefault)
7276
viper.SetDefault(IstioRestrictCollectionToNamespace, "")
7377
viper.SetDefault(EnableIstioCollectionKey, EnableIstioCollectionDefault)
78+
viper.SetDefault(ServiceCacheTTLDurationKey, ServiceCacheTTLDurationDefault)
79+
viper.SetDefault(ServiceCacheSizeKey, ServiceCacheSizeDefault)
7480
excludedNamespaces = goset.FromSlice(viper.GetStringSlice(ExcludedNamespacesKey))
7581
}

src/mapper/pkg/resourcevisibility/svc_reconciler.go

Lines changed: 57 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,53 @@
11
package resourcevisibility
22

33
import (
4+
"bytes"
45
"context"
6+
"fmt"
7+
"github.com/hashicorp/golang-lru/v2/expirable"
58
"github.com/otterize/intents-operator/src/shared/errors"
69
"github.com/otterize/intents-operator/src/shared/injectablerecorder"
710
"github.com/otterize/network-mapper/src/mapper/pkg/cloudclient"
11+
"github.com/otterize/network-mapper/src/mapper/pkg/config"
812
"github.com/otterize/network-mapper/src/mapper/pkg/graph/model"
913
"github.com/samber/lo"
14+
"github.com/sirupsen/logrus"
15+
"github.com/spf13/viper"
16+
"hash/crc32"
1017
corev1 "k8s.io/api/core/v1"
1118
"k8s.io/client-go/tools/record"
1219
ctrl "sigs.k8s.io/controller-runtime"
1320
"sigs.k8s.io/controller-runtime/pkg/client"
1421
"sigs.k8s.io/controller-runtime/pkg/controller"
22+
"slices"
1523
"time"
1624
)
1725

1826
type ServiceReconciler struct {
1927
client.Client
2028
injectablerecorder.InjectableRecorder
21-
otterizeCloud cloudclient.CloudClient
22-
kubeFinder KubeFinder
29+
otterizeCloud cloudclient.CloudClient
30+
kubeFinder KubeFinder
31+
namespaceToReportedServicesCache *expirable.LRU[string, []byte]
2332
}
2433

2534
type KubeFinder interface {
2635
ResolveOtterizeIdentityForService(ctx context.Context, service *corev1.Service, now time.Time) (model.OtterizeServiceIdentity, bool, error)
2736
}
2837

38+
func OnEvict(key string, _ []byte) {
39+
logrus.WithField("namespace", key).Debug("key evicted from cache, you may change configuration to increase cache size or TTL")
40+
}
41+
2942
func NewServiceReconciler(client client.Client, otterizeCloudClient cloudclient.CloudClient, kubeFinder KubeFinder) *ServiceReconciler {
43+
size := viper.GetInt(config.ServiceCacheSizeKey)
44+
ttl := viper.GetDuration(config.ServiceCacheTTLDurationKey)
45+
cache := expirable.NewLRU[string, []byte](size, OnEvict, ttl)
3046
return &ServiceReconciler{
31-
Client: client,
32-
otterizeCloud: otterizeCloudClient,
33-
kubeFinder: kubeFinder,
47+
Client: client,
48+
otterizeCloud: otterizeCloudClient,
49+
kubeFinder: kubeFinder,
50+
namespaceToReportedServicesCache: cache,
3451
}
3552
}
3653

@@ -63,14 +80,49 @@ func (r *ServiceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
6380
return ctrl.Result{}, errors.Wrap(err)
6481
}
6582

83+
hashSum, err := r.getCachedValue(servicesToReport)
84+
if err != nil {
85+
return ctrl.Result{}, errors.Wrap(err)
86+
}
87+
88+
val, found := r.namespaceToReportedServicesCache.Get(namespace)
89+
if found && bytes.Equal(val, hashSum) {
90+
logrus.WithField("namespace", namespace).Debug("Skipping reporting of services in namespace due to cache")
91+
return ctrl.Result{}, nil
92+
}
93+
6694
err = r.otterizeCloud.ReportK8sServices(ctx, namespace, servicesToReport)
6795
if err != nil {
6896
return ctrl.Result{}, errors.Wrap(err)
6997
}
7098

99+
r.namespaceToReportedServicesCache.Add(namespace, hashSum)
100+
71101
return ctrl.Result{}, nil
72102
}
73103

104+
func (r *ServiceReconciler) getCachedValue(servicesToReport []cloudclient.K8sServiceInput) ([]byte, error) {
105+
values := lo.Map(servicesToReport, func(service cloudclient.K8sServiceInput, _ int) string {
106+
return reportedServicesCacheValuePart(service)
107+
})
108+
109+
slices.Sort(values)
110+
111+
hash := crc32.NewIEEE()
112+
for _, value := range values {
113+
_, err := hash.Write([]byte(value))
114+
if err != nil {
115+
return nil, errors.Wrap(err)
116+
}
117+
}
118+
hashSum := hash.Sum(nil)
119+
return hashSum, nil
120+
}
121+
122+
func reportedServicesCacheValuePart(service cloudclient.K8sServiceInput) string {
123+
return fmt.Sprintf("%s-%s-%s", service.ResourceName, service.OtterizeServer, service.Service.Spec.Type.Item)
124+
}
125+
74126
func (r *ServiceReconciler) convertToCloudServices(ctx context.Context, services []corev1.Service) ([]cloudclient.K8sServiceInput, error) {
75127
cloudServices := make([]cloudclient.K8sServiceInput, 0)
76128
for _, service := range services {

src/mapper/pkg/resourcevisibility/svc_reconciler_test.go

Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,196 @@ func (s *ServiceVisibilityTestSuite) TestServiceUpload() {
117117
res, err := s.reconciler.Reconcile(context.Background(), req)
118118
s.Require().NoError(err)
119119
s.Require().Equal(ctrl.Result{}, res)
120+
121+
s.k8sClient.EXPECT().List(gomock.Any(), gomock.Eq(&emptyList), gomock.Eq(client.InNamespace(testNamespace))).DoAndReturn(
122+
func(ctx context.Context, list *corev1.ServiceList, opts ...client.ListOption) error {
123+
list.Items = []corev1.Service{service}
124+
return nil
125+
})
126+
127+
s.kubeFinder.EXPECT().ResolveOtterizeIdentityForService(gomock.Any(), &service, gomock.Any()).Return(serviceIdentity, true, nil)
128+
129+
// Reconcile again should not upload cause re-upload due to caching
130+
res, err = s.reconciler.Reconcile(context.Background(), req)
131+
s.Require().NoError(err)
132+
s.Require().Equal(ctrl.Result{}, res)
133+
}
134+
135+
func (s *ServiceVisibilityTestSuite) TestServiceReUploadOnIdentityChange() {
136+
deploymentName := "my-server"
137+
service := corev1.Service{
138+
ObjectMeta: metav1.ObjectMeta{
139+
Name: "test-service",
140+
Namespace: testNamespace,
141+
},
142+
Spec: corev1.ServiceSpec{
143+
Type: corev1.ServiceTypeClusterIP,
144+
Selector: map[string]string{
145+
"app": deploymentName,
146+
},
147+
Ports: []corev1.ServicePort{
148+
{
149+
Port: 80,
150+
Protocol: corev1.ProtocolTCP,
151+
TargetPort: intstr.FromInt32(8080),
152+
},
153+
},
154+
},
155+
}
156+
157+
emptyList := corev1.ServiceList{}
158+
s.k8sClient.EXPECT().List(gomock.Any(), gomock.Eq(&emptyList), gomock.Eq(client.InNamespace(testNamespace))).DoAndReturn(
159+
func(ctx context.Context, list *corev1.ServiceList, opts ...client.ListOption) error {
160+
list.Items = []corev1.Service{service}
161+
return nil
162+
})
163+
164+
serviceIdentity := model.OtterizeServiceIdentity{
165+
Name: deploymentName,
166+
Namespace: testNamespace,
167+
PodOwnerKind: nil,
168+
KubernetesService: lo.ToPtr(service.Name),
169+
}
170+
s.kubeFinder.EXPECT().ResolveOtterizeIdentityForService(gomock.Any(), &service, gomock.Any()).Return(serviceIdentity, true, nil)
171+
172+
serviceInput := cloudclient.K8sServiceInput{
173+
OtterizeServer: deploymentName,
174+
Namespace: testNamespace,
175+
ResourceName: service.Name,
176+
Service: cloudclient.K8sResourceServiceInput{
177+
Spec: cloudclient.K8sResourceServiceSpecInput{
178+
Type: nilable.From(cloudclient.K8sServiceTypeClusterIp),
179+
Ports: []cloudclient.K8sServicePort{
180+
{
181+
Port: 80,
182+
Protocol: nilable.From(cloudclient.K8sPortProtocolTcp),
183+
TargetPort: nilable.From(cloudclient.IntOrStringInput{IntVal: nilable.From(8080), IsInt: true}),
184+
},
185+
},
186+
Selector: []cloudclient.SelectorKeyValueInput{{Key: nilable.From("app"), Value: nilable.From(deploymentName)}},
187+
},
188+
},
189+
}
190+
s.cloudClient.EXPECT().ReportK8sServices(gomock.Any(), testNamespace, gomock.Any()).DoAndReturn(
191+
func(ctx context.Context, namespace string, services []cloudclient.K8sServiceInput) error {
192+
s.Require().Len(services, 1)
193+
s.Require().Equal(serviceInput, services[0])
194+
return nil
195+
})
196+
197+
req := ctrl.Request{
198+
NamespacedName: client.ObjectKey{
199+
Namespace: testNamespace,
200+
Name: "endpoint-for-service",
201+
},
202+
}
203+
204+
res, err := s.reconciler.Reconcile(context.Background(), req)
205+
s.Require().NoError(err)
206+
s.Require().Equal(ctrl.Result{}, res)
207+
208+
s.k8sClient.EXPECT().List(gomock.Any(), gomock.Eq(&emptyList), gomock.Eq(client.InNamespace(testNamespace))).DoAndReturn(
209+
func(ctx context.Context, list *corev1.ServiceList, opts ...client.ListOption) error {
210+
list.Items = []corev1.Service{service}
211+
return nil
212+
})
213+
214+
newIdentity := model.OtterizeServiceIdentity{
215+
Name: "another-server",
216+
Namespace: testNamespace,
217+
PodOwnerKind: nil,
218+
KubernetesService: lo.ToPtr(service.Name),
219+
}
220+
221+
s.kubeFinder.EXPECT().ResolveOtterizeIdentityForService(gomock.Any(), &service, gomock.Any()).Return(newIdentity, true, nil)
222+
223+
newServiceInput := cloudclient.K8sServiceInput{
224+
OtterizeServer: "another-server",
225+
Namespace: testNamespace,
226+
ResourceName: service.Name,
227+
Service: cloudclient.K8sResourceServiceInput{
228+
Spec: cloudclient.K8sResourceServiceSpecInput{
229+
Type: nilable.From(cloudclient.K8sServiceTypeClusterIp),
230+
Ports: []cloudclient.K8sServicePort{
231+
{
232+
Port: 80,
233+
Protocol: nilable.From(cloudclient.K8sPortProtocolTcp),
234+
TargetPort: nilable.From(cloudclient.IntOrStringInput{IntVal: nilable.From(8080), IsInt: true}),
235+
},
236+
},
237+
Selector: []cloudclient.SelectorKeyValueInput{{Key: nilable.From("app"), Value: nilable.From(deploymentName)}},
238+
},
239+
},
240+
}
241+
242+
s.cloudClient.EXPECT().ReportK8sServices(gomock.Any(), testNamespace, gomock.Any()).DoAndReturn(
243+
func(ctx context.Context, namespace string, services []cloudclient.K8sServiceInput) error {
244+
s.Require().Len(services, 1)
245+
s.Require().Equal(newServiceInput, services[0])
246+
return nil
247+
})
248+
249+
res, err = s.reconciler.Reconcile(context.Background(), req)
250+
s.Require().NoError(err)
251+
s.Require().Equal(ctrl.Result{}, res)
252+
253+
nodePortService := corev1.Service{
254+
ObjectMeta: metav1.ObjectMeta{
255+
Name: "test-service",
256+
Namespace: testNamespace,
257+
},
258+
Spec: corev1.ServiceSpec{
259+
Type: corev1.ServiceTypeNodePort,
260+
Selector: map[string]string{
261+
"app": deploymentName,
262+
},
263+
Ports: []corev1.ServicePort{
264+
{
265+
Port: 80,
266+
Protocol: corev1.ProtocolTCP,
267+
TargetPort: intstr.FromInt32(8080),
268+
},
269+
},
270+
},
271+
}
272+
s.k8sClient.EXPECT().List(gomock.Any(), gomock.Eq(&emptyList), gomock.Eq(client.InNamespace(testNamespace))).DoAndReturn(
273+
func(ctx context.Context, list *corev1.ServiceList, opts ...client.ListOption) error {
274+
list.Items = []corev1.Service{nodePortService}
275+
return nil
276+
})
277+
278+
s.kubeFinder.EXPECT().ResolveOtterizeIdentityForService(gomock.Any(), &nodePortService, gomock.Any()).Return(newIdentity, true, nil)
279+
280+
nodePortServiceInput := cloudclient.K8sServiceInput{
281+
OtterizeServer: "another-server",
282+
Namespace: testNamespace,
283+
ResourceName: service.Name,
284+
Service: cloudclient.K8sResourceServiceInput{
285+
Spec: cloudclient.K8sResourceServiceSpecInput{
286+
Type: nilable.From(cloudclient.K8sServiceTypeNodePort),
287+
Ports: []cloudclient.K8sServicePort{
288+
{
289+
Port: 80,
290+
Protocol: nilable.From(cloudclient.K8sPortProtocolTcp),
291+
TargetPort: nilable.From(cloudclient.IntOrStringInput{IntVal: nilable.From(8080), IsInt: true}),
292+
},
293+
},
294+
Selector: []cloudclient.SelectorKeyValueInput{{Key: nilable.From("app"), Value: nilable.From(deploymentName)}},
295+
},
296+
},
297+
}
298+
299+
s.cloudClient.EXPECT().ReportK8sServices(gomock.Any(), testNamespace, gomock.Any()).DoAndReturn(
300+
func(ctx context.Context, namespace string, services []cloudclient.K8sServiceInput) error {
301+
s.Require().Len(services, 1)
302+
s.Require().Equal(nodePortServiceInput, services[0])
303+
return nil
304+
305+
})
306+
307+
res, err = s.reconciler.Reconcile(context.Background(), req)
308+
s.Require().NoError(err)
309+
s.Require().Equal(ctrl.Result{}, res)
120310
}
121311

122312
func (s *ServiceVisibilityTestSuite) TestUploadEmptyNamespaces() {

0 commit comments

Comments
 (0)