Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
3a4625e
feat(CNS): Early work on better throttling in NMAgent fetch for nodes…
santhoshmprabhu Sep 19, 2024
8f00354
feat(CNS): Update NMAgent fetches to be async with binary exponential…
santhoshmprabhu Sep 19, 2024
187cb66
chore: check for empty nmagent response
santhoshmprabhu Sep 19, 2024
e21a29f
test: update test for empty response
santhoshmprabhu Sep 19, 2024
c7e88fb
style: make linter happy
santhoshmprabhu Sep 19, 2024
ab97370
Merge remote-tracking branch 'origin/master' into sanprabhu/cilium-no…
santhoshmprabhu Sep 19, 2024
8011972
chore: fix some comments
santhoshmprabhu Sep 19, 2024
b6bc7ad
Merge remote-tracking branch 'origin/master' into sanprabhu/cilium-no…
santhoshmprabhu Sep 20, 2024
b20588e
fix: Fix bug in refresh
santhoshmprabhu Sep 20, 2024
4c7394d
refactor: Address comments
santhoshmprabhu Sep 24, 2024
7708cc9
Merge remote-tracking branch 'origin/master' into sanprabhu/cilium-no…
santhoshmprabhu Sep 24, 2024
f0e3f3d
Merge branch 'master' into sanprabhu/cilium-node-subnet-intelligent-r…
santhoshmprabhu Sep 24, 2024
3b07378
refactor: ignore primary ip
santhoshmprabhu Sep 25, 2024
c5e154e
Merge branch 'sanprabhu/cilium-node-subnet-intelligent-refresh' of gi…
santhoshmprabhu Sep 25, 2024
720358b
Merge branch 'master' into sanprabhu/cilium-node-subnet-intelligent-r…
santhoshmprabhu Sep 25, 2024
9ad80f5
refactor: move refresh out of ipfetcher
santhoshmprabhu Sep 27, 2024
62daac4
test: add ip fetcher tests
santhoshmprabhu Sep 27, 2024
72b685d
fix: remove broken import
santhoshmprabhu Sep 27, 2024
90c56b7
fix: fix import
santhoshmprabhu Sep 27, 2024
a6f4889
fix: fix linting
santhoshmprabhu Sep 27, 2024
7ed21ae
fix: fix some failing tests
santhoshmprabhu Sep 27, 2024
7551a04
chore: Remove unused function
santhoshmprabhu Sep 27, 2024
2a2ae20
test: test updates
santhoshmprabhu Sep 27, 2024
945784f
Merge remote-tracking branch 'origin/master' into sanprabhu/cilium-no…
santhoshmprabhu Oct 1, 2024
f3e076c
fix: address comments
santhoshmprabhu Oct 1, 2024
d57810a
chore: add missed file
santhoshmprabhu Oct 1, 2024
510d7cf
chore: add comment about static interval
santhoshmprabhu Oct 1, 2024
7e720a7
Merge branch 'master' into sanprabhu/cilium-node-subnet-intelligent-r…
santhoshmprabhu Oct 1, 2024
14be616
Merge branch 'master' into sanprabhu/cilium-node-subnet-intelligent-r…
santhoshmprabhu Oct 1, 2024
8fc2a4c
Merge branch 'master' into sanprabhu/cilium-node-subnet-intelligent-r…
santhoshmprabhu Oct 2, 2024
64da2bc
feat: address Evan's comment to require Equal method on cached results
santhoshmprabhu Oct 2, 2024
f79a6c3
chore: add missed file
santhoshmprabhu Oct 2, 2024
2331766
feat: more efficient equality
santhoshmprabhu Oct 2, 2024
a78ae15
Merge branch 'master' into sanprabhu/cilium-node-subnet-intelligent-r…
santhoshmprabhu Oct 3, 2024
2f323e6
Merge branch 'master' into sanprabhu/cilium-node-subnet-intelligent-r…
santhoshmprabhu Oct 3, 2024
38c5047
refactor: address Evan's comment
santhoshmprabhu Oct 4, 2024
22bdf3a
Merge branch 'sanprabhu/cilium-node-subnet-intelligent-refresh' of gi…
santhoshmprabhu Oct 4, 2024
c62f74a
Merge branch 'master' into sanprabhu/cilium-node-subnet-intelligent-r…
santhoshmprabhu Oct 7, 2024
6ed6687
Merge branch 'master' into sanprabhu/cilium-node-subnet-intelligent-r…
santhoshmprabhu Oct 7, 2024
5b64c2e
Merge branch 'master' into sanprabhu/cilium-node-subnet-intelligent-r…
santhoshmprabhu Oct 8, 2024
1ee34c2
Merge branch 'master' into sanprabhu/cilium-node-subnet-intelligent-r…
santhoshmprabhu Oct 8, 2024
f8b1c64
refactor: address Tim's comment
santhoshmprabhu Oct 8, 2024
8541631
fix: undo accidental commit
santhoshmprabhu Oct 8, 2024
cc0b7d7
Merge branch 'master' into sanprabhu/cilium-node-subnet-intelligent-r…
santhoshmprabhu Oct 8, 2024
0abad88
Merge branch 'master' into sanprabhu/cilium-node-subnet-intelligent-r…
santhoshmprabhu Oct 9, 2024
df3d900
fix: make linter happy
santhoshmprabhu Oct 14, 2024
d1b1461
Merge branch 'master' into sanprabhu/cilium-node-subnet-intelligent-r…
santhoshmprabhu Oct 14, 2024
0407ef0
fix: make linter happy
santhoshmprabhu Oct 14, 2024
b235877
Merge branch 'sanprabhu/cilium-node-subnet-intelligent-refresh' of gi…
santhoshmprabhu Oct 14, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions cns/nodesubnet/helper_for_ip_fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,13 @@ package nodesubnet

import "time"

// This method is in this file (_test.go) because it is a test helper method.
// The following method is built during tests, and is not part of the main code.
func (c *IPFetcher) SetSecondaryIPQueryInterval(interval time.Duration) {
c.secondaryIPQueryInterval = interval
// These methods is in this file (_test.go) because they are helpers. They are
// built during tests, and are not part of the main code.

func (c *IPFetcher) GetCurrentQueryInterval() time.Duration {
return c.tickerInterval
}

func (c *IPFetcher) SetTicker(tickProvider TickProvider) {
c.ticker = tickProvider
}
125 changes: 105 additions & 20 deletions cns/nodesubnet/ip_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,46 +10,130 @@ import (
"github.com/pkg/errors"
)

const (
// Default minimum time between secondary IP fetches
DefaultMinRefreshInterval = 4 * time.Second
// Default maximum time between secondary IP fetches
DefaultMaxRefreshInterval = 1024 * time.Second
)

var ErrRefreshSkipped = errors.New("refresh skipped due to throttling")

// InterfaceRetriever is an interface is implemented by the NMAgent Client, and also a mock client for testing.
type InterfaceRetriever interface {
GetInterfaceIPInfo(ctx context.Context) (nmagent.Interfaces, error)
}

type IPFetcher struct {
// Node subnet state
secondaryIPQueryInterval time.Duration // Minimum time between secondary IP fetches
secondaryIPLastRefreshTime time.Time // Time of last secondary IP fetch
// IPConsumer is an interface implemented by whoever consumes the secondary IPs fetched in nodesubnet
type IPConsumer interface {
UpdateIPsForNodeSubnet(netip.Addr, []netip.Addr) error
}

ipFectcherClient InterfaceRetriever
type IPFetcher struct {
// Node subnet config
intfFetcherClient InterfaceRetriever
ticker TickProvider
tickerInterval time.Duration
consumer IPConsumer
minRefreshInterval time.Duration
maxRefreshInterval time.Duration
}

func NewIPFetcher(nmaClient InterfaceRetriever, queryInterval time.Duration) *IPFetcher {
// NewIPFetcher creates a new IPFetcher. If minInterval is 0, it will default to 4 seconds.
// If maxInterval is 0, it will default to 1024 seconds (or minInterval, if it is higher).
func NewIPFetcher(
client InterfaceRetriever,
consumer IPConsumer,
minInterval time.Duration,
maxInterval time.Duration,
) *IPFetcher {
if minInterval == 0 {
minInterval = DefaultMinRefreshInterval
}

if maxInterval == 0 {
maxInterval = DefaultMaxRefreshInterval
}

maxInterval = max(maxInterval, minInterval)

return &IPFetcher{
ipFectcherClient: nmaClient,
secondaryIPQueryInterval: queryInterval,
intfFetcherClient: client,
consumer: consumer,
minRefreshInterval: minInterval,
maxRefreshInterval: maxInterval,
tickerInterval: minInterval,
}
}

func (c *IPFetcher) UpdateFetchIntervalForNoObservedDiff() {
c.tickerInterval = min(c.tickerInterval*2, c.maxRefreshInterval) //nolint:gomnd // doubling interval

if c.ticker != nil {
c.ticker.Reset(c.tickerInterval)
}
}

func (c *IPFetcher) RefreshSecondaryIPsIfNeeded(ctx context.Context) (ips []netip.Addr, err error) {
// If secondaryIPQueryInterval has elapsed since the last fetch, fetch secondary IPs
if time.Since(c.secondaryIPLastRefreshTime) < c.secondaryIPQueryInterval {
return nil, ErrRefreshSkipped
func (c *IPFetcher) UpdateFetchIntervalForObservedDiff() {
c.tickerInterval = c.minRefreshInterval

if c.ticker != nil {
c.ticker.Reset(c.tickerInterval)
}
}

func (c *IPFetcher) Start(ctx context.Context) {
go func() {
// Do an initial fetch
err := c.RefreshSecondaryIPs(ctx)
if err != nil {
log.Printf("Error refreshing secondary IPs: %v", err)
}

if c.ticker == nil {
c.ticker = NewTimedTickProvider(c.tickerInterval)
}

defer c.ticker.Stop()

for {
select {
case <-c.ticker.C():
err := c.RefreshSecondaryIPs(ctx)
if err != nil {
log.Printf("Error refreshing secondary IPs: %v", err)
}
case <-ctx.Done():
log.Println("IPFetcher stopped")
return
}
}
}()
}

// Fetch IPs from NMAgent and pass to the consumer
func (c *IPFetcher) RefreshSecondaryIPs(ctx context.Context) error {
response, err := c.intfFetcherClient.GetInterfaceIPInfo(ctx)
if err != nil {
return errors.Wrap(err, "getting interface IPs")
}

if len(response.Entries) == 0 {
return errors.New("no interfaces found in response from NMAgent")
}

c.secondaryIPLastRefreshTime = time.Now()
response, err := c.ipFectcherClient.GetInterfaceIPInfo(ctx)
primaryIP, secondaryIPs := flattenIPListFromResponse(&response)
err = c.consumer.UpdateIPsForNodeSubnet(primaryIP, secondaryIPs)
if err != nil {
return nil, errors.Wrap(err, "getting interface IPs")
return errors.Wrap(err, "updating secondary IPs")
}

res := flattenIPListFromResponse(&response)
return res, nil
return nil
}

// Get the list of secondary IPs from fetched Interfaces
func flattenIPListFromResponse(resp *nmagent.Interfaces) (res []netip.Addr) {
func flattenIPListFromResponse(resp *nmagent.Interfaces) (primary netip.Addr, secondaryIPs []netip.Addr) {
var primaryIP netip.Addr
// For each interface...
for _, intf := range resp.Entries {
if !intf.IsPrimary {
Expand All @@ -63,15 +147,16 @@ func flattenIPListFromResponse(resp *nmagent.Interfaces) (res []netip.Addr) {
for _, a := range s.IPAddress {
// Primary addresses are reserved for the host.
if a.IsPrimary {
primaryIP = netip.Addr(a.Address)
continue
}

res = append(res, netip.Addr(a.Address))
secondaryIPs = append(secondaryIPs, netip.Addr(a.Address))
addressCount++
}
log.Printf("Got %d addresses from subnet %s", addressCount, s.Prefix)
}
}

return res
return primaryIP, secondaryIPs
}
155 changes: 103 additions & 52 deletions cns/nodesubnet/ip_fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ package nodesubnet_test

import (
"context"
"errors"
"net/netip"
"sync/atomic"
"testing"
"time"

Expand All @@ -12,55 +13,117 @@ import (

// Mock client that simply tracks if refresh has been called
type TestClient struct {
fetchCalled bool
refreshCount int32
}

// Mock refresh
func (c *TestClient) GetInterfaceIPInfo(_ context.Context) (nmagent.Interfaces, error) {
c.fetchCalled = true
atomic.AddInt32(&c.refreshCount, 1)
return nmagent.Interfaces{}, nil
}

func TestRefreshSecondaryIPsIfNeeded(t *testing.T) {
getTests := []struct {
name string
shouldCall bool
interval time.Duration
}{
{
"fetch called",
true,
-1 * time.Second, // Negative timeout to force refresh
},
{
"no refresh needed",
false,
10 * time.Hour, // High timeout to avoid refresh
},
var _ nodesubnet.InterfaceRetriever = &TestClient{}

// Mock client that simply consumes fetched IPs
type TestConsumer struct {
consumeCount int32
}

// Mock IP update
func (c *TestConsumer) UpdateIPsForNodeSubnet(_ netip.Addr, _ []netip.Addr) error {
atomic.AddInt32(&c.consumeCount, 1)
return nil
}

var _ nodesubnet.IPConsumer = &TestConsumer{}

// MockTickProvider is a mock implementation of the TickProvider interface
type MockTickProvider struct {
tickChan chan time.Time
currentDuration time.Duration
}

// NewMockTickProvider creates a new MockTickProvider
func NewMockTickProvider() *MockTickProvider {
return &MockTickProvider{
tickChan: make(chan time.Time, 1),
}
}

// C returns the channel on which ticks are delivered
func (m *MockTickProvider) C() <-chan time.Time {
return m.tickChan
}

// Stop stops the ticker
func (m *MockTickProvider) Stop() {
close(m.tickChan)
}

// Tick manually sends a tick to the channel
func (m *MockTickProvider) Tick() {
m.tickChan <- time.Now()
}

func (m *MockTickProvider) Reset(d time.Duration) {
m.currentDuration = d
}

var _ nodesubnet.TickProvider = &MockTickProvider{}

func TestRefresh(t *testing.T) {
clientPtr := &TestClient{}
consumerPtr := &TestConsumer{}
fetcher := nodesubnet.NewIPFetcher(clientPtr, consumerPtr, 0, 0)
ticker := NewMockTickProvider()
fetcher.SetTicker(ticker)
ctx, cancel := testContext(t)
defer cancel()
fetcher.Start(ctx)
ticker.Tick() // Trigger a refresh
ticker.Tick() // This tick will be read only after previous refresh is done
ticker.Tick() // This call will block until the prevous tick is read

// At least 2 refreshes - one initial and one after the first tick should be done
if atomic.LoadInt32(&clientPtr.refreshCount) < 2 {
t.Error("Not enough refreshes")
}

// No consumes, since the responses are empty
if atomic.LoadInt32(&consumerPtr.consumeCount) > 0 {
t.Error("Consume called unexpectedly, shouldn't be called since responses are empty")
}
}

func TestIntervalUpdate(t *testing.T) {
clientPtr := &TestClient{}
fetcher := nodesubnet.NewIPFetcher(clientPtr, 0)

for _, test := range getTests {
test := test
t.Run(test.name, func(t *testing.T) { // Do not parallelize, as we are using a shared client
fetcher.SetSecondaryIPQueryInterval(test.interval)
ctx, cancel := testContext(t)
defer cancel()
clientPtr.fetchCalled = false
_, err := fetcher.RefreshSecondaryIPsIfNeeded(ctx)

if test.shouldCall {
if err != nil && errors.Is(err, nodesubnet.ErrRefreshSkipped) {
t.Error("refresh expected, but didn't happen")
}

checkErr(t, err, false)
} else if err == nil || !errors.Is(err, nodesubnet.ErrRefreshSkipped) {
t.Error("refresh not expected, but happened")
}
})
consumerPtr := &TestConsumer{}
fetcher := nodesubnet.NewIPFetcher(clientPtr, consumerPtr, 0, 0)
interval := fetcher.GetCurrentQueryInterval()
ticker := NewMockTickProvider()
fetcher.SetTicker(ticker)

if interval != nodesubnet.DefaultMinRefreshInterval {
t.Error("Default min interval not used")
}

for i := 1; i <= 10; i++ {
fetcher.UpdateFetchIntervalForNoObservedDiff()
exp := interval * 2
if interval == nodesubnet.DefaultMaxRefreshInterval {
exp = nodesubnet.DefaultMaxRefreshInterval
}
if fetcher.GetCurrentQueryInterval() != exp || ticker.currentDuration != exp {
t.Error("Interval not updated correctly")
} else {
interval = exp
}
}

fetcher.UpdateFetchIntervalForObservedDiff()

if fetcher.GetCurrentQueryInterval() != nodesubnet.DefaultMinRefreshInterval || ticker.currentDuration != nodesubnet.DefaultMinRefreshInterval {
t.Error("Observed diff update incorrect")
}
}

Expand All @@ -72,15 +135,3 @@ func testContext(t *testing.T) (context.Context, context.CancelFunc) {
}
return context.WithCancel(context.Background())
}

// checkErr is an assertion of the presence or absence of an error
func checkErr(t *testing.T, err error, shouldErr bool) {
t.Helper()
if err != nil && !shouldErr {
t.Fatal("unexpected error: err:", err)
}

if err == nil && shouldErr {
t.Fatal("expected error but received none")
}
}
Loading