Skip to content

Commit 06a70df

Browse files
authored
Hotfix/streams (#55)
* hotfixes to yamux and dialer * streams hotfix: yamux update * unneeded comment removed * unndeded std::forward changed * one minor cosmetic fix
1 parent c04e76d commit 06a70df

File tree

17 files changed

+386
-170
lines changed

17 files changed

+386
-170
lines changed

include/libp2p/common/trace.hpp

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/**
2+
* Copyright Soramitsu Co., Ltd. All Rights Reserved.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
#include <libp2p/common/logger.hpp>
7+
8+
#ifndef LIBP2P_COMMON_TRACE_HPP
9+
#define LIBP2P_COMMON_TRACE_HPP
10+
11+
namespace libp2p::common {
12+
13+
/**
14+
* Special debug utility function, allows for not having logger as member
15+
* field
16+
*/
17+
template <typename... Args>
18+
inline void traceToDebugLogger(spdlog::string_view_t fmt,
19+
const Args &... args) {
20+
static const std::string tag("debug");
21+
auto log = common::createLogger(tag);
22+
log->trace(fmt, args...);
23+
}
24+
25+
} // namespace libp2p::common
26+
27+
#if TRACE_ENABLED
28+
#define TRACE(...) libp2p::common::traceToDebugLogger(__VA_ARGS__)
29+
#else
30+
#define TRACE(...)
31+
#endif
32+
33+
#endif // LIBP2P_COMMON_TRACE_HPP

include/libp2p/muxer/yamux/yamux_stream.hpp

Lines changed: 49 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,10 @@ namespace libp2p::connection {
9090
void write(gsl::span<const uint8_t> in, size_t bytes, WriteCallbackFunc cb,
9191
bool some);
9292

93+
/// this stream's connection
9394
std::weak_ptr<YamuxedConnection> yamuxed_connection_;
95+
96+
/// id of this stream
9497
YamuxedConnection::StreamId stream_id_;
9598

9699
/// is the stream opened for reads?
@@ -99,8 +102,10 @@ namespace libp2p::connection {
99102
/// is the stream opened for writes?
100103
bool is_writable_ = true;
101104

102-
/// default sliding window size of the stream - how much unread bytes can be
103-
/// on both sides
105+
/**
106+
* default sliding window size of the stream - how much unread bytes can be
107+
* on both sides
108+
*/
104109
static constexpr uint32_t kDefaultWindowSize = 256 * 1024; // in bytes
105110

106111
/// how much unacked bytes can we have on our side
@@ -118,9 +123,48 @@ namespace libp2p::connection {
118123
/// is the stream reading right now?
119124
bool is_reading_ = false;
120125

126+
/// read callback, non-zero during async data receive
127+
ReadCallbackFunc read_cb_;
128+
129+
/// client's read buffer
130+
gsl::span<uint8_t> external_read_buffer_;
131+
132+
/// bytes count client is waiting for, non-zero during async data receive
133+
size_t bytes_waiting_ = 0;
134+
135+
/// client makes readSome operation
136+
bool reading_some_ = false;
137+
138+
/// starts async read operation
139+
void beginRead(ReadCallbackFunc cb, gsl::span<uint8_t> out, size_t bytes,
140+
bool some);
141+
142+
/// ends async read operation
143+
void endRead(outcome::result<size_t> result);
144+
145+
/// Tries to consume requested bytes from already received data
146+
outcome::result<size_t> tryConsumeReadBuffer(gsl::span<uint8_t> out,
147+
size_t bytes, bool some);
148+
149+
/**
150+
* Forwards read buffer and receive window and acknowledges bytes received
151+
* in async manner
152+
* @param bytes number of bytes to ack
153+
*/
154+
void sendAck(size_t bytes);
155+
121156
/// is the stream writing right now?
122157
bool is_writing_ = false;
123158

159+
/// write callback, non-zero during async sends
160+
WriteCallbackFunc write_cb_;
161+
162+
/// starts async write operation
163+
void beginWrite(WriteCallbackFunc cb);
164+
165+
/// ends async write operation
166+
void endWrite(outcome::result<size_t> result);
167+
124168
/// YamuxedConnection API starts here
125169
friend class YamuxedConnection;
126170

@@ -137,6 +181,9 @@ namespace libp2p::connection {
137181
*/
138182
outcome::result<void> commitData(gsl::span<const uint8_t> data,
139183
size_t data_size);
184+
185+
/// Called by connection on reset
186+
void onConnectionReset(outcome::result<size_t> reason);
140187
};
141188
} // namespace libp2p::connection
142189

include/libp2p/muxer/yamux/yamuxed_connection.hpp

Lines changed: 12 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,8 @@ namespace libp2p::connection {
3939
TOO_MANY_STREAMS,
4040
FORBIDDEN_CALL,
4141
OTHER_SIDE_ERROR,
42-
INTERNAL_ERROR
42+
INTERNAL_ERROR,
43+
CLOSED_BY_PEER,
4344
};
4445

4546
/**
@@ -104,9 +105,6 @@ namespace libp2p::connection {
104105
// indicates whether start() has been executed or not
105106
bool started_ = false;
106107

107-
// XXX
108-
bool new_stream_pending_ = false;
109-
110108
/**
111109
* Write message to the connection; ensures no more than one wright
112110
* would be executed at one time
@@ -169,7 +167,7 @@ namespace libp2p::connection {
169167
/**
170168
* Reset all streams, which were created over this connection
171169
*/
172-
void resetAllStreams();
170+
void resetAllStreams(outcome::result<void> reason);
173171

174172
/**
175173
* Find stream with such id in local streams
@@ -236,13 +234,21 @@ namespace libp2p::connection {
236234
/**
237235
* Close this Yamux session
238236
*/
239-
void closeSession();
237+
void closeSession(outcome::result<void> reason);
240238

239+
/// Underlying connection
241240
std::shared_ptr<SecureConnection> connection_;
241+
242+
/// Handler for new inbound streams
242243
NewStreamHandlerFunc new_stream_handler_;
244+
245+
/// Config constants
243246
muxer::MuxedConnectionConfig config_;
244247

248+
/// Last stream id to be incremented
245249
uint32_t last_created_stream_id_;
250+
251+
/// Streams
246252
std::unordered_map<StreamId, std::shared_ptr<YamuxStream>> streams_;
247253

248254
libp2p::common::Logger log_ = libp2p::common::createLogger("yx-conn");
@@ -265,18 +271,6 @@ namespace libp2p::connection {
265271
void streamOnWindowUpdate(StreamId stream_id, NotifyeeCallback cb);
266272
std::map<StreamId, NotifyeeCallback> window_updates_subs_;
267273

268-
/**
269-
* Add a handler function, which is called, when data for a particular
270-
* stream is received
271-
* @param stream_id of the stream which is to be notified
272-
* @param handler to be called; if it returns true, it's removed from
273-
* the list of handlers for that stream
274-
* @note this is done through a function and not event emitters, as each
275-
* stream is to receive that event independently based on id
276-
*/
277-
void streamOnAddData(StreamId stream_id, NotifyeeCallback cb);
278-
std::map<StreamId, NotifyeeCallback> data_subs_;
279-
280274
/**
281275
* Write bytes to the connection; before calling this method, the stream
282276
* must ensure that no write operations are currently running

include/libp2p/network/impl/dialer_impl.hpp

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
#include <libp2p/network/connection_manager.hpp>
1010
#include <libp2p/network/dialer.hpp>
11+
#include <libp2p/network/listener_manager.hpp>
1112
#include <libp2p/network/transport_manager.hpp>
1213
#include <libp2p/protocol_muxer/protocol_muxer.hpp>
1314

@@ -19,7 +20,8 @@ namespace libp2p::network {
1920

2021
DialerImpl(std::shared_ptr<protocol_muxer::ProtocolMuxer> multiselect,
2122
std::shared_ptr<TransportManager> tmgr,
22-
std::shared_ptr<ConnectionManager> cmgr);
23+
std::shared_ptr<ConnectionManager> cmgr,
24+
std::shared_ptr<ListenerManager> listener);
2325

2426
// Establishes a connection to a given peer
2527
void dial(const peer::PeerInfo &p, DialResultFunc cb) override;
@@ -33,8 +35,7 @@ namespace libp2p::network {
3335
std::shared_ptr<protocol_muxer::ProtocolMuxer> multiselect_;
3436
std::shared_ptr<TransportManager> tmgr_;
3537
std::shared_ptr<ConnectionManager> cmgr_;
36-
37-
common::Logger log_ = common::createLogger("debug"); // XXX
38+
std::shared_ptr<ListenerManager> listener_;
3839
};
3940

4041
} // namespace libp2p::network

include/libp2p/network/impl/listener_manager_impl.hpp

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,10 @@ namespace libp2p::network {
5050

5151
Router &getRouter() override;
5252

53+
void onConnection(
54+
outcome::result<std::shared_ptr<connection::CapableConnection>> rconn)
55+
override;
56+
5357
private:
5458
bool started = false;
5559

@@ -61,9 +65,6 @@ namespace libp2p::network {
6165
std::shared_ptr<network::Router> router_;
6266
std::shared_ptr<TransportManager> tmgr_;
6367
std::shared_ptr<ConnectionManager> cmgr_;
64-
65-
void onConnection(
66-
outcome::result<std::shared_ptr<connection::CapableConnection>> rconn);
6768
};
6869

6970
} // namespace libp2p::network

include/libp2p/network/impl/network_impl.hpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ namespace libp2p::network {
1515
public:
1616
~NetworkImpl() override = default;
1717

18-
NetworkImpl(std::unique_ptr<ListenerManager> listener,
18+
NetworkImpl(std::shared_ptr<ListenerManager> listener,
1919
std::unique_ptr<Dialer> dialer,
2020
std::shared_ptr<ConnectionManager> cmgr);
2121

@@ -28,7 +28,7 @@ namespace libp2p::network {
2828
ConnectionManager &getConnectionManager() override;
2929

3030
private:
31-
std::unique_ptr<ListenerManager> listener_;
31+
std::shared_ptr<ListenerManager> listener_;
3232
std::unique_ptr<Dialer> dialer_;
3333
std::shared_ptr<ConnectionManager> cmgr_;
3434
};

include/libp2p/network/listener_manager.hpp

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@
1111
#include <libp2p/event/bus.hpp>
1212
#include <libp2p/multi/multiaddress.hpp>
1313
#include <libp2p/network/router.hpp>
14-
#include <libp2p/protocol/base_protocol.hpp>
1514
#include <libp2p/outcome/outcome.hpp>
15+
#include <libp2p/protocol/base_protocol.hpp>
1616

1717
namespace libp2p::network {
1818

@@ -117,6 +117,13 @@ namespace libp2p::network {
117117
* @brief Getter for Router.
118118
*/
119119
virtual Router &getRouter() = 0;
120+
121+
/**
122+
* @brief Allows new connections for accepting incoming streams
123+
*/
124+
virtual void onConnection(
125+
outcome::result<std::shared_ptr<connection::CapableConnection>>
126+
rconn) = 0;
120127
};
121128

122129
} // namespace libp2p::network

src/muxer/yamux/yamux_frame.cpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@
88
#include <libp2p/common/byteutil.hpp>
99
#include <libp2p/muxer/yamux/yamux_frame.hpp>
1010

11+
#define TRACE_ENABLED 0
12+
#include <libp2p/common/trace.hpp>
13+
1114
namespace libp2p::connection {
1215
YamuxFrame::ByteArray YamuxFrame::frameBytes(uint8_t version, FrameType type,
1316
Flag flag, uint32_t stream_id,
@@ -34,24 +37,28 @@ namespace libp2p::connection {
3437
}
3538

3639
YamuxFrame::ByteArray newStreamMsg(YamuxFrame::StreamId stream_id) {
40+
TRACE("yamux newStreamMsg, stream_id={}", stream_id);
3741
return YamuxFrame::frameBytes(YamuxFrame::kDefaultVersion,
3842
YamuxFrame::FrameType::DATA,
3943
YamuxFrame::Flag::SYN, stream_id, 0);
4044
}
4145

4246
YamuxFrame::ByteArray ackStreamMsg(YamuxFrame::StreamId stream_id) {
47+
TRACE("yamux ackStreamMsg, stream_id={}", stream_id);
4348
return YamuxFrame::frameBytes(YamuxFrame::kDefaultVersion,
4449
YamuxFrame::FrameType::DATA,
4550
YamuxFrame::Flag::ACK, stream_id, 0);
4651
}
4752

4853
YamuxFrame::ByteArray closeStreamMsg(YamuxFrame::StreamId stream_id) {
54+
TRACE("yamux closeStreamMsg, stream_id={}", stream_id);
4955
return YamuxFrame::frameBytes(YamuxFrame::kDefaultVersion,
5056
YamuxFrame::FrameType::DATA,
5157
YamuxFrame::Flag::FIN, stream_id, 0);
5258
}
5359

5460
YamuxFrame::ByteArray resetStreamMsg(YamuxFrame::StreamId stream_id) {
61+
TRACE("yamux resetStreamMsg, stream_id={}", stream_id);
5562
return YamuxFrame::frameBytes(YamuxFrame::kDefaultVersion,
5663
YamuxFrame::FrameType::DATA,
5764
YamuxFrame::Flag::RST, stream_id, 0);
@@ -71,13 +78,15 @@ namespace libp2p::connection {
7178

7279
YamuxFrame::ByteArray dataMsg(YamuxFrame::StreamId stream_id,
7380
gsl::span<const uint8_t> data) {
81+
TRACE("yamux dataMsg, stream_id={}, size={}", stream_id, data.size());
7482
return YamuxFrame::frameBytes(YamuxFrame::kDefaultVersion,
7583
YamuxFrame::FrameType::DATA,
7684
YamuxFrame::Flag::NONE, stream_id,
7785
static_cast<uint32_t>(data.size()), data);
7886
}
7987

8088
YamuxFrame::ByteArray goAwayMsg(YamuxFrame::GoAwayError error) {
89+
TRACE("yamux goAwayMsg");
8190
return YamuxFrame::frameBytes(
8291
YamuxFrame::kDefaultVersion, YamuxFrame::FrameType::GO_AWAY,
8392
YamuxFrame::Flag::NONE, 0, static_cast<uint32_t>(error));

0 commit comments

Comments
 (0)