Skip to content

Commit 01e0b1d

Browse files
authored
Merge branch 'master' into pront-fix-top-tests
2 parents 6b30e1d + 5ce1198 commit 01e0b1d

File tree

21 files changed

+251
-53
lines changed

21 files changed

+251
-53
lines changed

Cargo.lock

Lines changed: 8 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -497,8 +497,6 @@ wiremock = "0.6.4"
497497
zstd = { version = "0.13.0", default-features = false }
498498

499499
[patch.crates-io]
500-
# The upgrade for `tokio-util` >= 0.6.9 is blocked on https://github.com/vectordotdev/vector/issues/11257.
501-
tokio-util = { git = "https://github.com/vectordotdev/tokio", branch = "tokio-util-0.7.13-framed-read-continue-on-error" }
502500
nix = { git = "https://github.com/vectordotdev/nix.git", branch = "memfd/gnu/musl" }
503501
# The `heim` crates depend on `ntapi` 0.3.7 on Windows, but that version has an
504502
# unaligned access bug fixed in the following revision.
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
Remove the `tokio-util` patch override and preserve recoverable decoding behavior via `DecoderFramedRead`.
2+
3+
authors: Trighap52

lib/codecs/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ memchr = { version = "2", default-features = false }
3131
metrics.workspace = true
3232
opentelemetry-proto = { path = "../opentelemetry-proto", optional = true }
3333
ordered-float.workspace = true
34+
pin-project.workspace = true
3435
prost.workspace = true
3536
prost-reflect.workspace = true
3637
rand.workspace = true
Lines changed: 198 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,198 @@
1+
use bytes::BytesMut;
2+
use futures::Stream;
3+
use pin_project::pin_project;
4+
use std::{
5+
io,
6+
pin::Pin,
7+
task::{Context, Poll},
8+
};
9+
use tokio::io::AsyncRead;
10+
use tokio_util::codec::{Decoder, FramedRead};
11+
12+
/// Internal wrapper that converts decoder errors into successful results.
13+
///
14+
/// This wrapper transforms a decoder's error result from `Err(error)` into
15+
/// `Ok(Some(Err(error)))`, which prevents `FramedRead` from terminating the stream
16+
/// while still propagating the error to the caller.
17+
struct DecoderResultWrapper<D> {
18+
inner: D,
19+
}
20+
21+
impl<D> DecoderResultWrapper<D>
22+
where
23+
D: Decoder,
24+
{
25+
const fn new(inner: D) -> Self {
26+
Self { inner }
27+
}
28+
}
29+
30+
impl<D> Decoder for DecoderResultWrapper<D>
31+
where
32+
D: Decoder,
33+
{
34+
type Item = Result<D::Item, D::Error>;
35+
type Error = io::Error;
36+
37+
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
38+
match self.inner.decode(src) {
39+
Ok(item) => Ok(item.map(Ok)),
40+
Err(error) => Ok(Some(Err(error))),
41+
}
42+
}
43+
44+
fn decode_eof(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
45+
match self.inner.decode_eof(src) {
46+
Ok(item) => Ok(item.map(Ok)),
47+
Err(error) => Ok(Some(Err(error))),
48+
}
49+
}
50+
}
51+
52+
/// A `tokio_util::codec::FramedRead` wrapper that continues decoding after recoverable decoder errors.
53+
///
54+
/// # Problem
55+
///
56+
/// The standard `tokio_util::codec::FramedRead` terminates the stream when a decoder
57+
/// returns an error. This is problematic for Vector because:
58+
/// - Vector decoders classify some errors as recoverable (e.g., malformed JSON in one line
59+
/// shouldn't stop processing subsequent valid lines)
60+
/// - Sources need to continue processing data even after encountering decode errors
61+
/// - Metrics and observability require tracking both successful and failed decode attempts
62+
///
63+
/// # Solution
64+
///
65+
/// `DecoderFramedRead` wraps the decoder in a `DecoderResultWrapper` that transforms
66+
/// decoder errors into successful results containing the error. This allows:
67+
/// - The stream to continue after errors
68+
/// - Callers to inspect errors and decide whether to continue (via `StreamDecodingError::can_continue()`)
69+
/// - Proper error metrics and logging
70+
///
71+
/// # When to Use
72+
///
73+
/// Use `DecoderFramedRead` when:
74+
/// - You're using a Vector `Decoder` that implements error recovery logic
75+
/// - You need to continue processing after decode errors
76+
/// - You're processing line-delimited or record-based formats where one bad record shouldn't stop processing
77+
///
78+
/// Use standard `FramedRead` when:
79+
/// - You're using simple decoders (e.g., `CharacterDelimitedDecoder`) that don't need error recovery
80+
/// - Any decode error should terminate the stream
81+
/// - You're working with binary protocols where errors indicate corruption
82+
///
83+
/// # Example
84+
///
85+
/// ```ignore
86+
/// use vector_lib::codecs::{DecoderFramedRead, Decoder};
87+
/// use futures::StreamExt;
88+
///
89+
/// let decoder = Decoder::new(
90+
/// Framer::NewlineDelimited(NewlineDelimitedDecoder::new()),
91+
/// Deserializer::Json(JsonDeserializer::default()),
92+
/// );
93+
///
94+
/// let mut stream = DecoderFramedRead::new(reader, decoder);
95+
///
96+
/// while let Some(result) = stream.next().await {
97+
/// match result {
98+
/// Ok(events) => process_events(events),
99+
/// Err(error) if error.can_continue() => {
100+
/// // Log the error but continue processing
101+
/// warn!("Decode error (continuing): {}", error);
102+
/// }
103+
/// Err(error) => {
104+
/// // Fatal error, stop processing
105+
/// error!("Fatal decode error: {}", error);
106+
/// break;
107+
/// }
108+
/// }
109+
/// }
110+
/// ```
111+
#[pin_project]
112+
pub struct DecoderFramedRead<T, D> {
113+
#[pin]
114+
inner: FramedRead<T, DecoderResultWrapper<D>>,
115+
}
116+
117+
impl<T, D> DecoderFramedRead<T, D>
118+
where
119+
T: AsyncRead,
120+
D: Decoder,
121+
{
122+
/// Creates a new `DecoderFramedRead` with the given decoder.
123+
///
124+
/// This wraps the provided decoder to enable error recovery, allowing the stream
125+
/// to continue processing after recoverable decode errors.
126+
///
127+
/// # Arguments
128+
///
129+
/// * `inner` - The async reader to read from
130+
/// * `decoder` - The decoder to use for parsing data
131+
pub fn new(inner: T, decoder: D) -> Self {
132+
Self {
133+
inner: FramedRead::new(inner, DecoderResultWrapper::new(decoder)),
134+
}
135+
}
136+
137+
/// Creates a new `DecoderFramedRead` with a specific buffer capacity.
138+
///
139+
/// Use this when you know the expected message size to optimize memory usage.
140+
///
141+
/// # Arguments
142+
///
143+
/// * `inner` - The async reader to read from
144+
/// * `decoder` - The decoder to use for parsing data
145+
/// * `capacity` - The initial buffer capacity in bytes
146+
pub fn with_capacity(inner: T, decoder: D, capacity: usize) -> Self {
147+
Self {
148+
inner: FramedRead::with_capacity(inner, DecoderResultWrapper::new(decoder), capacity),
149+
}
150+
}
151+
152+
/// Returns a reference to the underlying I/O stream.
153+
///
154+
/// This is useful for accessing the underlying reader's properties or state
155+
/// without consuming the `DecoderFramedRead`.
156+
pub fn get_ref(&self) -> &T {
157+
self.inner.get_ref()
158+
}
159+
160+
/// Returns a mutable reference to the underlying I/O stream.
161+
///
162+
/// This allows modifying the underlying reader's state, though care should be
163+
/// taken not to interfere with ongoing decoding operations.
164+
pub fn get_mut(&mut self) -> &mut T {
165+
self.inner.get_mut()
166+
}
167+
168+
/// Returns a reference to the internal read buffer.
169+
///
170+
/// This provides access to any buffered but not yet decoded data. Useful for
171+
/// debugging or implementing custom recovery logic.
172+
pub fn read_buffer(&self) -> &BytesMut {
173+
self.inner.read_buffer()
174+
}
175+
}
176+
177+
impl<T, D> Stream for DecoderFramedRead<T, D>
178+
where
179+
T: AsyncRead,
180+
D: Decoder,
181+
D::Error: From<io::Error>,
182+
{
183+
type Item = Result<D::Item, D::Error>;
184+
185+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
186+
let this = self.project();
187+
188+
// The DecoderResultWrapper transforms errors into Ok(Err(...)) so the stream continues.
189+
// We need to unwrap this double Result structure here.
190+
match this.inner.poll_next(cx) {
191+
Poll::Ready(Some(Ok(Ok(item)))) => Poll::Ready(Some(Ok(item))),
192+
Poll::Ready(Some(Ok(Err(error)))) => Poll::Ready(Some(Err(error))),
193+
Poll::Ready(Some(Err(error))) => Poll::Ready(Some(Err(error.into()))),
194+
Poll::Ready(None) => Poll::Ready(None),
195+
Poll::Pending => Poll::Pending,
196+
}
197+
}
198+
}

lib/codecs/src/decoding/decoder.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -105,12 +105,12 @@ impl tokio_util::codec::Decoder for Decoder {
105105
mod tests {
106106
use bytes::Bytes;
107107
use futures::{StreamExt, stream};
108-
use tokio_util::{codec::FramedRead, io::StreamReader};
108+
use tokio_util::io::StreamReader;
109109
use vrl::value::Value;
110110

111111
use super::Decoder;
112112
use crate::{
113-
JsonDeserializer, NewlineDelimitedDecoder, StreamDecodingError,
113+
DecoderFramedRead, JsonDeserializer, NewlineDelimitedDecoder, StreamDecodingError,
114114
decoding::{Deserializer, Framer},
115115
};
116116

@@ -127,7 +127,7 @@ mod tests {
127127
Framer::NewlineDelimited(NewlineDelimitedDecoder::new()),
128128
Deserializer::Json(JsonDeserializer::default()),
129129
);
130-
let mut stream = FramedRead::new(reader, decoder);
130+
let mut stream = DecoderFramedRead::new(reader, decoder);
131131

132132
let next = stream.next().await.unwrap();
133133
let event = next.unwrap().0.pop().unwrap().into_log();

lib/codecs/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,14 @@
55
#![deny(warnings)]
66

77
mod common;
8+
mod decoder_framed_read;
89
pub mod decoding;
910
pub mod encoding;
1011
pub mod gelf;
1112
pub mod internal_events;
1213
mod ready_frames;
1314

15+
pub use decoder_framed_read::DecoderFramedRead;
1416
pub use decoding::{
1517
BytesDecoder, BytesDecoderConfig, BytesDeserializer, BytesDeserializerConfig,
1618
CharacterDelimitedDecoder, CharacterDelimitedDecoderConfig, Decoder, DecodingConfig,

src/sources/amqp.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,12 @@ use futures::{FutureExt, StreamExt};
99
use futures_util::Stream;
1010
use lapin::{Channel, acker::Acker, message::Delivery, options::BasicQosOptions};
1111
use snafu::Snafu;
12-
use tokio_util::codec::FramedRead;
1312
use vector_lib::{
1413
EstimatedJsonEncodedSizeOf,
15-
codecs::decoding::{DeserializerConfig, FramingConfig},
14+
codecs::{
15+
DecoderFramedRead,
16+
decoding::{DeserializerConfig, FramingConfig},
17+
},
1618
config::{LegacyKey, LogNamespace, SourceAcknowledgementsConfig, log_schema},
1719
configurable::configurable_component,
1820
event::{Event, LogEvent},
@@ -322,7 +324,7 @@ async fn receive_event(
322324
) -> Result<(), ()> {
323325
let payload = Cursor::new(Bytes::copy_from_slice(&msg.data));
324326
let decoder = config.decoder(log_namespace).map_err(|_e| ())?;
325-
let mut stream = FramedRead::new(payload, decoder);
327+
let mut stream = DecoderFramedRead::new(payload, decoder);
326328

327329
// Extract timestamp from AMQP message
328330
let timestamp = msg

src/sources/aws_kinesis_firehose/handlers.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,10 @@ use chrono::Utc;
66
use flate2::read::MultiGzDecoder;
77
use futures::StreamExt;
88
use snafu::{ResultExt, Snafu};
9-
use tokio_util::codec::FramedRead;
109
use vector_common::constants::GZIP_MAGIC;
1110
use vector_lib::{
1211
EstimatedJsonEncodedSizeOf,
13-
codecs::StreamDecodingError,
12+
codecs::{DecoderFramedRead, StreamDecodingError},
1413
config::{LegacyKey, LogNamespace},
1514
event::BatchNotifier,
1615
finalization::AddBatchNotifier,
@@ -68,7 +67,7 @@ pub(super) async fn firehose(
6867
.map_err(reject::custom)?;
6968
context.bytes_received.emit(ByteSize(bytes.len()));
7069

71-
let mut stream = FramedRead::new(bytes.as_ref(), context.decoder.clone());
70+
let mut stream = DecoderFramedRead::new(bytes.as_ref(), context.decoder.clone());
7271
loop {
7372
match stream.next().await {
7473
Some(Ok((mut events, _byte_size))) => {

src/sources/demo_logs.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,10 @@ use rand::prelude::IndexedRandom;
77
use serde_with::serde_as;
88
use snafu::Snafu;
99
use tokio::time::{self, Duration};
10-
use tokio_util::codec::FramedRead;
1110
use vector_lib::{
1211
EstimatedJsonEncodedSizeOf,
1312
codecs::{
14-
StreamDecodingError,
13+
DecoderFramedRead, StreamDecodingError,
1514
decoding::{DeserializerConfig, FramingConfig},
1615
},
1716
config::{DataType, LegacyKey, LogNamespace},
@@ -234,7 +233,7 @@ async fn demo_logs_source(
234233

235234
let line = format.generate_line(n);
236235

237-
let mut stream = FramedRead::new(line.as_bytes(), decoder.clone());
236+
let mut stream = DecoderFramedRead::new(line.as_bytes(), decoder.clone());
238237
while let Some(next) = stream.next().await {
239238
match next {
240239
Ok((events, _byte_size)) => {

0 commit comments

Comments
 (0)