@@ -78,14 +78,16 @@ var (
78
78
operationRequestStoreInstance * operationRequestStore
79
79
operationStoreInitLock = & sync.Mutex {}
80
80
isPodVMOnStretchSupervisorFSSEnabled bool
81
+ isCSITransactionSupportEnabled bool
81
82
)
82
83
83
84
// InitVolumeOperationRequestInterface creates the CnsVolumeOperationRequest
84
85
// definition on the API server and returns an implementation of
85
86
// VolumeOperationRequest interface. Clients are unaware of the implementation
86
87
// details to read and persist volume operation details.
87
88
func InitVolumeOperationRequestInterface (ctx context.Context , cleanupInterval int ,
88
- isBlockVolumeSnapshotEnabled func () bool , isPodVMOnStretchSupervisorEnabled bool ) (
89
+ isBlockVolumeSnapshotEnabled func () bool , isPodVMOnStretchSupervisorEnabled bool ,
90
+ csiTransactionSupportEnabled bool ) (
89
91
VolumeOperationRequest , error ) {
90
92
log := logger .GetLogger (ctx )
91
93
csiNamespace = getCSINamespace ()
@@ -130,6 +132,8 @@ func InitVolumeOperationRequestInterface(ctx context.Context, cleanupInterval in
130
132
}
131
133
// Store PodVMOnStretchedSupervisor FSS value for later use.
132
134
isPodVMOnStretchSupervisorFSSEnabled = isPodVMOnStretchSupervisorEnabled
135
+ // Store CSI Transaction Support FSS value for later use.
136
+ isCSITransactionSupportEnabled = csiTransactionSupportEnabled
133
137
134
138
return operationRequestStoreInstance , nil
135
139
}
@@ -261,6 +265,32 @@ func (or *operationRequestStore) StoreRequestDetails(
261
265
// Create a deep copy since we modify the object.
262
266
updatedInstance := instance .DeepCopy ()
263
267
268
+ // If CSI Transaction Support is enabled and we're storing a new InProgress operation with empty TaskID,
269
+ // mark any existing InProgress entries as TrackingAborted since this indicates a retry scenario.
270
+ if isCSITransactionSupportEnabled && operationDetailsToStore .TaskStatus == TaskInvocationStatusInProgress &&
271
+ operationDetailsToStore .TaskID == "" {
272
+ // This is a new operation attempt (Phase 1: Intent Registration)
273
+ // Mark any existing InProgress entries as TrackingAborted since this is clearly a retry
274
+ for index := range updatedInstance .Status .LatestOperationDetails {
275
+ existingOp := & updatedInstance .Status .LatestOperationDetails [index ]
276
+ if existingOp .TaskStatus == TaskInvocationStatusInProgress {
277
+ // This is a retry - mark the previous attempt as aborted
278
+ existingOp .TaskStatus = TaskInvocationStatusTrackingAborted
279
+ existingOp .Error = "Operation tracking aborted due to retry attempt"
280
+ log .Infof ("Marked previous InProgress operation as TrackingAborted due to retry detection. Instance: %s" ,
281
+ operationToStore .Name )
282
+ }
283
+ }
284
+
285
+ // Also check FirstOperationDetails
286
+ if updatedInstance .Status .FirstOperationDetails .TaskStatus == TaskInvocationStatusInProgress {
287
+ updatedInstance .Status .FirstOperationDetails .TaskStatus = TaskInvocationStatusTrackingAborted
288
+ updatedInstance .Status .FirstOperationDetails .Error = "Operation tracking aborted due to retry attempt"
289
+ log .Infof ("Marked FirstOperationDetails as TrackingAborted due to retry detection. Instance: %s" ,
290
+ operationToStore .Name )
291
+ }
292
+ }
293
+
264
294
// Modify VolumeID, SnapshotID and Capacity
265
295
updatedInstance .Status .VolumeID = operationToStore .VolumeID
266
296
updatedInstance .Status .SnapshotID = operationToStore .SnapshotID
@@ -409,6 +439,12 @@ func (or *operationRequestStore) cleanupStaleInstances(cleanupInterval int) {
409
439
}
410
440
}
411
441
442
+ // SetCSITransactionSupport sets the CSI Transaction Support feature flag.
443
+ // This function allows runtime modification of the isCSITransactionSupportEnabled variable.
444
+ func SetCSITransactionSupport (enabled bool ) {
445
+ isCSITransactionSupportEnabled = enabled
446
+ }
447
+
412
448
func getCSINamespace () string {
413
449
csiNamespace := os .Getenv (EnvCSINamespace )
414
450
if strings .TrimSpace (csiNamespace ) == "" {
0 commit comments