Bound Syslog/CEF message size to prevent unbounded memory growth#2452
Bound Syslog/CEF message size to prevent unbounded memory growth#2452utpilla wants to merge 4 commits intoopen-telemetry:mainfrom
Conversation
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #2452 +/- ##
========================================
Coverage 88.40% 88.40%
========================================
Files 603 603
Lines 213389 213561 +172
========================================
+ Hits 188639 188794 +155
- Misses 24224 24241 +17
Partials 526 526
🚀 New features to boost your workflow:
|
| let _ = peer_addr; | ||
|
|
||
| let mut line_bytes = Vec::new(); | ||
| let mut line_bytes = Vec::with_capacity(4096); |
|
|
||
| let socket = effect_handler.udp_socket(udp_config.listening_addr)?; | ||
| let mut buf = [0u8; 1024]; // ToDo: Find out the maximum allowed size for syslog messages | ||
| let mut buf = vec![0u8; MAX_MESSAGE_SIZE]; |
There was a problem hiding this comment.
wondering why would we allocate for worst possible scenario? Could we be more conservative, and accept that vec! will re-grow if we actually has such a big message? Maybe 1/2 or 1/4 of the MAX size for initial size?
There was a problem hiding this comment.
The UDP receive buffer is fundamentally different from TCP's Vec. recv_from writes into a fixed &mut [u8] slice. It doesn't append or grow. If the buffer is smaller than the incoming datagram, the OS kernel silently discards the excess bytes with no way to recover them. So, we must allocate the full MAX_MESSAGE_SIZE upfront to avoid permanent data loss. This is a one-time 16 KiB allocation for the lifetime of the receiver (one per core).
| max_size: usize, | ||
| ) -> std::io::Result<BoundedReadResult> { | ||
| let n = (&mut *reader) | ||
| .take(max_size as u64) |
There was a problem hiding this comment.
I think this doesn’t fully enforce the size cap.
Because the read is inside tokio::select!, it can be interrupted if the batch timer fires before a newline arrives. When that happens, any bytes already appended to line_bytes stay in the buffer, and the next loop iteration starts reading the same message again.
I think read_line_bounded() should consider how much is already in buf and only allow reading the remaining space. If buf is already at the limit, it should return Truncated immediately.
| // Handle incomplete line (log, emit metrics, etc.) | ||
| Ok(bounded_result) => { | ||
| if matches!(bounded_result, BoundedReadResult::Truncated) { | ||
| metrics.borrow_mut().received_logs_truncated.inc(); |
There was a problem hiding this comment.
I see from the PR description that splitting an oversized TCP message into head + tail is intentional, but I’m not sure this is the right semantic for a single syslog message.
In this branch we parse and emit the truncated prefix, clear line_bytes, and then continue reading, which means the remaining bytes from the same original message get emitted as a second record. That makes one input message turn into multiple downstream records, and received_logs_total no longer maps cleanly to original messages.
Why do we want to preserve and emit the tail as a separate logical message instead of either keeping a single truncated record or discarding until the next newline after truncation?
There was a problem hiding this comment.
Why do we want to preserve and emit the tail as a separate logical message instead of either keeping a single truncated record or discarding until the next newline after truncation?
The truncated head preserves the syslog header (timestamp, hostname, severity, facility) which is the most valuable structured metadata. The tail preserves part of the message body. Discarding either means losing data that could be useful for debugging.
The receiver's abstraction is "newline-delimited messages up to 16 KiB each." From its perspective, the head and tail are two separate protocol-level messages. The received_logs_truncated metric (incremented only once per oversized message, not per fragment) alerts operators that truncation is occurring, so they can investigate if needed.
Messages exceeding 16 KiB are extremely rare in practice. When one does occur, it's likely an unusual event worth investigating which makes preserving partial data better than silently dropping it.
Change Summary
Introduces a 16 KiB per-message size limit (
MAX_MESSAGE_SIZE) for the syslog CEF receiver to prevent unbounded memory growth from malicious or misbehaving clients.Before vs After
Vecreceived_logs_truncatedmetricVec::new()(multiple early reallocs)Vec::with_capacity(4096)Problem
read_until(b'\n')would keep appending bytes to aVecuntil a newline appeared. A client sending data without newlines could exhaust memory.Changes
read_line_bounded()helper that wrapsAsyncReadExt::take()+AsyncBufReadExt::read_until()to cap TCP reads atMAX_MESSAGE_SIZE(16 KiB) per message. Returns aBoundedReadResultenum (Complete,Truncated, orEof) so callers can distinguish outcomes.Vec, matching the TCP limit.received_logs_truncatedmetric to track messages that exceeded the size limit. For TCP, truncation is detected precisely. For UDP, it uses a best-effort heuristic (datagram fills buffer).Vec::with_capacity(4096)to avoid early reallocations for typical messages.bufmay contain partial data fromselect!cancellation and must not be cleared between calls unless discarding the current message.Design decisions
received_logs_totalcounts each bounded read as a separate message. An oversized message that splits into head + tail counts as 2.What issue does this PR close?
Towards #1149
How are these changes tested?
read_line_bounded: Empty reader, complete line, truncation, exact-limit edge case, EOF with partial data, and multi-call behavior after truncation.received_logs_truncatedfield.Are there any user-facing changes?
No