Skip to content

Commit f9727af

Browse files
authored
In-memory Engine: implement read flow part 1 (tikv#16163)
ref tikv#16141 implement in-memory engine read flow -- part 1 Signed-off-by: SpadeA-Tang <[email protected]>
1 parent ae8902d commit f9727af

File tree

33 files changed

+597
-99
lines changed

33 files changed

+597
-99
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

components/backup-stream/src/utils.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -996,7 +996,7 @@ mod test {
996996

997997
let (items, size) = super::with_record_read_throughput(|| {
998998
let mut items = vec![];
999-
let snap = engine.snapshot();
999+
let snap = engine.snapshot(None);
10001000
snap.scan(CF_DEFAULT, b"", b"", false, |k, v| {
10011001
items.push((k.to_owned(), v.to_owned()));
10021002
Ok(true)

components/cdc/src/observer.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,8 @@ impl<E: KvEngine> CmdObserver<E> for CdcObserver {
121121
// Create a snapshot here for preventing the old value was GC-ed.
122122
// TODO: only need it after enabling old value, may add a flag to indicate
123123
// whether to get it.
124-
let snapshot = RegionSnapshot::from_snapshot(Arc::new(engine.snapshot()), Arc::new(region));
124+
let snapshot =
125+
RegionSnapshot::from_snapshot(Arc::new(engine.snapshot(None)), Arc::new(region));
125126
let get_old_value = move |key,
126127
query_ts,
127128
old_value_cache: &mut OldValueCache,

components/cdc/src/old_value.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -308,7 +308,7 @@ mod tests {
308308
value: Option<Value>,
309309
) -> Statistics {
310310
let key = key.clone().append_ts(ts.into());
311-
let snapshot = Arc::new(kv_engine.snapshot());
311+
let snapshot = Arc::new(kv_engine.snapshot(None));
312312
let mut cursor = new_write_cursor_on_key(&snapshot, &key);
313313
let load_default = Either::Left(&snapshot);
314314
let mut stats = Statistics::default();
@@ -527,7 +527,7 @@ mod tests {
527527
must_commit(&mut engine, &key, 200, 201);
528528
}
529529

530-
let snapshot = Arc::new(kv_engine.snapshot());
530+
let snapshot = Arc::new(kv_engine.snapshot(None));
531531
let mut cursor = new_old_value_cursor(&snapshot, CF_WRITE);
532532
let mut default_cursor = new_old_value_cursor(&snapshot, CF_DEFAULT);
533533
let mut load_default = |use_default_cursor: bool| {
@@ -598,7 +598,7 @@ mod tests {
598598
}
599599

600600
let key = format!("zkey-{:0>3}", 0).into_bytes();
601-
let snapshot = Arc::new(kv_engine.snapshot());
601+
let snapshot = Arc::new(kv_engine.snapshot(None));
602602
let perf_instant = ReadPerfInstant::new();
603603
let value = get_old_value(
604604
&snapshot,

components/engine_panic/src/engine.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
22

33
use engine_traits::{
4-
IterOptions, Iterable, Iterator, KvEngine, Peekable, ReadOptions, Result, SyncMutable,
4+
IterOptions, Iterable, Iterator, KvEngine, Peekable, ReadOptions, Result, SnapCtx, SyncMutable,
55
WriteOptions,
66
};
77

@@ -13,7 +13,7 @@ pub struct PanicEngine;
1313
impl KvEngine for PanicEngine {
1414
type Snapshot = PanicSnapshot;
1515

16-
fn snapshot(&self) -> Self::Snapshot {
16+
fn snapshot(&self, _: Option<SnapCtx>) -> Self::Snapshot {
1717
panic!()
1818
}
1919
fn sync(&self) -> Result<()> {

components/engine_rocks/src/engine.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@
22

33
use std::{any::Any, sync::Arc};
44

5-
use engine_traits::{IterOptions, Iterable, KvEngine, Peekable, ReadOptions, Result, SyncMutable};
5+
use engine_traits::{
6+
IterOptions, Iterable, KvEngine, Peekable, ReadOptions, Result, SnapCtx, SyncMutable,
7+
};
68
use rocksdb::{DBIterator, Writable, DB};
79

810
use crate::{
@@ -182,7 +184,7 @@ impl RocksEngine {
182184
impl KvEngine for RocksEngine {
183185
type Snapshot = RocksSnapshot;
184186

185-
fn snapshot(&self) -> RocksSnapshot {
187+
fn snapshot(&self, _: Option<SnapCtx>) -> RocksSnapshot {
186188
RocksSnapshot::new(self.db.clone())
187189
}
188190

@@ -292,7 +294,7 @@ mod tests {
292294
engine.put_msg(key, &r).unwrap();
293295
engine.put_msg_cf(cf, key, &r).unwrap();
294296

295-
let snap = engine.snapshot();
297+
let snap = engine.snapshot(None);
296298

297299
let mut r1: Region = engine.get_msg(key).unwrap().unwrap();
298300
assert_eq!(r, r1);

components/engine_traits/src/engine.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,10 @@ pub trait KvEngine:
3939
type Snapshot: Snapshot;
4040

4141
/// Create a snapshot
42-
fn snapshot(&self) -> Self::Snapshot;
42+
///
43+
/// SnapCtx will only be used by some type of trait implementors (ex:
44+
/// HybridEngine)
45+
fn snapshot(&self, snap_ctx: Option<SnapCtx>) -> Self::Snapshot;
4346

4447
/// Syncs any writes to disk
4548
fn sync(&self) -> Result<()>;
@@ -78,3 +81,8 @@ pub trait KvEngine:
7881
#[cfg(feature = "testexport")]
7982
fn inner_refcount(&self) -> usize;
8083
}
84+
85+
pub struct SnapCtx {
86+
pub region_id: u64,
87+
pub read_ts: u64,
88+
}

components/engine_traits/src/errors.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,8 @@ pub enum Error {
149149
EntriesUnavailable,
150150
#[error("The entries of region is compacted")]
151151
EntriesCompacted,
152+
#[error("Iterator of RegionCacheSnapshot is only supported with boundary set")]
153+
BoundaryNotSet,
152154
}
153155

154156
pub type Result<T> = result::Result<T, Error>;
@@ -165,6 +167,7 @@ impl ErrorCodeExt for Error {
165167
Error::Other(_) => error_code::UNKNOWN,
166168
Error::EntriesUnavailable => error_code::engine::DATALOSS,
167169
Error::EntriesCompacted => error_code::engine::DATACOMPACTED,
170+
Error::BoundaryNotSet => error_code::engine::BOUNDARY_NOT_SET,
168171
}
169172
}
170173
}

components/engine_traits_tests/src/iterator.rs

Lines changed: 30 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,9 @@ fn iter_empty_engine() {
4141
#[test]
4242
fn iter_empty_snapshot() {
4343
let db = default_engine();
44-
iter_empty(&db.engine, |e| e.snapshot().iterator(CF_DEFAULT).unwrap());
44+
iter_empty(&db.engine, |e| {
45+
e.snapshot(None).iterator(CF_DEFAULT).unwrap()
46+
});
4547
}
4648

4749
fn iter_forward<E, I, IF>(e: &E, i: IF)
@@ -99,7 +101,9 @@ fn iter_forward_engine() {
99101
#[test]
100102
fn iter_forward_snapshot() {
101103
let db = default_engine();
102-
iter_forward(&db.engine, |e| e.snapshot().iterator(CF_DEFAULT).unwrap());
104+
iter_forward(&db.engine, |e| {
105+
e.snapshot(None).iterator(CF_DEFAULT).unwrap()
106+
});
103107
}
104108

105109
fn iter_reverse<E, I, IF>(e: &E, i: IF)
@@ -157,7 +161,9 @@ fn iter_reverse_engine() {
157161
#[test]
158162
fn iter_reverse_snapshot() {
159163
let db = default_engine();
160-
iter_reverse(&db.engine, |e| e.snapshot().iterator(CF_DEFAULT).unwrap());
164+
iter_reverse(&db.engine, |e| {
165+
e.snapshot(None).iterator(CF_DEFAULT).unwrap()
166+
});
161167
}
162168

163169
fn seek_to_key_then_forward<E, I, IF>(e: &E, i: IF)
@@ -198,7 +204,9 @@ fn seek_to_key_then_forward_engine() {
198204
#[test]
199205
fn seek_to_key_then_forward_snapshot() {
200206
let db = default_engine();
201-
seek_to_key_then_forward(&db.engine, |e| e.snapshot().iterator(CF_DEFAULT).unwrap());
207+
seek_to_key_then_forward(&db.engine, |e| {
208+
e.snapshot(None).iterator(CF_DEFAULT).unwrap()
209+
});
202210
}
203211

204212
fn seek_to_key_then_reverse<E, I, IF>(e: &E, i: IF)
@@ -239,7 +247,9 @@ fn seek_to_key_then_reverse_engine() {
239247
#[test]
240248
fn seek_to_key_then_reverse_snapshot() {
241249
let db = default_engine();
242-
seek_to_key_then_reverse(&db.engine, |e| e.snapshot().iterator(CF_DEFAULT).unwrap());
250+
seek_to_key_then_reverse(&db.engine, |e| {
251+
e.snapshot(None).iterator(CF_DEFAULT).unwrap()
252+
});
243253
}
244254

245255
fn iter_forward_then_reverse<E, I, IF>(e: &E, i: IF)
@@ -300,7 +310,9 @@ fn iter_forward_then_reverse_engine() {
300310
#[test]
301311
fn iter_forward_then_reverse_snapshot() {
302312
let db = default_engine();
303-
iter_forward_then_reverse(&db.engine, |e| e.snapshot().iterator(CF_DEFAULT).unwrap());
313+
iter_forward_then_reverse(&db.engine, |e| {
314+
e.snapshot(None).iterator(CF_DEFAULT).unwrap()
315+
});
304316
}
305317

306318
fn iter_reverse_then_forward<E, I, IF>(e: &E, i: IF)
@@ -361,7 +373,9 @@ fn iter_reverse_then_forward_engine() {
361373
#[test]
362374
fn iter_reverse_then_forward_snapshot() {
363375
let db = default_engine();
364-
iter_reverse_then_forward(&db.engine, |e| e.snapshot().iterator(CF_DEFAULT).unwrap());
376+
iter_reverse_then_forward(&db.engine, |e| {
377+
e.snapshot(None).iterator(CF_DEFAULT).unwrap()
378+
});
365379
}
366380

367381
// When seek finds an exact key then seek_for_prev behaves just like seek
@@ -405,7 +419,9 @@ fn seek_for_prev_engine() {
405419
#[test]
406420
fn seek_for_prev_snapshot() {
407421
let db = default_engine();
408-
seek_for_prev(&db.engine, |e| e.snapshot().iterator(CF_DEFAULT).unwrap());
422+
seek_for_prev(&db.engine, |e| {
423+
e.snapshot(None).iterator(CF_DEFAULT).unwrap()
424+
});
409425
}
410426

411427
// When Seek::Key doesn't find an exact match,
@@ -440,7 +456,9 @@ fn seek_key_miss_engine() {
440456
#[test]
441457
fn seek_key_miss_snapshot() {
442458
let db = default_engine();
443-
seek_key_miss(&db.engine, |e| e.snapshot().iterator(CF_DEFAULT).unwrap());
459+
seek_key_miss(&db.engine, |e| {
460+
e.snapshot(None).iterator(CF_DEFAULT).unwrap()
461+
});
444462
}
445463

446464
fn seek_key_prev_miss<E, I, IF>(e: &E, i: IF)
@@ -472,5 +490,7 @@ fn seek_key_prev_miss_engine() {
472490
#[test]
473491
fn seek_key_prev_miss_snapshot() {
474492
let db = default_engine();
475-
seek_key_prev_miss(&db.engine, |e| e.snapshot().iterator(CF_DEFAULT).unwrap());
493+
seek_key_prev_miss(&db.engine, |e| {
494+
e.snapshot(None).iterator(CF_DEFAULT).unwrap()
495+
});
476496
}

components/engine_traits_tests/src/read_consistency.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ fn snapshot_with_writes() {
1212

1313
db.engine.put(b"a", b"aa").unwrap();
1414

15-
let snapshot = db.engine.snapshot();
15+
let snapshot = db.engine.snapshot(None);
1616

1717
assert_eq!(snapshot.get_value(b"a").unwrap().unwrap(), b"aa");
1818

@@ -77,5 +77,7 @@ fn iterator_with_writes_engine() {
7777
#[test]
7878
fn iterator_with_writes_snapshot() {
7979
let db = default_engine();
80-
iterator_with_writes(&db.engine, |e| e.snapshot().iterator(CF_DEFAULT).unwrap());
80+
iterator_with_writes(&db.engine, |e| {
81+
e.snapshot(None).iterator(CF_DEFAULT).unwrap()
82+
});
8183
}

0 commit comments

Comments
 (0)