diff --git a/Cargo.lock b/Cargo.lock index bb5478d575f5c..ac8cfb7d0fb8e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2151,9 +2151,9 @@ dependencies = [ [[package]] name = "brotli" -version = "8.0.0" +version = "8.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf19e729cdbd51af9a397fb9ef8ac8378007b797f8273cfbfdf45dcaa316167b" +checksum = "4bd8b9603c7aa97359dbd97ecf258968c95f3adddd6db2f7e7a5bef101c84560" dependencies = [ "alloc-no-stdlib", "alloc-stdlib", @@ -12558,6 +12558,7 @@ dependencies = [ "base64 0.22.1", "bloomy", "bollard", + "brotli", "byteorder", "bytes 1.10.1", "bytesize", diff --git a/Cargo.toml b/Cargo.toml index 26e0f5b355f4c..8286a92a416c8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -438,6 +438,7 @@ heim = { git = "https://github.com/vectordotdev/heim.git", branch = "update-nix" mlua = { version = "0.10.5", default-features = false, features = ["lua54", "send", "vendored", "macros"], optional = true } sysinfo = "0.37.2" byteorder = "1.5.0" +brotli = "8.0.2" [target.'cfg(windows)'.dependencies] windows-service = "0.8.0" diff --git a/changelog.d/display_sink_response_error_message.enhancement.md b/changelog.d/display_sink_response_error_message.enhancement.md new file mode 100644 index 0000000000000..10e7396f48b79 --- /dev/null +++ b/changelog.d/display_sink_response_error_message.enhancement.md @@ -0,0 +1,3 @@ +Added HTTP response body previews to error logs. When an HTTP sink request fails and log level of target `sink-http-response` is set to `DEBUG`/`TRACE`, Vector will now attempt to decompress (gzip, zstd, br, deflate) and log the first 1024 characters of the response body to help troubleshooting. + +authors: Keuin diff --git a/src/sinks/util/http.rs b/src/sinks/util/http.rs index e21ef48e4ed66..e5f7259faf0fb 100644 --- a/src/sinks/util/http.rs +++ b/src/sinks/util/http.rs @@ -2,11 +2,14 @@ use aws_credential_types::provider::SharedCredentialsProvider; #[cfg(feature = "aws-core")] use aws_types::region::Region; +use brotli::Decompressor as BrotliDecoder; use bytes::{Buf, Bytes}; +use flate2::read::GzDecoder; use futures::{Sink, future::BoxFuture}; use headers::HeaderName; use http::{HeaderValue, Request, Response, StatusCode, header}; use http_body::Body as _; +use zstd::stream::read::Decoder as ZstdDecoder; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct OrderedHeaderName(HeaderName); @@ -38,21 +41,24 @@ impl PartialOrd for OrderedHeaderName { Some(self.cmp(other)) } } +use flate2::bufread::ZlibDecoder; +use http::header::{CONTENT_ENCODING, CONTENT_TYPE}; +use hyper::Body; +use pin_project::pin_project; +use snafu::{ResultExt, Snafu}; +use std::io::Read; use std::{ collections::BTreeMap, fmt, future::Future, hash::Hash, + io, marker::PhantomData, pin::Pin, sync::Arc, task::{Context, Poll, ready}, time::Duration, }; - -use hyper::Body; -use pin_project::pin_project; -use snafu::{ResultExt, Snafu}; use tower::{Service, ServiceBuilder}; use tower_http::decompression::DecompressionLayer; use vector_lib::{ @@ -580,13 +586,92 @@ impl RetryLogic for HttpRetryLogic { StatusCode::NOT_IMPLEMENTED => { RetryAction::DontRetry("endpoint not implemented".into()) } - _ if status.is_server_error() => RetryAction::Retry( - format!("{}: {}", status, String::from_utf8_lossy(response.body())).into(), - ), _ if status.is_success() => RetryAction::Successful, - _ => RetryAction::DontRetry(format!("response status: {status}").into()), + _ => { + const LOG_SINK_HTTP_RESPONSE: &str = "sink-http-response"; + if tracing::enabled!(target: LOG_SINK_HTTP_RESPONSE, tracing::Level::DEBUG) { + // gate the response body decoding logic + let body_preview = get_response_preview(response) + .unwrap_or_else(|err| format!("cannot peek: {err:?}")); + debug!(target: LOG_SINK_HTTP_RESPONSE, message = "sink http response", %status, %body_preview); + } + let msg = format!("response status: {status}").into(); + if status.is_server_error() { + RetryAction::Retry(msg) + } else { + RetryAction::DontRetry(msg) + } + } + } + } +} + +#[derive(Debug, Snafu)] +pub enum ResponsePreviewError { + #[snafu(display("Cannot preview a binary response content: {}", content_type))] + BinaryContent { content_type: String }, + #[snafu(display("Unknown encoding of content in HTTP response: {}", content_encoding))] + UnknownEncoding { content_encoding: String }, + #[snafu(display("Error reading data: {:?}", err))] + IOError { err: io::Error }, +} + +/// Try to decompress and read the first 1024 bytes from the HTTP response body. +fn get_response_preview(response: &Response) -> crate::Result { + const BROTLI_INTERNAL_BUFFER_SIZE: usize = 4096; + const PEEK_UTF8_CHARACTERS: usize = 1024; + const UNKNOWN_CONTENT_ENCODING: &str = "unspecified"; + + // skip binary data + if let Some(content_type) = response + .headers() + .get(CONTENT_TYPE) + .and_then(|v| v.to_str().ok()) + { + let is_binary_resp = content_type.starts_with("image/") + || content_type.starts_with("video/") + || content_type.starts_with("audio/") + || content_type == "application/octet-stream"; + if is_binary_resp { + return Err(Box::new(ResponsePreviewError::BinaryContent { + content_type: content_type.to_string(), + })); } } + + let body_bytes = response.body().as_ref(); + let content_encoding = response + .headers() + .get(CONTENT_ENCODING) + .and_then(|v| v.to_str().ok()) + .unwrap_or_else(|| UNKNOWN_CONTENT_ENCODING); + + // handle different compression methods in HTTP response + let mut reader: Box = match content_encoding { + "gzip" => Box::new(GzDecoder::new(body_bytes)), + "deflate" => Box::new(ZlibDecoder::new(body_bytes)), + "br" => Box::new(BrotliDecoder::new(body_bytes, BROTLI_INTERNAL_BUFFER_SIZE)), + "zstd" => Box::new( + ZstdDecoder::new(body_bytes).unwrap_or_else(|_| ZstdDecoder::new(body_bytes).unwrap()), + ), + // unspecified or identity encoding, treat as utf-8 directly + UNKNOWN_CONTENT_ENCODING | "identity" => Box::new(body_bytes), + encoding => { + return Err(Box::new(ResponsePreviewError::UnknownEncoding { + content_encoding: encoding.to_string(), + })); + } + }; + + // one utf-8 char takes up to 4 bytes, read at most that number of bytes for utf-8 decoding + let mut buf = [0u8; PEEK_UTF8_CHARACTERS * 4]; + match reader.read(&mut buf) { + Ok(cnt) => Ok(String::from_utf8_lossy(&buf[..cnt]) + .chars() + .take(PEEK_UTF8_CHARACTERS) + .collect()), + Err(why) => Err(Box::new(ResponsePreviewError::IOError { err: why })), + } } /// A more generic version of `HttpRetryLogic` that accepts anything that can be converted diff --git a/website/content/en/guides/developer/debugging.md b/website/content/en/guides/developer/debugging.md index 4e84514264e02..b0af5dcf59103 100644 --- a/website/content/en/guides/developer/debugging.md +++ b/website/content/en/guides/developer/debugging.md @@ -35,6 +35,10 @@ You can set different verbosity levels for specific components: VECTOR_LOG=info,vector::sources::aws_s3=warn vector --config path/to/config.yaml ``` +There are some standalone log targets which may help you to debug: + +* `sink-http-response`: log sinks HTTP response body in `DEBUG` level. + You can find more information on the syntax [here](https://docs.rs/tracing-subscriber/latest/tracing_subscriber/filter/struct.EnvFilter.html#usage-notes). ### Vector Tools