Skip to content

Commit 5156e62

Browse files
committed
SERVER-34636 enable opportunistic writes with ssl
Add a side channel to the asio ssl streams which allows us to capture the work remaining for the tcp send in the event of a full send buffer. This makes opportunistic writes safe for ssl sockets
1 parent 751e985 commit 5156e62

File tree

4 files changed

+59
-3
lines changed

4 files changed

+59
-3
lines changed

src/mongo/transport/session_asio.h

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -379,13 +379,19 @@ class TransportLayerASIO::ASIOSession final : public Session {
379379
#ifdef MONGO_CONFIG_SSL
380380
_ranHandshake = true;
381381
if (_sslSocket) {
382+
#ifdef __linux__
383+
// We do some trickery in asio (see moreToSend), which appears to work well on linux,
384+
// but fails on other platforms.
385+
return opportunisticWrite(*_sslSocket, buffers);
386+
#else
382387
if (_blockingMode == Async) {
383388
// Opportunistic writes are broken for async egress SSL (switching between blocking
384389
// and non-blocking mode corrupts the TLS exchange).
385390
return asio::async_write(*_sslSocket, buffers, UseFuture{});
386391
} else {
387392
return opportunisticWrite(*_sslSocket, buffers);
388393
}
394+
#endif
389395
}
390396
#endif
391397
return opportunisticWrite(_socket, buffers);
@@ -414,19 +420,60 @@ class TransportLayerASIO::ASIOSession final : public Session {
414420
}
415421
}
416422

423+
/**
424+
* moreToSend checks the ssl socket after an opportunisticWrite. If there are still bytes to
425+
* send, we manually send them off the underlying socket. Then we hook that up with a future
426+
* that gets us back to sending from the ssl side.
427+
*
428+
* There are two variants because we call opportunisticWrite on generic sockets and ssl sockets.
429+
* The generic socket impl never has more to send (because it doesn't have an inner socket it
430+
* needs to keep sending).
431+
*/
432+
template <typename ConstBufferSequence>
433+
boost::optional<Future<size_t>> moreToSend(GenericSocket& socket,
434+
const ConstBufferSequence& buffers,
435+
size_t size) {
436+
return boost::none;
437+
}
438+
439+
#ifdef MONGO_CONFIG_SSL
440+
template <typename ConstBufferSequence>
441+
boost::optional<Future<size_t>> moreToSend(asio::ssl::stream<GenericSocket>& socket,
442+
const ConstBufferSequence& buffers,
443+
size_t sizeFromBefore) {
444+
if (_sslSocket->getCoreOutputBuffer().size()) {
445+
return opportunisticWrite(getSocket(), _sslSocket->getCoreOutputBuffer())
446+
.then([this, &socket, buffers, sizeFromBefore](size_t) {
447+
return opportunisticWrite(socket, buffers)
448+
.then([sizeFromBefore](size_t justWritten) {
449+
return justWritten + sizeFromBefore;
450+
});
451+
});
452+
}
453+
454+
return boost::none;
455+
}
456+
#endif
457+
417458
template <typename Stream, typename ConstBufferSequence>
418459
Future<size_t> opportunisticWrite(Stream& stream, const ConstBufferSequence& buffers) {
419460
std::error_code ec;
420461
auto size = asio::write(stream, buffers, ec);
421462
if (((ec == asio::error::would_block) || (ec == asio::error::try_again)) &&
422463
(_blockingMode == Async)) {
464+
423465
// asio::write is a loop internally, so some of buffers may have been read into already.
424466
// So we need to adjust the buffers passed into async_write to be offset by size, if
425467
// size is > 0.
426468
ConstBufferSequence asyncBuffers(buffers);
427469
if (size > 0) {
428470
asyncBuffers += size;
429471
}
472+
473+
if (auto more = moreToSend(stream, asyncBuffers, size)) {
474+
return std::move(*more);
475+
}
476+
430477
return asio::async_write(stream, asyncBuffers, UseFuture{})
431478
.then([size](size_t asyncSize) {
432479
// Add back in the size written opportunistically.

src/mongo/util/net/ssl/detail/io.hpp

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,18 +48,20 @@ std::size_t io(Stream& next_layer, stream_core& core, const Operation& op, asio:
4848

4949
case engine::want_output_and_retry:
5050

51+
core.output_ = core.engine_.get_output(core.output_buffer_);
5152
// Get output data from the engine and write it to the underlying
5253
// transport.
53-
asio::write(next_layer, core.engine_.get_output(core.output_buffer_), ec);
54+
core.output_ += asio::write(next_layer, core.output_, ec);
5455

5556
// Try the operation again.
5657
continue;
5758

5859
case engine::want_output:
5960

61+
core.output_ = core.engine_.get_output(core.output_buffer_);
6062
// Get output data from the engine and write it to the underlying
6163
// transport.
62-
asio::write(next_layer, core.engine_.get_output(core.output_buffer_), ec);
64+
core.output_ += asio::write(next_layer, core.output_, ec);
6365

6466
// Operation is complete. Return result to caller.
6567
core.engine_.map_error_code(ec);
@@ -75,7 +77,7 @@ std::size_t io(Stream& next_layer, stream_core& core, const Operation& op, asio:
7577

7678
// Operation failed. Return result to caller.
7779
core.engine_.map_error_code(ec);
78-
return 0;
80+
return bytes_transferred;
7981
}
8082

8183
template <typename Stream, typename Operation, typename Handler>

src/mongo/util/net/ssl/detail/stream_core.hpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,9 @@ struct stream_core {
125125

126126
// The buffer pointing to the engine's unconsumed input.
127127
asio::const_buffer input_;
128+
129+
// The buffer pointing to the engine's unconsumed output.
130+
asio::mutable_buffer output_;
128131
};
129132

130133
} // namespace detail

src/mongo/util/net/ssl/stream.hpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -577,6 +577,10 @@ class stream : public stream_base, private noncopyable {
577577
return init.result.get();
578578
}
579579

580+
asio::mutable_buffer& getCoreOutputBuffer() {
581+
return core_.output_;
582+
}
583+
580584
private:
581585
Stream next_layer_;
582586
detail::stream_core core_;

0 commit comments

Comments
 (0)