refactor: clean up parts of the codebase #981

Merged
merged 26 commits on Dec 20, 2024

4 changes: 2 additions & 2 deletions src/analytics.rs
@@ -64,7 +64,7 @@ pub struct Report {
memory_total_bytes: u64,
platform: String,
storage_mode: String,
server_mode: String,
server_mode: Mode,
version: String,
commit_hash: String,
active_ingestors: u64,
@@ -112,7 +112,7 @@ impl Report {
memory_total_bytes: mem_total,
platform: platform().to_string(),
storage_mode: CONFIG.get_storage_mode_string().to_string(),
server_mode: CONFIG.parseable.mode.to_string(),
server_mode: CONFIG.parseable.mode,
version: current().released_version.to_string(),
commit_hash: current().commit_hash,
active_ingestors: ingestor_metrics.0,
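The hunks above keep `server_mode` as the typed `Mode` value instead of eagerly converting it to a `String`. That only leaves the analytics report unchanged if `Mode` serializes to the same text the old `to_string()` produced — a minimal sketch of that pattern (the enum and its variants here are illustrative stand-ins, not the crate's actual definitions):

```rust
use serde::Serialize;

// Hypothetical stand-in for the crate's `Mode`; a serde rename keeps the
// JSON output identical to what the old `String` field carried.
#[derive(Serialize, Clone, Copy, Debug)]
#[serde(rename_all = "lowercase")]
enum Mode {
    Query,
    Ingest,
    All,
}

#[derive(Serialize)]
struct Report {
    server_mode: Mode, // was `String`
}

fn main() {
    let report = Report { server_mode: Mode::Ingest };
    // Prints {"server_mode":"ingest"} under the assumptions above.
    println!("{}", serde_json::to_string(&report).unwrap());
}
```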
20 changes: 6 additions & 14 deletions src/cli.rs
@@ -527,20 +527,12 @@ impl FromArgMatches for Cli {
.get_one::<usize>(Self::ROW_GROUP_SIZE)
.cloned()
.expect("default for row_group size");
self.parquet_compression = match m
.get_one::<String>(Self::PARQUET_COMPRESSION_ALGO)
.expect("default for compression algo")
.as_str()
{
"uncompressed" => Compression::UNCOMPRESSED,
"snappy" => Compression::SNAPPY,
"gzip" => Compression::GZIP,
"lzo" => Compression::LZO,
"brotli" => Compression::BROTLI,
"lz4" => Compression::LZ4,
"zstd" => Compression::ZSTD,
_ => unreachable!(),
};
self.parquet_compression = serde_json::from_str(&format!(
"{:?}",
m.get_one::<String>(Self::PARQUET_COMPRESSION_ALGO)
.expect("default for compression algo")
))
.expect("unexpected compression algo");

let openid_client_id = m.get_one::<String>(Self::OPENID_CLIENT_ID).cloned();
let openid_client_secret = m.get_one::<String>(Self::OPENID_CLIENT_SECRET).cloned();
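The new code above replaces the hand-written match with a serde round-trip: Debug-formatting the CLI value wraps it in quotes, which makes it a JSON string literal that deserializes directly into the compression enum. A minimal sketch of the idea, assuming an enum that accepts the same lowercase names the old match arms listed (the enum below is illustrative):

```rust
use serde::Deserialize;

#[derive(Debug, Deserialize, PartialEq)]
#[serde(rename_all = "lowercase")]
enum Compression {
    Uncompressed,
    Snappy,
    Gzip,
    Lzo,
    Brotli,
    Lz4,
    Zstd,
}

fn parse_compression(raw: &str) -> Compression {
    // Debug-formatting a &str adds surrounding quotes, turning zstd into the
    // JSON string literal "zstd" that serde_json can parse directly.
    serde_json::from_str(&format!("{:?}", raw)).expect("unexpected compression algo")
}

fn main() {
    assert_eq!(parse_compression("zstd"), Compression::Zstd);
    assert_eq!(parse_compression("snappy"), Compression::Snappy);
}
```

The trade-off is that invalid values now surface through serde's error rather than the `unreachable!()` arm, and the enum's serde attributes have to stay in line with the values clap allows.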
12 changes: 6 additions & 6 deletions src/event/format/json.rs
@@ -45,11 +45,11 @@ impl EventFormat for Event {
// also extract the arrow schema, tags and metadata from the incoming json
fn to_data(
self,
schema: HashMap<String, Arc<Field>>,
static_schema_flag: Option<String>,
time_partition: Option<String>,
schema: &HashMap<String, Arc<Field>>,
static_schema_flag: Option<&String>,
time_partition: Option<&String>,
) -> Result<(Self::Data, Vec<Arc<Field>>, bool, Tags, Metadata), anyhow::Error> {
let data = flatten_json_body(self.data, None, None, None, false)?;
let data = flatten_json_body(&self.data, None, None, None, false)?;
let stream_schema = schema;

// incoming event may be a single json or a json array
@@ -66,13 +66,13 @@
collect_keys(value_arr.iter()).expect("fields can be collected from array of objects");

let mut is_first = false;
let schema = match derive_arrow_schema(&stream_schema, fields) {
let schema = match derive_arrow_schema(stream_schema, fields) {
Ok(schema) => schema,
Err(_) => match infer_json_schema_from_iterator(value_arr.iter().map(Ok)) {
Ok(mut infer_schema) => {
let new_infer_schema = super::super::format::update_field_type_in_schema(
Arc::new(infer_schema),
Some(&stream_schema),
Some(stream_schema),
time_partition,
Some(&value_arr),
);
31 changes: 15 additions & 16 deletions src/event/format/mod.rs
@@ -42,22 +42,21 @@ pub trait EventFormat: Sized {

fn to_data(
self,
schema: HashMap<String, Arc<Field>>,
static_schema_flag: Option<String>,
time_partition: Option<String>,
schema: &HashMap<String, Arc<Field>>,
static_schema_flag: Option<&String>,
time_partition: Option<&String>,
) -> Result<(Self::Data, EventSchema, bool, Tags, Metadata), AnyError>;

fn decode(data: Self::Data, schema: Arc<Schema>) -> Result<RecordBatch, AnyError>;

fn into_recordbatch(
self,
storage_schema: HashMap<String, Arc<Field>>,
static_schema_flag: Option<String>,
time_partition: Option<String>,
storage_schema: &HashMap<String, Arc<Field>>,
static_schema_flag: Option<&String>,
time_partition: Option<&String>,
) -> Result<(RecordBatch, bool), AnyError> {
let (data, mut schema, is_first, tags, metadata) = self.to_data(
storage_schema.clone(),
static_schema_flag.clone(),
time_partition.clone(),
)?;
let (data, mut schema, is_first, tags, metadata) =
self.to_data(storage_schema, static_schema_flag, time_partition)?;

if get_field(&schema, DEFAULT_TAGS_KEY).is_some() {
return Err(anyhow!("field {} is a reserved field", DEFAULT_TAGS_KEY));
@@ -120,8 +119,8 @@

fn is_schema_matching(
new_schema: Arc<Schema>,
storage_schema: HashMap<String, Arc<Field>>,
static_schema_flag: Option<String>,
storage_schema: &HashMap<String, Arc<Field>>,
static_schema_flag: Option<&String>,
) -> bool {
if static_schema_flag.is_none() {
return true;
@@ -207,7 +206,7 @@ pub fn override_timestamp_fields(
pub fn update_field_type_in_schema(
inferred_schema: Arc<Schema>,
existing_schema: Option<&HashMap<String, Arc<Field>>>,
time_partition: Option<String>,
time_partition: Option<&String>,
log_records: Option<&Vec<Value>>,
) -> Arc<Schema> {
let mut updated_schema = inferred_schema.clone();
@@ -236,12 +235,12 @@ pub fn update_field_type_in_schema(
if time_partition.is_none() {
return updated_schema;
}
let time_partition_field_name = time_partition.unwrap();

let new_schema: Vec<Field> = updated_schema
.fields()
.iter()
.map(|field| {
if *field.name() == time_partition_field_name {
if field.name() == time_partition.unwrap() {
if field.data_type() == &DataType::Utf8 {
let new_data_type = DataType::Timestamp(TimeUnit::Millisecond, None);
Field::new(field.name().clone(), new_data_type, true)
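The signature changes in this file swap owned `HashMap` and `String` parameters for borrows, which is what lets `into_recordbatch` drop its `.clone()` calls. Callers that hold `Option<String>` lend the inner value with `as_ref()`. A small self-contained sketch of that calling pattern, with hypothetical names standing in for the real types and handlers:

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Hypothetical stand-in for arrow's `Field`.
struct Field;

// Borrowing signature mirroring the refactor: nothing is cloned just to be read.
fn to_data(
    schema: &HashMap<String, Arc<Field>>,
    static_schema_flag: Option<&String>,
    time_partition: Option<&String>,
) {
    // Read-only access; the caller keeps ownership of all three values.
    let _ = (schema.len(), static_schema_flag, time_partition);
}

fn main() {
    let schema: HashMap<String, Arc<Field>> = HashMap::new();
    let static_schema_flag: Option<String> = Some("true".to_string());
    let time_partition: Option<String> = None;

    // `Option::as_ref()` turns `&Option<String>` into `Option<&String>`,
    // so owned options can be lent without cloning.
    to_data(&schema, static_schema_flag.as_ref(), time_partition.as_ref());
}
```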
34 changes: 17 additions & 17 deletions src/handlers/http/ingest.rs
@@ -61,7 +61,7 @@ pub async fn ingest(req: HttpRequest, body: Bytes) -> Result<HttpResponse, PostE
}
create_stream_if_not_exists(&stream_name, &StreamType::UserDefined.to_string()).await?;

flatten_and_push_logs(req, body, stream_name).await?;
flatten_and_push_logs(req, body, &stream_name).await?;
Ok(HttpResponse::Ok().finish())
} else {
Err(PostError::Header(ParseHeaderError::MissingStreamName))
@@ -84,7 +84,7 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<
tags: String::default(),
metadata: String::default(),
};
event.into_recordbatch(schema, None, None)?
event.into_recordbatch(&schema, None, None)?
};
event::Event {
rb,
@@ -114,9 +114,9 @@ pub async fn handle_otel_ingestion(
.iter()
.find(|&(key, _)| key == STREAM_NAME_HEADER_KEY)
{
let stream_name = stream_name.to_str().unwrap().to_owned();
create_stream_if_not_exists(&stream_name, &StreamType::UserDefined.to_string()).await?;
push_logs(stream_name.to_string(), req.clone(), body).await?;
let stream_name = stream_name.to_str().unwrap();
create_stream_if_not_exists(stream_name, &StreamType::UserDefined.to_string()).await?;
push_logs(stream_name, &req, &body).await?;
} else {
return Err(PostError::Header(ParseHeaderError::MissingStreamName));
}
@@ -149,7 +149,7 @@ pub async fn post_event(req: HttpRequest, body: Bytes) -> Result<HttpResponse, P
}
}

flatten_and_push_logs(req, body, stream_name).await?;
flatten_and_push_logs(req, body, &stream_name).await?;
Ok(HttpResponse::Ok().finish())
}

@@ -319,7 +319,7 @@ mod tests {
.append_header((PREFIX_META.to_string() + "C", "meta1"))
.to_http_request();

let (rb, _) = into_event_batch(req, json, HashMap::default(), None, None).unwrap();
let (rb, _) = into_event_batch(&req, &json, HashMap::default(), None, None).unwrap();

assert_eq!(rb.num_rows(), 1);
assert_eq!(rb.num_columns(), 6);
@@ -359,7 +359,7 @@

let req = TestRequest::default().to_http_request();

let (rb, _) = into_event_batch(req, json, HashMap::default(), None, None).unwrap();
let (rb, _) = into_event_batch(&req, &json, HashMap::default(), None, None).unwrap();

assert_eq!(rb.num_rows(), 1);
assert_eq!(rb.num_columns(), 5);
@@ -391,7 +391,7 @@

let req = TestRequest::default().to_http_request();

let (rb, _) = into_event_batch(req, json, schema, None, None).unwrap();
let (rb, _) = into_event_batch(&req, &json, schema, None, None).unwrap();

assert_eq!(rb.num_rows(), 1);
assert_eq!(rb.num_columns(), 5);
@@ -423,7 +423,7 @@

let req = TestRequest::default().to_http_request();

assert!(into_event_batch(req, json, schema, None, None).is_err());
assert!(into_event_batch(&req, &json, schema, None, None).is_err());
}

#[test]
@@ -441,7 +441,7 @@

let req = TestRequest::default().to_http_request();

let (rb, _) = into_event_batch(req, json, schema, None, None).unwrap();
let (rb, _) = into_event_batch(&req, &json, schema, None, None).unwrap();

assert_eq!(rb.num_rows(), 1);
assert_eq!(rb.num_columns(), 3);
@@ -453,7 +453,7 @@

let req = TestRequest::default().to_http_request();

assert!(into_event_batch(req, json, HashMap::default(), None, None).is_err())
assert!(into_event_batch(&req, &json, HashMap::default(), None, None).is_err())
}

#[test]
@@ -476,7 +476,7 @@

let req = TestRequest::default().to_http_request();

let (rb, _) = into_event_batch(req, json, HashMap::default(), None, None).unwrap();
let (rb, _) = into_event_batch(&req, &json, HashMap::default(), None, None).unwrap();

assert_eq!(rb.num_rows(), 3);
assert_eq!(rb.num_columns(), 6);
@@ -524,7 +524,7 @@

let req = TestRequest::default().to_http_request();

let (rb, _) = into_event_batch(req, json, HashMap::default(), None, None).unwrap();
let (rb, _) = into_event_batch(&req, &json, HashMap::default(), None, None).unwrap();

assert_eq!(rb.num_rows(), 3);
assert_eq!(rb.num_columns(), 6);
@@ -572,7 +572,7 @@
);
let req = TestRequest::default().to_http_request();

let (rb, _) = into_event_batch(req, json, schema, None, None).unwrap();
let (rb, _) = into_event_batch(&req, &json, schema, None, None).unwrap();

assert_eq!(rb.num_rows(), 3);
assert_eq!(rb.num_columns(), 6);
@@ -621,7 +621,7 @@
.into_iter(),
);

assert!(into_event_batch(req, json, schema, None, None).is_err());
assert!(into_event_batch(&req, &json, schema, None, None).is_err());
}

#[test]
@@ -649,7 +649,7 @@

let req = TestRequest::default().to_http_request();

let (rb, _) = into_event_batch(req, json, HashMap::default(), None, None).unwrap();
let (rb, _) = into_event_batch(&req, &json, HashMap::default(), None, None).unwrap();

assert_eq!(rb.num_rows(), 4);
assert_eq!(rb.num_columns(), 7);
4 changes: 1 addition & 3 deletions src/handlers/http/kinesis.rs
@@ -24,16 +24,14 @@ use std::collections::BTreeMap;
use std::str;

#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
struct Message {
#[serde(rename = "records")]
records: Vec<Data>,
#[serde(rename = "requestId")]
request_id: String,
timestamp: u64,
}
#[derive(Serialize, Deserialize, Debug)]
struct Data {
#[serde(rename = "data")]
data: String,
}

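Above, a single container-level `#[serde(rename_all = "camelCase")]` replaces the per-field renames: `request_id` still maps to `requestId`, while `records` and `data` are unaffected because their camelCase form equals the field name. A quick round-trip sketch of the equivalence (struct shapes mirror the diff; the sample values are made up):

```rust
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
struct Message {
    records: Vec<Data>,
    request_id: String, // serialized as "requestId"
    timestamp: u64,
}

#[derive(Serialize, Deserialize, Debug)]
struct Data {
    data: String,
}

fn main() {
    let json = r#"{"records":[{"data":"aGVsbG8="}],"requestId":"req-1","timestamp":0}"#;
    let msg: Message = serde_json::from_str(json).unwrap();
    assert_eq!(msg.request_id, "req-1");
    // Round-trips back out with the same camelCase keys.
    println!("{}", serde_json::to_string(&msg).unwrap());
}
```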
8 changes: 4 additions & 4 deletions src/handlers/http/logstream.rs
@@ -614,8 +614,8 @@ pub async fn put_stream_hot_tier(
let existing_hot_tier_used_size = hot_tier_manager
.validate_hot_tier_size(&stream_name, &hottier.size)
.await?;
hottier.used_size = Some(existing_hot_tier_used_size.to_string());
hottier.available_size = Some(hottier.size.clone());
hottier.used_size = existing_hot_tier_used_size.to_string();
hottier.available_size = hottier.size.to_string();
hottier.version = Some(CURRENT_HOT_TIER_VERSION.to_string());
hot_tier_manager
.put_hot_tier(&stream_name, &mut hottier)
@@ -658,8 +658,8 @@ pub async fn get_stream_hot_tier(req: HttpRequest) -> Result<impl Responder, Str
if let Some(hot_tier_manager) = HotTierManager::global() {
let mut hot_tier = hot_tier_manager.get_hot_tier(&stream_name).await?;
hot_tier.size = format!("{} {}", hot_tier.size, "Bytes");
hot_tier.used_size = Some(format!("{} {}", hot_tier.used_size.unwrap(), "Bytes"));
hot_tier.available_size = Some(format!("{} {}", hot_tier.available_size.unwrap(), "Bytes"));
hot_tier.used_size = format!("{} Bytes", hot_tier.used_size);
hot_tier.available_size = format!("{} Bytes", hot_tier.available_size);
Ok((web::Json(hot_tier), StatusCode::OK))
} else {
Err(StreamError::Custom {
2 changes: 1 addition & 1 deletion src/handlers/http/middleware.rs
@@ -44,8 +44,8 @@ use crate::{
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
struct Message {
#[serde(rename = "commonAttributes")]
common_attributes: CommonAttributes,
}

9 changes: 2 additions & 7 deletions src/handlers/http/modal/ingest_server.rs
@@ -43,7 +43,6 @@ use crate::storage::PARSEABLE_ROOT_DIRECTORY;
use crate::sync;

use crate::{handlers::http::base_path, option::CONFIG};
use actix_web::body::MessageBody;
use actix_web::web;
use actix_web::web::resource;
use actix_web::Scope;
@@ -309,9 +308,7 @@ impl IngestServer {
.clone_from(&INGESTOR_META.domain_name);
store_data.port.clone_from(&INGESTOR_META.port);

let resource = serde_json::to_string(&store_data)?
.try_into_bytes()
.map_err(|err| anyhow!(err))?;
let resource = Bytes::from(serde_json::to_vec(&store_data)?);

// if pushing to object store fails propagate the error
return store
Expand All @@ -320,9 +317,7 @@ impl IngestServer {
.map_err(|err| anyhow!(err));
}
} else {
let resource = serde_json::to_string(&resource)?
.try_into_bytes()
.map_err(|err| anyhow!(err))?;
let resource = Bytes::from(serde_json::to_vec(&resource)?);

store.put_object(&path, resource).await?;
}
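Both hunks above swap the `serde_json::to_string(..)?.try_into_bytes()` chain for `serde_json::to_vec`, which serializes straight into a byte buffer that `Bytes::from` takes over without the `MessageBody` detour (hence the dropped import). A minimal sketch of the pattern, using a throwaway struct in place of the ingestor metadata:

```rust
use bytes::Bytes;
use serde::Serialize;

// Throwaway stand-in for the metadata written to object storage.
#[derive(Serialize)]
struct StoreData {
    domain_name: String,
    port: String,
}

fn to_resource(data: &StoreData) -> Result<Bytes, serde_json::Error> {
    // serde_json::to_vec yields a Vec<u8>; Bytes::from takes ownership of it
    // without an intermediate String or extra copy.
    Ok(Bytes::from(serde_json::to_vec(data)?))
}

fn main() {
    let data = StoreData {
        domain_name: "https://localhost:8000".to_string(),
        port: "8000".to_string(),
    };
    let bytes = to_resource(&data).unwrap();
    println!("{} bytes: {:?}", bytes.len(), bytes);
}
```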
6 changes: 2 additions & 4 deletions src/handlers/http/modal/mod.rs
@@ -221,6 +221,7 @@ impl IngestorMetadata {
#[cfg(test)]
mod test {
use actix_web::body::MessageBody;
use bytes::Bytes;
use rstest::rstest;

use super::{IngestorMetadata, DEFAULT_VERSION};
Expand Down Expand Up @@ -256,10 +257,7 @@ mod test {
"8002".to_string(),
);

let lhs = serde_json::to_string(&im)
.unwrap()
.try_into_bytes()
.unwrap();
let lhs = Bytes::from(serde_json::to_vec(&im).unwrap());
let rhs = br#"{"version":"v3","port":"8000","domain_name":"https://localhost:8000","bucket_name":"somebucket","token":"Basic YWRtaW46YWRtaW4=","ingestor_id":"ingestor_id","flight_port":"8002"}"#
.try_into_bytes()
.unwrap();