Skip to content

Commit 3b939a9

Browse files
committed
refactor: fix cargo lint checks part 2
1 parent 816f19f commit 3b939a9

File tree

14 files changed

+197
-201
lines changed

14 files changed

+197
-201
lines changed

src/handlers/http/cluster/mod.rs

Lines changed: 29 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1073,25 +1073,25 @@ pub fn init_cluster_metrics_schedular() -> Result<(), PostError> {
10731073
.run(move || async {
10741074
let result: Result<(), PostError> = async {
10751075
let cluster_metrics = fetch_cluster_metrics().await;
1076-
if let Ok(metrics) = cluster_metrics {
1077-
if !metrics.is_empty() {
1078-
info!("Cluster metrics fetched successfully from all ingestors");
1079-
if let Ok(metrics_bytes) = serde_json::to_vec(&metrics) {
1080-
if matches!(
1081-
ingest_internal_stream(
1082-
INTERNAL_STREAM_NAME.to_string(),
1083-
bytes::Bytes::from(metrics_bytes),
1084-
)
1085-
.await,
1086-
Ok(())
1087-
) {
1088-
info!("Cluster metrics successfully ingested into internal stream");
1089-
} else {
1090-
error!("Failed to ingest cluster metrics into internal stream");
1091-
}
1076+
if let Ok(metrics) = cluster_metrics
1077+
&& !metrics.is_empty()
1078+
{
1079+
info!("Cluster metrics fetched successfully from all ingestors");
1080+
if let Ok(metrics_bytes) = serde_json::to_vec(&metrics) {
1081+
if matches!(
1082+
ingest_internal_stream(
1083+
INTERNAL_STREAM_NAME.to_string(),
1084+
bytes::Bytes::from(metrics_bytes),
1085+
)
1086+
.await,
1087+
Ok(())
1088+
) {
1089+
info!("Cluster metrics successfully ingested into internal stream");
10921090
} else {
1093-
error!("Failed to serialize cluster metrics");
1091+
error!("Failed to ingest cluster metrics into internal stream");
10941092
}
1093+
} else {
1094+
error!("Failed to serialize cluster metrics");
10951095
}
10961096
}
10971097
Ok(())
@@ -1186,21 +1186,21 @@ async fn get_available_querier() -> Result<QuerierMetadata, QueryError> {
11861186
});
11871187

11881188
// Find the next available querier using round-robin strategy
1189-
if let Some(selected_domain) = select_next_querier(&mut map).await {
1190-
if let Some(status) = map.get_mut(&selected_domain) {
1191-
status.available = false;
1192-
status.last_used = Some(Instant::now());
1193-
return Ok(status.metadata.clone());
1194-
}
1189+
if let Some(selected_domain) = select_next_querier(&mut map).await
1190+
&& let Some(status) = map.get_mut(&selected_domain)
1191+
{
1192+
status.available = false;
1193+
status.last_used = Some(Instant::now());
1194+
return Ok(status.metadata.clone());
11951195
}
11961196

11971197
// If no querier is available, use least-recently-used strategy
1198-
if let Some(selected_domain) = select_least_recently_used_querier(&mut map) {
1199-
if let Some(status) = map.get_mut(&selected_domain) {
1200-
status.available = false;
1201-
status.last_used = Some(Instant::now());
1202-
return Ok(status.metadata.clone());
1203-
}
1198+
if let Some(selected_domain) = select_least_recently_used_querier(&mut map)
1199+
&& let Some(status) = map.get_mut(&selected_domain)
1200+
{
1201+
status.available = false;
1202+
status.last_used = Some(Instant::now());
1203+
return Ok(status.metadata.clone());
12041204
}
12051205

12061206
// If no querier is available, return an error

src/handlers/http/modal/query/querier_logstream.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -76,10 +76,10 @@ pub async fn delete(stream_name: Path<String>) -> Result<impl Responder, StreamE
7676
)
7777
}
7878

79-
if let Some(hot_tier_manager) = HotTierManager::global() {
80-
if hot_tier_manager.check_stream_hot_tier_exists(&stream_name) {
81-
hot_tier_manager.delete_hot_tier(&stream_name).await?;
82-
}
79+
if let Some(hot_tier_manager) = HotTierManager::global()
80+
&& hot_tier_manager.check_stream_hot_tier_exists(&stream_name)
81+
{
82+
hot_tier_manager.delete_hot_tier(&stream_name).await?;
8383
}
8484

8585
let ingestor_metadata: Vec<NodeMetadata> = cluster::get_node_info(NodeType::Ingestor)

src/handlers/http/modal/ssl_acceptor.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -38,17 +38,17 @@ pub fn get_ssl_acceptor(
3838

3939
let mut certs = rustls_pemfile::certs(cert_file).collect::<Result<Vec<_>, _>>()?;
4040
// Load CA certificates from the directory
41-
if let Some(other_cert_dir) = other_certs {
42-
if other_cert_dir.is_dir() {
43-
for entry in fs::read_dir(other_cert_dir)? {
44-
let path = entry.unwrap().path();
41+
if let Some(other_cert_dir) = other_certs
42+
&& other_cert_dir.is_dir()
43+
{
44+
for entry in fs::read_dir(other_cert_dir)? {
45+
let path = entry.unwrap().path();
4546

46-
if path.is_file() {
47-
let other_cert_file = &mut BufReader::new(File::open(&path)?);
48-
let mut other_certs = rustls_pemfile::certs(other_cert_file)
49-
.collect::<Result<Vec<_>, _>>()?;
50-
certs.append(&mut other_certs);
51-
}
47+
if path.is_file() {
48+
let other_cert_file = &mut BufReader::new(File::open(&path)?);
49+
let mut other_certs = rustls_pemfile::certs(other_cert_file)
50+
.collect::<Result<Vec<_>, _>>()?;
51+
certs.append(&mut other_certs);
5252
}
5353
}
5454
}

src/handlers/http/modal/utils/ingest_utils.rs

Lines changed: 23 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -174,34 +174,35 @@ pub fn get_custom_fields_from_header(req: &HttpRequest) -> HashMap<String, Strin
174174
}
175175

176176
let header_name = header_name.as_str();
177-
if header_name.starts_with("x-p-") && !IGNORE_HEADERS.contains(&header_name) {
178-
if let Ok(value) = header_value.to_str() {
179-
let key = header_name.trim_start_matches("x-p-");
180-
if !key.is_empty() {
181-
// Truncate value if it exceeds the maximum length
182-
let truncated_value = if value.len() > MAX_FIELD_VALUE_LENGTH {
183-
warn!(
184-
"Header value for '{}' exceeds maximum length, truncating",
185-
header_name
186-
);
187-
&value[..MAX_FIELD_VALUE_LENGTH]
188-
} else {
189-
value
190-
};
191-
p_custom_fields.insert(key.to_string(), truncated_value.to_string());
192-
} else {
177+
if header_name.starts_with("x-p-")
178+
&& !IGNORE_HEADERS.contains(&header_name)
179+
&& let Ok(value) = header_value.to_str()
180+
{
181+
let key = header_name.trim_start_matches("x-p-");
182+
if !key.is_empty() {
183+
// Truncate value if it exceeds the maximum length
184+
let truncated_value = if value.len() > MAX_FIELD_VALUE_LENGTH {
193185
warn!(
194-
"Ignoring header with empty key after prefix: {}",
186+
"Header value for '{}' exceeds maximum length, truncating",
195187
header_name
196188
);
197-
}
189+
&value[..MAX_FIELD_VALUE_LENGTH]
190+
} else {
191+
value
192+
};
193+
p_custom_fields.insert(key.to_string(), truncated_value.to_string());
194+
} else {
195+
warn!(
196+
"Ignoring header with empty key after prefix: {}",
197+
header_name
198+
);
198199
}
199200
}
200201

201-
if header_name == LOG_SOURCE_KEY {
202-
if let Ok(value) = header_value.to_str() {
203-
p_custom_fields.insert(FORMAT_KEY.to_string(), value.to_string());
204-
}
202+
if header_name == LOG_SOURCE_KEY
203+
&& let Ok(value) = header_value.to_str()
204+
{
205+
p_custom_fields.insert(FORMAT_KEY.to_string(), value.to_string());
205206
}
206207
}
207208

src/hottier.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -588,11 +588,10 @@ impl HotTierManager {
588588
if let (Some(download_date_time), Some(delete_date_time)) = (
589589
extract_datetime(download_file_path.to_str().unwrap()),
590590
extract_datetime(path_to_delete.to_str().unwrap()),
591-
) {
592-
if download_date_time <= delete_date_time {
593-
delete_successful = false;
594-
break 'loop_files;
595-
}
591+
) && download_date_time <= delete_date_time
592+
{
593+
delete_successful = false;
594+
break 'loop_files;
596595
}
597596

598597
fs::write(manifest_file.path(), serde_json::to_vec(&manifest)?).await?;

src/livetail.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,8 @@ impl Stream for ReceiverPipe {
161161
impl Drop for ReceiverPipe {
162162
fn drop(&mut self) {
163163
if let Some(map) = self._ref.upgrade()
164-
&& let Some(pipes) = map.write().unwrap().get_mut(&self.stream)
164+
&& let Ok(mut guard) = map.write()
165+
&& let Some(pipes) = guard.get_mut(&self.stream)
165166
{
166167
pipes.retain(|x| x.id != self.id)
167168
}

src/metadata.rs

Lines changed: 17 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -143,25 +143,23 @@ pub async fn update_data_type_time_partition(
143143
schema: &mut Schema,
144144
time_partition: Option<&String>,
145145
) -> anyhow::Result<()> {
146-
if let Some(time_partition) = time_partition {
147-
if let Ok(time_partition_field) = schema.field_with_name(time_partition) {
148-
if time_partition_field.data_type() != &DataType::Timestamp(TimeUnit::Millisecond, None)
149-
{
150-
let mut fields = schema
151-
.fields()
152-
.iter()
153-
.filter(|field| field.name() != time_partition)
154-
.cloned()
155-
.collect::<Vec<Arc<Field>>>();
156-
let time_partition_field = Arc::new(Field::new(
157-
time_partition,
158-
DataType::Timestamp(TimeUnit::Millisecond, None),
159-
true,
160-
));
161-
fields.push(time_partition_field);
162-
*schema = Schema::new(fields);
163-
}
164-
}
146+
if let Some(time_partition) = time_partition
147+
&& let Ok(time_partition_field) = schema.field_with_name(time_partition)
148+
&& time_partition_field.data_type() != &DataType::Timestamp(TimeUnit::Millisecond, None)
149+
{
150+
let mut fields = schema
151+
.fields()
152+
.iter()
153+
.filter(|field| field.name() != time_partition)
154+
.cloned()
155+
.collect::<Vec<Arc<Field>>>();
156+
let time_partition_field = Arc::new(Field::new(
157+
time_partition,
158+
DataType::Timestamp(TimeUnit::Millisecond, None),
159+
true,
160+
));
161+
fields.push(time_partition_field);
162+
*schema = Schema::new(fields);
165163
}
166164

167165
Ok(())

src/migration/metadata_migration.rs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -85,10 +85,10 @@ pub fn v2_v3(mut storage_metadata: JsonValue) -> JsonValue {
8585
if !privileges.is_empty() {
8686
for privilege in privileges.iter_mut() {
8787
let privilege_value = privilege.get_mut("privilege");
88-
if let Some(value) = privilege_value {
89-
if value.as_str().unwrap() == "ingester" {
90-
*value = JsonValue::String("ingestor".to_string());
91-
}
88+
if let Some(value) = privilege_value
89+
&& value.as_str().unwrap() == "ingester"
90+
{
91+
*value = JsonValue::String("ingestor".to_string());
9292
}
9393
}
9494
let role_name =
@@ -124,10 +124,10 @@ pub fn v3_v4(mut storage_metadata: JsonValue) -> JsonValue {
124124
};
125125
for privilege in privileges.iter_mut() {
126126
let privilege_value = privilege.get_mut("privilege");
127-
if let Some(value) = privilege_value {
128-
if value.as_str().unwrap() == "ingester" {
129-
*value = JsonValue::String("ingestor".to_string());
130-
}
127+
if let Some(value) = privilege_value
128+
&& value.as_str().unwrap() == "ingester"
129+
{
130+
*value = JsonValue::String("ingestor".to_string());
131131
}
132132
}
133133
}
@@ -185,10 +185,10 @@ pub fn v5_v6(mut storage_metadata: JsonValue) -> JsonValue {
185185
for (_, role_permissions) in roles.iter_mut() {
186186
if let JsonValue::Array(permissions) = role_permissions {
187187
for permission in permissions.iter_mut() {
188-
if let JsonValue::Object(perm_obj) = permission {
189-
if let Some(JsonValue::Object(resource)) = perm_obj.get_mut("resource") {
190-
resource.remove("tag");
191-
}
188+
if let JsonValue::Object(perm_obj) = permission
189+
&& let Some(JsonValue::Object(resource)) = perm_obj.get_mut("resource")
190+
{
191+
resource.remove("tag");
192192
}
193193
}
194194
}

src/otel/otel_utils.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -169,10 +169,10 @@ pub fn insert_if_some<T: ToString>(map: &mut Map<String, Value>, key: &str, opti
169169
}
170170

171171
pub fn insert_number_if_some(map: &mut Map<String, Value>, key: &str, option: &Option<f64>) {
172-
if let Some(value) = option {
173-
if let Some(number) = serde_json::Number::from_f64(*value) {
174-
map.insert(key.to_string(), Value::Number(number));
175-
}
172+
if let Some(value) = option
173+
&& let Some(number) = serde_json::Number::from_f64(*value)
174+
{
175+
map.insert(key.to_string(), Value::Number(number));
176176
}
177177
}
178178

src/parseable/streams.rs

Lines changed: 25 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -449,16 +449,16 @@ impl Stream {
449449
let mut schemas: Vec<Schema> = Vec::new();
450450

451451
for file in dir.flatten() {
452-
if let Some(ext) = file.path().extension() {
453-
if ext.eq("schema") {
454-
let file = File::open(file.path()).expect("Schema File should exist");
455-
456-
let schema = match serde_json::from_reader(file) {
457-
Ok(schema) => schema,
458-
Err(_) => continue,
459-
};
460-
schemas.push(schema);
461-
}
452+
if let Some(ext) = file.path().extension()
453+
&& ext.eq("schema")
454+
{
455+
let file = File::open(file.path()).expect("Schema File should exist");
456+
457+
let schema = match serde_json::from_reader(file) {
458+
Ok(schema) => schema,
459+
Err(_) => continue,
460+
};
461+
schemas.push(schema);
462462
}
463463
}
464464

@@ -742,26 +742,26 @@ impl Stream {
742742
}
743743

744744
// After deleting the last file, try to remove the inprocess directory if empty
745-
if i == arrow_files.len() - 1 {
746-
if let Some(parent_dir) = file.parent() {
747-
match fs::read_dir(parent_dir) {
748-
Ok(mut entries) => {
749-
if entries.next().is_none() {
750-
if let Err(err) = fs::remove_dir(parent_dir) {
751-
warn!(
752-
"Failed to remove inprocess directory {}: {err}",
753-
parent_dir.display()
754-
);
755-
}
756-
}
757-
}
758-
Err(err) => {
745+
if i == arrow_files.len() - 1
746+
&& let Some(parent_dir) = file.parent()
747+
{
748+
match fs::read_dir(parent_dir) {
749+
Ok(mut entries) => {
750+
if entries.next().is_none()
751+
&& let Err(err) = fs::remove_dir(parent_dir)
752+
{
759753
warn!(
760-
"Failed to read inprocess directory {}: {err}",
754+
"Failed to remove inprocess directory {}: {err}",
761755
parent_dir.display()
762756
);
763757
}
764758
}
759+
Err(err) => {
760+
warn!(
761+
"Failed to read inprocess directory {}: {err}",
762+
parent_dir.display()
763+
);
764+
}
765765
}
766766
}
767767
}

0 commit comments

Comments
 (0)