Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,7 @@ heim = { git = "https://github.com/vectordotdev/heim.git", branch = "update-nix"
mlua = { version = "0.10.5", default-features = false, features = ["lua54", "send", "vendored", "macros"], optional = true }
sysinfo = "0.37.2"
byteorder = "1.5.0"
brotli = "8.0.2"

[target.'cfg(windows)'.dependencies]
windows-service = "0.8.0"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Added HTTP response body previews to error logs. When an HTTP sink request fails and the log level of the `sink-http-response` target is set to `DEBUG` or `TRACE`, Vector now attempts to decompress the response body (gzip, zstd, br, deflate) and logs its first 1024 characters to help with troubleshooting.

authors: Keuin
101 changes: 93 additions & 8 deletions src/sinks/util/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@
use aws_credential_types::provider::SharedCredentialsProvider;
#[cfg(feature = "aws-core")]
use aws_types::region::Region;
use brotli::Decompressor as BrotliDecoder;
use bytes::{Buf, Bytes};
use flate2::read::GzDecoder;
use futures::{Sink, future::BoxFuture};
use headers::HeaderName;
use http::{HeaderValue, Request, Response, StatusCode, header};
use http_body::Body as _;
use zstd::stream::read::Decoder as ZstdDecoder;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct OrderedHeaderName(HeaderName);
Expand Down Expand Up @@ -38,21 +41,24 @@ impl PartialOrd for OrderedHeaderName {
Some(self.cmp(other))
}
}
use flate2::bufread::ZlibDecoder;
use http::header::{CONTENT_ENCODING, CONTENT_TYPE};
use hyper::Body;
use pin_project::pin_project;
use snafu::{ResultExt, Snafu};
use std::io::Read;
use std::{
collections::BTreeMap,
fmt,
future::Future,
hash::Hash,
io,
marker::PhantomData,
pin::Pin,
sync::Arc,
task::{Context, Poll, ready},
time::Duration,
};

use hyper::Body;
use pin_project::pin_project;
use snafu::{ResultExt, Snafu};
use tower::{Service, ServiceBuilder};
use tower_http::decompression::DecompressionLayer;
use vector_lib::{
Expand Down Expand Up @@ -580,13 +586,92 @@ impl<Req: Clone + Send + Sync + 'static> RetryLogic for HttpRetryLogic<Req> {
StatusCode::NOT_IMPLEMENTED => {
RetryAction::DontRetry("endpoint not implemented".into())
}
_ if status.is_server_error() => RetryAction::Retry(
format!("{}: {}", status, String::from_utf8_lossy(response.body())).into(),
),
_ if status.is_success() => RetryAction::Successful,
_ => RetryAction::DontRetry(format!("response status: {status}").into()),
_ => {
const LOG_SINK_HTTP_RESPONSE: &str = "sink-http-response";
if tracing::enabled!(target: LOG_SINK_HTTP_RESPONSE, tracing::Level::DEBUG) {
// gate the response body decoding logic
let body_preview = get_response_preview(response)
.unwrap_or_else(|err| format!("cannot peek: {err:?}"));
debug!(target: LOG_SINK_HTTP_RESPONSE, message = "sink http response", %status, %body_preview);
}
let msg = format!("response status: {status}").into();
if status.is_server_error() {
RetryAction::Retry(msg)
} else {
RetryAction::DontRetry(msg)
}
}
}
}
}

/// Reasons why a preview of an HTTP response body could not be produced.
#[derive(Debug, Snafu)]
pub enum ResponsePreviewError {
    /// The response `Content-Type` denotes binary content, which is not previewed.
    #[snafu(display("Cannot preview a binary response content: {}", content_type))]
    BinaryContent { content_type: String },
    /// The response `Content-Encoding` is not one the preview logic knows how to decode.
    #[snafu(display("Unknown encoding of content in HTTP response: {}", content_encoding))]
    UnknownEncoding { content_encoding: String },
    /// Reading (and, where applicable, decompressing) the response body failed.
    #[snafu(display("Error reading data: {:?}", err))]
    IOError { err: io::Error },
}

/// Try to decompress and read the first 1024 bytes from the HTTP response body.
fn get_response_preview(response: &Response<Bytes>) -> crate::Result<String> {
const BROTLI_INTERNAL_BUFFER_SIZE: usize = 4096;
const PEEK_UTF8_CHARACTERS: usize = 1024;
const UNKNOWN_CONTENT_ENCODING: &str = "unspecified";

// skip binary data
if let Some(content_type) = response
.headers()
.get(CONTENT_TYPE)
.and_then(|v| v.to_str().ok())
{
let is_binary_resp = content_type.starts_with("image/")
|| content_type.starts_with("video/")
|| content_type.starts_with("audio/")
|| content_type == "application/octet-stream";
if is_binary_resp {
return Err(Box::new(ResponsePreviewError::BinaryContent {
content_type: content_type.to_string(),
}));
}
}

let body_bytes = response.body().as_ref();
let content_encoding = response
.headers()
.get(CONTENT_ENCODING)
.and_then(|v| v.to_str().ok())
.unwrap_or_else(|| UNKNOWN_CONTENT_ENCODING);

// handle different compression methods in HTTP response
let mut reader: Box<dyn Read> = match content_encoding {
"gzip" => Box::new(GzDecoder::new(body_bytes)),
"deflate" => Box::new(ZlibDecoder::new(body_bytes)),
"br" => Box::new(BrotliDecoder::new(body_bytes, BROTLI_INTERNAL_BUFFER_SIZE)),
"zstd" => Box::new(
ZstdDecoder::new(body_bytes).unwrap_or_else(|_| ZstdDecoder::new(body_bytes).unwrap()),
),
Comment on lines +651 to +656
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure these would work given we're reading only partial data

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this works because the truncation is applied on decompressed byte stream, not the original compressed data (body_bytes). The decompressors can always read as many bytes as they need.

// unspecified or identity encoding, treat as utf-8 directly
UNKNOWN_CONTENT_ENCODING | "identity" => Box::new(body_bytes),
encoding => {
return Err(Box::new(ResponsePreviewError::UnknownEncoding {
content_encoding: encoding.to_string(),
}));
}
};

// one utf-8 char takes up to 4 bytes, read at most that number of bytes for utf-8 decoding
let mut buf = [0u8; PEEK_UTF8_CHARACTERS * 4];
match reader.read(&mut buf) {
Ok(cnt) => Ok(String::from_utf8_lossy(&buf[..cnt])
.chars()
.take(PEEK_UTF8_CHARACTERS)
.collect()),
Err(why) => Err(Box::new(ResponsePreviewError::IOError { err: why })),
}
}

/// A more generic version of `HttpRetryLogic` that accepts anything that can be converted
Expand Down
4 changes: 4 additions & 0 deletions website/content/en/guides/developer/debugging.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ You can set different verbosity levels for specific components:
VECTOR_LOG=info,vector::sources::aws_s3=warn vector --config path/to/config.yaml
```

Some standalone log targets can also help with debugging:

* `sink-http-response`: logs the HTTP response bodies received by sinks at the `DEBUG` level.

You can find more information on the syntax [here](https://docs.rs/tracing-subscriber/latest/tracing_subscriber/filter/struct.EnvFilter.html#usage-notes).

### Vector Tools
Expand Down
Loading