Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 20 additions & 17 deletions bin/agent-data-plane/src/cli/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,13 @@ use saluki_app::{
#[cfg(feature = "python-checks")]
use saluki_components::sources::ChecksConfiguration;
use saluki_components::{
decoders::otlp::OtlpDecoderConfiguration,
destinations::DogStatsDStatisticsConfiguration,
encoders::{
BufferedIncrementalConfiguration, DatadogEventsConfiguration, DatadogLogsConfiguration,
DatadogMetricsConfiguration, DatadogServiceChecksConfiguration, DatadogTraceConfiguration,
},
forwarders::{DatadogConfiguration, TraceAgentForwarderConfiguration},
forwarders::{DatadogConfiguration, OtlpForwarderConfiguration},
relays::otlp::OtlpRelayConfiguration,
sources::{DogStatsDConfiguration, OtlpConfiguration},
transforms::{
Expand Down Expand Up @@ -320,7 +321,6 @@ async fn add_baseline_traces_pipeline_to_blueprint(
.error_context("Failed to configure Datadog Traces encoder.")?
.with_environment_provider(env_provider.clone())
.await?;

blueprint
.add_encoder("dd_traces_encode", dd_traces_config)?
.connect_component("dd_out", ["dd_traces_encode"])?;
Expand Down Expand Up @@ -406,30 +406,33 @@ fn add_otlp_pipeline_to_blueprint(
env_provider: &ADPEnvironmentProvider,
) -> Result<(), GenericError> {
if dp_config.otlp().proxy().enabled() {
// In proxy mode, we forward OTLP payloads from either the Core Agent or Trace agent, depending on the signal type.
//
// This means that ADP is taking over the typical OTLP Ingest configuration (otlp_config.receiver) while we instruct
// the Core Agent to listen on a _different_ port for receiving OTLP payloads -- the Trace Agent continues to listen
// as it normally would -- and we forward the payloads.
//
// This is why we use a specific override destination for the OTLP payloads that we forward to the Core Agent.
let core_agent_otlp_endpoint = dp_config.otlp().proxy().core_agent_otlp_endpoint().to_string();

let core_agent_otlp_grpc_endpoint = dp_config.otlp().proxy().core_agent_otlp_grpc_endpoint().to_string();
let core_agent_otlp_http_endpoint = dp_config.otlp().proxy().core_agent_otlp_http_endpoint().to_string();
let api_key = config.get_typed::<String>("api_key").expect("API key is required");

let otlp_relay_config = OtlpRelayConfiguration::from_configuration(config)?;
let otlp_decoder_config = OtlpDecoderConfiguration::from_configuration(config)?;

let local_agent_otlp_forwarder_config =
DatadogConfiguration::from_configuration(config)?.with_endpoint_override(core_agent_otlp_endpoint, api_key);
let local_trace_agent_otlp_forwarder_config = TraceAgentForwarderConfiguration::from_configuration(config)?;
OtlpForwarderConfiguration::from_configuration(config, core_agent_otlp_grpc_endpoint)?
.with_endpoint_override(core_agent_otlp_http_endpoint, api_key);

blueprint
// Components.
.add_relay("otlp_relay_in", otlp_relay_config)?
.add_forwarder("local_agent_otlp_out", local_agent_otlp_forwarder_config)?
.add_forwarder("local_trace_agent_otlp_out", local_trace_agent_otlp_forwarder_config)?
// Metrics and logs.
.connect_component("local_agent_otlp_out", ["otlp_relay_in"])?
.connect_component("local_trace_agent_otlp_out", ["otlp_relay_in"])?;
// Metrics and logs to forwarders.
.connect_component("local_agent_otlp_out", ["otlp_relay_in.metrics", "otlp_relay_in.logs"])?;

if dp_config.otlp().proxy().proxy_traces() {
blueprint.connect_component("local_agent_otlp_out", ["otlp_relay_in.traces"])?;
} else {
blueprint
.add_decoder("otlp_traces_decode", otlp_decoder_config)?
// Traces to decoder, then to traces encoder.
.connect_component("otlp_traces_decode", ["otlp_relay_in.traces"])?
.connect_component("dd_traces_encode", ["otlp_traces_decode"])?;
}
} else {
let otlp_config =
OtlpConfiguration::from_configuration(config)?.with_workload_provider(env_provider.workload().clone());
Expand Down
105 changes: 91 additions & 14 deletions bin/agent-data-plane/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use saluki_config::GenericConfiguration;
use saluki_error::GenericError;
use saluki_io::net::ListenAddress;
use tracing::info;

/// General data plane configuration.
#[derive(Clone, Debug)]
Expand Down Expand Up @@ -132,8 +133,8 @@ impl DataPlaneConfiguration {
/// pipelines, such as OTLP.
pub const fn traces_pipeline_required(&self) -> bool {
// We consider the traces pipeline to be enabled if:
// - OTLP is enabled and not in proxy mode
self.otlp().enabled() && !self.otlp().proxy().enabled()
// - OTLP is enabled and not in proxy mode or proxy mode is enabled and proxy traces are disabled
self.otlp().enabled() && (!self.otlp().proxy().enabled() || !self.otlp().proxy().proxy_traces())
}
}

Expand Down Expand Up @@ -188,7 +189,12 @@ impl DataPlaneOtlpConfiguration {
}

/// OTLP proxying configuration.
///
/// In proxy mode, ADP takes over the normal "OTLP Ingest" endpoints that the Core Agent would typically listen on,
/// so the Core Agent must be configured to listen on a different, separate port than it usually would so that ADP
/// can proxy to it.
#[derive(Clone, Debug)]
#[allow(dead_code)]
pub struct DataPlaneOtlpProxyConfiguration {
/// Whether or not to proxy all signals to the Agent.
///
Expand All @@ -198,23 +204,72 @@ pub struct DataPlaneOtlpProxyConfiguration {
/// Defaults to `true`.
enabled: bool,

/// OTLP-specific endpoint on the Core Agent to proxy signals to.
/// OTLP gRPC endpoint on the Core Agent to proxy signals to.
///
/// In proxy mode, ADP takes over the normal "OTLP Ingest" endpoints that the Core Agent would typically listen on,
/// so the Core Agent must be configured to listen on a different, separate port than it usually would so that ADP
/// can proxy to it.
/// Defaults to `http://localhost:4319`.
core_agent_otlp_grpc_endpoint: String,

/// OTLP HTTP endpoint on the Core Agent to proxy signals to.
///
/// Defaults to `http://localhost:4320`.
core_agent_otlp_endpoint: String,
core_agent_otlp_http_endpoint: String,

/// Whether or not to proxy traces to the Core Agent.
///
/// Defaults to `true`.
proxy_traces: bool,

/// Whether or not to proxy metrics to the Core Agent.
///
/// Defaults to `true`.
proxy_metrics: bool,

/// Whether or not to proxy logs to the Core Agent.
///
/// Defaults to `true`.
proxy_logs: bool,
}

impl DataPlaneOtlpProxyConfiguration {
fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
let enabled = config.try_get_typed("data_plane.otlp.proxy.enabled")?.unwrap_or(false);
let core_agent_otlp_grpc_endpoint = config
.try_get_typed("data_plane.otlp.proxy.receiver.protocols.grpc.endpoint")?
.unwrap_or("http://localhost:4319".to_string());
let core_agent_otlp_http_endpoint = config
.try_get_typed("data_plane.otlp.proxy.receiver.protocols.http.endpoint")?
.unwrap_or("http://localhost:4320".to_string());
let proxy_traces = config
.try_get_typed("data_plane.otlp.proxy.traces.enabled")?
.unwrap_or(true);
let proxy_metrics = config
.try_get_typed("data_plane.otlp.proxy.metrics.enabled")?
.unwrap_or(true);
let proxy_logs = config
.try_get_typed("data_plane.otlp.proxy.logs.enabled")?
.unwrap_or(true);

if enabled {
info!(
proxy_enabled = enabled,
core_agent_otlp_grpc_endpoint = %core_agent_otlp_grpc_endpoint,
core_agent_otlp_http_endpoint = %core_agent_otlp_http_endpoint,
proxy_traces = proxy_traces,
proxy_metrics = proxy_metrics,
proxy_logs = proxy_logs,
"OTLP proxy mode enabled. Select OTLP payloads will be proxied to the Core Agent."
);
} else {
info!("OTLP proxy mode disabled. OTLP signals will be handled natively.");
}

Ok(Self {
enabled: config.try_get_typed("data_plane.otlp.proxy.enabled")?.unwrap_or(false),
core_agent_otlp_endpoint: config
.try_get_typed("data_plane.otlp.proxy.core_agent_otlp_endpoint")?
.unwrap_or("http://localhost:4320".to_string()),
enabled,
core_agent_otlp_grpc_endpoint,
core_agent_otlp_http_endpoint,
proxy_traces,
proxy_metrics,
proxy_logs,
})
}

Expand All @@ -223,8 +278,30 @@ impl DataPlaneOtlpProxyConfiguration {
self.enabled
}

/// Returns the OTLP endpoint on the Core Agent to proxy signals to.
pub fn core_agent_otlp_endpoint(&self) -> &str {
&self.core_agent_otlp_endpoint
/// Returns the OTLP gRPC endpoint on the Core Agent to proxy signals to.
pub fn core_agent_otlp_grpc_endpoint(&self) -> &str {
&self.core_agent_otlp_grpc_endpoint
}

/// Returns the OTLP HTTP endpoint on the Core Agent to proxy signals to.
pub fn core_agent_otlp_http_endpoint(&self) -> &str {
&self.core_agent_otlp_http_endpoint
}

/// Returns `true` if the OTLP traces should be proxied to the Core Agent.
pub const fn proxy_traces(&self) -> bool {
self.proxy_traces
}

/// Returns `true` if the OTLP metrics should be proxied to the Core Agent.
#[allow(dead_code)]
pub const fn proxy_metrics(&self) -> bool {
self.proxy_metrics
}

/// Returns `true` if the OTLP logs should be proxied to the Core Agent.
#[allow(dead_code)]
pub const fn proxy_logs(&self) -> bool {
self.proxy_logs
}
}
12 changes: 12 additions & 0 deletions lib/saluki-components/src/common/otlp/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,17 @@ pub struct TracesConfig {
/// Corresponds to `otlp_config.traces.probabilistic_sampler` in the Agent.
#[serde(default)]
pub probabilistic_sampler: ProbabilisticSampler,

/// The internal port on the Core Agent to forward traces to.
///
/// Defaults to 5003.
#[serde(default = "default_internal_port")]
#[allow(unused)]
pub internal_port: u16,
}

const fn default_internal_port() -> u16 {
5003
}

/// Configuration for OTLP traces probabilistic sampling.
Expand Down Expand Up @@ -243,6 +254,7 @@ impl Default for TracesConfig {
ignore_missing_datadog_fields: false,
enable_otlp_compute_top_level_by_span_kind: default_enable_otlp_compute_top_level_by_span_kind(),
probabilistic_sampler: ProbabilisticSampler::default(),
internal_port: default_internal_port(),
}
}
}
4 changes: 4 additions & 0 deletions lib/saluki-components/src/decoders/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
//! Decoder implementations.

/// OTLP decoder.
pub mod otlp;
Loading