Skip to content

Commit 0549f0a

Browse files
authored
Fix logstream listing and deletion (#72)
- Delete local data when stream is deleted. - Use `/.schema` as prefix when listing streams. - As s3 sync is an independent thread there is no guarantee of ordering between when stream list is read and when it is deleted. However when a stream is deleted, its `.schema` is gone. So by checking for `.schema` valid streams can be listed. Fixes #54 .
1 parent 7205cc2 commit 0549f0a

File tree

3 files changed

+25
-17
lines changed

3 files changed

+25
-17
lines changed

server/src/handlers/logstream.rs

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,15 @@
1616
*
1717
*/
1818

19+
use std::fs;
20+
1921
use actix_web::http::StatusCode;
2022
use actix_web::{web, HttpRequest, HttpResponse, Responder};
2123

2224
use crate::alerts::Alerts;
2325
use crate::response;
2426
use crate::s3::S3;
25-
use crate::storage::ObjectStorage;
27+
use crate::storage::{ObjectStorage, StorageDir};
2628
use crate::{metadata, validator};
2729

2830
pub async fn delete(req: HttpRequest) -> HttpResponse {
@@ -57,15 +59,21 @@ pub async fn delete(req: HttpRequest) -> HttpResponse {
5759
.to_http();
5860
}
5961

62+
let stream_dir = StorageDir::new(&stream_name);
63+
if fs::remove_dir_all(&stream_dir.data_path).is_err() {
64+
log::warn!(
65+
"failed to delete local data for stream {}. Clean {} manually",
66+
stream_name,
67+
stream_dir.data_path.to_string_lossy()
68+
)
69+
}
70+
6071
if let Err(e) = metadata::STREAM_INFO.delete_stream(&stream_name) {
61-
return response::ServerResponse {
62-
msg: format!(
63-
"failed to delete log stream {} from metadata due to err: {}",
64-
stream_name, e
65-
),
66-
code: StatusCode::INTERNAL_SERVER_ERROR,
67-
}
68-
.to_http();
72+
log::warn!(
73+
"failed to delete log stream {} from metadata due to err: {}",
74+
stream_name,
75+
e
76+
)
6977
}
7078

7179
response::ServerResponse {

server/src/s3.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -297,7 +297,7 @@ impl S3 {
297297
.client
298298
.list_objects_v2()
299299
.bucket(&S3_CONFIG.s3_bucket_name)
300-
.delimiter('/')
300+
.delimiter("/.schema")
301301
.send()
302302
.await?;
303303

@@ -307,7 +307,7 @@ impl S3 {
307307
let logstreams: Vec<_> = common_prefixes
308308
.iter()
309309
.filter_map(CommonPrefix::prefix)
310-
.filter_map(|name| name.strip_suffix('/'))
310+
.filter_map(|name| name.strip_suffix("/.schema"))
311311
.map(String::from)
312312
.map(|name| LogStream { name })
313313
.collect();

server/src/storage.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ pub trait ObjectStorage: Sync + 'static {
7676

7777
// entries here means all the streams present on local disk
7878
for stream in streams {
79-
let sync = StorageSync::new(stream.clone());
79+
let sync = StorageSync::new(&stream);
8080

8181
// if data.parquet file not present, skip this stream
8282
if !sync.dir.parquet_path_exists() {
@@ -114,7 +114,7 @@ pub trait ObjectStorage: Sync + 'static {
114114
let streams = STREAM_INFO.list_streams();
115115

116116
for stream in streams {
117-
let dir = StorageDir::new(stream.clone());
117+
let dir = StorageDir::new(&stream);
118118

119119
for file in WalkDir::new(dir.temp_dir)
120120
.min_depth(1)
@@ -148,14 +148,14 @@ pub struct LogStream {
148148
}
149149

150150
#[derive(Debug)]
151-
struct StorageDir {
151+
pub struct StorageDir {
152152
pub data_path: PathBuf,
153153
pub temp_dir: PathBuf,
154154
}
155155

156156
impl StorageDir {
157-
fn new(stream_name: String) -> Self {
158-
let data_path = CONFIG.parseable.local_stream_data_path(&stream_name);
157+
pub fn new(stream_name: &str) -> Self {
158+
let data_path = CONFIG.parseable.local_stream_data_path(stream_name);
159159
let temp_dir = data_path.join("tmp");
160160

161161
Self {
@@ -186,7 +186,7 @@ struct StorageSync {
186186
}
187187

188188
impl StorageSync {
189-
fn new(stream_name: String) -> Self {
189+
fn new(stream_name: &str) -> Self {
190190
let dir = StorageDir::new(stream_name);
191191
let time = Utc::now();
192192
Self { dir, time }

0 commit comments

Comments
 (0)