1010#include < boost/redis/resp3/type.hpp>
1111#include < boost/redis/resp3/parser.hpp>
1212#include < boost/redis/adapter/ignore.hpp>
13+ #include < boost/redis/detail/helper.hpp>
1314#include < boost/asio/read.hpp>
1415#include < boost/asio/compose.hpp>
1516#include < boost/asio/coroutine.hpp>
16- #include < boost/redis/detail/helper .hpp>
17+ #include < boost/asio/post .hpp>
1718
1819#include < string_view>
1920#include < limits>
@@ -27,6 +28,59 @@ std::string_view buffer_view(DynamicBuffer buf) noexcept
2728 return std::string_view{start, std::size (buf)};
2829}
2930
31+ template <class AsyncReadStream , class DynamicBuffer >
32+ class append_some_op {
33+ private:
34+ AsyncReadStream& stream_;
35+ DynamicBuffer buf_;
36+ std::size_t size_ = 0 ;
37+ std::size_t tmp_ = 0 ;
38+ asio::coroutine coro_{};
39+
40+ public:
41+ append_some_op (AsyncReadStream& stream, DynamicBuffer buf, std::size_t size)
42+ : stream_ {stream}
43+ , buf_ {std::move (buf)}
44+ , size_{size}
45+ { }
46+
47+ template <class Self >
48+ void operator ()( Self& self
49+ , system::error_code ec = {}
50+ , std::size_t n = 0 )
51+ {
52+ BOOST_ASIO_CORO_REENTER (coro_)
53+ {
54+ tmp_ = buf_.size ();
55+ buf_.grow (size_);
56+
57+ BOOST_ASIO_CORO_YIELD
58+ stream_.async_read_some (buf_.data (tmp_, size_), std::move (self));
59+ if (ec) {
60+ self.complete (ec, 0 );
61+ return ;
62+ }
63+
64+ buf_.shrink (buf_.size () - tmp_ - n);
65+ self.complete ({}, n);
66+ }
67+ }
68+ };
69+
70+ template <class AsyncReadStream , class DynamicBuffer , class CompletionToken >
71+ auto
72+ async_append_some (
73+ AsyncReadStream& stream,
74+ DynamicBuffer buffer,
75+ std::size_t size,
76+ CompletionToken&& token)
77+ {
78+ return asio::async_compose
79+ < CompletionToken
80+ , void (system::error_code, std::size_t )
81+ >(append_some_op<AsyncReadStream, DynamicBuffer> {stream, buffer, size}, token, stream);
82+ }
83+
3084template <
3185 class AsyncReadStream ,
3286 class DynamicBuffer ,
@@ -37,8 +91,8 @@ class parse_op {
3791 DynamicBuffer buf_;
3892 resp3::parser parser_;
3993 ResponseAdapter adapter_;
40- std:: size_t tmp_ = 0 ;
41- resp3::parser::result res_ ;
94+ bool needs_rescheduling_ = true ;
95+ system::error_code ec_ ;
4296 asio::coroutine coro_{};
4397
4498 static std::size_t const growth = 1024 ;
@@ -53,36 +107,29 @@ class parse_op {
53107 template <class Self >
54108 void operator ()( Self& self
55109 , system::error_code ec = {}
56- , std::size_t n = 0 )
110+ , std::size_t = 0 )
57111 {
58- BOOST_ASIO_CORO_REENTER (coro_) for (;;) {
59-
60- res_ = parser_.consume (buffer_view (buf_), ec);
61- if (ec)
62- return self.complete (ec, 0 );
63-
64- if (!res_.has_value ()) {
65- tmp_ = buf_.size ();
66- buf_.grow (parser_.get_suggested_buffer_growth (growth));
67-
112+ BOOST_ASIO_CORO_REENTER (coro_)
113+ {
114+ while (!resp3::parse (parser_, buffer_view (buf_), adapter_, ec)) {
115+ needs_rescheduling_ = false ;
68116 BOOST_ASIO_CORO_YIELD
69- stream_. async_read_some (
70- buf_. data (tmp_ , parser_.get_suggested_buffer_growth (growth) ),
117+ async_append_some (
118+ stream_, buf_, parser_.get_suggested_buffer_growth (growth),
71119 std::move (self));
72- BOOST_REDIS_CHECK_OP1 (;);
73-
74- buf_. shrink (buf_. size () - tmp_ - n) ;
75- continue ;
120+ if (ec) {
121+ self. complete (ec, 0 );
122+ return ;
123+ }
76124 }
77125
78- adapter_ (res_.value (), ec);
79- if (ec)
80- return self.complete (ec, 0 );
81-
82- if (parser_.done ()) {
83- self.complete ({}, parser_.get_consumed ());
84- return ;
126+ ec_ = ec;
127+ if (needs_rescheduling_) {
128+ BOOST_ASIO_CORO_YIELD
129+ asio::post (std::move (self));
85130 }
131+
132+ self.complete (ec_, parser_.get_consumed ());
86133 }
87134 }
88135};
0 commit comments