Skip to content

Commit c079362

Browse files
committed
graphdb: fix potential sql tx exhaustion
We should avoid taking the lock of a mutex inside a transaction. Currently we also take this lock in other places, and there is a chance that if the application acquires the lock while all transactions are already blocked waiting for the mutex to unlock, we end up in a deadlock.
1 parent 0aa757b commit c079362

File tree

2 files changed

+18
-8
lines changed

2 files changed

+18
-8
lines changed

graph/db/kv_store.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2116,6 +2116,13 @@ func (c *KVStore) fetchNextChanUpdateBatch(
21162116
batch []ChannelEdge
21172117
hasMore bool
21182118
)
2119+
2120+
// Acquire read lock before starting transaction to ensure
2121+
// consistent lock ordering (cacheMu -> DB) and prevent
2122+
// deadlock with write operations.
2123+
c.cacheMu.RLock()
2124+
defer c.cacheMu.RUnlock()
2125+
21192126
err := kvdb.View(c.db, func(tx kvdb.RTx) error {
21202127
edges := tx.ReadBucket(edgeBucket)
21212128
if edges == nil {
@@ -2195,9 +2202,7 @@ func (c *KVStore) fetchNextChanUpdateBatch(
21952202
continue
21962203
}
21972204

2198-
// Before we read the edge info, we'll see if this
2199-
// element is already in the cache or not.
2200-
c.cacheMu.RLock()
2205+
// Check cache (we already hold shared read lock).
22012206
if channel, ok := c.chanCache.get(chanIDInt); ok {
22022207
state.edgesSeen[chanIDInt] = struct{}{}
22032208

@@ -2208,11 +2213,8 @@ func (c *KVStore) fetchNextChanUpdateBatch(
22082213

22092214
indexKey, _ = updateCursor.Next()
22102215

2211-
c.cacheMu.RUnlock()
2212-
22132216
continue
22142217
}
2215-
c.cacheMu.RUnlock()
22162218

22172219
// The edge wasn't in the cache, so we'll fetch it along
22182220
// w/ the edge policies and nodes.

graph/db/sql_store.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1126,6 +1126,11 @@ func (s *SQLStore) ChanUpdatesInHorizon(startTime, endTime time.Time,
11261126
for hasMore {
11271127
var batch []ChannelEdge
11281128

1129+
// Acquire read lock before starting transaction to
1130+
// ensure consistent lock ordering (cacheMu -> DB) and
1131+
// prevent deadlock with write operations.
1132+
s.cacheMu.RLock()
1133+
11291134
err := s.db.ExecTx(ctx, sqldb.ReadTxOpt(),
11301135
func(db SQLQueries) error {
11311136
//nolint:ll
@@ -1178,11 +1183,11 @@ func (s *SQLStore) ChanUpdatesInHorizon(startTime, endTime time.Time,
11781183
continue
11791184
}
11801185

1181-
s.cacheMu.RLock()
1186+
// Check cache (we already hold
1187+
// shared read lock).
11821188
channel, ok := s.chanCache.get(
11831189
chanIDInt,
11841190
)
1185-
s.cacheMu.RUnlock()
11861191
if ok {
11871192
hits++
11881193
total++
@@ -1216,6 +1221,9 @@ func (s *SQLStore) ChanUpdatesInHorizon(startTime, endTime time.Time,
12161221
)
12171222
})
12181223

1224+
// Release read lock after transaction completes.
1225+
s.cacheMu.RUnlock()
1226+
12191227
if err != nil {
12201228
log.Errorf("ChanUpdatesInHorizon "+
12211229
"batch error: %v", err)

0 commit comments

Comments
 (0)