Skip to content

Commit e6da3c4

Browse files
committed
get block range from staging instead
1 parent bb0a3fe commit e6da3c4

File tree

5 files changed

+36
-8
lines changed

5 files changed

+36
-8
lines changed

internal/orchestrator/committer.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -380,13 +380,12 @@ func (c *Committer) getSequentialBlockDataToPublish(ctx context.Context) ([]comm
380380
}
381381

382382
endBlock := new(big.Int).Add(startBlock, big.NewInt(int64(c.blocksPerCommit-1)))
383-
blockCount := new(big.Int).Sub(endBlock, startBlock).Int64() + 1
384-
blockNumbers := make([]*big.Int, blockCount)
385-
for i := int64(0); i < blockCount; i++ {
386-
blockNumbers[i] = new(big.Int).Add(startBlock, big.NewInt(i))
387-
}
388383

389-
blocksData, err := c.storage.StagingStorage.GetStagingData(storage.QueryFilter{ChainId: chainID, BlockNumbers: blockNumbers})
384+
blocksData, err := c.storage.StagingStorage.GetStagingData(storage.QueryFilter{
385+
ChainId: chainID,
386+
StartBlock: startBlock,
387+
EndBlock: endBlock,
388+
})
390389
if err != nil {
391390
return nil, fmt.Errorf("error fetching blocks to publish: %v", err)
392391
}

internal/storage/clickhouse.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1014,8 +1014,17 @@ func (c *ClickHouseConnector) InsertStagingData(data []common.BlockData) error {
10141014
}
10151015

10161016
func (c *ClickHouseConnector) GetStagingData(qf QueryFilter) ([]common.BlockData, error) {
1017-
query := fmt.Sprintf("SELECT data FROM %s.block_data FINAL WHERE block_number IN (%s) AND is_deleted = 0",
1018-
c.cfg.Database, getBlockNumbersStringArray(qf.BlockNumbers))
1017+
var query string
1018+
1019+
if len(qf.BlockNumbers) > 0 {
1020+
query = fmt.Sprintf("SELECT data FROM %s.block_data FINAL WHERE block_number IN (%s) AND is_deleted = 0",
1021+
c.cfg.Database, getBlockNumbersStringArray(qf.BlockNumbers))
1022+
} else if qf.StartBlock != nil && qf.EndBlock != nil {
1023+
query = fmt.Sprintf("SELECT data FROM %s.block_data FINAL WHERE block_number >= %s AND block_number <= %s AND is_deleted = 0",
1024+
c.cfg.Database, qf.StartBlock.String(), qf.EndBlock.String())
1025+
} else {
1026+
return nil, fmt.Errorf("either BlockNumbers or StartBlock/EndBlock must be provided")
1027+
}
10191028

10201029
if qf.ChainId.Sign() != 0 {
10211030
query += fmt.Sprintf(" AND chain_id = %s", qf.ChainId.String())

internal/storage/connector.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ import (
1111
type QueryFilter struct {
1212
ChainId *big.Int
1313
BlockNumbers []*big.Int
14+
StartBlock *big.Int
15+
EndBlock *big.Int
1416
FilterParams map[string]string
1517
GroupBy []string
1618
SortBy string

internal/storage/postgres.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -284,6 +284,13 @@ func (p *PostgresConnector) GetStagingData(qf QueryFilter) ([]common.BlockData,
284284
args = append(args, bn.String())
285285
}
286286
query += fmt.Sprintf(" AND block_number IN (%s)", strings.Join(placeholders, ","))
287+
} else if qf.StartBlock != nil && qf.EndBlock != nil {
288+
argCount++
289+
query += fmt.Sprintf(" AND block_number >= $%d", argCount)
290+
args = append(args, qf.StartBlock.String())
291+
argCount++
292+
query += fmt.Sprintf(" AND block_number <= $%d", argCount)
293+
args = append(args, qf.EndBlock.String())
287294
}
288295

289296
query += " ORDER BY block_number ASC"

internal/storage/postgres_connector_test.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,17 @@ func TestPostgresConnector_StagingData(t *testing.T) {
167167
assert.NoError(t, err)
168168
assert.Len(t, retrievedData, 2)
169169

170+
// Test GetStagingData with StartBlock and EndBlock
171+
rangeQf := QueryFilter{
172+
ChainId: big.NewInt(1),
173+
StartBlock: big.NewInt(100),
174+
EndBlock: big.NewInt(101),
175+
}
176+
177+
retrievedDataRange, err := conn.GetStagingData(rangeQf)
178+
assert.NoError(t, err)
179+
assert.Len(t, retrievedDataRange, 2)
180+
170181
// Test GetLastStagedBlockNumber
171182
lastBlock, err := conn.GetLastStagedBlockNumber(big.NewInt(1), big.NewInt(90), big.NewInt(110))
172183
assert.NoError(t, err)

0 commit comments

Comments
 (0)