Merged

24 commits
42c4fb5
enhancement(clickhouse sink): Add `ArrowStream` format
benjamin-awd Nov 17, 2025
a9a67c2
chore: add docs
benjamin-awd Dec 12, 2025
e7b89c0
build(deps): use rust_decimal from parent Cargo.toml
benjamin-awd Dec 13, 2025
9e93926
refactor: simplify validation for batch_encoding format
benjamin-awd Dec 13, 2025
85d7aa3
chore: remove unnecessary :: prefix
benjamin-awd Dec 13, 2025
4e101ac
chore: add more robust handling for schema lookup query
benjamin-awd Dec 13, 2025
4211ce3
refactor: add framework for storing clickhouse types
benjamin-awd Dec 13, 2025
a4e634b
refactor: separate schema handling and type parsing
benjamin-awd Dec 13, 2025
1826aa3
chore: simplify error message for bad decimal parsing
benjamin-awd Dec 13, 2025
1db0f07
refactor: use extract_identifier for clickhouse type to arrow conversion
benjamin-awd Dec 13, 2025
e1b22a1
refactor: set schema to none if provider is used
benjamin-awd Dec 13, 2025
5525afb
chore: add beta flag for arrowstream format
benjamin-awd Dec 13, 2025
834ce89
chore: remove unnecessary Arc for schema provider
benjamin-awd Dec 13, 2025
aa69c1c
refactor: move schema provider logic to build phase
benjamin-awd Dec 14, 2025
d60aac6
Use workspace rust_decimal
thomasqueirozb Dec 15, 2025
1a54581
docs: add how_it_works to Clickhouse sink
benjamin-awd Dec 16, 2025
b4c0bc6
chore: fix punctuation for logs
benjamin-awd Dec 16, 2025
b8203d1
chore: remove redundant example from docs
benjamin-awd Dec 16, 2025
4710668
chore: spelling
benjamin-awd Dec 16, 2025
e96c42b
chore: update docs
benjamin-awd Dec 16, 2025
1621814
build(deps): remove unnecessary dependencies from sinks-clickhouse
benjamin-awd Dec 19, 2025
4efff48
refactor(codecs): store owned Schema in config, wrap in Arc only at s…
benjamin-awd Dec 19, 2025
faad2ad
chore: fix spacing issue in docs
benjamin-awd Dec 19, 2025
f8bb7de
Fix cue formatting
thomasqueirozb Dec 19, 2025
2 changes: 2 additions & 0 deletions Cargo.lock

(Generated file; diff not rendered.)

7 changes: 4 additions & 3 deletions Cargo.toml
@@ -179,7 +179,7 @@ rand = { version = "0.9.2", default-features = false, features = ["small_rng", "
rand_distr = { version = "0.5.1", default-features = false }
regex = { version = "1.11.2", default-features = false, features = ["std", "perf"] }
reqwest = { version = "0.11.26", features = ["json"] }
rust_decimal = { version = "1.33", default-features = false, features = ["std"] }
rust_decimal = { version = "1.37.0", default-features = false, features = ["std"] }
semver = { version = "1.0.26", default-features = false, features = ["serde", "std"] }
serde = { version = "1.0.219", default-features = false, features = ["alloc", "derive", "rc"] }
serde_json = { version = "1.0.143", default-features = false, features = ["raw_value", "std"] }
@@ -341,6 +341,7 @@ arc-swap = { version = "1.7", default-features = false, optional = true }
async-compression = { version = "0.4.27", default-features = false, features = ["tokio", "gzip", "zstd"], optional = true }
apache-avro = { version = "0.16.0", default-features = false, optional = true }
arrow = { version = "56.2.0", default-features = false, features = ["ipc"], optional = true }
+arrow-schema = { version = "56.2.0", default-features = false, optional = true }
axum = { version = "0.6.20", default-features = false }
base64 = { workspace = true, optional = true }
bloomy = { version = "1.2.0", default-features = false, optional = true }
@@ -583,7 +584,7 @@ enrichment-tables-mmdb = ["dep:maxminddb"]
enrichment-tables-memory = ["dep:evmap", "dep:evmap-derive", "dep:thread_local"]

# Codecs
codecs-arrow = ["vector-lib/arrow"]
codecs-arrow = ["dep:arrow", "dep:arrow-schema", "vector-lib/arrow"]
codecs-opentelemetry = ["vector-lib/opentelemetry"]
codecs-syslog = ["vector-lib/syslog"]

@@ -851,7 +852,7 @@ sinks-azure_blob = ["dep:azure_core", "dep:azure_identity", "dep:azure_storage",
sinks-azure_monitor_logs = []
sinks-blackhole = []
sinks-chronicle = []
-sinks-clickhouse = []
+sinks-clickhouse = ["dep:rust_decimal", "codecs-arrow"]
sinks-console = []
sinks-databend = ["dep:databend-client"]
sinks-datadog_events = []
3 changes: 3 additions & 0 deletions changelog.d/24074_clickhouse_arrow_format.enhancement.md
@@ -0,0 +1,3 @@
+The `clickhouse` sink now supports the `arrow_stream` format option, enabling high-performance binary data transfer using Apache Arrow IPC. This provides significantly better performance and smaller payload sizes compared to JSON-based formats.
+
+authors: benjamin-awd
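For context on the payload format, here is a minimal standalone sketch of Arrow IPC stream encoding with the `arrow` crate; the serializer added in this PR uses the same `StreamWriter` (imported in `arrow.rs` below), but the schema and values here are purely illustrative:

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, Int64Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::Result;
use arrow::ipc::writer::StreamWriter;
use arrow::record_batch::RecordBatch;

fn encode_ipc_stream() -> Result<Vec<u8>> {
    // Illustrative schema; the sink derives the real one from the ClickHouse table.
    let schema = Arc::new(Schema::new(vec![
        Field::new("message", DataType::Utf8, true),
        Field::new("count", DataType::Int64, false),
    ]));

    let batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![
            Arc::new(StringArray::from(vec![Some("hello"), Some("world")])) as ArrayRef,
            Arc::new(Int64Array::from(vec![1, 2])) as ArrayRef,
        ],
    )?;

    // The IPC stream format frames the schema once, followed by each record batch.
    let mut buf = Vec::new();
    {
        let mut writer = StreamWriter::try_new(&mut buf, &schema)?;
        writer.write(&batch)?;
        writer.finish()?;
    }
    Ok(buf)
}
```

Because the schema is transmitted once up front and columns are encoded as contiguous typed buffers, the payload avoids the per-row field names and string-encoded numbers that JSON-based formats carry.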
3 changes: 2 additions & 1 deletion lib/codecs/Cargo.toml
@@ -15,9 +15,10 @@ path = "tests/bin/generate-avro-fixtures.rs"
[dependencies]
apache-avro = { version = "0.20.0", default-features = false }
arrow = { version = "56.2.0", default-features = false, features = ["ipc"] }
+async-trait.workspace = true
bytes.workspace = true
chrono.workspace = true
rust_decimal = { version = "1.37", default-features = false, features = ["std"] }
rust_decimal.workspace = true
csv-core = { version = "0.1.12", default-features = false }
derivative.workspace = true
dyn-clone = { version = "1", default-features = false }
56 changes: 37 additions & 19 deletions lib/codecs/src/encoding/format/arrow.rs
@@ -16,6 +16,7 @@ use arrow::{
ipc::writer::StreamWriter,
record_batch::RecordBatch,
};
+use async_trait::async_trait;
use bytes::{BufMut, Bytes, BytesMut};
use chrono::{DateTime, Utc};
use rust_decimal::Decimal;
@@ -25,14 +26,26 @@ use vector_config::configurable_component;

use vector_core::event::{Event, Value};

+/// Provides Arrow schema for encoding.
+///
+/// Sinks can implement this trait to provide custom schema fetching logic.
+#[async_trait]
+pub trait SchemaProvider: Send + Sync + std::fmt::Debug {
+/// Fetch the Arrow schema from the data store.
+///
+/// This is called during the sink's configuration build phase to fetch
+/// the schema once at startup, rather than at runtime.
+async fn get_schema(&self) -> Result<Schema, ArrowEncodingError>;
+}
+
/// Configuration for Arrow IPC stream serialization
#[configurable_component]
#[derive(Clone, Default)]
pub struct ArrowStreamSerializerConfig {
/// The Arrow schema to use for encoding
#[serde(skip)]
#[configurable(derived)]
-pub schema: Option<Arc<arrow::datatypes::Schema>>,
+pub schema: Option<arrow::datatypes::Schema>,

/// Allow null values for non-nullable fields in the schema.
///
@@ -43,7 +56,7 @@ pub struct ArrowStreamSerializerConfig {
/// When disabled (default), missing values for non-nullable fields will cause encoding errors,
/// ensuring all required data is present before sending to the sink.
#[serde(default)]
-#[configurable(metadata(docs::examples = true))]
+#[configurable(derived)]
pub allow_nullable_fields: bool,
}

@@ -64,7 +77,7 @@ impl std::fmt::Debug for ArrowStreamSerializerConfig {

impl ArrowStreamSerializerConfig {
/// Create a new ArrowStreamSerializerConfig with a schema
-pub fn new(schema: Arc<arrow::datatypes::Schema>) -> Self {
+pub fn new(schema: arrow::datatypes::Schema) -> Self {
Self {
schema: Some(schema),
allow_nullable_fields: false,
@@ -91,26 +104,28 @@ pub struct ArrowStreamSerializer {
impl ArrowStreamSerializer {
/// Create a new ArrowStreamSerializer with the given configuration
pub fn new(config: ArrowStreamSerializerConfig) -> Result<Self, vector_common::Error> {
-let mut schema = config.schema.ok_or_else(|| {
-vector_common::Error::from(
-"Arrow serializer requires a schema. Pass a schema or fetch from provider before creating serializer."
-)
-})?;
+let schema = config
+.schema
+.ok_or_else(|| vector_common::Error::from("Arrow serializer requires a schema."))?;

// If allow_nullable_fields is enabled, transform the schema once here
// instead of on every batch encoding
-if config.allow_nullable_fields {
-schema = Arc::new(Schema::new_with_metadata(
+let schema = if config.allow_nullable_fields {
+Schema::new_with_metadata(
schema
.fields()
.iter()
.map(|f| Arc::new(make_field_nullable(f)))
.collect::<Vec<_>>(),
schema.metadata().clone(),
-));
-}
+)
+} else {
+schema
+};

-Ok(Self { schema })
+Ok(Self {
+schema: Arc::new(schema),
+})
}
}

@@ -154,6 +169,13 @@ pub enum ArrowEncodingError {
#[snafu(display("Schema must be provided before encoding"))]
NoSchemaProvided,

+/// Failed to fetch schema from provider
+#[snafu(display("Failed to fetch schema from provider: {}", message))]
+SchemaFetchError {
+/// Error message from the provider
+message: String,
+},
+
/// Unsupported Arrow data type for field
#[snafu(display(
"Unsupported Arrow data type for field '{}': {:?}",
@@ -1500,13 +1522,9 @@ mod tests {
let log2 = LogEvent::default();
let events = vec![Event::Log(log1), Event::Log(log2)];

-let schema = Arc::new(Schema::new(vec![Field::new(
-"strict_field",
-DataType::Int64,
-false,
-)]));
+let schema = Schema::new(vec![Field::new("strict_field", DataType::Int64, false)]);

-let mut config = ArrowStreamSerializerConfig::new(Arc::clone(&schema));
+let mut config = ArrowStreamSerializerConfig::new(schema);
config.allow_nullable_fields = true;

let mut serializer =
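To illustrate the new trait, here is a hedged sketch of a provider returning a fixed schema, plus the build-phase wiring the doc comments describe. `StaticSchemaProvider` and `build_serializer` are hypothetical, and the import path assumes the `codecs` crate re-exports shown in the `mod.rs` changes below; only `SchemaProvider`, `ArrowStreamSerializerConfig`, and `ArrowStreamSerializer` come from this PR:

```rust
use arrow::datatypes::{DataType, Field, Schema};
use async_trait::async_trait;
use codecs::encoding::{
    ArrowEncodingError, ArrowStreamSerializer, ArrowStreamSerializerConfig, SchemaProvider,
};

/// Hypothetical provider returning a fixed schema; the ClickHouse sink in
/// this PR instead queries the table schema from the server.
#[derive(Debug)]
struct StaticSchemaProvider;

#[async_trait]
impl SchemaProvider for StaticSchemaProvider {
    async fn get_schema(&self) -> Result<Schema, ArrowEncodingError> {
        Ok(Schema::new(vec![
            Field::new("message", DataType::Utf8, true),
            Field::new("count", DataType::Int64, false),
        ]))
    }
}

/// Build-phase wiring: fetch the schema once, then construct the serializer.
async fn build_serializer(
    provider: &dyn SchemaProvider,
) -> Result<ArrowStreamSerializer, vector_common::Error> {
    let schema = provider
        .get_schema()
        .await
        .map_err(|e| vector_common::Error::from(e.to_string()))?;
    let mut config = ArrowStreamSerializerConfig::new(schema);
    // Optionally encode missing values for non-nullable fields as nulls
    // instead of failing the batch.
    config.allow_nullable_fields = true;
    ArrowStreamSerializer::new(config)
}
```

Fetching once at build time means a provider failure surfaces as a configuration error at startup, and the per-batch encode path never blocks on a schema lookup.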
4 changes: 3 additions & 1 deletion lib/codecs/src/encoding/format/mod.rs
@@ -23,7 +23,9 @@ mod text;
use std::fmt::Debug;

#[cfg(feature = "arrow")]
-pub use arrow::{ArrowEncodingError, ArrowStreamSerializer, ArrowStreamSerializerConfig};
+pub use arrow::{
+ArrowEncodingError, ArrowStreamSerializer, ArrowStreamSerializerConfig, SchemaProvider,
+};
pub use avro::{AvroSerializer, AvroSerializerConfig, AvroSerializerOptions};
pub use cef::{CefSerializer, CefSerializerConfig};
use dyn_clone::DynClone;
4 changes: 3 additions & 1 deletion lib/codecs/src/encoding/mod.rs
@@ -7,7 +7,9 @@ pub mod framing;
pub mod serializer;
pub use chunking::{Chunker, Chunking, GelfChunker};
#[cfg(feature = "arrow")]
-pub use format::{ArrowEncodingError, ArrowStreamSerializer, ArrowStreamSerializerConfig};
+pub use format::{
+ArrowEncodingError, ArrowStreamSerializer, ArrowStreamSerializerConfig, SchemaProvider,
+};
pub use format::{
AvroSerializer, AvroSerializerConfig, AvroSerializerOptions, CefSerializer,
CefSerializerConfig, CsvSerializer, CsvSerializerConfig, GelfSerializer, GelfSerializerConfig,
6 changes: 6 additions & 0 deletions src/sinks/clickhouse/arrow/mod.rs
@@ -0,0 +1,6 @@
+//! Schema fetching and Arrow type mapping for ClickHouse tables.
+
+pub mod parser;
+pub mod schema;
+
+pub use schema::ClickHouseSchemaProvider;
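The module body isn't shown beyond these exports, so as a rough illustration of what "Arrow type mapping" involves, here is a hypothetical sketch; the actual `parser.rs` in this PR may use different names and cover more types (for instance ClickHouse `Decimal` columns via the `rust_decimal` dependency added above):

```rust
use arrow::datatypes::{DataType, TimeUnit};

/// Hypothetical mapping from ClickHouse type names to Arrow types.
/// Returns the Arrow type plus whether the column is nullable.
fn clickhouse_type_to_arrow(ch_type: &str) -> Option<(DataType, bool)> {
    // Nullable(T) unwraps to T with the field marked nullable.
    if let Some(inner) = ch_type
        .strip_prefix("Nullable(")
        .and_then(|s| s.strip_suffix(')'))
    {
        let (data_type, _) = clickhouse_type_to_arrow(inner)?;
        return Some((data_type, true));
    }
    let data_type = match ch_type {
        "String" => DataType::Utf8,
        "UInt8" => DataType::UInt8,
        "Int64" => DataType::Int64,
        "UInt64" => DataType::UInt64,
        "Float64" => DataType::Float64,
        "DateTime64(3)" => DataType::Timestamp(TimeUnit::Millisecond, None),
        _ => return None,
    };
    Some((data_type, false))
}
```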