Skip to content

Commit 0dbe7b7

Browse files
authored
Enhancements to list-view task monitoring (#3584)
Before recreating a ListView, destroy the old ListView to avoid leaks. Send error results to pending tasks without blocking (change made in reportErrorOnAllPendingTasks). Use a non-blocking send on the task result channel in processTaskUpdate. The TaskResult channel is now buffered with size 1 (change made in waitOnTask). waitOnTask also now detects ErrListViewTaskAddition with errors.Is instead of comparing against errors.Unwrap(err).
1 parent 4bd811c commit 0dbe7b7

File tree

2 files changed

+30
-5
lines changed

2 files changed

+30
-5
lines changed

pkg/common/cns-lib/volume/listview.go

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -324,13 +324,23 @@ func (l *ListViewImpl) listenToTaskUpdates() {
324324
l.mu.Lock()
325325
log.Infof("acquired lock before re-creating listview")
326326
if recreateView {
327+
if l.listView != nil {
328+
destroyListviewErr := l.listView.Destroy(l.ctx)
329+
if destroyListviewErr != nil {
330+
// ignoring the error and re-creating the list view
331+
log.Errorf("failed to destroy listview object. err: %v", destroyListviewErr)
332+
} else {
333+
log.Info("successfully destroyed existing listview")
334+
}
335+
}
327336
log.Info("re-creating the listView object")
328337
err := l.createListView(l.ctx, nil)
329338
if err != nil {
330339
log.Errorf("failed to create a ListView. error: %+v", err)
331340
l.mu.Unlock()
332341
continue
333342
}
343+
log.Info("successfully created listview")
334344

335345
filter = getListViewWaitFilter(l.listView)
336346
l.waitForUpdatesContext, l.waitForUpdatesCancelFunc = context.WithCancel(context.Background())
@@ -395,12 +405,19 @@ func (l *ListViewImpl) listenToTaskUpdates() {
395405

396406
// reportErrorOnAllPendingTasks returns failure to all pending tasks in the map in case of vc failure
397407
func (l *ListViewImpl) reportErrorOnAllPendingTasks(err error) {
408+
log := logger.GetLogger(context.Background())
398409
for _, taskDetails := range l.taskMap.GetAll() {
399410
result := TaskResult{
400411
TaskInfo: nil,
401412
Err: err,
402413
}
403-
taskDetails.ResultCh <- result
414+
// Non-blocking send
415+
select {
416+
case taskDetails.ResultCh <- result:
417+
log.Infof("reported error for task %+v", taskDetails.Reference)
418+
default:
419+
log.Warnf("failed to report error for task %+v: channel blocked", taskDetails.Reference)
420+
}
404421
}
405422
}
406423

@@ -431,8 +448,16 @@ func (l *ListViewImpl) processTaskUpdate(prop types.PropertyChange) {
431448
result.TaskInfo = &taskInfo
432449
result.Err = nil
433450
}
434-
435-
taskDetails.ResultCh <- result
451+
// Use a non-blocking send to prevent deadlocks when multiple goroutines
452+
// try to send to the same channel (e.g., due to duplicate task updates from vSphere)
453+
select {
454+
case taskDetails.ResultCh <- result:
455+
log.Infof("Successfully sent task result for task %+v", taskInfo.Task)
456+
default:
457+
// Channel is full/blocked, which means another goroutine already sent the result
458+
// This can happen when vSphere sends duplicate task update events
459+
log.Warnf("result channel full for task %+v, ignoring duplicate update", taskInfo.Task)
460+
}
436461
}
437462

438463
// RemoveTasksMarkedForDeletion goes over the list of tasks in the map

pkg/common/cns-lib/volume/manager.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -827,9 +827,9 @@ func (m *defaultManager) waitOnTask(csiOpContext context.Context,
827827
return nil, err
828828
}
829829
}
830-
ch := make(chan TaskResult)
830+
ch := make(chan TaskResult, 1)
831831
err := m.listViewIf.AddTask(csiOpContext, taskMoRef, ch)
832-
if errors.Unwrap(err) == ErrListViewTaskAddition {
832+
if errors.Is(err, ErrListViewTaskAddition) {
833833
return nil, logger.LogNewErrorf(log, "%s. err: %v", listviewAdditionError, err)
834834
} else if err != nil {
835835
// in case the task is not found in VC, we are returning a ManagedObjectNotFound error wrapped as a soap fault

0 commit comments

Comments
 (0)