
Commit 46da6b2

Refactoring: rely on Range/RangeInclusive to harden the API. (#68)
1 parent 16ff7c8 commit 46da6b2

12 files changed: +169 -132 lines
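
Note on the API change: this refactoring replaces bare `u64` truncation positions with `std::ops::RangeToInclusive<u64>` across the public API, so the inclusive upper bound is encoded in the signature instead of the doc comment. A minimal sketch of the call-site change, assuming a log opened with `MultiRecordLog::open` and an existing queue named "droopy" (neither appears in this diff):

    use std::path::Path;

    use mrecordlog::MultiRecordLog;

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        let mut record_log = MultiRecordLog::open(Path::new("./wal"))?;
        // 0.4.0 took a bare position: record_log.truncate("droopy", 3)?;
        // 0.5.0 takes a RangeToInclusive, making it explicit that records
        // at positions 0..=3 (3 included) are removed.
        let removed_count = record_log.truncate("droopy", ..=3)?;
        println!("removed {removed_count} records");
        Ok(())
    }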

Cargo.toml

Lines changed: 1 addition & 1 deletion

@@ -1,7 +1,7 @@
 workspace = { members = ["mrecordlog_cli"] }
 [package]
 name = "mrecordlog"
-version = "0.4.0"
+version = "0.5.0"
 edition = "2021"
 license = "MIT"
 description = "Quickwit's shared record log."

src/block_read_write.rs

Lines changed: 1 addition & 1 deletion

@@ -48,7 +48,7 @@ impl<'a> From<&'a [u8]> for ArrayReader<'a> {
     }
 }

-impl<'a> BlockRead for ArrayReader<'a> {
+impl BlockRead for ArrayReader<'_> {
     fn next_block(&mut self) -> io::Result<bool> {
         if self.data.len() < BLOCK_NUM_BYTES {
             return Ok(false);
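
The `ArrayReader` change is pure lifetime elision: when an impl never refers to its lifetime parameter by name, Rust allows the anonymous lifetime `'_` in its place (this is what clippy's `needless_lifetimes` lint suggests). A standalone sketch with stand-in definitions, not the crate's actual `BlockRead` trait:

    use std::io;

    struct ArrayReader<'a> {
        data: &'a [u8],
    }

    trait BlockRead {
        fn next_block(&mut self) -> io::Result<bool>;
    }

    // Equivalent to `impl<'a> BlockRead for ArrayReader<'a>`: the body never
    // names the lifetime, so `'_` avoids declaring an unused parameter.
    impl BlockRead for ArrayReader<'_> {
        fn next_block(&mut self) -> io::Result<bool> {
            Ok(!self.data.is_empty())
        }
    }

    fn main() {
        let mut reader = ArrayReader { data: b"block" };
        assert!(reader.next_block().unwrap());
    }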

src/mem/mod.rs

Lines changed: 1 addition & 0 deletions

@@ -1,5 +1,6 @@
 mod queue;
 mod queues;
+mod rolling_buffer;
 mod summary;

 pub(crate) use self::queue::MemQueue;

src/mem/queue.rs

Lines changed: 6 additions & 87 deletions

@@ -1,94 +1,11 @@
-use std::borrow::Cow;
-use std::collections::VecDeque;
-use std::ops::{Bound, RangeBounds};
+use std::ops::{Bound, RangeBounds, RangeToInclusive};

+use super::rolling_buffer::RollingBuffer;
 use crate::error::AppendError;
 use crate::mem::QueueSummary;
 use crate::rolling::FileNumber;
 use crate::Record;

-#[derive(Default)]
-struct RollingBuffer {
-    buffer: VecDeque<u8>,
-}
-
-impl RollingBuffer {
-    fn new() -> Self {
-        RollingBuffer {
-            buffer: VecDeque::new(),
-        }
-    }
-
-    fn len(&self) -> usize {
-        self.buffer.len()
-    }
-
-    fn capacity(&self) -> usize {
-        self.buffer.capacity()
-    }
-
-    fn clear(&mut self) {
-        self.buffer.clear();
-        self.buffer.shrink_to_fit();
-    }
-
-    fn drain_start(&mut self, pos: usize) {
-        let target_capacity = self.len() * 9 / 8;
-        self.buffer.drain(..pos);
-        // In order to avoid leaking memory we shrink the buffer.
-        // The last maximum length (= the length before drain)
-        // is a good estimate of what we will need in the future.
-        //
-        // We add 1/8 to that in order to make sure that we don't end up
-        // shrinking / allocating for small variations.
-
-        if self.buffer.capacity() > target_capacity {
-            self.buffer.shrink_to(target_capacity);
-        }
-    }
-
-    fn extend(&mut self, slice: &[u8]) {
-        self.buffer.extend(slice.iter().copied());
-    }
-
-    fn get_range(&self, bounds: impl RangeBounds<usize>) -> Cow<[u8]> {
-        let start = match bounds.start_bound() {
-            Bound::Included(pos) => *pos,
-            Bound::Excluded(pos) => pos + 1,
-            Bound::Unbounded => 0,
-        };
-
-        let end = match bounds.end_bound() {
-            Bound::Included(pos) => pos + 1,
-            Bound::Excluded(pos) => *pos,
-            Bound::Unbounded => self.len(),
-        };
-
-        let (left_part_of_queue, right_part_of_queue) = self.buffer.as_slices();
-
-        if end < left_part_of_queue.len() {
-            Cow::Borrowed(&left_part_of_queue[start..end])
-        } else if start >= left_part_of_queue.len() {
-            let start = start - left_part_of_queue.len();
-            let end = end - left_part_of_queue.len();
-
-            Cow::Borrowed(&right_part_of_queue[start..end])
-        } else {
-            // VecDeque is a rolling buffer. As a result, we do not have
-            // access to a continuous buffer.
-            //
-            // Here the requested slice cross the boundary and we need to allocate and copy the data
-            // in a new buffer.
-            let mut res = Vec::with_capacity(end - start);
-            res.extend_from_slice(&left_part_of_queue[start..]);
-            let end = end - left_part_of_queue.len();
-            res.extend_from_slice(&right_part_of_queue[..end]);
-
-            Cow::Owned(res)
-        }
-    }
-}
-
 #[derive(Clone)]
 struct RecordMeta {
     start_offset: usize,
@@ -247,7 +164,8 @@ impl MemQueue {
     ///
     /// If truncating to a future position, make the queue go forward to that position.
     /// Return the number of record removed.
-    pub fn truncate(&mut self, truncate_up_to_pos: u64) -> usize {
+    pub fn truncate_head(&mut self, truncate_range: RangeToInclusive<u64>) -> usize {
+        let truncate_up_to_pos = truncate_range.end;
         if self.start_position > truncate_up_to_pos {
             return 0;
         }
@@ -267,7 +185,8 @@ impl MemQueue {
         for record_meta in &mut self.record_metas {
             record_meta.start_offset -= start_offset_to_keep;
         }
-        self.concatenated_records.drain_start(start_offset_to_keep);
+        self.concatenated_records
+            .truncate_head(..start_offset_to_keep);
         self.start_position = truncate_up_to_pos + 1;
         first_record_to_keep
     }
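
`RangeToInclusive<u64>` is a plain struct with a single public `end` field, so `truncate_head` recovers the old integer bound with `truncate_range.end` on its first line. A small sketch of that round trip (the free function is a stand-in for `MemQueue::truncate_head`, which is private to the crate):

    use std::ops::RangeToInclusive;

    // Stand-in mirroring MemQueue::truncate_head's bound handling: callers
    // write `..=pos`; the implementation reads the inclusive bound back out
    // and keeps everything strictly after it.
    fn first_position_to_keep(truncate_range: RangeToInclusive<u64>) -> u64 {
        truncate_range.end + 1
    }

    fn main() {
        // Truncating `..=3` drops positions 0..=3 and keeps 4 onward.
        assert_eq!(first_position_to_keep(..=3), 4);
    }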

src/mem/queues.rs

Lines changed: 3 additions & 3 deletions

@@ -1,5 +1,5 @@
 use std::collections::HashMap;
-use std::ops::RangeBounds;
+use std::ops::{RangeBounds, RangeToInclusive};

 use tracing::{info, warn};

@@ -154,9 +154,9 @@ impl MemQueues {
     ///
     /// If there are no records `<= position`, the method will
     /// not do anything.
-    pub fn truncate(&mut self, queue: &str, position: u64) -> Option<usize> {
+    pub fn truncate(&mut self, queue: &str, position: RangeToInclusive<u64>) -> Option<usize> {
         if let Ok(queue) = self.get_queue_mut(queue) {
-            Some(queue.truncate(position))
+            Some(queue.truncate_head(position))
         } else {
             None
         }

src/mem/rolling_buffer.rs

Lines changed: 89 additions & 0 deletions

@@ -0,0 +1,89 @@
+use std::borrow::Cow;
+use std::collections::VecDeque;
+use std::ops::{Bound, RangeBounds, RangeTo};
+
+#[derive(Default)]
+pub struct RollingBuffer {
+    buffer: VecDeque<u8>,
+}
+
+impl RollingBuffer {
+    pub fn new() -> Self {
+        RollingBuffer {
+            buffer: VecDeque::new(),
+        }
+    }
+
+    pub fn len(&self) -> usize {
+        self.buffer.len()
+    }
+
+    pub fn capacity(&self) -> usize {
+        self.buffer.capacity()
+    }
+
+    pub fn clear(&mut self) {
+        self.buffer.clear();
+        self.buffer.shrink_to_fit();
+    }
+
+    // Removes all of the data up to pos byte excluded (meaning pos is kept).
+    //
+    // If we notice that the rolling buffer was very large, this function may shrink
+    // it.
+    pub fn truncate_head(&mut self, first_pos_to_keep: RangeTo<usize>) {
+        let target_capacity = self.len() * 9 / 8;
+        self.buffer.drain(first_pos_to_keep);
+        // In order to avoid leaking memory we shrink the buffer.
+        // The last maximum length (= the length before drain)
+        // is a good estimate of what we will need in the future.
+        //
+        // We add 1/8 to that in order to make sure that we don't end up
+        // shrinking / allocating for small variations.
+
+        if self.buffer.capacity() > target_capacity {
+            self.buffer.shrink_to(target_capacity);
+        }
+    }
+
+    pub fn extend(&mut self, slice: &[u8]) {
+        self.buffer.extend(slice.iter().copied());
+    }
+
+    pub fn get_range(&self, bounds: impl RangeBounds<usize>) -> Cow<[u8]> {
+        let start = match bounds.start_bound() {
+            Bound::Included(pos) => *pos,
+            Bound::Excluded(pos) => pos + 1,
+            Bound::Unbounded => 0,
+        };
+
+        let end = match bounds.end_bound() {
+            Bound::Included(pos) => pos + 1,
+            Bound::Excluded(pos) => *pos,
+            Bound::Unbounded => self.len(),
+        };
+
+        let (left_part_of_queue, right_part_of_queue) = self.buffer.as_slices();
+
+        if end < left_part_of_queue.len() {
+            Cow::Borrowed(&left_part_of_queue[start..end])
+        } else if start >= left_part_of_queue.len() {
+            let start = start - left_part_of_queue.len();
+            let end = end - left_part_of_queue.len();
+
+            Cow::Borrowed(&right_part_of_queue[start..end])
+        } else {
+            // VecDeque is a rolling buffer. As a result, we do not have
+            // access to a continuous buffer.
+            //
+            // Here the requested slice cross the boundary and we need to allocate and copy the data
+            // in a new buffer.
+            let mut res = Vec::with_capacity(end - start);
+            res.extend_from_slice(&left_part_of_queue[start..]);
+            let end = end - left_part_of_queue.len();
+            res.extend_from_slice(&right_part_of_queue[..end]);
+
+            Cow::Owned(res)
+        }
+    }
+}
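
`get_range` returns `Cow<[u8]>` because a `VecDeque` keeps its contents in at most two contiguous segments: a borrow is enough when the requested range lies entirely inside one segment, and only a range that straddles the wrap-around point forces a copy (`Cow::Owned`). A quick demonstration of the underlying `VecDeque::as_slices` behavior:

    use std::collections::VecDeque;

    fn main() {
        let mut buffer: VecDeque<u8> = (0u8..8).collect();
        // Pop from the front and push to the back so the ring buffer wraps.
        for byte in 8u8..12 {
            buffer.pop_front();
            buffer.push_back(byte);
        }
        // The logical contents 4..=11 may now be split across two physical
        // slices; a requested range crossing that split cannot be borrowed
        // as a single `&[u8]` and must be copied into a fresh Vec.
        let (left, right) = buffer.as_slices();
        println!("left = {left:?}, right = {right:?}");
        assert_eq!(left.len() + right.len(), buffer.len());
    }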

src/mem/tests.rs

Lines changed: 4 additions & 4 deletions

@@ -85,7 +85,7 @@ fn test_mem_queues_truncate() {
             .append_record("droopy", &1.into(), 5, b"payer")
             .unwrap();
     }
-    mem_queues.truncate("droopy", 3);
+    mem_queues.truncate("droopy", ..=3);
     let droopy: Vec<Record> = mem_queues.range("droopy", 0..).unwrap().collect();
     assert_eq!(
         &droopy[..],
@@ -210,7 +210,7 @@ fn test_mem_queues_keep_filenum() {
     assert!(!files[0].can_be_deleted());
     assert!(!files[1].can_be_deleted());

-    mem_queues.truncate("droopy", 1);
+    mem_queues.truncate("droopy", ..=1);

     assert!(!files[0].can_be_deleted());
     assert!(!files[1].can_be_deleted());
@@ -223,13 +223,13 @@ fn test_mem_queues_keep_filenum() {
     assert!(!files[1].can_be_deleted());
     assert!(!files[2].can_be_deleted());

-    mem_queues.truncate("droopy", 3);
+    mem_queues.truncate("droopy", ..=3);

     assert!(files[0].can_be_deleted());
     assert!(files[1].can_be_deleted());
     assert!(!files[2].can_be_deleted());

-    mem_queues.truncate("droopy", 4);
+    mem_queues.truncate("droopy", ..=4);

     let empty_queues = mem_queues.empty_queues().collect::<Vec<_>>();
     assert_eq!(empty_queues.len(), 1);

src/multi_record_log.rs

Lines changed: 23 additions & 10 deletions

@@ -1,5 +1,5 @@
 use std::io;
-use std::ops::RangeBounds;
+use std::ops::{RangeBounds, RangeToInclusive};
 use std::path::Path;

 use bytes::Buf;
@@ -72,8 +72,11 @@ impl MultiRecordLog {
                     .map_err(|_| ReadRecordError::Corruption)?;
                 }
             }
-            MultiPlexedRecord::Truncate { position, queue } => {
-                in_mem_queues.truncate(queue, position);
+            MultiPlexedRecord::Truncate {
+                truncate_range,
+                queue,
+            } => {
+                in_mem_queues.truncate(queue, truncate_range);
             }
             MultiPlexedRecord::RecordPosition { queue, position } => {
                 in_mem_queues.ack_position(queue, position);
@@ -227,20 +230,30 @@ impl MultiRecordLog {
         Ok(())
     }

-    /// Truncates the queue up to `position`, included. This method immediately truncates the
-    /// underlying in-memory queue whereas the backing log files are deleted asynchronously when
-    /// they become exclusively composed of deleted records.
+    /// Truncates the queue up to a given `position`, included. This method immediately
+    /// truncates the underlying in-memory queue whereas the backing log files are deleted
+    /// asynchronously when they become exclusively composed of deleted records.
     ///
     /// This method will always truncate the record log and release the associated memory.
     /// It returns the number of records deleted.
-    pub fn truncate(&mut self, queue: &str, position: u64) -> Result<usize, TruncateError> {
-        info!(position = position, queue = queue, "truncate queue");
+    pub fn truncate(
+        &mut self,
+        queue: &str,
+        truncate_range: RangeToInclusive<u64>,
+    ) -> Result<usize, TruncateError> {
+        info!(range=?truncate_range, queue = queue, "truncate queue");
         if !self.queue_exists(queue) {
             return Err(TruncateError::MissingQueue(queue.to_string()));
         }
         self.record_log_writer
-            .write_record(MultiPlexedRecord::Truncate { position, queue })?;
-        let removed_count = self.in_mem_queues.truncate(queue, position).unwrap_or(0);
+            .write_record(MultiPlexedRecord::Truncate {
+                truncate_range,
+                queue,
+            })?;
+        let removed_count = self
+            .in_mem_queues
+            .truncate(queue, truncate_range)
+            .unwrap_or(0);
         self.run_gc_if_necessary()?;
         self.persist_on_policy()?;
         Ok(removed_count)
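
In the updated `truncate`, the log statement switches to `info!(range = ?truncate_range, ...)`: the `?` sigil is tracing's way of capturing a field through its `Debug` impl, which `RangeToInclusive` has (it implements `Debug` but not `Display`). A minimal sketch, assuming `tracing-subscriber`'s default `fmt` collector is available:

    use std::ops::RangeToInclusive;

    use tracing::info;

    fn main() {
        tracing_subscriber::fmt::init();
        let truncate_range: RangeToInclusive<u64> = ..=3;
        // `?` records the field via Debug, so the event prints `range=..=3`.
        info!(range = ?truncate_range, queue = "droopy", "truncate queue");
    }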

src/persist_policy.rs

Lines changed: 6 additions & 5 deletions

@@ -18,15 +18,16 @@ impl PersistAction {
 /// We have two type of operations on the mrecordlog.
 ///
 /// Critical records are relatively rare and really need to be persisted:
-/// - RecordPosition { queue: &'a str, position: u64 },
-/// - DeleteQueue.
+///  - RecordPosition { queue: &'a str, position: u64 },
+///  - DeleteQueue.
 ///
 /// For these operations, we want to always flush and fsync.
 ///
 /// On the other hand,
-/// - Truncate
-/// - AppendRecords
-/// are considered are more frequent and one might want to sacrifice
+///  - Truncate
+///  - AppendRecords
+///
+/// are both considered are more frequent and one might want to sacrifice
 /// persistence guarantees for performance.
 ///
 /// The `PersistPolicy` defines the trade-off applied for the second kind of
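
The doc comment above describes a two-tier durability model: rare, critical records always get flush + fsync, while hot-path records may trade durability for throughput. A hypothetical sketch of that decision, with illustrative names only (not mrecordlog's actual `PersistPolicy` API):

    /// Hypothetical record classes mirroring the doc comment's split.
    #[derive(Clone, Copy)]
    enum OpKind {
        /// RecordPosition, DeleteQueue: rare, must always be fsynced.
        Critical,
        /// Truncate, AppendRecords: frequent; fsync is policy-dependent.
        Frequent,
    }

    /// Whether an operation must be flushed and fsynced before returning.
    fn must_fsync(op: OpKind, fsync_frequent_ops: bool) -> bool {
        match op {
            OpKind::Critical => true,
            OpKind::Frequent => fsync_frequent_ops,
        }
    }

    fn main() {
        // Critical records always hit disk; frequent ones follow the policy.
        assert!(must_fsync(OpKind::Critical, false));
        assert!(!must_fsync(OpKind::Frequent, false));
        assert!(must_fsync(OpKind::Frequent, true));
    }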
