Skip to content

Commit c6a03fa

Browse files
committed
Parallelize attach operations across different nodes for volumes that allow multi-attach
1 parent 37957e2 commit c6a03fa

File tree

9 files changed

+684
-199
lines changed

9 files changed

+684
-199
lines changed

pkg/controller/volume/attachdetach/reconciler/BUILD

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ go_library(
1616
"//pkg/controller/volume/attachdetach/statusupdater:go_default_library",
1717
"//pkg/kubelet/events:go_default_library",
1818
"//pkg/util/goroutinemap/exponentialbackoff:go_default_library",
19-
"//pkg/volume:go_default_library",
19+
"//pkg/volume/util:go_default_library",
2020
"//pkg/volume/util/operationexecutor:go_default_library",
2121
"//staging/src/k8s.io/api/core/v1:go_default_library",
2222
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",

pkg/controller/volume/attachdetach/reconciler/reconciler.go

Lines changed: 32 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import (
2424
"strings"
2525
"time"
2626

27-
"k8s.io/api/core/v1"
27+
v1 "k8s.io/api/core/v1"
2828
"k8s.io/apimachinery/pkg/types"
2929
"k8s.io/apimachinery/pkg/util/wait"
3030
"k8s.io/client-go/tools/record"
@@ -34,7 +34,7 @@ import (
3434
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/statusupdater"
3535
kevents "k8s.io/kubernetes/pkg/kubelet/events"
3636
"k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff"
37-
"k8s.io/kubernetes/pkg/volume"
37+
"k8s.io/kubernetes/pkg/volume/util"
3838
"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
3939
)
4040

@@ -134,42 +134,6 @@ func (rc *reconciler) syncStates() {
134134
rc.attacherDetacher.VerifyVolumesAreAttached(volumesPerNode, rc.actualStateOfWorld)
135135
}
136136

137-
// isMultiAttachForbidden checks if attaching this volume to multiple nodes is definitely not allowed/possible.
138-
// In its current form, this function can only reliably say for which volumes it's definitely forbidden. If it returns
139-
// false, it is not guaranteed that multi-attach is actually supported by the volume type and we must rely on the
140-
// attacher to fail fast in such cases.
141-
// Please see https://github.com/kubernetes/kubernetes/issues/40669 and https://github.com/kubernetes/kubernetes/pull/40148#discussion_r98055047
142-
func (rc *reconciler) isMultiAttachForbidden(volumeSpec *volume.Spec) bool {
143-
if volumeSpec.Volume != nil {
144-
// Check for volume types which are known to fail slow or cause trouble when trying to multi-attach
145-
if volumeSpec.Volume.AzureDisk != nil ||
146-
volumeSpec.Volume.Cinder != nil {
147-
return true
148-
}
149-
}
150-
151-
// Only if this volume is a persistent volume, we have reliable information on whether it's allowed or not to
152-
// multi-attach. We trust in the individual volume implementations to not allow unsupported access modes
153-
if volumeSpec.PersistentVolume != nil {
154-
// Check for persistent volume types which do not fail when trying to multi-attach
155-
if len(volumeSpec.PersistentVolume.Spec.AccessModes) == 0 {
156-
// No access mode specified so we don't know for sure. Let the attacher fail if needed
157-
return false
158-
}
159-
160-
// check if this volume is allowed to be attached to multiple PODs/nodes, if yes, return false
161-
for _, accessMode := range volumeSpec.PersistentVolume.Spec.AccessModes {
162-
if accessMode == v1.ReadWriteMany || accessMode == v1.ReadOnlyMany {
163-
return false
164-
}
165-
}
166-
return true
167-
}
168-
169-
// we don't know if it's supported or not and let the attacher fail later in cases it's not supported
170-
return false
171-
}
172-
173137
func (rc *reconciler) reconcile() {
174138
// Detaches are triggered before attaches so that volumes referenced by
175139
// pods that are rescheduled to a different node are detached first.
@@ -182,9 +146,16 @@ func (rc *reconciler) reconcile() {
182146
// This check must be done before we do any other checks, as otherwise the other checks
183147
// may pass while at the same time the volume leaves the pending state, resulting in
184148
// double detach attempts
185-
if rc.attacherDetacher.IsOperationPending(attachedVolume.VolumeName, "") {
186-
klog.V(10).Infof("Operation for volume %q is already running. Can't start detach for %q", attachedVolume.VolumeName, attachedVolume.NodeName)
187-
continue
149+
if util.IsMultiAttachForbidden(attachedVolume.VolumeSpec) {
150+
if rc.attacherDetacher.IsOperationPending(attachedVolume.VolumeName, "" /* podName */, "" /* nodeName */) {
151+
klog.V(10).Infof("Operation for volume %q is already running in the cluster. Can't start detach for %q", attachedVolume.VolumeName, attachedVolume.NodeName)
152+
continue
153+
}
154+
} else {
155+
if rc.attacherDetacher.IsOperationPending(attachedVolume.VolumeName, "" /* podName */, attachedVolume.NodeName) {
156+
klog.V(10).Infof("Operation for volume %q is already running for node %q. Can't start detach", attachedVolume.VolumeName, attachedVolume.NodeName)
157+
continue
158+
}
188159
}
189160

190161
// Set the detach request time
@@ -260,15 +231,17 @@ func (rc *reconciler) attachDesiredVolumes() {
260231
rc.actualStateOfWorld.ResetDetachRequestTime(volumeToAttach.VolumeName, volumeToAttach.NodeName)
261232
continue
262233
}
263-
// Don't even try to start an operation if there is already one running
264-
if rc.attacherDetacher.IsOperationPending(volumeToAttach.VolumeName, "") {
265-
if klog.V(10) {
266-
klog.Infof("Operation for volume %q is already running. Can't start attach for %q", volumeToAttach.VolumeName, volumeToAttach.NodeName)
234+
235+
if util.IsMultiAttachForbidden(volumeToAttach.VolumeSpec) {
236+
237+
// Don't even try to start an operation if there is already one running for the given volume
238+
if rc.attacherDetacher.IsOperationPending(volumeToAttach.VolumeName, "" /* podName */, "" /* nodeName */) {
239+
if klog.V(10) {
240+
klog.Infof("Operation for volume %q is already running. Can't start attach for %q", volumeToAttach.VolumeName, volumeToAttach.NodeName)
241+
}
242+
continue
267243
}
268-
continue
269-
}
270244

271-
if rc.isMultiAttachForbidden(volumeToAttach.VolumeSpec) {
272245
nodes := rc.actualStateOfWorld.GetNodesForAttachedVolume(volumeToAttach.VolumeName)
273246
if len(nodes) > 0 {
274247
if !volumeToAttach.MultiAttachErrorReported {
@@ -277,6 +250,17 @@ func (rc *reconciler) attachDesiredVolumes() {
277250
}
278251
continue
279252
}
253+
254+
} else {
255+
256+
// Don't even try to start an operation if there is already one running for the given volume and node.
257+
if rc.attacherDetacher.IsOperationPending(volumeToAttach.VolumeName, "" /* podName */, volumeToAttach.NodeName) {
258+
if klog.V(10) {
259+
klog.Infof("Operation for volume %q is already running for node %q. Can't start attach", volumeToAttach.VolumeName, volumeToAttach.NodeName)
260+
}
261+
continue
262+
}
263+
280264
}
281265

282266
// Volume/Node doesn't exist, spawn a goroutine to attach it

pkg/kubelet/volumemanager/reconciler/reconciler.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -294,7 +294,7 @@ func (rc *reconciler) unmountDetachDevices() {
294294
for _, attachedVolume := range rc.actualStateOfWorld.GetUnmountedVolumes() {
295295
// Check IsOperationPending to avoid marking a volume as detached if it's in the process of mounting.
296296
if !rc.desiredStateOfWorld.VolumeExists(attachedVolume.VolumeName) &&
297-
!rc.operationExecutor.IsOperationPending(attachedVolume.VolumeName, nestedpendingoperations.EmptyUniquePodName) {
297+
!rc.operationExecutor.IsOperationPending(attachedVolume.VolumeName, nestedpendingoperations.EmptyUniquePodName, nestedpendingoperations.EmptyNodeName) {
298298
if attachedVolume.DeviceMayBeMounted() {
299299
// Volume is globally mounted to device, unmount it
300300
klog.V(5).Infof(attachedVolume.GenerateMsgDetailed("Starting operationExecutor.UnmountDevice", ""))
@@ -422,7 +422,7 @@ func (rc *reconciler) syncStates() {
422422
continue
423423
}
424424
// There is no pod that uses the volume.
425-
if rc.operationExecutor.IsOperationPending(reconstructedVolume.volumeName, nestedpendingoperations.EmptyUniquePodName) {
425+
if rc.operationExecutor.IsOperationPending(reconstructedVolume.volumeName, nestedpendingoperations.EmptyUniquePodName, nestedpendingoperations.EmptyNodeName) {
426426
klog.Warning("Volume is in pending operation, skip cleaning up mounts")
427427
}
428428
klog.V(2).Infof(

pkg/volume/util/nestedpendingoperations/BUILD

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ go_library(
1414
"//pkg/util/goroutinemap/exponentialbackoff:go_default_library",
1515
"//pkg/volume/util/types:go_default_library",
1616
"//staging/src/k8s.io/api/core/v1:go_default_library",
17+
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
1718
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
1819
"//vendor/k8s.io/klog:go_default_library",
1920
],
@@ -27,6 +28,7 @@ go_test(
2728
"//pkg/util/goroutinemap/exponentialbackoff:go_default_library",
2829
"//pkg/volume/util/types:go_default_library",
2930
"//staging/src/k8s.io/api/core/v1:go_default_library",
31+
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
3032
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
3133
],
3234
)

pkg/volume/util/nestedpendingoperations/nestedpendingoperations.go

Lines changed: 62 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -29,45 +29,73 @@ import (
2929
"sync"
3030

3131
"k8s.io/api/core/v1"
32+
"k8s.io/apimachinery/pkg/types"
3233
k8sRuntime "k8s.io/apimachinery/pkg/util/runtime"
3334
"k8s.io/klog"
3435
"k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff"
35-
"k8s.io/kubernetes/pkg/volume/util/types"
36+
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
3637
)
3738

3839
const (
3940
// EmptyUniquePodName is a UniquePodName for empty string.
40-
EmptyUniquePodName types.UniquePodName = types.UniquePodName("")
41+
EmptyUniquePodName volumetypes.UniquePodName = volumetypes.UniquePodName("")
4142

4243
// EmptyUniqueVolumeName is a UniqueVolumeName for empty string
4344
EmptyUniqueVolumeName v1.UniqueVolumeName = v1.UniqueVolumeName("")
45+
46+
// EmptyNodeName is a NodeName for empty string
47+
EmptyNodeName types.NodeName = types.NodeName("")
4448
)
4549

4650
// NestedPendingOperations defines the supported set of operations.
4751
type NestedPendingOperations interface {
48-
// Run adds the concatenation of volumeName and podName to the list of
49-
// running operations and spawns a new go routine to execute operationFunc.
50-
// If an operation with the same volumeName, same or empty podName
51-
// and same operationName exits, an AlreadyExists or ExponentialBackoff
52-
// error is returned. If an operation with same volumeName and podName
53-
// has ExponentialBackoff error but operationName is different, exponential
54-
// backoff is reset and operation is allowed to proceed.
55-
// This enables multiple operations to execute in parallel for the same
56-
// volumeName as long as they have different podName.
52+
53+
// Run adds the concatenation of volumeName and one of podName or nodeName to
54+
// the list of running operations and spawns a new go routine to execute
55+
// OperationFunc inside generatedOperations.
56+
57+
// volumeName, podName, and nodeName collectively form the operation key.
58+
// The following forms of operation keys are supported:
59+
// - volumeName empty, podName empty, nodeName empty
60+
// This key does not have any conflicting keys.
61+
// - volumeName exists, podName empty, nodeName empty
62+
// This key conflicts with all other keys with the same volumeName.
63+
// - volumeName exists, podName exists, nodeName empty
64+
// This key conflicts with:
65+
// - the same volumeName and podName
66+
// - the same volumeName, but no podName
67+
// - volumeName exists, podName empty, nodeName exists
68+
// This key conflicts with:
69+
// - the same volumeName and nodeName
70+
// - the same volumeName but no nodeName
71+
72+
// If an operation with the same operationName and a conflicting key exists,
73+
// an AlreadyExists or ExponentialBackoff error is returned.
74+
// If an operation with a conflicting key has ExponentialBackoff error but
75+
// operationName is different, exponential backoff is reset and operation is
76+
// allowed to proceed.
77+
5778
// Once the operation is complete, the go routine is terminated and the
58-
// concatenation of volumeName and podName is removed from the list of
59-
// executing operations allowing a new operation to be started with the
60-
// volumeName without error.
61-
Run(volumeName v1.UniqueVolumeName, podName types.UniquePodName, generatedOperations types.GeneratedOperations) error
79+
// concatenation of volumeName and (podName or nodeName) is removed from the
80+
// list of executing operations allowing a new operation to be started with
81+
// the volumeName without error.
82+
Run(
83+
volumeName v1.UniqueVolumeName,
84+
podName volumetypes.UniquePodName,
85+
nodeName types.NodeName,
86+
generatedOperations volumetypes.GeneratedOperations) error
6287

6388
// Wait blocks until all operations are completed. This is typically
6489
// necessary during tests - the test should wait until all operations finish
6590
// and evaluate results after that.
6691
Wait()
6792

68-
// IsOperationPending returns true if an operation for the given volumeName and podName is pending,
69-
// otherwise it returns false
70-
IsOperationPending(volumeName v1.UniqueVolumeName, podName types.UniquePodName) bool
93+
// IsOperationPending returns true if an operation for the given volumeName
94+
// and one of podName or nodeName is pending, otherwise it returns false
95+
IsOperationPending(
96+
volumeName v1.UniqueVolumeName,
97+
podName volumetypes.UniquePodName,
98+
nodeName types.NodeName) bool
7199
}
72100

73101
// NewNestedPendingOperations returns a new instance of NestedPendingOperations.
@@ -96,12 +124,13 @@ type operation struct {
96124

97125
func (grm *nestedPendingOperations) Run(
98126
volumeName v1.UniqueVolumeName,
99-
podName types.UniquePodName,
100-
generatedOperations types.GeneratedOperations) error {
127+
podName volumetypes.UniquePodName,
128+
nodeName types.NodeName,
129+
generatedOperations volumetypes.GeneratedOperations) error {
101130
grm.lock.Lock()
102131
defer grm.lock.Unlock()
103132

104-
opKey := operationKey{volumeName, podName}
133+
opKey := operationKey{volumeName, podName, nodeName}
105134

106135
opExists, previousOpIndex := grm.isOperationExists(opKey)
107136
if opExists {
@@ -149,12 +178,13 @@ func (grm *nestedPendingOperations) Run(
149178

150179
func (grm *nestedPendingOperations) IsOperationPending(
151180
volumeName v1.UniqueVolumeName,
152-
podName types.UniquePodName) bool {
181+
podName volumetypes.UniquePodName,
182+
nodeName types.NodeName) bool {
153183

154184
grm.lock.RLock()
155185
defer grm.lock.RUnlock()
156186

157-
opKey := operationKey{volumeName, podName}
187+
opKey := operationKey{volumeName, podName, nodeName}
158188
exist, previousOpIndex := grm.isOperationExists(opKey)
159189
if exist && grm.operations[previousOpIndex].operationPending {
160190
return true
@@ -177,8 +207,11 @@ func (grm *nestedPendingOperations) isOperationExists(key operationKey) (bool, i
177207
key.podName == EmptyUniquePodName ||
178208
previousOp.key.podName == key.podName
179209

210+
nodeNameMatch := previousOp.key.nodeName == EmptyNodeName ||
211+
key.nodeName == EmptyNodeName ||
212+
previousOp.key.nodeName == key.nodeName
180213

181-
if volumeNameMatch && podNameMatch {
214+
if volumeNameMatch && podNameMatch && nodeNameMatch {
182215
return true, previousOpIndex
183216
}
184217
}
@@ -264,15 +297,15 @@ func (grm *nestedPendingOperations) Wait() {
264297

265298
type operationKey struct {
266299
volumeName v1.UniqueVolumeName
267-
podName types.UniquePodName
300+
podName volumetypes.UniquePodName
301+
nodeName types.NodeName
268302
}
269303

270304
func (key operationKey) String() string {
271-
podNameStr := fmt.Sprintf(" (%q)", key.podName)
272-
273-
return fmt.Sprintf("%q%s",
305+
return fmt.Sprintf("{volumeName=%q, podName=%q, nodeName=%q}",
274306
key.volumeName,
275-
podNameStr)
307+
key.podName,
308+
key.nodeName)
276309
}
277310

278311
// NewAlreadyExistsError returns a new instance of AlreadyExists error.

0 commit comments

Comments
 (0)