Skip to content

Commit 9bb5b00

Browse files
authored
graphsync responder (#484)
Signed-off-by: turuslan <[email protected]>
1 parent 11af630 commit 9bb5b00

File tree

13 files changed

+121
-27
lines changed

13 files changed

+121
-27
lines changed

core/data_transfer/dt.cpp

Lines changed: 37 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99

1010
#include "common/libp2p/cbor_stream.hpp"
1111
#include "common/ptr.hpp"
12-
#include "storage/ipld/traverser.hpp"
1312

1413
#define MOVE(x) \
1514
x { \
@@ -80,26 +79,42 @@ namespace fc::data_transfer {
8079
push.on_begin(res.is_accepted);
8180
push.on_begin = {};
8281
if (res.is_accepted) {
83-
storage::ipld::traverser::Traverser t{
84-
*push.ipld, req.root_cid, CborRaw{req.selector}, true};
85-
gsns::Response res{
86-
gsns::ResponseStatusCode::RS_FULL_CONTENT, {}, {}};
87-
if (auto _cids{t.traverseAll()}) {
88-
auto ok{true};
89-
for (auto &cid : _cids.value()) {
90-
if (auto _data{push.ipld->get(cid)}) {
91-
res.data.push_back(
92-
{std::move(cid), std::move(_data.value())});
93-
} else {
94-
ok = false;
95-
break;
96-
}
97-
}
98-
if (ok) {
99-
return dt->gs->postResponse(pgsid, res);
100-
}
101-
}
102-
push.on_end(false);
82+
push.traverser.emplace(Traverser{
83+
*push.ipld,
84+
req.root_cid,
85+
CborRaw{req.selector},
86+
true,
87+
});
88+
dt->gs->postBlocks(
89+
pgsid,
90+
[=, &push](bool ok) -> boost::optional<gsns::Response> {
91+
gsns::Response res;
92+
if (ok) {
93+
if (push.traverser->isCompleted()) {
94+
res.status =
95+
gsns::ResponseStatusCode::RS_FULL_CONTENT;
96+
return res;
97+
}
98+
if (auto _cid{push.traverser->advance()}) {
99+
auto &cid{_cid.value()};
100+
if (auto _data{push.ipld->get(cid)}) {
101+
res.status =
102+
gsns::ResponseStatusCode::RS_PARTIAL_RESPONSE;
103+
res.data.emplace_back(gsns::Data{
104+
std::move(cid), std::move(_data.value())});
105+
return res;
106+
}
107+
}
108+
}
109+
push.on_end(false);
110+
dt->pushing_out.erase(it);
111+
if (ok) {
112+
res.status = gsns::ResponseStatusCode::RS_REJECTED;
113+
return res;
114+
}
115+
return boost::none;
116+
});
117+
return;
103118
}
104119
} else {
105120
push.on_end(false);
@@ -134,7 +149,7 @@ namespace fc::data_transfer {
134149
pushing_out.emplace(
135150
PeerDtId{peer.id, dtid},
136151
PushingOut{
137-
root, std::move(ipld), std::move(on_begin), std::move(on_end)});
152+
root, std::move(ipld), std::move(on_begin), std::move(on_end), {}});
138153
dtSend(peer,
139154
DataTransferRequest{
140155
root,

core/data_transfer/dt.hpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
#include "data_transfer/message.hpp"
99
#include "fwd.hpp"
1010
#include "storage/ipfs/graphsync/graphsync.hpp"
11+
#include "storage/ipld/traverser.hpp"
1112

1213
namespace fc::data_transfer {
1314
struct PeerDtId;
@@ -26,6 +27,7 @@ namespace fc::data_transfer {
2627
using gsns::Graphsync;
2728
using libp2p::Host;
2829
using libp2p::peer::PeerId;
30+
using storage::ipld::traverser::Traverser;
2931
using PeerGsId = gsns::FullRequestId;
3032

3133
using DtId = uint64_t;
@@ -53,6 +55,7 @@ namespace fc::data_transfer {
5355
CID root;
5456
IpldPtr ipld;
5557
OkCb on_begin, on_end;
58+
boost::optional<Traverser> traverser;
5659
};
5760

5861
static gsns::Extension makeExt(const DataTransferMessage &msg);

core/storage/ipfs/graphsync/graphsync.hpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,8 @@ namespace fc::storage::ipfs::graphsync {
101101
std::vector<Data> data;
102102
};
103103

104+
using Responder = std::function<boost::optional<Response>(bool)>;
105+
104106
/// Graphsync protocol interface
105107
class Graphsync {
106108
public:
@@ -126,6 +128,8 @@ namespace fc::storage::ipfs::graphsync {
126128
virtual void postResponse(const FullRequestId &id,
127129
const Response &response) = 0;
128130

131+
virtual void postBlocks(const FullRequestId &id, Responder responder) = 0;
132+
129133
/// Starts instance and subscribes to blocks
130134
virtual void start() = 0;
131135

core/storage/ipfs/graphsync/impl/graphsync_impl.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,10 @@ namespace fc::storage::ipfs::graphsync {
6060
network_->sendResponse(id, response);
6161
}
6262

63+
void GraphsyncImpl::postBlocks(const FullRequestId &id, Responder responder) {
64+
network_->postBlocks(id, std::move(responder));
65+
}
66+
6367
void GraphsyncImpl::start() {
6468
if (started_) {
6569
return;

core/storage/ipfs/graphsync/impl/graphsync_impl.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ namespace fc::storage::ipfs::graphsync {
4444
std::string extension_name) override;
4545
void postResponse(const FullRequestId &id,
4646
const Response &response) override;
47+
void postBlocks(const FullRequestId &id, Responder responder) override;
4748
void start() override;
4849
void stop() override;
4950
Subscription makeRequest(const libp2p::peer::PeerInfo &peer,

core/storage/ipfs/graphsync/impl/network/network.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,13 @@ namespace fc::storage::ipfs::graphsync {
119119
ctx->sendResponse(id, response);
120120
}
121121

122+
void Network::postBlocks(const FullRequestId &id, Responder responder) {
123+
if (auto ctx{findContext(id.peer, false)}) {
124+
return ctx->postBlocks(id.id, std::move(responder));
125+
}
126+
responder(false);
127+
}
128+
122129
void Network::peerClosed(const PeerId &peer, ResponseStatusCode status) {
123130
auto it = peers_.find(peer);
124131
if (it != peers_.end()) {

core/storage/ipfs/graphsync/impl/network/network.hpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ namespace fc::storage::ipfs::graphsync {
5959
/// Sends response to peer.
6060
void sendResponse(const FullRequestId &id, const Response &response);
6161

62+
void postBlocks(const FullRequestId &id, Responder responder);
63+
6264
private:
6365
/// Callback from peer context that it's closed
6466
/// \param peer peer ID

core/storage/ipfs/graphsync/impl/network/outbound_endpoint.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,4 +105,11 @@ namespace fc::storage::ipfs::graphsync {
105105
}
106106
}
107107

108+
bool OutboundEndpoint::empty() const {
109+
if (queue_) {
110+
auto state{queue_->getState()};
111+
return state.pending_bytes == 0 && state.writing_bytes == 0;
112+
}
113+
return pending_bytes_ == 0;
114+
}
108115
} // namespace fc::storage::ipfs::graphsync

core/storage/ipfs/graphsync/impl/network/outbound_endpoint.hpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ namespace fc::storage::ipfs::graphsync {
4343
/// Clears all pending messages
4444
void clearPendingMessages();
4545

46+
bool empty() const;
47+
4648
private:
4749
/// Adds data block to response. Doesn't send unless sending partial
4850
/// response is needed

core/storage/ipfs/graphsync/impl/network/peer_context.cpp

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,13 +221,26 @@ namespace fc::storage::ipfs::graphsync {
221221
}
222222
}
223223

224+
void PeerContext::postBlocks(RequestId request_id, Responder responder) {
225+
connectIfNeeded();
226+
responders_.emplace(request_id, responder);
227+
if (outbound_endpoint_->empty()) {
228+
checkResponders();
229+
}
230+
}
231+
224232
void PeerContext::close(ResponseStatusCode status) {
225233
if (closed_) {
226234
return;
227235
}
228236

229237
logger()->debug("close peer={} status={}", str, statusCodeToString(status));
230238

239+
for (auto &p : responders_) {
240+
p.second(false);
241+
}
242+
responders_.clear();
243+
231244
close_status_ = status;
232245
closed_ = true;
233246
while (!streams_.empty()) {
@@ -381,6 +394,8 @@ namespace fc::storage::ipfs::graphsync {
381394
return;
382395
}
383396

397+
checkResponders();
398+
384399
shiftExpireTime(stream);
385400
}
386401

@@ -427,4 +442,20 @@ namespace fc::storage::ipfs::graphsync {
427442
}
428443
}
429444

445+
void PeerContext::checkResponders() {
446+
auto it{responders_.begin()};
447+
while (it != responders_.end()) {
448+
auto &[id, cb]{*it};
449+
auto res{cb(true)};
450+
if (res) {
451+
sendResponse({peer, id}, *res);
452+
}
453+
if (!res || isTerminal(res->status)) {
454+
it = responders_.erase(it);
455+
}
456+
if (res) {
457+
break;
458+
}
459+
}
460+
}
430461
} // namespace fc::storage::ipfs::graphsync

0 commit comments

Comments
 (0)