Skip to content

Commit 31248fe

Browse files
committed
metrics: collect mount items for detail metrics
Introduce a new Prometheus metric for tracking detailed information about mounted items and refines the cache manager to export richer metrics. It also adds corresponding tests to ensure correct metric collection. The changes enhance observability of mounted volumes, distinguishing between static, dynamic, and inline models. Signed-off-by: imeoer <[email protected]>
1 parent 60f684c commit 31248fe

File tree

6 files changed

+244
-14
lines changed

6 files changed

+244
-14
lines changed

go.mod

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ require (
1818
github.com/opencontainers/image-spec v1.1.1
1919
github.com/pkg/errors v0.9.1
2020
github.com/prometheus/client_golang v1.22.0
21+
github.com/prometheus/client_model v0.6.2
2122
github.com/rexray/gocsi v1.2.2
2223
github.com/sirupsen/logrus v1.9.3
2324
github.com/stretchr/testify v1.11.1
@@ -86,6 +87,7 @@ require (
8687
github.com/kevinburke/ssh_config v1.2.0 // indirect
8788
github.com/klauspost/compress v1.18.0 // indirect
8889
github.com/klauspost/cpuid/v2 v2.3.0 // indirect
90+
github.com/kylelemons/godebug v1.1.0 // indirect
8991
github.com/labstack/gommon v0.4.2 // indirect
9092
github.com/libgit2/git2go/v34 v34.0.0 // indirect
9193
github.com/mailru/easyjson v0.7.7 // indirect
@@ -99,7 +101,6 @@ require (
99101
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
100102
github.com/pjbgf/sha1cd v0.3.2 // indirect
101103
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
102-
github.com/prometheus/client_model v0.6.2 // indirect
103104
github.com/prometheus/common v0.63.0 // indirect
104105
github.com/prometheus/procfs v0.16.1 // indirect
105106
github.com/rivo/uniseg v0.4.7 // indirect

pkg/metrics/mount_collector.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package metrics
2+
3+
import (
4+
"sync/atomic"
5+
6+
"github.com/prometheus/client_golang/prometheus"
7+
)
8+
9+
type MountItem struct {
10+
Reference string
11+
Type string
12+
VolumeName string
13+
MountID string
14+
}
15+
16+
type MountItemCollector struct {
17+
desc *prometheus.Desc
18+
items atomic.Value // stores []MountItem
19+
}
20+
21+
func NewMountItemCollector() *MountItemCollector {
22+
c := &MountItemCollector{
23+
desc: prometheus.NewDesc(
24+
Prefix+"mount_item",
25+
"Mounted item list (static snapshot); value is always 1 for existing items.",
26+
[]string{"reference", "type", "volume_name", "mount_id"},
27+
nil,
28+
),
29+
}
30+
c.items.Store([]MountItem(nil))
31+
return c
32+
}
33+
34+
func (c *MountItemCollector) Set(items []MountItem) {
35+
c.items.Store(append([]MountItem(nil), items...))
36+
}
37+
38+
func (c *MountItemCollector) Describe(ch chan<- *prometheus.Desc) {
39+
ch <- c.desc
40+
}
41+
42+
func (c *MountItemCollector) Collect(ch chan<- prometheus.Metric) {
43+
v := c.items.Load()
44+
if v == nil {
45+
return
46+
}
47+
items := v.([]MountItem)
48+
for _, it := range items {
49+
ch <- prometheus.MustNewConstMetric(
50+
c.desc,
51+
prometheus.GaugeValue,
52+
1,
53+
it.Reference,
54+
it.Type,
55+
it.VolumeName,
56+
it.MountID,
57+
)
58+
}
59+
}
60+
61+
var MountItems = NewMountItemCollector()

pkg/metrics/registry.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,12 @@ var (
8282
},
8383
)
8484

85+
NodeMountedInlineImages = prometheus.NewGauge(
86+
prometheus.GaugeOpts{
87+
Name: Prefix + "node_mounted_inline_images",
88+
},
89+
)
90+
8591
NodeMountedDynamicImages = prometheus.NewGauge(
8692
prometheus.GaugeOpts{
8793
Name: Prefix + "node_mounted_dynamic_images",
@@ -140,7 +146,9 @@ func NodePullOpObserve(op string, size int64, start time.Time, err error) {
140146
func init() {
141147
DummyRegistry.MustRegister()
142148

143-
DetailRegistry.MustRegister()
149+
DetailRegistry.MustRegister(
150+
MountItems,
151+
)
144152

145153
Registry.MustRegister(
146154
NodeNotReady,
@@ -156,6 +164,7 @@ func init() {
156164

157165
NodeCacheSizeInBytes,
158166
NodeMountedStaticImages,
167+
NodeMountedInlineImages,
159168
NodeMountedDynamicImages,
160169
NodePullLayerTooLong,
161170
)

pkg/service/cache.go

Lines changed: 57 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,15 @@ import (
88
"github.com/modelpack/model-csi-driver/pkg/config"
99
"github.com/modelpack/model-csi-driver/pkg/logger"
1010
"github.com/modelpack/model-csi-driver/pkg/metrics"
11+
"github.com/modelpack/model-csi-driver/pkg/status"
1112
"github.com/pkg/errors"
1213
)
1314

14-
var CacheSacnInterval = 60 * time.Second
15+
const CacheScanInterval = 60 * time.Second
1516

1617
type CacheManager struct {
1718
cfg *config.Config
19+
sm *status.StatusManager
1820
}
1921

2022
func (cm *CacheManager) getCacheSize() (int64, error) {
@@ -36,6 +38,7 @@ func (cm *CacheManager) getCacheSize() (int64, error) {
3638

3739
func (cm *CacheManager) scanModels() error {
3840
staticModels := 0
41+
inlineModels := 0
3942
dynamicModels := 0
4043
volumesDir := cm.cfg.Get().GetVolumesDir()
4144
volumeDirs, err := os.ReadDir(volumesDir)
@@ -45,29 +48,71 @@ func (cm *CacheManager) scanModels() error {
4548
}
4649
return errors.Wrapf(err, "read volume dirs from %s", volumesDir)
4750
}
51+
52+
mountItems := []metrics.MountItem{}
4853
for _, volumeDir := range volumeDirs {
4954
if !volumeDir.IsDir() {
5055
continue
5156
}
52-
if isStaticVolume(volumeDir.Name()) {
53-
staticModels += 1
57+
volumeName := volumeDir.Name()
58+
if isStaticVolume(volumeName) {
59+
statusPath := filepath.Join(volumesDir, volumeName, "status.json")
60+
modelStatus, err := cm.sm.Get(statusPath)
61+
if err == nil {
62+
mountItems = append(mountItems, metrics.MountItem{
63+
Reference: modelStatus.Reference,
64+
Type: "static",
65+
VolumeName: volumeName,
66+
MountID: modelStatus.MountID,
67+
})
68+
staticModels += 1
69+
}
5470
}
55-
if isDynamicVolume(volumeDir.Name()) {
56-
modelsDir := cm.cfg.Get().GetModelsDirForDynamic(volumeDir.Name())
71+
if isDynamicVolume(volumeName) {
72+
modelsDir := cm.cfg.Get().GetModelsDirForDynamic(volumeName)
5773
modelDirs, err := os.ReadDir(modelsDir)
5874
if err != nil {
59-
return errors.Wrapf(err, "read model dirs from %s", modelsDir)
75+
if os.IsNotExist(err) {
76+
statusPath := filepath.Join(modelsDir, "status.json")
77+
modelStatus, err := cm.sm.Get(statusPath)
78+
if err == nil {
79+
mountItems = append(mountItems, metrics.MountItem{
80+
Reference: modelStatus.Reference,
81+
Type: "inline",
82+
VolumeName: volumeName,
83+
MountID: modelStatus.MountID,
84+
})
85+
inlineModels += 1
86+
}
87+
continue
88+
}
89+
logger.Logger().WithError(err).Warnf("read model dirs from %s", modelsDir)
90+
continue
6091
}
61-
for _, modelDir := range modelDirs {
62-
if !modelDir.IsDir() {
92+
for _, modelID := range modelDirs {
93+
if !modelID.IsDir() {
6394
continue
6495
}
65-
dynamicModels += 1
96+
statusPath := filepath.Join(modelsDir, modelID.Name(), "status.json")
97+
modelStatus, err := cm.sm.Get(statusPath)
98+
if err == nil {
99+
mountItems = append(mountItems, metrics.MountItem{
100+
Reference: modelStatus.Reference,
101+
Type: "dynamic",
102+
VolumeName: volumeName,
103+
MountID: modelStatus.MountID,
104+
})
105+
dynamicModels += 1
106+
}
66107
}
67108
}
68109
}
110+
111+
metrics.MountItems.Set(mountItems)
69112
metrics.NodeMountedStaticImages.Set(float64(staticModels))
113+
metrics.NodeMountedInlineImages.Set(float64(inlineModels))
70114
metrics.NodeMountedDynamicImages.Set(float64(dynamicModels))
115+
71116
return nil
72117
}
73118

@@ -87,17 +132,18 @@ func (cm *CacheManager) Scan() error {
87132
return nil
88133
}
89134

90-
func NewCacheManager(cfg *config.Config) (*CacheManager, error) {
135+
func NewCacheManager(cfg *config.Config, sm *status.StatusManager) (*CacheManager, error) {
91136
cm := CacheManager{
92137
cfg: cfg,
138+
sm: sm,
93139
}
94140

95141
go func() {
96142
for {
97143
if err := cm.Scan(); err != nil && !errors.Is(err, os.ErrNotExist) {
98144
logger.Logger().WithError(err).Warnf("scan cache failed")
99145
}
100-
time.Sleep(CacheSacnInterval)
146+
time.Sleep(CacheScanInterval)
101147
}
102148
}()
103149

pkg/service/cache_test.go

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
package service
2+
3+
import (
4+
"os"
5+
"path/filepath"
6+
"testing"
7+
8+
"github.com/modelpack/model-csi-driver/pkg/config"
9+
"github.com/modelpack/model-csi-driver/pkg/metrics"
10+
"github.com/modelpack/model-csi-driver/pkg/status"
11+
"github.com/prometheus/client_golang/prometheus"
12+
"github.com/prometheus/client_golang/prometheus/testutil"
13+
dto "github.com/prometheus/client_model/go"
14+
"github.com/stretchr/testify/require"
15+
)
16+
17+
func TestCacheManagerScanUpdatesMetrics(t *testing.T) {
18+
tempDir := t.TempDir()
19+
20+
rawCfg := &config.RawConfig{ServiceName: "test", RootDir: tempDir}
21+
cfg := config.NewWithRaw(rawCfg)
22+
23+
sm, err := status.NewStatusManager()
24+
require.NoError(t, err)
25+
26+
// Create a static volume status
27+
staticStatusPath := filepath.Join(tempDir, "volumes", "pvc-static", "status.json")
28+
_, err = sm.Set(staticStatusPath, status.Status{Reference: "ref-static", MountID: "m-static"})
29+
require.NoError(t, err)
30+
31+
// Create a dynamic volume status under models/<mountID>/status.json
32+
dynamicStatusPath := filepath.Join(tempDir, "volumes", "csi-dyn", "models", "mount-1", "status.json")
33+
_, err = sm.Set(dynamicStatusPath, status.Status{Reference: "ref-dyn", MountID: "mount-1"})
34+
require.NoError(t, err)
35+
36+
// An extra file to ensure cache size covers arbitrary files under RootDir.
37+
extraPath := filepath.Join(tempDir, "extra.bin")
38+
require.NoError(t, os.WriteFile(extraPath, []byte("abc"), 0o644))
39+
40+
paths := []string{staticStatusPath, dynamicStatusPath, extraPath}
41+
var expectedSize int64
42+
for _, p := range paths {
43+
st, statErr := os.Stat(p)
44+
require.NoError(t, statErr)
45+
expectedSize += st.Size()
46+
}
47+
48+
cm := &CacheManager{cfg: cfg, sm: sm}
49+
require.NoError(t, cm.Scan())
50+
51+
require.Equal(t, float64(expectedSize), testutil.ToFloat64(metrics.NodeCacheSizeInBytes))
52+
require.Equal(t, float64(1), testutil.ToFloat64(metrics.NodeMountedStaticImages))
53+
require.Equal(t, float64(0), testutil.ToFloat64(metrics.NodeMountedInlineImages))
54+
require.Equal(t, float64(1), testutil.ToFloat64(metrics.NodeMountedDynamicImages))
55+
56+
// Verify mount item metrics are exported as a snapshot without Reset/Delete races.
57+
reg := prometheus.NewRegistry()
58+
reg.MustRegister(metrics.MountItems)
59+
60+
mfs, err := reg.Gather()
61+
require.NoError(t, err)
62+
63+
mf := findMetricFamily(t, mfs, metrics.Prefix+"mount_item")
64+
require.Len(t, mf.Metric, 2)
65+
66+
require.True(t, hasLabels(mf.Metric[0], map[string]string{
67+
"reference": "ref-static",
68+
"type": "static",
69+
"volume_name": "pvc-static",
70+
"mount_id": "m-static",
71+
}) || hasLabels(mf.Metric[1], map[string]string{
72+
"reference": "ref-static",
73+
"type": "static",
74+
"volume_name": "pvc-static",
75+
"mount_id": "m-static",
76+
}))
77+
78+
require.True(t, hasLabels(mf.Metric[0], map[string]string{
79+
"reference": "ref-dyn",
80+
"type": "dynamic",
81+
"volume_name": "csi-dyn",
82+
"mount_id": "mount-1",
83+
}) || hasLabels(mf.Metric[1], map[string]string{
84+
"reference": "ref-dyn",
85+
"type": "dynamic",
86+
"volume_name": "csi-dyn",
87+
"mount_id": "mount-1",
88+
}))
89+
}
90+
91+
func findMetricFamily(t *testing.T, mfs []*dto.MetricFamily, name string) *dto.MetricFamily {
92+
t.Helper()
93+
for _, mf := range mfs {
94+
if mf.GetName() == name {
95+
return mf
96+
}
97+
}
98+
require.FailNow(t, "metric family not found", name)
99+
return nil
100+
}
101+
102+
func hasLabels(m *dto.Metric, want map[string]string) bool {
103+
labels := map[string]string{}
104+
for _, lp := range m.GetLabel() {
105+
labels[lp.GetName()] = lp.GetValue()
106+
}
107+
for k, v := range want {
108+
if labels[k] != v {
109+
return false
110+
}
111+
}
112+
return true
113+
}

pkg/service/service.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ func New(cfg *config.Config) (*Service, error) {
7575
if err != nil {
7676
return nil, errors.Wrap(err, "create worker")
7777
}
78-
cm, err := NewCacheManager(cfg)
78+
cm, err := NewCacheManager(cfg, sm)
7979
if err != nil {
8080
return nil, errors.Wrap(err, "create cache manager")
8181
}

0 commit comments

Comments
 (0)