Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ jobs:
# https://docs.github.com/en/actions/learn-github-actions/contexts#context-availability
strategy:
matrix:
msrv: ["1.80.0"] # `Lazy` types stabilized in "1.80".
msrv: ["1.85.1"] # 2024 Edition.
name: ubuntu / ${{ matrix.msrv }}
steps:
- uses: actions/checkout@v4
Expand Down
78 changes: 39 additions & 39 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,45 +45,45 @@ jobs:
# https://github.com/rust-lang/cargo/issues/6669
- name: cargo test --doc
run: cargo test --locked --all-features --doc
minimal:
# This action chooses the oldest version of the dependencies permitted by Cargo.toml to ensure
# that this crate is compatible with the minimal version that this crate and its dependencies
# require. This will pickup issues where this create relies on functionality that was introduced
# later than the actual version specified (e.g., when we choose just a major version, but a
# method was added after this version).
#
# This particular check can be difficult to get to succeed as often transitive dependencies may
# be incorrectly specified (e.g., a dependency specifies 1.0 but really requires 1.1.5). There
# is an alternative flag available -Zdirect-minimal-versions that uses the minimal versions for
# direct dependencies of this crate, while selecting the maximal versions for the transitive
# dependencies. Alternatively, you can add a line in your Cargo.toml to artificially increase
# the minimal dependency, which you do with e.g.:
# ```toml
# # for minimal-versions
# [target.'cfg(any())'.dependencies]
# openssl = { version = "0.10.55", optional = true } # needed to allow foo to build with -Zminimal-versions
# ```
# The optional = true is necessary in case that dependency isn't otherwise transitively required
# by your library, and the target bit is so that this dependency edge never actually affects
# Cargo build order. See also
# https://github.com/jonhoo/fantoccini/blob/fde336472b712bc7ebf5b4e772023a7ba71b2262/Cargo.toml#L47-L49.
# This action is run on ubuntu with the stable toolchain, as it is not expected to fail
runs-on: ubuntu-latest
name: ubuntu / stable / minimal-versions
steps:
- uses: actions/checkout@v4
with:
submodules: true
- name: Install stable
uses: dtolnay/rust-toolchain@stable
- name: Install nightly for -Zminimal-versions
uses: dtolnay/rust-toolchain@nightly
- name: rustup default stable
run: rustup default stable
- name: cargo update -Zminimal-versions
run: cargo +nightly update -Zminimal-versions
- name: cargo test
run: cargo test --locked --all-features --all-targets
# minimal:
# # This action chooses the oldest version of the dependencies permitted by Cargo.toml to ensure
# # that this crate is compatible with the minimal version that this crate and its dependencies
# # require. This will pickup issues where this create relies on functionality that was introduced
# # later than the actual version specified (e.g., when we choose just a major version, but a
# # method was added after this version).
# #
# # This particular check can be difficult to get to succeed as often transitive dependencies may
# # be incorrectly specified (e.g., a dependency specifies 1.0 but really requires 1.1.5). There
# # is an alternative flag available -Zdirect-minimal-versions that uses the minimal versions for
# # direct dependencies of this crate, while selecting the maximal versions for the transitive
# # dependencies. Alternatively, you can add a line in your Cargo.toml to artificially increase
# # the minimal dependency, which you do with e.g.:
# # ```toml
# # # for minimal-versions
# # [target.'cfg(any())'.dependencies]
# # openssl = { version = "0.10.55", optional = true } # needed to allow foo to build with -Zminimal-versions
# # ```
# # The optional = true is necessary in case that dependency isn't otherwise transitively required
# # by your library, and the target bit is so that this dependency edge never actually affects
# # Cargo build order. See also
# # https://github.com/jonhoo/fantoccini/blob/fde336472b712bc7ebf5b4e772023a7ba71b2262/Cargo.toml#L47-L49.
# # This action is run on ubuntu with the stable toolchain, as it is not expected to fail
# runs-on: ubuntu-latest
# name: ubuntu / stable / minimal-versions
# steps:
# - uses: actions/checkout@v4
# with:
# submodules: true
# - name: Install stable
# uses: dtolnay/rust-toolchain@stable
# - name: Install nightly for -Zminimal-versions
# uses: dtolnay/rust-toolchain@nightly
# - name: rustup default stable
# run: rustup default stable
# - name: cargo update -Zminimal-versions
# run: cargo +nightly update -Zminimal-versions
# - name: cargo test
# run: cargo test --locked --all-features --all-targets
# # This project only works on Linux.
# os-check:
# # run cargo test on mac and windows
Expand Down
31 changes: 12 additions & 19 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,32 +1,25 @@
[package]
name = "async-bpm"
version = "0.1.0"
edition = "2021"
edition = "2024"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
async-channel = "2.3.1"
core_affinity = "0.7.0"
derivative = "2.0.0"
libc = "0.2.0"
rand = "0.8.0"
scc = "2.0.0"
async-channel = "2.5.0"
core_affinity = "0.8.3"
derivative = "2.2.0"
libc = "0.2.176"
rand = "0.9.2"
rand_distr = "0.5.1"
scc = "3.1.1"
tokio-uring = "0.5.0"
tracing = "0.1.0"
zipf = "7.0.0"

# Pin version "1.27" for a missing method.
tokio = { version = "1.27.0", features = ["macros", "rt", "sync", "time"] }

# Pin more recent versions for `-Zminimal-versions`.
bitflags = "1.1.0" # For tokio-uring -> io-uring -> bitflags.
proc-macro2 = "1.0.60" # For a missing feature.
slab = "0.4.4" # For a missing method.
tracing = "0.1.41"
tokio = { version = "1.47.1", features = ["macros", "rt", "sync", "time"] }

[dev-dependencies]
tokio = { version = "1.27.0", features = ["full"] }
tracing-subscriber = { version = "0.3.0", features = ["env-filter"] }
tokio = { version = "1.47.1", features = ["full"] }
tracing-subscriber = { version = "0.3.20", features = ["env-filter"] }

[profile.dev]
panic = "abort"
Expand Down
12 changes: 6 additions & 6 deletions src/bpm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@
//! pool manager would work.

use crate::{
page::{Page, PageHandle, PageId, PAGE_SIZE},
storage::{Frame, FrameGroup, StorageManager, FRAME_GROUP_SIZE},
page::{PAGE_SIZE, Page, PageHandle, PageId},
storage::{FRAME_GROUP_SIZE, Frame, FrameGroup, StorageManager},
};
use rand::prelude::*;
use scc::HashMap;
use std::sync::{atomic::AtomicBool, Arc, OnceLock};
use std::sync::{Arc, OnceLock, atomic::AtomicBool};
use std::{future::Future, io::Result};
use tokio::sync::RwLock;
use tokio::task;
Expand Down Expand Up @@ -149,7 +149,7 @@ impl BufferPoolManager {
// Get the page if it exists, otherwise create a new one return that.
let page = self
.pages
.entry(*pid)
.entry_sync(*pid)
.or_insert_with(|| {
trace!("Creating a new `Page`.");

Expand All @@ -174,8 +174,8 @@ impl BufferPoolManager {
///
/// Intended for use by an eviction algorithm.
pub(crate) fn get_random_frame_group(&self) -> Arc<FrameGroup> {
let mut rng = rand::thread_rng();
let index = rng.gen_range(0..self.frame_groups.len());
let mut rng = rand::rng();
let index = rng.random_range(0..self.frame_groups.len());

self.get_frame_group(index)
}
Expand Down
12 changes: 6 additions & 6 deletions src/page/page_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@
//! one of the methods on [`PageHandle`].

use crate::bpm::BufferPoolManager;
use crate::page::page_guard::{ReadPageGuard, WritePageGuard};
use crate::page::Page;
use crate::page::page_guard::{ReadPageGuard, WritePageGuard};
use crate::storage::{Frame, StorageManagerHandle};
use derivative::Derivative;
use std::io::Result;
use std::ops::Deref;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::atomic::Ordering;
use tokio::sync::RwLockWriteGuard;
use tracing::field::Empty;
use tracing::{info, instrument, trace, warn};
Expand Down Expand Up @@ -44,7 +44,7 @@ impl PageHandle {
///
/// Raises an error if an I/O error occurs while trying to load the data from disk into memory.
#[instrument(skip(self), err, fields(page = ?self.page.pid))]
pub async fn read(&self) -> Result<ReadPageGuard> {
pub async fn read(&self) -> Result<ReadPageGuard<'_>> {
info!("Reading `PageHandle`.");

// Optimization: attempt to read only if we observe that the `is_loaded` flag is set.
Expand Down Expand Up @@ -84,7 +84,7 @@ impl PageHandle {
///
/// Raises an error if an I/O error occurs while trying to load the data from disk into memory.
#[instrument(skip(self), err, fields(page = ?self.page.pid))]
pub async fn try_read(&self) -> Result<Option<ReadPageGuard>> {
pub async fn try_read(&self) -> Result<Option<ReadPageGuard<'_>>> {
info!("Trying to read `PageHandle`.");

// Optimization: attempt to read only if we observe that the `is_loaded` flag is set.
Expand Down Expand Up @@ -127,7 +127,7 @@ impl PageHandle {
///
/// Raises an error if an I/O error occurs while trying to load the data from disk into memory.
#[instrument(skip(self), err, fields(page = ?self.page.pid))]
pub async fn write(&self) -> Result<WritePageGuard> {
pub async fn write(&self) -> Result<WritePageGuard<'_>> {
info!("Writing `PageHandle`.");

let mut write_guard = self.page.frame.write().await;
Expand Down Expand Up @@ -157,7 +157,7 @@ impl PageHandle {
///
/// Raises an error if an I/O error occurs while trying to load the data from disk into memory.
#[instrument(skip(self), err, fields(page = ?self.page.pid))]
pub async fn try_write(&self) -> Result<Option<WritePageGuard>> {
pub async fn try_write(&self) -> Result<Option<WritePageGuard<'_>>> {
info!("Trying to write `PageHandle`.");

let Ok(mut write_guard) = self.page.frame.try_write() else {
Expand Down
4 changes: 2 additions & 2 deletions src/storage/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@
//! with the the kernel to avoid unnecessary `memcpy`s from the kernel's internal buffers into
//! user-space buffers.

use crate::storage::frame_group::{EvictionState, FrameGroup, FRAME_GROUP_SIZE};
use crate::storage::frame_group::{EvictionState, FRAME_GROUP_SIZE, FrameGroup};
use crate::{
bpm::BufferPoolManager,
page::{Page, PAGE_SIZE},
page::{PAGE_SIZE, Page},
};
use std::{
ops::{Deref, DerefMut},
Expand Down
2 changes: 1 addition & 1 deletion src/storage/frame_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ use crate::storage::storage_manager::StorageManager;
use async_channel::{Receiver, Sender};
use std::io::Result;
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc, Mutex,
atomic::{AtomicUsize, Ordering},
};

/// The number of frames in a [`FrameGroup`].
Expand Down
4 changes: 2 additions & 2 deletions src/storage/storage_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ use crate::{page::PageId, storage::frame::Frame};
use std::io::Result;
use std::ops::Deref;
use std::os::unix::fs::OpenOptionsExt;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::LazyLock;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::{rc::Rc, sync::OnceLock};
use tokio_uring::fs::File;
use tokio_uring::BufResult;
use tokio_uring::fs::File;

/// The name of the database's file.
pub const DATABASE_NAME: &str = "bpm.db";
Expand Down
2 changes: 1 addition & 1 deletion tests/basic.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use async_bpm::{page::PageId, BufferPoolManager};
use async_bpm::{BufferPoolManager, page::PageId};
use std::ops::DerefMut;
use std::thread;

Expand Down
22 changes: 12 additions & 10 deletions tests/throughput.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
use async_bpm::{
page::{PageId, PAGE_SIZE},
BufferPoolManager, IO_OPERATIONS,
page::{PAGE_SIZE, PageId},
};
use core_affinity::CoreId;
use rand::thread_rng;
use rand::{distributions::Distribution, Rng};
use rand::rng;
use rand::{Rng, distr::Distribution};
use rand_distr::Zipf;
use std::{
ops::{Deref, DerefMut},
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
atomic::{AtomicUsize, Ordering},
},
thread,
};
Expand All @@ -18,7 +19,6 @@ use tokio::{
task::{JoinHandle, JoinSet},
};
use tracing::info;
use zipf::ZipfDistribution;

const SECONDS: usize = 300;

Expand Down Expand Up @@ -63,7 +63,9 @@ const READ: bool = true;
fn throughput() {
tracing_subscriber::fmt::init();

info!("Find tasks: {FIND_TASKS}, Find Threads: {FIND_THREADS}, Scan Tasks: {SCAN_TASKS}, Scan Threads: {SCAN_THREADS}");
info!(
"Find tasks: {FIND_TASKS}, Find Threads: {FIND_THREADS}, Scan Tasks: {SCAN_TASKS}, Scan Threads: {SCAN_THREADS}"
);

BufferPoolManager::initialize(FRAMES, STORAGE_PAGES);

Expand Down Expand Up @@ -178,8 +180,8 @@ fn spawn_find_task(barrier: Arc<Barrier>) -> JoinHandle<()> {
let bpm = BufferPoolManager::get();

// Since half of the threads are solely reading, we double the writers here.
let zipf = ZipfDistribution::new(STORAGE_PAGES, ZIPF_EXP).unwrap();
let mut rng = rand::thread_rng();
let zipf = Zipf::new(STORAGE_PAGES as f64, ZIPF_EXP).unwrap();
let mut rng = rand::rng();

BufferPoolManager::spawn_local(async move {
let mut handles = Vec::with_capacity(TASK_ACCESSES);
Expand Down Expand Up @@ -211,8 +213,8 @@ fn spawn_scan_task(barrier: Arc<Barrier>) -> JoinHandle<()> {
BufferPoolManager::spawn_local(async move {
barrier.wait().await;

let mut rng = thread_rng();
let start = rng.gen_range(0..STORAGE_PAGES);
let mut rng = rng();
let start = rng.random_range(0..STORAGE_PAGES);

// Continuously scan all pages.
loop {
Expand Down