Skip to content

Commit dfd77ae

Browse files
authored
AGTMETRICS-398 Fix intake v3 validation request id header (#45566)
### What does this PR do? Fix metrics v3 payloads validation request id header. It should have the same value for both v2 and v3 payloads that carry identical metrics. ### Motivation Bugfix. ### Describe how you validated your changes Tests. ### Additional Notes Co-authored-by: vikentiy.fesunov <vikentiy.fesunov@datadoghq.com>
1 parent 97e692b commit dfd77ae

File tree

4 files changed

+49
-40
lines changed

4 files changed

+49
-40
lines changed

pkg/serializer/internal/metrics/pipeline.go

Lines changed: 5 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,6 @@ import (
1111
"slices"
1212
"strconv"
1313

14-
"github.com/google/uuid"
15-
1614
"github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder/resolver"
1715
"github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder/transaction"
1816
)
@@ -75,21 +73,16 @@ type PipelineConfig struct {
7573

7674
// PipelineDestination describes how to deliver a payload to the intake.
7775
type PipelineDestination struct {
78-
Resolver resolver.DomainResolver
79-
Endpoint transaction.Endpoint
80-
AddValidationHeaders bool
76+
Resolver resolver.DomainResolver
77+
Endpoint transaction.Endpoint
78+
ValidationBatchID string
8179
}
8280

8381
type forwarder interface {
8482
SubmitTransaction(*transaction.HTTPTransaction) error
8583
}
8684

8785
func (dest *PipelineDestination) send(payloads transaction.BytesPayloads, forwarder forwarder, headers http.Header) error {
88-
batchID, err := dest.maybeMakeBatchID()
89-
if err != nil {
90-
return err
91-
}
92-
9386
domain := dest.Resolver.Resolve(dest.Endpoint)
9487
for _, auth := range dest.Resolver.GetAuthorizers() {
9588
for seq, payload := range payloads {
@@ -100,8 +93,8 @@ func (dest *PipelineDestination) send(payloads transaction.BytesPayloads, forwar
10093
for key := range headers {
10194
txn.Headers.Set(key, headers.Get(key))
10295
}
103-
if dest.AddValidationHeaders {
104-
txn.Headers.Set("X-Metrics-Request-ID", batchID)
96+
if dest.ValidationBatchID != "" {
97+
txn.Headers.Set("X-Metrics-Request-ID", dest.ValidationBatchID)
10598
txn.Headers.Set("X-Metrics-Request-Seq", strconv.Itoa(seq))
10699
txn.Headers.Set("X-Metrics-Request-Len", strconv.Itoa(len(payloads)))
107100
}
@@ -116,17 +109,6 @@ func (dest *PipelineDestination) send(payloads transaction.BytesPayloads, forwar
116109
return nil
117110
}
118111

119-
func (dest *PipelineDestination) maybeMakeBatchID() (string, error) {
120-
if dest.AddValidationHeaders {
121-
uuid, err := uuid.NewV7()
122-
if err != nil {
123-
return "", err
124-
}
125-
return uuid.String(), nil
126-
}
127-
return "", nil
128-
}
129-
130112
// PipelineContext holds information needed during and after pipeline execution.
131113
type PipelineContext struct {
132114
Destinations []PipelineDestination

pkg/serializer/internal/metrics/pipeline_test.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -46,13 +46,13 @@ func TestPipelineSendValidate(t *testing.T) {
4646

4747
ctx := PipelineContext{
4848
Destinations: []PipelineDestination{{
49-
Resolver: res1,
50-
Endpoint: endpoints.SeriesEndpoint,
51-
AddValidationHeaders: true,
49+
Resolver: res1,
50+
Endpoint: endpoints.SeriesEndpoint,
51+
ValidationBatchID: "123",
5252
}, {
53-
Resolver: res2,
54-
Endpoint: endpoints.SeriesEndpoint,
55-
AddValidationHeaders: false,
53+
Resolver: res2,
54+
Endpoint: endpoints.SeriesEndpoint,
55+
ValidationBatchID: "",
5656
}}}
5757

5858
ctx.addPayload(transaction.NewBytesPayload([]byte{1}, 1))
@@ -68,14 +68,14 @@ func TestPipelineSendValidate(t *testing.T) {
6868
func(t require.TestingT, _ int, txn *transaction.HTTPTransaction) {
6969
require.Equal(t, "http://example.test", txn.Domain)
7070
require.Equal(t, []byte{1}, txn.Payload.GetContent())
71-
require.NotEmpty(t, txn.Headers.Get("x-metrics-request-id"))
71+
require.Equal(t, "123", txn.Headers.Get("x-metrics-request-id"))
7272
require.Equal(t, "0", txn.Headers.Get("x-metrics-request-seq"))
7373
require.Equal(t, "2", txn.Headers.Get("x-metrics-request-len"))
7474
},
7575
func(t require.TestingT, _ int, txn *transaction.HTTPTransaction) {
7676
require.Equal(t, "http://example.test", txn.Domain)
7777
require.Equal(t, []byte{2}, txn.Payload.GetContent())
78-
require.NotEmpty(t, txn.Headers.Get("x-metrics-request-id"))
78+
require.Equal(t, "123", txn.Headers.Get("x-metrics-request-id"))
7979
require.Equal(t, "1", txn.Headers.Get("x-metrics-request-seq"))
8080
require.Equal(t, "2", txn.Headers.Get("x-metrics-request-len"))
8181
},

pkg/serializer/metrics.go

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ import (
99
"fmt"
1010
"slices"
1111

12+
"github.com/google/uuid"
13+
1214
"github.com/DataDog/datadog-agent/comp/core/config"
1315
"github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder/endpoints"
1416
"github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder/resolver"
@@ -71,11 +73,15 @@ func (s *Serializer) buildPipelines(kind metricsKind) metrics.PipelineSet {
7173
for _, resolver := range s.Forwarder.GetDomainResolvers() {
7274
useV3 := metricsUseV3(resolver, s.config, kind)
7375
validateV3 := useV3 && validateV3
76+
batchID := ""
77+
if validateV3 {
78+
batchID = s.genUUID()
79+
}
7480

7581
dest := metrics.PipelineDestination{
76-
Resolver: resolver,
77-
Endpoint: metricsEndpointFor(kind, useV3),
78-
AddValidationHeaders: validateV3,
82+
Resolver: resolver,
83+
Endpoint: metricsEndpointFor(kind, useV3),
84+
ValidationBatchID: batchID,
7985
}
8086

8187
switch {
@@ -110,9 +116,9 @@ func (s *Serializer) buildPipelines(kind metricsKind) metrics.PipelineSet {
110116
V3: false,
111117
}
112118
vdest := metrics.PipelineDestination{
113-
Resolver: resolver,
114-
Endpoint: metricsEndpointFor(kind, false),
115-
AddValidationHeaders: true,
119+
Resolver: resolver,
120+
Endpoint: metricsEndpointFor(kind, false),
121+
ValidationBatchID: batchID,
116122
}
117123
pipelines.Add(vconf, vdest)
118124
}
@@ -121,3 +127,12 @@ func (s *Serializer) buildPipelines(kind metricsKind) metrics.PipelineSet {
121127

122128
return pipelines
123129
}
130+
131+
func (s *Serializer) genUUID() string {
132+
uuid, err := uuid.NewV7()
133+
if err != nil {
134+
s.logger.Warnf("failed to generate payload batch id: %v", err)
135+
return ""
136+
}
137+
return uuid.String()
138+
}

pkg/serializer/metrics_test.go

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -379,6 +379,18 @@ func TestPipelinesWithV3Validate(t *testing.T) {
379379

380380
pipelines := s.buildPipelines(metricsKindSeries)
381381

382+
batchID := ""
383+
outer:
384+
for _, ctx := range pipelines {
385+
for _, d := range ctx.Destinations {
386+
if d.ValidationBatchID != "" {
387+
batchID = d.ValidationBatchID
388+
break outer
389+
}
390+
}
391+
}
392+
require.NotEmpty(t, batchID)
393+
382394
testutil.ElementsMatchFn(t, maps.All(pipelines),
383395
// v3 pipeline has one destination...
384396
func(t require.TestingT, conf metrics.PipelineConfig, ctx *metrics.PipelineContext) {
@@ -389,7 +401,7 @@ func TestPipelinesWithV3Validate(t *testing.T) {
389401
func(t require.TestingT, _ int, dest metrics.PipelineDestination) {
390402
require.Equal(t, "http://example.test", dest.Resolver.GetConfigName())
391403
require.Equal(t, endpoints.V3SeriesEndpoint, dest.Endpoint)
392-
require.True(t, dest.AddValidationHeaders)
404+
require.Equal(t, batchID, dest.ValidationBatchID)
393405
})
394406
},
395407
// v2 pipeline has two destinations...
@@ -401,13 +413,13 @@ func TestPipelinesWithV3Validate(t *testing.T) {
401413
func(t require.TestingT, _ int, dest metrics.PipelineDestination) {
402414
require.Equal(t, "http://example.test", dest.Resolver.GetConfigName())
403415
require.Equal(t, endpoints.SeriesEndpoint, dest.Endpoint)
404-
require.True(t, dest.AddValidationHeaders)
416+
require.Equal(t, batchID, dest.ValidationBatchID)
405417
},
406418
// ... to the alternative domain without validation headers
407419
func(t require.TestingT, _ int, dest metrics.PipelineDestination) {
408420
require.Equal(t, "http://another.test", dest.Resolver.GetConfigName())
409421
require.Equal(t, endpoints.SeriesEndpoint, dest.Endpoint)
410-
require.False(t, dest.AddValidationHeaders)
422+
require.Empty(t, dest.ValidationBatchID)
411423
},
412424
)
413425
},

0 commit comments

Comments
 (0)