Skip to content

Commit f4a8338

Browse files
committed
eBPF tracer: cassandra protocol support
1 parent 99c3b38 commit f4a8338

File tree

5 files changed

+74
-5
lines changed

5 files changed

+74
-5
lines changed

containers/metrics.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ var (
8181
ebpftracer.L7ProtocolMysql: {Name: "container_mysql_queries_total", Help: "Total number of outbound Mysql queries"},
8282
ebpftracer.L7ProtocolMongo: {Name: "container_mongo_queries_total", Help: "Total number of outbound Mongo queries"},
8383
ebpftracer.L7ProtocolKafka: {Name: "container_kafka_requests_total", Help: "Total number of outbound Kafka requests"},
84+
ebpftracer.L7ProtocolCassandra: {Name: "container_cassandra_queries_total", Help: "Total number of outbound Cassandra requests"},
8485
}
8586
L7Latency = map[ebpftracer.L7Protocol]prometheus.HistogramOpts{
8687
ebpftracer.L7ProtocolHTTP: {Name: "container_http_requests_duration_seconds_total", Help: "Histogram of the response time for each outbound HTTP request"},
@@ -90,6 +91,7 @@ var (
9091
ebpftracer.L7ProtocolMysql: {Name: "container_mysql_queries_duration_seconds_total", Help: "Histogram of the execution time for each outbound Mysql query"},
9192
ebpftracer.L7ProtocolMongo: {Name: "container_mongo_queries_duration_seconds_total", Help: "Histogram of the execution time for each outbound Mongo query"},
9293
ebpftracer.L7ProtocolKafka: {Name: "container_kafka_requests_duration_seconds_total", Help: "Histogram of the execution time for each outbound Kafka request"},
94+
ebpftracer.L7ProtocolCassandra: {Name: "container_cassandra_queries_duration_seconds_total", Help: "Histogram of the execution time for each outbound Cassandra request"},
9395
}
9496
)
9597

ebpftracer/ebpf.go

Lines changed: 4 additions & 4 deletions
Large diffs are not rendered by default.

ebpftracer/ebpf/l7/cassandra.c

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
// https://github.com/apache/cassandra/blob/trunk/doc/native_protocol_v4.spec
2+
3+
#define CASSANDRA_REQUEST_FRAME 0x04
4+
#define CASSANDRA_RESPONSE_FRAME 0x84
5+
6+
#define CASSANDRA_OPCODE_ERROR 0x00
7+
#define CASSANDRA_OPCODE_RESULT 0x08
8+
9+
struct cassandra_header {
10+
__u8 version;
11+
__u8 flags;
12+
__s16 stream_id;
13+
__u8 opcode;
14+
};
15+
16+
static __always_inline
17+
__s16 is_cassandra_request(char *buf, int buf_size) {
18+
if (buf_size < 1) {
19+
return -1;
20+
}
21+
struct cassandra_header h = {};
22+
if (bpf_probe_read(&h, sizeof(h), (void *)buf) < 0) {
23+
return -1;
24+
}
25+
if (h.version == CASSANDRA_REQUEST_FRAME && h.stream_id >= 0) {
26+
return h.stream_id;
27+
}
28+
return -1;
29+
}
30+
31+
static __always_inline
32+
__u32 cassandra_status(struct cassandra_header h) {
33+
if (h.version != CASSANDRA_RESPONSE_FRAME || h.stream_id == -1) {
34+
return 0;
35+
}
36+
if (h.opcode == CASSANDRA_OPCODE_RESULT) {
37+
return 200;
38+
}
39+
if (h.opcode == CASSANDRA_OPCODE_ERROR) {
40+
return 500;
41+
}
42+
return 0;
43+
}
44+

ebpftracer/ebpf/l7/l7.c

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include "mysql.c"
66
#include "mongo.c"
77
#include "kafka.c"
8+
#include "cassandra.c"
89

910

1011
#define PROTOCOL_UNKNOWN 0
@@ -15,6 +16,7 @@
1516
#define PROTOCOL_MYSQL 5
1617
#define PROTOCOL_MONGO 6
1718
#define PROTOCOL_KAFKA 7
19+
#define PROTOCOL_CASSANDRA 8
1820

1921
struct l7_event {
2022
__u64 fd;
@@ -46,6 +48,7 @@ struct {
4648
struct socket_key {
4749
__u64 fd;
4850
__u32 pid;
51+
__s16 stream_id;
4952
};
5053

5154
struct l7_request {
@@ -92,6 +95,7 @@ int trace_enter_write(__u64 fd, char *buf, __u64 size) {
9295
struct socket_key k = {};
9396
k.pid = id >> 32;
9497
k.fd = fd;
98+
k.stream_id = -1;
9599
if (is_http_request(buf)) {
96100
req.protocol = PROTOCOL_HTTP;
97101
} else if (is_postgres_query(buf, size)) {
@@ -114,6 +118,10 @@ int trace_enter_write(__u64 fd, char *buf, __u64 size) {
114118
req.ns = prev_req->ns;
115119
}
116120
}
121+
k.stream_id = is_cassandra_request(buf, size);
122+
if (k.stream_id != -1) {
123+
req.protocol = PROTOCOL_CASSANDRA;
124+
}
117125
}
118126
if (req.protocol == PROTOCOL_UNKNOWN) {
119127
return 0;
@@ -147,16 +155,26 @@ int trace_exit_read(struct trace_event_raw_sys_exit_rw__stub* ctx) {
147155
struct socket_key k = {};
148156
k.pid = id >> 32;
149157
k.fd = args->fd;
158+
k.stream_id = -1;
150159
buf = args->buf;
151160

152161
bpf_map_delete_elem(&active_reads, &id);
153162
if (ctx->ret <= 0) {
154163
return 0;
155164
}
165+
struct cassandra_header cassandra_response = {};
166+
cassandra_response.stream_id = -1;
156167

157168
struct l7_request *req = bpf_map_lookup_elem(&active_l7_requests, &k);
158169
if (!req) {
159-
return 0;
170+
if (bpf_probe_read(&cassandra_response, sizeof(cassandra_response), (void *)(buf)) < 0) {
171+
return 0;
172+
}
173+
k.stream_id = cassandra_response.stream_id;
174+
req = bpf_map_lookup_elem(&active_l7_requests, &k);
175+
if (!req) {
176+
return 0;
177+
}
160178
}
161179
__s32 request_id = req->request_id;
162180
struct l7_event e = {};
@@ -189,6 +207,8 @@ int trace_exit_read(struct trace_event_raw_sys_exit_rw__stub* ctx) {
189207
}
190208
} else if (e.protocol == PROTOCOL_KAFKA) {
191209
e.status = parse_kafka_status(request_id, buf, ctx->ret, partial);
210+
} else if (e.protocol == PROTOCOL_CASSANDRA) {
211+
e.status = cassandra_status(cassandra_response);
192212
}
193213
if (e.status == 0) {
194214
return 0;

ebpftracer/tracer.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ const (
4949
L7ProtocolMysql L7Protocol = 5
5050
L7ProtocolMongo L7Protocol = 6
5151
L7ProtocolKafka L7Protocol = 7
52+
L7ProtocolCassandra L7Protocol = 8
5253
)
5354

5455
func (p L7Protocol) String() string {
@@ -67,6 +68,8 @@ func (p L7Protocol) String() string {
6768
return "Mongo"
6869
case L7ProtocolKafka:
6970
return "Kafka"
71+
case L7ProtocolCassandra:
72+
return "Cassandra"
7073
}
7174
return "UNKNOWN:" + strconv.Itoa(int(p))
7275
}

0 commit comments

Comments
 (0)