Skip to content

Commit a8ff53f

Browse files
authored
fix(combined): merge $all streams by @ts with carry-forward for lagging indexes (#90)
- Fix CombinedReader ($all tab) to carry forward the last known @ts timestamp when the index lags behind, instead of defaulting to 0, which incorrectly sorts new lines to the beginning.
- Retry IndexReader::open() on reload when the index didn't exist at combined tab creation time.
- Trigger a full rebuild when a new index appears so all lines get correct timestamps.
1 parent 8a108e8 commit a8ff53f

File tree

2 files changed

+222
-15
lines changed

2 files changed

+222
-15
lines changed

src/index/reader.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,19 @@ impl IndexReader {
303303
}
304304
}
305305

306+
#[cfg(any(test, debug_assertions))]
307+
impl IndexReader {
308+
/// Create an IndexReader with specific timestamps for testing.
309+
pub fn with_timestamps(timestamps: &[u64]) -> Self {
310+
Self {
311+
flags: vec![0; timestamps.len()],
312+
checkpoints: Vec::new(),
313+
timestamps: timestamps.to_vec(),
314+
cached_severity_counts: SeverityCounts::default(),
315+
}
316+
}
317+
}
318+
306319
#[cfg(test)]
307320
mod tests {
308321
use super::*;

src/reader/combined_reader.rs

Lines changed: 209 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -56,18 +56,26 @@ impl CombinedReader {
5656
reader
5757
}
5858

59+
/// Get the timestamp for a line, carrying forward the last known timestamp
60+
/// from the same source when the index hasn't caught up yet.
61+
fn get_timestamp(source: &SourceEntry, line: usize, last_ts: &mut u64) -> u64 {
62+
let ts = source
63+
.index_reader
64+
.as_ref()
65+
.and_then(|ir| ir.get_timestamp(line))
66+
.unwrap_or(*last_ts);
67+
*last_ts = ts;
68+
ts
69+
}
70+
5971
/// Rebuild the merged line list from all sources, sorted by timestamp.
6072
fn build_merged(&mut self) {
6173
self.merged.clear();
6274

6375
for (source_id, source) in self.sources.iter().enumerate() {
76+
let mut last_ts = 0u64;
6477
for line in 0..source.total_lines {
65-
let timestamp = source
66-
.index_reader
67-
.as_ref()
68-
.and_then(|ir| ir.get_timestamp(line))
69-
.unwrap_or(0);
70-
78+
let timestamp = Self::get_timestamp(source, line, &mut last_ts);
7179
self.merged.push(MergedLine {
7280
source_id,
7381
file_line: line,
@@ -102,12 +110,19 @@ impl CombinedReader {
102110
for (source_id, source) in self.sources.iter().enumerate() {
103111
let prev = self.prev_totals[source_id];
104112
if source.total_lines > prev {
105-
for line in prev..source.total_lines {
106-
let timestamp = source
113+
// Carry forward the last known timestamp from this source
114+
// so new lines without index data sort near their true position.
115+
let mut last_ts = if prev > 0 {
116+
source
107117
.index_reader
108118
.as_ref()
109-
.and_then(|ir| ir.get_timestamp(line))
110-
.unwrap_or(0);
119+
.and_then(|ir| ir.get_timestamp(prev - 1))
120+
.unwrap_or(0)
121+
} else {
122+
0
123+
};
124+
for line in prev..source.total_lines {
125+
let timestamp = Self::get_timestamp(source, line, &mut last_ts);
111126
new_lines.push(MergedLine {
112127
source_id,
113128
file_line: line,
@@ -216,6 +231,7 @@ impl LogReader for CombinedReader {
216231
// Reload each source reader and refresh index readers.
217232
// Individual source failures (e.g. deleted file) are skipped gracefully.
218233
let mut any_truncated = false;
234+
let mut index_gained = false;
219235
for (i, source) in self.sources.iter_mut().enumerate() {
220236
let mut reader = match source.reader.lock() {
221237
Ok(guard) => guard,
@@ -232,14 +248,23 @@ impl LogReader for CombinedReader {
232248
any_truncated = true;
233249
}
234250

235-
if let (Some(ref mut ir), Some(ref path)) =
236-
(&mut source.index_reader, &source.source_path)
237-
{
238-
ir.refresh(path);
251+
if let Some(ref mut ir) = source.index_reader {
252+
if let Some(ref path) = source.source_path {
253+
ir.refresh(path);
254+
}
255+
} else if let Some(ref path) = source.source_path {
256+
// Index didn't exist when combined tab was created — retry.
257+
if let Some(ir) = IndexReader::open(path) {
258+
source.index_reader = Some(ir);
259+
index_gained = true;
260+
}
239261
}
240262
}
241263

242-
if any_truncated {
264+
// If any source was truncated, or a new index appeared (meaning lines
265+
// that previously had no timestamp can now be positioned correctly),
266+
// do a full rebuild.
267+
if any_truncated || index_gained {
243268
self.build_merged();
244269
} else {
245270
self.append_new_lines();
@@ -378,4 +403,173 @@ mod tests {
378403
assert_eq!(reader.merged[2].timestamp, 30);
379404
assert_eq!(reader.merged[3].timestamp, 50);
380405
}
406+
407+
#[test]
fn test_timestamp_carry_forward_for_unindexed_lines() {
    // The index for source "a" lags: it covers lines 0-1 but not line 2.
    // Source "b" is fully indexed. The unindexed line a:2 must inherit a's
    // last known timestamp (200) rather than falling back to 0.
    let mut lagging = make_source("a", vec!["a1", "a2", "a3"]);
    lagging.index_reader = Some(IndexReader::with_timestamps(&[100, 200])); // only 2 of 3 indexed

    let mut complete = make_source("b", vec!["b1", "b2"]);
    complete.index_reader = Some(IndexReader::with_timestamps(&[150, 250]));

    let mut reader = CombinedReader::new(vec![lagging, complete]);

    assert_eq!(reader.total_lines(), 5);

    // Expected order by timestamp:
    // a1(100), b1(150), a2(200), a3(200 carried), b2(250)
    let expected = ["a1", "b1", "a2", "a3", "b2"];
    for (position, text) in expected.iter().copied().enumerate() {
        assert_eq!(reader.get_line(position).unwrap(), Some(text.to_string()));
    }
}
428+
429+
#[test]
fn test_no_index_lines_sort_to_beginning() {
    // A source with no index at all: every line gets timestamp 0 and the
    // lines keep their relative order at the start of the merged view.
    let unindexed = make_source("a", vec!["a1", "a2"]);

    let mut indexed = make_source("b", vec!["b1"]);
    indexed.index_reader = Some(IndexReader::with_timestamps(&[100]));

    let mut reader = CombinedReader::new(vec![unindexed, indexed]);

    // Both "a" lines (ts=0) precede b1 (ts=100).
    let expected = ["a1", "a2", "b1"];
    for (position, text) in expected.iter().copied().enumerate() {
        assert_eq!(reader.get_line(position).unwrap(), Some(text.to_string()));
    }
}
443+
444+
#[test]
fn test_interleaved_timestamps_merge_correctly() {
    // Two fully indexed sources whose timestamps alternate must come out
    // of the merge in strict timestamp order.
    let mut odd_source = make_source("a", vec!["a1", "a2", "a3"]);
    odd_source.index_reader = Some(IndexReader::with_timestamps(&[10, 30, 50]));

    let mut even_source = make_source("b", vec!["b1", "b2", "b3"]);
    even_source.index_reader = Some(IndexReader::with_timestamps(&[20, 40, 60]));

    let mut reader = CombinedReader::new(vec![odd_source, even_source]);

    assert_eq!(reader.total_lines(), 6);

    // ts: 10, 20, 30, 40, 50, 60
    let expected = ["a1", "b1", "a2", "b2", "a3", "b3"];
    for (position, text) in expected.iter().copied().enumerate() {
        assert_eq!(reader.get_line(position).unwrap(), Some(text.to_string()));
    }
}
463+
464+
#[test]
fn test_reload_picks_up_new_index() {
    use crate::index::column::ColumnWriter;
    use crate::index::meta::{ColumnBit, IndexMeta};
    use crate::source::index_dir_for_log;

    // Writes a two-entry on-disk index (offsets, flags, time, meta) into
    // `idx`, using `stamps` as the per-line timestamps.
    let write_index = |idx: std::path::PathBuf, stamps: [u64; 2]| {
        std::fs::create_dir_all(&idx).unwrap();

        let mut offsets = ColumnWriter::<u64>::create(idx.join("offsets")).unwrap();
        for offset in [0u64, 3u64] {
            offsets.push(offset).unwrap();
        }
        drop(offsets);

        let mut flags = ColumnWriter::<u32>::create(idx.join("flags")).unwrap();
        for flag in [0u32, 0u32] {
            flags.push(flag).unwrap();
        }
        drop(flags);

        let mut time = ColumnWriter::<u64>::create(idx.join("time")).unwrap();
        for ts in stamps {
            time.push(ts).unwrap();
        }
        drop(time);

        let mut meta = IndexMeta::new();
        meta.entry_count = 2;
        meta.log_file_size = 6;
        for column in [ColumnBit::Offsets, ColumnBit::Flags, ColumnBit::Time] {
            meta.set_column(column);
        }
        meta.write_to(idx.join("meta")).unwrap();
    };

    let dir = tempfile::tempdir().unwrap();

    // Two log files: a.log is indexed up front, b.log starts without an index.
    let log_a = dir.path().join("a.log");
    let log_b = dir.path().join("b.log");
    std::fs::write(&log_a, "a1\na2\n").unwrap();
    std::fs::write(&log_b, "b1\nb2\n").unwrap();

    // Index for a.log with timestamps [200, 400].
    write_index(index_dir_for_log(&log_a), [200, 400]);

    // Build the sources; the entry for b.log has no index reader yet.
    let indexed_source = SourceEntry {
        name: "a".into(),
        reader: Arc::new(Mutex::new(
            crate::reader::file_reader::FileReader::new(&log_a).unwrap(),
        )),
        index_reader: IndexReader::open(&log_a),
        source_path: Some(log_a.clone()),
        total_lines: 2,
        renderer_names: Vec::new(),
    };
    let bare_source = SourceEntry {
        name: "b".into(),
        reader: Arc::new(Mutex::new(
            crate::reader::file_reader::FileReader::new(&log_b).unwrap(),
        )),
        index_reader: None, // no index yet
        source_path: Some(log_b.clone()),
        total_lines: 2,
        renderer_names: Vec::new(),
    };

    let mut reader = CombinedReader::new(vec![indexed_source, bare_source]);

    // Before b's index exists: b lines have ts=0 and sort ahead of a (200, 400).
    let before = ["b1", "b2", "a1", "a2"];
    for (position, text) in before.iter().copied().enumerate() {
        assert_eq!(reader.get_line(position).unwrap(), Some(text.to_string()));
    }

    // Now create the index for b.log with timestamps [100, 300].
    write_index(index_dir_for_log(&log_b), [100, 300]);

    // Reload should discover the new index and rebuild with correct ordering.
    reader.reload().unwrap();

    // After b's index appears: b1(100), a1(200), b2(300), a2(400).
    let after = ["b1", "a1", "b2", "a2"];
    for (position, text) in after.iter().copied().enumerate() {
        assert_eq!(reader.get_line(position).unwrap(), Some(text.to_string()));
    }
}
381575
}

0 commit comments

Comments
 (0)