Skip to content

Commit 8ea1af5

Browse files
authored
Use vector in write queue (#77)
* Use vector in write queue Signed-off-by: kamilsa <[email protected]>
1 parent 5ff9050 commit 8ea1af5

File tree

4 files changed

+14
-9
lines changed

4 files changed

+14
-9
lines changed

include/libp2p/muxer/mplex/mplex_stream.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ namespace libp2p::connection {
115115
bool data_notified_ = false;
116116

117117
/// Queue of write requests that were received when stream was writing
118-
std::deque<std::tuple<gsl::span<const uint8_t>, size_t, WriteCallbackFunc>>
118+
std::deque<std::tuple<std::vector<uint8_t>, size_t, WriteCallbackFunc>>
119119
write_queue_{};
120120

121121
mutable std::mutex write_queue_mutex_;

include/libp2p/muxer/yamux/yamux_stream.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ namespace libp2p::connection {
164164

165165
/// Queue of write requests that were received when stream was writing
166166
std::deque<
167-
std::tuple<gsl::span<const uint8_t>, size_t, WriteCallbackFunc, bool>>
167+
std::tuple<std::vector<uint8_t>, size_t, WriteCallbackFunc, bool>>
168168
write_queue_{};
169169

170170
mutable std::mutex write_queue_mutex_;

src/muxer/mplex/mplex_stream.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,8 +136,9 @@ namespace libp2p::connection {
136136
return cb(Error::INVALID_ARGUMENT);
137137
}
138138
if (is_writing_) {
139+
std::vector<uint8_t> in_vector(in.begin(), in.end());
139140
std::lock_guard<std::mutex> lock(write_queue_mutex_);
140-
write_queue_.emplace_back(in, bytes, cb);
141+
write_queue_.emplace_back(in_vector, bytes, cb);
141142
return;
142143
}
143144
if (connection_.expired()) {

src/muxer/yamux/yamux_stream.cpp

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -197,14 +197,16 @@ namespace libp2p::connection {
197197
}
198198
if (is_writing_) {
199199
std::lock_guard<std::mutex> lock(write_queue_mutex_);
200-
write_queue_.emplace_back(in, bytes, cb, some);
200+
201+
std::vector<uint8_t> in_vector(in.begin(), in.end());
202+
write_queue_.emplace_back(in_vector, bytes, cb, some);
201203
return;
202204
}
203205

204206
beginWrite(std::move(cb));
205207

206-
auto write_lambda = [self{shared_from_this()}, in, bytes,
207-
some]() mutable -> bool {
208+
auto write_lambda = [self{shared_from_this()}, bytes,
209+
some](gsl::span<const uint8_t> in) mutable -> bool {
208210
if (self->send_window_size_ >= bytes) {
209211
// we can write - window size on the other side allows us
210212
auto conn_wptr = self->yamuxed_connection_;
@@ -226,7 +228,7 @@ namespace libp2p::connection {
226228
};
227229

228230
// if we can write now - do it and return
229-
if (write_lambda()) {
231+
if (write_lambda(in)) {
230232
return;
231233
}
232234

@@ -236,8 +238,10 @@ namespace libp2p::connection {
236238
return endWrite(Error::CONNECTION_IS_DEAD);
237239
}
238240
yamuxed_connection_.lock()->streamOnWindowUpdate(
239-
stream_id_, [write_lambda = std::move(write_lambda)]() mutable {
240-
return write_lambda();
241+
stream_id_,
242+
[write_lambda = std::move(write_lambda),
243+
in_bytes = std::vector<uint8_t>{in.begin(), in.end()}]() mutable {
244+
return write_lambda(in_bytes);
241245
});
242246
}
243247

0 commit comments

Comments
 (0)