refactor: clean up parts of the codebase #981

Merged · 26 commits · Dec 20, 2024 · changes shown from 10 commits
4 changes: 2 additions & 2 deletions src/analytics.rs
@@ -63,7 +63,7 @@ pub struct Report {
memory_total_bytes: u64,
platform: String,
storage_mode: String,
- server_mode: String,
+ server_mode: Mode,
version: String,
commit_hash: String,
active_ingestors: u64,
@@ -111,7 +111,7 @@ impl Report {
memory_total_bytes: mem_total,
platform: platform().to_string(),
storage_mode: CONFIG.get_storage_mode_string().to_string(),
- server_mode: CONFIG.parseable.mode.to_string(),
+ server_mode: CONFIG.parseable.mode,
version: current().released_version.to_string(),
commit_hash: current().commit_hash,
active_ingestors: ingestor_metrics.0,
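Storing the typed `Mode` on `Report` instead of a pre-rendered `String` lets serde decide the wire format. A minimal sketch of the idea, assuming `Mode` is an ordinary serde-serializable enum (its real variants and attributes live elsewhere in the codebase):

```rust
use serde::Serialize;

// Hypothetical stand-in for the crate's `Mode`; the real enum may differ.
#[derive(Debug, Clone, Copy, Serialize)]
#[serde(rename_all = "lowercase")]
enum Mode {
    Query,
    Ingest,
    All,
}

#[derive(Serialize)]
struct Report {
    server_mode: Mode, // was `String`; serde renders it at serialization time
}

fn main() {
    let report = Report { server_mode: Mode::Ingest };
    // Prints {"server_mode":"ingest"}; no manual to_string() at the call site.
    println!("{}", serde_json::to_string(&report).unwrap());
}
```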
22 changes: 7 additions & 15 deletions src/cli.rs
@@ -423,7 +423,7 @@ impl Cli {
.arg(
// RowGroupSize controls the number of rows present in one row group
// More rows = better compression but HIGHER Memory consumption during read/write
- // 1048576 is the default value for DataFusion
+ // 1048576 is the default value for DataFusion
Arg::new(Self::ROW_GROUP_SIZE)
.long(Self::ROW_GROUP_SIZE)
.env("P_PARQUET_ROW_GROUP_SIZE")
@@ -591,20 +591,12 @@ impl FromArgMatches for Cli {
.get_one::<usize>(Self::ROW_GROUP_SIZE)
.cloned()
.expect("default for row_group size");
- self.parquet_compression = match m
- .get_one::<String>(Self::PARQUET_COMPRESSION_ALGO)
- .expect("default for compression algo")
- .as_str()
- {
- "uncompressed" => Compression::UNCOMPRESSED,
- "snappy" => Compression::SNAPPY,
- "gzip" => Compression::GZIP,
- "lzo" => Compression::LZO,
- "brotli" => Compression::BROTLI,
- "lz4" => Compression::LZ4,
- "zstd" => Compression::ZSTD,
- _ => unreachable!(),
- };
+ self.parquet_compression = serde_json::from_str(
+ m.get_one::<String>(Self::PARQUET_COMPRESSION_ALGO)
+ .expect("default for compression algo")
+ .as_str(),
+ )
+ .expect("unexpected compression algo");

let openid_client_id = m.get_one::<String>(Self::OPENID_CLIENT_ID).cloned();
let openid_client_secret = m.get_one::<String>(Self::OPENID_CLIENT_SECRET).cloned();
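The new code leans on the `Compression` type's `Deserialize` impl instead of an open-coded match. A minimal sketch of the pattern with a hypothetical enum (note that `serde_json::from_str` parses JSON, so the algorithm name must arrive as a quoted string literal):

```rust
use serde::Deserialize;

// Hypothetical stand-in for the Compression enum and its serde config.
#[derive(Debug, PartialEq, Deserialize)]
#[serde(rename_all = "lowercase")]
enum Compression {
    Uncompressed,
    Snappy,
    Gzip,
    Lzo,
    Brotli,
    Lz4,
    Zstd,
}

fn main() {
    // JSON string literals keep their quotes: "\"zstd\"", not "zstd".
    let algo: Compression =
        serde_json::from_str("\"zstd\"").expect("unexpected compression algo");
    assert_eq!(algo, Compression::Zstd);
}
```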
4 changes: 1 addition & 3 deletions src/handlers/http/kinesis.rs
@@ -24,16 +24,14 @@ use std::collections::BTreeMap;
use std::str;

#[derive(Serialize, Deserialize, Debug)]
#[serde(rename = "camelCase")]
struct Message {
#[serde(rename = "records")]
records: Vec<Data>,
#[serde(rename = "requestId")]
request_id: String,
timestamp: u64,
}
#[derive(Serialize, Deserialize, Debug)]
struct Data {
#[serde(rename = "data")]
data: String,
}

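A container-level `rename_all` keeps the wire format stable while dropping the per-field attributes. A small sketch of the effect, with `Data` simplified to `String` for brevity:

```rust
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
struct Message {
    records: Vec<String>, // already camelCase, so unaffected
    request_id: String,   // serialized as "requestId"
    timestamp: u64,
}

fn main() {
    let msg = Message {
        records: vec![],
        request_id: "r1".into(),
        timestamp: 0,
    };
    // Prints {"records":[],"requestId":"r1","timestamp":0}
    println!("{}", serde_json::to_string(&msg).unwrap());
}
```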
8 changes: 4 additions & 4 deletions src/handlers/http/logstream.rs
@@ -651,8 +651,8 @@ pub async fn put_stream_hot_tier(
let existing_hot_tier_used_size = hot_tier_manager
.validate_hot_tier_size(&stream_name, &hottier.size)
.await?;
- hottier.used_size = Some(existing_hot_tier_used_size.to_string());
- hottier.available_size = Some(hottier.size.clone());
+ hottier.used_size = existing_hot_tier_used_size.to_string();
+ hottier.available_size = hottier.size.to_string();
hottier.version = Some(CURRENT_HOT_TIER_VERSION.to_string());
hot_tier_manager
.put_hot_tier(&stream_name, &mut hottier)
@@ -695,8 +695,8 @@ pub async fn get_stream_hot_tier(req: HttpRequest) -> Result<impl Responder, Str
if let Some(hot_tier_manager) = HotTierManager::global() {
let mut hot_tier = hot_tier_manager.get_hot_tier(&stream_name).await?;
hot_tier.size = format!("{} {}", hot_tier.size, "Bytes");
- hot_tier.used_size = Some(format!("{} {}", hot_tier.used_size.unwrap(), "Bytes"));
- hot_tier.available_size = Some(format!("{} {}", hot_tier.available_size.unwrap(), "Bytes"));
+ hot_tier.used_size = format!("{} Bytes", hot_tier.used_size);
+ hot_tier.available_size = format!("{} Bytes", hot_tier.available_size);
Ok((web::Json(hot_tier), StatusCode::OK))
} else {
Err(StreamError::Custom {
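With the size fields demoted from `Option<String>` to `String`, the handlers can label units without unwrapping. A before/after sketch on a standalone value:

```rust
fn main() {
    // Before: Option<String> forced an unwrap before formatting.
    let old: Option<String> = Some("1024".to_string());
    let labeled_old = format!("{} {}", old.unwrap(), "Bytes");

    // After: a plain String formats directly, with the unit inlined.
    let new = "1024".to_string();
    let labeled_new = format!("{new} Bytes");

    assert_eq!(labeled_old, labeled_new);
}
```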
2 changes: 1 addition & 1 deletion src/handlers/http/middleware.rs
@@ -44,8 +44,8 @@ use crate::{
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug)]
#[serde(rename = "camelCase")]
struct Message {
#[serde(rename = "commonAttributes")]
common_attributes: CommonAttributes,
}

9 changes: 2 additions & 7 deletions src/handlers/http/modal/ingest_server.rs
@@ -43,7 +43,6 @@ use crate::storage::PARSEABLE_ROOT_DIRECTORY;
use crate::sync;

use crate::{handlers::http::base_path, option::CONFIG};
- use actix_web::body::MessageBody;
use actix_web::web;
use actix_web::web::resource;
use actix_web::Scope;
@@ -323,9 +322,7 @@ impl IngestServer {
.clone_from(&INGESTOR_META.domain_name);
store_data.port.clone_from(&INGESTOR_META.port);

- let resource = serde_json::to_string(&store_data)?
- .try_into_bytes()
- .map_err(|err| anyhow!(err))?;
+ let resource = Bytes::from(serde_json::to_vec(&store_data)?);

// if pushing to object store fails propagate the error
return store
@@ -334,9 +331,7 @@
.map_err(|err| anyhow!(err));
}
} else {
- let resource = serde_json::to_string(&resource)?
- .try_into_bytes()
- .map_err(|err| anyhow!(err))?;
+ let resource = Bytes::from(serde_json::to_vec(&resource)?);

store.put_object(&path, resource).await?;
}
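Serializing straight to a byte vector and wrapping it in `Bytes` drops both the intermediate `String` and the fallible `try_into_bytes` hop (and with it the `MessageBody` import). A minimal sketch with a hypothetical payload type:

```rust
use bytes::Bytes;
use serde::Serialize;

// Hypothetical payload; the real code serializes ingestor metadata.
#[derive(Serialize)]
struct StoreData {
    domain_name: String,
    port: String,
}

fn main() -> Result<(), serde_json::Error> {
    let store_data = StoreData {
        domain_name: "https://localhost:8000".into(),
        port: "8000".into(),
    };
    // serde_json::to_vec yields Vec<u8>; Bytes::from takes ownership without copying.
    let resource = Bytes::from(serde_json::to_vec(&store_data)?);
    assert!(resource.starts_with(b"{\"domain_name\""));
    Ok(())
}
```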
6 changes: 2 additions & 4 deletions src/handlers/http/modal/mod.rs
@@ -220,6 +220,7 @@ impl IngestorMetadata {
#[cfg(test)]
mod test {
use actix_web::body::MessageBody;
+ use bytes::Bytes;
use rstest::rstest;

use super::{IngestorMetadata, DEFAULT_VERSION};
@@ -255,10 +256,7 @@ mod test {
"8002".to_string(),
);

- let lhs = serde_json::to_string(&im)
- .unwrap()
- .try_into_bytes()
- .unwrap();
+ let lhs = Bytes::from(serde_json::to_vec(&im).unwrap());
let rhs = br#"{"version":"v3","port":"8000","domain_name":"https://localhost:8000","bucket_name":"somebucket","token":"Basic YWRtaW46YWRtaW4=","ingestor_id":"ingestor_id","flight_port":"8002"}"#
.try_into_bytes()
.unwrap();
94 changes: 22 additions & 72 deletions src/hottier.rs
@@ -51,15 +51,13 @@ pub const MIN_STREAM_HOT_TIER_SIZE_BYTES: u64 = 10737418240; // 10 GiB
const HOT_TIER_SYNC_DURATION: Interval = clokwerk::Interval::Minutes(1);
pub const INTERNAL_STREAM_HOT_TIER_SIZE_BYTES: u64 = 10485760; //10 MiB
pub const CURRENT_HOT_TIER_VERSION: &str = "v2";

#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub struct StreamHotTier {
pub version: Option<String>,
#[serde(rename = "size")]
pub size: String,
- #[serde(skip_serializing_if = "Option::is_none")]
- pub used_size: Option<String>,
- #[serde(skip_serializing_if = "Option::is_none")]
- pub available_size: Option<String>,
+ pub used_size: String,
+ pub available_size: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub oldest_date_time_entry: Option<String>,
}
@@ -98,12 +96,7 @@ impl HotTierManager {
if self.check_stream_hot_tier_exists(&stream) && stream != current_stream {
let stream_hot_tier = self.get_hot_tier(&stream).await?;
total_hot_tier_size += &stream_hot_tier.size.parse::<u64>().unwrap();
- total_hot_tier_used_size += &stream_hot_tier
- .used_size
- .clone()
- .unwrap()
- .parse::<u64>()
- .unwrap();
+ total_hot_tier_used_size += stream_hot_tier.used_size.parse::<u64>().unwrap();
}
}
Ok((total_hot_tier_size, total_hot_tier_used_size))
@@ -123,8 +116,7 @@
if self.check_stream_hot_tier_exists(stream) {
//delete existing hot tier if its size is less than the updated hot tier size else return error
let existing_hot_tier = self.get_hot_tier(stream).await?;
- existing_hot_tier_used_size =
- existing_hot_tier.used_size.unwrap().parse::<u64>().unwrap();
+ existing_hot_tier_used_size = existing_hot_tier.used_size.parse::<u64>().unwrap();

if stream_hot_tier_size < existing_hot_tier_used_size {
return Err(HotTierError::ObjectStorageError(ObjectStorageError::Custom(format!(
@@ -260,12 +252,7 @@ impl HotTierManager {
/// delete the files from the hot tier directory if the available date range is outside the hot tier range
async fn process_stream(&self, stream: String) -> Result<(), HotTierError> {
let stream_hot_tier = self.get_hot_tier(&stream).await?;
- let mut parquet_file_size = stream_hot_tier
- .used_size
- .as_ref()
- .unwrap()
- .parse::<u64>()
- .unwrap();
+ let mut parquet_file_size = stream_hot_tier.used_size.parse::<u64>().unwrap();

let object_store = CONFIG.storage().get_object_store();
let mut s3_manifest_file_list = object_store.list_manifest_files(&stream).await?;
@@ -357,13 +344,7 @@ impl HotTierManager {
let mut file_processed = false;
let mut stream_hot_tier = self.get_hot_tier(stream).await?;
if !self.is_disk_available(parquet_file.file_size).await?
- || stream_hot_tier
- .available_size
- .as_ref()
- .unwrap()
- .parse::<u64>()
- .unwrap()
- <= parquet_file.file_size
+ || stream_hot_tier.available_size.parse::<u64>().unwrap() <= parquet_file.file_size
{
if !self
.cleanup_hot_tier_old_data(
@@ -376,12 +357,7 @@
{
return Ok(file_processed);
}
- *parquet_file_size = stream_hot_tier
- .used_size
- .as_ref()
- .unwrap()
- .parse::<u64>()
- .unwrap();
+ *parquet_file_size = stream_hot_tier.used_size.parse::<u64>().unwrap();
}
let parquet_file_path = RelativePathBuf::from(parquet_file.file_path.clone());
fs::create_dir_all(parquet_path.parent().unwrap()).await?;
@@ -393,18 +369,11 @@
.await?;
file.write_all(&parquet_data).await?;
*parquet_file_size += parquet_file.file_size;
- stream_hot_tier.used_size = Some(parquet_file_size.to_string());
-
- stream_hot_tier.available_size = Some(
- (stream_hot_tier
- .available_size
- .as_ref()
- .unwrap()
- .parse::<u64>()
- .unwrap()
- - parquet_file.file_size)
- .to_string(),
- );
+ stream_hot_tier.used_size = parquet_file_size.to_string();
+
+ stream_hot_tier.available_size = (stream_hot_tier.available_size.parse::<u64>().unwrap()
+ - parquet_file.file_size)
+ .to_string();
self.put_hot_tier(stream, &mut stream_hot_tier).await?;
file_processed = true;
let mut hot_tier_manifest = self
@@ -614,35 +583,16 @@ impl HotTierManager {
fs::remove_dir_all(path_to_delete.parent().unwrap()).await?;
delete_empty_directory_hot_tier(path_to_delete.parent().unwrap()).await?;

- stream_hot_tier.used_size = Some(
- (stream_hot_tier
- .used_size
- .as_ref()
- .unwrap()
- .parse::<u64>()
- .unwrap()
- - file_size)
- .to_string(),
- );
- stream_hot_tier.available_size = Some(
- (stream_hot_tier
- .available_size
- .as_ref()
- .unwrap()
- .parse::<u64>()
- .unwrap()
- + file_size)
- .to_string(),
- );
+ stream_hot_tier.used_size =
+ (stream_hot_tier.used_size.parse::<u64>().unwrap() - file_size)
+ .to_string();
+ stream_hot_tier.available_size =
+ (stream_hot_tier.available_size.parse::<u64>().unwrap() + file_size)
+ .to_string();
self.put_hot_tier(stream, stream_hot_tier).await?;
delete_successful = true;

- if stream_hot_tier
- .available_size
- .as_ref()
- .unwrap()
- .parse::<u64>()
- .unwrap()
+ if stream_hot_tier.available_size.parse::<u64>().unwrap()
<= parquet_file_size
{
continue 'loop_files;
@@ -740,8 +690,8 @@ impl HotTierManager {
let mut stream_hot_tier = StreamHotTier {
version: Some(CURRENT_HOT_TIER_VERSION.to_string()),
size: INTERNAL_STREAM_HOT_TIER_SIZE_BYTES.to_string(),
used_size: Some("0".to_string()),
available_size: Some(INTERNAL_STREAM_HOT_TIER_SIZE_BYTES.to_string()),
used_size: "0".to_string(),
available_size: INTERNAL_STREAM_HOT_TIER_SIZE_BYTES.to_string(),
oldest_date_time_entry: None,
};
self.put_hot_tier(INTERNAL_STREAM_NAME, &mut stream_hot_tier)
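With the `Option` wrappers gone, the hot tier bookkeeping parses and rewrites the size strings directly. A trimmed-down sketch of the accounting (the real `StreamHotTier` also carries `version` and `oldest_date_time_entry`):

```rust
// Hypothetical reduced copy of StreamHotTier, just the fields touched here.
#[allow(dead_code)]
struct StreamHotTier {
    size: String,
    used_size: String,
    available_size: String,
}

// Mirrors the simplified update pattern: parse, adjust, store back as String.
fn record_file(hot_tier: &mut StreamHotTier, file_size: u64) {
    let used = hot_tier.used_size.parse::<u64>().unwrap() + file_size;
    let available = hot_tier.available_size.parse::<u64>().unwrap() - file_size;
    hot_tier.used_size = used.to_string();
    hot_tier.available_size = available.to_string();
}

fn main() {
    let mut ht = StreamHotTier {
        size: "10485760".to_string(), // 10 MiB, as for the internal stream
        used_size: "0".to_string(),
        available_size: "10485760".to_string(),
    };
    record_file(&mut ht, 4096);
    assert_eq!(ht.used_size, "4096");
    assert_eq!(ht.available_size, "10481664");
}
```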