Skip to content

Commit 7bc7d91

Browse files
authored
Merge pull request #585 from nearora-msft/implement-distributed-snapshotting
Implement distributed snapshotting
2 parents 7cbfc45 + 21fc337 commit 7bc7d91

File tree

16 files changed

+870
-14
lines changed

16 files changed

+870
-14
lines changed

README.md

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,20 @@ Read more about how to install the example webhook [here](deploy/kubernetes/webh
109109

110110
* `--port`: Secure port that the webhook listens on (default 443)
111111

112+
### Distributed Snapshotting
113+
114+
The distributed snapshotting feature is provided to handle snapshot operations for local volumes. To use this functionality, the snapshotter sidecar should be deployed along with the csi driver on each node so that every node manages the snapshot operations only for the volumes local to that node. This feature can be enabled by setting the following command line options to true:
115+
116+
#### Snapshot controller option
117+
118+
* `--enable-distributed-snapshotting`: This option lets the snapshot controller know that distributed snapshotting is enabled and the snapshotter sidecar will be running on each node. Off by default.
119+
120+
#### CSI external snapshotter sidecar option
121+
122+
* `--node-deployment`: Enables the snapshotter sidecar to handle snapshot operations for the volumes local to the node on which it is deployed. Off by default.
123+
124+
Other than this, the NODE_NAME environment variable must be set where the CSI snapshotter sidecar is deployed. The value of NODE_NAME should be the name of the node where the sidecar is running.
125+
112126
### Snapshot controller command line options
113127

114128
#### Important optional arguments that are highly recommended to be used
@@ -134,7 +148,9 @@ Read more about how to install the example webhook [here](deploy/kubernetes/webh
134148

135149
* `--retry-interval-start`: Initial retry interval of failed volume snapshot creation or deletion. It doubles with each failure, up to retry-interval-max. Default value is 1 second.
136150

137-
*`--retry-interval-max`: Maximum retry interval of failed volume snapshot creation or deletion. Default value is 5 minutes.
151+
* `--retry-interval-max`: Maximum retry interval of failed volume snapshot creation or deletion. Default value is 5 minutes.
152+
153+
* `--enable-distributed-snapshotting` : Enables each node to handle snapshots for the volumes local to that node. Off by default. It should be set to true only if `--node-deployment` parameter for the csi external snapshotter sidecar is set to true. See https://github.com/kubernetes-csi/external-snapshotter/blob/master/README.md#distributed-snapshotting for details.
138154

139155
#### Other recognized arguments
140156
* `--kubeconfig <path>`: Path to Kubernetes client configuration that the snapshot controller uses to connect to Kubernetes API server. When omitted, default token provided by Kubernetes will be used. This option is useful only when the snapshot controller does not run as a Kubernetes pod, e.g. for debugging.
@@ -172,9 +188,11 @@ Read more about how to install the example webhook [here](deploy/kubernetes/webh
172188

173189
* `--worker-threads`: Number of worker threads for running create snapshot and delete snapshot operations. Default value is 10.
174190

191+
* `--node-deployment`: Enables deploying the sidecar controller together with a CSI driver on nodes to manage node-local volumes. Off by default. This should be set to true along with the `--enable-distributed-snapshotting` in the snapshot controller parameters to make use of distributed snapshotting. See https://github.com/kubernetes-csi/external-snapshotter/blob/master/README.md#distributed-snapshotting for details.
192+
175193
* `--retry-interval-start`: Initial retry interval of failed volume snapshot creation or deletion. It doubles with each failure, up to retry-interval-max. Default value is 1 second.
176194

177-
*`--retry-interval-max`: Maximum retry interval of failed volume snapshot creation or deletion. Default value is 5 minutes.
195+
* `--retry-interval-max`: Maximum retry interval of failed volume snapshot creation or deletion. Default value is 5 minutes.
178196
#### Other recognized arguments
179197
* `--kubeconfig <path>`: Path to Kubernetes client configuration that the CSI external-snapshotter uses to connect to Kubernetes API server. When omitted, default token provided by Kubernetes will be used. This option is useful only when the external-snapshotter does not run as a Kubernetes pod, e.g. for debugging.
180198

cmd/csi-snapshotter/main.go

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,12 @@ import (
2626
"strings"
2727
"time"
2828

29+
utils "github.com/kubernetes-csi/external-snapshotter/v4/pkg/utils"
30+
2931
"google.golang.org/grpc"
3032

33+
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
34+
"k8s.io/apimachinery/pkg/labels"
3135
"k8s.io/client-go/kubernetes"
3236
"k8s.io/client-go/kubernetes/scheme"
3337
"k8s.io/client-go/rest"
@@ -75,11 +79,12 @@ var (
7579
kubeAPIQPS = flag.Float64("kube-api-qps", 5, "QPS to use while communicating with the kubernetes apiserver. Defaults to 5.0.")
7680
kubeAPIBurst = flag.Int("kube-api-burst", 10, "Burst to use while communicating with the kubernetes apiserver. Defaults to 10.")
7781

78-
metricsAddress = flag.String("metrics-address", "", "(deprecated) The TCP network address where the prometheus metrics endpoint will listen (example: `:8080`). The default is empty string, which means metrics endpoint is disabled. Only one of `--metrics-address` and `--http-endpoint` can be set.")
79-
httpEndpoint = flag.String("http-endpoint", "", "The TCP network address where the HTTP server for diagnostics, including metrics and leader election health check, will listen (example: `:8080`). The default is empty string, which means the server is disabled. Only one of `--metrics-address` and `--http-endpoint` can be set.")
80-
metricsPath = flag.String("metrics-path", "/metrics", "The HTTP path where prometheus metrics will be exposed. Default is `/metrics`.")
81-
retryIntervalStart = flag.Duration("retry-interval-start", time.Second, "Initial retry interval of failed volume snapshot creation or deletion. It doubles with each failure, up to retry-interval-max. Default is 1 second.")
82-
retryIntervalMax = flag.Duration("retry-interval-max", 5*time.Minute, "Maximum retry interval of failed volume snapshot creation or deletion. Default is 5 minutes.")
82+
metricsAddress = flag.String("metrics-address", "", "(deprecated) The TCP network address where the prometheus metrics endpoint will listen (example: `:8080`). The default is empty string, which means metrics endpoint is disabled. Only one of `--metrics-address` and `--http-endpoint` can be set.")
83+
httpEndpoint = flag.String("http-endpoint", "", "The TCP network address where the HTTP server for diagnostics, including metrics and leader election health check, will listen (example: `:8080`). The default is empty string, which means the server is disabled. Only one of `--metrics-address` and `--http-endpoint` can be set.")
84+
metricsPath = flag.String("metrics-path", "/metrics", "The HTTP path where prometheus metrics will be exposed. Default is `/metrics`.")
85+
retryIntervalStart = flag.Duration("retry-interval-start", time.Second, "Initial retry interval of failed volume snapshot creation or deletion. It doubles with each failure, up to retry-interval-max. Default is 1 second.")
86+
retryIntervalMax = flag.Duration("retry-interval-max", 5*time.Minute, "Maximum retry interval of failed volume snapshot creation or deletion. Default is 5 minutes.")
87+
enableNodeDeployment = flag.Bool("node-deployment", false, "Enables deploying the sidecar controller together with a CSI driver on nodes to manage snapshots for node-local volumes.")
8388
)
8489

8590
var (
@@ -98,6 +103,12 @@ func main() {
98103
}
99104
klog.Infof("Version: %s", version)
100105

106+
// If distributed snapshotting is enabled and leaderElection is also set to true, return
107+
if *enableNodeDeployment && *leaderElection {
108+
klog.Error("Leader election cannot happen when node-deployment is set to true")
109+
os.Exit(1)
110+
}
111+
101112
// Create the client config. Use kubeconfig if given, otherwise assume in-cluster.
102113
config, err := buildConfig(*kubeconfig)
103114
if err != nil {
@@ -122,6 +133,19 @@ func main() {
122133

123134
factory := informers.NewSharedInformerFactory(snapClient, *resyncPeriod)
124135
coreFactory := coreinformers.NewSharedInformerFactory(kubeClient, *resyncPeriod)
136+
var snapshotContentfactory informers.SharedInformerFactory
137+
if *enableNodeDeployment {
138+
node := os.Getenv("NODE_NAME")
139+
if node == "" {
140+
klog.Fatal("The NODE_NAME environment variable must be set when using --enable-node-deployment.")
141+
}
142+
snapshotContentfactory = informers.NewSharedInformerFactoryWithOptions(snapClient, *resyncPeriod, informers.WithTweakListOptions(func(lo *v1.ListOptions) {
143+
lo.LabelSelector = labels.Set{utils.VolumeSnapshotContentManagedByLabel: node}.AsSelector().String()
144+
}),
145+
)
146+
} else {
147+
snapshotContentfactory = factory
148+
}
125149

126150
// Add Snapshot types to the default Kubernetes so events can be logged for them
127151
snapshotscheme.AddToScheme(scheme.Scheme)
@@ -202,7 +226,7 @@ func main() {
202226
snapClient,
203227
kubeClient,
204228
driverName,
205-
factory.Snapshot().V1().VolumeSnapshotContents(),
229+
snapshotContentfactory.Snapshot().V1().VolumeSnapshotContents(),
206230
factory.Snapshot().V1().VolumeSnapshotClasses(),
207231
snapShotter,
208232
*csiTimeout,
@@ -216,6 +240,7 @@ func main() {
216240
run := func(context.Context) {
217241
// run...
218242
stopCh := make(chan struct{})
243+
snapshotContentfactory.Start(stopCh)
219244
factory.Start(stopCh)
220245
coreFactory.Start(stopCh)
221246
go ctrl.Run(*threads, stopCh)

cmd/snapshot-controller/main.go

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"sync"
2828
"time"
2929

30+
v1 "k8s.io/client-go/informers/core/v1"
3031
"k8s.io/client-go/kubernetes"
3132
"k8s.io/client-go/kubernetes/scheme"
3233
"k8s.io/client-go/rest"
@@ -64,10 +65,11 @@ var (
6465
kubeAPIQPS = flag.Float64("kube-api-qps", 5, "QPS to use while communicating with the kubernetes apiserver. Defaults to 5.0.")
6566
kubeAPIBurst = flag.Int("kube-api-burst", 10, "Burst to use while communicating with the kubernetes apiserver. Defaults to 10.")
6667

67-
httpEndpoint = flag.String("http-endpoint", "", "The TCP network address where the HTTP server for diagnostics, including metrics, will listen (example: :8080). The default is empty string, which means the server is disabled.")
68-
metricsPath = flag.String("metrics-path", "/metrics", "The HTTP path where prometheus metrics will be exposed. Default is `/metrics`.")
69-
retryIntervalStart = flag.Duration("retry-interval-start", time.Second, "Initial retry interval of failed volume snapshot creation or deletion. It doubles with each failure, up to retry-interval-max. Default is 1 second.")
70-
retryIntervalMax = flag.Duration("retry-interval-max", 5*time.Minute, "Maximum retry interval of failed volume snapshot creation or deletion. Default is 5 minutes.")
68+
httpEndpoint = flag.String("http-endpoint", "", "The TCP network address where the HTTP server for diagnostics, including metrics, will listen (example: :8080). The default is empty string, which means the server is disabled.")
69+
metricsPath = flag.String("metrics-path", "/metrics", "The HTTP path where prometheus metrics will be exposed. Default is `/metrics`.")
70+
retryIntervalStart = flag.Duration("retry-interval-start", time.Second, "Initial retry interval of failed volume snapshot creation or deletion. It doubles with each failure, up to retry-interval-max. Default is 1 second.")
71+
retryIntervalMax = flag.Duration("retry-interval-max", 5*time.Minute, "Maximum retry interval of failed volume snapshot creation or deletion. Default is 5 minutes.")
72+
enableDistributedSnapshotting = flag.Bool("enable-distributed-snapshotting", false, "Enables each node to handle snapshotting for the local volumes created on that node")
7173
)
7274

7375
var (
@@ -147,6 +149,11 @@ func main() {
147149

148150
factory := informers.NewSharedInformerFactory(snapClient, *resyncPeriod)
149151
coreFactory := coreinformers.NewSharedInformerFactory(kubeClient, *resyncPeriod)
152+
var nodeInformer v1.NodeInformer
153+
154+
if *enableDistributedSnapshotting {
155+
nodeInformer = coreFactory.Core().V1().Nodes()
156+
}
150157

151158
// Create and register metrics manager
152159
metricsManager := metrics.NewMetricsManager()
@@ -174,10 +181,12 @@ func main() {
174181
factory.Snapshot().V1().VolumeSnapshotContents(),
175182
factory.Snapshot().V1().VolumeSnapshotClasses(),
176183
coreFactory.Core().V1().PersistentVolumeClaims(),
184+
nodeInformer,
177185
metricsManager,
178186
*resyncPeriod,
179187
workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax),
180188
workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax),
189+
*enableDistributedSnapshotting,
181190
)
182191

183192
if err := ensureCustomResourceDefinitionsExist(snapClient); err != nil {

deploy/kubernetes/snapshot-controller/rbac-snapshot-controller.yaml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,10 @@ rules:
4444
- apiGroups: ["snapshot.storage.k8s.io"]
4545
resources: ["volumesnapshots/status"]
4646
verbs: ["update", "patch"]
47-
47+
# Enable this RBAC rule only when using distributed snapshotting, i.e. when the enable-distributed-snapshotting flag is set to true
48+
# - apiGroups: [""]
49+
# resources: ["nodes"]
50+
# verbs: ["get", "list", "watch"]
4851
---
4952
kind: ClusterRoleBinding
5053
apiVersion: rbac.authorization.k8s.io/v1

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ require (
2424
k8s.io/apimachinery v0.23.0
2525
k8s.io/client-go v0.23.0
2626
k8s.io/component-base v0.23.0
27+
k8s.io/component-helpers v0.23.0
2728
k8s.io/klog/v2 v2.30.0
2829
k8s.io/kubernetes v1.23.0
2930
)

go.sum

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1100,6 +1100,7 @@ k8s.io/cluster-bootstrap v0.23.0/go.mod h1:VltEnKWfrRTiKgOXp3ts3vh7yqNlH6KFKFflo
11001100
k8s.io/code-generator v0.23.0/go.mod h1:vQvOhDXhuzqiVfM/YHp+dmg10WDZCchJVObc9MvowsE=
11011101
k8s.io/component-base v0.23.0 h1:UAnyzjvVZ2ZR1lF35YwtNY6VMN94WtOnArcXBu34es8=
11021102
k8s.io/component-base v0.23.0/go.mod h1:DHH5uiFvLC1edCpvcTDV++NKULdYYU6pR9Tt3HIKMKI=
1103+
k8s.io/component-helpers v0.23.0 h1:qNbqN10QTefiWcCOPkHL/0nn81sdKVv6ZgEXcSyot/U=
11031104
k8s.io/component-helpers v0.23.0/go.mod h1:liXMh6FZS4qamKtMJQ7uLHnFe3tlC86RX5mJEk/aerg=
11041105
k8s.io/controller-manager v0.23.0/go.mod h1:6/IKItSv6p9FY3mSbHgsOYmt4y+HDxiC5hEFg9rJVc8=
11051106
k8s.io/cri-api v0.23.0/go.mod h1:2edENu3/mkyW3c6fVPPPaVGEFbLRacJizBbSp7ZOLOo=

pkg/common-controller/framework_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -838,10 +838,12 @@ func newTestController(kubeClient kubernetes.Interface, clientset clientset.Inte
838838
informerFactory.Snapshot().V1().VolumeSnapshotContents(),
839839
informerFactory.Snapshot().V1().VolumeSnapshotClasses(),
840840
coreFactory.Core().V1().PersistentVolumeClaims(),
841+
nil,
841842
metricsManager,
842843
60*time.Second,
843844
workqueue.NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1*time.Minute),
844845
workqueue.NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1*time.Minute),
846+
false,
845847
)
846848

847849
ctrl.eventRecorder = record.NewFakeRecorder(1000)

pkg/common-controller/snapshot_controller.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"k8s.io/apimachinery/pkg/labels"
3030
"k8s.io/client-go/kubernetes/scheme"
3131
ref "k8s.io/client-go/tools/reference"
32+
corev1helpers "k8s.io/component-helpers/scheduling/corev1"
3233
klog "k8s.io/klog/v2"
3334

3435
crdv1 "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1"
@@ -671,6 +672,18 @@ func (ctrl *csiSnapshotCommonController) createSnapshotContent(snapshot *crdv1.V
671672
},
672673
}
673674

675+
if ctrl.enableDistributedSnapshotting {
676+
nodeName, err := ctrl.getManagedByNode(volume)
677+
if err != nil {
678+
return nil, err
679+
}
680+
if nodeName != "" {
681+
snapshotContent.Labels = map[string]string{
682+
utils.VolumeSnapshotContentManagedByLabel: nodeName,
683+
}
684+
}
685+
}
686+
674687
// Set AnnDeletionSecretRefName and AnnDeletionSecretRefNamespace
675688
if snapshotterSecretRef != nil {
676689
klog.V(5).Infof("createSnapshotContent: set annotation [%s] on content [%s].", utils.AnnDeletionSecretRefName, snapshotContent.Name)
@@ -1655,3 +1668,27 @@ func (ctrl *csiSnapshotCommonController) checkAndSetInvalidSnapshotLabel(snapsho
16551668

16561669
return updatedSnapshot, nil
16571670
}
1671+
1672+
func (ctrl *csiSnapshotCommonController) getManagedByNode(pv *v1.PersistentVolume) (string, error) {
1673+
if pv.Spec.NodeAffinity == nil {
1674+
klog.V(5).Infof("NodeAffinity not set for pv %s", pv.Name)
1675+
return "", nil
1676+
}
1677+
nodeSelectorTerms := pv.Spec.NodeAffinity.Required
1678+
1679+
nodes, err := ctrl.nodeLister.List(labels.Everything())
1680+
if err != nil {
1681+
klog.Errorf("failed to get the list of nodes: %q", err)
1682+
return "", err
1683+
}
1684+
1685+
for _, node := range nodes {
1686+
match, _ := corev1helpers.MatchNodeSelectorTerms(node, nodeSelectorTerms)
1687+
if match {
1688+
return node.Name, nil
1689+
}
1690+
}
1691+
1692+
klog.Errorf("failed to find nodes that match the node affinity requirements for pv[%s]", pv.Name)
1693+
return "", nil
1694+
}

pkg/common-controller/snapshot_controller_base.go

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,13 +57,17 @@ type csiSnapshotCommonController struct {
5757
classListerSynced cache.InformerSynced
5858
pvcLister corelisters.PersistentVolumeClaimLister
5959
pvcListerSynced cache.InformerSynced
60+
nodeLister corelisters.NodeLister
61+
nodeListerSynced cache.InformerSynced
6062

6163
snapshotStore cache.Store
6264
contentStore cache.Store
6365

6466
metricsManager metrics.MetricsManager
6567

6668
resyncPeriod time.Duration
69+
70+
enableDistributedSnapshotting bool
6771
}
6872

6973
// NewCSISnapshotController returns a new *csiSnapshotCommonController
@@ -74,10 +78,12 @@ func NewCSISnapshotCommonController(
7478
volumeSnapshotContentInformer storageinformers.VolumeSnapshotContentInformer,
7579
volumeSnapshotClassInformer storageinformers.VolumeSnapshotClassInformer,
7680
pvcInformer coreinformers.PersistentVolumeClaimInformer,
81+
nodeInformer coreinformers.NodeInformer,
7782
metricsManager metrics.MetricsManager,
7883
resyncPeriod time.Duration,
7984
snapshotRateLimiter workqueue.RateLimiter,
8085
contentRateLimiter workqueue.RateLimiter,
86+
enableDistributedSnapshotting bool,
8187
) *csiSnapshotCommonController {
8288
broadcaster := record.NewBroadcaster()
8389
broadcaster.StartLogging(klog.Infof)
@@ -125,6 +131,13 @@ func NewCSISnapshotCommonController(
125131
ctrl.classLister = volumeSnapshotClassInformer.Lister()
126132
ctrl.classListerSynced = volumeSnapshotClassInformer.Informer().HasSynced
127133

134+
ctrl.enableDistributedSnapshotting = enableDistributedSnapshotting
135+
136+
if enableDistributedSnapshotting {
137+
ctrl.nodeLister = nodeInformer.Lister()
138+
ctrl.nodeListerSynced = nodeInformer.Informer().HasSynced
139+
}
140+
128141
return ctrl
129142
}
130143

@@ -135,7 +148,12 @@ func (ctrl *csiSnapshotCommonController) Run(workers int, stopCh <-chan struct{}
135148
klog.Infof("Starting snapshot controller")
136149
defer klog.Infof("Shutting snapshot controller")
137150

138-
if !cache.WaitForCacheSync(stopCh, ctrl.snapshotListerSynced, ctrl.contentListerSynced, ctrl.classListerSynced, ctrl.pvcListerSynced) {
151+
informersSynced := []cache.InformerSynced{ctrl.snapshotListerSynced, ctrl.contentListerSynced, ctrl.classListerSynced, ctrl.pvcListerSynced}
152+
if ctrl.enableDistributedSnapshotting {
153+
informersSynced = append(informersSynced, ctrl.nodeListerSynced)
154+
}
155+
156+
if !cache.WaitForCacheSync(stopCh, informersSynced...) {
139157
klog.Errorf("Cannot sync caches")
140158
return
141159
}

0 commit comments

Comments
 (0)