-
Notifications
You must be signed in to change notification settings - Fork 28
cleanup staging on boot #258
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -75,6 +75,130 @@ func NewCommitter(rpc rpc.IRPCClient, storage storage.IStorage, opts ...Committe | |
| return committer | ||
| } | ||
|
|
||
| func (c *Committer) cleanupStrandedBlocks() error { | ||
| // Get the current max block from main storage | ||
| latestCommittedBlockNumber, err := c.storage.MainStorage.GetMaxBlockNumber(c.rpc.GetChainID()) | ||
| if err != nil { | ||
| return fmt.Errorf("error getting max block number from main storage: %v", err) | ||
| } | ||
|
|
||
| if latestCommittedBlockNumber.Sign() == 0 { | ||
| // No blocks in main storage yet, nothing to clean up | ||
| return nil | ||
| } | ||
|
|
||
| // Get block numbers from PostgreSQL that are less than latest committed block | ||
| psqlBlockNumbers, err := c.storage.StagingStorage.GetBlockNumbersLessThan(c.rpc.GetChainID(), latestCommittedBlockNumber) | ||
| if err != nil { | ||
| return fmt.Errorf("error getting block numbers from PostgreSQL: %v", err) | ||
| } | ||
|
|
||
| if len(psqlBlockNumbers) == 0 { | ||
| // No stranded blocks in staging | ||
| return nil | ||
| } | ||
|
|
||
| log.Info(). | ||
| Int("block_count", len(psqlBlockNumbers)). | ||
| Str("min_block", psqlBlockNumbers[0].String()). | ||
| Str("max_block", psqlBlockNumbers[len(psqlBlockNumbers)-1].String()). | ||
| Msg("Found stranded blocks in staging") | ||
|
|
||
| // Process blocks in batches of c.blocksPerCommit, but max 1000 to avoid ClickHouse query limits | ||
| batchSize := c.blocksPerCommit | ||
| if batchSize > 1000 { | ||
| batchSize = 1000 | ||
| } | ||
|
|
||
| for i := 0; i < len(psqlBlockNumbers); i += batchSize { | ||
| end := i + batchSize | ||
| if end > len(psqlBlockNumbers) { | ||
| end = len(psqlBlockNumbers) | ||
| } | ||
|
|
||
| batchBlockNumbers := psqlBlockNumbers[i:end] | ||
|
|
||
| if err := c.processStrandedBlocksBatch(batchBlockNumbers); err != nil { | ||
| return fmt.Errorf("error processing stranded blocks batch %d-%d: %v", i, end-1, err) | ||
| } | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| func (c *Committer) processStrandedBlocksBatch(blockNumbers []*big.Int) error { | ||
| if len(blockNumbers) == 0 { | ||
| return nil | ||
| } | ||
|
|
||
| log.Debug(). | ||
| Int("batch_size", len(blockNumbers)). | ||
| Str("min_block", blockNumbers[0].String()). | ||
| Str("max_block", blockNumbers[len(blockNumbers)-1].String()). | ||
| Msg("Processing stranded blocks batch") | ||
|
|
||
| // Check which blocks exist in ClickHouse | ||
| existsInClickHouse, err := c.storage.MainStorage.CheckBlocksExist(c.rpc.GetChainID(), blockNumbers) | ||
| if err != nil { | ||
| return fmt.Errorf("error checking blocks in ClickHouse: %v", err) | ||
| } | ||
|
|
||
| // Get block data from PostgreSQL for blocks that don't exist in ClickHouse | ||
| var blocksToCommit []common.BlockData | ||
| for _, blockNum := range blockNumbers { | ||
| if !existsInClickHouse[blockNum.String()] { | ||
| data, err := c.storage.StagingStorage.GetStagingData(storage.QueryFilter{ | ||
| BlockNumbers: []*big.Int{blockNum}, | ||
| ChainId: c.rpc.GetChainID(), | ||
| }) | ||
| if err != nil { | ||
| return fmt.Errorf("error getting block data from PostgreSQL: %v", err) | ||
| } | ||
| if len(data) > 0 { | ||
| blocksToCommit = append(blocksToCommit, data[0]) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // Insert blocks into ClickHouse | ||
| if len(blocksToCommit) > 0 { | ||
| log.Info(). | ||
| Int("block_count", len(blocksToCommit)). | ||
| Str("min_block", blocksToCommit[0].Block.Number.String()). | ||
| Str("max_block", blocksToCommit[len(blocksToCommit)-1].Block.Number.String()). | ||
| Msg("Committing stranded blocks to ClickHouse") | ||
|
|
||
| if err := c.storage.MainStorage.InsertBlockData(blocksToCommit); err != nil { | ||
| return fmt.Errorf("error inserting blocks into ClickHouse: %v", err) | ||
| } | ||
|
Comment on lines
+146
to
+173
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Fix N+1 staging queries and delete only blocks that exist or were inserted. Batch fetch missing blocks in one call and restrict deletion to blocks that either existed in ClickHouse or were successfully inserted just now. This avoids heavy N+1 patterns and reduces risk of data loss on partial failures. Apply this diff: - // Get block data from PostgreSQL for blocks that don't exist in ClickHouse
- var blocksToCommit []common.BlockData
- for _, blockNum := range blockNumbers {
- if !existsInClickHouse[blockNum.String()] {
- data, err := c.storage.StagingStorage.GetStagingData(storage.QueryFilter{
- BlockNumbers: []*big.Int{blockNum},
- ChainId: c.rpc.GetChainID(),
- })
- if err != nil {
- return fmt.Errorf("error getting block data from PostgreSQL: %v", err)
- }
- if len(data) > 0 {
- blocksToCommit = append(blocksToCommit, data[0])
- }
- }
- }
+ // Collect all missing blocks and fetch them in a single query
+ var missingBlockNumbers []*big.Int
+ for _, bn := range blockNumbers {
+ if !existsInClickHouse[bn.String()] {
+ missingBlockNumbers = append(missingBlockNumbers, bn)
+ }
+ }
+
+ var blocksToCommit []common.BlockData
+ if len(missingBlockNumbers) > 0 {
+ data, err := c.storage.StagingStorage.GetStagingData(storage.QueryFilter{
+ BlockNumbers: missingBlockNumbers,
+ ChainId: c.rpc.GetChainID(),
+ })
+ if err != nil {
+ return fmt.Errorf("error getting block data from PostgreSQL: %v", err)
+ }
+ blocksToCommit = data
+ }
// Insert blocks into ClickHouse
if len(blocksToCommit) > 0 {
log.Info().
Int("block_count", len(blocksToCommit)).
Str("min_block", blocksToCommit[0].Block.Number.String()).
Str("max_block", blocksToCommit[len(blocksToCommit)-1].Block.Number.String()).
Msg("Committing stranded blocks to ClickHouse")
if err := c.storage.MainStorage.InsertBlockData(blocksToCommit); err != nil {
return fmt.Errorf("error inserting blocks into ClickHouse: %v", err)
}
}
- // Delete all blocks from PostgreSQL that were checked (whether they existed in ClickHouse or not)
- var blocksToDelete []common.BlockData
- for _, blockNum := range blockNumbers {
- blocksToDelete = append(blocksToDelete, common.BlockData{
- Block: common.Block{
- ChainId: c.rpc.GetChainID(),
- Number: blockNum,
- },
- })
- }
+ // Delete only blocks that exist in ClickHouse or were just inserted
+ inserted := make(map[string]struct{}, len(blocksToCommit))
+ for _, bd := range blocksToCommit {
+ inserted[bd.Block.Number.String()] = struct{}{}
+ }
+ var blocksToDelete []common.BlockData
+ for _, bn := range blockNumbers {
+ if existsInClickHouse[bn.String()] {
+ blocksToDelete = append(blocksToDelete, common.BlockData{Block: common.Block{ChainId: c.rpc.GetChainID(), Number: bn}})
+ continue
+ }
+ if _, ok := inserted[bn.String()]; ok {
+ blocksToDelete = append(blocksToDelete, common.BlockData{Block: common.Block{ChainId: c.rpc.GetChainID(), Number: bn}})
+ }
+ }
if len(blocksToDelete) > 0 {
log.Info().
Int("block_count", len(blocksToDelete)).
Str("min_block", blocksToDelete[0].Block.Number.String()).
Str("max_block", blocksToDelete[len(blocksToDelete)-1].Block.Number.String()).
Msg("Deleting stranded blocks from PostgreSQL")
if err := c.storage.StagingStorage.DeleteStagingData(blocksToDelete); err != nil {
return fmt.Errorf("error deleting blocks from PostgreSQL: %v", err)
}
}Also applies to: 176-197 🤖 Prompt for AI Agents |
||
| } | ||
|
|
||
| // Delete all blocks from PostgreSQL that were checked (whether they existed in ClickHouse or not) | ||
| var blocksToDelete []common.BlockData | ||
| for _, blockNum := range blockNumbers { | ||
| blocksToDelete = append(blocksToDelete, common.BlockData{ | ||
| Block: common.Block{ | ||
| ChainId: c.rpc.GetChainID(), | ||
| Number: blockNum, | ||
| }, | ||
| }) | ||
| } | ||
|
|
||
| if len(blocksToDelete) > 0 { | ||
| log.Info(). | ||
| Int("block_count", len(blocksToDelete)). | ||
| Str("min_block", blocksToDelete[0].Block.Number.String()). | ||
| Str("max_block", blocksToDelete[len(blocksToDelete)-1].Block.Number.String()). | ||
| Msg("Deleting stranded blocks from PostgreSQL") | ||
|
|
||
| if err := c.storage.StagingStorage.DeleteStagingData(blocksToDelete); err != nil { | ||
| return fmt.Errorf("error deleting blocks from PostgreSQL: %v", err) | ||
| } | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| func (c *Committer) Start(ctx context.Context) { | ||
| interval := time.Duration(c.triggerIntervalMs) * time.Millisecond | ||
|
|
||
|
|
@@ -135,6 +259,68 @@ func (c *Committer) getBlockNumbersToCommit(ctx context.Context) ([]*big.Int, er | |
| } | ||
| } | ||
|
|
||
| // Get block numbers from PostgreSQL that are less than latest committed block | ||
| psqlBlockNumbers, err := c.storage.StagingStorage.GetBlockNumbersLessThan(c.rpc.GetChainID(), latestCommittedBlockNumber) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("error getting block numbers from PostgreSQL: %v", err) | ||
| } | ||
|
|
||
| if len(psqlBlockNumbers) > 0 { | ||
| // Check which blocks exist in ClickHouse | ||
| existsInClickHouse, err := c.storage.MainStorage.CheckBlocksExist(c.rpc.GetChainID(), psqlBlockNumbers) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("error checking blocks in ClickHouse: %v", err) | ||
| } | ||
|
|
||
| // Get block data from PostgreSQL for blocks that don't exist in ClickHouse | ||
| var blocksToCommit []common.BlockData | ||
| for _, blockNum := range psqlBlockNumbers { | ||
| if !existsInClickHouse[blockNum.String()] { | ||
| data, err := c.storage.StagingStorage.GetStagingData(storage.QueryFilter{ | ||
| BlockNumbers: []*big.Int{blockNum}, | ||
| ChainId: c.rpc.GetChainID(), | ||
| }) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("error getting block data from PostgreSQL: %v", err) | ||
| } | ||
| if len(data) > 0 { | ||
| blocksToCommit = append(blocksToCommit, data[0]) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // Insert blocks into ClickHouse | ||
| if len(blocksToCommit) > 0 { | ||
| if err := c.storage.MainStorage.InsertBlockData(blocksToCommit); err != nil { | ||
| return nil, fmt.Errorf("error inserting blocks into ClickHouse: %v", err) | ||
| } | ||
| } | ||
|
|
||
| // Delete all blocks from PostgreSQL that were checked (whether they existed in ClickHouse or not) | ||
| var blocksToDelete []common.BlockData | ||
| for _, blockNum := range psqlBlockNumbers { | ||
| blocksToDelete = append(blocksToDelete, common.BlockData{ | ||
| Block: common.Block{ | ||
| ChainId: c.rpc.GetChainID(), | ||
| Number: blockNum, | ||
| }, | ||
| }) | ||
| } | ||
|
|
||
| if len(blocksToDelete) > 0 { | ||
| log.Info(). | ||
| Int("block_count", len(blocksToDelete)). | ||
| Str("min_block", blocksToDelete[0].Block.Number.String()). | ||
| Str("max_block", blocksToDelete[len(blocksToDelete)-1].Block.Number.String()). | ||
| Msg("Deleting stranded blocks from PostgreSQL") | ||
|
|
||
| if err := c.storage.StagingStorage.DeleteStagingData(blocksToDelete); err != nil { | ||
| log.Error().Err(err).Msg("Failed to delete blocks from PostgreSQL") | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // Continue with normal block range processing | ||
| startBlock := new(big.Int).Add(latestCommittedBlockNumber, big.NewInt(1)) | ||
| endBlock, err := c.getBlockToCommitUntil(ctx, latestCommittedBlockNumber) | ||
| if err != nil { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -46,6 +46,7 @@ func TestGetBlockNumbersToCommit(t *testing.T) { | |
|
|
||
| mockRPC.EXPECT().GetChainID().Return(chainID) | ||
| mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(100), nil) | ||
| mockStagingStorage.EXPECT().GetBlockNumbersLessThan(chainID, big.NewInt(100)).Return([]*big.Int{}, nil) | ||
|
|
||
|
Comment on lines
+49
to
50
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 💡 Verification agent 🧩 Analysis chainAdd tests for the stranded-blocks cleanup path and reduce stub duplication. Right now all tests stub GetBlockNumbersLessThan to return empty, so the new logic (CheckBlocksExist → fetch missing → insert → delete) isn’t exercised. Add a test that:
Also consider a small helper to DRY the repeated “no stranded blocks” stub across tests. Also applies to: 74-75, 102-103, 130-131, 155-156, 236-237, 263-264, 299-300, 416-416, 516-517, 562-563 🏁 Script executed: #!/bin/bash
set -e
# Find all occurrences of GetBlockNumbersLessThan in the committer tests
rg -n "GetBlockNumbersLessThan" internal/orchestrator/committer_test.goLength of output: 1495 🏁 Script executed: #!/bin/bash
set -e
# Search for CheckBlocksExist usage in committer tests
rg -n "CheckBlocksExist" internal/orchestrator/committer_test.goLength of output: 73 Add stranded-blocks cleanup tests and DRY the GetBlockNumbersLessThan stub Currently, all mockStagingStorage.EXPECT().GetBlockNumbersLessThan(...).Return([]*big.Int{}, nil)so the cleanup path (
This will ensure the new cleanup logic is covered and reduce boilerplate. 🤖 Prompt for AI Agents |
||
| blockNumbers, err := committer.getBlockNumbersToCommit(context.Background()) | ||
|
|
||
|
|
@@ -70,6 +71,7 @@ func TestGetBlockNumbersToCommitWithoutConfiguredAndNotStored(t *testing.T) { | |
|
|
||
| mockRPC.EXPECT().GetChainID().Return(chainID) | ||
| mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(0), nil) | ||
| mockStagingStorage.EXPECT().GetBlockNumbersLessThan(chainID, big.NewInt(-1)).Return([]*big.Int{}, nil) | ||
|
|
||
| blockNumbers, err := committer.getBlockNumbersToCommit(context.Background()) | ||
|
|
||
|
|
@@ -97,6 +99,7 @@ func TestGetBlockNumbersToCommitWithConfiguredAndNotStored(t *testing.T) { | |
|
|
||
| mockRPC.EXPECT().GetChainID().Return(chainID) | ||
| mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(0), nil) | ||
| mockStagingStorage.EXPECT().GetBlockNumbersLessThan(chainID, big.NewInt(49)).Return([]*big.Int{}, nil) | ||
|
|
||
| blockNumbers, err := committer.getBlockNumbersToCommit(context.Background()) | ||
|
|
||
|
|
@@ -124,6 +127,7 @@ func TestGetBlockNumbersToCommitWithConfiguredAndStored(t *testing.T) { | |
|
|
||
| mockRPC.EXPECT().GetChainID().Return(chainID) | ||
| mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(2000), nil) | ||
| mockStagingStorage.EXPECT().GetBlockNumbersLessThan(chainID, big.NewInt(2000)).Return([]*big.Int{}, nil) | ||
|
|
||
| blockNumbers, err := committer.getBlockNumbersToCommit(context.Background()) | ||
|
|
||
|
|
@@ -148,6 +152,7 @@ func TestGetBlockNumbersToCommitWithoutConfiguredAndStored(t *testing.T) { | |
|
|
||
| mockRPC.EXPECT().GetChainID().Return(chainID) | ||
| mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(2000), nil) | ||
| mockStagingStorage.EXPECT().GetBlockNumbersLessThan(chainID, big.NewInt(2000)).Return([]*big.Int{}, nil) | ||
|
|
||
| blockNumbers, err := committer.getBlockNumbersToCommit(context.Background()) | ||
|
|
||
|
|
@@ -175,6 +180,7 @@ func TestGetBlockNumbersToCommitWithStoredHigherThanInMemory(t *testing.T) { | |
|
|
||
| mockRPC.EXPECT().GetChainID().Return(chainID) | ||
| mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(2000), nil) | ||
| mockStagingStorage.EXPECT().GetBlockNumbersLessThan(chainID, big.NewInt(2000)).Return([]*big.Int{}, nil) | ||
|
|
||
| blockNumbers, err := committer.getBlockNumbersToCommit(context.Background()) | ||
|
|
||
|
|
@@ -227,6 +233,7 @@ func TestGetBlockNumbersToCommitWithStoredEqualThanInMemory(t *testing.T) { | |
|
|
||
| mockRPC.EXPECT().GetChainID().Return(chainID) | ||
| mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(2000), nil) | ||
| mockStagingStorage.EXPECT().GetBlockNumbersLessThan(chainID, big.NewInt(2000)).Return([]*big.Int{}, nil) | ||
|
|
||
| blockNumbers, err := committer.getBlockNumbersToCommit(context.Background()) | ||
|
|
||
|
|
@@ -253,6 +260,7 @@ func TestGetSequentialBlockDataToCommit(t *testing.T) { | |
|
|
||
| mockRPC.EXPECT().GetChainID().Return(chainID) | ||
| mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(100), nil) | ||
| mockStagingStorage.EXPECT().GetBlockNumbersLessThan(chainID, big.NewInt(100)).Return([]*big.Int{}, nil) | ||
|
|
||
| blockData := []common.BlockData{ | ||
| {Block: common.Block{Number: big.NewInt(101)}}, | ||
|
|
@@ -288,6 +296,7 @@ func TestGetSequentialBlockDataToCommitWithDuplicateBlocks(t *testing.T) { | |
|
|
||
| mockRPC.EXPECT().GetChainID().Return(chainID) | ||
| mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(100), nil) | ||
| mockStagingStorage.EXPECT().GetBlockNumbersLessThan(chainID, big.NewInt(100)).Return([]*big.Int{}, nil) | ||
|
|
||
| blockData := []common.BlockData{ | ||
| {Block: common.Block{Number: big.NewInt(101)}}, | ||
|
|
@@ -403,6 +412,7 @@ func TestStartCommitter(t *testing.T) { | |
| chainID := big.NewInt(1) | ||
| mockRPC.EXPECT().GetChainID().Return(chainID) | ||
| mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(100), nil) | ||
| mockStagingStorage.EXPECT().GetBlockNumbersLessThan(chainID, big.NewInt(100)).Return([]*big.Int{}, nil) | ||
|
|
||
| blockData := []common.BlockData{ | ||
| {Block: common.Block{Number: big.NewInt(101)}}, | ||
|
|
@@ -437,6 +447,7 @@ func TestCommitterRespectsSIGTERM(t *testing.T) { | |
| chainID := big.NewInt(1) | ||
| mockRPC.EXPECT().GetChainID().Return(chainID) | ||
| mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(100), nil) | ||
| mockStagingStorage.EXPECT().GetBlockNumbersLessThan(chainID, big.NewInt(100)).Return([]*big.Int{}, nil) | ||
|
|
||
| blockData := []common.BlockData{ | ||
| {Block: common.Block{Number: big.NewInt(101)}}, | ||
|
|
@@ -502,6 +513,7 @@ func TestHandleMissingStagingData(t *testing.T) { | |
| mockStagingStorage.EXPECT().InsertStagingData(mock.Anything).Return(nil) | ||
|
|
||
| mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(0), nil) | ||
| mockStagingStorage.EXPECT().GetBlockNumbersLessThan(chainID, big.NewInt(-1)).Return([]*big.Int{}, nil) | ||
| expectedEndBlock := big.NewInt(4) | ||
| mockStagingStorage.EXPECT().GetLastStagedBlockNumber(chainID, expectedEndBlock, big.NewInt(0)).Return(big.NewInt(20), nil) | ||
|
|
||
|
|
@@ -547,6 +559,7 @@ func TestHandleMissingStagingDataIsPolledWithCorrectBatchSize(t *testing.T) { | |
| mockStagingStorage.EXPECT().InsertStagingData(mock.Anything).Return(nil) | ||
|
|
||
| mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(0), nil) | ||
| mockStagingStorage.EXPECT().GetBlockNumbersLessThan(chainID, big.NewInt(-1)).Return([]*big.Int{}, nil) | ||
| expectedEndBlock := big.NewInt(4) | ||
| mockStagingStorage.EXPECT().GetLastStagedBlockNumber(chainID, expectedEndBlock, big.NewInt(0)).Return(big.NewInt(20), nil) | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Sort and guard against nil to make logging/batching safe and deterministic.
Apply this diff:
func (c *Committer) cleanupStrandedBlocks() error { // Get the current max block from main storage latestCommittedBlockNumber, err := c.storage.MainStorage.GetMaxBlockNumber(c.rpc.GetChainID()) - if err != nil { + if err != nil { return fmt.Errorf("error getting max block number from main storage: %v", err) } - if latestCommittedBlockNumber.Sign() == 0 { + if latestCommittedBlockNumber == nil || latestCommittedBlockNumber.Sign() == 0 { // No blocks in main storage yet, nothing to clean up return nil } // Get block numbers from PostgreSQL that are less than latest committed block psqlBlockNumbers, err := c.storage.StagingStorage.GetBlockNumbersLessThan(c.rpc.GetChainID(), latestCommittedBlockNumber) if err != nil { return fmt.Errorf("error getting block numbers from PostgreSQL: %v", err) } if len(psqlBlockNumbers) == 0 { // No stranded blocks in staging return nil } + // Ensure deterministic ordering for logging and batching + sort.Slice(psqlBlockNumbers, func(i, j int) bool { + return psqlBlockNumbers[i].Cmp(psqlBlockNumbers[j]) < 0 + }) + log.Info(). Int("block_count", len(psqlBlockNumbers)). Str("min_block", psqlBlockNumbers[0].String()). Str("max_block", psqlBlockNumbers[len(psqlBlockNumbers)-1].String()). Msg("Found stranded blocks in staging")🤖 Prompt for AI Agents