From 86df32b8804784ba064ab7c11bda537073cfc69d Mon Sep 17 00:00:00 2001 From: MrCroxx Date: Thu, 3 Jul 2025 23:49:23 +0800 Subject: [PATCH 01/20] feat: introduce foyer layer, partially impl it Signed-off-by: MrCroxx --- core/Cargo.lock | 420 +++++++++++++++++++++++++++++++++++++-- core/Cargo.toml | 6 + core/src/layers/foyer.rs | 304 ++++++++++++++++++++++++++++ core/src/layers/mod.rs | 5 + 4 files changed, 720 insertions(+), 15 deletions(-) create mode 100644 core/src/layers/foyer.rs diff --git a/core/Cargo.lock b/core/Cargo.lock index 455f5cb813f2..f7f73b17167e 100644 --- a/core/Cargo.lock +++ b/core/Cargo.lock @@ -380,7 +380,7 @@ checksum = "29db05b624fb6352fc11bfe30c54ab1b16a1fe937d7c05a783f4e88ef1292b3b" dependencies = [ "Inflector", "async-graphql-parser", - "darling", + "darling 0.20.11", "proc-macro-crate", "proc-macro2", "quote", @@ -567,6 +567,18 @@ dependencies = [ "syn 2.0.101", ] +[[package]] +name = "auto_enums" +version = "0.8.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c170965892137a3a9aeb000b4524aa3cc022a310e709d848b6e1cdce4ab4781" +dependencies = [ + "derive_utils", + "proc-macro2", + "quote", + "syn 2.0.101", +] + [[package]] name = "autocfg" version = "1.4.0" @@ -1769,7 +1781,7 @@ dependencies = [ "anstream", "anstyle", "clap_lex", - "strsim", + "strsim 0.11.1", "terminal_size", ] @@ -1800,6 +1812,15 @@ dependencies = [ "cc", ] +[[package]] +name = "cmsketch" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "553c840ee51da812c6cd621f9f7e07dfb00a49f91283a8e6380c78cba4f61aba" +dependencies = [ + "paste", +] + [[package]] name = "coarsetime" version = "0.1.36" @@ -2332,14 +2353,38 @@ dependencies = [ "cipher", ] +[[package]] +name = "darling" +version = "0.14.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b750cb3417fd1b327431a470f388520309479ab0bf5e323505daf0290cd3850" +dependencies = [ + "darling_core 0.14.4", + "darling_macro 0.14.4", +] + [[package]] name = "darling" version = "0.20.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc7f46116c46ff9ab3eb1597a45688b6715c6e628b5c133e288e709a29bcb4ee" dependencies = [ - "darling_core", - "darling_macro", + "darling_core 0.20.11", + "darling_macro 0.20.11", +] + +[[package]] +name = "darling_core" +version = "0.14.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "109c1ca6e6b7f82cc233a97004ea8ed7ca123a9af07a8230878fcfda9b158bf0" +dependencies = [ + "fnv", + "ident_case", + "proc-macro2", + "quote", + "strsim 0.10.0", + "syn 1.0.109", ] [[package]] @@ -2352,17 +2397,28 @@ dependencies = [ "ident_case", "proc-macro2", "quote", - "strsim", + "strsim 0.11.1", "syn 2.0.101", ] +[[package]] +name = "darling_macro" +version = "0.14.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4aab4dbc9f7611d8b55048a3a16d2d010c2c8334e46304b40ac1cc14bf3b48e" +dependencies = [ + "darling_core 0.14.4", + "quote", + "syn 1.0.109", +] + [[package]] name = "darling_macro" version = "0.20.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc34b93ccb385b40dc71c6fceac4b2ad23662c7eeb248cf10d529b7e055b6ead" dependencies = [ - "darling_core", + "darling_core 0.20.11", "quote", "syn 2.0.101", ] @@ -2490,7 +2546,7 @@ version = "0.20.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2d5bcf7b024d6835cfb3d473887cd966994907effbe9227e8c8219824d06c4e8" dependencies = [ - "darling", + "darling 0.20.11", "proc-macro2", "quote", "syn 2.0.101", @@ -2530,6 +2586,17 @@ dependencies = [ "syn 2.0.101", ] +[[package]] +name = "derive_utils" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ccfae181bab5ab6c5478b2ccb69e4c68a02f8c3ec72f6616bfec9dbc599d2ee0" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.101", +] + [[package]] name = "des" version = "0.8.1" @@ -2657,6 +2724,12 @@ version = "0.15.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b" +[[package]] +name = "downcast-rs" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75b325c5dbd37f80359721ad39aca5a29fb04c89279657cffdda8736d0c0b9d2" + [[package]] name = "dtoa" version = "1.0.10" @@ -3116,6 +3189,105 @@ dependencies = [ "uuid", ] +[[package]] +name = "foyer" +version = "0.17.4" +source = "git+https://github.com/foyer-rs/foyer?rev=88ac58b5893dc06dfda79f361eefc75c3c60d75a#88ac58b5893dc06dfda79f361eefc75c3c60d75a" +dependencies = [ + "anyhow", + "equivalent", + "foyer-common", + "foyer-memory", + "foyer-storage", + "madsim-tokio", + "mixtrics", + "pin-project", + "serde", + "tokio", + "tracing", +] + +[[package]] +name = "foyer-common" +version = "0.17.4" +source = "git+https://github.com/foyer-rs/foyer?rev=88ac58b5893dc06dfda79f361eefc75c3c60d75a#88ac58b5893dc06dfda79f361eefc75c3c60d75a" +dependencies = [ + "bytes", + "cfg-if", + "itertools 0.14.0", + "madsim-tokio", + "mixtrics", + "parking_lot 0.12.3", + "pin-project", + "thiserror 2.0.12", + "tokio", + "twox-hash", +] + +[[package]] +name = "foyer-intrusive-collections" +version = "0.10.0-dev" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e4fee46bea69e0596130e3210e65d3424e0ac1e6df3bde6636304bdf1ca4a3b" +dependencies = [ + "memoffset 0.9.1", +] + +[[package]] +name = "foyer-memory" +version = "0.17.4" +source = "git+https://github.com/foyer-rs/foyer?rev=88ac58b5893dc06dfda79f361eefc75c3c60d75a#88ac58b5893dc06dfda79f361eefc75c3c60d75a" +dependencies = [ + "arc-swap", + "bitflags 2.9.1", + "cmsketch", + "equivalent", + "foyer-common", + "foyer-intrusive-collections", + "hashbrown 0.15.3", + "itertools 0.14.0", + "madsim-tokio", + "mixtrics", + "parking_lot 0.12.3", + "pin-project", + "serde", + "thiserror 2.0.12", + "tokio", + "tracing", +] + +[[package]] +name = "foyer-storage" +version = "0.17.4" +source = "git+https://github.com/foyer-rs/foyer?rev=88ac58b5893dc06dfda79f361eefc75c3c60d75a#88ac58b5893dc06dfda79f361eefc75c3c60d75a" +dependencies = [ + "allocator-api2", + "anyhow", + "auto_enums", + "bytes", + "equivalent", + "flume", + "foyer-common", + "foyer-memory", + "fs4", + "futures-core", + "futures-util", + "itertools 0.14.0", + "libc", + "lz4", + "madsim-tokio", + "ordered_hash_map", + "parking_lot 0.12.3", + "paste", + "pin-project", + "rand 0.9.1", + "thiserror 2.0.12", + "tokio", + "tracing", + "twox-hash", + "zstd", +] + [[package]] name = "fs2" version = "0.4.3" @@ -3126,6 +3298,16 @@ dependencies = [ "winapi", ] +[[package]] +name = "fs4" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8640e34b88f7652208ce9e88b1a37a2ae95227d84abec377ccd3c5cfeb141ed4" +dependencies = [ + "rustix 1.0.7", + "windows-sys 0.59.0", +] + [[package]] name = "fs_extra" version = "1.3.0" @@ -3577,6 +3759,15 @@ dependencies = [ "ahash 0.7.8", ] +[[package]] +name = "hashbrown" +version = "0.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43a3c133739dddd0d2990f9a4bdf8eb4b21ef50e4851ca85ab661199821d510e" +dependencies = [ + "ahash 0.8.12", +] + [[package]] name = "hashbrown" version = "0.14.5" @@ -4695,6 +4886,25 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154" +[[package]] +name = "lz4" +version = "1.28.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a20b523e860d03443e98350ceaac5e71c6ba89aea7d960769ec3ce37f4de5af4" +dependencies = [ + "lz4-sys", +] + +[[package]] +name = "lz4-sys" +version = "1.11.1+lz4-1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6bd8c0d6c6ed0cd30b3652886bb8711dc4bb01d637a68105a3d5158039b418e6" +dependencies = [ + "cc", + "libc", +] + [[package]] name = "mac" version = "0.1.1" @@ -4749,6 +4959,60 @@ dependencies = [ "syn 2.0.101", ] +[[package]] +name = "madsim" +version = "0.2.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db6694555643da293dfb89e33c2880a13b62711d64b6588bc7df6ce4110b27f1" +dependencies = [ + "ahash 0.8.12", + "async-channel 2.3.1", + "async-stream", + "async-task", + "bincode", + "bytes", + "downcast-rs", + "futures-util", + "lazy_static", + "libc", + "madsim-macros", + "naive-timer", + "panic-message", + "rand 0.8.5", + "rand_xoshiro", + "rustversion", + "serde", + "spin", + "tokio", + "tokio-util", + "toml", + "tracing", + "tracing-subscriber", +] + +[[package]] +name = "madsim-macros" +version = "0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3d248e97b1a48826a12c3828d921e8548e714394bf17274dd0a93910dc946e1" +dependencies = [ + "darling 0.14.4", + "proc-macro2", + "quote", + "syn 1.0.109", +] + +[[package]] +name = "madsim-tokio" +version = "0.2.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d3eb2acc57c82d21d699119b859e2df70a91dbdb84734885a1e72be83bdecb5" +dependencies = [ + "madsim", + "spin", + "tokio", +] + [[package]] name = "maplit" version = "1.0.2" @@ -4836,6 +5100,15 @@ dependencies = [ "autocfg", ] +[[package]] +name = "memoffset" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "488016bfae457b036d996092f6cb448677611ce4449e970ceaf42695203f218a" +dependencies = [ + "autocfg", +] + [[package]] name = "metrics" version = "0.24.2" @@ -4948,6 +5221,16 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "mixtrics" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "adbcddf5a90b959eea97ae505e0391f5c6dd411fbf546d43b9c59ad1c3bd4391" +dependencies = [ + "itertools 0.14.0", + "parking_lot 0.12.3", +] + [[package]] name = "moka" version = "0.12.10" @@ -5008,7 +5291,7 @@ dependencies = [ "sha2", "socket2", "stringprep", - "strsim", + "strsim 0.11.1", "take_mut", "thiserror 1.0.69", "tokio", @@ -5088,6 +5371,12 @@ version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1d87ecb2933e8aeadb3e3a02b828fed80a7528047e68b4f424523a0981a3a084" +[[package]] +name = "naive-timer" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "034a0ad7deebf0c2abcf2435950a6666c3c15ea9d8fad0c0f48efa8a7f843fed" + [[package]] name = "nanoid" version = "0.4.0" @@ -5176,7 +5465,7 @@ dependencies = [ "bitflags 1.3.2", "cfg-if", "libc", - "memoffset", + "memoffset 0.7.1", "pin-utils", ] @@ -5387,6 +5676,7 @@ dependencies = [ "fastrace-jaeger", "flume", "foundationdb", + "foyer", "futures", "futures-rustls", "getrandom 0.2.16", @@ -5736,6 +6026,15 @@ dependencies = [ "hashbrown 0.14.5", ] +[[package]] +name = "ordered_hash_map" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab0e5f22bf6dd04abd854a8874247813a8fa2c8c1260eba6fbb150270ce7c176" +dependencies = [ + "hashbrown 0.13.2", +] + [[package]] name = "os_pipe" version = "1.2.1" @@ -5793,6 +6092,12 @@ dependencies = [ "sha2", ] +[[package]] +name = "panic-message" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "384e52fd8fbd4cbe3c317e8216260c21a0f9134de108cea8a4dd4e7e152c472d" + [[package]] name = "parking" version = "2.2.1" @@ -6726,6 +7031,15 @@ dependencies = [ "rand_core 0.5.1", ] +[[package]] +name = "rand_xoshiro" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f97cdb2a36ed4183de61b2f824cc45c9f1037f28afe0a322e9fff4c108b5aaa" +dependencies = [ + "rand_core 0.6.4", +] + [[package]] name = "raw-cpuid" version = "11.5.0" @@ -7594,6 +7908,15 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_spanned" +version = "0.6.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf41e0cfaf7226dca15e8197172c295a782857fcb97fad1808a166870dee75a3" +dependencies = [ + "serde", +] + [[package]] name = "serde_urlencoded" version = "0.7.1" @@ -7630,7 +7953,7 @@ version = "3.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8d00caa5193a3c8362ac2b73be6b9e768aa5a4b2f721d8f4b339600c3cb51f8e" dependencies = [ - "darling", + "darling 0.20.11", "proc-macro2", "quote", "syn 2.0.101", @@ -8191,6 +8514,12 @@ dependencies = [ "unicode-properties", ] +[[package]] +name = "strsim" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" + [[package]] name = "strsim" version = "0.11.1" @@ -8379,7 +8708,7 @@ dependencies = [ "sha2", "snap", "storekey", - "strsim", + "strsim 0.11.1", "subtle", "sysinfo", "thiserror 1.0.69", @@ -8818,23 +9147,47 @@ dependencies = [ "tokio", ] +[[package]] +name = "toml" +version = "0.8.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc1beb996b9d83529a9e75c17a1686767d148d70663143c7854d8b4a09ced362" +dependencies = [ + "serde", + "serde_spanned", + "toml_datetime", + "toml_edit", +] + [[package]] name = "toml_datetime" -version = "0.6.9" +version = "0.6.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3da5db5a963e24bc68be8b17b6fa82814bb22ee8660f192bb182771d498f09a3" +checksum = "22cddaf88f4fbc13c51aebbf5f8eceb5c7c5a9da2ac40a13519eb5b0a0e8f11c" +dependencies = [ + "serde", +] [[package]] name = "toml_edit" -version = "0.22.26" +version = "0.22.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "310068873db2c5b3e7659d2cc35d21855dbafa50d1ce336397c666e3cb08137e" +checksum = "41fe8c660ae4257887cf66394862d21dbca4a6ddd26f04a3560410406a2f819a" dependencies = [ "indexmap 2.9.0", + "serde", + "serde_spanned", "toml_datetime", + "toml_write", "winnow", ] +[[package]] +name = "toml_write" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d99f8c9a7727884afe522e9bd5edbfc91a3312b36a77b5fb8926e4c31a41801" + [[package]] name = "tonic" version = "0.10.2" @@ -9148,6 +9501,15 @@ dependencies = [ "utf-8", ] +[[package]] +name = "twox-hash" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b907da542cbced5261bd3256de1b3a1bf340a3d37f93425a07362a1d687de56" +dependencies = [ + "rand 0.9.1", +] + [[package]] name = "typed-builder" version = "0.10.0" @@ -10249,3 +10611,31 @@ checksum = "70b40401a28d86ce16a330b863b86fd7dbee4d7c940587ab09ab8c019f9e3fdf" dependencies = [ "num-traits", ] + +[[package]] +name = "zstd" +version = "0.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e91ee311a569c327171651566e07972200e76fcfe2242a4fa446149a3881c08a" +dependencies = [ + "zstd-safe", +] + +[[package]] +name = "zstd-safe" +version = "7.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f49c4d5f0abb602a93fb8736af2a4f4dd9512e36f7f570d66e65ff867ed3b9d" +dependencies = [ + "zstd-sys", +] + +[[package]] +name = "zstd-sys" +version = "2.0.15+zstd.1.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eb81183ddd97d0c74cedf1d50d85c8d08c1b8b68ee863bdee9e706eedba1a237" +dependencies = [ + "cc", + "pkg-config", +] diff --git a/core/Cargo.toml b/core/Cargo.toml index 447660b974e9..54882fca263a 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -109,6 +109,8 @@ layers-await-tree = ["dep:await-tree"] layers-async-backtrace = ["dep:async-backtrace"] # Enable dtrace support. layers-dtrace = ["dep:probe"] +# Enable foyer support. +layers-foyer = ["dep:foyer"] services-aliyun-drive = [] services-alluxio = [] @@ -378,6 +380,10 @@ web-sys = { version = "0.3.77", optional = true, features = [ "StorageManager", "FileSystemGetFileOptions", ] } +# for services-foyer +# TODO(MrCroxx): Switch to a released version after testing with OpenDAL integration. +foyer = { git = "https://github.com/foyer-rs/foyer", rev = "88ac58b5893dc06dfda79f361eefc75c3c60d75a", features = [ +], optional = true } # Layers # for layers-async-backtrace diff --git a/core/src/layers/foyer.rs b/core/src/layers/foyer.rs new file mode 100644 index 000000000000..e67a9a38a330 --- /dev/null +++ b/core/src/layers/foyer.rs @@ -0,0 +1,304 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{ + future::Future, + ops::{Bound, Deref, RangeBounds}, + sync::Arc, +}; + +use foyer::{Code, CodeError, HybridCache}; + +use crate::{ + raw::{ + oio::{self, QueueBuf, Read as _}, + Access, AccessorInfo, BytesContentRange, BytesRange, Layer, LayeredAccess, MaybeSend, + OpCopy, OpCreateDir, OpDelete, OpList, OpPresign, OpRead, OpRename, OpStat, OpWrite, + RpCopy, RpCreateDir, RpDelete, RpList, RpPresign, RpRead, RpRename, RpStat, RpWrite, + }, + Buffer, Error, ErrorKind, Result, +}; + +#[derive(Debug)] +pub struct Value(Buffer); + +impl Deref for Value { + type Target = Buffer; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl From for Value { + fn from(buf: Buffer) -> Self { + Value(buf) + } +} + +impl Code for Value { + fn encode(&self, writer: &mut impl std::io::Write) -> std::result::Result<(), CodeError> { + let len = self.0.len() as u64; + writer.write_all(&len.to_le_bytes())?; + std::io::copy(&mut self.0.clone(), writer)?; + Ok(()) + } + + fn decode(reader: &mut impl std::io::Read) -> std::result::Result + where + Self: Sized, + { + let mut len_bytes = [0u8; 8]; + reader.read_exact(&mut len_bytes)?; + let len = u64::from_le_bytes(len_bytes) as usize; + let mut buffer = vec![0u8; len]; + reader.read_exact(&mut buffer[..len])?; + Ok(Value(buffer.into())) + } + + fn estimated_size(&self) -> usize { + 8 + self.0.len() + } +} + +#[derive(Debug)] +pub struct FoyerLayerBuilder {} + +#[derive(Debug)] +pub struct FoyerLayer {} + +impl Layer for FoyerLayer { + type LayeredAccess = FoyerAccessor; + + fn layer(&self, inner: A) -> Self::LayeredAccess { + todo!() + } +} + +#[derive(Debug)] +struct Inner { + accessor: A, + cache: HybridCache, +} + +#[derive(Debug)] +pub struct FoyerAccessor { + inner: Arc>, +} + +impl LayeredAccess for FoyerAccessor { + type Inner = A; + type Reader = Buffer; + type Writer = Writer; + type Lister = A::Lister; + type Deleter = Deleter; + + fn inner(&self) -> &Self::Inner { + &self.inner.accessor + } + + fn read( + &self, + path: &str, + args: OpRead, + ) -> impl Future> + MaybeSend { + let r = args.range(); + let path = path.to_string(); + async move { + let entry = self + .inner + .cache + .fetch(path.clone(), || { + let inner = self.inner.clone(); + async move { + let (_, mut reader) = inner + .accessor + .read(&path, args.with_range(BytesRange::new(0, None))) + .await?; + let buffer = reader.read_all().await?; + Ok(buffer.into()) + } + }) + .await + .map_err(|e| { + // TODO(MrCroxx): Expose internal opendal error from foyer. + Error::new(ErrorKind::Unexpected, e.to_string()) + })?; + + let r = r.to_range(); + let start = match r.start_bound() { + Bound::Included(i) => *i, + Bound::Excluded(i) => *i + 1, + Bound::Unbounded => 0, + }; + let end = match r.end_bound() { + Bound::Included(i) => *i + 1, + Bound::Excluded(i) => *i, + Bound::Unbounded => entry.len() as u64, + }; + let range = BytesContentRange::default() + .with_range(start, end - 1) + .with_size(entry.len() as _); + let buffer = entry.slice(start as usize..end as usize); + let rp = RpRead::new() + .with_size(Some(buffer.len() as _)) + .with_range(Some(range)); + Ok((rp, buffer)) + } + } + + fn write( + &self, + path: &str, + args: OpWrite, + ) -> impl Future> + MaybeSend { + let inner = self.inner.clone(); + async move { + let (rp, w) = self.inner.accessor.write(path, args).await?; + Ok(( + rp, + Writer { + w, + q: Some(QueueBuf::new()), + path: path.to_string(), + inner, + }, + )) + } + } + + fn delete(&self) -> impl Future> + MaybeSend { + let inner = self.inner.clone(); + async move { + let (rp, d) = inner.accessor.delete().await?; + Ok(( + rp, + Deleter { + d, + keys: vec![], + inner, + }, + )) + } + } + + fn list( + &self, + path: &str, + args: OpList, + ) -> impl Future> + MaybeSend { + self.inner.accessor.list(path, args) + } + + fn info(&self) -> std::sync::Arc { + self.inner.accessor.info() + } + + fn create_dir( + &self, + path: &str, + args: OpCreateDir, + ) -> impl Future> + MaybeSend { + self.inner.accessor.create_dir(path, args) + } + + fn copy( + &self, + from: &str, + to: &str, + args: OpCopy, + ) -> impl Future> + MaybeSend { + // TODO(MrCroxx): Implement copy with foyer cache. + self.inner.accessor.copy(from, to, args) + } + + fn rename( + &self, + from: &str, + to: &str, + args: OpRename, + ) -> impl Future> + MaybeSend { + // TODO(MrCroxx): Implement copy with foyer cache. + self.inner.accessor.rename(from, to, args) + } + + fn stat(&self, path: &str, args: OpStat) -> impl Future> + MaybeSend { + // TODO(MrCroxx): Implement copy with foyer cache. + self.inner.accessor.stat(path, args) + } + + fn presign( + &self, + path: &str, + args: OpPresign, + ) -> impl Future> + MaybeSend { + self.inner.accessor.presign(path, args) + } +} + +pub struct Writer { + w: A::Writer, + q: Option, + path: String, + inner: Arc>, +} + +impl oio::Write for Writer { + fn write(&mut self, bs: Buffer) -> impl Future> + MaybeSend { + async { + self.q.as_mut().unwrap().push(bs.clone()); + self.w.write(bs).await + } + } + + fn close(&mut self) -> impl Future> + MaybeSend { + async { + let buffer = self.q.take().unwrap().collect(); + let res = self.w.close().await; + self.inner.cache.insert(self.path.clone(), buffer.into()); + res + } + } + + fn abort(&mut self) -> impl Future> + MaybeSend { + self.w.abort() + } +} + +pub struct Deleter { + d: A::Deleter, + keys: Vec, + inner: Arc>, +} + +impl oio::Delete for Deleter { + fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> { + self.d.delete(path, args.clone())?; + self.keys.push(path.to_string()); + Ok(()) + } + + fn flush(&mut self) -> impl Future> + MaybeSend { + async { + for key in &self.keys { + self.inner.cache.remove(key); + } + let res = self.d.flush().await; + res + } + } +} diff --git a/core/src/layers/mod.rs b/core/src/layers/mod.rs index 50e58c280914..a67ec062dfec 100644 --- a/core/src/layers/mod.rs +++ b/core/src/layers/mod.rs @@ -119,6 +119,11 @@ mod dtrace; #[cfg(all(target_os = "linux", feature = "layers-dtrace"))] pub use self::dtrace::DtraceLayer; +#[cfg(feature = "layers-foyer")] +mod foyer; +#[cfg(feature = "layers-foyer")] +pub use self::foyer::FoyerLayer; + pub mod observe; mod correctness_check; From cb63bb738449a34f6a99a9fbfd99a0d06443e4aa Mon Sep 17 00:00:00 2001 From: MrCroxx Date: Fri, 4 Jul 2025 15:39:55 +0800 Subject: [PATCH 02/20] test: add FoyerLayer unit tests Signed-off-by: MrCroxx --- core/Cargo.lock | 36 ++++++++++++ core/Cargo.toml | 2 + core/src/layers/foyer.rs | 121 +++++++++++++++++++++++++++++++++++++-- 3 files changed, 154 insertions(+), 5 deletions(-) diff --git a/core/Cargo.lock b/core/Cargo.lock index f7f73b17167e..be75074837e5 100644 --- a/core/Cargo.lock +++ b/core/Cargo.lock @@ -2877,6 +2877,18 @@ dependencies = [ "regex", ] +[[package]] +name = "env_logger" +version = "0.11.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13c863f0904021b108aa8b2f55046443e6b1ebde8fd4a15c399893aae4fa069f" +dependencies = [ + "anstream", + "anstyle", + "env_filter", + "log", +] + [[package]] name = "equivalent" version = "1.0.2" @@ -5728,6 +5740,8 @@ dependencies = [ "sqlx", "suppaftp", "surrealdb", + "tempfile", + "test-log", "tikv-client", "tokio", "tracing", @@ -8850,6 +8864,28 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "test-log" +version = "0.2.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e33b98a582ea0be1168eba097538ee8dd4bbe0f2b01b22ac92ea30054e5be7b" +dependencies = [ + "env_logger", + "test-log-macros", + "tracing-subscriber", +] + +[[package]] +name = "test-log-macros" +version = "0.2.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "451b374529930d7601b1eef8d32bc79ae870b6079b069401709c2a8bf9e75f36" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.101", +] + [[package]] name = "thin-vec" version = "0.2.14" diff --git a/core/Cargo.toml b/core/Cargo.toml index 54882fca263a..7e741422104c 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -433,6 +433,8 @@ pretty_assertions = "1" rand = "0.8" sha2 = "0.10" size = "0.4" +tempfile = "3" +test-log = { version = "0.2", features = ["trace", "color"] } tokio = { version = "1.27", features = ["fs", "macros", "rt-multi-thread"] } tracing-opentelemetry = "0.31.0" tracing-subscriber = { version = "0.3", features = [ diff --git a/core/src/layers/foyer.rs b/core/src/layers/foyer.rs index e67a9a38a330..9faa2bd007fe 100644 --- a/core/src/layers/foyer.rs +++ b/core/src/layers/foyer.rs @@ -75,17 +75,47 @@ impl Code for Value { } } +/// Hybrid cache layer for OpenDAL that uses [foyer](https://github.com/foyer-rs/foyer) for caching. +/// +/// # Operation Behavior +/// - `write`: [`FoyerLayer`] will write to the foyer hybrid cache after the service's write operation is completed. +/// - `read`: [`FoyerLayer`] will first check the foyer hybrid cache for the data. If the data is not found, it will perform the read operation on the service and cache the result. +/// - `delete`: [`FoyerLayer`] will remove the data from the foyer hybrid cache regardless of whether the service's delete operation is successful. +/// - Other operations: [`FoyerLayer`] will not cache the results of other operations, such as `list`, `copy`, `rename`, etc. They will be passed through to the underlying accessor without caching. +/// +/// # Examples +/// +/// ```rust +/// use opendal::layers::FoyerLayer; +/// use opendal::services::S3; +/// +/// ``` #[derive(Debug)] -pub struct FoyerLayerBuilder {} +pub struct FoyerLayer { + cache: HybridCache, +} -#[derive(Debug)] -pub struct FoyerLayer {} +impl FoyerLayer { + /// Creates a new `FoyerLayer` with the given foyer hybrid cache. + pub fn new(cache: HybridCache) -> Self { + FoyerLayer { cache } + } +} + +impl From> for FoyerLayer { + fn from(cache: HybridCache) -> Self { + Self::new(cache) + } +} impl Layer for FoyerLayer { type LayeredAccess = FoyerAccessor; - fn layer(&self, inner: A) -> Self::LayeredAccess { - todo!() + fn layer(&self, accessor: A) -> Self::LayeredAccess { + let cache = self.cache.clone(); + FoyerAccessor { + inner: Arc::new(Inner { accessor, cache }), + } } } @@ -302,3 +332,84 @@ impl oio::Delete for Deleter { } } } + +#[cfg(test)] +mod tests { + + use foyer::{ + DirectFsDeviceOptions, Engine, HybridCacheBuilder, LargeEngineOptions, RecoverMode, + }; + use size::consts::MiB; + + use crate::{services::Dashmap, Operator}; + + use super::*; + + fn key(i: u8) -> String { + format!("obj-{i}") + } + + fn value(i: u8) -> Vec { + // ~ 64KiB with metadata + vec![i; 63 * 1024] + } + + #[test_log::test(tokio::test)] + async fn test() { + let dir = tempfile::tempdir().unwrap(); + + let cache = HybridCacheBuilder::new() + .memory(10) + .with_shards(1) + .storage(Engine::Large(LargeEngineOptions::new())) + .with_device_options( + DirectFsDeviceOptions::new(dir.path()) + .with_capacity(16 * MiB as usize) + .with_file_size(1 * MiB as usize), + ) + .with_recover_mode(RecoverMode::None) + .build() + .await + .unwrap(); + + let op = Operator::new(Dashmap::default()) + .unwrap() + .layer(FoyerLayer::new(cache.clone())) + .finish(); + + assert!(op.list("/").await.unwrap().is_empty()); + + for i in 0..64 { + op.write(&key(i), value(i)).await.unwrap(); + } + + assert_eq!(op.list("/").await.unwrap().len(), 64); + + for i in 0..64 { + let buf = op.read(&key(i)).await.unwrap(); + assert_eq!(buf.to_vec(), value(i)); + } + + cache.clear().await.unwrap(); + + for i in 0..64 { + let buf = op.read(&key(i)).await.unwrap(); + assert_eq!(buf.to_vec(), value(i)); + } + + for i in 0..64 { + op.delete(&key(i)).await.unwrap(); + } + + assert!(op.list("/").await.unwrap().is_empty()); + + for i in 0..64 { + let res = op.read(&key(i)).await; + // FIXME(MrCroxx): Fix foyer `remove()` when entry in disk cache write buffer. + let _ = res.unwrap_err(); + // TODO(MrCroxx): Uncomment this assertion after the exposion of internal opendal error from foyer. + // Currently, the error is [`ErrorKind::Unexpected`]. + // assert_eq!(e.kind(), ErrorKind::NotFound); + } + } +} From d636153f1e43f5c79e7d0b4857e456cb4a691883 Mon Sep 17 00:00:00 2001 From: MrCroxx Date: Sun, 6 Jul 2025 14:44:59 +0800 Subject: [PATCH 03/20] fix: bump foyer and fix bugs in deletion Signed-off-by: MrCroxx --- core/Cargo.lock | 8 ++++---- core/Cargo.toml | 2 +- core/src/layers/foyer.rs | 1 - 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/core/Cargo.lock b/core/Cargo.lock index be75074837e5..ee49b2446684 100644 --- a/core/Cargo.lock +++ b/core/Cargo.lock @@ -3204,7 +3204,7 @@ dependencies = [ [[package]] name = "foyer" version = "0.17.4" -source = "git+https://github.com/foyer-rs/foyer?rev=88ac58b5893dc06dfda79f361eefc75c3c60d75a#88ac58b5893dc06dfda79f361eefc75c3c60d75a" +source = "git+https://github.com/foyer-rs/foyer?rev=80a48d5aa243e92582823cd7594e2958063e3f3c#80a48d5aa243e92582823cd7594e2958063e3f3c" dependencies = [ "anyhow", "equivalent", @@ -3222,7 +3222,7 @@ dependencies = [ [[package]] name = "foyer-common" version = "0.17.4" -source = "git+https://github.com/foyer-rs/foyer?rev=88ac58b5893dc06dfda79f361eefc75c3c60d75a#88ac58b5893dc06dfda79f361eefc75c3c60d75a" +source = "git+https://github.com/foyer-rs/foyer?rev=80a48d5aa243e92582823cd7594e2958063e3f3c#80a48d5aa243e92582823cd7594e2958063e3f3c" dependencies = [ "bytes", "cfg-if", @@ -3248,7 +3248,7 @@ dependencies = [ [[package]] name = "foyer-memory" version = "0.17.4" -source = "git+https://github.com/foyer-rs/foyer?rev=88ac58b5893dc06dfda79f361eefc75c3c60d75a#88ac58b5893dc06dfda79f361eefc75c3c60d75a" +source = "git+https://github.com/foyer-rs/foyer?rev=80a48d5aa243e92582823cd7594e2958063e3f3c#80a48d5aa243e92582823cd7594e2958063e3f3c" dependencies = [ "arc-swap", "bitflags 2.9.1", @@ -3271,7 +3271,7 @@ dependencies = [ [[package]] name = "foyer-storage" version = "0.17.4" -source = "git+https://github.com/foyer-rs/foyer?rev=88ac58b5893dc06dfda79f361eefc75c3c60d75a#88ac58b5893dc06dfda79f361eefc75c3c60d75a" +source = "git+https://github.com/foyer-rs/foyer?rev=80a48d5aa243e92582823cd7594e2958063e3f3c#80a48d5aa243e92582823cd7594e2958063e3f3c" dependencies = [ "allocator-api2", "anyhow", diff --git a/core/Cargo.toml b/core/Cargo.toml index 7e741422104c..f61c8be4190b 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -382,7 +382,7 @@ web-sys = { version = "0.3.77", optional = true, features = [ ] } # for services-foyer # TODO(MrCroxx): Switch to a released version after testing with OpenDAL integration. -foyer = { git = "https://github.com/foyer-rs/foyer", rev = "88ac58b5893dc06dfda79f361eefc75c3c60d75a", features = [ +foyer = { git = "https://github.com/foyer-rs/foyer", rev = "80a48d5aa243e92582823cd7594e2958063e3f3c", features = [ ], optional = true } # Layers diff --git a/core/src/layers/foyer.rs b/core/src/layers/foyer.rs index 9faa2bd007fe..ee388a9ce92e 100644 --- a/core/src/layers/foyer.rs +++ b/core/src/layers/foyer.rs @@ -405,7 +405,6 @@ mod tests { for i in 0..64 { let res = op.read(&key(i)).await; - // FIXME(MrCroxx): Fix foyer `remove()` when entry in disk cache write buffer. let _ = res.unwrap_err(); // TODO(MrCroxx): Uncomment this assertion after the exposion of internal opendal error from foyer. // Currently, the error is [`ErrorKind::Unexpected`]. From a3020648711d0d083fdab9e00bdc8ddc1ec9bc30 Mon Sep 17 00:00:00 2001 From: MrCroxx Date: Sun, 6 Jul 2025 15:37:30 +0800 Subject: [PATCH 04/20] chore: bump foyer version included in the main branch Signed-off-by: MrCroxx --- core/Cargo.lock | 8 ++++---- core/Cargo.toml | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/core/Cargo.lock b/core/Cargo.lock index ee49b2446684..b05c500dfdd2 100644 --- a/core/Cargo.lock +++ b/core/Cargo.lock @@ -3204,7 +3204,7 @@ dependencies = [ [[package]] name = "foyer" version = "0.17.4" -source = "git+https://github.com/foyer-rs/foyer?rev=80a48d5aa243e92582823cd7594e2958063e3f3c#80a48d5aa243e92582823cd7594e2958063e3f3c" +source = "git+https://github.com/foyer-rs/foyer?rev=03cb77aaf807964937c2f18286668cd4c19798d1#03cb77aaf807964937c2f18286668cd4c19798d1" dependencies = [ "anyhow", "equivalent", @@ -3222,7 +3222,7 @@ dependencies = [ [[package]] name = "foyer-common" version = "0.17.4" -source = "git+https://github.com/foyer-rs/foyer?rev=80a48d5aa243e92582823cd7594e2958063e3f3c#80a48d5aa243e92582823cd7594e2958063e3f3c" +source = "git+https://github.com/foyer-rs/foyer?rev=03cb77aaf807964937c2f18286668cd4c19798d1#03cb77aaf807964937c2f18286668cd4c19798d1" dependencies = [ "bytes", "cfg-if", @@ -3248,7 +3248,7 @@ dependencies = [ [[package]] name = "foyer-memory" version = "0.17.4" -source = "git+https://github.com/foyer-rs/foyer?rev=80a48d5aa243e92582823cd7594e2958063e3f3c#80a48d5aa243e92582823cd7594e2958063e3f3c" +source = "git+https://github.com/foyer-rs/foyer?rev=03cb77aaf807964937c2f18286668cd4c19798d1#03cb77aaf807964937c2f18286668cd4c19798d1" dependencies = [ "arc-swap", "bitflags 2.9.1", @@ -3271,7 +3271,7 @@ dependencies = [ [[package]] name = "foyer-storage" version = "0.17.4" -source = "git+https://github.com/foyer-rs/foyer?rev=80a48d5aa243e92582823cd7594e2958063e3f3c#80a48d5aa243e92582823cd7594e2958063e3f3c" +source = "git+https://github.com/foyer-rs/foyer?rev=03cb77aaf807964937c2f18286668cd4c19798d1#03cb77aaf807964937c2f18286668cd4c19798d1" dependencies = [ "allocator-api2", "anyhow", diff --git a/core/Cargo.toml b/core/Cargo.toml index f61c8be4190b..8e7e16e01e6c 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -382,7 +382,7 @@ web-sys = { version = "0.3.77", optional = true, features = [ ] } # for services-foyer # TODO(MrCroxx): Switch to a released version after testing with OpenDAL integration. -foyer = { git = "https://github.com/foyer-rs/foyer", rev = "80a48d5aa243e92582823cd7594e2958063e3f3c", features = [ +foyer = { git = "https://github.com/foyer-rs/foyer", rev = "03cb77aaf807964937c2f18286668cd4c19798d1", features = [ ], optional = true } # Layers From a9541bc9983f92f7667f937d8c9c1cdbd9655c7f Mon Sep 17 00:00:00 2001 From: MrCroxx Date: Mon, 7 Jul 2025 23:03:05 +0800 Subject: [PATCH 05/20] doc: fix typos Signed-off-by: MrCroxx --- .../{0000_foyer_integration.md => 6370_foyer_integration.md} | 0 core/src/layers/foyer.rs | 2 +- 2 files changed, 1 insertion(+), 1 deletion(-) rename core/src/docs/rfcs/{0000_foyer_integration.md => 6370_foyer_integration.md} (100%) diff --git a/core/src/docs/rfcs/0000_foyer_integration.md b/core/src/docs/rfcs/6370_foyer_integration.md similarity index 100% rename from core/src/docs/rfcs/0000_foyer_integration.md rename to core/src/docs/rfcs/6370_foyer_integration.md diff --git a/core/src/layers/foyer.rs b/core/src/layers/foyer.rs index ee388a9ce92e..3c7ede8a428b 100644 --- a/core/src/layers/foyer.rs +++ b/core/src/layers/foyer.rs @@ -406,7 +406,7 @@ mod tests { for i in 0..64 { let res = op.read(&key(i)).await; let _ = res.unwrap_err(); - // TODO(MrCroxx): Uncomment this assertion after the exposion of internal opendal error from foyer. + // TODO(MrCroxx): Uncomment this assertion after the explosion of internal opendal error from foyer. // Currently, the error is [`ErrorKind::Unexpected`]. // assert_eq!(e.kind(), ErrorKind::NotFound); } From 5362d92a8be20fe88865fef57b6df95377977691 Mon Sep 17 00:00:00 2001 From: MrCroxx Date: Tue, 8 Jul 2025 14:48:03 +0800 Subject: [PATCH 06/20] fix: bump foyer version, expose error from foyer Signed-off-by: MrCroxx --- core/Cargo.lock | 10 +++++----- core/Cargo.toml | 2 +- core/src/layers/foyer.rs | 34 +++++++++++++++++++++++----------- 3 files changed, 29 insertions(+), 17 deletions(-) diff --git a/core/Cargo.lock b/core/Cargo.lock index d3f702a88111..5b9440845552 100644 --- a/core/Cargo.lock +++ b/core/Cargo.lock @@ -3204,9 +3204,8 @@ dependencies = [ [[package]] name = "foyer" version = "0.17.4" -source = "git+https://github.com/foyer-rs/foyer?rev=03cb77aaf807964937c2f18286668cd4c19798d1#03cb77aaf807964937c2f18286668cd4c19798d1" +source = "git+https://github.com/foyer-rs/foyer?rev=4b50f6ddcf4c922f6f7caab8510b3fee3598fbb4#4b50f6ddcf4c922f6f7caab8510b3fee3598fbb4" dependencies = [ - "anyhow", "equivalent", "foyer-common", "foyer-memory", @@ -3215,6 +3214,7 @@ dependencies = [ "mixtrics", "pin-project", "serde", + "thiserror 2.0.12", "tokio", "tracing", ] @@ -3222,7 +3222,7 @@ dependencies = [ [[package]] name = "foyer-common" version = "0.17.4" -source = "git+https://github.com/foyer-rs/foyer?rev=03cb77aaf807964937c2f18286668cd4c19798d1#03cb77aaf807964937c2f18286668cd4c19798d1" +source = "git+https://github.com/foyer-rs/foyer?rev=4b50f6ddcf4c922f6f7caab8510b3fee3598fbb4#4b50f6ddcf4c922f6f7caab8510b3fee3598fbb4" dependencies = [ "bytes", "cfg-if", @@ -3248,7 +3248,7 @@ dependencies = [ [[package]] name = "foyer-memory" version = "0.17.4" -source = "git+https://github.com/foyer-rs/foyer?rev=03cb77aaf807964937c2f18286668cd4c19798d1#03cb77aaf807964937c2f18286668cd4c19798d1" +source = "git+https://github.com/foyer-rs/foyer?rev=4b50f6ddcf4c922f6f7caab8510b3fee3598fbb4#4b50f6ddcf4c922f6f7caab8510b3fee3598fbb4" dependencies = [ "arc-swap", "bitflags 2.9.1", @@ -3271,7 +3271,7 @@ dependencies = [ [[package]] name = "foyer-storage" version = "0.17.4" -source = "git+https://github.com/foyer-rs/foyer?rev=03cb77aaf807964937c2f18286668cd4c19798d1#03cb77aaf807964937c2f18286668cd4c19798d1" +source = "git+https://github.com/foyer-rs/foyer?rev=4b50f6ddcf4c922f6f7caab8510b3fee3598fbb4#4b50f6ddcf4c922f6f7caab8510b3fee3598fbb4" dependencies = [ "allocator-api2", "anyhow", diff --git a/core/Cargo.toml b/core/Cargo.toml index 080c5ef33c13..a64a9954a4d3 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -382,7 +382,7 @@ web-sys = { version = "0.3.77", optional = true, features = [ ] } # for services-foyer # TODO(MrCroxx): Switch to a released version after testing with OpenDAL integration. -foyer = { git = "https://github.com/foyer-rs/foyer", rev = "03cb77aaf807964937c2f18286668cd4c19798d1", features = [ +foyer = { git = "https://github.com/foyer-rs/foyer", rev = "4b50f6ddcf4c922f6f7caab8510b3fee3598fbb4", features = [ ], optional = true } # Layers diff --git a/core/src/layers/foyer.rs b/core/src/layers/foyer.rs index 3c7ede8a428b..ece3803f6d3a 100644 --- a/core/src/layers/foyer.rs +++ b/core/src/layers/foyer.rs @@ -21,7 +21,7 @@ use std::{ sync::Arc, }; -use foyer::{Code, CodeError, HybridCache}; +use foyer::{Code, CodeError, Error as FoyerError, HybridCache}; use crate::{ raw::{ @@ -33,6 +33,14 @@ use crate::{ Buffer, Error, ErrorKind, Result, }; +fn extract_err(e: FoyerError) -> Error { + let e = match e.downcast::() { + Ok(e) => return e, + Err(e) => e, + }; + Error::new(ErrorKind::Unexpected, e.to_string()) +} + #[derive(Debug)] pub struct Value(Buffer); @@ -158,16 +166,14 @@ impl LayeredAccess for FoyerAccessor { let (_, mut reader) = inner .accessor .read(&path, args.with_range(BytesRange::new(0, None))) - .await?; - let buffer = reader.read_all().await?; + .await + .map_err(FoyerError::other)?; + let buffer = reader.read_all().await.map_err(FoyerError::other)?; Ok(buffer.into()) } }) .await - .map_err(|e| { - // TODO(MrCroxx): Expose internal opendal error from foyer. - Error::new(ErrorKind::Unexpected, e.to_string()) - })?; + .map_err(extract_err)?; let r = r.to_range(); let start = match r.start_bound() { @@ -405,10 +411,16 @@ mod tests { for i in 0..64 { let res = op.read(&key(i)).await; - let _ = res.unwrap_err(); - // TODO(MrCroxx): Uncomment this assertion after the explosion of internal opendal error from foyer. - // Currently, the error is [`ErrorKind::Unexpected`]. - // assert_eq!(e.kind(), ErrorKind::NotFound); + let e = res.unwrap_err(); + assert_eq!(e.kind(), ErrorKind::NotFound); } } + + #[test] + fn test_error() { + let e = Error::new(ErrorKind::NotFound, "not found"); + let fe = FoyerError::other(e); + let oe = extract_err(fe); + assert_eq!(oe.kind(), ErrorKind::NotFound); + } } From c7bede36f7b9773713e247cef65d67bb5e1c09aa Mon Sep 17 00:00:00 2001 From: MrCroxx Date: Tue, 8 Jul 2025 15:25:16 +0800 Subject: [PATCH 07/20] chore: make clippy happer Signed-off-by: MrCroxx --- core/src/layers/foyer.rs | 40 +++++++++++++++++----------------------- 1 file changed, 17 insertions(+), 23 deletions(-) diff --git a/core/src/layers/foyer.rs b/core/src/layers/foyer.rs index ece3803f6d3a..17ef11051b96 100644 --- a/core/src/layers/foyer.rs +++ b/core/src/layers/foyer.rs @@ -30,7 +30,7 @@ use crate::{ OpCopy, OpCreateDir, OpDelete, OpList, OpPresign, OpRead, OpRename, OpStat, OpWrite, RpCopy, RpCreateDir, RpDelete, RpList, RpPresign, RpRead, RpRename, RpStat, RpWrite, }, - Buffer, Error, ErrorKind, Result, + Buffer, Error, ErrorKind, Metadata, Result, }; fn extract_err(e: FoyerError) -> Error { @@ -294,24 +294,20 @@ pub struct Writer { } impl oio::Write for Writer { - fn write(&mut self, bs: Buffer) -> impl Future> + MaybeSend { - async { - self.q.as_mut().unwrap().push(bs.clone()); - self.w.write(bs).await - } + async fn write(&mut self, bs: Buffer) -> Result<()> { + self.q.as_mut().unwrap().push(bs.clone()); + self.w.write(bs).await } - fn close(&mut self) -> impl Future> + MaybeSend { - async { - let buffer = self.q.take().unwrap().collect(); - let res = self.w.close().await; - self.inner.cache.insert(self.path.clone(), buffer.into()); - res - } + async fn close(&mut self) -> Result { + let buffer = self.q.take().unwrap().collect(); + let res = self.w.close().await; + self.inner.cache.insert(self.path.clone(), buffer.into()); + res } - fn abort(&mut self) -> impl Future> + MaybeSend { - self.w.abort() + async fn abort(&mut self) -> Result<()> { + self.w.abort().await } } @@ -328,14 +324,12 @@ impl oio::Delete for Deleter { Ok(()) } - fn flush(&mut self) -> impl Future> + MaybeSend { - async { - for key in &self.keys { - self.inner.cache.remove(key); - } - let res = self.d.flush().await; - res + async fn flush(&mut self) -> Result { + for key in &self.keys { + self.inner.cache.remove(key); } + let res = self.d.flush().await; + res } } @@ -371,7 +365,7 @@ mod tests { .with_device_options( DirectFsDeviceOptions::new(dir.path()) .with_capacity(16 * MiB as usize) - .with_file_size(1 * MiB as usize), + .with_file_size(MiB as usize), ) .with_recover_mode(RecoverMode::None) .build() From 78a393903d6b90c1ea31960fab37bb6927b177d3 Mon Sep 17 00:00:00 2001 From: MrCroxx Date: Sun, 13 Jul 2025 12:21:03 +0800 Subject: [PATCH 08/20] refactor: resolve comments Signed-off-by: MrCroxx --- core/Cargo.lock | 35 ------- core/Cargo.toml | 1 - core/src/layers/foyer.rs | 202 +++++++++++++-------------------------- core/src/layers/mod.rs | 2 +- 4 files changed, 69 insertions(+), 171 deletions(-) diff --git a/core/Cargo.lock b/core/Cargo.lock index 5b9440845552..8f3e7a61d75e 100644 --- a/core/Cargo.lock +++ b/core/Cargo.lock @@ -2877,18 +2877,6 @@ dependencies = [ "regex", ] -[[package]] -name = "env_logger" -version = "0.11.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "13c863f0904021b108aa8b2f55046443e6b1ebde8fd4a15c399893aae4fa069f" -dependencies = [ - "anstream", - "anstyle", - "env_filter", - "log", -] - [[package]] name = "equivalent" version = "1.0.2" @@ -5741,7 +5729,6 @@ dependencies = [ "suppaftp", "surrealdb", "tempfile", - "test-log", "tikv-client", "tokio", "tracing", @@ -8852,28 +8839,6 @@ dependencies = [ "windows-sys 0.59.0", ] -[[package]] -name = "test-log" -version = "0.2.18" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e33b98a582ea0be1168eba097538ee8dd4bbe0f2b01b22ac92ea30054e5be7b" -dependencies = [ - "env_logger", - "test-log-macros", - "tracing-subscriber", -] - -[[package]] -name = "test-log-macros" -version = "0.2.18" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "451b374529930d7601b1eef8d32bc79ae870b6079b069401709c2a8bf9e75f36" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.101", -] - [[package]] name = "thin-vec" version = "0.2.14" diff --git a/core/Cargo.toml b/core/Cargo.toml index a64a9954a4d3..126be7d5f4c3 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -434,7 +434,6 @@ rand = "0.8" sha2 = "0.10" size = "0.4" tempfile = "3" -test-log = { version = "0.2", features = ["trace", "color"] } tokio = { version = "1.27", features = ["fs", "macros", "rt-multi-thread"] } tracing-opentelemetry = "0.31.0" tracing-subscriber = { version = "0.3", features = [ diff --git a/core/src/layers/foyer.rs b/core/src/layers/foyer.rs index 17ef11051b96..83dc4df302fb 100644 --- a/core/src/layers/foyer.rs +++ b/core/src/layers/foyer.rs @@ -23,15 +23,9 @@ use std::{ use foyer::{Code, CodeError, Error as FoyerError, HybridCache}; -use crate::{ - raw::{ - oio::{self, QueueBuf, Read as _}, - Access, AccessorInfo, BytesContentRange, BytesRange, Layer, LayeredAccess, MaybeSend, - OpCopy, OpCreateDir, OpDelete, OpList, OpPresign, OpRead, OpRename, OpStat, OpWrite, - RpCopy, RpCreateDir, RpDelete, RpList, RpPresign, RpRead, RpRename, RpStat, RpWrite, - }, - Buffer, Error, ErrorKind, Metadata, Result, -}; +use crate::raw::oio::*; +use crate::raw::*; +use crate::*; fn extract_err(e: FoyerError) -> Error { let e = match e.downcast::() { @@ -41,10 +35,11 @@ fn extract_err(e: FoyerError) -> Error { Error::new(ErrorKind::Unexpected, e.to_string()) } +/// [`FoyerValue`] is a wrapper around `Buffer` that implements the `Code` trait. #[derive(Debug)] -pub struct Value(Buffer); +pub struct FoyerValue(pub Buffer); -impl Deref for Value { +impl Deref for FoyerValue { type Target = Buffer; fn deref(&self) -> &Self::Target { @@ -52,13 +47,7 @@ impl Deref for Value { } } -impl From for Value { - fn from(buf: Buffer) -> Self { - Value(buf) - } -} - -impl Code for Value { +impl Code for FoyerValue { fn encode(&self, writer: &mut impl std::io::Write) -> std::result::Result<(), CodeError> { let len = self.0.len() as u64; writer.write_all(&len.to_le_bytes())?; @@ -75,7 +64,7 @@ impl Code for Value { let len = u64::from_le_bytes(len_bytes) as usize; let mut buffer = vec![0u8; len]; reader.read_exact(&mut buffer[..len])?; - Ok(Value(buffer.into())) + Ok(FoyerValue(buffer.into())) } fn estimated_size(&self) -> usize { @@ -100,22 +89,16 @@ impl Code for Value { /// ``` #[derive(Debug)] pub struct FoyerLayer { - cache: HybridCache, + cache: HybridCache, } impl FoyerLayer { /// Creates a new `FoyerLayer` with the given foyer hybrid cache. - pub fn new(cache: HybridCache) -> Self { + pub fn new(cache: HybridCache) -> Self { FoyerLayer { cache } } } -impl From> for FoyerLayer { - fn from(cache: HybridCache) -> Self { - Self::new(cache) - } -} - impl Layer for FoyerLayer { type LayeredAccess = FoyerAccessor; @@ -130,7 +113,7 @@ impl Layer for FoyerLayer { #[derive(Debug)] struct Inner { accessor: A, - cache: HybridCache, + cache: HybridCache, } #[derive(Debug)] @@ -149,52 +132,49 @@ impl LayeredAccess for FoyerAccessor { &self.inner.accessor } - fn read( - &self, - path: &str, - args: OpRead, - ) -> impl Future> + MaybeSend { - let r = args.range(); + fn info(&self) -> Arc { + self.inner.accessor.info() + } + + async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { let path = path.to_string(); - async move { - let entry = self - .inner - .cache - .fetch(path.clone(), || { - let inner = self.inner.clone(); - async move { - let (_, mut reader) = inner - .accessor - .read(&path, args.with_range(BytesRange::new(0, None))) - .await - .map_err(FoyerError::other)?; - let buffer = reader.read_all().await.map_err(FoyerError::other)?; - Ok(buffer.into()) - } - }) - .await - .map_err(extract_err)?; - - let r = r.to_range(); - let start = match r.start_bound() { - Bound::Included(i) => *i, - Bound::Excluded(i) => *i + 1, - Bound::Unbounded => 0, - }; - let end = match r.end_bound() { - Bound::Included(i) => *i + 1, - Bound::Excluded(i) => *i, - Bound::Unbounded => entry.len() as u64, - }; - let range = BytesContentRange::default() - .with_range(start, end - 1) - .with_size(entry.len() as _); - let buffer = entry.slice(start as usize..end as usize); - let rp = RpRead::new() - .with_size(Some(buffer.len() as _)) - .with_range(Some(range)); - Ok((rp, buffer)) - } + let range = args.range().to_range(); + let entry = self + .inner + .cache + .fetch(path.clone(), || { + let inner = self.inner.clone(); + async move { + let (_, mut reader) = inner + .accessor + .read(&path, args.with_range(BytesRange::new(0, None))) + .await + .map_err(FoyerError::other)?; + let buffer = reader.read_all().await.map_err(FoyerError::other)?; + Ok(FoyerValue(buffer)) + } + }) + .await + .map_err(extract_err)?; + + let start = match range.start_bound() { + Bound::Included(i) => *i, + Bound::Excluded(i) => *i + 1, + Bound::Unbounded => 0, + }; + let end = match range.end_bound() { + Bound::Included(i) => *i + 1, + Bound::Excluded(i) => *i, + Bound::Unbounded => entry.len() as u64, + }; + let range = BytesContentRange::default() + .with_range(start, end - 1) + .with_size(entry.len() as _); + let buffer = entry.slice(start as usize..end as usize); + let rp = RpRead::new() + .with_size(Some(buffer.len() as _)) + .with_range(Some(range)); + Ok((rp, buffer)) } fn write( @@ -209,7 +189,7 @@ impl LayeredAccess for FoyerAccessor { rp, Writer { w, - q: Some(QueueBuf::new()), + buf: QueueBuf::new(), path: path.to_string(), inner, }, @@ -224,7 +204,7 @@ impl LayeredAccess for FoyerAccessor { Ok(( rp, Deleter { - d, + deleter: d, keys: vec![], inner, }, @@ -232,77 +212,32 @@ impl LayeredAccess for FoyerAccessor { } } - fn list( - &self, - path: &str, - args: OpList, - ) -> impl Future> + MaybeSend { - self.inner.accessor.list(path, args) - } - - fn info(&self) -> std::sync::Arc { - self.inner.accessor.info() - } - - fn create_dir( - &self, - path: &str, - args: OpCreateDir, - ) -> impl Future> + MaybeSend { - self.inner.accessor.create_dir(path, args) - } - - fn copy( - &self, - from: &str, - to: &str, - args: OpCopy, - ) -> impl Future> + MaybeSend { - // TODO(MrCroxx): Implement copy with foyer cache. - self.inner.accessor.copy(from, to, args) - } - - fn rename( - &self, - from: &str, - to: &str, - args: OpRename, - ) -> impl Future> + MaybeSend { - // TODO(MrCroxx): Implement copy with foyer cache. - self.inner.accessor.rename(from, to, args) - } - - fn stat(&self, path: &str, args: OpStat) -> impl Future> + MaybeSend { - // TODO(MrCroxx): Implement copy with foyer cache. - self.inner.accessor.stat(path, args) + async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> { + self.inner.accessor.list(path, args).await } - fn presign( - &self, - path: &str, - args: OpPresign, - ) -> impl Future> + MaybeSend { - self.inner.accessor.presign(path, args) - } + // TODO(MrCroxx): Implement copy, rename with foyer cache. } pub struct Writer { w: A::Writer, - q: Option, + buf: QueueBuf, path: String, inner: Arc>, } impl oio::Write for Writer { async fn write(&mut self, bs: Buffer) -> Result<()> { - self.q.as_mut().unwrap().push(bs.clone()); + self.buf.push(bs.clone()); self.w.write(bs).await } async fn close(&mut self) -> Result { - let buffer = self.q.take().unwrap().collect(); + let buffer = self.buf.clone().collect(); let res = self.w.close().await; - self.inner.cache.insert(self.path.clone(), buffer.into()); + self.inner + .cache + .insert(self.path.clone(), FoyerValue(buffer)); res } @@ -312,14 +247,14 @@ impl oio::Write for Writer { } pub struct Deleter { - d: A::Deleter, + deleter: A::Deleter, keys: Vec, inner: Arc>, } impl oio::Delete for Deleter { fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> { - self.d.delete(path, args.clone())?; + self.deleter.delete(path, args.clone())?; self.keys.push(path.to_string()); Ok(()) } @@ -328,14 +263,13 @@ impl oio::Delete for Deleter { for key in &self.keys { self.inner.cache.remove(key); } - let res = self.d.flush().await; + let res = self.deleter.flush().await; res } } #[cfg(test)] mod tests { - use foyer::{ DirectFsDeviceOptions, Engine, HybridCacheBuilder, LargeEngineOptions, RecoverMode, }; @@ -354,7 +288,7 @@ mod tests { vec![i; 63 * 1024] } - #[test_log::test(tokio::test)] + #[tokio::test] async fn test() { let dir = tempfile::tempdir().unwrap(); diff --git a/core/src/layers/mod.rs b/core/src/layers/mod.rs index a67ec062dfec..8bc66f4b6cd5 100644 --- a/core/src/layers/mod.rs +++ b/core/src/layers/mod.rs @@ -122,7 +122,7 @@ pub use self::dtrace::DtraceLayer; #[cfg(feature = "layers-foyer")] mod foyer; #[cfg(feature = "layers-foyer")] -pub use self::foyer::FoyerLayer; +pub use self::foyer::{FoyerLayer, FoyerValue}; pub mod observe; From 4206f0887f477812b47ee663257bcc95e3b297cd Mon Sep 17 00:00:00 2001 From: MrCroxx Date: Wed, 6 Aug 2025 10:59:27 +0800 Subject: [PATCH 09/20] feat: introduce size_limit to opt-out large objects Signed-off-by: MrCroxx --- core/Cargo.lock | 20 ++++++++++++-------- core/Cargo.toml | 6 ++---- core/src/layers/foyer.rs | 41 ++++++++++++++++++++++++++++++++++------ 3 files changed, 49 insertions(+), 18 deletions(-) diff --git a/core/Cargo.lock b/core/Cargo.lock index 8f3e7a61d75e..8b3dc3448aef 100644 --- a/core/Cargo.lock +++ b/core/Cargo.lock @@ -3191,8 +3191,9 @@ dependencies = [ [[package]] name = "foyer" -version = "0.17.4" -source = "git+https://github.com/foyer-rs/foyer?rev=4b50f6ddcf4c922f6f7caab8510b3fee3598fbb4#4b50f6ddcf4c922f6f7caab8510b3fee3598fbb4" +version = "0.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b4d8e96374206ff1b4265f2e2e6e1f80bc3048957b2a1e7fdeef929d68f318f" dependencies = [ "equivalent", "foyer-common", @@ -3209,8 +3210,9 @@ dependencies = [ [[package]] name = "foyer-common" -version = "0.17.4" -source = "git+https://github.com/foyer-rs/foyer?rev=4b50f6ddcf4c922f6f7caab8510b3fee3598fbb4#4b50f6ddcf4c922f6f7caab8510b3fee3598fbb4" +version = "0.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "911b8e3f23d5fe55b0b240f75af1d2fa5cb7261d3f9b38ef1c57bbc9f0449317" dependencies = [ "bytes", "cfg-if", @@ -3235,8 +3237,9 @@ dependencies = [ [[package]] name = "foyer-memory" -version = "0.17.4" -source = "git+https://github.com/foyer-rs/foyer?rev=4b50f6ddcf4c922f6f7caab8510b3fee3598fbb4#4b50f6ddcf4c922f6f7caab8510b3fee3598fbb4" +version = "0.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "506883d5a8500dea1b1662f7180f3534bdcbfa718d3253db7179552ef83612fa" dependencies = [ "arc-swap", "bitflags 2.9.1", @@ -3258,8 +3261,9 @@ dependencies = [ [[package]] name = "foyer-storage" -version = "0.17.4" -source = "git+https://github.com/foyer-rs/foyer?rev=4b50f6ddcf4c922f6f7caab8510b3fee3598fbb4#4b50f6ddcf4c922f6f7caab8510b3fee3598fbb4" +version = "0.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ba8403a54a2f2032fb647e49c442e5feeb33f3989f7024f1b178341a016f06d" dependencies = [ "allocator-api2", "anyhow", diff --git a/core/Cargo.toml b/core/Cargo.toml index 126be7d5f4c3..5f6c5338bc52 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -380,10 +380,6 @@ web-sys = { version = "0.3.77", optional = true, features = [ "StorageManager", "FileSystemGetFileOptions", ] } -# for services-foyer -# TODO(MrCroxx): Switch to a released version after testing with OpenDAL integration. -foyer = { git = "https://github.com/foyer-rs/foyer", rev = "4b50f6ddcf4c922f6f7caab8510b3fee3598fbb4", features = [ -], optional = true } # Layers # for layers-async-backtrace @@ -410,6 +406,8 @@ fastmetrics = { version = "0.3.0", optional = true } tracing = { version = "0.1", optional = true } # for layers-dtrace probe = { version = "0.5.1", optional = true } +# for layers-foyer +foyer = { version = "0.18", optional = true } [target.'cfg(target_arch = "wasm32")'.dependencies] backon = { version = "1.2", features = ["gloo-timers-sleep"] } diff --git a/core/src/layers/foyer.rs b/core/src/layers/foyer.rs index 83dc4df302fb..238c53423826 100644 --- a/core/src/layers/foyer.rs +++ b/core/src/layers/foyer.rs @@ -17,7 +17,7 @@ use std::{ future::Future, - ops::{Bound, Deref, RangeBounds}, + ops::{Bound, Deref, Range, RangeBounds}, sync::Arc, }; @@ -90,12 +90,34 @@ impl Code for FoyerValue { #[derive(Debug)] pub struct FoyerLayer { cache: HybridCache, + size_limit: Range, } impl FoyerLayer { /// Creates a new `FoyerLayer` with the given foyer hybrid cache. pub fn new(cache: HybridCache) -> Self { - FoyerLayer { cache } + FoyerLayer { + cache, + size_limit: 0..usize::MAX, + } + } + + /// Sets the size limit for caching. + /// + /// It is recommended to set a size limit to avoid caching large files that may not be suitable for caching. + pub fn with_size_limit>(mut self, size_limit: R) -> Self { + let start = match size_limit.start_bound() { + Bound::Included(v) => *v, + Bound::Excluded(v) => *v + 1, + Bound::Unbounded => 0, + }; + let end = match size_limit.end_bound() { + Bound::Included(v) => *v + 1, + Bound::Excluded(v) => *v, + Bound::Unbounded => usize::MAX, + }; + self.size_limit = start..end; + self } } @@ -105,7 +127,11 @@ impl Layer for FoyerLayer { fn layer(&self, accessor: A) -> Self::LayeredAccess { let cache = self.cache.clone(); FoyerAccessor { - inner: Arc::new(Inner { accessor, cache }), + inner: Arc::new(Inner { + accessor, + cache, + size_limit: self.size_limit.clone(), + }), } } } @@ -114,6 +140,7 @@ impl Layer for FoyerLayer { struct Inner { accessor: A, cache: HybridCache, + size_limit: Range, } #[derive(Debug)] @@ -235,9 +262,11 @@ impl oio::Write for Writer { async fn close(&mut self) -> Result { let buffer = self.buf.clone().collect(); let res = self.w.close().await; - self.inner - .cache - .insert(self.path.clone(), FoyerValue(buffer)); + if self.inner.size_limit.contains(&buffer.len()) { + self.inner + .cache + .insert(self.path.clone(), FoyerValue(buffer)); + } res } From 5aead97b828775bf1fd4db8426fea734624e25a6 Mon Sep 17 00:00:00 2001 From: MrCroxx Date: Thu, 7 Aug 2025 16:37:56 +0800 Subject: [PATCH 10/20] refactor: add version to foyer cache key Signed-off-by: MrCroxx --- core/src/layers/foyer.rs | 117 ++++++++++++++++++++++++++++++++------- 1 file changed, 96 insertions(+), 21 deletions(-) diff --git a/core/src/layers/foyer.rs b/core/src/layers/foyer.rs index 238c53423826..70ca36309c72 100644 --- a/core/src/layers/foyer.rs +++ b/core/src/layers/foyer.rs @@ -35,6 +35,60 @@ fn extract_err(e: FoyerError) -> Error { Error::new(ErrorKind::Unexpected, e.to_string()) } +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct FoyerKey { + pub path: String, + pub version: Option, +} + +impl Code for FoyerKey { + fn encode(&self, writer: &mut impl std::io::Write) -> std::result::Result<(), CodeError> { + let path_len = self.path.len() as u64; + writer.write_all(&path_len.to_le_bytes())?; + writer.write_all(self.path.as_bytes())?; + if let Some(ref version) = self.version { + let version_len = version.len() as u64; + writer.write_all(&version_len.to_le_bytes())?; + writer.write_all(version.as_bytes())?; + } else { + writer.write_all(&0u64.to_le_bytes())?; + } + Ok(()) + } + + fn decode(reader: &mut impl std::io::Read) -> std::result::Result + where + Self: Sized, + { + let mut u64_buf = [0u8; 8]; + reader.read_exact(&mut u64_buf)?; + let path_len = u64::from_le_bytes(u64_buf) as usize; + let mut path_buf = vec![0u8; path_len]; + reader.read_exact(&mut path_buf[..path_len])?; + let path = String::from_utf8(path_buf).map_err(|e| CodeError::Other(Box::new(e)))?; + + reader.read_exact(&mut u64_buf)?; + let version_len = u64::from_le_bytes(u64_buf) as usize; + let version = if version_len > 0 { + let mut version_buf = vec![0u8; path_len]; + reader.read_exact(&mut version_buf[..path_len])?; + let version = + String::from_utf8(version_buf).map_err(|e| CodeError::Other(Box::new(e)))?; + Some(version) + } else { + None + }; + Ok(FoyerKey { path, version }) + } + + fn estimated_size(&self) -> usize { + // 8B length prefix + path length + 8 + self.path.len() + // 8B version length prefix + version length, if present + + 8 + self.version.as_ref().map_or(0, |v| v.len()) + } +} + /// [`FoyerValue`] is a wrapper around `Buffer` that implements the `Code` trait. #[derive(Debug)] pub struct FoyerValue(pub Buffer); @@ -68,6 +122,7 @@ impl Code for FoyerValue { } fn estimated_size(&self) -> usize { + // 8B length prefix + buffer length 8 + self.0.len() } } @@ -87,15 +142,19 @@ impl Code for FoyerValue { /// use opendal::services::S3; /// /// ``` +/// +/// # Note +/// +/// If the object version is enabled, the foyer cache layer will treat the objects with same key but different versions as different objects. #[derive(Debug)] pub struct FoyerLayer { - cache: HybridCache, + cache: HybridCache, size_limit: Range, } impl FoyerLayer { /// Creates a new `FoyerLayer` with the given foyer hybrid cache. - pub fn new(cache: HybridCache) -> Self { + pub fn new(cache: HybridCache) -> Self { FoyerLayer { cache, size_limit: 0..usize::MAX, @@ -139,7 +198,7 @@ impl Layer for FoyerLayer { #[derive(Debug)] struct Inner { accessor: A, - cache: HybridCache, + cache: HybridCache, size_limit: Range, } @@ -165,22 +224,29 @@ impl LayeredAccess for FoyerAccessor { async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { let path = path.to_string(); + let version = args.version().map(|v| v.to_string()); let range = args.range().to_range(); let entry = self .inner .cache - .fetch(path.clone(), || { - let inner = self.inner.clone(); - async move { - let (_, mut reader) = inner - .accessor - .read(&path, args.with_range(BytesRange::new(0, None))) - .await - .map_err(FoyerError::other)?; - let buffer = reader.read_all().await.map_err(FoyerError::other)?; - Ok(FoyerValue(buffer)) - } - }) + .fetch( + FoyerKey { + path: path.clone(), + version, + }, + || { + let inner = self.inner.clone(); + async move { + let (_, mut reader) = inner + .accessor + .read(&path, args.with_range(BytesRange::new(0, None))) + .await + .map_err(FoyerError::other)?; + let buffer = reader.read_all().await.map_err(FoyerError::other)?; + Ok(FoyerValue(buffer)) + } + }, + ) .await .map_err(extract_err)?; @@ -262,10 +328,16 @@ impl oio::Write for Writer { async fn close(&mut self) -> Result { let buffer = self.buf.clone().collect(); let res = self.w.close().await; - if self.inner.size_limit.contains(&buffer.len()) { - self.inner - .cache - .insert(self.path.clone(), FoyerValue(buffer)); + if let Ok(metadata) = &res { + if self.inner.size_limit.contains(&buffer.len()) { + self.inner.cache.insert( + FoyerKey { + path: self.path.clone(), + version: metadata.version().map(|v| v.to_string()), + }, + FoyerValue(buffer), + ); + } } res } @@ -277,14 +349,17 @@ impl oio::Write for Writer { pub struct Deleter { deleter: A::Deleter, - keys: Vec, + keys: Vec, inner: Arc>, } impl oio::Delete for Deleter { fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> { self.deleter.delete(path, args.clone())?; - self.keys.push(path.to_string()); + self.keys.push(FoyerKey { + path: path.to_string(), + version: args.version().map(|v| v.to_string()), + }); Ok(()) } From 7474e5ce707576916905dff8e9c7efb64306af58 Mon Sep 17 00:00:00 2001 From: Li Yazhou Date: Mon, 29 Dec 2025 20:36:45 +0800 Subject: [PATCH 11/20] fix: fmt --- core/layers/foyer/Cargo.toml | 4 +++- core/layers/foyer/src/lib.rs | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/core/layers/foyer/Cargo.toml b/core/layers/foyer/Cargo.toml index a0a6f28ba638..0f10f4694364 100644 --- a/core/layers/foyer/Cargo.toml +++ b/core/layers/foyer/Cargo.toml @@ -35,7 +35,9 @@ foyer = "0.18" opendal-core = { path = "../../core", version = "0.55.0", default-features = false } [dev-dependencies] -opendal-core = { path = "../../core", version = "0.55.0", features = ["services-memory"] } +opendal-core = { path = "../../core", version = "0.55.0", features = [ + "services-memory", +] } size = "0.5" tempfile = "3" tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } diff --git a/core/layers/foyer/src/lib.rs b/core/layers/foyer/src/lib.rs index cd6828c433a5..022b6a33e338 100644 --- a/core/layers/foyer/src/lib.rs +++ b/core/layers/foyer/src/lib.rs @@ -377,7 +377,7 @@ mod tests { }; use size::consts::MiB; - use opendal_core::{services::Dashmap, Operator}; + use opendal_core::{Operator, services::Dashmap}; use super::*; From fe8e37958f155e17eb3ea22e9897a688ef98f468 Mon Sep 17 00:00:00 2001 From: Li Yazhou Date: Mon, 29 Dec 2025 21:14:39 +0800 Subject: [PATCH 12/20] fix: cargo check --all-features --- core/Cargo.lock | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/core/Cargo.lock b/core/Cargo.lock index b8d84d5f26dd..95736c3ca759 100644 --- a/core/Cargo.lock +++ b/core/Cargo.lock @@ -338,9 +338,9 @@ dependencies = [ [[package]] name = "async-graphql" -version = "7.0.16" +version = "7.0.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d3ee559e72d983e7e04001ba3bf32e6b71c1d670595780723727fd8a29d36e87" +checksum = "036618f842229ba0b89652ffe425f96c7c16a49f7e3cb23b56fca7f61fd74980" dependencies = [ "async-graphql-derive", "async-graphql-parser", @@ -1787,7 +1787,7 @@ version = "3.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fde0e0ec90c9dfb3b4b1a0891a7dcd0e2bffde2f7efed5fe7c9bb00e5bfb915e" dependencies = [ - "windows-sys 0.59.0", + "windows-sys 0.48.0", ] [[package]] @@ -4286,7 +4286,7 @@ dependencies = [ "libc", "percent-encoding", "pin-project-lite", - "socket2 0.6.1", + "socket2 0.5.10", "system-configuration", "tokio", "tower-service", @@ -4544,9 +4544,9 @@ checksum = "469fb0b9cefa57e3ef31275ee7cacb78f2fdca44e4765491884a2b119d4eb130" [[package]] name = "iri-string" -version = "0.7.9" +version = "0.7.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4f867b9d1d896b67beb18518eda36fdb77a32ea590de864f1325b294a6d14397" +checksum = "c91338f0783edbd6195decb37bae672fd3b165faffb89bf7b9e6942f8b1a731a" dependencies = [ "memchr", "serde", @@ -7345,7 +7345,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7d8fae84b431384b68627d0f9b3b1245fcf9f46f6c0e3dc902e9dce64edd1967" dependencies = [ "libc", - "windows-sys 0.61.2", + "windows-sys 0.48.0", ] [[package]] @@ -7976,7 +7976,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac6c3320f9abac597dcbc668774ef006702672474aad53c6d596b62e487b40b1" dependencies = [ "heck", - "itertools 0.14.0", + "itertools 0.12.1", "log", "multimap", "once_cell", @@ -8011,7 +8011,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d" dependencies = [ "anyhow", - "itertools 0.14.0", + "itertools 0.12.1", "proc-macro2", "quote", "syn 2.0.111", @@ -8024,7 +8024,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9120690fafc389a67ba3803df527d0ec9cbbc9cc45e4cc20b332996dfb672425" dependencies = [ "anyhow", - "itertools 0.14.0", + "itertools 0.12.1", "proc-macro2", "quote", "syn 2.0.111", @@ -8176,7 +8176,7 @@ dependencies = [ "quinn-udp", "rustc-hash 2.1.1", "rustls 0.23.35", - "socket2 0.6.1", + "socket2 0.5.10", "thiserror 2.0.17", "tokio", "tracing", @@ -8213,7 +8213,7 @@ dependencies = [ "cfg_aliases", "libc", "once_cell", - "socket2 0.6.1", + "socket2 0.5.10", "tracing", "windows-sys 0.60.2", ] @@ -11637,7 +11637,7 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" dependencies = [ - "windows-sys 0.61.2", + "windows-sys 0.48.0", ] [[package]] @@ -12267,9 +12267,9 @@ dependencies = [ [[package]] name = "zmij" -version = "1.0.0" +version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e6d6085d62852e35540689d1f97ad663e3971fc19cf5eceab364d62c646ea167" +checksum = "0f4a4e8e9dc5c62d159f04fcdbe07f4c3fb710415aab4754bf11505501e3251d" [[package]] name = "zstd" From dec1339ed06a0e6f6faa5ce3d0830a19c9c6dbaf Mon Sep 17 00:00:00 2001 From: Li Yazhou Date: Mon, 29 Dec 2025 21:33:32 +0800 Subject: [PATCH 13/20] fix: cargo clippy --all-features --- core/layers/foyer/src/lib.rs | 24 +++++++++++++++++++----- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/core/layers/foyer/src/lib.rs b/core/layers/foyer/src/lib.rs index 022b6a33e338..7702a1a40580 100644 --- a/core/layers/foyer/src/lib.rs +++ b/core/layers/foyer/src/lib.rs @@ -137,10 +137,24 @@ impl Code for FoyerValue { /// /// # Examples /// -/// ```rust -/// use opendal::layers::FoyerLayer; -/// use opendal::services::S3; +/// ```no_run +/// use opendal_core::{Operator, services::Memory}; +/// use opendal_layer_foyer::FoyerLayer; +/// use foyer::{HybridCacheBuilder, Engine}; /// +/// # async fn example() -> Result<(), Box> { +/// let cache = HybridCacheBuilder::new() +/// .memory(64 * 1024 * 1024) // 64MB memory cache +/// .with_shards(4) +/// .storage(Engine::Large(Default::default())) +/// .build() +/// .await?; +/// +/// let op = Operator::new(Memory::default())? +/// .layer(FoyerLayer::new(cache)) +/// .finish(); +/// # Ok(()) +/// # } /// ``` /// /// # Note @@ -377,7 +391,7 @@ mod tests { }; use size::consts::MiB; - use opendal_core::{Operator, services::Dashmap}; + use opendal_core::{Operator, services::Memory}; use super::*; @@ -408,7 +422,7 @@ mod tests { .await .unwrap(); - let op = Operator::new(Dashmap::default()) + let op = Operator::new(Memory::default()) .unwrap() .layer(FoyerLayer::new(cache.clone())) .finish(); From e1abdb30e39701720cafd5474422be9c99ef3c9b Mon Sep 17 00:00:00 2001 From: Li Yazhou Date: Thu, 1 Jan 2026 16:08:03 +0800 Subject: [PATCH 14/20] refactor: utilize bincode for FoyerKey --- core/Cargo.lock | 5 ++ core/layers/foyer/Cargo.toml | 4 +- core/layers/foyer/src/lib.rs | 143 ++++++++++++++++++++++------------- 3 files changed, 99 insertions(+), 53 deletions(-) diff --git a/core/Cargo.lock b/core/Cargo.lock index 95736c3ca759..cb871d67c2cd 100644 --- a/core/Cargo.lock +++ b/core/Cargo.lock @@ -3240,6 +3240,7 @@ version = "0.18.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9db9c0e4648b13e9216d785b308d43751ca975301aeb83e607ec630b6f956944" dependencies = [ + "bincode", "bytes", "cfg-if", "itertools 0.14.0", @@ -3247,6 +3248,7 @@ dependencies = [ "mixtrics", "parking_lot 0.12.5", "pin-project", + "serde", "thiserror 2.0.17", "tokio", "twox-hash", @@ -3311,6 +3313,7 @@ dependencies = [ "paste", "pin-project", "rand 0.9.2", + "serde", "thiserror 2.0.17", "tokio", "tracing", @@ -6107,8 +6110,10 @@ dependencies = [ name = "opendal-layer-foyer" version = "0.55.0" dependencies = [ + "bincode", "foyer", "opendal-core", + "serde", "size", "tempfile", "tokio", diff --git a/core/layers/foyer/Cargo.toml b/core/layers/foyer/Cargo.toml index 0f10f4694364..bba6fbc98b37 100644 --- a/core/layers/foyer/Cargo.toml +++ b/core/layers/foyer/Cargo.toml @@ -31,8 +31,10 @@ version = { workspace = true } all-features = true [dependencies] -foyer = "0.18" +bincode = "1" +foyer = { version = "0.18", features = ["serde"] } opendal-core = { path = "../../core", version = "0.55.0", default-features = false } +serde = { workspace = true, features = ["derive"] } [dev-dependencies] opendal-core = { path = "../../core", version = "0.55.0", features = [ diff --git a/core/layers/foyer/src/lib.rs b/core/layers/foyer/src/lib.rs index 7702a1a40580..f494e5d3608c 100644 --- a/core/layers/foyer/src/lib.rs +++ b/core/layers/foyer/src/lib.rs @@ -35,60 +35,20 @@ fn extract_err(e: FoyerError) -> Error { Error::new(ErrorKind::Unexpected, e.to_string()) } -#[derive(Debug, Clone, PartialEq, Eq, Hash)] +/// [`FoyerKey`] is a key for the foyer cache. It's encoded via bincode, which is +/// backed by foyer's "serde" feature. +/// +/// It's possible to specify a version in the [`OpRead`] args: +/// +/// - If a version is given, the object is cached under that versioned key. +/// - If version is not supplied, the object is cached exactly as returned by the backend, +/// We do NOT interpret `None` as "latest" and we do not promote it to any other version. +#[derive(Debug, Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)] pub struct FoyerKey { pub path: String, pub version: Option, } -impl Code for FoyerKey { - fn encode(&self, writer: &mut impl std::io::Write) -> std::result::Result<(), CodeError> { - let path_len = self.path.len() as u64; - writer.write_all(&path_len.to_le_bytes())?; - writer.write_all(self.path.as_bytes())?; - if let Some(ref version) = self.version { - let version_len = version.len() as u64; - writer.write_all(&version_len.to_le_bytes())?; - writer.write_all(version.as_bytes())?; - } else { - writer.write_all(&0u64.to_le_bytes())?; - } - Ok(()) - } - - fn decode(reader: &mut impl std::io::Read) -> std::result::Result - where - Self: Sized, - { - let mut u64_buf = [0u8; 8]; - reader.read_exact(&mut u64_buf)?; - let path_len = u64::from_le_bytes(u64_buf) as usize; - let mut path_buf = vec![0u8; path_len]; - reader.read_exact(&mut path_buf[..path_len])?; - let path = String::from_utf8(path_buf).map_err(|e| CodeError::Other(Box::new(e)))?; - - reader.read_exact(&mut u64_buf)?; - let version_len = u64::from_le_bytes(u64_buf) as usize; - let version = if version_len > 0 { - let mut version_buf = vec![0u8; path_len]; - reader.read_exact(&mut version_buf[..path_len])?; - let version = - String::from_utf8(version_buf).map_err(|e| CodeError::Other(Box::new(e)))?; - Some(version) - } else { - None - }; - Ok(FoyerKey { path, version }) - } - - fn estimated_size(&self) -> usize { - // 8B length prefix + path length - 8 + self.path.len() - // 8B version length prefix + version length, if present - + 8 + self.version.as_ref().map_or(0, |v| v.len()) - } -} - /// [`FoyerValue`] is a wrapper around `Buffer` that implements the `Code` trait. #[derive(Debug)] pub struct FoyerValue(pub Buffer); @@ -122,7 +82,6 @@ impl Code for FoyerValue { } fn estimated_size(&self) -> usize { - // 8B length prefix + buffer length 8 + self.0.len() } } @@ -389,9 +348,9 @@ mod tests { use foyer::{ DirectFsDeviceOptions, Engine, HybridCacheBuilder, LargeEngineOptions, RecoverMode, }; - use size::consts::MiB; - use opendal_core::{Operator, services::Memory}; + use size::consts::MiB; + use std::io::Cursor; use super::*; @@ -467,4 +426,84 @@ mod tests { let oe = extract_err(fe); assert_eq!(oe.kind(), ErrorKind::NotFound); } + + #[test] + fn test_foyer_key_version_none_vs_empty() { + let key_none = FoyerKey { + path: "test/path".to_string(), + version: None, + }; + + let key_empty = FoyerKey { + path: "test/path".to_string(), + version: Some("".to_string()), + }; + + let mut buf_none = Vec::new(); + key_none.encode(&mut buf_none).unwrap(); + + let mut buf_empty = Vec::new(); + key_empty.encode(&mut buf_empty).unwrap(); + + assert_ne!( + buf_none, buf_empty, + "Serialization of version=None and version=\"\" should be different" + ); + + let decoded_none = FoyerKey::decode(&mut Cursor::new(&buf_none)).unwrap(); + assert_eq!(decoded_none, key_none); + let decoded_empty = FoyerKey::decode(&mut Cursor::new(&buf_empty)).unwrap(); + assert_eq!(decoded_empty, key_empty); + } + + #[test] + fn test_foyer_key_serde() { + use std::io::Cursor; + + let test_cases = vec![ + FoyerKey { + path: "simple".to_string(), + version: None, + }, + FoyerKey { + path: "with/slash/path".to_string(), + version: None, + }, + FoyerKey { + path: "versioned".to_string(), + version: Some("v1.0.0".to_string()), + }, + FoyerKey { + path: "empty-version".to_string(), + version: Some("".to_string()), + }, + FoyerKey { + path: "".to_string(), + version: None, + }, + FoyerKey { + path: "unicode/θ·―εΎ„/πŸš€".to_string(), + version: Some("η‰ˆζœ¬-1".to_string()), + }, + FoyerKey { + path: "long/".to_string().repeat(100), + version: Some("long-version-".to_string().repeat(50)), + }, + ]; + + for original in test_cases { + let mut buffer = Vec::new(); + original + .encode(&mut buffer) + .expect("encoding should succeed"); + + let decoded = + FoyerKey::decode(&mut Cursor::new(&buffer)).expect("decoding should succeed"); + + assert_eq!( + decoded, original, + "decode(encode(key)) should equal original key" + ); + } + } } From 22711a43483a94327bc781706dbb52f13415c731 Mon Sep 17 00:00:00 2001 From: Li Yazhou Date: Sun, 4 Jan 2026 22:07:16 +0800 Subject: [PATCH 15/20] fix: rm the mis-committed mod.rs --- core/src/layers/mod.rs | 139 ----------------------------------------- core/src/lib.rs | 2 + 2 files changed, 2 insertions(+), 139 deletions(-) delete mode 100644 core/src/layers/mod.rs diff --git a/core/src/layers/mod.rs b/core/src/layers/mod.rs deleted file mode 100644 index c688cf1e9934..000000000000 --- a/core/src/layers/mod.rs +++ /dev/null @@ -1,139 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! `Layer` is the mechanism to intercept operations. - -mod type_eraser; -pub(crate) use type_eraser::TypeEraseLayer; - -mod error_context; -pub(crate) use error_context::ErrorContextLayer; - -mod complete; -pub(crate) use complete::CompleteLayer; - -mod concurrent_limit; -pub use concurrent_limit::ConcurrentLimitLayer; - -mod immutable_index; -pub use immutable_index::ImmutableIndexLayer; - -mod logging; -pub use logging::LoggingInterceptor; -pub use logging::LoggingLayer; - -mod timeout; -pub use timeout::TimeoutLayer; - -#[cfg(feature = "layers-chaos")] -mod chaos; -#[cfg(feature = "layers-chaos")] -pub use chaos::ChaosLayer; - -#[cfg(feature = "layers-metrics")] -mod metrics; -#[cfg(feature = "layers-metrics")] -pub use self::metrics::MetricsLayer; - -#[cfg(feature = "layers-mime-guess")] -mod mime_guess; -#[cfg(feature = "layers-mime-guess")] -pub use self::mime_guess::MimeGuessLayer; - -#[cfg(feature = "layers-prometheus")] -mod prometheus; -#[cfg(feature = "layers-prometheus")] -pub use self::prometheus::PrometheusLayer; -#[cfg(feature = "layers-prometheus")] -pub use self::prometheus::PrometheusLayerBuilder; - -#[cfg(feature = "layers-prometheus-client")] -mod prometheus_client; -#[cfg(feature = "layers-prometheus-client")] -pub use self::prometheus_client::PrometheusClientLayer; -#[cfg(feature = "layers-prometheus-client")] -pub use self::prometheus_client::PrometheusClientLayerBuilder; - -#[cfg(feature = "layers-fastmetrics")] -mod fastmetrics; -#[cfg(feature = "layers-fastmetrics")] -pub use self::fastmetrics::FastmetricsLayer; -#[cfg(feature = "layers-fastmetrics")] -pub use self::fastmetrics::FastmetricsLayerBuilder; - -mod retry; -pub use self::retry::RetryInterceptor; -pub use self::retry::RetryLayer; - -mod tail_cut; -pub use self::tail_cut::TailCutLayer; -pub use self::tail_cut::TailCutLayerBuilder; - -#[cfg(feature = "layers-tracing")] -mod tracing; -#[cfg(feature = "layers-tracing")] -pub use self::tracing::TracingLayer; - -#[cfg(feature = "layers-fastrace")] -mod fastrace; -#[cfg(feature = "layers-fastrace")] -pub use self::fastrace::FastraceLayer; - -#[cfg(feature = "layers-otel-metrics")] -mod otelmetrics; -#[cfg(feature = "layers-otel-metrics")] -pub use self::otelmetrics::OtelMetricsLayer; - -#[cfg(feature = "layers-otel-trace")] -mod oteltrace; -#[cfg(feature = "layers-otel-trace")] -pub use self::oteltrace::OtelTraceLayer; - -#[cfg(feature = "layers-throttle")] -mod throttle; -#[cfg(feature = "layers-throttle")] -pub use self::throttle::ThrottleLayer; - -#[cfg(feature = "layers-await-tree")] -mod await_tree; -#[cfg(feature = "layers-await-tree")] -pub use self::await_tree::AwaitTreeLayer; - -#[cfg(feature = "layers-async-backtrace")] -mod async_backtrace; -#[cfg(feature = "layers-async-backtrace")] -pub use self::async_backtrace::AsyncBacktraceLayer; - -#[cfg(all(target_os = "linux", feature = "layers-dtrace"))] -mod dtrace; -#[cfg(all(target_os = "linux", feature = "layers-dtrace"))] -pub use self::dtrace::DtraceLayer; - -#[cfg(feature = "layers-foyer")] -mod foyer; -#[cfg(feature = "layers-foyer")] -pub use self::foyer::{FoyerLayer, FoyerValue}; - -pub mod observe; - -mod correctness_check; -pub(crate) use correctness_check::CorrectnessCheckLayer; -mod capability_check; -pub use capability_check::CapabilityCheckLayer; - -mod http_client; -pub use http_client::HttpClientLayer; diff --git a/core/src/lib.rs b/core/src/lib.rs index da5589d779d4..2205a3fb09e5 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -385,6 +385,8 @@ pub mod layers { pub use opendal_layer_fastmetrics::*; #[cfg(feature = "layers-fastrace")] pub use opendal_layer_fastrace::*; + #[cfg(feature = "layers-foyer")] + pub use opendal_layer_foyer::*; #[cfg(feature = "layers-hotpath")] pub use opendal_layer_hotpath::*; #[cfg(feature = "layers-immutable-index")] From 74fe005329d04bedefd7bf655e1e8831803355e2 Mon Sep 17 00:00:00 2001 From: flaneur Date: Mon, 5 Jan 2026 22:09:37 +0800 Subject: [PATCH 16/20] Update core/layers/foyer/src/lib.rs Co-authored-by: Jorge Hermo --- core/layers/foyer/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/layers/foyer/src/lib.rs b/core/layers/foyer/src/lib.rs index f494e5d3608c..5366101d5c09 100644 --- a/core/layers/foyer/src/lib.rs +++ b/core/layers/foyer/src/lib.rs @@ -118,7 +118,7 @@ impl Code for FoyerValue { /// /// # Note /// -/// If the object version is enabled, the foyer cache layer will treat the objects with same key but different versions as different objects. +/// If the object version is enabled, the foyer cache layer will treat the objects with same path but different versions as different objects. #[derive(Debug)] pub struct FoyerLayer { cache: HybridCache, From 797869fdc6cb49443795abfefe14923e3c32b2cb Mon Sep 17 00:00:00 2001 From: flaneur Date: Mon, 5 Jan 2026 22:09:54 +0800 Subject: [PATCH 17/20] Update core/layers/foyer/src/lib.rs Co-authored-by: Jorge Hermo --- core/layers/foyer/src/lib.rs | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/core/layers/foyer/src/lib.rs b/core/layers/foyer/src/lib.rs index 5366101d5c09..8377e260efc6 100644 --- a/core/layers/foyer/src/lib.rs +++ b/core/layers/foyer/src/lib.rs @@ -299,19 +299,17 @@ impl oio::Write for Writer { async fn close(&mut self) -> Result { let buffer = self.buf.clone().collect(); - let res = self.w.close().await; - if let Ok(metadata) = &res { - if self.inner.size_limit.contains(&buffer.len()) { - self.inner.cache.insert( - FoyerKey { - path: self.path.clone(), - version: metadata.version().map(|v| v.to_string()), - }, - FoyerValue(buffer), - ); - } + let metadata = self.w.close().await?; + if self.inner.size_limit.contains(&buffer.len()) { + self.inner.cache.insert( + FoyerKey { + path: self.path.clone(), + version: metadata.version().map(|v| v.to_string()), + }, + FoyerValue(buffer), + ); } - res + Ok(metadata) } async fn abort(&mut self) -> Result<()> { From 1dbf2b1a1047d61a07634c446b751badc15eb39f Mon Sep 17 00:00:00 2001 From: Li Yazhou Date: Mon, 5 Jan 2026 23:08:19 +0800 Subject: [PATCH 18/20] not to cache it if the object is bigger than size_limit --- core/Cargo.lock | 425 ++++++++++++++++++++++++++++++++++- core/layers/foyer/src/lib.rs | 88 ++++++-- 2 files changed, 485 insertions(+), 28 deletions(-) diff --git a/core/Cargo.lock b/core/Cargo.lock index 27894c616d86..2c2fd9bbae3c 100644 --- a/core/Cargo.lock +++ b/core/Cargo.lock @@ -566,6 +566,18 @@ dependencies = [ "syn 2.0.111", ] +[[package]] +name = "auto_enums" +version = "0.8.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c170965892137a3a9aeb000b4524aa3cc022a310e709d848b6e1cdce4ab4781" +dependencies = [ + "derive_utils", + "proc-macro2", + "quote", + "syn 2.0.111", +] + [[package]] name = "autocfg" version = "1.5.0" @@ -1712,7 +1724,7 @@ dependencies = [ "anstream", "anstyle", "clap_lex", - "strsim", + "strsim 0.11.1", "terminal_size", ] @@ -1743,6 +1755,12 @@ dependencies = [ "cc", ] +[[package]] +name = "cmsketch" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7ee2cfacbd29706479902b06d75ad8f1362900836aa32799eabc7e004bfd854" + [[package]] name = "coarsetime" version = "0.1.36" @@ -2260,6 +2278,16 @@ dependencies = [ "cipher", ] +[[package]] +name = "darling" +version = "0.14.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b750cb3417fd1b327431a470f388520309479ab0bf5e323505daf0290cd3850" +dependencies = [ + "darling_core 0.14.4", + "darling_macro 0.14.4", +] + [[package]] name = "darling" version = "0.20.11" @@ -2280,6 +2308,20 @@ dependencies = [ "darling_macro 0.21.3", ] +[[package]] +name = "darling_core" +version = "0.14.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "109c1ca6e6b7f82cc233a97004ea8ed7ca123a9af07a8230878fcfda9b158bf0" +dependencies = [ + "fnv", + "ident_case", + "proc-macro2", + "quote", + "strsim 0.10.0", + "syn 1.0.109", +] + [[package]] name = "darling_core" version = "0.20.11" @@ -2290,7 +2332,7 @@ dependencies = [ "ident_case", "proc-macro2", "quote", - "strsim", + "strsim 0.11.1", "syn 2.0.111", ] @@ -2304,10 +2346,21 @@ dependencies = [ "ident_case", "proc-macro2", "quote", - "strsim", + "strsim 0.11.1", "syn 2.0.111", ] +[[package]] +name = "darling_macro" +version = "0.14.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4aab4dbc9f7611d8b55048a3a16d2d010c2c8334e46304b40ac1cc14bf3b48e" +dependencies = [ + "darling_core 0.14.4", + "quote", + "syn 1.0.109", +] + [[package]] name = "darling_macro" version = "0.20.11" @@ -2503,6 +2556,17 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "derive_utils" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ccfae181bab5ab6c5478b2ccb69e4c68a02f8c3ec72f6616bfec9dbc599d2ee0" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.111", +] + [[package]] name = "des" version = "0.8.1" @@ -2630,6 +2694,12 @@ version = "0.15.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b" +[[package]] +name = "downcast-rs" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75b325c5dbd37f80359721ad39aca5a29fb04c89279657cffdda8736d0c0b9d2" + [[package]] name = "dtoa" version = "1.0.10" @@ -3157,6 +3227,112 @@ dependencies = [ "uuid", ] +[[package]] +name = "foyer" +version = "0.18.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "642093b1a72c4a0ef89862484d669a353e732974781bb9c49a979526d1e30edc" +dependencies = [ + "equivalent", + "foyer-common", + "foyer-memory", + "foyer-storage", + "madsim-tokio", + "mixtrics", + "pin-project", + "serde", + "thiserror 2.0.17", + "tokio", + "tracing", +] + +[[package]] +name = "foyer-common" +version = "0.18.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9db9c0e4648b13e9216d785b308d43751ca975301aeb83e607ec630b6f956944" +dependencies = [ + "bincode", + "bytes", + "cfg-if", + "itertools 0.14.0", + "madsim-tokio", + "mixtrics", + "parking_lot 0.12.5", + "pin-project", + "serde", + "thiserror 2.0.17", + "tokio", + "twox-hash", +] + +[[package]] +name = "foyer-intrusive-collections" +version = "0.10.0-dev" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e4fee46bea69e0596130e3210e65d3424e0ac1e6df3bde6636304bdf1ca4a3b" +dependencies = [ + "memoffset 0.9.1", +] + +[[package]] +name = "foyer-memory" +version = "0.18.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "040dc38acbfca8f1def26bbbd9e9199090884aabb15de99f7bf4060be66ff608" +dependencies = [ + "arc-swap", + "bitflags 2.10.0", + "cmsketch", + "equivalent", + "foyer-common", + "foyer-intrusive-collections", + "hashbrown 0.15.5", + "itertools 0.14.0", + "madsim-tokio", + "mixtrics", + "parking_lot 0.12.5", + "pin-project", + "serde", + "thiserror 2.0.17", + "tokio", + "tracing", +] + +[[package]] +name = "foyer-storage" +version = "0.18.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "54a77ed888da490e997da6d6d62fcbce3f202ccf28be098c4ea595ca046fc4a9" +dependencies = [ + "allocator-api2", + "anyhow", + "auto_enums", + "bytes", + "equivalent", + "flume 0.11.1", + "foyer-common", + "foyer-memory", + "fs4", + "futures-core", + "futures-util", + "itertools 0.14.0", + "libc", + "lz4", + "madsim-tokio", + "ordered_hash_map", + "parking_lot 0.12.5", + "paste", + "pin-project", + "rand 0.9.2", + "serde", + "thiserror 2.0.17", + "tokio", + "tracing", + "twox-hash", + "zstd", +] + [[package]] name = "fs2" version = "0.4.3" @@ -3167,6 +3343,16 @@ dependencies = [ "winapi", ] +[[package]] +name = "fs4" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8640e34b88f7652208ce9e88b1a37a2ae95227d84abec377ccd3c5cfeb141ed4" +dependencies = [ + "rustix 1.1.2", + "windows-sys 0.59.0", +] + [[package]] name = "fs_extra" version = "1.3.0" @@ -3639,6 +3825,15 @@ dependencies = [ "ahash 0.7.8", ] +[[package]] +name = "hashbrown" +version = "0.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43a3c133739dddd0d2990f9a4bdf8eb4b21ef50e4851ca85ab661199821d510e" +dependencies = [ + "ahash 0.8.12", +] + [[package]] name = "hashbrown" version = "0.14.5" @@ -4854,6 +5049,25 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154" +[[package]] +name = "lz4" +version = "1.28.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a20b523e860d03443e98350ceaac5e71c6ba89aea7d960769ec3ce37f4de5af4" +dependencies = [ + "lz4-sys", +] + +[[package]] +name = "lz4-sys" +version = "1.11.1+lz4-1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6bd8c0d6c6ed0cd30b3652886bb8711dc4bb01d637a68105a3d5158039b418e6" +dependencies = [ + "cc", + "libc", +] + [[package]] name = "mac" version = "0.1.1" @@ -4908,6 +5122,61 @@ dependencies = [ "syn 2.0.111", ] +[[package]] +name = "madsim" +version = "0.2.34" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18351aac4194337d6ea9ffbd25b3d1540ecc0754142af1bff5ba7392d1f6f771" +dependencies = [ + "ahash 0.8.12", + "async-channel 2.5.0", + "async-stream", + "async-task", + "bincode", + "bytes", + "downcast-rs", + "errno", + "futures-util", + "lazy_static", + "libc", + "madsim-macros", + "naive-timer", + "panic-message", + "rand 0.8.5", + "rand_xoshiro", + "rustversion", + "serde", + "spin", + "tokio", + "tokio-util", + "toml", + "tracing", + "tracing-subscriber", +] + +[[package]] +name = "madsim-macros" +version = "0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3d248e97b1a48826a12c3828d921e8548e714394bf17274dd0a93910dc946e1" +dependencies = [ + "darling 0.14.4", + "proc-macro2", + "quote", + "syn 1.0.109", +] + +[[package]] +name = "madsim-tokio" +version = "0.2.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d3eb2acc57c82d21d699119b859e2df70a91dbdb84734885a1e72be83bdecb5" +dependencies = [ + "madsim", + "spin", + "tokio", +] + [[package]] name = "maplit" version = "1.0.2" @@ -5019,6 +5288,15 @@ dependencies = [ "autocfg", ] +[[package]] +name = "memoffset" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "488016bfae457b036d996092f6cb448677611ce4449e970ceaf42695203f218a" +dependencies = [ + "autocfg", +] + [[package]] name = "metrics" version = "0.24.3" @@ -5122,6 +5400,16 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "mixtrics" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb252c728b9d77c6ef9103f0c81524fa0a3d3b161d0a936295d7fbeff6e04c11" +dependencies = [ + "itertools 0.14.0", + "parking_lot 0.12.5", +] + [[package]] name = "moka" version = "0.12.12" @@ -5195,7 +5483,7 @@ dependencies = [ "sha2", "socket2 0.6.1", "stringprep", - "strsim", + "strsim 0.11.1", "take_mut", "thiserror 2.0.17", "tokio", @@ -5275,6 +5563,12 @@ version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1d87ecb2933e8aeadb3e3a02b828fed80a7528047e68b4f424523a0981a3a084" +[[package]] +name = "naive-timer" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "034a0ad7deebf0c2abcf2435950a6666c3c15ea9d8fad0c0f48efa8a7f843fed" + [[package]] name = "nanoid" version = "0.4.0" @@ -5363,7 +5657,7 @@ dependencies = [ "bitflags 1.3.2", "cfg-if", "libc", - "memoffset", + "memoffset 0.7.1", "pin-utils", ] @@ -5578,6 +5872,7 @@ dependencies = [ "opendal-layer-dtrace", "opendal-layer-fastmetrics", "opendal-layer-fastrace", + "opendal-layer-foyer", "opendal-layer-hotpath", "opendal-layer-immutable-index", "opendal-layer-logging", @@ -5832,6 +6127,19 @@ dependencies = [ "tokio", ] +[[package]] +name = "opendal-layer-foyer" +version = "0.55.0" +dependencies = [ + "bincode", + "foyer", + "opendal-core", + "serde", + "size", + "tempfile", + "tokio", +] + [[package]] name = "opendal-layer-hotpath" version = "0.55.0" @@ -7053,6 +7361,15 @@ dependencies = [ "hashbrown 0.14.5", ] +[[package]] +name = "ordered_hash_map" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab0e5f22bf6dd04abd854a8874247813a8fa2c8c1260eba6fbb150270ce7c176" +dependencies = [ + "hashbrown 0.13.2", +] + [[package]] name = "os_pipe" version = "1.2.3" @@ -7080,6 +7397,12 @@ dependencies = [ "sha2", ] +[[package]] +name = "panic-message" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "384e52fd8fbd4cbe3c317e8216260c21a0f9134de108cea8a4dd4e7e152c472d" + [[package]] name = "parking" version = "2.2.1" @@ -8059,6 +8382,15 @@ dependencies = [ "rand_core 0.5.1", ] +[[package]] +name = "rand_xoshiro" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f97cdb2a36ed4183de61b2f824cc45c9f1037f28afe0a322e9fff4c108b5aaa" +dependencies = [ + "rand_core 0.6.4", +] + [[package]] name = "raw-cpuid" version = "11.6.0" @@ -9076,6 +9408,15 @@ dependencies = [ "zmij", ] +[[package]] +name = "serde_spanned" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8bbf91e5a4d6315eee45e704372590b30e260ee83af6639d64557f51b067776" +dependencies = [ + "serde_core", +] + [[package]] name = "serde_urlencoded" version = "0.7.1" @@ -9681,6 +10022,12 @@ dependencies = [ "unicode-properties", ] +[[package]] +name = "strsim" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" + [[package]] name = "strsim" version = "0.11.1" @@ -9868,7 +10215,7 @@ dependencies = [ "sha2", "snap", "storekey", - "strsim", + "strsim 0.11.1", "subtle", "sysinfo", "thiserror 1.0.69", @@ -10413,11 +10760,26 @@ dependencies = [ "tokio", ] +[[package]] +name = "toml" +version = "0.9.10+spec-1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0825052159284a1a8b4d6c0c86cbc801f2da5afd2b225fa548c72f2e74002f48" +dependencies = [ + "indexmap 2.12.1", + "serde_core", + "serde_spanned", + "toml_datetime", + "toml_parser", + "toml_writer", + "winnow", +] + [[package]] name = "toml_datetime" -version = "0.7.3" +version = "0.7.5+spec-1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f2cdb639ebbc97961c51720f858597f7f24c4fc295327923af55b74c3c724533" +checksum = "92e1cfed4a3038bc5a127e35a2d360f145e1f4b971b551a2ba5fd7aedf7e1347" dependencies = [ "serde_core", ] @@ -10436,13 +10798,19 @@ dependencies = [ [[package]] name = "toml_parser" -version = "1.0.4" +version = "1.0.6+spec-1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c0cbe268d35bdb4bb5a56a2de88d0ad0eb70af5384a99d648cd4b3d04039800e" +checksum = "a3198b4b0a8e11f09dd03e133c0280504d0801269e9afa46362ffde1cbeebf44" dependencies = [ "winnow", ] +[[package]] +name = "toml_writer" +version = "1.0.6+spec-1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab16f14aed21ee8bfd8ec22513f7287cd4a91aa92e44edfe2c17ddd004e92607" + [[package]] name = "tonic" version = "0.10.2" @@ -10766,6 +11134,15 @@ dependencies = [ "utf-8", ] +[[package]] +name = "twox-hash" +version = "2.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ea3136b675547379c4bd395ca6b938e5ad3c3d20fad76e7fe85f9e0d011419c" +dependencies = [ + "rand 0.9.2", +] + [[package]] name = "typed-builder" version = "0.22.0" @@ -11918,3 +12295,31 @@ name = "zmij" version = "1.0.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "30e0d8dffbae3d840f64bda38e28391faef673a7b5a6017840f2a106c8145868" + +[[package]] +name = "zstd" +version = "0.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e91ee311a569c327171651566e07972200e76fcfe2242a4fa446149a3881c08a" +dependencies = [ + "zstd-safe", +] + +[[package]] +name = "zstd-safe" +version = "7.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f49c4d5f0abb602a93fb8736af2a4f4dd9512e36f7f570d66e65ff867ed3b9d" +dependencies = [ + "zstd-sys", +] + +[[package]] +name = "zstd-sys" +version = "2.0.16+zstd.1.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91e19ebc2adc8f83e43039e79776e3fda8ca919132d68a1fed6a5faca2683748" +dependencies = [ + "cc", + "pkg-config", +] diff --git a/core/layers/foyer/src/lib.rs b/core/layers/foyer/src/lib.rs index f494e5d3608c..8579b6c55155 100644 --- a/core/layers/foyer/src/lib.rs +++ b/core/layers/foyer/src/lib.rs @@ -27,6 +27,18 @@ use opendal_core::raw::oio::*; use opendal_core::raw::*; use opendal_core::*; +/// Custom error type for when fetched data exceeds size limit. +#[derive(Debug)] +struct FetchSizeTooLarge; + +impl std::fmt::Display for FetchSizeTooLarge { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "fetched data size exceeds size limit") + } +} + +impl std::error::Error for FetchSizeTooLarge {} + fn extract_err(e: FoyerError) -> Error { let e = match e.downcast::() { Ok(e) => return e, @@ -40,7 +52,7 @@ fn extract_err(e: FoyerError) -> Error { /// /// It's possible to specify a version in the [`OpRead`] args: /// -/// - If a version is given, the object is cached under that versioned key. +/// - If a version is given, the object is cached under that versioned key. /// - If version is not supplied, the object is cached exactly as returned by the backend, /// We do NOT interpret `None` as "latest" and we do not promote it to any other version. #[derive(Debug, Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)] @@ -196,8 +208,9 @@ impl LayeredAccess for FoyerAccessor { } async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { - let path = path.to_string(); + let path_str = path.to_string(); let version = args.version().map(|v| v.to_string()); + let original_args = args.clone(); // Extract range bounds before async block to avoid lifetime issues let (range_start, range_end) = { @@ -207,39 +220,78 @@ impl LayeredAccess for FoyerAccessor { (start, end) }; - let entry = self + // Use fetch to read data from cache or fallback to remote. fetch() can automatically + // handle the thundering herd problem by ensuring only one request is made for a given + // key. + // + // Please note that we only cache the object if it's smaller than size_limit. And we'll + // fetch the ENTIRE object from remote to put it into cache, then slice it to the requested + // range. + let result = self .inner .cache .fetch( FoyerKey { - path: path.clone(), - version, + path: path_str.clone(), + version: version.clone(), }, || { let inner = self.inner.clone(); + let path_clone = path_str.clone(); async move { + // read the metadata first, if it's too large, do not cache + let metadata = inner + .accessor + .stat(&path_clone, OpStat::default()) + .await + .map_err(FoyerError::other)? + .into_metadata(); + + let size = metadata.content_length() as usize; + if !inner.size_limit.contains(&size) { + return Err(FoyerError::other(FetchSizeTooLarge)); + } + + // fetch the ENTIRE object from remote. let (_, mut reader) = inner .accessor - .read(&path, args.with_range(BytesRange::new(0, None))) + .read( + &path_clone, + OpRead::default().with_range(BytesRange::new(0, None)), + ) .await .map_err(FoyerError::other)?; let buffer = reader.read_all().await.map_err(FoyerError::other)?; + Ok(FoyerValue(buffer)) } }, ) - .await - .map_err(extract_err)?; - - let end = range_end.unwrap_or(entry.len() as u64); - let range = BytesContentRange::default() - .with_range(range_start, end - 1) - .with_size(entry.len() as _); - let buffer = entry.slice(range_start as usize..end as usize); - let rp = RpRead::new() - .with_size(Some(buffer.len() as _)) - .with_range(Some(range)); - Ok((rp, buffer)) + .await; + + // If got entry from cache, slice it to the requested range. If it's larger than size_limit, + // we'll simply forward the request to the underlying accessor with user's given range. + match result { + Ok(entry) => { + let end = range_end.unwrap_or(entry.len() as u64); + let range = BytesContentRange::default() + .with_range(range_start, end - 1) + .with_size(entry.len() as _); + let buffer = entry.slice(range_start as usize..end as usize); + let rp = RpRead::new() + .with_size(Some(buffer.len() as _)) + .with_range(Some(range)); + Ok((rp, buffer)) + } + Err(e) => match e.downcast::() { + Ok(_) => { + let (rp, mut reader) = self.inner.accessor.read(path, original_args).await?; + let buffer = reader.read_all().await?; + Ok((rp, buffer)) + } + Err(e) => Err(extract_err(e)), + }, + } } fn write( From 9d6ea64d6ce79c7e25c027eb40b9a3c4a81f6af3 Mon Sep 17 00:00:00 2001 From: Li Yazhou Date: Mon, 5 Jan 2026 23:18:15 +0800 Subject: [PATCH 19/20] add test case for size_limit check --- core/layers/foyer/src/lib.rs | 70 ++++++++++++++++++++++++++++++++++-- 1 file changed, 68 insertions(+), 2 deletions(-) diff --git a/core/layers/foyer/src/lib.rs b/core/layers/foyer/src/lib.rs index 8579b6c55155..4ebe2e92d434 100644 --- a/core/layers/foyer/src/lib.rs +++ b/core/layers/foyer/src/lib.rs @@ -466,11 +466,77 @@ mod tests { for i in 0..64 { let res = op.read(&key(i)).await; - let e = res.unwrap_err(); - assert_eq!(e.kind(), ErrorKind::NotFound); + assert!(res.is_err(), "should fail to read deleted file"); } } + #[tokio::test] + async fn test_size_limit() { + let dir = tempfile::tempdir().unwrap(); + + let cache = HybridCacheBuilder::new() + .memory(1024 * 1024) + .with_shards(1) + .storage(Engine::Large(LargeEngineOptions::new())) + .with_device_options( + DirectFsDeviceOptions::new(dir.path()) + .with_capacity(16 * MiB as usize) + .with_file_size(MiB as usize), + ) + .with_recover_mode(RecoverMode::None) + .build() + .await + .unwrap(); + + // Set size limit: only cache files between 1KB and 10KB + let op = Operator::new(Memory::default()) + .unwrap() + .layer(FoyerLayer::new(cache.clone()).with_size_limit(1024..10 * 1024)) + .finish(); + + let small_data = vec![1u8; 5 * 1024]; // 5KB - should be cached + let large_data = vec![2u8; 20 * 1024]; // 20KB - should NOT be cached + let tiny_data = vec![3u8; 512]; // 512B - below size limit, should NOT be cached + + // Write all files + op.write("small.txt", small_data.clone()).await.unwrap(); + op.write("large.txt", large_data.clone()).await.unwrap(); + op.write("tiny.txt", tiny_data.clone()).await.unwrap(); + + // All should be readable + let read_small = op.read("small.txt").await.unwrap(); + assert_eq!(read_small.to_vec(), small_data); + + let read_large = op.read("large.txt").await.unwrap(); + assert_eq!(read_large.to_vec(), large_data); + + let read_tiny = op.read("tiny.txt").await.unwrap(); + assert_eq!(read_tiny.to_vec(), tiny_data); + + // Clear the cache to test read-through behavior + cache.clear().await.unwrap(); + + // All files should still be readable from underlying storage + let read_small = op.read("small.txt").await.unwrap(); + assert_eq!(read_small.to_vec(), small_data); + + let read_large = op.read("large.txt").await.unwrap(); + assert_eq!(read_large.to_vec(), large_data); + + let read_tiny = op.read("tiny.txt").await.unwrap(); + assert_eq!(read_tiny.to_vec(), tiny_data); + + // After reading, small file should be cached, but large and tiny should not + // We can verify this by reading with range - cached files should support range reads + let read_small_range = op + .read_with("small.txt") + .range(0..1024) + .await + .unwrap(); + assert_eq!(read_small_range.len(), 1024); + assert_eq!(read_small_range.to_vec(), small_data[0..1024]); + } + #[test] fn test_error() { let e = Error::new(ErrorKind::NotFound, "not found"); From 1a550bee62571e84384bd4b4c5ba581ad2f3c655 Mon Sep 17 00:00:00 2001 From: Li Yazhou Date: Mon, 5 Jan 2026 23:30:52 +0800 Subject: [PATCH 20/20] fix: if the writes is too big, avoid appending to buf --- core/layers/foyer/src/lib.rs | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/core/layers/foyer/src/lib.rs b/core/layers/foyer/src/lib.rs index bfe8c9d6694f..1242204a8a65 100644 --- a/core/layers/foyer/src/lib.rs +++ b/core/layers/foyer/src/lib.rs @@ -309,6 +309,7 @@ impl LayeredAccess for FoyerAccessor { buf: QueueBuf::new(), path: path.to_string(), inner, + skip_cache: false, }, )) } @@ -341,18 +342,25 @@ pub struct Writer { buf: QueueBuf, path: String, inner: Arc>, + skip_cache: bool, } impl oio::Write for Writer { async fn write(&mut self, bs: Buffer) -> Result<()> { - self.buf.push(bs.clone()); + if self.inner.size_limit.contains(&(self.buf.len() + bs.len())) { + self.buf.push(bs.clone()); + self.skip_cache = false; + } else { + self.buf.clear(); + self.skip_cache = true; + } self.w.write(bs).await } async fn close(&mut self) -> Result { let buffer = self.buf.clone().collect(); let metadata = self.w.close().await?; - if self.inner.size_limit.contains(&buffer.len()) { + if !self.skip_cache { self.inner.cache.insert( FoyerKey { path: self.path.clone(), @@ -365,6 +373,7 @@ impl oio::Write for Writer { } async fn abort(&mut self) -> Result<()> { + self.buf.clear(); self.w.abort().await } } @@ -526,11 +535,7 @@ mod tests { // After reading, small file should be cached, but large and tiny should not // We can verify this by reading with range - cached files should support range reads - let read_small_range = op - .read_with("small.txt") - .range(0..1024) - .await - .unwrap(); + let read_small_range = op.read_with("small.txt").range(0..1024).await.unwrap(); assert_eq!(read_small_range.len(), 1024); assert_eq!(read_small_range.to_vec(), small_data[0..1024]); }