6
6
* LICENSE file in the root directory of this source tree.
7
7
*/
8
8
9
- use std:: mem:: swap;
10
9
use std:: mem:: take;
11
10
use std:: ops:: DerefMut ;
12
11
use std:: sync:: Arc ;
@@ -19,6 +18,9 @@ use tokio::io::AsyncWrite;
19
18
use tokio:: io:: AsyncWriteExt ;
20
19
use tokio:: io:: BufReader ;
21
20
21
/// Upper bound, in bytes, on how much of a single log line is kept
/// (256 KiB); longer lines are truncated.
const MAX_BYTE_SIZE_LOG_LINE: usize = 1 << 18;
23
+
22
24
/// A tailer (ring buffer) of (text) log lines.
23
25
pub struct LogTailer {
24
26
state : Arc < Mutex < State > > ,
@@ -32,6 +34,25 @@ struct State {
32
34
}
33
35
34
36
impl LogTailer {
37
+ /// Helper method to push a line to the ring buffer
38
+ fn push_line_to_buffer ( state : & Arc < Mutex < State > > , byte_buffer : & mut [ u8 ] , max : usize ) {
39
+ // use lossy string rather than truncated valid utf8
40
+ // from_utf8_lossy(b"Hello\xFF\xFEWorld") returns "Hello��World"
41
+ let mut buffer: String = String :: from_utf8_lossy ( byte_buffer) . to_string ( ) ;
42
+ // Remove trailing newline if present
43
+ while buffer. ends_with ( '\n' ) {
44
+ buffer. pop ( ) ;
45
+ }
46
+ let mut locked = state. lock ( ) . unwrap ( ) ;
47
+ let next = locked. next ;
48
+ if next < locked. lines . len ( ) {
49
+ locked. lines [ next] = buffer;
50
+ } else {
51
+ locked. lines . push ( buffer. clone ( ) ) ;
52
+ }
53
+ locked. next = ( next + 1 ) % max;
54
+ }
55
+
35
56
/// Create a new tailer given a `stream`. The tailer tails the reader in the
36
57
/// background, while keeping at most `max` log lines in its buffer. The tailer
37
58
/// stops when the stream is ended (i.e., returns an EOF).
@@ -58,25 +79,66 @@ impl LogTailer {
58
79
// and make this awaitable, etc
59
80
let handle = tokio:: spawn ( async move {
60
81
let mut reader = BufReader :: new ( stream) ;
61
- let mut buffer = String :: new ( ) ;
82
+ let mut skip_until_newline = false ;
83
+ let mut byte_buffer: Vec < u8 > = Vec :: new ( ) ;
62
84
loop {
63
- buffer. clear ( ) ; // clear retains the buffer
64
- // TODO: we should probably limit line length
65
- if reader. read_line ( & mut buffer) . await ? == 0 {
85
+ // this gives at most a reference to 8KB of data in the internal buffer
86
+ // based on internal implementation of BufReader's `DEFAULT_BUF_SIZE`
87
+ let reader_buf = reader. fill_buf ( ) . await ?;
88
+
89
+ if reader_buf. is_empty ( ) {
90
+ // EOF reached, write any remaining buffer content as a line
91
+ if !byte_buffer. is_empty ( ) {
92
+ Self :: push_line_to_buffer ( & state, & mut byte_buffer, max) ;
93
+ }
66
94
break Ok ( ( ) ) ;
67
95
}
68
- let _ = tee. write_all ( buffer. as_bytes ( ) ) . await ;
69
- while buffer. ends_with ( '\n' ) {
70
- buffer. pop ( ) ;
96
+
97
+ // find newline pos or the end of buffer if no newline found
98
+ let new_line_pos = reader_buf
99
+ . iter ( )
100
+ . position ( |& b| b == b'\n' )
101
+ . unwrap_or ( reader_buf. len ( ) ) ;
102
+
103
+ if skip_until_newline {
104
+ // funnel through the tee stream
105
+ let mut to_consume = reader_buf. len ( ) ;
106
+ if new_line_pos != reader_buf. len ( ) {
107
+ to_consume = new_line_pos + 1 ;
108
+ skip_until_newline = false ;
109
+ }
110
+ tee. write_all ( & reader_buf[ ..to_consume] ) . await ?;
111
+ reader. consume ( to_consume) ;
112
+ continue ;
71
113
}
72
- let mut locked = state. lock ( ) . unwrap ( ) ;
73
- let next = locked. next ;
74
- if next < locked. lines . len ( ) {
75
- swap ( & mut locked. lines [ next] , & mut buffer) ;
114
+
115
+ let to_be_consumed = if new_line_pos != reader_buf. len ( ) {
116
+ new_line_pos + 1
76
117
} else {
77
- locked. lines . push ( buffer. clone ( ) ) ;
118
+ reader_buf. len ( )
119
+ } ;
120
+
121
+ byte_buffer. extend ( & reader_buf[ ..to_be_consumed] ) ;
122
+ tee. write_all ( & reader_buf[ ..to_be_consumed] ) . await ?;
123
+ if byte_buffer. len ( ) >= MAX_BYTE_SIZE_LOG_LINE || new_line_pos != reader_buf. len ( ) {
124
+ skip_until_newline = byte_buffer. len ( ) >= MAX_BYTE_SIZE_LOG_LINE
125
+ && new_line_pos == reader_buf. len ( ) ;
126
+ // Truncate to MAX_BYTE_SIZE_LOG_LINE if necessary before pushing
127
+ if byte_buffer. len ( ) > MAX_BYTE_SIZE_LOG_LINE {
128
+ byte_buffer. truncate ( MAX_BYTE_SIZE_LOG_LINE ) ;
129
+ }
130
+
131
+ // we are pushing a line that doesn't have a newline
132
+ if byte_buffer. len ( ) == MAX_BYTE_SIZE_LOG_LINE
133
+ && new_line_pos == reader_buf. len ( )
134
+ {
135
+ byte_buffer. extend_from_slice ( "<TRUNCATED>" . as_bytes ( ) ) ;
136
+ }
137
+ Self :: push_line_to_buffer ( & state, & mut byte_buffer, max) ;
138
+ byte_buffer. clear ( ) ;
78
139
}
79
- locked. next = ( next + 1 ) % max;
140
+
141
+ reader. consume ( to_be_consumed) ;
80
142
}
81
143
} ) ;
82
144
@@ -92,7 +154,6 @@ impl LogTailer {
92
154
lines. rotate_left ( next) ;
93
155
lines
94
156
}
95
-
96
157
/// Abort the tailer. This will stop any ongoing reads, and drop the
97
158
/// stream. Abort is complete after `join` returns.
98
159
pub fn abort ( & self ) {
@@ -143,6 +204,83 @@ mod tests {
143
204
assert_eq ! ( lines. next_line( ) . await . unwrap( ) . unwrap( ) , "world" ) ;
144
205
}
145
206
207
+ #[ tokio:: test]
208
+ async fn test_read_buffer_boundary ( ) {
209
+ let mut input_bytes = Vec :: new ( ) ;
210
+ // reader buffer's default size is 8KB. We assert that the tee function reads
211
+ // correctly when the lines are exactly 8KB and 8KB + 1 bytes
212
+ input_bytes. extend ( vec ! [ b'a' ; 8191 ] ) ;
213
+ input_bytes. extend ( [ b'\n' ] ) ;
214
+ input_bytes. extend ( vec ! [ b'b' ; 8192 ] ) ;
215
+ let reader = Cursor :: new ( input_bytes) ;
216
+
217
+ let ( lines, result) = LogTailer :: new ( 5 , reader) . join ( ) . await ;
218
+ assert ! ( result. is_ok( ) ) ;
219
+
220
+ // Should have 3 lines
221
+ assert_eq ! ( lines. len( ) , 2 ) ;
222
+
223
+ assert_eq ! ( lines[ 0 ] , format!( "{}" , "a" . repeat( 8191 ) ) ) ;
224
+
225
+ assert_eq ! ( lines[ 1 ] , format!( "{}" , "b" . repeat( 8192 ) ) ) ;
226
+ }
227
+
228
+ #[ tokio:: test]
229
+ async fn test_line_truncation ( ) {
230
+ // Create input with 3 MAX_BYTE_SIZE_LOG_LINE-byte lines
231
+ let mut input_bytes = Vec :: new ( ) ;
232
+ // first line is exactly `MAX_BYTE_SIZE_LOG_LINE` bytes including `\n`
233
+ input_bytes. extend ( vec ! [ b'a' ; MAX_BYTE_SIZE_LOG_LINE - 1 ] ) ;
234
+ input_bytes. extend ( [ b'\n' ] ) ;
235
+
236
+ // second line is more than `MAX_BYTE_SIZE_LOG_LINE` bytes including `\n`
237
+ input_bytes. extend ( vec ! [ b'b' ; MAX_BYTE_SIZE_LOG_LINE ] ) ;
238
+ input_bytes. extend ( [ b'\n' ] ) ;
239
+
240
+ // last line of the input stream is < `MAX_BYTE_SIZE_LOG_LINE` bytes to ensure complete flush
241
+ input_bytes. extend ( vec ! [ b'c' ; MAX_BYTE_SIZE_LOG_LINE - 1 ] ) ;
242
+
243
+ let reader = Cursor :: new ( input_bytes) ;
244
+
245
+ let ( lines, result) = LogTailer :: new ( 5 , reader) . join ( ) . await ;
246
+ assert ! ( result. is_ok( ) ) ;
247
+
248
+ // Should have 3 lines
249
+ assert_eq ! ( lines. len( ) , 3 ) ;
250
+
251
+ // First line should be MAX_BYTE_SIZE_LOG_LINE-1 'a's
252
+ assert_eq ! (
253
+ lines[ 0 ] ,
254
+ format!( "{}" , "a" . repeat( MAX_BYTE_SIZE_LOG_LINE - 1 ) )
255
+ ) ;
256
+
257
+ // Second line should be `MAX_BYTE_SIZE_LOG_LINE` 'b's + "<TRUNCATED>"
258
+ assert_eq ! (
259
+ lines[ 1 ] ,
260
+ format!( "{}<TRUNCATED>" , "b" . repeat( MAX_BYTE_SIZE_LOG_LINE ) )
261
+ ) ;
262
+
263
+ // last line before stream closes should be MAX_BYTE_SIZE_LOG_LINE-1 c's
264
+ assert_eq ! ( lines[ 2 ] , "c" . repeat( MAX_BYTE_SIZE_LOG_LINE - 1 ) ) ;
265
+ }
266
+
267
+ #[ tokio:: test]
268
+ async fn test_ring_buffer_behavior ( ) {
269
+ let input = "line1\n line2\n line3\n line4\n line5\n line6\n line7\n " ;
270
+ let reader = Cursor :: new ( input. as_bytes ( ) ) ;
271
+ let max_lines = 3 ; // Small ring buffer for easy testing
272
+
273
+ let ( lines, result) = LogTailer :: new ( max_lines, reader) . join ( ) . await ;
274
+ assert ! ( result. is_ok( ) ) ;
275
+
276
+ // Should only have the last 3 lines (ring buffer behavior)
277
+ // Lines 1-4 should be overwritten (lost due to ring buffer)
278
+ assert_eq ! ( lines. len( ) , 3 ) ;
279
+ assert_eq ! ( lines[ 0 ] , "line5" ) ; // oldest in current buffer
280
+ assert_eq ! ( lines[ 1 ] , "line6" ) ; // middle
281
+ assert_eq ! ( lines[ 2 ] , "line7" ) ; // newest
282
+ }
283
+
146
284
#[ tokio:: test]
147
285
async fn test_streaming_logtailer ( ) {
148
286
let ( reader, mut writer) = tokio:: io:: simplex ( 1 ) ;
@@ -184,4 +322,56 @@ mod tests {
184
322
tailer. abort ( ) ;
185
323
tailer. join ( ) . await . 1 . unwrap_err ( ) ;
186
324
}
325
+
326
+ #[ tokio:: test]
327
+ async fn test_multibyte_character_on_internal_buffer_boundary ( ) {
328
+ // Test: Multi-byte characters split across internal buffer boundaries
329
+ let mut input_bytes = Vec :: new ( ) ;
330
+ input_bytes. extend ( vec ! [ b'a' ; 8191 ] ) ;
331
+ let euro_bytes = "€" . as_bytes ( ) ; // [0xE2, 0x82, 0xAC]
332
+ // add 3 bytes of the euro sign, but across internal buffer
333
+ // 1st byte will be part of the first buffer call but remaining will spillover
334
+ // to the next buffer call
335
+ input_bytes. extend ( euro_bytes) ;
336
+ input_bytes. push ( b'\n' ) ;
337
+ input_bytes. extend ( vec ! [ b'b' ; 8192 ] ) ;
338
+ let reader = Cursor :: new ( input_bytes) ;
339
+ let ( lines, result) = LogTailer :: new ( 5 , reader) . join ( ) . await ;
340
+
341
+ assert ! ( result. is_ok( ) ) ;
342
+ assert_eq ! ( lines. len( ) , 2 ) ;
343
+ assert_eq ! ( lines[ 0 ] , format!( "{}€" , "a" . repeat( 8191 ) ) ) ;
344
+ assert_eq ! ( lines[ 1 ] , format!( "{}" , "b" . repeat( 8192 ) ) ) ;
345
+ }
346
+
347
+ #[ tokio:: test]
348
+ async fn test_truncation_with_utf8_errors ( ) {
349
+ // Test: UTF-8 errors interacting with line length limits
350
+ let mut input_bytes = Vec :: new ( ) ;
351
+
352
+ // Fill near max capacity, then add invalid bytes
353
+ input_bytes. extend ( vec ! [ b'a' ; MAX_BYTE_SIZE_LOG_LINE - 1 ] ) ;
354
+ input_bytes. push ( 0xFF ) ; // Invalid byte at the boundary of the limit
355
+ input_bytes. extend ( vec ! [ b'b' ; 100 ] ) ; // Exceed limit, so skipped
356
+ input_bytes. push ( b'\n' ) ;
357
+ input_bytes. extend ( vec ! [ b'c' ; 100 ] ) ; // new string after newline
358
+ input_bytes. push ( b'\n' ) ;
359
+ input_bytes. push ( 0xFF ) ; // Invalid byte at the start, expect <INVALID_UTF8>
360
+
361
+ let reader = Cursor :: new ( input_bytes) ;
362
+ let ( lines, result) = LogTailer :: new ( 5 , reader) . join ( ) . await ;
363
+
364
+ assert ! ( result. is_ok( ) ) ;
365
+ assert_eq ! ( lines. len( ) , 3 ) ;
366
+ assert_eq ! (
367
+ lines[ 0 ] ,
368
+ format!(
369
+ "{}{}" ,
370
+ "a" . repeat( MAX_BYTE_SIZE_LOG_LINE - 1 ) ,
371
+ "�<TRUNCATED>"
372
+ )
373
+ ) ;
374
+ assert_eq ! ( lines[ 1 ] , format!( "{}" , "c" . repeat( 100 ) ) ) ;
375
+ assert_eq ! ( lines[ 2 ] , "�" ) ;
376
+ }
187
377
}
0 commit comments