@@ -25,14 +25,16 @@ use std::collections::{HashMap, HashSet};
2525use std:: convert:: TryInto ;
2626use std:: sync:: atomic:: Ordering :: Relaxed ;
2727use std:: sync:: atomic:: { AtomicU64 , AtomicUsize , Ordering } ;
28- use std:: sync:: Arc ;
28+ use std:: sync:: { Arc , RwLock } ;
2929use std:: time:: Instant ;
3030
3131use bytes:: Bytes ;
32+ use lru:: LruCache ;
3233use rayon:: prelude:: * ;
3334use serde:: { Deserialize , Serialize } ;
3435#[ allow( unused_imports) ]
3536use tracing:: { debug, error, info, warn} ;
37+ use tracing:: { instrument, trace} ;
3638
3739use crate :: backup:: BackupStats ;
3840use crate :: blockhash:: BlockHash ;
@@ -46,6 +48,9 @@ const BLOCKDIR_FILE_NAME_LEN: usize = crate::BLAKE_HASH_SIZE_BYTES * 2;
/// Number of leading hex characters of the block hash used as the
/// subdirectory name that the block file is stored under.
const SUBDIR_NAME_CHARS: usize = 3;
4850
/// Maximum number of blocks (each up to ~1MB of uncompressed content)
/// held in the in-memory block cache.
const CACHE_SIZE: usize = 1000;
53+
4954/// Points to some compressed data inside the block dir.
5055///
5156/// Identifiers are: which file contains it, at what (pre-compression) offset,
@@ -69,6 +74,8 @@ pub struct Address {
6974pub struct BlockDir {
7075 transport : Arc < dyn Transport > ,
7176 pub stats : BlockDirStats ,
77+ // TODO: There are fancier caches and they might help, but this one works, and Stretto did not work for me.
78+ cache : RwLock < LruCache < BlockHash , Bytes > > ,
7279}
7380
7481/// Returns the transport-relative subdirectory name.
@@ -87,6 +94,7 @@ impl BlockDir {
8794 BlockDir {
8895 transport,
8996 stats : BlockDirStats :: default ( ) ,
97+ cache : RwLock :: new ( LruCache :: new ( CACHE_SIZE . try_into ( ) . unwrap ( ) ) ) ,
9098 }
9199 }
92100
@@ -111,6 +119,10 @@ impl BlockDir {
111119 return Ok ( hash) ;
112120 }
113121 let compressed = Compressor :: new ( ) . compress ( & block_data) ?;
122+ self . cache
123+ . write ( )
124+ . expect ( "Lock cache" )
125+ . put ( hash. clone ( ) , block_data) ;
114126 let comp_len: u64 = compressed. len ( ) . try_into ( ) . unwrap ( ) ;
115127 let hex_hash = hash. to_string ( ) ;
116128 let relpath = block_relpath ( & hash) ;
@@ -131,6 +143,10 @@ impl BlockDir {
131143 /// So, these are specifically treated as missing, so there's a chance to heal
132144 /// them later.
133145 pub fn contains ( & self , hash : & BlockHash ) -> Result < bool > {
146+ if self . cache . read ( ) . expect ( "Lock cache" ) . contains ( hash) {
147+ self . stats . cache_hit . fetch_add ( 1 , Relaxed ) ;
148+ return Ok ( true ) ;
149+ }
134150 match self . transport . metadata ( & block_relpath ( hash) ) {
135151 Err ( err) if err. is_not_found ( ) => Ok ( false ) ,
136152 Err ( err) => {
@@ -165,10 +181,13 @@ impl BlockDir {
165181 /// Return the entire contents of the block.
166182 ///
167183 /// Checks that the hash is correct with the contents.
184+ #[ instrument( skip( self ) ) ]
168185 pub fn get_block_content ( & self , hash : & BlockHash ) -> Result < Bytes > {
169- // TODO: Reuse decompressor buffer.
170- // TODO: Most importantly, cache decompressed blocks!
171- // TODO: Stats for block reads, maybe in the blockdir?
186+ if let Some ( hit) = self . cache . write ( ) . expect ( "Lock cache" ) . get ( hash) {
187+ self . stats . cache_hit . fetch_add ( 1 , Relaxed ) ;
188+ trace ! ( "Block cache hit" ) ;
189+ return Ok ( hit. clone ( ) ) ;
190+ }
172191 let mut decompressor = Decompressor :: new ( ) ;
173192 let block_relpath = block_relpath ( hash) ;
174193 let compressed_bytes = self . transport . read_file ( & block_relpath) ?;
@@ -178,6 +197,10 @@ impl BlockDir {
178197 error ! ( %hash, %actual_hash, %block_relpath, "Block file has wrong hash" ) ;
179198 return Err ( Error :: BlockCorrupt { hash : hash. clone ( ) } ) ;
180199 }
200+ self . cache
201+ . write ( )
202+ . expect ( "Lock cache" )
203+ . put ( hash. clone ( ) , decompressed_bytes. clone ( ) ) ;
181204 self . stats . read_blocks . fetch_add ( 1 , Relaxed ) ;
182205 self . stats
183206 . read_block_compressed_bytes
@@ -189,6 +212,7 @@ impl BlockDir {
189212 }
190213
191214 pub fn delete_block ( & self , hash : & BlockHash ) -> Result < ( ) > {
215+ self . cache . write ( ) . expect ( "Lock cache" ) . pop ( hash) ;
192216 self . transport
193217 . remove_file ( & block_relpath ( hash) )
194218 . map_err ( Error :: from)
@@ -290,6 +314,7 @@ pub struct BlockDirStats {
290314 pub read_blocks : AtomicUsize ,
291315 pub read_block_compressed_bytes : AtomicUsize ,
292316 pub read_block_uncompressed_bytes : AtomicUsize ,
317+ pub cache_hit : AtomicUsize ,
293318}
294319
295320#[ cfg( test) ]
@@ -309,6 +334,9 @@ mod test {
309334 . store_or_deduplicate ( Bytes :: from ( "stuff" ) , & mut stats)
310335 . unwrap ( ) ;
311336 assert ! ( blockdir. contains( & hash) . unwrap( ) ) ;
337+
338+ // Open again to get a fresh cache
339+ let blockdir = BlockDir :: open ( open_local_transport ( tempdir. path ( ) ) . unwrap ( ) ) ;
312340 OpenOptions :: new ( )
313341 . write ( true )
314342 . truncate ( true )
@@ -317,4 +345,27 @@ mod test {
317345 . expect ( "Truncate block" ) ;
318346 assert ! ( !blockdir. contains( & hash) . unwrap( ) ) ;
319347 }
348+
#[test]
fn cache_hit() {
    let tmp = TempDir::new().unwrap();
    let bd = BlockDir::open(open_local_transport(tmp.path()).unwrap());
    let mut backup_stats = BackupStats::default();
    let data = Bytes::from("stuff");
    let block_hash = bd
        .store_or_deduplicate(data.clone(), &mut backup_stats)
        .unwrap();
    // Nothing has consulted the cache yet.
    assert_eq!(bd.stats.cache_hit.load(Relaxed), 0);

    // Storing the block populated the cache, so `contains` is a hit.
    assert!(bd.contains(&block_hash).unwrap());
    assert_eq!(bd.stats.cache_hit.load(Relaxed), 1);

    // Reading the content is served from the cached uncompressed bytes.
    let fetched = bd.get_block_content(&block_hash).unwrap();
    assert_eq!(data, fetched);
    assert_eq!(bd.stats.cache_hit.load(Relaxed), 2);

    // A repeated read hits again.
    let fetched = bd.get_block_content(&block_hash).unwrap();
    assert_eq!(data, fetched);
    assert_eq!(bd.stats.cache_hit.load(Relaxed), 3);
}
320371}
0 commit comments