26 changes: 26 additions & 0 deletions pkg/storer/internal/pinning/pinning.go
@@ -207,6 +207,32 @@ func HasPin(st storage.Reader, root swarm.Address) (bool, error) {
return has, nil
}

// GetCollectionUUIDs returns all collection UUIDs from pin collections.
func GetCollectionUUIDs(st storage.Reader) ([][]byte, error) {
var collectionUUIDs [][]byte
err := st.Iterate(storage.Query{
Factory: func() storage.Item { return &pinCollectionItem{} },
}, func(r storage.Result) (bool, error) {
collection := r.Entry.(*pinCollectionItem)
collectionUUIDs = append(collectionUUIDs, collection.UUID)
return false, nil
})
if err != nil {
return nil, fmt.Errorf("pin store: failed getting collections: %w", err)
}
return collectionUUIDs, nil
}

// IsChunkPinnedInCollection checks if a chunk address is pinned under the given collection uuid.
func IsChunkPinnedInCollection(st storage.Reader, chunkAddr swarm.Address, uuid []byte) (bool, error) {
chunkItem := &pinChunkItem{UUID: uuid, Addr: chunkAddr}
has, err := st.Has(chunkItem)
if err != nil {
return false, fmt.Errorf("pin store: failed checking chunk pin status: %w", err)
}
return has, nil
}

// Pins lists all the added pinning collections.
func Pins(st storage.Reader) ([]swarm.Address, error) {
pins := make([]swarm.Address, 0)
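Taken together, the two new helpers answer the question "is this address pinned in any collection?", which is how EvictBatchBin uses them in reserve.go below. A minimal sketch of that lookup pattern, assuming the same imports as reserve.go in this diff (pinstore, storage, swarm); the wrapper function name is illustrative only:

// isPinnedAnywhere reports whether addr belongs to any pin collection.
// It mirrors the lookup pattern EvictBatchBin uses in this PR: fetch all
// collection UUIDs once, then probe each collection for the address.
func isPinnedAnywhere(st storage.Reader, addr swarm.Address) (bool, error) {
	uuids, err := pinstore.GetCollectionUUIDs(st)
	if err != nil {
		return false, err
	}
	for _, uuid := range uuids {
		has, err := pinstore.IsChunkPinnedInCollection(st, addr, uuid)
		if err != nil {
			return false, err
		}
		if has {
			return true, nil
		}
	}
	return false, nil
}

Note that the cost of this lookup grows linearly with the number of collections, which is the concern raised in the review thread below.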
73 changes: 69 additions & 4 deletions pkg/storer/internal/reserve/reserve.go
@@ -19,6 +19,7 @@ import (
"github.com/ethersphere/bee/v2/pkg/postage"
"github.com/ethersphere/bee/v2/pkg/storage"
"github.com/ethersphere/bee/v2/pkg/storer/internal/chunkstamp"
pinstore "github.com/ethersphere/bee/v2/pkg/storer/internal/pinning"
"github.com/ethersphere/bee/v2/pkg/storer/internal/stampindex"
"github.com/ethersphere/bee/v2/pkg/storer/internal/transaction"
"github.com/ethersphere/bee/v2/pkg/swarm"
@@ -327,6 +328,7 @@ func (r *Reserve) Get(ctx context.Context, addr swarm.Address, batchID []byte, s
}

// EvictBatchBin evicts all chunks from bins up to the bin provided.
// Pinned chunks are protected from eviction to maintain data integrity.
func (r *Reserve) EvictBatchBin(
ctx context.Context,
batchID []byte,
@@ -336,21 +338,46 @@ func (r *Reserve) EvictBatchBin(
r.multx.Lock(string(batchID))
defer r.multx.Unlock(string(batchID))

var evicteditems []*BatchRadiusItem
var (
evictedItems []*BatchRadiusItem
pinnedEvictedItems []*BatchRadiusItem
Comment on lines +342 to +343

@gacevicljubisa (Member) commented on Sep 29, 2025:

What do you think about renaming these two collections and adding comments, something like:

    nonPinnedItems []*BatchRadiusItem  // Fully evicted from reserve + chunkstore
    pinnedItems    []*BatchRadiusItem  // Only evicted from reserve, kept in chunkstore

)

if count <= 0 {
return 0, nil
}

err := r.st.IndexStore().Iterate(storage.Query{
pinUuids, err := pinstore.GetCollectionUUIDs(r.st.IndexStore())
if err != nil {
return 0, err
}

err = r.st.IndexStore().Iterate(storage.Query{
Factory: func() storage.Item { return &BatchRadiusItem{} },
Prefix: string(batchID),
}, func(res storage.Result) (bool, error) {
batchRadius := res.Entry.(*BatchRadiusItem)
if batchRadius.Bin >= bin {
return true, nil
}
evicteditems = append(evicteditems, batchRadius)

// Check if the chunk is pinned in any collection
pinned := false
for _, uuid := range pinUuids {
Member commented:

This will slow down batch eviction linearly with the number of collections. Could the collection UUID be stored as a field in BatchRadiusItem so that all of these lookups (the Has calls) are avoided? From what I can see, that would add some complexity to keep the BatchRadiusItem and pinChunkItem keys in sync, but if the performance degradation is significant it may be necessary. It is OK to start with this simpler solution.

Member Author replied:

That is a redundant-information approach, but certainly a faster one. IMO this is still good enough: if you don't use the pinning feature, you won't notice any performance difference. It is questionable whether the feature is used enough to justify making the underlying DB structure redundant.
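For illustration only, a minimal sketch of the field-based alternative discussed above; the PinUUID field is hypothetical and does not exist in this PR:

// Hypothetical: if BatchRadiusItem recorded its pin-collection UUID when the
// chunk was pinned, the eviction loop could classify items without any
// per-collection Has lookups.
if len(batchRadius.PinUUID) > 0 { // PinUUID is an assumed field, not part of this PR
	pinnedEvictedItems = append(pinnedEvictedItems, batchRadius)
} else {
	evictedItems = append(evictedItems, batchRadius)
}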

has, err := pinstore.IsChunkPinnedInCollection(r.st.IndexStore(), batchRadius.Address, uuid)
if err != nil {
return true, err
}
if has {
pinned = true
pinnedEvictedItems = append(pinnedEvictedItems, batchRadius)
break
}
}

if !pinned {
evictedItems = append(evictedItems, batchRadius)
}
count--
if count == 0 {
return true, nil
@@ -366,7 +393,7 @@

var evicted atomic.Int64

for _, item := range evicteditems {
for _, item := range evictedItems {
func(item *BatchRadiusItem) {
eg.Go(func() error {
err := r.st.Run(ctx, func(s transaction.Store) error {
@@ -381,6 +408,21 @@ }(item)
}(item)
}

for _, item := range pinnedEvictedItems {
func(item *BatchRadiusItem) {
eg.Go(func() error {
err := r.st.Run(ctx, func(s transaction.Store) error {
return RemoveChunkMetaData(ctx, s, item)
Contributor commented:

Why do we want to remove the pinned chunks' metadata? Would this affect retrieval of those pinned chunks under storage-pressure eviction or manual eviction?

})
if err != nil {
return err
}
evicted.Add(1)
return nil
})
}(item)
}

err = eg.Wait()

r.size.Add(-evicted.Load())
@@ -430,6 +472,29 @@ func RemoveChunkWithItem(
)
}

// RemoveChunkMetaData removes a chunk's reserve metadata from the reserve indexes but keeps the chunk in the chunkstore.
// Used when evicting pinned data.
func RemoveChunkMetaData(
ctx context.Context,
trx transaction.Store,
item *BatchRadiusItem,
) error {
var errs error

stamp, _ := chunkstamp.LoadWithStampHash(trx.IndexStore(), reserveScope, item.Address, item.StampHash)
if stamp != nil {
errs = errors.Join(
stampindex.Delete(trx.IndexStore(), reserveScope, stamp),
chunkstamp.DeleteWithStamp(trx.IndexStore(), reserveScope, item.Address, stamp),
)
}

return errors.Join(errs,
trx.IndexStore().Delete(item),
trx.IndexStore().Delete(&ChunkBinItem{Bin: item.Bin, BinID: item.BinID}),
)
}

func (r *Reserve) IterateBin(bin uint8, startBinID uint64, cb func(swarm.Address, uint64, []byte, []byte) (bool, error)) error {
err := r.st.IndexStore().Iterate(storage.Query{
Factory: func() storage.Item { return &ChunkBinItem{} },
134 changes: 134 additions & 0 deletions pkg/storer/internal/reserve/reserve_test.go
@@ -22,6 +22,7 @@ import (
chunk "github.com/ethersphere/bee/v2/pkg/storage/testing"
"github.com/ethersphere/bee/v2/pkg/storer/internal"
"github.com/ethersphere/bee/v2/pkg/storer/internal/chunkstamp"
pinstore "github.com/ethersphere/bee/v2/pkg/storer/internal/pinning"
"github.com/ethersphere/bee/v2/pkg/storer/internal/reserve"
"github.com/ethersphere/bee/v2/pkg/storer/internal/stampindex"
"github.com/ethersphere/bee/v2/pkg/storer/internal/transaction"
@@ -966,6 +967,139 @@ func TestReset(t *testing.T) {
}
}

// TestEvictRemovesPinnedContent checks that pinned chunks are evicted from the reserve but remain retrievable from the chunkstore.
func TestEvictRemovesPinnedContent(t *testing.T) {
t.Parallel()

const (
numChunks = 5
numPinnedChunks = 3
)

ctx := context.Background()
baseAddr := swarm.RandAddress(t)
ts := internal.NewInmemStorage()

r, err := reserve.New(
baseAddr,
ts,
0,
kademlia.NewTopologyDriver(),
log.Noop,
)
if err != nil {
t.Fatal(err)
}

batch := postagetesting.MustNewBatch()

chunks := make([]swarm.Chunk, numChunks)
for i := 0; i < numChunks; i++ {
ch := chunk.GenerateTestRandomChunkAt(t, baseAddr, 0).WithStamp(postagetesting.MustNewBatchStamp(batch.ID))
chunks[i] = ch

err := r.Put(ctx, ch)
if err != nil {
t.Fatal(err)
}
}

var pinningPutter internal.PutterCloserWithReference
err = ts.Run(ctx, func(store transaction.Store) error {
pinningPutter, err = pinstore.NewCollection(store.IndexStore())
return err
})
if err != nil {
t.Fatal(err)
}

// Add chunks to pin collection
pinnedChunks := chunks[:numPinnedChunks]
for _, ch := range pinnedChunks {
err = ts.Run(ctx, func(s transaction.Store) error {
return pinningPutter.Put(ctx, s, ch)
})
if err != nil {
t.Fatal(err)
}
}
err = ts.Run(ctx, func(s transaction.Store) error {
return pinningPutter.Close(s.IndexStore(), pinnedChunks[0].Address())
})
if err != nil {
t.Fatal(err)
}

// Evict all chunks from this batch; this must NOT remove pinned chunks from the chunkstore.
evicted, err := r.EvictBatchBin(ctx, batch.ID, numChunks, swarm.MaxBins)
if err != nil {
t.Fatal(err)
}
if evicted != numChunks {
t.Fatalf("expected %d evicted chunks, got %d", numChunks, evicted)
}

uuids, err := pinstore.GetCollectionUUIDs(ts.IndexStore())
if err != nil {
t.Fatal(err)
}
if len(uuids) != 1 {
t.Fatalf("expected exactly one pin collection, but found %d", len(uuids))
}

for i, ch := range chunks {
stampHash, err := ch.Stamp().Hash()
if err != nil {
t.Fatal(err)
}

// Try to get the chunk from the reserve; the error is checked later.
_, err = r.Get(ctx, ch.Address(), ch.Stamp().BatchID(), stampHash)

// Also try to get chunk directly from chunkstore (like bzz/bytes endpoints do)
_, chunkStoreErr := ts.ChunkStore().Get(ctx, ch.Address())

pinned := false
for _, uuid := range uuids {
has, err := pinstore.IsChunkPinnedInCollection(ts.IndexStore(), ch.Address(), uuid)
if err != nil {
t.Fatal(err)
}
if has {
pinned = true
}
}

if i < len(pinnedChunks) {
if pinned {
// This chunk is pinned, so it should NOT be accessible from reserve but SHOULD be accessible from chunkstore
if !errors.Is(err, storage.ErrNotFound) {
t.Errorf("Pinned chunk %s should have been evicted from reserve", ch.Address())
}
if errors.Is(chunkStoreErr, storage.ErrNotFound) {
t.Errorf("Pinned chunk %s was deleted from chunkstore - should remain retrievable!", ch.Address())
} else if chunkStoreErr != nil {
t.Fatal(chunkStoreErr)
}
} else {
t.Errorf("Chunk %s should be pinned", ch.Address())
}
} else { // unpinned chunks
if !pinned {
// Unpinned chunks should be completely evicted (both reserve and chunkstore)
if !errors.Is(err, storage.ErrNotFound) {
t.Errorf("Unpinned chunk %s should have been evicted from reserve", ch.Address())
}
if !errors.Is(chunkStoreErr, storage.ErrNotFound) {
t.Errorf("Unpinned chunk %s should have been evicted from chunkstore", ch.Address())
}
} else {
t.Errorf("Chunk %s should not be pinned", ch.Address())
}
}
}
}

func checkStore(t *testing.T, s storage.Reader, k storage.Key, gone bool) {
t.Helper()
h, err := s.Has(k)