Skip to content

Commit 900bf47

Browse files
enhancement: change sizes to bytes in hot tier (#885)
in PUT /logstream/{logstream}/hottier request body: { "size": "10737418240 Bytes" } GET /logstream/{logstream}/hottier response body: { "size": "1073741824 Bytes", "used_size": "121798121 Bytes", "available_size": "951943703 Bytes", "oldest_date_time_entry": "2024-08-10T11:45:00.000Z" }
1 parent ef8eb34 commit 900bf47

File tree

4 files changed

+127
-55
lines changed

4 files changed

+127
-55
lines changed

server/src/handlers/http/logstream.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,9 @@ use crate::handlers::{
2929
CUSTOM_PARTITION_KEY, STATIC_SCHEMA_FLAG, STREAM_TYPE_KEY, TIME_PARTITION_KEY,
3030
TIME_PARTITION_LIMIT_KEY, UPDATE_STREAM_KEY,
3131
};
32-
use crate::hottier::{HotTierManager, StreamHotTier};
32+
use crate::hottier::{HotTierManager, StreamHotTier, CURRENT_HOT_TIER_VERSION};
3333
use crate::metadata::STREAM_INFO;
3434
use crate::metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE};
35-
use crate::option::validation::bytes_to_human_size;
3635
use crate::option::{Mode, CONFIG};
3736
use crate::static_schema::{convert_static_schema_to_arrow_schema, StaticSchema};
3837
use crate::stats::{event_labels_date, storage_size_labels_date, Stats};
@@ -992,8 +991,9 @@ pub async fn put_stream_hot_tier(
992991
let existing_hot_tier_used_size = hot_tier_manager
993992
.validate_hot_tier_size(&stream_name, &hottier.size)
994993
.await?;
995-
hottier.used_size = Some(bytes_to_human_size(existing_hot_tier_used_size));
994+
hottier.used_size = Some(existing_hot_tier_used_size.to_string());
996995
hottier.available_size = Some(hottier.size.clone());
996+
hottier.version = Some(CURRENT_HOT_TIER_VERSION.to_string());
997997
hot_tier_manager
998998
.put_hot_tier(&stream_name, &mut hottier)
999999
.await?;
@@ -1030,7 +1030,10 @@ pub async fn get_stream_hot_tier(req: HttpRequest) -> Result<impl Responder, Str
10301030
}
10311031

10321032
if let Some(hot_tier_manager) = HotTierManager::global() {
1033-
let hot_tier = hot_tier_manager.get_hot_tier(&stream_name).await?;
1033+
let mut hot_tier = hot_tier_manager.get_hot_tier(&stream_name).await?;
1034+
hot_tier.size = format!("{} {}", hot_tier.size, "Bytes");
1035+
hot_tier.used_size = Some(format!("{} {}", hot_tier.used_size.unwrap(), "Bytes"));
1036+
hot_tier.available_size = Some(format!("{} {}", hot_tier.available_size.unwrap(), "Bytes"));
10341037
Ok((web::Json(hot_tier), StatusCode::OK))
10351038
} else {
10361039
Err(StreamError::Custom {
@@ -1172,7 +1175,7 @@ pub mod error {
11721175
HotTierNotEnabled(String),
11731176
#[error("failed to enable hottier due to err: {0}")]
11741177
InvalidHotTierConfig(serde_json::Error),
1175-
#[error("Hot tier validation failed due to {0}")]
1178+
#[error("Hot tier validation failed: {0}")]
11761179
HotTierValidation(#[from] HotTierValidationError),
11771180
#[error("{0}")]
11781181
HotTierError(#[from] HotTierError),

server/src/hottier.rs

Lines changed: 73 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,7 @@ use crate::{
2727
catalog::manifest::{File, Manifest},
2828
handlers::http::cluster::INTERNAL_STREAM_NAME,
2929
metadata::{error::stream_info::MetadataError, STREAM_INFO},
30-
option::{
31-
validation::{bytes_to_human_size, human_size_to_bytes},
32-
CONFIG,
33-
},
30+
option::{validation::bytes_to_human_size, CONFIG},
3431
storage::{ObjectStorage, ObjectStorageError},
3532
utils::extract_datetime,
3633
validator::error::HotTierValidationError,
@@ -53,9 +50,10 @@ pub const STREAM_HOT_TIER_FILENAME: &str = ".hot_tier.json";
5350
pub const MIN_STREAM_HOT_TIER_SIZE_BYTES: u64 = 10737418240; // 10 GiB
5451
const HOT_TIER_SYNC_DURATION: Interval = clokwerk::Interval::Minutes(1);
5552
pub const INTERNAL_STREAM_HOT_TIER_SIZE_BYTES: u64 = 10485760; //10 MiB
56-
53+
pub const CURRENT_HOT_TIER_VERSION: &str = "v2";
5754
#[derive(Debug, serde::Deserialize, serde::Serialize)]
5855
pub struct StreamHotTier {
56+
pub version: Option<String>,
5957
#[serde(rename = "size")]
6058
pub size: String,
6159
#[serde(skip_serializing_if = "Option::is_none")]
@@ -97,9 +95,13 @@ impl HotTierManager {
9795
for stream in STREAM_INFO.list_streams() {
9896
if self.check_stream_hot_tier_exists(&stream) && stream != current_stream {
9997
let stream_hot_tier = self.get_hot_tier(&stream).await?;
100-
total_hot_tier_size += human_size_to_bytes(&stream_hot_tier.size).unwrap();
101-
total_hot_tier_used_size +=
102-
human_size_to_bytes(&stream_hot_tier.used_size.unwrap()).unwrap();
98+
total_hot_tier_size += &stream_hot_tier.size.parse::<u64>().unwrap();
99+
total_hot_tier_used_size += &stream_hot_tier
100+
.used_size
101+
.clone()
102+
.unwrap()
103+
.parse::<u64>()
104+
.unwrap();
103105
}
104106
}
105107
Ok((total_hot_tier_size, total_hot_tier_used_size))
@@ -112,19 +114,21 @@ impl HotTierManager {
112114
pub async fn validate_hot_tier_size(
113115
&self,
114116
stream: &str,
115-
size: &str,
117+
stream_hot_tier_size: &str,
116118
) -> Result<u64, HotTierError> {
119+
let stream_hot_tier_size = stream_hot_tier_size.parse::<u64>().unwrap();
117120
let mut existing_hot_tier_used_size = 0;
118121
if self.check_stream_hot_tier_exists(stream) {
119122
//delete existing hot tier if its size is less than the updated hot tier size else return error
120123
let existing_hot_tier = self.get_hot_tier(stream).await?;
121124
existing_hot_tier_used_size =
122-
human_size_to_bytes(&existing_hot_tier.used_size.unwrap()).unwrap();
123-
if human_size_to_bytes(size) < human_size_to_bytes(&existing_hot_tier.size) {
125+
existing_hot_tier.used_size.unwrap().parse::<u64>().unwrap();
126+
127+
if stream_hot_tier_size < existing_hot_tier_used_size {
124128
return Err(HotTierError::ObjectStorageError(ObjectStorageError::Custom(format!(
125129
"Reducing hot tier size is not supported, failed to reduce the hot tier size from {} to {}",
126-
existing_hot_tier.size,
127-
size
130+
bytes_to_human_size(existing_hot_tier_used_size),
131+
bytes_to_human_size(stream_hot_tier_size)
128132
))));
129133
}
130134
}
@@ -134,7 +138,6 @@ impl HotTierManager {
134138
if let (Some(total_disk_space), _, Some(used_disk_space)) =
135139
(total_disk_space, available_disk_space, used_disk_space)
136140
{
137-
let stream_hot_tier_size = human_size_to_bytes(size).unwrap();
138141
let (total_hot_tier_size, total_hot_tier_used_size) =
139142
self.get_hot_tiers_size(stream).await?;
140143
let disk_threshold =
@@ -147,7 +150,7 @@ impl HotTierManager {
147150

148151
if stream_hot_tier_size as f64 > max_allowed_hot_tier_size {
149152
log::error!("disk_threshold: {}, used_disk_space: {}, total_hot_tier_used_size: {}, existing_hot_tier_used_size: {}, total_hot_tier_size: {}",
150-
disk_threshold, used_disk_space, total_hot_tier_used_size, existing_hot_tier_used_size, total_hot_tier_size);
153+
bytes_to_human_size(disk_threshold as u64), bytes_to_human_size(used_disk_space), bytes_to_human_size(total_hot_tier_used_size), bytes_to_human_size(existing_hot_tier_used_size), bytes_to_human_size(total_hot_tier_size));
151154
return Err(HotTierError::ObjectStorageError(ObjectStorageError::Custom(format!(
152155
"{} is the total usable disk space for hot tier, cannot set a bigger value.", bytes_to_human_size(max_allowed_hot_tier_size as u64)
153156
))));
@@ -255,8 +258,12 @@ impl HotTierManager {
255258
/// delete the files from the hot tier directory if the available date range is outside the hot tier range
256259
async fn process_stream(&self, stream: String) -> Result<(), HotTierError> {
257260
let stream_hot_tier = self.get_hot_tier(&stream).await?;
258-
let mut parquet_file_size =
259-
human_size_to_bytes(stream_hot_tier.used_size.as_ref().unwrap()).unwrap();
261+
let mut parquet_file_size = stream_hot_tier
262+
.used_size
263+
.as_ref()
264+
.unwrap()
265+
.parse::<u64>()
266+
.unwrap();
260267

261268
let object_store = CONFIG.storage().get_object_store();
262269
let mut s3_manifest_file_list = object_store.list_manifest_files(&stream).await?;
@@ -348,7 +355,12 @@ impl HotTierManager {
348355
let mut file_processed = false;
349356
let mut stream_hot_tier = self.get_hot_tier(stream).await?;
350357
if !self.is_disk_available(parquet_file.file_size).await?
351-
|| human_size_to_bytes(&stream_hot_tier.available_size.clone().unwrap()).unwrap()
358+
|| stream_hot_tier
359+
.available_size
360+
.as_ref()
361+
.unwrap()
362+
.parse::<u64>()
363+
.unwrap()
352364
<= parquet_file.file_size
353365
{
354366
if !self
@@ -362,8 +374,12 @@ impl HotTierManager {
362374
{
363375
return Ok(file_processed);
364376
}
365-
*parquet_file_size =
366-
human_size_to_bytes(&stream_hot_tier.used_size.clone().unwrap()).unwrap();
377+
*parquet_file_size = stream_hot_tier
378+
.used_size
379+
.as_ref()
380+
.unwrap()
381+
.parse::<u64>()
382+
.unwrap();
367383
}
368384
let parquet_file_path = RelativePathBuf::from(parquet_file.file_path.clone());
369385
fs::create_dir_all(parquet_path.parent().unwrap()).await?;
@@ -375,12 +391,18 @@ impl HotTierManager {
375391
.await?;
376392
file.write_all(&parquet_data).await?;
377393
*parquet_file_size += parquet_file.file_size;
378-
stream_hot_tier.used_size = Some(bytes_to_human_size(*parquet_file_size));
379-
380-
stream_hot_tier.available_size = Some(bytes_to_human_size(
381-
human_size_to_bytes(&stream_hot_tier.available_size.clone().unwrap()).unwrap()
382-
- parquet_file.file_size,
383-
));
394+
stream_hot_tier.used_size = Some(parquet_file_size.to_string());
395+
396+
stream_hot_tier.available_size = Some(
397+
(stream_hot_tier
398+
.available_size
399+
.as_ref()
400+
.unwrap()
401+
.parse::<u64>()
402+
.unwrap()
403+
- parquet_file.file_size)
404+
.to_string(),
405+
);
384406
self.put_hot_tier(stream, &mut stream_hot_tier).await?;
385407
file_processed = true;
386408
let mut hot_tier_manifest = self
@@ -583,20 +605,34 @@ impl HotTierManager {
583605
fs::remove_dir_all(path_to_delete.parent().unwrap()).await?;
584606
delete_empty_directory_hot_tier(path_to_delete.parent().unwrap()).await?;
585607

586-
stream_hot_tier.used_size = Some(bytes_to_human_size(
587-
human_size_to_bytes(&stream_hot_tier.used_size.clone().unwrap())
608+
stream_hot_tier.used_size = Some(
609+
(stream_hot_tier
610+
.used_size
611+
.as_ref()
588612
.unwrap()
589-
- file_size,
590-
));
591-
stream_hot_tier.available_size = Some(bytes_to_human_size(
592-
human_size_to_bytes(&stream_hot_tier.available_size.clone().unwrap())
613+
.parse::<u64>()
593614
.unwrap()
594-
+ file_size,
595-
));
615+
- file_size)
616+
.to_string(),
617+
);
618+
stream_hot_tier.available_size = Some(
619+
(stream_hot_tier
620+
.available_size
621+
.as_ref()
622+
.unwrap()
623+
.parse::<u64>()
624+
.unwrap()
625+
+ file_size)
626+
.to_string(),
627+
);
596628
self.put_hot_tier(stream, stream_hot_tier).await?;
597629
delete_successful = true;
598630

599-
if human_size_to_bytes(&stream_hot_tier.available_size.clone().unwrap())
631+
if stream_hot_tier
632+
.available_size
633+
.as_ref()
634+
.unwrap()
635+
.parse::<u64>()
600636
.unwrap()
601637
<= parquet_file_size
602638
{
@@ -679,7 +715,7 @@ impl HotTierManager {
679715
.to_string_lossy()
680716
.trim_start_matches("minute=")
681717
.to_string();
682-
let oldest_date_time = format!("{} {}:{}:00", date, hour_str, minute_str);
718+
let oldest_date_time = format!("{}T{}:{}:00.000Z", date, hour_str, minute_str);
683719
return Ok(Some(oldest_date_time));
684720
}
685721
}
@@ -693,6 +729,7 @@ impl HotTierManager {
693729
&& !self.check_stream_hot_tier_exists(INTERNAL_STREAM_NAME)
694730
{
695731
let mut stream_hot_tier = StreamHotTier {
732+
version: Some(CURRENT_HOT_TIER_VERSION.to_string()),
696733
size: INTERNAL_STREAM_HOT_TIER_SIZE_BYTES.to_string(),
697734
used_size: Some("0".to_string()),
698735
available_size: Some(INTERNAL_STREAM_HOT_TIER_SIZE_BYTES.to_string()),

server/src/migration.rs

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,9 @@ mod stream_metadata_migration;
2424
use std::{fs::OpenOptions, sync::Arc};
2525

2626
use crate::{
27+
hottier::{HotTierManager, CURRENT_HOT_TIER_VERSION},
2728
metadata::load_stream_metadata_on_server_start,
28-
option::{Config, Mode, CONFIG},
29+
option::{validation::human_size_to_bytes, Config, Mode, CONFIG},
2930
storage::{
3031
object_storage::{parseable_json_path, stream_json_path},
3132
ObjectStorage, ObjectStorageError, PARSEABLE_METADATA_FILE_NAME, PARSEABLE_ROOT_DIRECTORY,
@@ -111,12 +112,44 @@ pub async fn run_migration(config: &Config) -> anyhow::Result<()> {
111112
let streams = storage.list_streams().await?;
112113

113114
for stream in streams {
114-
migration_stream(&stream.name, &*storage).await?
115+
migration_stream(&stream.name, &*storage).await?;
116+
if CONFIG.parseable.hot_tier_storage_path.is_some() {
117+
migration_hot_tier(&stream.name).await?;
118+
}
115119
}
116120

117121
Ok(())
118122
}
119123

124+
/// run the migration for hot tier
125+
async fn migration_hot_tier(stream: &str) -> anyhow::Result<()> {
126+
if let Some(hot_tier_manager) = HotTierManager::global() {
127+
if hot_tier_manager.check_stream_hot_tier_exists(stream) {
128+
let mut stream_hot_tier = hot_tier_manager.get_hot_tier(stream).await?;
129+
if stream_hot_tier.version.is_none() {
130+
stream_hot_tier.version = Some(CURRENT_HOT_TIER_VERSION.to_string());
131+
stream_hot_tier.size = human_size_to_bytes(&stream_hot_tier.size)
132+
.unwrap()
133+
.to_string();
134+
stream_hot_tier.available_size = Some(
135+
human_size_to_bytes(&stream_hot_tier.available_size.unwrap())
136+
.unwrap()
137+
.to_string(),
138+
);
139+
stream_hot_tier.used_size = Some(
140+
human_size_to_bytes(&stream_hot_tier.used_size.unwrap())
141+
.unwrap()
142+
.to_string(),
143+
);
144+
hot_tier_manager
145+
.put_hot_tier(stream, &mut stream_hot_tier)
146+
.await?;
147+
}
148+
}
149+
}
150+
Ok(())
151+
}
152+
120153
async fn migration_stream(stream: &str, storage: &dyn ObjectStorage) -> anyhow::Result<()> {
121154
let mut arrow_schema: Schema = Schema::empty();
122155
let schema_path = RelativePathBuf::from_iter([stream, STREAM_ROOT_DIRECTORY, SCHEMA_FILE_NAME]);

server/src/validator.rs

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use crate::alerts::rule::base::{NumericRule, StringRule};
2323
use crate::alerts::rule::{ColumnRule, ConsecutiveNumericRule, ConsecutiveStringRule};
2424
use crate::alerts::{Alerts, Rule};
2525
use crate::hottier::MIN_STREAM_HOT_TIER_SIZE_BYTES;
26-
use crate::option::validation::{bytes_to_human_size, human_size_to_bytes};
26+
use crate::option::validation::bytes_to_human_size;
2727
use crate::storage::StreamType;
2828

2929
// Add more sql keywords here in lower case
@@ -149,17 +149,16 @@ pub fn user_name(username: &str) -> Result<(), UsernameValidationError> {
149149
}
150150

151151
pub fn hot_tier(size: &str) -> Result<(), HotTierValidationError> {
152-
if human_size_to_bytes(size).is_err() {
153-
return Err(HotTierValidationError::InvalidFormat);
154-
}
155-
156-
if human_size_to_bytes(size).unwrap() < MIN_STREAM_HOT_TIER_SIZE_BYTES {
157-
return Err(HotTierValidationError::Size(bytes_to_human_size(
158-
MIN_STREAM_HOT_TIER_SIZE_BYTES,
159-
)));
152+
if let Ok(size) = size.parse::<u64>() {
153+
if size < MIN_STREAM_HOT_TIER_SIZE_BYTES {
154+
return Err(HotTierValidationError::Size(bytes_to_human_size(
155+
MIN_STREAM_HOT_TIER_SIZE_BYTES,
156+
)));
157+
}
158+
Ok(())
159+
} else {
160+
Err(HotTierValidationError::InvalidFormat)
160161
}
161-
162-
Ok(())
163162
}
164163
pub mod error {
165164

@@ -211,7 +210,7 @@ pub mod error {
211210

212211
#[derive(Debug, thiserror::Error)]
213212
pub enum HotTierValidationError {
214-
#[error("Please provide size in human readable format, e.g 10GiB, 20GiB")]
213+
#[error("Please provide size in bytes")]
215214
InvalidFormat,
216215

217216
#[error("Stream should have atleast {0} size")]

0 commit comments

Comments
 (0)