Skip to content

Commit b4b5ca2

Browse files
authored
Add option to use local filesystem for storage (#217)
This PR enables the Parseable server to accept an additional argument (`drive` or `s3`) to switch between storage backends. The local filesystem behaves like an object storage through an implementation of the ObjectStorage trait. All methods of LocalFS are implemented using tokio::fs. The configuration provided at runtime is used to select a storage provider. The storage object is handled as a trait object, hence the `?Sized` requirement on some methods. Calls to S3 are replaced with the appropriate methods.
1 parent ee2d7c8 commit b4b5ca2

File tree

16 files changed

+1192
-983
lines changed

16 files changed

+1192
-983
lines changed

server/Cargo.toml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ object_store = { version = "0.5.1", features = ["aws"] }
2525
derive_more = "0.99.17"
2626
env_logger = "0.9.0"
2727
futures = "0.3"
28+
fs_extra = "1.2.0"
2829
http = "0.2.4"
2930
humantime-serde = "1.1.1"
3031
lazy_static = "1.4.0"
@@ -33,6 +34,7 @@ num_cpus = "1.0.0"
3334
os_info = "3.0.7"
3435
hostname = "0.3"
3536
rand = "0.8.4"
37+
relative-path = "1.7.2"
3638
rustls = "0.20.6"
3739
rustls-pemfile = "1.0.1"
3840
rust-flatten-json = "0.2.0"
@@ -43,10 +45,11 @@ serde_json = "^1.0.8"
4345
sysinfo = "0.26.4"
4446
thiserror = "1"
4547
thread-priority = "0.9.2"
46-
tokio-stream = "0.1.8"
48+
tokio-stream = { version = "0.1.8", features = ["fs"] }
4749
tokio = { version = "1.13.1", default-features = false, features = [
4850
"sync",
4951
"macros",
52+
"fs",
5053
] }
5154
clokwerk = "0.4.0-rc1"
5255
actix-web-static-files = "4.0"

server/src/event.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,8 @@ use std::sync::RwLock;
3535

3636
use crate::metadata;
3737
use crate::metadata::LOCK_EXPECT;
38-
use crate::s3;
39-
use crate::storage::{ObjectStorage, StorageDir};
38+
use crate::option::CONFIG;
39+
use crate::storage::{ObjectStorageProvider, StorageDir};
4040

4141
use self::error::{EventError, StreamWriterError};
4242

@@ -184,7 +184,7 @@ impl Event {
184184
} else {
185185
// if stream schema is none then it is first event,
186186
// process first event and store schema in obect store
187-
self.process_first_event::<s3::S3, _>(event, inferred_schema)?
187+
self.process_first_event(event, inferred_schema)?
188188
};
189189

190190
metadata::STREAM_INFO.update_stats(
@@ -202,7 +202,7 @@ impl Event {
202202
// This is called when the first event of a log stream is received. The first event is
203203
// special because we parse this event to generate the schema for the log stream. This
204204
// schema is then enforced on rest of the events sent to this log stream.
205-
fn process_first_event<S: ObjectStorage, R: std::io::Read>(
205+
fn process_first_event<R: std::io::Read>(
206206
&self,
207207
event: json::Reader<R>,
208208
schema: Schema,
@@ -241,13 +241,13 @@ impl Event {
241241
"setting schema on objectstore for logstream {}",
242242
stream_name
243243
);
244-
let storage = S::new();
244+
let storage = CONFIG.storage().get_object_store();
245245

246246
let stream_name = stream_name.clone();
247247
spawn(async move {
248-
if let Err(e) = storage.put_schema(stream_name.clone(), &schema).await {
248+
if let Err(e) = storage.put_schema(&stream_name, &schema).await {
249249
// If this call has failed then currently there is no right way to make local state consistent
250-
// this needs a fix after more constraints are safety guarentee is provided by localwriter and s3_sync.
250+
// this needs a fix after more constraints are safety guarentee is provided by localwriter and objectstore_sync.
251251
// Reasoning -
252252
// - After dropping lock many events may process through
253253
// - Processed events may sync before metadata deletion

server/src/handlers/event.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,10 @@ use actix_web::{web, HttpRequest, HttpResponse};
2222
use serde_json::Value;
2323

2424
use crate::event;
25+
use crate::option::CONFIG;
2526
use crate::query::Query;
2627
use crate::response::QueryResponse;
27-
use crate::s3::S3;
28+
use crate::storage::ObjectStorageProvider;
2829
use crate::utils::header_parsing::{collect_labelled_headers, ParseHeaderError};
2930
use crate::utils::{self, flatten_json_body, merge};
3031

@@ -39,9 +40,9 @@ pub async fn query(_req: HttpRequest, json: web::Json<Value>) -> Result<HttpResp
3940
let json = json.into_inner();
4041
let query = Query::parse(json)?;
4142

42-
let storage = S3::new();
43+
let storage = CONFIG.storage().get_object_store();
4344

44-
let query_result = query.execute(&storage).await;
45+
let query_result = query.execute(&*storage).await;
4546

4647
query_result
4748
.map(Into::<QueryResponse>::into)

server/src/handlers/logstream.rs

Lines changed: 33 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ use chrono::Utc;
2424
use serde_json::Value;
2525

2626
use crate::alerts::Alerts;
27-
use crate::s3::S3;
28-
use crate::storage::{ObjectStorage, StorageDir};
27+
use crate::option::CONFIG;
28+
use crate::storage::{ObjectStorageProvider, StorageDir};
2929
use crate::{event, response};
3030
use crate::{metadata, validator};
3131

@@ -40,17 +40,17 @@ pub async fn delete(req: HttpRequest) -> HttpResponse {
4040
.to_http();
4141
}
4242

43-
let s3 = S3::new();
43+
let objectstore = CONFIG.storage().get_object_store();
4444

45-
if s3.get_schema(&stream_name).await.is_err() {
45+
if objectstore.get_schema(&stream_name).await.is_err() {
4646
return response::ServerResponse {
4747
msg: format!("log stream {} does not exist", stream_name),
4848
code: StatusCode::BAD_REQUEST,
4949
}
5050
.to_http();
5151
}
5252

53-
if let Err(e) = s3.delete_stream(&stream_name).await {
53+
if let Err(e) = objectstore.delete_stream(&stream_name).await {
5454
return response::ServerResponse {
5555
msg: format!(
5656
"failed to delete log stream {} due to err: {}",
@@ -87,7 +87,14 @@ pub async fn delete(req: HttpRequest) -> HttpResponse {
8787
}
8888

8989
pub async fn list(_: HttpRequest) -> impl Responder {
90-
response::list_response(S3::new().list_streams().await.unwrap())
90+
response::list_response(
91+
CONFIG
92+
.storage()
93+
.get_object_store()
94+
.list_streams()
95+
.await
96+
.unwrap(),
97+
)
9198
}
9299

93100
pub async fn schema(req: HttpRequest) -> HttpResponse {
@@ -101,7 +108,12 @@ pub async fn schema(req: HttpRequest) -> HttpResponse {
101108
code: StatusCode::OK,
102109
}
103110
.to_http(),
104-
Err(_) => match S3::new().get_schema(&stream_name).await {
111+
Err(_) => match CONFIG
112+
.storage()
113+
.get_object_store()
114+
.get_schema(&stream_name)
115+
.await
116+
{
105117
Ok(None) => response::ServerResponse {
106118
msg: "log stream is not initialized, please post an event before fetching schema"
107119
.to_string(),
@@ -136,7 +148,12 @@ pub async fn get_alert(req: HttpRequest) -> HttpResponse {
136148

137149
let mut alerts = match alerts {
138150
Some(alerts) => alerts,
139-
None => match S3::new().get_alerts(&stream_name).await {
151+
None => match CONFIG
152+
.storage()
153+
.get_object_store()
154+
.get_alerts(&stream_name)
155+
.await
156+
{
140157
Ok(alerts) if alerts.alerts.is_empty() => {
141158
return response::ServerResponse {
142159
msg: "alert configuration not set for log stream {}".to_string(),
@@ -233,7 +250,12 @@ pub async fn put_alert(req: HttpRequest, body: web::Json<serde_json::Value>) ->
233250
}
234251
}
235252

236-
if let Err(e) = S3::new().put_alerts(&stream_name, &alerts).await {
253+
if let Err(e) = CONFIG
254+
.storage()
255+
.get_object_store()
256+
.put_alerts(&stream_name, &alerts)
257+
.await
258+
{
237259
return response::ServerResponse {
238260
msg: format!(
239261
"failed to set alert configuration for log stream {} due to err: {}",
@@ -333,8 +355,8 @@ pub async fn create_stream_if_not_exists(stream_name: String) -> HttpResponse {
333355
}
334356

335357
// Proceed to create log stream if it doesn't exist
336-
let s3 = S3::new();
337-
if let Err(e) = s3.create_stream(&stream_name).await {
358+
let storage = CONFIG.storage().get_object_store();
359+
if let Err(e) = storage.create_stream(&stream_name).await {
338360
// Fail if unable to create log stream on object store backend
339361
response::ServerResponse {
340362
msg: format!(

server/src/handlers/mod.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,7 @@ use actix_web::http::StatusCode;
2323
use actix_web::HttpResponse;
2424
use sysinfo::{System, SystemExt};
2525

26-
use crate::s3::S3;
27-
use crate::storage::ObjectStorage;
26+
use crate::{option::CONFIG, storage::ObjectStorageProvider};
2827

2928
pub async fn liveness() -> HttpResponse {
3029
// If the available memory is less than 100MiB, return a 503 error.
@@ -37,7 +36,7 @@ pub async fn liveness() -> HttpResponse {
3736
}
3837

3938
pub async fn readiness() -> HttpResponse {
40-
if let Ok(()) = S3::new().check().await {
39+
if CONFIG.storage().get_object_store().check().await.is_ok() {
4140
return HttpResponse::new(StatusCode::OK);
4241
}
4342

server/src/main.rs

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -47,15 +47,14 @@ mod metadata;
4747
mod option;
4848
mod query;
4949
mod response;
50-
mod s3;
5150
mod stats;
5251
mod storage;
5352
mod utils;
5453
mod validator;
5554

5655
use option::CONFIG;
57-
use s3::S3;
58-
use storage::ObjectStorage;
56+
57+
use crate::storage::ObjectStorageProvider;
5958

6059
// Global configurations
6160
const MAX_EVENT_PAYLOAD_SIZE: usize = 1024000;
@@ -67,44 +66,45 @@ async fn main() -> anyhow::Result<()> {
6766
env_logger::init();
6867
CONFIG.print();
6968
CONFIG.validate();
70-
let storage = S3::new();
71-
CONFIG.validate_storage(&storage).await;
72-
if let Err(e) = metadata::STREAM_INFO.load(&storage).await {
69+
let storage = CONFIG.storage().get_object_store();
70+
CONFIG.validate_storage(&*storage).await;
71+
if let Err(e) = metadata::STREAM_INFO.load(&*storage).await {
7372
warn!("could not populate local metadata. {:?}", e);
7473
}
7574
// track all parquet files already in the data directory
7675
storage::CACHED_FILES.track_parquet();
7776

7877
let (localsync_handler, mut localsync_outbox, localsync_inbox) = run_local_sync();
79-
let (mut s3sync_handler, mut s3sync_outbox, mut s3sync_inbox) = s3_sync();
78+
let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) =
79+
object_store_sync();
8080

8181
let app = run_http();
8282
tokio::pin!(app);
8383
loop {
8484
tokio::select! {
8585
e = &mut app => {
8686
// actix server finished .. stop other threads and stop the server
87-
s3sync_inbox.send(()).unwrap_or(());
87+
remote_sync_inbox.send(()).unwrap_or(());
8888
localsync_inbox.send(()).unwrap_or(());
8989
localsync_handler.join().unwrap_or(());
90-
s3sync_handler.join().unwrap_or(());
90+
remote_sync_handler.join().unwrap_or(());
9191
return e
9292
},
9393
_ = &mut localsync_outbox => {
9494
// crash the server if localsync fails for any reason
9595
// panic!("Local Sync thread died. Server will fail now!")
9696
return Err(anyhow::Error::msg("Failed to sync local data to disc. This can happen due to critical error in disc or environment. Please restart the Parseable server.\n\nJoin us on Parseable Slack if the issue persists after restart : https://launchpass.com/parseable"))
9797
},
98-
_ = &mut s3sync_outbox => {
99-
// s3sync failed, this is recoverable by just starting s3sync thread again
100-
s3sync_handler.join().unwrap_or(());
101-
(s3sync_handler, s3sync_outbox, s3sync_inbox) = s3_sync();
98+
_ = &mut remote_sync_outbox => {
99+
// remote_sync failed, this is recoverable by just starting remote_sync thread again
100+
remote_sync_handler.join().unwrap_or(());
101+
(remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = object_store_sync();
102102
}
103103
};
104104
}
105105
}
106106

107-
fn s3_sync() -> (JoinHandle<()>, oneshot::Receiver<()>, oneshot::Sender<()>) {
107+
fn object_store_sync() -> (JoinHandle<()>, oneshot::Receiver<()>, oneshot::Sender<()>) {
108108
let (outbox_tx, outbox_rx) = oneshot::channel::<()>();
109109
let (inbox_tx, inbox_rx) = oneshot::channel::<()>();
110110
let mut inbox_rx = AssertUnwindSafe(inbox_rx);
@@ -116,7 +116,7 @@ fn s3_sync() -> (JoinHandle<()>, oneshot::Receiver<()>, oneshot::Sender<()>) {
116116
scheduler
117117
.every((CONFIG.parseable.upload_interval as u32).seconds())
118118
.run(|| async {
119-
if let Err(e) = S3::new().s3_sync().await {
119+
if let Err(e) = CONFIG.storage().get_object_store().sync().await {
120120
warn!("failed to sync local data with object store. {:?}", e);
121121
}
122122
});

server/src/metadata.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -116,8 +116,8 @@ impl STREAM_INFO {
116116
map.remove(stream_name);
117117
}
118118

119-
pub async fn load(&self, storage: &impl ObjectStorage) -> Result<(), LoadError> {
120-
// When loading streams this function will assume list_streams only returns valid streams.
119+
pub async fn load(&self, storage: &(impl ObjectStorage + ?Sized)) -> Result<(), LoadError> {
120+
// When loading streams this funtion will assume list_streams only returns valid streams.
121121
// a valid stream would have a .schema file.
122122
// .schema file could be empty in that case it will be treated as an uninitialized stream.
123123
// return error in case of an error from object storage itself.

0 commit comments

Comments
 (0)