Skip to content

Commit ee2d7c8

Browse files
authored
Update metadata for stream (#216)
1 parent c73296d commit ee2d7c8

File tree

2 files changed

+93
-21
lines changed

2 files changed

+93
-21
lines changed

server/src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -293,7 +293,7 @@ pub fn configure_routes(cfg: &mut web::ServiceConfig) {
293293
web::resource(stats_path("{logstream}"))
294294
.route(web::get().to(handlers::logstream::get_stats)),
295295
)
296-
// GET "/liveness" ==> Livenss check as per https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/#define-a-liveness-command
296+
// GET "/liveness" ==> Liveness check as per https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/#define-a-liveness-command
297297
.service(web::resource(liveness_path()).route(web::get().to(handlers::liveness)))
298298
// GET "/readiness" ==> Readiness check as per https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/#define-readiness-probes
299299
.service(web::resource(readiness_path()).route(web::get().to(handlers::readiness)))

server/src/s3.rs

Lines changed: 92 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use aws_sdk_s3::RetryConfig;
2525
use aws_sdk_s3::{Client, Credentials, Endpoint, Region};
2626
use aws_smithy_async::rt::sleep::default_async_sleep;
2727
use bytes::Bytes;
28+
use chrono::Local;
2829
use clap::builder::ArgPredicate;
2930
use datafusion::arrow::datatypes::Schema;
3031
use datafusion::datasource::file_format::parquet::ParquetFormat;
@@ -58,23 +59,94 @@ const DEFAULT_S3_BUCKET: &str = "parseable";
5859
const DEFAULT_S3_ACCESS_KEY: &str = "minioadmin";
5960
const DEFAULT_S3_SECRET_KEY: &str = "minioadmin";
6061

62+
// metadata file names in a Stream prefix
63+
const METADATA_FILE_NAME: &str = ".metadata.json";
64+
const SCHEMA_FILE_NAME: &str = ".schema";
65+
const ALERT_FILE_NAME: &str = ".alert.json";
66+
6167
// max concurrent request allowed for datafusion object store
6268
const MAX_OBJECT_STORE_REQUESTS: usize = 1000;
6369

70+
// all the supported permissions
71+
// const PERMISSIONS_READ: &str = "readonly";
72+
// const PERMISSIONS_WRITE: &str = "writeonly";
73+
// const PERMISSIONS_DELETE: &str = "delete";
74+
// const PERMISSIONS_READ_WRITE: &str = "readwrite";
75+
const PERMISSIONS_ALL: &str = "all";
76+
6477
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
6578
pub struct ObjectStoreFormat {
66-
#[serde(rename = "objectstore-format")]
6779
pub version: String,
80+
#[serde(rename = "objectstore-format")]
81+
pub objectstore_format: String,
82+
#[serde(rename = "created-at")]
83+
pub created_at: String,
84+
pub owner: Owner,
85+
pub access: Access,
86+
}
87+
88+
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
89+
pub struct Owner {
90+
pub id: String,
91+
pub group: String,
92+
}
93+
94+
impl Owner {
95+
pub fn new(id: String, group: String) -> Self {
96+
Self { id, group }
97+
}
98+
}
99+
100+
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
101+
pub struct Access {
102+
pub objects: Vec<AccessObject>,
103+
}
104+
105+
impl Access {
106+
pub fn new(objects: Vec<AccessObject>) -> Self {
107+
Self { objects }
108+
}
109+
}
110+
111+
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
112+
pub struct AccessObject {
113+
pub id: String,
114+
pub group: String,
115+
pub permissions: Vec<String>,
116+
}
117+
118+
impl AccessObject {
119+
pub fn new(id: String) -> Self {
120+
Self {
121+
id: id.clone(),
122+
group: id,
123+
permissions: vec![PERMISSIONS_ALL.to_string()],
124+
}
125+
}
68126
}
69127

70128
impl Default for ObjectStoreFormat {
71129
fn default() -> Self {
72130
Self {
73131
version: "v1".to_string(),
132+
objectstore_format: "v1".to_string(),
133+
created_at: Local::now().to_rfc3339(),
134+
owner: Owner::new("".to_string(), "".to_string()),
135+
access: Access::new(vec![]),
74136
}
75137
}
76138
}
77139

140+
impl ObjectStoreFormat {
141+
fn set_id(&mut self, id: String) {
142+
self.owner.id.clone_from(&id);
143+
self.owner.group = id;
144+
}
145+
fn set_access(&mut self, access: Vec<AccessObject>) {
146+
self.access.objects = access;
147+
}
148+
}
149+
78150
lazy_static::lazy_static! {
79151
#[derive(Debug)]
80152
pub static ref S3_CONFIG: Arc<S3Config> = Arc::new(CONFIG.storage().clone());
@@ -219,7 +291,7 @@ impl S3 {
219291
.client
220292
.put_object()
221293
.bucket(&S3_CONFIG.s3_bucket_name)
222-
.key(format!("{}/.schema", stream_name))
294+
.key(format!("{}/{}", stream_name, SCHEMA_FILE_NAME))
223295
.body(body.into_bytes().into())
224296
.send()
225297
.await?;
@@ -235,26 +307,22 @@ impl S3 {
235307
.client
236308
.put_object()
237309
.bucket(&S3_CONFIG.s3_bucket_name)
238-
.key(format!("{}/.schema", stream_name))
310+
.key(format!("{}/{}", stream_name, SCHEMA_FILE_NAME))
239311
.send()
240312
.await?;
241-
self._put_parseable_config(stream_name, format).await?;
313+
self._put_stream_meta(stream_name, format).await?;
242314
// Prefix created on S3, now create the directory in
243315
// the local storage as well
244316
let _res = fs::create_dir_all(CONFIG.parseable.local_stream_data_path(stream_name));
245317
Ok(())
246318
}
247319

248-
async fn _put_parseable_config(
249-
&self,
250-
stream_name: &str,
251-
body: Vec<u8>,
252-
) -> Result<(), AwsSdkError> {
320+
async fn _put_stream_meta(&self, stream_name: &str, body: Vec<u8>) -> Result<(), AwsSdkError> {
253321
let _resp = self
254322
.client
255323
.put_object()
256324
.bucket(&S3_CONFIG.s3_bucket_name)
257-
.key(format!("{}/.parseable.json", stream_name))
325+
.key(format!("{}/{}", stream_name, METADATA_FILE_NAME))
258326
.body(body.into())
259327
.send()
260328
.await?;
@@ -297,7 +365,7 @@ impl S3 {
297365
.client
298366
.put_object()
299367
.bucket(&S3_CONFIG.s3_bucket_name)
300-
.key(format!("{}/.alert.json", stream_name))
368+
.key(format!("{}/{}", stream_name, ALERT_FILE_NAME))
301369
.body(body.into())
302370
.send()
303371
.await?;
@@ -306,23 +374,23 @@ impl S3 {
306374
}
307375

308376
async fn _get_schema(&self, stream_name: &str) -> Result<Bytes, AwsSdkError> {
309-
self._get(stream_name, "schema").await
377+
self._get(stream_name, SCHEMA_FILE_NAME).await
310378
}
311379

312380
async fn _alert_exists(&self, stream_name: &str) -> Result<Bytes, AwsSdkError> {
313-
self._get(stream_name, "alert.json").await
381+
self._get(stream_name, ALERT_FILE_NAME).await
314382
}
315383

316-
async fn _get_parseable_config(&self, stream_name: &str) -> Result<Bytes, AwsSdkError> {
317-
self._get(stream_name, "parseable.json").await
384+
async fn _get_stream_meta(&self, stream_name: &str) -> Result<Bytes, AwsSdkError> {
385+
self._get(stream_name, METADATA_FILE_NAME).await
318386
}
319387

320388
async fn _get(&self, stream_name: &str, resource: &str) -> Result<Bytes, AwsSdkError> {
321389
let resp = self
322390
.client
323391
.get_object()
324392
.bucket(&S3_CONFIG.s3_bucket_name)
325-
.key(format!("{}/.{}", stream_name, resource))
393+
.key(format!("{}/{}", stream_name, resource))
326394
.send()
327395
.await?;
328396
let body = resp.body.collect().await;
@@ -414,7 +482,11 @@ impl ObjectStorage for S3 {
414482
}
415483

416484
async fn create_stream(&self, stream_name: &str) -> Result<(), ObjectStorageError> {
417-
let format = ObjectStoreFormat::default();
485+
let mut format = ObjectStoreFormat::default();
486+
format.set_id(CONFIG.parseable.username.clone());
487+
let access_object = AccessObject::new(CONFIG.parseable.username.clone());
488+
format.set_access(vec![access_object]);
489+
418490
let body = serde_json::to_vec(&format)?;
419491
self._create_stream(stream_name, body).await?;
420492

@@ -440,13 +512,13 @@ impl ObjectStorage for S3 {
440512

441513
async fn put_stats(&self, stream_name: &str, stats: &Stats) -> Result<(), ObjectStorageError> {
442514
let stats = serde_json::to_value(stats).expect("stats are perfectly serializable");
443-
let parseable_metadata = self._get_parseable_config(stream_name).await?;
515+
let parseable_metadata = self._get_stream_meta(stream_name).await?;
444516
let mut parseable_metadata: Value =
445517
serde_json::from_slice(&parseable_metadata).expect("parseable config is valid json");
446518

447519
parseable_metadata["stats"] = stats;
448520

449-
self._put_parseable_config(stream_name, parseable_metadata.to_string().into_bytes())
521+
self._put_stream_meta(stream_name, parseable_metadata.to_string().into_bytes())
450522
.await?;
451523
Ok(())
452524
}
@@ -470,7 +542,7 @@ impl ObjectStorage for S3 {
470542
}
471543

472544
async fn get_stats(&self, stream_name: &str) -> Result<Stats, ObjectStorageError> {
473-
let parseable_metadata = self._get_parseable_config(stream_name).await?;
545+
let parseable_metadata = self._get_stream_meta(stream_name).await?;
474546
let parseable_metadata: Value =
475547
serde_json::from_slice(&parseable_metadata).expect("parseable config is valid json");
476548

0 commit comments

Comments
 (0)