Skip to content

Commit 8f2266e

Browse files
benthecarmanclaude
andcommitted
Add FilesystemStoreV2 with paginated listing support
Implements PaginatedKVStore traits with timestamp-prefixed filenames for newest-first pagination and [empty] directory markers for consistent namespace hierarchy. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 3269813 commit 8f2266e

File tree

4 files changed

+796
-43
lines changed

4 files changed

+796
-43
lines changed

lightning-persister/src/fs_store/common.rs

Lines changed: 135 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
1-
//! Common utilities shared between [`FilesystemStore`].
1+
//! Common utilities shared between [`FilesystemStore`] and [`FilesystemStoreV2`] implementations.
22
//!
33
//! [`FilesystemStore`]: crate::fs_store::v1::FilesystemStore
4+
//! [`FilesystemStoreV2`]: crate::fs_store::v2::FilesystemStoreV2
45
56
use crate::utils::{check_namespace_key_validity, is_valid_kvstore_str};
67

@@ -45,6 +46,11 @@ fn path_to_windows_str<T: AsRef<OsStr>>(path: &T) -> Vec<u16> {
4546
// a consistent view and error out.
4647
const LIST_DIR_CONSISTENCY_RETRIES: usize = 10;
4748

49+
// The directory name used for empty namespaces in v2.
50+
// Uses brackets which are not in KVSTORE_NAMESPACE_KEY_ALPHABET, preventing collisions
51+
// with valid namespace names.
52+
pub(crate) const EMPTY_NAMESPACE_DIR: &str = "[empty]";
53+
4854
/// Inner state shared between sync and async operations for filesystem stores.
4955
///
5056
/// This struct manages the data directory, temporary file counter, and per-path locks
@@ -103,6 +109,19 @@ impl FilesystemStoreState {
103109
let outer_lock = self.inner.locks.lock().unwrap();
104110
outer_lock.len()
105111
}
112+
113+
pub(crate) fn get_checked_dest_file_path(
114+
&self, primary_namespace: &str, secondary_namespace: &str, key: Option<&str>,
115+
operation: &str, use_empty_ns_dir: bool,
116+
) -> lightning::io::Result<PathBuf> {
117+
self.inner.get_checked_dest_file_path(
118+
primary_namespace,
119+
secondary_namespace,
120+
key,
121+
operation,
122+
use_empty_ns_dir,
123+
)
124+
}
106125
}
107126

108127
impl FilesystemStoreInner {
@@ -112,7 +131,7 @@ impl FilesystemStoreInner {
112131
}
113132

114133
fn get_dest_dir_path(
115-
&self, primary_namespace: &str, secondary_namespace: &str,
134+
&self, primary_namespace: &str, secondary_namespace: &str, use_empty_ns_dir: bool,
116135
) -> std::io::Result<PathBuf> {
117136
let mut dest_dir_path = {
118137
#[cfg(target_os = "windows")]
@@ -127,21 +146,35 @@ impl FilesystemStoreInner {
127146
}
128147
};
129148

130-
dest_dir_path.push(primary_namespace);
131-
if !secondary_namespace.is_empty() {
132-
dest_dir_path.push(secondary_namespace);
149+
if use_empty_ns_dir {
150+
dest_dir_path.push(if primary_namespace.is_empty() {
151+
EMPTY_NAMESPACE_DIR
152+
} else {
153+
primary_namespace
154+
});
155+
dest_dir_path.push(if secondary_namespace.is_empty() {
156+
EMPTY_NAMESPACE_DIR
157+
} else {
158+
secondary_namespace
159+
});
160+
} else {
161+
dest_dir_path.push(primary_namespace);
162+
if !secondary_namespace.is_empty() {
163+
dest_dir_path.push(secondary_namespace);
164+
}
133165
}
134166

135167
Ok(dest_dir_path)
136168
}
137169

138170
fn get_checked_dest_file_path(
139171
&self, primary_namespace: &str, secondary_namespace: &str, key: Option<&str>,
140-
operation: &str,
172+
operation: &str, use_empty_ns_dir: bool,
141173
) -> lightning::io::Result<PathBuf> {
142174
check_namespace_key_validity(primary_namespace, secondary_namespace, key, operation)?;
143175

144-
let mut dest_file_path = self.get_dest_dir_path(primary_namespace, secondary_namespace)?;
176+
let mut dest_file_path =
177+
self.get_dest_dir_path(primary_namespace, secondary_namespace, use_empty_ns_dir)?;
145178
if let Some(key) = key {
146179
dest_file_path.push(key);
147180
}
@@ -217,8 +250,13 @@ impl FilesystemStoreInner {
217250
/// returns early without writing.
218251
fn write_version(
219252
&self, inner_lock_ref: Arc<RwLock<u64>>, dest_file_path: PathBuf, buf: Vec<u8>,
220-
version: u64,
253+
version: u64, preserve_mtime: bool,
221254
) -> lightning::io::Result<()> {
255+
let mtime = if preserve_mtime {
256+
fs::metadata(&dest_file_path).ok().and_then(|m| m.modified().ok())
257+
} else {
258+
None
259+
};
222260
let parent_directory = dest_file_path.parent().ok_or_else(|| {
223261
let msg =
224262
format!("Could not retrieve parent directory of {}.", dest_file_path.display());
@@ -238,6 +276,13 @@ impl FilesystemStoreInner {
238276
{
239277
let mut tmp_file = fs::File::create(&tmp_file_path)?;
240278
tmp_file.write_all(&buf)?;
279+
280+
// If we need to preserve the original mtime (for updates), set it before fsync.
281+
if let Some(mtime) = mtime {
282+
let times = fs::FileTimes::new().set_modified(mtime);
283+
tmp_file.set_times(times)?;
284+
}
285+
241286
tmp_file.sync_all()?;
242287
}
243288

@@ -370,13 +415,13 @@ impl FilesystemStoreInner {
370415
})
371416
}
372417

373-
fn list(&self, prefixed_dest: PathBuf) -> lightning::io::Result<Vec<String>> {
418+
fn list(&self, prefixed_dest: PathBuf, is_v2: bool) -> lightning::io::Result<Vec<String>> {
374419
if !Path::new(&prefixed_dest).exists() {
375420
return Ok(Vec::new());
376421
}
377422

378423
let mut keys;
379-
let mut retries = LIST_DIR_CONSISTENCY_RETRIES;
424+
let mut retries = if is_v2 { 0 } else { LIST_DIR_CONSISTENCY_RETRIES };
380425

381426
'retry_list: loop {
382427
keys = Vec::new();
@@ -387,7 +432,7 @@ impl FilesystemStoreInner {
387432
let res = dir_entry_is_key(&entry);
388433
match res {
389434
Ok(true) => {
390-
let key = get_key_from_dir_entry_path(&p, &prefixed_dest)?;
435+
let key = get_key_from_dir_entry_path(&p, &prefixed_dest, false)?;
391436
keys.push(key);
392437
},
393438
Ok(false) => {
@@ -396,6 +441,14 @@ impl FilesystemStoreInner {
396441
continue 'skip_entry;
397442
},
398443
Err(e) => {
444+
// In version 2 if a file has been deleted between the `read_dir` and our attempt
445+
// to access it, we should just add it to the list to give a more consistent view.
446+
if is_v2 {
447+
let key = get_key_from_dir_entry_path(&p, &prefixed_dest, false)?;
448+
keys.push(key);
449+
continue 'skip_entry;
450+
}
451+
399452
if e.kind() == lightning::io::ErrorKind::NotFound && retries > 0 {
400453
// We had found the entry in `read_dir` above, so some race happend.
401454
// Retry the `read_dir` to get a consistent view.
@@ -418,64 +471,73 @@ impl FilesystemStoreInner {
418471
impl FilesystemStoreState {
419472
pub(crate) fn read_impl(
420473
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
474+
use_empty_ns_dir: bool,
421475
) -> Result<Vec<u8>, lightning::io::Error> {
422476
let path = self.inner.get_checked_dest_file_path(
423477
primary_namespace,
424478
secondary_namespace,
425479
Some(key),
426480
"read",
481+
use_empty_ns_dir,
427482
)?;
428483
self.inner.read(path)
429484
}
430485

431486
pub(crate) fn write_impl(
432487
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
488+
use_empty_ns_dir: bool,
433489
) -> Result<(), lightning::io::Error> {
434490
let path = self.inner.get_checked_dest_file_path(
435491
primary_namespace,
436492
secondary_namespace,
437493
Some(key),
438494
"write",
495+
use_empty_ns_dir,
439496
)?;
440497
let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(path.clone());
441-
self.inner.write_version(inner_lock_ref, path, buf, version)
498+
self.inner.write_version(inner_lock_ref, path, buf, version, use_empty_ns_dir)
442499
}
443500

444501
pub(crate) fn remove_impl(
445502
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
503+
use_empty_ns_dir: bool,
446504
) -> Result<(), lightning::io::Error> {
447505
let path = self.inner.get_checked_dest_file_path(
448506
primary_namespace,
449507
secondary_namespace,
450508
Some(key),
451509
"remove",
510+
use_empty_ns_dir,
452511
)?;
453512
let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(path.clone());
454513
self.inner.remove_version(inner_lock_ref, path, lazy, version)
455514
}
456515

457516
pub(crate) fn list_impl(
458-
&self, primary_namespace: &str, secondary_namespace: &str,
517+
&self, primary_namespace: &str, secondary_namespace: &str, use_empty_ns_dir: bool,
459518
) -> Result<Vec<String>, lightning::io::Error> {
460519
let path = self.inner.get_checked_dest_file_path(
461520
primary_namespace,
462521
secondary_namespace,
463522
None,
464523
"list",
524+
use_empty_ns_dir,
465525
)?;
466-
self.inner.list(path)
526+
self.inner.list(path, use_empty_ns_dir)
467527
}
468528

469529
#[cfg(feature = "tokio")]
470530
pub(crate) fn read_async(
471531
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
532+
use_empty_ns_dir: bool,
472533
) -> impl Future<Output = Result<Vec<u8>, lightning::io::Error>> + 'static + Send {
473534
let this = Arc::clone(&self.inner);
474535
let path = this.get_checked_dest_file_path(
475536
primary_namespace,
476537
secondary_namespace,
477538
Some(key),
478539
"read",
540+
use_empty_ns_dir,
479541
);
480542

481543
async move {
@@ -492,10 +554,17 @@ impl FilesystemStoreState {
492554
#[cfg(feature = "tokio")]
493555
pub(crate) fn write_async(
494556
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
557+
use_empty_ns_dir: bool,
495558
) -> impl Future<Output = Result<(), lightning::io::Error>> + 'static + Send {
496559
let this = Arc::clone(&self.inner);
497560
let path = this
498-
.get_checked_dest_file_path(primary_namespace, secondary_namespace, Some(key), "write")
561+
.get_checked_dest_file_path(
562+
primary_namespace,
563+
secondary_namespace,
564+
Some(key),
565+
"write",
566+
use_empty_ns_dir,
567+
)
499568
.map(|path| (self.get_new_version_and_lock_ref(path.clone()), path));
500569

501570
async move {
@@ -504,7 +573,7 @@ impl FilesystemStoreState {
504573
Err(e) => return Err(e),
505574
};
506575
tokio::task::spawn_blocking(move || {
507-
this.write_version(inner_lock_ref, path, buf, version)
576+
this.write_version(inner_lock_ref, path, buf, version, use_empty_ns_dir)
508577
})
509578
.await
510579
.unwrap_or_else(|e| Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e)))
@@ -514,10 +583,17 @@ impl FilesystemStoreState {
514583
#[cfg(feature = "tokio")]
515584
pub(crate) fn remove_async(
516585
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
586+
use_empty_ns_dir: bool,
517587
) -> impl Future<Output = Result<(), lightning::io::Error>> + 'static + Send {
518588
let this = Arc::clone(&self.inner);
519589
let path = this
520-
.get_checked_dest_file_path(primary_namespace, secondary_namespace, Some(key), "remove")
590+
.get_checked_dest_file_path(
591+
primary_namespace,
592+
secondary_namespace,
593+
Some(key),
594+
"remove",
595+
use_empty_ns_dir,
596+
)
521597
.map(|path| (self.get_new_version_and_lock_ref(path.clone()), path));
522598

523599
async move {
@@ -535,26 +611,33 @@ impl FilesystemStoreState {
535611

536612
#[cfg(feature = "tokio")]
537613
pub(crate) fn list_async(
538-
&self, primary_namespace: &str, secondary_namespace: &str,
614+
&self, primary_namespace: &str, secondary_namespace: &str, use_empty_ns_dir: bool,
539615
) -> impl Future<Output = Result<Vec<String>, lightning::io::Error>> + 'static + Send {
540616
let this = Arc::clone(&self.inner);
541617

542-
let path =
543-
this.get_checked_dest_file_path(primary_namespace, secondary_namespace, None, "list");
618+
let path = this.get_checked_dest_file_path(
619+
primary_namespace,
620+
secondary_namespace,
621+
None,
622+
"list",
623+
use_empty_ns_dir,
624+
);
544625

545626
async move {
546627
let path = match path {
547628
Ok(path) => path,
548629
Err(e) => return Err(e),
549630
};
550-
tokio::task::spawn_blocking(move || this.list(path)).await.unwrap_or_else(|e| {
551-
Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e))
552-
})
631+
tokio::task::spawn_blocking(move || this.list(path, use_empty_ns_dir))
632+
.await
633+
.unwrap_or_else(|e| {
634+
Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e))
635+
})
553636
}
554637
}
555638

556639
pub(crate) fn list_all_keys_impl(
557-
&self,
640+
&self, use_empty_ns_dir: bool,
558641
) -> Result<Vec<(String, String, String)>, lightning::io::Error> {
559642
let prefixed_dest = &self.inner.data_dir;
560643
if !prefixed_dest.exists() {
@@ -570,7 +653,7 @@ impl FilesystemStoreState {
570653
if dir_entry_is_key(&primary_entry)? {
571654
let primary_namespace = String::new();
572655
let secondary_namespace = String::new();
573-
let key = get_key_from_dir_entry_path(&primary_path, prefixed_dest)?;
656+
let key = get_key_from_dir_entry_path(&primary_path, prefixed_dest, false)?;
574657
keys.push((primary_namespace, secondary_namespace, key));
575658
continue 'primary_loop;
576659
}
@@ -581,10 +664,13 @@ impl FilesystemStoreState {
581664
let secondary_path = secondary_entry.path();
582665

583666
if dir_entry_is_key(&secondary_entry)? {
584-
let primary_namespace =
585-
get_key_from_dir_entry_path(&primary_path, prefixed_dest)?;
667+
let primary_namespace = get_key_from_dir_entry_path(
668+
&primary_path,
669+
prefixed_dest,
670+
use_empty_ns_dir,
671+
)?;
586672
let secondary_namespace = String::new();
587-
let key = get_key_from_dir_entry_path(&secondary_path, &primary_path)?;
673+
let key = get_key_from_dir_entry_path(&secondary_path, &primary_path, false)?;
588674
keys.push((primary_namespace, secondary_namespace, key));
589675
continue 'secondary_loop;
590676
}
@@ -595,11 +681,18 @@ impl FilesystemStoreState {
595681
let tertiary_path = tertiary_entry.path();
596682

597683
if dir_entry_is_key(&tertiary_entry)? {
598-
let primary_namespace =
599-
get_key_from_dir_entry_path(&primary_path, prefixed_dest)?;
600-
let secondary_namespace =
601-
get_key_from_dir_entry_path(&secondary_path, &primary_path)?;
602-
let key = get_key_from_dir_entry_path(&tertiary_path, &secondary_path)?;
684+
let primary_namespace = get_key_from_dir_entry_path(
685+
&primary_path,
686+
prefixed_dest,
687+
use_empty_ns_dir,
688+
)?;
689+
let secondary_namespace = get_key_from_dir_entry_path(
690+
&secondary_path,
691+
&primary_path,
692+
use_empty_ns_dir,
693+
)?;
694+
let key =
695+
get_key_from_dir_entry_path(&tertiary_path, &secondary_path, false)?;
603696
keys.push((primary_namespace, secondary_namespace, key));
604697
} else {
605698
debug_assert!(
@@ -663,10 +756,18 @@ fn dir_entry_is_key(dir_entry: &fs::DirEntry) -> Result<bool, lightning::io::Err
663756
Ok(true)
664757
}
665758

666-
fn get_key_from_dir_entry_path(p: &Path, base_path: &Path) -> Result<String, lightning::io::Error> {
759+
/// Gets the key from a directory entry path by stripping the base path and validating the result.
760+
/// If `map_empty_ns_dir` is true, treats entries with the name of `EMPTY_NAMESPACE_DIR` as an empty string.
761+
/// `map_empty_ns_dir` should always be false when reading keys and only be true when listing namespaces.
762+
pub(crate) fn get_key_from_dir_entry_path(
763+
p: &Path, base_path: &Path, map_empty_ns_dir: bool,
764+
) -> Result<String, lightning::io::Error> {
667765
match p.strip_prefix(&base_path) {
668766
Ok(stripped_path) => {
669767
if let Some(relative_path) = stripped_path.to_str() {
768+
if map_empty_ns_dir && relative_path == EMPTY_NAMESPACE_DIR {
769+
return Ok(String::new());
770+
}
670771
if is_valid_kvstore_str(relative_path) {
671772
return Ok(relative_path.to_string());
672773
} else {

0 commit comments

Comments
 (0)