Skip to content

Commit 28ede8c

Browse files
joanestebanrclaude
andcommitted
feat: migrate bridgesync claim data into claimsync DB on startup
Introduce `runImportFromBridgeSyncerIfNeeded` which copies block, claim, set_claim, unset_claim and key_value rows from an existing bridgesync SQLite database into the claimsync database before the syncers start. The migration is triggered at the call site in `start()`: - for L1 when BRIDGE or L1BRIDGESYNC are active - for L2 when any of BRIDGE, L2BRIDGESYNC, L2CLAIMSYNC, AGGSENDER, AGGSENDERVALIDATOR or AGGCHAINPROOFGEN are active It delegates to `claimsyncstorage.ImportDataFromBridgesyncer` and `claimsyncstorage.ImportKeyValueFromBridgesyncer`, both of which are idempotent and no-ops when the source DB has no relevant data. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 7e47af8 commit 28ede8c

File tree

3 files changed

+822
-0
lines changed

3 files changed

+822
-0
lines changed
Lines changed: 395 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,395 @@
1+
package storage
2+
3+
import (
4+
"context"
5+
"database/sql"
6+
"errors"
7+
"fmt"
8+
"strings"
9+
10+
"github.com/agglayer/aggkit/claimsync/storage/migrations"
11+
aggkitcommon "github.com/agglayer/aggkit/common"
12+
"github.com/agglayer/aggkit/db"
13+
"github.com/agglayer/aggkit/log"
14+
)
15+
16+
// requiredBridgeTables are the bridgesync tables that must all exist for the import to proceed.
17+
var requiredBridgeTables = []string{"block", "claim", "set_claim", "unset_claim"}
18+
19+
// requiredBridgeMigration is the ID of the last bridgesync migration that modifies the
20+
// schema of any of the tables listed in requiredBridgeTables.
21+
// The bridge DB must have applied at least this migration before we can safely import.
22+
// - bridgesync0012 – ALTER TABLE claim ADD COLUMN type
23+
const requiredBridgeMigration = "bridgesync0012"
24+
25+
// ImportDataFromBridgesyncer copies block, claim, set_claim and unset_claim data from a
26+
// bridgesync SQLite database (bridgeDBFilename) into the claimsync SQLite database
27+
// (claimDBFilename), creating and migrating it if it does not yet exist.
28+
//
29+
// The function is a no-op when the required source tables are absent in the bridge DB or
30+
// when none of claim/set_claim/unset_claim contain any rows. In that case the claimDB is
31+
// not created at all.
32+
// The import is idempotent: rows that already exist in the destination are silently
33+
// skipped (INSERT OR IGNORE).
34+
//
35+
// Column-level differences between schema versions are handled automatically:
36+
// - block.hash – present since bridgesync migration 0003; defaults to ''.
37+
// - claim.tx_hash – present since bridgesync migration 0002; defaults to ''.
38+
// - claim.block_timestamp – present since bridgesync migration 0002; defaults to 0.
39+
// - claim.type – present since bridgesync migration 0012; defaults to ''.
40+
func ImportDataFromBridgesyncer(ctx context.Context, logger aggkitcommon.Logger, bridgeDBFilename string, claimDBFilename string) error {
41+
if logger == nil {
42+
logger = log.WithFields("module", "ImportDataFromBridgesyncer")
43+
}
44+
45+
// Phase 1 – inspect the bridge DB without touching the claim DB.
46+
hasData, err := bridgeHasClaimData(ctx, bridgeDBFilename)
47+
if err != nil {
48+
return fmt.Errorf("ImportDataFromBridgesyncer: failed to inspect bridge DB: %w", err)
49+
}
50+
if !hasData {
51+
logger.Infof("no claim data found in bridge DB – skipping import")
52+
return nil
53+
}
54+
55+
// Phase 2 – open / create the claim DB and run migrations.
56+
claimDB, err := db.NewSQLiteDB(claimDBFilename)
57+
if err != nil {
58+
return fmt.Errorf("ImportDataFromBridgesyncer: failed to open claim DB: %w", err)
59+
}
60+
defer claimDB.Close()
61+
62+
if err := migrations.RunMigrations(logger, claimDB); err != nil {
63+
return fmt.Errorf("ImportDataFromBridgesyncer: failed to run claim DB migrations: %w", err)
64+
}
65+
66+
// Use a single connection so that ATTACH and the subsequent transaction share the
67+
// same SQLite connection (ATTACH is per-connection in SQLite).
68+
conn, err := claimDB.Conn(ctx)
69+
if err != nil {
70+
return fmt.Errorf("ImportDataFromBridgesyncer: failed to acquire DB connection: %w", err)
71+
}
72+
defer conn.Close()
73+
74+
// ATTACH the bridge DB so we can SELECT from it in the same query.
75+
attachSQL := fmt.Sprintf(`ATTACH DATABASE 'file:%s' AS bridge`, bridgeDBFilename)
76+
if _, err := conn.ExecContext(ctx, attachSQL); err != nil {
77+
return fmt.Errorf("ImportDataFromBridgesyncer: failed to attach bridge DB: %w", err)
78+
}
79+
defer conn.ExecContext(ctx, `DETACH DATABASE bridge`) //nolint:errcheck
80+
81+
hasBlockHash, err := bridgeColumnExists(ctx, conn, "block", "hash")
82+
if err != nil {
83+
return err
84+
}
85+
hasClaimTxHash, err := bridgeColumnExists(ctx, conn, "claim", "tx_hash")
86+
if err != nil {
87+
return err
88+
}
89+
hasClaimBlockTimestamp, err := bridgeColumnExists(ctx, conn, "claim", "block_timestamp")
90+
if err != nil {
91+
return err
92+
}
93+
hasClaimType, err := bridgeColumnExists(ctx, conn, "claim", "type")
94+
if err != nil {
95+
return err
96+
}
97+
98+
tx, err := conn.BeginTx(ctx, nil)
99+
if err != nil {
100+
return fmt.Errorf("ImportDataFromBridgesyncer: failed to begin transaction: %w", err)
101+
}
102+
defer tx.Rollback() //nolint:errcheck
103+
104+
blocksImported, err := importBlocks(tx, hasBlockHash)
105+
if err != nil {
106+
return err
107+
}
108+
claimsImported, err := importClaims(tx, hasClaimTxHash, hasClaimBlockTimestamp, hasClaimType)
109+
if err != nil {
110+
return err
111+
}
112+
unsetClaimsImported, err := importUnsetClaims(tx)
113+
if err != nil {
114+
return err
115+
}
116+
setClaimsImported, err := importSetClaims(tx)
117+
if err != nil {
118+
return err
119+
}
120+
121+
if err := tx.Commit(); err != nil {
122+
return fmt.Errorf("ImportDataFromBridgesyncer: failed to commit transaction: %w", err)
123+
}
124+
125+
logger.Infof("import from bridgesyncer complete: blocks=%d claims=%d set_claims=%d unset_claims=%d",
126+
blocksImported, claimsImported, setClaimsImported, unsetClaimsImported)
127+
return nil
128+
}
129+
130+
// bridgeHasClaimData opens bridgeDBFilename directly, checks that all required tables
131+
// exist, and returns true if any of claim/set_claim/unset_claim contain at least one row.
132+
func bridgeHasClaimData(ctx context.Context, bridgeDBFilename string) (bool, error) {
133+
bdb, err := db.NewSQLiteDB(bridgeDBFilename)
134+
if err != nil {
135+
return false, fmt.Errorf("bridgeHasClaimData: failed to open bridge DB: %w", err)
136+
}
137+
defer bdb.Close()
138+
139+
conn, err := bdb.Conn(ctx)
140+
if err != nil {
141+
return false, fmt.Errorf("bridgeHasClaimData: failed to acquire connection: %w", err)
142+
}
143+
defer conn.Close()
144+
145+
// Re-use the existing helper but against the main schema of the bridge DB.
146+
present, err := checkBridgeTablesOnConn(ctx, conn)
147+
if err != nil {
148+
return false, err
149+
}
150+
if !present {
151+
return false, nil
152+
}
153+
154+
if err := checkBridgeMigration(ctx, conn); err != nil {
155+
return false, err
156+
}
157+
158+
var count int
159+
err = conn.QueryRowContext(ctx, `
160+
SELECT COUNT(*) FROM (
161+
SELECT 1 FROM (SELECT 1 FROM claim LIMIT 1)
162+
UNION ALL
163+
SELECT 1 FROM (SELECT 1 FROM set_claim LIMIT 1)
164+
UNION ALL
165+
SELECT 1 FROM (SELECT 1 FROM unset_claim LIMIT 1)
166+
)`).Scan(&count)
167+
if err != nil {
168+
return false, fmt.Errorf("bridgeHasClaimData: failed to count claim rows: %w", err)
169+
}
170+
return count > 0, nil
171+
}
172+
173+
// checkBridgeMigration returns an error if requiredBridgeMigration has not been applied
174+
// to the bridge DB, which means its schema may be incomplete for a safe import.
175+
func checkBridgeMigration(ctx context.Context, conn *sql.Conn) error {
176+
var count int
177+
err := conn.QueryRowContext(ctx,
178+
`SELECT COUNT(*) FROM gorp_migrations WHERE id = $1`, requiredBridgeMigration).
179+
Scan(&count)
180+
if err != nil {
181+
return fmt.Errorf("checkBridgeMigration: failed to query gorp_migrations: %w", err)
182+
}
183+
if count == 0 {
184+
return fmt.Errorf("checkBridgeMigration: bridge DB has not applied required migration %q", requiredBridgeMigration)
185+
}
186+
return nil
187+
}
188+
189+
// checkBridgeTablesOnConn returns true only when all requiredBridgeTables exist in the
190+
// main schema of the given connection.
191+
func checkBridgeTablesOnConn(ctx context.Context, conn *sql.Conn) (bool, error) {
192+
placeholders := make([]string, len(requiredBridgeTables))
193+
args := make([]any, len(requiredBridgeTables))
194+
for i, name := range requiredBridgeTables {
195+
placeholders[i] = fmt.Sprintf("$%d", i+1)
196+
args[i] = name
197+
}
198+
query := fmt.Sprintf(
199+
`SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name IN (%s)`,
200+
strings.Join(placeholders, ","),
201+
)
202+
var count int
203+
if err := conn.QueryRowContext(ctx, query, args...).Scan(&count); err != nil {
204+
return false, fmt.Errorf("checkBridgeTablesOnConn: %w", err)
205+
}
206+
return count == len(requiredBridgeTables), nil
207+
}
208+
209+
// bridgeColumnExists reports whether the given column exists in the named table of the
210+
// attached 'bridge' schema by inspecting PRAGMA table_info.
211+
func bridgeColumnExists(ctx context.Context, conn *sql.Conn, tableName, columnName string) (bool, error) {
212+
rows, err := conn.QueryContext(ctx, fmt.Sprintf(`PRAGMA bridge.table_info(%s)`, tableName))
213+
if err != nil {
214+
return false, fmt.Errorf("bridgeColumnExists: PRAGMA table_info(%s): %w", tableName, err)
215+
}
216+
defer rows.Close()
217+
218+
for rows.Next() {
219+
var cid int
220+
var name, colType string
221+
var notNull int
222+
var dfltValue sql.NullString
223+
var pk int
224+
if err := rows.Scan(&cid, &name, &colType, &notNull, &dfltValue, &pk); err != nil {
225+
return false, fmt.Errorf("bridgeColumnExists: scan table_info(%s): %w", tableName, err)
226+
}
227+
if name == columnName {
228+
return true, nil
229+
}
230+
}
231+
return false, rows.Err()
232+
}
233+
234+
func importBlocks(tx *sql.Tx, hasHash bool) (int64, error) {
235+
hashExpr := "''"
236+
if hasHash {
237+
hashExpr = "COALESCE(hash, '')"
238+
}
239+
result, err := tx.Exec(fmt.Sprintf(
240+
`INSERT OR IGNORE INTO main.block (num, hash) SELECT num, %s FROM bridge.block`,
241+
hashExpr,
242+
))
243+
if err != nil {
244+
return 0, fmt.Errorf("ImportDataFromBridgesyncer: failed to import blocks: %w", err)
245+
}
246+
n, _ := result.RowsAffected()
247+
return n, nil
248+
}
249+
250+
func importClaims(tx *sql.Tx, hasTxHash, hasBlockTimestamp, hasType bool) (int64, error) {
251+
txHashExpr := "''"
252+
if hasTxHash {
253+
txHashExpr = "COALESCE(tx_hash, '')"
254+
}
255+
blockTimestampExpr := "0"
256+
if hasBlockTimestamp {
257+
blockTimestampExpr = "COALESCE(block_timestamp, 0)"
258+
}
259+
typeExpr := "''"
260+
if hasType {
261+
typeExpr = "COALESCE(type, '')"
262+
}
263+
result, err := tx.Exec(fmt.Sprintf(`
264+
INSERT OR IGNORE INTO main.claim (
265+
block_num, block_pos, tx_hash, global_index,
266+
origin_network, origin_address, destination_address, amount,
267+
proof_local_exit_root, proof_rollup_exit_root,
268+
mainnet_exit_root, rollup_exit_root, global_exit_root,
269+
destination_network, metadata, is_message, block_timestamp, type
270+
)
271+
SELECT
272+
block_num, block_pos, %s, global_index,
273+
origin_network, origin_address, destination_address, amount,
274+
proof_local_exit_root, proof_rollup_exit_root,
275+
mainnet_exit_root, rollup_exit_root, global_exit_root,
276+
destination_network, metadata, is_message, %s, %s
277+
FROM bridge.claim`, txHashExpr, blockTimestampExpr, typeExpr))
278+
if err != nil {
279+
return 0, fmt.Errorf("ImportDataFromBridgesyncer: failed to import claims: %w", err)
280+
}
281+
n, _ := result.RowsAffected()
282+
return n, nil
283+
}
284+
285+
func importUnsetClaims(tx *sql.Tx) (int64, error) {
286+
result, err := tx.Exec(`
287+
INSERT OR IGNORE INTO main.unset_claim
288+
(block_num, block_pos, tx_hash, global_index, unset_global_index_hash_chain, created_at)
289+
SELECT block_num, block_pos, tx_hash, global_index, unset_global_index_hash_chain, created_at
290+
FROM bridge.unset_claim`)
291+
if err != nil {
292+
return 0, fmt.Errorf("ImportDataFromBridgesyncer: failed to import unset_claims: %w", err)
293+
}
294+
n, _ := result.RowsAffected()
295+
return n, nil
296+
}
297+
298+
func importSetClaims(tx *sql.Tx) (int64, error) {
299+
result, err := tx.Exec(`
300+
INSERT OR IGNORE INTO main.set_claim
301+
(block_num, block_pos, tx_hash, global_index, created_at)
302+
SELECT block_num, block_pos, tx_hash, global_index, created_at
303+
FROM bridge.set_claim`)
304+
if err != nil {
305+
return 0, fmt.Errorf("ImportDataFromBridgesyncer: failed to import set_claims: %w", err)
306+
}
307+
n, _ := result.RowsAffected()
308+
return n, nil
309+
}
310+
311+
// ImportKeyValueFromBridgesyncer copies the single key_value row from the bridgesync
312+
// SQLite database (bridgeDBFilename) into the claimsync SQLite database (claimDBFilename),
313+
// replacing the original owner value with the provided owner parameter.
314+
//
315+
// The function is a no-op when the key_value table does not exist in the bridge DB or
316+
// contains no rows. In that case the claimDB is not created at all.
317+
// The import is idempotent: an existing row with the same (owner, key) is silently skipped
318+
// (INSERT OR IGNORE).
319+
func ImportKeyValueFromBridgesyncer(bridgeDBFilename string, claimDBFilename string, owner string) error {
320+
logger := log.WithFields("module", "ImportKeyValueFromBridgesyncer")
321+
ctx := context.Background()
322+
323+
// Phase 1 – read the single key_value row from the bridge DB without touching the claim DB.
324+
row, err := readBridgeKeyValueRow(ctx, bridgeDBFilename)
325+
if err != nil {
326+
return fmt.Errorf("ImportKeyValueFromBridgesyncer: failed to read bridge key_value: %w", err)
327+
}
328+
if row == nil {
329+
logger.Infof("no key_value data found in bridge DB – skipping import")
330+
return nil
331+
}
332+
333+
// Phase 2 – open / create the claim DB, run migrations and insert the row.
334+
claimDB, err := db.NewSQLiteDB(claimDBFilename)
335+
if err != nil {
336+
return fmt.Errorf("ImportKeyValueFromBridgesyncer: failed to open claim DB: %w", err)
337+
}
338+
defer claimDB.Close()
339+
340+
if err := migrations.RunMigrations(logger, claimDB); err != nil {
341+
return fmt.Errorf("ImportKeyValueFromBridgesyncer: failed to run claim DB migrations: %w", err)
342+
}
343+
344+
_, err = claimDB.ExecContext(ctx, `
345+
INSERT OR IGNORE INTO key_value (owner, key, value, updated_at)
346+
VALUES ($1, $2, $3, $4)`,
347+
owner, row.key, row.value, row.updatedAt)
348+
if err != nil {
349+
return fmt.Errorf("ImportKeyValueFromBridgesyncer: failed to insert key_value row: %w", err)
350+
}
351+
352+
logger.Infof("key_value import from bridgesyncer complete (owner=%s key=%s)", owner, row.key)
353+
return nil
354+
}
355+
356+
// keyValueRow holds the fields of a key_value table row.
357+
type keyValueRow struct {
358+
key string
359+
value string
360+
updatedAt int64
361+
}
362+
363+
// readBridgeKeyValueRow opens bridgeDBFilename and returns the single key_value row, or
364+
// nil if the table does not exist or is empty.
365+
func readBridgeKeyValueRow(ctx context.Context, bridgeDBFilename string) (*keyValueRow, error) {
366+
bdb, err := db.NewSQLiteDB(bridgeDBFilename)
367+
if err != nil {
368+
return nil, fmt.Errorf("readBridgeKeyValueRow: failed to open bridge DB: %w", err)
369+
}
370+
defer bdb.Close()
371+
372+
// Check that the key_value table exists.
373+
var tableCount int
374+
err = bdb.QueryRowContext(ctx,
375+
`SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='key_value'`).
376+
Scan(&tableCount)
377+
if err != nil {
378+
return nil, fmt.Errorf("readBridgeKeyValueRow: failed to check key_value table: %w", err)
379+
}
380+
if tableCount == 0 {
381+
return nil, nil
382+
}
383+
384+
row := &keyValueRow{}
385+
err = bdb.QueryRowContext(ctx, `SELECT key, value, updated_at FROM key_value LIMIT 1`).
386+
Scan(&row.key, &row.value, &row.updatedAt)
387+
if err != nil {
388+
if errors.Is(err, sql.ErrNoRows) {
389+
return nil, nil
390+
}
391+
return nil, fmt.Errorf("readBridgeKeyValueRow: failed to read row: %w", err)
392+
}
393+
return row, nil
394+
}
395+

0 commit comments

Comments
 (0)