Skip to content

Commit 0f12a69

Browse files
committed
change main storage interface to handle inserts all at once
1 parent e4b9b29 commit 0f12a69

File tree

6 files changed

+159
-124
lines changed

6 files changed

+159
-124
lines changed

internal/orchestrator/committer.go

Lines changed: 23 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"fmt"
55
"math/big"
66
"sort"
7-
"sync"
87
"time"
98

109
"github.com/rs/zerolog/log"
@@ -57,7 +56,7 @@ func (c *Committer) Start() {
5756
log.Error().Err(err).Msg("Error getting block data to commit")
5857
continue
5958
}
60-
if len(blockDataToCommit) == 0 {
59+
if blockDataToCommit == nil || len(*blockDataToCommit) == 0 {
6160
log.Debug().Msg("No block data to commit")
6261
continue
6362
}
@@ -95,7 +94,7 @@ func (c *Committer) getBlockNumbersToCommit() ([]*big.Int, error) {
9594
return blockNumbers, nil
9695
}
9796

98-
func (c *Committer) getSequentialBlockDataToCommit() ([]common.BlockData, error) {
97+
func (c *Committer) getSequentialBlockDataToCommit() (*[]common.BlockData, error) {
9998
blocksToCommit, err := c.getBlockNumbersToCommit()
10099
if err != nil {
101100
return nil, fmt.Errorf("error determining blocks to commit: %v", err)
@@ -104,129 +103,65 @@ func (c *Committer) getSequentialBlockDataToCommit() ([]common.BlockData, error)
104103
return nil, nil
105104
}
106105

107-
blocksData, err := c.storage.StagingStorage.GetBlockData(storage.QueryFilter{BlockNumbers: blocksToCommit, ChainId: c.rpc.ChainID})
106+
blocksData, err := c.storage.StagingStorage.GetStagingData(storage.QueryFilter{BlockNumbers: blocksToCommit, ChainId: c.rpc.ChainID})
108107
if err != nil {
109108
return nil, fmt.Errorf("error fetching blocks to commit: %v", err)
110109
}
111-
if len(blocksData) == 0 {
110+
if blocksData == nil || len(*blocksData) == 0 {
112111
log.Warn().Msgf("Committer didn't find the following range in staging: %v - %v", blocksToCommit[0].Int64(), blocksToCommit[len(blocksToCommit)-1].Int64())
113112
return nil, nil
114113
}
115114

116115
// Sort blocks by block number
117-
sort.Slice(blocksData, func(i, j int) bool {
118-
return blocksData[i].Block.Number.Cmp(blocksData[j].Block.Number) < 0
116+
sort.Slice(*blocksData, func(i, j int) bool {
117+
return (*blocksData)[i].Block.Number.Cmp((*blocksData)[j].Block.Number) < 0
119118
})
120119

121-
if blocksData[0].Block.Number.Cmp(blocksToCommit[0]) != 0 {
122-
return nil, c.handleGap(blocksToCommit[0], blocksData[0].Block)
120+
if (*blocksData)[0].Block.Number.Cmp(blocksToCommit[0]) != 0 {
121+
return nil, c.handleGap(blocksToCommit[0], (*blocksData)[0].Block)
123122
}
124123

125124
var sequentialBlockData []common.BlockData
126-
sequentialBlockData = append(sequentialBlockData, blocksData[0])
127-
expectedBlockNumber := new(big.Int).Add(blocksData[0].Block.Number, big.NewInt(1))
125+
sequentialBlockData = append(sequentialBlockData, (*blocksData)[0])
126+
expectedBlockNumber := new(big.Int).Add((*blocksData)[0].Block.Number, big.NewInt(1))
128127

129-
for i := 1; i < len(blocksData); i++ {
130-
if blocksData[i].Block.Number.Cmp(expectedBlockNumber) != 0 {
128+
for i := 1; i < len(*blocksData); i++ {
129+
if (*blocksData)[i].Block.Number.Cmp(expectedBlockNumber) != 0 {
131130
// Note: Gap detected, stop here
132-
log.Warn().Msgf("Gap detected at block %s, committing until %s", expectedBlockNumber.String(), blocksData[i-1].Block.Number.String())
131+
log.Warn().Msgf("Gap detected at block %s, committing until %s", expectedBlockNumber.String(), (*blocksData)[i-1].Block.Number.String())
133132
// increment the a gap counter in prometheus
134133
metrics.GapCounter.Inc()
135134
// record the first missed block number in prometheus
136-
metrics.MissedBlockNumbers.Set(float64(blocksData[0].Block.Number.Int64()))
135+
metrics.MissedBlockNumbers.Set(float64((*blocksData)[0].Block.Number.Int64()))
137136
break
138137
}
139-
sequentialBlockData = append(sequentialBlockData, blocksData[i])
138+
sequentialBlockData = append(sequentialBlockData, (*blocksData)[i])
140139
expectedBlockNumber.Add(expectedBlockNumber, big.NewInt(1))
141140
}
142141

143-
return sequentialBlockData, nil
142+
return &sequentialBlockData, nil
144143
}
145144

146-
func (c *Committer) commit(blockData []common.BlockData) error {
147-
blockNumbers := make([]*big.Int, len(blockData))
148-
for i, block := range blockData {
145+
func (c *Committer) commit(blockData *[]common.BlockData) error {
146+
blockNumbers := make([]*big.Int, len(*blockData))
147+
for i, block := range *blockData {
149148
blockNumbers[i] = block.Block.Number
150149
}
151150
log.Debug().Msgf("Committing %d blocks", len(blockNumbers))
152151

153152
// TODO if next parts (saving or deleting) fail, we'll have to do a rollback
154-
if err := c.saveDataToMainStorage(blockData); err != nil {
153+
if err := c.storage.MainStorage.InsertBlockData(blockData); err != nil {
155154
log.Error().Err(err).Msgf("Failed to commit blocks: %v", blockNumbers)
156155
return fmt.Errorf("error saving data to main storage: %v", err)
157156
}
158157

159-
if err := c.storage.StagingStorage.DeleteBlockData(blockData); err != nil {
158+
if err := c.storage.StagingStorage.DeleteStagingData(blockData); err != nil {
160159
return fmt.Errorf("error deleting data from staging storage: %v", err)
161160
}
162161

163162
// Update metrics for successful commits
164-
metrics.SuccessfulCommits.Add(float64(len(blockData)))
165-
metrics.LastCommittedBlock.Set(float64(blockData[len(blockData)-1].Block.Number.Int64()))
166-
167-
return nil
168-
}
169-
170-
func (c *Committer) saveDataToMainStorage(blockData []common.BlockData) error {
171-
var commitWg sync.WaitGroup
172-
commitWg.Add(4)
173-
174-
var commitErr error
175-
var commitErrMutex sync.Mutex
176-
177-
blocks := make([]common.Block, 0, len(blockData))
178-
logs := make([]common.Log, 0)
179-
transactions := make([]common.Transaction, 0)
180-
traces := make([]common.Trace, 0)
181-
182-
for _, block := range blockData {
183-
blocks = append(blocks, block.Block)
184-
logs = append(logs, block.Logs...)
185-
transactions = append(transactions, block.Transactions...)
186-
traces = append(traces, block.Traces...)
187-
}
188-
189-
go func() {
190-
defer commitWg.Done()
191-
if err := c.storage.MainStorage.InsertBlocks(blocks); err != nil {
192-
commitErrMutex.Lock()
193-
commitErr = fmt.Errorf("error inserting blocks: %v", err)
194-
commitErrMutex.Unlock()
195-
}
196-
}()
197-
198-
go func() {
199-
defer commitWg.Done()
200-
if err := c.storage.MainStorage.InsertLogs(logs); err != nil {
201-
commitErrMutex.Lock()
202-
commitErr = fmt.Errorf("error inserting logs: %v", err)
203-
commitErrMutex.Unlock()
204-
}
205-
}()
206-
207-
go func() {
208-
defer commitWg.Done()
209-
if err := c.storage.MainStorage.InsertTransactions(transactions); err != nil {
210-
commitErrMutex.Lock()
211-
commitErr = fmt.Errorf("error inserting transactions: %v", err)
212-
commitErrMutex.Unlock()
213-
}
214-
}()
215-
216-
go func() {
217-
defer commitWg.Done()
218-
if err := c.storage.MainStorage.InsertTraces(traces); err != nil {
219-
commitErrMutex.Lock()
220-
commitErr = fmt.Errorf("error inserting traces: %v", err)
221-
commitErrMutex.Unlock()
222-
}
223-
}()
224-
225-
commitWg.Wait()
226-
227-
if commitErr != nil {
228-
return commitErr
229-
}
163+
metrics.SuccessfulCommits.Add(float64(len(*blockData)))
164+
metrics.LastCommittedBlock.Set(float64((*blockData)[len(*blockData)-1].Block.Number.Int64()))
230165

231166
return nil
232167
}

internal/orchestrator/failure_recoverer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ func (fr *FailureRecoverer) handleWorkerResults(blockFailures []common.BlockFail
114114
failuresToDelete = append(failuresToDelete, blockFailureForBlock)
115115
}
116116
}
117-
if err := fr.storage.StagingStorage.InsertBlockData(successfulResults); err != nil {
117+
if err := fr.storage.StagingStorage.InsertStagingData(successfulResults); err != nil {
118118
log.Error().Err(fmt.Errorf("error inserting block data in failure recoverer: %v", err))
119119
return
120120
}

internal/orchestrator/poller.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ func (p *Poller) handleWorkerResults(results []rpc.GetFullBlockResult) {
192192
Traces: result.Data.Traces,
193193
})
194194
}
195-
if err := p.storage.StagingStorage.InsertBlockData(blockData); err != nil {
195+
if err := p.storage.StagingStorage.InsertStagingData(blockData); err != nil {
196196
e := fmt.Errorf("error inserting block data: %v", err)
197197
log.Error().Err(e)
198198
for _, result := range successfulResults {

internal/storage/clickhouse.go

Lines changed: 88 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"fmt"
88
"math/big"
99
"strings"
10+
"sync"
1011
"time"
1112

1213
"github.com/ClickHouse/clickhouse-go/v2"
@@ -60,7 +61,7 @@ func connectDB(cfg *config.ClickhouseConfig) (clickhouse.Conn, error) {
6061
return conn, nil
6162
}
6263

63-
func (c *ClickHouseConnector) InsertBlocks(blocks []common.Block) error {
64+
func (c *ClickHouseConnector) insertBlocks(blocks *[]common.Block) error {
6465
query := `
6566
INSERT INTO ` + c.cfg.Database + `.blocks (
6667
chain_id, number, timestamp, hash, parent_hash, sha3_uncles, nonce,
@@ -73,7 +74,7 @@ func (c *ClickHouseConnector) InsertBlocks(blocks []common.Block) error {
7374
if err != nil {
7475
return err
7576
}
76-
for _, block := range blocks {
77+
for _, block := range *blocks {
7778
err := batch.Append(
7879
block.ChainId,
7980
block.Number,
@@ -105,7 +106,7 @@ func (c *ClickHouseConnector) InsertBlocks(blocks []common.Block) error {
105106
return batch.Send()
106107
}
107108

108-
func (c *ClickHouseConnector) InsertTransactions(txs []common.Transaction) error {
109+
func (c *ClickHouseConnector) insertTransactions(txs *[]common.Transaction) error {
109110
query := `
110111
INSERT INTO ` + c.cfg.Database + `.transactions (
111112
chain_id, hash, nonce, block_hash, block_number, block_timestamp, transaction_index,
@@ -117,7 +118,7 @@ func (c *ClickHouseConnector) InsertTransactions(txs []common.Transaction) error
117118
if err != nil {
118119
return err
119120
}
120-
for _, tx := range txs {
121+
for _, tx := range *txs {
121122
err := batch.Append(
122123
tx.ChainId,
123124
tx.Hash,
@@ -147,7 +148,7 @@ func (c *ClickHouseConnector) InsertTransactions(txs []common.Transaction) error
147148
return batch.Send()
148149
}
149150

150-
func (c *ClickHouseConnector) InsertLogs(logs []common.Log) error {
151+
func (c *ClickHouseConnector) insertLogs(logs *[]common.Log) error {
151152
query := `
152153
INSERT INTO ` + c.cfg.Database + `.logs (
153154
chain_id, block_number, block_hash, block_timestamp, transaction_hash, transaction_index,
@@ -158,7 +159,7 @@ func (c *ClickHouseConnector) InsertLogs(logs []common.Log) error {
158159
if err != nil {
159160
return err
160161
}
161-
for _, log := range logs {
162+
for _, log := range *logs {
162163
err := batch.Append(
163164
log.ChainId,
164165
log.BlockNumber,
@@ -585,7 +586,7 @@ func getBlockNumbersStringArray(blockNumbers []*big.Int) string {
585586
return blockNumbersString
586587
}
587588

588-
func (c *ClickHouseConnector) InsertBlockData(data []common.BlockData) error {
589+
func (c *ClickHouseConnector) InsertStagingData(data []common.BlockData) error {
589590
query := `INSERT INTO ` + c.cfg.Database + `.block_data (chain_id, block_number, data)`
590591
batch, err := c.conn.PrepareBatch(context.Background(), query)
591592
if err != nil {
@@ -608,7 +609,7 @@ func (c *ClickHouseConnector) InsertBlockData(data []common.BlockData) error {
608609
return batch.Send()
609610
}
610611

611-
func (c *ClickHouseConnector) GetBlockData(qf QueryFilter) (blockDataList []common.BlockData, err error) {
612+
func (c *ClickHouseConnector) GetStagingData(qf QueryFilter) (blockDataList *[]common.BlockData, err error) {
612613
query := fmt.Sprintf("SELECT data FROM %s.block_data FINAL WHERE block_number IN (%s) AND is_deleted = 0",
613614
c.cfg.Database, getBlockNumbersStringArray(qf.BlockNumbers))
614615

@@ -638,12 +639,12 @@ func (c *ClickHouseConnector) GetBlockData(qf QueryFilter) (blockDataList []comm
638639
if err != nil {
639640
return nil, err
640641
}
641-
blockDataList = append(blockDataList, blockData)
642+
*blockDataList = append(*blockDataList, blockData)
642643
}
643644
return blockDataList, nil
644645
}
645646

646-
func (c *ClickHouseConnector) DeleteBlockData(data []common.BlockData) error {
647+
func (c *ClickHouseConnector) DeleteStagingData(data *[]common.BlockData) error {
647648
query := fmt.Sprintf(`
648649
INSERT INTO %s.block_data (
649650
chain_id, block_number, is_deleted
@@ -655,7 +656,7 @@ func (c *ClickHouseConnector) DeleteBlockData(data []common.BlockData) error {
655656
return err
656657
}
657658

658-
for _, blockData := range data {
659+
for _, blockData := range *data {
659660
err := batch.Append(
660661
blockData.Block.ChainId,
661662
blockData.Block.Number,
@@ -668,7 +669,7 @@ func (c *ClickHouseConnector) DeleteBlockData(data []common.BlockData) error {
668669
return batch.Send()
669670
}
670671

671-
func (c *ClickHouseConnector) InsertTraces(traces []common.Trace) error {
672+
func (c *ClickHouseConnector) insertTraces(traces *[]common.Trace) error {
672673
query := `
673674
INSERT INTO ` + c.cfg.Database + `.traces (
674675
chain_id, block_number, block_hash, block_timestamp, transaction_hash, transaction_index,
@@ -680,7 +681,7 @@ func (c *ClickHouseConnector) InsertTraces(traces []common.Trace) error {
680681
if err != nil {
681682
return err
682683
}
683-
for _, trace := range traces {
684+
for _, trace := range *traces {
684685
err = batch.Append(
685686
trace.ChainID,
686687
trace.BlockNumber,
@@ -761,3 +762,77 @@ func (c *ClickHouseConnector) GetTraces(qf QueryFilter) (traces []common.Trace,
761762
}
762763
return traces, nil
763764
}
765+
766+
// TODO make this atomic
767+
func (c *ClickHouseConnector) InsertBlockData(data *[]common.BlockData) error {
768+
blocks := make([]common.Block, 0, len(*data))
769+
logs := make([]common.Log, 0)
770+
transactions := make([]common.Transaction, 0)
771+
traces := make([]common.Trace, 0)
772+
773+
for _, blockData := range *data {
774+
blocks = append(blocks, blockData.Block)
775+
logs = append(logs, blockData.Logs...)
776+
transactions = append(transactions, blockData.Transactions...)
777+
traces = append(traces, blockData.Traces...)
778+
}
779+
780+
var saveErr error
781+
var saveErrMutex sync.Mutex
782+
var wg sync.WaitGroup
783+
784+
if len(blocks) > 0 {
785+
wg.Add(1)
786+
go func() {
787+
defer wg.Done()
788+
if err := c.insertBlocks(&blocks); err != nil {
789+
saveErrMutex.Lock()
790+
saveErr = fmt.Errorf("error deleting blocks: %v", err)
791+
saveErrMutex.Unlock()
792+
}
793+
}()
794+
}
795+
796+
if len(logs) > 0 {
797+
wg.Add(1)
798+
go func() {
799+
defer wg.Done()
800+
if err := c.insertLogs(&logs); err != nil {
801+
saveErrMutex.Lock()
802+
saveErr = fmt.Errorf("error deleting logs: %v", err)
803+
saveErrMutex.Unlock()
804+
}
805+
}()
806+
}
807+
808+
if len(transactions) > 0 {
809+
wg.Add(1)
810+
go func() {
811+
defer wg.Done()
812+
if err := c.insertTransactions(&transactions); err != nil {
813+
saveErrMutex.Lock()
814+
saveErr = fmt.Errorf("error deleting transactions: %v", err)
815+
saveErrMutex.Unlock()
816+
}
817+
}()
818+
}
819+
820+
if len(traces) > 0 {
821+
wg.Add(1)
822+
go func() {
823+
defer wg.Done()
824+
if err := c.insertTraces(&traces); err != nil {
825+
saveErrMutex.Lock()
826+
saveErr = fmt.Errorf("error deleting traces: %v", err)
827+
saveErrMutex.Unlock()
828+
}
829+
}()
830+
}
831+
832+
wg.Wait()
833+
834+
if saveErr != nil {
835+
return saveErr
836+
}
837+
return nil
838+
}

0 commit comments

Comments
 (0)