Skip to content

Commit 5ea0a93

Browse files
authored
Merge pull request prometheus#2322 from babiel/tcpstat-netlink
refactor: Use netlink for tcpstat collector
2 parents 19e4a23 + 9ece38f commit 5ea0a93

File tree

4 files changed

+130
-130
lines changed

4 files changed

+130
-130
lines changed

collector/fixtures/proc/net/tcpstat

Lines changed: 0 additions & 3 deletions
This file was deleted.

collector/tcpstat_linux.go

Lines changed: 87 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,12 @@ package collector
1818

1919
import (
2020
"fmt"
21-
"io"
22-
"io/ioutil"
2321
"os"
24-
"strconv"
25-
"strings"
22+
"syscall"
23+
"unsafe"
2624

2725
"github.com/go-kit/log"
26+
"github.com/mdlayher/netlink"
2827
"github.com/prometheus/client_golang/prometheus"
2928
)
3029

@@ -80,16 +79,64 @@ func NewTCPStatCollector(logger log.Logger) (Collector, error) {
8079
}, nil
8180
}
8281

82+
// InetDiagSockID (inet_diag_sockid) contains the socket identity.
// https://github.com/torvalds/linux/blob/v4.0/include/uapi/linux/inet_diag.h#L13
//
// The struct is overlaid directly on raw netlink payloads through unsafe
// casts, so the order and size of its fields (48 bytes in total) must not be
// changed.
type InetDiagSockID struct {
	SourcePort [2]byte
	DestPort   [2]byte
	SourceIP   [4][4]byte
	DestIP     [4][4]byte
	Interface  uint32
	Cookie     [2]uint32
}

// InetDiagReqV2 (inet_diag_req_v2) is used to request diagnostic data.
// https://github.com/torvalds/linux/blob/v4.0/include/uapi/linux/inet_diag.h#L37
//
// Serialize exposes the in-memory representation of this struct as the
// request payload, so its layout must stay exactly sizeOfDiagRequest bytes.
type InetDiagReqV2 struct {
	Family   uint8
	Protocol uint8
	Ext      uint8
	Pad      uint8
	States   uint32
	ID       InetDiagSockID
}

// sizeOfDiagRequest is the size in bytes of InetDiagReqV2: four uint8
// fields, one uint32 and the 48-byte InetDiagSockID (4 + 4 + 48 = 56).
const sizeOfDiagRequest = 0x38

// Serialize returns the raw bytes of the request in host memory layout.
//
// The returned slice aliases the memory of req instead of copying it: it is
// only valid while req is alive, and later writes to req are visible through
// the slice.
func (req *InetDiagReqV2) Serialize() []byte {
	return (*(*[sizeOfDiagRequest]byte)(unsafe.Pointer(req)))[:]
}

// Len returns the length in bytes of the slice produced by Serialize.
func (req *InetDiagReqV2) Len() int {
	return sizeOfDiagRequest
}

// InetDiagMsg describes the fixed-size data at the start of each socket
// diagnostic reply (presumably the kernel's inet_diag_msg; verify against
// inet_diag.h).
//
// parseInetDiagMsg overlays this struct on the raw message bytes, so the
// field order and sizes (72 bytes in total) must not be changed.
type InetDiagMsg struct {
	Family  uint8
	State   uint8
	Timer   uint8
	Retrans uint8
	ID      InetDiagSockID
	Expires uint32
	RQueue  uint32
	WQueue  uint32
	UID     uint32
	Inode   uint32
}

// parseInetDiagMsg reinterprets the beginning of b as an InetDiagMsg without
// copying; the result points into b and is only valid while b is unchanged.
//
// It returns nil when b is too short to hold a complete InetDiagMsg. Without
// this guard an empty payload panics on b[0], and a truncated payload makes
// field accesses read memory past the end of the slice. Callers must check
// for nil before dereferencing the result.
func parseInetDiagMsg(b []byte) *InetDiagMsg {
	if len(b) < int(unsafe.Sizeof(InetDiagMsg{})) {
		return nil
	}
	return (*InetDiagMsg)(unsafe.Pointer(&b[0]))
}
130+
83131
func (c *tcpStatCollector) Update(ch chan<- prometheus.Metric) error {
84-
tcpStats, err := getTCPStats(procFilePath("net/tcp"))
132+
tcpStats, err := getTCPStats(syscall.AF_INET)
85133
if err != nil {
86134
return fmt.Errorf("couldn't get tcpstats: %w", err)
87135
}
88136

89137
// if enabled ipv6 system
90-
tcp6File := procFilePath("net/tcp6")
91-
if _, hasIPv6 := os.Stat(tcp6File); hasIPv6 == nil {
92-
tcp6Stats, err := getTCPStats(tcp6File)
138+
if _, hasIPv6 := os.Stat(procFilePath("net/tcp6")); hasIPv6 == nil {
139+
tcp6Stats, err := getTCPStats(syscall.AF_INET6)
93140
if err != nil {
94141
return fmt.Errorf("couldn't get tcp6stats: %w", err)
95142
}
@@ -102,59 +149,51 @@ func (c *tcpStatCollector) Update(ch chan<- prometheus.Metric) error {
102149
for st, value := range tcpStats {
103150
ch <- c.desc.mustNewConstMetric(value, st.String())
104151
}
152+
105153
return nil
106154
}
107155

108-
func getTCPStats(statsFile string) (map[tcpConnectionState]float64, error) {
109-
file, err := os.Open(statsFile)
156+
func getTCPStats(family uint8) (map[tcpConnectionState]float64, error) {
157+
const TCPFAll = 0xFFF
158+
const InetDiagInfo = 2
159+
const SockDiagByFamily = 20
160+
161+
conn, err := netlink.Dial(syscall.NETLINK_INET_DIAG, nil)
110162
if err != nil {
111-
return nil, err
163+
return nil, fmt.Errorf("couldn't connect netlink: %w", err)
164+
}
165+
defer conn.Close()
166+
167+
msg := netlink.Message{
168+
Header: netlink.Header{
169+
Type: SockDiagByFamily,
170+
Flags: syscall.NLM_F_REQUEST | syscall.NLM_F_DUMP,
171+
},
172+
Data: (&InetDiagReqV2{
173+
Family: family,
174+
Protocol: syscall.IPPROTO_TCP,
175+
States: TCPFAll,
176+
Ext: 0 | 1<<(InetDiagInfo-1),
177+
}).Serialize(),
112178
}
113-
defer file.Close()
114-
115-
return parseTCPStats(file)
116-
}
117179

118-
func parseTCPStats(r io.Reader) (map[tcpConnectionState]float64, error) {
119-
tcpStats := map[tcpConnectionState]float64{}
120-
contents, err := ioutil.ReadAll(r)
180+
messages, err := conn.Execute(msg)
121181
if err != nil {
122182
return nil, err
123183
}
124184

125-
for _, line := range strings.Split(string(contents), "\n")[1:] {
126-
parts := strings.Fields(line)
127-
if len(parts) == 0 {
128-
continue
129-
}
130-
if len(parts) < 5 {
131-
return nil, fmt.Errorf("invalid TCP stats line: %q", line)
132-
}
133-
134-
qu := strings.Split(parts[4], ":")
135-
if len(qu) < 2 {
136-
return nil, fmt.Errorf("cannot parse tx_queues and rx_queues: %q", line)
137-
}
138-
139-
tx, err := strconv.ParseUint(qu[0], 16, 64)
140-
if err != nil {
141-
return nil, err
142-
}
143-
tcpStats[tcpConnectionState(tcpTxQueuedBytes)] += float64(tx)
144-
145-
rx, err := strconv.ParseUint(qu[1], 16, 64)
146-
if err != nil {
147-
return nil, err
148-
}
149-
tcpStats[tcpConnectionState(tcpRxQueuedBytes)] += float64(rx)
185+
return parseTCPStats(messages)
186+
}
150187

151-
st, err := strconv.ParseInt(parts[3], 16, 8)
152-
if err != nil {
153-
return nil, err
154-
}
188+
func parseTCPStats(msgs []netlink.Message) (map[tcpConnectionState]float64, error) {
189+
tcpStats := map[tcpConnectionState]float64{}
155190

156-
tcpStats[tcpConnectionState(st)]++
191+
for _, m := range msgs {
192+
msg := parseInetDiagMsg(m.Data)
157193

194+
tcpStats[tcpTxQueuedBytes] += float64(msg.WQueue)
195+
tcpStats[tcpRxQueuedBytes] += float64(msg.RQueue)
196+
tcpStats[tcpConnectionState(msg.State)]++
158197
}
159198

160199
return tcpStats, nil

collector/tcpstat_linux_test.go

Lines changed: 42 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -14,66 +14,56 @@
1414
package collector
1515

1616
import (
17-
"os"
18-
"strings"
17+
"bytes"
18+
"encoding/binary"
19+
"syscall"
1920
"testing"
21+
22+
"github.com/mdlayher/netlink"
2023
)
2124

22-
func Test_parseTCPStatsError(t *testing.T) {
23-
tests := []struct {
24-
name string
25-
in string
26-
}{
27-
{
28-
name: "too few fields",
29-
in: "sl local_address\n 0: 00000000:0016",
30-
},
31-
{
32-
name: "missing colon in tx-rx field",
33-
in: "sl local_address rem_address st tx_queue rx_queue\n" +
34-
" 1: 0F02000A:0016 0202000A:8B6B 01 0000000000000001",
35-
},
36-
{
37-
name: "tx parsing issue",
38-
in: "sl local_address rem_address st tx_queue rx_queue\n" +
39-
" 1: 0F02000A:0016 0202000A:8B6B 01 0000000x:00000001",
40-
},
25+
func Test_parseTCPStats(t *testing.T) {
26+
encode := func(m InetDiagMsg) []byte {
27+
var buf bytes.Buffer
28+
err := binary.Write(&buf, binary.LittleEndian, m)
29+
if err != nil {
30+
panic(err)
31+
}
32+
return buf.Bytes()
33+
}
34+
35+
msg := []netlink.Message{
4136
{
42-
name: "rx parsing issue",
43-
in: "sl local_address rem_address st tx_queue rx_queue\n" +
44-
" 1: 0F02000A:0016 0202000A:8B6B 01 00000000:0000000x",
37+
Data: encode(InetDiagMsg{
38+
Family: syscall.AF_INET,
39+
State: uint8(tcpEstablished),
40+
Timer: 0,
41+
Retrans: 0,
42+
ID: InetDiagSockID{},
43+
Expires: 0,
44+
RQueue: 11,
45+
WQueue: 21,
46+
UID: 0,
47+
Inode: 0,
48+
}),
4549
},
4650
{
47-
name: "state parsing issue",
48-
in: "sl local_address rem_address st tx_queue rx_queue\n" +
49-
" 1: 0F02000A:0016 0202000A:8B6B 0H 00000000:00000001",
51+
Data: encode(InetDiagMsg{
52+
Family: syscall.AF_INET,
53+
State: uint8(tcpListen),
54+
Timer: 0,
55+
Retrans: 0,
56+
ID: InetDiagSockID{},
57+
Expires: 0,
58+
RQueue: 11,
59+
WQueue: 21,
60+
UID: 0,
61+
Inode: 0,
62+
}),
5063
},
5164
}
52-
for _, tt := range tests {
53-
t.Run(tt.name, func(t *testing.T) {
54-
if _, err := parseTCPStats(strings.NewReader(tt.in)); err == nil {
55-
t.Fatal("expected an error, but none occurred")
56-
}
57-
})
58-
}
59-
}
60-
61-
func TestTCPStat(t *testing.T) {
62-
63-
noFile, _ := os.Open("follow the white rabbit")
64-
defer noFile.Close()
65-
66-
if _, err := parseTCPStats(noFile); err == nil {
67-
t.Fatal("expected an error, but none occurred")
68-
}
6965

70-
file, err := os.Open("fixtures/proc/net/tcpstat")
71-
if err != nil {
72-
t.Fatal(err)
73-
}
74-
defer file.Close()
75-
76-
tcpStats, err := parseTCPStats(file)
66+
tcpStats, err := parseTCPStats(msg)
7767
if err != nil {
7868
t.Fatal(err)
7969
}
@@ -89,35 +79,8 @@ func TestTCPStat(t *testing.T) {
8979
if want, got := 42, int(tcpStats[tcpTxQueuedBytes]); want != got {
9080
t.Errorf("want tcpstat number of bytes in tx queue %d, got %d", want, got)
9181
}
92-
if want, got := 1, int(tcpStats[tcpRxQueuedBytes]); want != got {
82+
if want, got := 22, int(tcpStats[tcpRxQueuedBytes]); want != got {
9383
t.Errorf("want tcpstat number of bytes in rx queue %d, got %d", want, got)
9484
}
9585

9686
}
97-
98-
func Test_getTCPStats(t *testing.T) {
99-
type args struct {
100-
statsFile string
101-
}
102-
tests := []struct {
103-
name string
104-
args args
105-
wantErr bool
106-
}{
107-
{
108-
name: "file not found",
109-
args: args{statsFile: "somewhere over the rainbow"},
110-
wantErr: true,
111-
},
112-
}
113-
for _, tt := range tests {
114-
t.Run(tt.name, func(t *testing.T) {
115-
_, err := getTCPStats(tt.args.statsFile)
116-
if (err != nil) != tt.wantErr {
117-
t.Errorf("getTCPStats() error = %v, wantErr %v", err, tt.wantErr)
118-
return
119-
}
120-
// other cases are covered by TestTCPStat()
121-
})
122-
}
123-
}

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ require (
1212
github.com/jsimonetti/rtnetlink v1.1.1
1313
github.com/lufia/iostat v1.2.1
1414
github.com/mattn/go-xmlrpc v0.0.3
15+
github.com/mdlayher/netlink v1.6.0
1516
github.com/mdlayher/wifi v0.0.0-20220320220353-954ff73a19a5
1617
github.com/prometheus/client_golang v1.12.1
1718
github.com/prometheus/client_model v0.2.0

0 commit comments

Comments
 (0)