Fix incorrect flush after truncate, other improvements #99
Zero the full slice of removed data, not just the first 16 bytes
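A minimal sketch of what this commit title describes, assuming the segment is backed by a plain byte buffer. The helper name `zero_removed` and its byte-offset parameters are hypothetical and not taken from the crate; the point is only that the whole removed range is cleared rather than a fixed-size prefix of it.

```rust
/// Zero every byte in `buf[start..end]`, i.e. the whole removed range,
/// rather than only a fixed-size prefix of it.
///
/// Hypothetical helper for illustration; `start` and `end` are byte offsets.
fn zero_removed(buf: &mut [u8], start: usize, end: usize) {
    // Clamp both bounds so an out-of-range request cannot panic.
    let end = end.min(buf.len());
    let start = start.min(end);
    buf[start..end].fill(0);
}
```

Calling `zero_removed(&mut buf, 8, 48)` on a 64-byte buffer clears bytes 8 through 47 and leaves the rest untouched.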
    thread::spawn(move || {
        trace!("{log_msg}");
        error!("{log_msg}");
Maybe warn? An error should mean something unexpected happened, but as I understand it, this path is possible.
Oh wait, is it expected that with the new changes this path is an actual error?
With the new changes it should be impossible to hit this branch. That's why I promoted it to an error.
Still want to demote it to a warning?
    ///
    /// The entries are not guaranteed to be removed until the segment is
    /// flushed.
    pub fn truncate(&mut self, from: usize) {
Can you please write a basic unit test for this function?
It seems to be completely uncovered.
There is a function that covers it:
Lines 1104 to 1211 in 95c4310
    #[test]
    fn test_truncate_flush() {
        init_logger();
        let dir = Builder::new().prefix("wal").tempdir().unwrap();
        // 2 entries should fit in each segment
        let mut wal = Wal::with_options(
            dir.path(),
            &WalOptions {
                segment_capacity: 4096,
                segment_queue_len: 3,
                retain_closed: NonZeroUsize::new(1).unwrap(),
            },
        )
        .unwrap();
        let entry: [u8; 2000] = [42u8; 2000];
        // wal is empty
        assert!(wal.entry(0).is_none());
        // add 10 entries
        for i in 0..10 {
            assert_eq!(i, wal.append(&&entry[..]).unwrap());
        }
        // 4 closed segments
        assert_eq!(wal.num_entries(), 10);
        assert_eq!(wal.first_index(), 0);
        assert_eq!(wal.last_index(), 9);
        assert_eq!(wal.closed_segments.len(), 4); // 4 x 2 entries
        assert_eq!(wal.closed_segments[0].segment.len(), 2);
        assert_eq!(wal.closed_segments[1].segment.len(), 2);
        assert_eq!(wal.closed_segments[2].segment.len(), 2);
        assert_eq!(wal.closed_segments[3].segment.len(), 2);
        assert_eq!(wal.open_segment.segment.len(), 2); // 1 x 2 entries
        // first flush to set `flush_offset
        wal.flush_open_segment().unwrap();
        // content unchanged after flushing
        assert_eq!(wal.num_entries(), 10);
        assert_eq!(wal.first_index(), 0);
        assert_eq!(wal.last_index(), 9);
        assert_eq!(wal.closed_segments.len(), 4); // 4 x 2 entries
        assert_eq!(wal.closed_segments[0].segment.len(), 2);
        assert_eq!(wal.closed_segments[1].segment.len(), 2);
        assert_eq!(wal.closed_segments[2].segment.len(), 2);
        assert_eq!(wal.closed_segments[3].segment.len(), 2);
        assert_eq!(wal.open_segment.segment.len(), 2); // 1 x 2 entries
        wal.truncate(9).unwrap();
        assert_eq!(wal.open_segment.segment.len(), 1); // 1 x 2 entries
        // truncate half of it
        wal.truncate(5).unwrap();
        // assert truncation
        for i in 5..10 {
            assert!(wal.entry(i).is_none());
        }
        // flush again with `flush_offset` > segment size
        wal.flush_open_segment().unwrap();
        assert_eq!(wal.num_entries(), 5); // 5 entries removed
        assert_eq!(wal.first_index(), 0);
        assert_eq!(wal.last_index(), 4);
        assert_eq!(wal.closed_segments.len(), 3); // (0, 1) + (2, 3) + (4, empty slot)
        assert_eq!(wal.closed_segments[0].segment.len(), 2);
        assert_eq!(wal.closed_segments[1].segment.len(), 2);
        assert_eq!(wal.closed_segments[2].segment.len(), 1);
        assert_eq!(wal.open_segment.segment.len(), 0); // empty open segment
        // add 5 more entries
        for i in 0..5 {
            assert_eq!(i + 5, wal.append(&&entry[..]).unwrap());
        }
        // 5 closed segments
        assert_eq!(wal.num_entries(), 10);
        assert_eq!(wal.first_index(), 0);
        assert_eq!(wal.last_index(), 9);
        assert_eq!(wal.closed_segments.len(), 5);
        assert_eq!(wal.closed_segments[0].segment.len(), 2); // 1,2
        assert_eq!(wal.closed_segments[1].segment.len(), 2); // 3
        assert_eq!(wal.closed_segments[2].segment.len(), 1); // 4 empty slot due to truncation
        assert_eq!(wal.closed_segments[3].segment.len(), 2); // 5, 6
        assert_eq!(wal.closed_segments[4].segment.len(), 2); // 7, 8
        assert_eq!(wal.open_segment.segment.len(), 1); // 9
        eprintln!("wal: {wal:?}");
        eprintln!("wal open: {:?}", wal.open_segment);
        eprintln!("wal closed: {:?}", wal.closed_segments);
        // test persistence
        drop(wal);
        let wal = Wal::open(dir.path()).unwrap();
        assert_eq!(wal.num_entries(), 10);
        assert_eq!(wal.first_index(), 0);
        assert_eq!(wal.last_index(), 9);
        assert_eq!(wal.closed_segments.len(), 5);
        assert_eq!(wal.closed_segments[0].segment.len(), 2);
        assert_eq!(wal.closed_segments[1].segment.len(), 2);
        assert_eq!(wal.closed_segments[2].segment.len(), 1); // previously half truncated
        assert_eq!(wal.closed_segments[3].segment.len(), 2);
        assert_eq!(wal.closed_segments[4].segment.len(), 2);
        assert_eq!(wal.open_segment.segment.len(), 1);
    }
But it doesn't hurt to add a bit more testing.
I cannot fully assert the actual flush behavior to disk though, since the kernel takes care of this in a non-deterministic way.
Added test in 68d63ca
We must only move the flush offset back, never forward. Data starting at the current flush offset may still not be flushed.
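The rule above can be sketched as follows. This is a simplified illustration under stated assumptions, not the crate's actual implementation: the `Segment` struct, its `buf`, `len` and `flush_offset` fields, and the byte-offset `truncate` signature are all hypothetical, and `flush` only pretends to write to disk.

```rust
/// Hypothetical, simplified segment used only for illustration.
#[derive(Debug)]
struct Segment {
    /// Fixed-capacity backing buffer (stands in for a memory-mapped file).
    buf: Vec<u8>,
    /// Number of bytes currently holding live entries.
    len: usize,
    /// Bytes in `buf[..flush_offset]` are assumed to be flushed already.
    flush_offset: usize,
}

impl Segment {
    fn new(capacity: usize) -> Self {
        Segment { buf: vec![0; capacity], len: 0, flush_offset: 0 }
    }

    /// Append bytes; returns `false` if they do not fit.
    fn append(&mut self, bytes: &[u8]) -> bool {
        if bytes.len() > self.buf.len() - self.len {
            return false;
        }
        self.buf[self.len..self.len + bytes.len()].copy_from_slice(bytes);
        self.len += bytes.len();
        true
    }

    /// Pretend to flush: everything written so far now counts as flushed.
    fn flush(&mut self) {
        self.flush_offset = self.len;
    }

    /// Drop all bytes from `new_len` onward and zero the removed range.
    fn truncate(&mut self, new_len: usize) {
        if new_len >= self.len {
            return;
        }
        self.buf[new_len..self.len].fill(0);
        self.len = new_len;
        // Only ever move the flush offset back. If it is already below
        // `new_len`, the bytes between it and `new_len` are still unflushed,
        // so moving it forward would skip them on the next flush.
        self.flush_offset = self.flush_offset.min(new_len);
    }
}
```

In this sketch, truncating to a point past the flush offset leaves the offset where it is, while truncating to a point before it pulls the offset back so the zeroed range is covered by the next flush.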
Fix incorrect flush after truncate, because flush_offset was not bumped when truncating. This also improves or simplifies other things.
I'd recommend reviewing this PR per commit.
These changes have been tested as described in qdrant/qdrant#7577.
Before this PR I repeatedly saw the following log line:
which indicates flushing errors. This PR resolves the issue and I haven't seen that log line since.