@@ -38,7 +38,10 @@ pub struct FilesystemStore {
3838 data_dir : PathBuf ,
3939 tmp_file_counter : AtomicUsize ,
4040 gc_counter : AtomicUsize ,
41- locks : Mutex < HashMap < PathBuf , Arc < RwLock < ( ) > > > > ,
41+
42+ // Per path lock that ensures that we don't have concurrent writes to the same file. The lock also encapsulates the
43+ // latest written version per key.
44+ locks : Mutex < HashMap < PathBuf , Arc < RwLock < HashMap < String , u64 > > > > > ,
4245}
4346
4447impl FilesystemStore {
@@ -90,36 +93,12 @@ impl FilesystemStore {
9093
9194 Ok ( dest_dir_path)
9295 }
93- }
9496
95- impl KVStoreSync for FilesystemStore {
96- fn read (
97- & self , primary_namespace : & str , secondary_namespace : & str , key : & str ,
98- ) -> lightning:: io:: Result < Vec < u8 > > {
99- check_namespace_key_validity ( primary_namespace, secondary_namespace, Some ( key) , "read" ) ?;
100-
101- let mut dest_file_path = self . get_dest_dir_path ( primary_namespace, secondary_namespace) ?;
102- dest_file_path. push ( key) ;
103-
104- let mut buf = Vec :: new ( ) ;
105- {
106- let inner_lock_ref = {
107- let mut outer_lock = self . locks . lock ( ) . unwrap ( ) ;
108- Arc :: clone ( & outer_lock. entry ( dest_file_path. clone ( ) ) . or_default ( ) )
109- } ;
110- let _guard = inner_lock_ref. read ( ) . unwrap ( ) ;
111-
112- let mut f = fs:: File :: open ( dest_file_path) ?;
113- f. read_to_end ( & mut buf) ?;
114- }
115-
116- self . garbage_collect_locks ( ) ;
117-
118- Ok ( buf)
119- }
120-
121- fn write (
97+ /// Writes a specific version of a key to the filesystem. If a newer version has been written already, this function
98+ /// returns early without writing.
99+ pub ( crate ) fn write_version (
122100 & self , primary_namespace : & str , secondary_namespace : & str , key : & str , buf : & [ u8 ] ,
101+ version : Option < u64 > ,
123102 ) -> lightning:: io:: Result < ( ) > {
124103 check_namespace_key_validity ( primary_namespace, secondary_namespace, Some ( key) , "write" ) ?;
125104
@@ -153,7 +132,24 @@ impl KVStoreSync for FilesystemStore {
153132 let mut outer_lock = self . locks . lock ( ) . unwrap ( ) ;
154133 Arc :: clone ( & outer_lock. entry ( dest_file_path. clone ( ) ) . or_default ( ) )
155134 } ;
156- let _guard = inner_lock_ref. write ( ) . unwrap ( ) ;
135+ let mut guard = inner_lock_ref. write ( ) . unwrap ( ) ;
136+
137+ // If a version is provided, we check if we already have a newer version written. This is used in async
138+ // contexts to realize eventual consistency.
139+ if let Some ( version) = version {
140+ match guard. entry ( key. to_string ( ) ) {
141+ std:: collections:: hash_map:: Entry :: Vacant ( e) => {
142+ e. insert ( version) ;
143+ } ,
144+ std:: collections:: hash_map:: Entry :: Occupied ( mut e) => {
145+ if version <= * e. get ( ) {
146+ // If the version is not greater, we don't write the file.
147+ return Ok ( ( ) ) ;
148+ }
149+ e. insert ( version) ;
150+ } ,
151+ }
152+ }
157153
158154 #[ cfg( not( target_os = "windows" ) ) ]
159155 {
@@ -204,6 +200,39 @@ impl KVStoreSync for FilesystemStore {
204200
205201 res
206202 }
203+ }
204+
205+ impl KVStoreSync for FilesystemStore {
206+ fn read (
207+ & self , primary_namespace : & str , secondary_namespace : & str , key : & str ,
208+ ) -> lightning:: io:: Result < Vec < u8 > > {
209+ check_namespace_key_validity ( primary_namespace, secondary_namespace, Some ( key) , "read" ) ?;
210+
211+ let mut dest_file_path = self . get_dest_dir_path ( primary_namespace, secondary_namespace) ?;
212+ dest_file_path. push ( key) ;
213+
214+ let mut buf = Vec :: new ( ) ;
215+ {
216+ let inner_lock_ref = {
217+ let mut outer_lock = self . locks . lock ( ) . unwrap ( ) ;
218+ Arc :: clone ( & outer_lock. entry ( dest_file_path. clone ( ) ) . or_default ( ) )
219+ } ;
220+ let _guard = inner_lock_ref. read ( ) . unwrap ( ) ;
221+
222+ let mut f = fs:: File :: open ( dest_file_path) ?;
223+ f. read_to_end ( & mut buf) ?;
224+ }
225+
226+ self . garbage_collect_locks ( ) ;
227+
228+ Ok ( buf)
229+ }
230+
231+ fn write (
232+ & self , primary_namespace : & str , secondary_namespace : & str , key : & str , buf : & [ u8 ] ,
233+ ) -> lightning:: io:: Result < ( ) > {
234+ self . write_version ( primary_namespace, secondary_namespace, key, buf, None )
235+ }
207236
208237 fn remove (
209238 & self , primary_namespace : & str , secondary_namespace : & str , key : & str , lazy : bool ,
0 commit comments