Skip to content

Commit 0f8e9e1

Browse files
committed
feat(prometheus_remote_write sink): add custom HTTP headers support
Add support for custom HTTP headers in the prometheus_remote_write sink via the `request.headers` configuration option. This enables users to add custom headers to outgoing requests for authentication, routing, or other integration requirements with Prometheus-compatible backends.
1 parent 6b807ef commit 0f8e9e1

File tree

6 files changed

+93
-15
lines changed

6 files changed

+93
-15
lines changed
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
The `prometheus_remote_write` sink now supports custom HTTP headers via the `request.headers` configuration option. This allows users to add custom headers to outgoing requests, which is useful for authentication, routing, or other integration requirements with Prometheus-compatible backends.
2+
3+
authors: elohmeier

src/sinks/prometheus/remote_write/config.rs

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,10 @@ use crate::{
1313
UriParseSnafu,
1414
prelude::*,
1515
prometheus::PrometheusRemoteWriteAuth,
16-
util::{auth::Auth, http::http_response_retry_logic},
16+
util::{
17+
auth::Auth,
18+
http::{RequestConfig, http_response_retry_logic},
19+
},
1720
},
1821
};
1922

@@ -79,7 +82,7 @@ pub struct RemoteWriteConfig {
7982

8083
#[configurable(derived)]
8184
#[serde(default)]
82-
pub request: TowerRequestConfig,
85+
pub request: RequestConfig,
8386

8487
/// The tenant ID to send.
8588
///
@@ -141,7 +144,7 @@ impl SinkConfig for RemoteWriteConfig {
141144
async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
142145
let endpoint = self.endpoint.parse::<Uri>().context(UriParseSnafu)?;
143146
let tls_settings = TlsSettings::from_options(self.tls.as_ref())?;
144-
let request_settings = self.request.into_settings();
147+
let request_settings = self.request.tower.into_settings();
145148
let buckets = self.buckets.clone();
146149
let quantiles = self.quantiles.clone();
147150
let default_namespace = self.default_namespace.clone();
@@ -178,11 +181,18 @@ impl SinkConfig for RemoteWriteConfig {
178181
None => None,
179182
};
180183

184+
// Split headers into static and template-based
185+
let (static_headers, _template_headers) = self.request.split_headers();
186+
187+
// Validate that custom headers don't conflict with auth
188+
crate::sinks::util::http::validate_headers(&static_headers)?;
189+
181190
let healthcheck = healthcheck(
182191
client.clone(),
183192
endpoint.clone(),
184193
self.compression,
185194
auth.clone(),
195+
static_headers.clone(),
186196
)
187197
.boxed();
188198

@@ -191,6 +201,7 @@ impl SinkConfig for RemoteWriteConfig {
191201
client,
192202
auth,
193203
compression: self.compression,
204+
headers: static_headers,
194205
};
195206
let service = ServiceBuilder::new()
196207
.settings(request_settings, http_response_retry_logic())
@@ -225,10 +236,19 @@ async fn healthcheck(
225236
endpoint: Uri,
226237
compression: Compression,
227238
auth: Option<Auth>,
239+
headers: std::collections::BTreeMap<String, String>,
228240
) -> crate::Result<()> {
229241
let body = bytes::Bytes::new();
230-
let request =
231-
build_request(http::Method::GET, &endpoint, compression, body, None, auth).await?;
242+
let request = build_request(
243+
http::Method::GET,
244+
&endpoint,
245+
compression,
246+
body,
247+
None,
248+
auth,
249+
headers,
250+
)
251+
.await?;
232252
let response = client.send(request).await?;
233253

234254
match response.status() {

src/sinks/prometheus/remote_write/service.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1-
use std::task::{Context, Poll};
1+
use std::{
2+
collections::BTreeMap,
3+
task::{Context, Poll},
4+
};
25

36
#[cfg(feature = "aws-core")]
47
use aws_credential_types::provider::SharedCredentialsProvider;
@@ -34,6 +37,7 @@ pub(super) struct RemoteWriteService {
3437
pub(super) auth: Option<Auth>,
3538
pub(super) client: HttpClient,
3639
pub(super) compression: super::Compression,
40+
pub(super) headers: BTreeMap<String, String>,
3741
}
3842

3943
impl Service<RemoteWriteRequest> for RemoteWriteService {
@@ -51,6 +55,7 @@ impl Service<RemoteWriteRequest> for RemoteWriteService {
5155
let endpoint = self.endpoint.clone();
5256
let auth = self.auth.clone();
5357
let compression = self.compression;
58+
let headers = self.headers.clone();
5459

5560
Box::pin(async move {
5661
let metadata = std::mem::take(request.metadata_mut());
@@ -64,6 +69,7 @@ impl Service<RemoteWriteRequest> for RemoteWriteService {
6469
request.request,
6570
request.tenant_id.as_ref(),
6671
auth,
72+
headers,
6773
)
6874
.await?;
6975

@@ -106,6 +112,7 @@ pub(super) async fn build_request(
106112
body: Bytes,
107113
tenant_id: Option<&String>,
108114
auth: Option<Auth>,
115+
custom_headers: BTreeMap<String, String>,
109116
) -> crate::Result<http::Request<hyper::Body>> {
110117
let mut builder = http::Request::builder()
111118
.method(method)
@@ -121,6 +128,11 @@ pub(super) async fn build_request(
121128
builder = builder.header(headers::X_SCOPE_ORGID, tenant_id);
122129
}
123130

131+
// Apply custom headers
132+
for (name, value) in custom_headers {
133+
builder = builder.header(name, value);
134+
}
135+
124136
let mut request = builder.body(body)?;
125137

126138
if let Some(auth) = auth {

src/sinks/prometheus/remote_write/tests.rs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,38 @@ async fn sends_templated_x_scope_orgid_header() {
135135
assert_eq!(orgid.len(), 11);
136136
}
137137

138+
#[tokio::test]
139+
async fn sends_custom_headers() {
140+
let outputs = send_request(
141+
indoc! {r#"
142+
[request.headers]
143+
X-Custom-Header = "custom-value"
144+
X-Another-Header = "another-value"
145+
"#},
146+
vec![create_event("gauge-4".into(), 42.0)],
147+
)
148+
.await;
149+
150+
assert_eq!(outputs.len(), 1);
151+
let (headers, req) = &outputs[0];
152+
153+
// Verify custom headers are present
154+
assert_eq!(headers["x-custom-header"], "custom-value");
155+
assert_eq!(headers["x-another-header"], "another-value");
156+
157+
// Verify standard headers are still present
158+
assert_eq!(headers["x-prometheus-remote-write-version"], "0.1.0");
159+
assert_eq!(headers["content-type"], "application/x-protobuf");
160+
161+
// Verify the metric data is correct
162+
assert_eq!(req.timeseries.len(), 1);
163+
assert_eq!(
164+
req.timeseries[0].labels,
165+
labels!("__name__" => "gauge-4", "production" => "true", "region" => "us-west-1")
166+
);
167+
assert_eq!(req.timeseries[0].samples[0].value, 42.0);
168+
}
169+
138170
#[tokio::test]
139171
async fn retains_state_between_requests() {
140172
// This sink converts all incremental events to absolute, and

website/cue/reference/components/sinks/generated/prometheus_remote_write.cue

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -341,14 +341,8 @@ generated: components: sinks: prometheus_remote_write: configuration: {
341341
}
342342
}
343343
request: {
344-
description: """
345-
Middleware settings for outbound requests.
346-
347-
Various settings can be configured, such as concurrency and rate limits, timeouts, and retry behavior.
348-
349-
Note that the retry backoff policy follows the Fibonacci sequence.
350-
"""
351-
required: false
344+
description: "Outbound HTTP request settings."
345+
required: false
352346
type: object: options: {
353347
adaptive_concurrency: {
354348
description: """
@@ -447,6 +441,23 @@ generated: components: sinks: prometheus_remote_write: configuration: {
447441
uint: {}
448442
}
449443
}
444+
headers: {
445+
description: "Additional HTTP headers to add to every HTTP request."
446+
required: false
447+
type: object: {
448+
examples: [{
449+
Accept: "text/plain"
450+
"X-Event-Level": "{{level}}"
451+
"X-Event-Timestamp": "{{timestamp}}"
452+
"X-My-Custom-Header": "A-Value"
453+
}]
454+
options: "*": {
455+
description: "An HTTP request header and its value. Both header names and values support templating with event data."
456+
required: true
457+
type: string: {}
458+
}
459+
}
460+
}
450461
rate_limit_duration_secs: {
451462
description: "The time window used for the `rate_limit_num` option."
452463
required: false

website/cue/reference/components/sinks/prometheus_remote_write.cue

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ components: sinks: prometheus_remote_write: {
3434
retry_initial_backoff_secs: 1
3535
retry_max_duration_secs: 10
3636
timeout_secs: 60
37-
headers: false
37+
headers: true
3838
}
3939
tls: {
4040
enabled: true

0 commit comments

Comments
 (0)