Skip to content

Commit dd24e09

Browse files
authored
Add version number for the current object storage format (#143)
Fixes #122
1 parent d72ce98 commit dd24e09

File tree

4 files changed

+38
-4
lines changed

4 files changed

+38
-4
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
</p>
77

88
<p align="center">
9+
<a href="https://fossunited.org/" target="_blank"><img src="http://fossunited.org/files/fossunited-badge.svg"></a>
910
<img src="https://img.shields.io/github/commit-activity/m/parseablehq/parseable" alt="commits activity monthly">
1011
<a href="https://launchpass.com/parseable" target="_blank"><img src="https://img.shields.io/badge/join%20slack-parseable-brightgreen.svg" alt="join slack"></a>
1112
<a href="https://github.com/parseablehq/parseable/stargazers" target="_blank"><img src="https://img.shields.io/github/stars/parseablehq/parseable?style=social" alt="Github stars"></a>

server/src/handlers/logstream.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,6 @@ pub async fn put(req: HttpRequest) -> HttpResponse {
166166

167167
// Proceed to create log stream if it doesn't exist
168168
if s3.get_schema(&stream_name).await.is_err() {
169-
metadata::STREAM_INFO.add_stream(stream_name.to_string(), None, Alerts::default());
170169
// Fail if unable to create log stream on object store backend
171170
if let Err(e) = s3.create_stream(&stream_name).await {
172171
// delete the stream from metadata because we couldn't create it on object store backend
@@ -180,6 +179,7 @@ pub async fn put(req: HttpRequest) -> HttpResponse {
180179
}
181180
.to_http();
182181
}
182+
metadata::STREAM_INFO.add_stream(stream_name.to_string(), None, Alerts::default());
183183
return response::ServerResponse {
184184
msg: format!("created log stream {}", stream_name),
185185
code: StatusCode::OK,

server/src/main.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use filetime::FileTime;
2828
use log::warn;
2929
use rustls::{Certificate, PrivateKey, ServerConfig};
3030
use rustls_pemfile::{certs, pkcs8_private_keys};
31+
use std::env;
3132
use thread_priority::{ThreadBuilder, ThreadPriority};
3233

3334
include!(concat!(env!("OUT_DIR"), "/generated.rs"));
@@ -133,7 +134,7 @@ fn startup_sync() {
133134
let path = dir.data_path.join("data.records");
134135

135136
// metadata.modified gives us system time
136-
// This may not work on all platfomns
137+
// This may not work on all platforms
137138
let metadata = match fs::metadata(&path) {
138139
Ok(meta) => meta,
139140
Err(err) => {

server/src/s3.rs

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use futures::StreamExt;
2222
use http::Uri;
2323
use object_store::aws::AmazonS3Builder;
2424
use object_store::limit::LimitStore;
25+
use serde::{Deserialize, Serialize};
2526
use std::fs;
2627
use std::iter::Iterator;
2728
use std::sync::Arc;
@@ -46,6 +47,20 @@ const S3_URL_ENV_VAR: &str = "P_S3_URL";
4647
// max concurrent request allowed for datafusion object store
4748
const MAX_OBJECT_STORE_REQUESTS: usize = 1000;
4849

50+
#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
51+
pub struct ObjectStoreFormat {
52+
#[serde(rename = "objectstore-format")]
53+
pub version: String,
54+
}
55+
56+
impl ObjectStoreFormat {
57+
pub fn new() -> Self {
58+
Self {
59+
version: "v1".to_string(),
60+
}
61+
}
62+
}
63+
4964
lazy_static::lazy_static! {
5065
#[derive(Debug)]
5166
pub static ref S3_CONFIG: Arc<S3Config> = Arc::new(S3Config::parse());
@@ -195,14 +210,29 @@ impl S3 {
195210
Ok(())
196211
}
197212

198-
async fn _create_stream(&self, stream_name: &str) -> Result<(), AwsSdkError> {
213+
async fn _create_stream(&self, stream_name: &str, format: Vec<u8>) -> Result<(), AwsSdkError> {
214+
// create ./schema empty file in the stream-name prefix
215+
// this indicates that the stream has been created
216+
// but doesn't have any content yet
199217
let _resp = self
200218
.client
201219
.put_object()
202220
.bucket(&S3_CONFIG.s3_bucket_name)
203221
.key(format!("{}/.schema", stream_name))
204222
.send()
205223
.await?;
224+
// create .parseable.json file in the stream-name prefix.
225+
// This indicates the format version for this stream.
226+
// This is helpful in case we may change the backend format
227+
// in the future
228+
let _resp = self
229+
.client
230+
.put_object()
231+
.bucket(&S3_CONFIG.s3_bucket_name)
232+
.key(format!("{}/.parseable.json", stream_name))
233+
.body(format.into())
234+
.send()
235+
.await?;
206236
// Prefix created on S3, now create the directory in
207237
// the local storage as well
208238
let _res = fs::create_dir_all(CONFIG.parseable.local_stream_data_path(stream_name));
@@ -357,7 +387,9 @@ impl ObjectStorage for S3 {
357387
}
358388

359389
async fn create_stream(&self, stream_name: &str) -> Result<(), ObjectStorageError> {
360-
self._create_stream(stream_name).await?;
390+
let format = ObjectStoreFormat::new();
391+
let body = serde_json::to_vec(&format)?;
392+
self._create_stream(stream_name, body).await?;
361393

362394
Ok(())
363395
}

0 commit comments

Comments
 (0)