Skip to content

Commit 1a6942a

Browse files
authored
Refactor read/write 1 (#316)
* noise read writer * secio connection * noise connection * mplex stream * server echo session * yamux --------- Signed-off-by: turuslan <[email protected]>
1 parent 39ebf82 commit 1a6942a

File tree

15 files changed

+228
-392
lines changed

15 files changed

+228
-392
lines changed

include/libp2p/basic/cb.hpp

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
/**
2+
* Copyright Quadrivium LLC
3+
* All Rights Reserved
4+
* SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
#pragma once
8+
9+
#include <qtils/outcome.hpp>
10+
11+
namespace libp2p {
12+
using CbOutcomeVoid = std::function<void(outcome::result<void>)>;
13+
} // namespace libp2p
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
/**
2+
* Copyright Quadrivium LLC
3+
* All Rights Reserved
4+
* SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
#pragma once
8+
9+
#include <qtils/macro/common.hpp>
10+
11+
#define _IF_ERROR_CB_RETURN(tmp, r) \
12+
({ \
13+
auto &&_r = r; \
14+
if (not _r.has_value()) { \
15+
return cb(_r.error()); \
16+
} \
17+
_r.value(); \
18+
})
19+
#define IF_ERROR_CB_RETURN(r) _IF_ERROR_CB_RETURN(QTILS_UNIQUE_NAME(tmp), r)

include/libp2p/muxer/mplex/mplex_stream.hpp

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,6 @@ namespace libp2p::connection {
8484
private:
8585
struct Reading {
8686
BytesOut out;
87-
size_t bytes;
8887
ReadCallbackFunc cb;
8988
};
9089

@@ -101,8 +100,7 @@ namespace libp2p::connection {
101100
boost::optional<Reading> reading_;
102101

103102
/// Queue of write requests that were received when stream was writing
104-
std::deque<std::tuple<std::vector<uint8_t>, size_t, WriteCallbackFunc>>
105-
write_queue_{};
103+
std::deque<std::tuple<Bytes, WriteCallbackFunc>> write_queue_{};
106104

107105
mutable std::mutex write_queue_mutex_;
108106

include/libp2p/muxer/yamux/yamux_stream.hpp

Lines changed: 9 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -112,21 +112,22 @@ namespace libp2p::connection {
112112
void closedByConnection(std::error_code ec);
113113

114114
private:
115+
struct Reading {
116+
BytesOut out;
117+
ReadCallbackFunc cb;
118+
};
119+
115120
/// Performs close-related cleanup and notifications
116-
void doClose(std::error_code ec, bool notify_read_side);
121+
void doClose(std::error_code ec);
117122

118123
/// Called by read*() functions
119-
void doRead(BytesOut out, size_t bytes, ReadCallbackFunc cb);
120-
121-
/// Completes the read operation if any, clears read state
122-
[[nodiscard]] std::pair<ReadCallbackFunc, outcome::result<size_t>>
123-
readCompleted();
124+
void doRead(BytesOut out, ReadCallbackFunc cb);
124125

125126
/// Dequeues data from write queue and sends to the wire in async manner
126127
void doWrite();
127128

128129
/// Called by write*() functions
129-
void doWrite(BytesIn in, size_t bytes, WriteCallbackFunc cb);
130+
void doWrite(BytesIn in, WriteCallbackFunc cb);
130131

131132
/// Clears close callback state
132133
[[nodiscard]] std::pair<VoidResultHandlerFunc, outcome::result<void>>
@@ -169,16 +170,7 @@ namespace libp2p::connection {
169170
basic::ReadBuffer internal_read_buffer_;
170171

171172
/// True if read operation is active
172-
bool is_reading_ = false;
173-
174-
/// Read callback, it is non-zero during async data receive
175-
ReadCallbackFunc read_cb_;
176-
177-
/// TODO: get rid of this. client's read buffer
178-
BytesOut external_read_buffer_;
179-
180-
/// Size of message being read
181-
size_t read_message_size_ = 0;
173+
std::optional<Reading> reading_;
182174

183175
/// adjustWindowSize() callback, triggers when receive window size
184176
/// becomes greater or equal then desired

include/libp2p/muxer/yamux/yamuxed_connection.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ namespace libp2p::connection {
159159
void doWrite(WriteQueueItem packet);
160160

161161
/// Write callback
162-
void onDataWritten(outcome::result<size_t> res, StreamId stream_id);
162+
void onDataWritten(outcome::result<void> res, WriteQueueItem &&packet);
163163

164164
/// Creates new yamux stream
165165
std::shared_ptr<Stream> createStream(StreamId stream_id);

include/libp2p/protocol/echo/server_echo_session.hpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,9 @@ namespace libp2p::protocol {
4141

4242
void onRead(outcome::result<size_t> rread);
4343

44-
void doWrite(size_t size);
44+
void doWrite();
4545

46-
void onWrite(outcome::result<size_t> rwrite);
46+
void onWrite(outcome::result<void> rwrite);
4747
};
4848

4949
} // namespace libp2p::protocol

include/libp2p/security/noise/noise_connection.hpp

Lines changed: 0 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,6 @@ namespace libp2p::connection {
2424
class NoiseConnection : public SecureConnection,
2525
public std::enable_shared_from_this<NoiseConnection> {
2626
public:
27-
using BufferList = std::list<Bytes>;
28-
29-
struct OperationContext {
30-
size_t bytes_served; /// written or read bytes count
31-
const size_t total_bytes; /// total size to process
32-
BufferList::iterator write_buffer; /// temporary data storage
33-
};
34-
3527
~NoiseConnection() override = default;
3628

3729
NoiseConnection(
@@ -70,18 +62,6 @@ namespace libp2p::connection {
7062
outcome::result<crypto::PublicKey> remotePublicKey() const override;
7163

7264
private:
73-
void readSome(BytesOut out,
74-
size_t bytes,
75-
OperationContext ctx,
76-
ReadCallbackFunc cb);
77-
78-
void write(BytesIn in,
79-
size_t bytes,
80-
OperationContext ctx,
81-
WriteCallbackFunc cb);
82-
83-
void eraseWriteBuffer(BufferList::iterator &iterator);
84-
8565
std::shared_ptr<LayerConnection> connection_;
8666
crypto::PublicKey local_;
8767
crypto::PublicKey remote_;
@@ -90,7 +70,6 @@ namespace libp2p::connection {
9070
std::shared_ptr<security::noise::CipherState> decoder_cs_;
9171
std::shared_ptr<Bytes> frame_buffer_;
9272
std::shared_ptr<security::noise::InsecureReadWriter> framer_;
93-
BufferList write_buffers_;
9473
log::Logger log_ = log::createLogger("NoiseConnection");
9574

9675
public:

include/libp2p/security/secio/secio_connection.hpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
#include <mutex>
1212
#include <queue>
1313

14+
#include <libp2p/basic/cb.hpp>
1415
#include <libp2p/common/metrics/instance_count.hpp>
1516
#include <libp2p/connection/secure_connection.hpp>
1617
#include <libp2p/crypto/common.hpp>
@@ -115,7 +116,7 @@ namespace libp2p::connection {
115116
/**
116117
* Retrieves the next available SECIO message from the network.
117118
*/
118-
void readNextMessage(ReadCallbackFunc cb);
119+
void readNextMessage(CbOutcomeVoid cb);
119120

120121
/**
121122
* Moves decrypted bytes from internal buffer to the output buffer.
@@ -163,6 +164,7 @@ namespace libp2p::connection {
163164
std::queue<uint8_t> user_data_buffer_;
164165

165166
std::shared_ptr<Bytes> read_buffer_;
167+
std::shared_ptr<Bytes> write_buffer_;
166168

167169
log::Logger log_ = log::createLogger("SecIoConnection");
168170

src/muxer/mplex/mplex_stream.cpp

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ namespace libp2p::connection {
4646
}
4747

4848
bool MplexStream::readTry() {
49-
auto size{std::min(read_buffer_.size(), reading_->bytes)};
49+
auto size{std::min(read_buffer_.size(), reading_->out.size())};
5050
if (size == 0) {
5151
return false;
5252
}
@@ -65,20 +65,21 @@ namespace libp2p::connection {
6565
}
6666

6767
void MplexStream::readSome(BytesOut out, size_t bytes, ReadCallbackFunc cb) {
68+
ambigousSize(out, bytes);
6869
if (is_reset_) {
6970
return cb(Error::STREAM_RESET_BY_PEER);
7071
}
7172
if (!is_readable_) {
7273
return cb(Error::STREAM_NOT_READABLE);
7374
}
74-
if (bytes == 0 || out.empty() || static_cast<size_t>(out.size()) < bytes) {
75+
if (out.empty()) {
7576
return cb(Error::STREAM_INVALID_ARGUMENT);
7677
}
7778
if (reading_.has_value()) {
7879
return cb(Error::STREAM_IS_READING);
7980
}
8081

81-
reading_.emplace(Reading{out, bytes, std::move(cb)});
82+
reading_.emplace(Reading{out, std::move(cb)});
8283
if (readTry()) {
8384
return;
8485
}
@@ -98,13 +99,13 @@ namespace libp2p::connection {
9899
if (!is_writable_) {
99100
return cb(Error::STREAM_NOT_WRITABLE);
100101
}
101-
if (bytes == 0 || in.empty() || static_cast<size_t>(in.size()) < bytes) {
102+
if (in.empty()) {
102103
return cb(Error::STREAM_INVALID_ARGUMENT);
103104
}
104105
if (is_writing_) {
105106
std::vector<uint8_t> in_vector(in.begin(), in.end());
106107
std::lock_guard<std::mutex> lock(write_queue_mutex_);
107-
write_queue_.emplace_back(in_vector, bytes, cb);
108+
write_queue_.emplace_back(in_vector, cb);
108109
return;
109110
}
110111
if (connection_.expired()) {
@@ -129,7 +130,7 @@ namespace libp2p::connection {
129130
// check if new write messages were received while stream was writing
130131
// and propagate these messages
131132
if (not self->write_queue_.empty()) {
132-
auto [in, bytes, cb] = self->write_queue_.front();
133+
auto [in, cb] = self->write_queue_.front();
133134
self->write_queue_.pop_front();
134135
writeReturnSize(self, in, cb);
135136
}

0 commit comments

Comments
 (0)