From 2c37b35fd65bb665fe2318b5f6d7e3bc0d697d34 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Tue, 9 Sep 2025 11:11:15 +0200 Subject: [PATCH 1/5] feat: Add `DeleteFromHead` method to Store --- headertest/store.go | 28 ++++ interface.go | 3 + p2p/server_test.go | 5 + store/store_delete.go | 155 ++++++++++++++++++++ store/store_test.go | 329 ++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 520 insertions(+) diff --git a/headertest/store.go b/headertest/store.go index 8ce96213..c205264e 100644 --- a/headertest/store.go +++ b/headertest/store.go @@ -99,6 +99,34 @@ func (m *Store[H]) DeleteTo(ctx context.Context, to uint64) error { return nil } +func (m *Store[H]) DeleteFromHead(ctx context.Context, to uint64) error { + // Find the current head height + headHeight := uint64(0) + for h := range m.Headers { + if h > headHeight { + headHeight = h + } + } + + // Delete from head down to (but not including) 'to' + for h := headHeight; h > to; h-- { + _, ok := m.Headers[h] + if !ok { + continue + } + + for _, deleteFn := range m.onDelete { + err := deleteFn(ctx, h) + if err != nil { + return err + } + } + delete(m.Headers, h) // must be after deleteFn + } + + return nil +} + func (m *Store[H]) OnDelete(fn func(context.Context, uint64) error) { m.onDeleteMu.Lock() defer m.onDeleteMu.Unlock() diff --git a/interface.go b/interface.go index 1c719fa6..3f926500 100644 --- a/interface.go +++ b/interface.go @@ -88,6 +88,9 @@ type Store[H Header[H]] interface { // DeleteTo deletes the range [Tail():to). DeleteTo(ctx context.Context, to uint64) error + // DeleteFromHead deletes the range (to:Head()]. + DeleteFromHead(ctx context.Context, to uint64) error + // OnDelete registers given handler to be called whenever a header with the height is being removed. // OnDelete guarantees that the header is accessible for the handler with GetByHeight and is removed // only after the handler terminates with nil error. diff --git a/p2p/server_test.go b/p2p/server_test.go index 0af96b11..cf05546a 100644 --- a/p2p/server_test.go +++ b/p2p/server_test.go @@ -197,4 +197,9 @@ func (timeoutStore[H]) DeleteTo(ctx context.Context, _ uint64) error { return ctx.Err() } +func (timeoutStore[H]) DeleteFromHead(ctx context.Context, _ uint64) error { + <-ctx.Done() + return ctx.Err() +} + func (timeoutStore[H]) OnDelete(fn func(context.Context, uint64) error) {} diff --git a/store/store_delete.go b/store/store_delete.go index 4bea211e..3e793998 100644 --- a/store/store_delete.go +++ b/store/store_delete.go @@ -346,3 +346,158 @@ func (s *Store[H]) deleteParallel(ctx context.Context, from, to uint64) (uint64, ) return highest, missing, nil } + +// DeleteFromHead deletes headers from the current head down to (but not including) the specified height. +// The specified height becomes the new head of the store. +// This is conceptually the opposite of DeleteTo, which deletes from tail up to a height. +func (s *Store[H]) DeleteFromHead(ctx context.Context, to uint64) error { + // ensure all the pending headers are synchronized + err := s.Sync(ctx) + if err != nil { + return err + } + + head, err := s.Head(ctx) + if err != nil { + return fmt.Errorf("header/store: reading head: %w", err) + } + + tail, err := s.Tail(ctx) + if err != nil { + return fmt.Errorf("header/store: reading tail: %w", err) + } + + // validate that 'to' height is within valid bounds + if to < tail.Height() { + return fmt.Errorf( + "header/store: delete from head to %d below current tail(%d)", + to, + tail.Height(), + ) + } + if to > head.Height() { + return fmt.Errorf( + "header/store: delete from head to %d above current head(%d)", + to, + head.Height(), + ) + } + + // if to equals head height, it's a no-op + if to == head.Height() { + return nil + } + + // verify that the target height exists and will become the new head + _, err = s.getByHeight(ctx, to) + if err != nil { + return fmt.Errorf( + "header/store: target height %d not found: %w", + to, + err, + ) + } + + // delete the range (to, head.Height()] + err = s.deleteRangeFromHead(ctx, to+1, head.Height()+1) + if err != nil { + return fmt.Errorf( + "header/store: delete from head to height %d: %w", + to, + err, + ) + } + + return nil +} + +// deleteRangeFromHead deletes [from:to) header range from the store and updates the head. +// This is similar to deleteRange but updates the head pointer instead of tail. +func (s *Store[H]) deleteRangeFromHead(ctx context.Context, from, to uint64) (err error) { + startTime := time.Now() + + var ( + height uint64 + missing int + ) + defer func() { + if err != nil { + if errors.Is(err, errDeleteTimeout) { + log.Warnw("partial delete from head", + "from_height", from, + "expected_to_height", to, + "actual_to_height", height, + "hdrs_not_found", missing, + "took(s)", time.Since(startTime), + ) + } else { + log.Errorw("partial delete from head with error", + "from_height", from, + "expected_to_height", to, + "actual_to_height", height, + "hdrs_not_found", missing, + "took(s)", time.Since(startTime), + "err", err, + ) + } + } else if to-from > 1 { + log.Debugw("deleted headers from head", + "from_height", from, + "to_height", to, + "hdrs_not_found", missing, + "took(s)", time.Since(startTime).Seconds(), + ) + } + + // Set new head to the height just before the deleted range + if from > 0 { + newHeadHeight := from - 1 + if derr := s.setHead(ctx, s.ds, newHeadHeight); derr != nil { + err = errors.Join( + err, + fmt.Errorf("setting head to %d: %w", newHeadHeight, derr), + ) + } + } + }() + + deleteCtx := ctx + if deadline, ok := ctx.Deadline(); ok { + // allocate 95% of caller's set deadline for deletion + // and give leftover to save progress + // this prevents store's state corruption from partial deletion + sub := deadline.Sub(startTime) / 100 * 95 + var cancel context.CancelFunc + deleteCtx, cancel = context.WithDeadlineCause( + ctx, + startTime.Add(sub), + errDeleteTimeout, + ) + defer cancel() + } + + if to-from < deleteRangeParallelThreshold { + height, missing, err = s.deleteSequential(deleteCtx, from, to) + } else { + height, missing, err = s.deleteParallel(deleteCtx, from, to) + } + + return err +} + +// setHead sets the head of the store to the specified height. +// This is similar to setTail but updates the head pointer. +func (s *Store[H]) setHead(ctx context.Context, write datastore.Write, to uint64) error { + newHead, err := s.getByHeight(ctx, to) + if err != nil { + return fmt.Errorf("getting head: %w", err) + } + + // update the contiguous head + s.contiguousHead.Store(&newHead) + if err := writeHeaderHashTo(ctx, write, newHead, headKey); err != nil { + return fmt.Errorf("writing headKey in batch: %w", err) + } + + return nil +} diff --git a/store/store_test.go b/store/store_test.go index f9ffca0c..7a780e60 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -907,3 +907,332 @@ func TestStore_HasAt(t *testing.T) { has = store.HasAt(ctx, 0) assert.False(t, has) } + +func TestStore_DeleteFromHead(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + t.Cleanup(cancel) + + suite := headertest.NewTestSuite(t) + + ds := sync.MutexWrap(datastore.NewMapDatastore()) + store := NewTestStore(t, ctx, ds, suite.Head(), WithWriteBatchSize(10)) + + const count = 100 + in := suite.GenDummyHeaders(count) + err := store.Append(ctx, in...) + require.NoError(t, err) + + hashes := make(map[uint64]header.Hash, count) + for _, h := range in { + hashes[h.Height()] = h.Hash() + } + + // wait until headers are written + time.Sleep(100 * time.Millisecond) + + tests := []struct { + name string + to uint64 + wantHead uint64 + wantError bool + }{ + { + name: "initial delete from head request", + to: 85, + wantHead: 85, + wantError: false, + }, + { + name: "no-op delete request - to equals current head", + to: 85, // same as previous head + wantError: false, + }, + { + name: "valid delete from head request", + to: 50, + wantHead: 50, + wantError: false, + }, + { + name: "delete to height above current head", + to: 200, + wantError: true, + }, + { + name: "delete to height below tail", + to: 0, + wantError: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + beforeHead, err := store.Head(ctx) + require.NoError(t, err) + beforeTail, err := store.Tail(ctx) + require.NoError(t, err) + + // manually add something to the pending for assert at the bottom + if idx := beforeHead.Height() - 1; idx < count && idx > 0 { + store.pending.Append(in[idx-1]) + defer store.pending.Reset() + } + + ctx, cancel := context.WithTimeout(ctx, time.Second) + defer cancel() + + err = store.DeleteFromHead(ctx, tt.to) + if tt.wantError { + assert.Error(t, err) + return + } + require.NoError(t, err) + + // check that cache and pending doesn't contain deleted headers + for h := tt.to + 1; h <= beforeHead.Height(); h++ { + hash, ok := hashes[h] + if !ok { + continue // skip heights that weren't in our original set + } + assert.False( + t, + store.cache.Contains(hash.String()), + "height %d should be removed from cache", + h, + ) + assert.False( + t, + store.heightIndex.cache.Contains(h), + "height %d should be removed from height index", + h, + ) + assert.False( + t, + store.pending.Has(hash), + "height %d should be removed from pending", + h, + ) + } + + // verify new head is correct + if tt.wantHead > 0 { + head, err := store.Head(ctx) + require.NoError(t, err) + require.EqualValues(t, tt.wantHead, head.Height()) + } + + // verify tail hasn't changed + tail, err := store.Tail(ctx) + require.NoError(t, err) + require.EqualValues(t, beforeTail.Height(), tail.Height()) + + // verify headers below 'to' still exist + for h := beforeTail.Height(); h <= tt.to; h++ { + has := store.HasAt(ctx, h) + assert.True(t, has, "height %d should still exist", h) + } + }) + } +} + +func TestStore_DeleteFromHead_EmptyStore(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + t.Cleanup(cancel) + + ds := sync.MutexWrap(datastore.NewMapDatastore()) + store, err := NewStore[*headertest.DummyHeader](ds) + require.NoError(t, err) + + err = store.Start(ctx) + require.NoError(t, err) + t.Cleanup(func() { + _ = store.Stop(ctx) + }) + + // wait for store to initialize + time.Sleep(10 * time.Millisecond) + + // should handle empty store gracefully + err = store.DeleteFromHead(ctx, 50) + require.Error(t, err) // should error because store is empty +} + +func TestStore_DeleteFromHead_SingleHeader(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + t.Cleanup(cancel) + + suite := headertest.NewTestSuite(t) + + ds := sync.MutexWrap(datastore.NewMapDatastore()) + store := NewTestStore(t, ctx, ds, suite.Head(), WithWriteBatchSize(10)) + + // Add single header at height 1 (genesis is at 0) + headers := suite.GenDummyHeaders(1) + err := store.Append(ctx, headers...) + require.NoError(t, err) + + time.Sleep(10 * time.Millisecond) + + // Should not be able to delete the only header + err = store.DeleteFromHead(ctx, 0) + require.Error(t, err) // should error - would delete below tail +} + +func TestStore_DeleteFromHead_Synchronized(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + t.Cleanup(cancel) + + suite := headertest.NewTestSuite(t) + + ds := sync.MutexWrap(datastore.NewMapDatastore()) + store := NewTestStore(t, ctx, ds, suite.Head(), WithWriteBatchSize(10)) + + err := store.Append(ctx, suite.GenDummyHeaders(50)...) + require.NoError(t, err) + + time.Sleep(100 * time.Millisecond) + + // Ensure sync completes + err = store.Sync(ctx) + require.NoError(t, err) + + err = store.DeleteFromHead(ctx, 25) + require.NoError(t, err) + + // Verify head is now at height 25 + head, err := store.Head(ctx) + require.NoError(t, err) + require.EqualValues(t, 25, head.Height()) + + // Verify headers above 25 are gone + for h := uint64(26); h <= 50; h++ { + has := store.HasAt(ctx, h) + assert.False(t, has, "height %d should be deleted", h) + } + + // Verify headers at and below 25 still exist + for h := uint64(1); h <= 25; h++ { + has := store.HasAt(ctx, h) + assert.True(t, has, "height %d should still exist", h) + } +} + +func TestStore_DeleteFromHead_OnDeleteHandlers(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + t.Cleanup(cancel) + + suite := headertest.NewTestSuite(t) + + ds := sync.MutexWrap(datastore.NewMapDatastore()) + store := NewTestStore(t, ctx, ds, suite.Head(), WithWriteBatchSize(10)) + + err := store.Append(ctx, suite.GenDummyHeaders(50)...) + require.NoError(t, err) + + time.Sleep(100 * time.Millisecond) + + // Get the actual head height to calculate expected deletions + head, err := store.Head(ctx) + require.NoError(t, err) + + var deletedHeights []uint64 + store.OnDelete(func(ctx context.Context, height uint64) error { + deletedHeights = append(deletedHeights, height) + return nil + }) + + err = store.DeleteFromHead(ctx, 40) + require.NoError(t, err) + + // Verify onDelete was called for each deleted height (from 41 to head height) + var expectedDeleted []uint64 + for h := uint64(41); h <= head.Height(); h++ { + expectedDeleted = append(expectedDeleted, h) + } + assert.ElementsMatch(t, expectedDeleted, deletedHeights) +} + +func TestStore_DeleteFromHead_LargeRange(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + t.Cleanup(cancel) + + suite := headertest.NewTestSuite(t) + + ds := sync.MutexWrap(datastore.NewMapDatastore()) + store := NewTestStore(t, ctx, ds, suite.Head(), WithWriteBatchSize(100)) + + // Create a large number of headers to trigger parallel deletion + const count = 15000 + headers := suite.GenDummyHeaders(count) + err := store.Append(ctx, headers...) + require.NoError(t, err) + + time.Sleep(500 * time.Millisecond) // allow time for large batch to write + + // Delete a large range to test parallel deletion path + const keepHeight = 5000 + err = store.DeleteFromHead(ctx, keepHeight) + require.NoError(t, err) + + // Verify new head + head, err := store.Head(ctx) + require.NoError(t, err) + require.EqualValues(t, keepHeight, head.Height()) + + // Spot check that high numbered headers are gone + for h := uint64(keepHeight + 1000); h <= count; h += 1000 { + has := store.HasAt(ctx, h) + assert.False(t, has, "height %d should be deleted", h) + } + + // Spot check that low numbered headers still exist + for h := uint64(1000); h <= keepHeight; h += 1000 { + has := store.HasAt(ctx, h) + assert.True(t, has, "height %d should still exist", h) + } +} + +func TestStore_DeleteFromHead_ValidationErrors(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + t.Cleanup(cancel) + + suite := headertest.NewTestSuite(t) + + ds := sync.MutexWrap(datastore.NewMapDatastore()) + store := NewTestStore(t, ctx, ds, suite.Head(), WithWriteBatchSize(10)) + + err := store.Append(ctx, suite.GenDummyHeaders(20)...) + require.NoError(t, err) + + time.Sleep(100 * time.Millisecond) + + head, err := store.Head(ctx) + require.NoError(t, err) + tail, err := store.Tail(ctx) + require.NoError(t, err) + + tests := []struct { + name string + to uint64 + errMsg string + }{ + { + name: "to below tail", + to: tail.Height() - 1, + errMsg: "below current tail", + }, + { + name: "to above head", + to: head.Height() + 1, + errMsg: "above current head", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := store.DeleteFromHead(ctx, tt.to) + require.Error(t, err) + assert.Contains(t, err.Error(), tt.errMsg) + }) + } +} From 4c3fc7cb55406c767726d5c8b0d2eeb619dfa0b1 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Tue, 9 Sep 2025 13:19:53 +0200 Subject: [PATCH 2/5] refactor --- headertest/store.go | 19 +-- interface.go | 4 +- p2p/server_test.go | 2 +- store/store_delete.go | 236 ++++++++++-------------------- store/store_test.go | 329 ------------------------------------------ 5 files changed, 85 insertions(+), 505 deletions(-) diff --git a/headertest/store.go b/headertest/store.go index c205264e..9c1dd731 100644 --- a/headertest/store.go +++ b/headertest/store.go @@ -99,17 +99,9 @@ func (m *Store[H]) DeleteTo(ctx context.Context, to uint64) error { return nil } -func (m *Store[H]) DeleteFromHead(ctx context.Context, to uint64) error { - // Find the current head height - headHeight := uint64(0) - for h := range m.Headers { - if h > headHeight { - headHeight = h - } - } - - // Delete from head down to (but not including) 'to' - for h := headHeight; h > to; h-- { +func (m *Store[H]) DeleteRange(ctx context.Context, from, to uint64) error { + // Delete headers in the range [from:to) + for h := from; h < to; h++ { _, ok := m.Headers[h] if !ok { continue @@ -124,6 +116,11 @@ func (m *Store[H]) DeleteFromHead(ctx context.Context, to uint64) error { delete(m.Headers, h) // must be after deleteFn } + // Update TailHeight if we deleted from the beginning + if from <= m.TailHeight { + m.TailHeight = to + } + return nil } diff --git a/interface.go b/interface.go index 3f926500..d139e719 100644 --- a/interface.go +++ b/interface.go @@ -88,8 +88,8 @@ type Store[H Header[H]] interface { // DeleteTo deletes the range [Tail():to). DeleteTo(ctx context.Context, to uint64) error - // DeleteFromHead deletes the range (to:Head()]. - DeleteFromHead(ctx context.Context, to uint64) error + // DeleteRange deletes the range [from:to). + DeleteRange(ctx context.Context, from, to uint64) error // OnDelete registers given handler to be called whenever a header with the height is being removed. // OnDelete guarantees that the header is accessible for the handler with GetByHeight and is removed diff --git a/p2p/server_test.go b/p2p/server_test.go index cf05546a..e53808c1 100644 --- a/p2p/server_test.go +++ b/p2p/server_test.go @@ -197,7 +197,7 @@ func (timeoutStore[H]) DeleteTo(ctx context.Context, _ uint64) error { return ctx.Err() } -func (timeoutStore[H]) DeleteFromHead(ctx context.Context, _ uint64) error { +func (timeoutStore[H]) DeleteRange(ctx context.Context, _, _ uint64) error { <-ctx.Done() return ctx.Err() } diff --git a/store/store_delete.go b/store/store_delete.go index 3e793998..3c9724f8 100644 --- a/store/store_delete.go +++ b/store/store_delete.go @@ -10,8 +10,6 @@ import ( "time" "github.com/ipfs/go-datastore" - - "github.com/celestiaorg/go-header" ) // OnDelete implements [header.Store] interface. @@ -35,56 +33,14 @@ func (s *Store[H]) OnDelete(fn func(context.Context, uint64) error) { } // DeleteTo implements [header.Store] interface. +// This is a convenience wrapper around DeleteRange that deletes from tail up to a height. func (s *Store[H]) DeleteTo(ctx context.Context, to uint64) error { - // ensure all the pending headers are synchronized - err := s.Sync(ctx) - if err != nil { - return err - } - - head, err := s.Head(ctx) - if err != nil { - return fmt.Errorf("header/store: reading head: %w", err) - } - if head.Height()+1 < to { - _, err := s.getByHeight(ctx, to) - if errors.Is(err, header.ErrNotFound) { - return fmt.Errorf( - "header/store: delete to %d beyond current head(%d)", - to, - head.Height(), - ) - } - if err != nil { - return fmt.Errorf("delete to potential new head: %w", err) - } - - // if `to` is bigger than the current head and is stored - allow delete, making `to` a new head - } - tail, err := s.Tail(ctx) if err != nil { return fmt.Errorf("header/store: reading tail: %w", err) } - if tail.Height() >= to { - return fmt.Errorf("header/store: delete to %d below current tail(%d)", to, tail.Height()) - } - err = s.deleteRange(ctx, tail.Height(), to) - if errors.Is(err, header.ErrNotFound) && head.Height()+1 == to { - // this is the case where we have deleted all the headers - // wipe the store - if err := s.wipe(ctx); err != nil { - return fmt.Errorf("header/store: wipe: %w", err) - } - - return nil - } - if err != nil { - return fmt.Errorf("header/store: delete to height %d: %w", to, err) - } - - return nil + return s.DeleteRange(ctx, tail.Height(), to) } // deleteRangeParallelThreshold defines the threshold for parallel deletion. @@ -94,69 +50,6 @@ var ( errDeleteTimeout = errors.New("delete timeout") ) -// deleteRange deletes [from:to) header range from the store. -// It gracefully handles context and errors attempting to save interrupted progress. -func (s *Store[H]) deleteRange(ctx context.Context, from, to uint64) (err error) { - startTime := time.Now() - - var ( - height uint64 - missing int - ) - defer func() { - if err != nil { - if errors.Is(err, errDeleteTimeout) { - log.Warnw("partial delete", - "from_height", from, - "expected_to_height", to, - "actual_to_height", height, - "hdrs_not_found", missing, - "took(s)", time.Since(startTime), - ) - } else { - log.Errorw("partial delete with error", - "from_height", from, - "expected_to_height", to, - "actual_to_height", height, - "hdrs_not_found", missing, - "took(s)", time.Since(startTime), - "err", err, - ) - } - } else if to-from > 1 { - log.Debugw("deleted headers", - "from_height", from, - "to_height", to, - "hdrs_not_found", missing, - "took(s)", time.Since(startTime).Seconds(), - ) - } - - if derr := s.setTail(ctx, s.ds, height); derr != nil { - err = errors.Join(err, fmt.Errorf("setting tail to %d: %w", height, derr)) - } - }() - - deleteCtx := ctx - if deadline, ok := ctx.Deadline(); ok { - // allocate 95% of caller's set deadline for deletion - // and give leftover to save progress - // this prevents store's state corruption from partial deletion - sub := deadline.Sub(startTime) / 100 * 95 - var cancel context.CancelFunc - deleteCtx, cancel = context.WithDeadlineCause(ctx, startTime.Add(sub), errDeleteTimeout) - defer cancel() - } - - if to-from < deleteRangeParallelThreshold { - height, missing, err = s.deleteSequential(deleteCtx, from, to) - } else { - height, missing, err = s.deleteParallel(deleteCtx, from, to) - } - - return err -} - // deleteSingle deletes a single header from the store, // its caches and indexies, notifying any registered onDelete handlers. func (s *Store[H]) deleteSingle( @@ -347,10 +240,9 @@ func (s *Store[H]) deleteParallel(ctx context.Context, from, to uint64) (uint64, return highest, missing, nil } -// DeleteFromHead deletes headers from the current head down to (but not including) the specified height. -// The specified height becomes the new head of the store. -// This is conceptually the opposite of DeleteTo, which deletes from tail up to a height. -func (s *Store[H]) DeleteFromHead(ctx context.Context, to uint64) error { +// DeleteRange deletes headers in the range [from:to) from the store. +// It intelligently updates head and/or tail pointers based on what range is being deleted. +func (s *Store[H]) DeleteRange(ctx context.Context, from, to uint64) error { // ensure all the pending headers are synchronized err := s.Sync(ctx) if err != nil { @@ -367,53 +259,90 @@ func (s *Store[H]) DeleteFromHead(ctx context.Context, to uint64) error { return fmt.Errorf("header/store: reading tail: %w", err) } - // validate that 'to' height is within valid bounds - if to < tail.Height() { + // validate range parameters + if from >= to { return fmt.Errorf( - "header/store: delete from head to %d below current tail(%d)", + "header/store: invalid range [%d:%d) - from must be less than to", + from, to, - tail.Height(), ) } - if to > head.Height() { + + if from < tail.Height() { return fmt.Errorf( - "header/store: delete from head to %d above current head(%d)", - to, - head.Height(), + "header/store: delete range from %d below current tail(%d)", + from, + tail.Height(), ) } - // if to equals head height, it's a no-op - if to == head.Height() { + // Note: Allow deletion beyond head to match original DeleteTo behavior + // Missing headers in the range will be handled gracefully by the deletion logic + + // if range is empty within the current store bounds, it's a no-op + if from > head.Height() || to <= tail.Height() { return nil } - // verify that the target height exists and will become the new head - _, err = s.getByHeight(ctx, to) - if err != nil { - return fmt.Errorf( - "header/store: target height %d not found: %w", - to, - err, - ) + // Check if we're deleting all existing headers (making store empty) + // Only wipe if 'to' is exactly at head+1 (normal case) to avoid accidental wipes + if from <= tail.Height() && to == head.Height()+1 { + // Check if any headers exist at or beyond 'to' + hasHeadersAtOrBeyond := false + for checkHeight := to; checkHeight <= to+10; checkHeight++ { + if _, err := s.getByHeight(ctx, checkHeight); err == nil { + hasHeadersAtOrBeyond = true + break + } + } + + if !hasHeadersAtOrBeyond { + // wipe the entire store + if err := s.wipe(ctx); err != nil { + return fmt.Errorf("header/store: wipe: %w", err) + } + return nil + } } - // delete the range (to, head.Height()] - err = s.deleteRangeFromHead(ctx, to+1, head.Height()+1) + // Determine which pointers need updating + updateTail := from <= tail.Height() + updateHead := to > head.Height() + + // Delete the headers without automatic tail updates + err = s.deleteRangeRaw(ctx, from, to) if err != nil { - return fmt.Errorf( - "header/store: delete from head to height %d: %w", - to, - err, - ) + return fmt.Errorf("header/store: delete range [%d:%d): %w", from, to, err) + } + + // Update tail if we deleted from the beginning + if updateTail { + _, err = s.getByHeight(ctx, to) + if err != nil { + return fmt.Errorf("header/store: new tail height %d not found: %w", to, err) + } + err = s.setTail(ctx, s.ds, to) + if err != nil { + return fmt.Errorf("header/store: setting tail to %d: %w", to, err) + } + } + + // Update head if we deleted from the end + if updateHead && from > tail.Height() { + newHeadHeight := from - 1 + if newHeadHeight >= tail.Height() { + err = s.setHead(ctx, s.ds, newHeadHeight) + if err != nil { + return fmt.Errorf("header/store: setting head to %d: %w", newHeadHeight, err) + } + } } return nil } -// deleteRangeFromHead deletes [from:to) header range from the store and updates the head. -// This is similar to deleteRange but updates the head pointer instead of tail. -func (s *Store[H]) deleteRangeFromHead(ctx context.Context, from, to uint64) (err error) { +// deleteRangeRaw deletes [from:to) header range without updating head or tail pointers. +func (s *Store[H]) deleteRangeRaw(ctx context.Context, from, to uint64) (err error) { startTime := time.Now() var ( @@ -423,7 +352,7 @@ func (s *Store[H]) deleteRangeFromHead(ctx context.Context, from, to uint64) (er defer func() { if err != nil { if errors.Is(err, errDeleteTimeout) { - log.Warnw("partial delete from head", + log.Warnw("partial delete range", "from_height", from, "expected_to_height", to, "actual_to_height", height, @@ -431,7 +360,7 @@ func (s *Store[H]) deleteRangeFromHead(ctx context.Context, from, to uint64) (er "took(s)", time.Since(startTime), ) } else { - log.Errorw("partial delete from head with error", + log.Errorw("partial delete range with error", "from_height", from, "expected_to_height", to, "actual_to_height", height, @@ -441,38 +370,22 @@ func (s *Store[H]) deleteRangeFromHead(ctx context.Context, from, to uint64) (er ) } } else if to-from > 1 { - log.Debugw("deleted headers from head", + log.Debugw("deleted range", "from_height", from, "to_height", to, "hdrs_not_found", missing, "took(s)", time.Since(startTime).Seconds(), ) } - - // Set new head to the height just before the deleted range - if from > 0 { - newHeadHeight := from - 1 - if derr := s.setHead(ctx, s.ds, newHeadHeight); derr != nil { - err = errors.Join( - err, - fmt.Errorf("setting head to %d: %w", newHeadHeight, derr), - ) - } - } }() deleteCtx := ctx if deadline, ok := ctx.Deadline(); ok { // allocate 95% of caller's set deadline for deletion // and give leftover to save progress - // this prevents store's state corruption from partial deletion sub := deadline.Sub(startTime) / 100 * 95 var cancel context.CancelFunc - deleteCtx, cancel = context.WithDeadlineCause( - ctx, - startTime.Add(sub), - errDeleteTimeout, - ) + deleteCtx, cancel = context.WithDeadlineCause(ctx, startTime.Add(sub), errDeleteTimeout) defer cancel() } @@ -486,7 +399,6 @@ func (s *Store[H]) deleteRangeFromHead(ctx context.Context, from, to uint64) (er } // setHead sets the head of the store to the specified height. -// This is similar to setTail but updates the head pointer. func (s *Store[H]) setHead(ctx context.Context, write datastore.Write, to uint64) error { newHead, err := s.getByHeight(ctx, to) if err != nil { diff --git a/store/store_test.go b/store/store_test.go index 7a780e60..f9ffca0c 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -907,332 +907,3 @@ func TestStore_HasAt(t *testing.T) { has = store.HasAt(ctx, 0) assert.False(t, has) } - -func TestStore_DeleteFromHead(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) - t.Cleanup(cancel) - - suite := headertest.NewTestSuite(t) - - ds := sync.MutexWrap(datastore.NewMapDatastore()) - store := NewTestStore(t, ctx, ds, suite.Head(), WithWriteBatchSize(10)) - - const count = 100 - in := suite.GenDummyHeaders(count) - err := store.Append(ctx, in...) - require.NoError(t, err) - - hashes := make(map[uint64]header.Hash, count) - for _, h := range in { - hashes[h.Height()] = h.Hash() - } - - // wait until headers are written - time.Sleep(100 * time.Millisecond) - - tests := []struct { - name string - to uint64 - wantHead uint64 - wantError bool - }{ - { - name: "initial delete from head request", - to: 85, - wantHead: 85, - wantError: false, - }, - { - name: "no-op delete request - to equals current head", - to: 85, // same as previous head - wantError: false, - }, - { - name: "valid delete from head request", - to: 50, - wantHead: 50, - wantError: false, - }, - { - name: "delete to height above current head", - to: 200, - wantError: true, - }, - { - name: "delete to height below tail", - to: 0, - wantError: true, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - beforeHead, err := store.Head(ctx) - require.NoError(t, err) - beforeTail, err := store.Tail(ctx) - require.NoError(t, err) - - // manually add something to the pending for assert at the bottom - if idx := beforeHead.Height() - 1; idx < count && idx > 0 { - store.pending.Append(in[idx-1]) - defer store.pending.Reset() - } - - ctx, cancel := context.WithTimeout(ctx, time.Second) - defer cancel() - - err = store.DeleteFromHead(ctx, tt.to) - if tt.wantError { - assert.Error(t, err) - return - } - require.NoError(t, err) - - // check that cache and pending doesn't contain deleted headers - for h := tt.to + 1; h <= beforeHead.Height(); h++ { - hash, ok := hashes[h] - if !ok { - continue // skip heights that weren't in our original set - } - assert.False( - t, - store.cache.Contains(hash.String()), - "height %d should be removed from cache", - h, - ) - assert.False( - t, - store.heightIndex.cache.Contains(h), - "height %d should be removed from height index", - h, - ) - assert.False( - t, - store.pending.Has(hash), - "height %d should be removed from pending", - h, - ) - } - - // verify new head is correct - if tt.wantHead > 0 { - head, err := store.Head(ctx) - require.NoError(t, err) - require.EqualValues(t, tt.wantHead, head.Height()) - } - - // verify tail hasn't changed - tail, err := store.Tail(ctx) - require.NoError(t, err) - require.EqualValues(t, beforeTail.Height(), tail.Height()) - - // verify headers below 'to' still exist - for h := beforeTail.Height(); h <= tt.to; h++ { - has := store.HasAt(ctx, h) - assert.True(t, has, "height %d should still exist", h) - } - }) - } -} - -func TestStore_DeleteFromHead_EmptyStore(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) - t.Cleanup(cancel) - - ds := sync.MutexWrap(datastore.NewMapDatastore()) - store, err := NewStore[*headertest.DummyHeader](ds) - require.NoError(t, err) - - err = store.Start(ctx) - require.NoError(t, err) - t.Cleanup(func() { - _ = store.Stop(ctx) - }) - - // wait for store to initialize - time.Sleep(10 * time.Millisecond) - - // should handle empty store gracefully - err = store.DeleteFromHead(ctx, 50) - require.Error(t, err) // should error because store is empty -} - -func TestStore_DeleteFromHead_SingleHeader(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) - t.Cleanup(cancel) - - suite := headertest.NewTestSuite(t) - - ds := sync.MutexWrap(datastore.NewMapDatastore()) - store := NewTestStore(t, ctx, ds, suite.Head(), WithWriteBatchSize(10)) - - // Add single header at height 1 (genesis is at 0) - headers := suite.GenDummyHeaders(1) - err := store.Append(ctx, headers...) - require.NoError(t, err) - - time.Sleep(10 * time.Millisecond) - - // Should not be able to delete the only header - err = store.DeleteFromHead(ctx, 0) - require.Error(t, err) // should error - would delete below tail -} - -func TestStore_DeleteFromHead_Synchronized(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) - t.Cleanup(cancel) - - suite := headertest.NewTestSuite(t) - - ds := sync.MutexWrap(datastore.NewMapDatastore()) - store := NewTestStore(t, ctx, ds, suite.Head(), WithWriteBatchSize(10)) - - err := store.Append(ctx, suite.GenDummyHeaders(50)...) - require.NoError(t, err) - - time.Sleep(100 * time.Millisecond) - - // Ensure sync completes - err = store.Sync(ctx) - require.NoError(t, err) - - err = store.DeleteFromHead(ctx, 25) - require.NoError(t, err) - - // Verify head is now at height 25 - head, err := store.Head(ctx) - require.NoError(t, err) - require.EqualValues(t, 25, head.Height()) - - // Verify headers above 25 are gone - for h := uint64(26); h <= 50; h++ { - has := store.HasAt(ctx, h) - assert.False(t, has, "height %d should be deleted", h) - } - - // Verify headers at and below 25 still exist - for h := uint64(1); h <= 25; h++ { - has := store.HasAt(ctx, h) - assert.True(t, has, "height %d should still exist", h) - } -} - -func TestStore_DeleteFromHead_OnDeleteHandlers(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) - t.Cleanup(cancel) - - suite := headertest.NewTestSuite(t) - - ds := sync.MutexWrap(datastore.NewMapDatastore()) - store := NewTestStore(t, ctx, ds, suite.Head(), WithWriteBatchSize(10)) - - err := store.Append(ctx, suite.GenDummyHeaders(50)...) - require.NoError(t, err) - - time.Sleep(100 * time.Millisecond) - - // Get the actual head height to calculate expected deletions - head, err := store.Head(ctx) - require.NoError(t, err) - - var deletedHeights []uint64 - store.OnDelete(func(ctx context.Context, height uint64) error { - deletedHeights = append(deletedHeights, height) - return nil - }) - - err = store.DeleteFromHead(ctx, 40) - require.NoError(t, err) - - // Verify onDelete was called for each deleted height (from 41 to head height) - var expectedDeleted []uint64 - for h := uint64(41); h <= head.Height(); h++ { - expectedDeleted = append(expectedDeleted, h) - } - assert.ElementsMatch(t, expectedDeleted, deletedHeights) -} - -func TestStore_DeleteFromHead_LargeRange(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) - t.Cleanup(cancel) - - suite := headertest.NewTestSuite(t) - - ds := sync.MutexWrap(datastore.NewMapDatastore()) - store := NewTestStore(t, ctx, ds, suite.Head(), WithWriteBatchSize(100)) - - // Create a large number of headers to trigger parallel deletion - const count = 15000 - headers := suite.GenDummyHeaders(count) - err := store.Append(ctx, headers...) - require.NoError(t, err) - - time.Sleep(500 * time.Millisecond) // allow time for large batch to write - - // Delete a large range to test parallel deletion path - const keepHeight = 5000 - err = store.DeleteFromHead(ctx, keepHeight) - require.NoError(t, err) - - // Verify new head - head, err := store.Head(ctx) - require.NoError(t, err) - require.EqualValues(t, keepHeight, head.Height()) - - // Spot check that high numbered headers are gone - for h := uint64(keepHeight + 1000); h <= count; h += 1000 { - has := store.HasAt(ctx, h) - assert.False(t, has, "height %d should be deleted", h) - } - - // Spot check that low numbered headers still exist - for h := uint64(1000); h <= keepHeight; h += 1000 { - has := store.HasAt(ctx, h) - assert.True(t, has, "height %d should still exist", h) - } -} - -func TestStore_DeleteFromHead_ValidationErrors(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) - t.Cleanup(cancel) - - suite := headertest.NewTestSuite(t) - - ds := sync.MutexWrap(datastore.NewMapDatastore()) - store := NewTestStore(t, ctx, ds, suite.Head(), WithWriteBatchSize(10)) - - err := store.Append(ctx, suite.GenDummyHeaders(20)...) - require.NoError(t, err) - - time.Sleep(100 * time.Millisecond) - - head, err := store.Head(ctx) - require.NoError(t, err) - tail, err := store.Tail(ctx) - require.NoError(t, err) - - tests := []struct { - name string - to uint64 - errMsg string - }{ - { - name: "to below tail", - to: tail.Height() - 1, - errMsg: "below current tail", - }, - { - name: "to above head", - to: head.Height() + 1, - errMsg: "above current head", - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - err := store.DeleteFromHead(ctx, tt.to) - require.Error(t, err) - assert.Contains(t, err.Error(), tt.errMsg) - }) - } -} From 41f74467ef6e031ae16708b27016b9a728e3a852 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Tue, 9 Sep 2025 15:14:45 +0200 Subject: [PATCH 3/5] add tests --- store/store_delete.go | 13 ++ store/store_test.go | 409 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 422 insertions(+) diff --git a/store/store_delete.go b/store/store_delete.go index 3c9724f8..efa051c4 100644 --- a/store/store_delete.go +++ b/store/store_delete.go @@ -284,6 +284,19 @@ func (s *Store[H]) DeleteRange(ctx context.Context, from, to uint64) error { return nil } + // Validate that deletion won't create gaps in the store + // Only allow deletions that: + // 1. Start from tail (advancing tail forward) + // 2. End at head+1 (moving head backward) + // 3. Delete the entire store + if from > tail.Height() && to <= head.Height() { + return fmt.Errorf( + "header/store: deletion range [%d:%d) would create gaps in the store. "+ + "Only deletion from tail (%d) or to head+1 (%d) is supported", + from, to, tail.Height(), head.Height()+1, + ) + } + // Check if we're deleting all existing headers (making store empty) // Only wipe if 'to' is exactly at head+1 (normal case) to avoid accidental wipes if from <= tail.Height() && to == head.Height()+1 { diff --git a/store/store_test.go b/store/store_test.go index f9ffca0c..e5902617 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "math/rand" + "strings" stdsync "sync" "testing" "time" @@ -907,3 +908,411 @@ func TestStore_HasAt(t *testing.T) { has = store.HasAt(ctx, 0) assert.False(t, has) } + +func TestStore_DeleteRange(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + t.Cleanup(cancel) + + t.Run("delete range from head down", func(t *testing.T) { + suite := headertest.NewTestSuite(t) + ds := sync.MutexWrap(datastore.NewMapDatastore()) + store := NewTestStore(t, ctx, ds, suite.Head(), WithWriteBatchSize(10)) + + const count = 20 + in := suite.GenDummyHeaders(count) + err := store.Append(ctx, in...) + require.NoError(t, err) + + time.Sleep(100 * time.Millisecond) + + // Genesis is at height 1, GenDummyHeaders(20) creates headers 2-21 + // So head should be at height 21, tail at height 1 + head, err := store.Head(ctx) + require.NoError(t, err) + assert.Equal(t, uint64(21), head.Height()) + + // Delete from height 16 to 22 (should delete 16, 17, 18, 19, 20, 21) + err = store.DeleteRange(ctx, 16, 22) + require.NoError(t, err) + + // Verify new head is at height 15 + newHead, err := store.Head(ctx) + require.NoError(t, err) + assert.Equal(t, uint64(15), newHead.Height()) + + // Verify deleted headers are gone + for h := uint64(16); h <= 21; h++ { + has := store.HasAt(ctx, h) + assert.False(t, has, "height %d should be deleted", h) + } + + // Verify remaining headers still exist + for h := uint64(1); h <= 15; h++ { + has := store.HasAt(ctx, h) + assert.True(t, has, "height %d should still exist", h) + } + }) + + t.Run("delete range in middle should fail", func(t *testing.T) { + suite := headertest.NewTestSuite(t) + ds := sync.MutexWrap(datastore.NewMapDatastore()) + store := NewTestStore(t, ctx, ds, suite.Head(), WithWriteBatchSize(10)) + + const count = 20 + in := suite.GenDummyHeaders(count) + err := store.Append(ctx, in...) + require.NoError(t, err) + + time.Sleep(100 * time.Millisecond) + + // Try to delete a range in the middle (heights 8-12) which would create gaps + err = store.DeleteRange(ctx, 8, 12) + require.Error(t, err) + assert.Contains(t, err.Error(), "would create gaps in the store") + + // Verify all headers still exist since the operation failed + for h := uint64(1); h <= 21; h++ { + has := store.HasAt(ctx, h) + assert.True(t, has, "height %d should still exist after failed deletion", h) + } + }) + + t.Run("delete range from tail up", func(t *testing.T) { + suite := headertest.NewTestSuite(t) + ds := sync.MutexWrap(datastore.NewMapDatastore()) + store := NewTestStore(t, ctx, ds, suite.Head(), WithWriteBatchSize(10)) + + const count = 20 + in := suite.GenDummyHeaders(count) + err := store.Append(ctx, in...) + require.NoError(t, err) + + time.Sleep(100 * time.Millisecond) + + originalHead, err := store.Head(ctx) + require.NoError(t, err) + + // Delete from tail height to height 10 + err = store.DeleteRange(ctx, 1, 10) + require.NoError(t, err) + + // Verify head is unchanged + head, err := store.Head(ctx) + require.NoError(t, err) + assert.Equal(t, originalHead.Height(), head.Height()) + + // Verify tail moved to height 10 + tail, err := store.Tail(ctx) + require.NoError(t, err) + assert.Equal(t, uint64(10), tail.Height()) + + // Verify deleted headers are gone + for h := uint64(1); h < 10; h++ { + has := store.HasAt(ctx, h) + assert.False(t, has, "height %d should be deleted", h) + } + + // Verify remaining headers still exist + for h := uint64(10); h <= 21; h++ { + has := store.HasAt(ctx, h) + assert.True(t, has, "height %d should still exist", h) + } + }) + + t.Run("delete range completely out of bounds", func(t *testing.T) { + suite := headertest.NewTestSuite(t) + ds := sync.MutexWrap(datastore.NewMapDatastore()) + store := NewTestStore(t, ctx, ds, suite.Head(), WithWriteBatchSize(10)) + + const count = 20 + in := suite.GenDummyHeaders(count) + err := store.Append(ctx, in...) + require.NoError(t, err) + + time.Sleep(100 * time.Millisecond) + + originalHead, err := store.Head(ctx) + require.NoError(t, err) + originalTail, err := store.Tail(ctx) + require.NoError(t, err) + + // Delete range completely above head - should be no-op + err = store.DeleteRange(ctx, 200, 300) + require.NoError(t, err) + + // Verify head and tail are unchanged + head, err := store.Head(ctx) + require.NoError(t, err) + assert.Equal(t, originalHead.Height(), head.Height()) + + tail, err := store.Tail(ctx) + require.NoError(t, err) + assert.Equal(t, originalTail.Height(), tail.Height()) + + // Verify all original headers still exist + for h := uint64(1); h <= 21; h++ { + has := store.HasAt(ctx, h) + assert.True(t, has, "height %d should still exist", h) + } + }) + + t.Run("invalid range errors", func(t *testing.T) { + suite := headertest.NewTestSuite(t) + ds := sync.MutexWrap(datastore.NewMapDatastore()) + store := NewTestStore(t, ctx, ds, suite.Head(), WithWriteBatchSize(10)) + + const count = 20 + in := suite.GenDummyHeaders(count) + err := store.Append(ctx, in...) + require.NoError(t, err) + + time.Sleep(100 * time.Millisecond) + + // from >= to should error + err = store.DeleteRange(ctx, 50, 50) + assert.Error(t, err) + assert.Contains(t, err.Error(), "from must be less than to") + + // from > to should error + err = store.DeleteRange(ctx, 60, 50) + assert.Error(t, err) + assert.Contains(t, err.Error(), "from must be less than to") + + // from below tail should error + err = store.DeleteRange(ctx, 0, 5) + assert.Error(t, err) + assert.Contains(t, err.Error(), "below current tail") + + // middle deletion should error + err = store.DeleteRange(ctx, 10, 15) + assert.Error(t, err) + assert.Contains(t, err.Error(), "would create gaps") + }) +} + +func TestStore_DeleteRange_EmptyStore(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + t.Cleanup(cancel) + + ds := sync.MutexWrap(datastore.NewMapDatastore()) + store, err := NewStore[*headertest.DummyHeader](ds) + require.NoError(t, err) + + err = store.Start(ctx) + require.NoError(t, err) + t.Cleanup(func() { + err := store.Stop(ctx) + require.NoError(t, err) + }) + + // wait until headers are written + time.Sleep(10 * time.Millisecond) + + // should fail when trying to delete from empty store because it can't read head/tail + err = store.DeleteRange(ctx, 50, 60) + require.Error(t, err) + assert.Contains(t, err.Error(), "store is empty") + + // invalid range should also error with empty store, but it will hit the empty store error first + err = store.DeleteRange(ctx, 60, 50) + require.Error(t, err) + // Could be either empty store error or range validation error, both are valid + assert.True(t, + strings.Contains(err.Error(), "store is empty") || + strings.Contains(err.Error(), "from must be less than to"), + "Expected either empty store or range validation error, got: %v", err) +} + +func TestStore_DeleteRange_SingleHeader(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + t.Cleanup(cancel) + + suite := headertest.NewTestSuite(t) + + ds := sync.MutexWrap(datastore.NewMapDatastore()) + store := NewTestStore(t, ctx, ds, suite.Head(), WithWriteBatchSize(10)) + + // Add single header at height 1 (genesis is at 0) + headers := suite.GenDummyHeaders(1) + err := store.Append(ctx, headers...) + require.NoError(t, err) + + // Should not be able to delete below tail + err = store.DeleteRange(ctx, 0, 1) + require.Error(t, err) // should error - would delete below tail +} + +func TestStore_DeleteRange_Synchronized(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + t.Cleanup(cancel) + + suite := headertest.NewTestSuite(t) + + ds := sync.MutexWrap(datastore.NewMapDatastore()) + store := NewTestStore(t, ctx, ds, suite.Head(), WithWriteBatchSize(10)) + + err := store.Append(ctx, suite.GenDummyHeaders(50)...) + require.NoError(t, err) + + time.Sleep(100 * time.Millisecond) + + // Ensure sync completes + err = store.Sync(ctx) + require.NoError(t, err) + + // Delete from height 26 to head+1 (equivalent to DeleteFromHead(25)) + head, err := store.Head(ctx) + require.NoError(t, err) + + err = store.DeleteRange(ctx, 26, head.Height()+1) + require.NoError(t, err) + + // Verify head is now at height 25 + newHead, err := store.Head(ctx) + require.NoError(t, err) + require.EqualValues(t, 25, newHead.Height()) + + // Verify headers above 25 are gone + for h := uint64(26); h <= 50; h++ { + has := store.HasAt(ctx, h) + assert.False(t, has, "height %d should be deleted", h) + } + + // Verify headers at and below 25 still exist + for h := uint64(1); h <= 25; h++ { + has := store.HasAt(ctx, h) + assert.True(t, has, "height %d should still exist", h) + } +} + +func TestStore_DeleteRange_OnDeleteHandlers(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + t.Cleanup(cancel) + + suite := headertest.NewTestSuite(t) + + ds := sync.MutexWrap(datastore.NewMapDatastore()) + store := NewTestStore(t, ctx, ds, suite.Head(), WithWriteBatchSize(10)) + + err := store.Append(ctx, suite.GenDummyHeaders(50)...) + require.NoError(t, err) + + time.Sleep(100 * time.Millisecond) + + // Get the actual head height to calculate expected deletions + head, err := store.Head(ctx) + require.NoError(t, err) + + var deletedHeights []uint64 + store.OnDelete(func(ctx context.Context, height uint64) error { + deletedHeights = append(deletedHeights, height) + return nil + }) + + // Delete from height 41 to head+1 (equivalent to DeleteFromHead(40)) + err = store.DeleteRange(ctx, 41, head.Height()+1) + require.NoError(t, err) + + // Verify onDelete was called for each deleted height (from 41 to head height) + var expectedDeleted []uint64 + for h := uint64(41); h <= head.Height(); h++ { + expectedDeleted = append(expectedDeleted, h) + } + assert.ElementsMatch(t, expectedDeleted, deletedHeights) +} + +func TestStore_DeleteRange_LargeRange(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + t.Cleanup(cancel) + + suite := headertest.NewTestSuite(t) + + ds := sync.MutexWrap(datastore.NewMapDatastore()) + store := NewTestStore(t, ctx, ds, suite.Head(), WithWriteBatchSize(100)) + + // Create a large number of headers to trigger parallel deletion + const count = 15000 + headers := suite.GenDummyHeaders(count) + err := store.Append(ctx, headers...) + require.NoError(t, err) + + time.Sleep(500 * time.Millisecond) // allow time for large batch to write + + // Get head height for deletion range + head, err := store.Head(ctx) + require.NoError(t, err) + + // Delete a large range to test parallel deletion path (from 5001 to head+1) + const keepHeight = 5000 + err = store.DeleteRange(ctx, keepHeight+1, head.Height()+1) + require.NoError(t, err) + + // Verify new head + newHead, err := store.Head(ctx) + require.NoError(t, err) + require.EqualValues(t, keepHeight, newHead.Height()) + + // Spot check that high numbered headers are gone + for h := uint64(keepHeight + 1000); h <= count; h += 1000 { + has := store.HasAt(ctx, h) + assert.False(t, has, "height %d should be deleted", h) + } + + // Spot check that low numbered headers still exist + for h := uint64(1000); h <= keepHeight; h += 1000 { + has := store.HasAt(ctx, h) + assert.True(t, has, "height %d should still exist", h) + } +} + +func TestStore_DeleteRange_ValidationErrors(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + t.Cleanup(cancel) + + suite := headertest.NewTestSuite(t) + + ds := sync.MutexWrap(datastore.NewMapDatastore()) + store := NewTestStore(t, ctx, ds, suite.Head(), WithWriteBatchSize(10)) + + err := store.Append(ctx, suite.GenDummyHeaders(20)...) + require.NoError(t, err) + + time.Sleep(100 * time.Millisecond) + + tail, err := store.Tail(ctx) + require.NoError(t, err) + + tests := []struct { + name string + from uint64 + to uint64 + errMsg string + }{ + { + name: "delete from below tail boundary", + from: tail.Height() - 1, + to: tail.Height() + 5, + errMsg: "below current tail", + }, + { + name: "invalid range - from equals to", + from: 10, + to: 10, + errMsg: "from must be less than to", + }, + { + name: "invalid range - from greater than to", + from: 15, + to: 10, + errMsg: "from must be less than to", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := store.DeleteRange(ctx, tt.from, tt.to) + require.Error(t, err) + assert.Contains(t, err.Error(), tt.errMsg) + }) + } +} From cb11b091bf585190233e4bbf4663847f2d2c6110 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Tue, 9 Sep 2025 17:15:51 +0200 Subject: [PATCH 4/5] cleanup comments --- store/store_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/store/store_test.go b/store/store_test.go index e5902617..08e231de 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -1160,7 +1160,7 @@ func TestStore_DeleteRange_Synchronized(t *testing.T) { err = store.Sync(ctx) require.NoError(t, err) - // Delete from height 26 to head+1 (equivalent to DeleteFromHead(25)) + // Delete from height 26 to head+1 head, err := store.Head(ctx) require.NoError(t, err) @@ -1209,7 +1209,7 @@ func TestStore_DeleteRange_OnDeleteHandlers(t *testing.T) { return nil }) - // Delete from height 41 to head+1 (equivalent to DeleteFromHead(40)) + // Delete from height 41 to head+1 err = store.DeleteRange(ctx, 41, head.Height()+1) require.NoError(t, err) From a4b1924e220452329f5b37ec86051de91bfb74a7 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Fri, 12 Sep 2025 09:46:37 +0200 Subject: [PATCH 5/5] feedback --- headertest/store.go | 25 +++--------- interface.go | 3 -- p2p/server_test.go | 5 --- store/store_delete.go | 15 ------- store/store_test.go | 92 +++++++++++-------------------------------- sync/syncer_tail.go | 2 +- 6 files changed, 28 insertions(+), 114 deletions(-) diff --git a/headertest/store.go b/headertest/store.go index 9c1dd731..b08b8c03 100644 --- a/headertest/store.go +++ b/headertest/store.go @@ -79,26 +79,6 @@ func (m *Store[H]) GetByHeight(_ context.Context, height uint64) (H, error) { return zero, header.ErrNotFound } -func (m *Store[H]) DeleteTo(ctx context.Context, to uint64) error { - for h := m.TailHeight; h < to; h++ { - _, ok := m.Headers[h] - if !ok { - continue - } - - for _, deleteFn := range m.onDelete { - err := deleteFn(ctx, h) - if err != nil { - return err - } - } - delete(m.Headers, h) // must be after deleteFn - } - - m.TailHeight = to - return nil -} - func (m *Store[H]) DeleteRange(ctx context.Context, from, to uint64) error { // Delete headers in the range [from:to) for h := from; h < to; h++ { @@ -121,6 +101,11 @@ func (m *Store[H]) DeleteRange(ctx context.Context, from, to uint64) error { m.TailHeight = to } + // Update HeadHeight if we deleted from the end + if to >= m.HeadHeight { + m.HeadHeight = from - 1 + } + return nil } diff --git a/interface.go b/interface.go index d139e719..de7146de 100644 --- a/interface.go +++ b/interface.go @@ -85,9 +85,6 @@ type Store[H Header[H]] interface { // GetRange returns the range [from:to). GetRange(context.Context, uint64, uint64) ([]H, error) - // DeleteTo deletes the range [Tail():to). - DeleteTo(ctx context.Context, to uint64) error - // DeleteRange deletes the range [from:to). DeleteRange(ctx context.Context, from, to uint64) error diff --git a/p2p/server_test.go b/p2p/server_test.go index e53808c1..315359fc 100644 --- a/p2p/server_test.go +++ b/p2p/server_test.go @@ -192,11 +192,6 @@ func (timeoutStore[H]) GetRange(ctx context.Context, _, _ uint64) ([]H, error) { return nil, ctx.Err() } -func (timeoutStore[H]) DeleteTo(ctx context.Context, _ uint64) error { - <-ctx.Done() - return ctx.Err() -} - func (timeoutStore[H]) DeleteRange(ctx context.Context, _, _ uint64) error { <-ctx.Done() return ctx.Err() diff --git a/store/store_delete.go b/store/store_delete.go index efa051c4..a5d5ac85 100644 --- a/store/store_delete.go +++ b/store/store_delete.go @@ -32,17 +32,6 @@ func (s *Store[H]) OnDelete(fn func(context.Context, uint64) error) { }) } -// DeleteTo implements [header.Store] interface. -// This is a convenience wrapper around DeleteRange that deletes from tail up to a height. -func (s *Store[H]) DeleteTo(ctx context.Context, to uint64) error { - tail, err := s.Tail(ctx) - if err != nil { - return fmt.Errorf("header/store: reading tail: %w", err) - } - - return s.DeleteRange(ctx, tail.Height(), to) -} - // deleteRangeParallelThreshold defines the threshold for parallel deletion. // If range is smaller than this threshold, deletion will be performed sequentially. var ( @@ -330,10 +319,6 @@ func (s *Store[H]) DeleteRange(ctx context.Context, from, to uint64) error { // Update tail if we deleted from the beginning if updateTail { - _, err = s.getByHeight(ctx, to) - if err != nil { - return fmt.Errorf("header/store: new tail height %d not found: %w", to, err) - } err = s.setTail(ctx, s.ds, to) if err != nil { return fmt.Errorf("header/store: setting tail to %d: %w", to, err) diff --git a/store/store_test.go b/store/store_test.go index 08e231de..6725a63f 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -4,7 +4,6 @@ import ( "bytes" "context" "math/rand" - "strings" stdsync "sync" "testing" "time" @@ -526,7 +525,7 @@ func TestStore_GetRange(t *testing.T) { } } -func TestStore_DeleteTo(t *testing.T) { +func TestStore_DeleteRange_Tail(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) t.Cleanup(cancel) @@ -591,7 +590,7 @@ func TestStore_DeleteTo(t *testing.T) { ctx, cancel := context.WithTimeout(ctx, time.Second) defer cancel() - err := store.DeleteTo(ctx, tt.to) + err := store.DeleteRange(ctx, from, tt.to) if tt.wantError { assert.Error(t, err) return @@ -613,7 +612,7 @@ func TestStore_DeleteTo(t *testing.T) { } } -func TestStore_DeleteTo_EmptyStore(t *testing.T) { +func TestStore_DeleteRange_EmptyStore(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) t.Cleanup(cancel) @@ -630,11 +629,14 @@ func TestStore_DeleteTo_EmptyStore(t *testing.T) { require.NoError(t, err) time.Sleep(10 * time.Millisecond) - err = store.DeleteTo(ctx, 101) + tail, err := store.Tail(ctx) + require.NoError(t, err) + + err = store.DeleteRange(ctx, tail.Height(), 101) require.NoError(t, err) // assert store is empty - tail, err := store.Tail(ctx) + tail, err = store.Tail(ctx) assert.Nil(t, tail) assert.ErrorIs(t, err, header.ErrEmptyStore) head, err := store.Head(ctx) @@ -655,7 +657,7 @@ func TestStore_DeleteTo_EmptyStore(t *testing.T) { assert.ErrorIs(t, err, header.ErrEmptyStore) } -func TestStore_DeleteTo_MoveHeadAndTail(t *testing.T) { +func TestStore_DeleteRange_MoveHeadAndTail(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) t.Cleanup(cancel) @@ -678,11 +680,14 @@ func TestStore_DeleteTo_MoveHeadAndTail(t *testing.T) { require.NoError(t, err) time.Sleep(10 * time.Millisecond) - err = store.DeleteTo(ctx, 111) + tail, err := store.Tail(ctx) + require.NoError(t, err) + + err = store.DeleteRange(ctx, tail.Height(), 111) require.NoError(t, err) // assert store is not empty - tail, err := store.Tail(ctx) + tail, err = store.Tail(ctx) require.NoError(t, err) assert.Equal(t, int(gap[len(gap)-1].Height()+1), int(tail.Height())) head, err := store.Head(ctx) @@ -703,32 +708,6 @@ func TestStore_DeleteTo_MoveHeadAndTail(t *testing.T) { assert.Equal(t, suite.Head().Height(), head.Height()) } -func TestStore_DeleteTo_Synchronized(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) - t.Cleanup(cancel) - - suite := headertest.NewTestSuite(t) - - ds := sync.MutexWrap(datastore.NewMapDatastore()) - store := NewTestStore(t, ctx, ds, suite.Head(), WithWriteBatchSize(10)) - - err := store.Append(ctx, suite.GenDummyHeaders(50)...) - require.NoError(t, err) - - err = store.Append(ctx, suite.GenDummyHeaders(50)...) - require.NoError(t, err) - - err = store.Append(ctx, suite.GenDummyHeaders(50)...) - require.NoError(t, err) - - err = store.DeleteTo(ctx, 100) - require.NoError(t, err) - - tail, err := store.Tail(ctx) - require.NoError(t, err) - require.EqualValues(t, 100, tail.Height()) -} - func TestStore_OnDelete(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) t.Cleanup(cancel) @@ -759,7 +738,10 @@ func TestStore_OnDelete(t *testing.T) { return nil }) - err = store.DeleteTo(ctx, 101) + tail, err := store.Tail(ctx) + require.NoError(t, err) + + err = store.DeleteRange(ctx, tail.Height(), 101) require.NoError(t, err) assert.Equal(t, 50, deleted) @@ -890,7 +872,10 @@ func TestStore_HasAt(t *testing.T) { require.NoError(t, err) time.Sleep(100 * time.Millisecond) - err = store.DeleteTo(ctx, 50) + tail, err := store.Tail(ctx) + require.NoError(t, err) + + err = store.DeleteRange(ctx, tail.Height(), 50) require.NoError(t, err) has := store.HasAt(ctx, 100) @@ -1090,39 +1075,6 @@ func TestStore_DeleteRange(t *testing.T) { }) } -func TestStore_DeleteRange_EmptyStore(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) - t.Cleanup(cancel) - - ds := sync.MutexWrap(datastore.NewMapDatastore()) - store, err := NewStore[*headertest.DummyHeader](ds) - require.NoError(t, err) - - err = store.Start(ctx) - require.NoError(t, err) - t.Cleanup(func() { - err := store.Stop(ctx) - require.NoError(t, err) - }) - - // wait until headers are written - time.Sleep(10 * time.Millisecond) - - // should fail when trying to delete from empty store because it can't read head/tail - err = store.DeleteRange(ctx, 50, 60) - require.Error(t, err) - assert.Contains(t, err.Error(), "store is empty") - - // invalid range should also error with empty store, but it will hit the empty store error first - err = store.DeleteRange(ctx, 60, 50) - require.Error(t, err) - // Could be either empty store error or range validation error, both are valid - assert.True(t, - strings.Contains(err.Error(), "store is empty") || - strings.Contains(err.Error(), "from must be less than to"), - "Expected either empty store or range validation error, got: %v", err) -} - func TestStore_DeleteRange_SingleHeader(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) t.Cleanup(cancel) diff --git a/sync/syncer_tail.go b/sync/syncer_tail.go index 877ff407..c7790c99 100644 --- a/sync/syncer_tail.go +++ b/sync/syncer_tail.go @@ -130,7 +130,7 @@ func (s *Syncer[H]) moveTail(ctx context.Context, from, to H) error { switch { case from.Height() < to.Height(): log.Infof("move tail up from %d to %d, pruning the diff...", from.Height(), to.Height()) - err := s.store.DeleteTo(ctx, to.Height()) + err := s.store.DeleteRange(ctx, from.Height(), to.Height()) if err != nil { return fmt.Errorf( "deleting headers up to newly configured tail(%d): %w",