diff --git a/Cargo.lock b/Cargo.lock index 012c49206b7bc..931bd9941a958 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2595,6 +2595,7 @@ version = "0.1.0" dependencies = [ "apache-avro 0.20.0", "arrow", + "async-trait", "bytes 1.10.1", "chrono", "csv-core", @@ -12502,6 +12503,7 @@ dependencies = [ "arc-swap", "arr_macro", "arrow", + "arrow-schema", "assert_cmd", "async-compression", "async-graphql", diff --git a/Cargo.toml b/Cargo.toml index fa99780782eb1..d5460c64aa162 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -179,7 +179,7 @@ rand = { version = "0.9.2", default-features = false, features = ["small_rng", " rand_distr = { version = "0.5.1", default-features = false } regex = { version = "1.11.2", default-features = false, features = ["std", "perf"] } reqwest = { version = "0.11.26", features = ["json"] } -rust_decimal = { version = "1.33", default-features = false, features = ["std"] } +rust_decimal = { version = "1.37.0", default-features = false, features = ["std"] } semver = { version = "1.0.26", default-features = false, features = ["serde", "std"] } serde = { version = "1.0.219", default-features = false, features = ["alloc", "derive", "rc"] } serde_json = { version = "1.0.143", default-features = false, features = ["raw_value", "std"] } @@ -341,6 +341,7 @@ arc-swap = { version = "1.7", default-features = false, optional = true } async-compression = { version = "0.4.27", default-features = false, features = ["tokio", "gzip", "zstd"], optional = true } apache-avro = { version = "0.16.0", default-features = false, optional = true } arrow = { version = "56.2.0", default-features = false, features = ["ipc"], optional = true } +arrow-schema = { version = "56.2.0", default-features = false, optional = true } axum = { version = "0.6.20", default-features = false } base64 = { workspace = true, optional = true } bloomy = { version = "1.2.0", default-features = false, optional = true } @@ -583,7 +584,7 @@ enrichment-tables-mmdb = ["dep:maxminddb"] enrichment-tables-memory = ["dep:evmap", "dep:evmap-derive", "dep:thread_local"] # Codecs -codecs-arrow = ["vector-lib/arrow"] +codecs-arrow = ["dep:arrow", "dep:arrow-schema", "vector-lib/arrow"] codecs-opentelemetry = ["vector-lib/opentelemetry"] codecs-syslog = ["vector-lib/syslog"] @@ -851,7 +852,7 @@ sinks-azure_blob = ["dep:azure_core", "dep:azure_identity", "dep:azure_storage", sinks-azure_monitor_logs = [] sinks-blackhole = [] sinks-chronicle = [] -sinks-clickhouse = [] +sinks-clickhouse = ["dep:rust_decimal", "codecs-arrow"] sinks-console = [] sinks-databend = ["dep:databend-client"] sinks-datadog_events = [] diff --git a/changelog.d/24074_clickhouse_arrow_format.enhancement.md b/changelog.d/24074_clickhouse_arrow_format.enhancement.md new file mode 100644 index 0000000000000..5612cbfe21e76 --- /dev/null +++ b/changelog.d/24074_clickhouse_arrow_format.enhancement.md @@ -0,0 +1,3 @@ +The `clickhouse` sink now supports the `arrow_stream` format option, enabling high-performance binary data transfer using Apache Arrow IPC. This provides significantly better performance and smaller payload sizes compared to JSON-based formats. 
+
+authors: benjamin-awd
diff --git a/lib/codecs/Cargo.toml b/lib/codecs/Cargo.toml
index 2cb4ae3bbdb35..7a622d52edc2b 100644
--- a/lib/codecs/Cargo.toml
+++ b/lib/codecs/Cargo.toml
@@ -15,9 +15,10 @@ path = "tests/bin/generate-avro-fixtures.rs"
 [dependencies]
 apache-avro = { version = "0.20.0", default-features = false }
 arrow = { version = "56.2.0", default-features = false, features = ["ipc"] }
+async-trait.workspace = true
 bytes.workspace = true
 chrono.workspace = true
-rust_decimal = { version = "1.37", default-features = false, features = ["std"] }
+rust_decimal.workspace = true
 csv-core = { version = "0.1.12", default-features = false }
 derivative.workspace = true
 dyn-clone = { version = "1", default-features = false }
diff --git a/lib/codecs/src/encoding/format/arrow.rs b/lib/codecs/src/encoding/format/arrow.rs
index db4dc491f4cc3..3c2d3863f1fb2 100644
--- a/lib/codecs/src/encoding/format/arrow.rs
+++ b/lib/codecs/src/encoding/format/arrow.rs
@@ -16,6 +16,7 @@ use arrow::{
     ipc::writer::StreamWriter,
     record_batch::RecordBatch,
 };
+use async_trait::async_trait;
 use bytes::{BufMut, Bytes, BytesMut};
 use chrono::{DateTime, Utc};
 use rust_decimal::Decimal;
@@ -25,6 +26,18 @@ use vector_config::configurable_component;
 
 use vector_core::event::{Event, Value};
 
+/// Provides an Arrow schema for encoding.
+///
+/// Sinks can implement this trait to provide custom schema fetching logic.
+#[async_trait]
+pub trait SchemaProvider: Send + Sync + std::fmt::Debug {
+    /// Fetch the Arrow schema from the data store.
+    ///
+    /// This is called during the sink configuration build phase to fetch
+    /// the schema once at startup, rather than at runtime.
+    async fn get_schema(&self) -> Result<arrow::datatypes::Schema, ArrowEncodingError>;
+}
+
 /// Configuration for Arrow IPC stream serialization
 #[configurable_component]
 #[derive(Clone, Default)]
@@ -32,7 +45,7 @@ pub struct ArrowStreamSerializerConfig {
     /// The Arrow schema to use for encoding
     #[serde(skip)]
     #[configurable(derived)]
-    pub schema: Option<Arc<Schema>>,
+    pub schema: Option<Schema>,
 
     /// Allow null values for non-nullable fields in the schema.
     ///
     /// When enabled, missing or incompatible values will be encoded as null even for fields
     /// marked as non-nullable in the Arrow schema. This is useful when working with downstream
     /// systems that can handle null values through defaults, computed columns, or other mechanisms.
     ///
     /// When disabled (default), missing values for non-nullable fields will cause encoding errors,
     /// ensuring all required data is present before sending to the sink.
     #[serde(default)]
-    #[configurable(metadata(docs::examples = true))]
+    #[configurable(derived)]
     pub allow_nullable_fields: bool,
 }
@@ -64,7 +77,7 @@ impl std::fmt::Debug for ArrowStreamSerializerConfig {
 impl ArrowStreamSerializerConfig {
     /// Create a new ArrowStreamSerializerConfig with a schema
-    pub fn new(schema: Arc<Schema>) -> Self {
+    pub fn new(schema: arrow::datatypes::Schema) -> Self {
         Self {
             schema: Some(schema),
             allow_nullable_fields: false,
         }
@@ -91,26 +104,28 @@ pub struct ArrowStreamSerializer {
 impl ArrowStreamSerializer {
     /// Create a new ArrowStreamSerializer with the given configuration
     pub fn new(config: ArrowStreamSerializerConfig) -> Result<Self, vector_common::Error> {
-        let mut schema = config.schema.ok_or_else(|| {
-            vector_common::Error::from(
-                "Arrow serializer requires a schema. Pass a schema or fetch from provider before creating serializer."
-            )
-        })?;
+        let schema = config
+            .schema
+            .ok_or_else(|| vector_common::Error::from("Arrow serializer requires a schema."))?;
 
         // If allow_nullable_fields is enabled, transform the schema once here
         // instead of on every batch encoding
-        if config.allow_nullable_fields {
-            schema = Arc::new(Schema::new_with_metadata(
+        let schema = if config.allow_nullable_fields {
+            Schema::new_with_metadata(
                 schema
                     .fields()
                     .iter()
                     .map(|f| Arc::new(make_field_nullable(f)))
                     .collect::<Vec<_>>(),
                 schema.metadata().clone(),
-            ));
-        }
+            )
+        } else {
+            schema
+        };
 
-        Ok(Self { schema })
+        Ok(Self {
+            schema: Arc::new(schema),
+        })
     }
 }
 
@@ -154,6 +169,13 @@ pub enum ArrowEncodingError {
     #[snafu(display("Schema must be provided before encoding"))]
     NoSchemaProvided,
 
+    /// Failed to fetch schema from provider
+    #[snafu(display("Failed to fetch schema from provider: {}", message))]
+    SchemaFetchError {
+        /// Error message from the provider
+        message: String,
+    },
+
     /// Unsupported Arrow data type for field
     #[snafu(display(
         "Unsupported Arrow data type for field '{}': {:?}",
@@ -1500,13 +1522,9 @@ mod tests {
         let log2 = LogEvent::default();
         let events = vec![Event::Log(log1), Event::Log(log2)];
 
-        let schema = Arc::new(Schema::new(vec![Field::new(
-            "strict_field",
-            DataType::Int64,
-            false,
-        )]));
+        let schema = Schema::new(vec![Field::new("strict_field", DataType::Int64, false)]);
 
-        let mut config = ArrowStreamSerializerConfig::new(Arc::clone(&schema));
+        let mut config = ArrowStreamSerializerConfig::new(schema);
         config.allow_nullable_fields = true;
 
         let mut serializer =
diff --git a/lib/codecs/src/encoding/format/mod.rs b/lib/codecs/src/encoding/format/mod.rs
index 0d21e8b94e25c..ccafb2b969cd7 100644
--- a/lib/codecs/src/encoding/format/mod.rs
+++ b/lib/codecs/src/encoding/format/mod.rs
@@ -23,7 +23,9 @@ mod text;
 use std::fmt::Debug;
 
 #[cfg(feature = "arrow")]
-pub use arrow::{ArrowEncodingError, ArrowStreamSerializer, ArrowStreamSerializerConfig};
+pub use arrow::{
+    ArrowEncodingError, ArrowStreamSerializer, ArrowStreamSerializerConfig, SchemaProvider,
+};
 pub use avro::{AvroSerializer, AvroSerializerConfig, AvroSerializerOptions};
 pub use cef::{CefSerializer, CefSerializerConfig};
 use dyn_clone::DynClone;
diff --git a/lib/codecs/src/encoding/mod.rs b/lib/codecs/src/encoding/mod.rs
index 3fe0baafa8b91..c365bc45da4fc 100644
--- a/lib/codecs/src/encoding/mod.rs
+++ b/lib/codecs/src/encoding/mod.rs
@@ -7,7 +7,9 @@ pub mod framing;
 pub mod serializer;
 pub use chunking::{Chunker, Chunking, GelfChunker};
 #[cfg(feature = "arrow")]
-pub use format::{ArrowEncodingError, ArrowStreamSerializer, ArrowStreamSerializerConfig};
+pub use format::{
+    ArrowEncodingError, ArrowStreamSerializer, ArrowStreamSerializerConfig, SchemaProvider,
+};
 pub use format::{
     AvroSerializer, AvroSerializerConfig, AvroSerializerOptions, CefSerializer,
     CefSerializerConfig, CsvSerializer, CsvSerializerConfig, GelfSerializer, GelfSerializerConfig,
diff --git a/src/sinks/clickhouse/arrow/mod.rs b/src/sinks/clickhouse/arrow/mod.rs
new file mode 100644
index 0000000000000..9ca393833c0d0
--- /dev/null
+++ b/src/sinks/clickhouse/arrow/mod.rs
@@ -0,0 +1,6 @@
+//! Schema fetching and Arrow type mapping for ClickHouse tables.
+
+pub mod parser;
+pub mod schema;
+
+pub use schema::ClickHouseSchemaProvider;
diff --git a/src/sinks/clickhouse/arrow/parser.rs b/src/sinks/clickhouse/arrow/parser.rs
new file mode 100644
index 0000000000000..a13bd823487b5
--- /dev/null
+++ b/src/sinks/clickhouse/arrow/parser.rs
@@ -0,0 +1,647 @@
+//! ClickHouse type parsing and conversion to Arrow types.
+
+use arrow::datatypes::{DataType, TimeUnit};
+
+const DECIMAL32_PRECISION: u8 = 9;
+const DECIMAL64_PRECISION: u8 = 18;
+const DECIMAL128_PRECISION: u8 = 38;
+const DECIMAL256_PRECISION: u8 = 76;
+
+/// Represents a ClickHouse type with its modifiers and nested structure.
+#[derive(Debug, PartialEq, Clone)]
+pub enum ClickHouseType<'a> {
+    /// A primitive type like String, Int64, DateTime, etc.
+    Primitive(&'a str),
+    /// Nullable(T)
+    Nullable(Box<ClickHouseType<'a>>),
+    /// LowCardinality(T)
+    LowCardinality(Box<ClickHouseType<'a>>),
+}
+
+impl<'a> ClickHouseType<'a> {
+    /// Returns true if this type or any of its nested types is Nullable.
+    pub fn is_nullable(&self) -> bool {
+        match self {
+            ClickHouseType::Nullable(_) => true,
+            ClickHouseType::LowCardinality(inner) => inner.is_nullable(),
+            _ => false,
+        }
+    }
+
+    /// Returns the innermost base type, unwrapping all modifiers.
+    /// For example: LowCardinality(Nullable(String)) -> Primitive("String")
+    pub fn base_type(&self) -> &ClickHouseType<'a> {
+        match self {
+            ClickHouseType::Nullable(inner) | ClickHouseType::LowCardinality(inner) => {
+                inner.base_type()
+            }
+            _ => self,
+        }
+    }
+}
+
+/// Parses a ClickHouse type string into a structured representation.
+pub fn parse_ch_type(ty: &str) -> ClickHouseType<'_> {
+    let ty = ty.trim();
+
+    // Recursively strip and parse type modifiers
+    if let Some(inner) = strip_wrapper(ty, "Nullable") {
+        return ClickHouseType::Nullable(Box::new(parse_ch_type(inner)));
+    }
+    if let Some(inner) = strip_wrapper(ty, "LowCardinality") {
+        return ClickHouseType::LowCardinality(Box::new(parse_ch_type(inner)));
+    }
+
+    // Base case: return primitive type for anything without modifiers
+    ClickHouseType::Primitive(ty)
+}
+
+/// Helper function to strip a wrapper from a type string.
+/// Returns the inner content if the type matches the wrapper pattern.
+fn strip_wrapper<'a>(ty: &'a str, wrapper_name: &str) -> Option<&'a str> {
+    ty.strip_prefix(wrapper_name)?
+        .trim_start()
+        .strip_prefix('(')?
+        .strip_suffix(')')
+}
+
+/// Unwraps ClickHouse type modifiers like Nullable() and LowCardinality().
+/// Returns a tuple of (base_type, is_nullable).
+/// For example: "LowCardinality(Nullable(String))" -> ("String", true)
+pub fn unwrap_type_modifiers(ch_type: &str) -> (&str, bool) {
+    let parsed = parse_ch_type(ch_type);
+    let is_nullable = parsed.is_nullable();
+
+    match parsed.base_type() {
+        ClickHouseType::Primitive(base) => (base, is_nullable),
+        _ => (ch_type, is_nullable),
+    }
+}
+
+fn unsupported(ch_type: &str, kind: &str) -> String {
+    format!(
+        "{kind} type '{ch_type}' is not supported. \
+         ClickHouse {kind} types cannot be automatically converted to Arrow format."
+    )
+}
+
+/// Converts a ClickHouse type string to an Arrow DataType.
+/// Returns a tuple of (DataType, is_nullable).
+pub fn clickhouse_type_to_arrow(ch_type: &str) -> Result<(DataType, bool), String> {
+    let (base_type, is_nullable) = unwrap_type_modifiers(ch_type);
+    let (type_name, _) = extract_identifier(base_type);
+
+    let data_type = match type_name {
+        // Numeric
+        "Int8" => DataType::Int8,
+        "Int16" => DataType::Int16,
+        "Int32" => DataType::Int32,
+        "Int64" => DataType::Int64,
+        "UInt8" => DataType::UInt8,
+        "UInt16" => DataType::UInt16,
+        "UInt32" => DataType::UInt32,
+        "UInt64" => DataType::UInt64,
+        "Float32" => DataType::Float32,
+        "Float64" => DataType::Float64,
+        "Bool" => DataType::Boolean,
+        "Decimal" | "Decimal32" | "Decimal64" | "Decimal128" | "Decimal256" => {
+            parse_decimal_type(base_type)?
+ } + + // Strings + "String" | "FixedString" => DataType::Utf8, + + // Date and time types (timezones not currently handled, defaults to UTC) + "Date" | "Date32" => DataType::Date32, + "DateTime" => DataType::Timestamp(TimeUnit::Second, None), + "DateTime64" => parse_datetime64_precision(base_type)?, + + // Unsupported + "Array" => return Err(unsupported(ch_type, "Array")), + "Tuple" => return Err(unsupported(ch_type, "Tuple")), + "Map" => return Err(unsupported(ch_type, "Map")), + + // Unknown + _ => { + return Err(format!( + "Unknown ClickHouse type '{}'. This type cannot be automatically converted.", + type_name + )); + } + }; + + Ok((data_type, is_nullable)) +} + +/// Extracts an identifier from the start of a string. +/// Returns (identifier, remaining_string). +fn extract_identifier(input: &str) -> (&str, &str) { + for (i, c) in input.char_indices() { + if c.is_alphabetic() || c == '_' || (i > 0 && c.is_numeric()) { + continue; + } + return (&input[..i], &input[i..]); + } + (input, "") +} + +/// Parses comma-separated arguments from a parenthesized string. +/// Input: "(arg1, arg2, arg3)" -> Output: Ok(vec!["arg1".to_string(), "arg2".to_string(), "arg3".to_string()]) +/// Returns an error if parentheses are malformed. +fn parse_args(input: &str) -> Result, String> { + let trimmed = input.trim(); + if !trimmed.starts_with('(') || !trimmed.ends_with(')') { + return Err(format!( + "Expected parentheses around arguments in '{}'", + input + )); + } + + let inner = trimmed[1..trimmed.len() - 1].trim(); + if inner.is_empty() { + return Ok(vec![]); + } + + // Split by comma, handling nested parentheses and quotes + let mut args = Vec::new(); + let mut current_arg = String::new(); + let mut depth = 0; + let mut in_quotes = false; + + for c in inner.chars() { + match c { + '\'' if !in_quotes => in_quotes = true, + '\'' if in_quotes => in_quotes = false, + '(' if !in_quotes => depth += 1, + ')' if !in_quotes => depth -= 1, + ',' if depth == 0 && !in_quotes => { + args.push(current_arg.trim().to_string()); + current_arg = String::new(); + continue; + } + _ => {} + } + current_arg.push(c); + } + + if !current_arg.trim().is_empty() { + args.push(current_arg.trim().to_string()); + } + + Ok(args) +} + +/// Parses ClickHouse Decimal types and returns the appropriate Arrow decimal type. +/// ClickHouse formats: +/// - Decimal(P, S) -> generic decimal with precision P and scale S +/// - Decimal32(S) -> precision up to 9, scale S +/// - Decimal64(S) -> precision up to 18, scale S +/// - Decimal128(S) -> precision up to 38, scale S +/// - Decimal256(S) -> precision up to 76, scale S +/// +/// Uses metadata from ClickHouse's system.columns when available, otherwise falls back to parsing the type string. 
+fn parse_decimal_type(ch_type: &str) -> Result<DataType, String> {
+    // Parse from type string
+    let (type_name, args_str) = extract_identifier(ch_type);
+
+    let result = parse_args(args_str).ok().and_then(|args| match type_name {
+        "Decimal" if args.len() == 2 => args[0].parse::<u8>().ok().zip(args[1].parse::<i8>().ok()),
+        "Decimal32" | "Decimal64" | "Decimal128" | "Decimal256" if args.len() == 1 => {
+            args[0].parse::<i8>().ok().map(|scale| {
+                let precision = match type_name {
+                    "Decimal32" => DECIMAL32_PRECISION,
+                    "Decimal64" => DECIMAL64_PRECISION,
+                    "Decimal128" => DECIMAL128_PRECISION,
+                    "Decimal256" => DECIMAL256_PRECISION,
+                    _ => unreachable!(),
+                };
+                (precision, scale)
+            })
+        }
+        _ => None,
+    });
+
+    result
+        .map(|(precision, scale)| {
+            if precision <= DECIMAL128_PRECISION {
+                DataType::Decimal128(precision, scale)
+            } else {
+                DataType::Decimal256(precision, scale)
+            }
+        })
+        .ok_or_else(|| format!("Could not parse Decimal type '{}'.", ch_type))
+}
+
+/// Parses DateTime64 precision and returns the appropriate Arrow timestamp type.
+/// DateTime64(0) -> Second
+/// DateTime64(3) -> Millisecond
+/// DateTime64(6) -> Microsecond
+/// DateTime64(9) -> Nanosecond
+fn parse_datetime64_precision(ch_type: &str) -> Result<DataType, String> {
+    // Parse from type string
+    let (_type_name, args_str) = extract_identifier(ch_type);
+
+    let args = parse_args(args_str).map_err(|e| {
+        format!(
+            "Could not parse DateTime64 arguments from '{}': {}. Expected format: DateTime64(0-9) or DateTime64(0-9, 'timezone')",
+            ch_type, e
+        )
+    })?;
+
+    // DateTime64(precision) or DateTime64(precision, 'timezone')
+    if args.is_empty() {
+        return Err(format!(
+            "DateTime64 type '{}' has no precision argument. Expected format: DateTime64(0-9) or DateTime64(0-9, 'timezone')",
+            ch_type
+        ));
+    }
+
+    // Parse the precision (first argument)
+    match args[0].parse::<u8>() {
+        Ok(0) => Ok(DataType::Timestamp(TimeUnit::Second, None)),
+        Ok(1..=3) => Ok(DataType::Timestamp(TimeUnit::Millisecond, None)),
+        Ok(4..=6) => Ok(DataType::Timestamp(TimeUnit::Microsecond, None)),
+        Ok(7..=9) => Ok(DataType::Timestamp(TimeUnit::Nanosecond, None)),
+        _ => Err(format!(
+            "Unsupported DateTime64 precision in '{}'. Precision must be 0-9",
+            ch_type
+        )),
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    // Helper function to keep the type conversion assertions concise
+    fn convert_type_no_metadata(ch_type: &str) -> Result<(DataType, bool), String> {
+        clickhouse_type_to_arrow(ch_type)
+    }
+
+    #[test]
+    fn test_clickhouse_type_mapping() {
+        assert_eq!(
+            convert_type_no_metadata("String").expect("Failed to convert ClickHouse type to Arrow"),
+            (DataType::Utf8, false)
+        );
+        assert_eq!(
+            convert_type_no_metadata("Int64").expect("Failed to convert ClickHouse type to Arrow"),
+            (DataType::Int64, false)
+        );
+        assert_eq!(
+            convert_type_no_metadata("Float64")
+                .expect("Failed to convert ClickHouse type to Arrow"),
+            (DataType::Float64, false)
+        );
+        assert_eq!(
+            convert_type_no_metadata("Bool").expect("Failed to convert ClickHouse type to Arrow"),
+            (DataType::Boolean, false)
+        );
+        assert_eq!(
+            convert_type_no_metadata("DateTime")
+                .expect("Failed to convert ClickHouse type to Arrow"),
+            (DataType::Timestamp(TimeUnit::Second, None), false)
+        );
+    }
+
+    #[test]
+    fn test_datetime64_precision_mapping() {
+        assert_eq!(
+            convert_type_no_metadata("DateTime64(0)")
+                .expect("Failed to convert ClickHouse type to Arrow"),
+            (DataType::Timestamp(TimeUnit::Second, None), false)
+        );
+        assert_eq!(
+            convert_type_no_metadata("DateTime64(3)")
+                .expect("Failed to convert ClickHouse type to Arrow"),
+            (DataType::Timestamp(TimeUnit::Millisecond, None), false)
+        );
+        assert_eq!(
+            convert_type_no_metadata("DateTime64(6)")
+                .expect("Failed to convert ClickHouse type to Arrow"),
+            (DataType::Timestamp(TimeUnit::Microsecond, None), false)
+        );
+        assert_eq!(
+            convert_type_no_metadata("DateTime64(9)")
+                .expect("Failed to convert ClickHouse type to Arrow"),
+            (DataType::Timestamp(TimeUnit::Nanosecond, None), false)
+        );
+        // Test with timezones
+        assert_eq!(
+            convert_type_no_metadata("DateTime64(9, 'UTC')")
+                .expect("Failed to convert ClickHouse type to Arrow"),
+            (DataType::Timestamp(TimeUnit::Nanosecond, None), false)
+        );
+        assert_eq!(
+            convert_type_no_metadata("DateTime64(6, 'UTC')")
+                .expect("Failed to convert ClickHouse type to Arrow"),
+            (DataType::Timestamp(TimeUnit::Microsecond, None), false)
+        );
+        assert_eq!(
+            convert_type_no_metadata("DateTime64(9, 'America/New_York')")
+                .expect("Failed to convert ClickHouse type to Arrow"),
+            (DataType::Timestamp(TimeUnit::Nanosecond, None), false)
+        );
+        // Test edge cases for precision ranges
+        assert_eq!(
+            convert_type_no_metadata("DateTime64(1)")
+                .expect("Failed to convert ClickHouse type to Arrow"),
+            (DataType::Timestamp(TimeUnit::Millisecond, None), false)
+        );
+        assert_eq!(
+            convert_type_no_metadata("DateTime64(4)")
+                .expect("Failed to convert ClickHouse type to Arrow"),
+            (DataType::Timestamp(TimeUnit::Microsecond, None), false)
+        );
+        assert_eq!(
+            convert_type_no_metadata("DateTime64(7)")
+                .expect("Failed to convert ClickHouse type to Arrow"),
+            (DataType::Timestamp(TimeUnit::Nanosecond, None), false)
+        );
+    }
+
+    #[test]
+    fn test_nullable_type_mapping() {
+        // Non-nullable types
+        assert_eq!(
+            convert_type_no_metadata("String").expect("Failed to convert ClickHouse type to Arrow"),
+            (DataType::Utf8, false)
+        );
+        assert_eq!(
+            convert_type_no_metadata("Int64").expect("Failed to convert ClickHouse type to Arrow"),
+            (DataType::Int64, false)
+        );
+
+        // Nullable types
+        assert_eq!(
+            convert_type_no_metadata("Nullable(String)")
+                .expect("Failed to convert ClickHouse type to Arrow"),
+            (DataType::Utf8, true)
+        );
+        assert_eq!(
convert_type_no_metadata("Nullable(Int64)") + .expect("Failed to convert ClickHouse type to Arrow"), + (DataType::Int64, true) + ); + assert_eq!( + convert_type_no_metadata("Nullable(Float64)") + .expect("Failed to convert ClickHouse type to Arrow"), + (DataType::Float64, true) + ); + } + + #[test] + fn test_lowcardinality_type_mapping() { + assert_eq!( + convert_type_no_metadata("LowCardinality(String)") + .expect("Failed to convert ClickHouse type to Arrow"), + (DataType::Utf8, false) + ); + assert_eq!( + convert_type_no_metadata("LowCardinality(FixedString(10))") + .expect("Failed to convert ClickHouse type to Arrow"), + (DataType::Utf8, false) + ); + // Nullable + LowCardinality + assert_eq!( + convert_type_no_metadata("LowCardinality(Nullable(String))") + .expect("Failed to convert ClickHouse type to Arrow"), + (DataType::Utf8, true) + ); + } + + #[test] + fn test_decimal_type_mapping() { + // Generic Decimal(P, S) + assert_eq!( + convert_type_no_metadata("Decimal(10, 2)") + .expect("Failed to convert ClickHouse type to Arrow"), + (DataType::Decimal128(10, 2), false) + ); + assert_eq!( + convert_type_no_metadata("Decimal(38, 6)") + .expect("Failed to convert ClickHouse type to Arrow"), + (DataType::Decimal128(38, 6), false) + ); + assert_eq!( + convert_type_no_metadata("Decimal(50, 10)") + .expect("Failed to convert ClickHouse type to Arrow"), + (DataType::Decimal256(50, 10), false) + ); + + // Generic Decimal without spaces and with spaces + assert_eq!( + convert_type_no_metadata("Decimal(10,2)") + .expect("Failed to convert ClickHouse type to Arrow"), + (DataType::Decimal128(10, 2), false) + ); + assert_eq!( + convert_type_no_metadata("Decimal( 18 , 6 )") + .expect("Failed to convert ClickHouse type to Arrow"), + (DataType::Decimal128(18, 6), false) + ); + + // Decimal32(S) - precision up to 9 + assert_eq!( + convert_type_no_metadata("Decimal32(2)") + .expect("Failed to convert ClickHouse type to Arrow"), + (DataType::Decimal128(9, 2), false) + ); + assert_eq!( + convert_type_no_metadata("Decimal32(4)") + .expect("Failed to convert ClickHouse type to Arrow"), + (DataType::Decimal128(9, 4), false) + ); + + // Decimal64(S) - precision up to 18 + assert_eq!( + convert_type_no_metadata("Decimal64(4)") + .expect("Failed to convert ClickHouse type to Arrow"), + (DataType::Decimal128(18, 4), false) + ); + assert_eq!( + convert_type_no_metadata("Decimal64(8)") + .expect("Failed to convert ClickHouse type to Arrow"), + (DataType::Decimal128(18, 8), false) + ); + + // Decimal128(S) - precision up to 38 + assert_eq!( + convert_type_no_metadata("Decimal128(10)") + .expect("Failed to convert ClickHouse type to Arrow"), + (DataType::Decimal128(38, 10), false) + ); + + // Decimal256(S) - precision up to 76 + assert_eq!( + convert_type_no_metadata("Decimal256(20)") + .expect("Failed to convert ClickHouse type to Arrow"), + (DataType::Decimal256(76, 20), false) + ); + + // With Nullable wrapper + assert_eq!( + convert_type_no_metadata("Nullable(Decimal(18, 6))") + .expect("Failed to convert ClickHouse type to Arrow"), + (DataType::Decimal128(18, 6), true) + ); + } + + #[test] + fn test_extract_identifier() { + assert_eq!(extract_identifier("Decimal(10, 2)"), ("Decimal", "(10, 2)")); + assert_eq!(extract_identifier("DateTime64(3)"), ("DateTime64", "(3)")); + assert_eq!(extract_identifier("Int32"), ("Int32", "")); + assert_eq!( + extract_identifier("LowCardinality(String)"), + ("LowCardinality", "(String)") + ); + assert_eq!(extract_identifier("Decimal128(10)"), ("Decimal128", "(10)")); + } + + 
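// Illustrative addition (not in the original patch): a direct check of the public
+    // `unwrap_type_modifiers` helper, which combines `parse_ch_type`, `is_nullable`,
+    // and `base_type`.
+    #[test]
+    fn test_unwrap_type_modifiers() {
+        assert_eq!(unwrap_type_modifiers("String"), ("String", false));
+        assert_eq!(unwrap_type_modifiers("Nullable(Int64)"), ("Int64", true));
+        assert_eq!(
+            unwrap_type_modifiers("LowCardinality(Nullable(String))"),
+            ("String", true)
+        );
+    }
+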
+    #[test]
+    fn test_parse_args() {
+        // Simple cases
+        assert_eq!(
+            parse_args("(10, 2)").unwrap(),
+            vec!["10".to_string(), "2".to_string()]
+        );
+        assert_eq!(parse_args("(3)").unwrap(), vec!["3".to_string()]);
+        assert_eq!(parse_args("()").unwrap(), Vec::<String>::new());
+
+        // With spaces
+        assert_eq!(
+            parse_args("( 10 , 2 )").unwrap(),
+            vec!["10".to_string(), "2".to_string()]
+        );
+
+        // With nested parentheses
+        assert_eq!(
+            parse_args("(Nullable(String))").unwrap(),
+            vec!["Nullable(String)".to_string()]
+        );
+        assert_eq!(
+            parse_args("(Array(Int32), String)").unwrap(),
+            vec!["Array(Int32)".to_string(), "String".to_string()]
+        );
+
+        // With quotes
+        assert_eq!(
+            parse_args("(3, 'UTC')").unwrap(),
+            vec!["3".to_string(), "'UTC'".to_string()]
+        );
+        assert_eq!(
+            parse_args("(9, 'America/New_York')").unwrap(),
+            vec!["9".to_string(), "'America/New_York'".to_string()]
+        );
+
+        // Complex nested case
+        assert_eq!(
+            parse_args("(Tuple(Int32, String), Array(Float64))").unwrap(),
+            vec![
+                "Tuple(Int32, String)".to_string(),
+                "Array(Float64)".to_string()
+            ]
+        );
+
+        // Error cases
+        assert!(parse_args("10, 2").is_err()); // Missing parentheses
+        assert!(parse_args("(10, 2").is_err()); // Missing closing paren
+    }
+
+    #[test]
+    fn test_array_type_not_supported() {
+        // Array types should return an error
+        let result = convert_type_no_metadata("Array(Int32)");
+        assert!(result.is_err());
+        let err = result.unwrap_err();
+        assert!(err.contains("Array type"));
+        assert!(err.contains("not supported"));
+    }
+
+    #[test]
+    fn test_tuple_type_not_supported() {
+        // Tuple types should return an error
+        let result = convert_type_no_metadata("Tuple(String, Int64)");
+        assert!(result.is_err());
+        let err = result.unwrap_err();
+        assert!(err.contains("Tuple type"));
+        assert!(err.contains("not supported"));
+    }
+
+    #[test]
+    fn test_map_type_not_supported() {
+        // Map types should return an error
+        let result = convert_type_no_metadata("Map(String, Int64)");
+        assert!(result.is_err());
+        let err = result.unwrap_err();
+        assert!(err.contains("Map type"));
+        assert!(err.contains("not supported"));
+    }
+
+    #[test]
+    fn test_unknown_type_fails() {
+        // Unknown types should return an error
+        let result = convert_type_no_metadata("UnknownType");
+        assert!(result.is_err());
+        let err = result.unwrap_err();
+        assert!(err.contains("Unknown ClickHouse type"));
+    }
+
+    #[test]
+    fn test_parse_ch_type_primitives() {
+        assert_eq!(parse_ch_type("String"), ClickHouseType::Primitive("String"));
+        assert_eq!(parse_ch_type("Int64"), ClickHouseType::Primitive("Int64"));
+        assert_eq!(
+            parse_ch_type("DateTime64(3)"),
+            ClickHouseType::Primitive("DateTime64(3)")
+        );
+    }
+
+    #[test]
+    fn test_parse_ch_type_nullable() {
+        assert_eq!(
+            parse_ch_type("Nullable(String)"),
+            ClickHouseType::Nullable(Box::new(ClickHouseType::Primitive("String")))
+        );
+        assert_eq!(
+            parse_ch_type("Nullable(Int64)"),
+            ClickHouseType::Nullable(Box::new(ClickHouseType::Primitive("Int64")))
+        );
+    }
+
+    #[test]
+    fn test_parse_ch_type_lowcardinality() {
+        assert_eq!(
+            parse_ch_type("LowCardinality(String)"),
+            ClickHouseType::LowCardinality(Box::new(ClickHouseType::Primitive("String")))
+        );
+        assert_eq!(
+            parse_ch_type("LowCardinality(Nullable(String))"),
+            ClickHouseType::LowCardinality(Box::new(ClickHouseType::Nullable(Box::new(
+                ClickHouseType::Primitive("String")
+            ))))
+        );
+    }
+
+    #[test]
+    fn test_parse_ch_type_is_nullable() {
+        assert!(!parse_ch_type("String").is_nullable());
+        assert!(parse_ch_type("Nullable(String)").is_nullable());
+        assert!(parse_ch_type("LowCardinality(Nullable(String))").is_nullable());
+        assert!(!parse_ch_type("LowCardinality(String)").is_nullable());
+    }
+
+    #[test]
+    fn test_parse_ch_type_base_type() {
+        let parsed = parse_ch_type("LowCardinality(Nullable(String))");
+        assert_eq!(parsed.base_type(), &ClickHouseType::Primitive("String"));
+
+        let parsed = parse_ch_type("Nullable(Int64)");
+        assert_eq!(parsed.base_type(), &ClickHouseType::Primitive("Int64"));
+
+        let parsed = parse_ch_type("String");
+        assert_eq!(parsed.base_type(), &ClickHouseType::Primitive("String"));
+    }
+}
diff --git a/src/sinks/clickhouse/arrow/schema.rs b/src/sinks/clickhouse/arrow/schema.rs
new file mode 100644
index 0000000000000..f2359ca5f3519
--- /dev/null
+++ b/src/sinks/clickhouse/arrow/schema.rs
@@ -0,0 +1,227 @@
+//! Schema fetching and Arrow schema construction for ClickHouse tables.
+
+use arrow::datatypes::{Field, Schema};
+use async_trait::async_trait;
+use http::{Request, StatusCode};
+use hyper::Body;
+use serde::Deserialize;
+use vector_lib::codecs::encoding::format::{ArrowEncodingError, SchemaProvider};
+
+use crate::http::{Auth, HttpClient};
+
+use super::parser::clickhouse_type_to_arrow;
+
+#[derive(Debug, Deserialize)]
+struct ColumnInfo {
+    name: String,
+    #[serde(rename = "type")]
+    column_type: String,
+}
+
+/// URL-encodes a string for use in HTTP query parameters.
+fn url_encode(s: &str) -> String {
+    percent_encoding::utf8_percent_encode(s, percent_encoding::NON_ALPHANUMERIC).to_string()
+}
+
+/// Fetches the schema for a ClickHouse table and converts it to an Arrow schema.
+pub async fn fetch_table_schema(
+    client: &HttpClient,
+    endpoint: &str,
+    database: &str,
+    table: &str,
+    auth: Option<&Auth>,
+) -> crate::Result<Schema> {
+    let query = "SELECT name, type \
+                 FROM system.columns \
+                 WHERE database = {db:String} AND table = {tbl:String} \
+                 ORDER BY position \
+                 FORMAT JSONEachRow";
+
+    // Build URI with query and parameters
+    let uri = format!(
+        "{}?query={}&param_db={}&param_tbl={}",
+        endpoint,
+        url_encode(query),
+        url_encode(database),
+        url_encode(table)
+    );
+    let mut request = Request::get(&uri).body(Body::empty()).unwrap();
+
+    if let Some(auth) = auth {
+        auth.apply(&mut request);
+    }
+
+    let response = client.send(request).await?;
+
+    match response.status() {
+        StatusCode::OK => {
+            let body_bytes = http_body::Body::collect(response.into_body())
+                .await?
+                .to_bytes();
+            let body_str = String::from_utf8(body_bytes.into())
+                .map_err(|e| format!("Failed to parse response as UTF-8: {}", e))?;
+
+            parse_schema_from_response(&body_str)
+        }
+        status => Err(format!("Failed to fetch schema from ClickHouse: HTTP {}", status).into()),
+    }
+}
+
+/// Parses the JSON response from ClickHouse and builds an Arrow schema.
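+///
+/// The response is expected to be `JSONEachRow` output from `system.columns`, one JSON
+/// object per line, e.g. `{"name":"id","type":"Int64"}`.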
+fn parse_schema_from_response(response: &str) -> crate::Result<Schema> {
+    let mut columns: Vec<ColumnInfo> = Vec::new();
+
+    for line in response.lines() {
+        if line.trim().is_empty() {
+            continue;
+        }
+
+        let column: ColumnInfo = serde_json::from_str(line)
+            .map_err(|e| format!("Failed to parse column info: {}", e))?;
+        columns.push(column);
+    }
+
+    if columns.is_empty() {
+        return Err("No columns found in table schema".into());
+    }
+
+    let mut fields = Vec::new();
+    for column in columns {
+        let (arrow_type, nullable) = clickhouse_type_to_arrow(&column.column_type)
+            .map_err(|e| format!("Failed to convert column '{}': {}", column.name, e))?;
+        fields.push(Field::new(&column.name, arrow_type, nullable));
+    }
+
+    Ok(Schema::new(fields))
+}
+
+/// Schema provider implementation for ClickHouse tables.
+#[derive(Clone, Debug)]
+pub struct ClickHouseSchemaProvider {
+    client: HttpClient,
+    endpoint: String,
+    database: String,
+    table: String,
+    auth: Option<Auth>,
+}
+
+impl ClickHouseSchemaProvider {
+    /// Create a new ClickHouse schema provider.
+    pub const fn new(
+        client: HttpClient,
+        endpoint: String,
+        database: String,
+        table: String,
+        auth: Option<Auth>,
+    ) -> Self {
+        Self {
+            client,
+            endpoint,
+            database,
+            table,
+            auth,
+        }
+    }
+}
+
+#[async_trait]
+impl SchemaProvider for ClickHouseSchemaProvider {
+    async fn get_schema(&self) -> Result<Schema, ArrowEncodingError> {
+        fetch_table_schema(
+            &self.client,
+            &self.endpoint,
+            &self.database,
+            &self.table,
+            self.auth.as_ref(),
+        )
+        .await
+        .map_err(|e| ArrowEncodingError::SchemaFetchError {
+            message: e.to_string(),
+        })
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use arrow::datatypes::{DataType, TimeUnit};
+
+    #[test]
+    fn test_parse_schema() {
+        let response = r#"{"name":"id","type":"Int64"}
+{"name":"message","type":"String"}
+{"name":"timestamp","type":"DateTime"}
+"#;
+
+        let schema = parse_schema_from_response(response).unwrap();
+        assert_eq!(schema.fields().len(), 3);
+        assert_eq!(schema.field(0).name(), "id");
+        assert_eq!(schema.field(0).data_type(), &DataType::Int64);
+        assert_eq!(schema.field(1).name(), "message");
+        assert_eq!(schema.field(1).data_type(), &DataType::Utf8);
+        assert_eq!(schema.field(2).name(), "timestamp");
+        assert_eq!(
+            schema.field(2).data_type(),
+            &DataType::Timestamp(TimeUnit::Second, None)
+        );
+    }
+
+    #[test]
+    fn test_parse_schema_with_type_parameters() {
+        // Test that type string parsing works for types with parameters
+        let response = r#"{"name":"bytes_sent","type":"Decimal(18, 2)"}
+{"name":"timestamp","type":"DateTime64(6)"}
+{"name":"duration_ms","type":"Decimal32(4)"}
+"#;
+
+        let schema = parse_schema_from_response(response).unwrap();
+        assert_eq!(schema.fields().len(), 3);
+
+        // Check Decimal parsed from type string
+        assert_eq!(schema.field(0).name(), "bytes_sent");
+        assert_eq!(schema.field(0).data_type(), &DataType::Decimal128(18, 2));
+
+        // Check DateTime64 parsed from type string
+        assert_eq!(schema.field(1).name(), "timestamp");
+        assert_eq!(
+            schema.field(1).data_type(),
+            &DataType::Timestamp(TimeUnit::Microsecond, None)
+        );
+
+        // Check Decimal32 parsed from type string
+        assert_eq!(schema.field(2).name(), "duration_ms");
+        assert_eq!(schema.field(2).data_type(), &DataType::Decimal128(9, 4));
+    }
+
+    #[test]
+    fn test_schema_field_ordering() {
+        let response = r#"{"name":"timestamp","type":"DateTime64(3)"}
+{"name":"host","type":"String"}
+{"name":"message","type":"String"}
+{"name":"id","type":"Int64"}
+{"name":"score","type":"Float64"}
+{"name":"active","type":"Bool"}
+{"name":"name","type":"String"}
+"#;
+
+        let schema = parse_schema_from_response(response).unwrap();
+        assert_eq!(schema.fields().len(), 7);
+
+        assert_eq!(schema.field(0).name(), "timestamp");
+        assert_eq!(schema.field(1).name(), "host");
+        assert_eq!(schema.field(2).name(), "message");
+        assert_eq!(schema.field(3).name(), "id");
+        assert_eq!(schema.field(4).name(), "score");
+        assert_eq!(schema.field(5).name(), "active");
+        assert_eq!(schema.field(6).name(), "name");
+
+        assert_eq!(
+            schema.field(0).data_type(),
+            &DataType::Timestamp(TimeUnit::Millisecond, None)
+        );
+        assert_eq!(schema.field(1).data_type(), &DataType::Utf8);
+        assert_eq!(schema.field(3).data_type(), &DataType::Int64);
+        assert_eq!(schema.field(4).data_type(), &DataType::Float64);
+        assert_eq!(schema.field(5).data_type(), &DataType::Boolean);
+    }
+}
diff --git a/src/sinks/clickhouse/config.rs b/src/sinks/clickhouse/config.rs
index c33bfdbac5eda..6ff081e9aafa0 100644
--- a/src/sinks/clickhouse/config.rs
+++ b/src/sinks/clickhouse/config.rs
@@ -4,7 +4,8 @@ use std::fmt;
 
 use http::{Request, StatusCode, Uri};
 use hyper::Body;
-use vector_lib::codecs::{JsonSerializerConfig, NewlineDelimitedEncoderConfig, encoding::Framer};
+use vector_lib::codecs::encoding::format::SchemaProvider;
+use vector_lib::codecs::encoding::{ArrowStreamSerializerConfig, BatchSerializerConfig};
 
 use super::{
     request_builder::ClickhouseRequestBuilder,
@@ -39,6 +40,10 @@ pub enum Format {
 
     /// JSONAsString.
     JsonAsString,
+
+    /// ArrowStream (beta).
+    #[configurable(metadata(status = "beta"))]
+    ArrowStream,
 }
 
 impl fmt::Display for Format {
@@ -47,6 +52,7 @@ impl fmt::Display for Format {
             Format::JsonEachRow => write!(f, "JSONEachRow"),
             Format::JsonAsObject => write!(f, "JSONAsObject"),
             Format::JsonAsString => write!(f, "JSONAsString"),
+            Format::ArrowStream => write!(f, "ArrowStream"),
         }
     }
 }
@@ -95,6 +101,14 @@ pub struct ClickhouseConfig {
     #[serde(default, skip_serializing_if = "crate::serde::is_default")]
     pub encoding: Transformer,
 
+    /// The batch encoding configuration for encoding events in batches.
+    ///
+    /// When specified, events are encoded together as a single batch.
+    /// This is mutually exclusive with per-event encoding based on the `format` field.
+    #[configurable(derived)]
+    #[serde(default)]
+    pub batch_encoding: Option<BatchSerializerConfig>,
+
     #[configurable(derived)]
     #[serde(default)]
     pub batch: BatchConfig,
@@ -215,15 +229,14 @@ impl SinkConfig for ClickhouseConfig {
                 .expect("'default' should be a valid template")
         });
 
+        // Resolve the encoding strategy (format + encoder) based on configuration
+        let (format, encoder_kind) = self
+            .resolve_strategy(&client, &endpoint, &database, auth.as_ref())
+            .await?;
+
         let request_builder = ClickhouseRequestBuilder {
             compression: self.compression,
-            encoding: (
-                self.encoding.clone(),
-                Encoder::<Framer>::new(
-                    NewlineDelimitedEncoderConfig.build().into(),
-                    JsonSerializerConfig::default().build().into(),
-                ),
-            ),
+            encoder: (self.encoding.clone(), encoder_kind),
         };
 
         let sink = ClickhouseSink::new(
@@ -231,7 +244,7 @@ impl SinkConfig for ClickhouseConfig {
             service,
             database,
             self.table.clone(),
-            self.format,
+            format,
             request_builder,
         );
 
@@ -249,6 +262,119 @@ impl SinkConfig for ClickhouseConfig {
     }
 }
 
+impl ClickhouseConfig {
+    /// Resolves the encoding strategy (format + encoder) based on configuration.
+    ///
+    /// This method determines the appropriate ClickHouse format and Vector encoder
+    /// based on the user's configuration, ensuring they are consistent.
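+    ///
+    /// When `batch_encoding` is set, `format` must be `arrow_stream` and the table schema
+    /// is fetched from ClickHouse at build time; otherwise events fall back to per-event
+    /// newline-delimited JSON encoding.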
+    async fn resolve_strategy(
+        &self,
+        client: &HttpClient,
+        endpoint: &Uri,
+        database: &Template,
+        auth: Option<&Auth>,
+    ) -> crate::Result<(Format, crate::codecs::EncoderKind)> {
+        use crate::codecs::EncoderKind;
+        use vector_lib::codecs::{
+            JsonSerializerConfig, NewlineDelimitedEncoderConfig, encoding::Framer,
+        };
+
+        if let Some(batch_encoding) = &self.batch_encoding {
+            use crate::codecs::{BatchEncoder, BatchSerializer};
+
+            // Validate that batch_encoding is only compatible with ArrowStream format
+            if self.format != Format::ArrowStream {
+                return Err(format!(
+                    "'batch_encoding' is only compatible with 'format: arrow_stream'. Found 'format: {}'.",
+                    self.format
+                )
+                .into());
+            }
+
+            let mut arrow_config = match batch_encoding {
+                BatchSerializerConfig::ArrowStream(config) => config.clone(),
+            };
+
+            self.resolve_arrow_schema(
+                client,
+                endpoint.to_string(),
+                database,
+                auth,
+                &mut arrow_config,
+            )
+            .await?;
+
+            let resolved_batch_config = BatchSerializerConfig::ArrowStream(arrow_config);
+            let arrow_serializer = resolved_batch_config.build()?;
+            let batch_serializer = BatchSerializer::Arrow(arrow_serializer);
+            let encoder = EncoderKind::Batch(BatchEncoder::new(batch_serializer));
+
+            return Ok((Format::ArrowStream, encoder));
+        }
+
+        let encoder = EncoderKind::Framed(Box::new(Encoder::<Framer>::new(
+            NewlineDelimitedEncoderConfig.build().into(),
+            JsonSerializerConfig::default().build().into(),
+        )));
+
+        Ok((self.format, encoder))
+    }
+
+    async fn resolve_arrow_schema(
+        &self,
+        client: &HttpClient,
+        endpoint: String,
+        database: &Template,
+        auth: Option<&Auth>,
+        config: &mut ArrowStreamSerializerConfig,
+    ) -> crate::Result<()> {
+        use super::arrow;
+
+        if self.table.is_dynamic() || database.is_dynamic() {
+            return Err(
+                "Arrow codec requires a static table and database. Dynamic schema inference is not supported."
+                    .into(),
+            );
+        }
+
+        let table_str = self.table.get_ref();
+        let database_str = database.get_ref();
+
+        debug!(
+            "Fetching schema for table {}.{} at startup.",
+            database_str, table_str
+        );
+
+        let provider = arrow::ClickHouseSchemaProvider::new(
+            client.clone(),
+            endpoint,
+            database_str.to_string(),
+            table_str.to_string(),
+            auth.cloned(),
+        );
+
+        let schema = provider.get_schema().await.map_err(|e| {
+            format!(
+                "Failed to fetch schema for {}.{}: {}.",
+                database_str, table_str, e
+            )
+        })?;
+
+        config.schema = Some(schema);
+
+        debug!(
+            "Successfully fetched Arrow schema with {} fields.",
+            config
+                .schema
+                .as_ref()
+                .map(|s| s.fields().len())
+                .unwrap_or(0)
+        );
+
+        Ok(())
+    }
+}
+
 fn get_healthcheck_uri(endpoint: &Uri) -> String {
     let mut uri = endpoint.to_string();
     if !uri.ends_with('/') {
@@ -277,6 +403,7 @@ async fn healthcheck(client: HttpClient, endpoint: Uri, auth: Option<Auth>) -> c
 #[cfg(test)]
 mod tests {
     use super::*;
+    use vector_lib::codecs::encoding::ArrowStreamSerializerConfig;
 
     #[test]
     fn generate_config() {
@@ -298,4 +425,82 @@ mod tests {
             "http://localhost:8123/path/?query=SELECT%201"
         );
     }
+
+    /// Helper to create a minimal ClickhouseConfig for testing
+    fn create_test_config(
+        format: Format,
+        batch_encoding: Option<BatchSerializerConfig>,
+    ) -> ClickhouseConfig {
+        ClickhouseConfig {
+            endpoint: "http://localhost:8123".parse::<http::Uri>().unwrap().into(),
+            table: "test_table".try_into().unwrap(),
+            database: Some("test_db".try_into().unwrap()),
+            format,
+            batch_encoding,
+            ..Default::default()
+        }
+    }
+
+    #[tokio::test]
+    async fn test_format_selection_with_batch_encoding() {
+        use crate::http::HttpClient;
+        use crate::tls::TlsSettings;
+
+        // Create minimal dependencies for resolve_strategy
+        let tls = TlsSettings::default();
+        let client = HttpClient::new(tls, &Default::default()).unwrap();
+        let endpoint: http::Uri = "http://localhost:8123".parse().unwrap();
+        let database: Template = "test_db".try_into().unwrap();
+
+        // Test incompatible formats - should all return errors
+        let incompatible_formats = vec![
+            (Format::JsonEachRow, "json_each_row"),
+            (Format::JsonAsObject, "json_as_object"),
+            (Format::JsonAsString, "json_as_string"),
+        ];
+
+        for (format, format_name) in incompatible_formats {
+            let config = create_test_config(
+                format,
+                Some(BatchSerializerConfig::ArrowStream(
+                    ArrowStreamSerializerConfig::default(),
+                )),
+            );
+
+            let result = config
+                .resolve_strategy(&client, &endpoint, &database, None)
+                .await;
+
+            assert!(
+                result.is_err(),
+                "Expected error for format {} with batch_encoding, but got success",
+                format_name
+            );
+        }
+    }
+
+    #[test]
+    fn test_format_selection_without_batch_encoding() {
+        // When batch_encoding is None, the configured format should be used
+        let configs = vec![
+            Format::JsonEachRow,
+            Format::JsonAsObject,
+            Format::JsonAsString,
+            Format::ArrowStream,
+        ];
+
+        for format in configs {
+            let config = create_test_config(format, None);
+
+            assert!(
+                config.batch_encoding.is_none(),
+                "batch_encoding should be None for format {:?}",
+                format
+            );
+            assert_eq!(
+                config.format, format,
+                "format should match configured value"
+            );
+        }
+    }
 }
diff --git a/src/sinks/clickhouse/integration_tests.rs b/src/sinks/clickhouse/integration_tests.rs
index fe7a10226ac60..3798595708b41 100644
--- a/src/sinks/clickhouse/integration_tests.rs
+++ b/src/sinks/clickhouse/integration_tests.rs
@@ -16,6 +16,7 @@ use serde::Deserialize;
 use serde_json::Value;
 use tokio::time::{Duration, timeout};
 use vector_lib::{
+    codecs::encoding::BatchSerializerConfig,
     event::{BatchNotifier, BatchStatus, BatchStatusReceiver, Event, LogEvent},
     lookup::PathPrefix,
 };
@@ -468,3 +469,139 @@ struct Stats {
     elapsed: f64,
     rows_read: usize,
 }
+
+#[tokio::test]
+async fn insert_events_arrow_format() {
+    trace_init();
+
+    let table = random_table_name();
+    let host = clickhouse_address();
+
+    let mut batch = BatchConfig::default();
+    batch.max_events = Some(5);
+
+    let config = ClickhouseConfig {
+        endpoint: host.parse().unwrap(),
+        table: table.clone().try_into().unwrap(),
+        compression: Compression::None,
+        format: crate::sinks::clickhouse::config::Format::ArrowStream,
+        batch_encoding: Some(BatchSerializerConfig::ArrowStream(Default::default())),
+        batch,
+        request: TowerRequestConfig {
+            retry_attempts: 1,
+            ..Default::default()
+        },
+        ..Default::default()
+    };
+
+    let client = ClickhouseClient::new(host.clone());
+
+    client
+        .create_table(
+            &table,
+            "host String, timestamp DateTime64(3), message String, count Int64",
+        )
+        .await;
+
+    let (sink, _hc) = config.build(SinkContext::default()).await.unwrap();
+
+    let mut events: Vec<Event> = Vec::new();
+    for i in 0..5 {
+        let mut event = LogEvent::from(format!("log message {}", i));
+        event.insert("host", format!("host{}.example.com", i));
+        event.insert("count", i as i64);
+        events.push(event.into());
+    }
+
+    run_and_assert_sink_compliance(sink, stream::iter(events), &SINK_TAGS).await;
+
+    let output = client.select_all(&table).await;
+    assert_eq!(5, output.rows);
+
+    // Verify fields exist and are correctly typed
+    for row in output.data.iter() {
+        assert!(row.get("host").and_then(|v| v.as_str()).is_some());
+        assert!(row.get("message").and_then(|v| v.as_str()).is_some());
+        assert!(
+            row.get("count")
+                .and_then(|v| v.as_str())
+                .and_then(|s| s.parse::<i64>().ok())
+                .is_some()
+        );
+    }
+}
+
+#[tokio::test]
+async fn insert_events_arrow_with_schema_fetching() {
+    trace_init();
+
+    let table = random_table_name();
+    let host = clickhouse_address();
+
+    let mut batch = BatchConfig::default();
+    batch.max_events = Some(3);
+
+    let client = ClickhouseClient::new(host.clone());
+
+    // Create table with specific typed columns including various data types
+    // Include standard Vector log fields: host, timestamp, message
+    client
+        .create_table(
+            &table,
+            "host String, timestamp DateTime64(3), message String, id Int64, name String, score Float64, active Bool",
+        )
+        .await;
+
+    let config = ClickhouseConfig {
+        endpoint: host.parse().unwrap(),
+        table: table.clone().try_into().unwrap(),
+        compression: Compression::None,
+        format: crate::sinks::clickhouse::config::Format::ArrowStream,
+        batch_encoding: Some(BatchSerializerConfig::ArrowStream(Default::default())),
+        batch,
+        request: TowerRequestConfig {
+            retry_attempts: 1,
+            ..Default::default()
+        },
+        ..Default::default()
+    };
+
+    // Building the sink should fetch the schema from ClickHouse
+    let (sink, _hc) = config.build(SinkContext::default()).await.unwrap();
+
+    // Create events with various types that should match the schema
+    let mut events: Vec<Event> = Vec::new();
+    for i in 0..3 {
+        let mut event = LogEvent::from(format!("Test message {}", i));
+        event.insert("host", format!("host{}.example.com", i));
+        event.insert("id", i as i64);
+        event.insert("name", format!("user_{}", i));
+        event.insert("score", 95.5 + i as f64);
+        event.insert("active", i % 2 == 0);
+        events.push(event.into());
+    }
+
+    run_and_assert_sink_compliance(sink, stream::iter(events), &SINK_TAGS).await;
+
+    let output = client.select_all(&table).await;
+    assert_eq!(3, output.rows);
+
+    // Verify all fields exist and have the correct types
+    for row in output.data.iter() {
+        // Check standard Vector fields exist
+        assert!(row.get("host").and_then(|v| v.as_str()).is_some());
+        assert!(row.get("message").and_then(|v| v.as_str()).is_some());
+        assert!(row.get("timestamp").is_some());
+
+        // Check custom fields have correct types
+        assert!(
+            row.get("id")
+                .and_then(|v| v.as_str())
+                .and_then(|s| s.parse::<i64>().ok())
+                .is_some()
+        );
+        assert!(row.get("name").and_then(|v| v.as_str()).is_some());
+        assert!(row.get("score").and_then(|v| v.as_f64()).is_some());
+        assert!(row.get("active").and_then(|v| v.as_bool()).is_some());
+    }
+}
diff --git a/src/sinks/clickhouse/mod.rs b/src/sinks/clickhouse/mod.rs
index 3a578041bf533..4b38834915379 100644
--- a/src/sinks/clickhouse/mod.rs
+++ b/src/sinks/clickhouse/mod.rs
@@ -9,6 +9,7 @@
 //!
 //! This sink only supports logs for now but could support metrics and traces as well in the future.
 
+mod arrow;
 pub mod config;
 #[cfg(all(test, feature = "clickhouse-integration-tests"))]
 mod integration_tests;
diff --git a/src/sinks/clickhouse/request_builder.rs b/src/sinks/clickhouse/request_builder.rs
index 7f8edc0d2d02e..688f1436c37df 100644
--- a/src/sinks/clickhouse/request_builder.rs
+++ b/src/sinks/clickhouse/request_builder.rs
@@ -1,20 +1,21 @@
 //! `RequestBuilder` implementation for the `Clickhouse` sink.
 use bytes::Bytes;
-use vector_lib::codecs::encoding::Framer;
 
 use super::sink::PartitionKey;
-use crate::sinks::{prelude::*, util::http::HttpRequest};
+use crate::codecs::EncoderKind;
+use crate::sinks::prelude::*;
+use crate::sinks::util::http::HttpRequest;
 
 pub(super) struct ClickhouseRequestBuilder {
     pub(super) compression: Compression,
-    pub(super) encoding: (Transformer, Encoder<Framer>),
+    pub(super) encoder: (Transformer, EncoderKind),
 }
 
 impl RequestBuilder<(PartitionKey, Vec<Event>)> for ClickhouseRequestBuilder {
     type Metadata = (PartitionKey, EventFinalizers);
     type Events = Vec<Event>;
-    type Encoder = (Transformer, Encoder<Framer>);
+    type Encoder = (Transformer, EncoderKind);
     type Payload = Bytes;
     type Request = HttpRequest<PartitionKey>;
     type Error = std::io::Error;
@@ -24,7 +25,7 @@ impl RequestBuilder<(PartitionKey, Vec<Event>)> for ClickhouseRequestBuilder {
     }
 
     fn encoder(&self) -> &Self::Encoder {
-        &self.encoding
+        &self.encoder
     }
 
     fn split_input(
diff --git a/src/sinks/clickhouse/service.rs b/src/sinks/clickhouse/service.rs
index e9974b32a8dd9..53b09270fbdfd 100644
--- a/src/sinks/clickhouse/service.rs
+++ b/src/sinks/clickhouse/service.rs
@@ -92,10 +92,18 @@ impl HttpServiceRequestBuilder for ClickhouseServiceRequestBuilder
 
         let auth: Option<Auth> = self.auth.clone();
 
+        // Extract format before taking payload to avoid borrow checker issues
+        let format = metadata.format;
         let payload = request.take_payload();
 
+        // Set content type based on format
+        let content_type = match format {
+            Format::ArrowStream => "application/vnd.apache.arrow.stream",
+            _ => "application/x-ndjson",
+        };
+
         let mut builder = Request::post(&uri)
-            .header(CONTENT_TYPE, "application/x-ndjson")
+            .header(CONTENT_TYPE, content_type)
             .header(CONTENT_LENGTH, payload.len());
 
         if let Some(ce) = self.compression.content_encoding() {
             builder = builder.header(CONTENT_ENCODING, ce);
@@ -200,8 +208,8 @@ fn set_uri_query(
 
 #[cfg(test)]
 mod tests {
+    use super::super::config::AsyncInsertSettingsConfig;
     use super::*;
-    use crate::sinks::clickhouse::config::*;
 
     #[test]
     fn encode_valid() {
diff --git a/website/cue/reference/components/sinks/clickhouse.cue b/website/cue/reference/components/sinks/clickhouse.cue
index 893ff01c63d32..1049cf5217976 100644
--- a/website/cue/reference/components/sinks/clickhouse.cue
+++ b/website/cue/reference/components/sinks/clickhouse.cue
@@ -80,4 +80,90 @@ components: sinks: clickhouse: {
 		metrics: null
 		traces:  false
 	}
+
+	how_it_works: {
+		data_formats: {
+			title: "Data Formats"
+			body: """
+				The ClickHouse sink supports multiple data formats for inserting events:
+
+				#### JSONEachRow (default)
+
+				The default format is `JSONEachRow`, which sends events as newline-delimited JSON. Each event is
+				encoded as a single JSON object on its own line. This format is simple and flexible, allowing
+				ClickHouse to handle type conversions automatically.
+
+				```yaml
+				sinks:
+				  clickhouse:
+				    type: clickhouse
+				    endpoint: http://localhost:8123
+				    database: my_database
+				    table: my_table
+				    format: json_each_row # default
+				```
+
+				#### JSONAsObject and JSONAsString
+
+				These formats provide alternative JSON encoding strategies:
+				- `json_as_object`: inserts each event as a single JSON object into a table with one JSON-typed column
+				- `json_as_string`: inserts each event as a raw JSON string into a table with one String column
+
+				#### ArrowStream (beta)
+
+				The `arrow_stream` format uses Apache Arrow's streaming format to send data to ClickHouse. This
+				format offers better performance and type safety by fetching the table schema from ClickHouse at
+				startup and encoding events directly into Arrow format.
+
+				```yaml
+				sinks:
+				  clickhouse:
+				    type: clickhouse
+				    endpoint: http://localhost:8123
+				    database: my_database
+				    table: my_table
+				    format: arrow_stream
+				    batch_encoding:
+				      codec: arrow_stream
+				```
+
+				**Note**: The ArrowStream format requires a static (non-templated) table and database name, as the
+				schema is fetched once at startup. Dynamic table routing is not supported with this format.
+				"""
+		}
+
+		arrow_type_mappings: {
+			title: "Arrow Type Mappings"
+			body: """
+				When using the `arrow_stream` format, Vector automatically converts ClickHouse types to Arrow types.
+				The sink fetches the table schema from ClickHouse and maps each column type accordingly.
+
+				#### Unsupported ClickHouse Types
+
+				The following ClickHouse column types are **not yet supported** by Vector's
+				ArrowStream implementation:
+				- `Array`
+				- `Tuple`
+				- `Map`
+				- `IPv4`
+				- `IPv6`
+
+				If your table contains these types, you should use one of the JSON formats instead.
+
+				#### Unsupported Arrow Types
+
+				Based on [ClickHouse's Arrow format documentation](\(urls.clickhouse_arrow)), the following
+				types are unsupported:
+				- `FIXED_SIZE_BINARY`
+				- `JSON`
+				- `UUID`
+				- `ENUM`
+
+				#### Timezone Handling
+
+				DateTime and DateTime64 columns with timezone information will be converted to Arrow timestamps
+				without timezone metadata. All timestamps are treated as UTC by default.
+				"""
+		}
+	}
 }
diff --git a/website/cue/reference/components/sinks/generated/clickhouse.cue b/website/cue/reference/components/sinks/generated/clickhouse.cue
index f807b58be475c..a0f4399bdc112 100644
--- a/website/cue/reference/components/sinks/generated/clickhouse.cue
+++ b/website/cue/reference/components/sinks/generated/clickhouse.cue
@@ -243,6 +243,50 @@ generated: components: sinks: clickhouse: configuration: {
 			}
 		}
 	}
+	batch_encoding: {
+		description: """
+			The batch encoding configuration for encoding events in batches.
+
+			When specified, events are encoded together as a single batch.
+			This is mutually exclusive with per-event encoding based on the `format` field.
+ """ + required: false + type: object: options: { + allow_nullable_fields: { + description: """ + Allow null values for non-nullable fields in the schema. + + When enabled, missing or incompatible values will be encoded as null even for fields + marked as non-nullable in the Arrow schema. This is useful when working with downstream + systems that can handle null values through defaults, computed columns, or other mechanisms. + + When disabled (default), missing values for non-nullable fields will cause encoding errors, + ensuring all required data is present before sending to the sink. + """ + required: false + type: bool: default: false + } + codec: { + description: """ + Encodes events in [Apache Arrow][apache_arrow] IPC streaming format. + + This is the streaming variant of the Arrow IPC format, which writes + a continuous stream of record batches. + + [apache_arrow]: https://arrow.apache.org/ + """ + required: true + type: string: enum: arrow_stream: """ + Encodes events in [Apache Arrow][apache_arrow] IPC streaming format. + + This is the streaming variant of the Arrow IPC format, which writes + a continuous stream of record batches. + + [apache_arrow]: https://arrow.apache.org/ + """ + } + } + } compression: { description: """ Compression configuration. @@ -333,6 +377,7 @@ generated: components: sinks: clickhouse: configuration: { type: string: { default: "json_each_row" enum: { + arrow_stream: "ArrowStream (beta)." json_as_object: "JSONAsObject." json_as_string: "JSONAsString." json_each_row: "JSONEachRow." diff --git a/website/cue/reference/urls.cue b/website/cue/reference/urls.cue index 2578c0b38c60a..1da40aef1a6fc 100644 --- a/website/cue/reference/urls.cue +++ b/website/cue/reference/urls.cue @@ -111,6 +111,7 @@ urls: { chrono_time_formats: "https://docs.rs/chrono/latest/chrono/format/strftime/index.html#specifiers" cgroups_limit_resources: "https://the.binbashtheory.com/control-resources-cgroups/" clickhouse: "https://clickhouse.com/" + clickhouse_arrow: "https://clickhouse.com/docs/en/interfaces/formats#arrow" clickhouse_http: "https://clickhouse.com/docs/en/interfaces/http/" community_id_spec: "https://github.com/corelight/community-id-spec" console: "\(wikipedia)/wiki/System_console"