Skip to content

Commit 208578e

Browse files
committed
cgroup2: fix event loss and resource issues in EventChan
Fix multiple issues in EventChan that caused event loss and inefficient resource usage when monitoring cgroup memory events. 1. Use non-blocking I/O for efficient event monitoring EventChan previously used blocking file descriptors to watch memory.events, causing goroutines to block indefinitely when no events occurred. This wasted system resources by holding threads without doing useful work. Switch to non-blocking I/O to allow goroutines to efficiently wait for events without consuming blocked threads. 2. Fix race condition causing missed OOM kill events A race condition could cause OOM kill events to be missed when they occurred between reading memory.events and checking if the cgroup was empty: - T1: Receive modified event for memory.events - T2: Read memory.events (no oom_kill event present yet) - T3: Process in cgroup is OOM-killed - T4: No running processes remain in cgroup - T5: isCgroupEmpty() returns true - T6: EventChan exits, missing the OOM kill event Fix by checking isCgroupEmpty() before reading memory.events to ensure events are not lost. 3. Initialize inotify watch before returning Ensure inotify watches are fully initialized before EventChan returns to prevent races where events could be missed during initialization. Signed-off-by: Wei Fu <[email protected]>
1 parent 751ce28 commit 208578e

File tree

2 files changed

+120
-20
lines changed

2 files changed

+120
-20
lines changed

cgroup2/manager.go

Lines changed: 60 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -764,32 +764,69 @@ func (c *Manager) MemoryEventFD() (int, uint32, error) {
764764
return fd, uint32(wd), nil
765765
}
766766

767-
func (c *Manager) EventChan() (<-chan Event, <-chan error) {
768-
ec := make(chan Event)
769-
errCh := make(chan error, 1)
770-
go c.waitForEvents(ec, errCh)
767+
// memoryEventNonBlockFD returns a non-blocking inotify file descriptor monitoring memory.events.
768+
//
769+
// NOTE: Block FD is expensive because unix.Read will block that thread once there is
770+
// available data to read. In high scale scenarios, it will create a lot of threads.
771+
func (c *Manager) memoryEventNonBlockFD() (_ *os.File, retErr error) {
771772

772-
return ec, errCh
773+
rawFd, err := unix.InotifyInit1(unix.IN_CLOEXEC | unix.IN_NONBLOCK)
774+
if err != nil {
775+
return nil, fmt.Errorf("failed to create inotify fd: %w", err)
776+
}
777+
778+
fd := os.NewFile(uintptr(rawFd), "inotifyfd")
779+
defer func() {
780+
if retErr != nil {
781+
fd.Close()
782+
}
783+
}()
784+
785+
fpath := filepath.Join(c.path, "memory.events")
786+
if _, err := unix.InotifyAddWatch(rawFd, fpath, unix.IN_MODIFY); err != nil {
787+
return nil, fmt.Errorf("failed to add inotify watch for %q: %w", fpath, err)
788+
}
789+
790+
// monitor to detect process exit/cgroup deletion
791+
evpath := filepath.Join(c.path, "cgroup.events")
792+
if _, err = unix.InotifyAddWatch(rawFd, evpath, unix.IN_MODIFY); err != nil {
793+
return nil, fmt.Errorf("failed to add inotify watch for %q: %w", evpath, err)
794+
}
795+
return fd, nil
773796
}
774797

775-
func (c *Manager) waitForEvents(ec chan<- Event, errCh chan<- error) {
776-
defer close(errCh)
798+
func (c *Manager) EventChan() (<-chan Event, <-chan error) {
799+
ec := make(chan Event, 1)
800+
errCh := make(chan error, 1)
777801

778-
fd, _, err := c.MemoryEventFD()
802+
fd, err := c.memoryEventNonBlockFD()
779803
if err != nil {
780804
errCh <- err
781-
return
805+
return ec, errCh
782806
}
783-
defer unix.Close(fd)
784807

785-
for {
786-
buffer := make([]byte, unix.SizeofInotifyEvent*10)
787-
bytesRead, err := unix.Read(fd, buffer)
788-
if err != nil {
789-
errCh <- err
790-
return
791-
}
792-
if bytesRead >= unix.SizeofInotifyEvent {
808+
go func() {
809+
defer close(errCh)
810+
defer fd.Close()
811+
812+
for {
813+
buffer := make([]byte, unix.SizeofInotifyEvent*10)
814+
bytesRead, err := fd.Read(buffer)
815+
if err != nil {
816+
errCh <- err
817+
return
818+
}
819+
820+
if bytesRead < unix.SizeofInotifyEvent {
821+
continue
822+
}
823+
824+
// Check cgroup.events first
825+
shouldExit := false
826+
if c.isCgroupEmpty() {
827+
shouldExit = true
828+
}
829+
793830
out := make(map[string]uint64)
794831
if err := readKVStatsFile(c.path, "memory.events", out); err != nil {
795832
// When cgroup is deleted read may return -ENODEV instead of -ENOENT from open.
@@ -798,18 +835,21 @@ func (c *Manager) waitForEvents(ec chan<- Event, errCh chan<- error) {
798835
}
799836
return
800837
}
838+
801839
ec <- Event{
802840
Low: out["low"],
803841
High: out["high"],
804842
Max: out["max"],
805843
OOM: out["oom"],
806844
OOMKill: out["oom_kill"],
807845
}
808-
if c.isCgroupEmpty() {
846+
847+
if shouldExit {
809848
return
810849
}
811850
}
812-
}
851+
}()
852+
return ec, errCh
813853
}
814854

815855
func setDevices(path string, devices []specs.LinuxDeviceCgroup) error {

cgroup2/manager_test.go

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,62 @@ func TestEventChanCleanupOnCgroupRemoval(t *testing.T) {
118118
goleak.VerifyNone(t)
119119
}
120120

121+
func TestEventChanCleanupAfterOOMKill(t *testing.T) {
122+
checkCgroupMode(t)
123+
124+
groupPath := fmt.Sprintf("/testing-oom-watcher-%d", time.Now().UnixNano())
125+
c, err := NewManager(defaultCgroup2Path, groupPath,
126+
&Resources{
127+
Memory: &Memory{
128+
Max: toPtr(int64(15 * 1024 * 1024)), // 15MB
129+
Swap: toPtr(int64(15 * 1024 * 1024)), // 15MB
130+
},
131+
},
132+
)
133+
require.NoError(t, err, "failed to init new cgroup manager")
134+
defer func() {
135+
require.NoError(t, c.Delete())
136+
}()
137+
138+
cgroupFD, err := os.Open(c.path)
139+
require.NoError(t, err, "failed to open cgroup path")
140+
defer cgroupFD.Close()
141+
142+
evCh, errCh := c.EventChan()
143+
144+
cmd := exec.Command("dd", "if=/dev/zero", "of=/dev/null", "bs=64M")
145+
cmd.SysProcAttr = &syscall.SysProcAttr{
146+
Pdeathsig: syscall.SIGKILL,
147+
UseCgroupFD: true,
148+
CgroupFD: int(cgroupFD.Fd()),
149+
}
150+
151+
err = cmd.Start()
152+
require.NoError(t, err, "failed to start dd process")
153+
154+
err = cmd.Wait()
155+
require.Error(t, err)
156+
157+
for ev := range evCh {
158+
t.Logf("Received memory event: %+v", ev)
159+
if ev.OOMKill > 0 {
160+
break
161+
}
162+
}
163+
164+
done := false
165+
for !done {
166+
select {
167+
case err := <-errCh:
168+
require.NoError(t, err, "unexpected error on error channel")
169+
done = true
170+
case <-time.After(5 * time.Second):
171+
t.Fatal("Timed out")
172+
}
173+
}
174+
goleak.VerifyNone(t)
175+
}
176+
121177
func TestSystemdFullPath(t *testing.T) {
122178
tests := []struct {
123179
inputSlice string
@@ -406,3 +462,7 @@ func BenchmarkStat(b *testing.B) {
406462
require.NoError(b, err)
407463
}
408464
}
465+
466+
func toPtr[T any](v T) *T {
467+
return &v
468+
}

0 commit comments

Comments
 (0)