Skip to content

Commit 0904d32

Browse files
committed
refresh probed plugins on init to avoid probe race/erroneous unmounts
Signed-off-by: Olga Shestopalova <[email protected]>
1 parent 74b9204 commit 0904d32

File tree

2 files changed

+66
-0
lines changed

2 files changed

+66
-0
lines changed

pkg/volume/plugins.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -605,6 +605,7 @@ func (pm *VolumePluginMgr) InitPlugins(plugins []VolumePlugin, prober DynamicPlu
605605
pm.plugins[name] = plugin
606606
klog.V(1).InfoS("Loaded volume plugin", "pluginName", name)
607607
}
608+
pm.refreshProbedPlugins()
608609
return utilerrors.NewAggregate(allErrs)
609610
}
610611

pkg/volume/plugins_test.go

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,16 @@ limitations under the License.
1717
package volume
1818

1919
import (
20+
"sync"
21+
"sync/atomic"
2022
"testing"
2123

2224
v1 "k8s.io/api/core/v1"
2325
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2426
"k8s.io/apimachinery/pkg/types"
27+
28+
"github.com/stretchr/testify/assert"
29+
"github.com/stretchr/testify/require"
2530
)
2631

2732
const testPluginName = "kubernetes.io/testPlugin"
@@ -165,3 +170,63 @@ func Test_ValidatePodTemplate(t *testing.T) {
165170
t.Errorf("isPodTemplateValid(%v) returned (%v), want (%v)", pod.String(), got, "Error: pod specification does not contain any volume(s).")
166171
}
167172
}
173+
174+
// TestVolumePluginMultiThreaded tests FindPluginByName/FindPluginBySpec in a multi-threaded environment.
175+
// If these are called by different threads at the same time, they should still be able to reconcile the plugins
176+
// and return the same results (no missing plugin)
177+
func TestVolumePluginMultiThreaded(t *testing.T) {
178+
vpm := VolumePluginMgr{}
179+
var prober DynamicPluginProber = &fakeProber{events: []ProbeEvent{{PluginName: testPluginName, Op: ProbeAddOrUpdate, Plugin: &testPlugins{}}}}
180+
err := vpm.InitPlugins([]VolumePlugin{}, prober, nil)
181+
require.NoError(t, err)
182+
183+
volumeSpec := &Spec{}
184+
totalErrors := atomic.Int32{}
185+
var wg sync.WaitGroup
186+
187+
for i := 0; i < 100; i++ {
188+
wg.Add(1)
189+
go func() {
190+
defer wg.Done()
191+
_, err := vpm.FindPluginByName(testPluginName)
192+
if err != nil {
193+
totalErrors.Add(1)
194+
}
195+
}()
196+
}
197+
wg.Wait()
198+
199+
assert.Equal(t, int32(0), totalErrors.Load())
200+
totalErrors.Store(0)
201+
202+
for i := 0; i < 100; i++ {
203+
wg.Add(1)
204+
go func() {
205+
defer wg.Done()
206+
_, err := vpm.FindPluginBySpec(volumeSpec)
207+
if err != nil {
208+
totalErrors.Add(1)
209+
}
210+
}()
211+
}
212+
wg.Wait()
213+
214+
assert.Equal(t, int32(0), totalErrors.Load())
215+
}
216+
217+
type fakeProber struct {
218+
events []ProbeEvent
219+
firstExecution atomic.Bool
220+
}
221+
222+
func (prober *fakeProber) Init() error {
223+
prober.firstExecution.Store(true)
224+
return nil
225+
}
226+
227+
func (prober *fakeProber) Probe() (events []ProbeEvent, err error) {
228+
if prober.firstExecution.CompareAndSwap(true, false) {
229+
return prober.events, nil
230+
}
231+
return []ProbeEvent{}, nil
232+
}

0 commit comments

Comments
 (0)