Skip to content

Commit 34a6722

Browse files
committed
add scalability tests for ipcache
1 parent 81d08d9 commit 34a6722

File tree

3 files changed

+172
-1
lines changed

3 files changed

+172
-1
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ clean:
3737
rm -rf "$(OUT_DIR)/"
3838

3939
test:
40-
CGO_ENABLED=1 go test -v -race -count 1 ./...
40+
CGO_ENABLED=1 go test -short -v -race -count 1 ./...
4141

4242
lint:
4343
hack/lint.sh

pkg/ipcache/etcd.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ func NewEtcdStore(listenURL, etcdDir string, opts ...EtcdOption) (*EtcdStore, er
110110
cfg := embed.NewConfig()
111111
cfg.Dir = etcdDir
112112
cfg.LogLevel = "error"
113+
cfg.UnsafeNoFsync = true
113114
// Disable peer communication as we're running a single-member cluster.
114115
cfg.ListenPeerUrls = []url.URL{}
115116

pkg/ipcache/integration_test.go

Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,13 @@ import (
3232
"net"
3333
"os"
3434
"path/filepath"
35+
"runtime"
3536
"strconv"
3637
"sync"
3738
"testing"
3839
"time"
3940

41+
"golang.org/x/time/rate"
4042
"sigs.k8s.io/kube-network-policies/pkg/api"
4143
)
4244

@@ -744,3 +746,171 @@ func TestServerProxyClientIntegration(t *testing.T) {
744746
return !found
745747
}, 15*time.Second)
746748
}
749+
750+
// Helper to calculate the size of a directory
751+
func dirSize(path string) (int64, error) {
752+
var size int64
753+
err := filepath.Walk(path, func(_ string, info os.FileInfo, err error) error {
754+
if err != nil {
755+
return err
756+
}
757+
if !info.IsDir() {
758+
size += info.Size()
759+
}
760+
return err
761+
})
762+
return size, err
763+
}
764+
765+
// TestScalability measures propagation delay, memory, and disk usage.
766+
func TestScalability(t *testing.T) {
767+
if testing.Short() {
768+
t.Skip("skipping test in short mode.")
769+
}
770+
const (
771+
numClients = 1
772+
numNamespaces = 10
773+
numPodsPerNamespace = 1000000
774+
totalPods = numNamespaces * numPodsPerNamespace
775+
qps = 1024
776+
)
777+
778+
ctx, cancel := context.WithCancel(context.Background())
779+
defer cancel()
780+
781+
t.Log("Setting up server")
782+
server, _, cleanup := setupTest(t, ctx)
783+
defer cleanup()
784+
listenURL := server.listenURL
785+
786+
t.Log("Setting up clients")
787+
clients := make([]*Client, numClients)
788+
for i := 0; i < numClients; i++ {
789+
dbDir := filepath.Join(t.TempDir(), fmt.Sprintf("ipcache%d.bolt", i))
790+
boltStore, err := NewBoltStore(dbDir)
791+
if err != nil {
792+
t.Fatalf("Failed to create bolt store: %v", err)
793+
}
794+
lruStore := NewLRUStore(boltStore, totalPods*2)
795+
client, err := NewClient(context.Background(), listenURL, nil, lruStore, boltStore, fmt.Sprintf("node-%d", i))
796+
if err != nil {
797+
t.Fatalf("Failed to create client %d: %v", i, err)
798+
}
799+
defer client.Close()
800+
clients[i] = client
801+
}
802+
803+
t.Log("Create namespaces and pods")
804+
ips := make([]string, totalPods)
805+
podInfos := make([]*api.PodInfo, totalPods)
806+
for i := 0; i < numNamespaces; i++ {
807+
ns := fmt.Sprintf("ns%d", i)
808+
for j := 0; j < numPodsPerNamespace; j++ {
809+
podIndex := i*numPodsPerNamespace + j
810+
podName := fmt.Sprintf("pod-%d", podIndex)
811+
ip := make(net.IP, 4)
812+
binary.BigEndian.PutUint32(ip, uint32(podIndex))
813+
ips[podIndex] = ip.String()
814+
podInfos[podIndex] = &api.PodInfo{Name: podName,
815+
Labels: map[string]string{"foo": "bar"},
816+
Namespace: &api.Namespace{
817+
Name: ns,
818+
Labels: map[string]string{"foo": "bar"},
819+
},
820+
}
821+
}
822+
}
823+
824+
t.Log("Measure initial state")
825+
var beforeMemStats runtime.MemStats
826+
runtime.ReadMemStats(&beforeMemStats)
827+
serverDirSize, err := dirSize(server.etcd.Config().Dir)
828+
if err != nil {
829+
t.Fatalf("Failed to get server dir size: %v", err)
830+
}
831+
t.Logf("Initial server disk usage: %.2f KB", float64(serverDirSize)/1024)
832+
833+
// 5. Upsert all pods and measure propagation delay
834+
var totalPropagationDelay time.Duration
835+
var propagationDelayCount int64
836+
var mu sync.Mutex
837+
var wg sync.WaitGroup
838+
rateLimiter := rate.NewLimiter(rate.Limit(qps), 1)
839+
t.Log("Insert all pods")
840+
841+
for i := 0; i < totalPods; i++ {
842+
err := rateLimiter.WaitN(ctx, 1)
843+
if err != nil {
844+
t.Logf("Rate limiter error: %v", err)
845+
continue
846+
}
847+
wg.Add(1)
848+
go func(i int) {
849+
defer wg.Done()
850+
startTime := time.Now()
851+
if err := server.Upsert(ips[i], podInfos[i]); err != nil {
852+
t.Errorf("Failed to upsert record: %v", err)
853+
return
854+
}
855+
856+
// Wait for the first client to receive the update
857+
waitForCondition(t, "client did not sync upsert", func() bool {
858+
_, found := clients[0].GetPodInfoByIP(ips[i])
859+
return found
860+
}, 10*time.Second)
861+
862+
mu.Lock()
863+
totalPropagationDelay += time.Since(startTime)
864+
propagationDelayCount++
865+
mu.Unlock()
866+
t.Logf("Inserted pod %d", i)
867+
}(i)
868+
}
869+
wg.Wait()
870+
871+
t.Log("Wait for all clients to sync")
872+
for i, client := range clients {
873+
waitForCondition(t, fmt.Sprintf("client %d did not sync all records", i), func() bool {
874+
list, err := client.List()
875+
if err != nil {
876+
t.Errorf("client %d failed to list records: %v", i, err)
877+
return false
878+
}
879+
t.Logf("Synced %d records", len(list))
880+
return len(list) == totalPods
881+
}, 30*time.Second)
882+
}
883+
884+
// 7. Measure final state
885+
var afterMemStats runtime.MemStats
886+
runtime.ReadMemStats(&afterMemStats)
887+
888+
t.Logf("--- Scalability Metrics ---")
889+
t.Logf("Number of clients: %d", numClients)
890+
t.Logf("Number of namespaces: %d", numNamespaces)
891+
t.Logf("Number of pods: %d", totalPods)
892+
t.Logf("Write QPS: %d", qps)
893+
894+
if propagationDelayCount > 0 {
895+
avgPropagationDelay := totalPropagationDelay / time.Duration(propagationDelayCount)
896+
t.Logf("Average propagation delay: %s", avgPropagationDelay)
897+
}
898+
899+
// Server metrics
900+
serverDirSizeAfter, err := dirSize(server.etcd.Config().Dir)
901+
if err != nil {
902+
t.Fatalf("Failed to get server dir size: %v", err)
903+
}
904+
t.Logf("Server memory usage (Alloc): %.2f KB", float64(afterMemStats.Alloc-beforeMemStats.Alloc)/1024)
905+
t.Logf("Server disk usage difference: %.2f KB", float64(serverDirSizeAfter-serverDirSize)/1024)
906+
907+
// Client metrics
908+
for i, client := range clients {
909+
clientDbPath := client.syncStore.(*BoltStore).db.Path()
910+
clientDbSize, err := os.Stat(clientDbPath)
911+
if err != nil {
912+
t.Fatalf("Failed to get client db size: %v", err)
913+
}
914+
t.Logf("Client %d disk usage (BoltDB): %.2f KB", i, float64(clientDbSize.Size())/1024)
915+
}
916+
}

0 commit comments

Comments
 (0)