Skip to content

Commit fdcf690

Browse files
fix: correlations CRUD operations (#1114)
save correlations to .users/<hash of userid>/correlations directory remove version from correlation request add user_id to config, store as hash of user_id server loads / saves the correlation against user_id from the incoming request --------- Co-authored-by: Devdutt Shenoi <[email protected]>
1 parent 640ee44 commit fdcf690

File tree

8 files changed

+227
-123
lines changed

8 files changed

+227
-123
lines changed

src/correlation/mod.rs renamed to src/correlation.rs

Lines changed: 34 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -45,36 +45,32 @@ pub static CORRELATIONS: Lazy<Correlation> = Lazy::new(Correlation::default);
4545
pub struct Correlation(RwLock<Vec<CorrelationConfig>>);
4646

4747
impl Correlation {
48-
pub async fn load(&self) -> Result<(), CorrelationError> {
49-
// lead correlations from storage
48+
//load correlations from storage
49+
pub async fn load(&self) -> anyhow::Result<()> {
5050
let store = CONFIG.storage().get_object_store();
51-
let all_correlations = store.get_correlations().await.unwrap_or_default();
52-
53-
let mut correlations = vec![];
54-
for corr in all_correlations {
55-
if corr.is_empty() {
56-
continue;
57-
}
58-
59-
let correlation: CorrelationConfig = match serde_json::from_slice(&corr) {
60-
Ok(c) => c,
61-
Err(e) => {
62-
error!("Unable to load correlation- {e}");
63-
continue;
64-
}
65-
};
66-
67-
correlations.push(correlation);
68-
}
51+
let all_correlations = store.get_all_correlations().await.unwrap_or_default();
52+
53+
let correlations: Vec<CorrelationConfig> = all_correlations
54+
.into_iter()
55+
.flat_map(|(_, correlations_bytes)| correlations_bytes)
56+
.filter_map(|correlation| {
57+
serde_json::from_slice(&correlation)
58+
.inspect_err(|e| {
59+
error!("Unable to load correlation: {e}");
60+
})
61+
.ok()
62+
})
63+
.collect();
6964

7065
let mut s = self.0.write().await;
71-
s.append(&mut correlations.clone());
66+
s.extend(correlations);
7267
Ok(())
7368
}
7469

7570
pub async fn list_correlations_for_user(
7671
&self,
7772
session_key: &SessionKey,
73+
user_id: &str,
7874
) -> Result<Vec<CorrelationConfig>, CorrelationError> {
7975
let correlations = self.0.read().await.iter().cloned().collect_vec();
8076

@@ -87,27 +83,29 @@ impl Correlation {
8783
.iter()
8884
.map(|t| t.table_name.clone())
8985
.collect_vec();
90-
if user_auth_for_query(&permissions, tables).is_ok() {
86+
if user_auth_for_query(&permissions, tables).is_ok() && c.user_id == user_id {
9187
user_correlations.push(c);
9288
}
9389
}
9490
Ok(user_correlations)
9591
}
9692

97-
pub async fn get_correlation_by_id(
93+
pub async fn get_correlation(
9894
&self,
9995
correlation_id: &str,
96+
user_id: &str,
10097
) -> Result<CorrelationConfig, CorrelationError> {
10198
let read = self.0.read().await;
102-
let correlation = read.iter().find(|c| c.id == correlation_id).cloned();
99+
let correlation = read
100+
.iter()
101+
.find(|c| c.id == correlation_id && c.user_id == user_id)
102+
.cloned();
103103

104-
if let Some(c) = correlation {
105-
Ok(c)
106-
} else {
107-
Err(CorrelationError::AnyhowError(anyhow::Error::msg(format!(
104+
correlation.ok_or_else(|| {
105+
CorrelationError::AnyhowError(anyhow::Error::msg(format!(
108106
"Unable to find correlation with ID- {correlation_id}"
109-
))))
110-
}
107+
)))
108+
})
111109
}
112110

113111
pub async fn update(&self, correlation: &CorrelationConfig) -> Result<(), CorrelationError> {
@@ -152,6 +150,7 @@ pub struct CorrelationConfig {
152150
pub version: CorrelationVersion,
153151
pub title: String,
154152
pub id: String,
153+
pub user_id: String,
155154
pub table_configs: Vec<TableConfig>,
156155
pub join_config: JoinConfig,
157156
pub filter: Option<FilterQuery>,
@@ -164,7 +163,6 @@ impl CorrelationConfig {}
164163
#[derive(Debug, Clone, Serialize, Deserialize)]
165164
#[serde(rename_all = "camelCase")]
166165
pub struct CorrelationRequest {
167-
pub version: CorrelationVersion,
168166
pub title: String,
169167
pub table_configs: Vec<TableConfig>,
170168
pub join_config: JoinConfig,
@@ -176,9 +174,10 @@ pub struct CorrelationRequest {
176174
impl From<CorrelationRequest> for CorrelationConfig {
177175
fn from(val: CorrelationRequest) -> Self {
178176
Self {
179-
version: val.version,
177+
version: CorrelationVersion::V1,
180178
title: val.title,
181179
id: get_hash(Utc::now().timestamp_micros().to_string().as_str()),
180+
user_id: String::default(),
182181
table_configs: val.table_configs,
183182
join_config: val.join_config,
184183
filter: val.filter,
@@ -189,11 +188,12 @@ impl From<CorrelationRequest> for CorrelationConfig {
189188
}
190189

191190
impl CorrelationRequest {
192-
pub fn generate_correlation_config(self, id: String) -> CorrelationConfig {
191+
pub fn generate_correlation_config(self, id: String, user_id: String) -> CorrelationConfig {
193192
CorrelationConfig {
194-
version: self.version,
193+
version: CorrelationVersion::V1,
195194
title: self.title,
196195
id,
196+
user_id,
197197
table_configs: self.table_configs,
198198
join_config: self.join_config,
199199
filter: self.filter,

src/handlers/http/correlation.rs

Lines changed: 53 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -17,40 +17,48 @@
1717
*/
1818

1919
use actix_web::{web, HttpRequest, HttpResponse, Responder};
20+
use anyhow::Error;
2021
use bytes::Bytes;
2122
use itertools::Itertools;
22-
use relative_path::RelativePathBuf;
2323

2424
use crate::rbac::Users;
25-
use crate::utils::user_auth_for_query;
26-
use crate::{
27-
option::CONFIG, storage::CORRELATIONS_ROOT_DIRECTORY,
28-
utils::actix::extract_session_key_from_req,
29-
};
25+
use crate::storage::object_storage::correlation_path;
26+
use crate::utils::{get_hash, get_user_from_request, user_auth_for_query};
27+
use crate::{option::CONFIG, utils::actix::extract_session_key_from_req};
3028

3129
use crate::correlation::{CorrelationConfig, CorrelationError, CorrelationRequest, CORRELATIONS};
3230

3331
pub async fn list(req: HttpRequest) -> Result<impl Responder, CorrelationError> {
3432
let session_key = extract_session_key_from_req(&req)
35-
.map_err(|err| CorrelationError::AnyhowError(anyhow::Error::msg(err.to_string())))?;
33+
.map_err(|err| CorrelationError::AnyhowError(Error::msg(err.to_string())))?;
34+
35+
let user_id = get_user_from_request(&req)
36+
.map(|s| get_hash(&s.to_string()))
37+
.map_err(|err| CorrelationError::AnyhowError(Error::msg(err.to_string())))?;
3638

3739
let correlations = CORRELATIONS
38-
.list_correlations_for_user(&session_key)
40+
.list_correlations_for_user(&session_key, &user_id)
3941
.await?;
4042

4143
Ok(web::Json(correlations))
4244
}
4345

4446
pub async fn get(req: HttpRequest) -> Result<impl Responder, CorrelationError> {
4547
let session_key = extract_session_key_from_req(&req)
46-
.map_err(|err| CorrelationError::AnyhowError(anyhow::Error::msg(err.to_string())))?;
48+
.map_err(|err| CorrelationError::AnyhowError(Error::msg(err.to_string())))?;
49+
50+
let user_id = get_user_from_request(&req)
51+
.map(|s| get_hash(&s.to_string()))
52+
.map_err(|err| CorrelationError::AnyhowError(Error::msg(err.to_string())))?;
4753

4854
let correlation_id = req
4955
.match_info()
5056
.get("correlation_id")
5157
.ok_or(CorrelationError::Metadata("No correlation ID Provided"))?;
5258

53-
let correlation = CORRELATIONS.get_correlation_by_id(correlation_id).await?;
59+
let correlation = CORRELATIONS
60+
.get_correlation(correlation_id, &user_id)
61+
.await?;
5462

5563
let permissions = Users.get_permissions(&session_key);
5664

@@ -68,16 +76,24 @@ pub async fn get(req: HttpRequest) -> Result<impl Responder, CorrelationError> {
6876
pub async fn post(req: HttpRequest, body: Bytes) -> Result<impl Responder, CorrelationError> {
6977
let session_key = extract_session_key_from_req(&req)
7078
.map_err(|err| CorrelationError::AnyhowError(anyhow::Error::msg(err.to_string())))?;
79+
let user_id = get_user_from_request(&req)
80+
.map(|s| get_hash(&s.to_string()))
81+
.map_err(|err| CorrelationError::AnyhowError(Error::msg(err.to_string())))?;
7182

7283
let correlation_request: CorrelationRequest = serde_json::from_slice(&body)?;
7384

7485
correlation_request.validate(&session_key).await?;
7586

76-
let correlation: CorrelationConfig = correlation_request.into();
87+
let mut correlation: CorrelationConfig = correlation_request.into();
88+
correlation.user_id.clone_from(&user_id);
89+
let correlation_id = &correlation.id;
90+
let path = correlation_path(&user_id, &format!("{}.json", correlation_id));
7791

78-
// Save to disk
7992
let store = CONFIG.storage().get_object_store();
80-
store.put_correlation(&correlation).await?;
93+
let correlation_bytes = serde_json::to_vec(&correlation)?;
94+
store
95+
.put_object(&path, Bytes::from(correlation_bytes))
96+
.await?;
8197

8298
// Save to memory
8399
CORRELATIONS.update(&correlation).await?;
@@ -88,14 +104,19 @@ pub async fn post(req: HttpRequest, body: Bytes) -> Result<impl Responder, Corre
88104
pub async fn modify(req: HttpRequest, body: Bytes) -> Result<impl Responder, CorrelationError> {
89105
let session_key = extract_session_key_from_req(&req)
90106
.map_err(|err| CorrelationError::AnyhowError(anyhow::Error::msg(err.to_string())))?;
107+
let user_id = get_user_from_request(&req)
108+
.map(|s| get_hash(&s.to_string()))
109+
.map_err(|err| CorrelationError::AnyhowError(Error::msg(err.to_string())))?;
91110

92111
let correlation_id = req
93112
.match_info()
94113
.get("correlation_id")
95114
.ok_or(CorrelationError::Metadata("No correlation ID Provided"))?;
96115

97116
// validate whether user has access to this correlation object or not
98-
let correlation = CORRELATIONS.get_correlation_by_id(correlation_id).await?;
117+
let correlation = CORRELATIONS
118+
.get_correlation(correlation_id, &user_id)
119+
.await?;
99120
let permissions = Users.get_permissions(&session_key);
100121
let tables = &correlation
101122
.table_configs
@@ -108,11 +129,17 @@ pub async fn modify(req: HttpRequest, body: Bytes) -> Result<impl Responder, Cor
108129
let correlation_request: CorrelationRequest = serde_json::from_slice(&body)?;
109130
correlation_request.validate(&session_key).await?;
110131

111-
let correlation = correlation_request.generate_correlation_config(correlation_id.to_owned());
132+
let correlation =
133+
correlation_request.generate_correlation_config(correlation_id.to_owned(), user_id.clone());
134+
135+
let correlation_id = &correlation.id;
136+
let path = correlation_path(&user_id, &format!("{}.json", correlation_id));
112137

113-
// Save to disk
114138
let store = CONFIG.storage().get_object_store();
115-
store.put_correlation(&correlation).await?;
139+
let correlation_bytes = serde_json::to_vec(&correlation)?;
140+
store
141+
.put_object(&path, Bytes::from(correlation_bytes))
142+
.await?;
116143

117144
// Save to memory
118145
CORRELATIONS.update(&correlation).await?;
@@ -123,13 +150,18 @@ pub async fn modify(req: HttpRequest, body: Bytes) -> Result<impl Responder, Cor
123150
pub async fn delete(req: HttpRequest) -> Result<impl Responder, CorrelationError> {
124151
let session_key = extract_session_key_from_req(&req)
125152
.map_err(|err| CorrelationError::AnyhowError(anyhow::Error::msg(err.to_string())))?;
153+
let user_id = get_user_from_request(&req)
154+
.map(|s| get_hash(&s.to_string()))
155+
.map_err(|err| CorrelationError::AnyhowError(Error::msg(err.to_string())))?;
126156

127157
let correlation_id = req
128158
.match_info()
129159
.get("correlation_id")
130160
.ok_or(CorrelationError::Metadata("No correlation ID Provided"))?;
131161

132-
let correlation = CORRELATIONS.get_correlation_by_id(correlation_id).await?;
162+
let correlation = CORRELATIONS
163+
.get_correlation(correlation_id, &user_id)
164+
.await?;
133165

134166
// validate user's query auth
135167
let permissions = Users.get_permissions(&session_key);
@@ -141,12 +173,10 @@ pub async fn delete(req: HttpRequest) -> Result<impl Responder, CorrelationError
141173

142174
user_auth_for_query(&permissions, tables)?;
143175

144-
// Delete from disk
176+
let correlation_id = &correlation.id;
177+
let path = correlation_path(&user_id, &format!("{}.json", correlation_id));
178+
145179
let store = CONFIG.storage().get_object_store();
146-
let path = RelativePathBuf::from_iter([
147-
CORRELATIONS_ROOT_DIRECTORY,
148-
&format!("{}.json", correlation_id),
149-
]);
150180
store.delete_object(&path).await?;
151181

152182
// Delete from memory

src/handlers/http/users/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,3 +22,4 @@ pub mod filters;
2222
pub const USERS_ROOT_DIR: &str = ".users";
2323
pub const DASHBOARDS_DIR: &str = "dashboards";
2424
pub const FILTER_DIR: &str = "filters";
25+
pub const CORRELATION_DIR: &str = "correlations";

0 commit comments

Comments
 (0)