diff --git a/.github/copilot-instructions.md b/.github/copilot-instructions.md
new file mode 100644
index 000000000..87712678c
--- /dev/null
+++ b/.github/copilot-instructions.md
@@ -0,0 +1,70 @@
+# GitHub Copilot Instructions for lsm-tree (structured-world fork)
+
+## Project Overview
+
+This is a **maintained fork** of [fjall-rs/lsm-tree](https://github.com/fjall-rs/lsm-tree) — a K.I.S.S. LSM-tree implementation in Rust. We maintain additional features and hardening for the [CoordiNode](https://github.com/structured-world/coordinode) database engine while contributing patches upstream.
+
+## Review Scope Rules (CRITICAL)
+
+**Review ONLY code within the PR's diff.** For issues found in code outside the diff:
+
+- Do NOT suggest inline code fixes for unchanged lines
+- Instead, suggest creating a **separate issue** with the finding (e.g., "Consider opening an issue to add size validation in `from_reader` — this is outside the scope of this clippy-fix PR")
+
+**Each PR has a defined scope in its description.** Read the "out of scope" section before reviewing. If something is listed as out of scope, do not flag it — it is tracked in another PR.
+
+**Cross-PR awareness:** This fork has multiple feature branches in parallel. If a hardening or feature seems missing, check whether it exists in another open PR before suggesting it. Reference the other PR number if known.
+
+**Prefer issue suggestions over code suggestions for out-of-scope findings.** This keeps PRs focused and reviewable.
+
+## Rust Code Standards
+
+- **Unsafe code:** Prefer safe alternatives. If `unsafe` is required, it must have a `// SAFETY:` comment explaining the invariant.
+- **Error handling:** No `unwrap()` or `expect()` on I/O paths. Use `Result` propagation. `expect()` is acceptable for programmer invariants (e.g., lock poisoning) with `#[expect(clippy::expect_used, reason = "...")]`.
+- **Clippy:** Code must pass `cargo clippy --all-features -- -D warnings`. Use `#[expect(...)]` (not `#[allow(...)]`) for justified suppressions — `#[expect]` warns if the suppression becomes unnecessary.
+- **Casts:** Prefer `TryFrom`/`TryInto` for fallible conversions. `as` casts are acceptable for infallible widening (e.g., `u32` to `u64`); provably-safe truncating casts need `#[expect(clippy::cast_possible_truncation)]` and a reason.
+- **Feature gates:** Code behind `#[cfg(feature = "...")]` must compile with any combination of features. Variables used only in feature-gated branches must also be feature-gated.
+
+## Testing Standards
+
+- **Corruption tests:** When adding validation for on-disk data, add a test that tampers with the relevant field and asserts the error. Use the same serialization path as production (e.g., `lz4_flex::compress` not `compress_prepend_size`). A condensed sketch is shown below.
+- **No mocks for storage:** Tests use real on-disk files via `tempfile::tempdir()`.
+- **Test naming:** `fn <component>_<scenario>_<expected>()` — e.g., `fn lz4_corrupted_header_triggers_decompress_error()`.
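+
+A condensed sketch of this pattern, adapted from the lz4 header-tampering test this PR adds in `src/table/block/mod.rs` (the `Header`, `Checksum`, `Block`, and `crate::hash::hash128` names come from that module):
+
+```rust
+let compressed = lz4_flex::compress(b"hello world");
+
+let header = Header {
+    data_length: compressed.len() as u32,
+    uncompressed_length: 11 + 1, // tampered: one byte larger than the payload
+    checksum: Checksum::from_raw(crate::hash::hash128(&compressed)),
+    block_type: BlockType::Data,
+};
+
+let mut buf = header.encode_into_vec();
+buf.extend_from_slice(&compressed);
+
+// Decompression must surface the typed error instead of panicking.
+let result = Block::from_reader(&mut Cursor::new(buf), CompressionType::Lz4);
+assert!(matches!(result, Err(crate::Error::Decompress(_))));
+```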
+
+## Commit Message Format
+
+```
+<type>(scope): <description>
+
+- Detail 1
+- Detail 2
+```
+
+Types: `feat`, `fix`, `refactor`, `test`, `docs`, `style`, `chore`, `perf`, `ci`, `build`, `revert`
+
+**Forbidden patterns:** "address review", "fix PR comments", "WIP", "temporary"
+
+## Build and Test
+
+```bash
+cargo clippy --all-features -- -D warnings  # Lint (strict)
+cargo test --features lz4                   # Tests with lz4
+cargo test --all-features                   # Tests with all features
+cargo fmt --all -- --check                  # Format check
+```
+
+## Feature Flags
+
+| Flag | Description |
+|------|-------------|
+| `lz4` | LZ4 compression (enabled by default in fjall) |
+| `zstd` | Zstd compression (PR #1) |
+| `bytes_1` | Use `bytes` crate for Slice type |
+| `metrics` | Expose Prometheus metrics |
+
+## Architecture Notes
+
+- `src/table/block/` — On-disk block format (header + compressed payload)
+- `src/vlog/blob_file/` — Value log for large values (separate from LSM blocks)
+- `src/compaction/` — Compaction strategies (leveled, FIFO, tiered)
+- `src/seqno.rs` — Sequence number generator (MVCC versioning)
+- Compression is pluggable via `CompressionType` enum with `#[cfg(feature)]` variants
diff --git a/.github/dependabot.yml b/.github/dependabot.yml
new file mode 100644
index 000000000..a45d101ac
--- /dev/null
+++ b/.github/dependabot.yml
@@ -0,0 +1,22 @@
+version: 2
+updates:
+  - package-ecosystem: "cargo"
+    directory: "/"
+    schedule:
+      interval: "weekly"
+    open-pull-requests-limit: 10
+    groups:
+      minor-and-patch:
+        patterns:
+          - "*"
+        update-types:
+          - "minor"
+          - "patch"
+    commit-message:
+      prefix: "chore(deps)"
+  - package-ecosystem: "github-actions"
+    directory: "/"
+    schedule:
+      interval: "weekly"
+    commit-message:
+      prefix: "ci(deps)"
diff --git a/.github/instructions/code-review.instructions.md b/.github/instructions/code-review.instructions.md
new file mode 100644
index 000000000..bff87e21f
--- /dev/null
+++ b/.github/instructions/code-review.instructions.md
@@ -0,0 +1,27 @@
+---
+applyTo: "**/*.rs"
+---
+
+# Code Review Instructions for lsm-tree (structured-world fork)
+
+## Scope Rules (CRITICAL)
+
+- **Review ONLY code within the PR's diff.** Do not suggest inline fixes for unchanged lines.
+- For issues in code **outside the diff**, suggest creating a **separate issue** instead of proposing code changes. Example: "Consider opening an issue to add size validation here — this is outside this PR's scope."
+- **Read the PR description carefully.** If the PR body has an "out of scope" section listing items handled by other PRs, do NOT flag those items.
+- This fork has **multiple feature branches in parallel**. A hardening that seems missing in one PR may already exist in another open PR. Check the "out of scope" section for cross-references.
+
+## Rust Standards
+
+- `unsafe` blocks require `// SAFETY:` comments explaining the invariant
+- Prefer `#[expect(lint)]` over `#[allow(lint)]` — `#[expect]` warns when suppression becomes unnecessary
+- Use `TryFrom`/`TryInto` for fallible conversions; `as` casts need `#[expect(clippy::cast_possible_truncation)]` with reason
+- No `unwrap()` / `expect()` on I/O paths — use `Result` propagation
+- `expect()` is acceptable for programmer invariants (lock poisoning) with `#[expect(clippy::expect_used, reason = "...")]`
+- Code must pass `cargo clippy --all-features -- -D warnings`
+
+## Testing
+
+- Corruption/validation tests: tamper with the relevant on-disk field and assert the error
+- Use the same serialization as production (e.g., `lz4_flex::compress` not `compress_prepend_size`)
+- Test naming: `fn <component>_<scenario>_<expected>()`
diff --git a/.github/workflows/coordinode-ci.yml b/.github/workflows/coordinode-ci.yml
new file mode 100644
index 000000000..eb0ca6396
--- /dev/null
+++ b/.github/workflows/coordinode-ci.yml
@@ -0,0 +1,92 @@
+name: CoordiNode CI
+
+on:
+  push:
+    branches:
+      - main
+      - "feat/#*"
+      - "fix/#*"
+  pull_request:
+    branches:
+      - main
+
+env:
+  CARGO_TERM_COLOR: always
+
+jobs:
+  lint:
+    # Fast gate — runs on every push and PR
+    timeout-minutes: 10
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v6
+      - uses: dtolnay/rust-toolchain@stable
+        with:
+          components: rustfmt, clippy
+      - uses: Swatinem/rust-cache@v2
+      - name: Format
+        run: cargo fmt --all -- --check
+      - name: Clippy (strict)
+        run: cargo clippy --all-features -- -D warnings
+
+  test:
+    # Full matrix — only on push to main/feature branches (not on PRs)
+    if: github.event_name == 'push'
+    needs: lint
+    timeout-minutes: 20
+    strategy:
+      matrix:
+        rust_version: [stable, "1.90.0"]
+        os: [ubuntu-latest, windows-latest, macos-latest]
+    runs-on: ${{ matrix.os }}
+    steps:
+      - uses: actions/checkout@v6
+      - uses: dtolnay/rust-toolchain@stable
+        with:
+          toolchain: ${{ matrix.rust_version }}
+      - uses: Swatinem/rust-cache@v2
+        with:
+          prefix-key: ${{ runner.os }}-cargo
+      - uses: taiki-e/install-action@nextest
+      - name: Run tests
+        run: cargo nextest run --all-features
+      - name: Run doc tests
+        run: cargo test --doc --features lz4
+
+  test-pr:
+    # Lightweight — only on PRs (ubuntu stable)
+    if: github.event_name == 'pull_request'
+    needs: lint
+    timeout-minutes: 15
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v6
+      - uses: dtolnay/rust-toolchain@stable
+      - uses: Swatinem/rust-cache@v2
+      - uses: taiki-e/install-action@nextest
+      - name: Run tests
+        run: cargo nextest run --all-features
+      - name: Run doc tests
+        run: cargo test --doc --features lz4
+
+  codecov:
+    if: github.event_name == 'push'
+    needs: lint
+    timeout-minutes: 20
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v6
+      - uses: dtolnay/rust-toolchain@nightly
+        with:
+          components: llvm-tools-preview
+      - uses: Swatinem/rust-cache@v2
+      - uses: taiki-e/install-action@cargo-llvm-cov
+      - uses: taiki-e/install-action@nextest
+      - run: cargo llvm-cov --no-report nextest --all-features
+      - run: cargo llvm-cov --no-report --doc --features lz4
+      - run: cargo llvm-cov report --doctests --lcov --output-path lcov.info
+      - uses: codecov/codecov-action@v5
+        with:
+          files: lcov.info
+        env:
+          CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
diff --git a/.github/workflows/coordinode-release.yml b/.github/workflows/coordinode-release.yml
new file mode 100644
index 000000000..0e15c138f
--- /dev/null
+++ b/.github/workflows/coordinode-release.yml
@@ -0,0 +1,31 @@
+name: Release
+
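+# Opens or updates a release PR via release-plz on each push to main.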
+on: + push: + branches: + - main + +permissions: + contents: write + pull-requests: write + +jobs: + release-plz: + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v6 + with: + fetch-depth: 0 + + - name: Install Rust toolchain + uses: dtolnay/rust-toolchain@stable + + - name: Run release-plz + uses: release-plz/action@v0.5 + with: + command: release-pr + config: .release-plz.toml + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + CARGO_REGISTRY_TOKEN: ${{ secrets.CARGO_REGISTRY_TOKEN }} diff --git a/.github/workflows/upstream-monitor.yml b/.github/workflows/upstream-monitor.yml new file mode 100644 index 000000000..8def494b2 --- /dev/null +++ b/.github/workflows/upstream-monitor.yml @@ -0,0 +1,130 @@ +name: Upstream Monitor + +on: + schedule: + - cron: "0 8 * * 1,4" + workflow_dispatch: + +permissions: + contents: write + pull-requests: write + +jobs: + check-upstream: + timeout-minutes: 10 + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v6 + with: + fetch-depth: 0 + + - name: Add upstream remote + run: git remote add upstream https://github.com/fjall-rs/lsm-tree.git + + - name: Fetch upstream and origin + run: | + git fetch upstream main + git fetch origin main + + - name: Check for new upstream commits + id: check + run: | + BEHIND=$(git rev-list origin/main..upstream/main --count) + echo "behind=$BEHIND" >> "$GITHUB_OUTPUT" + echo "Commits behind upstream: $BEHIND" + + - name: Try merge and create PR or issue + if: steps.check.outputs.behind > 0 + env: + GH_TOKEN: ${{ secrets.GITHUB_TOKEN }} + BEHIND: ${{ steps.check.outputs.behind }} + run: | + git config user.name "github-actions[bot]" + git config user.email "github-actions[bot]@users.noreply.github.com" + + SYNC_BRANCH="chore/upstream-sync-$(date +%Y%m%d)" + git checkout -b "$SYNC_BRANCH" origin/main + + if git merge --no-commit --no-ff upstream/main 2>&1; then + git commit -m "chore: sync upstream ($BEHIND new commits)" + git push origin "$SYNC_BRANCH" + + gh pr create \ + --title "chore: sync upstream ($BEHIND new commits)" \ + --body "$(cat <<'EOF' + ## Upstream Sync + + Automated sync from [fjall-rs/lsm-tree](https://github.com/fjall-rs/lsm-tree) main branch. + + **Commits behind:** ${{ steps.check.outputs.behind }} + + ### Review checklist + - [ ] Review upstream changes for breaking modifications + - [ ] Verify our patches still apply cleanly + - [ ] Run full test suite + EOF + )" \ + --base main \ + --head "$SYNC_BRANCH" + else + CONFLICTS=$(git diff --name-only --diff-filter=U 2>/dev/null || true) + git merge --abort + + gh issue create \ + --title "Upstream sync conflict ($BEHIND new commits)" \ + --body "$(cat </dev/null; then + echo "Branch '$BRANCH' is fully merged into upstream/main" + + EXISTING=$(gh issue list --search "Upstream merged: $BRANCH" --state open --json number --jq 'length') + if [ "$EXISTING" = "0" ]; then + gh issue create \ + --title "Upstream merged: $BRANCH" \ + --body "$(cat <

-[![CI](https://github.com/fjall-rs/lsm-tree/actions/workflows/test.yml/badge.svg)](https://github.com/fjall-rs/lsm-tree/actions/workflows/test.yml) +[![CI](https://github.com/structured-world/lsm-tree/actions/workflows/coordinode-ci.yml/badge.svg)](https://github.com/structured-world/lsm-tree/actions/workflows/coordinode-ci.yml) +[![Upstream CI](https://github.com/fjall-rs/lsm-tree/actions/workflows/test.yml/badge.svg)](https://github.com/fjall-rs/lsm-tree/actions/workflows/test.yml) [![docs.rs](https://img.shields.io/docsrs/lsm-tree?color=green)](https://docs.rs/lsm-tree) [![Crates.io](https://img.shields.io/crates/v/lsm-tree?color=blue)](https://crates.io/crates/lsm-tree) ![MSRV](https://img.shields.io/badge/MSRV-1.90.0-blue) -[![dependency status](https://deps.rs/repo/github/fjall-rs/lsm-tree/status.svg)](https://deps.rs/repo/github/fjall-rs/lsm-tree) + +> **Maintained fork** by [Structured World Foundation](https://sw.foundation) for the [CoordiNode](https://github.com/structured-world/coordinode) database engine. +> Based on [fjall-rs/lsm-tree](https://github.com/fjall-rs/lsm-tree). We contribute patches upstream and maintain additional features needed for CoordiNode (zstd compression, custom sequence number generators, batch get, intra-L0 compaction, security hardening). A K.I.S.S. implementation of log-structured merge trees (LSM-trees/LSMTs) in Rust. @@ -68,12 +71,24 @@ Uses [`bytes`](https://github.com/tokio-rs/bytes) as the underlying `Slice` type cargo bench --features lz4 ``` +## Support the Project + +
+<div align="center">
+
+![USDT TRC-20 Donation QR Code](assets/usdt-qr.svg)
+
+USDT (TRC-20): `TFDsezHa1cBkoeZT5q2T49Wp66K8t2DmdA`
+
+</div>
+
 ## License
 
 All source code is licensed under MIT OR Apache-2.0.
 
 All contributions are to be licensed as MIT OR Apache-2.0.
 
+Original project by [fjall-rs](https://github.com/fjall-rs/lsm-tree). This fork is maintained by [Structured World Foundation](https://sw.foundation).
+
 ## Footnotes
 
 [1] https://rocksdb.org/blog/2017/05/12/partitioned-index-filter.html
diff --git a/assets/usdt-qr.svg b/assets/usdt-qr.svg
new file mode 100644
index 000000000..a8dd3dcf5
--- /dev/null
+++ b/assets/usdt-qr.svg
@@ -0,0 +1 @@
+
diff --git a/clippy.toml b/clippy.toml
new file mode 100644
index 000000000..519df83ba
--- /dev/null
+++ b/clippy.toml
@@ -0,0 +1 @@
+allowed-duplicate-crates = ["hashbrown"]
diff --git a/src/abstract_tree.rs b/src/abstract_tree.rs
index 0a3f123a2..c6f1aee9b 100644
--- a/src/abstract_tree.rs
+++ b/src/abstract_tree.rs
@@ -511,6 +511,84 @@ pub trait AbstractTree {
         self.get(key, seqno).map(|x| x.is_some())
     }
 
+    /// Returns `true` if the tree contains any key with the given prefix.
+    ///
+    /// This is a convenience method that checks whether the corresponding
+    /// prefix iterator yields at least one item, while surfacing any IO
+    /// errors via the `Result` return type. Implementations may override
+    /// this method to provide a more efficient prefix-existence check.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// # let folder = tempfile::tempdir()?;
+    /// use lsm_tree::{AbstractTree, Config, Tree};
+    ///
+    /// let tree = Config::new(folder, Default::default(), Default::default()).open()?;
+    /// assert!(!tree.contains_prefix("abc", 0, None)?);
+    ///
+    /// tree.insert("abc:1", "value", 0);
+    /// assert!(tree.contains_prefix("abc", 1, None)?);
+    /// assert!(!tree.contains_prefix("xyz", 1, None)?);
+    /// #
+    /// # Ok::<(), lsm_tree::Error>(())
+    /// ```
+    ///
+    /// # Errors
+    ///
+    /// Will return `Err` if an IO error occurs.
+    fn contains_prefix<K: AsRef<[u8]>>(
+        &self,
+        prefix: K,
+        seqno: SeqNo,
+        index: Option<(Arc<Memtable>, SeqNo)>,
+    ) -> crate::Result<bool> {
+        match self.prefix(prefix, seqno, index).next() {
+            Some(guard) => guard.key().map(|_| true),
+            None => Ok(false),
+        }
+    }
+
+    /// Reads multiple keys from the tree.
+    ///
+    /// Implementations may choose to perform all lookups against a single
+    /// version snapshot and acquire the version lock only once, which can be
+    /// more efficient than calling [`AbstractTree::get`] in a loop. The
+    /// default trait implementation, however, is a convenience wrapper that
+    /// simply calls [`AbstractTree::get`] for each key and therefore does not
+    /// guarantee a single-snapshot or single-lock acquisition. Optimized
+    /// implementations (such as [`Tree`] and [`BlobTree`]) provide the
+    /// single-snapshot/one-lock behavior.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// # let folder = tempfile::tempdir()?;
+    /// use lsm_tree::{AbstractTree, Config, Tree};
+    ///
+    /// let tree = Config::new(folder, Default::default(), Default::default()).open()?;
+    /// tree.insert("a", "value_a", 0);
+    /// tree.insert("b", "value_b", 1);
+    ///
+    /// let results = tree.multi_get(["a", "b", "c"], 2)?;
+    /// assert_eq!(results[0], Some("value_a".as_bytes().into()));
+    /// assert_eq!(results[1], Some("value_b".as_bytes().into()));
+    /// assert_eq!(results[2], None);
+    /// #
+    /// # Ok::<(), lsm_tree::Error>(())
+    /// ```
+    ///
+    /// # Errors
+    ///
+    /// Will return `Err` if an IO error occurs.
+    fn multi_get<K: AsRef<[u8]>>(
+        &self,
+        keys: impl IntoIterator<Item = K>,
+        seqno: SeqNo,
+    ) -> crate::Result<Vec<Option<UserValue>>> {
+        keys.into_iter().map(|key| self.get(key, seqno)).collect()
+    }
+
     /// Inserts a key-value pair into the tree.
     ///
     /// If the key already exists, the item will be overwritten.
diff --git a/src/blob_tree/mod.rs b/src/blob_tree/mod.rs
index 73ea1c119..58777024c 100644
--- a/src/blob_tree/mod.rs
+++ b/src/blob_tree/mod.rs
@@ -163,6 +163,29 @@ impl BlobTree {
             blobs_folder: Arc::new(blobs_folder),
         })
     }
+
+    /// Resolves a single key against a pre-acquired [`SuperVersion`].
+    fn resolve_key(
+        &self,
+        super_version: &crate::version::SuperVersion,
+        key: &[u8],
+        seqno: SeqNo,
+    ) -> crate::Result<Option<UserValue>> {
+        let Some(item) = crate::Tree::get_internal_entry_from_version(super_version, key, seqno)?
+        else {
+            return Ok(None);
+        };
+
+        let (_, v) = resolve_value_handle(
+            self.id(),
+            self.blobs_folder.as_path(),
+            &self.index.config.cache,
+            &super_version.version,
+            item,
+        )?;
+
+        Ok(Some(v))
+    }
 }
 
 impl AbstractTree for BlobTree {
@@ -332,6 +355,7 @@ impl AbstractTree for BlobTree {
         self.index.get_flush_lock()
     }
 
+    #[expect(clippy::too_many_lines, reason = "flush logic is inherently complex")]
     fn flush_to_tables(
         &self,
         stream: impl Iterator>,
@@ -555,6 +579,18 @@ impl AbstractTree for BlobTree {
         self.index.contains_key(key, seqno)
     }
 
+    // NOTE: Override the default implementation to delegate directly
+    // to the index tree, avoiding extra iterator/guard overhead for
+    // prefix checks
+    fn contains_prefix<K: AsRef<[u8]>>(
+        &self,
+        prefix: K,
+        seqno: SeqNo,
+        index: Option<(Arc<Memtable>, SeqNo)>,
+    ) -> crate::Result<bool> {
+        self.index.contains_prefix(prefix, seqno, index)
+    }
+
     // NOTE: Override the default implementation to not fetch
     // data from the value log, so we get much faster scans
     fn len(&self, seqno: SeqNo, index: Option<(Arc<Memtable>, SeqNo)>) -> crate::Result<usize> {
@@ -584,30 +620,20 @@
     }
 
     fn get<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<UserValue>> {
-        let key = key.as_ref();
-
-        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
-        let super_version = self
-            .index
-            .version_history
-            .read()
-            .expect("lock is poisoned")
-            .get_version_for_snapshot(seqno);
-
-        let Some(item) = crate::Tree::get_internal_entry_from_version(&super_version, key, seqno)?
-        else {
-            return Ok(None);
-        };
+        let super_version = self.index.get_version_for_snapshot(seqno);
+        self.resolve_key(&super_version, key.as_ref(), seqno)
+    }
 
-        let (_, v) = resolve_value_handle(
-            self.id(),
-            self.blobs_folder.as_path(),
-            &self.index.config.cache,
-            &super_version.version,
-            item,
-        )?;
+    fn multi_get<K: AsRef<[u8]>>(
+        &self,
+        keys: impl IntoIterator<Item = K>,
+        seqno: SeqNo,
+    ) -> crate::Result<Vec<Option<UserValue>>> {
+        let super_version = self.index.get_version_for_snapshot(seqno);
 
-        Ok(Some(v))
+        keys.into_iter()
+            .map(|key| self.resolve_key(&super_version, key.as_ref(), seqno))
+            .collect()
     }
 
     fn remove<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
diff --git a/src/compaction/leveled/mod.rs b/src/compaction/leveled/mod.rs
index f4bd190ab..213707d27 100644
--- a/src/compaction/leveled/mod.rs
+++ b/src/compaction/leveled/mod.rs
@@ -279,18 +279,31 @@ impl CompactionStrategy for Strategy {
 
         // Trivial move into Lmax
         'trivial_lmax: {
+            #[expect(
+                clippy::expect_used,
+                reason = "level 0 is guaranteed to exist in a valid version"
+            )]
             let l0 = version.level(0).expect("first level should exist");
 
             if !l0.is_empty() && l0.is_disjoint() {
                 let lmax_index = version.level_count() - 1;
 
-                if (1..lmax_index)
-                    .any(|idx| !version.level(idx).expect("level should exist").is_empty())
-                {
+                if (1..lmax_index).any(|idx| {
+                    #[expect(
+                        clippy::expect_used,
+                        reason = "levels within level_count are guaranteed to exist"
+                    )]
+                    let level = version.level(idx).expect("level should exist");
+                    !level.is_empty()
+                }) {
                     // There are intermediary levels with data, cannot trivially move to Lmax
                     break 'trivial_lmax;
                 }
 
+                #[expect(
+                    clippy::expect_used,
+                    reason = "lmax_index is derived from level_count so level is guaranteed to exist"
+                )]
                 let lmax = version.level(lmax_index).expect("last level should exist");
 
                 if !lmax
@@ -299,6 +312,10 @@ impl CompactionStrategy for Strategy {
                 {
                     return Choice::Move(CompactionInput {
                         table_ids: l0.list_ids(),
+                        #[expect(
+                            clippy::cast_possible_truncation,
+                            reason = "level count is at most 7, fits in u8"
+                        )]
                         dest_level: lmax_index as u8,
                         canonical_level: 1,
                         target_size: self.target_size,
diff --git a/src/compaction/worker.rs b/src/compaction/worker.rs
index b1f7052b4..951fadbbf 100644
--- a/src/compaction/worker.rs
+++ b/src/compaction/worker.rs
@@ -19,7 +19,8 @@ use crate::{
     tree::inner::TreeId,
     version::{Run, SuperVersions, Version},
     vlog::{BlobFileMergeScanner, BlobFileScanner, BlobFileWriter},
-    BlobFile, Config, HashSet, InternalValue, SeqNo, SequenceNumberCounter, Table, TableId,
+    BlobFile, Config, HashSet, InternalValue, SeqNo, SequenceNumberCounter,
+    SharedSequenceNumberGenerator, Table, TableId,
 };
 use std::{
     sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard},
@@ -35,9 +36,9 @@ pub type CompactionReader<'a> = Box( }) }
+#[expect(
+    clippy::significant_drop_tightening,
+    reason = "version_history_lock must be held across upgrade_version and maintenance"
+)]
 fn move_tables(
     compaction_state: &MutexGuard<'_, CompactionState>,
     opts: &Options,
diff --git a/src/config/mod.rs b/src/config/mod.rs
index 8b8aac8dd..5f455a523 100644
--- a/src/config/mod.rs
+++ b/src/config/mod.rs
@@ -21,7 +21,8 @@ pub type PartitioningPolicy = PinningPolicy;
 
 use crate::{
     compaction::filter::Factory, path::absolute_path, version::DEFAULT_LEVEL_COUNT, AnyTree,
-    BlobTree, Cache, CompressionType, DescriptorTable, SequenceNumberCounter, Tree,
+    BlobTree, Cache, CompressionType, DescriptorTable, SequenceNumberCounter,
+    SharedSequenceNumberGenerator, Tree,
 };
 use std::{
     path::{Path, PathBuf},
@@ -234,10 +235,10 @@ pub struct Config {
     /// The global sequence number generator
     ///
-    /// Should be shared between multple trees of a database
-    pub(crate) seqno: SequenceNumberCounter,
+    /// Should be shared between multiple trees of a database
+    pub(crate) seqno: SharedSequenceNumberGenerator,
 
-    pub(crate) visible_seqno: SequenceNumberCounter,
+    pub(crate) visible_seqno: SharedSequenceNumberGenerator,
 }
 
 // TODO: remove default?
@@ -246,8 +247,8 @@ impl Default for Config {
         Self {
             path: absolute_path(Path::new(DEFAULT_FILE_FOLDER)),
             descriptor_table: Some(Arc::new(DescriptorTable::new(256))),
-            seqno: SequenceNumberCounter::default(),
-            visible_seqno: SequenceNumberCounter::default(),
+            seqno: SharedSequenceNumberGenerator::from(SequenceNumberCounter::default()),
+            visible_seqno: SharedSequenceNumberGenerator::from(SequenceNumberCounter::default()),
 
             cache: Arc::new(Cache::with_capacity_bytes(
                 /* 16 MiB */
                 16 * 1_024 * 1_024,
@@ -307,12 +308,29 @@ impl Config {
     ) -> Self {
         Self {
             path: absolute_path(path.as_ref()),
-            seqno,
-            visible_seqno,
+            seqno: Arc::new(seqno),
+            visible_seqno: Arc::new(visible_seqno),
             ..Default::default()
         }
     }
 
+    /// Overrides the sequence number generator.
+    ///
+    /// By default, [`SequenceNumberCounter`] is used. This allows plugging in
+    /// a custom generator (e.g., HLC for distributed databases).
+    #[must_use]
+    pub fn seqno_generator(mut self, generator: SharedSequenceNumberGenerator) -> Self {
+        self.seqno = generator;
+        self
+    }
+
+    /// Overrides the visible sequence number generator.
+    #[must_use]
+    pub fn visible_seqno_generator(mut self, generator: SharedSequenceNumberGenerator) -> Self {
+        self.visible_seqno = generator;
+        self
+    }
+
     /// Sets the global cache.
     ///
    /// You can create a global [`Cache`] and share it between multiple
@@ -479,4 +497,22 @@ impl Config {
             AnyTree::Standard(Tree::open(self)?)
         })
     }
+
+    /// Like [`Config::new`], but accepts pre-built shared generators.
+    ///
+    /// This is useful when the caller already has
+    /// [`SharedSequenceNumberGenerator`] instances (e.g., from a higher-level
+    /// database that shares generators across multiple trees).
+    pub fn new_with_generators<P: AsRef<Path>>(
+        path: P,
+        seqno: SharedSequenceNumberGenerator,
+        visible_seqno: SharedSequenceNumberGenerator,
+    ) -> Self {
+        Self {
+            path: absolute_path(path.as_ref()),
+            seqno,
+            visible_seqno,
+            ..Default::default()
+        }
+    }
 }
diff --git a/src/lib.rs b/src/lib.rs
index d429d71c5..ea505e657 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -144,6 +144,10 @@ pub mod util;
 mod value;
 mod value_type;
+
+/// Integrity verification for SST and blob files.
+pub mod verify; + mod version; mod vlog; @@ -183,7 +187,9 @@ pub use { ingestion::AnyIngestion, iter_guard::IterGuard as Guard, memtable::{Memtable, MemtableId}, - seqno::SequenceNumberCounter, + seqno::{ + SequenceNumberCounter, SequenceNumberGenerator, SharedSequenceNumberGenerator, MAX_SEQNO, + }, slice::Slice, tree::Tree, value::SeqNo, diff --git a/src/manifest.rs b/src/manifest.rs index 38ae44ec3..f5a9886b8 100644 --- a/src/manifest.rs +++ b/src/manifest.rs @@ -8,6 +8,10 @@ use std::{io::Read, path::Path}; pub struct Manifest { pub version: FormatVersion, + #[expect( + dead_code, + reason = "deserialized from on-disk manifest, retained for validation" + )] pub tree_type: TreeType, pub level_count: u8, } diff --git a/src/seqno.rs b/src/seqno.rs index 1fd8fb790..c78ee31ea 100644 --- a/src/seqno.rs +++ b/src/seqno.rs @@ -3,14 +3,122 @@ // (found in the LICENSE-* files in the repository) use crate::SeqNo; -use std::sync::{ - atomic::{ - AtomicU64, - Ordering::{AcqRel, Acquire, Release}, +use std::{ + fmt::Debug, + sync::{ + atomic::{ + AtomicU64, + Ordering::{AcqRel, Acquire, Release}, + }, + Arc, }, - Arc, }; +/// The maximum allowed sequence number value. +/// +/// The MSB (`0x8000_0000_0000_0000`) is reserved for internal use by the +/// transaction layer, so all sequence numbers must be strictly less than +/// this boundary. Values returned by [`SequenceNumberGenerator::next`] +/// must be at most `MAX_SEQNO - 1` (`0x7FFF_FFFF_FFFF_FFFE`), leaving +/// room for `seqno + 1` operations used internally by the engine. +pub const MAX_SEQNO: SeqNo = 0x7FFF_FFFF_FFFF_FFFF; + +/// Trait for custom sequence number generation. +/// +/// Implementations must be thread-safe and provide atomic operations +/// for sequence number management. +/// +/// # Invariants +/// +/// Implementors **must** respect the following invariants, which are +/// relied upon by the storage engine (e.g., for MVCC and transactions): +/// +/// - The most-significant bit (MSB) of a sequence number is reserved +/// for internal use by the transaction layer. +/// - All sequence numbers produced by this generator **must** therefore +/// be strictly less than `0x8000_0000_0000_0000`. +/// - Calls to [`SequenceNumberGenerator::next`] on a given generator +/// instance must return monotonically increasing values (each `next` +/// returns a value that is strictly greater than any value previously +/// returned by `next` on the same instance) until the reserved MSB +/// boundary is reached. +/// - In practice, this means `next`-generated sequence numbers are +/// unique for the lifetime of the generator (modulo wrap-around, which +/// must not occur within the reserved MSB range). +/// +/// Implementors are also responsible for ensuring that [`get`] does not +/// return values that violate these invariants, and that [`set`] and +/// [`fetch_max`] do not violate them (e.g., by setting the counter to a +/// value at or above `0x8000_0000_0000_0000`, or by moving the counter +/// backwards such that subsequent calls to [`next`] would observe +/// non-monotonic sequence numbers). +/// +/// The [`set`] method is a special-case escape hatch intended for use +/// during initialization or recovery. It may move the counter forwards +/// or backwards and therefore **may** cause subsequent calls to +/// [`next`] to observe non-monotonic sequence numbers. This is +/// permitted; callers are responsible for using [`fetch_max`] (or other +/// application-level logic) after `set` if they need to reestablish any +/// monotonicity guarantees. 
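+///
+/// # Example
+///
+/// A minimal illustrative implementation (a sketch only; a real generator
+/// must also enforce the reserved-MSB boundary, as [`SequenceNumberCounter`]
+/// does):
+///
+/// ```ignore
+/// use std::sync::atomic::{AtomicU64, Ordering};
+/// use lsm_tree::{SeqNo, SequenceNumberGenerator};
+///
+/// #[derive(Debug, Default)]
+/// struct PlainCounter(AtomicU64);
+///
+/// impl SequenceNumberGenerator for PlainCounter {
+///     fn next(&self) -> SeqNo {
+///         // Sketch only: no MAX_SEQNO guard here.
+///         self.0.fetch_add(1, Ordering::AcqRel)
+///     }
+///     fn get(&self) -> SeqNo {
+///         self.0.load(Ordering::Acquire)
+///     }
+///     fn set(&self, value: SeqNo) {
+///         self.0.store(value, Ordering::Release);
+///     }
+///     fn fetch_max(&self, value: SeqNo) {
+///         self.0.fetch_max(value, Ordering::AcqRel);
+///     }
+/// }
+/// ```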
+///
+/// # Supertraits
+///
+/// `UnwindSafe + RefUnwindSafe` are required because generators may be
+/// captured inside `catch_unwind` (e.g., during ingestion error recovery).
+///
+/// The default implementation is [`SequenceNumberCounter`], which uses
+/// an `Arc<AtomicU64>` for lock-free atomic operations. It enforces the
+/// MSB boundary in `new`, `set`, and `fetch_max` (via assert/clamp),
+/// and panics in `next` if the counter reaches the reserved range.
+pub trait SequenceNumberGenerator:
+    Send + Sync + Debug + std::panic::UnwindSafe + std::panic::RefUnwindSafe + 'static
+{
+    /// Gets the next sequence number, atomically incrementing the counter.
+    ///
+    /// This must:
+    /// - Return a value strictly greater than any value previously returned
+    ///   by `next` on the same generator instance (monotonic increase).
+    /// - Never return a value greater than `MAX_SEQNO - 1`, leaving room
+    ///   for internal `seqno + 1` operations that must remain strictly
+    ///   less than [`MAX_SEQNO`].
+    #[must_use]
+    fn next(&self) -> SeqNo;
+
+    /// Gets the current sequence number without incrementing.
+    ///
+    /// This should be consistent with the value that will be used as the
+    /// basis for the next call to [`next`], and must not itself alter the
+    /// monotonicity guarantees.
+    #[must_use]
+    fn get(&self) -> SeqNo;
+
+    /// Sets the sequence number to the given value.
+    ///
+    /// Implementations must ensure that `value` is strictly less than
+    /// `0x8000_0000_0000_0000` (the MSB is reserved).
+    ///
+    /// This method is intended for direct overrides (e.g., during
+    /// initialization or recovery) and is **not** required to preserve
+    /// monotonicity with respect to values previously returned by
+    /// [`next`]. In particular, calling `set` with a value lower than a
+    /// previously returned `next` value may cause subsequent calls to
+    /// [`next`] to observe non-monotonic sequence numbers. This is
+    /// allowed; callers that need to advance the sequence number in a
+    /// monotonic fashion should prefer [`fetch_max`] instead of `set`.
+    fn set(&self, value: SeqNo);
+
+    /// Atomically updates the sequence number to the maximum of
+    /// the current value and the given value.
+    ///
+    /// The resulting stored value must:
+    /// - Remain strictly less than `0x8000_0000_0000_0000`.
+    /// - Preserve the monotonicity guarantees expected by [`next`].
+    fn fetch_max(&self, value: SeqNo);
+}
+
+/// A shared, cloneable sequence number generator.
+pub type SharedSequenceNumberGenerator = Arc<dyn SequenceNumberGenerator>;
+
 /// Thread-safe sequence number generator
 ///
 /// # Examples
@@ -46,61 +154,142 @@
 pub struct SequenceNumberCounter(Arc<AtomicU64>);
 
 impl SequenceNumberCounter {
-    /// Creates a new counter, setting it to some previous value
+    /// Creates a new counter, setting it to some previous value.
+    ///
+    /// # Panics
+    ///
+    /// Panics if `prev > MAX_SEQNO` (reserved MSB range).
+    ///
+    /// Note: `prev == MAX_SEQNO` is allowed but means the counter is
+    /// exhausted — any subsequent call to [`next()`](Self::next) will panic.
     #[must_use]
     pub fn new(prev: SeqNo) -> Self {
+        assert!(
+            prev <= MAX_SEQNO,
+            "Sequence number must not use the reserved MSB"
+        );
         Self(Arc::new(AtomicU64::new(prev)))
     }
 
-    /// Gets the would-be-next sequence number, without incrementing the counter.
+    /// Allocates and returns the next sequence number, atomically
+    /// incrementing the internal counter.
+    ///
+    /// The returned value is the counter's value **before** the increment
+    /// (i.e., pre-increment / fetch-then-add semantics).
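+    ///
+    /// For example (sketch): after `set(5)`, the first call to `next()`
+    /// returns `5`, and a subsequent `get()` returns `6`.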
+    ///
+    /// # Panics
+    ///
+    /// Panics if the current value is already [`MAX_SEQNO`] (i.e., when
+    /// advancing past it would enter the reserved MSB range).
+    #[must_use]
+    pub fn next(&self) -> SeqNo {
+        <Self as SequenceNumberGenerator>::next(self)
+    }
+
+    /// Returns the current internal counter value without incrementing.
     ///
-    /// This should only be used when creating a snapshot.
+    /// This is the value that the next call to [`next()`](Self::next) will
+    /// return (and then advance past).
     #[must_use]
     pub fn get(&self) -> SeqNo {
-        self.0.load(Acquire)
+        <Self as SequenceNumberGenerator>::get(self)
     }
 
-    /// Gets the next sequence number.
-    #[must_use]
-    #[allow(clippy::missing_panics_doc, reason = "we should never run out of u64s")]
-    pub fn next(&self) -> SeqNo {
-        let seqno = self.0.fetch_add(1, AcqRel);
+    /// Sets the sequence number to the given value.
+    ///
+    /// # Panics
+    ///
+    /// Panics if `value > MAX_SEQNO` (reserved MSB range).
+    pub fn set(&self, value: SeqNo) {
+        <Self as SequenceNumberGenerator>::set(self, value);
+    }
+
+    /// Atomically updates the sequence number to the maximum of
+    /// the current value and the provided value.
+    pub fn fetch_max(&self, value: SeqNo) {
+        <Self as SequenceNumberGenerator>::fetch_max(self, value);
+    }
+}
 
-        // The MSB is reserved for transactions.
-        //
-        // This gives us 63-bit sequence numbers technically.
-        assert!(seqno < 0x8000_0000_0000_0000, "Ran out of sequence numbers");
+impl SequenceNumberGenerator for SequenceNumberCounter {
+    fn next(&self) -> SeqNo {
+        // We use fetch_update (CAS loop) instead of fetch_add to ensure the
+        // internal counter never enters the reserved MSB range. With fetch_add,
+        // a caught panic (via catch_unwind, which the trait requires via
+        // UnwindSafe) would leave the counter permanently stuck at an invalid
+        // value. fetch_update only stores the new value on success.
+        match self.0.fetch_update(AcqRel, Acquire, |current| {
+            if current >= MAX_SEQNO {
+                None
+            } else {
+                Some(current + 1)
+            }
+        }) {
+            Ok(seqno) => seqno,
+            Err(current) => panic!("Ran out of sequence numbers (current: {current})"),
+        }
+    }
 
-        seqno
+    fn get(&self) -> SeqNo {
+        self.0.load(Acquire)
     }
 
-    /// Sets the sequence number.
-    pub fn set(&self, seqno: SeqNo) {
-        self.0.store(seqno, Release);
+    fn set(&self, value: SeqNo) {
+        assert!(
+            value <= MAX_SEQNO,
+            "Sequence number must not use the reserved MSB"
+        );
+        self.0.store(value, Release);
     }
 
-    /// Maximizes the sequence number.
-    pub fn fetch_max(&self, seqno: SeqNo) {
-        self.0.fetch_max(seqno, AcqRel);
+    fn fetch_max(&self, value: SeqNo) {
+        // Clamp to the maximum allowed sequence number to avoid storing
+        // a value in the reserved MSB range.
+        let clamped = value.min(MAX_SEQNO);
+        self.0.fetch_max(clamped, AcqRel);
+    }
+}
+
+// Orphan rules satisfied: SequenceNumberGenerator is a local trait,
+// so Arc<dyn SequenceNumberGenerator> is covered by a local type parameter.
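+//
+// For example (sketch), this lets a caller write:
+//     let shared: SharedSequenceNumberGenerator = SequenceNumberCounter::default().into();
+// and pass `shared` to `Config::seqno_generator`.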
+impl From<SequenceNumberCounter> for SharedSequenceNumberGenerator {
+    fn from(counter: SequenceNumberCounter) -> Self {
+        Arc::new(counter)
+    }
+}
 
 #[cfg(test)]
 mod tests {
+    use super::MAX_SEQNO;
     use test_log::test;
 
     #[test]
-    fn not_max_seqno() {
+    fn next_below_max_returns_valid_seqno() {
         let counter = super::SequenceNumberCounter::default();
-        counter.set(0x7FFF_FFFF_FFFF_FFFF);
+        counter.set(MAX_SEQNO - 1);
         let _ = counter.next();
     }
 
     #[test]
-    #[should_panic = "Ran out of sequence numbers"]
-    fn max_seqno() {
+    #[should_panic(expected = "Ran out of sequence numbers")]
+    fn next_at_max_panics() {
         let counter = super::SequenceNumberCounter::default();
-        counter.set(0x8000_0000_0000_0000);
+        counter.set(MAX_SEQNO);
         let _ = counter.next();
     }
+
+    #[test]
+    #[should_panic(expected = "Sequence number must not use the reserved MSB")]
+    fn set_reserved_range_panics() {
+        let counter = super::SequenceNumberCounter::default();
+        counter.set(MAX_SEQNO + 1);
+    }
+
+    #[test]
+    fn fetch_max_clamps_reserved_to_max() {
+        let counter = super::SequenceNumberCounter::default();
+        counter.set(100);
+        counter.fetch_max(MAX_SEQNO + 1);
+        assert_eq!(counter.get(), MAX_SEQNO);
+    }
 }
diff --git a/src/slice/slice_bytes/mod.rs b/src/slice/slice_bytes/mod.rs
index 1872460cb..a229e3adb 100644
--- a/src/slice/slice_bytes/mod.rs
+++ b/src/slice/slice_bytes/mod.rs
@@ -26,6 +26,7 @@ impl Slice {
     }
 
     #[doc(hidden)]
+    #[must_use]
     pub unsafe fn builder_unzeroed(len: usize) -> Builder {
         // Use `with_capacity` & `set_len`` to avoid zeroing the buffer
         let mut builder = Builder::with_capacity(len);
@@ -57,7 +58,15 @@ impl Slice {
         {
             let mut writer = &mut builder[..];
 
+            #[expect(
+                clippy::expect_used,
+                reason = "writing into a pre-allocated buffer of exact size cannot fail"
+            )]
             writer.write_all(left).expect("should write");
+            #[expect(
+                clippy::expect_used,
+                reason = "writing into a pre-allocated buffer of exact size cannot fail"
+            )]
             writer.write_all(right).expect("should write");
         }
diff --git a/src/table/block/mod.rs b/src/table/block/mod.rs
index c7f8e3c66..fb42423b6 100644
--- a/src/table/block/mod.rs
+++ b/src/table/block/mod.rs
@@ -106,23 +106,21 @@ impl Block {
 
             #[cfg(feature = "lz4")]
             CompressionType::Lz4 => {
-                #[warn(unsafe_code)]
-                let mut builder =
-                    unsafe { Slice::builder_unzeroed(header.uncompressed_length as usize) };
+                // NOTE: size cap validation for uncompressed_length is in PR #7
+                // (feat/#258-security-validate-uncompressedlength-before-decomp)
+                let mut buf = vec![0u8; header.uncompressed_length as usize];
 
-                lz4_flex::decompress_into(&raw_data, &mut builder)
+                let bytes_written = lz4_flex::decompress_into(&raw_data, &mut buf)
                     .map_err(|_| crate::Error::Decompress(compression))?;
 
-                builder.freeze().into()
-            }
-        };
+                // Runtime validation: corrupted data may decompress to fewer bytes
+                if bytes_written != header.uncompressed_length as usize {
+                    return Err(crate::Error::Decompress(compression));
+                }
 
-        debug_assert_eq!(header.uncompressed_length, {
-            #[expect(clippy::cast_possible_truncation, reason = "values are u32 length max")]
-            {
-                data.len() as u32
+                Slice::from(buf)
             }
-        });
+        };
 
         Ok(Self { header, data })
     }
@@ -167,14 +165,18 @@ impl Block {
             #[expect(clippy::indexing_slicing)]
             let raw_data = &buf[Header::serialized_len()..];
 
-            #[warn(unsafe_code)]
-            let mut builder =
-                unsafe { Slice::builder_unzeroed(header.uncompressed_length as usize) };
+            // NOTE: size cap validation for uncompressed_length is in PR #7
+            let mut decompressed = vec![0u8; header.uncompressed_length as usize];
 
-            lz4_flex::decompress_into(raw_data, &mut 
builder) + let bytes_written = lz4_flex::decompress_into(raw_data, &mut decompressed) .map_err(|_| crate::Error::Decompress(compression))?; - builder.freeze().into() + // Runtime validation: corrupted data may decompress to fewer bytes + if bytes_written != header.uncompressed_length as usize { + return Err(crate::Error::Decompress(compression)); + } + + Slice::from(decompressed) } }; @@ -228,4 +230,42 @@ mod tests { Ok(()) } + + #[test] + #[cfg(feature = "lz4")] + fn lz4_corrupted_uncompressed_length_triggers_decompress_error() { + use crate::coding::Encode; + use std::io::Cursor; + + let payload: &[u8] = b"hello world"; + + // Compress with lz4 using the block format + let compressed = lz4_flex::compress(payload); + + // Build a header with corrupted uncompressed_length (1 byte too large) + let data_length = compressed.len() as u32; + let uncompressed_length_correct = payload.len() as u32; + let uncompressed_length_corrupted = uncompressed_length_correct + 1; + + let checksum = Checksum::from_raw(crate::hash::hash128(&compressed)); + + let header = Header { + data_length, + uncompressed_length: uncompressed_length_corrupted, + checksum, + block_type: BlockType::Data, + }; + + let mut buf = header.encode_into_vec(); + buf.extend_from_slice(&compressed); + + let mut cursor = Cursor::new(buf); + let result = Block::from_reader(&mut cursor, CompressionType::Lz4); + + match result { + Err(crate::Error::Decompress(CompressionType::Lz4)) => { /* expected */ } + Ok(_) => panic!("expected Error::Decompress, but got Ok(Block)"), + Err(other) => panic!("expected Error::Decompress, got different error: {other:?}"), + } + } } diff --git a/src/table/data_block/iter.rs b/src/table/data_block/iter.rs index e8c605ba4..c8b4ba663 100644 --- a/src/table/data_block/iter.rs +++ b/src/table/data_block/iter.rs @@ -8,7 +8,7 @@ use crate::{ block::{Decoder, ParsedItem}, data_block::DataBlockParsedItem, }, - InternalValue, + InternalValue, SeqNo, }; /// The data block iterator handles double-ended scans over a data block @@ -34,15 +34,31 @@ impl<'a> Iter<'a> { true } - pub fn seek(&mut self, needle: &[u8]) -> bool { - // Find the restart interval whose head key is the last one strictly below `needle`. - // The decoder then performs a linear scan within that interval; we stop as soon as we - // reach a key ≥ needle. This minimizes parsing work while preserving correctness. - if !self - .decoder - .inner_mut() - .seek(|head_key, _| head_key < needle, false) - { + /// Seeks to the last restart interval whose head key is strictly below the + /// target `needle`, or equal to it with a seqno that is at least the given + /// snapshot boundary. + /// + /// Here `seqno` is a snapshot boundary: point reads return the first item + /// with `item.seqno < seqno`. Using the internal key ordering + /// (`user_key` ASC, `seqno` DESC), this skips restart intervals that can only + /// contain versions newer than the snapshot, so any visible version for + /// `needle` will be found within roughly one restart interval of the + /// resulting position. 
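+    ///
+    /// Illustrative sketch: with entries ordered `(b,9) (b,7) (b,4) (c,9)`
+    /// and a snapshot `seqno = 5`, interval heads `(b,9)` and `(b,7)` satisfy
+    /// the predicate (same key, seqno >= 5), while `(b,4)` and `(c,9)` do not,
+    /// so the seek lands at most one interval before the visible `(b,4)`.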
+ pub fn seek_to_key_seqno(&mut self, needle: &[u8], seqno: SeqNo) -> bool { + self.decoder.inner_mut().seek( + |head_key, head_seqno| match head_key.cmp(needle) { + std::cmp::Ordering::Less => true, + std::cmp::Ordering::Equal => head_seqno >= seqno, + std::cmp::Ordering::Greater => false, + }, + false, + ) + } + + pub fn seek(&mut self, needle: &[u8], seqno: SeqNo) -> bool { + // Reuse the seqno-aware binary search from `seek_to_key_seqno`, then + // follow up with a linear scan to position at the exact key. + if !self.seek_to_key_seqno(needle, seqno) { return false; } @@ -75,9 +91,14 @@ impl<'a> Iter<'a> { } } - pub fn seek_upper(&mut self, needle: &[u8]) -> bool { - // Reverse-bound seek: position the high scanner at the first restart whose head key is - // ≤ needle, then walk backwards inside the interval until we find a key ≤ needle. + /// Reverse inclusive seek: position at the last key `<= needle`. + /// + /// `seqno` is accepted for API uniformity with the forward seek methods + /// ([`seek`], [`seek_exclusive`]) but is **intentionally unused** here. + /// Backward binary search cannot leverage seqno because intervals are + /// visited from the selected one toward index 0 — a tighter predicate + /// would skip intervals that may contain the visible version. + pub fn seek_upper(&mut self, needle: &[u8], _seqno: SeqNo) -> bool { if !self .decoder .inner_mut() @@ -112,15 +133,10 @@ impl<'a> Iter<'a> { } } - pub fn seek_exclusive(&mut self, needle: &[u8]) -> bool { - // Exclusive lower bound: identical to `seek`, except we must not yield entries equal to - // `needle`. We therefore keep consuming while keys compare equal and only stop once we - // observe a strictly greater key. - if !self - .decoder - .inner_mut() - .seek(|head_key, _| head_key < needle, false) - { + pub fn seek_exclusive(&mut self, needle: &[u8], seqno: SeqNo) -> bool { + // Exclusive lower bound: same seqno-aware binary search, but the linear + // scan below skips entries equal to `needle`. + if !self.seek_to_key_seqno(needle, seqno) { return false; } @@ -144,9 +160,11 @@ impl<'a> Iter<'a> { } } - pub fn seek_upper_exclusive(&mut self, needle: &[u8]) -> bool { - // Exclusive upper bound: mirror of `seek_upper`. We must not include entries equal to - // `needle`, so we consume equals from the high end until we see a strictly smaller key. + /// Reverse exclusive seek: position at the last key `< needle`. + /// + /// See [`seek_upper`] for why `seqno` is accepted but unused in reverse + /// seeks. 
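+    ///
+    /// Illustrative sketch: over keys `a b b c` with `needle = b`, the
+    /// iterator is positioned at `a`, the last key strictly below `b`.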
+ pub fn seek_upper_exclusive(&mut self, needle: &[u8], _seqno: SeqNo) -> bool { if !self .decoder .inner_mut() diff --git a/src/table/data_block/iter_test.rs b/src/table/data_block/iter_test.rs index 8ff8fcdc0..1086d7b65 100644 --- a/src/table/data_block/iter_test.rs +++ b/src/table/data_block/iter_test.rs @@ -5,7 +5,7 @@ mod tests { block::{BlockType, Header, ParsedItem}, Block, DataBlock, }, - Checksum, InternalValue, Slice, + Checksum, InternalValue, SeqNo, Slice, ValueType::{Tombstone, Value}, }; use test_log::test; @@ -71,8 +71,8 @@ mod tests { { let mut iter = data_block.iter(); - iter.seek(&10u64.to_be_bytes()); - iter.seek_upper(&110u64.to_be_bytes()); + iter.seek(&10u64.to_be_bytes(), SeqNo::MAX); + iter.seek_upper(&110u64.to_be_bytes(), SeqNo::MAX); let iter = iter.map(|x| x.materialize(data_block.as_slice())); assert_eq!( @@ -83,8 +83,8 @@ mod tests { { let mut iter: crate::table::data_block::Iter<'_> = data_block.iter(); - iter.seek(&10u64.to_be_bytes()); - iter.seek_upper(&110u64.to_be_bytes()); + iter.seek(&10u64.to_be_bytes(), SeqNo::MAX); + iter.seek_upper(&110u64.to_be_bytes(), SeqNo::MAX); let iter = iter.map(|x| x.materialize(data_block.as_slice())); assert_eq!( @@ -95,8 +95,8 @@ mod tests { { let mut iter = data_block.iter(); - iter.seek(&10u64.to_be_bytes()); - iter.seek_upper(&110u64.to_be_bytes()); + iter.seek(&10u64.to_be_bytes(), SeqNo::MAX); + iter.seek_upper(&110u64.to_be_bytes(), SeqNo::MAX); let mut iter = iter.map(|item| item.materialize(&data_block.inner.data)); let mut count = 0; @@ -145,8 +145,8 @@ mod tests { { let mut iter = data_block.iter(); - iter.seek(&10u64.to_be_bytes()); - iter.seek_upper(&109u64.to_be_bytes()); + iter.seek(&10u64.to_be_bytes(), SeqNo::MAX); + iter.seek_upper(&109u64.to_be_bytes(), SeqNo::MAX); let iter = iter.map(|x| x.materialize(data_block.as_slice())); assert_eq!( @@ -157,8 +157,8 @@ mod tests { { let mut iter: crate::table::data_block::Iter<'_> = data_block.iter(); - iter.seek(&10u64.to_be_bytes()); - iter.seek_upper(&109u64.to_be_bytes()); + iter.seek(&10u64.to_be_bytes(), SeqNo::MAX); + iter.seek_upper(&109u64.to_be_bytes(), SeqNo::MAX); let iter = iter.map(|x| x.materialize(data_block.as_slice())); assert_eq!( @@ -169,8 +169,8 @@ mod tests { { let mut iter = data_block.iter(); - iter.seek(&10u64.to_be_bytes()); - iter.seek_upper(&109u64.to_be_bytes()); + iter.seek(&10u64.to_be_bytes(), SeqNo::MAX); + iter.seek_upper(&109u64.to_be_bytes(), SeqNo::MAX); let mut iter = iter.map(|item| item.materialize(&data_block.inner.data)); let mut count = 0; @@ -218,8 +218,8 @@ mod tests { }); let mut iter = data_block.iter(); - iter.seek(&5u64.to_be_bytes()); - iter.seek_upper(&9u64.to_be_bytes()); + iter.seek(&5u64.to_be_bytes(), SeqNo::MAX); + iter.seek_upper(&9u64.to_be_bytes(), SeqNo::MAX); let mut iter = iter.map(|item| item.materialize(&data_block.inner.data)); let mut count = 0; @@ -345,7 +345,7 @@ mod tests { let mut iter = data_block.iter(); - assert!(iter.seek_upper(b"d"), "should seek"); + assert!(iter.seek_upper(b"d", SeqNo::MAX), "should seek"); let iter = iter.map(|item| item.materialize(&data_block.inner.data)); @@ -386,7 +386,7 @@ mod tests { { let mut iter = data_block.iter(); - assert!(!iter.seek(b"a"), "should not seek"); + assert!(!iter.seek(b"a", SeqNo::MAX), "should not seek"); let iter = iter.map(|item| item.materialize(&data_block.inner.data)); @@ -398,7 +398,7 @@ mod tests { { let mut iter = data_block.iter(); - assert!(!iter.seek_upper(b"g"), "should not seek"); + assert!(!iter.seek_upper(b"g", SeqNo::MAX), 
"should not seek"); let iter = iter.map(|item| item.materialize(&data_block.inner.data)); @@ -410,7 +410,7 @@ mod tests { { let mut iter = data_block.iter(); - assert!(iter.seek_upper(b"b"), "should seek"); + assert!(iter.seek_upper(b"b", SeqNo::MAX), "should seek"); let iter = iter.map(|item| item.materialize(&data_block.inner.data)); @@ -425,7 +425,7 @@ mod tests { { let mut iter = data_block.iter(); - assert!(iter.seek(b"f"), "should seek"); + assert!(iter.seek(b"f", SeqNo::MAX), "should seek"); let iter = iter.map(|item| item.materialize(&data_block.inner.data)); @@ -466,8 +466,8 @@ mod tests { let mut iter = data_block.iter(); - assert!(iter.seek(b"c"), "should seek"); - assert!(iter.seek_upper(b"d"), "should seek"); + assert!(iter.seek(b"c", SeqNo::MAX), "should seek"); + assert!(iter.seek_upper(b"d", SeqNo::MAX), "should seek"); let iter = iter.map(|item| item.materialize(&data_block.inner.data)); @@ -507,7 +507,7 @@ mod tests { let mut iter = data_block.iter(); - assert!(iter.seek_upper(b"b"), "should seek"); + assert!(iter.seek_upper(b"b", SeqNo::MAX), "should seek"); let iter = iter.map(|item| item.materialize(&data_block.inner.data)); @@ -548,8 +548,8 @@ mod tests { { let mut iter = data_block.iter(); - assert!(iter.seek(b"d"), "should seek"); - assert!(iter.seek_upper(b"d"), "should seek"); + assert!(iter.seek(b"d", SeqNo::MAX), "should seek"); + assert!(iter.seek_upper(b"d", SeqNo::MAX), "should seek"); let iter = iter.map(|item| item.materialize(&data_block.inner.data)); @@ -564,8 +564,8 @@ mod tests { { let mut iter = data_block.iter(); - assert!(iter.seek_upper(b"d"), "should seek"); - assert!(iter.seek(b"d"), "should seek"); + assert!(iter.seek_upper(b"d", SeqNo::MAX), "should seek"); + assert!(iter.seek(b"d", SeqNo::MAX), "should seek"); let iter = iter.map(|item| item.materialize(&data_block.inner.data)); @@ -580,8 +580,8 @@ mod tests { { let mut iter = data_block.iter(); - assert!(iter.seek(b"d"), "should seek"); - assert!(iter.seek_upper(b"d"), "should seek"); + assert!(iter.seek(b"d", SeqNo::MAX), "should seek"); + assert!(iter.seek_upper(b"d", SeqNo::MAX), "should seek"); let iter = iter.map(|item| item.materialize(&data_block.inner.data)); @@ -602,8 +602,8 @@ mod tests { { let mut iter = data_block.iter(); - assert!(iter.seek_upper(b"d"), "should seek"); - assert!(iter.seek(b"d"), "should seek"); + assert!(iter.seek_upper(b"d", SeqNo::MAX), "should seek"); + assert!(iter.seek(b"d", SeqNo::MAX), "should seek"); let iter = iter.map(|item| item.materialize(&data_block.inner.data)); @@ -651,8 +651,8 @@ mod tests { { let mut iter = data_block.iter(); - assert!(iter.seek(b"f"), "should seek"); - iter.seek_upper(b"e"); + assert!(iter.seek(b"f", SeqNo::MAX), "should seek"); + iter.seek_upper(b"e", SeqNo::MAX); let mut iter = iter.map(|item| item.materialize(&data_block.inner.data)); @@ -662,8 +662,8 @@ mod tests { { let mut iter = data_block.iter(); - assert!(iter.seek(b"f"), "should seek"); - iter.seek_upper(b"e"); + assert!(iter.seek(b"f", SeqNo::MAX), "should seek"); + iter.seek_upper(b"e", SeqNo::MAX); let mut iter = iter.map(|item| item.materialize(&data_block.inner.data)); @@ -673,8 +673,8 @@ mod tests { { let mut iter = data_block.iter(); - assert!(iter.seek_upper(b"e"), "should seek"); - iter.seek(b"f"); + assert!(iter.seek_upper(b"e", SeqNo::MAX), "should seek"); + iter.seek(b"f", SeqNo::MAX); let mut iter = iter.map(|item| item.materialize(&data_block.inner.data)); @@ -684,8 +684,8 @@ mod tests { { let mut iter = data_block.iter(); - assert!(iter.seek_upper(b"e"), 
"should seek"); - iter.seek(b"f"); + assert!(iter.seek_upper(b"e", SeqNo::MAX), "should seek"); + iter.seek(b"f", SeqNo::MAX); let mut iter = iter.map(|item| item.materialize(&data_block.inner.data)); @@ -721,7 +721,7 @@ mod tests { let mut iter = data_block.iter(); - assert!(iter.seek(b"b"), "should seek correctly"); + assert!(iter.seek(b"b", SeqNo::MAX), "should seek correctly"); let iter = iter.map(|item| item.materialize(&data_block.inner.data)); @@ -758,7 +758,7 @@ mod tests { let mut iter = data_block.iter(); - assert!(iter.seek(b"d"), "should seek correctly"); + assert!(iter.seek(b"d", SeqNo::MAX), "should seek correctly"); let iter = iter.map(|item| item.materialize(&data_block.inner.data)); @@ -798,7 +798,7 @@ mod tests { let mut iter = data_block.iter(); - assert!(iter.seek(b"f"), "should seek correctly"); + assert!(iter.seek(b"f", SeqNo::MAX), "should seek correctly"); let iter = iter.map(|item| item.materialize(&data_block.inner.data)); @@ -838,7 +838,7 @@ mod tests { let mut iter = data_block.iter(); - assert!(!iter.seek(b"a"), "should not find exact match"); + assert!(!iter.seek(b"a", SeqNo::MAX), "should not find exact match"); let iter = iter.map(|item| item.materialize(&data_block.inner.data)); @@ -875,7 +875,7 @@ mod tests { let mut iter = data_block.iter(); - assert!(!iter.seek(b"g"), "should not find exact match"); + assert!(!iter.seek(b"g", SeqNo::MAX), "should not find exact match"); assert!(iter.next().is_none(), "should not collect any items"); } @@ -1270,11 +1270,151 @@ mod tests { assert_eq!(data_block.iter().count(), items.len()); let mut iter = data_block.iter(); - iter.seek(&[0]); - iter.seek_upper(&[0]); + iter.seek(&[0], SeqNo::MAX); + iter.seek_upper(&[0], SeqNo::MAX); assert_eq!(0, iter.count()); Ok(()) } + + /// Verifies that `seek(needle, seqno)` with a seqno-aware predicate still + /// positions the iterator correctly when a key has many versions spanning + /// multiple restart intervals. + #[test] + fn data_block_seek_seqno_aware() -> crate::Result<()> { + // Build a block where key "b" has 10 versions (seqno 10..1) with + // restart_interval=2, so versions span 5 restart intervals. + let mut items = Vec::new(); + for seqno in (1..=10).rev() { + items.push(InternalValue::from_components(b"b", b"", seqno, Value)); + } + + for restart_interval in [1, 2, 3, 5] { + let bytes = DataBlock::encode_into_vec(&items, restart_interval, 0.0)?; + let data_block = DataBlock::new(Block { + data: bytes.into(), + header: Header { + block_type: BlockType::Data, + checksum: Checksum::from_raw(0), + data_length: 0, + uncompressed_length: 0, + }, + }); + + // With SeqNo::MAX, seek behaves like key-only (no seqno filtering). + { + let mut iter = data_block.iter(); + assert!( + iter.seek(b"b", SeqNo::MAX), + "should find key with MAX seqno" + ); + let entry = iter.next().expect("should have entry"); + let materialized = entry.materialize(&data_block.inner.data); + assert_eq!(materialized.key.user_key.as_ref(), b"b"); + // First version returned is the newest (seqno 10). + assert_eq!(materialized.key.seqno, 10); + } + + // With a specific snapshot seqno, the binary search lands on the + // restart interval containing (or nearest to) the target seqno. + // The first entry returned is the head of that interval. 
+ { + let mut iter = data_block.iter(); + assert!(iter.seek(b"b", 5), "should find key with snapshot seqno 5"); + let entry = iter.next().expect("should have entry"); + let materialized = entry.materialize(&data_block.inner.data); + assert_eq!(materialized.key.user_key.as_ref(), b"b"); + // The landing entry's seqno must be >= the snapshot boundary, + // proving the seqno-aware predicate skipped past older intervals. + assert!( + materialized.key.seqno >= 5, + "restart_interval={restart_interval}: landing seqno {} should be >= snapshot 5", + materialized.key.seqno, + ); + // With restart_interval=1 each entry is its own interval, so + // the predicate lands exactly on the target seqno — a key-only + // seek would land on seqno 10 instead. + if restart_interval == 1 { + assert_eq!( + materialized.key.seqno, 5, + "with restart_interval=1, seqno-aware seek must land exactly on target" + ); + } + } + } + + Ok(()) + } + + /// Verifies that `seek` with seqno still works correctly when the block + /// contains multiple distinct keys with versions. + #[test] + fn data_block_seek_seqno_aware_mixed_keys() -> crate::Result<()> { + let items = vec![ + InternalValue::from_components(b"a", b"", 10, Value), + InternalValue::from_components(b"a", b"", 5, Value), + InternalValue::from_components(b"b", b"", 10, Value), + InternalValue::from_components(b"b", b"", 7, Value), + InternalValue::from_components(b"b", b"", 3, Value), + InternalValue::from_components(b"c", b"", 10, Value), + ]; + + for restart_interval in [1, 2, 3] { + let bytes = DataBlock::encode_into_vec(&items, restart_interval, 0.0)?; + let data_block = DataBlock::new(Block { + data: bytes.into(), + header: Header { + block_type: BlockType::Data, + checksum: Checksum::from_raw(0), + data_length: 0, + uncompressed_length: 0, + }, + }); + + // Forward seek with seqno narrows restart interval selection. + { + let mut iter = data_block.iter(); + assert!(iter.seek(b"b", 5), "should find b at snapshot 5"); + let entry = iter.next().expect("should have entry"); + let mat = entry.materialize(&data_block.inner.data); + assert_eq!(mat.key.user_key.as_ref(), b"b"); + // Landing seqno must be >= snapshot boundary. + assert!( + mat.key.seqno >= 5, + "restart_interval={restart_interval}: seqno {} should be >= 5", + mat.key.seqno, + ); + // With restart_interval=1, seqno-aware seek lands on (b,7) — + // the last head with seqno >= 5 — whereas key-only would land + // on (b,10). + if restart_interval == 1 { + assert_eq!(mat.key.seqno, 7); + } + } + + // Exclusive forward seek with seqno. + { + let mut iter = data_block.iter(); + assert!( + iter.seek_exclusive(b"b", 5), + "should find entry > b at snapshot 5" + ); + let entry = iter.next().expect("should have entry"); + let mat = entry.materialize(&data_block.inner.data); + assert_eq!(mat.key.user_key.as_ref(), b"c"); + } + + // Upper seek still works with seqno (predicate unchanged for backward). 
+            {
+                let mut iter = data_block.iter();
+                assert!(iter.seek_upper(b"b", 5), "should find upper bound b");
+                let entry = iter.next_back().expect("should have entry");
+                let mat = entry.materialize(&data_block.inner.data);
+                assert_eq!(mat.key.user_key.as_ref(), b"b");
+            }
+        }
+
+        Ok(())
+    }
 }
diff --git a/src/table/data_block/mod.rs b/src/table/data_block/mod.rs
index 7104ccfaa..18da4451c 100644
--- a/src/table/data_block/mod.rs
+++ b/src/table/data_block/mod.rs
@@ -407,7 +407,6 @@ impl DataBlock {
             .map(|reader| reader.bucket_count())
     }
 
-    // TODO: handle seqno more nicely (make Key generic, so we can do binary search over (key, seqno))
     #[must_use]
     pub fn point_read(&self, needle: &[u8], seqno: SeqNo) -> Option<InternalValue> {
         let iter = if let Some(hash_index_reader) = self.get_hash_index_reader() {
@@ -416,10 +415,10 @@ impl DataBlock {
                     return None;
                 }
                 MARKER_CONFLICT => {
-                    // NOTE: Fallback to binary search
+                    // NOTE: Fallback to seqno-aware binary search
                     let mut iter = self.iter();
 
-                    if !iter.seek(needle) {
+                    if !iter.seek_to_key_seqno(needle, seqno) {
                         return None;
                     }
 
@@ -437,8 +436,9 @@ impl DataBlock {
         } else {
             let mut iter = self.iter();
 
-            // NOTE: Fallback to binary search
-            if !iter.seek(needle) {
+            // NOTE: Seqno-aware binary search reduces linear scanning by skipping most
+            // restart intervals that contain only versions newer than the target seqno
+            if !iter.seek_to_key_seqno(needle, seqno) {
                 return None;
             }
 
@@ -449,14 +449,14 @@ impl DataBlock {
         for item in iter {
             match item.compare_key(needle, &self.inner.data) {
                 std::cmp::Ordering::Greater => {
-                    // We are before our searched key/seqno
+                    // We are past our searched key
                     return None;
                 }
                 std::cmp::Ordering::Equal => {
                     // If key is same as needle, check sequence number
                 }
                 std::cmp::Ordering::Less => {
-                    // We are past our searched key
+                    // We are before our searched key
                     continue;
                 }
             }
@@ -1233,4 +1233,122 @@ mod tests {
 
         Ok(())
     }
+
+    #[test]
+    fn data_block_point_read_seqno_aware_seek() -> crate::Result<()> {
+        // Key "a" with seqno 5,4,3,2,1 — point_read("a", seqno=3)
+        // returns the first version with seqno < 3, i.e., v2 ("a2")
+        let items = [
+            InternalValue::from_components(b"a", b"a5", 5, Value),
+            InternalValue::from_components(b"a", b"a4", 4, Value),
+            InternalValue::from_components(b"a", b"a3", 3, Value),
+            InternalValue::from_components(b"a", b"a2", 2, Value),
+            InternalValue::from_components(b"a", b"a1", 1, Value),
+        ];
+
+        // Test across various restart intervals: at restart_interval=1 every item
+        // is a restart head so binary search lands exactly; at larger intervals it
+        // may scan within the restart range but must still return the correct version.
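+        // (Sketch of the lookup flow exercised below: the hash index is consulted
+        // first, where NOT_FOUND short-circuits to None and CONFLICT falls back
+        // to the seqno-aware binary search; a bounded linear scan then returns
+        // the first entry with key == needle and seqno < snapshot.)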
+ for restart_interval in 1..=4 { + let bytes = DataBlock::encode_into_vec(&items, restart_interval, 0.0)?; + + let data_block = DataBlock::new(Block { + data: bytes.into(), + header: Header { + block_type: BlockType::Data, + checksum: Checksum::from_raw(0), + data_length: 0, + uncompressed_length: 0, + }, + }); + + // seqno=4 → should see version with seqno=3 (first with seqno < 4) + assert_eq!( + Some(items[2].clone()), + data_block.point_read(b"a", 4), + "restart_interval={restart_interval}: seqno=4 should return v3", + ); + + // seqno=3 → should see version with seqno=2 + assert_eq!( + Some(items[3].clone()), + data_block.point_read(b"a", 3), + "restart_interval={restart_interval}: seqno=3 should return v2", + ); + + // seqno=6 → should see latest version (seqno=5) + assert_eq!( + Some(items[0].clone()), + data_block.point_read(b"a", 6), + "restart_interval={restart_interval}: seqno=6 should return v5", + ); + + // seqno=1 → no visible version (all seqno >= 1) + assert!( + data_block.point_read(b"a", 1).is_none(), + "restart_interval={restart_interval}: seqno=1 should return None", + ); + + // Non-existent key + assert!( + data_block.point_read(b"b", SeqNo::MAX).is_none(), + "restart_interval={restart_interval}: key 'b' should not exist", + ); + } + + Ok(()) + } + + #[test] + fn data_block_point_read_seqno_aware_seek_mixed_keys() -> crate::Result<()> { + // Multiple keys with multiple versions + let items = [ + InternalValue::from_components(b"a", b"a3", 3, Value), + InternalValue::from_components(b"a", b"a2", 2, Value), + InternalValue::from_components(b"a", b"a1", 1, Value), + InternalValue::from_components(b"b", b"b5", 5, Value), + InternalValue::from_components(b"b", b"b4", 4, Value), + InternalValue::from_components(b"b", b"b3", 3, Value), + InternalValue::from_components(b"b", b"b2", 2, Value), + InternalValue::from_components(b"b", b"b1", 1, Value), + InternalValue::from_components(b"c", b"c1", 1, Value), + ]; + + for restart_interval in 1..=4 { + let bytes = DataBlock::encode_into_vec(&items, restart_interval, 0.0)?; + + let data_block = DataBlock::new(Block { + data: bytes.into(), + header: Header { + block_type: BlockType::Data, + checksum: Checksum::from_raw(0), + data_length: 0, + uncompressed_length: 0, + }, + }); + + // Read "b" at seqno=4 → should return version with seqno=3 + assert_eq!( + Some(items[5].clone()), + data_block.point_read(b"b", 4), + "restart_interval={restart_interval}: b@4 should return b3", + ); + + // Read "a" at seqno=2 → should return version with seqno=1 + assert_eq!( + Some(items[2].clone()), + data_block.point_read(b"a", 2), + "restart_interval={restart_interval}: a@2 should return a1", + ); + + // Read "c" at seqno=2 → should return version with seqno=1 + assert_eq!( + Some(items[8].clone()), + data_block.point_read(b"c", 2), + "restart_interval={restart_interval}: c@2 should return c1", + ); + } + + Ok(()) + } } diff --git a/src/table/filter/mod.rs b/src/table/filter/mod.rs index fdff471e4..89d829174 100644 --- a/src/table/filter/mod.rs +++ b/src/table/filter/mod.rs @@ -43,9 +43,15 @@ impl BloomConstructionPolicy { /// Returns the estimated filter size in bytes. 
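     ///
     /// For example (illustrative arithmetic, not a new API): with
     /// `BitsPerKey(10)` and `n = 1000` keys, the estimate is
     /// `(10.0 * 1000.0) as usize / 8` = 1250 bytes.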
#[must_use] pub fn estimated_filter_size(&self, n: usize) -> usize { + if n == 0 { + return 0; + } + #[expect( clippy::cast_precision_loss, - reason = "truncation is fine because this is an estimation" + clippy::cast_possible_truncation, + clippy::cast_sign_loss, + reason = "truncation and precision loss are fine because this is an estimation" )] match self { Self::BitsPerKey(bpk) => (*bpk * (n as f32)) as usize / 8, diff --git a/src/table/iter.rs b/src/table/iter.rs index b03b69da1..073a3d31d 100644 --- a/src/table/iter.rs +++ b/src/table/iter.rs @@ -37,20 +37,20 @@ self_cell!( ); impl OwnedDataBlockIter { - fn seek_lower_inclusive(&mut self, needle: &[u8], _seqno: SeqNo) -> bool { - self.with_dependent_mut(|_, m| m.seek(needle /* TODO: , seqno */)) + fn seek_lower_inclusive(&mut self, needle: &[u8], seqno: SeqNo) -> bool { + self.with_dependent_mut(|_, m| m.seek(needle, seqno)) } - fn seek_upper_inclusive(&mut self, needle: &[u8], _seqno: SeqNo) -> bool { - self.with_dependent_mut(|_, m| m.seek_upper(needle /* TODO: , seqno */)) + fn seek_upper_inclusive(&mut self, needle: &[u8], seqno: SeqNo) -> bool { + self.with_dependent_mut(|_, m| m.seek_upper(needle, seqno)) } - fn seek_lower_exclusive(&mut self, needle: &[u8], _seqno: SeqNo) -> bool { - self.with_dependent_mut(|_, m| m.seek_exclusive(needle /* TODO: , seqno */)) + fn seek_lower_exclusive(&mut self, needle: &[u8], seqno: SeqNo) -> bool { + self.with_dependent_mut(|_, m| m.seek_exclusive(needle, seqno)) } - fn seek_upper_exclusive(&mut self, needle: &[u8], _seqno: SeqNo) -> bool { - self.with_dependent_mut(|_, m| m.seek_upper_exclusive(needle /* TODO: , seqno */)) + fn seek_upper_exclusive(&mut self, needle: &[u8], seqno: SeqNo) -> bool { + self.with_dependent_mut(|_, m| m.seek_upper_exclusive(needle, seqno)) } pub fn seek_lower_bound(&mut self, bound: &Bound, seqno: SeqNo) -> bool { @@ -119,6 +119,10 @@ pub struct Iter { } impl Iter { + #[expect( + clippy::too_many_arguments, + reason = "iterator requires full context for block loading" + )] pub fn new( table_id: GlobalTableId, global_seqno: SeqNo, diff --git a/src/table/mod.rs b/src/table/mod.rs index 42e6bc7dc..10ac8a2eb 100644 --- a/src/table/mod.rs +++ b/src/table/mod.rs @@ -440,10 +440,10 @@ impl Table { } /// Tries to recover a table from a file. - #[warn( + #[expect( clippy::too_many_arguments, clippy::too_many_lines, - reason = "TODO: refactor" + reason = "recovery requires many context parameters and is inherently complex" )] pub fn recover( file_path: PathBuf, @@ -617,7 +617,15 @@ impl Table { self.metadata.key_range.overlaps_with_bounds(bounds) } - /// Returns the highest sequence number in the table. + /// Returns the highest effective sequence number in the table. + /// + /// For tables produced by flush/compaction (`global_seqno == 0`), this + /// returns the highest item seqno directly. + /// + /// For tables produced by bulk ingestion (`global_seqno > 0`), items + /// are written with local seqno 0 and the table carries a global offset. + /// The effective seqno of each item is `global_seqno + local_seqno`, + /// which mirrors the translation in [`Table::get`]. #[must_use] pub fn get_highest_seqno(&self) -> SeqNo { self.metadata.seqnos.1 + self.global_seqno() diff --git a/src/table/util.rs b/src/table/util.rs index 45862d152..a85b31e2e 100644 --- a/src/table/util.rs +++ b/src/table/util.rs @@ -28,7 +28,10 @@ pub struct SliceIndexes(pub usize, pub usize); /// Loads a block from disk or block cache, if cached. /// /// Also handles file descriptor opening and caching. 
-#[warn(clippy::too_many_arguments)]
+#[expect(
+    clippy::too_many_arguments,
+    reason = "block loading requires many context parameters"
+)]
 pub fn load_block(
     table_id: GlobalTableId,
     path: &Path,
@@ -56,7 +59,6 @@ pub fn load_block(
             BlockType::Data | BlockType::Meta => {
                 metrics.data_block_load_cached.fetch_add(1, Relaxed);
             }
-            _ => {}
         }
 
         return Ok(block);
@@ -108,7 +110,6 @@ pub fn load_block(
                 .data_block_io_requested
                 .fetch_add(handle.size().into(), Relaxed);
         }
-        _ => {}
     }
 
     // Cache FD
diff --git a/src/tree/mod.rs b/src/tree/mod.rs
index 453e9891c..e63b61031 100644
--- a/src/tree/mod.rs
+++ b/src/tree/mod.rs
@@ -645,6 +645,23 @@ impl AbstractTree for Tree {
             .map(|x| x.value))
     }
 
+    fn multi_get<K: AsRef<[u8]>>(
+        &self,
+        keys: impl IntoIterator<Item = K>,
+        seqno: SeqNo,
+    ) -> crate::Result<Vec<Option<UserValue>>> {
+        let super_version = self.get_version_for_snapshot(seqno);
+
+        keys.into_iter()
+            .map(|key| {
+                Ok(
+                    Self::get_internal_entry_from_version(&super_version, key.as_ref(), seqno)?
+                        .map(|x| x.value),
+                )
+            })
+            .collect()
+    }
+
     fn insert<K: Into<UserKey>, V: Into<UserValue>>(
         &self,
         key: K,
@@ -1012,6 +1029,10 @@ impl Tree {
     }
 
     /// Recovers the level manifest, loading all tables from disk.
+    #[expect(
+        clippy::too_many_lines,
+        reason = "recovery logic is inherently complex"
+    )]
     fn recover_levels<P: AsRef<Path>>(
         tree_path: P,
         tree_id: TreeId,
diff --git a/src/verify.rs b/src/verify.rs
new file mode 100644
index 000000000..7ff68a385
--- /dev/null
+++ b/src/verify.rs
@@ -0,0 +1,217 @@
+// Copyright (c) 2024-present, fjall-rs
+// This source code is licensed under both the Apache 2.0 and MIT License
+// (found in the LICENSE-* files in the repository)
+
+use crate::{checksum::Checksum, table::TableId};
+use std::path::PathBuf;
+
+/// Describes a single integrity error found during verification.
+#[derive(Debug)]
+#[non_exhaustive]
+pub enum IntegrityError {
+    /// Full-file checksum mismatch for an SST table.
+    SstFileCorrupted {
+        /// Table ID
+        table_id: TableId,
+        /// Path to the corrupted file
+        path: PathBuf,
+        /// Checksum stored in the manifest
+        expected: Checksum,
+        /// Checksum computed from disk
+        got: Checksum,
+    },
+
+    /// Full-file checksum mismatch for a blob file.
+    BlobFileCorrupted {
+        /// Blob file ID
+        blob_file_id: u64,
+        /// Path to the corrupted file
+        path: PathBuf,
+        /// Checksum stored in the manifest
+        expected: Checksum,
+        /// Checksum computed from disk
+        got: Checksum,
+    },
+
+    /// I/O error while reading a file during verification.
+    IoError {
+        /// Path to the file that could not be read
+        path: PathBuf,
+        /// The underlying I/O error
+        error: std::io::Error,
+    },
+}
+
+impl std::fmt::Display for IntegrityError {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            Self::SstFileCorrupted {
+                table_id,
+                path,
+                expected,
+                got,
+            } => write!(
+                f,
+                "SST table {table_id} corrupted at {}: expected {expected}, got {got}",
+                path.display()
+            ),
+            Self::BlobFileCorrupted {
+                blob_file_id,
+                path,
+                expected,
+                got,
+            } => write!(
+                f,
+                "blob file {blob_file_id} corrupted at {}: expected {expected}, got {got}",
+                path.display()
+            ),
+            Self::IoError { path, error } => {
+                write!(f, "I/O error reading {}: {}", path.display(), error)
+            }
+        }
+    }
+}
+
+impl std::error::Error for IntegrityError {
+    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
+        match self {
+            Self::IoError { error, .. } => Some(error),
+            _ => None,
+        }
+    }
+}
+
+/// Result of an integrity verification scan.
+///
+/// The `sst_files_checked` and `blob_files_checked` counters reflect
+/// the number of files *attempted* — including those that produced I/O
+/// errors. This lets callers reconcile the total against the manifest
+/// even when some files were unreadable.
+#[derive(Debug)]
+#[non_exhaustive]
+pub struct IntegrityReport {
+    /// Number of SST table files checked (includes I/O errors).
+    pub sst_files_checked: usize,
+
+    /// Number of blob files checked (includes I/O errors).
+    pub blob_files_checked: usize,
+
+    /// Integrity errors found during verification.
+    pub errors: Vec<IntegrityError>,
+}
+
+impl IntegrityReport {
+    /// Returns `true` if no errors were found.
+    #[must_use]
+    pub fn is_ok(&self) -> bool {
+        self.errors.is_empty()
+    }
+
+    /// Total number of files checked (SST + blob).
+    #[must_use]
+    pub fn files_checked(&self) -> usize {
+        self.sst_files_checked + self.blob_files_checked
+    }
+}
+
+/// Computes a streaming XXH3 128-bit checksum for a file without loading it entirely into memory.
+fn stream_checksum(path: &std::path::Path) -> std::io::Result<Checksum> {
+    use std::io::Read;
+
+    let mut reader = std::fs::File::open(path)?;
+    let mut hasher = xxhash_rust::xxh3::Xxh3Default::new();
+    let mut buf = vec![0u8; 64 * 1024];
+
+    loop {
+        let n = match reader.read(&mut buf) {
+            Ok(n) => n,
+            Err(e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
+            Err(e) => return Err(e),
+        };
+        if n == 0 {
+            break;
+        }
+        // Safety: Read::read guarantees n <= buf.len(), so get(..n) always
+        // returns Some. We use .get() instead of direct indexing to satisfy
+        // the crate-wide #[deny(clippy::indexing_slicing)] lint.
+        if let Some(chunk) = buf.get(..n) {
+            hasher.update(chunk);
+        }
+    }
+
+    Ok(Checksum::from_raw(hasher.digest128()))
+}
+
+/// Verifies full-file checksums for all SST and blob files in the given tree.
+///
+/// Each file's content is read from disk and hashed with XXHash-3 128-bit,
+/// then compared against the checksum stored in the version manifest.
+///
+/// This detects silent bit-rot, partial writes, and other on-disk corruption.
+///
+/// Per-file errors (e.g., unreadable files, checksum mismatches) are collected
+/// into [`IntegrityReport::errors`] — the scan always runs to completion.
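+///
+/// Illustrative usage; `tree` stands for any open tree handle:
+///
+/// ```ignore
+/// let report = crate::verify::verify_integrity(&tree);
+/// if !report.is_ok() {
+///     for error in &report.errors {
+///         eprintln!("integrity: {error}");
+///     }
+/// }
+/// ```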
+#[must_use]
+pub fn verify_integrity(tree: &impl crate::AbstractTree) -> IntegrityReport {
+    let version = tree.current_version();
+
+    let mut report = IntegrityReport {
+        sst_files_checked: 0,
+        blob_files_checked: 0,
+        errors: Vec::new(),
+    };
+
+    // Verify all SST table files
+    for table in version.iter_tables() {
+        let path = &*table.path;
+        let expected = table.checksum();
+
+        match stream_checksum(path) {
+            Ok(got) if got != expected => {
+                report.errors.push(IntegrityError::SstFileCorrupted {
+                    table_id: table.id(),
+                    path: (*table.path).clone(),
+                    expected,
+                    got,
+                });
+            }
+            Ok(_) => {}
+            Err(e) => {
+                report.errors.push(IntegrityError::IoError {
+                    path: (*table.path).clone(),
+                    error: e,
+                });
+            }
+        }
+
+        report.sst_files_checked += 1;
+    }
+
+    // Verify all blob files
+    for blob_file in version.blob_files.iter() {
+        let path = blob_file.path();
+        let expected = blob_file.checksum();
+
+        match stream_checksum(path) {
+            Ok(got) if got != expected => {
+                report.errors.push(IntegrityError::BlobFileCorrupted {
+                    blob_file_id: blob_file.id(),
+                    path: path.to_path_buf(),
+                    expected,
+                    got,
+                });
+            }
+            Ok(_) => {}
+            Err(e) => {
+                report.errors.push(IntegrityError::IoError {
+                    path: path.to_path_buf(),
+                    error: e,
+                });
+            }
+        }
+
+        report.blob_files_checked += 1;
+    }
+
+    report
+}
diff --git a/src/version/run.rs b/src/version/run.rs
index 9318f45fe..6f81a3bc0 100644
--- a/src/version/run.rs
+++ b/src/version/run.rs
@@ -12,6 +12,7 @@ pub trait Ranged {
 /// Item inside a run
 ///
 /// May point to an interval [min, max] of tables in the next run.
+#[expect(dead_code, reason = "planned for cascading index optimization")]
 pub struct Indexed<T> {
     inner: T,
     // cascade_indexes: (u32, u32),
 }
@@ -138,6 +139,10 @@ impl Run {
         // find last index where pred holds
         let end = s.iter().rposition(&pred).map_or(start, |i| i + 1);
 
+        #[expect(
+            clippy::expect_used,
+            reason = "start..end are derived from position/rposition on the same slice"
+        )]
         s.get(start..end).expect("should be in range")
     }
diff --git a/src/version/super_version.rs b/src/version/super_version.rs
index 71bf32186..e1b6dd6a5 100644
--- a/src/version/super_version.rs
+++ b/src/version/super_version.rs
@@ -6,7 +6,7 @@ use crate::{
     memtable::Memtable,
     tree::sealed::SealedMemtables,
     version::{persist_version, Version},
-    SeqNo, SequenceNumberCounter,
+    SeqNo, SharedSequenceNumberGenerator, MAX_SEQNO,
 };
 use std::{collections::VecDeque, path::Path, sync::Arc};
 
@@ -110,12 +110,14 @@ impl SuperVersions {
     /// and returns a new version.
     ///
     /// The function takes care of persisting the version changes on disk.
+    // Takes &SharedSequenceNumberGenerator (not &dyn SequenceNumberGenerator)
+    // because Config stores that Arc-based alias and all callers already have that type.
     pub(crate) fn upgrade_version<F: FnOnce(&SuperVersion) -> crate::Result<SuperVersion>>(
         &mut self,
         tree_path: &Path,
         f: F,
-        seqno: &SequenceNumberCounter,
-        visible_seqno: &SequenceNumberCounter,
+        seqno: &SharedSequenceNumberGenerator,
+        visible_seqno: &SharedSequenceNumberGenerator,
     ) -> crate::Result<()> {
         self.upgrade_version_with_seqno(tree_path, f, seqno.next(), visible_seqno)
     }
@@ -131,7 +133,7 @@
         tree_path: &Path,
         f: F,
         seqno: SeqNo,
-        visible_seqno: &SequenceNumberCounter,
+        visible_seqno: &SharedSequenceNumberGenerator,
     ) -> crate::Result<()> {
         let mut next_version = f(&self.latest_version())?;
         next_version.seqno = seqno;
@@ -140,7 +142,9 @@
         persist_version(tree_path, &next_version.version)?;
         self.append_version(next_version);
 
-        visible_seqno.fetch_max(seqno + 1);
+        // Clamp to stay below the reserved MSB range.
+        let next_visible = seqno.saturating_add(1).min(MAX_SEQNO);
+        visible_seqno.fetch_max(next_visible);
 
         Ok(())
     }
diff --git a/src/vlog/blob_file/reader.rs b/src/vlog/blob_file/reader.rs
index c1e3e8739..43563acda 100644
--- a/src/vlog/blob_file/reader.rs
+++ b/src/vlog/blob_file/reader.rs
@@ -29,13 +29,12 @@ impl<'a> Reader<'a> {
     pub fn get(&self, key: &'a [u8], vhandle: &'a ValueHandle) -> crate::Result<UserValue> {
         debug_assert_eq!(vhandle.blob_file_id, self.blob_file.id());
 
-        let add_size = (BLOB_HEADER_LEN as u64) + (key.len() as u64);
+        let add_size = BLOB_HEADER_LEN + key.len();
+        let read_len = (vhandle.on_disk_size as usize)
+            .checked_add(add_size)
+            .ok_or(crate::Error::Unrecoverable)?;
 
-        let value = crate::file::read_exact(
-            self.file,
-            vhandle.offset,
-            (u64::from(vhandle.on_disk_size) + add_size) as usize,
-        )?;
+        let value = crate::file::read_exact(self.file, vhandle.offset, read_len)?;
 
         let mut reader = Cursor::new(&value[..]);
 
@@ -58,7 +57,7 @@ impl<'a> Reader<'a> {
 
         reader.seek(std::io::SeekFrom::Current(key_len.into()))?;
 
-        let raw_data = value.slice((add_size as usize)..);
+        let raw_data = value.slice(add_size..);
 
         {
             let checksum = {
@@ -86,13 +85,19 @@ impl<'a> Reader<'a> {
 
             #[cfg(feature = "lz4")]
             CompressionType::Lz4 => {
-                #[warn(unsafe_code)]
-                let mut builder = unsafe { UserValue::builder_unzeroed(real_val_len as usize) };
+                // NOTE: size cap validation for real_val_len is in PR #7
+                // (feat/#258-security-validate-uncompressedlength-before-decomp)
+                let mut buf = vec![0u8; real_val_len];
 
-                lz4_flex::decompress_into(&raw_data, &mut builder)
+                let bytes_written = lz4_flex::decompress_into(&raw_data, &mut buf)
                     .map_err(|_| crate::Error::Decompress(self.blob_file.0.meta.compression))?;
 
-                builder.freeze().into()
+                // Runtime validation: corrupted data may decompress to fewer bytes
+                if bytes_written != real_val_len {
+                    return Err(crate::Error::Decompress(self.blob_file.0.meta.compression));
+                }
+
+                UserValue::from(buf)
             }
         };
 
@@ -152,4 +157,49 @@ mod tests {
 
         Ok(())
     }
+
+    #[test]
+    #[cfg(feature = "lz4")]
+    fn blob_reader_lz4_corrupted_real_val_len_triggers_decompress_error() -> crate::Result<()> {
+        use byteorder::WriteBytesExt;
+
+        let id_generator = SequenceNumberCounter::default();
+
+        let folder = tempfile::tempdir()?;
+        let mut writer = crate::vlog::BlobFileWriter::new(id_generator, folder.path(), 0, None)?
+            .use_target_size(u64::MAX)
+            .use_compression(CompressionType::Lz4);
+
+        let handle = writer.write(b"a", 0, b"abcdef")?;
+
+        let blob_file = writer.finish()?;
+        let blob_file = blob_file.first().unwrap();
+
+        // Tamper the real_val_len field in the blob file.
+        // Header layout: MAGIC(4) + Checksum(16) + SeqNo(8) + KeyLen(2) + RealValLen(4) + ...
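+        // (That is: MAGIC 4 + Checksum 16 + SeqNo 8 + KeyLen 2 = 30 bytes precede it.)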
+        // RealValLen is at offset 30 from the blob start.
+        let real_val_len_offset = handle.offset + 4 + 16 + 8 + 2;
+
+        {
+            use std::io::{Seek, Write};
+            let mut file = std::fs::OpenOptions::new()
+                .write(true)
+                .open(&blob_file.0.path)?;
+            file.seek(std::io::SeekFrom::Start(real_val_len_offset))?;
+            // Write a corrupted value: original len + 1
+            file.write_u32::<byteorder::BigEndian>(b"abcdef".len() as u32 + 1)?;
+            file.flush()?;
+        }
+
+        let file = File::open(&blob_file.0.path)?;
+        let reader = Reader::new(blob_file, &file);
+
+        match reader.get(b"a", &handle) {
+            Err(crate::Error::Decompress(_)) => { /* expected */ }
+            Ok(_) => panic!("expected Error::Decompress, but got Ok"),
+            Err(other) => panic!("expected Error::Decompress, got: {other:?}"),
+        }
+
+        Ok(())
+    }
 }
diff --git a/src/vlog/mod.rs b/src/vlog/mod.rs
index 5f59a0a7d..c0ecb725c 100644
--- a/src/vlog/mod.rs
+++ b/src/vlog/mod.rs
@@ -90,11 +90,10 @@ pub fn recover_blob_files(
             log::error!("meta section in blob file #{blob_file_id} is missing - maybe the file is corrupted?");
         })?;
 
-        let metadata_slice = crate::file::read_exact(
-            &file,
-            metadata_section.pos(),
-            metadata_section.len() as usize,
-        )?;
+        let metadata_len = usize::try_from(metadata_section.len())
+            .map_err(|_| crate::Error::Unrecoverable)?;
+        let metadata_slice =
+            crate::file::read_exact(&file, metadata_section.pos(), metadata_len)?;
 
         Metadata::from_slice(&metadata_slice)?
     };
diff --git a/tests/custom_seqno_generator.rs b/tests/custom_seqno_generator.rs
new file mode 100644
index 000000000..26e968f45
--- /dev/null
+++ b/tests/custom_seqno_generator.rs
@@ -0,0 +1,105 @@
+// Copyright (c) 2024-present, fjall-rs
+// This source code is licensed under both the Apache 2.0 and MIT License
+// (found in the LICENSE-* files in the repository)
+
+use lsm_tree::{
+    AbstractTree, Config, SeqNo, SequenceNumberCounter, SequenceNumberGenerator,
+    SharedSequenceNumberGenerator, MAX_SEQNO,
+};
+use std::sync::atomic::{AtomicU64, Ordering};
+use std::sync::Arc;
+
+/// A custom generator that starts from a configurable offset,
+/// proving the trait-object wiring works end-to-end.
+///
+/// Enforces the same MSB invariant as [`SequenceNumberCounter`].
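+///
+/// Seqnos above `MAX_SEQNO` are reserved (the MSB range), so `next()` refuses
+/// to allocate past that bound and `fetch_max` clamps to it, mirroring the
+/// clamp in `upgrade_version_with_seqno`.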
+#[derive(Debug)] +struct OffsetGenerator { + counter: AtomicU64, +} + +impl OffsetGenerator { + fn new(start: u64) -> SharedSequenceNumberGenerator { + assert!(start <= MAX_SEQNO, "start must not exceed MAX_SEQNO"); + Arc::new(Self { + counter: AtomicU64::new(start), + }) + } +} + +impl SequenceNumberGenerator for OffsetGenerator { + fn next(&self) -> SeqNo { + match self + .counter + .fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| { + if current >= MAX_SEQNO { + None + } else { + Some(current + 1) + } + }) { + Ok(seqno) => seqno, + Err(current) => panic!("Ran out of sequence numbers (current: {current})"), + } + } + + fn get(&self) -> SeqNo { + self.counter.load(Ordering::Acquire) + } + + fn set(&self, value: SeqNo) { + assert!(value <= MAX_SEQNO, "value must not exceed MAX_SEQNO"); + self.counter.store(value, Ordering::Release); + } + + fn fetch_max(&self, value: SeqNo) { + let clamped = value.min(MAX_SEQNO); + self.counter.fetch_max(clamped, Ordering::AcqRel); + } +} + +#[test] +fn custom_generator_via_builder() -> lsm_tree::Result<()> { + let path = lsm_tree::get_tmp_folder(); + + let seqno = OffsetGenerator::new(1000); + let visible_seqno = OffsetGenerator::new(1000); + + let tree = Config::new( + &path, + SequenceNumberCounter::default(), + SequenceNumberCounter::default(), + ) + .seqno_generator(seqno.clone()) + .visible_seqno_generator(visible_seqno.clone()) + .open()?; + + let s = seqno.next(); + assert_eq!(s, 1000); + tree.insert(b"key", b"value", s); + visible_seqno.fetch_max(s + 1); + + assert_eq!(tree.len(visible_seqno.get(), None)?, 1); + + Ok(()) +} + +#[test] +fn custom_generator_via_new_with_generators() -> lsm_tree::Result<()> { + let path = lsm_tree::get_tmp_folder(); + + let seqno = OffsetGenerator::new(5000); + let visible_seqno = OffsetGenerator::new(5000); + + let tree = Config::new_with_generators(&path, seqno.clone(), visible_seqno.clone()).open()?; + + let s = seqno.next(); + assert_eq!(s, 5000); + tree.insert(b"hello", b"world", s); + visible_seqno.fetch_max(s + 1); + + let val = tree.get(b"hello", visible_seqno.get())?.unwrap(); + assert_eq!(&*val, b"world"); + + Ok(()) +} diff --git a/tests/ingestion_seqno.rs b/tests/ingestion_seqno.rs index 105965ed9..87e36abd6 100644 --- a/tests/ingestion_seqno.rs +++ b/tests/ingestion_seqno.rs @@ -1,4 +1,4 @@ -use lsm_tree::{get_tmp_folder, AbstractTree, Config, SequenceNumberCounter}; +use lsm_tree::{get_tmp_folder, AbstractTree, Config, SeqNo, SequenceNumberCounter}; #[test] fn ingestion_persisted_seqno() -> lsm_tree::Result<()> { @@ -23,3 +23,53 @@ fn ingestion_persisted_seqno() -> lsm_tree::Result<()> { Ok(()) } + +/// Verify that get_highest_persisted_seqno reflects the global offset +/// after mixed insert + ingest, and that ingested data is visible. 
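+///
+/// Effective-seqno refresher (see `Table::get_highest_seqno`): a table ingested
+/// at `global_seqno = g` stores its items at local seqno 0, so its highest
+/// effective seqno is `g + 0 = g`.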
+#[test]
+fn ingestion_seqno_after_regular_inserts() -> lsm_tree::Result<()> {
+    let folder = get_tmp_folder();
+
+    let seqno = SequenceNumberCounter::default();
+    let visible_seqno = SequenceNumberCounter::default();
+
+    let tree = Config::new(&folder, seqno.clone(), visible_seqno.clone()).open()?;
+
+    // Regular inserts advance the seqno counter
+    let s0 = seqno.next();
+    tree.insert("x", "x0", s0);
+    visible_seqno.fetch_max(s0 + 1);
+
+    let s1 = seqno.next();
+    tree.insert("y", "y0", s1);
+    visible_seqno.fetch_max(s1 + 1);
+
+    tree.flush_active_memtable(0)?;
+    assert_eq!(tree.get_highest_persisted_seqno(), Some(s1));
+
+    // Capture counter before ingestion — ingestion allocates this
+    // value as global_seqno via seqno.next()
+    let ingest_global_seqno = seqno.get();
+
+    // Bulk-ingest: items get local seqno 0 but the table carries
+    // a global_seqno offset
+    let mut ingestion = tree.ingestion()?;
+    ingestion.write("a", "a0")?;
+    ingestion.write("b", "b0")?;
+    ingestion.finish()?;
+
+    // effective = global_seqno + local_max (0)
+    let expected_seqno = ingest_global_seqno;
+
+    assert_eq!(
+        tree.get_highest_persisted_seqno(),
+        Some(expected_seqno),
+        "ingested table must report effective seqno (global_seqno + local_seqno)"
+    );
+
+    // Verify data is visible
+    assert!(tree.get("a", SeqNo::MAX)?.is_some());
+    assert!(tree.get("b", SeqNo::MAX)?.is_some());
+
+    Ok(())
+}
diff --git a/tests/multi_get.rs b/tests/multi_get.rs
new file mode 100644
index 000000000..9357d96ca
--- /dev/null
+++ b/tests/multi_get.rs
@@ -0,0 +1,237 @@
+use lsm_tree::{
+    get_tmp_folder, AbstractTree, Config, KvSeparationOptions, SeqNo, SequenceNumberCounter,
+};
+use test_log::test;
+
+#[test]
+fn multi_get_all_existing() -> lsm_tree::Result<()> {
+    let folder = get_tmp_folder();
+
+    let tree = Config::new(
+        &folder,
+        SequenceNumberCounter::default(),
+        SequenceNumberCounter::default(),
+    )
+    .open()?;
+
+    for i in 0..100u64 {
+        tree.insert(format!("key_{i:04}"), format!("value_{i}"), i);
+    }
+
+    tree.flush_active_memtable(0)?;
+
+    let keys: Vec<String> = (0..100u64).map(|i| format!("key_{i:04}")).collect();
+    let results = tree.multi_get(&keys, SeqNo::MAX)?;
+
+    assert_eq!(results.len(), 100);
+    for (i, result) in results.iter().enumerate() {
+        let expected = format!("value_{i}");
+        assert_eq!(
+            result.as_deref(),
+            Some(expected.as_bytes()),
+            "mismatch at index {i}",
+        );
+    }
+
+    Ok(())
+}
+
+#[test]
+fn multi_get_mixed_existing_and_missing() -> lsm_tree::Result<()> {
+    let folder = get_tmp_folder();
+
+    let tree = Config::new(
+        &folder,
+        SequenceNumberCounter::default(),
+        SequenceNumberCounter::default(),
+    )
+    .open()?;
+
+    tree.insert("a", "val_a", 0);
+    tree.insert("c", "val_c", 1);
+    tree.insert("e", "val_e", 2);
+
+    let results = tree.multi_get(["a", "b", "c", "d", "e"], 3)?;
+
+    assert_eq!(results.len(), 5);
+    assert_eq!(results[0].as_deref(), Some(b"val_a".as_slice()));
+    assert_eq!(results[1], None);
+    assert_eq!(results[2].as_deref(), Some(b"val_c".as_slice()));
+    assert_eq!(results[3], None);
+    assert_eq!(results[4].as_deref(), Some(b"val_e".as_slice()));
+
+    Ok(())
+}
+
+#[test]
+fn multi_get_empty_keys() -> lsm_tree::Result<()> {
+    let folder = get_tmp_folder();
+
+    let tree = Config::new(
+        &folder,
+        SequenceNumberCounter::default(),
+        SequenceNumberCounter::default(),
+    )
+    .open()?;
+
+    tree.insert("a", "val_a", 0);
+
+    let results = tree.multi_get(Vec::<&str>::new(), 1)?;
+    assert!(results.is_empty());
+
+    Ok(())
+}
+
+#[test]
+fn multi_get_snapshot_isolation() -> lsm_tree::Result<()> {
+    let folder
= get_tmp_folder(); + + let tree = Config::new( + &folder, + SequenceNumberCounter::default(), + SequenceNumberCounter::default(), + ) + .open()?; + + tree.insert("a", "v1", 0); + tree.insert("b", "v1", 1); + + // Update values at higher seqno + tree.insert("a", "v2", 2); + tree.insert("b", "v2", 3); + + // Read at snapshot seqno=2: should see a=v1, b=v1 + // Snapshot semantics: entry visible iff entry.seqno < snapshot_seqno + // (memtable lookup uses `seqno - 1` as upper bound, see Memtable::get). + // So a@2 (v2) is NOT visible at seqno=2, only a@0 (v1) is. + let results = tree.multi_get(["a", "b"], 2)?; + assert_eq!(results[0].as_deref(), Some(b"v1".as_slice())); + assert_eq!(results[1].as_deref(), Some(b"v1".as_slice())); + + // Read at snapshot seqno=4: should see a=v2, b=v2 + let results = tree.multi_get(["a", "b"], 4)?; + assert_eq!(results[0].as_deref(), Some(b"v2".as_slice())); + assert_eq!(results[1].as_deref(), Some(b"v2".as_slice())); + + Ok(()) +} + +#[test] +fn multi_get_with_tombstones() -> lsm_tree::Result<()> { + let folder = get_tmp_folder(); + + let tree = Config::new( + &folder, + SequenceNumberCounter::default(), + SequenceNumberCounter::default(), + ) + .open()?; + + tree.insert("a", "val_a", 0); + tree.insert("b", "val_b", 1); + tree.remove("a", 2); + + let results = tree.multi_get(["a", "b"], 3)?; + assert_eq!(results[0], None); + assert_eq!(results[1].as_deref(), Some(b"val_b".as_slice())); + + Ok(()) +} + +#[test] +fn multi_get_from_disk() -> lsm_tree::Result<()> { + let folder = get_tmp_folder(); + + let tree = Config::new( + &folder, + SequenceNumberCounter::default(), + SequenceNumberCounter::default(), + ) + .open()?; + + tree.insert("a", "val_a", 0); + tree.insert("b", "val_b", 1); + tree.insert("c", "val_c", 2); + tree.flush_active_memtable(0)?; + + // Insert more to memtable + tree.insert("d", "val_d", 3); + + // Multi-get spanning both disk and memtable + let results = tree.multi_get(["a", "b", "c", "d", "e"], SeqNo::MAX)?; + assert_eq!(results.len(), 5); + assert_eq!(results[0].as_deref(), Some(b"val_a".as_slice())); + assert_eq!(results[1].as_deref(), Some(b"val_b".as_slice())); + assert_eq!(results[2].as_deref(), Some(b"val_c".as_slice())); + assert_eq!(results[3].as_deref(), Some(b"val_d".as_slice())); + assert_eq!(results[4], None); + + Ok(()) +} + +#[test] +fn multi_get_blob_tree_with_kv_separation() -> lsm_tree::Result<()> { + let folder = get_tmp_folder(); + + let tree = Config::new( + &folder, + SequenceNumberCounter::default(), + SequenceNumberCounter::default(), + ) + .with_kv_separation(Some(KvSeparationOptions { + separation_threshold: 1, // separate all values + ..Default::default() + })) + .open()?; + + let big_val_a = b"aaa".repeat(1000); + let big_val_b = b"bbb".repeat(1000); + + tree.insert("a", big_val_a.as_slice(), 0); + tree.insert("b", big_val_b.as_slice(), 1); + tree.insert("c", b"ccc".repeat(1000).as_slice(), 2); + tree.remove("c", 3); + + tree.flush_active_memtable(0)?; + + // Verify blob indirections were created + assert!(tree.blob_file_count() > 0); + + let results = tree.multi_get(["a", "b", "c", "missing"], SeqNo::MAX)?; + + assert_eq!(results.len(), 4); + assert_eq!(results[0].as_deref(), Some(big_val_a.as_slice())); + assert_eq!(results[1].as_deref(), Some(big_val_b.as_slice())); + assert_eq!(results[2], None); // tombstoned + assert_eq!(results[3], None); // never existed + + Ok(()) +} + +#[test] +fn multi_get_unsorted_and_duplicate_keys() -> lsm_tree::Result<()> { + let folder = get_tmp_folder(); + + let tree = 
Config::new( + &folder, + SequenceNumberCounter::default(), + SequenceNumberCounter::default(), + ) + .open()?; + + tree.insert("a", "val_a", 0); + tree.insert("b", "val_b", 1); + tree.insert("c", "val_c", 2); + + // Unsorted keys with a duplicate — results must match input order 1:1 + let results = tree.multi_get(["c", "a", "b", "a", "missing"], 3)?; + + assert_eq!(results.len(), 5); + assert_eq!(results[0].as_deref(), Some(b"val_c".as_slice())); + assert_eq!(results[1].as_deref(), Some(b"val_a".as_slice())); + assert_eq!(results[2].as_deref(), Some(b"val_b".as_slice())); + assert_eq!(results[3].as_deref(), Some(b"val_a".as_slice())); // duplicate + assert_eq!(results[4], None); + + Ok(()) +} diff --git a/tests/tree_contains_prefix.rs b/tests/tree_contains_prefix.rs new file mode 100644 index 000000000..a9c486661 --- /dev/null +++ b/tests/tree_contains_prefix.rs @@ -0,0 +1,207 @@ +use lsm_tree::{ + get_tmp_folder, AbstractTree, Config, KvSeparationOptions, SeqNo, SequenceNumberCounter, +}; +use test_log::test; + +#[test] +fn tree_contains_prefix_empty_tree() -> lsm_tree::Result<()> { + let folder = get_tmp_folder(); + + let tree = Config::new( + &folder, + SequenceNumberCounter::default(), + SequenceNumberCounter::default(), + ) + .open()?; + + assert!(!tree.contains_prefix("abc", SeqNo::MAX, None)?); + assert!(!tree.contains_prefix("", SeqNo::MAX, None)?); + + Ok(()) +} + +#[test] +fn tree_contains_prefix_empty_prefix_nonempty_tree() -> lsm_tree::Result<()> { + let folder = get_tmp_folder(); + + let tree = Config::new( + &folder, + SequenceNumberCounter::default(), + SequenceNumberCounter::default(), + ) + .open()?; + + tree.insert("abc:1", "value1", 0); + tree.insert("def:1", "value2", 1); + + // Empty prefix matches all keys + assert!(tree.contains_prefix("", 2, None)?); + + // Still respects MVCC visibility + assert!(!tree.contains_prefix("", 0, None)?); + assert!(tree.contains_prefix("", 1, None)?); + + // After deleting all keys, empty prefix should not match + tree.remove("abc:1", 2); + tree.remove("def:1", 3); + assert!(!tree.contains_prefix("", 4, None)?); + + Ok(()) +} + +#[test] +fn tree_contains_prefix_basic() -> lsm_tree::Result<()> { + let folder = get_tmp_folder(); + + let tree = Config::new( + &folder, + SequenceNumberCounter::default(), + SequenceNumberCounter::default(), + ) + .open()?; + + tree.insert("abc:1", "value1", 0); + tree.insert("abc:2", "value2", 1); + tree.insert("def:1", "value3", 2); + + assert!(tree.contains_prefix("abc", 3, None)?); + assert!(tree.contains_prefix("def", 3, None)?); + assert!(!tree.contains_prefix("xyz", 3, None)?); + // "ab" is a valid prefix for "abc:*" keys + assert!(tree.contains_prefix("ab", 3, None)?); + + Ok(()) +} + +#[test] +fn tree_contains_prefix_no_match() -> lsm_tree::Result<()> { + let folder = get_tmp_folder(); + + let tree = Config::new( + &folder, + SequenceNumberCounter::default(), + SequenceNumberCounter::default(), + ) + .open()?; + + tree.insert("abc", "value", 0); + tree.insert("abd", "value", 1); + + assert!(!tree.contains_prefix("xyz", 2, None)?); + assert!(!tree.contains_prefix("abe", 2, None)?); + assert!(!tree.contains_prefix("abca", 2, None)?); + + Ok(()) +} + +#[test] +fn tree_contains_prefix_mvcc() -> lsm_tree::Result<()> { + let folder = get_tmp_folder(); + + let tree = Config::new( + &folder, + SequenceNumberCounter::default(), + SequenceNumberCounter::default(), + ) + .open()?; + + // Insert at seqno 4 + tree.insert("abc:1", "value", 4); + + // Not visible at seqno 3 (seqno filter is item_seqno < 
query_seqno) + assert!(!tree.contains_prefix("abc", 3, None)?); + + // Not visible at seqno 4 (strict less-than) + assert!(!tree.contains_prefix("abc", 4, None)?); + + // Visible at seqno 5 + assert!(tree.contains_prefix("abc", 5, None)?); + + // Visible at MAX + assert!(tree.contains_prefix("abc", SeqNo::MAX, None)?); + + Ok(()) +} + +#[test] +fn tree_contains_prefix_after_delete() -> lsm_tree::Result<()> { + let folder = get_tmp_folder(); + + let tree = Config::new( + &folder, + SequenceNumberCounter::default(), + SequenceNumberCounter::default(), + ) + .open()?; + + tree.insert("abc:1", "value", 0); + tree.remove("abc:1", 1); + + // After deletion, prefix should not match + assert!(!tree.contains_prefix("abc", 2, None)?); + + // But at seqno 1 (before delete), it should still be visible + assert!(tree.contains_prefix("abc", 1, None)?); + + Ok(()) +} + +#[test] +fn tree_contains_prefix_after_flush() -> lsm_tree::Result<()> { + let folder = get_tmp_folder(); + + let tree = Config::new( + &folder, + SequenceNumberCounter::default(), + SequenceNumberCounter::default(), + ) + .open()?; + + tree.insert("abc:1", "value1", 0); + tree.insert("abc:2", "value2", 1); + tree.flush_active_memtable(0)?; + + assert!(tree.contains_prefix("abc", 2, None)?); + assert!(!tree.contains_prefix("xyz", 2, None)?); + + Ok(()) +} + +#[test] +fn tree_contains_prefix_blobtree() -> lsm_tree::Result<()> { + let folder = get_tmp_folder(); + + let tree = Config::new( + &folder, + SequenceNumberCounter::default(), + SequenceNumberCounter::default(), + ) + .with_kv_separation(Some(KvSeparationOptions::default())) + .open()?; + + assert!(!tree.contains_prefix("abc", SeqNo::MAX, None)?); + + tree.insert("abc:1", "value1", 0); + tree.insert("abc:2", "value2", 1); + tree.insert("def:1", "value3", 2); + + assert!(tree.contains_prefix("abc", 3, None)?); + assert!(tree.contains_prefix("def", 3, None)?); + assert!(!tree.contains_prefix("xyz", 3, None)?); + + // MVCC visibility + assert!(!tree.contains_prefix("abc", 0, None)?); + assert!(tree.contains_prefix("abc", 1, None)?); + + // After delete + tree.remove("abc:1", 3); + tree.remove("abc:2", 4); + assert!(!tree.contains_prefix("abc", 5, None)?); + + // After flush + tree.insert("ghi:1", "value", 5); + tree.flush_active_memtable(0)?; + assert!(tree.contains_prefix("ghi", 6, None)?); + + Ok(()) +} diff --git a/tests/verify_integrity.rs b/tests/verify_integrity.rs new file mode 100644 index 000000000..a54a0c33b --- /dev/null +++ b/tests/verify_integrity.rs @@ -0,0 +1,379 @@ +use lsm_tree::{ + // AbstractTree must be in scope for enum_dispatch method resolution on AnyTree + get_tmp_folder, + verify, + AbstractTree, + Config, + KvSeparationOptions, + SequenceNumberCounter, +}; +use test_log::test; + +#[test] +fn verify_integrity_clean_tree() -> lsm_tree::Result<()> { + let folder = get_tmp_folder(); + + let tree = Config::new( + &folder, + SequenceNumberCounter::default(), + SequenceNumberCounter::default(), + ) + .open()?; + + for key in ('a'..='z').map(|c| c.to_string()) { + tree.insert(key, b"value", 0); + } + tree.flush_active_memtable(0)?; + + let report = verify::verify_integrity(&tree); + + assert!(report.is_ok(), "clean tree should have no errors"); + assert_eq!(1, report.sst_files_checked); + assert_eq!(0, report.blob_files_checked); + + Ok(()) +} + +#[test] +fn verify_integrity_detect_sst_corruption() -> lsm_tree::Result<()> { + let folder = get_tmp_folder(); + + let tree = Config::new( + &folder, + SequenceNumberCounter::default(), + SequenceNumberCounter::default(), + 
) + .open()?; + + for key in ('a'..='z').map(|c| c.to_string()) { + tree.insert(key, b"value", 0); + } + tree.flush_active_memtable(0)?; + + // Corrupt a byte in the SST file + let version = tree.current_version(); + let table = version.iter_tables().next().unwrap(); + { + use std::io::{Seek, Write}; + let mut f = std::fs::OpenOptions::new().write(true).open(&*table.path)?; + f.seek(std::io::SeekFrom::Start(100))?; + f.write_all(b"CORRUPT")?; + f.sync_all()?; + } + + let report = verify::verify_integrity(&tree); + + assert!(!report.is_ok(), "corrupted tree should have errors"); + assert_eq!(1, report.sst_files_checked); + assert_eq!(1, report.errors.len()); + + // Verify error type + match &report.errors[0] { + verify::IntegrityError::SstFileCorrupted { table_id, .. } => { + assert_eq!(*table_id, table.id()); + } + other => panic!("expected SstFileCorrupted, got: {other}"), + } + + Ok(()) +} + +#[test] +fn verify_integrity_blob_tree_clean() -> lsm_tree::Result<()> { + let folder = get_tmp_folder(); + + let tree = Config::new( + &folder, + SequenceNumberCounter::default(), + SequenceNumberCounter::default(), + ) + .with_kv_separation(Some(KvSeparationOptions::default().separation_threshold(1))) + .open()?; + + for key in ('a'..='z').map(|c| c.to_string()) { + tree.insert(key, b"value", 0); + } + tree.flush_active_memtable(0)?; + + let report = verify::verify_integrity(&tree); + + assert!(report.is_ok(), "clean blob tree should have no errors"); + assert!(report.sst_files_checked > 0); + assert!(report.blob_files_checked > 0); + + Ok(()) +} + +#[test] +fn verify_integrity_detect_blob_corruption() -> lsm_tree::Result<()> { + let folder = get_tmp_folder(); + + let tree = Config::new( + &folder, + SequenceNumberCounter::default(), + SequenceNumberCounter::default(), + ) + .with_kv_separation(Some(KvSeparationOptions::default().separation_threshold(1))) + .open()?; + + for key in ('a'..='z').map(|c| c.to_string()) { + tree.insert(key, b"value", 0); + } + tree.flush_active_memtable(0)?; + + // Corrupt a byte in the blob file + let version = tree.current_version(); + let blob_file = version.blob_files.iter().next().unwrap(); + { + use std::io::{Seek, Write}; + let mut f = std::fs::OpenOptions::new() + .write(true) + .open(blob_file.path())?; + f.seek(std::io::SeekFrom::Start(100))?; + f.write_all(b"CORRUPT")?; + f.sync_all()?; + } + + let report = verify::verify_integrity(&tree); + + assert!(!report.is_ok(), "corrupted blob tree should have errors"); + assert_eq!(1, report.errors.len()); + + match &report.errors[0] { + verify::IntegrityError::BlobFileCorrupted { blob_file_id, .. 
} => { + assert_eq!(*blob_file_id, blob_file.id()); + } + other => panic!("expected BlobFileCorrupted, got: {other}"), + } + + Ok(()) +} + +#[test] +fn verify_integrity_multiple_tables() -> lsm_tree::Result<()> { + let folder = get_tmp_folder(); + + let tree = Config::new( + &folder, + SequenceNumberCounter::default(), + SequenceNumberCounter::default(), + ) + .open()?; + + // Create multiple SST files + for batch in 0..3 { + for i in 0..10 { + let key = format!("batch{batch}_key{i:04}"); + tree.insert(key, b"value", 0); + } + tree.flush_active_memtable(0)?; + } + + let report = verify::verify_integrity(&tree); + + assert!(report.is_ok()); + assert_eq!(3, report.sst_files_checked); + assert_eq!(3, report.files_checked()); + + Ok(()) +} + +#[test] +fn verify_integrity_missing_sst_file() -> lsm_tree::Result<()> { + let folder = get_tmp_folder(); + + let tree = Config::new( + &folder, + SequenceNumberCounter::default(), + SequenceNumberCounter::default(), + ) + .open()?; + + for key in ('a'..='z').map(|c| c.to_string()) { + tree.insert(key, b"value", 0); + } + tree.flush_active_memtable(0)?; + + // Delete the SST file to trigger an IoError + let version = tree.current_version(); + let table = version.iter_tables().next().unwrap(); + std::fs::remove_file(&*table.path)?; + + let report = verify::verify_integrity(&tree); + + assert!(!report.is_ok(), "missing file should produce an error"); + assert_eq!(1, report.errors.len()); + + match &report.errors[0] { + verify::IntegrityError::IoError { path, .. } => { + assert_eq!(path, table.path.as_ref()); + } + other => panic!("expected IoError, got: {other}"), + } + + Ok(()) +} + +#[test] +fn verify_integrity_missing_blob_file() -> lsm_tree::Result<()> { + let folder = get_tmp_folder(); + + let tree = Config::new( + &folder, + SequenceNumberCounter::default(), + SequenceNumberCounter::default(), + ) + .with_kv_separation(Some(KvSeparationOptions::default().separation_threshold(1))) + .open()?; + + for key in ('a'..='z').map(|c| c.to_string()) { + tree.insert(key, b"value", 0); + } + tree.flush_active_memtable(0)?; + + // Delete the blob file to trigger an IoError on blob path + let version = tree.current_version(); + let blob_file = version.blob_files.iter().next().unwrap(); + let blob_path = blob_file.path().to_path_buf(); + std::fs::remove_file(&blob_path)?; + + let report = verify::verify_integrity(&tree); + + assert!(!report.is_ok(), "missing blob file should produce an error"); + assert_eq!(1, report.errors.len()); + + match &report.errors[0] { + verify::IntegrityError::IoError { path, .. 
} => { + assert_eq!(path, &blob_path); + } + other => panic!("expected IoError, got: {other}"), + } + + Ok(()) +} + +#[test] +fn verify_integrity_display_and_error_trait() -> lsm_tree::Result<()> { + let folder = get_tmp_folder(); + + // -- SstFileCorrupted Display -- + let tree = Config::new( + &folder, + SequenceNumberCounter::default(), + SequenceNumberCounter::default(), + ) + .open()?; + + for key in ('a'..='z').map(|c| c.to_string()) { + tree.insert(key, b"value", 0); + } + tree.flush_active_memtable(0)?; + + // Corrupt SST + let version = tree.current_version(); + let table = version.iter_tables().next().unwrap(); + { + use std::io::{Seek, Write}; + let mut f = std::fs::OpenOptions::new().write(true).open(&*table.path)?; + f.seek(std::io::SeekFrom::Start(50))?; + f.write_all(b"XX")?; + f.sync_all()?; + } + + let report = verify::verify_integrity(&tree); + assert!(!report.errors.is_empty()); + + let msg = format!("{}", &report.errors[0]); + assert!(msg.contains("SST table"), "SstFileCorrupted Display: {msg}"); + assert!( + msg.contains("corrupted at"), + "SstFileCorrupted Display: {msg}" + ); + + // Error::source for non-IoError should be None + assert!( + std::error::Error::source(&report.errors[0]).is_none(), + "SstFileCorrupted should have no source" + ); + + drop(tree); + + // -- BlobFileCorrupted Display -- + let folder2 = get_tmp_folder(); + let tree2 = Config::new( + &folder2, + SequenceNumberCounter::default(), + SequenceNumberCounter::default(), + ) + .with_kv_separation(Some(KvSeparationOptions::default().separation_threshold(1))) + .open()?; + + for key in ('a'..='z').map(|c| c.to_string()) { + tree2.insert(key, b"value", 0); + } + tree2.flush_active_memtable(0)?; + + // Corrupt blob + let version2 = tree2.current_version(); + let blob_file = version2.blob_files.iter().next().unwrap(); + { + use std::io::{Seek, Write}; + let mut f = std::fs::OpenOptions::new() + .write(true) + .open(blob_file.path())?; + f.seek(std::io::SeekFrom::Start(50))?; + f.write_all(b"XX")?; + f.sync_all()?; + } + + let report2 = verify::verify_integrity(&tree2); + let blob_err = report2 + .errors + .iter() + .find(|e| matches!(e, verify::IntegrityError::BlobFileCorrupted { .. })); + assert!(blob_err.is_some(), "should have BlobFileCorrupted error"); + + let msg = format!("{}", blob_err.unwrap()); + assert!( + msg.contains("blob file"), + "BlobFileCorrupted Display: {msg}" + ); + assert!( + msg.contains("corrupted at"), + "BlobFileCorrupted Display: {msg}" + ); + + drop(tree2); + + // -- IoError Display + Error::source -- + let folder3 = get_tmp_folder(); + let tree3 = Config::new( + &folder3, + SequenceNumberCounter::default(), + SequenceNumberCounter::default(), + ) + .open()?; + + for key in ('a'..='z').map(|c| c.to_string()) { + tree3.insert(key, b"value", 0); + } + tree3.flush_active_memtable(0)?; + + let version3 = tree3.current_version(); + let table3 = version3.iter_tables().next().unwrap(); + std::fs::remove_file(&*table3.path)?; + + let report3 = verify::verify_integrity(&tree3); + assert!(!report3.errors.is_empty()); + + let msg = format!("{}", &report3.errors[0]); + assert!(msg.contains("I/O error reading"), "IoError Display: {msg}"); + + // Error::source for IoError should return the underlying io::Error + assert!( + std::error::Error::source(&report3.errors[0]).is_some(), + "IoError should have a source" + ); + + Ok(()) +}
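A note on the snapshot rule the tests above rely on: an item written at seqno `s` is visible to a snapshot at seqno `q` only if `s < q` (strictly less than). A minimal self-contained sketch of that rule; the `visible` helper is illustrative, not a crate API:

```rust
// Mirrors tree_contains_prefix_mvcc and multi_get_snapshot_isolation above:
// MVCC visibility is a strict less-than comparison on sequence numbers.
fn visible(item_seqno: u64, snapshot_seqno: u64) -> bool {
    item_seqno < snapshot_seqno
}

fn main() {
    // An item inserted at seqno 4...
    assert!(!visible(4, 3)); // ...is hidden from a snapshot at seqno 3,
    assert!(!visible(4, 4)); // still hidden at 4 (strict less-than),
    assert!(visible(4, 5)); // and visible from 5 onwards.
}
```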