16
16
#include < boost/redis/resp3/type.hpp>
17
17
#include < boost/redis/config.hpp>
18
18
#include < boost/redis/detail/runner.hpp>
19
+ #include < boost/redis/usage.hpp>
19
20
20
21
#include < boost/system.hpp>
21
22
#include < boost/asio/basic_stream_socket.hpp>
40
41
#include < type_traits>
41
42
#include < functional>
42
43
43
- namespace boost ::redis::detail {
44
+ namespace boost ::redis::detail
45
+ {
44
46
45
47
template <class Conn >
46
48
struct exec_op {
47
49
using req_info_type = typename Conn::req_info;
48
50
using adapter_type = typename Conn::adapter_type;
49
51
50
52
Conn* conn_ = nullptr ;
51
- request const * req_ = nullptr ;
52
- adapter_type adapter{};
53
53
std::shared_ptr<req_info_type> info_ = nullptr ;
54
54
asio::coroutine coro{};
55
55
@@ -60,14 +60,12 @@ struct exec_op {
60
60
{
61
61
// Check whether the user wants to wait for the connection to
62
62
// be stablished.
63
- if (req_->get_config ().cancel_if_not_connected && !conn_->is_open ()) {
63
+ if (info_-> req_ ->get_config ().cancel_if_not_connected && !conn_->is_open ()) {
64
64
BOOST_ASIO_CORO_YIELD
65
65
asio::post (std::move (self));
66
66
return self.complete (error::not_connected, 0 );
67
67
}
68
68
69
- info_ = std::allocate_shared<req_info_type>(asio::get_associated_allocator (self), *req_, adapter, conn_->get_executor ());
70
-
71
69
conn_->add_request_info (info_);
72
70
73
71
EXEC_OP_WAIT:
@@ -329,6 +327,10 @@ class connection_base {
329
327
// / Type of the next layer
330
328
using next_layer_type = asio::ssl::stream<asio::basic_stream_socket<asio::ip::tcp, Executor>>;
331
329
330
+ using clock_type = std::chrono::steady_clock;
331
+ using clock_traits_type = asio::wait_traits<clock_type>;
332
+ using timer_type = asio::basic_waitable_timer<clock_type, clock_traits_type, executor_type>;
333
+
332
334
using receiver_adapter_type = std::function<void (resp3::basic_node<std::string_view> const &, system::error_code&)>;
333
335
334
336
using this_type = connection_base<Executor>;
@@ -391,12 +393,14 @@ class connection_base {
391
393
{
392
394
using namespace boost ::redis::adapter;
393
395
auto f = boost_redis_adapt (resp);
394
- BOOST_ASSERT_MSG (req.size () <= f.get_supported_response_size (), " Request and response have incompatible sizes." );
396
+ BOOST_ASSERT_MSG (req.get_expected_responses () <= f.get_supported_response_size (), " Request and response have incompatible sizes." );
397
+
398
+ auto info = std::make_shared<req_info>(req, f, get_executor ());
395
399
396
400
return asio::async_compose
397
401
< CompletionToken
398
402
, void (system::error_code, std::size_t )
399
- >(redis::detail:: exec_op<this_type>{this , &req, f }, token, writer_timer_);
403
+ >(exec_op<this_type>{this , info }, token, writer_timer_);
400
404
}
401
405
402
406
template <class Response , class CompletionToken >
@@ -427,12 +431,12 @@ class connection_base {
427
431
receive_adapter_ = adapter::detail::make_adapter_wrapper (g);
428
432
}
429
433
434
+ usage get_usage () const noexcept
435
+ { return usage_; }
436
+
430
437
private:
431
- using clock_type = std::chrono::steady_clock;
432
- using clock_traits_type = asio::wait_traits<clock_type>;
433
- using timer_type = asio::basic_waitable_timer<clock_type, clock_traits_type, executor_type>;
434
438
using receive_channel_type = asio::experimental::channel<executor_type, void (system::error_code, std::size_t )>;
435
- using runner_type = redis::detail:: runner<executor_type>;
439
+ using runner_type = runner<executor_type>;
436
440
using adapter_type = std::function<void (std::size_t , resp3::basic_node<std::string_view> const &, system::error_code&)>;
437
441
438
442
auto use_ssl () const noexcept
@@ -545,7 +549,7 @@ class connection_base {
545
549
, action_{action::none}
546
550
, req_{&req}
547
551
, adapter_{}
548
- , cmds_{ std::size ( req)}
552
+ , expected_responses_{ req. get_expected_responses ( )}
549
553
, status_{status::none}
550
554
, ec_{{}}
551
555
, read_size_{0 }
@@ -554,7 +558,7 @@ class connection_base {
554
558
555
559
adapter_ = [this , adapter](node_type const & nd, system::error_code& ec)
556
560
{
557
- auto const i = std::size (* req_) - cmds_ ;
561
+ auto const i = req_-> get_expected_responses ( ) - expected_responses_ ;
558
562
adapter (i, nd, ec);
559
563
};
560
564
}
@@ -611,7 +615,7 @@ class connection_base {
611
615
wrapped_adapter_type adapter_;
612
616
613
617
// Contains the number of commands that haven't been read yet.
614
- std::size_t cmds_ ;
618
+ std::size_t expected_responses_ ;
615
619
status status_;
616
620
617
621
system::error_code ec_;
@@ -625,16 +629,16 @@ class connection_base {
625
629
626
630
using reqs_type = std::deque<std::shared_ptr<req_info>>;
627
631
628
- template <class , class > friend struct redis ::detail:: reader_op;
629
- template <class , class > friend struct redis ::detail:: writer_op;
630
- template <class , class > friend struct redis ::detail:: run_op;
631
- template <class > friend struct redis ::detail:: exec_op;
632
- template <class , class , class > friend struct redis ::detail:: run_all_op;
632
+ template <class , class > friend struct reader_op ;
633
+ template <class , class > friend struct writer_op ;
634
+ template <class , class > friend struct run_op ;
635
+ template <class > friend struct exec_op ;
636
+ template <class , class , class > friend struct run_all_op ;
633
637
634
638
void cancel_push_requests ()
635
639
{
636
640
auto point = std::stable_partition (std::begin (reqs_), std::end (reqs_), [](auto const & ptr) {
637
- return !(ptr->is_staged () && ptr->req_ ->size () == 0 );
641
+ return !(ptr->is_staged () && ptr->req_ ->get_expected_responses () == 0 );
638
642
});
639
643
640
644
std::for_each (point, std::end (reqs_), [](auto const & ptr) {
@@ -671,7 +675,7 @@ class connection_base {
671
675
return asio::async_compose
672
676
< CompletionToken
673
677
, void (system::error_code)
674
- >(redis::detail:: reader_op<this_type, Logger>{this , l}, token, writer_timer_);
678
+ >(reader_op<this_type, Logger>{this , l}, token, writer_timer_);
675
679
}
676
680
677
681
template <class CompletionToken , class Logger >
@@ -680,7 +684,7 @@ class connection_base {
680
684
return asio::async_compose
681
685
< CompletionToken
682
686
, void (system::error_code)
683
- >(redis::detail:: writer_op<this_type, Logger>{this , l}, token, writer_timer_);
687
+ >(writer_op<this_type, Logger>{this , l}, token, writer_timer_);
684
688
}
685
689
686
690
template <class Logger , class CompletionToken >
@@ -691,7 +695,7 @@ class connection_base {
691
695
return asio::async_compose
692
696
< CompletionToken
693
697
, void (system::error_code)
694
- >(redis::detail:: run_op<this_type, Logger>{this , l}, token, writer_timer_);
698
+ >(run_op<this_type, Logger>{this , l}, token, writer_timer_);
695
699
}
696
700
697
701
[[nodiscard]] bool coalesce_requests ()
@@ -706,8 +710,11 @@ class connection_base {
706
710
// Stage the request.
707
711
write_buffer_ += ri->req_ ->payload ();
708
712
ri->mark_staged ();
713
+ usage_.commands_sent += ri->expected_responses_ ;
709
714
});
710
715
716
+ usage_.bytes_sent += std::size (write_buffer_);
717
+
711
718
return point != std::cend (reqs_);
712
719
}
713
720
@@ -758,13 +765,13 @@ class connection_base {
758
765
return
759
766
(resp3::to_type (read_buffer_.front ()) == resp3::type::push)
760
767
|| reqs_.empty ()
761
- || (!reqs_.empty () && reqs_.front ()->cmds_ == 0 )
768
+ || (!reqs_.empty () && reqs_.front ()->expected_responses_ == 0 )
762
769
|| !is_waiting_response (); // Added to deal with MONITOR.
763
770
}
764
771
765
772
auto get_suggested_buffer_growth () const noexcept
766
773
{
767
- return parser_.get_suggested_buffer_growth (1024 );
774
+ return parser_.get_suggested_buffer_growth (4096 );
768
775
}
769
776
770
777
enum class parse_result { needs_more, push, resp };
@@ -773,6 +780,14 @@ class connection_base {
773
780
774
781
parse_ret_type on_finish_parsing (parse_result t)
775
782
{
783
+ if (t == parse_result::push) {
784
+ usage_.pushes_received += 1 ;
785
+ usage_.push_bytes_received += parser_.get_consumed ();
786
+ } else {
787
+ usage_.responses_received += 1 ;
788
+ usage_.response_bytes_received += parser_.get_consumed ();
789
+ }
790
+
776
791
on_push_ = false ;
777
792
dbuf_.consume (parser_.get_consumed ());
778
793
auto const res = std::make_pair (t, parser_.get_consumed ());
@@ -808,7 +823,7 @@ class connection_base {
808
823
BOOST_ASSERT_MSG (is_waiting_response (), " Not waiting for a response (using MONITOR command perhaps?)" );
809
824
BOOST_ASSERT (!reqs_.empty ());
810
825
BOOST_ASSERT (reqs_.front () != nullptr );
811
- BOOST_ASSERT (reqs_.front ()->cmds_ != 0 );
826
+ BOOST_ASSERT (reqs_.front ()->expected_responses_ != 0 );
812
827
813
828
if (!resp3::parse (parser_, data, reqs_.front ()->adapter_ , ec))
814
829
return std::make_pair (parse_result::needs_more, 0 );
@@ -821,7 +836,7 @@ class connection_base {
821
836
822
837
reqs_.front ()->read_size_ += parser_.get_consumed ();
823
838
824
- if (--reqs_.front ()->cmds_ == 0 ) {
839
+ if (--reqs_.front ()->expected_responses_ == 0 ) {
825
840
// Done with this request.
826
841
reqs_.front ()->proceed ();
827
842
reqs_.pop_front ();
@@ -849,6 +864,8 @@ class connection_base {
849
864
reqs_type reqs_;
850
865
resp3::parser parser_{};
851
866
bool on_push_ = false ;
867
+
868
+ usage usage_;
852
869
};
853
870
854
871
} // boost::redis::detail
0 commit comments