diff --git a/build.rs b/build.rs
index e73bf7082..61ddc9989 100644
--- a/build.rs
+++ b/build.rs
@@ -151,7 +151,7 @@ mod ui {
             .expect("has segemnts")
             .find(|v| v.starts_with('v'))
             .expect("version segement");
-        println!("cargo:rustc-env=UI_VERSION={}", ui_version);
+        println!("cargo:rustc-env=UI_VERSION={ui_version}");
     }
 
     Ok(())
diff --git a/src/alerts/alerts_utils.rs b/src/alerts/alerts_utils.rs
index 2e2e8a443..14ca7f58d 100644
--- a/src/alerts/alerts_utils.rs
+++ b/src/alerts/alerts_utils.rs
@@ -88,7 +88,7 @@ async fn execute_base_query(
     original_query: &str,
 ) -> Result {
     let stream_name = query.first_table_name().ok_or_else(|| {
-        AlertError::CustomError(format!("Table name not found in query- {}", original_query))
+        AlertError::CustomError(format!("Table name not found in query- {original_query}"))
     })?;
 
     let time_partition = PARSEABLE.get_stream(&stream_name)?.get_time_partition();
@@ -412,7 +412,7 @@ pub fn get_filter_string(where_clause: &Conditions) -> Result {
             let value = match NumberOrString::from_string(value.to_owned()) {
                 NumberOrString::Number(val) => format!("{val}"),
                 NumberOrString::String(val) => {
-                    format!("'{}'", val)
+                    format!("'{val}'")
                 }
             };
             format!("{} {}", condition.operator, value)
@@ -456,23 +456,23 @@ fn match_alert_operator(expr: &ConditionConfig) -> Expr {
         WhereConfigOperator::BeginsWith => Expr::BinaryExpr(BinaryExpr::new(
             Box::new(col(column)),
             Operator::RegexIMatch,
-            Box::new(lit(format!("^{}", value))),
+            Box::new(lit(format!("^{value}"))),
         )),
         WhereConfigOperator::EndsWith => Expr::BinaryExpr(BinaryExpr::new(
             Box::new(col(column)),
             Operator::RegexIMatch,
-            Box::new(lit(format!("{}$", value))),
+            Box::new(lit(format!("{value}$"))),
         )),
         WhereConfigOperator::DoesNotContain => col(column).not_ilike(lit(value)),
         WhereConfigOperator::DoesNotBeginWith => Expr::BinaryExpr(BinaryExpr::new(
             Box::new(col(column)),
             Operator::RegexNotIMatch,
-            Box::new(lit(format!("^{}", value))),
+            Box::new(lit(format!("^{value}"))),
         )),
         WhereConfigOperator::DoesNotEndWith => Expr::BinaryExpr(BinaryExpr::new(
             Box::new(col(column)),
             Operator::RegexNotIMatch,
-            Box::new(lit(format!("{}$", value))),
+            Box::new(lit(format!("{value}$"))),
         )),
         _ => unreachable!("value must not be null for operators other than `is null` and `is not null`. Should've been caught in validation")
     }
diff --git a/src/alerts/mod.rs b/src/alerts/mod.rs
index be1aa1c21..f6a93d4af 100644
--- a/src/alerts/mod.rs
+++ b/src/alerts/mod.rs
@@ -365,7 +365,7 @@ impl Conditions {
                     format!("{} {}", expr2.column, expr2.operator)
                 };
 
-                format!("[{} {op} {}]", expr1_msg, expr2_msg)
+                format!("[{expr1_msg} {op} {expr2_msg}]")
             }
         },
         None => {
diff --git a/src/catalog/mod.rs b/src/catalog/mod.rs
index f8113545a..db89823ac 100644
--- a/src/catalog/mod.rs
+++ b/src/catalog/mod.rs
@@ -459,8 +459,8 @@ pub fn partition_path(
     let lower = lower_bound.date_naive().format("%Y-%m-%d").to_string();
     let upper = upper_bound.date_naive().format("%Y-%m-%d").to_string();
     if lower == upper {
-        RelativePathBuf::from_iter([stream, &format!("date={}", lower)])
+        RelativePathBuf::from_iter([stream, &format!("date={lower}")])
     } else {
-        RelativePathBuf::from_iter([stream, &format!("date={}:{}", lower, upper)])
+        RelativePathBuf::from_iter([stream, &format!("date={lower}:{upper}")])
     }
 }
diff --git a/src/cli.rs b/src/cli.rs
index cda2361c5..9a37dc578 100644
--- a/src/cli.rs
+++ b/src/cli.rs
@@ -563,8 +563,7 @@ impl Options {
         } else {
             if endpoint.starts_with("http") {
                 panic!(
-                    "Invalid value `{}`, please set the environment variable `{}` to `:` without the scheme (e.g., 192.168.1.1:8000 or example.com:8000). Please refer to the documentation: https://logg.ing/env for more details.",
-                    endpoint, env_var
+                    "Invalid value `{endpoint}`, please set the environment variable `{env_var}` to `:` without the scheme (e.g., 192.168.1.1:8000 or example.com:8000). Please refer to the documentation: https://logg.ing/env for more details.",
                 );
             }
             endpoint.to_string()
@@ -579,15 +578,14 @@ impl Options {
 
         if addr_parts.len() != 2 {
             panic!(
-                "Invalid value `{}`, please set the environment variable to `:` without the scheme (e.g., 192.168.1.1:8000 or example.com:8000). Please refer to the documentation: https://logg.ing/env for more details.",
-                endpoint
+                "Invalid value `{endpoint}`, please set the environment variable to `:` without the scheme (e.g., 192.168.1.1:8000 or example.com:8000). Please refer to the documentation: https://logg.ing/env for more details."
             );
         }
 
         let hostname = self.resolve_env_var(addr_parts[0]);
         let port = self.resolve_env_var(addr_parts[1]);
 
-        self.build_url(&format!("{}:{}", hostname, port))
+        self.build_url(&format!("{hostname}:{port}"))
     }
 
     /// resolve the env var
@@ -597,15 +595,13 @@ impl Options {
         if let Some(env_var) = value.strip_prefix('$') {
             let resolved_value = env::var(env_var).unwrap_or_else(|_| {
                 panic!(
-                    "The environment variable `{}` is not set. Please set it to a valid value. Refer to the documentation: https://logg.ing/env for more details.",
-                    env_var
+                    "The environment variable `{env_var}` is not set. Please set it to a valid value. Refer to the documentation: https://logg.ing/env for more details."
                 );
             });
 
             if resolved_value.starts_with("http") {
                 panic!(
-                    "Invalid value `{}`, please set the environment variable `{}` to `` without the scheme (e.g., 192.168.1.1 or example.com). Please refer to the documentation: https://logg.ing/env for more details.",
-                    resolved_value, env_var
+                    "Invalid value `{resolved_value}`, please set the environment variable `{env_var}` to `` without the scheme (e.g., 192.168.1.1 or example.com). Please refer to the documentation: https://logg.ing/env for more details.",
                 );
             }
 
@@ -621,8 +617,7 @@ impl Options {
             .parse::<Url>()
             .unwrap_or_else(|err| {
                 panic!(
-                    "{err}, failed to parse `{}` as Url. Please set the environment variable `P_ADDR` to `:` without the scheme (e.g., 192.168.1.1:8000). Please refer to the documentation: https://logg.ing/env for more details.",
-                    address
+                    "{err}, failed to parse `{address}` as Url. Please set the environment variable `P_ADDR` to `:` without the scheme (e.g., 192.168.1.1:8000). Please refer to the documentation: https://logg.ing/env for more details."
                 );
             })
     }
diff --git a/src/connectors/common/mod.rs b/src/connectors/common/mod.rs
index cb77d983c..511fd337c 100644
--- a/src/connectors/common/mod.rs
+++ b/src/connectors/common/mod.rs
@@ -95,7 +95,7 @@ impl FromStr for BadData {
             "drop" => Ok(BadData::Drop),
             "fail" => Ok(BadData::Fail),
             "dlt" => Ok(BadData::Dlt),
-            _ => Err(format!("Invalid bad data policy: {}", s)),
+            _ => Err(format!("Invalid bad data policy: {s}")),
         }
     }
 }
diff --git a/src/connectors/kafka/config.rs b/src/connectors/kafka/config.rs
index ee7e77446..855b19c58 100644
--- a/src/connectors/kafka/config.rs
+++ b/src/connectors/kafka/config.rs
@@ -827,7 +827,7 @@ impl std::str::FromStr for SecurityProtocol {
             "SSL" => Ok(SecurityProtocol::Ssl),
             "SASL_SSL" => Ok(SecurityProtocol::SaslSsl),
             "SASL_PLAINTEXT" => Ok(SecurityProtocol::SaslPlaintext),
-            _ => Err(format!("Invalid security protocol: {}", s)),
+            _ => Err(format!("Invalid security protocol: {s}")),
         }
     }
 }
@@ -965,7 +965,7 @@ impl std::str::FromStr for SaslMechanism {
             "SCRAM-SHA-512" => Ok(SaslMechanism::ScramSha512),
             "GSSAPI" => Ok(SaslMechanism::Gssapi),
             "OAUTHBEARER" => Ok(SaslMechanism::OAuthBearer),
-            _ => Err(format!("Invalid SASL mechanism: {}", s)),
+            _ => Err(format!("Invalid SASL mechanism: {s}")),
         }
     }
 }
@@ -985,7 +985,7 @@ impl std::str::FromStr for Acks {
             "0" => Ok(Acks::None),
             "1" => Ok(Acks::Leader),
             "all" => Ok(Acks::All),
-            _ => Err(format!("Invalid acks value: {}", s)),
+            _ => Err(format!("Invalid acks value: {s}")),
         }
     }
 }
diff --git a/src/handlers/airplane.rs b/src/handlers/airplane.rs
index d2528d3a8..8831346eb 100644
--- a/src/handlers/airplane.rs
+++ b/src/handlers/airplane.rs
@@ -253,7 +253,7 @@ impl FlightService for AirServiceImpl {
         let time = time.elapsed().as_secs_f64();
         QUERY_EXECUTE_TIME
-            .with_label_values(&[&format!("flight-query-{}", stream_name)])
+            .with_label_values(&[&format!("flight-query-{stream_name}")])
             .observe(time);
 
         // Airplane takes off 🛫
diff --git a/src/handlers/http/cluster/mod.rs b/src/handlers/http/cluster/mod.rs
index bedacc25d..6fb787af4 100644
--- a/src/handlers/http/cluster/mod.rs
+++ b/src/handlers/http/cluster/mod.rs
@@ -783,13 +783,12 @@ pub async fn remove_node(node_url: Path) -> Result
 String {
         return str;
     }
-    format!("http://{}/", str)
+    format!("http://{str}/")
 }
diff --git a/src/handlers/http/llm.rs b/src/handlers/http/llm.rs
index 5327f1b31..cc63895d7 100644
--- a/src/handlers/http/llm.rs
+++ b/src/handlers/http/llm.rs
@@ -67,11 +67,10 @@ impl From<&arrow_schema::Field> for Field {
 
 fn build_prompt(stream: &str, prompt: &str, schema_json: &str) -> String {
     format!(
-        r#"I have a table called {}.
-It has the columns:\n{}
-Based on this schema, generate valid SQL for the query: "{}"
-Generate only simple SQL as output. Also add comments in SQL syntax to explain your actions. Don't output anything else. If it is not possible to generate valid SQL, output an SQL comment saying so."#,
-        stream, schema_json, prompt
+        r#"I have a table called {stream}.
+It has the columns:\n{schema_json}
+Based on this schema, generate valid SQL for the query: "{prompt}"
+Generate only simple SQL as output. Also add comments in SQL syntax to explain your actions. Don't output anything else. If it is not possible to generate valid SQL, output an SQL comment saying so."#
     )
 }
diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs
index 18cc0f074..6d1e00697 100644
--- a/src/handlers/http/logstream.rs
+++ b/src/handlers/http/logstream.rs
@@ -30,6 +30,9 @@ use crate::stats::{event_labels_date, storage_size_labels_date, Stats};
 use crate::storage::retention::Retention;
 use crate::storage::{StreamInfo, StreamType};
 use crate::utils::actix::extract_session_key_from_req;
+use crate::utils::json::flatten::{
+    self, convert_to_array, generic_flattening, has_more_than_max_allowed_levels,
+};
 use crate::{stats, validator, LOCK_EXPECT};
 
 use actix_web::http::StatusCode;
@@ -102,22 +105,60 @@ pub async fn list(req: HttpRequest) -> Result {
 }
 
 pub async fn detect_schema(Json(json): Json) -> Result {
-    let log_records: Vec = match json {
-        Value::Array(arr) => arr,
-        value @ Value::Object(_) => vec![value],
-        _ => {
+    // flatten before infer
+    if !has_more_than_max_allowed_levels(&json, 1) {
+        //perform generic flattening, return error if failed to flatten
+        let mut flattened_json = match generic_flattening(&json) {
+            Ok(flattened) => match convert_to_array(flattened) {
+                Ok(array) => array,
+                Err(e) => {
+                    return Err(StreamError::Custom {
+                        msg: format!("Failed to convert to array: {e}"),
+                        status: StatusCode::BAD_REQUEST,
+                    })
+                }
+            },
+            Err(e) => {
+                return Err(StreamError::Custom {
+                    msg: e.to_string(),
+                    status: StatusCode::BAD_REQUEST,
+                })
+            }
+        };
+        if let Err(err) = flatten::flatten(&mut flattened_json, "_", None, None, None, false) {
             return Err(StreamError::Custom {
-                msg: "please send json events as part of the request".to_string(),
+                msg: err.to_string(),
                 status: StatusCode::BAD_REQUEST,
-            })
+            });
         }
-    };
-
-    let mut schema = Arc::new(infer_json_schema_from_iterator(log_records.iter().map(Ok)).unwrap());
-    for log_record in log_records {
-        schema = override_data_type(schema, log_record, SchemaVersion::V1);
+        let flattened_json_arr = match flattened_json {
+            Value::Array(arr) => arr,
+            value @ Value::Object(_) => vec![value],
+            _ => unreachable!("flatten would have failed beforehand"),
+        };
+        let mut schema = match infer_json_schema_from_iterator(flattened_json_arr.iter().map(Ok)) {
+            Ok(schema) => Arc::new(schema),
+            Err(e) => {
+                return Err(StreamError::Custom {
+                    msg: format!("Failed to infer schema: {e}"),
+                    status: StatusCode::BAD_REQUEST,
+                })
+            }
+        };
+        for flattened_json in flattened_json_arr {
+            schema = override_data_type(schema, flattened_json, SchemaVersion::V1);
+        }
+        Ok((web::Json(schema), StatusCode::OK))
+    } else {
+        // error out if the JSON is heavily nested
+        Err(StreamError::Custom {
+            msg: format!(
+                "JSON is too deeply nested (exceeds level {}), cannot flatten",
+                PARSEABLE.options.event_flatten_level
+            ),
+            status: StatusCode::BAD_REQUEST,
+        })
     }
-    Ok((web::Json(schema), StatusCode::OK))
 }
 
 pub async fn get_schema(stream_name: Path) -> Result {
diff --git a/src/handlers/http/mod.rs b/src/handlers/http/mod.rs
index 69890736d..7ea660b53 100644
--- a/src/handlers/http/mod.rs
+++ b/src/handlers/http/mod.rs
@@ -89,7 +89,7 @@ pub fn base_path_without_preceding_slash() -> String {
 /// An `anyhow::Result` containing the `arrow_schema::Schema` for the specified stream.
 pub async fn fetch_schema(stream_name: &str) -> anyhow::Result {
     let path_prefix =
-        relative_path::RelativePathBuf::from(format!("{}/{}", stream_name, STREAM_ROOT_DIRECTORY));
+        relative_path::RelativePathBuf::from(format!("{stream_name}/{STREAM_ROOT_DIRECTORY}"));
     let store = PARSEABLE.storage.get_object_store();
     let res: Vec = store
         .get_objects(
diff --git a/src/handlers/http/modal/ingest_server.rs b/src/handlers/http/modal/ingest_server.rs
index e0d1c85c9..c0564449c 100644
--- a/src/handlers/http/modal/ingest_server.rs
+++ b/src/handlers/http/modal/ingest_server.rs
@@ -322,7 +322,7 @@ async fn validate_credentials() -> anyhow::Result<()> {
         PARSEABLE.options.username, PARSEABLE.options.password
     ));
-    let token = format!("Basic {}", token);
+    let token = format!("Basic {token}");
 
     if check != token {
         return Err(anyhow::anyhow!("Credentials do not match with other ingestors. Please check your credentials and try again."));
diff --git a/src/handlers/http/oidc.rs b/src/handlers/http/oidc.rs
index 1b5db874c..580a03b93 100644
--- a/src/handlers/http/oidc.rs
+++ b/src/handlers/http/oidc.rs
@@ -240,7 +240,7 @@ fn redirect_to_oidc(
 }
 
 fn redirect_to_oidc_logout(mut logout_endpoint: Url, redirect: &Url) -> HttpResponse {
-    logout_endpoint.set_query(Some(&format!("post_logout_redirect_uri={}", redirect)));
+    logout_endpoint.set_query(Some(&format!("post_logout_redirect_uri={redirect}")));
     HttpResponse::TemporaryRedirect()
         .insert_header((header::CACHE_CONTROL, "no-store"))
         .insert_header((header::LOCATION, logout_endpoint.to_string()))
diff --git a/src/handlers/http/query.rs b/src/handlers/http/query.rs
index a817546d0..cb8e95344 100644
--- a/src/handlers/http/query.rs
+++ b/src/handlers/http/query.rs
@@ -322,8 +322,7 @@ async fn handle_streaming_query(
     // Combine the initial fields chunk with the records stream
     let fields_chunk = once(future::ok::<_, actix_web::Error>(Bytes::from(format!(
-        "{}\n",
-        fields_json
+        "{fields_json}\n"
     ))));
     Box::pin(fields_chunk.chain(records_stream)) as Pin>>>
@@ -356,7 +355,7 @@ fn create_batch_processor(
                 error!("Failed to parse record batch into JSON: {}", e);
                 actix_web::error::ErrorInternalServerError(e)
             })?;
-            Ok(Bytes::from(format!("{}\n", response)))
+            Ok(Bytes::from(format!("{response}\n")))
         }
         Err(e) => Err(actix_web::error::ErrorInternalServerError(e)),
     }
diff --git a/src/handlers/http/users/dashboards.rs b/src/handlers/http/users/dashboards.rs
index 16a885969..d1e0cc89b 100644
--- a/src/handlers/http/users/dashboards.rs
+++ b/src/handlers/http/users/dashboards.rs
@@ -83,7 +83,7 @@ pub async fn post(
     }
     DASHBOARDS.update(&dashboard).await;
 
-    let path = dashboard_path(&user_id, &format!("{}.json", dashboard_id));
+    let path = dashboard_path(&user_id, &format!("{dashboard_id}.json"));
 
     let store = PARSEABLE.storage.get_object_store();
     let dashboard_bytes = serde_json::to_vec(&dashboard)?;
@@ -120,7 +120,7 @@ pub async fn update(
     }
     DASHBOARDS.update(&dashboard).await;
 
-    let path = dashboard_path(&user_id, &format!("{}.json", dashboard_id));
+    let path = dashboard_path(&user_id, &format!("{dashboard_id}.json"));
 
     let store = PARSEABLE.storage.get_object_store();
     let dashboard_bytes = serde_json::to_vec(&dashboard)?;
@@ -145,7 +145,7 @@ pub async fn delete(
     {
         return Err(DashboardError::Metadata("Dashboard does not exist"));
     }
-    let path = dashboard_path(&user_id, &format!("{}.json", dashboard_id));
+    let path = dashboard_path(&user_id, &format!("{dashboard_id}.json"));
     let store = PARSEABLE.storage.get_object_store();
 
     store.delete_object(&path).await?;
diff --git a/src/handlers/http/users/filters.rs b/src/handlers/http/users/filters.rs
index 378e80df3..fe0f06d2c 100644
--- a/src/handlers/http/users/filters.rs
+++ b/src/handlers/http/users/filters.rs
@@ -66,11 +66,7 @@ pub async fn post(
     filter.version = Some(CURRENT_FILTER_VERSION.to_string());
     FILTERS.update(&filter).await;
 
-    let path = filter_path(
-        &user_id,
-        &filter.stream_name,
-        &format!("{}.json", filter_id),
-    );
+    let path = filter_path(&user_id, &filter.stream_name, &format!("{filter_id}.json"));
 
     let store = PARSEABLE.storage.get_object_store();
     let filter_bytes = serde_json::to_vec(&filter)?;
@@ -95,11 +91,7 @@ pub async fn update(
     filter.version = Some(CURRENT_FILTER_VERSION.to_string());
     FILTERS.update(&filter).await;
 
-    let path = filter_path(
-        &user_id,
-        &filter.stream_name,
-        &format!("{}.json", filter_id),
-    );
+    let path = filter_path(&user_id, &filter.stream_name, &format!("{filter_id}.json"));
 
     let store = PARSEABLE.storage.get_object_store();
     let filter_bytes = serde_json::to_vec(&filter)?;
@@ -120,11 +112,7 @@ pub async fn delete(
         .await
         .ok_or(FiltersError::Metadata("Filter does not exist"))?;
 
-    let path = filter_path(
-        &user_id,
-        &filter.stream_name,
-        &format!("{}.json", filter_id),
-    );
+    let path = filter_path(&user_id, &filter.stream_name, &format!("{filter_id}.json"));
 
     let store = PARSEABLE.storage.get_object_store();
     store.delete_object(&path).await?;
diff --git a/src/hottier.rs b/src/hottier.rs
index a8128641d..f51d58f3d 100644
--- a/src/hottier.rs
+++ b/src/hottier.rs
@@ -461,9 +461,7 @@ impl HotTierManager {
 
     /// get hot tier path for the stream and date
     pub fn get_stream_path_for_date(&self, stream: &str, date: &NaiveDate) -> PathBuf {
-        self.hot_tier_path
-            .join(stream)
-            .join(format!("date={}", date))
+        self.hot_tier_path.join(stream).join(format!("date={date}"))
     }
 
     /// Returns the list of manifest files present in hot tier directory for the stream
@@ -679,7 +677,7 @@ impl HotTierManager {
                     .to_string_lossy()
                     .trim_start_matches("minute=")
                     .to_string();
-                let oldest_date_time = format!("{}T{}:{}:00.000Z", date, hour_str, minute_str);
+                let oldest_date_time = format!("{date}T{hour_str}:{minute_str}:00.000Z");
                 return Ok(Some(oldest_date_time));
             }
         }
diff --git a/src/option.rs b/src/option.rs
index 7378deb8a..830037d80 100644
--- a/src/option.rs
+++ b/src/option.rs
@@ -200,8 +200,7 @@ pub mod validation {
             Ok(size)
         } else {
             Err(format!(
-                "Invalid value for P_DATASET_FIELD_COUNT_LIMIT. It should be between 1 and {}",
-                DATASET_FIELD_COUNT_LIMIT
+                "Invalid value for P_DATASET_FIELD_COUNT_LIMIT. It should be between 1 and {DATASET_FIELD_COUNT_LIMIT}"
             ))
         }
     } else {
diff --git a/src/otel/metrics.rs b/src/otel/metrics.rs
index 4edff77fe..e71aab785 100644
--- a/src/otel/metrics.rs
+++ b/src/otel/metrics.rs
@@ -343,13 +343,13 @@ fn flatten_exp_histogram(exp_histogram: &ExponentialHistogram) -> Vec) -> Option {
     };
 
     // mask to XXX for everything after the @ symbol
-    Some(format!("{}@XXX", masked_username))
+    Some(format!("{masked_username}@XXX"))
 } else {
     // mask all other strings with X
     Some("X".repeat(input.len()))
 }
diff --git a/src/storage/azure_blob.rs b/src/storage/azure_blob.rs
index bcd8d2a51..1dd3a4da9 100644
--- a/src/storage/azure_blob.rs
+++ b/src/storage/azure_blob.rs
@@ -247,7 +247,7 @@ impl BlobStore {
         if let Err(object_store::Error::NotFound { source, .. }) = &resp {
             return Err(ObjectStorageError::Custom(
-                format!("Failed to upload, error: {:?}", source).to_string(),
+                format!("Failed to upload, error: {source:?}").to_string(),
             ));
         }
@@ -682,7 +682,7 @@ impl ObjectStorage for BlobStore {
         let stream_json_check = FuturesUnordered::new();
 
         for dir in &dirs {
-            let key = format!("{}/{}", dir, STREAM_METADATA_FILE_NAME);
+            let key = format!("{dir}/{STREAM_METADATA_FILE_NAME}");
             let task = async move { self.client.head(&StorePath::from(key)).await.map(|_| ()) };
             stream_json_check.push(task);
         }
diff --git a/src/storage/localfs.rs b/src/storage/localfs.rs
index 3b79f4028..cc6a82a53 100644
--- a/src/storage/localfs.rs
+++ b/src/storage/localfs.rs
@@ -452,7 +452,7 @@ impl ObjectStorage for LocalFS {
     fn query_prefixes(&self, prefixes: Vec) -> Vec {
         prefixes
             .into_iter()
-            .filter_map(|prefix| ListingTableUrl::parse(format!("/{}", prefix)).ok())
+            .filter_map(|prefix| ListingTableUrl::parse(format!("/{prefix}")).ok())
             .collect()
     }
diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs
index c4a2ab5d2..f87b98b4b 100644
--- a/src/storage/object_storage.rs
+++ b/src/storage/object_storage.rs
@@ -1139,10 +1139,9 @@ fn get_stats_sql(stream_name: &str, field_name: &str, max_field_statistics: usize
             field_value,
             value_count
         FROM field_summary
-        WHERE rn <= {}
+        WHERE rn <= {max_field_statistics}
         ORDER BY value_count DESC
-        "#,
-        max_field_statistics
+        "#
     )
 }
diff --git a/src/storage/s3.rs b/src/storage/s3.rs
index 7a8d2bbdc..83a98a6e1 100644
--- a/src/storage/s3.rs
+++ b/src/storage/s3.rs
@@ -765,7 +765,7 @@ impl ObjectStorage for S3 {
         let stream_json_check = FuturesUnordered::new();
 
         for dir in &dirs {
-            let key = format!("{}/{}", dir, STREAM_METADATA_FILE_NAME);
+            let key = format!("{dir}/{STREAM_METADATA_FILE_NAME}");
             let task = async move { self.client.head(&StorePath::from(key)).await.map(|_| ()) };
             stream_json_check.push(task);
         }
diff --git a/src/storage/store_metadata.rs b/src/storage/store_metadata.rs
index 22b21176d..93a3ca76c 100644
--- a/src/storage/store_metadata.rs
+++ b/src/storage/store_metadata.rs
@@ -163,8 +163,7 @@ fn handle_none_env(
 
 fn handle_new_remote_env() -> Result<(StorageMetadata, bool, bool), ObjectStorageError> {
     Err(ObjectStorageError::UnhandledError(format!(
-        "Could not start the server because staging directory indicates stale data from previous deployment, please choose an empty staging directory and restart the server. {}",
-        JOIN_COMMUNITY
+        "Could not start the server because staging directory indicates stale data from previous deployment, please choose an empty staging directory and restart the server. {JOIN_COMMUNITY}"
     ).into()))
 }
@@ -174,8 +173,7 @@ fn handle_new_staging_env(
     if metadata.server_mode == Mode::All && PARSEABLE.options.mode == Mode::Ingest {
         return Err(ObjectStorageError::UnhandledError(
             format!(
-                "Starting Ingest Mode is not allowed, Since Query Server has not been started yet. {}",
-                JOIN_COMMUNITY
+                "Starting Ingest Mode is not allowed, Since Query Server has not been started yet. {JOIN_COMMUNITY}"
             )
             .into(),
         ));
diff --git a/src/users/filters.rs b/src/users/filters.rs
index 2baf22216..42d34bb9d 100644
--- a/src/users/filters.rs
+++ b/src/users/filters.rs
@@ -134,7 +134,7 @@ impl Filters {
                     (user_id, stream_name, filter_id)
                 {
                     let path =
-                        filter_path(user_id, stream_name, &format!("{}.json", filter_id));
+                        filter_path(user_id, stream_name, &format!("{filter_id}.json"));
                     let filter_bytes = to_bytes(&filter_value);
                     store.put_object(&path, filter_bytes.clone()).await?;
                 }
diff --git a/src/utils/arrow/flight.rs b/src/utils/arrow/flight.rs
index e310f51f4..9ed5fff42 100644
--- a/src/utils/arrow/flight.rs
+++ b/src/utils/arrow/flight.rs
@@ -97,10 +97,10 @@ pub async fn append_temporary_events(
 > {
     let schema = PARSEABLE
         .get_stream(stream_name)
-        .map_err(|err| Status::failed_precondition(format!("Metadata Error: {}", err)))?
+        .map_err(|err| Status::failed_precondition(format!("Metadata Error: {err}")))?
         .get_schema();
     let rb = concat_batches(&schema, minute_result)
-        .map_err(|err| Status::failed_precondition(format!("ArrowError: {}", err)))?;
+        .map_err(|err| Status::failed_precondition(format!("ArrowError: {err}")))?;
 
     let event = push_logs_unchecked(rb, stream_name)
         .await
diff --git a/src/utils/human_size.rs b/src/utils/human_size.rs
index 3cf5f40c8..ef345313c 100644
--- a/src/utils/human_size.rs
+++ b/src/utils/human_size.rs
@@ -39,7 +39,7 @@ pub fn bytes_to_human_size(bytes: u64) -> String {
     const PIB: u64 = TIB * 1024;
 
     if bytes < KIB {
-        format!("{} B", bytes)
+        format!("{bytes} B")
     } else if bytes < MIB {
         format!("{:.2} KB", bytes as f64 / KIB as f64)
     } else if bytes < GIB {
diff --git a/src/utils/json/mod.rs b/src/utils/json/mod.rs
index 9de308c25..2b45bcf68 100644
--- a/src/utils/json/mod.rs
+++ b/src/utils/json/mod.rs
@@ -111,8 +111,7 @@ impl Visitor<'_> for TrueFromStr {
         match s {
             "true" => Ok(true),
             other => Err(E::custom(format!(
-                r#"Expected value: "true", got: {}"#,
-                other
+                r#"Expected value: "true", got: {other}"#
             ))),
         }
     }
diff --git a/src/utils/mod.rs b/src/utils/mod.rs
index 4cd5cac3a..71c07f5dc 100644
--- a/src/utils/mod.rs
+++ b/src/utils/mod.rs
@@ -53,8 +53,7 @@ pub fn extract_datetime(path: &str) -> Option {
         let minute_str = caps.get(3)?.as_str();
 
         let date = NaiveDate::parse_from_str(date_str, "%Y-%m-%d").ok()?;
-        let time =
-            NaiveTime::parse_from_str(&format!("{}:{}", hour_str, minute_str), "%H:%M").ok()?;
+        let time = NaiveTime::parse_from_str(&format!("{hour_str}:{minute_str}"), "%H:%M").ok()?;
         Some(NaiveDateTime::new(date, time))
     } else {
         None
@@ -83,7 +82,7 @@ async fn get_tables_from_query(query: &str) -> Result