Skip to content

Commit 98916c4

Browse files
committed
updated stream check for local file storage
1 parent 23f1b84 commit 98916c4

File tree

4 files changed

+9
-10
lines changed

4 files changed

+9
-10
lines changed

src/handlers/http/correlation.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use bytes::Bytes;
2121
use relative_path::RelativePathBuf;
2222

2323
use crate::{
24-
option::CONFIG, storage::CORRELATION_DIRECTORY, utils::actix::extract_session_key_from_req,
24+
option::CONFIG, storage::CORRELATION_ROOT_DIRECTORY, utils::actix::extract_session_key_from_req,
2525
};
2626

2727
use crate::correlation::{
@@ -125,7 +125,7 @@ pub async fn delete(req: HttpRequest) -> Result<impl Responder, CorrelationError
125125

126126
// Delete from disk
127127
let store = CONFIG.storage().get_object_store();
128-
let path = RelativePathBuf::from_iter([CORRELATION_DIRECTORY, &correlation.id.to_string()]);
128+
let path = RelativePathBuf::from_iter([CORRELATION_ROOT_DIRECTORY, &correlation.id.to_string()]);
129129
store.delete_object(&path).await?;
130130

131131
// Delete from memory

src/storage/localfs.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,7 @@ use crate::{
3939
};
4040

4141
use super::{
42-
LogStream, ObjectStorage, ObjectStorageError, ObjectStorageProvider, PARSEABLE_ROOT_DIRECTORY,
43-
SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY,
42+
LogStream, ObjectStorage, ObjectStorageError, ObjectStorageProvider, CORRELATION_ROOT_DIRECTORY, PARSEABLE_ROOT_DIRECTORY, SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY
4443
};
4544

4645
#[derive(Debug, Clone, clap::Args)]
@@ -295,7 +294,7 @@ impl ObjectStorage for LocalFS {
295294
}
296295

297296
async fn list_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError> {
298-
let ignore_dir = &["lost+found", PARSEABLE_ROOT_DIRECTORY, USERS_ROOT_DIR];
297+
let ignore_dir = &["lost+found", PARSEABLE_ROOT_DIRECTORY, USERS_ROOT_DIR, CORRELATION_ROOT_DIRECTORY];
299298
let directories = ReadDirStream::new(fs::read_dir(&self.root).await?);
300299
let entries: Vec<DirEntry> = directories.try_collect().await?;
301300
let entries = entries
@@ -315,7 +314,7 @@ impl ObjectStorage for LocalFS {
315314
}
316315

317316
async fn list_old_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError> {
318-
let ignore_dir = &["lost+found", PARSEABLE_ROOT_DIRECTORY];
317+
let ignore_dir = &["lost+found", PARSEABLE_ROOT_DIRECTORY, CORRELATION_ROOT_DIRECTORY];
319318
let directories = ReadDirStream::new(fs::read_dir(&self.root).await?);
320319
let entries: Vec<DirEntry> = directories.try_collect().await?;
321320
let entries = entries

src/storage/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ pub const PARSEABLE_ROOT_DIRECTORY: &str = ".parseable";
5151
pub const SCHEMA_FILE_NAME: &str = ".schema";
5252
pub const ALERT_FILE_NAME: &str = ".alert.json";
5353
pub const MANIFEST_FILE: &str = "manifest.json";
54-
pub const CORRELATION_DIRECTORY: &str = ".correlations";
54+
pub const CORRELATION_ROOT_DIRECTORY: &str = ".correlations";
5555

5656
/// local sync interval to move data.records to /tmp dir of that stream.
5757
/// 60 sec is a reasonable value.

src/storage/object_storage.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use super::{
2121
ObjectStoreFormat, Permisssion, StorageDir, StorageMetadata,
2222
};
2323
use super::{
24-
ALERT_FILE_NAME, CORRELATION_DIRECTORY, MANIFEST_FILE, PARSEABLE_METADATA_FILE_NAME,
24+
ALERT_FILE_NAME, CORRELATION_ROOT_DIRECTORY, MANIFEST_FILE, PARSEABLE_METADATA_FILE_NAME,
2525
PARSEABLE_ROOT_DIRECTORY, SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY,
2626
};
2727

@@ -631,15 +631,15 @@ pub trait ObjectStorage: Send + Sync + 'static {
631631
correlation: &CorrelationConfig,
632632
) -> Result<(), ObjectStorageError> {
633633
let path = RelativePathBuf::from_iter([
634-
CORRELATION_DIRECTORY,
634+
CORRELATION_ROOT_DIRECTORY,
635635
&format!("{}.json", correlation.id),
636636
]);
637637
self.put_object(&path, to_bytes(correlation)).await?;
638638
Ok(())
639639
}
640640

641641
async fn get_correlations(&self) -> Result<Vec<Bytes>, CorrelationError> {
642-
let correlation_path = RelativePathBuf::from_iter([CORRELATION_DIRECTORY]);
642+
let correlation_path = RelativePathBuf::from_iter([CORRELATION_ROOT_DIRECTORY]);
643643
let correlation_bytes = self
644644
.get_objects(
645645
Some(&correlation_path),

0 commit comments

Comments
 (0)