-
Notifications
You must be signed in to change notification settings - Fork 2k
Expand file tree
/
Copy pathhttp.rs
More file actions
211 lines (180 loc) · 5.84 KB
/
http.rs
File metadata and controls
211 lines (180 loc) · 5.84 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
use async_stream::stream;
use bytes::Buf;
use futures::Stream;
use http_body::{Body as _, Collected};
use hyper::Body;
use indexmap::IndexMap;
use tokio::time;
use url::Url;
use vector_lib::configurable::configurable_component;
use super::BuildResult;
use crate::{
config::{self, Format, ProxyConfig, provider::ProviderConfig},
http::HttpClient,
signal,
tls::{TlsConfig, TlsSettings},
};
/// Request settings.
#[configurable_component]
#[derive(Clone, Debug)]
pub struct RequestConfig {
/// HTTP headers to add to the request.
#[serde(default)]
pub headers: IndexMap<String, String>,
}
impl Default for RequestConfig {
fn default() -> Self {
Self {
headers: IndexMap::new(),
}
}
}
/// Configuration for the `http` provider.
#[configurable_component(provider("http"))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields, default)]
pub struct HttpConfig {
/// URL for the HTTP provider.
url: Option<Url>,
#[configurable(derived)]
request: RequestConfig,
/// How often to poll the provider, in seconds.
poll_interval_secs: u64,
#[serde(flatten)]
tls_options: Option<TlsConfig>,
#[configurable(derived)]
#[serde(default, skip_serializing_if = "crate::serde::is_default")]
proxy: ProxyConfig,
/// Which config format expected to be loaded
#[configurable(derived)]
config_format: Format,
}
impl Default for HttpConfig {
fn default() -> Self {
Self {
url: None,
request: RequestConfig::default(),
poll_interval_secs: 30,
tls_options: None,
proxy: Default::default(),
config_format: Format::default(),
}
}
}
/// Makes an HTTP request to the provided endpoint, returning the String body.
async fn http_request(
url: &Url,
tls_options: Option<&TlsConfig>,
headers: &IndexMap<String, String>,
proxy: &ProxyConfig,
) -> Result<bytes::Bytes, &'static str> {
let tls_settings = TlsSettings::from_options(tls_options).map_err(|_| "Invalid TLS options")?;
let http_client =
HttpClient::<Body>::new(tls_settings, proxy).map_err(|_| "Invalid TLS settings")?;
// Build HTTP request.
let mut builder = http::request::Builder::new().uri(url.to_string());
// Augment with headers. These may be required e.g. for authentication to
// private endpoints.
for (header, value) in headers.iter() {
builder = builder.header(header.as_str(), value.as_str());
}
let request = builder
.body(Body::empty())
.map_err(|_| "Couldn't create HTTP request")?;
info!(
message = "Attempting to retrieve configuration.",
url = ?url.as_str()
);
let response = http_client.send(request).await.map_err(|err| {
let message = "HTTP error";
error!(
message = ?message,
error = ?err,
url = ?url.as_str());
message
})?;
info!(message = "Response received.", url = ?url.as_str());
response
.into_body()
.collect()
.await
.map(Collected::to_bytes)
.map_err(|err| {
let message = "Error interpreting response.";
let cause = err.into_cause();
error!(
message = ?message,
error = ?cause);
message
})
}
/// Calls `http_request`, serializing the result to a `ConfigBuilder`.
async fn http_request_to_config_builder(
url: &Url,
tls_options: Option<&TlsConfig>,
headers: &IndexMap<String, String>,
proxy: &ProxyConfig,
config_format: &Format,
) -> BuildResult {
let config_str = http_request(url, tls_options, headers, proxy)
.await
.map_err(|e| vec![e.to_owned()])?;
config::load(config_str.chunk(), *config_format)
}
/// Polls the HTTP endpoint after/every `poll_interval_secs`, returning a stream of `ConfigBuilder`.
fn poll_http(
poll_interval_secs: u64,
url: Url,
tls_options: Option<TlsConfig>,
headers: IndexMap<String, String>,
proxy: ProxyConfig,
config_format: Format,
) -> impl Stream<Item = signal::SignalTo> {
let duration = time::Duration::from_secs(poll_interval_secs);
let mut interval = time::interval_at(time::Instant::now() + duration, duration);
stream! {
loop {
interval.tick().await;
match http_request_to_config_builder(&url, tls_options.as_ref(), &headers, &proxy, &config_format).await {
Ok(config_builder) => yield signal::SignalTo::ReloadFromConfigBuilder(config_builder),
Err(_) => {},
};
info!(
message = "HTTP provider is waiting.",
poll_interval_secs = ?poll_interval_secs,
url = ?url.as_str());
}
}
}
impl ProviderConfig for HttpConfig {
async fn build(&mut self, signal_handler: &mut signal::SignalHandler) -> BuildResult {
let url = self
.url
.take()
.ok_or_else(|| vec!["URL is required for the `http` provider.".to_owned()])?;
let tls_options = self.tls_options.take();
let poll_interval_secs = self.poll_interval_secs;
let request = self.request.clone();
let config_format = self.config_format;
let proxy = ProxyConfig::from_env().merge(&self.proxy);
let config_builder = http_request_to_config_builder(
&url,
tls_options.as_ref(),
&request.headers,
&proxy,
&config_format,
)
.await?;
// Poll for changes to remote configuration.
signal_handler.add(poll_http(
poll_interval_secs,
url,
tls_options,
request.headers.clone(),
proxy.clone(),
config_format,
));
Ok(config_builder)
}
}
impl_generate_config_from_default!(HttpConfig);