Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/analytics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use crate::{
option::Mode,
parseable::PARSEABLE,
stats::{self, Stats},
storage, HTTP_CLIENT,
storage, HTTP_CLIENT, INTRA_CLUSTER_CLIENT,
};

const ANALYTICS_SERVER_URL: &str = "https://analytics.parseable.io:80";
Expand Down Expand Up @@ -280,7 +280,7 @@ async fn fetch_ingestors_metrics(
))
.expect("Should be a valid URL");

let resp = HTTP_CLIENT
let resp = INTRA_CLUSTER_CLIENT
.get(uri)
.header(header::AUTHORIZATION, im.token.clone())
.header(header::CONTENT_TYPE, "application/json")
Expand Down
12 changes: 12 additions & 0 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,18 @@ pub struct Options {
)]
pub trusted_ca_certs_path: Option<PathBuf>,

/// Allows invalid TLS certificates for intra-cluster communication.
/// This is needed when nodes connect to each other via IP addresses
/// which don't match the domain names in their certificates.
/// SECURITY NOTE: Only enable this for trusted internal networks.
#[arg(
long,
env = "P_TLS_SKIP_VERIFY",
value_name = "bool",
default_value = "false"
)]
pub skip_tls: bool,

// Storage configuration
#[arg(
long,
Expand Down
22 changes: 11 additions & 11 deletions src/handlers/http/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ use crate::storage::{
ObjectStorage, ObjectStorageError, ObjectStoreFormat, PARSEABLE_ROOT_DIRECTORY,
STREAM_ROOT_DIRECTORY,
};
use crate::HTTP_CLIENT;
use crate::INTRA_CLUSTER_CLIENT;

use super::base_path_without_preceding_slash;
use super::ingest::PostError;
Expand Down Expand Up @@ -128,7 +128,7 @@ pub async fn sync_streams_with_ingestors(
let headers = reqwest_headers_clone.clone();
let body = body_clone.clone();
async move {
let res = HTTP_CLIENT
let res = INTRA_CLUSTER_CLIENT
.put(url)
.headers(headers)
.header(header::AUTHORIZATION, &ingestor.token)
Expand Down Expand Up @@ -179,7 +179,7 @@ pub async fn sync_users_with_roles_with_ingestors(
let role_data = role_data.clone();

async move {
let res = HTTP_CLIENT
let res = INTRA_CLUSTER_CLIENT
.put(url)
.header(header::AUTHORIZATION, &ingestor.token)
.header(header::CONTENT_TYPE, "application/json")
Expand Down Expand Up @@ -221,7 +221,7 @@ pub async fn sync_user_deletion_with_ingestors(username: &str) -> Result<(), RBA
);

async move {
let res = HTTP_CLIENT
let res = INTRA_CLUSTER_CLIENT
.delete(url)
.header(header::AUTHORIZATION, &ingestor.token)
.send()
Expand Down Expand Up @@ -278,7 +278,7 @@ pub async fn sync_user_creation_with_ingestors(
let user_data = user_data.clone();

async move {
let res = HTTP_CLIENT
let res = INTRA_CLUSTER_CLIENT
.post(url)
.header(header::AUTHORIZATION, &ingestor.token)
.header(header::CONTENT_TYPE, "application/json")
Expand Down Expand Up @@ -320,7 +320,7 @@ pub async fn sync_password_reset_with_ingestors(username: &str) -> Result<(), RB
);

async move {
let res = HTTP_CLIENT
let res = INTRA_CLUSTER_CLIENT
.post(url)
.header(header::AUTHORIZATION, &ingestor.token)
.header(header::CONTENT_TYPE, "application/json")
Expand Down Expand Up @@ -364,7 +364,7 @@ pub async fn sync_role_update_with_ingestors(
let privileges = privileges.clone();

async move {
let res = HTTP_CLIENT
let res = INTRA_CLUSTER_CLIENT
.put(url)
.header(header::AUTHORIZATION, &ingestor.token)
.header(header::CONTENT_TYPE, "application/json")
Expand Down Expand Up @@ -491,7 +491,7 @@ pub async fn send_stream_delete_request(
if !utils::check_liveness(&ingestor.domain_name).await {
return Ok(());
}
let resp = HTTP_CLIENT
let resp = INTRA_CLUSTER_CLIENT
.delete(url)
.header(header::CONTENT_TYPE, "application/json")
.header(header::AUTHORIZATION, ingestor.token)
Expand Down Expand Up @@ -529,7 +529,7 @@ pub async fn send_retention_cleanup_request(
if !utils::check_liveness(&ingestor.domain_name).await {
return Ok(first_event_at);
}
let resp = HTTP_CLIENT
let resp = INTRA_CLUSTER_CLIENT
.post(url)
.header(header::CONTENT_TYPE, "application/json")
.header(header::AUTHORIZATION, ingestor.token)
Expand Down Expand Up @@ -636,7 +636,7 @@ async fn fetch_node_info<T: Metadata>(node: &T) -> Result<utils::ClusterInfo, St
))
.expect("should always be a valid url");

let resp = HTTP_CLIENT
let resp = INTRA_CLUSTER_CLIENT
.get(uri)
.header(header::AUTHORIZATION, node.token().to_owned())
.header(header::CONTENT_TYPE, "application/json")
Expand Down Expand Up @@ -855,7 +855,7 @@ where
}

// Fetch metrics
let res = HTTP_CLIENT
let res = INTRA_CLUSTER_CLIENT
.get(uri)
.header(header::AUTHORIZATION, node.token())
.header(header::CONTENT_TYPE, "application/json")
Expand Down
4 changes: 2 additions & 2 deletions src/handlers/http/cluster/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

use crate::{
handlers::http::{base_path_without_preceding_slash, modal::NodeType},
HTTP_CLIENT,
INTRA_CLUSTER_CLIENT,
};
use actix_web::http::header;
use chrono::{DateTime, Utc};
Expand Down Expand Up @@ -188,7 +188,7 @@ pub async fn check_liveness(domain_name: &str) -> bool {
}
};

let req = HTTP_CLIENT
let req = INTRA_CLUSTER_CLIENT
.get(uri)
.header(header::CONTENT_TYPE, "application/json")
.send()
Expand Down
4 changes: 2 additions & 2 deletions src/handlers/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use itertools::Itertools;
use modal::{NodeMetadata, NodeType};
use serde_json::Value;

use crate::{parseable::PARSEABLE, storage::STREAM_ROOT_DIRECTORY, HTTP_CLIENT};
use crate::{parseable::PARSEABLE, storage::STREAM_ROOT_DIRECTORY, INTRA_CLUSTER_CLIENT};

use self::query::Query;

Expand Down Expand Up @@ -119,7 +119,7 @@ pub async fn send_query_request_to_ingestor(query: &Query) -> anyhow::Result<Vec
base_path_without_preceding_slash(),
"query"
);
let reqw = HTTP_CLIENT
let reqw = INTRA_CLUSTER_CLIENT
.post(uri)
.json(query)
.header(http::header::AUTHORIZATION, im.token.clone())
Expand Down
20 changes: 20 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ pub use handlers::http::modal::{
ingest_server::IngestServer, query_server::QueryServer, server::Server, ParseableServer,
};
use once_cell::sync::Lazy;
use parseable::PARSEABLE;
use reqwest::{Client, ClientBuilder};

// It is very unlikely that panic will occur when dealing with locks.
Expand Down Expand Up @@ -85,3 +86,22 @@ pub static HTTP_CLIENT: Lazy<Client> = Lazy::new(|| {
.build()
.expect("Construction of client shouldn't fail")
});

//separate client is created for intra cluster communication
//allow invalid certificates for connecting other nodes in the cluster
//required when querier/prism server tries to connect to other nodes via IP address directly
//but the certificate is valid for a specific domain name
pub static INTRA_CLUSTER_CLIENT: Lazy<Client> = Lazy::new(|| {
ClientBuilder::new()
.connect_timeout(Duration::from_secs(3)) // set a timeout of 3s for each connection setup
.timeout(Duration::from_secs(30)) // set a timeout of 30s for each request
.pool_idle_timeout(Duration::from_secs(90)) // set a timeout of 90s for each idle connection
.pool_max_idle_per_host(32) // max 32 idle connections per host
.gzip(true) // gzip compress for all requests
.brotli(true) // brotli compress for all requests
.use_rustls_tls() // use only the rustls backend
.http1_only() // use only http/1.1
.danger_accept_invalid_certs(PARSEABLE.options.skip_tls)
.build()
.expect("Construction of client shouldn't fail")
});
4 changes: 2 additions & 2 deletions src/metrics/prom_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::handlers::http::ingest::PostError;
use crate::handlers::http::modal::Metadata;
use crate::option::Mode;
use crate::parseable::PARSEABLE;
use crate::HTTP_CLIENT;
use crate::INTRA_CLUSTER_CLIENT;
use actix_web::http::header;
use chrono::NaiveDateTime;
use chrono::Utc;
Expand Down Expand Up @@ -237,7 +237,7 @@ impl Metrics {
.map_err(|err| {
PostError::Invalid(anyhow::anyhow!("Invalid URL in Ingestor Metadata: {}", err))
})?;
let res = HTTP_CLIENT
let res = INTRA_CLUSTER_CLIENT
.get(uri)
.header(header::CONTENT_TYPE, "application/json")
.header(header::AUTHORIZATION, metadata.token())
Expand Down
Loading