Skip to content

Commit 6a2f92d

Browse files
committed
resources: add sampler for periodic stat reads
Signed-off-by: Tonis Tiigi <[email protected]>
1 parent 963f161 commit 6a2f92d

File tree

12 files changed

+244
-58
lines changed

12 files changed

+244
-58
lines changed

executor/resources/cpu.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"strings"
99

1010
"github.com/moby/buildkit/executor/resources/types"
11+
"github.com/pkg/errors"
1112
)
1213

1314
const (
@@ -25,6 +26,9 @@ func getCgroupCPUStat(cgroupPath string) (*types.CPUStat, error) {
2526
// Read cpu.stat file
2627
cpuStatFile, err := os.Open(filepath.Join(cgroupPath, "cpu.stat"))
2728
if err != nil {
29+
if errors.Is(err, os.ErrNotExist) {
30+
return nil, nil
31+
}
2832
return nil, err
2933
}
3034
defer cpuStatFile.Close()
@@ -77,6 +81,9 @@ func getCgroupCPUStat(cgroupPath string) (*types.CPUStat, error) {
7781
func parsePressureFile(filename string) (*types.Pressure, error) {
7882
content, err := os.ReadFile(filename)
7983
if err != nil {
84+
if errors.Is(err, os.ErrNotExist) { // pressure file requires CONFIG_PSI
85+
return nil, nil
86+
}
8087
return nil, err
8188
}
8289

executor/resources/io.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@ func getCgroupIOStat(cgroupPath string) (*types.IOStat, error) {
2828
ioStatPath := filepath.Join(cgroupPath, ioStatFile)
2929
data, err := os.ReadFile(ioStatPath)
3030
if err != nil {
31+
if errors.Is(err, os.ErrNotExist) {
32+
return nil, nil
33+
}
3134
return nil, errors.Wrapf(err, "failed to read %s", ioStatPath)
3235
}
3336

executor/resources/memory.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ func getCgroupMemoryStat(path string) (*types.MemoryStat, error) {
7878
}
7979
})
8080
if err != nil {
81-
if os.IsNotExist(err) {
81+
if errors.Is(err, os.ErrNotExist) {
8282
return nil, nil
8383
}
8484
return nil, err
@@ -113,7 +113,7 @@ func getCgroupMemoryStat(path string) (*types.MemoryStat, error) {
113113

114114
peak, err := parseSingleValueFile(filepath.Join(path, memoryPeakFile))
115115
if err != nil {
116-
if !os.IsNotExist(err) {
116+
if !errors.Is(err, os.ErrNotExist) {
117117
return nil, err
118118
}
119119
} else {
@@ -122,7 +122,7 @@ func getCgroupMemoryStat(path string) (*types.MemoryStat, error) {
122122

123123
swap, err := parseSingleValueFile(filepath.Join(path, memorySwapCurrentFile))
124124
if err != nil {
125-
if !os.IsNotExist(err) {
125+
if !errors.Is(err, os.ErrNotExist) {
126126
return nil, err
127127
}
128128
} else {
@@ -135,7 +135,7 @@ func getCgroupMemoryStat(path string) (*types.MemoryStat, error) {
135135
func parseKeyValueFile(filePath string, callback func(key string, value uint64)) error {
136136
content, err := os.ReadFile(filePath)
137137
if err != nil {
138-
return errors.Errorf("failed to read %s: %v", filePath, err)
138+
return errors.Wrapf(err, "failed to read %s", filePath)
139139
}
140140

141141
lines := strings.Split(string(content), "\n")
@@ -149,7 +149,7 @@ func parseKeyValueFile(filePath string, callback func(key string, value uint64))
149149
valueStr := fields[1]
150150
value, err := strconv.ParseUint(valueStr, 10, 64)
151151
if err != nil {
152-
return errors.Errorf("failed to parse value for %s: %v", key, err)
152+
return errors.Wrapf(err, "failed to parse value for %s", key)
153153
}
154154

155155
callback(key, value)

executor/resources/monitor.go

Lines changed: 38 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package resources
33
import (
44
"bufio"
55
"context"
6-
"log"
76
"os"
87
"path/filepath"
98
"strconv"
@@ -13,7 +12,7 @@ import (
1312

1413
"github.com/moby/buildkit/executor/resources/types"
1514
"github.com/moby/buildkit/util/network"
16-
"golang.org/x/sys/unix"
15+
"github.com/sirupsen/logrus"
1716
)
1817

1918
const (
@@ -30,8 +29,8 @@ var isCgroupV2 bool
3029
type cgroupRecord struct {
3130
once sync.Once
3231
ns string
33-
release func(context.Context) error
34-
lastSample *types.Sample
32+
sampler *Sub[*types.Sample]
33+
samples []*types.Sample
3534
err error
3635
done chan struct{}
3736
monitor *Monitor
@@ -44,30 +43,40 @@ func (r *cgroupRecord) Wait() error {
4443
return r.err
4544
}
4645

46+
func (r *cgroupRecord) Start() {
47+
s := NewSampler(2*time.Second, r.sample)
48+
r.sampler = s.Record()
49+
}
50+
51+
func (r *cgroupRecord) CloseAsync(next func(context.Context) error) error {
52+
go func() {
53+
r.close()
54+
next(context.TODO())
55+
}()
56+
return nil
57+
}
58+
4759
func (r *cgroupRecord) close() {
4860
r.once.Do(func() {
49-
s, err := r.sample()
50-
if err != nil {
51-
r.err = err
52-
} else {
53-
r.lastSample = s
54-
}
55-
if err := r.release(context.Background()); err != nil {
56-
if r.err == nil {
57-
r.err = err
58-
}
59-
}
60-
close(r.done)
61+
defer close(r.done)
6162
go func() {
6263
r.monitor.mu.Lock()
6364
delete(r.monitor.records, r.ns)
6465
r.monitor.mu.Unlock()
6566
}()
67+
if r.sampler == nil {
68+
return
69+
}
70+
s, err := r.sampler.Close(true)
71+
if err != nil {
72+
r.err = err
73+
} else {
74+
r.samples = s
75+
}
6676
})
6777
}
6878

69-
func (r *cgroupRecord) sample() (*types.Sample, error) {
70-
now := time.Now()
79+
func (r *cgroupRecord) sample(tm time.Time) (*types.Sample, error) {
7180
cpu, err := getCgroupCPUStat(filepath.Join(defaultMountpoint, r.ns))
7281
if err != nil {
7382
return nil, err
@@ -85,7 +94,7 @@ func (r *cgroupRecord) sample() (*types.Sample, error) {
8594
return nil, err
8695
}
8796
sample := &types.Sample{
88-
Timestamp: now,
97+
Timestamp_: tm,
8998
CPUStat: cpu,
9099
MemoryStat: memory,
91100
IOStat: io,
@@ -106,7 +115,7 @@ func (r *cgroupRecord) Samples() ([]*types.Sample, error) {
106115
if r.err != nil {
107116
return nil, r.err
108117
}
109-
return []*types.Sample{r.lastSample}, nil
118+
return r.samples, nil
110119
}
111120

112121
type nopRecord struct {
@@ -120,6 +129,13 @@ func (r *nopRecord) Samples() ([]*types.Sample, error) {
120129
return nil, nil
121130
}
122131

132+
func (r *nopRecord) CloseAsync(next func(context.Context) error) error {
133+
return next(context.TODO())
134+
}
135+
136+
func (r *nopRecord) Start() {
137+
}
138+
123139
type Monitor struct {
124140
mu sync.Mutex
125141
closed chan struct{}
@@ -131,7 +147,6 @@ type NetworkSampler interface {
131147
}
132148

133149
type RecordOpt struct {
134-
Release func(context.Context) error
135150
NetworkSampler NetworkSampler
136151
}
137152

@@ -143,22 +158,17 @@ func (m *Monitor) RecordNamespace(ns string, opt RecordOpt) (types.Recorder, err
143158
default:
144159
}
145160
if !isCgroupV2 || isClosed {
146-
if err := opt.Release(context.TODO()); err != nil {
147-
return nil, err
148-
}
149161
return &nopRecord{}, nil
150162
}
151163
r := &cgroupRecord{
152164
ns: ns,
153-
release: opt.Release,
154165
done: make(chan struct{}),
155166
monitor: m,
156167
netSampler: opt.NetworkSampler,
157168
}
158169
m.mu.Lock()
159170
m.records[ns] = r
160171
m.mu.Unlock()
161-
go r.close()
162172
return r, nil
163173
}
164174

@@ -180,7 +190,7 @@ func NewMonitor() (*Monitor, error) {
180190
return
181191
}
182192
if err := prepareCgroupControllers(); err != nil {
183-
log.Printf("failed to prepare cgroup controllers: %+v", err)
193+
logrus.Warnf("failed to prepare cgroup controllers: %+v", err)
184194
}
185195
})
186196

@@ -225,17 +235,8 @@ func prepareCgroupControllers() error {
225235
}
226236
if err := os.WriteFile(filepath.Join(defaultMountpoint, cgroupSubtreeFile), []byte("+"+c), 0); err != nil {
227237
// ignore error
228-
log.Printf("failed to enable cgroup controller %q: %+v", c, err)
238+
logrus.Warnf("failed to enable cgroup controller %q: %+v", c, err)
229239
}
230240
}
231241
return nil
232242
}
233-
234-
func isCgroup2() bool {
235-
var st unix.Statfs_t
236-
err := unix.Statfs(defaultMountpoint, &st)
237-
if err != nil {
238-
return false
239-
}
240-
return st.Type == unix.CGROUP2_SUPER_MAGIC
241-
}

executor/resources/monitor_linux.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
//go:build linux
2+
// +build linux
3+
4+
package resources
5+
6+
import "golang.org/x/sys/unix"
7+
8+
func isCgroup2() bool {
9+
var st unix.Statfs_t
10+
err := unix.Statfs(defaultMountpoint, &st)
11+
if err != nil {
12+
return false
13+
}
14+
return st.Type == unix.CGROUP2_SUPER_MAGIC
15+
}

executor/resources/monitor_nolinux.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
//go:build !linux
2+
// +build !linux
3+
4+
package resources
5+
6+
func isCgroup2() bool {
7+
return false
8+
}

executor/resources/pids.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ func getCgroupPIDsStat(path string) (*types.PIDsStat, error) {
1919

2020
v, err := parseSingleValueFile(filepath.Join(path, pidsCurrentFile))
2121
if err != nil {
22-
if !os.IsNotExist(err) {
22+
if !errors.Is(err, os.ErrNotExist) {
2323
return nil, err
2424
}
2525
} else {

0 commit comments

Comments
 (0)