Skip to content

Commit 22bf88e

Browse files
committed
graph/db+sqldb: make policy migration idempotent
Finally, we make the channel-policy part of the SQL migration idempotent by adding a migration-only policy insert query which will not error out if the policy already exists and does not have a timestamp that is newer than the existing records timestamp. To keep the commit simple, a insertChanEdgePolicyMig function is added which is basically identical to the updateChanEdgePolicy function except for the fact that it uses the newly added query. In the next commit, it will be simplified even more.
1 parent 8736fca commit 22bf88e

File tree

6 files changed

+221
-23
lines changed

6 files changed

+221
-23
lines changed

graph/db/sql_migration.go

Lines changed: 100 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -454,7 +454,7 @@ func migrateChannelsAndPolicies(ctx context.Context, cfg *SQLStoreConfig,
454454

455455
policyCount++
456456

457-
_, _, _, err := updateChanEdgePolicy(ctx, sqlDB, policy)
457+
_, _, _, err := insertChanEdgePolicyMig(ctx, sqlDB, policy)
458458
if err != nil {
459459
return fmt.Errorf("could not migrate channel "+
460460
"policy %d: %w", policy.ChannelID, err)
@@ -1560,3 +1560,102 @@ func insertChannelMig(ctx context.Context, db SQLQueries,
15601560
node2ID: node2DBID,
15611561
}, nil
15621562
}
1563+
1564+
// insertChanEdgePolicyMig inserts the channel policy info we have stored for
1565+
// a channel we already know of. This is used during the SQL migration
1566+
// process to insert channel policies.
1567+
//
1568+
// TODO(elle): update this function to be more performant in the migration
1569+
// setting. For the sake of keeping the commit that introduced this function
1570+
// simple, this is for now mostly the same as updateChanEdgePolicy.
1571+
func insertChanEdgePolicyMig(ctx context.Context, tx SQLQueries,
1572+
edge *models.ChannelEdgePolicy) (route.Vertex, route.Vertex, bool,
1573+
error) {
1574+
1575+
var (
1576+
node1Pub, node2Pub route.Vertex
1577+
isNode1 bool
1578+
chanIDB = channelIDToBytes(edge.ChannelID)
1579+
)
1580+
1581+
// Check that this edge policy refers to a channel that we already
1582+
// know of. We do this explicitly so that we can return the appropriate
1583+
// ErrEdgeNotFound error if the channel doesn't exist, rather than
1584+
// abort the transaction which would abort the entire batch.
1585+
dbChan, err := tx.GetChannelAndNodesBySCID(
1586+
ctx, sqlc.GetChannelAndNodesBySCIDParams{
1587+
Scid: chanIDB,
1588+
Version: int16(ProtocolV1),
1589+
},
1590+
)
1591+
if errors.Is(err, sql.ErrNoRows) {
1592+
return node1Pub, node2Pub, false, ErrEdgeNotFound
1593+
} else if err != nil {
1594+
return node1Pub, node2Pub, false, fmt.Errorf("unable to "+
1595+
"fetch channel(%v): %w", edge.ChannelID, err)
1596+
}
1597+
1598+
copy(node1Pub[:], dbChan.Node1PubKey)
1599+
copy(node2Pub[:], dbChan.Node2PubKey)
1600+
1601+
// Figure out which node this edge is from.
1602+
isNode1 = edge.ChannelFlags&lnwire.ChanUpdateDirection == 0
1603+
nodeID := dbChan.NodeID1
1604+
if !isNode1 {
1605+
nodeID = dbChan.NodeID2
1606+
}
1607+
1608+
var (
1609+
inboundBase sql.NullInt64
1610+
inboundRate sql.NullInt64
1611+
)
1612+
edge.InboundFee.WhenSome(func(fee lnwire.Fee) {
1613+
inboundRate = sqldb.SQLInt64(fee.FeeRate)
1614+
inboundBase = sqldb.SQLInt64(fee.BaseFee)
1615+
})
1616+
1617+
id, err := tx.InsertEdgePolicyMig(ctx, sqlc.InsertEdgePolicyMigParams{
1618+
Version: int16(ProtocolV1),
1619+
ChannelID: dbChan.ID,
1620+
NodeID: nodeID,
1621+
Timelock: int32(edge.TimeLockDelta),
1622+
FeePpm: int64(edge.FeeProportionalMillionths),
1623+
BaseFeeMsat: int64(edge.FeeBaseMSat),
1624+
MinHtlcMsat: int64(edge.MinHTLC),
1625+
LastUpdate: sqldb.SQLInt64(edge.LastUpdate.Unix()),
1626+
Disabled: sql.NullBool{
1627+
Valid: true,
1628+
Bool: edge.IsDisabled(),
1629+
},
1630+
MaxHtlcMsat: sql.NullInt64{
1631+
Valid: edge.MessageFlags.HasMaxHtlc(),
1632+
Int64: int64(edge.MaxHTLC),
1633+
},
1634+
MessageFlags: sqldb.SQLInt16(edge.MessageFlags),
1635+
ChannelFlags: sqldb.SQLInt16(edge.ChannelFlags),
1636+
InboundBaseFeeMsat: inboundBase,
1637+
InboundFeeRateMilliMsat: inboundRate,
1638+
Signature: edge.SigBytes,
1639+
})
1640+
if err != nil {
1641+
return node1Pub, node2Pub, isNode1,
1642+
fmt.Errorf("unable to upsert edge policy: %w", err)
1643+
}
1644+
1645+
// Convert the flat extra opaque data into a map of TLV types to
1646+
// values.
1647+
extra, err := marshalExtraOpaqueData(edge.ExtraOpaqueData)
1648+
if err != nil {
1649+
return node1Pub, node2Pub, false, fmt.Errorf("unable to "+
1650+
"marshal extra opaque data: %w", err)
1651+
}
1652+
1653+
// Update the channel policy's extra signed fields.
1654+
err = upsertChanPolicyExtraSignedFields(ctx, tx, id, extra)
1655+
if err != nil {
1656+
return node1Pub, node2Pub, false, fmt.Errorf("inserting chan "+
1657+
"policy extra TLVs: %w", err)
1658+
}
1659+
1660+
return node1Pub, node2Pub, isNode1, nil
1661+
}

graph/db/sql_migration_test.go

Lines changed: 4 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -105,15 +105,6 @@ func TestMigrateGraphToSQL(t *testing.T) {
105105
write func(t *testing.T, db *KVStore, object any)
106106
objects []any
107107
expGraphStats graphStats
108-
109-
// expNotRetrySafety is true if we expect an error to occur for
110-
// the test if the migration is run twice. In other-words, if
111-
// the specific case in question is currently not idempotent.
112-
//
113-
// NOTE: we want _all_ the cases here to be idempotent, so this
114-
// is a temporary field which will be removed once we have
115-
// properly made the migration retry-safe.
116-
expNotRetrySafety bool
117108
}{
118109
{
119110
name: "empty",
@@ -297,7 +288,6 @@ func TestMigrateGraphToSQL(t *testing.T) {
297288
numChannels: 3,
298289
numPolicies: 3,
299290
},
300-
expNotRetrySafety: true,
301291
},
302292
{
303293
name: "prune log",
@@ -409,19 +399,11 @@ func TestMigrateGraphToSQL(t *testing.T) {
409399
// Validate that the two databases are now in sync.
410400
assertInSync(t, kvDB, sql, test.expGraphStats)
411401

412-
// NOTE: for now, not all the cases in the test are
413-
// retry safe! The aim is to completely remove this
414-
// field once we have made the migration retry-safe.
402+
// The migration should be retry-safe, so running it
403+
// again should not change the state of the databases.
415404
err = MigrateGraphToSQL(ctx, sql.cfg, kvDB.db, sql.db)
416-
if !test.expNotRetrySafety {
417-
// The migration should be retry-safe, so
418-
// running it again should not change the state
419-
// of the databases.
420-
require.NoError(t, err)
421-
assertInSync(t, kvDB, sql, test.expGraphStats)
422-
} else {
423-
require.Error(t, err)
424-
}
405+
require.NoError(t, err)
406+
assertInSync(t, kvDB, sql, test.expGraphStats)
425407
})
426408
}
427409
}

graph/db/sql_store.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,7 @@ type SQLQueries interface {
164164
*/
165165
InsertNodeMig(ctx context.Context, arg sqlc.InsertNodeMigParams) (int64, error)
166166
InsertChannelMig(ctx context.Context, arg sqlc.InsertChannelMigParams) (int64, error)
167+
InsertEdgePolicyMig(ctx context.Context, arg sqlc.InsertEdgePolicyMigParams) (int64, error)
167168
}
168169

169170
// BatchedSQLQueries is a version of SQLQueries that's capable of batched

sqldb/sqlc/graph.sql.go

Lines changed: 76 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

sqldb/sqlc/querier.go

Lines changed: 6 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

sqldb/sqlc/queries/graph.sql

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1064,3 +1064,37 @@ INSERT INTO graph_channels (
10641064
bitcoin_1_signature = EXCLUDED.bitcoin_1_signature,
10651065
bitcoin_2_signature = EXCLUDED.bitcoin_2_signature
10661066
RETURNING id;
1067+
1068+
-- NOTE: This query is only meant to be used by the graph SQL migration since
1069+
-- for that migration, in order to be retry-safe, we don't want to error out if
1070+
-- we re-insert the same policy (which would error if the normal
1071+
-- UpsertEdgePolicy query is used because of the constraint in that query that
1072+
-- requires a policy update to have a newer last_update than the existing one).
1073+
-- name: InsertEdgePolicyMig :one
1074+
INSERT INTO graph_channel_policies (
1075+
version, channel_id, node_id, timelock, fee_ppm,
1076+
base_fee_msat, min_htlc_msat, last_update, disabled,
1077+
max_htlc_msat, inbound_base_fee_msat,
1078+
inbound_fee_rate_milli_msat, message_flags, channel_flags,
1079+
signature
1080+
) VALUES (
1081+
$1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15
1082+
)
1083+
ON CONFLICT (channel_id, node_id, version)
1084+
-- If a conflict occurs, we have already migrated this policy. However, we
1085+
-- still need to do an "UPDATE SET" here instead of "DO NOTHING" because
1086+
-- otherwise, the "RETURNING id" part does not work.
1087+
DO UPDATE SET
1088+
timelock = EXCLUDED.timelock,
1089+
fee_ppm = EXCLUDED.fee_ppm,
1090+
base_fee_msat = EXCLUDED.base_fee_msat,
1091+
min_htlc_msat = EXCLUDED.min_htlc_msat,
1092+
last_update = EXCLUDED.last_update,
1093+
disabled = EXCLUDED.disabled,
1094+
max_htlc_msat = EXCLUDED.max_htlc_msat,
1095+
inbound_base_fee_msat = EXCLUDED.inbound_base_fee_msat,
1096+
inbound_fee_rate_milli_msat = EXCLUDED.inbound_fee_rate_milli_msat,
1097+
message_flags = EXCLUDED.message_flags,
1098+
channel_flags = EXCLUDED.channel_flags,
1099+
signature = EXCLUDED.signature
1100+
RETURNING id;

0 commit comments

Comments
 (0)