Skip to content

Commit fa024e5

Browse files
authored
Merge pull request #374 from fuweid/fix-missing-event-issue
cgroup2: fix event loss and resource issues in EventChan
2 parents 6b761e5 + 208578e commit fa024e5

File tree

2 files changed

+120
-20
lines changed

2 files changed

+120
-20
lines changed

cgroup2/manager.go

Lines changed: 60 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -764,32 +764,69 @@ func (c *Manager) MemoryEventFD() (int, uint32, error) {
764764
return fd, uint32(wd), nil
765765
}
766766

767-
func (c *Manager) EventChan() (<-chan Event, <-chan error) {
768-
ec := make(chan Event)
769-
errCh := make(chan error, 1)
770-
go c.waitForEvents(ec, errCh)
767+
// memoryEventNonBlockFD returns a non-blocking inotify file descriptor monitoring memory.events.
768+
//
769+
// NOTE: Block FD is expensive because unix.Read will block that thread once there is
770+
// available data to read. In high scale scenarios, it will create a lot of threads.
771+
func (c *Manager) memoryEventNonBlockFD() (_ *os.File, retErr error) {
771772

772-
return ec, errCh
773+
rawFd, err := unix.InotifyInit1(unix.IN_CLOEXEC | unix.IN_NONBLOCK)
774+
if err != nil {
775+
return nil, fmt.Errorf("failed to create inotify fd: %w", err)
776+
}
777+
778+
fd := os.NewFile(uintptr(rawFd), "inotifyfd")
779+
defer func() {
780+
if retErr != nil {
781+
fd.Close()
782+
}
783+
}()
784+
785+
fpath := filepath.Join(c.path, "memory.events")
786+
if _, err := unix.InotifyAddWatch(rawFd, fpath, unix.IN_MODIFY); err != nil {
787+
return nil, fmt.Errorf("failed to add inotify watch for %q: %w", fpath, err)
788+
}
789+
790+
// monitor to detect process exit/cgroup deletion
791+
evpath := filepath.Join(c.path, "cgroup.events")
792+
if _, err = unix.InotifyAddWatch(rawFd, evpath, unix.IN_MODIFY); err != nil {
793+
return nil, fmt.Errorf("failed to add inotify watch for %q: %w", evpath, err)
794+
}
795+
return fd, nil
773796
}
774797

775-
func (c *Manager) waitForEvents(ec chan<- Event, errCh chan<- error) {
776-
defer close(errCh)
798+
func (c *Manager) EventChan() (<-chan Event, <-chan error) {
799+
ec := make(chan Event, 1)
800+
errCh := make(chan error, 1)
777801

778-
fd, _, err := c.MemoryEventFD()
802+
fd, err := c.memoryEventNonBlockFD()
779803
if err != nil {
780804
errCh <- err
781-
return
805+
return ec, errCh
782806
}
783-
defer unix.Close(fd)
784807

785-
for {
786-
buffer := make([]byte, unix.SizeofInotifyEvent*10)
787-
bytesRead, err := unix.Read(fd, buffer)
788-
if err != nil {
789-
errCh <- err
790-
return
791-
}
792-
if bytesRead >= unix.SizeofInotifyEvent {
808+
go func() {
809+
defer close(errCh)
810+
defer fd.Close()
811+
812+
for {
813+
buffer := make([]byte, unix.SizeofInotifyEvent*10)
814+
bytesRead, err := fd.Read(buffer)
815+
if err != nil {
816+
errCh <- err
817+
return
818+
}
819+
820+
if bytesRead < unix.SizeofInotifyEvent {
821+
continue
822+
}
823+
824+
// Check cgroup.events first
825+
shouldExit := false
826+
if c.isCgroupEmpty() {
827+
shouldExit = true
828+
}
829+
793830
out := make(map[string]uint64)
794831
if err := readKVStatsFile(c.path, "memory.events", out); err != nil {
795832
// When cgroup is deleted read may return -ENODEV instead of -ENOENT from open.
@@ -798,18 +835,21 @@ func (c *Manager) waitForEvents(ec chan<- Event, errCh chan<- error) {
798835
}
799836
return
800837
}
838+
801839
ec <- Event{
802840
Low: out["low"],
803841
High: out["high"],
804842
Max: out["max"],
805843
OOM: out["oom"],
806844
OOMKill: out["oom_kill"],
807845
}
808-
if c.isCgroupEmpty() {
846+
847+
if shouldExit {
809848
return
810849
}
811850
}
812-
}
851+
}()
852+
return ec, errCh
813853
}
814854

815855
func setDevices(path string, devices []specs.LinuxDeviceCgroup) error {

cgroup2/manager_test.go

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,62 @@ func TestEventChanCleanupOnCgroupRemoval(t *testing.T) {
118118
goleak.VerifyNone(t)
119119
}
120120

121+
func TestEventChanCleanupAfterOOMKill(t *testing.T) {
122+
checkCgroupMode(t)
123+
124+
groupPath := fmt.Sprintf("/testing-oom-watcher-%d", time.Now().UnixNano())
125+
c, err := NewManager(defaultCgroup2Path, groupPath,
126+
&Resources{
127+
Memory: &Memory{
128+
Max: toPtr(int64(15 * 1024 * 1024)), // 15MB
129+
Swap: toPtr(int64(15 * 1024 * 1024)), // 15MB
130+
},
131+
},
132+
)
133+
require.NoError(t, err, "failed to init new cgroup manager")
134+
defer func() {
135+
require.NoError(t, c.Delete())
136+
}()
137+
138+
cgroupFD, err := os.Open(c.path)
139+
require.NoError(t, err, "failed to open cgroup path")
140+
defer cgroupFD.Close()
141+
142+
evCh, errCh := c.EventChan()
143+
144+
cmd := exec.Command("dd", "if=/dev/zero", "of=/dev/null", "bs=64M")
145+
cmd.SysProcAttr = &syscall.SysProcAttr{
146+
Pdeathsig: syscall.SIGKILL,
147+
UseCgroupFD: true,
148+
CgroupFD: int(cgroupFD.Fd()),
149+
}
150+
151+
err = cmd.Start()
152+
require.NoError(t, err, "failed to start dd process")
153+
154+
err = cmd.Wait()
155+
require.Error(t, err)
156+
157+
for ev := range evCh {
158+
t.Logf("Received memory event: %+v", ev)
159+
if ev.OOMKill > 0 {
160+
break
161+
}
162+
}
163+
164+
done := false
165+
for !done {
166+
select {
167+
case err := <-errCh:
168+
require.NoError(t, err, "unexpected error on error channel")
169+
done = true
170+
case <-time.After(5 * time.Second):
171+
t.Fatal("Timed out")
172+
}
173+
}
174+
goleak.VerifyNone(t)
175+
}
176+
121177
func TestSystemdFullPath(t *testing.T) {
122178
tests := []struct {
123179
inputSlice string
@@ -406,3 +462,7 @@ func BenchmarkStat(b *testing.B) {
406462
require.NoError(b, err)
407463
}
408464
}
465+
466+
func toPtr[T any](v T) *T {
467+
return &v
468+
}

0 commit comments

Comments
 (0)