Skip to content

Commit d0961cb

Browse files
committed
change main storage interface to handle inserts all at once
1 parent 8663df7 commit d0961cb

File tree

4 files changed

+152
-117
lines changed

4 files changed

+152
-117
lines changed

internal/orchestrator/committer.go

Lines changed: 21 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"fmt"
55
"math/big"
66
"sort"
7-
"sync"
87
"time"
98

109
"github.com/rs/zerolog/log"
@@ -57,7 +56,7 @@ func (c *Committer) Start() {
5756
log.Error().Err(err).Msg("Error getting block data to commit")
5857
continue
5958
}
60-
if len(blockDataToCommit) == 0 {
59+
if len(*blockDataToCommit) == 0 {
6160
log.Debug().Msg("No block data to commit")
6261
continue
6362
}
@@ -95,7 +94,7 @@ func (c *Committer) getBlockNumbersToCommit() ([]*big.Int, error) {
9594
return blockNumbers, nil
9695
}
9796

98-
func (c *Committer) getSequentialBlockDataToCommit() ([]common.BlockData, error) {
97+
func (c *Committer) getSequentialBlockDataToCommit() (*[]common.BlockData, error) {
9998
blocksToCommit, err := c.getBlockNumbersToCommit()
10099
if err != nil {
101100
return nil, fmt.Errorf("error determining blocks to commit: %v", err)
@@ -108,50 +107,50 @@ func (c *Committer) getSequentialBlockDataToCommit() ([]common.BlockData, error)
108107
if err != nil {
109108
return nil, fmt.Errorf("error fetching blocks to commit: %v", err)
110109
}
111-
if len(blocksData) == 0 {
110+
if len(*blocksData) == 0 {
112111
log.Warn().Msgf("Committer didn't find the following range in staging: %v - %v", blocksToCommit[0].Int64(), blocksToCommit[len(blocksToCommit)-1].Int64())
113112
return nil, nil
114113
}
115114

116115
// Sort blocks by block number
117-
sort.Slice(blocksData, func(i, j int) bool {
118-
return blocksData[i].Block.Number.Cmp(blocksData[j].Block.Number) < 0
116+
sort.Slice(*blocksData, func(i, j int) bool {
117+
return (*blocksData)[i].Block.Number.Cmp((*blocksData)[j].Block.Number) < 0
119118
})
120119

121-
if blocksData[0].Block.Number.Cmp(blocksToCommit[0]) != 0 {
122-
return nil, c.handleGap(blocksToCommit[0], blocksData[0].Block)
120+
if (*blocksData)[0].Block.Number.Cmp(blocksToCommit[0]) != 0 {
121+
return nil, c.handleGap(blocksToCommit[0], (*blocksData)[0].Block)
123122
}
124123

125124
var sequentialBlockData []common.BlockData
126-
sequentialBlockData = append(sequentialBlockData, blocksData[0])
127-
expectedBlockNumber := new(big.Int).Add(blocksData[0].Block.Number, big.NewInt(1))
125+
sequentialBlockData = append(sequentialBlockData, (*blocksData)[0])
126+
expectedBlockNumber := new(big.Int).Add((*blocksData)[0].Block.Number, big.NewInt(1))
128127

129-
for i := 1; i < len(blocksData); i++ {
130-
if blocksData[i].Block.Number.Cmp(expectedBlockNumber) != 0 {
128+
for i := 1; i < len(*blocksData); i++ {
129+
if (*blocksData)[i].Block.Number.Cmp(expectedBlockNumber) != 0 {
131130
// Note: Gap detected, stop here
132-
log.Warn().Msgf("Gap detected at block %s, committing until %s", expectedBlockNumber.String(), blocksData[i-1].Block.Number.String())
131+
log.Warn().Msgf("Gap detected at block %s, committing until %s", expectedBlockNumber.String(), (*blocksData)[i-1].Block.Number.String())
133132
// increment the a gap counter in prometheus
134133
metrics.GapCounter.Inc()
135134
// record the first missed block number in prometheus
136-
metrics.MissedBlockNumbers.Set(float64(blocksData[0].Block.Number.Int64()))
135+
metrics.MissedBlockNumbers.Set(float64((*blocksData)[0].Block.Number.Int64()))
137136
break
138137
}
139-
sequentialBlockData = append(sequentialBlockData, blocksData[i])
138+
sequentialBlockData = append(sequentialBlockData, (*blocksData)[i])
140139
expectedBlockNumber.Add(expectedBlockNumber, big.NewInt(1))
141140
}
142141

143-
return sequentialBlockData, nil
142+
return &sequentialBlockData, nil
144143
}
145144

146-
func (c *Committer) commit(blockData []common.BlockData) error {
147-
blockNumbers := make([]*big.Int, len(blockData))
148-
for i, block := range blockData {
145+
func (c *Committer) commit(blockData *[]common.BlockData) error {
146+
blockNumbers := make([]*big.Int, len(*blockData))
147+
for i, block := range *blockData {
149148
blockNumbers[i] = block.Block.Number
150149
}
151150
log.Debug().Msgf("Committing %d blocks", len(blockNumbers))
152151

153152
// TODO if next parts (saving or deleting) fail, we'll have to do a rollback
154-
if err := c.saveDataToMainStorage(blockData); err != nil {
153+
if err := c.storage.MainStorage.InsertDataForBlocks(blockData); err != nil {
155154
log.Error().Err(err).Msgf("Failed to commit blocks: %v", blockNumbers)
156155
return fmt.Errorf("error saving data to main storage: %v", err)
157156
}
@@ -161,72 +160,8 @@ func (c *Committer) commit(blockData []common.BlockData) error {
161160
}
162161

163162
// Update metrics for successful commits
164-
metrics.SuccessfulCommits.Add(float64(len(blockData)))
165-
metrics.LastCommittedBlock.Set(float64(blockData[len(blockData)-1].Block.Number.Int64()))
166-
167-
return nil
168-
}
169-
170-
func (c *Committer) saveDataToMainStorage(blockData []common.BlockData) error {
171-
var commitWg sync.WaitGroup
172-
commitWg.Add(4)
173-
174-
var commitErr error
175-
var commitErrMutex sync.Mutex
176-
177-
blocks := make([]common.Block, 0, len(blockData))
178-
logs := make([]common.Log, 0)
179-
transactions := make([]common.Transaction, 0)
180-
traces := make([]common.Trace, 0)
181-
182-
for _, block := range blockData {
183-
blocks = append(blocks, block.Block)
184-
logs = append(logs, block.Logs...)
185-
transactions = append(transactions, block.Transactions...)
186-
traces = append(traces, block.Traces...)
187-
}
188-
189-
go func() {
190-
defer commitWg.Done()
191-
if err := c.storage.MainStorage.InsertBlocks(blocks); err != nil {
192-
commitErrMutex.Lock()
193-
commitErr = fmt.Errorf("error inserting blocks: %v", err)
194-
commitErrMutex.Unlock()
195-
}
196-
}()
197-
198-
go func() {
199-
defer commitWg.Done()
200-
if err := c.storage.MainStorage.InsertLogs(logs); err != nil {
201-
commitErrMutex.Lock()
202-
commitErr = fmt.Errorf("error inserting logs: %v", err)
203-
commitErrMutex.Unlock()
204-
}
205-
}()
206-
207-
go func() {
208-
defer commitWg.Done()
209-
if err := c.storage.MainStorage.InsertTransactions(transactions); err != nil {
210-
commitErrMutex.Lock()
211-
commitErr = fmt.Errorf("error inserting transactions: %v", err)
212-
commitErrMutex.Unlock()
213-
}
214-
}()
215-
216-
go func() {
217-
defer commitWg.Done()
218-
if err := c.storage.MainStorage.InsertTraces(traces); err != nil {
219-
commitErrMutex.Lock()
220-
commitErr = fmt.Errorf("error inserting traces: %v", err)
221-
commitErrMutex.Unlock()
222-
}
223-
}()
224-
225-
commitWg.Wait()
226-
227-
if commitErr != nil {
228-
return commitErr
229-
}
163+
metrics.SuccessfulCommits.Add(float64(len(*blockData)))
164+
metrics.LastCommittedBlock.Set(float64((*blockData)[len(*blockData)-1].Block.Number.Int64()))
230165

231166
return nil
232167
}

internal/storage/clickhouse.go

Lines changed: 87 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"fmt"
88
"math/big"
99
"strings"
10+
"sync"
1011
"time"
1112

1213
"github.com/ClickHouse/clickhouse-go/v2"
@@ -55,7 +56,7 @@ func connectDB(cfg *config.ClickhouseConfig) (clickhouse.Conn, error) {
5556
return conn, nil
5657
}
5758

58-
func (c *ClickHouseConnector) InsertBlocks(blocks []common.Block) error {
59+
func (c *ClickHouseConnector) insertBlocks(blocks *[]common.Block) error {
5960
query := `
6061
INSERT INTO ` + c.cfg.Database + `.blocks (
6162
chain_id, number, timestamp, hash, parent_hash, sha3_uncles, nonce,
@@ -68,7 +69,7 @@ func (c *ClickHouseConnector) InsertBlocks(blocks []common.Block) error {
6869
if err != nil {
6970
return err
7071
}
71-
for _, block := range blocks {
72+
for _, block := range *blocks {
7273
err := batch.Append(
7374
block.ChainId,
7475
block.Number,
@@ -100,7 +101,7 @@ func (c *ClickHouseConnector) InsertBlocks(blocks []common.Block) error {
100101
return batch.Send()
101102
}
102103

103-
func (c *ClickHouseConnector) InsertTransactions(txs []common.Transaction) error {
104+
func (c *ClickHouseConnector) insertTransactions(txs *[]common.Transaction) error {
104105
query := `
105106
INSERT INTO ` + c.cfg.Database + `.transactions (
106107
chain_id, hash, nonce, block_hash, block_number, block_timestamp, transaction_index,
@@ -112,7 +113,7 @@ func (c *ClickHouseConnector) InsertTransactions(txs []common.Transaction) error
112113
if err != nil {
113114
return err
114115
}
115-
for _, tx := range txs {
116+
for _, tx := range *txs {
116117
err := batch.Append(
117118
tx.ChainId,
118119
tx.Hash,
@@ -142,7 +143,7 @@ func (c *ClickHouseConnector) InsertTransactions(txs []common.Transaction) error
142143
return batch.Send()
143144
}
144145

145-
func (c *ClickHouseConnector) InsertLogs(logs []common.Log) error {
146+
func (c *ClickHouseConnector) insertLogs(logs *[]common.Log) error {
146147
query := `
147148
INSERT INTO ` + c.cfg.Database + `.logs (
148149
chain_id, block_number, block_hash, block_timestamp, transaction_hash, transaction_index,
@@ -153,7 +154,7 @@ func (c *ClickHouseConnector) InsertLogs(logs []common.Log) error {
153154
if err != nil {
154155
return err
155156
}
156-
for _, log := range logs {
157+
for _, log := range *logs {
157158
err := batch.Append(
158159
log.ChainId,
159160
log.BlockNumber,
@@ -603,7 +604,7 @@ func (c *ClickHouseConnector) InsertBlockData(data []common.BlockData) error {
603604
return batch.Send()
604605
}
605606

606-
func (c *ClickHouseConnector) GetBlockData(qf QueryFilter) (blockDataList []common.BlockData, err error) {
607+
func (c *ClickHouseConnector) GetBlockData(qf QueryFilter) (blockDataList *[]common.BlockData, err error) {
607608
query := fmt.Sprintf("SELECT data FROM %s.block_data FINAL WHERE block_number IN (%s) AND is_deleted = 0",
608609
c.cfg.Database, getBlockNumbersStringArray(qf.BlockNumbers))
609610

@@ -633,12 +634,12 @@ func (c *ClickHouseConnector) GetBlockData(qf QueryFilter) (blockDataList []comm
633634
if err != nil {
634635
return nil, err
635636
}
636-
blockDataList = append(blockDataList, blockData)
637+
*blockDataList = append(*blockDataList, blockData)
637638
}
638639
return blockDataList, nil
639640
}
640641

641-
func (c *ClickHouseConnector) DeleteBlockData(data []common.BlockData) error {
642+
func (c *ClickHouseConnector) DeleteBlockData(data *[]common.BlockData) error {
642643
query := fmt.Sprintf(`
643644
INSERT INTO %s.block_data (
644645
chain_id, block_number, is_deleted
@@ -650,7 +651,7 @@ func (c *ClickHouseConnector) DeleteBlockData(data []common.BlockData) error {
650651
return err
651652
}
652653

653-
for _, blockData := range data {
654+
for _, blockData := range *data {
654655
err := batch.Append(
655656
blockData.Block.ChainId,
656657
blockData.Block.Number,
@@ -663,7 +664,7 @@ func (c *ClickHouseConnector) DeleteBlockData(data []common.BlockData) error {
663664
return batch.Send()
664665
}
665666

666-
func (c *ClickHouseConnector) InsertTraces(traces []common.Trace) error {
667+
func (c *ClickHouseConnector) insertTraces(traces *[]common.Trace) error {
667668
query := `
668669
INSERT INTO ` + c.cfg.Database + `.traces (
669670
chain_id, block_number, block_hash, block_timestamp, transaction_hash, transaction_index,
@@ -675,7 +676,7 @@ func (c *ClickHouseConnector) InsertTraces(traces []common.Trace) error {
675676
if err != nil {
676677
return err
677678
}
678-
for _, trace := range traces {
679+
for _, trace := range *traces {
679680
err = batch.Append(
680681
trace.ChainID,
681682
trace.BlockNumber,
@@ -756,3 +757,77 @@ func (c *ClickHouseConnector) GetTraces(qf QueryFilter) (traces []common.Trace,
756757
}
757758
return traces, nil
758759
}
760+
761+
// TODO make this atomic
762+
func (c *ClickHouseConnector) InsertDataForBlocks(data *[]common.BlockData) error {
763+
blocks := make([]common.Block, 0, len(*data))
764+
logs := make([]common.Log, 0)
765+
transactions := make([]common.Transaction, 0)
766+
traces := make([]common.Trace, 0)
767+
768+
for _, blockData := range *data {
769+
blocks = append(blocks, blockData.Block)
770+
logs = append(logs, blockData.Logs...)
771+
transactions = append(transactions, blockData.Transactions...)
772+
traces = append(traces, blockData.Traces...)
773+
}
774+
775+
var saveErr error
776+
var saveErrMutex sync.Mutex
777+
var wg sync.WaitGroup
778+
779+
if len(blocks) > 0 {
780+
wg.Add(1)
781+
go func() {
782+
defer wg.Done()
783+
if err := c.insertBlocks(&blocks); err != nil {
784+
saveErrMutex.Lock()
785+
saveErr = fmt.Errorf("error deleting blocks: %v", err)
786+
saveErrMutex.Unlock()
787+
}
788+
}()
789+
}
790+
791+
if len(logs) > 0 {
792+
wg.Add(1)
793+
go func() {
794+
defer wg.Done()
795+
if err := c.insertLogs(&logs); err != nil {
796+
saveErrMutex.Lock()
797+
saveErr = fmt.Errorf("error deleting logs: %v", err)
798+
saveErrMutex.Unlock()
799+
}
800+
}()
801+
}
802+
803+
if len(transactions) > 0 {
804+
wg.Add(1)
805+
go func() {
806+
defer wg.Done()
807+
if err := c.insertTransactions(&transactions); err != nil {
808+
saveErrMutex.Lock()
809+
saveErr = fmt.Errorf("error deleting transactions: %v", err)
810+
saveErrMutex.Unlock()
811+
}
812+
}()
813+
}
814+
815+
if len(traces) > 0 {
816+
wg.Add(1)
817+
go func() {
818+
defer wg.Done()
819+
if err := c.insertTraces(&traces); err != nil {
820+
saveErrMutex.Lock()
821+
saveErr = fmt.Errorf("error deleting traces: %v", err)
822+
saveErrMutex.Unlock()
823+
}
824+
}()
825+
}
826+
827+
wg.Wait()
828+
829+
if saveErr != nil {
830+
return saveErr
831+
}
832+
return nil
833+
}

internal/storage/connector.go

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -42,18 +42,15 @@ type IOrchestratorStorage interface {
4242

4343
type IStagingStorage interface {
4444
InsertBlockData(data []common.BlockData) error
45-
GetBlockData(qf QueryFilter) (data []common.BlockData, err error)
46-
DeleteBlockData(data []common.BlockData) error
45+
GetBlockData(qf QueryFilter) (data *[]common.BlockData, err error)
46+
DeleteBlockData(data *[]common.BlockData) error
4747
GetLastStagedBlockNumber(chainId *big.Int, rangeEnd *big.Int) (maxBlockNumber *big.Int, err error)
4848
}
4949

5050
type IMainStorage interface {
51-
InsertBlocks(blocks []common.Block) error
52-
InsertTransactions(txs []common.Transaction) error
53-
InsertLogs(logs []common.Log) error
54-
InsertTraces(traces []common.Trace) error
51+
InsertDataForBlocks(data *[]common.BlockData) error
5552

56-
GetBlocks(qf QueryFilter) (logs []common.Block, err error)
53+
GetBlocks(qf QueryFilter) (blocks []common.Block, err error)
5754
GetTransactions(qf QueryFilter) (transactions QueryResult[common.Transaction], err error)
5855
GetLogs(qf QueryFilter) (logs QueryResult[common.Log], err error)
5956
GetTraces(qf QueryFilter) (traces []common.Trace, err error)

0 commit comments

Comments
 (0)