Skip to content

Commit a1cb0a2

Browse files
committed
cache: Add consistent read support
Signed-off-by: Alok Kumar Singh <dev.alok.singh123@gmail.com>
1 parent 84dc6e5 commit a1cb0a2

File tree

9 files changed

+1036
-171
lines changed

9 files changed

+1036
-171
lines changed

cache/README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
11
# etcd cache
22

33
Experimental etcd client cache library.
4+
5+
**Note:** gRPC proxy is not supported. The cache relies on `RequestProgress` RPCs, which the gRPC proxy does not forward.

cache/cache.go

Lines changed: 86 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"time"
2424

2525
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
26+
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
2627
clientv3 "go.etcd.io/etcd/client/v3"
2728
)
2829

@@ -31,24 +32,33 @@ var (
3132
ErrUnsupportedRequest = errors.New("cache: unsupported request parameters")
3233
// Returned when the requested key or key‑range is invalid (empty or reversed) or lies outside c.prefix.
3334
ErrKeyRangeInvalid = errors.New("cache: invalid or out‑of‑range key range")
35+
// Returned when the cache timed out waiting for the requested revision
36+
ErrCacheTimeout = errors.New("cache: timed out waiting for revision")
3437
)
3538

3639
// Cache buffers a single etcd Watch for a given key‐prefix and fan‑outs local watchers.
40+
//
41+
// Note: gRPC proxy is not supported. Cache relies on RequestProgress RPCs,
42+
// which the gRPC proxy does not forward.
3743
type Cache struct {
38-
prefix string // prefix is the key-prefix this shard is responsible for ("" = root).
39-
cfg Config // immutable runtime configuration
40-
watcher clientv3.Watcher
41-
kv clientv3.KV
42-
demux *demux // demux fans incoming events out to active watchers and manages resync.
43-
store *store // last‑observed snapshot
44-
ready *ready
45-
stop context.CancelFunc
46-
waitGroup sync.WaitGroup
47-
internalCtx context.Context
44+
prefix string // prefix is the key-prefix this shard is responsible for ("" = root).
45+
cfg Config // immutable runtime configuration
46+
watcher clientv3.Watcher
47+
kv clientv3.KV
48+
demux *demux // demux fans incoming events out to active watchers and manages resync.
49+
store *store // last‑observed snapshot
50+
ready *ready
51+
stop context.CancelFunc
52+
waitGroup sync.WaitGroup
53+
internalCtx context.Context
54+
progressRequestor progressRequestor
4855
}
4956

5057
// New builds a cache shard that watches only the requested prefix.
5158
// For the root cache pass "".
59+
//
60+
// Note: gRPC proxy is not supported. Cache relies on RequestProgress RPCs,
61+
// which the gRPC proxy does not forward.
5262
func New(client *clientv3.Client, prefix string, opts ...Option) (*Cache, error) {
5363
cfg := defaultConfig()
5464
for _, opt := range opts {
@@ -65,23 +75,28 @@ func New(client *clientv3.Client, prefix string, opts ...Option) (*Cache, error)
6575
internalCtx, cancel := context.WithCancel(context.Background())
6676

6777
cache := &Cache{
68-
prefix: prefix,
69-
cfg: cfg,
70-
watcher: client.Watcher,
71-
kv: client.KV,
72-
store: newStore(cfg.BTreeDegree, cfg.HistoryWindowSize),
73-
ready: newReady(),
74-
stop: cancel,
75-
internalCtx: internalCtx,
78+
prefix: prefix,
79+
cfg: cfg,
80+
watcher: client.Watcher,
81+
kv: client.KV,
82+
store: newStore(cfg.BTreeDegree, cfg.HistoryWindowSize),
83+
ready: newReady(),
84+
stop: cancel,
85+
internalCtx: internalCtx,
86+
progressRequestor: newConditionalProgressRequestor(client.Watcher, realClock{}, cfg.ProgressRequestInterval),
7687
}
7788

7889
cache.demux = NewDemux(internalCtx, &cache.waitGroup, cfg.HistoryWindowSize, cfg.ResyncInterval)
7990

80-
cache.waitGroup.Add(1)
91+
cache.waitGroup.Add(2)
8192
go func() {
8293
defer cache.waitGroup.Done()
8394
cache.getWatchLoop()
8495
}()
96+
go func() {
97+
defer cache.waitGroup.Done()
98+
cache.progressRequestor.run(internalCtx)
99+
}()
85100

86101
return cache, nil
87102
}
@@ -161,6 +176,19 @@ func (c *Cache) Get(ctx context.Context, key string, opts ...clientv3.OpOption)
161176
endKey := op.RangeBytes()
162177
requestedRev := op.Rev()
163178

179+
if !op.IsSerializable() {
180+
serverRev, err := c.serverRevision(ctx)
181+
if err != nil {
182+
return nil, err
183+
}
184+
if requestedRev > serverRev {
185+
return nil, rpctypes.ErrFutureRev
186+
}
187+
if err = c.waitTillRevision(ctx, serverRev); err != nil {
188+
return nil, err
189+
}
190+
}
191+
164192
kvs, latestRev, err := c.store.Get(startKey, endKey, requestedRev)
165193
if err != nil {
166194
return nil, err
@@ -196,6 +224,45 @@ func (c *Cache) WaitForRevision(ctx context.Context, rev int64) error {
196224
}
197225
}
198226

227+
func (c *Cache) serverRevision(ctx context.Context) (int64, error) {
228+
key := c.prefix
229+
if key == "" {
230+
key = "/"
231+
}
232+
resp, err := c.kv.Get(ctx, key, clientv3.WithLimit(1), clientv3.WithCountOnly())
233+
if err != nil {
234+
return 0, err
235+
}
236+
return resp.Header.Revision, nil
237+
}
238+
239+
func (c *Cache) waitTillRevision(ctx context.Context, rev int64) error {
240+
if c.store.LatestRev() >= rev {
241+
return nil
242+
}
243+
244+
c.progressRequestor.add()
245+
defer c.progressRequestor.remove()
246+
247+
ticker := time.NewTicker(revisionPollInterval)
248+
defer ticker.Stop()
249+
timeout := time.After(c.cfg.WaitTimeout)
250+
251+
// TODO: rewrite from periodic polling to passive notification
252+
for {
253+
if c.store.LatestRev() >= rev {
254+
return nil
255+
}
256+
select {
257+
case <-ticker.C:
258+
case <-timeout:
259+
return ErrCacheTimeout
260+
case <-ctx.Done():
261+
return ctx.Err()
262+
}
263+
}
264+
}
265+
199266
// Close cancels the private context and blocks until all goroutines return.
200267
func (c *Cache) Close() {
201268
c.stop()
@@ -358,9 +425,6 @@ func (c *Cache) validateGet(key string, op clientv3.Op) (KeyPredicate, error) {
358425
return nil, fmt.Errorf("%w: MinCreateRev(%d) not supported", ErrUnsupportedRequest, op.MinCreateRev())
359426
case op.MaxCreateRev() != 0:
360427
return nil, fmt.Errorf("%w: MaxCreateRev(%d) not supported", ErrUnsupportedRequest, op.MaxCreateRev())
361-
// cache now only serves serializable reads of the latest revision (rev == 0).
362-
case !op.IsSerializable():
363-
return nil, fmt.Errorf("%w: non-serializable request", ErrUnsupportedRequest)
364428
}
365429

366430
startKey := []byte(key)

cache/cache_test.go

Lines changed: 178 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ package cache
1616

1717
import (
1818
"context"
19+
"errors"
20+
"fmt"
1921
"sync"
2022
"testing"
2123
"time"
@@ -501,6 +503,7 @@ type mockWatcher struct {
501503
wg sync.WaitGroup
502504
mu sync.Mutex
503505
lastStartRev int64
506+
progressErr error
504507
}
505508

506509
func newMockWatcher(buf int) *mockWatcher {
@@ -522,7 +525,7 @@ func (m *mockWatcher) Watch(ctx context.Context, _ string, opts ...clientv3.OpOp
522525
return out
523526
}
524527

525-
func (m *mockWatcher) RequestProgress(_ context.Context) error { return nil }
528+
func (m *mockWatcher) RequestProgress(_ context.Context) error { return m.progressErr }
526529

527530
func (m *mockWatcher) Close() error {
528531
m.closeOnce.Do(func() { close(m.responses) })
@@ -600,6 +603,7 @@ func (m *mockWatcher) streamResponses(ctx context.Context, out chan<- clientv3.W
600603
type kvStub struct {
601604
queued []*clientv3.GetResponse
602605
defaultResp *clientv3.GetResponse
606+
defaultErr error
603607
}
604608

605609
func newKVStub(resps ...*clientv3.GetResponse) *kvStub {
@@ -610,7 +614,11 @@ func newKVStub(resps ...*clientv3.GetResponse) *kvStub {
610614
}
611615
}
612616

613-
func (s *kvStub) Get(ctx context.Context, key string, _ ...clientv3.OpOption) (*clientv3.GetResponse, error) {
617+
func (s *kvStub) Get(_ context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) {
618+
if s.defaultErr != nil {
619+
return nil, s.defaultErr
620+
}
621+
614622
if len(s.queued) > 0 {
615623
next := s.queued[0]
616624
s.queued = s.queued[1:]
@@ -692,3 +700,171 @@ func verifySnapshot(t *testing.T, cache *Cache, want []*mvccpb.KeyValue) {
692700
t.Fatalf("cache snapshot mismatch (-want +got):\n%s", diff)
693701
}
694702
}
703+
704+
// noopProgressNotifier satisfies the progress-notifier dependency of the
// progress requestor in tests; its RequestProgress always succeeds without
// doing any work.
type noopProgressNotifier struct{}

// RequestProgress is a no-op that reports success.
func (*noopProgressNotifier) RequestProgress(context.Context) error { return nil }
709+
710+
func newTestProgressRequestor() *conditionalProgressRequestor {
711+
return newConditionalProgressRequestor(&noopProgressNotifier{}, realClock{}, 100*time.Millisecond)
712+
}
713+
714+
func newCacheForWaitTest(serverRev int64, localRev int64, pr progressRequestor) (*Cache, *store) {
715+
cfg := defaultConfig()
716+
st := newStore(cfg.BTreeDegree, cfg.HistoryWindowSize)
717+
if localRev > 0 {
718+
st.Restore(nil, localRev)
719+
}
720+
kv := &kvStub{
721+
defaultResp: &clientv3.GetResponse{Header: &pb.ResponseHeader{Revision: serverRev}},
722+
}
723+
return &Cache{
724+
kv: kv,
725+
store: st,
726+
prefix: "/",
727+
progressRequestor: pr,
728+
cfg: cfg,
729+
}, st
730+
}
731+
732+
func TestWaitTillRevision(t *testing.T) {
733+
t.Run("cache_already_caught_up", func(t *testing.T) {
734+
c, _ := newCacheForWaitTest(10, 10, newTestProgressRequestor())
735+
736+
if err := c.waitTillRevision(context.Background(), 10); err != nil {
737+
t.Fatalf("unexpected error: %v", err)
738+
}
739+
})
740+
741+
t.Run("local_rev_sufficient_skips_server_call", func(t *testing.T) {
742+
cfg := defaultConfig()
743+
st := newStore(cfg.BTreeDegree, cfg.HistoryWindowSize)
744+
st.Restore(nil, 10)
745+
c := &Cache{
746+
kv: &kvStub{defaultErr: fmt.Errorf("should not be called")},
747+
store: st,
748+
prefix: "/",
749+
progressRequestor: newTestProgressRequestor(),
750+
cfg: cfg,
751+
}
752+
753+
if err := c.waitTillRevision(context.Background(), 5); err != nil {
754+
t.Fatalf("unexpected error: %v", err)
755+
}
756+
})
757+
758+
t.Run("cache_catches_up", func(t *testing.T) {
759+
c, st := newCacheForWaitTest(15, 5, newTestProgressRequestor())
760+
761+
go func() {
762+
time.Sleep(200 * time.Millisecond)
763+
st.Restore(nil, 10)
764+
}()
765+
766+
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
767+
defer cancel()
768+
if err := c.waitTillRevision(ctx, 10); err != nil {
769+
t.Fatalf("unexpected error: %v", err)
770+
}
771+
})
772+
773+
t.Run("rev_zero_cache_caught_up", func(t *testing.T) {
774+
c, _ := newCacheForWaitTest(10, 10, newTestProgressRequestor())
775+
776+
if err := c.waitTillRevision(context.Background(), 0); err != nil {
777+
t.Fatalf("unexpected error: %v", err)
778+
}
779+
})
780+
781+
t.Run("rev_zero_waits_for_server_rev", func(t *testing.T) {
782+
c, st := newCacheForWaitTest(10, 5, newTestProgressRequestor())
783+
784+
go func() {
785+
time.Sleep(200 * time.Millisecond)
786+
st.Restore(nil, 10)
787+
}()
788+
789+
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
790+
defer cancel()
791+
if err := c.waitTillRevision(ctx, 0); err != nil {
792+
t.Fatalf("unexpected error: %v", err)
793+
}
794+
})
795+
796+
t.Run("context_cancelled", func(t *testing.T) {
797+
c, _ := newCacheForWaitTest(10, 5, newTestProgressRequestor())
798+
799+
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
800+
defer cancel()
801+
err := c.waitTillRevision(ctx, 10)
802+
if !errors.Is(err, context.DeadlineExceeded) {
803+
t.Fatalf("got %v, want context.DeadlineExceeded", err)
804+
}
805+
})
806+
807+
t.Run("timeout", func(t *testing.T) {
808+
c, _ := newCacheForWaitTest(10, 5, newTestProgressRequestor())
809+
810+
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
811+
defer cancel()
812+
err := c.waitTillRevision(ctx, 10)
813+
if !errors.Is(err, ErrCacheTimeout) {
814+
t.Fatalf("got %v, want ErrCacheTimeout", err)
815+
}
816+
})
817+
}
818+
819+
func TestWaitTillRevisionTriggersProgressRequests(t *testing.T) {
820+
fc := newFakeClock()
821+
pr := newTestConditionalProgressRequestor(fc, 50*time.Millisecond)
822+
c, st := newCacheForWaitTest(15, 5, pr)
823+
824+
// Start progress requestor
825+
ctx, cancel := context.WithCancel(context.Background())
826+
defer cancel()
827+
go pr.run(ctx)
828+
829+
// Wait for goroutine to start
830+
time.Sleep(10 * time.Millisecond)
831+
832+
// Initially, no progress requests should be sent (no waiters)
833+
fc.Advance(100 * time.Millisecond)
834+
if err := pollConditionNoChange(func() bool {
835+
return pr.progressRequestsSentCount.Load() == 0
836+
}); err != nil {
837+
t.Fatal("expected no progress requests without active waiters")
838+
}
839+
840+
// Start waiting - this should trigger progress requests
841+
errCh := make(chan error, 1)
842+
go func() {
843+
errCh <- c.waitTillRevision(context.Background(), 10)
844+
}()
845+
846+
// Advance time and wait for progress requests to start
847+
fc.Advance(50 * time.Millisecond)
848+
time.Sleep(10 * time.Millisecond)
849+
850+
// Verify progress requests are being sent while waiting
851+
if pr.progressRequestsSentCount.Load() == 0 {
852+
t.Fatal("expected progress requests during wait")
853+
}
854+
855+
// Complete the wait
856+
st.Restore(nil, 15)
857+
858+
if err := <-errCh; err != nil {
859+
t.Fatalf("unexpected error: %v", err)
860+
}
861+
862+
// After completion, progress requests should stop
863+
finalCount := pr.progressRequestsSentCount.Load()
864+
fc.Advance(100 * time.Millisecond)
865+
if err := pollConditionNoChange(func() bool {
866+
return pr.progressRequestsSentCount.Load() == finalCount
867+
}); err != nil {
868+
t.Fatalf("expected no new progress requests after completion, got %d initially, then changed", finalCount)
869+
}
870+
}

0 commit comments

Comments
 (0)