Skip to content

Commit b2a4cee

Browse files
authored
feat(appsec/proxy): add metrics (#4072)
Co-authored-by: flavien.darche <[email protected]>
1 parent 3b824da commit b2a4cee

File tree

14 files changed

+514
-235
lines changed

14 files changed

+514
-235
lines changed

contrib/envoyproxy/go-control-plane/cmd/serviceextensions/env.go

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,16 @@
11
// Unless explicitly stated otherwise all files in this repository are licensed
22
// under the Apache License Version 2.0.
33
// This product includes software developed at Datadog (https://www.datadoghq.com/).
4-
// Copyright 2016 Datadog, Inc.
4+
// Copyright 2025 Datadog, Inc.
55

66
package main
77

88
import (
99
"net"
1010
"strconv"
1111

12+
gocontrolplane "github.com/DataDog/dd-trace-go/contrib/envoyproxy/go-control-plane/v2"
13+
"github.com/DataDog/dd-trace-go/v2/instrumentation"
1214
"github.com/DataDog/dd-trace-go/v2/instrumentation/env"
1315
)
1416

@@ -17,13 +19,16 @@ import (
1719
func intEnv(key string, def int) int {
1820
vv, ok := env.Lookup(key)
1921
if !ok {
22+
gocontrolplane.Instrumentation().TelemetryRegisterAppConfig(key, def, instrumentation.TelemetryOriginDefault)
2023
return def
2124
}
2225
v, err := strconv.Atoi(vv)
2326
if err != nil {
2427
log.Warn("Non-integer value for env var %s, defaulting to %d. Parse failed with error: %v", key, def, err)
28+
gocontrolplane.Instrumentation().TelemetryRegisterAppConfig(key, def, instrumentation.TelemetryOriginDefault)
2529
return def
2630
}
31+
gocontrolplane.Instrumentation().TelemetryRegisterAppConfig(key, def, instrumentation.TelemetryOriginEnvVar)
2732
return v
2833
}
2934

@@ -39,22 +44,25 @@ func intEnvNil(key string) *int {
3944
log.Warn("Non-integer value for env var %s. Parse failed with error: %v", key, err)
4045
return nil
4146
}
47+
gocontrolplane.Instrumentation().TelemetryRegisterAppConfig(key, &v, instrumentation.TelemetryOriginEnvVar)
4248
return &v
4349
}
4450

4551
// IpEnv returns the valid IP value of an environment variable, or def otherwise.
4652
func ipEnv(key string, def net.IP) net.IP {
4753
vv, ok := env.Lookup(key)
4854
if !ok {
55+
gocontrolplane.Instrumentation().TelemetryRegisterAppConfig(key, def.String(), instrumentation.TelemetryOriginDefault)
4956
return def
5057
}
5158

5259
ip := net.ParseIP(vv)
5360
if ip == nil {
5461
log.Warn("Non-IP value for env var %s, defaulting to %s", key, def.String())
62+
gocontrolplane.Instrumentation().TelemetryRegisterAppConfig(key, def.String(), instrumentation.TelemetryOriginDefault)
5563
return def
5664
}
57-
65+
gocontrolplane.Instrumentation().TelemetryRegisterAppConfig(key, vv, instrumentation.TelemetryOriginEnvVar)
5866
return ip
5967
}
6068

@@ -63,13 +71,16 @@ func ipEnv(key string, def net.IP) net.IP {
6371
func boolEnv(key string, def bool) bool {
6472
vv, ok := env.Lookup(key)
6573
if !ok {
74+
gocontrolplane.Instrumentation().TelemetryRegisterAppConfig(key, def, instrumentation.TelemetryOriginDefault)
6675
return def
6776
}
6877
v, err := strconv.ParseBool(vv)
6978
if err != nil {
7079
log.Warn("Non-boolean value for env var %s, defaulting to %t. Parse failed with error: %v", key, def, err)
80+
gocontrolplane.Instrumentation().TelemetryRegisterAppConfig(key, def, instrumentation.TelemetryOriginDefault)
7181
return def
7282
}
83+
gocontrolplane.Instrumentation().TelemetryRegisterAppConfig(key, v, instrumentation.TelemetryOriginEnvVar)
7384
return v
7485
}
7586

@@ -78,7 +89,9 @@ func boolEnv(key string, def bool) bool {
7889
func stringEnv(key, def string) string {
7990
v, ok := env.Lookup(key)
8091
if !ok {
92+
gocontrolplane.Instrumentation().TelemetryRegisterAppConfig(key, v, instrumentation.TelemetryOriginDefault)
8193
return def
8294
}
95+
gocontrolplane.Instrumentation().TelemetryRegisterAppConfig(key, v, instrumentation.TelemetryOriginEnvVar)
8396
return v
8497
}

contrib/envoyproxy/go-control-plane/cmd/serviceextensions/main.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,11 +63,16 @@ func getDefaultEnvVars() map[string]string {
6363
// initializeEnvironment sets up required environment variables with their defaults
6464
func initializeEnvironment() {
6565
for k, v := range getDefaultEnvVars() {
66-
if env.Get(k) == "" {
66+
setValue := env.Get(k)
67+
if setValue == "" {
6768
if err := os.Setenv(k, v); err != nil {
6869
log.Error("service_extension: failed to set %s environment variable: %s\n", k, err.Error())
70+
continue
6971
}
72+
gocontrolplane.Instrumentation().TelemetryRegisterAppConfig(k, v, instrumentation.TelemetryOriginDefault)
73+
continue
7074
}
75+
gocontrolplane.Instrumentation().TelemetryRegisterAppConfig(k, setValue, instrumentation.TelemetryOriginEnvVar)
7176
}
7277
}
7378

contrib/envoyproxy/go-control-plane/envoy.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,11 @@ func init() {
2424
instr = instrumentation.Load(instrumentation.PackageEnvoyProxyGoControlPlane)
2525
}
2626

27+
// Instrumentation returns the instrumentation.Instrumentation package instrumentation
28+
func Instrumentation() *instrumentation.Instrumentation {
29+
return instr
30+
}
31+
2732
// Integration represents the proxy integration type that is used for the External Processing.
2833
type Integration int
2934

contrib/haproxy/stream-processing-offload/cmd/spoa/env.go

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,37 +7,45 @@ package main
77

88
import (
99
"net"
10-
"os"
1110
"strconv"
11+
12+
streamprocessingoffload "github.com/DataDog/dd-trace-go/contrib/haproxy/stream-processing-offload/v2"
13+
"github.com/DataDog/dd-trace-go/v2/instrumentation"
14+
"github.com/DataDog/dd-trace-go/v2/instrumentation/env"
1215
)
1316

1417
// IntEnv returns the parsed int value of an environment variable, or
1518
// def otherwise.
1619
func intEnv(key string, def int) int {
17-
vv, ok := os.LookupEnv(key)
20+
vv, ok := env.Lookup(key)
1821
if !ok {
22+
streamprocessingoffload.Instrumentation().TelemetryRegisterAppConfig(key, def, instrumentation.TelemetryOriginDefault)
1923
return def
2024
}
2125
v, err := strconv.Atoi(vv)
2226
if err != nil {
2327
log.Warn("Non-integer value for env var %s, defaulting to %d. Parse failed with error: %v", key, def, err)
28+
streamprocessingoffload.Instrumentation().TelemetryRegisterAppConfig(key, def, instrumentation.TelemetryOriginDefault)
2429
return def
2530
}
31+
streamprocessingoffload.Instrumentation().TelemetryRegisterAppConfig(key, def, instrumentation.TelemetryOriginEnvVar)
2632
return v
2733
}
2834

2935
// IpEnv returns the valid IP value of an environment variable, or def otherwise.
3036
func ipEnv(key string, def net.IP) net.IP {
31-
vv, ok := os.LookupEnv(key)
37+
vv, ok := env.Lookup(key)
3238
if !ok {
39+
streamprocessingoffload.Instrumentation().TelemetryRegisterAppConfig(key, def.String(), instrumentation.TelemetryOriginDefault)
3340
return def
3441
}
3542

3643
ip := net.ParseIP(vv)
3744
if ip == nil {
3845
log.Warn("Non-IP value for env var %s, defaulting to %s", key, def.String())
46+
streamprocessingoffload.Instrumentation().TelemetryRegisterAppConfig(key, def.String(), instrumentation.TelemetryOriginDefault)
3947
return def
4048
}
41-
49+
streamprocessingoffload.Instrumentation().TelemetryRegisterAppConfig(key, vv, instrumentation.TelemetryOriginEnvVar)
4250
return ip
4351
}

contrib/haproxy/stream-processing-offload/cmd/spoa/log.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ type Logger struct {
1717

1818
// NewLogger creates a new Logger instance
1919
func NewLogger() *Logger {
20-
return &Logger{streamprocessingoffload.Logger()}
20+
return &Logger{streamprocessingoffload.Instrumentation().Logger()}
2121
}
2222

2323
func (l Logger) Errorf(format string, args ...interface{}) {

contrib/haproxy/stream-processing-offload/cmd/spoa/main.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"syscall"
1818
"time"
1919

20+
"github.com/DataDog/dd-trace-go/v2/instrumentation/env"
2021
"github.com/negasus/haproxy-spoe-go/agent"
2122

2223
"github.com/DataDog/dd-trace-go/contrib/haproxy/stream-processing-offload/v2"
@@ -47,11 +48,16 @@ func getDefaultEnvVars() map[string]string {
4748
// initializeEnvironment sets up required environment variables with their defaults
4849
func initializeEnvironment() {
4950
for k, v := range getDefaultEnvVars() {
50-
if os.Getenv(k) == "" {
51+
setValue := env.Get(k)
52+
if setValue == "" {
5153
if err := os.Setenv(k, v); err != nil {
5254
log.Error("haproxy_spoa: failed to set %s environment variable: %s\n", k, err.Error())
55+
continue
5356
}
57+
streamprocessingoffload.Instrumentation().TelemetryRegisterAppConfig(k, v, instrumentation.TelemetryOriginDefault)
58+
continue
5459
}
60+
streamprocessingoffload.Instrumentation().TelemetryRegisterAppConfig(k, setValue, instrumentation.TelemetryOriginEnvVar)
5561
}
5662
}
5763

contrib/haproxy/stream-processing-offload/haproxy.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,9 @@ func init() {
2323
instr = instrumentation.Load(instrumentation.PackageHAProxyStreamProcessingOffload)
2424
}
2525

26-
// Logger returns the integration logger for the HAProxy Stream Processing Offload package
27-
func Logger() instrumentation.Logger {
28-
return instr.Logger()
26+
// Instrumentation returns the instrumentation.Instrumentation package instrumentation
27+
func Instrumentation() *instrumentation.Instrumentation {
28+
return instr
2929
}
3030

3131
// HAProxySPOA defines the AppSec HAProxy Stream Processing Offload Agent

instrumentation/appsec/proxy/message_processor.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ func (mp *Processor) OnRequestHeaders(ctx context.Context, req RequestHeaders) (
7878
if bodyLimit <= 0 {
7979
mp.instr.Logger().Info("external_processing: body parsing size limit set to 0 or negative. The request and response bodies will NOT be analyzed.")
8080
}
81+
RegisterConfig(mp)
8182
mp.instr.Logger().Info("external_processing: first request received. Configuration: BlockingUnavailable=%v, BodyParsingSizeLimit=%dB, Framework=%s", mp.BlockingUnavailable, mp.computedBodyParsingSizeLimit.Load(), mp.Framework)
8283
})
8384

@@ -109,7 +110,6 @@ func (mp *Processor) OnRequestHeaders(ctx context.Context, req RequestHeaders) (
109110

110111
if !req.GetEndOfStream() && mp.isBodySupported(httpRequest.Header.Get("Content-Type")) {
111112
reqState.State = MessageTypeRequestBody
112-
// Todo: Set telemetry body size (using content-length)
113113
}
114114

115115
if err := mp.ContinueMessageFunc(reqState.Context, ContinueActionOptions{
@@ -143,7 +143,7 @@ func (mp *Processor) OnRequestBody(req HTTPBody, reqState *RequestState) error {
143143
return mp.ContinueMessageFunc(reqState.Context, ContinueActionOptions{MessageType: MessageTypeRequestBody})
144144
}
145145

146-
blocked := processBody(reqState.Context, reqState.requestBuffer, req.GetBody(), req.GetEndOfStream(), appsec.MonitorParsedHTTPBody)
146+
blocked := processBody(reqState.Context, reqState.requestBuffer, req.GetBody(), req.GetEndOfStream(), appsec.MonitorParsedHTTPBody, "request")
147147
if blocked != nil && !mp.BlockingUnavailable {
148148
mp.instr.Logger().Debug("external_processing: request blocked, end the stream")
149149
actionOpts := reqState.BlockAction()
@@ -187,7 +187,6 @@ func (mp *Processor) OnResponseHeaders(res ResponseHeaders, reqState *RequestSta
187187
}
188188
}
189189

190-
// TODO: Set telemetry body size (using content-length)
191190
reqState.State = MessageTypeResponseBody
192191

193192
// Run the waf on the response headers only when we are sure to not receive a response body
@@ -226,7 +225,7 @@ func (mp *Processor) OnResponseBody(resp HTTPBody, reqState *RequestState) error
226225
return io.EOF
227226
}
228227

229-
blocked := processBody(reqState.Context, reqState.responseBuffer, resp.GetBody(), resp.GetEndOfStream(), appsec.MonitorHTTPResponseBody)
228+
blocked := processBody(reqState.Context, reqState.responseBuffer, resp.GetBody(), resp.GetEndOfStream(), appsec.MonitorHTTPResponseBody, "response")
230229
if reqState.responseBuffer.analyzed {
231230
reqState.Close() // Call Close to ensure the response headers are analyzed
232231

@@ -260,14 +259,15 @@ func (mp *Processor) OnResponseTrailers(reqState *RequestState) error {
260259
return mp.ContinueMessageFunc(reqState.Context, ContinueActionOptions{MessageType: MessageTypeResponseTrailers})
261260
}
262261

263-
func processBody(ctx context.Context, bodyBuffer *bodyBuffer, body []byte, eos bool, analyzeBody func(ctx context.Context, encodable any) error) error {
262+
func processBody(ctx context.Context, bodyBuffer *bodyBuffer, body []byte, eos bool, analyzeBody func(ctx context.Context, encodable any) error, direction string) error {
264263
if bodyBuffer.analyzed {
265264
return nil
266265
}
267266

268267
bodyBuffer.append(body)
269268

270269
if eos || bodyBuffer.truncated {
270+
EmitBodySize(len(bodyBuffer.buffer), direction, bodyBuffer.truncated)
271271
bodyBuffer.analyzed = true
272272
return analyzeBody(ctx, json.NewEncodableFromData(bodyBuffer.buffer, bodyBuffer.truncated))
273273
}

instrumentation/appsec/proxy/metrics.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,12 @@ package proxy
77

88
import (
99
"context"
10+
"strconv"
1011
"sync/atomic"
1112
"time"
1213

1314
"github.com/DataDog/dd-trace-go/v2/instrumentation"
15+
"github.com/DataDog/dd-trace-go/v2/internal/telemetry"
1416
)
1517

1618
type metrics struct {
@@ -44,3 +46,18 @@ func newMetricsReporter(ctx context.Context, logger instrumentation.Logger) *met
4446
func (m *metrics) incrementRequestCount() {
4547
m.requestCounter.Add(1)
4648
}
49+
50+
func EmitBodySize(bodySize int, direction string, truncated bool) {
51+
telemetry.Distribution(telemetry.NamespaceAppSec, "instrum.body_size", []string{
52+
"direction:" + direction,
53+
"truncated:" + strconv.FormatBool(truncated),
54+
}).Submit(float64(bodySize))
55+
}
56+
57+
func RegisterConfig(mp *Processor) {
58+
telemetry.RegisterAppConfigs(
59+
telemetry.Configuration{Name: "appsec.proxy.blockingUnavailable", Value: mp.BlockingUnavailable},
60+
telemetry.Configuration{Name: "appsec.proxy.bodyParsingSizeLimit", Value: mp.computedBodyParsingSizeLimit.Load()},
61+
telemetry.Configuration{Name: "appsec.proxy.framework", Value: mp.Framework},
62+
)
63+
}

0 commit comments

Comments
 (0)