Skip to content

Commit d633e03

Browse files
authored
Merge pull request kubernetes#94002 from wojtek-t/fix_list_from_etcd
Fix bug for inconsistent lists served from etcd
2 parents 3b5aedc + a5b60c3 commit d633e03

File tree

2 files changed

+124
-5
lines changed

2 files changed

+124
-5
lines changed

staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -572,7 +572,7 @@ func (s *store) List(ctx context.Context, key string, opts storage.ListOptions,
572572
fromRV = &parsedRV
573573
}
574574

575-
var returnedRV, continueRV int64
575+
var returnedRV, continueRV, withRev int64
576576
var continueKey string
577577
switch {
578578
case s.pagingEnabled && len(pred.Continue) > 0:
@@ -593,7 +593,7 @@ func (s *store) List(ctx context.Context, key string, opts storage.ListOptions,
593593
// continueRV==0 is invalid.
594594
// If continueRV < 0, the request is for the latest resource version.
595595
if continueRV > 0 {
596-
options = append(options, clientv3.WithRev(continueRV))
596+
withRev = continueRV
597597
returnedRV = continueRV
598598
}
599599
case s.pagingEnabled && pred.Limit > 0:
@@ -604,11 +604,11 @@ func (s *store) List(ctx context.Context, key string, opts storage.ListOptions,
604604
// and returnedRV is then set to the revision we get from the etcd response.
605605
case metav1.ResourceVersionMatchExact:
606606
returnedRV = int64(*fromRV)
607-
options = append(options, clientv3.WithRev(returnedRV))
607+
withRev = returnedRV
608608
case "": // legacy case
609609
if *fromRV > 0 {
610610
returnedRV = int64(*fromRV)
611-
options = append(options, clientv3.WithRev(returnedRV))
611+
withRev = returnedRV
612612
}
613613
default:
614614
return fmt.Errorf("unknown ResourceVersionMatch value: %v", match)
@@ -625,7 +625,7 @@ func (s *store) List(ctx context.Context, key string, opts storage.ListOptions,
625625
// and returnedRV is then set to the revision we get from the etcd response.
626626
case metav1.ResourceVersionMatchExact:
627627
returnedRV = int64(*fromRV)
628-
options = append(options, clientv3.WithRev(returnedRV))
628+
withRev = returnedRV
629629
case "": // legacy case
630630
default:
631631
return fmt.Errorf("unknown ResourceVersionMatch value: %v", match)
@@ -634,6 +634,9 @@ func (s *store) List(ctx context.Context, key string, opts storage.ListOptions,
634634

635635
options = append(options, clientv3.WithPrefix())
636636
}
637+
if withRev != 0 {
638+
options = append(options, clientv3.WithRev(withRev))
639+
}
637640

638641
// loop until we have filled the requested limit from etcd or there are no more results
639642
var lastKey []byte
@@ -695,6 +698,10 @@ func (s *store) List(ctx context.Context, key string, opts storage.ListOptions,
695698
break
696699
}
697700
key = string(lastKey) + "\x00"
701+
if withRev == 0 {
702+
withRev = returnedRV
703+
options = append(options, clientv3.WithRev(withRev))
704+
}
698705
}
699706

700707
// instruct the client to begin querying from immediately after the last key we returned

staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1963,3 +1963,115 @@ func Test_growSlice(t *testing.T) {
19631963
})
19641964
}
19651965
}
1966+
1967+
// fancyTransformer creates next object on each call to
1968+
// TransformFromStorage call.
1969+
type fancyTransformer struct {
1970+
transformer value.Transformer
1971+
store *store
1972+
1973+
lock sync.Mutex
1974+
index int
1975+
}
1976+
1977+
func (t *fancyTransformer) TransformFromStorage(b []byte, ctx value.Context) ([]byte, bool, error) {
1978+
if err := t.createObject(); err != nil {
1979+
return nil, false, err
1980+
}
1981+
return t.transformer.TransformFromStorage(b, ctx)
1982+
}
1983+
1984+
func (t *fancyTransformer) TransformToStorage(b []byte, ctx value.Context) ([]byte, error) {
1985+
return t.transformer.TransformToStorage(b, ctx)
1986+
}
1987+
1988+
func (t *fancyTransformer) createObject() error {
1989+
t.lock.Lock()
1990+
defer t.lock.Unlock()
1991+
1992+
t.index++
1993+
key := fmt.Sprintf("pod-%d", t.index)
1994+
obj := &example.Pod{
1995+
ObjectMeta: metav1.ObjectMeta{
1996+
Name: key,
1997+
Labels: map[string]string{
1998+
"even": strconv.FormatBool(t.index%2 == 0),
1999+
},
2000+
},
2001+
}
2002+
out := &example.Pod{}
2003+
return t.store.Create(context.TODO(), key, obj, out, 0)
2004+
}
2005+
2006+
func TestConsistentList(t *testing.T) {
2007+
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
2008+
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
2009+
defer cluster.Terminate(t)
2010+
2011+
transformer := &fancyTransformer{
2012+
transformer: &prefixTransformer{prefix: []byte(defaultTestPrefix)},
2013+
}
2014+
store := newStore(cluster.RandClient(), true, codec, "", transformer)
2015+
transformer.store = store
2016+
2017+
for i := 0; i < 5; i++ {
2018+
if err := transformer.createObject(); err != nil {
2019+
t.Fatalf("failed to create object: %v", err)
2020+
}
2021+
}
2022+
2023+
getAttrs := func(obj runtime.Object) (labels.Set, fields.Set, error) {
2024+
pod, ok := obj.(*example.Pod)
2025+
if !ok {
2026+
return nil, nil, fmt.Errorf("invalid object")
2027+
}
2028+
return labels.Set(pod.Labels), nil, nil
2029+
}
2030+
predicate := storage.SelectionPredicate{
2031+
Label: labels.Set{"even": "true"}.AsSelector(),
2032+
GetAttrs: getAttrs,
2033+
Limit: 4,
2034+
}
2035+
2036+
result1 := example.PodList{}
2037+
if err := store.List(context.TODO(), "/", storage.ListOptions{Predicate: predicate}, &result1); err != nil {
2038+
t.Fatalf("failed to list objects: %v", err)
2039+
}
2040+
2041+
// List objects from the returned resource version.
2042+
options := storage.ListOptions{
2043+
Predicate: predicate,
2044+
ResourceVersion: result1.ResourceVersion,
2045+
ResourceVersionMatch: metav1.ResourceVersionMatchExact,
2046+
}
2047+
2048+
result2 := example.PodList{}
2049+
if err := store.List(context.TODO(), "/", options, &result2); err != nil {
2050+
t.Fatalf("failed to list objects: %v", err)
2051+
}
2052+
2053+
if !reflect.DeepEqual(result1, result2) {
2054+
t.Errorf("inconsistent lists: %#v, %#v", result1, result2)
2055+
}
2056+
2057+
// Now also verify the ResourceVersionMatchNotOlderThan.
2058+
options.ResourceVersionMatch = metav1.ResourceVersionMatchNotOlderThan
2059+
2060+
result3 := example.PodList{}
2061+
if err := store.List(context.TODO(), "/", options, &result3); err != nil {
2062+
t.Fatalf("failed to list objects: %v", err)
2063+
}
2064+
2065+
options.ResourceVersion = result3.ResourceVersion
2066+
options.ResourceVersionMatch = metav1.ResourceVersionMatchExact
2067+
2068+
result4 := example.PodList{}
2069+
if err := store.List(context.TODO(), "/", options, &result4); err != nil {
2070+
t.Fatalf("failed to list objects: %v", err)
2071+
}
2072+
2073+
if !reflect.DeepEqual(result3, result4) {
2074+
t.Errorf("inconsistent lists: %#v, %#v", result3, result4)
2075+
}
2076+
2077+
}

0 commit comments

Comments
 (0)