Skip to content

Commit 8736fca

Browse files
committed
graph/db+sqldb: make channel SQL mig retry-safe
In this commit, we make the channel part of the graph SQL migration idempotent (retry-safe!). We do this by adding a migration-only channel insert query that will not error out if a the query is called and a chanenl with the given scid&version already exists. We also ensure that errors are not thrown if existing channel features & extra types are re-added.
1 parent a291d6f commit 8736fca

File tree

6 files changed

+267
-61
lines changed

6 files changed

+267
-61
lines changed

graph/db/sql_migration.go

Lines changed: 111 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -524,7 +524,7 @@ func migrateChannelsAndPolicies(ctx context.Context, cfg *SQLStoreConfig,
524524
chunk++
525525

526526
// Migrate the channel info along with its policies.
527-
dbChanInfo, err := insertChannel(ctx, sqlDB, channel)
527+
dbChanInfo, err := insertChannelMig(ctx, sqlDB, channel)
528528
if err != nil {
529529
return fmt.Errorf("could not insert record for "+
530530
"channel %d in SQL store: %w", scid, err)
@@ -577,8 +577,13 @@ func migrateChannelsAndPolicies(ctx context.Context, cfg *SQLStoreConfig,
577577

578578
return nil
579579
}, func() {
580-
// No reset is needed since if a retry occurs, the entire
581-
// migration will be retried from the start.
580+
channelCount = 0
581+
policyCount = 0
582+
chunk = 0
583+
skippedChanCount = 0
584+
skippedPolicyCount = 0
585+
t0 = time.Now()
586+
batch = make(map[int64]*migChanInfo, cfg.QueryCfg.MaxBatchSize)
582587
})
583588
if err != nil {
584589
return fmt.Errorf("could not migrate channels and policies: %w",
@@ -1452,3 +1457,106 @@ func insertNodeSQLMig(ctx context.Context, db SQLQueries,
14521457

14531458
return nodeID, nil
14541459
}
1460+
1461+
// dbChanInfo holds the DB level IDs of a channel and the nodes involved in the
1462+
// channel.
1463+
type dbChanInfo struct {
1464+
channelID int64
1465+
node1ID int64
1466+
node2ID int64
1467+
}
1468+
1469+
// insertChannelMig inserts a new channel record into the database during the
1470+
// graph SQL migration.
1471+
func insertChannelMig(ctx context.Context, db SQLQueries,
1472+
edge *models.ChannelEdgeInfo) (*dbChanInfo, error) {
1473+
1474+
// Make sure that at least a "shell" entry for each node is present in
1475+
// the nodes table.
1476+
//
1477+
// NOTE: we need this even during the SQL migration where nodes are
1478+
// migrated first because there are cases were some nodes may have
1479+
// been skipped due to invalid TLV data.
1480+
node1DBID, err := maybeCreateShellNode(ctx, db, edge.NodeKey1Bytes)
1481+
if err != nil {
1482+
return nil, fmt.Errorf("unable to create shell node: %w", err)
1483+
}
1484+
1485+
node2DBID, err := maybeCreateShellNode(ctx, db, edge.NodeKey2Bytes)
1486+
if err != nil {
1487+
return nil, fmt.Errorf("unable to create shell node: %w", err)
1488+
}
1489+
1490+
var capacity sql.NullInt64
1491+
if edge.Capacity != 0 {
1492+
capacity = sqldb.SQLInt64(int64(edge.Capacity))
1493+
}
1494+
1495+
createParams := sqlc.InsertChannelMigParams{
1496+
Version: int16(ProtocolV1),
1497+
Scid: channelIDToBytes(edge.ChannelID),
1498+
NodeID1: node1DBID,
1499+
NodeID2: node2DBID,
1500+
Outpoint: edge.ChannelPoint.String(),
1501+
Capacity: capacity,
1502+
BitcoinKey1: edge.BitcoinKey1Bytes[:],
1503+
BitcoinKey2: edge.BitcoinKey2Bytes[:],
1504+
}
1505+
1506+
if edge.AuthProof != nil {
1507+
proof := edge.AuthProof
1508+
1509+
createParams.Node1Signature = proof.NodeSig1Bytes
1510+
createParams.Node2Signature = proof.NodeSig2Bytes
1511+
createParams.Bitcoin1Signature = proof.BitcoinSig1Bytes
1512+
createParams.Bitcoin2Signature = proof.BitcoinSig2Bytes
1513+
}
1514+
1515+
// Insert the new channel record.
1516+
dbChanID, err := db.InsertChannelMig(ctx, createParams)
1517+
if err != nil {
1518+
return nil, err
1519+
}
1520+
1521+
// Insert any channel features.
1522+
for feature := range edge.Features.Features() {
1523+
err = db.InsertChannelFeature(
1524+
ctx, sqlc.InsertChannelFeatureParams{
1525+
ChannelID: dbChanID,
1526+
FeatureBit: int32(feature),
1527+
},
1528+
)
1529+
if err != nil {
1530+
return nil, fmt.Errorf("unable to insert channel(%d) "+
1531+
"feature(%v): %w", dbChanID, feature, err)
1532+
}
1533+
}
1534+
1535+
// Finally, insert any extra TLV fields in the channel announcement.
1536+
extra, err := marshalExtraOpaqueData(edge.ExtraOpaqueData)
1537+
if err != nil {
1538+
return nil, fmt.Errorf("unable to marshal extra opaque "+
1539+
"data: %w", err)
1540+
}
1541+
1542+
for tlvType, value := range extra {
1543+
err := db.UpsertChannelExtraType(
1544+
ctx, sqlc.UpsertChannelExtraTypeParams{
1545+
ChannelID: dbChanID,
1546+
Type: int64(tlvType),
1547+
Value: value,
1548+
},
1549+
)
1550+
if err != nil {
1551+
return nil, fmt.Errorf("unable to upsert "+
1552+
"channel(%d) extra signed field(%v): %w",
1553+
edge.ChannelID, tlvType, err)
1554+
}
1555+
}
1556+
1557+
return &dbChanInfo{
1558+
channelID: dbChanID,
1559+
node1ID: node1DBID,
1560+
node2ID: node2DBID,
1561+
}, nil
1562+
}

graph/db/sql_migration_test.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,6 @@ func TestMigrateGraphToSQL(t *testing.T) {
225225
numNodes: 4,
226226
numChannels: 3,
227227
},
228-
expNotRetrySafety: true,
229228
},
230229
{
231230
name: "channels and policies",

graph/db/sql_store.go

Lines changed: 16 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ type SQLQueries interface {
112112
GetSCIDByOutpoint(ctx context.Context, arg sqlc.GetSCIDByOutpointParams) ([]byte, error)
113113
DeleteChannels(ctx context.Context, ids []int64) error
114114

115-
CreateChannelExtraType(ctx context.Context, arg sqlc.CreateChannelExtraTypeParams) error
115+
UpsertChannelExtraType(ctx context.Context, arg sqlc.UpsertChannelExtraTypeParams) error
116116
GetChannelExtrasBatch(ctx context.Context, chanIds []int64) ([]sqlc.GraphChannelExtraType, error)
117117
InsertChannelFeature(ctx context.Context, arg sqlc.InsertChannelFeatureParams) error
118118
GetChannelFeaturesBatch(ctx context.Context, chanIds []int64) ([]sqlc.GraphChannelFeature, error)
@@ -163,6 +163,7 @@ type SQLQueries interface {
163163
structs.
164164
*/
165165
InsertNodeMig(ctx context.Context, arg sqlc.InsertNodeMigParams) (int64, error)
166+
InsertChannelMig(ctx context.Context, arg sqlc.InsertChannelMigParams) (int64, error)
166167
}
167168

168169
// BatchedSQLQueries is a version of SQLQueries that's capable of batched
@@ -627,9 +628,7 @@ func (s *SQLStore) AddChannelEdge(ctx context.Context,
627628
err)
628629
}
629630

630-
_, err = insertChannel(ctx, tx, edge)
631-
632-
return err
631+
return insertChannel(ctx, tx, edge)
633632
},
634633
OnCommit: func(err error) error {
635634
switch {
@@ -3799,28 +3798,20 @@ func marshalExtraOpaqueData(data []byte) (map[uint64][]byte, error) {
37993798
return records, nil
38003799
}
38013800

3802-
// dbChanInfo holds the DB level IDs of a channel and the nodes involved in the
3803-
// channel.
3804-
type dbChanInfo struct {
3805-
channelID int64
3806-
node1ID int64
3807-
node2ID int64
3808-
}
3809-
38103801
// insertChannel inserts a new channel record into the database.
38113802
func insertChannel(ctx context.Context, db SQLQueries,
3812-
edge *models.ChannelEdgeInfo) (*dbChanInfo, error) {
3803+
edge *models.ChannelEdgeInfo) error {
38133804

38143805
// Make sure that at least a "shell" entry for each node is present in
38153806
// the nodes table.
38163807
node1DBID, err := maybeCreateShellNode(ctx, db, edge.NodeKey1Bytes)
38173808
if err != nil {
3818-
return nil, fmt.Errorf("unable to create shell node: %w", err)
3809+
return fmt.Errorf("unable to create shell node: %w", err)
38193810
}
38203811

38213812
node2DBID, err := maybeCreateShellNode(ctx, db, edge.NodeKey2Bytes)
38223813
if err != nil {
3823-
return nil, fmt.Errorf("unable to create shell node: %w", err)
3814+
return fmt.Errorf("unable to create shell node: %w", err)
38243815
}
38253816

38263817
var capacity sql.NullInt64
@@ -3851,7 +3842,7 @@ func insertChannel(ctx context.Context, db SQLQueries,
38513842
// Insert the new channel record.
38523843
dbChanID, err := db.CreateChannel(ctx, createParams)
38533844
if err != nil {
3854-
return nil, err
3845+
return err
38553846
}
38563847

38573848
// Insert any channel features.
@@ -3863,38 +3854,34 @@ func insertChannel(ctx context.Context, db SQLQueries,
38633854
},
38643855
)
38653856
if err != nil {
3866-
return nil, fmt.Errorf("unable to insert channel(%d) "+
3857+
return fmt.Errorf("unable to insert channel(%d) "+
38673858
"feature(%v): %w", dbChanID, feature, err)
38683859
}
38693860
}
38703861

38713862
// Finally, insert any extra TLV fields in the channel announcement.
38723863
extra, err := marshalExtraOpaqueData(edge.ExtraOpaqueData)
38733864
if err != nil {
3874-
return nil, fmt.Errorf("unable to marshal extra opaque "+
3875-
"data: %w", err)
3865+
return fmt.Errorf("unable to marshal extra opaque data: %w",
3866+
err)
38763867
}
38773868

38783869
for tlvType, value := range extra {
3879-
err := db.CreateChannelExtraType(
3880-
ctx, sqlc.CreateChannelExtraTypeParams{
3870+
err := db.UpsertChannelExtraType(
3871+
ctx, sqlc.UpsertChannelExtraTypeParams{
38813872
ChannelID: dbChanID,
38823873
Type: int64(tlvType),
38833874
Value: value,
38843875
},
38853876
)
38863877
if err != nil {
3887-
return nil, fmt.Errorf("unable to upsert "+
3888-
"channel(%d) extra signed field(%v): %w",
3889-
edge.ChannelID, tlvType, err)
3878+
return fmt.Errorf("unable to upsert channel(%d) "+
3879+
"extra signed field(%v): %w", edge.ChannelID,
3880+
tlvType, err)
38903881
}
38913882
}
38923883

3893-
return &dbChanInfo{
3894-
channelID: dbChanID,
3895-
node1ID: node1DBID,
3896-
node2ID: node2DBID,
3897-
}, nil
3884+
return nil
38983885
}
38993886

39003887
// maybeCreateShellNode checks if a shell node entry exists for the

0 commit comments

Comments
 (0)