Skip to content

Commit 2543b55

Browse files
authored
[chore][exporter/elasticsearch] Fix flaky integration test by 1:1 bulk request to Consume correspondence (open-telemetry#41276)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description Fix flaky integration test with received > sent, caused by race condition where Consume* returns an error halfway through processing a bulk request, causing duplicates in the mock backend because the first N documents went through and an emulated http error causes the entire request to be retried, including the first N documents. <!-- Issue number (e.g. open-telemetry#1234) or full URL to issue, if applicable. --> #### Link to tracking issue Fixes open-telemetry#41208 Fixes open-telemetry#41259 <!--Describe what testing was performed and which tests were added.--> #### Testing <!--Describe the documentation added.--> #### Documentation <!--Please delete paragraphs that you did not use before submitting.-->
1 parent db9f564 commit 2543b55

File tree

1 file changed

+57
-31
lines changed

1 file changed

+57
-31
lines changed

exporter/elasticsearchexporter/integrationtest/datareceiver.go

Lines changed: 57 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -275,17 +275,6 @@ func (es *mockESReceiver) Start(ctx context.Context, host component.Host) error
275275
return fmt.Errorf("failed to bind to address %s: %w", es.config.Endpoint, err)
276276
}
277277

278-
// Ideally bulk request items should be converted to the corresponding event record
279-
// however, since we only assert count for now there is no need to do the actual
280-
// translation. Instead we use a pre-initialized empty models to
281-
// reduce allocation impact on tests and benchmarks.
282-
emptyLogs := plog.NewLogs()
283-
emptyLogs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()
284-
emptyMetrics := pmetric.NewMetrics()
285-
emptyMetrics.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetEmptySum().DataPoints().AppendEmpty()
286-
emptyTrace := ptrace.NewTraces()
287-
emptyTrace.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty()
288-
289278
r := mux.NewRouter()
290279
r.Use(func(next http.Handler) http.Handler {
291280
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
@@ -301,29 +290,66 @@ func (es *mockESReceiver) Start(ctx context.Context, host component.Host) error
301290
fmt.Fprintln(w, "{}")
302291
return
303292
}
293+
var index string
294+
var itemCount int
304295
_, response := docappendertest.DecodeBulkRequest(r)
305296
for _, itemMap := range response.Items {
306-
for k, item := range itemMap {
307-
var consumeErr error
308-
switch item.Index {
309-
case TestLogsIndex:
310-
consumeErr = es.logsConsumer.ConsumeLogs(context.Background(), emptyLogs)
311-
case TestMetricsIndex:
312-
consumeErr = es.metricsConsumer.ConsumeMetrics(context.Background(), emptyMetrics)
313-
case TestTracesIndex:
314-
consumeErr = es.tracesConsumer.ConsumeTraces(context.Background(), emptyTrace)
297+
for _, item := range itemMap {
298+
if index == "" {
299+
index = item.Index
300+
} else if item.Index != index {
301+
panic("mock ES receiver assumes that all documents target the same index")
315302
}
316-
var errES errElasticsearch
317-
if consumeErr != nil {
318-
if !errors.As(consumeErr, &errES) {
319-
// panic to surface test logic error because we only expect error of type errElasticsearch
320-
panic("unknown consume error")
321-
}
322-
if errES.httpStatus != http.StatusOK {
323-
w.WriteHeader(errES.httpStatus)
324-
return
325-
}
326-
response.HasErrors = true
303+
itemCount++
304+
}
305+
}
306+
307+
// Assuming all documents are of the same type (logs, metrics, traces),
308+
// create a pdata struct with the same number of records and send them in 1 Consume* call,
309+
// i.e. a 1:1 bulk request to Consume* function call correspondence.
310+
// This avoids a race condition where Consume* returns an error halfway through processing a bulk request,
311+
// causing duplicates in the mock backend because the first N documents went through and an emulated http error
312+
// causes the entire request to be retried, including the first N documents.
313+
var consumeErr error
314+
switch index {
315+
case TestLogsIndex:
316+
emptyLogs := plog.NewLogs()
317+
lr := emptyLogs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords()
318+
for range itemCount {
319+
lr.AppendEmpty()
320+
}
321+
emptyLogs.MarkReadOnly()
322+
consumeErr = es.logsConsumer.ConsumeLogs(context.Background(), emptyLogs)
323+
case TestMetricsIndex:
324+
emptyMetrics := pmetric.NewMetrics()
325+
dp := emptyMetrics.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetEmptySum().DataPoints()
326+
for range itemCount {
327+
dp.AppendEmpty()
328+
}
329+
emptyMetrics.MarkReadOnly()
330+
consumeErr = es.metricsConsumer.ConsumeMetrics(context.Background(), emptyMetrics)
331+
case TestTracesIndex:
332+
emptyTrace := ptrace.NewTraces()
333+
spans := emptyTrace.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans()
334+
for range itemCount {
335+
spans.AppendEmpty()
336+
}
337+
emptyTrace.MarkReadOnly()
338+
consumeErr = es.tracesConsumer.ConsumeTraces(context.Background(), emptyTrace)
339+
}
340+
if consumeErr != nil {
341+
var errES errElasticsearch
342+
if !errors.As(consumeErr, &errES) {
343+
// panic to surface test logic error because we only expect error of type errElasticsearch
344+
panic("unknown consume error")
345+
}
346+
if errES.httpStatus != http.StatusOK {
347+
w.WriteHeader(errES.httpStatus)
348+
return
349+
}
350+
response.HasErrors = true
351+
for _, itemMap := range response.Items {
352+
for k, item := range itemMap {
327353
item.Status = errES.httpDocStatus
328354
item.Error.Type = "simulated_es_error"
329355
item.Error.Reason = consumeErr.Error()

0 commit comments

Comments
 (0)