Skip to content

Commit 7305873

Browse files
author
Devdutt Shenoi
committed
refactor: human_size parsing
1 parent 94c2189 commit 7305873

File tree

7 files changed

+121
-122
lines changed

7 files changed

+121
-122
lines changed

src/handlers/http/logstream.rs

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ use bytes::Bytes;
5050
use chrono::Utc;
5151
use http::{HeaderName, HeaderValue};
5252
use itertools::Itertools;
53-
use serde_json::Value;
53+
use serde_json::{json, Value};
5454
use std::collections::HashMap;
5555
use std::fs;
5656
use std::num::NonZeroU32;
@@ -630,10 +630,10 @@ pub async fn put_stream_hot_tier(
630630
STREAM_INFO.set_hot_tier(&stream_name, true)?;
631631
if let Some(hot_tier_manager) = HotTierManager::global() {
632632
let existing_hot_tier_used_size = hot_tier_manager
633-
.validate_hot_tier_size(&stream_name, &hottier.size)
633+
.validate_hot_tier_size(&stream_name, hottier.size)
634634
.await?;
635-
hottier.used_size = existing_hot_tier_used_size.to_string();
636-
hottier.available_size = hottier.size.to_string();
635+
hottier.used_size = existing_hot_tier_used_size;
636+
hottier.available_size = hottier.size;
637637
hottier.version = Some(CURRENT_HOT_TIER_VERSION.to_string());
638638
hot_tier_manager
639639
.put_hot_tier(&stream_name, &mut hottier)
@@ -674,11 +674,23 @@ pub async fn get_stream_hot_tier(req: HttpRequest) -> Result<impl Responder, Str
674674
}
675675

676676
if let Some(hot_tier_manager) = HotTierManager::global() {
677-
let mut hot_tier = hot_tier_manager.get_hot_tier(&stream_name).await?;
678-
hot_tier.size = format!("{} {}", hot_tier.size, "Bytes");
679-
hot_tier.used_size = format!("{} Bytes", hot_tier.used_size);
680-
hot_tier.available_size = format!("{} Bytes", hot_tier.available_size);
681-
Ok((web::Json(hot_tier), StatusCode::OK))
677+
let StreamHotTier {
678+
version,
679+
size,
680+
used_size,
681+
available_size,
682+
oldest_date_time_entry,
683+
} = hot_tier_manager.get_hot_tier(&stream_name).await?;
684+
let mut json = json!({
685+
"version": version,
686+
"size": format!("{size} Bytes"),
687+
"used_size": format!("{used_size} Bytes"),
688+
"available_size": format!("{available_size} Bytes"),
689+
});
690+
if let Some(entry) = oldest_date_time_entry {
691+
json["oldest_date_time_entry"] = serde_json::Value::String(entry);
692+
}
693+
Ok((web::Json(json), StatusCode::OK))
682694
} else {
683695
Err(StreamError::Custom {
684696
msg: format!("hot tier not initialised for stream {}", stream_name),

src/hottier.rs

Lines changed: 31 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,9 @@ use crate::{
2727
catalog::manifest::{File, Manifest},
2828
handlers::http::cluster::INTERNAL_STREAM_NAME,
2929
metadata::{error::stream_info::MetadataError, STREAM_INFO},
30-
option::{validation::bytes_to_human_size, CONFIG},
30+
option::CONFIG,
3131
storage::{ObjectStorage, ObjectStorageError},
32-
utils::extract_datetime,
32+
utils::{extract_datetime, human_size::bytes_to_human_size},
3333
validator::error::HotTierValidationError,
3434
};
3535
use chrono::NaiveDate;
@@ -56,10 +56,12 @@ pub const CURRENT_HOT_TIER_VERSION: &str = "v2";
5656
#[derive(Debug, serde::Deserialize, serde::Serialize)]
5757
pub struct StreamHotTier {
5858
pub version: Option<String>,
59-
pub size: String,
60-
pub used_size: String,
61-
pub available_size: String,
62-
#[serde(skip_serializing_if = "Option::is_none")]
59+
#[serde(with = "crate::utils::human_size")]
60+
pub size: u64,
61+
#[serde(with = "crate::utils::human_size")]
62+
pub used_size: u64,
63+
#[serde(with = "crate::utils::human_size")]
64+
pub available_size: u64,
6365
pub oldest_date_time_entry: Option<String>,
6466
}
6567

@@ -97,8 +99,8 @@ impl HotTierManager {
9799
for stream in STREAM_INFO.list_streams() {
98100
if self.check_stream_hot_tier_exists(&stream) && stream != current_stream {
99101
let stream_hot_tier = self.get_hot_tier(&stream).await?;
100-
total_hot_tier_size += &stream_hot_tier.size.parse::<u64>().unwrap();
101-
total_hot_tier_used_size += stream_hot_tier.used_size.parse::<u64>().unwrap();
102+
total_hot_tier_size += &stream_hot_tier.size;
103+
total_hot_tier_used_size += stream_hot_tier.used_size;
102104
}
103105
}
104106
Ok((total_hot_tier_size, total_hot_tier_used_size))
@@ -111,14 +113,13 @@ impl HotTierManager {
111113
pub async fn validate_hot_tier_size(
112114
&self,
113115
stream: &str,
114-
stream_hot_tier_size: &str,
116+
stream_hot_tier_size: u64,
115117
) -> Result<u64, HotTierError> {
116-
let stream_hot_tier_size = stream_hot_tier_size.parse::<u64>().unwrap();
117118
let mut existing_hot_tier_used_size = 0;
118119
if self.check_stream_hot_tier_exists(stream) {
119120
//delete existing hot tier if its size is less than the updated hot tier size else return error
120121
let existing_hot_tier = self.get_hot_tier(stream).await?;
121-
existing_hot_tier_used_size = existing_hot_tier.used_size.parse::<u64>().unwrap();
122+
existing_hot_tier_used_size = existing_hot_tier.used_size;
122123

123124
if stream_hot_tier_size < existing_hot_tier_used_size {
124125
return Err(HotTierError::ObjectStorageError(ObjectStorageError::Custom(format!(
@@ -169,20 +170,17 @@ impl HotTierManager {
169170
));
170171
}
171172
let path = hot_tier_file_path(&self.hot_tier_path, stream)?;
172-
let res = self
173+
let bytes = self
173174
.filesystem
174175
.get(&path)
175176
.and_then(|resp| resp.bytes())
176-
.await;
177-
match res {
178-
Ok(bytes) => {
179-
let mut stream_hot_tier: StreamHotTier = serde_json::from_slice(&bytes)?;
180-
let oldest_date_time_entry = self.get_oldest_date_time_entry(stream).await?;
181-
stream_hot_tier.oldest_date_time_entry = oldest_date_time_entry;
182-
Ok(stream_hot_tier)
183-
}
184-
Err(err) => Err(err.into()),
185-
}
177+
.await?;
178+
179+
let mut stream_hot_tier: StreamHotTier = serde_json::from_slice(&bytes)?;
180+
let oldest_date_time_entry = self.get_oldest_date_time_entry(stream).await?;
181+
stream_hot_tier.oldest_date_time_entry = oldest_date_time_entry;
182+
183+
Ok(stream_hot_tier)
186184
}
187185

188186
pub async fn delete_hot_tier(&self, stream: &str) -> Result<(), HotTierError> {
@@ -256,7 +254,7 @@ impl HotTierManager {
256254
/// delete the files from the hot tier directory if the available date range is outside the hot tier range
257255
async fn process_stream(&self, stream: String) -> Result<(), HotTierError> {
258256
let stream_hot_tier = self.get_hot_tier(&stream).await?;
259-
let mut parquet_file_size = stream_hot_tier.used_size.parse::<u64>().unwrap();
257+
let mut parquet_file_size = stream_hot_tier.used_size;
260258

261259
let object_store = CONFIG.storage().get_object_store();
262260
let mut s3_manifest_file_list = object_store.list_manifest_files(&stream).await?;
@@ -348,7 +346,7 @@ impl HotTierManager {
348346
let mut file_processed = false;
349347
let mut stream_hot_tier = self.get_hot_tier(stream).await?;
350348
if !self.is_disk_available(parquet_file.file_size).await?
351-
|| stream_hot_tier.available_size.parse::<u64>().unwrap() <= parquet_file.file_size
349+
|| stream_hot_tier.available_size <= parquet_file.file_size
352350
{
353351
if !self
354352
.cleanup_hot_tier_old_data(
@@ -361,7 +359,7 @@ impl HotTierManager {
361359
{
362360
return Ok(file_processed);
363361
}
364-
*parquet_file_size = stream_hot_tier.used_size.parse::<u64>().unwrap();
362+
*parquet_file_size = stream_hot_tier.used_size;
365363
}
366364
let parquet_file_path = RelativePathBuf::from(parquet_file.file_path.clone());
367365
fs::create_dir_all(parquet_path.parent().unwrap()).await?;
@@ -373,11 +371,9 @@ impl HotTierManager {
373371
.await?;
374372
file.write_all(&parquet_data).await?;
375373
*parquet_file_size += parquet_file.file_size;
376-
stream_hot_tier.used_size = parquet_file_size.to_string();
374+
stream_hot_tier.used_size = *parquet_file_size;
377375

378-
stream_hot_tier.available_size = (stream_hot_tier.available_size.parse::<u64>().unwrap()
379-
- parquet_file.file_size)
380-
.to_string();
376+
stream_hot_tier.available_size = stream_hot_tier.available_size - parquet_file.file_size;
381377
self.put_hot_tier(stream, &mut stream_hot_tier).await?;
382378
file_processed = true;
383379
let path = self.get_stream_path_for_date(stream, &date);
@@ -598,18 +594,12 @@ impl HotTierManager {
598594
fs::remove_dir_all(path_to_delete.parent().unwrap()).await?;
599595
delete_empty_directory_hot_tier(path_to_delete.parent().unwrap()).await?;
600596

601-
stream_hot_tier.used_size =
602-
(stream_hot_tier.used_size.parse::<u64>().unwrap() - file_size)
603-
.to_string();
604-
stream_hot_tier.available_size =
605-
(stream_hot_tier.available_size.parse::<u64>().unwrap() + file_size)
606-
.to_string();
597+
stream_hot_tier.used_size = stream_hot_tier.used_size - file_size;
598+
stream_hot_tier.available_size = stream_hot_tier.available_size + file_size;
607599
self.put_hot_tier(stream, stream_hot_tier).await?;
608600
delete_successful = true;
609601

610-
if stream_hot_tier.available_size.parse::<u64>().unwrap()
611-
<= parquet_file_size
612-
{
602+
if stream_hot_tier.available_size <= parquet_file_size {
613603
continue 'loop_files;
614604
} else {
615605
break 'loop_dates;
@@ -699,9 +689,9 @@ impl HotTierManager {
699689
if !self.check_stream_hot_tier_exists(INTERNAL_STREAM_NAME) {
700690
let mut stream_hot_tier = StreamHotTier {
701691
version: Some(CURRENT_HOT_TIER_VERSION.to_string()),
702-
size: INTERNAL_STREAM_HOT_TIER_SIZE_BYTES.to_string(),
703-
used_size: "0".to_string(),
704-
available_size: INTERNAL_STREAM_HOT_TIER_SIZE_BYTES.to_string(),
692+
size: INTERNAL_STREAM_HOT_TIER_SIZE_BYTES,
693+
used_size: 0,
694+
available_size: INTERNAL_STREAM_HOT_TIER_SIZE_BYTES,
705695
oldest_date_time_entry: None,
706696
};
707697
self.put_hot_tier(INTERNAL_STREAM_NAME, &mut stream_hot_tier)

src/migration/mod.rs

Lines changed: 7 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,8 @@ mod stream_metadata_migration;
2424
use std::{fs::OpenOptions, sync::Arc};
2525

2626
use crate::{
27-
hottier::{HotTierManager, CURRENT_HOT_TIER_VERSION},
2827
metadata::load_stream_metadata_on_server_start,
29-
option::{validation::human_size_to_bytes, Config, Mode, CONFIG},
28+
option::{Config, Mode, CONFIG},
3029
storage::{
3130
object_storage::{parseable_json_path, schema_path, stream_json_path},
3231
ObjectStorage, ObjectStorageError, PARSEABLE_METADATA_FILE_NAME, PARSEABLE_ROOT_DIRECTORY,
@@ -137,38 +136,17 @@ pub async fn run_migration(config: &Config) -> anyhow::Result<()> {
137136
let streams = storage.list_streams().await?;
138137
for stream in streams {
139138
migration_stream(&stream.name, &*storage).await?;
140-
if CONFIG.options.hot_tier_storage_path.is_some() {
141-
migration_hot_tier(&stream.name).await?;
142-
}
143139
}
144140

145141
Ok(())
146142
}
147143

148-
/// run the migration for hot tier
149-
async fn migration_hot_tier(stream: &str) -> anyhow::Result<()> {
150-
if let Some(hot_tier_manager) = HotTierManager::global() {
151-
if hot_tier_manager.check_stream_hot_tier_exists(stream) {
152-
let mut stream_hot_tier = hot_tier_manager.get_hot_tier(stream).await?;
153-
if stream_hot_tier.version.is_none() {
154-
stream_hot_tier.version = Some(CURRENT_HOT_TIER_VERSION.to_string());
155-
stream_hot_tier.size = human_size_to_bytes(&stream_hot_tier.size)
156-
.unwrap()
157-
.to_string();
158-
stream_hot_tier.available_size =
159-
human_size_to_bytes(&stream_hot_tier.available_size)
160-
.unwrap()
161-
.to_string();
162-
stream_hot_tier.used_size = human_size_to_bytes(&stream_hot_tier.used_size)
163-
.unwrap()
164-
.to_string();
165-
hot_tier_manager
166-
.put_hot_tier(stream, &mut stream_hot_tier)
167-
.await?;
168-
}
169-
}
170-
}
171-
Ok(())
144+
#[derive(Debug, serde::Deserialize, serde::Serialize)]
145+
pub struct StreamHotTier {
146+
pub version: Option<String>,
147+
pub size: String,
148+
pub used_size: String,
149+
pub available_size: String,
172150
}
173151

174152
async fn migration_stream(stream: &str, storage: &dyn ObjectStorage) -> anyhow::Result<()> {

src/option.rs

Lines changed: 0 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -192,13 +192,10 @@ pub mod validation {
192192
env, io,
193193
net::ToSocketAddrs,
194194
path::{Path, PathBuf},
195-
str::FromStr,
196195
};
197196

198197
use path_clean::PathClean;
199198

200-
use human_size::{multiples, SpecificSize};
201-
202199
#[cfg(any(
203200
all(target_os = "linux", target_arch = "x86_64"),
204201
all(target_os = "macos", target_arch = "aarch64")
@@ -285,45 +282,6 @@ pub mod validation {
285282
}
286283
}
287284

288-
pub fn human_size_to_bytes(s: &str) -> Result<u64, String> {
289-
fn parse_and_map<T: human_size::Multiple>(
290-
s: &str,
291-
) -> Result<u64, human_size::ParsingError> {
292-
SpecificSize::<T>::from_str(s).map(|x| x.to_bytes())
293-
}
294-
295-
let size = parse_and_map::<multiples::Mebibyte>(s)
296-
.or(parse_and_map::<multiples::Megabyte>(s))
297-
.or(parse_and_map::<multiples::Gigibyte>(s))
298-
.or(parse_and_map::<multiples::Gigabyte>(s))
299-
.or(parse_and_map::<multiples::Tebibyte>(s))
300-
.or(parse_and_map::<multiples::Terabyte>(s))
301-
.map_err(|_| "Could not parse given size".to_string())?;
302-
Ok(size)
303-
}
304-
305-
pub fn bytes_to_human_size(bytes: u64) -> String {
306-
const KIB: u64 = 1024;
307-
const MIB: u64 = KIB * 1024;
308-
const GIB: u64 = MIB * 1024;
309-
const TIB: u64 = GIB * 1024;
310-
const PIB: u64 = TIB * 1024;
311-
312-
if bytes < KIB {
313-
format!("{} B", bytes)
314-
} else if bytes < MIB {
315-
format!("{:.2} KB", bytes as f64 / KIB as f64)
316-
} else if bytes < GIB {
317-
format!("{:.2} MiB", bytes as f64 / MIB as f64)
318-
} else if bytes < TIB {
319-
format!("{:.2} GiB", bytes as f64 / GIB as f64)
320-
} else if bytes < PIB {
321-
format!("{:.2} TiB", bytes as f64 / TIB as f64)
322-
} else {
323-
format!("{:.2} PiB", bytes as f64 / PIB as f64)
324-
}
325-
}
326-
327285
pub fn validate_disk_usage(max_disk_usage: &str) -> Result<f64, String> {
328286
if let Ok(max_disk_usage) = max_disk_usage.parse::<f64>() {
329287
if (0.0..=100.0).contains(&max_disk_usage) {

src/utils/human_size.rs

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
use std::str::FromStr;
2+
3+
use human_size::{multiples, SpecificSize};
4+
use serde::{de, Deserialize, Deserializer, Serializer};
5+
6+
// Function to convert human-readable size to bytes (already provided)
7+
pub fn human_size_to_bytes(s: &str) -> Result<u64, String> {
8+
fn parse_and_map<T: human_size::Multiple>(s: &str) -> Result<u64, human_size::ParsingError> {
9+
SpecificSize::<T>::from_str(s).map(|x| x.to_bytes())
10+
}
11+
12+
let size = parse_and_map::<multiples::Mebibyte>(s)
13+
.or(parse_and_map::<multiples::Megabyte>(s))
14+
.or(parse_and_map::<multiples::Gigibyte>(s))
15+
.or(parse_and_map::<multiples::Gigabyte>(s))
16+
.or(parse_and_map::<multiples::Tebibyte>(s))
17+
.or(parse_and_map::<multiples::Terabyte>(s))
18+
.map_err(|_| "Could not parse given size".to_string())?;
19+
Ok(size)
20+
}
21+
22+
// Function to convert bytes to human-readable size (already provided)
23+
pub fn bytes_to_human_size(bytes: u64) -> String {
24+
const KIB: u64 = 1024;
25+
const MIB: u64 = KIB * 1024;
26+
const GIB: u64 = MIB * 1024;
27+
const TIB: u64 = GIB * 1024;
28+
const PIB: u64 = TIB * 1024;
29+
30+
if bytes < KIB {
31+
format!("{} B", bytes)
32+
} else if bytes < MIB {
33+
format!("{:.2} KB", bytes as f64 / KIB as f64)
34+
} else if bytes < GIB {
35+
format!("{:.2} MiB", bytes as f64 / MIB as f64)
36+
} else if bytes < TIB {
37+
format!("{:.2} GiB", bytes as f64 / GIB as f64)
38+
} else if bytes < PIB {
39+
format!("{:.2} TiB", bytes as f64 / TIB as f64)
40+
} else {
41+
format!("{:.2} PiB", bytes as f64 / PIB as f64)
42+
}
43+
}
44+
45+
pub fn serialize<S>(bytes: &u64, serializer: S) -> Result<S::Ok, S::Error>
46+
where
47+
S: Serializer,
48+
{
49+
let human_readable = bytes_to_human_size(*bytes);
50+
serializer.serialize_str(&human_readable)
51+
}
52+
53+
pub fn deserialize<'de, D>(deserializer: D) -> Result<u64, D::Error>
54+
where
55+
D: Deserializer<'de>,
56+
{
57+
let s = String::deserialize(deserializer)?;
58+
human_size_to_bytes(&s).map_err(|e| de::Error::custom(e))
59+
}

0 commit comments

Comments
 (0)