Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 7 additions & 9 deletions libp2p/protocols/kademlia/find.nim
Original file line number Diff line number Diff line change
Expand Up @@ -120,14 +120,12 @@ proc dispatchFindNode*(
peer: PeerId,
target: Key,
addrs: Opt[seq[MultiAddress]] = Opt.none(seq[MultiAddress]),
): Future[Opt[Message]] {.async: (raises: [CancelledError, LPStreamError]), gcsafe.} =
): Future[Opt[Message]] {.
async: (raises: [CancelledError, DialFailedError, ValueError, LPStreamError]),
gcsafe
.} =
let addrs = addrs.valueOr(kad.switch.peerStore[AddressBook][peer])
let conn =
try:
await kad.switch.dial(peer, addrs, kad.codec)
except DialFailedError as e:
error "FindNode could not dial peer", description = e.msg
return Opt.none(Message)
let conn = await kad.switch.dial(peer, addrs, kad.codec)
defer:
await conn.close()

Expand All @@ -149,8 +147,7 @@ proc dispatchFindNode*(
)

let reply = Message.decode(replyBuf).valueOr:
debug "FindNode reply decode fail", error = error, conn = conn
return Opt.none(Message)
raise newException(ValueError, "FindNode reply decode fail")

if reply.closerPeers.len > 0:
kad_responses_with_closer_peers.inc(labelValues = [$MessageType.findNode])
Expand Down Expand Up @@ -205,6 +202,7 @@ proc iterativeLookup*(
continue
state.responded[peerId] =
if fut.failed(): RespondedStatus.Failed else: RespondedStatus.Success
# TODO: treat valueerrors or dialfailederrors

for (peerId, msg) in completedRPCBatch:
msg.withValue(reply):
Expand Down
13 changes: 5 additions & 8 deletions libp2p/protocols/kademlia/get.nim
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,11 @@ logScope:

proc dispatchGetVal*(
kad: KadDHT, peer: PeerId, key: Key
): Future[Opt[Message]] {.async: (raises: [CancelledError, LPStreamError]), gcsafe.} =
): Future[Opt[Message]] {.
async: (raises: [CancelledError, DialFailedError, LPStreamError]), gcsafe
.} =
let conn =
try:
await kad.switch.dial(peer, kad.switch.peerStore[AddressBook][peer], kad.codec)
except DialFailedError as e:
error "GetValue could not dial peer", description = e.msg
return Opt.none(Message)
await kad.switch.dial(peer, kad.switch.peerStore[AddressBook][peer], kad.codec)
defer:
await conn.close()

Expand All @@ -40,8 +38,7 @@ proc dispatchGetVal*(
)

let reply = Message.decode(replyBuf).valueOr:
error "GetValue reply decode fail", error = error, conn = conn
return Opt.none(Message)
raise newException(ValueError, "GetValue reply decode fail")

if reply.closerPeers.len > 0:
kad_responses_with_closer_peers.inc(labelValues = [$MessageType.getValue])
Expand Down
11 changes: 4 additions & 7 deletions libp2p/protocols/kademlia/ping.nim
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,10 @@ import ./[protobuf, types, kademlia_metrics]

proc ping*(
kad: KadDHT, peerId: PeerId, addrs: seq[MultiAddress]
): Future[bool] {.async: (raises: [CancelledError, ValueError, LPStreamError]).} =
let conn =
try:
await kad.switch.dial(peerId, addrs, kad.codec)
except DialFailedError as e:
error "Kad ping could not dial peer", description = e.msg
return false
): Future[bool] {.
async: (raises: [CancelledError, DialFailedError, ValueError, LPStreamError])
.} =
let conn = await kad.switch.dial(peerId, addrs, kad.codec)
defer:
await conn.close()

Expand Down
23 changes: 8 additions & 15 deletions libp2p/protocols/kademlia/provider.nim
Original file line number Diff line number Diff line change
Expand Up @@ -98,13 +98,8 @@ proc addProviderRecord(pm: ProviderManager, record: ProviderRecord) =

proc dispatchAddProvider(
switch: Switch, peer: PeerId, key: Key, codec: string
) {.async: (raises: [CancelledError, LPStreamError]).} =
let conn =
try:
await switch.dial(peer, switch.peerStore[AddressBook][peer], codec)
except DialFailedError as e:
error "AddProvider could not dial peer", description = e.msg
return
) {.async: (raises: [CancelledError, DialFailedError, LPStreamError]).} =
let conn = await switch.dial(peer, switch.peerStore[AddressBook][peer], codec)
defer:
await conn.close()

Expand Down Expand Up @@ -186,13 +181,12 @@ method handleAddProvider*(

proc dispatchGetProviders*(
kad: KadDHT, peer: PeerId, key: Key
): Future[Opt[Message]] {.async: (raises: [CancelledError, LPStreamError]), gcsafe.} =
): Future[Opt[Message]] {.
async: (raises: [CancelledError, DialFailedError, ValueError, LPStreamError]),
gcsafe
.} =
let conn =
try:
await kad.switch.dial(peer, kad.switch.peerStore[AddressBook][peer], kad.codec)
except DialFailedError as e:
error "GetProviders could not dial peer", description = e.msg
return Opt.none(Message)
await kad.switch.dial(peer, kad.switch.peerStore[AddressBook][peer], kad.codec)
defer:
await conn.close()
let msg = Message(msgType: MessageType.getProviders, key: key)
Expand All @@ -213,8 +207,7 @@ proc dispatchGetProviders*(
)

let reply = Message.decode(replyBuf).valueOr:
error "GetProviders reply decode fail", error = error, conn = conn
return Opt.none(Message)
raise newException(ValueError, "GetProviders reply decode fail")

if reply.closerPeers.len > 0:
kad_responses_with_closer_peers.inc(labelValues = [$MessageType.getProviders])
Expand Down
16 changes: 5 additions & 11 deletions libp2p/protocols/kademlia/put.nim
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,8 @@ proc isBestValue(kad: KadDHT, key: Key, record: EntryRecord): bool =

proc dispatchPutVal*(
switch: Switch, peer: PeerId, key: Key, value: seq[byte], codec: string
) {.async: (raises: [CancelledError, LPStreamError]).} =
let conn =
try:
await switch.dial(peer, switch.peerStore[AddressBook][peer], codec)
except DialFailedError as e:
error "PutVal could not dial peer", description = e.msg
return
) {.async: (raises: [CancelledError, DialFailedError, LPStreamError, ValueError]).} =
let conn = await switch.dial(peer, switch.peerStore[AddressBook][peer], codec)
defer:
await conn.close()
let msg = Message(
Expand All @@ -52,15 +47,14 @@ proc dispatchPutVal*(
)

let reply = Message.decode(replyBuf).valueOr:
error "PutValue reply decode fail", error = error, conn = conn
return
raise newException(ValueError, "PutValue reply decode fail")

debug "Got PutValue reply", msg = msg, reply = reply, conn = conn

if reply != msg:
error "Unexpected change between msg and reply: ",
msg = msg, reply = reply, conn = conn

debug "Got PutValue reply", msg = msg, reply = reply, conn = conn

proc putValue*(
kad: KadDHT, key: Key, value: seq[byte]
): Future[Result[void, string]] {.async: (raises: [CancelledError]), gcsafe.} =
Expand Down
Loading