
Commit 4c392a7

[exporter/elasticsearch] Fix regression: retry::max_retries not applying correctly for HTTP request-level retries (open-telemetry#41142)

#### Description

`retry::max_retries`, as passed to the ES client, is incorrectly overwritten whenever `retry::max_retries` > 2. This causes HTTP requests to give up retrying much sooner than expected, which affects scenarios where ES is down; data may be lost as a result. This was a regression from open-telemetry#37333, released in v0.120.0.

Also re-enables the previously skipped integration test.

#### Link to tracking issue

Fixes open-telemetry#39670
Fixes open-telemetry#40488

---------

Co-authored-by: Andrzej Stencel <[email protected]>
1 parent 0b47fbd commit 4c392a7
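For reference, the affected setting lives under the exporter's `retry` block in the collector configuration. A minimal, illustrative config sketch (the endpoint and values here are placeholders, not taken from this commit):

```yaml
exporters:
  elasticsearch:
    endpoints: [http://localhost:9200]
    retry:
      enabled: true
      # Before this fix, any value > 2 was silently capped at 2,
      # so ES outages exhausted retries far too quickly.
      max_retries: 10
```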

5 files changed (+155, -35 lines)
New changelog entry file

Lines changed: 27 additions & 0 deletions
@@ -0,0 +1,27 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: bug_fix
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: elasticsearchexporter
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Fix regression `retry::max_retries` not applying correctly for HTTP request-level retries
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [39670]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext: The regression affected versions from v0.120.0 onwards and might cause data loss by prematurely stopping HTTP request-level retries, e.g. when ES is unavailable, because retries were capped at a maximum of 2.
+
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: [user]

exporter/elasticsearchexporter/esclient.go

Lines changed: 6 additions & 1 deletion
@@ -129,6 +129,11 @@ func newElasticsearchClient(
 		componentHost: host,
 	}
 
+	maxRetries := defaultMaxRetries
+	if config.Retry.MaxRetries != 0 {
+		maxRetries = config.Retry.MaxRetries
+	}
+
 	return elasticsearchv8.NewClient(elasticsearchv8.Config{
 		Transport: httpClient.Transport,

@@ -145,7 +150,7 @@ func newElasticsearchClient(
 		RetryOnError: func(_ *http.Request, err error) bool {
 			return !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded)
 		},
-		MaxRetries:   min(defaultMaxRetries, config.Retry.MaxRetries),
+		MaxRetries:   maxRetries,
 		RetryBackoff: createElasticsearchBackoffFunc(&config.Retry),
 
 		// configure sniffing

exporter/elasticsearchexporter/exporter_test.go

Lines changed: 80 additions & 13 deletions
@@ -17,6 +17,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/elastic/go-docappender/v2"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"github.com/tidwall/gjson"

@@ -496,23 +497,89 @@ func TestExporterLogs(t *testing.T) {
 		}
 	})
 
+	t.Run("retry http request batcher", func(t *testing.T) {
+		for _, maxRetries := range []int{0, 1, 11} {
+			t.Run(fmt.Sprintf("max retries %d", maxRetries), func(t *testing.T) {
+				t.Parallel()
+				expectedRetries := maxRetries
+				if maxRetries == 0 {
+					expectedRetries = defaultMaxRetries
+				}
+
+				var attempts atomic.Int64
+				rec := newBulkRecorder()
+				server := newESTestServer(t, func(_ []itemRequest) ([]itemResponse, error) {
+					// always return error, and assert that the number of attempts is expected, not more, not less.
+					attempts.Add(1)
+					return nil, &httpTestError{status: http.StatusServiceUnavailable, message: "oops"}
+				})
+
+				exporter := newTestLogsExporter(t, server.URL, func(cfg *Config) {
+					cfg.Retry.Enabled = true
+					cfg.Retry.RetryOnStatus = []int{http.StatusServiceUnavailable}
+					cfg.Retry.MaxRetries = maxRetries
+					cfg.Retry.InitialInterval = 1 * time.Millisecond
+					cfg.Retry.MaxInterval = 5 * time.Millisecond
+
+					// use sync bulk indexer
+					cfg.Batcher.Enabled = false
+					cfg.Batcher.enabledSet = true
+				})
+
+				logs := plog.NewLogs()
+				resourceLogs := logs.ResourceLogs().AppendEmpty()
+				scopeLogs := resourceLogs.ScopeLogs().AppendEmpty()
+				scopeLogs.LogRecords().AppendEmpty()
+				logs.MarkReadOnly()
+				err := exporter.ConsumeLogs(context.Background(), logs) // as sync bulk indexer is used, retries are finished on return
+				var errFlushFailed docappender.ErrorFlushFailed
+				require.ErrorAs(t, err, &errFlushFailed)
+
+				assert.Equal(t, 0, rec.countItems())
+				assert.Equal(t, int64(expectedRetries+1), attempts.Load()) // initial request + retries
+			})
+		}
+	})
+
 	t.Run("retry http request", func(t *testing.T) {
-		failures := 0
-		rec := newBulkRecorder()
-		server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) {
-			if failures == 0 {
-				failures++
-				return nil, &httpTestError{status: http.StatusTooManyRequests, message: "oops"}
-			}
+		for _, maxRetries := range []int{0, 1, 11} {
+			t.Run(fmt.Sprintf("max retries %d", maxRetries), func(t *testing.T) {
+				t.Parallel()
+				expectedRetries := maxRetries
+				if maxRetries == 0 {
+					expectedRetries = defaultMaxRetries
+				}
 
-			rec.Record(docs)
-			return itemsAllOK(docs)
-		})
+				var attempts atomic.Int64
+				rec := newBulkRecorder()
+				server := newESTestServer(t, func(_ []itemRequest) ([]itemResponse, error) {
+					// always return error, and assert that the number of attempts is expected, not more, not less.
+					attempts.Add(1)
+					return nil, &httpTestError{status: http.StatusServiceUnavailable, message: "oops"}
+				})
 
-		exporter := newTestLogsExporter(t, server.URL)
-		mustSendLogRecords(t, exporter, plog.NewLogRecord())
+				exporter := newTestLogsExporter(t, server.URL, func(cfg *Config) {
+					cfg.Retry.Enabled = true
+					cfg.Retry.RetryOnStatus = []int{http.StatusServiceUnavailable}
+					cfg.Retry.MaxRetries = maxRetries
+					cfg.Retry.InitialInterval = 1 * time.Millisecond
+					cfg.Retry.MaxInterval = 5 * time.Millisecond
+
+					// use async bulk indexer
+					cfg.Batcher.enabledSet = false
+				})
+				mustSendLogRecords(t, exporter, plog.NewLogRecord()) // as async bulk indexer is used, retries are not guaranteed to finish on return
 
-		rec.WaitItems(1)
+				assert.Eventually(t, func() bool {
+					return int64(expectedRetries+1) == attempts.Load()
+				}, time.Second, 5*time.Millisecond)
+
+				// assert that it does not retry in async mode more than expected
+				time.Sleep(20 * time.Millisecond)
+				assert.Equal(t, int64(expectedRetries+1), attempts.Load())
+				assert.Equal(t, 0, rec.countItems())
+			})
+		}
 	})
 
 	t.Run("no retry", func(t *testing.T) {

exporter/elasticsearchexporter/integrationtest/datareceiver.go

Lines changed: 29 additions & 10 deletions
@@ -50,6 +50,18 @@ const (
 	TestTracesIndex = "traces-test-idx"
 )
 
+type errElasticsearch struct {
+	httpStatus    int
+	httpDocStatus int
+}
+
+func (e errElasticsearch) Error() string {
+	if e.httpStatus != http.StatusOK {
+		return fmt.Sprintf("Simulated Elasticsearch returned HTTP status %d", e.httpStatus)
+	}
+	return fmt.Sprintf("Simulated Elasticsearch returned document status %d", e.httpDocStatus)
+}
+
 type esDataReceiver struct {
 	testbed.DataReceiverBase
 	receiver receiver.Logs

@@ -139,23 +151,21 @@ func (es *esDataReceiver) GenConfigYAMLStr() string {
   elasticsearch:
     endpoints: [%s]
     logs_index: %s
-    logs_dynamic_index:
-      enabled: false
     metrics_index: %s
-    metrics_dynamic_index:
-      enabled: false
     traces_index: %s
-    traces_dynamic_index:
-      enabled: false
     sending_queue:
       enabled: true
+      block_on_overflow: true
     mapping:
       mode: otel
     retry:
       enabled: true
       initial_interval: 100ms
-      max_interval: 1s
-      max_requests: 10000`,
+      max_interval: 500ms
+      max_retries: 10000
+      retry_on_status: [429, 503]
+      timeout: 10m
+`,
 		es.endpoint, TestLogsIndex, TestMetricsIndex, TestTracesIndex,
 	)
 

@@ -303,13 +313,22 @@ func (es *mockESReceiver) Start(ctx context.Context, host component.Host) error
 			case TestTracesIndex:
 				consumeErr = es.tracesConsumer.ConsumeTraces(context.Background(), emptyTrace)
 			}
+			var errES errElasticsearch
 			if consumeErr != nil {
+				if !errors.As(consumeErr, &errES) {
+					// panic to surface test logic error because we only expect error of type errElasticsearch
+					panic("unknown consume error")
+				}
+				if errES.httpStatus != http.StatusOK {
+					w.WriteHeader(errES.httpStatus)
+					return
+				}
 				response.HasErrors = true
-				item.Status = http.StatusTooManyRequests
+				item.Status = errES.httpDocStatus
 				item.Error.Type = "simulated_es_error"
 				item.Error.Reason = consumeErr.Error()
+				itemMap[k] = item
 			}
-			itemMap[k] = item
 		}
 	}
 	if jsonErr := json.NewEncoder(w).Encode(response); jsonErr != nil {

exporter/elasticsearchexporter/integrationtest/exporter_test.go

Lines changed: 13 additions & 11 deletions
@@ -4,8 +4,8 @@
 package integrationtest
 
 import (
-	"errors"
 	"fmt"
+	"net/http"
 	"sync/atomic"
 	"testing"
 	"time"

@@ -17,7 +17,6 @@ import (
 )
 
 func TestExporter(t *testing.T) {
-	t.Skip("flaky test: https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/40488")
 	for _, eventType := range []string{"logs", "metrics", "traces"} {
 		for _, tc := range []struct {
 			name string

@@ -32,33 +31,36 @@ func TestExporter(t *testing.T) {
 			// the collector allows durability testing of the ES exporter
 			// based on the OTEL config used for testing.
 			restartCollector bool
-			mockESFailure    bool
+			mockESErr        error
 		}{
 			{name: "basic"},
-			{name: "es_intermittent_failure", mockESFailure: true},
+			{name: "es_intermittent_http_error", mockESErr: errElasticsearch{httpStatus: http.StatusServiceUnavailable}},
+			{name: "es_intermittent_doc_error", mockESErr: errElasticsearch{httpStatus: http.StatusOK, httpDocStatus: http.StatusTooManyRequests}},
 
 			{name: "batcher_enabled", batcherEnabled: ptrTo(true)},
-			{name: "batcher_enabled_es_intermittent_failure", batcherEnabled: ptrTo(true), mockESFailure: true},
+			{name: "batcher_enabled_es_intermittent_http_error", batcherEnabled: ptrTo(true), mockESErr: errElasticsearch{httpStatus: http.StatusServiceUnavailable}},
+			{name: "batcher_enabled_es_intermittent_doc_error", batcherEnabled: ptrTo(true), mockESErr: errElasticsearch{httpStatus: http.StatusOK, httpDocStatus: http.StatusTooManyRequests}},
 			{name: "batcher_disabled", batcherEnabled: ptrTo(false)},
-			{name: "batcher_disabled_es_intermittent_failure", batcherEnabled: ptrTo(false), mockESFailure: true},
+			{name: "batcher_disabled_es_intermittent_http_error", batcherEnabled: ptrTo(false), mockESErr: errElasticsearch{httpStatus: http.StatusServiceUnavailable}},
+			{name: "batcher_disabled_es_intermittent_doc_error", batcherEnabled: ptrTo(false), mockESErr: errElasticsearch{httpStatus: http.StatusOK, httpDocStatus: http.StatusTooManyRequests}},
 
 			/* TODO: Below tests should be enabled after https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/30792 is fixed
 			{name: "collector_restarts", restartCollector: true},
-			{name: "collector_restart_with_es_intermittent_failure", mockESFailure: true, restartCollector: true},
+			{name: "collector_restart_with_es_intermittent_failure", mockESErr: true, restartCollector: true},
 			*/
 		} {
 			t.Run(fmt.Sprintf("%s/%s", eventType, tc.name), func(t *testing.T) {
 				var opts []dataReceiverOption
 				if tc.batcherEnabled != nil {
 					opts = append(opts, withBatcherEnabled(*tc.batcherEnabled))
 				}
-				runner(t, eventType, tc.restartCollector, tc.mockESFailure, opts...)
+				runner(t, eventType, tc.restartCollector, tc.mockESErr, opts...)
 			})
 		}
 	}
 }
 
-func runner(t *testing.T, eventType string, restartCollector, mockESFailure bool, opts ...dataReceiverOption) {
+func runner(t *testing.T, eventType string, restartCollector bool, mockESErr error, opts ...dataReceiverOption) {
 	t.Helper()
 
 	var (

@@ -102,7 +104,7 @@ func runner(t *testing.T, eventType string, restartCollector, mockESFailure bool
 		&testbed.CorrectnessResults{},
 		testbed.WithDecisionFunc(func() error {
 			if esFailing.Load() {
-				return errors.New("simulated ES failure")
+				return mockESErr
 			}
 			return nil
 		}),

@@ -118,7 +120,7 @@ func runner(t *testing.T, eventType string, restartCollector, mockESFailure bool
 	tc.Sleep(2 * time.Second)
 
 	// Fail ES if required and send load.
-	if mockESFailure {
+	if mockESErr != nil {
 		esFailing.Store(true)
 		tc.Sleep(2 * time.Second)
 	}
