Skip to content

Commit 595195e

Browse files
authored
stream open queue (#488)
Signed-off-by: turuslan <[email protected]>
1 parent e1d8d1f commit 595195e

File tree

13 files changed

+207
-135
lines changed

13 files changed

+207
-135
lines changed

core/api/full_node/make.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -667,6 +667,7 @@ namespace fc::api {
667667
vm::message::MessageSignerImpl{key_store}.sign(message.from,
668668
message));
669669
OUTCOME_TRY(mpool->add(signed_message));
670+
spdlog::info("MpoolPushMessage {}", signed_message.getCid());
670671
return std::move(signed_message);
671672
};
672673
api->MpoolSelect = [=](auto &tsk, auto ticket_quality)

core/common/libp2p/CMakeLists.txt

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,3 +10,10 @@ add_library(cbor_stream
1010
target_link_libraries(cbor_stream
1111
cbor
1212
)
13+
14+
add_library(stream_open_queue
15+
stream_open_queue.cpp
16+
)
17+
target_link_libraries(stream_open_queue
18+
p2p::p2p
19+
)
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/**
2+
* Copyright Soramitsu Co., Ltd. All Rights Reserved.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
#include "common/libp2p/stream_open_queue.hpp"
7+
8+
namespace libp2p::connection {
9+
StreamOpenQueue::Active::Active(std::shared_ptr<Stream> stream,
10+
std::weak_ptr<StreamOpenQueue> weak,
11+
List::iterator it)
12+
: StreamProxy{std::move(stream)},
13+
weak{std::move(weak)},
14+
it{std::move(it)} {}
15+
16+
StreamOpenQueue::Active::~Active() {
17+
stream->close([stream{stream}](auto) { stream->reset(); });
18+
if (auto queue{weak.lock()}) {
19+
queue->active.erase(it);
20+
queue->check();
21+
}
22+
}
23+
24+
StreamOpenQueue::StreamOpenQueue(std::shared_ptr<Host> host,
25+
size_t max_active)
26+
: host{std::move(host)}, max_active{max_active} {
27+
assert(max_active >= 1);
28+
}
29+
30+
void StreamOpenQueue::open(Pending item) {
31+
queue.emplace(std::move(item));
32+
check();
33+
}
34+
35+
void StreamOpenQueue::check() {
36+
while (!queue.empty() && active.size() < max_active) {
37+
auto item{std::move(queue.front())};
38+
queue.pop();
39+
auto it{active.insert(active.begin(), Active::List::value_type{})};
40+
host->newStream(item.peer,
41+
item.protocol,
42+
[weak{weak_from_this()},
43+
cb{std::move(item.cb)},
44+
it{std::move(it)}](Host::StreamResult _stream) {
45+
if (auto self{weak.lock()}) {
46+
if (!_stream) {
47+
self->active.erase(it);
48+
self->check();
49+
return cb(_stream);
50+
}
51+
auto stream{std::make_shared<Active>(
52+
std::move(_stream.value()), weak, it)};
53+
*it = stream;
54+
cb(std::move(stream));
55+
}
56+
});
57+
}
58+
}
59+
} // namespace libp2p::connection
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/**
2+
* Copyright Soramitsu Co., Ltd. All Rights Reserved.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
#pragma once
7+
8+
#include <libp2p/host/host.hpp>
9+
#include <list>
10+
#include <queue>
11+
12+
#include "common/libp2p/stream_proxy.hpp"
13+
14+
namespace libp2p::connection {
15+
struct StreamOpenQueue : std::enable_shared_from_this<StreamOpenQueue> {
16+
struct Active : StreamProxy {
17+
using List = std::list<std::weak_ptr<Active>>;
18+
19+
std::weak_ptr<StreamOpenQueue> weak;
20+
List::iterator it;
21+
22+
Active(std::shared_ptr<Stream> stream,
23+
std::weak_ptr<StreamOpenQueue> weak,
24+
List::iterator it);
25+
~Active() override;
26+
};
27+
28+
struct Pending {
29+
peer::PeerInfo peer;
30+
peer::Protocol protocol;
31+
Host::StreamResultHandler cb;
32+
};
33+
34+
std::shared_ptr<Host> host;
35+
size_t max_active;
36+
std::queue<Pending> queue;
37+
Active::List active;
38+
39+
StreamOpenQueue(std::shared_ptr<Host> host, size_t max_active);
40+
41+
void open(Pending item);
42+
43+
void check();
44+
};
45+
} // namespace libp2p::connection

core/data_transfer/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,5 @@ add_library(data_transfer
99
target_link_libraries(data_transfer
1010
cbor_stream
1111
graphsync
12+
stream_open_queue
1213
)

core/data_transfer/dt.cpp

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ size_t std::hash<fc::data_transfer::PeerDtId>::operator()(
2424
}
2525

2626
namespace fc::data_transfer {
27+
constexpr size_t kStreamOpenMax{20};
28+
2729
using common::libp2p::CborStream;
2830
using libp2p::protocol::Subscription;
2931
using storage::ipld::kAllSelector;
@@ -52,6 +54,7 @@ namespace fc::data_transfer {
5254
std::shared_ptr<Host> host, std::shared_ptr<Graphsync> gs) {
5355
auto dt{std::make_shared<DataTransfer>()};
5456
dt->host = host;
57+
dt->streams = std::make_shared<StreamOpenQueue>(host, kStreamOpenMax);
5558
dt->gs = gs;
5659
gs->setRequestHandler(
5760
[_dt{weaken(dt)}](auto pgsid, auto req) {
@@ -356,11 +359,15 @@ namespace fc::data_transfer {
356359

357360
void DataTransfer::dtSend(const PeerInfo &peer,
358361
const DataTransferMessage &msg) {
359-
host->newStream(peer, kProtocol, [msg](auto _s) {
360-
if (_s) {
361-
auto s{std::make_shared<CborStream>(_s.value())};
362-
s->write(msg, [s](auto) { s->close(); });
363-
}
362+
streams->open({
363+
peer,
364+
kProtocol,
365+
[msg](Host::StreamResult _stream) {
366+
if (_stream) {
367+
auto stream{std::make_shared<CborStream>(_stream.value())};
368+
stream->write(msg, [stream](outcome::result<size_t>) {});
369+
}
370+
},
364371
});
365372
}
366373
} // namespace fc::data_transfer

core/data_transfer/dt.hpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
#pragma once
77

8+
#include "common/libp2p/stream_open_queue.hpp"
89
#include "data_transfer/message.hpp"
910
#include "fwd.hpp"
1011
#include "storage/ipfs/graphsync/graphsync.hpp"
@@ -26,6 +27,7 @@ namespace fc::data_transfer {
2627

2728
using gsns::Graphsync;
2829
using libp2p::Host;
30+
using libp2p::connection::StreamOpenQueue;
2931
using libp2p::peer::PeerId;
3032
using storage::ipld::traverser::Traverser;
3133
using PeerGsId = gsns::FullRequestId;
@@ -94,6 +96,7 @@ namespace fc::data_transfer {
9496
void dtSend(const PeerInfo &peer, const DataTransferMessage &msg);
9597

9698
std::shared_ptr<Host> host;
99+
std::shared_ptr<StreamOpenQueue> streams;
97100
std::shared_ptr<Graphsync> gs;
98101
std::map<std::string, OnPush> on_push;
99102
std::map<std::string, OnPull> on_pull;

core/markets/retrieval/client/impl/retrieval_client_impl.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,9 +112,10 @@ namespace fc::markets::retrieval::client {
112112
auto unseal{res.status == DealStatus::kDealStatusFundsNeededUnseal};
113113
const auto last{res.status
114114
== DealStatus::kDealStatusFundsNeededLastPayment};
115+
const auto pay{res.status == DealStatus::kDealStatusFundsNeeded};
115116
const auto done{res.status == DealStatus::kDealStatusCompleted};
116117
const auto accepted{res.status == DealStatus::kDealStatusAccepted};
117-
deal->accepted = unseal || accepted || last || done;
118+
deal->accepted = unseal || accepted || pay || last || done;
118119
if (!deal->accepted) {
119120
deal->handler(
120121
res.status == DealStatus::kDealStatusRejected

core/markets/storage/client/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,4 +21,5 @@ target_link_libraries(storage_market_client
2121
tipset
2222
memory_indexed_car
2323
storage_market_import_manager
24+
stream_open_queue
2425
)

0 commit comments

Comments
 (0)