1
- import { InvalidParametersError } from '@libp2p/interface'
1
+ import { NotFoundError } from '@libp2p/interface'
2
2
import { peerIdFromCID } from '@libp2p/peer-id'
3
3
import mortice , { type Mortice } from 'mortice'
4
4
import { base32 } from 'multiformats/bases/base32'
5
5
import { CID } from 'multiformats/cid'
6
- import { equals as uint8ArrayEquals } from 'uint8arrays/equals '
6
+ import { MAX_ADDRESS_AGE , MAX_PEER_AGE } from './constants.js '
7
7
import { Peer as PeerPB } from './pb/peer.js'
8
- import { bytesToPeer } from './utils/bytes-to-peer.js'
8
+ import { bytesToPeer , pbToPeer } from './utils/bytes-to-peer.js'
9
+ import { peerEquals } from './utils/peer-equals.js'
9
10
import { NAMESPACE_COMMON , peerIdToDatastoreKey } from './utils/peer-id-to-datastore-key.js'
10
11
import { toPeerPB } from './utils/to-peer-pb.js'
11
12
import type { AddressFilter , PersistentPeerStoreComponents , PersistentPeerStoreInit } from './index.js'
@@ -19,27 +20,33 @@ export interface PeerUpdate extends PeerUpdateExternal {
19
20
updated : boolean
20
21
}
21
22
22
- function decodePeer ( key : Key , value : Uint8Array ) : Peer {
23
+ export interface ExistingPeer {
24
+ peerPB : PeerPB
25
+ peer : Peer
26
+ }
27
+
28
+ function keyToPeerId ( key : Key ) : PeerId {
23
29
// /peers/${peer-id-as-libp2p-key-cid-string-in-base-32}
24
30
const base32Str = key . toString ( ) . split ( '/' ) [ 2 ]
25
31
const buf = CID . parse ( base32Str , base32 )
26
- const peerId = peerIdFromCID ( buf )
27
32
28
- return bytesToPeer ( peerId , value )
33
+ return peerIdFromCID ( buf )
29
34
}
30
35
31
- function mapQuery ( query : PeerQuery ) : Query {
32
- if ( query == null ) {
33
- return { }
34
- }
36
+ function decodePeer ( key : Key , value : Uint8Array , maxAddressAge : number ) : Peer {
37
+ const peerId = keyToPeerId ( key )
38
+
39
+ return bytesToPeer ( peerId , value , maxAddressAge )
40
+ }
35
41
42
+ function mapQuery ( query : PeerQuery , maxAddressAge : number ) : Query {
36
43
return {
37
44
prefix : NAMESPACE_COMMON ,
38
45
filters : ( query . filters ?? [ ] ) . map ( fn => ( { key, value } ) => {
39
- return fn ( decodePeer ( key , value ) )
46
+ return fn ( decodePeer ( key , value , maxAddressAge ) )
40
47
} ) ,
41
48
orders : ( query . orders ?? [ ] ) . map ( fn => ( a , b ) => {
42
- return fn ( decodePeer ( a . key , a . value ) , decodePeer ( b . key , b . value ) )
49
+ return fn ( decodePeer ( a . key , a . value , maxAddressAge ) , decodePeer ( b . key , b . value , maxAddressAge ) )
43
50
} )
44
51
}
45
52
}
@@ -50,6 +57,8 @@ export class PersistentStore {
50
57
public readonly lock : Mortice
51
58
private readonly addressFilter ?: AddressFilter
52
59
private readonly log : Logger
60
+ private readonly maxAddressAge : number
61
+ private readonly maxPeerAge : number
53
62
54
63
constructor ( components : PersistentPeerStoreComponents , init : PersistentPeerStoreInit = { } ) {
55
64
this . log = components . logger . forComponent ( 'libp2p:peer-store' )
@@ -60,115 +69,146 @@ export class PersistentStore {
60
69
name : 'peer-store' ,
61
70
singleProcess : true
62
71
} )
72
+ this . maxAddressAge = init . maxAddressAge ?? MAX_ADDRESS_AGE
73
+ this . maxPeerAge = init . maxPeerAge ?? MAX_PEER_AGE
63
74
}
64
75
65
76
async has ( peerId : PeerId ) : Promise < boolean > {
66
- return this . datastore . has ( peerIdToDatastoreKey ( peerId ) )
77
+ try {
78
+ await this . load ( peerId )
79
+
80
+ return true
81
+ } catch ( err : any ) {
82
+ if ( err . name !== 'NotFoundError' ) {
83
+ throw err
84
+ }
85
+ }
86
+
87
+ return false
67
88
}
68
89
69
90
async delete ( peerId : PeerId ) : Promise < void > {
70
91
if ( this . peerId . equals ( peerId ) ) {
71
- throw new InvalidParametersError ( 'Cannot delete self peer' )
92
+ return
72
93
}
73
94
74
95
await this . datastore . delete ( peerIdToDatastoreKey ( peerId ) )
75
96
}
76
97
77
98
async load ( peerId : PeerId ) : Promise < Peer > {
78
- const buf = await this . datastore . get ( peerIdToDatastoreKey ( peerId ) )
99
+ const key = peerIdToDatastoreKey ( peerId )
100
+ const buf = await this . datastore . get ( key )
101
+ const peer = PeerPB . decode ( buf )
102
+
103
+ if ( this . #peerIsExpired( peer ) ) {
104
+ await this . datastore . delete ( key )
105
+ throw new NotFoundError ( )
106
+ }
79
107
80
- return bytesToPeer ( peerId , buf )
108
+ return pbToPeer ( peerId , peer , this . maxAddressAge )
81
109
}
82
110
83
111
async save ( peerId : PeerId , data : PeerData ) : Promise < PeerUpdate > {
84
- const {
85
- existingBuf,
86
- existingPeer
87
- } = await this . #findExistingPeer( peerId )
112
+ const existingPeer = await this . #findExistingPeer( peerId )
88
113
89
114
const peerPb : PeerPB = await toPeerPB ( peerId , data , 'patch' , {
90
115
addressFilter : this . addressFilter
91
116
} )
92
117
93
- return this . #saveIfDifferent( peerId , peerPb , existingBuf , existingPeer )
118
+ return this . #saveIfDifferent( peerId , peerPb , existingPeer )
94
119
}
95
120
96
121
async patch ( peerId : PeerId , data : Partial < PeerData > ) : Promise < PeerUpdate > {
97
- const {
98
- existingBuf,
99
- existingPeer
100
- } = await this . #findExistingPeer( peerId )
122
+ const existingPeer = await this . #findExistingPeer( peerId )
101
123
102
124
const peerPb : PeerPB = await toPeerPB ( peerId , data , 'patch' , {
103
125
addressFilter : this . addressFilter ,
104
126
existingPeer
105
127
} )
106
128
107
- return this . #saveIfDifferent( peerId , peerPb , existingBuf , existingPeer )
129
+ return this . #saveIfDifferent( peerId , peerPb , existingPeer )
108
130
}
109
131
110
132
async merge ( peerId : PeerId , data : PeerData ) : Promise < PeerUpdate > {
111
- const {
112
- existingBuf,
113
- existingPeer
114
- } = await this . #findExistingPeer( peerId )
133
+ const existingPeer = await this . #findExistingPeer( peerId )
115
134
116
135
const peerPb : PeerPB = await toPeerPB ( peerId , data , 'merge' , {
117
136
addressFilter : this . addressFilter ,
118
137
existingPeer
119
138
} )
120
139
121
- return this . #saveIfDifferent( peerId , peerPb , existingBuf , existingPeer )
140
+ return this . #saveIfDifferent( peerId , peerPb , existingPeer )
122
141
}
123
142
124
143
async * all ( query ?: PeerQuery ) : AsyncGenerator < Peer , void , unknown > {
125
- for await ( const { key, value } of this . datastore . query ( mapQuery ( query ?? { } ) ) ) {
126
- const peer = decodePeer ( key , value )
144
+ for await ( const { key, value } of this . datastore . query ( mapQuery ( query ?? { } , this . maxAddressAge ) ) ) {
145
+ const peerId = keyToPeerId ( key )
127
146
128
- if ( peer . id . equals ( this . peerId ) ) {
129
- // Skip self peer if present
147
+ // skip self peer if present
148
+ if ( peerId . equals ( this . peerId ) ) {
130
149
continue
131
150
}
132
151
133
- yield peer
152
+ const peer = PeerPB . decode ( value )
153
+
154
+ // remove expired peer
155
+ if ( this . #peerIsExpired( peer ) ) {
156
+ await this . datastore . delete ( key )
157
+ continue
158
+ }
159
+
160
+ yield pbToPeer ( peerId , peer , this . maxAddressAge )
134
161
}
135
162
}
136
163
137
- async #findExistingPeer ( peerId : PeerId ) : Promise < { existingBuf ?: Uint8Array , existingPeer ?: Peer } > {
164
+ async #findExistingPeer ( peerId : PeerId ) : Promise < ExistingPeer | undefined > {
138
165
try {
139
- const existingBuf = await this . datastore . get ( peerIdToDatastoreKey ( peerId ) )
140
- const existingPeer = bytesToPeer ( peerId , existingBuf )
166
+ const key = peerIdToDatastoreKey ( peerId )
167
+ const buf = await this . datastore . get ( key )
168
+ const peerPB = PeerPB . decode ( buf )
169
+
170
+ // remove expired peer
171
+ if ( this . #peerIsExpired( peerPB ) ) {
172
+ await this . datastore . delete ( key )
173
+ throw new NotFoundError ( )
174
+ }
141
175
142
176
return {
143
- existingBuf ,
144
- existingPeer
177
+ peerPB ,
178
+ peer : bytesToPeer ( peerId , buf , this . maxAddressAge )
145
179
}
146
180
} catch ( err : any ) {
147
181
if ( err . name !== 'NotFoundError' ) {
148
182
this . log . error ( 'invalid peer data found in peer store - %e' , err )
149
183
}
150
184
}
151
-
152
- return { }
153
185
}
154
186
155
- async #saveIfDifferent ( peerId : PeerId , peer : PeerPB , existingBuf ?: Uint8Array , existingPeer ?: Peer ) : Promise < PeerUpdate > {
187
+ async #saveIfDifferent ( peerId : PeerId , peer : PeerPB , existingPeer ?: ExistingPeer ) : Promise < PeerUpdate > {
188
+ // record last update
189
+ peer . updated = Date . now ( )
156
190
const buf = PeerPB . encode ( peer )
157
191
158
- if ( existingBuf != null && uint8ArrayEquals ( buf , existingBuf ) ) {
159
- return {
160
- peer : bytesToPeer ( peerId , buf ) ,
161
- previous : existingPeer ,
162
- updated : false
163
- }
164
- }
165
-
166
192
await this . datastore . put ( peerIdToDatastoreKey ( peerId ) , buf )
167
193
168
194
return {
169
- peer : bytesToPeer ( peerId , buf ) ,
170
- previous : existingPeer ,
171
- updated : true
195
+ peer : bytesToPeer ( peerId , buf , this . maxAddressAge ) ,
196
+ previous : existingPeer ?. peer ,
197
+ updated : existingPeer == null || ! peerEquals ( peer , existingPeer . peerPB )
198
+ }
199
+ }
200
+
201
+ #peerIsExpired ( peer : PeerPB ) : boolean {
202
+ if ( peer . updated == null ) {
203
+ return true
172
204
}
205
+
206
+ const expired = peer . updated < ( Date . now ( ) - this . maxPeerAge )
207
+ const minAddressObserved = Date . now ( ) - this . maxAddressAge
208
+ const addrs = peer . addresses . filter ( addr => {
209
+ return addr . observed != null && addr . observed > minAddressObserved
210
+ } )
211
+
212
+ return expired && addrs . length === 0
173
213
}
174
214
}
0 commit comments