Skip to content

Commit a63198a

Browse files
Merge pull request ClickHouse#80188 from ClickHouse/async-metrics-sockets
Add asynchronous metrics about sockets
2 parents 0bf759c + 7b8b813 commit a63198a

File tree

4 files changed

+140
-0
lines changed

4 files changed

+140
-0
lines changed

src/Common/AsynchronousMetrics.cpp

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@
2323
# include <jemalloc/jemalloc.h>
2424
#endif
2525

26+
#if defined(OS_LINUX)
27+
# include <netinet/tcp.h>
28+
#endif
29+
2630

2731
namespace DB
2832
{
@@ -86,6 +90,8 @@ AsynchronousMetrics::AsynchronousMetrics(
8690
openFileIfExists("/proc/cpuinfo", cpuinfo);
8791
openFileIfExists("/proc/sys/fs/file-nr", file_nr);
8892
openFileIfExists("/proc/net/dev", net_dev);
93+
openFileIfExists("/proc/net/tcp", net_tcp);
94+
openFileIfExists("/proc/net/tcp6", net_tcp6);
8995

9096
/// CGroups v2
9197
openCgroupv2MetricFile("memory.max", cgroupmem_limit_in_bytes);
@@ -1607,6 +1613,132 @@ void AsynchronousMetrics::update(TimePoint update_time, bool force_update)
16071613
}
16081614
}
16091615

1616+
if (net_tcp || net_tcp6)
1617+
{
1618+
UInt64 total_sockets = 0;
1619+
UInt64 sockets_by_state[16] = {};
1620+
UInt64 transmit_queue_size = 0;
1621+
UInt64 receive_queue_size = 0;
1622+
UInt64 unrecovered_retransmits = 0;
1623+
std::unordered_set<std::string> remote_addresses;
1624+
1625+
auto process_net = [&](const char * path, auto & file)
1626+
{
1627+
try
1628+
{
1629+
file->rewind();
1630+
/// Header
1631+
skipToNextLineOrEOF(*file);
1632+
1633+
while (!file->eof())
1634+
{
1635+
/// Line number
1636+
skipWhitespaceIfAny(*file, true);
1637+
skipStringUntilWhitespace(*file);
1638+
skipWhitespaceIfAny(*file, true);
1639+
1640+
/// Local address and port
1641+
skipStringUntilWhitespace(*file);
1642+
skipWhitespaceIfAny(*file, true);
1643+
1644+
/// Remote address and port
1645+
String remote_address_and_port;
1646+
readStringUntilWhitespace(remote_address_and_port, *file);
1647+
skipWhitespaceIfAny(*file, true);
1648+
if (auto pos = remote_address_and_port.find(':'); pos != std::string::npos)
1649+
remote_address_and_port.resize(pos);
1650+
remote_addresses.emplace(remote_address_and_port);
1651+
1652+
/// Socket state
1653+
UInt8 state = 0;
1654+
char state_hex[2]{};
1655+
readPODBinary(state_hex, *file);
1656+
skipWhitespaceIfAny(*file, true);
1657+
state = unhex2(state_hex);
1658+
if (state < 16)
1659+
++sockets_by_state[state];
1660+
1661+
/// tx_queue:rx_queue
1662+
String tx_rx_queue;
1663+
readStringUntilWhitespace(tx_rx_queue, *file);
1664+
skipWhitespaceIfAny(*file, true);
1665+
if (auto pos = tx_rx_queue.find(':'); pos != std::string::npos)
1666+
{
1667+
std::string_view tx_queue = std::string_view(tx_rx_queue).substr(0, pos);
1668+
std::string_view rx_queue = std::string_view(tx_rx_queue).substr(pos + 1);
1669+
1670+
if (tx_queue.size() == 8 && rx_queue.size() == 8)
1671+
{
1672+
UInt32 tx_queue_size = unhexUInt<UInt32>(tx_queue.data()); // NOLINT
1673+
UInt32 rx_queue_size = unhexUInt<UInt32>(rx_queue.data()); // NOLINT
1674+
1675+
transmit_queue_size += tx_queue_size;
1676+
receive_queue_size += rx_queue_size;
1677+
}
1678+
}
1679+
1680+
/// tr:when
1681+
skipStringUntilWhitespace(*file);
1682+
skipWhitespaceIfAny(*file, true);
1683+
1684+
/// Retransmits
1685+
String retransmits_str;
1686+
readStringUntilWhitespace(retransmits_str, *file);
1687+
if (retransmits_str.size() == 8)
1688+
{
1689+
UInt32 retransmits = unhexUInt<UInt32>(retransmits_str.data());
1690+
unrecovered_retransmits += retransmits;
1691+
}
1692+
1693+
skipToNextLineOrEOF(*file);
1694+
++total_sockets;
1695+
}
1696+
}
1697+
catch (...)
1698+
{
1699+
tryLogCurrentException(__PRETTY_FUNCTION__);
1700+
openFileIfExists(path, file);
1701+
}
1702+
};
1703+
1704+
if (net_tcp)
1705+
process_net("/proc/net/tcp", net_tcp);
1706+
1707+
if (net_tcp6)
1708+
process_net("/proc/net/tcp6", net_tcp6);
1709+
1710+
new_values["NetworkTCPSockets"] = { total_sockets,
1711+
"Total number of network sockets used on the server across TCPv4 and TCPv6, in all states." };
1712+
1713+
auto process_socket_state = [&](UInt8 state, const char * description)
1714+
{
1715+
if (state < 16 && sockets_by_state[state])
1716+
new_values[fmt::format("NetworkTCPSockets_{}", description)] = { sockets_by_state[state],
1717+
"Total number of network sockets in the specific state on the server across TCPv4 and TCPv6." };
1718+
};
1719+
1720+
process_socket_state(TCP_ESTABLISHED, "ESTABLISHED");
1721+
process_socket_state(TCP_SYN_SENT, "SYN_SENT");
1722+
process_socket_state(TCP_SYN_RECV, "SYN_RECV");
1723+
process_socket_state(TCP_FIN_WAIT1, "FIN_WAIT1");
1724+
process_socket_state(TCP_FIN_WAIT2, "FIN_WAIT2");
1725+
process_socket_state(TCP_TIME_WAIT, "TIME_WAIT");
1726+
process_socket_state(TCP_CLOSE, "CLOSE");
1727+
process_socket_state(TCP_CLOSE_WAIT, "CLOSE_WAIT");
1728+
process_socket_state(TCP_LAST_ACK, "LAST_ACK");
1729+
process_socket_state(TCP_LISTEN, "LISTEN");
1730+
process_socket_state(TCP_CLOSING, "CLOSING");
1731+
1732+
new_values["NetworkTCPTransmitQueue"] = { transmit_queue_size,
1733+
"Total size of transmit queues of network sockets used on the server across TCPv4 and TCPv6." };
1734+
new_values["NetworkTCPReceiveQueue"] = { receive_queue_size,
1735+
"Total size of receive queues of network sockets used on the server across TCPv4 and TCPv6." };
1736+
new_values["NetworkTCPUnrecoveredRetransmits"] = { unrecovered_retransmits,
1737+
"Total size of current retransmits (unrecovered at this moment) of network sockets used on the server across TCPv4 and TCPv6." };
1738+
new_values["NetworkTCPSocketRemoteAddresses"] = { remote_addresses.size(),
1739+
"Total number of unique remote addresses of network sockets used on the server across TCPv4 and TCPv6." };
1740+
}
1741+
16101742
if (vm_max_map_count)
16111743
{
16121744
try

src/Common/AsynchronousMetrics.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,8 @@ class AsynchronousMetrics
132132
std::optional<ReadBufferFromFilePRead> file_nr TSA_GUARDED_BY(data_mutex);
133133
std::optional<ReadBufferFromFilePRead> uptime TSA_GUARDED_BY(data_mutex);
134134
std::optional<ReadBufferFromFilePRead> net_dev TSA_GUARDED_BY(data_mutex);
135+
std::optional<ReadBufferFromFilePRead> net_tcp TSA_GUARDED_BY(data_mutex);
136+
std::optional<ReadBufferFromFilePRead> net_tcp6 TSA_GUARDED_BY(data_mutex);
135137

136138
std::optional<ReadBufferFromFilePRead> cgroupmem_limit_in_bytes TSA_GUARDED_BY(data_mutex);
137139
std::optional<ReadBufferFromFilePRead> cgroupmem_usage_in_bytes TSA_GUARDED_BY(data_mutex);
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
1
2+
1
3+
1
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
SELECT value > 0 FROM system.asynchronous_metrics WHERE name = 'NetworkTCPSockets';
2+
SELECT value > 0 FROM system.asynchronous_metrics WHERE name = 'NetworkTCPSockets_LISTEN';
3+
SELECT value > 0 FROM system.asynchronous_metrics WHERE name = 'NetworkTCPSocketRemoteAddresses';

0 commit comments

Comments
 (0)