Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 27 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ relative-path = { version = "1.7", features = ["serde"] }
reqwest = { version = "0.11.27", default-features = false, features = [
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would be nice to have the latest version 0.12.x if there is no blocker issue.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will take this up in #1075

Copy link
Contributor Author

@de-sh de-sh Jan 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actix_http:latest and reqwest:v0.11+ are incompatible because they depend on different versions of http(ref1, ref2).

"rustls-tls",
"json",
"gzip",
"brotli",
] } # cannot update cause rustls is not latest `see rustls`
rustls = "0.22.4" # cannot update to 0.23 actix has not caught up yet
rustls-pemfile = "2.1.2"
Expand Down
8 changes: 3 additions & 5 deletions src/analytics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ use crate::handlers::http::cluster::utils::check_liveness;
use crate::handlers::http::{base_path_without_preceding_slash, cluster};
use crate::handlers::STREAM_NAME_HEADER_KEY;
use crate::option::{Mode, CONFIG};
use crate::storage;
use crate::{metadata, stats};
use crate::{storage, HTTP_CLIENT};

use crate::stats::Stats;
use actix_web::{web, HttpRequest, Responder};
Expand Down Expand Up @@ -132,9 +132,7 @@ impl Report {
}

pub async fn send(&self) {
let client = reqwest::Client::new();

let _ = client
let _ = HTTP_CLIENT
.post(ANALYTICS_SERVER_URL)
.header(STREAM_NAME_HEADER_KEY, "serverusageevent")
.json(&self)
Expand Down Expand Up @@ -240,7 +238,7 @@ async fn fetch_ingestors_metrics(
))
.expect("Should be a valid URL");

let resp = reqwest::Client::new()
let resp = HTTP_CLIENT
.get(uri)
.header(header::AUTHORIZATION, im.token.clone())
.header(header::CONTENT_TYPE, "application/json")
Expand Down
12 changes: 3 additions & 9 deletions src/audit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,11 @@ use std::{
fmt::{Debug, Display},
};

use crate::{about::current, storage::StorageMetadata};
use crate::{about::current, storage::StorageMetadata, HTTP_CLIENT};

use super::option::CONFIG;
use chrono::{DateTime, Utc};
use once_cell::sync::Lazy;
use reqwest::Client;
use serde::Serialize;
use serde_json::{json, Value};
use tracing::error;
Expand All @@ -38,7 +37,6 @@ static AUDIT_LOGGER: Lazy<Option<AuditLogger>> = Lazy::new(AuditLogger::new);

// AuditLogger handles sending audit logs to a remote logging system
pub struct AuditLogger {
client: Client,
log_endpoint: Url,
}

Expand All @@ -62,16 +60,12 @@ impl AuditLogger {
}
};

Some(AuditLogger {
client: reqwest::Client::new(),
log_endpoint,
})
Some(AuditLogger { log_endpoint })
}

// Sends the audit log to the configured endpoint with proper authentication
async fn send_log(&self, json: Value) {
let mut req = self
.client
let mut req = HTTP_CLIENT
.post(self.log_endpoint.as_str())
.json(&json)
.header("x-p-stream", "audit_log");
Expand Down
32 changes: 12 additions & 20 deletions src/handlers/http/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use crate::stats::Stats;
use crate::storage::object_storage::ingestor_metadata_path;
use crate::storage::{ObjectStorageError, STREAM_ROOT_DIRECTORY};
use crate::storage::{ObjectStoreFormat, PARSEABLE_ROOT_DIRECTORY};
use crate::HTTP_CLIENT;
use actix_web::http::header::{self, HeaderMap};
use actix_web::{HttpRequest, Responder};
use bytes::Bytes;
Expand Down Expand Up @@ -76,8 +77,6 @@ pub async fn sync_streams_with_ingestors(
StreamError::Anyhow(err)
})?;

let client = reqwest::Client::new();

for ingestor in ingestor_infos {
if !utils::check_liveness(&ingestor.domain_name).await {
warn!("Ingestor {} is not live", ingestor.domain_name);
Expand All @@ -89,7 +88,7 @@ pub async fn sync_streams_with_ingestors(
base_path_without_preceding_slash(),
stream_name
);
let res = client
let res = HTTP_CLIENT
.put(url)
.headers(reqwest_headers.clone())
.header(header::AUTHORIZATION, &ingestor.token)
Expand Down Expand Up @@ -126,7 +125,6 @@ pub async fn sync_users_with_roles_with_ingestors(
RBACError::Anyhow(err)
})?;

let client = reqwest::Client::new();
let role = to_vec(&role.clone()).map_err(|err| {
error!("Fatal: failed to serialize role: {:?}", err);
RBACError::SerdeError(err)
Expand All @@ -143,7 +141,7 @@ pub async fn sync_users_with_roles_with_ingestors(
username
);

let res = client
let res = HTTP_CLIENT
.put(url)
.header(header::AUTHORIZATION, &ingestor.token)
.header(header::CONTENT_TYPE, "application/json")
Expand Down Expand Up @@ -177,7 +175,6 @@ pub async fn sync_user_deletion_with_ingestors(username: &String) -> Result<(),
RBACError::Anyhow(err)
})?;

let client = reqwest::Client::new();
for ingestor in ingestor_infos.iter() {
if !utils::check_liveness(&ingestor.domain_name).await {
warn!("Ingestor {} is not live", ingestor.domain_name);
Expand All @@ -190,7 +187,7 @@ pub async fn sync_user_deletion_with_ingestors(username: &String) -> Result<(),
username
);

let res = client
let res = HTTP_CLIENT
.delete(url)
.header(header::AUTHORIZATION, &ingestor.token)
.send()
Expand Down Expand Up @@ -231,7 +228,6 @@ pub async fn sync_user_creation_with_ingestors(
user.roles.clone_from(role);
}
let username = user.username();
let client = reqwest::Client::new();

let user = to_vec(&user).map_err(|err| {
error!("Fatal: failed to serialize user: {:?}", err);
Expand All @@ -250,7 +246,7 @@ pub async fn sync_user_creation_with_ingestors(
username
);

let res = client
let res = HTTP_CLIENT
.post(url)
.header(header::AUTHORIZATION, &ingestor.token)
.header(header::CONTENT_TYPE, "application/json")
Expand Down Expand Up @@ -283,7 +279,6 @@ pub async fn sync_password_reset_with_ingestors(username: &String) -> Result<(),
error!("Fatal: failed to get ingestor info: {:?}", err);
RBACError::Anyhow(err)
})?;
let client = reqwest::Client::new();

for ingestor in ingestor_infos.iter() {
if !utils::check_liveness(&ingestor.domain_name).await {
Expand All @@ -297,7 +292,7 @@ pub async fn sync_password_reset_with_ingestors(username: &String) -> Result<(),
username
);

let res = client
let res = HTTP_CLIENT
.post(url)
.header(header::AUTHORIZATION, &ingestor.token)
.header(header::CONTENT_TYPE, "application/json")
Expand Down Expand Up @@ -338,7 +333,6 @@ pub async fn sync_role_update_with_ingestors(
RoleError::SerdeError(err)
})?;
let roles = Bytes::from(roles);
let client = reqwest::Client::new();

for ingestor in ingestor_infos.iter() {
if !utils::check_liveness(&ingestor.domain_name).await {
Expand All @@ -352,7 +346,7 @@ pub async fn sync_role_update_with_ingestors(
name
);

let res = client
let res = HTTP_CLIENT
.put(url)
.header(header::AUTHORIZATION, &ingestor.token)
.header(header::CONTENT_TYPE, "application/json")
Expand Down Expand Up @@ -401,7 +395,7 @@ pub async fn fetch_daily_stats_from_ingestors(
StreamError::Anyhow(anyhow::anyhow!("Invalid URL in Ingestor Metadata: {}", err))
})?;

let res = reqwest::Client::new()
let res = HTTP_CLIENT
.get(uri)
.header(header::AUTHORIZATION, &ingestor.token)
.header(header::CONTENT_TYPE, "application/json")
Expand Down Expand Up @@ -512,8 +506,7 @@ pub async fn send_stream_delete_request(
if !utils::check_liveness(&ingestor.domain_name).await {
return Ok(());
}
let client = reqwest::Client::new();
let resp = client
let resp = HTTP_CLIENT
.delete(url)
.header(header::CONTENT_TYPE, "application/json")
.header(header::AUTHORIZATION, ingestor.token)
Expand Down Expand Up @@ -551,8 +544,7 @@ pub async fn send_retention_cleanup_request(
if !utils::check_liveness(&ingestor.domain_name).await {
return Ok(first_event_at);
}
let client = reqwest::Client::new();
let resp = client
let resp = HTTP_CLIENT
.post(url)
.header(header::CONTENT_TYPE, "application/json")
.header(header::AUTHORIZATION, ingestor.token)
Expand Down Expand Up @@ -603,7 +595,7 @@ pub async fn get_cluster_info() -> Result<impl Responder, StreamError> {
))
.expect("should always be a valid url");

let resp = reqwest::Client::new()
let resp = HTTP_CLIENT
.get(uri)
.header(header::AUTHORIZATION, ingestor.token.clone())
.header(header::CONTENT_TYPE, "application/json")
Expand Down Expand Up @@ -752,7 +744,7 @@ async fn fetch_cluster_metrics() -> Result<Vec<Metrics>, PostError> {
PostError::Invalid(anyhow::anyhow!("Invalid URL in Ingestor Metadata: {}", err))
})?;

let res = reqwest::Client::new()
let res = HTTP_CLIENT
.get(uri)
.header(header::AUTHORIZATION, &ingestor.token)
.header(header::CONTENT_TYPE, "application/json")
Expand Down
14 changes: 8 additions & 6 deletions src/handlers/http/cluster/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@
*
*/

use crate::handlers::http::{
base_path_without_preceding_slash, logstream::error::StreamError, modal::IngestorMetadata,
use crate::{
handlers::http::{
base_path_without_preceding_slash, logstream::error::StreamError, modal::IngestorMetadata,
},
HTTP_CLIENT,
};
use actix_web::http::header;
use chrono::{DateTime, Utc};
Expand Down Expand Up @@ -235,13 +238,13 @@ pub async fn check_liveness(domain_name: &str) -> bool {
}
};

let reqw = reqwest::Client::new()
let req = HTTP_CLIENT
.get(uri)
.header(header::CONTENT_TYPE, "application/json")
.send()
.await;

reqw.is_ok()
req.is_ok()
}

/// send a request to the ingestor to fetch its stats
Expand All @@ -255,8 +258,7 @@ pub async fn send_stats_request(
return Ok(None);
}

let client = reqwest::Client::new();
let res = client
let res = HTTP_CLIENT
.get(url)
.header(header::CONTENT_TYPE, "application/json")
.header(header::AUTHORIZATION, ingestor.token)
Expand Down
4 changes: 2 additions & 2 deletions src/handlers/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use http::StatusCode;
use itertools::Itertools;
use serde_json::Value;

use crate::{option::CONFIG, storage::STREAM_ROOT_DIRECTORY};
use crate::{option::CONFIG, storage::STREAM_ROOT_DIRECTORY, HTTP_CLIENT};

use self::{cluster::get_ingestor_info, query::Query};

Expand Down Expand Up @@ -110,7 +110,7 @@ pub async fn send_query_request_to_ingestor(query: &Query) -> anyhow::Result<Vec
base_path_without_preceding_slash(),
"query"
);
let reqw = reqwest::Client::new()
let reqw = HTTP_CLIENT
.post(uri)
.json(query)
.header(http::header::AUTHORIZATION, im.token.clone())
Expand Down
Loading
Loading