Commit 0b89a7c

Fixed a bug where the mapper could crash when using DNS-based ClientIntents (#304)
1 parent be1fa44 commit 0b89a7c

File tree

5 files changed (+237 / -34 lines)

run-e2e.sh

Lines changed: 139 additions & 0 deletions
@@ -0,0 +1,139 @@
+#!/bin/bash
+#
+#set -euo pipefail
+#
+## Required inputs (can also be passed as env vars)
+#MAPPER_TAG="06f15b3088aee47db853d9c321a7b97efc6534c4"
+#SNIFFER_TAG="06f15b3088aee47db853d9c321a7b97efc6534c4"
+#MAPPER_IMAGE="mapper"
+#SNIFFER_IMAGE="sniffer"
+#REGISTRY="us-central1-docker.pkg.dev/main-383408/otterize"
+#INSTALL_EXTRA_FLAGS=" --set kafkawatcher.enable=true --set kafkawatcher.kafkaServers={\"kafka-0.kafka\"} "
+#
+## Ensure required commands are available
+#for cmd in docker kubectl helm minikube jq ; do
+# command -v $cmd >/dev/null 2>&1 || { echo >&2 "$cmd is required but not installed."; exit 1; }
+#done
+#
+#echo ">> Starting minikube"
+#minikube start --cpus=4 --memory 4096 --disk-size 32g --cni=calico
+#
+#sleep 10
+#
+#echo ">> Waiting for Calico to be ready"
+#kubectl wait pods -n kube-system -l k8s-app=calico-kube-controllers --for condition=Ready --timeout=120s
+#kubectl wait pods -n kube-system -l k8s-app=calico-node --for condition=Ready --timeout=120s
+#
+#echo ">> Pulling Docker images"
+#docker pull "${REGISTRY}/mapper:${MAPPER_TAG}"
+#minikube image load "${REGISTRY}/mapper:${MAPPER_TAG}"
+#docker pull "${REGISTRY}/sniffer:${SNIFFER_TAG}"
+#minikube image load "${REGISTRY}/sniffer:${SNIFFER_TAG}"
+#
+#echo ">> Setting up Helm dependencies and deploying Network Mapper"
+#helm dep up ./helm-charts/network-mapper
+#helm install otterize ./helm-charts/network-mapper -n otterize-system --create-namespace \
+# --set debug=true \
+# --set-string mapper.repository="${REGISTRY}" \
+# --set-string mapper.image="${MAPPER_IMAGE}" \
+# --set-string mapper.tag="${MAPPER_TAG}" \
+# --set-string mapper.pullPolicy=Never \
+# --set-string sniffer.repository="${REGISTRY}" \
+# --set-string sniffer.image="${SNIFFER_IMAGE}" \
+# --set-string sniffer.tag="${SNIFFER_TAG}" \
+# --set-string sniffer.pullPolicy=Never \
+# --set global.telemetry.enabled=false \
+# $INSTALL_EXTRA_FLAGS
+#
+#
+#echo ">> Waiting for Otterize components to be ready"
+#kubectl wait pods -n otterize-system -l app=otterize-network-sniffer --for condition=Ready --timeout=90s
+#kubectl wait pods -n otterize-system -l app=otterize-network-mapper --for condition=Ready --timeout=90s
+#
+#echo ">> Deploying Kafka via Helm"
+#helm repo add otterize https://helm.otterize.com
+#helm repo update
+#helm install --create-namespace -n kafka -f https://docs.otterize.com/code-examples/kafka-mapping/helm/values.yaml kafka otterize/kafka --version 21.4.4
+#
+#echo ">> Deploying Kafka Tutorial Services"
+#kubectl apply -n otterize-tutorial-kafka-mapping -f https://docs.otterize.com/code-examples/kafka-mapping/all.yaml
+#
+#echo ">> Waiting for Kafka and Tutorial Services"
+#sleep 10
+#kubectl wait pods -n kafka -l app.kubernetes.io/component=kafka --for condition=Ready --timeout=180s
+#kubectl wait pods -n kafka -l app.kubernetes.io/component=zookeeper --for condition=Ready --timeout=180s
+#kubectl wait pods -n otterize-tutorial-kafka-mapping -l app=client --for condition=Ready --timeout=90s
+#kubectl wait pods -n otterize-tutorial-kafka-mapping -l app=client-2 --for condition=Ready --timeout=90s
+#
+#
+#echo ">> Waiting for intents to be discovered..."
+#for i in {1..5}; do
+#
+# OUTPUT_JSON=`otterize network-mapper export --telemetry-enabled=false -n otterize-tutorial-kafka-mapping --format=json`
+# if [ `echo "$OUTPUT_JSON" | jq ". | length"` != 2 ] || [ `echo "$OUTPUT_JSON" | jq '[.[] | select(.spec.targets[] | has("kafka"))] | length'` != 2 ] ; then
+# echo "wait for discovered intents";
+# echo _SNIFFER LOGS_
+# kubectl logs --since=15s -n otterize-system -l app=otterize-network-sniffer
+# echo _MAPPER LOGS_
+# kubectl logs --since=15s -n otterize-system -l app=otterize-network-mapper
+# sleep 10 ;
+# fi
+#
+## intents_count=$(otterize network-mapper export --telemetry-enabled=false -n otterize-tutorial-kafka-mapping --format=json | jq length)
+## if [ "$intents_count" -eq 2 ]; then
+## echo "Intents discovered"
+## break
+## fi
+## echo "Waiting... ($i)"
+## echo "_SNIFFER LOGS_"
+## kubectl logs --since=15s -n otterize-system -l app=otterize-network-sniffer
+## echo "_MAPPER LOGS_"
+## kubectl logs --since=15s -n otterize-system -l app=otterize-network-mapper
+## sleep 10
+#done
+#
+#echo ">> Outputting final logs"
+#kubectl logs -n otterize-system -l app=otterize-network-sniffer --tail=-1
+#kubectl logs -n otterize-system -l app=otterize-network-mapper --tail=-1
+#
+#echo ">> Exporting and comparing discovered intents"
+#
+
+echo "export intents and compare to expected file"
+INTENTS_JSON=`otterize network-mapper export --telemetry-enabled=false -n otterize-tutorial-kafka-mapping --format=json`
+echo "1"
+echo $INTENTS_JSON
+INTENTS_JSON_NO_KIND=`echo "$INTENTS_JSON" | jq 'map(del(.spec.workload.kind))'`
+echo "2"
+echo $INTENTS_JSON_NO_KIND
+INTENTS_JSON_NO_KIND_AND_SORTED=`echo "$INTENTS_JSON_NO_KIND" | jq 'sort_by(.metadata.namespace + .metadata.name) | map(.spec.targets |= (sort_by(keys_unsorted[0]) | map(if .kafka? then .kafka.topics |= map(.operations |= sort) else . end)))'`
+echo "3"
+echo $INTENTS_JSON_NO_KIND_AND_SORTED
+echo "$INTENTS_JSON_NO_KIND_AND_SORTED" > /tmp/intents.json
+#echo "expected" && cat .github/workflows/tests-expected-results/kafka-tutorial-intents.json
+#echo "actual" && cat /tmp/intents.json
+#diff .github/workflows/tests-expected-results/kafka-tutorial-intents.json /tmp/intents.json
+
+#
+#otterize network-mapper export --telemetry-enabled=false -n otterize-tutorial-kafka-mapping --format=json | jq \
+#'sort_by(.metadata.namespace, .metadata.name) | map(
+# .spec.targets |= (sort_by(keys[0])
+# | map(
+# if .kafka?
+# then .kafka.topics |= map(
+# .operations |= sort
+# )
+# else .
+# end
+# )
+# )
+# )' > /tmp/intents.json
+#
+#diff .github/workflows/tests-expected-results/kafka-tutorial-intents.json /tmp/intents.json && echo "Test passed" || {
+# echo "Test failed"
+# echo "Expected:"
+# cat .github/workflows/tests-expected-results/kafka-tutorial-intents.json
+# echo "Actual:"
+# cat /tmp/intents.json
+# exit 1
+#}
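
The uncommented tail of the script (the INTENTS_JSON block above) normalizes the exported intents before any comparison: it strips .spec.workload.kind, sorts intents by namespace and name, orders each intent's targets by their first key, and sorts Kafka topic operations, so a later diff against an expected snapshot is order-insensitive. As a rough local sketch of the same normalization (the input and expected file names below are invented for illustration, not part of the commit):

jq 'map(del(.spec.workload.kind))
    | sort_by(.metadata.namespace + .metadata.name)
    | map(.spec.targets |= (sort_by(keys_unsorted[0])
        | map(if .kafka? then .kafka.topics |= map(.operations |= sort) else . end)))' \
  intents-export.json > /tmp/intents.json
diff expected-intents.json /tmp/intents.json && echo "match" || echo "differs"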

src/mapper/pkg/dnscache/dns_cache.go

Lines changed: 7 additions & 15 deletions
@@ -3,6 +3,7 @@ package dnscache
 import (
 	"context"
 	"github.com/otterize/network-mapper/src/mapper/pkg/config"
+	"github.com/otterize/network-mapper/src/mapper/pkg/dnscache/ttl_cache"
 	"github.com/sirupsen/logrus"
 	"github.com/spf13/viper"
 	"net"
@@ -11,7 +12,7 @@ import (
 )
 
 type DNSCache struct {
-	cache *TTLCache[string, string]
+	cache *ttl_cache.TTLCache[string, string]
 }
 
 type Resolver interface {
@@ -23,7 +24,7 @@ func NewDNSCache() *DNSCache {
 	if capacity == 0 {
 		logrus.Panic("Capacity cannot be 0")
 	}
-	dnsRecordCache := NewTTLCache[string, string](capacity)
+	dnsRecordCache := ttl_cache.NewTTLCache[string, string](capacity)
 
 	return &DNSCache{
 		cache: dnsRecordCache,
@@ -41,18 +42,9 @@ func (d *DNSCache) GetResolvedIPs(dnsName string) []string {
 
 func (d *DNSCache) GetResolvedIPsForWildcard(dnsName string) []string {
 	dnsSuffix := strings.TrimPrefix(dnsName, "*") // Strip the wildcard, leave the '.example.com' suffix
-	result := make([]string, 0)
-	for entry := range d.cache.items {
-		if strings.HasSuffix(entry, dnsSuffix) {
-			// Calling cache.Get() to utilize the LRU instead of iterating over the value too
-			result = append(result, d.cache.Get(entry)...)
-		}
-	}
-	return result
-}
+	result := d.cache.Filter(func(key string) bool {
+		return strings.HasSuffix(key, dnsSuffix)
+	})
 
-// CacheValue holds the value and its expiration time
-type CacheValue[V any] struct {
-	Value V
-	Expiration time.Time
+	return result
 }
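
The hunk above is the crash fix itself: the old GetResolvedIPsForWildcard ranged over the cache's internal items map without holding the cache mutex, so a concurrent DNS write could trigger Go's "fatal error: concurrent map iteration and map write" and bring down the mapper. The new code hands the suffix check to Filter, which runs the whole iteration under the lock. A minimal standalone sketch of the difference (invented types, not the mapper's code):

package main

import (
	"strings"
	"sync"
)

type cache struct {
	mu    sync.Mutex
	items map[string][]string
}

// insert stands in for the cache's write path; it takes the mutex.
func (c *cache) insert(key, value string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.items[key] = append(c.items[key], value)
}

// unsafeSuffixLookup mirrors the old approach: iterating c.items without the lock.
// Run concurrently with insert, this can abort the whole process.
func (c *cache) unsafeSuffixLookup(suffix string) []string {
	var out []string
	for key, values := range c.items { // no c.mu.Lock() here
		if strings.HasSuffix(key, suffix) {
			out = append(out, values...)
		}
	}
	return out
}

// filter mirrors the new TTLCache.Filter: the whole iteration runs under the
// mutex, so it can never interleave with a writer that also takes the mutex.
func (c *cache) filter(pred func(key string) bool) []string {
	c.mu.Lock()
	defer c.mu.Unlock()
	var out []string
	for key, values := range c.items {
		if pred(key) {
			out = append(out, values...)
		}
	}
	return out
}

func main() {
	c := &cache{items: map[string][]string{"api.example.com": {"192.0.2.1"}}}
	c.insert("www.example.com", "192.0.2.2")
	_ = c.filter(func(key string) bool { return strings.HasSuffix(key, ".example.com") })
	_ = c.unsafeSuffixLookup(".example.com") // only safe here because nothing else is writing
}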

src/mapper/pkg/dnscache/dns_cache_test.go

Lines changed: 0 additions & 18 deletions
@@ -66,24 +66,6 @@ func (s *DNSCacheTestSuite) TestCapacityConfig() {
 	}
 }
 
-func (s *DNSCacheTestSuite) TestTTL() {
-	cache := NewDNSCache()
-
-	cache.AddOrUpdateDNSData("my-future-blog.de", IP1, 1*time.Second)
-	ips := cache.GetResolvedIPs("my-future-blog.de")
-	s.Require().Len(ips, 1)
-	s.Require().Equal(IP1, ips[0])
-
-	// This is the only place where we sleep in the test, to make sure the TTL works as expected
-	time.Sleep(2 * time.Second)
-
-	cache.cache.cleanupExpired()
-
-	ips = cache.GetResolvedIPs("my-future-blog.de")
-	s.Require().Len(ips, 0)
-
-}
-
 func (s *DNSCacheTestSuite) TestWildcardIP() {
 	cache := NewDNSCache()
 	cache.AddOrUpdateDNSData("www.surf-forecast.com", IP1, 60*time.Second)

src/mapper/pkg/dnscache/ttl_cache.go renamed to src/mapper/pkg/dnscache/ttl_cache/ttl_cache.go

Lines changed: 29 additions & 1 deletion
@@ -1,11 +1,19 @@
-package dnscache
+package ttl_cache
 
 import (
 	"container/list"
 	"sync"
 	"time"
 )
 
+type Predicate[K comparable] func(key K) bool
+
+// CacheValue holds the value and its expiration time
+type CacheValue[V any] struct {
+	Value V
+	Expiration time.Time
+}
+
 // CacheEntry represents an entry in the cache, linking the key with its list element for LRU
 type CacheEntry[K comparable, V comparable] struct {
 	Key K
@@ -95,6 +103,10 @@ func (c *TTLCache[K, V]) Get(key K) []V {
 	c.mu.Lock()
 	defer c.mu.Unlock()
 
+	return c.getUnsafe(key)
+}
+
+func (c *TTLCache[K, V]) getUnsafe(key K) []V {
 	// Check if the key exists
 	if _, exists := c.items[key]; !exists {
 		return make([]V, 0)
@@ -145,6 +157,22 @@ func (c *TTLCache[K, V]) cleanupExpired() {
 	}
 }
 
+// Filter returns all the values that matches the predicator and removes any expired values
+func (c *TTLCache[K, V]) Filter(predicate Predicate[K]) []V {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	result := make([]V, 0)
+	for key, _ := range c.items {
+		if predicate(key) {
+			// Calling `getUnsafe` to utilize the LRU instead of iterating over the value too
+			result = append(result, c.getUnsafe(key)...)
+		}
+	}
+
+	return result
+}
+
 // lruValueExpiration gets the expiration time for a given LRU element
 func (c *TTLCache[K, V]) lruValueExpiration(elem *list.Element) time.Time {
 	cacheEntry := elem.Value.(CacheEntry[K, V])
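
Note the Get/getUnsafe split introduced above: Filter already holds c.mu, and Go's sync.Mutex is not reentrant, so Filter must call the lock-free getUnsafe helper; calling the public, locking Get from inside Filter would deadlock. A tiny standalone sketch of that pattern (invented names, not this package's code):

package main

import (
	"fmt"
	"sync"
)

type counter struct {
	mu sync.Mutex
	n  int
}

// Get is the public accessor: it takes the lock, then delegates.
func (c *counter) Get() int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.getUnsafe()
}

// getUnsafe assumes the caller already holds c.mu.
func (c *counter) getUnsafe() int {
	return c.n
}

// Snapshot holds the lock for a longer critical section; it must use getUnsafe,
// because calling Get here would try to lock c.mu a second time and deadlock.
func (c *counter) Snapshot() (current int, doubled int) {
	c.mu.Lock()
	defer c.mu.Unlock()
	v := c.getUnsafe()
	return v, v * 2
}

func main() {
	c := &counter{n: 41}
	fmt.Println(c.Get())
	fmt.Println(c.Snapshot())
}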
src/mapper/pkg/dnscache/ttl_cache/ttl_cache_test.go

Lines changed: 62 additions & 0 deletions
@@ -0,0 +1,62 @@
+package ttl_cache
+
+import (
+	"fmt"
+	"github.com/stretchr/testify/suite"
+	"testing"
+	"time"
+)
+
+type TTLCacheTestSuite struct {
+	suite.Suite
+}
+
+func (s *TTLCacheTestSuite) TestCacheFilterWhileWrite() {
+	cache := NewTTLCache[string, string](100)
+	stop := make(chan struct{})
+
+	go func() {
+		// Intensive write to the cache
+		for {
+			cache.Insert("example.com", fmt.Sprintf("192.0.2.%d", time.Now().UnixNano()%255), 5*time.Second)
+			time.Sleep(1 * time.Millisecond)
+		}
+	}()
+
+	go func() {
+		// Iterating over the cache
+		for {
+			_ = cache.Filter(func(key string) bool {
+				return true
+			})
+			time.Sleep(1 * time.Millisecond)
+		}
+	}()
+
+	// Let them race for 15 seconds
+	// Unfortunately, there isn't a way to make sure that they won't race (which will yield fatal error: concurrent map iteration and map write)
+	// so we hope that 15 seconds are good enough interval to reproduce the error if it exists
+	time.Sleep(15 * time.Second)
+	close(stop)
+}
+
+func (s *TTLCacheTestSuite) TestTTL() {
+	cache := NewTTLCache[string, string](100)
+
+	cache.Insert("my-future-blog.de", "ip1", 1*time.Second)
+	ips := cache.Get("my-future-blog.de")
+	s.Require().Len(ips, 1)
+	s.Require().Equal("ip1", ips[0])
+
+	// This is the only place where we sleep in the test, to make sure the TTL works as expected
+	time.Sleep(2 * time.Second)
+
+	cache.cleanupExpired()
+
+	ips = cache.Get("my-future-blog.de")
+	s.Require().Len(ips, 0)
+}
+
+func TestTTLCacheTestSuite(t *testing.T) {
+	suite.Run(t, new(TTLCacheTestSuite))
+}
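
TestCacheFilterWhileWrite above reproduces the crash only probabilistically: it lets a writer goroutine and a Filter loop race for 15 seconds and relies on the runtime's fatal error to surface the bug. Assuming standard Go tooling, running the suite under the race detector is a more deterministic way to catch this class of problem; the package path below follows the rename in this commit and may need adjusting relative to the Go module root:

go test -race -run TestTTLCacheTestSuite ./src/mapper/pkg/dnscache/ttl_cache/...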
