Commit 65c8651

ellataira and gh123man authored
[AGNTLOG-392] Logs Agent HTTP Retry with Exponential Backoff (#43500)
### What does this PR do?

Implements an automatic HTTP transport upgrade, driven by an exponential backoff, for the Logs Agent when HTTP connectivity is restored after an initial fallback to TCP, along with improved telemetry.

**Smart HTTP retry and upgrade mechanism:**
- When the Logs Agent fails its initial HTTP connectivity check, it falls back to the TCP protocol.
- If the configuration permits (no `force_use_tcp`, `socks5_proxy_address`, or `additional_endpoints` set), the agent periodically retries HTTP connectivity; a sketch of this eligibility rule follows the description below.
- Once HTTP connectivity is verified, the agent automatically upgrades to the HTTP transport.
- Commit to HTTP: after a successful HTTP connectivity check, the agent commits to HTTP and keeps retrying the HTTP transport even if restart attempts fail, ensuring an eventual upgrade.
- The maximum backoff is configurable via `logs_config.http_connectivity_retry_interval_max` (default: 1h).

**Telemetry metrics:**
- `datadog.logs_agent.http_connectivity_check` - tracks HTTP connectivity check results and TCP fallback events
- `datadog.logs_agent.http_connectivity_retry_attempt` - tracks HTTP connectivity retry attempts
- `datadog.logs_agent.restart_attempt` - tracks logs agent restart attempts by status and transport type

### Motivation

https://datadoghq.atlassian.net/browse/AGNTLOG-392

### Describe how you validated your changes

Unit tests and local testing (see comment below).

### Additional Notes

Co-authored-by: gh123man <[email protected]>
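The eligibility rule in the second bullet above (retry HTTP only when nothing in the configuration forces or implies TCP) can be read roughly as the following check. This is a minimal, standalone sketch: the helper name and plain parameters are hypothetical, and in the agent the decision is made by `config.ShouldUseTCP` in `comp/logs/agent/config`, whose exact behavior may differ.

```go
package main

import "fmt"

// eligibleForHTTPRetry is an illustrative stand-in for the eligibility check:
// the agent only retries HTTP when nothing in the configuration forces or
// implies the TCP transport. Parameter names mirror the settings listed above.
func eligibleForHTTPRetry(forceUseTCP bool, socks5ProxyAddress string, additionalEndpointsCount int) bool {
	if forceUseTCP {
		return false // logs_config.force_use_tcp pins the TCP transport
	}
	if socks5ProxyAddress != "" {
		return false // logs_config.socks5_proxy_address implies the TCP path
	}
	if additionalEndpointsCount > 0 {
		return false // logs_config.additional_endpoints disables the retry
	}
	return true
}

func main() {
	fmt.Println(eligibleForHTTPRetry(false, "", 0)) // true: the retry loop may run
	fmt.Println(eligibleForHTTPRetry(true, "", 0))  // false: TCP is forced
}
```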
1 parent 3e8624e commit 65c8651

13 files changed: +597 / -51 lines changed


comp/core/agenttelemetry/impl/config.go

Lines changed: 10 additions & 0 deletions
```diff
@@ -227,6 +227,16 @@ var defaultProfiles = `
       - name: logs.encoded_bytes_sent
         aggregate_tags:
           - compression_kind
+      - name: logs.http_connectivity_check
+        aggregate_tags:
+          - status
+      - name: logs.http_connectivity_retry_attempt
+        aggregate_tags:
+          - status
+      - name: logs.restart_attempt
+        aggregate_tags:
+          - status
+          - transport
       - name: logs.sender_latency
       - name: logs.truncated
         aggregate_tags:
```
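These profile entries whitelist the new counters for agent telemetry; the `logs.*` names map to the `datadog.logs_agent.*` series listed in the PR description. A plausible sketch of the underlying counter declarations, assuming the agent's `pkg/telemetry.NewCounter(subsystem, name, tags, help)` helper; the real definitions live in `pkg/logs/metrics` and may differ in help text or options:

```go
package metrics

import "github.com/DataDog/datadog-agent/pkg/telemetry"

var (
	// Incremented when buildEndpoints probes HTTP connectivity, with status
	// "success" or "failure" (failure implies the TCP fallback described in the PR).
	TlmHTTPConnectivityCheck = telemetry.NewCounter("logs", "http_connectivity_check",
		[]string{"status"}, "HTTP connectivity check results and TCP fallback events")

	// Incremented by the retry loop each time an upgrade attempt succeeds or fails.
	TlmHTTPConnectivityRetryAttempt = telemetry.NewCounter("logs", "http_connectivity_retry_attempt",
		[]string{"status"}, "HTTP connectivity retry attempts")

	// Incremented on every pipeline restart, tagged by outcome and target transport.
	TlmRestartAttempt = telemetry.NewCounter("logs", "restart_attempt",
		[]string{"status", "transport"}, "Logs agent restart attempts by status and transport")
)
```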

comp/logs/agent/agentimpl/agent.go

Lines changed: 13 additions & 0 deletions
```diff
@@ -134,6 +134,11 @@ type logAgent struct {
 
 	// make restart thread safe
 	restartMutex sync.Mutex
+
+	// HTTP retry state for TCP fallback recovery
+	httpRetryCtx    context.Context
+	httpRetryCancel context.CancelFunc
+	httpRetryMutex  sync.Mutex
 }
 
 func newLogsAgent(deps dependencies) provides {
@@ -212,6 +217,11 @@ func (a *logAgent) start(context.Context) error {
 	}
 
 	a.startPipeline()
+
+	// If we're currently sending over TCP, attempt restart over HTTP
+	if !endpoints.UseHTTP {
+		a.smartHTTPRestart()
+	}
 	return nil
 }
 
@@ -302,6 +312,9 @@ func (a *logAgent) stop(context.Context) error {
 
 	a.log.Info("Stopping logs-agent")
 
+	// Stop HTTP retry loop if running
+	a.stopHTTPRetry()
+
 	status.Clear()
 
 	toStop := []startstop.Stoppable{
```

comp/logs/agent/agentimpl/agent_core_init.go

Lines changed: 27 additions & 1 deletion
```diff
@@ -26,6 +26,7 @@ import (
 	"github.com/DataDog/datadog-agent/pkg/logs/launchers/journald"
 	"github.com/DataDog/datadog-agent/pkg/logs/launchers/listener"
 	"github.com/DataDog/datadog-agent/pkg/logs/launchers/windowsevent"
+	"github.com/DataDog/datadog-agent/pkg/logs/metrics"
 	"github.com/DataDog/datadog-agent/pkg/logs/pipeline"
 	"github.com/DataDog/datadog-agent/pkg/logs/schedulers"
 	"github.com/DataDog/datadog-agent/pkg/logs/tailers/file"
@@ -58,12 +59,37 @@ func (a *logAgent) SetupPipeline(processingRules []*config.ProcessingRule, wmeta
 // dependent on configuration and connectivity
 func buildEndpoints(coreConfig model.Reader) (*config.Endpoints, error) {
 	httpConnectivity := config.HTTPConnectivityFailure
-	if endpoints, err := config.BuildHTTPEndpointsWithVectorOverride(coreConfig, intakeTrackType, config.AgentJSONIntakeProtocol, config.DefaultIntakeOrigin); err == nil {
+	if endpoints, err := buildHTTPEndpointsForConnectivityCheck(coreConfig); err == nil {
 		httpConnectivity = http.CheckConnectivity(endpoints.Main, coreConfig)
 	}
+
+	// Publish HTTP connectivity check metric
+	if httpConnectivity == config.HTTPConnectivitySuccess {
+		metrics.TlmHTTPConnectivityCheck.Inc("success")
+	} else {
+		metrics.TlmHTTPConnectivityCheck.Inc("failure")
+	}
+
 	return config.BuildEndpointsWithVectorOverride(coreConfig, httpConnectivity, intakeTrackType, config.AgentJSONIntakeProtocol, config.DefaultIntakeOrigin)
 }
 
+// buildHTTPEndpointsForConnectivityCheck builds HTTP endpoints for connectivity testing only
+func buildHTTPEndpointsForConnectivityCheck(coreConfig model.Reader) (*config.Endpoints, error) {
+	return config.BuildHTTPEndpointsWithVectorOverride(coreConfig, intakeTrackType, config.AgentJSONIntakeProtocol, config.DefaultIntakeOrigin)
+}
+
+// checkHTTPConnectivityStatus performs an HTTP connectivity check and returns the status
+func checkHTTPConnectivityStatus(endpoint config.Endpoint, coreConfig model.Reader) config.HTTPConnectivity {
+	return http.CheckConnectivity(endpoint, coreConfig)
+}
+
+// buildHTTPEndpointsForRestart builds HTTP endpoints for restart without connectivity check
+// This is used when we've already verified HTTP connectivity and are upgrading from TCP
+func buildHTTPEndpointsForRestart(coreConfig model.Reader) (*config.Endpoints, error) {
+	// Force HTTP endpoints since we already confirmed connectivity
+	return config.BuildEndpointsWithVectorOverride(coreConfig, config.HTTPConnectivitySuccess, intakeTrackType, config.AgentJSONIntakeProtocol, config.DefaultIntakeOrigin)
+}
+
 // buildPipelineProvider builds a new pipeline provider with the given configuration
 func buildPipelineProvider(a *logAgent, processingRules []*config.ProcessingRule, diagnosticMessageReceiver *diagnostic.BufferedMessageReceiver, destinationsCtx *client.DestinationsContext) pipeline.Provider {
 	pipelineProvider := pipeline.NewProvider(
```
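The hunk above splits endpoint construction into a probing path (`buildEndpoints`, which now also records the connectivity result as a metric) and a non-probing path (`buildHTTPEndpointsForRestart`, which forces HTTP because connectivity was already verified). The helper below is a hypothetical caller-side sketch, not part of the change; it reuses the package-level functions and imports of this file to show how the two paths differ:

```go
// endpointsFor is a hypothetical helper (not part of the change) that picks
// between the two builders above. restartWithHTTPUpgrade effectively takes the
// first branch; the initial pipeline setup takes the second.
func endpointsFor(coreConfig model.Reader, httpAlreadyVerified bool) (*config.Endpoints, error) {
	if httpAlreadyVerified {
		// Connectivity was confirmed by the retry loop: skip the probe and force HTTP.
		return buildHTTPEndpointsForRestart(coreConfig)
	}
	// Probe HTTP connectivity; fall back to TCP endpoints if the check fails.
	return buildEndpoints(coreConfig)
}
```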

comp/logs/agent/agentimpl/agent_restart.go

Lines changed: 192 additions & 13 deletions
```diff
@@ -13,23 +13,43 @@ import (
 	"fmt"
 	"time"
 
-	"github.com/DataDog/datadog-agent/pkg/logs/metrics"
+	"github.com/DataDog/datadog-agent/comp/logs/agent/config"
+	logsmetrics "github.com/DataDog/datadog-agent/pkg/logs/metrics"
 	"github.com/DataDog/datadog-agent/pkg/logs/status"
+	"github.com/DataDog/datadog-agent/pkg/util/backoff"
 	"github.com/DataDog/datadog-agent/pkg/util/startstop"
 )
 
-// restart conducts a partial restart of the logs-agent pipeline.
-// This is used to switch between transport protocols
-// without disrupting the entire agent.
-func (a *logAgent) restart(context.Context) error {
+const (
+	// Transport types for telemetry
+	transportTCP  = "tcp"
+	transportHTTP = "http"
+
+	// Restart status for telemetry
+	restartStatusSuccess = "success"
+	restartStatusFailure = "failure"
+)
+
+// restart conducts a partial restart of the logs-agent pipeline with the provided endpoints.
+// This is used to switch between transport protocols without disrupting the entire agent.
+func (a *logAgent) restart(_ context.Context, newEndpoints *config.Endpoints) error {
 	a.log.Info("Attempting to restart logs-agent pipeline")
 
 	a.restartMutex.Lock()
 	defer a.restartMutex.Unlock()
 
+	// Store current endpoints for rollback if restart fails
+	previousEndpoints := a.endpoints
+
+	// Determine transport type for metrics
+	targetTransport := transportTCP
+	if newEndpoints.UseHTTP {
+		targetTransport = transportHTTP
+	}
+
 	a.log.Info("Gracefully stopping logs-agent")
 
-	timeout := time.Duration(a.config.GetInt("logs_config.stop_grace_period")) * time.Second
+	timeout := a.config.GetDuration("logs_config.stop_grace_period")
 	_, cancel := context.WithTimeout(context.Background(), timeout)
 	defer cancel()
 
@@ -39,24 +59,71 @@ func (a *logAgent) restart(context.Context) error {
 
 	a.log.Info("Re-starting logs-agent...")
 
-	endpoints, err := buildEndpoints(a.config)
+	a.endpoints = newEndpoints
+
+	err := a.setupAgentForRestart()
+	if err != nil {
+		message := fmt.Sprintf("Could not re-start logs-agent: %v", err)
+		a.log.Error(message)
+		a.log.Error("Attempting rollback to previous transport")
+		logsmetrics.TlmRestartAttempt.Inc(restartStatusFailure, targetTransport)
+		return a.rollbackToPreviousTransport(previousEndpoints)
+	}
+
+	a.restartPipeline()
+	logsmetrics.TlmRestartAttempt.Inc(restartStatusSuccess, targetTransport)
+	return nil
+}
+
+// restartWithHTTPUpgrade upgrades the logs-agent pipeline to HTTP transport.
+// This is called by the smart HTTP restart mechanism after HTTP connectivity has been verified.
+//
+// Since HTTP connectivity was verified before calling this function, we commit to HTTP
+// and will keep retrying HTTP even if the upgrade fails. If restart fails, the base
+// restart() function will rollback to TCP temporarily, but this function returns an
+// error to trigger retry - ensuring we eventually upgrade to HTTP since connectivity exists.
+func (a *logAgent) restartWithHTTPUpgrade(ctx context.Context) error {
+	// Build HTTP endpoints since we already verified HTTP connectivity
+	endpoints, err := buildHTTPEndpointsForRestart(a.config)
 	if err != nil {
-		message := fmt.Sprintf("Invalid endpoints: %v", err)
+		message := fmt.Sprintf("Failed to build HTTP endpoints: %v", err)
 		status.AddGlobalError(invalidEndpoints, message)
+		a.log.Error(message)
+		logsmetrics.TlmRestartAttempt.Inc(restartStatusFailure, transportHTTP)
 		return errors.New(message)
 	}
 
-	a.endpoints = endpoints
+	err = a.restart(ctx, endpoints)
+	if err != nil {
+		// Restart failed (may have rolled back to TCP to keep agent functional)
+		// Since we verified HTTP connectivity, return error to trigger retry
+		a.log.Warnf("HTTP upgrade attempt failed: %v - will retry on next attempt", err)
+		return fmt.Errorf("HTTP upgrade failed: %w", err)
+	}
+
+	a.log.Info("Successfully upgraded to HTTP transport")
+	return nil
+}
 
-	err = a.setupAgentForRestart()
+// rollbackToPreviousTransport attempts to restore the agent to its previous working state
+// after a failed transport switch. This ensures the agent continues functioning
+// rather than being left in a broken state.
+func (a *logAgent) rollbackToPreviousTransport(previousEndpoints *config.Endpoints) error {
+	a.log.Warn("Rolling back to previous transport after failed restart")
+
+	a.endpoints = previousEndpoints
+
+	err := a.setupAgentForRestart()
 	if err != nil {
-		message := fmt.Sprintf("Could not re-start logs-agent: %v", err)
+		// This is a critical failure - we can't recover
+		message := fmt.Sprintf("CRITICAL: Failed to rollback to previous transport: %v", err)
 		a.log.Error(message)
 		return errors.New(message)
 	}
 
 	a.restartPipeline()
-	return nil
+	a.log.Info("Successfully rolled back to previous transport")
+	return errors.New("restart failed, rolled back to previous transport")
 }
 
 // setupAgentForRestart configures and rebuilds only the transient components during a restart.
@@ -76,7 +143,7 @@ func (a *logAgent) setupAgentForRestart() error {
 // Unlike startPipeline, this only starts the transient components (destinations, pipeline, launchers)
 // since persistent components (auditor, schedulers, diagnosticMessageReceiver) remain running.
 func (a *logAgent) restartPipeline() {
-	status.Init(a.started, a.endpoints, a.sources, a.tracker, metrics.LogsExpvars)
+	status.Init(a.started, a.endpoints, a.sources, a.tracker, logsmetrics.LogsExpvars)
 
 	starter := startstop.NewStarter(a.destinationsCtx, a.pipelineProvider, a.launchers)
 	starter.Start()
@@ -114,3 +181,115 @@ func (a *logAgent) partialStop() error {
 
 	return nil
 }
+
+// smartHTTPRestart initiates periodic HTTP connectivity checks with exponential backoff
+// to automatically upgrade from TCP to HTTP when connectivity is restored.
+// This only runs when TCP fallback occurred (not when [force_]use_tcp is configured).
+func (a *logAgent) smartHTTPRestart() {
+	// Check if we're eligible for HTTP retry
+	if config.ShouldUseTCP(a.config) {
+		return
+	}
+
+	a.httpRetryMutex.Lock()
+	// Cancel any existing loop to avoid leaks or duplicate retries
+	if a.httpRetryCancel != nil {
+		a.httpRetryCancel()
+	}
+	a.httpRetryCtx, a.httpRetryCancel = context.WithCancel(context.Background())
+	ctx := a.httpRetryCtx
+	a.httpRetryMutex.Unlock()
+
+	a.log.Info("Starting HTTP connectivity retry with exponential backoff")
+
+	// Start background goroutine for periodic HTTP checks
+	go a.httpRetryLoop(ctx)
+}
+
+// httpRetryLoop runs periodic HTTP connectivity checks with exponential backoff
+// Uses a similar backoff strategy as the TCP connection manager:
+// exponential backoff with randomization [2^(n-1), 2^n) seconds, capped at configured max
+func (a *logAgent) httpRetryLoop(ctx context.Context) {
+	maxRetryInterval := config.HTTPConnectivityRetryIntervalMax(a.config)
+	if maxRetryInterval.Seconds() <= 0 {
+		a.log.Warn("HTTP connectivity retry interval max set to 0 seconds, skipping HTTP connectivity retry")
+		return
+	}
+
+	endpoints, err := buildHTTPEndpointsForConnectivityCheck(a.config)
+	if err != nil {
+		a.log.Errorf("Failed to build HTTP endpoints: %v", err)
+		return
+	}
+
+	policy := backoff.NewExpBackoffPolicy(
+		endpoints.Main.BackoffFactor,
+		endpoints.Main.BackoffBase,
+		maxRetryInterval.Seconds(),
+		endpoints.Main.RecoveryInterval,
+		endpoints.Main.RecoveryReset,
+	)
+
+	attempt := 0
+	for {
+		// Calculate backoff interval similar to connection_manager.go
+		backoffDuration := policy.GetBackoffDuration(attempt)
+
+		a.log.Debugf("Next HTTP connectivity check in %v (attempt %d)", backoffDuration, attempt+1)
+
+		select {
+		case <-time.After(backoffDuration):
+			attempt++
+			a.log.Infof("Checking HTTP connectivity (attempt %d)", attempt)
+
+			if a.checkHTTPConnectivity() {
+				a.log.Info("HTTP connectivity restored - initiating upgrade to HTTP transport")
+
+				// Trigger HTTP upgrade. Since HTTP connectivity is verified,
+				// we commit to HTTP and keep retrying if upgrade fails.
+				if err := a.restartWithHTTPUpgrade(ctx); err != nil {
+					a.log.Errorf("HTTP upgrade failed: %v - will retry", err)
+					// Publish retry failure metric
+					logsmetrics.TlmHTTPConnectivityRetryAttempt.Inc("failure")
+					// Continue retrying - HTTP is available, we want to use it
+					continue
+				}
+
+				// Publish retry success metric
+				logsmetrics.TlmHTTPConnectivityRetryAttempt.Inc("success")
+				a.log.Info("Successfully upgraded to HTTP transport")
+				return
+			}
+
+			a.log.Debug("HTTP connectivity check failed - will retry")
+
+		case <-ctx.Done():
+			a.log.Debug("HTTP retry loop stopped")
+			return
+		}
+	}
+}
+
+// checkHTTPConnectivity tests if HTTP endpoints are reachable
+func (a *logAgent) checkHTTPConnectivity() bool {
+	endpoints, err := buildHTTPEndpointsForConnectivityCheck(a.config)
+	if err != nil {
+		a.log.Debugf("Failed to build HTTP endpoints for connectivity check: %v", err)
+		return false
+	}
+
+	connectivity := checkHTTPConnectivityStatus(endpoints.Main, a.config)
+	return connectivity == config.HTTPConnectivitySuccess
+}
+
+// stopHTTPRetry stops the HTTP retry loop
+func (a *logAgent) stopHTTPRetry() {
+	a.httpRetryMutex.Lock()
+	defer a.httpRetryMutex.Unlock()
+
+	if a.httpRetryCancel != nil {
+		a.httpRetryCancel()
+		a.httpRetryCancel = nil
+		a.httpRetryCtx = nil
+	}
+}
```
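The `httpRetryLoop` comment above describes the schedule as randomized exponential backoff in [2^(n-1), 2^n) seconds, capped at `logs_config.http_connectivity_retry_interval_max`. Below is a standalone sketch of that schedule, assuming a backoff factor of 2 and a base of 1 second; in the agent these parameters come from the endpoint's BackoffFactor/BackoffBase settings, and the actual policy is `backoff.NewExpBackoffPolicy`, whose jitter and recovery handling may differ.

```go
package main

import (
	"fmt"
	"math"
	"math/rand"
	"time"
)

// retryDelay returns a randomized delay in [2^(n-1), 2^n) seconds for attempt n,
// capped at maxInterval, mirroring the backoff described in the doc comment above.
func retryDelay(attempt int, maxInterval time.Duration) time.Duration {
	if attempt < 1 {
		attempt = 1
	}
	low := math.Pow(2, float64(attempt-1))  // 2^(n-1) seconds
	high := math.Pow(2, float64(attempt))   // 2^n seconds
	secs := low + rand.Float64()*(high-low) // randomized within [low, high)
	d := time.Duration(secs * float64(time.Second))
	if d > maxInterval {
		d = maxInterval // never wait longer than the configured max (1h by default)
	}
	return d
}

func main() {
	for n := 1; n <= 14; n++ {
		fmt.Printf("attempt %2d: wait ~%v\n", n, retryDelay(n, time.Hour).Round(time.Second))
	}
}
```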
Lines changed: 18 additions & 0 deletions
```diff
@@ -0,0 +1,18 @@
+// Unless explicitly stated otherwise all files in this repository are licensed
+// under the Apache License Version 2.0.
+// This product includes software developed at Datadog (https://www.datadoghq.com/).
+// Copyright 2023-present Datadog, Inc.
+
+//go:build serverless
+
+package agentimpl
+
+// smartHTTPRestart is a no-op for serverless builds
+func (a *logAgent) smartHTTPRestart() {
+	// No-op: serverless agents don't need HTTP retry functionality
+}
+
+// stopHTTPRetry is a no-op for serverless builds
+func (a *logAgent) stopHTTPRetry() {
+	// No-op: serverless agents don't need HTTP retry functionality
+}
```
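This stub is compiled only when the `serverless` build tag is set. For the package to build in both modes, the full implementations shown earlier presumably sit behind the inverse constraint; a hypothetical file header for that side (assumed, not shown in this diff) would look like:

```go
// Hypothetical non-serverless counterpart: keeps the full smartHTTPRestart and
// stopHTTPRetry implementations out of serverless builds so the two sets of
// definitions never clash. File name and placement are assumptions.

//go:build !serverless

package agentimpl
```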
