Skip to content

Commit c03bf7c

Browse files
authored
feat: add scheduler support for timeout management in Multiselect (#312)
* feat: add scheduler support for timeout management in Multiselect * fix: pass scheduler to Multiselect for improved timeout management
1 parent f63fe0c commit c03bf7c

File tree

5 files changed

+59
-5
lines changed

5 files changed

+59
-5
lines changed

include/libp2p/protocol_muxer/multiselect.hpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,10 @@
1111

1212
#include "protocol_muxer.hpp"
1313

14+
namespace libp2p::basic {
15+
class Scheduler;
16+
}
17+
1418
namespace libp2p::protocol_muxer::multiselect {
1519

1620
class MultiselectInstance;
@@ -20,6 +24,8 @@ namespace libp2p::protocol_muxer::multiselect {
2024
public:
2125
using Instance = std::shared_ptr<MultiselectInstance>;
2226

27+
explicit Multiselect(std::shared_ptr<basic::Scheduler> scheduler);
28+
2329
~Multiselect() override = default;
2430

2531
/// Implements ProtocolMuxer API
@@ -45,6 +51,9 @@ namespace libp2p::protocol_muxer::multiselect {
4551
/// Returns instance either from cache or creates a new one
4652
Instance getInstance();
4753

54+
/// Scheduler for timeout management
55+
std::shared_ptr<basic::Scheduler> scheduler_;
56+
4857
/// Active instances, keep them here to hold shared ptrs alive
4958
std::unordered_set<Instance> active_instances_;
5059

include/libp2p/protocol_muxer/multiselect/multiselect_instance.hpp

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,18 @@
66

77
#pragma once
88

9+
#include <libp2p/basic/scheduler.hpp>
910
#include <libp2p/protocol_muxer/multiselect.hpp>
1011
#include "parser.hpp"
1112

1213
namespace soralog {
1314
class Logger;
1415
}
1516

17+
namespace libp2p::basic {
18+
class Scheduler;
19+
}
20+
1621
namespace libp2p::protocol_muxer::multiselect {
1722

1823
class Multiselect;
@@ -21,7 +26,8 @@ namespace libp2p::protocol_muxer::multiselect {
2126
class MultiselectInstance
2227
: public std::enable_shared_from_this<MultiselectInstance> {
2328
public:
24-
explicit MultiselectInstance(Multiselect &owner);
29+
explicit MultiselectInstance(Multiselect &owner,
30+
std::shared_ptr<basic::Scheduler> scheduler);
2531

2632
/// Implements ProtocolMuxer API
2733
void selectOneOf(std::span<const peer::ProtocolName> protocols,
@@ -74,6 +80,9 @@ namespace libp2p::protocol_muxer::multiselect {
7480
/// Handles "na" reply, client-specific
7581
MaybeResult handleNA();
7682

83+
/// Handles timeout when negotiation takes too long
84+
void onTimeout();
85+
7786
/// Owner of this object, needed for reuse of instances
7887
Multiselect &owner_;
7988

@@ -125,6 +134,12 @@ namespace libp2p::protocol_muxer::multiselect {
125134

126135
/// Cache: serialized NA response
127136
boost::optional<Packet> na_response_;
137+
138+
/// Scheduler for timeout handling
139+
std::shared_ptr<basic::Scheduler> scheduler_;
140+
141+
/// Timeout handle for negotiation timeout
142+
basic::Scheduler::Handle timeout_handle_;
128143
};
129144

130145
} // namespace libp2p::protocol_muxer::multiselect

src/protocol_muxer/multiselect.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
* SPDX-License-Identifier: Apache-2.0
55
*/
66

7+
#include <libp2p/basic/scheduler.hpp>
78
#include <libp2p/log/logger.hpp>
89
#include <libp2p/protocol_muxer/multiselect/multiselect_instance.hpp>
910
#include <libp2p/protocol_muxer/multiselect/simple_stream_negotiate.hpp>
@@ -21,6 +22,9 @@ namespace libp2p::protocol_muxer::multiselect {
2122
constexpr size_t kMaxCacheSize = 8;
2223
} // namespace
2324

25+
Multiselect::Multiselect(std::shared_ptr<basic::Scheduler> scheduler)
26+
: scheduler_(std::move(scheduler)) {}
27+
2428
void Multiselect::selectOneOf(std::span<const peer::ProtocolName> protocols,
2529
std::shared_ptr<basic::ReadWriter> connection,
2630
bool is_initiator,
@@ -62,7 +66,7 @@ namespace libp2p::protocol_muxer::multiselect {
6266
Multiselect::Instance Multiselect::getInstance() {
6367
Instance instance;
6468
if (cache_.empty()) {
65-
instance = std::make_shared<MultiselectInstance>(*this);
69+
instance = std::make_shared<MultiselectInstance>(*this, scheduler_);
6670
} else {
6771
SL_TRACE(log(),
6872
"cache: {}->{}, active {}->{}",

src/protocol_muxer/multiselect/multiselect_instance.cpp

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@
77
#include <libp2p/protocol_muxer/multiselect/multiselect_instance.hpp>
88

99
#include <cctype>
10+
#include <span>
1011

12+
#include <libp2p/basic/scheduler.hpp>
1113
#include <libp2p/basic/write_return_size.hpp>
1214
#include <libp2p/common/trace.hpp>
1315
#include <libp2p/protocol_muxer/multiselect/serializing.hpp>
@@ -20,10 +22,14 @@ namespace libp2p::protocol_muxer::multiselect {
2022
static log::Logger logger = log::createLogger("Multiselect");
2123
return logger;
2224
}
25+
26+
/// Timeout for protocol negotiation (5 seconds)
27+
constexpr std::chrono::milliseconds kNegotiationTimeout{5000};
2328
} // namespace
2429

25-
MultiselectInstance::MultiselectInstance(Multiselect &owner)
26-
: owner_(owner) {}
30+
MultiselectInstance::MultiselectInstance(
31+
Multiselect &owner, std::shared_ptr<basic::Scheduler> scheduler)
32+
: owner_(owner), scheduler_(std::move(scheduler)) {}
2733

2834
void MultiselectInstance::selectOneOf(
2935
std::span<const peer::ProtocolName> protocols,
@@ -62,6 +68,15 @@ namespace libp2p::protocol_muxer::multiselect {
6268
write_queue_.clear();
6369
is_writing_ = false;
6470

71+
// Schedule timeout for negotiation
72+
timeout_handle_ = scheduler_->scheduleWithHandle(
73+
[wptr = std::weak_ptr<MultiselectInstance>(shared_from_this())]() {
74+
if (auto self = wptr.lock()) {
75+
self->onTimeout();
76+
}
77+
},
78+
kNegotiationTimeout);
79+
6580
if (is_initiator_) {
6681
std::ignore = sendProposal();
6782
} else if (negotiate_multiselect) {
@@ -166,6 +181,10 @@ namespace libp2p::protocol_muxer::multiselect {
166181

167182
void MultiselectInstance::close(outcome::result<std::string> result) {
168183
closed_ = true;
184+
185+
// Cancel timeout if it's still active
186+
timeout_handle_.reset();
187+
169188
++current_round_;
170189
write_queue_.clear();
171190
Multiselect::ProtocolHandlerFunc callback;
@@ -338,4 +357,11 @@ namespace libp2p::protocol_muxer::multiselect {
338357
return MaybeResult(ProtocolMuxer::Error::PROTOCOL_VIOLATION);
339358
}
340359

360+
void MultiselectInstance::onTimeout() {
361+
SL_DEBUG(log(),
362+
"Protocol negotiation timeout after {}ms",
363+
kNegotiationTimeout.count());
364+
close(ProtocolMuxer::Error::NEGOTIATION_FAILED);
365+
}
366+
341367
} // namespace libp2p::protocol_muxer::multiselect

test/acceptance/p2p/host/peer/test_peer.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ Peer::sptr<host::BasicHost> Peer::makeHost(const crypto::KeyPair &keyPair) {
136136
std::make_shared<peer::IdentityManagerImpl>(keyPair, key_marshaller);
137137

138138
auto multiselect =
139-
std::make_shared<protocol_muxer::multiselect::Multiselect>();
139+
std::make_shared<protocol_muxer::multiselect::Multiselect>(scheduler_);
140140

141141
auto router = std::make_shared<network::RouterImpl>();
142142

0 commit comments

Comments
 (0)