|
2 | 2 | use aws_credential_types::provider::SharedCredentialsProvider; |
3 | 3 | #[cfg(feature = "aws-core")] |
4 | 4 | use aws_types::region::Region; |
| 5 | +use brotli::Decompressor as BrotliDecoder; |
5 | 6 | use bytes::{Buf, Bytes}; |
| 7 | +use flate2::read::GzDecoder; |
6 | 8 | use futures::{Sink, future::BoxFuture}; |
7 | 9 | use headers::HeaderName; |
8 | 10 | use http::{HeaderValue, Request, Response, StatusCode, header}; |
9 | 11 | use http_body::Body as _; |
| 12 | +use zstd::stream::read::Decoder as ZstdDecoder; |
10 | 13 |
|
11 | 14 | #[derive(Debug, Clone, PartialEq, Eq, Hash)] |
12 | 15 | pub struct OrderedHeaderName(HeaderName); |
@@ -38,21 +41,24 @@ impl PartialOrd for OrderedHeaderName { |
38 | 41 | Some(self.cmp(other)) |
39 | 42 | } |
40 | 43 | } |
| 44 | +use flate2::bufread::ZlibDecoder; |
| 45 | +use http::header::{CONTENT_ENCODING, CONTENT_TYPE}; |
| 46 | +use hyper::Body; |
| 47 | +use pin_project::pin_project; |
| 48 | +use snafu::{ResultExt, Snafu}; |
| 49 | +use std::io::Read; |
41 | 50 | use std::{ |
42 | 51 | collections::BTreeMap, |
43 | 52 | fmt, |
44 | 53 | future::Future, |
45 | 54 | hash::Hash, |
| 55 | + io, |
46 | 56 | marker::PhantomData, |
47 | 57 | pin::Pin, |
48 | 58 | sync::Arc, |
49 | 59 | task::{Context, Poll, ready}, |
50 | 60 | time::Duration, |
51 | 61 | }; |
52 | | - |
53 | | -use hyper::Body; |
54 | | -use pin_project::pin_project; |
55 | | -use snafu::{ResultExt, Snafu}; |
56 | 62 | use tower::{Service, ServiceBuilder}; |
57 | 63 | use tower_http::decompression::DecompressionLayer; |
58 | 64 | use vector_lib::{ |
@@ -580,15 +586,89 @@ impl<Req: Clone + Send + Sync + 'static> RetryLogic for HttpRetryLogic<Req> { |
580 | 586 | StatusCode::NOT_IMPLEMENTED => { |
581 | 587 | RetryAction::DontRetry("endpoint not implemented".into()) |
582 | 588 | } |
583 | | - _ if status.is_server_error() => RetryAction::Retry( |
584 | | - format!("{}: {}", status, String::from_utf8_lossy(response.body())).into(), |
585 | | - ), |
586 | 589 | _ if status.is_success() => RetryAction::Successful, |
587 | | - _ => RetryAction::DontRetry(format!("response status: {status}").into()), |
| 590 | + _ => { |
| 591 | + let body_preview = get_response_preview(response) |
| 592 | + .unwrap_or_else(|err| format!("cannot peek: {err:?}")); |
| 593 | + if status.is_server_error() { |
| 594 | + RetryAction::Retry(format!("{status}: {body_preview}").into()) |
| 595 | + } else { |
| 596 | + RetryAction::DontRetry( |
| 597 | + format!("response status: {status}, body: {body_preview}").into(), |
| 598 | + ) |
| 599 | + } |
| 600 | + } |
588 | 601 | } |
589 | 602 | } |
590 | 603 | } |
591 | 604 |
|
| 605 | +#[derive(Debug, Snafu)] |
| 606 | +pub enum ResponsePreviewError { |
| 607 | + #[snafu(display("Cannot preview a binary response content: {}", content_type))] |
| 608 | + BinaryContent { content_type: String }, |
| 609 | + #[snafu(display("Unknown encoding of content in HTTP response: {}", content_encoding))] |
| 610 | + UnknownEncoding { content_encoding: String }, |
| 611 | + #[snafu(display("Error reading data: {:?}", err))] |
| 612 | + IOError { err: io::Error }, |
| 613 | +} |
| 614 | + |
| 615 | +/// Try to decompress and read the first 1024 bytes from the HTTP response body. |
| 616 | +fn get_response_preview(response: &Response<Bytes>) -> crate::Result<String> { |
| 617 | + const BROTLI_INTERNAL_BUFFER_SIZE: usize = 4096; |
| 618 | + const PEEK_UTF8_CHARACTERS: usize = 1024; |
| 619 | + const UNKNOWN_CONTENT_TYPE: &str = "unspecified"; |
| 620 | + |
| 621 | + // skip binary data |
| 622 | + let content_type = response |
| 623 | + .headers() |
| 624 | + .get(CONTENT_TYPE) |
| 625 | + .and_then(|v| v.to_str().ok()) |
| 626 | + .unwrap_or(UNKNOWN_CONTENT_TYPE); |
| 627 | + if content_type.starts_with("image/") |
| 628 | + || content_type.starts_with("video/") |
| 629 | + || content_type.starts_with("audio/") |
| 630 | + || content_type == "application/octet-stream" |
| 631 | + { |
| 632 | + return Err(Box::new(ResponsePreviewError::BinaryContent { |
| 633 | + content_type: content_type.to_string(), |
| 634 | + })); |
| 635 | + } |
| 636 | + |
| 637 | + let body_bytes = response.body().as_ref(); |
| 638 | + let content_encoding = response |
| 639 | + .headers() |
| 640 | + .get(CONTENT_ENCODING) |
| 641 | + .and_then(|v| v.to_str().ok()) |
| 642 | + .unwrap_or_else(|| "unknown"); |
| 643 | + |
| 644 | + // handle different compression methods in HTTP response |
| 645 | + let mut reader: Box<dyn Read> = match content_encoding { |
| 646 | + "gzip" => Box::new(GzDecoder::new(body_bytes)), |
| 647 | + "deflate" => Box::new(ZlibDecoder::new(body_bytes)), |
| 648 | + "br" => Box::new(BrotliDecoder::new(body_bytes, BROTLI_INTERNAL_BUFFER_SIZE)), |
| 649 | + "zstd" => Box::new( |
| 650 | + ZstdDecoder::new(body_bytes).unwrap_or_else(|_| ZstdDecoder::new(body_bytes).unwrap()), |
| 651 | + ), |
| 652 | + // unspecified or identity encoding, treat as utf-8 directly |
| 653 | + UNKNOWN_CONTENT_TYPE | "identity" => Box::new(body_bytes), |
| 654 | + encoding => { |
| 655 | + return Err(Box::new(ResponsePreviewError::UnknownEncoding { |
| 656 | + content_encoding: encoding.to_string(), |
| 657 | + })); |
| 658 | + } |
| 659 | + }; |
| 660 | + |
| 661 | + // one utf-8 char takes up to 4 bytes, read at most that number of bytes for utf-8 decoding |
| 662 | + let mut buf = [0u8; PEEK_UTF8_CHARACTERS * 4]; |
| 663 | + match reader.read(&mut buf) { |
| 664 | + Ok(cnt) => Ok(String::from_utf8_lossy(&buf[..cnt]) |
| 665 | + .chars() |
| 666 | + .take(PEEK_UTF8_CHARACTERS) |
| 667 | + .collect()), |
| 668 | + Err(why) => Err(Box::new(ResponsePreviewError::IOError { err: why })), |
| 669 | + } |
| 670 | +} |
| 671 | + |
592 | 672 | /// A more generic version of `HttpRetryLogic` that accepts anything that can be converted |
593 | 673 | /// to a status code |
594 | 674 | #[derive(Debug)] |
|
0 commit comments