diff --git a/Cargo.lock b/Cargo.lock
index 82386342d1618..7fc53c7afab2d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -94,6 +94,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011"
 dependencies = [
  "cfg-if",
+ "const-random",
  "getrandom 0.2.15",
  "once_cell",
  "serde",
@@ -338,6 +339,175 @@ version = "0.7.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711"

+[[package]]
+name = "arrow"
+version = "56.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6e833808ff2d94ed40d9379848a950d995043c7fb3e81a30b383f4c6033821cc"
+dependencies = [
+ "arrow-arith",
+ "arrow-array",
+ "arrow-buffer",
+ "arrow-cast",
+ "arrow-data",
+ "arrow-ipc",
+ "arrow-ord",
+ "arrow-row",
+ "arrow-schema",
+ "arrow-select",
+ "arrow-string",
+]
+
+[[package]]
+name = "arrow-arith"
+version = "56.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ad08897b81588f60ba983e3ca39bda2b179bdd84dced378e7df81a5313802ef8"
+dependencies = [
+ "arrow-array",
+ "arrow-buffer",
+ "arrow-data",
+ "arrow-schema",
+ "chrono",
+ "num",
+]
+
+[[package]]
+name = "arrow-array"
+version = "56.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8548ca7c070d8db9ce7aa43f37393e4bfcf3f2d3681df278490772fd1673d08d"
+dependencies = [
+ "ahash 0.8.11",
+ "arrow-buffer",
+ "arrow-data",
+ "arrow-schema",
+ "chrono",
+ "half",
+ "hashbrown 0.16.0",
+ "num",
+]
+
+[[package]]
+name = "arrow-buffer"
+version = "56.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e003216336f70446457e280807a73899dd822feaf02087d31febca1363e2fccc"
+dependencies = [
+ "bytes 1.10.1",
+ "half",
+ "num",
+]
+
+[[package]]
+name = "arrow-cast"
+version = "56.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "919418a0681298d3a77d1a315f625916cb5678ad0d74b9c60108eb15fd083023"
+dependencies = [
+ "arrow-array",
+ "arrow-buffer",
+ "arrow-data",
+ "arrow-schema",
+ "arrow-select",
+ "atoi",
+ "base64 0.22.1",
+ "chrono",
+ "half",
+ "lexical-core",
+ "num",
+ "ryu",
+]
+
+[[package]]
+name = "arrow-data"
+version = "56.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a5c64fff1d142f833d78897a772f2e5b55b36cb3e6320376f0961ab0db7bd6d0"
+dependencies = [
+ "arrow-buffer",
+ "arrow-schema",
+ "half",
+ "num",
+]
+
+[[package]]
+name = "arrow-ipc"
+version = "56.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1d3594dcddccc7f20fd069bc8e9828ce37220372680ff638c5e00dea427d88f5"
+dependencies = [
+ "arrow-array",
+ "arrow-buffer",
+ "arrow-data",
+ "arrow-schema",
+ "arrow-select",
+ "flatbuffers",
+]
+
+[[package]]
+name = "arrow-ord"
+version = "56.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3c8f82583eb4f8d84d4ee55fd1cb306720cddead7596edce95b50ee418edf66f"
+dependencies = [
+ "arrow-array",
+ "arrow-buffer",
+ "arrow-data",
+ "arrow-schema",
+ "arrow-select",
+]
+
+[[package]]
+name = "arrow-row"
+version = "56.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9d07ba24522229d9085031df6b94605e0f4b26e099fb7cdeec37abd941a73753"
+dependencies = [
+ "arrow-array",
+ "arrow-buffer",
+ "arrow-data",
+ "arrow-schema",
+ "half",
+]
+
+[[package]]
+name = "arrow-schema"
+version = "56.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "b3aa9e59c611ebc291c28582077ef25c97f1975383f1479b12f3b9ffee2ffabe" + +[[package]] +name = "arrow-select" +version = "56.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8c41dbbd1e97bfcaee4fcb30e29105fb2c75e4d82ae4de70b792a5d3f66b2e7a" +dependencies = [ + "ahash 0.8.11", + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "num", +] + +[[package]] +name = "arrow-string" +version = "56.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53f5183c150fbc619eede22b861ea7c0eebed8eaac0333eaa7f6da5205fd504d" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "arrow-select", + "memchr", + "num", + "regex", + "regex-syntax", +] + [[package]] name = "ascii" version = "0.9.3" @@ -1875,6 +2045,29 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3eeab4423108c5d7c744f4d234de88d18d636100093ae04caf4825134b9c3a32" +[[package]] +name = "borsh" +version = "1.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad8646f98db542e39fc66e68a20b2144f6a732636df7c2354e74645faaa433ce" +dependencies = [ + "borsh-derive", + "cfg_aliases", +] + +[[package]] +name = "borsh-derive" +version = "1.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fdd1d3c0c2f5833f22386f252fe8ed005c7f59fdcddeef025c01b4c3b9fd9ac3" +dependencies = [ + "once_cell", + "proc-macro-crate 3.2.0", + "proc-macro2 1.0.101", + "quote 1.0.40", + "syn 2.0.106", +] + [[package]] name = "brotli" version = "8.0.0" @@ -2298,6 +2491,7 @@ name = "codecs" version = "0.1.0" dependencies = [ "apache-avro", + "arrow", "bytes 1.10.1", "chrono", "csv-core", @@ -2315,6 +2509,7 @@ dependencies = [ "rand 0.9.2", "regex", "rstest", + "rust_decimal", "serde", "serde_json", "serde_with 3.14.0", @@ -2527,6 +2722,26 @@ version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "28c122c3980598d243d63d9a704629a2d748d101f278052ff068be5a4423ab6f" +[[package]] +name = "const-random" +version = "0.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87e00182fe74b066627d63b85fd550ac2998d4b0bd86bfed477a0ae4c7c71359" +dependencies = [ + "const-random-macro", +] + +[[package]] +name = "const-random-macro" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9d839f2a20b0aee515dc581a6172f2321f96cab76c1a38a4c584a194955390e" +dependencies = [ + "getrandom 0.2.15", + "once_cell", + "tiny-keccak", +] + [[package]] name = "convert_case" version = "0.4.0" @@ -3848,6 +4063,16 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" +[[package]] +name = "flatbuffers" +version = "25.9.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09b6620799e7340ebd9968d2e0708eb82cf1971e9a16821e2091b6d6e475eed5" +dependencies = [ + "bitflags 2.9.0", + "rustc_version 0.4.1", +] + [[package]] name = "flate2" version = "1.1.2" @@ -4418,6 +4643,7 @@ checksum = "6dd08c532ae367adf81c312a4580bc67f1d0fe8bc9c460520283f4c0ff277888" dependencies = [ "cfg-if", "crunchy", + "num-traits", ] [[package]] @@ -4471,6 +4697,12 @@ dependencies = [ "foldhash", ] +[[package]] +name = "hashbrown" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" 
+checksum = "5419bdc4f6a9207fbeba6d11b604d481addf78ecd10c11ad51e76c2f6482748d" + [[package]] name = "hashlink" version = "0.10.0" @@ -5885,6 +6117,63 @@ dependencies = [ "spin 0.5.2", ] +[[package]] +name = "lexical-core" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d8d125a277f807e55a77304455eb7b1cb52f2b18c143b60e766c120bd64a594" +dependencies = [ + "lexical-parse-float", + "lexical-parse-integer", + "lexical-util", + "lexical-write-float", + "lexical-write-integer", +] + +[[package]] +name = "lexical-parse-float" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52a9f232fbd6f550bc0137dcb5f99ab674071ac2d690ac69704593cb4abbea56" +dependencies = [ + "lexical-parse-integer", + "lexical-util", +] + +[[package]] +name = "lexical-parse-integer" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a7a039f8fb9c19c996cd7b2fcce303c1b2874fe1aca544edc85c4a5f8489b34" +dependencies = [ + "lexical-util", +] + +[[package]] +name = "lexical-util" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2604dd126bb14f13fb5d1bd6a66155079cb9fa655b37f875b3a742c705dbed17" + +[[package]] +name = "lexical-write-float" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50c438c87c013188d415fbabbb1dceb44249ab81664efbd31b14ae55dabb6361" +dependencies = [ + "lexical-util", + "lexical-write-integer", +] + +[[package]] +name = "lexical-write-integer" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "409851a618475d2d5796377cad353802345cba92c867d9fbcde9cf4eac4e14df" +dependencies = [ + "lexical-util", +] + [[package]] name = "libc" version = "0.2.175" @@ -9151,12 +9440,18 @@ dependencies = [ [[package]] name = "rust_decimal" -version = "1.33.1" +version = "1.39.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "06676aec5ccb8fc1da723cc8c0f9a46549f21ebb8753d3915c6c41db1e7f1dc4" +checksum = "35affe401787a9bd846712274d97654355d21b2a2c092a3139aabe31e9022282" dependencies = [ "arrayvec", + "borsh", + "bytes 1.10.1", "num-traits", + "rand 0.8.5", + "rkyv", + "serde", + "serde_json", ] [[package]] @@ -10847,6 +11142,15 @@ dependencies = [ "time-core", ] +[[package]] +name = "tiny-keccak" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c9d3793400a45f954c52e73d068316d76b6f4e36977e3fcebb13a2721e80237" +dependencies = [ + "crunchy", +] + [[package]] name = "tinystr" version = "0.7.6" @@ -12054,6 +12358,7 @@ dependencies = [ "approx", "arc-swap", "arr_macro", + "arrow", "assert_cmd", "async-compression", "async-graphql", @@ -12194,6 +12499,7 @@ dependencies = [ "roaring", "rstest", "rumqttc", + "rust_decimal", "seahash", "semver 1.0.26", "serde", diff --git a/Cargo.toml b/Cargo.toml index 3b5d6c81c6640..ba22225df5ea3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -179,6 +179,7 @@ rand = { version = "0.9.2", default-features = false, features = ["small_rng", " rand_distr = { version = "0.5.1", default-features = false } regex = { version = "1.11.2", default-features = false, features = ["std", "perf"] } reqwest = { version = "0.11.26", features = ["json"] } +rust_decimal = { version = "1.33", default-features = false, features = ["std"] } semver = { version = "1.0.26", default-features = false, features = ["serde", "std"] } serde = { version = "1.0.219", default-features = false, 
features = ["alloc", "derive", "rc"] } serde_json = { version = "1.0.143", default-features = false, features = ["raw_value", "std"] } @@ -337,6 +338,7 @@ greptimedb-ingester = { git = "https://github.com/GreptimeTeam/greptimedb-ingest arc-swap = { version = "1.7", default-features = false, optional = true } async-compression = { version = "0.4.27", default-features = false, features = ["tokio", "gzip", "zstd"], optional = true } apache-avro = { version = "0.16.0", default-features = false, optional = true } +arrow = { version = "56.2.0", default-features = false, features = ["ipc"], optional = true } axum = { version = "0.6.20", default-features = false } base64 = { workspace = true, optional = true } bloomy = { version = "1.2.0", default-features = false, optional = true } @@ -402,6 +404,7 @@ redis = { version = "0.32.4", default-features = false, features = ["connection- regex.workspace = true roaring = { version = "0.11.2", default-features = false, features = ["std"], optional = true } rumqttc = { version = "0.24.0", default-features = false, features = ["use-rustls"], optional = true } +rust_decimal = { workspace = true, optional = true } seahash = { version = "4.1.0", default-features = false } smallvec = { version = "1", default-features = false, features = ["union", "serde"] } snap = { version = "1.1.1", default-features = false } @@ -576,8 +579,9 @@ enrichment-tables-mmdb = ["dep:maxminddb"] enrichment-tables-memory = ["dep:evmap", "dep:evmap-derive", "dep:thread_local"] # Codecs -codecs-syslog = ["vector-lib/syslog"] +codecs-arrow = ["vector-lib/arrow"] codecs-opentelemetry = ["vector-lib/opentelemetry"] +codecs-syslog = ["vector-lib/syslog"] # Secrets secrets = ["secrets-aws-secrets-manager"] diff --git a/LICENSE-3rdparty.csv b/LICENSE-3rdparty.csv index c6050f1474c11..918b2cd1c3a32 100644 --- a/LICENSE-3rdparty.csv +++ b/LICENSE-3rdparty.csv @@ -26,6 +26,7 @@ arbitrary,https://github.com/rust-fuzz/arbitrary,MIT OR Apache-2.0,"The Rust-Fuz arc-swap,https://github.com/vorner/arc-swap,MIT OR Apache-2.0,Michal 'vorner' Vaner arr_macro,https://github.com/JoshMcguigan/arr_macro,MIT OR Apache-2.0,Josh Mcguigan arrayvec,https://github.com/bluss/arrayvec,MIT OR Apache-2.0,bluss +arrow,https://github.com/apache/arrow-rs,Apache-2.0,Apache Arrow ascii,https://github.com/tomprogrammer/rust-ascii,Apache-2.0 OR MIT,"Thomas Bahn , Torbjørn Birch Moltu , Simon Sapin " async-broadcast,https://github.com/smol-rs/async-broadcast,MIT OR Apache-2.0,"Stjepan Glavina , Yoshua Wuyts , Zeeshan Ali Khan " async-channel,https://github.com/smol-rs/async-channel,Apache-2.0 OR MIT,Stjepan Glavina @@ -106,6 +107,8 @@ blocking,https://github.com/smol-rs/blocking,Apache-2.0 OR MIT,Stjepan Glavina < bloomy,https://docs.rs/bloomy/,MIT,"Aleksandr Bezobchuk , Alexis Sellier " bollard,https://github.com/fussybeaver/bollard,Apache-2.0,Bollard contributors borrow-or-share,https://github.com/yescallop/borrow-or-share,MIT-0,Scallop Ye +borsh,https://github.com/near/borsh-rs,MIT OR Apache-2.0,Near Inc +borsh-derive,https://github.com/near/borsh-rs,Apache-2.0,Near Inc brotli,https://github.com/dropbox/rust-brotli,BSD-3-Clause AND MIT,"Daniel Reiter Horn , The Brotli Authors" brotli-decompressor,https://github.com/dropbox/rust-brotli-decompressor,BSD-3-Clause OR MIT,"Daniel Reiter Horn , The Brotli Authors" bson,https://github.com/mongodb/bson-rust,MIT,"Y. T. 
@@ -147,6 +150,8 @@ community-id,https://github.com/traceflight/rs-community-id,MIT OR Apache-2.0,Ju
 compact_str,https://github.com/ParkMyCar/compact_str,MIT,Parker Timmerman
 concurrent-queue,https://github.com/smol-rs/concurrent-queue,Apache-2.0 OR MIT,"Stjepan Glavina , Taiki Endo , John Nunley "
 const-oid,https://github.com/RustCrypto/formats/tree/master/const-oid,Apache-2.0 OR MIT,RustCrypto Developers
+const-random,https://github.com/tkaitchuck/constrandom,MIT OR Apache-2.0,Tom Kaitchuck
+const-random-macro,https://github.com/tkaitchuck/constrandom,MIT OR Apache-2.0,Tom Kaitchuck
 convert_case,https://github.com/rutrum/convert-case,MIT,David Purdum
 convert_case,https://github.com/rutrum/convert-case,MIT,rutrum
 cookie,https://github.com/SergioBenitez/cookie-rs,MIT OR Apache-2.0,"Sergio Benitez , Alex Crichton "
@@ -236,6 +241,7 @@ fastrand,https://github.com/smol-rs/fastrand,Apache-2.0 OR MIT,Stjepan Glavina <
 ff,https://github.com/zkcrypto/ff,MIT OR Apache-2.0,"Sean Bowe , Jack Grigg "
 fiat-crypto,https://github.com/mit-plv/fiat-crypto,MIT OR Apache-2.0 OR BSD-1-Clause,Fiat Crypto library authors
 finl_unicode,https://github.com/dahosek/finl_unicode,MIT OR Apache-2.0,The finl_unicode Authors
+flatbuffers,https://github.com/google/flatbuffers,Apache-2.0,"Robert Winslow , FlatBuffers Maintainers"
 flate2,https://github.com/rust-lang/flate2-rs,MIT OR Apache-2.0,"Alex Crichton , Josh Triplett "
 float_eq,https://github.com/jtempest/float_eq-rs,MIT OR Apache-2.0,jtempest
 fluent-uri,https://github.com/yescallop/fluent-uri-rs,MIT,Scallop Ye
@@ -362,6 +368,12 @@ kube,https://github.com/kube-rs/kube,Apache-2.0,"clux , Nata
 lalrpop-util,https://github.com/lalrpop/lalrpop,Apache-2.0 OR MIT,Niko Matsakis
 lapin,https://github.com/amqp-rs/lapin,MIT,"Geoffroy Couprie , Marc-Antoine Perennou "
 lazy_static,https://github.com/rust-lang-nursery/lazy-static.rs,MIT OR Apache-2.0,Marvin Löbel
+lexical-core,https://github.com/Alexhuszagh/rust-lexical,MIT OR Apache-2.0,Alex Huszagh
+lexical-parse-float,https://github.com/Alexhuszagh/rust-lexical,MIT OR Apache-2.0,Alex Huszagh
+lexical-parse-integer,https://github.com/Alexhuszagh/rust-lexical,MIT OR Apache-2.0,Alex Huszagh
+lexical-util,https://github.com/Alexhuszagh/rust-lexical,MIT OR Apache-2.0,Alex Huszagh
+lexical-write-float,https://github.com/Alexhuszagh/rust-lexical,MIT OR Apache-2.0,Alex Huszagh
+lexical-write-integer,https://github.com/Alexhuszagh/rust-lexical,MIT OR Apache-2.0,Alex Huszagh
 libc,https://github.com/rust-lang/libc,MIT OR Apache-2.0,The Rust Project Developers
 libflate,https://github.com/sile/libflate,MIT,Takeru Ohta
 libm,https://github.com/rust-lang/libm,MIT OR Apache-2.0,Jorge Aparicio
@@ -658,6 +670,7 @@ thread_local,https://github.com/Amanieu/thread_local-rs,MIT OR Apache-2.0,Amanie
 tikv-jemalloc-sys,https://github.com/tikv/jemallocator,MIT OR Apache-2.0,"Alex Crichton , Gonzalo Brito Gadeschi , The TiKV Project Developers"
 tikv-jemallocator,https://github.com/tikv/jemallocator,MIT OR Apache-2.0,"Alex Crichton , Gonzalo Brito Gadeschi , Simon Sapin , Steven Fackler , The TiKV Project Developers"
 time,https://github.com/time-rs/time,MIT OR Apache-2.0,"Jacob Pratt , Time contributors"
+tiny-keccak,https://github.com/debris/tiny-keccak,CC0-1.0,debris
 tinystr,https://github.com/unicode-org/icu4x,Unicode-3.0,The ICU4X Project Developers
 tinyvec,https://github.com/Lokathor/tinyvec,Zlib OR Apache-2.0 OR MIT,Lokathor
 tinyvec_macros,https://github.com/Soveu/tinyvec_macros,MIT OR Apache-2.0 OR Zlib,Soveu
diff --git a/changelog.d/24074_arrow_batch_codec.feature.md b/changelog.d/24074_arrow_batch_codec.feature.md
new file mode 100644
index 0000000000000..d1f61dbd7b894
--- /dev/null
+++ b/changelog.d/24074_arrow_batch_codec.feature.md
@@ -0,0 +1,3 @@
+A generic Arrow codec has been added to support Apache Arrow IPC serialization across Vector. This enables sinks such as the `clickhouse` sink to use the `ArrowStream` format endpoint, with significantly better performance and smaller payload sizes than JSON-based formats.
+
+authors: benjamin-awd
diff --git a/changelog.d/24074_clickhouse_arrow_format.enhancement.md b/changelog.d/24074_clickhouse_arrow_format.enhancement.md
new file mode 100644
index 0000000000000..d1f61dbd7b894
--- /dev/null
+++ b/changelog.d/24074_clickhouse_arrow_format.enhancement.md
@@ -0,0 +1,3 @@
+A generic Arrow codec has been added to support Apache Arrow IPC serialization across Vector. This enables sinks such as the `clickhouse` sink to use the `ArrowStream` format endpoint, with significantly better performance and smaller payload sizes than JSON-based formats.
+
+authors: benjamin-awd
diff --git a/lib/codecs/Cargo.toml b/lib/codecs/Cargo.toml
index 8e4ce532c54b6..df23e04f10ba4 100644
--- a/lib/codecs/Cargo.toml
+++ b/lib/codecs/Cargo.toml
@@ -11,8 +11,10 @@ path = "tests/bin/generate-avro-fixtures.rs"

 [dependencies]
 apache-avro = { version = "0.16.0", default-features = false }
+arrow = { version = "56.2.0", default-features = false, features = ["ipc"] }
 bytes.workspace = true
 chrono.workspace = true
+rust_decimal = { version = "1.37", default-features = false, features = ["std"] }
 csv-core = { version = "0.1.12", default-features = false }
 derivative.workspace = true
 dyn-clone = { version = "1", default-features = false }
@@ -53,5 +55,6 @@ uuid.workspace = true
 vrl.workspace = true

 [features]
-syslog = ["dep:syslog_loose"]
+arrow = []
 opentelemetry = ["dep:opentelemetry-proto"]
+syslog = ["dep:syslog_loose"]
diff --git a/lib/codecs/src/encoding/format/arrow.rs b/lib/codecs/src/encoding/format/arrow.rs
new file mode 100644
index 0000000000000..e63dd55eda148
--- /dev/null
+++ b/lib/codecs/src/encoding/format/arrow.rs
@@ -0,0 +1,1308 @@
+//! Arrow IPC streaming format codec for batched event encoding
+//!
+//! Provides Apache Arrow IPC stream format encoding with static schema support.
+//! This implements the streaming variant of the Arrow IPC protocol, which writes
+//! a continuous stream of record batches without a file footer.
+
+use arrow::{
+    array::{
+        ArrayRef, BinaryBuilder, BooleanBuilder, Decimal128Builder, Decimal256Builder,
+        Float64Builder, Int64Builder, StringBuilder, TimestampMicrosecondBuilder,
+        TimestampMillisecondBuilder, TimestampNanosecondBuilder, TimestampSecondBuilder,
+        UInt8Builder, UInt16Builder, UInt32Builder, UInt64Builder,
+    },
+    datatypes::{DataType, Schema, TimeUnit, i256},
+    ipc::writer::StreamWriter,
+    record_batch::RecordBatch,
+};
+use async_trait::async_trait;
+use bytes::{BufMut, Bytes, BytesMut};
+use chrono::{DateTime, Utc};
+use rust_decimal::Decimal;
+use snafu::Snafu;
+use std::sync::Arc;
+use tracing::debug;
+use vector_config::configurable_component;
+
+use vector_core::event::{Event, Value};
+
+/// Provides Arrow schema for encoding.
+///
+/// Sinks can implement this trait to provide custom schema fetching logic,
+/// such as fetching from a database, inferring from data, or loading from configuration.
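+///
+/// # Example
+///
+/// An illustrative provider that returns a fixed schema. This is a sketch only;
+/// the `StaticSchemaProvider` type is hypothetical and not part of this crate:
+///
+/// ```ignore
+/// #[derive(Debug)]
+/// struct StaticSchemaProvider(Arc<Schema>);
+///
+/// #[async_trait]
+/// impl SchemaProvider for StaticSchemaProvider {
+///     async fn get_schema(&self) -> Result<Arc<Schema>, ArrowEncodingError> {
+///         // A real implementation might query a database or remote service here.
+///         Ok(Arc::clone(&self.0))
+///     }
+/// }
+/// ```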
+#[async_trait]
+pub trait SchemaProvider: Send + Sync + std::fmt::Debug {
+    /// Get the Arrow schema for encoding events.
+    ///
+    /// This method will be called once before encoding begins, and the result
+    /// will be cached for the lifetime of the encoder.
+    async fn get_schema(&self) -> Result<Arc<Schema>, ArrowEncodingError>;
+}
+
+/// Configuration for Arrow IPC stream serialization
+///
+/// Supports both immediate schema provision and lazy schema loading via providers.
+#[configurable_component]
+#[derive(Clone, Default)]
+pub struct ArrowStreamSerializerConfig {
+    /// The Arrow schema to use for encoding (if known immediately)
+    #[serde(skip)]
+    #[configurable(derived)]
+    pub schema: Option<Arc<Schema>>,
+
+    /// Schema provider for lazy schema loading (not serializable)
+    #[serde(skip)]
+    schema_provider: Option<Arc<dyn SchemaProvider>>,
+}
+
+impl std::fmt::Debug for ArrowStreamSerializerConfig {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("ArrowStreamSerializerConfig")
+            .field(
+                "schema",
+                &self
+                    .schema
+                    .as_ref()
+                    .map(|s| format!("{} fields", s.fields().len())),
+            )
+            .field(
+                "schema_provider",
+                &self.schema_provider.as_ref().map(|_| "<dyn SchemaProvider>"),
+            )
+            .finish()
+    }
+}
+
+impl ArrowStreamSerializerConfig {
+    /// Create a new ArrowStreamSerializerConfig with an immediate schema
+    pub fn new(schema: Arc<Schema>) -> Self {
+        Self {
+            schema: Some(schema),
+            schema_provider: None,
+        }
+    }
+
+    /// Create a new ArrowStreamSerializerConfig with a schema provider
+    ///
+    /// The provider will be called to fetch the schema lazily, typically during
+    /// sink initialization before any events are encoded.
+    pub fn with_provider(provider: Arc<dyn SchemaProvider>) -> Self {
+        Self {
+            schema: None,
+            schema_provider: Some(provider),
+        }
+    }
+
+    /// Get the schema provider if one was configured
+    pub fn provider(&self) -> Option<&Arc<dyn SchemaProvider>> {
+        self.schema_provider.as_ref()
+    }
+
+    /// Resolve the schema from the provider if present.
+    pub async fn resolve(&mut self) -> Result<(), ArrowEncodingError> {
+        // If schema already exists, nothing to do
+        if self.schema.is_some() {
+            return Ok(());
+        }
+
+        // Fetch from provider if available
+        if let Some(provider) = &self.schema_provider {
+            let schema = provider.get_schema().await?;
+            self.schema = Some(schema);
+            Ok(())
+        } else {
+            Err(ArrowEncodingError::NoSchemaProvided)
+        }
+    }
+
+    /// The data type of events that are accepted by `ArrowStreamSerializer`.
+    pub fn input_type(&self) -> vector_core::config::DataType {
+        vector_core::config::DataType::Log
+    }
+
+    /// The schema required by the serializer.
+    pub fn schema_requirement(&self) -> vector_core::schema::Requirement {
+        vector_core::schema::Requirement::empty()
+    }
+}
+
+/// Arrow IPC stream batch serializer that holds the schema
+#[derive(Clone, Debug)]
+pub struct ArrowStreamSerializer {
+    schema: Arc<Schema>,
+}
+
+impl ArrowStreamSerializer {
+    /// Create a new ArrowStreamSerializer with the given configuration
+    pub fn new(config: ArrowStreamSerializerConfig) -> Result<Self, vector_common::Error> {
+        let schema = config.schema.ok_or_else(|| {
+            vector_common::Error::from(
+                "Arrow serializer requires a schema. Pass a schema or fetch from provider before creating serializer."
+            )
+        })?;
+
+        Ok(Self { schema })
+    }
+}
+
+impl tokio_util::codec::Encoder<Vec<Event>> for ArrowStreamSerializer {
+    type Error = vector_common::Error;
+
+    fn encode(&mut self, events: Vec<Event>, buffer: &mut BytesMut) -> Result<(), Self::Error> {
+        if events.is_empty() {
+            return Err(vector_common::Error::from(
+                "No events provided for encoding",
+            ));
+        }
+
+        let bytes = encode_events_to_arrow_ipc_stream(&events, Some(Arc::clone(&self.schema)))
+            .map_err(|e| vector_common::Error::from(e.to_string()))?;
+
+        buffer.extend_from_slice(&bytes);
+        Ok(())
+    }
+}
+
+/// Errors that can occur during Arrow encoding
+#[derive(Debug, Snafu)]
+pub enum ArrowEncodingError {
+    /// Failed to create Arrow record batch
+    #[snafu(display("Failed to create Arrow record batch: {}", source))]
+    RecordBatchCreation {
+        /// The underlying Arrow error
+        source: arrow::error::ArrowError,
+    },
+
+    /// Failed to write Arrow IPC data
+    #[snafu(display("Failed to write Arrow IPC data: {}", source))]
+    IpcWrite {
+        /// The underlying Arrow error
+        source: arrow::error::ArrowError,
+    },
+
+    /// No events provided for encoding
+    #[snafu(display("No events provided for encoding"))]
+    NoEvents,
+
+    /// Schema must be provided before runtime
+    #[snafu(display("Schema must be provided before runtime"))]
+    NoSchemaProvided,
+
+    /// Failed to fetch schema from provider
+    #[snafu(display("Failed to fetch schema from provider: {}", message))]
+    SchemaFetchError {
+        /// Error message from the provider
+        message: String,
+    },
+
+    /// Unsupported Arrow data type for field
+    #[snafu(display(
+        "Unsupported Arrow data type for field '{}': {:?}",
+        field_name,
+        data_type
+    ))]
+    UnsupportedType {
+        /// The field name
+        field_name: String,
+        /// The unsupported data type
+        data_type: DataType,
+    },
+}
+
+/// Encodes a batch of events into Arrow IPC streaming format
+pub fn encode_events_to_arrow_ipc_stream(
+    events: &[Event],
+    schema: Option<Arc<Schema>>,
+) -> Result<Bytes, ArrowEncodingError> {
+    if events.is_empty() {
+        return Err(ArrowEncodingError::NoEvents);
+    }
+
+    let schema_ref = if let Some(provided_schema) = schema {
+        provided_schema
+    } else {
+        return Err(ArrowEncodingError::NoSchemaProvided);
+    };
+
+    let record_batch = build_record_batch(Arc::<Schema>::clone(&schema_ref), events)?;
+
+    debug!(
+        "Built RecordBatch with {} rows and {} columns",
+        record_batch.num_rows(),
+        record_batch.num_columns()
+    );
+
+    // Encode to Arrow IPC format
+    let mut buffer = BytesMut::new().writer();
+    {
+        let mut writer = StreamWriter::try_new(&mut buffer, &schema_ref)
+            .map_err(|source| ArrowEncodingError::IpcWrite { source })?;
+
+        writer
+            .write(&record_batch)
+            .map_err(|source| ArrowEncodingError::IpcWrite { source })?;
+
+        writer
+            .finish()
+            .map_err(|source| ArrowEncodingError::IpcWrite { source })?;
+    }
+
+    let encoded_bytes = buffer.into_inner().freeze();
+    debug!(
+        "Encoded to {} bytes of Arrow IPC stream data",
+        encoded_bytes.len()
+    );
+
+    Ok(encoded_bytes)
+}
+
+/// Builds an Arrow RecordBatch from events
+fn build_record_batch(
+    schema: Arc<Schema>,
+    events: &[Event],
+) -> Result<RecordBatch, ArrowEncodingError> {
+    let num_fields = schema.fields().len();
+    let mut columns: Vec<ArrayRef> = Vec::with_capacity(num_fields);
+
+    for field in schema.fields() {
+        let field_name = field.name();
+        let array: ArrayRef = match field.data_type() {
+            DataType::Timestamp(time_unit, _) => {
+                build_timestamp_array(events, field_name, *time_unit)?
+            }
+            DataType::Utf8 => build_string_array(events, field_name)?,
+            DataType::Int64 => build_int64_array(events, field_name)?,
+            DataType::UInt8 => build_uint8_array(events, field_name)?,
+            DataType::UInt16 => build_uint16_array(events, field_name)?,
+            DataType::UInt32 => build_uint32_array(events, field_name)?,
+            DataType::UInt64 => build_uint64_array(events, field_name)?,
+            DataType::Float64 => build_float64_array(events, field_name)?,
+            DataType::Boolean => build_boolean_array(events, field_name)?,
+            DataType::Binary => build_binary_array(events, field_name)?,
+            DataType::Decimal128(precision, scale) => {
+                build_decimal128_array(events, field_name, *precision, *scale)?
+            }
+            DataType::Decimal256(precision, scale) => {
+                build_decimal256_array(events, field_name, *precision, *scale)?
+            }
+            other_type => {
+                return Err(ArrowEncodingError::UnsupportedType {
+                    field_name: field_name.to_string(),
+                    data_type: other_type.clone(),
+                });
+            }
+        };
+
+        columns.push(array);
+    }
+
+    RecordBatch::try_new(schema, columns)
+        .map_err(|source| ArrowEncodingError::RecordBatchCreation { source })
+}
+
+fn extract_timestamp(value: &Value) -> Option<DateTime<Utc>> {
+    match value {
+        Value::Timestamp(ts) => Some(*ts),
+        Value::Bytes(bytes) => std::str::from_utf8(bytes)
+            .ok()
+            .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
+            .map(|dt| dt.with_timezone(&Utc)),
+        _ => None,
+    }
+}
+
+fn build_timestamp_array(
+    events: &[Event],
+    field_name: &str,
+    time_unit: TimeUnit,
+) -> Result<ArrayRef, ArrowEncodingError> {
+    macro_rules! build_array {
+        ($builder:ty, $converter:expr) => {{
+            let mut builder = <$builder>::new();
+            for event in events {
+                if let Event::Log(log) = event {
+                    let value_to_append = log.get(field_name).and_then(|value| {
+                        // First, try to extract it as a native or string timestamp
+                        if let Some(ts) = extract_timestamp(value) {
+                            $converter(&ts)
+                        }
+                        // Else, fall back to a raw integer
+                        else if let Value::Integer(i) = value {
+                            Some(*i)
+                        }
+                        // Else, it's an unsupported type (e.g., Bool, Float)
+                        else {
+                            None
+                        }
+                    });
+                    builder.append_option(value_to_append);
+                }
+            }
+            Ok(Arc::new(builder.finish()))
+        }};
+    }
+
+    match time_unit {
+        TimeUnit::Second => {
+            build_array!(TimestampSecondBuilder, |ts: &DateTime<Utc>| Some(
+                ts.timestamp()
+            ))
+        }
+        TimeUnit::Millisecond => {
+            build_array!(TimestampMillisecondBuilder, |ts: &DateTime<Utc>| Some(
+                ts.timestamp_millis()
+            ))
+        }
+        TimeUnit::Microsecond => {
+            build_array!(TimestampMicrosecondBuilder, |ts: &DateTime<Utc>| Some(
+                ts.timestamp_micros()
+            ))
+        }
+        TimeUnit::Nanosecond => {
+            build_array!(TimestampNanosecondBuilder, |ts: &DateTime<Utc>| ts
+                .timestamp_nanos_opt())
+        }
+    }
+}
+
+fn build_string_array(events: &[Event], field_name: &str) -> Result<ArrayRef, ArrowEncodingError> {
+    let mut builder = StringBuilder::new();
+
+    for event in events {
+        if let Event::Log(log) = event {
+            if let Some(value) = log.get(field_name) {
+                match value {
+                    Value::Bytes(bytes) => {
+                        // Attempt direct UTF-8 conversion first, fallback to lossy
+                        match std::str::from_utf8(bytes) {
+                            Ok(s) => builder.append_value(s),
+                            Err(_) => builder.append_value(&String::from_utf8_lossy(bytes)),
+                        }
+                    }
+                    Value::Object(obj) => match serde_json::to_string(&obj) {
+                        Ok(s) => builder.append_value(s),
+                        Err(_) => builder.append_null(),
+                    },
+                    Value::Array(arr) => match serde_json::to_string(&arr) {
+                        Ok(s) => builder.append_value(s),
+                        Err(_) => builder.append_null(),
+                    },
+                    _ => {
+                        builder.append_value(&value.to_string_lossy());
+                    }
+                }
+            } else {
+                builder.append_null();
+            }
+        }
+    }
+
+    Ok(Arc::new(builder.finish()))
+}
+
+fn build_int64_array(events: &[Event], field_name: &str) -> Result<ArrayRef, ArrowEncodingError> {
+    let mut builder = Int64Builder::new();
+
+    for event in events {
+        if let Event::Log(log) = event {
+            match log.get(field_name) {
+                Some(Value::Integer(i)) => builder.append_value(*i),
+                _ => builder.append_null(),
+            }
+        }
+    }
+
+    Ok(Arc::new(builder.finish()))
+}
+
+fn build_uint8_array(events: &[Event], field_name: &str) -> Result<ArrayRef, ArrowEncodingError> {
+    let mut builder = UInt8Builder::new();
+
+    for event in events {
+        if let Event::Log(log) = event {
+            match log.get(field_name) {
+                Some(Value::Integer(i)) if *i >= 0 && *i <= u8::MAX as i64 => {
+                    builder.append_value(*i as u8)
+                }
+                _ => builder.append_null(),
+            }
+        }
+    }
+
+    Ok(Arc::new(builder.finish()))
+}
+
+fn build_uint16_array(events: &[Event], field_name: &str) -> Result<ArrayRef, ArrowEncodingError> {
+    let mut builder = UInt16Builder::new();
+
+    for event in events {
+        if let Event::Log(log) = event {
+            match log.get(field_name) {
+                Some(Value::Integer(i)) if *i >= 0 && *i <= u16::MAX as i64 => {
+                    builder.append_value(*i as u16)
+                }
+                _ => builder.append_null(),
+            }
+        }
+    }
+
+    Ok(Arc::new(builder.finish()))
+}
+
+fn build_uint32_array(events: &[Event], field_name: &str) -> Result<ArrayRef, ArrowEncodingError> {
+    let mut builder = UInt32Builder::new();
+
+    for event in events {
+        if let Event::Log(log) = event {
+            match log.get(field_name) {
+                Some(Value::Integer(i)) if *i >= 0 && *i <= u32::MAX as i64 => {
+                    builder.append_value(*i as u32)
+                }
+                _ => builder.append_null(),
+            }
+        }
+    }
+
+    Ok(Arc::new(builder.finish()))
+}
+
+fn build_uint64_array(events: &[Event], field_name: &str) -> Result<ArrayRef, ArrowEncodingError> {
+    let mut builder = UInt64Builder::new();
+
+    for event in events {
+        if let Event::Log(log) = event {
+            match log.get(field_name) {
+                Some(Value::Integer(i)) if *i >= 0 => builder.append_value(*i as u64),
+                _ => builder.append_null(),
+            }
+        }
+    }
+
+    Ok(Arc::new(builder.finish()))
+}
+
+fn build_float64_array(events: &[Event], field_name: &str) -> Result<ArrayRef, ArrowEncodingError> {
+    let mut builder = Float64Builder::new();
+
+    for event in events {
+        if let Event::Log(log) = event {
+            match log.get(field_name) {
+                Some(Value::Float(f)) => builder.append_value(f.into_inner()),
+                Some(Value::Integer(i)) => builder.append_value(*i as f64),
+                _ => builder.append_null(),
+            }
+        }
+    }
+
+    Ok(Arc::new(builder.finish()))
+}
+
+fn build_boolean_array(events: &[Event], field_name: &str) -> Result<ArrayRef, ArrowEncodingError> {
+    let mut builder = BooleanBuilder::new();
+
+    for event in events {
+        if let Event::Log(log) = event {
+            match log.get(field_name) {
+                Some(Value::Boolean(b)) => builder.append_value(*b),
+                _ => builder.append_null(),
+            }
+        }
+    }
+
+    Ok(Arc::new(builder.finish()))
+}
+
+fn build_binary_array(events: &[Event], field_name: &str) -> Result<ArrayRef, ArrowEncodingError> {
+    let mut builder = BinaryBuilder::new();
+
+    for event in events {
+        if let Event::Log(log) = event {
+            match log.get(field_name) {
+                Some(Value::Bytes(bytes)) => builder.append_value(bytes),
+                _ => builder.append_null(),
+            }
+        }
+    }
+
+    Ok(Arc::new(builder.finish()))
+}
+
+fn build_decimal128_array(
+    events: &[Event],
+    field_name: &str,
+    precision: u8,
+    scale: i8,
+) -> Result<ArrayRef, ArrowEncodingError> {
+    let mut builder = Decimal128Builder::new()
+        .with_precision_and_scale(precision, scale)
+        .map_err(|_| ArrowEncodingError::UnsupportedType {
+            field_name: field_name.to_string(),
+            data_type: DataType::Decimal128(precision, scale),
+        })?;
+
+    let target_scale = scale.unsigned_abs() as u32;
+
+    for event in events {
+        if let Event::Log(log) = event {
+            match log.get(field_name) {
+                Some(Value::Float(f)) => {
+                    if let Ok(mut decimal) = Decimal::try_from(f.into_inner()) {
+                        decimal.rescale(target_scale);
+                        let mantissa = decimal.mantissa();
+                        builder.append_value(mantissa);
+                    } else {
+                        builder.append_null();
+                    }
+                }
+                Some(Value::Integer(i)) => {
+                    let mut decimal = Decimal::from(*i);
+                    decimal.rescale(target_scale);
+                    let mantissa = decimal.mantissa();
+                    builder.append_value(mantissa);
+                }
+                _ => builder.append_null(),
+            }
+        }
+    }
+
+    Ok(Arc::new(builder.finish()))
+}
+
+fn build_decimal256_array(
+    events: &[Event],
+    field_name: &str,
+    precision: u8,
+    scale: i8,
+) -> Result<ArrayRef, ArrowEncodingError> {
+    let mut builder = Decimal256Builder::new()
+        .with_precision_and_scale(precision, scale)
+        .map_err(|_| ArrowEncodingError::UnsupportedType {
+            field_name: field_name.to_string(),
+            data_type: DataType::Decimal256(precision, scale),
+        })?;
+
+    let target_scale = scale.unsigned_abs() as u32;
+
+    for event in events {
+        if let Event::Log(log) = event {
+            match log.get(field_name) {
+                Some(Value::Float(f)) => {
+                    if let Ok(mut decimal) = Decimal::try_from(f.into_inner()) {
+                        decimal.rescale(target_scale);
+                        let mantissa = decimal.mantissa();
+                        // rust_decimal does not support i256 natively so we upcast here
+                        builder.append_value(i256::from_i128(mantissa));
+                    } else {
+                        builder.append_null();
+                    }
+                }
+                Some(Value::Integer(i)) => {
+                    let mut decimal = Decimal::from(*i);
+                    decimal.rescale(target_scale);
+                    let mantissa = decimal.mantissa();
+                    builder.append_value(i256::from_i128(mantissa));
+                }
+                _ => builder.append_null(),
+            }
+        }
+    }
+
+    Ok(Arc::new(builder.finish()))
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use arrow::{
+        array::{
+            Array, BinaryArray, BooleanArray, Float64Array, Int64Array, StringArray,
+            TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
+            TimestampSecondArray,
+        },
+        datatypes::Field,
+        ipc::reader::StreamReader,
+    };
+    use chrono::Utc;
+    use std::io::Cursor;
+    use vector_core::event::LogEvent;
+
+    #[test]
+    fn test_encode_all_types() {
+        let mut log = LogEvent::default();
+        log.insert("string_field", "test");
+        log.insert("int_field", 42);
+        log.insert("float_field", 3.15);
+        log.insert("bool_field", true);
+        log.insert("bytes_field", bytes::Bytes::from("binary"));
+        log.insert("timestamp_field", Utc::now());
+
+        let events = vec![Event::Log(log)];
+
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("string_field", DataType::Utf8, true),
+            Field::new("int_field", DataType::Int64, true),
+            Field::new("float_field", DataType::Float64, true),
+            Field::new("bool_field", DataType::Boolean, true),
+            Field::new("bytes_field", DataType::Binary, true),
+            Field::new(
+                "timestamp_field",
+                DataType::Timestamp(TimeUnit::Millisecond, None),
+                true,
+            ),
+        ]));
+
+        let result = encode_events_to_arrow_ipc_stream(&events, Some(Arc::clone(&schema)));
+        assert!(result.is_ok());
+
+        let bytes = result.unwrap();
+        let cursor = Cursor::new(bytes);
+        let mut reader = StreamReader::try_new(cursor, None).unwrap();
+        let batch = reader.next().unwrap().unwrap();
+
+        assert_eq!(batch.num_rows(), 1);
+        assert_eq!(batch.num_columns(), 6);
+
+        // Verify each column has data
+        assert_eq!(
+            batch
+                .column(0)
+                .as_any()
+                .downcast_ref::<StringArray>()
+                .unwrap()
+                .value(0),
+            "test"
+        );
+        assert_eq!(
+            batch
+                .column(1)
+                .as_any()
+                .downcast_ref::<Int64Array>()
+                .unwrap()
+                .value(0),
+            42
+        );
+        assert!(
+            (batch
+                .column(2)
+                .as_any()
+                .downcast_ref::<Float64Array>()
+                .unwrap()
+                .value(0)
+                - 3.15)
+                .abs()
+                < 0.001
+        );
+        assert!(
+            batch
+                .column(3)
+                .as_any()
+                .downcast_ref::<BooleanArray>()
+                .unwrap()
+                .value(0),
+            "{}",
+            true
+        );
+        assert_eq!(
+            batch
+                .column(4)
+                .as_any()
+                .downcast_ref::<BinaryArray>()
+                .unwrap()
+                .value(0),
+            b"binary"
+        );
+        assert!(
+            !batch
+                .column(5)
+                .as_any()
+                .downcast_ref::<TimestampMillisecondArray>()
+                .unwrap()
+                .is_null(0)
+        );
+    }
+
+    #[test]
+    fn test_encode_null_values() {
+        let mut log1 = LogEvent::default();
+        log1.insert("field_a", 1);
+        // field_b is missing
+
+        let mut log2 = LogEvent::default();
+        log2.insert("field_b", 2);
+        // field_a is missing
+
+        let events = vec![Event::Log(log1), Event::Log(log2)];
+
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("field_a", DataType::Int64, true),
+            Field::new("field_b", DataType::Int64, true),
+        ]));
+
+        let result = encode_events_to_arrow_ipc_stream(&events, Some(Arc::clone(&schema)));
+        assert!(result.is_ok());
+
+        let bytes = result.unwrap();
+        let cursor = Cursor::new(bytes);
+        let mut reader = StreamReader::try_new(cursor, None).unwrap();
+        let batch = reader.next().unwrap().unwrap();
+
+        assert_eq!(batch.num_rows(), 2);
+
+        let field_a = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int64Array>()
+            .unwrap();
+        assert_eq!(field_a.value(0), 1);
+        assert!(field_a.is_null(1));
+
+        let field_b = batch
+            .column(1)
+            .as_any()
+            .downcast_ref::<Int64Array>()
+            .unwrap();
+        assert!(field_b.is_null(0));
+        assert_eq!(field_b.value(1), 2);
+    }
+
+    #[test]
+    fn test_encode_type_mismatches() {
+        let mut log1 = LogEvent::default();
+        log1.insert("field", 42); // Integer
+
+        let mut log2 = LogEvent::default();
+        log2.insert("field", 3.15); // Float - type mismatch!
+
+        let events = vec![Event::Log(log1), Event::Log(log2)];
+
+        // Schema expects Int64
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "field",
+            DataType::Int64,
+            true,
+        )]));
+
+        let result = encode_events_to_arrow_ipc_stream(&events, Some(Arc::clone(&schema)));
+        assert!(result.is_ok());
+
+        let bytes = result.unwrap();
+        let cursor = Cursor::new(bytes);
+        let mut reader = StreamReader::try_new(cursor, None).unwrap();
+        let batch = reader.next().unwrap().unwrap();
+
+        assert_eq!(batch.num_rows(), 2);
+
+        let field_array = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int64Array>()
+            .unwrap();
+        assert_eq!(field_array.value(0), 42);
+        assert!(field_array.is_null(1)); // Type mismatch becomes null
+    }
+
+    #[test]
+    fn test_encode_complex_json_values() {
+        use serde_json::json;
+
+        let mut log = LogEvent::default();
+        log.insert(
+            "object_field",
+            json!({"key": "value", "nested": {"count": 42}}),
+        );
+        log.insert("array_field", json!([1, 2, 3]));
+
+        let events = vec![Event::Log(log)];
+
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("object_field", DataType::Utf8, true),
+            Field::new("array_field", DataType::Utf8, true),
+        ]));
+
+        let result = encode_events_to_arrow_ipc_stream(&events, Some(Arc::clone(&schema)));
+        assert!(result.is_ok());
+
+        let bytes = result.unwrap();
+        let cursor = Cursor::new(bytes);
+        let mut reader = StreamReader::try_new(cursor, None).unwrap();
+        let batch = reader.next().unwrap().unwrap();
+
+        assert_eq!(batch.num_rows(), 1);
+
+        let object_array = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .unwrap();
+        let object_str = object_array.value(0);
+        assert!(object_str.contains("key"));
+        assert!(object_str.contains("value"));
+
+        let array_array = batch
+            .column(1)
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .unwrap();
+        let array_str = array_array.value(0);
+        assert_eq!(array_str, "[1,2,3]");
+    }
+
+    #[test]
+    fn test_encode_unsupported_type() {
+        let mut log = LogEvent::default();
+        log.insert("field", "value");
+
+        let events = vec![Event::Log(log)];
+
+        // Use an unsupported type
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "field",
+            DataType::Duration(TimeUnit::Millisecond),
+            true,
+        )]));
+
+        let result = encode_events_to_arrow_ipc_stream(&events, Some(schema));
+        assert!(result.is_err());
+        assert!(matches!(
+            result.unwrap_err(),
+            ArrowEncodingError::UnsupportedType { .. }
+        ));
+    }
+
+    #[test]
+    fn test_encode_without_schema_fails() {
+        let mut log1 = LogEvent::default();
+        log1.insert("message", "hello");
+
+        let events = vec![Event::Log(log1)];
+
+        let result = encode_events_to_arrow_ipc_stream(&events, None);
+        assert!(result.is_err());
+        assert!(matches!(
+            result.unwrap_err(),
+            ArrowEncodingError::NoSchemaProvided
+        ));
+    }
+
+    #[test]
+    fn test_encode_empty_events() {
+        let events: Vec<Event> = vec![];
+        let result = encode_events_to_arrow_ipc_stream(&events, None);
+        assert!(result.is_err());
+        assert!(matches!(result.unwrap_err(), ArrowEncodingError::NoEvents));
+    }
+
+    #[test]
+    fn test_encode_timestamp_precisions() {
+        let now = Utc::now();
+        let mut log = LogEvent::default();
+        log.insert("ts_second", now);
+        log.insert("ts_milli", now);
+        log.insert("ts_micro", now);
+        log.insert("ts_nano", now);
+
+        let events = vec![Event::Log(log)];
+
+        let schema = Arc::new(Schema::new(vec![
+            Field::new(
+                "ts_second",
+                DataType::Timestamp(TimeUnit::Second, None),
+                true,
+            ),
+            Field::new(
+                "ts_milli",
+                DataType::Timestamp(TimeUnit::Millisecond, None),
+                true,
+            ),
+            Field::new(
+                "ts_micro",
+                DataType::Timestamp(TimeUnit::Microsecond, None),
+                true,
+            ),
+            Field::new(
+                "ts_nano",
+                DataType::Timestamp(TimeUnit::Nanosecond, None),
+                true,
+            ),
+        ]));
+
+        let result = encode_events_to_arrow_ipc_stream(&events, Some(Arc::clone(&schema)));
+        assert!(result.is_ok());
+
+        let bytes = result.unwrap();
+        let cursor = Cursor::new(bytes);
+        let mut reader = StreamReader::try_new(cursor, None).unwrap();
+        let batch = reader.next().unwrap().unwrap();
+
+        assert_eq!(batch.num_rows(), 1);
+        assert_eq!(batch.num_columns(), 4);
+
+        let ts_second = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<TimestampSecondArray>()
+            .unwrap();
+        assert!(!ts_second.is_null(0));
+        assert_eq!(ts_second.value(0), now.timestamp());
+
+        let ts_milli = batch
+            .column(1)
+            .as_any()
+            .downcast_ref::<TimestampMillisecondArray>()
+            .unwrap();
+        assert!(!ts_milli.is_null(0));
+        assert_eq!(ts_milli.value(0), now.timestamp_millis());
+
+        let ts_micro = batch
+            .column(2)
+            .as_any()
+            .downcast_ref::<TimestampMicrosecondArray>()
+            .unwrap();
+        assert!(!ts_micro.is_null(0));
+        assert_eq!(ts_micro.value(0), now.timestamp_micros());
+
+        let ts_nano = batch
+            .column(3)
+            .as_any()
+            .downcast_ref::<TimestampNanosecondArray>()
+            .unwrap();
+        assert!(!ts_nano.is_null(0));
+        assert_eq!(ts_nano.value(0), now.timestamp_nanos_opt().unwrap());
+    }
+
+    #[test]
+    fn test_encode_mixed_timestamp_string_and_native() {
+        // Test mixing string timestamps with native Timestamp values
+        let mut log1 = LogEvent::default();
+        log1.insert("ts", "2025-10-22T10:18:44.256Z"); // String
+
+        let mut log2 = LogEvent::default();
+        log2.insert("ts", Utc::now()); // Native Timestamp
+
+        let mut log3 = LogEvent::default();
+        log3.insert("ts", 1729594724256000000_i64); // Integer (nanoseconds)
+
+        let events = vec![Event::Log(log1), Event::Log(log2), Event::Log(log3)];
+
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "ts",
+            DataType::Timestamp(TimeUnit::Nanosecond, None),
+            true,
+        )]));
+
+        let result = encode_events_to_arrow_ipc_stream(&events, Some(Arc::clone(&schema)));
+        assert!(result.is_ok());
+
+        let bytes = result.unwrap();
+        let cursor = Cursor::new(bytes);
+        let mut reader = StreamReader::try_new(cursor, None).unwrap();
+        let batch = reader.next().unwrap().unwrap();
+
+        assert_eq!(batch.num_rows(), 3);
+
+        let ts_array = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<TimestampNanosecondArray>()
+            .unwrap();
+
+        // All three should be non-null
+        assert!(!ts_array.is_null(0));
+        assert!(!ts_array.is_null(1));
+        assert!(!ts_array.is_null(2));
+
+        // First one should match the parsed string
+        let expected = chrono::DateTime::parse_from_rfc3339("2025-10-22T10:18:44.256Z")
+            .unwrap()
+            .timestamp_nanos_opt()
+            .unwrap();
+        assert_eq!(ts_array.value(0), expected);
+
+        // Third one should match the integer
+        assert_eq!(ts_array.value(2), 1729594724256000000_i64);
+    }
+
+    #[test]
+    fn test_encode_invalid_string_timestamp() {
+        // Test that invalid timestamp strings become null
+        let mut log1 = LogEvent::default();
+        log1.insert("timestamp", "not-a-timestamp");
+
+        let mut log2 = LogEvent::default();
+        log2.insert("timestamp", "2025-10-22T10:18:44.256Z"); // Valid
+
+        let mut log3 = LogEvent::default();
+        log3.insert("timestamp", "2025-99-99T99:99:99Z"); // Invalid
+
+        let events = vec![Event::Log(log1), Event::Log(log2), Event::Log(log3)];
+
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "timestamp",
+            DataType::Timestamp(TimeUnit::Nanosecond, None),
+            true,
+        )]));
+
+        let result = encode_events_to_arrow_ipc_stream(&events, Some(Arc::clone(&schema)));
+        assert!(result.is_ok());
+
+        let bytes = result.unwrap();
+        let cursor = Cursor::new(bytes);
+        let mut reader = StreamReader::try_new(cursor, None).unwrap();
+        let batch = reader.next().unwrap().unwrap();
+
+        assert_eq!(batch.num_rows(), 3);
+
+        let ts_array = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<TimestampNanosecondArray>()
+            .unwrap();
+
+        // Invalid timestamps should be null
+        assert!(ts_array.is_null(0));
+        assert!(!ts_array.is_null(1)); // Valid one
+        assert!(ts_array.is_null(2));
+    }
+
+    #[test]
+    fn test_encode_decimal128_from_integer() {
+        use arrow::array::Decimal128Array;
+
+        let mut log = LogEvent::default();
+        // Store quantity as integer: 1000
+        log.insert("quantity", 1000_i64);
+
+        let events = vec![Event::Log(log)];
+
+        // Decimal(10, 3) - will represent 1000 as 1000.000
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "quantity",
+            DataType::Decimal128(10, 3),
+            true,
+        )]));
+
+        let result = encode_events_to_arrow_ipc_stream(&events, Some(Arc::clone(&schema)));
+        assert!(result.is_ok());
+
+        let bytes = result.unwrap();
+        let cursor = Cursor::new(bytes);
+        let mut reader = StreamReader::try_new(cursor, None).unwrap();
+        let batch = reader.next().unwrap().unwrap();
+
+        assert_eq!(batch.num_rows(), 1);
+
+        let decimal_array = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<Decimal128Array>()
+            .unwrap();
+
+        assert!(!decimal_array.is_null(0));
+        // 1000 with scale 3 = 1000 * 10^3 = 1000000
+        assert_eq!(decimal_array.value(0), 1000000_i128);
+    }
+
+    #[test]
+    fn test_encode_decimal256() {
+        use arrow::array::Decimal256Array;
+
+        let mut log = LogEvent::default();
+        // Very large precision number
+        log.insert("big_value", 123456789.123456_f64);
+
+        let events = vec![Event::Log(log)];
+
+        // Decimal256(50, 6) - high precision decimal
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "big_value",
+            DataType::Decimal256(50, 6),
+            true,
+        )]));
+
+        let result = encode_events_to_arrow_ipc_stream(&events, Some(Arc::clone(&schema)));
+        assert!(result.is_ok());
+
+        let bytes = result.unwrap();
+        let cursor = Cursor::new(bytes);
+        let mut reader = StreamReader::try_new(cursor, None).unwrap();
+        let batch = reader.next().unwrap().unwrap();
+
+        assert_eq!(batch.num_rows(), 1);
+
+        let decimal_array = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<Decimal256Array>()
+            .unwrap();
+
+        assert!(!decimal_array.is_null(0));
+        // Value should be non-null and encoded
+        let value = decimal_array.value(0);
+        assert!(value.to_i128().is_some());
+    }
+
+    #[test]
+    fn test_encode_decimal_null_values() {
+        use arrow::array::Decimal128Array;
+
+        let mut log1 = LogEvent::default();
+        log1.insert("price", 99.99_f64);
+
+        let log2 = LogEvent::default();
+        // No price field - should be null
+
+        let mut log3 = LogEvent::default();
+        log3.insert("price", 50.00_f64);
+
+        let events = vec![Event::Log(log1), Event::Log(log2), Event::Log(log3)];
+
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "price",
+            DataType::Decimal128(10, 2),
+            true,
+        )]));
+
+        let result = encode_events_to_arrow_ipc_stream(&events, Some(Arc::clone(&schema)));
+        assert!(result.is_ok());
+
+        let bytes = result.unwrap();
+        let cursor = Cursor::new(bytes);
+        let mut reader = StreamReader::try_new(cursor, None).unwrap();
+        let batch = reader.next().unwrap().unwrap();
+
+        assert_eq!(batch.num_rows(), 3);
+
+        let decimal_array = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<Decimal128Array>()
+            .unwrap();
+
+        // First row: 99.99
+        assert!(!decimal_array.is_null(0));
+        assert_eq!(decimal_array.value(0), 9999_i128);
+
+        // Second row: null
+        assert!(decimal_array.is_null(1));
+
+        // Third row: 50.00
+        assert!(!decimal_array.is_null(2));
+        assert_eq!(decimal_array.value(2), 5000_i128);
+    }
+
+    #[test]
+    fn test_encode_unsigned_integer_types() {
+        use arrow::array::{UInt8Array, UInt16Array, UInt32Array, UInt64Array};
+
+        let mut log = LogEvent::default();
+        log.insert("uint8_field", 255_i64);
+        log.insert("uint16_field", 65535_i64);
+        log.insert("uint32_field", 4294967295_i64);
+        log.insert("uint64_field", 9223372036854775807_i64);
+
+        let events = vec![Event::Log(log)];
+
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("uint8_field", DataType::UInt8, true),
+            Field::new("uint16_field", DataType::UInt16, true),
+            Field::new("uint32_field", DataType::UInt32, true),
+            Field::new("uint64_field", DataType::UInt64, true),
+        ]));
+
+        let result = encode_events_to_arrow_ipc_stream(&events, Some(Arc::clone(&schema)));
+        assert!(result.is_ok());
+
+        let bytes = result.unwrap();
+        let cursor = Cursor::new(bytes);
+        let mut reader = StreamReader::try_new(cursor, None).unwrap();
+        let batch = reader.next().unwrap().unwrap();
+
+        assert_eq!(batch.num_rows(), 1);
+        assert_eq!(batch.num_columns(), 4);
+
+        // Verify uint8
+        let uint8_array = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<UInt8Array>()
+            .unwrap();
+        assert_eq!(uint8_array.value(0), 255_u8);
+
+        // Verify uint16
+        let uint16_array = batch
+            .column(1)
+            .as_any()
+            .downcast_ref::<UInt16Array>()
+            .unwrap();
+        assert_eq!(uint16_array.value(0), 65535_u16);
+
+        // Verify uint32
+        let uint32_array = batch
+            .column(2)
+            .as_any()
+            .downcast_ref::<UInt32Array>()
+            .unwrap();
+        assert_eq!(uint32_array.value(0), 4294967295_u32);
+
+        // Verify uint64
+        let uint64_array = batch
+            .column(3)
+            .as_any()
+            .downcast_ref::<UInt64Array>()
+            .unwrap();
+        assert_eq!(uint64_array.value(0), 9223372036854775807_u64);
+    }
+
+    #[test]
+    fn test_encode_unsigned_integers_with_null_and_overflow() {
+        use arrow::array::{UInt8Array, UInt32Array};
+
+        let mut log1 = LogEvent::default();
+        log1.insert("uint8_field", 100_i64);
+        log1.insert("uint32_field", 1000_i64);
+
+        let mut log2 = LogEvent::default();
+        log2.insert("uint8_field", 300_i64); // Overflow - should be null
+        log2.insert("uint32_field", -1_i64); // Negative - should be null
+
+        let log3 = LogEvent::default();
+        // Missing fields - should be null
+
+        let events = vec![Event::Log(log1), Event::Log(log2), Event::Log(log3)];
+
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("uint8_field", DataType::UInt8, true),
+            Field::new("uint32_field", DataType::UInt32, true),
+        ]));
+
+        let result = encode_events_to_arrow_ipc_stream(&events, Some(Arc::clone(&schema)));
+        assert!(result.is_ok());
+
+        let bytes = result.unwrap();
+        let cursor = Cursor::new(bytes);
+        let mut reader = StreamReader::try_new(cursor, None).unwrap();
+        let batch = reader.next().unwrap().unwrap();
+
+        assert_eq!(batch.num_rows(), 3);
+
+        // Check uint8 column
+        let uint8_array = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<UInt8Array>()
+            .unwrap();
+        assert_eq!(uint8_array.value(0), 100_u8); // Valid
+        assert!(uint8_array.is_null(1)); // Overflow
+        assert!(uint8_array.is_null(2)); // Missing
+
+        // Check uint32 column
+        let uint32_array = batch
+            .column(1)
+            .as_any()
+            .downcast_ref::<UInt32Array>()
+            .unwrap();
+        assert_eq!(uint32_array.value(0), 1000_u32); // Valid
+        assert!(uint32_array.is_null(1)); // Negative
+        assert!(uint32_array.is_null(2)); // Missing
+    }
+}
diff --git a/lib/codecs/src/encoding/format/mod.rs b/lib/codecs/src/encoding/format/mod.rs
index 9377cdca5d906..ccafb2b969cd7 100644
--- a/lib/codecs/src/encoding/format/mod.rs
+++ b/lib/codecs/src/encoding/format/mod.rs
@@ -3,6 +3,8 @@

 #![deny(missing_docs)]

+#[cfg(feature = "arrow")]
+mod arrow;
 mod avro;
 mod cef;
 mod common;
@@ -20,6 +22,10 @@ mod text;

 use std::fmt::Debug;

+#[cfg(feature = "arrow")]
+pub use arrow::{
+    ArrowEncodingError, ArrowStreamSerializer, ArrowStreamSerializerConfig, SchemaProvider,
+};
 pub use avro::{AvroSerializer, AvroSerializerConfig, AvroSerializerOptions};
 pub use cef::{CefSerializer, CefSerializerConfig};
 use dyn_clone::DynClone;
diff --git a/lib/codecs/src/encoding/mod.rs b/lib/codecs/src/encoding/mod.rs
index 8dd2c4ddc79a5..d690e2727fd4e 100644
--- a/lib/codecs/src/encoding/mod.rs
+++ b/lib/codecs/src/encoding/mod.rs
@@ -6,6 +6,10 @@ pub mod format;
 pub mod framing;
 pub mod serializer;
 pub use chunking::{Chunker, Chunking, GelfChunker};
+#[cfg(feature = "arrow")]
+pub use format::{
+    ArrowEncodingError, ArrowStreamSerializer, ArrowStreamSerializerConfig, SchemaProvider,
+};
 pub use format::{
     AvroSerializer, AvroSerializerConfig, AvroSerializerOptions, CefSerializer,
     CefSerializerConfig, CsvSerializer, CsvSerializerConfig, GelfSerializer, GelfSerializerConfig,
@@ -24,6 +28,8 @@ pub use framing::{
     NewlineDelimitedEncoderConfig, VarintLengthDelimitedEncoder,
     VarintLengthDelimitedEncoderConfig,
 };
+#[cfg(feature = "arrow")]
+pub use serializer::BatchSerializerConfig;
 pub use serializer::{Serializer, SerializerConfig};

 /// An error that occurred while building an encoder.
diff --git a/lib/codecs/src/encoding/serializer.rs b/lib/codecs/src/encoding/serializer.rs
index eef088fca4b72..cc8b87f61aa50 100644
--- a/lib/codecs/src/encoding/serializer.rs
+++ b/lib/codecs/src/encoding/serializer.rs
@@ -5,6 +5,8 @@ use vector_config::configurable_component;
 use vector_core::{config::DataType, event::Event, schema};

 use super::chunking::Chunker;
+#[cfg(feature = "arrow")]
+use super::format::{ArrowStreamSerializer, ArrowStreamSerializerConfig};
 use super::format::{
     AvroSerializer, AvroSerializerConfig, AvroSerializerOptions, CefSerializer,
     CefSerializerConfig, CsvSerializer, CsvSerializerConfig, GelfSerializer, GelfSerializerConfig,
@@ -132,6 +134,49 @@ impl Default for SerializerConfig {
     }
 }

+/// Batch serializer configuration.
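+///
+/// # Example
+///
+/// A sketch of building the batch serializer from this config; the `schema`
+/// value is assumed to come from the sink:
+///
+/// ```ignore
+/// let config = BatchSerializerConfig::ArrowStream(
+///     ArrowStreamSerializerConfig::new(schema),
+/// );
+/// let serializer = config.build()?;
+/// ```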
+#[configurable_component]
+#[derive(Clone, Debug)]
+#[serde(tag = "codec", rename_all = "snake_case")]
+#[configurable(metadata(docs::enum_tag_description = "The codec to use for batch encoding events."))]
+pub enum BatchSerializerConfig {
+    /// Encodes events in [Apache Arrow][apache_arrow] IPC streaming format.
+    ///
+    /// This is the streaming variant of the Arrow IPC format, which writes
+    /// a continuous stream of record batches.
+    ///
+    /// [apache_arrow]: https://arrow.apache.org/
+    #[cfg(feature = "arrow")]
+    #[serde(rename = "arrow_stream")]
+    ArrowStream(ArrowStreamSerializerConfig),
+}
+
+#[cfg(feature = "arrow")]
+impl BatchSerializerConfig {
+    /// Build the `ArrowStreamSerializer` from this configuration.
+    pub fn build(&self) -> Result<ArrowStreamSerializer, vector_common::Error> {
+        match self {
+            BatchSerializerConfig::ArrowStream(arrow_config) => {
+                ArrowStreamSerializer::new(arrow_config.clone())
+            }
+        }
+    }
+
+    /// The data type of events that are accepted by this batch serializer.
+    pub fn input_type(&self) -> DataType {
+        match self {
+            BatchSerializerConfig::ArrowStream(arrow_config) => arrow_config.input_type(),
+        }
+    }
+
+    /// The schema required by the batch serializer.
+    pub fn schema_requirement(&self) -> schema::Requirement {
+        match self {
+            BatchSerializerConfig::ArrowStream(arrow_config) => arrow_config.schema_requirement(),
+        }
+    }
+}
+
 impl From<AvroSerializerConfig> for SerializerConfig {
     fn from(config: AvroSerializerConfig) -> Self {
         Self::Avro { avro: config.avro }
diff --git a/lib/vector-lib/Cargo.toml b/lib/vector-lib/Cargo.toml
index 3ba208e62c5ad..b368b25890cab 100644
--- a/lib/vector-lib/Cargo.toml
+++ b/lib/vector-lib/Cargo.toml
@@ -26,8 +26,9 @@ vrl = { workspace = true, optional = true }
 [features]
 api = ["vector-tap/api"]
 api-client = ["dep:vector-api-client"]
-lua = ["vector-core/lua"]
+arrow = ["codecs/arrow"]
 file-source = ["dep:file-source", "dep:file-source-common"]
+lua = ["vector-core/lua"]
 opentelemetry = ["dep:opentelemetry-proto", "codecs/opentelemetry"]
 prometheus = ["dep:prometheus-parser"]
 proptest = ["vector-lookup/proptest", "vrl/proptest"]
diff --git a/src/codecs/encoding/config.rs b/src/codecs/encoding/config.rs
index a04f44315047a..377bb50552de4 100644
--- a/src/codecs/encoding/config.rs
+++ b/src/codecs/encoding/config.rs
@@ -1,4 +1,4 @@
-use crate::codecs::Transformer;
+use crate::codecs::{Encoder, EncoderKind, Transformer};
 use vector_lib::{
     codecs::{
         CharacterDelimitedEncoder, LengthDelimitedEncoder, NewlineDelimitedEncoder,
@@ -138,6 +138,13 @@ impl EncodingConfigWithFraming {

         Ok((framer, serializer))
     }
+
+    /// Build the `Transformer` and `EncoderKind` for this config.
+    pub fn build_encoder(&self, sink_type: SinkType) -> crate::Result<(Transformer, EncoderKind)> {
+        let (framer, serializer) = self.build(sink_type)?;
+        let encoder = EncoderKind::Framed(Encoder::<Framer>::new(framer, serializer));
+        Ok((self.transformer(), encoder))
+    }
 }

 /// The way a sink processes outgoing events.
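For orientation, here is a sketch of how a sink might consume the new `build_encoder` helper. Only `EncodingConfigWithFraming`, `SinkType`, and `EncoderKind` come from this change; the free function and the logging are illustrative:

```rust
use crate::codecs::{EncoderKind, EncodingConfigWithFraming, SinkType, Transformer};

// Sketch only: a sink that owns an EncodingConfigWithFraming could obtain its
// transformer/encoder pair like this. Batch encoders additionally carry their
// own HTTP content type, e.g. "application/vnd.apache.arrow.stream" for Arrow.
fn build_parts(
    encoding: &EncodingConfigWithFraming,
) -> crate::Result<(Transformer, EncoderKind)> {
    let (transformer, encoder) = encoding.build_encoder(SinkType::MessageBased)?;
    #[cfg(feature = "codecs-arrow")]
    if let EncoderKind::Batch(batch) = &encoder {
        tracing::debug!(content_type = batch.content_type());
    }
    Ok((transformer, encoder))
}
```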
diff --git a/src/codecs/encoding/encoder.rs b/src/codecs/encoding/encoder.rs
index f1d0741bb669c..aad4da34242d6 100644
--- a/src/codecs/encoding/encoder.rs
+++ b/src/codecs/encoding/encoder.rs
@@ -1,5 +1,7 @@
 use bytes::BytesMut;
 use tokio_util::codec::Encoder as _;
+#[cfg(feature = "codecs-arrow")]
+use vector_lib::codecs::encoding::ArrowStreamSerializer;
 use vector_lib::codecs::{
     CharacterDelimitedEncoder, NewlineDelimitedEncoder, TextSerializerConfig,
     encoding::{Error, Framer, Serializer},
@@ -10,6 +12,66 @@ use crate::{
     internal_events::{EncoderFramingError, EncoderSerializeError},
 };
 
+/// Serializers that support batch encoding (encoding all events at once).
+#[derive(Debug, Clone)]
+pub enum BatchSerializer {
+    /// Arrow IPC stream format serializer.
+    #[cfg(feature = "codecs-arrow")]
+    Arrow(ArrowStreamSerializer),
+}
+
+/// An encoder that encodes batches of events.
+#[derive(Debug, Clone)]
+pub struct BatchEncoder {
+    serializer: BatchSerializer,
+}
+
+impl BatchEncoder {
+    /// Creates a new `BatchEncoder` with the specified batch serializer.
+    pub const fn new(serializer: BatchSerializer) -> Self {
+        Self { serializer }
+    }
+
+    /// Get the batch serializer.
+    pub const fn serializer(&self) -> &BatchSerializer {
+        &self.serializer
+    }
+
+    /// Get the HTTP content type.
+    pub const fn content_type(&self) -> &'static str {
+        match &self.serializer {
+            #[cfg(feature = "codecs-arrow")]
+            BatchSerializer::Arrow(_) => "application/vnd.apache.arrow.stream",
+        }
+    }
+}
+
+impl tokio_util::codec::Encoder<Vec<Event>> for BatchEncoder {
+    type Error = Error;
+
+    #[allow(unused_variables)]
+    fn encode(&mut self, events: Vec<Event>, buffer: &mut BytesMut) -> Result<(), Self::Error> {
+        #[allow(unreachable_patterns)]
+        match &mut self.serializer {
+            #[cfg(feature = "codecs-arrow")]
+            BatchSerializer::Arrow(serializer) => serializer
+                .encode(events, buffer)
+                .map_err(Error::SerializingError),
+            _ => unreachable!("BatchSerializer cannot be constructed without encode()"),
+        }
+    }
+}
+
+/// A wrapper that supports both framed and batch encoding modes.
+#[derive(Debug, Clone)]
+pub enum EncoderKind {
+    /// Uses framing to encode individual events.
+    Framed(Encoder<Framer>),
+    /// Encodes events in batches without framing.
+    #[cfg(feature = "codecs-arrow")]
+    Batch(BatchEncoder),
+}
+
 #[derive(Debug, Clone)]
 /// An encoder that can encode structured events into byte frames.
 pub struct Encoder<Framer>
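Using the new `BatchEncoder` is a single `encode` call over the whole batch, with no per-event framing. A hedged usage sketch, assuming the `codecs-arrow` feature is enabled and an `ArrowStreamSerializer` has already been built from its config (the function name `encode_whole_batch` is illustrative only):

use bytes::BytesMut;
use tokio_util::codec::Encoder as _;
use vector_lib::codecs::encoding::ArrowStreamSerializer;
use vector_lib::event::{Event, LogEvent};

use crate::codecs::{BatchEncoder, BatchSerializer};

fn encode_whole_batch(serializer: ArrowStreamSerializer) {
    let mut encoder = BatchEncoder::new(BatchSerializer::Arrow(serializer));

    // The content type is fixed by the serializer variant and can be used
    // directly as an HTTP `Content-Type` header by sinks.
    assert_eq!(encoder.content_type(), "application/vnd.apache.arrow.stream");

    // All events become one Arrow IPC stream in `buffer`; event fields
    // missing for a schema column are encoded as nulls, as the tests above
    // verify.
    let events = vec![Event::Log(LogEvent::default())];
    let mut buffer = BytesMut::new();
    encoder
        .encode(events, &mut buffer)
        .expect("arrow encoding should succeed for schema-compatible events");
}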
diff --git a/src/codecs/encoding/mod.rs b/src/codecs/encoding/mod.rs
index 69ede063e896b..36d637bd75090 100644
--- a/src/codecs/encoding/mod.rs
+++ b/src/codecs/encoding/mod.rs
@@ -3,5 +3,5 @@ mod encoder;
 mod transformer;
 
 pub use config::{EncodingConfig, EncodingConfigWithFraming, SinkType};
-pub use encoder::Encoder;
+pub use encoder::{BatchEncoder, BatchSerializer, Encoder, EncoderKind};
 pub use transformer::{TimestampFormat, Transformer};
diff --git a/src/codecs/mod.rs b/src/codecs/mod.rs
index 4247846cca3a8..32b0e9efb7f8b 100644
--- a/src/codecs/mod.rs
+++ b/src/codecs/mod.rs
@@ -9,6 +9,7 @@ mod ready_frames;
 
 pub use decoding::{Decoder, DecodingConfig};
 pub use encoding::{
-    Encoder, EncodingConfig, EncodingConfigWithFraming, SinkType, TimestampFormat, Transformer,
+    BatchEncoder, BatchSerializer, Encoder, EncoderKind, EncodingConfig, EncodingConfigWithFraming,
+    SinkType, TimestampFormat, Transformer,
 };
 pub use ready_frames::ReadyFrames;
diff --git a/src/sinks/util/encoding.rs b/src/sinks/util/encoding.rs
index bb5a938ec017f..a04dc4f382a49 100644
--- a/src/sinks/util/encoding.rs
+++ b/src/sinks/util/encoding.rs
@@ -97,6 +97,55 @@ impl Encoder<Event> for (Transformer, crate::codecs::Encoder<()>) {
     }
 }
 
+#[cfg(feature = "codecs-arrow")]
+impl Encoder<Vec<Event>> for (Transformer, crate::codecs::BatchEncoder) {
+    fn encode_input(
+        &self,
+        events: Vec<Event>,
+        writer: &mut dyn io::Write,
+    ) -> io::Result<(usize, GroupedCountByteSize)> {
+        use tokio_util::codec::Encoder as _;
+
+        let mut encoder = self.1.clone();
+        let mut byte_size = telemetry().create_request_count_byte_size();
+        let n_events = events.len();
+        let mut transformed_events = Vec::with_capacity(n_events);
+
+        for mut event in events {
+            self.0.transform(&mut event);
+            byte_size.add_event(&event, event.estimated_json_encoded_size_of());
+            transformed_events.push(event);
+        }
+
+        let mut bytes = BytesMut::new();
+        encoder
+            .encode(transformed_events, &mut bytes)
+            .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?;
+
+        write_all(writer, n_events, &bytes)?;
+        Ok((bytes.len(), byte_size))
+    }
+}
+
+impl Encoder<Vec<Event>> for (Transformer, crate::codecs::EncoderKind) {
+    fn encode_input(
+        &self,
+        events: Vec<Event>,
+        writer: &mut dyn io::Write,
+    ) -> io::Result<(usize, GroupedCountByteSize)> {
+        // Delegate to the specific encoder implementation
+        match &self.1 {
+            crate::codecs::EncoderKind::Framed(encoder) => {
+                (self.0.clone(), encoder.clone()).encode_input(events, writer)
+            }
+            #[cfg(feature = "codecs-arrow")]
+            crate::codecs::EncoderKind::Batch(encoder) => {
+                (self.0.clone(), encoder.clone()).encode_input(events, writer)
+            }
+        }
+    }
+}
+
 /// Write the buffer to the writer. If the operation fails, emit an internal event which complies with the
 /// instrumentation spec- as this necessitates both an Error and EventsDropped event.
 ///