Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
d2322d6
chore: delta lake
benjamin-awd Dec 22, 2025
ca6c2b7
chore: add support for retries
benjamin-awd Dec 29, 2025
5833541
chore: update cached table after retry
benjamin-awd Dec 29, 2025
f25bd71
chore: enable schema evolution
benjamin-awd Jan 5, 2026
04a5f29
chore: add telemetry
benjamin-awd Jan 5, 2026
24d6543
chore: add support for s3/minio
benjamin-awd Jan 5, 2026
c2f3e89
build(deps): bump delta-lake to 0.30.0
benjamin-awd Jan 5, 2026
80949c7
chore: add integration tests
benjamin-awd Jan 5, 2026
48b36f3
chore: add schema inference
benjamin-awd Jan 5, 2026
3e42802
chore: clippy
benjamin-awd Jan 5, 2026
7469a60
chore: move schema evolution log to service layer
benjamin-awd Jan 7, 2026
65a772d
chore: add delta lake cdf source
benjamin-awd Jan 8, 2026
17c8856
chore: remove verify_cdf_enabled and re-use session context
benjamin-awd Jan 8, 2026
49bf380
chore: validate change data feed during config validation
benjamin-awd Jan 8, 2026
011ab96
chore: recover after vacuum
benjamin-awd Jan 8, 2026
a1ca534
chore: clippy
benjamin-awd Jan 8, 2026
4b32f2a
chore: use tokio rwlock
benjamin-awd Jan 9, 2026
acbaf62
chore: clippy
benjamin-awd Jan 9, 2026
144b88e
chore: avoid unnecessary table clone
benjamin-awd Jan 9, 2026
26f5eaf
chore: avoid unnecessary parquet round trip serialization/deserializa…
benjamin-awd Jan 9, 2026
e8f61ad
chore: stream processing instead of collect
benjamin-awd Jan 12, 2026
ab51706
chore: add max_versions_per_poll
benjamin-awd Jan 12, 2026
c705f91
chore: don't cache entire table in memory
benjamin-awd Jan 13, 2026
2dc64b2
chore: add checkpoint mechanism
benjamin-awd Jan 13, 2026
8f75832
chore: disable checkpoint logic
benjamin-awd Jan 14, 2026
978ee2e
chore: re-enable checkpointing
benjamin-awd Jan 14, 2026
21afc94
chore: retry on FAILED_PRECONDITION errors
benjamin-awd Jan 14, 2026
f645dc7
chore: add exponential backoff
benjamin-awd Jan 14, 2026
24c7f78
chore: disable checkpointing
benjamin-awd Jan 14, 2026
e65713c
chore: catch more cases of FAILED_PRECONDITION
benjamin-awd Jan 14, 2026
5927060
chore: add more error logging
benjamin-awd Jan 14, 2026
ea402cd
chore: update docs
benjamin-awd Jan 15, 2026
68502b2
refactor(delta_lake): use structured error matching instead of string…
benjamin-awd Jan 15, 2026
9098786
refactor(delta_lake): consolidate error classification into WriteErro…
benjamin-awd Jan 15, 2026
1cfce70
refactor(delta_lake): simplify WriteErrorKind to 4 variants
benjamin-awd Jan 15, 2026
02b4cd5
chore: add logging for objectstore transient error
benjamin-awd Jan 15, 2026
3fca356
chore: use default_request_builder_concurrency_limit
benjamin-awd Jan 15, 2026
d8b3a53
refactor(delta_lake): use CommitProperties for conflict retries
benjamin-awd Jan 15, 2026
02739f3
chore: add debug logs
benjamin-awd Jan 15, 2026
771c87f
chore: use ioruntime
benjamin-awd Jan 15, 2026
e4e41e1
chore: add spawn blocking for schema evolution
benjamin-awd Jan 15, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2,760 changes: 2,088 additions & 672 deletions Cargo.lock

Large diffs are not rendered by default.

22 changes: 17 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ aws-types = { version = "1.3.8", default-features = false, optional = true }
# The sts crate is needed despite not being referred to anywhere in the code because we need to set the
# `behavior-version-latest` feature. Without this we get a runtime panic when `auth.assume_role` authentication
# is configured.
aws-sdk-sts = { version = "1.73.0", default-features = false, features = ["behavior-version-latest", "rt-tokio"], optional = true }
aws-sdk-sts = { version = "1.95.0", default-features = false, features = ["behavior-version-latest", "rt-tokio"], optional = true }

# The `aws-sdk-sts` crate is needed despite not being referred to anywhere in the code because we need to set the
# `behavior-version-latest` feature. Without this we get a runtime panic when `auth.assume_role` authentication is configured.
Expand All @@ -297,6 +297,9 @@ azure_core_for_storage = { package = "azure_core", version = "0.21.0", default-f
# OpenDAL
opendal = { version = "0.54", default-features = false, features = ["services-webhdfs"], optional = true }

# Delta Lake
deltalake = { version = "0.30.0", default-features = false, features = ["datafusion", "gcs", "s3", "rustls"], optional = true }

# Tower
tower = { version = "0.5.2", default-features = false, features = ["buffer", "limit", "retry", "timeout", "util", "balance", "discover"] }
tower-http = { version = "0.4.4", default-features = false, features = ["compression-full", "decompression-gzip", "trace"] }
Expand Down Expand Up @@ -338,10 +341,13 @@ greptimedb-ingester = { git = "https://github.com/GreptimeTeam/greptimedb-ingest

# External libs
arc-swap = { version = "1.7", default-features = false, optional = true }
async-compression = { version = "0.4.27", default-features = false, features = ["tokio", "gzip", "zstd"], optional = true }
# Pinned to 0.4.19 to use xz2 instead of liblzma (avoids conflict with datafusion's xz2)
# See: https://github.com/apache/datafusion/issues/15342
async-compression = { version = "=0.4.19", default-features = false, features = ["tokio", "gzip", "zstd"], optional = true }
apache-avro = { version = "0.16.0", default-features = false, optional = true }
arrow = { version = "56.2.0", default-features = false, features = ["ipc"], optional = true }
arrow-schema = { version = "56.2.0", default-features = false, optional = true }
# Updated to 57.1.0 for compatibility with deltalake 0.30.0
arrow = { version = "57.1.0", default-features = false, features = ["ipc"], optional = true }
arrow-schema = { version = "57.1.0", default-features = false, optional = true }
axum = { version = "0.6.20", default-features = false }
base64 = { workspace = true, optional = true }
bloomy = { version = "1.2.0", default-features = false, optional = true }
Expand Down Expand Up @@ -493,7 +499,6 @@ nix = { git = "https://github.com/vectordotdev/nix.git", branch = "memfd/gnu/mus
# The `heim` crates depend on `ntapi` 0.3.7 on Windows, but that version has an
# unaligned access bug fixed in the following revision.
ntapi = { git = "https://github.com/MSxDOS/ntapi.git", rev = "24fc1e47677fc9f6e38e5f154e6011dc9b270da6" }

[features]
# Default features for *-unknown-linux-gnu and *-apple-darwin
default = ["api", "api-client", "enrichment-tables", "sinks", "sources", "sources-dnstap", "transforms", "unix", "rdkafka?/gssapi-vendored", "secrets"]
Expand Down Expand Up @@ -601,6 +606,7 @@ sources-logs = [
"sources-aws_s3",
"sources-aws_sqs",
"sources-datadog_agent",
"sources-delta_lake_cdf",
"sources-demo_logs",
"sources-docker_logs",
"sources-exec",
Expand Down Expand Up @@ -653,6 +659,7 @@ sources-aws_kinesis_firehose = ["dep:base64"]
sources-aws_s3 = ["aws-core", "dep:aws-sdk-sqs", "dep:aws-sdk-s3", "dep:async-compression", "sources-aws_sqs", "tokio-util/io"]
sources-aws_sqs = ["aws-core", "dep:aws-sdk-sqs"]
sources-datadog_agent = ["sources-utils-http-encoding", "protobuf-build", "dep:prost"]
sources-delta_lake_cdf = ["dep:deltalake", "codecs-arrow"]
sources-demo_logs = ["dep:fakedata"]
sources-dnstap = ["sources-utils-net-tcp", "dep:base64", "dep:hickory-proto", "dep:dnsmsg-parser", "dep:dnstap-parser", "protobuf-build", "dep:prost"]
sources-docker_logs = ["docker"]
Expand Down Expand Up @@ -789,6 +796,7 @@ sinks-logs = [
"sinks-clickhouse",
"sinks-console",
"sinks-databend",
"sinks-delta_lake",
"sinks-datadog_events",
"sinks-datadog_logs",
"sinks-datadog_traces",
Expand Down Expand Up @@ -855,6 +863,7 @@ sinks-chronicle = []
sinks-clickhouse = ["dep:rust_decimal", "codecs-arrow"]
sinks-console = []
sinks-databend = ["dep:databend-client"]
sinks-delta_lake = ["dep:deltalake", "codecs-arrow", "dep:opendal", "dep:arc-swap"]
sinks-datadog_events = []
sinks-datadog_logs = []
sinks-datadog_metrics = ["protobuf-build", "dep:prost", "dep:prost-reflect"]
Expand Down Expand Up @@ -906,6 +915,7 @@ all-integration-tests = [
"chronicle-integration-tests",
"clickhouse-integration-tests",
"databend-integration-tests",
"delta-lake-integration-tests",
"datadog-agent-integration-tests",
"datadog-logs-integration-tests",
"datadog-metrics-integration-tests",
Expand Down Expand Up @@ -972,6 +982,8 @@ azure-blob-integration-tests = ["sinks-azure_blob"]
chronicle-integration-tests = ["sinks-gcp"]
clickhouse-integration-tests = ["sinks-clickhouse"]
databend-integration-tests = ["sinks-databend"]
delta-lake-integration-tests = ["sinks-delta_lake", "dep:deltalake"]
delta-lake-cdf-integration-tests = ["sources-delta_lake_cdf", "dep:deltalake"]
datadog-agent-integration-tests = ["sources-datadog_agent"]
datadog-logs-integration-tests = ["sinks-datadog_logs"]
datadog-metrics-integration-tests = ["sinks-datadog_metrics", "dep:prost"]
Expand Down
2 changes: 1 addition & 1 deletion lib/codecs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ path = "tests/bin/generate-avro-fixtures.rs"

[dependencies]
apache-avro = { version = "0.20.0", default-features = false }
arrow = { version = "56.2.0", default-features = false, features = ["ipc"] }
arrow = { version = "57.1.0", default-features = false, features = ["ipc"] }
async-trait.workspace = true
bytes.workspace = true
chrono.workspace = true
Expand Down
2 changes: 1 addition & 1 deletion lib/codecs/src/encoding/format/arrow/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ fn build_array_for_field(events: &[Event], field: &Field) -> Result<ArrayRef, Ar
}

/// Builds an Arrow RecordBatch from events
pub(crate) fn build_record_batch(
pub fn build_record_batch(
schema: SchemaRef,
events: &[Event],
) -> Result<RecordBatch, ArrowEncodingError> {
Expand Down
14 changes: 11 additions & 3 deletions lib/codecs/src/encoding/format/arrow/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use snafu::Snafu;
use std::sync::Arc;
use vector_config::configurable_component;

use builder::build_record_batch;
pub use builder::build_record_batch;

/// Provides Arrow schema for encoding.
///
Expand Down Expand Up @@ -123,6 +123,11 @@ impl ArrowStreamSerializer {
schema: SchemaRef::new(schema),
})
}

/// Get a reference to the Arrow schema used by this serializer
pub fn schema(&self) -> &SchemaRef {
&self.schema
}
}

impl tokio_util::codec::Encoder<Vec<vector_core::event::Event>> for ArrowStreamSerializer {
Expand Down Expand Up @@ -234,8 +239,11 @@ pub fn encode_events_to_arrow_ipc_stream(
Ok(buffer.into_inner().freeze())
}

/// Recursively makes a Field and all its nested fields nullable
fn make_field_nullable(field: &arrow::datatypes::Field) -> arrow::datatypes::Field {
/// Recursively makes a Field and all its nested fields nullable.
///
/// This is useful for schema evolution scenarios where new fields need to be nullable,
/// or when events may have missing fields that should be represented as null.
pub fn make_field_nullable(field: &arrow::datatypes::Field) -> arrow::datatypes::Field {
let new_data_type = match field.data_type() {
DataType::List(inner_field) => DataType::List(make_field_nullable(inner_field).into()),
DataType::Struct(fields) => {
Expand Down
1 change: 1 addition & 0 deletions lib/codecs/src/encoding/format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use std::fmt::Debug;
#[cfg(feature = "arrow")]
pub use arrow::{
ArrowEncodingError, ArrowStreamSerializer, ArrowStreamSerializerConfig, SchemaProvider,
build_record_batch, make_field_nullable,
};
pub use avro::{AvroSerializer, AvroSerializerConfig, AvroSerializerOptions};
pub use cef::{CefSerializer, CefSerializerConfig};
Expand Down
2 changes: 1 addition & 1 deletion lib/file-source-common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ serde_json = { version = "1.0.143", default-features = false }
bstr = { version = "1.12", default-features = false }
bytes = { version = "1.10.1", default-features = false, features = ["serde"] }
dashmap = { version = "6.1", default-features = false }
async-compression = { version = "0.4.27", features = ["tokio", "gzip"] }
async-compression = { version = "=0.4.19", features = ["tokio", "gzip"] }
vector-common = { path = "../vector-common", default-features = false }
vector-config = { path = "../vector-config", default-features = false }
tokio = { workspace = true, features = ["full"] }
Expand Down
2 changes: 1 addition & 1 deletion lib/file-source/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ futures = { version = "0.3.31", default-features = false, features = ["executor"
futures-util.workspace = true
vector-common = { path = "../vector-common", default-features = false }
file-source-common = { path = "../file-source-common" }
async-compression = { version = "0.4.27", features = ["tokio", "gzip"] }
async-compression = { version = "=0.4.19", features = ["tokio", "gzip"] }

[dev-dependencies]
tokio = { workspace = true, features = ["full"] }
Expand Down
Loading