Skip to content

Commit f5e40b6

Browse files
authored
enhancement(components): add otlp decoder (#1080)
## Summary <!-- Please provide a brief summary about what this PR does. This should help the reviewers give feedback faster and with higher quality. --> This PR creates the OTLP Decoder. The topology is updated using the new named outputs for the `Relay` such that metrics and logs will be proxied while traces will go to the OTLP Decoder. When `data_plane.otlp.proxy.traces.enabled` is true, traces will be proxied to the trace agent on port 5003 ``` data_plane: enabled: true otlp: enabled: true proxy: enabled: true receiver: protocols: grpc: endpoint: 127.0.0.1:4319 http: endpoint: 127.0.0.1:4320 traces: enabled: false metrics: enabled: true logs: enabled: true ``` ## Change Type - [ ] Bug fix - [x] New feature - [ ] Non-functional (chore, refactoring, docs) - [ ] Performance ## How did you test this PR? <!-- Please how you tested these changes here --> Run ADP and enable OTLP + proxy: ``` data_plane: enabled: true otlp: enabled: true proxy: enabled: true receiver: protocols: grpc: endpoint: 127.0.0.1:4319 http: endpoint: 127.0.0.1:4320 traces: enabled: false metrics: enabled: true logs: enabled: true ``` Send OTLP traces,metrics,logs and see that they show up in datadog. ## References <!-- Please list any issues closed by this PR. --> <!-- - Closes: <issue link> --> <!-- Any other issues or PRs relevant to this PR? Feel free to list them here. -->
1 parent 43d78f4 commit f5e40b6

File tree

13 files changed

+781
-186
lines changed

13 files changed

+781
-186
lines changed

bin/agent-data-plane/src/cli/run.rs

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,13 @@ use saluki_app::{
1212
#[cfg(feature = "python-checks")]
1313
use saluki_components::sources::ChecksConfiguration;
1414
use saluki_components::{
15+
decoders::otlp::OtlpDecoderConfiguration,
1516
destinations::DogStatsDStatisticsConfiguration,
1617
encoders::{
1718
BufferedIncrementalConfiguration, DatadogEventsConfiguration, DatadogLogsConfiguration,
1819
DatadogMetricsConfiguration, DatadogServiceChecksConfiguration, DatadogTraceConfiguration,
1920
},
20-
forwarders::{DatadogConfiguration, TraceAgentForwarderConfiguration},
21+
forwarders::{DatadogConfiguration, OtlpForwarderConfiguration},
2122
relays::otlp::OtlpRelayConfiguration,
2223
sources::{DogStatsDConfiguration, OtlpConfiguration},
2324
transforms::{
@@ -320,7 +321,6 @@ async fn add_baseline_traces_pipeline_to_blueprint(
320321
.error_context("Failed to configure Datadog Traces encoder.")?
321322
.with_environment_provider(env_provider.clone())
322323
.await?;
323-
324324
blueprint
325325
.add_encoder("dd_traces_encode", dd_traces_config)?
326326
.connect_component("dd_out", ["dd_traces_encode"])?;
@@ -406,30 +406,33 @@ fn add_otlp_pipeline_to_blueprint(
406406
env_provider: &ADPEnvironmentProvider,
407407
) -> Result<(), GenericError> {
408408
if dp_config.otlp().proxy().enabled() {
409-
// In proxy mode, we forward OTLP payloads from either the Core Agent or Trace agent, depending on the signal type.
410-
//
411-
// This means that ADP is taking over the typical OTLP Ingest configuration (otlp_config.receiver) while we instruct
412-
// the Core Agent to listen on a _different_ port for receiving OTLP payloads -- the Trace Agent continues to listen
413-
// as it normally would -- and we forward the payloads.
414-
//
415-
// This is why we use a specific override destination for the OTLP payloads that we forward to the Core Agent.
416-
let core_agent_otlp_endpoint = dp_config.otlp().proxy().core_agent_otlp_endpoint().to_string();
417-
409+
let core_agent_otlp_grpc_endpoint = dp_config.otlp().proxy().core_agent_otlp_grpc_endpoint().to_string();
410+
let core_agent_otlp_http_endpoint = dp_config.otlp().proxy().core_agent_otlp_http_endpoint().to_string();
418411
let api_key = config.get_typed::<String>("api_key").expect("API key is required");
419412

420413
let otlp_relay_config = OtlpRelayConfiguration::from_configuration(config)?;
414+
let otlp_decoder_config = OtlpDecoderConfiguration::from_configuration(config)?;
415+
421416
let local_agent_otlp_forwarder_config =
422-
DatadogConfiguration::from_configuration(config)?.with_endpoint_override(core_agent_otlp_endpoint, api_key);
423-
let local_trace_agent_otlp_forwarder_config = TraceAgentForwarderConfiguration::from_configuration(config)?;
417+
OtlpForwarderConfiguration::from_configuration(config, core_agent_otlp_grpc_endpoint)?
418+
.with_endpoint_override(core_agent_otlp_http_endpoint, api_key);
424419

425420
blueprint
426421
// Components.
427422
.add_relay("otlp_relay_in", otlp_relay_config)?
428423
.add_forwarder("local_agent_otlp_out", local_agent_otlp_forwarder_config)?
429-
.add_forwarder("local_trace_agent_otlp_out", local_trace_agent_otlp_forwarder_config)?
430-
// Metrics and logs.
431-
.connect_component("local_agent_otlp_out", ["otlp_relay_in"])?
432-
.connect_component("local_trace_agent_otlp_out", ["otlp_relay_in"])?;
424+
// Metrics and logs to forwarders.
425+
.connect_component("local_agent_otlp_out", ["otlp_relay_in.metrics", "otlp_relay_in.logs"])?;
426+
427+
if dp_config.otlp().proxy().proxy_traces() {
428+
blueprint.connect_component("local_agent_otlp_out", ["otlp_relay_in.traces"])?;
429+
} else {
430+
blueprint
431+
.add_decoder("otlp_traces_decode", otlp_decoder_config)?
432+
// Traces to decoder, then to traces encoder.
433+
.connect_component("otlp_traces_decode", ["otlp_relay_in.traces"])?
434+
.connect_component("dd_traces_encode", ["otlp_traces_decode"])?;
435+
}
433436
} else {
434437
let otlp_config =
435438
OtlpConfiguration::from_configuration(config)?.with_workload_provider(env_provider.workload().clone());

bin/agent-data-plane/src/config.rs

Lines changed: 91 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use saluki_config::GenericConfiguration;
22
use saluki_error::GenericError;
33
use saluki_io::net::ListenAddress;
4+
use tracing::info;
45

56
/// General data plane configuration.
67
#[derive(Clone, Debug)]
@@ -132,8 +133,8 @@ impl DataPlaneConfiguration {
132133
/// pipelines, such as OTLP.
133134
pub const fn traces_pipeline_required(&self) -> bool {
134135
// We consider the traces pipeline to be enabled if:
135-
// - OTLP is enabled and not in proxy mode
136-
self.otlp().enabled() && !self.otlp().proxy().enabled()
136+
// - OTLP is enabled and not in proxy mode or proxy mode is enabled and proxy traces are disabled
137+
self.otlp().enabled() && (!self.otlp().proxy().enabled() || !self.otlp().proxy().proxy_traces())
137138
}
138139
}
139140

@@ -188,7 +189,12 @@ impl DataPlaneOtlpConfiguration {
188189
}
189190

190191
/// OTLP proxying configuration.
192+
///
193+
/// In proxy mode, ADP takes over the normal "OTLP Ingest" endpoints that the Core Agent would typically listen on,
194+
/// so the Core Agent must be configured to listen on a different, separate port than it usually would so that ADP
195+
/// can proxy to it.
191196
#[derive(Clone, Debug)]
197+
#[allow(dead_code)]
192198
pub struct DataPlaneOtlpProxyConfiguration {
193199
/// Whether or not to proxy all signals to the Agent.
194200
///
@@ -198,23 +204,72 @@ pub struct DataPlaneOtlpProxyConfiguration {
198204
/// Defaults to `true`.
199205
enabled: bool,
200206

201-
/// OTLP-specific endpoint on the Core Agent to proxy signals to.
207+
/// OTLP gRPC endpoint on the Core Agent to proxy signals to.
202208
///
203-
/// In proxy mode, ADP takes over the normal "OTLP Ingest" endpoints that the Core Agent would typically listen on,
204-
/// so the Core Agent must be configured to listen on a different, separate port than it usually would so that ADP
205-
/// can proxy to it.
209+
/// Defaults to `http://localhost:4319`.
210+
core_agent_otlp_grpc_endpoint: String,
211+
212+
/// OTLP HTTP endpoint on the Core Agent to proxy signals to.
206213
///
207214
/// Defaults to `http://localhost:4320`.
208-
core_agent_otlp_endpoint: String,
215+
core_agent_otlp_http_endpoint: String,
216+
217+
/// Whether or not to proxy traces to the Core Agent.
218+
///
219+
/// Defaults to `true`.
220+
proxy_traces: bool,
221+
222+
/// Whether or not to proxy metrics to the Core Agent.
223+
///
224+
/// Defaults to `true`.
225+
proxy_metrics: bool,
226+
227+
/// Whether or not to proxy logs to the Core Agent.
228+
///
229+
/// Defaults to `true`.
230+
proxy_logs: bool,
209231
}
210232

211233
impl DataPlaneOtlpProxyConfiguration {
212234
fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
235+
let enabled = config.try_get_typed("data_plane.otlp.proxy.enabled")?.unwrap_or(false);
236+
let core_agent_otlp_grpc_endpoint = config
237+
.try_get_typed("data_plane.otlp.proxy.receiver.protocols.grpc.endpoint")?
238+
.unwrap_or("http://localhost:4319".to_string());
239+
let core_agent_otlp_http_endpoint = config
240+
.try_get_typed("data_plane.otlp.proxy.receiver.protocols.http.endpoint")?
241+
.unwrap_or("http://localhost:4320".to_string());
242+
let proxy_traces = config
243+
.try_get_typed("data_plane.otlp.proxy.traces.enabled")?
244+
.unwrap_or(true);
245+
let proxy_metrics = config
246+
.try_get_typed("data_plane.otlp.proxy.metrics.enabled")?
247+
.unwrap_or(true);
248+
let proxy_logs = config
249+
.try_get_typed("data_plane.otlp.proxy.logs.enabled")?
250+
.unwrap_or(true);
251+
252+
if enabled {
253+
info!(
254+
proxy_enabled = enabled,
255+
core_agent_otlp_grpc_endpoint = %core_agent_otlp_grpc_endpoint,
256+
core_agent_otlp_http_endpoint = %core_agent_otlp_http_endpoint,
257+
proxy_traces = proxy_traces,
258+
proxy_metrics = proxy_metrics,
259+
proxy_logs = proxy_logs,
260+
"OTLP proxy mode enabled. Select OTLP payloads will be proxied to the Core Agent."
261+
);
262+
} else {
263+
info!("OTLP proxy mode disabled. OTLP signals will be handled natively.");
264+
}
265+
213266
Ok(Self {
214-
enabled: config.try_get_typed("data_plane.otlp.proxy.enabled")?.unwrap_or(false),
215-
core_agent_otlp_endpoint: config
216-
.try_get_typed("data_plane.otlp.proxy.core_agent_otlp_endpoint")?
217-
.unwrap_or("http://localhost:4320".to_string()),
267+
enabled,
268+
core_agent_otlp_grpc_endpoint,
269+
core_agent_otlp_http_endpoint,
270+
proxy_traces,
271+
proxy_metrics,
272+
proxy_logs,
218273
})
219274
}
220275

@@ -223,8 +278,30 @@ impl DataPlaneOtlpProxyConfiguration {
223278
self.enabled
224279
}
225280

226-
/// Returns the OTLP endpoint on the Core Agent to proxy signals to.
227-
pub fn core_agent_otlp_endpoint(&self) -> &str {
228-
&self.core_agent_otlp_endpoint
281+
/// Returns the OTLP gRPC endpoint on the Core Agent to proxy signals to.
282+
pub fn core_agent_otlp_grpc_endpoint(&self) -> &str {
283+
&self.core_agent_otlp_grpc_endpoint
284+
}
285+
286+
/// Returns the OTLP HTTP endpoint on the Core Agent to proxy signals to.
287+
pub fn core_agent_otlp_http_endpoint(&self) -> &str {
288+
&self.core_agent_otlp_http_endpoint
289+
}
290+
291+
/// Returns `true` if the OTLP traces should be proxied to the Core Agent.
292+
pub const fn proxy_traces(&self) -> bool {
293+
self.proxy_traces
294+
}
295+
296+
/// Returns `true` if the OTLP metrics should be proxied to the Core Agent.
297+
#[allow(dead_code)]
298+
pub const fn proxy_metrics(&self) -> bool {
299+
self.proxy_metrics
300+
}
301+
302+
/// Returns `true` if the OTLP logs should be proxied to the Core Agent.
303+
#[allow(dead_code)]
304+
pub const fn proxy_logs(&self) -> bool {
305+
self.proxy_logs
229306
}
230307
}

lib/saluki-components/src/common/otlp/config.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,17 @@ pub struct TracesConfig {
200200
/// Corresponds to `otlp_config.traces.probabilistic_sampler` in the Agent.
201201
#[serde(default)]
202202
pub probabilistic_sampler: ProbabilisticSampler,
203+
204+
/// The internal port on the Core Agent to forward traces to.
205+
///
206+
/// Defaults to 5003.
207+
#[serde(default = "default_internal_port")]
208+
#[allow(unused)]
209+
pub internal_port: u16,
210+
}
211+
212+
const fn default_internal_port() -> u16 {
213+
5003
203214
}
204215

205216
/// Configuration for OTLP traces probabilistic sampling.
@@ -243,6 +254,7 @@ impl Default for TracesConfig {
243254
ignore_missing_datadog_fields: false,
244255
enable_otlp_compute_top_level_by_span_kind: default_enable_otlp_compute_top_level_by_span_kind(),
245256
probabilistic_sampler: ProbabilisticSampler::default(),
257+
internal_port: default_internal_port(),
246258
}
247259
}
248260
}
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
//! Decoder implementations.
2+
3+
/// OTLP decoder.
4+
pub mod otlp;

0 commit comments

Comments
 (0)