diff --git a/changelog.d/23877_clickhouse_sink_dns_auto_resolution.feature.md b/changelog.d/23877_clickhouse_sink_dns_auto_resolution.feature.md new file mode 100644 index 0000000000000..b413a613bd0f0 --- /dev/null +++ b/changelog.d/23877_clickhouse_sink_dns_auto_resolution.feature.md @@ -0,0 +1,3 @@ +The ClickHouse sink now supports DNS auto-resolution for load balancing, allowing automatic discovery and rotation of ClickHouse cluster nodes through DNS lookups. This enables better high availability and load distribution when connecting to ClickHouse clusters with multiple endpoints. + +authors: sebinsunny diff --git a/src/sinks/clickhouse/config.rs b/src/sinks/clickhouse/config.rs index c33bfdbac5eda..8bca51e989528 100644 --- a/src/sinks/clickhouse/config.rs +++ b/src/sinks/clickhouse/config.rs @@ -1,17 +1,18 @@ //! Configuration for the `Clickhouse` sink. -use std::fmt; - +use futures_util::TryFutureExt; use http::{Request, StatusCode, Uri}; use hyper::Body; +use std::fmt; use vector_lib::codecs::{JsonSerializerConfig, NewlineDelimitedEncoderConfig, encoding::Framer}; use super::{ request_builder::ClickhouseRequestBuilder, - service::{ClickhouseRetryLogic, ClickhouseServiceRequestBuilder}, + service::{ClickhouseHealthLogic, ClickhouseRetryLogic, ClickhouseServiceRequestBuilder}, sink::{ClickhouseSink, PartitionKey}, }; use crate::{ + dns, http::{Auth, HttpClient, MaybeAuth}, sinks::{ prelude::*, @@ -61,6 +62,13 @@ pub struct ClickhouseConfig { #[configurable(metadata(docs::examples = "http://localhost:8123"))] pub endpoint: UriSerde, + /// Automatically resolve hostnames to all available IP addresses. + /// + /// When enabled, the hostname in the endpoint will be resolved to all its IP addresses, + /// and Vector will load balance across all resolved IPs. + #[serde(default)] + pub auto_resolve_dns: bool, + /// The table that data is inserted into. #[configurable(metadata(docs::examples = "mytable"))] pub table: Template, @@ -176,36 +184,133 @@ pub struct AsyncInsertSettingsConfig { impl_generate_config_from_default!(ClickhouseConfig); -#[async_trait::async_trait] -#[typetag::serde(name = "clickhouse")] -impl SinkConfig for ClickhouseConfig { - async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> { - let endpoint = self.endpoint.with_default_parts().uri; - - let auth = self.auth.choose_one(&self.endpoint.auth)?; +#[derive(Debug, Clone)] +pub struct ClickhouseCommon { + pub endpoint: Uri, + pub auth: Option, + pub tls_settings: TlsSettings, + service_request_builder: ClickhouseServiceRequestBuilder, +} - let tls_settings = TlsSettings::from_options(self.tls.as_ref())?; +impl ClickhouseCommon { + pub async fn parse_config( + config: &ClickhouseConfig, + endpoint_str: &str, + ) -> crate::Result { + let endpoint = endpoint_str.parse::()?; + let endpoint_uri = endpoint.with_default_parts().uri; - let client = HttpClient::new(tls_settings, &cx.proxy)?; + let auth = config.auth.choose_one(&endpoint.auth)?; + let tls_settings = TlsSettings::from_options(config.tls.as_ref())?; - let clickhouse_service_request_builder = ClickhouseServiceRequestBuilder { + let service_request_builder = ClickhouseServiceRequestBuilder { auth: auth.clone(), - endpoint: endpoint.clone(), - skip_unknown_fields: self.skip_unknown_fields, - date_time_best_effort: self.date_time_best_effort, - insert_random_shard: self.insert_random_shard, - compression: self.compression, - query_settings: self.query_settings, + endpoint: endpoint_uri.clone(), + skip_unknown_fields: config.skip_unknown_fields, + date_time_best_effort: config.date_time_best_effort, + insert_random_shard: config.insert_random_shard, + compression: config.compression, + query_settings: config.query_settings, }; - let service: HttpService = - HttpService::new(client.clone(), clickhouse_service_request_builder); + Ok(Self { + endpoint: endpoint_uri, + auth, + tls_settings, + service_request_builder, + }) + } + + pub async fn parse_many(config: &ClickhouseConfig) -> crate::Result> { + let endpoint_str = config.endpoint.with_default_parts().uri.to_string(); + + let all_endpoints = if config.auto_resolve_dns { + Self::resolve_endpoint_to_ips(&endpoint_str).await? + } else { + vec![endpoint_str] + }; + + if all_endpoints.is_empty() { + return Err("No endpoints available after DNS resolution".into()); + } + + let mut commons = Vec::new(); + for endpoint_str in all_endpoints { + commons.push(Self::parse_config(config, &endpoint_str).await?); + } + Ok(commons) + } + + async fn resolve_endpoint_to_ips(endpoint_str: &str) -> crate::Result> { + let uri: Uri = endpoint_str.parse()?; + + let host = uri.host().ok_or("URI must contain a host")?; + + // Resolve hostname to all IP addresses + let ips: Vec<_> = dns::Resolver.lookup_ip(host.to_string()).await?.collect(); + + if ips.is_empty() { + return Err("No IP addresses found for hostname".into()); + } + + let mut resolved_endpoints = Vec::new(); + for ip in ips { + let new_endpoint = uri.to_string().replace(host, &ip.to_string()); + resolved_endpoints.push(new_endpoint); + } + + Ok(resolved_endpoints) + } + + pub(super) const fn get_service_request_builder(&self) -> &ClickhouseServiceRequestBuilder { + &self.service_request_builder + } + + pub async fn healthcheck(self, client: HttpClient) -> crate::Result<()> { + let uri = get_healthcheck_uri(&self.endpoint); + let mut request = Request::get(uri).body(Body::empty()).unwrap(); + + if let Some(auth) = self.auth { + auth.apply(&mut request); + } + + let response = client.send(request).await?; + + match response.status() { + StatusCode::OK => Ok(()), + status => Err(HealthcheckError::UnexpectedStatus { status }.into()), + } + } +} + +#[async_trait::async_trait] +#[typetag::serde(name = "clickhouse")] +impl SinkConfig for ClickhouseConfig { + async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> { + let commons = ClickhouseCommon::parse_many(self).await?; + let common = commons[0].clone(); + + let client = HttpClient::new(common.tls_settings.clone(), &cx.proxy)?; let request_limits = self.request.into_settings(); - let service = ServiceBuilder::new() - .settings(request_limits, ClickhouseRetryLogic::default()) - .service(service); + let services = commons + .iter() + .map(|common| { + let endpoint = common.endpoint.to_string(); + let service: HttpService = + HttpService::new(client.clone(), common.get_service_request_builder().clone()); + (endpoint, service) + }) + .collect::>(); + + let service = request_limits.distributed_service( + ClickhouseRetryLogic::default(), + services, + Default::default(), + ClickhouseHealthLogic, + 1, + ); let batch_settings = self.batch.into_batcher_settings()?; @@ -235,7 +340,13 @@ impl SinkConfig for ClickhouseConfig { request_builder, ); - let healthcheck = Box::pin(healthcheck(client, endpoint, auth)); + let healthcheck = futures::future::select_ok( + commons + .into_iter() + .map(move |common| common.healthcheck(client.clone()).boxed()), + ) + .map_ok(|((), _)| ()) + .boxed(); Ok((VectorSink::from_event_streamsink(sink), healthcheck)) } @@ -258,22 +369,6 @@ fn get_healthcheck_uri(endpoint: &Uri) -> String { uri } -async fn healthcheck(client: HttpClient, endpoint: Uri, auth: Option) -> crate::Result<()> { - let uri = get_healthcheck_uri(&endpoint); - let mut request = Request::get(uri).body(Body::empty()).unwrap(); - - if let Some(auth) = auth { - auth.apply(&mut request); - } - - let response = client.send(request).await?; - - match response.status() { - StatusCode::OK => Ok(()), - status => Err(HealthcheckError::UnexpectedStatus { status }.into()), - } -} - #[cfg(test)] mod tests { use super::*; @@ -298,4 +393,25 @@ mod tests { "http://localhost:8123/path/?query=SELECT%201" ); } + + #[tokio::test] + async fn test_auto_resolve_dns_enabled() { + let config = ClickhouseConfig { + endpoint: "http://localhost:8123".parse().unwrap(), + auto_resolve_dns: true, // Enabled + table: "test_table".try_into().unwrap(), + ..Default::default() + }; + + let commons = ClickhouseCommon::parse_many(&config).await.unwrap(); + assert!(!commons.is_empty()); + + // All resolved endpoints should be IP addresses, not hostnames + for common in &commons { + let endpoint_str = common.endpoint.to_string(); + assert!(!endpoint_str.contains("localhost")); + // Should contain either IPv4 or IPv6 addresses + assert!(endpoint_str.contains("127.0.0.1") || endpoint_str.contains("::1")); + } + } } diff --git a/src/sinks/clickhouse/integration_tests.rs b/src/sinks/clickhouse/integration_tests.rs index fe7a10226ac60..4fff4498e9211 100644 --- a/src/sinks/clickhouse/integration_tests.rs +++ b/src/sinks/clickhouse/integration_tests.rs @@ -392,6 +392,65 @@ async fn templated_table() { } } +#[tokio::test] +async fn insert_events_unix_timestamps_using_dns_resolution() { + trace_init(); + + let table = random_table_name(); + let host = clickhouse_address(); + + let mut batch = BatchConfig::default(); + batch.max_events = Some(1); + + let config = ClickhouseConfig { + endpoint: host.parse().unwrap(), + auto_resolve_dns: true, + table: table.clone().try_into().unwrap(), + compression: Compression::None, + encoding: Transformer::new(None, None, Some(TimestampFormat::Unix)).unwrap(), + batch, + request: TowerRequestConfig { + retry_attempts: 1, + ..Default::default() + }, + ..Default::default() + }; + let client = ClickhouseClient::new(host); + client + .create_table( + &table, + "host String, timestamp DateTime('UTC'), message String", + ) + .await; + + let (sink, _hc) = config.build(SinkContext::default()).await.unwrap(); + + let (mut input_event, _receiver) = make_event(); + + run_and_assert_sink_compliance(sink, stream::once(ready(input_event.clone())), &SINK_TAGS) + .await; + + let output = client.select_all(&table).await; + assert_eq!(1, output.rows); + + let exp_event = input_event.as_mut_log(); + exp_event.insert( + (PathPrefix::Event, log_schema().timestamp_key().unwrap()), + format!( + "{}", + exp_event + .get_timestamp() + .unwrap() + .as_timestamp() + .unwrap() + .format("%Y-%m-%d %H:%M:%S") + ), + ); + + let expected = serde_json::to_value(exp_event).unwrap(); + assert_eq!(expected, output.data[0]); +} + fn make_event() -> (Event, BatchStatusReceiver) { let (batch, receiver) = BatchNotifier::new_with_receiver(); let mut event = LogEvent::from("raw log line").with_batch_notifier(&batch); diff --git a/src/sinks/clickhouse/service.rs b/src/sinks/clickhouse/service.rs index e9974b32a8dd9..aeaf4852b3772 100644 --- a/src/sinks/clickhouse/service.rs +++ b/src/sinks/clickhouse/service.rs @@ -17,6 +17,7 @@ use crate::{ util::{ http::{HttpRequest, HttpResponse, HttpRetryLogic, HttpServiceRequestBuilder}, retries::RetryAction, + service::HealthLogic, }, }, }; @@ -328,3 +329,30 @@ mod tests { .unwrap_err(); } } + +#[derive(Clone)] +pub struct ClickhouseHealthLogic; + +impl HealthLogic for ClickhouseHealthLogic { + type Error = crate::Error; + type Response = HttpResponse; + + fn is_healthy(&self, response: &Result) -> Option { + match response { + Ok(response) => { + let status = response.http_response.status(); + if status.is_success() { + Some(true) + } else if status.is_server_error() { + Some(false) + } else { + None + } + } + Err(error) => match error.downcast_ref::() { + Some(HttpError::CallRequest { .. }) => Some(false), + _ => None, + }, + } + } +} diff --git a/website/cue/reference/components/sinks/generated/clickhouse.cue b/website/cue/reference/components/sinks/generated/clickhouse.cue index ea6373bd3191f..f7ff9a5d29721 100644 --- a/website/cue/reference/components/sinks/generated/clickhouse.cue +++ b/website/cue/reference/components/sinks/generated/clickhouse.cue @@ -204,6 +204,16 @@ generated: components: sinks: clickhouse: configuration: { } } } + auto_resolve_dns: { + description: """ + Automatically resolve hostnames to all available IP addresses. + + When enabled, the hostname in the endpoint will be resolved to all its IP addresses, + and Vector will load balance across all resolved IPs. + """ + required: false + type: bool: default: false + } batch: { description: "Event batching behavior." required: false