Skip to content

Commit 9aa4505

Browse files
Thomas StrombergThomas Stromberg
authored andcommitted
add cache flush API
1 parent 3893c2b commit 9aa4505

File tree

12 files changed

+583
-0
lines changed

12 files changed

+583
-0
lines changed

cache.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,25 @@ func (c *Cache[K, V]) Cleanup() int {
235235
return c.memory.cleanupMemory()
236236
}
237237

238+
// Flush removes all entries from the cache, including persistent storage.
239+
// Returns the total number of entries removed from memory and persistence.
240+
func (c *Cache[K, V]) Flush(ctx context.Context) (int, error) {
241+
memoryRemoved := c.memory.flushMemory()
242+
243+
if c.persist == nil {
244+
slog.Info("cache flushed", "memory", memoryRemoved, "persist", 0)
245+
return memoryRemoved, nil
246+
}
247+
248+
persistRemoved, err := c.persist.Flush(ctx)
249+
if err != nil {
250+
return memoryRemoved, fmt.Errorf("persistence flush failed: %w", err)
251+
}
252+
253+
slog.Info("cache flushed", "memory", memoryRemoved, "persist", persistRemoved)
254+
return memoryRemoved + persistRemoved, nil
255+
}
256+
238257
// Len returns the number of items in the memory cache.
239258
func (c *Cache[K, V]) Len() int {
240259
return c.memory.memoryLen()

cache_persist_test.go

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,15 @@ func (m *mockPersister[K, V]) Location(key K) string {
149149
return fmt.Sprintf("mock://%v", key)
150150
}
151151

152+
func (m *mockPersister[K, V]) Flush(ctx context.Context) (int, error) {
153+
m.mu.Lock()
154+
defer m.mu.Unlock()
155+
156+
count := len(m.data)
157+
m.data = make(map[string]mockEntry[V])
158+
return count, nil
159+
}
160+
152161
func (m *mockPersister[K, V]) Close() error {
153162
m.mu.Lock()
154163
defer m.mu.Unlock()
@@ -647,3 +656,66 @@ func TestCache_CleanupExpiredEntries(t *testing.T) {
647656
t.Error("valid entry should still exist after cleanup")
648657
}
649658
}
659+
660+
func TestCache_FlushWithPersistence(t *testing.T) {
661+
ctx := context.Background()
662+
persister := newMockPersister[string, int]()
663+
664+
cache, err := New[string, int](ctx, WithPersistence(persister))
665+
if err != nil {
666+
t.Fatalf("New: %v", err)
667+
}
668+
defer func() { _ = cache.Close() }() //nolint:errcheck // Test cleanup
669+
670+
// Add entries
671+
for i := range 10 {
672+
if err := cache.Set(ctx, fmt.Sprintf("key%d", i), i*100, 0); err != nil {
673+
t.Fatalf("Set: %v", err)
674+
}
675+
}
676+
677+
// Verify entries exist in both memory and persistence
678+
if cache.Len() != 10 {
679+
t.Errorf("memory cache length = %d; want 10", cache.Len())
680+
}
681+
for i := range 10 {
682+
if _, _, found, err := persister.Load(ctx, fmt.Sprintf("key%d", i)); err != nil || !found {
683+
t.Fatalf("key%d should exist in persistence", i)
684+
}
685+
}
686+
687+
// Flush
688+
removed, err := cache.Flush(ctx)
689+
if err != nil {
690+
t.Fatalf("Flush: %v", err)
691+
}
692+
// Should remove 10 from memory + 10 from persistence = 20 total
693+
if removed != 20 {
694+
t.Errorf("Flush removed %d items; want 20", removed)
695+
}
696+
697+
// Memory cache should be empty
698+
if cache.Len() != 0 {
699+
t.Errorf("memory cache length after flush = %d; want 0", cache.Len())
700+
}
701+
702+
// Persistence should be empty
703+
for i := range 10 {
704+
if _, _, found, err := persister.Load(ctx, fmt.Sprintf("key%d", i)); err != nil {
705+
t.Fatalf("Load: %v", err)
706+
} else if found {
707+
t.Errorf("key%d should not exist in persistence after flush", i)
708+
}
709+
}
710+
711+
// Get should return not found for all keys
712+
for i := range 10 {
713+
_, found, err := cache.Get(ctx, fmt.Sprintf("key%d", i))
714+
if err != nil {
715+
t.Fatalf("Get: %v", err)
716+
}
717+
if found {
718+
t.Errorf("key%d should not be found after flush", i)
719+
}
720+
}
721+
}

cache_test.go

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package bdcache
22

33
import (
44
"context"
5+
"fmt"
56
"sync"
67
"testing"
78
"time"
@@ -625,3 +626,66 @@ func TestCache_SetUpdateExisting(t *testing.T) {
625626
t.Errorf("Get value = %d; want 100", val)
626627
}
627628
}
629+
630+
func TestCache_Flush(t *testing.T) {
631+
ctx := context.Background()
632+
cache, err := New[string, int](ctx)
633+
if err != nil {
634+
t.Fatalf("New: %v", err)
635+
}
636+
defer func() { _ = cache.Close() }() //nolint:errcheck // Test cleanup
637+
638+
// Add entries
639+
for i := range 10 {
640+
if err := cache.Set(ctx, fmt.Sprintf("key%d", i), i, 0); err != nil {
641+
t.Fatalf("Set: %v", err)
642+
}
643+
}
644+
645+
if cache.Len() != 10 {
646+
t.Errorf("cache length = %d; want 10", cache.Len())
647+
}
648+
649+
// Flush
650+
removed, err := cache.Flush(ctx)
651+
if err != nil {
652+
t.Fatalf("Flush: %v", err)
653+
}
654+
if removed != 10 {
655+
t.Errorf("Flush removed %d items; want 10", removed)
656+
}
657+
658+
// Cache should be empty
659+
if cache.Len() != 0 {
660+
t.Errorf("cache length after flush = %d; want 0", cache.Len())
661+
}
662+
663+
// All keys should be gone
664+
for i := range 10 {
665+
_, found, err := cache.Get(ctx, fmt.Sprintf("key%d", i))
666+
if err != nil {
667+
t.Fatalf("Get: %v", err)
668+
}
669+
if found {
670+
t.Errorf("key%d should not be found after flush", i)
671+
}
672+
}
673+
}
674+
675+
func TestCache_FlushEmpty(t *testing.T) {
676+
ctx := context.Background()
677+
cache, err := New[string, int](ctx)
678+
if err != nil {
679+
t.Fatalf("New: %v", err)
680+
}
681+
defer func() { _ = cache.Close() }() //nolint:errcheck // Test cleanup
682+
683+
// Flush empty cache
684+
removed, err := cache.Flush(ctx)
685+
if err != nil {
686+
t.Fatalf("Flush: %v", err)
687+
}
688+
if removed != 0 {
689+
t.Errorf("Flush removed %d items; want 0", removed)
690+
}
691+
}

persist.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@ type PersistenceLayer[K comparable, V any] interface {
3939
// Useful for testing and debugging to verify where items are stored.
4040
Location(key K) string
4141

42+
// Flush removes all entries from persistent storage.
43+
// Returns the number of entries removed and any error.
44+
Flush(ctx context.Context) (int, error)
45+
4246
// Close releases any resources held by the persistence layer.
4347
Close() error
4448
}

persist/datastore/datastore.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -281,6 +281,31 @@ func (p *persister[K, V]) Cleanup(ctx context.Context, maxAge time.Duration) (in
281281
return len(keys), nil
282282
}
283283

284+
// Flush removes all entries from Datastore.
285+
// Returns the number of entries removed and any error.
286+
func (p *persister[K, V]) Flush(ctx context.Context) (int, error) {
287+
// Query for all keys
288+
query := ds.NewQuery(p.kind).KeysOnly()
289+
290+
var entries []datastoreEntry
291+
keys, err := p.client.GetAll(ctx, query, &entries)
292+
if err != nil {
293+
return 0, fmt.Errorf("query all entries: %w", err)
294+
}
295+
296+
if len(keys) == 0 {
297+
return 0, nil
298+
}
299+
300+
// Batch delete all entries
301+
if err := p.client.DeleteMulti(ctx, keys); err != nil {
302+
return 0, fmt.Errorf("delete all entries: %w", err)
303+
}
304+
305+
slog.Info("flushed datastore cache", "count", len(keys), "kind", p.kind)
306+
return len(keys), nil
307+
}
308+
284309
// Close releases Datastore client resources.
285310
func (p *persister[K, V]) Close() error {
286311
return p.client.Close()

persist/datastore/persist_datastore_mock_test.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package datastore
33
import (
44
"context"
55
"errors"
6+
"fmt"
67
"testing"
78
"time"
89

@@ -770,3 +771,61 @@ func TestDatastorePersist_Mock_MultipleOps(t *testing.T) {
770771
t.Error("After Delete: key should not be found")
771772
}
772773
}
774+
775+
func TestDatastorePersist_Mock_Flush(t *testing.T) {
776+
dp, cleanup := newMockDatastorePersist[string, int](t)
777+
defer cleanup()
778+
779+
ctx := context.Background()
780+
781+
// Store multiple entries
782+
for i := range 10 {
783+
key := fmt.Sprintf("key%d", i)
784+
if err := dp.Store(ctx, key, i*100, time.Time{}); err != nil {
785+
t.Fatalf("Store %s: %v", key, err)
786+
}
787+
}
788+
789+
// Verify entries exist
790+
for i := range 10 {
791+
key := fmt.Sprintf("key%d", i)
792+
if _, _, found, err := dp.Load(ctx, key); err != nil || !found {
793+
t.Fatalf("%s should exist before flush", key)
794+
}
795+
}
796+
797+
// Flush
798+
deleted, err := dp.Flush(ctx)
799+
if err != nil {
800+
t.Fatalf("Flush: %v", err)
801+
}
802+
if deleted != 10 {
803+
t.Errorf("Flush deleted %d entries; want 10", deleted)
804+
}
805+
806+
// All entries should be gone
807+
for i := range 10 {
808+
key := fmt.Sprintf("key%d", i)
809+
if _, _, found, err := dp.Load(ctx, key); err != nil {
810+
t.Fatalf("Load: %v", err)
811+
} else if found {
812+
t.Errorf("%s should not exist after flush", key)
813+
}
814+
}
815+
}
816+
817+
func TestDatastorePersist_Mock_FlushEmpty(t *testing.T) {
818+
dp, cleanup := newMockDatastorePersist[string, int](t)
819+
defer cleanup()
820+
821+
ctx := context.Background()
822+
823+
// Flush empty datastore
824+
deleted, err := dp.Flush(ctx)
825+
if err != nil {
826+
t.Fatalf("Flush: %v", err)
827+
}
828+
if deleted != 0 {
829+
t.Errorf("Flush deleted %d entries from empty datastore; want 0", deleted)
830+
}
831+
}

persist/localfs/integration_test.go

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -902,3 +902,102 @@ func TestFilePersist_Cleanup_ContextCancellation(t *testing.T) {
902902
t.Errorf("expected context.Canceled error, got: %v", err)
903903
}
904904
}
905+
906+
func TestFilePersist_Flush(t *testing.T) {
907+
dir := t.TempDir()
908+
fp, err := New[string, int]("test", dir)
909+
if err != nil {
910+
t.Fatalf("New: %v", err)
911+
}
912+
defer func() {
913+
if err := fp.Close(); err != nil {
914+
t.Logf("Close error: %v", err)
915+
}
916+
}()
917+
918+
ctx := context.Background()
919+
920+
// Store multiple entries
921+
for i := range 10 {
922+
if err := fp.Store(ctx, fmt.Sprintf("key-%d", i), i*100, time.Time{}); err != nil {
923+
t.Fatalf("Store: %v", err)
924+
}
925+
}
926+
927+
// Verify files exist
928+
for i := range 10 {
929+
if _, _, found, err := fp.Load(ctx, fmt.Sprintf("key-%d", i)); err != nil || !found {
930+
t.Fatalf("key-%d should exist before flush", i)
931+
}
932+
}
933+
934+
// Flush
935+
deleted, err := fp.Flush(ctx)
936+
if err != nil {
937+
t.Fatalf("Flush: %v", err)
938+
}
939+
if deleted != 10 {
940+
t.Errorf("Flush deleted %d entries; want 10", deleted)
941+
}
942+
943+
// All entries should be gone
944+
for i := range 10 {
945+
if _, _, found, err := fp.Load(ctx, fmt.Sprintf("key-%d", i)); err != nil {
946+
t.Fatalf("Load: %v", err)
947+
} else if found {
948+
t.Errorf("key-%d should not exist after flush", i)
949+
}
950+
}
951+
}
952+
953+
func TestFilePersist_Flush_Empty(t *testing.T) {
954+
dir := t.TempDir()
955+
fp, err := New[string, int]("test", dir)
956+
if err != nil {
957+
t.Fatalf("New: %v", err)
958+
}
959+
defer func() {
960+
if err := fp.Close(); err != nil {
961+
t.Logf("Close error: %v", err)
962+
}
963+
}()
964+
965+
// Flush empty cache
966+
deleted, err := fp.Flush(context.Background())
967+
if err != nil {
968+
t.Fatalf("Flush: %v", err)
969+
}
970+
if deleted != 0 {
971+
t.Errorf("Flush deleted %d entries; want 0", deleted)
972+
}
973+
}
974+
975+
func TestFilePersist_Flush_ContextCancellation(t *testing.T) {
976+
dir := t.TempDir()
977+
fp, err := New[string, int]("test", dir)
978+
if err != nil {
979+
t.Fatalf("New: %v", err)
980+
}
981+
defer func() {
982+
if err := fp.Close(); err != nil {
983+
t.Logf("Close error: %v", err)
984+
}
985+
}()
986+
987+
// Store many entries
988+
for i := range 100 {
989+
if err := fp.Store(context.Background(), fmt.Sprintf("key-%d", i), i, time.Time{}); err != nil {
990+
t.Fatalf("Store: %v", err)
991+
}
992+
}
993+
994+
// Create context that we'll cancel
995+
ctx, cancel := context.WithCancel(context.Background())
996+
cancel()
997+
998+
// Try to flush
999+
_, err = fp.Flush(ctx)
1000+
if err == nil || !errors.Is(err, context.Canceled) {
1001+
t.Errorf("expected context.Canceled error, got: %v", err)
1002+
}
1003+
}

0 commit comments

Comments
 (0)