diff --git a/Cargo.lock b/Cargo.lock index 14844d2706f27..859097314d6ed 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -94,6 +94,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011" dependencies = [ "cfg-if", + "const-random", "getrandom 0.2.15", "once_cell", "serde", @@ -362,6 +363,175 @@ version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711" +[[package]] +name = "arrow" +version = "56.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e833808ff2d94ed40d9379848a950d995043c7fb3e81a30b383f4c6033821cc" +dependencies = [ + "arrow-arith", + "arrow-array", + "arrow-buffer", + "arrow-cast", + "arrow-data", + "arrow-ipc", + "arrow-ord", + "arrow-row", + "arrow-schema", + "arrow-select", + "arrow-string", +] + +[[package]] +name = "arrow-arith" +version = "56.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad08897b81588f60ba983e3ca39bda2b179bdd84dced378e7df81a5313802ef8" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "chrono", + "num", +] + +[[package]] +name = "arrow-array" +version = "56.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8548ca7c070d8db9ce7aa43f37393e4bfcf3f2d3681df278490772fd1673d08d" +dependencies = [ + "ahash 0.8.11", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "chrono", + "half", + "hashbrown 0.16.0", + "num", +] + +[[package]] +name = "arrow-buffer" +version = "56.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e003216336f70446457e280807a73899dd822feaf02087d31febca1363e2fccc" +dependencies = [ + "bytes 1.10.1", + "half", + "num", +] + +[[package]] +name = "arrow-cast" +version = "56.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "919418a0681298d3a77d1a315f625916cb5678ad0d74b9c60108eb15fd083023" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "arrow-select", + "atoi", + "base64 0.22.1", + "chrono", + "half", + "lexical-core", + "num", + "ryu", +] + +[[package]] +name = "arrow-data" +version = "56.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a5c64fff1d142f833d78897a772f2e5b55b36cb3e6320376f0961ab0db7bd6d0" +dependencies = [ + "arrow-buffer", + "arrow-schema", + "half", + "num", +] + +[[package]] +name = "arrow-ipc" +version = "56.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d3594dcddccc7f20fd069bc8e9828ce37220372680ff638c5e00dea427d88f5" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "arrow-select", + "flatbuffers", +] + +[[package]] +name = "arrow-ord" +version = "56.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c8f82583eb4f8d84d4ee55fd1cb306720cddead7596edce95b50ee418edf66f" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "arrow-select", +] + +[[package]] +name = "arrow-row" +version = "56.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d07ba24522229d9085031df6b94605e0f4b26e099fb7cdeec37abd941a73753" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "half", +] + +[[package]] +name = "arrow-schema" +version = "56.2.0" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "b3aa9e59c611ebc291c28582077ef25c97f1975383f1479b12f3b9ffee2ffabe" + +[[package]] +name = "arrow-select" +version = "56.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8c41dbbd1e97bfcaee4fcb30e29105fb2c75e4d82ae4de70b792a5d3f66b2e7a" +dependencies = [ + "ahash 0.8.11", + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "num", +] + +[[package]] +name = "arrow-string" +version = "56.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53f5183c150fbc619eede22b861ea7c0eebed8eaac0333eaa7f6da5205fd504d" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "arrow-select", + "memchr", + "num", + "regex", + "regex-syntax", +] + [[package]] name = "ascii" version = "0.9.3" @@ -1938,6 +2108,29 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3eeab4423108c5d7c744f4d234de88d18d636100093ae04caf4825134b9c3a32" +[[package]] +name = "borsh" +version = "1.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad8646f98db542e39fc66e68a20b2144f6a732636df7c2354e74645faaa433ce" +dependencies = [ + "borsh-derive", + "cfg_aliases", +] + +[[package]] +name = "borsh-derive" +version = "1.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fdd1d3c0c2f5833f22386f252fe8ed005c7f59fdcddeef025c01b4c3b9fd9ac3" +dependencies = [ + "once_cell", + "proc-macro-crate 3.2.0", + "proc-macro2 1.0.101", + "quote 1.0.40", + "syn 2.0.106", +] + [[package]] name = "brotli" version = "8.0.0" @@ -2361,6 +2554,7 @@ name = "codecs" version = "0.1.0" dependencies = [ "apache-avro 0.20.0", + "arrow", "bytes 1.10.1", "chrono", "csv-core", @@ -2378,6 +2572,7 @@ dependencies = [ "rand 0.9.2", "regex", "rstest", + "rust_decimal", "serde", "serde_json", "serde_with 3.14.0", @@ -2578,6 +2773,26 @@ version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "28c122c3980598d243d63d9a704629a2d748d101f278052ff068be5a4423ab6f" +[[package]] +name = "const-random" +version = "0.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87e00182fe74b066627d63b85fd550ac2998d4b0bd86bfed477a0ae4c7c71359" +dependencies = [ + "const-random-macro", +] + +[[package]] +name = "const-random-macro" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9d839f2a20b0aee515dc581a6172f2321f96cab76c1a38a4c584a194955390e" +dependencies = [ + "getrandom 0.2.15", + "once_cell", + "tiny-keccak", +] + [[package]] name = "convert_case" version = "0.4.0" @@ -3928,6 +4143,16 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" +[[package]] +name = "flatbuffers" +version = "25.9.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09b6620799e7340ebd9968d2e0708eb82cf1971e9a16821e2091b6d6e475eed5" +dependencies = [ + "bitflags 2.9.0", + "rustc_version 0.4.1", +] + [[package]] name = "flate2" version = "1.1.2" @@ -4504,6 +4729,7 @@ checksum = "6dd08c532ae367adf81c312a4580bc67f1d0fe8bc9c460520283f4c0ff277888" dependencies = [ "cfg-if", "crunchy", + "num-traits", ] [[package]] @@ -5974,6 +6200,63 @@ dependencies = [ "spin 0.5.2", ] +[[package]] +name = "lexical-core" +version = "1.0.6" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d8d125a277f807e55a77304455eb7b1cb52f2b18c143b60e766c120bd64a594" +dependencies = [ + "lexical-parse-float", + "lexical-parse-integer", + "lexical-util", + "lexical-write-float", + "lexical-write-integer", +] + +[[package]] +name = "lexical-parse-float" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52a9f232fbd6f550bc0137dcb5f99ab674071ac2d690ac69704593cb4abbea56" +dependencies = [ + "lexical-parse-integer", + "lexical-util", +] + +[[package]] +name = "lexical-parse-integer" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a7a039f8fb9c19c996cd7b2fcce303c1b2874fe1aca544edc85c4a5f8489b34" +dependencies = [ + "lexical-util", +] + +[[package]] +name = "lexical-util" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2604dd126bb14f13fb5d1bd6a66155079cb9fa655b37f875b3a742c705dbed17" + +[[package]] +name = "lexical-write-float" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50c438c87c013188d415fbabbb1dceb44249ab81664efbd31b14ae55dabb6361" +dependencies = [ + "lexical-util", + "lexical-write-integer", +] + +[[package]] +name = "lexical-write-integer" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "409851a618475d2d5796377cad353802345cba92c867d9fbcde9cf4eac4e14df" +dependencies = [ + "lexical-util", +] + [[package]] name = "libc" version = "0.2.175" @@ -9241,12 +9524,18 @@ dependencies = [ [[package]] name = "rust_decimal" -version = "1.33.1" +version = "1.39.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "06676aec5ccb8fc1da723cc8c0f9a46549f21ebb8753d3915c6c41db1e7f1dc4" +checksum = "35affe401787a9bd846712274d97654355d21b2a2c092a3139aabe31e9022282" dependencies = [ "arrayvec", + "borsh", + "bytes 1.10.1", "num-traits", + "rand 0.8.5", + "rkyv", + "serde", + "serde_json", ] [[package]] @@ -10967,6 +11256,15 @@ dependencies = [ "time-core", ] +[[package]] +name = "tiny-keccak" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c9d3793400a45f954c52e73d068316d76b6f4e36977e3fcebb13a2721e80237" +dependencies = [ + "crunchy", +] + [[package]] name = "tinystr" version = "0.7.6" @@ -12173,6 +12471,7 @@ dependencies = [ "approx", "arc-swap", "arr_macro", + "arrow", "assert_cmd", "async-compression", "async-graphql", @@ -12314,6 +12613,7 @@ dependencies = [ "roaring", "rstest", "rumqttc", + "rust_decimal", "seahash", "semver 1.0.26", "serde", diff --git a/Cargo.toml b/Cargo.toml index 8e0d690724f7a..c79f952325636 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -179,6 +179,7 @@ rand = { version = "0.9.2", default-features = false, features = ["small_rng", " rand_distr = { version = "0.5.1", default-features = false } regex = { version = "1.11.2", default-features = false, features = ["std", "perf"] } reqwest = { version = "0.11.26", features = ["json"] } +rust_decimal = { version = "1.33", default-features = false, features = ["std"] } semver = { version = "1.0.26", default-features = false, features = ["serde", "std"] } serde = { version = "1.0.219", default-features = false, features = ["alloc", "derive", "rc"] } serde_json = { version = "1.0.143", default-features = false, features = ["raw_value", "std"] } @@ -339,6 +340,7 @@ greptimedb-ingester = { git = "https://github.com/GreptimeTeam/greptimedb-ingest arc-swap = { 
version = "1.7", default-features = false, optional = true } async-compression = { version = "0.4.27", default-features = false, features = ["tokio", "gzip", "zstd"], optional = true } apache-avro = { version = "0.16.0", default-features = false, optional = true } +arrow = { version = "56.2.0", default-features = false, features = ["ipc"], optional = true } axum = { version = "0.6.20", default-features = false } base64 = { workspace = true, optional = true } bloomy = { version = "1.2.0", default-features = false, optional = true } @@ -404,6 +406,7 @@ redis = { version = "0.32.4", default-features = false, features = ["connection- regex.workspace = true roaring = { version = "0.11.2", default-features = false, features = ["std"], optional = true } rumqttc = { version = "0.24.0", default-features = false, features = ["use-rustls"], optional = true } +rust_decimal = { workspace = true, optional = true } seahash = { version = "4.1.0", default-features = false } smallvec = { version = "1", default-features = false, features = ["union", "serde"] } snap = { version = "1.1.1", default-features = false } @@ -580,8 +583,9 @@ enrichment-tables-mmdb = ["dep:maxminddb"] enrichment-tables-memory = ["dep:evmap", "dep:evmap-derive", "dep:thread_local"] # Codecs -codecs-syslog = ["vector-lib/syslog"] +codecs-arrow = ["vector-lib/arrow"] codecs-opentelemetry = ["vector-lib/opentelemetry"] +codecs-syslog = ["vector-lib/syslog"] # Secrets secrets = ["secrets-aws-secrets-manager"] diff --git a/LICENSE-3rdparty.csv b/LICENSE-3rdparty.csv index fbf3097a61c5d..d8d6edc54a79d 100644 --- a/LICENSE-3rdparty.csv +++ b/LICENSE-3rdparty.csv @@ -32,6 +32,18 @@ arc-swap,https://github.com/vorner/arc-swap,MIT OR Apache-2.0,Michal 'vorner' Va arr_macro,https://github.com/JoshMcguigan/arr_macro,MIT OR Apache-2.0,Josh Mcguigan arr_macro_impl,https://github.com/JoshMcguigan/arr_macro,MIT OR Apache-2.0,Josh Mcguigan arrayvec,https://github.com/bluss/arrayvec,MIT OR Apache-2.0,bluss +arrow,https://github.com/apache/arrow-rs,Apache-2.0,Apache Arrow +arrow-arith,https://github.com/apache/arrow-rs,Apache-2.0,Apache Arrow +arrow-array,https://github.com/apache/arrow-rs,Apache-2.0,Apache Arrow +arrow-buffer,https://github.com/apache/arrow-rs,Apache-2.0,Apache Arrow +arrow-cast,https://github.com/apache/arrow-rs,Apache-2.0,Apache Arrow +arrow-data,https://github.com/apache/arrow-rs,Apache-2.0,Apache Arrow +arrow-ipc,https://github.com/apache/arrow-rs,Apache-2.0,Apache Arrow +arrow-ord,https://github.com/apache/arrow-rs,Apache-2.0,Apache Arrow +arrow-row,https://github.com/apache/arrow-rs,Apache-2.0,Apache Arrow +arrow-schema,https://github.com/apache/arrow-rs,Apache-2.0,Apache Arrow +arrow-select,https://github.com/apache/arrow-rs,Apache-2.0,Apache Arrow +arrow-string,https://github.com/apache/arrow-rs,Apache-2.0,Apache Arrow ascii,https://github.com/tomprogrammer/rust-ascii,Apache-2.0 OR MIT,"Thomas Bahn , Torbjørn Birch Moltu , Simon Sapin " async-broadcast,https://github.com/smol-rs/async-broadcast,MIT OR Apache-2.0,"Stjepan Glavina , Yoshua Wuyts , Zeeshan Ali Khan " async-channel,https://github.com/smol-rs/async-channel,Apache-2.0 OR MIT,Stjepan Glavina @@ -122,6 +134,8 @@ bollard-stubs,https://github.com/fussybeaver/bollard,Apache-2.0,Bollard contribu bon,https://github.com/elastio/bon,MIT OR Apache-2.0,The bon Authors bon-macros,https://github.com/elastio/bon,MIT OR Apache-2.0,The bon-macros Authors borrow-or-share,https://github.com/yescallop/borrow-or-share,MIT-0,Scallop Ye +borsh,https://github.com/near/borsh-rs,MIT OR 
Apache-2.0,Near Inc +borsh-derive,https://github.com/near/borsh-rs,Apache-2.0,Near Inc brotli,https://github.com/dropbox/rust-brotli,BSD-3-Clause AND MIT,"Daniel Reiter Horn , The Brotli Authors" brotli-decompressor,https://github.com/dropbox/rust-brotli-decompressor,BSD-3-Clause OR MIT,"Daniel Reiter Horn , The Brotli Authors" bson,https://github.com/mongodb/bson-rust,MIT,"Y. T. Chung , Kevin Yeh , Saghm Rossi , Patrick Freed , Isabel Atkinson , Abraham Egnor " @@ -168,6 +182,8 @@ compression-codecs,https://github.com/Nullus157/async-compression,MIT OR Apache- compression-core,https://github.com/Nullus157/async-compression,MIT OR Apache-2.0,"Wim Looman , Allen Bui " concurrent-queue,https://github.com/smol-rs/concurrent-queue,Apache-2.0 OR MIT,"Stjepan Glavina , Taiki Endo , John Nunley " const-oid,https://github.com/RustCrypto/formats/tree/master/const-oid,Apache-2.0 OR MIT,RustCrypto Developers +const-random,https://github.com/tkaitchuck/constrandom,MIT OR Apache-2.0,Tom Kaitchuck +const-random-macro,https://github.com/tkaitchuck/constrandom,MIT OR Apache-2.0,Tom Kaitchuck convert_case,https://github.com/rutrum/convert-case,MIT,David Purdum convert_case,https://github.com/rutrum/convert-case,MIT,rutrum cookie,https://github.com/SergioBenitez/cookie-rs,MIT OR Apache-2.0,"Sergio Benitez , Alex Crichton " @@ -265,6 +281,7 @@ fastrand,https://github.com/smol-rs/fastrand,Apache-2.0 OR MIT,Stjepan Glavina < ff,https://github.com/zkcrypto/ff,MIT OR Apache-2.0,"Sean Bowe , Jack Grigg " fiat-crypto,https://github.com/mit-plv/fiat-crypto,MIT OR Apache-2.0 OR BSD-1-Clause,Fiat Crypto library authors finl_unicode,https://github.com/dahosek/finl_unicode,MIT OR Apache-2.0,The finl_unicode Authors +flatbuffers,https://github.com/google/flatbuffers,Apache-2.0,"Robert Winslow , FlatBuffers Maintainers" flate2,https://github.com/rust-lang/flate2-rs,MIT OR Apache-2.0,"Alex Crichton , Josh Triplett " float_eq,https://github.com/jtempest/float_eq-rs,MIT OR Apache-2.0,jtempest fluent-uri,https://github.com/yescallop/fluent-uri-rs,MIT,Scallop Ye @@ -406,6 +423,12 @@ kube-runtime,https://github.com/kube-rs/kube,Apache-2.0,"clux lapin,https://github.com/amqp-rs/lapin,MIT,"Geoffroy Couprie , Marc-Antoine Perennou " lazy_static,https://github.com/rust-lang-nursery/lazy-static.rs,MIT OR Apache-2.0,Marvin Löbel +lexical-core,https://github.com/Alexhuszagh/rust-lexical,MIT OR Apache-2.0,Alex Huszagh +lexical-parse-float,https://github.com/Alexhuszagh/rust-lexical,MIT OR Apache-2.0,Alex Huszagh +lexical-parse-integer,https://github.com/Alexhuszagh/rust-lexical,MIT OR Apache-2.0,Alex Huszagh +lexical-util,https://github.com/Alexhuszagh/rust-lexical,MIT OR Apache-2.0,Alex Huszagh +lexical-write-float,https://github.com/Alexhuszagh/rust-lexical,MIT OR Apache-2.0,Alex Huszagh +lexical-write-integer,https://github.com/Alexhuszagh/rust-lexical,MIT OR Apache-2.0,Alex Huszagh libc,https://github.com/rust-lang/libc,MIT OR Apache-2.0,The Rust Project Developers libflate,https://github.com/sile/libflate,MIT,Takeru Ohta libflate_lz77,https://github.com/sile/libflate,MIT,Takeru Ohta @@ -745,6 +768,7 @@ tikv-jemallocator,https://github.com/tikv/jemallocator,MIT OR Apache-2.0,"Alex C time,https://github.com/time-rs/time,MIT OR Apache-2.0,"Jacob Pratt , Time contributors" time-core,https://github.com/time-rs/time,MIT OR Apache-2.0,"Jacob Pratt , Time contributors" time-macros,https://github.com/time-rs/time,MIT OR Apache-2.0,"Jacob Pratt , Time contributors" +tiny-keccak,https://github.com/debris/tiny-keccak,CC0-1.0,debris 
tinystr,https://github.com/unicode-org/icu4x,Unicode-3.0,The ICU4X Project Developers tinyvec,https://github.com/Lokathor/tinyvec,Zlib OR Apache-2.0 OR MIT,Lokathor tinyvec_macros,https://github.com/Soveu/tinyvec_macros,MIT OR Apache-2.0 OR Zlib,Soveu diff --git a/changelog.d/24074_arrow_batch_codec.feature.md b/changelog.d/24074_arrow_batch_codec.feature.md new file mode 100644 index 0000000000000..f9624481b2947 --- /dev/null +++ b/changelog.d/24074_arrow_batch_codec.feature.md @@ -0,0 +1,6 @@ +A generic [Apache Arrow](https://arrow.apache.org/) codec has been added to +support [Arrow IPC](https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format) serialization across Vector. This enables sinks +like the `clickhouse` sink to use the ArrowStream format endpoint with significantly better performance and smaller payload sizes compared +to JSON-based formats. + +authors: benjamin-awd diff --git a/lib/codecs/Cargo.toml b/lib/codecs/Cargo.toml index 741c95b84a895..57ce81892c774 100644 --- a/lib/codecs/Cargo.toml +++ b/lib/codecs/Cargo.toml @@ -11,8 +11,10 @@ path = "tests/bin/generate-avro-fixtures.rs" [dependencies] apache-avro = { version = "0.20.0", default-features = false } +arrow = { version = "56.2.0", default-features = false, features = ["ipc"] } bytes.workspace = true chrono.workspace = true +rust_decimal = { version = "1.37", default-features = false, features = ["std"] } csv-core = { version = "0.1.12", default-features = false } derivative.workspace = true dyn-clone = { version = "1", default-features = false } @@ -53,5 +55,6 @@ uuid.workspace = true vrl.workspace = true [features] -syslog = ["dep:syslog_loose"] +arrow = [] opentelemetry = ["dep:opentelemetry-proto"] +syslog = ["dep:syslog_loose"] diff --git a/lib/codecs/src/encoding/format/arrow.rs b/lib/codecs/src/encoding/format/arrow.rs new file mode 100644 index 0000000000000..7588b32b94452 --- /dev/null +++ b/lib/codecs/src/encoding/format/arrow.rs @@ -0,0 +1,1445 @@ +//! Arrow IPC streaming format codec for batched event encoding +//! +//! Provides Apache Arrow IPC stream format encoding with static schema support. +//! This implements the streaming variant of the Arrow IPC protocol, which writes +//! a continuous stream of record batches without a file footer. 
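+//!
+//! # Example
+//!
+//! A minimal sketch of how this module is meant to be used (the schema and
+//! field name here are illustrative, not part of the codec):
+//!
+//! ```ignore
+//! use std::sync::Arc;
+//! use arrow::datatypes::{DataType, Field, Schema};
+//! use vector_core::event::{Event, LogEvent};
+//!
+//! let schema = Arc::new(Schema::new(vec![Field::new("message", DataType::Utf8, true)]));
+//! let mut log = LogEvent::default();
+//! log.insert("message", "hello");
+//!
+//! let bytes = encode_events_to_arrow_ipc_stream(&[Event::Log(log)], Some(schema))
+//!     .expect("a single nullable UTF-8 column should encode");
+//! ```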
+
+use arrow::{
+    array::{
+        ArrayRef, BinaryBuilder, BooleanBuilder, Decimal128Builder, Decimal256Builder,
+        Float32Builder, Float64Builder, Int8Builder, Int16Builder, Int32Builder, Int64Builder,
+        StringBuilder, TimestampMicrosecondBuilder, TimestampMillisecondBuilder,
+        TimestampNanosecondBuilder, TimestampSecondBuilder, UInt8Builder, UInt16Builder,
+        UInt32Builder, UInt64Builder,
+    },
+    datatypes::{DataType, Schema, TimeUnit, i256},
+    ipc::writer::StreamWriter,
+    record_batch::RecordBatch,
+};
+use bytes::{BufMut, Bytes, BytesMut};
+use chrono::{DateTime, Utc};
+use rust_decimal::Decimal;
+use snafu::Snafu;
+use std::sync::Arc;
+use vector_config::configurable_component;
+
+use vector_core::event::{Event, Value};
+
+/// Configuration for Arrow IPC stream serialization
+#[configurable_component]
+#[derive(Clone, Default)]
+pub struct ArrowStreamSerializerConfig {
+    /// The Arrow schema to use for encoding
+    #[serde(skip)]
+    #[configurable(derived)]
+    pub schema: Option<Arc<Schema>>,
+}
+
+impl std::fmt::Debug for ArrowStreamSerializerConfig {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("ArrowStreamSerializerConfig")
+            .field(
+                "schema",
+                &self
+                    .schema
+                    .as_ref()
+                    .map(|s| format!("{} fields", s.fields().len())),
+            )
+            .finish()
+    }
+}
+
+impl ArrowStreamSerializerConfig {
+    /// Create a new ArrowStreamSerializerConfig with a schema
+    pub fn new(schema: Arc<Schema>) -> Self {
+        Self {
+            schema: Some(schema),
+        }
+    }
+
+    /// The data type of events that are accepted by `ArrowStreamSerializer`.
+    pub fn input_type(&self) -> vector_core::config::DataType {
+        vector_core::config::DataType::Log
+    }
+
+    /// The schema required by the serializer.
+    pub fn schema_requirement(&self) -> vector_core::schema::Requirement {
+        vector_core::schema::Requirement::empty()
+    }
+}
+
+/// Arrow IPC stream batch serializer that holds the schema
+#[derive(Clone, Debug)]
+pub struct ArrowStreamSerializer {
+    schema: Arc<Schema>,
+}
+
+impl ArrowStreamSerializer {
+    /// Create a new ArrowStreamSerializer with the given configuration
+    pub fn new(config: ArrowStreamSerializerConfig) -> Result<Self, vector_common::Error> {
+        let schema = config.schema.ok_or_else(|| {
+            vector_common::Error::from(
+                "Arrow serializer requires a schema. Pass a schema or fetch it from the provider before creating the serializer.",
+            )
+        })?;
+
+        Ok(Self { schema })
+    }
+}
+
+impl tokio_util::codec::Encoder<Vec<Event>> for ArrowStreamSerializer {
+    type Error = ArrowEncodingError;
+
+    fn encode(&mut self, events: Vec<Event>, buffer: &mut BytesMut) -> Result<(), Self::Error> {
+        if events.is_empty() {
+            return Err(ArrowEncodingError::NoEvents);
+        }
+
+        let bytes = encode_events_to_arrow_ipc_stream(&events, Some(Arc::clone(&self.schema)))?;
+
+        buffer.extend_from_slice(&bytes);
+        Ok(())
+    }
+}
+
+/// Errors that can occur during Arrow encoding
+#[derive(Debug, Snafu)]
+pub enum ArrowEncodingError {
+    /// Failed to create Arrow record batch
+    #[snafu(display("Failed to create Arrow record batch: {}", source))]
+    RecordBatchCreation {
+        /// The underlying Arrow error
+        source: arrow::error::ArrowError,
+    },
+
+    /// Failed to write Arrow IPC data
+    #[snafu(display("Failed to write Arrow IPC data: {}", source))]
+    IpcWrite {
+        /// The underlying Arrow error
+        source: arrow::error::ArrowError,
+    },
+
+    /// No events provided for encoding
+    #[snafu(display("No events provided for encoding"))]
+    NoEvents,
+
+    /// Schema must be provided before encoding
+    #[snafu(display("Schema must be provided before encoding"))]
+    NoSchemaProvided,
+
+    /// Unsupported Arrow data type for field
+    #[snafu(display(
+        "Unsupported Arrow data type for field '{}': {:?}",
+        field_name,
+        data_type
+    ))]
+    UnsupportedType {
+        /// The field name
+        field_name: String,
+        /// The unsupported data type
+        data_type: DataType,
+    },
+
+    /// Null value encountered for non-nullable field
+    #[snafu(display("Null value for non-nullable field '{}'", field_name))]
+    NullConstraint {
+        /// The field name
+        field_name: String,
+    },
+
+    /// IO error during encoding
+    #[snafu(display("IO error: {}", source))]
+    Io {
+        /// The underlying IO error
+        source: std::io::Error,
+    },
+}
+
+impl From<std::io::Error> for ArrowEncodingError {
+    fn from(error: std::io::Error) -> Self {
+        Self::Io { source: error }
+    }
+}
+
+/// Encodes a batch of events into Arrow IPC streaming format
+pub fn encode_events_to_arrow_ipc_stream(
+    events: &[Event],
+    schema: Option<Arc<Schema>>,
+) -> Result<Bytes, ArrowEncodingError> {
+    if events.is_empty() {
+        return Err(ArrowEncodingError::NoEvents);
+    }
+
+    let schema_ref = schema.ok_or(ArrowEncodingError::NoSchemaProvided)?;
+
+    let record_batch = build_record_batch(schema_ref.clone(), events)?;
+
+    let ipc_err = |source| ArrowEncodingError::IpcWrite { source };
+
+    let mut buffer = BytesMut::new().writer();
+    let mut writer = StreamWriter::try_new(&mut buffer, &schema_ref).map_err(ipc_err)?;
+    writer.write(&record_batch).map_err(ipc_err)?;
+    writer.finish().map_err(ipc_err)?;
+
+    Ok(buffer.into_inner().freeze())
+}
+
+/// Builds an Arrow RecordBatch from events
+fn build_record_batch(
+    schema: Arc<Schema>,
+    events: &[Event],
+) -> Result<RecordBatch, ArrowEncodingError> {
+    let num_fields = schema.fields().len();
+    let mut columns: Vec<ArrayRef> = Vec::with_capacity(num_fields);
+
+    for field in schema.fields() {
+        let field_name = field.name();
+        let nullable = field.is_nullable();
+        let array: ArrayRef = match field.data_type() {
+            DataType::Timestamp(time_unit, _) => {
+                build_timestamp_array(events, field_name, *time_unit, nullable)?
+            }
+            DataType::Utf8 => build_string_array(events, field_name, nullable)?,
+            DataType::Int8 => build_int8_array(events, field_name, nullable)?,
+            DataType::Int16 => build_int16_array(events, field_name, nullable)?,
+            DataType::Int32 => build_int32_array(events, field_name, nullable)?,
+            DataType::Int64 => build_int64_array(events, field_name, nullable)?,
+            DataType::UInt8 => build_uint8_array(events, field_name, nullable)?,
+            DataType::UInt16 => build_uint16_array(events, field_name, nullable)?,
+            DataType::UInt32 => build_uint32_array(events, field_name, nullable)?,
+            DataType::UInt64 => build_uint64_array(events, field_name, nullable)?,
+            DataType::Float32 => build_float32_array(events, field_name, nullable)?,
+            DataType::Float64 => build_float64_array(events, field_name, nullable)?,
+            DataType::Boolean => build_boolean_array(events, field_name, nullable)?,
+            DataType::Binary => build_binary_array(events, field_name, nullable)?,
+            DataType::Decimal128(precision, scale) => {
+                build_decimal128_array(events, field_name, *precision, *scale, nullable)?
+            }
+            DataType::Decimal256(precision, scale) => {
+                build_decimal256_array(events, field_name, *precision, *scale, nullable)?
+            }
+            other_type => {
+                return Err(ArrowEncodingError::UnsupportedType {
+                    field_name: field_name.into(),
+                    data_type: other_type.clone(),
+                });
+            }
+        };
+
+        columns.push(array);
+    }
+
+    RecordBatch::try_new(schema, columns)
+        .map_err(|source| ArrowEncodingError::RecordBatchCreation { source })
+}
+
+/// Macro to handle appending null or returning an error for non-nullable fields.
+macro_rules! handle_null_constraints {
+    ($builder:expr, $nullable:expr, $field_name:expr) => {{
+        if !$nullable {
+            return Err(ArrowEncodingError::NullConstraint {
+                field_name: $field_name.into(),
+            });
+        }
+        $builder.append_null();
+    }};
+}
+
+/// Macro to generate a `build_*_array` function for primitive types.
+macro_rules! define_build_primitive_array_fn {
+    (
+        $fn_name:ident, // The function name (e.g., build_int8_array)
+        $builder_ty:ty, // The builder type (e.g., Int8Builder)
+        // One or more match arms for valid Value types
+        $( $value_pat:pat $(if $guard:expr)? => $append_expr:expr ),+
+    ) => {
+        fn $fn_name(
+            events: &[Event],
+            field_name: &str,
+            nullable: bool,
+        ) -> Result<ArrayRef, ArrowEncodingError> {
+            let mut builder = <$builder_ty>::with_capacity(events.len());
+
+            for event in events {
+                if let Event::Log(log) = event {
+                    match log.get(field_name) {
+                        $(
+                            $value_pat $(if $guard)? => builder.append_value($append_expr),
+                        )+
+                        // All other patterns are treated as null/invalid
+                        _ => handle_null_constraints!(builder, nullable, field_name),
+                    }
+                }
+            }
+            Ok(Arc::new(builder.finish()))
+        }
+    };
+}
+
+fn extract_timestamp(value: &Value) -> Option<DateTime<Utc>> {
+    match value {
+        Value::Timestamp(ts) => Some(*ts),
+        Value::Bytes(bytes) => std::str::from_utf8(bytes)
+            .ok()
+            .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
+            .map(|dt| dt.with_timezone(&Utc)),
+        _ => None,
+    }
+}
+
+fn build_timestamp_array(
+    events: &[Event],
+    field_name: &str,
+    time_unit: TimeUnit,
+    nullable: bool,
+) -> Result<ArrayRef, ArrowEncodingError> {
+    macro_rules! build_array {
+        ($builder:ty, $converter:expr) => {{
+            let mut builder = <$builder>::with_capacity(events.len());
+            for event in events {
+                if let Event::Log(log) = event {
+                    let value_to_append = log.get(field_name).and_then(|value| {
+                        // First, try to extract it as a native or string timestamp
+                        if let Some(ts) = extract_timestamp(value) {
+                            $converter(&ts)
+                        }
+                        // Else, fall back to a raw integer
+                        else if let Value::Integer(i) = value {
+                            Some(*i)
+                        }
+                        // Else, it's an unsupported type (e.g., Bool, Float)
+                        else {
+                            None
+                        }
+                    });
+
+                    if value_to_append.is_none() && !nullable {
+                        return Err(ArrowEncodingError::NullConstraint {
+                            field_name: field_name.into(),
+                        });
+                    }
+
+                    builder.append_option(value_to_append);
+                }
+            }
+            Ok(Arc::new(builder.finish()))
+        }};
+    }
+
+    match time_unit {
+        TimeUnit::Second => {
+            build_array!(TimestampSecondBuilder, |ts: &DateTime<Utc>| Some(
+                ts.timestamp()
+            ))
+        }
+        TimeUnit::Millisecond => {
+            build_array!(TimestampMillisecondBuilder, |ts: &DateTime<Utc>| Some(
+                ts.timestamp_millis()
+            ))
+        }
+        TimeUnit::Microsecond => {
+            build_array!(TimestampMicrosecondBuilder, |ts: &DateTime<Utc>| Some(
+                ts.timestamp_micros()
+            ))
+        }
+        TimeUnit::Nanosecond => {
+            build_array!(TimestampNanosecondBuilder, |ts: &DateTime<Utc>| ts
+                .timestamp_nanos_opt())
+        }
+    }
+}
+
+fn build_string_array(
+    events: &[Event],
+    field_name: &str,
+    nullable: bool,
+) -> Result<ArrayRef, ArrowEncodingError> {
+    let mut builder = StringBuilder::with_capacity(events.len(), 0);
+
+    for event in events {
+        if let Event::Log(log) = event {
+            let mut appended = false;
+            if let Some(value) = log.get(field_name) {
+                match value {
+                    Value::Bytes(bytes) => {
+                        // Attempt direct UTF-8 conversion first, fall back to lossy
+                        match std::str::from_utf8(bytes) {
+                            Ok(s) => builder.append_value(s),
+                            Err(_) => builder.append_value(&String::from_utf8_lossy(bytes)),
+                        }
+                        appended = true;
+                    }
+                    Value::Object(obj) => {
+                        if let Ok(s) = serde_json::to_string(&obj) {
+                            builder.append_value(s);
+                            appended = true;
+                        }
+                    }
+                    Value::Array(arr) => {
+                        if let Ok(s) = serde_json::to_string(&arr) {
+                            builder.append_value(s);
+                            appended = true;
+                        }
+                    }
+                    _ => {
+                        builder.append_value(&value.to_string_lossy());
+                        appended = true;
+                    }
+                }
+            }
+
+            if !appended {
+                handle_null_constraints!(builder, nullable, field_name);
+            }
+        }
+    }
+
+    Ok(Arc::new(builder.finish()))
+}
+
+define_build_primitive_array_fn!(
+    build_int8_array,
+    Int8Builder,
+    Some(Value::Integer(i)) if *i >= i8::MIN as i64 && *i <= i8::MAX as i64 => *i as i8
+);
+
+define_build_primitive_array_fn!(
+    build_int16_array,
+    Int16Builder,
+    Some(Value::Integer(i)) if *i >= i16::MIN as i64 && *i <= i16::MAX as i64 => *i as i16
+);
+
+define_build_primitive_array_fn!(
+    build_int32_array,
+    Int32Builder,
+    Some(Value::Integer(i)) if *i >= i32::MIN as i64 && *i <= i32::MAX as i64 => *i as i32
+);
+
+define_build_primitive_array_fn!(
+    build_int64_array,
+    Int64Builder,
+    Some(Value::Integer(i)) => *i
+);
+
+define_build_primitive_array_fn!(
+    build_uint8_array,
+    UInt8Builder,
+    Some(Value::Integer(i)) if *i >= 0 && *i <= u8::MAX as i64 => *i as u8
+);
+
+define_build_primitive_array_fn!(
+    build_uint16_array,
+    UInt16Builder,
+    Some(Value::Integer(i)) if *i >= 0 && *i <= u16::MAX as i64 => *i as u16
+);
+
+define_build_primitive_array_fn!(
+    build_uint32_array,
+    UInt32Builder,
+    Some(Value::Integer(i)) if *i >= 0 && *i <= u32::MAX as i64 => *i as u32
+);
+
+define_build_primitive_array_fn!(
+    build_uint64_array,
+    UInt64Builder,
+    Some(Value::Integer(i)) if *i >= 0 => *i as u64
+);
+
+define_build_primitive_array_fn!(
+    build_float32_array,
+    Float32Builder,
+    Some(Value::Float(f)) => f.into_inner() as f32,
+    Some(Value::Integer(i)) => *i as f32
+);
+
+define_build_primitive_array_fn!(
+    build_float64_array,
+    Float64Builder,
+    Some(Value::Float(f)) => f.into_inner(),
+    Some(Value::Integer(i)) => *i as f64
+);
+
+define_build_primitive_array_fn!(
+    build_boolean_array,
+    BooleanBuilder,
+    Some(Value::Boolean(b)) => *b
+);
+
+fn build_binary_array(
+    events: &[Event],
+    field_name: &str,
+    nullable: bool,
+) -> Result<ArrayRef, ArrowEncodingError> {
+    let mut builder = BinaryBuilder::with_capacity(events.len(), 0);
+
+    for event in events {
+        if let Event::Log(log) = event {
+            match log.get(field_name) {
+                Some(Value::Bytes(bytes)) => builder.append_value(bytes),
+                _ => handle_null_constraints!(builder, nullable, field_name),
+            }
+        }
+    }
+
+    Ok(Arc::new(builder.finish()))
+}
+
+fn build_decimal128_array(
+    events: &[Event],
+    field_name: &str,
+    precision: u8,
+    scale: i8,
+    nullable: bool,
+) -> Result<ArrayRef, ArrowEncodingError> {
+    let mut builder = Decimal128Builder::with_capacity(events.len())
+        .with_precision_and_scale(precision, scale)
+        .map_err(|_| ArrowEncodingError::UnsupportedType {
+            field_name: field_name.into(),
+            data_type: DataType::Decimal128(precision, scale),
+        })?;
+
+    let target_scale = scale.unsigned_abs() as u32;
+
+    for event in events {
+        if let Event::Log(log) = event {
+            let mut appended = false;
+            match log.get(field_name) {
+                Some(Value::Float(f)) => {
+                    if let Ok(mut decimal) = Decimal::try_from(f.into_inner()) {
+                        decimal.rescale(target_scale);
+                        let mantissa = decimal.mantissa();
+                        builder.append_value(mantissa);
+                        appended = true;
+                    }
+                }
+                Some(Value::Integer(i)) => {
+                    let mut decimal = Decimal::from(*i);
+                    decimal.rescale(target_scale);
+                    let mantissa = decimal.mantissa();
+                    builder.append_value(mantissa);
+                    appended = true;
+                }
+                _ => {}
+            }
+
+            if !appended {
+                handle_null_constraints!(builder, nullable, field_name);
+            }
+        }
+    }
+
+    Ok(Arc::new(builder.finish()))
+}
+
+fn build_decimal256_array(
+    events: &[Event],
+    field_name: &str,
+    precision: u8,
+    scale: i8,
+    nullable: bool,
+) -> Result<ArrayRef, ArrowEncodingError> {
+    let mut builder = Decimal256Builder::with_capacity(events.len())
+        .with_precision_and_scale(precision, scale)
+        .map_err(|_| ArrowEncodingError::UnsupportedType {
+            field_name: field_name.into(),
+            data_type: DataType::Decimal256(precision, scale),
+        })?;
+
+    let target_scale = scale.unsigned_abs() as u32;
+
+    for event in events {
+        if let Event::Log(log) = event {
+            let mut appended = false;
+            match log.get(field_name) {
+                Some(Value::Float(f)) => {
+                    if let Ok(mut decimal) = Decimal::try_from(f.into_inner()) {
+                        decimal.rescale(target_scale);
+                        let mantissa = decimal.mantissa();
+                        // rust_decimal does not support i256 natively, so we upcast here
+                        builder.append_value(i256::from_i128(mantissa));
+                        appended = true;
+                    }
+                }
+                Some(Value::Integer(i)) => {
+                    let mut decimal = Decimal::from(*i);
+                    decimal.rescale(target_scale);
+                    let mantissa = decimal.mantissa();
+                    builder.append_value(i256::from_i128(mantissa));
+                    appended = true;
+                }
+                _ => {}
+            }
+
+            if !appended {
+                handle_null_constraints!(builder, nullable, field_name);
+            }
+        }
+    }
+
+    Ok(Arc::new(builder.finish()))
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use arrow::{
+        array::{
+            Array, BinaryArray, BooleanArray, Float32Array, Float64Array, Int8Array, Int16Array,
+            Int32Array, Int64Array, StringArray, TimestampMicrosecondArray,
+            TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray,
+        },
+        datatypes::Field,
+        ipc::reader::StreamReader,
+    };
+    use chrono::Utc;
+    use std::io::Cursor;
+    use vector_core::event::LogEvent;
+
+    #[test]
+    fn test_encode_all_types() {
+        let mut log = LogEvent::default();
+        log.insert("string_field", "test");
+        log.insert("int8_field", 127);
+        log.insert("int16_field", 32000);
+        log.insert("int32_field", 1000000);
+        log.insert("int64_field", 42);
+        log.insert("float32_field", 3.15);
+        log.insert("float64_field", 3.15);
+        log.insert("bool_field", true);
+        log.insert("bytes_field", bytes::Bytes::from("binary"));
+        log.insert("timestamp_field", Utc::now());
+
+        let events = vec![Event::Log(log)];
+
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("string_field", DataType::Utf8, true),
+            Field::new("int8_field", DataType::Int8, true),
+            Field::new("int16_field", DataType::Int16, true),
+            Field::new("int32_field", DataType::Int32, true),
+            Field::new("int64_field", DataType::Int64, true),
+            Field::new("float32_field", DataType::Float32, true),
+            Field::new("float64_field", DataType::Float64, true),
+            Field::new("bool_field", DataType::Boolean, true),
+            Field::new("bytes_field", DataType::Binary, true),
+            Field::new(
+                "timestamp_field",
+                DataType::Timestamp(TimeUnit::Millisecond, None),
+                true,
+            ),
+        ]));
+
+        let result = encode_events_to_arrow_ipc_stream(&events, Some(Arc::clone(&schema)));
+        assert!(result.is_ok());
+
+        let bytes = result.unwrap();
+        let cursor = Cursor::new(bytes);
+        let mut reader = StreamReader::try_new(cursor, None).unwrap();
+        let batch = reader.next().unwrap().unwrap();
+
+        assert_eq!(batch.num_rows(), 1);
+        assert_eq!(batch.num_columns(), 10);
+
+        // Verify string field
+        assert_eq!(
+            batch
+                .column(0)
+                .as_any()
+                .downcast_ref::<StringArray>()
+                .unwrap()
+                .value(0),
+            "test"
+        );
+
+        // Verify int8 field
+        assert_eq!(
+            batch
+                .column(1)
+                .as_any()
+                .downcast_ref::<Int8Array>()
+                .unwrap()
+                .value(0),
+            127
+        );
+
+        // Verify int16 field
+        assert_eq!(
+            batch
+                .column(2)
+                .as_any()
+                .downcast_ref::<Int16Array>()
+                .unwrap()
+                .value(0),
+            32000
+        );
+
+        // Verify int32 field
+        assert_eq!(
+            batch
+                .column(3)
+                .as_any()
+                .downcast_ref::<Int32Array>()
+                .unwrap()
+                .value(0),
+            1000000
+        );
+
+        // Verify int64 field
+        assert_eq!(
+            batch
+                .column(4)
+                .as_any()
+                .downcast_ref::<Int64Array>()
+                .unwrap()
+                .value(0),
+            42
+        );
+
+        // Verify float32 field
+        assert!(
+            (batch
+                .column(5)
+                .as_any()
+                .downcast_ref::<Float32Array>()
+                .unwrap()
+                .value(0)
+                - 3.15)
+                .abs()
+                < 0.001
+        );
+
+        // Verify float64 field
+        assert!(
+            (batch
+                .column(6)
+                .as_any()
+                .downcast_ref::<Float64Array>()
+                .unwrap()
+                .value(0)
+                - 3.15)
+                .abs()
+                < 0.001
+        );
+
+        // Verify boolean field
+        assert!(
+            batch
+                .column(7)
+                .as_any()
+                .downcast_ref::<BooleanArray>()
+                .unwrap()
+                .value(0)
+        );
+
+        // Verify binary field
+        assert_eq!(
+            batch
+                .column(8)
+                .as_any()
+                .downcast_ref::<BinaryArray>()
+                .unwrap()
+                .value(0),
+            b"binary"
+        );
+
+        // Verify timestamp field
+        assert!(
+            !batch
+                .column(9)
+                .as_any()
+                .downcast_ref::<TimestampMillisecondArray>()
+                .unwrap()
+                .is_null(0)
+        );
+    }
+
+    #[test]
+    fn test_encode_null_values() {
+        let mut log1 = LogEvent::default();
+        log1.insert("field_a", 1);
+        // field_b is missing
+
+        let mut log2 = LogEvent::default();
+        log2.insert("field_b", 2);
+        // field_a is missing
+
+        let events = vec![Event::Log(log1), Event::Log(log2)];
+
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("field_a", DataType::Int64, true),
+            Field::new("field_b", DataType::Int64, true),
+        ]));
+
+        let result = encode_events_to_arrow_ipc_stream(&events, Some(Arc::clone(&schema)));
+        assert!(result.is_ok());
+
+        let bytes = result.unwrap();
+        let cursor = Cursor::new(bytes);
+        let mut reader = StreamReader::try_new(cursor, None).unwrap();
+        let batch = reader.next().unwrap().unwrap();
+
+        assert_eq!(batch.num_rows(), 2);
+
+        let field_a = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int64Array>()
+            .unwrap();
+        assert_eq!(field_a.value(0), 1);
+        assert!(field_a.is_null(1));
+
+        let field_b = batch
+            .column(1)
+            .as_any()
+            .downcast_ref::<Int64Array>()
+            .unwrap();
+        assert!(field_b.is_null(0));
+        assert_eq!(field_b.value(1), 2);
+    }
+
+    #[test]
+    fn test_encode_type_mismatches() {
+        let mut log1 = LogEvent::default();
+        log1.insert("field", 42); // Integer
+
+        let mut log2 = LogEvent::default();
+        log2.insert("field", 3.15); // Float - type mismatch!
+
+        let events = vec![Event::Log(log1), Event::Log(log2)];
+
+        // Schema expects Int64
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "field",
+            DataType::Int64,
+            true,
+        )]));
+
+        let result = encode_events_to_arrow_ipc_stream(&events, Some(Arc::clone(&schema)));
+        assert!(result.is_ok());
+
+        let bytes = result.unwrap();
+        let cursor = Cursor::new(bytes);
+        let mut reader = StreamReader::try_new(cursor, None).unwrap();
+        let batch = reader.next().unwrap().unwrap();
+
+        assert_eq!(batch.num_rows(), 2);
+
+        let field_array = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int64Array>()
+            .unwrap();
+        assert_eq!(field_array.value(0), 42);
+        assert!(field_array.is_null(1)); // Type mismatch becomes null
+    }
+
+    #[test]
+    fn test_encode_complex_json_values() {
+        use serde_json::json;
+
+        let mut log = LogEvent::default();
+        log.insert(
+            "object_field",
+            json!({"key": "value", "nested": {"count": 42}}),
+        );
+        log.insert("array_field", json!([1, 2, 3]));
+
+        let events = vec![Event::Log(log)];
+
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("object_field", DataType::Utf8, true),
+            Field::new("array_field", DataType::Utf8, true),
+        ]));
+
+        let result = encode_events_to_arrow_ipc_stream(&events, Some(Arc::clone(&schema)));
+        assert!(result.is_ok());
+
+        let bytes = result.unwrap();
+        let cursor = Cursor::new(bytes);
+        let mut reader = StreamReader::try_new(cursor, None).unwrap();
+        let batch = reader.next().unwrap().unwrap();
+
+        assert_eq!(batch.num_rows(), 1);
+
+        let object_array = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .unwrap();
+        let object_str = object_array.value(0);
+        assert!(object_str.contains("key"));
+        assert!(object_str.contains("value"));
+
+        let array_array = batch
+            .column(1)
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .unwrap();
+        let array_str = array_array.value(0);
+        assert_eq!(array_str, "[1,2,3]");
+    }
+
+    #[test]
+    fn test_encode_unsupported_type() {
+        let mut log = LogEvent::default();
+        log.insert("field", "value");
+
+        let events = vec![Event::Log(log)];
+
+        // Use an unsupported type
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "field",
+            DataType::Duration(TimeUnit::Millisecond),
+            true,
+        )]));
+
+        let result = encode_events_to_arrow_ipc_stream(&events, Some(schema));
+        assert!(result.is_err());
+        assert!(matches!(
+            result.unwrap_err(),
+            ArrowEncodingError::UnsupportedType { .. }
+        ));
+    }
+
+    #[test]
+    fn test_encode_without_schema_fails() {
+        let mut log1 = LogEvent::default();
+        log1.insert("message", "hello");
+
+        let events = vec![Event::Log(log1)];
+
+        let result = encode_events_to_arrow_ipc_stream(&events, None);
+        assert!(result.is_err());
+        assert!(matches!(
+            result.unwrap_err(),
+            ArrowEncodingError::NoSchemaProvided
+        ));
+    }
+
+    #[test]
+    fn test_encode_empty_events() {
+        let events: Vec<Event> = vec![];
+        let result = encode_events_to_arrow_ipc_stream(&events, None);
+        assert!(result.is_err());
+        assert!(matches!(result.unwrap_err(), ArrowEncodingError::NoEvents));
+    }
+
+    #[test]
+    fn test_encode_timestamp_precisions() {
+        let now = Utc::now();
+        let mut log = LogEvent::default();
+        log.insert("ts_second", now);
+        log.insert("ts_milli", now);
+        log.insert("ts_micro", now);
+        log.insert("ts_nano", now);
+
+        let events = vec![Event::Log(log)];
+
+        let schema = Arc::new(Schema::new(vec![
+            Field::new(
+                "ts_second",
+                DataType::Timestamp(TimeUnit::Second, None),
+                true,
+            ),
+            Field::new(
+                "ts_milli",
+                DataType::Timestamp(TimeUnit::Millisecond, None),
+                true,
+            ),
+            Field::new(
+                "ts_micro",
+                DataType::Timestamp(TimeUnit::Microsecond, None),
+                true,
+            ),
+            Field::new(
+                "ts_nano",
+                DataType::Timestamp(TimeUnit::Nanosecond, None),
+                true,
+            ),
+        ]));
+
+        let result = encode_events_to_arrow_ipc_stream(&events, Some(Arc::clone(&schema)));
+        assert!(result.is_ok());
+
+        let bytes = result.unwrap();
+        let cursor = Cursor::new(bytes);
+        let mut reader = StreamReader::try_new(cursor, None).unwrap();
+        let batch = reader.next().unwrap().unwrap();
+
+        assert_eq!(batch.num_rows(), 1);
+        assert_eq!(batch.num_columns(), 4);
+
+        let ts_second = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<TimestampSecondArray>()
+            .unwrap();
+        assert!(!ts_second.is_null(0));
+        assert_eq!(ts_second.value(0), now.timestamp());
+
+        let ts_milli = batch
+            .column(1)
+            .as_any()
+            .downcast_ref::<TimestampMillisecondArray>()
+            .unwrap();
+        assert!(!ts_milli.is_null(0));
+        assert_eq!(ts_milli.value(0), now.timestamp_millis());
+
+        let ts_micro = batch
+            .column(2)
+            .as_any()
+            .downcast_ref::<TimestampMicrosecondArray>()
+            .unwrap();
+        assert!(!ts_micro.is_null(0));
+        assert_eq!(ts_micro.value(0), now.timestamp_micros());
+
+        let ts_nano = batch
+            .column(3)
+            .as_any()
+            .downcast_ref::<TimestampNanosecondArray>()
+            .unwrap();
+        assert!(!ts_nano.is_null(0));
+        assert_eq!(ts_nano.value(0), now.timestamp_nanos_opt().unwrap());
+    }
+
+    #[test]
+    fn test_encode_mixed_timestamp_string_and_native() {
+        // Test mixing string timestamps with native Timestamp values
+        let mut log1 = LogEvent::default();
+        log1.insert("ts", "2025-10-22T10:18:44.256Z"); // String
+
+        let mut log2 = LogEvent::default();
+        log2.insert("ts", Utc::now()); // Native Timestamp
+
+        let mut log3 = LogEvent::default();
+        log3.insert("ts", 1729594724256000000_i64); // Integer (nanoseconds)
+
+        let events = vec![Event::Log(log1), Event::Log(log2), Event::Log(log3)];
+
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "ts",
+            DataType::Timestamp(TimeUnit::Nanosecond, None),
+            true,
+        )]));
+
+        let result = encode_events_to_arrow_ipc_stream(&events, Some(Arc::clone(&schema)));
+        assert!(result.is_ok());
+
+        let bytes = result.unwrap();
+        let cursor = Cursor::new(bytes);
+        let mut reader = StreamReader::try_new(cursor, None).unwrap();
+        let batch = reader.next().unwrap().unwrap();
+
+        assert_eq!(batch.num_rows(), 3);
+
+        let ts_array = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<TimestampNanosecondArray>()
+            .unwrap();
+
+        // All three should be non-null
+        assert!(!ts_array.is_null(0));
+        assert!(!ts_array.is_null(1));
+        assert!(!ts_array.is_null(2));
+
+        // First one should match the parsed string
+        let expected = chrono::DateTime::parse_from_rfc3339("2025-10-22T10:18:44.256Z")
+            .unwrap()
+            .timestamp_nanos_opt()
+            .unwrap();
+        assert_eq!(ts_array.value(0), expected);
+
+        // Third one should match the integer
+        assert_eq!(ts_array.value(2), 1729594724256000000_i64);
+    }
+
+    #[test]
+    fn test_encode_invalid_string_timestamp() {
+        // Test that invalid timestamp strings become null
+        let mut log1 = LogEvent::default();
+        log1.insert("timestamp", "not-a-timestamp");
+
+        let mut log2 = LogEvent::default();
+        log2.insert("timestamp", "2025-10-22T10:18:44.256Z"); // Valid
+
+        let mut log3 = LogEvent::default();
+        log3.insert("timestamp", "2025-99-99T99:99:99Z"); // Invalid
+
+        let events = vec![Event::Log(log1), Event::Log(log2), Event::Log(log3)];
+
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "timestamp",
+            DataType::Timestamp(TimeUnit::Nanosecond, None),
+            true,
+        )]));
+
+        let result = encode_events_to_arrow_ipc_stream(&events, Some(Arc::clone(&schema)));
+        assert!(result.is_ok());
+
+        let bytes = result.unwrap();
+        let cursor = Cursor::new(bytes);
+        let mut reader = StreamReader::try_new(cursor, None).unwrap();
+        let batch = reader.next().unwrap().unwrap();
+
+        assert_eq!(batch.num_rows(), 3);
+
+        let ts_array = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<TimestampNanosecondArray>()
+            .unwrap();
+
+        // Invalid timestamps should be null
+        assert!(ts_array.is_null(0));
+        assert!(!ts_array.is_null(1)); // Valid one
+        assert!(ts_array.is_null(2));
+    }
+
+    #[test]
+    fn test_encode_decimal128_from_integer() {
+        use arrow::array::Decimal128Array;
+
+        let mut log = LogEvent::default();
+        // Store quantity as integer: 1000
+        log.insert("quantity", 1000_i64);
+
+        let events = vec![Event::Log(log)];
+
+        // Decimal(10, 3) - will represent 1000 as 1000.000
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "quantity",
+            DataType::Decimal128(10, 3),
+            true,
+        )]));
+
+        let result = encode_events_to_arrow_ipc_stream(&events, Some(Arc::clone(&schema)));
+        assert!(result.is_ok());
+
+        let bytes = result.unwrap();
+        let cursor = Cursor::new(bytes);
+        let mut reader = StreamReader::try_new(cursor, None).unwrap();
+        let batch = reader.next().unwrap().unwrap();
+
+        assert_eq!(batch.num_rows(), 1);
+
+        let decimal_array = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<Decimal128Array>()
+            .unwrap();
+
+        assert!(!decimal_array.is_null(0));
+        // 1000 with scale 3 = 1000 * 10^3 = 1000000
+        assert_eq!(decimal_array.value(0), 1000000_i128);
+    }
+
+    #[test]
+    fn test_encode_decimal256() {
+        use arrow::array::Decimal256Array;
+
+        let mut log = LogEvent::default();
+        // Very large precision number
+        log.insert("big_value", 123456789.123456_f64);
+
+        let events = vec![Event::Log(log)];
+
+        // Decimal256(50, 6) - high precision decimal
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "big_value",
+            DataType::Decimal256(50, 6),
+            true,
+        )]));
+
+        let result = encode_events_to_arrow_ipc_stream(&events, Some(Arc::clone(&schema)));
+        assert!(result.is_ok());
+
+        let bytes = result.unwrap();
+        let cursor = Cursor::new(bytes);
+        let mut reader = StreamReader::try_new(cursor, None).unwrap();
+        let batch = reader.next().unwrap().unwrap();
+
+        assert_eq!(batch.num_rows(), 1);
+
+        let decimal_array = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<Decimal256Array>()
+            .unwrap();
+
+        assert!(!decimal_array.is_null(0));
+        // Value should be non-null and encoded
+        let value = decimal_array.value(0);
+        assert!(value.to_i128().is_some());
+    }
+
+    #[test]
+    fn test_encode_decimal_null_values() {
+        use arrow::array::Decimal128Array;
+
+        let mut log1 = LogEvent::default();
+        log1.insert("price", 99.99_f64);
+
+        let log2 = LogEvent::default();
+        // No price field - should be null
+
+        let mut log3 = LogEvent::default();
+        log3.insert("price", 50.00_f64);
+
+        let events = vec![Event::Log(log1), Event::Log(log2), Event::Log(log3)];
+
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "price",
+            DataType::Decimal128(10, 2),
+            true,
+        )]));
+
+        let result = encode_events_to_arrow_ipc_stream(&events, Some(Arc::clone(&schema)));
+        assert!(result.is_ok());
+
+        let bytes = result.unwrap();
+        let cursor = Cursor::new(bytes);
+        let mut reader = StreamReader::try_new(cursor, None).unwrap();
+        let batch = reader.next().unwrap().unwrap();
+
+        assert_eq!(batch.num_rows(), 3);
+
+        let decimal_array = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<Decimal128Array>()
+            .unwrap();
+
+        // First row: 99.99
+        assert!(!decimal_array.is_null(0));
+        assert_eq!(decimal_array.value(0), 9999_i128);
+
+        // Second row: null
+        assert!(decimal_array.is_null(1));
+
+        // Third row: 50.00
+        assert!(!decimal_array.is_null(2));
+        assert_eq!(decimal_array.value(2), 5000_i128);
+    }
+
+    #[test]
+    fn test_encode_unsigned_integer_types() {
+        use arrow::array::{UInt8Array, UInt16Array, UInt32Array, UInt64Array};
+
+        let mut log = LogEvent::default();
+        log.insert("uint8_field", 255_i64);
+        log.insert("uint16_field", 65535_i64);
+        log.insert("uint32_field", 4294967295_i64);
+        log.insert("uint64_field", 9223372036854775807_i64);
+
+        let events = vec![Event::Log(log)];
+
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("uint8_field", DataType::UInt8, true),
+            Field::new("uint16_field", DataType::UInt16, true),
+            Field::new("uint32_field", DataType::UInt32, true),
+            Field::new("uint64_field", DataType::UInt64, true),
+        ]));
+
+        let result = encode_events_to_arrow_ipc_stream(&events, Some(Arc::clone(&schema)));
+        assert!(result.is_ok());
+
+        let bytes = result.unwrap();
+        let cursor = Cursor::new(bytes);
+        let mut reader = StreamReader::try_new(cursor, None).unwrap();
+        let batch = reader.next().unwrap().unwrap();
+
+        assert_eq!(batch.num_rows(), 1);
+        assert_eq!(batch.num_columns(), 4);
+
+        // Verify uint8
+        let uint8_array = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<UInt8Array>()
+            .unwrap();
+        assert_eq!(uint8_array.value(0), 255_u8);
+
+        // Verify uint16
+        let uint16_array = batch
+            .column(1)
+            .as_any()
+            .downcast_ref::<UInt16Array>()
+            .unwrap();
+        assert_eq!(uint16_array.value(0), 65535_u16);
+
+        // Verify uint32
+        let uint32_array = batch
+            .column(2)
+            .as_any()
+            .downcast_ref::<UInt32Array>()
+            .unwrap();
+        assert_eq!(uint32_array.value(0), 4294967295_u32);
+
+        // Verify uint64
+        let uint64_array = batch
+            .column(3)
+            .as_any()
+            .downcast_ref::<UInt64Array>()
+            .unwrap();
+        assert_eq!(uint64_array.value(0), 9223372036854775807_u64);
+    }
+
+    #[test]
+    fn test_encode_unsigned_integers_with_null_and_overflow() {
+        use arrow::array::{UInt8Array, UInt32Array};
+
+        let mut log1 = LogEvent::default();
+        log1.insert("uint8_field", 100_i64);
+        log1.insert("uint32_field", 1000_i64);
+
+        let mut log2 = LogEvent::default();
+        log2.insert("uint8_field", 300_i64); // Overflow - should be null
+        log2.insert("uint32_field", -1_i64); // Negative - should be null
+
+        let log3 = LogEvent::default();
+        // Missing fields - should be null
+
+        let events = vec![Event::Log(log1), Event::Log(log2), Event::Log(log3)];
+
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("uint8_field", DataType::UInt8, true),
+            Field::new("uint32_field", DataType::UInt32, true),
+        ]));
+
+        let result = encode_events_to_arrow_ipc_stream(&events, Some(Arc::clone(&schema)));
+        assert!(result.is_ok());
+
+        let bytes = result.unwrap();
+        let cursor = Cursor::new(bytes);
+        let mut reader = StreamReader::try_new(cursor, None).unwrap();
+        let batch = reader.next().unwrap().unwrap();
+
+        assert_eq!(batch.num_rows(), 3);
+
+        // Check uint8 column
+        let uint8_array = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<UInt8Array>()
+            .unwrap();
+        assert_eq!(uint8_array.value(0), 100_u8); // Valid
+        assert!(uint8_array.is_null(1)); // Overflow
+        assert!(uint8_array.is_null(2)); // Missing
+
+        // Check uint32 column
+        let uint32_array = batch
+            .column(1)
+            .as_any()
+            .downcast_ref::<UInt32Array>()
+            .unwrap();
+        assert_eq!(uint32_array.value(0), 1000_u32); // Valid
+        assert!(uint32_array.is_null(1)); // Negative
+        assert!(uint32_array.is_null(2)); // Missing
+    }
+
+    #[test]
+    fn test_encode_non_nullable_field_with_null_value() {
+        // Test that encoding fails when a non-nullable field encounters a null value
+        let mut log1 = LogEvent::default();
+        log1.insert("required_field", 42);
+
+        let log2 = LogEvent::default();
+        // log2 is missing required_field - should cause an error
+
+        let events = vec![Event::Log(log1), Event::Log(log2)];
+
+        // Create schema with non-nullable field
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "required_field",
+            DataType::Int64,
+            false, // Not nullable
+        )]));
+
+        let result = encode_events_to_arrow_ipc_stream(&events, Some(schema));
+        assert!(result.is_err());
+
+        match result.unwrap_err() {
+            ArrowEncodingError::NullConstraint { field_name } => {
+                assert_eq!(field_name, "required_field");
+            }
+            other => panic!("Expected NullConstraint error, got: {:?}", other),
+        }
+    }
+
+    #[test]
+    fn test_encode_non_nullable_string_field_with_missing_value() {
+        // Test that encoding fails for a non-nullable string field
+        let mut log1 = LogEvent::default();
+        log1.insert("name", "Alice");
+
+        let mut log2 = LogEvent::default();
+        log2.insert("name", "Bob");
+
+        let log3 = LogEvent::default();
+        // log3 is missing the name field
+
+        let events = vec![Event::Log(log1), Event::Log(log2), Event::Log(log3)];
+
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "name",
+            DataType::Utf8,
+            false, // Not nullable
+        )]));
+
+        let result = encode_events_to_arrow_ipc_stream(&events, Some(schema));
+        assert!(result.is_err());
+
+        match result.unwrap_err() {
+            ArrowEncodingError::NullConstraint { field_name } => {
+                assert_eq!(field_name, "name");
+            }
+            other => panic!("Expected NullConstraint error, got: {:?}", other),
+        }
+    }
+
+    #[test]
+    fn test_encode_non_nullable_field_all_values_present() {
+        // Test that encoding succeeds when all values are present for a non-nullable field
+        let mut log1 = LogEvent::default();
+        log1.insert("id", 1);
+
+        let mut log2 = LogEvent::default();
+        log2.insert("id", 2);
+
+        let mut log3 = LogEvent::default();
+        log3.insert("id", 3);
+
+        let events = vec![Event::Log(log1), Event::Log(log2), Event::Log(log3)];
+
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "id",
+            DataType::Int64,
+            false, // Not nullable
+        )]));
+
+        let result = encode_events_to_arrow_ipc_stream(&events, Some(Arc::clone(&schema)));
+        assert!(result.is_ok());
+
+        let bytes = result.unwrap();
+        let cursor = Cursor::new(bytes);
+        let mut reader = StreamReader::try_new(cursor, None).unwrap();
+        let batch = reader.next().unwrap().unwrap();
+
+        assert_eq!(batch.num_rows(), 3);
+
+        let id_array = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int64Array>()
+            .unwrap();
+
+        assert_eq!(id_array.value(0), 1);
+        assert_eq!(id_array.value(1), 2);
+        assert_eq!(id_array.value(2), 3);
+        assert!(!id_array.is_null(0));
+        assert!(!id_array.is_null(1));
+        assert!(!id_array.is_null(2));
+    }
+}
diff --git a/lib/codecs/src/encoding/format/mod.rs b/lib/codecs/src/encoding/format/mod.rs
index 9377cdca5d906..0d21e8b94e25c 100644
--- a/lib/codecs/src/encoding/format/mod.rs
+++ b/lib/codecs/src/encoding/format/mod.rs
@@ -3,6 +3,8 @@
 
 #![deny(missing_docs)]
 
+#[cfg(feature = "arrow")]
+mod arrow;
 mod avro;
 mod cef;
 mod common;
@@ -20,6 +22,8 @@ mod text;
 
 use std::fmt::Debug;
 
+#[cfg(feature = "arrow")]
+pub use arrow::{ArrowEncodingError, ArrowStreamSerializer, ArrowStreamSerializerConfig};
 pub use avro::{AvroSerializer, AvroSerializerConfig, AvroSerializerOptions};
 pub use cef::{CefSerializer, CefSerializerConfig};
 use dyn_clone::DynClone;
diff --git a/lib/codecs/src/encoding/mod.rs b/lib/codecs/src/encoding/mod.rs
index 8dd2c4ddc79a5..3fe0baafa8b91 100644
--- a/lib/codecs/src/encoding/mod.rs
+++ b/lib/codecs/src/encoding/mod.rs
@@ -6,6 +6,8 @@ pub mod format;
 pub mod framing;
 pub mod serializer;
 pub use chunking::{Chunker, Chunking, GelfChunker};
+#[cfg(feature = "arrow")]
+pub use format::{ArrowEncodingError, ArrowStreamSerializer, ArrowStreamSerializerConfig};
 pub use format::{
     AvroSerializer, AvroSerializerConfig, AvroSerializerOptions, CefSerializer,
     CefSerializerConfig, CsvSerializer, CsvSerializerConfig, GelfSerializer, GelfSerializerConfig,
@@ -24,18 +26,22 @@ pub use framing::{
     NewlineDelimitedEncoderConfig, VarintLengthDelimitedEncoder,
     VarintLengthDelimitedEncoderConfig,
 };
+#[cfg(feature = "arrow")]
+pub use serializer::BatchSerializerConfig;
 pub use serializer::{Serializer, SerializerConfig};
 
 /// An error that occurred while building an encoder.
 pub type BuildError = Box<dyn std::error::Error + Send + Sync>;
 
-/// An error that occurred while encoding structured events into byte frames.
+/// An error that occurred while encoding structured events.
 #[derive(Debug)]
 pub enum Error {
     /// The error occurred while encoding the byte frame boundaries.
     FramingError(BoxedFramingError),
     /// The error occurred while serializing a structured event into bytes.
     SerializingError(vector_common::Error),
+    /// A schema constraint was violated during encoding (e.g., a null value for a non-nullable field).
+    SchemaConstraintViolation(vector_common::Error),
 }
 
 impl std::fmt::Display for Error {
@@ -43,6 +49,9 @@ impl std::fmt::Display for Error {
         match self {
             Self::FramingError(error) => write!(formatter, "FramingError({error})"),
             Self::SerializingError(error) => write!(formatter, "SerializingError({error})"),
+            Self::SchemaConstraintViolation(error) => {
+                write!(formatter, "SchemaConstraintViolation({error})")
+            }
         }
     }
 }
diff --git a/lib/codecs/src/encoding/serializer.rs b/lib/codecs/src/encoding/serializer.rs
index fdc8397deca5d..899e03d60e4ec 100644
--- a/lib/codecs/src/encoding/serializer.rs
+++ b/lib/codecs/src/encoding/serializer.rs
@@ -4,6 +4,8 @@ use bytes::BytesMut;
 use vector_config::configurable_component;
 use vector_core::{config::DataType, event::Event, schema};
 
+#[cfg(feature = "arrow")]
+use super::format::{ArrowStreamSerializer, ArrowStreamSerializerConfig};
 #[cfg(feature = "opentelemetry")]
 use super::format::{OtlpSerializer, OtlpSerializerConfig};
 use super::{
@@ -134,6 +136,53 @@ impl Default for SerializerConfig {
     }
 }
 
+/// Batch serializer configuration.
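+///
+/// A sketch of how this is expected to appear in a sink's encoding options
+/// once wired up (the `codec` tag comes from the serde attributes below; the
+/// surrounding `encoding` key is illustrative):
+///
+/// ```yaml
+/// encoding:
+///   codec: arrow_stream
+/// ```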
diff --git a/lib/codecs/src/encoding/serializer.rs b/lib/codecs/src/encoding/serializer.rs
index fdc8397deca5d..899e03d60e4ec 100644
--- a/lib/codecs/src/encoding/serializer.rs
+++ b/lib/codecs/src/encoding/serializer.rs
@@ -4,6 +4,8 @@ use bytes::BytesMut;
 use vector_config::configurable_component;
 use vector_core::{config::DataType, event::Event, schema};
 
+#[cfg(feature = "arrow")]
+use super::format::{ArrowStreamSerializer, ArrowStreamSerializerConfig};
 #[cfg(feature = "opentelemetry")]
 use super::format::{OtlpSerializer, OtlpSerializerConfig};
 use super::{
@@ -134,6 +136,53 @@ impl Default for SerializerConfig {
     }
 }
 
+/// Batch serializer configuration.
+#[configurable_component]
+#[derive(Clone, Debug)]
+#[serde(tag = "codec", rename_all = "snake_case")]
+#[configurable(metadata(
+    docs::enum_tag_description = "The codec to use for batch encoding events."
+))]
+pub enum BatchSerializerConfig {
+    /// Encodes events in [Apache Arrow][apache_arrow] IPC streaming format.
+    ///
+    /// This is the streaming variant of the Arrow IPC format, which writes
+    /// a continuous stream of record batches.
+    ///
+    /// [apache_arrow]: https://arrow.apache.org/
+    #[cfg(feature = "arrow")]
+    #[serde(rename = "arrow_stream")]
+    ArrowStream(ArrowStreamSerializerConfig),
+}
+
+#[cfg(feature = "arrow")]
+impl BatchSerializerConfig {
+    /// Build the `ArrowStreamSerializer` from this configuration.
+    pub fn build(
+        &self,
+    ) -> Result<ArrowStreamSerializer, Box<dyn std::error::Error + Send + Sync>> {
+        match self {
+            BatchSerializerConfig::ArrowStream(arrow_config) => {
+                ArrowStreamSerializer::new(arrow_config.clone())
+            }
+        }
+    }
+
+    /// The data type of events that are accepted by this batch serializer.
+    pub fn input_type(&self) -> DataType {
+        match self {
+            BatchSerializerConfig::ArrowStream(arrow_config) => arrow_config.input_type(),
+        }
+    }
+
+    /// The schema required by the batch serializer.
+    pub fn schema_requirement(&self) -> schema::Requirement {
+        match self {
+            BatchSerializerConfig::ArrowStream(arrow_config) => arrow_config.schema_requirement(),
+        }
+    }
+}
+
 impl From<AvroSerializerConfig> for SerializerConfig {
     fn from(config: AvroSerializerConfig) -> Self {
         Self::Avro { avro: config.avro }
diff --git a/lib/vector-lib/Cargo.toml b/lib/vector-lib/Cargo.toml
index 7088a79978d60..c72af97fdaa62 100644
--- a/lib/vector-lib/Cargo.toml
+++ b/lib/vector-lib/Cargo.toml
@@ -26,6 +26,7 @@ vrl = { workspace = true, optional = true }
 [features]
 allocation-tracing = ["vector-top?/allocation-tracing"]
 api-client = ["dep:vector-api-client"]
+arrow = ["codecs/arrow"]
 api = ["vector-tap/api"]
 file-source = ["dep:file-source", "dep:file-source-common"]
 lua = ["vector-core/lua"]
diff --git a/src/codecs/encoding/config.rs b/src/codecs/encoding/config.rs
index a04f44315047a..255db45b538ed 100644
--- a/src/codecs/encoding/config.rs
+++ b/src/codecs/encoding/config.rs
@@ -1,4 +1,4 @@
-use crate::codecs::Transformer;
+use crate::codecs::{Encoder, EncoderKind, Transformer};
 use vector_lib::{
     codecs::{
         CharacterDelimitedEncoder, LengthDelimitedEncoder, NewlineDelimitedEncoder,
@@ -138,6 +138,13 @@ impl EncodingConfigWithFraming {
 
         Ok((framer, serializer))
     }
+
+    /// Build the `Transformer` and `EncoderKind` for this config.
+    pub fn build_encoder(&self, sink_type: SinkType) -> crate::Result<(Transformer, EncoderKind)> {
+        let (framer, serializer) = self.build(sink_type)?;
+        let encoder = EncoderKind::Framed(Box::new(Encoder::<Framer>::new(framer, serializer)));
+        Ok((self.transformer(), encoder))
+    }
 }
 
 /// The way a sink processes outgoing events.
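A usage sketch for the new config surface, assuming the `arrow` feature is enabled. The exact fields of `ArrowStreamSerializerConfig` are defined elsewhere in this PR, so the sketch takes the config as a parameter rather than constructing one; `build_arrow_batch_serializer` is a hypothetical name:

```rust
use vector_lib::codecs::encoding::{ArrowStreamSerializerConfig, BatchSerializerConfig};

// Hypothetical wiring: wrap the Arrow config in the batch-codec enum, inspect
// the event types it accepts, then build the serializer that BatchEncoder
// (introduced below) will wrap.
fn build_arrow_batch_serializer(
    arrow_config: ArrowStreamSerializerConfig,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let config = BatchSerializerConfig::ArrowStream(arrow_config);

    // A sink can reject unsupported event types up front...
    let _input_type = config.input_type();
    // ...and then build the actual serializer.
    let _serializer = config.build()?;
    Ok(())
}
```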
diff --git a/src/codecs/encoding/encoder.rs b/src/codecs/encoding/encoder.rs
index f1d0741bb669c..333c29b4840cf 100644
--- a/src/codecs/encoding/encoder.rs
+++ b/src/codecs/encoding/encoder.rs
@@ -1,5 +1,7 @@
 use bytes::BytesMut;
 use tokio_util::codec::Encoder as _;
+#[cfg(feature = "codecs-arrow")]
+use vector_lib::codecs::encoding::ArrowStreamSerializer;
 use vector_lib::codecs::{
     CharacterDelimitedEncoder, NewlineDelimitedEncoder, TextSerializerConfig,
     encoding::{Error, Framer, Serializer},
 };
@@ -10,6 +12,74 @@ use crate::{
     internal_events::{EncoderFramingError, EncoderSerializeError},
 };
 
+/// Serializers that support batch encoding (encoding all events at once).
+#[derive(Debug, Clone)]
+pub enum BatchSerializer {
+    /// Arrow IPC stream format serializer.
+    #[cfg(feature = "codecs-arrow")]
+    Arrow(ArrowStreamSerializer),
+}
+
+/// An encoder that encodes batches of events.
+#[derive(Debug, Clone)]
+pub struct BatchEncoder {
+    serializer: BatchSerializer,
+}
+
+impl BatchEncoder {
+    /// Creates a new `BatchEncoder` with the specified batch serializer.
+    pub const fn new(serializer: BatchSerializer) -> Self {
+        Self { serializer }
+    }
+
+    /// Get the batch serializer.
+    pub const fn serializer(&self) -> &BatchSerializer {
+        &self.serializer
+    }
+
+    /// Get the HTTP content type.
+    #[cfg(feature = "codecs-arrow")]
+    pub const fn content_type(&self) -> &'static str {
+        match &self.serializer {
+            BatchSerializer::Arrow(_) => "application/vnd.apache.arrow.stream",
+        }
+    }
+}
+
+impl tokio_util::codec::Encoder<Vec<Event>> for BatchEncoder {
+    type Error = Error;
+
+    #[allow(unused_variables)]
+    fn encode(&mut self, events: Vec<Event>, buffer: &mut BytesMut) -> Result<(), Self::Error> {
+        #[allow(unreachable_patterns)]
+        match &mut self.serializer {
+            #[cfg(feature = "codecs-arrow")]
+            BatchSerializer::Arrow(serializer) => {
+                serializer.encode(events, buffer).map_err(|err| {
+                    use vector_lib::codecs::encoding::ArrowEncodingError;
+                    match err {
+                        ArrowEncodingError::NullConstraint { .. } => {
+                            Error::SchemaConstraintViolation(Box::new(err))
+                        }
+                        _ => Error::SerializingError(Box::new(err)),
+                    }
+                })
+            }
+            _ => unreachable!("BatchSerializer has no variants when no batch codec feature is enabled"),
+        }
+    }
+}
+
+/// A wrapper that supports both framed and batch encoding modes.
+#[derive(Debug, Clone)]
+pub enum EncoderKind {
+    /// Uses framing to encode individual events.
+    Framed(Box<Encoder<Framer>>),
+    /// Encodes events in batches without framing.
+    #[cfg(feature = "codecs-arrow")]
+    Batch(BatchEncoder),
+}
+
 #[derive(Debug, Clone)]
 /// An encoder that can encode structured events into byte frames.
 pub struct Encoder<Framer>
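A sketch of how the batch path is meant to be driven, assuming the `codecs-arrow` feature and an already-built `ArrowStreamSerializer` (as in the previous sketch); `encode_arrow_batch` is a hypothetical helper, not part of this diff:

```rust
use bytes::BytesMut;
use tokio_util::codec::Encoder as _;
use vector_lib::codecs::encoding::{ArrowStreamSerializer, Error};

use crate::codecs::{BatchEncoder, BatchSerializer};
use crate::event::Event;

// Hypothetical helper: encode a whole batch into one Arrow IPC stream payload
// and report the matching HTTP Content-Type for the request.
fn encode_arrow_batch(
    serializer: ArrowStreamSerializer,
    events: Vec<Event>,
) -> Result<(BytesMut, &'static str), Error> {
    let mut encoder = BatchEncoder::new(BatchSerializer::Arrow(serializer));
    let content_type = encoder.content_type(); // "application/vnd.apache.arrow.stream"

    let mut buffer = BytesMut::new();
    // Unlike the framed path, the whole Vec<Event> becomes a single payload;
    // a null hitting a non-nullable field surfaces as SchemaConstraintViolation.
    encoder.encode(events, &mut buffer)?;
    Ok((buffer, content_type))
}
```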
diff --git a/src/codecs/encoding/mod.rs b/src/codecs/encoding/mod.rs
index 69ede063e896b..36d637bd75090 100644
--- a/src/codecs/encoding/mod.rs
+++ b/src/codecs/encoding/mod.rs
@@ -3,5 +3,5 @@ mod encoder;
 mod transformer;
 
 pub use config::{EncodingConfig, EncodingConfigWithFraming, SinkType};
-pub use encoder::Encoder;
+pub use encoder::{BatchEncoder, BatchSerializer, Encoder, EncoderKind};
 pub use transformer::{TimestampFormat, Transformer};
diff --git a/src/codecs/mod.rs b/src/codecs/mod.rs
index 4247846cca3a8..32b0e9efb7f8b 100644
--- a/src/codecs/mod.rs
+++ b/src/codecs/mod.rs
@@ -9,6 +9,7 @@ mod ready_frames;
 
 pub use decoding::{Decoder, DecodingConfig};
 pub use encoding::{
-    Encoder, EncodingConfig, EncodingConfigWithFraming, SinkType, TimestampFormat, Transformer,
+    BatchEncoder, BatchSerializer, Encoder, EncoderKind, EncodingConfig, EncodingConfigWithFraming,
+    SinkType, TimestampFormat, Transformer,
 };
 pub use ready_frames::ReadyFrames;
diff --git a/src/internal_events/codecs.rs b/src/internal_events/codecs.rs
index a2c41b7806be1..0d36a0a0b1fa9 100644
--- a/src/internal_events/codecs.rs
+++ b/src/internal_events/codecs.rs
@@ -84,9 +84,9 @@ pub struct EncoderSerializeError<'a> {
 
 impl InternalEvent for EncoderSerializeError<'_> {
     fn emit(self) {
-        let reason = "Failed serializing frame.";
+        const SERIALIZE_REASON: &str = "Failed serializing frame.";
         error!(
-            message = reason,
+            message = SERIALIZE_REASON,
             error = %self.error,
             error_code = "encoder_serialize",
             error_type = error_type::ENCODER_FAILED,
@@ -99,7 +99,10 @@ impl InternalEvent for EncoderSerializeError<'_> {
             "stage" => error_stage::SENDING,
         )
         .increment(1);
-        emit!(ComponentEventsDropped::<UNINTENTIONAL> { count: 1, reason });
+        emit!(ComponentEventsDropped::<UNINTENTIONAL> {
+            count: 1,
+            reason: SERIALIZE_REASON
+        });
     }
 }
 
@@ -132,3 +135,34 @@ impl InternalEvent for EncoderWriteError<'_, E> {
         }
     }
 }
+
+#[cfg(feature = "codecs-arrow")]
+#[derive(Debug)]
+pub struct EncoderNullConstraintError<'a> {
+    pub error: &'a crate::Error,
+}
+
+#[cfg(feature = "codecs-arrow")]
+impl InternalEvent for EncoderNullConstraintError<'_> {
+    fn emit(self) {
+        const CONSTRAINT_REASON: &str = "Schema constraint violation.";
+        error!(
+            message = CONSTRAINT_REASON,
+            error = %self.error,
+            error_code = "encoding_null_constraint",
+            error_type = error_type::ENCODER_FAILED,
+            stage = error_stage::SENDING,
+        );
+        counter!(
+            "component_errors_total",
+            "error_code" => "encoding_null_constraint",
+            "error_type" => error_type::ENCODER_FAILED,
+            "stage" => error_stage::SENDING,
+        )
+        .increment(1);
+        emit!(ComponentEventsDropped::<UNINTENTIONAL> {
+            count: 1,
+            reason: CONSTRAINT_REASON
+        });
+    }
+}
diff --git a/src/sinks/util/encoding.rs b/src/sinks/util/encoding.rs
index bb5a938ec017f..6265021ef6f1a 100644
--- a/src/sinks/util/encoding.rs
+++ b/src/sinks/util/encoding.rs
@@ -8,6 +8,8 @@ use vector_lib::{
     request_metadata::GroupedCountByteSize,
 };
 
+#[cfg(feature = "codecs-arrow")]
+use crate::internal_events::EncoderNullConstraintError;
 use crate::{codecs::Transformer, event::Event, internal_events::EncoderWriteError};
 
 pub trait Encoder<T> {
@@ -97,6 +99,65 @@ impl Encoder<Vec<Event>> for (Transformer, crate::codecs::Encoder<()>) {
     }
 }
 
+#[cfg(feature = "codecs-arrow")]
+impl Encoder<Vec<Event>> for (Transformer, crate::codecs::BatchEncoder) {
+    fn encode_input(
+        &self,
+        events: Vec<Event>,
+        writer: &mut dyn io::Write,
+    ) -> io::Result<(usize, GroupedCountByteSize)> {
+        use tokio_util::codec::Encoder as _;
+
+        let mut encoder = self.1.clone();
+        let mut byte_size = telemetry().create_request_count_byte_size();
+        let n_events = events.len();
+        let mut transformed_events = Vec::with_capacity(n_events);
+
+        for mut event in events {
+            self.0.transform(&mut event);
+            byte_size.add_event(&event, event.estimated_json_encoded_size_of());
+            transformed_events.push(event);
+        }
+
+        let mut bytes = BytesMut::new();
+        encoder
+            .encode(transformed_events, &mut bytes)
+            .map_err(|error| {
+                if let vector_lib::codecs::encoding::Error::SchemaConstraintViolation(
+                    ref constraint_error,
+                ) = error
+                {
+                    emit!(EncoderNullConstraintError {
+                        error: constraint_error
+                    });
+                }
+                io::Error::new(io::ErrorKind::InvalidData, error)
+            })?;
+
+        write_all(writer, n_events, &bytes)?;
+        Ok((bytes.len(), byte_size))
+    }
+}
+
+impl Encoder<Vec<Event>> for (Transformer, crate::codecs::EncoderKind) {
+    fn encode_input(
+        &self,
+        events: Vec<Event>,
+        writer: &mut dyn io::Write,
+    ) -> io::Result<(usize, GroupedCountByteSize)> {
+        // Delegate to the specific encoder implementation.
+        match &self.1 {
+            crate::codecs::EncoderKind::Framed(encoder) => {
+                (self.0.clone(), *encoder.clone()).encode_input(events, writer)
+            }
+            #[cfg(feature = "codecs-arrow")]
+            crate::codecs::EncoderKind::Batch(encoder) => {
+                (self.0.clone(), encoder.clone()).encode_input(events, writer)
+            }
+        }
+    }
+}
+
 /// Write the buffer to the writer. If the operation fails, emit an internal event which complies with the
 /// instrumentation spec, as this necessitates both an Error and EventsDropped event.
 ///
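Finally, a sketch of how a sink might drive the new `EncoderKind`-based `encode_input`, staying agnostic about whether the configured codec is framed (one frame per event) or batch (Arrow IPC, whole `Vec` at once). `build_request_body` is a hypothetical helper, not part of this diff:

```rust
use std::io;

use vector_lib::request_metadata::GroupedCountByteSize;

use crate::codecs::{EncoderKind, Transformer};
use crate::event::Event;
use crate::sinks::util::encoding::Encoder as _;

// Hypothetical request-building step: (Transformer, EncoderKind) implements
// Encoder<Vec<Event>>, so the same call works for both encoding modes.
fn build_request_body(
    transformer: &Transformer,
    encoder: &EncoderKind,
    events: Vec<Event>,
) -> io::Result<(Vec<u8>, GroupedCountByteSize)> {
    let mut body = Vec::new();
    let (_byte_len, event_sizes) =
        (transformer.clone(), encoder.clone()).encode_input(events, &mut body)?;
    Ok((body, event_sizes))
}
```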