diff --git a/internal/orchestrator/committer.go b/internal/orchestrator/committer.go index 7f8d344..9c7c122 100644 --- a/internal/orchestrator/committer.go +++ b/internal/orchestrator/committer.go @@ -168,18 +168,17 @@ func (c *Committer) commit(blockData []common.BlockData) error { } log.Debug().Msgf("Committing %d blocks", len(blockNumbers)) + if err := c.storage.MainStorage.InsertBlockData(blockData); err != nil { + log.Error().Err(err).Msgf("Failed to commit blocks: %v", blockNumbers) + return fmt.Errorf("error saving data to main storage: %v", err) + } + go func() { if err := c.publisher.PublishBlockData(blockData); err != nil { log.Error().Err(err).Msg("Failed to publish block data to kafka") } }() - // TODO if next parts (saving or deleting) fail, we'll have to do a rollback - if err := c.storage.MainStorage.InsertBlockData(blockData); err != nil { - log.Error().Err(err).Msgf("Failed to commit blocks: %v", blockNumbers) - return fmt.Errorf("error saving data to main storage: %v", err) - } - if err := c.storage.StagingStorage.DeleteStagingData(blockData); err != nil { return fmt.Errorf("error deleting data from staging storage: %v", err) } diff --git a/internal/orchestrator/reorg_handler.go b/internal/orchestrator/reorg_handler.go index 99d1f2c..866f707 100644 --- a/internal/orchestrator/reorg_handler.go +++ b/internal/orchestrator/reorg_handler.go @@ -281,13 +281,10 @@ func (rh *ReorgHandler) handleReorg(reorgedBlockNumbers []*big.Int) error { }) blocksToDelete = append(blocksToDelete, result.BlockNumber) } - // TODO make delete and insert atomic - deletedBlockData, err := rh.storage.MainStorage.DeleteBlockData(rh.rpc.GetChainID(), blocksToDelete) + + deletedBlockData, err := rh.storage.MainStorage.ReplaceBlockData(data) if err != nil { - return fmt.Errorf("error deleting data for blocks %v: %w", blocksToDelete, err) - } - if err := rh.storage.MainStorage.InsertBlockData(data); err != nil { - return fmt.Errorf("error saving data to main storage: %w", err) + return fmt.Errorf("error replacing reorged data for blocks %v: %w", blocksToDelete, err) } if rh.publisher != nil { // Publish block data asynchronously diff --git a/internal/orchestrator/reorg_handler_test.go b/internal/orchestrator/reorg_handler_test.go index 054623a..5872bec 100644 --- a/internal/orchestrator/reorg_handler_test.go +++ b/internal/orchestrator/reorg_handler_test.go @@ -504,8 +504,7 @@ func TestHandleReorg(t *testing.T) { }) mockOrchestratorStorage.EXPECT().GetLastReorgCheckedBlockNumber(big.NewInt(1)).Return(big.NewInt(3), nil) - mockMainStorage.EXPECT().DeleteBlockData(big.NewInt(1), mock.Anything).Return([]common.BlockData{}, nil) - mockMainStorage.EXPECT().InsertBlockData(mock.Anything).Return(nil) + mockMainStorage.EXPECT().ReplaceBlockData(mock.Anything).Return([]common.BlockData{}, nil) handler := NewReorgHandler(mockRPC, mockStorage) err := handler.handleReorg([]*big.Int{big.NewInt(1), big.NewInt(2), big.NewInt(3)}) @@ -611,12 +610,9 @@ func TestHandleReorgWithSingleBlockReorg(t *testing.T) { {BlockNumber: big.NewInt(105), Data: common.BlockData{}}, }) - mockMainStorage.EXPECT().DeleteBlockData(big.NewInt(1), mock.MatchedBy(func(blocks []*big.Int) bool { + mockMainStorage.EXPECT().ReplaceBlockData(mock.MatchedBy(func(blocks []common.BlockData) bool { return len(blocks) == 1 })).Return([]common.BlockData{}, nil) - mockMainStorage.EXPECT().InsertBlockData(mock.MatchedBy(func(data []common.BlockData) bool { - return len(data) == 1 - })).Return(nil) handler := NewReorgHandler(mockRPC, mockStorage) mostRecentBlockChecked, err := handler.RunFromBlock(big.NewInt(99)) @@ -679,12 +675,9 @@ func TestHandleReorgWithLatestBlockReorged(t *testing.T) { {BlockNumber: big.NewInt(108), Data: common.BlockData{}}, }) - mockMainStorage.EXPECT().DeleteBlockData(big.NewInt(1), mock.MatchedBy(func(blocks []*big.Int) bool { - return len(blocks) == 8 - })).Return([]common.BlockData{}, nil) - mockMainStorage.EXPECT().InsertBlockData(mock.MatchedBy(func(data []common.BlockData) bool { + mockMainStorage.EXPECT().ReplaceBlockData(mock.MatchedBy(func(data []common.BlockData) bool { return len(data) == 8 - })).Return(nil) + })).Return([]common.BlockData{}, nil) handler := NewReorgHandler(mockRPC, mockStorage) mostRecentBlockChecked, err := handler.RunFromBlock(big.NewInt(99)) @@ -743,12 +736,9 @@ func TestHandleReorgWithManyBlocks(t *testing.T) { {BlockNumber: big.NewInt(103), Data: common.BlockData{}}, }) - mockMainStorage.EXPECT().DeleteBlockData(big.NewInt(1), mock.MatchedBy(func(blocks []*big.Int) bool { - return len(blocks) == 5 - })).Return([]common.BlockData{}, nil) - mockMainStorage.EXPECT().InsertBlockData(mock.MatchedBy(func(data []common.BlockData) bool { + mockMainStorage.EXPECT().ReplaceBlockData(mock.MatchedBy(func(data []common.BlockData) bool { return len(data) == 5 - })).Return(nil) + })).Return([]common.BlockData{}, nil) handler := NewReorgHandler(mockRPC, mockStorage) mostRecentBlockChecked, err := handler.RunFromBlock(big.NewInt(99)) diff --git a/internal/storage/clickhouse.go b/internal/storage/clickhouse.go index 9c2f774..68bb505 100644 --- a/internal/storage/clickhouse.go +++ b/internal/storage/clickhouse.go @@ -1007,252 +1007,289 @@ func (c *ClickHouseConnector) GetBlockHeadersDescending(chainId *big.Int, from * return blockHeaders, nil } -func (c *ClickHouseConnector) DeleteBlockData(chainId *big.Int, blockNumbers []*big.Int) ([]common.BlockData, error) { - var deleteErr error - var deleteErrMutex sync.Mutex +func (c *ClickHouseConnector) ReplaceBlockData(data []common.BlockData) ([]common.BlockData, error) { + if len(data) == 0 { + return nil, nil + } + chainId := data[0].Block.ChainId + + var fetchErr error + var fetchErrMutex sync.Mutex + var deletedDataMutex sync.Mutex var wg sync.WaitGroup wg.Add(4) - // Create a map to store block data that will be deleted deletedBlockDataByNumber := make(map[*big.Int]common.BlockData) + + blockNumbers := make([]*big.Int, len(data)) + for i, blockData := range data { + blockNumbers[i] = blockData.Block.Number + } go func() { defer wg.Done() - deletedBlocks, err := c.deleteBlocks(chainId, blockNumbers) + blocksQueryResult, err := c.GetBlocks(QueryFilter{ + ChainId: chainId, + BlockNumbers: blockNumbers, + ForceConsistentData: true, + }, "*") if err != nil { - deleteErrMutex.Lock() - deleteErr = fmt.Errorf("error deleting blocks: %v", err) - deleteErrMutex.Unlock() - } - for _, block := range deletedBlocks { - data := deletedBlockDataByNumber[block.Number] - data.Block = block - deletedBlockDataByNumber[block.Number] = data + fetchErrMutex.Lock() + fetchErr = fmt.Errorf("error fetching blocks: %v", err) + fetchErrMutex.Unlock() + } + for _, block := range blocksQueryResult.Data { + deletedDataMutex.Lock() + deletedData := deletedBlockDataByNumber[block.Number] + block.Sign = -1 + deletedData.Block = block + deletedBlockDataByNumber[block.Number] = deletedData + deletedDataMutex.Unlock() } }() go func() { defer wg.Done() - deletedLogs, err := c.deleteLogs(chainId, blockNumbers) + logsQueryResult, err := c.GetLogs(QueryFilter{ + ChainId: chainId, + BlockNumbers: blockNumbers, + ForceConsistentData: true, + }, "*") if err != nil { - deleteErrMutex.Lock() - deleteErr = fmt.Errorf("error deleting logs: %v", err) - deleteErrMutex.Unlock() - } - for _, log := range deletedLogs { - data := deletedBlockDataByNumber[log.BlockNumber] - data.Logs = append(data.Logs, log) - deletedBlockDataByNumber[log.BlockNumber] = data + fetchErrMutex.Lock() + fetchErr = fmt.Errorf("error fetching logs: %v", err) + fetchErrMutex.Unlock() + } + for _, log := range logsQueryResult.Data { + deletedDataMutex.Lock() + deletedData := deletedBlockDataByNumber[log.BlockNumber] + log.Sign = -1 + deletedData.Logs = append(deletedData.Logs, log) + deletedBlockDataByNumber[log.BlockNumber] = deletedData + deletedDataMutex.Unlock() } }() go func() { defer wg.Done() - deletedTransactions, err := c.deleteTransactions(chainId, blockNumbers) + txsQueryResult, err := c.GetTransactions(QueryFilter{ + ChainId: chainId, + BlockNumbers: blockNumbers, + ForceConsistentData: true, + }, "*") if err != nil { - deleteErrMutex.Lock() - deleteErr = fmt.Errorf("error deleting transactions: %v", err) - deleteErrMutex.Unlock() - } - for _, tx := range deletedTransactions { - data := deletedBlockDataByNumber[tx.BlockNumber] - data.Transactions = append(data.Transactions, tx) - deletedBlockDataByNumber[tx.BlockNumber] = data + fetchErrMutex.Lock() + fetchErr = fmt.Errorf("error fetching transactions: %v", err) + fetchErrMutex.Unlock() + } + for _, tx := range txsQueryResult.Data { + deletedDataMutex.Lock() + deletedData := deletedBlockDataByNumber[tx.BlockNumber] + tx.Sign = -1 + deletedData.Transactions = append(deletedData.Transactions, tx) + deletedBlockDataByNumber[tx.BlockNumber] = deletedData + deletedDataMutex.Unlock() } }() go func() { defer wg.Done() - deletedTraces, err := c.deleteTraces(chainId, blockNumbers) + tracesQueryResult, err := c.GetTraces(QueryFilter{ + ChainId: chainId, + BlockNumbers: blockNumbers, + ForceConsistentData: true, + }, "*") if err != nil { - deleteErrMutex.Lock() - deleteErr = fmt.Errorf("error deleting traces: %v", err) - deleteErrMutex.Unlock() - } - for _, trace := range deletedTraces { - data := deletedBlockDataByNumber[trace.BlockNumber] - data.Traces = append(data.Traces, trace) - deletedBlockDataByNumber[trace.BlockNumber] = data + fetchErrMutex.Lock() + fetchErr = fmt.Errorf("error fetching traces: %v", err) + fetchErrMutex.Unlock() + } + for _, trace := range tracesQueryResult.Data { + deletedDataMutex.Lock() + deletedData := deletedBlockDataByNumber[trace.BlockNumber] + trace.Sign = -1 + deletedData.Traces = append(deletedData.Traces, trace) + deletedBlockDataByNumber[trace.BlockNumber] = deletedData + deletedDataMutex.Unlock() } }() wg.Wait() - if deleteErr != nil { - return nil, deleteErr + if fetchErr != nil { + return nil, fetchErr } deletedBlockData := make([]common.BlockData, 0, len(deletedBlockDataByNumber)) - for _, data := range deletedBlockDataByNumber { - deletedBlockData = append(deletedBlockData, data) + for _, deletedData := range deletedBlockDataByNumber { + deletedBlockData = append(deletedBlockData, deletedData) + data = append(data, deletedData) } - return deletedBlockData, nil -} -func (c *ClickHouseConnector) deleteBlocks(chainId *big.Int, blockNumbers []*big.Int) ([]common.Block, error) { - blocksQueryResult, err := c.GetBlocks(QueryFilter{ - ChainId: chainId, - BlockNumbers: blockNumbers, - ForceConsistentData: true, - }, "*") - if err != nil { - return nil, err - } - if len(blocksQueryResult.Data) == 0 { - return nil, nil // No blocks to delete - } - err = c.insertBlocks(blocksQueryResult.Data, InsertOptions{ - AsDeleted: true, - }) - if err != nil { - return nil, err - } - return blocksQueryResult.Data, nil -} - -func (c *ClickHouseConnector) deleteLogs(chainId *big.Int, blockNumbers []*big.Int) ([]common.Log, error) { - logsQueryResult, err := c.GetLogs(QueryFilter{ - ChainId: chainId, - BlockNumbers: blockNumbers, - ForceConsistentData: true, - }, "*") - if err != nil { - return nil, err - } - if len(logsQueryResult.Data) == 0 { - return nil, nil // No logs to delete - } - err = c.insertLogs(logsQueryResult.Data, InsertOptions{ - AsDeleted: true, - }) - if err != nil { - return nil, err - } - return logsQueryResult.Data, nil -} - -func (c *ClickHouseConnector) deleteTransactions(chainId *big.Int, blockNumbers []*big.Int) ([]common.Transaction, error) { - txsQueryResult, err := c.GetTransactions(QueryFilter{ - ChainId: chainId, - BlockNumbers: blockNumbers, - ForceConsistentData: true, - }, "*") - if err != nil { - return nil, err - } - if len(txsQueryResult.Data) == 0 { - return nil, nil // No transactions to delete - } - err = c.insertTransactions(txsQueryResult.Data, InsertOptions{ - AsDeleted: true, - }) - if err != nil { - return nil, err - } - return txsQueryResult.Data, nil -} - -func (c *ClickHouseConnector) deleteTraces(chainId *big.Int, blockNumbers []*big.Int) ([]common.Trace, error) { - tracesQueryResult, err := c.GetTraces(QueryFilter{ - ChainId: chainId, - BlockNumbers: blockNumbers, - ForceConsistentData: true, - }, "*") - if err != nil { - return nil, err - } - if len(tracesQueryResult.Data) == 0 { - return nil, nil // No traces to delete - } - err = c.insertTraces(tracesQueryResult.Data, InsertOptions{ - AsDeleted: true, - }) - if err != nil { - return nil, err + insertErr := c.InsertBlockData(data) + if insertErr != nil { + return nil, insertErr } - return tracesQueryResult.Data, nil + return deletedBlockData, nil } -// TODO make this atomic func (c *ClickHouseConnector) InsertBlockData(data []common.BlockData) error { if len(data) == 0 { return nil } - blocks := make([]common.Block, 0, len(data)) - logs := make([]common.Log, 0) - transactions := make([]common.Transaction, 0) - traces := make([]common.Trace, 0) - for _, blockData := range data { - blocks = append(blocks, blockData.Block) - logs = append(logs, blockData.Logs...) - transactions = append(transactions, blockData.Transactions...) - traces = append(traces, blockData.Traces...) + tableName := c.getTableName(data[0].Block.ChainId, "inserts_null_table") + columns := []string{ + "chain_id", "block", "transactions", "logs", "traces", "sign", "insert_timestamp", } + query := fmt.Sprintf("INSERT INTO %s.%s (%s)", c.cfg.Database, tableName, strings.Join(columns, ", ")) + for i := 0; i < len(data); i += c.cfg.MaxRowsPerInsert { + end := i + c.cfg.MaxRowsPerInsert + if end > len(data) { + end = len(data) + } - var saveErr error - var saveErrMutex sync.Mutex - var wg sync.WaitGroup + batch, err := c.conn.PrepareBatch(context.Background(), query) + if err != nil { + return err + } + + for _, blockData := range data[i:end] { + block := blockData.Block - if len(blocks) > 0 { - wg.Add(1) - go func() { - defer wg.Done() - if err := c.insertBlocks(blocks, InsertOptions{ - AsDeleted: false, - }); err != nil { - saveErrMutex.Lock() - saveErr = fmt.Errorf("error inserting blocks: %v", err) - saveErrMutex.Unlock() + // Prepare block tuple + blockTuple := []interface{}{ + block.Number, + block.Timestamp, + block.Hash, + block.ParentHash, + block.Sha3Uncles, + block.Nonce, + block.MixHash, + block.Miner, + block.StateRoot, + block.TransactionsRoot, + block.ReceiptsRoot, + block.LogsBloom, + block.Size, + block.ExtraData, + block.Difficulty, + block.TotalDifficulty, + block.TransactionCount, + block.GasLimit, + block.GasUsed, + block.WithdrawalsRoot, + block.BaseFeePerGas, } - }() - } - - if len(logs) > 0 { - wg.Add(1) - go func() { - defer wg.Done() - if err := c.insertLogs(logs, InsertOptions{ - AsDeleted: false, - }); err != nil { - saveErrMutex.Lock() - saveErr = fmt.Errorf("error inserting logs: %v", err) - saveErrMutex.Unlock() + + // Prepare transactions array + transactions := make([][]interface{}, len(blockData.Transactions)) + for j, tx := range blockData.Transactions { + transactions[j] = []interface{}{ + tx.Hash, + tx.Nonce, + tx.BlockHash, + tx.BlockNumber, + tx.BlockTimestamp, + tx.TransactionIndex, + tx.FromAddress, + tx.ToAddress, + tx.Value, + tx.Gas, + tx.GasPrice, + tx.Data, + tx.FunctionSelector, + tx.MaxFeePerGas, + tx.MaxPriorityFeePerGas, + tx.MaxFeePerBlobGas, + tx.BlobVersionedHashes, + tx.TransactionType, + tx.R, + tx.S, + tx.V, + tx.AccessListJson, + tx.ContractAddress, + tx.GasUsed, + tx.CumulativeGasUsed, + tx.EffectiveGasPrice, + tx.BlobGasUsed, + tx.BlobGasPrice, + tx.LogsBloom, + tx.Status, + } } - }() - } - - if len(transactions) > 0 { - wg.Add(1) - go func() { - defer wg.Done() - if err := c.insertTransactions(transactions, InsertOptions{ - AsDeleted: false, - }); err != nil { - saveErrMutex.Lock() - saveErr = fmt.Errorf("error inserting transactions: %v", err) - saveErrMutex.Unlock() + + // Prepare logs array + logs := make([][]interface{}, len(blockData.Logs)) + for j, log := range blockData.Logs { + logs[j] = []interface{}{ + log.BlockNumber, + log.BlockHash, + log.BlockTimestamp, + log.TransactionHash, + log.TransactionIndex, + log.LogIndex, + log.Address, + log.Data, + log.Topic0, + log.Topic1, + log.Topic2, + log.Topic3, + } } - }() - } - - if len(traces) > 0 { - wg.Add(1) - go func() { - defer wg.Done() - if err := c.insertTraces(traces, InsertOptions{ - AsDeleted: false, - }); err != nil { - saveErrMutex.Lock() - saveErr = fmt.Errorf("error inserting traces: %v", err) - saveErrMutex.Unlock() + + // Prepare traces array + traces := make([][]interface{}, len(blockData.Traces)) + for j, trace := range blockData.Traces { + traces[j] = []interface{}{ + trace.BlockNumber, + trace.BlockHash, + trace.BlockTimestamp, + trace.TransactionHash, + trace.TransactionIndex, + trace.Subtraces, + trace.TraceAddress, + trace.TraceType, + trace.CallType, + trace.Error, + trace.FromAddress, + trace.ToAddress, + trace.Gas.Uint64(), + trace.GasUsed.Uint64(), + trace.Input, + trace.Output, + trace.Value, + trace.Author, + trace.RewardType, + trace.RefundAddress, + } } - }() - } - wg.Wait() + sign := int8(1) + if block.Sign != 1 { + sign = block.Sign + } + insertTimestamp := time.Now() + if !block.InsertTimestamp.IsZero() { + insertTimestamp = block.InsertTimestamp + } + // Append the row to the batch + if err := batch.Append( + block.ChainId, + blockTuple, + transactions, + logs, + traces, + sign, + insertTimestamp, + ); err != nil { + return err + } + } - if saveErr != nil { - return saveErr + if err := batch.Send(); err != nil { + return err + } } + return nil } diff --git a/internal/storage/connector.go b/internal/storage/connector.go index 312429d..caeaa38 100644 --- a/internal/storage/connector.go +++ b/internal/storage/connector.go @@ -87,6 +87,7 @@ type IStagingStorage interface { type IMainStorage interface { InsertBlockData(data []common.BlockData) error + ReplaceBlockData(data []common.BlockData) ([]common.BlockData, error) GetBlocks(qf QueryFilter, fields ...string) (blocks QueryResult[common.Block], err error) GetTransactions(qf QueryFilter, fields ...string) (transactions QueryResult[common.Transaction], err error) @@ -98,7 +99,6 @@ type IMainStorage interface { * Get block headers ordered from latest to oldest. */ GetBlockHeadersDescending(chainId *big.Int, from *big.Int, to *big.Int) (blockHeaders []common.BlockHeader, err error) - DeleteBlockData(chainId *big.Int, blockNumbers []*big.Int) ([]common.BlockData, error) GetTokenBalances(qf BalancesQueryFilter, fields ...string) (QueryResult[common.TokenBalance], error) GetTokenTransfers(qf TransfersQueryFilter, fields ...string) (QueryResult[common.TokenTransfer], error) diff --git a/internal/tools/clickhouse_create_insert_mvs.sql b/internal/tools/clickhouse_create_insert_mvs.sql new file mode 100644 index 0000000..ebd8007 --- /dev/null +++ b/internal/tools/clickhouse_create_insert_mvs.sql @@ -0,0 +1,121 @@ +CREATE MATERIALIZED VIEW IF NOT EXISTS mv_blocks_inserts +TO blocks +AS +SELECT + chain_id, + block.1 AS block_number, + block.2 AS block_timestamp, + block.3 AS hash, + block.4 AS parent_hash, + block.5 AS sha3_uncles, + block.6 AS nonce, + block.7 AS mix_hash, + block.8 AS miner, + block.9 AS state_root, + block.10 AS transactions_root, + block.11 AS receipts_root, + block.12 AS logs_bloom, + block.13 AS size, + block.14 AS extra_data, + block.15 AS difficulty, + block.16 AS total_difficulty, + block.17 AS transaction_count, + block.18 AS gas_limit, + block.19 AS gas_used, + block.20 AS withdrawals_root, + block.21 AS base_fee_per_gas, + insert_timestamp, + sign +FROM inserts_null_table; + +CREATE MATERIALIZED VIEW IF NOT EXISTS mv_transactions_inserts +TO transactions +AS +SELECT + chain_id, + t.1 AS hash, + t.2 AS nonce, + t.3 AS block_hash, + t.4 AS block_number, + t.5 AS block_timestamp, + t.6 AS transaction_index, + t.7 AS from_address, + t.8 AS to_address, + t.9 AS value, + t.10 AS gas, + t.11 AS gas_price, + t.12 AS data, + t.13 AS function_selector, + t.14 AS max_fee_per_gas, + t.15 AS max_priority_fee_per_gas, + t.16 AS max_fee_per_blob_gas, + t.17 AS blob_versioned_hashes, + t.18 AS transaction_type, + t.19 AS r, + t.20 AS s, + t.21 AS v, + t.22 AS access_list, + t.23 AS contract_address, + t.24 AS gas_used, + t.25 AS cumulative_gas_used, + t.26 AS effective_gas_price, + t.27 AS blob_gas_used, + t.28 AS blob_gas_price, + t.29 AS logs_bloom, + t.30 AS status, + insert_timestamp, + sign +FROM inserts_null_table +ARRAY JOIN transactions AS t; + +CREATE MATERIALIZED VIEW IF NOT EXISTS mv_logs_inserts +TO logs +AS +SELECT + chain_id, + l.1 AS block_number, + l.2 AS block_hash, + l.3 AS block_timestamp, + l.4 AS transaction_hash, + l.5 AS transaction_index, + l.6 AS log_index, + l.7 AS address, + l.8 AS data, + l.9 AS topic_0, + l.10 AS topic_1, + l.11 AS topic_2, + l.12 AS topic_3, + insert_timestamp, + sign +FROM inserts_null_table +ARRAY JOIN logs AS l; + +CREATE MATERIALIZED VIEW IF NOT EXISTS mv_traces_inserts +TO traces +AS +SELECT + chain_id, + tr.1 AS block_number, + tr.2 AS block_hash, + tr.3 AS block_timestamp, + tr.4 AS transaction_hash, + tr.5 AS transaction_index, + tr.6 AS subtraces, + tr.7 AS trace_address, + tr.8 AS type, + tr.9 AS call_type, + tr.10 AS error, + tr.11 AS from_address, + tr.12 AS to_address, + tr.13 AS gas, + tr.14 AS gas_used, + tr.15 AS input, + tr.16 AS output, + tr.17 AS value, + tr.18 AS author, + tr.19 AS reward_type, + tr.20 AS refund_address, + insert_timestamp, + sign +FROM inserts_null_table +ARRAY JOIN traces AS tr; diff --git a/internal/tools/clickhouse_create_insert_null_table.sql b/internal/tools/clickhouse_create_insert_null_table.sql new file mode 100644 index 0000000..435af9f --- /dev/null +++ b/internal/tools/clickhouse_create_insert_null_table.sql @@ -0,0 +1,98 @@ +CREATE TABLE IF NOT EXISTS inserts_null_table ( + chain_id UInt256, + block Tuple( + block_number UInt256, + block_timestamp DateTime, + hash FixedString(66), + parent_hash FixedString(66), + sha3_uncles FixedString(66), + nonce FixedString(18), + mix_hash FixedString(66), + miner FixedString(42), + state_root FixedString(66), + transactions_root FixedString(66), + receipts_root FixedString(66), + logs_bloom String, + size UInt64, + extra_data String, + difficulty UInt256, + total_difficulty UInt256, + transaction_count UInt64, + gas_limit UInt256, + gas_used UInt256, + withdrawals_root FixedString(66), + base_fee_per_gas Nullable(UInt64) + ), + transactions Array(Tuple( + hash FixedString(66), + nonce UInt64, + block_hash FixedString(66), + block_number UInt256, + block_timestamp DateTime, + transaction_index UInt64, + from_address FixedString(42), + to_address FixedString(42), + value UInt256, + gas UInt64, + gas_price UInt256, + data String, + function_selector FixedString(10), + max_fee_per_gas UInt128, + max_priority_fee_per_gas UInt128, + max_fee_per_blob_gas UInt256, + blob_versioned_hashes Array(String), + transaction_type UInt8, + r UInt256, + s UInt256, + v UInt256, + access_list Nullable(String), + contract_address Nullable(FixedString(42)), + gas_used Nullable(UInt64), + cumulative_gas_used Nullable(UInt64), + effective_gas_price Nullable(UInt256), + blob_gas_used Nullable(UInt64), + blob_gas_price Nullable(UInt256), + logs_bloom Nullable(String), + status Nullable(UInt64) + )), + logs Array(Tuple( + block_number UInt256, + block_hash FixedString(66), + block_timestamp DateTime, + transaction_hash FixedString(66), + transaction_index UInt64, + log_index UInt64, + address FixedString(42), + data String, + topic_0 String, + topic_1 String, + topic_2 String, + topic_3 String + )), + traces Array(Tuple( + block_number UInt256, + block_hash FixedString(66), + block_timestamp DateTime, + transaction_hash FixedString(66), + transaction_index UInt64, + subtraces Int64, + trace_address Array(Int64), + type LowCardinality(String), + call_type LowCardinality(String), + error Nullable(String), + from_address FixedString(42), + to_address FixedString(42), + gas UInt64, + gas_used UInt64, + input String, + output Nullable(String), + value UInt256, + author Nullable(FixedString(42)), + reward_type LowCardinality(Nullable(String)), + refund_address Nullable(FixedString(42)) + )), + insert_timestamp DateTime DEFAULT now(), + sign Int8 DEFAULT 1 +) ENGINE = MergeTree +ORDER BY (chain_id, insert_timestamp) +PARTITION BY chain_id; diff --git a/test/mocks/MockIMainStorage.go b/test/mocks/MockIMainStorage.go index c82d7ff..eaaac80 100644 --- a/test/mocks/MockIMainStorage.go +++ b/test/mocks/MockIMainStorage.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.53.2. DO NOT EDIT. +// Code generated by mockery v2.50.4. DO NOT EDIT. //go:build !production @@ -26,65 +26,6 @@ func (_m *MockIMainStorage) EXPECT() *MockIMainStorage_Expecter { return &MockIMainStorage_Expecter{mock: &_m.Mock} } -// DeleteBlockData provides a mock function with given fields: chainId, blockNumbers -func (_m *MockIMainStorage) DeleteBlockData(chainId *big.Int, blockNumbers []*big.Int) ([]common.BlockData, error) { - ret := _m.Called(chainId, blockNumbers) - - if len(ret) == 0 { - panic("no return value specified for DeleteBlockData") - } - - var r0 []common.BlockData - var r1 error - if rf, ok := ret.Get(0).(func(*big.Int, []*big.Int) ([]common.BlockData, error)); ok { - return rf(chainId, blockNumbers) - } - if rf, ok := ret.Get(0).(func(*big.Int, []*big.Int) []common.BlockData); ok { - r0 = rf(chainId, blockNumbers) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).([]common.BlockData) - } - } - - if rf, ok := ret.Get(1).(func(*big.Int, []*big.Int) error); ok { - r1 = rf(chainId, blockNumbers) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// MockIMainStorage_DeleteBlockData_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DeleteBlockData' -type MockIMainStorage_DeleteBlockData_Call struct { - *mock.Call -} - -// DeleteBlockData is a helper method to define mock.On call -// - chainId *big.Int -// - blockNumbers []*big.Int -func (_e *MockIMainStorage_Expecter) DeleteBlockData(chainId interface{}, blockNumbers interface{}) *MockIMainStorage_DeleteBlockData_Call { - return &MockIMainStorage_DeleteBlockData_Call{Call: _e.mock.On("DeleteBlockData", chainId, blockNumbers)} -} - -func (_c *MockIMainStorage_DeleteBlockData_Call) Run(run func(chainId *big.Int, blockNumbers []*big.Int)) *MockIMainStorage_DeleteBlockData_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(*big.Int), args[1].([]*big.Int)) - }) - return _c -} - -func (_c *MockIMainStorage_DeleteBlockData_Call) Return(_a0 []common.BlockData, _a1 error) *MockIMainStorage_DeleteBlockData_Call { - _c.Call.Return(_a0, _a1) - return _c -} - -func (_c *MockIMainStorage_DeleteBlockData_Call) RunAndReturn(run func(*big.Int, []*big.Int) ([]common.BlockData, error)) *MockIMainStorage_DeleteBlockData_Call { - _c.Call.Return(run) - return _c -} - // GetAggregations provides a mock function with given fields: table, qf func (_m *MockIMainStorage) GetAggregations(table string, qf storage.QueryFilter) (storage.QueryResult[interface{}], error) { ret := _m.Called(table, qf) @@ -732,6 +673,64 @@ func (_c *MockIMainStorage_InsertBlockData_Call) RunAndReturn(run func([]common. return _c } +// ReplaceBlockData provides a mock function with given fields: data +func (_m *MockIMainStorage) ReplaceBlockData(data []common.BlockData) ([]common.BlockData, error) { + ret := _m.Called(data) + + if len(ret) == 0 { + panic("no return value specified for ReplaceBlockData") + } + + var r0 []common.BlockData + var r1 error + if rf, ok := ret.Get(0).(func([]common.BlockData) ([]common.BlockData, error)); ok { + return rf(data) + } + if rf, ok := ret.Get(0).(func([]common.BlockData) []common.BlockData); ok { + r0 = rf(data) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]common.BlockData) + } + } + + if rf, ok := ret.Get(1).(func([]common.BlockData) error); ok { + r1 = rf(data) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockIMainStorage_ReplaceBlockData_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ReplaceBlockData' +type MockIMainStorage_ReplaceBlockData_Call struct { + *mock.Call +} + +// ReplaceBlockData is a helper method to define mock.On call +// - data []common.BlockData +func (_e *MockIMainStorage_Expecter) ReplaceBlockData(data interface{}) *MockIMainStorage_ReplaceBlockData_Call { + return &MockIMainStorage_ReplaceBlockData_Call{Call: _e.mock.On("ReplaceBlockData", data)} +} + +func (_c *MockIMainStorage_ReplaceBlockData_Call) Run(run func(data []common.BlockData)) *MockIMainStorage_ReplaceBlockData_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].([]common.BlockData)) + }) + return _c +} + +func (_c *MockIMainStorage_ReplaceBlockData_Call) Return(_a0 []common.BlockData, _a1 error) *MockIMainStorage_ReplaceBlockData_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockIMainStorage_ReplaceBlockData_Call) RunAndReturn(run func([]common.BlockData) ([]common.BlockData, error)) *MockIMainStorage_ReplaceBlockData_Call { + _c.Call.Return(run) + return _c +} + // NewMockIMainStorage creates a new instance of MockIMainStorage. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. func NewMockIMainStorage(t interface { diff --git a/test/mocks/MockIOrchestratorStorage.go b/test/mocks/MockIOrchestratorStorage.go index 97cf435..fe382f0 100644 --- a/test/mocks/MockIOrchestratorStorage.go +++ b/test/mocks/MockIOrchestratorStorage.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.53.2. DO NOT EDIT. +// Code generated by mockery v2.50.4. DO NOT EDIT. //go:build !production diff --git a/test/mocks/MockIRPCClient.go b/test/mocks/MockIRPCClient.go index 4737815..816b205 100644 --- a/test/mocks/MockIRPCClient.go +++ b/test/mocks/MockIRPCClient.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.53.2. DO NOT EDIT. +// Code generated by mockery v2.50.4. DO NOT EDIT. //go:build !production diff --git a/test/mocks/MockIStagingStorage.go b/test/mocks/MockIStagingStorage.go index 08a090b..090f8f2 100644 --- a/test/mocks/MockIStagingStorage.go +++ b/test/mocks/MockIStagingStorage.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.53.2. DO NOT EDIT. +// Code generated by mockery v2.50.4. DO NOT EDIT. //go:build !production