Skip to content

Commit ddb8e7e

Browse files
authored
Fix LocalFS stream listing (#321)
* add ignore directories * Check for stream.json in s3 Fixes #318
1 parent eac2d3e commit ddb8e7e

File tree

3 files changed

+79
-29
lines changed

3 files changed

+79
-29
lines changed

server/src/storage/localfs.rs

Lines changed: 50 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -34,15 +34,15 @@ use datafusion::{
3434
execution::runtime_env::{RuntimeConfig, RuntimeEnv},
3535
};
3636
use fs_extra::file::{move_file, CopyOptions};
37-
use futures::StreamExt;
37+
use futures::{stream::FuturesUnordered, TryStreamExt};
3838
use relative_path::RelativePath;
39-
use tokio::fs;
39+
use tokio::fs::{self, DirEntry};
4040
use tokio_stream::wrappers::ReadDirStream;
4141

4242
use crate::metrics::storage::{localfs::REQUEST_RESPONSE_TIME, StorageMetrics};
4343
use crate::{option::validation, utils::validate_path_is_writeable};
4444

45-
use super::{LogStream, ObjectStorage, ObjectStorageError, ObjectStorageProvider};
45+
use super::{object_storage, LogStream, ObjectStorage, ObjectStorageError, ObjectStorageProvider};
4646

4747
#[derive(Debug, Clone, clap::Args)]
4848
#[command(
@@ -152,29 +152,25 @@ impl ObjectStorage for LocalFS {
152152
let path = self.root.join(stream_name);
153153
Ok(fs::remove_dir_all(path).await?)
154154
}
155+
155156
async fn list_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError> {
157+
let ignore_dir = &["lost+found"];
156158
let directories = ReadDirStream::new(fs::read_dir(&self.root).await?);
157-
let directories = directories
158-
.filter_map(|res| async {
159-
let entry = res.ok()?;
160-
if entry.file_type().await.ok()?.is_dir() {
161-
Some(LogStream {
162-
name: entry
163-
.path()
164-
.file_name()
165-
.expect("valid path")
166-
.to_str()
167-
.expect("valid unicode")
168-
.to_string(),
169-
})
170-
} else {
171-
None
172-
}
173-
})
174-
.collect::<Vec<LogStream>>()
175-
.await;
159+
let entries: Vec<DirEntry> = directories.try_collect().await?;
160+
let entries = entries
161+
.into_iter()
162+
.map(|entry| dir_with_stream(entry, ignore_dir));
163+
164+
let logstream_dirs: Vec<Option<String>> =
165+
FuturesUnordered::from_iter(entries).try_collect().await?;
166+
167+
let logstreams = logstream_dirs
168+
.into_iter()
169+
.flatten()
170+
.map(|name| LogStream { name })
171+
.collect();
176172

177-
Ok(directories)
173+
Ok(logstreams)
178174
}
179175

180176
async fn upload_file(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError> {
@@ -228,6 +224,37 @@ impl ObjectStorage for LocalFS {
228224
}
229225
}
230226

227+
async fn dir_with_stream(
228+
entry: DirEntry,
229+
ignore_dirs: &[&str],
230+
) -> Result<Option<String>, ObjectStorageError> {
231+
let dir_name = entry
232+
.path()
233+
.file_name()
234+
.expect("valid path")
235+
.to_str()
236+
.expect("valid unicode")
237+
.to_owned();
238+
239+
if ignore_dirs.contains(&dir_name.as_str()) {
240+
return Ok(None);
241+
}
242+
243+
if entry.file_type().await?.is_dir() {
244+
let path = entry.path();
245+
let stream_json_path = path.join(object_storage::STREAM_METADATA_FILE_NAME);
246+
if stream_json_path.exists() {
247+
Ok(Some(dir_name))
248+
} else {
249+
let err: Box<dyn std::error::Error + Send + Sync + 'static> =
250+
format!("found {}", entry.path().display()).into();
251+
Err(ObjectStorageError::UnhandledError(err))
252+
}
253+
} else {
254+
Ok(None)
255+
}
256+
}
257+
231258
impl From<fs_extra::error::Error> for ObjectStorageError {
232259
fn from(e: fs_extra::error::Error) -> Self {
233260
ObjectStorageError::UnhandledError(Box::new(e))

server/src/storage/object_storage.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ use std::{
5757
};
5858

5959
// metadata file names in a Stream prefix
60-
const STREAM_METADATA_FILE_NAME: &str = ".stream.json";
60+
pub(super) const STREAM_METADATA_FILE_NAME: &str = ".stream.json";
6161
pub(super) const PARSEABLE_METADATA_FILE_NAME: &str = ".parseable.json";
6262
const SCHEMA_FILE_NAME: &str = ".schema";
6363
const ALERT_FILE_NAME: &str = ".alert.json";

server/src/storage/s3.rs

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,9 @@ use datafusion::datasource::listing::{
3636
use datafusion::datasource::object_store::ObjectStoreRegistry;
3737
use datafusion::error::DataFusionError;
3838
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
39-
use futures::StreamExt;
39+
use futures::stream::FuturesUnordered;
40+
use futures::{StreamExt, TryStreamExt};
41+
use itertools::Itertools;
4042
use md5::{Digest, Md5};
4143
use object_store::aws::AmazonS3Builder;
4244
use object_store::limit::LimitStore;
@@ -50,7 +52,7 @@ use std::time::Instant;
5052
use crate::metrics::storage::{s3::REQUEST_RESPONSE_TIME, StorageMetrics};
5153
use crate::storage::{LogStream, ObjectStorage, ObjectStorageError};
5254

53-
use super::ObjectStorageProvider;
55+
use super::{object_storage, ObjectStorageProvider};
5456

5557
#[derive(Debug, Clone, clap::Args)]
5658
#[command(
@@ -265,15 +267,36 @@ impl S3 {
265267
let common_prefixes = resp.common_prefixes().unwrap_or_default();
266268

267269
// return prefixes at the root level
268-
let logstreams: Vec<_> = common_prefixes
270+
let dirs: Vec<_> = common_prefixes
269271
.iter()
270272
.filter_map(CommonPrefix::prefix)
271273
.filter_map(|name| name.strip_suffix('/'))
272274
.map(String::from)
273-
.map(|name| LogStream { name })
274275
.collect();
275276

276-
Ok(logstreams)
277+
let stream_json_check = FuturesUnordered::new();
278+
279+
for dir in &dirs {
280+
let key = format!("{}/{}", dir, object_storage::STREAM_METADATA_FILE_NAME);
281+
let task = async move {
282+
self.client
283+
.head_object()
284+
.bucket(&self.bucket)
285+
.key(key)
286+
.send()
287+
.await
288+
.map(|_| ())
289+
};
290+
291+
stream_json_check.push(task);
292+
}
293+
294+
stream_json_check.try_collect().await?;
295+
296+
Ok(dirs
297+
.into_iter()
298+
.map(|name| LogStream { name })
299+
.collect_vec())
277300
}
278301

279302
async fn _upload_file(

0 commit comments

Comments
 (0)