Skip to content

Commit fb41972

Browse files
authored
chore: rendezvous improvements (vacp2p#1319)
1 parent 504d161 commit fb41972

File tree

3 files changed

+88
-58
lines changed

3 files changed

+88
-58
lines changed

libp2p/discovery/rendezvousinterface.nim

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,14 @@ proc `==`*(a, b: RdvNamespace): bool {.borrow.}
2626
method request*(
2727
self: RendezVousInterface, pa: PeerAttributes
2828
) {.async: (raises: [DiscoveryError, CancelledError]).} =
29-
var namespace = ""
29+
var namespace = Opt.none(string)
3030
for attr in pa:
3131
if attr.ofType(RdvNamespace):
32-
namespace = string attr.to(RdvNamespace)
32+
namespace = Opt.some(string attr.to(RdvNamespace))
3333
elif attr.ofType(DiscoveryService):
34-
namespace = string attr.to(DiscoveryService)
34+
namespace = Opt.some(string attr.to(DiscoveryService))
3535
elif attr.ofType(PeerId):
36-
namespace = $attr.to(PeerId)
36+
namespace = Opt.some($attr.to(PeerId))
3737
else:
3838
# unhandled type
3939
return
@@ -44,8 +44,8 @@ method request*(
4444
for address in pr.addresses:
4545
peer.add(address.address)
4646

47-
peer.add(DiscoveryService(namespace))
48-
peer.add(RdvNamespace(namespace))
47+
peer.add(DiscoveryService(namespace.get()))
48+
peer.add(RdvNamespace(namespace.get()))
4949
self.onPeerFound(peer)
5050

5151
await sleepAsync(self.timeToRequest)

libp2p/protocols/rendezvous.nim

Lines changed: 62 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ const
3737
RendezVousCodec* = "/rendezvous/1.0.0"
3838
MinimumDuration* = 2.hours
3939
MaximumDuration = 72.hours
40+
MaximumMessageLen = 1 shl 22 # 4MB
4041
MinimumNamespaceLen = 1
4142
MaximumNamespaceLen = 255
4243
RegistrationLimitPerPeer = 1000
@@ -63,7 +64,7 @@ type
6364

6465
Cookie = object
6566
offset: uint64
66-
ns: string
67+
ns: Opt[string]
6768

6869
Register = object
6970
ns: string
@@ -79,7 +80,7 @@ type
7980
ns: string
8081

8182
Discover = object
82-
ns: string
83+
ns: Opt[string]
8384
limit: Opt[uint64]
8485
cookie: Opt[seq[byte]]
8586

@@ -100,7 +101,8 @@ type
100101
proc encode(c: Cookie): ProtoBuffer =
101102
result = initProtoBuffer()
102103
result.write(1, c.offset)
103-
result.write(2, c.ns)
104+
if c.ns.isSome():
105+
result.write(2, c.ns.get())
104106
result.finish()
105107

106108
proc encode(r: Register): ProtoBuffer =
@@ -127,7 +129,8 @@ proc encode(u: Unregister): ProtoBuffer =
127129

128130
proc encode(d: Discover): ProtoBuffer =
129131
result = initProtoBuffer()
130-
result.write(1, d.ns)
132+
if d.ns.isSome():
133+
result.write(1, d.ns.get())
131134
d.limit.withValue(limit):
132135
result.write(2, limit)
133136
d.cookie.withValue(cookie):
@@ -161,13 +164,17 @@ proc encode(msg: Message): ProtoBuffer =
161164
result.finish()
162165

163166
proc decode(_: typedesc[Cookie], buf: seq[byte]): Opt[Cookie] =
164-
var c: Cookie
167+
var
168+
c: Cookie
169+
ns: string
165170
let
166171
pb = initProtoBuffer(buf)
167172
r1 = pb.getRequiredField(1, c.offset)
168-
r2 = pb.getRequiredField(2, c.ns)
173+
r2 = pb.getField(2, ns)
169174
if r1.isErr() or r2.isErr():
170175
return Opt.none(Cookie)
176+
if r2.get(false):
177+
c.ns = Opt.some(ns)
171178
Opt.some(c)
172179

173180
proc decode(_: typedesc[Register], buf: seq[byte]): Opt[Register] =
@@ -219,13 +226,16 @@ proc decode(_: typedesc[Discover], buf: seq[byte]): Opt[Discover] =
219226
d: Discover
220227
limit: uint64
221228
cookie: seq[byte]
229+
ns: string
222230
let
223231
pb = initProtoBuffer(buf)
224-
r1 = pb.getRequiredField(1, d.ns)
232+
r1 = pb.getField(1, ns)
225233
r2 = pb.getField(2, limit)
226234
r3 = pb.getField(3, cookie)
227235
if r1.isErr() or r2.isErr() or r3.isErr:
228236
return Opt.none(Discover)
237+
if r1.get(false):
238+
d.ns = Opt.some(ns)
229239
if r2.get(false):
230240
d.limit = Opt.some(limit)
231241
if r3.get(false):
@@ -446,7 +456,7 @@ proc discover(
446456
) {.async: (raises: [CancelledError, LPStreamError]).} =
447457
trace "Received Discover", peerId = conn.peerId, ns = d.ns
448458
libp2p_rendezvous_discover.inc()
449-
if d.ns.len > MaximumNamespaceLen:
459+
if d.ns.isSome() and d.ns.get().len > MaximumNamespaceLen:
450460
await conn.sendDiscoverResponseError(InvalidNamespace)
451461
return
452462
var limit = min(DiscoverLimit, d.limit.get(DiscoverLimit))
@@ -459,20 +469,19 @@ proc discover(
459469
return
460470
else:
461471
Cookie(offset: rdv.registered.low().uint64 - 1)
462-
if cookie.ns != d.ns or cookie.offset < rdv.registered.low().uint64 or
472+
if d.ns.isSome() and cookie.ns.isSome() and cookie.ns.get() != d.ns.get() or
473+
cookie.offset < rdv.registered.low().uint64 or
463474
cookie.offset > rdv.registered.high().uint64:
464475
cookie = Cookie(offset: rdv.registered.low().uint64 - 1)
465-
let
466-
nsSalted = d.ns & rdv.salt
467-
namespaces =
468-
if d.ns != "":
469-
try:
470-
rdv.namespaces[nsSalted]
471-
except KeyError:
472-
await conn.sendDiscoverResponseError(InvalidNamespace)
473-
return
474-
else:
475-
toSeq(cookie.offset.int .. rdv.registered.high())
476+
let namespaces =
477+
if d.ns.isSome():
478+
try:
479+
rdv.namespaces[d.ns.get() & rdv.salt]
480+
except KeyError:
481+
await conn.sendDiscoverResponseError(InvalidNamespace)
482+
return
483+
else:
484+
toSeq(max(cookie.offset.int, rdv.registered.offset) .. rdv.registered.high())
476485
if namespaces.len() == 0:
477486
await conn.sendDiscoverResponse(@[], Cookie())
478487
return
@@ -516,7 +525,7 @@ proc advertisePeer(
516525
rdv.sema.release()
517526

518527
await rdv.sema.acquire()
519-
discard await advertiseWrap().withTimeout(5.seconds)
528+
await advertiseWrap()
520529

521530
proc advertise*(
522531
rdv: RendezVous, ns: string, ttl: Duration, peers: seq[PeerId]
@@ -539,7 +548,7 @@ proc advertise*(
539548
let futs = collect(newSeq()):
540549
for peer in peers:
541550
trace "Send Advertise", peerId = peer, ns
542-
rdv.advertisePeer(peer, msg.buffer)
551+
rdv.advertisePeer(peer, msg.buffer).withTimeout(5.seconds)
543552

544553
await allFutures(futs)
545554

@@ -563,7 +572,7 @@ proc requestLocally*(rdv: RendezVous, ns: string): seq[PeerRecord] =
563572
@[]
564573

565574
proc request*(
566-
rdv: RendezVous, ns: string, l: int = DiscoverLimit.int, peers: seq[PeerId]
575+
rdv: RendezVous, ns: Opt[string], l: int = DiscoverLimit.int, peers: seq[PeerId]
567576
): Future[seq[PeerRecord]] {.async: (raises: [DiscoveryError, CancelledError]).} =
568577
var
569578
s: Table[PeerId, (PeerRecord, Register)]
@@ -572,7 +581,7 @@ proc request*(
572581

573582
if l <= 0 or l > DiscoverLimit.int:
574583
raise newException(AdvertiseError, "Invalid limit")
575-
if ns.len > MaximumNamespaceLen:
584+
if ns.isSome() and ns.get().len > MaximumNamespaceLen:
576585
raise newException(AdvertiseError, "Invalid namespace")
577586

578587
limit = l.uint64
@@ -584,15 +593,18 @@ proc request*(
584593
await conn.close()
585594
d.limit = Opt.some(limit)
586595
d.cookie =
587-
try:
588-
Opt.some(rdv.cookiesSaved[peer][ns])
589-
except KeyError as exc:
596+
if ns.isSome():
597+
try:
598+
Opt.some(rdv.cookiesSaved[peer][ns.get()])
599+
except KeyError, CatchableError:
600+
Opt.none(seq[byte])
601+
else:
590602
Opt.none(seq[byte])
591603
await conn.writeLp(
592604
encode(Message(msgType: MessageType.Discover, discover: Opt.some(d))).buffer
593605
)
594606
let
595-
buf = await conn.readLp(65536)
607+
buf = await conn.readLp(MaximumMessageLen)
596608
msgRcv = Message.decode(buf).valueOr:
597609
debug "Message undecodable"
598610
return
@@ -606,12 +618,14 @@ proc request*(
606618
trace "Cannot discover", ns, status = resp.status, text = resp.text
607619
return
608620
resp.cookie.withValue(cookie):
609-
if cookie.len() < 1000 and
610-
rdv.cookiesSaved.hasKeyOrPut(peer, {ns: cookie}.toTable()):
611-
try:
612-
rdv.cookiesSaved[peer][ns] = cookie
613-
except KeyError:
614-
raiseAssert "checked with hasKeyOrPut"
621+
if ns.isSome:
622+
let namespace = ns.get()
623+
if cookie.len() < 1000 and
624+
rdv.cookiesSaved.hasKeyOrPut(peer, {namespace: cookie}.toTable()):
625+
try:
626+
rdv.cookiesSaved[peer][namespace] = cookie
627+
except KeyError:
628+
raiseAssert "checked with hasKeyOrPut"
615629
for r in resp.registrations:
616630
if limit == 0:
617631
return
@@ -634,8 +648,9 @@ proc request*(
634648
else:
635649
s[pr.peerId] = (pr, r)
636650
limit.dec()
637-
for (_, r) in s.values():
638-
rdv.save(ns, peer, r, false)
651+
if ns.isSome():
652+
for (_, r) in s.values():
653+
rdv.save(ns.get(), peer, r, false)
639654

640655
for peer in peers:
641656
if limit == 0:
@@ -654,10 +669,15 @@ proc request*(
654669
return toSeq(s.values()).mapIt(it[0])
655670

656671
proc request*(
657-
rdv: RendezVous, ns: string, l: int = DiscoverLimit.int
672+
rdv: RendezVous, ns: Opt[string], l: int = DiscoverLimit.int
658673
): Future[seq[PeerRecord]] {.async: (raises: [DiscoveryError, CancelledError]).} =
659674
await rdv.request(ns, l, rdv.peers)
660675

676+
proc request*(
677+
rdv: RendezVous, l: int = DiscoverLimit.int
678+
): Future[seq[PeerRecord]] {.async: (raises: [DiscoveryError, CancelledError]).} =
679+
await rdv.request(Opt.none(string), l, rdv.peers)
680+
661681
proc unsubscribeLocally*(rdv: RendezVous, ns: string) =
662682
let nsSalted = ns & rdv.salt
663683
try:
@@ -690,7 +710,7 @@ proc unsubscribe*(
690710
for peer in peerIds:
691711
unsubscribePeer(peer)
692712

693-
discard await allFutures(futs).withTimeout(5.seconds)
713+
await allFutures(futs)
694714

695715
proc unsubscribe*(
696716
rdv: RendezVous, ns: string
@@ -786,8 +806,10 @@ proc new*(
786806
rdv.setup(switch)
787807
return rdv
788808

789-
proc deletesRegister(rdv: RendezVous) {.async: (raises: [CancelledError]).} =
790-
heartbeat "Register timeout", 1.minutes:
809+
proc deletesRegister(
810+
rdv: RendezVous, interval = 1.minutes
811+
) {.async: (raises: [CancelledError]).} =
812+
heartbeat "Register timeout", interval:
791813
let n = Moment.now()
792814
var total = 0
793815
rdv.registered.flushIfIt(it.expiration < n)

tests/testrendezvous.nim

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -58,18 +58,22 @@ suite "RendezVous":
5858
await client.start()
5959
await remoteSwitch.start()
6060
await client.connect(remoteSwitch.peerInfo.peerId, remoteSwitch.peerInfo.addrs)
61-
let res0 = await rdv.request("empty")
61+
let res0 = await rdv.request(Opt.some("empty"))
6262
check res0.len == 0
63+
6364
await rdv.advertise("foo")
64-
let res1 = await rdv.request("foo")
65+
let res1 = await rdv.request(Opt.some("foo"))
6566
check:
6667
res1.len == 1
6768
res1[0] == client.peerInfo.signedPeerRecord.data
68-
let res2 = await rdv.request("bar")
69+
70+
let res2 = await rdv.request(Opt.some("bar"))
6971
check res2.len == 0
72+
7073
await rdv.unsubscribe("foo")
71-
let res3 = await rdv.request("foo")
74+
let res3 = await rdv.request(Opt.some("foo"))
7275
check res3.len == 0
76+
7377
await allFutures(client.stop(), remoteSwitch.stop())
7478

7579
asyncTest "Harder remote test":
@@ -88,17 +92,21 @@ suite "RendezVous":
8892
)
8993
await allFutures(rdvSeq.mapIt(it.advertise("foo")))
9094
var data = clientSeq.mapIt(it.peerInfo.signedPeerRecord.data)
91-
let res1 = await rdvSeq[0].request("foo", 5)
95+
let res1 = await rdvSeq[0].request(Opt.some("foo"), 5)
9296
check res1.len == 5
9397
for d in res1:
9498
check d in data
9599
data.keepItIf(it notin res1)
96-
let res2 = await rdvSeq[0].request("foo")
100+
let res2 = await rdvSeq[0].request(Opt.some("foo"))
97101
check res2.len == 5
98102
for d in res2:
99103
check d in data
100-
let res3 = await rdvSeq[0].request("foo")
104+
let res3 = await rdvSeq[0].request(Opt.some("foo"))
101105
check res3.len == 0
106+
let res4 = await rdvSeq[0].request()
107+
check res4.len == 11
108+
let res5 = await rdvSeq[0].request(Opt.none(string))
109+
check res5.len == 11
102110
await remoteSwitch.stop()
103111
await allFutures(clientSeq.mapIt(it.stop()))
104112

@@ -116,9 +124,9 @@ suite "RendezVous":
116124
await clientA.connect(remoteSwitch.peerInfo.peerId, remoteSwitch.peerInfo.addrs)
117125
await clientB.connect(remoteSwitch.peerInfo.peerId, remoteSwitch.peerInfo.addrs)
118126
await rdvA.advertise("foo")
119-
let res1 = await rdvA.request("foo")
127+
let res1 = await rdvA.request(Opt.some("foo"))
120128
await rdvB.advertise("foo")
121-
let res2 = await rdvA.request("foo")
129+
let res2 = await rdvA.request(Opt.some("foo"))
122130
check:
123131
res2.len == 1
124132
res2[0] == clientB.peerInfo.signedPeerRecord.data
@@ -129,11 +137,11 @@ suite "RendezVous":
129137
rdv = RendezVous.new(minDuration = 1.minutes, maxDuration = 72.hours)
130138
switch = createSwitch(rdv)
131139
expect AdvertiseError:
132-
discard await rdv.request("A".repeat(300))
140+
discard await rdv.request(Opt.some("A".repeat(300)))
133141
expect AdvertiseError:
134-
discard await rdv.request("A", -1)
142+
discard await rdv.request(Opt.some("A"), -1)
135143
expect AdvertiseError:
136-
discard await rdv.request("A", 3000)
144+
discard await rdv.request(Opt.some("A"), 3000)
137145
expect AdvertiseError:
138146
await rdv.advertise("A".repeat(300))
139147
expect AdvertiseError:

0 commit comments

Comments
 (0)