Skip to content

Commit 68e9d3c

Browse files
Merge pull request #4 from FireTail-io/is-json
Is json
2 parents e8e9fda + 4f27e35 commit 68e9d3c

File tree

8 files changed

+276
-20
lines changed

8 files changed

+276
-20
lines changed

Makefile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,4 +23,5 @@ dev: build-dev
2323
-e FIRETAIL_KUBERNETES_SENSOR_DEV_MODE=true \
2424
-e FIRETAIL_KUBERNETES_SENSOR_DEV_SERVER_ENABLED=true \
2525
-e DISABLE_SERVICE_IP_FILTERING=true \
26+
-e ENABLE_ONLY_LOG_JSON=true \
2627
firetail/kubernetes-sensor-dev

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ POC for a FireTail Kubernetes Sensor.
1111
| `FIRETAIL_API_TOKEN` || `PS-02-XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX-XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX` | The API token the sensor will use to report logs to FireTail |
1212
| `BPF_EXPRESSION` || `tcp and (port 80 or port 443)` | The BPF filter used by the sensor. See docs for syntax info: https://www.tcpdump.org/manpages/pcap-filter.7.html |
1313
| `DISABLE_SERVICE_IP_FILTERING` || `true` | Disables polling Kubernetes for the IP addresses of services & subsequently ignoring all requests captured that aren't made to one of those IPs. |
14+
| `ENABLE_ONLY_LOG_JSON` || `true` | Enables only logging requests where the content-type implies the payload should be JSON, or the payload is valid JSON regardless of the content-type. |
15+
| `ONLY_LOG_JSON_MAX_CONTENT_LENGTH` || `1048576` | When `ENABLE_ONLY_LOG_JSON` is `true`, the sensor will only read request or response bodies to check if they're valid JSON if their length is less than `ONLY_LOG_JSON_MAX_CONTENT_LENGTH` bytes. |
1416
| `FIRETAIL_API_URL` || `https://api.logging.eu-west-1.prod.firetail.app/logs/bulk` | The API url the sensor will send logs to. Defaults to the EU region production environment. |
1517
| `FIRETAIL_KUBERNETES_SENSOR_DEV_MODE` || `true` | Enables debug logging when set to `true`, and reduces the max age of a log in a batch to be sent to FireTail. |
1618
| `FIRETAIL_KUBERNETES_SENSOR_DEV_SERVER_ENABLED` || `true` | Enables a demo web server when set to `true`; useful for sending test requests to. |

src/is_json.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package main
2+
3+
import (
4+
"bytes"
5+
"encoding/json"
6+
"io"
7+
"mime"
8+
"net/http"
9+
"strings"
10+
)
11+
12+
func isJson(reqAndResp *httpRequestAndResponse, maxContentLength int64) bool {
13+
for _, headers := range []http.Header{reqAndResp.request.Header, reqAndResp.response.Header} {
14+
contentTypeHeader := headers.Get("Content-Type")
15+
mediaType, _, err := mime.ParseMediaType(contentTypeHeader)
16+
if err == nil && mediaType == "application/json" {
17+
return true
18+
}
19+
if strings.HasSuffix(mediaType, "+json") {
20+
return true
21+
}
22+
}
23+
24+
if reqAndResp.request.ContentLength <= maxContentLength {
25+
bodyBytes, err := io.ReadAll(reqAndResp.request.Body)
26+
reqAndResp.request.Body = io.NopCloser(io.MultiReader(bytes.NewReader(bodyBytes)))
27+
if err != nil {
28+
return false
29+
}
30+
var v map[string]interface{}
31+
if json.Unmarshal(bodyBytes, &v) == nil {
32+
return true
33+
}
34+
}
35+
36+
if reqAndResp.response.ContentLength <= maxContentLength {
37+
bodyBytes, err := io.ReadAll(reqAndResp.response.Body)
38+
reqAndResp.response.Body = io.NopCloser(io.MultiReader(bytes.NewReader(bodyBytes)))
39+
if err != nil {
40+
return false
41+
}
42+
var v map[string]interface{}
43+
if json.Unmarshal(bodyBytes, &v) == nil {
44+
return true
45+
}
46+
}
47+
48+
return false
49+
}

src/is_json_test.go

Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
package main
2+
3+
import (
4+
"io"
5+
"net/http"
6+
"strings"
7+
"testing"
8+
)
9+
10+
func TestIsJson(t *testing.T) {
11+
tests := []struct {
12+
name string
13+
reqContentType string
14+
reqBody string
15+
respContentType string
16+
respBody string
17+
maxContentLength int64
18+
expectedResult bool
19+
}{
20+
{
21+
name: "Valid JSON in both request and response with correct content types",
22+
reqContentType: "application/json",
23+
reqBody: `{"key": "value"}`,
24+
respContentType: "application/json",
25+
respBody: `{"key": "value"}`,
26+
maxContentLength: 1024,
27+
expectedResult: true,
28+
},
29+
{
30+
name: "XML in request and response with correct Content-Type",
31+
reqContentType: "application/xml",
32+
reqBody: `<key>value</key>`,
33+
respContentType: "application/xml",
34+
respBody: `<key>value</key>`,
35+
maxContentLength: 1024,
36+
expectedResult: false,
37+
},
38+
{
39+
name: "XML in request with JSON in response",
40+
reqContentType: "application/xml",
41+
reqBody: `<key>value</key>`,
42+
respContentType: "application/json",
43+
respBody: `{"key": "value"}`,
44+
maxContentLength: 1024,
45+
expectedResult: true,
46+
},
47+
{
48+
name: "JSON in request with XML in response",
49+
reqContentType: "application/json",
50+
reqBody: `{"key": "value"}`,
51+
respContentType: "application/xml",
52+
respBody: `<key>value</key>`,
53+
maxContentLength: 1024,
54+
expectedResult: true,
55+
},
56+
{
57+
name: "Empty request and response bodies and headers",
58+
reqContentType: "",
59+
reqBody: "",
60+
respContentType: "",
61+
respBody: "",
62+
maxContentLength: 1024,
63+
expectedResult: false,
64+
},
65+
{
66+
name: "No content-type headers with valid JSON in request",
67+
reqContentType: "",
68+
reqBody: `{"key": "value"}`,
69+
respContentType: "",
70+
respBody: ``,
71+
maxContentLength: 1024,
72+
expectedResult: true,
73+
},
74+
{
75+
name: "No content-type headers with valid JSON in response",
76+
reqContentType: "",
77+
reqBody: ``,
78+
respContentType: "",
79+
respBody: `{"key": "value"}`,
80+
maxContentLength: 1024,
81+
expectedResult: true,
82+
},
83+
{
84+
name: "No content-type headers with invalid JSON in request",
85+
reqContentType: "",
86+
reqBody: `{"key": "value"`,
87+
respContentType: "",
88+
respBody: ``,
89+
maxContentLength: 1024,
90+
expectedResult: false,
91+
},
92+
{
93+
name: "No content-type headers with invalid JSON in response",
94+
reqContentType: "",
95+
reqBody: ``,
96+
respContentType: "",
97+
respBody: `{"key": "value"`,
98+
maxContentLength: 1024,
99+
expectedResult: false,
100+
},
101+
{
102+
name: "Content-type geo+json in request with invalid body",
103+
reqContentType: "application/geo+json",
104+
reqBody: ``,
105+
respContentType: "",
106+
respBody: ``,
107+
maxContentLength: 1024,
108+
expectedResult: true,
109+
},
110+
{
111+
name: "No content-type headers with request payload longer than max length",
112+
reqContentType: "",
113+
reqBody: `{"key": "` + strings.Repeat("a", 1025) + `"}`,
114+
respContentType: "",
115+
respBody: ``,
116+
maxContentLength: 1024,
117+
expectedResult: false,
118+
},
119+
{
120+
name: "No content-type headers with response payload longer than max length",
121+
reqContentType: "",
122+
reqBody: ``,
123+
respContentType: "",
124+
respBody: `{"key": "` + strings.Repeat("a", 1025) + `"}`,
125+
maxContentLength: 1024,
126+
expectedResult: false,
127+
},
128+
{
129+
name: "No content-type headers with request payload longer than max length and response payload shorter",
130+
reqContentType: "",
131+
reqBody: strings.Repeat("a", 1025),
132+
respContentType: "",
133+
respBody: `{"key": "value"}`,
134+
maxContentLength: 1024,
135+
expectedResult: true,
136+
},
137+
{
138+
name: "No content-type headers with request payload shorter than max length and response payload longer",
139+
reqContentType: "",
140+
reqBody: `{"key": "value"}`,
141+
respContentType: "",
142+
respBody: strings.Repeat("a", 1025),
143+
maxContentLength: 1024,
144+
expectedResult: true,
145+
},
146+
}
147+
148+
for _, tt := range tests {
149+
t.Run(tt.name, func(t *testing.T) {
150+
req, err := http.NewRequest("POST", "/", strings.NewReader(tt.reqBody))
151+
if err != nil {
152+
t.Fatalf("Failed to create request: %v", err)
153+
}
154+
if tt.reqContentType != "" {
155+
req.Header.Set("Content-Type", tt.reqContentType)
156+
}
157+
158+
resp := &http.Response{
159+
Header: make(http.Header),
160+
Body: io.NopCloser(strings.NewReader(tt.respBody)),
161+
ContentLength: int64(len(tt.respBody)),
162+
}
163+
if tt.respContentType != "" {
164+
resp.Header.Set("Content-Type", tt.respContentType)
165+
}
166+
167+
reqAndResp := httpRequestAndResponse{
168+
request: req,
169+
response: resp,
170+
}
171+
172+
result := isJson(&reqAndResp, tt.maxContentLength)
173+
if result != tt.expectedResult {
174+
t.Errorf("isJson() = %v, want %v", result, tt.expectedResult)
175+
}
176+
})
177+
}
178+
}

src/main.go

Lines changed: 46 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -15,23 +15,26 @@ import (
1515
)
1616

1717
func main() {
18+
logsApiToken, logsApiTokenSet := os.LookupEnv("FIRETAIL_API_TOKEN")
19+
if !logsApiTokenSet {
20+
log.Fatal("FIRETAIL_API_TOKEN environment variable not set")
21+
}
22+
1823
devEnabled, _ := strconv.ParseBool(os.Getenv("FIRETAIL_KUBERNETES_SENSOR_DEV_MODE"))
1924
if devEnabled {
2025
slog.Warn("🧰 Development mode enabled, setting log level to debug...")
2126
slog.SetLogLoggerLevel(slog.LevelDebug)
2227
}
2328

24-
logsApiToken, logsApiTokenSet := os.LookupEnv("FIRETAIL_API_TOKEN")
25-
if !logsApiTokenSet {
26-
log.Fatal("FIRETAIL_API_TOKEN environment variable not set")
27-
}
28-
29-
var ipManager *serviceIpManager
30-
if disableServiceIpFilter, err := strconv.ParseBool(os.Getenv("DISABLE_SERVICE_IP_FILTERING")); !(err == nil && disableServiceIpFilter) {
31-
slog.Info(
32-
"Service IP filter enabled, monitoring service IPs...",
33-
)
34-
ipManager = newServiceIpManager()
29+
devServerEnabled, err := strconv.ParseBool(os.Getenv("FIRETAIL_KUBERNETES_SENSOR_DEV_SERVER_ENABLED"))
30+
if err == nil && devServerEnabled {
31+
slog.Warn("🧰 Development server enabled, starting example HTTP server...")
32+
go func() {
33+
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
34+
fmt.Fprintf(w, "Hello, %s!", r.URL.Path[1:])
35+
})
36+
log.Fatal(http.ListenAndServe(":80", nil))
37+
}()
3538
}
3639

3740
bpfExpression, bpfExpressionSet := os.LookupEnv("BPF_EXPRESSION")
@@ -43,15 +46,28 @@ func main() {
4346
bpfExpression = "tcp and (port 80 or port 443)"
4447
}
4548

46-
devServerEnabled, err := strconv.ParseBool(os.Getenv("FIRETAIL_KUBERNETES_SENSOR_DEV_SERVER_ENABLED"))
47-
if err == nil && devServerEnabled {
48-
slog.Warn("🧰 Development server enabled, starting example HTTP server...")
49-
go func() {
50-
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
51-
fmt.Fprintf(w, "Hello, %s!", r.URL.Path[1:])
52-
})
53-
log.Fatal(http.ListenAndServe(":80", nil))
54-
}()
49+
var ipManager *serviceIpManager
50+
if disableServiceIpFilter, err := strconv.ParseBool(os.Getenv("DISABLE_SERVICE_IP_FILTERING")); !(err == nil && disableServiceIpFilter) {
51+
slog.Info(
52+
"Service IP filter enabled, monitoring service IPs...",
53+
)
54+
ipManager = newServiceIpManager()
55+
}
56+
57+
var maxContentLength int64
58+
onlyLogJson, _ := strconv.ParseBool(os.Getenv("ENABLE_ONLY_LOG_JSON"))
59+
if onlyLogJson {
60+
maxContentLengthStr, maxContentLengthSet := os.LookupEnv("ONLY_LOG_JSON_MAX_CONTENT_LENGTH")
61+
if !maxContentLengthSet {
62+
slog.Info("ONLY_LOG_JSON_MAX_CONTENT_LENGTH environment variable not set, using default: 1MiB")
63+
maxContentLength = 1048576 // 1MiB
64+
} else {
65+
maxContentLength, err = strconv.ParseInt(maxContentLengthStr, 10, 64)
66+
if err != nil {
67+
slog.Error("Failed to parse ONLY_LOG_JSON_MAX_CONTENT_LENGTH, Defaulting to 1MiB.", "Err", err.Error())
68+
maxContentLength = 1048576 // 1MiB
69+
}
70+
}
5571
}
5672

5773
requestAndResponseChannel := make(chan httpRequestAndResponse, 1)
@@ -91,6 +107,16 @@ func main() {
91107
)
92108
continue
93109
}
110+
if onlyLogJson && !isJson(&requestAndResponse, maxContentLength) {
111+
slog.Debug(
112+
"Ignoring non-JSON request:",
113+
"Src", requestAndResponse.src,
114+
"Dst", requestAndResponse.dst,
115+
"SrcPort", requestAndResponse.srcPort,
116+
"DstPort", requestAndResponse.dstPort,
117+
)
118+
continue
119+
}
94120
slog.Debug(
95121
"Captured request and response:",
96122
"Method", requestAndResponse.request.Method,

0 commit comments

Comments
 (0)