Skip to content

Commit 6f3f961

Browse files
trueleo and nitisht authored
Move leftover data.parquet to tmp with valid prefix on server startup (#73)
When the server is restarted, it should try to sync data in the local directory that has not yet been synced to S3. Otherwise, that data will be lost when a new event arrives. Make use of the file's last-modified system time to generate a prefix and move data.parquet to the tmp directory; it will be synced to S3 on the next s3_sync run. Fixes #53 Co-authored-by: Nitish Tiwari <[email protected]>
1 parent b67d602 commit 6f3f961

File tree

3 files changed

+73
-4
lines changed

3 files changed

+73
-4
lines changed

server/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ object_store = { version = "0.4", features=["aws"] }
2828
derive_more = "0.99.17"
2929
env_logger = "0.9.0"
3030
futures = "0.3"
31+
filetime = "0.2.17"
3132
http = "0.2.4"
3233
lazy_static = "1.4.0"
3334
log = "0.4.14"

server/src/main.rs

Lines changed: 69 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,17 @@ use actix_web::{middleware, web, App, HttpServer};
2222
use actix_web_httpauth::extractors::basic::BasicAuth;
2323
use actix_web_httpauth::middleware::HttpAuthentication;
2424
use actix_web_static_files::ResourceFiles;
25+
use chrono::{DateTime, NaiveDateTime, Timelike, Utc};
2526
use clokwerk::{AsyncScheduler, Scheduler, TimeUnits};
27+
use filetime::FileTime;
2628
use log::warn;
2729
use openssl::ssl::{SslAcceptor, SslFiletype, SslMethod};
2830

2931
include!(concat!(env!("OUT_DIR"), "/generated.rs"));
3032

33+
use std::fs;
3134
use std::panic::{catch_unwind, AssertUnwindSafe};
35+
use std::path::Path;
3236
use std::thread::{self, JoinHandle};
3337
use std::time::Duration;
3438
use tokio::sync::oneshot;
@@ -51,7 +55,7 @@ mod validator;
5155
use error::Error;
5256
use option::CONFIG;
5357
use s3::S3;
54-
use storage::ObjectStorage;
58+
use storage::{ObjectStorage, StorageDir};
5559

5660
// Global configurations
5761
const MAX_EVENT_PAYLOAD_SIZE: usize = 102400;
@@ -69,6 +73,10 @@ async fn main() -> anyhow::Result<()> {
6973
warn!("could not populate local metadata. {:?}", e);
7074
}
7175

76+
// Move all existing data.parquet files to their respective tmp directories
77+
// they will be synced to object store on next s3 sync cycle
78+
startup_sync();
79+
7280
let (localsync_handler, mut localsync_outbox, localsync_inbox) = run_local_sync();
7381
let (mut s3sync_handler, mut s3sync_outbox, mut s3sync_inbox) = s3_sync();
7482

@@ -98,6 +106,66 @@ async fn main() -> anyhow::Result<()> {
98106
}
99107
}
100108

109+
fn startup_sync() {
110+
if !Path::new(&CONFIG.parseable.local_disk_path).exists() {
111+
return;
112+
}
113+
114+
for stream in metadata::STREAM_INFO.list_streams() {
115+
let dir = StorageDir::new(&stream);
116+
// if data.parquet file is not present then skip this stream
117+
if !dir.parquet_path_exists() {
118+
continue;
119+
}
120+
if let Err(e) = dir.create_temp_dir() {
121+
log::error!(
122+
"Error creating tmp directory for {} due to error [{}]",
123+
&stream,
124+
e
125+
);
126+
continue;
127+
}
128+
// create prefix for this file from its last modified time
129+
let path = dir.data_path.join("data.parquet");
130+
131+
// metadata.modified gives us system time
132+
// This may not work on all platfomns
133+
let metadata = match fs::metadata(&path) {
134+
Ok(meta) => meta,
135+
Err(err) => {
136+
log::warn!(
137+
"Failed to get file metadata for {} due to {:?}. Skipping!",
138+
path.display(),
139+
err
140+
);
141+
continue;
142+
}
143+
};
144+
145+
let last_modified = FileTime::from_last_modification_time(&metadata);
146+
let last_modified = NaiveDateTime::from_timestamp(last_modified.unix_seconds(), 0);
147+
let last_modified: DateTime<Utc> = DateTime::from_utc(last_modified, Utc);
148+
149+
let uri = utils::date_to_prefix(last_modified.date())
150+
+ &utils::hour_to_prefix(last_modified.hour())
151+
+ &utils::minute_to_prefix(
152+
last_modified.minute(),
153+
storage::OBJECT_STORE_DATA_GRANULARITY,
154+
)
155+
.unwrap();
156+
let local_uri = str::replace(&uri, "/", ".");
157+
let hostname = utils::hostname_unchecked();
158+
let parquet_file_local = format!("{}{}.data.parquet", local_uri, hostname);
159+
if let Err(err) = dir.move_parquet_to_temp(parquet_file_local) {
160+
log::warn!(
161+
"Failed to move parquet file at {} to tmp directory due to error {}",
162+
path.display(),
163+
err
164+
)
165+
}
166+
}
167+
}
168+
101169
fn s3_sync() -> (JoinHandle<()>, oneshot::Receiver<()>, oneshot::Sender<()>) {
102170
let (outbox_tx, outbox_rx) = oneshot::channel::<()>();
103171
let (inbox_tx, inbox_rx) = oneshot::channel::<()>();

server/src/storage.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -164,18 +164,18 @@ impl StorageDir {
164164
}
165165
}
166166

167-
fn create_temp_dir(&self) -> io::Result<()> {
167+
pub fn create_temp_dir(&self) -> io::Result<()> {
168168
fs::create_dir_all(&self.temp_dir)
169169
}
170170

171-
fn move_parquet_to_temp(&self, filename: String) -> io::Result<()> {
171+
pub fn move_parquet_to_temp(&self, filename: String) -> io::Result<()> {
172172
fs::rename(
173173
self.data_path.join("data.parquet"),
174174
self.temp_dir.join(filename),
175175
)
176176
}
177177

178-
fn parquet_path_exists(&self) -> bool {
178+
pub fn parquet_path_exists(&self) -> bool {
179179
self.data_path.join("data.parquet").exists()
180180
}
181181
}

0 commit comments

Comments
 (0)