Skip to content

Commit 41edfe0

Browse files
authored
fix: proper error Message if storage is in an invalid state (#623)
fixes #615
1 parent 3d8ddeb commit 41edfe0

File tree

8 files changed

+76
-31
lines changed

8 files changed

+76
-31
lines changed

server/src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,9 @@ use crate::localcache::LocalCacheManager;
5555
async fn main() -> anyhow::Result<()> {
5656
env_logger::init();
5757
let storage = CONFIG.storage().get_object_store();
58+
CONFIG.validate().await?;
5859
migration::run_metadata_migration(&CONFIG).await?;
5960
let metadata = storage::resolve_parseable_metadata().await?;
60-
CONFIG.validate_staging()?;
6161
banner::print(&CONFIG, &metadata).await;
6262
rbac::map::init(&metadata);
6363
metadata.set_global();

server/src/migration.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ use crate::{
3232
storage::{ObjectStorage, ObjectStorageError},
3333
};
3434

35+
/// Migrate the metadata from v1 or v2 to v3
36+
/// This is a one time migration
3537
pub async fn run_metadata_migration(config: &Config) -> anyhow::Result<()> {
3638
let object_store = config.storage().get_object_store();
3739
let storage_metadata = get_storage_metadata(&*object_store).await?;

server/src/option.rs

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,7 @@ use std::sync::Arc;
2727
use url::Url;
2828

2929
use crate::oidc::{self, OpenidConfig};
30-
use crate::storage::{FSConfig, ObjectStorageProvider, S3Config};
31-
use crate::utils::validate_path_is_writeable;
30+
use crate::storage::{FSConfig, ObjectStorageError, ObjectStorageProvider, S3Config};
3231

3332
pub const MIN_CACHE_SIZE_BYTES: u64 = 1000u64.pow(3); // 1 GB (10^9 bytes; note 1 GiB would be 2^30)
3433

@@ -99,9 +98,36 @@ impl Config {
9998
}
10099
}
101100

102-
pub fn validate_staging(&self) -> anyhow::Result<()> {
103-
let staging_path = self.staging_dir();
104-
validate_path_is_writeable(staging_path)
101+
pub async fn validate(&self) -> Result<(), ObjectStorageError> {
102+
let obj_store = self.storage.get_object_store();
103+
let rel_path = relative_path::RelativePathBuf::from(".parseable.json");
104+
105+
let has_parseable_json = obj_store.get_object(&rel_path).await.is_ok();
106+
107+
let has_dirs = match obj_store.list_dirs_in_storage().await {
108+
Ok(dirs) => !dirs.is_empty(),
109+
Err(_) => false,
110+
};
111+
112+
let has_streams = obj_store.list_streams().await.is_ok();
113+
114+
if !has_dirs || has_parseable_json && has_streams {
115+
Ok(())
116+
} else if has_parseable_json && !has_streams {
117+
Err(ObjectStorageError::Custom(
118+
"Could not start the server because storage contains stale data from previous deployment, please choose an empty storage and restart the server.\nJoin us on Parseable Slack to report this incident : launchpass.com/parseable"
119+
.to_owned(),
120+
))
121+
} else if !has_parseable_json && !has_streams && has_dirs {
122+
Err(ObjectStorageError::Custom(
123+
"Could not start the server because storage contains some stale data, please provide an empty storage and restart the server.\nJoin us on Parseable Slack to report this incident : launchpass.com/parseable".to_owned(),
124+
))
125+
} else {
126+
Err(ObjectStorageError::Custom(
127+
"Could not start the server because storage contains stale data from previous deployment.\nJoin us on Parseable Slack to report this incident : launchpass.com/parseable"
128+
.to_owned()
129+
))
130+
}
105131
}
106132

107133
pub fn storage(&self) -> Arc<dyn ObjectStorageProvider + Send + Sync> {

server/src/storage/localfs.rs

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use tokio::fs::{self, DirEntry};
3232
use tokio_stream::wrappers::ReadDirStream;
3333

3434
use crate::metrics::storage::{localfs::REQUEST_RESPONSE_TIME, StorageMetrics};
35-
use crate::{option::validation, utils::validate_path_is_writeable};
35+
use crate::option::validation;
3636

3737
use super::{object_storage, LogStream, ObjectStorage, ObjectStorageError, ObjectStorageProvider};
3838

@@ -139,8 +139,8 @@ impl ObjectStorage for LocalFS {
139139
}
140140

141141
async fn check(&self) -> Result<(), ObjectStorageError> {
142-
fs::create_dir_all(&self.root).await?;
143-
validate_path_is_writeable(&self.root)
142+
fs::create_dir_all(&self.root)
143+
.await
144144
.map_err(|e| ObjectStorageError::UnhandledError(e.into()))
145145
}
146146

@@ -169,6 +169,23 @@ impl ObjectStorage for LocalFS {
169169
Ok(logstreams)
170170
}
171171

172+
async fn list_dirs_in_storage(&self) -> Result<Vec<String>, ObjectStorageError> {
173+
let dirs = ReadDirStream::new(fs::read_dir(&self.root).await?)
174+
.try_collect::<Vec<DirEntry>>()
175+
.await?
176+
.into_iter()
177+
.map(dir_name);
178+
179+
let dirs = FuturesUnordered::from_iter(dirs)
180+
.try_collect::<Vec<_>>()
181+
.await?
182+
.into_iter()
183+
.flatten()
184+
.collect::<Vec<_>>();
185+
186+
Ok(dirs)
187+
}
188+
172189
async fn list_dates(&self, stream_name: &str) -> Result<Vec<String>, ObjectStorageError> {
173190
let path = self.root.join(stream_name);
174191
let directories = ReadDirStream::new(fs::read_dir(&path).await?);

server/src/storage/object_storage.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ pub trait ObjectStorage: Sync + 'static {
7575
async fn check(&self) -> Result<(), ObjectStorageError>;
7676
async fn delete_stream(&self, stream_name: &str) -> Result<(), ObjectStorageError>;
7777
async fn list_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError>;
78+
async fn list_dirs_in_storage(&self) -> Result<Vec<String>, ObjectStorageError>;
7879
async fn list_dates(&self, stream_name: &str) -> Result<Vec<String>, ObjectStorageError>;
7980
async fn upload_file(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError>;
8081

server/src/storage/s3.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -470,6 +470,18 @@ impl ObjectStorage for S3 {
470470
fn store_url(&self) -> url::Url {
471471
url::Url::parse(&format!("s3://{}", self.bucket)).unwrap()
472472
}
473+
474+
async fn list_dirs_in_storage(&self) -> Result<Vec<String>, ObjectStorageError> {
475+
let pre = object_store::path::Path::from("/");
476+
let resp = self.client.list_with_delimiter(Some(&pre)).await?;
477+
478+
Ok(resp
479+
.common_prefixes
480+
.iter()
481+
.flat_map(|path| path.parts())
482+
.map(|name| name.as_ref().to_string())
483+
.collect::<Vec<_>>())
484+
}
473485
}
474486

475487
impl From<object_store::Error> for ObjectStorageError {

server/src/storage/store_metadata.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -92,8 +92,8 @@ impl StorageMetadata {
9292
}
9393
}
9494

95-
// always returns remote metadata as it is source of truth
96-
// overwrites staging metadata while updating storage info
95+
/// always returns remote metadata as it is source of truth
96+
/// overwrites staging metadata while updating storage info
9797
pub async fn resolve_parseable_metadata() -> Result<StorageMetadata, ObjectStorageError> {
9898
let staging_metadata = get_staging_metadata()?;
9999
let storage = CONFIG.storage().get_object_store();
@@ -168,7 +168,7 @@ pub async fn resolve_parseable_metadata() -> Result<StorageMetadata, ObjectStora
168168
// variant contain remote metadata
169169
#[derive(Debug, Clone, PartialEq, Eq)]
170170
pub enum EnvChange {
171-
/// No change in env i.e both staging and remote have same id
171+
/// No change in env i.e both staging and remote have same id
172172
/// or deployment id of staging is not matching with that of remote
173173
None(StorageMetadata),
174174
/// Metadata not found in storage. Treated as possible misconfiguration on user side.

server/src/utils.rs

Lines changed: 6 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,6 @@ pub mod json;
2323
pub mod uid;
2424
pub mod update;
2525

26-
use std::path::Path;
27-
2826
use chrono::{DateTime, NaiveDate, Timelike, Utc};
2927

3028
#[allow(dead_code)]
@@ -43,17 +41,6 @@ pub fn capitalize_ascii(s: &str) -> String {
4341
s[0..1].to_uppercase() + &s[1..]
4442
}
4543

46-
pub fn validate_path_is_writeable(path: &Path) -> anyhow::Result<()> {
47-
let Ok(md) = std::fs::metadata(path) else {
48-
anyhow::bail!("Could not read metadata for staging dir")
49-
};
50-
let permissions = md.permissions();
51-
if permissions.readonly() {
52-
anyhow::bail!("Staging directory {} is not writable", path.display())
53-
}
54-
Ok(())
55-
}
56-
5744
/// Convert minutes to a slot range
5845
/// e.g. given minute = 15 and OBJECT_STORE_DATA_GRANULARITY = 10 returns "10-19"
5946
pub fn minute_to_slot(minute: u32, data_granularity: u32) -> Option<String> {
@@ -263,7 +250,7 @@ mod tests {
263250
]
264251
)]
265252
#[case::same_hour_with_00_to_59_minute_block(
266-
"2022-06-11T16:00:00+00:00", "2022-06-11T16:59:59+00:00",
253+
"2022-06-11T16:00:00+00:00", "2022-06-11T16:59:59+00:00",
267254
&["date=2022-06-11/hour=16/"]
268255
)]
269256
#[case::same_date_different_hours_coherent_minute(
@@ -274,37 +261,37 @@ mod tests {
274261
]
275262
)]
276263
#[case::same_date_different_hours_incoherent_minutes(
277-
"2022-06-11T15:59:00+00:00", "2022-06-11T16:01:00+00:00",
264+
"2022-06-11T15:59:00+00:00", "2022-06-11T16:01:00+00:00",
278265
&[
279266
"date=2022-06-11/hour=15/minute=59/",
280267
"date=2022-06-11/hour=16/minute=00/"
281268
]
282269
)]
283270
#[case::same_date_different_hours_whole_hours_between_incoherent_minutes(
284-
"2022-06-11T15:59:00+00:00", "2022-06-11T17:01:00+00:00",
271+
"2022-06-11T15:59:00+00:00", "2022-06-11T17:01:00+00:00",
285272
&[
286273
"date=2022-06-11/hour=15/minute=59/",
287274
"date=2022-06-11/hour=16/",
288275
"date=2022-06-11/hour=17/minute=00/"
289276
]
290277
)]
291278
#[case::different_date_coherent_hours_and_minutes(
292-
"2022-06-11T00:00:00+00:00", "2022-06-13T00:00:00+00:00",
279+
"2022-06-11T00:00:00+00:00", "2022-06-13T00:00:00+00:00",
293280
&[
294281
"date=2022-06-11/",
295282
"date=2022-06-12/"
296283
]
297284
)]
298285
#[case::different_date_incoherent_hours_coherent_minutes(
299-
"2022-06-11T23:00:01+00:00", "2022-06-12T01:59:59+00:00",
286+
"2022-06-11T23:00:01+00:00", "2022-06-12T01:59:59+00:00",
300287
&[
301288
"date=2022-06-11/hour=23/",
302289
"date=2022-06-12/hour=00/",
303290
"date=2022-06-12/hour=01/"
304291
]
305292
)]
306293
#[case::different_date_incoherent_hours_incoherent_minutes(
307-
"2022-06-11T23:59:59+00:00", "2022-06-12T00:01:00+00:00",
294+
"2022-06-11T23:59:59+00:00", "2022-06-12T00:01:00+00:00",
308295
&[
309296
"date=2022-06-11/hour=23/minute=59/",
310297
"date=2022-06-12/hour=00/minute=00/"

0 commit comments

Comments
 (0)