Skip to content

Commit 2a7425c

Browse files
authored
fix: use a per-peer lock (#3162)
Instead of locking the whole peer store on a multi-read/single-write basis, make the lock more granular and use a per-peer-id lock.
1 parent d91ae66 commit 2a7425c

File tree

6 files changed

+133
-42
lines changed

6 files changed

+133
-42
lines changed

packages/peer-store/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
"dependencies": {
5151
"@libp2p/crypto": "^5.1.4",
5252
"@libp2p/interface": "^2.10.2",
53+
"@libp2p/peer-collections": "^6.0.30",
5354
"@libp2p/peer-id": "^5.1.5",
5455
"@libp2p/peer-record": "^8.0.30",
5556
"@multiformats/multiaddr": "^12.4.0",

packages/peer-store/src/index.ts

Lines changed: 16 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import { RecordEnvelope, PeerRecord } from '@libp2p/peer-record'
1010
import all from 'it-all'
1111
import { PersistentStore } from './store.js'
1212
import type { PeerUpdate } from './store.js'
13-
import type { ComponentLogger, Libp2pEvents, Logger, TypedEventTarget, PeerId, PeerStore, Peer, PeerData, PeerQuery, PeerInfo, AbortOptions, ConsumePeerRecordOptions } from '@libp2p/interface'
13+
import type { ComponentLogger, Libp2pEvents, Logger, TypedEventTarget, PeerId, PeerStore, Peer, PeerData, PeerQuery, PeerInfo, AbortOptions, ConsumePeerRecordOptions, Metrics } from '@libp2p/interface'
1414
import type { Multiaddr } from '@multiformats/multiaddr'
1515
import type { Datastore } from 'interface-datastore'
1616

@@ -19,6 +19,7 @@ export interface PersistentPeerStoreComponents {
1919
datastore: Datastore
2020
events: TypedEventTarget<Libp2pEvents>
2121
logger: ComponentLogger
22+
metrics?: Metrics
2223
}
2324

2425
/**
@@ -75,29 +76,17 @@ class PersistentPeerStore implements PeerStore {
7576
readonly [Symbol.toStringTag] = '@libp2p/peer-store'
7677

7778
async forEach (fn: (peer: Peer,) => void, query?: PeerQuery): Promise<void> {
78-
const release = await this.store.lock.readLock(query)
79-
80-
try {
81-
for await (const peer of this.store.all(query)) {
82-
fn(peer)
83-
}
84-
} finally {
85-
release()
79+
for await (const peer of this.store.all(query)) {
80+
fn(peer)
8681
}
8782
}
8883

8984
async all (query?: PeerQuery): Promise<Peer[]> {
90-
const release = await this.store.lock.readLock(query)
91-
92-
try {
93-
return await all(this.store.all(query))
94-
} finally {
95-
release()
96-
}
85+
return all(this.store.all(query))
9786
}
9887

9988
async delete (peerId: PeerId, options?: AbortOptions): Promise<void> {
100-
const release = await this.store.lock.writeLock(options)
89+
const release = await this.store.getReadLock(peerId, options)
10190

10291
try {
10392
await this.store.delete(peerId, options)
@@ -107,23 +96,23 @@ class PersistentPeerStore implements PeerStore {
10796
}
10897

10998
async has (peerId: PeerId, options?: AbortOptions): Promise<boolean> {
110-
const release = await this.store.lock.readLock(options)
99+
const release = await this.store.getReadLock(peerId, options)
111100

112101
try {
113102
return await this.store.has(peerId, options)
114103
} finally {
115104
this.log.trace('has release read lock')
116-
release()
105+
release?.()
117106
}
118107
}
119108

120109
async get (peerId: PeerId, options?: AbortOptions): Promise<Peer> {
121-
const release = await this.store.lock.readLock(options)
110+
const release = await this.store.getReadLock(peerId, options)
122111

123112
try {
124113
return await this.store.load(peerId, options)
125114
} finally {
126-
release()
115+
release?.()
127116
}
128117
}
129118

@@ -137,7 +126,7 @@ class PersistentPeerStore implements PeerStore {
137126
}
138127

139128
async save (id: PeerId, data: PeerData, options?: AbortOptions): Promise<Peer> {
140-
const release = await this.store.lock.writeLock(options)
129+
const release = await this.store.getWriteLock(id, options)
141130

142131
try {
143132
const result = await this.store.save(id, data, options)
@@ -146,12 +135,12 @@ class PersistentPeerStore implements PeerStore {
146135

147136
return result.peer
148137
} finally {
149-
release()
138+
release?.()
150139
}
151140
}
152141

153142
async patch (id: PeerId, data: PeerData, options?: AbortOptions): Promise<Peer> {
154-
const release = await this.store.lock.writeLock(options)
143+
const release = await this.store.getWriteLock(id, options)
155144

156145
try {
157146
const result = await this.store.patch(id, data, options)
@@ -160,12 +149,12 @@ class PersistentPeerStore implements PeerStore {
160149

161150
return result.peer
162151
} finally {
163-
release()
152+
release?.()
164153
}
165154
}
166155

167156
async merge (id: PeerId, data: PeerData, options?: AbortOptions): Promise<Peer> {
168-
const release = await this.store.lock.writeLock(options)
157+
const release = await this.store.getWriteLock(id, options)
169158

170159
try {
171160
const result = await this.store.merge(id, data, options)
@@ -174,7 +163,7 @@ class PersistentPeerStore implements PeerStore {
174163

175164
return result.peer
176165
} finally {
177-
release()
166+
release?.()
178167
}
179168
}
180169

packages/peer-store/src/store.ts

Lines changed: 75 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { NotFoundError } from '@libp2p/interface'
2+
import { PeerMap, trackedPeerMap } from '@libp2p/peer-collections'
23
import { peerIdFromCID } from '@libp2p/peer-id'
34
import mortice from 'mortice'
45
import { base32 } from 'multiformats/bases/base32'
@@ -13,7 +14,7 @@ import type { AddressFilter, PersistentPeerStoreComponents, PersistentPeerStoreI
1314
import type { PeerUpdate as PeerUpdateExternal, PeerId, Peer, PeerData, PeerQuery, Logger } from '@libp2p/interface'
1415
import type { AbortOptions } from '@multiformats/multiaddr'
1516
import type { Datastore, Key, Query } from 'interface-datastore'
16-
import type { Mortice } from 'mortice'
17+
import type { Mortice, Release } from 'mortice'
1718

1819
/**
1920
* Event detail emitted when peer data changes
@@ -53,10 +54,15 @@ function mapQuery (query: PeerQuery, maxAddressAge: number): Query {
5354
}
5455
}
5556

57+
export interface Lock {
58+
refs: number
59+
lock: Mortice
60+
}
61+
5662
export class PersistentStore {
5763
private readonly peerId: PeerId
5864
private readonly datastore: Datastore
59-
public readonly lock: Mortice
65+
private locks: PeerMap<Lock>
6066
private readonly addressFilter?: AddressFilter
6167
private readonly log: Logger
6268
private readonly maxAddressAge: number
@@ -67,14 +73,76 @@ export class PersistentStore {
6773
this.peerId = components.peerId
6874
this.datastore = components.datastore
6975
this.addressFilter = init.addressFilter
70-
this.lock = mortice({
71-
name: 'peer-store',
72-
singleProcess: true
76+
this.locks = trackedPeerMap({
77+
name: 'libp2p_peer_store_locks',
78+
metrics: components.metrics
7379
})
7480
this.maxAddressAge = init.maxAddressAge ?? MAX_ADDRESS_AGE
7581
this.maxPeerAge = init.maxPeerAge ?? MAX_PEER_AGE
7682
}
7783

84+
getLock (peerId: PeerId): Lock {
85+
let lock = this.locks.get(peerId)
86+
87+
if (lock == null) {
88+
lock = {
89+
refs: 0,
90+
lock: mortice({
91+
name: peerId.toString(),
92+
singleProcess: true
93+
})
94+
}
95+
96+
this.locks.set(peerId, lock)
97+
}
98+
99+
lock.refs++
100+
101+
return lock
102+
}
103+
104+
private maybeRemoveLock (peerId: PeerId, lock: Lock): void {
105+
lock.refs--
106+
107+
if (lock.refs === 0) {
108+
this.locks.delete(peerId)
109+
}
110+
}
111+
112+
async getReadLock (peerId: PeerId, options?: AbortOptions): Promise<Release> {
113+
const lock = this.getLock(peerId)
114+
115+
try {
116+
const release = await lock.lock.readLock(options)
117+
118+
return () => {
119+
release()
120+
this.maybeRemoveLock(peerId, lock)
121+
}
122+
} catch (err) {
123+
this.maybeRemoveLock(peerId, lock)
124+
125+
throw err
126+
}
127+
}
128+
129+
async getWriteLock (peerId: PeerId, options?: AbortOptions): Promise<Release> {
130+
const lock = this.getLock(peerId)
131+
132+
try {
133+
const release = await lock.lock.writeLock(options)
134+
135+
return () => {
136+
release()
137+
this.maybeRemoveLock(peerId, lock)
138+
}
139+
} catch (err) {
140+
this.maybeRemoveLock(peerId, lock)
141+
142+
throw err
143+
}
144+
}
145+
78146
async has (peerId: PeerId, options?: AbortOptions): Promise<boolean> {
79147
try {
80148
await this.load(peerId, options)
@@ -179,7 +247,7 @@ export class PersistentStore {
179247

180248
return {
181249
peerPB,
182-
peer: bytesToPeer(peerId, buf, this.maxAddressAge)
250+
peer: pbToPeer(peerId, peerPB, this.maxAddressAge)
183251
}
184252
} catch (err: any) {
185253
if (err.name !== 'NotFoundError') {
@@ -196,7 +264,7 @@ export class PersistentStore {
196264
await this.datastore.put(peerIdToDatastoreKey(peerId), buf, options)
197265

198266
return {
199-
peer: bytesToPeer(peerId, buf, this.maxAddressAge),
267+
peer: pbToPeer(peerId, peer, this.maxAddressAge),
200268
previous: existingPeer?.peer,
201269
updated: existingPeer == null || !peerEquals(peer, existingPeer.peerPB)
202270
}

packages/peer-store/src/utils/bytes-to-peer.ts

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,20 @@
11
import { publicKeyFromProtobuf } from '@libp2p/crypto/keys'
22
import { peerIdFromPublicKey } from '@libp2p/peer-id'
33
import { multiaddr } from '@multiformats/multiaddr'
4-
import { base58btc } from 'multiformats/bases/base58'
5-
import * as Digest from 'multiformats/hashes/digest'
64
import { Peer as PeerPB } from '../pb/peer.js'
75
import type { PeerId, Peer, Tag } from '@libp2p/interface'
6+
import type { Digest } from 'multiformats/hashes/digest'
87

98
function populatePublicKey (peerId: PeerId, protobuf: PeerPB): PeerId {
109
if (peerId.publicKey != null || protobuf.publicKey == null) {
1110
return peerId
1211
}
1312

14-
let digest: any
13+
let digest: Digest<18, number> | undefined
1514

1615
if (peerId.type === 'RSA') {
1716
// avoid hashing public key
18-
const multihash = base58btc.decode(`z${peerId}`)
19-
digest = Digest.decode(multihash)
17+
digest = peerId.toMultihash()
2018
}
2119

2220
const publicKey = publicKeyFromProtobuf(protobuf.publicKey, digest)

packages/peer-store/src/utils/to-peer-pb.ts

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -252,8 +252,13 @@ function mapTag (key: string, tag: any): Tag {
252252
expiry = BigInt(Date.now() + Number(tag.ttl))
253253
}
254254

255-
return {
256-
value: tag.value ?? 0,
257-
expiry
255+
const output: Tag = {
256+
value: tag.value ?? 0
258257
}
258+
259+
if (expiry != null) {
260+
output.expiry = expiry
261+
}
262+
263+
return output
259264
}

packages/peer-store/test/index.spec.ts

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -558,4 +558,34 @@ describe('PersistentPeerStore', () => {
558558
]
559559
})
560560
})
561+
562+
it('should allow concurrent merging', async () => {
563+
const peerStore = persistentPeerStore(components)
564+
565+
expect(peerStore).to.have.nested.property('store.locks')
566+
.that.has.property('size', 0)
567+
568+
const p1 = peerStore.merge(otherPeerId, {
569+
multiaddrs: [
570+
multiaddr('/ip4/123.123.123.123/tcp/1234')
571+
]
572+
})
573+
574+
const p2 = peerStore.merge(otherPeerId, {
575+
multiaddrs: [
576+
multiaddr('/ip4/123.123.123.123/tcp/4567')
577+
]
578+
})
579+
580+
expect(peerStore).to.have.nested.property('store.locks')
581+
.that.has.property('size', 1)
582+
583+
await Promise.all([p1, p2])
584+
585+
const peer = await peerStore.get(otherPeerId)
586+
587+
expect(peer.addresses).to.have.lengthOf(2)
588+
expect(peerStore).to.have.nested.property('store.locks')
589+
.that.has.property('size', 0)
590+
})
561591
})

0 commit comments

Comments
 (0)