diff --git a/Cargo.lock b/Cargo.lock index ff9e8907b9c2f..d35cde9c8e42d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1041,7 +1041,7 @@ dependencies = [ "bytes 1.10.1", "fastrand 2.3.0", "http 0.2.9", - "http-body 0.4.5", + "http-body 0.4.6", "percent-encoding", "pin-project-lite", "tracing 0.1.41", @@ -1069,7 +1069,7 @@ dependencies = [ "fastrand 2.3.0", "flate2", "http 0.2.9", - "http-body 0.4.5", + "http-body 0.4.6", "once_cell", "regex-lite", "tracing 0.1.41", @@ -1216,7 +1216,7 @@ dependencies = [ "hmac", "http 0.2.9", "http 1.3.1", - "http-body 0.4.5", + "http-body 0.4.6", "lru 0.12.5", "once_cell", "percent-encoding", @@ -1411,7 +1411,7 @@ dependencies = [ "crc64fast-nvme", "hex", "http 0.2.9", - "http-body 0.4.5", + "http-body 0.4.6", "md-5", "pin-project-lite", "sha1", @@ -1431,7 +1431,7 @@ dependencies = [ "flate2", "futures-util", "http 0.2.9", - "http-body 0.4.5", + "http-body 0.4.6", "pin-project-lite", "tracing 0.1.41", ] @@ -1461,7 +1461,7 @@ dependencies = [ "futures-core", "http 0.2.9", "http 1.3.1", - "http-body 0.4.5", + "http-body 0.4.6", "percent-encoding", "pin-project-lite", "pin-utils", @@ -1480,8 +1480,8 @@ dependencies = [ "h2 0.3.26", "h2 0.4.12", "http 0.2.9", - "http-body 0.4.5", - "hyper 0.14.28", + "http-body 0.4.6", + "hyper 0.14.32", "hyper-rustls 0.24.2", "pin-project-lite", "rustls 0.21.12", @@ -1533,7 +1533,7 @@ dependencies = [ "fastrand 2.3.0", "http 0.2.9", "http 1.3.1", - "http-body 0.4.5", + "http-body 0.4.6", "http-body 1.0.0", "pin-project-lite", "pin-utils", @@ -1570,7 +1570,7 @@ dependencies = [ "futures-core", "http 0.2.9", "http 1.3.1", - "http-body 0.4.5", + "http-body 0.4.6", "http-body 1.0.0", "http-body-util", "itoa", @@ -1619,8 +1619,8 @@ dependencies = [ "bytes 1.10.1", "futures-util", "http 0.2.9", - "http-body 0.4.5", - "hyper 0.14.28", + "http-body 0.4.6", + "hyper 0.14.32", "itoa", "matchit", "memchr", @@ -1673,7 +1673,7 @@ dependencies = [ "bytes 1.10.1", "futures-util", "http 0.2.9", - "http-body 0.4.5", + "http-body 0.4.6", "mime", "rustversion", "tower-layer", @@ -3497,7 +3497,7 @@ dependencies = [ "libc", "option-ext", "redox_users 0.5.0", - "windows-sys 0.60.2", + "windows-sys 0.61.0", ] [[package]] @@ -5108,9 +5108,9 @@ dependencies = [ [[package]] name = "http-body" -version = "0.4.5" +version = "0.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1" +checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" dependencies = [ "bytes 1.10.1", "http 0.2.9", @@ -5196,9 +5196,9 @@ checksum = "135b12329e5e3ce057a9f972339ea52bc954fe1e9358ef27f95e89716fbc5424" [[package]] name = "hyper" -version = "0.14.28" +version = "0.14.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bf96e135eb83a2a8ddf766e426a841d8ddd7449d5f00d34ea02b41d2f19eef80" +checksum = "41dfc780fdec9373c01bae43289ea34c972e40ee3c9f6b3c8801a35f35586ce7" dependencies = [ "bytes 1.10.1", "futures-channel", @@ -5206,7 +5206,7 @@ dependencies = [ "futures-util", "h2 0.3.26", "http 0.2.9", - "http-body 0.4.5", + "http-body 0.4.6", "httparse", "httpdate", "itoa", @@ -5263,7 +5263,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d6ee5d7a8f718585d1c3c61dfde28ef5b0bb14734b4db13f5ada856cdc6c612b" dependencies = [ "http 0.2.9", - "hyper 0.14.28", + "hyper 0.14.32", "linked_hash_set", "once_cell", "openssl", @@ -5303,7 +5303,7 @@ dependencies = [ "futures 0.3.31", "headers", "http 0.2.9", - "hyper 0.14.28", + "hyper 0.14.32", "openssl", "tokio", "tokio-openssl", @@ -5318,7 +5318,7 @@ checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590" dependencies = [ "futures-util", "http 0.2.9", - "hyper 0.14.28", + "hyper 0.14.32", "log", "rustls 0.21.12", "rustls-native-certs 0.6.3", @@ -5351,7 +5351,7 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" dependencies = [ - "hyper 0.14.28", + "hyper 0.14.32", "pin-project-lite", "tokio", "tokio-io-timeout", @@ -5377,7 +5377,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" dependencies = [ "bytes 1.10.1", - "hyper 0.14.28", + "hyper 0.14.32", "native-tls", "tokio", "tokio-native-tls", @@ -9279,8 +9279,8 @@ dependencies = [ "futures-util", "h2 0.3.26", "http 0.2.9", - "http-body 0.4.5", - "hyper 0.14.28", + "http-body 0.4.6", + "hyper 0.14.32", "hyper-rustls 0.24.2", "hyper-tls 0.5.0", "ipnet", @@ -11110,7 +11110,7 @@ dependencies = [ "getrandom 0.3.1", "once_cell", "rustix 1.0.1", - "windows-sys 0.60.2", + "windows-sys 0.61.0", ] [[package]] @@ -11650,8 +11650,8 @@ dependencies = [ "flate2", "h2 0.3.26", "http 0.2.9", - "http-body 0.4.5", - "hyper 0.14.28", + "http-body 0.4.6", + "hyper 0.14.32", "hyper-timeout 0.4.1", "percent-encoding", "pin-project", @@ -11776,7 +11776,7 @@ dependencies = [ "futures-core", "futures-util", "http 0.2.9", - "http-body 0.4.5", + "http-body 0.4.6", "http-range-header", "pin-project-lite", "tokio", @@ -12532,10 +12532,10 @@ dependencies = [ "hostname 0.4.0", "http 0.2.9", "http 1.3.1", - "http-body 0.4.5", + "http-body 0.4.6", "http-serde", "humantime", - "hyper 0.14.28", + "hyper 0.14.32", "hyper-openssl 0.9.2", "hyper-proxy", "indexmap 2.12.0", @@ -13213,7 +13213,7 @@ dependencies = [ "futures-util", "headers", "http 0.2.9", - "hyper 0.14.28", + "hyper 0.14.32", "log", "mime", "mime_guess", diff --git a/Cargo.toml b/Cargo.toml index ab9e6f0d3b2d2..dcf035c37f089 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -372,9 +372,9 @@ hostname = { version = "0.4.0", default-features = false } http = { version = "0.2.9", default-features = false } http-1 = { package = "http", version = "1.0", default-features = false, features = ["std"] } http-serde = "1.1.3" -http-body = { version = "0.4.5", default-features = false } +http-body = { version = "0.4.6", default-features = false } humantime.workspace = true -hyper = { version = "0.14.28", default-features = false, features = ["client", "runtime", "http1", "http2", "server", "stream"] } +hyper = { version = "0.14.32", default-features = false, features = ["client", "runtime", "http1", "http2", "server", "stream", "backports", "deprecated"] } hyper-openssl = { version = "0.9.2", default-features = false } hyper-proxy = { version = "0.9.1", default-features = false, features = ["openssl-tls"] } indexmap.workspace = true diff --git a/LICENSE-3rdparty.csv b/LICENSE-3rdparty.csv index 98e8d456e0b11..a946220a4da1b 100644 --- a/LICENSE-3rdparty.csv +++ b/LICENSE-3rdparty.csv @@ -886,12 +886,14 @@ windows-future,https://github.com/microsoft/windows-rs,MIT OR Apache-2.0,The win windows-implement,https://github.com/microsoft/windows-rs,MIT OR Apache-2.0,Microsoft windows-interface,https://github.com/microsoft/windows-rs,MIT OR Apache-2.0,Microsoft windows-link,https://github.com/microsoft/windows-rs,MIT OR Apache-2.0,Microsoft +windows-link,https://github.com/microsoft/windows-rs,MIT OR Apache-2.0,The windows-link Authors windows-numerics,https://github.com/microsoft/windows-rs,MIT OR Apache-2.0,The windows-numerics Authors windows-registry,https://github.com/microsoft/windows-rs,MIT OR Apache-2.0,Microsoft windows-result,https://github.com/microsoft/windows-rs,MIT OR Apache-2.0,Microsoft windows-service,https://github.com/mullvad/windows-service-rs,MIT OR Apache-2.0,Mullvad VPN windows-strings,https://github.com/microsoft/windows-rs,MIT OR Apache-2.0,Microsoft windows-sys,https://github.com/microsoft/windows-rs,MIT OR Apache-2.0,Microsoft +windows-sys,https://github.com/microsoft/windows-rs,MIT OR Apache-2.0,The windows-sys Authors windows-targets,https://github.com/microsoft/windows-rs,MIT OR Apache-2.0,Microsoft windows_aarch64_gnullvm,https://github.com/microsoft/windows-rs,MIT OR Apache-2.0,Microsoft windows_aarch64_msvc,https://github.com/microsoft/windows-rs,MIT OR Apache-2.0,Microsoft diff --git a/src/components/validation/resources/http.rs b/src/components/validation/resources/http.rs index a559760b9f91b..7a12f91a1ae35 100644 --- a/src/components/validation/resources/http.rs +++ b/src/components/validation/resources/http.rs @@ -13,6 +13,7 @@ use axum::{ }; use bytes::{BufMut as _, BytesMut}; use http::{Method, Request, StatusCode, Uri}; +use http_body::{Body as _, Collected}; use hyper::{Body, Client, Server}; use tokio::{ select, @@ -328,7 +329,7 @@ impl HttpResourceOutputContext<'_> { let mut decoder = decoder.clone(); async move { - match hyper::body::to_bytes(request.into_body()).await { + match request.into_body().collect().await.map(Collected::to_bytes) { Err(_) => StatusCode::INTERNAL_SERVER_ERROR.into_response(), Ok(body) => { let byte_size = body.len(); diff --git a/src/gcp.rs b/src/gcp.rs index 770283e17033b..9a0ff365a69ee 100644 --- a/src/gcp.rs +++ b/src/gcp.rs @@ -12,6 +12,7 @@ use goauth::{ credentials::Credentials, }; use http::{Uri, uri::PathAndQuery}; +use http_body::{Body as _, Collected}; use hyper::header::AUTHORIZATION; use smpl_jwt::Jwt; use snafu::{ResultExt, Snafu}; @@ -296,8 +297,10 @@ async fn get_token_implicit() -> Result { .context(GetImplicitTokenSnafu)?; let body = res.into_body(); - let bytes = hyper::body::to_bytes(body) + let bytes = body + .collect() .await + .map(Collected::to_bytes) .context(GetTokenBytesSnafu)?; // Token::from_str is irresponsible and may panic! diff --git a/src/providers/http.rs b/src/providers/http.rs index e916829ab1cef..c7dfa043eef06 100644 --- a/src/providers/http.rs +++ b/src/providers/http.rs @@ -1,6 +1,7 @@ use async_stream::stream; use bytes::Buf; use futures::Stream; +use http_body::{Body as _, Collected}; use hyper::Body; use indexmap::IndexMap; use tokio::time; @@ -111,8 +112,11 @@ async fn http_request( info!(message = "Response received.", url = ?url.as_str()); - hyper::body::to_bytes(response.into_body()) + response + .into_body() + .collect() .await + .map(Collected::to_bytes) .map_err(|err| { let message = "Error interpreting response."; let cause = err.into_cause(); diff --git a/src/sinks/azure_monitor_logs/tests.rs b/src/sinks/azure_monitor_logs/tests.rs index 31bfede218356..06f099d8db3df 100644 --- a/src/sinks/azure_monitor_logs/tests.rs +++ b/src/sinks/azure_monitor_logs/tests.rs @@ -2,7 +2,6 @@ use std::time::Duration; use futures::{future::ready, stream}; use http::Response; -use hyper::body; use openssl::{base64, hash, pkey, sign}; use tokio::time::timeout; use vector_lib::config::log_schema; @@ -205,7 +204,7 @@ async fn correct_request() { let (parts, body) = request.into_parts(); assert_eq!(&parts.method.to_string(), "POST"); - let body = body::to_bytes(body).await.unwrap(); + let body = http_body::Body::collect(body).await.unwrap().to_bytes(); let json: serde_json::Value = serde_json::from_slice(&body[..]).unwrap(); let expected_json = serde_json::json!([ { diff --git a/src/sinks/datadog/traces/apm_stats/integration_tests.rs b/src/sinks/datadog/traces/apm_stats/integration_tests.rs index bcf1c6937826d..78800c1c048e3 100644 --- a/src/sinks/datadog/traces/apm_stats/integration_tests.rs +++ b/src/sinks/datadog/traces/apm_stats/integration_tests.rs @@ -1,5 +1,7 @@ use std::{collections::HashMap, io::Read, net::SocketAddr, sync::Arc}; +use http_body::Body as _; + use axum::{ Router, body::Body, @@ -122,9 +124,11 @@ async fn process_stats(Extension(state): Extension>, mut request: debug!("`{}` server got stats payload.", state.name); let body = request.body_mut(); - let compressed_body_bytes = hyper::body::to_bytes(body) + let compressed_body_bytes = body + .collect() .await - .expect("could not decode body into bytes"); + .expect("could not decode body into bytes") + .to_bytes(); let mut gz = GzDecoder::new(compressed_body_bytes.as_ref()); let mut decompressed_body_bytes = vec![]; diff --git a/src/sinks/datadog/traces/service.rs b/src/sinks/datadog/traces/service.rs index 1fd61865b747b..9d94ed18a8cfd 100644 --- a/src/sinks/datadog/traces/service.rs +++ b/src/sinks/datadog/traces/service.rs @@ -6,6 +6,7 @@ use std::{ use bytes::{Buf, Bytes}; use futures::future::BoxFuture; use http::{Request, StatusCode, Uri}; +use http_body::{Body as _, Collected}; use hyper::Body; use snafu::ResultExt; use tower::Service; @@ -156,8 +157,10 @@ impl Service for TraceApiService { let response = client.send(http_request).await?; let (parts, body) = response.into_parts(); - let mut body = hyper::body::aggregate(body) + let mut body = body + .collect() .await + .map(Collected::aggregate) .context(CallRequestSnafu)?; let body = body.copy_to_bytes(body.remaining()); diff --git a/src/sinks/elasticsearch/common.rs b/src/sinks/elasticsearch/common.rs index 5657f04394c90..c279a202cf841 100644 --- a/src/sinks/elasticsearch/common.rs +++ b/src/sinks/elasticsearch/common.rs @@ -1,6 +1,7 @@ use bytes::{Buf, Bytes}; use http::{Response, StatusCode, Uri}; -use hyper::{Body, body}; +use http_body::Body as _; +use hyper::Body; use serde::Deserialize; use snafu::ResultExt; use vector_lib::config::{LogNamespace, proxy::ProxyConfig}; @@ -401,7 +402,7 @@ async fn get_version( .map_err(|error| format!("Failed to get Elasticsearch API version: {error}"))?; let (_, body) = response.into_parts(); - let mut body = body::aggregate(body).await?; + let mut body = body.collect().await?.aggregate(); let body = body.copy_to_bytes(body.remaining()); let ResponsePayload { version } = serde_json::from_slice(&body)?; if let Some(version) = version diff --git a/src/sinks/honeycomb/config.rs b/src/sinks/honeycomb/config.rs index e98b2f8cb3e49..05a56cd9df368 100644 --- a/src/sinks/honeycomb/config.rs +++ b/src/sinks/honeycomb/config.rs @@ -167,7 +167,7 @@ async fn healthcheck(uri: Uri, api_key: SensitiveString, client: HttpClient) -> let res = client.send(req).await?; let status = res.status(); - let body = hyper::body::to_bytes(res.into_body()).await?; + let body = http_body::Body::collect(res.into_body()).await?.to_bytes(); if status == StatusCode::BAD_REQUEST { Ok(()) diff --git a/src/sinks/keep/config.rs b/src/sinks/keep/config.rs index d07d27f0b4d60..7b81281253672 100644 --- a/src/sinks/keep/config.rs +++ b/src/sinks/keep/config.rs @@ -143,7 +143,7 @@ async fn healthcheck(uri: Uri, api_key: SensitiveString, client: HttpClient) -> let res = client.send(req).await?; let status = res.status(); - let body = hyper::body::to_bytes(res.into_body()).await?; + let body = http_body::Body::collect(res.into_body()).await?.to_bytes(); match status { StatusCode::OK => Ok(()), // Healthcheck passed diff --git a/src/sinks/prometheus/exporter.rs b/src/sinks/prometheus/exporter.rs index 63d2ed3a1a463..6072bc8274ea0 100644 --- a/src/sinks/prometheus/exporter.rs +++ b/src/sinks/prometheus/exporter.rs @@ -962,9 +962,10 @@ mod tests { } let body = result.into_body(); - let bytes = hyper::body::to_bytes(body) + let bytes = http_body::Body::collect(body) .await - .expect("Reading body failed"); + .expect("Reading body failed") + .to_bytes(); sink_handle.await.unwrap(); @@ -1041,9 +1042,10 @@ mod tests { } let body = result.into_body(); - let bytes = hyper::body::to_bytes(body) + let bytes = http_body::Body::collect(body) .await - .expect("Reading body failed"); + .expect("Reading body failed") + .to_bytes(); let result = String::from_utf8(bytes.to_vec()).unwrap(); sink_handle.await.unwrap(); @@ -1495,9 +1497,10 @@ mod integration_tests { .send(request) .await .expect("Could not send request"); - let result = hyper::body::to_bytes(result.into_body()) + let result = http_body::Body::collect(result.into_body()) .await - .expect("Error fetching body"); + .expect("Error fetching body") + .to_bytes(); String::from_utf8_lossy(&result).to_string() } @@ -1516,9 +1519,10 @@ mod integration_tests { .send(request) .await .expect("Could not fetch query"); - let result = hyper::body::to_bytes(result.into_body()) + let result = http_body::Body::collect(result.into_body()) .await - .expect("Error fetching body"); + .expect("Error fetching body") + .to_bytes(); let result = String::from_utf8_lossy(&result); serde_json::from_str(result.as_ref()).expect("Invalid JSON from prometheus") } diff --git a/src/sinks/prometheus/remote_write/service.rs b/src/sinks/prometheus/remote_write/service.rs index 8d9ee0c39fc66..6a0a75106f7b7 100644 --- a/src/sinks/prometheus/remote_write/service.rs +++ b/src/sinks/prometheus/remote_write/service.rs @@ -69,7 +69,7 @@ impl Service for RemoteWriteService { let response = client.send(http_request).await?; let (parts, body) = response.into_parts(); - let body = hyper::body::to_bytes(body).await?; + let body = http_body::Body::collect(body).await?.to_bytes(); let http_response = hyper::Response::from_parts(parts, body); if http_response.status().is_success() { diff --git a/src/sinks/splunk_hec/common/acknowledgements.rs b/src/sinks/splunk_hec/common/acknowledgements.rs index e0df41e9660e9..a61fd38c0e10e 100644 --- a/src/sinks/splunk_hec/common/acknowledgements.rs +++ b/src/sinks/splunk_hec/common/acknowledgements.rs @@ -1,3 +1,4 @@ +use http_body::{Body as _, Collected}; use hyper::Body; use serde::{Deserialize, Serialize}; use std::{ @@ -243,8 +244,11 @@ impl HecAckClient { let status = response.status(); if status.is_success() { - let response_body = hyper::body::to_bytes(response.into_body()) + let response_body = response + .into_body() + .collect() .await + .map(Collected::to_bytes) .map_err(|_| HecAckApiError::ClientParseResponse)?; serde_json::from_slice::(&response_body) .map_err(|_| HecAckApiError::ClientParseResponse) diff --git a/src/sinks/util/http.rs b/src/sinks/util/http.rs index 12a0d30caec6a..e21ef48e4ed66 100644 --- a/src/sinks/util/http.rs +++ b/src/sinks/util/http.rs @@ -6,6 +6,7 @@ use bytes::{Buf, Bytes}; use futures::{Sink, future::BoxFuture}; use headers::HeaderName; use http::{HeaderValue, Request, Response, StatusCode, header}; +use http_body::Body as _; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct OrderedHeaderName(HeaderName); @@ -49,7 +50,7 @@ use std::{ time::Duration, }; -use hyper::{Body, body}; +use hyper::Body; use pin_project::pin_project; use snafu::{ResultExt, Snafu}; use tower::{Service, ServiceBuilder}; @@ -519,7 +520,7 @@ where } let (parts, body) = response.into_parts(); - let mut body = body::aggregate(body).await?; + let mut body = body.collect().await?.aggregate(); Ok(hyper::Response::from_parts( parts, body.copy_to_bytes(body.remaining()), @@ -997,13 +998,14 @@ mod test { let new_service = make_service_fn(move |_| { let tx = tx.clone(); - let svc = service_fn(move |req| { + let svc = service_fn(move |req: http::Request| { let mut tx = tx.clone(); async move { - let mut body = hyper::body::aggregate(req.into_body()) + let mut body = http_body::Body::collect(req.into_body()) .await - .map_err(|error| format!("error: {error}"))?; + .map_err(|error| format!("error: {error}"))? + .aggregate(); let string = String::from_utf8(body.copy_to_bytes(body.remaining()).to_vec()) .map_err(|_| "Wasn't UTF-8".to_string())?; tx.try_send(string).map_err(|_| "Send error".to_string())?; diff --git a/src/sinks/util/test.rs b/src/sinks/util/test.rs index 356a9c71b2f23..477a2cbbd4a00 100644 --- a/src/sinks/util/test.rs +++ b/src/sinks/util/test.rs @@ -93,7 +93,7 @@ where let response = responder(); if response.status().is_success() { tokio::spawn(async move { - let bytes = hyper::body::to_bytes(body).await.unwrap(); + let bytes = http_body::Body::collect(body).await.unwrap().to_bytes(); tx.send((parts, bytes)).await.unwrap(); }); } diff --git a/src/sources/apache_metrics/mod.rs b/src/sources/apache_metrics/mod.rs index dc8256c535132..d9cfb1b1ec387 100644 --- a/src/sources/apache_metrics/mod.rs +++ b/src/sources/apache_metrics/mod.rs @@ -173,7 +173,7 @@ fn apache_metrics( .map_err(crate::Error::from) .and_then(|response| async { let (header, body) = response.into_parts(); - let body = hyper::body::to_bytes(body).await?; + let body = http_body::Body::collect(body).await?.to_bytes(); Ok((header, body)) }) .into_stream() diff --git a/src/sources/aws_ecs_metrics/mod.rs b/src/sources/aws_ecs_metrics/mod.rs index c64dd853f5aa4..86d5ce01f2fd6 100644 --- a/src/sources/aws_ecs_metrics/mod.rs +++ b/src/sources/aws_ecs_metrics/mod.rs @@ -1,6 +1,7 @@ use std::{env, time::Duration}; use futures::StreamExt; +use http_body::Collected; use hyper::{Body, Request}; use serde_with::serde_as; use tokio::time; @@ -193,7 +194,10 @@ async fn aws_ecs_metrics( match http_client.send(request).await { Ok(response) if response.status() == hyper::StatusCode::OK => { - match hyper::body::to_bytes(response).await { + match http_body::Body::collect(response.into_body()) + .await + .map(Collected::to_bytes) + { Ok(body) => { bytes_received.emit(ByteSize(body.len())); diff --git a/src/sources/eventstoredb_metrics/mod.rs b/src/sources/eventstoredb_metrics/mod.rs index 7172c07c8cffe..4f6d093082e1a 100644 --- a/src/sources/eventstoredb_metrics/mod.rs +++ b/src/sources/eventstoredb_metrics/mod.rs @@ -2,6 +2,7 @@ use std::time::Duration; use futures::{FutureExt, StreamExt}; use http::Uri; +use http_body::Collected; use hyper::{Body, Request}; use serde_with::serde_as; use tokio_stream::wrappers::IntervalStream; @@ -113,7 +114,10 @@ fn eventstoredb( } Ok(resp) => { - let bytes = match hyper::body::to_bytes(resp.into_body()).await { + let bytes = match http_body::Body::collect(resp.into_body()) + .await + .map(Collected::to_bytes) + { Ok(b) => b, Err(error) => { emit!(EventStoreDbMetricsHttpError { diff --git a/src/sources/gcp_pubsub.rs b/src/sources/gcp_pubsub.rs index 445d8d8f9caab..b01664b7ade63 100644 --- a/src/sources/gcp_pubsub.rs +++ b/src/sources/gcp_pubsub.rs @@ -1140,7 +1140,10 @@ mod integration_tests { .unwrap(); let response = self.client.send(request).await.unwrap(); assert_eq!(response.status(), StatusCode::OK); - let body = hyper::body::to_bytes(response.into_body()).await.unwrap(); + let body = http_body::Body::collect(response.into_body()) + .await + .unwrap() + .to_bytes(); serde_json::from_str(core::str::from_utf8(&body).unwrap()).unwrap() } diff --git a/src/sources/nginx_metrics/mod.rs b/src/sources/nginx_metrics/mod.rs index f009ae988d6e4..6deecfcb71b64 100644 --- a/src/sources/nginx_metrics/mod.rs +++ b/src/sources/nginx_metrics/mod.rs @@ -7,7 +7,8 @@ use bytes::Bytes; use chrono::Utc; use futures::{StreamExt, TryFutureExt, future::join_all}; use http::{Request, StatusCode}; -use hyper::{Body, Uri, body::to_bytes as body_to_bytes}; +use http_body::Collected; +use hyper::{Body, Uri}; use serde_with::serde_as; use snafu::{ResultExt, Snafu}; use tokio::time; @@ -251,7 +252,10 @@ impl NginxMetrics { let response = self.http_client.send(request).await?; let (parts, body) = response.into_parts(); match parts.status { - StatusCode::OK => body_to_bytes(body).err_into().await, + StatusCode::OK => http_body::Body::collect(body) + .err_into() + .await + .map(Collected::to_bytes), status => Err(Box::new(NginxError::InvalidResponseStatus { status })), } } diff --git a/src/sources/okta/client.rs b/src/sources/okta/client.rs index c13e7a5cefe27..4b06b6a6d71b9 100644 --- a/src/sources/okta/client.rs +++ b/src/sources/okta/client.rs @@ -218,7 +218,7 @@ async fn run_once(url: String, result: OktaTimeoutResult, timeout: Duration) -> next = Some(next_url); }; - let body = hyper::body::to_bytes(body).await?; + let body = http_body::Body::collect(body).await?.to_bytes(); emit!(EndpointBytesReceived { byte_size: body.len(), diff --git a/src/sources/prometheus/scrape.rs b/src/sources/prometheus/scrape.rs index 85ace26146721..414bee899c51e 100644 --- a/src/sources/prometheus/scrape.rs +++ b/src/sources/prometheus/scrape.rs @@ -318,6 +318,7 @@ impl HttpClientContext for PrometheusScrapeContext { #[cfg(all(test, feature = "sinks-prometheus"))] mod test { + use http_body::Body as _; use hyper::{ Body, Client, Response, Server, service::{make_service_fn, service_fn}, @@ -717,7 +718,7 @@ mod test { .unwrap(); assert!(response.status().is_success()); - let body = hyper::body::to_bytes(response.into_body()).await.unwrap(); + let body = response.into_body().collect().await.unwrap().to_bytes(); let lines = std::str::from_utf8(&body) .unwrap() .lines() diff --git a/src/sources/util/http_client.rs b/src/sources/util/http_client.rs index 4ae3cccf24f81..4ff9eb911e433 100644 --- a/src/sources/util/http_client.rs +++ b/src/sources/util/http_client.rs @@ -211,7 +211,7 @@ pub(crate) async fn call< }) .and_then(|response| async move { let (header, body) = response.into_parts(); - let body = hyper::body::to_bytes(body).await?; + let body = http_body::Body::collect(body).await?.to_bytes(); emit!(EndpointBytesReceived { byte_size: body.len(), protocol: "http", diff --git a/src/transforms/aws_ec2_metadata.rs b/src/transforms/aws_ec2_metadata.rs index 5890625332f92..0bbb6bf61376b 100644 --- a/src/transforms/aws_ec2_metadata.rs +++ b/src/transforms/aws_ec2_metadata.rs @@ -10,7 +10,7 @@ use arc_swap::ArcSwap; use bytes::Bytes; use futures::{Stream, StreamExt}; use http::{Request, StatusCode, Uri, uri::PathAndQuery}; -use hyper::{Body, body::to_bytes as body_to_bytes}; +use hyper::Body; use serde::Deserialize; use serde_with::serde_as; use snafu::ResultExt as _; @@ -433,7 +433,7 @@ impl MetadataClient { .into()), })?; - let token = body_to_bytes(res.into_body()).await?; + let token = http_body::Body::collect(res.into_body()).await?.to_bytes(); let next_refresh = Instant::now() + Duration::from_secs(21600); self.token = Some((token.clone(), next_refresh)); @@ -619,7 +619,7 @@ impl MetadataClient { .into()), })? { Some(res) => { - let body = body_to_bytes(res.into_body()).await?; + let body = http_body::Body::collect(res.into_body()).await?.to_bytes(); Ok(Some(body)) } None => Ok(None),