Skip to content

Commit cc34909

Browse files
committed
more accurate binding of L7 requests to TCP connections
1 parent ef256ee commit cc34909

File tree

6 files changed, with 78 additions and 38 deletions.

containers/container.go

Lines changed: 20 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -63,6 +63,8 @@ type ActiveConnection struct {
6363
ActualDest netaddr.IPPort
6464
Pid uint32
6565
Fd uint64
66+
Timestamp uint64
67+
Closed time.Time
6668
}
6769

6870
type L7Stats struct {
@@ -89,7 +91,7 @@ type Container struct {
8991
connectsSuccessful map[AddrPair]int64 // dst:actual_dst -> count
9092
connectsFailed map[netaddr.IPPort]int64 // dst -> count
9193
connectLastAttempt map[netaddr.IPPort]time.Time // dst -> time
92-
connectionsActive map[AddrPair]ActiveConnection
94+
connectionsActive map[AddrPair]*ActiveConnection
9395
retransmits map[AddrPair]int64 // dst:actual_dst -> count
9496

9597
l7Stats map[ebpftracer.L7Protocol]map[AddrPair]*L7Stats // protocol -> dst:actual_dst -> stats
@@ -119,7 +121,7 @@ func NewContainer(cg *cgroup.Cgroup, md *ContainerMetadata) *Container {
119121
connectsSuccessful: map[AddrPair]int64{},
120122
connectsFailed: map[netaddr.IPPort]int64{},
121123
connectLastAttempt: map[netaddr.IPPort]time.Time{},
122-
connectionsActive: map[AddrPair]ActiveConnection{},
124+
connectionsActive: map[AddrPair]*ActiveConnection{},
123125
retransmits: map[AddrPair]int64{},
124126
l7Stats: map[ebpftracer.L7Protocol]map[AddrPair]*L7Stats{},
125127

@@ -265,6 +267,9 @@ func (c *Container) Collect(ch chan<- prometheus.Metric) {
265267

266268
connections := map[AddrPair]int{}
267269
for c, conn := range c.connectionsActive {
270+
if !conn.Closed.IsZero() {
271+
continue
272+
}
268273
connections[AddrPair{src: c.dst, dst: conn.ActualDest}]++
269274
}
270275
for d, count := range connections {
@@ -378,7 +383,7 @@ func (c *Container) onListenClose(pid uint32, addr netaddr.IPPort) {
378383
}
379384
}
380385

381-
func (c *Container) onConnectionOpen(pid uint32, fd uint64, src, dst netaddr.IPPort, failed bool) {
386+
func (c *Container) onConnectionOpen(pid uint32, fd uint64, src, dst netaddr.IPPort, timestamp uint64, failed bool) {
382387
if dst.IP().IsLoopback() {
383388
netNs, err := proc.GetNetNs(pid)
384389
isHostNs := err == nil && hostNetNsId == netNs.UniqueId()
@@ -405,10 +410,11 @@ func (c *Container) onConnectionOpen(pid uint32, fd uint64, src, dst netaddr.IPP
405410
} else {
406411
actualDst := ConntrackGetActualDestination(src, dst)
407412
c.connectsSuccessful[AddrPair{src: dst, dst: actualDst}]++
408-
c.connectionsActive[AddrPair{src: src, dst: dst}] = ActiveConnection{
413+
c.connectionsActive[AddrPair{src: src, dst: dst}] = &ActiveConnection{
409414
ActualDest: actualDst,
410415
Pid: pid,
411416
Fd: fd,
417+
Timestamp: timestamp,
412418
}
413419
}
414420
c.connectLastAttempt[dst] = time.Now()
@@ -417,16 +423,17 @@ func (c *Container) onConnectionOpen(pid uint32, fd uint64, src, dst netaddr.IPP
417423
func (c *Container) onConnectionClose(srcDst AddrPair) bool {
418424
c.lock.Lock()
419425
defer c.lock.Unlock()
420-
if _, ok := c.connectionsActive[srcDst]; !ok {
426+
conn := c.connectionsActive[srcDst]
427+
if conn == nil {
421428
return false
422429
}
423-
delete(c.connectionsActive, srcDst)
430+
conn.Closed = time.Now()
424431
return true
425432
}
426433

427-
func (c *Container) onL7Request(pid uint32, fd uint64, r *ebpftracer.L7Request) {
434+
func (c *Container) onL7Request(pid uint32, fd uint64, timestamp uint64, r *ebpftracer.L7Request) {
428435
for dest, conn := range c.connectionsActive {
429-
if conn.Pid == pid && conn.Fd == fd {
436+
if conn.Pid == pid && conn.Fd == fd && (timestamp == 0 || conn.Timestamp == timestamp) {
430437
key := AddrPair{src: dest.dst, dst: conn.ActualDest}
431438
stats := c.l7Stats[r.Protocol]
432439
if stats == nil {
@@ -731,9 +738,13 @@ func (c *Container) gc(now time.Time) {
731738

732739
c.revalidateListens(now, listens)
733740

734-
for srcDst := range c.connectionsActive {
741+
for srcDst, conn := range c.connectionsActive {
735742
if _, ok := established[srcDst]; !ok {
736743
delete(c.connectionsActive, srcDst)
744+
continue
745+
}
746+
if !conn.Closed.IsZero() && now.Sub(conn.Closed) > gcInterval {
747+
delete(c.connectionsActive, srcDst)
737748
}
738749
}
739750
for dst, at := range c.connectLastAttempt {

containers/registry.go

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -178,13 +178,13 @@ func (r *Registry) handleEvents(ch <-chan ebpftracer.Event) {
178178

179179
case ebpftracer.EventTypeConnectionOpen:
180180
if c := r.getOrCreateContainer(e.Pid); c != nil {
181-
c.onConnectionOpen(e.Pid, e.Fd, e.SrcAddr, e.DstAddr, false)
181+
c.onConnectionOpen(e.Pid, e.Fd, e.SrcAddr, e.DstAddr, e.Timestamp, false)
182182
} else {
183183
klog.Infoln("TCP connection from unknown container", e)
184184
}
185185
case ebpftracer.EventTypeConnectionError:
186186
if c := r.getOrCreateContainer(e.Pid); c != nil {
187-
c.onConnectionOpen(e.Pid, e.Fd, e.SrcAddr, e.DstAddr, true)
187+
c.onConnectionOpen(e.Pid, e.Fd, e.SrcAddr, e.DstAddr, 0, true)
188188
} else {
189189
klog.Infoln("TCP connection error from unknown container", e)
190190
}
@@ -207,7 +207,7 @@ func (r *Registry) handleEvents(ch <-chan ebpftracer.Event) {
207207
continue
208208
}
209209
if c := r.containersByPid[e.Pid]; c != nil {
210-
c.onL7Request(e.Pid, e.Fd, e.L7Request)
210+
c.onL7Request(e.Pid, e.Fd, e.Timestamp, e.L7Request)
211211
}
212212
}
213213
}

ebpftracer/ebpf.go

Lines changed: 4 additions & 4 deletions
Large diffs are not rendered by default.

ebpftracer/ebpf/l7/l7.c

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -15,6 +15,7 @@
1515

1616
struct l7_event {
1717
__u64 fd;
18+
__u64 connection_timestamp;
1819
__u32 pid;
1920
__u32 status;
2021
__u64 duration;
@@ -136,6 +137,7 @@ int trace_exit_read(struct trace_event_raw_sys_exit_rw__stub* ctx) {
136137
e.protocol = req->protocol;
137138
e.fd = k.fd;
138139
e.pid = k.pid;
140+
e.connection_timestamp = 0;
139141
__u64 ns = req->ns;
140142
__u8 partial = req->partial;
141143
bpf_map_delete_elem(&active_l7_requests, &k);
@@ -165,6 +167,10 @@ int trace_exit_read(struct trace_event_raw_sys_exit_rw__stub* ctx) {
165167
return 0;
166168
}
167169
e.duration = bpf_ktime_get_ns() - ns;
170+
__u64 *timestamp = bpf_map_lookup_elem(&connection_timestamps, &k);
171+
if (timestamp) {
172+
e.connection_timestamp = *timestamp;
173+
}
168174
bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, &e, sizeof(e));
169175
return 0;
170176
}

ebpftracer/ebpf/tcp/state.c

Lines changed: 21 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -2,6 +2,7 @@
22

33
struct tcp_event {
44
__u64 fd;
5+
__u64 timestamp;
56
__u32 type;
67
__u32 pid;
78
__u16 sport;
@@ -59,6 +60,13 @@ struct {
5960
__uint(max_entries, 10240);
6061
} sk_info SEC(".maps");
6162

63+
struct {
64+
__uint(type, BPF_MAP_TYPE_LRU_HASH);
65+
__uint(key_size, sizeof(struct sk_info));
66+
__uint(value_size, sizeof(__u64));
67+
__uint(max_entries, 32768);
68+
} connection_timestamps SEC(".maps");
69+
6270
SEC("tracepoint/sock/inet_sock_set_state")
6371
int inet_sock_set_state(void *ctx)
6472
{
@@ -88,18 +96,22 @@ int inet_sock_set_state(void *ctx)
8896

8997
__u64 fd = 0;
9098
__u32 type = 0;
99+
__u64 timestamp = 0;
91100
void *map = &tcp_connect_events;
92101
if (args.oldstate == BPF_TCP_SYN_SENT) {
102+
struct sk_info *i = bpf_map_lookup_elem(&sk_info, &args.skaddr);
103+
if (!i) {
104+
return 0;
105+
}
93106
if (args.newstate == BPF_TCP_ESTABLISHED) {
107+
timestamp = bpf_ktime_get_ns();
108+
struct sk_info k = {};
109+
k.pid = i->pid;
110+
k.fd = i->fd;
111+
bpf_map_update_elem(&connection_timestamps, &k, &timestamp, BPF_ANY);
94112
type = EVENT_TYPE_CONNECTION_OPEN;
95113
} else if (args.newstate == BPF_TCP_CLOSE) {
96114
type = EVENT_TYPE_CONNECTION_ERROR;
97-
} else {
98-
return 0;
99-
}
100-
struct sk_info *i = bpf_map_lookup_elem(&sk_info, &args.skaddr);
101-
if (!i) {
102-
return 0;
103115
}
104116
pid = i->pid;
105117
fd = i->fd;
@@ -124,6 +136,7 @@ int inet_sock_set_state(void *ctx)
124136

125137
struct tcp_event e = {};
126138
e.type = type;
139+
e.timestamp = timestamp;
127140
e.pid = pid;
128141
e.sport = args.sport;
129142
e.dport = args.dport;
@@ -136,15 +149,15 @@ int inet_sock_set_state(void *ctx)
136149
return 0;
137150
}
138151

139-
struct trace_event_raw_sys_enter_connect__stub {
152+
struct trace_event_raw_args_with_fd__stub {
140153
__u64 unused;
141154
long int id;
142155
__u64 fd;
143156
};
144157

145158
SEC("tracepoint/syscalls/sys_enter_connect")
146159
int sys_enter_connect(void *ctx) {
147-
struct trace_event_raw_sys_enter_connect__stub args = {};
160+
struct trace_event_raw_args_with_fd__stub args = {};
148161
if (bpf_probe_read(&args, sizeof(args), ctx) < 0) {
149162
return 0;
150163
}

ebpftracer/tracer.go

Lines changed: 24 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -81,6 +81,7 @@ type Event struct {
8181
SrcAddr netaddr.IPPort
8282
DstAddr netaddr.IPPort
8383
Fd uint64
84+
Timestamp uint64
8485
L7Request *L7Request
8586
}
8687

@@ -287,17 +288,25 @@ func (e procEvent) Event() Event {
287288
}
288289

289290
type tcpEvent struct {
290-
Fd uint64
291-
Type uint32
292-
Pid uint32
293-
SPort uint16
294-
DPort uint16
295-
SAddr [16]byte
296-
DAddr [16]byte
291+
Fd uint64
292+
Timestamp uint64
293+
Type uint32
294+
Pid uint32
295+
SPort uint16
296+
DPort uint16
297+
SAddr [16]byte
298+
DAddr [16]byte
297299
}
298300

299301
func (e tcpEvent) Event() Event {
300-
return Event{Type: EventType(e.Type), Pid: e.Pid, SrcAddr: ipPort(e.SAddr, e.SPort), DstAddr: ipPort(e.DAddr, e.DPort), Fd: e.Fd}
302+
return Event{
303+
Type: EventType(e.Type),
304+
Pid: e.Pid,
305+
SrcAddr: ipPort(e.SAddr, e.SPort),
306+
DstAddr: ipPort(e.DAddr, e.DPort),
307+
Fd: e.Fd,
308+
Timestamp: e.Timestamp,
309+
}
301310
}
302311

303312
type fileEvent struct {
@@ -311,15 +320,16 @@ func (e fileEvent) Event() Event {
311320
}
312321

313322
type l7Event struct {
314-
Fd uint64
315-
Pid uint32
316-
Status uint32
317-
Duration uint64
318-
Protocol uint8
323+
Fd uint64
324+
ConnectionTimestamp uint64
325+
Pid uint32
326+
Status uint32
327+
Duration uint64
328+
Protocol uint8
319329
}
320330

321331
func (e l7Event) Event() Event {
322-
return Event{Type: EventTypeL7Request, Pid: e.Pid, Fd: e.Fd, L7Request: &L7Request{
332+
return Event{Type: EventTypeL7Request, Pid: e.Pid, Fd: e.Fd, Timestamp: e.ConnectionTimestamp, L7Request: &L7Request{
323333
Protocol: L7Protocol(e.Protocol),
324334
Status: int(e.Status),
325335
Duration: time.Duration(e.Duration),

0 commit comments

Comments (0)