Skip to content

Commit 3be0122

Browse files
committed
fix duplicate inserts
1 parent 8322feb commit 3be0122

File tree

7 files changed

+254
-13
lines changed

7 files changed

+254
-13
lines changed

internal/orchestrator/committer.go

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,16 +47,15 @@ func NewCommitter(rpc rpc.IRPCClient, storage storage.IStorage) *Committer {
4747

4848
func (c *Committer) Start(ctx context.Context) {
4949
interval := time.Duration(c.triggerIntervalMs) * time.Millisecond
50-
ticker := time.NewTicker(interval)
51-
defer ticker.Stop()
5250

5351
log.Debug().Msgf("Committer running")
5452
for {
5553
select {
5654
case <-ctx.Done():
5755
log.Info().Msg("Committer shutting down")
5856
return
59-
case <-ticker.C:
57+
default:
58+
time.Sleep(interval)
6059
blockDataToCommit, err := c.getSequentialBlockDataToCommit()
6160
if err != nil {
6261
log.Error().Err(err).Msg("Error getting block data to commit")
@@ -151,6 +150,21 @@ func (c *Committer) getSequentialBlockDataToCommit() (*[]common.BlockData, error
151150
}
152151

153152
func (c *Committer) commit(blockData *[]common.BlockData) error {
153+
locked, err := c.storage.OrchestratorStorage.IsCommittingLocked(c.rpc.GetChainID())
154+
if err != nil {
155+
return fmt.Errorf("error checking if committing is locked: %v", err)
156+
}
157+
if locked {
158+
log.Debug().Msg("Committing is locked, skipping commit")
159+
return nil
160+
} else {
161+
c.storage.OrchestratorStorage.SetCommittingLocked(c.rpc.GetChainID(), true)
162+
defer func() {
163+
if err := c.storage.OrchestratorStorage.SetCommittingLocked(c.rpc.GetChainID(), false); err != nil {
164+
log.Error().Err(err).Msg("Error unlocking committing")
165+
}
166+
}()
167+
}
154168
blockNumbers := make([]*big.Int, len(*blockData))
155169
for i, block := range *blockData {
156170
blockNumbers[i] = block.Block.Number

internal/orchestrator/committer_test.go

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -130,9 +130,11 @@ func TestCommit(t *testing.T) {
130130
mockRPC := mocks.NewMockIRPCClient(t)
131131
mockMainStorage := mocks.NewMockIMainStorage(t)
132132
mockStagingStorage := mocks.NewMockIStagingStorage(t)
133+
mockOrchestratorStorage := mocks.NewMockIOrchestratorStorage(t)
133134
mockStorage := storage.IStorage{
134-
MainStorage: mockMainStorage,
135-
StagingStorage: mockStagingStorage,
135+
MainStorage: mockMainStorage,
136+
StagingStorage: mockStagingStorage,
137+
OrchestratorStorage: mockOrchestratorStorage,
136138
}
137139
committer := NewCommitter(mockRPC, mockStorage)
138140

@@ -141,6 +143,10 @@ func TestCommit(t *testing.T) {
141143
{Block: common.Block{Number: big.NewInt(102)}},
142144
}
143145

146+
mockRPC.EXPECT().GetChainID().Return(big.NewInt(1))
147+
mockOrchestratorStorage.EXPECT().SetCommittingLocked(big.NewInt(1), true).Return(nil).Once()
148+
mockOrchestratorStorage.EXPECT().IsCommittingLocked(big.NewInt(1)).Return(false, nil)
149+
mockOrchestratorStorage.EXPECT().SetCommittingLocked(big.NewInt(1), false).Return(nil).Once()
144150
mockMainStorage.EXPECT().InsertBlockData(&blockData).Return(nil)
145151
mockStagingStorage.EXPECT().DeleteStagingData(&blockData).Return(nil)
146152

@@ -186,10 +192,12 @@ func TestStartCommitter(t *testing.T) {
186192
mockRPC := mocks.NewMockIRPCClient(t)
187193
mockMainStorage := mocks.NewMockIMainStorage(t)
188194
mockStagingStorage := mocks.NewMockIStagingStorage(t)
195+
mockOrchestratorStorage := mocks.NewMockIOrchestratorStorage(t)
189196

190197
mockStorage := storage.IStorage{
191-
MainStorage: mockMainStorage,
192-
StagingStorage: mockStagingStorage,
198+
MainStorage: mockMainStorage,
199+
StagingStorage: mockStagingStorage,
200+
OrchestratorStorage: mockOrchestratorStorage,
193201
}
194202

195203
committer := NewCommitter(mockRPC, mockStorage)
@@ -198,6 +206,9 @@ func TestStartCommitter(t *testing.T) {
198206
chainID := big.NewInt(1)
199207
mockRPC.EXPECT().GetChainID().Return(chainID)
200208
mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(100), nil)
209+
mockOrchestratorStorage.EXPECT().SetCommittingLocked(big.NewInt(1), true).Return(nil)
210+
mockOrchestratorStorage.EXPECT().IsCommittingLocked(big.NewInt(1)).Return(false, nil)
211+
mockOrchestratorStorage.EXPECT().SetCommittingLocked(big.NewInt(1), false).Return(nil)
201212

202213
blockData := []common.BlockData{
203214
{Block: common.Block{Number: big.NewInt(101)}},
@@ -218,9 +229,11 @@ func TestCommitterRespectsSIGTERM(t *testing.T) {
218229
mockRPC := mocks.NewMockIRPCClient(t)
219230
mockMainStorage := mocks.NewMockIMainStorage(t)
220231
mockStagingStorage := mocks.NewMockIStagingStorage(t)
232+
mockOrchestratorStorage := mocks.NewMockIOrchestratorStorage(t)
221233
mockStorage := storage.IStorage{
222-
MainStorage: mockMainStorage,
223-
StagingStorage: mockStagingStorage,
234+
MainStorage: mockMainStorage,
235+
StagingStorage: mockStagingStorage,
236+
OrchestratorStorage: mockOrchestratorStorage,
224237
}
225238

226239
committer := NewCommitter(mockRPC, mockStorage)
@@ -229,6 +242,9 @@ func TestCommitterRespectsSIGTERM(t *testing.T) {
229242
chainID := big.NewInt(1)
230243
mockRPC.EXPECT().GetChainID().Return(chainID)
231244
mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(100), nil)
245+
mockOrchestratorStorage.EXPECT().SetCommittingLocked(big.NewInt(1), true).Return(nil)
246+
mockOrchestratorStorage.EXPECT().IsCommittingLocked(big.NewInt(1)).Return(false, nil)
247+
mockOrchestratorStorage.EXPECT().SetCommittingLocked(big.NewInt(1), false).Return(nil)
232248

233249
blockData := []common.BlockData{
234250
{Block: common.Block{Number: big.NewInt(101)}},

internal/storage/clickhouse.go

Lines changed: 71 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1082,18 +1082,59 @@ func (c *ClickHouseConnector) deleteTraces(chainId *big.Int, blockNumbers []*big
10821082
})
10831083
}
10841084

1085+
func (c *ClickHouseConnector) getBlockExistenceMap(data *[]common.BlockData) (map[string]bool, error) {
1086+
if len(*data) == 0 {
1087+
return nil, nil
1088+
}
1089+
blockNumbers := make([]*big.Int, 0, len(*data))
1090+
for _, blockData := range *data {
1091+
blockNumbers = append(blockNumbers, blockData.Block.Number)
1092+
}
1093+
chainId := (*data)[0].Block.ChainId
1094+
query := fmt.Sprintf("SELECT block_number FROM %s.%s WHERE chain_id = ? AND block_number IN (?) GROUP BY block_number HAVING sum(sign) > 0", c.cfg.Database, c.getTableName(chainId, "blocks"))
1095+
rows, err := c.conn.Query(context.Background(), query, chainId, blockNumbers)
1096+
if err != nil {
1097+
return nil, err
1098+
}
1099+
defer rows.Close()
1100+
1101+
blockNumbersMap := make(map[string]bool)
1102+
for _, blockNumber := range blockNumbers {
1103+
blockNumbersMap[blockNumber.String()] = false
1104+
}
1105+
for rows.Next() {
1106+
var blockNumber string
1107+
err := rows.Scan(&blockNumber)
1108+
if err != nil {
1109+
return nil, err
1110+
}
1111+
blockNumbersMap[blockNumber] = true
1112+
}
1113+
return blockNumbersMap, nil
1114+
}
1115+
10851116
// TODO make this atomic
10861117
func (c *ClickHouseConnector) InsertBlockData(data *[]common.BlockData) error {
1118+
if len(*data) == 0 {
1119+
return nil
1120+
}
10871121
blocks := make([]common.Block, 0, len(*data))
10881122
logs := make([]common.Log, 0)
10891123
transactions := make([]common.Transaction, 0)
10901124
traces := make([]common.Trace, 0)
10911125

1126+
blockExistenceMap, err := c.getBlockExistenceMap(data)
1127+
if err != nil {
1128+
return err
1129+
}
1130+
10921131
for _, blockData := range *data {
1093-
blocks = append(blocks, blockData.Block)
1094-
logs = append(logs, blockData.Logs...)
1095-
transactions = append(transactions, blockData.Transactions...)
1096-
traces = append(traces, blockData.Traces...)
1132+
if !blockExistenceMap[blockData.Block.Number.String()] {
1133+
blocks = append(blocks, blockData.Block)
1134+
logs = append(logs, blockData.Logs...)
1135+
transactions = append(transactions, blockData.Transactions...)
1136+
traces = append(traces, blockData.Traces...)
1137+
}
10971138
}
10981139

10991140
var saveErr error
@@ -1416,3 +1457,29 @@ func (c *ClickHouseConnector) GetTokenBalances(qf BalancesQueryFilter) (QueryRes
14161457

14171458
return queryResult, nil
14181459
}
1460+
1461+
func (c *ClickHouseConnector) IsCommittingLocked(chainId *big.Int) (bool, error) {
1462+
query := fmt.Sprintf("SELECT cursor_value FROM %s.cursors FINAL WHERE chain_id = ? AND cursor_type = 'committing_lock' SETTINGS select_sequential_consistency = 1", c.cfg.Database)
1463+
var value string
1464+
err := c.conn.QueryRow(context.Background(), query, chainId).Scan(&value)
1465+
if err != nil {
1466+
if err == sql.ErrNoRows {
1467+
return false, nil
1468+
}
1469+
return false, err
1470+
}
1471+
if value == "1" {
1472+
return true, nil
1473+
}
1474+
return false, nil
1475+
}
1476+
1477+
func (c *ClickHouseConnector) SetCommittingLocked(chainId *big.Int, locked bool) error {
1478+
query := fmt.Sprintf("INSERT INTO %s.cursors (chain_id, cursor_type, cursor_value) VALUES (?, ?, ?)", c.cfg.Database)
1479+
value := "1"
1480+
if !locked {
1481+
value = "0"
1482+
}
1483+
err := c.conn.Exec(context.Background(), query, chainId, "committing_lock", value)
1484+
return err
1485+
}

internal/storage/connector.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@ type IOrchestratorStorage interface {
5656
DeleteBlockFailures(failures []common.BlockFailure) error
5757
GetLastReorgCheckedBlockNumber(chainId *big.Int) (*big.Int, error)
5858
SetLastReorgCheckedBlockNumber(chainId *big.Int, blockNumber *big.Int) error
59+
IsCommittingLocked(chainId *big.Int) (bool, error)
60+
SetCommittingLocked(chainId *big.Int, locked bool) error
5961
}
6062

6163
type IStagingStorage interface {

internal/storage/memory.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -473,3 +473,20 @@ func (m *MemoryConnector) GetBlockHeadersDescending(chainId *big.Int, from *big.
473473
}
474474
return blockHeaders, nil
475475
}
476+
477+
func (m *MemoryConnector) IsCommittingLocked(chainId *big.Int) (bool, error) {
478+
value, ok := m.cache.Get(fmt.Sprintf("committing_lock:%s", chainId.String()))
479+
if !ok {
480+
return false, nil
481+
}
482+
return value == "1", nil
483+
}
484+
485+
func (m *MemoryConnector) SetCommittingLocked(chainId *big.Int, locked bool) error {
486+
value := "1"
487+
if !locked {
488+
value = "0"
489+
}
490+
m.cache.Add(fmt.Sprintf("committing_lock:%s", chainId.String()), value)
491+
return nil
492+
}

internal/storage/redis.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,3 +135,25 @@ func (r *RedisConnector) SetLastReorgCheckedBlockNumber(chainId *big.Int, blockN
135135
r.client.Set(ctx, fmt.Sprintf("reorg_check:%s", chainId.String()), blockNumber.String(), 0)
136136
return nil
137137
}
138+
139+
func (r *RedisConnector) IsCommittingLocked(chainId *big.Int) (bool, error) {
140+
ctx := context.Background()
141+
value, err := r.client.Get(ctx, fmt.Sprintf("committing_lock:%s", chainId.String())).Result()
142+
if err != nil {
143+
return false, err
144+
}
145+
if value == "1" {
146+
return true, nil
147+
}
148+
return false, nil
149+
}
150+
151+
func (r *RedisConnector) SetCommittingLocked(chainId *big.Int, locked bool) error {
152+
ctx := context.Background()
153+
value := "1"
154+
if !locked {
155+
value = "0"
156+
}
157+
r.client.Set(ctx, fmt.Sprintf("committing_lock:%s", chainId.String()), value, 0)
158+
return nil
159+
}

test/mocks/MockIOrchestratorStorage.go

Lines changed: 103 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)