diff --git a/pkg/controller/runtime/internal/cache/cache.go b/pkg/controller/runtime/internal/cache/cache.go index 619fbb2..c5933e0 100644 --- a/pkg/controller/runtime/internal/cache/cache.go +++ b/pkg/controller/runtime/internal/cache/cache.go @@ -137,7 +137,22 @@ func (cache *ResourceCache) CachePut(r resource.Resource) { // CacheRemove handles deleted objects. func (cache *ResourceCache) CacheRemove(r resource.Resource) { - cache.getHandler(r.Metadata().Namespace(), r.Metadata().Type()).remove(r) + cache.CacheRemoveByPointer(r.Metadata()) +} + +func (cache *ResourceCache) CacheRemoveByPointer(ptr *resource.Metadata) { + // cache.getHandler(ptr.Namespace(), ptr.Type()).remove(ptr) + tombstone := newCacheTombstone(ptr) + cache.getHandler(ptr.Namespace(), ptr.Type()).put(tombstone) +} + +// ClearTombstones removes all tombstones from the cache. +// +// TODO: call this periodically in a goroutine, e.g., in the controller runtime. +// +// TODO: only remove tombstones older than X. +func (cache *ResourceCache) ClearTombstones(namespace resource.Namespace, resourceType resource.Type) { + cache.getHandler(namespace, resourceType).clearTombstones() } // WrapState returns a cached wrapped state, which serves some operations from the cache bypassing the underlying state. @@ -147,3 +162,57 @@ func (cache *ResourceCache) WrapState(st state.CoreState) state.CoreState { st: st, } } + +var _ resource.Resource = (*cacheTombstone)(nil) + +// cacheTombstone is a resource without a Spec. +// +// Tombstones are used to present state of a deleted resource. +type cacheTombstone struct { + ref resource.Metadata +} + +// newCacheTombstone builds a tombstone from resource reference. +func newCacheTombstone(ref resource.Reference) *cacheTombstone { + return &cacheTombstone{ + ref: resource.NewMetadata(ref.Namespace(), ref.Type(), ref.ID(), ref.Version()), + } +} + +// String method for debugging/logging. +func (t *cacheTombstone) String() string { + return fmt.Sprintf("cacheTombstone(%s)", t.ref.String()) +} + +// Metadata for the resource. +// +// Metadata.Version should change each time Spec changes. +func (t *cacheTombstone) Metadata() *resource.Metadata { + return &t.ref +} + +// Spec is not implemented for tobmstones. +func (t *cacheTombstone) Spec() any { + panic("tombstone doesn't contain spec") +} + +// DeepCopy returns self, as tombstone is immutable. +func (t *cacheTombstone) DeepCopy() resource.Resource { //nolint:ireturn + return t +} + +// cacheTombstone implements Tombstoned interface. +func (t *cacheTombstone) cacheTombstone() { +} + +// Tombstoned is a marker interface for Tombstones. +type cacheTombstoned interface { + cacheTombstone() +} + +// IsTombstone checks if resource is represented by the cacheTombstone. +func isCacheTombstone(res resource.Resource) bool { + _, ok := res.(cacheTombstoned) + + return ok +} diff --git a/pkg/controller/runtime/internal/cache/cache_test.go b/pkg/controller/runtime/internal/cache/cache_test.go index d7d9feb..b7b9476 100644 --- a/pkg/controller/runtime/internal/cache/cache_test.go +++ b/pkg/controller/runtime/internal/cache/cache_test.go @@ -171,6 +171,9 @@ func TestCacheOperations(t *testing.T) { }) assert.Equal(t, "1000", metrics.CachedResources.Get("A").String()) + assert.Equal(t, "10000", metrics.CachedResources.Get("B").String()) // drops do not cause the cache to be cleared due to the tombstone + + c.ClearTombstones("b", "B") assert.Equal(t, "5000", metrics.CachedResources.Get("B").String()) } diff --git a/pkg/controller/runtime/internal/cache/handler.go b/pkg/controller/runtime/internal/cache/handler.go index 9bb85ac..759a3c4 100644 --- a/pkg/controller/runtime/internal/cache/handler.go +++ b/pkg/controller/runtime/internal/cache/handler.go @@ -79,8 +79,13 @@ func (h *cacheHandler) get(ctx context.Context, id resource.ID, opts ...state.Ge return nil, ErrNotFound(resource.NewMetadata(h.key.Namespace, h.key.Type, id, resource.VersionUndefined)) } + res := h.resources[idx] + if isCacheTombstone(res) { + return nil, ErrNotFound(resource.NewMetadata(h.key.Namespace, h.key.Type, id, resource.VersionUndefined)) + } + // return a copy of the resource to satisfy State semantics - return h.resources[idx].DeepCopy(), nil + return res.DeepCopy(), nil } func (h *cacheHandler) contextWithTeardown(ctx context.Context, id resource.ID) (context.Context, error) { @@ -160,6 +165,10 @@ func (h *cacheHandler) list(ctx context.Context, opts ...state.ListOption) (reso resources := slices.Clone(h.resources) h.mu.Unlock() + resources = xslices.Filter(resources, func(r resource.Resource) bool { + return !isCacheTombstone(r) + }) + // micro optimization: apply filter only if some filters are specified if !value.IsZero(options.IDQuery) || options.LabelQueries != nil { resources = xslices.Filter(resources, func(r resource.Resource) bool { @@ -190,6 +199,15 @@ func (h *cacheHandler) put(r resource.Resource) { }) if found { + existing := h.resources[idx] + existingVersion := existing.Metadata().Version() + newVersion := r.Metadata().Version() + + stale := !isCacheTombstone(existing) && newVersion.Value() < existingVersion.Value() + if stale { + return + } + h.resources[idx] = r } else { h.resources = slices.Insert(h.resources, idx, r) @@ -205,24 +223,16 @@ func (h *cacheHandler) put(r resource.Resource) { } } -func (h *cacheHandler) remove(r resource.Resource) { +func (h *cacheHandler) clearTombstones() { h.mu.Lock() defer h.mu.Unlock() - idx, found := slices.BinarySearchFunc(h.resources, r.Metadata().ID(), func(r resource.Resource, id resource.ID) int { - return cmp.Compare(r.Metadata().ID(), id) - }) - - if found { - h.resources = slices.Delete(h.resources, idx, idx+1) - - metrics.CachedResources.Add(r.Metadata().Type(), -1) - } + before := len(h.resources) + h.resources = slices.DeleteFunc(h.resources, isCacheTombstone) + after := len(h.resources) + delta := after - before - if ch, ok := h.teardownWaiters[r.Metadata().ID()]; ok { - close(ch) - delete(h.teardownWaiters, r.Metadata().ID()) - } + metrics.CachedResources.Add(h.key.Type, int64(delta)) } func (h *cacheHandler) len() int { diff --git a/pkg/controller/runtime/internal/cache/state.go b/pkg/controller/runtime/internal/cache/state.go index fa4415b..f2e8fa5 100644 --- a/pkg/controller/runtime/internal/cache/state.go +++ b/pkg/controller/runtime/internal/cache/state.go @@ -6,6 +6,7 @@ package cache import ( "context" + "sync" "github.com/cosi-project/runtime/pkg/resource" "github.com/cosi-project/runtime/pkg/state" @@ -20,6 +21,8 @@ import ( type stateWrapper struct { cache *ResourceCache st state.CoreState + + lock sync.Mutex } // Check interfaces. @@ -49,7 +52,19 @@ func (wrapper *stateWrapper) List(ctx context.Context, r resource.Kind, opts ... // // If a resource already exists, Create returns an error. func (wrapper *stateWrapper) Create(ctx context.Context, r resource.Resource, opts ...state.CreateOption) error { - return wrapper.st.Create(ctx, r, opts...) + wrapper.lock.Lock() + defer wrapper.lock.Unlock() + + err := wrapper.st.Create(ctx, r, opts...) + if err != nil { + return err + } + + if wrapper.cache.IsHandled(r.Metadata().Namespace(), r.Metadata().Type()) { + wrapper.cache.CachePut(r) + } + + return nil } // Update a resource. @@ -58,7 +73,18 @@ func (wrapper *stateWrapper) Create(ctx context.Context, r resource.Resource, op // On update current version of resource `new` in the state should match // the version on the backend, otherwise conflict error is returned. func (wrapper *stateWrapper) Update(ctx context.Context, newResource resource.Resource, opts ...state.UpdateOption) error { - return wrapper.st.Update(ctx, newResource, opts...) + wrapper.lock.Lock() + defer wrapper.lock.Unlock() + + if err := wrapper.st.Update(ctx, newResource, opts...); err != nil { + return err + } + + if wrapper.cache.IsHandled(newResource.Metadata().Namespace(), newResource.Metadata().Type()) { + wrapper.cache.CachePut(newResource) + } + + return nil } // Destroy a resource. @@ -66,7 +92,31 @@ func (wrapper *stateWrapper) Update(ctx context.Context, newResource resource.Re // If a resource doesn't exist, error is returned. // If a resource has pending finalizers, error is returned. func (wrapper *stateWrapper) Destroy(ctx context.Context, ptr resource.Pointer, opts ...state.DestroyOption) error { - return wrapper.st.Destroy(ctx, ptr, opts...) + wrapper.lock.Lock() + defer wrapper.lock.Unlock() + + var cached resource.Resource + + if wrapper.cache.IsHandled(ptr.Namespace(), ptr.Type()) { + var err error + if cached, err = wrapper.cache.Get(ctx, ptr); err != nil { + return err + } + } + + if err := wrapper.st.Destroy(ctx, ptr, opts...); err != nil { + if cached != nil && state.IsNotFoundError(err) { + wrapper.cache.CacheRemoveByPointer(cached.Metadata()) + } + + return err + } + + if cached != nil { + wrapper.cache.CacheRemoveByPointer(cached.Metadata()) + } + + return nil } // Watch state of a resource by type. diff --git a/pkg/controller/runtime/runtime_test.go b/pkg/controller/runtime/runtime_test.go index 71f07cd..86dc47e 100644 --- a/pkg/controller/runtime/runtime_test.go +++ b/pkg/controller/runtime/runtime_test.go @@ -18,6 +18,7 @@ import ( suiterunner "github.com/stretchr/testify/suite" "go.uber.org/goleak" "go.uber.org/zap/zaptest" + "golang.org/x/sync/errgroup" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" @@ -27,7 +28,10 @@ import ( "github.com/cosi-project/runtime/pkg/controller/runtime/options" "github.com/cosi-project/runtime/pkg/future" "github.com/cosi-project/runtime/pkg/resource" + "github.com/cosi-project/runtime/pkg/resource/meta" "github.com/cosi-project/runtime/pkg/resource/protobuf" + "github.com/cosi-project/runtime/pkg/resource/rtestutils" + "github.com/cosi-project/runtime/pkg/resource/typed" "github.com/cosi-project/runtime/pkg/safe" "github.com/cosi-project/runtime/pkg/state" stateconformance "github.com/cosi-project/runtime/pkg/state/conformance" @@ -304,3 +308,143 @@ func TestRuntimeCachedState(t *testing.T) { require.NoError(t, <-errCh) } + +func TestNoStaleReadOnUpdate(t *testing.T) { + test := func(ctx context.Context) { + ctx, cancel := context.WithCancel(ctx) + + st := state.WrapCore(namespaced.NewState(inmem.Build)) + + logger := zaptest.NewLogger(t) + rt, err := runtime.NewRuntime(st, logger, options.WithCachedResource("default", "TestType")) + + require.NoError(t, err) + + cachedState := state.WrapCore(rt.CachedState()) + + var eg errgroup.Group + + eg.Go(func() error { + return rt.Run(ctx) + }) + + testRes := newTest("default", "test") + testRes.TypedSpec().Var = 2 + + require.NoError(t, cachedState.Create(ctx, testRes)) + + rtestutils.AssertResource(ctx, t, cachedState, "test", func(res *testResource, assertion *assert.Assertions) { + assertion.Equal(2, res.TypedSpec().Var) + }) + + // get out of maintenance + _, err = safe.StateUpdateWithConflicts(ctx, cachedState, testRes.Metadata(), func(res *testResource) error { + res.TypedSpec().Var = 1 + + return nil + }) + require.NoError(t, err) + + // should be visible in the actual state + rtestutils.AssertResource(ctx, t, st, "test", func(res *testResource, assertion *assert.Assertions) { + assertion.Equal(1, res.TypedSpec().Var) + }) + + rtestutils.AssertResource(ctx, t, cachedState, "test", func(res *testResource, assertion *assert.Assertions) { + assertion.Equal(1, res.TypedSpec().Var) + }) + + cancel() + + require.NoError(t, eg.Wait()) + } + + defer goleak.VerifyNone(t, goleak.IgnoreCurrent()) + + ctx, cancel := context.WithTimeout(t.Context(), 10*time.Second) + defer cancel() + + for range 1000 { + test(ctx) + } +} + +func TestNoStaleReadOnDestroy(t *testing.T) { + test := func(ctx context.Context) { + ctx, cancel := context.WithCancel(ctx) + st := state.WrapCore(namespaced.NewState(inmem.Build)) + + logger := zaptest.NewLogger(t) + rt, err := runtime.NewRuntime(st, logger, options.WithCachedResource("default", "TestType")) + + require.NoError(t, err) + + cachedState := state.WrapCore(rt.CachedState()) + + var eg errgroup.Group + + eg.Go(func() error { + return rt.Run(ctx) + }) + + testRes := newTest("default", "test") + testRes.TypedSpec().Var = 2 + + require.NoError(t, cachedState.Create(ctx, testRes)) + + rtestutils.AssertResource(ctx, t, cachedState, "test", func(res *testResource, assertion *assert.Assertions) { + assertion.Equal(2, res.TypedSpec().Var) + }) + + // get out of maintenance + require.NoError(t, cachedState.Destroy(ctx, testRes.Metadata())) + + // should be gone from the actual state + rtestutils.AssertNoResource[*testResource](ctx, t, st, "test") + + // rtestutils.AssertResource - FAILS! + _, err = cachedState.Get(ctx, testRes.Metadata()) + require.Error(t, err) + assert.True(t, state.IsNotFoundError(err)) + + cancel() + require.NoError(t, eg.Wait()) + } + + defer goleak.VerifyNone(t, goleak.IgnoreCurrent()) + + ctx, cancel := context.WithTimeout(t.Context(), 10*time.Second) + defer cancel() + + for range 1000 { + test(ctx) + } +} + +type TestSpec struct { + Var int +} + +func (t TestSpec) DeepCopy() TestSpec { + return t +} + +type testResource = typed.Resource[TestSpec, TestExtension] + +var _ resource.Resource = (*testResource)(nil) + +func newTest(ns, id string) *testResource { + return typed.NewResource[TestSpec, TestExtension]( + resource.NewMetadata(ns, "TestType", id, resource.VersionUndefined), + TestSpec{}, + ) +} + +type TestExtension struct{} + +func (TestExtension) ResourceDefinition() meta.ResourceDefinitionSpec { + return meta.ResourceDefinitionSpec{ + Type: "TestType", + DefaultNamespace: "default", + } +}