diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs
index a4c25eb45..e1b1b987a 100644
--- a/src/handlers/http/logstream.rs
+++ b/src/handlers/http/logstream.rs
@@ -586,7 +586,7 @@ pub async fn get_stream_info(stream_name: Path<String>) -> Result<impl Responder, StreamError> {
 
 pub async fn put_stream_hot_tier(
     stream_name: Path<String>,
-    Json(json): Json<Value>,
+    Json(mut hottier): Json<StreamHotTier>,
 ) -> Result<impl Responder, StreamError> {
     let stream_name = stream_name.into_inner();
     if !STREAM_INFO.stream_exists(&stream_name) {
@@ -609,35 +609,28 @@ pub async fn put_stream_hot_tier(
             status: StatusCode::BAD_REQUEST,
         });
     }
-    if CONFIG.options.hot_tier_storage_path.is_none() {
-        return Err(StreamError::HotTierNotEnabled(stream_name));
-    }
-
-    let mut hottier: StreamHotTier = match serde_json::from_value(json) {
-        Ok(hottier) => hottier,
-        Err(err) => return Err(StreamError::InvalidHotTierConfig(err)),
-    };
 
     validator::hot_tier(&hottier.size.to_string())?;
 
     STREAM_INFO.set_hot_tier(&stream_name, true)?;
 
-    if let Some(hot_tier_manager) = HotTierManager::global() {
-        let existing_hot_tier_used_size = hot_tier_manager
-            .validate_hot_tier_size(&stream_name, &hottier.size)
-            .await?;
-        hottier.used_size = existing_hot_tier_used_size.to_string();
-        hottier.available_size = hottier.size.to_string();
-        hottier.version = Some(CURRENT_HOT_TIER_VERSION.to_string());
-        hot_tier_manager
-            .put_hot_tier(&stream_name, &mut hottier)
-            .await?;
-        let storage = CONFIG.storage().get_object_store();
-        let mut stream_metadata = storage.get_object_store_format(&stream_name).await?;
-        stream_metadata.hot_tier_enabled = Some(true);
-        storage
-            .put_stream_manifest(&stream_name, &stream_metadata)
-            .await?;
-    }
+    let Some(hot_tier_manager) = HotTierManager::global() else {
+        return Err(StreamError::HotTierNotEnabled(stream_name));
+    };
+    let existing_hot_tier_used_size = hot_tier_manager
+        .validate_hot_tier_size(&stream_name, hottier.size)
+        .await?;
+    hottier.used_size = existing_hot_tier_used_size;
+    hottier.available_size = hottier.size;
+    hottier.version = Some(CURRENT_HOT_TIER_VERSION.to_string());
+    hot_tier_manager
+        .put_hot_tier(&stream_name, &mut hottier)
+        .await?;
+    let storage = CONFIG.storage().get_object_store();
+    let mut stream_metadata = storage.get_object_store_format(&stream_name).await?;
+    stream_metadata.hot_tier_enabled = true;
+    storage
+        .put_stream_manifest(&stream_name, &stream_metadata)
+        .await?;
 
     Ok((
         format!("hot tier set for stream {stream_name}"),
@@ -662,22 +655,12 @@ pub async fn get_stream_hot_tier(stream_name: Path<String>) -> Result<impl Responder, StreamError> {
             err.status().unwrap_or(StatusCode::INTERNAL_SERVER_ERROR)
         }
-        StreamError::HotTierNotEnabled(_) => StatusCode::BAD_REQUEST,
-        StreamError::InvalidHotTierConfig(_) => StatusCode::BAD_REQUEST,
+        StreamError::HotTierNotEnabled(_) => StatusCode::FORBIDDEN,
         StreamError::HotTierValidation(_) => StatusCode::BAD_REQUEST,
         StreamError::HotTierError(_) => StatusCode::INTERNAL_SERVER_ERROR,
     }
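Two details worth calling out in the handler change above: deserialization now happens in the `Json<StreamHotTier>` extractor itself, so malformed payloads are rejected before the handler body runs (which is why the `InvalidHotTierConfig` arm disappears), and the nested `if let Some(...)` block becomes a `let-else` early return. A minimal, self-contained sketch of that `let-else` shape, with hypothetical names standing in for the real types:

```rust
struct Manager;

fn global() -> Option<&'static Manager> {
    // Stand-in for HotTierManager::global(): None when no hot tier is configured.
    None
}

fn put_hot_tier(stream_name: String) -> Result<String, String> {
    // let-else binds on the happy path and early-returns on the miss,
    // keeping the rest of the function at top-level indentation.
    let Some(_manager) = global() else {
        return Err(format!("hot tier not enabled for {stream_name}"));
    };

    Ok(format!("hot tier set for stream {stream_name}"))
}

fn main() {
    assert!(put_hot_tier("demo".to_string()).is_err());
}
```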
diff --git a/src/hottier.rs b/src/hottier.rs
index 5ad8b9fbd..329048749 100644
--- a/src/hottier.rs
+++ b/src/hottier.rs
@@ -27,9 +27,9 @@ use crate::{
     catalog::manifest::{File, Manifest},
     handlers::http::cluster::INTERNAL_STREAM_NAME,
     metadata::{error::stream_info::MetadataError, STREAM_INFO},
-    option::{validation::bytes_to_human_size, CONFIG},
+    option::CONFIG,
     storage::{ObjectStorage, ObjectStorageError},
-    utils::extract_datetime,
+    utils::{extract_datetime, human_size::bytes_to_human_size},
     validator::error::HotTierValidationError,
 };
 use chrono::NaiveDate;
@@ -56,36 +56,39 @@ pub const CURRENT_HOT_TIER_VERSION: &str = "v2";
 
 #[derive(Debug, serde::Deserialize, serde::Serialize)]
 pub struct StreamHotTier {
     pub version: Option<String>,
-    pub size: String,
-    #[serde(default)]
-    pub used_size: String,
-    #[serde(default)]
-    pub available_size: String,
+    #[serde(with = "crate::utils::human_size")]
+    pub size: u64,
+    #[serde(default, with = "crate::utils::human_size")]
+    pub used_size: u64,
+    #[serde(default, with = "crate::utils::human_size")]
+    pub available_size: u64,
     #[serde(skip_serializing_if = "Option::is_none")]
     pub oldest_date_time_entry: Option<String>,
 }
 
 pub struct HotTierManager {
     filesystem: LocalFileSystem,
-    hot_tier_path: PathBuf,
+    hot_tier_path: &'static Path,
 }
 
 impl HotTierManager {
+    pub fn new(hot_tier_path: &'static Path) -> Self {
+        std::fs::create_dir_all(hot_tier_path).unwrap();
+
+        HotTierManager {
+            filesystem: LocalFileSystem::new(),
+            hot_tier_path,
+        }
+    }
+
+    /// Get a global reference to the hot tier manager, if a hot tier path is configured
     pub fn global() -> Option<&'static HotTierManager> {
         static INSTANCE: OnceCell<HotTierManager> = OnceCell::new();
 
-        let hot_tier_path = &CONFIG.options.hot_tier_storage_path;
-        if hot_tier_path.is_none() {
-            return None;
-        }
-        Some(INSTANCE.get_or_init(|| {
-            let hot_tier_path = hot_tier_path.as_ref().unwrap().clone();
-            std::fs::create_dir_all(&hot_tier_path).unwrap();
-            HotTierManager {
-                filesystem: LocalFileSystem::new(),
-                hot_tier_path,
-            }
-        }))
+        CONFIG
+            .options
+            .hot_tier_storage_path
+            .as_ref()
+            .map(|hot_tier_path| INSTANCE.get_or_init(|| HotTierManager::new(hot_tier_path)))
     }
 
     ///get the total hot tier size for all streams
@@ -98,8 +101,8 @@ impl HotTierManager {
         for stream in STREAM_INFO.list_streams() {
             if self.check_stream_hot_tier_exists(&stream) && stream != current_stream {
                 let stream_hot_tier = self.get_hot_tier(&stream).await?;
-                total_hot_tier_size += &stream_hot_tier.size.parse::<u64>().unwrap();
-                total_hot_tier_used_size += stream_hot_tier.used_size.parse::<u64>().unwrap();
+                total_hot_tier_size += &stream_hot_tier.size;
+                total_hot_tier_used_size += stream_hot_tier.used_size;
             }
         }
         Ok((total_hot_tier_size, total_hot_tier_used_size))
     }
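`HotTierManager::global()` above now funnels all construction through `new()`, so the `OnceCell` is initialized at most once, and only when a hot tier path is configured. A standalone sketch of this config-gated singleton pattern (hypothetical names, `once_cell` assumed as a dependency):

```rust
use once_cell::sync::OnceCell;

struct Manager {
    path: &'static str,
}

// Stand-in for CONFIG.options.hot_tier_storage_path.
static CONFIGURED_PATH: Option<&'static str> = Some("/tmp/hot-tier");

fn global() -> Option<&'static Manager> {
    static INSTANCE: OnceCell<Manager> = OnceCell::new();

    // Construct and cache the manager only when a path is configured;
    // every later call hands back the same &'static instance.
    CONFIGURED_PATH.map(|path| INSTANCE.get_or_init(|| Manager { path }))
}

fn main() {
    let manager = global().expect("a path is configured in this sketch");
    println!("hot tier lives at {}", manager.path);
}
```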
@@ -112,14 +115,13 @@ impl HotTierManager {
     pub async fn validate_hot_tier_size(
         &self,
         stream: &str,
-        stream_hot_tier_size: &str,
+        stream_hot_tier_size: u64,
     ) -> Result<u64, HotTierError> {
-        let stream_hot_tier_size = stream_hot_tier_size.parse::<u64>().unwrap();
         let mut existing_hot_tier_used_size = 0;
         if self.check_stream_hot_tier_exists(stream) {
             //delete existing hot tier if its size is less than the updated hot tier size else return error
             let existing_hot_tier = self.get_hot_tier(stream).await?;
-            existing_hot_tier_used_size = existing_hot_tier.used_size.parse::<u64>().unwrap();
+            existing_hot_tier_used_size = existing_hot_tier.used_size;
 
             if stream_hot_tier_size < existing_hot_tier_used_size {
                 return Err(HotTierError::ObjectStorageError(ObjectStorageError::Custom(format!(
@@ -134,7 +136,9 @@ impl HotTierManager {
             total_space,
             used_space,
             ..
-        } = get_disk_usage().expect("Codepath should only be hit if hottier is enabled");
+        } = self
+            .get_disk_usage()
+            .expect("Codepath should only be hit if hottier is enabled");
 
         let (total_hot_tier_size, total_hot_tier_used_size) =
             self.get_hot_tiers_size(stream).await?;
@@ -163,37 +167,29 @@ impl HotTierManager {
     ///get the hot tier metadata file for the stream
     pub async fn get_hot_tier(&self, stream: &str) -> Result<StreamHotTier, HotTierError> {
         if !self.check_stream_hot_tier_exists(stream) {
-            return Err(HotTierError::HotTierValidationError(
-                HotTierValidationError::NotFound(stream.to_owned()),
-            ));
+            return Err(HotTierValidationError::NotFound(stream.to_owned()).into());
         }
-        let path = hot_tier_file_path(&self.hot_tier_path, stream)?;
-        let res = self
+        let path = self.hot_tier_file_path(stream)?;
+        let bytes = self
             .filesystem
             .get(&path)
             .and_then(|resp| resp.bytes())
-            .await;
-        match res {
-            Ok(bytes) => {
-                let mut stream_hot_tier: StreamHotTier = serde_json::from_slice(&bytes)?;
-                let oldest_date_time_entry = self.get_oldest_date_time_entry(stream).await?;
-                stream_hot_tier.oldest_date_time_entry = oldest_date_time_entry;
-                Ok(stream_hot_tier)
-            }
-            Err(err) => Err(err.into()),
-        }
+            .await?;
+
+        let mut stream_hot_tier: StreamHotTier = serde_json::from_slice(&bytes)?;
+        stream_hot_tier.oldest_date_time_entry = self.get_oldest_date_time_entry(stream).await?;
+
+        Ok(stream_hot_tier)
     }
 
     pub async fn delete_hot_tier(&self, stream: &str) -> Result<(), HotTierError> {
-        if self.check_stream_hot_tier_exists(stream) {
-            let path = self.hot_tier_path.join(stream);
-            fs::remove_dir_all(path).await?;
-            Ok(())
-        } else {
-            Err(HotTierError::HotTierValidationError(
-                HotTierValidationError::NotFound(stream.to_owned()),
-            ))
+        if !self.check_stream_hot_tier_exists(stream) {
+            return Err(HotTierValidationError::NotFound(stream.to_owned()).into());
         }
+        let path = self.hot_tier_path.join(stream);
+        fs::remove_dir_all(path).await?;
+
+        Ok(())
     }
 
     ///put the hot tier metadata file for the stream
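The `Err(HotTierValidationError::NotFound(...).into())` one-liners above work because the surrounding error enum derives a `From` impl for the variant via thiserror's `#[from]`. A minimal sketch of that conversion (simplified enums, not the crate's full error types; `thiserror` assumed):

```rust
#[derive(Debug, thiserror::Error)]
enum ValidationError {
    #[error("hot tier for stream {0} not found")]
    NotFound(String),
}

#[derive(Debug, thiserror::Error)]
enum HotTierError {
    // #[from] generates `impl From<ValidationError> for HotTierError`,
    // which is what both `.into()` and the `?` operator lean on.
    #[error(transparent)]
    Validation(#[from] ValidationError),
}

fn get_hot_tier(stream: &str) -> Result<(), HotTierError> {
    Err(ValidationError::NotFound(stream.to_owned()).into())
}

fn main() {
    assert!(get_hot_tier("demo").is_err());
}
```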
error!("Failed to run hot tier sync task {err:?}"); return Err(err); @@ -258,7 +265,7 @@ impl HotTierManager { /// delete the files from the hot tier directory if the available date range is outside the hot tier range async fn process_stream(&self, stream: String) -> Result<(), HotTierError> { let stream_hot_tier = self.get_hot_tier(&stream).await?; - let mut parquet_file_size = stream_hot_tier.used_size.parse::().unwrap(); + let mut parquet_file_size = stream_hot_tier.used_size; let object_store = CONFIG.storage().get_object_store(); let mut s3_manifest_file_list = object_store.list_manifest_files(&stream).await?; @@ -350,7 +357,7 @@ impl HotTierManager { let mut file_processed = false; let mut stream_hot_tier = self.get_hot_tier(stream).await?; if !self.is_disk_available(parquet_file.file_size).await? - || stream_hot_tier.available_size.parse::().unwrap() <= parquet_file.file_size + || stream_hot_tier.available_size <= parquet_file.file_size { if !self .cleanup_hot_tier_old_data( @@ -363,7 +370,7 @@ impl HotTierManager { { return Ok(file_processed); } - *parquet_file_size = stream_hot_tier.used_size.parse::().unwrap(); + *parquet_file_size = stream_hot_tier.used_size; } let parquet_file_path = RelativePathBuf::from(parquet_file.file_path.clone()); fs::create_dir_all(parquet_path.parent().unwrap()).await?; @@ -375,27 +382,24 @@ impl HotTierManager { .await?; file.write_all(&parquet_data).await?; *parquet_file_size += parquet_file.file_size; - stream_hot_tier.used_size = parquet_file_size.to_string(); + stream_hot_tier.used_size = *parquet_file_size; - stream_hot_tier.available_size = (stream_hot_tier.available_size.parse::().unwrap() - - parquet_file.file_size) - .to_string(); + stream_hot_tier.available_size -= parquet_file.file_size; self.put_hot_tier(stream, &mut stream_hot_tier).await?; file_processed = true; - let mut hot_tier_manifest = self - .get_stream_hot_tier_manifest_for_date(stream, &date) - .await?; + let path = self.get_stream_path_for_date(stream, &date); + let mut hot_tier_manifest = HotTierManager::get_hot_tier_manifest_from_path(path).await?; hot_tier_manifest.files.push(parquet_file.clone()); hot_tier_manifest .files .sort_by_key(|file| file.file_path.clone()); // write the manifest file to the hot tier directory let manifest_path = self - .hot_tier_path - .join(stream) - .join(format!("date={}/hottier.manifest.json", date)); + .get_stream_path_for_date(stream, &date) + .join("hottier.manifest.json"); fs::create_dir_all(manifest_path.parent().unwrap()).await?; fs::write(manifest_path, serde_json::to_vec(&hot_tier_manifest)?).await?; + Ok(file_processed) } @@ -403,54 +407,66 @@ impl HotTierManager { pub async fn fetch_hot_tier_dates(&self, stream: &str) -> Result, HotTierError> { let mut date_list = Vec::new(); let path = self.hot_tier_path.join(stream); - if path.exists() { - let directories = ReadDirStream::new(fs::read_dir(&path).await?); - let dates: Vec = directories.try_collect().await?; - for date in dates { - if !date.path().is_dir() { - continue; - } - let date = date.file_name().into_string().unwrap(); - date_list.push( - NaiveDate::parse_from_str(date.trim_start_matches("date="), "%Y-%m-%d") - .unwrap(), - ); + if !path.exists() { + return Ok(date_list); + } + + let directories = fs::read_dir(&path).await?; + let mut dates = ReadDirStream::new(directories); + while let Some(date) = dates.next().await { + let date = date?; + if !date.path().is_dir() { + continue; } + let date = NaiveDate::parse_from_str( + date.file_name() + .to_string_lossy() + 
@@ -403,54 +407,66 @@ impl HotTierManager {
     pub async fn fetch_hot_tier_dates(&self, stream: &str) -> Result<Vec<NaiveDate>, HotTierError> {
         let mut date_list = Vec::new();
         let path = self.hot_tier_path.join(stream);
-        if path.exists() {
-            let directories = ReadDirStream::new(fs::read_dir(&path).await?);
-            let dates: Vec<DirEntry> = directories.try_collect().await?;
-            for date in dates {
-                if !date.path().is_dir() {
-                    continue;
-                }
-                let date = date.file_name().into_string().unwrap();
-                date_list.push(
-                    NaiveDate::parse_from_str(date.trim_start_matches("date="), "%Y-%m-%d")
-                        .unwrap(),
-                );
+        if !path.exists() {
+            return Ok(date_list);
+        }
+
+        let directories = fs::read_dir(&path).await?;
+        let mut dates = ReadDirStream::new(directories);
+        while let Some(date) = dates.next().await {
+            let date = date?;
+            if !date.path().is_dir() {
+                continue;
             }
+            let date = NaiveDate::parse_from_str(
+                date.file_name()
+                    .to_string_lossy()
+                    .trim_start_matches("date="),
+                "%Y-%m-%d",
+            )
+            .unwrap();
+            date_list.push(date);
         }
         date_list.sort();
+
         Ok(date_list)
     }
 
-    ///get hot tier manifest for the stream and date
-    pub async fn get_stream_hot_tier_manifest_for_date(
-        &self,
-        stream: &str,
-        date: &NaiveDate,
-    ) -> Result<Manifest, HotTierError> {
+    ///get hot tier manifest on path
+    pub async fn get_hot_tier_manifest_from_path(path: PathBuf) -> Result<Manifest, HotTierError> {
+        if !path.exists() {
+            return Ok(Manifest::default());
+        }
+
+        // List the directories and prepare the hot tier manifest
+        let mut date_dirs = fs::read_dir(&path).await?;
         let mut hot_tier_manifest = Manifest::default();
-        let path = self
-            .hot_tier_path
-            .join(stream)
-            .join(format!("date={}", date));
-        if path.exists() {
-            let date_dirs = ReadDirStream::new(fs::read_dir(&path).await?);
-            let manifest_files: Vec<DirEntry> = date_dirs.try_collect().await?;
-            for manifest in manifest_files {
-                if !manifest
-                    .file_name()
-                    .to_string_lossy()
-                    .ends_with(".manifest.json")
-                {
-                    continue;
-                }
-                let file = fs::read(manifest.path()).await?;
-                let manifest: Manifest = serde_json::from_slice(&file)?;
-                hot_tier_manifest.files.extend(manifest.files);
+
+        // Avoid unnecessary checks and keep only valid manifest files
+        while let Some(manifest) = date_dirs.next_entry().await? {
+            if !manifest
+                .file_name()
+                .to_string_lossy()
+                .ends_with(".manifest.json")
+            {
+                continue;
             }
+            // Deserialize each manifest file and extend the hot tier manifest with its files
+            let file = fs::read(manifest.path()).await?;
+            let manifest: Manifest = serde_json::from_slice(&file)?;
+            hot_tier_manifest.files.extend(manifest.files);
         }
+
         Ok(hot_tier_manifest)
     }
 
+    /// get hot tier path for the stream and date
+    pub fn get_stream_path_for_date(&self, stream: &str, date: &NaiveDate) -> PathBuf {
+        self.hot_tier_path
+            .join(stream)
+            .join(format!("date={}", date))
+    }
+
     /// Returns the list of manifest files present in hot tier directory for the stream
     pub async fn get_hot_tier_manifest_files(
         &self,
@@ -488,17 +504,29 @@ impl HotTierManager {
         &self,
         stream: &str,
     ) -> Result<Vec<File>, HotTierError> {
-        let mut hot_tier_parquet_files: Vec<File> = Vec::new();
+        // Fetch list of dates for the given stream
         let date_list = self.fetch_hot_tier_dates(stream).await?;
+
+        // Create an unordered iter of futures to async collect files
+        let mut tasks = FuturesUnordered::new();
+
+        // For each date, fetch the manifest and extract parquet files
         for date in date_list {
-            let manifest = self
-                .get_stream_hot_tier_manifest_for_date(stream, &date)
-                .await?;
+            let path = self.get_stream_path_for_date(stream, &date);
+            tasks.push(async move {
+                HotTierManager::get_hot_tier_manifest_from_path(path)
+                    .await
+                    .map(|manifest| manifest.files.clone())
+                    .unwrap_or_default() // If fetching manifest fails, return an empty vector
+            });
+        }
 
-            for parquet_file in manifest.files {
-                hot_tier_parquet_files.push(parquet_file.clone());
-            }
+        // Collect parquet files for all dates
+        let mut hot_tier_parquet_files: Vec<File> = vec![];
+        while let Some(files) = tasks.next().await {
+            hot_tier_parquet_files.extend(files);
         }
+
         Ok(hot_tier_parquet_files)
     }
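`fetch_hot_tier_dates` above walks the stream directory and parses hive-style `date=YYYY-MM-DD` partition names; the production code can afford `.unwrap()` because it only reads back directories it wrote itself. The parsing step in isolation, with `chrono` and lenient error handling for the sketch:

```rust
use chrono::NaiveDate;

fn parse_partition_dir(name: &str) -> Option<NaiveDate> {
    // Strip the hive-style "date=" prefix, then parse the remainder.
    NaiveDate::parse_from_str(name.trim_start_matches("date="), "%Y-%m-%d").ok()
}

fn main() {
    let mut dates: Vec<NaiveDate> = ["date=2024-12-02", "date=2024-12-01", "not-a-date"]
        .iter()
        .filter_map(|name| parse_partition_dir(name))
        .collect();
    // Directory iteration order is arbitrary, so sort before use,
    // exactly as fetch_hot_tier_dates does.
    dates.sort();
    assert_eq!(dates.len(), 2);
    assert_eq!(dates[0].to_string(), "2024-12-01");
}
```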
@@ -527,11 +555,7 @@ impl HotTierManager {
         let mut delete_successful = false;
         let dates = self.fetch_hot_tier_dates(stream).await?;
         'loop_dates: for date in dates {
-            let date_str = date.to_string();
-            let path = &self
-                .hot_tier_path
-                .join(stream)
-                .join(format!("date={}", date_str));
+            let path = self.get_stream_path_for_date(stream, &date);
             if !path.exists() {
                 continue;
             }
@@ -553,12 +577,7 @@ impl HotTierManager {
                 'loop_files: while let Some(file_to_delete) = manifest.files.pop() {
                     let file_size = file_to_delete.file_size;
-                    let path_to_delete = CONFIG
-                        .options
-                        .hot_tier_storage_path
-                        .as_ref()
-                        .unwrap()
-                        .join(&file_to_delete.file_path);
+                    let path_to_delete = self.hot_tier_path.join(&file_to_delete.file_path);
 
                     if path_to_delete.exists() {
                         if let (Some(download_date_time), Some(delete_date_time)) = (
@@ -574,20 +593,17 @@ impl HotTierManager {
                             fs::write(manifest_file.path(), serde_json::to_vec(&manifest)?).await?;
 
                             fs::remove_dir_all(path_to_delete.parent().unwrap()).await?;
-                            delete_empty_directory_hot_tier(path_to_delete.parent().unwrap()).await?;
-
-                            stream_hot_tier.used_size =
-                                (stream_hot_tier.used_size.parse::<u64>().unwrap() - file_size)
-                                    .to_string();
-                            stream_hot_tier.available_size =
-                                (stream_hot_tier.available_size.parse::<u64>().unwrap() + file_size)
-                                    .to_string();
+                            delete_empty_directory_hot_tier(
+                                path_to_delete.parent().unwrap().to_path_buf(),
+                            )
+                            .await?;
+
+                            stream_hot_tier.used_size -= file_size;
+                            stream_hot_tier.available_size += file_size;
                             self.put_hot_tier(stream, stream_hot_tier).await?;
                             delete_successful = true;
 
-                            if stream_hot_tier.available_size.parse::<u64>().unwrap()
-                                <= parquet_file_size
-                            {
+                            if stream_hot_tier.available_size <= parquet_file_size {
                                 continue 'loop_files;
                             } else {
                                 break 'loop_dates;
@@ -609,7 +625,7 @@ impl HotTierManager {
             total_space,
             available_space,
             used_space,
-        }) = get_disk_usage()
+        }) = self.get_disk_usage()
         {
             if available_space < size_to_download {
                 return Ok(false);
@@ -635,11 +651,7 @@ impl HotTierManager {
         }
 
         for date in date_list {
-            let path = self
-                .hot_tier_path
-                .join(stream)
-                .join(format!("date={}", date));
-
+            let path = self.get_stream_path_for_date(stream, &date);
             let hours_dir = ReadDirStream::new(fs::read_dir(&path).await?);
             let mut hours: Vec<DirEntry> = hours_dir.try_collect().await?;
             hours.retain(|entry| {
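`cleanup_hot_tier_old_data` above steers its deletions with the labeled loops `'loop_dates` and `'loop_files`: the inner loop keeps popping files for the current date while more space is needed, and bails out of the whole date scan once enough has been reclaimed. The control flow in miniature, over toy data:

```rust
fn main() {
    let dates = ["2024-12-01", "2024-12-02"];
    let mut deleted = Vec::new();

    'loop_dates: for date in dates {
        let mut files = vec![format!("{date}/a.parquet"), format!("{date}/b.parquet")];
        'loop_files: while let Some(file) = files.pop() {
            deleted.push(file);
            if deleted.len() < 3 {
                // Still short on space: keep deleting files for this date.
                continue 'loop_files;
            } else {
                // Enough space reclaimed: abandon the remaining dates too.
                break 'loop_dates;
            }
        }
    }

    assert_eq!(deleted.len(), 3);
}
```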
@@ -678,14 +690,12 @@ impl HotTierManager {
     }
 
     pub async fn put_internal_stream_hot_tier(&self) -> Result<(), HotTierError> {
-        if CONFIG.options.hot_tier_storage_path.is_some()
-            && !self.check_stream_hot_tier_exists(INTERNAL_STREAM_NAME)
-        {
+        if !self.check_stream_hot_tier_exists(INTERNAL_STREAM_NAME) {
             let mut stream_hot_tier = StreamHotTier {
                 version: Some(CURRENT_HOT_TIER_VERSION.to_string()),
-                size: INTERNAL_STREAM_HOT_TIER_SIZE_BYTES.to_string(),
-                used_size: "0".to_string(),
-                available_size: INTERNAL_STREAM_HOT_TIER_SIZE_BYTES.to_string(),
+                size: INTERNAL_STREAM_HOT_TIER_SIZE_BYTES,
+                used_size: 0,
+                available_size: INTERNAL_STREAM_HOT_TIER_SIZE_BYTES,
                 oldest_date_time_entry: None,
             };
             self.put_hot_tier(INTERNAL_STREAM_NAME, &mut stream_hot_tier)
@@ -693,15 +703,34 @@ impl HotTierManager {
         }
         Ok(())
     }
-}
 
-/// get the hot tier file path for the stream
-pub fn hot_tier_file_path(
-    root: impl AsRef<Path>,
-    stream: &str,
-) -> Result<object_store::path::Path, object_store::path::Error> {
-    let path = root.as_ref().join(stream).join(STREAM_HOT_TIER_FILENAME);
-    object_store::path::Path::from_absolute_path(path)
+    /// Get the disk usage for the hot tier storage path. If we have three disk partitions
+    /// mounted as follows:
+    /// 1. /
+    /// 2. /home/parseable
+    /// 3. /home/example/ignore
+    ///
+    /// And parseable is running with `P_HOT_TIER_DIR` pointing to a directory in
+    /// `/home/parseable`, we should return the usage stats of the disk mounted there.
+    fn get_disk_usage(&self) -> Option<DiskUtil> {
+        let mut disks = Disks::new_with_refreshed_list();
+        // Order the disk partitions by decreasing length of mount path
+        disks.sort_by_key(|disk| disk.mount_point().to_str().unwrap().len());
+        disks.reverse();
+
+        for disk in disks.iter() {
+            // Returns disk utilisation of first matching mount point
+            if self.hot_tier_path.starts_with(disk.mount_point()) {
+                return Some(DiskUtil {
+                    total_space: disk.total_space(),
+                    available_space: disk.available_space(),
+                    used_space: disk.total_space() - disk.available_space(),
+                });
+            }
+        }
+
+        None
+    }
 }
 
 struct DiskUtil {
@@ -710,65 +739,32 @@ struct DiskUtil {
     used_space: u64,
 }
 
-/// Get the disk usage for the hot tier storage path. If we have a three disk paritions
-/// mounted as follows:
-/// 1. /
-/// 2. /home/parseable
-/// 3. /home/example/ignore
-///
-/// And parseable is running with `P_HOT_TIER_DIR` pointing to a directory in
-/// `/home/parseable`, we should return the usage stats of the disk mounted there.
-fn get_disk_usage() -> Option<DiskUtil> {
-    let path = CONFIG.options.hot_tier_storage_path.as_ref()?;
-    let mut disks = Disks::new_with_refreshed_list();
-    // Order the disk partitions by decreasing length of mount path
-    disks.sort_by_key(|disk| disk.mount_point().to_str().unwrap().len());
-    disks.reverse();
-
-    for disk in disks.iter() {
-        // Returns disk utilisation of first matching mount point
-        if path.starts_with(disk.mount_point()) {
-            return Some(DiskUtil {
-                total_space: disk.total_space(),
-                available_space: disk.available_space(),
-                used_space: disk.total_space() - disk.available_space(),
-            });
-        }
-    }
-
-    None
-}
-
-async fn delete_empty_directory_hot_tier(path: &Path) -> io::Result<()> {
-    async fn delete_helper(path: &Path) -> io::Result<()> {
-        if path.is_dir() {
-            let mut read_dir = fs::read_dir(path).await?;
-            let mut subdirs = vec![];
-
-            while let Some(entry) = read_dir.next_entry().await? {
-                let entry_path = entry.path();
-                if entry_path.is_dir() {
-                    subdirs.push(entry_path);
-                }
-            }
-            let mut tasks = vec![];
-            for subdir in &subdirs {
-                tasks.push(delete_empty_directory_hot_tier(subdir));
-            }
-            futures::stream::iter(tasks)
-                .buffer_unordered(10)
-                .try_collect::<Vec<()>>()
-                .await?;
-
-            // Re-check the directory after deleting its subdirectories
-            let mut read_dir = fs::read_dir(path).await?;
-            if read_dir.next_entry().await?.is_none() {
-                fs::remove_dir(path).await?;
-            }
-        }
-        Ok(())
+async fn delete_empty_directory_hot_tier(path: PathBuf) -> io::Result<()> {
+    if !path.is_dir() {
+        return Ok(());
     }
-    delete_helper(path).await
+    let mut read_dir = fs::read_dir(&path).await?;
+
+    let mut tasks = vec![];
+    while let Some(entry) = read_dir.next_entry().await? {
+        let entry_path = entry.path();
+        if entry_path.is_dir() {
+            tasks.push(delete_empty_directory_hot_tier(entry_path));
+        }
+    }
+
+    futures::stream::iter(tasks)
+        .buffer_unordered(10)
+        .try_collect::<Vec<()>>()
+        .await?;
+
+    // Re-check the directory after deleting its subdirectories
+    let mut read_dir = fs::read_dir(&path).await?;
+    if read_dir.next_entry().await?.is_none() {
+        fs::remove_dir(&path).await?;
+    }
+
+    Ok(())
 }
 
 #[derive(Debug, thiserror::Error)]
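One caveat on `delete_empty_directory_hot_tier` as written: a plain `async fn` that awaits itself, even indirectly through the `tasks` vec, has an infinitely-sized future and is rejected by rustc (error E0733) unless the recursive call is boxed. If the snippet fails to compile in isolation, the usual fix is to type-erase the recursion; a sketch using `futures::future::BoxFuture`, simplified to sequential recursion (an assumption for illustration, not necessarily how this crate resolves it):

```rust
use std::{io, path::PathBuf};

use futures::future::{BoxFuture, FutureExt};

// Boxing the returned future breaks the recursive type cycle behind E0733.
fn delete_empty_dirs(path: PathBuf) -> BoxFuture<'static, io::Result<()>> {
    async move {
        if !path.is_dir() {
            return Ok(());
        }
        // Recurse into subdirectories first...
        let mut read_dir = tokio::fs::read_dir(&path).await?;
        while let Some(entry) = read_dir.next_entry().await? {
            let entry_path = entry.path();
            if entry_path.is_dir() {
                // ...each recursive call hands back an already-boxed future.
                delete_empty_dirs(entry_path).await?;
            }
        }
        // ...then remove this directory if nothing is left inside it.
        let mut read_dir = tokio::fs::read_dir(&path).await?;
        if read_dir.next_entry().await?.is_none() {
            tokio::fs::remove_dir(&path).await?;
        }
        Ok(())
    }
    .boxed()
}

#[tokio::main]
async fn main() -> io::Result<()> {
    // A no-op when the path does not exist or is not a directory.
    delete_empty_dirs(std::env::temp_dir().join("some-empty-tree")).await
}
```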
diff --git a/src/metadata.rs b/src/metadata.rs
index 182bc610a..87af45d65 100644
--- a/src/metadata.rs
+++ b/src/metadata.rs
@@ -74,7 +74,7 @@ pub struct LogStreamMetadata {
     pub time_partition_limit: Option<NonZeroU32>,
     pub custom_partition: Option<String>,
     pub static_schema_flag: bool,
-    pub hot_tier_enabled: Option<bool>,
+    pub hot_tier_enabled: bool,
     pub stream_type: Option<String>,
     pub log_source: LogSource,
 }
@@ -290,7 +290,7 @@ impl StreamInfo {
         let stream = map
             .get_mut(stream_name)
             .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string()))?;
-        stream.hot_tier_enabled = Some(enable);
+        stream.hot_tier_enabled = enable;
         Ok(())
     }
diff --git a/src/migration/mod.rs b/src/migration/mod.rs
index 046a9dfc0..cfd83483d 100644
--- a/src/migration/mod.rs
+++ b/src/migration/mod.rs
@@ -24,9 +24,8 @@ mod stream_metadata_migration;
 use std::{fs::OpenOptions, sync::Arc};
 
 use crate::{
-    hottier::{HotTierManager, CURRENT_HOT_TIER_VERSION},
     metadata::load_stream_metadata_on_server_start,
-    option::{validation::human_size_to_bytes, Config, Mode, CONFIG},
+    option::{Config, Mode, CONFIG},
     storage::{
         object_storage::{parseable_json_path, schema_path, stream_json_path},
         ObjectStorage, ObjectStorageError, PARSEABLE_METADATA_FILE_NAME, PARSEABLE_ROOT_DIRECTORY,
@@ -137,40 +136,11 @@ pub async fn run_migration(config: &Config) -> anyhow::Result<()> {
     let streams = storage.list_streams().await?;
     for stream in streams {
         migration_stream(&stream.name, &*storage).await?;
-        if CONFIG.options.hot_tier_storage_path.is_some() {
-            migration_hot_tier(&stream.name).await?;
-        }
     }
 
     Ok(())
 }
 
-/// run the migration for hot tier
-async fn migration_hot_tier(stream: &str) -> anyhow::Result<()> {
-    if let Some(hot_tier_manager) = HotTierManager::global() {
-        if hot_tier_manager.check_stream_hot_tier_exists(stream) {
-            let mut stream_hot_tier = hot_tier_manager.get_hot_tier(stream).await?;
-            if stream_hot_tier.version.is_none() {
-                stream_hot_tier.version = Some(CURRENT_HOT_TIER_VERSION.to_string());
-                stream_hot_tier.size = human_size_to_bytes(&stream_hot_tier.size)
-                    .unwrap()
-                    .to_string();
-                stream_hot_tier.available_size =
-                    human_size_to_bytes(&stream_hot_tier.available_size)
-                        .unwrap()
-                        .to_string();
-                stream_hot_tier.used_size = human_size_to_bytes(&stream_hot_tier.used_size)
-                    .unwrap()
-                    .to_string();
-                hot_tier_manager
-                    .put_hot_tier(stream, &mut stream_hot_tier)
-                    .await?;
-            }
-        }
-    }
-    Ok(())
-}
-
 async fn migration_stream(stream: &str, storage: &dyn ObjectStorage) -> anyhow::Result<()> {
     let mut arrow_schema: Schema = Schema::empty();
diff --git a/src/option.rs b/src/option.rs
index e6c2d9200..86e198713 100644
--- a/src/option.rs
+++ b/src/option.rs
@@ -192,13 +192,10 @@ pub mod validation {
         env,
         io,
         net::ToSocketAddrs,
         path::{Path, PathBuf},
-        str::FromStr,
     };
 
     use path_clean::PathClean;
 
-    use human_size::{multiples, SpecificSize};
-
     #[cfg(any(
         all(target_os = "linux", target_arch = "x86_64"),
         all(target_os = "macos", target_arch = "aarch64")
@@ -285,45 +282,6 @@ pub mod validation {
         }
     }
 
-    pub fn human_size_to_bytes(s: &str) -> Result<u64, String> {
-        fn parse_and_map<T: human_size::Multiple>(
-            s: &str,
-        ) -> Result<u64, human_size::ParsingError> {
-            SpecificSize::<T>::from_str(s).map(|x| x.to_bytes())
-        }
-
-        let size = parse_and_map::<multiples::Mebibyte>(s)
-            .or(parse_and_map::<multiples::Megabyte>(s))
-            .or(parse_and_map::<multiples::Gibibyte>(s))
-            .or(parse_and_map::<multiples::Gigabyte>(s))
-            .or(parse_and_map::<multiples::Tebibyte>(s))
-            .or(parse_and_map::<multiples::Terabyte>(s))
-            .map_err(|_| "Could not parse given size".to_string())?;
-        Ok(size)
-    }
-
-    pub fn bytes_to_human_size(bytes: u64) -> String {
-        const KIB: u64 = 1024;
-        const MIB: u64 = KIB * 1024;
-        const GIB: u64 = MIB * 1024;
-        const TIB: u64 = GIB * 1024;
-        const PIB: u64 = TIB * 1024;
-
-        if bytes < KIB {
-            format!("{} B", bytes)
-        } else if bytes < MIB {
-            format!("{:.2} KB", bytes as f64 / KIB as f64)
-        } else if bytes < GIB {
-            format!("{:.2} MiB", bytes as f64 / MIB as f64)
-        } else if bytes < TIB {
-            format!("{:.2} GiB", bytes as f64 / GIB as f64)
-        } else if bytes < PIB {
-            format!("{:.2} TiB", bytes as f64 / TIB as f64)
-        } else {
-            format!("{:.2} PiB", bytes as f64 / PIB as f64)
-        }
-    }
-
     pub fn validate_disk_usage(max_disk_usage: &str) -> Result<f64, String> {
         if let Ok(max_disk_usage) = max_disk_usage.parse::<f64>() {
             if (0.0..=100.0).contains(&max_disk_usage) {
diff --git a/src/storage/mod.rs b/src/storage/mod.rs
index f86b55757..7098b85d3 100644
--- a/src/storage/mod.rs
+++ b/src/storage/mod.rs
@@ -113,8 +113,8 @@ pub struct ObjectStoreFormat {
         skip_serializing_if = "std::ops::Not::not"
     )]
     pub static_schema_flag: bool,
-    #[serde(skip_serializing_if = "Option::is_none")]
-    pub hot_tier_enabled: Option<bool>,
+    #[serde(default)]
+    pub hot_tier_enabled: bool,
     pub stream_type: Option<String>,
     #[serde(default)]
     pub log_source: LogSource,
@@ -217,7 +217,7 @@ impl Default for ObjectStoreFormat {
             time_partition_limit: None,
             custom_partition: None,
             static_schema_flag: false,
-            hot_tier_enabled: None,
+            hot_tier_enabled: false,
             log_source: LogSource::default(),
         }
     }
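Dropping `Option<bool>` for a plain `bool` stays compatible with metadata already on disk: old writers skipped the key entirely when the value was `None`, and `#[serde(default)]` now fills the missing key in as `false`. A quick check of that behavior (`serde` with the derive feature and `serde_json` assumed):

```rust
use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct ObjectStoreFormat {
    #[serde(default)]
    hot_tier_enabled: bool,
}

fn main() {
    // Metadata written before this change has no such key at all
    // (the field was skipped when `None`); it now reads back as `false`.
    let old: ObjectStoreFormat = serde_json::from_str("{}").unwrap();
    assert!(!old.hot_tier_enabled);

    let new: ObjectStoreFormat =
        serde_json::from_str(r#"{"hot_tier_enabled":true}"#).unwrap();
    assert!(new.hot_tier_enabled);
}
```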
diff --git a/src/utils/human_size.rs b/src/utils/human_size.rs
new file mode 100644
index 000000000..3cf5f40c8
--- /dev/null
+++ b/src/utils/human_size.rs
@@ -0,0 +1,113 @@
+use std::str::FromStr;
+
+use human_size::{Any, SpecificSize};
+use serde::{de, Deserialize, Deserializer, Serializer};
+
+#[derive(Debug, thiserror::Error)]
+enum ParsingError {
+    #[error("Expected a number like '123' or '123 Bytes', error: {0}")]
+    Int(#[from] std::num::ParseIntError),
+    #[error("Could not parse given string as human size, error: {0}")]
+    HumanSize(#[from] human_size::ParsingError),
+}
+
+// Function to convert human-readable size to bytes
+// NOTE: consider number values as byte count, e.g. "1234" is 1234 bytes.
+fn human_size_to_bytes(s: &str) -> Result<u64, ParsingError> {
+    let s = s.trim();
+    if let Some(s) = s.strip_suffix("Bytes") {
+        let size: u64 = s.trim().parse()?;
+        return Ok(size);
+    } else if let Ok(size) = s.parse() {
+        return Ok(size);
+    }
+
+    fn parse_and_map<T: human_size::Multiple>(s: &str) -> Result<u64, human_size::ParsingError> {
+        SpecificSize::<T>::from_str(s).map(|x| x.to_bytes())
+    }
+    let size = parse_and_map::<Any>(s)?;
+
+    Ok(size)
+}
+
+// Function to convert bytes to human-readable size
+pub fn bytes_to_human_size(bytes: u64) -> String {
+    const KIB: u64 = 1024;
+    const MIB: u64 = KIB * 1024;
+    const GIB: u64 = MIB * 1024;
+    const TIB: u64 = GIB * 1024;
+    const PIB: u64 = TIB * 1024;
+
+    if bytes < KIB {
+        format!("{} B", bytes)
+    } else if bytes < MIB {
+        format!("{:.2} KiB", bytes as f64 / KIB as f64)
+    } else if bytes < GIB {
+        format!("{:.2} MiB", bytes as f64 / MIB as f64)
+    } else if bytes < TIB {
+        format!("{:.2} GiB", bytes as f64 / GIB as f64)
+    } else if bytes < PIB {
+        format!("{:.2} TiB", bytes as f64 / TIB as f64)
+    } else {
+        format!("{:.2} PiB", bytes as f64 / PIB as f64)
+    }
+}
+
+pub fn serialize<S>(bytes: &u64, serializer: S) -> Result<S::Ok, S::Error>
+where
+    S: Serializer,
+{
+    // let human_readable = bytes_to_human_size(*bytes);
+    // NOTE: frontend expects the size in bytes
+    let human_readable = format!("{bytes} Bytes");
+    serializer.serialize_str(&human_readable)
+}
+
+pub fn deserialize<'de, D>(deserializer: D) -> Result<u64, D::Error>
+where
+    D: Deserializer<'de>,
+{
+    let s = String::deserialize(deserializer)?;
+    human_size_to_bytes(&s).map_err(de::Error::custom)
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn parse_numeric_input_without_unit() {
+        assert_eq!(human_size_to_bytes("1234").unwrap(), 1234);
+    }
+
+    #[test]
+    fn parse_bytes_string_to_bytes() {
+        assert_eq!(human_size_to_bytes("1234 Bytes").unwrap(), 1234);
+    }
+
+    #[test]
+    fn handle_empty_string_input() {
+        assert!(matches!(
+            human_size_to_bytes(""),
+            Err(ParsingError::HumanSize(_))
+        ));
+    }
+
+    #[test]
+    fn handle_byte_string_input_without_value() {
+        assert!(matches!(
+            human_size_to_bytes("Bytes"),
+            Err(ParsingError::Int(_))
+        ));
+    }
+
+    #[test]
+    fn convert_mebibyte_string_to_bytes() {
+        assert_eq!(human_size_to_bytes("1 MiB").unwrap(), 1048576);
+    }
+
+    #[test]
+    fn parse_gigabyte_string_input() {
+        assert_eq!(human_size_to_bytes("1 GB").unwrap(), 1_000_000_000);
+    }
+}
diff --git a/src/utils/mod.rs b/src/utils/mod.rs
index dd75504b7..87b528b6d 100644
--- a/src/utils/mod.rs
+++ b/src/utils/mod.rs
@@ -19,10 +19,12 @@
 pub mod actix;
 pub mod arrow;
 pub mod header_parsing;
+pub mod human_size;
 pub mod json;
 pub mod time;
 pub mod uid;
 pub mod update;
+
 use crate::handlers::http::rbac::RBACError;
 use crate::option::CONFIG;
 use crate::rbac::role::{Action, Permission};
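The `#[serde(with = "crate::utils::human_size")]` attribute on `StreamHotTier` routes those `u64` fields through this module's `serialize`/`deserialize` functions, so the HTTP API keeps exchanging size strings while the rest of the code works with plain byte counts. A round-trip sketch against a simplified inline module of the same shape (not parseable's actual module, which also accepts units like "1 GiB"):

```rust
use serde::{de, Deserialize, Deserializer, Serialize, Serializer};

mod human_size {
    use super::*;

    pub fn serialize<S: Serializer>(bytes: &u64, serializer: S) -> Result<S::Ok, S::Error> {
        // Emit "<n> Bytes", the string form the frontend expects.
        serializer.serialize_str(&format!("{bytes} Bytes"))
    }

    pub fn deserialize<'de, D: Deserializer<'de>>(deserializer: D) -> Result<u64, D::Error> {
        // Accept "1234" or "1234 Bytes" for this sketch.
        let s = String::deserialize(deserializer)?;
        s.trim()
            .trim_end_matches("Bytes")
            .trim()
            .parse()
            .map_err(de::Error::custom)
    }
}

#[derive(Serialize, Deserialize)]
struct HotTier {
    #[serde(with = "human_size")]
    size: u64,
}

fn main() {
    let ht: HotTier = serde_json::from_str(r#"{"size":"1048576 Bytes"}"#).unwrap();
    assert_eq!(ht.size, 1_048_576);
    let json = serde_json::to_string(&ht).unwrap();
    assert_eq!(json, r#"{"size":"1048576 Bytes"}"#);
}
```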
diff --git a/src/validator.rs b/src/validator.rs
index bcbaefea6..3f1d07a08 100644
--- a/src/validator.rs
+++ b/src/validator.rs
@@ -23,8 +23,8 @@ use crate::alerts::rule::base::{NumericRule, StringRule};
 use crate::alerts::rule::{ColumnRule, ConsecutiveNumericRule, ConsecutiveStringRule};
 use crate::alerts::{Alerts, Rule};
 use crate::hottier::MIN_STREAM_HOT_TIER_SIZE_BYTES;
-use crate::option::validation::bytes_to_human_size;
 use crate::storage::StreamType;
+use crate::utils::human_size::bytes_to_human_size;
 
 // Add more sql keywords here in lower case
 const DENIED_NAMES: &[&str] = &[
@@ -135,16 +135,16 @@ pub fn user_name(username: &str) -> Result<(), UsernameValidationError> {
 }
 
 pub fn hot_tier(size: &str) -> Result<(), HotTierValidationError> {
-    if let Ok(size) = size.parse::<u64>() {
-        if size < MIN_STREAM_HOT_TIER_SIZE_BYTES {
-            return Err(HotTierValidationError::Size(bytes_to_human_size(
-                MIN_STREAM_HOT_TIER_SIZE_BYTES,
-            )));
-        }
-        Ok(())
-    } else {
-        Err(HotTierValidationError::InvalidFormat)
+    let Ok(size) = size.parse::<u64>() else {
+        return Err(HotTierValidationError::InvalidFormat);
+    };
+    if size < MIN_STREAM_HOT_TIER_SIZE_BYTES {
+        return Err(HotTierValidationError::Size(bytes_to_human_size(
+            MIN_STREAM_HOT_TIER_SIZE_BYTES,
+        )));
     }
+
+    Ok(())
 }
 
 pub mod error {