diff --git a/src/banner.rs b/src/banner.rs index 30199e34d..a115cc5f9 100644 --- a/src/banner.rs +++ b/src/banner.rs @@ -18,7 +18,6 @@ */ use crossterm::style::Stylize; -use human_size::SpecificSize; use crate::about; use crate::utils::uid::Uid; @@ -93,7 +92,6 @@ fn status_info(config: &Config, scheme: &str, id: Uid) { /// Prints information about the `ObjectStorage`. /// - Mode (`Local drive`, `S3 bucket`) /// - Staging (temporary landing point for incoming events) -/// - Cache (local cache of data) /// - Store (path where the data is stored and its latency) async fn storage_info(config: &Config) { let storage = config.storage(); @@ -109,20 +107,6 @@ async fn storage_info(config: &Config) { config.staging_dir().to_string_lossy(), ); - if let Some(path) = &config.parseable.local_cache_path { - let size: SpecificSize = - SpecificSize::new(config.parseable.local_cache_size as f64, human_size::Byte) - .unwrap() - .into(); - - eprintln!( - "\ - {:8}Cache: \"{}\", (size: {})", - "", - path.display(), - size - ); - } if let Some(path) = &config.parseable.hot_tier_storage_path { eprintln!( "\ diff --git a/src/cli.rs b/src/cli.rs index 982a2a765..193da9632 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -45,15 +45,9 @@ pub struct Cli { pub domain_address: Option, /// The local staging path is used as a temporary landing point - /// for incoming events and local cache + /// for incoming events pub local_staging_path: PathBuf, - /// The local cache path is used for speeding up query on latest data - pub local_cache_path: Option, - - /// Size for local cache - pub local_cache_size: u64, - /// Username for the basic authentication on the server pub username: String, @@ -96,12 +90,6 @@ pub struct Cli { /// port use by airplane(flight query service) pub flight_port: u16, - /// to query cached data - pub query_cache_path: Option, - - /// Size for local cache - pub query_cache_size: u64, - /// CORS behaviour pub cors: bool, @@ -129,10 +117,6 @@ impl Cli { pub const ADDRESS: &'static str = "address"; pub const DOMAIN_URI: &'static str = "origin"; pub const STAGING: &'static str = "local-staging-path"; - pub const CACHE: &'static str = "cache-path"; - pub const QUERY_CACHE: &'static str = "query-cache-path"; - pub const QUERY_CACHE_SIZE: &'static str = "query-cache-size"; - pub const CACHE_SIZE: &'static str = "cache-size"; pub const USERNAME: &'static str = "username"; pub const PASSWORD: &'static str = "password"; pub const CHECK_UPDATE: &'static str = "check-update"; @@ -255,45 +239,7 @@ impl Cli { .help("Local path on this device to be used as landing point for incoming events") .next_line_help(true), ) - .arg( - Arg::new(Self::CACHE) - .long(Self::CACHE) - .env("P_CACHE_DIR") - .value_name("DIR") - .value_parser(validation::canonicalize_path) - .help("Local path on this device to be used for caching data") - .next_line_help(true), - ) - .arg( - Arg::new(Self::CACHE_SIZE) - .long(Self::CACHE_SIZE) - .env("P_CACHE_SIZE") - .value_name("size") - .default_value("1GiB") - .value_parser(validation::cache_size) - .help("Maximum allowed cache size for all streams combined (In human readable format, e.g 1GiB, 2GiB, 100MB)") - .next_line_help(true), - ) - .arg( - Arg::new(Self::QUERY_CACHE) - .long(Self::QUERY_CACHE) - .env("P_QUERY_CACHE_DIR") - .value_name("DIR") - .value_parser(validation::canonicalize_path) - .help("Local path on this device to be used for caching data") - .next_line_help(true), - ) - .arg( - Arg::new(Self::QUERY_CACHE_SIZE) - .long(Self::QUERY_CACHE_SIZE) - .env("P_QUERY_CACHE_SIZE") - .value_name("size") - .default_value("1GiB") - .value_parser(validation::cache_size) - .help("Maximum allowed cache size for all streams combined (In human readable format, e.g 1GiB, 2GiB, 100MB)") - .next_line_help(true), - ) - .arg( + .arg( Arg::new(Self::USERNAME) .long(Self::USERNAME) .env("P_USERNAME") @@ -423,7 +369,7 @@ impl Cli { .arg( // RowGroupSize controls the number of rows present in one row group // More rows = better compression but HIGHER Memory consumption during read/write - // 1048576 is the default value for DataFusion + // 1048576 is the default value for DataFusion Arg::new(Self::ROW_GROUP_SIZE) .long(Self::ROW_GROUP_SIZE) .env("P_PARQUET_ROW_GROUP_SIZE") @@ -520,8 +466,6 @@ impl FromArgMatches for Cli { self.trino_schema = m.get_one::(Self::TRINO_SCHEMA).cloned(); self.trino_username = m.get_one::(Self::TRINO_USER_NAME).cloned(); - self.local_cache_path = m.get_one::(Self::CACHE).cloned(); - self.query_cache_path = m.get_one::(Self::QUERY_CACHE).cloned(); self.tls_cert_path = m.get_one::(Self::TLS_CERT).cloned(); self.tls_key_path = m.get_one::(Self::TLS_KEY).cloned(); self.trusted_ca_certs_path = m.get_one::(Self::TRUSTED_CA_CERTS_PATH).cloned(); @@ -541,14 +485,6 @@ impl FromArgMatches for Cli { .get_one::(Self::STAGING) .cloned() .expect("default value for staging"); - self.local_cache_size = m - .get_one::(Self::CACHE_SIZE) - .cloned() - .expect("default value for cache size"); - self.query_cache_size = m - .get_one(Self::QUERY_CACHE_SIZE) - .cloned() - .expect("default value for query cache size"); self.username = m .get_one::(Self::USERNAME) .cloned() diff --git a/src/handlers/airplane.rs b/src/handlers/airplane.rs index 339dd8f6f..afb1fc3bb 100644 --- a/src/handlers/airplane.rs +++ b/src/handlers/airplane.rs @@ -34,18 +34,13 @@ use tonic::transport::{Identity, Server, ServerTlsConfig}; use tonic_web::GrpcWebLayer; use crate::handlers::http::cluster::get_ingestor_info; - -use crate::handlers::{CACHE_RESULTS_HEADER_KEY, CACHE_VIEW_HEADER_KEY, USER_ID_HEADER_KEY}; -use crate::metrics::QUERY_EXECUTE_TIME; -use crate::option::CONFIG; - -use crate::handlers::livetail::cross_origin_config; - use crate::handlers::http::query::{ - authorize_and_set_filter_tags, into_query, put_results_in_cache, update_schema_when_distributed, + authorize_and_set_filter_tags, into_query, update_schema_when_distributed, }; +use crate::handlers::livetail::cross_origin_config; +use crate::metrics::QUERY_EXECUTE_TIME; +use crate::option::CONFIG; use crate::query::{TableScanVisitor, QUERY_SESSION}; -use crate::querycache::QueryCacheManager; use crate::utils::arrow::flight::{ append_temporary_events, get_query_from_ticket, into_flight_data, run_do_get_rpc, send_to_ingester, @@ -64,8 +59,6 @@ use crate::metadata::STREAM_INFO; use crate::rbac; use crate::rbac::Users; -use super::http::query::get_results_from_cache; - #[derive(Clone, Debug)] pub struct AirServiceImpl {} @@ -156,46 +149,11 @@ impl FlightService for AirServiceImpl { let streams = visitor.into_inner(); - let query_cache_manager = QueryCacheManager::global(CONFIG.parseable.query_cache_size) - .await - .unwrap_or(None); - - let cache_results = req - .metadata() - .get(CACHE_RESULTS_HEADER_KEY) - .and_then(|value| value.to_str().ok()); // I dont think we need to own this. - - let show_cached = req - .metadata() - .get(CACHE_VIEW_HEADER_KEY) - .and_then(|value| value.to_str().ok()); - - let user_id = req - .metadata() - .get(USER_ID_HEADER_KEY) - .and_then(|value| value.to_str().ok()); let stream_name = streams .first() .ok_or_else(|| Status::aborted("Malformed SQL Provided, Table Name Not Found"))? .to_owned(); - // send the cached results - if let Ok(cache_results) = get_results_from_cache( - show_cached, - query_cache_manager, - &stream_name, - user_id, - &ticket.start_time, - &ticket.end_time, - &ticket.query, - ticket.send_null, - ticket.fields, - ) - .await - { - return cache_results.into_flight(); - } - update_schema_when_distributed(streams) .await .map_err(|err| Status::internal(err.to_string()))?; @@ -258,21 +216,6 @@ impl FlightService for AirServiceImpl { .await .map_err(|err| Status::internal(err.to_string()))?; - if let Err(err) = put_results_in_cache( - cache_results, - user_id, - query_cache_manager, - &stream_name, - &records, - query.start.to_rfc3339(), - query.end.to_rfc3339(), - ticket.query, - ) - .await - { - error!("{}", err); - }; - /* * INFO: No returning the schema with the data. * kept it in case it needs to be sent in the future. diff --git a/src/handlers/http/cache.rs b/src/handlers/http/cache.rs deleted file mode 100644 index 29efd09e4..000000000 --- a/src/handlers/http/cache.rs +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Parseable Server (C) 2022 - 2024 Parseable, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - * - */ - -use actix_web::{web, HttpRequest, HttpResponse, Responder}; -use anyhow::anyhow; -use bytes::Bytes; -use http::StatusCode; -use serde_json::json; - -use crate::{ - option::CONFIG, - querycache::{CacheMetadata, QueryCacheManager}, -}; - -use super::ingest::PostError; - -pub async fn list(req: HttpRequest) -> Result { - let stream = req - .match_info() - .get("stream") - .ok_or_else(|| PostError::Invalid(anyhow!("Invalid Stream Name in resource path")))?; - - let user_id = req - .match_info() - .get("user_id") - .ok_or_else(|| PostError::Invalid(anyhow!("Invalid User ID not in Resource path")))?; - - let query_cache_manager = QueryCacheManager::global(CONFIG.parseable.query_cache_size) - .await - .unwrap_or(None); - - if let Some(query_cache_manager) = query_cache_manager { - let cache = query_cache_manager - .get_cache(stream, user_id) - .await - .map_err(PostError::CacheError)?; - - let size = cache.used_cache_size(); - let queries = cache.queries(); - - let out = json!({ - "used_capacity": size, - "cache": queries - }); - - Ok((web::Json(out), StatusCode::OK)) - } else { - Err(PostError::Invalid(anyhow!( - "Query Caching is not active on server " - ))) - } -} - -pub async fn remove(req: HttpRequest, body: Bytes) -> Result { - let stream = req - .match_info() - .get("stream") - .ok_or_else(|| PostError::Invalid(anyhow!("Invalid Stream Name in resource path")))?; - - let user_id = req - .match_info() - .get("user_id") - .ok_or_else(|| PostError::Invalid(anyhow!("Invalid User ID not in Resource path")))?; - - let query = serde_json::from_slice::(&body)?; - - let query_cache_manager = QueryCacheManager::global(CONFIG.parseable.query_cache_size) - .await - .unwrap_or(None); - - if let Some(query_cache_manager) = query_cache_manager { - query_cache_manager - .remove_from_cache(query, stream, user_id) - .await?; - - Ok(HttpResponse::Ok().finish()) - } else { - Err(PostError::Invalid(anyhow!("Query Caching is not enabled"))) - } -} diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs index 303f591c3..790d37ba9 100644 --- a/src/handlers/http/ingest.rs +++ b/src/handlers/http/ingest.rs @@ -27,7 +27,6 @@ use crate::event::{ }; use crate::handlers::http::modal::utils::logstream_utils::create_stream_and_schema_from_storage; use crate::handlers::STREAM_NAME_HEADER_KEY; -use crate::localcache::CacheError; use crate::metadata::error::stream_info::MetadataError; use crate::metadata::STREAM_INFO; use crate::option::{Mode, CONFIG}; @@ -236,8 +235,6 @@ pub enum PostError { #[error("Error: {0}")] DashboardError(#[from] DashboardError), #[error("Error: {0}")] - CacheError(#[from] CacheError), - #[error("Error: {0}")] StreamError(#[from] StreamError), } @@ -259,7 +256,6 @@ impl actix_web::ResponseError for PostError { PostError::ObjectStorageError(_) => StatusCode::INTERNAL_SERVER_ERROR, PostError::DashboardError(_) => StatusCode::INTERNAL_SERVER_ERROR, PostError::FiltersError(_) => StatusCode::INTERNAL_SERVER_ERROR, - PostError::CacheError(_) => StatusCode::INTERNAL_SERVER_ERROR, PostError::StreamError(_) => StatusCode::INTERNAL_SERVER_ERROR, } } diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs index 1db58dcd4..4321e42e9 100644 --- a/src/handlers/http/logstream.rs +++ b/src/handlers/http/logstream.rs @@ -321,44 +321,6 @@ pub async fn put_retention( )) } -pub async fn get_cache_enabled(req: HttpRequest) -> Result { - let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); - - if CONFIG.parseable.local_cache_path.is_none() { - return Err(StreamError::CacheNotEnabled(stream_name)); - } - - let cache_enabled = STREAM_INFO.get_cache_enabled(&stream_name)?; - Ok((web::Json(cache_enabled), StatusCode::OK)) -} - -pub async fn put_enable_cache( - req: HttpRequest, - body: web::Json, -) -> Result { - let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); - let storage = CONFIG.storage().get_object_store(); - - if !metadata::STREAM_INFO.stream_exists(&stream_name) { - return Err(StreamError::StreamNotFound(stream_name)); - } - if CONFIG.parseable.local_cache_path.is_none() { - return Err(StreamError::CacheNotEnabled(stream_name)); - } - let enable_cache = body.into_inner(); - let mut stream_metadata = storage.get_object_store_format(&stream_name).await?; - stream_metadata.cache_enabled = enable_cache; - storage - .put_stream_manifest(&stream_name, &stream_metadata) - .await?; - - STREAM_INFO.set_cache_enabled(&stream_name, enable_cache)?; - Ok(( - format!("Cache set to {enable_cache} for log stream {stream_name}"), - StatusCode::OK, - )) -} - pub async fn get_stats_date(stream_name: &str, date: &str) -> Result { let event_labels = event_labels_date(stream_name, "json", date); let storage_size_labels = storage_size_labels_date(stream_name, date); @@ -812,10 +774,6 @@ pub mod error { CreateStream(#[from] CreateStreamError), #[error("Log stream {0} does not exist")] StreamNotFound(String), - #[error( - "Caching not enabled at Parseable server config. Can't enable cache for stream {0}" - )] - CacheNotEnabled(String), #[error("Log stream is not initialized, send an event to this logstream and try again")] UninitializedLogstream, #[error("Storage Error {0}")] @@ -872,7 +830,6 @@ pub mod error { StreamError::CreateStream(CreateStreamError::SerdeError(_)) => { StatusCode::BAD_REQUEST } - StreamError::CacheNotEnabled(_) => StatusCode::BAD_REQUEST, StreamError::StreamNotFound(_) => StatusCode::NOT_FOUND, StreamError::Custom { status, .. } => *status, StreamError::UninitializedLogstream => StatusCode::METHOD_NOT_ALLOWED, diff --git a/src/handlers/http/mod.rs b/src/handlers/http/mod.rs index 94b681adb..f627b613a 100644 --- a/src/handlers/http/mod.rs +++ b/src/handlers/http/mod.rs @@ -17,7 +17,9 @@ */ use actix_cors::Cors; +use actix_web::Responder; use arrow_schema::Schema; +use http::StatusCode; use itertools::Itertools; use serde_json::Value; @@ -26,7 +28,6 @@ use crate::{option::CONFIG, storage::STREAM_ROOT_DIRECTORY}; use self::{cluster::get_ingestor_info, query::Query}; pub mod about; -pub mod cache; pub mod cluster; pub mod health_check; pub mod ingest; @@ -130,3 +131,9 @@ pub async fn send_query_request_to_ingestor(query: &Query) -> anyhow::Result impl Responder { + (CACHING_NOTICE, StatusCode::GONE) +} diff --git a/src/handlers/http/modal/ingest/ingestor_logstream.rs b/src/handlers/http/modal/ingest/ingestor_logstream.rs index 151e5d745..3f0e5292d 100644 --- a/src/handlers/http/modal/ingest/ingestor_logstream.rs +++ b/src/handlers/http/modal/ingest/ingestor_logstream.rs @@ -16,11 +16,10 @@ * */ -use actix_web::{web, HttpRequest, Responder}; +use actix_web::{HttpRequest, Responder}; use bytes::Bytes; use http::StatusCode; -use itertools::Itertools; -use tracing::{error, warn}; +use tracing::warn; use crate::{ catalog::remove_manifest_from_snapshot, @@ -31,10 +30,9 @@ use crate::{ create_stream_and_schema_from_storage, create_update_stream, }, }, - metadata::{self, STREAM_INFO}, + metadata, option::CONFIG, stats, - storage::LogStream, }; pub async fn retention_cleanup( @@ -89,61 +87,3 @@ pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result, -) -> Result { - let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); - let storage = CONFIG.storage().get_object_store(); - - if CONFIG.parseable.local_cache_path.is_none() { - return Err(StreamError::CacheNotEnabled(stream_name)); - } - // here the ingest server has not found the stream - // so it should check if the stream exists in storage - let check = storage - .list_streams() - .await? - .iter() - .map(|stream| stream.name.clone()) - .contains(&stream_name); - - if !check { - error!("Stream {} not found", stream_name.clone()); - return Err(StreamError::StreamNotFound(stream_name.clone())); - } - metadata::STREAM_INFO - .upsert_stream_info( - storage.as_ref(), - LogStream { - name: stream_name.clone().to_owned(), - }, - ) - .await - .map_err(|_| StreamError::StreamNotFound(stream_name.clone()))?; - - let enable_cache = body.into_inner(); - let mut stream_metadata = storage.get_object_store_format(&stream_name).await?; - stream_metadata.cache_enabled = enable_cache; - storage - .put_stream_manifest(&stream_name, &stream_metadata) - .await?; - - STREAM_INFO.set_cache_enabled(&stream_name, enable_cache)?; - Ok(( - format!("Cache set to {enable_cache} for log stream {stream_name}"), - StatusCode::OK, - )) -} - -pub async fn get_cache_enabled(req: HttpRequest) -> Result { - let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); - - if CONFIG.parseable.local_cache_path.is_none() { - return Err(StreamError::CacheNotEnabled(stream_name)); - } - - let cache_enabled = STREAM_INFO.get_cache_enabled(&stream_name)?; - Ok((web::Json(cache_enabled), StatusCode::OK)) -} diff --git a/src/handlers/http/modal/ingest_server.rs b/src/handlers/http/modal/ingest_server.rs index 829b529de..7a47fad36 100644 --- a/src/handlers/http/modal/ingest_server.rs +++ b/src/handlers/http/modal/ingest_server.rs @@ -25,12 +25,12 @@ use super::OpenIdClient; use super::ParseableServer; use crate::analytics; use crate::handlers::airplane; +use crate::handlers::http::caching_removed; use crate::handlers::http::ingest; use crate::handlers::http::logstream; use crate::handlers::http::middleware::DisAllowRootUser; use crate::handlers::http::middleware::RouteExt; use crate::handlers::http::role; -use crate::localcache::LocalCacheManager; use crate::metrics; use crate::migration; use crate::migration::metadata_migration::migrate_ingester_metadata; @@ -101,13 +101,6 @@ impl ParseableServer for IngestServer { /// configure the server and start an instance to ingest data async fn init(&self) -> anyhow::Result<()> { - // ! Undefined and Untested behaviour - if let Some(cache_manager) = LocalCacheManager::global() { - cache_manager - .validate(CONFIG.parseable.local_cache_size) - .await?; - }; - let prometheus = metrics::build_metrics_handler(); CONFIG.storage().register_store_metrics(&prometheus); @@ -278,18 +271,10 @@ impl IngestServer { ) .service( web::resource("/cache") - // PUT "/logstream/{logstream}/cache" ==> Set retention for given logstream - .route( - web::put() - .to(ingestor_logstream::put_enable_cache) - .authorize_for_stream(Action::PutCacheEnabled), - ) - // GET "/logstream/{logstream}/cache" ==> Get retention for given logstream - .route( - web::get() - .to(ingestor_logstream::get_cache_enabled) - .authorize_for_stream(Action::GetCacheEnabled), - ), + // PUT "/logstream/{logstream}/cache" ==> caching has been deprecated + .route(web::put().to(caching_removed)) + // GET "/logstream/{logstream}/cache" ==> caching has been deprecated + .route(web::get().to(caching_removed)), ) .service( web::scope("/retention").service( diff --git a/src/handlers/http/modal/query/querier_logstream.rs b/src/handlers/http/modal/query/querier_logstream.rs index 81d7ea2fb..58277f7b8 100644 --- a/src/handlers/http/modal/query/querier_logstream.rs +++ b/src/handlers/http/modal/query/querier_logstream.rs @@ -226,35 +226,3 @@ pub async fn get_stats(req: HttpRequest) -> Result Ok((web::Json(stats), StatusCode::OK)) } - -pub async fn put_enable_cache( - req: HttpRequest, - body: web::Json, -) -> Result { - let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); - let storage = CONFIG.storage().get_object_store(); - - if !metadata::STREAM_INFO.stream_exists(&stream_name) { - return Err(StreamError::StreamNotFound(stream_name)); - } - - let enable_cache = body.into_inner(); - let mut stream_metadata = storage.get_object_store_format(&stream_name).await?; - stream_metadata.cache_enabled = enable_cache; - storage - .put_stream_manifest(&stream_name, &stream_metadata) - .await?; - - STREAM_INFO.set_cache_enabled(&stream_name, enable_cache)?; - Ok(( - format!("Cache set to {enable_cache} for log stream {stream_name}"), - StatusCode::OK, - )) -} - -pub async fn get_cache_enabled(req: HttpRequest) -> Result { - let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); - - let cache_enabled = STREAM_INFO.get_cache_enabled(&stream_name)?; - Ok((web::Json(cache_enabled), StatusCode::OK)) -} diff --git a/src/handlers/http/modal/query_server.rs b/src/handlers/http/modal/query_server.rs index 0c7261be0..792bb6571 100644 --- a/src/handlers/http/modal/query_server.rs +++ b/src/handlers/http/modal/query_server.rs @@ -17,11 +17,11 @@ */ use crate::handlers::airplane; -use crate::handlers::http::base_path; use crate::handlers::http::cluster::{self, init_cluster_metrics_schedular}; use crate::handlers::http::logstream::create_internal_stream_if_not_exists; use crate::handlers::http::middleware::{DisAllowRootUser, RouteExt}; use crate::handlers::http::{self, role}; +use crate::handlers::http::{base_path, caching_removed}; use crate::handlers::http::{logstream, MAX_EVENT_PAYLOAD_SIZE}; use crate::hottier::HotTierManager; use crate::rbac::role::Action; @@ -328,18 +328,10 @@ impl QueryServer { ) .service( web::resource("/cache") - // PUT "/logstream/{logstream}/cache" ==> Set retention for given logstream - .route( - web::put() - .to(querier_logstream::put_enable_cache) - .authorize_for_stream(Action::PutCacheEnabled), - ) - // GET "/logstream/{logstream}/cache" ==> Get retention for given logstream - .route( - web::get() - .to(querier_logstream::get_cache_enabled) - .authorize_for_stream(Action::GetCacheEnabled), - ), + // PUT "/logstream/{logstream}/cache" ==> caching has been deprecated + .route(web::put().to(caching_removed)) + // GET "/logstream/{logstream}/cache" ==> caching has been deprecated + .route(web::get().to(caching_removed)), ) .service( web::resource("/hottier") diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs index f8f7dc2f2..6c0ec9fd8 100644 --- a/src/handlers/http/modal/server.rs +++ b/src/handlers/http/modal/server.rs @@ -20,14 +20,13 @@ use crate::analytics; use crate::handlers; use crate::handlers::http::about; use crate::handlers::http::base_path; -use crate::handlers::http::cache; +use crate::handlers::http::caching_removed; use crate::handlers::http::health_check; use crate::handlers::http::query; use crate::handlers::http::trino; use crate::handlers::http::users::dashboards; use crate::handlers::http::users::filters; use crate::hottier::HotTierManager; -use crate::localcache::LocalCacheManager; use crate::metrics; use crate::migration; use crate::storage; @@ -98,12 +97,6 @@ impl ParseableServer for Server { // configure the server and start an instance of the single server setup async fn init(&self) -> anyhow::Result<()> { - if let Some(cache_manager) = LocalCacheManager::global() { - cache_manager - .validate(CONFIG.parseable.local_cache_size) - .await?; - }; - let prometheus = metrics::build_metrics_handler(); CONFIG.storage().register_store_metrics(&prometheus); @@ -254,8 +247,8 @@ impl Server { web::scope("/{user_id}").service( web::scope("/{stream}").service( web::resource("") - .route(web::get().to(cache::list).authorize(Action::ListCache)) - .route(web::post().to(cache::remove).authorize(Action::RemoveCache)), + .route(web::get().to(caching_removed)) + .route(web::post().to(caching_removed)), ), ), ) @@ -361,17 +354,9 @@ impl Server { .service( web::resource("/cache") // PUT "/logstream/{logstream}/cache" ==> Set retention for given logstream - .route( - web::put() - .to(logstream::put_enable_cache) - .authorize_for_stream(Action::PutCacheEnabled), - ) + .route(web::put().to(caching_removed)) // GET "/logstream/{logstream}/cache" ==> Get retention for given logstream - .route( - web::get() - .to(logstream::get_cache_enabled) - .authorize_for_stream(Action::GetCacheEnabled), - ), + .route(web::get().to(caching_removed)), ) .service( web::resource("/hottier") diff --git a/src/handlers/http/query.rs b/src/handlers/http/query.rs index 5e3d9f0e0..e4ac92b44 100644 --- a/src/handlers/http/query.rs +++ b/src/handlers/http/query.rs @@ -19,7 +19,6 @@ use actix_web::http::header::ContentType; use actix_web::web::{self, Json}; use actix_web::{FromRequest, HttpRequest, Responder}; -use anyhow::anyhow; use chrono::{DateTime, Utc}; use datafusion::common::tree_node::TreeNode; use datafusion::error::DataFusionError; @@ -30,22 +29,18 @@ use std::collections::HashMap; use std::pin::Pin; use std::sync::Arc; use std::time::Instant; -use tracing::{error, info, warn}; +use tracing::error; use crate::event::error::EventError; use crate::handlers::http::fetch_schema; use crate::metadata::STREAM_INFO; -use arrow_array::RecordBatch; use crate::event::commit_schema; -use crate::handlers::{CACHE_RESULTS_HEADER_KEY, CACHE_VIEW_HEADER_KEY, USER_ID_HEADER_KEY}; -use crate::localcache::CacheError; use crate::metrics::QUERY_EXECUTE_TIME; use crate::option::{Mode, CONFIG}; use crate::query::error::ExecuteError; use crate::query::Query as LogicalQuery; use crate::query::{TableScanVisitor, QUERY_SESSION}; -use crate::querycache::{CacheMetadata, QueryCacheManager}; use crate::rbac::role::{Action, Permission}; use crate::rbac::Users; use crate::response::QueryResponse; @@ -88,43 +83,6 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result Result, - user_id: Option<&str>, - query_cache_manager: Option<&QueryCacheManager>, - stream: &str, - records: &[RecordBatch], - start: String, - end: String, - query: String, -) -> Result<(), QueryError> { - match (cache_results, query_cache_manager) { - (Some(_), None) => { - warn!("Instructed to cache query results but Query Caching is not Enabled in Server"); - - Ok(()) - } - // do cache - (Some(should_cache), Some(query_cache_manager)) => { - if should_cache != "true" { - error!("value of cache results header is false"); - return Err(QueryError::CacheError(CacheError::Other( - "should not cache results", - ))); - } - - let user_id = user_id.ok_or(CacheError::Other("User Id not provided"))?; - let mut cache = query_cache_manager.get_cache(stream, user_id).await?; - - let cache_key = CacheMetadata::new(query.clone(), start.clone(), end.clone()); - - // guard to stop multiple caching of the same content - if let Some(path) = cache.get_file(&cache_key) { - info!("File already exists in cache, Removing old file"); - cache.delete(&cache_key, path).await?; - } - - if let Err(err) = query_cache_manager - .create_parquet_cache(stream, records, user_id, start, end, query) - .await - { - error!("Error occured while caching query results: {:?}", err); - if query_cache_manager - .clear_cache(stream, user_id) - .await - .is_err() - { - error!("Error Clearing Unwanted files from cache dir"); - } - } - // fallthrough - Ok(()) - } - (None, _) => Ok(()), - } -} - -#[allow(clippy::too_many_arguments)] -pub async fn get_results_from_cache( - show_cached: Option<&str>, - query_cache_manager: Option<&QueryCacheManager>, - stream: &str, - user_id: Option<&str>, - start_time: &str, - end_time: &str, - query: &str, - send_null: bool, - send_fields: bool, -) -> Result { - match (show_cached, query_cache_manager) { - (Some(_), None) => { - warn!("Instructed to show cached results but Query Caching is not Enabled on Server"); - None - } - (Some(should_show), Some(query_cache_manager)) => { - if should_show != "true" { - error!("value of show cached header is false"); - return Err(QueryError::CacheError(CacheError::Other( - "should not return cached results", - ))); - } - - let user_id = - user_id.ok_or_else(|| QueryError::Anyhow(anyhow!("User Id not provided")))?; - - let mut query_cache = query_cache_manager.get_cache(stream, user_id).await?; - - let (start, end) = parse_human_time(start_time, end_time)?; - - let file_path = query_cache.get_file(&CacheMetadata::new( - query.to_string(), - start.to_rfc3339(), - end.to_rfc3339(), - )); - if let Some(file_path) = file_path { - let (records, fields) = query_cache.get_cached_records(&file_path).await?; - let response = QueryResponse { - records, - fields, - fill_null: send_null, - with_fields: send_fields, - }; - - Some(Ok(response)) - } else { - None - } - } - (_, _) => None, - } - .map_or_else(|| Err(QueryError::CacheMiss), |ret_val| ret_val) -} - pub fn authorize_and_set_filter_tags( query: &mut LogicalQuery, permissions: Vec, @@ -500,10 +330,6 @@ pub enum QueryError { Execute(#[from] ExecuteError), #[error("ObjectStorage Error: {0}")] ObjectStorage(#[from] ObjectStorageError), - #[error("Cache Error: {0}")] - CacheError(#[from] CacheError), - #[error("")] - CacheMiss, #[error("Evern Error: {0}")] EventError(#[from] EventError), #[error("Error: {0}")] diff --git a/src/handlers/mod.rs b/src/handlers/mod.rs index 4a4259354..e18990800 100644 --- a/src/handlers/mod.rs +++ b/src/handlers/mod.rs @@ -23,9 +23,6 @@ pub mod livetail; const PREFIX_TAGS: &str = "x-p-tag-"; const PREFIX_META: &str = "x-p-meta-"; pub const STREAM_NAME_HEADER_KEY: &str = "x-p-stream"; -const CACHE_RESULTS_HEADER_KEY: &str = "x-p-cache-results"; -const CACHE_VIEW_HEADER_KEY: &str = "x-p-show-cached"; -const USER_ID_HEADER_KEY: &str = "x-p-user-id"; const LOG_SOURCE_KEY: &str = "x-p-log-source"; const TIME_PARTITION_KEY: &str = "x-p-time-partition"; const TIME_PARTITION_LIMIT_KEY: &str = "x-p-time-partition-limit"; diff --git a/src/lib.rs b/src/lib.rs index 140c32dcc..ddb7f8244 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -26,14 +26,12 @@ mod event; pub mod handlers; pub mod hottier; mod livetail; -pub mod localcache; mod metadata; pub mod metrics; pub mod migration; mod oidc; pub mod option; mod query; -mod querycache; pub mod rbac; mod response; mod static_schema; diff --git a/src/localcache.rs b/src/localcache.rs deleted file mode 100644 index 9ac6d5edd..000000000 --- a/src/localcache.rs +++ /dev/null @@ -1,268 +0,0 @@ -/* - * Parseable Server (C) 2022 - 2024 Parseable, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - * - */ - -use std::{io, path::PathBuf}; - -use fs_extra::file::CopyOptions; -use futures_util::TryFutureExt; -use hashlru::Cache; -use human_size::{Byte, Gigibyte, SpecificSize}; -use itertools::{Either, Itertools}; -use object_store::{local::LocalFileSystem, ObjectStore}; -use once_cell::sync::OnceCell; -use parquet::errors::ParquetError; -use tokio::{fs, sync::Mutex}; -use tracing::{error, info, warn}; - -use crate::{metadata::error::stream_info::MetadataError, option::CONFIG}; - -pub const STREAM_CACHE_FILENAME: &str = ".cache.json"; -pub const CACHE_META_FILENAME: &str = ".cache_meta.json"; -pub const CURRENT_CACHE_VERSION: &str = "v1"; - -#[derive(Debug, serde::Deserialize, serde::Serialize)] -pub struct LocalCache { - version: String, - current_size: u64, - /// Mapping between storage path and cache path. - files: Cache, -} - -impl LocalCache { - fn new() -> Self { - Self { - version: CURRENT_CACHE_VERSION.to_string(), - current_size: 0, - files: Cache::new(100), - } - } -} - -#[derive(Debug, serde::Deserialize, serde::Serialize)] -pub struct CacheMeta { - version: String, - size_capacity: u64, -} - -impl CacheMeta { - fn new() -> Self { - Self { - version: CURRENT_CACHE_VERSION.to_string(), - size_capacity: 0, - } - } -} - -pub struct LocalCacheManager { - filesystem: LocalFileSystem, - cache_path: PathBuf, - cache_capacity: u64, - copy_options: CopyOptions, - semaphore: Mutex<()>, -} - -impl LocalCacheManager { - pub fn global() -> Option<&'static LocalCacheManager> { - static INSTANCE: OnceCell = OnceCell::new(); - - let cache_path = CONFIG.parseable.local_cache_path.as_ref()?; - - Some(INSTANCE.get_or_init(|| { - let cache_path = cache_path.clone(); - std::fs::create_dir_all(&cache_path).unwrap(); - LocalCacheManager { - filesystem: LocalFileSystem::new(), - cache_path, - cache_capacity: CONFIG.parseable.local_cache_size, - copy_options: CopyOptions { - overwrite: true, - skip_exist: false, - ..CopyOptions::new() - }, - semaphore: Mutex::new(()), - } - })) - } - - pub async fn validate(&self, config_capacity: u64) -> Result<(), CacheError> { - fs::create_dir_all(&self.cache_path).await?; - let path = cache_meta_path(&self.cache_path) - .map_err(|err| CacheError::ObjectStoreError(err.into()))?; - let resp = self - .filesystem - .get(&path) - .and_then(|resp| resp.bytes()) - .await; - - let updated_cache = match resp { - Ok(bytes) => { - let mut meta: CacheMeta = serde_json::from_slice(&bytes)?; - if meta.size_capacity != config_capacity { - // log the change in cache size - let configured_size_human: SpecificSize = - SpecificSize::new(config_capacity as f64, Byte) - .unwrap() - .into(); - let current_size_human: SpecificSize = - SpecificSize::new(meta.size_capacity as f64, Byte) - .unwrap() - .into(); - warn!( - "Cache size is updated from {} to {}", - current_size_human, configured_size_human - ); - meta.size_capacity = config_capacity; - Some(meta) - } else { - None - } - } - Err(object_store::Error::NotFound { .. }) => { - let mut meta = CacheMeta::new(); - meta.size_capacity = config_capacity; - Some(meta) - } - Err(err) => return Err(err.into()), - }; - - if let Some(updated_cache) = updated_cache { - let result = self - .filesystem - .put(&path, serde_json::to_vec(&updated_cache)?.into()) - .await?; - info!("Cache meta file updated: {:?}", result); - } - - Ok(()) - } - - pub async fn get_cache(&self, stream: &str) -> Result { - let path = cache_file_path(&self.cache_path, stream).unwrap(); - let res = self - .filesystem - .get(&path) - .and_then(|resp| resp.bytes()) - .await; - let cache = match res { - Ok(bytes) => serde_json::from_slice(&bytes)?, - Err(object_store::Error::NotFound { .. }) => LocalCache::new(), - Err(err) => return Err(err.into()), - }; - Ok(cache) - } - - pub async fn put_cache(&self, stream: &str, cache: &LocalCache) -> Result<(), CacheError> { - let path = cache_file_path(&self.cache_path, stream).unwrap(); - let bytes = serde_json::to_vec(cache)?.into(); - let result = self.filesystem.put(&path, bytes).await?; - info!("Cache file updated: {:?}", result); - Ok(()) - } - - pub async fn move_to_cache( - &self, - stream: &str, - key: String, - staging_path: PathBuf, - ) -> Result<(), CacheError> { - let lock = self.semaphore.lock().await; - let mut cache_path = self.cache_path.join(stream); - fs::create_dir_all(&cache_path).await?; - cache_path.push(staging_path.file_name().unwrap()); - fs_extra::file::move_file(staging_path, &cache_path, &self.copy_options)?; - let file_size = std::fs::metadata(&cache_path)?.len(); - let mut cache = self.get_cache(stream).await?; - - while cache.current_size + file_size > self.cache_capacity { - if let Some((_, file_for_removal)) = cache.files.pop_lru() { - let lru_file_size = std::fs::metadata(&file_for_removal)?.len(); - cache.current_size = cache.current_size.saturating_sub(lru_file_size); - info!("removing cache entry"); - tokio::spawn(fs::remove_file(file_for_removal)); - } else { - error!("Cache size too small"); - break; - } - } - - if cache.files.is_full() { - cache.files.resize(cache.files.capacity() * 2); - } - cache.files.push(key, cache_path); - cache.current_size += file_size; - self.put_cache(stream, &cache).await?; - drop(lock); - Ok(()) - } - - pub async fn partition_on_cached( - &self, - stream: &str, - collection: Vec, - key: fn(&T) -> &String, - ) -> Result<(Vec<(T, PathBuf)>, Vec), CacheError> { - let lock = self.semaphore.lock().await; - let mut cache = self.get_cache(stream).await?; - let (cached, remainder): (Vec<_>, Vec<_>) = collection.into_iter().partition_map(|item| { - let key = key(&item); - match cache.files.get(key).cloned() { - Some(path) => Either::Left((item, path)), - None => Either::Right(item), - } - }); - self.put_cache(stream, &cache).await?; - drop(lock); - Ok((cached, remainder)) - } -} - -fn cache_file_path( - root: impl AsRef, - stream: &str, -) -> Result { - let mut path = root.as_ref().join(stream); - path.push(STREAM_CACHE_FILENAME); - object_store::path::Path::from_absolute_path(path) -} - -fn cache_meta_path( - root: impl AsRef, -) -> Result { - let path = root.as_ref().join(CACHE_META_FILENAME); - object_store::path::Path::from_absolute_path(path) -} - -#[derive(Debug, thiserror::Error)] -pub enum CacheError { - #[error("{0}")] - Serde(#[from] serde_json::Error), - #[error("{0}")] - IOError(#[from] io::Error), - #[error("{0}")] - MoveError(#[from] fs_extra::error::Error), - #[error("{0}")] - ObjectStoreError(#[from] object_store::Error), - #[error("{0}")] - ParquetError(#[from] ParquetError), - #[error("{0}")] - MetadataError(#[from] MetadataError), - #[error("Error: Cache File Does Not Exist")] - DoesNotExist, - #[error("Error: {0}")] - Other(&'static str), -} diff --git a/src/metadata.rs b/src/metadata.rs index 5447ea796..132ce540f 100644 --- a/src/metadata.rs +++ b/src/metadata.rs @@ -243,22 +243,6 @@ impl StreamInfo { }) } - pub fn get_cache_enabled(&self, stream_name: &str) -> Result { - let map = self.read().expect(LOCK_EXPECT); - map.get(stream_name) - .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string())) - .map(|metadata| metadata.cache_enabled) - } - - pub fn set_cache_enabled(&self, stream_name: &str, enable: bool) -> Result<(), MetadataError> { - let mut map = self.write().expect(LOCK_EXPECT); - let stream = map - .get_mut(stream_name) - .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string()))?; - stream.cache_enabled = enable; - Ok(()) - } - pub fn set_hot_tier(&self, stream_name: &str, enable: bool) -> Result<(), MetadataError> { let mut map = self.write().expect(LOCK_EXPECT); let stream = map @@ -323,6 +307,7 @@ impl StreamInfo { map.remove(stream_name); } + #[allow(dead_code)] pub async fn upsert_stream_info( &self, storage: &(impl ObjectStorage + ?Sized), diff --git a/src/option.rs b/src/option.rs index fb9f3c75c..2e112ecf9 100644 --- a/src/option.rs +++ b/src/option.rs @@ -76,15 +76,6 @@ Cloud Native, log analytics platform for modern applications."#, .exit() } - if cli.local_cache_path.is_some() { - create_parseable_cli_command() - .error( - ErrorKind::ValueValidation, - "Cannot use cache with local-store subcommand.", - ) - .exit() - } - if cli.hot_tier_storage_path.is_some() { create_parseable_cli_command() .error( @@ -315,7 +306,6 @@ pub mod validation { use path_clean::PathClean; - use crate::option::MIN_CACHE_SIZE_BYTES; use human_size::{multiples, SpecificSize}; pub fn file_path(s: &str) -> Result { @@ -399,17 +389,6 @@ pub mod validation { } } - pub fn cache_size(s: &str) -> Result { - let size = human_size_to_bytes(s)?; - if size < MIN_CACHE_SIZE_BYTES { - return Err(format!( - "Specified value of cache size is smaller than current minimum of {}", - human_size_to_bytes(&MIN_CACHE_SIZE_BYTES.to_string()).unwrap() - )); - } - Ok(size) - } - pub fn validate_disk_usage(max_disk_usage: &str) -> Result { if let Ok(max_disk_usage) = max_disk_usage.parse::() { if (0.0..=100.0).contains(&max_disk_usage) { diff --git a/src/query/mod.rs b/src/query/mod.rs index b41a066f8..350e47ea8 100644 --- a/src/query/mod.rs +++ b/src/query/mod.rs @@ -208,6 +208,7 @@ impl TableScanVisitor { pub fn into_inner(self) -> Vec { self.tables } + #[allow(dead_code)] pub fn top(&self) -> Option<&str> { self.tables.first().map(|s| s.as_ref()) } diff --git a/src/query/stream_schema_provider.rs b/src/query/stream_schema_provider.rs index f27cb6998..384df0813 100644 --- a/src/query/stream_schema_provider.rs +++ b/src/query/stream_schema_provider.rs @@ -63,7 +63,6 @@ use crate::{ self, column::TypedStatistics, manifest::Manifest, snapshot::ManifestItem, ManifestFile, }, event::{self, DEFAULT_TIMESTAMP_KEY}, - localcache::LocalCacheManager, metadata::STREAM_INFO, metrics::QUERY_CACHE_HIT, option::CONFIG, @@ -318,7 +317,6 @@ impl TableProvider for StandardTableProvider { limit: Option, ) -> Result, DataFusionError> { let mut memory_exec = None; - let mut cache_exec = None; let mut hot_tier_exec = None; let mut listing_exec = None; let object_store = state @@ -417,22 +415,6 @@ impl TableProvider for StandardTableProvider { ); } - // Based on entries in the manifest files, find them in the cache and create a physical plan. - if let Some(cache_manager) = LocalCacheManager::global() { - cache_exec = get_cache_exectuion_plan( - cache_manager, - &self.stream, - &mut manifest_files, - self.schema.clone(), - projection, - filters, - limit, - state, - time_partition.clone(), - ) - .await?; - } - // Hot tier data fetch if let Some(hot_tier_manager) = HotTierManager::global() { if hot_tier_manager.check_stream_hot_tier_exists(&self.stream) { @@ -453,7 +435,7 @@ impl TableProvider for StandardTableProvider { if manifest_files.is_empty() { QUERY_CACHE_HIT.with_label_values(&[&self.stream]).inc(); return final_plan( - vec![listing_exec, memory_exec, cache_exec, hot_tier_exec], + vec![listing_exec, memory_exec, hot_tier_exec], projection, self.schema.clone(), ); @@ -474,13 +456,7 @@ impl TableProvider for StandardTableProvider { .await?; Ok(final_plan( - vec![ - listing_exec, - memory_exec, - cache_exec, - hot_tier_exec, - Some(remote_exec), - ], + vec![listing_exec, memory_exec, hot_tier_exec, Some(remote_exec)], projection, self.schema.clone(), )?) @@ -511,55 +487,6 @@ impl TableProvider for StandardTableProvider { } } -#[allow(clippy::too_many_arguments)] -async fn get_cache_exectuion_plan( - cache_manager: &LocalCacheManager, - stream: &str, - manifest_files: &mut Vec, - schema: Arc, - projection: Option<&Vec>, - filters: &[Expr], - limit: Option, - state: &dyn Session, - time_partition: Option, -) -> Result>, DataFusionError> { - let (cached, remainder) = cache_manager - .partition_on_cached(stream, manifest_files.clone(), |file: &File| { - &file.file_path - }) - .await - .map_err(|err| DataFusionError::External(Box::new(err)))?; - - // Assign remaining entries back to manifest list - // This is to be used for remote query - *manifest_files = remainder; - - let cached = cached - .into_iter() - .map(|(mut file, cache_path)| { - let cache_path = object_store::path::Path::from_absolute_path(cache_path).unwrap(); - file.file_path = cache_path.to_string(); - file - }) - .collect(); - - let (partitioned_files, statistics) = partitioned_files(cached, &schema); - let plan = create_parquet_physical_plan( - ObjectStoreUrl::parse("file:///").unwrap(), - partitioned_files, - statistics, - schema.clone(), - projection, - filters, - limit, - state, - time_partition.clone(), - ) - .await?; - - Ok(Some(plan)) -} - #[allow(clippy::too_many_arguments)] async fn get_hottier_exectuion_plan( hot_tier_manager: &HotTierManager, diff --git a/src/querycache.rs b/src/querycache.rs deleted file mode 100644 index 3169e8061..000000000 --- a/src/querycache.rs +++ /dev/null @@ -1,409 +0,0 @@ -/* - * Parseable Server (C) 2022 - 2024 Parseable, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - * - */ - -use arrow_array::RecordBatch; -use chrono::Utc; -use futures::TryStreamExt; -use futures_util::TryFutureExt; -use hashlru::Cache; -use human_size::{Byte, Gigibyte, SpecificSize}; -use itertools::Itertools; -use object_store::{local::LocalFileSystem, ObjectStore}; -use once_cell::sync::OnceCell; -use parquet::arrow::{AsyncArrowWriter, ParquetRecordBatchStreamBuilder}; -use std::collections::HashMap; -use std::path::{Path, PathBuf}; -use tokio::fs as AsyncFs; -use tokio::{fs, sync::Mutex}; -use tracing::{error, info, warn}; - -use crate::handlers::http::users::USERS_ROOT_DIR; -use crate::metadata::STREAM_INFO; -use crate::storage::staging::parquet_writer_props; -use crate::{localcache::CacheError, option::CONFIG, utils::hostname_unchecked}; - -pub const QUERY_CACHE_FILENAME: &str = ".cache.json"; -pub const QUERY_CACHE_META_FILENAME: &str = ".cache_meta.json"; -pub const CURRENT_QUERY_CACHE_VERSION: &str = "v1"; - -#[derive(Default, Clone, serde::Deserialize, serde::Serialize, Debug, Hash, Eq, PartialEq)] -pub struct CacheMetadata { - pub query: String, - pub start_time: String, - pub end_time: String, -} - -impl CacheMetadata { - pub const fn new(query: String, start_time: String, end_time: String) -> Self { - Self { - query, - start_time, - end_time, - } - } -} - -#[derive(Debug, serde::Deserialize, serde::Serialize)] -pub struct QueryCache { - version: String, - current_size: u64, - - /// Mapping between storage path and cache path. - files: Cache, -} - -impl QueryCache { - fn new() -> Self { - Self { - version: CURRENT_QUERY_CACHE_VERSION.to_string(), - current_size: 0, - files: Cache::new(100), - } - } - - pub fn get_file(&mut self, key: &CacheMetadata) -> Option { - self.files.get(key).cloned() - } - - pub fn used_cache_size(&self) -> u64 { - self.current_size - } - - pub fn remove(&mut self, key: &CacheMetadata) -> Option { - self.files.remove(key) - } - - pub async fn delete(&mut self, key: &CacheMetadata, path: PathBuf) -> Result<(), CacheError> { - self.files.delete(key); - AsyncFs::remove_file(path).await?; - - Ok(()) - } - - pub fn queries(&self) -> Vec<&CacheMetadata> { - self.files.keys().collect_vec() - } - - // read the parquet - // return the recordbatches - pub async fn get_cached_records( - &self, - path: &PathBuf, - ) -> Result<(Vec, Vec), CacheError> { - let file = AsyncFs::File::open(path).await?; - let builder = ParquetRecordBatchStreamBuilder::new(file).await?; - // Build a async parquet reader. - let stream = builder.build()?; - - let records = stream.try_collect::>().await?; - let fields = records.first().map_or_else(Vec::new, |record| { - record - .schema() - .fields() - .iter() - .map(|field| field.name()) - .cloned() - .collect_vec() - }); - - Ok((records, fields)) - } -} - -// .cache_meta.json -#[derive(Debug, serde::Deserialize, serde::Serialize)] -pub struct QueryCacheMeta { - version: String, - size_capacity: u64, -} - -impl QueryCacheMeta { - fn new() -> Self { - Self { - version: CURRENT_QUERY_CACHE_VERSION.to_string(), - size_capacity: 0, - } - } -} - -pub struct QueryCacheManager { - filesystem: LocalFileSystem, - cache_path: PathBuf, // refers to the path passed in the env var - total_cache_capacity: u64, - semaphore: Mutex<()>, -} - -impl QueryCacheManager { - pub fn gen_file_path(query_staging_path: &str, stream: &str, user_id: &str) -> PathBuf { - PathBuf::from_iter([ - query_staging_path, - USERS_ROOT_DIR, - user_id, - stream, - &format!( - "{}.{}.parquet", - hostname_unchecked(), - Utc::now().timestamp() - ), - ]) - } - pub async fn global(config_capacity: u64) -> Result, CacheError> { - static INSTANCE: OnceCell = OnceCell::new(); - - let cache_path = CONFIG.parseable.query_cache_path.as_ref(); - - if cache_path.is_none() { - return Ok(None); - } - - let cache_path = cache_path.unwrap(); - - let cache_manager = INSTANCE.get_or_init(|| { - let cache_path = cache_path.clone(); - std::fs::create_dir_all(&cache_path).unwrap(); - Self { - filesystem: LocalFileSystem::new(), - cache_path, - total_cache_capacity: CONFIG.parseable.query_cache_size, - semaphore: Mutex::new(()), - } - }); - - cache_manager.validate(config_capacity).await?; - - Ok(Some(cache_manager)) - } - - async fn validate(&self, config_capacity: u64) -> Result<(), CacheError> { - fs::create_dir_all(&self.cache_path).await?; - let path = query_cache_meta_path(&self.cache_path) - .map_err(|err| CacheError::ObjectStoreError(err.into()))?; - let resp = self - .filesystem - .get(&path) - .and_then(|resp| resp.bytes()) - .await; - - let updated_cache = match resp { - Ok(bytes) => { - let mut meta: QueryCacheMeta = serde_json::from_slice(&bytes)?; - if meta.size_capacity != config_capacity { - // log the change in cache size - let configured_size_human: SpecificSize = - SpecificSize::new(config_capacity as f64, Byte) - .unwrap() - .into(); - let current_size_human: SpecificSize = - SpecificSize::new(meta.size_capacity as f64, Byte) - .unwrap() - .into(); - warn!( - "Cache size is updated from {} to {}", - current_size_human, configured_size_human - ); - meta.size_capacity = config_capacity; - Some(meta) - } else { - None - } - } - Err(object_store::Error::NotFound { .. }) => { - let mut meta = QueryCacheMeta::new(); - meta.size_capacity = config_capacity; - Some(meta) - } - Err(err) => return Err(err.into()), - }; - - if let Some(updated_cache) = updated_cache { - let result = self - .filesystem - .put(&path, serde_json::to_vec(&updated_cache)?.into()) - .await?; - info!("Cache meta file updated: {:?}", result); - } - - Ok(()) - } - - pub async fn get_cache(&self, stream: &str, user_id: &str) -> Result { - let path = query_cache_file_path(&self.cache_path, stream, user_id).unwrap(); - let res = self - .filesystem - .get(&path) - .and_then(|resp| resp.bytes()) - .await; - let cache = match res { - Ok(bytes) => serde_json::from_slice(&bytes)?, - Err(object_store::Error::NotFound { .. }) => QueryCache::new(), - Err(err) => return Err(err.into()), - }; - Ok(cache) - } - - pub async fn remove_from_cache( - &self, - key: CacheMetadata, - stream: &str, - user_id: &str, - ) -> Result<(), CacheError> { - let mut cache = self.get_cache(stream, user_id).await?; - - if let Some(remove_result) = cache.remove(&key) { - self.put_cache(stream, &cache, user_id).await?; - tokio::spawn(fs::remove_file(remove_result)); - Ok(()) - } else { - Err(CacheError::DoesNotExist) - } - } - - pub async fn put_cache( - &self, - stream: &str, - cache: &QueryCache, - user_id: &str, - ) -> Result<(), CacheError> { - let path = query_cache_file_path(&self.cache_path, stream, user_id).unwrap(); - - let bytes = serde_json::to_vec(cache)?.into(); - let result = self.filesystem.put(&path, bytes).await?; - info!("Cache file updated: {:?}", result); - Ok(()) - } - - pub async fn move_to_cache( - &self, - stream: &str, - key: CacheMetadata, - file_path: &Path, - user_id: &str, - ) -> Result<(), CacheError> { - let lock = self.semaphore.lock().await; - let file_size = std::fs::metadata(file_path)?.len(); - let mut cache = self.get_cache(stream, user_id).await?; - - while cache.current_size + file_size > self.total_cache_capacity { - if let Some((_, file_for_removal)) = cache.files.pop_lru() { - let lru_file_size = fs::metadata(&file_for_removal).await?.len(); - cache.current_size = cache.current_size.saturating_sub(lru_file_size); - info!("removing cache entry"); - tokio::spawn(fs::remove_file(file_for_removal)); - } else { - error!("Cache size too small"); - break; - } - } - - if cache.files.is_full() { - cache.files.resize(cache.files.capacity() * 2); - } - cache.files.push(key, file_path.to_path_buf()); - cache.current_size += file_size; - self.put_cache(stream, &cache, user_id).await?; - drop(lock); - Ok(()) - } - - pub async fn create_parquet_cache( - &self, - table_name: &str, - records: &[RecordBatch], - user_id: &str, - start: String, - end: String, - query: String, - ) -> Result<(), CacheError> { - let parquet_path = Self::gen_file_path( - self.cache_path.to_str().expect("utf-8 compat path"), - user_id, - table_name, - ); - AsyncFs::create_dir_all(parquet_path.parent().expect("parent path exists")).await?; - let parquet_file = AsyncFs::File::create(&parquet_path).await?; - let time_partition = STREAM_INFO.get_time_partition(table_name)?; - let props = parquet_writer_props(time_partition.clone(), 0, HashMap::new()).build(); - - let sch = if let Some(record) = records.first() { - record.schema() - } else { - // the record batch is empty, do not cache and return early - return Ok(()); - }; - - let mut arrow_writer = AsyncArrowWriter::try_new(parquet_file, sch, Some(props))?; - - for record in records { - if let Err(e) = arrow_writer.write(record).await { - error!("Error While Writing to Query Cache: {}", e); - } - } - - arrow_writer.close().await?; - self.move_to_cache( - table_name, - CacheMetadata::new(query, start, end), - &parquet_path, - user_id, - ) - .await - } - - pub async fn clear_cache(&self, stream: &str, user_id: &str) -> Result<(), CacheError> { - let cache = self.get_cache(stream, user_id).await?; - let map = cache.files.values().collect_vec(); - let p_path = PathBuf::from_iter([USERS_ROOT_DIR, stream, user_id]); - let path = self.cache_path.join(p_path); - let mut paths = fs::read_dir(path).await?; - while let Some(path) = paths.next_entry().await? { - let check = path.path().is_file() - && map.contains(&&path.path()) - && !path - .path() - .file_name() - .expect("File Name is Proper") - .to_str() - .expect("Path is Proper utf-8 ") - .ends_with(".json"); - if check { - fs::remove_file(path.path()).await?; - } - } - - Ok(()) - } -} - -fn query_cache_file_path( - root: impl AsRef, - stream: &str, - user_id: &str, -) -> Result { - let local_meta_path = PathBuf::from_iter([USERS_ROOT_DIR, stream, user_id]); - let mut path = root.as_ref().join(local_meta_path); - - path.push(QUERY_CACHE_FILENAME); - object_store::path::Path::from_absolute_path(path) -} - -fn query_cache_meta_path( - root: impl AsRef, -) -> Result { - let path = root.as_ref().join(QUERY_CACHE_META_FILENAME); - object_store::path::Path::from_absolute_path(path) -} diff --git a/src/rbac/role.rs b/src/rbac/role.rs index 0e8f1ab24..f94c8f171 100644 --- a/src/rbac/role.rs +++ b/src/rbac/role.rs @@ -31,8 +31,6 @@ pub enum Action { DeleteStream, GetRetention, PutRetention, - GetCacheEnabled, - PutCacheEnabled, PutHotTierEnabled, GetHotTierEnabled, DeleteHotTierEnabled, @@ -62,8 +60,6 @@ pub enum Action { GetFilter, CreateFilter, DeleteFilter, - ListCache, - RemoveCache, Login, Metrics, } @@ -136,8 +132,6 @@ impl RoleBuilder { | Action::ListFilter | Action::CreateFilter | Action::DeleteFilter - | Action::ListCache - | Action::RemoveCache | Action::GetAnalytics => Permission::Unit(action), Action::Ingest | Action::GetSchema @@ -145,8 +139,6 @@ impl RoleBuilder { | Action::GetStats | Action::GetRetention | Action::PutRetention - | Action::GetCacheEnabled - | Action::PutCacheEnabled | Action::PutAlert | Action::GetAlert | Action::All => Permission::Stream(action, self.stream.clone().unwrap()), @@ -221,8 +213,6 @@ pub mod model { Action::GetStats, Action::GetRetention, Action::PutRetention, - Action::PutCacheEnabled, - Action::GetCacheEnabled, Action::PutHotTierEnabled, Action::GetHotTierEnabled, Action::DeleteHotTierEnabled, @@ -267,8 +257,6 @@ pub mod model { Action::Ingest, Action::QueryLLM, Action::GetStreamInfo, - Action::GetCacheEnabled, - Action::PutCacheEnabled, Action::GetFilter, Action::ListFilter, Action::CreateFilter, diff --git a/src/response.rs b/src/response.rs index a18827d71..ab89ffead 100644 --- a/src/response.rs +++ b/src/response.rs @@ -66,6 +66,7 @@ impl QueryResponse { Ok(web::Json(response)) } + #[allow(dead_code)] pub fn into_flight(self) -> Result, Status> { into_flight_data(self.records) } diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index e9ee32f18..ca70b3bd7 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -33,7 +33,6 @@ use crate::option::Mode; use crate::{ alerts::Alerts, catalog::{self, manifest::Manifest, snapshot::Snapshot}, - localcache::LocalCacheManager, metadata::STREAM_INFO, metrics::{storage::StorageMetrics, STORAGE_SIZE}, option::CONFIG, @@ -46,7 +45,6 @@ use async_trait::async_trait; use bytes::Bytes; use chrono::Local; use datafusion::{datasource::listing::ListingTableUrl, execution::runtime_env::RuntimeConfig}; -use itertools::Itertools; use relative_path::RelativePath; use relative_path::RelativePathBuf; use tracing::error; @@ -546,13 +544,7 @@ pub trait ObjectStorage: Send + Sync + 'static { let streams = STREAM_INFO.list_streams(); - let cache_manager = LocalCacheManager::global(); - let mut cache_updates: HashMap<&String, Vec<_>> = HashMap::new(); - for stream in &streams { - let cache_enabled = STREAM_INFO - .get_cache_enabled(stream) - .map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?; let time_partition = STREAM_INFO .get_time_partition(stream) .map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?; @@ -624,36 +616,9 @@ pub trait ObjectStorage: Send + Sync + 'static { let manifest = catalog::create_from_parquet_file(absolute_path.clone(), &file).unwrap(); catalog::update_snapshot(store, stream, manifest).await?; - if cache_enabled && cache_manager.is_some() { - cache_updates - .entry(stream) - .or_default() - .push((absolute_path, file)); - } else { - let _ = fs::remove_file(file); - } - } - } - // Cache management logic - if let Some(manager) = cache_manager { - let cache_updates = cache_updates - .into_iter() - .map(|(key, value)| (key.to_owned(), value)) - .collect_vec(); - - tokio::spawn(async move { - for (stream, files) in cache_updates { - for (storage_path, file) in files { - if let Err(e) = manager - .move_to_cache(&stream, storage_path, file.to_owned()) - .await - { - error!("Failed to move file to cache: {:?}", e); - } - } - } - }); + let _ = fs::remove_file(file); + } } Ok(())