Skip to content

Commit 37957e2

Browse files
committed
Refactor operation keys for NestedPendingOperations
1 parent 3fca0a6 commit 37957e2

File tree

1 file changed

+47
-64
lines changed

1 file changed

+47
-64
lines changed

pkg/volume/util/nestedpendingoperations/nestedpendingoperations.go

Lines changed: 47 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -88,8 +88,7 @@ type nestedPendingOperations struct {
8888
}
8989

9090
type operation struct {
91-
volumeName v1.UniqueVolumeName
92-
podName types.UniquePodName
91+
key operationKey
9392
operationName string
9493
operationPending bool
9594
expBackoff exponentialbackoff.ExponentialBackoff
@@ -101,18 +100,19 @@ func (grm *nestedPendingOperations) Run(
101100
generatedOperations types.GeneratedOperations) error {
102101
grm.lock.Lock()
103102
defer grm.lock.Unlock()
104-
opExists, previousOpIndex := grm.isOperationExists(volumeName, podName)
103+
104+
opKey := operationKey{volumeName, podName}
105+
106+
opExists, previousOpIndex := grm.isOperationExists(opKey)
105107
if opExists {
106108
previousOp := grm.operations[previousOpIndex]
107109
// Operation already exists
108110
if previousOp.operationPending {
109111
// Operation is pending
110-
operationKey := getOperationKey(volumeName, podName)
111-
return NewAlreadyExistsError(operationKey)
112+
return NewAlreadyExistsError(opKey)
112113
}
113114

114-
operationKey := getOperationKey(volumeName, podName)
115-
backOffErr := previousOp.expBackoff.SafeToRetry(operationKey)
115+
backOffErr := previousOp.expBackoff.SafeToRetry(opKey.String())
116116
if backOffErr != nil {
117117
if previousOp.operationName == generatedOperations.OperationName {
118118
return backOffErr
@@ -124,15 +124,13 @@ func (grm *nestedPendingOperations) Run(
124124

125125
// Update existing operation to mark as pending.
126126
grm.operations[previousOpIndex].operationPending = true
127-
grm.operations[previousOpIndex].volumeName = volumeName
128-
grm.operations[previousOpIndex].podName = podName
127+
grm.operations[previousOpIndex].key = opKey
129128
} else {
130129
// Create a new operation
131130
grm.operations = append(grm.operations,
132131
operation{
132+
key: opKey,
133133
operationPending: true,
134-
volumeName: volumeName,
135-
podName: podName,
136134
operationName: generatedOperations.OperationName,
137135
expBackoff: exponentialbackoff.ExponentialBackoff{},
138136
})
@@ -142,7 +140,7 @@ func (grm *nestedPendingOperations) Run(
142140
// Handle unhandled panics (very unlikely)
143141
defer k8sRuntime.HandleCrash()
144142
// Handle completion of and error, if any, from operationFunc()
145-
defer grm.operationComplete(volumeName, podName, &detailedErr)
143+
defer grm.operationComplete(opKey, &detailedErr)
146144
return generatedOperations.Run()
147145
}()
148146

@@ -156,67 +154,58 @@ func (grm *nestedPendingOperations) IsOperationPending(
156154
grm.lock.RLock()
157155
defer grm.lock.RUnlock()
158156

159-
exist, previousOpIndex := grm.isOperationExists(volumeName, podName)
157+
opKey := operationKey{volumeName, podName}
158+
exist, previousOpIndex := grm.isOperationExists(opKey)
160159
if exist && grm.operations[previousOpIndex].operationPending {
161160
return true
162161
}
163162
return false
164163
}
165164

166165
// This is an internal function and caller should acquire and release the lock
167-
func (grm *nestedPendingOperations) isOperationExists(
168-
volumeName v1.UniqueVolumeName,
169-
podName types.UniquePodName) (bool, int) {
166+
func (grm *nestedPendingOperations) isOperationExists(key operationKey) (bool, int) {
170167

171168
// If volumeName is empty, operation can be executed concurrently
172-
if volumeName == EmptyUniqueVolumeName {
169+
if key.volumeName == EmptyUniqueVolumeName {
173170
return false, -1
174171
}
175172

176173
for previousOpIndex, previousOp := range grm.operations {
177-
if previousOp.volumeName != volumeName {
178-
// No match, keep searching
179-
continue
180-
}
174+
volumeNameMatch := previousOp.key.volumeName == key.volumeName
175+
176+
podNameMatch := previousOp.key.podName == EmptyUniquePodName ||
177+
key.podName == EmptyUniquePodName ||
178+
previousOp.key.podName == key.podName
181179

182-
if previousOp.podName != EmptyUniquePodName &&
183-
podName != EmptyUniquePodName &&
184-
previousOp.podName != podName {
185-
// No match, keep searching
186-
continue
187-
}
188180

189-
// Match
190-
return true, previousOpIndex
181+
if volumeNameMatch && podNameMatch {
182+
return true, previousOpIndex
183+
}
191184
}
185+
192186
return false, -1
193187
}
194188

195-
func (grm *nestedPendingOperations) getOperation(
196-
volumeName v1.UniqueVolumeName,
197-
podName types.UniquePodName) (uint, error) {
189+
func (grm *nestedPendingOperations) getOperation(key operationKey) (uint, error) {
198190
// Assumes lock has been acquired by caller.
199191

200192
for i, op := range grm.operations {
201-
if op.volumeName == volumeName &&
202-
op.podName == podName {
193+
if op.key.volumeName == key.volumeName &&
194+
op.key.podName == key.podName {
203195
return uint(i), nil
204196
}
205197
}
206198

207-
logOperationKey := getOperationKey(volumeName, podName)
208-
return 0, fmt.Errorf("Operation %q not found", logOperationKey)
199+
return 0, fmt.Errorf("Operation %q not found", key)
209200
}
210201

211-
func (grm *nestedPendingOperations) deleteOperation(
202+
func (grm *nestedPendingOperations) deleteOperation(key operationKey) {
212203
// Assumes lock has been acquired by caller.
213-
volumeName v1.UniqueVolumeName,
214-
podName types.UniquePodName) {
215204

216205
opIndex := -1
217206
for i, op := range grm.operations {
218-
if op.volumeName == volumeName &&
219-
op.podName == podName {
207+
if op.key.volumeName == key.volumeName &&
208+
op.key.podName == key.podName {
220209
opIndex = i
221210
break
222211
}
@@ -227,8 +216,7 @@ func (grm *nestedPendingOperations) deleteOperation(
227216
grm.operations = grm.operations[:len(grm.operations)-1]
228217
}
229218

230-
func (grm *nestedPendingOperations) operationComplete(
231-
volumeName v1.UniqueVolumeName, podName types.UniquePodName, err *error) {
219+
func (grm *nestedPendingOperations) operationComplete(key operationKey, err *error) {
232220
// Defer operations are executed in Last-In is First-Out order. In this case
233221
// the lock is acquired first when operationCompletes begins, and is
234222
// released when the method finishes, after the lock is released cond is
@@ -239,24 +227,20 @@ func (grm *nestedPendingOperations) operationComplete(
239227

240228
if *err == nil || !grm.exponentialBackOffOnError {
241229
// Operation completed without error, or exponentialBackOffOnError disabled
242-
grm.deleteOperation(volumeName, podName)
230+
grm.deleteOperation(key)
243231
if *err != nil {
244232
// Log error
245-
logOperationKey := getOperationKey(volumeName, podName)
246-
klog.Errorf("operation %s failed with: %v",
247-
logOperationKey,
248-
*err)
233+
klog.Errorf("operation %s failed with: %v", key, *err)
249234
}
250235
return
251236
}
252237

253238
// Operation completed with error and exponentialBackOffOnError Enabled
254-
existingOpIndex, getOpErr := grm.getOperation(volumeName, podName)
239+
existingOpIndex, getOpErr := grm.getOperation(key)
255240
if getOpErr != nil {
256241
// Failed to find existing operation
257-
logOperationKey := getOperationKey(volumeName, podName)
258242
klog.Errorf("Operation %s completed. error: %v. exponentialBackOffOnError is enabled, but failed to get operation to update.",
259-
logOperationKey,
243+
key,
260244
*err)
261245
return
262246
}
@@ -265,10 +249,8 @@ func (grm *nestedPendingOperations) operationComplete(
265249
grm.operations[existingOpIndex].operationPending = false
266250

267251
// Log error
268-
operationKey :=
269-
getOperationKey(volumeName, podName)
270252
klog.Errorf("%v", grm.operations[existingOpIndex].expBackoff.
271-
GenerateNoRetriesPermittedMsg(operationKey))
253+
GenerateNoRetriesPermittedMsg(key.String()))
272254
}
273255

274256
func (grm *nestedPendingOperations) Wait() {
@@ -280,21 +262,22 @@ func (grm *nestedPendingOperations) Wait() {
280262
}
281263
}
282264

283-
func getOperationKey(
284-
volumeName v1.UniqueVolumeName, podName types.UniquePodName) string {
285-
podNameStr := ""
286-
if podName != EmptyUniquePodName {
287-
podNameStr = fmt.Sprintf(" (%q)", podName)
288-
}
265+
type operationKey struct {
266+
volumeName v1.UniqueVolumeName
267+
podName types.UniquePodName
268+
}
269+
270+
func (key operationKey) String() string {
271+
podNameStr := fmt.Sprintf(" (%q)", key.podName)
289272

290273
return fmt.Sprintf("%q%s",
291-
volumeName,
274+
key.volumeName,
292275
podNameStr)
293276
}
294277

295278
// NewAlreadyExistsError returns a new instance of AlreadyExists error.
296-
func NewAlreadyExistsError(operationKey string) error {
297-
return alreadyExistsError{operationKey}
279+
func NewAlreadyExistsError(key operationKey) error {
280+
return alreadyExistsError{key}
298281
}
299282

300283
// IsAlreadyExists returns true if an error returned from
@@ -313,7 +296,7 @@ func IsAlreadyExists(err error) bool {
313296
// new operation can not be started because an operation with the same operation
314297
// name is already executing.
315298
type alreadyExistsError struct {
316-
operationKey string
299+
operationKey operationKey
317300
}
318301

319302
var _ error = alreadyExistsError{}

0 commit comments

Comments
 (0)