Skip to content

Commit 125ec14

Browse files
committed
metrics: collect mount items for detail metrics
Introduce a new Prometheus metric for tracking detailed information about mounted items and refines the cache manager to export richer metrics. It also adds corresponding tests to ensure correct metric collection. The changes enhance observability of mounted volumes, distinguishing between static, dynamic, and inline models. Additionally, it improves the cache size calculation method to avoid double-counting file sizes from hardlinks. Signed-off-by: imeoer <[email protected]>
1 parent 60f684c commit 125ec14

File tree

7 files changed

+252
-27
lines changed

7 files changed

+252
-27
lines changed

go.mod

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ require (
1818
github.com/opencontainers/image-spec v1.1.1
1919
github.com/pkg/errors v0.9.1
2020
github.com/prometheus/client_golang v1.22.0
21+
github.com/prometheus/client_model v0.6.2
2122
github.com/rexray/gocsi v1.2.2
2223
github.com/sirupsen/logrus v1.9.3
2324
github.com/stretchr/testify v1.11.1
@@ -86,6 +87,7 @@ require (
8687
github.com/kevinburke/ssh_config v1.2.0 // indirect
8788
github.com/klauspost/compress v1.18.0 // indirect
8889
github.com/klauspost/cpuid/v2 v2.3.0 // indirect
90+
github.com/kylelemons/godebug v1.1.0 // indirect
8991
github.com/labstack/gommon v0.4.2 // indirect
9092
github.com/libgit2/git2go/v34 v34.0.0 // indirect
9193
github.com/mailru/easyjson v0.7.7 // indirect
@@ -99,7 +101,6 @@ require (
99101
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
100102
github.com/pjbgf/sha1cd v0.3.2 // indirect
101103
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
102-
github.com/prometheus/client_model v0.6.2 // indirect
103104
github.com/prometheus/common v0.63.0 // indirect
104105
github.com/prometheus/procfs v0.16.1 // indirect
105106
github.com/rivo/uniseg v0.4.7 // indirect

pkg/metrics/mount_collector.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package metrics
2+
3+
import (
4+
"sync/atomic"
5+
6+
"github.com/prometheus/client_golang/prometheus"
7+
)
8+
9+
type MountItem struct {
10+
Reference string
11+
Type string
12+
VolumeName string
13+
MountID string
14+
}
15+
16+
type MountItemCollector struct {
17+
desc *prometheus.Desc
18+
items atomic.Value // stores []MountItem
19+
}
20+
21+
func NewMountItemCollector() *MountItemCollector {
22+
c := &MountItemCollector{
23+
desc: prometheus.NewDesc(
24+
Prefix+"mount_item",
25+
"Mounted item list (static, inline, dynamic types), value is always 1 for existing items.",
26+
[]string{"reference", "type", "volume_name", "mount_id"},
27+
nil,
28+
),
29+
}
30+
c.items.Store([]MountItem(nil))
31+
return c
32+
}
33+
34+
func (c *MountItemCollector) Set(items []MountItem) {
35+
c.items.Store(append([]MountItem(nil), items...))
36+
}
37+
38+
func (c *MountItemCollector) Describe(ch chan<- *prometheus.Desc) {
39+
ch <- c.desc
40+
}
41+
42+
func (c *MountItemCollector) Collect(ch chan<- prometheus.Metric) {
43+
v := c.items.Load()
44+
if v == nil {
45+
return
46+
}
47+
items := v.([]MountItem)
48+
for _, it := range items {
49+
ch <- prometheus.MustNewConstMetric(
50+
c.desc,
51+
prometheus.GaugeValue,
52+
1,
53+
it.Reference,
54+
it.Type,
55+
it.VolumeName,
56+
it.MountID,
57+
)
58+
}
59+
}
60+
61+
var MountItems = NewMountItemCollector()

pkg/metrics/registry.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,12 @@ var (
8282
},
8383
)
8484

85+
NodeMountedInlineImages = prometheus.NewGauge(
86+
prometheus.GaugeOpts{
87+
Name: Prefix + "node_mounted_inline_images",
88+
},
89+
)
90+
8591
NodeMountedDynamicImages = prometheus.NewGauge(
8692
prometheus.GaugeOpts{
8793
Name: Prefix + "node_mounted_dynamic_images",
@@ -140,7 +146,9 @@ func NodePullOpObserve(op string, size int64, start time.Time, err error) {
140146
func init() {
141147
DummyRegistry.MustRegister()
142148

143-
DetailRegistry.MustRegister()
149+
DetailRegistry.MustRegister(
150+
MountItems,
151+
)
144152

145153
Registry.MustRegister(
146154
NodeNotReady,
@@ -156,6 +164,7 @@ func init() {
156164

157165
NodeCacheSizeInBytes,
158166
NodeMountedStaticImages,
167+
NodeMountedInlineImages,
159168
NodeMountedDynamicImages,
160169
NodePullLayerTooLong,
161170
)

pkg/server/server_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -558,7 +558,7 @@ func TestServer(t *testing.T) {
558558
require.NoError(t, err)
559559
cfg.Get().RootDir = rootDir
560560
cfg.Get().PullConfig.ProxyURL = ""
561-
service.CacheSacnInterval = 1 * time.Second
561+
service.CacheScanInterval = 1 * time.Second
562562

563563
service.NewPuller = func(ctx context.Context, pullCfg *config.PullConfig, hook *status.Hook, diskQuotaChecker *service.DiskQuotaChecker) service.Puller {
564564
return &mockPuller{

pkg/service/cache.go

Lines changed: 63 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -8,34 +8,29 @@ import (
88
"github.com/modelpack/model-csi-driver/pkg/config"
99
"github.com/modelpack/model-csi-driver/pkg/logger"
1010
"github.com/modelpack/model-csi-driver/pkg/metrics"
11+
"github.com/modelpack/model-csi-driver/pkg/status"
1112
"github.com/pkg/errors"
1213
)
1314

14-
var CacheSacnInterval = 60 * time.Second
15+
var CacheScanInterval = 60 * time.Second
1516

1617
type CacheManager struct {
1718
cfg *config.Config
19+
sm *status.StatusManager
1820
}
1921

2022
func (cm *CacheManager) getCacheSize() (int64, error) {
21-
var total int64
22-
if err := filepath.Walk(cm.cfg.Get().RootDir, func(path string, info os.FileInfo, err error) error {
23-
if err != nil {
24-
return err
25-
}
26-
if info.IsDir() {
27-
return nil
28-
}
29-
total += info.Size()
30-
return nil
31-
}); err != nil {
32-
return 0, err
23+
size, err := getUsedSize(cm.cfg.Get().RootDir)
24+
if err != nil {
25+
return 0, errors.Wrapf(err, "get used size: %s", cm.cfg.Get().RootDir)
3326
}
34-
return total, nil
27+
28+
return size, nil
3529
}
3630

3731
func (cm *CacheManager) scanModels() error {
3832
staticModels := 0
33+
inlineModels := 0
3934
dynamicModels := 0
4035
volumesDir := cm.cfg.Get().GetVolumesDir()
4136
volumeDirs, err := os.ReadDir(volumesDir)
@@ -45,29 +40,73 @@ func (cm *CacheManager) scanModels() error {
4540
}
4641
return errors.Wrapf(err, "read volume dirs from %s", volumesDir)
4742
}
43+
44+
mountItems := []metrics.MountItem{}
4845
for _, volumeDir := range volumeDirs {
4946
if !volumeDir.IsDir() {
5047
continue
5148
}
52-
if isStaticVolume(volumeDir.Name()) {
53-
staticModels += 1
49+
volumeName := volumeDir.Name()
50+
if isStaticVolume(volumeName) {
51+
statusPath := filepath.Join(volumesDir, volumeName, "status.json")
52+
modelStatus, err := cm.sm.Get(statusPath)
53+
if err == nil {
54+
mountItems = append(mountItems, metrics.MountItem{
55+
Reference: modelStatus.Reference,
56+
Type: "static",
57+
VolumeName: volumeName,
58+
MountID: modelStatus.MountID,
59+
})
60+
staticModels += 1
61+
}
5462
}
55-
if isDynamicVolume(volumeDir.Name()) {
56-
modelsDir := cm.cfg.Get().GetModelsDirForDynamic(volumeDir.Name())
63+
if isDynamicVolume(volumeName) {
64+
modelsDir := cm.cfg.Get().GetModelsDirForDynamic(volumeName)
5765
modelDirs, err := os.ReadDir(modelsDir)
58-
if err != nil {
59-
return errors.Wrapf(err, "read model dirs from %s", modelsDir)
66+
if err != nil {
67+
if os.IsNotExist(err) {
68+
// This is potentially an inline model, the status file is expected
69+
// to be directly under the volume directory.
70+
statusPath := filepath.Join(volumesDir, volumeName, "status.json")
71+
modelStatus, err := cm.sm.Get(statusPath)
72+
if err == nil {
73+
mountItems = append(mountItems, metrics.MountItem{
74+
Reference: modelStatus.Reference,
75+
Type: "inline",
76+
VolumeName: volumeName,
77+
MountID: modelStatus.MountID,
78+
})
79+
inlineModels += 1
80+
}
81+
continue
82+
}
83+
logger.Logger().WithError(err).Warnf("read model dirs from %s", modelsDir)
84+
continue
6085
}
6186
for _, modelDir := range modelDirs {
6287
if !modelDir.IsDir() {
6388
continue
6489
}
65-
dynamicModels += 1
90+
statusPath := filepath.Join(modelsDir, modelDir.Name(), "status.json")
91+
modelStatus, err := cm.sm.Get(statusPath)
92+
if err == nil {
93+
mountItems = append(mountItems, metrics.MountItem{
94+
Reference: modelStatus.Reference,
95+
Type: "dynamic",
96+
VolumeName: volumeName,
97+
MountID: modelStatus.MountID,
98+
})
99+
dynamicModels += 1
100+
}
66101
}
67102
}
68103
}
104+
105+
metrics.MountItems.Set(mountItems)
69106
metrics.NodeMountedStaticImages.Set(float64(staticModels))
107+
metrics.NodeMountedInlineImages.Set(float64(inlineModels))
70108
metrics.NodeMountedDynamicImages.Set(float64(dynamicModels))
109+
71110
return nil
72111
}
73112

@@ -87,17 +126,18 @@ func (cm *CacheManager) Scan() error {
87126
return nil
88127
}
89128

90-
func NewCacheManager(cfg *config.Config) (*CacheManager, error) {
129+
func NewCacheManager(cfg *config.Config, sm *status.StatusManager) (*CacheManager, error) {
91130
cm := CacheManager{
92131
cfg: cfg,
132+
sm: sm,
93133
}
94134

95135
go func() {
96136
for {
97137
if err := cm.Scan(); err != nil && !errors.Is(err, os.ErrNotExist) {
98138
logger.Logger().WithError(err).Warnf("scan cache failed")
99139
}
100-
time.Sleep(CacheSacnInterval)
140+
time.Sleep(CacheScanInterval)
101141
}
102142
}()
103143

pkg/service/cache_test.go

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
package service
2+
3+
import (
4+
"os"
5+
"path/filepath"
6+
"testing"
7+
8+
"github.com/modelpack/model-csi-driver/pkg/config"
9+
"github.com/modelpack/model-csi-driver/pkg/metrics"
10+
"github.com/modelpack/model-csi-driver/pkg/status"
11+
"github.com/prometheus/client_golang/prometheus"
12+
"github.com/prometheus/client_golang/prometheus/testutil"
13+
dto "github.com/prometheus/client_model/go"
14+
"github.com/stretchr/testify/require"
15+
)
16+
17+
func TestCacheManagerScanUpdatesMetrics(t *testing.T) {
18+
tempDir := t.TempDir()
19+
20+
rawCfg := &config.RawConfig{ServiceName: "test", RootDir: tempDir}
21+
cfg := config.NewWithRaw(rawCfg)
22+
23+
sm, err := status.NewStatusManager()
24+
require.NoError(t, err)
25+
26+
// Create a static volume status
27+
staticStatusPath := filepath.Join(tempDir, "volumes", "pvc-static", "status.json")
28+
_, err = sm.Set(staticStatusPath, status.Status{Reference: "ref-static", MountID: "m-static"})
29+
require.NoError(t, err)
30+
31+
// Create a dynamic volume status under models/<mountID>/status.json
32+
dynamicStatusPath := filepath.Join(tempDir, "volumes", "csi-dyn", "models", "mount-1", "status.json")
33+
_, err = sm.Set(dynamicStatusPath, status.Status{Reference: "ref-dyn", MountID: "mount-1"})
34+
require.NoError(t, err)
35+
36+
// An extra file to ensure cache size covers arbitrary files under RootDir.
37+
extraPath := filepath.Join(tempDir, "extra.bin")
38+
require.NoError(t, os.WriteFile(extraPath, []byte("abc"), 0o644))
39+
40+
paths := []string{staticStatusPath, dynamicStatusPath, extraPath}
41+
var expectedSize int64
42+
for _, p := range paths {
43+
st, statErr := os.Stat(p)
44+
require.NoError(t, statErr)
45+
expectedSize += st.Size()
46+
}
47+
48+
cm := &CacheManager{cfg: cfg, sm: sm}
49+
require.NoError(t, cm.Scan())
50+
51+
require.Equal(t, float64(expectedSize), testutil.ToFloat64(metrics.NodeCacheSizeInBytes))
52+
require.Equal(t, float64(1), testutil.ToFloat64(metrics.NodeMountedStaticImages))
53+
require.Equal(t, float64(0), testutil.ToFloat64(metrics.NodeMountedInlineImages))
54+
require.Equal(t, float64(1), testutil.ToFloat64(metrics.NodeMountedDynamicImages))
55+
56+
// Verify mount item metrics are exported as a snapshot without Reset/Delete races.
57+
reg := prometheus.NewRegistry()
58+
reg.MustRegister(metrics.MountItems)
59+
60+
mfs, err := reg.Gather()
61+
require.NoError(t, err)
62+
63+
mf := findMetricFamily(t, mfs, metrics.Prefix+"mount_item")
64+
require.Len(t, mf.Metric, 2)
65+
66+
staticLabels := map[string]string{
67+
"reference": "ref-static",
68+
"type": "static",
69+
"volume_name": "pvc-static",
70+
"mount_id": "m-static",
71+
}
72+
dynamicLabels := map[string]string{
73+
"reference": "ref-dyn",
74+
"type": "dynamic",
75+
"volume_name": "csi-dyn",
76+
"mount_id": "mount-1",
77+
}
78+
79+
var foundStatic, foundDynamic bool
80+
for _, m := range mf.Metric {
81+
if hasLabels(m, staticLabels) {
82+
foundStatic = true
83+
}
84+
if hasLabels(m, dynamicLabels) {
85+
foundDynamic = true
86+
}
87+
}
88+
require.True(t, foundStatic, "static mount item metric not found")
89+
require.True(t, foundDynamic, "dynamic mount item metric not found")
90+
}
91+
92+
func findMetricFamily(t *testing.T, mfs []*dto.MetricFamily, name string) *dto.MetricFamily {
93+
t.Helper()
94+
for _, mf := range mfs {
95+
if mf.GetName() == name {
96+
return mf
97+
}
98+
}
99+
require.FailNow(t, "metric family not found", name)
100+
return nil
101+
}
102+
103+
func hasLabels(m *dto.Metric, want map[string]string) bool {
104+
labels := map[string]string{}
105+
for _, lp := range m.GetLabel() {
106+
labels[lp.GetName()] = lp.GetValue()
107+
}
108+
for k, v := range want {
109+
if labels[k] != v {
110+
return false
111+
}
112+
}
113+
return true
114+
}

pkg/service/service.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ func New(cfg *config.Config) (*Service, error) {
7575
if err != nil {
7676
return nil, errors.Wrap(err, "create worker")
7777
}
78-
cm, err := NewCacheManager(cfg)
78+
cm, err := NewCacheManager(cfg, sm)
7979
if err != nil {
8080
return nil, errors.Wrap(err, "create cache manager")
8181
}

0 commit comments

Comments
 (0)