Skip to content

Commit d5d151c

Browse files
authored
Merge pull request #10462 from ziggie1984/fix-prune-node-race
channeldb: fix race condition in link node pruning
2 parents 99d07fd + d9fb909 commit d5d151c

File tree

4 files changed

+498
-35
lines changed

4 files changed

+498
-35
lines changed

channeldb/db.go

Lines changed: 147 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1363,11 +1363,7 @@ func (c *ChannelStateDB) FetchClosedChannelForID(cid lnwire.ChannelID) (
13631363
// the pending funds in a channel that has been forcibly closed have been
13641364
// swept.
13651365
func (c *ChannelStateDB) MarkChanFullyClosed(chanPoint *wire.OutPoint) error {
1366-
var (
1367-
openChannels []*OpenChannel
1368-
pruneLinkNode *btcec.PublicKey
1369-
)
1370-
err := kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
1366+
return kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
13711367
var b bytes.Buffer
13721368
if err := graphdb.WriteOutpoint(&b, chanPoint); err != nil {
13731369
return err
@@ -1413,44 +1409,72 @@ func (c *ChannelStateDB) MarkChanFullyClosed(chanPoint *wire.OutPoint) error {
14131409
// other open channels with this peer. If we don't we'll
14141410
// garbage collect it to ensure we don't establish persistent
14151411
// connections to peers without open channels.
1416-
pruneLinkNode = chanSummary.RemotePub
1417-
openChannels, err = c.fetchOpenChannels(
1418-
tx, pruneLinkNode,
1419-
)
1412+
remotePub := chanSummary.RemotePub
1413+
openChannels, err := c.fetchOpenChannels(tx, remotePub)
14201414
if err != nil {
14211415
return fmt.Errorf("unable to fetch open channels for "+
14221416
"peer %x: %v",
1423-
pruneLinkNode.SerializeCompressed(), err)
1417+
remotePub.SerializeCompressed(), err)
14241418
}
14251419

1426-
return nil
1427-
}, func() {
1428-
openChannels = nil
1429-
pruneLinkNode = nil
1430-
})
1431-
if err != nil {
1432-
return err
1433-
}
1420+
if len(openChannels) > 0 {
1421+
return nil
1422+
}
1423+
1424+
// If there are no open channels with this peer, prune the
1425+
// link node. We do this within the same transaction to avoid
1426+
// a race condition where a new channel could be opened
1427+
// between this check and the deletion.
1428+
log.Infof("Pruning link node %x with zero open "+
1429+
"channels from database",
1430+
remotePub.SerializeCompressed())
1431+
1432+
err = deleteLinkNode(tx, remotePub)
1433+
if err != nil {
1434+
return fmt.Errorf("unable to delete link "+
1435+
"node: %w", err)
1436+
}
14341437

1435-
// Decide whether we want to remove the link node, based upon the number
1436-
// of still open channels.
1437-
return c.pruneLinkNode(openChannels, pruneLinkNode)
1438+
return nil
1439+
}, func() {})
14381440
}
14391441

14401442
// pruneLinkNode determines whether we should garbage collect a link node from
1441-
// the database due to no longer having any open channels with it. If there are
1442-
// any left, then this acts as a no-op.
1443-
func (c *ChannelStateDB) pruneLinkNode(openChannels []*OpenChannel,
1444-
remotePub *btcec.PublicKey) error {
1443+
// the database due to no longer having any open channels with it.
1444+
//
1445+
// NOTE: This function should be called after an initial check shows no open
1446+
// channels exist. It will double-check within a write transaction to avoid a
1447+
// race condition where a channel could be opened between the initial check
1448+
// and the deletion.
1449+
func (c *ChannelStateDB) pruneLinkNode(remotePub *btcec.PublicKey) error {
1450+
return kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
1451+
// Double-check for open channels to avoid deleting a link node
1452+
// if a channel was opened since the caller's initial check.
1453+
//
1454+
// NOTE: This avoids a race condition where a channel could be
1455+
// opened between the initial check and the deletion.
1456+
openChannels, err := c.fetchOpenChannels(tx, remotePub)
1457+
if err != nil {
1458+
return err
1459+
}
14451460

1446-
if len(openChannels) > 0 {
1447-
return nil
1448-
}
1461+
// If channels exist now, don't prune.
1462+
if len(openChannels) > 0 {
1463+
return nil
1464+
}
14491465

1450-
log.Infof("Pruning link node %x with zero open channels from database",
1451-
remotePub.SerializeCompressed())
1466+
// No open channels, safe to prune the link node.
1467+
log.Infof("Pruning link node %x with zero open channels "+
1468+
"from database",
1469+
remotePub.SerializeCompressed())
14521470

1453-
return c.linkNodeDB.DeleteLinkNode(remotePub)
1471+
err = deleteLinkNode(tx, remotePub)
1472+
if err != nil {
1473+
return fmt.Errorf("unable to prune link node: %w", err)
1474+
}
1475+
1476+
return nil
1477+
}, func() {})
14541478
}
14551479

14561480
// PruneLinkNodes attempts to prune all link nodes found within the database
@@ -1479,12 +1503,103 @@ func (c *ChannelStateDB) PruneLinkNodes() error {
14791503
return err
14801504
}
14811505

1482-
err = c.pruneLinkNode(openChannels, linkNode.IdentityPub)
1506+
if len(openChannels) > 0 {
1507+
continue
1508+
}
1509+
1510+
err = c.pruneLinkNode(linkNode.IdentityPub)
1511+
if err != nil {
1512+
return err
1513+
}
1514+
}
1515+
1516+
return nil
1517+
}
1518+
1519+
// RepairLinkNodes scans all channels in the database and ensures that a
1520+
// link node exists for each remote peer. This should be called on startup to
1521+
// ensure that our database is consistent.
1522+
//
1523+
// NOTE: This function is designed to repair database inconsistencies that may
1524+
// have occurred due to the race condition in link node pruning (where link
1525+
// nodes could be incorrectly deleted while channels still existed). This can
1526+
// be removed once we move to native sql.
1527+
func (c *ChannelStateDB) RepairLinkNodes(network wire.BitcoinNet) error {
1528+
// In a single read transaction, build a list of all peers with open
1529+
// channels and check which ones are missing link nodes.
1530+
var missingPeers []*btcec.PublicKey
1531+
1532+
err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
1533+
openChanBucket := tx.ReadBucket(openChannelBucket)
1534+
if openChanBucket == nil {
1535+
return ErrNoActiveChannels
1536+
}
1537+
1538+
var peersWithChannels []*btcec.PublicKey
1539+
1540+
err := openChanBucket.ForEach(func(nodePubBytes,
1541+
_ []byte) error {
1542+
1543+
nodePub, err := btcec.ParsePubKey(nodePubBytes)
1544+
if err != nil {
1545+
return err
1546+
}
1547+
1548+
channels, err := c.fetchOpenChannels(tx, nodePub)
1549+
if err != nil {
1550+
return err
1551+
}
1552+
1553+
if len(channels) > 0 {
1554+
peersWithChannels = append(
1555+
peersWithChannels, nodePub,
1556+
)
1557+
}
1558+
1559+
return nil
1560+
})
14831561
if err != nil {
14841562
return err
14851563
}
1564+
1565+
// Now check which peers are missing link nodes within the
1566+
// same transaction.
1567+
missingPeers, err = c.linkNodeDB.FindMissingLinkNodes(
1568+
tx, peersWithChannels,
1569+
)
1570+
1571+
return err
1572+
}, func() {
1573+
missingPeers = nil
1574+
})
1575+
if err != nil && !errors.Is(err, ErrNoActiveChannels) {
1576+
return fmt.Errorf("unable to fetch channels: %w", err)
14861577
}
14871578

1579+
// Early exit if no repairs needed.
1580+
if len(missingPeers) == 0 {
1581+
return nil
1582+
}
1583+
1584+
// Create all missing link nodes in a single write transaction
1585+
// using the LinkNodeDB abstraction.
1586+
linkNodesToCreate := make([]*LinkNode, 0, len(missingPeers))
1587+
for _, remotePub := range missingPeers {
1588+
linkNode := NewLinkNode(c.linkNodeDB, network, remotePub)
1589+
linkNodesToCreate = append(linkNodesToCreate, linkNode)
1590+
1591+
log.Infof("Repairing missing link node for peer %x",
1592+
remotePub.SerializeCompressed())
1593+
}
1594+
1595+
err = c.linkNodeDB.CreateLinkNodes(nil, linkNodesToCreate)
1596+
if err != nil {
1597+
return err
1598+
}
1599+
1600+
log.Infof("Repaired %d missing link nodes on startup",
1601+
len(missingPeers))
1602+
14881603
return nil
14891604
}
14901605

channeldb/nodes.go

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package channeldb
22

33
import (
44
"bytes"
5+
"errors"
6+
"fmt"
57
"io"
68
"net"
79
"time"
@@ -134,6 +136,95 @@ type LinkNodeDB struct {
134136
backend kvdb.Backend
135137
}
136138

139+
// FindMissingLinkNodes checks which of the provided public keys do not have
140+
// corresponding link nodes in the database. If tx is nil, a new read
141+
// transaction will be created. Otherwise, the provided transaction is used,
142+
// allowing this to be part of a larger batch operation.
143+
func (l *LinkNodeDB) FindMissingLinkNodes(tx kvdb.RTx,
144+
pubKeys []*btcec.PublicKey) ([]*btcec.PublicKey, error) {
145+
146+
var missing []*btcec.PublicKey
147+
148+
findMissing := func(readTx kvdb.RTx) error {
149+
nodeMetaBucket := readTx.ReadBucket(nodeInfoBucket)
150+
if nodeMetaBucket == nil {
151+
// If the bucket doesn't exist, all peers are missing.
152+
missing = pubKeys
153+
return nil
154+
}
155+
156+
for _, pubKey := range pubKeys {
157+
_, err := fetchLinkNode(readTx, pubKey)
158+
if err == nil {
159+
// Link node exists.
160+
continue
161+
}
162+
163+
if !errors.Is(err, ErrNodeNotFound) {
164+
return fmt.Errorf("unable to check link node "+
165+
"for peer %x: %w",
166+
pubKey.SerializeCompressed(), err)
167+
}
168+
169+
// Link node doesn't exist.
170+
missing = append(missing, pubKey)
171+
}
172+
173+
return nil
174+
}
175+
176+
// If no transaction provided, create our own.
177+
if tx == nil {
178+
err := kvdb.View(l.backend, findMissing, func() {
179+
missing = nil
180+
})
181+
182+
return missing, err
183+
}
184+
185+
// Use the provided transaction.
186+
err := findMissing(tx)
187+
188+
return missing, err
189+
}
190+
191+
// CreateLinkNodes creates multiple link nodes. If tx is nil, a new write
192+
// transaction will be created. Otherwise, the provided transaction is used,
193+
// allowing this to be part of a larger batch operation.
194+
func (l *LinkNodeDB) CreateLinkNodes(tx kvdb.RwTx,
195+
linkNodes []*LinkNode) error {
196+
197+
createNodes := func(writeTx kvdb.RwTx) error {
198+
nodeMetaBucket, err := writeTx.CreateTopLevelBucket(
199+
nodeInfoBucket,
200+
)
201+
if err != nil {
202+
return err
203+
}
204+
205+
for _, linkNode := range linkNodes {
206+
err := putLinkNode(nodeMetaBucket, linkNode)
207+
if err != nil {
208+
pubKey := linkNode.IdentityPub.
209+
SerializeCompressed()
210+
211+
return fmt.Errorf("unable to create link "+
212+
"node for peer %x: %w", pubKey, err)
213+
}
214+
}
215+
216+
return nil
217+
}
218+
219+
// If no transaction provided, create our own.
220+
if tx == nil {
221+
return kvdb.Update(l.backend, createNodes, func() {})
222+
}
223+
224+
// Use the provided transaction.
225+
return createNodes(tx)
226+
}
227+
137228
// DeleteLinkNode removes the link node with the given identity from the
138229
// database.
139230
func (l *LinkNodeDB) DeleteLinkNode(identity *btcec.PublicKey) error {

0 commit comments

Comments
 (0)