Skip to content
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 54 additions & 24 deletions lightning-persister/src/fs_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ fn path_to_windows_str<T: AsRef<OsStr>>(path: &T) -> Vec<u16> {
// The number of read/write/remove/list operations after which we clean up our `locks` HashMap.
const GC_LOCK_INTERVAL: usize = 25;

// The number of times we retry listing keys in `FilesystemStore::list` before we give up reaching
// a consistent view and error out.
const LIST_DIR_CONSISTENCY_RETRIES: usize = 10;

/// A [`KVStoreSync`] implementation that writes to and reads from the file system.
pub struct FilesystemStore {
data_dir: PathBuf,
Expand Down Expand Up @@ -306,23 +310,45 @@ impl KVStoreSync for FilesystemStore {
check_namespace_key_validity(primary_namespace, secondary_namespace, None, "list")?;

let prefixed_dest = self.get_dest_dir_path(primary_namespace, secondary_namespace)?;
let mut keys = Vec::new();

if !Path::new(&prefixed_dest).exists() {
return Ok(Vec::new());
}

for entry in fs::read_dir(&prefixed_dest)? {
let entry = entry?;
let p = entry.path();
let mut keys;
let mut retries = LIST_DIR_CONSISTENCY_RETRIES;

if !dir_entry_is_key(&p)? {
continue;
}
'retry_list: loop {
keys = Vec::new();
'skip_entry: for entry in fs::read_dir(&prefixed_dest)? {
let entry = entry?;
let p = entry.path();

let key = get_key_from_dir_entry(&p, &prefixed_dest)?;

keys.push(key);
let res = dir_entry_is_key(&entry);
match res {
Ok(true) => {
let key = get_key_from_dir_entry_path(&p, &prefixed_dest)?;
keys.push(key);
},
Ok(false) => {
// We didn't error, but the entry is not a valid key (e.g., a directory,
// or a temp file).
continue 'skip_entry;
},
Err(e) => {
if e.kind() == lightning::io::ErrorKind::NotFound && retries > 0 {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is right because the dir_entry_is_key metadata() call gets its error mapped to an ErrorKind::Other.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, good catch, I had forgotten we started to do that at some point. Now added a commit reverting this, as remapping to ErrorKind::Other is nonsense, IMO, if we already have stronger typed io::Error.

// We had found the entry in `read_dir` above, so some race happend.
// Retry the `read_dir` to get a consistent view.
retries -= 1;
continue 'retry_list;
} else {
// For all errors or if we exhausted retries, bubble up.
return Err(e.into());
}
},
}
}
break 'retry_list;
}

self.garbage_collect_locks();
Expand All @@ -331,7 +357,8 @@ impl KVStoreSync for FilesystemStore {
}
}

fn dir_entry_is_key(p: &Path) -> Result<bool, lightning::io::Error> {
fn dir_entry_is_key(dir_entry: &fs::DirEntry) -> Result<bool, lightning::io::Error> {
let p = dir_entry.path();
if let Some(ext) = p.extension() {
#[cfg(target_os = "windows")]
{
Expand All @@ -346,7 +373,7 @@ fn dir_entry_is_key(p: &Path) -> Result<bool, lightning::io::Error> {
}
}

let metadata = p.metadata().map_err(|e| {
let metadata = dir_entry.metadata().map_err(|e| {
let msg = format!(
"Failed to list keys at path {}: {}",
PrintableString(p.to_str().unwrap_or_default()),
Expand Down Expand Up @@ -377,7 +404,7 @@ fn dir_entry_is_key(p: &Path) -> Result<bool, lightning::io::Error> {
Ok(true)
}

fn get_key_from_dir_entry(p: &Path, base_path: &Path) -> Result<String, lightning::io::Error> {
fn get_key_from_dir_entry_path(p: &Path, base_path: &Path) -> Result<String, lightning::io::Error> {
match p.strip_prefix(&base_path) {
Ok(stripped_path) => {
if let Some(relative_path) = stripped_path.to_str() {
Expand Down Expand Up @@ -435,24 +462,27 @@ impl MigratableKVStore for FilesystemStore {
let mut keys = Vec::new();

'primary_loop: for primary_entry in fs::read_dir(prefixed_dest)? {
let primary_path = primary_entry?.path();
let primary_entry = primary_entry?;
let primary_path = primary_entry.path();

if dir_entry_is_key(&primary_path)? {
if dir_entry_is_key(&primary_entry)? {
let primary_namespace = String::new();
let secondary_namespace = String::new();
let key = get_key_from_dir_entry(&primary_path, prefixed_dest)?;
let key = get_key_from_dir_entry_path(&primary_path, prefixed_dest)?;
keys.push((primary_namespace, secondary_namespace, key));
continue 'primary_loop;
}

// The primary_entry is actually also a directory.
'secondary_loop: for secondary_entry in fs::read_dir(&primary_path)? {
let secondary_path = secondary_entry?.path();
let secondary_entry = secondary_entry?;
let secondary_path = secondary_entry.path();

if dir_entry_is_key(&secondary_path)? {
let primary_namespace = get_key_from_dir_entry(&primary_path, prefixed_dest)?;
if dir_entry_is_key(&secondary_entry)? {
let primary_namespace =
get_key_from_dir_entry_path(&primary_path, prefixed_dest)?;
let secondary_namespace = String::new();
let key = get_key_from_dir_entry(&secondary_path, &primary_path)?;
let key = get_key_from_dir_entry_path(&secondary_path, &primary_path)?;
keys.push((primary_namespace, secondary_namespace, key));
continue 'secondary_loop;
}
Expand All @@ -462,12 +492,12 @@ impl MigratableKVStore for FilesystemStore {
let tertiary_entry = tertiary_entry?;
let tertiary_path = tertiary_entry.path();

if dir_entry_is_key(&tertiary_path)? {
if dir_entry_is_key(&tertiary_entry)? {
let primary_namespace =
get_key_from_dir_entry(&primary_path, prefixed_dest)?;
get_key_from_dir_entry_path(&primary_path, prefixed_dest)?;
let secondary_namespace =
get_key_from_dir_entry(&secondary_path, &primary_path)?;
let key = get_key_from_dir_entry(&tertiary_path, &secondary_path)?;
get_key_from_dir_entry_path(&secondary_path, &primary_path)?;
let key = get_key_from_dir_entry_path(&tertiary_path, &secondary_path)?;
keys.push((primary_namespace, secondary_namespace, key));
} else {
debug_assert!(
Expand Down
Loading