Skip to content

Commit 07e65da

Browse files
authored
Merge pull request kubernetes#130417 from serathius/watchcache-compact
Separate compactWatchCache from compactStore
2 parents 7c78041 + 15cb82b commit 07e65da

File tree

4 files changed

+37
-28
lines changed

4 files changed

+37
-28
lines changed

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ func TestList(t *testing.T) {
182182
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, consistentRead)
183183
ctx, cacher, server, terminate := testSetupWithEtcdServer(t)
184184
t.Cleanup(terminate)
185-
storagetesting.RunTestList(ctx, t, cacher, compactStorage(cacher, server.V3Client.Client), true)
185+
storagetesting.RunTestList(ctx, t, cacher, increaseRV(server.V3Client.Client), true)
186186
})
187187
}
188188
}
@@ -192,7 +192,7 @@ func TestConsistentList(t *testing.T) {
192192
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, consistentRead)
193193
ctx, cacher, server, terminate := testSetupWithEtcdServer(t)
194194
t.Cleanup(terminate)
195-
storagetesting.RunTestConsistentList(ctx, t, cacher, compactStorage(cacher, server.V3Client.Client), true, consistentRead)
195+
storagetesting.RunTestConsistentList(ctx, t, cacher, increaseRV(server.V3Client.Client), true, consistentRead)
196196
})
197197
}
198198
}
@@ -203,7 +203,7 @@ func TestGetListNonRecursive(t *testing.T) {
203203
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, consistentRead)
204204
ctx, cacher, server, terminate := testSetupWithEtcdServer(t)
205205
t.Cleanup(terminate)
206-
storagetesting.RunTestGetListNonRecursive(ctx, t, compactStorage(cacher, server.V3Client.Client), cacher)
206+
storagetesting.RunTestGetListNonRecursive(ctx, t, increaseRV(server.V3Client.Client), cacher)
207207
})
208208
}
209209
}
@@ -297,7 +297,7 @@ func TestWatch(t *testing.T) {
297297
func TestWatchFromZero(t *testing.T) {
298298
ctx, cacher, server, terminate := testSetupWithEtcdServer(t)
299299
t.Cleanup(terminate)
300-
storagetesting.RunTestWatchFromZero(ctx, t, cacher, compactStorage(cacher, server.V3Client.Client))
300+
storagetesting.RunTestWatchFromZero(ctx, t, cacher, compactWatchCache(cacher, server.V3Client.Client))
301301
}
302302

303303
func TestDeleteTriggerWatch(t *testing.T) {

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_testing_utils_test.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ func computePodKey(obj *example.Pod) string {
7878
return fmt.Sprintf("/pods/%s/%s", obj.Namespace, obj.Name)
7979
}
8080

81-
func compactStorage(c *CacheDelegator, client *clientv3.Client) storagetesting.Compaction {
81+
func compactWatchCache(c *CacheDelegator, client *clientv3.Client) storagetesting.Compaction {
8282
return func(ctx context.Context, t *testing.T, resourceVersion string) {
8383
versioner := storage.APIObjectVersioner{}
8484
rv, err := versioner.ParseResourceVersion(resourceVersion)
@@ -117,12 +117,16 @@ func compactStorage(c *CacheDelegator, client *clientv3.Client) storagetesting.C
117117
c.cacher.watchCache.startIndex++
118118
}
119119
c.cacher.watchCache.listResourceVersion = rv
120-
121-
if _, err = client.KV.Put(ctx, "compact_rev_key", resourceVersion); err != nil {
122-
t.Fatalf("Could not update compact_rev_key: %v", err)
123-
}
124-
if _, err = client.Compact(ctx, int64(rv)); err != nil {
120+
if _, err := client.Compact(ctx, int64(rv)); err != nil {
125121
t.Fatalf("Could not compact: %v", err)
126122
}
127123
}
128124
}
125+
126+
func increaseRV(client *clientv3.Client) storagetesting.IncreaseRVFunc {
127+
return func(ctx context.Context, t *testing.T) {
128+
if _, err := client.KV.Put(ctx, "increaseRV", "ok"); err != nil {
129+
t.Fatalf("Could not update increaseRV: %v", err)
130+
}
131+
}
132+
}

staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ func TestListPaging(t *testing.T) {
172172

173173
func TestGetListNonRecursive(t *testing.T) {
174174
ctx, store, client := testSetup(t)
175-
storagetesting.RunTestGetListNonRecursive(ctx, t, compactStorage(client.Client), store)
175+
storagetesting.RunTestGetListNonRecursive(ctx, t, increaseRV(client.Client), store)
176176
}
177177

178178
func TestGetListRecursivePrefix(t *testing.T) {
@@ -249,12 +249,12 @@ func TestTransformationFailure(t *testing.T) {
249249

250250
func TestList(t *testing.T) {
251251
ctx, store, client := testSetup(t)
252-
storagetesting.RunTestList(ctx, t, store, compactStorage(client.Client), false)
252+
storagetesting.RunTestList(ctx, t, store, increaseRV(client.Client), false)
253253
}
254254

255255
func TestConsistentList(t *testing.T) {
256256
ctx, store, client := testSetup(t)
257-
storagetesting.RunTestConsistentList(ctx, t, store, compactStorage(client.Client), false, true)
257+
storagetesting.RunTestConsistentList(ctx, t, store, increaseRV(client.Client), false, true)
258258
}
259259

260260
func checkStorageCallsInvariants(transformer *storagetesting.PrefixTransformer, recorder *clientRecorder) storagetesting.CallsValidation {
@@ -313,19 +313,27 @@ func TestNamespaceScopedList(t *testing.T) {
313313
storagetesting.RunTestNamespaceScopedList(ctx, t, store)
314314
}
315315

316-
func compactStorage(etcdClient *clientv3.Client) storagetesting.Compaction {
316+
func compactStorage(client *clientv3.Client) storagetesting.Compaction {
317317
return func(ctx context.Context, t *testing.T, resourceVersion string) {
318318
versioner := storage.APIObjectVersioner{}
319319
rv, err := versioner.ParseResourceVersion(resourceVersion)
320320
if err != nil {
321321
t.Fatal(err)
322322
}
323-
if _, _, err = compact(ctx, etcdClient, 0, int64(rv)); err != nil {
323+
if _, err = client.Compact(ctx, int64(rv)); err != nil {
324324
t.Fatalf("Unable to compact, %v", err)
325325
}
326326
}
327327
}
328328

329+
func increaseRV(client *clientv3.Client) storagetesting.IncreaseRVFunc {
330+
return func(ctx context.Context, t *testing.T) {
331+
if _, err := client.KV.Put(ctx, "increaseRV", "ok"); err != nil {
332+
t.Fatalf("Could not update increaseRV: %v", err)
333+
}
334+
}
335+
}
336+
329337
func TestListInconsistentContinuation(t *testing.T) {
330338
ctx, store, client := testSetup(t)
331339
storagetesting.RunTestListInconsistentContinuation(ctx, t, store, compactStorage(client.Client))

staging/src/k8s.io/apiserver/pkg/storage/testing/store_tests.go

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -622,7 +622,7 @@ func RunTestPreconditionalDeleteWithOnlySuggestionPass(ctx context.Context, t *t
622622
expectNoDiff(t, "incorrect pod:", updatedPod, out)
623623
}
624624

625-
func RunTestList(ctx context.Context, t *testing.T, store storage.Interface, compaction Compaction, ignoreWatchCacheTests bool) {
625+
func RunTestList(ctx context.Context, t *testing.T, store storage.Interface, increaseRV IncreaseRVFunc, ignoreWatchCacheTests bool) {
626626
initialRV, createdPods, updatedPod, err := seedMultiLevelData(ctx, store)
627627
if err != nil {
628628
t.Fatal(err)
@@ -648,10 +648,8 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface, com
648648
pod := obj.(*example.Pod)
649649
return nil, fields.Set{"metadata.name": pod.Name, "spec.nodeName": pod.Spec.NodeName}, nil
650650
}
651-
// Use compact to increase etcd global revision without changes to any resources.
652-
// The increase in resources version comes from Kubernetes compaction updating hidden key.
653-
// Used to test consistent List to confirm it returns latest etcd revision.
654-
compaction(ctx, t, initialRV)
651+
// Increase RV to test consistent List.
652+
increaseRV(ctx, t)
655653
currentRV := fmt.Sprintf("%d", continueRV+1)
656654

657655
tests := []struct {
@@ -1591,15 +1589,15 @@ func ExpectContinueMatches(t *testing.T, expect, got string) {
15911589
t.Errorf("expected continue token: %s, got: %s", expectDecoded, gotDecoded)
15921590
}
15931591

1594-
func RunTestConsistentList(ctx context.Context, t *testing.T, store storage.Interface, compaction Compaction, cacheEnabled, consistentReadsSupported bool) {
1592+
func RunTestConsistentList(ctx context.Context, t *testing.T, store storage.Interface, increaseRV IncreaseRVFunc, cacheEnabled, consistentReadsSupported bool) {
15951593
outPod := &example.Pod{}
15961594
inPod := &example.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "foo"}}
15971595
err := store.Create(ctx, computePodKey(inPod), inPod, outPod, 0)
15981596
if err != nil {
15991597
t.Errorf("Unexpected error: %v", err)
16001598
}
16011599
lastObjecRV := outPod.ResourceVersion
1602-
compaction(ctx, t, outPod.ResourceVersion)
1600+
increaseRV(ctx, t)
16031601
parsedRV, _ := strconv.Atoi(outPod.ResourceVersion)
16041602
currentRV := fmt.Sprintf("%d", parsedRV+1)
16051603

@@ -1609,7 +1607,7 @@ func RunTestConsistentList(ctx context.Context, t *testing.T, store storage.Inte
16091607
}
16101608

16111609
secondNonConsistentReadRV := lastObjecRV
1612-
if consistentReadsSupported {
1610+
if !cacheEnabled || consistentReadsSupported {
16131611
secondNonConsistentReadRV = currentRV
16141612
}
16151613

@@ -1749,7 +1747,7 @@ func seedMultiLevelData(ctx context.Context, store storage.Interface) (initialRV
17491747
return initialRV, created, updated, nil
17501748
}
17511749

1752-
func RunTestGetListNonRecursive(ctx context.Context, t *testing.T, compaction Compaction, store storage.Interface) {
1750+
func RunTestGetListNonRecursive(ctx context.Context, t *testing.T, increaseRV IncreaseRVFunc, store storage.Interface) {
17531751
key, prevStoredObj := testPropagateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "test-ns"}})
17541752
prevRV, _ := strconv.Atoi(prevStoredObj.ResourceVersion)
17551753

@@ -1763,10 +1761,8 @@ func RunTestGetListNonRecursive(ctx context.Context, t *testing.T, compaction Co
17631761
t.Fatalf("update failed: %v", err)
17641762
}
17651763
objRV, _ := strconv.Atoi(storedObj.ResourceVersion)
1766-
// Use compact to increase etcd global revision without changes to any resources.
1767-
// The increase in resources version comes from Kubernetes compaction updating hidden key.
1768-
// Used to test consistent List to confirm it returns latest etcd revision.
1769-
compaction(ctx, t, prevStoredObj.ResourceVersion)
1764+
// Increase RV to test consistent List.
1765+
increaseRV(ctx, t)
17701766

17711767
tests := []struct {
17721768
name string
@@ -2320,6 +2316,7 @@ func RunTestListContinuationWithFilter(ctx context.Context, t *testing.T, store
23202316
}
23212317

23222318
type Compaction func(ctx context.Context, t *testing.T, resourceVersion string)
2319+
type IncreaseRVFunc func(ctx context.Context, t *testing.T)
23232320

23242321
func RunTestListInconsistentContinuation(ctx context.Context, t *testing.T, store storage.Interface, compaction Compaction) {
23252322
if compaction == nil {

0 commit comments

Comments
 (0)