Skip to content

Commit 1831ffa

Browse files
committed
graphdb: fix potential sql tx exhaustion
We should avoid acquiring a mutex inside a transaction. This lock is also taken in other places, so there is a chance that the application acquires the lock while all transactions are already blocked waiting for the mutex to be released, in which case we end up in a deadlock.
1 parent 0aa757b commit 1831ffa

File tree

2 files changed

+17
-8
lines changed

2 files changed

+17
-8
lines changed

graph/db/kv_store.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2116,6 +2116,13 @@ func (c *KVStore) fetchNextChanUpdateBatch(
21162116
batch []ChannelEdge
21172117
hasMore bool
21182118
)
2119+
2120+
// Acquire read lock before starting transaction to ensure
2121+
// consistent lock ordering (cacheMu -> DB) and prevent
2122+
// deadlock with write operations.
2123+
c.cacheMu.RLock()
2124+
defer c.cacheMu.RUnlock()
2125+
21192126
err := kvdb.View(c.db, func(tx kvdb.RTx) error {
21202127
edges := tx.ReadBucket(edgeBucket)
21212128
if edges == nil {
@@ -2195,9 +2202,7 @@ func (c *KVStore) fetchNextChanUpdateBatch(
21952202
continue
21962203
}
21972204

2198-
// Before we read the edge info, we'll see if this
2199-
// element is already in the cache or not.
2200-
c.cacheMu.RLock()
2205+
// Check cache (we already hold RLock).
22012206
if channel, ok := c.chanCache.get(chanIDInt); ok {
22022207
state.edgesSeen[chanIDInt] = struct{}{}
22032208

@@ -2208,11 +2213,8 @@ func (c *KVStore) fetchNextChanUpdateBatch(
22082213

22092214
indexKey, _ = updateCursor.Next()
22102215

2211-
c.cacheMu.RUnlock()
2212-
22132216
continue
22142217
}
2215-
c.cacheMu.RUnlock()
22162218

22172219
// The edge wasn't in the cache, so we'll fetch it along
22182220
// w/ the edge policies and nodes.

graph/db/sql_store.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1126,6 +1126,11 @@ func (s *SQLStore) ChanUpdatesInHorizon(startTime, endTime time.Time,
11261126
for hasMore {
11271127
var batch []ChannelEdge
11281128

1129+
// Acquire read lock before starting transaction to ensure
1130+
// consistent lock ordering (cacheMu -> DB) and prevent
1131+
// deadlock with write operations.
1132+
s.cacheMu.RLock()
1133+
11291134
err := s.db.ExecTx(ctx, sqldb.ReadTxOpt(),
11301135
func(db SQLQueries) error {
11311136
//nolint:ll
@@ -1178,11 +1183,10 @@ func (s *SQLStore) ChanUpdatesInHorizon(startTime, endTime time.Time,
11781183
continue
11791184
}
11801185

1181-
s.cacheMu.RLock()
1186+
// Check cache (we already hold RLock).
11821187
channel, ok := s.chanCache.get(
11831188
chanIDInt,
11841189
)
1185-
s.cacheMu.RUnlock()
11861190
if ok {
11871191
hits++
11881192
total++
@@ -1216,6 +1220,9 @@ func (s *SQLStore) ChanUpdatesInHorizon(startTime, endTime time.Time,
12161220
)
12171221
})
12181222

1223+
// Release read lock after transaction completes.
1224+
s.cacheMu.RUnlock()
1225+
12191226
if err != nil {
12201227
log.Errorf("ChanUpdatesInHorizon "+
12211228
"batch error: %v", err)

0 commit comments

Comments
 (0)