Skip to content

Commit 3272b27

Browse files
authored
Merge pull request #3 from vectordotdev/master
Merge upstream
2 parents 7bb0714 + 3eae931 commit 3272b27

File tree

24 files changed

+628
-87
lines changed

24 files changed

+628
-87
lines changed

.github/workflows/test.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ jobs:
163163
- uses: ./.github/actions/setup
164164
with:
165165
rust: true
166-
- run: cd rust-doc && make docs
166+
- run: cd rust-doc && make ci-docs-build
167167

168168
test-vrl:
169169
name: VRL - Linux

Cargo.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
Fixed a bug in the `file` source, which could silently corrupt data when using multi-character delimiters.
2+
3+
authors: lfrancke
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
Added the following metrics to record the utilization level of the buffer that
2+
all sources send into:
3+
4+
- `source_buffer_max_byte_size`
5+
- `source_buffer_max_event_size`
6+
- `source_buffer_utilization`
7+
- `source_buffer_utilization_level`
8+
9+
authors: bruceg

lib/dnstap-parser/src/vrl_functions/parse_dnstap.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ impl Function for ParseDnstap {
2929
}
3030

3131
fn examples(&self) -> &'static [Example] {
32-
&[Example {
32+
&[example!(
3333
title: "Parse dnstap query message",
3434
source: r#"parse_dnstap!("ChVqYW1lcy1WaXJ0dWFsLU1hY2hpbmUSC0JJTkQgOS4xNi4zGgBy5wEIAxACGAEiEAAAAAAAAAAAAAAAAAAAAAAqECABBQJwlAAAAAAAAAAAADAw8+0CODVA7+zq9wVNMU3WNlI2kwIAAAABAAAAAAABCWZhY2Vib29rMQNjb20AAAEAAQAAKQIAAACAAAAMAAoACOxjCAG9zVgzWgUDY29tAGAAbQAAAAByZLM4AAAAAQAAAAAAAQJoNQdleGFtcGxlA2NvbQAABgABAAApBNABAUAAADkADwA1AAlubyBTRVAgbWF0Y2hpbmcgdGhlIERTIGZvdW5kIGZvciBkbnNzZWMtZmFpbGVkLm9yZy54AQ==")"#,
3535
result: Ok(indoc!(
@@ -135,7 +135,7 @@ impl Function for ParseDnstap {
135135
"timestamp": "2020-06-30T03:50:07.920014129Z"
136136
}"#
137137
)),
138-
}]
138+
)]
139139
}
140140

141141
fn compile(

lib/enrichment/src/find_enrichment_table_records.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,15 +82,15 @@ impl Function for FindEnrichmentTableRecords {
8282
}
8383

8484
fn examples(&self) -> &'static [Example] {
85-
&[Example {
85+
&[example!(
8686
title: "find records",
8787
source: r#"find_enrichment_table_records!("test", {"surname": "Smith"})"#,
8888
result: Ok(
8989
indoc! { r#"[{"id": 1, "firstname": "Bob", "surname": "Smith"},
9090
{"id": 2, "firstname": "Fred", "surname": "Smith"}]"#,
9191
},
9292
),
93-
}]
93+
)]
9494
}
9595

9696
fn compile(

lib/enrichment/src/get_enrichment_table_record.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,11 +79,11 @@ impl Function for GetEnrichmentTableRecord {
7979
}
8080

8181
fn examples(&self) -> &'static [Example] {
82-
&[Example {
82+
&[example!(
8383
title: "find records",
8484
source: r#"get_enrichment_table_record!("test", {"id": 1})"#,
8585
result: Ok(r#"{"id": 1, "firstname": "Bob", "surname": "Smith"}"#),
86-
}]
86+
)]
8787
}
8888

8989
fn compile(

lib/file-source-common/src/buffer.rs

Lines changed: 222 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -46,13 +46,56 @@ pub async fn read_until_with_max_size<'a, R: AsyncBufRead + ?Sized + Unpin>(
4646
let delim_len = delim.len();
4747
let mut discarded_for_size_and_truncated = Vec::new();
4848
let mut reader = Box::new(reader);
49+
50+
// Used to track partial delimiter matches across buffer boundaries.
51+
// Data is read in chunks from the reader (see `fill_buf` below).
52+
// A multi-byte delimiter may be split across the "old" and "new" buffers.
53+
// Any potential partial delimiter that was found in the "old" buffer is stored in this variable.
54+
let mut partial_delim: BytesMut = BytesMut::with_capacity(delim_len);
55+
4956
loop {
57+
// Read the next chunk of data
5058
let available: &[u8] = match reader.fill_buf().await {
5159
Ok(n) => n,
5260
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => continue,
5361
Err(e) => return Err(e),
5462
};
5563

64+
// First, check if we have a partial delimiter from the previous iteration/buffer
65+
if !partial_delim.is_empty() {
66+
let expected_suffix = &delim[partial_delim.len()..];
67+
let expected_suffix_len = expected_suffix.len();
68+
69+
// We already know that we have a partial delimiter match from the previous buffer.
70+
// Here we check what part of the delimiter is missing and whether the new buffer
71+
// contains the remaining part.
72+
if available.len() >= expected_suffix_len
73+
&& &available[..expected_suffix_len] == expected_suffix
74+
{
75+
// Complete delimiter found! Consume the remainder of the delimiter so we can start
76+
// processing data after the delimiter.
77+
reader.consume(expected_suffix_len);
78+
*position += expected_suffix_len as u64;
79+
total_read += expected_suffix_len;
80+
partial_delim.clear();
81+
82+
// Found a complete delimiter, return the current buffer so we can proceed with the
83+
// next record after this delimiter in the next call.
84+
return Ok(ReadResult {
85+
successfully_read: Some(total_read),
86+
discarded_for_size_and_truncated,
87+
});
88+
} else {
89+
// Not a complete delimiter after all.
90+
// Add partial_delim to output buffer as it is actual data.
91+
if !discarding {
92+
buf.extend_from_slice(&partial_delim);
93+
}
94+
partial_delim.clear();
95+
// Continue processing current available buffer
96+
}
97+
}
98+
5699
let (done, used) = {
57100
match delim_finder.find(available) {
58101
Some(i) => {
@@ -62,13 +105,75 @@ pub async fn read_until_with_max_size<'a, R: AsyncBufRead + ?Sized + Unpin>(
62105
(true, i + delim_len)
63106
}
64107
None => {
65-
if !discarding {
66-
buf.extend_from_slice(available);
108+
// No delimiter found in current buffer. But there could be a partial delimiter
109+
// at the end of this buffer. For multi-byte delimiters like \r\n, we need
110+
// to handle the case where the delimiter is split across buffer boundaries
111+
// (e.g. \r in the "old" buffer, then we read new data and find \n in the new
112+
// buffer).
113+
let mut partial_match_len = 0;
114+
115+
// We only need to check if we're not already at the end of the buffer and if we
116+
// have a delimiter that has more than one byte.
117+
if !available.is_empty() && delim_len > 1 {
118+
// Check if the end of the current buffer matches a prefix of the delimiter
119+
// by testing from longest to shortest possible prefix.
120+
//
121+
// This loop runs at most (delim_len - 1) iterations:
122+
// - 2-byte delimiter (\r\n): 1 iteration max
123+
// - 5-byte delimiter: 4 iterations max
124+
//
125+
// This part of the code is only called if all of these are true:
126+
//
127+
// - We have a new buffer (e.g. every 8kB, i.e. only called once per buffer)
128+
// - We have a multi-byte delimiter
129+
// - This delimiter could not be found in the current buffer
130+
//
131+
// Even for longer delimiters the performance impact is negligible.
132+
//
133+
// Example 1:
134+
// Delimiter: \r\n
135+
// Iteration 1: It checks if the current buffer ends with "\r",
136+
// if it does we have a potential partial delimiter.
137+
// The next chunk will confirm whether this is truly part of a delimiter.
138+
139+
// Example 2:
140+
// Delimiter: ABCDE
141+
// Iteration 1: It checks if the current buffer ends with "ABCD" (we don't
142+
// need to check "ABCDE" because that would have been caught by
143+
// `delim_finder.find` earlier)
144+
// Iteration 2: It checks if the current buffer ends with "ABC"
145+
// Iterations 3-4: Same for "AB" and "A"
146+
for prefix_len in (1..delim_len).rev() {
147+
if available.len() >= prefix_len
148+
&& available.ends_with(&delim[..prefix_len])
149+
{
150+
partial_match_len = prefix_len;
151+
break;
152+
}
153+
}
154+
}
155+
156+
let bytes_to_copy = available.len() - partial_match_len;
157+
158+
if !discarding && bytes_to_copy > 0 {
159+
buf.extend_from_slice(&available[..bytes_to_copy]);
67160
}
161+
162+
// If we found a potential partial delimiter, save it for the next iteration
163+
if partial_match_len > 0 {
164+
partial_delim.clear();
165+
partial_delim.extend_from_slice(&available[bytes_to_copy..]);
166+
}
167+
68168
(false, available.len())
69169
}
70170
}
71171
};
172+
173+
// Check if we're at EOF before we start processing
174+
// (for borrow checker, has to come before `consume`)
175+
let at_eof = available.is_empty();
176+
72177
reader.consume(used);
73178
*position += used as u64; // do this at exactly same time
74179
total_read += used;
@@ -92,11 +197,12 @@ pub async fn read_until_with_max_size<'a, R: AsyncBufRead + ?Sized + Unpin>(
92197
discarding = false;
93198
buf.clear();
94199
}
95-
} else if used == 0 {
96-
// We've hit EOF but not yet seen a newline. This can happen when unlucky timing causes
97-
// us to observe an incomplete write. We return None here and let the loop continue
98-
// next time the method is called. This is safe because the buffer is specific to this
99-
// FileWatcher.
200+
} else if used == 0 && at_eof {
201+
// We've hit EOF but haven't seen a delimiter. This can happen when:
202+
// 1. The file ends without a trailing delimiter
203+
// 2. We're observing an incomplete write
204+
//
205+
// Return None to signal the caller to retry later.
100206
return Ok(ReadResult {
101207
successfully_read: None,
102208
discarded_for_size_and_truncated,
@@ -262,4 +368,113 @@ mod test {
262368
.await
263369
.unwrap()
264370
}
371+
372+
/// Generic test helper that tests delimiter splits across buffer boundaries
373+
/// for any delimiter length. This function:
374+
/// 1. Creates test data with delimiters positioned to split at buffer boundaries
375+
/// 2. Tests multiple iterations to ensure state tracking works correctly
376+
/// 3. Verifies all lines are correctly separated without merging
377+
async fn test_delimiter_boundary_split_helper(delimiter: &[u8], num_lines: usize) {
378+
let delimiter_len = delimiter.len();
379+
380+
// Use a buffer capacity that will force splits
381+
// We'll position delimiters to split at this boundary
382+
let buffer_capacity = 10;
383+
384+
// Build test data where each delimiter is positioned to split across buffer boundary
385+
// Strategy: For each line, calculate position so delimiter starts at boundary - (delimiter_len - 1)
386+
let mut data = Vec::new();
387+
let mut expected_lines = Vec::new();
388+
389+
for i in 0..num_lines {
390+
// Create line content that positions the delimiter to split at buffer boundary
391+
// We want the delimiter to straddle a buffer_capacity boundary
392+
393+
// Calculate how many bytes until the next buffer boundary
394+
let current_pos = data.len();
395+
let bytes_until_boundary = buffer_capacity - (current_pos % buffer_capacity);
396+
397+
// Create line content that will position delimiter to split
398+
// We want (delimiter_len - 1) bytes before boundary, then 1 byte after
399+
let line_content = if bytes_until_boundary > delimiter_len {
400+
let content_len = bytes_until_boundary - (delimiter_len - 1);
401+
format!("line{:0width$}", i, width = content_len.saturating_sub(4)).into_bytes()
402+
} else {
403+
// Not enough room in this buffer, pad to next boundary
404+
let padding = bytes_until_boundary;
405+
let extra_content = buffer_capacity - (delimiter_len - 1);
406+
let mut content = vec![b'X'; padding];
407+
content.extend_from_slice(
408+
format!("L{:0width$}", i, width = extra_content.saturating_sub(1)).as_bytes(),
409+
);
410+
content
411+
};
412+
413+
expected_lines.push(line_content.clone());
414+
data.extend_from_slice(&line_content);
415+
data.extend_from_slice(delimiter);
416+
}
417+
418+
// Now test reading this data
419+
let cursor = Cursor::new(data);
420+
let mut reader = BufReader::with_capacity(buffer_capacity, cursor);
421+
let mut position = 0;
422+
let max_size = 1024;
423+
424+
// Read each line and verify it matches expected
425+
for (i, expected_line) in expected_lines.iter().enumerate() {
426+
let mut buffer = BytesMut::new();
427+
let result = read_until_with_max_size(
428+
Box::pin(&mut reader),
429+
&mut position,
430+
delimiter,
431+
&mut buffer,
432+
max_size,
433+
)
434+
.await
435+
.unwrap();
436+
437+
assert_eq!(
438+
buffer.as_ref(),
439+
expected_line.as_slice(),
440+
"Line {} should match expected content. Got: {:?}, Expected: {:?}",
441+
i,
442+
String::from_utf8_lossy(&buffer),
443+
String::from_utf8_lossy(expected_line)
444+
);
445+
446+
assert!(
447+
result.successfully_read.is_some(),
448+
"Should find delimiter for line {}",
449+
i
450+
);
451+
}
452+
}
453+
454+
#[tokio::test]
455+
async fn test_single_byte_delimiter_boundary() {
456+
// Test single-byte delimiter (should work without any special handling)
457+
test_delimiter_boundary_split_helper(b"\n", 5).await;
458+
}
459+
460+
#[tokio::test]
461+
async fn test_two_byte_delimiter_boundary() {
462+
// Test two-byte delimiter (CRLF case)
463+
test_delimiter_boundary_split_helper(b"\r\n", 5).await;
464+
}
465+
466+
#[tokio::test]
467+
async fn test_three_byte_delimiter_boundary() {
468+
test_delimiter_boundary_split_helper(b"|||", 5).await;
469+
}
470+
471+
#[tokio::test]
472+
async fn test_four_byte_delimiter_boundary() {
473+
test_delimiter_boundary_split_helper(b"<|>|", 5).await;
474+
}
475+
476+
#[tokio::test]
477+
async fn test_five_byte_delimiter_boundary() {
478+
test_delimiter_boundary_split_helper(b"<<>>>", 5).await;
479+
}
265480
}

0 commit comments

Comments (0)