diff --git a/go.mod b/go.mod
index cd00b3f..94b8f15 100644
--- a/go.mod
+++ b/go.mod
@@ -18,6 +18,7 @@ require (
 	github.com/opencontainers/image-spec v1.1.1
 	github.com/pkg/errors v0.9.1
 	github.com/prometheus/client_golang v1.22.0
+	github.com/prometheus/client_model v0.6.2
 	github.com/rexray/gocsi v1.2.2
 	github.com/sirupsen/logrus v1.9.3
 	github.com/stretchr/testify v1.11.1
@@ -86,6 +87,7 @@ require (
 	github.com/kevinburke/ssh_config v1.2.0 // indirect
 	github.com/klauspost/compress v1.18.0 // indirect
 	github.com/klauspost/cpuid/v2 v2.3.0 // indirect
+	github.com/kylelemons/godebug v1.1.0 // indirect
 	github.com/labstack/gommon v0.4.2 // indirect
 	github.com/libgit2/git2go/v34 v34.0.0 // indirect
 	github.com/mailru/easyjson v0.7.7 // indirect
@@ -99,7 +101,6 @@ require (
 	github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
 	github.com/pjbgf/sha1cd v0.3.2 // indirect
 	github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
-	github.com/prometheus/client_model v0.6.2 // indirect
 	github.com/prometheus/common v0.63.0 // indirect
 	github.com/prometheus/procfs v0.16.1 // indirect
 	github.com/rivo/uniseg v0.4.7 // indirect
diff --git a/pkg/metrics/mount_collector.go b/pkg/metrics/mount_collector.go
new file mode 100644
index 0000000..1c3d563
--- /dev/null
+++ b/pkg/metrics/mount_collector.go
@@ -0,0 +1,61 @@
+package metrics
+
+import (
+	"sync/atomic"
+
+	"github.com/prometheus/client_golang/prometheus"
+)
+
+type MountItem struct {
+	Reference  string
+	Type       string
+	VolumeName string
+	MountID    string
+}
+
+type MountItemCollector struct {
+	desc  *prometheus.Desc
+	items atomic.Value // stores []MountItem
+}
+
+func NewMountItemCollector() *MountItemCollector {
+	c := &MountItemCollector{
+		desc: prometheus.NewDesc(
+			Prefix+"mount_item",
+			"Mounted item list (pvc, inline, dynamic types), value is always 1 for existing items.",
+			[]string{"reference", "type", "volume_name", "mount_id"},
+			nil,
+		),
+	}
+	c.items.Store([]MountItem(nil))
+	return c
+}
+
+func (c *MountItemCollector) Set(items []MountItem) {
+	c.items.Store(append([]MountItem(nil), items...))
+}
+
+func (c *MountItemCollector) Describe(ch chan<- *prometheus.Desc) {
+	ch <- c.desc
+}
+
+func (c *MountItemCollector) Collect(ch chan<- prometheus.Metric) {
+	v := c.items.Load()
+	if v == nil {
+		return
+	}
+	items := v.([]MountItem)
+	for _, it := range items {
+		ch <- prometheus.MustNewConstMetric(
+			c.desc,
+			prometheus.GaugeValue,
+			1,
+			it.Reference,
+			it.Type,
+			it.VolumeName,
+			it.MountID,
+		)
+	}
+}
+
+var MountItems = NewMountItemCollector()
diff --git a/pkg/metrics/registry.go b/pkg/metrics/registry.go
index 38e393b..54e1091 100644
--- a/pkg/metrics/registry.go
+++ b/pkg/metrics/registry.go
@@ -76,15 +76,21 @@ var (
 		},
 	)
 
-	NodeMountedStaticImages = prometheus.NewGauge(
+	NodeMountedPVCModels = prometheus.NewGauge(
 		prometheus.GaugeOpts{
-			Name: Prefix + "node_mounted_static_images",
+			Name: Prefix + "node_mounted_pvc_models",
 		},
 	)
 
-	NodeMountedDynamicImages = prometheus.NewGauge(
+	NodeMountedInlineModels = prometheus.NewGauge(
 		prometheus.GaugeOpts{
-			Name: Prefix + "node_mounted_dynamic_images",
+			Name: Prefix + "node_mounted_inline_models",
+		},
+	)
+
+	NodeMountedDynamicModels = prometheus.NewGauge(
+		prometheus.GaugeOpts{
+			Name: Prefix + "node_mounted_dynamic_models",
 		},
 	)
 
@@ -140,7 +146,9 @@ func NodePullOpObserve(op string, size int64, start time.Time, err error) {
 
 func init() {
 	DummyRegistry.MustRegister()
-	DetailRegistry.MustRegister()
+	DetailRegistry.MustRegister(
+		MountItems,
+	)
 
 	Registry.MustRegister(
 		NodeNotReady,
@@ -155,8 +163,9 @@ func init() {
 		ControllerOpLatency,
 		NodeCacheSizeInBytes,
 
-		NodeMountedStaticImages,
-		NodeMountedDynamicImages,
+		NodeMountedPVCModels,
+		NodeMountedInlineModels,
+		NodeMountedDynamicModels,
 		NodePullLayerTooLong,
 	)
 }
diff --git a/pkg/server/server_test.go b/pkg/server/server_test.go
index d6925e4..528fff8 100644
--- a/pkg/server/server_test.go
+++ b/pkg/server/server_test.go
@@ -558,7 +558,7 @@ func TestServer(t *testing.T) {
 	require.NoError(t, err)
 	cfg.Get().RootDir = rootDir
 	cfg.Get().PullConfig.ProxyURL = ""
-	service.CacheSacnInterval = 1 * time.Second
+	service.CacheScanInterval = 1 * time.Second
 	service.NewPuller = func(ctx context.Context, pullCfg *config.PullConfig, hook *status.Hook,
 		diskQuotaChecker *service.DiskQuotaChecker) service.Puller {
 		return &mockPuller{
diff --git a/pkg/service/cache.go b/pkg/service/cache.go
index b75853b..63a83fb 100644
--- a/pkg/service/cache.go
+++ b/pkg/service/cache.go
@@ -8,34 +8,35 @@ import (
 	"github.com/modelpack/model-csi-driver/pkg/config"
 	"github.com/modelpack/model-csi-driver/pkg/logger"
 	"github.com/modelpack/model-csi-driver/pkg/metrics"
+	"github.com/modelpack/model-csi-driver/pkg/status"
 	"github.com/pkg/errors"
 )
 
-var CacheSacnInterval = 60 * time.Second
+var CacheScanInterval = 60 * time.Second
+
+const (
+	mountTypePVC     = "pvc"
+	mountTypeInline  = "inline"
+	mountTypeDynamic = "dynamic"
+)
 
 type CacheManager struct {
 	cfg *config.Config
+	sm  *status.StatusManager
 }
 
 func (cm *CacheManager) getCacheSize() (int64, error) {
-	var total int64
-	if err := filepath.Walk(cm.cfg.Get().RootDir, func(path string, info os.FileInfo, err error) error {
-		if err != nil {
-			return err
-		}
-		if info.IsDir() {
-			return nil
-		}
-		total += info.Size()
-		return nil
-	}); err != nil {
-		return 0, err
+	size, err := getUsedSize(cm.cfg.Get().RootDir)
+	if err != nil {
+		return 0, errors.Wrapf(err, "get used size: %s", cm.cfg.Get().RootDir)
 	}
-	return total, nil
+
+	return size, nil
 }
 
 func (cm *CacheManager) scanModels() error {
-	staticModels := 0
+	pvcModels := 0
+	inlineModels := 0
 	dynamicModels := 0
 	volumesDir := cm.cfg.Get().GetVolumesDir()
 	volumeDirs, err := os.ReadDir(volumesDir)
@@ -45,29 +46,73 @@
 		}
 		return errors.Wrapf(err, "read volume dirs from %s", volumesDir)
 	}
+
+	mountItems := []metrics.MountItem{}
 	for _, volumeDir := range volumeDirs {
 		if !volumeDir.IsDir() {
 			continue
 		}
-		if isStaticVolume(volumeDir.Name()) {
-			staticModels += 1
+		volumeName := volumeDir.Name()
+		if isStaticVolume(volumeName) {
+			statusPath := filepath.Join(volumesDir, volumeName, "status.json")
+			modelStatus, err := cm.sm.Get(statusPath)
+			if err == nil {
+				mountItems = append(mountItems, metrics.MountItem{
+					Reference:  modelStatus.Reference,
+					Type:       mountTypePVC,
+					VolumeName: volumeName,
+					MountID:    modelStatus.MountID,
+				})
+				pvcModels += 1
+			}
 		}
 
-		if isDynamicVolume(volumeDir.Name()) {
-			modelsDir := cm.cfg.Get().GetModelsDirForDynamic(volumeDir.Name())
+		if isDynamicVolume(volumeName) {
+			modelsDir := cm.cfg.Get().GetModelsDirForDynamic(volumeName)
 			modelDirs, err := os.ReadDir(modelsDir)
-			if err != nil {
-				return errors.Wrapf(err, "read model dirs from %s", modelsDir)
+			if err != nil {
+				if os.IsNotExist(err) {
+					// This is potentially an inline model; the status file is expected
+					// to be directly under the volume directory.
+					statusPath := filepath.Join(volumesDir, volumeName, "status.json")
+					modelStatus, err := cm.sm.Get(statusPath)
+					if err == nil {
+						mountItems = append(mountItems, metrics.MountItem{
+							Reference:  modelStatus.Reference,
+							Type:       mountTypeInline,
+							VolumeName: volumeName,
+							MountID:    modelStatus.MountID,
+						})
+						inlineModels += 1
+					}
+					continue
+				}
+				logger.Logger().WithError(err).Warnf("read model dirs from %s", modelsDir)
+				continue
 			}
 			for _, modelDir := range modelDirs {
 				if !modelDir.IsDir() {
 					continue
 				}
-				dynamicModels += 1
+				statusPath := filepath.Join(modelsDir, modelDir.Name(), "status.json")
+				modelStatus, err := cm.sm.Get(statusPath)
+				if err == nil {
+					mountItems = append(mountItems, metrics.MountItem{
+						Reference:  modelStatus.Reference,
+						Type:       mountTypeDynamic,
+						VolumeName: volumeName,
+						MountID:    modelStatus.MountID,
+					})
+					dynamicModels += 1
+				}
 			}
 		}
 	}
-	metrics.NodeMountedStaticImages.Set(float64(staticModels))
-	metrics.NodeMountedDynamicImages.Set(float64(dynamicModels))
+
+	metrics.MountItems.Set(mountItems)
+	metrics.NodeMountedPVCModels.Set(float64(pvcModels))
+	metrics.NodeMountedInlineModels.Set(float64(inlineModels))
+	metrics.NodeMountedDynamicModels.Set(float64(dynamicModels))
+
 	return nil
 }
@@ -87,9 +132,10 @@ func (cm *CacheManager) Scan() error {
 	return nil
 }
 
-func NewCacheManager(cfg *config.Config) (*CacheManager, error) {
+func NewCacheManager(cfg *config.Config, sm *status.StatusManager) (*CacheManager, error) {
 	cm := CacheManager{
 		cfg: cfg,
+		sm:  sm,
 	}
 
 	go func() {
@@ -97,7 +143,7 @@ func NewCacheManager(cfg *config.Config) (*CacheManager, error) {
 			if err := cm.Scan(); err != nil && !errors.Is(err, os.ErrNotExist) {
 				logger.Logger().WithError(err).Warnf("scan cache failed")
 			}
-			time.Sleep(CacheSacnInterval)
+			time.Sleep(CacheScanInterval)
 		}
 	}()
 
diff --git a/pkg/service/cache_test.go b/pkg/service/cache_test.go
new file mode 100644
index 0000000..e05d183
--- /dev/null
+++ b/pkg/service/cache_test.go
@@ -0,0 +1,109 @@
+package service
+
+import (
+	"os"
+	"path/filepath"
+	"testing"
+
+	"github.com/modelpack/model-csi-driver/pkg/config"
+	"github.com/modelpack/model-csi-driver/pkg/metrics"
+	"github.com/modelpack/model-csi-driver/pkg/status"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/testutil"
+	dto "github.com/prometheus/client_model/go"
+	"github.com/stretchr/testify/require"
+)
+
+func TestCacheManagerScanUpdatesMetrics(t *testing.T) {
+	tempDir := t.TempDir()
+
+	rawCfg := &config.RawConfig{ServiceName: "test", RootDir: tempDir}
+	cfg := config.NewWithRaw(rawCfg)
+
+	sm, err := status.NewStatusManager()
+	require.NoError(t, err)
+
+	// Create a pvc volume status
+	pvcStatusPath := filepath.Join(tempDir, "volumes", "pvc-static", "status.json")
+	_, err = sm.Set(pvcStatusPath, status.Status{Reference: "ref-pvc", MountID: ""})
+	require.NoError(t, err)
+
+	// Create a dynamic volume status under models//status.json
+	dynamicStatusPath := filepath.Join(tempDir, "volumes", "csi-dyn", "models", "mount-1", "status.json")
+	_, err = sm.Set(dynamicStatusPath, status.Status{Reference: "ref-dyn", MountID: "mount-1"})
+	require.NoError(t, err)
+
+	// An extra file to ensure cache size covers arbitrary files under RootDir.
+	extraPath := filepath.Join(tempDir, "extra.bin")
+	require.NoError(t, os.WriteFile(extraPath, []byte("abc"), 0o644))
+
+	expectedSize, err := getUsedSize(rawCfg.RootDir)
+	require.NoError(t, err)
+
+	cm := &CacheManager{cfg: cfg, sm: sm}
+	require.NoError(t, cm.Scan())
+
+	require.Equal(t, float64(expectedSize), testutil.ToFloat64(metrics.NodeCacheSizeInBytes))
+	require.Equal(t, float64(1), testutil.ToFloat64(metrics.NodeMountedPVCModels))
+	require.Equal(t, float64(0), testutil.ToFloat64(metrics.NodeMountedInlineModels))
+	require.Equal(t, float64(1), testutil.ToFloat64(metrics.NodeMountedDynamicModels))
+
+	// Verify mount item metrics are exported as a snapshot without Reset/Delete races.
+	reg := prometheus.NewRegistry()
+	reg.MustRegister(metrics.MountItems)
+
+	mfs, err := reg.Gather()
+	require.NoError(t, err)
+
+	mf := findMetricFamily(t, mfs, metrics.Prefix+"mount_item")
+	require.Len(t, mf.Metric, 2)
+
+	pvcLabels := map[string]string{
+		"reference":   "ref-pvc",
+		"type":        "pvc",
+		"volume_name": "pvc-static",
+		"mount_id":    "",
+	}
+	dynamicLabels := map[string]string{
+		"reference":   "ref-dyn",
+		"type":        "dynamic",
+		"volume_name": "csi-dyn",
+		"mount_id":    "mount-1",
+	}
+
+	var foundPVC, foundDynamic bool
+	for _, m := range mf.Metric {
+		if hasLabels(m, pvcLabels) {
+			foundPVC = true
+		}
+		if hasLabels(m, dynamicLabels) {
+			foundDynamic = true
+		}
+	}
+	require.True(t, foundPVC, "pvc mount item metric not found")
+	require.True(t, foundDynamic, "dynamic mount item metric not found")
+}
+
+func findMetricFamily(t *testing.T, mfs []*dto.MetricFamily, name string) *dto.MetricFamily {
+	t.Helper()
+	for _, mf := range mfs {
+		if mf.GetName() == name {
+			return mf
+		}
+	}
+	require.FailNow(t, "metric family not found", name)
+	return nil
+}
+
+func hasLabels(m *dto.Metric, want map[string]string) bool {
+	labels := map[string]string{}
+	for _, lp := range m.GetLabel() {
+		labels[lp.GetName()] = lp.GetValue()
+	}
+	for k, v := range want {
+		if labels[k] != v {
+			return false
+		}
+	}
+	return true
+}
diff --git a/pkg/service/service.go b/pkg/service/service.go
index 83463f0..73b0257 100644
--- a/pkg/service/service.go
+++ b/pkg/service/service.go
@@ -75,7 +75,7 @@ func New(cfg *config.Config) (*Service, error) {
 	if err != nil {
 		return nil, errors.Wrap(err, "create worker")
 	}
-	cm, err := NewCacheManager(cfg)
+	cm, err := NewCacheManager(cfg, sm)
 	if err != nil {
 		return nil, errors.Wrap(err, "create cache manager")
 	}