Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions internal/orchestrator/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,17 @@ func NewPoller(rpc rpc.IRPCClient, storage storage.IStorage) *Poller {
}
untilBlock := big.NewInt(int64(config.Cfg.Poller.UntilBlock))
pollFromBlock := big.NewInt(int64(config.Cfg.Poller.FromBlock))
lastPolledBlock, err := storage.StagingStorage.GetLastStagedBlockNumber(rpc.GetChainID(), untilBlock)
if err != nil || lastPolledBlock == nil || lastPolledBlock.Sign() <= 0 {
lastPolledBlock = new(big.Int).Sub(pollFromBlock, big.NewInt(1)) // needs to include the first block
log.Warn().Err(err).Msgf("No last polled block found, setting to %s", lastPolledBlock.String())
lastPolledBlock := new(big.Int).Sub(pollFromBlock, big.NewInt(1)) // needs to include the first block
if config.Cfg.Poller.ForceFromBlock {
log.Debug().Msgf("ForceFromBlock is enabled, setting last polled block to %s", lastPolledBlock.String())
} else {
// In the case where the start block in staging introduces a gap with main storage,
// This hack allows us to re-poll from the start block without having to delete the staging data
if config.Cfg.Poller.ForceFromBlock {
lastPolledBlock = new(big.Int).Sub(pollFromBlock, big.NewInt(1)) // needs to include the first block
highestBlockFromStaging, err := storage.StagingStorage.GetLastStagedBlockNumber(rpc.GetChainID(), pollFromBlock, untilBlock)
if err != nil || highestBlockFromStaging == nil || highestBlockFromStaging.Sign() <= 0 {
log.Warn().Err(err).Msgf("No last polled block found, setting to %s", lastPolledBlock.String())
} else {
lastPolledBlock = highestBlockFromStaging
log.Debug().Msgf("Last polled block found in staging: %s", lastPolledBlock.String())
}
log.Info().Msgf("Last polled block found: %s", lastPolledBlock.String())
}
return &Poller{
rpc: rpc,
Expand Down
5 changes: 4 additions & 1 deletion internal/storage/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,11 +516,14 @@ func (c *ClickHouseConnector) GetMaxBlockNumber(chainId *big.Int) (maxBlockNumbe
return maxBlockNumber, nil
}

func (c *ClickHouseConnector) GetLastStagedBlockNumber(chainId *big.Int, rangeEnd *big.Int) (maxBlockNumber *big.Int, err error) {
func (c *ClickHouseConnector) GetLastStagedBlockNumber(chainId *big.Int, rangeStart *big.Int, rangeEnd *big.Int) (maxBlockNumber *big.Int, err error) {
query := fmt.Sprintf("SELECT block_number FROM %s.block_data WHERE is_deleted = 0", c.cfg.Database)
if chainId.Sign() > 0 {
query += fmt.Sprintf(" AND chain_id = %s", chainId.String())
}
if rangeStart.Sign() > 0 {
query += fmt.Sprintf(" AND block_number >= %s", rangeStart.String())
}
if rangeEnd.Sign() > 0 {
query += fmt.Sprintf(" AND block_number <= %s", rangeEnd.String())
}
Expand Down
2 changes: 1 addition & 1 deletion internal/storage/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type IStagingStorage interface {
InsertStagingData(data []common.BlockData) error
GetStagingData(qf QueryFilter) (data *[]common.BlockData, err error)
DeleteStagingData(data *[]common.BlockData) error
GetLastStagedBlockNumber(chainId *big.Int, rangeEnd *big.Int) (maxBlockNumber *big.Int, err error)
GetLastStagedBlockNumber(chainId *big.Int, rangeStart *big.Int, rangeEnd *big.Int) (maxBlockNumber *big.Int, err error)
}

type IMainStorage interface {
Expand Down
8 changes: 4 additions & 4 deletions internal/storage/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,14 +194,14 @@ func (m *MemoryConnector) GetMaxBlockNumber(chainId *big.Int) (*big.Int, error)
return maxBlockNumber, nil
}

func IsInRange(num *big.Int, rangeEnd *big.Int) bool {
func IsInRange(num *big.Int, rangeStart *big.Int, rangeEnd *big.Int) bool {
if rangeEnd.Sign() == 0 {
return true
}
return num.Cmp(rangeEnd) <= 0
return num.Cmp(rangeStart) >= 0 && num.Cmp(rangeEnd) <= 0
}

func (m *MemoryConnector) GetLastStagedBlockNumber(chainId *big.Int, rangeEnd *big.Int) (*big.Int, error) {
func (m *MemoryConnector) GetLastStagedBlockNumber(chainId *big.Int, rangeStart *big.Int, rangeEnd *big.Int) (*big.Int, error) {
maxBlockNumber := new(big.Int)
for _, key := range m.cache.Keys() {
if strings.HasPrefix(key, fmt.Sprintf("blockData:%s:", chainId.String())) {
Expand All @@ -210,7 +210,7 @@ func (m *MemoryConnector) GetLastStagedBlockNumber(chainId *big.Int, rangeEnd *b
if !ok {
return nil, fmt.Errorf("failed to parse block number: %s", blockNumberStr)
}
if blockNumber.Cmp(maxBlockNumber) > 0 && IsInRange(blockNumber, rangeEnd) {
if blockNumber.Cmp(maxBlockNumber) > 0 && IsInRange(blockNumber, rangeStart, rangeEnd) {
maxBlockNumber = blockNumber
}
}
Expand Down
29 changes: 15 additions & 14 deletions test/mocks/MockIStagingStorage.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading