Skip to content

feat: DEPRECATE caching #1035

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 0 additions & 16 deletions src/banner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/

use crossterm::style::Stylize;
use human_size::SpecificSize;

use crate::about;
use crate::utils::uid::Uid;
Expand Down Expand Up @@ -93,7 +92,6 @@ fn status_info(config: &Config, scheme: &str, id: Uid) {
/// Prints information about the `ObjectStorage`.
/// - Mode (`Local drive`, `S3 bucket`)
/// - Staging (temporary landing point for incoming events)
/// - Cache (local cache of data)
/// - Store (path where the data is stored and its latency)
async fn storage_info(config: &Config) {
let storage = config.storage();
Expand All @@ -109,20 +107,6 @@ async fn storage_info(config: &Config) {
config.staging_dir().to_string_lossy(),
);

if let Some(path) = &config.parseable.local_cache_path {
let size: SpecificSize<human_size::Gigibyte> =
SpecificSize::new(config.parseable.local_cache_size as f64, human_size::Byte)
.unwrap()
.into();

eprintln!(
"\
{:8}Cache: \"{}\", (size: {})",
"",
path.display(),
size
);
}
if let Some(path) = &config.parseable.hot_tier_storage_path {
eprintln!(
"\
Expand Down
70 changes: 3 additions & 67 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,9 @@ pub struct Cli {
pub domain_address: Option<Url>,

/// The local staging path is used as a temporary landing point
/// for incoming events and local cache
/// for incoming events
pub local_staging_path: PathBuf,

/// The local cache path is used for speeding up query on latest data
pub local_cache_path: Option<PathBuf>,

/// Size for local cache
pub local_cache_size: u64,

/// Username for the basic authentication on the server
pub username: String,

Expand Down Expand Up @@ -96,12 +90,6 @@ pub struct Cli {
/// Port used by airplane (flight query service)
pub flight_port: u16,

/// to query cached data
pub query_cache_path: Option<PathBuf>,

/// Size for local cache
pub query_cache_size: u64,

/// CORS behaviour
pub cors: bool,

Expand Down Expand Up @@ -129,10 +117,6 @@ impl Cli {
pub const ADDRESS: &'static str = "address";
pub const DOMAIN_URI: &'static str = "origin";
pub const STAGING: &'static str = "local-staging-path";
pub const CACHE: &'static str = "cache-path";
pub const QUERY_CACHE: &'static str = "query-cache-path";
pub const QUERY_CACHE_SIZE: &'static str = "query-cache-size";
pub const CACHE_SIZE: &'static str = "cache-size";
pub const USERNAME: &'static str = "username";
pub const PASSWORD: &'static str = "password";
pub const CHECK_UPDATE: &'static str = "check-update";
Expand Down Expand Up @@ -255,45 +239,7 @@ impl Cli {
.help("Local path on this device to be used as landing point for incoming events")
.next_line_help(true),
)
.arg(
Arg::new(Self::CACHE)
.long(Self::CACHE)
.env("P_CACHE_DIR")
.value_name("DIR")
.value_parser(validation::canonicalize_path)
.help("Local path on this device to be used for caching data")
.next_line_help(true),
)
.arg(
Arg::new(Self::CACHE_SIZE)
.long(Self::CACHE_SIZE)
.env("P_CACHE_SIZE")
.value_name("size")
.default_value("1GiB")
.value_parser(validation::cache_size)
.help("Maximum allowed cache size for all streams combined (In human readable format, e.g 1GiB, 2GiB, 100MB)")
.next_line_help(true),
)
.arg(
Arg::new(Self::QUERY_CACHE)
.long(Self::QUERY_CACHE)
.env("P_QUERY_CACHE_DIR")
.value_name("DIR")
.value_parser(validation::canonicalize_path)
.help("Local path on this device to be used for caching data")
.next_line_help(true),
)
.arg(
Arg::new(Self::QUERY_CACHE_SIZE)
.long(Self::QUERY_CACHE_SIZE)
.env("P_QUERY_CACHE_SIZE")
.value_name("size")
.default_value("1GiB")
.value_parser(validation::cache_size)
.help("Maximum allowed cache size for all streams combined (In human readable format, e.g 1GiB, 2GiB, 100MB)")
.next_line_help(true),
)
.arg(
.arg(
Arg::new(Self::USERNAME)
.long(Self::USERNAME)
.env("P_USERNAME")
Expand Down Expand Up @@ -423,7 +369,7 @@ impl Cli {
.arg(
// RowGroupSize controls the number of rows present in one row group
// More rows = better compression but HIGHER Memory consumption during read/write
// 1048576 is the default value for DataFusion
// 1048576 is the default value for DataFusion
Arg::new(Self::ROW_GROUP_SIZE)
.long(Self::ROW_GROUP_SIZE)
.env("P_PARQUET_ROW_GROUP_SIZE")
Expand Down Expand Up @@ -520,8 +466,6 @@ impl FromArgMatches for Cli {
self.trino_schema = m.get_one::<String>(Self::TRINO_SCHEMA).cloned();
self.trino_username = m.get_one::<String>(Self::TRINO_USER_NAME).cloned();

self.local_cache_path = m.get_one::<PathBuf>(Self::CACHE).cloned();
self.query_cache_path = m.get_one::<PathBuf>(Self::QUERY_CACHE).cloned();
self.tls_cert_path = m.get_one::<PathBuf>(Self::TLS_CERT).cloned();
self.tls_key_path = m.get_one::<PathBuf>(Self::TLS_KEY).cloned();
self.trusted_ca_certs_path = m.get_one::<PathBuf>(Self::TRUSTED_CA_CERTS_PATH).cloned();
Expand All @@ -541,14 +485,6 @@ impl FromArgMatches for Cli {
.get_one::<PathBuf>(Self::STAGING)
.cloned()
.expect("default value for staging");
self.local_cache_size = m
.get_one::<u64>(Self::CACHE_SIZE)
.cloned()
.expect("default value for cache size");
self.query_cache_size = m
.get_one(Self::QUERY_CACHE_SIZE)
.cloned()
.expect("default value for query cache size");
self.username = m
.get_one::<String>(Self::USERNAME)
.cloned()
Expand Down
65 changes: 4 additions & 61 deletions src/handlers/airplane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,13 @@ use tonic::transport::{Identity, Server, ServerTlsConfig};
use tonic_web::GrpcWebLayer;

use crate::handlers::http::cluster::get_ingestor_info;

use crate::handlers::{CACHE_RESULTS_HEADER_KEY, CACHE_VIEW_HEADER_KEY, USER_ID_HEADER_KEY};
use crate::metrics::QUERY_EXECUTE_TIME;
use crate::option::CONFIG;

use crate::handlers::livetail::cross_origin_config;

use crate::handlers::http::query::{
authorize_and_set_filter_tags, into_query, put_results_in_cache, update_schema_when_distributed,
authorize_and_set_filter_tags, into_query, update_schema_when_distributed,
};
use crate::handlers::livetail::cross_origin_config;
use crate::metrics::QUERY_EXECUTE_TIME;
use crate::option::CONFIG;
use crate::query::{TableScanVisitor, QUERY_SESSION};
use crate::querycache::QueryCacheManager;
use crate::utils::arrow::flight::{
append_temporary_events, get_query_from_ticket, into_flight_data, run_do_get_rpc,
send_to_ingester,
Expand All @@ -64,8 +59,6 @@ use crate::metadata::STREAM_INFO;
use crate::rbac;
use crate::rbac::Users;

use super::http::query::get_results_from_cache;

#[derive(Clone, Debug)]
pub struct AirServiceImpl {}

Expand Down Expand Up @@ -156,46 +149,11 @@ impl FlightService for AirServiceImpl {

let streams = visitor.into_inner();

let query_cache_manager = QueryCacheManager::global(CONFIG.parseable.query_cache_size)
.await
.unwrap_or(None);

let cache_results = req
.metadata()
.get(CACHE_RESULTS_HEADER_KEY)
.and_then(|value| value.to_str().ok()); // I dont think we need to own this.

let show_cached = req
.metadata()
.get(CACHE_VIEW_HEADER_KEY)
.and_then(|value| value.to_str().ok());

let user_id = req
.metadata()
.get(USER_ID_HEADER_KEY)
.and_then(|value| value.to_str().ok());
let stream_name = streams
.first()
.ok_or_else(|| Status::aborted("Malformed SQL Provided, Table Name Not Found"))?
.to_owned();

// send the cached results
if let Ok(cache_results) = get_results_from_cache(
show_cached,
query_cache_manager,
&stream_name,
user_id,
&ticket.start_time,
&ticket.end_time,
&ticket.query,
ticket.send_null,
ticket.fields,
)
.await
{
return cache_results.into_flight();
}

update_schema_when_distributed(streams)
.await
.map_err(|err| Status::internal(err.to_string()))?;
Expand Down Expand Up @@ -258,21 +216,6 @@ impl FlightService for AirServiceImpl {
.await
.map_err(|err| Status::internal(err.to_string()))?;

if let Err(err) = put_results_in_cache(
cache_results,
user_id,
query_cache_manager,
&stream_name,
&records,
query.start.to_rfc3339(),
query.end.to_rfc3339(),
ticket.query,
)
.await
{
error!("{}", err);
};

/*
* INFO: Not returning the schema with the data.
* kept it in case it needs to be sent in the future.
Expand Down
95 changes: 0 additions & 95 deletions src/handlers/http/cache.rs

This file was deleted.

4 changes: 0 additions & 4 deletions src/handlers/http/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ use crate::event::{
};
use crate::handlers::http::modal::utils::logstream_utils::create_stream_and_schema_from_storage;
use crate::handlers::STREAM_NAME_HEADER_KEY;
use crate::localcache::CacheError;
use crate::metadata::error::stream_info::MetadataError;
use crate::metadata::STREAM_INFO;
use crate::option::{Mode, CONFIG};
Expand Down Expand Up @@ -236,8 +235,6 @@ pub enum PostError {
#[error("Error: {0}")]
DashboardError(#[from] DashboardError),
#[error("Error: {0}")]
CacheError(#[from] CacheError),
#[error("Error: {0}")]
StreamError(#[from] StreamError),
}

Expand All @@ -259,7 +256,6 @@ impl actix_web::ResponseError for PostError {
PostError::ObjectStorageError(_) => StatusCode::INTERNAL_SERVER_ERROR,
PostError::DashboardError(_) => StatusCode::INTERNAL_SERVER_ERROR,
PostError::FiltersError(_) => StatusCode::INTERNAL_SERVER_ERROR,
PostError::CacheError(_) => StatusCode::INTERNAL_SERVER_ERROR,
PostError::StreamError(_) => StatusCode::INTERNAL_SERVER_ERROR,
}
}
Expand Down
Loading
Loading