Skip to content

Commit 99c3b38

Browse files
committed
eBPF tracer: kafka protocol support
1 parent e349472 commit 99c3b38

File tree

6 files changed

+86
-15
lines changed

6 files changed

+86
-15
lines changed

containers/container.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -501,6 +501,8 @@ func (c *Container) onL7Request(pid uint32, fd uint64, timestamp uint64, r *ebpf
501501
switch r.Protocol {
502502
case ebpftracer.L7ProtocolHTTP:
503503
status = strconv.Itoa(r.Status)
504+
case ebpftracer.L7ProtocolMongo, ebpftracer.L7ProtocolKafka:
505+
status = "unknown"
504506
default:
505507
if r.Status == 500 {
506508
status = "failed"

containers/metrics.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ var (
8080
ebpftracer.L7ProtocolMemcached: {Name: "container_memcached_queries_total", Help: "Total number of outbound Memcached queries"},
8181
ebpftracer.L7ProtocolMysql: {Name: "container_mysql_queries_total", Help: "Total number of outbound Mysql queries"},
8282
ebpftracer.L7ProtocolMongo: {Name: "container_mongo_queries_total", Help: "Total number of outbound Mongo queries"},
83+
ebpftracer.L7ProtocolKafka: {Name: "container_kafka_requests_total", Help: "Total number of outbound Kafka requests"},
8384
}
8485
L7Latency = map[ebpftracer.L7Protocol]prometheus.HistogramOpts{
8586
ebpftracer.L7ProtocolHTTP: {Name: "container_http_requests_duration_seconds_total", Help: "Histogram of the response time for each outbound HTTP request"},
@@ -88,6 +89,7 @@ var (
8889
ebpftracer.L7ProtocolMemcached: {Name: "container_memcached_queries_duration_seconds_total", Help: "Histogram of the execution time for each outbound Memcached query"},
8990
ebpftracer.L7ProtocolMysql: {Name: "container_mysql_queries_duration_seconds_total", Help: "Histogram of the execution time for each outbound Mysql query"},
9091
ebpftracer.L7ProtocolMongo: {Name: "container_mongo_queries_duration_seconds_total", Help: "Histogram of the execution time for each outbound Mongo query"},
92+
ebpftracer.L7ProtocolKafka: {Name: "container_kafka_requests_duration_seconds_total", Help: "Histogram of the execution time for each outbound Kafka request"},
9193
}
9294
)
9395

ebpftracer/ebpf.go

Lines changed: 4 additions & 4 deletions
Large diffs are not rendered by default.

ebpftracer/ebpf/l7/kafka.c

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
// https://kafka.apache.org/protocol.html
2+
3+
struct kafka_request_header {
4+
__s32 length;
5+
__s16 api_key;
6+
__s16 api_version;
7+
__s32 correlation_id;
8+
};
9+
10+
struct kafka_response_header {
11+
__s32 length;
12+
__s32 correlation_id;
13+
};
14+
15+
static __always_inline
16+
__s32 is_kafka_request(char *buf, int buf_size) {
17+
if (buf_size < 1) {
18+
return 0;
19+
}
20+
struct kafka_request_header h = {};
21+
if (bpf_probe_read(&h, sizeof(h), (void *)((char *)buf)) < 0) {
22+
return 0;
23+
}
24+
if (h.correlation_id > 0 && (h.api_key >= 0 && h.api_key <= 67)) {
25+
return h.correlation_id;
26+
}
27+
return 0;
28+
}
29+
30+
static __always_inline
31+
__u32 parse_kafka_status(__s32 request_id, char *buf, int buf_size, __u8 partial) {
32+
struct kafka_response_header h = {};
33+
if (bpf_probe_read(&h, sizeof(h), (void *)((char *)buf)) < 0) {
34+
return 0;
35+
}
36+
if (h.correlation_id == request_id) {
37+
return 200;
38+
}
39+
return 0;
40+
}
41+
42+

ebpftracer/ebpf/l7/l7.c

Lines changed: 33 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
#include "memcached.c"
55
#include "mysql.c"
66
#include "mongo.c"
7+
#include "kafka.c"
8+
79

810
#define PROTOCOL_UNKNOWN 0
911
#define PROTOCOL_HTTP 1
@@ -12,6 +14,7 @@
1214
#define PROTOCOL_MEMCACHED 4
1315
#define PROTOCOL_MYSQL 5
1416
#define PROTOCOL_MONGO 6
17+
#define PROTOCOL_KAFKA 7
1518

1619
struct l7_event {
1720
__u64 fd;
@@ -49,6 +52,7 @@ struct l7_request {
4952
__u64 ns;
5053
__u8 protocol;
5154
__u8 partial;
55+
__s32 request_id;
5256
};
5357

5458
struct {
@@ -81,7 +85,13 @@ static inline __attribute__((__always_inline__))
8185
int trace_enter_write(__u64 fd, char *buf, __u64 size) {
8286
__u64 id = bpf_get_current_pid_tgid();
8387
struct l7_request req = {};
88+
req.protocol = PROTOCOL_UNKNOWN;
8489
req.partial = 0;
90+
req.request_id = 0;
91+
req.ns = 0;
92+
struct socket_key k = {};
93+
k.pid = id >> 32;
94+
k.fd = fd;
8595
if (is_http_request(buf)) {
8696
req.protocol = PROTOCOL_HTTP;
8797
} else if (is_postgres_query(buf, size)) {
@@ -95,12 +105,22 @@ int trace_enter_write(__u64 fd, char *buf, __u64 size) {
95105
} else if (is_mongo_query(buf, size)) {
96106
req.protocol = PROTOCOL_MONGO;
97107
} else {
108+
__s32 request_id = is_kafka_request(buf, size);
109+
if (request_id > 0) {
110+
req.request_id = request_id;
111+
req.protocol = PROTOCOL_KAFKA;
112+
struct l7_request *prev_req = bpf_map_lookup_elem(&active_l7_requests, &k);
113+
if (prev_req && prev_req->protocol == PROTOCOL_KAFKA) {
114+
req.ns = prev_req->ns;
115+
}
116+
}
117+
}
118+
if (req.protocol == PROTOCOL_UNKNOWN) {
98119
return 0;
99120
}
100-
req.ns = bpf_ktime_get_ns();
101-
struct socket_key k = {};
102-
k.pid = id >> 32;
103-
k.fd = fd;
121+
if (req.ns == 0) {
122+
req.ns = bpf_ktime_get_ns();
123+
}
104124
bpf_map_update_elem(&active_l7_requests, &k, &req, BPF_ANY);
105125
return 0;
106126
}
@@ -138,6 +158,7 @@ int trace_exit_read(struct trace_event_raw_sys_exit_rw__stub* ctx) {
138158
if (!req) {
139159
return 0;
140160
}
161+
__s32 request_id = req->request_id;
141162
struct l7_event e = {};
142163
e.protocol = req->protocol;
143164
e.fd = k.fd;
@@ -146,18 +167,17 @@ int trace_exit_read(struct trace_event_raw_sys_exit_rw__stub* ctx) {
146167
__u64 ns = req->ns;
147168
__u8 partial = req->partial;
148169
bpf_map_delete_elem(&active_l7_requests, &k);
149-
150-
if (req->protocol == PROTOCOL_HTTP) {
170+
if (e.protocol == PROTOCOL_HTTP) {
151171
e.status = parse_http_status(buf);
152-
} else if (req->protocol == PROTOCOL_POSTGRES) {
172+
} else if (e.protocol == PROTOCOL_POSTGRES) {
153173
e.status = parse_postgres_status(buf, ctx->ret);
154-
} else if (req->protocol == PROTOCOL_REDIS) {
174+
} else if (e.protocol == PROTOCOL_REDIS) {
155175
e.status = parse_redis_status(buf, ctx->ret);
156-
} else if (req->protocol == PROTOCOL_MEMCACHED) {
176+
} else if (e.protocol == PROTOCOL_MEMCACHED) {
157177
e.status = parse_memcached_status(buf, ctx->ret);
158-
} else if (req->protocol == PROTOCOL_MYSQL) {
178+
} else if (e.protocol == PROTOCOL_MYSQL) {
159179
e.status = parse_mysql_status(buf, ctx->ret);
160-
} else if (req->protocol == PROTOCOL_MONGO) {
180+
} else if (e.protocol == PROTOCOL_MONGO) {
161181
e.status = parse_mongo_status(buf, ctx->ret, partial);
162182
if (e.status == 1) {
163183
struct l7_request r = {};
@@ -167,6 +187,8 @@ int trace_exit_read(struct trace_event_raw_sys_exit_rw__stub* ctx) {
167187
bpf_map_update_elem(&active_l7_requests, &k, &r, BPF_ANY);
168188
return 0;
169189
}
190+
} else if (e.protocol == PROTOCOL_KAFKA) {
191+
e.status = parse_kafka_status(request_id, buf, ctx->ret, partial);
170192
}
171193
if (e.status == 0) {
172194
return 0;

ebpftracer/tracer.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ const (
4848
L7ProtocolMemcached L7Protocol = 4
4949
L7ProtocolMysql L7Protocol = 5
5050
L7ProtocolMongo L7Protocol = 6
51+
L7ProtocolKafka L7Protocol = 7
5152
)
5253

5354
func (p L7Protocol) String() string {
@@ -64,6 +65,8 @@ func (p L7Protocol) String() string {
6465
return "Mysql"
6566
case L7ProtocolMongo:
6667
return "Mongo"
68+
case L7ProtocolKafka:
69+
return "Kafka"
6770
}
6871
return "UNKNOWN:" + strconv.Itoa(int(p))
6972
}

0 commit comments

Comments
 (0)