Skip to content

Commit 002c754

Browse files
committed
storage: confirm that paging and predicate filtering work together
This change adds the TestListContinuationWithFilter test which confirms that paging with a predicate that does not match everything results in the correct amount of calls to TransformFromStorage and KV.Get. The partial result of each paging call is also asserted. Signed-off-by: Monis Khan <[email protected]>
1 parent a9c7529 commit 002c754

File tree

2 files changed

+182
-17
lines changed

2 files changed

+182
-17
lines changed

staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go

Lines changed: 180 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"strconv"
2828
"strings"
2929
"sync"
30+
"sync/atomic"
3031
"testing"
3132

3233
"github.com/coreos/pkg/capnslog"
@@ -71,9 +72,11 @@ type prefixTransformer struct {
7172
prefix []byte
7273
stale bool
7374
err error
75+
reads uint64
7476
}
7577

76-
func (p prefixTransformer) TransformFromStorage(b []byte, ctx value.Context) ([]byte, bool, error) {
78+
func (p *prefixTransformer) TransformFromStorage(b []byte, ctx value.Context) ([]byte, bool, error) {
79+
atomic.AddUint64(&p.reads, 1)
7780
if ctx == nil {
7881
panic("no context provided")
7982
}
@@ -82,7 +85,7 @@ func (p prefixTransformer) TransformFromStorage(b []byte, ctx value.Context) ([]
8285
}
8386
return bytes.TrimPrefix(b, p.prefix), p.stale, p.err
8487
}
85-
func (p prefixTransformer) TransformToStorage(b []byte, ctx value.Context) ([]byte, error) {
88+
func (p *prefixTransformer) TransformToStorage(b []byte, ctx value.Context) ([]byte, error) {
8689
if ctx == nil {
8790
panic("no context provided")
8891
}
@@ -92,6 +95,10 @@ func (p prefixTransformer) TransformToStorage(b []byte, ctx value.Context) ([]by
9295
return b, p.err
9396
}
9497

98+
func (p *prefixTransformer) resetReads() {
99+
p.reads = 0
100+
}
101+
95102
func TestCreate(t *testing.T) {
96103
ctx, store, cluster := testSetup(t)
97104
defer cluster.Terminate(t)
@@ -502,11 +509,11 @@ func TestGuaranteedUpdate(t *testing.T) {
502509
if tt.expectNoUpdate {
503510
name = storeObj.Name
504511
}
505-
originalTransformer := store.transformer.(prefixTransformer)
512+
originalTransformer := store.transformer.(*prefixTransformer)
506513
if tt.transformStale {
507-
transformer := originalTransformer
514+
transformer := *originalTransformer
508515
transformer.stale = true
509-
store.transformer = transformer
516+
store.transformer = &transformer
510517
}
511518
version := storeObj.ResourceVersion
512519
err := store.GuaranteedUpdate(ctx, tt.key, out, tt.ignoreNotFound, tt.precondition,
@@ -606,7 +613,7 @@ func TestGuaranteedUpdateChecksStoredData(t *testing.T) {
606613
t.Fatal(err)
607614
}
608615

609-
store.transformer = prefixTransformer{prefix: []byte(defaultTestPrefix)}
616+
store.transformer = &prefixTransformer{prefix: []byte(defaultTestPrefix)}
610617

611618
// this update should write the canonical value to etcd because the new serialization differs
612619
// from the stored serialization
@@ -639,7 +646,7 @@ func TestGuaranteedUpdateChecksStoredData(t *testing.T) {
639646
t.Errorf("guaranteed update should have short-circuited write, got %#v", out)
640647
}
641648

642-
store.transformer = prefixTransformer{prefix: []byte(defaultTestPrefix), stale: true}
649+
store.transformer = &prefixTransformer{prefix: []byte(defaultTestPrefix), stale: true}
643650

644651
// this update should write to etcd because the transformer reported stale
645652
err = store.GuaranteedUpdate(ctx, key, out, true, nil,
@@ -752,7 +759,7 @@ func TestTransformationFailure(t *testing.T) {
752759
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
753760
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
754761
defer cluster.Terminate(t)
755-
store := newStore(cluster.RandClient(), false, codec, "", prefixTransformer{prefix: []byte(defaultTestPrefix)})
762+
store := newStore(cluster.RandClient(), false, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)})
756763
ctx := context.Background()
757764

758765
preset := []struct {
@@ -782,7 +789,7 @@ func TestTransformationFailure(t *testing.T) {
782789

783790
// create a second resource with an invalid prefix
784791
oldTransformer := store.transformer
785-
store.transformer = prefixTransformer{prefix: []byte("otherprefix!")}
792+
store.transformer = &prefixTransformer{prefix: []byte("otherprefix!")}
786793
for i, ps := range preset[1:] {
787794
preset[1:][i].storedObj = &example.Pod{}
788795
err := store.Create(ctx, ps.key, ps.obj, preset[1:][i].storedObj, 0)
@@ -829,8 +836,8 @@ func TestList(t *testing.T) {
829836
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
830837
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
831838
defer cluster.Terminate(t)
832-
store := newStore(cluster.RandClient(), true, codec, "", prefixTransformer{prefix: []byte(defaultTestPrefix)})
833-
disablePagingStore := newStore(cluster.RandClient(), false, codec, "", prefixTransformer{prefix: []byte(defaultTestPrefix)})
839+
store := newStore(cluster.RandClient(), true, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)})
840+
disablePagingStore := newStore(cluster.RandClient(), false, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)})
834841
ctx := context.Background()
835842

836843
// Setup storage with the following structure:
@@ -1182,7 +1189,11 @@ func TestListContinuation(t *testing.T) {
11821189
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
11831190
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
11841191
defer cluster.Terminate(t)
1185-
store := newStore(cluster.RandClient(), true, codec, "", prefixTransformer{prefix: []byte(defaultTestPrefix)})
1192+
transformer := &prefixTransformer{prefix: []byte(defaultTestPrefix)}
1193+
etcdClient := cluster.RandClient()
1194+
recorder := &clientRecorder{KV: etcdClient.KV}
1195+
etcdClient.KV = recorder
1196+
store := newStore(etcdClient, true, codec, "", transformer)
11861197
ctx := context.Background()
11871198

11881199
// Setup storage with the following structure:
@@ -1247,6 +1258,14 @@ func TestListContinuation(t *testing.T) {
12471258
if len(out.Items) != 1 || !reflect.DeepEqual(&out.Items[0], preset[0].storedObj) {
12481259
t.Fatalf("Unexpected first page: %#v", out.Items)
12491260
}
1261+
if transformer.reads != 1 {
1262+
t.Errorf("unexpected reads: %d", transformer.reads)
1263+
}
1264+
if recorder.reads != 1 {
1265+
t.Errorf("unexpected reads: %d", recorder.reads)
1266+
}
1267+
transformer.resetReads()
1268+
recorder.resetReads()
12501269

12511270
continueFromSecondItem := out.Continue
12521271

@@ -1263,6 +1282,14 @@ func TestListContinuation(t *testing.T) {
12631282
t.Logf("continue token was %d %s %v", rv, key, err)
12641283
t.Fatalf("Unexpected second page: %#v", out.Items)
12651284
}
1285+
if transformer.reads != 2 {
1286+
t.Errorf("unexpected reads: %d", transformer.reads)
1287+
}
1288+
if recorder.reads != 1 {
1289+
t.Errorf("unexpected reads: %d", recorder.reads)
1290+
}
1291+
transformer.resetReads()
1292+
recorder.resetReads()
12661293

12671294
// limit, should get two more pages
12681295
out = &example.PodList{}
@@ -1275,7 +1302,17 @@ func TestListContinuation(t *testing.T) {
12751302
if len(out.Items) != 1 || !reflect.DeepEqual(&out.Items[0], preset[1].storedObj) {
12761303
t.Fatalf("Unexpected second page: %#v", out.Items)
12771304
}
1305+
if transformer.reads != 1 {
1306+
t.Errorf("unexpected reads: %d", transformer.reads)
1307+
}
1308+
if recorder.reads != 1 {
1309+
t.Errorf("unexpected reads: %d", recorder.reads)
1310+
}
1311+
transformer.resetReads()
1312+
recorder.resetReads()
1313+
12781314
continueFromThirdItem := out.Continue
1315+
12791316
out = &example.PodList{}
12801317
if err := store.List(ctx, "/", "0", pred(1, continueFromThirdItem), out); err != nil {
12811318
t.Fatalf("Unable to get second page: %v", err)
@@ -1286,14 +1323,142 @@ func TestListContinuation(t *testing.T) {
12861323
if len(out.Items) != 1 || !reflect.DeepEqual(&out.Items[0], preset[2].storedObj) {
12871324
t.Fatalf("Unexpected third page: %#v", out.Items)
12881325
}
1326+
if transformer.reads != 1 {
1327+
t.Errorf("unexpected reads: %d", transformer.reads)
1328+
}
1329+
if recorder.reads != 1 {
1330+
t.Errorf("unexpected reads: %d", recorder.reads)
1331+
}
1332+
transformer.resetReads()
1333+
recorder.resetReads()
1334+
}
1335+
1336+
type clientRecorder struct {
1337+
reads uint64
1338+
clientv3.KV
1339+
}
12891340

1341+
func (r *clientRecorder) Get(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) {
1342+
atomic.AddUint64(&r.reads, 1)
1343+
return r.KV.Get(ctx, key, opts...)
1344+
}
1345+
1346+
func (r *clientRecorder) resetReads() {
1347+
r.reads = 0
1348+
}
1349+
1350+
func TestListContinuationWithFilter(t *testing.T) {
1351+
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
1352+
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
1353+
defer cluster.Terminate(t)
1354+
transformer := &prefixTransformer{prefix: []byte(defaultTestPrefix)}
1355+
etcdClient := cluster.RandClient()
1356+
recorder := &clientRecorder{KV: etcdClient.KV}
1357+
etcdClient.KV = recorder
1358+
store := newStore(etcdClient, true, codec, "", transformer)
1359+
ctx := context.Background()
1360+
1361+
preset := []struct {
1362+
key string
1363+
obj *example.Pod
1364+
storedObj *example.Pod
1365+
}{
1366+
{
1367+
key: "/1",
1368+
obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
1369+
},
1370+
{
1371+
key: "/2",
1372+
obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}, // this should not match
1373+
},
1374+
{
1375+
key: "/3",
1376+
obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
1377+
},
1378+
{
1379+
key: "/4",
1380+
obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
1381+
},
1382+
}
1383+
1384+
for i, ps := range preset {
1385+
preset[i].storedObj = &example.Pod{}
1386+
err := store.Create(ctx, ps.key, ps.obj, preset[i].storedObj, 0)
1387+
if err != nil {
1388+
t.Fatalf("Set failed: %v", err)
1389+
}
1390+
}
1391+
1392+
// the first list call should try to get 2 items from etcd (and only those items should be returned)
1393+
// the field selector should result in it reading 3 items via the transformer
1394+
// the chunking should result in 2 etcd Gets
1395+
// there should be a continueValue because there is more data
1396+
out := &example.PodList{}
1397+
pred := func(limit int64, continueValue string) storage.SelectionPredicate {
1398+
return storage.SelectionPredicate{
1399+
Limit: limit,
1400+
Continue: continueValue,
1401+
Label: labels.Everything(),
1402+
Field: fields.OneTermNotEqualSelector("metadata.name", "bar"),
1403+
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
1404+
pod := obj.(*example.Pod)
1405+
return nil, fields.Set{"metadata.name": pod.Name}, nil
1406+
},
1407+
}
1408+
}
1409+
if err := store.List(ctx, "/", "0", pred(2, ""), out); err != nil {
1410+
t.Errorf("Unable to get initial list: %v", err)
1411+
}
1412+
if len(out.Continue) == 0 {
1413+
t.Errorf("No continuation token set")
1414+
}
1415+
if len(out.Items) != 2 || !reflect.DeepEqual(&out.Items[0], preset[0].storedObj) || !reflect.DeepEqual(&out.Items[1], preset[2].storedObj) {
1416+
t.Errorf("Unexpected first page, len=%d: %#v", len(out.Items), out.Items)
1417+
}
1418+
if transformer.reads != 3 {
1419+
t.Errorf("unexpected reads: %d", transformer.reads)
1420+
}
1421+
if recorder.reads != 2 {
1422+
t.Errorf("unexpected reads: %d", recorder.reads)
1423+
}
1424+
transformer.resetReads()
1425+
recorder.resetReads()
1426+
1427+
// the rest of the test does not make sense if the previous call failed
1428+
if t.Failed() {
1429+
return
1430+
}
1431+
1432+
cont := out.Continue
1433+
1434+
// the second list call should try to get 2 more items from etcd
1435+
// but since there is only one item left, that is all we should get with no continueValue
1436+
// both read counters should be incremented for the singular calls they make in this case
1437+
out = &example.PodList{}
1438+
if err := store.List(ctx, "/", "0", pred(2, cont), out); err != nil {
1439+
t.Errorf("Unable to get second page: %v", err)
1440+
}
1441+
if len(out.Continue) != 0 {
1442+
t.Errorf("Unexpected continuation token set")
1443+
}
1444+
if len(out.Items) != 1 || !reflect.DeepEqual(&out.Items[0], preset[3].storedObj) {
1445+
t.Errorf("Unexpected second page, len=%d: %#v", len(out.Items), out.Items)
1446+
}
1447+
if transformer.reads != 1 {
1448+
t.Errorf("unexpected reads: %d", transformer.reads)
1449+
}
1450+
if recorder.reads != 1 {
1451+
t.Errorf("unexpected reads: %d", recorder.reads)
1452+
}
1453+
transformer.resetReads()
1454+
recorder.resetReads()
12901455
}
12911456

12921457
func TestListInconsistentContinuation(t *testing.T) {
12931458
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
12941459
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
12951460
defer cluster.Terminate(t)
1296-
store := newStore(cluster.RandClient(), true, codec, "", prefixTransformer{prefix: []byte(defaultTestPrefix)})
1461+
store := newStore(cluster.RandClient(), true, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)})
12971462
ctx := context.Background()
12981463

12991464
// Setup storage with the following structure:
@@ -1438,7 +1603,7 @@ func TestListInconsistentContinuation(t *testing.T) {
14381603
func testSetup(t *testing.T) (context.Context, *store, *integration.ClusterV3) {
14391604
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
14401605
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
1441-
store := newStore(cluster.RandClient(), true, codec, "", prefixTransformer{prefix: []byte(defaultTestPrefix)})
1606+
store := newStore(cluster.RandClient(), true, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)})
14421607
ctx := context.Background()
14431608
// As 30s is the default timeout for testing in glboal configuration,
14441609
// we cannot wait longer than that in a single time: change it to 10
@@ -1471,7 +1636,7 @@ func TestPrefix(t *testing.T) {
14711636
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
14721637
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
14731638
defer cluster.Terminate(t)
1474-
transformer := prefixTransformer{prefix: []byte(defaultTestPrefix)}
1639+
transformer := &prefixTransformer{prefix: []byte(defaultTestPrefix)}
14751640
testcases := map[string]string{
14761641
"custom/prefix": "/custom/prefix",
14771642
"/custom//prefix//": "/custom/prefix",

staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -225,13 +225,13 @@ func TestWatchError(t *testing.T) {
225225
codec := &testCodec{apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)}
226226
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
227227
defer cluster.Terminate(t)
228-
invalidStore := newStore(cluster.RandClient(), true, codec, "", prefixTransformer{prefix: []byte("test!")})
228+
invalidStore := newStore(cluster.RandClient(), true, codec, "", &prefixTransformer{prefix: []byte("test!")})
229229
ctx := context.Background()
230230
w, err := invalidStore.Watch(ctx, "/abc", "0", storage.Everything)
231231
if err != nil {
232232
t.Fatalf("Watch failed: %v", err)
233233
}
234-
validStore := newStore(cluster.RandClient(), true, codec, "", prefixTransformer{prefix: []byte("test!")})
234+
validStore := newStore(cluster.RandClient(), true, codec, "", &prefixTransformer{prefix: []byte("test!")})
235235
validStore.GuaranteedUpdate(ctx, "/abc", &example.Pod{}, true, nil, storage.SimpleUpdate(
236236
func(runtime.Object) (runtime.Object, error) {
237237
return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, nil

0 commit comments

Comments
 (0)