Skip to content

Commit 1a0cf76

Browse files
AGTMETRICS-487 Add dogstatsd http server component (#45445)
### What does this PR do? Add server for http-based dogstatsd variant for pre-aggregated scalar metrics. See https://datadoghq.atlassian.net/wiki/spaces/AM/pages/6164448193 ### Motivation Improve reliability and throughput when sending pre-aggregated metrics. ### Describe how you validated your changes Unit tests, fuzzing for parser, and manual tests with the client poc. ### Additional Notes Co-authored-by: carlosroman <carlos.roman@datadoghq.com>
1 parent f88aba5 commit 1a0cf76

File tree

20 files changed

+7707
-0
lines changed

20 files changed

+7707
-0
lines changed

.github/CODEOWNERS

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -557,6 +557,7 @@
557557
/pkg/privateactionrunner/ @DataDog/action-platform
558558
/pkg/privateactionrunner/bundles/ddagent/networkpath @DataDog/action-platform @Datadog/network-path
559559
/pkg/proto/ @DataDog/agent-runtimes
560+
/pkg/proto/datadog/dogstatsdhttp @DataDog/agent-metric-pipelines
560561
/pkg/proto/datadog/kubemetadata @DataDog/container-platform
561562
/pkg/proto/datadog/languagedetection @DataDog/container-experiences
562563
/pkg/proto/datadog/privateactionrunner @DataDog/action-platform

cmd/agent/subcommands/run/command.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,8 @@ import (
108108
"github.com/DataDog/datadog-agent/comp/core/workloadmeta/defaults"
109109
workloadmetafx "github.com/DataDog/datadog-agent/comp/core/workloadmeta/fx"
110110
"github.com/DataDog/datadog-agent/comp/dogstatsd"
111+
dogstatsdhttp "github.com/DataDog/datadog-agent/comp/dogstatsd/http/def"
112+
dogstatsdhttpfx "github.com/DataDog/datadog-agent/comp/dogstatsd/http/fx"
111113
replay "github.com/DataDog/datadog-agent/comp/dogstatsd/replay/def"
112114
dogstatsdServer "github.com/DataDog/datadog-agent/comp/dogstatsd/server"
113115
dogstatsddebug "github.com/DataDog/datadog-agent/comp/dogstatsd/serverDebug"
@@ -264,6 +266,7 @@ func run(log log.Component,
264266
telemetry telemetry.Component,
265267
sysprobeConf sysprobeconfig.Component,
266268
server dogstatsdServer.Component,
269+
_ dogstatsdhttp.Component,
267270
_ replay.Component,
268271
_ defaultforwarder.Component,
269272
wmeta workloadmeta.Component,
@@ -458,6 +461,7 @@ func getSharedFxOption() fx.Option {
458461
demultiplexerimpl.Module(demultiplexerimpl.NewDefaultParams(demultiplexerimpl.WithDogstatsdNoAggregationPipelineConfig())),
459462
demultiplexerendpointfx.Module(),
460463
dogstatsd.Bundle(dogstatsdServer.Params{Serverless: false}),
464+
dogstatsdhttpfx.Module(),
461465
fx.Provide(func(logsagent option.Option[logsAgent.Component]) option.Option[logsagentpipeline.Component] {
462466
if la, ok := logsagent.Get(); ok {
463467
return option.New[logsagentpipeline.Component](la)

comp/README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,10 @@ Package workloadmeta provides the workloadmeta component for the Datadog Agent
246246

247247

248248

249+
### [comp/dogstatsd/http](https://pkg.go.dev/github.com/DataDog/datadog-agent/comp/dogstatsd/http)
250+
251+
Package http defines dogstatsd http server component
252+
249253
### [comp/dogstatsd/pidmap](https://pkg.go.dev/github.com/DataDog/datadog-agent/comp/dogstatsd/pidmap)
250254

251255
Package pidmap implements a component for tracking pid and containerID relations
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
// Unless explicitly stated otherwise all files in this repository are licensed
2+
// under the Apache License Version 2.0.
3+
// This product includes software developed at Datadog (https://www.datadoghq.com/).
4+
// Copyright 2016-present Datadog, Inc.
5+
6+
// Package http defines dogstatsd http server component
7+
package http
8+
9+
// team: agent-metric-pipelines
10+
11+
// Component is the dogstatsd http server component. It does not have any methods.
12+
type Component interface{}

comp/dogstatsd/http/fx/fx.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
// Unless explicitly stated otherwise all files in this repository are licensed
2+
// under the Apache License Version 2.0.
3+
// This product includes software developed at Datadog (https://www.datadoghq.com/).
4+
// Copyright 2016-present Datadog, Inc.
5+
6+
// Package fx defines the fx options for this component.
7+
package fx
8+
9+
import (
10+
impl "github.com/DataDog/datadog-agent/comp/dogstatsd/http/impl"
11+
"github.com/DataDog/datadog-agent/pkg/util/fxutil"
12+
)
13+
14+
// Module defines the fx options for this component
15+
func Module() fxutil.Module {
16+
return fxutil.Component(
17+
fxutil.ProvideComponentConstructor(
18+
impl.NewComponent,
19+
),
20+
)
21+
}

comp/dogstatsd/http/impl/fx.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
// Unless explicitly stated otherwise all files in this repository are licensed
2+
// under the Apache License Version 2.0.
3+
// This product includes software developed at Datadog (https://www.datadoghq.com/).
4+
// Copyright 2016-present Datadog, Inc.
5+
6+
package httpimpl
7+
8+
import (
9+
"github.com/DataDog/datadog-agent/comp/aggregator/demultiplexer"
10+
"github.com/DataDog/datadog-agent/comp/core/config"
11+
hostname "github.com/DataDog/datadog-agent/comp/core/hostname/hostnameinterface"
12+
log "github.com/DataDog/datadog-agent/comp/core/log/def"
13+
tagger "github.com/DataDog/datadog-agent/comp/core/tagger/def"
14+
comp "github.com/DataDog/datadog-agent/comp/def"
15+
def "github.com/DataDog/datadog-agent/comp/dogstatsd/http/def"
16+
)
17+
18+
// Requires declares the inputs for NewComponent
19+
type Requires struct {
20+
Lc comp.Lifecycle
21+
Log log.Component
22+
Config config.Component
23+
Tagger tagger.Component
24+
Hostname hostname.Component
25+
Demux demultiplexer.Component
26+
}
27+
28+
// Provides defines the output of this component
29+
type Provides struct {
30+
Component def.Component
31+
}
32+
33+
func NewComponent(req Requires) (Provides, error) {
34+
s := &server{
35+
config: req.Config,
36+
log: req.Log,
37+
tagger: req.Tagger,
38+
hostname: req.Hostname,
39+
40+
out: req.Demux.Serializer(),
41+
}
42+
43+
req.Lc.Append(comp.Hook{
44+
OnStart: s.start,
45+
OnStop: s.stop,
46+
})
47+
48+
return Provides{
49+
Component: s,
50+
}, nil
51+
}
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
// Unless explicitly stated otherwise all files in this repository are licensed
2+
// under the Apache License Version 2.0.
3+
// This product includes software developed at Datadog (https://www.datadoghq.com/).
4+
// Copyright 2016-present Datadog, Inc.
5+
6+
package httpimpl
7+
8+
import (
9+
"fmt"
10+
"io"
11+
"net/http"
12+
13+
log "github.com/DataDog/datadog-agent/comp/core/log/def"
14+
tagger "github.com/DataDog/datadog-agent/comp/core/tagger/def"
15+
pb "github.com/DataDog/datadog-agent/pkg/proto/pbgo/dogstatsdhttp"
16+
)
17+
18+
type requestCtx struct {
19+
prefix string
20+
log log.Component
21+
writer http.ResponseWriter
22+
}
23+
24+
func (ctx *requestCtx) debugf(format string, args ...any) {
25+
ctx.log.Debugf(ctx.prefix+format, args...)
26+
}
27+
28+
func (ctx *requestCtx) errorf(format string, args ...any) {
29+
ctx.log.Errorf(ctx.prefix+format, args...)
30+
}
31+
32+
func (ctx *requestCtx) respond(status int, format string, args ...any) {
33+
msg := fmt.Sprintf(format, args...)
34+
ctx.debugf("complete with status %d: %q", status, msg)
35+
ctx.writer.WriteHeader(status)
36+
_, err := ctx.writer.Write([]byte(msg))
37+
if err != nil {
38+
ctx.debugf("failed to write response: %v", err)
39+
}
40+
}
41+
42+
type handlerBase struct {
43+
log log.Component
44+
tagger tagger.Component
45+
hostname string
46+
out serializer
47+
}
48+
49+
type seriesHandler struct {
50+
handlerBase
51+
}
52+
53+
func (h *seriesHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
54+
ctx := requestCtx{
55+
prefix: fmt.Sprintf("dogstatsdhttp %q: ", r.RemoteAddr),
56+
log: h.log,
57+
writer: w,
58+
}
59+
60+
origin, err := originFromHeader(r.Header, h.tagger)
61+
if err != nil {
62+
ctx.respond(http.StatusBadRequest, "origin detection error: %v", err)
63+
return
64+
}
65+
66+
body, err := io.ReadAll(r.Body)
67+
r.Body.Close()
68+
if err != nil {
69+
ctx.respond(http.StatusBadRequest, "error reading body: %v", err)
70+
return
71+
}
72+
73+
var payload pb.Payload
74+
err = payload.UnmarshalVT(body)
75+
if err != nil {
76+
ctx.respond(http.StatusBadRequest, "error parsing payload: %v", err)
77+
return
78+
}
79+
80+
it, err := newSeriesIterator(&payload, origin, h.hostname)
81+
if err != nil {
82+
ctx.respond(http.StatusBadRequest, "error decoding payload dictionaries: %v", err)
83+
return
84+
}
85+
86+
err = h.out.SendIterableSeries(it)
87+
if err != nil {
88+
// this error indicates a failure in the agent itself, so we don't
89+
// propagate it to the client
90+
ctx.errorf("failed to serialize payload: %v", err)
91+
ctx.respond(http.StatusInternalServerError, "internal error")
92+
return
93+
}
94+
95+
if it.err != nil {
96+
ctx.respond(http.StatusBadRequest, "error decoding payload: %v", it.err)
97+
return
98+
}
99+
100+
ctx.respond(http.StatusOK, "OK")
101+
}

0 commit comments

Comments
 (0)