Skip to content

Commit bd4bf84

Browse files
committed
ADD: Add support for hinting merge ordering
1 parent 2fc3667 commit bd4bf84

File tree

2 files changed

+135
-15
lines changed

2 files changed

+135
-15
lines changed

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
# Changelog
22

3+
## 0.43.1 - TBD
4+
5+
### Enhancements
6+
- Added `MergeRecordDecoder::with_hints` that allows hinting the minimum timestamp from
7+
each decoder
8+
39
## 0.43.0 - 2025-10-22
410

511
### Enhancements

rust/dbn/src/decode/merge.rs

Lines changed: 129 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ where
2424
/// inner decoders returns an error while decoding the first record. A decoder
2525
/// returning `Ok(None)` does not result in a failure.
2626
pub fn new(decoders: Vec<D>) -> crate::Result<Self> {
27+
let hints = decoders.iter().map(|d| d.metadata().start).collect();
2728
let Some((first, rest)) = decoders.split_first() else {
2829
return Err(Error::BadArgument {
2930
param_name: "decoders".to_owned(),
@@ -36,7 +37,7 @@ where
3637
.merge(rest.iter().map(|d| d.metadata().clone()))?;
3738
Ok(Self {
3839
metadata,
39-
decoder: RecordDecoder::new(decoders)?,
40+
decoder: RecordDecoder::with_hints(decoders, hints)?,
4041
})
4142
}
4243
}
@@ -102,10 +103,37 @@ pub struct RecordDecoder<D> {
102103

103104
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
104105
struct StreamHead {
105-
raw_index_ts: u64,
106+
index_ts: IndexTs,
106107
decoder_idx: usize,
107108
}
108109

110+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
111+
enum IndexTs {
112+
Real(u64),
113+
Hint(u64),
114+
}
115+
116+
impl IndexTs {
117+
fn ts(&self) -> u64 {
118+
match self {
119+
IndexTs::Real(t) => *t,
120+
IndexTs::Hint(t) => *t,
121+
}
122+
}
123+
}
124+
125+
impl PartialOrd for IndexTs {
126+
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
127+
Some(self.cmp(other))
128+
}
129+
}
130+
131+
impl Ord for IndexTs {
132+
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
133+
self.ts().cmp(&other.ts())
134+
}
135+
}
136+
109137
impl<D> RecordDecoder<D>
110138
where
111139
D: DecodeRecordRef,
@@ -123,12 +151,12 @@ where
123151
desc: "none provided".to_owned(),
124152
});
125153
};
126-
let mut min_heap = BinaryHeap::new();
154+
let mut min_heap = BinaryHeap::with_capacity(decoders.len());
127155
// Populate heap for first time or all streams fully processed
128156
for (decoder_idx, decoder) in decoders.iter_mut().enumerate() {
129157
if let Some(rec) = decoder.decode_record_ref()? {
130158
min_heap.push(Reverse(StreamHead {
131-
raw_index_ts: rec.raw_index_ts(),
159+
index_ts: IndexTs::Real(rec.raw_index_ts()),
132160
decoder_idx,
133161
}));
134162
};
@@ -139,16 +167,89 @@ where
139167
is_first: true,
140168
})
141169
}
170+
171+
/// Creates a new record-stream merging decoder with a hint for the start time
172+
/// for each decoder. This can assist the merger to avoid reading from a decoder
173+
/// before necessary.
174+
///
175+
/// The hint timestamp must be <= raw_index_ts() of the first record in the file.
176+
/// [`Metadata::start`] is an example source of for hint.
177+
///
178+
/// # Errors
179+
/// This function returns an error if `decoders` is empty or `decoders` and
180+
/// `start_ts_hints` are of different lengths. It will also return an error if
181+
/// one of the inner decoders returns an error while decoding the first record. A
182+
/// decoder returning `Ok(None)` does not result in a failure.
183+
pub fn with_hints(decoders: Vec<D>, start_ts_hints: Vec<u64>) -> crate::Result<Self> {
184+
if decoders.is_empty() {
185+
return Err(Error::BadArgument {
186+
param_name: "decoders".to_owned(),
187+
desc: "none provided".to_owned(),
188+
});
189+
};
190+
if decoders.len() != start_ts_hints.len() {
191+
return Err(Error::BadArgument {
192+
param_name: "hints".to_owned(),
193+
desc: "must have the same length as `decoders`".to_owned(),
194+
});
195+
}
196+
let min_heap = start_ts_hints
197+
.into_iter()
198+
.enumerate()
199+
.map(|(decoder_idx, hint)| {
200+
Reverse(StreamHead {
201+
index_ts: IndexTs::Hint(hint),
202+
decoder_idx,
203+
})
204+
})
205+
.collect();
206+
Ok(Self {
207+
decoders,
208+
min_heap,
209+
is_first: true,
210+
})
211+
}
142212
}
143213

144214
impl<D> RecordDecoder<D> {
215+
// does not handle hints. Should only be called after `decode_record_ref`
145216
fn peek_decoder_idx(&self) -> Option<usize> {
146217
self.min_heap
147218
.peek()
148219
.map(|Reverse(StreamHead { decoder_idx, .. })| *decoder_idx)
149220
}
150221
}
151222

223+
impl<D> RecordDecoder<D>
224+
where
225+
D: DecodeRecordRef,
226+
{
227+
// handles hints
228+
fn next_decoder_idx(&mut self) -> crate::Result<Option<usize>> {
229+
loop {
230+
let Some(Reverse(StreamHead {
231+
index_ts,
232+
decoder_idx,
233+
})) = self.min_heap.peek().cloned()
234+
else {
235+
return Ok(None);
236+
};
237+
match index_ts {
238+
IndexTs::Real(_) => return Ok(Some(decoder_idx)),
239+
IndexTs::Hint(_) => {
240+
self.min_heap.pop();
241+
if let Some(rec) = self.decoders[decoder_idx].decode_record_ref()? {
242+
self.min_heap.push(Reverse(StreamHead {
243+
index_ts: IndexTs::Real(rec.raw_index_ts()),
244+
decoder_idx,
245+
}));
246+
}
247+
}
248+
}
249+
}
250+
}
251+
}
252+
152253
impl<D> DecodeRecordRef for RecordDecoder<D>
153254
where
154255
D: private::LastRecord + DecodeRecordRef,
@@ -159,20 +260,20 @@ where
159260
} else {
160261
// Pop last record
161262
let Some(Reverse(StreamHead {
162-
raw_index_ts: _,
263+
index_ts: _,
163264
decoder_idx,
164265
})) = self.min_heap.pop()
165266
else {
166267
return Ok(None);
167268
};
168269
if let Some(rec) = self.decoders[decoder_idx].decode_record_ref()? {
169270
self.min_heap.push(Reverse(StreamHead {
170-
raw_index_ts: rec.raw_index_ts(),
271+
index_ts: IndexTs::Real(rec.raw_index_ts()),
171272
decoder_idx,
172273
}));
173274
}
174275
}
175-
let Some(decoder_idx) = self.peek_decoder_idx() else {
276+
let Some(decoder_idx) = self.next_decoder_idx()? else {
176277
return Ok(None);
177278
};
178279
Ok(self.decoders[decoder_idx].last_record())
@@ -221,6 +322,7 @@ where
221322
#[cfg(test)]
222323
mod tests {
223324
use fallible_streaming_iterator::FallibleStreamingIterator;
325+
use rstest::*;
224326

225327
use crate::{rtype, test_utils::VecStream, Mbp1Msg, Record, RecordHeader};
226328

@@ -234,9 +336,9 @@ mod tests {
234336
}
235337
}
236338

237-
#[test]
238-
fn stream_merging() {
239-
let target = RecordDecoder::new(vec![
339+
#[rstest]
340+
fn stream_merging(#[values(None, Some(vec![5, 1, 50]))] hints: Option<Vec<u64>>) {
341+
let decoders = vec![
240342
VecStream::new(vec![new_mbp1(10), new_mbp1(100), new_mbp1(1000)]),
241343
VecStream::new(vec![
242344
new_mbp1(11),
@@ -256,7 +358,12 @@ mod tests {
256358
new_mbp1(500),
257359
new_mbp1(5000),
258360
]),
259-
])
361+
];
362+
let target = if let Some(hints) = hints {
363+
RecordDecoder::with_hints(decoders, hints)
364+
} else {
365+
RecordDecoder::new(decoders)
366+
}
260367
.unwrap()
261368
.decode_stream::<Mbp1Msg>();
262369
let mut timestamps = Vec::new();
@@ -273,9 +380,11 @@ mod tests {
273380
assert!(iter.next().unwrap().is_none());
274381
}
275382

276-
#[test]
277-
fn stream_merging_w_empty() {
278-
let target = RecordDecoder::new(vec![
383+
#[rstest]
384+
fn stream_merging_w_empty(
385+
#[values(None, Some(vec![0, 10, 11, 1, 1, 50]))] hints: Option<Vec<u64>>,
386+
) {
387+
let decoders = vec![
279388
VecStream::new(Vec::new()),
280389
VecStream::new(vec![new_mbp1(10), new_mbp1(100)]),
281390
VecStream::new(Vec::new()),
@@ -288,7 +397,12 @@ mod tests {
288397
]),
289398
VecStream::new(vec![new_mbp1(1), new_mbp1(50)]),
290399
VecStream::new(Vec::new()),
291-
])
400+
];
401+
let target = if let Some(hints) = hints {
402+
RecordDecoder::with_hints(decoders, hints)
403+
} else {
404+
RecordDecoder::new(decoders)
405+
}
292406
.unwrap()
293407
.decode_stream::<Mbp1Msg>();
294408
let mut timestamps = Vec::new();

0 commit comments

Comments
 (0)