Skip to content

Commit e60a884

Browse files
committed
Feature: Backend for Correlation
1 parent d332358 commit e60a884

File tree

11 files changed

+520
-4
lines changed

11 files changed

+520
-4
lines changed

src/cli.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -529,7 +529,9 @@ impl FromArgMatches for Cli {
529529
self.kafka_host = m.get_one::<String>(Self::KAFKA_HOST).cloned();
530530
self.kafka_group = m.get_one::<String>(Self::KAFKA_GROUP).cloned();
531531
self.kafka_client_id = m.get_one::<String>(Self::KAFKA_CLIENT_ID).cloned();
532-
self.kafka_security_protocol = m.get_one::<SslProtocol>(Self::KAFKA_SECURITY_PROTOCOL).cloned();
532+
self.kafka_security_protocol = m
533+
.get_one::<SslProtocol>(Self::KAFKA_SECURITY_PROTOCOL)
534+
.cloned();
533535
self.kafka_partitions = m.get_one::<String>(Self::KAFKA_PARTITIONS).cloned();
534536

535537
self.tls_cert_path = m.get_one::<PathBuf>(Self::TLS_CERT).cloned();

src/correlation/correlation_utils.rs

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
* Parseable Server (C) 2022 - 2024 Parseable, Inc.
3+
*
4+
* This program is free software: you can redistribute it and/or modify
5+
* it under the terms of the GNU Affero General Public License as
6+
* published by the Free Software Foundation, either version 3 of the
7+
* License, or (at your option) any later version.
8+
*
9+
* This program is distributed in the hope that it will be useful,
10+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
* GNU Affero General Public License for more details.
13+
*
14+
* You should have received a copy of the GNU Affero General Public License
15+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
16+
*
17+
*/
18+
19+
use datafusion::common::tree_node::TreeNode;
20+
21+
use crate::{
22+
query::{TableScanVisitor, QUERY_SESSION},
23+
rbac::{
24+
map::SessionKey,
25+
role::{Action, Permission},
26+
Users,
27+
},
28+
};
29+
30+
use super::CorrelationError;
31+
32+
async fn get_tables_from_query(query: &str) -> Result<TableScanVisitor, CorrelationError> {
33+
let session_state = QUERY_SESSION.state();
34+
let raw_logical_plan = session_state
35+
.create_logical_plan(query)
36+
.await
37+
.map_err(|err| CorrelationError::AnyhowError(err.into()))?;
38+
39+
let mut visitor = TableScanVisitor::default();
40+
let _ = raw_logical_plan.visit(&mut visitor);
41+
Ok(visitor)
42+
}
43+
44+
pub async fn user_auth_for_query(
45+
session_key: &SessionKey,
46+
query: &str,
47+
) -> Result<(), CorrelationError> {
48+
let tables = get_tables_from_query(query).await?;
49+
let permissions = Users.get_permissions(session_key);
50+
51+
for table_name in tables.into_inner().iter() {
52+
let mut authorized = false;
53+
54+
// in permission check if user can run query on the stream.
55+
// also while iterating add any filter tags for this stream
56+
for permission in permissions.iter() {
57+
match permission {
58+
Permission::Stream(Action::All, _) => {
59+
authorized = true;
60+
break;
61+
}
62+
Permission::StreamWithTag(Action::Query, ref stream, _)
63+
if stream == table_name || stream == "*" =>
64+
{
65+
authorized = true;
66+
}
67+
_ => (),
68+
}
69+
}
70+
71+
if !authorized {
72+
return Err(CorrelationError::Unauthorized);
73+
}
74+
}
75+
76+
Ok(())
77+
}

src/correlation/http_handlers.rs

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
/*
2+
* Parseable Server (C) 2022 - 2024 Parseable, Inc.
3+
*
4+
* This program is free software: you can redistribute it and/or modify
5+
* it under the terms of the GNU Affero General Public License as
6+
* published by the Free Software Foundation, either version 3 of the
7+
* License, or (at your option) any later version.
8+
*
9+
* This program is distributed in the hope that it will be useful,
10+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
* GNU Affero General Public License for more details.
13+
*
14+
* You should have received a copy of the GNU Affero General Public License
15+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
16+
*
17+
*/
18+
19+
use actix_web::{web, HttpRequest, Responder};
20+
use bytes::Bytes;
21+
use relative_path::RelativePathBuf;
22+
23+
use crate::{
24+
option::CONFIG,
25+
storage::{CORRELATION_DIRECTORY, PARSEABLE_ROOT_DIRECTORY},
26+
utils::{actix::extract_session_key_from_req, uid::Uid},
27+
};
28+
29+
use super::{
30+
correlation_utils::user_auth_for_query, CorrelationConfig, CorrelationError,
31+
CorrelationRequest, CORRELATIONS,
32+
};
33+
34+
pub async fn list(req: HttpRequest) -> Result<impl Responder, CorrelationError> {
35+
let session_key = extract_session_key_from_req(&req)
36+
.map_err(|err| CorrelationError::AnyhowError(anyhow::Error::msg(err.to_string())))?;
37+
38+
let correlations = CORRELATIONS
39+
.list_correlations_for_user(&session_key)
40+
.await?;
41+
42+
Ok(web::Json(correlations))
43+
}
44+
45+
pub async fn get(req: HttpRequest) -> Result<impl Responder, CorrelationError> {
46+
let session_key = extract_session_key_from_req(&req)
47+
.map_err(|err| CorrelationError::AnyhowError(anyhow::Error::msg(err.to_string())))?;
48+
49+
let correlation_id = req
50+
.match_info()
51+
.get("correlation_id")
52+
.ok_or(CorrelationError::Metadata("No correlation ID Provided"))?;
53+
54+
let correlation = CORRELATIONS.get_correlation_by_id(correlation_id).await?;
55+
56+
if user_auth_for_query(&session_key, &correlation.query)
57+
.await
58+
.is_ok()
59+
{
60+
Ok(web::Json(correlation))
61+
} else {
62+
Err(CorrelationError::Unauthorized)
63+
}
64+
}
65+
66+
pub async fn post(req: HttpRequest, body: Bytes) -> Result<impl Responder, CorrelationError> {
67+
let session_key = extract_session_key_from_req(&req)
68+
.map_err(|err| CorrelationError::AnyhowError(anyhow::Error::msg(err.to_string())))?;
69+
70+
let correlation_request: CorrelationRequest = serde_json::from_slice(&body)?;
71+
let correlation: CorrelationConfig = correlation_request.into();
72+
73+
// validate user's query auth
74+
user_auth_for_query(&session_key, &correlation.query).await?;
75+
76+
// Save to disk
77+
let store = CONFIG.storage().get_object_store();
78+
store.put_correlation(&correlation).await?;
79+
80+
// Save to memory
81+
CORRELATIONS.update(&correlation).await?;
82+
83+
Ok(format!(
84+
"Saved correlation with ID- {}",
85+
correlation.id.to_string()
86+
))
87+
}
88+
89+
pub async fn modify(req: HttpRequest, body: Bytes) -> Result<impl Responder, CorrelationError> {
90+
let session_key = extract_session_key_from_req(&req)
91+
.map_err(|err| CorrelationError::AnyhowError(anyhow::Error::msg(err.to_string())))?;
92+
93+
let correlation_id = req
94+
.match_info()
95+
.get("correlation_id")
96+
.ok_or(CorrelationError::Metadata("No correlation ID Provided"))?;
97+
98+
let correlation_request: CorrelationRequest = serde_json::from_slice(&body)?;
99+
100+
// validate user's query auth
101+
user_auth_for_query(&session_key, &correlation_request.query).await?;
102+
103+
let correlation: CorrelationConfig = CorrelationConfig {
104+
version: correlation_request.version,
105+
id: Uid::from_string(correlation_id)
106+
.map_err(|err| CorrelationError::AnyhowError(anyhow::Error::msg(err.to_string())))?,
107+
query: correlation_request.query,
108+
};
109+
110+
// Save to disk
111+
let store = CONFIG.storage().get_object_store();
112+
store.put_correlation(&correlation).await?;
113+
114+
// Save to memory
115+
CORRELATIONS.update(&correlation).await?;
116+
117+
Ok(format!(
118+
"Modified correlation with ID- {}",
119+
correlation.id.to_string()
120+
))
121+
}
122+
123+
pub async fn delete(req: HttpRequest) -> Result<impl Responder, CorrelationError> {
124+
let session_key = extract_session_key_from_req(&req)
125+
.map_err(|err| CorrelationError::AnyhowError(anyhow::Error::msg(err.to_string())))?;
126+
127+
let correlation_id = req
128+
.match_info()
129+
.get("correlation_id")
130+
.ok_or(CorrelationError::Metadata("No correlation ID Provided"))?;
131+
132+
let correlation = CORRELATIONS.get_correlation_by_id(correlation_id).await?;
133+
134+
// validate user's query auth
135+
user_auth_for_query(&session_key, &correlation.query).await?;
136+
137+
// Delete from disk
138+
let store = CONFIG.storage().get_object_store();
139+
let path = RelativePathBuf::from_iter([
140+
PARSEABLE_ROOT_DIRECTORY,
141+
CORRELATION_DIRECTORY,
142+
&format!("{}", correlation.id),
143+
]);
144+
store.delete_object(&path).await?;
145+
146+
// Delete from memory
147+
CORRELATIONS.delete(correlation_id).await?;
148+
Ok(format!("Deleted correlation with ID- {correlation_id}"))
149+
}

0 commit comments

Comments
 (0)