10 changes: 10 additions & 0 deletions comp/core/agenttelemetry/impl/config.go
@@ -227,6 +227,16 @@ var defaultProfiles = `
- name: logs.encoded_bytes_sent
aggregate_tags:
- compression_kind
- name: logs.http_connectivity_check
aggregate_tags:
- status
- name: logs.http_connectivity_retry_attempt
aggregate_tags:
- status
- name: logs.restart_attempt
aggregate_tags:
- status
- transport
- name: logs.sender_latency
- name: logs.truncated
aggregate_tags:
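These profile entries correspond to the counters the logs code increments in the changes below (`TlmHTTPConnectivityCheck`, `TlmHTTPConnectivityRetryAttempt`, `TlmRestartAttempt`); the `aggregate_tags` must match each counter's tag set. The declarations live in `pkg/logs/metrics` and are not part of this diff — a minimal sketch of what they would look like, assuming the agent's usual `telemetry.NewCounter` pattern:

```go
package metrics

import "github.com/DataDog/datadog-agent/pkg/telemetry"

var (
	// TlmHTTPConnectivityCheck counts startup HTTP connectivity checks, tagged by status.
	TlmHTTPConnectivityCheck = telemetry.NewCounter("logs", "http_connectivity_check",
		[]string{"status"}, "HTTP connectivity check results (success/failure)")

	// TlmHTTPConnectivityRetryAttempt counts background retry attempts, tagged by status.
	TlmHTTPConnectivityRetryAttempt = telemetry.NewCounter("logs", "http_connectivity_retry_attempt",
		[]string{"status"}, "HTTP connectivity retry attempt results (success/failure)")

	// TlmRestartAttempt counts pipeline restarts, tagged by status and target transport.
	TlmRestartAttempt = telemetry.NewCounter("logs", "restart_attempt",
		[]string{"status", "transport"}, "Logs pipeline restart attempts")
)
```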
13 changes: 13 additions & 0 deletions comp/logs/agent/agentimpl/agent.go
@@ -134,6 +134,11 @@ type logAgent struct {

// make restart thread safe
restartMutex sync.Mutex

// HTTP retry state for TCP fallback recovery
httpRetryCtx context.Context
httpRetryCancel context.CancelFunc
httpRetryMutex sync.Mutex
}

func newLogsAgent(deps dependencies) provides {
Expand Down Expand Up @@ -212,6 +217,11 @@ func (a *logAgent) start(context.Context) error {
}

a.startPipeline()

// If we fell back to TCP, start the background retry loop that upgrades to HTTP once connectivity is restored
if !endpoints.UseHTTP {
a.smartHTTPRestart()
}
return nil
}

@@ -302,6 +312,9 @@ func (a *logAgent) stop(context.Context) error {

a.log.Info("Stopping logs-agent")

// Stop HTTP retry loop if running
a.stopHTTPRetry()

status.Clear()

toStop := []startstop.Stoppable{
28 changes: 27 additions & 1 deletion comp/logs/agent/agentimpl/agent_core_init.go
@@ -26,6 +26,7 @@ import (
"github.com/DataDog/datadog-agent/pkg/logs/launchers/journald"
"github.com/DataDog/datadog-agent/pkg/logs/launchers/listener"
"github.com/DataDog/datadog-agent/pkg/logs/launchers/windowsevent"
"github.com/DataDog/datadog-agent/pkg/logs/metrics"
"github.com/DataDog/datadog-agent/pkg/logs/pipeline"
"github.com/DataDog/datadog-agent/pkg/logs/schedulers"
"github.com/DataDog/datadog-agent/pkg/logs/tailers/file"
@@ -58,12 +59,37 @@ func (a *logAgent) SetupPipeline(processingRules []*config.ProcessingRule, wmeta
// dependent on configuration and connectivity
func buildEndpoints(coreConfig model.Reader) (*config.Endpoints, error) {
httpConnectivity := config.HTTPConnectivityFailure
if endpoints, err := config.BuildHTTPEndpointsWithVectorOverride(coreConfig, intakeTrackType, config.AgentJSONIntakeProtocol, config.DefaultIntakeOrigin); err == nil {
if endpoints, err := buildHTTPEndpointsForConnectivityCheck(coreConfig); err == nil {
httpConnectivity = http.CheckConnectivity(endpoints.Main, coreConfig)
}

// Publish HTTP connectivity check metric
if httpConnectivity == config.HTTPConnectivitySuccess {
metrics.TlmHTTPConnectivityCheck.Inc("success")
} else {
metrics.TlmHTTPConnectivityCheck.Inc("failure")
}

return config.BuildEndpointsWithVectorOverride(coreConfig, httpConnectivity, intakeTrackType, config.AgentJSONIntakeProtocol, config.DefaultIntakeOrigin)
}

// buildHTTPEndpointsForConnectivityCheck builds HTTP endpoints for connectivity testing only
func buildHTTPEndpointsForConnectivityCheck(coreConfig model.Reader) (*config.Endpoints, error) {
return config.BuildHTTPEndpointsWithVectorOverride(coreConfig, intakeTrackType, config.AgentJSONIntakeProtocol, config.DefaultIntakeOrigin)
}

// checkHTTPConnectivityStatus performs an HTTP connectivity check and returns the status
func checkHTTPConnectivityStatus(endpoint config.Endpoint, coreConfig model.Reader) config.HTTPConnectivity {
return http.CheckConnectivity(endpoint, coreConfig)
}

// buildHTTPEndpointsForRestart builds HTTP endpoints for restart without connectivity check
// This is used when we've already verified HTTP connectivity and are upgrading from TCP
func buildHTTPEndpointsForRestart(coreConfig model.Reader) (*config.Endpoints, error) {
// Force HTTP endpoints since we already confirmed connectivity
return config.BuildEndpointsWithVectorOverride(coreConfig, config.HTTPConnectivitySuccess, intakeTrackType, config.AgentJSONIntakeProtocol, config.DefaultIntakeOrigin)
}

// buildPipelineProvider builds a new pipeline provider with the given configuration
func buildPipelineProvider(a *logAgent, processingRules []*config.ProcessingRule, diagnosticMessageReceiver *diagnostic.BufferedMessageReceiver, destinationsCtx *client.DestinationsContext) pipeline.Provider {
pipelineProvider := pipeline.NewProvider(
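The new helpers split what was previously a single inlined call into three roles: building probe-only endpoints, running the probe, and rebuilding endpoints for an already-verified HTTP upgrade (which skips the check by passing `config.HTTPConnectivitySuccess`). A sketch of how the retry path in `agent_restart.go` composes them — `upgradeToHTTPIfReachable` is a hypothetical helper for illustration; the real logic lives in `checkHTTPConnectivity` and `restartWithHTTPUpgrade` below:

```go
// upgradeToHTTPIfReachable probes the HTTP intake and, if reachable,
// rebuilds the endpoints for HTTP and restarts the pipeline.
func (a *logAgent) upgradeToHTTPIfReachable(ctx context.Context) bool {
	// Probe: can we reach the HTTP intake right now?
	eps, err := buildHTTPEndpointsForConnectivityCheck(a.config)
	if err != nil || checkHTTPConnectivityStatus(eps.Main, a.config) != config.HTTPConnectivitySuccess {
		return false // still unreachable; stay on TCP and retry later
	}

	// Upgrade: rebuild the endpoints forcing HTTP, without probing a second time.
	httpEndpoints, err := buildHTTPEndpointsForRestart(a.config)
	if err != nil {
		return false
	}
	return a.restart(ctx, httpEndpoints) == nil
}
```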
205 changes: 192 additions & 13 deletions comp/logs/agent/agentimpl/agent_restart.go
@@ -13,23 +13,43 @@ import (
"fmt"
"time"

"github.com/DataDog/datadog-agent/pkg/logs/metrics"
"github.com/DataDog/datadog-agent/comp/logs/agent/config"
logsmetrics "github.com/DataDog/datadog-agent/pkg/logs/metrics"
"github.com/DataDog/datadog-agent/pkg/logs/status"
"github.com/DataDog/datadog-agent/pkg/util/backoff"
"github.com/DataDog/datadog-agent/pkg/util/startstop"
)

// restart conducts a partial restart of the logs-agent pipeline.
// This is used to switch between transport protocols
// without disrupting the entire agent.
func (a *logAgent) restart(context.Context) error {
const (
// Transport types for telemetry
transportTCP = "tcp"
transportHTTP = "http"

// Restart status for telemetry
restartStatusSuccess = "success"
restartStatusFailure = "failure"
)

// restart conducts a partial restart of the logs-agent pipeline with the provided endpoints.
// This is used to switch between transport protocols without disrupting the entire agent.
func (a *logAgent) restart(_ context.Context, newEndpoints *config.Endpoints) error {
a.log.Info("Attempting to restart logs-agent pipeline")

a.restartMutex.Lock()
defer a.restartMutex.Unlock()

// Store current endpoints for rollback if restart fails
previousEndpoints := a.endpoints

// Determine transport type for metrics
targetTransport := transportTCP
if newEndpoints.UseHTTP {
targetTransport = transportHTTP
}

a.log.Info("Gracefully stopping logs-agent")

timeout := time.Duration(a.config.GetInt("logs_config.stop_grace_period")) * time.Second
timeout := a.config.GetDuration("logs_config.stop_grace_period")
_, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

@@ -39,24 +59,71 @@ func (a *logAgent) restart(context.Context) error {

a.log.Info("Re-starting logs-agent...")

endpoints, err := buildEndpoints(a.config)
a.endpoints = newEndpoints

err := a.setupAgentForRestart()
if err != nil {
message := fmt.Sprintf("Could not re-start logs-agent: %v", err)
a.log.Error(message)
a.log.Error("Attempting rollback to previous transport")
logsmetrics.TlmRestartAttempt.Inc(restartStatusFailure, targetTransport)
return a.rollbackToPreviousTransport(previousEndpoints)
}

a.restartPipeline()
logsmetrics.TlmRestartAttempt.Inc(restartStatusSuccess, targetTransport)
return nil
}

// restartWithHTTPUpgrade upgrades the logs-agent pipeline to HTTP transport.
// This is called by the smart HTTP restart mechanism after HTTP connectivity has been verified.
//
// Since HTTP connectivity was verified before calling this function, we commit to HTTP
// and will keep retrying HTTP even if the upgrade fails. If restart fails, the base
// restart() function will rollback to TCP temporarily, but this function returns an
// error to trigger retry - ensuring we eventually upgrade to HTTP since connectivity exists.
func (a *logAgent) restartWithHTTPUpgrade(ctx context.Context) error {
// Build HTTP endpoints since we already verified HTTP connectivity
endpoints, err := buildHTTPEndpointsForRestart(a.config)
if err != nil {
message := fmt.Sprintf("Invalid endpoints: %v", err)
message := fmt.Sprintf("Failed to build HTTP endpoints: %v", err)
status.AddGlobalError(invalidEndpoints, message)
a.log.Error(message)
logsmetrics.TlmRestartAttempt.Inc(restartStatusFailure, transportHTTP)
return errors.New(message)
}

a.endpoints = endpoints
err = a.restart(ctx, endpoints)
if err != nil {
// Restart failed (may have rolled back to TCP to keep agent functional)
// Since we verified HTTP connectivity, return error to trigger retry
a.log.Warnf("HTTP upgrade attempt failed: %v - will retry on next attempt", err)
return fmt.Errorf("HTTP upgrade failed: %w", err)
}

a.log.Info("Successfully upgraded to HTTP transport")
return nil
}

err = a.setupAgentForRestart()
// rollbackToPreviousTransport attempts to restore the agent to its previous working state
// after a failed transport switch. This ensures the agent continues functioning
// rather than being left in a broken state.
func (a *logAgent) rollbackToPreviousTransport(previousEndpoints *config.Endpoints) error {
a.log.Warn("Rolling back to previous transport after failed restart")

a.endpoints = previousEndpoints

err := a.setupAgentForRestart()
if err != nil {
message := fmt.Sprintf("Could not re-start logs-agent: %v", err)
// This is a critical failure - we can't recover
message := fmt.Sprintf("CRITICAL: Failed to rollback to previous transport: %v", err)
a.log.Error(message)
return errors.New(message)
}

a.restartPipeline()
return nil
a.log.Info("Successfully rolled back to previous transport")
return errors.New("restart failed, rolled back to previous transport")
}

// setupAgentForRestart configures and rebuilds only the transient components during a restart.
Expand All @@ -76,7 +143,7 @@ func (a *logAgent) setupAgentForRestart() error {
// Unlike startPipeline, this only starts the transient components (destinations, pipeline, launchers)
// since persistent components (auditor, schedulers, diagnosticMessageReceiver) remain running.
func (a *logAgent) restartPipeline() {
status.Init(a.started, a.endpoints, a.sources, a.tracker, metrics.LogsExpvars)
status.Init(a.started, a.endpoints, a.sources, a.tracker, logsmetrics.LogsExpvars)

starter := startstop.NewStarter(a.destinationsCtx, a.pipelineProvider, a.launchers)
starter.Start()
@@ -114,3 +181,115 @@ func (a *logAgent) partialStop() error {

return nil
}

// smartHTTPRestart initiates periodic HTTP connectivity checks with exponential backoff
// to automatically upgrade from TCP to HTTP when connectivity is restored.
// This only runs when TCP fallback occurred (not when [force_]use_tcp is configured).
func (a *logAgent) smartHTTPRestart() {
// Check if we're eligible for HTTP retry
if config.ShouldUseTCP(a.config) {
return
}

a.httpRetryMutex.Lock()
// Cancel any existing loop to avoid leaks or duplicate retries
if a.httpRetryCancel != nil {
a.httpRetryCancel()
}
a.httpRetryCtx, a.httpRetryCancel = context.WithCancel(context.Background())
ctx := a.httpRetryCtx
a.httpRetryMutex.Unlock()

a.log.Info("Starting HTTP connectivity retry with exponential backoff")

// Start background goroutine for periodic HTTP checks
go a.httpRetryLoop(ctx)
}

// httpRetryLoop runs periodic HTTP connectivity checks with exponential backoff.
// It uses a backoff strategy similar to the TCP connection manager's:
// exponential backoff with randomization [2^(n-1), 2^n) seconds, capped at the configured max
func (a *logAgent) httpRetryLoop(ctx context.Context) {
maxRetryInterval := config.HTTPConnectivityRetryIntervalMax(a.config)
if maxRetryInterval.Seconds() <= 0 {
a.log.Warn("HTTP connectivity retry interval max set to 0 seconds, skipping HTTP connectivity retry")
return
}

endpoints, err := buildHTTPEndpointsForConnectivityCheck(a.config)
if err != nil {
a.log.Errorf("Failed to build HTTP endpoints: %v", err)
return
}

policy := backoff.NewExpBackoffPolicy(
endpoints.Main.BackoffFactor,
endpoints.Main.BackoffBase,
maxRetryInterval.Seconds(),
endpoints.Main.RecoveryInterval,
endpoints.Main.RecoveryReset,
)

attempt := 0
for {
// Calculate backoff interval similar to connection_manager.go
backoffDuration := policy.GetBackoffDuration(attempt)

a.log.Debugf("Next HTTP connectivity check in %v (attempt %d)", backoffDuration, attempt+1)

select {
case <-time.After(backoffDuration):
attempt++
a.log.Infof("Checking HTTP connectivity (attempt %d)", attempt)

if a.checkHTTPConnectivity() {
a.log.Info("HTTP connectivity restored - initiating upgrade to HTTP transport")

// Trigger HTTP upgrade. Since HTTP connectivity is verified,
// we commit to HTTP and keep retrying if upgrade fails.
if err := a.restartWithHTTPUpgrade(ctx); err != nil {
a.log.Errorf("HTTP upgrade failed: %v - will retry", err)
// Publish retry failure metric
logsmetrics.TlmHTTPConnectivityRetryAttempt.Inc("failure")
// Continue retrying - HTTP is available, we want to use it
continue
}

// Publish retry success metric
logsmetrics.TlmHTTPConnectivityRetryAttempt.Inc("success")
a.log.Info("Successfully upgraded to HTTP transport")
return
}

a.log.Debug("HTTP connectivity check failed - will retry")

case <-ctx.Done():
a.log.Debug("HTTP retry loop stopped")
return
}
}
}

// checkHTTPConnectivity tests if HTTP endpoints are reachable
func (a *logAgent) checkHTTPConnectivity() bool {
endpoints, err := buildHTTPEndpointsForConnectivityCheck(a.config)
if err != nil {
a.log.Debugf("Failed to build HTTP endpoints for connectivity check: %v", err)
return false
}

connectivity := checkHTTPConnectivityStatus(endpoints.Main, a.config)
return connectivity == config.HTTPConnectivitySuccess
}

// stopHTTPRetry stops the HTTP retry loop
func (a *logAgent) stopHTTPRetry() {
a.httpRetryMutex.Lock()
defer a.httpRetryMutex.Unlock()

if a.httpRetryCancel != nil {
a.httpRetryCancel()
a.httpRetryCancel = nil
a.httpRetryCtx = nil
}
}
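For intuition about the retry cadence: the wait roughly doubles after each failed attempt until the configured cap dominates. A runnable sketch using the same constructor and method as `httpRetryLoop`, with assumed illustrative values (factor 2, base 1s, 120s cap, recovery interval 2, no recovery reset) — the real loop takes these from `endpoints.Main` and `HTTPConnectivityRetryIntervalMax`:

```go
package main

import (
	"fmt"

	"github.com/DataDog/datadog-agent/pkg/util/backoff"
)

func main() {
	// Assumed values for illustration only.
	policy := backoff.NewExpBackoffPolicy(2, 1, 120, 2, false)

	// Per httpRetryLoop's comment, waits grow roughly like
	// [2^(n-1), 2^n) seconds, capped at 120s here.
	for attempt := 0; attempt < 10; attempt++ {
		fmt.Printf("attempt %d: wait %v\n", attempt+1, policy.GetBackoffDuration(attempt))
	}
}
```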
18 changes: 18 additions & 0 deletions comp/logs/agent/agentimpl/agent_restart_serverless.go
@@ -0,0 +1,18 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2023-present Datadog, Inc.

//go:build serverless

package agentimpl

// smartHTTPRestart is a no-op for serverless builds
func (a *logAgent) smartHTTPRestart() {
// No-op: serverless agents don't need HTTP retry functionality
}

// stopHTTPRetry is a no-op for serverless builds
func (a *logAgent) stopHTTPRetry() {
// No-op: serverless agents don't need HTTP retry functionality
}
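Since `agent_restart.go` also defines `smartHTTPRestart` and `stopHTTPRetry`, it must already be excluded from serverless builds for this pair of no-ops to compile; its build constraints sit above the first hunk shown in this diff, so this is an inference. Presumably the inverse tag, along the lines of:

```go
//go:build !serverless

package agentimpl
```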