Skip to content

Commit 8b52253

Browse files
authored
Missing write requests (#75)
* Fix missing callbacks in yamux Signed-off-by: kamilsa <[email protected]>
1 parent a85364e commit 8b52253

File tree

4 files changed

+43
-2
lines changed

4 files changed

+43
-2
lines changed

include/libp2p/muxer/mplex/mplex_stream.hpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@
66
#ifndef LIBP2P_MPLEX_STREAM_HPP
77
#define LIBP2P_MPLEX_STREAM_HPP
88

9+
#include <deque>
10+
#include <mutex>
11+
912
#include <boost/asio/streambuf.hpp>
1013
#include <boost/noncopyable.hpp>
1114
#include <libp2p/common/logger.hpp>
@@ -111,6 +114,12 @@ namespace libp2p::connection {
111114
std::function<void(outcome::result<size_t>)> data_notifyee_;
112115
bool data_notified_ = false;
113116

117+
/// Queue of write requests that were received when stream was writing
118+
std::deque<std::tuple<gsl::span<const uint8_t>, size_t, WriteCallbackFunc>>
119+
write_queue_{};
120+
121+
mutable std::mutex write_queue_mutex_;
122+
114123
/// is the stream opened for reads?
115124
bool is_readable_ = true;
116125
bool is_reading_ = false;

include/libp2p/muxer/yamux/yamux_stream.hpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@
66
#ifndef LIBP2P_YAMUX_STREAM_HPP
77
#define LIBP2P_YAMUX_STREAM_HPP
88

9+
#include <deque>
10+
#include <mutex>
11+
912
#include <boost/asio/streambuf.hpp>
1013
#include <boost/noncopyable.hpp>
1114
#include <libp2p/connection/stream.hpp>
@@ -159,6 +162,13 @@ namespace libp2p::connection {
159162
/// write callback, non-zero during async sends
160163
WriteCallbackFunc write_cb_;
161164

165+
/// Queue of write requests that were received when stream was writing
166+
std::deque<
167+
std::tuple<gsl::span<const uint8_t>, size_t, WriteCallbackFunc, bool>>
168+
write_queue_{};
169+
170+
mutable std::mutex write_queue_mutex_;
171+
162172
/// starts async write operation
163173
void beginWrite(WriteCallbackFunc cb);
164174

src/muxer/mplex/mplex_stream.cpp

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,9 @@ namespace libp2p::connection {
136136
return cb(Error::INVALID_ARGUMENT);
137137
}
138138
if (is_writing_) {
139-
return cb(Error::IS_WRITING);
139+
std::lock_guard<std::mutex> lock(write_queue_mutex_);
140+
write_queue_.emplace_back(in, bytes, cb);
141+
return;
140142
}
141143
if (connection_.expired()) {
142144
return cb(Error::CONNECTION_IS_DEAD);
@@ -153,6 +155,15 @@ namespace libp2p::connection {
153155
write_res.error().message());
154156
}
155157
cb(std::forward<decltype(write_res)>(write_res));
158+
159+
std::lock_guard<std::mutex> lock(self->write_queue_mutex_);
160+
// check if new write messages were received while stream was writing
161+
// and propagate these messages
162+
if (not self->write_queue_.empty()) {
163+
auto [in, bytes, cb] = self->write_queue_.front();
164+
self->write_queue_.pop_front();
165+
self->write(in, bytes, cb);
166+
}
156167
});
157168
}
158169

src/muxer/yamux/yamux_stream.cpp

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,15 @@ namespace libp2p::connection {
179179
write_cb_ = WriteCallbackFunc{};
180180
cb(result);
181181
}
182+
183+
std::lock_guard<std::mutex> lock(write_queue_mutex_);
184+
// check if new write messages were received while stream was writing
185+
// and propagate these messages
186+
if (not write_queue_.empty()) {
187+
auto [in, bytes, cb, some] = write_queue_.front();
188+
write_queue_.pop_front();
189+
write(in, bytes, cb, some);
190+
}
182191
}
183192

184193
void YamuxStream::write(gsl::span<const uint8_t> in, size_t bytes,
@@ -187,7 +196,9 @@ namespace libp2p::connection {
187196
return cb(Error::NOT_WRITABLE);
188197
}
189198
if (is_writing_) {
190-
return cb(Error::IS_WRITING);
199+
std::lock_guard<std::mutex> lock(write_queue_mutex_);
200+
write_queue_.emplace_back(in, bytes, cb, some);
201+
return;
191202
}
192203

193204
beginWrite(std::move(cb));

0 commit comments

Comments
 (0)