Skip to content

Commit 5ca9ecb

Browse files
authored
feat: separate out ingest and query functionality of the server (#634)
Add P_MODE with options `ingest`, `query` and `all`. Default mode is `all`. There are still more changes required for both modes to work well. Will be added in next subsequent PRs. Fixes #617
1 parent b026b40 commit 5ca9ecb

File tree

3 files changed

+142
-4
lines changed

3 files changed

+142
-4
lines changed

server/src/handlers/http.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ use rustls_pemfile::{certs, pkcs8_private_keys};
3535
use crate::option::CONFIG;
3636
use crate::rbac::role::Action;
3737

38-
use self::middleware::{DisAllowRootUser, RouteExt};
38+
use self::middleware::{DisAllowRootUser, ModeFilter, RouteExt};
3939

4040
mod about;
4141
mod health_check;
@@ -76,6 +76,7 @@ pub async fn run_http(
7676
.wrap(actix_web::middleware::Logger::default())
7777
.wrap(actix_web::middleware::Compress::default())
7878
.wrap(cross_origin_config())
79+
.wrap(ModeFilter)
7980
};
8081

8182
let ssl_acceptor = match (
@@ -272,13 +273,16 @@ pub fn configure_routes(
272273
);
273274

274275
let role_api = web::scope("/role")
276+
// GET Role List
275277
.service(resource("").route(web::get().to(role::list).authorize(Action::ListRole)))
276278
.service(
279+
// PUT and GET Default Role
277280
resource("/default")
278281
.route(web::put().to(role::put_default).authorize(Action::PutRole))
279282
.route(web::get().to(role::get_default).authorize(Action::GetRole)),
280283
)
281284
.service(
285+
// PUT, GET, DELETE Roles
282286
resource("/{name}")
283287
.route(web::put().to(role::put).authorize(Action::PutRole))
284288
.route(web::delete().to(role::delete).authorize(Action::DeleteRole))
@@ -299,6 +303,7 @@ pub fn configure_routes(
299303
cfg.service(
300304
// Base path "{url}/api/v1"
301305
web::scope(&base_path())
306+
// .wrap(PathFilter)
302307
// POST "/query" ==> Get results of the SQL query passed in request body
303308
.service(
304309
web::resource("/query")

server/src/handlers/http/middleware.rs

Lines changed: 101 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,12 @@ use actix_web::{
2727
};
2828
use futures_util::future::LocalBoxFuture;
2929

30-
use crate::handlers::{
31-
AUTHORIZATION_KEY, KINESIS_COMMON_ATTRIBUTES_KEY, LOG_SOURCE_KEY, LOG_SOURCE_KINESIS,
32-
STREAM_NAME_HEADER_KEY,
30+
use crate::{
31+
handlers::{
32+
AUTHORIZATION_KEY, KINESIS_COMMON_ATTRIBUTES_KEY, LOG_SOURCE_KEY, LOG_SOURCE_KINESIS,
33+
STREAM_NAME_HEADER_KEY,
34+
},
35+
option::Mode,
3336
};
3437
use crate::{
3538
option::CONFIG,
@@ -252,3 +255,98 @@ where
252255
})
253256
}
254257
}
258+
259+
/// ModeFilterMiddleware factory
260+
pub struct ModeFilter;
261+
262+
/// PathFilterMiddleware needs to implement Service trait
263+
impl<S, B> Transform<S, ServiceRequest> for ModeFilter
264+
where
265+
S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = Error>,
266+
S::Future: 'static,
267+
B: 'static,
268+
{
269+
type Response = ServiceResponse<B>;
270+
type Error = Error;
271+
type InitError = ();
272+
type Transform = ModeFilterMiddleware<S>;
273+
type Future = Ready<Result<Self::Transform, Self::InitError>>;
274+
275+
fn new_transform(&self, service: S) -> Self::Future {
276+
ready(Ok(ModeFilterMiddleware { service }))
277+
}
278+
}
279+
280+
/// Actual middleware service
281+
pub struct ModeFilterMiddleware<S> {
282+
service: S,
283+
}
284+
285+
/// Impl the service trait for the middleware service
286+
impl<S, B> Service<ServiceRequest> for ModeFilterMiddleware<S>
287+
where
288+
S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = Error>,
289+
S::Future: 'static,
290+
B: 'static,
291+
{
292+
type Response = ServiceResponse<B>;
293+
type Error = Error;
294+
type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;
295+
296+
// impl poll_ready
297+
actix_web::dev::forward_ready!(service);
298+
299+
fn call(&self, req: ServiceRequest) -> Self::Future {
300+
let path = req.path();
301+
let mode = &CONFIG.parseable.mode;
302+
303+
// change error messages based on mode
304+
match mode {
305+
Mode::Query => {
306+
let cond = path.split('/').any(|x| x == "ingest");
307+
if cond {
308+
Box::pin(async {
309+
Err(actix_web::error::ErrorUnauthorized(
310+
"Ingest API cannot be accessed in Query Mode",
311+
))
312+
})
313+
} else {
314+
let fut = self.service.call(req);
315+
316+
Box::pin(async move {
317+
let res = fut.await?;
318+
Ok(res)
319+
})
320+
}
321+
}
322+
323+
Mode::Ingest => {
324+
let accessable_endpoints = ["ingest", "logstream", "liveness", "readiness"];
325+
let cond = path.split('/').any(|x| accessable_endpoints.contains(&x));
326+
if !cond {
327+
Box::pin(async {
328+
Err(actix_web::error::ErrorUnauthorized(
329+
"Only Ingestion API can be accessed in Ingest Mode",
330+
))
331+
})
332+
} else {
333+
let fut = self.service.call(req);
334+
335+
Box::pin(async move {
336+
let res = fut.await?;
337+
Ok(res)
338+
})
339+
}
340+
}
341+
342+
Mode::All => {
343+
let fut = self.service.call(req);
344+
345+
Box::pin(async move {
346+
let res = fut.await?;
347+
Ok(res)
348+
})
349+
}
350+
}
351+
}
352+
}

server/src/option.rs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,9 @@ pub struct Server {
251251

252252
/// Parquet compression algorithm
253253
pub parquet_compression: Compression,
254+
255+
/// Mode of operation
256+
pub mode: Mode,
254257
}
255258

256259
impl FromArgMatches for Server {
@@ -354,6 +357,17 @@ impl FromArgMatches for Server {
354357
_ => None,
355358
};
356359

360+
self.mode = match m
361+
.get_one::<String>(Self::MODE)
362+
.expect("Mode not set")
363+
.as_str()
364+
{
365+
"query" => Mode::Query,
366+
"ingest" => Mode::Ingest,
367+
"all" => Mode::All,
368+
_ => unreachable!(),
369+
};
370+
357371
Ok(())
358372
}
359373
}
@@ -382,6 +396,7 @@ impl Server {
382396
pub const QUERY_MEM_POOL_SIZE: &'static str = "query-mempool-size";
383397
pub const ROW_GROUP_SIZE: &'static str = "row-group-size";
384398
pub const PARQUET_COMPRESSION_ALGO: &'static str = "compression-algo";
399+
pub const MODE: &'static str = "mode";
385400
pub const DEFAULT_USERNAME: &'static str = "admin";
386401
pub const DEFAULT_PASSWORD: &'static str = "admin";
387402

@@ -578,6 +593,18 @@ impl Server {
578593
.default_value("16384")
579594
.value_parser(value_parser!(usize))
580595
.help("Number of rows in a row group"),
596+
).arg(
597+
Arg::new(Self::MODE)
598+
.long(Self::MODE)
599+
.env("P_MODE")
600+
.value_name("STRING")
601+
.required(false)
602+
.default_value("all")
603+
.value_parser([
604+
"query",
605+
"ingest",
606+
"all"])
607+
.help("Mode of operation"),
581608
)
582609
.arg(
583610
Arg::new(Self::PARQUET_COMPRESSION_ALGO)
@@ -604,6 +631,14 @@ impl Server {
604631
}
605632
}
606633

634+
#[derive(Debug, Default, Eq, PartialEq)]
635+
pub enum Mode {
636+
Query,
637+
Ingest,
638+
#[default]
639+
All,
640+
}
641+
607642
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
608643
#[allow(non_camel_case_types, clippy::upper_case_acronyms)]
609644
pub enum Compression {

0 commit comments

Comments
 (0)