diff --git a/core/Cargo.lock b/core/Cargo.lock index 27894c616d86..2c2fd9bbae3c 100644 --- a/core/Cargo.lock +++ b/core/Cargo.lock @@ -566,6 +566,18 @@ dependencies = [ "syn 2.0.111", ] +[[package]] +name = "auto_enums" +version = "0.8.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c170965892137a3a9aeb000b4524aa3cc022a310e709d848b6e1cdce4ab4781" +dependencies = [ + "derive_utils", + "proc-macro2", + "quote", + "syn 2.0.111", +] + [[package]] name = "autocfg" version = "1.5.0" @@ -1712,7 +1724,7 @@ dependencies = [ "anstream", "anstyle", "clap_lex", - "strsim", + "strsim 0.11.1", "terminal_size", ] @@ -1743,6 +1755,12 @@ dependencies = [ "cc", ] +[[package]] +name = "cmsketch" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7ee2cfacbd29706479902b06d75ad8f1362900836aa32799eabc7e004bfd854" + [[package]] name = "coarsetime" version = "0.1.36" @@ -2260,6 +2278,16 @@ dependencies = [ "cipher", ] +[[package]] +name = "darling" +version = "0.14.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b750cb3417fd1b327431a470f388520309479ab0bf5e323505daf0290cd3850" +dependencies = [ + "darling_core 0.14.4", + "darling_macro 0.14.4", +] + [[package]] name = "darling" version = "0.20.11" @@ -2280,6 +2308,20 @@ dependencies = [ "darling_macro 0.21.3", ] +[[package]] +name = "darling_core" +version = "0.14.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "109c1ca6e6b7f82cc233a97004ea8ed7ca123a9af07a8230878fcfda9b158bf0" +dependencies = [ + "fnv", + "ident_case", + "proc-macro2", + "quote", + "strsim 0.10.0", + "syn 1.0.109", +] + [[package]] name = "darling_core" version = "0.20.11" @@ -2290,7 +2332,7 @@ dependencies = [ "ident_case", "proc-macro2", "quote", - "strsim", + "strsim 0.11.1", "syn 2.0.111", ] @@ -2304,10 +2346,21 @@ dependencies = [ "ident_case", "proc-macro2", "quote", - "strsim", + "strsim 0.11.1", "syn 2.0.111", ] +[[package]] +name = "darling_macro" +version = "0.14.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4aab4dbc9f7611d8b55048a3a16d2d010c2c8334e46304b40ac1cc14bf3b48e" +dependencies = [ + "darling_core 0.14.4", + "quote", + "syn 1.0.109", +] + [[package]] name = "darling_macro" version = "0.20.11" @@ -2503,6 +2556,17 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "derive_utils" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ccfae181bab5ab6c5478b2ccb69e4c68a02f8c3ec72f6616bfec9dbc599d2ee0" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.111", +] + [[package]] name = "des" version = "0.8.1" @@ -2630,6 +2694,12 @@ version = "0.15.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b" +[[package]] +name = "downcast-rs" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75b325c5dbd37f80359721ad39aca5a29fb04c89279657cffdda8736d0c0b9d2" + [[package]] name = "dtoa" version = "1.0.10" @@ -3157,6 +3227,112 @@ dependencies = [ "uuid", ] +[[package]] +name = "foyer" +version = "0.18.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "642093b1a72c4a0ef89862484d669a353e732974781bb9c49a979526d1e30edc" +dependencies = [ + "equivalent", + "foyer-common", + "foyer-memory", + "foyer-storage", + "madsim-tokio", + "mixtrics", + "pin-project", + "serde", + "thiserror 2.0.17", + "tokio", + "tracing", +] + +[[package]] +name = "foyer-common" +version = "0.18.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9db9c0e4648b13e9216d785b308d43751ca975301aeb83e607ec630b6f956944" +dependencies = [ + "bincode", + "bytes", + "cfg-if", + "itertools 0.14.0", + "madsim-tokio", + "mixtrics", + "parking_lot 0.12.5", + "pin-project", + "serde", + "thiserror 2.0.17", + "tokio", + "twox-hash", +] + +[[package]] +name = "foyer-intrusive-collections" +version = "0.10.0-dev" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e4fee46bea69e0596130e3210e65d3424e0ac1e6df3bde6636304bdf1ca4a3b" +dependencies = [ + "memoffset 0.9.1", +] + +[[package]] +name = "foyer-memory" +version = "0.18.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "040dc38acbfca8f1def26bbbd9e9199090884aabb15de99f7bf4060be66ff608" +dependencies = [ + "arc-swap", + "bitflags 2.10.0", + "cmsketch", + "equivalent", + "foyer-common", + "foyer-intrusive-collections", + "hashbrown 0.15.5", + "itertools 0.14.0", + "madsim-tokio", + "mixtrics", + "parking_lot 0.12.5", + "pin-project", + "serde", + "thiserror 2.0.17", + "tokio", + "tracing", +] + +[[package]] +name = "foyer-storage" +version = "0.18.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "54a77ed888da490e997da6d6d62fcbce3f202ccf28be098c4ea595ca046fc4a9" +dependencies = [ + "allocator-api2", + "anyhow", + "auto_enums", + "bytes", + "equivalent", + "flume 0.11.1", + "foyer-common", + "foyer-memory", + "fs4", + "futures-core", + "futures-util", + "itertools 0.14.0", + "libc", + "lz4", + "madsim-tokio", + "ordered_hash_map", + "parking_lot 0.12.5", + "paste", + "pin-project", + "rand 0.9.2", + "serde", + "thiserror 2.0.17", + "tokio", + "tracing", + "twox-hash", + "zstd", +] + [[package]] name = "fs2" version = "0.4.3" @@ -3167,6 +3343,16 @@ dependencies = [ "winapi", ] +[[package]] +name = "fs4" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8640e34b88f7652208ce9e88b1a37a2ae95227d84abec377ccd3c5cfeb141ed4" +dependencies = [ + "rustix 1.1.2", + "windows-sys 0.59.0", +] + [[package]] name = "fs_extra" version = "1.3.0" @@ -3639,6 +3825,15 @@ dependencies = [ "ahash 0.7.8", ] +[[package]] +name = "hashbrown" +version = "0.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43a3c133739dddd0d2990f9a4bdf8eb4b21ef50e4851ca85ab661199821d510e" +dependencies = [ + "ahash 0.8.12", +] + [[package]] name = "hashbrown" version = "0.14.5" @@ -4854,6 +5049,25 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154" +[[package]] +name = "lz4" +version = "1.28.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a20b523e860d03443e98350ceaac5e71c6ba89aea7d960769ec3ce37f4de5af4" +dependencies = [ + "lz4-sys", +] + +[[package]] +name = "lz4-sys" +version = "1.11.1+lz4-1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6bd8c0d6c6ed0cd30b3652886bb8711dc4bb01d637a68105a3d5158039b418e6" +dependencies = [ + "cc", + "libc", +] + [[package]] name = "mac" version = "0.1.1" @@ -4908,6 +5122,61 @@ dependencies = [ "syn 2.0.111", ] +[[package]] +name = "madsim" +version = "0.2.34" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18351aac4194337d6ea9ffbd25b3d1540ecc0754142af1bff5ba7392d1f6f771" +dependencies = [ + "ahash 0.8.12", + "async-channel 2.5.0", + "async-stream", + "async-task", + "bincode", + "bytes", + "downcast-rs", + "errno", + "futures-util", + "lazy_static", + "libc", + "madsim-macros", + "naive-timer", + "panic-message", + "rand 0.8.5", + "rand_xoshiro", + "rustversion", + "serde", + "spin", + "tokio", + "tokio-util", + "toml", + "tracing", + "tracing-subscriber", +] + +[[package]] +name = "madsim-macros" +version = "0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3d248e97b1a48826a12c3828d921e8548e714394bf17274dd0a93910dc946e1" +dependencies = [ + "darling 0.14.4", + "proc-macro2", + "quote", + "syn 1.0.109", +] + +[[package]] +name = "madsim-tokio" +version = "0.2.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d3eb2acc57c82d21d699119b859e2df70a91dbdb84734885a1e72be83bdecb5" +dependencies = [ + "madsim", + "spin", + "tokio", +] + [[package]] name = "maplit" version = "1.0.2" @@ -5019,6 +5288,15 @@ dependencies = [ "autocfg", ] +[[package]] +name = "memoffset" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "488016bfae457b036d996092f6cb448677611ce4449e970ceaf42695203f218a" +dependencies = [ + "autocfg", +] + [[package]] name = "metrics" version = "0.24.3" @@ -5122,6 +5400,16 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "mixtrics" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb252c728b9d77c6ef9103f0c81524fa0a3d3b161d0a936295d7fbeff6e04c11" +dependencies = [ + "itertools 0.14.0", + "parking_lot 0.12.5", +] + [[package]] name = "moka" version = "0.12.12" @@ -5195,7 +5483,7 @@ dependencies = [ "sha2", "socket2 0.6.1", "stringprep", - "strsim", + "strsim 0.11.1", "take_mut", "thiserror 2.0.17", "tokio", @@ -5275,6 +5563,12 @@ version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1d87ecb2933e8aeadb3e3a02b828fed80a7528047e68b4f424523a0981a3a084" +[[package]] +name = "naive-timer" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "034a0ad7deebf0c2abcf2435950a6666c3c15ea9d8fad0c0f48efa8a7f843fed" + [[package]] name = "nanoid" version = "0.4.0" @@ -5363,7 +5657,7 @@ dependencies = [ "bitflags 1.3.2", "cfg-if", "libc", - "memoffset", + "memoffset 0.7.1", "pin-utils", ] @@ -5578,6 +5872,7 @@ dependencies = [ "opendal-layer-dtrace", "opendal-layer-fastmetrics", "opendal-layer-fastrace", + "opendal-layer-foyer", "opendal-layer-hotpath", "opendal-layer-immutable-index", "opendal-layer-logging", @@ -5832,6 +6127,19 @@ dependencies = [ "tokio", ] +[[package]] +name = "opendal-layer-foyer" +version = "0.55.0" +dependencies = [ + "bincode", + "foyer", + "opendal-core", + "serde", + "size", + "tempfile", + "tokio", +] + [[package]] name = "opendal-layer-hotpath" version = "0.55.0" @@ -7053,6 +7361,15 @@ dependencies = [ "hashbrown 0.14.5", ] +[[package]] +name = "ordered_hash_map" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab0e5f22bf6dd04abd854a8874247813a8fa2c8c1260eba6fbb150270ce7c176" +dependencies = [ + "hashbrown 0.13.2", +] + [[package]] name = "os_pipe" version = "1.2.3" @@ -7080,6 +7397,12 @@ dependencies = [ "sha2", ] +[[package]] +name = "panic-message" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "384e52fd8fbd4cbe3c317e8216260c21a0f9134de108cea8a4dd4e7e152c472d" + [[package]] name = "parking" version = "2.2.1" @@ -8059,6 +8382,15 @@ dependencies = [ "rand_core 0.5.1", ] +[[package]] +name = "rand_xoshiro" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f97cdb2a36ed4183de61b2f824cc45c9f1037f28afe0a322e9fff4c108b5aaa" +dependencies = [ + "rand_core 0.6.4", +] + [[package]] name = "raw-cpuid" version = "11.6.0" @@ -9076,6 +9408,15 @@ dependencies = [ "zmij", ] +[[package]] +name = "serde_spanned" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8bbf91e5a4d6315eee45e704372590b30e260ee83af6639d64557f51b067776" +dependencies = [ + "serde_core", +] + [[package]] name = "serde_urlencoded" version = "0.7.1" @@ -9681,6 +10022,12 @@ dependencies = [ "unicode-properties", ] +[[package]] +name = "strsim" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" + [[package]] name = "strsim" version = "0.11.1" @@ -9868,7 +10215,7 @@ dependencies = [ "sha2", "snap", "storekey", - "strsim", + "strsim 0.11.1", "subtle", "sysinfo", "thiserror 1.0.69", @@ -10413,11 +10760,26 @@ dependencies = [ "tokio", ] +[[package]] +name = "toml" +version = "0.9.10+spec-1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0825052159284a1a8b4d6c0c86cbc801f2da5afd2b225fa548c72f2e74002f48" +dependencies = [ + "indexmap 2.12.1", + "serde_core", + "serde_spanned", + "toml_datetime", + "toml_parser", + "toml_writer", + "winnow", +] + [[package]] name = "toml_datetime" -version = "0.7.3" +version = "0.7.5+spec-1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f2cdb639ebbc97961c51720f858597f7f24c4fc295327923af55b74c3c724533" +checksum = "92e1cfed4a3038bc5a127e35a2d360f145e1f4b971b551a2ba5fd7aedf7e1347" dependencies = [ "serde_core", ] @@ -10436,13 +10798,19 @@ dependencies = [ [[package]] name = "toml_parser" -version = "1.0.4" +version = "1.0.6+spec-1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c0cbe268d35bdb4bb5a56a2de88d0ad0eb70af5384a99d648cd4b3d04039800e" +checksum = "a3198b4b0a8e11f09dd03e133c0280504d0801269e9afa46362ffde1cbeebf44" dependencies = [ "winnow", ] +[[package]] +name = "toml_writer" +version = "1.0.6+spec-1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab16f14aed21ee8bfd8ec22513f7287cd4a91aa92e44edfe2c17ddd004e92607" + [[package]] name = "tonic" version = "0.10.2" @@ -10766,6 +11134,15 @@ dependencies = [ "utf-8", ] +[[package]] +name = "twox-hash" +version = "2.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ea3136b675547379c4bd395ca6b938e5ad3c3d20fad76e7fe85f9e0d011419c" +dependencies = [ + "rand 0.9.2", +] + [[package]] name = "typed-builder" version = "0.22.0" @@ -11918,3 +12295,31 @@ name = "zmij" version = "1.0.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "30e0d8dffbae3d840f64bda38e28391faef673a7b5a6017840f2a106c8145868" + +[[package]] +name = "zstd" +version = "0.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e91ee311a569c327171651566e07972200e76fcfe2242a4fa446149a3881c08a" +dependencies = [ + "zstd-safe", +] + +[[package]] +name = "zstd-safe" +version = "7.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f49c4d5f0abb602a93fb8736af2a4f4dd9512e36f7f570d66e65ff867ed3b9d" +dependencies = [ + "zstd-sys", +] + +[[package]] +name = "zstd-sys" +version = "2.0.16+zstd.1.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91e19ebc2adc8f83e43039e79776e3fda8ca919132d68a1fed6a5faca2683748" +dependencies = [ + "cc", + "pkg-config", +] diff --git a/core/Cargo.toml b/core/Cargo.toml index 55b3962b9363..f8002096b3e8 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -99,6 +99,7 @@ layers-concurrent-limit = ["dep:opendal-layer-concurrent-limit"] layers-dtrace = ["dep:opendal-layer-dtrace"] layers-fastmetrics = ["dep:opendal-layer-fastmetrics"] layers-fastrace = ["dep:opendal-layer-fastrace"] +layers-foyer = ["dep:opendal-layer-foyer"] layers-hotpath = ["dep:opendal-layer-hotpath"] layers-immutable-index = ["dep:opendal-layer-immutable-index"] layers-logging = ["dep:opendal-layer-logging"] @@ -211,6 +212,7 @@ opendal-layer-chaos = { path = "layers/chaos", version = "0.55.0", optional = tr opendal-layer-concurrent-limit = { path = "layers/concurrent-limit", version = "0.55.0", optional = true, default-features = false } opendal-layer-fastmetrics = { path = "layers/fastmetrics", version = "0.55.0", optional = true, default-features = false } opendal-layer-fastrace = { path = "layers/fastrace", version = "0.55.0", optional = true, default-features = false } +opendal-layer-foyer = { path = "layers/foyer", version = "0.55.0", optional = true, default-features = false } opendal-layer-hotpath = { path = "layers/hotpath", version = "0.55.0", optional = true, default-features = false } opendal-layer-immutable-index = { path = "layers/immutable-index", version = "0.55.0", optional = true, default-features = false } opendal-layer-logging = { path = "layers/logging", version = "0.55.0", optional = true, default-features = false } diff --git a/core/core/src/docs/rfcs/0000_foyer_integration.md b/core/core/src/docs/rfcs/6370_foyer_integration.md similarity index 100% rename from core/core/src/docs/rfcs/0000_foyer_integration.md rename to core/core/src/docs/rfcs/6370_foyer_integration.md diff --git a/core/layers/foyer/Cargo.toml b/core/layers/foyer/Cargo.toml new file mode 100644 index 000000000000..bba6fbc98b37 --- /dev/null +++ b/core/layers/foyer/Cargo.toml @@ -0,0 +1,45 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +description = "Apache OpenDAL foyer hybrid cache layer" +name = "opendal-layer-foyer" + +authors = { workspace = true } +edition = { workspace = true } +homepage = { workspace = true } +license = { workspace = true } +repository = { workspace = true } +rust-version = { workspace = true } +version = { workspace = true } + +[package.metadata.docs.rs] +all-features = true + +[dependencies] +bincode = "1" +foyer = { version = "0.18", features = ["serde"] } +opendal-core = { path = "../../core", version = "0.55.0", default-features = false } +serde = { workspace = true, features = ["derive"] } + +[dev-dependencies] +opendal-core = { path = "../../core", version = "0.55.0", features = [ + "services-memory", +] } +size = "0.5" +tempfile = "3" +tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } diff --git a/core/layers/foyer/src/lib.rs b/core/layers/foyer/src/lib.rs new file mode 100644 index 000000000000..1242204a8a65 --- /dev/null +++ b/core/layers/foyer/src/lib.rs @@ -0,0 +1,630 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{ + future::Future, + ops::{Bound, Deref, Range, RangeBounds}, + sync::Arc, +}; + +use foyer::{Code, CodeError, Error as FoyerError, HybridCache}; + +use opendal_core::raw::oio::*; +use opendal_core::raw::*; +use opendal_core::*; + +/// Custom error type for when fetched data exceeds size limit. +#[derive(Debug)] +struct FetchSizeTooLarge; + +impl std::fmt::Display for FetchSizeTooLarge { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "fetched data size exceeds size limit") + } +} + +impl std::error::Error for FetchSizeTooLarge {} + +fn extract_err(e: FoyerError) -> Error { + let e = match e.downcast::() { + Ok(e) => return e, + Err(e) => e, + }; + Error::new(ErrorKind::Unexpected, e.to_string()) +} + +/// [`FoyerKey`] is a key for the foyer cache. It's encoded via bincode, which is +/// backed by foyer's "serde" feature. +/// +/// It's possible to specify a version in the [`OpRead`] args: +/// +/// - If a version is given, the object is cached under that versioned key. +/// - If version is not supplied, the object is cached exactly as returned by the backend, +/// We do NOT interpret `None` as "latest" and we do not promote it to any other version. +#[derive(Debug, Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)] +pub struct FoyerKey { + pub path: String, + pub version: Option, +} + +/// [`FoyerValue`] is a wrapper around `Buffer` that implements the `Code` trait. +#[derive(Debug)] +pub struct FoyerValue(pub Buffer); + +impl Deref for FoyerValue { + type Target = Buffer; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl Code for FoyerValue { + fn encode(&self, writer: &mut impl std::io::Write) -> std::result::Result<(), CodeError> { + let len = self.0.len() as u64; + writer.write_all(&len.to_le_bytes())?; + std::io::copy(&mut self.0.clone(), writer)?; + Ok(()) + } + + fn decode(reader: &mut impl std::io::Read) -> std::result::Result + where + Self: Sized, + { + let mut len_bytes = [0u8; 8]; + reader.read_exact(&mut len_bytes)?; + let len = u64::from_le_bytes(len_bytes) as usize; + let mut buffer = vec![0u8; len]; + reader.read_exact(&mut buffer[..len])?; + Ok(FoyerValue(buffer.into())) + } + + fn estimated_size(&self) -> usize { + 8 + self.0.len() + } +} + +/// Hybrid cache layer for OpenDAL that uses [foyer](https://github.com/foyer-rs/foyer) for caching. +/// +/// # Operation Behavior +/// - `write`: [`FoyerLayer`] will write to the foyer hybrid cache after the service's write operation is completed. +/// - `read`: [`FoyerLayer`] will first check the foyer hybrid cache for the data. If the data is not found, it will perform the read operation on the service and cache the result. +/// - `delete`: [`FoyerLayer`] will remove the data from the foyer hybrid cache regardless of whether the service's delete operation is successful. +/// - Other operations: [`FoyerLayer`] will not cache the results of other operations, such as `list`, `copy`, `rename`, etc. They will be passed through to the underlying accessor without caching. +/// +/// # Examples +/// +/// ```no_run +/// use opendal_core::{Operator, services::Memory}; +/// use opendal_layer_foyer::FoyerLayer; +/// use foyer::{HybridCacheBuilder, Engine}; +/// +/// # async fn example() -> Result<(), Box> { +/// let cache = HybridCacheBuilder::new() +/// .memory(64 * 1024 * 1024) // 64MB memory cache +/// .with_shards(4) +/// .storage(Engine::Large(Default::default())) +/// .build() +/// .await?; +/// +/// let op = Operator::new(Memory::default())? +/// .layer(FoyerLayer::new(cache)) +/// .finish(); +/// # Ok(()) +/// # } +/// ``` +/// +/// # Note +/// +/// If the object version is enabled, the foyer cache layer will treat the objects with same path but different versions as different objects. +#[derive(Debug)] +pub struct FoyerLayer { + cache: HybridCache, + size_limit: Range, +} + +impl FoyerLayer { + /// Creates a new `FoyerLayer` with the given foyer hybrid cache. + pub fn new(cache: HybridCache) -> Self { + FoyerLayer { + cache, + size_limit: 0..usize::MAX, + } + } + + /// Sets the size limit for caching. + /// + /// It is recommended to set a size limit to avoid caching large files that may not be suitable for caching. + pub fn with_size_limit>(mut self, size_limit: R) -> Self { + let start = match size_limit.start_bound() { + Bound::Included(v) => *v, + Bound::Excluded(v) => *v + 1, + Bound::Unbounded => 0, + }; + let end = match size_limit.end_bound() { + Bound::Included(v) => *v + 1, + Bound::Excluded(v) => *v, + Bound::Unbounded => usize::MAX, + }; + self.size_limit = start..end; + self + } +} + +impl Layer for FoyerLayer { + type LayeredAccess = FoyerAccessor; + + fn layer(&self, accessor: A) -> Self::LayeredAccess { + let cache = self.cache.clone(); + FoyerAccessor { + inner: Arc::new(Inner { + accessor, + cache, + size_limit: self.size_limit.clone(), + }), + } + } +} + +#[derive(Debug)] +struct Inner { + accessor: A, + cache: HybridCache, + size_limit: Range, +} + +#[derive(Debug)] +pub struct FoyerAccessor { + inner: Arc>, +} + +impl LayeredAccess for FoyerAccessor { + type Inner = A; + type Reader = Buffer; + type Writer = Writer; + type Lister = A::Lister; + type Deleter = Deleter; + + fn inner(&self) -> &Self::Inner { + &self.inner.accessor + } + + fn info(&self) -> Arc { + self.inner.accessor.info() + } + + async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { + let path_str = path.to_string(); + let version = args.version().map(|v| v.to_string()); + let original_args = args.clone(); + + // Extract range bounds before async block to avoid lifetime issues + let (range_start, range_end) = { + let r = args.range(); + let start = r.offset(); + let end = r.size().map(|size| start + size); + (start, end) + }; + + // Use fetch to read data from cache or fallback to remote. fetch() can automatically + // handle the thundering herd problem by ensuring only one request is made for a given + // key. + // + // Please note that we only cache the object if it's smaller than size_limit. And we'll + // fetch the ENTIRE object from remote to put it into cache, then slice it to the requested + // range. + let result = self + .inner + .cache + .fetch( + FoyerKey { + path: path_str.clone(), + version: version.clone(), + }, + || { + let inner = self.inner.clone(); + let path_clone = path_str.clone(); + async move { + // read the metadata first, if it's too large, do not cache + let metadata = inner + .accessor + .stat(&path_clone, OpStat::default()) + .await + .map_err(FoyerError::other)? + .into_metadata(); + + let size = metadata.content_length() as usize; + if !inner.size_limit.contains(&size) { + return Err(FoyerError::other(FetchSizeTooLarge)); + } + + // fetch the ENTIRE object from remote. + let (_, mut reader) = inner + .accessor + .read( + &path_clone, + OpRead::default().with_range(BytesRange::new(0, None)), + ) + .await + .map_err(FoyerError::other)?; + let buffer = reader.read_all().await.map_err(FoyerError::other)?; + + Ok(FoyerValue(buffer)) + } + }, + ) + .await; + + // If got entry from cache, slice it to the requested range. If it's larger than size_limit, + // we'll simply forward the request to the underlying accessor with user's given range. + match result { + Ok(entry) => { + let end = range_end.unwrap_or(entry.len() as u64); + let range = BytesContentRange::default() + .with_range(range_start, end - 1) + .with_size(entry.len() as _); + let buffer = entry.slice(range_start as usize..end as usize); + let rp = RpRead::new() + .with_size(Some(buffer.len() as _)) + .with_range(Some(range)); + Ok((rp, buffer)) + } + Err(e) => match e.downcast::() { + Ok(_) => { + let (rp, mut reader) = self.inner.accessor.read(path, original_args).await?; + let buffer = reader.read_all().await?; + Ok((rp, buffer)) + } + Err(e) => Err(extract_err(e)), + }, + } + } + + fn write( + &self, + path: &str, + args: OpWrite, + ) -> impl Future> + MaybeSend { + let inner = self.inner.clone(); + async move { + let (rp, w) = self.inner.accessor.write(path, args).await?; + Ok(( + rp, + Writer { + w, + buf: QueueBuf::new(), + path: path.to_string(), + inner, + skip_cache: false, + }, + )) + } + } + + fn delete(&self) -> impl Future> + MaybeSend { + let inner = self.inner.clone(); + async move { + let (rp, d) = inner.accessor.delete().await?; + Ok(( + rp, + Deleter { + deleter: d, + keys: vec![], + inner, + }, + )) + } + } + + async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> { + self.inner.accessor.list(path, args).await + } + + // TODO(MrCroxx): Implement copy, rename with foyer cache. +} + +pub struct Writer { + w: A::Writer, + buf: QueueBuf, + path: String, + inner: Arc>, + skip_cache: bool, +} + +impl oio::Write for Writer { + async fn write(&mut self, bs: Buffer) -> Result<()> { + if self.inner.size_limit.contains(&(self.buf.len() + bs.len())) { + self.buf.push(bs.clone()); + self.skip_cache = false; + } else { + self.buf.clear(); + self.skip_cache = true; + } + self.w.write(bs).await + } + + async fn close(&mut self) -> Result { + let buffer = self.buf.clone().collect(); + let metadata = self.w.close().await?; + if !self.skip_cache { + self.inner.cache.insert( + FoyerKey { + path: self.path.clone(), + version: metadata.version().map(|v| v.to_string()), + }, + FoyerValue(buffer), + ); + } + Ok(metadata) + } + + async fn abort(&mut self) -> Result<()> { + self.buf.clear(); + self.w.abort().await + } +} + +pub struct Deleter { + deleter: A::Deleter, + keys: Vec, + inner: Arc>, +} + +impl oio::Delete for Deleter { + async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> { + self.deleter.delete(path, args.clone()).await?; + self.keys.push(FoyerKey { + path: path.to_string(), + version: args.version().map(|v| v.to_string()), + }); + Ok(()) + } + + async fn close(&mut self) -> Result<()> { + for key in &self.keys { + self.inner.cache.remove(key); + } + self.deleter.close().await + } +} + +#[cfg(test)] +mod tests { + use foyer::{ + DirectFsDeviceOptions, Engine, HybridCacheBuilder, LargeEngineOptions, RecoverMode, + }; + use opendal_core::{Operator, services::Memory}; + use size::consts::MiB; + use std::io::Cursor; + + use super::*; + + fn key(i: u8) -> String { + format!("obj-{i}") + } + + fn value(i: u8) -> Vec { + // ~ 64KiB with metadata + vec![i; 63 * 1024] + } + + #[tokio::test] + async fn test() { + let dir = tempfile::tempdir().unwrap(); + + let cache = HybridCacheBuilder::new() + .memory(10) + .with_shards(1) + .storage(Engine::Large(LargeEngineOptions::new())) + .with_device_options( + DirectFsDeviceOptions::new(dir.path()) + .with_capacity(16 * MiB as usize) + .with_file_size(MiB as usize), + ) + .with_recover_mode(RecoverMode::None) + .build() + .await + .unwrap(); + + let op = Operator::new(Memory::default()) + .unwrap() + .layer(FoyerLayer::new(cache.clone())) + .finish(); + + assert!(op.list("/").await.unwrap().is_empty()); + + for i in 0..64 { + op.write(&key(i), value(i)).await.unwrap(); + } + + assert_eq!(op.list("/").await.unwrap().len(), 64); + + for i in 0..64 { + let buf = op.read(&key(i)).await.unwrap(); + assert_eq!(buf.to_vec(), value(i)); + } + + cache.clear().await.unwrap(); + + for i in 0..64 { + let buf = op.read(&key(i)).await.unwrap(); + assert_eq!(buf.to_vec(), value(i)); + } + + for i in 0..64 { + op.delete(&key(i)).await.unwrap(); + } + + assert!(op.list("/").await.unwrap().is_empty()); + + for i in 0..64 { + let res = op.read(&key(i)).await; + assert!(res.is_err(), "should fail to read deleted file"); + } + } + + #[tokio::test] + async fn test_size_limit() { + let dir = tempfile::tempdir().unwrap(); + + let cache = HybridCacheBuilder::new() + .memory(1024 * 1024) + .with_shards(1) + .storage(Engine::Large(LargeEngineOptions::new())) + .with_device_options( + DirectFsDeviceOptions::new(dir.path()) + .with_capacity(16 * MiB as usize) + .with_file_size(MiB as usize), + ) + .with_recover_mode(RecoverMode::None) + .build() + .await + .unwrap(); + + // Set size limit: only cache files between 1KB and 10KB + let op = Operator::new(Memory::default()) + .unwrap() + .layer(FoyerLayer::new(cache.clone()).with_size_limit(1024..10 * 1024)) + .finish(); + + let small_data = vec![1u8; 5 * 1024]; // 5KB - should be cached + let large_data = vec![2u8; 20 * 1024]; // 20KB - should NOT be cached + let tiny_data = vec![3u8; 512]; // 512B - below size limit, should NOT be cached + + // Write all files + op.write("small.txt", small_data.clone()).await.unwrap(); + op.write("large.txt", large_data.clone()).await.unwrap(); + op.write("tiny.txt", tiny_data.clone()).await.unwrap(); + + // All should be readable + let read_small = op.read("small.txt").await.unwrap(); + assert_eq!(read_small.to_vec(), small_data); + + let read_large = op.read("large.txt").await.unwrap(); + assert_eq!(read_large.to_vec(), large_data); + + let read_tiny = op.read("tiny.txt").await.unwrap(); + assert_eq!(read_tiny.to_vec(), tiny_data); + + // Clear the cache to test read-through behavior + cache.clear().await.unwrap(); + + // All files should still be readable from underlying storage + let read_small = op.read("small.txt").await.unwrap(); + assert_eq!(read_small.to_vec(), small_data); + + let read_large = op.read("large.txt").await.unwrap(); + assert_eq!(read_large.to_vec(), large_data); + + let read_tiny = op.read("tiny.txt").await.unwrap(); + assert_eq!(read_tiny.to_vec(), tiny_data); + + // After reading, small file should be cached, but large and tiny should not + // We can verify this by reading with range - cached files should support range reads + let read_small_range = op.read_with("small.txt").range(0..1024).await.unwrap(); + assert_eq!(read_small_range.len(), 1024); + assert_eq!(read_small_range.to_vec(), small_data[0..1024]); + } + + #[test] + fn test_error() { + let e = Error::new(ErrorKind::NotFound, "not found"); + let fe = FoyerError::other(e); + let oe = extract_err(fe); + assert_eq!(oe.kind(), ErrorKind::NotFound); + } + + #[test] + fn test_foyer_key_version_none_vs_empty() { + let key_none = FoyerKey { + path: "test/path".to_string(), + version: None, + }; + + let key_empty = FoyerKey { + path: "test/path".to_string(), + version: Some("".to_string()), + }; + + let mut buf_none = Vec::new(); + key_none.encode(&mut buf_none).unwrap(); + + let mut buf_empty = Vec::new(); + key_empty.encode(&mut buf_empty).unwrap(); + + assert_ne!( + buf_none, buf_empty, + "Serialization of version=None and version=\"\" should be different" + ); + + let decoded_none = FoyerKey::decode(&mut Cursor::new(&buf_none)).unwrap(); + assert_eq!(decoded_none, key_none); + let decoded_empty = FoyerKey::decode(&mut Cursor::new(&buf_empty)).unwrap(); + assert_eq!(decoded_empty, key_empty); + } + + #[test] + fn test_foyer_key_serde() { + use std::io::Cursor; + + let test_cases = vec![ + FoyerKey { + path: "simple".to_string(), + version: None, + }, + FoyerKey { + path: "with/slash/path".to_string(), + version: None, + }, + FoyerKey { + path: "versioned".to_string(), + version: Some("v1.0.0".to_string()), + }, + FoyerKey { + path: "empty-version".to_string(), + version: Some("".to_string()), + }, + FoyerKey { + path: "".to_string(), + version: None, + }, + FoyerKey { + path: "unicode/θ·―εΎ„/πŸš€".to_string(), + version: Some("η‰ˆζœ¬-1".to_string()), + }, + FoyerKey { + path: "long/".to_string().repeat(100), + version: Some("long-version-".to_string().repeat(50)), + }, + ]; + + for original in test_cases { + let mut buffer = Vec::new(); + original + .encode(&mut buffer) + .expect("encoding should succeed"); + + let decoded = + FoyerKey::decode(&mut Cursor::new(&buffer)).expect("decoding should succeed"); + + assert_eq!( + decoded, original, + "decode(encode(key)) should equal original key" + ); + } + } +} diff --git a/core/src/lib.rs b/core/src/lib.rs index da5589d779d4..2205a3fb09e5 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -385,6 +385,8 @@ pub mod layers { pub use opendal_layer_fastmetrics::*; #[cfg(feature = "layers-fastrace")] pub use opendal_layer_fastrace::*; + #[cfg(feature = "layers-foyer")] + pub use opendal_layer_foyer::*; #[cfg(feature = "layers-hotpath")] pub use opendal_layer_hotpath::*; #[cfg(feature = "layers-immutable-index")]