Skip to content
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
ae849be
Added logic to make 2 hns calls for 2 different endpoint states
rejain789 Oct 22, 2024
6e56bba
added querying to l1vh hns only if npm lite is enabled
rejain789 Oct 24, 2024
98a5d1e
added logging line for debugging
rejain789 Oct 24, 2024
ddc83b0
updated config
rejain789 Oct 24, 2024
382c150
removed logging lines
rejain789 Oct 24, 2024
1ef66e3
fixing go lint err
rejain789 Oct 25, 2024
70fb5c4
refactored based on pr comments
rejain789 Oct 25, 2024
605a498
replaced with errors.Wrap and fixed a logging statement
rejain789 Oct 25, 2024
6dde355
added if condition with logic
rejain789 Oct 25, 2024
4abb1c9
changed errl1vh to err
rejain789 Oct 25, 2024
f19fd62
added omments
rejain789 Oct 25, 2024
a79e44f
added logging lines for debugging
rejain789 Oct 25, 2024
79b54fb
added npm lite enabled log debugging
rejain789 Oct 25, 2024
65714fb
spacing
rejain789 Oct 25, 2024
7b2e422
syntax
rejain789 Oct 25, 2024
79d19b8
added logs for debugging
rejain789 Oct 25, 2024
27b76cd
optimizing api load
rejain789 Oct 25, 2024
7728b31
added function to remove common endpoints
rejain789 Oct 29, 2024
b6e07fb
added logging for debugging
rejain789 Oct 29, 2024
b1b941e
removed npm lite check
rejain789 Oct 30, 2024
ae341d5
removed all the debugging comments
rejain789 Oct 31, 2024
c5b18be
added extra unit test cases
rejain789 Oct 31, 2024
3c237ba
added additional unit tests
rejain789 Oct 31, 2024
3089651
removed protobuf code
rejain789 Oct 31, 2024
15a1182
fixed comment
rejain789 Nov 4, 2024
fd985c7
fixed a spelling error
rejain789 Nov 4, 2024
60dda1c
resolved pr comments
rejain789 Nov 4, 2024
36f0d74
updated a comment
rejain789 Nov 4, 2024
ab7038d
revised comment
rejain789 Nov 4, 2024
2c66d63
resolved further pr comments
rejain789 Nov 4, 2024
10d116f
changed back to for loop from range
rejain789 Nov 4, 2024
cd17bd3
Merge branch 'master' into jainriya/hnsEndpointFixL1VH
rejain456 Nov 5, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion npm/npm.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func (npMgr *NetworkPolicyManager) Start(config npmconfig.Config, stopCh <-chan
// Starts all informers manufactured by npMgr's informerFactory.
npMgr.InformerFactory.Start(stopCh)

// npn lite
// npm lite
if npMgr.NpmLiteToggle {
npMgr.PodInformerFactory.Start(stopCh)
}
Expand Down
26 changes: 13 additions & 13 deletions npm/pkg/dataplane/dataplane.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,13 @@ type DataPlane struct {
nodeName string
// endpointCache stores all endpoints of the network (including off-node)
// Key is PodIP
endpointCache *endpointCache
ioShim *common.IOShim
updatePodCache *updatePodCache
endpointQuery *endpointQuery
endpointQueryL1VH *endpointQuery // windows -> filter for state 2 (attached) endpoints in l1vh
applyInfo *applyInfo
netPolQueue *netPolQueue
endpointCache *endpointCache
ioShim *common.IOShim
updatePodCache *updatePodCache
endpointQuery *endpointQuery
endpointQueryAttachedState *endpointQuery // windows -> filter for state 2 (attached) endpoints in l1vh
applyInfo *applyInfo
netPolQueue *netPolQueue
// removePolicyInfo tracks when a policy was removed yet had ApplyIPSet failures.
// This field is only relevant for Linux.
removePolicyInfo removePolicyInfo
Expand All @@ -90,12 +90,12 @@ func NewDataPlane(nodeName string, ioShim *common.IOShim, cfg *Config, stopChann
policyMgr: policies.NewPolicyManager(ioShim, cfg.PolicyManagerCfg),
ipsetMgr: ipsets.NewIPSetManager(cfg.IPSetManagerCfg, ioShim),
// networkID is set when initializing Windows dataplane
networkID: "",
endpointCache: newEndpointCache(),
nodeName: nodeName,
ioShim: ioShim,
endpointQuery: new(endpointQuery),
endpointQueryL1VH: new(endpointQuery),
networkID: "",
endpointCache: newEndpointCache(),
nodeName: nodeName,
ioShim: ioShim,
endpointQuery: new(endpointQuery),
endpointQueryAttachedState: new(endpointQuery),
applyInfo: &applyInfo{
inBootupPhase: true,
},
Expand Down
75 changes: 55 additions & 20 deletions npm/pkg/dataplane/dataplane_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (dp *DataPlane) initializeDataPlane() error {
Flags: hcn.HostComputeQueryFlagsNone,
}
// Initialize Endpoint query used to filter healthy endpoints (vNIC) of Windows pods on L1VH Node
dp.endpointQueryL1VH.query = hcn.HostComputeQuery{
dp.endpointQueryAttachedState.query = hcn.HostComputeQuery{
SchemaVersion: hcn.SchemaVersion{
Major: hcnSchemaMajorVersion,
Minor: hcnSchemaMinorVersion,
Expand All @@ -63,18 +63,17 @@ func (dp *DataPlane) initializeDataPlane() error {
filterMap := map[string]uint16{"State": hcnEndpointStateAttachedSharing}
filter, err := json.Marshal(filterMap)
if err != nil {
return errors.Wrap(err, "failed to marshal endpoint filter map")
return errors.Wrap(err, "failed to marshal endpoint filter map for attachedsharing state")
}
dp.endpointQuery.query.Filter = string(filter)

if dp.EnableNPMLite {
filterMapL1VH := map[string]uint16{"State": hcnEndpointStateAttached}
filterL1VH, errL1VH := json.Marshal(filterMapL1VH)
if errL1VH != nil {
return errors.Wrap(errL1VH, "failed to marshal endpoint filter map")
}
dp.endpointQueryL1VH.query.Filter = string(filterL1VH)
// Filter out any endpoints that are not in "Attached" State. All running Windows L1VH pods with networking must be in this state.
filterMapAttached := map[string]uint16{"State": hcnEndpointStateAttached}
filterAttached, err := json.Marshal(filterMapAttached)
if err != nil {
return errors.Wrap(err, "failed to marshal endpoint filter map for attched state")
}
dp.endpointQueryAttachedState.query.Filter = string(filterAttached)

// reset endpoint cache so that netpol references are removed for all endpoints while refreshing pod endpoints
// no need to lock endpointCache at boot up
Expand Down Expand Up @@ -347,31 +346,67 @@ func (dp *DataPlane) getEndpointsToApplyPolicies(netPols []*policies.NPMNetworkP

func (dp *DataPlane) getLocalPodEndpoints() ([]*hcn.HostComputeEndpoint, error) {
klog.Info("getting local endpoints")

// Gets endpoints in state: Attached
timer := metrics.StartNewTimer()
endpoints, err := dp.ioShim.Hns.ListEndpointsQuery(dp.endpointQuery.query)
endpointsAttached, err := dp.ioShim.Hns.ListEndpointsQuery(dp.endpointQueryAttachedState.query)
metrics.RecordListEndpointsLatency(timer)
if err != nil {
metrics.IncListEndpointsFailures()
return nil, errors.Wrap(err, "failed to get local pod endpoints")
return nil, errors.Wrap(err, "failed to get local pod endpoints in state:attached")
}

if dp.EnableNPMLite {
timer = metrics.StartNewTimer()
endpointsAttached, errL1vh := dp.ioShim.Hns.ListEndpointsQuery(dp.endpointQueryL1VH.query)
metrics.RecordListEndpointsLatency(timer)
if errL1vh != nil {
metrics.IncListEndpointsFailures()
return nil, errors.Wrap(errL1vh, "failed to get local pod endpoints in L1VH")
}
endpoints = append(endpoints, endpointsAttached...)
// Gets endpoints in state: AttachedSharing
timer = metrics.StartNewTimer()
endpoints, err := dp.ioShim.Hns.ListEndpointsQuery(dp.endpointQuery.query)
metrics.RecordListEndpointsLatency(timer)
if err != nil {
metrics.IncListEndpointsFailures()
return nil, errors.Wrap(err, "failed to get local pod endpoints in state: attachedSharing")
}

// Filtering out any of the same endpoints between endpoints with state attached and attachedSharing
endpoints = removeCommonEndpoints(&endpoints, &endpointsAttached)

epPointers := make([]*hcn.HostComputeEndpoint, 0, len(endpoints))
for k := range endpoints {
epPointers = append(epPointers, &endpoints[k])
}
return epPointers, nil
}

// removeCommonEndpoints merges the two endpoint lists into one list in which an endpoint
// present in both inputs (matched by Id) appears only once.
//
// Ordering of the result:
//  1. every entry of the longer list, in its original order (when both lists have the same
//     length, endpointsAttached is treated as the longer list);
//  2. followed by the entries of the shorter list whose Id does not occur in the longer list,
//     in their original order.
//
// Duplicate Ids inside a single input list are not collapsed. Both pointers must be non-nil.
// The input slices are only read, never modified, and the returned slice is never nil.
func removeCommonEndpoints(endpoints, endpointsAttached *[]hcn.HostComputeEndpoint) []hcn.HostComputeEndpoint {
	// Index the shorter list so that the lookup set stays as small as possible.
	smaller, larger := *endpoints, *endpointsAttached
	if len(smaller) > len(larger) {
		smaller, larger = larger, smaller
	}

	// pending holds the Ids of the shorter list that have not (yet) been seen in the longer list.
	pending := make(map[string]struct{}, len(smaller))
	for i := range smaller {
		pending[smaller[i].Id] = struct{}{}
	}

	// The result can never exceed the combined length, so allocate once up front instead of
	// letting append repeatedly grow and copy a slice of large endpoint structs.
	result := make([]hcn.HostComputeEndpoint, 0, len(larger)+len(smaller))

	// The longer list is taken as-is; any Id it contains is no longer pending.
	result = append(result, larger...)
	for i := range larger {
		delete(pending, larger[i].Id)
	}

	// Whatever is still pending exists only in the shorter list and must be added.
	for i := range smaller {
		if _, onlyInSmaller := pending[smaller[i].Id]; onlyInSmaller {
			result = append(result, smaller[i])
		}
	}
	return result
}

// refreshPodEndpoints will refresh all the pod endpoints and create empty netpol references for new endpoints
/*
Key Assumption: a new pod event (w/ IP) cannot come before HNS knows (and can tell us) about the endpoint.
Expand Down
68 changes: 68 additions & 0 deletions npm/pkg/dataplane/dataplane_windows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package dataplane

import (
"fmt"
"reflect"
"sync"
"testing"
"time"
Expand All @@ -10,6 +11,7 @@ import (
"github.com/Azure/azure-container-networking/npm/metrics"
"github.com/Azure/azure-container-networking/npm/pkg/dataplane/ipsets"
dptestutils "github.com/Azure/azure-container-networking/npm/pkg/dataplane/testutils"
"github.com/Microsoft/hcsshim/hcn"
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -86,6 +88,72 @@ func TestMultiJobApplyInBackground(t *testing.T) {
testMultiJobCases(t, multiJobApplyInBackgroundTests(), time.Duration(1*time.Second))
}

// TestRemoveCommonEndpoints verifies that removeCommonEndpoints merges the AttachedSharing and
// Attached endpoint lists, keeps endpoints present in both lists only once, emits the longer
// list first (the attached list on a tie), and leaves its input slices untouched.
func TestRemoveCommonEndpoints(t *testing.T) {
	tests := []struct {
		name              string
		endpoints         []hcn.HostComputeEndpoint
		endpointsAttached []hcn.HostComputeEndpoint
		expected          []hcn.HostComputeEndpoint
	}{
		{
			// Equal lengths: the attached list is emitted first.
			name:              "one common endpoint",
			endpoints:         []hcn.HostComputeEndpoint{{Id: "456901"}, {Id: "123456"}, {Id: "560971"}},
			endpointsAttached: []hcn.HostComputeEndpoint{{Id: "567890"}, {Id: "123456"}, {Id: "789012"}},
			expected:          []hcn.HostComputeEndpoint{{Id: "567890"}, {Id: "123456"}, {Id: "789012"}, {Id: "456901"}, {Id: "560971"}},
		},
		{
			name:              "no common endpoints",
			endpoints:         []hcn.HostComputeEndpoint{{Id: "456901"}, {Id: "560971"}},
			endpointsAttached: []hcn.HostComputeEndpoint{{Id: "567890"}, {Id: "789012"}},
			expected:          []hcn.HostComputeEndpoint{{Id: "567890"}, {Id: "789012"}, {Id: "456901"}, {Id: "560971"}},
		},
		{
			// endpoints is the longer list here, so it is emitted first.
			name:              "two common endpoints",
			endpoints:         []hcn.HostComputeEndpoint{{Id: "456901"}, {Id: "560971"}, {Id: "123456"}, {Id: "789012"}},
			endpointsAttached: []hcn.HostComputeEndpoint{{Id: "567890"}, {Id: "789012"}, {Id: "123456"}},
			expected:          []hcn.HostComputeEndpoint{{Id: "456901"}, {Id: "560971"}, {Id: "123456"}, {Id: "789012"}, {Id: "567890"}},
		},
		{
			name:              "both lists empty",
			endpoints:         []hcn.HostComputeEndpoint{},
			endpointsAttached: []hcn.HostComputeEndpoint{},
			expected:          []hcn.HostComputeEndpoint{},
		},
		{
			name:              "single endpoint present in both lists",
			endpoints:         []hcn.HostComputeEndpoint{{Id: "456901"}},
			endpointsAttached: []hcn.HostComputeEndpoint{{Id: "456901"}},
			expected:          []hcn.HostComputeEndpoint{{Id: "456901"}},
		},
		{
			name:              "single endpoint, attached list empty",
			endpoints:         []hcn.HostComputeEndpoint{{Id: "456901"}},
			endpointsAttached: []hcn.HostComputeEndpoint{},
			expected:          []hcn.HostComputeEndpoint{{Id: "456901"}},
		},
	}
	for _, tt := range tests {
		tt := tt

		t.Run(tt.name, func(t *testing.T) {
			// Snapshot the inputs: the function receives pointers to the slices, so make sure
			// it only reads them. make+copy keeps empty inputs as non-nil empty slices.
			endpointsBefore := make([]hcn.HostComputeEndpoint, len(tt.endpoints))
			copy(endpointsBefore, tt.endpoints)
			attachedBefore := make([]hcn.HostComputeEndpoint, len(tt.endpointsAttached))
			copy(attachedBefore, tt.endpointsAttached)

			result := removeCommonEndpoints(&tt.endpoints, &tt.endpointsAttached)

			// require.Equal prints a descriptive diff on mismatch and stops the subtest.
			require.Equal(t, tt.expected, result, "expected array equals result")

			if !reflect.DeepEqual(endpointsBefore, tt.endpoints) {
				t.Errorf("endpoints input was modified: before %v, after %v", endpointsBefore, tt.endpoints)
			}
			if !reflect.DeepEqual(attachedBefore, tt.endpointsAttached) {
				t.Errorf("endpointsAttached input was modified: before %v, after %v", attachedBefore, tt.endpointsAttached)
			}
		})
	}
}

func testSerialCases(t *testing.T, tests []*SerialTestCase, finalSleep time.Duration) {
for i, tt := range tests {
i := i
Expand Down
Loading