Skip to content

Commit d06c5d1

Browse files
committed
change main storage interface to handle inserts all at once
1 parent 8663df7 commit d06c5d1

File tree

4 files changed

+114
-79
lines changed

4 files changed

+114
-79
lines changed

internal/orchestrator/committer.go

Lines changed: 1 addition & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"fmt"
55
"math/big"
66
"sort"
7-
"sync"
87
"time"
98

109
"github.com/rs/zerolog/log"
@@ -151,7 +150,7 @@ func (c *Committer) commit(blockData []common.BlockData) error {
151150
log.Debug().Msgf("Committing %d blocks", len(blockNumbers))
152151

153152
// TODO if next parts (saving or deleting) fail, we'll have to do a rollback
154-
if err := c.saveDataToMainStorage(blockData); err != nil {
153+
if err := c.storage.MainStorage.InsertDataForBlocks(blockData); err != nil {
155154
log.Error().Err(err).Msgf("Failed to commit blocks: %v", blockNumbers)
156155
return fmt.Errorf("error saving data to main storage: %v", err)
157156
}
@@ -167,70 +166,6 @@ func (c *Committer) commit(blockData []common.BlockData) error {
167166
return nil
168167
}
169168

170-
func (c *Committer) saveDataToMainStorage(blockData []common.BlockData) error {
171-
var commitWg sync.WaitGroup
172-
commitWg.Add(4)
173-
174-
var commitErr error
175-
var commitErrMutex sync.Mutex
176-
177-
blocks := make([]common.Block, 0, len(blockData))
178-
logs := make([]common.Log, 0)
179-
transactions := make([]common.Transaction, 0)
180-
traces := make([]common.Trace, 0)
181-
182-
for _, block := range blockData {
183-
blocks = append(blocks, block.Block)
184-
logs = append(logs, block.Logs...)
185-
transactions = append(transactions, block.Transactions...)
186-
traces = append(traces, block.Traces...)
187-
}
188-
189-
go func() {
190-
defer commitWg.Done()
191-
if err := c.storage.MainStorage.InsertBlocks(blocks); err != nil {
192-
commitErrMutex.Lock()
193-
commitErr = fmt.Errorf("error inserting blocks: %v", err)
194-
commitErrMutex.Unlock()
195-
}
196-
}()
197-
198-
go func() {
199-
defer commitWg.Done()
200-
if err := c.storage.MainStorage.InsertLogs(logs); err != nil {
201-
commitErrMutex.Lock()
202-
commitErr = fmt.Errorf("error inserting logs: %v", err)
203-
commitErrMutex.Unlock()
204-
}
205-
}()
206-
207-
go func() {
208-
defer commitWg.Done()
209-
if err := c.storage.MainStorage.InsertTransactions(transactions); err != nil {
210-
commitErrMutex.Lock()
211-
commitErr = fmt.Errorf("error inserting transactions: %v", err)
212-
commitErrMutex.Unlock()
213-
}
214-
}()
215-
216-
go func() {
217-
defer commitWg.Done()
218-
if err := c.storage.MainStorage.InsertTraces(traces); err != nil {
219-
commitErrMutex.Lock()
220-
commitErr = fmt.Errorf("error inserting traces: %v", err)
221-
commitErrMutex.Unlock()
222-
}
223-
}()
224-
225-
commitWg.Wait()
226-
227-
if commitErr != nil {
228-
return commitErr
229-
}
230-
231-
return nil
232-
}
233-
234169
func (c *Committer) handleGap(expectedStartBlockNumber *big.Int, actualFirstBlock common.Block) error {
235170
// increment the a gap counter in prometheus
236171
metrics.GapCounter.Inc()

internal/storage/clickhouse.go

Lines changed: 79 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"fmt"
88
"math/big"
99
"strings"
10+
"sync"
1011
"time"
1112

1213
"github.com/ClickHouse/clickhouse-go/v2"
@@ -55,7 +56,7 @@ func connectDB(cfg *config.ClickhouseConfig) (clickhouse.Conn, error) {
5556
return conn, nil
5657
}
5758

58-
func (c *ClickHouseConnector) InsertBlocks(blocks []common.Block) error {
59+
func (c *ClickHouseConnector) insertBlocks(blocks []common.Block) error {
5960
query := `
6061
INSERT INTO ` + c.cfg.Database + `.blocks (
6162
chain_id, number, timestamp, hash, parent_hash, sha3_uncles, nonce,
@@ -100,7 +101,7 @@ func (c *ClickHouseConnector) InsertBlocks(blocks []common.Block) error {
100101
return batch.Send()
101102
}
102103

103-
func (c *ClickHouseConnector) InsertTransactions(txs []common.Transaction) error {
104+
func (c *ClickHouseConnector) insertTransactions(txs []common.Transaction) error {
104105
query := `
105106
INSERT INTO ` + c.cfg.Database + `.transactions (
106107
chain_id, hash, nonce, block_hash, block_number, block_timestamp, transaction_index,
@@ -142,7 +143,7 @@ func (c *ClickHouseConnector) InsertTransactions(txs []common.Transaction) error
142143
return batch.Send()
143144
}
144145

145-
func (c *ClickHouseConnector) InsertLogs(logs []common.Log) error {
146+
func (c *ClickHouseConnector) insertLogs(logs []common.Log) error {
146147
query := `
147148
INSERT INTO ` + c.cfg.Database + `.logs (
148149
chain_id, block_number, block_hash, block_timestamp, transaction_hash, transaction_index,
@@ -663,7 +664,7 @@ func (c *ClickHouseConnector) DeleteBlockData(data []common.BlockData) error {
663664
return batch.Send()
664665
}
665666

666-
func (c *ClickHouseConnector) InsertTraces(traces []common.Trace) error {
667+
func (c *ClickHouseConnector) insertTraces(traces []common.Trace) error {
667668
query := `
668669
INSERT INTO ` + c.cfg.Database + `.traces (
669670
chain_id, block_number, block_hash, block_timestamp, transaction_hash, transaction_index,
@@ -756,3 +757,77 @@ func (c *ClickHouseConnector) GetTraces(qf QueryFilter) (traces []common.Trace,
756757
}
757758
return traces, nil
758759
}
760+
761+
// TODO make this atomic
762+
func (c *ClickHouseConnector) InsertDataForBlocks(data []common.BlockData) error {
763+
blocks := make([]common.Block, 0, len(data))
764+
logs := make([]common.Log, 0)
765+
transactions := make([]common.Transaction, 0)
766+
traces := make([]common.Trace, 0)
767+
768+
for _, blockData := range data {
769+
blocks = append(blocks, blockData.Block)
770+
logs = append(logs, blockData.Logs...)
771+
transactions = append(transactions, blockData.Transactions...)
772+
traces = append(traces, blockData.Traces...)
773+
}
774+
775+
var saveErr error
776+
var saveErrMutex sync.Mutex
777+
var wg sync.WaitGroup
778+
779+
if len(blocks) > 0 {
780+
wg.Add(1)
781+
go func() {
782+
defer wg.Done()
783+
if err := c.insertBlocks(blocks); err != nil {
784+
saveErrMutex.Lock()
785+
saveErr = fmt.Errorf("error deleting blocks: %v", err)
786+
saveErrMutex.Unlock()
787+
}
788+
}()
789+
}
790+
791+
if len(logs) > 0 {
792+
wg.Add(1)
793+
go func() {
794+
defer wg.Done()
795+
if err := c.insertLogs(logs); err != nil {
796+
saveErrMutex.Lock()
797+
saveErr = fmt.Errorf("error deleting logs: %v", err)
798+
saveErrMutex.Unlock()
799+
}
800+
}()
801+
}
802+
803+
if len(transactions) > 0 {
804+
wg.Add(1)
805+
go func() {
806+
defer wg.Done()
807+
if err := c.insertTransactions(transactions); err != nil {
808+
saveErrMutex.Lock()
809+
saveErr = fmt.Errorf("error deleting transactions: %v", err)
810+
saveErrMutex.Unlock()
811+
}
812+
}()
813+
}
814+
815+
if len(traces) > 0 {
816+
wg.Add(1)
817+
go func() {
818+
defer wg.Done()
819+
if err := c.insertTraces(traces); err != nil {
820+
saveErrMutex.Lock()
821+
saveErr = fmt.Errorf("error deleting traces: %v", err)
822+
saveErrMutex.Unlock()
823+
}
824+
}()
825+
}
826+
827+
wg.Wait()
828+
829+
if saveErr != nil {
830+
return saveErr
831+
}
832+
return nil
833+
}

internal/storage/connector.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -48,12 +48,9 @@ type IStagingStorage interface {
4848
}
4949

5050
type IMainStorage interface {
51-
InsertBlocks(blocks []common.Block) error
52-
InsertTransactions(txs []common.Transaction) error
53-
InsertLogs(logs []common.Log) error
54-
InsertTraces(traces []common.Trace) error
51+
InsertDataForBlocks(data []common.BlockData) error
5552

56-
GetBlocks(qf QueryFilter) (logs []common.Block, err error)
53+
GetBlocks(qf QueryFilter) (blocks []common.Block, err error)
5754
GetTransactions(qf QueryFilter) (transactions QueryResult[common.Transaction], err error)
5855
GetLogs(qf QueryFilter) (logs QueryResult[common.Log], err error)
5956
GetTraces(qf QueryFilter) (traces []common.Trace, err error)

internal/storage/memory.go

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ func (m *MemoryConnector) DeleteBlockFailures(failures []common.BlockFailure) er
7373
return nil
7474
}
7575

76-
func (m *MemoryConnector) InsertBlocks(blocks []common.Block) error {
76+
func (m *MemoryConnector) insertBlocks(blocks []common.Block) error {
7777
for _, block := range blocks {
7878
blockJson, err := json.Marshal(block)
7979
if err != nil {
@@ -109,7 +109,7 @@ func (m *MemoryConnector) GetBlocks(qf QueryFilter) ([]common.Block, error) {
109109
return blocks, nil
110110
}
111111

112-
func (m *MemoryConnector) InsertTransactions(txs []common.Transaction) error {
112+
func (m *MemoryConnector) insertTransactions(txs []common.Transaction) error {
113113
for _, tx := range txs {
114114
txJson, err := json.Marshal(tx)
115115
if err != nil {
@@ -143,7 +143,7 @@ func (m *MemoryConnector) GetTransactions(qf QueryFilter) ([]common.Transaction,
143143
return txs, nil
144144
}
145145

146-
func (m *MemoryConnector) InsertLogs(logs []common.Log) error {
146+
func (m *MemoryConnector) insertLogs(logs []common.Log) error {
147147
for _, log := range logs {
148148
logJson, err := json.Marshal(log)
149149
if err != nil {
@@ -294,7 +294,7 @@ func (m *MemoryConnector) DeleteBlockData(data []common.BlockData) error {
294294
return nil
295295
}
296296

297-
func (m *MemoryConnector) InsertTraces(traces []common.Trace) error {
297+
func (m *MemoryConnector) insertTraces(traces []common.Trace) error {
298298
for _, trace := range traces {
299299
traceJson, err := json.Marshal(trace)
300300
if err != nil {
@@ -331,3 +331,31 @@ func (m *MemoryConnector) GetTraces(qf QueryFilter) ([]common.Trace, error) {
331331
func traceAddressToString(traceAddress []uint64) string {
332332
return strings.Trim(strings.Replace(fmt.Sprint(traceAddress), " ", ",", -1), "[]")
333333
}
334+
335+
func (m *MemoryConnector) InsertDataForBlocks(data []common.BlockData) error {
336+
blocks := make([]common.Block, 0, len(data))
337+
logs := make([]common.Log, 0)
338+
transactions := make([]common.Transaction, 0)
339+
traces := make([]common.Trace, 0)
340+
341+
for _, blockData := range data {
342+
blocks = append(blocks, blockData.Block)
343+
logs = append(logs, blockData.Logs...)
344+
transactions = append(transactions, blockData.Transactions...)
345+
traces = append(traces, blockData.Traces...)
346+
}
347+
348+
if err := m.insertBlocks(blocks); err != nil {
349+
return err
350+
}
351+
if err := m.insertLogs(logs); err != nil {
352+
return err
353+
}
354+
if err := m.insertTransactions(transactions); err != nil {
355+
return err
356+
}
357+
if err := m.insertTraces(traces); err != nil {
358+
return err
359+
}
360+
return nil
361+
}

0 commit comments

Comments
 (0)