diff --git a/Cargo.toml b/Cargo.toml index e12c5c0a7..bd4f403e6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,8 +2,8 @@ name = "parseable" version = "2.3.5" authors = ["Parseable Team "] -edition = "2021" -rust-version = "1.83.0" +edition = "2024" +rust-version = "1.88.0" categories = ["logs", "observability", "metrics", "traces"] build = "build.rs" diff --git a/Dockerfile b/Dockerfile index a2e490fd2..f2c5e255d 100644 --- a/Dockerfile +++ b/Dockerfile @@ -14,7 +14,7 @@ # along with this program. If not, see . # build stage -FROM rust:1.84.0-bookworm AS builder +FROM rust:1.88.0-bookworm AS builder LABEL org.opencontainers.image.title="Parseable" LABEL maintainer="Parseable Team " diff --git a/Dockerfile.debug b/Dockerfile.debug index de7880003..64e9638c9 100644 --- a/Dockerfile.debug +++ b/Dockerfile.debug @@ -14,7 +14,7 @@ # along with this program. If not, see . # build stage -FROM docker.io/rust:1.84.0-bookworm AS builder +FROM docker.io/rust:1.88.0-bookworm AS builder LABEL org.opencontainers.image.title="Parseable" LABEL maintainer="Parseable Team " diff --git a/build.rs b/build.rs index 61ddc9989..59aa13159 100644 --- a/build.rs +++ b/build.rs @@ -38,7 +38,7 @@ pub fn main() -> Result<()> { mod ui { - use std::fs::{self, create_dir_all, OpenOptions}; + use std::fs::{self, OpenOptions, create_dir_all}; use std::io::{self, Cursor, Read, Write}; use std::path::{Path, PathBuf}; use std::{env, panic}; diff --git a/src/alerts/alerts_utils.rs b/src/alerts/alerts_utils.rs index 1234b4d7c..685ef2290 100644 --- a/src/alerts/alerts_utils.rs +++ b/src/alerts/alerts_utils.rs @@ -27,7 +27,7 @@ use datafusion::{ sum::sum, }, logical_expr::{BinaryExpr, Literal, Operator}, - prelude::{col, lit, DataFrame, Expr}, + prelude::{DataFrame, Expr, col, lit}, }; use tracing::trace; @@ -35,13 +35,13 @@ use crate::{ alerts::LogicalOperator, handlers::http::query::{create_streams_for_distributed, update_schema_when_distributed}, parseable::PARSEABLE, - query::{resolve_stream_names, QUERY_SESSION}, + query::{QUERY_SESSION, resolve_stream_names}, utils::time::TimeRange, }; use super::{ - AggregateConfig, AggregateFunction, AggregateResult, Aggregates, AlertConfig, AlertError, - AlertOperator, AlertState, ConditionConfig, Conditions, WhereConfigOperator, ALERTS, + ALERTS, AggregateConfig, AggregateFunction, AggregateResult, Aggregates, AlertConfig, + AlertError, AlertOperator, AlertState, ConditionConfig, Conditions, WhereConfigOperator, }; /// accept the alert @@ -473,9 +473,7 @@ fn match_alert_operator(expr: &ConditionConfig) -> Expr { WhereConfigOperator::LessThanOrEqual => col(column).lt_eq(lit(value)), WhereConfigOperator::GreaterThanOrEqual => col(column).gt_eq(lit(value)), WhereConfigOperator::ILike => col(column).ilike(lit(string_value)), - WhereConfigOperator::Contains => { - col(column).like(lit(format!("%{string_value}%"))) - }, + WhereConfigOperator::Contains => col(column).like(lit(format!("%{string_value}%"))), WhereConfigOperator::BeginsWith => Expr::BinaryExpr(BinaryExpr::new( Box::new(col(column)), Operator::RegexIMatch, @@ -497,7 +495,9 @@ fn match_alert_operator(expr: &ConditionConfig) -> Expr { Operator::RegexNotIMatch, Box::new(lit(format!("{string_value}$"))), )), - _ => unreachable!("value must not be null for operators other than `is null` and `is not null`. Should've been caught in validation") + _ => unreachable!( + "value must not be null for operators other than `is null` and `is not null`. 
Should've been caught in validation" + ), } } else { // for maintaining column case @@ -505,7 +505,9 @@ fn match_alert_operator(expr: &ConditionConfig) -> Expr { match expr.operator { WhereConfigOperator::IsNull => col(column).is_null(), WhereConfigOperator::IsNotNull => col(column).is_not_null(), - _ => unreachable!("value must be null for `is null` and `is not null`. Should've been caught in validation") + _ => unreachable!( + "value must be null for `is null` and `is not null`. Should've been caught in validation" + ), } } } diff --git a/src/alerts/mod.rs b/src/alerts/mod.rs index 2fe458a2e..56bb109e3 100644 --- a/src/alerts/mod.rs +++ b/src/alerts/mod.rs @@ -20,8 +20,8 @@ use actix_web::http::header::ContentType; use async_trait::async_trait; use chrono::Utc; use datafusion::sql::sqlparser::parser::ParserError; -use derive_more::derive::FromStr; use derive_more::FromStrError; +use derive_more::derive::FromStr; use http::StatusCode; use once_cell::sync::Lazy; use serde::Serialize; @@ -31,7 +31,7 @@ use std::fmt::{self, Display}; use std::thread; use std::time::Duration; use tokio::sync::oneshot::{Receiver, Sender}; -use tokio::sync::{mpsc, RwLock}; +use tokio::sync::{RwLock, mpsc}; use tokio::task::JoinHandle; use tracing::{error, trace, warn}; use ulid::Ulid; @@ -40,7 +40,7 @@ pub mod alerts_utils; pub mod target; use crate::alerts::target::TARGETS; -use crate::parseable::{StreamNotFound, PARSEABLE}; +use crate::parseable::{PARSEABLE, StreamNotFound}; use crate::rbac::map::SessionKey; use crate::storage; use crate::storage::ObjectStorageError; diff --git a/src/alerts/target.rs b/src/alerts/target.rs index 7f442a26e..d13151ca0 100644 --- a/src/alerts/target.rs +++ b/src/alerts/target.rs @@ -26,11 +26,11 @@ use async_trait::async_trait; use base64::Engine; use bytes::Bytes; use chrono::Utc; -use http::{header::AUTHORIZATION, HeaderMap, HeaderValue}; +use http::{HeaderMap, HeaderValue, header::AUTHORIZATION}; use itertools::Itertools; use once_cell::sync::Lazy; use reqwest::ClientBuilder; -use serde_json::{json, Value}; +use serde_json::{Value, json}; use tokio::sync::RwLock; use tracing::{error, trace, warn}; use ulid::Ulid; @@ -288,7 +288,9 @@ impl Target { state } else { *state.lock().unwrap() = TimeoutState::default(); - warn!("Unable to fetch state for given alert_id- {alert_id}, stopping target notifs"); + warn!( + "Unable to fetch state for given alert_id- {alert_id}, stopping target notifs" + ); return; }; @@ -304,7 +306,9 @@ impl Target { state } else { *state.lock().unwrap() = TimeoutState::default(); - warn!("Unable to fetch state for given alert_id- {alert_id}, stopping target notifs"); + warn!( + "Unable to fetch state for given alert_id- {alert_id}, stopping target notifs" + ); return; }; diff --git a/src/analytics.rs b/src/analytics.rs index 8bafe8a95..6353a3f57 100644 --- a/src/analytics.rs +++ b/src/analytics.rs @@ -16,7 +16,7 @@ * * */ -use actix_web::{web, HttpRequest, Responder}; +use actix_web::{HttpRequest, Responder, web}; use chrono::{DateTime, Utc}; use clokwerk::{AsyncScheduler, Interval}; use http::header; @@ -31,19 +31,20 @@ use tracing::{error, info}; use ulid::Ulid; use crate::{ + HTTP_CLIENT, INTRA_CLUSTER_CLIENT, about::{current, platform}, handlers::{ + STREAM_NAME_HEADER_KEY, http::{ base_path_without_preceding_slash, cluster::{self, utils::check_liveness}, modal::{NodeMetadata, NodeType}, }, - STREAM_NAME_HEADER_KEY, }, option::Mode, parseable::PARSEABLE, stats::{self, Stats}, - storage, HTTP_CLIENT, INTRA_CLUSTER_CLIENT, + storage, }; const 
ANALYTICS_SERVER_URL: &str = "https://analytics.parseable.io:80"; @@ -239,8 +240,8 @@ fn total_event_stats() -> (Stats, Stats, Stats) { ) } -async fn fetch_ingestors_metrics( -) -> anyhow::Result<(u64, u64, usize, u64, u64, u64, u64, u64, u64, u64, u64, u64)> { +async fn fetch_ingestors_metrics() +-> anyhow::Result<(u64, u64, usize, u64, u64, u64, u64, u64, u64, u64, u64, u64)> { let event_stats = total_event_stats(); let mut node_metrics = NodeMetrics::new( total_streams(), diff --git a/src/audit.rs b/src/audit.rs index e62c53c0e..eda9236f3 100644 --- a/src/audit.rs +++ b/src/audit.rs @@ -21,12 +21,12 @@ use std::{ fmt::{Debug, Display}, }; -use crate::{about::current, parseable::PARSEABLE, storage::StorageMetadata, HTTP_CLIENT}; +use crate::{HTTP_CLIENT, about::current, parseable::PARSEABLE, storage::StorageMetadata}; use chrono::{DateTime, Utc}; use once_cell::sync::Lazy; use serde::Serialize; -use serde_json::{json, Value}; +use serde_json::{Value, json}; use tracing::error; use ulid::Ulid; diff --git a/src/catalog/mod.rs b/src/catalog/mod.rs index db89823ac..a7c37e116 100644 --- a/src/catalog/mod.rs +++ b/src/catalog/mod.rs @@ -41,7 +41,7 @@ use crate::{ query::PartialTimeFilter, stats::{event_labels_date, get_current_stats, storage_size_labels_date, update_deleted_stats}, storage::{ - object_storage::manifest_path, ObjectStorage, ObjectStorageError, ObjectStoreFormat, + ObjectStorage, ObjectStorageError, ObjectStoreFormat, object_storage::manifest_path, }, }; pub use manifest::create_from_parquet_file; diff --git a/src/cli.rs b/src/cli.rs index a4bce12b0..422eb7c36 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -26,7 +26,7 @@ use crate::connectors::kafka::config::KafkaConfig; use crate::{ oidc::{self, OpenidConfig}, - option::{validation, Compression, Mode}, + option::{Compression, Mode, validation}, storage::{AzureBlobConfig, FSConfig, GcsConfig, S3Config}, }; diff --git a/src/connectors/kafka/consumer.rs b/src/connectors/kafka/consumer.rs index 3bfc01702..e4057b050 100644 --- a/src/connectors/kafka/consumer.rs +++ b/src/connectors/kafka/consumer.rs @@ -16,20 +16,20 @@ * */ -use crate::connectors::common::shutdown::Shutdown; use crate::connectors::common::ConnectorError; +use crate::connectors::common::shutdown::Shutdown; use crate::connectors::kafka::partition_stream::{PartitionStreamReceiver, PartitionStreamSender}; use crate::connectors::kafka::state::StreamState; use crate::connectors::kafka::{ - partition_stream, ConsumerRecord, KafkaContext, StreamConsumer, TopicPartition, + ConsumerRecord, KafkaContext, StreamConsumer, TopicPartition, partition_stream, }; use futures_util::FutureExt; +use rdkafka::Statistics; use rdkafka::consumer::Consumer; use rdkafka::message::BorrowedMessage; -use rdkafka::Statistics; use std::sync::Arc; use std::time::Duration; -use tokio::sync::{mpsc, RwLock}; +use tokio::sync::{RwLock, mpsc}; use tokio_stream::wrappers::ReceiverStream; use tracing::{error, info, warn}; diff --git a/src/connectors/kafka/metrics.rs b/src/connectors/kafka/metrics.rs index 45a5f52d4..77b2872c0 100644 --- a/src/connectors/kafka/metrics.rs +++ b/src/connectors/kafka/metrics.rs @@ -18,8 +18,8 @@ use prometheus::core::{Collector, Desc}; use prometheus::{ - proto, Histogram, HistogramOpts, HistogramVec, IntCounter, IntCounterVec, IntGauge, - IntGaugeVec, Opts, + Histogram, HistogramOpts, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Opts, + proto, }; use rdkafka::Statistics; use std::sync::{Arc, RwLock}; diff --git a/src/connectors/kafka/partition_stream.rs 
b/src/connectors/kafka/partition_stream.rs index f1a6ca8bc..4609d4c01 100644 --- a/src/connectors/kafka/partition_stream.rs +++ b/src/connectors/kafka/partition_stream.rs @@ -18,7 +18,7 @@ use crate::connectors::kafka::{ConsumerRecord, TopicPartition}; use std::sync::Arc; -use tokio::sync::{mpsc, Notify}; +use tokio::sync::{Notify, mpsc}; use tokio_stream::wrappers::ReceiverStream; use tracing::{error, info}; diff --git a/src/connectors/kafka/processor.rs b/src/connectors/kafka/processor.rs index 5b01725ed..0d0a68971 100644 --- a/src/connectors/kafka/processor.rs +++ b/src/connectors/kafka/processor.rs @@ -28,14 +28,14 @@ use tracing::{debug, error}; use crate::{ connectors::common::processor::Processor, event::{ - format::{json, EventFormat, LogSourceEntry}, Event as ParseableEvent, USER_AGENT_KEY, + format::{EventFormat, LogSourceEntry, json}, }, parseable::PARSEABLE, storage::StreamType, }; -use super::{config::BufferConfig, ConsumerRecord, StreamConsumer, TopicPartition}; +use super::{ConsumerRecord, StreamConsumer, TopicPartition, config::BufferConfig}; #[derive(Default, Debug, Clone)] pub struct ParseableSinkProcessor; diff --git a/src/connectors/kafka/rebalance_listener.rs b/src/connectors/kafka/rebalance_listener.rs index 8372e1b5b..b82ec5a20 100644 --- a/src/connectors/kafka/rebalance_listener.rs +++ b/src/connectors/kafka/rebalance_listener.rs @@ -17,8 +17,8 @@ */ use crate::connectors::common::shutdown::Shutdown; -use crate::connectors::kafka::state::StreamState; use crate::connectors::kafka::RebalanceEvent; +use crate::connectors::kafka::state::StreamState; use std::sync::Arc; use tokio::sync::RwLock; use tokio::{runtime::Handle, sync::mpsc::Receiver}; diff --git a/src/connectors/kafka/sink.rs b/src/connectors/kafka/sink.rs index 447955686..83b4ec371 100644 --- a/src/connectors/kafka/sink.rs +++ b/src/connectors/kafka/sink.rs @@ -17,9 +17,9 @@ */ use crate::connectors::common::build_runtime; use crate::connectors::common::processor::Processor; +use crate::connectors::kafka::ConsumerRecord; use crate::connectors::kafka::consumer::KafkaStreams; use crate::connectors::kafka::processor::StreamWorker; -use crate::connectors::kafka::ConsumerRecord; use anyhow::Result; use futures_util::StreamExt; use rdkafka::consumer::Consumer; diff --git a/src/connectors/mod.rs b/src/connectors/mod.rs index d3d3dd73f..e06e48cc4 100644 --- a/src/connectors/mod.rs +++ b/src/connectors/mod.rs @@ -21,9 +21,9 @@ use std::sync::Arc; use actix_web_prometheus::PrometheusMetrics; use common::{processor::Processor, shutdown::Shutdown}; use kafka::{ - config::KafkaConfig, consumer::KafkaStreams, metrics::KafkaMetricsCollector, - processor::ParseableSinkProcessor, rebalance_listener::RebalanceListener, - sink::KafkaSinkConnector, state::StreamState, ConsumerRecord, KafkaContext, + ConsumerRecord, KafkaContext, config::KafkaConfig, consumer::KafkaStreams, + metrics::KafkaMetricsCollector, processor::ParseableSinkProcessor, + rebalance_listener::RebalanceListener, sink::KafkaSinkConnector, state::StreamState, }; use prometheus::Registry; use tokio::sync::RwLock; diff --git a/src/correlation.rs b/src/correlation.rs index f7bb65eec..9ec183004 100644 --- a/src/correlation.rs +++ b/src/correlation.rs @@ -18,7 +18,7 @@ use std::collections::{HashMap, HashSet}; -use actix_web::{http::header::ContentType, Error}; +use actix_web::{Error, http::header::ContentType}; use chrono::Utc; use datafusion::error::DataFusionError; use http::StatusCode; @@ -37,7 +37,7 @@ use crate::{ }, parseable::PARSEABLE, query::QUERY_SESSION, 
- rbac::{map::SessionKey, Users}, + rbac::{Users, map::SessionKey}, storage::ObjectStorageError, users::filters::FilterQuery, utils::{get_hash, user_auth_for_datasets}, diff --git a/src/enterprise/utils.rs b/src/enterprise/utils.rs index ba345c10a..b93b306ef 100644 --- a/src/enterprise/utils.rs +++ b/src/enterprise/utils.rs @@ -8,12 +8,13 @@ use relative_path::RelativePathBuf; use crate::query::stream_schema_provider::extract_primary_filter; use crate::{ catalog::{ + Snapshot, manifest::{File, Manifest}, - snapshot, Snapshot, + snapshot, }, event, parseable::PARSEABLE, - query::{stream_schema_provider::ManifestExt, PartialTimeFilter}, + query::{PartialTimeFilter, stream_schema_provider::ManifestExt}, storage::{ObjectStorage, ObjectStorageError, ObjectStoreFormat, STREAM_ROOT_DIRECTORY}, utils::time::TimeRange, }; diff --git a/src/event/format/json.rs b/src/event/format/json.rs index 36ffe0427..68cd50d26 100644 --- a/src/event/format/json.rs +++ b/src/event/format/json.rs @@ -21,7 +21,7 @@ use anyhow::anyhow; use arrow_array::RecordBatch; -use arrow_json::reader::{infer_json_schema_from_iterator, ReaderBuilder}; +use arrow_json::reader::{ReaderBuilder, infer_json_schema_from_iterator}; use arrow_schema::{DataType, Field, Fields, Schema}; use chrono::{DateTime, NaiveDate, NaiveDateTime, Utc}; use datafusion::arrow::util::bit_util::round_upto_multiple_of_64; diff --git a/src/event/format/known_schema.rs b/src/event/format/known_schema.rs index 3d6353cd8..51ba1e09a 100644 --- a/src/event/format/known_schema.rs +++ b/src/event/format/known_schema.rs @@ -35,9 +35,13 @@ pub static KNOWN_SCHEMA_LIST: Lazy = #[derive(Debug, thiserror::Error)] pub enum Error { - #[error("Event is not in the expected text/JSON format for {0}. Please create an issue on our GitHub repository (github.com/parseablehq/parseable), or reach out in the #support-channel on our Slack community for assistance. Include this error message and your log sample to help us improve compatibility.")] + #[error( + "Event is not in the expected text/JSON format for {0}. Please create an issue on our GitHub repository (github.com/parseablehq/parseable), or reach out in the #support-channel on our Slack community for assistance. Include this error message and your log sample to help us improve compatibility." + )] Unacceptable(String), - #[error("Unsupported log format: '{0}'. This format cannot be parsed by the current version. Please create an issue on our GitHub repository (github.com/parseablehq/parseable) with a sample log event, or reach out in the #support-channel on our Slack community for assistance. Include this error message and your log sample to help us improve compatibility.")] + #[error( + "Unsupported log format: '{0}'. This format cannot be parsed by the current version. Please create an issue on our GitHub repository (github.com/parseablehq/parseable) with a sample log event, or reach out in the #support-channel on our Slack community for assistance. Include this error message and your log sample to help us improve compatibility." 
+ )] Unknown(String), } @@ -441,7 +445,7 @@ mod tests { // Updated to handle check_or_extract for array if let Value::Array(ref mut array) = json_value { for item in array { - if let Value::Object(ref mut obj) = item { + if let Value::Object(obj) = item { let schema = processor.schema_definitions.get("custom_app_log").unwrap(); schema.check_or_extract(obj, Some("raw_log")); } diff --git a/src/event/format/mod.rs b/src/event/format/mod.rs index abc0ec300..8618dc46f 100644 --- a/src/event/format/mod.rs +++ b/src/event/format/mod.rs @@ -23,7 +23,7 @@ use std::{ sync::Arc, }; -use anyhow::{anyhow, Error as AnyError}; +use anyhow::{Error as AnyError, anyhow}; use arrow_array::RecordBatch; use arrow_schema::{DataType, Field, Schema, TimeUnit}; use chrono::{DateTime, Utc}; @@ -36,7 +36,7 @@ use crate::{ utils::arrow::{add_parseable_fields, get_field}, }; -use super::{Event, DEFAULT_TIMESTAMP_KEY}; +use super::{DEFAULT_TIMESTAMP_KEY, Event}; pub mod json; pub mod known_schema; diff --git a/src/event/mod.rs b/src/event/mod.rs index 64c942de9..4da88de1a 100644 --- a/src/event/mod.rs +++ b/src/event/mod.rs @@ -26,10 +26,10 @@ use std::sync::Arc; use self::error::EventError; use crate::{ + LOCK_EXPECT, metadata::update_stats, - parseable::{StagingError, PARSEABLE}, + parseable::{PARSEABLE, StagingError}, storage::StreamType, - LOCK_EXPECT, }; use chrono::NaiveDateTime; use std::collections::HashMap; diff --git a/src/handlers/airplane.rs b/src/handlers/airplane.rs index 0bb1f3457..5c0265f8e 100644 --- a/src/handlers/airplane.rs +++ b/src/handlers/airplane.rs @@ -17,8 +17,8 @@ */ use arrow_array::RecordBatch; -use arrow_flight::flight_service_server::FlightServiceServer; use arrow_flight::PollInfo; +use arrow_flight::flight_service_server::FlightServiceServer; use arrow_schema::ArrowError; use serde_json::json; use std::net::SocketAddr; @@ -37,7 +37,7 @@ use crate::handlers::http::query::into_query; use crate::handlers::livetail::cross_origin_config; use crate::metrics::QUERY_EXECUTE_TIME; use crate::parseable::PARSEABLE; -use crate::query::{execute, resolve_stream_names, QUERY_SESSION}; +use crate::query::{QUERY_SESSION, execute, resolve_stream_names}; use crate::utils::arrow::flight::{ append_temporary_events, get_query_from_ticket, into_flight_data, run_do_get_rpc, send_to_ingester, @@ -45,9 +45,9 @@ use crate::utils::arrow::flight::{ use crate::utils::time::TimeRange; use crate::utils::user_auth_for_datasets; use arrow_flight::{ - flight_service_server::FlightService, Action, ActionType, Criteria, Empty, FlightData, - FlightDescriptor, FlightInfo, HandshakeRequest, HandshakeResponse, PutResult, SchemaAsIpc, - SchemaResult, Ticket, + Action, ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightInfo, + HandshakeRequest, HandshakeResponse, PutResult, SchemaAsIpc, SchemaResult, Ticket, + flight_service_server::FlightService, }; use arrow_ipc::writer::IpcWriteOptions; use futures::stream; @@ -189,10 +189,10 @@ impl FlightService for AirServiceImpl { rbac::Response::UnAuthorized => { return Err(Status::permission_denied( "user is not authorized to access this resource", - )) + )); } rbac::Response::ReloadRequired => { - return Err(Status::unauthenticated("reload required")) + return Err(Status::unauthenticated("reload required")); } } @@ -214,7 +214,7 @@ impl FlightService for AirServiceImpl { actix_web::Either::Right(_) => { return Err(Status::failed_precondition( "Expected batch results, got stream", - )) + )); } }; diff --git a/src/handlers/http/about.rs b/src/handlers/http/about.rs index 
57bf0197a..ce80c1b0e 100644 --- a/src/handlers/http/about.rs +++ b/src/handlers/http/about.rs @@ -17,7 +17,7 @@ */ use actix_web::web::Json; -use serde_json::{json, Value}; +use serde_json::{Value, json}; use crate::{ about::{self, get_latest_release}, diff --git a/src/handlers/http/alerts.rs b/src/handlers/http/alerts.rs index bbc073785..57454dda7 100644 --- a/src/handlers/http/alerts.rs +++ b/src/handlers/http/alerts.rs @@ -25,13 +25,13 @@ use crate::{ utils::{actix::extract_session_key_from_req, user_auth_for_query}, }; use actix_web::{ - web::{self, Json, Path}, HttpRequest, Responder, + web::{self, Json, Path}, }; use bytes::Bytes; use ulid::Ulid; -use crate::alerts::{AlertConfig, AlertError, AlertRequest, AlertState, ALERTS}; +use crate::alerts::{ALERTS, AlertConfig, AlertError, AlertRequest, AlertState}; // GET /alerts /// User needs at least a read access to the stream(s) that is being referenced in an alert diff --git a/src/handlers/http/audit.rs b/src/handlers/http/audit.rs index bf9991a02..f30e89646 100644 --- a/src/handlers/http/audit.rs +++ b/src/handlers/http/audit.rs @@ -29,7 +29,7 @@ use ulid::Ulid; use crate::{ audit::AuditLogBuilder, handlers::{KINESIS_COMMON_ATTRIBUTES_KEY, STREAM_NAME_HEADER_KEY}, - rbac::{map::SessionKey, Users}, + rbac::{Users, map::SessionKey}, }; const DROP_HEADERS: [&str; 4] = ["authorization", "cookie", "user-agent", "x-p-stream"]; diff --git a/src/handlers/http/cluster/mod.rs b/src/handlers/http/cluster/mod.rs index 39cc4ded9..9c549c91e 100644 --- a/src/handlers/http/cluster/mod.rs +++ b/src/handlers/http/cluster/mod.rs @@ -18,28 +18,29 @@ pub mod utils; -use futures::{future, stream, StreamExt}; +use futures::{StreamExt, future, stream}; use std::collections::HashSet; use std::future::Future; use std::sync::Arc; use std::time::Duration; +use actix_web::Responder; use actix_web::http::header::{self, HeaderMap}; use actix_web::web::Path; -use actix_web::Responder; use bytes::Bytes; use chrono::Utc; use clokwerk::{AsyncScheduler, Interval}; -use http::{header as http_header, StatusCode}; +use http::{StatusCode, header as http_header}; use itertools::Itertools; use relative_path::RelativePathBuf; use serde::de::{DeserializeOwned, Error}; use serde_json::error::Error as SerdeError; -use serde_json::{to_vec, Value as JsonValue}; +use serde_json::{Value as JsonValue, to_vec}; use tracing::{error, info, warn}; use url::Url; -use utils::{check_liveness, to_url_string, IngestionStats, QueriedStats, StorageStats}; +use utils::{IngestionStats, QueriedStats, StorageStats, check_liveness, to_url_string}; +use crate::INTRA_CLUSTER_CLIENT; use crate::handlers::http::ingest::ingest_internal_stream; use crate::metrics::prom_utils::Metrics; use crate::parseable::PARSEABLE; @@ -50,7 +51,6 @@ use crate::storage::{ ObjectStorage, ObjectStorageError, ObjectStoreFormat, PARSEABLE_ROOT_DIRECTORY, STREAM_ROOT_DIRECTORY, }; -use crate::INTRA_CLUSTER_CLIENT; use super::base_path_without_preceding_slash; use super::ingest::PostError; diff --git a/src/handlers/http/cluster/utils.rs b/src/handlers/http/cluster/utils.rs index cef80557f..103fed42c 100644 --- a/src/handlers/http/cluster/utils.rs +++ b/src/handlers/http/cluster/utils.rs @@ -17,8 +17,8 @@ */ use crate::{ - handlers::http::{base_path_without_preceding_slash, modal::NodeType}, INTRA_CLUSTER_CLIENT, + handlers::http::{base_path_without_preceding_slash, modal::NodeType}, }; use actix_web::http::header; use chrono::{DateTime, Utc}; diff --git a/src/handlers/http/correlation.rs b/src/handlers/http/correlation.rs index 
f9c77f9da..f37b986a5 100644 --- a/src/handlers/http/correlation.rs +++ b/src/handlers/http/correlation.rs @@ -17,7 +17,7 @@ */ use actix_web::web::{Json, Path}; -use actix_web::{web, HttpRequest, HttpResponse, Responder}; +use actix_web::{HttpRequest, HttpResponse, Responder, web}; use anyhow::Error; use itertools::Itertools; @@ -25,7 +25,7 @@ use crate::rbac::Users; use crate::utils::actix::extract_session_key_from_req; use crate::utils::{get_hash, get_user_from_request, user_auth_for_datasets}; -use crate::correlation::{CorrelationConfig, CorrelationError, CORRELATIONS}; +use crate::correlation::{CORRELATIONS, CorrelationConfig, CorrelationError}; pub async fn list(req: HttpRequest) -> Result { let session_key = extract_session_key_from_req(&req) diff --git a/src/handlers/http/demo_data.rs b/src/handlers/http/demo_data.rs index 989f475c6..005e082ef 100644 --- a/src/handlers/http/demo_data.rs +++ b/src/handlers/http/demo_data.rs @@ -21,7 +21,7 @@ use crate::{ option::Mode, parseable::PARSEABLE, }; -use actix_web::{web, HttpRequest, HttpResponse}; +use actix_web::{HttpRequest, HttpResponse, web}; use std::{collections::HashMap, fs, process::Command}; #[cfg(unix)] diff --git a/src/handlers/http/health_check.rs b/src/handlers/http/health_check.rs index 82825b415..0f5b2f591 100644 --- a/src/handlers/http/health_check.rs +++ b/src/handlers/http/health_check.rs @@ -19,12 +19,12 @@ use std::sync::Arc; use actix_web::{ + HttpResponse, body::MessageBody, dev::{ServiceRequest, ServiceResponse}, error::Error, error::ErrorServiceUnavailable, middleware::Next, - HttpResponse, }; use http::StatusCode; use once_cell::sync::Lazy; diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs index 78a0f4525..5c326c308 100644 --- a/src/handlers/http/ingest.rs +++ b/src/handlers/http/ingest.rs @@ -19,7 +19,7 @@ use std::collections::{HashMap, HashSet}; use actix_web::web::{Json, Path}; -use actix_web::{http::header::ContentType, HttpRequest, HttpResponse}; +use actix_web::{HttpRequest, HttpResponse, http::header::ContentType}; use arrow_array::RecordBatch; use bytes::Bytes; use chrono::Utc; @@ -35,7 +35,7 @@ use crate::option::Mode; use crate::otel::logs::OTEL_LOG_KNOWN_FIELD_LIST; use crate::otel::metrics::OTEL_METRICS_KNOWN_FIELD_LIST; use crate::otel::traces::OTEL_TRACES_KNOWN_FIELD_LIST; -use crate::parseable::{StreamNotFound, PARSEABLE}; +use crate::parseable::{PARSEABLE, StreamNotFound}; use crate::storage::{ObjectStorageError, StreamType}; use crate::utils::header_parsing::ParseHeaderError; use crate::utils::json::{flatten::JsonFlattenError, strict::StrictValue}; @@ -395,7 +395,7 @@ pub async fn post_event( let mut json = json.into_inner(); match &log_source { LogSource::OtelLogs | LogSource::OtelMetrics | LogSource::OtelTraces => { - return Err(PostError::OtelNotSupported) + return Err(PostError::OtelNotSupported); } LogSource::Custom(src) => { KNOWN_SCHEMA_LIST.extract_from_inline_log( @@ -489,9 +489,13 @@ pub enum PostError { MissingTimePartition(String), #[error("{0}")] KnownFormat(#[from] known_schema::Error), - #[error("Ingestion is not allowed to stream {0} as it is already associated with a different OTEL format")] + #[error( + "Ingestion is not allowed to stream {0} as it is already associated with a different OTEL format" + )] IncorrectLogFormat(String), - #[error("Failed to ingest events in dataset {0}. Total number of fields {1} exceeds the permissible limit of {2}. 
We recommend creating a new dataset beyond {2} for better query performance.")] + #[error( + "Failed to ingest events in dataset {0}. Total number of fields {1} exceeds the permissible limit of {2}. We recommend creating a new dataset beyond {2} for better query performance." + )] FieldsCountLimitExceeded(String, usize, usize), #[error("Invalid query parameter")] InvalidQueryParameter, @@ -548,7 +552,7 @@ mod tests { use std::{collections::HashMap, sync::Arc}; use crate::{ - event::format::{json, EventFormat}, + event::format::{EventFormat, json}, metadata::SchemaVersion, }; @@ -688,9 +692,11 @@ mod tests { .into_iter(), ); - assert!(json::Event::new(json) - .into_recordbatch(&schema, false, None, SchemaVersion::V0, &HashMap::new()) - .is_err()); + assert!( + json::Event::new(json) + .into_recordbatch(&schema, false, None, SchemaVersion::V0, &HashMap::new()) + .is_err() + ); } #[test] @@ -890,9 +896,11 @@ mod tests { .into_iter(), ); - assert!(json::Event::new(json) - .into_recordbatch(&schema, false, None, SchemaVersion::V0, &HashMap::new()) - .is_err()); + assert!( + json::Event::new(json) + .into_recordbatch(&schema, false, None, SchemaVersion::V0, &HashMap::new()) + .is_err() + ); } #[test] diff --git a/src/handlers/http/kinesis.rs b/src/handlers/http/kinesis.rs index c714729c7..cd2958cb3 100644 --- a/src/handlers/http/kinesis.rs +++ b/src/handlers/http/kinesis.rs @@ -16,7 +16,7 @@ * */ -use base64::{engine::general_purpose::STANDARD, Engine as _}; +use base64::{Engine as _, engine::general_purpose::STANDARD}; use serde::{Deserialize, Serialize}; use serde_json::{Map, Value}; use std::str; @@ -92,7 +92,8 @@ pub async fn flatten_kinesis_logs(message: Message) -> Result, anyhow // This is a fallback to ensure we don't lose data. tracing::warn!( "Kinesis log with requestId {} and timestamp {} has more than the allowed levels of nesting, skipping flattening for this record.", - message.request_id, message.timestamp + message.request_id, + message.timestamp ); vec_kinesis_json.push(json); } diff --git a/src/handlers/http/llm.rs b/src/handlers/http/llm.rs index cc63895d7..3ecef38b1 100644 --- a/src/handlers/http/llm.rs +++ b/src/handlers/http/llm.rs @@ -16,13 +16,13 @@ * */ -use actix_web::{http::header::ContentType, web, HttpResponse, Result}; -use http::{header, StatusCode}; +use actix_web::{HttpResponse, Result, http::header::ContentType, web}; +use http::{StatusCode, header}; use itertools::Itertools; use reqwest; -use serde_json::{json, Value}; +use serde_json::{Value, json}; -use crate::{parseable::StreamNotFound, parseable::PARSEABLE}; +use crate::{parseable::PARSEABLE, parseable::StreamNotFound}; const OPEN_AI_URL: &str = "https://api.openai.com/v1/chat/completions"; diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs index 6d1e00697..eb1f6911f 100644 --- a/src/handlers/http/logstream.rs +++ b/src/handlers/http/logstream.rs @@ -20,29 +20,29 @@ use self::error::StreamError; use super::cluster::utils::{IngestionStats, QueriedStats, StorageStats}; use super::query::update_schema_when_distributed; use crate::event::format::override_data_type; -use crate::hottier::{HotTierManager, StreamHotTier, CURRENT_HOT_TIER_VERSION}; +use crate::hottier::{CURRENT_HOT_TIER_VERSION, HotTierManager, StreamHotTier}; use crate::metadata::SchemaVersion; use crate::metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE}; -use crate::parseable::{StreamNotFound, PARSEABLE}; -use crate::rbac::role::Action; +use crate::parseable::{PARSEABLE, StreamNotFound}; 
use crate::rbac::Users; -use crate::stats::{event_labels_date, storage_size_labels_date, Stats}; +use crate::rbac::role::Action; +use crate::stats::{Stats, event_labels_date, storage_size_labels_date}; use crate::storage::retention::Retention; use crate::storage::{StreamInfo, StreamType}; use crate::utils::actix::extract_session_key_from_req; use crate::utils::json::flatten::{ self, convert_to_array, generic_flattening, has_more_than_max_allowed_levels, }; -use crate::{stats, validator, LOCK_EXPECT}; +use crate::{LOCK_EXPECT, stats, validator}; use actix_web::http::StatusCode; use actix_web::web::{Json, Path}; -use actix_web::{web, HttpRequest, Responder}; +use actix_web::{HttpRequest, Responder, web}; use arrow_json::reader::infer_json_schema_from_iterator; use bytes::Bytes; use chrono::Utc; use itertools::Itertools; -use serde_json::{json, Value}; +use serde_json::{Value, json}; use std::fs; use std::sync::Arc; use tracing::warn; @@ -115,14 +115,14 @@ pub async fn detect_schema(Json(json): Json) -> Result { return Err(StreamError::Custom { msg: e.to_string(), status: StatusCode::BAD_REQUEST, - }) + }); } }; if let Err(err) = flatten::flatten(&mut flattened_json, "_", None, None, None, false) { @@ -142,7 +142,7 @@ pub async fn detect_schema(Json(json): Json) -> Result return Err( - ErrorForbidden("You don't have permission to access this resource. Please contact your administrator for assistance.") - ), - rbac::Response::ReloadRequired => return Err( - ErrorUnauthorized("Your session has expired or is no longer valid. Please re-authenticate to access this resource.") - ), + rbac::Response::UnAuthorized => { + return Err(ErrorForbidden( + "You don't have permission to access this resource. Please contact your administrator for assistance.", + )); + } + rbac::Response::ReloadRequired => { + return Err(ErrorUnauthorized( + "Your session has expired or is no longer valid. 
Please re-authenticate to access this resource.", + )); + } _ => {} } diff --git a/src/handlers/http/mod.rs b/src/handlers/http/mod.rs index f56b156e5..425986952 100644 --- a/src/handlers/http/mod.rs +++ b/src/handlers/http/mod.rs @@ -25,7 +25,7 @@ use itertools::Itertools; use modal::{NodeMetadata, NodeType}; use serde_json::Value; -use crate::{parseable::PARSEABLE, storage::STREAM_ROOT_DIRECTORY, INTRA_CLUSTER_CLIENT}; +use crate::{INTRA_CLUSTER_CLIENT, parseable::PARSEABLE, storage::STREAM_ROOT_DIRECTORY}; use self::query::Query; diff --git a/src/handlers/http/modal/ingest/ingestor_logstream.rs b/src/handlers/http/modal/ingest/ingestor_logstream.rs index bb87e03f3..d53d34c79 100644 --- a/src/handlers/http/modal/ingest/ingestor_logstream.rs +++ b/src/handlers/http/modal/ingest/ingestor_logstream.rs @@ -19,8 +19,8 @@ use std::fs; use actix_web::{ - web::{Json, Path}, HttpRequest, Responder, + web::{Json, Path}, }; use bytes::Bytes; use http::StatusCode; @@ -29,7 +29,7 @@ use tracing::warn; use crate::{ catalog::remove_manifest_from_snapshot, handlers::http::logstream::error::StreamError, - parseable::{StreamNotFound, PARSEABLE}, + parseable::{PARSEABLE, StreamNotFound}, stats, }; diff --git a/src/handlers/http/modal/ingest/ingestor_rbac.rs b/src/handlers/http/modal/ingest/ingestor_rbac.rs index ee50c0fce..4f1862898 100644 --- a/src/handlers/http/modal/ingest/ingestor_rbac.rs +++ b/src/handlers/http/modal/ingest/ingestor_rbac.rs @@ -18,15 +18,15 @@ use std::collections::HashSet; -use actix_web::{web, Responder}; +use actix_web::{Responder, web}; use tokio::sync::Mutex; use crate::{ handlers::http::{modal::utils::rbac_utils::get_metadata, rbac::RBACError}, rbac::{ + Users, map::roles, user::{self, User as ParseableUser}, - Users, }, storage, }; diff --git a/src/handlers/http/modal/ingest/ingestor_role.rs b/src/handlers/http/modal/ingest/ingestor_role.rs index b2656c979..5744eb3fe 100644 --- a/src/handlers/http/modal/ingest/ingestor_role.rs +++ b/src/handlers/http/modal/ingest/ingestor_role.rs @@ -19,8 +19,8 @@ use std::collections::HashSet; use actix_web::{ - web::{self, Json}, HttpResponse, Responder, + web::{self, Json}, }; use crate::{ diff --git a/src/handlers/http/modal/ingest_server.rs b/src/handlers/http/modal/ingest_server.rs index a0bf9ed99..04ba8f981 100644 --- a/src/handlers/http/modal/ingest_server.rs +++ b/src/handlers/http/modal/ingest_server.rs @@ -19,22 +19,22 @@ use std::sync::Arc; use std::thread; +use actix_web::Scope; use actix_web::middleware::from_fn; use actix_web::web; -use actix_web::Scope; use actix_web_prometheus::PrometheusMetrics; use async_trait::async_trait; use base64::Engine; use bytes::Bytes; use relative_path::RelativePathBuf; use serde_json::Value; -use tokio::sync::oneshot; use tokio::sync::OnceCell; +use tokio::sync::oneshot; use crate::handlers::http::modal::NodeType; use crate::sync::sync_start; use crate::{ - analytics, + Server, analytics, handlers::{ airplane, http::{ @@ -46,14 +46,14 @@ use crate::{ migration, parseable::PARSEABLE, rbac::role::Action, - storage::{object_storage::parseable_json_path, ObjectStorageError, PARSEABLE_ROOT_DIRECTORY}, - sync, Server, + storage::{ObjectStorageError, PARSEABLE_ROOT_DIRECTORY, object_storage::parseable_json_path}, + sync, }; use super::IngestorMetadata; use super::{ - ingest::{ingestor_logstream, ingestor_rbac, ingestor_role}, OpenIdClient, ParseableServer, + ingest::{ingestor_logstream, ingestor_rbac, ingestor_role}, }; pub const INGESTOR_EXPECT: &str = "Ingestor Metadata should be set in ingestor mode"; 
@@ -336,7 +336,9 @@ async fn validate_credentials() -> anyhow::Result<()> { let token = format!("Basic {token}"); if check != token { - return Err(anyhow::anyhow!("Credentials do not match with other ingestors. Please check your credentials and try again.")); + return Err(anyhow::anyhow!( + "Credentials do not match with other ingestors. Please check your credentials and try again." + )); } } diff --git a/src/handlers/http/modal/mod.rs b/src/handlers/http/modal/mod.rs index 7da844a93..b925c4ac0 100644 --- a/src/handlers/http/modal/mod.rs +++ b/src/handlers/http/modal/mod.rs @@ -18,11 +18,11 @@ use std::{fmt, path::Path, sync::Arc}; -use actix_web::{middleware::from_fn, web::ServiceConfig, App, HttpServer}; +use actix_web::{App, HttpServer, middleware::from_fn, web::ServiceConfig}; use actix_web_prometheus::PrometheusMetrics; use anyhow::Context; use async_trait::async_trait; -use base64::{prelude::BASE64_STANDARD, Engine}; +use base64::{Engine, prelude::BASE64_STANDARD}; use bytes::Bytes; use futures::future; use openid::Discovered; @@ -34,7 +34,7 @@ use tokio::sync::oneshot; use tracing::{error, info, warn}; use crate::{ - alerts::{target::TARGETS, ALERTS}, + alerts::{ALERTS, target::TARGETS}, cli::Options, correlation::CORRELATIONS, oidc::Claims, @@ -45,7 +45,7 @@ use crate::{ utils::get_node_id, }; -use super::{audit, cross_origin_config, health_check, resource_check, API_BASE_PATH, API_VERSION}; +use super::{API_BASE_PATH, API_VERSION, audit, cross_origin_config, health_check, resource_check}; pub mod ingest; pub mod ingest_server; diff --git a/src/handlers/http/modal/query/querier_logstream.rs b/src/handlers/http/modal/query/querier_logstream.rs index 2be17af23..9db82c333 100644 --- a/src/handlers/http/modal/query/querier_logstream.rs +++ b/src/handlers/http/modal/query/querier_logstream.rs @@ -20,8 +20,8 @@ use core::str; use std::{collections::HashMap, fs}; use actix_web::{ - web::{self, Path}, HttpRequest, Responder, + web::{self, Path}, }; use bytes::Bytes; use chrono::Utc; @@ -37,15 +37,15 @@ use crate::{ base_path_without_preceding_slash, cluster::{ self, fetch_daily_stats, fetch_stats_from_ingestors, sync_streams_with_ingestors, - utils::{merge_quried_stats, IngestionStats, QueriedStats, StorageStats}, + utils::{IngestionStats, QueriedStats, StorageStats, merge_quried_stats}, }, logstream::error::StreamError, modal::{NodeMetadata, NodeType}, }, hottier::HotTierManager, - parseable::{StreamNotFound, PARSEABLE}, + parseable::{PARSEABLE, StreamNotFound}, stats, - storage::{ObjectStoreFormat, StreamType, STREAM_ROOT_DIRECTORY}, + storage::{ObjectStoreFormat, STREAM_ROOT_DIRECTORY, StreamType}, }; const STATS_DATE_QUERY_PARAM: &str = "date"; diff --git a/src/handlers/http/modal/query/querier_rbac.rs b/src/handlers/http/modal/query/querier_rbac.rs index 8b0f81551..c28a3c9f3 100644 --- a/src/handlers/http/modal/query/querier_rbac.rs +++ b/src/handlers/http/modal/query/querier_rbac.rs @@ -18,7 +18,7 @@ use std::collections::HashSet; -use actix_web::{web, Responder}; +use actix_web::{Responder, web}; use tokio::sync::Mutex; use crate::{ @@ -31,8 +31,9 @@ use crate::{ rbac::RBACError, }, rbac::{ + Users, map::{roles, write_user_groups}, - user, Users, + user, }, validator, }; diff --git a/src/handlers/http/modal/query/querier_role.rs b/src/handlers/http/modal/query/querier_role.rs index 40a7024b4..59764b264 100644 --- a/src/handlers/http/modal/query/querier_role.rs +++ b/src/handlers/http/modal/query/querier_role.rs @@ -19,8 +19,8 @@ use std::collections::HashSet; use actix_web::{ - 
web::{self, Json}, HttpResponse, Responder, + web::{self, Json}, }; use crate::{ diff --git a/src/handlers/http/modal/query_server.rs b/src/handlers/http/modal/query_server.rs index 78c5d5383..8533f392b 100644 --- a/src/handlers/http/modal/query_server.rs +++ b/src/handlers/http/modal/query_server.rs @@ -22,27 +22,27 @@ use std::thread; use crate::handlers::airplane; use crate::handlers::http::cluster::{self, init_cluster_metrics_schedular}; use crate::handlers::http::middleware::{DisAllowRootUser, RouteExt}; +use crate::handlers::http::{MAX_EVENT_PAYLOAD_SIZE, logstream}; use crate::handlers::http::{base_path, prism_base_path, resource_check}; -use crate::handlers::http::{logstream, MAX_EVENT_PAYLOAD_SIZE}; use crate::handlers::http::{rbac, role}; use crate::hottier::HotTierManager; use crate::rbac::role::Action; use crate::sync::sync_start; use crate::{analytics, migration, storage, sync}; use actix_web::middleware::from_fn; -use actix_web::web::{resource, ServiceConfig}; -use actix_web::{web, Scope}; +use actix_web::web::{ServiceConfig, resource}; +use actix_web::{Scope, web}; use actix_web_prometheus::PrometheusMetrics; use async_trait::async_trait; use bytes::Bytes; -use tokio::sync::{oneshot, OnceCell}; +use tokio::sync::{OnceCell, oneshot}; use tracing::info; -use crate::parseable::PARSEABLE; use crate::Server; +use crate::parseable::PARSEABLE; use super::query::{querier_ingest, querier_logstream, querier_rbac, querier_role}; -use super::{load_on_init, NodeType, OpenIdClient, ParseableServer, QuerierMetadata}; +use super::{NodeType, OpenIdClient, ParseableServer, QuerierMetadata, load_on_init}; pub struct QueryServer; pub static QUERIER_META: OnceCell> = OnceCell::const_new(); @@ -91,8 +91,8 @@ impl ParseableServer for QueryServer { // parseable can't use local storage for persistence when running a distributed setup if PARSEABLE.storage.name() == "drive" { return Err(anyhow::anyhow!( - "This instance of the Parseable server has been configured to run in a distributed setup, it doesn't support local storage.", - )); + "This instance of the Parseable server has been configured to run in a distributed setup, it doesn't support local storage.", + )); } let mut parseable_json = PARSEABLE.validate_storage().await?; diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs index 37c411c23..1bdbe3908 100644 --- a/src/handlers/http/modal/server.rs +++ b/src/handlers/http/modal/server.rs @@ -38,11 +38,11 @@ use crate::storage; use crate::sync; use crate::sync::sync_start; +use actix_web::Resource; +use actix_web::Scope; use actix_web::middleware::from_fn; use actix_web::web; use actix_web::web::resource; -use actix_web::Resource; -use actix_web::Scope; use actix_web_prometheus::PrometheusMetrics; use actix_web_static_files::ResourceFiles; use async_trait::async_trait; @@ -51,19 +51,19 @@ use tokio::sync::oneshot; use crate::{ handlers::http::{ - self, ingest, llm, logstream, + self, MAX_EVENT_PAYLOAD_SIZE, ingest, llm, logstream, middleware::{DisAllowRootUser, RouteExt}, - oidc, role, MAX_EVENT_PAYLOAD_SIZE, + oidc, role, }, parseable::PARSEABLE, rbac::role::Action, }; // use super::generate; -use super::generate; -use super::load_on_init; use super::OpenIdClient; use super::ParseableServer; +use super::generate; +use super::load_on_init; pub struct Server; diff --git a/src/handlers/http/modal/utils/ingest_utils.rs b/src/handlers/http/modal/utils/ingest_utils.rs index ee1a90cfd..bffd7f882 100644 --- a/src/handlers/http/modal/utils/ingest_utils.rs +++ 
b/src/handlers/http/modal/utils/ingest_utils.rs @@ -28,15 +28,15 @@ use tracing::warn; use crate::{ event::{ - format::{json, EventFormat, LogSource}, FORMAT_KEY, SOURCE_IP_KEY, USER_AGENT_KEY, + format::{EventFormat, LogSource, json}, }, handlers::{ + EXTRACT_LOG_KEY, LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY, http::{ ingest::PostError, - kinesis::{flatten_kinesis_logs, Message}, + kinesis::{Message, flatten_kinesis_logs}, }, - EXTRACT_LOG_KEY, LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY, }, otel::{logs::flatten_otel_logs, metrics::flatten_otel_metrics, traces::flatten_otel_traces}, parseable::PARSEABLE, @@ -223,7 +223,7 @@ fn verify_dataset_fields_count(stream_name: &str) -> Result<(), PostError> { fields_count, dataset_fields_warn_threshold as usize, PARSEABLE.options.dataset_fields_allowed_limit - ); + ); } // Check if the fields count exceeds the limit // Return an error if the fields count exceeds the limit diff --git a/src/handlers/http/oidc.rs b/src/handlers/http/oidc.rs index 74c6290bf..d2d457ff4 100644 --- a/src/handlers/http/oidc.rs +++ b/src/handlers/http/oidc.rs @@ -19,10 +19,10 @@ use std::{collections::HashSet, sync::Arc}; use actix_web::{ - cookie::{time, Cookie, SameSite}, + HttpRequest, HttpResponse, + cookie::{Cookie, SameSite, time}, http::header::{self, ContentType}, web::{self, Data}, - HttpRequest, HttpResponse, }; use http::StatusCode; use openid::{Options, Token, Userinfo}; @@ -36,10 +36,9 @@ use crate::{ oidc::{Claims, DiscoveredClient}, parseable::PARSEABLE, rbac::{ - self, - map::{SessionKey, DEFAULT_ROLE}, + self, Users, + map::{DEFAULT_ROLE, SessionKey}, user::{self, User, UserType}, - Users, }, storage::{self, ObjectStorageError, StorageMetadata}, utils::actix::extract_session_key_from_req, @@ -82,7 +81,7 @@ pub async fn login( query, client, PARSEABLE.options.scope.to_string().as_str(), - )) + )); } (Some(session_key), client) => (session_key, client), }; @@ -90,7 +89,7 @@ pub async fn login( match Users.authorize(session_key.clone(), rbac::role::Action::Login, None, None) { rbac::Response::Authorized => (), rbac::Response::UnAuthorized | rbac::Response::ReloadRequired => { - return Err(OIDCError::Unauthorized) + return Err(OIDCError::Unauthorized); } } match session_key { @@ -280,21 +279,19 @@ fn redirect_no_oauth_setup(mut url: Url) -> HttpResponse { } pub fn cookie_session(id: Ulid) -> Cookie<'static> { - let authorization_cookie = Cookie::build(SESSION_COOKIE_NAME, id.to_string()) + Cookie::build(SESSION_COOKIE_NAME, id.to_string()) .max_age(time::Duration::days(COOKIE_AGE_DAYS as i64)) .same_site(SameSite::Strict) .path("/") - .finish(); - authorization_cookie + .finish() } pub fn cookie_username(username: &str) -> Cookie<'static> { - let authorization_cookie = Cookie::build(USER_COOKIE_NAME, username.to_string()) + Cookie::build(USER_COOKIE_NAME, username.to_string()) .max_age(time::Duration::days(COOKIE_AGE_DAYS as i64)) .same_site(SameSite::Strict) .path("/") - .finish(); - authorization_cookie + .finish() } pub async fn request_token( diff --git a/src/handlers/http/prism_home.rs b/src/handlers/http/prism_home.rs index e83063685..cc68d2c29 100644 --- a/src/handlers/http/prism_home.rs +++ b/src/handlers/http/prism_home.rs @@ -18,10 +18,10 @@ use std::collections::HashMap; -use actix_web::{web, HttpRequest, Responder}; +use actix_web::{HttpRequest, Responder, web}; use crate::{ - prism::home::{generate_home_response, generate_home_search_response, PrismHomeError}, + prism::home::{PrismHomeError, generate_home_response, generate_home_search_response}, 
utils::actix::extract_session_key_from_req, }; diff --git a/src/handlers/http/prism_logstream.rs b/src/handlers/http/prism_logstream.rs index 69b59f5a5..6241415ca 100644 --- a/src/handlers/http/prism_logstream.rs +++ b/src/handlers/http/prism_logstream.rs @@ -17,12 +17,12 @@ */ use actix_web::{ - web::{self, Json, Path}, HttpRequest, Responder, + web::{self, Json, Path}, }; use crate::{ - prism::logstream::{get_prism_logstream_info, PrismDatasetRequest, PrismLogstreamError}, + prism::logstream::{PrismDatasetRequest, PrismLogstreamError, get_prism_logstream_info}, utils::actix::extract_session_key_from_req, }; diff --git a/src/handlers/http/query.rs b/src/handlers/http/query.rs index 8ff2312da..2cf16ff76 100644 --- a/src/handlers/http/query.rs +++ b/src/handlers/http/query.rs @@ -30,12 +30,12 @@ use datafusion::error::DataFusionError; use datafusion::execution::context::SessionState; use datafusion::sql::sqlparser::parser::ParserError; use futures::stream::once; -use futures::{future, Stream, StreamExt}; +use futures::{Stream, StreamExt, future}; use futures_util::Future; use http::StatusCode; use itertools::Itertools; use serde::{Deserialize, Serialize}; -use serde_json::{json, Value}; +use serde_json::{Value, json}; use std::collections::HashMap; use std::pin::Pin; use std::sync::Arc; @@ -45,10 +45,10 @@ use tracing::{error, warn}; use crate::event::commit_schema; use crate::metrics::QUERY_EXECUTE_TIME; -use crate::parseable::{StreamNotFound, PARSEABLE}; +use crate::parseable::{PARSEABLE, StreamNotFound}; use crate::query::error::ExecuteError; -use crate::query::{execute, CountsRequest, Query as LogicalQuery}; -use crate::query::{resolve_stream_names, QUERY_SESSION}; +use crate::query::{CountsRequest, Query as LogicalQuery, execute}; +use crate::query::{QUERY_SESSION, resolve_stream_names}; use crate::rbac::Users; use crate::response::QueryResponse; use crate::storage::ObjectStorageError; @@ -102,7 +102,7 @@ pub async fn get_records_and_fields( let records = match records { Either::Left(vec_rb) => vec_rb, Either::Right(_) => { - return Err(QueryError::CustomError("Reject streaming response".into())) + return Err(QueryError::CustomError("Reject streaming response".into())); } }; @@ -221,7 +221,7 @@ async fn handle_non_streaming_query( Either::Right(_) => { return Err(QueryError::MalformedQuery( "Expected batch results, got stream", - )) + )); } }; let total_time = format!("{:?}", time.elapsed()); @@ -268,7 +268,7 @@ async fn handle_streaming_query( Either::Left(_) => { return Err(QueryError::MalformedQuery( "Expected stream results, got batch", - )) + )); } Either::Right(stream) => stream, }; diff --git a/src/handlers/http/rbac.rs b/src/handlers/http/rbac.rs index acaabf3a6..3bb320f4a 100644 --- a/src/handlers/http/rbac.rs +++ b/src/handlers/http/rbac.rs @@ -20,20 +20,19 @@ use std::collections::{HashMap, HashSet}; use crate::{ rbac::{ - self, + self, Users, map::{read_user_groups, roles}, role::model::DefaultPrivilege, user, utils::to_prism_user, - Users, }, storage::ObjectStorageError, validator::{self, error::UsernameValidationError}, }; use actix_web::{ + Responder, http::header::ContentType, web::{self, Path}, - Responder, }; use http::StatusCode; use itertools::Itertools; diff --git a/src/handlers/http/resource_check.rs b/src/handlers/http/resource_check.rs index f5cdb67c4..41e2a21e3 100644 --- a/src/handlers/http/resource_check.rs +++ b/src/handlers/http/resource_check.rs @@ -16,7 +16,7 @@ * */ -use std::sync::{atomic::AtomicBool, Arc, LazyLock}; +use std::sync::{Arc, LazyLock, 
atomic::AtomicBool}; use actix_web::{ body::MessageBody, @@ -27,11 +27,11 @@ use actix_web::{ }; use tokio::{ select, - time::{interval, Duration}, + time::{Duration, interval}, }; use tracing::{info, trace, warn}; -use crate::analytics::{refresh_sys_info, SYS_INFO}; +use crate::analytics::{SYS_INFO, refresh_sys_info}; use crate::parseable::PARSEABLE; static RESOURCE_CHECK_ENABLED: LazyLock> = diff --git a/src/handlers/http/role.rs b/src/handlers/http/role.rs index e37ab61c4..b2462dc08 100644 --- a/src/handlers/http/role.rs +++ b/src/handlers/http/role.rs @@ -19,16 +19,16 @@ use std::collections::HashSet; use actix_web::{ + HttpResponse, Responder, http::header::ContentType, web::{self, Json}, - HttpResponse, Responder, }; use http::StatusCode; use crate::{ parseable::PARSEABLE, rbac::{ - map::{mut_roles, mut_sessions, read_user_groups, users, DEFAULT_ROLE}, + map::{DEFAULT_ROLE, mut_roles, mut_sessions, read_user_groups, users}, role::model::DefaultPrivilege, }, storage::{self, ObjectStorageError, StorageMetadata}, diff --git a/src/handlers/http/targets.rs b/src/handlers/http/targets.rs index 3dec1cd5e..41cad80c7 100644 --- a/src/handlers/http/targets.rs +++ b/src/handlers/http/targets.rs @@ -1,13 +1,13 @@ use actix_web::{ - web::{self, Json, Path}, HttpRequest, Responder, + web::{self, Json, Path}, }; use itertools::Itertools; use ulid::Ulid; use crate::alerts::{ - target::{Target, TARGETS}, AlertError, + target::{TARGETS, Target}, }; // POST /targets diff --git a/src/handlers/http/users/dashboards.rs b/src/handlers/http/users/dashboards.rs index f0b0f66ef..ce48fe671 100644 --- a/src/handlers/http/users/dashboards.rs +++ b/src/handlers/http/users/dashboards.rs @@ -21,13 +21,13 @@ use std::collections::HashMap; use crate::{ handlers::http::rbac::RBACError, storage::ObjectStorageError, - users::dashboards::{validate_dashboard_id, Dashboard, Tile, DASHBOARDS}, + users::dashboards::{DASHBOARDS, Dashboard, Tile, validate_dashboard_id}, utils::{get_hash, get_user_from_request}, }; use actix_web::{ + HttpRequest, HttpResponse, Responder, http::header::ContentType, web::{self, Json, Path}, - HttpRequest, HttpResponse, Responder, }; use http::StatusCode; use serde_json::Error as SerdeError; diff --git a/src/handlers/http/users/filters.rs b/src/handlers/http/users/filters.rs index fe0f06d2c..4992b512e 100644 --- a/src/handlers/http/users/filters.rs +++ b/src/handlers/http/users/filters.rs @@ -19,14 +19,14 @@ use crate::{ handlers::http::rbac::RBACError, parseable::PARSEABLE, - storage::{object_storage::filter_path, ObjectStorageError}, - users::filters::{Filter, CURRENT_FILTER_VERSION, FILTERS}, + storage::{ObjectStorageError, object_storage::filter_path}, + users::filters::{CURRENT_FILTER_VERSION, FILTERS, Filter}, utils::{actix::extract_session_key_from_req, get_hash, get_user_from_request}, }; use actix_web::{ + HttpRequest, HttpResponse, Responder, http::header::ContentType, web::{self, Json, Path}, - HttpRequest, HttpResponse, Responder, }; use bytes::Bytes; use chrono::Utc; diff --git a/src/handlers/livetail.rs b/src/handlers/livetail.rs index 57630c93c..517b56d7c 100644 --- a/src/handlers/livetail.rs +++ b/src/handlers/livetail.rs @@ -19,8 +19,8 @@ use std::net::SocketAddr; use arrow_array::RecordBatch; -use arrow_flight::encode::FlightDataEncoderBuilder; use arrow_flight::PollInfo; +use arrow_flight::encode::FlightDataEncoderBuilder; use cookie::Cookie; use futures::stream::BoxStream; use futures_util::{Future, StreamExt, TryFutureExt, TryStreamExt}; @@ -31,15 +31,15 @@ use 
tonic::transport::{Identity, Server, ServerTlsConfig}; use tonic::{Request, Response, Status, Streaming}; use arrow_flight::{ - flight_service_server::FlightService, flight_service_server::FlightServiceServer, Action, - ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightInfo, HandshakeRequest, - HandshakeResponse, PutResult, SchemaResult, Ticket, + Action, ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightInfo, + HandshakeRequest, HandshakeResponse, PutResult, SchemaResult, Ticket, + flight_service_server::FlightService, flight_service_server::FlightServiceServer, }; use tonic_web::GrpcWebLayer; use tower_http::cors::CorsLayer; use tracing::{info, warn}; -use crate::livetail::{Message, LIVETAIL}; +use crate::livetail::{LIVETAIL, Message}; use crate::parseable::PARSEABLE; use crate::rbac::map::SessionKey; use crate::rbac::{self, Users}; @@ -108,10 +108,10 @@ impl FlightService for FlightServiceImpl { rbac::Response::UnAuthorized => { return Err(Status::permission_denied( "user is not authenticated to access this resource", - )) + )); } rbac::Response::ReloadRequired => { - return Err(Status::unauthenticated("reload required")) + return Err(Status::unauthenticated("reload required")); } } @@ -267,11 +267,10 @@ pub fn extract_session_key(headers: &MetadataMap) -> Result Option { - let creds = header - .get("Authorization") + header + .get("authorization") .and_then(|value| value.to_str().ok()) - .and_then(|value| Credentials::from_header(value.to_string()).ok()); - creds + .and_then(|value| Credentials::from_header(value.to_string()).ok()) } fn extract_cookie(header: &MetadataMap) -> Option { diff --git a/src/hottier.rs b/src/hottier.rs index f51d58f3d..45c2af65f 100644 --- a/src/hottier.rs +++ b/src/hottier.rs @@ -33,9 +33,9 @@ use crate::{ }; use chrono::NaiveDate; use clokwerk::{AsyncScheduler, Interval, Job}; -use futures::{stream::FuturesUnordered, StreamExt, TryStreamExt}; +use futures::{StreamExt, TryStreamExt, stream::FuturesUnordered}; use futures_util::TryFutureExt; -use object_store::{local::LocalFileSystem, ObjectStore}; +use object_store::{ObjectStore, local::LocalFileSystem}; use once_cell::sync::OnceCell; use parquet::errors::ParquetError; use relative_path::RelativePathBuf; @@ -123,11 +123,13 @@ impl HotTierManager { existing_hot_tier_used_size = existing_hot_tier.used_size; if stream_hot_tier_size < existing_hot_tier_used_size { - return Err(HotTierError::ObjectStorageError(ObjectStorageError::Custom(format!( - "Reducing hot tier size is not supported, failed to reduce the hot tier size from {} to {}", - bytes_to_human_size(existing_hot_tier_used_size), - bytes_to_human_size(stream_hot_tier_size) - )))); + return Err(HotTierError::ObjectStorageError( + ObjectStorageError::Custom(format!( + "Reducing hot tier size is not supported, failed to reduce the hot tier size from {} to {}", + bytes_to_human_size(existing_hot_tier_used_size), + bytes_to_human_size(stream_hot_tier_size) + )), + )); } } @@ -149,8 +151,14 @@ impl HotTierManager { - existing_hot_tier_used_size as f64); if stream_hot_tier_size as f64 > max_allowed_hot_tier_size { - error!("disk_threshold: {}, used_disk_space: {}, total_hot_tier_used_size: {}, existing_hot_tier_used_size: {}, total_hot_tier_size: {}", - bytes_to_human_size(disk_threshold as u64), bytes_to_human_size(used_space), bytes_to_human_size(total_hot_tier_used_size), bytes_to_human_size(existing_hot_tier_used_size), bytes_to_human_size(total_hot_tier_size)); + error!( + "disk_threshold: {}, used_disk_space: {}, 
total_hot_tier_used_size: {}, existing_hot_tier_used_size: {}, total_hot_tier_size: {}", + bytes_to_human_size(disk_threshold as u64), + bytes_to_human_size(used_space), + bytes_to_human_size(total_hot_tier_used_size), + bytes_to_human_size(existing_hot_tier_used_size), + bytes_to_human_size(total_hot_tier_size) + ); return Err(HotTierError::ObjectStorageError( ObjectStorageError::Custom(format!( diff --git a/src/lib.rs b/src/lib.rs index 9ef940f48..ad82ae4e2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -53,7 +53,7 @@ mod validator; use std::time::Duration; pub use handlers::http::modal::{ - ingest_server::IngestServer, query_server::QueryServer, server::Server, ParseableServer, + ParseableServer, ingest_server::IngestServer, query_server::QueryServer, server::Server, }; use once_cell::sync::Lazy; use parseable::PARSEABLE; diff --git a/src/livetail.rs b/src/livetail.rs index eca90eceb..5723ec829 100644 --- a/src/livetail.rs +++ b/src/livetail.rs @@ -24,7 +24,7 @@ use std::{ use futures_util::Stream; use tokio::sync::mpsc::{ - self, error::TrySendError, Receiver, Sender, UnboundedReceiver, UnboundedSender, + self, Receiver, Sender, UnboundedReceiver, UnboundedSender, error::TrySendError, }; use arrow_array::RecordBatch; diff --git a/src/main.rs b/src/main.rs index ca25702a1..569022c42 100644 --- a/src/main.rs +++ b/src/main.rs @@ -20,8 +20,8 @@ use std::process::exit; #[cfg(feature = "kafka")] use parseable::connectors; use parseable::{ - banner, metrics, option::Mode, parseable::PARSEABLE, rbac, storage, IngestServer, - ParseableServer, QueryServer, Server, + IngestServer, ParseableServer, QueryServer, Server, banner, metrics, option::Mode, + parseable::PARSEABLE, rbac, storage, }; use tokio::signal::ctrl_c; use tokio::sync::oneshot; @@ -29,7 +29,7 @@ use tracing::Level; use tracing::{info, warn}; use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::util::SubscriberInitExt; -use tracing_subscriber::{fmt, EnvFilter, Registry}; +use tracing_subscriber::{EnvFilter, Registry, fmt}; #[actix_web::main] async fn main() -> anyhow::Result<()> { @@ -40,11 +40,15 @@ async fn main() -> anyhow::Result<()> { Mode::Query => Box::new(QueryServer), Mode::Ingest => Box::new(IngestServer), Mode::Index => { - println!("Indexing is an enterprise feature. Check out https://www.parseable.com/pricing to know more!"); + println!( + "Indexing is an enterprise feature. Check out https://www.parseable.com/pricing to know more!" + ); exit(0) } Mode::Prism => { - println!("Prism is an enterprise feature. Check out https://www.parseable.com/pricing to know more!"); + println!( + "Prism is an enterprise feature. Check out https://www.parseable.com/pricing to know more!" 
+ ); exit(0) } Mode::All => Box::new(Server), @@ -122,7 +126,7 @@ pub async fn block_until_shutdown_signal() { #[cfg(unix)] /// Asynchronously blocks until a shutdown signal is received pub async fn block_until_shutdown_signal() { - use tokio::signal::unix::{signal, SignalKind}; + use tokio::signal::unix::{SignalKind, signal}; let mut sigterm = signal(SignalKind::terminate()).expect("Failed to create SIGTERM signal handler"); diff --git a/src/metadata.rs b/src/metadata.rs index a5014d4dc..1b150f5c9 100644 --- a/src/metadata.rs +++ b/src/metadata.rs @@ -29,8 +29,8 @@ use crate::metrics::{ EVENTS_INGESTED, EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE, LIFETIME_EVENTS_INGESTED, LIFETIME_EVENTS_INGESTED_SIZE, }; -use crate::storage::retention::Retention; use crate::storage::StreamType; +use crate::storage::retention::Retention; pub fn update_stats( stream_name: &str, diff --git a/src/metrics/prom_utils.rs b/src/metrics/prom_utils.rs index d76a0ac5a..23cffc53b 100644 --- a/src/metrics/prom_utils.rs +++ b/src/metrics/prom_utils.rs @@ -16,12 +16,12 @@ * */ +use crate::INTRA_CLUSTER_CLIENT; use crate::handlers::http::base_path_without_preceding_slash; use crate::handlers::http::ingest::PostError; use crate::handlers::http::modal::Metadata; use crate::option::Mode; use crate::parseable::PARSEABLE; -use crate::INTRA_CLUSTER_CLIENT; use actix_web::http::header; use chrono::NaiveDateTime; use chrono::Utc; diff --git a/src/migration/metadata_migration.rs b/src/migration/metadata_migration.rs index c0c2c42af..9ee38708b 100644 --- a/src/migration/metadata_migration.rs +++ b/src/migration/metadata_migration.rs @@ -17,7 +17,7 @@ */ use rand::distributions::DistString; -use serde_json::{json, Map, Value as JsonValue}; +use serde_json::{Map, Value as JsonValue, json}; use crate::parseable::PARSEABLE; diff --git a/src/migration/mod.rs b/src/migration/mod.rs index e41b11117..a77cf5802 100644 --- a/src/migration/mod.rs +++ b/src/migration/mod.rs @@ -31,13 +31,13 @@ use serde_json::Value; use tracing::warn; use crate::{ - metadata::{load_daily_metrics, update_data_type_time_partition, LogStreamMetadata}, + metadata::{LogStreamMetadata, load_daily_metrics, update_data_type_time_partition}, metrics::fetch_stats_from_storage, option::Mode, - parseable::{Parseable, PARSEABLE}, + parseable::{PARSEABLE, Parseable}, storage::{ - object_storage::{parseable_json_path, schema_path, stream_json_path}, ObjectStorage, ObjectStoreFormat, PARSEABLE_METADATA_FILE_NAME, + object_storage::{parseable_json_path, schema_path, stream_json_path}, }, }; diff --git a/src/migration/stream_metadata_migration.rs b/src/migration/stream_metadata_migration.rs index cab98258a..e3ddaac62 100644 --- a/src/migration/stream_metadata_migration.rs +++ b/src/migration/stream_metadata_migration.rs @@ -19,7 +19,7 @@ use std::collections::HashMap; -use serde_json::{json, Value}; +use serde_json::{Value, json}; use crate::{ catalog::snapshot::CURRENT_SNAPSHOT_VERSION, handlers::http::cluster::INTERNAL_STREAM_NAME, diff --git a/src/option.rs b/src/option.rs index 830037d80..23415ffd8 100644 --- a/src/option.rs +++ b/src/option.rs @@ -31,7 +31,9 @@ pub enum Mode { } #[derive(Debug, thiserror::Error)] -#[error("Starting Standalone Mode is not permitted when Distributed Mode is enabled. Please restart the server with Distributed Mode enabled.")] +#[error( + "Starting Standalone Mode is not permitted when Distributed Mode is enabled. Please restart the server with Distributed Mode enabled." 
+)] pub struct StandaloneWithDistributed; impl Mode { diff --git a/src/otel/metrics.rs b/src/otel/metrics.rs index e71aab785..824b3a10b 100644 --- a/src/otel/metrics.rs +++ b/src/otel/metrics.rs @@ -17,8 +17,8 @@ */ use opentelemetry_proto::tonic::metrics::v1::number_data_point::Value as NumberDataPointValue; use opentelemetry_proto::tonic::metrics::v1::{ - exemplar::Value as ExemplarValue, exponential_histogram_data_point::Buckets, metric, Exemplar, - ExponentialHistogram, Gauge, Histogram, Metric, MetricsData, NumberDataPoint, Sum, Summary, + Exemplar, ExponentialHistogram, Gauge, Histogram, Metric, MetricsData, NumberDataPoint, Sum, + Summary, exemplar::Value as ExemplarValue, exponential_histogram_data_point::Buckets, metric, }; use serde_json::{Map, Value}; diff --git a/src/otel/otel_utils.rs b/src/otel/otel_utils.rs index 9cf0e1644..0442795ca 100644 --- a/src/otel/otel_utils.rs +++ b/src/otel/otel_utils.rs @@ -18,7 +18,7 @@ use chrono::DateTime; use opentelemetry_proto::tonic::common::v1::{ - any_value::Value as OtelValue, AnyValue, ArrayValue, KeyValue, KeyValueList, + AnyValue, ArrayValue, KeyValue, KeyValueList, any_value::Value as OtelValue, }; use serde_json::{Map, Value}; diff --git a/src/otel/traces.rs b/src/otel/traces.rs index 12c6e350f..1d209e97b 100644 --- a/src/otel/traces.rs +++ b/src/otel/traces.rs @@ -15,12 +15,12 @@ * along with this program. If not, see . * */ -use opentelemetry_proto::tonic::trace::v1::span::Event; -use opentelemetry_proto::tonic::trace::v1::span::Link; use opentelemetry_proto::tonic::trace::v1::ScopeSpans; use opentelemetry_proto::tonic::trace::v1::Span; use opentelemetry_proto::tonic::trace::v1::Status; use opentelemetry_proto::tonic::trace::v1::TracesData; +use opentelemetry_proto::tonic::trace::v1::span::Event; +use opentelemetry_proto::tonic::trace::v1::span::Link; use serde_json::{Map, Value}; use super::otel_utils::convert_epoch_nano_to_timestamp; diff --git a/src/parseable/mod.rs b/src/parseable/mod.rs index a69bc9881..92c00582a 100644 --- a/src/parseable/mod.rs +++ b/src/parseable/mod.rs @@ -29,8 +29,8 @@ use actix_web::http::header::HeaderMap; use arrow_schema::{Field, Schema}; use bytes::Bytes; use chrono::Utc; -use clap::{error::ErrorKind, Parser}; -use http::{header::CONTENT_TYPE, HeaderName, HeaderValue, StatusCode}; +use clap::{Parser, error::ErrorKind}; +use http::{HeaderName, HeaderValue, StatusCode, header::CONTENT_TYPE}; use once_cell::sync::Lazy; pub use staging::StagingError; use streams::StreamRef; @@ -47,20 +47,20 @@ use crate::{ format::{LogSource, LogSourceEntry}, }, handlers::{ + STREAM_TYPE_KEY, http::{ - cluster::{sync_streams_with_ingestors, INTERNAL_STREAM_NAME}, + cluster::{INTERNAL_STREAM_NAME, sync_streams_with_ingestors}, ingest::PostError, logstream::error::{CreateStreamError, StreamError}, modal::{ingest_server::INGESTOR_META, utils::logstream_utils::PutStreamHeaders}, }, - STREAM_TYPE_KEY, }, metadata::{LogStreamMetadata, SchemaVersion}, option::Mode, - static_schema::{convert_static_schema_to_arrow_schema, StaticSchema}, + static_schema::{StaticSchema, convert_static_schema_to_arrow_schema}, storage::{ - object_storage::parseable_json_path, ObjectStorageError, ObjectStorageProvider, - ObjectStoreFormat, Owner, Permisssion, StreamType, + ObjectStorageError, ObjectStorageProvider, ObjectStoreFormat, Owner, Permisssion, + StreamType, object_storage::parseable_json_path, }, validator, }; @@ -226,11 +226,19 @@ impl Parseable { } if self.storage.name() == "drive" { - return 
Err(ObjectStorageError::Custom(format!("Could not start the server because directory '{}' contains stale data, please use an empty directory, and restart the server.\n{}", self.storage.get_endpoint(), JOIN_COMMUNITY))); + return Err(ObjectStorageError::Custom(format!( + "Could not start the server because directory '{}' contains stale data, please use an empty directory, and restart the server.\n{}", + self.storage.get_endpoint(), + JOIN_COMMUNITY + ))); } // S3 bucket mode - Err(ObjectStorageError::Custom(format!("Could not start the server because bucket '{}' contains stale data, please use an empty bucket and restart the server.\n{}", self.storage.get_endpoint(), JOIN_COMMUNITY))) + Err(ObjectStorageError::Custom(format!( + "Could not start the server because bucket '{}' contains stale data, please use an empty bucket and restart the server.\n{}", + self.storage.get_endpoint(), + JOIN_COMMUNITY + ))) } pub fn storage(&self) -> Arc { @@ -491,11 +499,11 @@ impl Parseable { .await?; if stream_in_memory_dont_update || stream_in_storage_only_for_query_node { return Err(StreamError::Custom { - msg: format!( - "Logstream {stream_name} already exists, please create a new log stream with unique name" - ), - status: StatusCode::BAD_REQUEST, - }); + msg: format!( + "Logstream {stream_name} already exists, please create a new log stream with unique name" + ), + status: StatusCode::BAD_REQUEST, + }); } if update_stream_flag { @@ -732,9 +740,11 @@ impl Parseable { .any(|field| field.name() == partition) { return Err(CreateStreamError::Custom { - msg: format!("custom partition field {partition} does not exist in the schema for the stream {stream_name}"), - status: StatusCode::BAD_REQUEST, - }); + msg: format!( + "custom partition field {partition} does not exist in the schema for the stream {stream_name}" + ), + status: StatusCode::BAD_REQUEST, + }); } } @@ -855,11 +865,11 @@ pub fn validate_static_schema( if body.is_empty() { return Err(CreateStreamError::Custom { - msg: format!( - "Please provide schema in the request body for static schema logstream {stream_name}" - ), - status: StatusCode::BAD_REQUEST, - }); + msg: format!( + "Please provide schema in the request body for static schema logstream {stream_name}" + ), + status: StatusCode::BAD_REQUEST, + }); } let static_schema: StaticSchema = serde_json::from_slice(body)?; diff --git a/src/parseable/staging/reader.rs b/src/parseable/staging/reader.rs index 777070e75..90dc0004e 100644 --- a/src/parseable/staging/reader.rs +++ b/src/parseable/staging/reader.rs @@ -18,7 +18,7 @@ */ use std::{ - fs::{remove_file, File}, + fs::{File, remove_file}, io::{self, BufReader, Read, Seek, SeekFrom}, path::PathBuf, sync::Arc, @@ -26,7 +26,7 @@ use std::{ }; use arrow_array::{RecordBatch, TimestampMillisecondArray}; -use arrow_ipc::{reader::StreamReader, root_as_message_unchecked, MessageHeader}; +use arrow_ipc::{MessageHeader, reader::StreamReader, root_as_message_unchecked}; use arrow_schema::Schema; use byteorder::{LittleEndian, ReadBytesExt}; use itertools::kmerge_by; @@ -338,23 +338,23 @@ mod tests { }; use arrow_array::{ - cast::AsArray, types::Int64Type, Array, Float64Array, Int32Array, Int64Array, RecordBatch, - StringArray, + Array, Float64Array, Int32Array, Int64Array, RecordBatch, StringArray, cast::AsArray, + types::Int64Type, }; use arrow_ipc::writer::{ - write_message, DictionaryTracker, IpcDataGenerator, IpcWriteOptions, StreamWriter, + DictionaryTracker, IpcDataGenerator, IpcWriteOptions, StreamWriter, write_message, }; use arrow_schema::{DataType, 
Field, Schema}; use chrono::Utc; use temp_dir::TempDir; use crate::{ + OBJECT_STORE_DATA_GRANULARITY, parseable::staging::{ reader::{MergedReverseRecordReader, OffsetReader}, writer::DiskWriter, }, utils::time::TimeRange, - OBJECT_STORE_DATA_GRANULARITY, }; use super::get_reverse_reader; diff --git a/src/parseable/staging/writer.rs b/src/parseable/staging/writer.rs index 6397e13e9..af97435f4 100644 --- a/src/parseable/staging/writer.rs +++ b/src/parseable/staging/writer.rs @@ -162,8 +162,7 @@ impl MemWriter { fn concat_records(schema: &Arc, record: &[RecordBatch]) -> RecordBatch { let records = record.iter().map(|x| adapt_batch(schema, x)).collect_vec(); - let record = concat_batches(schema, records.iter()).unwrap(); - record + concat_batches(schema, records.iter()).unwrap() } #[derive(Debug, Default)] diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index 28cceefcd..38e30e692 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -19,7 +19,7 @@ use std::{ collections::{HashMap, HashSet}, - fs::{self, remove_file, write, File, OpenOptions}, + fs::{self, File, OpenOptions, remove_file, write}, num::NonZeroU32, path::{Path, PathBuf}, sync::{Arc, Mutex, RwLock}, @@ -34,7 +34,7 @@ use itertools::Itertools; use parquet::{ arrow::ArrowWriter, basic::Encoding, - file::{properties::WriterProperties, FOOTER_SIZE}, + file::{FOOTER_SIZE, properties::WriterProperties}, format::SortingColumn, schema::types::ColumnPath, }; @@ -43,27 +43,27 @@ use tokio::task::JoinSet; use tracing::{error, info, trace, warn}; use crate::{ + LOCK_EXPECT, OBJECT_STORE_DATA_GRANULARITY, cli::Options, event::{ - format::{LogSource, LogSourceEntry}, DEFAULT_TIMESTAMP_KEY, + format::{LogSource, LogSourceEntry}, }, handlers::http::modal::{ingest_server::INGESTOR_META, query_server::QUERIER_META}, metadata::{LogStreamMetadata, SchemaVersion}, metrics, option::Mode, - storage::{object_storage::to_bytes, retention::Retention, StreamType}, + storage::{StreamType, object_storage::to_bytes, retention::Retention}, utils::time::{Minute, TimeRange}, - LOCK_EXPECT, OBJECT_STORE_DATA_GRANULARITY, }; use super::{ + ARROW_FILE_EXTENSION, LogStream, staging::{ + StagingError, reader::{MergedRecordReader, MergedReverseRecordReader}, writer::{DiskWriter, Writer}, - StagingError, }, - LogStream, ARROW_FILE_EXTENSION, }; const INPROCESS_DIR_PREFIX: &str = "processing_"; @@ -1468,9 +1468,9 @@ mod tests { assert!(result.is_some()); let parquet_path = result.unwrap(); assert_eq!( - parquet_path.file_name().unwrap().to_str().unwrap(), - "date=2020-01-21.hour=10.minute=30.key1=value1.key2=value2.ee529ffc8e76.data.random123.parquet" - ); + parquet_path.file_name().unwrap().to_str().unwrap(), + "date=2020-01-21.hour=10.minute=30.key1=value1.key2=value2.ee529ffc8e76.data.random123.parquet" + ); } #[test] diff --git a/src/prism/home/mod.rs b/src/prism/home/mod.rs index 4862fcb00..ac7e5ce4f 100644 --- a/src/prism/home/mod.rs +++ b/src/prism/home/mod.rs @@ -27,14 +27,14 @@ use serde::Serialize; use tracing::error; use crate::{ - alerts::{get_alerts_summary, AlertError, AlertsSummary, ALERTS}, - correlation::{CorrelationError, CORRELATIONS}, + alerts::{ALERTS, AlertError, AlertsSummary, get_alerts_summary}, + correlation::{CORRELATIONS, CorrelationError}, event::format::LogSource, handlers::http::{cluster::fetch_daily_stats, logstream::error::StreamError}, parseable::PARSEABLE, - rbac::{map::SessionKey, role::Action, Users}, + rbac::{Users, map::SessionKey, role::Action}, stats::Stats, - storage::{ObjectStorageError, 
ObjectStoreFormat, StreamType, STREAM_ROOT_DIRECTORY}, + storage::{ObjectStorageError, ObjectStoreFormat, STREAM_ROOT_DIRECTORY, StreamType}, users::{dashboards::DASHBOARDS, filters::FILTERS}, }; diff --git a/src/prism/logstream/mod.rs b/src/prism/logstream/mod.rs index 7b3e0ee49..3150b1fdb 100644 --- a/src/prism/logstream/mod.rs +++ b/src/prism/logstream/mod.rs @@ -26,23 +26,23 @@ use serde::{Deserialize, Serialize}; use tracing::warn; use crate::{ + LOCK_EXPECT, handlers::http::{ cluster::{ fetch_stats_from_ingestors, - utils::{merge_quried_stats, IngestionStats, QueriedStats, StorageStats}, + utils::{IngestionStats, QueriedStats, StorageStats, merge_quried_stats}, }, logstream::error::StreamError, - query::{update_schema_when_distributed, QueryError}, + query::{QueryError, update_schema_when_distributed}, }, hottier::{HotTierError, HotTierManager, StreamHotTier}, - parseable::{StreamNotFound, PARSEABLE}, - query::{error::ExecuteError, CountsRequest, CountsResponse}, - rbac::{map::SessionKey, role::Action, Users}, + parseable::{PARSEABLE, StreamNotFound}, + query::{CountsRequest, CountsResponse, error::ExecuteError}, + rbac::{Users, map::SessionKey, role::Action}, stats, - storage::{retention::Retention, StreamInfo, StreamType}, + storage::{StreamInfo, StreamType, retention::Retention}, utils::time::TimeParseError, validator::error::HotTierValidationError, - LOCK_EXPECT, }; #[derive(Serialize)] diff --git a/src/query/listing_table_builder.rs b/src/query/listing_table_builder.rs index 35cf00cbc..9ca484ead 100644 --- a/src/query/listing_table_builder.rs +++ b/src/query/listing_table_builder.rs @@ -27,13 +27,13 @@ use datafusion::{ error::DataFusionError, logical_expr::col, }; -use futures_util::{stream::FuturesUnordered, Future, TryStreamExt}; +use futures_util::{Future, TryStreamExt, stream::FuturesUnordered}; use itertools::Itertools; -use object_store::{path::Path, ObjectMeta, ObjectStore}; +use object_store::{ObjectMeta, ObjectStore, path::Path}; use crate::{ - event::DEFAULT_TIMESTAMP_KEY, storage::ObjectStorage, utils::time::TimeRange, - OBJECT_STORE_DATA_GRANULARITY, + OBJECT_STORE_DATA_GRANULARITY, event::DEFAULT_TIMESTAMP_KEY, storage::ObjectStorage, + utils::time::TimeRange, }; use super::PartialTimeFilter; @@ -168,9 +168,11 @@ impl ListingTableBuilder { return Ok(None); } - let file_sort_order = vec![vec![time_partition - .map_or_else(|| col(DEFAULT_TIMESTAMP_KEY), col) - .sort(true, false)]]; + let file_sort_order = vec![vec![ + time_partition + .map_or_else(|| col(DEFAULT_TIMESTAMP_KEY), col) + .sort(true, false), + ]]; let file_format = ParquetFormat::default().with_enable_pruning(true); let listing_options = ListingOptions::new(Arc::new(file_format)) .with_file_extension(".parquet") diff --git a/src/query/mod.rs b/src/query/mod.rs index e624c213c..b9aca94c9 100644 --- a/src/query/mod.rs +++ b/src/query/mod.rs @@ -40,7 +40,7 @@ use itertools::Itertools; use once_cell::sync::Lazy; use relative_path::RelativePathBuf; use serde::{Deserialize, Serialize}; -use serde_json::{json, Value}; +use serde_json::{Value, json}; use std::ops::Bound; use std::sync::Arc; use stream_schema_provider::collect_manifest_files; @@ -50,12 +50,12 @@ use tokio::runtime::Runtime; use self::error::ExecuteError; use self::stream_schema_provider::GlobalSchemaProvider; pub use self::stream_schema_provider::PartialTimeFilter; -use crate::alerts::alerts_utils::get_filter_string; use crate::alerts::Conditions; +use crate::alerts::alerts_utils::get_filter_string; +use crate::catalog::Snapshot as 
CatalogSnapshot; use crate::catalog::column::{Int64Type, TypedStatistics}; use crate::catalog::manifest::Manifest; use crate::catalog::snapshot::Snapshot; -use crate::catalog::Snapshot as CatalogSnapshot; use crate::event; use crate::handlers::http::query::QueryError; use crate::option::Mode; @@ -242,9 +242,11 @@ impl Query { ); LogicalPlan::Explain(Explain { verbose: plan.verbose, - stringified_plans: vec![transformed - .data - .to_stringified(PlanType::InitialLogicalPlan)], + stringified_plans: vec![ + transformed + .data + .to_stringified(PlanType::InitialLogicalPlan), + ], plan: Arc::new(transformed.data), schema: plan.schema, logical_optimization_succeeded: plan.logical_optimization_succeeded, @@ -457,20 +459,35 @@ impl CountsRequest { let date_bin = if dur.num_minutes() <= 60 * 10 { // date_bin 1 minute - format!("CAST(DATE_BIN('1 minute', \"{}\".p_timestamp, TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as start_time, DATE_BIN('1 minute', p_timestamp, TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1 minute' as end_time", self.stream) + format!( + "CAST(DATE_BIN('1 minute', \"{}\".p_timestamp, TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as start_time, DATE_BIN('1 minute', p_timestamp, TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1 minute' as end_time", + self.stream + ) } else if dur.num_minutes() > 60 * 10 && dur.num_minutes() < 60 * 240 { // date_bin 1 hour - format!("CAST(DATE_BIN('1 hour', \"{}\".p_timestamp, TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as start_time, DATE_BIN('1 hour', p_timestamp, TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1 hour' as end_time", self.stream) + format!( + "CAST(DATE_BIN('1 hour', \"{}\".p_timestamp, TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as start_time, DATE_BIN('1 hour', p_timestamp, TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1 hour' as end_time", + self.stream + ) } else { // date_bin 1 day - format!("CAST(DATE_BIN('1 day', \"{}\".p_timestamp, TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as start_time, DATE_BIN('1 day', p_timestamp, TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1 day' as end_time", self.stream) + format!( + "CAST(DATE_BIN('1 day', \"{}\".p_timestamp, TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as start_time, DATE_BIN('1 day', p_timestamp, TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1 day' as end_time", + self.stream + ) }; let query = if let Some(conditions) = &count_conditions.conditions { let f = get_filter_string(conditions).map_err(QueryError::CustomError)?; - format!("SELECT {date_bin}, COUNT(*) as count FROM \"{}\" WHERE {} GROUP BY end_time,start_time ORDER BY end_time",self.stream,f) + format!( + "SELECT {date_bin}, COUNT(*) as count FROM \"{}\" WHERE {} GROUP BY end_time,start_time ORDER BY end_time", + self.stream, f + ) } else { - format!("SELECT {date_bin}, COUNT(*) as count FROM \"{}\" GROUP BY end_time,start_time ORDER BY end_time",self.stream) + format!( + "SELECT {date_bin}, COUNT(*) as count FROM \"{}\" GROUP BY end_time,start_time ORDER BY end_time", + self.stream + ) }; Ok(query) } diff --git a/src/query/stream_schema_provider.rs b/src/query/stream_schema_provider.rs index d015bc71d..14274f13b 100644 --- a/src/query/stream_schema_provider.rs +++ b/src/query/stream_schema_provider.rs @@ -25,38 +25,38 @@ use chrono::{DateTime, NaiveDateTime, TimeDelta, Timelike, Utc}; use datafusion::{ catalog::{SchemaProvider, Session}, common::{ + Constraints, ToDFSchema, stats::Precision, tree_node::{TreeNode, TreeNodeRecursion}, - Constraints, ToDFSchema, }, datasource::{ - 
file_format::{parquet::ParquetFormat, FileFormat}, + MemTable, TableProvider, + file_format::{FileFormat, parquet::ParquetFormat}, listing::PartitionedFile, physical_plan::FileScanConfig, - MemTable, TableProvider, }, error::{DataFusionError, Result as DataFusionResult}, execution::{context::SessionState, object_store::ObjectStoreUrl}, logical_expr::{ - utils::conjunction, BinaryExpr, Operator, TableProviderFilterPushDown, TableType, + BinaryExpr, Operator, TableProviderFilterPushDown, TableType, utils::conjunction, }, - physical_expr::{create_physical_expr, expressions::col, LexOrdering, PhysicalSortExpr}, - physical_plan::{empty::EmptyExec, union::UnionExec, ExecutionPlan, Statistics}, + physical_expr::{LexOrdering, PhysicalSortExpr, create_physical_expr, expressions::col}, + physical_plan::{ExecutionPlan, Statistics, empty::EmptyExec, union::UnionExec}, prelude::Expr, scalar::ScalarValue, }; -use futures_util::{stream::FuturesOrdered, StreamExt, TryFutureExt, TryStreamExt}; +use futures_util::{StreamExt, TryFutureExt, TryStreamExt, stream::FuturesOrdered}; use itertools::Itertools; -use object_store::{path::Path, ObjectStore}; +use object_store::{ObjectStore, path::Path}; use relative_path::RelativePathBuf; use url::Url; use crate::{ catalog::{ + ManifestFile, Snapshot as CatalogSnapshot, column::{Column, TypedStatistics}, manifest::{File, Manifest}, snapshot::{ManifestItem, Snapshot}, - ManifestFile, Snapshot as CatalogSnapshot, }, event::DEFAULT_TIMESTAMP_KEY, hottier::HotTierManager, @@ -1016,7 +1016,7 @@ mod tests { use crate::catalog::snapshot::ManifestItem; - use super::{extract_timestamp_bound, is_overlapping_query, PartialTimeFilter}; + use super::{PartialTimeFilter, extract_timestamp_bound, is_overlapping_query}; fn datetime_min(year: i32, month: u32, day: u32) -> DateTime { NaiveDate::from_ymd_opt(year, month, day) diff --git a/src/rbac/map.rs b/src/rbac/map.rs index 23ee06440..7fd5ed703 100644 --- a/src/rbac/map.rs +++ b/src/rbac/map.rs @@ -24,7 +24,7 @@ use std::{collections::HashMap, sync::Mutex}; use super::Response; use super::{ - role::{model::DefaultPrivilege, Action, Permission, RoleBuilder}, + role::{Action, Permission, RoleBuilder, model::DefaultPrivilege}, user, }; use chrono::{DateTime, Utc}; diff --git a/src/rbac/user.rs b/src/rbac/user.rs index 5fba8975a..3a651f412 100644 --- a/src/rbac/user.rs +++ b/src/rbac/user.rs @@ -19,8 +19,8 @@ use std::collections::HashSet; use argon2::{ - password_hash::{rand_core::OsRng, PasswordHasher, SaltString}, Argon2, PasswordHash, PasswordVerifier, + password_hash::{PasswordHasher, SaltString, rand_core::OsRng}, }; use rand::distributions::{Alphanumeric, DistString}; @@ -133,12 +133,10 @@ pub fn verify(password_hash: &str, password: &str) -> bool { fn gen_hash(password: &str) -> String { let salt = SaltString::generate(&mut OsRng); let argon2 = Argon2::default(); - let hashcode = argon2 + argon2 .hash_password(password.as_bytes(), &salt) .expect("can hash random alphanumeric") - .to_string(); - - hashcode + .to_string() } pub struct PassCode { diff --git a/src/rbac/utils.rs b/src/rbac/utils.rs index df9dd52bf..f57d809f4 100644 --- a/src/rbac/utils.rs +++ b/src/rbac/utils.rs @@ -22,10 +22,10 @@ use url::Url; use crate::{parseable::PARSEABLE, rbac::map::read_user_groups}; use super::{ + Users, UsersPrism, map::roles, role::model::DefaultPrivilege, user::{User, UserType}, - Users, UsersPrism, }; pub fn to_prism_user(user: &User) -> UsersPrism { diff --git a/src/response.rs b/src/response.rs index bd02ababe..8823b032a 100644 --- 
a/src/response.rs +++ b/src/response.rs @@ -19,7 +19,7 @@ use crate::{handlers::http::query::QueryError, utils::arrow::record_batches_to_json}; use datafusion::arrow::record_batch::RecordBatch; use itertools::Itertools; -use serde_json::{json, Value}; +use serde_json::{Value, json}; use tracing::info; pub struct QueryResponse { diff --git a/src/stats.rs b/src/stats.rs index 7bacdfd0a..eed0d9703 100644 --- a/src/stats.rs +++ b/src/stats.rs @@ -20,9 +20,9 @@ use std::collections::HashMap; use std::sync::Arc; use once_cell::sync::Lazy; +use prometheus::IntGaugeVec; use prometheus::core::Collector; use prometheus::proto::MetricFamily; -use prometheus::IntGaugeVec; use tracing::warn; use crate::metrics::{ diff --git a/src/storage/azure_blob.rs b/src/storage/azure_blob.rs index 1dd3a4da9..34933a98d 100644 --- a/src/storage/azure_blob.rs +++ b/src/storage/azure_blob.rs @@ -32,13 +32,13 @@ use datafusion::{ runtime_env::RuntimeEnvBuilder, }, }; -use futures::{stream::FuturesUnordered, StreamExt, TryStreamExt}; +use futures::{StreamExt, TryStreamExt, stream::FuturesUnordered}; use object_store::{ + BackoffConfig, ClientOptions, ObjectMeta, ObjectStore, PutPayload, RetryConfig, azure::{MicrosoftAzure, MicrosoftAzureBuilder}, buffered::BufReader, limit::LimitStore, path::Path as StorePath, - BackoffConfig, ClientOptions, ObjectMeta, ObjectStore, PutPayload, RetryConfig, }; use relative_path::{RelativePath, RelativePathBuf}; use tokio::{fs::OpenOptions, io::AsyncReadExt}; @@ -47,15 +47,15 @@ use url::Url; use crate::{ handlers::http::users::USERS_ROOT_DIR, - metrics::storage::{azureblob::REQUEST_RESPONSE_TIME, StorageMetrics}, + metrics::storage::{StorageMetrics, azureblob::REQUEST_RESPONSE_TIME}, parseable::LogStream, }; use super::{ - metrics_layer::MetricLayer, object_storage::parseable_json_path, to_object_store_path, - ObjectStorage, ObjectStorageError, ObjectStorageProvider, CONNECT_TIMEOUT_SECS, - MIN_MULTIPART_UPLOAD_SIZE, PARSEABLE_ROOT_DIRECTORY, REQUEST_TIMEOUT_SECS, SCHEMA_FILE_NAME, - STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY, + CONNECT_TIMEOUT_SECS, MIN_MULTIPART_UPLOAD_SIZE, ObjectStorage, ObjectStorageError, + ObjectStorageProvider, PARSEABLE_ROOT_DIRECTORY, REQUEST_TIMEOUT_SECS, SCHEMA_FILE_NAME, + STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY, metrics_layer::MetricLayer, + object_storage::parseable_json_path, to_object_store_path, }; #[derive(Debug, Clone, clap::Args)] diff --git a/src/storage/gcs.rs b/src/storage/gcs.rs index 292bf3f02..23bae4710 100644 --- a/src/storage/gcs.rs +++ b/src/storage/gcs.rs @@ -25,7 +25,7 @@ use std::{ use crate::{ handlers::http::users::USERS_ROOT_DIR, - metrics::storage::{gcs::REQUEST_RESPONSE_TIME, StorageMetrics}, + metrics::storage::{StorageMetrics, gcs::REQUEST_RESPONSE_TIME}, parseable::LogStream, }; use async_trait::async_trait; @@ -37,23 +37,23 @@ use datafusion::{ runtime_env::RuntimeEnvBuilder, }, }; -use futures::{stream::FuturesUnordered, StreamExt, TryStreamExt}; +use futures::{StreamExt, TryStreamExt, stream::FuturesUnordered}; use object_store::{ + BackoffConfig, ClientOptions, ObjectMeta, ObjectStore, PutPayload, RetryConfig, buffered::BufReader, gcp::{GoogleCloudStorage, GoogleCloudStorageBuilder}, limit::LimitStore, path::Path as StorePath, - BackoffConfig, ClientOptions, ObjectMeta, ObjectStore, PutPayload, RetryConfig, }; use relative_path::{RelativePath, RelativePathBuf}; use tokio::{fs::OpenOptions, io::AsyncReadExt}; use tracing::{error, info}; use super::{ - metrics_layer::MetricLayer, 
object_storage::parseable_json_path, to_object_store_path, - ObjectStorage, ObjectStorageError, ObjectStorageProvider, CONNECT_TIMEOUT_SECS, - MIN_MULTIPART_UPLOAD_SIZE, PARSEABLE_ROOT_DIRECTORY, REQUEST_TIMEOUT_SECS, SCHEMA_FILE_NAME, - STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY, + CONNECT_TIMEOUT_SECS, MIN_MULTIPART_UPLOAD_SIZE, ObjectStorage, ObjectStorageError, + ObjectStorageProvider, PARSEABLE_ROOT_DIRECTORY, REQUEST_TIMEOUT_SECS, SCHEMA_FILE_NAME, + STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY, metrics_layer::MetricLayer, + object_storage::parseable_json_path, to_object_store_path, }; #[derive(Debug, Clone, clap::Args)] diff --git a/src/storage/localfs.rs b/src/storage/localfs.rs index 57a7e5a4c..8e8e99541 100644 --- a/src/storage/localfs.rs +++ b/src/storage/localfs.rs @@ -27,8 +27,8 @@ use async_trait::async_trait; use bytes::Bytes; use datafusion::{datasource::listing::ListingTableUrl, execution::runtime_env::RuntimeEnvBuilder}; use fs_extra::file::CopyOptions; -use futures::{stream::FuturesUnordered, TryStreamExt}; -use object_store::{buffered::BufReader, ObjectMeta}; +use futures::{TryStreamExt, stream::FuturesUnordered}; +use object_store::{ObjectMeta, buffered::BufReader}; use relative_path::{RelativePath, RelativePathBuf}; use tokio::{ fs::{self, DirEntry, OpenOptions}, @@ -38,14 +38,14 @@ use tokio_stream::wrappers::ReadDirStream; use crate::{ handlers::http::users::USERS_ROOT_DIR, - metrics::storage::{azureblob::REQUEST_RESPONSE_TIME, StorageMetrics}, + metrics::storage::{StorageMetrics, azureblob::REQUEST_RESPONSE_TIME}, option::validation, parseable::LogStream, storage::SETTINGS_ROOT_DIRECTORY, }; use super::{ - ObjectStorage, ObjectStorageError, ObjectStorageProvider, ALERTS_ROOT_DIRECTORY, + ALERTS_ROOT_DIRECTORY, ObjectStorage, ObjectStorageError, ObjectStorageProvider, PARSEABLE_ROOT_DIRECTORY, SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY, }; diff --git a/src/storage/metrics_layer.rs b/src/storage/metrics_layer.rs index 9cf125803..cfaaeb6d2 100644 --- a/src/storage/metrics_layer.rs +++ b/src/storage/metrics_layer.rs @@ -24,10 +24,10 @@ use std::{ use async_trait::async_trait; use bytes::Bytes; -use futures_util::{stream::BoxStream, Stream, StreamExt}; +use futures_util::{Stream, StreamExt, stream::BoxStream}; use object_store::{ - path::Path, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore, - PutMultipartOpts, PutOptions, PutPayload, PutResult, Result as ObjectStoreResult, + GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore, PutMultipartOpts, + PutOptions, PutPayload, PutResult, Result as ObjectStoreResult, path::Path, }; /* NOTE: Keeping these imports as they would make migration to object_store 0.10.0 easier diff --git a/src/storage/mod.rs b/src/storage/mod.rs index d160760d5..ea31e9bee 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -52,7 +52,7 @@ pub use localfs::FSConfig; pub use object_storage::{ObjectStorage, ObjectStorageProvider}; pub use s3::S3Config; pub use store_metadata::{ - put_remote_metadata, put_staging_metadata, resolve_parseable_metadata, StorageMetadata, + StorageMetadata, put_remote_metadata, put_staging_metadata, resolve_parseable_metadata, }; // metadata file names in a Stream prefix diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index ee385b7c1..b21251cc2 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -22,8 +22,8 @@ use async_trait::async_trait; use bytes::Bytes; use 
chrono::{DateTime, Utc}; use datafusion::{datasource::listing::ListingTableUrl, execution::runtime_env::RuntimeEnvBuilder}; -use object_store::buffered::BufReader; use object_store::ObjectMeta; +use object_store::buffered::BufReader; use once_cell::sync::OnceCell; use relative_path::RelativePath; use relative_path::RelativePathBuf; @@ -31,7 +31,7 @@ use std::collections::BTreeMap; use std::collections::HashMap; use std::collections::HashSet; use std::fmt::Debug; -use std::fs::{remove_file, File}; +use std::fs::{File, remove_file}; use std::num::NonZeroU32; use std::path::Path; use std::sync::Arc; @@ -43,8 +43,8 @@ use tracing::info; use tracing::{error, warn}; use ulid::Ulid; -use crate::alerts::target::Target; use crate::alerts::AlertConfig; +use crate::alerts::target::Target; use crate::catalog::{self, manifest::Manifest, snapshot::Snapshot}; use crate::correlation::{CorrelationConfig, CorrelationError}; use crate::event::format::LogSource; @@ -60,15 +60,15 @@ use crate::option::Mode; use crate::parseable::LogStream; use crate::parseable::PARSEABLE; use crate::stats::FullStats; -use crate::storage::field_stats::calculate_field_stats; use crate::storage::SETTINGS_ROOT_DIRECTORY; use crate::storage::TARGETS_ROOT_DIRECTORY; +use crate::storage::field_stats::calculate_field_stats; use crate::utils::DATASET_STATS_STREAM_NAME; use super::{ - retention::Retention, ObjectStorageError, ObjectStoreFormat, StorageMetadata, - ALERTS_ROOT_DIRECTORY, MANIFEST_FILE, PARSEABLE_METADATA_FILE_NAME, PARSEABLE_ROOT_DIRECTORY, - SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY, + ALERTS_ROOT_DIRECTORY, MANIFEST_FILE, ObjectStorageError, ObjectStoreFormat, + PARSEABLE_METADATA_FILE_NAME, PARSEABLE_ROOT_DIRECTORY, SCHEMA_FILE_NAME, + STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY, StorageMetadata, retention::Retention, }; pub trait ObjectStorageProvider: StorageMetrics + std::fmt::Debug + Send + Sync { diff --git a/src/storage/retention.rs b/src/storage/retention.rs index ce67808be..008452138 100644 --- a/src/storage/retention.rs +++ b/src/storage/retention.rs @@ -172,7 +172,7 @@ mod action { use crate::catalog::remove_manifest_from_snapshot; use crate::parseable::PARSEABLE; use chrono::{Days, NaiveDate, Utc}; - use futures::{stream::FuturesUnordered, StreamExt}; + use futures::{StreamExt, stream::FuturesUnordered}; use itertools::Itertools; use relative_path::RelativePathBuf; use tracing::{error, info}; diff --git a/src/storage/s3.rs b/src/storage/s3.rs index 83a98a6e1..1f150c4de 100644 --- a/src/storage/s3.rs +++ b/src/storage/s3.rs @@ -34,13 +34,13 @@ use datafusion::{ runtime_env::RuntimeEnvBuilder, }, }; -use futures::{stream::FuturesUnordered, StreamExt, TryStreamExt}; +use futures::{StreamExt, TryStreamExt, stream::FuturesUnordered}; use object_store::{ + BackoffConfig, ClientOptions, ObjectMeta, ObjectStore, PutPayload, RetryConfig, aws::{AmazonS3, AmazonS3Builder, AmazonS3ConfigKey, Checksum}, buffered::BufReader, limit::LimitStore, path::Path as StorePath, - BackoffConfig, ClientOptions, ObjectMeta, ObjectStore, PutPayload, RetryConfig, }; use relative_path::{RelativePath, RelativePathBuf}; use tokio::{fs::OpenOptions, io::AsyncReadExt}; @@ -48,15 +48,15 @@ use tracing::{error, info}; use crate::{ handlers::http::users::USERS_ROOT_DIR, - metrics::storage::{azureblob::REQUEST_RESPONSE_TIME, StorageMetrics}, + metrics::storage::{StorageMetrics, azureblob::REQUEST_RESPONSE_TIME}, parseable::LogStream, }; use super::{ - metrics_layer::MetricLayer, object_storage::parseable_json_path, 
to_object_store_path, - ObjectStorage, ObjectStorageError, ObjectStorageProvider, CONNECT_TIMEOUT_SECS, - MIN_MULTIPART_UPLOAD_SIZE, PARSEABLE_ROOT_DIRECTORY, REQUEST_TIMEOUT_SECS, SCHEMA_FILE_NAME, - STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY, + CONNECT_TIMEOUT_SECS, MIN_MULTIPART_UPLOAD_SIZE, ObjectStorage, ObjectStorageError, + ObjectStorageProvider, PARSEABLE_ROOT_DIRECTORY, REQUEST_TIMEOUT_SECS, SCHEMA_FILE_NAME, + STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY, metrics_layer::MetricLayer, + object_storage::parseable_json_path, to_object_store_path, }; // in bytes diff --git a/src/storage/store_metadata.rs b/src/storage/store_metadata.rs index 31f5283b4..e02abf137 100644 --- a/src/storage/store_metadata.rs +++ b/src/storage/store_metadata.rs @@ -18,7 +18,7 @@ use std::{ collections::HashMap, - fs::{self, create_dir_all, OpenOptions}, + fs::{self, OpenOptions, create_dir_all}, path::PathBuf, }; @@ -57,7 +57,7 @@ pub struct StorageMetadata { pub mode: String, pub staging: PathBuf, pub storage: String, - #[serde(default = "crate::utils::uid::gen")] + #[serde(default = "crate::utils::uid::generate_ulid")] pub deployment_id: uid::Uid, pub users: Vec, pub user_groups: Vec, @@ -76,7 +76,7 @@ impl Default for StorageMetadata { mode: PARSEABLE.storage.name().to_owned(), staging: PARSEABLE.options.staging_dir().to_path_buf(), storage: PARSEABLE.storage.get_endpoint(), - deployment_id: uid::gen(), + deployment_id: uid::generate_ulid(), server_mode: PARSEABLE.options.mode, users: Vec::new(), user_groups: Vec::new(), diff --git a/src/sync.rs b/src/sync.rs index c6ae55c9e..93d643c4f 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -23,11 +23,11 @@ use std::future::Future; use std::panic::AssertUnwindSafe; use tokio::sync::{mpsc, oneshot}; use tokio::task::JoinSet; -use tokio::time::{interval_at, sleep, Duration, Instant}; +use tokio::time::{Duration, Instant, interval_at, sleep}; use tokio::{select, task}; use tracing::{error, info, trace, warn}; -use crate::alerts::{alerts_utils, AlertTask}; +use crate::alerts::{AlertTask, alerts_utils}; use crate::parseable::PARSEABLE; use crate::storage::object_storage::sync_all_streams; use crate::{LOCAL_SYNC_INTERVAL, STORAGE_UPLOAD_INTERVAL}; @@ -311,12 +311,18 @@ pub async fn alert_runtime(mut rx: mpsc::Receiver) -> Result<(), anyh retry_counter = 0; } Err(err) => { - warn!("Error while evaluation- {}\nRetrying after sleeping for 1 minute", err); + warn!( + "Error while evaluation- {}\nRetrying after sleeping for 1 minute", + err + ); sleep_duration = 1; retry_counter += 1; if retry_counter > 3 { - error!("Alert with id {} failed to evaluate after 3 retries with err- {}", id, err); + error!( + "Alert with id {} failed to evaluate after 3 retries with err- {}", + id, err + ); break; } } diff --git a/src/users/filters.rs b/src/users/filters.rs index 0106c47f0..e95f90e53 100644 --- a/src/users/filters.rs +++ b/src/users/filters.rs @@ -25,7 +25,7 @@ use super::TimeFilter; use crate::{ migration::to_bytes, parseable::PARSEABLE, - rbac::{map::SessionKey, Users}, + rbac::{Users, map::SessionKey}, storage::object_storage::filter_path, utils::{get_hash, user_auth_for_datasets, user_auth_for_query}, }; diff --git a/src/utils/actix.rs b/src/utils/actix.rs index 86c4cdd63..bb01fd892 100644 --- a/src/utils/actix.rs +++ b/src/utils/actix.rs @@ -18,9 +18,9 @@ */ use actix_web::{ + Error, FromRequest, HttpRequest, dev::ServiceRequest, error::{ErrorUnauthorized, ErrorUnprocessableEntity}, - Error, FromRequest, HttpRequest, }; use 
actix_web_httpauth::extractors::basic::BasicAuth; diff --git a/src/utils/arrow/batch_adapter.rs b/src/utils/arrow/batch_adapter.rs index 1c85969c9..0831f93ce 100644 --- a/src/utils/arrow/batch_adapter.rs +++ b/src/utils/arrow/batch_adapter.rs @@ -16,8 +16,8 @@ * */ -use datafusion::arrow::array::new_null_array; use datafusion::arrow::array::ArrayRef; +use datafusion::arrow::array::new_null_array; use datafusion::arrow::datatypes::Schema; use datafusion::arrow::record_batch::RecordBatch; diff --git a/src/utils/arrow/flight.rs b/src/utils/arrow/flight.rs index 9ed5fff42..fd10fb349 100644 --- a/src/utils/arrow/flight.rs +++ b/src/utils/arrow/flight.rs @@ -31,7 +31,7 @@ use arrow_select::concat::concat_batches; use datafusion::logical_expr::BinaryExpr; use datafusion::prelude::Expr; use datafusion::scalar::ScalarValue; -use futures::{stream, TryStreamExt}; +use futures::{TryStreamExt, stream}; use tonic::{Request, Response, Status}; diff --git a/src/utils/human_size.rs b/src/utils/human_size.rs index ef345313c..e94c2e31d 100644 --- a/src/utils/human_size.rs +++ b/src/utils/human_size.rs @@ -1,7 +1,7 @@ use std::str::FromStr; use human_size::{Any, SpecificSize}; -use serde::{de, Deserialize, Deserializer, Serializer}; +use serde::{Deserialize, Deserializer, Serializer, de}; #[derive(Debug, thiserror::Error)] enum ParsingError { diff --git a/src/utils/json/flatten.rs b/src/utils/json/flatten.rs index 11b41beb5..cc3df7635 100644 --- a/src/utils/json/flatten.rs +++ b/src/utils/json/flatten.rs @@ -114,26 +114,26 @@ pub fn validate_custom_partition( // The field should not be null, empty, an object, an array or contain a `.` in the value match field_value { Value::Null => { - return Err(JsonFlattenError::FieldEmptyOrNull(trimmed_field.to_owned())) + return Err(JsonFlattenError::FieldEmptyOrNull(trimmed_field.to_owned())); } Value::String(s) if s.is_empty() => { - return Err(JsonFlattenError::FieldEmptyOrNull(trimmed_field.to_owned())) + return Err(JsonFlattenError::FieldEmptyOrNull(trimmed_field.to_owned())); } Value::Object(_) => { - return Err(JsonFlattenError::FieldIsObject(trimmed_field.to_owned())) + return Err(JsonFlattenError::FieldIsObject(trimmed_field.to_owned())); } Value::Array(_) => { - return Err(JsonFlattenError::FieldIsArray(trimmed_field.to_owned())) + return Err(JsonFlattenError::FieldIsArray(trimmed_field.to_owned())); } Value::String(s) if s.contains('.') => { return Err(JsonFlattenError::FieldContainsPeriod( trimmed_field.to_owned(), - )) + )); } Value::Number(n) if n.is_f64() => { return Err(JsonFlattenError::FieldContainsPeriod( trimmed_field.to_owned(), - )) + )); } _ => {} } @@ -360,8 +360,8 @@ pub fn convert_to_array(flattened: Vec) -> Result String { let now = Utc::now().to_rfc3339(); - let id = get_hash(&now).to_string().split_at(15).0.to_string(); - id + get_hash(&now).to_string().split_at(15).0.to_string() } pub fn extract_datetime(path: &str) -> Option { diff --git a/src/utils/uid.rs b/src/utils/uid.rs index 37f44a6e1..7cc68609e 100644 --- a/src/utils/uid.rs +++ b/src/utils/uid.rs @@ -20,6 +20,6 @@ use ulid::Ulid; pub type Uid = Ulid; -pub fn gen() -> Ulid { +pub fn generate_ulid() -> Ulid { Ulid::new() } diff --git a/src/validator.rs b/src/validator.rs index 9dd5b6d0c..797a14a35 100644 --- a/src/validator.rs +++ b/src/validator.rs @@ -43,10 +43,10 @@ pub fn stream_name( ' ' => { return Err(StreamNameValidationError::NameWhiteSpace( stream_name.to_owned(), - )) + )); } c if !c.is_alphanumeric() && !ALLOWED_SPECIAL_CHARS.contains(&c) => { - return 
Err(StreamNameValidationError::NameSpecialChar { c }) + return Err(StreamNameValidationError::NameSpecialChar { c }); } _ => {} } @@ -127,7 +127,9 @@ pub mod error { NameSpecialChar { c: char }, #[error("SQL keyword cannot be used as stream name")] SQLKeyword(String), - #[error("The stream {0} is reserved for internal use and cannot be used for user defined streams")] + #[error( + "The stream {0} is reserved for internal use and cannot be used for user defined streams" + )] InternalStream(String), }
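Most hunks in this diff are mechanical rustfmt output for the Rust 2024 style edition: use-statement reordering, long format!/error!/unreachable! strings moved onto their own lines, and explicit `;` after `return` expressions inside match arms. The `uid::gen` to `uid::generate_ulid` rename stands out from the formatting churn; `gen` is reserved as a keyword in the 2024 edition, so keeping the old name would presumably force raw-identifier syntax (`r#gen`) on callers. A minimal sketch of the renamed helper under that assumption (only `generate_ulid` and the `ulid` crate come from the diff; the `main` caller is illustrative):

    use ulid::Ulid;

    // Renamed from `gen` so edition-2024 callers do not need `r#gen`.
    pub fn generate_ulid() -> Ulid {
        Ulid::new()
    }

    fn main() {
        // For example, StorageMetadata's serde default now references
        // "crate::utils::uid::generate_ulid" instead of "crate::utils::uid::gen".
        let deployment_id = generate_ulid();
        println!("deployment_id: {deployment_id}");
    }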