Skip to content

Commit 0bb69d4

Browse files
authored
fix: in-memory intersect points (#1438)
Signed-off-by: Chris Gianelloni <wolf31o2@blinklabs.io>
1 parent 4742bd1 commit 0bb69d4

File tree

3 files changed

+239
-14
lines changed

3 files changed

+239
-14
lines changed

chain/chain.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -479,6 +479,48 @@ func (c *Chain) ClearHeaders() {
479479
c.headers = c.headers[:0]
480480
}
481481

482+
// RecentPoints returns up to count recent chain points in descending
483+
// order (most recent first) using the in-memory chain state. This
484+
// includes the current tip and, for non-persistent chains, any blocks
485+
// stored in the in-memory buffer. For persistent chains, it walks
486+
// backwards through the database using block indices.
487+
//
488+
// This method is useful for building intersection point lists that
489+
// remain accurate even when the blob store has not yet been fully
490+
// flushed, since the chain's in-memory tip is always up-to-date.
491+
func (c *Chain) RecentPoints(count int) []ocommon.Point {
492+
if c == nil || count <= 0 {
493+
return nil
494+
}
495+
c.mutex.RLock()
496+
defer c.mutex.RUnlock()
497+
// If the chain has no blocks yet, return nothing
498+
if c.tipBlockIndex < initialBlockIndex {
499+
return nil
500+
}
501+
var points []ocommon.Point
502+
// Always include the current tip
503+
tip := c.currentTip.Point
504+
if tip.Slot > 0 || len(tip.Hash) > 0 {
505+
points = append(points, tip)
506+
}
507+
if len(points) >= count {
508+
return points[:count]
509+
}
510+
// Walk backwards through block indices to gather more points
511+
for idx := c.tipBlockIndex - 1; idx >= initialBlockIndex && len(points) < count; idx-- {
512+
blk, err := c.blockByIndex(idx)
513+
if err != nil {
514+
break
515+
}
516+
points = append(
517+
points,
518+
ocommon.NewPoint(blk.Slot, blk.Hash),
519+
)
520+
}
521+
return points
522+
}
523+
482524
func (c *Chain) HeaderCount() int {
483525
c.mutex.RLock()
484526
defer c.mutex.RUnlock()

chain/chain_test.go

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -663,6 +663,148 @@ func TestChainFromIntersect(t *testing.T) {
663663
}
664664
}
665665

666+
func TestRecentPointsNoDatabase(t *testing.T) {
667+
// Create a chain manager with no database. Blocks are stored
668+
// in memory only. RecentPoints must return the in-memory
669+
// chain points even though there is no blob store.
670+
cm, err := chain.NewManager(nil, nil)
671+
if err != nil {
672+
t.Fatalf(
673+
"unexpected error creating chain manager: %s",
674+
err,
675+
)
676+
}
677+
c := cm.PrimaryChain()
678+
679+
// Empty chain should return no points
680+
points := c.RecentPoints(10)
681+
if len(points) != 0 {
682+
t.Fatalf(
683+
"expected 0 points on empty chain, got %d",
684+
len(points),
685+
)
686+
}
687+
688+
// Add all test blocks
689+
for _, testBlock := range testBlocks {
690+
if err := c.AddBlock(testBlock, nil); err != nil {
691+
t.Fatalf(
692+
"unexpected error adding block to chain: %s",
693+
err,
694+
)
695+
}
696+
}
697+
698+
// Request more points than exist; should get all blocks
699+
points = c.RecentPoints(100)
700+
if len(points) != len(testBlocks) {
701+
t.Fatalf(
702+
"expected %d points, got %d",
703+
len(testBlocks),
704+
len(points),
705+
)
706+
}
707+
708+
// Points should be in descending order (most recent first)
709+
for i, p := range points {
710+
expectedBlock := testBlocks[len(testBlocks)-1-i]
711+
expectedHash := decodeHex(expectedBlock.MockHash)
712+
if p.Slot != expectedBlock.MockSlot {
713+
t.Fatalf(
714+
"point %d: expected slot %d, got %d",
715+
i,
716+
expectedBlock.MockSlot,
717+
p.Slot,
718+
)
719+
}
720+
if string(p.Hash) != string(expectedHash) {
721+
t.Fatalf(
722+
"point %d: expected hash %x, got %x",
723+
i,
724+
expectedHash,
725+
p.Hash,
726+
)
727+
}
728+
}
729+
730+
// Request fewer points than exist; should get exactly the
731+
// requested count, starting from the tip
732+
points = c.RecentPoints(2)
733+
if len(points) != 2 {
734+
t.Fatalf("expected 2 points, got %d", len(points))
735+
}
736+
lastBlock := testBlocks[len(testBlocks)-1]
737+
if points[0].Slot != lastBlock.MockSlot {
738+
t.Fatalf(
739+
"first point should be tip: expected slot %d, got %d",
740+
lastBlock.MockSlot,
741+
points[0].Slot,
742+
)
743+
}
744+
secondLastBlock := testBlocks[len(testBlocks)-2]
745+
if points[1].Slot != secondLastBlock.MockSlot {
746+
t.Fatalf(
747+
"second point should be tip-1: expected slot %d, got %d",
748+
secondLastBlock.MockSlot,
749+
points[1].Slot,
750+
)
751+
}
752+
}
753+
754+
func TestRecentPointsWithDatabase(t *testing.T) {
755+
// Create a chain manager with a real database. RecentPoints
756+
// should still return the correct in-memory tip even though
757+
// block storage goes through the blob store.
758+
db := newTestDB(t)
759+
cm, err := chain.NewManager(db, nil)
760+
if err != nil {
761+
t.Fatalf(
762+
"unexpected error creating chain manager: %s",
763+
err,
764+
)
765+
}
766+
c := cm.PrimaryChain()
767+
768+
// Add all test blocks
769+
for _, testBlock := range testBlocks {
770+
if err := c.AddBlock(testBlock, nil); err != nil {
771+
t.Fatalf(
772+
"unexpected error adding block to chain: %s",
773+
err,
774+
)
775+
}
776+
}
777+
778+
// RecentPoints should return points in descending order
779+
points := c.RecentPoints(3)
780+
if len(points) != 3 {
781+
t.Fatalf("expected 3 points, got %d", len(points))
782+
}
783+
784+
// Verify descending order by slot
785+
for i := range len(points) - 1 {
786+
if points[i].Slot <= points[i+1].Slot {
787+
t.Fatalf(
788+
"points not in descending order: "+
789+
"point %d (slot %d) <= point %d (slot %d)",
790+
i, points[i].Slot,
791+
i+1, points[i+1].Slot,
792+
)
793+
}
794+
}
795+
796+
// Tip should be the first point
797+
tip := c.Tip()
798+
if points[0].Slot != tip.Point.Slot ||
799+
string(points[0].Hash) != string(tip.Point.Hash) {
800+
t.Fatalf(
801+
"first point should match tip: got %d.%x, wanted %d.%x",
802+
points[0].Slot, points[0].Hash,
803+
tip.Point.Slot, tip.Point.Hash,
804+
)
805+
}
806+
}
807+
666808
// mockLedger implements the interface{ SecurityParam() int }
667809
// interface used by ChainManager.SetLedger.
668810
type mockLedger struct {

ledger/state.go

Lines changed: 55 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2552,22 +2552,63 @@ func (ls *LedgerState) GetBlock(point ocommon.Point) (models.Block, error) {
25522552
return ret, nil
25532553
}
25542554

2555-
// RecentChainPoints returns the requested count of recent chain points in descending order. This is used mostly
2556-
// for building a set of intersect points when acting as a chainsync client
2557-
func (ls *LedgerState) RecentChainPoints(count int) ([]ocommon.Point, error) {
2558-
tmpBlocks, err := database.BlocksRecent(ls.db, count)
2559-
if err != nil {
2560-
return nil, err
2561-
}
2562-
ret := make([]ocommon.Point, 0, len(tmpBlocks))
2563-
var tmpBlock models.Block
2564-
for _, tmpBlock = range tmpBlocks {
2565-
ret = append(
2566-
ret,
2567-
ocommon.NewPoint(tmpBlock.Slot, tmpBlock.Hash),
2555+
// RecentChainPoints returns the requested count of recent chain
2556+
// points in descending order. This is used mostly for building a set
2557+
// of intersect points when acting as a chainsync client.
2558+
//
2559+
// Points are first collected from the in-memory chain state, which is
2560+
// always up-to-date even when the blob store has not yet flushed
2561+
// recent writes. Database points are then appended to fill the
2562+
// requested count, with duplicates removed.
2563+
func (ls *LedgerState) RecentChainPoints(
2564+
count int,
2565+
) ([]ocommon.Point, error) {
2566+
var points []ocommon.Point
2567+
// Collect points from the in-memory chain first. The chain's
2568+
// tip and recent blocks are always current, even when the
2569+
// underlying blob store has pending writes that are not yet
2570+
// visible to new read transactions.
2571+
if ls.chain != nil {
2572+
points = ls.chain.RecentPoints(count)
2573+
}
2574+
// Supplement with database points for deeper history
2575+
if len(points) < count {
2576+
remaining := count - len(points)
2577+
tmpBlocks, err := database.BlocksRecent(
2578+
ls.db, remaining,
25682579
)
2580+
if err != nil {
2581+
// If we already have in-memory points, a database
2582+
// error is non-fatal: return what we have.
2583+
if len(points) > 0 {
2584+
return points, nil
2585+
}
2586+
return nil, err
2587+
}
2588+
// Build a set of existing points for deduplication
2589+
seen := make(map[string]struct{}, len(points))
2590+
for _, p := range points {
2591+
seen[pointKey(p)] = struct{}{}
2592+
}
2593+
for _, blk := range tmpBlocks {
2594+
p := ocommon.NewPoint(blk.Slot, blk.Hash)
2595+
key := pointKey(p)
2596+
if _, exists := seen[key]; exists {
2597+
continue
2598+
}
2599+
seen[key] = struct{}{}
2600+
points = append(points, p)
2601+
if len(points) >= count {
2602+
break
2603+
}
2604+
}
25692605
}
2570-
return ret, nil
2606+
return points, nil
2607+
}
2608+
2609+
// pointKey returns a string key for deduplication of chain points.
2610+
func pointKey(p ocommon.Point) string {
2611+
return fmt.Sprintf("%d:%x", p.Slot, p.Hash)
25712612
}
25722613

25732614
// GetIntersectPoint returns the intersect between the specified points and the current chain

0 commit comments

Comments
 (0)