|
174 | 174 |
|
175 | 175 | -define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]). |
176 | 176 |
|
177 | | --define(INCR_STATS(Incs, Measure, State), |
| 177 | +-define(INCR_STATS(Type, Key, Inc, Measure, State), |
178 | 178 | case rabbit_event:stats_level(State, #ch.stats_timer) of |
179 | | - fine -> incr_stats(Incs, Measure); |
180 | | - _ -> ok |
| 179 | + fine -> |
| 180 | + rabbit_core_metrics:channel_stats(Type, Measure, {self(), Key}, Inc), |
| 181 | + %% Keys in the process dictionary are used to clean up the core metrics |
| 182 | + put({Type, Key}, none); |
| 183 | + _ -> |
| 184 | + ok |
| 185 | + end). |
| 186 | + |
| 187 | +-define(INCR_STATS(Type, Key, Inc, Measure), |
| 188 | + begin |
| 189 | + rabbit_core_metrics:channel_stats(Type, Measure, {self(), Key}, Inc), |
| 190 | + %% Keys in the process dictionary are used to clean up the core metrics |
| 191 | + put({Type, Key}, none) |
181 | 192 | end). |
182 | 193 |
|
183 | 194 | %%---------------------------------------------------------------------------- |
@@ -1658,7 +1669,7 @@ basic_return(#basic_message{exchange_name = ExchangeName, |
1658 | 1669 | content = Content}, |
1659 | 1670 | State = #ch{protocol = Protocol, writer_pid = WriterPid}, |
1660 | 1671 | Reason) -> |
1661 | | - ?INCR_STATS([{exchange_stats, ExchangeName, 1}], return_unroutable, State), |
| 1672 | + ?INCR_STATS(exchange_stats, ExchangeName, 1, return_unroutable, State), |
1662 | 1673 | {_Close, ReplyCode, ReplyText} = Protocol:lookup_amqp_exception(Reason), |
1663 | 1674 | ok = rabbit_writer:send_command( |
1664 | 1675 | WriterPid, |
@@ -1695,14 +1706,14 @@ record_sent(ConsumerTag, AckRequired, |
1695 | 1706 | user = #user{username = Username}, |
1696 | 1707 | conn_name = ConnName, |
1697 | 1708 | channel = ChannelNum}) -> |
1698 | | - ?INCR_STATS([{queue_stats, QName, 1}], case {ConsumerTag, AckRequired} of |
1699 | | - {none, true} -> get; |
1700 | | - {none, false} -> get_no_ack; |
1701 | | - {_ , true} -> deliver; |
1702 | | - {_ , false} -> deliver_no_ack |
1703 | | - end, State), |
| 1709 | + ?INCR_STATS(queue_stats, QName, 1, case {ConsumerTag, AckRequired} of |
| 1710 | + {none, true} -> get; |
| 1711 | + {none, false} -> get_no_ack; |
| 1712 | + {_ , true} -> deliver; |
| 1713 | + {_ , false} -> deliver_no_ack |
| 1714 | + end, State), |
1704 | 1715 | case Redelivered of |
1705 | | - true -> ?INCR_STATS([{queue_stats, QName, 1}], redeliver, State); |
| 1716 | + true -> ?INCR_STATS(queue_stats, QName, 1, redeliver, State); |
1706 | 1717 | false -> ok |
1707 | 1718 | end, |
1708 | 1719 | rabbit_trace:tap_out(Msg, ConnName, ChannelNum, Username, TraceState), |
@@ -1747,11 +1758,11 @@ ack(Acked, State = #ch{queue_names = QNames}) -> |
1747 | 1758 | foreach_per_queue( |
1748 | 1759 | fun (QPid, MsgIds) -> |
1749 | 1760 | ok = rabbit_amqqueue:ack(QPid, MsgIds, self()), |
1750 | | - ?INCR_STATS(case dict:find(QPid, QNames) of |
1751 | | - {ok, QName} -> Count = length(MsgIds), |
1752 | | - [{queue_stats, QName, Count}]; |
1753 | | - error -> [] |
1754 | | - end, ack, State) |
| 1761 | + case dict:find(QPid, QNames) of |
| 1762 | + {ok, QName} -> Count = length(MsgIds), |
| 1763 | + ?INCR_STATS(queue_stats, QName, Count, ack, State); |
| 1764 | + error -> ok |
| 1765 | + end |
1755 | 1766 | end, Acked), |
1756 | 1767 | ok = notify_limiter(State#ch.limiter, Acked). |
1757 | 1768 |
|
@@ -1816,7 +1827,7 @@ deliver_to_queues({#delivery{message = #basic_message{exchange_name = XName}, |
1816 | 1827 | confirm = false, |
1817 | 1828 | mandatory = false}, |
1818 | 1829 | []}, State) -> %% optimisation |
1819 | | - ?INCR_STATS([{exchange_stats, XName, 1}], publish, State), |
| 1830 | + ?INCR_STATS(exchange_stats, XName, 1, publish, State), |
1820 | 1831 | State; |
1821 | 1832 | deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ |
1822 | 1833 | exchange_name = XName}, |
@@ -1853,11 +1864,15 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ |
1853 | 1864 | Message, State1), |
1854 | 1865 | State3 = process_routing_confirm( Confirm, DeliveredQPids, MsgSeqNo, |
1855 | 1866 | XName, State2), |
1856 | | - ?INCR_STATS([{exchange_stats, XName, 1} | |
1857 | | - [{queue_exchange_stats, {QName, XName}, 1} || |
1858 | | - QPid <- DeliveredQPids, |
1859 | | - {ok, QName} <- [dict:find(QPid, QNames1)]]], |
1860 | | - publish, State3), |
| 1867 | + case rabbit_event:stats_level(State3, #ch.stats_timer) of |
| 1868 | + fine -> |
| 1869 | + ?INCR_STATS(exchange_stats, XName, 1, publish), |
| 1870 | + [?INCR_STATS(queue_exchange_stats, {QName, XName}, 1, publish) || |
| 1871 | + QPid <- DeliveredQPids, |
| 1872 | + {ok, QName} <- [dict:find(QPid, QNames1)]]; |
| 1873 | + _ -> |
| 1874 | + ok |
| 1875 | + end, |
1861 | 1876 | State3. |
1862 | 1877 |
|
1863 | 1878 | process_routing_mandatory(false, _, _MsgSeqNo, _Msg, State) -> |
@@ -1900,7 +1915,7 @@ send_confirms(State = #ch{tx = none, confirmed = C}) -> |
1900 | 1915 | ok -> MsgSeqNos = |
1901 | 1916 | lists:foldl( |
1902 | 1917 | fun ({MsgSeqNo, XName}, MSNs) -> |
1903 | | - ?INCR_STATS([{exchange_stats, XName, 1}], |
| 1918 | + ?INCR_STATS(exchange_stats, XName, 1, |
1904 | 1919 | confirm, State), |
1905 | 1920 | [MsgSeqNo | MSNs] |
1906 | 1921 | end, [], lists:append(C)), |
@@ -1997,13 +2012,6 @@ i(Item, _) -> |
1997 | 2012 | name(#ch{conn_name = ConnName, channel = Channel}) -> |
1998 | 2013 | list_to_binary(rabbit_misc:format("~s (~p)", [ConnName, Channel])). |
1999 | 2014 |
|
2000 | | -incr_stats(Incs, Measure) -> |
2001 | | - [begin |
2002 | | - rabbit_core_metrics:channel_stats(Type, Measure, {self(), Key}, Inc), |
2003 | | - %% Keys in the process dictionary are used to clean up the core metrics |
2004 | | - put({Type, Key}, none) |
2005 | | - end || {Type, Key, Inc} <- Incs]. |
2006 | | - |
2007 | 2015 | emit_stats(State) -> emit_stats(State, []). |
2008 | 2016 |
|
2009 | 2017 | emit_stats(State, Extra) -> |
|
0 commit comments