diff --git a/pkg/storer/internal/pinning/pinning.go b/pkg/storer/internal/pinning/pinning.go
index 04689a7eec6..01abe264fc8 100644
--- a/pkg/storer/internal/pinning/pinning.go
+++ b/pkg/storer/internal/pinning/pinning.go
@@ -207,6 +207,32 @@ func HasPin(st storage.Reader, root swarm.Address) (bool, error) {
 	return has, nil
 }
 
+// GetCollectionUUIDs returns all collection UUIDs from pin collections.
+func GetCollectionUUIDs(st storage.Reader) ([][]byte, error) {
+	var collectionUUIDs [][]byte
+	err := st.Iterate(storage.Query{
+		Factory: func() storage.Item { return &pinCollectionItem{} },
+	}, func(r storage.Result) (bool, error) {
+		collection := r.Entry.(*pinCollectionItem)
+		collectionUUIDs = append(collectionUUIDs, collection.UUID)
+		return false, nil
+	})
+	if err != nil {
+		return nil, fmt.Errorf("pin store: failed getting collections: %w", err)
+	}
+	return collectionUUIDs, nil
+}
+
+// IsChunkPinnedInCollection checks if a chunk address is pinned under the given collection UUID.
+func IsChunkPinnedInCollection(st storage.Reader, chunkAddr swarm.Address, uuid []byte) (bool, error) {
+	chunkItem := &pinChunkItem{UUID: uuid, Addr: chunkAddr}
+	has, err := st.Has(chunkItem)
+	if err != nil {
+		return false, fmt.Errorf("pin store: failed checking chunk pin status: %w", err)
+	}
+	return has, nil
+}
+
 // Pins lists all the added pinning collections.
 func Pins(st storage.Reader) ([]swarm.Address, error) {
 	pins := make([]swarm.Address, 0)
diff --git a/pkg/storer/internal/reserve/reserve.go b/pkg/storer/internal/reserve/reserve.go
index 80e301e72a8..67a0043ccf5 100644
--- a/pkg/storer/internal/reserve/reserve.go
+++ b/pkg/storer/internal/reserve/reserve.go
@@ -19,6 +19,7 @@ import (
 	"github.com/ethersphere/bee/v2/pkg/postage"
 	"github.com/ethersphere/bee/v2/pkg/storage"
 	"github.com/ethersphere/bee/v2/pkg/storer/internal/chunkstamp"
+	pinstore "github.com/ethersphere/bee/v2/pkg/storer/internal/pinning"
 	"github.com/ethersphere/bee/v2/pkg/storer/internal/stampindex"
 	"github.com/ethersphere/bee/v2/pkg/storer/internal/transaction"
 	"github.com/ethersphere/bee/v2/pkg/swarm"
@@ -327,6 +328,7 @@ func (r *Reserve) Get(ctx context.Context, addr swarm.Address, batchID []byte, s
 }
 
 // EvictBatchBin evicts all chunks from bins upto the bin provided.
+// Pinned chunks are evicted from the reserve indexes only; their data is kept in the chunkstore so pinned content stays retrievable.
 func (r *Reserve) EvictBatchBin(
 	ctx context.Context,
 	batchID []byte,
@@ -336,13 +338,21 @@ func (r *Reserve) EvictBatchBin(
 	r.multx.Lock(string(batchID))
 	defer r.multx.Unlock(string(batchID))
 
-	var evicteditems []*BatchRadiusItem
+	var (
+		evictedItems       []*BatchRadiusItem
+		pinnedEvictedItems []*BatchRadiusItem
+	)
 
 	if count <= 0 {
 		return 0, nil
 	}
 
-	err := r.st.IndexStore().Iterate(storage.Query{
+	pinUUIDs, err := pinstore.GetCollectionUUIDs(r.st.IndexStore())
+	if err != nil {
+		return 0, err
+	}
+
+	err = r.st.IndexStore().Iterate(storage.Query{
 		Factory: func() storage.Item { return &BatchRadiusItem{} },
 		Prefix:  string(batchID),
 	}, func(res storage.Result) (bool, error) {
@@ -350,7 +360,24 @@ func (r *Reserve) EvictBatchBin(
 		if batchRadius.Bin >= bin {
 			return true, nil
 		}
-		evicteditems = append(evicteditems, batchRadius)
+
+		// Check if the chunk is pinned in any collection
+		pinned := false
+		for _, uuid := range pinUUIDs {
+			has, err := pinstore.IsChunkPinnedInCollection(r.st.IndexStore(), batchRadius.Address, uuid)
+			if err != nil {
+				return true, err
+			}
+			if has {
+				pinned = true
+				pinnedEvictedItems = append(pinnedEvictedItems, batchRadius)
+				break
+			}
+		}
+
+		if !pinned {
+			evictedItems = append(evictedItems, batchRadius)
+		}
 		count--
 		if count == 0 {
 			return true, nil
 		}
@@ -366,7 +393,7 @@
 
 	var evicted atomic.Int64
 
-	for _, item := range evicteditems {
+	for _, item := range evictedItems {
 		func(item *BatchRadiusItem) {
 			eg.Go(func() error {
 				err := r.st.Run(ctx, func(s transaction.Store) error {
@@ -381,6 +408,21 @@
 		}(item)
 	}
 
+	for _, item := range pinnedEvictedItems {
+		func(item *BatchRadiusItem) {
+			eg.Go(func() error {
+				err := r.st.Run(ctx, func(s transaction.Store) error {
+					return RemoveChunkMetaData(ctx, s, item)
+				})
+				if err != nil {
+					return err
+				}
+				evicted.Add(1)
+				return nil
+			})
+		}(item)
+	}
+
 	err = eg.Wait()
 
 	r.size.Add(-evicted.Load())
@@ -430,6 +472,29 @@
 	)
 }
 
+// RemoveChunkMetaData removes a chunk's reserve metadata from the reserve indexes but keeps the chunk in the chunkstore.
+// It is used when evicting pinned data.
+func RemoveChunkMetaData(
+	ctx context.Context,
+	trx transaction.Store,
+	item *BatchRadiusItem,
+) error {
+	var errs error
+
+	stamp, _ := chunkstamp.LoadWithStampHash(trx.IndexStore(), reserveScope, item.Address, item.StampHash)
+	if stamp != nil {
+		errs = errors.Join(
+			stampindex.Delete(trx.IndexStore(), reserveScope, stamp),
+			chunkstamp.DeleteWithStamp(trx.IndexStore(), reserveScope, item.Address, stamp),
+		)
+	}
+
+	return errors.Join(errs,
+		trx.IndexStore().Delete(item),
+		trx.IndexStore().Delete(&ChunkBinItem{Bin: item.Bin, BinID: item.BinID}),
+	)
+}
+
 func (r *Reserve) IterateBin(bin uint8, startBinID uint64, cb func(swarm.Address, uint64, []byte, []byte) (bool, error)) error {
 	err := r.st.IndexStore().Iterate(storage.Query{
 		Factory: func() storage.Item { return &ChunkBinItem{} },
diff --git a/pkg/storer/internal/reserve/reserve_test.go b/pkg/storer/internal/reserve/reserve_test.go
index 95218e992f6..c6d682a7171 100644
--- a/pkg/storer/internal/reserve/reserve_test.go
+++ b/pkg/storer/internal/reserve/reserve_test.go
@@ -22,6 +22,7 @@ import (
 	chunk "github.com/ethersphere/bee/v2/pkg/storage/testing"
 	"github.com/ethersphere/bee/v2/pkg/storer/internal"
 	"github.com/ethersphere/bee/v2/pkg/storer/internal/chunkstamp"
+	pinstore "github.com/ethersphere/bee/v2/pkg/storer/internal/pinning"
 	"github.com/ethersphere/bee/v2/pkg/storer/internal/reserve"
 	"github.com/ethersphere/bee/v2/pkg/storer/internal/stampindex"
 	"github.com/ethersphere/bee/v2/pkg/storer/internal/transaction"
@@ -966,6 +967,139 @@ func TestReset(t *testing.T) {
 	}
 }
 
+// TestEvictRemovesPinnedContent checks that batch eviction removes pinned chunks from the reserve indexes but keeps them in the chunkstore.
+func TestEvictRemovesPinnedContent(t *testing.T) {
+	t.Parallel()
+
+	const (
+		numChunks       = 5
+		numPinnedChunks = 3
+	)
+
+	ctx := context.Background()
+	baseAddr := swarm.RandAddress(t)
+	ts := internal.NewInmemStorage()
+
+	r, err := reserve.New(
+		baseAddr,
+		ts,
+		0,
+		kademlia.NewTopologyDriver(),
+		log.Noop,
+	)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	batch := postagetesting.MustNewBatch()
+
+	chunks := make([]swarm.Chunk, numChunks)
+	for i := 0; i < numChunks; i++ {
+		ch := chunk.GenerateTestRandomChunkAt(t, baseAddr, 0).WithStamp(postagetesting.MustNewBatchStamp(batch.ID))
+		chunks[i] = ch
+
+		err := r.Put(ctx, ch)
+		if err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	var pinningPutter internal.PutterCloserWithReference
+	err = ts.Run(ctx, func(store transaction.Store) error {
+		pinningPutter, err = pinstore.NewCollection(store.IndexStore())
+		return err
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// Add chunks to pin collection
+	pinnedChunks := chunks[:numPinnedChunks]
+	for _, ch := range pinnedChunks {
+		err = ts.Run(ctx, func(s transaction.Store) error {
+			return pinningPutter.Put(ctx, s, ch)
+		})
+		if err != nil {
+			t.Fatal(err)
+		}
+	}
+	err = ts.Run(ctx, func(s transaction.Store) error {
+		return pinningPutter.Close(s.IndexStore(), pinnedChunks[0].Address())
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// Evict all chunks from this batch - pinned chunks must be removed from the reserve but stay in the chunkstore
+	evicted, err := r.EvictBatchBin(ctx, batch.ID, numChunks, swarm.MaxBins)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if evicted != numChunks {
+		t.Fatalf("expected %d evicted chunks, got %d", numChunks, evicted)
+	}
+
+	uuids, err := pinstore.GetCollectionUUIDs(ts.IndexStore())
+	if err != nil {
+		t.Fatal(err)
+	}
+	if len(uuids) != 1 {
+		t.Fatalf("expected exactly one pin collection, but found %d", len(uuids))
+	}
+
+	for i, ch := range chunks {
+		stampHash, err := ch.Stamp().Hash()
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		// Try to get the chunk from reserve, error is checked later
+		_, err = r.Get(ctx, ch.Address(), ch.Stamp().BatchID(), stampHash)
+
+		// Also try to get chunk directly from chunkstore (like bzz/bytes endpoints do)
+		_, chunkStoreErr := ts.ChunkStore().Get(ctx, ch.Address())
+
+		pinned := false
+		for _, uuid := range uuids {
+			has, err := pinstore.IsChunkPinnedInCollection(ts.IndexStore(), ch.Address(), uuid)
+			if err != nil {
+				t.Fatal(err)
+			}
+			if has {
+				pinned = true
+			}
+		}
+
+		if i < len(pinnedChunks) {
+			if pinned {
+				// This chunk is pinned, so it should NOT be accessible from reserve but SHOULD be accessible from chunkstore
+				if !errors.Is(err, storage.ErrNotFound) {
+					t.Errorf("Pinned chunk %s should have been evicted from reserve", ch.Address())
+				}
+				if errors.Is(chunkStoreErr, storage.ErrNotFound) {
+					t.Errorf("Pinned chunk %s was deleted from chunkstore - should remain retrievable!", ch.Address())
+				} else if chunkStoreErr != nil {
+					t.Fatal(chunkStoreErr)
+				}
+			} else {
+				t.Errorf("Chunk %s should be pinned", ch.Address())
+			}
+		} else { // unpinned chunks
+			if !pinned {
+				// Unpinned chunks should be completely evicted (both reserve and chunkstore)
+				if !errors.Is(err, storage.ErrNotFound) {
+					t.Errorf("Unpinned chunk %s should have been evicted from reserve", ch.Address())
+				}
+				if !errors.Is(chunkStoreErr, storage.ErrNotFound) {
+					t.Errorf("Unpinned chunk %s should have been evicted from chunkstore", ch.Address())
+				}
+			} else {
+				t.Errorf("Chunk %s should not be pinned", ch.Address())
+			}
+		}
+	}
+}
+
 func checkStore(t *testing.T, s storage.Reader, k storage.Key, gone bool) {
 	t.Helper()
 	h, err := s.Has(k)