Skip to content

Commit ae799a0

Browse files
authored
Merge pull request #440 from ggriffiths/release-3.0_cherrypick_941821bf99ec8d30e7183d67decce4b5dd9bfee6
Cherry-pick:Add snapshot controller metrics
2 parents c0bbf76 + a09b562 commit ae799a0

File tree

7 files changed

+659
-284
lines changed

7 files changed

+659
-284
lines changed

cmd/snapshot-controller/main.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"fmt"
2323
"os"
2424
"os/signal"
25+
"sync"
2526
"time"
2627

2728
"k8s.io/client-go/kubernetes"
@@ -32,6 +33,7 @@ import (
3233

3334
"github.com/kubernetes-csi/csi-lib-utils/leaderelection"
3435
controller "github.com/kubernetes-csi/external-snapshotter/v3/pkg/common-controller"
36+
"github.com/kubernetes-csi/external-snapshotter/v3/pkg/metrics"
3537

3638
clientset "github.com/kubernetes-csi/external-snapshotter/client/v3/clientset/versioned"
3739
snapshotscheme "github.com/kubernetes-csi/external-snapshotter/client/v3/clientset/versioned/scheme"
@@ -48,6 +50,9 @@ var (
4850

4951
leaderElection = flag.Bool("leader-election", false, "Enables leader election.")
5052
leaderElectionNamespace = flag.String("leader-election-namespace", "", "The namespace where the leader election resource exists. Defaults to the pod namespace if not set.")
53+
54+
httpEndpoint = flag.String("http-endpoint", "", "The TCP network address where the HTTP server for diagnostics, including metrics, will listen (example: :8080). The default is empty string, which means the server is disabled.")
55+
metricsPath = flag.String("metrics-path", "/metrics", "The HTTP path where prometheus metrics will be exposed. Default is `/metrics`.")
5156
)
5257

5358
var (
@@ -87,6 +92,28 @@ func main() {
8792
factory := informers.NewSharedInformerFactory(snapClient, *resyncPeriod)
8893
coreFactory := coreinformers.NewSharedInformerFactory(kubeClient, *resyncPeriod)
8994

95+
// Create and register metrics manager
96+
metricsManager := metrics.NewMetricsManager()
97+
wg := &sync.WaitGroup{}
98+
wg.Add(1)
99+
if *httpEndpoint != "" {
100+
srv, err := metricsManager.StartMetricsEndpoint(*metricsPath, *httpEndpoint, promklog{}, wg)
101+
if err != nil {
102+
klog.Errorf("Failed to start metrics server: %s", err.Error())
103+
os.Exit(1)
104+
}
105+
defer func() {
106+
err := srv.Shutdown(context.Background())
107+
if err != nil {
108+
klog.Errorf("Failed to shutdown metrics server: %s", err.Error())
109+
}
110+
111+
klog.Infof("Metrics server successfully shutdown")
112+
wg.Done()
113+
}()
114+
klog.Infof("Metrics server successfully started on %s, %s", *httpEndpoint, *metricsPath)
115+
}
116+
90117
// Add Snapshot types to the default Kubernetes so events can be logged for them
91118
snapshotscheme.AddToScheme(scheme.Scheme)
92119

@@ -99,6 +126,7 @@ func main() {
99126
factory.Snapshot().V1beta1().VolumeSnapshotContents(),
100127
factory.Snapshot().V1beta1().VolumeSnapshotClasses(),
101128
coreFactory.Core().V1().PersistentVolumeClaims(),
129+
metricsManager,
102130
*resyncPeriod,
103131
)
104132

@@ -142,3 +170,9 @@ func buildConfig(kubeconfig string) (*rest.Config, error) {
142170
}
143171
return rest.InClusterConfig()
144172
}
173+
174+
type promklog struct{}
175+
176+
func (pl promklog) Println(v ...interface{}) {
177+
klog.Error(v...)
178+
}

pkg/common-controller/framework_test.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
snapshotscheme "github.com/kubernetes-csi/external-snapshotter/client/v3/clientset/versioned/scheme"
3535
informers "github.com/kubernetes-csi/external-snapshotter/client/v3/informers/externalversions"
3636
storagelisters "github.com/kubernetes-csi/external-snapshotter/client/v3/listers/volumesnapshot/v1beta1"
37+
"github.com/kubernetes-csi/external-snapshotter/v3/pkg/metrics"
3738
"github.com/kubernetes-csi/external-snapshotter/v3/pkg/utils"
3839
v1 "k8s.io/api/core/v1"
3940
storagev1 "k8s.io/api/storage/v1"
@@ -750,6 +751,10 @@ func newTestController(kubeClient kubernetes.Interface, clientset clientset.Inte
750751
}
751752

752753
coreFactory := coreinformers.NewSharedInformerFactory(kubeClient, utils.NoResyncPeriodFunc())
754+
metricsManager := metrics.NewMetricsManager()
755+
wg := &sync.WaitGroup{}
756+
wg.Add(1)
757+
metricsManager.StartMetricsEndpoint("/metrics", "localhost:0", nil, wg)
753758

754759
ctrl := NewCSISnapshotCommonController(
755760
clientset,
@@ -758,6 +763,7 @@ func newTestController(kubeClient kubernetes.Interface, clientset clientset.Inte
758763
informerFactory.Snapshot().V1beta1().VolumeSnapshotContents(),
759764
informerFactory.Snapshot().V1beta1().VolumeSnapshotClasses(),
760765
coreFactory.Core().V1().PersistentVolumeClaims(),
766+
metricsManager,
761767
60*time.Second,
762768
)
763769

pkg/common-controller/snapshot_controller.go

Lines changed: 107 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
klog "k8s.io/klog/v2"
3434

3535
crdv1 "github.com/kubernetes-csi/external-snapshotter/client/v3/apis/volumesnapshot/v1beta1"
36+
"github.com/kubernetes-csi/external-snapshotter/v3/pkg/metrics"
3637
"github.com/kubernetes-csi/external-snapshotter/v3/pkg/utils"
3738
)
3839

@@ -236,6 +237,20 @@ func (ctrl *csiSnapshotCommonController) syncSnapshot(snapshot *crdv1.VolumeSnap
236237
// 2. Call checkandRemoveSnapshotFinalizersAndCheckandDeleteContent() with information obtained from step 1. This function name is very long but the name suggests what it does. It determines whether to remove finalizers on snapshot and whether to delete content.
237238
func (ctrl *csiSnapshotCommonController) processSnapshotWithDeletionTimestamp(snapshot *crdv1.VolumeSnapshot) error {
238239
klog.V(5).Infof("processSnapshotWithDeletionTimestamp VolumeSnapshot[%s]: %s", utils.SnapshotKey(snapshot), utils.GetSnapshotStatusForLogging(snapshot))
240+
driverName, err := ctrl.getSnapshotDriverName(snapshot)
241+
if err != nil {
242+
klog.Errorf("failed to getSnapshotDriverName while recording metrics for snapshot %q: %v", utils.SnapshotKey(snapshot), err)
243+
}
244+
245+
snapshotProvisionType := metrics.DynamicSnapshotType
246+
if snapshot.Spec.Source.VolumeSnapshotContentName != nil {
247+
snapshotProvisionType = metrics.PreProvisionedSnapshotType
248+
}
249+
250+
// Processing delete, start operation metric
251+
deleteOperationKey := metrics.NewOperationKey(metrics.DeleteSnapshotOperationName, snapshot.UID)
252+
deleteOperationValue := metrics.NewOperationValue(driverName, snapshotProvisionType)
253+
ctrl.metricsManager.OperationStart(deleteOperationKey, deleteOperationValue)
239254

240255
var contentName string
241256
if snapshot.Status != nil && snapshot.Status.BoundVolumeSnapshotContentName != nil {
@@ -270,6 +285,7 @@ func (ctrl *csiSnapshotCommonController) processSnapshotWithDeletionTimestamp(sn
270285
}
271286

272287
klog.V(5).Infof("processSnapshotWithDeletionTimestamp[%s]: delete snapshot content and remove finalizer from snapshot if needed", utils.SnapshotKey(snapshot))
288+
273289
return ctrl.checkandRemoveSnapshotFinalizersAndCheckandDeleteContent(snapshot, content, deleteContent)
274290
}
275291

@@ -389,6 +405,7 @@ func (ctrl *csiSnapshotCommonController) syncReadySnapshot(snapshot *crdv1.Volum
389405
// snapshot is bound but content is not pointing to the snapshot
390406
return ctrl.updateSnapshotErrorStatusWithEvent(snapshot, v1.EventTypeWarning, "SnapshotMisbound", "VolumeSnapshotContent is not bound to the VolumeSnapshot correctly")
391407
}
408+
392409
// everything is verified, return
393410
return nil
394411
}
@@ -397,20 +414,45 @@ func (ctrl *csiSnapshotCommonController) syncReadySnapshot(snapshot *crdv1.Volum
397414
func (ctrl *csiSnapshotCommonController) syncUnreadySnapshot(snapshot *crdv1.VolumeSnapshot) error {
398415
uniqueSnapshotName := utils.SnapshotKey(snapshot)
399416
klog.V(5).Infof("syncUnreadySnapshot %s", uniqueSnapshotName)
417+
driverName, err := ctrl.getSnapshotDriverName(snapshot)
418+
if err != nil {
419+
klog.Errorf("failed to getSnapshotDriverName while recording metrics for snapshot %q: %s", utils.SnapshotKey(snapshot), err)
420+
}
421+
422+
snapshotProvisionType := metrics.DynamicSnapshotType
423+
if snapshot.Spec.Source.VolumeSnapshotContentName != nil {
424+
snapshotProvisionType = metrics.PreProvisionedSnapshotType
425+
}
426+
427+
// Start metrics operations
428+
if !utils.IsSnapshotCreated(snapshot) {
429+
// Only start CreateSnapshot operation if the snapshot has not been cut
430+
ctrl.metricsManager.OperationStart(
431+
metrics.NewOperationKey(metrics.CreateSnapshotOperationName, snapshot.UID),
432+
metrics.NewOperationValue(driverName, snapshotProvisionType),
433+
)
434+
}
435+
ctrl.metricsManager.OperationStart(
436+
metrics.NewOperationKey(metrics.CreateSnapshotAndReadyOperationName, snapshot.UID),
437+
metrics.NewOperationValue(driverName, snapshotProvisionType),
438+
)
400439

401440
// Pre-provisioned snapshot
402441
if snapshot.Spec.Source.VolumeSnapshotContentName != nil {
403442
content, err := ctrl.getPreprovisionedContentFromStore(snapshot)
404443
if err != nil {
405444
return err
406445
}
446+
407447
// if no content found yet, update status and return
408448
if content == nil {
409449
// can not find the desired VolumeSnapshotContent from cache store
410450
ctrl.updateSnapshotErrorStatusWithEvent(snapshot, v1.EventTypeWarning, "SnapshotContentMissing", "VolumeSnapshotContent is missing")
411451
klog.V(4).Infof("syncUnreadySnapshot[%s]: snapshot content %q requested but not found, will try again", utils.SnapshotKey(snapshot), *snapshot.Spec.Source.VolumeSnapshotContentName)
452+
412453
return fmt.Errorf("snapshot %s requests an non-existing content %s", utils.SnapshotKey(snapshot), *snapshot.Spec.Source.VolumeSnapshotContentName)
413454
}
455+
414456
// Set VolumeSnapshotRef UID
415457
newContent, err := ctrl.checkandBindSnapshotContent(snapshot, content)
416458
if err != nil {
@@ -427,8 +469,10 @@ func (ctrl *csiSnapshotCommonController) syncUnreadySnapshot(snapshot *crdv1.Vol
427469
ctrl.updateSnapshotErrorStatusWithEvent(snapshot, v1.EventTypeWarning, "SnapshotStatusUpdateFailed", fmt.Sprintf("Snapshot status update failed, %v", err))
428470
return err
429471
}
472+
430473
return nil
431474
}
475+
432476
// snapshot.Spec.Source.VolumeSnapshotContentName == nil - dynamically creating snapshot
433477
klog.V(5).Infof("getDynamicallyProvisionedContentFromStore for snapshot %s", uniqueSnapshotName)
434478
contentObj, err := ctrl.getDynamicallyProvisionedContentFromStore(snapshot)
@@ -1095,10 +1139,30 @@ func (ctrl *csiSnapshotCommonController) updateSnapshotStatus(snapshot *crdv1.Vo
10951139
if updated {
10961140
snapshotClone := snapshotObj.DeepCopy()
10971141
snapshotClone.Status = newStatus
1142+
1143+
// We need to record metrics before updating the status due to a bug causing cache entries after a failed UpdateStatus call.
1144+
// Must meet the following criteria to emit a successful CreateSnapshot status
1145+
// 1. Previous status was nil OR Previous status had a nil CreationTime
1146+
// 2. New status must be non-nil with a non-nil CreationTime
1147+
driverName := content.Spec.Driver
1148+
createOperationKey := metrics.NewOperationKey(metrics.CreateSnapshotOperationName, snapshot.UID)
1149+
if !utils.IsSnapshotCreated(snapshotObj) && utils.IsSnapshotCreated(snapshotClone) {
1150+
ctrl.metricsManager.RecordMetrics(createOperationKey, metrics.NewSnapshotOperationStatus(metrics.SnapshotStatusTypeSuccess), driverName)
1151+
}
1152+
1153+
// Must meet the following criteria to emit a successful CreateSnapshotAndReady status
1154+
// 1. Previous status was nil OR Previous status had a nil ReadyToUse OR Previous status had a false ReadyToUse
1155+
// 2. New status must be non-nil with a ReadyToUse as true
1156+
if !utils.IsSnapshotReady(snapshotObj) && utils.IsSnapshotReady(snapshotClone) {
1157+
createAndReadyOperation := metrics.NewOperationKey(metrics.CreateSnapshotAndReadyOperationName, snapshot.UID)
1158+
ctrl.metricsManager.RecordMetrics(createAndReadyOperation, metrics.NewSnapshotOperationStatus(metrics.SnapshotStatusTypeSuccess), driverName)
1159+
}
1160+
10981161
newSnapshotObj, err := ctrl.clientset.SnapshotV1beta1().VolumeSnapshots(snapshotClone.Namespace).UpdateStatus(context.TODO(), snapshotClone, metav1.UpdateOptions{})
10991162
if err != nil {
11001163
return nil, newControllerUpdateError(utils.SnapshotKey(snapshot), err.Error())
11011164
}
1165+
11021166
return newSnapshotObj, nil
11031167
}
11041168

@@ -1186,8 +1250,49 @@ func (ctrl *csiSnapshotCommonController) getSnapshotClass(className string) (*cr
11861250
return class, nil
11871251
}
11881252

1189-
// SetDefaultSnapshotClass is a helper function to figure out the default snapshot class from
1190-
// PVC/PV StorageClass and update VolumeSnapshot with this snapshot class name.
1253+
// getSnapshotDriverName is a helper function to get snapshot driver from the VolumeSnapshot.
1254+
// We try to get the driverName in multiple ways, as snapshot controller metrics depend on the correct driverName.
1255+
func (ctrl *csiSnapshotCommonController) getSnapshotDriverName(vs *crdv1.VolumeSnapshot) (string, error) {
1256+
klog.V(5).Infof("getSnapshotDriverName: VolumeSnapshot[%s]", vs.Name)
1257+
var driverName string
1258+
1259+
// Pre-Provisioned snapshots have contentName as source
1260+
var contentName string
1261+
if vs.Spec.Source.VolumeSnapshotContentName != nil {
1262+
contentName = *vs.Spec.Source.VolumeSnapshotContentName
1263+
}
1264+
1265+
// Get Driver name from SnapshotContent if we found a contentName
1266+
if contentName != "" {
1267+
content, err := ctrl.contentLister.Get(contentName)
1268+
if err != nil {
1269+
klog.Errorf("getSnapshotDriverName: failed to get snapshotContent: %v", contentName)
1270+
} else {
1271+
driverName = content.Spec.Driver
1272+
}
1273+
1274+
if driverName != "" {
1275+
return driverName, nil
1276+
}
1277+
}
1278+
1279+
// Dynamic snapshots will have a snapshotclass with a driver
1280+
if vs.Spec.VolumeSnapshotClassName != nil {
1281+
class, err := ctrl.getSnapshotClass(*vs.Spec.VolumeSnapshotClassName)
1282+
if err != nil {
1283+
klog.Errorf("getSnapshotDriverName: failed to get snapshotClass: %v", *vs.Spec.VolumeSnapshotClassName)
1284+
} else {
1285+
driverName = class.Driver
1286+
}
1287+
}
1288+
1289+
return driverName, nil
1290+
}
1291+
1292+
// SetDefaultSnapshotClass is a helper function to figure out the default snapshot class.
1293+
// For pre-provisioned case, it's an no-op.
1294+
// For dynamic provisioning, it gets the default SnapshotClasses in the system if there is any(could be multiple),
1295+
// and finds the one with the same CSI Driver as the PV from which a snapshot will be taken.
11911296
func (ctrl *csiSnapshotCommonController) SetDefaultSnapshotClass(snapshot *crdv1.VolumeSnapshot) (*crdv1.VolumeSnapshotClass, *crdv1.VolumeSnapshot, error) {
11921297
klog.V(5).Infof("SetDefaultSnapshotClass for snapshot [%s]", snapshot.Name)
11931298

pkg/common-controller/snapshot_controller_base.go

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
clientset "github.com/kubernetes-csi/external-snapshotter/client/v3/clientset/versioned"
2525
storageinformers "github.com/kubernetes-csi/external-snapshotter/client/v3/informers/externalversions/volumesnapshot/v1beta1"
2626
storagelisters "github.com/kubernetes-csi/external-snapshotter/client/v3/listers/volumesnapshot/v1beta1"
27+
"github.com/kubernetes-csi/external-snapshotter/v3/pkg/metrics"
2728
"github.com/kubernetes-csi/external-snapshotter/v3/pkg/utils"
2829

2930
v1 "k8s.io/api/core/v1"
@@ -60,6 +61,8 @@ type csiSnapshotCommonController struct {
6061
snapshotStore cache.Store
6162
contentStore cache.Store
6263

64+
metricsManager metrics.MetricsManager
65+
6366
resyncPeriod time.Duration
6467
}
6568

@@ -71,6 +74,7 @@ func NewCSISnapshotCommonController(
7174
volumeSnapshotContentInformer storageinformers.VolumeSnapshotContentInformer,
7275
volumeSnapshotClassInformer storageinformers.VolumeSnapshotClassInformer,
7376
pvcInformer coreinformers.PersistentVolumeClaimInformer,
77+
metricsManager metrics.MetricsManager,
7478
resyncPeriod time.Duration,
7579
) *csiSnapshotCommonController {
7680
broadcaster := record.NewBroadcaster()
@@ -80,14 +84,15 @@ func NewCSISnapshotCommonController(
8084
eventRecorder = broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: fmt.Sprintf("snapshot-controller")})
8185

8286
ctrl := &csiSnapshotCommonController{
83-
clientset: clientset,
84-
client: client,
85-
eventRecorder: eventRecorder,
86-
resyncPeriod: resyncPeriod,
87-
snapshotStore: cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc),
88-
contentStore: cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc),
89-
snapshotQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "snapshot-controller-snapshot"),
90-
contentQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "snapshot-controller-content"),
87+
clientset: clientset,
88+
client: client,
89+
eventRecorder: eventRecorder,
90+
resyncPeriod: resyncPeriod,
91+
snapshotStore: cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc),
92+
contentStore: cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc),
93+
snapshotQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "snapshot-controller-snapshot"),
94+
contentQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "snapshot-controller-content"),
95+
metricsManager: metricsManager,
9196
}
9297

9398
ctrl.pvcLister = pvcInformer.Lister()
@@ -363,6 +368,7 @@ func (ctrl *csiSnapshotCommonController) updateSnapshot(snapshot *crdv1.VolumeSn
363368
if !newSnapshot {
364369
return nil
365370
}
371+
366372
err = ctrl.syncSnapshot(snapshot)
367373
if err != nil {
368374
if errors.IsConflict(err) {
@@ -407,6 +413,13 @@ func (ctrl *csiSnapshotCommonController) updateContent(content *crdv1.VolumeSnap
407413
func (ctrl *csiSnapshotCommonController) deleteSnapshot(snapshot *crdv1.VolumeSnapshot) {
408414
_ = ctrl.snapshotStore.Delete(snapshot)
409415
klog.V(4).Infof("snapshot %q deleted", utils.SnapshotKey(snapshot))
416+
driverName, err := ctrl.getSnapshotDriverName(snapshot)
417+
if err != nil {
418+
klog.Errorf("failed to getSnapshotDriverName while recording metrics for snapshot %q: %s", utils.SnapshotKey(snapshot), err)
419+
} else {
420+
deleteOperationKey := metrics.NewOperationKey(metrics.DeleteSnapshotOperationName, snapshot.UID)
421+
ctrl.metricsManager.RecordMetrics(deleteOperationKey, metrics.NewSnapshotOperationStatus(metrics.SnapshotStatusTypeSuccess), driverName)
422+
}
410423

411424
snapshotContentName := ""
412425
if snapshot.Status != nil && snapshot.Status.BoundVolumeSnapshotContentName != nil {
@@ -416,6 +429,7 @@ func (ctrl *csiSnapshotCommonController) deleteSnapshot(snapshot *crdv1.VolumeSn
416429
klog.V(5).Infof("deleteSnapshot[%q]: content not bound", utils.SnapshotKey(snapshot))
417430
return
418431
}
432+
419433
// sync the content when its snapshot is deleted. Explicitly sync'ing the
420434
// content here in response to snapshot deletion prevents the content from
421435
// waiting until the next sync period for its Release.

0 commit comments

Comments
 (0)