Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
The ClickHouse sink now supports DNS auto-resolution for load balancing, allowing automatic discovery and rotation of ClickHouse cluster nodes through DNS lookups. This enables better high availability and load distribution when connecting to ClickHouse clusters with multiple endpoints.

authors: sebinsunny
198 changes: 157 additions & 41 deletions src/sinks/clickhouse/config.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
//! Configuration for the `Clickhouse` sink.

use std::fmt;

use futures_util::TryFutureExt;
use http::{Request, StatusCode, Uri};
use hyper::Body;
use std::fmt;
use vector_lib::codecs::{JsonSerializerConfig, NewlineDelimitedEncoderConfig, encoding::Framer};

use super::{
request_builder::ClickhouseRequestBuilder,
service::{ClickhouseRetryLogic, ClickhouseServiceRequestBuilder},
service::{ClickhouseHealthLogic, ClickhouseRetryLogic, ClickhouseServiceRequestBuilder},
sink::{ClickhouseSink, PartitionKey},
};
use crate::{
dns,
http::{Auth, HttpClient, MaybeAuth},
sinks::{
prelude::*,
Expand Down Expand Up @@ -61,6 +62,13 @@ pub struct ClickhouseConfig {
#[configurable(metadata(docs::examples = "http://localhost:8123"))]
pub endpoint: UriSerde,

/// Automatically resolve hostnames to all available IP addresses.
///
/// When enabled, the hostname in the endpoint will be resolved to all its IP addresses,
/// and Vector will load balance across all resolved IPs.
#[serde(default)]
pub auto_resolve_dns: bool,

/// The table that data is inserted into.
#[configurable(metadata(docs::examples = "mytable"))]
pub table: Template,
Expand Down Expand Up @@ -176,36 +184,133 @@ pub struct AsyncInsertSettingsConfig {

impl_generate_config_from_default!(ClickhouseConfig);

#[async_trait::async_trait]
#[typetag::serde(name = "clickhouse")]
impl SinkConfig for ClickhouseConfig {
async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
let endpoint = self.endpoint.with_default_parts().uri;

let auth = self.auth.choose_one(&self.endpoint.auth)?;
#[derive(Debug, Clone)]
pub struct ClickhouseCommon {
pub endpoint: Uri,
pub auth: Option<Auth>,
pub tls_settings: TlsSettings,
service_request_builder: ClickhouseServiceRequestBuilder,
}

let tls_settings = TlsSettings::from_options(self.tls.as_ref())?;
impl ClickhouseCommon {
pub async fn parse_config(
config: &ClickhouseConfig,
endpoint_str: &str,
) -> crate::Result<Self> {
let endpoint = endpoint_str.parse::<UriSerde>()?;
let endpoint_uri = endpoint.with_default_parts().uri;

let client = HttpClient::new(tls_settings, &cx.proxy)?;
let auth = config.auth.choose_one(&endpoint.auth)?;
let tls_settings = TlsSettings::from_options(config.tls.as_ref())?;

let clickhouse_service_request_builder = ClickhouseServiceRequestBuilder {
let service_request_builder = ClickhouseServiceRequestBuilder {
auth: auth.clone(),
endpoint: endpoint.clone(),
skip_unknown_fields: self.skip_unknown_fields,
date_time_best_effort: self.date_time_best_effort,
insert_random_shard: self.insert_random_shard,
compression: self.compression,
query_settings: self.query_settings,
endpoint: endpoint_uri.clone(),
skip_unknown_fields: config.skip_unknown_fields,
date_time_best_effort: config.date_time_best_effort,
insert_random_shard: config.insert_random_shard,
compression: config.compression,
query_settings: config.query_settings,
};

let service: HttpService<ClickhouseServiceRequestBuilder, PartitionKey> =
HttpService::new(client.clone(), clickhouse_service_request_builder);
Ok(Self {
endpoint: endpoint_uri,
auth,
tls_settings,
service_request_builder,
})
}

pub async fn parse_many(config: &ClickhouseConfig) -> crate::Result<Vec<Self>> {
let endpoint_str = config.endpoint.with_default_parts().uri.to_string();

let all_endpoints = if config.auto_resolve_dns {
Self::resolve_endpoint_to_ips(&endpoint_str).await?
} else {
vec![endpoint_str]
};

if all_endpoints.is_empty() {
return Err("No endpoints available after DNS resolution".into());
}

let mut commons = Vec::new();
for endpoint_str in all_endpoints {
commons.push(Self::parse_config(config, &endpoint_str).await?);
}
Ok(commons)
}

async fn resolve_endpoint_to_ips(endpoint_str: &str) -> crate::Result<Vec<String>> {
let uri: Uri = endpoint_str.parse()?;

let host = uri.host().ok_or("URI must contain a host")?;

// Resolve hostname to all IP addresses
let ips: Vec<_> = dns::Resolver.lookup_ip(host.to_string()).await?.collect();

if ips.is_empty() {
return Err("No IP addresses found for hostname".into());
}

let mut resolved_endpoints = Vec::new();
for ip in ips {
let new_endpoint = uri.to_string().replace(host, &ip.to_string());
resolved_endpoints.push(new_endpoint);
}

Ok(resolved_endpoints)
}

pub(super) const fn get_service_request_builder(&self) -> &ClickhouseServiceRequestBuilder {
&self.service_request_builder
}

pub async fn healthcheck(self, client: HttpClient) -> crate::Result<()> {
let uri = get_healthcheck_uri(&self.endpoint);
let mut request = Request::get(uri).body(Body::empty()).unwrap();

if let Some(auth) = self.auth {
auth.apply(&mut request);
}

let response = client.send(request).await?;

match response.status() {
StatusCode::OK => Ok(()),
status => Err(HealthcheckError::UnexpectedStatus { status }.into()),
}
}
}

#[async_trait::async_trait]
#[typetag::serde(name = "clickhouse")]
impl SinkConfig for ClickhouseConfig {
async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
let commons = ClickhouseCommon::parse_many(self).await?;
let common = commons[0].clone();

let client = HttpClient::new(common.tls_settings.clone(), &cx.proxy)?;

let request_limits = self.request.into_settings();

let service = ServiceBuilder::new()
.settings(request_limits, ClickhouseRetryLogic::default())
.service(service);
let services = commons
.iter()
.map(|common| {
let endpoint = common.endpoint.to_string();
let service: HttpService<ClickhouseServiceRequestBuilder, PartitionKey> =
HttpService::new(client.clone(), common.get_service_request_builder().clone());
(endpoint, service)
})
.collect::<Vec<_>>();

let service = request_limits.distributed_service(
ClickhouseRetryLogic::default(),
services,
Default::default(),
ClickhouseHealthLogic,
1,
);

let batch_settings = self.batch.into_batcher_settings()?;

Expand Down Expand Up @@ -235,7 +340,13 @@ impl SinkConfig for ClickhouseConfig {
request_builder,
);

let healthcheck = Box::pin(healthcheck(client, endpoint, auth));
let healthcheck = futures::future::select_ok(
commons
.into_iter()
.map(move |common| common.healthcheck(client.clone()).boxed()),
)
.map_ok(|((), _)| ())
.boxed();

Ok((VectorSink::from_event_streamsink(sink), healthcheck))
}
Expand All @@ -258,22 +369,6 @@ fn get_healthcheck_uri(endpoint: &Uri) -> String {
uri
}

async fn healthcheck(client: HttpClient, endpoint: Uri, auth: Option<Auth>) -> crate::Result<()> {
let uri = get_healthcheck_uri(&endpoint);
let mut request = Request::get(uri).body(Body::empty()).unwrap();

if let Some(auth) = auth {
auth.apply(&mut request);
}

let response = client.send(request).await?;

match response.status() {
StatusCode::OK => Ok(()),
status => Err(HealthcheckError::UnexpectedStatus { status }.into()),
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand All @@ -298,4 +393,25 @@ mod tests {
"http://localhost:8123/path/?query=SELECT%201"
);
}

#[tokio::test]
async fn test_auto_resolve_dns_enabled() {
let config = ClickhouseConfig {
endpoint: "http://localhost:8123".parse().unwrap(),
auto_resolve_dns: true, // Enabled
table: "test_table".try_into().unwrap(),
..Default::default()
};

let commons = ClickhouseCommon::parse_many(&config).await.unwrap();
assert!(!commons.is_empty());

// All resolved endpoints should be IP addresses, not hostnames
for common in &commons {
let endpoint_str = common.endpoint.to_string();
assert!(!endpoint_str.contains("localhost"));
// Should contain either IPv4 or IPv6 addresses
assert!(endpoint_str.contains("127.0.0.1") || endpoint_str.contains("::1"));
}
}
}
59 changes: 59 additions & 0 deletions src/sinks/clickhouse/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,65 @@ async fn templated_table() {
}
}

#[tokio::test]
async fn insert_events_unix_timestamps_using_dns_resolution() {
trace_init();

let table = random_table_name();
let host = clickhouse_address();

let mut batch = BatchConfig::default();
batch.max_events = Some(1);

let config = ClickhouseConfig {
endpoint: host.parse().unwrap(),
auto_resolve_dns: true,
table: table.clone().try_into().unwrap(),
compression: Compression::None,
encoding: Transformer::new(None, None, Some(TimestampFormat::Unix)).unwrap(),
batch,
request: TowerRequestConfig {
retry_attempts: 1,
..Default::default()
},
..Default::default()
};
let client = ClickhouseClient::new(host);
client
.create_table(
&table,
"host String, timestamp DateTime('UTC'), message String",
)
.await;

let (sink, _hc) = config.build(SinkContext::default()).await.unwrap();

let (mut input_event, _receiver) = make_event();

run_and_assert_sink_compliance(sink, stream::once(ready(input_event.clone())), &SINK_TAGS)
.await;

let output = client.select_all(&table).await;
assert_eq!(1, output.rows);

let exp_event = input_event.as_mut_log();
exp_event.insert(
(PathPrefix::Event, log_schema().timestamp_key().unwrap()),
format!(
"{}",
exp_event
.get_timestamp()
.unwrap()
.as_timestamp()
.unwrap()
.format("%Y-%m-%d %H:%M:%S")
),
);

let expected = serde_json::to_value(exp_event).unwrap();
assert_eq!(expected, output.data[0]);
}

fn make_event() -> (Event, BatchStatusReceiver) {
let (batch, receiver) = BatchNotifier::new_with_receiver();
let mut event = LogEvent::from("raw log line").with_batch_notifier(&batch);
Expand Down
28 changes: 28 additions & 0 deletions src/sinks/clickhouse/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use crate::{
util::{
http::{HttpRequest, HttpResponse, HttpRetryLogic, HttpServiceRequestBuilder},
retries::RetryAction,
service::HealthLogic,
},
},
};
Expand Down Expand Up @@ -328,3 +329,30 @@ mod tests {
.unwrap_err();
}
}

#[derive(Clone)]
pub struct ClickhouseHealthLogic;

impl HealthLogic for ClickhouseHealthLogic {
type Error = crate::Error;
type Response = HttpResponse;

fn is_healthy(&self, response: &Result<Self::Response, Self::Error>) -> Option<bool> {
match response {
Ok(response) => {
let status = response.http_response.status();
if status.is_success() {
Some(true)
} else if status.is_server_error() {
Some(false)
} else {
None
}
}
Err(error) => match error.downcast_ref::<HttpError>() {
Some(HttpError::CallRequest { .. }) => Some(false),
_ => None,
},
}
}
}
10 changes: 10 additions & 0 deletions website/cue/reference/components/sinks/generated/clickhouse.cue
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,16 @@ generated: components: sinks: clickhouse: configuration: {
}
}
}
auto_resolve_dns: {
description: """
Automatically resolve hostnames to all available IP addresses.

When enabled, the hostname in the endpoint will be resolved to all its IP addresses,
and Vector will load balance across all resolved IPs.
Comment on lines +209 to +212
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Automatically resolve hostnames to all available IP addresses.
When enabled, the hostname in the endpoint will be resolved to all its IP addresses,
and Vector will load balance across all resolved IPs.
Automatically resolve hostnames to all available IP addresses.
When enabled, the hostname in the endpoint is resolved to all its IP addresses,
and Vector load balances across all resolved IPs.

Non-blocking verb tense nit.

"""
required: false
type: bool: default: false
}
batch: {
description: "Event batching behavior."
required: false
Expand Down
Loading