diff --git a/insight b/insight deleted file mode 100755 index 56100c7..0000000 Binary files a/insight and /dev/null differ diff --git a/internal/orchestrator/committer.go b/internal/orchestrator/committer.go index ded24b2..7790363 100644 --- a/internal/orchestrator/committer.go +++ b/internal/orchestrator/committer.go @@ -75,6 +75,130 @@ func NewCommitter(rpc rpc.IRPCClient, storage storage.IStorage, opts ...Committe return committer } +func (c *Committer) cleanupStrandedBlocks() error { + // Get the current max block from main storage + latestCommittedBlockNumber, err := c.storage.MainStorage.GetMaxBlockNumber(c.rpc.GetChainID()) + if err != nil { + return fmt.Errorf("error getting max block number from main storage: %v", err) + } + + if latestCommittedBlockNumber.Sign() == 0 { + // No blocks in main storage yet, nothing to clean up + return nil + } + + // Get block numbers from PostgreSQL that are less than latest committed block + psqlBlockNumbers, err := c.storage.StagingStorage.GetBlockNumbersLessThan(c.rpc.GetChainID(), latestCommittedBlockNumber) + if err != nil { + return fmt.Errorf("error getting block numbers from PostgreSQL: %v", err) + } + + if len(psqlBlockNumbers) == 0 { + // No stranded blocks in staging + return nil + } + + log.Info(). + Int("block_count", len(psqlBlockNumbers)). + Str("min_block", psqlBlockNumbers[0].String()). + Str("max_block", psqlBlockNumbers[len(psqlBlockNumbers)-1].String()). + Msg("Found stranded blocks in staging") + + // Process blocks in batches of c.blocksPerCommit, but max 1000 to avoid ClickHouse query limits + batchSize := c.blocksPerCommit + if batchSize > 1000 { + batchSize = 1000 + } + + for i := 0; i < len(psqlBlockNumbers); i += batchSize { + end := i + batchSize + if end > len(psqlBlockNumbers) { + end = len(psqlBlockNumbers) + } + + batchBlockNumbers := psqlBlockNumbers[i:end] + + if err := c.processStrandedBlocksBatch(batchBlockNumbers); err != nil { + return fmt.Errorf("error processing stranded blocks batch %d-%d: %v", i, end-1, err) + } + } + + return nil +} + +func (c *Committer) processStrandedBlocksBatch(blockNumbers []*big.Int) error { + if len(blockNumbers) == 0 { + return nil + } + + log.Debug(). + Int("batch_size", len(blockNumbers)). + Str("min_block", blockNumbers[0].String()). + Str("max_block", blockNumbers[len(blockNumbers)-1].String()). + Msg("Processing stranded blocks batch") + + // Check which blocks exist in ClickHouse + existsInClickHouse, err := c.storage.MainStorage.CheckBlocksExist(c.rpc.GetChainID(), blockNumbers) + if err != nil { + return fmt.Errorf("error checking blocks in ClickHouse: %v", err) + } + + // Get block data from PostgreSQL for blocks that don't exist in ClickHouse + var blocksToCommit []common.BlockData + for _, blockNum := range blockNumbers { + if !existsInClickHouse[blockNum.String()] { + data, err := c.storage.StagingStorage.GetStagingData(storage.QueryFilter{ + BlockNumbers: []*big.Int{blockNum}, + ChainId: c.rpc.GetChainID(), + }) + if err != nil { + return fmt.Errorf("error getting block data from PostgreSQL: %v", err) + } + if len(data) > 0 { + blocksToCommit = append(blocksToCommit, data[0]) + } + } + } + + // Insert blocks into ClickHouse + if len(blocksToCommit) > 0 { + log.Info(). + Int("block_count", len(blocksToCommit)). + Str("min_block", blocksToCommit[0].Block.Number.String()). + Str("max_block", blocksToCommit[len(blocksToCommit)-1].Block.Number.String()). + Msg("Committing stranded blocks to ClickHouse") + + if err := c.storage.MainStorage.InsertBlockData(blocksToCommit); err != nil { + return fmt.Errorf("error inserting blocks into ClickHouse: %v", err) + } + } + + // Delete all blocks from PostgreSQL that were checked (whether they existed in ClickHouse or not) + var blocksToDelete []common.BlockData + for _, blockNum := range blockNumbers { + blocksToDelete = append(blocksToDelete, common.BlockData{ + Block: common.Block{ + ChainId: c.rpc.GetChainID(), + Number: blockNum, + }, + }) + } + + if len(blocksToDelete) > 0 { + log.Info(). + Int("block_count", len(blocksToDelete)). + Str("min_block", blocksToDelete[0].Block.Number.String()). + Str("max_block", blocksToDelete[len(blocksToDelete)-1].Block.Number.String()). + Msg("Deleting stranded blocks from PostgreSQL") + + if err := c.storage.StagingStorage.DeleteStagingData(blocksToDelete); err != nil { + return fmt.Errorf("error deleting blocks from PostgreSQL: %v", err) + } + } + + return nil +} + func (c *Committer) Start(ctx context.Context) { interval := time.Duration(c.triggerIntervalMs) * time.Millisecond @@ -135,6 +259,68 @@ func (c *Committer) getBlockNumbersToCommit(ctx context.Context) ([]*big.Int, er } } + // Get block numbers from PostgreSQL that are less than latest committed block + psqlBlockNumbers, err := c.storage.StagingStorage.GetBlockNumbersLessThan(c.rpc.GetChainID(), latestCommittedBlockNumber) + if err != nil { + return nil, fmt.Errorf("error getting block numbers from PostgreSQL: %v", err) + } + + if len(psqlBlockNumbers) > 0 { + // Check which blocks exist in ClickHouse + existsInClickHouse, err := c.storage.MainStorage.CheckBlocksExist(c.rpc.GetChainID(), psqlBlockNumbers) + if err != nil { + return nil, fmt.Errorf("error checking blocks in ClickHouse: %v", err) + } + + // Get block data from PostgreSQL for blocks that don't exist in ClickHouse + var blocksToCommit []common.BlockData + for _, blockNum := range psqlBlockNumbers { + if !existsInClickHouse[blockNum.String()] { + data, err := c.storage.StagingStorage.GetStagingData(storage.QueryFilter{ + BlockNumbers: []*big.Int{blockNum}, + ChainId: c.rpc.GetChainID(), + }) + if err != nil { + return nil, fmt.Errorf("error getting block data from PostgreSQL: %v", err) + } + if len(data) > 0 { + blocksToCommit = append(blocksToCommit, data[0]) + } + } + } + + // Insert blocks into ClickHouse + if len(blocksToCommit) > 0 { + if err := c.storage.MainStorage.InsertBlockData(blocksToCommit); err != nil { + return nil, fmt.Errorf("error inserting blocks into ClickHouse: %v", err) + } + } + + // Delete all blocks from PostgreSQL that were checked (whether they existed in ClickHouse or not) + var blocksToDelete []common.BlockData + for _, blockNum := range psqlBlockNumbers { + blocksToDelete = append(blocksToDelete, common.BlockData{ + Block: common.Block{ + ChainId: c.rpc.GetChainID(), + Number: blockNum, + }, + }) + } + + if len(blocksToDelete) > 0 { + log.Info(). + Int("block_count", len(blocksToDelete)). + Str("min_block", blocksToDelete[0].Block.Number.String()). + Str("max_block", blocksToDelete[len(blocksToDelete)-1].Block.Number.String()). + Msg("Deleting stranded blocks from PostgreSQL") + + if err := c.storage.StagingStorage.DeleteStagingData(blocksToDelete); err != nil { + log.Error().Err(err).Msg("Failed to delete blocks from PostgreSQL") + } + } + } + + // Continue with normal block range processing startBlock := new(big.Int).Add(latestCommittedBlockNumber, big.NewInt(1)) endBlock, err := c.getBlockToCommitUntil(ctx, latestCommittedBlockNumber) if err != nil { diff --git a/internal/orchestrator/committer_test.go b/internal/orchestrator/committer_test.go index ddcbddd..9b3939c 100644 --- a/internal/orchestrator/committer_test.go +++ b/internal/orchestrator/committer_test.go @@ -46,6 +46,7 @@ func TestGetBlockNumbersToCommit(t *testing.T) { mockRPC.EXPECT().GetChainID().Return(chainID) mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(100), nil) + mockStagingStorage.EXPECT().GetBlockNumbersLessThan(chainID, big.NewInt(100)).Return([]*big.Int{}, nil) blockNumbers, err := committer.getBlockNumbersToCommit(context.Background()) @@ -70,6 +71,7 @@ func TestGetBlockNumbersToCommitWithoutConfiguredAndNotStored(t *testing.T) { mockRPC.EXPECT().GetChainID().Return(chainID) mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(0), nil) + mockStagingStorage.EXPECT().GetBlockNumbersLessThan(chainID, big.NewInt(-1)).Return([]*big.Int{}, nil) blockNumbers, err := committer.getBlockNumbersToCommit(context.Background()) @@ -97,6 +99,7 @@ func TestGetBlockNumbersToCommitWithConfiguredAndNotStored(t *testing.T) { mockRPC.EXPECT().GetChainID().Return(chainID) mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(0), nil) + mockStagingStorage.EXPECT().GetBlockNumbersLessThan(chainID, big.NewInt(49)).Return([]*big.Int{}, nil) blockNumbers, err := committer.getBlockNumbersToCommit(context.Background()) @@ -124,6 +127,7 @@ func TestGetBlockNumbersToCommitWithConfiguredAndStored(t *testing.T) { mockRPC.EXPECT().GetChainID().Return(chainID) mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(2000), nil) + mockStagingStorage.EXPECT().GetBlockNumbersLessThan(chainID, big.NewInt(2000)).Return([]*big.Int{}, nil) blockNumbers, err := committer.getBlockNumbersToCommit(context.Background()) @@ -148,6 +152,7 @@ func TestGetBlockNumbersToCommitWithoutConfiguredAndStored(t *testing.T) { mockRPC.EXPECT().GetChainID().Return(chainID) mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(2000), nil) + mockStagingStorage.EXPECT().GetBlockNumbersLessThan(chainID, big.NewInt(2000)).Return([]*big.Int{}, nil) blockNumbers, err := committer.getBlockNumbersToCommit(context.Background()) @@ -175,6 +180,7 @@ func TestGetBlockNumbersToCommitWithStoredHigherThanInMemory(t *testing.T) { mockRPC.EXPECT().GetChainID().Return(chainID) mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(2000), nil) + mockStagingStorage.EXPECT().GetBlockNumbersLessThan(chainID, big.NewInt(2000)).Return([]*big.Int{}, nil) blockNumbers, err := committer.getBlockNumbersToCommit(context.Background()) @@ -227,6 +233,7 @@ func TestGetBlockNumbersToCommitWithStoredEqualThanInMemory(t *testing.T) { mockRPC.EXPECT().GetChainID().Return(chainID) mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(2000), nil) + mockStagingStorage.EXPECT().GetBlockNumbersLessThan(chainID, big.NewInt(2000)).Return([]*big.Int{}, nil) blockNumbers, err := committer.getBlockNumbersToCommit(context.Background()) @@ -253,6 +260,7 @@ func TestGetSequentialBlockDataToCommit(t *testing.T) { mockRPC.EXPECT().GetChainID().Return(chainID) mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(100), nil) + mockStagingStorage.EXPECT().GetBlockNumbersLessThan(chainID, big.NewInt(100)).Return([]*big.Int{}, nil) blockData := []common.BlockData{ {Block: common.Block{Number: big.NewInt(101)}}, @@ -288,6 +296,7 @@ func TestGetSequentialBlockDataToCommitWithDuplicateBlocks(t *testing.T) { mockRPC.EXPECT().GetChainID().Return(chainID) mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(100), nil) + mockStagingStorage.EXPECT().GetBlockNumbersLessThan(chainID, big.NewInt(100)).Return([]*big.Int{}, nil) blockData := []common.BlockData{ {Block: common.Block{Number: big.NewInt(101)}}, @@ -403,6 +412,7 @@ func TestStartCommitter(t *testing.T) { chainID := big.NewInt(1) mockRPC.EXPECT().GetChainID().Return(chainID) mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(100), nil) + mockStagingStorage.EXPECT().GetBlockNumbersLessThan(chainID, big.NewInt(100)).Return([]*big.Int{}, nil) blockData := []common.BlockData{ {Block: common.Block{Number: big.NewInt(101)}}, @@ -437,6 +447,7 @@ func TestCommitterRespectsSIGTERM(t *testing.T) { chainID := big.NewInt(1) mockRPC.EXPECT().GetChainID().Return(chainID) mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(100), nil) + mockStagingStorage.EXPECT().GetBlockNumbersLessThan(chainID, big.NewInt(100)).Return([]*big.Int{}, nil) blockData := []common.BlockData{ {Block: common.Block{Number: big.NewInt(101)}}, @@ -502,6 +513,7 @@ func TestHandleMissingStagingData(t *testing.T) { mockStagingStorage.EXPECT().InsertStagingData(mock.Anything).Return(nil) mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(0), nil) + mockStagingStorage.EXPECT().GetBlockNumbersLessThan(chainID, big.NewInt(-1)).Return([]*big.Int{}, nil) expectedEndBlock := big.NewInt(4) mockStagingStorage.EXPECT().GetLastStagedBlockNumber(chainID, expectedEndBlock, big.NewInt(0)).Return(big.NewInt(20), nil) @@ -547,6 +559,7 @@ func TestHandleMissingStagingDataIsPolledWithCorrectBatchSize(t *testing.T) { mockStagingStorage.EXPECT().InsertStagingData(mock.Anything).Return(nil) mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(0), nil) + mockStagingStorage.EXPECT().GetBlockNumbersLessThan(chainID, big.NewInt(-1)).Return([]*big.Int{}, nil) expectedEndBlock := big.NewInt(4) mockStagingStorage.EXPECT().GetLastStagedBlockNumber(chainID, expectedEndBlock, big.NewInt(0)).Return(big.NewInt(20), nil) diff --git a/internal/orchestrator/orchestrator.go b/internal/orchestrator/orchestrator.go index 154dc89..22a8720 100644 --- a/internal/orchestrator/orchestrator.go +++ b/internal/orchestrator/orchestrator.go @@ -87,6 +87,14 @@ func (o *Orchestrator) Start() { defer workModeMonitor.UnregisterChannel(committerWorkModeChan) validator := NewValidator(o.rpc, o.storage) committer := NewCommitter(o.rpc, o.storage, WithCommitterWorkModeChan(committerWorkModeChan), WithValidator(validator)) + + // Clean up any stranded blocks in staging in a separate goroutine + go func() { + if err := committer.cleanupStrandedBlocks(); err != nil { + log.Error().Err(err).Msg("Failed to clean up stranded blocks during initialization") + } + }() + committer.Start(ctx) }() } diff --git a/internal/storage/clickhouse.go b/internal/storage/clickhouse.go index 653090f..8246b19 100644 --- a/internal/storage/clickhouse.go +++ b/internal/storage/clickhouse.go @@ -858,6 +858,89 @@ func scanTrace(rows driver.Rows) (common.Trace, error) { return trace, nil } +func (c *ClickHouseConnector) CheckBlocksExist(chainId *big.Int, blockNumbers []*big.Int) (map[string]bool, error) { + if len(blockNumbers) == 0 { + return make(map[string]bool), nil + } + + // Convert block numbers to strings for the query + blockNumberStrings := make([]string, len(blockNumbers)) + for i, bn := range blockNumbers { + blockNumberStrings[i] = bn.String() + } + + // First check blocks table + blocksQuery := fmt.Sprintf(` + SELECT DISTINCT toString(block_number) as block_number_str + FROM %s.blocks + WHERE chain_id = '%s' + AND block_number IN (%s) + AND sign = 1`, c.cfg.Database, chainId.String(), strings.Join(blockNumberStrings, ",")) + + blocksRows, err := c.conn.Query(context.Background(), blocksQuery) + if err != nil { + return nil, fmt.Errorf("error querying blocks table: %v", err) + } + defer blocksRows.Close() + + // Create a map of all block numbers initially set to false + exists := make(map[string]bool) + for _, bn := range blockNumbers { + exists[bn.String()] = false + } + + // Mark blocks that exist in blocks table as true + for blocksRows.Next() { + var blockNumberStr string + if err := blocksRows.Scan(&blockNumberStr); err != nil { + return nil, fmt.Errorf("error scanning blocks table: %v", err) + } + exists[blockNumberStr] = true + } + + if err := blocksRows.Err(); err != nil { + return nil, fmt.Errorf("error iterating blocks table: %v", err) + } + + // Then check inserts_null_table for any remaining blocks + var remainingBlocks []string + for blockNum, found := range exists { + if !found { + remainingBlocks = append(remainingBlocks, blockNum) + } + } + + if len(remainingBlocks) > 0 { + nullQuery := fmt.Sprintf(` + SELECT DISTINCT toString(block.block_number) as block_number_str + FROM %s.inserts_null_table + WHERE chain_id = '%s' + AND block.block_number IN (%s) + AND sign = 1`, c.cfg.Database, chainId.String(), strings.Join(remainingBlocks, ",")) + + nullRows, err := c.conn.Query(context.Background(), nullQuery) + if err != nil { + return nil, fmt.Errorf("error querying inserts_null_table: %v", err) + } + defer nullRows.Close() + + // Mark blocks that exist in inserts_null_table as true + for nullRows.Next() { + var blockNumberStr string + if err := nullRows.Scan(&blockNumberStr); err != nil { + return nil, fmt.Errorf("error scanning inserts_null_table: %v", err) + } + exists[blockNumberStr] = true + } + + if err := nullRows.Err(); err != nil { + return nil, fmt.Errorf("error iterating inserts_null_table: %v", err) + } + } + + return exists, nil +} + func (c *ClickHouseConnector) GetMaxBlockNumber(chainId *big.Int) (maxBlockNumber *big.Int, err error) { tableName := c.getTableName(chainId, "blocks") query := fmt.Sprintf("SELECT block_number FROM %s.%s WHERE chain_id = ? ORDER BY block_number DESC LIMIT 1", c.cfg.Database, tableName) @@ -1075,6 +1158,37 @@ func (c *ClickHouseConnector) DeleteStagingData(data []common.BlockData) error { return batch.Send() } +func (c *ClickHouseConnector) GetBlockNumbersLessThan(chainId *big.Int, blockNumber *big.Int) ([]*big.Int, error) { + query := fmt.Sprintf(` + SELECT DISTINCT block_number + FROM %s.block_data FINAL + WHERE chain_id = ? + AND block_number < ? + AND is_deleted = 0 + ORDER BY block_number ASC`, c.cfg.Database) + + rows, err := c.conn.Query(context.Background(), query, chainId, blockNumber) + if err != nil { + return nil, fmt.Errorf("error querying block_data: %v", err) + } + defer rows.Close() + + var blockNumbers []*big.Int + for rows.Next() { + var blockNum *big.Int + if err := rows.Scan(&blockNum); err != nil { + return nil, fmt.Errorf("error scanning block number: %v", err) + } + blockNumbers = append(blockNumbers, blockNum) + } + + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("error iterating rows: %v", err) + } + + return blockNumbers, nil +} + func (c *ClickHouseConnector) GetLastReorgCheckedBlockNumber(chainId *big.Int) (*big.Int, error) { query := fmt.Sprintf("SELECT cursor_value FROM %s.cursors FINAL WHERE cursor_type = 'reorg'", c.cfg.Database) if chainId.Sign() > 0 { diff --git a/internal/storage/connector.go b/internal/storage/connector.go index 15813d0..61a8df7 100644 --- a/internal/storage/connector.go +++ b/internal/storage/connector.go @@ -83,6 +83,7 @@ type IStagingStorage interface { GetStagingData(qf QueryFilter) (data []common.BlockData, err error) DeleteStagingData(data []common.BlockData) error GetLastStagedBlockNumber(chainId *big.Int, rangeStart *big.Int, rangeEnd *big.Int) (maxBlockNumber *big.Int, err error) + GetBlockNumbersLessThan(chainId *big.Int, blockNumber *big.Int) ([]*big.Int, error) } type IMainStorage interface { @@ -116,6 +117,10 @@ type IMainStorage interface { * Gets full block data with transactions, logs and traces. */ GetFullBlockData(chainId *big.Int, blockNumbers []*big.Int) (blocks []common.BlockData, err error) + /** + * Checks if blocks exist in the storage. + */ + CheckBlocksExist(chainId *big.Int, blockNumbers []*big.Int) (map[string]bool, error) } func NewStorageConnector(cfg *config.StorageConfig) (IStorage, error) { diff --git a/internal/storage/postgres.go b/internal/storage/postgres.go index 9498e8d..ca7da13 100644 --- a/internal/storage/postgres.go +++ b/internal/storage/postgres.go @@ -61,7 +61,7 @@ func NewPostgresConnector(cfg *config.PostgresConfig) (*PostgresConnector, error func (p *PostgresConnector) GetBlockFailures(qf QueryFilter) ([]common.BlockFailure, error) { query := `SELECT chain_id, block_number, last_error_timestamp, failure_count, reason - FROM block_failures` + FROM block_failures WHERE 1=1` args := []interface{}{} argCount := 0 @@ -73,11 +73,13 @@ func (p *PostgresConnector) GetBlockFailures(qf QueryFilter) ([]common.BlockFail } if len(qf.BlockNumbers) > 0 { - blockNumberStrs := make([]string, len(qf.BlockNumbers)) + placeholders := make([]string, len(qf.BlockNumbers)) for i, bn := range qf.BlockNumbers { - blockNumberStrs[i] = bn.String() + argCount++ + placeholders[i] = fmt.Sprintf("$%d", argCount) + args = append(args, bn.String()) } - query += fmt.Sprintf(" AND block_number IN (%s)", strings.Join(blockNumberStrs, ",")) + query += fmt.Sprintf(" AND block_number IN (%s)", strings.Join(placeholders, ",")) } if qf.SortBy != "" { @@ -263,6 +265,40 @@ func (p *PostgresConnector) InsertStagingData(data []common.BlockData) error { return err } +func (p *PostgresConnector) GetBlockNumbersLessThan(chainId *big.Int, blockNumber *big.Int) ([]*big.Int, error) { + query := `SELECT DISTINCT block_number + FROM block_data + WHERE chain_id = $1 + AND block_number < $2 + ORDER BY block_number ASC` + + rows, err := p.db.Query(query, chainId.String(), blockNumber.String()) + if err != nil { + return nil, fmt.Errorf("error querying block_data: %v", err) + } + defer rows.Close() + + var blockNumbers []*big.Int + for rows.Next() { + var blockNumberStr string + if err := rows.Scan(&blockNumberStr); err != nil { + return nil, fmt.Errorf("error scanning block number: %v", err) + } + + blockNum, ok := new(big.Int).SetString(blockNumberStr, 10) + if !ok { + return nil, fmt.Errorf("failed to parse block number: %s", blockNumberStr) + } + blockNumbers = append(blockNumbers, blockNum) + } + + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("error iterating rows: %v", err) + } + + return blockNumbers, nil +} + func (p *PostgresConnector) GetStagingData(qf QueryFilter) ([]common.BlockData, error) { // No need to check is_deleted since we're using hard deletes for staging data query := `SELECT data FROM block_data WHERE 1=1` diff --git a/test/mocks/MockIMainStorage.go b/test/mocks/MockIMainStorage.go index 679345c..765c022 100644 --- a/test/mocks/MockIMainStorage.go +++ b/test/mocks/MockIMainStorage.go @@ -970,6 +970,65 @@ func (_c *MockIMainStorage_ReplaceBlockData_Call) RunAndReturn(run func([]common return _c } +// CheckBlocksExist provides a mock function with given fields: chainId, blockNumbers +func (_m *MockIMainStorage) CheckBlocksExist(chainId *big.Int, blockNumbers []*big.Int) (map[string]bool, error) { + ret := _m.Called(chainId, blockNumbers) + + if len(ret) == 0 { + panic("no return value specified for CheckBlocksExist") + } + + var r0 map[string]bool + var r1 error + if rf, ok := ret.Get(0).(func(*big.Int, []*big.Int) (map[string]bool, error)); ok { + return rf(chainId, blockNumbers) + } + if rf, ok := ret.Get(0).(func(*big.Int, []*big.Int) map[string]bool); ok { + r0 = rf(chainId, blockNumbers) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(map[string]bool) + } + } + + if rf, ok := ret.Get(1).(func(*big.Int, []*big.Int) error); ok { + r1 = rf(chainId, blockNumbers) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockIMainStorage_CheckBlocksExist_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckBlocksExist' +type MockIMainStorage_CheckBlocksExist_Call struct { + *mock.Call +} + +// CheckBlocksExist is a helper method to define mock.On call +// - chainId *big.Int +// - blockNumbers []*big.Int +func (_e *MockIMainStorage_Expecter) CheckBlocksExist(chainId interface{}, blockNumbers interface{}) *MockIMainStorage_CheckBlocksExist_Call { + return &MockIMainStorage_CheckBlocksExist_Call{Call: _e.mock.On("CheckBlocksExist", chainId, blockNumbers)} +} + +func (_c *MockIMainStorage_CheckBlocksExist_Call) Run(run func(chainId *big.Int, blockNumbers []*big.Int)) *MockIMainStorage_CheckBlocksExist_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(*big.Int), args[1].([]*big.Int)) + }) + return _c +} + +func (_c *MockIMainStorage_CheckBlocksExist_Call) Return(_a0 map[string]bool, _a1 error) *MockIMainStorage_CheckBlocksExist_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockIMainStorage_CheckBlocksExist_Call) RunAndReturn(run func(*big.Int, []*big.Int) (map[string]bool, error)) *MockIMainStorage_CheckBlocksExist_Call { + _c.Call.Return(run) + return _c +} + // NewMockIMainStorage creates a new instance of MockIMainStorage. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. func NewMockIMainStorage(t interface { diff --git a/test/mocks/MockIStagingStorage.go b/test/mocks/MockIStagingStorage.go index 090f8f2..bae6236 100644 --- a/test/mocks/MockIStagingStorage.go +++ b/test/mocks/MockIStagingStorage.go @@ -236,6 +236,65 @@ func (_c *MockIStagingStorage_InsertStagingData_Call) RunAndReturn(run func([]co return _c } +// GetBlockNumbersLessThan provides a mock function with given fields: chainId, blockNumber +func (_m *MockIStagingStorage) GetBlockNumbersLessThan(chainId *big.Int, blockNumber *big.Int) ([]*big.Int, error) { + ret := _m.Called(chainId, blockNumber) + + if len(ret) == 0 { + panic("no return value specified for GetBlockNumbersLessThan") + } + + var r0 []*big.Int + var r1 error + if rf, ok := ret.Get(0).(func(*big.Int, *big.Int) ([]*big.Int, error)); ok { + return rf(chainId, blockNumber) + } + if rf, ok := ret.Get(0).(func(*big.Int, *big.Int) []*big.Int); ok { + r0 = rf(chainId, blockNumber) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*big.Int) + } + } + + if rf, ok := ret.Get(1).(func(*big.Int, *big.Int) error); ok { + r1 = rf(chainId, blockNumber) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockIStagingStorage_GetBlockNumbersLessThan_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetBlockNumbersLessThan' +type MockIStagingStorage_GetBlockNumbersLessThan_Call struct { + *mock.Call +} + +// GetBlockNumbersLessThan is a helper method to define mock.On call +// - chainId *big.Int +// - blockNumber *big.Int +func (_e *MockIStagingStorage_Expecter) GetBlockNumbersLessThan(chainId interface{}, blockNumber interface{}) *MockIStagingStorage_GetBlockNumbersLessThan_Call { + return &MockIStagingStorage_GetBlockNumbersLessThan_Call{Call: _e.mock.On("GetBlockNumbersLessThan", chainId, blockNumber)} +} + +func (_c *MockIStagingStorage_GetBlockNumbersLessThan_Call) Run(run func(chainId *big.Int, blockNumber *big.Int)) *MockIStagingStorage_GetBlockNumbersLessThan_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(*big.Int), args[1].(*big.Int)) + }) + return _c +} + +func (_c *MockIStagingStorage_GetBlockNumbersLessThan_Call) Return(_a0 []*big.Int, _a1 error) *MockIStagingStorage_GetBlockNumbersLessThan_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockIStagingStorage_GetBlockNumbersLessThan_Call) RunAndReturn(run func(*big.Int, *big.Int) ([]*big.Int, error)) *MockIStagingStorage_GetBlockNumbersLessThan_Call { + _c.Call.Return(run) + return _c +} + // NewMockIStagingStorage creates a new instance of MockIStagingStorage. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. func NewMockIStagingStorage(t interface {