Binary file removed insight
186 changes: 186 additions & 0 deletions internal/orchestrator/committer.go
@@ -75,6 +75,130 @@ func NewCommitter(rpc rpc.IRPCClient, storage storage.IStorage, opts ...Committe
	return committer
}

func (c *Committer) cleanupStrandedBlocks() error {
	// Get the current max block from main storage
	latestCommittedBlockNumber, err := c.storage.MainStorage.GetMaxBlockNumber(c.rpc.GetChainID())
	if err != nil {
		return fmt.Errorf("error getting max block number from main storage: %v", err)
	}

	if latestCommittedBlockNumber.Sign() == 0 {
		// No blocks in main storage yet, nothing to clean up
		return nil
	}

	// Get block numbers from PostgreSQL that are less than latest committed block
	psqlBlockNumbers, err := c.storage.StagingStorage.GetBlockNumbersLessThan(c.rpc.GetChainID(), latestCommittedBlockNumber)
	if err != nil {
		return fmt.Errorf("error getting block numbers from PostgreSQL: %v", err)
	}

	if len(psqlBlockNumbers) == 0 {
		// No stranded blocks in staging
		return nil
	}

	log.Info().
		Int("block_count", len(psqlBlockNumbers)).
		Str("min_block", psqlBlockNumbers[0].String()).
		Str("max_block", psqlBlockNumbers[len(psqlBlockNumbers)-1].String()).
		Msg("Found stranded blocks in staging")

	// Process blocks in batches of c.blocksPerCommit, but max 1000 to avoid ClickHouse query limits
	batchSize := c.blocksPerCommit
	if batchSize > 1000 {
		batchSize = 1000
	}

	for i := 0; i < len(psqlBlockNumbers); i += batchSize {
		end := i + batchSize
		if end > len(psqlBlockNumbers) {
			end = len(psqlBlockNumbers)
		}

		batchBlockNumbers := psqlBlockNumbers[i:end]

		if err := c.processStrandedBlocksBatch(batchBlockNumbers); err != nil {
			return fmt.Errorf("error processing stranded blocks batch %d-%d: %v", i, end-1, err)
		}
	}

	return nil
}
Comment on lines +78 to +127

🛠️ Refactor suggestion

Sort and guard against nil to make logging/batching safe and deterministic.

  • latestCommittedBlockNumber can be nil; guard before use.
  • Sort psqlBlockNumbers before logging/batching so min/max reflect reality and batches are contiguous.

Apply this diff:

 func (c *Committer) cleanupStrandedBlocks() error {
 	// Get the current max block from main storage
 	latestCommittedBlockNumber, err := c.storage.MainStorage.GetMaxBlockNumber(c.rpc.GetChainID())
-	if err != nil {
+	if err != nil {
 		return fmt.Errorf("error getting max block number from main storage: %v", err)
 	}
 
-	if latestCommittedBlockNumber.Sign() == 0 {
+	if latestCommittedBlockNumber == nil || latestCommittedBlockNumber.Sign() == 0 {
 		// No blocks in main storage yet, nothing to clean up
 		return nil
 	}
 
 	// Get block numbers from PostgreSQL that are less than latest committed block
 	psqlBlockNumbers, err := c.storage.StagingStorage.GetBlockNumbersLessThan(c.rpc.GetChainID(), latestCommittedBlockNumber)
 	if err != nil {
 		return fmt.Errorf("error getting block numbers from PostgreSQL: %v", err)
 	}
 
 	if len(psqlBlockNumbers) == 0 {
 		// No stranded blocks in staging
 		return nil
 	}
 
+	// Ensure deterministic ordering for logging and batching
+	sort.Slice(psqlBlockNumbers, func(i, j int) bool {
+		return psqlBlockNumbers[i].Cmp(psqlBlockNumbers[j]) < 0
+	})
+
 	log.Info().
 		Int("block_count", len(psqlBlockNumbers)).
 		Str("min_block", psqlBlockNumbers[0].String()).
 		Str("max_block", psqlBlockNumbers[len(psqlBlockNumbers)-1].String()).
 		Msg("Found stranded blocks in staging")
🤖 Prompt for AI Agents
In internal/orchestrator/committer.go lines 78 to 127,
latestCommittedBlockNumber should be checked for nil before calling Sign() to
avoid nil pointer dereference. Also, sort the psqlBlockNumbers slice before
logging and batching to ensure min and max block numbers are accurate and
batches are processed in order. Add a nil check for latestCommittedBlockNumber
and use a sorting function on psqlBlockNumbers before the log.Info() call and
batch processing loop.


func (c *Committer) processStrandedBlocksBatch(blockNumbers []*big.Int) error {
	if len(blockNumbers) == 0 {
		return nil
	}

	log.Debug().
		Int("batch_size", len(blockNumbers)).
		Str("min_block", blockNumbers[0].String()).
		Str("max_block", blockNumbers[len(blockNumbers)-1].String()).
		Msg("Processing stranded blocks batch")

	// Check which blocks exist in ClickHouse
	existsInClickHouse, err := c.storage.MainStorage.CheckBlocksExist(c.rpc.GetChainID(), blockNumbers)
	if err != nil {
		return fmt.Errorf("error checking blocks in ClickHouse: %v", err)
	}

	// Get block data from PostgreSQL for blocks that don't exist in ClickHouse
	var blocksToCommit []common.BlockData
	for _, blockNum := range blockNumbers {
		if !existsInClickHouse[blockNum.String()] {
			data, err := c.storage.StagingStorage.GetStagingData(storage.QueryFilter{
				BlockNumbers: []*big.Int{blockNum},
				ChainId:      c.rpc.GetChainID(),
			})
			if err != nil {
				return fmt.Errorf("error getting block data from PostgreSQL: %v", err)
			}
			if len(data) > 0 {
				blocksToCommit = append(blocksToCommit, data[0])
			}
		}
	}

	// Insert blocks into ClickHouse
	if len(blocksToCommit) > 0 {
		log.Info().
			Int("block_count", len(blocksToCommit)).
			Str("min_block", blocksToCommit[0].Block.Number.String()).
			Str("max_block", blocksToCommit[len(blocksToCommit)-1].Block.Number.String()).
			Msg("Committing stranded blocks to ClickHouse")

		if err := c.storage.MainStorage.InsertBlockData(blocksToCommit); err != nil {
			return fmt.Errorf("error inserting blocks into ClickHouse: %v", err)
		}
Comment on lines +146 to +173

🛠️ Refactor suggestion

Fix N+1 staging queries and delete only blocks that exist or were inserted.

Batch fetch missing blocks in one call and restrict deletion to blocks that either existed in ClickHouse or were successfully inserted just now. This avoids heavy N+1 patterns and reduces risk of data loss on partial failures.

Apply this diff:

-	// Get block data from PostgreSQL for blocks that don't exist in ClickHouse
-	var blocksToCommit []common.BlockData
-	for _, blockNum := range blockNumbers {
-		if !existsInClickHouse[blockNum.String()] {
-			data, err := c.storage.StagingStorage.GetStagingData(storage.QueryFilter{
-				BlockNumbers: []*big.Int{blockNum},
-				ChainId:      c.rpc.GetChainID(),
-			})
-			if err != nil {
-				return fmt.Errorf("error getting block data from PostgreSQL: %v", err)
-			}
-			if len(data) > 0 {
-				blocksToCommit = append(blocksToCommit, data[0])
-			}
-		}
-	}
+	// Collect all missing blocks and fetch them in a single query
+	var missingBlockNumbers []*big.Int
+	for _, bn := range blockNumbers {
+		if !existsInClickHouse[bn.String()] {
+			missingBlockNumbers = append(missingBlockNumbers, bn)
+		}
+	}
+
+	var blocksToCommit []common.BlockData
+	if len(missingBlockNumbers) > 0 {
+		data, err := c.storage.StagingStorage.GetStagingData(storage.QueryFilter{
+			BlockNumbers: missingBlockNumbers,
+			ChainId:      c.rpc.GetChainID(),
+		})
+		if err != nil {
+			return fmt.Errorf("error getting block data from PostgreSQL: %v", err)
+		}
+		blocksToCommit = data
+	}
 
 	// Insert blocks into ClickHouse
 	if len(blocksToCommit) > 0 {
 		log.Info().
 			Int("block_count", len(blocksToCommit)).
 			Str("min_block", blocksToCommit[0].Block.Number.String()).
 			Str("max_block", blocksToCommit[len(blocksToCommit)-1].Block.Number.String()).
 			Msg("Committing stranded blocks to ClickHouse")
 
 		if err := c.storage.MainStorage.InsertBlockData(blocksToCommit); err != nil {
 			return fmt.Errorf("error inserting blocks into ClickHouse: %v", err)
 		}
 	}
 
-	// Delete all blocks from PostgreSQL that were checked (whether they existed in ClickHouse or not)
-	var blocksToDelete []common.BlockData
-	for _, blockNum := range blockNumbers {
-		blocksToDelete = append(blocksToDelete, common.BlockData{
-			Block: common.Block{
-				ChainId: c.rpc.GetChainID(),
-				Number:  blockNum,
-			},
-		})
-	}
+	// Delete only blocks that exist in ClickHouse or were just inserted
+	inserted := make(map[string]struct{}, len(blocksToCommit))
+	for _, bd := range blocksToCommit {
+		inserted[bd.Block.Number.String()] = struct{}{}
+	}
+	var blocksToDelete []common.BlockData
+	for _, bn := range blockNumbers {
+		if existsInClickHouse[bn.String()] {
+			blocksToDelete = append(blocksToDelete, common.BlockData{Block: common.Block{ChainId: c.rpc.GetChainID(), Number: bn}})
+			continue
+		}
+		if _, ok := inserted[bn.String()]; ok {
+			blocksToDelete = append(blocksToDelete, common.BlockData{Block: common.Block{ChainId: c.rpc.GetChainID(), Number: bn}})
+		}
+	}
 
 	if len(blocksToDelete) > 0 {
 		log.Info().
 			Int("block_count", len(blocksToDelete)).
 			Str("min_block", blocksToDelete[0].Block.Number.String()).
 			Str("max_block", blocksToDelete[len(blocksToDelete)-1].Block.Number.String()).
 			Msg("Deleting stranded blocks from PostgreSQL")
 
 		if err := c.storage.StagingStorage.DeleteStagingData(blocksToDelete); err != nil {
 			return fmt.Errorf("error deleting blocks from PostgreSQL: %v", err)
 		}
 	}

Also applies to: 176-197

🤖 Prompt for AI Agents
In internal/orchestrator/committer.go from lines 146 to 173 and also lines 176
to 197, the current code performs N+1 queries to fetch staging data for each
missing block individually and deletes blocks without verifying their existence
or successful insertion. To fix this, modify the code to batch fetch all missing
blocks in a single query instead of looping with individual calls. Then, when
deleting blocks, restrict the deletion only to those blocks that either already
existed in ClickHouse or were successfully inserted in the current operation.
This will eliminate the N+1 query pattern and prevent accidental deletion of
blocks that were not inserted due to partial failures.

	}

	// Delete all blocks from PostgreSQL that were checked (whether they existed in ClickHouse or not)
	var blocksToDelete []common.BlockData
	for _, blockNum := range blockNumbers {
		blocksToDelete = append(blocksToDelete, common.BlockData{
			Block: common.Block{
				ChainId: c.rpc.GetChainID(),
				Number:  blockNum,
			},
		})
	}

	if len(blocksToDelete) > 0 {
		log.Info().
			Int("block_count", len(blocksToDelete)).
			Str("min_block", blocksToDelete[0].Block.Number.String()).
			Str("max_block", blocksToDelete[len(blocksToDelete)-1].Block.Number.String()).
			Msg("Deleting stranded blocks from PostgreSQL")

		if err := c.storage.StagingStorage.DeleteStagingData(blocksToDelete); err != nil {
			return fmt.Errorf("error deleting blocks from PostgreSQL: %v", err)
		}
	}

	return nil
}

func (c *Committer) Start(ctx context.Context) {
	interval := time.Duration(c.triggerIntervalMs) * time.Millisecond

@@ -135,6 +259,68 @@ func (c *Committer) getBlockNumbersToCommit(ctx context.Context) ([]*big.Int, er
}
}

	// Get block numbers from PostgreSQL that are less than latest committed block
	psqlBlockNumbers, err := c.storage.StagingStorage.GetBlockNumbersLessThan(c.rpc.GetChainID(), latestCommittedBlockNumber)
	if err != nil {
		return nil, fmt.Errorf("error getting block numbers from PostgreSQL: %v", err)
	}

	if len(psqlBlockNumbers) > 0 {
		// Check which blocks exist in ClickHouse
		existsInClickHouse, err := c.storage.MainStorage.CheckBlocksExist(c.rpc.GetChainID(), psqlBlockNumbers)
		if err != nil {
			return nil, fmt.Errorf("error checking blocks in ClickHouse: %v", err)
		}

		// Get block data from PostgreSQL for blocks that don't exist in ClickHouse
		var blocksToCommit []common.BlockData
		for _, blockNum := range psqlBlockNumbers {
			if !existsInClickHouse[blockNum.String()] {
				data, err := c.storage.StagingStorage.GetStagingData(storage.QueryFilter{
					BlockNumbers: []*big.Int{blockNum},
					ChainId:      c.rpc.GetChainID(),
				})
				if err != nil {
					return nil, fmt.Errorf("error getting block data from PostgreSQL: %v", err)
				}
				if len(data) > 0 {
					blocksToCommit = append(blocksToCommit, data[0])
				}
			}
		}

		// Insert blocks into ClickHouse
		if len(blocksToCommit) > 0 {
			if err := c.storage.MainStorage.InsertBlockData(blocksToCommit); err != nil {
				return nil, fmt.Errorf("error inserting blocks into ClickHouse: %v", err)
			}
		}

		// Delete all blocks from PostgreSQL that were checked (whether they existed in ClickHouse or not)
		var blocksToDelete []common.BlockData
		for _, blockNum := range psqlBlockNumbers {
			blocksToDelete = append(blocksToDelete, common.BlockData{
				Block: common.Block{
					ChainId: c.rpc.GetChainID(),
					Number:  blockNum,
				},
			})
		}

		if len(blocksToDelete) > 0 {
			log.Info().
				Int("block_count", len(blocksToDelete)).
				Str("min_block", blocksToDelete[0].Block.Number.String()).
				Str("max_block", blocksToDelete[len(blocksToDelete)-1].Block.Number.String()).
				Msg("Deleting stranded blocks from PostgreSQL")

			if err := c.storage.StagingStorage.DeleteStagingData(blocksToDelete); err != nil {
				log.Error().Err(err).Msg("Failed to delete blocks from PostgreSQL")
			}
		}
	}

	// Continue with normal block range processing
	startBlock := new(big.Int).Add(latestCommittedBlockNumber, big.NewInt(1))
	endBlock, err := c.getBlockToCommitUntil(ctx, latestCommittedBlockNumber)
	if err != nil {
13 changes: 13 additions & 0 deletions internal/orchestrator/committer_test.go
@@ -46,6 +46,7 @@ func TestGetBlockNumbersToCommit(t *testing.T) {

	mockRPC.EXPECT().GetChainID().Return(chainID)
	mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(100), nil)
	mockStagingStorage.EXPECT().GetBlockNumbersLessThan(chainID, big.NewInt(100)).Return([]*big.Int{}, nil)

Comment on lines +49 to 50

💡 Verification agent

🧩 Analysis chain

Add tests for the stranded-blocks cleanup path and reduce stub duplication.

Right now all tests stub GetBlockNumbersLessThan to return empty, so the new logic (CheckBlocksExist → fetch missing → insert → delete) isn’t exercised. Add a test that:

  • Returns a non-empty set of stranded blocks from staging,
  • Mocks CheckBlocksExist to simulate partial existence in main storage,
  • Verifies Staging.GetStagingData is called once with all missing blocks (batch, not N+1),
  • Verifies Main.InsertBlockData and Staging.DeleteStagingData are called with the correct sets.

Also consider a small helper to DRY the repeated “no stranded blocks” stub across tests.

Also applies to: 74-75, 102-103, 130-131, 155-156, 236-237, 263-264, 299-300, 416-416, 516-517, 562-563


🏁 Script executed:

#!/bin/bash
set -e
# Find all occurrences of GetBlockNumbersLessThan in the committer tests
rg -n "GetBlockNumbersLessThan" internal/orchestrator/committer_test.go

Length of output: 1495


🏁 Script executed:

#!/bin/bash
set -e
# Search for CheckBlocksExist usage in committer tests
rg -n "CheckBlocksExist" internal/orchestrator/committer_test.go

Length of output: 73


Add stranded-blocks cleanup tests and DRY the GetBlockNumbersLessThan stub

Currently, all committer_test.go tests stub

mockStagingStorage.EXPECT().GetBlockNumbersLessThan(...).Return([]*big.Int{}, nil)

so the cleanup path (CheckBlocksExist → GetStagingData → InsertBlockData → DeleteStagingData) never runs. To address this:

  • In internal/orchestrator/committer_test.go, add a test case that:
    • Mocks GetBlockNumbersLessThan to return a non-empty slice of block numbers.
    • Mocks mockMainStorage.CheckBlocksExist to mark some blocks as already present and others missing.
    • Verifies mockStagingStorage.GetStagingData is called exactly once with the full batch of missing blocks (not one per block).
    • Verifies mockMainStorage.InsertBlockData is called for the missing blocks and mockStagingStorage.DeleteStagingData is called for the ones that already existed.

  • Extract a helper (e.g. stubEmptyStrandedBlocks()) to centralize the repeated
    GetBlockNumbersLessThan(...).Return([]*big.Int{}, nil)
    stub across all tests (lines 49, 74, 102, 130, 155, 183, 236, 263, 299, 415, 450, 516, 562).

This will ensure the new cleanup logic is covered and reduce boilerplate; a rough sketch of the helper and test follows below.
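
A rough sketch of what the helper and a cleanup-path test could look like. The mock type name (mocks.IStagingStorage), the testify assert usage, the concrete block numbers, and the elided committer/mock setup are assumptions for illustration, not details taken from this repo:

	// Hypothetical helper to centralize the "no stranded blocks" stub; the mock type name
	// is assumed and should match whatever mockery generates for IStagingStorage here.
	func stubEmptyStrandedBlocks(m *mocks.IStagingStorage, chainID, below *big.Int) {
		m.EXPECT().GetBlockNumbersLessThan(chainID, below).Return([]*big.Int{}, nil)
	}

	// Sketch of a cleanup-path test: two stranded blocks, one already in main storage, one missing.
	func TestCleanupStrandedBlocks(t *testing.T) {
		// ... construct committer, mockRPC, mockMainStorage, mockStagingStorage as in the other tests ...
		chainID := big.NewInt(1)
		stranded := []*big.Int{big.NewInt(98), big.NewInt(99)}

		mockRPC.EXPECT().GetChainID().Return(chainID)
		mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(100), nil)
		mockStagingStorage.EXPECT().GetBlockNumbersLessThan(chainID, big.NewInt(100)).Return(stranded, nil)

		// Block 98 already exists in main storage; block 99 is missing.
		mockMainStorage.EXPECT().CheckBlocksExist(chainID, stranded).
			Return(map[string]bool{"98": true, "99": false}, nil)

		// Expect one batched staging fetch for the missing blocks, not one call per block.
		missing := common.BlockData{Block: common.Block{ChainId: chainID, Number: big.NewInt(99)}}
		mockStagingStorage.EXPECT().GetStagingData(mock.Anything).Return([]common.BlockData{missing}, nil).Once()

		// The missing block is inserted into main storage; the checked blocks are deleted from staging.
		mockMainStorage.EXPECT().InsertBlockData([]common.BlockData{missing}).Return(nil)
		mockStagingStorage.EXPECT().DeleteStagingData(mock.Anything).Return(nil)

		assert.NoError(t, committer.cleanupStrandedBlocks())
	}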

🤖 Prompt for AI Agents
In internal/orchestrator/committer_test.go around lines 49 to 50, add a new test
case that mocks GetBlockNumbersLessThan to return a non-empty slice of block
numbers, mocks mockMainStorage.CheckBlocksExist to mark some blocks as present
and others missing, and verifies that mockStagingStorage.GetStagingData is
called once with all missing blocks, mockMainStorage.InsertBlockData is called
for missing blocks, and mockStagingStorage.DeleteStagingData is called for
existing blocks. Also, create a helper function (e.g., stubEmptyStrandedBlocks)
to centralize the repeated stub of GetBlockNumbersLessThan returning an empty
slice and nil error, and replace all existing instances of this stub at lines
49, 74, 102, 130, 155, 183, 236, 263, 299, 415, 450, 516, and 562 with calls to
this helper to reduce boilerplate.

	blockNumbers, err := committer.getBlockNumbersToCommit(context.Background())

@@ -70,6 +71,7 @@ func TestGetBlockNumbersToCommitWithoutConfiguredAndNotStored(t *testing.T) {

	mockRPC.EXPECT().GetChainID().Return(chainID)
	mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(0), nil)
	mockStagingStorage.EXPECT().GetBlockNumbersLessThan(chainID, big.NewInt(-1)).Return([]*big.Int{}, nil)

	blockNumbers, err := committer.getBlockNumbersToCommit(context.Background())

@@ -97,6 +99,7 @@ func TestGetBlockNumbersToCommitWithConfiguredAndNotStored(t *testing.T) {

	mockRPC.EXPECT().GetChainID().Return(chainID)
	mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(0), nil)
	mockStagingStorage.EXPECT().GetBlockNumbersLessThan(chainID, big.NewInt(49)).Return([]*big.Int{}, nil)

	blockNumbers, err := committer.getBlockNumbersToCommit(context.Background())

@@ -124,6 +127,7 @@ func TestGetBlockNumbersToCommitWithConfiguredAndStored(t *testing.T) {

	mockRPC.EXPECT().GetChainID().Return(chainID)
	mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(2000), nil)
	mockStagingStorage.EXPECT().GetBlockNumbersLessThan(chainID, big.NewInt(2000)).Return([]*big.Int{}, nil)

	blockNumbers, err := committer.getBlockNumbersToCommit(context.Background())

@@ -148,6 +152,7 @@ func TestGetBlockNumbersToCommitWithoutConfiguredAndStored(t *testing.T) {

	mockRPC.EXPECT().GetChainID().Return(chainID)
	mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(2000), nil)
	mockStagingStorage.EXPECT().GetBlockNumbersLessThan(chainID, big.NewInt(2000)).Return([]*big.Int{}, nil)

	blockNumbers, err := committer.getBlockNumbersToCommit(context.Background())

@@ -175,6 +180,7 @@ func TestGetBlockNumbersToCommitWithStoredHigherThanInMemory(t *testing.T) {

	mockRPC.EXPECT().GetChainID().Return(chainID)
	mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(2000), nil)
	mockStagingStorage.EXPECT().GetBlockNumbersLessThan(chainID, big.NewInt(2000)).Return([]*big.Int{}, nil)

	blockNumbers, err := committer.getBlockNumbersToCommit(context.Background())

@@ -227,6 +233,7 @@ func TestGetBlockNumbersToCommitWithStoredEqualThanInMemory(t *testing.T) {

	mockRPC.EXPECT().GetChainID().Return(chainID)
	mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(2000), nil)
	mockStagingStorage.EXPECT().GetBlockNumbersLessThan(chainID, big.NewInt(2000)).Return([]*big.Int{}, nil)

	blockNumbers, err := committer.getBlockNumbersToCommit(context.Background())

Expand All @@ -253,6 +260,7 @@ func TestGetSequentialBlockDataToCommit(t *testing.T) {

mockRPC.EXPECT().GetChainID().Return(chainID)
mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(100), nil)
mockStagingStorage.EXPECT().GetBlockNumbersLessThan(chainID, big.NewInt(100)).Return([]*big.Int{}, nil)

blockData := []common.BlockData{
{Block: common.Block{Number: big.NewInt(101)}},
@@ -288,6 +296,7 @@ func TestGetSequentialBlockDataToCommitWithDuplicateBlocks(t *testing.T) {

	mockRPC.EXPECT().GetChainID().Return(chainID)
	mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(100), nil)
	mockStagingStorage.EXPECT().GetBlockNumbersLessThan(chainID, big.NewInt(100)).Return([]*big.Int{}, nil)

	blockData := []common.BlockData{
		{Block: common.Block{Number: big.NewInt(101)}},
@@ -403,6 +412,7 @@ func TestStartCommitter(t *testing.T) {
	chainID := big.NewInt(1)
	mockRPC.EXPECT().GetChainID().Return(chainID)
	mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(100), nil)
	mockStagingStorage.EXPECT().GetBlockNumbersLessThan(chainID, big.NewInt(100)).Return([]*big.Int{}, nil)

	blockData := []common.BlockData{
		{Block: common.Block{Number: big.NewInt(101)}},
@@ -437,6 +447,7 @@ func TestCommitterRespectsSIGTERM(t *testing.T) {
	chainID := big.NewInt(1)
	mockRPC.EXPECT().GetChainID().Return(chainID)
	mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(100), nil)
	mockStagingStorage.EXPECT().GetBlockNumbersLessThan(chainID, big.NewInt(100)).Return([]*big.Int{}, nil)

	blockData := []common.BlockData{
		{Block: common.Block{Number: big.NewInt(101)}},
@@ -502,6 +513,7 @@ func TestHandleMissingStagingData(t *testing.T) {
	mockStagingStorage.EXPECT().InsertStagingData(mock.Anything).Return(nil)

	mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(0), nil)
	mockStagingStorage.EXPECT().GetBlockNumbersLessThan(chainID, big.NewInt(-1)).Return([]*big.Int{}, nil)
	expectedEndBlock := big.NewInt(4)
	mockStagingStorage.EXPECT().GetLastStagedBlockNumber(chainID, expectedEndBlock, big.NewInt(0)).Return(big.NewInt(20), nil)

@@ -547,6 +559,7 @@ func TestHandleMissingStagingDataIsPolledWithCorrectBatchSize(t *testing.T) {
	mockStagingStorage.EXPECT().InsertStagingData(mock.Anything).Return(nil)

	mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(0), nil)
	mockStagingStorage.EXPECT().GetBlockNumbersLessThan(chainID, big.NewInt(-1)).Return([]*big.Int{}, nil)
	expectedEndBlock := big.NewInt(4)
	mockStagingStorage.EXPECT().GetLastStagedBlockNumber(chainID, expectedEndBlock, big.NewInt(0)).Return(big.NewInt(20), nil)

8 changes: 8 additions & 0 deletions internal/orchestrator/orchestrator.go
@@ -87,6 +87,14 @@ func (o *Orchestrator) Start() {
		defer workModeMonitor.UnregisterChannel(committerWorkModeChan)
		validator := NewValidator(o.rpc, o.storage)
		committer := NewCommitter(o.rpc, o.storage, WithCommitterWorkModeChan(committerWorkModeChan), WithValidator(validator))

		// Clean up any stranded blocks in staging in a separate goroutine
		go func() {
			if err := committer.cleanupStrandedBlocks(); err != nil {
				log.Error().Err(err).Msg("Failed to clean up stranded blocks during initialization")
			}
		}()

		committer.Start(ctx)
	}()
}