Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions examples/hybrid_full.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,16 @@

use std::{hash::BuildHasherDefault, num::NonZeroUsize, sync::Arc};

use anyhow::Result;
use chrono::Datelike;
use foyer::{
AdmitAllPicker, DirectFsDeviceOptions, Engine, FifoPicker, HybridCache, HybridCacheBuilder, HybridCachePolicy,
IopsCounter, LargeEngineOptions, LruConfig, RecoverMode, RejectAllPicker, RuntimeOptions, SmallEngineOptions,
Throttle, TokioRuntimeOptions, TombstoneLogConfigBuilder,
IopsCounter, LargeEngineOptions, LruConfig, RecoverMode, RejectAllPicker, Result, RuntimeOptions,
SmallEngineOptions, Throttle, TokioRuntimeOptions, TombstoneLogConfigBuilder,
};
use tempfile::tempdir;

#[tokio::main]
async fn main() -> Result<()> {
async fn main() -> anyhow::Result<()> {
let dir = tempdir()?;

let hybrid: HybridCache<u64, String> = HybridCacheBuilder::new()
Expand Down Expand Up @@ -110,7 +109,8 @@ async fn main() -> Result<()> {
async fn mock() -> Result<String> {
let now = chrono::Utc::now();
if format!("{}{}{}", now.year(), now.month(), now.day()) == "20230512" {
return Err(anyhow::anyhow!("Hi, time traveler!"));
let e: Box<dyn std::error::Error + Send + Sync + 'static> = "Hi, time traveler!".into();
return Err(e.into());
}
Ok("Hello, foyer.".to_string())
}
6 changes: 3 additions & 3 deletions foyer-memory/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ use foyer_common::{
use mixtrics::{metrics::BoxedRegistry, registry::noop::NoopMetricsRegistry};
use pin_project::pin_project;
use serde::{Deserialize, Serialize};
use tokio::sync::oneshot;

use crate::{
error::Error,
eviction::{
fifo::{Fifo, FifoConfig},
lfu::{Lfu, LfuConfig},
Expand Down Expand Up @@ -945,7 +945,7 @@ impl<K, V, ER, S, P> Future for Fetch<K, V, ER, S, P>
where
K: Key,
V: Value,
ER: From<oneshot::error::RecvError>,
ER: From<Error>,
S: HashBuilder,
P: Properties,
{
Expand Down Expand Up @@ -1165,7 +1165,7 @@ mod tests {
let entry = cache
.fetch(i, || async move {
tokio::time::sleep(Duration::from_micros(10)).await;
Ok::<_, tokio::sync::oneshot::error::RecvError>(i)
Ok::<_, Error>(i)
})
.await
.unwrap();
Expand Down
8 changes: 8 additions & 0 deletions foyer-memory/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,21 @@
/// Config error.
#[error("config error: {0}")]
ConfigError(String),
/// Wait error.
#[error("wait for concurrent fetch result error: {0}")]
Wait(Box<dyn std::error::Error + Send + Sync + 'static>),
}

impl Error {
/// Combine multiple errors into one error.
pub fn multiple(errs: Vec<Error>) -> Self {
Self::Multiple(MultipleError(errs))
}

/// Error on waiting for concurrrent fetch result.
pub fn wait(err: impl std::error::Error + Send + Sync + 'static) -> Self {
Self::Wait(Box::new(err))
}

Check warning on line 40 in foyer-memory/src/error.rs

View check run for this annotation

Codecov / codecov/patch

foyer-memory/src/error.rs#L38-L40

Added lines #L38 - L40 were not covered by tests
}

#[derive(thiserror::Error, Debug)]
Expand Down
4 changes: 2 additions & 2 deletions foyer-memory/src/raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -988,7 +988,7 @@ where
impl<E, ER, S, I> Future for RawFetchInner<E, ER, S, I>
where
E: Eviction,
ER: From<oneshot::error::RecvError>,
ER: From<Error>,
S: HashBuilder,
I: Indexer<Eviction = E>,
{
Expand All @@ -997,7 +997,7 @@ where
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.project() {
RawFetchInnerProj::Hit(opt) => Poll::Ready(Ok(opt.take().unwrap()).into()),
RawFetchInnerProj::Wait(waiter) => waiter.poll(cx).map_err(|err| err.into()).map(Diversion::from),
RawFetchInnerProj::Wait(waiter) => waiter.poll(cx).map_err(|e| Error::wait(e).into()).map(Diversion::from),
RawFetchInnerProj::Miss(handle) => handle.poll(cx).map(|join| join.unwrap()),
}
}
Expand Down
2 changes: 1 addition & 1 deletion foyer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ strict_assertions = [
]

[dependencies]
anyhow = { workspace = true }
equivalent = { workspace = true }
fastrace = { workspace = true, optional = true }
foyer-common = { workspace = true }
Expand All @@ -43,6 +42,7 @@ foyer-storage = { workspace = true }
mixtrics = { workspace = true }
pin-project = { workspace = true }
serde = { workspace = true }
thiserror = { workspace = true }
tracing = { workspace = true }

[target.'cfg(madsim)'.dependencies]
Expand Down
8 changes: 5 additions & 3 deletions foyer/src/hybrid/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@ use foyer_memory::{Cache, CacheBuilder, EvictionConfig, Weighter};
use foyer_storage::{AdmissionPicker, Compression, DeviceOptions, Engine, RecoverMode, RuntimeOptions, StoreBuilder};
use mixtrics::{metrics::BoxedRegistry, registry::noop::NoopMetricsRegistry};

use super::cache::{HybridCacheOptions, HybridCachePipe};
use crate::{HybridCache, HybridCachePolicy, HybridCacheProperties};
use crate::hybrid::{
cache::{HybridCache, HybridCacheOptions, HybridCachePipe, HybridCachePolicy, HybridCacheProperties},
error::Result,
};

/// Hybrid cache builder.
pub struct HybridCacheBuilder<K, V> {
Expand Down Expand Up @@ -304,7 +306,7 @@ where
}

/// Build and open the hybrid cache with the given configurations.
pub async fn build(self) -> anyhow::Result<HybridCache<K, V, S>> {
pub async fn build(self) -> Result<HybridCache<K, V, S>> {
let builder = self.builder;

let piped = !builder.is_noop() && self.options.policy == HybridCachePolicy::WriteOnEviction;
Expand Down
53 changes: 25 additions & 28 deletions foyer/src/hybrid/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,12 @@
use foyer_storage::{IoThrottler, Load, Statistics, Store};
use pin_project::pin_project;
use serde::{Deserialize, Serialize};
use tokio::sync::oneshot;

use super::writer::HybridCacheStorageWriter;
use crate::{HybridCacheBuilder, HybridCacheWriter};
use crate::hybrid::{
builder::HybridCacheBuilder,
error::{Error, Result},
writer::{HybridCacheStorageWriter, HybridCacheWriter},
};

#[cfg(feature = "tracing")]
macro_rules! root_span {
Expand Down Expand Up @@ -316,7 +318,7 @@
memory: Cache<K, V, S, HybridCacheProperties>,
storage: Store<K, V, S, HybridCacheProperties>,
flush_on_close: bool,
) -> anyhow::Result<()> {
) -> Result<()> {
if closed.fetch_or(true, Ordering::Relaxed) {
return Ok(());
}
Expand All @@ -333,7 +335,7 @@
Ok(())
}

async fn close(&self) -> anyhow::Result<()> {
async fn close(&self) -> Result<()> {
Self::close_inner(
self.closed.clone(),
self.memory.clone(),
Expand Down Expand Up @@ -571,7 +573,7 @@
}

/// Get cached entry with the given key from the hybrid cache.
pub async fn get<Q>(&self, key: &Q) -> anyhow::Result<Option<HybridCacheEntry<K, V, S>>>
pub async fn get<Q>(&self, key: &Q) -> Result<Option<HybridCacheEntry<K, V, S>>>
where
Q: Hash + Equivalent<K> + Send + Sync + 'static + Clone,
{
Expand Down Expand Up @@ -650,7 +652,7 @@
///
/// `obtain` is always supposed to be used instead of `get` if the overhead of getting the ownership of the given
/// key is acceptable.
pub async fn obtain(&self, key: K) -> anyhow::Result<Option<HybridCacheEntry<K, V, S>>>
pub async fn obtain(&self, key: K) -> Result<Option<HybridCacheEntry<K, V, S>>>
where
K: Clone,
{
Expand All @@ -667,7 +669,7 @@
|| {
let store = self.inner.storage.clone();
async move {
match store.load(&key).await.map_err(anyhow::Error::from) {
match store.load(&key).await.map_err(Error::from) {

Check warning on line 672 in foyer/src/hybrid/cache.rs

View check run for this annotation

Codecov / codecov/patch

foyer/src/hybrid/cache.rs#L672

Added line #L672 was not covered by tests
Ok(Load::Entry {
key: _,
value,
Expand All @@ -681,7 +683,7 @@
},
Ok(Load::Throttled) => Err(ObtainFetchError::Throttled).into(),
Ok(Load::Miss) => Err(ObtainFetchError::NotExist).into(),
Err(e) => Err(ObtainFetchError::Err(e)).into(),
Err(e) => Err(ObtainFetchError::Other(e)).into(),

Check warning on line 686 in foyer/src/hybrid/cache.rs

View check run for this annotation

Codecov / codecov/patch

foyer/src/hybrid/cache.rs#L686

Added line #L686 was not covered by tests
}
}
},
Expand Down Expand Up @@ -720,11 +722,7 @@
try_cancel!(self, span, record_hybrid_obtain_threshold);
Ok(None)
}
Err(ObtainFetchError::RecvError(_)) => {
try_cancel!(self, span, record_hybrid_obtain_threshold);
Ok(None)
}
Err(ObtainFetchError::Err(e)) => {
Err(ObtainFetchError::Other(e)) => {

Check warning on line 725 in foyer/src/hybrid/cache.rs

View check run for this annotation

Codecov / codecov/patch

foyer/src/hybrid/cache.rs#L725

Added line #L725 was not covered by tests
try_cancel!(self, span, record_hybrid_obtain_threshold);
Err(e)
}
Expand Down Expand Up @@ -766,7 +764,7 @@
}

/// Clear the hybrid cache.
pub async fn clear(&self) -> anyhow::Result<()> {
pub async fn clear(&self) -> Result<()> {
self.inner.memory.clear();
self.inner.storage.destroy().await?;
Ok(())
Expand All @@ -781,7 +779,7 @@
/// For more details, please refer to [`super::builder::HybridCacheBuilder::with_flush_on_close()`].
///
/// If `close` is not called explicitly, the hybrid cache will be closed when its last copy is dropped.
pub async fn close(&self) -> anyhow::Result<()> {
pub async fn close(&self) -> Result<()> {
self.inner.close().await
}

Expand Down Expand Up @@ -819,13 +817,12 @@
enum ObtainFetchError {
Throttled,
NotExist,
RecvError(oneshot::error::RecvError),
Err(anyhow::Error),
Other(Error),
}

impl From<oneshot::error::RecvError> for ObtainFetchError {
fn from(e: oneshot::error::RecvError) -> Self {
Self::RecvError(e)
impl From<foyer_memory::Error> for ObtainFetchError {
fn from(e: foyer_memory::Error) -> Self {
Self::Other(e.into())

Check warning on line 825 in foyer/src/hybrid/cache.rs

View check run for this annotation

Codecov / codecov/patch

foyer/src/hybrid/cache.rs#L824-L825

Added lines #L824 - L825 were not covered by tests
}
}

Expand All @@ -846,7 +843,7 @@
S: HashBuilder + Debug,
{
#[pin]
inner: Fetch<K, V, anyhow::Error, S, HybridCacheProperties>,
inner: Fetch<K, V, Error, S, HybridCacheProperties>,
policy: HybridCachePolicy,
storage: Store<K, V, S, HybridCacheProperties>,
}
Expand All @@ -857,7 +854,7 @@
V: StorageValue,
S: HashBuilder + Debug,
{
type Output = anyhow::Result<CacheEntry<K, V, S, HybridCacheProperties>>;
type Output = Result<CacheEntry<K, V, S, HybridCacheProperties>>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
Expand Down Expand Up @@ -885,7 +882,7 @@
V: StorageValue,
S: HashBuilder + Debug,
{
type Target = Fetch<K, V, anyhow::Error, S, HybridCacheProperties>;
type Target = Fetch<K, V, Error, S, HybridCacheProperties>;

fn deref(&self) -> &Self::Target {
&self.inner
Expand All @@ -905,7 +902,7 @@
pub fn fetch<F, FU>(&self, key: K, fetch: F) -> HybridFetch<K, V, S>
where
F: FnOnce() -> FU,
FU: Future<Output = anyhow::Result<V>> + Send + 'static,
FU: Future<Output = Result<V>> + Send + 'static,
{
self.fetch_inner(key, HybridCacheProperties::default(), fetch)
}
Expand All @@ -922,15 +919,15 @@
) -> HybridFetch<K, V, S>
where
F: FnOnce() -> FU,
FU: Future<Output = anyhow::Result<V>> + Send + 'static,
FU: Future<Output = Result<V>> + Send + 'static,
{
self.fetch_inner(key, properties, fetch)
}

fn fetch_inner<F, FU>(&self, key: K, properties: HybridCacheProperties, fetch: F) -> HybridFetch<K, V, S>
where
F: FnOnce() -> FU,
FU: Future<Output = anyhow::Result<V>> + Send + 'static,
FU: Future<Output = Result<V>> + Send + 'static,
{
root_span!(self, span, "foyer::hybrid::cache::fetch");

Expand All @@ -950,7 +947,7 @@
let runtime = self.storage().runtime().clone();

async move {
let throttled = match store.load(&key).await.map_err(anyhow::Error::from) {
let throttled = match store.load(&key).await.map_err(Error::from) {
Ok(Load::Entry {
key: _,
value,
Expand Down
Loading
Loading