From 3a4625e4c8c187e1d4ca51e095592fc905c025d1 Mon Sep 17 00:00:00 2001 From: Santhosh Prabhu Date: Thu, 19 Sep 2024 09:25:58 -0700 Subject: [PATCH 01/28] feat(CNS): Early work on better throttling in NMAgent fetch for nodesubnet --- cns/nodesubnet/ip_fetcher.go | 85 +++++++++++++++++++++++++++--------- 1 file changed, 65 insertions(+), 20 deletions(-) diff --git a/cns/nodesubnet/ip_fetcher.go b/cns/nodesubnet/ip_fetcher.go index 5c2233786d..f057e93bf0 100644 --- a/cns/nodesubnet/ip_fetcher.go +++ b/cns/nodesubnet/ip_fetcher.go @@ -10,6 +10,13 @@ import ( "github.com/pkg/errors" ) +const ( + // Minimum time between secondary IP fetches + MinRefreshInterval = 4 * time.Second + // Maximum time between secondary IP fetches + MaxRefreshInterval = 1024 * time.Second +) + var ErrRefreshSkipped = errors.New("refresh skipped due to throttling") // InterfaceRetriever is an interface is implemented by the NMAgent Client, and also a mock client for testing. @@ -17,39 +24,76 @@ type InterfaceRetriever interface { GetInterfaceIPInfo(ctx context.Context) (nmagent.Interfaces, error) } -type IPFetcher struct { - // Node subnet state - secondaryIPQueryInterval time.Duration // Minimum time between secondary IP fetches - secondaryIPLastRefreshTime time.Time // Time of last secondary IP fetch +// SecondaryIPConsumer is an interface implemented by whoever consumes the secondary IPs fetched in nodesubnet +type SecondaryIPConsumer interface { + UpdateSecondaryIPsForNodeSubnet(netip.Addr, []netip.Addr) error +} - ipFectcherClient InterfaceRetriever +type IPFetcher struct { + // Node subnet config + intfFetcherClient InterfaceRetriever + ticker *time.Ticker + tickerInterval time.Duration + consumer SecondaryIPConsumer } -func NewIPFetcher(nmaClient InterfaceRetriever, queryInterval time.Duration) *IPFetcher { +func NewIPFetcher(nmaClient InterfaceRetriever, c SecondaryIPConsumer) *IPFetcher { return &IPFetcher{ - ipFectcherClient: nmaClient, - secondaryIPQueryInterval: queryInterval, + intfFetcherClient: nmaClient, + consumer: c, } } -func (c *IPFetcher) RefreshSecondaryIPsIfNeeded(ctx context.Context) (ips []netip.Addr, err error) { - // If secondaryIPQueryInterval has elapsed since the last fetch, fetch secondary IPs - if time.Since(c.secondaryIPLastRefreshTime) < c.secondaryIPQueryInterval { - return nil, ErrRefreshSkipped +func (c *IPFetcher) updateFetchIntervalForNoObservedDiff() { + c.tickerInterval = min(c.tickerInterval*2, MaxRefreshInterval) + c.ticker.Reset(c.tickerInterval) +} + +func (c *IPFetcher) updateFetchIntervalForObservedDiff() { + c.tickerInterval = MinRefreshInterval + c.ticker.Reset(c.tickerInterval) +} + +func (c *IPFetcher) Start(ctx context.Context) { + go func() { + c.tickerInterval = MinRefreshInterval + c.ticker = time.NewTicker(c.tickerInterval) + defer c.ticker.Stop() + + for { + select { + case <-c.ticker.C: + err := c.RefreshSecondaryIPs(ctx) + if err != nil { + log.Printf("Error refreshing secondary IPs: %v", err) + } + case <-ctx.Done(): + log.Println("IPFetcher stopped") + return + } + } + }() +} + +// If secondaryIPQueryInterval has elapsed since the last fetch, fetch secondary IPs +func (c *IPFetcher) RefreshSecondaryIPs(ctx context.Context) error { + response, err := c.intfFetcherClient.GetInterfaceIPInfo(ctx) + if err != nil { + return errors.Wrap(err, "getting interface IPs") } - c.secondaryIPLastRefreshTime = time.Now() - response, err := c.ipFectcherClient.GetInterfaceIPInfo(ctx) + primaryIP, secondaryIPs := flattenIPListFromResponse(&response) + err = c.consumer.UpdateSecondaryIPsForNodeSubnet(primaryIP, secondaryIPs) if err != nil { - return nil, errors.Wrap(err, "getting interface IPs") + return errors.Wrap(err, "updating secondary IPs") } - res := flattenIPListFromResponse(&response) - return res, nil + return nil } // Get the list of secondary IPs from fetched Interfaces -func flattenIPListFromResponse(resp *nmagent.Interfaces) (res []netip.Addr) { +func flattenIPListFromResponse(resp *nmagent.Interfaces) (primary netip.Addr, secondaryIPs []netip.Addr) { + var primaryIP netip.Addr // For each interface... for _, intf := range resp.Entries { if !intf.IsPrimary { @@ -63,15 +107,16 @@ func flattenIPListFromResponse(resp *nmagent.Interfaces) (res []netip.Addr) { for _, a := range s.IPAddress { // Primary addresses are reserved for the host. if a.IsPrimary { + primaryIP = netip.Addr(a.Address) continue } - res = append(res, netip.Addr(a.Address)) + secondaryIPs = append(secondaryIPs, netip.Addr(a.Address)) addressCount++ } log.Printf("Got %d addresses from subnet %s", addressCount, s.Prefix) } } - return res + return primaryIP, secondaryIPs } From 8f0035470eac82ef673b66f99ab98ff9a8a61b69 Mon Sep 17 00:00:00 2001 From: Santhosh Prabhu Date: Thu, 19 Sep 2024 14:09:06 -0700 Subject: [PATCH 02/28] feat(CNS): Update NMAgent fetches to be async with binary exponential backoff --- cns/nodesubnet/helper_for_ip_fetcher_test.go | 13 +- cns/nodesubnet/ip_fetcher.go | 81 +++++++--- cns/nodesubnet/ip_fetcher_test.go | 155 ++++++++++++------- cns/nodesubnet/refreshticker.go | 37 +++++ 4 files changed, 206 insertions(+), 80 deletions(-) create mode 100644 cns/nodesubnet/refreshticker.go diff --git a/cns/nodesubnet/helper_for_ip_fetcher_test.go b/cns/nodesubnet/helper_for_ip_fetcher_test.go index f8eda641f4..aa3f2fd289 100644 --- a/cns/nodesubnet/helper_for_ip_fetcher_test.go +++ b/cns/nodesubnet/helper_for_ip_fetcher_test.go @@ -2,8 +2,13 @@ package nodesubnet import "time" -// This method is in this file (_test.go) because it is a test helper method. -// The following method is built during tests, and is not part of the main code. -func (c *IPFetcher) SetSecondaryIPQueryInterval(interval time.Duration) { - c.secondaryIPQueryInterval = interval +// These methods is in this file (_test.go) because they are helpers. They are +// built during tests, and are not part of the main code. + +func (c *IPFetcher) GetCurrentQueryInterval() time.Duration { + return c.tickerInterval +} + +func (c *IPFetcher) SetTicker(tickProvider TickProvider) { + c.ticker = tickProvider } diff --git a/cns/nodesubnet/ip_fetcher.go b/cns/nodesubnet/ip_fetcher.go index f057e93bf0..841411f6cf 100644 --- a/cns/nodesubnet/ip_fetcher.go +++ b/cns/nodesubnet/ip_fetcher.go @@ -11,10 +11,10 @@ import ( ) const ( - // Minimum time between secondary IP fetches - MinRefreshInterval = 4 * time.Second - // Maximum time between secondary IP fetches - MaxRefreshInterval = 1024 * time.Second + // Default minimum time between secondary IP fetches + DefaultMinRefreshInterval = 4 * time.Second + // Default maximum time between secondary IP fetches + DefaultMaxRefreshInterval = 1024 * time.Second ) var ErrRefreshSkipped = errors.New("refresh skipped due to throttling") @@ -24,45 +24,78 @@ type InterfaceRetriever interface { GetInterfaceIPInfo(ctx context.Context) (nmagent.Interfaces, error) } -// SecondaryIPConsumer is an interface implemented by whoever consumes the secondary IPs fetched in nodesubnet -type SecondaryIPConsumer interface { - UpdateSecondaryIPsForNodeSubnet(netip.Addr, []netip.Addr) error +// IPConsumer is an interface implemented by whoever consumes the secondary IPs fetched in nodesubnet +type IPConsumer interface { + UpdateIPsForNodeSubnet(netip.Addr, []netip.Addr) error } type IPFetcher struct { // Node subnet config - intfFetcherClient InterfaceRetriever - ticker *time.Ticker - tickerInterval time.Duration - consumer SecondaryIPConsumer + intfFetcherClient InterfaceRetriever + ticker TickProvider + tickerInterval time.Duration + consumer IPConsumer + minRefreshInterval time.Duration + maxRefreshInterval time.Duration } -func NewIPFetcher(nmaClient InterfaceRetriever, c SecondaryIPConsumer) *IPFetcher { +// NewIPFetcher creates a new IPFetcher. If minInterval is 0, it will default to 4 seconds. +// If maxInterval is 0, it will default to 1024 seconds (or minInterval, if it is higher). +func NewIPFetcher( + client InterfaceRetriever, + consumer IPConsumer, + minInterval time.Duration, + maxInterval time.Duration, +) *IPFetcher { + if minInterval == 0 { + minInterval = DefaultMinRefreshInterval + } + + if maxInterval == 0 { + maxInterval = DefaultMaxRefreshInterval + } + + maxInterval = max(maxInterval, minInterval) + return &IPFetcher{ - intfFetcherClient: nmaClient, - consumer: c, + intfFetcherClient: client, + consumer: consumer, + minRefreshInterval: minInterval, + maxRefreshInterval: maxInterval, + tickerInterval: minInterval, } } -func (c *IPFetcher) updateFetchIntervalForNoObservedDiff() { - c.tickerInterval = min(c.tickerInterval*2, MaxRefreshInterval) - c.ticker.Reset(c.tickerInterval) +func (c *IPFetcher) UpdateFetchIntervalForNoObservedDiff() { + c.tickerInterval = min(c.tickerInterval*2, c.maxRefreshInterval) + + if c.ticker != nil { + c.ticker.Reset(c.tickerInterval) + } } -func (c *IPFetcher) updateFetchIntervalForObservedDiff() { - c.tickerInterval = MinRefreshInterval - c.ticker.Reset(c.tickerInterval) +func (c *IPFetcher) UpdateFetchIntervalForObservedDiff() { + c.tickerInterval = c.minRefreshInterval + + if c.ticker != nil { + c.ticker.Reset(c.tickerInterval) + } } func (c *IPFetcher) Start(ctx context.Context) { go func() { - c.tickerInterval = MinRefreshInterval - c.ticker = time.NewTicker(c.tickerInterval) + // Do an initial fetch + c.RefreshSecondaryIPs(ctx) + + if c.ticker == nil { + c.ticker = NewTimedTickProvider(c.tickerInterval) + } + defer c.ticker.Stop() for { select { - case <-c.ticker.C: + case <-c.ticker.C(): err := c.RefreshSecondaryIPs(ctx) if err != nil { log.Printf("Error refreshing secondary IPs: %v", err) @@ -83,7 +116,7 @@ func (c *IPFetcher) RefreshSecondaryIPs(ctx context.Context) error { } primaryIP, secondaryIPs := flattenIPListFromResponse(&response) - err = c.consumer.UpdateSecondaryIPsForNodeSubnet(primaryIP, secondaryIPs) + err = c.consumer.UpdateIPsForNodeSubnet(primaryIP, secondaryIPs) if err != nil { return errors.Wrap(err, "updating secondary IPs") } diff --git a/cns/nodesubnet/ip_fetcher_test.go b/cns/nodesubnet/ip_fetcher_test.go index 6a2e425126..f541011274 100644 --- a/cns/nodesubnet/ip_fetcher_test.go +++ b/cns/nodesubnet/ip_fetcher_test.go @@ -2,7 +2,8 @@ package nodesubnet_test import ( "context" - "errors" + "net/netip" + "sync/atomic" "testing" "time" @@ -12,55 +13,117 @@ import ( // Mock client that simply tracks if refresh has been called type TestClient struct { - fetchCalled bool + refreshCount int32 } // Mock refresh func (c *TestClient) GetInterfaceIPInfo(_ context.Context) (nmagent.Interfaces, error) { - c.fetchCalled = true + atomic.AddInt32(&c.refreshCount, 1) return nmagent.Interfaces{}, nil } -func TestRefreshSecondaryIPsIfNeeded(t *testing.T) { - getTests := []struct { - name string - shouldCall bool - interval time.Duration - }{ - { - "fetch called", - true, - -1 * time.Second, // Negative timeout to force refresh - }, - { - "no refresh needed", - false, - 10 * time.Hour, // High timeout to avoid refresh - }, +var _ nodesubnet.InterfaceRetriever = &TestClient{} + +// Mock client that simply consumes fetched IPs +type TestConsumer struct { + consumeCount int32 +} + +// Mock IP update +func (c *TestConsumer) UpdateIPsForNodeSubnet(_ netip.Addr, _ []netip.Addr) error { + atomic.AddInt32(&c.consumeCount, 1) + return nil +} + +var _ nodesubnet.IPConsumer = &TestConsumer{} + +// MockTickProvider is a mock implementation of the TickProvider interface +type MockTickProvider struct { + tickChan chan time.Time + currentDuration time.Duration +} + +// NewMockTickProvider creates a new MockTickProvider +func NewMockTickProvider() *MockTickProvider { + return &MockTickProvider{ + tickChan: make(chan time.Time, 1), + } +} + +// C returns the channel on which ticks are delivered +func (m *MockTickProvider) C() <-chan time.Time { + return m.tickChan +} + +// Stop stops the ticker +func (m *MockTickProvider) Stop() { + close(m.tickChan) +} + +// Tick manually sends a tick to the channel +func (m *MockTickProvider) Tick() { + m.tickChan <- time.Now() +} + +func (m *MockTickProvider) Reset(d time.Duration) { + m.currentDuration = d +} + +var _ nodesubnet.TickProvider = &MockTickProvider{} + +func TestRefresh(t *testing.T) { + clientPtr := &TestClient{} + consumerPtr := &TestConsumer{} + fetcher := nodesubnet.NewIPFetcher(clientPtr, consumerPtr, 0, 0) + ticker := NewMockTickProvider() + fetcher.SetTicker(ticker) + ctx, cancel := testContext(t) + defer cancel() + fetcher.Start(ctx) + ticker.Tick() // Trigger a refresh + ticker.Tick() // This tick will be read only after previous refresh is done + ticker.Tick() // This call will block until the prevous tick is read + + // At least 2 refreshes - one initial and one after the first tick should be done + if atomic.LoadInt32(&clientPtr.refreshCount) < 2 { + t.Error("Not enough refreshes") } + // At least 2 consumes - one initial and one after the first tick should be done + if atomic.LoadInt32(&consumerPtr.consumeCount) < 2 { + t.Error("Not enough consumes") + } +} + +func TestIntervalUpdate(t *testing.T) { clientPtr := &TestClient{} - fetcher := nodesubnet.NewIPFetcher(clientPtr, 0) - - for _, test := range getTests { - test := test - t.Run(test.name, func(t *testing.T) { // Do not parallelize, as we are using a shared client - fetcher.SetSecondaryIPQueryInterval(test.interval) - ctx, cancel := testContext(t) - defer cancel() - clientPtr.fetchCalled = false - _, err := fetcher.RefreshSecondaryIPsIfNeeded(ctx) - - if test.shouldCall { - if err != nil && errors.Is(err, nodesubnet.ErrRefreshSkipped) { - t.Error("refresh expected, but didn't happen") - } - - checkErr(t, err, false) - } else if err == nil || !errors.Is(err, nodesubnet.ErrRefreshSkipped) { - t.Error("refresh not expected, but happened") - } - }) + consumerPtr := &TestConsumer{} + fetcher := nodesubnet.NewIPFetcher(clientPtr, consumerPtr, 0, 0) + interval := fetcher.GetCurrentQueryInterval() + ticker := NewMockTickProvider() + fetcher.SetTicker(ticker) + + if interval != nodesubnet.DefaultMinRefreshInterval { + t.Error("Default min interval not used") + } + + for range 10 { + fetcher.UpdateFetchIntervalForNoObservedDiff() + exp := interval * 2 + if interval == nodesubnet.DefaultMaxRefreshInterval { + exp = nodesubnet.DefaultMaxRefreshInterval + } + if fetcher.GetCurrentQueryInterval() != exp || ticker.currentDuration != exp { + t.Error("Interval not updated correctly") + } else { + interval = exp + } + } + + fetcher.UpdateFetchIntervalForObservedDiff() + + if fetcher.GetCurrentQueryInterval() != nodesubnet.DefaultMinRefreshInterval || ticker.currentDuration != nodesubnet.DefaultMinRefreshInterval { + t.Error("Observed diff update incorrect") } } @@ -72,15 +135,3 @@ func testContext(t *testing.T) (context.Context, context.CancelFunc) { } return context.WithCancel(context.Background()) } - -// checkErr is an assertion of the presence or absence of an error -func checkErr(t *testing.T, err error, shouldErr bool) { - t.Helper() - if err != nil && !shouldErr { - t.Fatal("unexpected error: err:", err) - } - - if err == nil && shouldErr { - t.Fatal("expected error but received none") - } -} diff --git a/cns/nodesubnet/refreshticker.go b/cns/nodesubnet/refreshticker.go new file mode 100644 index 0000000000..015d47b5d6 --- /dev/null +++ b/cns/nodesubnet/refreshticker.go @@ -0,0 +1,37 @@ +package nodesubnet + +import "time" + +// TickProvider defines a wrapper for time.Ticker +type TickProvider interface { + Stop() + Reset(d time.Duration) + C() <-chan time.Time +} + +// TimedTickProvider wraps a time.Ticker to implement TickProvider +type TimedTickProvider struct { + ticker time.Ticker +} + +var _ TickProvider = &TimedTickProvider{} + +// NewTickerWrapper creates a new TickerWrapper +func NewTimedTickProvider(d time.Duration) *TimedTickProvider { + return &TimedTickProvider{ticker: *time.NewTicker(d)} +} + +// Stop stops the ticker +func (tw *TimedTickProvider) Stop() { + tw.ticker.Stop() +} + +// Reset resets the ticker with a new duration +func (tw *TimedTickProvider) Reset(d time.Duration) { + tw.ticker.Reset(d) +} + +// C returns the ticker's channel +func (tw *TimedTickProvider) C() <-chan time.Time { + return tw.ticker.C +} From 187cb6612bc38202cf1aef076eb562799566fb33 Mon Sep 17 00:00:00 2001 From: Santhosh Prabhu Date: Thu, 19 Sep 2024 14:15:47 -0700 Subject: [PATCH 03/28] chore: check for empty nmagent response --- cns/nodesubnet/ip_fetcher.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/cns/nodesubnet/ip_fetcher.go b/cns/nodesubnet/ip_fetcher.go index 841411f6cf..533ec98ade 100644 --- a/cns/nodesubnet/ip_fetcher.go +++ b/cns/nodesubnet/ip_fetcher.go @@ -115,6 +115,10 @@ func (c *IPFetcher) RefreshSecondaryIPs(ctx context.Context) error { return errors.Wrap(err, "getting interface IPs") } + if len(response.Entries) == 0 { + return errors.New("no interfaces found in response from NMAgent") + } + primaryIP, secondaryIPs := flattenIPListFromResponse(&response) err = c.consumer.UpdateIPsForNodeSubnet(primaryIP, secondaryIPs) if err != nil { From e21a29f9a542514bcf961fe735dbc5aefbb124aa Mon Sep 17 00:00:00 2001 From: Santhosh Prabhu Date: Thu, 19 Sep 2024 14:21:32 -0700 Subject: [PATCH 04/28] test: update test for empty response --- cns/nodesubnet/ip_fetcher_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cns/nodesubnet/ip_fetcher_test.go b/cns/nodesubnet/ip_fetcher_test.go index f541011274..d6bfa6e257 100644 --- a/cns/nodesubnet/ip_fetcher_test.go +++ b/cns/nodesubnet/ip_fetcher_test.go @@ -90,8 +90,8 @@ func TestRefresh(t *testing.T) { } // At least 2 consumes - one initial and one after the first tick should be done - if atomic.LoadInt32(&consumerPtr.consumeCount) < 2 { - t.Error("Not enough consumes") + if atomic.LoadInt32(&consumerPtr.consumeCount) > 0 { + t.Error("Consume called unexpectedly, shouldn't be called since responses are empty") } } From c7e88fb46446bb16a5439d00ee84958c7b3744b4 Mon Sep 17 00:00:00 2001 From: Santhosh Prabhu Date: Thu, 19 Sep 2024 14:39:25 -0700 Subject: [PATCH 05/28] style: make linter happy --- cns/nodesubnet/ip_fetcher.go | 7 +++++-- cns/nodesubnet/ip_fetcher_test.go | 2 +- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/cns/nodesubnet/ip_fetcher.go b/cns/nodesubnet/ip_fetcher.go index 533ec98ade..04a76593da 100644 --- a/cns/nodesubnet/ip_fetcher.go +++ b/cns/nodesubnet/ip_fetcher.go @@ -67,7 +67,7 @@ func NewIPFetcher( } func (c *IPFetcher) UpdateFetchIntervalForNoObservedDiff() { - c.tickerInterval = min(c.tickerInterval*2, c.maxRefreshInterval) + c.tickerInterval = min(c.tickerInterval*2, c.maxRefreshInterval) //nolint:gomnd // doubling interval if c.ticker != nil { c.ticker.Reset(c.tickerInterval) @@ -85,7 +85,10 @@ func (c *IPFetcher) UpdateFetchIntervalForObservedDiff() { func (c *IPFetcher) Start(ctx context.Context) { go func() { // Do an initial fetch - c.RefreshSecondaryIPs(ctx) + err := c.RefreshSecondaryIPs(ctx) + if err != nil { + log.Printf("Error refreshing secondary IPs: %v", err) + } if c.ticker == nil { c.ticker = NewTimedTickProvider(c.tickerInterval) diff --git a/cns/nodesubnet/ip_fetcher_test.go b/cns/nodesubnet/ip_fetcher_test.go index d6bfa6e257..1fdc9e393f 100644 --- a/cns/nodesubnet/ip_fetcher_test.go +++ b/cns/nodesubnet/ip_fetcher_test.go @@ -107,7 +107,7 @@ func TestIntervalUpdate(t *testing.T) { t.Error("Default min interval not used") } - for range 10 { + for i := 1; i <= 10; i++ { fetcher.UpdateFetchIntervalForNoObservedDiff() exp := interval * 2 if interval == nodesubnet.DefaultMaxRefreshInterval { From 80119723945efce443f62098fb7db1468a12a0ee Mon Sep 17 00:00:00 2001 From: Santhosh Prabhu Date: Thu, 19 Sep 2024 15:19:05 -0700 Subject: [PATCH 06/28] chore: fix some comments --- cns/nodesubnet/ip_fetcher.go | 2 +- cns/nodesubnet/ip_fetcher_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cns/nodesubnet/ip_fetcher.go b/cns/nodesubnet/ip_fetcher.go index 04a76593da..d3c8b11b50 100644 --- a/cns/nodesubnet/ip_fetcher.go +++ b/cns/nodesubnet/ip_fetcher.go @@ -111,7 +111,7 @@ func (c *IPFetcher) Start(ctx context.Context) { }() } -// If secondaryIPQueryInterval has elapsed since the last fetch, fetch secondary IPs +// Fetch IPs from NMAgent and pass to the consumer func (c *IPFetcher) RefreshSecondaryIPs(ctx context.Context) error { response, err := c.intfFetcherClient.GetInterfaceIPInfo(ctx) if err != nil { diff --git a/cns/nodesubnet/ip_fetcher_test.go b/cns/nodesubnet/ip_fetcher_test.go index 1fdc9e393f..3614661782 100644 --- a/cns/nodesubnet/ip_fetcher_test.go +++ b/cns/nodesubnet/ip_fetcher_test.go @@ -89,7 +89,7 @@ func TestRefresh(t *testing.T) { t.Error("Not enough refreshes") } - // At least 2 consumes - one initial and one after the first tick should be done + // No consumes, since the responses are empty if atomic.LoadInt32(&consumerPtr.consumeCount) > 0 { t.Error("Consume called unexpectedly, shouldn't be called since responses are empty") } From b20588e98e3baafc45a8fc754aed12c52519dd51 Mon Sep 17 00:00:00 2001 From: Santhosh Prabhu Date: Fri, 20 Sep 2024 15:46:40 -0700 Subject: [PATCH 07/28] fix: Fix bug in refresh --- cns/nodesubnet/refreshticker.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cns/nodesubnet/refreshticker.go b/cns/nodesubnet/refreshticker.go index 015d47b5d6..470b1002f2 100644 --- a/cns/nodesubnet/refreshticker.go +++ b/cns/nodesubnet/refreshticker.go @@ -11,14 +11,14 @@ type TickProvider interface { // TimedTickProvider wraps a time.Ticker to implement TickProvider type TimedTickProvider struct { - ticker time.Ticker + ticker *time.Ticker } var _ TickProvider = &TimedTickProvider{} // NewTickerWrapper creates a new TickerWrapper func NewTimedTickProvider(d time.Duration) *TimedTickProvider { - return &TimedTickProvider{ticker: *time.NewTicker(d)} + return &TimedTickProvider{ticker: time.NewTicker(d)} } // Stop stops the ticker From 4c7394d3ccf518684effe1a7cb32d55104b0de4a Mon Sep 17 00:00:00 2001 From: Santhosh Prabhu Date: Tue, 24 Sep 2024 11:50:13 -0700 Subject: [PATCH 08/28] refactor: Address comments --- cns/nodesubnet/helper_for_ip_fetcher_test.go | 8 +- cns/nodesubnet/ip_fetcher.go | 17 +++- cns/nodesubnet/ip_fetcher_test.go | 84 +++++++++---------- refreshticker/mocktickprovider.go | 41 +++++++++ .../refreshticker.go | 6 +- 5 files changed, 104 insertions(+), 52 deletions(-) create mode 100644 refreshticker/mocktickprovider.go rename {cns/nodesubnet => refreshticker}/refreshticker.go (79%) diff --git a/cns/nodesubnet/helper_for_ip_fetcher_test.go b/cns/nodesubnet/helper_for_ip_fetcher_test.go index aa3f2fd289..11fcc9d8af 100644 --- a/cns/nodesubnet/helper_for_ip_fetcher_test.go +++ b/cns/nodesubnet/helper_for_ip_fetcher_test.go @@ -1,6 +1,10 @@ package nodesubnet -import "time" +import ( + "time" + + "github.com/Azure/azure-container-networking/refreshticker" +) // These methods is in this file (_test.go) because they are helpers. They are // built during tests, and are not part of the main code. @@ -9,6 +13,6 @@ func (c *IPFetcher) GetCurrentQueryInterval() time.Duration { return c.tickerInterval } -func (c *IPFetcher) SetTicker(tickProvider TickProvider) { +func (c *IPFetcher) SetTicker(tickProvider refreshticker.TickProvider) { c.ticker = tickProvider } diff --git a/cns/nodesubnet/ip_fetcher.go b/cns/nodesubnet/ip_fetcher.go index d3c8b11b50..72042b88db 100644 --- a/cns/nodesubnet/ip_fetcher.go +++ b/cns/nodesubnet/ip_fetcher.go @@ -7,6 +7,7 @@ import ( "time" "github.com/Azure/azure-container-networking/nmagent" + "github.com/Azure/azure-container-networking/refreshticker" "github.com/pkg/errors" ) @@ -29,10 +30,15 @@ type IPConsumer interface { UpdateIPsForNodeSubnet(netip.Addr, []netip.Addr) error } +// IPFetcher fetches secondary IPs from NMAgent at regular intervals. The +// interval will vary within the range of minRefreshInterval and +// maxRefreshInterval. When no diff is observed after a fetch, the interval +// doubles (subject to the maximum interval). When a diff is observed, the +// interval resets to the minimum. type IPFetcher struct { // Node subnet config intfFetcherClient InterfaceRetriever - ticker TickProvider + ticker refreshticker.TickProvider tickerInterval time.Duration consumer IPConsumer minRefreshInterval time.Duration @@ -66,6 +72,8 @@ func NewIPFetcher( } } +// UpdateFetchIntervalForNoObservedDiff informs IPFetcher that no diff was observed in the last fetch. +// In the current design, this doubles the fetch interval, subject to the maximum interval. func (c *IPFetcher) UpdateFetchIntervalForNoObservedDiff() { c.tickerInterval = min(c.tickerInterval*2, c.maxRefreshInterval) //nolint:gomnd // doubling interval @@ -74,6 +82,8 @@ func (c *IPFetcher) UpdateFetchIntervalForNoObservedDiff() { } } +// UpdateFetchIntervalForNoObservedDiff informs IPFetcher that a diff was observed in the last fetch. +// In the current design, this resets the fetch interval to the minimum. func (c *IPFetcher) UpdateFetchIntervalForObservedDiff() { c.tickerInterval = c.minRefreshInterval @@ -82,6 +92,7 @@ func (c *IPFetcher) UpdateFetchIntervalForObservedDiff() { } } +// Start the IPFetcher. func (c *IPFetcher) Start(ctx context.Context) { go func() { // Do an initial fetch @@ -91,7 +102,7 @@ func (c *IPFetcher) Start(ctx context.Context) { } if c.ticker == nil { - c.ticker = NewTimedTickProvider(c.tickerInterval) + c.ticker = refreshticker.NewTimedTickProvider(c.tickerInterval) } defer c.ticker.Stop() @@ -104,7 +115,7 @@ func (c *IPFetcher) Start(ctx context.Context) { log.Printf("Error refreshing secondary IPs: %v", err) } case <-ctx.Done(): - log.Println("IPFetcher stopped") + log.Printf("IPFetcher stopped") return } } diff --git a/cns/nodesubnet/ip_fetcher_test.go b/cns/nodesubnet/ip_fetcher_test.go index 3614661782..b69ec0bfd7 100644 --- a/cns/nodesubnet/ip_fetcher_test.go +++ b/cns/nodesubnet/ip_fetcher_test.go @@ -3,22 +3,37 @@ package nodesubnet_test import ( "context" "net/netip" - "sync/atomic" + "sync" "testing" - "time" "github.com/Azure/azure-container-networking/cns/nodesubnet" "github.com/Azure/azure-container-networking/nmagent" + "github.com/Azure/azure-container-networking/refreshticker" ) // Mock client that simply tracks if refresh has been called type TestClient struct { refreshCount int32 + mu sync.Mutex +} + +// FetchRefreshCount atomically fetches the refresh count +func (c *TestClient) FetchRefreshCount() int32 { + c.mu.Lock() + defer c.mu.Unlock() + return c.refreshCount +} + +// UpdateRefreshCount atomically updates the refresh count +func (c *TestClient) UpdateRefreshCount() { + c.mu.Lock() + defer c.mu.Unlock() + c.refreshCount++ } // Mock refresh func (c *TestClient) GetInterfaceIPInfo(_ context.Context) (nmagent.Interfaces, error) { - atomic.AddInt32(&c.refreshCount, 1) + c.UpdateRefreshCount() return nmagent.Interfaces{}, nil } @@ -27,55 +42,36 @@ var _ nodesubnet.InterfaceRetriever = &TestClient{} // Mock client that simply consumes fetched IPs type TestConsumer struct { consumeCount int32 + mu sync.Mutex } -// Mock IP update -func (c *TestConsumer) UpdateIPsForNodeSubnet(_ netip.Addr, _ []netip.Addr) error { - atomic.AddInt32(&c.consumeCount, 1) - return nil -} - -var _ nodesubnet.IPConsumer = &TestConsumer{} - -// MockTickProvider is a mock implementation of the TickProvider interface -type MockTickProvider struct { - tickChan chan time.Time - currentDuration time.Duration -} - -// NewMockTickProvider creates a new MockTickProvider -func NewMockTickProvider() *MockTickProvider { - return &MockTickProvider{ - tickChan: make(chan time.Time, 1), - } -} - -// C returns the channel on which ticks are delivered -func (m *MockTickProvider) C() <-chan time.Time { - return m.tickChan +// FetchConsumeCount atomically fetches the consume count +func (c *TestConsumer) FetchConsumeCount() int32 { + c.mu.Lock() + defer c.mu.Unlock() + return c.consumeCount } -// Stop stops the ticker -func (m *MockTickProvider) Stop() { - close(m.tickChan) +// UpdateConsumeCount atomically updates the consume count +func (c *TestConsumer) UpdateConsumeCount() { + c.mu.Lock() + defer c.mu.Unlock() + c.consumeCount++ } -// Tick manually sends a tick to the channel -func (m *MockTickProvider) Tick() { - m.tickChan <- time.Now() -} - -func (m *MockTickProvider) Reset(d time.Duration) { - m.currentDuration = d +// Mock IP update +func (c *TestConsumer) UpdateIPsForNodeSubnet(_ netip.Addr, _ []netip.Addr) error { + c.UpdateConsumeCount() + return nil } -var _ nodesubnet.TickProvider = &MockTickProvider{} +var _ nodesubnet.IPConsumer = &TestConsumer{} func TestRefresh(t *testing.T) { clientPtr := &TestClient{} consumerPtr := &TestConsumer{} fetcher := nodesubnet.NewIPFetcher(clientPtr, consumerPtr, 0, 0) - ticker := NewMockTickProvider() + ticker := refreshticker.NewMockTickProvider() fetcher.SetTicker(ticker) ctx, cancel := testContext(t) defer cancel() @@ -85,12 +81,12 @@ func TestRefresh(t *testing.T) { ticker.Tick() // This call will block until the prevous tick is read // At least 2 refreshes - one initial and one after the first tick should be done - if atomic.LoadInt32(&clientPtr.refreshCount) < 2 { + if clientPtr.FetchRefreshCount() < 2 { t.Error("Not enough refreshes") } // No consumes, since the responses are empty - if atomic.LoadInt32(&consumerPtr.consumeCount) > 0 { + if consumerPtr.FetchConsumeCount() > 0 { t.Error("Consume called unexpectedly, shouldn't be called since responses are empty") } } @@ -100,7 +96,7 @@ func TestIntervalUpdate(t *testing.T) { consumerPtr := &TestConsumer{} fetcher := nodesubnet.NewIPFetcher(clientPtr, consumerPtr, 0, 0) interval := fetcher.GetCurrentQueryInterval() - ticker := NewMockTickProvider() + ticker := refreshticker.NewMockTickProvider() fetcher.SetTicker(ticker) if interval != nodesubnet.DefaultMinRefreshInterval { @@ -113,7 +109,7 @@ func TestIntervalUpdate(t *testing.T) { if interval == nodesubnet.DefaultMaxRefreshInterval { exp = nodesubnet.DefaultMaxRefreshInterval } - if fetcher.GetCurrentQueryInterval() != exp || ticker.currentDuration != exp { + if fetcher.GetCurrentQueryInterval() != exp || ticker.GetCurrentDuration() != exp { t.Error("Interval not updated correctly") } else { interval = exp @@ -122,7 +118,7 @@ func TestIntervalUpdate(t *testing.T) { fetcher.UpdateFetchIntervalForObservedDiff() - if fetcher.GetCurrentQueryInterval() != nodesubnet.DefaultMinRefreshInterval || ticker.currentDuration != nodesubnet.DefaultMinRefreshInterval { + if fetcher.GetCurrentQueryInterval() != nodesubnet.DefaultMinRefreshInterval || ticker.GetCurrentDuration() != nodesubnet.DefaultMinRefreshInterval { t.Error("Observed diff update incorrect") } } diff --git a/refreshticker/mocktickprovider.go b/refreshticker/mocktickprovider.go new file mode 100644 index 0000000000..a5ae3f88d6 --- /dev/null +++ b/refreshticker/mocktickprovider.go @@ -0,0 +1,41 @@ +package refreshticker + +import "time" + +// MockTickProvider is a mock implementation of the TickProvider interface +type MockTickProvider struct { + tickChan chan time.Time + currentDuration time.Duration +} + +// NewMockTickProvider creates a new MockTickProvider +func NewMockTickProvider() *MockTickProvider { + return &MockTickProvider{ + tickChan: make(chan time.Time, 1), + } +} + +// C returns the channel on which ticks are delivered +func (m *MockTickProvider) C() <-chan time.Time { + return m.tickChan +} + +// Stop stops the ticker +func (m *MockTickProvider) Stop() { + close(m.tickChan) +} + +// Tick manually sends a tick to the channel +func (m *MockTickProvider) Tick() { + m.tickChan <- time.Now() +} + +func (m *MockTickProvider) Reset(d time.Duration) { + m.currentDuration = d +} + +func (m *MockTickProvider) GetCurrentDuration() time.Duration { + return m.currentDuration +} + +var _ TickProvider = &MockTickProvider{} diff --git a/cns/nodesubnet/refreshticker.go b/refreshticker/refreshticker.go similarity index 79% rename from cns/nodesubnet/refreshticker.go rename to refreshticker/refreshticker.go index 470b1002f2..0c7f248024 100644 --- a/cns/nodesubnet/refreshticker.go +++ b/refreshticker/refreshticker.go @@ -1,8 +1,8 @@ -package nodesubnet +package refreshticker import "time" -// TickProvider defines a wrapper for time.Ticker +// TickProvider defines an interface for a type that provides a channel that ticks at a regular interval type TickProvider interface { Stop() Reset(d time.Duration) @@ -16,7 +16,7 @@ type TimedTickProvider struct { var _ TickProvider = &TimedTickProvider{} -// NewTickerWrapper creates a new TickerWrapper +// NewTimedTickProvider creates a new TimedTickProvider func NewTimedTickProvider(d time.Duration) *TimedTickProvider { return &TimedTickProvider{ticker: time.NewTicker(d)} } From 3b07378166a5a82243a5a79fbc56ed1128caa9b6 Mon Sep 17 00:00:00 2001 From: Santhosh Prabhu Date: Wed, 25 Sep 2024 10:28:07 -0700 Subject: [PATCH 09/28] refactor: ignore primary ip --- cns/nodesubnet/ip_fetcher.go | 6 +++--- cns/nodesubnet/ip_fetcher_test.go | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/cns/nodesubnet/ip_fetcher.go b/cns/nodesubnet/ip_fetcher.go index 72042b88db..e11bbea742 100644 --- a/cns/nodesubnet/ip_fetcher.go +++ b/cns/nodesubnet/ip_fetcher.go @@ -27,7 +27,7 @@ type InterfaceRetriever interface { // IPConsumer is an interface implemented by whoever consumes the secondary IPs fetched in nodesubnet type IPConsumer interface { - UpdateIPsForNodeSubnet(netip.Addr, []netip.Addr) error + UpdateIPsForNodeSubnet([]netip.Addr) error } // IPFetcher fetches secondary IPs from NMAgent at regular intervals. The @@ -133,8 +133,8 @@ func (c *IPFetcher) RefreshSecondaryIPs(ctx context.Context) error { return errors.New("no interfaces found in response from NMAgent") } - primaryIP, secondaryIPs := flattenIPListFromResponse(&response) - err = c.consumer.UpdateIPsForNodeSubnet(primaryIP, secondaryIPs) + _, secondaryIPs := flattenIPListFromResponse(&response) + err = c.consumer.UpdateIPsForNodeSubnet(secondaryIPs) if err != nil { return errors.Wrap(err, "updating secondary IPs") } diff --git a/cns/nodesubnet/ip_fetcher_test.go b/cns/nodesubnet/ip_fetcher_test.go index b69ec0bfd7..bb889569e0 100644 --- a/cns/nodesubnet/ip_fetcher_test.go +++ b/cns/nodesubnet/ip_fetcher_test.go @@ -60,7 +60,7 @@ func (c *TestConsumer) UpdateConsumeCount() { } // Mock IP update -func (c *TestConsumer) UpdateIPsForNodeSubnet(_ netip.Addr, _ []netip.Addr) error { +func (c *TestConsumer) UpdateIPsForNodeSubnet(_ []netip.Addr) error { c.UpdateConsumeCount() return nil } From 9ad80f517ac83e137519e50c4c6770b13d1369a6 Mon Sep 17 00:00:00 2001 From: Santhosh Prabhu Date: Thu, 26 Sep 2024 21:13:01 -0700 Subject: [PATCH 10/28] refactor: move refresh out of ipfetcher --- cns/nodesubnet/helper_for_ip_fetcher_test.go | 18 --- cns/nodesubnet/ip_fetcher.go | 80 +++---------- cns/nodesubnet/ip_fetcher_test.go | 32 ----- cns/refresh/fetcher.go | 107 +++++++++++++++++ cns/refresh/fetcher_test.go | 111 ++++++++++++++++++ cns/refresh/helper_for_fetcher_test.go | 11 ++ .../refresh}/mocktickprovider.go | 2 +- .../refresh}/refreshticker.go | 2 +- 8 files changed, 245 insertions(+), 118 deletions(-) delete mode 100644 cns/nodesubnet/helper_for_ip_fetcher_test.go create mode 100644 cns/refresh/fetcher.go create mode 100644 cns/refresh/fetcher_test.go create mode 100644 cns/refresh/helper_for_fetcher_test.go rename {refreshticker => cns/refresh}/mocktickprovider.go (97%) rename {refreshticker => cns/refresh}/refreshticker.go (97%) diff --git a/cns/nodesubnet/helper_for_ip_fetcher_test.go b/cns/nodesubnet/helper_for_ip_fetcher_test.go deleted file mode 100644 index 11fcc9d8af..0000000000 --- a/cns/nodesubnet/helper_for_ip_fetcher_test.go +++ /dev/null @@ -1,18 +0,0 @@ -package nodesubnet - -import ( - "time" - - "github.com/Azure/azure-container-networking/refreshticker" -) - -// These methods is in this file (_test.go) because they are helpers. They are -// built during tests, and are not part of the main code. - -func (c *IPFetcher) GetCurrentQueryInterval() time.Duration { - return c.tickerInterval -} - -func (c *IPFetcher) SetTicker(tickProvider refreshticker.TickProvider) { - c.ticker = tickProvider -} diff --git a/cns/nodesubnet/ip_fetcher.go b/cns/nodesubnet/ip_fetcher.go index e11bbea742..df54755603 100644 --- a/cns/nodesubnet/ip_fetcher.go +++ b/cns/nodesubnet/ip_fetcher.go @@ -7,7 +7,7 @@ import ( "time" "github.com/Azure/azure-container-networking/nmagent" - "github.com/Azure/azure-container-networking/refreshticker" + "github.com/Azure/azure-container-networking/refresh" "github.com/pkg/errors" ) @@ -37,12 +37,9 @@ type IPConsumer interface { // interval resets to the minimum. type IPFetcher struct { // Node subnet config - intfFetcherClient InterfaceRetriever - ticker refreshticker.TickProvider - tickerInterval time.Duration - consumer IPConsumer - minRefreshInterval time.Duration - maxRefreshInterval time.Duration + intfFetcherClient InterfaceRetriever + consumer IPConsumer + fetcher *refresh.Fetcher[nmagent.Interfaces] } // NewIPFetcher creates a new IPFetcher. If minInterval is 0, it will default to 4 seconds. @@ -63,78 +60,29 @@ func NewIPFetcher( maxInterval = max(maxInterval, minInterval) - return &IPFetcher{ - intfFetcherClient: client, - consumer: consumer, - minRefreshInterval: minInterval, - maxRefreshInterval: maxInterval, - tickerInterval: minInterval, - } -} - -// UpdateFetchIntervalForNoObservedDiff informs IPFetcher that no diff was observed in the last fetch. -// In the current design, this doubles the fetch interval, subject to the maximum interval. -func (c *IPFetcher) UpdateFetchIntervalForNoObservedDiff() { - c.tickerInterval = min(c.tickerInterval*2, c.maxRefreshInterval) //nolint:gomnd // doubling interval - - if c.ticker != nil { - c.ticker.Reset(c.tickerInterval) - } -} - -// UpdateFetchIntervalForNoObservedDiff informs IPFetcher that a diff was observed in the last fetch. -// In the current design, this resets the fetch interval to the minimum. -func (c *IPFetcher) UpdateFetchIntervalForObservedDiff() { - c.tickerInterval = c.minRefreshInterval - - if c.ticker != nil { - c.ticker.Reset(c.tickerInterval) + newIPFetcher := &IPFetcher{ + intfFetcherClient: client, + consumer: consumer, + fetcher: nil, } + fetcher := refresh.NewFetcher[nmagent.Interfaces](client.GetInterfaceIPInfo, minInterval, maxInterval, newIPFetcher.ProcessInterfaces) + newIPFetcher.fetcher = fetcher + return newIPFetcher } // Start the IPFetcher. func (c *IPFetcher) Start(ctx context.Context) { - go func() { - // Do an initial fetch - err := c.RefreshSecondaryIPs(ctx) - if err != nil { - log.Printf("Error refreshing secondary IPs: %v", err) - } - - if c.ticker == nil { - c.ticker = refreshticker.NewTimedTickProvider(c.tickerInterval) - } - - defer c.ticker.Stop() - - for { - select { - case <-c.ticker.C(): - err := c.RefreshSecondaryIPs(ctx) - if err != nil { - log.Printf("Error refreshing secondary IPs: %v", err) - } - case <-ctx.Done(): - log.Printf("IPFetcher stopped") - return - } - } - }() + c.fetcher.Start(ctx) } // Fetch IPs from NMAgent and pass to the consumer -func (c *IPFetcher) RefreshSecondaryIPs(ctx context.Context) error { - response, err := c.intfFetcherClient.GetInterfaceIPInfo(ctx) - if err != nil { - return errors.Wrap(err, "getting interface IPs") - } - +func (c *IPFetcher) ProcessInterfaces(response nmagent.Interfaces) error { if len(response.Entries) == 0 { return errors.New("no interfaces found in response from NMAgent") } _, secondaryIPs := flattenIPListFromResponse(&response) - err = c.consumer.UpdateIPsForNodeSubnet(secondaryIPs) + err := c.consumer.UpdateIPsForNodeSubnet(secondaryIPs) if err != nil { return errors.Wrap(err, "updating secondary IPs") } diff --git a/cns/nodesubnet/ip_fetcher_test.go b/cns/nodesubnet/ip_fetcher_test.go index bb889569e0..88fccda13e 100644 --- a/cns/nodesubnet/ip_fetcher_test.go +++ b/cns/nodesubnet/ip_fetcher_test.go @@ -91,38 +91,6 @@ func TestRefresh(t *testing.T) { } } -func TestIntervalUpdate(t *testing.T) { - clientPtr := &TestClient{} - consumerPtr := &TestConsumer{} - fetcher := nodesubnet.NewIPFetcher(clientPtr, consumerPtr, 0, 0) - interval := fetcher.GetCurrentQueryInterval() - ticker := refreshticker.NewMockTickProvider() - fetcher.SetTicker(ticker) - - if interval != nodesubnet.DefaultMinRefreshInterval { - t.Error("Default min interval not used") - } - - for i := 1; i <= 10; i++ { - fetcher.UpdateFetchIntervalForNoObservedDiff() - exp := interval * 2 - if interval == nodesubnet.DefaultMaxRefreshInterval { - exp = nodesubnet.DefaultMaxRefreshInterval - } - if fetcher.GetCurrentQueryInterval() != exp || ticker.GetCurrentDuration() != exp { - t.Error("Interval not updated correctly") - } else { - interval = exp - } - } - - fetcher.UpdateFetchIntervalForObservedDiff() - - if fetcher.GetCurrentQueryInterval() != nodesubnet.DefaultMinRefreshInterval || ticker.GetCurrentDuration() != nodesubnet.DefaultMinRefreshInterval { - t.Error("Observed diff update incorrect") - } -} - // testContext creates a context from the provided testing.T that will be // canceled if the test suite is terminated. func testContext(t *testing.T) (context.Context, context.CancelFunc) { diff --git a/cns/refresh/fetcher.go b/cns/refresh/fetcher.go new file mode 100644 index 0000000000..9e71f3e278 --- /dev/null +++ b/cns/refresh/fetcher.go @@ -0,0 +1,107 @@ +package refresh + +import ( + "context" + "reflect" + "time" + + "github.com/Azure/azure-container-networking/cns/logger" +) + +const ( + DefaultMinInterval = 4 * time.Second + DefaultMaxInterval = 1024 * time.Second +) + +// Fetcher fetches data at regular intervals. The interval will vary within the range of minInterval and +// maxInterval. When no diff is observed after a fetch, the interval doubles (subject to the maximum interval). +// When a diff is observed, the interval resets to the minimum. + +type Fetcher[T any] struct { + fetchFunc func(context.Context) (T, error) + cache T + minInterval time.Duration + maxInterval time.Duration + currentInterval time.Duration + ticker TickProvider + consumeFunc func(T) error +} + +// NewFetcher creates a new Fetcher. If minInterval is 0, it will default to 4 seconds. +func NewFetcher[T any](fetchFunc func(context.Context) (T, error), minInterval, maxInterval time.Duration, consumeFunc func(T) error) *Fetcher[T] { + if minInterval == 0 { + minInterval = DefaultMinInterval + } + + if maxInterval == 0 { + maxInterval = DefaultMaxInterval + } + + maxInterval = max(minInterval, maxInterval) + + return &Fetcher[T]{ + fetchFunc: fetchFunc, + minInterval: minInterval, + maxInterval: maxInterval, + currentInterval: minInterval, + consumeFunc: consumeFunc, + } +} + +func (f *Fetcher[T]) Start(ctx context.Context) { + go func() { + // do an initial fetch + res, err := f.fetchFunc(ctx) + if err != nil { + logger.Printf("Error refreshing secondary IPs: %v", err) + } + + f.cache = res + if f.consumeFunc != nil { + if err := f.consumeFunc(res); err != nil { + logger.Errorf("Error consuming data: %v", err) + } + } + + if f.ticker == nil { + f.ticker = NewTimedTickProvider(f.currentInterval) + } + + defer f.ticker.Stop() + + for { + select { + case <-ctx.Done(): + logger.Printf("Fetcher stopped") + return + case <-f.ticker.C(): + result, err := f.fetchFunc(ctx) + if err != nil { + logger.Errorf("Error fetching data: %v", err) + } else { + if reflect.DeepEqual(result, f.cache) { + f.updateFetchIntervalForNoObservedDiff() + } else { + f.cache = result + f.updateFetchIntervalForObservedDiff() + if f.consumeFunc != nil { + if err := f.consumeFunc(result); err != nil { + logger.Errorf("Error consuming data: %v", err) + } + } + } + } + + f.ticker.Reset(f.currentInterval) + } + } + }() +} + +func (f *Fetcher[T]) updateFetchIntervalForNoObservedDiff() { + f.currentInterval = min(f.currentInterval*2, f.maxInterval) +} + +func (f *Fetcher[T]) updateFetchIntervalForObservedDiff() { + f.currentInterval = f.minInterval +} diff --git a/cns/refresh/fetcher_test.go b/cns/refresh/fetcher_test.go new file mode 100644 index 0000000000..4cc7b760f5 --- /dev/null +++ b/cns/refresh/fetcher_test.go @@ -0,0 +1,111 @@ +package refresh_test + +import ( + "context" + "sync" + "testing" + + "github.com/Azure/azure-container-networking/cns/nodesubnet" + "github.com/Azure/azure-container-networking/nmagent" + "github.com/Azure/azure-container-networking/refresh" +) + +// Mock client that simply tracks if refresh has been called +type TestClient struct { + refreshCount int32 + mu sync.Mutex +} + +// FetchRefreshCount atomically fetches the refresh count +func (c *TestClient) FetchRefreshCount() int32 { + c.mu.Lock() + defer c.mu.Unlock() + return c.refreshCount +} + +// UpdateRefreshCount atomically updates the refresh count +func (c *TestClient) UpdateRefreshCount() { + c.mu.Lock() + defer c.mu.Unlock() + c.refreshCount++ +} + +// Mock refresh +func (c *TestClient) GetInterfaceIPInfo(_ context.Context) (nmagent.Interfaces, error) { + c.UpdateRefreshCount() + return nmagent.Interfaces{}, nil +} + +var _ nodesubnet.InterfaceRetriever = &TestClient{} + +// Mock client that simply consumes fetched IPs +type TestConsumer struct { + consumeCount int32 + mu sync.Mutex +} + +// FetchConsumeCount atomically fetches the consume count +func (c *TestConsumer) FetchConsumeCount() int32 { + c.mu.Lock() + defer c.mu.Unlock() + return c.consumeCount +} + +// UpdateConsumeCount atomically updates the consume count +func (c *TestConsumer) UpdateConsumeCount() { + c.mu.Lock() + defer c.mu.Unlock() + c.consumeCount++ +} + +// Mock IP update +func (c *TestConsumer) ConsumeInterfaces(nmagent.Interfaces) error { + c.UpdateConsumeCount() + return nil +} + +func TestRefresh(t *testing.T) { + clientPtr := &TestClient{} + consumerPtr := &TestConsumer{} + fetcher := refresh.NewFetcher[nmagent.Interfaces](clientPtr.GetInterfaceIPInfo, 0, 0, consumerPtr.ConsumeInterfaces) + ticker := refresh.NewMockTickProvider() + fetcher.SetTicker(ticker) + ctx, cancel := testContext(t) + defer cancel() + fetcher.Start(ctx) + ticker.Tick() // Trigger a refresh + ticker.Tick() // This tick will be read only after previous refresh is done + ticker.Tick() // This call will block until the prevous tick is read + + // At least 2 refreshes - one initial and one after the first tick should be done + if clientPtr.FetchRefreshCount() < 2 { + t.Error("Not enough refreshes") + } + + // At least 2 consumes - one initial and one after the first tick should be done + if consumerPtr.FetchConsumeCount() < 0 { + t.Error("Not enough consumes") + } +} + +func TestInterval(t *testing.T) { + clientPtr := &TestClient{} + consumerPtr := &TestConsumer{} + fetcher := refresh.NewFetcher[nmagent.Interfaces](clientPtr.GetInterfaceIPInfo, 0, 0, consumerPtr.ConsumeInterfaces) + interval := fetcher.GetCurrentInterval() + + if interval != refresh.DefaultMinInterval { + t.Error("Default min interval not used") + } + + // Testing that the interval doubles will require making the interval thread-safe. Not doing that to avoid performance hit. +} + +// testContext creates a context from the provided testing.T that will be +// canceled if the test suite is terminated. +func testContext(t *testing.T) (context.Context, context.CancelFunc) { + if deadline, ok := t.Deadline(); ok { + return context.WithDeadline(context.Background(), deadline) + } + return context.WithCancel(context.Background()) +} diff --git a/cns/refresh/helper_for_fetcher_test.go b/cns/refresh/helper_for_fetcher_test.go new file mode 100644 index 0000000000..add0afa3c8 --- /dev/null +++ b/cns/refresh/helper_for_fetcher_test.go @@ -0,0 +1,11 @@ +package refresh + +import "time" + +func (f *Fetcher[T]) GetCurrentInterval() time.Duration { + return f.currentInterval +} + +func (f *Fetcher[T]) SetTicker(t TickProvider) { + f.ticker = t +} diff --git a/refreshticker/mocktickprovider.go b/cns/refresh/mocktickprovider.go similarity index 97% rename from refreshticker/mocktickprovider.go rename to cns/refresh/mocktickprovider.go index a5ae3f88d6..34b4190b50 100644 --- a/refreshticker/mocktickprovider.go +++ b/cns/refresh/mocktickprovider.go @@ -1,4 +1,4 @@ -package refreshticker +package refresh import "time" diff --git a/refreshticker/refreshticker.go b/cns/refresh/refreshticker.go similarity index 97% rename from refreshticker/refreshticker.go rename to cns/refresh/refreshticker.go index 0c7f248024..20ad268718 100644 --- a/refreshticker/refreshticker.go +++ b/cns/refresh/refreshticker.go @@ -1,4 +1,4 @@ -package refreshticker +package refresh import "time" From 62daac481832d24f967ab99dfc2eb656ba1c279a Mon Sep 17 00:00:00 2001 From: Santhosh Prabhu Date: Thu, 26 Sep 2024 22:49:17 -0700 Subject: [PATCH 11/28] test: add ip fetcher tests --- cns/nodesubnet/ip_fetcher.go | 2 +- cns/nodesubnet/ip_fetcher_test.go | 119 +++++++++++++++--------------- 2 files changed, 59 insertions(+), 62 deletions(-) diff --git a/cns/nodesubnet/ip_fetcher.go b/cns/nodesubnet/ip_fetcher.go index df54755603..330213caa3 100644 --- a/cns/nodesubnet/ip_fetcher.go +++ b/cns/nodesubnet/ip_fetcher.go @@ -6,8 +6,8 @@ import ( "net/netip" "time" + "github.com/Azure/azure-container-networking/cns/refresh" "github.com/Azure/azure-container-networking/nmagent" - "github.com/Azure/azure-container-networking/refresh" "github.com/pkg/errors" ) diff --git a/cns/nodesubnet/ip_fetcher_test.go b/cns/nodesubnet/ip_fetcher_test.go index 88fccda13e..aaac1f3414 100644 --- a/cns/nodesubnet/ip_fetcher_test.go +++ b/cns/nodesubnet/ip_fetcher_test.go @@ -3,87 +3,54 @@ package nodesubnet_test import ( "context" "net/netip" - "sync" "testing" "github.com/Azure/azure-container-networking/cns/nodesubnet" "github.com/Azure/azure-container-networking/nmagent" - "github.com/Azure/azure-container-networking/refreshticker" ) -// Mock client that simply tracks if refresh has been called -type TestClient struct { - refreshCount int32 - mu sync.Mutex -} - -// FetchRefreshCount atomically fetches the refresh count -func (c *TestClient) FetchRefreshCount() int32 { - c.mu.Lock() - defer c.mu.Unlock() - return c.refreshCount -} - -// UpdateRefreshCount atomically updates the refresh count -func (c *TestClient) UpdateRefreshCount() { - c.mu.Lock() - defer c.mu.Unlock() - c.refreshCount++ -} - -// Mock refresh -func (c *TestClient) GetInterfaceIPInfo(_ context.Context) (nmagent.Interfaces, error) { - c.UpdateRefreshCount() - return nmagent.Interfaces{}, nil -} - -var _ nodesubnet.InterfaceRetriever = &TestClient{} - // Mock client that simply consumes fetched IPs type TestConsumer struct { - consumeCount int32 - mu sync.Mutex + consumeCount int32 + secondaryIPCount int32 } // FetchConsumeCount atomically fetches the consume count func (c *TestConsumer) FetchConsumeCount() int32 { - c.mu.Lock() - defer c.mu.Unlock() + return c.consumeCount +} + +// FetchSecondaryIPCount atomically fetches the last IP count +func (c *TestConsumer) FetchSecondaryIPCount() int32 { return c.consumeCount } // UpdateConsumeCount atomically updates the consume count -func (c *TestConsumer) UpdateConsumeCount() { - c.mu.Lock() - defer c.mu.Unlock() +func (c *TestConsumer) updateCounts(ipCount int32) { c.consumeCount++ + c.secondaryIPCount = ipCount } // Mock IP update -func (c *TestConsumer) UpdateIPsForNodeSubnet(_ []netip.Addr) error { - c.UpdateConsumeCount() +func (c *TestConsumer) UpdateIPsForNodeSubnet(ips []netip.Addr) error { + c.updateCounts(int32(len(ips))) return nil } var _ nodesubnet.IPConsumer = &TestConsumer{} -func TestRefresh(t *testing.T) { - clientPtr := &TestClient{} - consumerPtr := &TestConsumer{} - fetcher := nodesubnet.NewIPFetcher(clientPtr, consumerPtr, 0, 0) - ticker := refreshticker.NewMockTickProvider() - fetcher.SetTicker(ticker) - ctx, cancel := testContext(t) - defer cancel() - fetcher.Start(ctx) - ticker.Tick() // Trigger a refresh - ticker.Tick() // This tick will be read only after previous refresh is done - ticker.Tick() // This call will block until the prevous tick is read +// Mock client that simply satisfies the interface +type TestClient struct{} - // At least 2 refreshes - one initial and one after the first tick should be done - if clientPtr.FetchRefreshCount() < 2 { - t.Error("Not enough refreshes") - } +// Mock refresh +func (c *TestClient) GetInterfaceIPInfo(_ context.Context) (nmagent.Interfaces, error) { + return nmagent.Interfaces{}, nil +} + +func TestEmptyResponse(t *testing.T) { + consumerPtr := &TestConsumer{} + fetcher := nodesubnet.NewIPFetcher(&TestClient{}, consumerPtr, 0, 0) + fetcher.ProcessInterfaces(nmagent.Interfaces{}) // No consumes, since the responses are empty if consumerPtr.FetchConsumeCount() > 0 { @@ -91,11 +58,41 @@ func TestRefresh(t *testing.T) { } } -// testContext creates a context from the provided testing.T that will be -// canceled if the test suite is terminated. -func testContext(t *testing.T) (context.Context, context.CancelFunc) { - if deadline, ok := t.Deadline(); ok { - return context.WithDeadline(context.Background(), deadline) +func TestFlatten(t *testing.T) { + interfaces := nmagent.Interfaces{ + Entries: []nmagent.Interface{ + { + MacAddress: nmagent.MACAddress{0x00, 0x0D, 0x3A, 0xF9, 0xDC, 0xA6}, + IsPrimary: true, + InterfaceSubnets: []nmagent.InterfaceSubnet{ + { + Prefix: "10.240.0.0/16", + IPAddress: []nmagent.NodeIP{ + { + Address: nmagent.IPAddress(netip.AddrFrom4([4]byte{10, 240, 0, 5})), + IsPrimary: true, + }, + { + Address: nmagent.IPAddress(netip.AddrFrom4([4]byte{10, 240, 0, 6})), + IsPrimary: false, + }, + }, + }, + }, + }, + }, + } + consumerPtr := &TestConsumer{} + fetcher := nodesubnet.NewIPFetcher(&TestClient{}, consumerPtr, 0, 0) + fetcher.ProcessInterfaces(interfaces) + + // 1 consume to be called + if consumerPtr.FetchConsumeCount() != 1 { + t.Error("Consume expected to be called, but not called") + } + + // 1 consume to be called + if consumerPtr.FetchSecondaryIPCount() != 1 { + t.Error("Wrong number of secondary IPs ", consumerPtr.FetchSecondaryIPCount()) } - return context.WithCancel(context.Background()) } From 72b685d0ce5af3d07e4a5a5ee3fc06edcf9c1f3c Mon Sep 17 00:00:00 2001 From: Santhosh Prabhu Date: Thu, 26 Sep 2024 23:28:01 -0700 Subject: [PATCH 12/28] fix: remove broken import --- cns/refresh/fetcher_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/cns/refresh/fetcher_test.go b/cns/refresh/fetcher_test.go index 4cc7b760f5..1f2eaafcaf 100644 --- a/cns/refresh/fetcher_test.go +++ b/cns/refresh/fetcher_test.go @@ -7,7 +7,6 @@ import ( "github.com/Azure/azure-container-networking/cns/nodesubnet" "github.com/Azure/azure-container-networking/nmagent" - "github.com/Azure/azure-container-networking/refresh" ) // Mock client that simply tracks if refresh has been called From 90c56b708c7dae54d6a73ce9e8399e2c8c4c3513 Mon Sep 17 00:00:00 2001 From: Santhosh Prabhu Date: Thu, 26 Sep 2024 23:32:38 -0700 Subject: [PATCH 13/28] fix: fix import --- cns/refresh/fetcher_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cns/refresh/fetcher_test.go b/cns/refresh/fetcher_test.go index 1f2eaafcaf..dbb7348882 100644 --- a/cns/refresh/fetcher_test.go +++ b/cns/refresh/fetcher_test.go @@ -6,6 +6,7 @@ import ( "testing" "github.com/Azure/azure-container-networking/cns/nodesubnet" + "github.com/Azure/azure-container-networking/cns/refresh" "github.com/Azure/azure-container-networking/nmagent" ) From a6f48897850221b666abc845b9dedaf4b7d07c87 Mon Sep 17 00:00:00 2001 From: Santhosh Prabhu Date: Fri, 27 Sep 2024 00:24:41 -0700 Subject: [PATCH 14/28] fix: fix linting --- cns/nodesubnet/ip_fetcher_test.go | 10 ++++++++-- cns/refresh/fetcher.go | 2 +- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/cns/nodesubnet/ip_fetcher_test.go b/cns/nodesubnet/ip_fetcher_test.go index aaac1f3414..6a2e89bc3c 100644 --- a/cns/nodesubnet/ip_fetcher_test.go +++ b/cns/nodesubnet/ip_fetcher_test.go @@ -50,7 +50,10 @@ func (c *TestClient) GetInterfaceIPInfo(_ context.Context) (nmagent.Interfaces, func TestEmptyResponse(t *testing.T) { consumerPtr := &TestConsumer{} fetcher := nodesubnet.NewIPFetcher(&TestClient{}, consumerPtr, 0, 0) - fetcher.ProcessInterfaces(nmagent.Interfaces{}) + err := fetcher.ProcessInterfaces(nmagent.Interfaces{}) + if err != nil { + t.Error("Error processing empty interfaces") + } // No consumes, since the responses are empty if consumerPtr.FetchConsumeCount() > 0 { @@ -84,7 +87,10 @@ func TestFlatten(t *testing.T) { } consumerPtr := &TestConsumer{} fetcher := nodesubnet.NewIPFetcher(&TestClient{}, consumerPtr, 0, 0) - fetcher.ProcessInterfaces(interfaces) + err := fetcher.ProcessInterfaces(interfaces) + if err != nil { + t.Error("Error processing interfaces") + } // 1 consume to be called if consumerPtr.FetchConsumeCount() != 1 { diff --git a/cns/refresh/fetcher.go b/cns/refresh/fetcher.go index 9e71f3e278..03dded36b3 100644 --- a/cns/refresh/fetcher.go +++ b/cns/refresh/fetcher.go @@ -99,7 +99,7 @@ func (f *Fetcher[T]) Start(ctx context.Context) { } func (f *Fetcher[T]) updateFetchIntervalForNoObservedDiff() { - f.currentInterval = min(f.currentInterval*2, f.maxInterval) + f.currentInterval = min(f.currentInterval*2, f.maxInterval) // nolint:gomnd // doubling logic } func (f *Fetcher[T]) updateFetchIntervalForObservedDiff() { From 7ed21aeaf7e37a2124a2414c791d9cd14c529815 Mon Sep 17 00:00:00 2001 From: Santhosh Prabhu Date: Fri, 27 Sep 2024 09:36:16 -0700 Subject: [PATCH 15/28] fix: fix some failing tests --- cns/nodesubnet/ip_fetcher_test.go | 25 +++++++++++++++++++------ cns/refresh/fetcher_test.go | 5 +++++ 2 files changed, 24 insertions(+), 6 deletions(-) diff --git a/cns/nodesubnet/ip_fetcher_test.go b/cns/nodesubnet/ip_fetcher_test.go index 6a2e89bc3c..fba0328c0e 100644 --- a/cns/nodesubnet/ip_fetcher_test.go +++ b/cns/nodesubnet/ip_fetcher_test.go @@ -5,6 +5,7 @@ import ( "net/netip" "testing" + "github.com/Azure/azure-container-networking/cns/logger" "github.com/Azure/azure-container-networking/cns/nodesubnet" "github.com/Azure/azure-container-networking/nmagent" ) @@ -51,9 +52,7 @@ func TestEmptyResponse(t *testing.T) { consumerPtr := &TestConsumer{} fetcher := nodesubnet.NewIPFetcher(&TestClient{}, consumerPtr, 0, 0) err := fetcher.ProcessInterfaces(nmagent.Interfaces{}) - if err != nil { - t.Error("Error processing empty interfaces") - } + checkErr(t, err, true) // No consumes, since the responses are empty if consumerPtr.FetchConsumeCount() > 0 { @@ -88,9 +87,7 @@ func TestFlatten(t *testing.T) { consumerPtr := &TestConsumer{} fetcher := nodesubnet.NewIPFetcher(&TestClient{}, consumerPtr, 0, 0) err := fetcher.ProcessInterfaces(interfaces) - if err != nil { - t.Error("Error processing interfaces") - } + checkErr(t, err, false) // 1 consume to be called if consumerPtr.FetchConsumeCount() != 1 { @@ -102,3 +99,19 @@ func TestFlatten(t *testing.T) { t.Error("Wrong number of secondary IPs ", consumerPtr.FetchSecondaryIPCount()) } } + +// checkErr is an assertion of the presence or absence of an error +func checkErr(t *testing.T, err error, shouldErr bool) { + t.Helper() + if err != nil && !shouldErr { + t.Fatal("unexpected error: err:", err) + } + + if err == nil && shouldErr { + t.Fatal("expected error but received none") + } +} + +func init() { + logger.InitLogger("testlogs", 0, 0, "./") +} diff --git a/cns/refresh/fetcher_test.go b/cns/refresh/fetcher_test.go index dbb7348882..4e21d662e0 100644 --- a/cns/refresh/fetcher_test.go +++ b/cns/refresh/fetcher_test.go @@ -5,6 +5,7 @@ import ( "sync" "testing" + "github.com/Azure/azure-container-networking/cns/logger" "github.com/Azure/azure-container-networking/cns/nodesubnet" "github.com/Azure/azure-container-networking/cns/refresh" "github.com/Azure/azure-container-networking/nmagent" @@ -109,3 +110,7 @@ func testContext(t *testing.T) (context.Context, context.CancelFunc) { } return context.WithCancel(context.Background()) } + +func init() { + logger.InitLogger("testlogs", 0, 0, "./") +} From 7551a04bddbe025a0df690e5433aedecc785c72e Mon Sep 17 00:00:00 2001 From: Santhosh Prabhu Date: Fri, 27 Sep 2024 10:13:34 -0700 Subject: [PATCH 16/28] chore: Remove unused function --- cns/refresh/helper_for_fetcher_test.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/cns/refresh/helper_for_fetcher_test.go b/cns/refresh/helper_for_fetcher_test.go index add0afa3c8..fa6a6554eb 100644 --- a/cns/refresh/helper_for_fetcher_test.go +++ b/cns/refresh/helper_for_fetcher_test.go @@ -1,11 +1,5 @@ package refresh -import "time" - -func (f *Fetcher[T]) GetCurrentInterval() time.Duration { - return f.currentInterval -} - func (f *Fetcher[T]) SetTicker(t TickProvider) { f.ticker = t } From 2a2ae20658a27494baaa76e1f3e8fda29c46a0e6 Mon Sep 17 00:00:00 2001 From: Santhosh Prabhu Date: Fri, 27 Sep 2024 11:32:12 -0700 Subject: [PATCH 17/28] test: test updates --- cns/refresh/fetcher.go | 1 + cns/refresh/fetcher_test.go | 85 ++++++++++++++++++++++++++++--------- 2 files changed, 66 insertions(+), 20 deletions(-) diff --git a/cns/refresh/fetcher.go b/cns/refresh/fetcher.go index 03dded36b3..e15573174d 100644 --- a/cns/refresh/fetcher.go +++ b/cns/refresh/fetcher.go @@ -81,6 +81,7 @@ func (f *Fetcher[T]) Start(ctx context.Context) { } else { if reflect.DeepEqual(result, f.cache) { f.updateFetchIntervalForNoObservedDiff() + logger.Printf("No diff observed in fetch, not invoking the consumer") } else { f.cache = result f.updateFetchIntervalForObservedDiff() diff --git a/cns/refresh/fetcher_test.go b/cns/refresh/fetcher_test.go index 4e21d662e0..4585a8a6c5 100644 --- a/cns/refresh/fetcher_test.go +++ b/cns/refresh/fetcher_test.go @@ -2,6 +2,8 @@ package refresh_test import ( "context" + "fmt" + "net/netip" "sync" "testing" @@ -14,6 +16,7 @@ import ( // Mock client that simply tracks if refresh has been called type TestClient struct { refreshCount int32 + responses []nmagent.Interfaces mu sync.Mutex } @@ -33,8 +36,13 @@ func (c *TestClient) UpdateRefreshCount() { // Mock refresh func (c *TestClient) GetInterfaceIPInfo(_ context.Context) (nmagent.Interfaces, error) { - c.UpdateRefreshCount() - return nmagent.Interfaces{}, nil + defer c.UpdateRefreshCount() + + if c.refreshCount >= int32(len(c.responses)) { + return c.responses[len(c.responses)-1], nil + } + + return c.responses[c.refreshCount], nil } var _ nodesubnet.InterfaceRetriever = &TestClient{} @@ -60,13 +68,62 @@ func (c *TestConsumer) UpdateConsumeCount() { } // Mock IP update -func (c *TestConsumer) ConsumeInterfaces(nmagent.Interfaces) error { +func (c *TestConsumer) ConsumeInterfaces(intfs nmagent.Interfaces) error { + fmt.Printf("Consumed interfaces: %v\n", intfs) c.UpdateConsumeCount() return nil } func TestRefresh(t *testing.T) { - clientPtr := &TestClient{} + clientPtr := &TestClient{ + refreshCount: 0, + responses: []nmagent.Interfaces{ + { + Entries: []nmagent.Interface{ + { + MacAddress: nmagent.MACAddress{0x00, 0x0D, 0x3A, 0xF9, 0xDC, 0xA6}, + IsPrimary: true, + InterfaceSubnets: []nmagent.InterfaceSubnet{ + { + Prefix: "10.240.0.0/16", + IPAddress: []nmagent.NodeIP{ + { + Address: nmagent.IPAddress(netip.AddrFrom4([4]byte{10, 240, 0, 5})), + IsPrimary: true, + }, + { + Address: nmagent.IPAddress(netip.AddrFrom4([4]byte{10, 240, 0, 6})), + IsPrimary: false, + }, + }, + }, + }, + }, + }, + }, + { + Entries: []nmagent.Interface{ + { + MacAddress: nmagent.MACAddress{0x00, 0x0D, 0x3A, 0xF9, 0xDC, 0xA6}, + IsPrimary: true, + InterfaceSubnets: []nmagent.InterfaceSubnet{ + { + Prefix: "10.240.0.0/16", + IPAddress: []nmagent.NodeIP{ + { + Address: nmagent.IPAddress(netip.AddrFrom4([4]byte{10, 240, 0, 5})), + IsPrimary: true, + }, + }, + }, + }, + }, + }, + }, + }, + mu: sync.Mutex{}, + } + consumerPtr := &TestConsumer{} fetcher := refresh.NewFetcher[nmagent.Interfaces](clientPtr.GetInterfaceIPInfo, 0, 0, consumerPtr.ConsumeInterfaces) ticker := refresh.NewMockTickProvider() @@ -83,23 +140,11 @@ func TestRefresh(t *testing.T) { t.Error("Not enough refreshes") } - // At least 2 consumes - one initial and one after the first tick should be done - if consumerPtr.FetchConsumeCount() < 0 { - t.Error("Not enough consumes") - } -} - -func TestInterval(t *testing.T) { - clientPtr := &TestClient{} - consumerPtr := &TestConsumer{} - fetcher := refresh.NewFetcher[nmagent.Interfaces](clientPtr.GetInterfaceIPInfo, 0, 0, consumerPtr.ConsumeInterfaces) - interval := fetcher.GetCurrentInterval() - - if interval != refresh.DefaultMinInterval { - t.Error("Default min interval not used") + // Exactly 2 consumes - one initial and one after the first tick should be done (responses are different). + // Then no more, since the response is unchanged + if consumerPtr.FetchConsumeCount() != 2 { + t.Error("Exactly two consumes expected (for two different responses)") } - - // Testing that the interval doubles will require making the interval thread-safe. Not doing that to avoid performance hit. } // testContext creates a context from the provided testing.T that will be From f3e076cee05a28a5ff50f5b31039f03efd8aa55d Mon Sep 17 00:00:00 2001 From: Santhosh Prabhu Date: Tue, 1 Oct 2024 09:30:27 -0700 Subject: [PATCH 18/28] fix: address comments --- cns/nodesubnet/ip_fetcher.go | 5 ++-- cns/nodesubnet/ip_fetcher_test.go | 4 +-- {cns/refresh => refresh}/fetcher.go | 27 ++++++++++++------- {cns/refresh => refresh}/fetcher_test.go | 4 +-- .../helper_for_fetcher_test.go | 0 {cns/refresh => refresh}/mocktickprovider.go | 0 {cns/refresh => refresh}/refreshticker.go | 0 7 files changed, 24 insertions(+), 16 deletions(-) rename {cns/refresh => refresh}/fetcher.go (77%) rename {cns/refresh => refresh}/fetcher_test.go (97%) rename {cns/refresh => refresh}/helper_for_fetcher_test.go (100%) rename {cns/refresh => refresh}/mocktickprovider.go (100%) rename {cns/refresh => refresh}/refreshticker.go (100%) diff --git a/cns/nodesubnet/ip_fetcher.go b/cns/nodesubnet/ip_fetcher.go index 330213caa3..7457d529e8 100644 --- a/cns/nodesubnet/ip_fetcher.go +++ b/cns/nodesubnet/ip_fetcher.go @@ -6,8 +6,8 @@ import ( "net/netip" "time" - "github.com/Azure/azure-container-networking/cns/refresh" "github.com/Azure/azure-container-networking/nmagent" + "github.com/Azure/azure-container-networking/refresh" "github.com/pkg/errors" ) @@ -49,6 +49,7 @@ func NewIPFetcher( consumer IPConsumer, minInterval time.Duration, maxInterval time.Duration, + logger refresh.Logger, ) *IPFetcher { if minInterval == 0 { minInterval = DefaultMinRefreshInterval @@ -65,7 +66,7 @@ func NewIPFetcher( consumer: consumer, fetcher: nil, } - fetcher := refresh.NewFetcher[nmagent.Interfaces](client.GetInterfaceIPInfo, minInterval, maxInterval, newIPFetcher.ProcessInterfaces) + fetcher := refresh.NewFetcher[nmagent.Interfaces](client.GetInterfaceIPInfo, minInterval, maxInterval, newIPFetcher.ProcessInterfaces, logger) newIPFetcher.fetcher = fetcher return newIPFetcher } diff --git a/cns/nodesubnet/ip_fetcher_test.go b/cns/nodesubnet/ip_fetcher_test.go index fba0328c0e..3e5406fb46 100644 --- a/cns/nodesubnet/ip_fetcher_test.go +++ b/cns/nodesubnet/ip_fetcher_test.go @@ -50,7 +50,7 @@ func (c *TestClient) GetInterfaceIPInfo(_ context.Context) (nmagent.Interfaces, func TestEmptyResponse(t *testing.T) { consumerPtr := &TestConsumer{} - fetcher := nodesubnet.NewIPFetcher(&TestClient{}, consumerPtr, 0, 0) + fetcher := nodesubnet.NewIPFetcher(&TestClient{}, consumerPtr, 0, 0, logger.Log) err := fetcher.ProcessInterfaces(nmagent.Interfaces{}) checkErr(t, err, true) @@ -85,7 +85,7 @@ func TestFlatten(t *testing.T) { }, } consumerPtr := &TestConsumer{} - fetcher := nodesubnet.NewIPFetcher(&TestClient{}, consumerPtr, 0, 0) + fetcher := nodesubnet.NewIPFetcher(&TestClient{}, consumerPtr, 0, 0, logger.Log) err := fetcher.ProcessInterfaces(interfaces) checkErr(t, err, false) diff --git a/cns/refresh/fetcher.go b/refresh/fetcher.go similarity index 77% rename from cns/refresh/fetcher.go rename to refresh/fetcher.go index e15573174d..b1f250078d 100644 --- a/cns/refresh/fetcher.go +++ b/refresh/fetcher.go @@ -2,10 +2,9 @@ package refresh import ( "context" - "reflect" "time" - "github.com/Azure/azure-container-networking/cns/logger" + "github.com/google/go-cmp/cmp" ) const ( @@ -25,10 +24,17 @@ type Fetcher[T any] struct { currentInterval time.Duration ticker TickProvider consumeFunc func(T) error + logger Logger } // NewFetcher creates a new Fetcher. If minInterval is 0, it will default to 4 seconds. -func NewFetcher[T any](fetchFunc func(context.Context) (T, error), minInterval, maxInterval time.Duration, consumeFunc func(T) error) *Fetcher[T] { +func NewFetcher[T any]( + fetchFunc func(context.Context) (T, error), + minInterval time.Duration, + maxInterval time.Duration, + consumeFunc func(T) error, + logger Logger, +) *Fetcher[T] { if minInterval == 0 { minInterval = DefaultMinInterval } @@ -45,6 +51,7 @@ func NewFetcher[T any](fetchFunc func(context.Context) (T, error), minInterval, maxInterval: maxInterval, currentInterval: minInterval, consumeFunc: consumeFunc, + logger: logger, } } @@ -53,13 +60,13 @@ func (f *Fetcher[T]) Start(ctx context.Context) { // do an initial fetch res, err := f.fetchFunc(ctx) if err != nil { - logger.Printf("Error refreshing secondary IPs: %v", err) + f.logger.Printf("Error invoking fetch: %v", err) } f.cache = res if f.consumeFunc != nil { if err := f.consumeFunc(res); err != nil { - logger.Errorf("Error consuming data: %v", err) + f.logger.Errorf("Error consuming data: %v", err) } } @@ -72,22 +79,22 @@ func (f *Fetcher[T]) Start(ctx context.Context) { for { select { case <-ctx.Done(): - logger.Printf("Fetcher stopped") + f.logger.Printf("Fetcher stopped") return case <-f.ticker.C(): result, err := f.fetchFunc(ctx) if err != nil { - logger.Errorf("Error fetching data: %v", err) + f.logger.Errorf("Error fetching data: %v", err) } else { - if reflect.DeepEqual(result, f.cache) { + if cmp.Equal(result, f.cache) { f.updateFetchIntervalForNoObservedDiff() - logger.Printf("No diff observed in fetch, not invoking the consumer") + f.logger.Printf("No diff observed in fetch, not invoking the consumer") } else { f.cache = result f.updateFetchIntervalForObservedDiff() if f.consumeFunc != nil { if err := f.consumeFunc(result); err != nil { - logger.Errorf("Error consuming data: %v", err) + f.logger.Errorf("Error consuming data: %v", err) } } } diff --git a/cns/refresh/fetcher_test.go b/refresh/fetcher_test.go similarity index 97% rename from cns/refresh/fetcher_test.go rename to refresh/fetcher_test.go index 4585a8a6c5..ef81e6044d 100644 --- a/cns/refresh/fetcher_test.go +++ b/refresh/fetcher_test.go @@ -9,8 +9,8 @@ import ( "github.com/Azure/azure-container-networking/cns/logger" "github.com/Azure/azure-container-networking/cns/nodesubnet" - "github.com/Azure/azure-container-networking/cns/refresh" "github.com/Azure/azure-container-networking/nmagent" + "github.com/Azure/azure-container-networking/refresh" ) // Mock client that simply tracks if refresh has been called @@ -125,7 +125,7 @@ func TestRefresh(t *testing.T) { } consumerPtr := &TestConsumer{} - fetcher := refresh.NewFetcher[nmagent.Interfaces](clientPtr.GetInterfaceIPInfo, 0, 0, consumerPtr.ConsumeInterfaces) + fetcher := refresh.NewFetcher[nmagent.Interfaces](clientPtr.GetInterfaceIPInfo, 0, 0, consumerPtr.ConsumeInterfaces, logger.Log) ticker := refresh.NewMockTickProvider() fetcher.SetTicker(ticker) ctx, cancel := testContext(t) diff --git a/cns/refresh/helper_for_fetcher_test.go b/refresh/helper_for_fetcher_test.go similarity index 100% rename from cns/refresh/helper_for_fetcher_test.go rename to refresh/helper_for_fetcher_test.go diff --git a/cns/refresh/mocktickprovider.go b/refresh/mocktickprovider.go similarity index 100% rename from cns/refresh/mocktickprovider.go rename to refresh/mocktickprovider.go diff --git a/cns/refresh/refreshticker.go b/refresh/refreshticker.go similarity index 100% rename from cns/refresh/refreshticker.go rename to refresh/refreshticker.go From d57810ac98e78d8d15fcf35b400456ee9f6ba281 Mon Sep 17 00:00:00 2001 From: Santhosh Prabhu Date: Tue, 1 Oct 2024 09:31:01 -0700 Subject: [PATCH 19/28] chore: add missed file --- refresh/logger.go | 8 ++++++++ 1 file changed, 8 insertions(+) create mode 100644 refresh/logger.go diff --git a/refresh/logger.go b/refresh/logger.go new file mode 100644 index 0000000000..d3a8ea66d0 --- /dev/null +++ b/refresh/logger.go @@ -0,0 +1,8 @@ +package refresh + +type Logger interface { + Debugf(format string, v ...interface{}) + Printf(format string, v ...interface{}) + Warnf(format string, v ...interface{}) + Errorf(format string, v ...interface{}) +} From 510d7cfc90730185d0653765dda20111c5782059 Mon Sep 17 00:00:00 2001 From: Santhosh Prabhu Date: Tue, 1 Oct 2024 09:50:23 -0700 Subject: [PATCH 20/28] chore: add comment about static interval --- refresh/fetcher.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/refresh/fetcher.go b/refresh/fetcher.go index b1f250078d..d781d88460 100644 --- a/refresh/fetcher.go +++ b/refresh/fetcher.go @@ -14,7 +14,8 @@ const ( // Fetcher fetches data at regular intervals. The interval will vary within the range of minInterval and // maxInterval. When no diff is observed after a fetch, the interval doubles (subject to the maximum interval). -// When a diff is observed, the interval resets to the minimum. +// When a diff is observed, the interval resets to the minimum. The interval can be made unchanging by setting +// minInterval and maxInterval to the same desired value. type Fetcher[T any] struct { fetchFunc func(context.Context) (T, error) From 64da2bc3921b00102a65e377d5dface876ab6b63 Mon Sep 17 00:00:00 2001 From: Santhosh Prabhu Date: Wed, 2 Oct 2024 13:56:04 -0700 Subject: [PATCH 21/28] feat: address Evan's comment to require Equal method on cached results --- nmagent/equality.go | 51 +++++++++++++++++++++++++++++++++++++++++++ nmagent/macaddress.go | 12 ++++++++++ refresh/fetcher.go | 8 +++---- 3 files changed, 66 insertions(+), 5 deletions(-) create mode 100644 nmagent/equality.go diff --git a/nmagent/equality.go b/nmagent/equality.go new file mode 100644 index 0000000000..67381e9897 --- /dev/null +++ b/nmagent/equality.go @@ -0,0 +1,51 @@ +package nmagent + +// Equal compares two Interfaces objects for equality. +func (i Interfaces) Equal(other Interfaces) bool { + if len(i.Entries) != len(other.Entries) { + return false + } + for idx, entry := range i.Entries { + if !entry.Equal(other.Entries[idx]) { + return false + } + } + return true +} + +// Equal compares two Interface objects for equality. +func (i Interface) Equal(other Interface) bool { + if len(i.InterfaceSubnets) != len(other.InterfaceSubnets) { + return false + } + for idx, subnet := range i.InterfaceSubnets { + if !subnet.Equal(other.InterfaceSubnets[idx]) { + return false + } + } + if i.IsPrimary != other.IsPrimary || !i.MacAddress.Equal(other.MacAddress) { + return false + } + return true +} + +// Equal compares two InterfaceSubnet objects for equality. +func (s InterfaceSubnet) Equal(other InterfaceSubnet) bool { + if len(s.IPAddress) != len(other.IPAddress) { + return false + } + if s.Prefix != other.Prefix { + return false + } + for idx, ip := range s.IPAddress { + if !ip.Equal(other.IPAddress[idx]) { + return false + } + } + return true +} + +// Equal compares two NodeIP objects for equality. +func (ip NodeIP) Equal(other NodeIP) bool { + return ip.IsPrimary == other.IsPrimary && ip.Address.Equal(other.Address) +} diff --git a/nmagent/macaddress.go b/nmagent/macaddress.go index 97c5385162..fa81afc7ef 100644 --- a/nmagent/macaddress.go +++ b/nmagent/macaddress.go @@ -14,6 +14,18 @@ const ( type MACAddress net.HardwareAddr +func (h MACAddress) Equal(other MACAddress) bool { + if len(h) != len(other) { + return false + } + for i := range h { + if h[i] != other[i] { + return false + } + } + return true +} + func (h *MACAddress) UnmarshalXML(d *xml.Decoder, start xml.StartElement) error { var macStr string if err := d.DecodeElement(&macStr, &start); err != nil { diff --git a/refresh/fetcher.go b/refresh/fetcher.go index d781d88460..8476032cd0 100644 --- a/refresh/fetcher.go +++ b/refresh/fetcher.go @@ -3,8 +3,6 @@ package refresh import ( "context" "time" - - "github.com/google/go-cmp/cmp" ) const ( @@ -17,7 +15,7 @@ const ( // When a diff is observed, the interval resets to the minimum. The interval can be made unchanging by setting // minInterval and maxInterval to the same desired value. -type Fetcher[T any] struct { +type Fetcher[T Equaler[T]] struct { fetchFunc func(context.Context) (T, error) cache T minInterval time.Duration @@ -29,7 +27,7 @@ type Fetcher[T any] struct { } // NewFetcher creates a new Fetcher. If minInterval is 0, it will default to 4 seconds. -func NewFetcher[T any]( +func NewFetcher[T Equaler[T]]( fetchFunc func(context.Context) (T, error), minInterval time.Duration, maxInterval time.Duration, @@ -87,7 +85,7 @@ func (f *Fetcher[T]) Start(ctx context.Context) { if err != nil { f.logger.Errorf("Error fetching data: %v", err) } else { - if cmp.Equal(result, f.cache) { + if result.Equal(f.cache) { f.updateFetchIntervalForNoObservedDiff() f.logger.Printf("No diff observed in fetch, not invoking the consumer") } else { From f79a6c3a32e00a32c1357384f54fda4d72a51778 Mon Sep 17 00:00:00 2001 From: Santhosh Prabhu Date: Wed, 2 Oct 2024 13:59:34 -0700 Subject: [PATCH 22/28] chore: add missed file --- refresh/equaler.go | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 refresh/equaler.go diff --git a/refresh/equaler.go b/refresh/equaler.go new file mode 100644 index 0000000000..85b52838fc --- /dev/null +++ b/refresh/equaler.go @@ -0,0 +1,5 @@ +package refresh + +type Equaler[T any] interface { + Equal(T) bool +} From 2331766ea80a04732880a9ebc9bd20f245e33dbe Mon Sep 17 00:00:00 2001 From: Santhosh Prabhu Date: Wed, 2 Oct 2024 14:36:30 -0700 Subject: [PATCH 23/28] feat: more efficient equality --- nmagent/equality.go | 40 ++++++++++++++++++++++++++++++---------- 1 file changed, 30 insertions(+), 10 deletions(-) diff --git a/nmagent/equality.go b/nmagent/equality.go index 67381e9897..5faee9a847 100644 --- a/nmagent/equality.go +++ b/nmagent/equality.go @@ -1,25 +1,30 @@ package nmagent -// Equal compares two Interfaces objects for equality. -func (i Interfaces) Equal(other Interfaces) bool { +// equalPtr compares two Interfaces objects for equality. +func (i *Interfaces) equalPtr(other *Interfaces) bool { if len(i.Entries) != len(other.Entries) { return false } for idx, entry := range i.Entries { - if !entry.Equal(other.Entries[idx]) { + if !entry.equalPtr(&other.Entries[idx]) { return false } } return true } -// Equal compares two Interface objects for equality. -func (i Interface) Equal(other Interface) bool { +// Equal compares two Interfaces objects for equality. +func (i Interfaces) Equal(other Interfaces) bool { + return i.equalPtr(&other) +} + +// equalPtr compares two Interface objects for equality. +func (i *Interface) equalPtr(other *Interface) bool { if len(i.InterfaceSubnets) != len(other.InterfaceSubnets) { return false } for idx, subnet := range i.InterfaceSubnets { - if !subnet.Equal(other.InterfaceSubnets[idx]) { + if !subnet.equalPtr(&other.InterfaceSubnets[idx]) { return false } } @@ -29,8 +34,13 @@ func (i Interface) Equal(other Interface) bool { return true } -// Equal compares two InterfaceSubnet objects for equality. -func (s InterfaceSubnet) Equal(other InterfaceSubnet) bool { +// Equal compares two Interface objects for equality. +func (i Interface) Equal(other Interface) bool { + return i.equalPtr(&other) +} + +// equalPtr compares two InterfaceSubnet objects for equality. +func (s *InterfaceSubnet) equalPtr(other *InterfaceSubnet) bool { if len(s.IPAddress) != len(other.IPAddress) { return false } @@ -38,14 +48,24 @@ func (s InterfaceSubnet) Equal(other InterfaceSubnet) bool { return false } for idx, ip := range s.IPAddress { - if !ip.Equal(other.IPAddress[idx]) { + if !ip.equalPtr(&other.IPAddress[idx]) { return false } } return true } +// Equal compares two InterfaceSubnet objects for equality. +func (s InterfaceSubnet) Equal(other InterfaceSubnet) bool { + return s.equalPtr(&other) +} + +// equalPtr compares two NodeIP objects for equality. +func (ip *NodeIP) equalPtr(other *NodeIP) bool { + return ip.IsPrimary == other.IsPrimary && ip.Address.Equal(other.Address) +} + // Equal compares two NodeIP objects for equality. func (ip NodeIP) Equal(other NodeIP) bool { - return ip.IsPrimary == other.IsPrimary && ip.Address.Equal(other.Address) + return ip.equalPtr(&other) } From 38c50476ce444cbdcc9a052a04f35d0abf7b0088 Mon Sep 17 00:00:00 2001 From: Santhosh Prabhu Date: Fri, 4 Oct 2024 09:50:44 -0700 Subject: [PATCH 24/28] refactor: address Evan's comment --- refresh/equaler.go | 2 +- refresh/fetcher.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/refresh/equaler.go b/refresh/equaler.go index 85b52838fc..96a42f413e 100644 --- a/refresh/equaler.go +++ b/refresh/equaler.go @@ -1,5 +1,5 @@ package refresh -type Equaler[T any] interface { +type equaler[T any] interface { Equal(T) bool } diff --git a/refresh/fetcher.go b/refresh/fetcher.go index 8476032cd0..a509e0dc8c 100644 --- a/refresh/fetcher.go +++ b/refresh/fetcher.go @@ -15,7 +15,7 @@ const ( // When a diff is observed, the interval resets to the minimum. The interval can be made unchanging by setting // minInterval and maxInterval to the same desired value. -type Fetcher[T Equaler[T]] struct { +type Fetcher[T equaler[T]] struct { fetchFunc func(context.Context) (T, error) cache T minInterval time.Duration @@ -27,7 +27,7 @@ type Fetcher[T Equaler[T]] struct { } // NewFetcher creates a new Fetcher. If minInterval is 0, it will default to 4 seconds. -func NewFetcher[T Equaler[T]]( +func NewFetcher[T equaler[T]]( fetchFunc func(context.Context) (T, error), minInterval time.Duration, maxInterval time.Duration, From f8b1c6432210a9d21f7ea02d4bec88c037629297 Mon Sep 17 00:00:00 2001 From: Santhosh Prabhu Date: Tue, 8 Oct 2024 12:52:33 -0700 Subject: [PATCH 25/28] refactor: address Tim's comment --- cns/service/main.go | 4 ++-- nmagent/equality.go | 40 ++++++++++------------------------------ 2 files changed, 12 insertions(+), 32 deletions(-) diff --git a/cns/service/main.go b/cns/service/main.go index a83324ab8c..2d435c1f5c 100644 --- a/cns/service/main.go +++ b/cns/service/main.go @@ -1123,13 +1123,13 @@ type nodeNetworkConfigGetter interface { Get(context.Context) (*v1alpha.NodeNetworkConfig, error) } -type ipamStateReconciler interface { +type IpamStateReconciler interface { ReconcileIPAMState(ncRequests []*cns.CreateNetworkContainerRequest, podInfoByIP map[string]cns.PodInfo, nnc *v1alpha.NodeNetworkConfig) cnstypes.ResponseCode } // TODO(rbtr) where should this live?? // reconcileInitialCNSState initializes cns by passing pods and a CreateNetworkContainerRequest -func reconcileInitialCNSState(ctx context.Context, cli nodeNetworkConfigGetter, ipamReconciler ipamStateReconciler, podInfoByIPProvider cns.PodInfoByIPProvider) error { +func reconcileInitialCNSState(ctx context.Context, cli nodeNetworkConfigGetter, ipamReconciler IpamStateReconciler, podInfoByIPProvider cns.PodInfoByIPProvider) error { // Get nnc using direct client nnc, err := cli.Get(ctx) if err != nil { diff --git a/nmagent/equality.go b/nmagent/equality.go index 5faee9a847..67381e9897 100644 --- a/nmagent/equality.go +++ b/nmagent/equality.go @@ -1,30 +1,25 @@ package nmagent -// equalPtr compares two Interfaces objects for equality. -func (i *Interfaces) equalPtr(other *Interfaces) bool { +// Equal compares two Interfaces objects for equality. +func (i Interfaces) Equal(other Interfaces) bool { if len(i.Entries) != len(other.Entries) { return false } for idx, entry := range i.Entries { - if !entry.equalPtr(&other.Entries[idx]) { + if !entry.Equal(other.Entries[idx]) { return false } } return true } -// Equal compares two Interfaces objects for equality. -func (i Interfaces) Equal(other Interfaces) bool { - return i.equalPtr(&other) -} - -// equalPtr compares two Interface objects for equality. -func (i *Interface) equalPtr(other *Interface) bool { +// Equal compares two Interface objects for equality. +func (i Interface) Equal(other Interface) bool { if len(i.InterfaceSubnets) != len(other.InterfaceSubnets) { return false } for idx, subnet := range i.InterfaceSubnets { - if !subnet.equalPtr(&other.InterfaceSubnets[idx]) { + if !subnet.Equal(other.InterfaceSubnets[idx]) { return false } } @@ -34,13 +29,8 @@ func (i *Interface) equalPtr(other *Interface) bool { return true } -// Equal compares two Interface objects for equality. -func (i Interface) Equal(other Interface) bool { - return i.equalPtr(&other) -} - -// equalPtr compares two InterfaceSubnet objects for equality. -func (s *InterfaceSubnet) equalPtr(other *InterfaceSubnet) bool { +// Equal compares two InterfaceSubnet objects for equality. +func (s InterfaceSubnet) Equal(other InterfaceSubnet) bool { if len(s.IPAddress) != len(other.IPAddress) { return false } @@ -48,24 +38,14 @@ func (s *InterfaceSubnet) equalPtr(other *InterfaceSubnet) bool { return false } for idx, ip := range s.IPAddress { - if !ip.equalPtr(&other.IPAddress[idx]) { + if !ip.Equal(other.IPAddress[idx]) { return false } } return true } -// Equal compares two InterfaceSubnet objects for equality. -func (s InterfaceSubnet) Equal(other InterfaceSubnet) bool { - return s.equalPtr(&other) -} - -// equalPtr compares two NodeIP objects for equality. -func (ip *NodeIP) equalPtr(other *NodeIP) bool { - return ip.IsPrimary == other.IsPrimary && ip.Address.Equal(other.Address) -} - // Equal compares two NodeIP objects for equality. func (ip NodeIP) Equal(other NodeIP) bool { - return ip.equalPtr(&other) + return ip.IsPrimary == other.IsPrimary && ip.Address.Equal(other.Address) } From 85416313550182d32abf014aa602c1f0d7599185 Mon Sep 17 00:00:00 2001 From: Santhosh Prabhu Date: Tue, 8 Oct 2024 12:53:41 -0700 Subject: [PATCH 26/28] fix: undo accidental commit --- cns/service/main.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cns/service/main.go b/cns/service/main.go index 2d435c1f5c..a83324ab8c 100644 --- a/cns/service/main.go +++ b/cns/service/main.go @@ -1123,13 +1123,13 @@ type nodeNetworkConfigGetter interface { Get(context.Context) (*v1alpha.NodeNetworkConfig, error) } -type IpamStateReconciler interface { +type ipamStateReconciler interface { ReconcileIPAMState(ncRequests []*cns.CreateNetworkContainerRequest, podInfoByIP map[string]cns.PodInfo, nnc *v1alpha.NodeNetworkConfig) cnstypes.ResponseCode } // TODO(rbtr) where should this live?? // reconcileInitialCNSState initializes cns by passing pods and a CreateNetworkContainerRequest -func reconcileInitialCNSState(ctx context.Context, cli nodeNetworkConfigGetter, ipamReconciler IpamStateReconciler, podInfoByIPProvider cns.PodInfoByIPProvider) error { +func reconcileInitialCNSState(ctx context.Context, cli nodeNetworkConfigGetter, ipamReconciler ipamStateReconciler, podInfoByIPProvider cns.PodInfoByIPProvider) error { // Get nnc using direct client nnc, err := cli.Get(ctx) if err != nil { From df3d900ae8cc52240727f94e7e7ab83f852356d5 Mon Sep 17 00:00:00 2001 From: Santhosh Prabhu Date: Mon, 14 Oct 2024 09:24:55 -0700 Subject: [PATCH 27/28] fix: make linter happy --- refresh/fetcher_test.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/refresh/fetcher_test.go b/refresh/fetcher_test.go index ef81e6044d..0e686a358e 100644 --- a/refresh/fetcher_test.go +++ b/refresh/fetcher_test.go @@ -15,13 +15,13 @@ import ( // Mock client that simply tracks if refresh has been called type TestClient struct { - refreshCount int32 + refreshCount int responses []nmagent.Interfaces mu sync.Mutex } // FetchRefreshCount atomically fetches the refresh count -func (c *TestClient) FetchRefreshCount() int32 { +func (c *TestClient) FetchRefreshCount() int { c.mu.Lock() defer c.mu.Unlock() return c.refreshCount @@ -38,7 +38,7 @@ func (c *TestClient) UpdateRefreshCount() { func (c *TestClient) GetInterfaceIPInfo(_ context.Context) (nmagent.Interfaces, error) { defer c.UpdateRefreshCount() - if c.refreshCount >= int32(len(c.responses)) { + if c.refreshCount >= len(c.responses) { return c.responses[len(c.responses)-1], nil } @@ -49,12 +49,12 @@ var _ nodesubnet.InterfaceRetriever = &TestClient{} // Mock client that simply consumes fetched IPs type TestConsumer struct { - consumeCount int32 + consumeCount int mu sync.Mutex } // FetchConsumeCount atomically fetches the consume count -func (c *TestConsumer) FetchConsumeCount() int32 { +func (c *TestConsumer) FetchConsumeCount() int { c.mu.Lock() defer c.mu.Unlock() return c.consumeCount From 0407ef0189fc6e1ed71554fded851fce7bc6640e Mon Sep 17 00:00:00 2001 From: Santhosh Prabhu Date: Mon, 14 Oct 2024 09:33:14 -0700 Subject: [PATCH 28/28] fix: make linter happy --- cns/nodesubnet/ip_fetcher_test.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/cns/nodesubnet/ip_fetcher_test.go b/cns/nodesubnet/ip_fetcher_test.go index 3e5406fb46..b981fd552b 100644 --- a/cns/nodesubnet/ip_fetcher_test.go +++ b/cns/nodesubnet/ip_fetcher_test.go @@ -12,29 +12,29 @@ import ( // Mock client that simply consumes fetched IPs type TestConsumer struct { - consumeCount int32 - secondaryIPCount int32 + consumeCount int + secondaryIPCount int } // FetchConsumeCount atomically fetches the consume count -func (c *TestConsumer) FetchConsumeCount() int32 { +func (c *TestConsumer) FetchConsumeCount() int { return c.consumeCount } // FetchSecondaryIPCount atomically fetches the last IP count -func (c *TestConsumer) FetchSecondaryIPCount() int32 { +func (c *TestConsumer) FetchSecondaryIPCount() int { return c.consumeCount } // UpdateConsumeCount atomically updates the consume count -func (c *TestConsumer) updateCounts(ipCount int32) { +func (c *TestConsumer) updateCounts(ipCount int) { c.consumeCount++ c.secondaryIPCount = ipCount } // Mock IP update func (c *TestConsumer) UpdateIPsForNodeSubnet(ips []netip.Addr) error { - c.updateCounts(int32(len(ips))) + c.updateCounts(len(ips)) return nil }