Skip to content

Commit f954e86

Browse files
rbtrcamrynl
andauthored
backport "feat: CNS/CNI async pod delete (#2183)" to release/v1.4 (#2245)
feat: CNS/CNI async pod delete (#2183) * initial changes for cni/cns delete deadlock * add logs and set watcher path * working fswatcher, removing extra debug lines * watcher changes for azure-ipam * remove additional logger from fsnotify and address comments * /deleteIDs directory as part of cnsconfig * add feature flag for async delete * adds some unit test + remove changes for azure-ipam(split pr, dependency conflicts) * update ut * update uts * swift configmap update * fix configmap for test * addressing comments * fix lint * adding cause to connection error struct * connectionerr lint * addressing comments, change watchfs to watcher method * add ctx to releaseIP func * log containerID in failure to add watcher, exit select if context is cancelled * fix logs in network.go after rebase * catch release ip error in invoker_cns.go * retry on failure to release ip * lint fix * rework asyncdelete watcher * include podinterfaceID in file for releaseIP * close file before delete --------- Signed-off-by: Evan Baker <[email protected]> Co-authored-by: Camryn Lee <[email protected]>
1 parent a3592c0 commit f954e86

File tree

8 files changed

+378
-14
lines changed

8 files changed

+378
-14
lines changed

cni/network/invoker_cns.go

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ import (
99
"github.com/Azure/azure-container-networking/cni"
1010
"github.com/Azure/azure-container-networking/cni/util"
1111
"github.com/Azure/azure-container-networking/cns"
12+
cnscli "github.com/Azure/azure-container-networking/cns/client"
13+
"github.com/Azure/azure-container-networking/cns/fsnotify"
1214
"github.com/Azure/azure-container-networking/iptables"
1315
"github.com/Azure/azure-container-networking/log"
1416
"github.com/Azure/azure-container-networking/network"
@@ -17,11 +19,13 @@ import (
1719
cniTypes "github.com/containernetworking/cni/pkg/types"
1820
cniTypesCurr "github.com/containernetworking/cni/pkg/types/100"
1921
"github.com/pkg/errors"
22+
"go.uber.org/zap"
2023
)
2124

2225
var (
2326
errEmptyCNIArgs = errors.New("empty CNI cmd args not allowed")
2427
errInvalidArgs = errors.New("invalid arg(s)")
28+
watcherPath = "/var/run/azure-vnet/deleteIDs"
2529
)
2630

2731
type CNSIPAMInvoker struct {
@@ -242,8 +246,19 @@ func (invoker *CNSIPAMInvoker) Delete(address *net.IPNet, nwCfg *cni.NetworkConf
242246
}
243247

244248
if err := invoker.cnsClient.ReleaseIPAddress(context.TODO(), req); err != nil {
245-
return errors.Wrap(err, fmt.Sprintf("failed to release IP %v with err ", address)+"%w")
249+
var connectionErr *cnscli.ConnectionFailureErr
250+
if errors.As(err, &connectionErr) {
251+
addErr := fsnotify.AddFile(req.PodInterfaceID, args.ContainerID, watcherPath)
252+
if addErr != nil {
253+
log.Errorf("Failed to add file to watcher", zap.String("podInterfaceID", req.PodInterfaceID), zap.String("containerID", args.ContainerID), zap.Error(addErr))
254+
return errors.Wrap(addErr, fmt.Sprintf("failed to add file to watcher with containerID %s and podInterfaceID %s", args.ContainerID, req.PodInterfaceID))
255+
}
256+
return nil
257+
}
258+
log.Errorf("Failed to release IP address",
259+
zap.String("infracontainerid", req.InfraContainerID),
260+
zap.Error(err))
261+
return errors.Wrap(err, fmt.Sprintf("failed to release IP %v using ReleaseIPs with err ", req.DesiredIPAddress)+"%w")
246262
}
247-
248263
return nil
249264
}

cns/client/client.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,14 @@ type Client struct {
5454
routes map[string]url.URL
5555
}
5656

57+
type ConnectionFailureErr struct {
58+
cause error
59+
}
60+
61+
func (e *ConnectionFailureErr) Error() string {
62+
return e.cause.Error()
63+
}
64+
5765
// New returns a new CNS client configured with the passed URL and timeout.
5866
func New(baseURL string, requestTimeout time.Duration) (*Client, error) {
5967
if baseURL == "" {
@@ -291,7 +299,9 @@ func (c *Client) ReleaseIPAddress(ctx context.Context, ipconfig cns.IPConfigRequ
291299
req.Header.Set(headerContentType, contentTypeJSON)
292300
res, err := c.client.Do(req)
293301
if err != nil {
294-
return errors.Wrap(err, "http request failed")
302+
return &ConnectionFailureErr{
303+
cause: err,
304+
}
295305
}
296306
defer res.Body.Close()
297307

cns/configuration/configuration.go

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
// Copyright Microsoft. All rights reserved.
21
package configuration
32

43
import (
@@ -20,12 +19,24 @@ const (
2019
)
2120

2221
type CNSConfig struct {
22+
AsyncPodDeletePath string
23+
CNIConflistFilepath string
24+
CNIConflistScenario string
2325
ChannelMode string
26+
EnableAsyncPodDelete bool
27+
EnableCNIConflistGeneration bool
2428
EnablePprof bool
2529
EnableSubnetScarcity bool
30+
EnableSwiftV2 bool
2631
InitializeFromCNI bool
32+
KeyVaultSettings KeyVaultSettings
33+
MSISettings MSISettings
34+
ManageEndpointState bool
2735
ManagedSettings ManagedSettings
36+
MellanoxMonitorIntervalSecs int
2837
MetricsBindAddress string
38+
PopulateHomeAzCacheRetryIntervalSecs int
39+
ProgramSNATIPTables bool
2940
SyncHostNCTimeoutMs int
3041
SyncHostNCVersionIntervalMs int
3142
TLSCertificatePath string
@@ -34,16 +45,8 @@ type CNSConfig struct {
3445
TLSSubjectName string
3546
TelemetrySettings TelemetrySettings
3647
UseHTTPS bool
48+
WatchPods bool
3749
WireserverIP string
38-
KeyVaultSettings KeyVaultSettings
39-
MSISettings MSISettings
40-
ProgramSNATIPTables bool
41-
ManageEndpointState bool
42-
CNIConflistScenario string
43-
EnableCNIConflistGeneration bool
44-
CNIConflistFilepath string
45-
PopulateHomeAzCacheRetryIntervalSecs int
46-
MellanoxMonitorIntervalSecs int
4750
}
4851

4952
type TelemetrySettings struct {
@@ -196,4 +199,7 @@ func SetCNSConfigDefaults(config *CNSConfig) {
196199
if config.WireserverIP == "" {
197200
config.WireserverIP = "168.63.129.16"
198201
}
202+
if config.AsyncPodDeletePath == "" {
203+
config.AsyncPodDeletePath = "/var/run/azure-vnet/deleteIDs"
204+
}
199205
}

cns/configuration/configuration_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,7 @@ func TestSetCNSConfigDefaults(t *testing.T) {
208208
},
209209
PopulateHomeAzCacheRetryIntervalSecs: 15,
210210
WireserverIP: "168.63.129.16",
211+
AsyncPodDeletePath: "/var/run/azure-vnet/deleteIDs",
211212
},
212213
},
213214
{
@@ -252,6 +253,7 @@ func TestSetCNSConfigDefaults(t *testing.T) {
252253
},
253254
PopulateHomeAzCacheRetryIntervalSecs: 10,
254255
WireserverIP: "168.63.129.16",
256+
AsyncPodDeletePath: "/var/run/azure-vnet/deleteIDs",
255257
},
256258
},
257259
}

cns/fsnotify/fsnotify.go

Lines changed: 219 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,219 @@
1+
package fsnotify
2+
3+
import (
4+
"context"
5+
"io"
6+
"os"
7+
"sync"
8+
"time"
9+
10+
"github.com/Azure/azure-container-networking/cns"
11+
"github.com/fsnotify/fsnotify"
12+
"github.com/pkg/errors"
13+
"go.uber.org/zap"
14+
)
15+
16+
type releaseIPsClient interface {
17+
ReleaseIPAddress(ctx context.Context, ipconfig cns.IPConfigRequest) error
18+
}
19+
20+
type watcher struct {
21+
cli releaseIPsClient
22+
path string
23+
log *zap.Logger
24+
25+
pendingDelete map[string]struct{}
26+
lock sync.Mutex
27+
}
28+
29+
// Create the AsyncDelete watcher.
30+
func New(cli releaseIPsClient, path string, logger *zap.Logger) *watcher { //nolint
31+
// Add directory where intended deletes are kept
32+
if err := os.Mkdir(path, 0o755); err != nil { //nolint
33+
logger.Error("error making directory", zap.String("path", path), zap.Error(err))
34+
}
35+
return &watcher{
36+
cli: cli,
37+
path: path,
38+
log: logger,
39+
pendingDelete: make(map[string]struct{}),
40+
}
41+
}
42+
43+
// releaseAll locks and iterates the pendingDeletes map and calls CNS to
44+
// release the IP for any Pod containerIDs present. When an IP is released
45+
// that entry is removed from the map and the file is deleted. If the file
46+
// fails to delete, we still remove it from the map so that we don't retry
47+
// it during the life of this process, but we may retry it on a subsequent
48+
// invocation of CNS. This is okay because calling releaseIP on an already
49+
// processed containerID is a no-op, and we may be able to delete the file
50+
// during that future retry.
51+
func (w *watcher) releaseAll(ctx context.Context) {
52+
w.lock.Lock()
53+
defer w.lock.Unlock()
54+
for containerID := range w.pendingDelete {
55+
// read file contents
56+
filepath := w.path + "/" + containerID
57+
file, err := os.Open(filepath)
58+
if err != nil {
59+
w.log.Error("failed to open file", zap.Error(err))
60+
}
61+
62+
data, errReadingFile := io.ReadAll(file)
63+
if errReadingFile != nil {
64+
w.log.Error("failed to read file content", zap.Error(errReadingFile))
65+
}
66+
file.Close()
67+
podInterfaceID := string(data)
68+
69+
w.log.Info("releasing IP for missed delete", zap.String("podInterfaceID", podInterfaceID), zap.String("containerID", containerID))
70+
if err := w.releaseIP(ctx, podInterfaceID, containerID); err != nil {
71+
w.log.Error("failed to release IP for missed delete", zap.String("containerID", containerID), zap.Error(err))
72+
continue
73+
}
74+
w.log.Info("successfully released IP for missed delete", zap.String("containerID", containerID))
75+
delete(w.pendingDelete, containerID)
76+
if err := removeFile(containerID, w.path); err != nil {
77+
w.log.Error("failed to remove file for missed delete", zap.Error(err))
78+
}
79+
}
80+
}
81+
82+
// watchPendingDelete periodically checks the map for pending release IPs
83+
// and calls releaseAll to process the contents when present.
84+
func (w *watcher) watchPendingDelete(ctx context.Context) error {
85+
ticker := time.NewTicker(15 * time.Second) //nolint
86+
defer ticker.Stop()
87+
for {
88+
select {
89+
case <-ctx.Done():
90+
return errors.Wrap(ctx.Err(), "exiting watchPendingDelete")
91+
case <-ticker.C:
92+
n := len(w.pendingDelete)
93+
if n == 0 {
94+
continue
95+
}
96+
w.log.Info("processing pending missed deletes", zap.Int("count", n))
97+
w.releaseAll(ctx)
98+
}
99+
}
100+
}
101+
102+
// watchFS starts the fsnotify watcher and handles events for file creation
103+
// or deletion in the missed pending delete directory. A file creation event
104+
// indicates that CNS missed the delete call for a containerID and needs
105+
// to process the release IP asynchronously.
106+
func (w *watcher) watchFS(ctx context.Context) error {
107+
// Create new fs watcher.
108+
watcher, err := fsnotify.NewWatcher()
109+
if err != nil {
110+
return errors.Wrap(err, "error creating fsnotify watcher")
111+
}
112+
defer watcher.Close()
113+
114+
err = watcher.Add(w.path)
115+
if err != nil {
116+
w.log.Error("failed to add path to fsnotify watcher", zap.String("path", w.path), zap.Error(err))
117+
}
118+
// Start listening for events.
119+
w.log.Info("listening for events from fsnotify watcher")
120+
for {
121+
select {
122+
case <-ctx.Done():
123+
return errors.Wrap(ctx.Err(), "exiting watchFS")
124+
case event, ok := <-watcher.Events:
125+
if !ok {
126+
return errors.New("fsnotify watcher closed")
127+
}
128+
if !event.Has(fsnotify.Create) {
129+
// discard any event that is not a file Create
130+
continue
131+
}
132+
w.log.Info("received create event", zap.String("event", event.Name))
133+
w.lock.Lock()
134+
w.pendingDelete[event.Name] = struct{}{}
135+
w.lock.Unlock()
136+
case watcherErr := <-watcher.Errors:
137+
w.log.Error("fsnotify watcher error", zap.Error(watcherErr))
138+
}
139+
}
140+
}
141+
142+
// readFS lists the directory and enqueues any missed deletes that are already
143+
// present on-disk.
144+
func (w *watcher) readFS() error {
145+
w.log.Info("listing directory", zap.String("path", w.path))
146+
dirContents, err := os.ReadDir(w.path)
147+
if err != nil {
148+
w.log.Error("error reading deleteID directory", zap.String("path", w.path), zap.Error(err))
149+
return errors.Wrapf(err, "failed to read %s", w.path)
150+
}
151+
if len(dirContents) == 0 {
152+
w.log.Info("no missed deletes found")
153+
return nil
154+
}
155+
w.lock.Lock()
156+
for _, file := range dirContents {
157+
w.log.Info("adding missed delete from file", zap.String("name", file.Name()))
158+
w.pendingDelete[file.Name()] = struct{}{}
159+
}
160+
w.lock.Unlock()
161+
return nil
162+
}
163+
164+
// WatchFS starts the filesystem watcher to handle async Pod deletes.
165+
// Blocks until the context is closed; returns underlying fsnotify errors
166+
// if something goes fatally wrong.
167+
func (w *watcher) Start(ctx context.Context) error {
168+
errs := make(chan error)
169+
// Start watching for enqueued missed deletes so that we process them as soon as they arrive.
170+
go func(errs chan<- error) {
171+
errs <- w.watchPendingDelete(ctx)
172+
}(errs)
173+
174+
// Start watching for changes to the filesystem so that we don't miss any async deletes.
175+
go func(errs chan<- error) {
176+
errs <- w.watchFS(ctx)
177+
}(errs)
178+
179+
// Read the directory to enqueue any missed deletes that are already present on-disk.
180+
if err := w.readFS(); err != nil {
181+
return err
182+
}
183+
184+
// block until one of the goroutines returns an error
185+
err := <-errs
186+
return err
187+
}
188+
189+
// AddFile creates new file using the containerID as name
190+
func AddFile(podInterfaceID, containerID, path string) error {
191+
filepath := path + "/" + containerID
192+
f, err := os.Create(filepath)
193+
if err != nil {
194+
return errors.Wrap(err, "error creating file")
195+
}
196+
_, writeErr := f.WriteString(podInterfaceID)
197+
if writeErr != nil {
198+
return errors.Wrap(writeErr, "error writing to file")
199+
}
200+
return errors.Wrap(f.Close(), "error adding file to directory")
201+
}
202+
203+
// removeFile removes the file based on containerID
204+
func removeFile(containerID, path string) error {
205+
filepath := path + "/" + containerID
206+
if err := os.Remove(filepath); err != nil {
207+
return errors.Wrap(err, "error deleting file")
208+
}
209+
return nil
210+
}
211+
212+
// call cns ReleaseIPs
213+
func (w *watcher) releaseIP(ctx context.Context, podInterfaceID, containerID string) error {
214+
ipconfigreq := &cns.IPConfigRequest{
215+
PodInterfaceID: podInterfaceID,
216+
InfraContainerID: containerID,
217+
}
218+
return errors.Wrap(w.cli.ReleaseIPAddress(ctx, *ipconfigreq), "failed to release IP from CNS")
219+
}

0 commit comments

Comments
 (0)