
Commit cc0faf0

[kube-proxy:nftables] Skip EP chain updates on startup.

Endpoint chain contents are fairly predictable from their name and the
existing affinity sets. Skip endpoint chain updates when we can be sure
that the rules in that chain are still correct. Add a unit test to verify
that the first transaction is optimized. Change the baseRules ordering to
make it accepted by nft.ParseDump.

Signed-off-by: Nadia Pinaeva <[email protected]>

1 parent 7d5f3c5 commit cc0faf0
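The optimization rests on the observation that an endpoint chain's rules are derivable from its name. For example, the endpoint chain seeded in the new unit test (see the proxier_test.go diff below) consists of exactly:

```
add chain ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80
add rule ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 ip saddr 10.0.1.1 jump mark-for-masquerade
add rule ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 meta l4proto tcp dnat to 10.0.1.1:80
```

Both rules repeat the `10.0.1.1/80` that is already encoded in the chain name; only a ClientIP session-affinity rule would add information beyond the name, and its presence can be inferred from whether the endpoint's affinity set exists.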

2 files changed (+212, -33 lines)
pkg/proxy/nftables/proxier.go (66 additions, 25 deletions)
```diff
@@ -163,6 +163,7 @@ type Proxier struct {
 	// updating nftables with some partial data after kube-proxy restart.
 	endpointSlicesSynced bool
 	servicesSynced       bool
+	syncedOnce           bool
 	lastFullSync         time.Time
 	needFullSync         bool
 	initialized          int32
```
```diff
@@ -1189,6 +1190,7 @@ func (proxier *Proxier) syncProxyRules() {
 	doFullSync := proxier.needFullSync || (time.Since(proxier.lastFullSync) > proxyutil.FullSyncPeriod)

 	defer func() {
+		proxier.syncedOnce = true
 		metrics.SyncProxyRulesLatency.WithLabelValues(string(proxier.ipFamily)).Observe(metrics.SinceInSeconds(start))
 		if !doFullSync {
 			metrics.SyncPartialProxyRulesLatency.WithLabelValues(string(proxier.ipFamily)).Observe(metrics.SinceInSeconds(start))
```
```diff
@@ -1263,6 +1265,26 @@
 		ipvX_addr = "ipv6_addr"
 	}

+	var existingChains sets.Set[string]
+	existingChainsList, err := proxier.nftables.List(context.TODO(), "chain")
+	if err == nil {
+		existingChains = sets.New(existingChainsList...)
+	} else {
+		proxier.logger.Error(err, "Failed to list existing chains")
+	}
+	var existingAffinitySets sets.Set[string]
+	existingSets, err := proxier.nftables.List(context.TODO(), "sets")
+	if err == nil {
+		existingAffinitySets = sets.New[string]()
+		for _, set := range existingSets {
+			if isAffinitySetName(set) {
+				existingAffinitySets.Insert(set)
+			}
+		}
+	} else {
+		proxier.logger.Error(err, "Failed to list existing sets")
+	}
+
 	// Accumulate service/endpoint chains and affinity sets to keep.
 	activeChains := sets.New[string]()
 	activeAffinitySets := sets.New[string]()
```
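The snapshot above uses knftables' List, which returns only object names, not their contents. A minimal sketch of the pattern (the function name and wiring are mine, not the commit's):

```go
package sketch

import (
	"context"

	"k8s.io/apimachinery/pkg/util/sets"
	"sigs.k8s.io/knftables"
)

// listChainNames sketches the snapshot pattern used above: knftables' List
// returns only the names of objects of the given kind in the managed table,
// so the proxier can cheaply record which chains and affinity sets already
// exist before its first sync.
func listChainNames(ctx context.Context, nft knftables.Interface) (sets.Set[string], error) {
	// The library also accepts plural forms ("chains", "sets"), as the
	// pre-existing cleanup code below used.
	names, err := nft.List(ctx, "chain")
	if err != nil {
		return nil, err
	}
	return sets.New(names...), nil
}
```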
```diff
@@ -1306,7 +1328,8 @@
 			// Note the endpoint chains that will be used
 			for _, ep := range allLocallyReachableEndpoints {
 				if epInfo, ok := ep.(*endpointInfo); ok {
-					ensureChain(epInfo.chainName, tx, activeChains, skipServiceUpdate)
+					ensureChain(epInfo.chainName, tx, activeChains, skipServiceUpdate ||
+						proxier.epChainSkipUpdate(existingChains, existingAffinitySets, svcInfo, epInfo))
 					// Note the affinity sets that will be used
 					if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
 						activeAffinitySets.Insert(epInfo.affinitySetName)
```
```diff
@@ -1748,6 +1771,10 @@
 				continue
 			}

+			if proxier.epChainSkipUpdate(existingChains, existingAffinitySets, svcInfo, epInfo) {
+				// If the EP chain is already updated, we can skip it.
+				continue
+			}
 			endpointChain := epInfo.chainName

 			// Handle traffic that loops back to the originator with SNAT.
```
```diff
@@ -1787,36 +1814,26 @@
 	// short amount of time later that the chain is now unreferenced. So we flush them
 	// now, and record the time that they become stale in staleChains so they can be
 	// deleted later.
-	existingChains, err := proxier.nftables.List(context.TODO(), "chains")
-	if err == nil {
-		for _, chain := range existingChains {
-			if isServiceChainName(chain) {
-				if !activeChains.Has(chain) {
-					tx.Flush(&knftables.Chain{
-						Name: chain,
-					})
-					proxier.staleChains[chain] = start
-				} else {
-					delete(proxier.staleChains, chain)
-				}
+	for chain := range existingChains {
+		if isServiceChainName(chain) {
+			if !activeChains.Has(chain) {
+				tx.Flush(&knftables.Chain{
+					Name: chain,
+				})
+				proxier.staleChains[chain] = start
+			} else {
+				delete(proxier.staleChains, chain)
 			}
 		}
-	} else if !knftables.IsNotFound(err) {
-		proxier.logger.Error(err, "Failed to list nftables chains: stale chains will not be deleted")
 	}

 	// OTOH, we can immediately delete any stale affinity sets
-	existingSets, err := proxier.nftables.List(context.TODO(), "sets")
-	if err == nil {
-		for _, set := range existingSets {
-			if isAffinitySetName(set) && !activeAffinitySets.Has(set) {
-				tx.Delete(&knftables.Set{
-					Name: set,
-				})
-			}
+	for set := range existingAffinitySets {
+		if !activeAffinitySets.Has(set) {
+			tx.Delete(&knftables.Set{
+				Name: set,
+			})
 		}
-	} else if !knftables.IsNotFound(err) {
-		proxier.logger.Error(err, "Failed to list nftables sets: stale affinity sets will not be deleted")
 	}

 	proxier.clusterIPs.cleanupLeftoverKeys(tx)
```
```diff
@@ -1882,6 +1899,30 @@
 	}
 }

+// epChainSkipUpdate returns true if the EP chain doesn't need to be updated.
+func (proxier *Proxier) epChainSkipUpdate(existingChains, existingAffinitySets sets.Set[string], svcInfo *servicePortInfo, epInfo *endpointInfo) bool {
+	if proxier.syncedOnce {
+		// We only skip updating EP chains during the first sync to speed up kube-proxy restart.
+		return false
+	}
+	if existingChains == nil || existingAffinitySets == nil {
+		// listing existing objects failed, can't skip updating
+		return false
+	}
+	// EP chain can have up to 3 rules:
+	// - loopback masquerade rule
+	//   - includes the endpoint IP
+	// - affinity rule when session affinity is set to ClientIP
+	//   - includes the affinity set name
+	// - DNAT rule
+	//   - includes the endpoint IP + port
+	// EP chain name includes the endpoint IP + port => loopback and DNAT rules are pre-defined by the chain name.
+	// When session affinity is set to ClientIP, the affinity set is created for local endpoints.
+	// Therefore, we can check that session affinity hasn't changed by checking if the affinity set exists.
+	wantAffinitySet := svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP
+	return existingChains.Has(epInfo.chainName) && wantAffinitySet == existingAffinitySets.Has(epInfo.affinitySetName)
+}
+
 func (proxier *Proxier) writeServiceToEndpointRules(tx *knftables.Transaction, svcInfo *servicePortInfo, svcChain string, endpoints []proxy.Endpoint) {
 	// First write session affinity rules, if applicable.
 	if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
```
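Condensed, the new predicate reduces to two membership checks against the startup snapshot. A standalone restatement (hypothetical helper, not the commit's code):

```go
package sketch

import "k8s.io/apimachinery/pkg/util/sets"

// canSkipEndpointChainUpdate restates epChainSkipUpdate's logic (hypothetical
// helper, mine): on the first sync after restart, an endpoint chain can be
// left untouched only if it already exists (its loopback-masquerade and DNAT
// rules are derivable from its name) and the presence of its affinity set
// matches whether the service wants ClientIP session affinity.
func canSkipEndpointChainUpdate(firstSync bool, existingChains, existingAffinitySets sets.Set[string],
	chainName, affinitySetName string, wantAffinity bool) bool {
	if !firstSync || existingChains == nil || existingAffinitySets == nil {
		// After the first sync, or if listing failed, never skip.
		return false
	}
	return existingChains.Has(chainName) && wantAffinity == existingAffinitySets.Has(affinitySetName)
}
```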

pkg/proxy/nftables/proxier_test.go (146 additions, 8 deletions)
```diff
@@ -150,6 +150,15 @@ func NewFakeProxier(ipFamily v1.IPFamily) (*knftables.Fake, *Proxier) {
 var baseRules = dedent.Dedent(`
 	add table ip kube-proxy { comment "rules for kube-proxy" ; }

+	add set ip kube-proxy cluster-ips { type ipv4_addr ; comment "Active ClusterIPs" ; }
+	add set ip kube-proxy nodeport-ips { type ipv4_addr ; comment "IPs that accept NodePort traffic" ; }
+
+	add map ip kube-proxy firewall-ips { type ipv4_addr . inet_proto . inet_service : verdict ; comment "destinations that are subject to LoadBalancerSourceRanges" ; }
+	add map ip kube-proxy no-endpoint-nodeports { type inet_proto . inet_service : verdict ; comment "vmap to drop or reject packets to service nodeports with no endpoints" ; }
+	add map ip kube-proxy no-endpoint-services { type ipv4_addr . inet_proto . inet_service : verdict ; comment "vmap to drop or reject packets to services with no endpoints" ; }
+	add map ip kube-proxy service-ips { type ipv4_addr . inet_proto . inet_service : verdict ; comment "ClusterIP, ExternalIP and LoadBalancer IP traffic" ; }
+	add map ip kube-proxy service-nodeports { type inet_proto . inet_service : verdict ; comment "NodePort traffic" ; }
+
 	add chain ip kube-proxy cluster-ips-check
 	add chain ip kube-proxy filter-prerouting { type filter hook prerouting priority -110 ; }
 	add chain ip kube-proxy filter-forward { type filter hook forward priority -110 ; }
```
```diff
@@ -189,16 +198,9 @@ var baseRules = dedent.Dedent(`
 	add rule ip kube-proxy reject-chain reject
 	add rule ip kube-proxy services ip daddr . meta l4proto . th dport vmap @service-ips
 	add rule ip kube-proxy services ip daddr @nodeport-ips meta l4proto . th dport vmap @service-nodeports
-	add set ip kube-proxy cluster-ips { type ipv4_addr ; comment "Active ClusterIPs" ; }
-	add set ip kube-proxy nodeport-ips { type ipv4_addr ; comment "IPs that accept NodePort traffic" ; }
+
 	add element ip kube-proxy nodeport-ips { 192.168.0.2 }
 	add rule ip kube-proxy service-endpoints-check ip daddr . meta l4proto . th dport vmap @no-endpoint-services
-
-	add map ip kube-proxy firewall-ips { type ipv4_addr . inet_proto . inet_service : verdict ; comment "destinations that are subject to LoadBalancerSourceRanges" ; }
-	add map ip kube-proxy no-endpoint-nodeports { type inet_proto . inet_service : verdict ; comment "vmap to drop or reject packets to service nodeports with no endpoints" ; }
-	add map ip kube-proxy no-endpoint-services { type ipv4_addr . inet_proto . inet_service : verdict ; comment "vmap to drop or reject packets to services with no endpoints" ; }
-	add map ip kube-proxy service-ips { type ipv4_addr . inet_proto . inet_service : verdict ; comment "ClusterIP, ExternalIP and LoadBalancer IP traffic" ; }
-	add map ip kube-proxy service-nodeports { type inet_proto . inet_service : verdict ; comment "NodePort traffic" ; }
 `)

 // TestOverallNFTablesRules creates a variety of services and verifies that the generated
```
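The two hunks above only move the set and map declarations ahead of the rules and elements that reference them. That is what the commit message means by making baseRules "accepted by nft.ParseDump": the dump is replayed in order, so referenced objects must already exist (my reading of the parser constraint). A minimal usage sketch, not the commit's code:

```go
package sketch

import "sigs.k8s.io/knftables"

// seedFake sketches how the new test pre-loads state: knftables' Fake can
// replay a textual dump via ParseDump, line by line, so sets and maps must
// be declared before any rule or element that references them.
func seedFake(baseRules string) (*knftables.Fake, error) {
	fake := knftables.NewFake(knftables.IPv4Family, "kube-proxy")
	if err := fake.ParseDump(baseRules); err != nil {
		return nil, err
	}
	return fake, nil
}
```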
```diff
@@ -4321,6 +4323,142 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
 	}
 }

+func TestSyncProxyRulesStartup(t *testing.T) {
+	nft, fp := NewFakeProxier(v1.IPv4Protocol)
+	fp.syncProxyRules()
+	// measure the amount of ops required for the initial sync
+	setupOps := nft.LastTransaction.NumOperations()
+
+	// now create a new proxier and start from scratch
+	nft, fp = NewFakeProxier(v1.IPv4Protocol)
+
+	// put a part of desired state to nftables
+	err := nft.ParseDump(baseRules + dedent.Dedent(`
+		add chain ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80
+		add chain ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80
+		add chain ip kube-proxy service-MHHHYRWA-ns2/svc2/tcp/p8080
+		add chain ip kube-proxy endpoint-7RVP4LUQ-ns2/svc2/tcp/p8080__10.0.2.1/8080
+
+		add rule ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 ip daddr 172.30.0.41 tcp dport 80 ip saddr != 10.0.0.0/8 jump mark-for-masquerade
+		add rule ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 numgen random mod 1 vmap { 0 : goto endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 }
+		add rule ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 ip saddr 10.0.1.1 jump mark-for-masquerade
+		add rule ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 meta l4proto tcp dnat to 10.0.1.1:80
+
+		add rule ip kube-proxy service-MHHHYRWA-ns2/svc2/tcp/p8080 ip daddr 172.30.0.42 tcp dport 8080 ip saddr != 10.0.0.0/8 jump mark-for-masquerade
+		add rule ip kube-proxy service-MHHHYRWA-ns2/svc2/tcp/p8080 numgen random mod 1 vmap { 0 : goto endpoint-7RVP4LUQ-ns2/svc2/tcp/p8080__10.0.2.1/8080 }
+		add rule ip kube-proxy endpoint-7RVP4LUQ-ns2/svc2/tcp/p8080__10.0.2.1/8080 ip saddr 10.0.2.1 jump mark-for-masquerade
+		add rule ip kube-proxy endpoint-7RVP4LUQ-ns2/svc2/tcp/p8080__10.0.2.1/8080 meta l4proto tcp dnat to 10.0.2.1:8080
+
+		add element ip kube-proxy cluster-ips { 172.30.0.41 }
+		add element ip kube-proxy cluster-ips { 172.30.0.42 }
+		add element ip kube-proxy service-ips { 172.30.0.41 . tcp . 80 : goto service-ULMVA6XW-ns1/svc1/tcp/p80 }
+		add element ip kube-proxy service-ips { 172.30.0.42 . tcp . 8080 : goto service-MHHHYRWA-ns2/svc2/tcp/p8080 }
+	`))
+
+	if err != nil {
+		t.Errorf("nft.ParseDump failed: %v", err)
+	}
+
+	// Create initial state, which differs from the loaded nftables state:
+	// - svc1 has a second endpoint
+	// - svc3 is added
+	makeServiceMap(fp,
+		makeTestService("ns1", "svc1", func(svc *v1.Service) {
+			svc.Spec.Type = v1.ServiceTypeClusterIP
+			svc.Spec.ClusterIP = "172.30.0.41"
+			svc.Spec.Ports = []v1.ServicePort{{
+				Name:     "p80",
+				Port:     80,
+				Protocol: v1.ProtocolTCP,
+			}}
+		}),
+		makeTestService("ns2", "svc2", func(svc *v1.Service) {
+			svc.Spec.Type = v1.ServiceTypeClusterIP
+			svc.Spec.ClusterIP = "172.30.0.42"
+			svc.Spec.Ports = []v1.ServicePort{{
+				Name:     "p8080",
+				Port:     8080,
+				Protocol: v1.ProtocolTCP,
+			}}
+		}),
+		makeTestService("ns3", "svc3", func(svc *v1.Service) {
+			svc.Spec.Type = v1.ServiceTypeClusterIP
+			svc.Spec.ClusterIP = "172.30.0.43"
+			svc.Spec.Ports = []v1.ServicePort{{
+				Name:     "p80",
+				Port:     80,
+				Protocol: v1.ProtocolTCP,
+			}}
+		}),
+	)
+
+	populateEndpointSlices(fp,
+		makeTestEndpointSlice("ns1", "svc1", 1, func(eps *discovery.EndpointSlice) {
+			eps.AddressType = discovery.AddressTypeIPv4
+			eps.Endpoints = []discovery.Endpoint{
+				{Addresses: []string{"10.0.1.1"}},
+				{Addresses: []string{"10.0.1.2"}},
+			}
+			eps.Ports = []discovery.EndpointPort{{
+				Name:     ptr.To("p80"),
+				Port:     ptr.To[int32](80),
+				Protocol: ptr.To(v1.ProtocolTCP),
+			}}
+		}),
+		makeTestEndpointSlice("ns2", "svc2", 1, func(eps *discovery.EndpointSlice) {
+			eps.AddressType = discovery.AddressTypeIPv4
+			eps.Endpoints = []discovery.Endpoint{{
+				Addresses: []string{"10.0.2.1"},
+			}}
+			eps.Ports = []discovery.EndpointPort{{
+				Name:     ptr.To("p8080"),
+				Port:     ptr.To[int32](8080),
+				Protocol: ptr.To(v1.ProtocolTCP),
+			}}
+		}),
+	)
+
+	fp.syncProxyRules()
+
+	expected := baseRules + dedent.Dedent(`
+		add element ip kube-proxy cluster-ips { 172.30.0.41 }
+		add element ip kube-proxy cluster-ips { 172.30.0.42 }
+		add element ip kube-proxy cluster-ips { 172.30.0.43 }
+		add element ip kube-proxy service-ips { 172.30.0.41 . tcp . 80 : goto service-ULMVA6XW-ns1/svc1/tcp/p80 }
+		add element ip kube-proxy service-ips { 172.30.0.42 . tcp . 8080 : goto service-MHHHYRWA-ns2/svc2/tcp/p8080 }
+		add element ip kube-proxy no-endpoint-services { 172.30.0.43 . tcp . 80 comment "ns3/svc3:p80" : goto reject-chain }
+
+		add chain ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80
+		add rule ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 ip daddr 172.30.0.41 tcp dport 80 ip saddr != 10.0.0.0/8 jump mark-for-masquerade
+		add rule ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 numgen random mod 2 vmap { 0 : goto endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 , 1 : goto endpoint-ZCZBVNAZ-ns1/svc1/tcp/p80__10.0.1.2/80 }
+		add chain ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80
+		add rule ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 ip saddr 10.0.1.1 jump mark-for-masquerade
+		add rule ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 meta l4proto tcp dnat to 10.0.1.1:80
+		add chain ip kube-proxy endpoint-ZCZBVNAZ-ns1/svc1/tcp/p80__10.0.1.2/80
+		add rule ip kube-proxy endpoint-ZCZBVNAZ-ns1/svc1/tcp/p80__10.0.1.2/80 ip saddr 10.0.1.2 jump mark-for-masquerade
+		add rule ip kube-proxy endpoint-ZCZBVNAZ-ns1/svc1/tcp/p80__10.0.1.2/80 meta l4proto tcp dnat to 10.0.1.2:80
+
+		add chain ip kube-proxy service-MHHHYRWA-ns2/svc2/tcp/p8080
+		add rule ip kube-proxy service-MHHHYRWA-ns2/svc2/tcp/p8080 ip daddr 172.30.0.42 tcp dport 8080 ip saddr != 10.0.0.0/8 jump mark-for-masquerade
+		add rule ip kube-proxy service-MHHHYRWA-ns2/svc2/tcp/p8080 numgen random mod 1 vmap { 0 : goto endpoint-7RVP4LUQ-ns2/svc2/tcp/p8080__10.0.2.1/8080 }
+		add chain ip kube-proxy endpoint-7RVP4LUQ-ns2/svc2/tcp/p8080__10.0.2.1/8080
+		add rule ip kube-proxy endpoint-7RVP4LUQ-ns2/svc2/tcp/p8080__10.0.2.1/8080 ip saddr 10.0.2.1 jump mark-for-masquerade
+		add rule ip kube-proxy endpoint-7RVP4LUQ-ns2/svc2/tcp/p8080__10.0.2.1/8080 meta l4proto tcp dnat to 10.0.2.1:8080
+	`)
+	assertNFTablesTransactionEqual(t, getLine(), expected, nft.Dump())
+	// initial transaction consists of:
+	// 1. nft setup, total ops = setupOps
+	// 2. services setup (should skip adding existing set/map elements and endpoint chains+rules)
+	//    - add svc3 IP to the cluster-ips, and to the no-endpoint-services set = 2 ops
+	//    - add+flush 2 service chains + 2 rules each = 8 ops
+	//    - add+flush svc1 endpoint chain + 2 rules = 4 ops
+	//    total: 14 ops
+	if nft.LastTransaction.NumOperations() != setupOps+14 {
+		fmt.Println(nft.LastTransaction)
+		t.Errorf("Expected %v transaction operations, got %d", setupOps+14, nft.LastTransaction.NumOperations())
+	}
+}
+
 func TestNoEndpointsMetric(t *testing.T) {
 	type endpoint struct {
 		ip string
```
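One subtlety in the new test: it asserts against two different views of the fake. A short annotated restatement (the check is the test's own; the helper shape and comments are mine):

```go
package sketch

import (
	"testing"

	"sigs.k8s.io/knftables"
)

// assertOps restates the test's final check. Fake.Dump() renders the full
// resulting ruleset, including the pre-seeded endpoint chains that the first
// sync deliberately skipped, while Fake.LastTransaction records only the
// operations that sync actually issued, which is what makes the skipped
// work measurable against the setupOps baseline.
func assertOps(t *testing.T, nft *knftables.Fake, wantOps int) {
	t.Helper()
	if got := nft.LastTransaction.NumOperations(); got != wantOps {
		t.Errorf("expected %d transaction operations, got %d", wantOps, got)
	}
}
```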
