Skip to content

Commit 52f4dbc

Browse files
authored
Check parseable metadata on startup (#222)
Parseable will check its main metadata in both the remote storage and the staging directory. If any critical differences are found, it will stop the server from starting until the differences are resolved.
1 parent 5e9f7b6 commit 52f4dbc

File tree

6 files changed

+209
-29
lines changed

6 files changed

+209
-29
lines changed

server/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ md-5 = "0.10.5"
3636
os_info = "3.0.7"
3737
hostname = "0.3"
3838
rand = "0.8.4"
39-
relative-path = "1.7.2"
39+
relative-path = { version = "1.7.2", features = ["serde"] }
4040
rustls = "0.20.6"
4141
rustls-pemfile = "1.0.1"
4242
rust-flatten-json = "0.2.0"

server/src/main.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ mod validator;
5454

5555
use option::CONFIG;
5656

57+
use crate::storage::resolve_parseable_metadata;
58+
5759
// Global configurations
5860
const MAX_EVENT_PAYLOAD_SIZE: usize = 1024000;
5961
const API_BASE_PATH: &str = "/api";
@@ -69,6 +71,7 @@ async fn main() -> anyhow::Result<()> {
6971
if let Err(e) = metadata::STREAM_INFO.load(&*storage).await {
7072
warn!("could not populate local metadata. {:?}", e);
7173
}
74+
resolve_parseable_metadata().await?;
7275
// track all parquet files already in the data directory
7376
storage::CACHED_FILES.track_parquet();
7477

server/src/option.rs

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -199,8 +199,7 @@ pub struct Server {
199199
long,
200200
env = "P_STAGING_DIR",
201201
default_value = "./data",
202-
value_name = "path",
203-
value_parser = validation::absolute_path
202+
value_name = "path"
204203
)]
205204
pub local_staging_path: PathBuf,
206205

@@ -245,7 +244,7 @@ impl Server {
245244
}
246245
}
247246

248-
pub(self) mod validation {
247+
pub mod validation {
249248
use std::path::PathBuf;
250249

251250
pub fn file_path(s: &str) -> Result<PathBuf, String> {
@@ -261,11 +260,4 @@ pub(self) mod validation {
261260

262261
Ok(path)
263262
}
264-
265-
pub fn absolute_path(s: &str) -> Result<PathBuf, String> {
266-
std::fs::canonicalize(s).map_err(|_| {
267-
"Could not construct absolute path from given path value for staging directory"
268-
.to_string()
269-
})
270-
}
271263
}

server/src/storage.rs

Lines changed: 52 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,17 +29,22 @@ use datafusion::parquet::errors::ParquetError;
2929
use lazy_static::lazy_static;
3030
use serde::{Deserialize, Serialize};
3131

32+
use std::fs::create_dir_all;
3233
use std::path::PathBuf;
3334
use std::sync::{Arc, Mutex};
3435

3536
mod file_link;
3637
mod localfs;
3738
mod object_storage;
3839
mod s3;
40+
mod store_metadata;
3941

4042
pub use localfs::{FSConfig, LocalFS};
4143
pub use object_storage::{ObjectStorage, ObjectStorageProvider};
4244
pub use s3::{S3Config, S3};
45+
pub use store_metadata::StorageMetadata;
46+
47+
use self::store_metadata::{put_staging_metadata, startup_check, EnvChange};
4348

4449
/// local sync interval to move data.records to /tmp dir of that stream.
4550
/// 60 sec is a reasonable value.
@@ -132,6 +137,50 @@ impl ObjectStoreFormat {
132137
}
133138
}
134139

140+
pub async fn resolve_parseable_metadata() -> Result<(), ObjectStorageError> {
141+
let check = startup_check().await?;
142+
const MISMATCH: &str = "Could not start the server because metadata file found in staging directory does not match one in the storage";
143+
let err: Option<&str> = match check {
144+
EnvChange::None => None,
145+
EnvChange::StagingMismatch => Some(MISMATCH),
146+
EnvChange::StorageMismatch => Some(MISMATCH),
147+
EnvChange::NewRemote => {
148+
Some("Could not start the server because metadata not found in storage")
149+
}
150+
EnvChange::NewStaging => {
151+
Some("Could not start the server becuase metadata not found in staging")
152+
}
153+
EnvChange::CreateBoth => {
154+
create_staging_metadata()?;
155+
create_remote_metadata().await?;
156+
None
157+
}
158+
};
159+
160+
if let Some(err) = err {
161+
let err = format!(
162+
"{}. {}",
163+
err,
164+
"Join us on Parseable Slack to report this incident : https://launchpass.com/parseable"
165+
);
166+
let err: Box<dyn std::error::Error + Send + Sync + 'static> = err.into();
167+
Err(ObjectStorageError::UnhandledError(err))
168+
} else {
169+
Ok(())
170+
}
171+
}
172+
173+
async fn create_remote_metadata() -> Result<(), ObjectStorageError> {
174+
let client = CONFIG.storage().get_object_store();
175+
client.put_metadata(&StorageMetadata::new()).await
176+
}
177+
178+
fn create_staging_metadata() -> std::io::Result<()> {
179+
create_dir_all(CONFIG.staging_dir())?;
180+
let metadata = StorageMetadata::new();
181+
put_staging_metadata(&metadata)
182+
}
183+
135184
lazy_static! {
136185
pub static ref CACHED_FILES: Mutex<FileTable<FileLink>> = Mutex::new(FileTable::new());
137186
pub static ref STORAGE_RUNTIME: Arc<RuntimeEnv> = CONFIG.storage().get_datafusion_runtime();
@@ -231,7 +280,7 @@ pub enum ObjectStorageError {
231280

232281
// Could not connect to object storage
233282
#[error("Connection Error: {0}")]
234-
ConnectionError(Box<dyn std::error::Error + Send + 'static>),
283+
ConnectionError(Box<dyn std::error::Error + Send + Sync + 'static>),
235284

236285
// IO Error when reading a file or listing path
237286
#[error("IO Error: {0}")]
@@ -242,8 +291,8 @@ pub enum ObjectStorageError {
242291
DataFusionError(#[from] datafusion::error::DataFusionError),
243292

244293
#[error("Unhandled Error: {0}")]
245-
UnhandledError(Box<dyn std::error::Error + Send + 'static>),
294+
UnhandledError(Box<dyn std::error::Error + Send + Sync + 'static>),
246295

247296
#[error("Authentication Error: {0}")]
248-
AuthenticationError(Box<dyn std::error::Error + Send + 'static>),
297+
AuthenticationError(Box<dyn std::error::Error + Send + Sync + 'static>),
249298
}

server/src/storage/object_storage.rs

Lines changed: 47 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
use super::{
2020
file_link::CacheState, AccessObject, LogStream, MoveDataError, ObjectStorageError,
21-
ObjectStoreFormat, StorageDir, CACHED_FILES,
21+
ObjectStoreFormat, StorageDir, StorageMetadata, CACHED_FILES,
2222
};
2323
use crate::{alerts::Alerts, metadata::STREAM_INFO, option::CONFIG, query::Query, stats::Stats};
2424

@@ -44,7 +44,8 @@ use std::{
4444
};
4545

4646
// metadata file names in a Stream prefix
47-
const METADATA_FILE_NAME: &str = ".metadata.json";
47+
const STREAM_METADATA_FILE_NAME: &str = ".metadata.json";
48+
pub(super) const PARSEABLE_METADATA_FILE_NAME: &str = ".parseable.json";
4849
const SCHEMA_FILE_NAME: &str = ".schema";
4950
const ALERT_FILE_NAME: &str = ".alert.json";
5051

@@ -89,7 +90,7 @@ pub trait ObjectStorage: Sync + 'static {
8990

9091
self.put_object(&schema_path(stream_name), "".into())
9192
.await?;
92-
self.put_object(&metadata_json_path(stream_name), format_json)
93+
self.put_object(&stream_json_path(stream_name), format_json)
9394
.await?;
9495

9596
Ok(())
@@ -105,15 +106,23 @@ pub trait ObjectStorage: Sync + 'static {
105106
}
106107

107108
async fn put_stats(&self, stream_name: &str, stats: &Stats) -> Result<(), ObjectStorageError> {
108-
let path = metadata_json_path(stream_name);
109-
let parseable_metadata = self.get_object(&path).await?;
109+
let path = stream_json_path(stream_name);
110+
let stream_metadata = self.get_object(&path).await?;
110111
let stats = serde_json::to_value(stats).expect("stats are perfectly serializable");
111-
let mut parseable_metadata: serde_json::Value =
112-
serde_json::from_slice(&parseable_metadata).expect("parseable config is valid json");
112+
let mut stream_metadata: serde_json::Value =
113+
serde_json::from_slice(&stream_metadata).expect("parseable config is valid json");
113114

114-
parseable_metadata["stats"] = stats;
115+
stream_metadata["stats"] = stats;
115116

116-
self.put_object(&path, to_bytes(&parseable_metadata)).await
117+
self.put_object(&path, to_bytes(&stream_metadata)).await
118+
}
119+
120+
async fn put_metadata(
121+
&self,
122+
parseable_metadata: &StorageMetadata,
123+
) -> Result<(), ObjectStorageError> {
124+
self.put_object(&parseable_json_path(), to_bytes(parseable_metadata))
125+
.await
117126
}
118127

119128
async fn get_schema(&self, stream_name: &str) -> Result<Option<Schema>, ObjectStorageError> {
@@ -133,17 +142,35 @@ pub trait ObjectStorage: Sync + 'static {
133142
}
134143

135144
async fn get_stats(&self, stream_name: &str) -> Result<Stats, ObjectStorageError> {
136-
let parseable_metadata = self.get_object(&metadata_json_path(stream_name)).await?;
137-
let parseable_metadata: Value =
138-
serde_json::from_slice(&parseable_metadata).expect("parseable config is valid json");
145+
let stream_metadata = self.get_object(&stream_json_path(stream_name)).await?;
146+
let stream_metadata: Value =
147+
serde_json::from_slice(&stream_metadata).expect("parseable config is valid json");
139148

140-
let stats = &parseable_metadata["stats"];
149+
let stats = &stream_metadata["stats"];
141150

142151
let stats = serde_json::from_value(stats.clone()).unwrap_or_default();
143152

144153
Ok(stats)
145154
}
146155

156+
async fn get_metadata(&self) -> Result<Option<StorageMetadata>, ObjectStorageError> {
157+
let parseable_metadata: Option<StorageMetadata> =
158+
match self.get_object(&parseable_json_path()).await {
159+
Ok(bytes) => {
160+
Some(serde_json::from_slice(&bytes).expect("parseable config is valid json"))
161+
}
162+
Err(err) => {
163+
if matches!(err, ObjectStorageError::NoSuchKey(_)) {
164+
None
165+
} else {
166+
return Err(err);
167+
}
168+
}
169+
};
170+
171+
Ok(parseable_metadata)
172+
}
173+
147174
async fn sync(&self) -> Result<(), MoveDataError> {
148175
if !Path::new(&CONFIG.staging_dir()).exists() {
149176
return Ok(());
@@ -265,8 +292,13 @@ fn schema_path(stream_name: &str) -> RelativePathBuf {
265292
}
266293

267294
#[inline(always)]
268-
fn metadata_json_path(stream_name: &str) -> RelativePathBuf {
269-
RelativePathBuf::from_iter([stream_name, METADATA_FILE_NAME])
295+
fn stream_json_path(stream_name: &str) -> RelativePathBuf {
296+
RelativePathBuf::from_iter([stream_name, STREAM_METADATA_FILE_NAME])
297+
}
298+
299+
#[inline(always)]
300+
fn parseable_json_path() -> RelativePathBuf {
301+
RelativePathBuf::from(PARSEABLE_METADATA_FILE_NAME)
270302
}
271303

272304
#[inline(always)]

server/src/storage/store_metadata.rs

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
use std::{
2+
fs::{self, OpenOptions},
3+
path::PathBuf,
4+
};
5+
6+
use serde::{Deserialize, Serialize};
7+
use std::io;
8+
9+
use crate::{option::CONFIG, utils::hostname_unchecked};
10+
11+
use super::{object_storage::PARSEABLE_METADATA_FILE_NAME, ObjectStorageError};
12+
13+
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
14+
pub struct StorageMetadata {
15+
pub version: String,
16+
pub mode: String,
17+
pub staging: PathBuf,
18+
pub storage: String,
19+
pub deployment_id: String,
20+
pub user: Vec<User>,
21+
pub stream: Vec<String>,
22+
}
23+
24+
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
25+
pub struct User {
26+
username: String,
27+
password: String,
28+
role: String,
29+
}
30+
31+
impl StorageMetadata {
32+
pub fn new() -> Self {
33+
Self {
34+
version: "v1".to_string(),
35+
mode: CONFIG.storage_name.to_owned(),
36+
staging: CONFIG.staging_dir().canonicalize().unwrap(),
37+
storage: CONFIG.storage().get_endpoint(),
38+
deployment_id: hostname_unchecked(),
39+
user: Vec::new(),
40+
stream: Vec::new(),
41+
}
42+
}
43+
}
44+
45+
pub async fn startup_check() -> Result<EnvChange, ObjectStorageError> {
46+
let staging_metadata = get_staging_metadata()?;
47+
let storage = CONFIG.storage().get_object_store();
48+
let remote_metadata = storage.get_metadata().await?;
49+
50+
Ok(check_metadata_conflict(staging_metadata, remote_metadata))
51+
}
52+
53+
fn check_metadata_conflict(
54+
staging_metadata: Option<StorageMetadata>,
55+
remote_metadata: Option<StorageMetadata>,
56+
) -> EnvChange {
57+
match (staging_metadata, remote_metadata) {
58+
(Some(staging), Some(remote)) if staging.mode == remote.mode => {
59+
if staging.storage != remote.storage {
60+
EnvChange::StorageMismatch
61+
} else if staging.staging != remote.staging {
62+
EnvChange::StagingMismatch
63+
} else {
64+
EnvChange::None
65+
}
66+
}
67+
(Some(staging), Some(remote)) if staging.mode != remote.mode => EnvChange::StorageMismatch,
68+
(None, None) => EnvChange::CreateBoth,
69+
(None, Some(_)) => EnvChange::NewStaging,
70+
(Some(_), None) => EnvChange::NewRemote,
71+
_ => unreachable!(),
72+
}
73+
}
74+
/// Result of comparing the staging metadata with the remote metadata.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EnvChange {
    /// Both sides have metadata and they agree.
    None,
    /// Both sides have metadata but the staging paths differ.
    StagingMismatch,
    /// Both sides have metadata but the mode or the storage differ.
    StorageMismatch,
    /// Metadata exists in staging but not in the remote.
    NewRemote,
    /// Metadata exists in the remote but not in staging.
    NewStaging,
    /// Neither side has metadata.
    CreateBoth,
}
83+
84+
/// Reads the parseable metadata file from the staging directory.
///
/// Returns `Ok(None)` when the file does not exist.
///
/// # Errors
///
/// Returns any other I/O error from reading the file. A file whose contents
/// cannot be deserialized is reported as an `io::Error` converted from the
/// `serde_json` error, instead of panicking.
fn get_staging_metadata() -> io::Result<Option<StorageMetadata>> {
    let path = CONFIG.staging_dir().join(PARSEABLE_METADATA_FILE_NAME);
    let bytes = match fs::read(path) {
        Ok(bytes) => bytes,
        Err(err) if err.kind() == io::ErrorKind::NotFound => return Ok(None),
        Err(err) => return Err(err),
    };

    // `?` uses the same serde_json -> io error conversion that
    // `put_staging_metadata` relies on.
    let meta: StorageMetadata = serde_json::from_slice(&bytes)?;

    Ok(Some(meta))
}
98+
99+
pub fn put_staging_metadata(meta: &StorageMetadata) -> io::Result<()> {
100+
let path = CONFIG.staging_dir().join(PARSEABLE_METADATA_FILE_NAME);
101+
let mut file = OpenOptions::new().create_new(true).write(true).open(path)?;
102+
serde_json::to_writer(&mut file, meta)?;
103+
Ok(())
104+
}

0 commit comments

Comments
 (0)