Skip to content

Commit 0746672

Browse files
committed
fix plugin probe init race causing erroneous volume unmounts
Signed-off-by: Olga Shestopalova <[email protected]>
1 parent 4b33029 commit 0746672

File tree

4 files changed

+127
-6
lines changed

4 files changed

+127
-6
lines changed

pkg/volume/flexvolume/probe.go

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ import (
3333
)
3434

3535
type flexVolumeProber struct {
36-
mutex sync.Mutex
36+
mutex sync.RWMutex
3737
pluginDir string // Flexvolume driver directory
3838
runner exec.Interface // Interface to use for execing flex calls
3939
watcher utilfs.FSWatcher
@@ -71,11 +71,20 @@ func (prober *flexVolumeProber) Init() error {
7171
// If probeAllNeeded is true, probe all pluginDir
7272
// else probe events in eventsMap
7373
func (prober *flexVolumeProber) Probe() (events []volume.ProbeEvent, err error) {
74+
prober.mutex.RLock()
7475
if prober.probeAllNeeded {
75-
prober.testAndSetProbeAllNeeded(false)
76-
return prober.probeAll()
76+
prober.mutex.RUnlock()
77+
prober.mutex.Lock()
78+
// check again, if multiple readers got through the first if, only one should probeAll
79+
if prober.probeAllNeeded {
80+
events, err = prober.probeAll()
81+
prober.probeAllNeeded = false
82+
prober.mutex.Unlock()
83+
return
84+
}
85+
prober.mutex.Unlock()
7786
}
78-
87+
prober.mutex.RUnlock()
7988
return prober.probeMap()
8089
}
8190

pkg/volume/flexvolume/probe_test.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ import (
2121
"path/filepath"
2222
goruntime "runtime"
2323
"strings"
24+
"sync"
25+
"sync/atomic"
2426
"testing"
2527

2628
"github.com/fsnotify/fsnotify"
@@ -327,6 +329,47 @@ func TestProberSuccessAndError(t *testing.T) {
327329
assert.Error(t, err)
328330
}
329331

332+
// TestProberMultiThreaded tests the code path of many callers calling FindPluginBySpec/FindPluginByName
333+
// which then calls refreshProbedPlugins which then calls prober.Probe() and ensures that the prober is thread safe
334+
func TestProberMultiThreaded(t *testing.T) {
335+
// Arrange
336+
_, _, _, prober := initTestEnvironment(t)
337+
totalEvents := atomic.Int32{}
338+
totalErrors := atomic.Int32{}
339+
pluginNameMutex := sync.RWMutex{}
340+
var pluginName string
341+
var wg sync.WaitGroup
342+
343+
// Act
344+
for i := 0; i < 100; i++ {
345+
go func() {
346+
defer wg.Done()
347+
events, err := prober.Probe()
348+
for _, event := range events {
349+
if event.Op == volume.ProbeAddOrUpdate {
350+
pluginNameMutex.Lock()
351+
pluginName = event.Plugin.GetPluginName()
352+
pluginNameMutex.Unlock()
353+
}
354+
}
355+
// this fails if ProbeAll is not complete before the next call comes in but we have assumed that it has
356+
pluginNameMutex.RLock()
357+
assert.Equal(t, "fake-driver", pluginName)
358+
pluginNameMutex.RUnlock()
359+
totalEvents.Add(int32(len(events)))
360+
if err != nil {
361+
totalErrors.Add(1)
362+
}
363+
}()
364+
wg.Add(1)
365+
}
366+
wg.Wait()
367+
368+
// Assert
369+
assert.Equal(t, int32(1), totalEvents.Load())
370+
assert.Equal(t, int32(0), totalErrors.Load())
371+
}
372+
330373
// Installs a mock driver (an empty file) in the mock fs.
331374
func installDriver(driverName string, fs utilfs.Filesystem) {
332375
driverPath := filepath.Join(pluginDir, driverName)

pkg/volume/plugins.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -627,6 +627,7 @@ func (pm *VolumePluginMgr) initProbedPlugin(probedPlugin VolumePlugin) error {
627627
// specification. If no plugins can support or more than one plugin can
628628
// support it, return error.
629629
func (pm *VolumePluginMgr) FindPluginBySpec(spec *Spec) (VolumePlugin, error) {
630+
pm.refreshProbedPlugins()
630631
pm.mutex.RLock()
631632
defer pm.mutex.RUnlock()
632633

@@ -643,7 +644,6 @@ func (pm *VolumePluginMgr) FindPluginBySpec(spec *Spec) (VolumePlugin, error) {
643644
}
644645
}
645646

646-
pm.refreshProbedPlugins()
647647
for _, plugin := range pm.probedPlugins {
648648
if plugin.CanSupport(spec) {
649649
match = plugin
@@ -663,6 +663,7 @@ func (pm *VolumePluginMgr) FindPluginBySpec(spec *Spec) (VolumePlugin, error) {
663663

664664
// FindPluginByName fetches a plugin by name. If no plugin is found, returns error.
665665
func (pm *VolumePluginMgr) FindPluginByName(name string) (VolumePlugin, error) {
666+
pm.refreshProbedPlugins()
666667
pm.mutex.RLock()
667668
defer pm.mutex.RUnlock()
668669

@@ -671,7 +672,6 @@ func (pm *VolumePluginMgr) FindPluginByName(name string) (VolumePlugin, error) {
671672
match = v
672673
}
673674

674-
pm.refreshProbedPlugins()
675675
if plugin, found := pm.probedPlugins[name]; found {
676676
if match != nil {
677677
return nil, fmt.Errorf("multiple volume plugins matched: %s and %s", match.GetPluginName(), plugin.GetPluginName())
@@ -694,6 +694,12 @@ func (pm *VolumePluginMgr) refreshProbedPlugins() {
694694
klog.ErrorS(err, "Error dynamically probing plugins")
695695
}
696696

697+
if len(events) == 0 {
698+
return
699+
}
700+
701+
pm.mutex.Lock()
702+
defer pm.mutex.Unlock()
697703
// because the probe function can return a list of valid plugins
698704
// even when an error is present we still must add the plugins
699705
// or they will be skipped because each event only fires once

pkg/volume/plugins_test.go

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,10 @@ limitations under the License.
1717
package volume
1818

1919
import (
20+
"github.com/stretchr/testify/assert"
21+
"github.com/stretchr/testify/require"
22+
"sync"
23+
"sync/atomic"
2024
"testing"
2125

2226
v1 "k8s.io/api/core/v1"
@@ -165,3 +169,62 @@ func Test_ValidatePodTemplate(t *testing.T) {
165169
t.Errorf("isPodTemplateValid(%v) returned (%v), want (%v)", pod.String(), got, "Error: pod specification does not contain any volume(s).")
166170
}
167171
}
172+
173+
// TestVolumePluginMultiThreaded tests FindPluginByName/FindPluginBySpec in a multi-threaded environment.
174+
// If these are called by different threads at the same time, they should still be able to reconcile the plugins
175+
// and return the same results (no missing plugin)
176+
func TestVolumePluginMultiThreaded(t *testing.T) {
177+
vpm := VolumePluginMgr{}
178+
var prober DynamicPluginProber = &fakeProber{events: []ProbeEvent{{PluginName: testPluginName, Op: ProbeAddOrUpdate, Plugin: &testPlugins{}}}}
179+
err := vpm.InitPlugins([]VolumePlugin{}, prober, nil)
180+
require.NoError(t, err)
181+
182+
volumeSpec := &Spec{}
183+
totalErrors := atomic.Int32{}
184+
var wg sync.WaitGroup
185+
186+
for i := 0; i < 100; i++ {
187+
go func() {
188+
defer wg.Done()
189+
_, err := vpm.FindPluginByName(testPluginName)
190+
if err != nil {
191+
totalErrors.Add(1)
192+
}
193+
}()
194+
wg.Add(1)
195+
}
196+
wg.Wait()
197+
198+
assert.Equal(t, int32(0), totalErrors.Load())
199+
200+
for i := 0; i < 100; i++ {
201+
go func() {
202+
defer wg.Done()
203+
_, err := vpm.FindPluginBySpec(volumeSpec)
204+
if err != nil {
205+
totalErrors.Add(1)
206+
}
207+
}()
208+
wg.Add(1)
209+
}
210+
wg.Wait()
211+
212+
assert.Equal(t, int32(0), totalErrors.Load())
213+
}
214+
215+
type fakeProber struct {
216+
events []ProbeEvent
217+
firstExecution atomic.Bool
218+
}
219+
220+
func (prober *fakeProber) Init() error {
221+
prober.firstExecution.Store(true)
222+
return nil
223+
}
224+
225+
func (prober *fakeProber) Probe() (events []ProbeEvent, err error) {
226+
if prober.firstExecution.CompareAndSwap(true, false) {
227+
return prober.events, nil
228+
}
229+
return []ProbeEvent{}, nil
230+
}

0 commit comments

Comments
 (0)