Skip to content

Commit 6ce836a

Browse files
authored
Fix reorg backward let (#890)
* deposit_backup table * query * ctx * restore deposits when backwardLet is reorged * fix test + mocks * linter
1 parent 40790e5 commit 6ce836a

File tree

13 files changed

+543
-58
lines changed

13 files changed

+543
-58
lines changed

bridgectrl/merkletree_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ func TestMTGetProof(t *testing.T) {
169169
BlockNumber: uint64(li + 1), // nolint:gosec
170170
BlockHash: common.HexToHash("0x29e885edaf8e4b51e1d2e05f9da28161d2fb4f6b1d53827d9b80a23cf2d7d9fc"),
171171
}
172-
blockID, err := testStore.AddBlock(context.TODO(), block, nil)
172+
blockID, err := testStore.AddBlock(ctx, block, nil)
173173
require.NoError(t, err)
174174
deposit := &etherman.Deposit{
175175
OriginalNetwork: leaf.OriginalNetwork,

db/pgstorage/migrations/0022.sql

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
-- +migrate Up
2+
3+
CREATE TABLE IF NOT EXISTS sync.deposit_backup
4+
(
5+
leaf_type INTEGER,
6+
network_id BIGINT,
7+
orig_net BIGINT,
8+
orig_addr BYTEA NOT NULL,
9+
amount VARCHAR,
10+
dest_net BIGINT NOT NULL,
11+
dest_addr BYTEA NOT NULL,
12+
block_id BIGINT REFERENCES sync.block (id) ON DELETE CASCADE,
13+
deposit_cnt BIGINT,
14+
tx_hash BYTEA NOT NULL,
15+
metadata BYTEA NOT NULL,
16+
id BIGSERIAL,
17+
ready_for_claim BOOLEAN DEFAULT false NOT NULL,
18+
backward_let_id BIGINT NOT NULL,
19+
PRIMARY KEY (id)
20+
);
21+
22+
-- Add index on backward_let_id for faster orphan lookups and JOIN operations
23+
CREATE INDEX IF NOT EXISTS idx_deposit_backup_backward_let_id
24+
ON sync.deposit_backup(backward_let_id);
25+
26+
-- +migrate Down
27+
28+
DROP INDEX IF EXISTS sync.idx_deposit_backup_backward_let_id;
29+
DROP TABLE IF EXISTS sync.deposit_backup;
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
package migrations_test
2+
3+
import (
4+
"database/sql"
5+
"testing"
6+
7+
"github.com/stretchr/testify/assert"
8+
)
9+
10+
type migrationTest0022 struct{}
11+
12+
func (m migrationTest0022) InsertData(db *sql.DB) error {
13+
block := "INSERT INTO sync.block (id, block_num, block_hash, network_id) VALUES(1, 1, decode('C9B5033799ADF3739383A0489EFBE8A0D4D5E4478778A4F4304562FD51AE4C07','hex'), 0);"
14+
if _, err := db.Exec(block); err != nil {
15+
return err
16+
}
17+
return nil
18+
}
19+
20+
func (m migrationTest0022) RunAssertsAfterMigrationUp(t *testing.T, db *sql.DB) {
21+
insertDepositBackup := `INSERT INTO sync.deposit_backup(leaf_type, network_id, orig_net, orig_addr, amount, dest_net, dest_addr, block_id, deposit_cnt, tx_hash, metadata, ready_for_claim, backward_let_id)
22+
VALUES(0, 1, 0, decode('A9B5033799ADF3739383A0489EFBE8A0D4D5E4478778A4F4304562FD51AE4C08','hex'), '1000000000000000000', 1, decode('B9B5033799ADF3739383A0489EFBE8A0D4D5E4478778A4F4304562FD51AE4C09','hex'), 1, 5, decode('C9B5033799ADF3739383A0489EFBE8A0D4D5E4478778A4F4304562FD51AE4C10','hex'), decode('D9B5033799ADF3739383A0489EFBE8A0D4D5E4478778A4F4304562FD51AE4C11','hex'), true, 1);`
23+
_, err := db.Exec(insertDepositBackup)
24+
assert.NoError(t, err)
25+
26+
var count uint32
27+
selectCount := "SELECT count(*) FROM sync.deposit_backup"
28+
err = db.QueryRow(selectCount).Scan(&count)
29+
assert.NoError(t, err)
30+
assert.Equal(t, uint32(1), count)
31+
32+
// Verify the data was inserted correctly
33+
var leafType, networkID, origNet, destNet, depositCnt, backwardLetID int64
34+
var amount string
35+
var readyForClaim bool
36+
selectData := "SELECT leaf_type, network_id, orig_net, amount, dest_net, deposit_cnt, ready_for_claim, backward_let_id FROM sync.deposit_backup WHERE id = 1"
37+
err = db.QueryRow(selectData).Scan(&leafType, &networkID, &origNet, &amount, &destNet, &depositCnt, &readyForClaim, &backwardLetID)
38+
assert.NoError(t, err)
39+
assert.Equal(t, int64(0), leafType)
40+
assert.Equal(t, int64(1), networkID)
41+
assert.Equal(t, int64(0), origNet)
42+
assert.Equal(t, "1000000000000000000", amount)
43+
assert.Equal(t, int64(1), destNet)
44+
assert.Equal(t, int64(5), depositCnt)
45+
assert.Equal(t, true, readyForClaim)
46+
assert.Equal(t, int64(1), backwardLetID)
47+
48+
// Verify the index was created
49+
var indexExists bool
50+
checkIndex := `SELECT EXISTS (
51+
SELECT 1 FROM pg_indexes
52+
WHERE schemaname = 'sync'
53+
AND tablename = 'deposit_backup'
54+
AND indexname = 'idx_deposit_backup_backward_let_id'
55+
)`
56+
err = db.QueryRow(checkIndex).Scan(&indexExists)
57+
assert.NoError(t, err)
58+
assert.True(t, indexExists, "Index idx_deposit_backup_backward_let_id should exist")
59+
}
60+
61+
func (m migrationTest0022) RunAssertsAfterMigrationDown(t *testing.T, db *sql.DB) {
62+
// Verify the table was dropped
63+
var count uint32
64+
selectCount := "SELECT count(*) FROM sync.deposit_backup"
65+
err := db.QueryRow(selectCount).Scan(&count)
66+
assert.Error(t, err)
67+
68+
// Verify the index was also removed
69+
var indexExists bool
70+
checkIndex := `SELECT EXISTS (
71+
SELECT 1 FROM pg_indexes
72+
WHERE schemaname = 'sync'
73+
AND tablename = 'deposit_backup'
74+
AND indexname = 'idx_deposit_backup_backward_let_id'
75+
)`
76+
err = db.QueryRow(checkIndex).Scan(&indexExists)
77+
assert.NoError(t, err)
78+
assert.False(t, indexExists, "Index idx_deposit_backup_backward_let_id should not exist after migration down")
79+
}
80+
81+
func TestMigration0022(t *testing.T) {
82+
runMigrationTest(t, 22, migrationTest0022{})
83+
}

db/pgstorage/pgstorage.go

Lines changed: 65 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -922,22 +922,47 @@ func (p *PostgresStorage) GetSyncStatus(ctx context.Context, dbTx interface{}) (
922922
}
923923

924924
// ResetDeposits resets the state to a depositCount.
925-
func (p *PostgresStorage) ResetDeposits(ctx context.Context, depositCount uint32, networkID uint32, dbTx interface{}) error {
925+
func (p *PostgresStorage) ResetDeposits(ctx context.Context, depositCount uint32, networkID uint32, backwardLETID uint64, dbTx interface{}) error {
926926
if networkID == 0 {
927927
return errors.New("cannot reset L1 deposits")
928928
}
929-
const resetSQL = "DELETE FROM sync.deposit WHERE deposit_cnt >= $1 AND network_id = $2"
929+
// Store the deposits in other table and remove from deposit table
930+
// Using CTE to ensure deposits are returned in ascending order by deposit_cnt
931+
const resetSQL = `
932+
WITH deleted_deposits AS (
933+
DELETE FROM sync.deposit
934+
WHERE deposit_cnt >= $1 AND network_id = $2
935+
RETURNING id, leaf_type, orig_net, orig_addr, amount, dest_net, dest_addr, deposit_cnt, block_id, network_id, tx_hash, metadata, ready_for_claim
936+
)
937+
SELECT id, leaf_type, orig_net, orig_addr, amount, dest_net, dest_addr, deposit_cnt, block_id, network_id, tx_hash, metadata, ready_for_claim
938+
FROM deleted_deposits
939+
ORDER BY deposit_cnt ASC`
930940
e := p.getExecQuerier(dbTx)
931-
_, err := e.Exec(ctx, resetSQL, depositCount, networkID)
932-
return err
941+
rows, err := e.Query(ctx, resetSQL, depositCount, networkID)
942+
if err != nil {
943+
return err
944+
}
945+
defer rows.Close()
946+
deposits, err := parseDeposits(rows, false)
947+
if err != nil {
948+
return err
949+
}
950+
const addDepositSQL = "INSERT INTO sync.deposit_backup (leaf_type, network_id, orig_net, orig_addr, amount, dest_net, dest_addr, block_id, deposit_cnt, tx_hash, metadata, ready_for_claim, backward_let_id) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)"
951+
for _, deposit := range deposits {
952+
_, err = e.Exec(ctx, addDepositSQL, deposit.LeafType, deposit.NetworkID, deposit.OriginalNetwork, deposit.OriginalAddress, deposit.Amount.String(), deposit.DestinationNetwork, deposit.DestinationAddress, deposit.BlockID, deposit.DepositCount, deposit.TxHash, deposit.Metadata, deposit.ReadyForClaim, backwardLETID)
953+
if err != nil {
954+
return err
955+
}
956+
}
957+
return nil
933958
}
934959

935960
// AddBackwardLET adds a new BackwardLET event to the db.
936-
func (p *PostgresStorage) AddBackwardLET(ctx context.Context, backwardLET *etherman.BackwardLET, dbTx interface{}) error {
937-
const addExitRootSQL = "INSERT INTO sync.backward_let(block_id, previous_deposit_cnt, previous_root, new_deposit_cnt, new_root) VALUES ($1, $2, $3, $4, $5)"
938-
e := p.getExecQuerier(dbTx)
939-
_, err := e.Exec(ctx, addExitRootSQL, backwardLET.BlockID, backwardLET.PreviousDepositCount, backwardLET.PreviousRoot, backwardLET.NewDepositCount, backwardLET.NewRoot)
940-
return err
961+
func (p *PostgresStorage) AddBackwardLET(ctx context.Context, backwardLET *etherman.BackwardLET, dbTx interface{}) (uint64, error) {
962+
const addExitRootSQL = "INSERT INTO sync.backward_let(block_id, previous_deposit_cnt, previous_root, new_deposit_cnt, new_root) VALUES ($1, $2, $3, $4, $5) RETURNING id"
963+
var id uint64
964+
err := p.getExecQuerier(dbTx).QueryRow(ctx, addExitRootSQL, backwardLET.BlockID, backwardLET.PreviousDepositCount, backwardLET.PreviousRoot, backwardLET.NewDepositCount, backwardLET.NewRoot).Scan(&id)
965+
return id, err
941966
}
942967

943968
// AddForwardLET adds a new ForwardLET event to the db.
@@ -983,6 +1008,37 @@ func (p *PostgresStorage) GetLastComputedRoot(ctx context.Context, networkID uin
9831008
return root, nil
9841009
}
9851010

1011+
// GetAndDeleteOrphanDepositBackups finds and deletes deposit_backup records whose backward_let_id
1012+
// does not exist in the backward_let table, returning the deleted deposits.
1013+
// This uses a single optimized DELETE query with RETURNING clause and LEFT JOIN for better performance.
1014+
func (p *PostgresStorage) GetAndDeleteOrphanDepositBackups(ctx context.Context, dbTx interface{}) ([]*etherman.Deposit, error) {
1015+
// Single query that deletes orphan deposits and returns them in one operation
1016+
// Using LEFT JOIN with NULL check is often faster than NOT EXISTS for this use case
1017+
const deleteAndReturnOrphanDepositsSQL = `
1018+
DELETE FROM sync.deposit_backup AS db
1019+
USING (
1020+
SELECT db2.id
1021+
FROM sync.deposit_backup AS db2
1022+
LEFT JOIN sync.backward_let AS bl ON bl.id = db2.backward_let_id
1023+
WHERE bl.id IS NULL
1024+
ORDER BY db2.deposit_cnt ASC
1025+
) AS orphans
1026+
WHERE db.id = orphans.id
1027+
RETURNING db.id, db.leaf_type, db.orig_net, db.orig_addr, db.amount, db.dest_net,
1028+
db.dest_addr, db.deposit_cnt, db.block_id, db.network_id, db.tx_hash,
1029+
db.metadata, db.ready_for_claim`
1030+
1031+
e := p.getExecQuerier(dbTx)
1032+
rows, err := e.Query(ctx, deleteAndReturnOrphanDepositsSQL)
1033+
if err != nil {
1034+
return nil, err
1035+
}
1036+
defer rows.Close()
1037+
1038+
// Parse and return the deleted deposits
1039+
return parseDeposits(rows, false)
1040+
}
1041+
9861042
// UpdateDepositsStatusForTesting updates the ready_for_claim status of all deposits for testing.
9871043
func (p *PostgresStorage) UpdateDepositsStatusForTesting(ctx context.Context, dbTx interface{}) error {
9881044
const updateDepositsStatusSQL = "UPDATE sync.deposit SET ready_for_claim = true;"

0 commit comments

Comments
 (0)