diff --git a/include/boost/beast2/body_read_stream.hpp b/include/boost/beast2/body_read_stream.hpp new file mode 100644 index 0000000..808acd0 --- /dev/null +++ b/include/boost/beast2/body_read_stream.hpp @@ -0,0 +1,239 @@ +// +// Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/vinniefalco/http_io +// + +#ifndef BOOST_HTTP_IO_BODY_READ_STREAM_HPP +#define BOOST_HTTP_IO_BODY_READ_STREAM_HPP + +#include +#include +#include +#include + +namespace boost { +namespace beast2 { + + /** A body reader for HTTP/1 messages. + + This type meets the requirements of asio's + AsyncReadStream, and is constructed with a reference to an + underlying AsyncReadStream. + + Any call to `async_read_some` initially triggers reads + from the underlying stream until all of the HTTP headers + have been read and processed. Thereafter, each subsequent + call to `async_read_some` triggers a call to the underlying + stream's `async_read_some` method, with the resulting body + data stored in the referenced MutableBufferSequence. + + All processing depends on a http_io::parser object owned + by the caller and referenced in the construction of this + object. + + @see + @ref response_parser, + @ref request_parser. + */ +template +class body_read_stream { + +public: + + /** The type of the executor associated with the stream. + + This will be the type of executor used to invoke completion + handlers which do not have an explicit associated executor. + */ + typedef AsyncReadStream::executor_type executor_type; + + /** Get the executor associated with the object. + + This function may be used to obtain the executor object that the + stream uses to dispatch completion handlers without an assocaited + executor. + + @return A copy of the executor that stream will use to dispatch + handlers. + */ + executor_type get_executor() { + return us_.get_executor(); + } + + /** Constructor + + This constructor creates the stream by forwarding all arguments + to the underlying socket. The socket then needs to be open and + connected or accepted before data can be sent or received on it. + + @param us The underlying stream from which the HTTP message is read. + This object's executor is initialized to that of the + underlying stream. + + @param pr A http_proto::parser object which will perform the parsing + of the HTTP message and extraction of the body. This must + be initialized by the caller and ownership of the parser is + retained by the caller, which must guarantee that it remains + valid until the handler is called. + */ + explicit + body_read_stream( + AsyncReadStream& us, + http_proto::parser& pr); + + /** Read some data asynchronously. + + This function is used to asynchronously read data from the stream. + + This call always returns immediately. The asynchronous operation + will continue until one of the following conditions is true: + + @li The HTTP headers are read in full from the underlying stream + and one or more bytes of the body are read from the stream and + stored in the buffer `mb`. + + @li An error occurs. + + The algorithm, known as a composed asynchronous operation, + is implemented in terms of calls to the underlying stream's `async_read_some` + function. The program must ensure that no other calls to @ref + `async_read_some` are performed until this operation completes. + + @param mb The buffers into which the data will be read. If the size + of the buffers is zero bytes, the operation always completes immediately + with no error. + Although the buffers object may be copied as necessary, ownership of the + underlying memory blocks is retained by the caller, which must guarantee + that they remain valid until the handler is called. + Where the mb buffer is not of sufficient size to hold the read data, the + remainder may be read by subsequent calls to this function. + + @param handler The completion handler to invoke when the operation + completes. The implementation takes ownership of the handler by + performing a decay-copy. The equivalent function signature of + the handler must be: + @code + void handler( + error_code error, // Result of operation. + std::size_t bytes_transferred // Number of bytes read. + ); + @endcode + If the handler has an associated immediate executor, + an immediate completion will be dispatched to it. + Otherwise, the handler will not be invoked from within + this function. Invocation of the handler will be performed + by dispatching to the immediate executor. If no + immediate executor is specified, this is equivalent + to using `net::post`. + + @note The `async_read_some` operation may not receive all of the requested + number of bytes. Consider using the function `net::async_read` if you need + to ensure that the requested amount of data is read before the asynchronous + operation completes. + + @par Per-Operation Cancellation + + This asynchronous operation supports cancellation for the following + net::cancellation_type values: + + @li @c net::cancellation_type::terminal + @li @c net::cancellation_type::partial + @li @c net::cancellation_type::total + + if they are also supported by the underlying stream's @c async_read_some + operation. + */ + template< + class MutableBufferSequence, + BOOST_ASIO_COMPLETION_TOKEN_FOR( + void(system::error_code, std::size_t)) CompletionToken> + BOOST_ASIO_INITFN_AUTO_RESULT_TYPE(CompletionToken, + void(system::error_code, std::size_t)) + async_read_some( + const MutableBufferSequence& mb, + CompletionToken&& handler); + + /** Read all remaining data asynchronously. + This function is used to asynchronously read data from the stream. + + This call always returns immediately. The asynchronous operation + will continue until one of the following conditions is true: + + @li The HTTP message is read in full from the underlying stream. + + @li An error occurs. + + The algorithm, known as a composed asynchronous operation, + is implemented in terms of calls to the underlying stream's `async_read_some` + function. The program must ensure that no other calls to @ref + `async_read_some` are performed until this operation completes. + + @param mb The buffers into which the body data will be read. If the size + of the buffers is zero bytes, the operation always completes immediately + with no error. + Although the buffers object may be copied as necessary, ownership of the + underlying memory blocks is retained by the caller, which must guarantee + that they remain valid until the handler is called. + Where the mb buffer is not of sufficient size to hold the read data, the + remainder may be read by subsequent calls to this function. + + @param handler The completion handler to invoke when the operation + completes. The implementation takes ownership of the handler by + performing a decay-copy. The equivalent function signature of + the handler must be: + @code + void handler( + error_code error, // Result of operation. + std::size_t bytes_transferred // Number of bytes read. + ); + @endcode + If the handler has an associated immediate executor, + an immediate completion will be dispatched to it. + Otherwise, the handler will not be invoked from within + this function. Invocation of the handler will be performed + by dispatching to the immediate executor. If no + immediate executor is specified, this is equivalent + to using `net::post`. + + @note The `async_read_some` operation may not receive all of the requested + number of bytes. Consider using the function `net::async_read` if you need + to ensure that the requested amount of data is read before the asynchronous + operation completes. + + @par Per-Operation Cancellation + + This asynchronous operation supports cancellation for the following + net::cancellation_type values: + + @li @c net::cancellation_type::terminal + @li @c net::cancellation_type::partial + @li @c net::cancellation_type::total + + if they are also supported by the underlying stream's @c async_read_some + operation. + */ + template< + class MutableBufferSequence, + BOOST_ASIO_COMPLETION_TOKEN_FOR( + void(system::error_code, std::size_t)) CompletionToken> + BOOST_ASIO_INITFN_AUTO_RESULT_TYPE(CompletionToken, + void(system::error_code, std::size_t)) + async_read( + const MutableBufferSequence& mb, + CompletionToken&& handler); + +private: + AsyncReadStream& us_; + http_proto::parser& pr_; +}; + +} // beast2 +} // boost + +#include + +#endif diff --git a/include/boost/beast2/impl/body_read_stream.hpp b/include/boost/beast2/impl/body_read_stream.hpp new file mode 100644 index 0000000..e342e3a --- /dev/null +++ b/include/boost/beast2/impl/body_read_stream.hpp @@ -0,0 +1,184 @@ +// +// Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/vinniefalco/beast2 +// + +#ifndef BOOST_HTTP_IO_IMPL_BODY_READ_STREAM_HPP +#define BOOST_HTTP_IO_IMPL_BODY_READ_STREAM_HPP + +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +namespace boost { +namespace beast2 { + +namespace detail { + +template +class body_read_stream_op : public asio::coroutine { + + AsyncReadStream& us_; + const MutableBufferSequence& mb_; + http_proto::parser& pr_; + bool some_ = false; + +public: + + body_read_stream_op( + AsyncReadStream& s, + const MutableBufferSequence& mb, + http_proto::parser& pr, + bool some) noexcept + : us_(s) + , mb_(mb) + , pr_(pr) + , some_(some) + { + } + + template + void + operator()( + Self& self, + system::error_code ec = {}, + std::size_t bytes_transferred = 0) + { + boost::ignore_unused(bytes_transferred); + + BOOST_ASIO_CORO_REENTER(*this) + { + if (!pr_.got_header()) { + BOOST_ASIO_CORO_YIELD + { + BOOST_ASIO_HANDLER_LOCATION(( + __FILE__, __LINE__, + "async_read_header")); + beast2::async_read_header< + AsyncReadStream, + Self > ( + us_, + pr_, + std::move(self)); + } + if (ec.failed()) goto upcall; + } + + BOOST_ASIO_CORO_YIELD + { + if (some_) { + BOOST_ASIO_HANDLER_LOCATION(( + __FILE__, __LINE__, + "async_read_some")); + beast2::async_read_some( + us_, + pr_, + std::move(self)); + } + else { + BOOST_ASIO_HANDLER_LOCATION(( + __FILE__, __LINE__, + "async_read")); + beast2::async_read( + us_, + pr_, + std::move(self)); + } + } + + upcall: + std::size_t n = 0; + + if (!ec.failed()) + { + auto source_buf = pr_.pull_body(); + + n = boost::asio::buffer_copy(mb_, source_buf); + + pr_.consume_body(n); + + ec = (n != 0) ? system::error_code{} : asio::stream_errc::eof; + } + + self.complete(ec, n); + } + } +}; + +} // detail + +//------------------------------------------------ + + // TODO: copy in Beast's stream traits to check if AsyncReadStream + // is an AsyncReadStream, and also static_assert that body_read_stream is too. + + + +template +body_read_stream::body_read_stream( + AsyncReadStream& und_stream + , http_proto::parser& pr) + : + us_(und_stream) + , pr_(pr) +{ +} + + +template +template< + class MutableBufferSequence, + BOOST_ASIO_COMPLETION_TOKEN_FOR( + void(system::error_code, std::size_t)) CompletionToken> +BOOST_ASIO_INITFN_AUTO_RESULT_TYPE(CompletionToken, + void(system::error_code, std::size_t)) +body_read_stream::async_read_some( + const MutableBufferSequence& mb + , CompletionToken&& token) +{ + return asio::async_compose< + CompletionToken, + void(system::error_code, std::size_t)>( + detail::body_read_stream_op< + MutableBufferSequence, AsyncReadStream>{us_, mb, pr_, true}, + token, + asio::get_associated_executor(us_) + ); +} + +template +template< + class MutableBufferSequence, + BOOST_ASIO_COMPLETION_TOKEN_FOR( + void(system::error_code, std::size_t)) CompletionToken> +BOOST_ASIO_INITFN_AUTO_RESULT_TYPE(CompletionToken, + void(system::error_code, std::size_t)) + body_read_stream::async_read( + const MutableBufferSequence& mb + , CompletionToken&& token) +{ + return asio::async_compose< + CompletionToken, + void(system::error_code, std::size_t)>( + detail::body_read_stream_op< + MutableBufferSequence, AsyncReadStream>{us_, mb, pr_, false}, + token, + asio::get_associated_executor(us_) + ); +} + +} // beast2 +} // boost + +#endif diff --git a/test/unit/Jamfile b/test/unit/Jamfile index 21409c1..226c4a1 100644 --- a/test/unit/Jamfile +++ b/test/unit/Jamfile @@ -34,6 +34,7 @@ project local SOURCES = beast.cpp + body_read_stream.cpp buffer.cpp client.cpp read.cpp diff --git a/test/unit/body_read_stream.cpp b/test/unit/body_read_stream.cpp new file mode 100644 index 0000000..d398617 --- /dev/null +++ b/test/unit/body_read_stream.cpp @@ -0,0 +1,182 @@ +// +// Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/vinniefalco/http_io +// + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include "test_suite.hpp" + +#include +#include + +namespace boost { + +template +struct MockReadStream { + MockReadStream(Executor& ex, const std::string &data, std::size_t chunk) : ex_(ex), mock_data_(data), chunk_(chunk), sent_(0) + { + } + + typedef Executor executor_type; + + Executor get_executor() const { return ex_; } + + //template auto async_write_some(asio::const_buffer buf, Token&& token) { + // return asio::async_initiate( // + // [&ex_](auto h, auto buf) { + // asio::dispatch(ex_, [=, h = std::move(h)]() mutable { + // std::move(h)({}, asio::buffer_size(buf)); + // }); + // }, + // token, buf); + //} + + template< + class MutableBufferSequence, + BOOST_ASIO_COMPLETION_TOKEN_FOR( + void(system::error_code, std::size_t)) CompletionToken> + BOOST_ASIO_INITFN_AUTO_RESULT_TYPE(CompletionToken, + void(system::error_code, std::size_t)) + async_read_some( + const MutableBufferSequence& buf + , CompletionToken&& token) + { + return asio::async_initiate( + [this]( + CompletionToken&& token + , const MutableBufferSequence& buf) + { + boost::string_view source_str{ + mock_data_.data() + sent_, + mock_data_.size() - sent_ }; + auto source_buf = asio::buffer(source_str); + + //std::size_t chunk_size = std::max( + // (std::size_t)(rand() % mock_data_.size()), + // (std::size_t)1); + + std::size_t n = asio::buffer_copy(buf, source_buf, chunk_); + + system::error_code ec = (n != 0) ? system::error_code{} : asio::stream_errc::eof; + + sent_ += n; + + asio::post(ex_, asio::prepend(std::move(token), ec, n)); + }, + token, buf); + } + + Executor& ex_; + std::string mock_data_; + std::size_t sent_; + std::size_t chunk_; +}; + +namespace beast2 { + +struct body_read_stream_test +{ + void + run() + { + std::string data = "HTTP/1.1 200 OK\r\n" + "Content-Type: text/html\r\n" + "Last-Modified: Thu, 09 Oct 2025 16:42:02 GMT\r\n" + "Cache-Control: max-age=86000\r\n" + "Date: Thu, 16 Oct 2025 15:09:10 GMT\r\n" + "Content-Length: 60\r\n" + "Connection: keep-alive\r\n" + "\r\n" + "Hello World\r\n"; + + std::string data2 = "HTTP/1.0 200 OK\r\n" + "Content-Type: text/html\r\n" + "Last-Modified: Thu, 09 Oct 2025 16:42:02 GMT\r\n" + "Cache-Control: max-age=86000\r\n" + "Date: Thu, 16 Oct 2025 15:09:10 GMT\r\n" + //"Content-Length: 60\r\n" + "Connection: keep-alive\r\n" + "\r\n" + ; + + std::cout << data2.size() << std::endl; + + for (std::size_t chunk = 1; chunk < 400; chunk++) + { + asio::io_context ioc; + auto strand = asio::make_strand(ioc); + MockReadStream ms(strand, data, chunk); + + std::array arr; + auto buf = asio::buffer(arr); + + rts::context rts_ctx; + http_proto::response_parser::config cfg; + cfg.body_limit = 1024 * 1024; + cfg.min_buffer = 1024 * 1024; + http_proto::install_parser_service(rts_ctx, cfg); + + http_proto::response_parser pr(rts_ctx); + pr.reset(); + pr.start(); + + body_read_stream brs(ms, pr); + + brs.async_read_some(buf, + [this, &chunk, &brs, &arr, &buf](system::error_code ec, std::size_t bytes_transferred) + { + if (ec.failed()) std::cerr << ec.message() << std::endl; + + BOOST_TEST_EQ(ec.failed(), false); + BOOST_TEST_GE(bytes_transferred, 1); + { + std::string value(arr.data(), bytes_transferred); + std::cout << chunk << ": " << value << std::endl; + //BOOST_TEST_EQ(value, std::string("Hello W")); + } + + if (!ec.failed()) { + brs.async_read_some(buf, + [this, &brs, &arr, &buf](system::error_code ec, std::size_t bytes_transferred) + { + if (ec.failed()) std::cerr << ec.message() << std::endl; + + BOOST_TEST_EQ(ec.failed(), false); + BOOST_TEST_GE(bytes_transferred, 1); + { + std::string value(arr.data(), bytes_transferred); + //BOOST_TEST_EQ(value, std::string("orld")); + } + }); + } + }); + ioc.run(); + } + } +}; + +TEST_SUITE( + body_read_stream_test, + "boost.http_io.body_read_stream.hello_world"); + +} // http_io +} // boost